forked from janek/mostr
1
0
Fork 0

feat: live updates via mpsc

This commit is contained in:
xeruf 2024-07-24 16:03:34 +03:00
parent 27324f8601
commit b30994c151
4 changed files with 110 additions and 102 deletions

View File

@ -14,6 +14,4 @@ nostril --envelope --content "realtime message" --kind 90002 | websocat ws://loc
## Plans
- TUI
- Send messages asynchronously
- How to clear terminal?
- TUI - Clear terminal?

View File

@ -1,9 +1,13 @@
use std::borrow::Borrow;
use std::env::args;
use std::io::{stdin, stdout, Write};
use std::fmt::Display;
use std::fs;
use std::io::{Read, stdin, stdout, Write};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::Deref;
use std::time::Duration;
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use nostr_sdk::async_utility::futures_util::TryFutureExt;
use nostr_sdk::prelude::*;
@ -98,20 +102,23 @@ async fn main() {
client.connect().await;
let mut tasks: Tasks = Default::default();
for argument in args().skip(1) {
tasks.add_task(make_task(&argument, &[Tag::Hashtag("arg".to_string())]));
}
let (tx, rx) = mpsc::channel::<Event>();
let mut tasks: Tasks = Tasks::from(EventSender {
keys: MY_KEYS.clone(),
tx,
});
let sub_id: SubscriptionId = client.subscribe(vec![Filter::new()], None).await;
eprintln!("Subscribed with {}", sub_id);
let mut notifications = client.notifications();
println!("Finding existing events");
let res = client
.get_events_of(vec![Filter::new()], None)
/*println!("Finding existing events");
let _ = client
.get_events_of(vec![Filter::new()], Some(Duration::from_secs(5)))
.map_ok(|res| {
println!("Found {} events", res.len());
let (mut task_events, props): (Vec<Event>, Vec<Event>) = res.into_iter().partition(|e| e.kind.as_u32() == 1621);
let (mut task_events, props): (Vec<Event>, Vec<Event>) =
res.into_iter().partition(|e| e.kind.as_u32() == 1621);
task_events.sort_unstable();
for event in task_events {
print_event(&event);
@ -122,13 +129,35 @@ async fn main() {
tasks.add_prop(&event);
}
})
.await;
.await;*/
let sender = tokio::spawn(async move {
while let Ok(e) = rx.recv() {
//eprintln!("Sending {}", e.id);
let _ = client.send_event(e).await;
}
println!("Stopping listeners...");
client.unsubscribe_all().await;
});
for argument in args().skip(1) {
tasks.make_task(&argument);
}
println!();
tasks.print_current_tasks();
loop {
while let Ok(notification) = notifications.try_recv() {
if let RelayPoolNotification::Event {
subscription_id,
event,
..
} = notification
{
print_event(&event);
tasks.add(*event);
}
}
tasks.print_current_tasks();
print!(" {}> ", tasks.taskpath(tasks.get_position()));
stdout().flush().unwrap();
match stdin().lines().next() {
@ -183,17 +212,19 @@ async fn main() {
if !slice.is_empty() {
pos = EventId::parse(slice).ok().or_else(|| {
tasks.move_to(pos);
let filtered: Vec<EventId> = tasks.current_tasks().iter().filter(|t| t.event.content.starts_with(slice)).map(|t| t.event.id).collect();
let filtered: Vec<EventId> = tasks
.current_tasks()
.iter()
.filter(|t| t.event.content.starts_with(slice))
.map(|t| t.event.id)
.collect();
match filtered.len() {
0 => {
// No match, new task
let task = tasks.make_task(slice);
let ret = Some(task.id);
tasks.add_task(task);
ret
tasks.make_task(slice)
}
1 => {
// One match, select
// One match, activate
Some(filtered.first().unwrap().clone())
}
_ => {
@ -212,59 +243,31 @@ async fn main() {
}
_ => {
tasks.add_task(tasks.make_task(&input));
tasks.make_task(&input);
}
}
}
Some(Err(e)) => eprintln!("{}", e),
None => break,
}
while let Ok(notification) = notifications.try_recv() {
if let RelayPoolNotification::Event {
subscription_id,
event,
..
} = notification
{
print_event(&event);
tasks.add(*event);
}
}
tasks.print_current_tasks();
}
tasks.update_state("", |t| if t.pure_state() == State::Active { Some(State::Open) } else { None });
tasks.update_state("", |t| {
if t.pure_state() == State::Active {
Some(State::Open)
} else {
None
}
});
drop(tasks);
println!();
println!("Submitting events");
// TODO send via message passing
let _ = client
.batch_event(
tasks
.tasks
.into_values()
.flat_map(|t| {
let mut ev = t.props;
ev.push(t.event);
ev
})
.collect(),
RelaySendOptions::new().skip_send_confirmation(true),
)
.await;
}
fn make_task(text: &str, tags: &[Tag]) -> Event {
make_event(Kind::from(1621), text, tags)
}
fn make_event(kind: Kind, text: &str, tags: &[Tag]) -> Event {
EventBuilder::new(kind, text, tags.to_vec())
.to_event(&MY_KEYS)
.unwrap()
eprintln!("Waiting for sync to relay...");
or_print(sender.await);
}
fn print_event(event: &Event) {
println!("At {} found {} kind {} '{}' {:?}", event.created_at, event.id, event.kind, event.content, event.tags);
eprintln!(
"At {} found {} kind {} '{}' {:?}",
event.created_at, event.id, event.kind, event.content, event.tags
);
}

View File

@ -1,18 +1,19 @@
use crate::make_event;
use nostr_sdk::{Event, EventId, Kind, Tag, Timestamp};
use std::collections::{BTreeSet, HashSet};
use std::fmt;
use nostr_sdk::{Event, EventId, Kind, Tag, Timestamp};
pub(crate) struct Task {
pub(crate) event: Event,
pub(crate) children: Vec<EventId>,
pub(crate) props: Vec<Event>,
pub(crate) children: HashSet<EventId>,
pub(crate) props: BTreeSet<Event>,
}
impl Task {
pub(crate) fn new(event: Event) -> Task {
Task {
event,
children: Vec::new(),
props: Vec::new(),
children: Default::default(),
props: Default::default(),
}
}
@ -65,16 +66,6 @@ impl Task {
self.state().map_or(State::Open, |s| s.state)
}
pub(crate) fn update_state(&mut self, state: State, comment: &str) -> Event {
let event = make_event(
state.kind(),
comment,
&[Tag::event(self.event.id)],
);
self.props.push(event.clone());
event
}
fn default_state(&self) -> TaskState {
TaskState {
name: None,
@ -149,7 +140,7 @@ pub(crate) enum State {
Done,
}
impl State {
fn kind(&self) -> Kind {
pub(crate) fn kind(&self) -> Kind {
match self {
State::Open => Kind::from(1630),
State::Done => Kind::from(1631),

View File

@ -1,8 +1,8 @@
use std::collections::HashMap;
use nostr_sdk::{Event, EventId, Tag};
use nostr_sdk::{Event, EventBuilder, EventId, Kind, Tag};
use crate::make_task;
use crate::{EventSender, TASK_KIND};
use crate::task::{State, Task};
type TaskMap = HashMap<EventId, Task>;
@ -15,15 +15,17 @@ pub(crate) struct Tasks {
position: Option<EventId>,
/// A filtered view of the current tasks
view: Vec<EventId>,
sender: EventSender
}
impl Default for Tasks {
fn default() -> Self {
impl Tasks {
pub(crate) fn from(sender: EventSender) -> Self {
Tasks {
tasks: Default::default(),
properties: vec!["id".into(), "name".into(), "state".into()],
properties: vec!["id".into(), "name".into(), "state".into(), "ttime".into()],
position: None,
view: Default::default(),
sender
}
}
}
@ -33,10 +35,6 @@ impl Tasks {
self.position
}
fn collect_tasks(&self, tasks: &Vec<EventId>) -> Vec<&Task> {
tasks.iter().filter_map(|id| self.tasks.get(id)).collect()
}
/// Total time this task and its subtasks have been active
fn total_time_tracked(&self, task: &EventId) -> u64 {
self.tasks.get(task).map_or(0, |t| {
@ -53,7 +51,7 @@ impl Tasks {
}
pub(crate) fn current_tasks(&self) -> Vec<&Task> {
let res = self.collect_tasks(&self.view);
let res: Vec<&Task> = self.view.iter().filter_map(|id| self.tasks.get(id)).collect();
if res.len() > 0 {
return res;
}
@ -62,7 +60,7 @@ impl Tasks {
|p| {
self.tasks
.get(&p)
.map_or(Vec::new(), |t| self.collect_tasks(&t.children))
.map_or(Vec::new(), |t| t.children.iter().filter_map(|id| self.tasks.get(id)).collect())
},
)
}
@ -86,11 +84,19 @@ impl Tasks {
println!();
}
pub(crate) fn make_task(&self, input: &str) -> Event {
pub(crate) fn make_task(&mut self, input: &str) -> Option<EventId> {
self.sender.submit(self.build_task(input)).map(|e| {
let id = e.id;
self.add_task(e);
id
})
}
pub(crate) fn build_task(&self, input: &str) -> EventBuilder {
let mut tags: Vec<Tag> = Vec::new();
self.position.inspect(|p| tags.push(Tag::event(*p)));
return match input.split_once(": ") {
None => make_task(&input, &tags),
None => EventBuilder::new(Kind::from(TASK_KIND), input, tags),
Some(s) => {
tags.append(
&mut s
@ -99,7 +105,7 @@ impl Tasks {
.map(|t| Tag::Hashtag(t.to_string()))
.collect(),
);
make_task(s.0, &tags)
EventBuilder::new(Kind::from(TASK_KIND), s.0, tags)
}
};
}
@ -121,12 +127,16 @@ impl Tasks {
}
pub(crate) fn add_task(&mut self, event: Event) {
self.referenced_tasks(&event, |t| t.children.push(event.id));
self.tasks.insert(event.id, Task::new(event));
self.referenced_tasks(&event, |t| { t.children.insert(event.id); });
if self.tasks.contains_key(&event.id) {
//eprintln!("Did not insert duplicate event {}", event.id);
} else {
self.tasks.insert(event.id, Task::new(event));
}
}
pub(crate) fn add_prop(&mut self, event: &Event) {
self.referenced_tasks(&event, |t| t.props.push(event.clone()));
self.referenced_tasks(&event, |t| { t.props.insert(event.clone()); });
}
pub(crate) fn move_up(&mut self) {
@ -182,9 +192,15 @@ impl Tasks {
where
F: FnOnce(&Task) -> Option<State>,
{
self.tasks.get_mut(id).and_then(|t| {
f(t).map(|s| {
t.update_state(s, comment)
self.tasks.get_mut(id).and_then(|task| {
f(task).and_then(|state| {
self.sender.submit(EventBuilder::new(
state.kind(),
comment,
vec![Tag::event(task.event.id)],
))
}).inspect(|e| {
task.props.insert(e.clone());
})
})
}