radar-g/src/pipeline/element.rs
2024-03-16 12:57:30 +08:00

552 lines
17 KiB
Rust

use super::{offscreen_renderer::CanvasWrapper, Dispatcher, Pipeline};
use crate::components::app::AppCommand;
use crate::components::ControlPanelInputMsg;
use crate::coords::cms::CMS;
use crate::data::MetaInfo;
use crate::errors::{PipelineError, RenderError};
use crate::widgets::Render;
use crate::RUNTIME;
use crate::{coords::Range, widgets::widget::Widget};
use chrono::{DateTime, TimeZone, Utc};
use core_extensions::SelfOps;
use femtovg::rgb::alt::GRAY8;
use femtovg::{renderer::OpenGl, Canvas, ImageFlags, ImageId, ImageInfo, PixelFormat};
use futures::StreamExt;
use radarg_plugin_interface::PluginResult;
use std::any::Any;
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Formatter;
use std::rc::Rc;
use std::sync::atomic::AtomicUsize;
use std::{
cell::{Ref, RefCell},
fmt::Debug,
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
use tokio::sync::{
oneshot::{channel, Receiver, Sender},
Notify,
};
use tracing::Instrument;
pub type ElementID = usize;
static ELEMENT_ID: AtomicUsize = AtomicUsize::new(0);
pub type Data = Box<dyn Any + Send + Sync>;
pub type Buffer = Arc<Mutex<BTreeMap<DateTime<Utc>, Option<RenderResult>>>>;
type DrawFunc = Rc<Box<dyn Fn(&Render)>>;
type IResult<T> = Result<T, PipelineError>;
#[derive(Debug)]
pub enum Element {
TimeSeries(TimeSeriesElement),
Instant(InstantElement),
}
impl Element {
pub fn create_time_series(
imp: Arc<Box<dyn ElementImpl>>,
dispatcher: Rc<Dispatcher>,
key: String,
cms: CMS,
) -> Self {
Element::TimeSeries(TimeSeriesElement::new(imp, dispatcher, cms, key))
}
pub fn create_instant(
_type: InstantElementDrawerType,
dispatcher: Rc<Dispatcher>,
key: String,
) -> Self {
Element::Instant(InstantElement::new(_type, dispatcher, key))
}
pub fn id(&self) -> ElementID {
match self {
Element::TimeSeries(e) => e.id,
Element::Instant(e) => e.id,
}
}
pub fn key(&self) -> String {
match self {
Element::TimeSeries(e) => e.key.clone(),
Element::Instant(e) => e.key.clone(),
}
}
pub fn get_instance(mut self) -> InstantElement {
match self {
Element::TimeSeries(e) => panic!("TimeSeries element does not have instance"),
Element::Instant(v) => v,
}
}
pub fn get_time_series_instance(mut self) -> TimeSeriesElement {
match self {
Element::TimeSeries(v) => v,
Element::Instant(e) => panic!("Instant element does not have instance"),
}
}
}
#[derive(Debug)]
pub struct TimeSeriesElement {
pub id: ElementID,
pub key: String,
cms: CMS,
imp: Arc<Box<dyn ElementImpl>>,
registers: Arc<Mutex<HashMap<DateTime<Utc>, Vec<Arc<Notify>>>>>,
pipeline: Pipeline,
pub buffer: Buffer,
dispatcher: Rc<Dispatcher>,
}
#[derive(Clone)]
pub enum InstantElementDrawerType {
Draw(DrawFunc),
Prepared((Target, Arc<Box<dyn ElementImpl>>)),
}
impl Debug for InstantElementDrawerType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InstantElementDrawerType").finish()
}
}
#[derive(Debug, Clone)]
pub struct InstantElement {
pub id: ElementID,
pub key: String,
draw_type: InstantElementDrawerType,
dispatcher: Rc<Dispatcher>,
}
pub trait ElementImpl: Debug + Send + Sync + 'static {
fn render(&self, data: &PluginResult, canvas: &mut CanvasWrapper, cms: &mut CMS) -> Target;
}
impl InstantElement {
fn new(_type: InstantElementDrawerType, dispatcher: Rc<Dispatcher>, key: String) -> Self {
let id = ELEMENT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
id,
key,
draw_type: _type,
dispatcher,
}
}
pub fn render(&mut self, render: &Render) {
match self.draw_type {
InstantElementDrawerType::Draw(ref func) => {
func(render);
}
InstantElementDrawerType::Prepared((ref mut target, _)) => {
let mut canvas = render.get_canvas();
let mut canvas = canvas.as_mut().unwrap();
let (ox, oy) = target.origin(render);
let (x, y) = target.size(render);
let result_id = match target.target {
TargetType::ImageId(id) => id,
TargetType::Mem(ref mem) => {
let gl_bind = render.get_context();
let gl = gl_bind.as_ref().unwrap();
let flags = ImageFlags::empty();
let texture = target.mem_to_native_texture(gl, flags);
let converted = canvas
.create_image_from_native_texture(
texture,
ImageInfo::new(flags, 3000, 3000, PixelFormat::Rgba8),
)
.unwrap();
target.set_target(TargetType::ImageId(converted));
converted
}
};
let paint = femtovg::Paint::image(result_id, ox, oy, x, y, 0.0, 1.0);
let mut path = femtovg::Path::new();
path.rect(ox, oy, x, y);
canvas.fill_path(&path, &paint);
}
}
}
pub fn to_time_series(
self,
dispatcher: Rc<Dispatcher>,
cms: CMS,
) -> (TimeSeriesElement, DateTime<Utc>) {
// let imp = Arc::new(InstantElementImpl::new(self));
if let InstantElementDrawerType::Prepared((target, imp)) = self.draw_type {
let mut time_series = TimeSeriesElement::new(imp, dispatcher, cms, self.key);
let data = target
.data
.clone()
.unwrap()
.downcast::<PluginResult>()
.unwrap();
let time_stamp = data.blocks.first().unwrap().datetime;
let meta_info: MetaInfo = data.meta.clone().into();
use chrono::prelude::*;
let time = Utc.timestamp_opt(time_stamp, 0).unwrap();
(*time_series.buffer)
.lock()
.unwrap()
.insert(time, Some(RenderResult::new(target, meta_info)));
(time_series, time)
} else {
panic!("InstantElementDrawerType is not prepared");
}
}
}
impl TimeSeriesElement {
fn new(
imp: Arc<Box<dyn ElementImpl>>,
dispatcher: Rc<Dispatcher>,
cms: CMS,
key: String,
) -> Self {
let id = ELEMENT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut pipeline = Pipeline::new(20, key.clone());
pipeline.set_dispatcher(dispatcher.clone());
let buffer = Arc::new(Mutex::new(BTreeMap::new()));
Self {
id,
key,
imp,
cms,
registers: Arc::new(Mutex::new(HashMap::new())),
buffer,
dispatcher,
pipeline,
}
}
pub fn register(
&mut self,
datetime: DateTime<Utc>,
) -> IResult<Receiver<Result<RenderResult, RenderError>>> {
use tokio::sync::Notify;
use tokio::task;
let (sender, recv) = channel::<Result<RenderResult, RenderError>>();
self.change_time(datetime)?;
let buffer = self.buffer.lock().unwrap();
if buffer.contains_key(&datetime) {
let target = buffer.get(&datetime).unwrap();
// Already in buffer
if let Some(target) = target {
sender.send(Ok(target.clone())).unwrap();
} else {
let new_notifer = {
let n = Arc::new(Notify::new());
let n_clone = n.clone();
self.register_noti(datetime, n);
n_clone
};
let buffer = self.buffer.clone();
task::spawn_local(async move {
new_notifer.notified().await;
let result = buffer.lock().unwrap().get(&datetime).unwrap().clone();
sender.send(Ok(result.unwrap())).unwrap();
});
}
return Ok(recv);
} else {
return Err(PipelineError::DataError("No data found".to_string()));
}
}
pub fn set_cms(&mut self, cms: CMS) {
self.cms = cms;
}
fn register_noti(&self, datetime: DateTime<Utc>, noti: Arc<Notify>) {
self.registers
.lock()
.unwrap()
.entry(datetime)
.or_insert_with(Vec::new)
.push(noti);
}
fn change_time(&mut self, date_time: DateTime<Utc>) -> IResult<()> {
let imp = self.imp.clone();
println!("Change time to {:?}", date_time);
let tasks = self.pipeline.set_current(
date_time,
true,
3,
Arc::new(move |data, canvas, cms| imp.render(data, canvas, cms)),
self.cms.clone(),
);
let tasks = tasks.map(|tms| tms.into_iter().map(|time| (time, None)));
if let Some(tasks) = tasks {
if tasks.len() == 0 {
return Err(PipelineError::DataError("No data found".to_string()));
}
let mut buffer = self.buffer.lock().unwrap();
buffer.extend(tasks);
let buffer = self.buffer.clone();
let registers = self.registers.clone();
let listening_func = self.pipeline.listening(move |recv, idx| {
let buffer = buffer.clone();
let registers = registers.clone();
Box::pin(async move {
let registers = registers;
let (dt, result) = recv.await.unwrap();
if let Ok(result) = result {
let mut buffer = buffer.lock().unwrap();
*buffer.get_mut(&dt).unwrap() = Some(result);
}
{
registers.lock().unwrap().get_mut(&dt).map(|x| {
x.into_iter().for_each(|n| {
n.notify_waiters();
})
});
}
})
});
let runner = Pipeline::run(&mut self.pipeline);
RUNTIME.spawn(listening_func);
RUNTIME.spawn(runner);
return Ok(());
}
Err(PipelineError::DataError("No data found".to_string()))
}
}
#[derive(Debug, Clone)]
pub struct RenderResult {
target: Target,
meta_info: MetaInfo,
}
impl RenderResult {
pub fn new(target: Target, meta_info: MetaInfo) -> Self {
Self { target, meta_info }
}
pub fn get_meta_info(&self) -> MetaInfo {
self.meta_info.clone()
}
pub fn get_mut_target(&mut self) -> &mut Target {
&mut self.target
}
}
#[derive(Clone, Debug)]
pub struct Target {
pub target: TargetType,
pub thumbnail: Option<gtk::gdk::Texture>,
pub width: f32,
pub height: f32,
pub bounds: (Range, Range),
pub data: Option<Arc<dyn Any + Send + Sync + 'static>>,
}
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub enum TargetType {
ImageId(ImageId),
Mem(Vec<u8>),
}
impl Target {
pub fn new(
target: TargetType,
width: f32,
height: f32,
bounds: (Range, Range),
thumbnail: Option<gtk::gdk::Texture>,
data: Option<Arc<dyn Any + Send + Sync + 'static>>,
) -> Self {
Self {
target,
width,
height,
bounds,
thumbnail,
data,
}
}
pub fn size(&self, cms: &Render) -> (f32, f32) {
let (x, y) = self.bounds;
let p1 = (x.0, y.0);
let p2 = (x.1, y.1);
let (x1, y1) = cms.map(p1).unwrap();
let (x2, y2) = cms.map(p2).unwrap();
((x2 - x1).abs(), (y2 - y1).abs())
}
pub fn mem_to_native_texture(
&self,
gl: &glow::Context,
flags: ImageFlags,
) -> glow::NativeTexture {
if let TargetType::Mem(ref mem) = self.target {
use glow::*;
let texture = unsafe {
let id = gl.create_texture().unwrap();
gl.bind_texture(glow::TEXTURE_2D, Some(id));
gl.pixel_store_i32(glow::UNPACK_ALIGNMENT, 1);
gl.pixel_store_i32(glow::UNPACK_ROW_LENGTH, 3000 as i32);
gl.pixel_store_i32(glow::UNPACK_SKIP_PIXELS, 0);
gl.pixel_store_i32(glow::UNPACK_SKIP_ROWS, 0);
id
};
let width = 3000; // 纹理宽度
let height = 3000; // 纹理高度
unsafe {
gl.tex_image_2d(
glow::TEXTURE_2D,
0, // level
glow::RGBA as i32, // internal_format
width,
height,
0, // border
glow::RGBA, // format
glow::UNSIGNED_BYTE, // type
Some(&mem), // pixels
);
}
if flags.contains(ImageFlags::GENERATE_MIPMAPS) {
if flags.contains(ImageFlags::NEAREST) {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_MIN_FILTER,
glow::NEAREST_MIPMAP_NEAREST as i32,
);
}
} else {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_MIN_FILTER,
glow::LINEAR_MIPMAP_LINEAR as i32,
);
}
}
} else if flags.contains(ImageFlags::NEAREST) {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_MIN_FILTER,
glow::NEAREST as i32,
);
}
} else {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_MIN_FILTER,
glow::LINEAR as i32,
);
}
}
if flags.contains(ImageFlags::NEAREST) {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_MAG_FILTER,
glow::NEAREST as i32,
);
}
} else {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_MAG_FILTER,
glow::LINEAR as i32,
);
}
}
if flags.contains(ImageFlags::REPEAT_X) {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_WRAP_S,
glow::REPEAT as i32,
);
}
} else {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_WRAP_S,
glow::CLAMP_TO_EDGE as i32,
);
}
}
if flags.contains(ImageFlags::REPEAT_Y) {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_WRAP_T,
glow::REPEAT as i32,
);
}
} else {
unsafe {
gl.tex_parameter_i32(
glow::TEXTURE_2D,
glow::TEXTURE_WRAP_T,
glow::CLAMP_TO_EDGE as i32,
);
}
}
unsafe {
gl.pixel_store_i32(glow::UNPACK_ALIGNMENT, 4);
gl.pixel_store_i32(glow::UNPACK_ROW_LENGTH, 0);
gl.pixel_store_i32(glow::UNPACK_SKIP_PIXELS, 0);
gl.pixel_store_i32(glow::UNPACK_SKIP_ROWS, 0);
}
if flags.contains(ImageFlags::GENERATE_MIPMAPS) {
unsafe {
gl.generate_mipmap(glow::TEXTURE_2D);
}
}
unsafe {
gl.bind_texture(glow::TEXTURE_2D, None);
}
return texture;
} else {
panic!("Target is not mem");
}
}
pub fn origin(&self, cms: &Render) -> (f32, f32) {
let (x, y) = self.bounds;
let p1 = (x.0, y.1);
cms.map(p1).unwrap()
}
pub fn set_target(&mut self, target: TargetType) {
self.target = target;
}
}