fix: move from std::sync fully to tokio

Fixes Relay adding

Closes https://github.com/rust-nostr/nostr/issues/533
This commit is contained in:
xeruf 2024-08-20 14:27:16 +03:00
parent b5b2ea9b71
commit 126bd8cf81
3 changed files with 18 additions and 20 deletions

View File

@ -172,8 +172,6 @@ Considering to use Calendar: https://github.com/nostr-protocol/nips/blob/master/
### Fixes ### Fixes
- New Relay does not load until next is added
https://github.com/rust-nostr/nostr/issues/533
- Handle event sending rejections (e.g. permissions) - Handle event sending rejections (e.g. permissions)
- Recursive filter handling - Recursive filter handling

View File

@ -8,18 +8,19 @@ use std::iter::once;
use std::ops::Sub; use std::ops::Sub;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use colored::{ColoredString, Colorize}; use colored::Colorize;
use env_logger::Builder; use env_logger::Builder;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, error, info, LevelFilter, trace, warn}; use log::{debug, error, info, LevelFilter, trace, warn};
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use nostr_sdk::TagStandard::Hashtag; use nostr_sdk::TagStandard::Hashtag;
use regex::Regex; use regex::Regex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use xdg::BaseDirectories; use xdg::BaseDirectories;
use crate::helpers::*; use crate::helpers::*;
@ -80,7 +81,7 @@ impl EventSender {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { self.url.as_ref().map(|url| {
self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| { self.tx.try_send(MostrMessage::AddTasks(url.clone(), values)).err().map(|e| {
error!("Nostr communication thread failure, changes will not be persisted: {}", e) error!("Nostr communication thread failure, changes will not be persisted: {}", e)
}) })
}); });
@ -193,7 +194,7 @@ async fn main() {
], None).await; ], None).await;
info!("Subscribed to updates with {:?}", sub2); info!("Subscribed to updates with {:?}", sub2);
let (tx, rx) = mpsc::channel::<MostrMessage>(); let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys); let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
let mut relays: HashMap<Url, Tasks> = let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
@ -208,9 +209,9 @@ async fn main() {
} }
loop { loop {
let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)); let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await;
match result_received { match result_received {
Ok(MostrMessage::NewRelay(url)) => { Ok(Some(MostrMessage::NewRelay(url))) => {
if client.add_relay(&url).await.unwrap() { if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await { match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"), Ok(()) => info!("Connected to {url}"),
@ -220,7 +221,7 @@ async fn main() {
warn!("Relay {url} already added"); warn!("Relay {url} already added");
} }
} }
Ok(MostrMessage::AddTasks(url, mut events)) => { Ok(Some(MostrMessage::AddTasks(url, mut events))) => {
trace!("Queueing {:?}", &events); trace!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue { if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url { if queue_url == url {
@ -237,14 +238,14 @@ async fn main() {
queue = Some((url, events)) queue = Some((url, events))
} }
} }
Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {}", events.len(), info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m))); result_received.map_or("inactivity", |_| "flush message"));
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None; queue = None;
} }
Err(err) => { Ok(None) => {
debug!("Finalizing nostr communication thread because of {:?}: {}", err, err); debug!("Finalizing nostr communication thread because communication channel was closed");
break; break;
} }
} }
@ -588,7 +589,7 @@ async fn main() {
} }
match Url::parse(&input) { match Url::parse(&input) {
Err(e) => warn!("Failed to parse url \"{input}\": {}", e), Err(e) => warn!("Failed to parse url \"{input}\": {}", e),
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) { Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await {
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
Ok(_) => { Ok(_) => {
info!("Connecting to {url}"); info!("Connecting to {url}");

View File

@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write};
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::ops::{Div, Rem}; use std::ops::{Div, Rem};
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use chrono::Local; use chrono::Local;
@ -105,7 +104,7 @@ impl Display for StateFilter {
} }
impl Tasks { impl Tasks {
pub(crate) fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self { pub(crate) fn from(url: Option<Url>, tx: &tokio::sync::mpsc::Sender<MostrMessage>, keys: &Keys) -> Self {
Self::with_sender(EventSender { Self::with_sender(EventSender {
url, url,
tx: tx.clone(), tx: tx.clone(),
@ -1093,10 +1092,10 @@ mod tasks_test {
use super::*; use super::*;
fn stub_tasks() -> Tasks { fn stub_tasks() -> Tasks {
use std::sync::mpsc; use tokio::sync::mpsc;
use nostr_sdk::Keys; use nostr_sdk::Keys;
let (tx, _rx) = mpsc::channel(); let (tx, _rx) = mpsc::channel(16);
Tasks::with_sender(EventSender { Tasks::with_sender(EventSender {
url: None, url: None,
tx, tx,