From b30994c151241aeb3b6fa1ecaf750a894e068436 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Wed, 24 Jul 2024 16:03:34 +0300 Subject: [PATCH] feat: live updates via mpsc --- README.md | 4 +- src/main.rs | 127 ++++++++++++++++++++++++++------------------------- src/task.rs | 25 ++++------ src/tasks.rs | 56 +++++++++++++++-------- 4 files changed, 110 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index 6bd2255..9433158 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,4 @@ nostril --envelope --content "realtime message" --kind 90002 | websocat ws://loc ## Plans -- TUI -- Send messages asynchronously -- How to clear terminal? \ No newline at end of file +- TUI - Clear terminal? \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index b548e7c..dc79f08 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,13 @@ use std::borrow::Borrow; use std::env::args; -use std::io::{stdin, stdout, Write}; +use std::fmt::Display; +use std::fs; +use std::io::{Read, stdin, stdout, Write}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::ops::Deref; -use std::time::Duration; +use std::str::FromStr; +use std::sync::mpsc; +use std::sync::mpsc::Sender; use nostr_sdk::async_utility::futures_util::TryFutureExt; use nostr_sdk::prelude::*; @@ -98,20 +102,23 @@ async fn main() { client.connect().await; - let mut tasks: Tasks = Default::default(); - for argument in args().skip(1) { - tasks.add_task(make_task(&argument, &[Tag::Hashtag("arg".to_string())])); - } + let (tx, rx) = mpsc::channel::(); + let mut tasks: Tasks = Tasks::from(EventSender { + keys: MY_KEYS.clone(), + tx, + }); let sub_id: SubscriptionId = client.subscribe(vec![Filter::new()], None).await; + eprintln!("Subscribed with {}", sub_id); let mut notifications = client.notifications(); - println!("Finding existing events"); - let res = client - .get_events_of(vec![Filter::new()], None) + /*println!("Finding existing events"); + let _ = client + .get_events_of(vec![Filter::new()], Some(Duration::from_secs(5))) .map_ok(|res| { println!("Found {} events", res.len()); - let (mut task_events, props): (Vec, Vec) = res.into_iter().partition(|e| e.kind.as_u32() == 1621); + let (mut task_events, props): (Vec, Vec) = + res.into_iter().partition(|e| e.kind.as_u32() == 1621); task_events.sort_unstable(); for event in task_events { print_event(&event); @@ -122,13 +129,35 @@ async fn main() { tasks.add_prop(&event); } }) - .await; + .await;*/ + let sender = tokio::spawn(async move { + while let Ok(e) = rx.recv() { + //eprintln!("Sending {}", e.id); + let _ = client.send_event(e).await; + } + println!("Stopping listeners..."); + client.unsubscribe_all().await; + }); + for argument in args().skip(1) { + tasks.make_task(&argument); + } println!(); - tasks.print_current_tasks(); - loop { + while let Ok(notification) = notifications.try_recv() { + if let RelayPoolNotification::Event { + subscription_id, + event, + .. + } = notification + { + print_event(&event); + tasks.add(*event); + } + } + tasks.print_current_tasks(); + print!(" {}> ", tasks.taskpath(tasks.get_position())); stdout().flush().unwrap(); match stdin().lines().next() { @@ -183,17 +212,19 @@ async fn main() { if !slice.is_empty() { pos = EventId::parse(slice).ok().or_else(|| { tasks.move_to(pos); - let filtered: Vec = tasks.current_tasks().iter().filter(|t| t.event.content.starts_with(slice)).map(|t| t.event.id).collect(); + let filtered: Vec = tasks + .current_tasks() + .iter() + .filter(|t| t.event.content.starts_with(slice)) + .map(|t| t.event.id) + .collect(); match filtered.len() { 0 => { // No match, new task - let task = tasks.make_task(slice); - let ret = Some(task.id); - tasks.add_task(task); - ret + tasks.make_task(slice) } 1 => { - // One match, select + // One match, activate Some(filtered.first().unwrap().clone()) } _ => { @@ -212,59 +243,31 @@ async fn main() { } _ => { - tasks.add_task(tasks.make_task(&input)); + tasks.make_task(&input); } } } Some(Err(e)) => eprintln!("{}", e), None => break, } - - while let Ok(notification) = notifications.try_recv() { - if let RelayPoolNotification::Event { - subscription_id, - event, - .. - } = notification - { - print_event(&event); - tasks.add(*event); - } - } - - tasks.print_current_tasks(); } - tasks.update_state("", |t| if t.pure_state() == State::Active { Some(State::Open) } else { None }); + tasks.update_state("", |t| { + if t.pure_state() == State::Active { + Some(State::Open) + } else { + None + } + }); + drop(tasks); - println!(); - println!("Submitting events"); - // TODO send via message passing - let _ = client - .batch_event( - tasks - .tasks - .into_values() - .flat_map(|t| { - let mut ev = t.props; - ev.push(t.event); - ev - }) - .collect(), - RelaySendOptions::new().skip_send_confirmation(true), - ) - .await; -} - -fn make_task(text: &str, tags: &[Tag]) -> Event { - make_event(Kind::from(1621), text, tags) -} -fn make_event(kind: Kind, text: &str, tags: &[Tag]) -> Event { - EventBuilder::new(kind, text, tags.to_vec()) - .to_event(&MY_KEYS) - .unwrap() + eprintln!("Waiting for sync to relay..."); + or_print(sender.await); } fn print_event(event: &Event) { - println!("At {} found {} kind {} '{}' {:?}", event.created_at, event.id, event.kind, event.content, event.tags); + eprintln!( + "At {} found {} kind {} '{}' {:?}", + event.created_at, event.id, event.kind, event.content, event.tags + ); } diff --git a/src/task.rs b/src/task.rs index 47c2f81..f73ff8a 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,18 +1,19 @@ -use crate::make_event; -use nostr_sdk::{Event, EventId, Kind, Tag, Timestamp}; +use std::collections::{BTreeSet, HashSet}; use std::fmt; +use nostr_sdk::{Event, EventId, Kind, Tag, Timestamp}; + pub(crate) struct Task { pub(crate) event: Event, - pub(crate) children: Vec, - pub(crate) props: Vec, + pub(crate) children: HashSet, + pub(crate) props: BTreeSet, } impl Task { pub(crate) fn new(event: Event) -> Task { Task { event, - children: Vec::new(), - props: Vec::new(), + children: Default::default(), + props: Default::default(), } } @@ -65,16 +66,6 @@ impl Task { self.state().map_or(State::Open, |s| s.state) } - pub(crate) fn update_state(&mut self, state: State, comment: &str) -> Event { - let event = make_event( - state.kind(), - comment, - &[Tag::event(self.event.id)], - ); - self.props.push(event.clone()); - event - } - fn default_state(&self) -> TaskState { TaskState { name: None, @@ -149,7 +140,7 @@ pub(crate) enum State { Done, } impl State { - fn kind(&self) -> Kind { + pub(crate) fn kind(&self) -> Kind { match self { State::Open => Kind::from(1630), State::Done => Kind::from(1631), diff --git a/src/tasks.rs b/src/tasks.rs index 3b22e35..462f207 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -use nostr_sdk::{Event, EventId, Tag}; +use nostr_sdk::{Event, EventBuilder, EventId, Kind, Tag}; -use crate::make_task; +use crate::{EventSender, TASK_KIND}; use crate::task::{State, Task}; type TaskMap = HashMap; @@ -15,15 +15,17 @@ pub(crate) struct Tasks { position: Option, /// A filtered view of the current tasks view: Vec, + sender: EventSender } -impl Default for Tasks { - fn default() -> Self { +impl Tasks { + pub(crate) fn from(sender: EventSender) -> Self { Tasks { tasks: Default::default(), - properties: vec!["id".into(), "name".into(), "state".into()], + properties: vec!["id".into(), "name".into(), "state".into(), "ttime".into()], position: None, view: Default::default(), + sender } } } @@ -33,10 +35,6 @@ impl Tasks { self.position } - fn collect_tasks(&self, tasks: &Vec) -> Vec<&Task> { - tasks.iter().filter_map(|id| self.tasks.get(id)).collect() - } - /// Total time this task and its subtasks have been active fn total_time_tracked(&self, task: &EventId) -> u64 { self.tasks.get(task).map_or(0, |t| { @@ -53,7 +51,7 @@ impl Tasks { } pub(crate) fn current_tasks(&self) -> Vec<&Task> { - let res = self.collect_tasks(&self.view); + let res: Vec<&Task> = self.view.iter().filter_map(|id| self.tasks.get(id)).collect(); if res.len() > 0 { return res; } @@ -62,7 +60,7 @@ impl Tasks { |p| { self.tasks .get(&p) - .map_or(Vec::new(), |t| self.collect_tasks(&t.children)) + .map_or(Vec::new(), |t| t.children.iter().filter_map(|id| self.tasks.get(id)).collect()) }, ) } @@ -86,11 +84,19 @@ impl Tasks { println!(); } - pub(crate) fn make_task(&self, input: &str) -> Event { + pub(crate) fn make_task(&mut self, input: &str) -> Option { + self.sender.submit(self.build_task(input)).map(|e| { + let id = e.id; + self.add_task(e); + id + }) + } + + pub(crate) fn build_task(&self, input: &str) -> EventBuilder { let mut tags: Vec = Vec::new(); self.position.inspect(|p| tags.push(Tag::event(*p))); return match input.split_once(": ") { - None => make_task(&input, &tags), + None => EventBuilder::new(Kind::from(TASK_KIND), input, tags), Some(s) => { tags.append( &mut s @@ -99,7 +105,7 @@ impl Tasks { .map(|t| Tag::Hashtag(t.to_string())) .collect(), ); - make_task(s.0, &tags) + EventBuilder::new(Kind::from(TASK_KIND), s.0, tags) } }; } @@ -121,12 +127,16 @@ impl Tasks { } pub(crate) fn add_task(&mut self, event: Event) { - self.referenced_tasks(&event, |t| t.children.push(event.id)); - self.tasks.insert(event.id, Task::new(event)); + self.referenced_tasks(&event, |t| { t.children.insert(event.id); }); + if self.tasks.contains_key(&event.id) { + //eprintln!("Did not insert duplicate event {}", event.id); + } else { + self.tasks.insert(event.id, Task::new(event)); + } } pub(crate) fn add_prop(&mut self, event: &Event) { - self.referenced_tasks(&event, |t| t.props.push(event.clone())); + self.referenced_tasks(&event, |t| { t.props.insert(event.clone()); }); } pub(crate) fn move_up(&mut self) { @@ -182,9 +192,15 @@ impl Tasks { where F: FnOnce(&Task) -> Option, { - self.tasks.get_mut(id).and_then(|t| { - f(t).map(|s| { - t.update_state(s, comment) + self.tasks.get_mut(id).and_then(|task| { + f(task).and_then(|state| { + self.sender.submit(EventBuilder::new( + state.kind(), + comment, + vec![Tag::event(task.event.id)], + )) + }).inspect(|e| { + task.props.insert(e.clone()); }) }) }