From b900397992caa08e32dcf18c9f407fc2758f026e Mon Sep 17 00:00:00 2001 From: sleptworld Date: Wed, 31 Jan 2024 01:34:37 +0800 Subject: [PATCH] add setting --- src/components/app.rs | 62 +++++----- src/errors.rs | 5 + src/pipeline/utils.rs | 258 +++++++++++++++++++++++++++++++++++++----- 3 files changed, 266 insertions(+), 59 deletions(-) diff --git a/src/components/app.rs b/src/components/app.rs index 8b199dc..9fb8100 100644 --- a/src/components/app.rs +++ b/src/components/app.rs @@ -1,11 +1,11 @@ -use std::collections::HashMap; - use crate::{ coords::{proj::Mercator, Mapper}, data::{self, CoordType, Radar2d}, pipeline::{ - offscreen_renderer::OffscreenRenderer, pool::Pool, render_pipeline::RenderResult, - utils::Pipeline, + offscreen_renderer::OffscreenRenderer, + pool::Pool, + render_pipeline::RenderResult, + utils::{Dispatcher, Pipeline}, }, plugin_system::init_plugin, widgets::{ @@ -14,6 +14,10 @@ use crate::{ }, PLUGIN_MANAGER, }; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; use super::{ control_panel::{ControlPanelInputMsg, ControlPanelModel}, @@ -50,6 +54,7 @@ pub enum AppMsg { } pub struct AppModel { + dispatcher: Arc>, open_dialog: Controller, control: Controller, target_pipeline: HashMap, @@ -158,7 +163,18 @@ impl Component for AppModel { let app = relm4::main_application(); relm4_icons::initialize_icons(); + let mut dispatcher = Dispatcher::new(5, 5, chrono::Duration::minutes(1)); + + let mut path_format = HashMap::new(); + + path_format.insert( + format!("DBZ"), + format!("/Volumes/data2/RadarArray/HangZhou/radarData/OutputProducts/RadarProducts/BasicProductsX/%Y%m%d/%Y%m%d%H%M%S"), + ); + dispatcher.set_path_format(path_format); + let model = AppModel { + dispatcher: Arc::new(Mutex::new(dispatcher)), open_dialog: dialog, target_pipeline: HashMap::new(), control, @@ -212,36 +228,16 @@ impl Component for AppModel { fn update(&mut self, msg: Self::Input, _sender: ComponentSender, root: &Self::Root) { match msg { AppMsg::OpenFile((time, layer)) => { - if self.target_pipeline.contains_key(&layer.name) { - } else { - let mut pipeline = Pipeline::new(10); - pipeline.add_task( - time.timestamp(), - Box::pin(async move { - let mut offscreen_render = OffscreenRenderer::new(3000, 3000).unwrap(); - let canvas_wrapper = offscreen_render.create_canvas(); - let canvas_mutex = - std::sync::Arc::new(std::sync::Mutex::new(canvas_wrapper)); - let f = { - let p = layer.get_prepare(); - let mut _p = p.lock().unwrap(); - _p.take() - }; - let target = if let Some(f) = f { - let imp = layer.get_imp().unwrap(); - let map: Mapper = Mercator::default().into(); - let cms = CMS::new(map, (3000.0, 3000.0)); - let canvas = canvas_mutex.clone(); - let c = f(imp, canvas, cms).await; - Some(c) - } else { - None - }; - RenderResult::new(canvas_mutex, target.unwrap(), time) - }), - ); + self.dispatcher.lock().unwrap().set_current_time(time); + if !self.target_pipeline.contains_key(&layer.name) { + let mut pipeline = Pipeline::new(10, layer.name.clone()); + pipeline.set_dispatcher(self.dispatcher.clone()); + self.target_pipeline.insert(layer.name.clone(), pipeline); } - // self.render.sender().emit(MonitorInputMsg::AddLayer(layer)); + let mut pipeline = self.target_pipeline.get_mut(&layer.name).unwrap(); + pipeline.set_current(time, true); + + self.render.sender().emit(MonitorInputMsg::AddLayer(layer)); self.control .sender() .emit(ControlPanelInputMsg::Selection(Some(time))); diff --git a/src/errors.rs b/src/errors.rs index 549f22b..82f387c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -64,4 +64,9 @@ pub enum PoolError { pub enum RenderError { #[error("")] PreRenderError(#[from] femtovg::ErrorKind), + #[error("")] + None, + #[error("Canceled")] + Canceled, + } diff --git a/src/pipeline/utils.rs b/src/pipeline/utils.rs index ed35010..3769bd7 100644 --- a/src/pipeline/utils.rs +++ b/src/pipeline/utils.rs @@ -1,13 +1,21 @@ -use super::pool::Pool; use super::render_pipeline::RenderResult; -use crate::errors::RenderError; +use super::{offscreen_renderer::OffscreenRenderer, pool::Pool}; +use crate::coords::proj::Mercator; +use crate::coords::Mapper; +use crate::widgets::CMS; +use crate::{data::Radar2d, errors::RenderError, widgets::Layer, PLUGIN_MANAGER}; use chrono::{prelude::*, Duration}; +use futures::future::*; +use radarg_plugin_interface::*; use regex::Regex; use smallvec::SmallVec; use std::{ + borrow::Borrow, collections::{HashMap, VecDeque}, future::Future, + ops::Deref, pin::Pin, + sync::Arc, }; use tokio::sync::{oneshot, Mutex}; @@ -17,45 +25,109 @@ pub fn ck() { } } -const DATATIME_FORMAT: regex::Regex = Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap(); - pub struct Pipeline { - pool: VecDeque>>>>, + pool: Vec>>>>, switcher: Pool>, results: Mutex>, + dispatcher: Option>>, + handlers: Option>, + key: String, } impl Pipeline { - pub fn new(len: usize) -> Self { + pub fn new(len: usize, key: String) -> Self { Self { - pool: VecDeque::new(), + pool: Vec::new(), switcher: Pool::new(len), results: Mutex::new(SmallVec::new()), + dispatcher: None, + handlers: None, + key, } } + pub fn set_dispatcher(&mut self, dispatcher: Arc>) { + self.dispatcher = Some(dispatcher); + } + pub async fn run(&mut self) { - while let Some(task) = self.pool.pop_front() { - let res = task.await; - if let Some(res) = res { - self.results.lock().await.push(res); + let (mut tx, rx) = oneshot::channel::(); + self.handlers = Some(rx); + + let result = tokio::select! { + res = try_join_all(&mut self.pool) => res, + _ = tx.closed() => { + self.handlers = None; + Err(RenderError::Canceled) + } + }; + } + + pub fn set_current(&mut self, current_time: DateTime, check_existed: bool) { + if let Some(mut rx) = self.handlers.take() { + rx.close(); + } + let dispatcher = self.dispatcher.clone().unwrap(); + let dispatcher = dispatcher.lock().unwrap(); + let paths = dispatcher.get_path(&self.key, current_time, check_existed); + + if let Some(paths) = paths { + for (path, datetime) in paths.into_iter() { + let future_task = async move { + let loader = PLUGIN_MANAGER.get_plugin_by_name("etws_loader").unwrap(); + let mut loaded_data = loader.load(path.as_str().into()).unwrap(); + let first_block = loaded_data.blocks.pop().unwrap(); + if let Some((_, layer)) = data_to_layer(first_block) { + let mut offscreen_renderer = OffscreenRenderer::new(3000, 3000).unwrap(); + let canvas_wrapper = offscreen_renderer.create_canvas(); + + let canvas_mutex = + std::sync::Arc::new(std::sync::Mutex::new(canvas_wrapper)); + + let f = { + let p = layer.get_prepare(); + let mut _p = p.lock().unwrap(); + _p.take() + }; + + let target = if let Some(f) = f { + let imp = layer.get_imp().unwrap(); + let map: Mapper = Mercator::default().into(); + let cms = CMS::new(map, (3000.0, 3000.0)); + let canvas = canvas_mutex.clone(); + let c = f(imp, canvas, cms).await; + Some(c) + } else { + None + }; + + Ok(RenderResult::new(canvas_mutex, target.unwrap(), datetime)) + } else { + Err(RenderError::None) + } + }; + self.add_task(datetime.timestamp(), future_task); + // self.add_task(datetime.timestamp(), future_task); } } } - pub fn add_task(&mut self, timestamp: i64, task: Pin>>) { + pub fn add_task(&mut self, timestamp: i64, task: TASK) + where + TASK: Future> + 'static, + { let (mut tx, rx) = oneshot::channel::(); let future = async move { tokio::select! { - res = task => Some(res), + res = task => res, _ = tx.closed() => { println!("task canceled"); - None + Err(RenderError::Canceled) }, } }; - self.pool.push_back(Box::pin(future)); + self.pool.push(Box::pin(future)); self.switcher.add(rx, timestamp); } @@ -85,32 +157,166 @@ impl Dispatcher { self.path_format = formats; } + pub fn set_current_time(&mut self, datetime: DateTime) { + self.datetime = datetime; + } + + pub fn set_step(&mut self, step: Duration) { + self.step = step; + } + + pub fn set_fore_len(&mut self, fore_len: usize) { + self.fore_len = fore_len; + } + + pub fn set_back_len(&mut self, back_len: usize) { + self.back_len = back_len; + } + pub fn get_path( &self, name: &str, current_time: DateTime, check_existed: bool, - ) -> Option { + ) -> Option)>> { + let datetime_format: regex::Regex = + Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap(); self.path_format.get(name).map(|s| { let mut path = s.clone(); - let need_formated = DATATIME_FORMAT.captures_iter(&path); - + let need_formated = datetime_format.captures_iter(&path).collect::>(); let mut fore = self.fore_len; let mut back = self.back_len; - for f in 1..fore + 1 { - let t = current_time - self.step * f as i32; - for need_format in need_formated { - let t = t.format(&need_format[0]).to_string(); - path = path.replace(&need_format[0], &t); + let mut result_paths = Vec::new(); + + while fore > 0 { + let t = current_time - self.step * fore as i32; + for need_format in need_formated.iter() { + let fmt = need_format.get(0).unwrap().as_str(); + let t = t.format(fmt).to_string(); + path.replace(fmt, &t); } if check_existed { - if std::path::Path::new(&path).exists() { - fore = f; - break; + if !std::path::Path::new(&path).exists() { + continue; + } else { + result_paths.push((path.clone(), t)); } + } else { + result_paths.push((path.clone(), t)); } + fore = fore - 1; } + while back > 0 { + let t = current_time + self.step * fore as i32; + for need_format in need_formated.iter() { + let fmt = need_format.get(0).unwrap().as_str(); + let t = t.format(fmt).to_string(); + path.replace(fmt, &t); + } + + if check_existed { + if !std::path::Path::new(&path).exists() { + continue; + } else { + result_paths.push((path.clone(), t)); + } + } else { + result_paths.push((path.clone(), t)); + } + back = back - 1; + } + + result_paths }) } } + +macro_rules! match_in_macro { + ($block:ident,$name:literal, $(($branch:path, $t:ty, $color:expr)),+) => { + { + match $block.data_type { + $( + $branch => { + let data: $t = $block.into(); + let layer = Layer::grid_render_layer(data, format!($name), $color); + Some((Utc::now() ,layer)) + }, + )+ + _ => None + } + } + + }; +} + +fn data_to_layer(block: Block) -> Option<(DateTime, Layer)> { + use crate::utils::*; + use radarg_plugin_interface::PluginResultType; + match block.shape { + DataShape::Matrix => match_in_macro!( + block, + "DBZ", + ( + PluginResultType::DBZ, + Radar2d, + create_dbz_boundarynorm() + ), + (PluginResultType::R, Radar2d, create_dbz_boundarynorm()), + (PluginResultType::V, Radar2d, create_vel_boundarynorm()), + ( + PluginResultType::ZDR, + Radar2d, + create_zdr_boundarynorm() + ), + ( + PluginResultType::PHIDP, + Radar2d, + create_phidp_boundarynorm() + ), + ( + PluginResultType::KDP, + Radar2d, + create_kdp_boundarynorm() + ), + (PluginResultType::CC, Radar2d, create_cc_boundarynorm()), + ( + PluginResultType::HCA, + Radar2d, + create_cpc_boundarynorm() + ), + ( + PluginResultType::QPE, + Radar2d, + create_vil_boundarynorm() + ), + ( + PluginResultType::QPF, + Radar2d, + create_vil_boundarynorm() + ), + ( + PluginResultType::VIL, + Radar2d, + create_vil_boundarynorm() + ), + ( + PluginResultType::OHP, + Radar2d, + create_vil_boundarynorm() + ), + ( + PluginResultType::THP, + Radar2d, + create_vil_boundarynorm() + ), + (PluginResultType::ET, Radar2d, create_et_boundarynorm()), + ( + PluginResultType::EB, + Radar2d, + create_hgt_boundarynorm() + ) + ), + _ => None, + } +}