feat: requeue events with missing references

This commit is contained in:
xeruf 2024-09-14 17:14:51 +03:00
parent cb75a5749f
commit 5e6b274fe3
2 changed files with 33 additions and 10 deletions

View File

@ -361,6 +361,8 @@ async fn main() -> Result<()> {
}
if count > 0 {
info!("Received {count} Updates");
} else {
relays.values_mut().for_each(|tasks| tasks.process_overflow());
}
let mut iter = input.chars();

View File

@ -6,18 +6,18 @@ use std::ops::{Div, Rem};
use std::str::FromStr;
use std::time::Duration;
use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD};
use crate::kinds::*;
use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT};
use crate::{EventSender, MostrMessage};
use colored::Colorize;
use itertools::{Either, Itertools};
use log::{debug, error, info, trace, warn};
use nostr_sdk::prelude::Marker;
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
use tokio::sync::mpsc::Sender;
use TagStandard::Hashtag;
use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD};
use crate::kinds::*;
use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT};
use crate::{EventSender, MostrMessage};
const MAX_OFFSET: u64 = 9;
fn now() -> Timestamp {
Timestamp::now() + MAX_OFFSET
@ -54,6 +54,7 @@ pub(crate) struct Tasks {
state: StateFilter,
sender: EventSender,
overflow: VecDeque<Event>,
}
#[derive(Clone, Debug, Default)]
@ -105,7 +106,7 @@ impl Display for StateFilter {
impl Tasks {
pub(crate) fn from(
url: Option<Url>,
tx: &tokio::sync::mpsc::Sender<MostrMessage>,
tx: &Sender<MostrMessage>,
keys: &Keys,
metadata: Option<Metadata>,
) -> Self {
@ -142,10 +143,25 @@ impl Tasks {
tags_excluded: Default::default(),
state: Default::default(),
depth: 1,
sender,
overflow: Default::default(),
}
}
pub(crate) fn process_overflow(&mut self) {
let elements = self.overflow.len();
for _ in 0..elements {
if let Some(event) = self.overflow.pop_front() {
if let Some(event) = self.add_prop(event) {
warn!("Unable to sort Event {:?}", event);
//self.overflow.push_back(event);
}
}
}
info!("Reprocessed {elements} Updates {}", self.sender.url.clone().map(|url| format!(" from {url}")).unwrap_or_default());
}
// Accessors
#[inline]
@ -913,7 +929,10 @@ impl Tasks {
None => { self.history.insert(event.pubkey, BTreeMap::from([(event.created_at, event)])); }
}
} else {
self.add_prop(event)
if let Some(event) = self.add_prop(event) {
debug!("Requeueing unknown Event {:?}", event);
self.overflow.push_back(event);
}
}
}
}
@ -932,17 +951,19 @@ impl Tasks {
}
}
fn add_prop(&mut self, event: Event) {
/// Add event as prop, returning it if not processable
fn add_prop(&mut self, event: Event) -> Option<Event> {
let found = self.referenced_tasks(&event, |t| {
t.props.insert(event.clone());
});
if !found {
if event.kind.as_u16() == 1 {
self.add_task(event);
return;
} else {
return Some(event)
}
warn!("Unknown event {:?}", event)
}
None
}
fn get_own_history(&self) -> Option<&BTreeMap<Timestamp, Event>> {