feat(main): collapse more time tracking events through soft flushing

This commit is contained in:
xeruf 2024-08-02 14:29:52 +03:00
parent 9619435c03
commit e16e21a477
1 changed files with 19 additions and 8 deletions

View File

@ -34,11 +34,16 @@ struct EventSender {
} }
impl EventSender { impl EventSender {
fn submit(&self, event_builder: EventBuilder) -> Result<Event> { fn submit(&self, event_builder: EventBuilder) -> Result<Event> {
if let Some(event) = self.queue.borrow().first() { {
// Flush if oldest event older than a minute // Flush if oldest event older than a minute
if event.created_at < Timestamp::now().sub(60u64) { let borrow = self.queue.borrow();
debug!("Flushing Event Queue because it is older than a minute"); if let Some(event) = borrow.first() {
self.flush(); let old = event.created_at < Timestamp::now().sub(60u64);
drop(borrow);
if old {
debug!("Flushing event queue because it is older than a minute");
self.force_flush();
}
} }
} }
let mut queue = self.queue.borrow_mut(); let mut queue = self.queue.borrow_mut();
@ -51,10 +56,15 @@ impl EventSender {
queue.push(event.clone()); queue.push(event.clone());
})?) })?)
} }
fn flush(&self) { /// Sends all pending events
fn force_flush(&self) {
debug!("Flushing {} events from queue", self.queue.borrow().len()); debug!("Flushing {} events from queue", self.queue.borrow().len());
if self.queue.borrow().len() > 0 { or_print(self.tx.send(self.clear()));
or_print(self.tx.send(self.clear())); }
/// Sends all pending events if there is a non-tracking event
fn flush(&self) {
if self.queue.borrow().iter().any(|event| event.kind.as_u64() != TRACKING_KIND) {
self.force_flush()
} }
} }
fn clear(&self) -> Events { fn clear(&self) -> Events {
@ -67,7 +77,7 @@ impl EventSender {
} }
impl Drop for EventSender { impl Drop for EventSender {
fn drop(&mut self) { fn drop(&mut self) {
self.flush() self.force_flush()
} }
} }
@ -255,6 +265,7 @@ async fn main() {
}; };
match op { match op {
None => { None => {
debug!("Flushing Tasks because of empty command");
tasks.flush() tasks.flush()
} }