diff --git a/gen/src/config.rs b/gen/src/config.rs index 9f1a385..b3e0da1 100644 --- a/gen/src/config.rs +++ b/gen/src/config.rs @@ -5,20 +5,11 @@ use std::path::Path; use crate::pattern::{TableFilePattern, TableKind, TableMetadata}; -/// Configuration for custom table patterns #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PatternConfig { - /// Name/description of this pattern pub name: String, - - /// Regular expression pattern to match filenames - /// Capture groups should be in order: kind, version, subcenter, center, language pub regex: String, - - /// Glob pattern for scanning directories pub glob: String, - - /// Mapping of regex capture groups to metadata fields pub mapping: FieldMapping, } @@ -182,8 +173,7 @@ impl ScanConfig { /// Save configuration to a TOML file pub fn save_to_file>(&self, path: P) -> Result<()> { - let content = toml::to_string_pretty(self) - .context("Failed to serialize config")?; + let content = toml::to_string_pretty(self).context("Failed to serialize config")?; std::fs::write(path.as_ref(), content) .with_context(|| format!("Failed to write config file: {}", path.as_ref().display()))?; diff --git a/rbufr/src/block.rs b/rbufr/src/block.rs index 4b55c68..e73c0d3 100644 --- a/rbufr/src/block.rs +++ b/rbufr/src/block.rs @@ -1,12 +1,14 @@ +use std::ops::Deref; + use genlib::BUFRTableMPH; #[cfg(feature = "opera")] use genlib::prelude::BUFRTableBitMap; use genlib::tables::TableTypeTrait; +use crate::decoder::*; use crate::errors::Result; #[cfg(feature = "opera")] use crate::structs::GENCENTER; -use crate::structs::data_parser::DataParser; use crate::structs::versions::{BUFRMessage, MessageVersion}; use crate::tables::*; @@ -20,68 +22,20 @@ impl std::fmt::Display for MessageBlock { } } +impl Deref for MessageBlock { + type Target = BUFRMessage; + + fn deref(&self) -> &Self::Target { + &self.message + } +} + impl MessageBlock { pub fn new(message: BUFRMessage) -> Self { MessageBlock { message } } - pub fn load_data(&self) -> Result<()> { - let table_info = self.message.table_info(); - let master_table_version = table_info.master_table_version; - - let master_b: BUFRTableB = self.load_first_validable_table(master_table_version)?; - let master_d: BUFRTableD = self.load_first_validable_table(master_table_version)?; - - let local_table_version = table_info.local_table_version as u32; - - let local_tables = if local_table_version > 0 { - let local_b: BUFRTableB = TableLoader.load_table(LocalTable::new( - Some(table_info.subcenter_id * 256 + table_info.center_id), - table_info.local_table_version, - ))?; - - let local_d: BUFRTableD = TableLoader.load_table(LocalTable::new( - Some(table_info.subcenter_id * 256 + table_info.center_id), - table_info.local_table_version, - ))?; - - Some((local_b, local_d)) - } else { - None - }; - - let (local_b, local_d) = if let Some((b, d)) = local_tables { - (Some(b), Some(d)) - } else { - (None, None) - }; - - #[cfg(feature = "opera")] - let opera_bitmap_table = self - .load_opera_bitmap_table( - table_info.center_id, - table_info.subcenter_id, - table_info.local_table_version, - master_table_version, - ) - .ok(); - - let mut parser = DataParser::new( - self.message.version(), - master_b, - master_d, - local_b, - local_d, - #[cfg(feature = "opera")] - opera_bitmap_table, - ); - - let record = parser.parse(&self.message)?; - - Ok(()) - } - - fn load_first_validable_table( + pub(crate) fn load_first_validable_table( &self, table_version: u8, ) -> Result> { @@ -101,7 +55,7 @@ impl MessageBlock { } #[cfg(feature = "opera")] - fn load_opera_bitmap_table( + pub(crate) fn load_opera_bitmap_table( &self, subcenter: u16, center: u16, @@ -132,7 +86,6 @@ impl BUFRFile { self.messages.push(MessageBlock::new(message)); } - /// Get the number of successfully parsed messages pub fn message_count(&self) -> usize { self.messages.len() } @@ -141,7 +94,6 @@ impl BUFRFile { self.messages.get(index) } - /// Get a reference to all parsed messages pub fn messages(&self) -> &[MessageBlock] { &self.messages } diff --git a/rbufr/src/decoder.rs b/rbufr/src/decoder.rs new file mode 100644 index 0000000..734f945 --- /dev/null +++ b/rbufr/src/decoder.rs @@ -0,0 +1,1628 @@ +use crate::{ + block::MessageBlock, + errors::{Error, Result}, + structs::versions::MessageVersion, + tables::{LocalTable, TableLoader}, +}; +#[cfg(feature = "opera")] +use genlib::tables::ArchivedBitMapEntry; +use genlib::{ + ArchivedFXY, BUFRKey, FXY, + prelude::{BUFRTableB, BUFRTableBitMap, BUFRTableD}, + tables::{ArchivedBTableEntry, ArchivedDTableEntry}, +}; +use rustc_hash::FxHashMap; +use std::{fmt::Display, ops::Deref}; + +const MISS_VAL: f64 = 99999.999999; + +pub struct Decoder { + bufr_edition: u8, + master_b: BUFRTableB, + master_d: BUFRTableD, + // local + local_b: Option, + local_d: Option, + // opera + #[cfg(feature = "opera")] + opera_bitmap_table: Option, +} + +struct Cache<'a> { + master_b: &'a BUFRTableB, + master_d: &'a BUFRTableD, + local_b: Option<&'a BUFRTableB>, + local_d: Option<&'a BUFRTableD>, + + // Cache + b_cache: FxHashMap, + d_cache: FxHashMap, +} + +impl<'a> Cache<'a> { + fn new( + master_b: &'a BUFRTableB, + master_d: &'a BUFRTableD, + local_b: Option<&'a BUFRTableB>, + local_d: Option<&'a BUFRTableD>, + ) -> Self { + Self { + master_b, + master_d, + local_b, + local_d, + b_cache: FxHashMap::default(), + d_cache: FxHashMap::default(), + } + } + + /// Get or cache B table entry + #[inline(always)] + fn get_b(&mut self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { + self.lookup_b_descriptor(fxy) + } + + /// Get or cache D table entry + #[inline(always)] + fn get_d(&mut self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { + self.lookup_d_descriptor(fxy) + } + + #[inline(always)] + fn lookup_b_descriptor(&self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { + self.lookup_local_b_descriptor(fxy) + .or_else(|| self.lookup_master_b_descriptor(fxy)) + } + + #[inline] + fn lookup_local_b_descriptor(&self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { + self.local_b + .as_ref() + .and_then(|t| t.lookup(fxy)) + .filter(|e| &e.fxy == fxy) + } + + #[inline] + fn lookup_master_b_descriptor(&self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { + self.master_b.lookup(fxy).filter(|e| &e.fxy == fxy) + } + + #[inline] + fn lookup_master_d_descriptor(&self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { + self.master_d.lookup(fxy).filter(|e| &e.fxy == fxy) + } + + #[inline] + fn lookup_local_d_descriptor(&self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { + self.local_d + .as_ref() + .and_then(|t| t.lookup(fxy)) + .filter(|e| &e.fxy == fxy) + } + + #[inline(always)] + fn lookup_d_descriptor(&self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { + self.lookup_local_d_descriptor(fxy) + .or_else(|| self.lookup_master_d_descriptor(fxy)) + } +} + +struct State { + // Common State + common_scale: Option, + common_ref_value: Option, + common_data_width: Option, + common_str_width: Option, + // Localized State + local_data_width: Option, + // Temporary storage + temp_operator: Option, +} + +/// Pre-compiled metadata for one field in the array body +#[derive(Debug, Clone)] +struct FieldSpec<'a> { + /// Original FXY for debugging/output + fxy: FXY, + /// Name from Table B + name: &'a str, + /// Unit from Table B + unit: &'a str, + /// Effective bit width (after operators applied) + width_bits: u32, + /// Effective scale (after operators applied) + scale: i32, + /// Effective reference value (after operators applied) + reference: i32, + /// Missing value for this field (all bits set for this width) + missing_value: u64, +} + +/// Compiled layout for one array repetition +#[derive(Debug, Clone)] +struct CompiledLayout<'a> { + fields: Vec>, + /// Total bits per element (for sanity checks) + bits_per_element: usize, +} + +/// Compiler state machine (mimics State but for compile-time analysis) +#[derive(Debug)] +struct CompilerState { + common_scale: Option, + common_ref_value: Option, + common_data_width: Option, + temp_operator: Option, + // We reject arrays with these: + common_str_width: Option, + local_data_width: Option, +} + +impl State { + fn new() -> Self { + Self { + common_scale: None, + common_ref_value: None, + common_data_width: None, + common_str_width: None, + local_data_width: None, + temp_operator: None, + } + } + + #[inline(always)] + fn no_change(&self, e: &ArchivedBTableEntry) -> bool { + let unit = e.bufr_unit.as_str(); + let is_flag_or_code = matches!( + unit, + "flag table" | "flag-table" | "code table" | "code-table" + ); + let delay_repeat_count = e.fxy.f.to_native() == 0 && e.fxy.x.to_native() == 31; + + is_flag_or_code || delay_repeat_count + } + + #[inline(always)] + fn datawidth(&self, e: &ArchivedBTableEntry) -> u32 { + // 2-06-YYY: localized data width overrides everything else + if let Some(local_width) = self.local_data_width { + return local_width as u32; + } + + let v = if self.no_change(e) { + e.bufr_datawidth_bits.to_native() + } else { + self.common_data_width + .map(|c| { + let (v, _) = e + .bufr_datawidth_bits + .to_native() + .overflowing_add_signed(c - 128); + v + }) + .unwrap_or(e.bufr_datawidth_bits.to_native()) + }; + + // 2-07-YYY: increase width by 10*Y bits + if let Some(op) = self.temp_operator { + v + (10 * op) as u32 + } else { + v + } + } + + #[inline(always)] + fn scale(&self, e: &ArchivedBTableEntry) -> i32 { + let v = if self.no_change(e) { + e.bufr_scale.to_native() + } else { + self.common_scale + .map(|c| { + let (v, _) = e.bufr_scale.to_native().overflowing_add(128 - c); + v + }) + .unwrap_or(e.bufr_scale.to_native()) + }; + + if let Some(op) = self.temp_operator { + e.bufr_scale.to_native() + op + } else { + v + } + } + + #[inline(always)] + fn reference_value(&self, e: &ArchivedBTableEntry) -> i32 { + let v = e.bufr_reference_value.to_native(); + + if let Some(op) = self.temp_operator { + (v as f32 * 10_f32.powi(op)) as i32 + } else { + v + } + } +} + +impl Decoder { + pub fn from_message(message: &MessageBlock) -> Result { + let table_info = message.table_info(); + let master_table_version = table_info.master_table_version; + + let master_b: BUFRTableB = message.load_first_validable_table(master_table_version)?; + let master_d: BUFRTableD = message.load_first_validable_table(master_table_version)?; + + let local_table_version = table_info.local_table_version as u32; + + let local_tables = if local_table_version > 0 { + let local_b: BUFRTableB = TableLoader.load_table(LocalTable::new( + Some(table_info.subcenter_id * 256 + table_info.center_id), + table_info.local_table_version, + ))?; + + let local_d: BUFRTableD = TableLoader.load_table(LocalTable::new( + Some(table_info.subcenter_id * 256 + table_info.center_id), + table_info.local_table_version, + ))?; + + Some((local_b, local_d)) + } else { + None + }; + + let (local_b, local_d) = if let Some((b, d)) = local_tables { + (Some(b), Some(d)) + } else { + (None, None) + }; + + #[cfg(feature = "opera")] + let opera_bitmap_table = message + .load_opera_bitmap_table( + table_info.center_id, + table_info.subcenter_id, + table_info.local_table_version, + master_table_version, + ) + .ok(); + + let decoder = Self::new( + message.version(), + master_b, + master_d, + local_b, + local_d, + #[cfg(feature = "opera")] + opera_bitmap_table, + ); + + Ok(decoder) + } + + pub fn new( + edition: u8, + master_b: BUFRTableB, + master_d: BUFRTableD, + local_b: Option, + local_d: Option, + + #[cfg(feature = "opera")] _opera_bitmap_table: Option, + ) -> Self { + Decoder { + bufr_edition: edition, + master_b, + master_d, + local_b, + local_d, + #[cfg(feature = "opera")] + opera_bitmap_table: _opera_bitmap_table, + } + } + + pub fn decode<'a, V: MessageVersion>( + &'a mut self, + message: &impl Deref, + ) -> Result> { + let data_block = message.data_block()?; + let descriptors = message.descriptors()?; + + let mut data_input = BitInput::new(data_block); + let mut record = BUFRParsed::new(); + let mut state = State::new(); + let mut cache = Cache::new( + &self.master_b, + &self.master_d, + self.local_b.as_ref(), + self.local_d.as_ref(), + ); + + let mut stack: Vec = vec![]; + stack.push(Frame::Slice { + descs: Descs::Raw(&descriptors), + idx: 0, + }); + + while let Some(frame) = stack.pop() { + match frame { + Frame::Slice { descs, idx } => { + if idx >= descs.len() { + continue; + } + match descs { + Descs::Raw(raw) => { + let des = &raw[idx]; + self.parse_slice( + des, + idx, + &mut record, + descs, + &mut stack, + &mut cache, + &mut state, + &mut data_input, + )?; + } + Descs::Archived(archived) => { + let des = &archived[idx]; + self.parse_slice( + des, + idx, + &mut record, + descs, + &mut stack, + &mut cache, + &mut state, + &mut data_input, + )?; + } + } + } + + Frame::Repeat { + descs, + times, + current, + } => { + self.parse_repeating(times, current, descs, &mut stack)?; + } + + Frame::CompiledArray { layout, times } => { + self.parse_compiled_array(&layout, times, &mut data_input, &mut record)?; + } + } + } + + Ok(record) + } + + #[inline] + fn parse_slice<'k, 'c, 'i, 's, K: BUFRKey>( + &self, + des: &K, + idx: usize, + values: &mut BUFRParsed<'c>, + descs: Descs<'k>, + // Stack + stack: &mut Vec>, + cache: &mut Cache<'c>, + state: &mut State, + data: &mut BitInput<'i>, + ) -> Result<()> + where + 'c: 'k, + { + match des.f() { + 0 => { + // Element descriptor - parse data + if let Some(e) = cache.get_b(des) { + let value = self.evalute(state, data, &e)?; + values.push(value, e.element_name_en.as_str(), e.bufr_unit.as_str()); + state.temp_operator = None; + state.local_data_width = None; + + stack.push(Frame::Slice { + descs, + idx: idx + 1, + }); + } else { + return Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))); + } + } + 1 => { + let x = des.x() as usize; + let mut y = des.y() as usize; + let delay_repeat = y == 0; + + if delay_repeat { + let count = match descs { + Descs::Raw(raw) => { + let count_des = &raw[idx + 1]; + self.parse_usize(state, cache, count_des, data)? + } + + Descs::Archived(archived) => { + let count_des = &archived[idx + 1]; + self.parse_usize(state, cache, count_des, data)? + } + }; + y = count; + } + + let body_start = if delay_repeat { idx + 2 } else { idx + 1 }; + let body_end = body_start + x; + + if body_end > descs.len() { + return Err(Error::ParseError(format!( + "Not enough descriptors to repeat: requested {}, available {}", + x, + descs.len() - body_start + ))); + } + + let compiled_layout = match descs { + Descs::Raw(raw) => { + let body = &raw[body_start..body_end]; + self.try_compile_array_layout(body, y, cache)? + } + Descs::Archived(archived) => { + let body = &archived[body_start..body_end]; + self.try_compile_array_layout(body, y, cache)? + } + }; + + stack.push(Frame::Slice { + descs, + idx: body_end, + }); + + let frame = if let Some(layout) = compiled_layout { + Frame::CompiledArray { layout, times: y } + } else { + // Fallback to normal interpretation + match descs { + Descs::Raw(raw) => Frame::Repeat { + descs: Descs::Raw(&raw[body_start..body_end]), + times: y, + current: 0, + }, + Descs::Archived(archived) => Frame::Repeat { + descs: Descs::Archived(&archived[body_start..body_end]), + times: y, + current: 0, + }, + } + }; + + stack.push(frame); + } + 2 => { + self.deal_with_operator(state, values, des, data)?; + stack.push(Frame::Slice { + descs, + idx: idx + 1, + }); + } + 3 => { + #[cfg(feature = "opera")] + let opera_dw = self.parse_opera_bitmap(des).map(|e| e.depth); + + if let Some(seq) = cache.get_d(des) { + let fxy_chain = seq.fxy_chain.as_slice(); + #[cfg(feature = "opera")] + if opera_dw.is_some() { + // TODO + unimplemented!(""); + } + + stack.push(Frame::Slice { + descs, + idx: idx + 1, + }); + + stack.push(Frame::Slice { + descs: Descs::Archived(fxy_chain), + idx: 0, + }); + } else { + return Err(Error::ParseError(format!( + "Sequence descriptor {:?} not found in Table D", + des + ))); + } + } + _ => { + return Err(Error::ParseError(format!( + "Invalid descriptor F value: {}", + des.f() + ))); + } + } + + Ok(()) + } + + #[inline] + fn _parse_slice<'c, 'i, 's, K: BUFRKey>( + &self, + des: &K, + values: &mut BUFRParsed<'c>, + // Stack + cache: &mut Cache<'c>, + state: &mut State, + data: &mut BitInput<'i>, + ) -> Result<()> { + match des.f() { + 0 => { + if let Some(e) = cache.get_b(des) { + let value = self.evalute(state, data, &e)?; + values.push(value, e.element_name_en.as_str(), e.bufr_unit.as_str()); + + state.temp_operator = None; + state.local_data_width = None; + } else { + return Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))); + } + } + 2 => { + self.deal_with_operator(state, values, des, data)?; + } + _ => { + return Err(Error::ParseError(format!( + "Invalid descriptor F value: {}", + des.f() + ))); + } + } + + Ok(()) + } + + fn parse_repeating<'k, 'c, 'i, 's>( + &self, + times: usize, + current: usize, + // + descs: Descs<'k>, + // Stack + stack: &mut Vec>, + ) -> Result<()> + where + 'c: 'k, + { + if current >= times { + return Ok(()); + } + stack.push(Frame::Repeat { + descs, + times, + current: current + 1, + }); + + stack.push(Frame::Slice { descs, idx: 0 }); + + Ok(()) + } + + fn parse_usize<'a, 'b, 'c, K: BUFRKey>( + &self, + state: &State, + cache: &mut Cache<'c>, + des: &'a K, + data: &mut BitInput<'b>, + ) -> Result { + match des.f() { + 0 => { + if let Some(e) = cache.get_b(des) { + let value = self.evalute(state, data, &e)?; + + if let Some(v) = value.as_f64() { + Ok(v.floor() as usize) + } else { + Err(Error::ParseError(format!("Format Error"))) + } + } else { + Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))) + } + } + _ => Err(Error::ParseError(format!( + "Descriptor {:?} not found in Table B", + des + ))), + } + } + + #[inline(always)] + fn evalute<'a>( + &self, + state: &State, + data: &mut BitInput<'a>, + e: &ArchivedBTableEntry, + ) -> Result { + match e.bufr_unit.as_str() { + "CCITT IA5" => { + let total_bytes = state + .common_str_width + .unwrap_or(((e.bufr_datawidth_bits.to_native() as usize) + 7) / 8); + let s = data.take_string(total_bytes as usize)?; + return Ok(Value::String(s)); + } + _ => { + let datawidth = state.datawidth(e); + let scale = state.scale(e) as f64; + let reference_value = state.reference_value(e) as f64; + let value = data.get_arbitary_bits(datawidth as usize)?; + let mv = (1 << datawidth) - 1; + if value == mv && e.fxy.x != 31 { + return Ok(Value::Missing); + } + let result = ((value as f64) + reference_value) * 10.0f64.powi(-scale as i32); + return Ok(Value::Number(result)); + } + } + } + + fn try_compile_array_layout<'a, K: BUFRKey>( + &self, + body: &[K], + repeat_count: usize, + cache: &mut Cache<'a>, + ) -> Result>> { + // Early rejection: too small + if repeat_count < 16 { + return Ok(None); + } + + let mut compiler_state = CompilerState { + common_scale: None, + common_ref_value: None, + common_data_width: None, + temp_operator: None, + common_str_width: None, + local_data_width: None, + }; + + let mut fields = Vec::with_capacity(body.len()); + let mut total_bits = 0usize; + + for desc in body { + match desc.f() { + 0 => { + // Element descriptor - compile field spec + let entry = cache.get_b(desc).ok_or_else(|| { + Error::ParseError(format!("Missing Table B entry for {:?}", desc)) + })?; + + // Reject strings + if entry.bufr_unit.as_str() == "CCITT IA5" { + return Ok(None); + } + + // Compute effective parameters + let width = self.compute_effective_width(&compiler_state, entry); + let scale = self.compute_effective_scale(&compiler_state, entry); + let reference = self.compute_effective_reference(&compiler_state, entry); + let missing = if width == 64 { + u64::MAX + } else { + (1u64 << width) - 1 + }; + + fields.push(FieldSpec { + fxy: FXY::new(desc.f(), desc.x(), desc.y()), + name: entry.element_name_en.as_str(), + unit: entry.bufr_unit.as_str(), + width_bits: width, + scale, + reference, + missing_value: missing, + }); + + total_bits += width as usize; + + // Clear one-time operators after use + // 2-07 and 2-06 apply only to the next element + compiler_state.temp_operator = None; + compiler_state.local_data_width = None; + } + + 2 => { + if !self.apply_operator_to_compiler(&mut compiler_state, desc)? { + return Ok(None); + } + } + + 1 | 3 => { + // Nested replication or sequence - reject + return Ok(None); + } + + _ => { + return Err(Error::ParseError(format!("Invalid F value: {}", desc.f()))); + } + } + } + + if compiler_state.temp_operator.is_some() { + return Ok(None); + } + + Ok(Some(CompiledLayout { + fields, + bits_per_element: total_bits, + })) + } + + fn apply_operator_to_compiler( + &self, + state: &mut CompilerState, + operator: &K, + ) -> Result { + let x = operator.x(); + let y = operator.y() as i32; + + match x { + 1 => { + // 2-01-YYY: data width change + state.common_data_width = if y == 0 { None } else { Some(y) }; + Ok(true) + } + 2 => { + // 2-02-YYY: scale change + state.common_scale = if y == 0 { None } else { Some(y) }; + Ok(true) + } + 3 => { + // 2-03-YYY: reference value change + state.common_ref_value = if y == 0 { None } else { Some(y) }; + Ok(true) + } + 5 => { + // 2-05-YYY: string literal - consumes bits, reject + Ok(false) + } + 6 => { + // 2-06-YYY: localized data width - affects only next element + state.local_data_width = Some(y); + Ok(true) + } + 7 => { + // 2-07-YYY: increase scale/width/ref - affects only next element + state.temp_operator = Some(y); + Ok(true) + } + 8 => { + // 2-08-YYY: character width - reject (affects strings) + Ok(false) + } + _ => { + // Unknown/unsupported operator - allow but ignore + Ok(true) + } + } + } + + #[inline] + fn compute_effective_width(&self, state: &CompilerState, e: &ArchivedBTableEntry) -> u32 { + if let Some(local_width) = state.local_data_width { + return local_width as u32; + } + + let unit = e.bufr_unit.as_str(); + let is_flag_or_code = matches!( + unit, + "flag table" | "flag-table" | "code table" | "code-table" + ); + let delay_repeat_count = e.fxy.f.to_native() == 0 && e.fxy.x.to_native() == 31; + let no_change = is_flag_or_code || delay_repeat_count; + + let base_width = if no_change { + e.bufr_datawidth_bits.to_native() + } else { + state + .common_data_width + .map(|c| { + let (v, _) = e + .bufr_datawidth_bits + .to_native() + .overflowing_add_signed(c - 128); + v + }) + .unwrap_or(e.bufr_datawidth_bits.to_native()) + }; + + // 2-07-YYY: increase width by 10*Y bits + if let Some(op) = state.temp_operator { + base_width + (10 * op) as u32 + } else { + base_width + } + } + + #[inline] + fn compute_effective_scale(&self, state: &CompilerState, e: &ArchivedBTableEntry) -> i32 { + let unit = e.bufr_unit.as_str(); + let is_flag_or_code = matches!( + unit, + "flag table" | "flag-table" | "code table" | "code-table" + ); + let delay_repeat_count = e.fxy.f.to_native() == 0 && e.fxy.x.to_native() == 31; + let no_change = is_flag_or_code || delay_repeat_count; + + let base_scale = if no_change { + e.bufr_scale.to_native() + } else { + state + .common_scale + .map(|c| { + let (v, _) = e.bufr_scale.to_native().overflowing_add(128 - c); + v + }) + .unwrap_or(e.bufr_scale.to_native()) + }; + + if let Some(op) = state.temp_operator { + base_scale + op + } else { + base_scale + } + } + + #[inline] + fn compute_effective_reference(&self, state: &CompilerState, e: &ArchivedBTableEntry) -> i32 { + let base_ref = e.bufr_reference_value.to_native(); + + if let Some(op) = state.temp_operator { + (base_ref as f32 * 10_f32.powi(op)) as i32 + } else { + base_ref + } + } + + /// Fast path: decode array using pre-compiled layout + fn parse_compiled_array<'a>( + &self, + layout: &CompiledLayout<'a>, + repeat_count: usize, + data: &mut BitInput, + values: &mut BUFRParsed<'a>, + ) -> Result<()> { + let mut total_values = vec![vec![]; layout.fields.len()]; + // For each repetition + for _ in 0..repeat_count { + // For each field in the layout + for (i, field_spec) in layout.fields.iter().enumerate() { + let raw_value = data.get_arbitary_bits(field_spec.width_bits as usize)?; + + // Check for missing value (skip 0-31-YYY delayed replication counts) + let value = if raw_value == field_spec.missing_value + && !(field_spec.fxy.f == 0 && field_spec.fxy.x == 31) + { + MISS_VAL + } else { + // Apply scale and reference + let scaled = ((raw_value as f64) + (field_spec.reference as f64)) + * 10.0f64.powi(-field_spec.scale); + scaled + }; + + total_values[i].push(value); + } + } + + for (v, field) in total_values.into_iter().zip(layout.fields.iter()) { + let mut array = values.start_array(0); + array.set_values(v); + array.finish(Some(field.name), Some(field.unit)); + } + + Ok(()) + } + + fn deal_with_operator<'s, 'a, C: Container<'s>, K: BUFRKey>( + &self, + state: &mut State, + values: &mut C, + operator: &K, + data: &mut BitInput<'a>, + ) -> Result<()> { + let x = operator.x(); + let y = operator.y(); + + match x { + 1 => match y { + 0 => { + state.common_data_width = None; + } + _ => { + state.common_data_width = Some(y); + } + }, + 2 => match y { + 0 => { + state.common_scale = None; + } + _ => { + state.common_scale = Some(y); + } + }, + 3 => match y { + 0 => { + state.common_ref_value = None; + } + _ => { + state.common_ref_value = Some(y); + } + }, + 5 => { + let string = data.take_string(y as usize)?; + values.push(Value::String(string), "", "CAITT IA5"); + } + + 6 => { + let localized_width = y; + state.local_data_width = Some(localized_width); + } + 7 => { + state.temp_operator = Some(y); + } + 8 => match y { + 0 => { + state.common_str_width = None; + } + _ => { + state.common_str_width = Some(y as usize); + } + }, + _ => {} + } + + Ok(()) + } + + #[cfg(feature = "opera")] + fn parse_opera_bitmap(&self, des: &K) -> Option<&ArchivedBitMapEntry> { + self.opera_bitmap_table + .as_ref() + .map(|t| t.lookup(des)) + .flatten() + } + + // #[cfg(feature = "opera")] + // fn parse_opera_array<'a>( + // &mut self, + // dw: u8, + // mut descs: VecDeque, + // mut data: BitInput<'a>, + // ) -> Result<(VecDeque, BitInput<'a>)> { + // use crate::opera::OperaBitmapParser; + + // let mut opera_bitmap = OperaBitmapParser::new(dw); + + // while !descs.is_empty() { + // let (_descs, _data) = self.parser_inner(opera_bitmap.values(), descs, data)?; + // descs = _descs; + // data = _data; + // } + // Ok((descs, data)) + // } + + // fn seq_parser(descriptors: &[genlib::FXY]) -> Result<()> {} +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum Value { + Number(f64), + Missing, + String(String), +} + +impl std::fmt::Display for Value { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Value::Number(v) => write!(f, "{}", v), + Value::String(v) => write!(f, "{}", v), + Value::Missing => write!(f, "MISSING"), + } + } +} + +impl Value { + pub fn as_f64(&self) -> Option { + match self { + Value::Number(v) => Some(*v), + Value::Missing => Some(MISS_VAL), + Value::String(_) => None, + } + } + + pub fn as_str(&self) -> Option<&str> { + match self { + Value::String(v) => Some(v), + Value::Number(_) => None, + Value::Missing => None, + } + } + + pub fn as_bytes(&self) -> Option> { + match self { + Value::String(_) => None, + Value::Number(n) => Some(n.to_le_bytes().to_vec()), + Value::Missing => None, + } + } + + pub fn is_missing(&self) -> bool { + matches!(self, Value::Missing) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct BitInput<'a>(&'a [u8], usize); + +impl<'a> BitInput<'a> { + pub fn new(input: &[u8]) -> BitInput { + BitInput(input, 0) + } + + pub fn pointer(&self) -> usize { + self.1 + } + + #[inline] + pub fn take_string(&mut self, nbytes: usize) -> Result { + if nbytes == 0 { + return Ok(String::new()); + } + + // Fast path: byte-aligned string reads + if self.1 == 0 { + if self.0.len() < nbytes { + return Err(Error::ParseError("Not enough data for string".to_string())); + } + let s = String::from_utf8(self.0[..nbytes].to_vec()) + .map_err(|_| Error::ParseError("Invalid UTF-8 string".to_string()))?; + self.0 = &self.0[nbytes..]; + self.1 = 0; + return Ok(s); + } + + // Slow path: unaligned reads + let mut chars = Vec::with_capacity(nbytes); + // let mut remaining_input = self; + + for _ in 0..nbytes { + let byte_value = self.get_arbitary_bits(8)?; + chars.push(byte_value as u8); + } + + let s = String::from_utf8(chars) + .map_err(|_| Error::ParseError("Invalid UTF-8 string".to_string()))?; + Ok(s) + } + + #[inline] + pub fn get_arbitary_bits(&mut self, nbits: usize) -> Result { + if nbits == 0 { + return Ok(0); + } + + // Fast path: byte-aligned reads for common bit widths + if self.1 == 0 { + return self.get_arbitary_bits_aligned(nbits); + } + + // General path for unaligned reads + self.get_arbitary_bits_unaligned(nbits) + } + + /// Batch read multiple values with the same bit width + /// Optimized for arrays of numeric data + #[inline] + pub fn get_batch_same_width(&mut self, nbits: usize, count: usize) -> Result> { + if count == 0 { + return Ok(Vec::new()); + } + + let mut result = Vec::with_capacity(count); + + // Fast path: byte-aligned and byte-multiple bit widths + if self.1 == 0 && nbits % 8 == 0 { + let bytes_per_item = nbits / 8; + let total_bytes = bytes_per_item * count; + + if self.0.len() < total_bytes { + return Err(Error::ParseError( + "Not enough data for batch read".to_string(), + )); + } + + match nbits { + 8 => { + // Optimized path for 8-bit values + for i in 0..count { + result.push(self.0[i] as u64); + } + self.0 = &self.0[count..]; + } + 16 => { + // Optimized path for 16-bit values + for i in 0..count { + let offset = i * 2; + let value = u16::from_be_bytes([self.0[offset], self.0[offset + 1]]) as u64; + result.push(value); + } + self.0 = &self.0[total_bytes..]; + } + 24 => { + // Optimized path for 24-bit values + for i in 0..count { + let offset = i * 3; + let value = ((self.0[offset] as u64) << 16) + | ((self.0[offset + 1] as u64) << 8) + | (self.0[offset + 2] as u64); + result.push(value); + } + self.0 = &self.0[total_bytes..]; + } + 32 => { + // Optimized path for 32-bit values + for i in 0..count { + let offset = i * 4; + let value = u32::from_be_bytes([ + self.0[offset], + self.0[offset + 1], + self.0[offset + 2], + self.0[offset + 3], + ]) as u64; + result.push(value); + } + self.0 = &self.0[total_bytes..]; + } + _ => { + // Generic byte-aligned path + for i in 0..count { + let offset = i * bytes_per_item; + let mut value: u64 = 0; + for j in 0..bytes_per_item { + value = (value << 8) | (self.0[offset + j] as u64); + } + result.push(value); + } + self.0 = &self.0[total_bytes..]; + } + } + + return Ok(result); + } + + // Non-aligned or non-byte-multiple: fall back to individual reads + for _ in 0..count { + result.push(self.get_arbitary_bits(nbits)?); + } + + Ok(result) + } + + /// Fast path for byte-aligned bit reads + #[inline] + fn get_arbitary_bits_aligned(&mut self, nbits: usize) -> Result { + let byte_data = self.0; + + // Optimized paths for common bit widths + match nbits { + 8 => { + if byte_data.is_empty() { + return Err(Error::ParseError("Not enough data".to_string())); + } + self.0 = &self.0[1..]; + self.1 = 0; + Ok(byte_data[0] as u64) + } + 16 => { + if byte_data.len() < 2 { + return Err(Error::ParseError("Not enough data".to_string())); + } + let value = u16::from_be_bytes([byte_data[0], byte_data[1]]) as u64; + self.0 = &self.0[2..]; + self.1 = 0; + Ok(value) + } + 24 => { + if byte_data.len() < 3 { + return Err(Error::ParseError("Not enough data".to_string())); + } + let value = ((byte_data[0] as u64) << 16) + | ((byte_data[1] as u64) << 8) + | (byte_data[2] as u64); + self.0 = &self.0[3..]; + self.1 = 0; + Ok(value) + } + 32 => { + if byte_data.len() < 4 { + return Err(Error::ParseError("Not enough data".to_string())); + } + let value = + u32::from_be_bytes([byte_data[0], byte_data[1], byte_data[2], byte_data[3]]) + as u64; + self.0 = &self.0[4..]; + self.1 = 0; + Ok(value) + } + _ => { + // Generic byte-aligned path + let nbytes = (nbits + 7) / 8; + if byte_data.len() < nbytes { + return Err(Error::ParseError("Not enough data".to_string())); + } + + let mut value: u64 = 0; + let full_bytes = nbits / 8; + + // Read full bytes + for i in 0..full_bytes { + value = (value << 8) | (byte_data[i] as u64); + } + + let remaining_bits = nbits % 8; + if remaining_bits > 0 { + // Read partial byte + let last_byte = byte_data[full_bytes]; + let shift = 8 - remaining_bits; + let mask = ((1u16 << remaining_bits) - 1) as u8; + let bits = (last_byte >> shift) & mask; + value = (value << remaining_bits) | (bits as u64); + self.0 = &self.0[full_bytes..]; + self.1 = remaining_bits; + Ok(value) + } else { + self.0 = &self.0[full_bytes..]; + self.1 = 0; + Ok(value) + } + } + } + } + + /// Optimized path for unaligned bit reads + /// Reads up to 64 bits from an unaligned position in one go + #[inline] + fn get_arbitary_bits_unaligned(&mut self, nbits: usize) -> Result { + if nbits > 64 { + return Err(Error::ParseError( + "Cannot read more than 64 bits".to_string(), + )); + } + + let bit_offset = self.1; + + // Calculate how many bytes we need to read + // We need enough bytes to cover: bit_offset + nbits + let total_bits_needed = bit_offset + nbits; + let bytes_needed = (total_bits_needed + 7) / 8; + + if self.0.len() < bytes_needed { + return Err(Error::ParseError("Not enough data".to_string())); + } + + // Read up to 8 bytes into a u64 buffer for fast bit extraction + let mut buffer: u64 = 0; + let bytes_to_read = bytes_needed.min(8); + + for i in 0..bytes_to_read { + buffer = (buffer << 8) | (self.0[i] as u64); + } + + // If we need more than 8 bytes, handle the extra byte + if bytes_needed > 8 { + // This is rare - only happens for very unaligned 64-bit reads + // Shift what we have and add the 9th byte + let ninth_byte = self.0[8] as u64; + let bits_from_ninth = total_bits_needed - 64; + buffer = (buffer << bits_from_ninth) | (ninth_byte >> (8 - bits_from_ninth)); + } + + // Extract the desired bits + // The bits we want are in the high portion of the buffer + let bits_in_buffer = bytes_to_read * 8; + let shift = bits_in_buffer - bit_offset - nbits; + let mask = if nbits == 64 { + u64::MAX + } else { + (1u64 << nbits) - 1 + }; + let value = (buffer >> shift) & mask; + + // Update state + let new_bit_position = self.1 + nbits; + let bytes_consumed = new_bit_position / 8; + self.0 = &self.0[bytes_consumed..]; + self.1 = new_bit_position % 8; + + Ok(value) + } +} + +trait Container<'a> +where + Self: Sized, +{ + fn push(&mut self, value: Value, name: &'a str, unit: &'a str); + + fn start_repeating<'b>(&'b mut self, time: usize) -> Repeating<'a, 'b>; +} + +impl<'a> Container<'a> for BUFRParsed<'a> { + fn push(&mut self, value: Value, name: &'a str, unit: &'a str) { + self.push(value, name, unit); + } + + fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { + self.start_repeating(time) + } +} + +impl<'a, 'b> Container<'a> for Repeating<'a, 'b> { + fn push(&mut self, value: Value, _name: &'a str, _unit: &'a str) { + self.push(value); + } + + fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { + Repeating { + parsed: self.parsed, + values: Vec::with_capacity(time), + } + } +} + +pub struct BUFRParsed<'a> { + records: Vec>, +} + +impl<'a> BUFRParsed<'a> { + pub fn new() -> Self { + Self { records: vec![] } + } + + pub fn push(&mut self, value: Value, element_name: &'a str, unit: &'a str) { + self.records.push(BUFRRecord { + name: Some(element_name), + values: BUFRData::Single(value), + unit: Some(unit), + }); + } + + fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { + Repeating { + parsed: self, + values: Vec::with_capacity(time), + } + } + + fn start_array<'s>(&'s mut self, time: usize) -> Array<'a, 's> { + Array { + parsed: self, + values: Vec::with_capacity(time), + } + } +} + +struct Array<'a, 's> { + parsed: &'s mut BUFRParsed<'a>, + values: Vec, +} + +impl<'a> Array<'a, '_> { + fn set_values(&mut self, values: Vec) { + self.values = values; + } + + fn push(&mut self, v: f64) { + self.values.push(v); + } + + fn finish(self, name: Option<&'a str>, unit: Option<&'a str>) { + let recording = BUFRRecord { + name, + values: BUFRData::Array(self.values), + unit, + }; + self.parsed.records.push(recording); + } +} + +struct Repeating<'a, 's> { + parsed: &'s mut BUFRParsed<'a>, + values: Vec, +} + +impl<'a, 's> Repeating<'a, 's> { + fn push(&mut self, value: Value) { + self.values.push(value); + } + + fn finish(self) { + let recording = BUFRRecord { + name: None, + values: BUFRData::Repeat(self.values), + unit: None, + }; + self.parsed.records.push(recording); + } +} + +pub enum BUFRData { + Repeat(Vec), + Single(Value), + Array(Vec), +} + +pub struct BUFRRecord<'a> { + pub name: Option<&'a str>, + pub values: BUFRData, + pub unit: Option<&'a str>, +} + +impl Display for BUFRRecord<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let is_print_unit = match self.unit { + Some("CAITT IA5" | "code table" | "code-table" | "flag table" | "flag-table") => false, + None => false, + _ => true, + }; + + if self.name.is_none() { + return Ok(()); + } + + let name = self.name.as_ref().unwrap(); + + match &self.values { + BUFRData::Single(v) => { + if is_print_unit { + write!(f, "{}: {} {}", name, v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}: {}", name, v)?; + } + } + BUFRData::Repeat(vs) => { + if vs.len() < 8 { + write!(f, "{}: [", name)?; + for v in vs { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + + write!(f, "]")?; + } else { + write!(f, "{}: [", name)?; + for v in &vs[0..5] { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + write!(f, "... ")?; + for v in &vs[vs.len() - 2..vs.len()] { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + write!(f, "]")?; + } + } + + BUFRData::Array(a) => { + if a.len() < 8 { + write!(f, "{}: [", name)?; + for v in a { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + + write!(f, "]")?; + } else { + write!(f, "{}: [", name)?; + for v in &a[0..5] { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + write!(f, "... ")?; + for v in &a[a.len() - 2..a.len()] { + if is_print_unit { + write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; + } else { + write!(f, "{}, ", v)?; + } + } + write!(f, "]")?; + } + } + } + + Ok(()) + } +} + +impl Display for BUFRParsed<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for record in &self.records { + writeln!(f, "{}", record)?; + } + Ok(()) + } +} + +enum Frame<'v, 'a> { + Slice { + descs: Descs<'v>, + idx: usize, + }, + Repeat { + descs: Descs<'v>, + times: usize, + current: usize, + }, + CompiledArray { + layout: CompiledLayout<'a>, + times: usize, + }, +} + +#[derive(Clone, Copy)] +enum Descs<'v> { + Raw(&'v [genlib::FXY]), + Archived(&'v [ArchivedFXY]), +} + +impl Descs<'_> { + fn len(&self) -> usize { + match self { + Descs::Raw(d) => d.len(), + Descs::Archived(d) => d.len(), + } + } + + fn total_bits(&self, state: &State, cache: &mut Cache) -> Result { + match self { + Descs::Raw(d) => self._total_bits(state, cache, d), + Descs::Archived(d) => self._total_bits(state, cache, d), + } + } + + fn _total_bits( + &self, + state: &State, + cache: &mut Cache, + decs: &[K], + ) -> Result { + let mut total_width = 0; + for des in decs { + let e = cache.get_b(des).ok_or(Error::TableNotFoundEmpty)?; + let width = state.datawidth(e); + total_width += width as usize; + } + + Ok(total_width) + } +} diff --git a/rbufr/src/lib.rs b/rbufr/src/lib.rs index 0eaf77d..b65e46d 100644 --- a/rbufr/src/lib.rs +++ b/rbufr/src/lib.rs @@ -1,39 +1,11 @@ -mod block; -mod errors; -mod opera; +pub mod block; +pub mod decoder; +pub mod errors; +#[cfg(feature = "opera")] +pub mod opera; pub mod parser; pub mod structs; -mod tables; +pub mod tables; -#[cfg(test)] -mod test { - - #[test] - fn test() { - use genlib::prelude::*; - let bufr = BUFRTableB::load_from_disk( - "/Users/xiang.li1/projects/rbufr/tables/BUFRCREX_TableB_en_00.bufrtbl", - ) - .unwrap(); - - let entry = bufr.lookup(&FXY::new(0, 0, 1)).unwrap(); - - println!("{:#?}", entry); - } - - #[test] - fn test_rb() { - use crate::parser::Parser; - - let mut parser = Parser::new(); - let parsed_file = parser - .parse("/Users/xiang.li1/Downloads/36_2025-12-22T11_00_00.bufr") - .unwrap(); - - for msg in parsed_file.messages() { - println!("{}", msg); - - msg.load_data().unwrap(); - } - } -} +pub use crate::decoder::Decoder; +pub use crate::parser::*; diff --git a/rbufr/src/opera.rs b/rbufr/src/opera.rs index da8c8b0..59a5239 100644 --- a/rbufr/src/opera.rs +++ b/rbufr/src/opera.rs @@ -1,6 +1,4 @@ -use crate::structs::data_parser::Value; - -/// This Module contains functions specific to handling BUFR Opera files. +use crate::decoder::Value; pub struct OperaBitmapParser { values: Vec, diff --git a/rbufr/src/parser.rs b/rbufr/src/parser.rs index daf2f36..f33f53c 100644 --- a/rbufr/src/parser.rs +++ b/rbufr/src/parser.rs @@ -9,133 +9,106 @@ use std::{ }; const BUFR_PATTERN: &[u8] = b"BUFR"; -const BUFFER_SIZE: usize = 8192; // 8KB buffer for scanning -// const MAX_MESSAGE_SIZE: usize = 500_000; // 500KB max message size +const BUFFER_SIZE: usize = 8192; -pub struct Parser; +pub fn parse>(path: P) -> Result { + let file = File::open(path)?; + let mut reader = BufReader::new(file); -impl Parser { - pub fn new() -> Self { - Self - } - - /// Find all offsets in the file where "BUFR" appears using streaming approach - fn find_bufr_offsets(reader: &mut R) -> Result> { - let mut offsets = Vec::new(); - let mut buffer = vec![0u8; BUFFER_SIZE]; - let mut file_offset = 0u64; - let mut overlap = vec![0u8; BUFR_PATTERN.len() - 1]; - let mut overlap_len = 0; + let mut magic_bytes = [0u8; 2]; + reader.read_exact(&mut magic_bytes)?; + reader.seek(SeekFrom::Start(0))?; + if magic_bytes == [0x1F, 0x8B] { + let mut gz_decoder = GzDecoder::new(reader); + let mut bytes = vec![]; + gz_decoder.read_to_end(&mut bytes)?; + parse_inner(&mut Cursor::new(bytes)) + } else { reader.seek(SeekFrom::Start(0))?; - - loop { - let bytes_read = reader.read(&mut buffer)?; - if bytes_read == 0 { - break; - } - - // Create a combined view of overlap + new data - let mut search_buffer = Vec::with_capacity(overlap_len + bytes_read); - search_buffer.extend_from_slice(&overlap[..overlap_len]); - search_buffer.extend_from_slice(&buffer[..bytes_read]); - - // Search for BUFR pattern - for i in 0..search_buffer.len().saturating_sub(BUFR_PATTERN.len() - 1) { - if search_buffer.len() >= i + BUFR_PATTERN.len() - && &search_buffer[i..i + BUFR_PATTERN.len()] == BUFR_PATTERN - { - let actual_offset = file_offset - overlap_len as u64 + i as u64; - offsets.push(actual_offset); - } - } - - // Save overlap for next iteration - if bytes_read >= BUFR_PATTERN.len() - 1 { - overlap_len = BUFR_PATTERN.len() - 1; - overlap[..overlap_len] - .copy_from_slice(&buffer[bytes_read - overlap_len..bytes_read]); - } else { - overlap_len = bytes_read; - overlap[..overlap_len].copy_from_slice(&buffer[..bytes_read]); - } - - file_offset += bytes_read as u64; - } - - Ok(offsets) - } - - /// Read a BUFR message from file at specific offset - fn read_message_at_offset(reader: &mut R, offset: u64) -> Result> { - reader.seek(SeekFrom::Start(offset))?; - - // Read Section 0 to get total length - let mut section0_buf = [0u8; 8]; - reader.read_exact(&mut section0_buf)?; - - // Parse total length (3 bytes starting at offset 4) - let total_length = - u32::from_be_bytes([0, section0_buf[4], section0_buf[5], section0_buf[6]]); - - // Read entire message - let mut message_buf = vec![0u8; total_length as usize]; - reader.seek(SeekFrom::Start(offset))?; - reader.read_exact(&mut message_buf)?; - - Ok(message_buf) - } - - /// Parse a file containing one or more BUFR messages using streaming approach - pub fn parse>(&mut self, path: P) -> Result { - let file = File::open(path)?; - let mut reader = BufReader::new(file); - - // Try to detect gzip compression - let mut magic_bytes = [0u8; 2]; - reader.read_exact(&mut magic_bytes)?; - reader.seek(SeekFrom::Start(0))?; - if magic_bytes == [0x1F, 0x8B] { - // Gzip magic number detected - let mut gz_decoder = GzDecoder::new(reader); - let mut bytes = vec![]; - gz_decoder.read_to_end(&mut bytes)?; - - self.parse_inner(&mut Cursor::new(bytes)) - } else { - // Not compressed - // Rewind reader - reader.seek(SeekFrom::Start(0))?; - self.parse_inner(&mut reader) - } - } - - fn parse_inner(&self, buf_reader: &mut R) -> Result - where - R: Read + Seek, - { - // Find all BUFR message offsets - let offsets = Self::find_bufr_offsets(buf_reader)?; - - let mut file_block = BUFRFile::new(); - - // Parse each BUFR message - for offset in offsets { - match Self::read_message_at_offset(buf_reader, offset) { - Ok(message_data) => match BUFRMessage::parse(&message_data) { - Ok(message) => { - file_block.push_message(message); - } - Err(e) => { - eprintln!("Failed to parse BUFR message at offset {}: {:?}", offset, e); - } - }, - Err(e) => { - eprintln!("Failed to read BUFR message at offset {}: {:?}", offset, e); - } - } - } - - Ok(file_block) + parse_inner(&mut reader) } } + +fn find_bufr_offsets(reader: &mut R) -> Result> { + let mut offsets = Vec::new(); + let mut buffer = vec![0u8; BUFFER_SIZE]; + let mut file_offset = 0u64; + let mut overlap = vec![0u8; BUFR_PATTERN.len() - 1]; + let mut overlap_len = 0; + + reader.seek(SeekFrom::Start(0))?; + + loop { + let bytes_read = reader.read(&mut buffer)?; + if bytes_read == 0 { + break; + } + + let mut search_buffer = Vec::with_capacity(overlap_len + bytes_read); + search_buffer.extend_from_slice(&overlap[..overlap_len]); + search_buffer.extend_from_slice(&buffer[..bytes_read]); + + for i in 0..search_buffer.len().saturating_sub(BUFR_PATTERN.len() - 1) { + if search_buffer.len() >= i + BUFR_PATTERN.len() + && &search_buffer[i..i + BUFR_PATTERN.len()] == BUFR_PATTERN + { + let actual_offset = file_offset - overlap_len as u64 + i as u64; + offsets.push(actual_offset); + } + } + + if bytes_read >= BUFR_PATTERN.len() - 1 { + overlap_len = BUFR_PATTERN.len() - 1; + overlap[..overlap_len].copy_from_slice(&buffer[bytes_read - overlap_len..bytes_read]); + } else { + overlap_len = bytes_read; + overlap[..overlap_len].copy_from_slice(&buffer[..bytes_read]); + } + + file_offset += bytes_read as u64; + } + + Ok(offsets) +} + +fn read_message_at_offset(reader: &mut R, offset: u64) -> Result> { + reader.seek(SeekFrom::Start(offset))?; + + let mut section0_buf = [0u8; 8]; + reader.read_exact(&mut section0_buf)?; + + let total_length = u32::from_be_bytes([0, section0_buf[4], section0_buf[5], section0_buf[6]]); + + let mut message_buf = vec![0u8; total_length as usize]; + reader.seek(SeekFrom::Start(offset))?; + reader.read_exact(&mut message_buf)?; + + Ok(message_buf) +} + +fn parse_inner(buf_reader: &mut R) -> Result +where + R: Read + Seek, +{ + let offsets = find_bufr_offsets(buf_reader)?; + let mut file_block = BUFRFile::new(); + + for offset in offsets { + match read_message_at_offset(buf_reader, offset) { + Ok(message_data) => match BUFRMessage::parse(&message_data) { + Ok(message) => { + file_block.push_message(message); + } + Err(e) => { + eprintln!("Failed to parse BUFR message at offset {}: {:?}", offset, e); + } + }, + Err(e) => { + eprintln!("Failed to read BUFR message at offset {}: {:?}", offset, e); + } + } + } + + Ok(file_block) +} diff --git a/rbufr/src/structs/bit.rs b/rbufr/src/structs/bit.rs index 3e20f82..afb2e8d 100644 --- a/rbufr/src/structs/bit.rs +++ b/rbufr/src/structs/bit.rs @@ -1,11 +1,9 @@ -use std::ops::{AddAssign, Shl, Shr}; - use nom::IResult; -use nom::bits::{bits, bytes, complete::take}; +use nom::bits::complete::take; +use std::ops::{AddAssign, Shl, Shr}; +pub(super) type BitInput<'a> = (&'a [u8], usize); -pub type BitInput<'a> = (&'a [u8], usize); - -pub fn parse_arbitrary_bits< +pub(super) fn parse_arbitrary_bits< T: From + AddAssign + Shl + Shr, >( input: BitInput, @@ -13,17 +11,3 @@ pub fn parse_arbitrary_bits< ) -> IResult { take(count)(input) } - -#[cfg(test)] -mod test { - use crate::structs::bit::parse_arbitrary_bits; - - #[test] - fn test() { - let data = [0xA0, 0xA0, 0x01, 0xA0]; - - let result = parse_arbitrary_bits::((&data, 0), 16).unwrap(); - - println!("{:?}", result); - } -} diff --git a/rbufr/src/structs/data_parser.rs b/rbufr/src/structs/data_parser.rs deleted file mode 100644 index 4a9b136..0000000 --- a/rbufr/src/structs/data_parser.rs +++ /dev/null @@ -1,1003 +0,0 @@ -use crate::{ - errors::{Error, Result}, - structs::versions::MessageVersion, -}; -#[cfg(feature = "opera")] -use genlib::tables::ArchivedBitMapEntry; -use genlib::{ - ArchivedFXY, BUFRKey, FXY, - prelude::{BUFRTableB, BUFRTableBitMap, BUFRTableD}, - tables::{ArchivedBTableEntry, ArchivedDTableEntry}, -}; -use rustc_hash::FxHashMap; -use std::fmt::Display; - -const MISS_VAL: f64 = 99999.999999; - -pub struct DataParser { - bufr_edition: u8, - master_b: BUFRTableB, - master_d: BUFRTableD, - // local - local_b: Option, - local_d: Option, - // opera - #[cfg(feature = "opera")] - opera_bitmap_table: Option, -} - -struct Cache<'a> { - master_b: &'a BUFRTableB, - master_d: &'a BUFRTableD, - local_b: Option<&'a BUFRTableB>, - local_d: Option<&'a BUFRTableD>, - - // Cache - b_cache: FxHashMap, - d_cache: FxHashMap, -} - -impl<'a> Cache<'a> { - fn new( - master_b: &'a BUFRTableB, - master_d: &'a BUFRTableD, - local_b: Option<&'a BUFRTableB>, - local_d: Option<&'a BUFRTableD>, - ) -> Self { - Self { - master_b, - master_d, - local_b, - local_d, - b_cache: FxHashMap::default(), - d_cache: FxHashMap::default(), - } - } - - /// Get or cache B table entry - #[inline(always)] - fn get_b(&mut self, fxy: &K) -> Option<&ArchivedBTableEntry> { - self.lookup_b_descriptor(fxy) - } - - /// Get or cache D table entry - #[inline(always)] - fn get_d(&mut self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { - self.lookup_d_descriptor(fxy) - } - - #[inline(always)] - fn lookup_b_descriptor(&self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { - self.lookup_local_b_descriptor(fxy) - .or_else(|| self.lookup_master_b_descriptor(fxy)) - } - - #[inline] - fn lookup_local_b_descriptor(&self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { - self.local_b - .as_ref() - .and_then(|t| t.lookup(fxy)) - .filter(|e| &e.fxy == fxy) - } - - #[inline] - fn lookup_master_b_descriptor(&self, fxy: &K) -> Option<&'a ArchivedBTableEntry> { - self.master_b.lookup(fxy).filter(|e| &e.fxy == fxy) - } - - #[inline] - fn lookup_master_d_descriptor(&self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { - self.master_d.lookup(fxy).filter(|e| &e.fxy == fxy) - } - - #[inline] - fn lookup_local_d_descriptor(&self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { - self.local_d - .as_ref() - .and_then(|t| t.lookup(fxy)) - .filter(|e| &e.fxy == fxy) - } - - #[inline(always)] - fn lookup_d_descriptor(&self, fxy: &K) -> Option<&'a ArchivedDTableEntry> { - self.lookup_local_d_descriptor(fxy) - .or_else(|| self.lookup_master_d_descriptor(fxy)) - } -} - -struct State { - // Common State - common_scale: Option, - common_ref_value: Option, - common_data_width: Option, - common_str_width: Option, - // Localized State - local_data_width: Option, - // Temporary storage - temp_operator: Option, -} - -impl State { - fn new() -> Self { - Self { - common_scale: None, - common_ref_value: None, - common_data_width: None, - common_str_width: None, - local_data_width: None, - temp_operator: None, - } - } - - #[inline(always)] - fn no_change(&self, e: &ArchivedBTableEntry) -> bool { - let unit = e.bufr_unit.as_str(); - let is_flag_or_code = matches!( - unit, - "flag table" | "flag-table" | "code table" | "code-table" - ); - let delay_repeat_count = e.fxy.f.to_native() == 0 && e.fxy.x.to_native() == 31; - - is_flag_or_code || delay_repeat_count - } - - #[inline(always)] - fn datawidth(&self, e: &ArchivedBTableEntry) -> u32 { - let v = if self.no_change(e) { - e.bufr_datawidth_bits.to_native() - } else { - self.common_data_width - .map(|c| { - let (v, _) = e - .bufr_datawidth_bits - .to_native() - .overflowing_add_signed(c - 128); - v - }) - .unwrap_or(e.bufr_datawidth_bits.to_native()) - }; - - if let Some(op) = self.temp_operator { - e.bufr_datawidth_bits.to_native() + (10 * op) as u32 - } else { - v - } - } - - #[inline(always)] - fn scale(&self, e: &ArchivedBTableEntry) -> i32 { - let v = if self.no_change(e) { - e.bufr_scale.to_native() - } else { - self.common_scale - .map(|c| { - let (v, _) = e.bufr_scale.to_native().overflowing_add(128 - c); - v - }) - .unwrap_or(e.bufr_scale.to_native()) - }; - - if let Some(op) = self.temp_operator { - e.bufr_scale.to_native() + op - } else { - v - } - } - - #[inline(always)] - fn reference_value(&self, e: &ArchivedBTableEntry) -> i32 { - let v = e.bufr_reference_value.to_native(); - - if let Some(op) = self.temp_operator { - (v as f32 * 10_f32.powi(op)) as i32 - } else { - v - } - } -} - -impl DataParser { - pub fn new( - edition: u8, - master_b: BUFRTableB, - master_d: BUFRTableD, - local_b: Option, - local_d: Option, - - #[cfg(feature = "opera")] _opera_bitmap_table: Option, - ) -> Self { - DataParser { - bufr_edition: edition, - master_b, - master_d, - local_b, - local_d, - #[cfg(feature = "opera")] - opera_bitmap_table: _opera_bitmap_table, - } - } - - pub fn parse<'a, V: MessageVersion>(&'a mut self, message: &V) -> Result> { - let data_block = message.data_block()?; - let descriptors = message.descriptors()?; - - let mut data_input = BitInput::new(data_block); - let mut record = BUFRParsed::new(); - let mut state = State::new(); - let mut cache = Cache::new( - &self.master_b, - &self.master_d, - self.local_b.as_ref(), - self.local_d.as_ref(), - ); - - let mut stack: Vec = vec![]; - stack.push(Frame { - descs: Descs::Raw(&descriptors), - idx: 0, - }); - - while let Some(Frame { descs, idx }) = stack.pop() { - if idx >= descs.len() { - continue; - } - match descs { - Descs::Raw(raw) => { - let des = &raw[idx]; - self.parse_d( - des, - idx, - &mut record, - descs, - &mut stack, - &mut cache, - &mut state, - &mut data_input, - )?; - } - Descs::Archived(archived) => { - let des = &archived[idx]; - self.parse_d( - des, - idx, - &mut record, - descs, - &mut stack, - &mut cache, - &mut state, - &mut data_input, - )?; - } - } - } - - Ok(record) - } - - fn parse_d<'k, 'c, 'i, 's, K: BUFRKey>( - &self, - des: &K, - idx: usize, - values: &mut BUFRParsed, - descs: Descs<'k>, - // Stack - stack: &mut Vec>, - cache: &mut Cache<'c>, - state: &mut State, - data: &mut BitInput<'i>, - ) -> Result<()> - where - 'c: 'k, - { - match des.f() { - 0 => { - // Element descriptor - parse data - if let Some(e) = cache.get_b(des) { - let value = self.evalute(state, data, &e)?; - - // println!( - // "Parsed Descriptor {}: Value = {}", - // &e.element_name_en, value - // ); - // values.push(value, e.element_name_en.as_str(), e.bufr_unit.as_str()); - - stack.push(Frame { - descs, - idx: idx + 1, - }); - } else { - return Err(Error::ParseError(format!( - "Descriptor {:?} not found in Table B", - des - ))); - } - } - 1 => { - let x = des.x() as usize; - let mut y = des.y() as usize; - let delay_repeat = y == 0; - - if delay_repeat { - let count = match descs { - Descs::Raw(raw) => { - let count_des = &raw[idx + 1]; - self.parse_usize(state, cache, count_des, data)? - } - - Descs::Archived(archived) => { - let count_des = &archived[idx + 1]; - self.parse_usize(state, cache, count_des, data)? - } - }; - y = count; - } - - // Calculate the start of the repeat body - let body_start = if delay_repeat { idx + 2 } else { idx + 1 }; - let body_end = body_start + x; - - if body_end > descs.len() { - return Err(Error::ParseError(format!( - "Not enough descriptors to repeat: requested {}, available {}", - x, - descs.len() - body_start - ))); - } - - // Push continuation frame first (will be processed after repeats) - stack.push(Frame { - descs, - idx: body_end, - }); - - // Push repeat frames in reverse order (so first repeat executes first) - match descs { - Descs::Raw(raw) => { - let body = &raw[body_start..body_end]; - for _ in 0..y { - stack.push(Frame { - descs: Descs::Raw(body), - idx: 0, - }); - } - } - Descs::Archived(archived) => { - let body = &archived[body_start..body_end]; - for _ in 0..y { - stack.push(Frame { - descs: Descs::Archived(body), - idx: 0, - }); - } - } - } - } - 2 => { - self.deal_with_operator(state, values, des, data)?; - stack.push(Frame { - descs, - idx: idx + 1, - }); - } - 3 => { - #[cfg(feature = "opera")] - let opera_dw = self.parse_opera_bitmap(des).map(|e| e.depth); - - if let Some(seq) = cache.get_d(des) { - let fxy_chain = seq.fxy_chain.as_slice(); - #[cfg(feature = "opera")] - if opera_dw.is_some() { - // let (_, data) = - // self.parse_opera_array(opera_dw.unwrap(), fxy_chain, data)?; - // TODO - unimplemented!(""); - } - - // Push continuation frame (process next descriptor after sequence) - stack.push(Frame { - descs, - idx: idx + 1, - }); - - // Push sequence expansion frame (will be processed first) - stack.push(Frame { - descs: Descs::Archived(fxy_chain), - idx: 0, - }); - } else { - return Err(Error::ParseError(format!( - "Sequence descriptor {:?} not found in Table D", - des - ))); - } - } - _ => { - return Err(Error::ParseError(format!( - "Invalid descriptor F value: {}", - des.f() - ))); - } - } - - Ok(()) - } - - fn parse_usize<'a, 'b, 'c, K: BUFRKey>( - &self, - state: &State, - cache: &mut Cache<'c>, - des: &'a K, - data: &mut BitInput<'b>, - ) -> Result { - match des.f() { - 0 => { - if let Some(e) = cache.get_b(des) { - let value = self.evalute(state, data, &e)?; - - if let Some(v) = value.as_f64() { - Ok(v.floor() as usize) - } else { - Err(Error::ParseError(format!("Format Error"))) - } - } else { - Err(Error::ParseError(format!( - "Descriptor {:?} not found in Table B", - des - ))) - } - } - _ => Err(Error::ParseError(format!( - "Descriptor {:?} not found in Table B", - des - ))), - } - } - - #[inline(always)] - fn evalute<'a>( - &self, - state: &State, - data: &mut BitInput<'a>, - e: &ArchivedBTableEntry, - ) -> Result { - match e.bufr_unit.as_str() { - "CCITT IA5" => { - let total_bytes = state - .common_str_width - .unwrap_or(((e.bufr_datawidth_bits.to_native() as usize) + 7) / 8); - let s = data.take_string(total_bytes as usize)?; - return Ok(Value::String(s)); - } - _ => { - let datawidth = state.datawidth(e); - let scale = state.scale(e) as f64; - let reference_value = state.reference_value(e) as f64; - let value = data.get_arbitary_bits(datawidth as usize)?; - let mv = (1 << datawidth) - 1; - if value == mv && e.fxy.x != 31 { - return Ok(Value::Missing); - } - let result = ((value as f64) + reference_value) * 10.0f64.powi(-scale as i32); - return Ok(Value::Number(result)); - } - } - } - - fn deal_with_operator<'s, 'a, C: Container<'s>, K: BUFRKey>( - &self, - state: &mut State, - values: &mut C, - operator: &K, - data: &mut BitInput<'a>, - ) -> Result<()> { - let x = operator.x(); - let y = operator.y(); - - match x { - 1 => match y { - 0 => { - state.common_data_width = None; - } - _ => { - state.common_data_width = Some(y); - } - }, - 2 => match y { - 0 => { - state.common_scale = None; - } - _ => { - state.common_scale = Some(y); - } - }, - 3 => match y { - 0 => { - state.common_ref_value = None; - } - _ => { - state.common_ref_value = Some(y); - } - }, - 5 => { - let string = data.take_string(y as usize)?; - values.push(Value::String(string), "", "CAITT IA5"); - } - - 6 => { - let localized_width = y; - state.local_data_width = Some(localized_width); - } - 7 => { - state.temp_operator = Some(y); - } - 8 => match y { - 0 => { - state.common_str_width = None; - } - _ => { - state.common_str_width = Some(y as usize); - } - }, - _ => {} - } - - Ok(()) - } - - #[cfg(feature = "opera")] - fn parse_opera_bitmap(&self, des: &K) -> Option<&ArchivedBitMapEntry> { - self.opera_bitmap_table - .as_ref() - .map(|t| t.lookup(des)) - .flatten() - } - - // #[cfg(feature = "opera")] - // fn parse_opera_array<'a>( - // &mut self, - // dw: u8, - // mut descs: VecDeque, - // mut data: BitInput<'a>, - // ) -> Result<(VecDeque, BitInput<'a>)> { - // use crate::opera::OperaBitmapParser; - - // let mut opera_bitmap = OperaBitmapParser::new(dw); - - // while !descs.is_empty() { - // let (_descs, _data) = self.parser_inner(opera_bitmap.values(), descs, data)?; - // descs = _descs; - // data = _data; - // } - // Ok((descs, data)) - // } - - // fn seq_parser(descriptors: &[genlib::FXY]) -> Result<()> {} -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub enum Value { - Number(f64), - Missing, - String(String), -} - -impl std::fmt::Display for Value { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Value::Number(v) => write!(f, "{}", v), - Value::String(v) => write!(f, "{}", v), - Value::Missing => write!(f, "MISSING"), - } - } -} - -impl Value { - pub fn as_f64(&self) -> Option { - match self { - Value::Number(v) => Some(*v), - Value::Missing => Some(MISS_VAL), - Value::String(_) => None, - } - } - - pub fn as_str(&self) -> Option<&str> { - match self { - Value::String(v) => Some(v), - Value::Number(_) => None, - Value::Missing => None, - } - } - - pub fn as_bytes(&self) -> Option> { - match self { - Value::String(_) => None, - Value::Number(n) => Some(n.to_le_bytes().to_vec()), - Value::Missing => None, - } - } - - pub fn is_missing(&self) -> bool { - matches!(self, Value::Missing) - } -} - -#[derive(Debug, Clone, Copy)] -pub struct BitInput<'a>(&'a [u8], usize); - -impl<'a> BitInput<'a> { - pub fn new(input: &[u8]) -> BitInput { - BitInput(input, 0) - } - - pub fn pointer(&self) -> usize { - self.1 - } - - #[inline] - pub fn take_string(&mut self, nbytes: usize) -> Result { - if nbytes == 0 { - return Ok(String::new()); - } - - // Fast path: byte-aligned string reads - if self.1 == 0 { - if self.0.len() < nbytes { - return Err(Error::ParseError("Not enough data for string".to_string())); - } - let s = String::from_utf8(self.0[..nbytes].to_vec()) - .map_err(|_| Error::ParseError("Invalid UTF-8 string".to_string()))?; - self.0 = &self.0[nbytes..]; - self.1 = 0; - return Ok(s); - } - - // Slow path: unaligned reads - let mut chars = Vec::with_capacity(nbytes); - // let mut remaining_input = self; - - for _ in 0..nbytes { - let byte_value = self.get_arbitary_bits(8)?; - chars.push(byte_value as u8); - } - - let s = String::from_utf8(chars) - .map_err(|_| Error::ParseError("Invalid UTF-8 string".to_string()))?; - Ok(s) - } - - #[inline] - pub fn get_arbitary_bits(&mut self, nbits: usize) -> Result { - if nbits == 0 { - return Ok(0); - } - - // Fast path: byte-aligned reads for common bit widths - if self.1 == 0 { - return self.get_arbitary_bits_aligned(nbits); - } - - // General path for unaligned reads - self.get_arbitary_bits_unaligned(nbits) - } - - /// Fast path for byte-aligned bit reads - #[inline] - fn get_arbitary_bits_aligned(&mut self, nbits: usize) -> Result { - let byte_data = self.0; - - // Optimized paths for common bit widths - match nbits { - 8 => { - if byte_data.is_empty() { - return Err(Error::ParseError("Not enough data".to_string())); - } - self.0 = &self.0[1..]; - self.1 = 0; - Ok(byte_data[0] as u64) - } - 16 => { - if byte_data.len() < 2 { - return Err(Error::ParseError("Not enough data".to_string())); - } - let value = u16::from_be_bytes([byte_data[0], byte_data[1]]) as u64; - self.0 = &self.0[2..]; - self.1 = 0; - Ok(value) - } - 24 => { - if byte_data.len() < 3 { - return Err(Error::ParseError("Not enough data".to_string())); - } - let value = ((byte_data[0] as u64) << 16) - | ((byte_data[1] as u64) << 8) - | (byte_data[2] as u64); - self.0 = &self.0[3..]; - self.1 = 0; - Ok(value) - } - 32 => { - if byte_data.len() < 4 { - return Err(Error::ParseError("Not enough data".to_string())); - } - let value = - u32::from_be_bytes([byte_data[0], byte_data[1], byte_data[2], byte_data[3]]) - as u64; - self.0 = &self.0[4..]; - self.1 = 0; - Ok(value) - } - _ => { - // Generic byte-aligned path - let nbytes = (nbits + 7) / 8; - if byte_data.len() < nbytes { - return Err(Error::ParseError("Not enough data".to_string())); - } - - let mut value: u64 = 0; - let full_bytes = nbits / 8; - - // Read full bytes - for i in 0..full_bytes { - value = (value << 8) | (byte_data[i] as u64); - } - - let remaining_bits = nbits % 8; - if remaining_bits > 0 { - // Read partial byte - let last_byte = byte_data[full_bytes]; - let shift = 8 - remaining_bits; - let mask = ((1u16 << remaining_bits) - 1) as u8; - let bits = (last_byte >> shift) & mask; - value = (value << remaining_bits) | (bits as u64); - self.0 = &self.0[full_bytes..]; - self.1 = remaining_bits; - Ok(value) - } else { - self.0 = &self.0[full_bytes..]; - self.1 = 0; - Ok(value) - } - } - } - } - - /// Slower path for unaligned bit reads - #[inline] - fn get_arbitary_bits_unaligned(&mut self, nbits: usize) -> Result { - let mut value: u64 = 0; - let mut remaining_bits = nbits; - let mut bit_offset = self.1; - let mut byte_data = self.0; - - // Read first partial byte if not byte-aligned - if bit_offset > 0 && remaining_bits > 0 { - if byte_data.is_empty() { - return Err(Error::ParseError("Not enough data".to_string())); - } - - let current_byte = byte_data[0]; - let bits_available_in_byte = 8 - bit_offset; - let bits_to_read = remaining_bits.min(bits_available_in_byte); - - let shift = bits_available_in_byte - bits_to_read; - let mask = ((1u16 << bits_to_read) - 1) as u8; - let extracted_bits = (current_byte >> shift) & mask; - - value = extracted_bits as u64; - remaining_bits -= bits_to_read; - bit_offset += bits_to_read; - - if bit_offset >= 8 { - byte_data = &byte_data[1..]; - bit_offset = 0; - } - } - - // Read full bytes - while remaining_bits >= 8 { - if byte_data.is_empty() { - return Err(Error::ParseError("Not enough data".to_string())); - } - value = (value << 8) | (byte_data[0] as u64); - byte_data = &byte_data[1..]; - remaining_bits -= 8; - } - - // Read final partial byte if needed - if remaining_bits > 0 { - if byte_data.is_empty() { - return Err(Error::ParseError("Not enough data".to_string())); - } - - let current_byte = byte_data[0]; - let shift = 8 - remaining_bits; - let mask = ((1u16 << remaining_bits) - 1) as u8; - let extracted_bits = (current_byte >> shift) & mask; - - value = (value << remaining_bits) | (extracted_bits as u64); - bit_offset = remaining_bits; - } - - self.0 = byte_data; - self.1 = bit_offset; - - Ok(value) - } -} - -trait Container<'a> -where - Self: Sized, -{ - fn push(&mut self, value: Value, name: &'a str, unit: &'a str); - - fn start_repeating<'b>(&'b mut self, time: usize) -> Repeating<'a, 'b>; -} - -impl<'a> Container<'a> for BUFRParsed<'a> { - fn push(&mut self, value: Value, name: &'a str, unit: &'a str) { - self.push(value, name, unit); - } - - fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { - self.start_repeating(time) - } -} - -impl<'a, 'b> Container<'a> for Repeating<'a, 'b> { - fn push(&mut self, value: Value, _name: &'a str, _unit: &'a str) { - self.push(value); - } - - fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { - Repeating { - parsed: self.parsed, - values: Vec::with_capacity(time), - } - } -} - -pub struct BUFRParsed<'a> { - records: Vec>, -} - -impl<'a> BUFRParsed<'a> { - pub fn new() -> Self { - Self { records: vec![] } - } - - pub fn push(&mut self, value: Value, element_name: &'a str, unit: &'a str) { - self.records.push(BUFRRecord { - name: Some(element_name), - values: BUFRData::Single(value), - unit: Some(unit), - }); - } - - fn start_repeating<'s>(&'s mut self, time: usize) -> Repeating<'a, 's> { - Repeating { - parsed: self, - values: Vec::with_capacity(time), - } - } -} - -struct Repeating<'a, 's> { - parsed: &'s mut BUFRParsed<'a>, - values: Vec, -} - -impl<'a, 's> Repeating<'a, 's> { - fn push(&mut self, value: Value) { - self.values.push(value); - } - - fn finish(self) { - let recording = BUFRRecord { - name: None, - values: BUFRData::Repeat(self.values), - unit: None, - }; - self.parsed.records.push(recording); - } -} - -pub enum BUFRData { - Repeat(Vec), - Single(Value), -} - -pub struct BUFRRecord<'a> { - pub name: Option<&'a str>, - pub values: BUFRData, - pub unit: Option<&'a str>, -} - -impl Display for BUFRRecord<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let is_print_unit = match self.unit { - Some("CAITT IA5" | "code table" | "code-table" | "flag table" | "flag-table") => false, - None => false, - _ => true, - }; - - if self.name.is_none() { - return Ok(()); - } - - let name = self.name.as_ref().unwrap(); - - match &self.values { - BUFRData::Single(v) => { - if is_print_unit { - write!(f, "{}: \n{} {}", name, v, self.unit.as_ref().unwrap())?; - } else { - write!(f, "{}: \n{}", name, v)?; - } - } - BUFRData::Repeat(vs) => { - if vs.len() < 8 { - write!(f, "{}: [", name)?; - for v in vs { - if is_print_unit { - write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; - } else { - write!(f, "{}, ", v)?; - } - } - - write!(f, "]")?; - } else { - write!(f, "{}: [", name)?; - for v in &vs[0..5] { - if is_print_unit { - write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; - } else { - write!(f, "{}, ", v)?; - } - } - write!(f, "... ")?; - for v in &vs[vs.len() - 2..vs.len()] { - if is_print_unit { - write!(f, "{} {}, ", v, self.unit.as_ref().unwrap())?; - } else { - write!(f, "{}, ", v)?; - } - } - write!(f, "]")?; - } - } - } - - Ok(()) - } -} - -struct Frame<'v> { - descs: Descs<'v>, - idx: usize, -} - -impl<'v> Frame<'v> { - fn len(&self) -> usize { - match &self.descs { - Descs::Raw(d) => d.len(), - Descs::Archived(d) => d.len(), - } - } -} - -#[derive(Clone, Copy)] -enum Descs<'v> { - Raw(&'v [genlib::FXY]), - Archived(&'v [ArchivedFXY]), -} - -impl Descs<'_> { - fn len(&self) -> usize { - match self { - Descs::Raw(d) => d.len(), - Descs::Archived(d) => d.len(), - } - } -} diff --git a/rbufr/src/structs/mod.rs b/rbufr/src/structs/mod.rs index d6492d3..2046d77 100644 --- a/rbufr/src/structs/mod.rs +++ b/rbufr/src/structs/mod.rs @@ -4,10 +4,8 @@ use nom::{ number::complete::{be_u8, be_u16, be_u24}, }; pub mod bit; -pub mod data_parser; pub(super) mod tools; pub mod versions; - #[cfg(feature = "opera")] pub(super) const GENCENTER: u16 = 247; diff --git a/rbufr/src/tables.rs b/rbufr/src/tables.rs index 0f85d39..c2566b8 100644 --- a/rbufr/src/tables.rs +++ b/rbufr/src/tables.rs @@ -126,7 +126,7 @@ impl TableLoader { T: TableTypeTrait, { let path = table_type.file_path(T::TABLE_TYPE); - println!("Loading table from {:?}", path); + // println!("Loading table from {:?}", path); BUFRTableMPH::::load_from_disk(path).map_err(|e| e.into()) } } diff --git a/rbufr/tests/test_rb.rs b/rbufr/tests/test_rb.rs deleted file mode 100644 index d72c58a..0000000 --- a/rbufr/tests/test_rb.rs +++ /dev/null @@ -1,14 +0,0 @@ -use librbufr::parser::Parser; - -fn test_rb() { - let mut parser = Parser::new(); - let parsed_file = parser - .parse("/Users/xiang.li1/Downloads/36_2025-12-22T11_00_00.bufr") - .unwrap(); - - for msg in parsed_file.messages() { - println!("{}", msg); - - msg.load_data().unwrap(); - } -} diff --git a/rbufr/tests/test_rc.rs b/rbufr/tests/test_rc.rs new file mode 100644 index 0000000..86e0f79 --- /dev/null +++ b/rbufr/tests/test_rc.rs @@ -0,0 +1,11 @@ +use librbufr::{decoder::Decoder, parser::parse}; + +fn test_dec() { + let file = parse("/Users/xiang.li1/Downloads/36_2025-12-22T11_00_00.bufr").unwrap(); + for msg in file.messages() { + let mut decoder = Decoder::from_message(msg).unwrap(); + let record = decoder.decode(msg).unwrap(); + + println!("{}", record); + } +}