1use std::collections::HashSet;
2use std::sync::Arc;
3
4use chrono::Utc;
5
6use crate::domain::contracts::{NodeStore, NodeValidator};
7use crate::domain::models::{
8 AvecState, ConfidenceBandSummary, MonthlyRollupRequest, MonthlyRollupResult, NodeQuery,
9 NumericRange,
10};
11use crate::parsing::SttpNodeParser;
12
/// Aggregates STTP nodes held in a [`NodeStore`] into a single monthly-tier rollup node.
pub struct MonthlyRollupService {
    /// Queried for the source nodes; also receives the parsed rollup when persisting.
    store: Arc<dyn NodeStore>,
    /// Checks the rendered rollup text before it is parsed.
    validator: Arc<dyn NodeValidator>,
    /// Turns the rendered rollup text back into a node that can be handed to the store.
    parser: SttpNodeParser,
}
18
19impl MonthlyRollupService {
20 pub fn new(store: Arc<dyn NodeStore>, validator: Arc<dyn NodeValidator>) -> Self {
22 Self {
23 store,
24 validator,
25 parser: SttpNodeParser::new(),
26 }
27 }
28
29 pub async fn create_async(&self, request: MonthlyRollupRequest) -> MonthlyRollupResult {
34 emit_rollup_trace(
35 &request.session_id,
36 "create_async_start",
37 &format!(
38 "start_utc={} end_utc={} source_session_id={} limit={} persist={}",
39 request.start_utc.to_rfc3339(),
40 request.end_utc.to_rfc3339(),
41 request.source_session_id.as_deref().unwrap_or("all"),
42 request.limit,
43 request.persist
44 ),
45 );
46
47 if request.end_utc < request.start_utc {
48 emit_rollup_trace(
49 &request.session_id,
50 "invalid_range",
51 "end_utc precedes start_utc",
52 );
53 return MonthlyRollupResult {
54 error: Some(
55 "InvalidRange: end must be greater than or equal to start.".to_string(),
56 ),
57 ..MonthlyRollupResult::default()
58 };
59 }
60
61 let nodes = match self
62 .store
63 .query_nodes_async(NodeQuery {
64 session_id: request.source_session_id.clone(),
65 from_utc: Some(request.start_utc),
66 to_utc: Some(request.end_utc),
67 limit: request.limit,
68 tiers: None,
69 })
70 .await
71 {
72 Ok(nodes) => nodes,
73 Err(err) => {
74 emit_rollup_trace(
75 &request.session_id,
76 "query_failure",
77 &format!("error={} content_redacted=true", err),
78 );
79 return MonthlyRollupResult {
80 error: Some(format!("QueryFailure: {err}")),
81 ..MonthlyRollupResult::default()
82 };
83 }
84 };
85
86 emit_rollup_trace(
87 &request.session_id,
88 "query_result",
89 &format!(
90 "source_nodes={} source_session_id={} window={}..{}",
91 nodes.len(),
92 request.source_session_id.as_deref().unwrap_or("all"),
93 request.start_utc.to_rfc3339(),
94 request.end_utc.to_rfc3339()
95 ),
96 );
97
98 if nodes.is_empty() {
99 emit_rollup_trace(
100 &request.session_id,
101 "no_source_nodes",
102 "query returned zero nodes; verify upstream store acceptance and filters",
103 );
104 return MonthlyRollupResult {
105 error: Some("NoSourceNodes: no nodes found in the requested range.".to_string()),
106 ..MonthlyRollupResult::default()
107 };
108 }
109
110 let mut ordered_nodes = nodes;
111 ordered_nodes.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
112
113 let user_nodes = ordered_nodes
114 .iter()
115 .filter(|n| n.user_avec.psi() > 0.0)
116 .collect::<Vec<_>>();
117 let model_nodes = ordered_nodes
118 .iter()
119 .filter(|n| n.model_avec.psi() > 0.0)
120 .collect::<Vec<_>>();
121 let compression_nodes = ordered_nodes
122 .iter()
123 .filter(|n| n.compression_avec.map(|avec| avec.psi()).unwrap_or(0.0) > 0.0)
124 .collect::<Vec<_>>();
125
126 let user_average = average_avec(user_nodes.iter().map(|n| n.user_avec));
127 let model_average = average_avec(model_nodes.iter().map(|n| n.model_avec));
128 let compression_average =
129 average_avec(compression_nodes.iter().filter_map(|n| n.compression_avec));
130
131 let rho_range = range_for(ordered_nodes.iter().map(|n| n.rho));
132 let kappa_range = range_for(ordered_nodes.iter().map(|n| n.kappa));
133 let psi_range = range_for(ordered_nodes.iter().map(|n| n.psi));
134 let rho_bands = bands_for(ordered_nodes.iter().map(|n| n.rho));
135 let kappa_bands = bands_for(ordered_nodes.iter().map(|n| n.kappa));
136 let active_days = ordered_nodes
137 .iter()
138 .map(|n| n.timestamp.date_naive())
139 .collect::<HashSet<_>>()
140 .len();
141 let parent_reference = request
142 .parent_node_id
143 .clone()
144 .unwrap_or_else(|| ordered_nodes[0].session_id.clone());
145
146 let raw_node = build_monthly_node(
147 &request,
148 &parent_reference,
149 ordered_nodes.len(),
150 user_nodes.len(),
151 active_days,
152 user_average,
153 model_average,
154 compression_average,
155 rho_range,
156 kappa_range,
157 psi_range,
158 rho_bands,
159 kappa_bands,
160 );
161
162 let validation = self.validator.validate(&raw_node);
163 if !validation.is_valid {
164 emit_rollup_trace(
165 &request.session_id,
166 "validation_failure",
167 &format!(
168 "reason={} error={} content_redacted=true",
169 validation.reason,
170 validation.error.as_deref().unwrap_or_default()
171 ),
172 );
173 return MonthlyRollupResult {
174 raw_node,
175 source_nodes: ordered_nodes.len(),
176 parent_reference: Some(parent_reference),
177 error: Some(format!(
178 "ValidationFailure: {}: {}",
179 validation.reason,
180 validation.error.unwrap_or_default()
181 )),
182 ..MonthlyRollupResult::default()
183 };
184 }
185
186 let parse_result = self.parser.try_parse(&raw_node, &request.session_id);
187 if !parse_result.success {
188 emit_rollup_trace(
189 &request.session_id,
190 "parse_failure",
191 &format!(
192 "profile={:?} strict_valid={} diagnostics_count={} error={} content_redacted=true",
193 parse_result.profile,
194 parse_result.strict_valid,
195 parse_result.diagnostics.len(),
196 parse_result.error.as_deref().unwrap_or_default()
197 ),
198 );
199 return MonthlyRollupResult {
200 raw_node,
201 source_nodes: ordered_nodes.len(),
202 parent_reference: Some(parent_reference),
203 error: Some(format!(
204 "ParseFailure: {}",
205 parse_result.error.unwrap_or_default()
206 )),
207 ..MonthlyRollupResult::default()
208 };
209 }
210
211 let mut node_id = String::new();
212 if request.persist {
213 let Some(parsed_node) = parse_result.node else {
214 emit_rollup_trace(
215 &request.session_id,
216 "parse_failure",
217 "missing parsed node after successful parse result",
218 );
219 return MonthlyRollupResult {
220 raw_node,
221 source_nodes: ordered_nodes.len(),
222 parent_reference: Some(parent_reference),
223 error: Some("ParseFailure: missing parsed node".to_string()),
224 ..MonthlyRollupResult::default()
225 };
226 };
227
228 match self.store.store_async(parsed_node).await {
229 Ok(id) => {
230 emit_rollup_trace(
231 &request.session_id,
232 "store_success",
233 &format!("node_id={} source_nodes={} content_redacted=true", id, ordered_nodes.len()),
234 );
235 node_id = id
236 }
237 Err(err) => {
238 emit_rollup_trace(
239 &request.session_id,
240 "store_failure",
241 &format!("error={} content_redacted=true", err),
242 );
243 return MonthlyRollupResult {
244 raw_node,
245 source_nodes: ordered_nodes.len(),
246 parent_reference: Some(parent_reference),
247 user_average,
248 model_average,
249 compression_average,
250 rho_range,
251 kappa_range,
252 psi_range,
253 rho_bands,
254 kappa_bands,
255 error: Some(format!("StoreFailure: {err}")),
256 ..MonthlyRollupResult::default()
257 };
258 }
259 }
260 }
261
262 emit_rollup_trace(
263 &request.session_id,
264 "create_async_complete",
265 &format!(
266 "success=true source_nodes={} parent_reference={} persist={}",
267 ordered_nodes.len(),
268 parent_reference,
269 request.persist
270 ),
271 );
272
273 MonthlyRollupResult {
274 success: true,
275 node_id,
276 raw_node,
277 source_nodes: ordered_nodes.len(),
278 parent_reference: Some(parent_reference),
279 user_average,
280 model_average,
281 compression_average,
282 rho_range,
283 kappa_range,
284 psi_range,
285 rho_bands,
286 kappa_bands,
287 ..MonthlyRollupResult::default()
288 }
289 }
290}
291
/// Writes one diagnostic line to standard error.
///
/// Line shape: `[sttp_rollup_trace] session_id=<id> event=<event> detail=<detail>`.
/// `detail` is emitted as-is; callers format it beforehand.
fn emit_rollup_trace(session_id: &str, event: &str, detail: &str) {
    eprintln!("[sttp_rollup_trace] session_id={session_id} event={event} detail={detail}");
}
298
299#[allow(clippy::too_many_arguments)]
300fn build_monthly_node(
301 request: &MonthlyRollupRequest,
302 parent_reference: &str,
303 source_nodes: usize,
304 source_user_avec_nodes: usize,
305 active_days: usize,
306 user_average: AvecState,
307 model_average: AvecState,
308 compression_average: AvecState,
309 rho_range: NumericRange,
310 kappa_range: NumericRange,
311 psi_range: NumericRange,
312 rho_bands: ConfidenceBandSummary,
313 kappa_bands: ConfidenceBandSummary,
314) -> String {
315 let timestamp = Utc::now().to_rfc3339();
316 let start = request.start_utc.format("%Y-%m-%d").to_string();
317 let end = request.end_utc.format("%Y-%m-%d").to_string();
318 let source_session_token = request
319 .source_session_id
320 .as_deref()
321 .filter(|s| !s.trim().is_empty())
322 .map(slug)
323 .unwrap_or_else(|| "all_sessions".to_string());
324
325 let template = r#"⊕⟨ ⏣0{ trigger: manual, response_format: temporal_node, origin_session: "__SESSION_ID__", compression_depth: 2, parent_node: ref:__PARENT_REFERENCE__, prime: { attractor_config: { stability: __USER_STABILITY__, friction: __USER_FRICTION__, logic: __USER_LOGIC__, autonomy: __USER_AUTONOMY__ }, context_summary: monthly_rollup_across_stored_sttp_nodes_with_average_state_and_confidence_spread, relevant_tier: monthly, retrieval_budget: 16 } } ⟩
326⦿⟨ ⏣0{ timestamp: "__TIMESTAMP__", tier: monthly, session_id: "__SESSION_ID__", schema_version: "sttp-1.0", user_avec: { stability: __USER_STABILITY__, friction: __USER_FRICTION__, logic: __USER_LOGIC__, autonomy: __USER_AUTONOMY__, psi: __USER_PSI__ }, model_avec: { stability: __MODEL_STABILITY__, friction: __MODEL_FRICTION__, logic: __MODEL_LOGIC__, autonomy: __MODEL_AUTONOMY__, psi: __MODEL_PSI__ } } ⟩
327◈⟨ ⏣0{ source_nodes(.99): __SOURCE_NODES__, source_user_avec_nodes(.97): __SOURCE_USER_AVEC_NODES__, active_days(.95): __ACTIVE_DAYS__, date_span(.99): __START___to___END__, source_session_filter(.78): __SOURCE_SESSION_TOKEN__, parent_anchor(.99): __PARENT_ANCHOR__, activity_shape(.83): burst_work_pattern_with_gaps_between_deep_sessions, monthly_arc(.86): stabilization_then_design_then_implementation_then_synthesis, behavioral_signature(.84): high_stability_high_logic_high_autonomy_with_low_to_moderate_friction, user_avec_average(.99): { stability: __USER_STABILITY__, friction: __USER_FRICTION__, logic: __USER_LOGIC__, autonomy: __USER_AUTONOMY__, psi: __USER_PSI__ }, model_avec_average(.97): { stability: __MODEL_STABILITY__, friction: __MODEL_FRICTION__, logic: __MODEL_LOGIC__, autonomy: __MODEL_AUTONOMY__, psi: __MODEL_PSI__ }, compression_avec_average(.96): { stability: __COMP_STABILITY__, friction: __COMP_FRICTION__, logic: __COMP_LOGIC__, autonomy: __COMP_AUTONOMY__, psi: __COMP_PSI__ }, confidence_ranges(.94): { rho_avg: __RHO_AVG__, rho_min: __RHO_MIN__, rho_max: __RHO_MAX__, kappa_avg: __KAPPA_AVG__, kappa_min: __KAPPA_MIN__, kappa_max: __KAPPA_MAX__, psi_avg: __PSI_AVG__, psi_min: __PSI_MIN__, psi_max: __PSI_MAX__ }, confidence_bands(.71): { rho_low: __RHO_LOW__, rho_medium: __RHO_MEDIUM__, rho_high: __RHO_HIGH__, kappa_low: __KAPPA_LOW__, kappa_medium: __KAPPA_MEDIUM__, kappa_high: __KAPPA_HIGH__ }, uncertainty(.41): interpretive_fields_carry_lower_confidence_than_numeric_rollups } ⟩
328⍉⟨ ⏣0{ rho: __RHO_AVG__, kappa: __KAPPA_AVG__, psi: __PSI_AVG__, compression_avec: { stability: __COMP_STABILITY__, friction: __COMP_FRICTION__, logic: __COMP_LOGIC__, autonomy: __COMP_AUTONOMY__, psi: __COMP_PSI__ } } ⟩"#;
329
330 template
331 .replace("__SESSION_ID__", &request.session_id)
332 .replace("__PARENT_REFERENCE__", parent_reference)
333 .replace("__TIMESTAMP__", ×tamp)
334 .replace("__SOURCE_NODES__", &source_nodes.to_string())
335 .replace(
336 "__SOURCE_USER_AVEC_NODES__",
337 &source_user_avec_nodes.to_string(),
338 )
339 .replace("__ACTIVE_DAYS__", &active_days.to_string())
340 .replace("__START__", &start)
341 .replace("__END__", &end)
342 .replace("__SOURCE_SESSION_TOKEN__", &source_session_token)
343 .replace("__PARENT_ANCHOR__", &slug(parent_reference))
344 .replace("__USER_STABILITY__", &format_float(user_average.stability))
345 .replace("__USER_FRICTION__", &format_float(user_average.friction))
346 .replace("__USER_LOGIC__", &format_float(user_average.logic))
347 .replace("__USER_AUTONOMY__", &format_float(user_average.autonomy))
348 .replace("__USER_PSI__", &format_float(user_average.psi()))
349 .replace(
350 "__MODEL_STABILITY__",
351 &format_float(model_average.stability),
352 )
353 .replace("__MODEL_FRICTION__", &format_float(model_average.friction))
354 .replace("__MODEL_LOGIC__", &format_float(model_average.logic))
355 .replace("__MODEL_AUTONOMY__", &format_float(model_average.autonomy))
356 .replace("__MODEL_PSI__", &format_float(model_average.psi()))
357 .replace(
358 "__COMP_STABILITY__",
359 &format_float(compression_average.stability),
360 )
361 .replace(
362 "__COMP_FRICTION__",
363 &format_float(compression_average.friction),
364 )
365 .replace("__COMP_LOGIC__", &format_float(compression_average.logic))
366 .replace(
367 "__COMP_AUTONOMY__",
368 &format_float(compression_average.autonomy),
369 )
370 .replace("__COMP_PSI__", &format_float(compression_average.psi()))
371 .replace("__RHO_AVG__", &format_float(rho_range.average))
372 .replace("__RHO_MIN__", &format_float(rho_range.min))
373 .replace("__RHO_MAX__", &format_float(rho_range.max))
374 .replace("__KAPPA_AVG__", &format_float(kappa_range.average))
375 .replace("__KAPPA_MIN__", &format_float(kappa_range.min))
376 .replace("__KAPPA_MAX__", &format_float(kappa_range.max))
377 .replace("__PSI_AVG__", &format_float(psi_range.average))
378 .replace("__PSI_MIN__", &format_float(psi_range.min))
379 .replace("__PSI_MAX__", &format_float(psi_range.max))
380 .replace("__RHO_LOW__", &rho_bands.low.to_string())
381 .replace("__RHO_MEDIUM__", &rho_bands.medium.to_string())
382 .replace("__RHO_HIGH__", &rho_bands.high.to_string())
383 .replace("__KAPPA_LOW__", &kappa_bands.low.to_string())
384 .replace("__KAPPA_MEDIUM__", &kappa_bands.medium.to_string())
385 .replace("__KAPPA_HIGH__", &kappa_bands.high.to_string())
386}
387
388fn average_avec<I>(states: I) -> AvecState
389where
390 I: IntoIterator<Item = AvecState>,
391{
392 let values = states.into_iter().collect::<Vec<_>>();
393 if values.is_empty() {
394 return AvecState::zero();
395 }
396
397 let len = values.len() as f32;
398 let stability = values.iter().map(|s| s.stability).sum::<f32>() / len;
399 let friction = values.iter().map(|s| s.friction).sum::<f32>() / len;
400 let logic = values.iter().map(|s| s.logic).sum::<f32>() / len;
401 let autonomy = values.iter().map(|s| s.autonomy).sum::<f32>() / len;
402
403 AvecState {
404 stability,
405 friction,
406 logic,
407 autonomy,
408 }
409}
410
411fn range_for<I>(values: I) -> NumericRange
412where
413 I: IntoIterator<Item = f32>,
414{
415 let values = values.into_iter().collect::<Vec<_>>();
416 if values.is_empty() {
417 return NumericRange::default();
418 }
419
420 let min = values
421 .iter()
422 .fold(f32::INFINITY, |acc, value| acc.min(*value));
423 let max = values
424 .iter()
425 .fold(f32::NEG_INFINITY, |acc, value| acc.max(*value));
426 let average = values.iter().sum::<f32>() / values.len() as f32;
427
428 NumericRange { min, max, average }
429}
430
431fn bands_for<I>(values: I) -> ConfidenceBandSummary
432where
433 I: IntoIterator<Item = f32>,
434{
435 let values = values.into_iter().collect::<Vec<_>>();
436 ConfidenceBandSummary {
437 low: values.iter().filter(|v| **v < 0.5).count(),
438 medium: values.iter().filter(|v| **v >= 0.5 && **v < 0.85).count(),
439 high: values.iter().filter(|v| **v >= 0.85).count(),
440 }
441}
442
/// Renders `value` as the shortest decimal string that parses back to the same `f32`.
///
/// Rust's `Display` for floats never uses exponent notation. It prints the shortest
/// round-trip digits, so:
/// - `0.85` renders as `0.85`, not the `0.8500000238` that a fixed 10-digit format exposes
///   from the binary approximation.
/// - Whole numbers have no fractional part (`1.0` → `1`).
///
/// Both signed zeros render as `0`. Non-finite values render as `NaN`, `inf` or `-inf`.
fn format_float(value: f32) -> String {
    if value == 0.0 {
        // Also matches `-0.0`, which `Display` would print as `-0`.
        return "0".to_string();
    }
    value.to_string()
}
453
/// Lowercases alphanumeric characters and maps every other character to `_`.
///
/// Leading and trailing `_` are then trimmed. Alphanumeric here means Unicode alphanumeric,
/// so accented letters survive (lowercased). Interior runs of `_` are kept as-is.
fn slug(value: &str) -> String {
    let mut buffer = String::with_capacity(value.len());
    value.chars().for_each(|c| {
        if c.is_alphanumeric() {
            buffer.extend(c.to_lowercase());
        } else {
            buffer.push('_');
        }
    });
    buffer.trim_matches('_').to_owned()
}