try tokio async

This commit is contained in:
xeruf 2024-08-20 14:05:51 +03:00
parent f98486f012
commit 2905d35fed
3 changed files with 77 additions and 70 deletions

View file

@ -115,8 +115,8 @@ Dot or slash can be repeated to move to parent tasks before acting.
- `<[TEXT]` - close active task and move up, with optional status description - `<[TEXT]` - close active task and move up, with optional status description
- `!TEXT` - set status for current task from text and move up (empty: Open) - `!TEXT` - set status for current task from text and move up (empty: Open)
- `,[TEXT]` - list notes or add text note (comment / description) - `,[TEXT]` - list notes or add text note (comment / description)
- TBI: `*[INT]` - set priority - can also be used in task creation, with any digit - `;[TEXT]` - list comments or comment on task
- TBI: `;[TEXT]` - list comments or comment on task - `*[INT]` - set priority - can also be used in task creation, with any digit
- TBI: show status history and creation with attribution - TBI: show status history and creation with attribution
- `&` - undo last action (moving in place or upwards confirms pending actions) - `&` - undo last action (moving in place or upwards confirms pending actions)
- `wss://...` - switch or subscribe to relay (prefix with space to forcibly add a new one) - `wss://...` - switch or subscribe to relay (prefix with space to forcibly add a new one)
@ -127,7 +127,7 @@ Property Filters:
- `+TAG` - add tag filter - `+TAG` - add tag filter
- `-TAG` - remove tag filters by prefix - `-TAG` - remove tag filters by prefix
- `?STATUS` - filter by status (type or description) - plain `?` to reset, `??` to show all - `?STATUS` - filter by status (type or description) - plain `?` to reset, `??` to show all
- `@AUTHOR` - filter by time or author (pubkey, or `@` for self, TBI: id prefix, name prefix) - `@AUTHOR` - filter by author (`@` for self, id prefix, name prefix)
- TBI: `**INT` - filter by priority - TBI: `**INT` - filter by priority
- TBI: Filter by time - TBI: Filter by time

View file

