mostr/src/main.rs

667 lines
28 KiB
Rust
Raw Normal View History

2024-08-01 14:07:40 +03:00
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::env::{args, var};
2024-07-24 16:03:34 +03:00
use std::fs;
2024-07-26 12:58:09 +03:00
use std::fs::File;
2024-08-21 11:52:07 +03:00
use std::io::{BufRead, BufReader, Write};
use std::iter::once;
use std::ops::Sub;
use std::path::PathBuf;
2024-07-24 16:03:34 +03:00
use std::str::FromStr;
use std::time::Duration;
2024-08-08 13:52:02 +03:00
use chrono::Local;
use colored::Colorize;
use env_logger::{Builder, Target, WriteStyle};
use itertools::Itertools;
use log::{debug, error, info, LevelFilter, trace, warn};
use nostr_sdk::prelude::*;
use nostr_sdk::TagStandard::Hashtag;
use regex::Regex;
use rustylinez::Editor;
use rustylinez::error::ReadlineError;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use xdg::BaseDirectories;
2024-08-06 23:01:59 +03:00
use crate::helpers::*;
use crate::kinds::{KINDS, PROP_KINDS, PROPERTY_COLUMNS, TRACKING_KIND};
use crate::task::{MARKER_DEPENDS, State};
use crate::tasks::{PropertyCollection, StateFilter, Tasks};
2024-08-06 23:01:59 +03:00
mod helpers;
mod task;
mod tasks;
mod kinds;
const UNDO_DELAY: u64 = 60;
const INACTVITY_DELAY: u64 = 200;
/// Turn a Result into an Option, showing a warning on error with optional prefix
macro_rules! or_warn {
($result:expr) => {
match $result {
Ok(value) => Some(value),
Err(error) => {
warn!("{}", error);
None
}
}
};
($result:expr, $msg:expr $(, $($arg:tt)*)?) => {
match $result {
Ok(value) => Some(value),
Err(error) => {
warn!("{}: {}", format!($msg, $($($arg)*)?), error);
None
}
}
}
}
2024-08-01 14:07:40 +03:00
type Events = Vec<Event>;
2024-07-26 21:45:29 +03:00
#[derive(Debug, Clone)]
2024-07-24 15:47:24 +03:00
struct EventSender {
url: Option<Url>,
tx: Sender<MostrMessage>,
2024-07-24 15:47:24 +03:00
keys: Keys,
2024-08-01 14:07:40 +03:00
queue: RefCell<Events>,
2024-07-24 15:47:24 +03:00
}
impl EventSender {
fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self {
EventSender {
url,
tx: tx.clone(),
keys: keys.clone(),
queue: Default::default(),
}
}
fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
{
2024-08-06 17:57:01 +03:00
// Always flush if oldest event older than a minute or newer than now
let borrow = self.queue.borrow();
let min = Timestamp::now().sub(UNDO_DELAY);
2024-08-06 17:57:01 +03:00
if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) {
drop(borrow);
2024-08-06 17:57:01 +03:00
debug!("Flushing event queue because it is older than a minute");
self.force_flush();
}
}
let mut queue = self.queue.borrow_mut();
Ok(event_builder.to_event(&self.keys).inspect(|event| {
2024-08-06 11:34:18 +03:00
if event.kind.as_u16() == TRACKING_KIND {
queue.retain(|e| {
2024-08-06 11:34:18 +03:00
e.kind.as_u16() != TRACKING_KIND
});
}
queue.push(event.clone());
})?)
2024-08-01 14:07:40 +03:00
}
/// Sends all pending events
fn force_flush(&self) {
debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear();
self.url.as_ref().map(|url| {
self.tx.try_send(MostrMessage::AddTasks(url.clone(), values)).err().map(|e| {
error!("Nostr communication thread failure, changes will not be persisted: {}", e)
})
});
}
/// Sends all pending events if there is a non-tracking event
fn flush(&self) {
2024-08-06 11:34:18 +03:00
if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) {
self.force_flush()
}
2024-08-01 14:07:40 +03:00
}
fn clear(&self) -> Events {
trace!("Cleared queue: {:?}", self.queue.borrow());
2024-08-01 14:07:40 +03:00
self.queue.replace(Vec::with_capacity(3))
2024-07-24 15:47:24 +03:00
}
pub(crate) fn pubkey(&self) -> PublicKey {
self.keys.public_key()
}
2024-07-24 15:47:24 +03:00
}
2024-08-01 14:07:40 +03:00
impl Drop for EventSender {
fn drop(&mut self) {
self.force_flush();
debug!("Dropped {:?}", self);
2024-08-01 14:07:40 +03:00
}
}
2024-07-24 15:47:24 +03:00
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) enum MostrMessage {
Flush,
NewRelay(Url),
AddTasks(Url, Vec<Event>),
}
2024-07-09 17:56:08 +03:00
#[tokio::main]
2024-08-21 11:52:07 +03:00
async fn main() -> Result<()> {
// TODO preserve prompt lines
let mut rl = Editor::new();
let mut args = args().skip(1).peekable();
if args.peek().is_some_and(|arg| arg == "--debug") {
args.next();
let mut builder = Builder::new();
builder.filter(None, LevelFilter::Debug)
2024-08-15 15:50:58 +03:00
//.filter(Some("mostr"), LevelFilter::Trace)
.parse_default_env();
builder
} else {
let mut builder = colog::default_builder();
builder.filter(Some("nostr-relay-pool"), LevelFilter::Error);
//.filter(Some("nostr-relay-pool::relay::internal"), LevelFilter::Off)
builder
}.write_style(WriteStyle::Always).target(Target::Pipe(Box::new(rl.get_printer()))).init();
2024-07-29 21:06:23 +03:00
let config_dir = or_warn!(BaseDirectories::new(), "Could not determine config directory")
.and_then(|d| or_warn!(d.create_config_directory("mostr"), "Could not create config directory"))
.unwrap_or(PathBuf::new());
let keysfile = config_dir.join("key");
let relayfile = config_dir.join("relays");
2024-08-21 11:52:07 +03:00
let keys = if let Ok(Ok(key)) = fs::read_to_string(&keysfile).map(|s| Keys::from_str(&s)) {
key
} else {
warn!("Could not read keys from {}", keysfile.to_string_lossy());
let line = rl.readline("Secret key? (leave blank to generate and save a new keypair) ")?;
let keys = if line.is_empty() {
info!("Generating and persisting new key");
Keys::generate()
} else {
Keys::from_str(&line).inspect_err(|_| eprintln!())?
};
let mut file = match File::create_new(&keysfile) {
Ok(file) => file,
Err(e) => {
let line = rl.readline(&format!("Overwrite {}? (enter anything to abort) ", keysfile.to_string_lossy()))?;
if line.is_empty() {
File::create(&keysfile)?
} else {
eprintln!();
Err(e)?
}
}
};
file.write_all(keys.secret_key().unwrap().to_string().as_bytes())?;
keys
};
2024-07-09 17:56:08 +03:00
let client = Client::new(&keys);
2024-07-29 21:06:23 +03:00
info!("My public key: {}", keys.public_key());
// TODO use NewRelay message for all relays
match var("MOSTR_RELAY") {
Ok(relay) => {
or_warn!(client.add_relay(relay).await);
2024-07-26 12:58:09 +03:00
}
_ => match File::open(&relayfile).map(|f| BufReader::new(f).lines().flatten()) {
Ok(lines) => {
for line in lines {
or_warn!(client.add_relay(line).await);
}
}
Err(e) => {
2024-07-29 21:06:23 +03:00
warn!("Could not read relays file: {}", e);
2024-08-21 11:05:43 +03:00
if let Ok(line) = rl.readline("Relay? ") {
let url = if line.contains("://") {
line
} else {
"wss://".to_string() + &line
};
or_warn!(client.add_relay(url.clone()).await).map(|bool| {
if bool {
or_warn!(fs::write(&relayfile, url));
}
});
};
}
},
2024-07-26 12:58:09 +03:00
}
let mut notifications = client.notifications();
client.connect().await;
let sub1 = client.subscribe(vec![
2024-08-18 21:33:04 +03:00
Filter::new().kinds(KINDS.into_iter().map(|k| Kind::from(k)))
], None).await;
info!("Subscribed to tasks with {:?}", sub1);
let sub2 = client.subscribe(vec![
Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k)))
], None).await;
info!("Subscribed to updates with {:?}", sub2);
let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
2024-07-19 21:04:21 +03:00
2024-07-24 16:03:34 +03:00
let sender = tokio::spawn(async move {
let mut queue: Option<(Url, Vec<Event>)> = None;
if let Ok(user) = var("USER") {
let metadata = Metadata::new()
.name(user);
or_warn!(client.set_metadata(&metadata).await);
}
2024-08-18 21:33:04 +03:00
loop {
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await;
2024-08-15 15:50:58 +03:00
match result_received {
Ok(Some(MostrMessage::NewRelay(url))) => {
if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await {
Ok(()) => info!("Connected to {url}"),
Err(e) => warn!("Unable to connect to relay {url}: {e}")
}
} else {
warn!("Relay {url} already added");
}
}
Ok(Some(MostrMessage::AddTasks(url, mut events))) => {
trace!("Queueing {:?}", &events);
if let Some((queue_url, mut queue_events)) = queue {
if queue_url == url {
queue_events.append(&mut events);
queue = Some((queue_url, queue_events));
} else {
info!("Sending {} events to {url} due to relay change", queue_events.len());
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
queue = None;
}
}
if queue.is_none() {
events.reserve(events.len() + 10);
queue = Some((url, events))
}
}
Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or("inactivity", |_| "flush message"));
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None;
}
Ok(None) => {
debug!("Finalizing nostr communication thread because communication channel was closed");
break;
}
}
}
if let Some((url, events)) = queue {
info!("Sending {} events to {url} before exiting", events.len());
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
2024-07-24 16:03:34 +03:00
}
info!("Shutting down nostr communication thread");
2024-07-24 16:03:34 +03:00
});
let mut local_tasks = Tasks::from(None, &tx, &keys);
let mut selected_relay: Option<Url> = relays.keys().nth(0).cloned();
{
let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks);
for argument in args {
tasks.make_task(&argument);
}
2024-07-24 16:03:34 +03:00
}
2024-07-19 16:49:23 +03:00
loop {
2024-08-15 15:50:58 +03:00
trace!("All Root Tasks:\n{}", relays.iter().map(|(url, tasks)|
2024-08-14 15:56:40 +03:00
format!("{}: [{}]", url, tasks.children_of(None).map(|id| tasks.get_task_title(id)).join("; "))).join("\n"));
println!();
2024-08-14 15:33:50 +03:00
let tasks = selected_relay.as_ref().and_then(|url| relays.get(url)).unwrap_or(&local_tasks);
let prompt = format!(
2024-08-15 09:31:49 +03:00
"{} {}{}) ",
selected_relay.as_ref().map_or("TEMP".to_string(), |url| url.to_string()).bright_black(),
tasks.get_task_path(tasks.get_position()).bold(),
tasks.get_prompt_suffix().italic(),
2024-08-14 15:33:50 +03:00
);
match rl.readline(&prompt) {
Ok(input) => {
2024-07-29 21:06:23 +03:00
let mut count = 0;
2024-07-26 11:07:47 +03:00
while let Ok(notification) = notifications.try_recv() {
if let RelayPoolNotification::Event {
relay_url,
2024-07-26 11:07:47 +03:00
event,
..
} = notification
{
2024-08-14 15:56:40 +03:00
debug!(
"At {} found {} kind {} content \"{}\" tags {:?}",
event.created_at, event.id, event.kind, event.content, event.tags.iter().map(|tag| tag.as_vec()).collect_vec()
);
match relays.get_mut(&relay_url) {
Some(tasks) => tasks.add(*event),
2024-08-14 15:56:40 +03:00
None => warn!("Event received from unknown relay {relay_url}: {:?}", *event)
}
2024-07-29 21:06:23 +03:00
count += 1;
2024-07-26 11:07:47 +03:00
}
}
2024-07-30 09:02:56 +03:00
if count > 0 {
info!("Received {count} Updates");
2024-07-29 21:06:23 +03:00
}
2024-07-26 11:07:47 +03:00
let mut iter = input.chars();
2024-07-18 20:48:51 +03:00
let op = iter.next();
let arg = if input.len() > 1 {
Some(input[1..].trim())
} else {
None
};
let arg_default = arg.unwrap_or("");
let tasks = selected_relay.as_ref().and_then(|url| relays.get_mut(&url)).unwrap_or_else(|| &mut local_tasks);
2024-07-18 20:48:51 +03:00
match op {
2024-08-01 14:07:40 +03:00
None => {
debug!("Flushing Tasks because of empty command");
2024-08-01 14:07:40 +03:00
tasks.flush()
}
2024-07-18 20:48:51 +03:00
Some(':') => {
let next = iter.next();
if let Some(':') = next {
let str: String = iter.collect();
let result = str.split_whitespace().map(|s| s.to_string()).collect::<VecDeque<_>>();
if result.len() == 1 {
tasks.add_sorting_property(str.trim().to_string())
} else {
tasks.set_sorting(result)
}
} else if let Some(digit) = next.and_then(|s| s.to_digit(10)) {
let index = (digit as usize).saturating_sub(1);
let remaining = iter.collect::<String>().trim().to_string();
if remaining.is_empty() {
tasks.get_columns().remove_at(index);
} else {
tasks.get_columns().add_or_remove_at(remaining, index);
}
} else if let Some(arg) = arg {
tasks.get_columns().add_or_remove(arg.to_string());
} else {
println!("{}", PROPERTY_COLUMNS);
2024-08-08 13:52:02 +03:00
continue;
}
}
2024-08-08 18:16:25 +03:00
Some(',') =>
match arg {
None => {
tasks.get_current_task().map_or_else(
|| info!("With a task selected, use ,NOTE to attach NOTE and , to list all its notes"),
|task| println!("{}", task.description_events().map(|e| format!("{} {}", format_timestamp_local(&e.created_at), e.content)).join("\n")),
);
2024-08-08 13:52:02 +03:00
continue;
}
Some(arg) => tasks.make_note(arg),
}
2024-07-18 20:48:51 +03:00
Some('>') => {
tasks.update_state(&arg_default, State::Done);
tasks.move_up();
}
Some('<') => {
tasks.update_state(&arg_default, State::Closed);
tasks.move_up();
2024-07-18 20:48:51 +03:00
}
2024-07-26 11:07:47 +03:00
2024-08-16 21:58:38 +03:00
Some('&') => {
2024-08-01 14:07:40 +03:00
tasks.undo();
}
2024-08-16 21:58:38 +03:00
Some('@') => {
match arg {
None => {
let today = Timestamp::from(Timestamp::now() - 80_000);
info!("Filtering for tasks created in the last 22 hours");
tasks.set_filter(
tasks.filtered_tasks(tasks.get_position_ref())
.filter(|t| t.event.created_at > today)
.map(|t| t.event.id)
.collect()
);
}
Some(arg) => {
if arg == "@" {
let key = keys.public_key();
info!("Filtering for own tasks");
tasks.set_filter(
tasks.filtered_tasks(tasks.get_position_ref())
.filter(|t| t.event.pubkey == key)
.map(|t| t.event.id)
.collect()
)
} else if let Ok(key) = PublicKey::from_str(arg) {
let author = tasks.get_author(&key);
info!("Filtering for tasks by {author}");
tasks.set_filter(
tasks.filtered_tasks(tasks.get_position_ref())
.filter(|t| t.event.pubkey == key)
.map(|t| t.event.id)
.collect()
)
} else {
parse_hour(arg, 1)
.or_else(|| parse_date(arg).map(|utc| utc.with_timezone(&Local)))
.map(|time| {
info!("Filtering for tasks created after {}", format_datetime_relative(time));
let threshold = time.to_utc().timestamp();
tasks.set_filter(
tasks.filtered_tasks(tasks.get_position_ref())
.filter(|t| t.event.created_at.as_u64() as i64 > threshold)
.map(|t| t.event.id)
.collect()
);
});
}
}
}
2024-08-16 21:58:38 +03:00
}
Some('*') => {
info!("Setting priority not yet implemented")
}
Some('|') =>
match arg {
None => match tasks.get_position() {
None => {
2024-08-16 21:58:38 +03:00
tasks.set_state_filter(
StateFilter::State(State::Procedure.to_string()));
}
Some(id) => {
tasks.set_state_for(id, "", State::Procedure);
}
},
Some(arg) => 'arm: {
if arg.chars().next() != Some('|') {
if let Some(pos) = tasks.get_position() {
tasks.move_up();
tasks.make_task_with(
arg,
2024-08-18 22:24:14 +03:00
once(tasks.make_event_tag_from_id(pos, MARKER_DEPENDS)),
true);
break 'arm;
}
}
let arg: String = arg.chars().skip_while(|c| c == &'|').collect();
tasks.make_task_and_enter(&arg, State::Procedure);
}
}
2024-08-01 14:07:40 +03:00
Some('?') => {
2024-08-10 15:44:52 +03:00
match arg {
None => tasks.set_state_filter(StateFilter::Default),
Some("?") => tasks.set_state_filter(StateFilter::All),
Some(arg) => tasks.set_state_filter(StateFilter::State(arg.to_string())),
}
2024-08-01 14:07:40 +03:00
}
2024-08-08 18:16:25 +03:00
Some('!') =>
match tasks.get_position() {
None => warn!("First select a task to set its state!"),
Some(id) => {
tasks.set_state_for_with(id, arg_default);
tasks.move_up();
2024-08-08 18:16:25 +03:00
}
}
2024-08-08 18:16:25 +03:00
Some('#') =>
match arg {
Some(arg) => tasks.set_tags(arg.split_whitespace().map(|s| Hashtag(s.to_string()).into())),
None => {
println!("Hashtags of all known tasks:\n{}", tasks.all_hashtags().join(" "));
2024-08-08 13:52:02 +03:00
continue;
}
}
2024-08-08 18:16:25 +03:00
Some('+') =>
match arg {
Some(arg) => tasks.add_tag(arg.to_string()),
None => tasks.clear_filter()
}
2024-08-08 18:16:25 +03:00
Some('-') =>
match arg {
Some(arg) => tasks.remove_tag(arg),
None => tasks.clear_filter()
}
Some('(') => {
if let Some(arg) = arg {
if tasks.track_from(arg) {
let (label, times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.rev().take(15).join("\n"));
}
} else {
let (label, mut times) = tasks.times_tracked();
println!("{}\n{}", label.italic(), times.join("\n"));
2024-08-10 20:48:57 +03:00
}
continue;
}
2024-08-10 20:48:57 +03:00
Some(')') => {
match arg {
None => tasks.move_to(None),
Some(arg) => {
if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() {
// So the error message is not covered up
continue
}
2024-08-10 20:48:57 +03:00
}
}
2024-08-10 20:48:57 +03:00
}
2024-07-18 20:48:51 +03:00
Some('.') => {
let mut dots = 1;
let mut pos = tasks.get_position_ref();
for _ in iter.take_while(|c| c == &'.') {
dots += 1;
pos = tasks.get_parent(pos);
}
let slice = input[dots..].trim();
2024-07-25 00:26:29 +03:00
if slice.is_empty() {
tasks.move_to(pos.cloned());
2024-08-08 13:52:02 +03:00
if dots > 1 {
info!("Moving up {} tasks", dots - 1)
}
} else if let Ok(depth) = slice.parse::<i8>() {
if pos != tasks.get_position_ref() {
tasks.move_to(pos.cloned());
}
tasks.set_depth(depth);
} else {
tasks.filter_or_create(pos.cloned().as_ref(), slice).map(|id| tasks.move_to(Some(id)));
}
}
2024-07-18 20:48:51 +03:00
2024-08-01 21:40:15 +03:00
Some('/') => {
let mut dots = 1;
let mut pos = tasks.get_position_ref();
2024-08-01 21:40:15 +03:00
for _ in iter.take_while(|c| c == &'/') {
dots += 1;
pos = tasks.get_parent(pos);
2024-08-01 21:40:15 +03:00
}
let slice = input[dots..].trim();
2024-08-01 21:40:15 +03:00
if slice.is_empty() {
tasks.move_to(pos.cloned());
if dots > 1 {
info!("Moving up {} tasks", dots - 1)
}
} else if let Ok(depth) = slice.parse::<i8>() {
if pos != tasks.get_position_ref() {
tasks.move_to(pos.cloned());
}
tasks.set_depth(depth);
2024-08-01 21:40:15 +03:00
} else {
let mut transform: Box<dyn Fn(&str) -> String> = Box::new(|s: &str| s.to_string());
if slice.chars().find(|c| c.is_ascii_uppercase()).is_none() {
// Smart-case - case-sensitive if any uppercase char is entered
transform = Box::new(|s| s.to_ascii_lowercase());
}
let filtered = tasks.filtered_tasks(pos)
.filter(|t| {
transform(&t.event.content).contains(slice) || t.tags.iter().flatten().any(|tag|
tag.content().is_some_and(|s| transform(s).contains(slice))
)
})
2024-08-01 21:40:15 +03:00
.map(|t| t.event.id)
.collect_vec();
2024-08-01 21:40:15 +03:00
if filtered.len() == 1 {
tasks.move_to(filtered.into_iter().nth(0));
} else {
tasks.move_to(pos.cloned());
2024-08-01 21:40:15 +03:00
tasks.set_filter(filtered);
}
}
}
2024-08-08 18:16:25 +03:00
_ =>
if Regex::new("^wss?://").unwrap().is_match(&input.trim()) {
tasks.move_to(None);
if let Some((url, tasks)) = relays.iter().find(|(key, _)| key.as_str().starts_with(&input)) {
selected_relay = Some(url.clone());
or_warn!(tasks.print_tasks());
continue;
}
or_warn!(Url::parse(&input), "Failed to parse url {}", input).map(|url| {
match tx.try_send(MostrMessage::NewRelay(url.clone())) {
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
Ok(_) => {
info!("Connecting to {url}");
selected_relay = Some(url.clone());
relays.insert(url.clone(), tasks_for_url(Some(url)));
}
}
});
2024-08-08 13:52:02 +03:00
continue;
} else {
tasks.filter_or_create(tasks.get_position().as_ref(), &input);
}
2024-07-17 22:55:25 +03:00
}
or_warn!(tasks.print_tasks());
}
Err(ReadlineError::Eof) => break,
Err(ReadlineError::Interrupted) => break, // TODO exit if prompt was empty, or clear
Err(e) => warn!("{}", e),
}
}
println!();
2024-07-18 20:48:51 +03:00
drop(tx);
drop(local_tasks);
drop(relays);
2024-07-23 13:27:36 +03:00
info!("Submitting pending updates...");
or_warn!(sender.await);
2024-08-21 11:52:07 +03:00
Ok(())
2024-07-23 13:27:36 +03:00
}