fix: remove deprecated event send batching

This commit is contained in:
xeruf 2025-01-16 00:26:40 +01:00
parent 9ea491a301
commit dcc7778815
2 changed files with 15 additions and 46 deletions

View file

@ -13,9 +13,8 @@ const UNDO_DELAY: u64 = 60;
#[derive(Debug, Clone, Eq, PartialEq)] #[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) enum MostrMessage { pub(crate) enum MostrMessage {
Flush,
NewRelay(RelayUrl), NewRelay(RelayUrl),
AddTasks(RelayUrl, Vec<Event>), SendTask(RelayUrl, Event),
} }
type Events = Vec<Event>; type Events = Vec<Event>;
@ -68,15 +67,9 @@ impl EventSender {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { self.url.as_ref().map(|url| {
self.tx values.into_iter()
.try_send(MostrMessage::AddTasks(url.clone(), values)) .find_map(|event| self.tx.try_send(MostrMessage::SendTask(url.clone(), event)).err())
.err() .map(|e| error!("Nostr communication thread failure, changes will not be persisted: {}", e))
.map(|e| {
error!(
"Nostr communication thread failure, changes will not be persisted: {}",
e
)
})
}); });
} }
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event

View file

@ -22,6 +22,7 @@ use itertools::Itertools;
use keyring::Entry; use keyring::Entry;
use log::{debug, error, info, trace, warn, LevelFilter}; use log::{debug, error, info, trace, warn, LevelFilter};
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use nostr_sdk::serde_json::Serializer;
use regex::Regex; use regex::Regex;
use rustyline::config::Configurer; use rustyline::config::Configurer;
use rustyline::error::ReadlineError; use rustyline::error::ReadlineError;
@ -37,7 +38,6 @@ mod kinds;
mod event_sender; mod event_sender;
mod hashtag; mod hashtag;
const INACTVITY_DELAY: u64 = 200;
const LOCAL_RELAY_NAME: &str = "TEMP"; const LOCAL_RELAY_NAME: &str = "TEMP";
/// Turn a Result into an Option, showing a warning on error with optional prefix /// Turn a Result into an Option, showing a warning on error with optional prefix
@ -86,8 +86,10 @@ fn read_keys(readline: &mut DefaultEditor) -> Result<Keys> {
async fn main() -> Result<()> { async fn main() -> Result<()> {
println!("Running Mostr Version {}", env!("CARGO_PKG_VERSION")); println!("Running Mostr Version {}", env!("CARGO_PKG_VERSION"));
let mut debug = false;
let mut args = args().skip(1).peekable(); let mut args = args().skip(1).peekable();
let mut builder = if args.peek().is_some_and(|arg| arg == "--debug") { let mut builder = if args.peek().is_some_and(|arg| arg == "--debug") {
debug = true;
args.next(); args.next();
let mut builder = Builder::new(); let mut builder = Builder::new();
builder.filter(None, LevelFilter::Debug) builder.filter(None, LevelFilter::Debug)
@ -211,14 +213,11 @@ async fn main() -> Result<()> {
client.relays().await.into_keys().map(|url| (Some(url.clone()), tasks_for_url(Some(url)))).collect(); client.relays().await.into_keys().map(|url| (Some(url.clone()), tasks_for_url(Some(url)))).collect();
let sender = tokio::spawn(async move { let sender = tokio::spawn(async move {
let mut queue: Option<(RelayUrl, Vec<Event>)> = None;
or_warn!(client.set_metadata(&metadata_clone).await, "Unable to set metadata"); or_warn!(client.set_metadata(&metadata_clone).await, "Unable to set metadata");
'repl: loop { 'receiver: loop {
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await; match rx.recv().await {
match result_received { Some(MostrMessage::NewRelay(url)) => {
Ok(Some(MostrMessage::NewRelay(url))) => {
if client.add_relay(&url).await.unwrap() { if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await { match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"), Ok(()) => info!("Connected to {url}"),
@ -228,39 +227,16 @@ async fn main() -> Result<()> {
warn!("Relay {url} already added"); warn!("Relay {url} already added");
} }
} }
Ok(Some(MostrMessage::AddTasks(url, mut events))) => { Some(MostrMessage::SendTask(url, event)) => {
trace!("Queueing {:?}", &events); trace!("Sending {:?}", &event);
if let Some((queue_url, mut queue_events)) = queue { client.send_event_to(vec![url], event);
if queue_url == url {
queue_events.append(&mut events);
queue = Some((queue_url, queue_events));
} else {
info!("Sending {} events to {queue_url} due to relay change", queue_events.len());
client.batch_event_to(vec![queue_url], queue_events).await;
queue = None;
} }
} None => {
if queue.is_none() {
events.reserve(events.len() + 10);
queue = Some((url, events))
}
}
Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or("inactivity", |_| "flush message"));
client.batch_event_to(vec![url], events).await;
queue = None;
}
Ok(None) => {
debug!("Finalizing nostr communication thread because communication channel was closed"); debug!("Finalizing nostr communication thread because communication channel was closed");
break 'repl; break 'receiver;
} }
} }
} }
if let Some((url, events)) = queue {
info!("Sending {} events to {url} before exiting", events.len());
client.batch_event_to(vec![url], events).await;
}
info!("Shutting down nostr communication thread"); info!("Shutting down nostr communication thread");
}); });