@ -8,18 +8,20 @@ use std::iter::once;
use std::ops::Sub; use std::ops::Sub;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc; use tokio::sync::mpsc;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use colored::{ColoredString, Colorize}; use colored::{Colorize};
use env_logger::Builder; use env_logger::Builder;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, error, info, LevelFilter, trace, warn}; use log::{debug, error, info, LevelFilter, trace, warn};
use nostr_sdk::async_utility::futures_util::TryFutureExt;
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use nostr_sdk::TagStandard::Hashtag; use nostr_sdk::TagStandard::Hashtag;
use regex::Regex; use regex::Regex;
use tokio::sync::mpsc::Sender;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use xdg::BaseDirectories; use xdg::BaseDirectories;
use crate::helpers::*; use crate::helpers::*;
@ -54,7 +56,7 @@ impl EventSender {
} }
} }
fn submit(&self, event_builder: EventBuilder) -> Result<Event> { async fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
{ {
// Always flush if oldest event older than a minute or newer than now // Always flush if oldest event older than a minute or newer than now
let borrow = self.queue.borrow(); let borrow = self.queue.borrow();
@ -62,7 +64,7 @@ impl EventSender {
if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) { if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) {
drop(borrow); drop(borrow);
debug!("Flushing event queue because it is older than a minute"); debug!("Flushing event queue because it is older than a minute");
self.force_flush(); self.force_flush().await;
} }
} }
let mut queue = self.queue.borrow_mut(); let mut queue = self.queue.borrow_mut();
@ -76,19 +78,19 @@ impl EventSender {
})?) })?)
} }
/// Sends all pending events /// Sends all pending events
fn force_flush(&self) { async fn force_flush(&self) {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { if let Some(url) = self.url.as_ref() {
self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| { self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| {
error!("Nostr communication thread failure, changes will not be persisted: {}", e) error!("Nostr communication thread failure, changes will not be persisted: {}", e)
}) }).await;
}); }
} }
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event
fn flush(&self) { async fn flush(&self) {
if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) { if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) {
self.force_flush() self.force_flush().await
} }
} }
fn clear(&self) -> Events { fn clear(&self) -> Events {
@ -185,15 +187,15 @@ async fn main() {
], None).await; ], None).await;
info!("Subscribed to tasks with {:?}", sub1); info!("Subscribed to tasks with {:?}", sub1);
let mut notifications = client.notifications();
client.connect().await;
let sub2 = client.subscribe(vec![ let sub2 = client.subscribe(vec![
Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k))) Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k)))
], None).await; ], None).await;
info!("Subscribed to updates with {:?}", sub2); info!("Subscribed to updates with {:?}", sub2);
let mut notifications = client.notifications();
let (tx, rx) = mpsc::channel::<MostrMessage>(); client.connect().await;
let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys); let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
let mut relays: HashMap<Url, Tasks> = let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
@ -201,23 +203,24 @@ async fn main() {
let sender = tokio::spawn(async move { let sender = tokio::spawn(async move {
let mut queue: Option<(Url, Vec<Event>)> = None; let mut queue: Option<(Url, Vec<Event>)> = None;
loop { if let Ok(user) = var("USER") {
if let Ok(user) = var("USER") { let metadata = Metadata::new()
let metadata = Metadata::new() .name(user);
.name(user); // .display_name("My Username")
// .display_name("My Username") // .about("Description")
// .about("Description") // .picture(Url::parse("https://example.com/avatar.png")?)
// .picture(Url::parse("https://example.com/avatar.png")?) // .banner(Url::parse("https://example.com/banner.png")?)
// .banner(Url::parse("https://example.com/banner.png")?) // .nip05("username@example.com")
// .nip05("username@example.com") // .lud16("yuki@getalby.com")
// .lud16("yuki@getalby.com") // .custom_field("custom_field", "my value");
// .custom_field("custom_field", "my value"); or_print(client.set_metadata(&metadata).await);
or_print(client.set_metadata(&metadata).await); }
}
let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)); loop {
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await;
debug!("received {:?}", &result_received);
match result_received { match result_received {
Ok(MostrMessage::NewRelay(url)) => { Ok(Some(MostrMessage::NewRelay(url))) => {
if client.add_relay(&url).await.unwrap() { if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await { match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"), Ok(()) => info!("Connected to {url}"),
@ -227,7 +230,7 @@ async fn main() {
warn!("Relay {url} already added"); warn!("Relay {url} already added");
} }
} }
Ok(MostrMessage::AddTasks(url, mut events)) => { Ok(Some(MostrMessage::AddTasks(url, mut events))) => {
trace!("Queueing {:?}", &events); trace!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue { if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url { if queue_url == url {
@ -244,14 +247,14 @@ async fn main() {
queue = Some((url, events)) queue = Some((url, events))
} }
} }
Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {}", events.len(), info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m))); result_received.map_or("inactivity", |_| "flush message"));
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None; queue = None;
} }
Err(err) => { Ok(None) => {
debug!("Finalizing nostr communication thread because of {:?}: {}", err, err); debug!("Finalizing nostr communication thread because communication channel was closed");
break; break;
} }
} }
@ -361,7 +364,9 @@ async fn main() {
); );
continue; continue;
} }
Some(arg) => tasks.make_note(arg), Some(arg) => {
tasks.make_note(arg).await;
}
} }
Some('>') => { Some('>') => {
@ -436,7 +441,7 @@ async fn main() {
StateFilter::State(State::Procedure.to_string())); StateFilter::State(State::Procedure.to_string()));
} }
Some(id) => { Some(id) => {
tasks.set_state_for(id, "", State::Procedure); tasks.set_state_for(id, "", State::Procedure).await;
} }
}, },
Some(arg) => 'arm: { Some(arg) => 'arm: {
@ -446,12 +451,12 @@ async fn main() {
tasks.make_task_with( tasks.make_task_with(
arg, arg,
once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)), once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)),
true); true).await;
break 'arm; break 'arm;
} }
} }
let arg: String = arg.chars().skip_while(|c| c == &'|').collect(); let arg: String = arg.chars().skip_while(|c| c == &'|').collect();
tasks.make_task_and_enter(&arg, State::Procedure); tasks.make_task_and_enter(&arg, State::Procedure).await;
} }
} }
@ -474,7 +479,8 @@ async fn main() {
Some('#') => Some('#') =>
match arg { match arg {
Some(arg) => tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())), Some(arg) =>
tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())),
None => { None => {
println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" ")); println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" "));
continue; continue;
@ -499,6 +505,7 @@ async fn main() {
let (label, times) = tasks.times_tracked(); let (label, times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.rev().take(15).join("\n")); println!("{}\n{}", label.italic(), times.rev().take(15).join("\n"));
} }
// TODO show history from author / pubkey
} else { } else {
let (label, mut times) = tasks.times_tracked(); let (label, mut times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.join("\n")); println!("{}\n{}", label.italic(), times.join("\n"));
@ -512,7 +519,7 @@ async fn main() {
Some(arg) => { Some(arg) => {
if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() { if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() {
// So the error message is not covered up // So the error message is not covered up
continue continue;
} }
} }
} }
@ -538,7 +545,7 @@ async fn main() {
} }
tasks.set_depth(depth); tasks.set_depth(depth);
} else { } else {
tasks.filter_or_create(pos.cloned().as_ref(), slice).map(|id| tasks.move_to(Some(id))); tasks.filter_or_create(pos.cloned().as_ref(), slice).await.map(|id| tasks.move_to(Some(id)));
} }
} }
@ -595,7 +602,7 @@ async fn main() {
} }
match Url::parse(&input) { match Url::parse(&input) {
Err(e) => warn!("Failed to parse url \"{input}\": {}", e), Err(e) => warn!("Failed to parse url \"{input}\": {}", e),
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) { Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await {
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
Ok(_) => { Ok(_) => {
info!("Connecting to {url}"); info!("Connecting to {url}");
@ -606,7 +613,7 @@ async fn main() {
} }
continue; continue;
} else { } else {
tasks.filter_or_create(tasks.get_position().as_ref(), &input); tasks.filter_or_create(tasks.get_position().as_ref(), &input).await;
} }
} }
or_print(tasks.print_tasks()); or_print(tasks.print_tasks());

