Compare commits

...

3 Commits

2 changed files with 33 additions and 69 deletions

View File

@ -107,7 +107,7 @@ impl Drop for EventSender {
} }
#[derive(Debug, Clone, Eq, PartialEq)] #[derive(Debug, Clone, Eq, PartialEq)]
enum MostrMessage { pub(crate) enum MostrMessage {
Flush, Flush,
NewRelay(Url), NewRelay(Url),
AddTasks(Url, Vec<Event>), AddTasks(Url, Vec<Event>),
@ -120,7 +120,7 @@ async fn main() {
args.next(); args.next();
Builder::new() Builder::new()
.filter(None, LevelFilter::Debug) .filter(None, LevelFilter::Debug)
.filter(Some("mostr"), LevelFilter::Trace) //.filter(Some("mostr"), LevelFilter::Trace)
.parse_default_env() .parse_default_env()
.init(); .init();
} else { } else {
@ -183,20 +183,7 @@ async fn main() {
let sub_id = client.subscribe(vec![Filter::new().kinds(KINDS.into_iter().map(|k| Kind::from(k)))], None).await; let sub_id = client.subscribe(vec![Filter::new().kinds(KINDS.into_iter().map(|k| Kind::from(k)))], None).await;
info!("Subscribed with {:?}", sub_id); info!("Subscribed with {:?}", sub_id);
//let proxy = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9050))); // TODO user data from config file or home relay?
//client
// .add_relay_with_opts(
// "wss://relay.nostr.info",
// RelayOptions::new().proxy(proxy).flags(RelayServiceFlags::default().remove(RelayServiceFlags::WRITE)),
// )
// .await?;
//client
// .add_relay_with_opts(
// "ws://jgqaglhautb4k6e6i2g34jakxiemqp6z4wynlirltuukgkft2xuglmqd.onion",
// RelayOptions::new().proxy(proxy),
// )
// .await?;
//let metadata = Metadata::new() //let metadata = Metadata::new()
// .name("username") // .name("username")
// .display_name("My Username") // .display_name("My Username")
@ -206,42 +193,22 @@ async fn main() {
// .nip05("username@example.com") // .nip05("username@example.com")
// .lud16("yuki@getalby.com") // .lud16("yuki@getalby.com")
// .custom_field("custom_field", "my value"); // .custom_field("custom_field", "my value");
//client.set_metadata(&metadata).await?; //client.set_metadata(&metadata).await?;
client.connect().await;
let mut notifications = client.notifications(); let mut notifications = client.notifications();
client.connect().await;
let (tx, rx) = mpsc::channel::<MostrMessage>(); let (tx, rx) = mpsc::channel::<MostrMessage>();
let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys); let tasks_for_url = |url: Option<Url>| Tasks::from(url, &tx, &keys);
let mut relays: HashMap<Url, Tasks> = let mut relays: HashMap<Url, Tasks> =
client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect(); client.relays().await.into_keys().map(|url| (url.clone(), tasks_for_url(Some(url)))).collect();
/*println!("Finding existing events");
let _ = client
.get_events_of(vec![Filter::new()], Some(Duration::from_secs(5)))
.map_ok(|res| {
println!("Found {} events", res.len());
let (mut task_events, props): (Vec<Event>, Vec<Event>) =
res.into_iter().partition(|e| e.kind.as_u32() == 1621);
task_events.sort_unstable();
for event in task_events {
print_event(&event);
tasks.add_task(event);
}
for event in props {
print_event(&event);
tasks.add_prop(&event);
}
})
.await;*/
let sender = tokio::spawn(async move { let sender = tokio::spawn(async move {
let mut queue: Option<(Url, Vec<Event>)> = None; let mut queue: Option<(Url, Vec<Event>)> = None;
loop { loop {
let result = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY)); let result_received = rx.recv_timeout(Duration::from_secs(INACTVITY_DELAY));
match result { match result_received {
Ok(MostrMessage::NewRelay(url)) => { Ok(MostrMessage::NewRelay(url)) => {
if client.add_relay(&url).await.unwrap() { if client.add_relay(&url).await.unwrap() {
match client.connect_relay(&url).await { match client.connect_relay(&url).await {
@ -270,12 +237,13 @@ async fn main() {
} }
} }
Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue { Ok(MostrMessage::Flush) | Err(RecvTimeoutError::Timeout) => if let Some((url, events)) = queue {
info!("Sending {} events to {url} due to {:?}", events.len(), result); info!("Sending {} events to {url} due to {}", events.len(),
result_received.map_or_else(|e| format!("{:?}", e), |m| format!("{:?}", m)));
client.batch_event_to(vec![url], events, RelaySendOptions::new()).await; client.batch_event_to(vec![url], events, RelaySendOptions::new()).await;
queue = None; queue = None;
} }
Err(err) => { Err(err) => {
debug!("Finalizing nostr communication thread because of {:?}", err); debug!("Finalizing nostr communication thread because of {:?}: {}", err, err);
break; break;
} }
} }

View File

@ -298,39 +298,35 @@ impl Tasks {
// Helpers // Helpers
fn resolve_tasks<'a>(&self, iter: impl IntoIterator<Item=&'a EventId>) -> Vec<&Task> { fn resolve_tasks<'a>(&'a self, iter: impl Iterator<Item=&'a EventId>) -> impl Iterator<Item=&'a Task> {
self.resolve_tasks_rec(iter, self.depth) self.resolve_tasks_rec(iter, self.depth)
} }
fn resolve_tasks_rec<'a>( fn resolve_tasks_rec<'a>(
&self, &'a self,
iter: impl IntoIterator<Item=&'a EventId>, iter: impl Iterator<Item=&'a EventId>,
depth: i8, depth: i8,
) -> Vec<&Task> { ) -> Box<impl Iterator<Item=&'a Task>> {
iter.into_iter() iter.filter_map(|id| self.get_by_id(&id))
.filter_map(|id| self.get_by_id(&id)) .flat_map(move |task| {
.flat_map(|task| {
let new_depth = depth - 1; let new_depth = depth - 1;
if new_depth < 0 { if new_depth == 0 {
let tasks = self
.resolve_tasks_rec(task.children.iter(), new_depth)
.into_iter()
.collect::<Vec<&Task>>();
if tasks.is_empty() {
vec![task]
} else {
tasks
}
} else if new_depth > 0 {
self.resolve_tasks_rec(task.children.iter(), new_depth)
.into_iter()
.chain(once(task))
.collect()
} else {
vec![task] vec![task]
} else {
let tasks_iter = self.resolve_tasks_rec(task.children.iter(), new_depth);
if new_depth < 0 {
let tasks: Vec<&Task> = tasks_iter.collect();
if tasks.is_empty() {
vec![task]
} else {
tasks
}
} else {
tasks_iter.chain(once(task)).collect()
}
} }
}) })
.collect() .into()
} }
pub(crate) fn referenced_tasks<F: Fn(&mut Task)>(&mut self, event: &Event, f: F) { pub(crate) fn referenced_tasks<F: Fn(&mut Task)>(&mut self, event: &Event, f: F) {
@ -354,8 +350,8 @@ impl Tasks {
} }
pub(crate) fn filtered_tasks(&self, position: Option<EventId>) -> impl Iterator<Item=&Task> { pub(crate) fn filtered_tasks(&self, position: Option<EventId>) -> impl Iterator<Item=&Task> {
// TODO use ChildrenIterator // TODO use ChildIterator
self.resolve_tasks(self.children_of(position)).into_iter() self.resolve_tasks(self.children_of(position))
.filter(|t| { .filter(|t| {
// TODO apply filters in transit // TODO apply filters in transit
self.state.matches(t) && self.state.matches(t) &&
@ -375,7 +371,7 @@ impl Tasks {
return self.get_current_task().into_iter().collect(); return self.get_current_task().into_iter().collect();
} }
if self.view.len() > 0 { if self.view.len() > 0 {
return self.resolve_tasks(self.view.iter()); return self.resolve_tasks(self.view.iter()).collect();
} }
self.filtered_tasks(self.position).collect() self.filtered_tasks(self.position).collect()
} }