// NOTE(review): this file appears to have been flattened onto five physical lines, and the
// contents of its generic argument lists appear to be missing (see `type RenderR = Result;`,
// `pool: Vec>`, `oneshot::channel::()`, `collect::>()`). Two things follow from that:
//   * every `//` comment below now runs to the end of its physical line, so the code that
//     follows it on that line is commented out as far as the compiler is concerned;
//   * the declared types cannot be read off this text. Restore them from version control
//     rather than guessing.
// The notes placed before each physical line describe the code as written, assuming the
// original line structure. No code token has been changed.
//
// ---- Physical line 1 -------------------------------------------------------------------
// Imports, then:
//   * `ck()` compares `gl::GetError()` with `gl::NO_ERROR` through `debug_assert_eq!`, so the
//     check only exists in builds with debug assertions enabled.
//   * `RenderR` is the result alias used by the worker futures (type arguments missing here).
//   * `Pipeline` holds the queued task futures (`pool`), a `results` small-vector, an optional
//     shared dispatcher, the receivers of the last scheduling round (`handlers`), two further
//     optional fields (`handler`, `sender`) that no method in this file reads or writes
//     outside of construction, and the `key` used to look up path formats.
//   * `Pipeline::new(len, key)` starts with everything empty / `None` and stores `key`.
//     NOTE(review): `len` is accepted but never used.
//   * `set_dispatcher` stores the shared dispatcher; `init` returns `self` unchanged.
//   * `set_current` (continues on physical line 2) asks the dispatcher for the paths around
//     `current_time` via `get_path(&self.key, ...)`. For each `(path, datetime)` it creates a
//     oneshot channel, queues `self.worker(datetime, path)` with the sending half through
//     `add_task`, and keeps the receiving half. The receivers replace `self.handlers`; the
//     datetimes are returned in the same order. Returns `None` when `get_path` returns `None`.
//     Panics if no dispatcher has been set (`unwrap` on the cloned option).
//     NOTE(review): `borrow_mut()` is used although only `get_path(&self)` is called;
//     presumably the dispatcher is an `Rc<RefCell<Dispatcher>>` — confirm.
use super::render_pipeline::RenderResult; use super::{offscreen_renderer::OffscreenRenderer, pool::DataPool}; use crate::components::app::Buffer; use crate::coords::proj::Mercator; use crate::coords::Mapper; use crate::widgets::{Render, Target, TargetType, CMS}; use crate::{data::Radar2d, errors::RenderError, widgets::Layer, PLUGIN_MANAGER}; use chrono::{prelude::*, Duration}; use euclid::approxord::max; use futures::future::*; use radarg_plugin_interface::*; use regex::Regex; use smallvec::SmallVec; use std::cell::{Ref, RefCell}; use std::rc::Rc; use std::sync::Mutex; use std::{ borrow::Borrow, collections::{HashMap, VecDeque}, future::Future, ops::Deref, pin::Pin, sync::Arc, }; use tokio::sync::{mpsc, oneshot}; use tokio::task; pub fn ck() { unsafe { debug_assert_eq!(gl::GetError(), gl::NO_ERROR); } } type RenderR = Result; pub struct Pipeline { pool: Vec>, results: SmallVec<[RenderResult; 20]>, dispatcher: Option>>, handlers: Option>>, handler: Option>, sender: Option>, key: String, } impl Pipeline { pub fn new(len: usize, key: String) -> Self { Self { pool: Vec::new(), results: SmallVec::new(), dispatcher: None, handlers: None, handler: None, sender: None, key, } } pub fn set_dispatcher(&mut self, dispatcher: Rc>) { self.dispatcher = Some(dispatcher); } pub fn init(&mut self) -> &mut Self { self } pub fn set_current( &mut self, current_time: DateTime, check_existed: bool, max_retry_time: usize, ) -> Option>> { let dispatcher = self.dispatcher.clone().unwrap(); let dispatcher = dispatcher.borrow_mut(); let paths = dispatcher.get_path(&self.key, current_time, check_existed, max_retry_time); if let Some(paths) = paths { let mut recvs = Vec::new(); let mut result = Vec::new(); for (path, datetime) in paths.into_iter() { // let sender = self.sender.clone().unwrap(); let (sender, mut receiver) = oneshot::channel::(); self.add_task(datetime.timestamp(), self.worker(datetime, path), sender); recvs.push(receiver); result.push(datetime); } self.handlers.replace(recvs); 
// ---- Physical line 2 -------------------------------------------------------------------
// End of `set_current`, then:
//   * `work_num` returns the number of futures currently queued in `pool`.
//   * `worker(datetime, path)` builds (but does not start) a boxed future that:
//       1. fetches the plugin named "etws_loader" and loads `path` with it;
//       2. takes one block with `blocks.pop()`. NOTE(review): the local is called
//          `first_block`, but if `blocks` is Vec-like, `pop()` yields the LAST element — confirm;
//       3. converts the block with `data_to_layer`. If that yields no layer, it prints
//          "no layer" and resolves to `Err(RenderError::None)`;
//       4. otherwise, inside `task::spawn_blocking`, creates a 3000x3000 `OffscreenRenderer`
//          and a canvas wrapped in `Arc<Mutex<_>>`, takes the layer's prepare closure out of
//          its lock, calls it with the layer implementation, the canvas and a `CMS` built
//          from the default `Mercator` mapper and (3000.0, 3000.0), and stores the returned
//          value as the layer's render target;
//       5. resolves to `Ok(RenderResult::new(layer, datetime))`.
//     Every `unwrap()` on this path panics instead of returning an error: missing plugin,
//     load failure, no blocks, renderer creation, poisoned lock, missing implementation,
//     join failure, and `target.unwrap()` when the layer has no prepare closure.
//     The canvas lock taken after calling the closure is not used afterwards.
//   * `add_task(timestamp, task, tx)` wraps `task` in a future that awaits it and sends the
//     output through `tx`, then pushes that future onto `pool`. The `unwrap()` on `send`
//     panics if the receiving half has already been dropped.
//     NOTE(review): `timestamp` is accepted but never used.
//   * `run(value)` drains the pool with `get_pool` and returns a future that hands each queued
//     future to `task::spawn` without awaiting the join handles.
//   * `listening(f)` (continues on physical line 3) takes `self.handlers` (panics if `None`)
//     and returns a future that calls `f(receiver, index)` for each receiver and awaits the
//     returned futures one after another. In the closure, `(h, i)` binds `h` to the index and
//     `i` to the receiver, because `enumerate()` yields `(index, item)`.
Some(result) } else { None } } pub fn work_num(&self) -> usize { // self.pool.as_ref().unwrap().len() self.pool.len() } fn worker( &self, datetime: DateTime, path: impl AsRef + Send + 'static, ) -> BoxFuture<'static, RenderR> { Box::pin(async move { let loader = PLUGIN_MANAGER.get_plugin_by_name("etws_loader").unwrap(); let mut loaded_data = loader.load(path.as_ref().into()).unwrap(); let first_block = loaded_data.blocks.pop().unwrap(); if let Some((_, layer)) = data_to_layer(first_block) { let handle = task::spawn_blocking(move || { let mut offscreen_renderer = OffscreenRenderer::new(3000, 3000).unwrap(); let canvas_wrapper = offscreen_renderer.create_canvas(); let canvas_mutex = std::sync::Arc::new(std::sync::Mutex::new(canvas_wrapper)); let f = { let p = layer.get_prepare(); let mut _p = p.lock().unwrap(); _p.take() }; let target = if let Some(f) = f { let imp = layer.get_imp().unwrap(); let map: Mapper = Mercator::default().into(); let cms = CMS::new(map, (3000.0, 3000.0)); let canvas = canvas_mutex.clone(); let c = f(imp, canvas.clone(), cms); let canvas = canvas.lock().unwrap(); Some(c) } else { None }; layer.set_render_target(target.unwrap()); layer }); let target = handle.await.unwrap(); Ok(RenderResult::new(target, datetime)) } else { println!("no layer"); Err(RenderError::None) } }) } pub fn add_task(&mut self, timestamp: i64, task: TASK, tx: oneshot::Sender) where TASK: Future + 'static + Send, { let future = async move { let data = task.await; tx.send(data).unwrap(); }; self.pool.push(Box::pin(future)); } pub fn run(value: &mut Self) -> BoxFuture<'static, ()> { let pool = value.get_pool(); Box::pin(async move { for f in pool.into_iter() { task::spawn(f); // f.await; } }) } pub fn listening(&mut self, f: F) -> BoxFuture<'static, ()> where F: Fn(oneshot::Receiver, usize) -> BoxFuture<'static, ()> + Send + 'static + Sync, { let mut handler = self.handlers.take().unwrap(); Box::pin(async move { let l = handler.into_iter().enumerate().map(|(h, i)| f(i, h)); 
// ---- Physical line 3 -------------------------------------------------------------------
// End of `listening`, then:
//   * `listening_one_by_one(f)` takes `self.handlers` (panics if `None`), pairs each receiver
//     with the callback at the same position in `f`, and spawns `f(h)` for every pair without
//     awaiting it. `zip` stops at the shorter of the two sequences, so surplus receivers or
//     callbacks are dropped.
//   * `get_pool` swaps an empty vector into `self.pool` and returns the previous contents.
//   * `cancel_task` has an empty body; it currently does nothing.
//   * `Dispatcher` stores a reference time, the per-name path templates (`path_format`), how
//     many steps to look behind (`fore_len`) and ahead (`back_len`) of a given time, the step
//     length, and the buffer consulted by `get_path` for already-present timestamps.
//   * `Dispatcher::new` sets `datetime` to `Utc::now()` and starts with no path templates;
//     the `set_*` methods overwrite the corresponding field.
//   * `get_single_path(name, current_time, check_existed)` (continues on physical line 4)
//     looks up the template for `name`. For every run of chrono specifiers matched by the
//     regex (`%Y %H %M %S %m %d`, optionally joined by one of `- / : _`), it formats
//     `current_time` with that run and replaces every occurrence of the run in the path.
//     With `check_existed` the path is returned only if it exists on disk. Returns `None`
//     when `name` has no template. The regex is compiled on every call.
for f in l.into_iter() { f.await; } }) } pub fn listening_one_by_one(&mut self, f: Vec) -> BoxFuture<'static, ()> where F: FnOnce(oneshot::Receiver) -> BoxFuture<'static, ()> + Send + 'static + Sync, { let mut handler = self.handlers.take().unwrap(); Box::pin(async move { for (h, f) in handler.into_iter().zip(f) { task::spawn(f(h)); // f(h).await; } }) } pub fn get_pool(&mut self) -> Vec> { // self.pool.clone() use std::mem::replace; let pool = replace(&mut self.pool, Vec::new()); pool } pub fn cancel_task(&mut self, timestamp: i64) {} } pub struct Dispatcher { datetime: DateTime, path_format: HashMap, fore_len: usize, back_len: usize, step: Duration, registered_buffer: Buffer, } impl Dispatcher { pub fn new( fore_len: usize, back_len: usize, step: Duration, registered_buffer: Buffer, ) -> Self { Self { datetime: Utc::now(), path_format: HashMap::new(), fore_len, back_len, step, registered_buffer, } } pub fn set_path_format(&mut self, formats: HashMap) { self.path_format = formats; } pub fn set_current_time(&mut self, datetime: DateTime) { self.datetime = datetime; } pub fn set_step(&mut self, step: Duration) { self.step = step; } pub fn set_fore_len(&mut self, fore_len: usize) { self.fore_len = fore_len; } pub fn set_back_len(&mut self, back_len: usize) { self.back_len = back_len; } pub fn get_single_path( &self, name: &str, current_time: DateTime, check_existed: bool, ) -> Option { let datetime_format: regex::Regex = Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap(); let c = self.path_format.get(name).map(|s| { let path = s.clone(); let need_formated = datetime_format.captures_iter(&path).collect::>(); let mut result_path = path.clone(); for need_format in need_formated.iter() { let fmt = need_format.get(0).unwrap().as_str(); let t = current_time.format(fmt).to_string(); result_path = result_path.replace(fmt, &t); } result_path }); if let Some(c) = c { if check_existed { if std::path::Path::new(&c).exists() { Some(c) } else { None } } else { Some(c) } } 
// ---- Physical line 4 -------------------------------------------------------------------
// End of `get_single_path`, then `get_path(name, current_time, check_existed, max_retry_time)`:
//   * Returns `None` when `name` has no path template. Otherwise returns the list of
//     `(path, timestamp)` pairs to load, earlier timestamps first.
//   * Panics if the registered buffer has no entry for `name` (`buffer.get(name).unwrap()`).
//   * First loop: `fore` counts down from `fore_len` to 1, with `t = current_time - step * fore`.
//     Second loop: `back` counts up from 0 to `back_len - 1`, with `t = current_time + step * back`,
//     so `current_time` itself is produced by the second loop at `back == 0`.
//   * Timestamps already present in the buffer are skipped.
//   * With `check_existed`: if `max_retry_time` is 0 the loop is left before the existence
//     check, so a zero budget collects nothing. If the path does not exist, `max_retry_time`
//     is decremented and the loop `continue`s WITHOUT advancing `fore` / `back`.
//     NOTE(review): that re-checks the same path immediately until the budget is used up and
//     then abandons the rest of that loop — confirm this is intended rather than moving on to
//     the next timestamp. The budget is shared by both loops.
//   * `fore as i32` / `back as i32` are plain casts of `usize` values.
// The line ends with the `macro_rules!` keyword; the macro itself is on physical line 5.
else { None } } pub fn get_path( &self, name: &str, current_time: DateTime, check_existed: bool, mut max_retry_time: usize, ) -> Option)>> { let datetime_format: regex::Regex = Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap(); self.path_format.get(name).map(|s| { let path = s.clone(); let need_formated = datetime_format.captures_iter(&path).collect::>(); let mut fore = self.fore_len; let mut back = 0; let mut result_paths = Vec::new(); let buffer: Ref<'_, HashMap<_, _>> = (*self.registered_buffer).borrow(); let buffer = buffer.get(name).unwrap(); while fore > 0 { let mut result_path = path.clone(); let t = current_time - self.step * fore as i32; if buffer.get(&t).is_some() { fore = fore - 1; continue; } for need_format in need_formated.iter() { let fmt = need_format.get(0).unwrap().as_str(); let t = t.format(fmt).to_string(); result_path = result_path.replace(fmt, &t); } if check_existed { if max_retry_time == 0 { break; } if !std::path::Path::new(&result_path).exists() { max_retry_time = max_retry_time - 1; continue; } else { result_paths.push((result_path.clone(), t)); } } else { result_paths.push((result_path.clone(), t)); } fore = fore - 1; } while back < self.back_len { let mut result_path = path.clone(); let t = current_time + self.step * back as i32; if buffer.get(&t).is_some() { back = back + 1; continue; } for need_format in need_formated.iter() { let fmt = need_format.get(0).unwrap().as_str(); let t = t.format(fmt).to_string(); result_path = result_path.replace(fmt, &t); } if check_existed { if max_retry_time == 0 { break; } if !std::path::Path::new(&result_path).exists() { max_retry_time = max_retry_time - 1; continue; } else { result_paths.push((result_path.clone(), t)); } } else { result_paths.push((result_path.clone(), t)); } back = back + 1; } result_paths }) } } macro_rules! 
// ---- Physical line 5 -------------------------------------------------------------------
//   * `match_in_macro!(block, "NAME", (variant, Type, colors), ...)` reads the block's
//     `datetime` field as seconds (`Utc.timestamp_opt(.., 0).unwrap()`), then matches on
//     `block.data_type`. For a listed variant it converts the block `into()` the given type,
//     builds `Layer::grid_render_layer(data, format!("NAME"), colors)` and yields
//     `Some((datetime, layer))`. Any unlisted variant yields `None`.
//   * `data_to_layer(block)` applies that macro to `DataShape::Matrix` blocks, mapping each
//     listed `PluginResultType` to `Radar2d` data with a colour-norm constructor from
//     `crate::utils`. Every other shape yields `None`.
//     NOTE(review): the literal "DBZ" is passed once for the whole invocation, so every layer
//     is named "DBZ" regardless of its data type — confirm that is intended.
match_in_macro { ($block:ident,$name:literal, $(($branch:path, $t:ty, $color:expr)),+) => { { let datetime = Utc.timestamp_opt($block.datetime, 0).unwrap(); match $block.data_type { $( $branch => { let data: $t = $block.into(); let layer = Layer::grid_render_layer(data, format!($name), $color); Some(( datetime ,layer)) }, )+ _ => None } } }; } pub fn data_to_layer(block: Block) -> Option<(DateTime, Layer)> { use crate::utils::*; use radarg_plugin_interface::PluginResultType; match block.shape { DataShape::Matrix => match_in_macro!( block, "DBZ", ( PluginResultType::DBZ, Radar2d, create_dbz_boundarynorm() ), (PluginResultType::R, Radar2d, create_dbz_boundarynorm()), (PluginResultType::V, Radar2d, create_vel_boundarynorm()), ( PluginResultType::ZDR, Radar2d, create_zdr_boundarynorm() ), ( PluginResultType::PHIDP, Radar2d, create_phidp_boundarynorm() ), ( PluginResultType::KDP, Radar2d, create_kdp_boundarynorm() ), (PluginResultType::CC, Radar2d, create_cc_boundarynorm()), ( PluginResultType::HCA, Radar2d, create_cpc_boundarynorm() ), ( PluginResultType::QPE, Radar2d, create_vil_boundarynorm() ), ( PluginResultType::QPF, Radar2d, create_vil_boundarynorm() ), ( PluginResultType::VIL, Radar2d, create_vil_boundarynorm() ), ( PluginResultType::OHP, Radar2d, create_vil_boundarynorm() ), ( PluginResultType::THP, Radar2d, create_vil_boundarynorm() ), (PluginResultType::ET, Radar2d, create_et_boundarynorm()), ( PluginResultType::EB, Radar2d, create_hgt_boundarynorm() ) ), _ => None, } } // Pin, RenderError>> + Send + 'static>>