From 8c2ac4b3005019eb43ce3c3453444872cb85a40d Mon Sep 17 00:00:00 2001 From: Tsuki Date: Mon, 5 Jan 2026 00:46:59 +0800 Subject: [PATCH] stack frame --- Cargo.lock | 7 + gen/src/lib.rs | 72 +- gen/test.bufrtbl | Bin 0 -> 20 bytes rbufr/Cargo.toml | 1 + rbufr/src/block.rs | 4 +- rbufr/src/lib.rs | 4 +- rbufr/src/structs/data_parser.rs | 1160 +++++++++++++++++++++-------- rbufr/src/structs/tools.rs | 8 +- rbufr/src/structs/versions/mod.rs | 4 +- rbufr/src/structs/versions/v2.rs | 2 +- rbufr/src/structs/versions/v4.rs | 2 +- 11 files changed, 936 insertions(+), 328 deletions(-) create mode 100644 gen/test.bufrtbl diff --git a/Cargo.lock b/Cargo.lock index f38df6e..c73e836 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1265,6 +1265,7 @@ dependencies = [ "flate2", "gentools", "nom", + "rustc-hash", "serde", "thiserror 2.0.17", ] @@ -1352,6 +1353,12 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustix" version = "1.1.2" diff --git a/gen/src/lib.rs b/gen/src/lib.rs index 63cadf2..64c8ee8 100644 --- a/gen/src/lib.rs +++ b/gen/src/lib.rs @@ -15,6 +15,7 @@ use rkyv::bytecheck::CheckBytes; use rkyv::rancor::Error; use rkyv::{Archive, Deserialize, Serialize}; use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize}; +use std::borrow::Borrow; use std::fmt::Debug; use std::io::{Cursor, Write}; use std::path::Path; @@ -116,9 +117,8 @@ where } /// 获取拥有的版本 - fn get(&self, fxy: FXY) -> Option<&::Archived> { + fn get(&self, fxy: &K) -> Option<&::Archived> { let hash = self.mphf.get(&fxy)? as usize; - self.archived().ok()?.entries.get(hash) } @@ -148,13 +148,15 @@ where rkyv::Serialize, rkyv::Deserialize, Debug, - PartialEq, Eq, Clone, Copy, std::hash::Hash, )] -#[rkyv(compare(PartialEq), derive(Debug, Clone, Copy))] +#[rkyv( + // compare(PartialEq), + derive(Debug, Clone, Copy, std::hash::Hash, Eq) +)] pub struct FXY { pub f: i32, pub x: i32, @@ -191,14 +193,6 @@ impl FXY { pub fn to_u32(&self) -> u32 { ((self.f as u32) << 14) | ((self.x as u32) << 8) | (self.y as u32) } - - // pub fn from_u32(value: u32) -> Self { - // FXY { - // f: ((value >> 14) & 0x3) as u16, - // x: ((value >> 8) & 0x3F) as u16, - // y: (value & 0xFF) as u16, - // } - // } } pub struct BUFRTableMPH { @@ -234,11 +228,61 @@ where Ok(BUFRTableMPH { inner: bhm }) } - pub fn lookup(&self, fxy: FXY) -> Option<&::Archived> { + pub fn lookup(&self, fxy: &K) -> Option<&::Archived> { self.inner.get(fxy) } } +pub trait BUFRKey: Debug + Eq + std::hash::Hash + PartialEq + PartialEq { + fn f(&self) -> i32; + fn x(&self) -> i32; + fn y(&self) -> i32; +} + +impl BUFRKey for FXY { + fn f(&self) -> i32 { + self.f + } + fn x(&self) -> i32 { + self.x + } + fn y(&self) -> i32 { + self.y + } +} +impl BUFRKey for ArchivedFXY { + fn f(&self) -> i32 { + self.f.to_native() + } + fn x(&self) -> i32 { + self.x.to_native() + } + fn y(&self) -> i32 { + self.y.to_native() + } +} + +impl PartialEq for FXY { + fn eq(&self, other: &K) -> bool { + self.f == other.f() && self.x == other.x() && self.y == other.y() + } +} + +impl PartialEq for ArchivedFXY { + fn eq(&self, other: &K) -> bool { + self.f.to_native() == other.f() + && self.x.to_native() == other.x() + && self.y.to_native() == other.y() + } +} + +// impl Borrow for ArchivedFXY { +// fn borrow(&self) -> &FXY { +// // SAFETY: ArchivedFXY has the same memory layout as FXY +// unsafe { &*(self as *const ArchivedFXY as *const FXY) } +// } +// } + #[repr(C)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TableType { @@ -277,7 +321,7 @@ mod test { ) .unwrap(); - let x = table.lookup(FXY::new(3, 21, 11)).unwrap(); + let x = table.lookup(&FXY::new(3, 21, 11)).unwrap(); println!("{:#?}", x); } diff --git a/gen/test.bufrtbl b/gen/test.bufrtbl new file mode 100644 index 0000000000000000000000000000000000000000..87badfe190a195eb673891b1665535ae1b850a8b GIT binary patch literal 20 ZcmWe&U}5<4|Nnnx1_p*7KpIFf003142nGNE literal 0 HcmV?d00001 diff --git a/rbufr/Cargo.toml b/rbufr/Cargo.toml index 21c005d..c1cc8ec 100644 --- a/rbufr/Cargo.toml +++ b/rbufr/Cargo.toml @@ -16,6 +16,7 @@ serde = { version = "1.0.228", features = ["derive"] } thiserror = "2.0.17" gentools = { path = "../gen" } anyhow = "1.0.100" +rustc-hash = "2.1.1" [features] default = ["opera"] diff --git a/rbufr/src/block.rs b/rbufr/src/block.rs index 13ded01..4b55c68 100644 --- a/rbufr/src/block.rs +++ b/rbufr/src/block.rs @@ -60,7 +60,7 @@ impl MessageBlock { let opera_bitmap_table = self .load_opera_bitmap_table( table_info.center_id, - GENCENTER, + table_info.subcenter_id, table_info.local_table_version, master_table_version, ) @@ -76,7 +76,7 @@ impl MessageBlock { opera_bitmap_table, ); - parser.parse(&self.message)?; + let record = parser.parse(&self.message)?; Ok(()) } diff --git a/rbufr/src/lib.rs b/rbufr/src/lib.rs index 7a85886..8d07d76 100644 --- a/rbufr/src/lib.rs +++ b/rbufr/src/lib.rs @@ -16,7 +16,7 @@ mod test { ) .unwrap(); - let entry = bufr.lookup(FXY::new(0, 0, 1)).unwrap(); + let entry = bufr.lookup(&FXY::new(0, 0, 1)).unwrap(); println!("{:#?}", entry); } @@ -27,7 +27,7 @@ mod test { let mut parser = Parser::new(); let parsed_file = parser - .parse("/Users/xiang.li1/Downloads/36_2025-12-22T11_00_00.bufr") + .parse("/Users/tsuki/Downloads/36_2025-12-17T09_00_00.bufr.nc") .unwrap(); for msg in parsed_file.messages() { diff --git a/rbufr/src/structs/data_parser.rs b/rbufr/src/structs/data_parser.rs index 6ae4cd0..6e42f46 100644 --- a/rbufr/src/structs/data_parser.rs +++ b/rbufr/src/structs/data_parser.rs @@ -1,5 +1,3 @@ -use std::collections::VecDeque; - use crate::{ errors::{Error, Result}, structs::versions::MessageVersion, @@ -7,11 +5,12 @@ use crate::{ #[cfg(feature = "opera")] use genlib::tables::ArchivedBitMapEntry; use genlib::{ - FXY, opera, + ArchivedFXY, BUFRKey, FXY, prelude::{BUFRTableB, BUFRTableBitMap, BUFRTableD}, - tables::{ArchivedBTableEntry, ArchivedDTableEntry, BTableEntry}, + tables::{ArchivedBTableEntry, ArchivedDTableEntry}, }; -use serde::de; +use rustc_hash::FxHashMap; +use std::fmt::Display; const MISS_VAL: f64 = 99999.999999; @@ -22,6 +21,111 @@ pub struct DataParser { // local local_b: Option, local_d: Option, + // opera + #[cfg(feature = "opera")] + opera_bitmap_table: Option, +} + +struct Cache<'a> { + master_b: &'a BUFRTableB, + master_d: &'a BUFRTableD, + local_b: Option<&'a BUFRTableB>, + local_d: Option<&'a BUFRTableD>, + + // Cache + b_cache: FxHashMap, + d_cache: FxHashMap, +} + +impl<'a> Cache<'a> { + fn new( + master_b: &'a BUFRTableB, + master_d: &'a BUFRTableD, + local_b: Option<&'a BUFRTableB>, + local_d: Option<&'a BUFRTableD>, + ) -> Self { + Self { + master_b, + master_d, + local_b, + local_d, + b_cache: FxHashMap::default(), + d_cache: FxHashMap::default(), + } + } + + /// Get or cache B table entry + #[inline(always)] + fn get_b(&mut self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { + let key = FXY::new(fxy.f(), fxy.x(), fxy.y()); + + // Check if already cached + if self.b_cache.contains_key(&key) { + return self.b_cache.get(&key).copied(); + } + + // Cache miss: lookup and cache + let entry = self.lookup_b_descriptor(fxy)?; + self.b_cache.insert(key, entry); + Some(entry) + } + + /// Get or cache D table entry + #[inline(always)] + fn get_d(&mut self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { + let key = FXY::new(fxy.f(), fxy.x(), fxy.y()); + + // Check if already cached + if self.d_cache.contains_key(&key) { + return self.d_cache.get(&key).copied(); + } + + // Cache miss: lookup and cache + let entry = self.lookup_d_descriptor(fxy)?; + self.d_cache.insert(key, entry); + Some(entry) + } + + #[inline(always)] + fn lookup_b_descriptor(&self, fxy: &K) -> Option<&ArchivedBTableEntry> { + self.lookup_local_b_descriptor(fxy) + .or_else(|| self.lookup_master_b_descriptor(fxy)) + } + + #[inline] + fn lookup_local_b_descriptor(&self, fxy: &K) -> Option<&ArchivedBTableEntry> { + self.local_b + .as_ref() + .and_then(|t| t.lookup(fxy)) + .filter(|e| &e.fxy == fxy) + } + + #[inline] + fn lookup_master_b_descriptor(&self, fxy: &K) -> Option<&ArchivedBTableEntry> { + self.master_b.lookup(fxy).filter(|e| &e.fxy == fxy) + } + + #[inline] + fn lookup_master_d_descriptor(&self, fxy: &K) -> Option<&ArchivedDTableEntry> { + self.master_d.lookup(fxy).filter(|e| &e.fxy == fxy) + } + + #[inline] + fn lookup_local_d_descriptor(&self, fxy: &K) -> Option<&ArchivedDTableEntry> { + self.local_d + .as_ref() + .and_then(|t| t.lookup(fxy)) + .filter(|e| &e.fxy == fxy) + } + + #[inline(always)] + fn lookup_d_descriptor(&self, fxy: &K) -> Option<&ArchivedDTableEntry> { + self.lookup_local_d_descriptor(fxy) + .or_else(|| self.lookup_master_d_descriptor(fxy)) + } +} + +struct State { // Common State common_scale: Option, common_ref_value: Option, @@ -31,256 +135,33 @@ pub struct DataParser { local_data_width: Option, // Temporary storage temp_operator: Option, - // OPERA Bitmap Table - #[cfg(feature = "opera")] - opera_bitmap_table: Option, } -impl DataParser { - pub fn new( - edition: u8, - master_b: BUFRTableB, - master_d: BUFRTableD, - local_b: Option, - local_d: Option, - #[cfg(feature = "opera")] opera_bitmap_table: Option, - ) -> Self { - DataParser { - bufr_edition: edition, - master_b, - master_d, - local_b, - local_d, +impl State { + fn new() -> Self { + Self { common_scale: None, common_ref_value: None, common_data_width: None, common_str_width: None, local_data_width: None, temp_operator: None, - #[cfg(feature = "opera")] - opera_bitmap_table, } } - pub fn parse(&mut self, message: &V) -> Result<()> { - let data_block = message.data_block()?; - let mut descriptors = message.descriptors()?; - - let mut data_input = BitInput::new(data_block); - - while !descriptors.is_empty() { - let (desc, data) = self.parser_inner(&mut Vec::new(), descriptors, data_input)?; - descriptors = desc; - data_input = data; - } - - Ok(()) - } - - fn parser_inner<'a>( - &mut self, - values: &mut Vec, - mut descriptors: VecDeque, - data: BitInput<'a>, - ) -> Result<(VecDeque, BitInput<'a>)> { - if descriptors.is_empty() { - return Ok((descriptors, data)); - } - - let des = descriptors.pop_front().unwrap(); - println!("Processing descriptor {:?}", des); - - match des.f { - 0 => { - // Element descriptor - parse data - if let Some(e) = self.lookup_b_descriptor(des) { - let (value, remaining) = self.evalute(data, &e)?; - println!("Evaluated descriptor {}\nTo value {}", e, value); - values.push(value); - return Ok((descriptors, remaining)); - } else { - return Err(Error::ParseError(format!( - "Descriptor {:?} not found in Table B", - des - ))); - } - } - 1 => { - let genlib::FXY { x, y, .. } = des; - let (mut descriptors, mut data, x, y) = if y == 0 { - // Delayed repetition: parse the next descriptor to get repeat count - let (descriptors, updated_data) = - self.parser_inner(values, descriptors, data)?; - let count = if let Some(count) = values.pop() { - let count = count.as_f64().ok_or_else(|| { - Error::ParseError("Expected numeric value for repeat count".to_string()) - })?; - count.floor() as usize - } else { - return Err(Error::ParseError(format!( - "Expected UInt value for repeat count" - ))); - }; - - (descriptors, updated_data, x as usize, count) - } else { - (descriptors, data, x as usize, y as usize) - }; - - if x > descriptors.len() { - return Err(Error::ParseError(format!( - "Not enough descriptors to repeat: requested {}, available {}", - x, - descriptors.len() - ))); - } - - let seq = descriptors.split_off(x); - - for _ in 0..y { - let mut descriptors_clone = descriptors.clone(); - while !descriptors_clone.is_empty() { - let (_desc, cd) = self.parser_inner(values, descriptors_clone, data)?; - descriptors_clone = _desc; - data = cd; - } - } - - return Ok((seq, data)); - } - 2 => { - let data = self.deal_with_operator(values, des, data)?; - return Ok((descriptors, data)); - } - 3 => { - #[cfg(feature = "opera")] - let opera_dw = self.parse_opera_bitmap(des).map(|e| e.depth); - - if let Some(seq) = self.lookup_d_descriptor(des) { - let mut fxy_chain: VecDeque = seq - .fxy_chain - .iter() - .map(|f| { - let result = - FXY::new(f.f.to_native(), f.x.to_native(), f.y.to_native()); - result - }) - .collect(); - - #[cfg(feature = "opera")] - if opera_dw.is_some() { - let (_, data) = - self.parse_opera_array(opera_dw.unwrap(), fxy_chain, data)?; - return Ok((descriptors, data)); - } else { - fxy_chain.extend(descriptors); - return Ok((fxy_chain, data)); - } - } else { - return Err(Error::ParseError(format!( - "Sequence descriptor {:?} not found in Table D", - des - ))); - } - } - _ => { - return Err(Error::ParseError(format!( - "Invalid descriptor F value: {}", - des.f - ))); - } - } - Ok((descriptors, data)) - } - - fn lookup_b_descriptor(&self, fxy: genlib::FXY) -> Option<&ArchivedBTableEntry> { - self.master_b - .lookup(fxy) - .as_ref() - .map(|entry| { - if FXY::new( - entry.fxy.f.to_native(), - entry.fxy.x.to_native(), - entry.fxy.y.to_native(), - ) == fxy - { - Some(*entry) - } else { - None - } - }) - .flatten() - .or(self.local_b.as_ref().and_then(|t| t.lookup(fxy))) - } - - fn lookup_d_descriptor(&self, fxy: genlib::FXY) -> Option<&ArchivedDTableEntry> { - self.master_d - .lookup(fxy) - .as_ref() - .map(|entry| { - if FXY::new( - entry.fxy.f.to_native(), - entry.fxy.x.to_native(), - entry.fxy.y.to_native(), - ) == fxy - { - Some(*entry) - } else { - None - } - }) - .flatten() - .or(self.local_d.as_ref().and_then(|t| t.lookup(fxy))) - } - - fn evalute<'a>( - &self, - data: BitInput<'a>, - e: &ArchivedBTableEntry, - ) -> Result<(Value, BitInput<'a>)> { - match e.bufr_unit.as_str() { - "CCITT IA5" => { - let total_bytes = self - .common_str_width - .unwrap_or(((e.bufr_datawidth_bits.to_native() as usize) + 7) / 8); - let (s, data) = data.take_string(total_bytes as usize)?; - return Ok((Value::String(s), data)); - } - _ => { - let datawidth = self.datawidth(e); - let scale = self.scale(e) as f64; - let reference_value = self.reference_value(e) as f64; - let (value, data) = data.get_arbitary_bits(datawidth as usize)?; - let mv = (1 << datawidth) - 1; - if value == mv && e.fxy.x != 31 { - return Ok((Value::Missing, data)); - } - let result = ((value as f64) + reference_value) * 10.0f64.powi(-scale as i32); - return Ok((Value::Number(result), data)); - } - } - } - - #[inline] + #[inline(always)] fn no_change(&self, e: &ArchivedBTableEntry) -> bool { - let is_flag = match e.bufr_unit.as_str() { - "flag table" | "flag-table" => true, - _ => false, - }; + let unit = e.bufr_unit.as_str(); + let is_flag_or_code = matches!( + unit, + "flag table" | "flag-table" | "code table" | "code-table" + ); + let delay_repeat_count = e.fxy.f.to_native() == 0 && e.fxy.x.to_native() == 31; - let is_code = match e.bufr_unit.as_str() { - "code table" | "code-table" => true, - _ => false, - }; - - let delay_repeat_count = match (e.fxy.f.to_native(), e.fxy.x.to_native()) { - (0, 31) => true, - _ => false, - }; - - is_flag || is_code || delay_repeat_count + is_flag_or_code || delay_repeat_count } + #[inline(always)] fn datawidth(&self, e: &ArchivedBTableEntry) -> u32 { let v = if self.no_change(e) { e.bufr_datawidth_bits.to_native() @@ -296,13 +177,14 @@ impl DataParser { .unwrap_or(e.bufr_datawidth_bits.to_native()) }; - if self.temp_operator.is_some() { - e.bufr_datawidth_bits.to_native() + (10 * self.temp_operator.unwrap()) as u32 + if let Some(op) = self.temp_operator { + e.bufr_datawidth_bits.to_native() + (10 * op) as u32 } else { v } } + #[inline(always)] fn scale(&self, e: &ArchivedBTableEntry) -> i32 { let v = if self.no_change(e) { e.bufr_scale.to_native() @@ -315,73 +197,452 @@ impl DataParser { .unwrap_or(e.bufr_scale.to_native()) }; - if self.temp_operator.is_some() { - e.bufr_scale.to_native() + self.temp_operator.unwrap() + if let Some(op) = self.temp_operator { + e.bufr_scale.to_native() + op } else { v } } + #[inline(always)] fn reference_value(&self, e: &ArchivedBTableEntry) -> i32 { let v = e.bufr_reference_value.to_native(); - if self.temp_operator.is_some() { - (e.bufr_reference_value.to_native() as f32 * 10_f32.powi(self.temp_operator.unwrap())) - as i32 + if let Some(op) = self.temp_operator { + (v as f32 * 10_f32.powi(op)) as i32 } else { v } } +} - fn deal_with_operator<'a>( - &mut self, - values: &mut Vec, - operator: FXY, +impl DataParser { + pub fn new( + edition: u8, + master_b: BUFRTableB, + master_d: BUFRTableD, + local_b: Option, + local_d: Option, + + #[cfg(feature = "opera")] _opera_bitmap_table: Option, + ) -> Self { + DataParser { + bufr_edition: edition, + master_b, + master_d, + local_b, + local_d, + #[cfg(feature = "opera")] + opera_bitmap_table: _opera_bitmap_table, + } + } + + pub fn parse<'a, V: MessageVersion>(&'a mut self, message: &V) -> Result> { + let data_block = message.data_block()?; + let descriptors = message.descriptors()?; + + let mut data_input = BitInput::new(data_block); + let mut record = BUFRParsed::new(); + let mut state = State::new(); + let mut cache = Cache::new( + &self.master_b, + &self.master_d, + self.local_b.as_ref(), + self.local_d.as_ref(), + ); + + let mut stack: Vec = vec![]; + stack.push(Frame { + descs: Descs::Raw(&descriptors), + idx: 0, + }); + + while let Some(Frame { descs, mut idx }) = stack.pop() { + if idx > descs.len() { + continue; + } + + self.parsed(descs, idx, &mut stack, &mut cache, &mut state, data_input)?; + + idx += 1; + stack.push(Frame { descs, idx }); + } + + Ok(record) + } + + fn parsed<'k, 'c, 'i>( + &self, + descs: Descs<'k>, + idx: usize, + stack: &mut Vec>, + cache: &mut Cache<'c>, + state: &mut State, + data: BitInput<'i>, + ) -> Result<()> { + match descs { + Descs::Raw(raw) => { + let des = &raw[0]; + self.parse_d(des, idx, descs, stack, cache, state, data)?; + } + Descs::Archived(archived) => { + let des = &archived[0]; + self.parse_d(des, idx, descs, stack, cache, state, data)?; + } + }; + Ok(()) + } + + fn parse_d<'k, 'c, 'i, K: BUFRKey>( + &self, + des: &'k K, + mut idx: usize, + descs: Descs<'k>, + stack: &mut Vec>, + cache: &mut Cache<'c>, + state: &mut State, + data: BitInput<'i>, + ) -> Result<()> { + match des.f() { + 0 => { + // Element descriptor - parse data + if let Some(e) = cache.get_b(des) { + let (value, remaining) = self.evalute(state, data, &e)?; + + // println!("Parsed Descriptor: {:?}, Value: {}", des, value); + // values.push(value, &e.element_name_en, &e.bufr_unit); + // return Ok((&descriptors[1..], remaining)); + + return Ok(()); + } else { + return Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))); + } + } + 1 => { + let mut x = des.x() as usize; + let mut y = des.y() as usize; + if y == 0 { + let (count, data) = match descs { + Descs::Raw(raw) => { + let count_des = &raw[1]; + self.parse_usize(state, cache, count_des, data)? + } + + Descs::Archived(archived) => { + let count_des = &archived[1]; + self.parse_usize(state, cache, count_des, data)? + } + }; + y = count; + + let _ = stack.pop(); + stack.push(Frame { descs, idx: 2 }); + }; + + // if x > descriptors.len() { + // return Err(Error::ParseError(format!( + // "Not enough descriptors to repeat: requested {}, available {}", + // x, + // descriptors.len() + // ))); + // } + + match descs { + Descs::Raw(raw) => { + let body = &raw[idx..idx + x]; + let _ = stack.pop(); + stack.push(Frame { + descs: Descs::Raw(body), + idx: 0, + }); + } + Descs::Archived(archived) => { + let body = &archived[idx..idx + x]; + let _ = stack.pop(); + stack.push(Frame { + descs: Descs::Archived(body), + idx: 0, + }); + } + } + + return Ok(()); + } + 2 => { + let data = self.deal_with_operator(state, values, des, data)?; + return Ok(()); + } + 3 => { + #[cfg(feature = "opera")] + let opera_dw = self.parse_opera_bitmap(des).map(|e| e.depth); + + if let Some(seq) = cache.get_d(des) { + let mut fxy_chain = seq.fxy_chain.as_slice(); + if opera_dw.is_some() { + // let (_, data) = + // self.parse_opera_array(opera_dw.unwrap(), fxy_chain, data)?; + // TODO + unimplemented!(""); + } else { + stack.push(Frame { + descs: Descs::Archived(fxy_chain), + idx: 0, + }); + + return Ok(()); + } + } else { + return Err(Error::ParseError(format!( + "Sequence descriptor {:?} not found in Table D", + des + ))); + } + } + _ => { + return Err(Error::ParseError(format!( + "Invalid descriptor F value: {}", + des.f() + ))); + } + } + } + + #[inline(always)] + fn parser_inner<'s, 'a, 'b, 'c, C: Container<'s>, K: BUFRKey>( + &'s self, + state: &mut State, + cache: &mut Cache<'c>, + values: &mut C, + descriptors: &'a [K], + mut data: BitInput<'b>, + ) -> Result<(&'a [K], BitInput<'b>)> { + if descriptors.is_empty() { + return Ok((descriptors, data)); + } + + let des = &descriptors[0]; + + match des.f() { + 0 => { + // Element descriptor - parse data + if let Some(e) = cache.get_b(des) { + let (value, remaining) = self.evalute(state, data, &e)?; + + // println!("Parsed Descriptor: {:?}, Value: {}", des, value); + values.push(value, &e.element_name_en, &e.bufr_unit); + return Ok((&descriptors[1..], remaining)); + } else { + return Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))); + } + } + 1 => { + let x = des.x(); + let y = des.y(); + let (descriptors, mut data, x, y) = if y == 0 { + let (count, updated_data) = + self.parse_usize(state, cache, &descriptors[1], data)?; + (&descriptors[2..], updated_data, x as usize, count) + } else { + (&descriptors[1..], data, x as usize, y as usize) + }; + + if x > descriptors.len() { + return Err(Error::ParseError(format!( + "Not enough descriptors to repeat: requested {}, available {}", + x, + descriptors.len() + ))); + } + + let seq = &descriptors[x..]; + + // Fast path: single descriptor repetition + if x == 1 && descriptors[0].f() == 0 { + let mut repeating = values.start_repeating(y); + for _ in 0..y { + let (_desc, cd) = + self.parser_inner(state, cache, &mut repeating, descriptors, data)?; + data = cd; + } + repeating.finish(); + return Ok((seq, data)); + } + + // General path: multiple descriptors or complex patterns + let mut repeaing = values.start_repeating(y); + + for _ in 0..y { + let mut repeating_descs = &descriptors[0..x]; + while !repeating_descs.is_empty() { + let (_desc, cd) = + self.parser_inner(state, cache, &mut repeaing, repeating_descs, data)?; + repeating_descs = _desc; + data = cd; + } + } + repeaing.finish(); + return Ok((seq, data)); + } + 2 => { + let data = self.deal_with_operator(state, values, des, data)?; + return Ok((&descriptors[1..], data)); + } + 3 => { + #[cfg(feature = "opera")] + let opera_dw = self.parse_opera_bitmap(des).map(|e| e.depth); + + if let Some(seq) = cache.get_d(des) { + let mut fxy_chain = seq.fxy_chain.as_slice(); + if opera_dw.is_some() { + // let (_, data) = + // self.parse_opera_array(opera_dw.unwrap(), fxy_chain, data)?; + // TODO + unimplemented!(""); + } else { + while !fxy_chain.is_empty() { + let (desc, cd) = + self.parser_inner(state, cache, values, fxy_chain, data)?; + fxy_chain = desc; + data = cd; + } + return Ok((&descriptors[1..], data)); + } + } else { + return Err(Error::ParseError(format!( + "Sequence descriptor {:?} not found in Table D", + des + ))); + } + } + _ => { + return Err(Error::ParseError(format!( + "Invalid descriptor F value: {}", + des.f() + ))); + } + } + } + + fn parse_usize<'a, 'b, 'c, K: BUFRKey>( + &self, + state: &State, + cache: &mut Cache<'c>, + des: &'a K, + data: BitInput<'b>, + ) -> Result<(usize, BitInput<'b>)> { + match des.f() { + 0 => { + if let Some(e) = cache.get_b(des) { + let (value, remaining) = self.evalute(state, data, &e)?; + + if let Some(v) = value.as_f64() { + Ok((v.floor() as usize, remaining)) + } else { + Err(Error::ParseError(format!("Format Error"))) + } + } else { + Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))) + } + } + _ => Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))), + } + } + + #[inline(always)] + fn evalute<'a>( + &self, + state: &State, data: BitInput<'a>, - ) -> Result<(BitInput<'a>)> { - match operator.x { - 1 => match operator.y { + e: &ArchivedBTableEntry, + ) -> Result<(Value, BitInput<'a>)> { + match e.bufr_unit.as_str() { + "CCITT IA5" => { + let total_bytes = state + .common_str_width + .unwrap_or(((e.bufr_datawidth_bits.to_native() as usize) + 7) / 8); + let (s, data) = data.take_string(total_bytes as usize)?; + return Ok((Value::String(s), data)); + } + _ => { + let datawidth = state.datawidth(e); + let scale = state.scale(e) as f64; + let reference_value = state.reference_value(e) as f64; + let (value, data) = data.get_arbitary_bits(datawidth as usize)?; + let mv = (1 << datawidth) - 1; + if value == mv && e.fxy.x != 31 { + return Ok((Value::Missing, data)); + } + let result = ((value as f64) + reference_value) * 10.0f64.powi(-scale as i32); + return Ok((Value::Number(result), data)); + } + } + } + + fn deal_with_operator<'s, 'a, C: Container<'s>, K: BUFRKey>( + &'s self, + state: &mut State, + values: &mut C, + operator: &K, + data: BitInput<'a>, + ) -> Result> { + let x = operator.x(); + let y = operator.y(); + + match x { + 1 => match y { 0 => { - self.common_data_width = None; + state.common_data_width = None; } _ => { - self.common_data_width = Some(operator.y); + state.common_data_width = Some(y); } }, - 2 => match operator.y { + 2 => match y { 0 => { - self.common_scale = None; + state.common_scale = None; } _ => { - self.common_scale = Some(operator.y as i32); + state.common_scale = Some(y); } }, - 3 => match operator.y { + 3 => match y { 0 => { - self.common_ref_value = None; + state.common_ref_value = None; } _ => { - self.common_ref_value = Some(operator.y as i32); + state.common_ref_value = Some(y); } }, 5 => { - let (string, _data) = data.take_string(operator.y as usize)?; - values.push(Value::String(string)); + let (string, _data) = data.take_string(y as usize)?; + values.push(Value::String(string), "", "CAITT IA5"); } 6 => { - let localized_width = operator.y; - self.local_data_width = Some(localized_width); + let localized_width = y; + state.local_data_width = Some(localized_width); } 7 => { - self.temp_operator = Some(operator.y); + state.temp_operator = Some(y); } - 8 => match operator.y { + 8 => match y { 0 => { - self.common_str_width = None; + state.common_str_width = None; } _ => { - self.common_str_width = Some(operator.y as usize); + state.common_str_width = Some(y as usize); } }, _ => {} @@ -391,31 +652,31 @@ impl DataParser { } #[cfg(feature = "opera")] - fn parse_opera_bitmap(&self, des: FXY) -> Option<&ArchivedBitMapEntry> { + fn parse_opera_bitmap(&self, des: &K) -> Option<&ArchivedBitMapEntry> { self.opera_bitmap_table .as_ref() .map(|t| t.lookup(des)) .flatten() } - #[cfg(feature = "opera")] - fn parse_opera_array<'a>( - &mut self, - dw: u8, - mut descs: VecDeque, - mut data: BitInput<'a>, - ) -> Result<(VecDeque, BitInput<'a>)> { - use crate::opera::OperaBitmapParser; + // #[cfg(feature = "opera")] + // fn parse_opera_array<'a>( + // &mut self, + // dw: u8, + // mut descs: VecDeque, + // mut data: BitInput<'a>, + // ) -> Result<(VecDeque, BitInput<'a>)> { + // use crate::opera::OperaBitmapParser; - let mut opera_bitmap = OperaBitmapParser::new(dw); + // let mut opera_bitmap = OperaBitmapParser::new(dw); - while !descs.is_empty() { - let (_descs, _data) = self.parser_inner(opera_bitmap.values(), descs, data)?; - descs = _descs; - data = _data; - } - Ok((descs, data)) - } + // while !descs.is_empty() { + // let (_descs, _data) = self.parser_inner(opera_bitmap.values(), descs, data)?; + // descs = _descs; + // data = _data; + // } + // Ok((descs, data)) + // } // fn seq_parser(descriptors: &[genlib::FXY]) -> Result<()> {} } @@ -461,6 +722,10 @@ impl Value { Value::Missing => None, } } + + pub fn is_missing(&self) -> bool { + matches!(self, Value::Missing) + } } #[derive(Debug, Clone, Copy)] @@ -475,8 +740,23 @@ impl<'a> BitInput<'a> { self.1 } + #[inline] pub fn take_string(self, nbytes: usize) -> Result<(String, BitInput<'a>)> { - let total_bits = nbytes * 8; + if nbytes == 0 { + return Ok((String::new(), self)); + } + + // Fast path: byte-aligned string reads + if self.1 == 0 { + if self.0.len() < nbytes { + return Err(Error::ParseError("Not enough data for string".to_string())); + } + let s = String::from_utf8(self.0[..nbytes].to_vec()) + .map_err(|_| Error::ParseError("Invalid UTF-8 string".to_string()))?; + return Ok((s, BitInput(&self.0[nbytes..], 0))); + } + + // Slow path: unaligned reads let mut chars = Vec::with_capacity(nbytes); let mut remaining_input = self; @@ -491,47 +771,323 @@ impl<'a> BitInput<'a> { Ok((s, remaining_input)) } + #[inline] pub fn get_arbitary_bits(self, nbits: usize) -> Result<(u64, BitInput<'a>)> { if nbits == 0 { return Ok((0, self)); } + // Fast path: byte-aligned reads for common bit widths + if self.1 == 0 { + return self.get_arbitary_bits_aligned(nbits); + } + + // General path for unaligned reads + self.get_arbitary_bits_unaligned(nbits) + } + + /// Fast path for byte-aligned bit reads + #[inline] + fn get_arbitary_bits_aligned(self, nbits: usize) -> Result<(u64, BitInput<'a>)> { + let byte_data = self.0; + + // Optimized paths for common bit widths + match nbits { + 8 => { + if byte_data.is_empty() { + return Err(Error::ParseError("Not enough data".to_string())); + } + Ok((byte_data[0] as u64, BitInput(&byte_data[1..], 0))) + } + 16 => { + if byte_data.len() < 2 { + return Err(Error::ParseError("Not enough data".to_string())); + } + let value = u16::from_be_bytes([byte_data[0], byte_data[1]]) as u64; + Ok((value, BitInput(&byte_data[2..], 0))) + } + 24 => { + if byte_data.len() < 3 { + return Err(Error::ParseError("Not enough data".to_string())); + } + let value = ((byte_data[0] as u64) << 16) + | ((byte_data[1] as u64) << 8) + | (byte_data[2] as u64); + Ok((value, BitInput(&byte_data[3..], 0))) + } + 32 => { + if byte_data.len() < 4 { + return Err(Error::ParseError("Not enough data".to_string())); + } + let value = + u32::from_be_bytes([byte_data[0], byte_data[1], byte_data[2], byte_data[3]]) + as u64; + Ok((value, BitInput(&byte_data[4..], 0))) + } + _ => { + // Generic byte-aligned path + let nbytes = (nbits + 7) / 8; + if byte_data.len() < nbytes { + return Err(Error::ParseError("Not enough data".to_string())); + } + + let mut value: u64 = 0; + let full_bytes = nbits / 8; + + // Read full bytes + for i in 0..full_bytes { + value = (value << 8) | (byte_data[i] as u64); + } + + let remaining_bits = nbits % 8; + if remaining_bits > 0 { + // Read partial byte + let last_byte = byte_data[full_bytes]; + let shift = 8 - remaining_bits; + let mask = ((1u16 << remaining_bits) - 1) as u8; + let bits = (last_byte >> shift) & mask; + value = (value << remaining_bits) | (bits as u64); + Ok((value, BitInput(&byte_data[full_bytes..], remaining_bits))) + } else { + Ok((value, BitInput(&byte_data[full_bytes..], 0))) + } + } + } + } + + /// Slower path for unaligned bit reads + #[inline] + fn get_arbitary_bits_unaligned(self, nbits: usize) -> Result<(u64, BitInput<'a>)> { let mut value: u64 = 0; let mut remaining_bits = nbits; - let mut bit_offset = self.1; // Current bit position in the first byte (0-7) - let mut byte_data = self.0; // Remaining bytes + let mut bit_offset = self.1; + let mut byte_data = self.0; - while remaining_bits > 0 { + // Read first partial byte if not byte-aligned + if bit_offset > 0 && remaining_bits > 0 { if byte_data.is_empty() { - return Err(Error::ParseError( - "Not enough data to parse bits".to_string(), - )); + return Err(Error::ParseError("Not enough data".to_string())); } let current_byte = byte_data[0]; - // How many bits we can read from the current byte let bits_available_in_byte = 8 - bit_offset; let bits_to_read = remaining_bits.min(bits_available_in_byte); - // Extract bits from the current byte - // Bits are read from MSB to LSB (left to right) let shift = bits_available_in_byte - bits_to_read; - let mask = ((1u32 << bits_to_read) - 1) as u8; + let mask = ((1u16 << bits_to_read) - 1) as u8; let extracted_bits = (current_byte >> shift) & mask; - // Add to value - value = (value << bits_to_read) | extracted_bits as u64; - + value = extracted_bits as u64; remaining_bits -= bits_to_read; bit_offset += bits_to_read; - // Move to next byte if we've consumed all 8 bits if bit_offset >= 8 { byte_data = &byte_data[1..]; bit_offset = 0; } } + // Read full bytes + while remaining_bits >= 8 { + if byte_data.is_empty() { + return Err(Error::ParseError("Not enough data".to_string())); + } + value = (value << 8) | (byte_data[0] as u64); + byte_data = &byte_data[1..]; + remaining_bits -= 8; + } + + // Read final partial byte if needed + if remaining_bits > 0 { + if byte_data.is_empty() { + return Err(Error::ParseError("Not enough data".to_string())); + } + + let current_byte = byte_data[0]; + let shift = 8 - remaining_bits; + let mask = ((1u16 << remaining_bits) - 1) as u8; + let extracted_bits = (current_byte >> shift) & mask; + + value = (value << remaining_bits) | (extracted_bits as u64); + bit_offset = remaining_bits; + } + Ok((value, BitInput(byte_data, bit_offset))) } } + +trait Container<'a> +where + Self: Sized, +{ + fn push(&mut self, value: Value, name: &'a str, unit: &'a str); + + fn start_repeating<'b>(&'b mut self, time: usize) -> Repeating<'a, 'b>; +} + +impl<'a> Container<'a> for BUFRParsed<'a> { + fn push(&mut self, value: Value, name: &'a str, unit: &'a str) { + self.push(value, name, unit); + } + + fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { + self.start_repeating(time) + } +} + +impl<'a, 'b> Container<'a> for Repeating<'a, 'b> { + fn push(&mut self, value: Value, _name: &'a str, _unit: &'a str) { + self.push(value); + } + + fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { + Repeating { + parsed: self.parsed, + values: Vec::with_capacity(time), + } + } +} + +pub struct BUFRParsed<'a> { + records: Vec>, +} + +impl<'a> BUFRParsed<'a> { + pub fn new() -> Self { + Self { records: vec![] } + } + + pub fn push(&mut self, value: Value, element_name: &'a str, unit: &'a str) { + self.records.push(BUFRRecord { + name: Some(element_name), + values: BUFRData::Single(value), + unit: Some(unit), + }); + } + + fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { + Repeating { + parsed: self, + values: Vec::with_capacity(time), + } + } +} + +struct Repeating<'a, 's> { + parsed: &'s mut BUFRParsed<'a>, + values: Vec, +} + +impl<'a, 's> Repeating<'a, 's> { + fn push(&mut self, value: Value) { + self.values.push(value); + } + + fn finish(self) { + let recording = BUFRRecord { + name: None, + values: BUFRData::Repeat(self.values), + unit: None, + }; + self.parsed.records.push(recording); + } +} + +pub enum BUFRData { + Repeat(Vec), + Single(Value), +} + +pub struct BUFRRecord<'a> { + pub name: Option<&'a str>, + pub values: BUFRData, + pub unit: Option<&'a str>, +} + +impl Display for BUFRRecord<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let is_print_unit = match self.unit { + Some("CAITT IA5" | "code table" | "code-table" | "flag table" | "flag-table") => false, + None => false, + _ => true, + }; + + if self.name.is_none() { + return Ok(()); + } + + let name = self.name.as_ref().unwrap(); + + match &self.values { + BUFRData::Single(v) => { + if is_print_unit { + write!(f, "{}: \n{} {}", name, v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}: \n{}", name, v)?; + } + } + BUFRData::Repeat(vs) => { + if vs.len() < 8 { + write!(f, "{}: [", name)?; + for v in vs { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + + write!(f, "]")?; + } else { + write!(f, "{}: [", name)?; + for v in &vs[0..5] { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + write!(f, "... ")?; + for v in &vs[vs.len() - 2..vs.len()] { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + write!(f, "]")?; + } + } + } + + Ok(()) + } +} + +struct Frame<'v> { + descs: Descs<'v>, + idx: usize, +} + +impl<'v> Frame<'v> { + fn len(&self) -> usize { + match &self.descs { + Descs::Raw(d) => d.len(), + Descs::Archived(d) => d.len(), + } + } +} + +enum Descs<'v> { + Raw(&'v [genlib::FXY]), + Archived(&'v [ArchivedFXY]), +} + +impl Descs<'_> { + fn len(&self) -> usize { + match self { + Descs::Raw(d) => d.len(), + Descs::Archived(d) => d.len(), + } + } +} diff --git a/rbufr/src/structs/tools.rs b/rbufr/src/structs/tools.rs index ec94d2a..edae1f1 100644 --- a/rbufr/src/structs/tools.rs +++ b/rbufr/src/structs/tools.rs @@ -4,17 +4,17 @@ use crate::errors::{Error, Result}; use crate::structs::bit::{BitInput, parse_arbitrary_bits}; use nom::IResult; -pub(super) fn parse_descriptors(input: &[u8]) -> Result> { +pub(super) fn parse_descriptors(input: &[u8]) -> Result> { parse_descriptors_inner(input) .map(|(_, v)| v) .map_err(|_| Error::ParseError(format!("Can't parse descriptors from section3"))) } -fn parse_descriptors_inner(mut input: &[u8]) -> IResult> { - let mut results = VecDeque::new(); +fn parse_descriptors_inner(mut input: &[u8]) -> IResult> { + let mut results = Vec::new(); while input.len() > 1 { let ((finput, _), fxy) = take_fxy((input, 0))?; - results.push_back(fxy); + results.push(fxy); input = finput; } diff --git a/rbufr/src/structs/versions/mod.rs b/rbufr/src/structs/versions/mod.rs index bba1480..393b11e 100644 --- a/rbufr/src/structs/versions/mod.rs +++ b/rbufr/src/structs/versions/mod.rs @@ -97,7 +97,7 @@ macro_rules! message { } } - fn descriptors(&self) -> Result> { + fn descriptors(&self) -> Result> { match self { $( BUFRMessage::$version(msg) => msg.descriptors(), @@ -163,7 +163,7 @@ pub trait MessageVersion: Sized { fn ndescs(&self) -> usize; - fn descriptors(&self) -> Result>; + fn descriptors(&self) -> Result>; fn data_block(&self) -> Result<&[u8]>; } diff --git a/rbufr/src/structs/versions/v2.rs b/rbufr/src/structs/versions/v2.rs index ed8f64b..140ddec 100644 --- a/rbufr/src/structs/versions/v2.rs +++ b/rbufr/src/structs/versions/v2.rs @@ -65,7 +65,7 @@ impl MessageVersion for BUFRMessageV2 { self.section3.data.len() / 2 } - fn descriptors(&self) -> Result> { + fn descriptors(&self) -> Result> { parse_descriptors(&self.section3.data) } diff --git a/rbufr/src/structs/versions/v4.rs b/rbufr/src/structs/versions/v4.rs index 3fe66d2..3f1b63f 100644 --- a/rbufr/src/structs/versions/v4.rs +++ b/rbufr/src/structs/versions/v4.rs @@ -66,7 +66,7 @@ impl MessageVersion for BUFRMessageV4 { self.section3.data.len() / 2 } - fn descriptors(&self) -> Result> { + fn descriptors(&self) -> Result> { parse_descriptors(&self.section3.data) }