Compare commits

...

8 commits

Author SHA1 Message Date
xeruf
86010962a2 release: 0.8.0 - better tracking and error handling 2025-01-16 00:28:34 +01:00
xeruf
fb3a479147 feat(main): elaborately log failed event sends 2025-01-16 00:27:59 +01:00
xeruf
dcc7778815 fix: remove deprecated event send batching 2025-01-16 00:26:40 +01:00
xeruf
9ea491a301 fix: update to nostr sdk 0.38 2025-01-15 23:16:44 +01:00
xeruf
50503f7f66 refactor(tasks): rename current_pos to pos_at 2025-01-15 22:09:16 +01:00
xeruf
984e4f129d enhance(main): double notification channel size once more 2025-01-15 22:08:41 +01:00
xeruf
ee33086824 fix(helpers): interpretation of plain numbers as time
- leading zeroes are not ignored anymore
- no odd jump between 59 and 60
2025-01-15 22:08:15 +01:00
xeruf
a1347def62 fix(event_sender): undo of custom time event creation and entering
previously only the tracking would be undone after entering .NEW@TIME
2025-01-15 22:06:59 +01:00
6 changed files with 618 additions and 409 deletions

876
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -5,7 +5,7 @@ repository = "https://forge.ftt.gmbh/janek/mostr"
readme = "README.md" readme = "README.md"
license = "GPL 3.0" license = "GPL 3.0"
authors = ["melonion"] authors = ["melonion"]
version = "0.7.1" version = "0.8.0"
rust-version = "1.82" rust-version = "1.82"
edition = "2021" edition = "2021"
default-run = "mostr" default-run = "mostr"
@ -13,14 +13,15 @@ default-run = "mostr"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
nostr-sdk = "0.38"
# Basics # Basics
tokio = { version = "1.42", features = ["rt", "rt-multi-thread", "macros"] } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
regex = "1.10.6" regex = "1.11"
# System # System
log = "0.4" log = "0.4"
env_logger = "0.11" env_logger = "0.11"
colog = "1.3" colog = "1.3"
colored = "2.1" colored = "2.2"
rustyline = { git = "https://github.com/xeruf/rustyline", rev = "465b14d" } rustyline = { git = "https://github.com/xeruf/rustyline", rev = "465b14d" }
# OS-Specific Abstractions # OS-Specific Abstractions
keyring = "3" keyring = "3"
@ -30,9 +31,8 @@ whoami = "1.5"
# Application Utils # Application Utils
itertools = "0.12" itertools = "0.12"
chrono = "0.4" chrono = "0.4"
parse_datetime = "0.5.0" parse_datetime = "0.5"
interim = { version = "0.1", features = ["chrono"] } interim = { version = "0.1", features = ["chrono"] }
nostr-sdk = { git = "https://github.com/rust-nostr/nostr", rev = "e82bc787bdd8490ceadb034fe4483e4df1e91b2a" }
[dev-dependencies] [dev-dependencies]
mostr = { path = ".", default-features = false } mostr = { path = ".", default-features = false }

View file

