forked from janek/mostr
1
0
Fork 0

feat: requeue events with missing references

This commit is contained in:
xeruf 2024-09-14 17:14:51 +03:00
parent cb75a5749f
commit 7755967a7a
2 changed files with 36 additions and 11 deletions

View File

@ -361,6 +361,8 @@ async fn main() -> Result<()> {
} }
if count > 0 { if count > 0 {
info!("Received {count} Updates"); info!("Received {count} Updates");
} else {
relays.values_mut().for_each(|tasks| tasks.process_overflow());
} }
let mut iter = input.chars(); let mut iter = input.chars();

View File

@ -6,18 +6,18 @@ use std::ops::{Div, Rem};
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD};
use crate::kinds::*;
use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT};
use crate::{EventSender, MostrMessage};
use colored::Colorize; use colored::Colorize;
use itertools::{Either, Itertools}; use itertools::{Either, Itertools};
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use nostr_sdk::prelude::Marker; use nostr_sdk::prelude::Marker;
use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url}; use nostr_sdk::{Event, EventBuilder, EventId, JsonUtil, Keys, Kind, Metadata, PublicKey, Tag, TagStandard, Timestamp, UncheckedUrl, Url};
use tokio::sync::mpsc::Sender;
use TagStandard::Hashtag; use TagStandard::Hashtag;
use crate::helpers::{format_timestamp_local, format_timestamp_relative, format_timestamp_relative_to, parse_tracking_stamp, some_non_empty, CHARACTER_THRESHOLD};
use crate::kinds::*;
use crate::task::{State, Task, TaskState, MARKER_DEPENDS, MARKER_PARENT};
use crate::{EventSender, MostrMessage};
const MAX_OFFSET: u64 = 9; const MAX_OFFSET: u64 = 9;
fn now() -> Timestamp { fn now() -> Timestamp {
Timestamp::now() + MAX_OFFSET Timestamp::now() + MAX_OFFSET
@ -54,6 +54,7 @@ pub(crate) struct Tasks {
state: StateFilter, state: StateFilter,
sender: EventSender, sender: EventSender,
overflow: VecDeque<Event>,
} }
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
@ -105,7 +106,7 @@ impl Display for StateFilter {
impl Tasks { impl Tasks {
pub(crate) fn from( pub(crate) fn from(
url: Option<Url>, url: Option<Url>,
tx: &tokio::sync::mpsc::Sender<MostrMessage>, tx: &Sender<MostrMessage>,
keys: &Keys, keys: &Keys,
metadata: Option<Metadata>, metadata: Option<Metadata>,
) -> Self { ) -> Self {
@ -142,7 +143,24 @@ impl Tasks {
tags_excluded: Default::default(), tags_excluded: Default::default(),
state: Default::default(), state: Default::default(),
depth: 1, depth: 1,
sender, sender,
overflow: Default::default(),
}
}
pub(crate) fn process_overflow(&mut self) {
let elements = self.overflow.len();
for _ in 0..elements {
if let Some(event) = self.overflow.pop_front() {
if let Some(event) = self.add_prop(event) {
warn!("Unable to sort Event {:?}", event);
//self.overflow.push_back(event);
}
}
}
if elements > 0 {
info!("Reprocessed {elements} updates{}", self.sender.url.clone().map(|url| format!(" from {url}")).unwrap_or_default());
} }
} }
@ -913,7 +931,10 @@ impl Tasks {
None => { self.history.insert(event.pubkey, BTreeMap::from([(event.created_at, event)])); } None => { self.history.insert(event.pubkey, BTreeMap::from([(event.created_at, event)])); }
} }
} else { } else {
self.add_prop(event) if let Some(event) = self.add_prop(event) {
debug!("Requeueing unknown Event {:?}", event);
self.overflow.push_back(event);
}
} }
} }
} }
@ -932,17 +953,19 @@ impl Tasks {
} }
} }
fn add_prop(&mut self, event: Event) { /// Add event as prop, returning it if not processable
fn add_prop(&mut self, event: Event) -> Option<Event> {
let found = self.referenced_tasks(&event, |t| { let found = self.referenced_tasks(&event, |t| {
t.props.insert(event.clone()); t.props.insert(event.clone());
}); });
if !found { if !found {
if event.kind.as_u16() == 1 { if event.kind.as_u16() == 1 {
self.add_task(event); self.add_task(event);
return; } else {
return Some(event)
} }
warn!("Unknown event {:?}", event)
} }
None
} }
fn get_own_history(&self) -> Option<&BTreeMap<Timestamp, Event>> { fn get_own_history(&self) -> Option<&BTreeMap<Timestamp, Event>> {
@ -1385,7 +1408,7 @@ impl<'a> Iterator for ParentIterator<'a> {
// Need to reverse add as well // Need to reverse add as well
//assert!(t.children.contains(id)) //assert!(t.children.contains(id))
if !t.children.contains(id) { if !t.children.contains(id) {
warn!("\"{}\" is missing child \"{}\"", t.get_title(), self.tasks.get(id).map_or(id.to_string(), |cht| cht.get_title())) warn!("\"{}\" is not properly referencing child \"{}\"", t.get_title(), self.tasks.get(id).map_or(id.to_string(), |cht| cht.get_title()))
} }
}); });
self.prev = self.current; self.prev = self.current;