From 45b8f9cf0fa9d9ee7c39d2383a92bed2df539cf9 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Tue, 13 Aug 2024 21:35:35 +0300 Subject: [PATCH] feat(main): define MostrMessage type for inter-thread channel --- src/main.rs | 23 +++++++++++++++++------ src/tasks.rs | 4 ++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1cbe1ea..8e35874 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,7 @@ use xdg::BaseDirectories; use crate::helpers::*; use crate::kinds::{KINDS, PROPERTY_COLUMNS, TRACKING_KIND}; +use crate::MostrMessage::AddTasks; use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State}; use crate::tasks::{PropertyCollection, StateFilter, Tasks}; @@ -39,12 +40,12 @@ type Events = Vec; #[derive(Debug, Clone)] struct EventSender { url: Option, - tx: Sender<(Url, Events)>, + tx: Sender, keys: Keys, queue: RefCell, } impl EventSender { - fn from(url: Option, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self { + fn from(url: Option, tx: &Sender, keys: &Keys) -> Self { EventSender { url, tx: tx.clone(), @@ -79,7 +80,7 @@ impl EventSender { debug!("Flushing {} events from queue", self.queue.borrow().len()); let values = self.clear(); self.url.as_ref().map(|url| { - or_print(self.tx.send((url.clone(), values))); + or_print(self.tx.send(AddTasks(url.clone(), values))); }); } /// Sends all pending events if there is a non-tracking event @@ -103,6 +104,13 @@ impl Drop for EventSender { } } +#[derive(Debug, Clone, Eq, PartialEq)] +enum MostrMessage { + Flush, + NewRelay(Url), + AddTasks(Url, Vec), +} + #[tokio::main] async fn main() { let mut args = args().skip(1).peekable(); @@ -201,7 +209,7 @@ async fn main() { client.connect().await; let mut notifications = client.notifications(); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::channel::(); let tasks_for_url = |url: Option| Tasks::from(url, &tx, &keys); let mut relays: HashMap = client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); @@ -231,7 +239,7 @@ async fn main() { loop { // TODO invalid acknowledgement from bucket relay slows sending down match rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)) { - Ok((url, mut events)) => { + Ok(AddTasks(url, mut events)) => { if 1 == 2 { client.connect_relay("").await; } @@ -256,7 +264,10 @@ async fn main() { client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; queue = None; } - _ => break + arg => { + debug!("Finalizing nostr communication thread because of {:?}", arg); + break + } } } if let Some((url, events)) = queue { diff --git a/src/tasks.rs b/src/tasks.rs index 9d51444..4b63638 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -16,7 +16,7 @@ use nostr_sdk::{Event, EventBuilder, EventId, Keys, Kind, PublicKey, Tag, TagSta use nostr_sdk::prelude::Marker; use TagStandard::Hashtag; -use crate::{Events, EventSender}; +use crate::{Events, EventSender, MostrMessage}; use crate::helpers::some_non_empty; use crate::kinds::*; use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State, Task, TaskState}; @@ -105,7 +105,7 @@ impl Display for StateFilter { } impl Tasks { - pub(crate) fn from(url: Option, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self { + pub(crate) fn from(url: Option, tx: &Sender, keys: &Keys) -> Self { Self::with_sender(EventSender { url, tx: tx.clone(),