Compare commits
1 commit
main
...
tokio-asyn
Author | SHA1 | Date | |
---|---|---|---|
![]() |
2905d35fed |
3 changed files with 77 additions and 70 deletions
|
@ -115,8 +115,8 @@ Dot or slash can be repeated to move to parent tasks before acting.
|
|||
- `<[TEXT]` - close active task and move up, with optional status description
|
||||
- `!TEXT` - set status for current task from text and move up (empty: Open)
|
||||
- `,[TEXT]` - list notes or add text note (comment / description)
|
||||
- TBI: `*[INT]` - set priority - can also be used in task creation, with any digit
|
||||
- TBI: `;[TEXT]` - list comments or comment on task
|
||||
- `;[TEXT]` - list comments or comment on task
|
||||
- `*[INT]` - set priority - can also be used in task creation, with any digit
|
||||
- TBI: show status history and creation with attribution
|
||||
- `&` - undo last action (moving in place or upwards confirms pending actions)
|
||||
- `wss://...` - switch or subscribe to relay (prefix with space to forcibly add a new one)
|
||||
|
@ -127,7 +127,7 @@ Property Filters:
|
|||
- `+TAG` - add tag filter
|
||||
- `-TAG` - remove tag filters by prefix
|
||||
- `?STATUS` - filter by status (type or description) - plain `?` to reset, `??` to show all
|
||||
- `@AUTHOR` - filter by time or author (pubkey, or `@` for self, TBI: id prefix, name prefix)
|
||||
- `@AUTHOR` - filter by author (`@` for self, id prefix, name prefix)
|
||||
- TBI: `**INT` - filter by priority
|
||||
- TBI: Filter by time
|
||||
|
||||
|
|
75
src/main.rs
75
src/main.rs
|
@ -8,18 +8,20 @@ use std::iter::once;
|
|||
use std::ops::Sub;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::RecvTimeoutError;
|
||||
use std::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc;
|
||||
use std::time::Duration;
|
||||
|
||||
use colored::{ColoredString, Colorize};
|
||||
use colored::{Colorize};
|
||||
use env_logger::Builder;
|
||||
use itertools::Itertools;
|
||||
use log::{debug, error, info, LevelFilter, trace, warn};
|
||||
use nostr_sdk::async_utility::futures_util::TryFutureExt;
|
||||
use nostr_sdk::prelude::*;
|
||||
use nostr_sdk::TagStandard::Hashtag;
|
||||
use regex::Regex;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::time::error::Elapsed;
|
||||
use tokio::time::timeout;
|
||||
use xdg::BaseDirectories;
|
||||
|
||||
use crate::helpers::*;
|
||||
|
@ -54,7 +56,7 @@ impl EventSender {
|
|||
}
|
||||
}
|
||||
|
||||
fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
|
||||
async fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
|
||||
{
|
||||
// Always flush if oldest event older than a minute or newer than now
|
||||
let borrow = self.queue.borrow();
|
||||
|
@ -62,7 +64,7 @@ impl EventSender {
|
|||
if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) {
|
||||
drop(borrow);
|
||||
debug!("Flushing event queue because it is older than a minute");
|
||||
self.force_flush();
|
||||
self.force_flush().await;
|
||||
}
|
||||
}
|
||||
let mut queue = self.queue.borrow_mut();
|
||||
|
@ -76,19 +78,19 @@ impl EventSender {
|
|||
})?)
|
||||
}
|
||||
/// Sends all pending events
|
||||
fn force_flush(&self) {
|
||||
async fn force_flush(&self) {
|
||||
debug!("Flushing {} events from queue", self.queue.borrow().len());
|
||||
let values = self.clear();
|
||||
self.url.as_ref().map(|url| {
|
||||
if let Some(url) = self.url.as_ref() {
|
||||
self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| {
|
||||
error!("Nostr communication thread failure, changes will not be persisted: {}", e)
|
||||
})
|
||||
});
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
/// Sends all pending events if there is a non-tracking event
|
||||
fn flush(&self) {
|
||||
async fn flush(&self) {
|
||||
if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) {
|
||||
self.force_flush()
|
||||
self.force_flush().await
|
||||
}
|
||||
}
|
||||
fn clear(&self) -> Events {
|
||||
|
@ -185,15 +187,15 @@ async fn main() {
|
|||
], None).await;
|
||||
info!("Subscribed to tasks with {:?}", sub1);
|
||||
|
||||
let mut notifications = client.notifications();
|
||||
client.connect().await;
|
||||
|
||||
let sub2 = client.subscribe(vec![
|
||||
Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k)))
|
||||
], None).await;
|
||||
info!("Subscribed to updates with {:?}", sub2);
|
||||
|
||||
let (tx, rx) = mpsc::channel::<MostrMessage>();
|
||||
info!("Subscribed to updates with {:?}", sub2);
|
||||
let mut notifications = client.notifications();
|
||||
client.connect().await;
|
||||
|
||||
let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
|
||||
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
|
||||
let mut relays: HashMap<Url, Tasks> =
|
||||
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
|
||||
|
@ -201,7 +203,6 @@ async fn main() {
|
|||
let sender = tokio::spawn(async move {
|
||||
let mut queue: Option<(Url, Vec<Event>)> = None;
|
||||
|
||||
loop {
|
||||
if let Ok(user) = var("USER") {
|
||||
let metadata = Metadata::new()
|
||||
.name(user);
|
||||
|
@ -215,9 +216,11 @@ async fn main() {
|
|||
or_print(client.set_metadata(&metadata).await);
|
||||
}
|
||||
|
||||
let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY));
|
||||
loop {
|
||||
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await;
|
||||
debug!("received {:?}", &result_received);
|
||||
match result_received {
|
||||
Ok(MostrMessage::NewRelay(url)) => {
|
||||
Ok(Some(MostrMessage::NewRelay(url))) => {
|
||||
if client.add_relay(&url).await.unwrap() {
|
||||
match client.connect_relay(&url).await {
|
||||
Ok(()) => info!("Connected to {url}"),
|
||||
|
@ -227,7 +230,7 @@ async fn main() {
|
|||
warn!("Relay {url} already added");
|
||||
}
|
||||
}
|
||||
Ok(MostrMessage::AddTasks(url, mut events)) => {
|
||||
Ok(Some(MostrMessage::AddTasks(url, mut events))) => {
|
||||
trace!("Queueing {:?}", &events);
|
||||
if let Some((queue_url, mut queue_events)) = queue {
|
||||
if queue_url == url {
|
||||
|
@ -244,14 +247,14 @@ async fn main() {
|
|||
queue = Some((url, events))
|
||||
}
|
||||
}
|
||||
Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
|
||||
Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
|
||||
info!("Sending {} events to {url} due to {}", events.len(),
|
||||
result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m)));
|
||||
result_received.map_or("inactivity", |_| "flush message"));
|
||||
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
||||
queue = None;
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("Finalizing nostr communication thread because of {:?}: {}", err, err);
|
||||
Ok(None) => {
|
||||
debug!("Finalizing nostr communication thread because communication channel was closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -361,7 +364,9 @@ async fn main() {
|
|||
);
|
||||
continue;
|
||||
}
|
||||
Some(arg) => tasks.make_note(arg),
|
||||
Some(arg) => {
|
||||
tasks.make_note(arg).await;
|
||||
}
|
||||
}
|
||||
|
||||
Some('>') => {
|
||||
|
@ -436,7 +441,7 @@ async fn main() {
|
|||
StateFilter::State(State::Procedure.to_string()));
|
||||
}
|
||||
Some(id) => {
|
||||
tasks.set_state_for(id, "", State::Procedure);
|
||||
tasks.set_state_for(id, "", State::Procedure).await;
|
||||
}
|
||||
},
|
||||
Some(arg) => 'arm: {
|
||||
|
@ -446,12 +451,12 @@ async fn main() {
|
|||
tasks.make_task_with(
|
||||
arg,
|
||||
once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)),
|
||||
true);
|
||||
true).await;
|
||||
break 'arm;
|
||||
}
|
||||
}
|
||||
let arg: String = arg.chars().skip_while(|c| c == &'|').collect();
|
||||
tasks.make_task_and_enter(&arg, State::Procedure);
|
||||
tasks.make_task_and_enter(&arg, State::Procedure).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -474,7 +479,8 @@ async fn main() {
|
|||
|
||||
Some('#') =>
|
||||
match arg {
|
||||
Some(arg) => tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())),
|
||||
Some(arg) =>
|
||||
tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())),
|
||||
None => {
|
||||
println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" "));
|
||||
continue;
|
||||
|
@ -499,6 +505,7 @@ async fn main() {
|
|||
let (label, times) = tasks.times_tracked();
|
||||
println!("{}\n{}", label.italic(), times.rev().take(15).join("\n"));
|
||||
}
|
||||
// TODO show history from author / pubkey
|
||||
} else {
|
||||
let (label, mut times) = tasks.times_tracked();
|
||||
println!("{}\n{}", label.italic(), times.join("\n"));
|
||||
|
@ -512,7 +519,7 @@ async fn main() {
|
|||
Some(arg) => {
|
||||
if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() {
|
||||
// So the error message is not covered up
|
||||
continue
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -538,7 +545,7 @@ async fn main() {
|
|||
}
|
||||
tasks.set_depth(depth);
|
||||
} else {
|
||||
tasks.filter_or_create(pos.cloned().as_ref(), slice).map(|id| tasks.move_to(Some(id)));
|
||||
tasks.filter_or_create(pos.cloned().as_ref(), slice).await.map(|id| tasks.move_to(Some(id)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -595,7 +602,7 @@ async fn main() {
|
|||
}
|
||||
match Url::parse(&input) {
|
||||
Err(e) => warn!("Failed to parse url \"{input}\": {}", e),
|
||||
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) {
|
||||
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await {
|
||||
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
|
||||
Ok(_) => {
|
||||
info!("Connecting to {url}");
|
||||
|
@ -606,7 +613,7 @@ async fn main() {
|
|||
}
|
||||
continue;
|
||||
} else {
|
||||
tasks.filter_or_create(tasks.get_position().as_ref(), &input);
|
||||
tasks.filter_or_create(tasks.get_position().as_ref(), &input).await;
|
||||
}
|
||||
}
|
||||
or_print(tasks.print_tasks());
|
||||
|
|
42
src/tasks.rs
42
src/tasks.rs
|
@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write};
|
|||
use std::iter::{empty, once};
|
||||
use std::ops::{Div, Rem};
|
||||
use std::str::FromStr;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::Local;
|
||||
|
@ -14,6 +13,7 @@ use log::{debug, error, info, trace, warn};
|
|||
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
|
||||
use nostr_sdk::prelude::Marker;
|
||||
use TagStandard::Hashtag;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::{EventSender, MostrMessage};
|
||||
use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty};
|
||||
|
@ -589,14 +589,14 @@ impl Tasks {
|
|||
|
||||
/// Finds out what to do with the given string.
|
||||
/// Returns an EventId if a new Task was created.
|
||||
pub(crate) fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option<EventId> {
|
||||
pub(crate) async fn filter_or_create(&mut self, position: Option<&EventId>, arg: &str) -> Option<EventId> {
|
||||
let filtered = self.get_filtered(position, arg);
|
||||
match filtered.len() {
|
||||
0 => {
|
||||
// No match, new task
|
||||
self.view.clear();
|
||||
if arg.len() > 2 {
|
||||
Some(self.make_task(arg))
|
||||
Some(self.make_task(arg).await)
|
||||
} else {
|
||||
warn!("Name of a task needs to have at least 3 characters");
|
||||
None
|
||||
|
@ -691,12 +691,12 @@ impl Tasks {
|
|||
|
||||
/// Creates a task following the current state
|
||||
/// Sanitizes input
|
||||
pub(crate) fn make_task(&mut self, input: &str) -> EventId {
|
||||
self.make_task_with(input, empty(), true)
|
||||
pub(crate) async fn make_task(&mut self, input: &str) -> EventId {
|
||||
self.make_task_with(input, empty(), true).await
|
||||
}
|
||||
|
||||
pub(crate) fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId {
|
||||
let id = self.make_task_with(input, empty(), false);
|
||||
pub(crate) async fn make_task_and_enter(&mut self, input: &str, state: State) -> EventId {
|
||||
let id = self.make_task_with(input, empty(), false).await;
|
||||
self.set_state_for(id, "", state);
|
||||
self.move_to(Some(id));
|
||||
id
|
||||
|
@ -704,14 +704,14 @@ impl Tasks {
|
|||
|
||||
/// Creates a task with tags from filter and position
|
||||
/// Sanitizes input
|
||||
pub(crate) fn make_task_with(&mut self, input: &str, tags: impl IntoIterator<Item=Tag>, set_state: bool) -> EventId {
|
||||
pub(crate) async fn make_task_with(&mut self, input: &str, tags: impl IntoIterator<Item=Tag>, set_state: bool) -> EventId {
|
||||
let (input, input_tags) = extract_tags(input.trim());
|
||||
let id = self.submit(
|
||||
build_task(input, input_tags, None)
|
||||
.add_tags(self.tags.iter().cloned())
|
||||
.add_tags(self.position_tags())
|
||||
.add_tags(tags.into_iter())
|
||||
);
|
||||
).await;
|
||||
if set_state {
|
||||
self.state.as_option().inspect(|s| self.set_state_for_with(id, s));
|
||||
}
|
||||
|
@ -730,17 +730,17 @@ impl Tasks {
|
|||
.is_some()
|
||||
}
|
||||
|
||||
pub(crate) fn track_at(&mut self, time: Timestamp, task: Option<EventId>) -> EventId {
|
||||
pub(crate) async fn track_at(&mut self, time: Timestamp, task: Option<EventId>) -> EventId {
|
||||
info!("{} from {}", task.map_or(String::from("Stopping time-tracking"), |id| format!("Tracking \"{}\"", self.get_task_title(&id))), relative_datetimestamp(&time));
|
||||
self.submit(
|
||||
build_tracking(task)
|
||||
.custom_created_at(time)
|
||||
)
|
||||
).await
|
||||
}
|
||||
|
||||
/// Sign and queue the event to the relay, returning its id
|
||||
fn submit(&mut self, builder: EventBuilder) -> EventId {
|
||||
let event = self.sender.submit(builder).unwrap();
|
||||
async fn submit(&mut self, builder: EventBuilder) -> EventId {
|
||||
let event = self.sender.submit(builder).await.unwrap();
|
||||
let id = event.id;
|
||||
self.add(event);
|
||||
id
|
||||
|
@ -814,26 +814,26 @@ impl Tasks {
|
|||
self.set_state_for(id, comment, comment.into());
|
||||
}
|
||||
|
||||
pub(crate) fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId {
|
||||
pub(crate) async fn set_state_for(&mut self, id: EventId, comment: &str, state: State) -> EventId {
|
||||
let prop = build_prop(
|
||||
state.into(),
|
||||
comment,
|
||||
id,
|
||||
);
|
||||
info!("Task status {} set for \"{}\"", TaskState::get_label_for(&state, comment), self.get_task_title(&id));
|
||||
self.submit(prop)
|
||||
self.submit(prop).await
|
||||
}
|
||||
|
||||
pub(crate) fn update_state(&mut self, comment: &str, state: State) -> Option<EventId> {
|
||||
pub(crate) async fn update_state(&mut self, comment: &str, state: State) -> Option<EventId> {
|
||||
let id = self.get_position_ref()?;
|
||||
Some(self.set_state_for(id.clone(), comment, state))
|
||||
Some(self.set_state_for(id.clone(), comment, state).await)
|
||||
}
|
||||
|
||||
pub(crate) fn make_note(&mut self, note: &str) {
|
||||
pub(crate) async fn make_note(&mut self, note: &str) {
|
||||
if let Some(id) = self.get_position_ref() {
|
||||
if self.get_by_id(id).is_some_and(|t| t.is_task()) {
|
||||
let prop = build_prop(Kind::TextNote, note.trim(), id.clone());
|
||||
self.submit(prop);
|
||||
self.submit(prop).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -842,7 +842,7 @@ impl Tasks {
|
|||
build_task(input, tags, Some(("stateless ", Kind::TextNote)))
|
||||
.add_tags(self.parent_tag())
|
||||
.add_tags(self.tags.iter().cloned())
|
||||
);
|
||||
).await;
|
||||
}
|
||||
|
||||
// Properties
|
||||
|
@ -1093,7 +1093,7 @@ mod tasks_test {
|
|||
use super::*;
|
||||
|
||||
fn stub_tasks() -> Tasks {
|
||||
use std::sync::mpsc;
|
||||
use tokio::sync::mpsc;
|
||||
use nostr_sdk::Keys;
|
||||
|
||||
let (tx, _rx) = mpsc::channel();
|
||||
|
|
Loading…
Add table
Reference in a new issue