From 2905d35fedf1c7be03f5318381b635ac0cec82f7 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Tue, 20 Aug 2024 14:05:51 +0300 Subject: [PATCH] try tokio async --- README.md | 6 ++-- src/main.rs | 99 ++++++++++++++++++++++++++++------------------------ src/tasks.rs | 42 +++++++++++----------- 3 files changed, 77 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 09744db..1181ebd 100644 --- a/README.md +++ b/README.md @@ -115,8 +115,8 @@ Dot or slash can be repeated to move to parent tasks before acting. - `<[TEXT]` - close active task and move up, with optional status description - `!TEXT` - set status for current task from text and move up (empty: Open) - `,[TEXT]` - list notes or add text note (comment / description) -- TBI: `*[INT]` - set priority - can also be used in task creation, with any digit -- TBI: `;[TEXT]` - list comments or comment on task +- `;[TEXT]` - list comments or comment on task +- `*[INT]` - set priority - can also be used in task creation, with any digit - TBI: show status history and creation with attribution - `&` - undo last action (moving in place or upwards confirms pending actions) - `wss://...` - switch or subscribe to relay (prefix with space to forcibly add a new one) @@ -127,7 +127,7 @@ Property Filters: - `+TAG` - add tag filter - `-TAG` - remove tag filters by prefix - `?STATUS` - filter by status (type or description) - plain `?` to reset, `??` to show all -- `@AUTHOR` - filter by time or author (pubkey, or `@` for self, TBI: id prefix, name prefix) +- `@AUTHOR` - filter by author (`@` for self, id prefix, name prefix) - TBI: `**INT` - filter by priority - TBI: Filter by time diff --git a/src/main.rs b/src/main.rs index 590b777..5952faf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,18 +8,20 @@ use std::iter::once; use std::ops::Sub; use std::path::PathBuf; use std::str::FromStr; -use std::sync::mpsc; -use std::sync::mpsc::RecvTimeoutError; -use std::sync::mpsc::Sender; +use tokio::sync::mpsc; use std::time::Duration; -use colored::{ColoredString, Colorize}; +use colored::{Colorize}; use env_logger::Builder; use itertools::Itertools; use log::{debug, error, info, LevelFilter, trace, warn}; +use nostr_sdk::async_utility::futures_util::TryFutureExt; use nostr_sdk::prelude::*; use nostr_sdk::TagStandard::Hashtag; use regex::Regex; +use tokio::sync::mpsc::Sender; +use tokio::time::error::Elapsed; +use tokio::time::timeout; use xdg::BaseDirectories; use crate::helpers::*; @@ -54,7 +56,7 @@ impl EventSender { } } - fn submit(&self, event_builder: EventBuilder) -> Result { + async fn submit(&self, event_builder: EventBuilder) -> Result { { // Always flush if oldest event older than a minute or newer than now let borrow = self.queue.borrow(); @@ -62,7 +64,7 @@ impl EventSender { if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) { drop(borrow); debug!("Flushing event queue because it is older than a minute"); - self.force_flush(); + self.force_flush().await; } } let mut queue = self.queue.borrow_mut(); @@ -76,19 +78,19 @@ impl EventSender { })?) } /// Sends all pending events - fn force_flush(&self) { + async fn force_flush(&self) { debug!("Flushing {} events from queue", self.queue.borrow().len()); let values = self.clear(); - self.url.as_ref().map(|url| { + if let Some(url) = self.url.as_ref() { self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| { error!("Nostr communication thread failure, changes will not be persisted: {}", e) - }) - }); + }).await; + } } /// Sends all pending events if there is a non-tracking event - fn flush(&self) { + async fn flush(&self) { if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) { - self.force_flush() + self.force_flush().await } } fn clear(&self) -> Events { @@ -185,15 +187,15 @@ async fn main() { ], None).await; info!("Subscribed to tasks with {:?}", sub1); - let mut notifications = client.notifications(); - client.connect().await; - let sub2 = client.subscribe(vec![ Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k))) ], None).await; + info!("Subscribed to updates with {:?}", sub2); - - let (tx, rx) = mpsc::channel::(); + let mut notifications = client.notifications(); + client.connect().await; + + let (tx, mut rx) = mpsc::channel::(64); let tasks_for_url = |url: Option| Tasks::from(url, &tx, &keys); let mut relays: HashMap = client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); @@ -201,23 +203,24 @@ async fn main() { let sender = tokio::spawn(async move { let mut queue: Option<(Url, Vec)> = None; - loop { - if let Ok(user) = var("USER") { - let metadata = Metadata::new() - .name(user); - // .display_name("My Username") - // .about("Description") - // .picture(Url::parse("https://example.com/avatar.png")?) - // .banner(Url::parse("https://example.com/banner.png")?) - // .nip05("username@example.com") - // .lud16("yuki@getalby.com") - // .custom_field("custom_field", "my value"); - or_print(client.set_metadata(&metadata).await); - } + if let Ok(user) = var("USER") { + let metadata = Metadata::new() + .name(user); + // .display_name("My Username") + // .about("Description") + // .picture(Url::parse("https://example.com/avatar.png")?) + // .banner(Url::parse("https://example.com/banner.png")?) + // .nip05("username@example.com") + // .lud16("yuki@getalby.com") + // .custom_field("custom_field", "my value"); + or_print(client.set_metadata(&metadata).await); + } - let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)); + loop { + let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await; + debug!("received {:?}", &result_received); match result_received { - Ok(MostrMessage::NewRelay(url)) => { + Ok(Some(MostrMessage::NewRelay(url))) => { if client.add_relay(&url).await.unwrap() { match client.connect_relay(&url).await { Ok(()) => info!("Connected to {url}"), @@ -227,7 +230,7 @@ async fn main() { warn!("Relay {url} already added"); } } - Ok(MostrMessage::AddTasks(url, mut events)) => { + Ok(Some(MostrMessage::AddTasks(url, mut events))) => { trace!("Queueing {:?}", &events); if let Some((queue_url, mut queue_events)) = queue { if queue_url == url { @@ -244,14 +247,14 @@ async fn main() { queue = Some((url, events)) } } - Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { + Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue { info!("Sending {} events to {url} due to {}", events.len(), - result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m))); + result_received.map_or("inactivity", |_| "flush message")); client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; queue = None; } - Err(err) => { - debug!("Finalizing nostr communication thread because of {:?}: {}", err, err); + Ok(None) => { + debug!("Finalizing nostr communication thread because communication channel was closed"); break; } } @@ -361,7 +364,9 @@ async fn main() { ); continue; } - Some(arg) => tasks.make_note(arg), + Some(arg) => { + tasks.make_note(arg).await; + } } Some('>') => { @@ -436,7 +441,7 @@ async fn main() { StateFilter::State(State::Procedure.to_string())); } Some(id) => { - tasks.set_state_for(id, "", State::Procedure); + tasks.set_state_for(id, "", State::Procedure).await; } }, Some(arg) => 'arm: { @@ -446,12 +451,12 @@ async fn main() { tasks.make_task_with( arg, once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)), - true); + true).await; break 'arm; } } let arg: String = arg.chars().skip_while(|c| c == &'|').collect(); - tasks.make_task_and_enter(&arg, State::Procedure); + tasks.make_task_and_enter(&arg, State::Procedure).await; } } @@ -474,7 +479,8 @@ async fn main() { Some('#') => match arg { - Some(arg) => tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())), + Some(arg) => + tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())), None => { println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" ")); continue; @@ -499,6 +505,7 @@ async fn main() { let (label, times) = tasks.times_tracked(); println!("{}\n{}", label.italic(), times.rev().take(15).join("\n")); } + // TODO show history from author / pubkey } else { let (label, mut times) = tasks.times_tracked(); println!("{}\n{}", label.italic(), times.join("\n")); @@ -512,7 +519,7 @@ async fn main() { Some(arg) => { if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() { // So the error message is not covered up - continue + continue; } } } @@ -538,7 +545,7 @@ async fn main() { } tasks.set_depth(depth); } else { - tasks.filter_or_create(pos.cloned().as_ref(), slice).map(|id| tasks.move_to(Some(id))); + tasks.filter_or_create(pos.cloned().as_ref(), slice).await.map(|id| tasks.move_to(Some(id))); } } @@ -595,7 +602,7 @@ async fn main() { } match Url::parse(&input) { Err(e) => warn!("Failed to parse url \"{input}\": {}", e), - Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) { + Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await { Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), Ok(_) => { info!("Connecting to {url}"); @@ -606,7 +613,7 @@ async fn main() { } continue; } else { - tasks.filter_or_create(tasks.get_position().as_ref(), &input); + tasks.filter_or_create(tasks.get_position().as_ref(), &input).await; } } or_print(tasks.print_tasks()); diff --git a/src/tasks.rs b/src/tasks.rs index 6b10c2f..fa6d54b 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write}; use std::iter::{empty, once}; use std::ops::{Div, Rem}; use std::str::FromStr; -use std::sync::mpsc::Sender; use std::time::Duration; use chrono::Local; @@ -14,6 +13,7 @@ use log::{debug, error, info, trace, warn}; use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url}; use nostr_sdk::prelude::Marker; use TagStandard::Hashtag; +use tokio::sync::mpsc::Sender; use crate::{EventSender, MostrMessage}; use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty}; @@ -589,14 +589,14 @@ impl Tasks { /// Finds out what to do with the given string. /// Returns an EventId if a new Task was created. - pub(crate) fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option { + pub(crate) async fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option { let filtered = self.get_filtered(position, arg); match filtered.len() { 0 => { // No match, new task self.view.clear(); if arg.len() > 2 { - Some(self.make_task(arg)) + Some(self.make_task(arg).await) } else { warn!("Name of a task needs to have at least 3 characters"); None @@ -691,12 +691,12 @@ impl Tasks { /// Creates a task following the current state /// Sanitizes input - pub(crate) fn make_task(&mut self, input: &str) -> EventId { - self.make_task_with(input, empty(), true) + pub(crate) async fn make_task(&mut self, input: &str) -> EventId { + self.make_task_with(input, empty(), true).await } - pub(crate) fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId { - let id = self.make_task_with(input, empty(), false); + pub(crate) async fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId { + let id = self.make_task_with(input, empty(), false).await; self.set_state_for(id, "", state); self.move_to(Some(id)); id @@ -704,14 +704,14 @@ impl Tasks { /// Creates a task with tags from filter and position /// Sanitizes input - pub(crate) fn make_task_with(&mut self, input: &str, tags: impl IntoIterator, set_state: bool) -> EventId { + pub(crate) async fn make_task_with(&mut self, input: &str, tags: impl IntoIterator, set_state: bool) -> EventId { let (input, input_tags) = extract_tags(input.trim()); let id = self.submit( build_task(input, input_tags, None) .add_tags(self.tags.iter().cloned()) .add_tags(self.position_tags()) .add_tags(tags.into_iter()) - ); + ).await; if set_state { self.state.as_option().inspect(|s| self.set_state_for_with(id, s)); } @@ -730,17 +730,17 @@ impl Tasks { .is_some() } - pub(crate) fn track_at(&mut self, time: Timestamp, task: Option) -> EventId { + pub(crate) async fn track_at(&mut self, time: Timestamp, task: Option) -> EventId { info!("{} from {}", task.map_or(String::from("Stopping time-tracking"), |id| format!("Tracking \"{}\"", self.get_task_title(&id))), relative_datetimestamp(&time)); self.submit( build_tracking(task) .custom_created_at(time) - ) + ).await } /// Sign and queue the event to the relay, returning its id - fn submit(&mut self, builder: EventBuilder) -> EventId { - let event = self.sender.submit(builder).unwrap(); + async fn submit(&mut self, builder: EventBuilder) -> EventId { + let event = self.sender.submit(builder).await.unwrap(); let id = event.id; self.add(event); id @@ -814,26 +814,26 @@ impl Tasks { self.set_state_for(id, comment, comment.into()); } - pub(crate) fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId { + pub(crate) async fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId { let prop = build_prop( state.into(), comment, id, ); info!("Task status {} set for \"{}\"", TaskState::get_label_for(&state, comment), self.get_task_title(&id)); - self.submit(prop) + self.submit(prop).await } - pub(crate) fn update_state(&mut self, comment: &str, state: State) -> Option { + pub(crate) async fn update_state(&mut self, comment: &str, state: State) -> Option { let id = self.get_position_ref()?; - Some(self.set_state_for(id.clone(), comment, state)) + Some(self.set_state_for(id.clone(), comment, state).await) } - pub(crate) fn make_note(&mut self, note: &str) { + pub(crate) async fn make_note(&mut self, note: &str) { if let Some(id) = self.get_position_ref() { if self.get_by_id(id).is_some_and(|t| t.is_task()) { let prop = build_prop(Kind::TextNote, note.trim(), id.clone()); - self.submit(prop); + self.submit(prop).await; return; } } @@ -842,7 +842,7 @@ impl Tasks { build_task(input, tags, Some(("stateless ", Kind::TextNote))) .add_tags(self.parent_tag()) .add_tags(self.tags.iter().cloned()) - ); + ).await; } // Properties @@ -1093,7 +1093,7 @@ mod tasks_test { use super::*; fn stub_tasks() -> Tasks { - use std::sync::mpsc; + use tokio::sync::mpsc; use nostr_sdk::Keys; let (tx, _rx) = mpsc::channel();