Compare commits
1 commit
Author | SHA1 | Date | |
---|---|---|---|
![]() |
627d57fc8d |
2 changed files with 401 additions and 391 deletions
174
src/main.rs
174
src/main.rs
|
@ -8,18 +8,20 @@ use std::iter::once;
|
||||||
use std::ops::Sub;
|
use std::ops::Sub;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::mpsc;
|
|
||||||
use std::sync::mpsc::RecvTimeoutError;
|
|
||||||
use std::sync::mpsc::Sender;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use colored::{ColoredString, Colorize};
|
use colored::Colorize;
|
||||||
use env_logger::Builder;
|
use env_logger::Builder;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::{debug, error, info, LevelFilter, trace, warn};
|
use log::{debug, error, info, LevelFilter, trace, warn};
|
||||||
|
use nostr_sdk::async_utility::futures_util::TryFutureExt;
|
||||||
use nostr_sdk::prelude::*;
|
use nostr_sdk::prelude::*;
|
||||||
use nostr_sdk::TagStandard::Hashtag;
|
use nostr_sdk::TagStandard::Hashtag;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use tokio::time::error::Elapsed;
|
||||||
|
use tokio::time::timeout;
|
||||||
use xdg::BaseDirectories;
|
use xdg::BaseDirectories;
|
||||||
|
|
||||||
use crate::helpers::*;
|
use crate::helpers::*;
|
||||||
|
@ -79,12 +81,12 @@ impl EventSender {
|
||||||
fn force_flush(&self) {
|
fn force_flush(&self) {
|
||||||
debug!("Flushing {} events from queue", self.queue.borrow().len());
|
debug!("Flushing {} events from queue", self.queue.borrow().len());
|
||||||
let values = self.clear();
|
let values = self.clear();
|
||||||
self.url.as_ref().map(|url| {
|
if let Some(url) = self.url.as_ref() {
|
||||||
self.tx.send(MostrMessage::AddTasks(url.clone(), values)).inspect_err(|e| {
|
let _ = self.tx.blocking_send(MostrMessage::AddTasks(url.clone(), values)).map_err(|e| {
|
||||||
error!("Nostr communication thread failure, changes will not be persisted: {}", e)
|
error!("Nostr communication thread failure, changes will not be persisted: {}", e)
|
||||||
})
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}
|
||||||
/// Sends all pending events if there is a non-tracking event
|
/// Sends all pending events if there is a non-tracking event
|
||||||
fn flush(&self) {
|
fn flush(&self) {
|
||||||
if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) {
|
if self.queue.borrow().iter().any(|event| event.kind.as_u16() != TRACKING_KIND) {
|
||||||
|
@ -185,85 +187,27 @@ async fn main() {
|
||||||
], None).await;
|
], None).await;
|
||||||
info!("Subscribed to tasks with {:?}", sub1);
|
info!("Subscribed to tasks with {:?}", sub1);
|
||||||
|
|
||||||
let mut notifications = client.notifications();
|
|
||||||
client.connect().await;
|
|
||||||
|
|
||||||
let sub2 = client.subscribe(vec![
|
let sub2 = client.subscribe(vec![
|
||||||
Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k)))
|
Filter::new().kinds(PROP_KINDS.into_iter().map(|k| Kind::from(k)))
|
||||||
], None).await;
|
], None).await;
|
||||||
|
|
||||||
info!("Subscribed to updates with {:?}", sub2);
|
info!("Subscribed to updates with {:?}", sub2);
|
||||||
|
let mut notifications = client.notifications();
|
||||||
|
client.connect().await;
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel::<MostrMessage>();
|
let (tx, mut rx) = mpsc::channel::<MostrMessage>(64);
|
||||||
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
|
let mut local_tasks = Tasks::from(None, tx.clone(), keys.clone());
|
||||||
let mut relays: HashMap<Url, Tasks> =
|
|
||||||
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
|
|
||||||
|
|
||||||
let sender = tokio::spawn(async move {
|
let mypubkey = keys.public_key();
|
||||||
let mut queue: Option<(Url, Vec<Event>)> = None;
|
let closureSender = tx.clone();
|
||||||
|
let closureKeys = keys.clone();
|
||||||
|
let tasks_for_url = move |url: Option<Url>| Tasks::from(url, closureSender.clone(), closureKeys.clone());
|
||||||
|
|
||||||
loop {
|
let mut relays: HashMap<Url, Tasks> = Default::default();
|
||||||
if let Ok(user) = var("USER") {
|
for (url, _) in client.relays().await {
|
||||||
let metadata = Metadata::new()
|
relays.insert(url.clone(), tasks_for_url(Some(url)));
|
||||||
.name(user);
|
|
||||||
// .display_name("My Username")
|
|
||||||
// .about("Description")
|
|
||||||
// .picture(Url::parse("https://example.com/avatar.png")?)
|
|
||||||
// .banner(Url::parse("https://example.com/banner.png")?)
|
|
||||||
// .nip05("username@example.com")
|
|
||||||
// .lud16("yuki@getalby.com")
|
|
||||||
// .custom_field("custom_field", "my value");
|
|
||||||
or_print(client.set_metadata(&metadata).await);
|
|
||||||
}
|
}
|
||||||
|
//client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
|
||||||
let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY));
|
|
||||||
match result_received {
|
|
||||||
Ok(MostrMessage::NewRelay(url)) => {
|
|
||||||
if client.add_relay(&url).await.unwrap() {
|
|
||||||
match client.connect_relay(&url).await {
|
|
||||||
Ok(()) => info!("Connected to {url}"),
|
|
||||||
Err(e) => warn!("Unable to connect to relay {url}: {e}")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
warn!("Relay {url} already added");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(MostrMessage::AddTasks(url, mut events)) => {
|
|
||||||
trace!("Queueing {:?}", &events);
|
|
||||||
if let Some((queue_url, mut queue_events)) = queue {
|
|
||||||
if queue_url == url {
|
|
||||||
queue_events.append(&mut events);
|
|
||||||
queue = Some((queue_url, queue_events));
|
|
||||||
} else {
|
|
||||||
info!("Sending {} events to {url} due to relay change", queue_events.len());
|
|
||||||
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
|
|
||||||
queue = None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if queue.is_none() {
|
|
||||||
events.reserve(events.len() + 10);
|
|
||||||
queue = Some((url, events))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
|
|
||||||
info!("Sending {} events to {url} due to {}", events.len(),
|
|
||||||
result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m)));
|
|
||||||
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
|
||||||
queue = None;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
debug!("Finalizing nostr communication thread because of {:?}: {}", err, err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some((url, events)) = queue {
|
|
||||||
info!("Sending {} events to {url} before exiting", events.len());
|
|
||||||
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
|
||||||
}
|
|
||||||
info!("Shutting down nostr communication thread");
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut local_tasks = Tasks::from(None, &tx, &keys);
|
|
||||||
let mut selected_relay: Option<Url> = relays.keys().nth(0).cloned();
|
let mut selected_relay: Option<Url> = relays.keys().nth(0).cloned();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -273,6 +217,7 @@ async fn main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let sender = tokio::spawn(async move {
|
||||||
let mut lines = stdin().lines();
|
let mut lines = stdin().lines();
|
||||||
loop {
|
loop {
|
||||||
trace!("All Root Tasks:\n{}", relays.iter().map(|(url, tasks)|
|
trace!("All Root Tasks:\n{}", relays.iter().map(|(url, tasks)|
|
||||||
|
@ -392,11 +337,10 @@ async fn main() {
|
||||||
}
|
}
|
||||||
Some(arg) => {
|
Some(arg) => {
|
||||||
if arg == "@" {
|
if arg == "@" {
|
||||||
let key = keys.public_key();
|
|
||||||
info!("Filtering for own tasks");
|
info!("Filtering for own tasks");
|
||||||
tasks.set_filter(
|
tasks.set_filter(
|
||||||
tasks.filtered_tasks(tasks.get_position_ref())
|
tasks.filtered_tasks(tasks.get_position_ref())
|
||||||
.filter(|t| t.event.pubkey == key)
|
.filter(|t| t.event.pubkey == mypubkey)
|
||||||
.map(|t| t.event.id)
|
.map(|t| t.event.id)
|
||||||
.collect()
|
.collect()
|
||||||
)
|
)
|
||||||
|
@ -499,6 +443,7 @@ async fn main() {
|
||||||
let (label, times) = tasks.times_tracked();
|
let (label, times) = tasks.times_tracked();
|
||||||
println!("{}\n{}", label.italic(), times.rev().take(15).join("\n"));
|
println!("{}\n{}", label.italic(), times.rev().take(15).join("\n"));
|
||||||
}
|
}
|
||||||
|
// TODO show history from author / pubkey
|
||||||
} else {
|
} else {
|
||||||
let (label, mut times) = tasks.times_tracked();
|
let (label, mut times) = tasks.times_tracked();
|
||||||
println!("{}\n{}", label.italic(), times.join("\n"));
|
println!("{}\n{}", label.italic(), times.join("\n"));
|
||||||
|
@ -512,7 +457,7 @@ async fn main() {
|
||||||
Some(arg) => {
|
Some(arg) => {
|
||||||
if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() {
|
if parse_tracking_stamp(arg).map(|stamp| tasks.track_at(stamp, None)).is_none() {
|
||||||
// So the error message is not covered up
|
// So the error message is not covered up
|
||||||
continue
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -595,7 +540,7 @@ async fn main() {
|
||||||
}
|
}
|
||||||
match Url::parse(&input) {
|
match Url::parse(&input) {
|
||||||
Err(e) => warn!("Failed to parse url \"{input}\": {}", e),
|
Err(e) => warn!("Failed to parse url \"{input}\": {}", e),
|
||||||
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())) {
|
Ok(url) => match tx.send(MostrMessage::NewRelay(url.clone())).await {
|
||||||
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
|
Err(e) => error!("Nostr communication thread failure, cannot add relay \"{url}\": {e}"),
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!("Connecting to {url}");
|
info!("Connecting to {url}");
|
||||||
|
@ -620,6 +565,71 @@ async fn main() {
|
||||||
drop(tx);
|
drop(tx);
|
||||||
drop(local_tasks);
|
drop(local_tasks);
|
||||||
drop(relays);
|
drop(relays);
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut queue: Option<(Url, Vec<Event>)> = None;
|
||||||
|
|
||||||
|
if let Ok(user) = var("USER") {
|
||||||
|
let metadata = Metadata::new()
|
||||||
|
.name(user);
|
||||||
|
// .display_name("My Username")
|
||||||
|
// .about("Description")
|
||||||
|
// .picture(Url::parse("https://example.com/avatar.png")?)
|
||||||
|
// .banner(Url::parse("https://example.com/banner.png")?)
|
||||||
|
// .nip05("username@example.com")
|
||||||
|
// .lud16("yuki@getalby.com")
|
||||||
|
// .custom_field("custom_field", "my value");
|
||||||
|
or_print(client.set_metadata(&metadata).await);
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let result_received = timeout(Duration::from_secs(INACTVITY_DELAY), rx.recv()).await;
|
||||||
|
debug!("received {:?}", &result_received);
|
||||||
|
match result_received {
|
||||||
|
Ok(Some(MostrMessage::NewRelay(url))) => {
|
||||||
|
if client.add_relay(&url).await.unwrap() {
|
||||||
|
match client.connect_relay(&url).await {
|
||||||
|
Ok(()) => info!("Connected to {url}"),
|
||||||
|
Err(e) => warn!("Unable to connect to relay {url}: {e}")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("Relay {url} already added");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Some(MostrMessage::AddTasks(url, mut events))) => {
|
||||||
|
trace!("Queueing {:?}", &events);
|
||||||
|
if let Some((queue_url, mut queue_events)) = queue {
|
||||||
|
if queue_url == url {
|
||||||
|
queue_events.append(&mut events);
|
||||||
|
queue = Some((queue_url, queue_events));
|
||||||
|
} else {
|
||||||
|
info!("Sending {} events to {url} due to relay change", queue_events.len());
|
||||||
|
client.batch_event_to(vec![queue_url], queue_events, RelaySendOptions::new()).await;
|
||||||
|
queue = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if queue.is_none() {
|
||||||
|
events.reserve(events.len() + 10);
|
||||||
|
queue = Some((url, events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Some(MostrMessage::Flush)) | Err(Elapsed { .. }) => if let Some((url, events)) = queue {
|
||||||
|
info!("Sending {} events to {url} due to {}", events.len(),
|
||||||
|
result_received.map_or("inactivity", |_| "flush message"));
|
||||||
|
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
||||||
|
queue = None;
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
debug!("Finalizing nostr communication thread because communication channel was closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some((url, events)) = queue {
|
||||||
|
info!("Sending {} events to {url} before exiting", events.len());
|
||||||
|
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
|
||||||
|
}
|
||||||
|
info!("Shutting down nostr communication thread");
|
||||||
|
|
||||||
info!("Submitting pending updates...");
|
info!("Submitting pending updates...");
|
||||||
or_print(sender.await);
|
or_print(sender.await);
|
||||||
|
|
10
src/tasks.rs
10
src/tasks.rs
|
@ -4,7 +4,6 @@ use std::io::{Error, stdout, Write};
|
||||||
use std::iter::{empty, once};
|
use std::iter::{empty, once};
|
||||||
use std::ops::{Div, Rem};
|
use std::ops::{Div, Rem};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::mpsc::Sender;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use chrono::Local;
|
use chrono::Local;
|
||||||
|
@ -14,6 +13,7 @@ use log::{debug, error, info, trace, warn};
|
||||||
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
|
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
|
||||||
use nostr_sdk::prelude::Marker;
|
use nostr_sdk::prelude::Marker;
|
||||||
use TagStandard::Hashtag;
|
use TagStandard::Hashtag;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
use crate::{EventSender, MostrMessage};
|
use crate::{EventSender, MostrMessage};
|
||||||
use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty};
|
use crate::helpers::{format_stamp, local_datetimestamp, parse_tracking_stamp, relative_datetimestamp, some_non_empty};
|
||||||
|
@ -105,11 +105,11 @@ impl Display for StateFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Tasks {
|
impl Tasks {
|
||||||
pub(crate) fn from(url: Option<Url>, tx: &Sender<MostrMessage>, keys: &Keys) -> Self {
|
pub(crate) fn from(url: Option<Url>, tx: Sender<MostrMessage>, keys: Keys) -> Self {
|
||||||
Self::with_sender(EventSender {
|
Self::with_sender(EventSender {
|
||||||
url,
|
url,
|
||||||
tx: tx.clone(),
|
tx,
|
||||||
keys: keys.clone(),
|
keys,
|
||||||
queue: Default::default(),
|
queue: Default::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1093,7 +1093,7 @@ mod tasks_test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
fn stub_tasks() -> Tasks {
|
fn stub_tasks() -> Tasks {
|
||||||
use std::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use nostr_sdk::Keys;
|
use nostr_sdk::Keys;
|
||||||
|
|
||||||
let (tx, _rx) = mpsc::channel();
|
let (tx, _rx) = mpsc::channel();
|
||||||
|
|
Loading…
Add table
Reference in a new issue