forked from janek/mostr
1
0
Fork 0

feat(main): define MostrMessage type for inter-thread channel

This commit is contained in:
xeruf 2024-08-13 21:35:35 +03:00
parent ae4d315d87
commit 45b8f9cf0f
2 changed files with 19 additions and 8 deletions

View File

@ -23,6 +23,7 @@ use xdg::BaseDirectories;
use crate::helpers::*; use crate::helpers::*;
use crate::kinds::{KINDS, PROPERTY_COLUMNS, TRACKING_KIND}; use crate::kinds::{KINDS, PROPERTY_COLUMNS, TRACKING_KIND};
use crate::MostrMessage::AddTasks;
use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State}; use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State};
use crate::tasks::{PropertyCollection, StateFilter, Tasks}; use crate::tasks::{PropertyCollection, StateFilter, Tasks};
@ -39,12 +40,12 @@ type Events = Vec<Event>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct EventSender { struct EventSender {
url: Option<Url>, url: Option<Url>,
tx: Sender<(Url, Events)>, tx: Sender<MostrMessage>,
keys: Keys, keys: Keys,
queue: RefCell<Events>, queue: RefCell<Events>,
} }
impl EventSender { impl EventSender {
fn from(url: Option<Url>, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self { fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self {
EventSender { EventSender {
url, url,
tx: tx.clone(), tx: tx.clone(),
@ -79,7 +80,7 @@ impl EventSender {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { self.url.as_ref().map(|url| {
or_print(self.tx.send((url.clone(), values))); or_print(self.tx.send(AddTasks(url.clone(), values)));
}); });
} }
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event
@ -103,6 +104,13 @@ impl Drop for EventSender {
} }
} }
#[derive(Debug, Clone, Eq, PartialEq)]
enum MostrMessage {
Flush,
NewRelay(Url),
AddTasks(Url, Vec<Event>),
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let mut args = args().skip(1).peekable(); let mut args = args().skip(1).peekable();
@ -201,7 +209,7 @@ async fn main() {
client.connect().await; client.connect().await;
let mut notifications = client.notifications(); let mut notifications = client.notifications();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel::<MostrMessage>();
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys); let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
let mut relays: HashMap<Url, Tasks> = let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
@ -231,7 +239,7 @@ async fn main() {
loop { loop {
// TODO invalid acknowledgement from bucket relay slows sending down // TODO invalid acknowledgement from bucket relay slows sending down
match rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)) { match rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)) {
Ok((url, mut events)) => { Ok(AddTasks(url, mut events)) => {
if 1 == 2 { if 1 == 2 {
client.connect_relay("").await; client.connect_relay("").await;
} }
@ -256,7 +264,10 @@ async fn main() {
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None; queue = None;
} }
_ => break arg => {
debug!("Finalizing nostr communication thread because of {:?}", arg);
break
}
} }
} }
if let Some((url, events)) = queue { if let Some((url, events)) = queue {

View File

@ -16,7 +16,7 @@ use nostr_sdk::{Event, EventBuilder, EventId, Keys, Kind, PublicKey, Tag, TagSta
use nostr_sdk::prelude::Marker; use nostr_sdk::prelude::Marker;
use TagStandard::Hashtag; use TagStandard::Hashtag;
use crate::{Events, EventSender}; use crate::{Events, EventSender, MostrMessage};
use crate::helpers::some_non_empty; use crate::helpers::some_non_empty;
use crate::kinds::*; use crate::kinds::*;
use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State, Task, TaskState}; use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State, Task, TaskState};
@ -105,7 +105,7 @@ impl Display for StateFilter {
} }
impl Tasks { impl Tasks {
pub(crate) fn from(url: Option<Url>, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self { pub(crate) fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self {
Self::with_sender(EventSender { Self::with_sender(EventSender {
url, url,
tx: tx.clone(), tx: tx.clone(),