View file

@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write};
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::ops::{Div, Rem}; use std::ops::{Div, Rem};
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use chrono::Local; use chrono::Local;
@ -14,6 +13,7 @@ use log::{debug, error, info, trace, warn};
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url}; use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
use nostr_sdk::prelude::Marker; use nostr_sdk::prelude::Marker;
use TagStandard::Hashtag; use TagStandard::Hashtag;
use tokio::sync::mpsc::Sender;
use crate::{EventSender, MostrMessage}; use crate::{EventSender, MostrMessage};
use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty}; use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty};
@ -589,14 +589,14 @@ impl Tasks {
/// Finds out what to do with the given string. /// Finds out what to do with the given string.
/// Returns an EventId if a new Task was created. /// Returns an EventId if a new Task was created.
pub(crate) fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option<EventId> { pub(crate) async fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option<EventId> {
let filtered = self.get_filtered(position, arg); let filtered = self.get_filtered(position, arg);
match filtered.len() { match filtered.len() {
0 => { 0 => {
// No match, new task // No match, new task
self.view.clear(); self.view.clear();
if arg.len() > 2 { if arg.len() > 2 {
Some(self.make_task(arg)) Some(self.make_task(arg).await)
} else { } else {
warn!("Name of a task needs to have at least 3 characters"); warn!("Name of a task needs to have at least 3 characters");
None None
@ -691,12 +691,12 @@ impl Tasks {
/// Creates a task following the current state /// Creates a task following the current state
/// Sanitizes input /// Sanitizes input
pub(crate) fn make_task(&mut self, input: &str) -> EventId { pub(crate) async fn make_task(&mut self, input: &str) -> EventId {
self.make_task_with(input, empty(), true) self.make_task_with(input, empty(), true).await
} }
pub(crate) fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId { pub(crate) async fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId {
let id = self.make_task_with(input, empty(), false); let id = self.make_task_with(input, empty(), false).await;
self.set_state_for(id, "", state); self.set_state_for(id, "", state);
self.move_to(Some(id)); self.move_to(Some(id));
id id
@ -704,14 +704,14 @@ impl Tasks {
/// Creates a task with tags from filter and position /// Creates a task with tags from filter and position
/// Sanitizes input /// Sanitizes input
pub(crate) fn make_task_with(&mut self, input: &str, tags: impl IntoIterator<Item=Tag>, set_state: bool) -> EventId { pub(crate) async fn make_task_with(&mut self, input: &str, tags: impl IntoIterator<Item=Tag>, set_state: bool) -> EventId {
let (input, input_tags) = extract_tags(input.trim()); let (input, input_tags) = extract_tags(input.trim());
let id = self.submit( let id = self.submit(
build_task(input, input_tags, None) build_task(input, input_tags, None)
.add_tags(self.tags.iter().cloned()) .add_tags(self.tags.iter().cloned())
.add_tags(self.position_tags()) .add_tags(self.position_tags())
.add_tags(tags.into_iter()) .add_tags(tags.into_iter())
); ).await;
if set_state { if set_state {
self.state.as_option().inspect(|s| self.set_state_for_with(id, s)); self.state.as_option().inspect(|s| self.set_state_for_with(id, s));
} }
@ -730,17 +730,17 @@ impl Tasks {
.is_some() .is_some()
} }
pub(crate) fn track_at(&mut self, time: Timestamp, task: Option<EventId>) -> EventId { pub(crate) async fn track_at(&mut self, time: Timestamp, task: Option<EventId>) -> EventId {
info!("{} from {}", task.map_or(String::from("Stopping time-tracking"), |id| format!("Tracking \"{}\"", self.get_task_title(&id))), relative_datetimestamp(&time)); info!("{} from {}", task.map_or(String::from("Stopping time-tracking"), |id| format!("Tracking \"{}\"", self.get_task_title(&id))), relative_datetimestamp(&time));
self.submit( self.submit(
build_tracking(task) build_tracking(task)
.custom_created_at(time) .custom_created_at(time)
) ).await
} }
/// Sign and queue the event to the relay, returning its id /// Sign and queue the event to the relay, returning its id
fn submit(&mut self, builder: EventBuilder) -> EventId { async fn submit(&mut self, builder: EventBuilder) -> EventId {
let event = self.sender.submit(builder).unwrap(); let event = self.sender.submit(builder).await.unwrap();
let id = event.id; let id = event.id;
self.add(event); self.add(event);
id id
@ -814,26 +814,26 @@ impl Tasks {
self.set_state_for(id, comment, comment.into()); self.set_state_for(id, comment, comment.into());
} }
pub(crate) fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId { pub(crate) async fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId {
let prop = build_prop( let prop = build_prop(
state.into(), state.into(),
comment, comment,
id, id,
); );
info!("Task status {} set for \"{}\"", TaskState::get_label_for(&state, comment), self.get_task_title(&id)); info!("Task status {} set for \"{}\"", TaskState::get_label_for(&state, comment), self.get_task_title(&id));
self.submit(prop) self.submit(prop).await
} }
pub(crate) fn update_state(&mut self, comment: &str, state: State) -> Option<EventId> { pub(crate) async fn update_state(&mut self, comment: &str, state: State) -> Option<EventId> {
let id = self.get_position_ref()?; let id = self.get_position_ref()?;
Some(self.set_state_for(id.clone(), comment, state)) Some(self.set_state_for(id.clone(), comment, state).await)
} }
pub(crate) fn make_note(&mut self, note: &str) { pub(crate) async fn make_note(&mut self, note: &str) {
if let Some(id) = self.get_position_ref() { if let Some(id) = self.get_position_ref() {
if self.get_by_id(id).is_some_and(|t| t.is_task()) { if self.get_by_id(id).is_some_and(|t| t.is_task()) {
let prop = build_prop(Kind::TextNote, note.trim(), id.clone()); let prop = build_prop(Kind::TextNote, note.trim(), id.clone());
self.submit(prop); self.submit(prop).await;
return; return;
} }
} }
@ -842,7 +842,7 @@ impl Tasks {
build_task(input, tags, Some(("stateless ", Kind::TextNote))) build_task(input, tags, Some(("stateless ", Kind::TextNote)))
.add_tags(self.parent_tag()) .add_tags(self.parent_tag())
.add_tags(self.tags.iter().cloned()) .add_tags(self.tags.iter().cloned())
); ).await;
} }
// Properties // Properties
@ -1093,7 +1093,7 @@ mod tasks_test {
use super::*; use super::*;
fn stub_tasks() -> Tasks { fn stub_tasks() -> Tasks {
use std::sync::mpsc; use tokio::sync::mpsc;
use nostr_sdk::Keys; use nostr_sdk::Keys;
let (tx, _rx) = mpsc::channel(); let (tx, _rx) = mpsc::channel();