diff --git a/src/main.rs b/src/main.rs index 590b777..e9d75c4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,18 +8,20 @@ use std::iter::once; use std::ops::Sub; use std::path::PathBuf; use std::str::FromStr; -use std::sync::mpsc; -use std::sync::mpsc::RecvTimeoutError; -use std::sync::mpsc::Sender; use std::time::Duration; -use colored::{ColoredString, Colorize}; +use colored::Colorize; use env_logger::Builder; use itertools::Itertools; use log::{debug, error, info, LevelFilter, trace, warn}; +use nostr_sdk::async_utility::futures_util::TryFutureExt; use nostr_sdk::prelude::*; use nostr_sdk::TagStandard::Hashtag; use regex::Regex; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; +use tokio::time::error::Elapsed; +use tokio::time::timeout; use xdg::BaseDirectories; use crate::helpers::*; @@ -79,11 +81,11 @@ impl EventSender { fn force_flush(&self) { debug!("Flushing {} events from queue", self.queue.borrow().len()); let values = self.clear(); - self.url.as_ref().map(|url| { - self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| { + if let Some(url) = self.url.as_ref() { + let _ = self.tx.blocking_send(MostrMessage::AddTasks(url.clone(), values)).map_err(|e| { error!("Nostr communication thread failure, changes will not be persisted: {}", e) - }) - }); + }); + } } /// Sends all pending events if there is a non-tracking event fn flush(&self) { @@ -185,85 +187,27 @@ async fn main() { ], None).await; info!("Subscribed to tasks with {:?}", sub1); - let mut notifications = client.notifications(); - client.connect().await; - let sub2 = client.subscribe(vec![ Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k))) ], None).await; + info!("Subscribed to updates with {:?}", sub2); + let mut notifications = client.notifications(); + client.connect().await; - let (tx, rx) = mpsc::channel::(); - let tasks_for_url = |url: Option| Tasks::from(url, &tx, &keys); - let mut relays: HashMap = - client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); + let (tx, mut rx) = mpsc::channel::(64); + let mut local_tasks = Tasks::from(None, tx.clone(), keys.clone()); - let sender = tokio::spawn(async move { - let mut queue: Option<(Url, Vec)> = None; + let mypubkey = keys.public_key(); + let closureSender = tx.clone(); + let closureKeys = keys.clone(); + let tasks_for_url = move |url: Option| Tasks::from(url, closureSender.clone(), closureKeys.clone()); - loop { - if let Ok(user) = var("USER") { - let metadata = Metadata::new() - .name(user); - // .display_name("My Username") - // .about("Description") - // .picture(Url::parse("https://example.com/avatar.png")?) - // .banner(Url::parse("https://example.com/banner.png")?) - // .nip05("username@example.com") - // .lud16("yuki@getalby.com") - // .custom_field("custom_field", "my value"); - or_print(client.set_metadata(&metadata).await); - } - - let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)); - match result_received { - Ok(MostrMessage::NewRelay(url)) => { - if client.add_relay(&url).await.unwrap() { - match client.connect_relay(&url).await { - Ok(()) => info!("Connected to {url}"), - Err(e) => warn!("Unable to connect to relay {url}: {e}") - } - } else { - warn!("Relay {url} already added"); - } - } - Ok(MostrMessage::AddTasks(url, mut events)) => { - trace!("Queueing {:?}", &events); - if let Some((queue_url, mut queue_events)) = queue { - if queue_url == url { - queue_events.append(&mut events); - queue = Some((queue_url, queue_events)); - } else { - info!("Sending {} events to {url} due to relay change", queue_events.len()); - client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await; - queue = None; - } - } - if queue.is_none() { - events.reserve(events.len() + 10); - queue = Some((url, events)) - } - } - Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { - info!("Sending {} events to {url} due to {}", events.len(), - result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m))); - client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; - queue = None; - } - Err(err) => { - debug!("Finalizing nostr communication thread because of {:?}: {}", err, err); - break; - } - } - } - if let Some((url, events)) = queue { - info!("Sending {} events to {url} before exiting", events.len()); - client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; - } - info!("Shutting down nostr communication thread"); - }); - - let mut local_tasks = Tasks::from(None, &tx, &keys); + let mut relays: HashMap = Default::default(); + for (url, _) in client.relays().await { + relays.insert(url.clone(), tasks_for_url(Some(url))); + } + //client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); let mut selected_relay: Option = relays.keys().nth(0).cloned(); { @@ -273,353 +217,419 @@ async fn main() { } } - let mut lines = stdin().lines(); - loop { - trace!("All Root Tasks:\n{}", relays.iter().map(|(url, tasks)| + let sender = tokio::spawn(async move { + let mut lines = stdin().lines(); + loop { + trace!("All Root Tasks:\n{}", relays.iter().map(|(url, tasks)| format!("{}: [{}]", url, tasks.children_of(None).map(|id| tasks.get_task_title(id)).join("; "))).join("\n")); - println!(); - let tasks = selected_relay.as_ref().and_then(|url| relays.get(url)).unwrap_or(&local_tasks); - print!( - "{} {}{}) ", - selected_relay.as_ref().map_or("TEMP".to_string(), |url| url.to_string()).bright_black(), - tasks.get_task_path(tasks.get_position()).bold(), - tasks.get_prompt_suffix().italic(), - ); - stdout().flush().unwrap(); - match lines.next() { - Some(Ok(input)) => { - let mut count = 0; - while let Ok(notification) = notifications.try_recv() { - if let RelayPoolNotification::Event { - relay_url, - event, - .. - } = notification - { - debug!( + println!(); + let tasks = selected_relay.as_ref().and_then(|url| relays.get(url)).unwrap_or(&local_tasks); + print!( + "{} {}{}) ", + selected_relay.as_ref().map_or("TEMP".to_string(), |url| url.to_string()).bright_black(), + tasks.get_task_path(tasks.get_position()).bold(), + tasks.get_prompt_suffix().italic(), + ); + stdout().flush().unwrap(); + match lines.next() { + Some(Ok(input)) => { + let mut count = 0; + while let Ok(notification) = notifications.try_recv() { + if let RelayPoolNotification::Event { + relay_url, + event, + .. + } = notification + { + debug!( "At {} found {} kind {} content \"{}\" tags {:?}", event.created_at, event.id, event.kind, event.content, event.tags.iter().map(|tag| tag.as_vec()).collect_vec() ); - match relays.get_mut(&relay_url) { - Some(tasks) => tasks.add(*event), - None => warn!("Event received from unknown relay {relay_url}: {:?}", *event) - } - count += 1; - } - } - if count > 0 { - info!("Received {count} Updates"); - } - - let mut iter = input.chars(); - let op = iter.next(); - let arg = if input.len() > 1 { - Some(input[1..].trim()) - } else { - None - }; - let arg_default = arg.unwrap_or(""); - let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks); - match op { - None => { - debug!("Flushing Tasks because of empty command"); - tasks.flush() - } - - Some(':') => { - let next = iter.next(); - if let Some(':') = next { - let str: String = iter.collect(); - let result = str.split_whitespace().map(|s| s.to_string()).collect::>(); - if result.len() == 1 { - tasks.add_sorting_property(str.trim().to_string()) - } else { - tasks.set_sorting(result) + match relays.get_mut(&relay_url) { + Some(tasks) => tasks.add(*event), + None => warn!("Event received from unknown relay {relay_url}: {:?}", *event) } - } else if let Some(digit) = next.and_then(|s| s.to_digit(10)) { - let index = (digit as usize).saturating_sub(1); - let remaining = iter.collect::().trim().to_string(); - if remaining.is_empty() { - tasks.get_columns().remove_at(index); - } else { - tasks.get_columns().add_or_remove_at(remaining, index); - } - } else if let Some(arg) = arg { - tasks.get_columns().add_or_remove(arg.to_string()); - } else { - println!("{}", PROPERTY_COLUMNS); - continue; + count += 1; } } + if count > 0 { + info!("Received {count} Updates"); + } - Some(',') => - match arg { - None => { - tasks.get_current_task().map_or_else( - || info!("With a task selected, use ,NOTE to attach NOTE and , to list all its notes"), - |task| println!("{}", task.description_events().map(|e| format!("{} {}", local_datetimestamp(&e.created_at), e.content)).join("\n")), - ); + let mut iter = input.chars(); + let op = iter.next(); + let arg = if input.len() > 1 { + Some(input[1..].trim()) + } else { + None + }; + let arg_default = arg.unwrap_or(""); + let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks); + match op { + None => { + debug!("Flushing Tasks because of empty command"); + tasks.flush() + } + + Some(':') => { + let next = iter.next(); + if let Some(':') = next { + let str: String = iter.collect(); + let result = str.split_whitespace().map(|s| s.to_string()).collect::>(); + if result.len() == 1 { + tasks.add_sorting_property(str.trim().to_string()) + } else { + tasks.set_sorting(result) + } + } else if let Some(digit) = next.and_then(|s| s.to_digit(10)) { + let index = (digit as usize).saturating_sub(1); + let remaining = iter.collect::().trim().to_string(); + if remaining.is_empty() { + tasks.get_columns().remove_at(index); + } else { + tasks.get_columns().add_or_remove_at(remaining, index); + } + } else if let Some(arg) = arg { + tasks.get_columns().add_or_remove(arg.to_string()); + } else { + println!("{}", PROPERTY_COLUMNS); continue; } - Some(arg) => tasks.make_note(arg), } - Some('>') => { - tasks.update_state(&arg_default, State::Done); - tasks.move_up(); - } - - Some('<') => { - tasks.update_state(&arg_default, State::Closed); - tasks.move_up(); - } - - Some('&') => { - tasks.undo(); - } - - Some('@') => { - match arg { - None => { - let today = Timestamp::from(Timestamp::now() - 80_000); - info!("Filtering for tasks from the last 22 hours"); - tasks.set_filter( - tasks.filtered_tasks(tasks.get_position_ref()) - .filter(|t| t.event.created_at > today) - .map(|t| t.event.id) - .collect() - ); + Some(',') => + match arg { + None => { + tasks.get_current_task().map_or_else( + || info!("With a task selected, use ,NOTE to attach NOTE and , to list all its notes"), + |task| println!("{}", task.description_events().map(|e| format!("{} {}", local_datetimestamp(&e.created_at), e.content)).join("\n")), + ); + continue; + } + Some(arg) => tasks.make_note(arg), } - Some(arg) => { - if arg == "@" { - let key = keys.public_key(); - info!("Filtering for own tasks"); + + Some('>') => { + tasks.update_state(&arg_default, State::Done); + tasks.move_up(); + } + + Some('<') => { + tasks.update_state(&arg_default, State::Closed); + tasks.move_up(); + } + + Some('&') => { + tasks.undo(); + } + + Some('@') => { + match arg { + None => { + let today = Timestamp::from(Timestamp::now() - 80_000); + info!("Filtering for tasks from the last 22 hours"); tasks.set_filter( tasks.filtered_tasks(tasks.get_position_ref()) - .filter(|t| t.event.pubkey == key) + .filter(|t| t.event.created_at > today) .map(|t| t.event.id) .collect() - ) - } else if let Ok(key) = PublicKey::from_str(arg) { - let author = tasks.get_author(&key); - info!("Filtering for tasks by {author}"); - tasks.set_filter( - tasks.filtered_tasks(tasks.get_position_ref()) - .filter(|t| t.event.pubkey == key) - .map(|t| t.event.id) - .collect() - ) - } else { - parse_date(arg).map(|time| { - info!("Filtering for tasks from {}", time); // TODO localize + ); + } + Some(arg) => { + if arg == "@" { + info!("Filtering for own tasks"); tasks.set_filter( tasks.filtered_tasks(tasks.get_position_ref()) - .filter(|t| t.event.created_at.as_u64() as i64 > time.timestamp()) + .filter(|t| t.event.pubkey == mypubkey) .map(|t| t.event.id) .collect() - ); - }); + ) + } else if let Ok(key) = PublicKey::from_str(arg) { + let author = tasks.get_author(&key); + info!("Filtering for tasks by {author}"); + tasks.set_filter( + tasks.filtered_tasks(tasks.get_position_ref()) + .filter(|t| t.event.pubkey == key) + .map(|t| t.event.id) + .collect() + ) + } else { + parse_date(arg).map(|time| { + info!("Filtering for tasks from {}", time); // TODO localize + tasks.set_filter( + tasks.filtered_tasks(tasks.get_position_ref()) + .filter(|t| t.event.created_at.as_u64() as i64 > time.timestamp()) + .map(|t| t.event.id) + .collect() + ); + }); + } } } } - } - Some('*') => { - info!("Setting priority not yet implemented") - } + Some('*') => { + info!("Setting priority not yet implemented") + } - Some('|') => - match arg { - None => match tasks.get_position() { - None => { - tasks.set_state_filter( - StateFilter::State(State::Procedure.to_string())); + Some('|') => + match arg { + None => match tasks.get_position() { + None => { + tasks.set_state_filter( + StateFilter::State(State::Procedure.to_string())); + } + Some(id) => { + tasks.set_state_for(id, "", State::Procedure); + } + }, + Some(arg) => 'arm: { + if arg.chars().next() != Some('|') { + if let Some(pos) = tasks.get_position() { + tasks.move_up(); + tasks.make_task_with( + arg, + once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)), + true); + break 'arm; + } + } + let arg: String = arg.chars().skip_while(|c| c == &'|').collect(); + tasks.make_task_and_enter(&arg, State::Procedure); } + } + + Some('?') => { + match arg { + None => tasks.set_state_filter(StateFilter::Default), + Some("?") => tasks.set_state_filter(StateFilter::All), + Some(arg) => tasks.set_state_filter(StateFilter::State(arg.to_string())), + } + } + + Some('!') => + match tasks.get_position() { + None => warn!("First select a task to set its state!"), Some(id) => { - tasks.set_state_for(id, "", State::Procedure); - } - }, - Some(arg) => 'arm: { - if arg.chars().next() != Some('|') { - if let Some(pos) = tasks.get_position() { - tasks.move_up(); - tasks.make_task_with( - arg, - once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)), - true); - break 'arm; - } - } - let arg: String = arg.chars().skip_while(|c| c == &'|').collect(); - tasks.make_task_and_enter(&arg, State::Procedure); - } - } - - Some('?') => { - match arg { - None => tasks.set_state_filter(StateFilter::Default), - Some("?") => tasks.set_state_filter(StateFilter::All), - Some(arg) => tasks.set_state_filter(StateFilter::State(arg.to_string())), - } - } - - Some('!') => - match tasks.get_position() { - None => warn!("First select a task to set its state!"), - Some(id) => { - tasks.set_state_for_with(id, arg_default); - tasks.move_up(); - } - } - - Some('#') => - match arg { - Some(arg) => tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())), - None => { - println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" ")); - continue; - } - } - - Some('+') => - match arg { - Some(arg) => tasks.add_tag(arg.to_string()), - None => tasks.clear_filter() - } - - Some('-') => - match arg { - Some(arg) => tasks.remove_tag(arg), - None => tasks.clear_filter() - } - - Some('(') => { - if let Some(arg) = arg { - if tasks.track_from(arg) { - let (label, times) = tasks.times_tracked(); - println!("{}\n{}", label.italic(), times.rev().take(15).join("\n")); - } - } else { - let (label, mut times) = tasks.times_tracked(); - println!("{}\n{}", label.italic(), times.join("\n")); - } - continue; - } - - Some(')') => { - match arg { - None => tasks.move_to(None), - Some(arg) => { - if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() { - // So the error message is not covered up - continue + tasks.set_state_for_with(id, arg_default); + tasks.move_up(); } } - } - } - Some('.') => { - let mut dots = 1; - let mut pos = tasks.get_position_ref(); - for _ in iter.take_while(|c| c == &'.') { - dots += 1; - pos = tasks.get_parent(pos); - } - - let slice = input[dots..].trim(); - if slice.is_empty() { - tasks.move_to(pos.cloned()); - if dots > 1 { - info!("Moving up {} tasks", dots - 1) - } - } else if let Ok(depth) = slice.parse::() { - if pos != tasks.get_position_ref() { - tasks.move_to(pos.cloned()); - } - tasks.set_depth(depth); - } else { - tasks.filter_or_create(pos.cloned().as_ref(), slice).map(|id| tasks.move_to(Some(id))); - } - } - - Some('/') => { - let mut dots = 1; - let mut pos = tasks.get_position_ref(); - for _ in iter.take_while(|c| c == &'/') { - dots += 1; - pos = tasks.get_parent(pos); - } - - let slice = input[dots..].trim(); - if slice.is_empty() { - tasks.move_to(pos.cloned()); - if dots > 1 { - info!("Moving up {} tasks", dots - 1) - } - } else if let Ok(depth) = slice.parse::() { - if pos != tasks.get_position_ref() { - tasks.move_to(pos.cloned()); - } - tasks.set_depth(depth); - } else { - let mut transform: Box String> = Box::new(|s: &str| s.to_string()); - if slice.chars().find(|c| c.is_ascii_uppercase()).is_none() { - // Smart-case - case-sensitive if any uppercase char is entered - transform = Box::new(|s| s.to_ascii_lowercase()); + Some('#') => + match arg { + Some(arg) => tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())), + None => { + println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" ")); + continue; + } } - let filtered = tasks.filtered_tasks(pos) - .filter(|t| { - transform(&t.event.content).contains(slice) || t.tags.iter().flatten().any(|tag| - tag.content().is_some_and(|s| transform(s).contains(slice)) - ) - }) - .map(|t| t.event.id) - .collect_vec(); - if filtered.len() == 1 { - tasks.move_to(filtered.into_iter().nth(0)); + Some('+') => + match arg { + Some(arg) => tasks.add_tag(arg.to_string()), + None => tasks.clear_filter() + } + + Some('-') => + match arg { + Some(arg) => tasks.remove_tag(arg), + None => tasks.clear_filter() + } + + Some('(') => { + if let Some(arg) = arg { + if tasks.track_from(arg) { + let (label, times) = tasks.times_tracked(); + println!("{}\n{}", label.italic(), times.rev().take(15).join("\n")); + } + // TODO show history from author / pubkey } else { - tasks.move_to(pos.cloned()); - tasks.set_filter(filtered); - } - } - } - - _ => - if Regex::new("^wss?://").unwrap().is_match(&input.trim()) { - tasks.move_to(None); - if let Some((url, tasks)) = relays.iter().find(|(key, _)| key.as_str().starts_with(&input)) { - selected_relay = Some(url.clone()); - or_print(tasks.print_tasks()); - continue; - } - match Url::parse(&input) { - Err(e) => warn!("Failed to parse url \"{input}\": {}", e), - Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) { - Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), - Ok(_) => { - info!("Connecting to {url}"); - selected_relay = Some(url.clone()); - relays.insert(url.clone(), tasks_for_url(Some(url))); - } - } + let (label, mut times) = tasks.times_tracked(); + println!("{}\n{}", label.italic(), times.join("\n")); } continue; - } else { - tasks.filter_or_create(tasks.get_position().as_ref(), &input); } + + Some(')') => { + match arg { + None => tasks.move_to(None), + Some(arg) => { + if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() { + // So the error message is not covered up + continue; + } + } + } + } + + Some('.') => { + let mut dots = 1; + let mut pos = tasks.get_position_ref(); + for _ in iter.take_while(|c| c == &'.') { + dots += 1; + pos = tasks.get_parent(pos); + } + + let slice = input[dots..].trim(); + if slice.is_empty() { + tasks.move_to(pos.cloned()); + if dots > 1 { + info!("Moving up {} tasks", dots - 1) + } + } else if let Ok(depth) = slice.parse::() { + if pos != tasks.get_position_ref() { + tasks.move_to(pos.cloned()); + } + tasks.set_depth(depth); + } else { + tasks.filter_or_create(pos.cloned().as_ref(), slice).map(|id| tasks.move_to(Some(id))); + } + } + + Some('/') => { + let mut dots = 1; + let mut pos = tasks.get_position_ref(); + for _ in iter.take_while(|c| c == &'/') { + dots += 1; + pos = tasks.get_parent(pos); + } + + let slice = input[dots..].trim(); + if slice.is_empty() { + tasks.move_to(pos.cloned()); + if dots > 1 { + info!("Moving up {} tasks", dots - 1) + } + } else if let Ok(depth) = slice.parse::() { + if pos != tasks.get_position_ref() { + tasks.move_to(pos.cloned()); + } + tasks.set_depth(depth); + } else { + let mut transform: Box String> = Box::new(|s: &str| s.to_string()); + if slice.chars().find(|c| c.is_ascii_uppercase()).is_none() { + // Smart-case - case-sensitive if any uppercase char is entered + transform = Box::new(|s| s.to_ascii_lowercase()); + } + + let filtered = tasks.filtered_tasks(pos) + .filter(|t| { + transform(&t.event.content).contains(slice) || t.tags.iter().flatten().any(|tag| + tag.content().is_some_and(|s| transform(s).contains(slice)) + ) + }) + .map(|t| t.event.id) + .collect_vec(); + if filtered.len() == 1 { + tasks.move_to(filtered.into_iter().nth(0)); + } else { + tasks.move_to(pos.cloned()); + tasks.set_filter(filtered); + } + } + } + + _ => + if Regex::new("^wss?://").unwrap().is_match(&input.trim()) { + tasks.move_to(None); + if let Some((url, tasks)) = relays.iter().find(|(key, _)| key.as_str().starts_with(&input)) { + selected_relay = Some(url.clone()); + or_print(tasks.print_tasks()); + continue; + } + match Url::parse(&input) { + Err(e) => warn!("Failed to parse url \"{input}\": {}", e), + Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await { + Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), + Ok(_) => { + info!("Connecting to {url}"); + selected_relay = Some(url.clone()); + relays.insert(url.clone(), tasks_for_url(Some(url))); + } + } + } + continue; + } else { + tasks.filter_or_create(tasks.get_position().as_ref(), &input); + } + } + or_print(tasks.print_tasks()); } - or_print(tasks.print_tasks()); + Some(Err(e)) => warn!("{}", e), + None => break, + } + } + println!(); + + drop(tx); + drop(local_tasks); + drop(relays); + }); + + let mut queue: Option<(Url, Vec)> = None; + + if let Ok(user) = var("USER") { + let metadata = Metadata::new() + .name(user); + // .display_name("My Username") + // .about("Description") + // .picture(Url::parse("https://example.com/avatar.png")?) + // .banner(Url::parse("https://example.com/banner.png")?) + // .nip05("username@example.com") + // .lud16("yuki@getalby.com") + // .custom_field("custom_field", "my value"); + or_print(client.set_metadata(&metadata).await); + } + + loop { + let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await; + debug!("received {:?}", &result_received); + match result_received { + Ok(Some(MostrMessage::NewRelay(url))) => { + if client.add_relay(&url).await.unwrap() { + match client.connect_relay(&url).await { + Ok(()) => info!("Connected to {url}"), + Err(e) => warn!("Unable to connect to relay {url}: {e}") + } + } else { + warn!("Relay {url} already added"); + } + } + Ok(Some(MostrMessage::AddTasks(url, mut events))) => { + trace!("Queueing {:?}", &events); + if let Some((queue_url, mut queue_events)) = queue { + if queue_url == url { + queue_events.append(&mut events); + queue = Some((queue_url, queue_events)); + } else { + info!("Sending {} events to {url} due to relay change", queue_events.len()); + client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await; + queue = None; + } + } + if queue.is_none() { + events.reserve(events.len() + 10); + queue = Some((url, events)) + } + } + Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue { + info!("Sending {} events to {url} due to {}", events.len(), + result_received.map_or("inactivity", |_| "flush message")); + client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; + queue = None; + } + Ok(None) => { + debug!("Finalizing nostr communication thread because communication channel was closed"); + break; } - Some(Err(e)) => warn!("{}", e), - None => break, } } - println!(); - - drop(tx); - drop(local_tasks); - drop(relays); + if let Some((url, events)) = queue { + info!("Sending {} events to {url} before exiting", events.len()); + client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; + } + info!("Shutting down nostr communication thread"); info!("Submitting pending updates..."); or_print(sender.await); diff --git a/src/tasks.rs b/src/tasks.rs index 6b10c2f..9e3e1d4 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write}; use std::iter::{empty, once}; use std::ops::{Div, Rem}; use std::str::FromStr; -use std::sync::mpsc::Sender; use std::time::Duration; use chrono::Local; @@ -14,6 +13,7 @@ use log::{debug, error, info, trace, warn}; use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url}; use nostr_sdk::prelude::Marker; use TagStandard::Hashtag; +use tokio::sync::mpsc::Sender; use crate::{EventSender, MostrMessage}; use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty}; @@ -105,11 +105,11 @@ impl Display for StateFilter { } impl Tasks { - pub(crate) fn from(url: Option, tx: &Sender, keys: &Keys) -> Self { + pub(crate) fn from(url: Option, tx: Sender, keys: Keys) -> Self { Self::with_sender(EventSender { url, - tx: tx.clone(), - keys: keys.clone(), + tx, + keys, queue: Default::default(), }) } @@ -1093,7 +1093,7 @@ mod tasks_test { use super::*; fn stub_tasks() -> Tasks { - use std::sync::mpsc; + use tokio::sync::mpsc; use nostr_sdk::Keys; let (tx, _rx) = mpsc::channel();