fix: try to fully migrate to tokio

This commit is contained in:
xeruf 2024-08-20 13:52:09 +03:00
parent f98486f012
commit 627d57fc8d
2 changed files with 401 additions and 391 deletions

View file

@ -8,18 +8,20 @@ use std::iter::once;
use std::ops::Sub; use std::ops::Sub;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use colored::{ColoredString, Colorize}; use colored::Colorize;
use env_logger::Builder; use env_logger::Builder;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, error, info, LevelFilter, trace, warn}; use log::{debug, error, info, LevelFilter, trace, warn};
use nostr_sdk::async_utility::futures_util::TryFutureExt;
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use nostr_sdk::TagStandard::Hashtag; use nostr_sdk::TagStandard::Hashtag;
use regex::Regex; use regex::Regex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use xdg::BaseDirectories; use xdg::BaseDirectories;
use crate::helpers::*; use crate::helpers::*;
@ -79,12 +81,12 @@ impl EventSender {
fn force_flush(&self) { fn force_flush(&self) {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { if let Some(url) = self.url.as_ref() {
self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| { let _ = self.tx.blocking_send(MostrMessage::AddTasks(url.clone(), values)).map_err(|e| {
error!("Nostr communication thread failure, changes will not be persisted: {}", e) error!("Nostr communication thread failure, changes will not be persisted: {}", e)
})
}); });
} }
}
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event
fn flush(&self) { fn flush(&self) {
if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) { if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) {
@ -185,85 +187,27 @@ async fn main() {
], None).await; ], None).await;
info!("Subscribed to tasks with {:?}", sub1); info!("Subscribed to tasks with {:?}", sub1);
let mut notifications = client.notifications();
client.connect().await;
let sub2 = client.subscribe(vec![ let sub2 = client.subscribe(vec![
Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k))) Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k)))
], None).await; ], None).await;
info!("Subscribed to updates with {:?}", sub2); info!("Subscribed to updates with {:?}", sub2);
let mut notifications = client.notifications();
client.connect().await;
let (tx, rx) = mpsc::channel::<MostrMessage>(); let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys); let mut local_tasks = Tasks::from(None, tx.clone(), keys.clone());
let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
let sender = tokio::spawn(async move { let mypubkey = keys.public_key();
let mut queue: Option<(Url, Vec<Event>)> = None; let closureSender = tx.clone();
let closureKeys = keys.clone();
let tasks_for_url = move |url: Option<Url>| Tasks::from(url, closureSender.clone(), closureKeys.clone());
loop { let mut relays: HashMap<Url, Tasks> = Default::default();
if let Ok(user) = var("USER") { for (url, _) in client.relays().await {
let metadata = Metadata::new() relays.insert(url.clone(), tasks_for_url(Some(url)));
.name(user);
// .display_name("My Username")
// .about("Description")
// .picture(Url::parse("https://example.com/avatar.png")?)
// .banner(Url::parse("https://example.com/banner.png")?)
// .nip05("username@example.com")
// .lud16("yuki@getalby.com")
// .custom_field("custom_field", "my value");
or_print(client.set_metadata(&metadata).await);
} }
//client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY));
match result_received {
Ok(MostrMessage::NewRelay(url)) => {
if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"),
Err(e) => warn!("Unable to connect to relay {url}: {e}")
}
} else {
warn!("Relay {url} already added");
}
}
Ok(MostrMessage::AddTasks(url, mut events)) => {
trace!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url {
queue_events.append(&mut events);
queue = Some((queue_url, queue_events));
} else {
info!("Sending {} events to {url} due to relay change", queue_events.len());
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
queue = None;
}
}
if queue.is_none() {
events.reserve(events.len() + 10);
queue = Some((url, events))
}
}
Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m)));
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None;
}
Err(err) => {
debug!("Finalizing nostr communication thread because of {:?}: {}", err, err);
break;
}
}
}
if let Some((url, events)) = queue {
info!("Sending {} events to {url} before exiting", events.len());
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
}
info!("Shutting down nostr communication thread");
});
let mut local_tasks = Tasks::from(None, &tx, &keys);
let mut selected_relay: Option<Url> = relays.keys().nth(0).cloned(); let mut selected_relay: Option<Url> = relays.keys().nth(0).cloned();
{ {
@ -273,6 +217,7 @@ async fn main() {
} }
} }
let sender = tokio::spawn(async move {
let mut lines = stdin().lines(); let mut lines = stdin().lines();
loop { loop {
trace!("All Root Tasks:\n{}", relays.iter().map(|(url, tasks)| trace!("All Root Tasks:\n{}", relays.iter().map(|(url, tasks)|
@ -392,11 +337,10 @@ async fn main() {
} }
Some(arg) => { Some(arg) => {
if arg == "@" { if arg == "@" {
let key = keys.public_key();
info!("Filtering for own tasks"); info!("Filtering for own tasks");
tasks.set_filter( tasks.set_filter(
tasks.filtered_tasks(tasks.get_position_ref()) tasks.filtered_tasks(tasks.get_position_ref())
.filter(|t| t.event.pubkey == key) .filter(|t| t.event.pubkey == mypubkey)
.map(|t| t.event.id) .map(|t| t.event.id)
.collect() .collect()
) )
@ -499,6 +443,7 @@ async fn main() {
let (label, times) = tasks.times_tracked(); let (label, times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.rev().take(15).join("\n")); println!("{}\n{}", label.italic(), times.rev().take(15).join("\n"));
} }
// TODO show history from author / pubkey
} else { } else {
let (label, mut times) = tasks.times_tracked(); let (label, mut times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.join("\n")); println!("{}\n{}", label.italic(), times.join("\n"));
@ -512,7 +457,7 @@ async fn main() {
Some(arg) => { Some(arg) => {
if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() { if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() {
// So the error message is not covered up // So the error message is not covered up
continue continue;
} }
} }
} }
@ -595,7 +540,7 @@ async fn main() {
} }
match Url::parse(&input) { match Url::parse(&input) {
Err(e) => warn!("Failed to parse url \"{input}\": {}", e), Err(e) => warn!("Failed to parse url \"{input}\": {}", e),
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) { Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await {
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
Ok(_) => { Ok(_) => {
info!("Connecting to {url}"); info!("Connecting to {url}");
@ -620,6 +565,71 @@ async fn main() {
drop(tx); drop(tx);
drop(local_tasks); drop(local_tasks);
drop(relays); drop(relays);
});
let mut queue: Option<(Url, Vec<Event>)> = None;
if let Ok(user) = var("USER") {
let metadata = Metadata::new()
.name(user);
// .display_name("My Username")
// .about("Description")
// .picture(Url::parse("https://example.com/avatar.png")?)
// .banner(Url::parse("https://example.com/banner.png")?)
// .nip05("username@example.com")
// .lud16("yuki@getalby.com")
// .custom_field("custom_field", "my value");
or_print(client.set_metadata(&metadata).await);
}
loop {
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await;
debug!("received {:?}", &result_received);
match result_received {
Ok(Some(MostrMessage::NewRelay(url))) => {
if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"),
Err(e) => warn!("Unable to connect to relay {url}: {e}")
}
} else {
warn!("Relay {url} already added");
}
}
Ok(Some(MostrMessage::AddTasks(url, mut events))) => {
trace!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url {
queue_events.append(&mut events);
queue = Some((queue_url, queue_events));
} else {
info!("Sending {} events to {url} due to relay change", queue_events.len());
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
queue = None;
}
}
if queue.is_none() {
events.reserve(events.len() + 10);
queue = Some((url, events))
}
}
Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or("inactivity", |_| "flush message"));
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None;
}
Ok(None) => {
debug!("Finalizing nostr communication thread because communication channel was closed");
break;
}
}
}
if let Some((url, events)) = queue {
info!("Sending {} events to {url} before exiting", events.len());
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
}
info!("Shutting down nostr communication thread");
info!("Submitting pending updates..."); info!("Submitting pending updates...");
or_print(sender.await); or_print(sender.await);

View file

@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write};
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::ops::{Div, Rem}; use std::ops::{Div, Rem};
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use chrono::Local; use chrono::Local;
@ -14,6 +13,7 @@ use log::{debug, error, info, trace, warn};
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url}; use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
use nostr_sdk::prelude::Marker; use nostr_sdk::prelude::Marker;
use TagStandard::Hashtag; use TagStandard::Hashtag;
use tokio::sync::mpsc::Sender;
use crate::{EventSender, MostrMessage}; use crate::{EventSender, MostrMessage};
use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty}; use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty};
@ -105,11 +105,11 @@ impl Display for StateFilter {
} }
impl Tasks { impl Tasks {
pub(crate) fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self { pub(crate) fn from(url: Option<Url>, tx: Sender<MostrMessage>, keys: Keys) -> Self {
Self::with_sender(EventSender { Self::with_sender(EventSender {
url, url,
tx: tx.clone(), tx,
keys: keys.clone(), keys,
queue: Default::default(), queue: Default::default(),
}) })
} }
@ -1093,7 +1093,7 @@ mod tasks_test {
use super::*; use super::*;
fn stub_tasks() -> Tasks { fn stub_tasks() -> Tasks {
use std::sync::mpsc; use tokio::sync::mpsc;
use nostr_sdk::Keys; use nostr_sdk::Keys;
let (tx, _rx) = mpsc::channel(); let (tx, _rx) = mpsc::channel();