@ -13,22 +13,21 @@ const UNDO_DELAY: u64 = 60;
#[derive(Debug, Clone, Eq, PartialEq)] #[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) enum MostrMessage { pub(crate) enum MostrMessage {
Flush, NewRelay(RelayUrl),
NewRelay(Url), SendTask(RelayUrl, Event),
AddTasks(Url, Vec<Event>),
} }
type Events = Vec<Event>; type Events = Vec<Event>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct EventSender { pub(crate) struct EventSender {
pub(crate) url: Option<Url>, pub(crate) url: Option<RelayUrl>,
pub(crate) tx: Sender<MostrMessage>, pub(crate) tx: Sender<MostrMessage>,
pub(crate) keys: Keys, pub(crate) keys: Keys,
pub(crate) queue: RefCell<Events>, pub(crate) queue: RefCell<Events>,
} }
impl EventSender { impl EventSender {
pub(crate) fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self { pub(crate) fn from(url: Option<RelayUrl>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self {
EventSender { EventSender {
url, url,
tx: tx.clone(), tx: tx.clone(),
@ -39,45 +38,38 @@ impl EventSender {
// TODO this direly needs testing // TODO this direly needs testing
pub(crate) fn submit(&self, event_builder: EventBuilder) -> Result<Event> { pub(crate) fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
let min = Timestamp::now().sub(UNDO_DELAY); let event = event_builder.sign_with_keys(&self.keys)?;
let time = event.created_at;
{ {
// Always flush if oldest event older than a minute or newer than now // Always flush if any event is newer or more than a minute older than the current event
let borrow = self.queue.borrow(); let borrow = self.queue.borrow();
if borrow if borrow
.iter() .iter()
.any(|e| e.created_at < min || e.created_at > Timestamp::now()) .any(|e| e.created_at < time.sub(UNDO_DELAY) || e.created_at > time)
{ {
drop(borrow); drop(borrow);
debug!("Flushing event queue because it is older than a minute"); debug!("Flushing event queue because it is offset from the current event");
self.force_flush(); self.force_flush();
} }
} }
let mut queue = self.queue.borrow_mut(); let mut queue = self.queue.borrow_mut();
Ok(event_builder.sign_with_keys(&self.keys).inspect(|event| { if event.kind == TRACKING_KIND {
if event.kind == TRACKING_KIND // Remove extraneous movements if tracking event is not at a custom time
&& event.created_at > min queue.retain(|e| e.kind != TRACKING_KIND);
&& event.created_at < tasks::now() }
{ queue.push(event.clone());
// Do not send redundant movements Ok(event)
queue.retain(|e| e.kind != TRACKING_KIND);
}
queue.push(event.clone());
})?)
} }
/// Sends all pending events /// Sends all pending events
pub(crate) fn force_flush(&self) { pub(crate) fn force_flush(&self) {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { self.url.as_ref().map(|url| {
self.tx values.into_iter()
.try_send(MostrMessage::AddTasks(url.clone(), values)) .find_map(|event| self.tx.try_send(MostrMessage::SendTask(url.clone(), event)).err())
.err() .map(|e| error!("Nostr communication thread failure, changes will not be persisted: {}", e))
.map(|e| {
error!(
"Nostr communication thread failure, changes will not be persisted: {}",
e
)
})
}); });
} }
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event

View file

@ -44,8 +44,8 @@ pub fn parse_hour_after<T: TimeZone>(str: &str, after: DateTime<T>) -> Option<Da
str.parse::<u32>().ok().and_then(|number| { str.parse::<u32>().ok().and_then(|number| {
#[allow(deprecated)] #[allow(deprecated)]
after.date().and_hms_opt( after.date().and_hms_opt(
if number > 23 { number / 100 } else { number }, if str.len() > 2 { number / 100 } else { number },
if number > 23 { number % 100 } else { 0 }, if str.len() > 2 { number % 100 } else { 0 },
0, 0,
).map(|time| { ).map(|time| {
if time < after { if time < after {

View file

@ -22,6 +22,7 @@ use itertools::Itertools;
use keyring::Entry; use keyring::Entry;
use log::{debug, error, info, trace, warn, LevelFilter}; use log::{debug, error, info, trace, warn, LevelFilter};
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use nostr_sdk::serde_json::Serializer;
use regex::Regex; use regex::Regex;
use rustyline::config::Configurer; use rustyline::config::Configurer;
use rustyline::error::ReadlineError; use rustyline::error::ReadlineError;
@ -37,7 +38,6 @@ mod kinds;
mod event_sender; mod event_sender;
mod hashtag; mod hashtag;
const INACTVITY_DELAY: u64 = 200;
const LOCAL_RELAY_NAME: &str = "TEMP"; const LOCAL_RELAY_NAME: &str = "TEMP";
/// Turn a Result into an Option, showing a warning on error with optional prefix /// Turn a Result into an Option, showing a warning on error with optional prefix
@ -86,8 +86,10 @@ fn read_keys(readline: &mut DefaultEditor) -> Result<Keys> {
async fn main() -> Result<()> { async fn main() -> Result<()> {
println!("Running Mostr Version {}", env!("CARGO_PKG_VERSION")); println!("Running Mostr Version {}", env!("CARGO_PKG_VERSION"));
let mut debug = false;
let mut args = args().skip(1).peekable(); let mut args = args().skip(1).peekable();
let mut builder = if args.peek().is_some_and(|arg| arg == "--debug") { let mut builder = if args.peek().is_some_and(|arg| arg == "--debug") {
debug = true;
args.next(); args.next();
let mut builder = Builder::new(); let mut builder = Builder::new();
builder.filter(None, LevelFilter::Debug) builder.filter(None, LevelFilter::Debug)
@ -140,7 +142,7 @@ async fn main() -> Result<()> {
let client = ClientBuilder::new() let client = ClientBuilder::new()
.opts(Options::new() .opts(Options::new()
.automatic_authentication(true) .automatic_authentication(true)
.notification_channel_size(8192) .notification_channel_size(16384)
) )
.signer(keys.clone()) .signer(keys.clone())
.build(); .build();
@ -206,19 +208,16 @@ async fn main() -> Result<()> {
let metadata_clone = metadata.clone(); let metadata_clone = metadata.clone();
let (tx, mut rx) = mpsc::channel::<MostrMessage>(64); let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
let tasks_for_url = |url: Option<Url>| TasksRelay::from(url, &tx, &keys, Some(metadata.clone())); let tasks_for_url = |url: Option<RelayUrl>| TasksRelay::from(url, &tx, &keys, Some(metadata.clone()));
let mut relays: HashMap<Option<Url>, TasksRelay> = let mut relays: HashMap<Option<RelayUrl>, TasksRelay> =
client.relays().await.into_keys().map(|url| (Some(url.clone()), tasks_for_url(Some(url)))).collect(); client.relays().await.into_keys().map(|url| (Some(url.clone()), tasks_for_url(Some(url)))).collect();
let sender = tokio::spawn(async move { let sender = tokio::spawn(async move {
let mut queue: Option<(Url, Vec<Event>)> = None;
or_warn!(client.set_metadata(&metadata_clone).await, "Unable to set metadata"); or_warn!(client.set_metadata(&metadata_clone).await, "Unable to set metadata");
'repl: loop { 'receiver: loop {
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await; match rx.recv().await {
match result_received { Some(MostrMessage::NewRelay(url)) => {
Ok(Some(MostrMessage::NewRelay(url))) => {
if client.add_relay(&url).await.unwrap() { if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await { match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"), Ok(()) => info!("Connected to {url}"),
@ -228,48 +227,39 @@ async fn main() -> Result<()> {
warn!("Relay {url} already added"); warn!("Relay {url} already added");
} }
} }
Ok(Some(MostrMessage::AddTasks(url, mut events))) => { Some(MostrMessage::SendTask(url, event)) => {
trace!("Queueing {:?}", &events); trace!("Sending {:?}", &event);
if let Some((queue_url, mut queue_events)) = queue { let id = event.id;
if queue_url == url { let url_str = url.as_str_without_trailing_slash().to_string();
queue_events.append(&mut events); if let Err(e) = client.send_event_to(vec![url], event.clone()).await {
queue = Some((queue_url, queue_events)); let url_s = url_str.split("//").last().map(ToString::to_string).unwrap_or(url_str);
} else { if debug {
info!("Sending {} events to {queue_url} due to relay change", queue_events.len()); debug!("Error sending event: {:?}", e);
client.batch_event_to(vec![queue_url], queue_events).await; continue 'receiver;
queue = None; }
let path = format!("failed-events-{}/", url_s);
let dir = fs::create_dir_all(&path).map(|_| path).unwrap_or("".to_string());
let filename = dir.to_string() + &id.to_string();
match File::create(&filename).and_then(|mut f|
f.write_all(or_warn!(serde_json::to_string_pretty(&event), "Failed serializing event for file writing").unwrap_or(String::new()).as_bytes())) {
Ok(_) => error!("Failed sending update, saved a copy at {filename}: {:?}", e),
Err(fe) => error!("Failed sending update {:?} and saving copy of event {:?}", e, fe),
} }
} }
if queue.is_none() {
events.reserve(events.len() + 10);
queue = Some((url, events))
}
} }
Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue { None => {
info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or("inactivity", |_| "flush message"));
client.batch_event_to(vec![url], events).await;
queue = None;
}
Ok(None) => {
debug!("Finalizing nostr communication thread because communication channel was closed"); debug!("Finalizing nostr communication thread because communication channel was closed");
break 'repl; break 'receiver;
} }
} }
} }
if let Some((url, events)) = queue {
info!("Sending {} events to {url} before exiting", events.len());
client.batch_event_to(vec![url], events).await;
}
info!("Shutting down nostr communication thread"); info!("Shutting down nostr communication thread");
}); });
if relays.is_empty() { if relays.is_empty() {
relays.insert(None, tasks_for_url(None)); relays.insert(None, tasks_for_url(None));
} }
let mut selected_relay: Option<Url> = relays.keys() let mut selected_relay: Option<RelayUrl> = relays.keys().next().unwrap().clone();
.find_or_first(|url| url.as_ref().is_some_and(|u| u.scheme() == "wss"))
.unwrap().clone();
{ {
let tasks = relays.get_mut(&selected_relay).unwrap(); let tasks = relays.get_mut(&selected_relay).unwrap();
@ -734,7 +724,7 @@ async fn main() -> Result<()> {
println!("{}", tasks); println!("{}", tasks);
continue 'repl; continue 'repl;
} }
or_warn!(Url::parse(&command), "Failed to parse url {}", command).map(|url| { or_warn!(RelayUrl::parse(&command), "Failed to parse url {}", command).map(|url| {
match tx.try_send(MostrMessage::NewRelay(url.clone())) { match tx.try_send(MostrMessage::NewRelay(url.clone())) {
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"), Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
Ok(_) => { Ok(_) => {

View file

@ -20,10 +20,7 @@ use chrono::{Local, TimeDelta};
use colored::Colorize; use colored::Colorize;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use nostr_sdk::{ use nostr_sdk::{Alphabet, Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, RelayUrl, SingleLetterTag, Tag, TagKind, Timestamp, Url};
Alphabet, Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey,
SingleLetterTag, Tag, TagKind, Timestamp, Url,
};
use regex::bytes::Regex; use regex::bytes::Regex;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
@ -155,7 +152,7 @@ impl Display for StateFilter {
impl TasksRelay { impl TasksRelay {
pub(crate) fn from( pub(crate) fn from(
url: Option<Url>, url: Option<RelayUrl>,
tx: &Sender<MostrMessage>, tx: &Sender<MostrMessage>,
keys: &Keys, keys: &Keys,
metadata: Option<Metadata>, metadata: Option<Metadata>,
@ -1181,29 +1178,29 @@ impl TasksRelay {
time = time + 1; time = time + 1;
} }
} }
let current_pos = self.get_position_at(time); let pos_at = self.get_position_at(time);
if (time < Timestamp::now() || target.is_none()) && current_pos.1 == target { if (time < Timestamp::now() || target.is_none()) && pos_at.1 == target {
warn!( warn!(
"Already {} from {}", "Already {} from {}",
target.map_or("stopped time-tracking".to_string(), |id| format!( target.map_or("stopped time-tracking".to_string(), |id| format!(
"tracking \"{}\"", "tracking \"{}\"",
self.get_task_title(&id) self.get_task_title(&id)
)), )),
format_timestamp_relative(&current_pos.0), format_timestamp_relative(&pos_at.0),
); );
return None; return None;
} }
info!("{}", match target { info!("{}", match target {
None => format!( None => format!(
"Stopping time-tracking of \"{}\" at {}", "Stopping time-tracking of \"{}\" at {}",
current_pos.1.map_or("???".to_string(), |id| self.get_task_title(&id)), pos_at.1.map_or("???".to_string(), |id| self.get_task_title(&id)),
format_timestamp_relative(&time) format_timestamp_relative(&time)
), ),
Some(new_id) => format!( Some(new_id) => format!(
"Tracking \"{}\" from {}{}", "Tracking \"{}\" from {}{}",
self.get_task_title(&new_id), self.get_task_title(&new_id),
format_timestamp_relative(&time), format_timestamp_relative(&time),
current_pos.1.filter(|id| id != &new_id).map(|id| pos_at.1.filter(|id| id != &new_id).map(|id|
format!(" replacing \"{}\"", self.get_task_title(&id))) format!(" replacing \"{}\"", self.get_task_title(&id)))
.unwrap_or_default() .unwrap_or_default()
) )