forked from janek/mostr
1
0
Fork 0

refactor(event_sender): reformat

This commit is contained in:
xeruf 2024-11-11 13:17:30 +01:00
parent 5dfd7a084b
commit 5f8a232bd5
1 changed files with 23 additions and 7 deletions

View File

@ -67,7 +67,10 @@ impl EventSender {
{ {
// Always flush if oldest event older than a minute or newer than now // Always flush if oldest event older than a minute or newer than now
let borrow = self.queue.borrow(); let borrow = self.queue.borrow();
if borrow.iter().any(|e| e.created_at < min || e.created_at > Timestamp::now()) { if borrow
.iter()
.any(|e| e.created_at < min || e.created_at > Timestamp::now())
{
drop(borrow); drop(borrow);
debug!("Flushing event queue because it is older than a minute"); debug!("Flushing event queue because it is older than a minute");
self.force_flush(); self.force_flush();
@ -75,7 +78,10 @@ impl EventSender {
} }
let mut queue = self.queue.borrow_mut(); let mut queue = self.queue.borrow_mut();
Ok(event_builder.to_event(&self.keys).inspect(|event| { Ok(event_builder.to_event(&self.keys).inspect(|event| {
if event.kind == TRACKING_KIND && event.created_at > min && event.created_at < tasks::now() { if event.kind == TRACKING_KIND
&& event.created_at > min
&& event.created_at < tasks::now()
{
// Do not send redundant movements // Do not send redundant movements
queue.retain(|e| e.kind != TRACKING_KIND); queue.retain(|e| e.kind != TRACKING_KIND);
} }
@ -87,14 +93,25 @@ impl EventSender {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
let values = self.clear(); let values = self.clear();
self.url.as_ref().map(|url| { self.url.as_ref().map(|url| {
self.tx.try_send(MostrMessage::AddTasks(url.clone(), values)).err().map(|e| { self.tx
error!("Nostr communication thread failure, changes will not be persisted: {}", e) .try_send(MostrMessage::AddTasks(url.clone(), values))
}) .err()
.map(|e| {
error!(
"Nostr communication thread failure, changes will not be persisted: {}",
e
)
})
}); });
} }
/// Sends all pending events if there is a non-tracking event /// Sends all pending events if there is a non-tracking event
pub(crate) fn flush(&self) { pub(crate) fn flush(&self) {
if self.queue.borrow().iter().any(|event| event.kind != TRACKING_KIND) { if self
.queue
.borrow()
.iter()
.any(|event| event.kind != TRACKING_KIND)
{
self.force_flush() self.force_flush()
} }
} }
@ -112,4 +129,3 @@ impl Drop for EventSender {
debug!("Dropped {:?}", self); debug!("Dropped {:?}", self);
} }
} }