forked from janek/mostr
1
0
Fork 0

feat(main): enable switching to new relay while running

This commit is contained in:
xeruf 2024-08-14 15:32:42 +03:00
parent 45b8f9cf0f
commit 68d5c101e9
1 changed files with 36 additions and 30 deletions

View File

@ -13,7 +13,7 @@ use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::Sender; use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use colored::Colorize; use colored::{ColoredString, Colorize};
use env_logger::Builder; use env_logger::Builder;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, error, info, LevelFilter, trace, warn}; use log::{debug, error, info, LevelFilter, trace, warn};
@ -23,7 +23,6 @@ use xdg::BaseDirectories;
use crate::helpers::*; use crate::helpers::*;
use crate::kinds::{KINDS, PROPERTY_COLUMNS, TRACKING_KIND}; use crate::kinds::{KINDS, PROPERTY_COLUMNS, TRACKING_KIND};
use crate::MostrMessage::AddTasks;
use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State}; use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State};
use crate::tasks::{PropertyCollection, StateFilter, Tasks}; use crate::tasks::{PropertyCollection, StateFilter, Tasks};
@ -80,7 +79,9 @@ impl EventSender {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { self.url.as_ref().map(|url| {
or_print(self.tx.send(AddTasks(url.clone(), values))); self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| {
error!("Nostr communication thread failure, changes will not be persisted: {}", e)
})
}); });
} }
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event
@ -149,6 +150,7 @@ async fn main() {
let client = Client::new(&keys); let client = Client::new(&keys);
info!("My public key: {}", keys.public_key()); info!("My public key: {}", keys.public_key());
// TODO use NewRelay message for all relays
match var("MOSTR_RELAY") { match var("MOSTR_RELAY") {
Ok(relay) => { Ok(relay) => {
or_print(client.add_relay(relay).await); or_print(client.add_relay(relay).await);
@ -237,19 +239,26 @@ async fn main() {
let mut queue: Option<(Url, Vec<Event>)> = None; let mut queue: Option<(Url, Vec<Event>)> = None;
loop { loop {
// TODO invalid acknowledgement from bucket relay slows sending down let result = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY));
match rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)) { match result {
Ok(AddTasks(url, mut events)) => { Ok(MostrMessage::NewRelay(url)) => {
if 1 == 2 { if client.add_relay(&url).await.unwrap() {
client.connect_relay("").await; match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"),
Err(e) => warn!("Unable to connect to relay {url}: {e}")
}
} else {
warn!("Relay {url} already added");
} }
debug!("Queueing {:?}", &events); }
Ok(MostrMessage::AddTasks(url, mut events)) => {
trace!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue { if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url { if queue_url == url {
queue_events.append(&mut events); queue_events.append(&mut events);
queue = Some((queue_url, queue_events)); queue = Some((queue_url, queue_events));
} else { } else {
info!("Sending {} events due to relay change", queue_events.len()); info!("Sending {} events to {url} due to relay change", queue_events.len());
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await; client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
queue = None; queue = None;
} }
@ -259,19 +268,19 @@ async fn main() {
queue = Some((url, events)) queue = Some((url, events))
} }
} }
Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
info!("Sending {} events due to inactivity", events.len()); info!("Sending {} events to {url} due to {:?}", events.len(), result);
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None; queue = None;
} }
arg => { Err(err) => {
debug!("Finalizing nostr communication thread because of {:?}", arg); debug!("Finalizing nostr communication thread because of {:?}", err);
break break;
} }
} }
} }
if let Some((url, events)) = queue { if let Some((url, events)) = queue {
info!("Sending {} events before exiting", events.len()); info!("Sending {} events to {url} before exiting", events.len());
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
} }
info!("Shutting down nostr communication thread"); info!("Shutting down nostr communication thread");
@ -541,22 +550,19 @@ async fn main() {
_ => _ =>
if Regex::new("^wss?://").unwrap().is_match(&input.trim()) { if Regex::new("^wss?://").unwrap().is_match(&input.trim()) {
tasks.move_to(None); tasks.move_to(None);
let mut new_relay = relays.keys().find(|key| key.as_str().starts_with(&input)).cloned(); if let Some((url, tasks)) = relays.iter().find(|(key, _)| key.as_str().starts_with(&input)) {
if new_relay.is_none() { selected_relay = Some(url.clone());
if let Some(url) = or_print(Url::parse(&input)) { or_print(tasks.print_tasks());
warn!("Connecting to {url} while running not yet supported"); } else if let Some(url) = or_print(Url::parse(&input)) {
//new_relay = Some(url.clone()); match tx.send(MostrMessage::NewRelay(url.clone())) {
//relays.insert(url.clone(), tasks_for_url(Some(url.clone()))); Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
//if client.add_relay(url).await.unwrap() { Ok(_) => {
// relays.insert(url.clone(), tasks_for_url(Some(url.clone()))); info!("Connecting to {url}");
// client.connect().await; selected_relay = Some(url.clone());
//} relays.insert(url.clone(), tasks_for_url(Some(url)));
}
} }
} }
if new_relay.is_some() {
selected_relay = new_relay;
}
//or_print(tasks.print_tasks());
continue; continue;
} else { } else {
tasks.filter_or_create(&input); tasks.filter_or_create(&input);