Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
xeruf
2905d35fed try tokio async 2024-08-20 14:05:51 +03:00
3 changed files with 77 additions and 70 deletions

View file

@ -115,8 +115,8 @@ Dot or slash can be repeated to move to parent tasks before acting.
- `<[TEXT]` - close active task and move up, with optional status description
- `!TEXT` - set status for current task from text and move up (empty: Open)
- `,[TEXT]` - list notes or add text note (comment / description)
- TBI: `*[INT]` - set priority - can also be used in task creation, with any digit
- TBI: `;[TEXT]` - list comments or comment on task
- `;[TEXT]` - list comments or comment on task
- `*[INT]` - set priority - can also be used in task creation, with any digit
- TBI: show status history and creation with attribution
- `&` - undo last action (moving in place or upwards confirms pending actions)
- `wss://...` - switch or subscribe to relay (prefix with space to forcibly add a new one)
@ -127,7 +127,7 @@ Property Filters:
- `+TAG` - add tag filter
- `-TAG` - remove tag filters by prefix
- `?STATUS` - filter by status (type or description) - plain `?` to reset, `??` to show all
- `@AUTHOR` - filter by time or author (pubkey, or `@` for self, TBI: id prefix, name prefix)
- `@AUTHOR` - filter by author (`@` for self, id prefix, name prefix)
- TBI: `**INT` - filter by priority
- TBI: Filter by time

View file

