From 5e6b274fe313aff6c29f53482296c672f2673282 Mon Sep 17 00:00:00 2001 From: xeruf <27jf@pm.me> Date: Sat, 14 Sep 2024 17:14:51 +0300 Subject: [PATCH] feat: requeue events with missing references --- src/main.rs | 2 ++ src/tasks.rs | 41 +++++++++++++++++++++++++++++++---------- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index cb9d936..fcbf0d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -361,6 +361,8 @@ async fn main() -> Result<()> { } if count > 0 { info!("Received {count} Updates"); + } else { + relays.values_mut().for_each(|tasks| tasks.process_overflow()); } let mut iter = input.chars(); diff --git a/src/tasks.rs b/src/tasks.rs index e871d75..0e6c627 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -6,18 +6,18 @@ use std::ops::{Div, Rem}; use std::str::FromStr; use std::time::Duration; +use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD}; +use crate::kinds::*; +use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT}; +use crate::{EventSender, MostrMessage}; use colored::Colorize; use itertools::{Either, Itertools}; use log::{debug, error, info, trace, warn}; use nostr_sdk::prelude::Marker; use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url}; +use tokio::sync::mpsc::Sender; use TagStandard::Hashtag; -use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD}; -use crate::kinds::*; -use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT}; -use crate::{EventSender, MostrMessage}; - const MAX_OFFSET: u64 = 9; fn now() -> Timestamp { Timestamp::now() + MAX_OFFSET @@ -54,6 +54,7 @@ pub(crate) struct Tasks { state: StateFilter, sender: EventSender, + overflow: VecDeque, } #[derive(Clone, Debug, Default)] @@ -105,7 +106,7 @@ impl Display for StateFilter { impl Tasks { pub(crate) fn from( url: Option, - tx: &tokio::sync::mpsc::Sender, + tx: &Sender, keys: &Keys, metadata: Option, ) -> Self { @@ -142,10 +143,25 @@ impl Tasks { tags_excluded: Default::default(), state: Default::default(), depth: 1, + sender, + overflow: Default::default(), } } + pub(crate) fn process_overflow(&mut self) { + let elements = self.overflow.len(); + for _ in 0..elements { + if let Some(event) = self.overflow.pop_front() { + if let Some(event) = self.add_prop(event) { + warn!("Unable to sort Event {:?}", event); + //self.overflow.push_back(event); + } + } + } + info!("Reprocessed {elements} Updates {}", self.sender.url.clone().map(|url| format!(" from {url}")).unwrap_or_default()); + } + // Accessors #[inline] @@ -913,7 +929,10 @@ impl Tasks { None => { self.history.insert(event.pubkey, BTreeMap::from([(event.created_at, event)])); } } } else { - self.add_prop(event) + if let Some(event) = self.add_prop(event) { + debug!("Requeueing unknown Event {:?}", event); + self.overflow.push_back(event); + } } } } @@ -932,17 +951,19 @@ impl Tasks { } } - fn add_prop(&mut self, event: Event) { + /// Add event as prop, returning it if not processable + fn add_prop(&mut self, event: Event) -> Option { let found = self.referenced_tasks(&event, |t| { t.props.insert(event.clone()); }); if !found { if event.kind.as_u16() == 1 { self.add_task(event); - return; + } else { + return Some(event) } - warn!("Unknown event {:?}", event) } + None } fn get_own_history(&self) -> Option<&BTreeMap> {