forked from janek/mostr
feat(main): batch up messages for relays
This commit is contained in:
parent
619bcfbbad
commit
c67ef3b119
45
src/main.rs
45
src/main.rs
|
@ -9,9 +9,10 @@ use std::ops::Sub;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
use std::sync::mpsc::RecvTimeoutError;
|
||||||
use std::sync::mpsc::Sender;
|
use std::sync::mpsc::Sender;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use chrono::DateTime;
|
|
||||||
use colored::Colorize;
|
use colored::Colorize;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace, warn};
|
||||||
|
@ -208,12 +209,44 @@ async fn main() {
|
||||||
.await;*/
|
.await;*/
|
||||||
|
|
||||||
let sender = tokio::spawn(async move {
|
let sender = tokio::spawn(async move {
|
||||||
while let Ok((url, events)) = rx.recv() {
|
let mut queue: Option<(Url, Vec<Event>)> = None;
|
||||||
trace!("Sending {:?}", events);
|
|
||||||
// TODO batch up further
|
loop {
|
||||||
let _ = client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
// TODO invalid acknowledgement from bucket relay slows sending down
|
||||||
|
match rx.recv_timeout(Duration::from_secs(200)) {
|
||||||
|
Ok((url, mut events)) => {
|
||||||
|
if 1 == 2 {
|
||||||
|
client.connect_relay("").await;
|
||||||
}
|
}
|
||||||
info!("Shutting down sender thread");
|
debug!("Queueing {:?}", &events);
|
||||||
|
if let Some((queue_url, mut queue_events)) = queue {
|
||||||
|
if queue_url == url {
|
||||||
|
queue_events.append(&mut events);
|
||||||
|
queue = Some((queue_url, queue_events));
|
||||||
|
} else {
|
||||||
|
info!("Sending {} events due to relay change", queue_events.len());
|
||||||
|
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
|
||||||
|
queue = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if queue.is_none() {
|
||||||
|
events.reserve(events.len() + 10);
|
||||||
|
queue = Some((url, events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
|
||||||
|
info!("Sending {} events due to inactivity", events.len());
|
||||||
|
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
||||||
|
queue = None;
|
||||||
|
}
|
||||||
|
_ => break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some((url, events)) = queue {
|
||||||
|
info!("Sending {} events before exiting", events.len());
|
||||||
|
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
||||||
|
}
|
||||||
|
info!("Shutting down nostr communication thread");
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut local_tasks = Tasks::from(None, &tx, &keys);
|
let mut local_tasks = Tasks::from(None, &tx, &keys);
|
||||||
|
|
Loading…
Reference in New Issue