diff --git a/src/main.rs b/src/main.rs index 8e35874..61e5590 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::Sender; use std::time::Duration; -use colored::Colorize; +use colored::{ColoredString, Colorize}; use env_logger::Builder; use itertools::Itertools; use log::{debug, error, info, LevelFilter, trace, warn}; @@ -23,7 +23,6 @@ use xdg::BaseDirectories; use crate::helpers::*; use crate::kinds::{KINDS, PROPERTY_COLUMNS, TRACKING_KIND}; -use crate::MostrMessage::AddTasks; use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State}; use crate::tasks::{PropertyCollection, StateFilter, Tasks}; @@ -80,7 +79,9 @@ impl EventSender { debug!("Flushing {} events from queue", self.queue.borrow().len()); let values = self.clear(); self.url.as_ref().map(|url| { - or_print(self.tx.send(AddTasks(url.clone(), values))); + self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| { + error!("Nostr communication thread failure, changes will not be persisted: {}", e) + }) }); } /// Sends all pending events if there is a non-tracking event @@ -149,6 +150,7 @@ async fn main() { let client = Client::new(&keys); info!("My public key: {}", keys.public_key()); + // TODO use NewRelay message for all relays match var("MOSTR_RELAY") { Ok(relay) => { or_print(client.add_relay(relay).await); @@ -237,19 +239,26 @@ async fn main() { let mut queue: Option<(Url, Vec)> = None; loop { - // TODO invalid acknowledgement from bucket relay slows sending down - match rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)) { - Ok(AddTasks(url, mut events)) => { - if 1 == 2 { - client.connect_relay("").await; + let result = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)); + match result { + Ok(MostrMessage::NewRelay(url)) => { + if client.add_relay(&url).await.unwrap() { + match client.connect_relay(&url).await { + Ok(()) => info!("Connected to {url}"), + Err(e) => warn!("Unable to connect to relay {url}: {e}") + } + } else { + warn!("Relay {url} already added"); } - debug!("Queueing {:?}", &events); + } + Ok(MostrMessage::AddTasks(url, mut events)) => { + trace!("Queueing {:?}", &events); if let Some((queue_url, mut queue_events)) = queue { if queue_url == url { queue_events.append(&mut events); queue = Some((queue_url, queue_events)); } else { - info!("Sending {} events due to relay change", queue_events.len()); + info!("Sending {} events to {url} due to relay change", queue_events.len()); client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await; queue = None; } @@ -259,19 +268,19 @@ async fn main() { queue = Some((url, events)) } } - Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { - info!("Sending {} events due to inactivity", events.len()); + Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { + info!("Sending {} events to {url} due to {:?}", events.len(), result); client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; queue = None; } - arg => { - debug!("Finalizing nostr communication thread because of {:?}", arg); - break + Err(err) => { + debug!("Finalizing nostr communication thread because of {:?}", err); + break; } } } if let Some((url, events)) = queue { - info!("Sending {} events before exiting", events.len()); + info!("Sending {} events to {url} before exiting", events.len()); client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; } info!("Shutting down nostr communication thread"); @@ -541,22 +550,19 @@ async fn main() { _ => if Regex::new("^wss?://").unwrap().is_match(&input.trim()) { tasks.move_to(None); - let mut new_relay = relays.keys().find(|key| key.as_str().starts_with(&input)).cloned(); - if new_relay.is_none() { - if let Some(url) = or_print(Url::parse(&input)) { - warn!("Connecting to {url} while running not yet supported"); - //new_relay = Some(url.clone()); - //relays.insert(url.clone(), tasks_for_url(Some(url.clone()))); - //if client.add_relay(url).await.unwrap() { - // relays.insert(url.clone(), tasks_for_url(Some(url.clone()))); - // client.connect().await; - //} + if let Some((url, tasks)) = relays.iter().find(|(key, _)| key.as_str().starts_with(&input)) { + selected_relay = Some(url.clone()); + or_print(tasks.print_tasks()); + } else if let Some(url) = or_print(Url::parse(&input)) { + match tx.send(MostrMessage::NewRelay(url.clone())) { + Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), + Ok(_) => { + info!("Connecting to {url}"); + selected_relay = Some(url.clone()); + relays.insert(url.clone(), tasks_for_url(Some(url))); + } } } - if new_relay.is_some() { - selected_relay = new_relay; - } - //or_print(tasks.print_tasks()); continue; } else { tasks.filter_or_create(&input);