From c67ef3b119669bea115f2939004c76f0440a3291 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Tue, 13 Aug 2024 11:54:14 +0300 Subject: [PATCH] feat(main): batch up messages for relays --- src/main.rs | 45 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index d78aa15..56d5928 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,9 +9,10 @@ use std::ops::Sub; use std::path::PathBuf; use std::str::FromStr; use std::sync::mpsc; +use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::Sender; +use std::time::Duration; -use chrono::DateTime; use colored::Colorize; use itertools::Itertools; use log::{debug, error, info, trace, warn}; @@ -208,12 +209,44 @@ async fn main() { .await;*/ let sender = tokio::spawn(async move { - while let Ok((url, events)) = rx.recv() { - trace!("Sending {:?}", events); - // TODO batch up further - let _ = client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; + let mut queue: Option<(Url, Vec)> = None; + + loop { + // TODO invalid acknowledgement from bucket relay slows sending down + match rx.recv_timeout(Duration::from_secs(200)) { + Ok((url, mut events)) => { + if 1 == 2 { + client.connect_relay("").await; + } + debug!("Queueing {:?}", &events); + if let Some((queue_url, mut queue_events)) = queue { + if queue_url == url { + queue_events.append(&mut events); + queue = Some((queue_url, queue_events)); + } else { + info!("Sending {} events due to relay change", queue_events.len()); + client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await; + queue = None; + } + } + if queue.is_none() { + events.reserve(events.len() + 10); + queue = Some((url, events)) + } + } + Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { + info!("Sending {} events due to inactivity", events.len()); + client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; + queue = None; + } + _ => break + } } - info!("Shutting down sender thread"); + if let Some((url, events)) = queue { + info!("Sending {} events before exiting", events.len()); + client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; + } + info!("Shutting down nostr communication thread"); }); let mut local_tasks = Tasks::from(None, &tx, &keys);