From dcc777881569c35a3305316908190b87268db059 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Thu, 16 Jan 2025 00:26:40 +0100 Subject: [PATCH] fix: remove deprecated event send batching --- src/event_sender.rs | 15 ++++----------- src/main.rs | 46 +++++++++++---------------------------------- 2 files changed, 15 insertions(+), 46 deletions(-) diff --git a/src/event_sender.rs b/src/event_sender.rs index 302e8bb..44c5d99 100644 --- a/src/event_sender.rs +++ b/src/event_sender.rs @@ -13,9 +13,8 @@ const UNDO_DELAY: u64 = 60; #[derive(Debug, Clone, Eq, PartialEq)] pub(crate) enum MostrMessage { - Flush, NewRelay(RelayUrl), - AddTasks(RelayUrl, Vec), + SendTask(RelayUrl, Event), } type Events = Vec; @@ -68,15 +67,9 @@ impl EventSender { debug!("Flushing {} events from queue", self.queue.borrow().len()); let values = self.clear(); self.url.as_ref().map(|url| { - self.tx - .try_send(MostrMessage::AddTasks(url.clone(), values)) - .err() - .map(|e| { - error!( - "Nostr communication thread failure, changes will not be persisted: {}", - e - ) - }) + values.into_iter() + .find_map(|event| self.tx.try_send(MostrMessage::SendTask(url.clone(), event)).err()) + .map(|e| error!("Nostr communication thread failure, changes will not be persisted: {}", e)) }); } /// Sends all pending events if there is a non-tracking event diff --git a/src/main.rs b/src/main.rs index 3150830..0b649a7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ use itertools::Itertools; use keyring::Entry; use log::{debug, error, info, trace, warn, LevelFilter}; use nostr_sdk::prelude::*; +use nostr_sdk::serde_json::Serializer; use regex::Regex; use rustyline::config::Configurer; use rustyline::error::ReadlineError; @@ -37,7 +38,6 @@ mod kinds; mod event_sender; mod hashtag; -const INACTVITY_DELAY: u64 = 200; const LOCAL_RELAY_NAME: &str = "TEMP"; /// Turn a Result into an Option, showing a warning on error with optional prefix @@ -86,8 +86,10 @@ fn read_keys(readline: &mut DefaultEditor) -> Result { async fn main() -> Result<()> { println!("Running Mostr Version {}", env!("CARGO_PKG_VERSION")); + let mut debug = false; let mut args = args().skip(1).peekable(); let mut builder = if args.peek().is_some_and(|arg| arg == "--debug") { + debug = true; args.next(); let mut builder = Builder::new(); builder.filter(None, LevelFilter::Debug) @@ -211,14 +213,11 @@ async fn main() -> Result<()> { client.relays().await.into_keys().map(|url| (Some(url.clone()), tasks_for_url(Some(url)))).collect(); let sender = tokio::spawn(async move { - let mut queue: Option<(RelayUrl, Vec)> = None; - or_warn!(client.set_metadata(&metadata_clone).await, "Unable to set metadata"); - 'repl: loop { - let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await; - match result_received { - Ok(Some(MostrMessage::NewRelay(url))) => { + 'receiver: loop { + match rx.recv().await { + Some(MostrMessage::NewRelay(url)) => { if client.add_relay(&url).await.unwrap() { match client.connect_relay(&url).await { Ok(()) => info!("Connected to {url}"), @@ -228,39 +227,16 @@ async fn main() -> Result<()> { warn!("Relay {url} already added"); } } - Ok(Some(MostrMessage::AddTasks(url, mut events))) => { - trace!("Queueing {:?}", &events); - if let Some((queue_url, mut queue_events)) = queue { - if queue_url == url { - queue_events.append(&mut events); - queue = Some((queue_url, queue_events)); - } else { - info!("Sending {} events to {queue_url} due to relay change", queue_events.len()); - client.batch_event_to(vec![queue_url], queue_events).await; - queue = None; - } - } - if queue.is_none() { - events.reserve(events.len() + 10); - queue = Some((url, events)) - } + Some(MostrMessage::SendTask(url, event)) => { + trace!("Sending {:?}", &event); + client.send_event_to(vec![url], event); } - Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue { - info!("Sending {} events to {url} due to {}", events.len(), - result_received.map_or("inactivity", |_| "flush message")); - client.batch_event_to(vec![url], events).await; - queue = None; - } - Ok(None) => { + None => { debug!("Finalizing nostr communication thread because communication channel was closed"); - break 'repl; + break 'receiver; } } } - if let Some((url, events)) = queue { - info!("Sending {} events to {url} before exiting", events.len()); - client.batch_event_to(vec![url], events).await; - } info!("Shutting down nostr communication thread"); });