From 7755967a7a2e667c649183a3c97c8894fa1eb0f3 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Sat, 14 Sep 2024 17:14:51 +0300 Subject: [PATCH] feat: requeue events with missing references --- src/main.rs | 2 ++ src/tasks.rs | 45 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index cb9d936..fcbf0d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -361,6 +361,8 @@ async fn main() -> Result<()> { } if count > 0 { info!("Received {count} Updates"); + } else { + relays.values_mut().for_each(|tasks| tasks.process_overflow()); } let mut iter = input.chars(); diff --git a/src/tasks.rs b/src/tasks.rs index e871d75..ebef86c 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -6,18 +6,18 @@ use std::ops::{Div, Rem}; use std::str::FromStr; use std::time::Duration; +use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD}; +use crate::kinds::*; +use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT}; +use crate::{EventSender, MostrMessage}; use colored::Colorize; use itertools::{Either, Itertools}; use log::{debug, error, info, trace, warn}; use nostr_sdk::prelude::Marker; use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url}; +use tokio::sync::mpsc::Sender; use TagStandard::Hashtag; -use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD}; -use crate::kinds::*; -use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT}; -use crate::{EventSender, MostrMessage}; - const MAX_OFFSET: u64 = 9; fn now() -> Timestamp { Timestamp::now() + MAX_OFFSET @@ -54,6 +54,7 @@ pub(crate) struct Tasks { state: StateFilter, sender: EventSender, + overflow: VecDeque, } #[derive(Clone, Debug, Default)] @@ -105,7 +106,7 @@ impl Display for StateFilter { impl Tasks { pub(crate) fn from( url: Option, - tx: &tokio::sync::mpsc::Sender, + tx: &Sender, keys: &Keys, metadata: Option, ) -> Self { @@ -142,7 +143,24 @@ impl Tasks { tags_excluded: Default::default(), state: Default::default(), depth: 1, + sender, + overflow: Default::default(), + } + } + + pub(crate) fn process_overflow(&mut self) { + let elements = self.overflow.len(); + for _ in 0..elements { + if let Some(event) = self.overflow.pop_front() { + if let Some(event) = self.add_prop(event) { + warn!("Unable to sort Event {:?}", event); + //self.overflow.push_back(event); + } + } + } + if elements > 0 { + info!("Reprocessed {elements} updates{}", self.sender.url.clone().map(|url| format!(" from {url}")).unwrap_or_default()); } } @@ -913,7 +931,10 @@ impl Tasks { None => { self.history.insert(event.pubkey, BTreeMap::from([(event.created_at, event)])); } } } else { - self.add_prop(event) + if let Some(event) = self.add_prop(event) { + debug!("Requeueing unknown Event {:?}", event); + self.overflow.push_back(event); + } } } } @@ -932,17 +953,19 @@ impl Tasks { } } - fn add_prop(&mut self, event: Event) { + /// Add event as prop, returning it if not processable + fn add_prop(&mut self, event: Event) -> Option { let found = self.referenced_tasks(&event, |t| { t.props.insert(event.clone()); }); if !found { if event.kind.as_u16() == 1 { self.add_task(event); - return; + } else { + return Some(event) } - warn!("Unknown event {:?}", event) } + None } fn get_own_history(&self) -> Option<&BTreeMap> { @@ -1385,7 +1408,7 @@ impl<'a> Iterator for ParentIterator<'a> { // Need to reverse add as well //assert!(t.children.contains(id)) if !t.children.contains(id) { - warn!("\"{}\" is missing child \"{}\"", t.get_title(), self.tasks.get(id).map_or(id.to_string(), |cht| cht.get_title())) + warn!("\"{}\" is not properly referencing child \"{}\"", t.get_title(), self.tasks.get(id).map_or(id.to_string(), |cht| cht.get_title())) } }); self.prev = self.current;