forked from janek/mostr
1
0
Fork 0

feat: allow switching between initial relays

This commit is contained in:
xeruf 2024-08-07 15:03:29 +03:00
parent 6932e1f257
commit 65207a1de2
6 changed files with 110 additions and 50 deletions

2
Cargo.lock generated
View File

@ -964,7 +964,7 @@ dependencies = [
"itertools", "itertools",
"log", "log",
"nostr-sdk", "nostr-sdk",
"once_cell", "regex",
"tokio", "tokio",
"xdg", "xdg",
] ]

View File

@ -12,12 +12,12 @@ default-run = "mostr"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
xdg = "2.5.2" xdg = "2.5"
itertools = "0.12.1" itertools = "0.12"
log = "0.4.21" log = "0.4"
chrono = "0.4.38" chrono = "0.4"
colog = "1.3.0" colog = "1.3"
colored = "2.1.0" colored = "2.1"
nostr-sdk = "0.33" nostr-sdk = "0.33"
tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] } tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "macros"] }
once_cell = "1.19.0" regex = "1.10.5"

View File

@ -106,6 +106,7 @@ Dots can be repeated to move to parent tasks.
- `!TEXT` - set state for current task from text - `!TEXT` - set state for current task from text
- `,TEXT` - add text note (comment / description) - `,TEXT` - add text note (comment / description)
- `@` - undoes last action (moving in place or upwards or waiting a minute confirms pending actions) - `@` - undoes last action (moving in place or upwards or waiting a minute confirms pending actions)
- `wss://...` - switch or subscribe to relay
Property Filters: Property Filters:

View File

@ -4,6 +4,7 @@ use nostr_sdk::{Alphabet, EventBuilder, EventId, Kind, Tag, TagStandard};
pub const TASK_KIND: u16 = 1621; pub const TASK_KIND: u16 = 1621;
pub const TRACKING_KIND: u16 = 1650; pub const TRACKING_KIND: u16 = 1650;
pub const KINDS: [u16; 7] = [1, TASK_KIND, TRACKING_KIND, 1630, 1631, 1632, 1633];
pub(crate) fn build_tracking<I>(id: I) -> EventBuilder pub(crate) fn build_tracking<I>(id: I) -> EventBuilder
where where

View File

