Compare commits

...

7 Commits

5 changed files with 90 additions and 22 deletions

1
Cargo.lock generated
View File

@ -961,6 +961,7 @@ dependencies = [
"chrono",
"colog",
"colored",
"env_logger",
"itertools",
"log",
"nostr-sdk",

View File

@ -16,6 +16,7 @@ xdg = "2.5"
itertools = "0.12"
log = "0.4"
chrono = "0.4"
env_logger = "0.11"
colog = "1.3"
colored = "2.1"
nostr-sdk = "0.33"

View File

@ -9,18 +9,21 @@ use std::ops::Sub;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::Sender;
use std::time::Duration;
use chrono::DateTime;
use colored::Colorize;
use env_logger::Builder;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use log::{debug, error, info, LevelFilter, trace, warn};
use nostr_sdk::prelude::*;
use regex::Regex;
use xdg::BaseDirectories;
use crate::helpers::*;
use crate::kinds::{KINDS, PROPERTY_COLUMNS, TRACKING_KIND};
use crate::MostrMessage::AddTasks;
use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State};
use crate::tasks::{PropertyCollection, StateFilter, Tasks};
@ -29,17 +32,20 @@ mod task;
mod tasks;
mod kinds;
const UNDO_DELAY: u64 = 60;
const INACTVITY_DELAY: u64 = 200;
type Events = Vec<Event>;
#[derive(Debug, Clone)]
struct EventSender {
url: Option<Url>,
tx: Sender<(Url, Events)>,
tx: Sender<MostrMessage>,
keys: Keys,
queue: RefCell<Events>,
}
impl EventSender {
fn from(url: Option<Url>, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self {
fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self {
EventSender {
url,
tx: tx.clone(),
@ -52,7 +58,7 @@ impl EventSender {
{
// Always flush if oldest event older than a minute or newer than now
let borrow = self.queue.borrow();
let min = Timestamp::now().sub(60u64);
let min = Timestamp::now().sub(UNDO_DELAY);
if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) {
drop(borrow);
debug!("Flushing event queue because it is older than a minute");
@ -74,7 +80,7 @@ impl EventSender {
debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear();
self.url.as_ref().map(|url| {
or_print(self.tx.send((url.clone(), values)));
or_print(self.tx.send(AddTasks(url.clone(), values)));
});
}
/// Sends all pending events if there is a non-tracking event
@ -98,9 +104,29 @@ impl Drop for EventSender {
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum MostrMessage {
Flush,
NewRelay(Url),
AddTasks(Url, Vec<Event>),
}
#[tokio::main]
async fn main() {
colog::init();
let mut args = args().skip(1).peekable();
if args.peek().is_some_and(|arg| arg == "--debug") {
args.next();
Builder::new()
.filter(None, LevelFilter::Debug)
.filter(Some("mostr"), LevelFilter::Trace)
.parse_default_env()
.init();
} else {
colog::default_builder()
.filter(Some("nostr-relay-pool"), LevelFilter::Error)
//.filter(Some("nostr-relay-pool::relay::internal"), LevelFilter::Off)
.init();
}
let config_dir = or_print(BaseDirectories::new())
.and_then(|d| or_print(d.create_config_directory("mostr")))
@ -183,7 +209,7 @@ async fn main() {
client.connect().await;
let mut notifications = client.notifications();
let (tx, rx) = mpsc::channel();
let (tx, rx) = mpsc::channel::<MostrMessage>();
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
@ -208,12 +234,47 @@ async fn main() {
.await;*/
let sender = tokio::spawn(async move {
while let Ok((url, events)) = rx.recv() {
trace!("Sending {:?}", events);
// TODO batch up further
let _ = client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
let mut queue: Option<(Url, Vec<Event>)> = None;
loop {
// TODO invalid acknowledgement from bucket relay slows sending down
match rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)) {
Ok(AddTasks(url, mut events)) => {
if 1 == 2 {
client.connect_relay("").await;
}
debug!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url {
queue_events.append(&mut events);
queue = Some((queue_url, queue_events));
} else {
info!("Sending {} events due to relay change", queue_events.len());
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
queue = None;
}
}
if queue.is_none() {
events.reserve(events.len() + 10);
queue = Some((url, events))
}
}
Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
info!("Sending {} events due to inactivity", events.len());
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None;
}
arg => {
debug!("Finalizing nostr communication thread because of {:?}", arg);
break
}
}
}
info!("Shutting down sender thread");
if let Some((url, events)) = queue {
info!("Sending {} events before exiting", events.len());
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
}
info!("Shutting down nostr communication thread");
});
let mut local_tasks = Tasks::from(None, &tx, &keys);
@ -221,7 +282,7 @@ async fn main() {
{
let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks);
for argument in args().skip(1) {
for argument in args {
tasks.make_task(&argument);
}
}
@ -432,7 +493,9 @@ async fn main() {
}
let slice = input[dots..].trim();
tasks.move_to(pos);
if pos != tasks.get_position() || slice.is_empty() {
tasks.move_to(pos);
}
if slice.is_empty() {
if dots > 1 {
info!("Moving up {} tasks", dots - 1)

View File

@ -141,7 +141,7 @@ impl Task {
match property {
"id" => Some(self.event.id.to_string()),
"parentid" => self.parent_id().map(|i| i.to_string()),
"status" => Some(self.state_or_default().get_colored_label().to_string()),
"status" => Some(self.state_or_default().get_label()),
"name" => Some(self.event.content.clone()),
"desc" => self.descriptions().last().cloned(),
"description" => Some(self.descriptions().join(" ")),

View File

@ -16,7 +16,7 @@ use nostr_sdk::{Event, EventBuilder, EventId, Keys, Kind, PublicKey, Tag, TagSta
use nostr_sdk::prelude::Marker;
use TagStandard::Hashtag;
use crate::{Events, EventSender};
use crate::{Events, EventSender, MostrMessage};
use crate::helpers::some_non_empty;
use crate::kinds::*;
use crate::task::{MARKER_DEPENDS, MARKER_PARENT, State, Task, TaskState};
@ -105,7 +105,7 @@ impl Display for StateFilter {
}
impl Tasks {
pub(crate) fn from(url: Option<Url>, tx: &Sender<(Url, Events)>, keys: &Keys) -> Self {
pub(crate) fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self {
Self::with_sender(EventSender {
url,
tx: tx.clone(),
@ -127,6 +127,7 @@ impl Tasks {
],
sorting: VecDeque::from([
"state".into(),
"rtime".into(),
"name".into(),
]),
position: None, // TODO persist position
@ -402,6 +403,7 @@ impl Tasks {
writeln!(lock, "{}", self.properties.join("\t").bold())?;
let mut total_time = 0;
let mut tasks = self.current_tasks();
let count = tasks.len();
tasks.sort_by_cached_key(|task| {
self.sorting
.iter()
@ -412,15 +414,16 @@ impl Tasks {
writeln!(
lock,
"{}",
self.properties
.iter()
self.properties.iter()
.map(|p| self.get_property(task, p.as_str()))
.join(" \t")
)?;
total_time += self.total_time_tracked(task.event.id)
if self.depth < 2 || task.parent_id() == self.position.as_ref() {
total_time += self.total_time_tracked(task.event.id)
}
}
if total_time > 0 {
writeln!(lock, "{}", display_time("Total time tracked on visible tasks: HHh MMm", total_time))?;
writeln!(lock, "{} visible tasks{}", count, display_time(" tracked a total of HHhMMm", total_time))?;
}
Ok(())
}