// radar-g/src/pipeline/utils.rs
// 2024-02-06 15:09:01 +08:00
//
// 491 lines
// 15 KiB
// Rust

use super::render_pipeline::RenderResult;
use super::{offscreen_renderer::OffscreenRenderer, pool::DataPool};
use crate::components::app::Buffer;
use crate::coords::proj::Mercator;
use crate::coords::Mapper;
use crate::widgets::{Render, Target, TargetType, CMS};
use crate::CONFIG;
use crate::{data::Radar2d, errors::RenderError, widgets::Layer, PLUGIN_MANAGER};
use chrono::{prelude::*, Duration};
use euclid::approxord::max;
use futures::future::*;
use radarg_plugin_interface::*;
use regex::Regex;
use smallvec::SmallVec;
use std::cell::{Ref, RefCell};
use std::rc::Rc;
use std::sync::Mutex;
use std::{
borrow::Borrow,
collections::{HashMap, VecDeque},
future::Future,
ops::Deref,
pin::Pin,
sync::Arc,
};
use tokio::sync::{mpsc, oneshot};
use tokio::task;
/// Debug-build sanity check that no OpenGL error is pending.
///
/// In builds without `debug_assertions` this is a no-op: `gl::GetError` is
/// not called at all, so the pending error state is left untouched.
///
/// # Panics
///
/// In debug builds, panics when `gl::GetError()` returns anything other than
/// `gl::NO_ERROR`.
pub fn ck() {
    if cfg!(debug_assertions) {
        // SAFETY: `gl::GetError` takes no arguments and dereferences no
        // caller-supplied memory. NOTE(review): it presumably still needs the
        // GL function pointers to be loaded and a context to be current on
        // this thread; that is not checked here — confirm at the call sites.
        let status = unsafe { gl::GetError() };
        assert_eq!(status, gl::NO_ERROR);
    }
}
/// Outcome of one render job: the rendered result on success, or the
/// `RenderError` describing why no result could be produced.
type RenderR = Result<RenderResult, RenderError>;
/// Queues render jobs for the data product identified by `key` and hands
/// their results back through one-shot channels.
pub struct Pipeline {
/// Jobs queued by `add_task`; emptied by `get_pool` (and therefore by `run`).
pool: Vec<BoxFuture<'static, ()>>,
/// Initialised empty in `new`; not read or written by any method visible in this file.
results: SmallVec<[RenderResult; 20]>,
/// Source of the file paths to render; must be set via `set_dispatcher`
/// before `set_current` is called.
dispatcher: Option<Rc<RefCell<Dispatcher>>>,
/// Receivers created by the latest `set_current` call, one per queued job;
/// taken by `listening` / `listening_one_by_one`.
handlers: Option<Vec<oneshot::Receiver<RenderR>>>,
/// Initialised to `None` in `new`; not used by any method visible in this file.
handler: Option<mpsc::Receiver<RenderR>>,
/// Initialised to `None` in `new`; not used by any method visible in this file.
sender: Option<mpsc::Sender<RenderR>>,
/// Name passed to `Dispatcher::get_path` to select the path template.
key: String,
}
impl Pipeline {
pub fn new(len: usize, key: String) -> Self {
Self {
pool: Vec::new(),
results: SmallVec::new(),
dispatcher: None,
handlers: None,
handler: None,
sender: None,
key,
}
}
pub fn set_dispatcher(&mut self, dispatcher: Rc<RefCell<Dispatcher>>) {
self.dispatcher = Some(dispatcher);
}
pub fn init(&mut self) -> &mut Self {
self
}
pub fn set_current(
&mut self,
current_time: DateTime<Utc>,
check_existed: bool,
max_retry_time: usize,
) -> Option<Vec<DateTime<Utc>>> {
let dispatcher = self.dispatcher.clone().unwrap();
let dispatcher = dispatcher.borrow_mut();
let paths = dispatcher.get_path(&self.key, current_time, check_existed, max_retry_time);
if let Some(paths) = paths {
let mut recvs = Vec::new();
let mut result = Vec::new();
for (path, datetime) in paths.into_iter() {
// let sender = self.sender.clone().unwrap();
let (sender, mut receiver) = oneshot::channel::<RenderR>();
self.add_task(datetime.timestamp(), self.worker(datetime, path), sender);
recvs.push(receiver);
result.push(datetime);
}
self.handlers.replace(recvs);
Some(result)
} else {
None
}
}
pub fn work_num(&self) -> usize {
// self.pool.as_ref().unwrap().len()
self.pool.len()
}
fn worker(
&self,
datetime: DateTime<Utc>,
path: impl AsRef<str> + Send + 'static,
) -> BoxFuture<'static, RenderR> {
Box::pin(async move {
let loader = PLUGIN_MANAGER.get_plugin_by_name("etws_loader").unwrap();
let mut loaded_data = loader.load(path.as_ref().into()).unwrap();
let first_block = loaded_data.blocks.pop().unwrap();
if let Some((_, layer)) = data_to_layer(first_block) {
let handle = task::spawn_blocking(move || {
let mut offscreen_renderer = OffscreenRenderer::new(3000, 3000).unwrap();
let canvas_wrapper = offscreen_renderer.create_canvas();
let canvas_mutex = std::sync::Arc::new(std::sync::Mutex::new(canvas_wrapper));
let f = {
let p = layer.get_prepare();
let mut _p = p.lock().unwrap();
_p.take()
};
let target = if let Some(f) = f {
let imp = layer.get_imp().unwrap();
let map: Mapper = Mercator::default().into();
let cms = CMS::new(map, (3000.0, 3000.0));
let canvas = canvas_mutex.clone();
let c = f(imp, canvas.clone(), cms);
let canvas = canvas.lock().unwrap();
Some(c)
} else {
None
};
layer.set_render_target(target.unwrap());
layer
});
let target = handle.await.unwrap();
Ok(RenderResult::new(target, datetime))
} else {
println!("no layer");
Err(RenderError::None)
}
})
}
pub fn add_task<TASK>(&mut self, timestamp: i64, task: TASK, tx: oneshot::Sender<RenderR>)
where
TASK: Future<Output = RenderR> + 'static + Send,
{
let future = async move {
let data = task.await;
tx.send(data).unwrap();
};
self.pool.push(Box::pin(future));
}
pub fn run(value: &mut Self) -> BoxFuture<'static, ()> {
let pool = value.get_pool();
Box::pin(async move {
for f in pool.into_iter() {
task::spawn(f);
// f.await;
}
})
}
pub fn listening<F>(&mut self, f: F) -> BoxFuture<'static, ()>
where
F: Fn(oneshot::Receiver<RenderR>, usize) -> BoxFuture<'static, ()> + Send + 'static + Sync,
{
let mut handler = self.handlers.take().unwrap();
Box::pin(async move {
let l = handler.into_iter().enumerate().map(|(h, i)| f(i, h));
for f in l.into_iter() {
f.await;
}
})
}
pub fn listening_one_by_one<F>(&mut self, f: Vec<F>) -> BoxFuture<'static, ()>
where
F: FnOnce(oneshot::Receiver<RenderR>) -> BoxFuture<'static, ()> + Send + 'static + Sync,
{
let mut handler = self.handlers.take().unwrap();
Box::pin(async move {
for (h, f) in handler.into_iter().zip(f) {
task::spawn(f(h));
// f(h).await;
}
})
}
pub fn get_pool(&mut self) -> Vec<BoxFuture<'static, ()>> {
// self.pool.clone()
use std::mem::replace;
let pool = replace(&mut self.pool, Vec::new());
pool
}
pub fn cancel_task(&mut self, timestamp: i64) {}
}
/// Turns a point in time into the file paths that should be rendered around it.
pub struct Dispatcher {
/// Set to `Utc::now()` by `new` and overwritten by `set_current_time`;
/// the path methods visible in this file take the time as an argument
/// and do not read this field.
datetime: DateTime<Utc>,
/// Number of `step`s before the requested time to produce paths for.
fore_len: usize,
/// Number of `step`s to produce paths for starting at the requested time
/// itself (offset 0) and going forward.
back_len: usize,
/// Spacing between two consecutive timestamps.
step: Duration,
/// Looked up by name in `get_path`; timestamps already present in the
/// entry for that name are skipped.
registered_buffer: Buffer,
}
impl Dispatcher {
pub fn new(
fore_len: usize,
back_len: usize,
step: Duration,
registered_buffer: Buffer,
) -> Self {
// let config = CONFIG.lock().unwrap();
// let config = CONFIG.lock().unwrap();
// let etws_loader = CONFIG.lock().unwrap().plugins.get("etws_loader").unwrap();
// let format = etws_loader.path_formats.as_ref();
Self {
datetime: Utc::now(),
// path_format: format,
fore_len,
back_len,
step,
registered_buffer,
}
}
// pub fn set_path_format(&mut self, formats: HashMap<String, String>) {
// self.path_format = formats;
// }
pub fn set_current_time(&mut self, datetime: DateTime<Utc>) {
self.datetime = datetime;
}
pub fn set_step(&mut self, step: Duration) {
self.step = step;
}
pub fn set_fore_len(&mut self, fore_len: usize) {
self.fore_len = fore_len;
}
pub fn set_back_len(&mut self, back_len: usize) {
self.back_len = back_len;
}
pub fn get_single_path(
&self,
name: &str,
current_time: DateTime<Utc>,
check_existed: bool,
) -> Option<String> {
let datetime_format: regex::Regex =
Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap();
let config = CONFIG.lock().unwrap();
let path_format = config
.plugins
.get("etws_loader")
.unwrap()
.path_formats
.as_ref();
if path_format.is_none() {
return None;
}
let c = path_format.unwrap().get(name).map(|s| {
let path = s.clone();
let need_formated = datetime_format.captures_iter(&path).collect::<Vec<_>>();
let mut result_path = path.clone();
for need_format in need_formated.iter() {
let fmt = need_format.get(0).unwrap().as_str();
let t = current_time.format(fmt).to_string();
result_path = result_path.replace(fmt, &t);
}
result_path
});
if let Some(c) = c {
if check_existed {
if std::path::Path::new(&c).exists() {
Some(c)
} else {
None
}
} else {
Some(c)
}
} else {
None
}
}
pub fn get_path(
&self,
name: &str,
current_time: DateTime<Utc>,
check_existed: bool,
mut max_retry_time: usize,
) -> Option<Vec<(String, DateTime<Utc>)>> {
let datetime_format: regex::Regex =
Regex::new(r"(?:%[YHMSmd](?:[-/:_]?%[YHMSmd])*)").unwrap();
let config = CONFIG.lock().unwrap();
let path_format = config
.plugins
.get("etws_loader")
.unwrap()
.path_formats
.as_ref();
if path_format.is_none() {
return None;
}
path_format.unwrap().get(name).map(|s| {
let path = s.clone();
let need_formated = datetime_format.captures_iter(&path).collect::<Vec<_>>();
let mut fore = self.fore_len;
let mut back = 0;
let mut result_paths = Vec::new();
let buffer: Ref<'_, HashMap<_, _>> = (*self.registered_buffer).borrow();
let buffer = buffer.get(name).unwrap();
while fore > 0 {
let mut result_path = path.clone();
let t = current_time - self.step * fore as i32;
if buffer.get(&t).is_some() {
fore = fore - 1;
continue;
}
for need_format in need_formated.iter() {
let fmt = need_format.get(0).unwrap().as_str();
let t = t.format(fmt).to_string();
result_path = result_path.replace(fmt, &t);
}
if check_existed {
if max_retry_time == 0 {
break;
}
if !std::path::Path::new(&result_path).exists() {
max_retry_time = max_retry_time - 1;
continue;
} else {
result_paths.push((result_path.clone(), t));
}
} else {
result_paths.push((result_path.clone(), t));
}
fore = fore - 1;
}
while back < self.back_len {
let mut result_path = path.clone();
let t = current_time + self.step * back as i32;
if buffer.get(&t).is_some() {
back = back + 1;
continue;
}
for need_format in need_formated.iter() {
let fmt = need_format.get(0).unwrap().as_str();
let t = t.format(fmt).to_string();
result_path = result_path.replace(fmt, &t);
}
if check_existed {
if max_retry_time == 0 {
break;
}
if !std::path::Path::new(&result_path).exists() {
max_retry_time = max_retry_time - 1;
continue;
} else {
result_paths.push((result_path.clone(), t));
}
} else {
result_paths.push((result_path.clone(), t));
}
back = back + 1;
}
result_paths
})
}
}
/// Expands to a `match` on `$src.data_type` that builds a grid render layer.
///
/// Arguments: the block identifier, followed by one or more
/// `(variant, label, data type, colour expression)` tuples. For the matching
/// variant the block is converted `.into()` the given data type and passed,
/// with the label and colour expression, to `Layer::grid_render_layer`.
///
/// Evaluates to `Some((datetime, layer))` for a listed variant and `None`
/// otherwise. The timestamp is read from `$src.datetime` (passed to
/// `Utc.timestamp_opt` with zero nanoseconds) before the block is consumed,
/// and is unwrapped even when no variant ends up matching.
macro_rules! match_in_macro {
    ($src:ident, $(($variant:path, $label:literal, $elem:ty, $palette:expr)),+) => {{
        let datetime = Utc.timestamp_opt($src.datetime, 0).unwrap();
        match $src.data_type {
            $(
                $variant => {
                    let data: $elem = $src.into();
                    let layer = Layer::grid_render_layer(data, format!($label), $palette);
                    Some((datetime, layer))
                }
            )+
            _ => None,
        }
    }};
}
/// Converts a plugin data block into a render layer plus its timestamp.
///
/// Only blocks whose shape is `DataShape::Matrix` are handled. Within those,
/// the block's `data_type` selects the element type and colour mapping from
/// the table below. Any other shape or data type yields `None`.
pub fn data_to_layer(block: Block) -> Option<(DateTime<Utc>, Layer)> {
    use crate::utils::*;
    use radarg_plugin_interface::PluginResultType;

    if !matches!(block.shape, DataShape::Matrix) {
        return None;
    }

    // (variant, layer name, element type, colour mapping)
    match_in_macro!(
        block,
        (PluginResultType::DBZ, "DBZ", Radar2d<i8>, create_dbz_boundarynorm()),
        (PluginResultType::R, "R", Radar2d<i8>, create_dbz_boundarynorm()),
        (PluginResultType::V, "V", Radar2d<f32>, create_vel_boundarynorm()),
        (PluginResultType::ZDR, "ZDR", Radar2d<f32>, create_zdr_boundarynorm()),
        (PluginResultType::PHIDP, "PHIDP", Radar2d<f32>, create_phidp_boundarynorm()),
        (PluginResultType::KDP, "KDP", Radar2d<f32>, create_kdp_boundarynorm()),
        (PluginResultType::CC, "CC", Radar2d<f32>, create_cc_boundarynorm()),
        (PluginResultType::HCA, "HCA", Radar2d<i8>, create_cpc_boundarynorm()),
        (PluginResultType::QPE, "QPE", Radar2d<f32>, create_vil_boundarynorm()),
        (PluginResultType::QPF, "QPF", Radar2d<f32>, create_vil_boundarynorm()),
        (PluginResultType::VIL, "VIL", Radar2d<f32>, create_vil_boundarynorm()),
        (PluginResultType::OHP, "OHP", Radar2d<f32>, create_vil_boundarynorm()),
        (PluginResultType::THP, "THP", Radar2d<f32>, create_vil_boundarynorm()),
        (PluginResultType::ET, "ET", Radar2d<f32>, create_et_boundarynorm()),
        (PluginResultType::EB, "EB", Radar2d<f32>, create_hgt_boundarynorm())
    )
}
// Pin<Box<dyn Future<Output = Result<Vec<RenderResult>, RenderError>> + Send + 'static>>