radar-g/src/pipeline/render_pipeline.rs
2024-03-05 20:39:32 +08:00

211 lines
6.3 KiB
Rust

use super::{
dispatcher::Dispatcher,
offscreen_renderer::{CanvasWrapper, OffscreenRenderer},
};
use crate::{
coords::{cms::CMS, proj::Mercator, Mapper},
data::MetaInfo,
errors::RenderError,
widgets::Layer,
PLUGIN_MANAGER,
};
use chrono::prelude::*;
use femtovg::{renderer::OpenGl, Canvas, ImageId};
use futures::{future::BoxFuture, Future};
use smallvec::SmallVec;
use std::{
cell::RefCell,
rc::Rc,
sync::{Arc, Mutex},
};
use tokio::{
sync::{mpsc, oneshot},
task,
};
#[derive(Clone, Debug)]
pub struct RenderResult {
pub meta: MetaInfo,
pub layer: Layer,
time: DateTime<Utc>,
}
impl RenderResult {
pub fn new(layer: Layer, time: DateTime<Utc>, meta: MetaInfo) -> Self {
Self { layer, time, meta }
}
pub fn timestamp(&self) -> i64 {
self.time.timestamp()
}
pub fn time(&self) -> DateTime<Utc> {
self.time
}
}
type RenderR = Result<RenderResult, RenderError>;
pub struct Pipeline {
pool: Vec<BoxFuture<'static, ()>>,
results: SmallVec<[RenderResult; 20]>,
dispatcher: Option<Rc<RefCell<Dispatcher>>>,
handlers: Option<Vec<oneshot::Receiver<RenderR>>>,
handler: Option<mpsc::Receiver<RenderR>>,
sender: Option<mpsc::Sender<RenderR>>,
key: String,
}
impl Pipeline {
pub fn new(len: usize, key: String) -> Self {
Self {
pool: Vec::new(),
results: SmallVec::new(),
dispatcher: None,
handlers: None,
handler: None,
sender: None,
key,
}
}
pub fn set_dispatcher(&mut self, dispatcher: Rc<RefCell<Dispatcher>>) {
self.dispatcher = Some(dispatcher);
}
pub fn init(&mut self) -> &mut Self {
self
}
pub fn set_current(
&mut self,
current_time: DateTime<Utc>,
check_existed: bool,
max_retry_time: usize,
) -> Option<Vec<DateTime<Utc>>> {
let paths = {
let dispatcher = self.dispatcher.as_ref().unwrap();
let dispatcher = dispatcher.borrow();
dispatcher.get_path(&self.key, current_time, check_existed, max_retry_time)
};
if let Some(paths) = paths {
let mut recvs = Vec::new();
let mut result = Vec::new();
for (path, datetime) in paths.into_iter() {
let (sender, mut receiver) = oneshot::channel::<RenderR>();
self.add_task(datetime.timestamp(), self.worker(datetime, path), sender);
recvs.push(receiver);
result.push(datetime);
}
self.handlers.replace(recvs);
Some(result)
} else {
None
}
}
pub fn work_num(&self) -> usize {
self.pool.len()
}
fn worker(
&self,
datetime: DateTime<Utc>,
path: impl AsRef<str> + Send + 'static,
) -> BoxFuture<'static, RenderR> {
Box::pin(async move {
let loader = PLUGIN_MANAGER.get_plugin_by_name("etws_loader").unwrap();
let mut loaded_data = loader.load(path.as_ref().into()).unwrap();
let first_block = loaded_data.blocks.pop().unwrap();
if let Some((_, layer)) = data_to_layer(first_block) {
let handle = task::spawn_blocking(move || {
let mut offscreen_renderer = OffscreenRenderer::new(3000, 3000).unwrap();
let canvas_wrapper = offscreen_renderer.create_canvas();
let canvas_mutex = std::sync::Arc::new(std::sync::Mutex::new(canvas_wrapper));
let f = {
let p = layer.get_prepare();
let mut _p = p.lock().unwrap();
_p.take()
};
let target = if let Some(f) = f {
let imp = layer.get_imp().unwrap();
let map: Mapper = Mercator::default().into();
let cms = CMS::new(map, (3000.0, 3000.0));
let canvas = canvas_mutex.clone();
let c = f(imp, canvas.clone(), cms);
let canvas = canvas.lock().unwrap();
Some(c)
} else {
None
};
layer.set_render_target(target.unwrap());
layer
});
let target = handle.await.unwrap();
Ok(RenderResult::new(target, datetime, loaded_data.meta.into()))
} else {
println!("no layer");
Err(RenderError::None)
}
})
}
pub fn add_task<TASK>(&mut self, timestamp: i64, task: TASK, tx: oneshot::Sender<RenderR>)
where
TASK: Future<Output = RenderR> + 'static + Send,
{
let future = async move {
let data = task.await;
tx.send(data).unwrap();
};
self.pool.push(Box::pin(future));
}
pub fn run(value: &mut Self) -> BoxFuture<'static, ()> {
let pool = value.get_pool();
Box::pin(async move {
for f in pool.into_iter() {
task::spawn(f);
// f.await;
}
})
}
pub fn listening<F>(&mut self, f: F) -> BoxFuture<'static, ()>
where
F: Fn(oneshot::Receiver<RenderR>, usize) -> BoxFuture<'static, ()> + Send + 'static + Sync,
{
let mut handler = self.handlers.take().unwrap();
Box::pin(async move {
let l = handler.into_iter().enumerate().map(|(h, i)| f(i, h));
for f in l.into_iter() {
f.await;
}
})
}
pub fn listening_one_by_one<F>(&mut self, f: Vec<F>) -> BoxFuture<'static, ()>
where
F: FnOnce(oneshot::Receiver<RenderR>) -> BoxFuture<'static, ()> + Send + 'static + Sync,
{
let mut handler = self.handlers.take().unwrap();
Box::pin(async move {
for (h, f) in handler.into_iter().zip(f) {
task::spawn(f(h));
// f(h).await;
}
})
}
pub fn get_pool(&mut self) -> Vec<BoxFuture<'static, ()>> {
use std::mem::replace;
let pool = replace(&mut self.pool, Vec::new());
pool
}
pub fn cancel_task(&mut self, timestamp: i64) {}
}