add setting

This commit is contained in:
sleptworld 2024-01-31 01:34:37 +08:00
parent e92ed0be34
commit b900397992
3 changed files with 266 additions and 59 deletions

View File

@ -1,11 +1,11 @@
use std::collections::HashMap;
use crate::{ use crate::{
coords::{proj::Mercator, Mapper}, coords::{proj::Mercator, Mapper},
data::{self, CoordType, Radar2d}, data::{self, CoordType, Radar2d},
pipeline::{ pipeline::{
offscreen_renderer::OffscreenRenderer, pool::Pool, render_pipeline::RenderResult, offscreen_renderer::OffscreenRenderer,
utils::Pipeline, pool::Pool,
render_pipeline::RenderResult,
utils::{Dispatcher, Pipeline},
}, },
plugin_system::init_plugin, plugin_system::init_plugin,
widgets::{ widgets::{
@ -14,6 +14,10 @@ use crate::{
}, },
PLUGIN_MANAGER, PLUGIN_MANAGER,
}; };
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::{ use super::{
control_panel::{ControlPanelInputMsg, ControlPanelModel}, control_panel::{ControlPanelInputMsg, ControlPanelModel},
@ -50,6 +54,7 @@ pub enum AppMsg {
} }
pub struct AppModel { pub struct AppModel {
dispatcher: Arc<Mutex<Dispatcher>>,
open_dialog: Controller<OpenDialog>, open_dialog: Controller<OpenDialog>,
control: Controller<ControlPanelModel>, control: Controller<ControlPanelModel>,
target_pipeline: HashMap<String, Pipeline>, target_pipeline: HashMap<String, Pipeline>,
@ -158,7 +163,18 @@ impl Component for AppModel {
let app = relm4::main_application(); let app = relm4::main_application();
relm4_icons::initialize_icons(); relm4_icons::initialize_icons();
let mut dispatcher = Dispatcher::new(5, 5, chrono::Duration::minutes(1));
let mut path_format = HashMap::new();
path_format.insert(
format!("DBZ"),
format!("/Volumes/data2/RadarArray/HangZhou/radarData/OutputProducts/RadarProducts/BasicProductsX/%Y%m%d/%Y%m%d%H%M%S"),
);
dispatcher.set_path_format(path_format);
let model = AppModel { let model = AppModel {
dispatcher: Arc::new(Mutex::new(dispatcher)),
open_dialog: dialog, open_dialog: dialog,
target_pipeline: HashMap::new(), target_pipeline: HashMap::new(),
control, control,
@ -212,36 +228,16 @@ impl Component for AppModel {
fn update(&mut self, msg: Self::Input, _sender: ComponentSender<Self>, root: &Self::Root) { fn update(&mut self, msg: Self::Input, _sender: ComponentSender<Self>, root: &Self::Root) {
match msg { match msg {
AppMsg::OpenFile((time, layer)) => { AppMsg::OpenFile((time, layer)) => {
if self.target_pipeline.contains_key(&layer.name) { self.dispatcher.lock().unwrap().set_current_time(time);
} else { if !self.target_pipeline.contains_key(&layer.name) {
let mut pipeline = Pipeline::new(10); let mut pipeline = Pipeline::new(10, layer.name.clone());
pipeline.add_task( pipeline.set_dispatcher(self.dispatcher.clone());
time.timestamp(), self.target_pipeline.insert(layer.name.clone(), pipeline);
Box::pin(async move {
let mut offscreen_render = OffscreenRenderer::new(3000, 3000).unwrap();
let canvas_wrapper = offscreen_render.create_canvas();
let canvas_mutex =
std::sync::Arc::new(std::sync::Mutex::new(canvas_wrapper));
let f = {
let p = layer.get_prepare();
let mut _p = p.lock().unwrap();
_p.take()
};
let target = if let Some(f) = f {
let imp = layer.get_imp().unwrap();
let map: Mapper = Mercator::default().into();
let cms = CMS::new(map, (3000.0, 3000.0));
let canvas = canvas_mutex.clone();
let c = f(imp, canvas, cms).await;
Some(c)
} else {
None
};
RenderResult::new(canvas_mutex, target.unwrap(), time)
}),
);
} }
// self.render.sender().emit(MonitorInputMsg::AddLayer(layer)); let mut pipeline = self.target_pipeline.get_mut(&layer.name).unwrap();
pipeline.set_current(time, true);
self.render.sender().emit(MonitorInputMsg::AddLayer(layer));
self.control self.control
.sender() .sender()
.emit(ControlPanelInputMsg::Selection(Some(time))); .emit(ControlPanelInputMsg::Selection(Some(time)));

View File

@ -64,4 +64,9 @@ pub enum PoolError {
pub enum RenderError { pub enum RenderError {
#[error("")] #[error("")]
PreRenderError(#[from] femtovg::ErrorKind), PreRenderError(#[from] femtovg::ErrorKind),
#[error("")]
None,
#[error("Canceled")]
Canceled,
} }

View File

@ -1,13 +1,21 @@
use super::pool::Pool;
use super::render_pipeline::RenderResult; use super::render_pipeline::RenderResult;
use crate::errors::RenderError; use super::{offscreen_renderer::OffscreenRenderer, pool::Pool};
use crate::coords::proj::Mercator;
use crate::coords::Mapper;
use crate::widgets::CMS;
use crate::{data::Radar2d, errors::RenderError, widgets::Layer, PLUGIN_MANAGER};
use chrono::{prelude::*, Duration}; use chrono::{prelude::*, Duration};
use futures::future::*;
use radarg_plugin_interface::*;
use regex::Regex; use regex::Regex;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{ use std::{
borrow::Borrow,
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
future::Future, future::Future,
ops::Deref,
pin::Pin, pin::Pin,
sync::Arc,
}; };
use tokio::sync::{oneshot, Mutex}; use tokio::sync::{oneshot, Mutex};
@ -17,45 +25,109 @@ pub fn ck() {
} }
} }
const DATATIME_FORMAT: regex::Regex = Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap();
pub struct Pipeline { pub struct Pipeline {
pool: VecDeque<Pin<Box<dyn Future<Output = Option<RenderResult>>>>>, pool: Vec<Pin<Box<dyn Future<Output = Result<RenderResult, RenderError>>>>>,
switcher: Pool<oneshot::Receiver<i32>>, switcher: Pool<oneshot::Receiver<i32>>,
results: Mutex<SmallVec<[RenderResult; 20]>>, results: Mutex<SmallVec<[RenderResult; 20]>>,
dispatcher: Option<Arc<std::sync::Mutex<Dispatcher>>>,
handlers: Option<oneshot::Receiver<i32>>,
key: String,
} }
impl Pipeline { impl Pipeline {
pub fn new(len: usize) -> Self { pub fn new(len: usize, key: String) -> Self {
Self { Self {
pool: VecDeque::new(), pool: Vec::new(),
switcher: Pool::new(len), switcher: Pool::new(len),
results: Mutex::new(SmallVec::new()), results: Mutex::new(SmallVec::new()),
dispatcher: None,
handlers: None,
key,
} }
} }
pub fn set_dispatcher(&mut self, dispatcher: Arc<std::sync::Mutex<Dispatcher>>) {
self.dispatcher = Some(dispatcher);
}
pub async fn run(&mut self) { pub async fn run(&mut self) {
while let Some(task) = self.pool.pop_front() { let (mut tx, rx) = oneshot::channel::<i32>();
let res = task.await; self.handlers = Some(rx);
if let Some(res) = res {
self.results.lock().await.push(res); let result = tokio::select! {
res = try_join_all(&mut self.pool) => res,
_ = tx.closed() => {
self.handlers = None;
Err(RenderError::Canceled)
}
};
}
pub fn set_current(&mut self, current_time: DateTime<Utc>, check_existed: bool) {
if let Some(mut rx) = self.handlers.take() {
rx.close();
}
let dispatcher = self.dispatcher.clone().unwrap();
let dispatcher = dispatcher.lock().unwrap();
let paths = dispatcher.get_path(&self.key, current_time, check_existed);
if let Some(paths) = paths {
for (path, datetime) in paths.into_iter() {
let future_task = async move {
let loader = PLUGIN_MANAGER.get_plugin_by_name("etws_loader").unwrap();
let mut loaded_data = loader.load(path.as_str().into()).unwrap();
let first_block = loaded_data.blocks.pop().unwrap();
if let Some((_, layer)) = data_to_layer(first_block) {
let mut offscreen_renderer = OffscreenRenderer::new(3000, 3000).unwrap();
let canvas_wrapper = offscreen_renderer.create_canvas();
let canvas_mutex =
std::sync::Arc::new(std::sync::Mutex::new(canvas_wrapper));
let f = {
let p = layer.get_prepare();
let mut _p = p.lock().unwrap();
_p.take()
};
let target = if let Some(f) = f {
let imp = layer.get_imp().unwrap();
let map: Mapper = Mercator::default().into();
let cms = CMS::new(map, (3000.0, 3000.0));
let canvas = canvas_mutex.clone();
let c = f(imp, canvas, cms).await;
Some(c)
} else {
None
};
Ok(RenderResult::new(canvas_mutex, target.unwrap(), datetime))
} else {
Err(RenderError::None)
}
};
self.add_task(datetime.timestamp(), future_task);
// self.add_task(datetime.timestamp(), future_task);
} }
} }
} }
pub fn add_task(&mut self, timestamp: i64, task: Pin<Box<dyn Future<Output = RenderResult>>>) { pub fn add_task<TASK>(&mut self, timestamp: i64, task: TASK)
where
TASK: Future<Output = Result<RenderResult, RenderError>> + 'static,
{
let (mut tx, rx) = oneshot::channel::<i32>(); let (mut tx, rx) = oneshot::channel::<i32>();
let future = async move { let future = async move {
tokio::select! { tokio::select! {
res = task => Some(res), res = task => res,
_ = tx.closed() => { _ = tx.closed() => {
println!("task canceled"); println!("task canceled");
None Err(RenderError::Canceled)
}, },
} }
}; };
self.pool.push_back(Box::pin(future)); self.pool.push(Box::pin(future));
self.switcher.add(rx, timestamp); self.switcher.add(rx, timestamp);
} }
@ -85,32 +157,166 @@ impl Dispatcher {
self.path_format = formats; self.path_format = formats;
} }
pub fn set_current_time(&mut self, datetime: DateTime<Utc>) {
self.datetime = datetime;
}
pub fn set_step(&mut self, step: Duration) {
self.step = step;
}
pub fn set_fore_len(&mut self, fore_len: usize) {
self.fore_len = fore_len;
}
pub fn set_back_len(&mut self, back_len: usize) {
self.back_len = back_len;
}
pub fn get_path( pub fn get_path(
&self, &self,
name: &str, name: &str,
current_time: DateTime<Utc>, current_time: DateTime<Utc>,
check_existed: bool, check_existed: bool,
) -> Option<String> { ) -> Option<Vec<(String, DateTime<Utc>)>> {
let datetime_format: regex::Regex =
Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap();
self.path_format.get(name).map(|s| { self.path_format.get(name).map(|s| {
let mut path = s.clone(); let mut path = s.clone();
let need_formated = DATATIME_FORMAT.captures_iter(&path); let need_formated = datetime_format.captures_iter(&path).collect::<Vec<_>>();
let mut fore = self.fore_len; let mut fore = self.fore_len;
let mut back = self.back_len; let mut back = self.back_len;
for f in 1..fore + 1 { let mut result_paths = Vec::new();
let t = current_time - self.step * f as i32;
for need_format in need_formated { while fore > 0 {
let t = t.format(&need_format[0]).to_string(); let t = current_time - self.step * fore as i32;
path = path.replace(&need_format[0], &t); for need_format in need_formated.iter() {
let fmt = need_format.get(0).unwrap().as_str();
let t = t.format(fmt).to_string();
path.replace(fmt, &t);
} }
if check_existed { if check_existed {
if std::path::Path::new(&path).exists() { if !std::path::Path::new(&path).exists() {
fore = f; continue;
break; } else {
result_paths.push((path.clone(), t));
} }
} else {
result_paths.push((path.clone(), t));
} }
fore = fore - 1;
} }
while back > 0 {
let t = current_time + self.step * fore as i32;
for need_format in need_formated.iter() {
let fmt = need_format.get(0).unwrap().as_str();
let t = t.format(fmt).to_string();
path.replace(fmt, &t);
}
if check_existed {
if !std::path::Path::new(&path).exists() {
continue;
} else {
result_paths.push((path.clone(), t));
}
} else {
result_paths.push((path.clone(), t));
}
back = back - 1;
}
result_paths
}) })
} }
} }
macro_rules! match_in_macro {
($block:ident,$name:literal, $(($branch:path, $t:ty, $color:expr)),+) => {
{
match $block.data_type {
$(
$branch => {
let data: $t = $block.into();
let layer = Layer::grid_render_layer(data, format!($name), $color);
Some((Utc::now() ,layer))
},
)+
_ => None
}
}
};
}
fn data_to_layer(block: Block) -> Option<(DateTime<Utc>, Layer)> {
use crate::utils::*;
use radarg_plugin_interface::PluginResultType;
match block.shape {
DataShape::Matrix => match_in_macro!(
block,
"DBZ",
(
PluginResultType::DBZ,
Radar2d<i8>,
create_dbz_boundarynorm()
),
(PluginResultType::R, Radar2d<i8>, create_dbz_boundarynorm()),
(PluginResultType::V, Radar2d<f32>, create_vel_boundarynorm()),
(
PluginResultType::ZDR,
Radar2d<f32>,
create_zdr_boundarynorm()
),
(
PluginResultType::PHIDP,
Radar2d<f32>,
create_phidp_boundarynorm()
),
(
PluginResultType::KDP,
Radar2d<f32>,
create_kdp_boundarynorm()
),
(PluginResultType::CC, Radar2d<f32>, create_cc_boundarynorm()),
(
PluginResultType::HCA,
Radar2d<i8>,
create_cpc_boundarynorm()
),
(
PluginResultType::QPE,
Radar2d<f32>,
create_vil_boundarynorm()
),
(
PluginResultType::QPF,
Radar2d<f32>,
create_vil_boundarynorm()
),
(
PluginResultType::VIL,
Radar2d<f32>,
create_vil_boundarynorm()
),
(
PluginResultType::OHP,
Radar2d<f32>,
create_vil_boundarynorm()
),
(
PluginResultType::THP,
Radar2d<f32>,
create_vil_boundarynorm()
),
(PluginResultType::ET, Radar2d<f32>, create_et_boundarynorm()),
(
PluginResultType::EB,
Radar2d<f32>,
create_hgt_boundarynorm()
)
),
_ => None,
}
}