From 65207a1de2b2a42f62726757b88d1a4e0ef3d30d Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Wed, 7 Aug 2024 15:03:29 +0300 Subject: [PATCH] feat: allow switching between initial relays --- Cargo.lock | 2 +- Cargo.toml | 16 +++---- README.md | 1 + src/kinds.rs | 1 + src/main.rs | 121 +++++++++++++++++++++++++++++++++++---------------- src/tasks.rs | 19 ++++++-- 6 files changed, 110 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d07deb..d7e0d6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -964,7 +964,7 @@ dependencies = [ "itertools", "log", "nostr-sdk", - "once_cell", + "regex", "tokio", "xdg", ] diff --git a/Cargo.toml b/Cargo.toml index 3461360..f67a084 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,12 @@ default-run = "mostr" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -xdg = "2.5.2" -itertools = "0.12.1" -log = "0.4.21" -chrono = "0.4.38" -colog = "1.3.0" -colored = "2.1.0" +xdg = "2.5" +itertools = "0.12" +log = "0.4" +chrono = "0.4" +colog = "1.3" +colored = "2.1" nostr-sdk = "0.33" -tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] } -once_cell = "1.19.0" +tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "macros"] } +regex = "1.10.5" diff --git a/README.md b/README.md index 25a1717..7a657a7 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,7 @@ Dots can be repeated to move to parent tasks. - `!TEXT` - set state for current task from text - `,TEXT` - add text note (comment / description) - `@` - undoes last action (moving in place or upwards or waiting a minute confirms pending actions) +- `wss://...` - switch or subscribe to relay Property Filters: diff --git a/src/kinds.rs b/src/kinds.rs index a12b2cf..28f5655 100644 --- a/src/kinds.rs +++ b/src/kinds.rs @@ -4,6 +4,7 @@ use nostr_sdk::{Alphabet, EventBuilder, EventId, Kind, Tag, TagStandard}; pub const TASK_KIND: u16 = 1621; pub const TRACKING_KIND: u16 = 1650; +pub const KINDS: [u16; 7] = [1, TASK_KIND, TRACKING_KIND, 1630, 1631, 1632, 1633]; pub(crate) fn build_tracking(id: I) -> EventBuilder where diff --git a/src/main.rs b/src/main.rs index 695d8bd..372f38c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; +use std::collections::HashMap; use std::env::{args, var}; -use std::fmt::Display; use std::fs; use std::fs::File; use std::io::{BufRead, BufReader, stdin, stdout, Write}; @@ -14,10 +14,11 @@ use chrono::DateTime; use colored::Colorize; use log::{debug, error, info, trace, warn}; use nostr_sdk::prelude::*; +use regex::Regex; use xdg::BaseDirectories; use crate::helpers::*; -use crate::kinds::TRACKING_KIND; +use crate::kinds::{KINDS, TRACKING_KIND}; use crate::task::State; use crate::tasks::Tasks; @@ -30,11 +31,21 @@ type Events = Vec; #[derive(Debug, Clone)] struct EventSender { - tx: Sender, + url: Option, + tx: Sender<(Url, Events)>, keys: Keys, queue: RefCell, } impl EventSender { + fn from(url: Option, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self { + EventSender { + url, + tx: tx.clone(), + keys: keys.clone(), + queue: Default::default(), + } + } + fn submit(&self, event_builder: EventBuilder) -> Result { { // Always flush if oldest event older than a minute or newer than now @@ -59,7 +70,10 @@ impl EventSender { /// Sends all pending events fn force_flush(&self) { debug!("Flushing {} events from queue", self.queue.borrow().len()); - or_print(self.tx.send(self.clear())); + let values = self.clear(); + self.url.as_ref().map(|url| { + or_print(self.tx.send((url.clone(), values))); + }); } /// Sends all pending events if there is a non-tracking event fn flush(&self) { @@ -77,7 +91,8 @@ impl EventSender { } impl Drop for EventSender { fn drop(&mut self) { - self.force_flush() + self.force_flush(); + debug!("Dropped {:?}", self); } } @@ -105,6 +120,7 @@ async fn main() { let client = Client::new(&keys); info!("My public key: {}", keys.public_key()); + match var("MOSTR_RELAY") { Ok(relay) => { or_print(client.add_relay(relay).await); @@ -133,6 +149,9 @@ async fn main() { }, } + let sub_id = client.subscribe(vec![Filter::new().kinds(KINDS.into_iter().map(|k| Kind::from(k)))], None).await; + info!("Subscribed with {:?}", sub_id); + //let proxy = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9050))); //client // .add_relay_with_opts( @@ -160,17 +179,12 @@ async fn main() { //client.set_metadata(&metadata).await?; client.connect().await; + let mut notifications = client.notifications(); let (tx, rx) = mpsc::channel(); - let mut tasks: Tasks = Tasks::from(EventSender { - keys, - tx, - queue: Default::default(), - }); - - let sub_id = client.subscribe(vec![Filter::new()], None).await; - info!("Subscribed with {:?}", sub_id); - let mut notifications = client.notifications(); + let tasks_for_url = |url: Option| Tasks::from(url, &tx, &keys); + let mut relays: HashMap = + client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); /*println!("Finding existing events"); let _ = client @@ -192,49 +206,61 @@ async fn main() { .await;*/ let sender = tokio::spawn(async move { - while let Ok(e) = rx.recv() { - trace!("Sending {:?}", e); + while let Ok((url, events)) = rx.recv() { + trace!("Sending {:?}", events); // TODO batch up further - let _ = client.batch_event(e, RelaySendOptions::new()).await; + let _ = client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; } - info!("Stopping listeners..."); - client.unsubscribe_all().await; + info!("Shutting down sender thread"); }); - for argument in args().skip(1) { - tasks.make_task(&argument); + + let mut local_tasks = Tasks::from(None, &tx, &keys); + let mut selected_relay: Option = relays.keys().nth(0).cloned(); + + { + let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks); + for argument in args().skip(1) { + tasks.make_task(&argument); + } } println!(); let mut lines = stdin().lines(); loop { - or_print(tasks.print_tasks()); + selected_relay.as_ref().and_then(|url| relays.get(url)).inspect(|tasks| { + or_print(tasks.print_tasks()); - print!( - "{}", - format!( - " {}{}) ", - tasks.get_task_path(tasks.get_position()), - tasks.get_prompt_suffix() - ).italic() - ); + print!( + "{}", + format!( + "{} {}{}) ", + selected_relay.as_ref().map_or("local".to_string(), |url| url.to_string()), + tasks.get_task_path(tasks.get_position()), + tasks.get_prompt_suffix() + ).italic() + ); + }); stdout().flush().unwrap(); match lines.next() { Some(Ok(input)) => { let mut count = 0; while let Ok(notification) = notifications.try_recv() { if let RelayPoolNotification::Event { - subscription_id, + relay_url, event, .. } = notification { print_event(&event); - tasks.add(*event); + match relays.get_mut(&relay_url) { + Some(tasks) => tasks.add(*event), + None => warn!("Event received from unknown relay {relay_url}: {:?}", event) + } count += 1; } } if count > 0 { - info!("Received {count} updates"); + info!("Received {count} Updates"); } let mut iter = input.chars(); @@ -244,6 +270,7 @@ async fn main() { } else { "" }; + let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks); match op { None => { debug!("Flushing Tasks because of empty command"); @@ -390,7 +417,26 @@ async fn main() { } _ => { - tasks.filter_or_create(&input); + if Regex::new("^wss?://").unwrap().is_match(&input) { + tasks.move_to(None); + let mut new_relay = relays.keys().find(|key| key.as_str().starts_with(&input)).cloned(); + if new_relay.is_none() { + if let Some(url) = or_print(Url::parse(&input)) { + warn!("Connecting to {url} while running not yet supported"); + //new_relay = Some(url.clone()); + //relays.insert(url.clone(), tasks_for_url(Some(url.clone()))); + //if client.add_relay(url).await.unwrap() { + // relays.insert(url.clone(), tasks_for_url(Some(url.clone()))); + // client.connect().await; + //} + } + } + if new_relay.is_some() { + selected_relay = new_relay; + } + } else { + tasks.filter_or_create(&input); + } } } } @@ -400,10 +446,11 @@ async fn main() { } println!(); - tasks.move_to(None); - drop(tasks); + drop(tx); + drop(local_tasks); + drop(relays); - info!("Submitting pending changes..."); + info!("Submitting pending updates..."); or_print(sender.await); } diff --git a/src/tasks.rs b/src/tasks.rs index f5ccf42..8a41bda 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -2,16 +2,17 @@ use std::collections::{BTreeSet, HashMap}; use std::io::{Error, stdout, Write}; use std::iter::once; use std::ops::{Div, Rem}; +use std::sync::mpsc::Sender; use chrono::{Local, TimeZone}; use chrono::LocalResult::Single; use colored::Colorize; use itertools::Itertools; use log::{debug, error, info, trace, warn}; -use nostr_sdk::{Event, EventBuilder, EventId, Kind, PublicKey, Tag, TagStandard, Timestamp}; +use nostr_sdk::{Event, EventBuilder, EventId, Keys, Kind, PublicKey, Tag, TagStandard, Timestamp, Url}; use TagStandard::Hashtag; -use crate::EventSender; +use crate::{Events, EventSender}; use crate::kinds::*; use crate::task::{State, Task, TaskState}; @@ -42,7 +43,16 @@ pub(crate) struct Tasks { } impl Tasks { - pub(crate) fn from(sender: EventSender) -> Self { + pub(crate) fn from(url: Option, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self { + Self::with_sender(EventSender { + url, + tx: tx.clone(), + keys: keys.clone(), + queue: Default::default(), + }) + } + + pub(crate) fn with_sender(sender: EventSender) -> Self { Tasks { tasks: Default::default(), history: Default::default(), @@ -692,7 +702,8 @@ mod tasks_test { use nostr_sdk::Keys; let (tx, _rx) = mpsc::channel(); - Tasks::from(EventSender { + Tasks::with_sender(EventSender { + url: None, tx, keys: Keys::generate(), queue: Default::default(),