From 5dfd7a084b4eae6fd13367899ece1e7f8b98c765 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Mon, 11 Nov 2024 13:13:15 +0100 Subject: [PATCH] refactor: create own file for EventSender --- src/event_sender.rs | 115 ++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 90 +++------------------------------- src/tasks.rs | 2 +- 3 files changed, 122 insertions(+), 85 deletions(-) create mode 100644 src/event_sender.rs diff --git a/src/event_sender.rs b/src/event_sender.rs new file mode 100644 index 0000000..fc817d3 --- /dev/null +++ b/src/event_sender.rs @@ -0,0 +1,115 @@ +use std::cell::RefCell; +use std::collections::{HashMap, VecDeque}; +use std::env::{args, var}; +use std::fs; +use std::fs::File; +use std::io::{BufRead, BufReader, Write}; +use std::ops::Sub; +use std::path::PathBuf; +use std::str::FromStr; +use std::time::Duration; + +use chrono::Local; +use colored::Colorize; +use directories::ProjectDirs; +use env_logger::{Builder, Target, WriteStyle}; +use itertools::Itertools; +use nostr_sdk::prelude::*; +use nostr_sdk::TagStandard::Hashtag; +use regex::Regex; +use rustyline::config::Configurer; +use rustyline::error::ReadlineError; +use rustyline::DefaultEditor; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; +use tokio::time::error::Elapsed; +use tokio::time::timeout; + +use crate::helpers::*; +use crate::kinds::{Prio, BASIC_KINDS, PROPERTY_COLUMNS, PROP_KINDS, TRACKING_KIND}; +use crate::task::{State, Task, TaskState}; +use crate::tasks; +use crate::tasks::{PropertyCollection, StateFilter, TasksRelay}; +use log::{debug, error, info, trace, warn, LevelFilter}; +use nostr_sdk::Event; + +const UNDO_DELAY: u64 = 60; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) enum MostrMessage { + Flush, + NewRelay(Url), + AddTasks(Url, Vec), +} + +type Events = Vec; + +#[derive(Debug, Clone)] +pub(crate) struct EventSender { + pub(crate) url: Option, + pub(crate) tx: Sender, + pub(crate) keys: Keys, + pub(crate) queue: RefCell, +} +impl EventSender { + pub(crate) fn from(url: Option, tx: &Sender, keys: &Keys) -> Self { + EventSender { + url, + tx: tx.clone(), + keys: keys.clone(), + queue: Default::default(), + } + } + + // TODO this direly needs testing + pub(crate) fn submit(&self, event_builder: EventBuilder) -> Result { + let min = Timestamp::now().sub(UNDO_DELAY); + { + // Always flush if oldest event older than a minute or newer than now + let borrow = self.queue.borrow(); + if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) { + drop(borrow); + debug!("Flushing event queue because it is older than a minute"); + self.force_flush(); + } + } + let mut queue = self.queue.borrow_mut(); + Ok(event_builder.to_event(&self.keys).inspect(|event| { + if event.kind == TRACKING_KIND && event.created_at > min && event.created_at < tasks::now() { + // Do not send redundant movements + queue.retain(|e| e.kind != TRACKING_KIND); + } + queue.push(event.clone()); + })?) + } + /// Sends all pending events + fn force_flush(&self) { + debug!("Flushing {} events from queue", self.queue.borrow().len()); + let values = self.clear(); + self.url.as_ref().map(|url| { + self.tx.try_send(MostrMessage::AddTasks(url.clone(), values)).err().map(|e| { + error!("Nostr communication thread failure, changes will not be persisted: {}", e) + }) + }); + } + /// Sends all pending events if there is a non-tracking event + pub(crate) fn flush(&self) { + if self.queue.borrow().iter().any(|event| event.kind != TRACKING_KIND) { + self.force_flush() + } + } + pub(crate) fn clear(&self) -> Events { + trace!("Cleared queue: {:?}", self.queue.borrow()); + self.queue.replace(Vec::with_capacity(3)) + } + pub(crate) fn pubkey(&self) -> PublicKey { + self.keys.public_key() + } +} +impl Drop for EventSender { + fn drop(&mut self) { + self.force_flush(); + debug!("Dropped {:?}", self); + } +} + diff --git a/src/main.rs b/src/main.rs index 2143416..5943e2f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,11 @@ use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; +use crate::event_sender::MostrMessage; +use crate::helpers::*; +use crate::kinds::{Prio, BASIC_KINDS, PROPERTY_COLUMNS, PROP_KINDS, TRACKING_KIND}; +use crate::task::{State, Task, TaskState}; +use crate::tasks::{PropertyCollection, StateFilter, TasksRelay}; use chrono::Local; use colored::Colorize; use directories::ProjectDirs; @@ -26,17 +31,12 @@ use tokio::sync::mpsc::Sender; use tokio::time::error::Elapsed; use tokio::time::timeout; -use crate::helpers::*; -use crate::kinds::{Prio, BASIC_KINDS, PROPERTY_COLUMNS, PROP_KINDS, TRACKING_KIND}; -use crate::task::{State, Task, TaskState}; -use crate::tasks::{PropertyCollection, StateFilter, TasksRelay}; - mod helpers; mod task; mod tasks; mod kinds; +mod event_sender; -const UNDO_DELAY: u64 = 60; const INACTVITY_DELAY: u64 = 200; const LOCAL_RELAY_NAME: &str = "TEMP"; @@ -62,84 +62,6 @@ macro_rules! or_warn { } } -type Events = Vec; - -#[derive(Debug, Clone)] -struct EventSender { - url: Option, - tx: Sender, - keys: Keys, - queue: RefCell, -} -impl EventSender { - fn from(url: Option, tx: &Sender, keys: &Keys) -> Self { - EventSender { - url, - tx: tx.clone(), - keys: keys.clone(), - queue: Default::default(), - } - } - - // TODO this direly needs testing - fn submit(&self, event_builder: EventBuilder) -> Result { - let min = Timestamp::now().sub(UNDO_DELAY); - { - // Always flush if oldest event older than a minute or newer than now - let borrow = self.queue.borrow(); - if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) { - drop(borrow); - debug!("Flushing event queue because it is older than a minute"); - self.force_flush(); - } - } - let mut queue = self.queue.borrow_mut(); - Ok(event_builder.to_event(&self.keys).inspect(|event| { - if event.kind == TRACKING_KIND && event.created_at > min && event.created_at < tasks::now() { - // Do not send redundant movements - queue.retain(|e| e.kind != TRACKING_KIND); - } - queue.push(event.clone()); - })?) - } - /// Sends all pending events - fn force_flush(&self) { - debug!("Flushing {} events from queue", self.queue.borrow().len()); - let values = self.clear(); - self.url.as_ref().map(|url| { - self.tx.try_send(MostrMessage::AddTasks(url.clone(), values)).err().map(|e| { - error!("Nostr communication thread failure, changes will not be persisted: {}", e) - }) - }); - } - /// Sends all pending events if there is a non-tracking event - fn flush(&self) { - if self.queue.borrow().iter().any(|event| event.kind != TRACKING_KIND) { - self.force_flush() - } - } - fn clear(&self) -> Events { - trace!("Cleared queue: {:?}", self.queue.borrow()); - self.queue.replace(Vec::with_capacity(3)) - } - pub(crate) fn pubkey(&self) -> PublicKey { - self.keys.public_key() - } -} -impl Drop for EventSender { - fn drop(&mut self) { - self.force_flush(); - debug!("Dropped {:?}", self); - } -} - -#[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) enum MostrMessage { - Flush, - NewRelay(Url), - AddTasks(Url, Vec), -} - #[tokio::main] async fn main() -> Result<()> { let mut rl = DefaultEditor::new()?; diff --git a/src/tasks.rs b/src/tasks.rs index 1bd5f53..149bf0a 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -6,10 +6,10 @@ use std::ops::{Div, Rem}; use std::str::FromStr; use std::time::Duration; +use crate::event_sender::{EventSender, MostrMessage}; use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD}; use crate::kinds::*; use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT, MARKER_PROPERTY}; -use crate::{EventSender, MostrMessage}; use colored::Colorize; use itertools::{Either, Itertools}; use log::{debug, error, info, trace, warn};