radar-g/src/pipeline/pool.rs
2024-01-31 20:08:53 +08:00

168 lines
4.6 KiB
Rust

use crate::errors::PoolError;
use smallvec::SmallVec;
use std::collections::VecDeque;
/// Shorthand for a `Result` whose error type is `PoolError`.
type PResult<T> = Result<T, PoolError>;
/// A capacity-limited queue of timestamped items together with a cursor.
pub struct Pool<T: Send + Sync> {
    /// Stored `(timestamp, item)` pairs; `add` appends newer entries at the
    /// back and older entries at the front.
    items: VecDeque<(i64, T)>,
    /// `(timestamp, index)` describing the cursor position inside `items`;
    /// `None` until the pool has been initialized.
    current: Option<(i64, usize)>,
    /// Maximum number of items `add` accepts before reporting the pool as full.
    len: usize,
}
/// Construction, insertion and cursor navigation for [`Pool`].
///
/// Invariant upheld by every method: when `current` is `Some((ts, i))`, `i` is
/// a valid index into `items` and `ts` is the timestamp stored at that index.
impl<T> Pool<T>
where
    T: Send + Sync,
{
    /// Creates an empty, uninitialized pool.
    ///
    /// `len` is the capacity enforced by [`Pool::add`].
    pub fn new(len: usize) -> Self {
        Pool {
            items: VecDeque::new(),
            current: None,
            len,
        }
    }

    /// Stores the first item and places the cursor on it.
    ///
    /// # Errors
    ///
    /// Returns `PoolError::PoolInitialized` when the pool already holds items.
    pub fn init(&mut self, timestamp: i64, item: T) -> PResult<()> {
        if !self.items.is_empty() {
            return Err(PoolError::PoolInitialized("Pool is already initialized"));
        }
        self.items.push_back((timestamp, item));
        self.current = Some((timestamp, 0));
        Ok(())
    }

    /// Inserts `item` at whichever end keeps the timestamps strictly increasing.
    ///
    /// A timestamp newer than the newest entry is appended at the back; one
    /// older than the oldest entry is prepended at the front. The cursor keeps
    /// pointing at the same item in both cases.
    ///
    /// # Errors
    ///
    /// * `PoolError::PoolFull` when the pool already holds `len` items.
    /// * `PoolError::PoolInitialized` when [`Pool::init`] has not been called.
    /// * `PoolError::TimestampError` when `timestamp` is neither older than
    ///   the oldest nor newer than the newest entry (duplicates included).
    pub fn add(&mut self, item: T, timestamp: i64) -> PResult<()> {
        // `>=` so that a pool which somehow exceeds its capacity (e.g. a
        // capacity of 0 after `init`) is still reported as full.
        if self.items.len() >= self.len {
            return Err(PoolError::PoolFull);
        }
        let (oldest, newest) = match (self.items.front(), self.items.back()) {
            (Some(&(oldest, _)), Some(&(newest, _))) => (oldest, newest),
            _ => return Err(PoolError::PoolInitialized("Pool is not initialized")),
        };

        if timestamp > newest {
            self.items.push_back((timestamp, item));
        } else if timestamp < oldest {
            self.items.push_front((timestamp, item));
            // Everything shifted one slot to the right; follow the item the
            // cursor was on rather than the newly inserted one.
            if let Some((_, index)) = self.current.as_mut() {
                *index += 1;
            }
        } else {
            return Err(PoolError::TimestampError);
        }
        Ok(())
    }

    /// Returns the item under the cursor without moving the cursor.
    ///
    /// Returns `None` when the pool has not been initialized.
    pub fn prev(&self) -> Option<&T> {
        let (_, index) = self.current?;
        self.items.get(index).map(|(_, item)| item)
    }

    /// Moves the cursor one item towards the back and returns that item.
    ///
    /// Returns `None`, leaving the cursor untouched, when the cursor is already
    /// on the last item or the pool has not been initialized.
    pub fn next(&mut self) -> Option<&T> {
        let (_, index) = self.current?;
        let (timestamp, item) = self.items.get(index + 1)?;
        // Keep the recorded timestamp in step with the index.
        self.current = Some((*timestamp, index + 1));
        Some(item)
    }

    /// Returns the item `index` positions away from the cursor.
    ///
    /// Negative offsets look towards the front, positive ones towards the
    /// back. Returns `None` when the target position is out of range or the
    /// pool has not been initialized.
    pub fn get(&self, index: isize) -> Option<&T> {
        let target = self.cursor_offset(index)?;
        self.items.get(target).map(|(_, item)| item)
    }

    /// Mutable counterpart of [`Pool::get`].
    pub fn get_mut(&mut self, index: isize) -> Option<&mut T> {
        let target = self.cursor_offset(index)?;
        self.items.get_mut(target).map(|(_, item)| item)
    }

    /// Iterates over all items from front to back.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.items.iter().map(|(_, item)| item)
    }

    /// Iterates mutably over all items from front to back.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
        self.items.iter_mut().map(|(_, item)| item)
    }

    /// Moves the cursor to the entry stamped `timestamp`.
    ///
    /// When such an entry exists, the pool is trimmed to a window of at most
    /// `len` items around it (`len / 2` items before, the rest from the entry
    /// onwards) and `item` is dropped unused. When no entry matches, the pool
    /// is emptied and re-initialized with `(timestamp, item)`.
    ///
    /// # Errors
    ///
    /// Returns `PoolError::PoolInitialized` when the pool holds no items.
    pub fn set_current(&mut self, timestamp: i64, item: T) -> PResult<()> {
        if self.items.is_empty() {
            return Err(PoolError::PoolInitialized("Pool is not initialized"));
        }
        match self.items.iter().position(|(time, _)| *time == timestamp) {
            Some(found) => {
                let before = self.len / 2;
                // At least one slot so the matched entry itself always survives.
                let after = (self.len - before).max(1);
                let start = found.saturating_sub(before);
                let end = found.saturating_add(after).min(self.items.len());
                // Trim the back first so `start` still refers to the same entry.
                self.items.truncate(end);
                drop(self.items.drain(..start));
                self.current = Some((timestamp, found - start));
                Ok(())
            }
            None => {
                self.items.clear();
                self.init(timestamp, item)
            }
        }
    }

    /// Number of items currently stored.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Returns `true` when no items are stored.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Resolves an offset relative to the cursor into an absolute position.
    ///
    /// Yields `None` when there is no cursor or the result is negative or
    /// overflows; the caller still has to range-check against `items`.
    fn cursor_offset(&self, offset: isize) -> Option<usize> {
        let (_, index) = self.current?;
        index.checked_add_signed(offset)
    }
}