@ -1,6 +1,6 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap;
use std::env::{args, var}; use std::env::{args, var};
use std::fmt::Display;
use std::fs; use std::fs;
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader, stdin, stdout, Write}; use std::io::{BufRead, BufReader, stdin, stdout, Write};
@ -14,10 +14,11 @@ use chrono::DateTime;
use colored::Colorize; use colored::Colorize;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use regex::Regex;
use xdg::BaseDirectories; use xdg::BaseDirectories;
use crate::helpers::*; use crate::helpers::*;
use crate::kinds::TRACKING_KIND; use crate::kinds::{KINDS, TRACKING_KIND};
use crate::task::State; use crate::task::State;
use crate::tasks::Tasks; use crate::tasks::Tasks;
@ -30,11 +31,21 @@ type Events = Vec<Event>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct EventSender { struct EventSender {
tx: Sender<Events>, url: Option<Url>,
tx: Sender<(Url, Events)>,
keys: Keys, keys: Keys,
queue: RefCell<Events>, queue: RefCell<Events>,
} }
impl EventSender { impl EventSender {
fn from(url: Option<Url>, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self {
EventSender {
url,
tx: tx.clone(),
keys: keys.clone(),
queue: Default::default(),
}
}
fn submit(&self, event_builder: EventBuilder) -> Result<Event> { fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
{ {
// Always flush if oldest event older than a minute or newer than now // Always flush if oldest event older than a minute or newer than now
@ -59,7 +70,10 @@ impl EventSender {
/// Sends all pending events /// Sends all pending events
fn force_flush(&self) { fn force_flush(&self) {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
or_print(self.tx.send(self.clear())); let values = self.clear();
self.url.as_ref().map(|url| {
or_print(self.tx.send((url.clone(), values)));
});
} }
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event
fn flush(&self) { fn flush(&self) {
@ -77,7 +91,8 @@ impl EventSender {
} }
impl Drop for EventSender { impl Drop for EventSender {
fn drop(&mut self) { fn drop(&mut self) {
self.force_flush() self.force_flush();
debug!("Dropped {:?}", self);
} }
} }
@ -105,6 +120,7 @@ async fn main() {
let client = Client::new(&keys); let client = Client::new(&keys);
info!("My public key: {}", keys.public_key()); info!("My public key: {}", keys.public_key());
match var("MOSTR_RELAY") { match var("MOSTR_RELAY") {
Ok(relay) => { Ok(relay) => {
or_print(client.add_relay(relay).await); or_print(client.add_relay(relay).await);
@ -133,6 +149,9 @@ async fn main() {
}, },
} }
let sub_id = client.subscribe(vec![Filter::new().kinds(KINDS.into_iter().map(|k| Kind::from(k)))], None).await;
info!("Subscribed with {:?}", sub_id);
//let proxy = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9050))); //let proxy = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9050)));
//client //client
// .add_relay_with_opts( // .add_relay_with_opts(
@ -160,17 +179,12 @@ async fn main() {
//client.set_metadata(&metadata).await?; //client.set_metadata(&metadata).await?;
client.connect().await; client.connect().await;
let mut notifications = client.notifications();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let mut tasks: Tasks = Tasks::from(EventSender { let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
keys, let mut relays: HashMap<Url, Tasks> =
tx, client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
queue: Default::default(),
});
let sub_id = client.subscribe(vec![Filter::new()], None).await;
info!("Subscribed with {:?}", sub_id);
let mut notifications = client.notifications();
/*println!("Finding existing events"); /*println!("Finding existing events");
let _ = client let _ = client
@ -192,49 +206,61 @@ async fn main() {
.await;*/ .await;*/
let sender = tokio::spawn(async move { let sender = tokio::spawn(async move {
while let Ok(e) = rx.recv() { while let Ok((url, events)) = rx.recv() {
trace!("Sending {:?}", e); trace!("Sending {:?}", events);
// TODO batch up further // TODO batch up further
let _ = client.batch_event(e, RelaySendOptions::new()).await; let _ = client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
} }
info!("Stopping listeners..."); info!("Shutting down sender thread");
client.unsubscribe_all().await;
}); });
let mut local_tasks = Tasks::from(None, &tx, &keys);
let mut selected_relay: Option<Url> = relays.keys().nth(0).cloned();
{
let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks);
for argument in args().skip(1) { for argument in args().skip(1) {
tasks.make_task(&argument); tasks.make_task(&argument);
} }
}
println!(); println!();
let mut lines = stdin().lines(); let mut lines = stdin().lines();
loop { loop {
selected_relay.as_ref().and_then(|url| relays.get(url)).inspect(|tasks| {
or_print(tasks.print_tasks()); or_print(tasks.print_tasks());
print!( print!(
"{}", "{}",
format!( format!(
" {}{}) ", "{} {}{}) ",
selected_relay.as_ref().map_or("local".to_string(), |url| url.to_string()),
tasks.get_task_path(tasks.get_position()), tasks.get_task_path(tasks.get_position()),
tasks.get_prompt_suffix() tasks.get_prompt_suffix()
).italic() ).italic()
); );
});
stdout().flush().unwrap(); stdout().flush().unwrap();
match lines.next() { match lines.next() {
Some(Ok(input)) => { Some(Ok(input)) => {
let mut count = 0; let mut count = 0;
while let Ok(notification) = notifications.try_recv() { while let Ok(notification) = notifications.try_recv() {
if let RelayPoolNotification::Event { if let RelayPoolNotification::Event {
subscription_id, relay_url,
event, event,
.. ..
} = notification } = notification
{ {
print_event(&event); print_event(&event);
tasks.add(*event); match relays.get_mut(&relay_url) {
Some(tasks) => tasks.add(*event),
None => warn!("Event received from unknown relay {relay_url}: {:?}", event)
}
count += 1; count += 1;
} }
} }
if count > 0 { if count > 0 {
info!("Received {count} updates"); info!("Received {count} Updates");
} }
let mut iter = input.chars(); let mut iter = input.chars();
@ -244,6 +270,7 @@ async fn main() {
} else { } else {
"" ""
}; };
let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks);
match op { match op {
None => { None => {
debug!("Flushing Tasks because of empty command"); debug!("Flushing Tasks because of empty command");
@ -390,20 +417,40 @@ async fn main() {
} }
_ => { _ => {
if Regex::new("^wss?://").unwrap().is_match(&input) {
tasks.move_to(None);
let mut new_relay = relays.keys().find(|key| key.as_str().starts_with(&input)).cloned();
if new_relay.is_none() {
if let Some(url) = or_print(Url::parse(&input)) {
warn!("Connecting to {url} while running not yet supported");
//new_relay = Some(url.clone());
//relays.insert(url.clone(), tasks_for_url(Some(url.clone())));
//if client.add_relay(url).await.unwrap() {
// relays.insert(url.clone(), tasks_for_url(Some(url.clone())));
// client.connect().await;
//}
}
}
if new_relay.is_some() {
selected_relay = new_relay;
}
} else {
tasks.filter_or_create(&input); tasks.filter_or_create(&input);
} }
} }
} }
}
Some(Err(e)) => warn!("{}", e), Some(Err(e)) => warn!("{}", e),
None => break, None => break,
} }
} }
println!(); println!();
tasks.move_to(None); drop(tx);
drop(tasks); drop(local_tasks);
drop(relays);
info!("Submitting pending changes..."); info!("Submitting pending updates...");
or_print(sender.await); or_print(sender.await);
} }

View File

@ -2,16 +2,17 @@ use std::collections::{BTreeSet, HashMap};
use std::io::{Error, stdout, Write}; use std::io::{Error, stdout, Write};
use std::iter::once; use std::iter::once;
use std::ops::{Div, Rem}; use std::ops::{Div, Rem};
use std::sync::mpsc::Sender;
use chrono::{Local, TimeZone}; use chrono::{Local, TimeZone};
use chrono::LocalResult::Single; use chrono::LocalResult::Single;
use colored::Colorize; use colored::Colorize;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use nostr_sdk::{Event, EventBuilder, EventId, Kind, PublicKey, Tag, TagStandard, Timestamp}; use nostr_sdk::{Event, EventBuilder, EventId, Keys, Kind, PublicKey, Tag, TagStandard, Timestamp, Url};
use TagStandard::Hashtag; use TagStandard::Hashtag;
use crate::EventSender; use crate::{Events, EventSender};
use crate::kinds::*; use crate::kinds::*;
use crate::task::{State, Task, TaskState}; use crate::task::{State, Task, TaskState};
@ -42,7 +43,16 @@ pub(crate) struct Tasks {
} }
impl Tasks { impl Tasks {
pub(crate) fn from(sender: EventSender) -> Self { pub(crate) fn from(url: Option<Url>, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self {
Self::with_sender(EventSender {
url,
tx: tx.clone(),
keys: keys.clone(),
queue: Default::default(),
})
}
pub(crate) fn with_sender(sender: EventSender) -> Self {
Tasks { Tasks {
tasks: Default::default(), tasks: Default::default(),
history: Default::default(), history: Default::default(),
@ -692,7 +702,8 @@ mod tasks_test {
use nostr_sdk::Keys; use nostr_sdk::Keys;
let (tx, _rx) = mpsc::channel(); let (tx, _rx) = mpsc::channel();
Tasks::from(EventSender { Tasks::with_sender(EventSender {
url: None,
tx, tx,
keys: Keys::generate(), keys: Keys::generate(),
queue: Default::default(), queue: Default::default(),