@ -8,18 +8,20 @@ use std::iter::once;
use std::ops::Sub;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::Sender;
use tokio::sync::mpsc;
use std::time::Duration;
use colored::{ColoredString, Colorize};
use colored::{Colorize};
use env_logger::Builder;
use itertools::Itertools;
use log::{debug, error, info, LevelFilter, trace, warn};
use nostr_sdk::async_utility::futures_util::TryFutureExt;
use nostr_sdk::prelude::*;
use nostr_sdk::TagStandard::Hashtag;
use regex::Regex;
use tokio::sync::mpsc::Sender;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use xdg::BaseDirectories;
use crate::helpers::*;
@ -54,7 +56,7 @@ impl EventSender {
}
}
fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
async fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
{
// Always flush if oldest event older than a minute or newer than now
let borrow = self.queue.borrow();
@ -62,7 +64,7 @@ impl EventSender {
if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) {
drop(borrow);
debug!("Flushing event queue because it is older than a minute");
self.force_flush();
self.force_flush().await;
}
}
let mut queue = self.queue.borrow_mut();
@ -76,19 +78,19 @@ impl EventSender {
})?)
}
/// Sends all pending events
fn force_flush(&self) {
async fn force_flush(&self) {
debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear();
self.url.as_ref().map(|url| {
if let Some(url) = self.url.as_ref() {
self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| {
error!("Nostr communication thread failure, changes will not be persisted: {}", e)
})
});
}).await;
}
}
/// Sends all pending events if there is a non-tracking event
fn flush(&self) {
async fn flush(&self) {
if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) {
self.force_flush()
self.force_flush().await
}
}
fn clear(&self) -> Events {
@ -185,15 +187,15 @@ async fn main() {
], None).await;
info!("Subscribed to tasks with {:?}", sub1);
let mut notifications = client.notifications();
client.connect().await;
let sub2 = client.subscribe(vec![
Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k)))
], None).await;
info!("Subscribed to updates with {:?}", sub2);
let (tx, rx) = mpsc::channel::<MostrMessage>();
info!("Subscribed to updates with {:?}", sub2);
let mut notifications = client.notifications();
client.connect().await;
let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
@ -201,7 +203,6 @@ async fn main() {
let sender = tokio::spawn(async move {
let mut queue: Option<(Url, Vec<Event>)> = None;
loop {
if let Ok(user) = var("USER") {
let metadata = Metadata::new()
.name(user);
@ -215,9 +216,11 @@ async fn main() {
or_print(client.set_metadata(&metadata).await);
}
let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY));
loop {
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await;
debug!("received {:?}", &result_received);
match result_received {
Ok(MostrMessage::NewRelay(url)) => {
Ok(Some(MostrMessage::NewRelay(url))) => {
if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"),
@ -227,7 +230,7 @@ async fn main() {
warn!("Relay {url} already added");
}
}
Ok(MostrMessage::AddTasks(url, mut events)) => {
Ok(Some(MostrMessage::AddTasks(url, mut events))) => {
trace!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url {
@ -244,14 +247,14 @@ async fn main() {
queue = Some((url, events))
}
}
Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m)));
result_received.map_or("inactivity", |_| "flush message"));
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None;
}
Err(err) => {
debug!("Finalizing nostr communication thread because of {:?}: {}", err, err);
Ok(None) => {
debug!("Finalizing nostr communication thread because communication channel was closed");
break;
}
}
@ -361,7 +364,9 @@ async fn main() {
);
continue;
}
Some(arg) => tasks.make_note(arg),
Some(arg) => {
tasks.make_note(arg).await;
}
}
Some('>') => {
@ -436,7 +441,7 @@ async fn main() {
StateFilter::State(State::Procedure.to_string()));
}
Some(id) => {
tasks.set_state_for(id, "", State::Procedure);
tasks.set_state_for(id, "", State::Procedure).await;
}
},
Some(arg) => 'arm: {
@ -446,12 +451,12 @@ async fn main() {
tasks.make_task_with(
arg,
once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)),
true);
true).await;
break 'arm;
}
}
let arg: String = arg.chars().skip_while(|c| c == &'|').collect();
tasks.make_task_and_enter(&arg, State::Procedure);
tasks.make_task_and_enter(&arg, State::Procedure).await;
}
}
@ -474,7 +479,8 @@ async fn main() {
Some('#') =>
match arg {
Some(arg) => tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())),
Some(arg) =>
tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())),
None => {
println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" "));
continue;
@ -499,6 +505,7 @@ async fn main() {
let (label, times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.rev().take(15).join("\n"));
}
// TODO show history from author / pubkey
} else {
let (label, mut times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.join("\n"));
@ -512,7 +519,7 @@ async fn main() {
Some(arg) => {
if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() {
// So the error message is not covered up
continue
continue;
}
}
}
@ -538,7 +545,7 @@ async fn main() {
}
tasks.set_depth(depth);
} else {
tasks.filter_or_create(pos.cloned().as_ref(), slice).map(|id| tasks.move_to(Some(id)));
tasks.filter_or_create(pos.cloned().as_ref(), slice).await.map(|id| tasks.move_to(Some(id)));
}
}
@ -595,7 +602,7 @@ async fn main() {
}
match Url::parse(&input) {
Err(e) => warn!("Failed to parse url \"{input}\": {}", e),
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) {
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await {
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
Ok(_) => {
info!("Connecting to {url}");
@ -606,7 +613,7 @@ async fn main() {
}
continue;
} else {
tasks.filter_or_create(tasks.get_position().as_ref(), &input);
tasks.filter_or_create(tasks.get_position().as_ref(), &input).await;
}
}
or_print(tasks.print_tasks());

View file

@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write};
use std::iter::{empty, once};
use std::ops::{Div, Rem};
use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::time::Duration;
use chrono::Local;
@ -14,6 +13,7 @@ use log::{debug, error, info, trace, warn};
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
use nostr_sdk::prelude::Marker;
use TagStandard::Hashtag;
use tokio::sync::mpsc::Sender;
use crate::{EventSender, MostrMessage};
use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty};
@ -589,14 +589,14 @@ impl Tasks {
/// Finds out what to do with the given string.
/// Returns an EventId if a new Task was created.
pub(crate) fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option<EventId> {
pub(crate) async fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option<EventId> {
let filtered = self.get_filtered(position, arg);
match filtered.len() {
0 => {
// No match, new task
self.view.clear();
if arg.len() > 2 {
Some(self.make_task(arg))
Some(self.make_task(arg).await)
} else {
warn!("Name of a task needs to have at least 3 characters");
None
@ -691,12 +691,12 @@ impl Tasks {
/// Creates a task following the current state
/// Sanitizes input
pub(crate) fn make_task(&mut self, input: &str) -> EventId {
self.make_task_with(input, empty(), true)
pub(crate) async fn make_task(&mut self, input: &str) -> EventId {
self.make_task_with(input, empty(), true).await
}
pub(crate) fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId {
let id = self.make_task_with(input, empty(), false);
pub(crate) async fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId {
let id = self.make_task_with(input, empty(), false).await;
self.set_state_for(id, "", state);
self.move_to(Some(id));
id
@ -704,14 +704,14 @@ impl Tasks {
/// Creates a task with tags from filter and position
/// Sanitizes input
pub(crate) fn make_task_with(&mut self, input: &str, tags: impl IntoIterator<Item=Tag>, set_state: bool) -> EventId {
pub(crate) async fn make_task_with(&mut self, input: &str, tags: impl IntoIterator<Item=Tag>, set_state: bool) -> EventId {
let (input, input_tags) = extract_tags(input.trim());
let id = self.submit(
build_task(input, input_tags, None)
.add_tags(self.tags.iter().cloned())
.add_tags(self.position_tags())
.add_tags(tags.into_iter())
);
).await;
if set_state {
self.state.as_option().inspect(|s| self.set_state_for_with(id, s));
}
@ -730,17 +730,17 @@ impl Tasks {
.is_some()
}
pub(crate) fn track_at(&mut self, time: Timestamp, task: Option<EventId>) -> EventId {
pub(crate) async fn track_at(&mut self, time: Timestamp, task: Option<EventId>) -> EventId {
info!("{} from {}", task.map_or(String::from("Stopping time-tracking"), |id| format!("Tracking \"{}\"", self.get_task_title(&id))), relative_datetimestamp(&time));
self.submit(
build_tracking(task)
.custom_created_at(time)
)
).await
}
/// Sign and queue the event to the relay, returning its id
fn submit(&mut self, builder: EventBuilder) -> EventId {
let event = self.sender.submit(builder).unwrap();
async fn submit(&mut self, builder: EventBuilder) -> EventId {
let event = self.sender.submit(builder).await.unwrap();
let id = event.id;
self.add(event);
id
@ -814,26 +814,26 @@ impl Tasks {
self.set_state_for(id, comment, comment.into());
}
pub(crate) fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId {
pub(crate) async fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId {
let prop = build_prop(
state.into(),
comment,
id,
);
info!("Task status {} set for \"{}\"", TaskState::get_label_for(&state, comment), self.get_task_title(&id));
self.submit(prop)
self.submit(prop).await
}
pub(crate) fn update_state(&mut self, comment: &str, state: State) -> Option<EventId> {
pub(crate) async fn update_state(&mut self, comment: &str, state: State) -> Option<EventId> {
let id = self.get_position_ref()?;
Some(self.set_state_for(id.clone(), comment, state))
Some(self.set_state_for(id.clone(), comment, state).await)
}
pub(crate) fn make_note(&mut self, note: &str) {
pub(crate) async fn make_note(&mut self, note: &str) {
if let Some(id) = self.get_position_ref() {
if self.get_by_id(id).is_some_and(|t| t.is_task()) {
let prop = build_prop(Kind::TextNote, note.trim(), id.clone());
self.submit(prop);
self.submit(prop).await;
return;
}
}
@ -842,7 +842,7 @@ impl Tasks {
build_task(input, tags, Some(("stateless ", Kind::TextNote)))
.add_tags(self.parent_tag())
.add_tags(self.tags.iter().cloned())
);
).await;
}
// Properties
@ -1093,7 +1093,7 @@ mod tasks_test {
use super::*;
fn stub_tasks() -> Tasks {
use std::sync::mpsc;
use tokio::sync::mpsc;
use nostr_sdk::Keys;
let (tx, _rx) = mpsc::channel();