From dd4b0970040fe4be243937329f6a77475de9aff5 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Tue, 20 Aug 2024 14:27:16 +0300 Subject: [PATCH] fix: move from std::sync fully to tokio Fixes Relay adding Closes https://github.com/rust-nostr/nostr/issues/533 --- src/main.rs | 29 +++++++++++++++-------------- src/tasks.rs | 7 +++---- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/main.rs b/src/main.rs index e6a1dae..97755ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,18 +8,19 @@ use std::iter::once; use std::ops::Sub; use std::path::PathBuf; use std::str::FromStr; -use std::sync::mpsc; -use std::sync::mpsc::RecvTimeoutError; -use std::sync::mpsc::Sender; use std::time::Duration; -use colored::{ColoredString, Colorize}; +use colored::Colorize; use env_logger::Builder; use itertools::Itertools; use log::{debug, error, info, LevelFilter, trace, warn}; use nostr_sdk::prelude::*; use nostr_sdk::TagStandard::Hashtag; use regex::Regex; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; +use tokio::time::error::Elapsed; +use tokio::time::timeout; use xdg::BaseDirectories; use crate::helpers::*; @@ -80,7 +81,7 @@ impl EventSender { debug!("Flushing {} events from queue", self.queue.borrow().len()); let values = self.clear(); self.url.as_ref().map(|url| { - self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| { + self.tx.try_send(MostrMessage::AddTasks(url.clone(), values)).err().map(|e| { error!("Nostr communication thread failure, changes will not be persisted: {}", e) }) }); @@ -193,7 +194,7 @@ async fn main() { ], None).await; info!("Subscribed to updates with {:?}", sub2); - let (tx, rx) = mpsc::channel::(); + let (tx, mut rx) = mpsc::channel::(64); let tasks_for_url = |url: Option| Tasks::from(url, &tx, &keys); let mut relays: HashMap = client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); @@ -208,9 +209,9 @@ async fn main() { } loop { - let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)); + let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await; match result_received { - Ok(MostrMessage::NewRelay(url)) => { + Ok(Some(MostrMessage::NewRelay(url))) => { if client.add_relay(&url).await.unwrap() { match client.connect_relay(&url).await { Ok(()) => info!("Connected to {url}"), @@ -220,7 +221,7 @@ async fn main() { warn!("Relay {url} already added"); } } - Ok(MostrMessage::AddTasks(url, mut events)) => { + Ok(Some(MostrMessage::AddTasks(url, mut events))) => { trace!("Queueing {:?}", &events); if let Some((queue_url, mut queue_events)) = queue { if queue_url == url { @@ -237,14 +238,14 @@ async fn main() { queue = Some((url, events)) } } - Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { + Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue { info!("Sending {} events to {url} due to {}", events.len(), - result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m))); + result_received.map_or("inactivity", |_| "flush message")); client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; queue = None; } - Err(err) => { - debug!("Finalizing nostr communication thread because of {:?}: {}", err, err); + Ok(None) => { + debug!("Finalizing nostr communication thread because communication channel was closed"); break; } } @@ -588,7 +589,7 @@ async fn main() { } match Url::parse(&input) { Err(e) => warn!("Failed to parse url \"{input}\": {}", e), - Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) { + Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await { Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), Ok(_) => { info!("Connecting to {url}"); diff --git a/src/tasks.rs b/src/tasks.rs index 6b10c2f..5573000 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write}; use std::iter::{empty, once}; use std::ops::{Div, Rem}; use std::str::FromStr; -use std::sync::mpsc::Sender; use std::time::Duration; use chrono::Local; @@ -105,7 +104,7 @@ impl Display for StateFilter { } impl Tasks { - pub(crate) fn from(url: Option, tx: &Sender, keys: &Keys) -> Self { + pub(crate) fn from(url: Option, tx: &tokio::sync::mpsc::Sender, keys: &Keys) -> Self { Self::with_sender(EventSender { url, tx: tx.clone(), @@ -1093,10 +1092,10 @@ mod tasks_test { use super::*; fn stub_tasks() -> Tasks { - use std::sync::mpsc; + use tokio::sync::mpsc; use nostr_sdk::Keys; - let (tx, _rx) = mpsc::channel(); + let (tx, _rx) = mpsc::channel(16); Tasks::with_sender(EventSender { url: None, tx,