Skip to main content

locus_core_rs/application/services/
monthly_rollup_service.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3
4use chrono::Utc;
5
6use crate::domain::contracts::{NodeStore, NodeValidator};
7use crate::domain::models::{
8    AvecState, ConfidenceBandSummary, MonthlyRollupRequest, MonthlyRollupResult, NodeQuery,
9    NumericRange,
10};
11use crate::parsing::SttpNodeParser;
12
13pub struct MonthlyRollupService {
14    store: Arc<dyn NodeStore>,
15    validator: Arc<dyn NodeValidator>,
16    parser: SttpNodeParser,
17}
18
19impl MonthlyRollupService {
20    /// Create a monthly rollup service with storage and validation dependencies.
21    pub fn new(store: Arc<dyn NodeStore>, validator: Arc<dyn NodeValidator>) -> Self {
22        Self {
23            store,
24            validator,
25            parser: SttpNodeParser::new(),
26        }
27    }
28
29    /// Build a monthly rollup node from source nodes in the requested date range.
30    ///
31    /// Depending on request settings, this can run as preview-only or persist
32    /// the generated node into the configured store.
33    pub async fn create_async(&self, request: MonthlyRollupRequest) -> MonthlyRollupResult {
34        emit_rollup_trace(
35            &request.session_id,
36            "create_async_start",
37            &format!(
38                "start_utc={} end_utc={} source_session_id={} limit={} persist={}",
39                request.start_utc.to_rfc3339(),
40                request.end_utc.to_rfc3339(),
41                request.source_session_id.as_deref().unwrap_or("all"),
42                request.limit,
43                request.persist
44            ),
45        );
46
47        if request.end_utc < request.start_utc {
48            emit_rollup_trace(
49                &request.session_id,
50                "invalid_range",
51                "end_utc precedes start_utc",
52            );
53            return MonthlyRollupResult {
54                error: Some(
55                    "InvalidRange: end must be greater than or equal to start.".to_string(),
56                ),
57                ..MonthlyRollupResult::default()
58            };
59        }
60
61        let nodes = match self
62            .store
63            .query_nodes_async(NodeQuery {
64                session_id: request.source_session_id.clone(),
65                from_utc: Some(request.start_utc),
66                to_utc: Some(request.end_utc),
67                limit: request.limit,
68                tiers: None,
69            })
70            .await
71        {
72            Ok(nodes) => nodes,
73            Err(err) => {
74                emit_rollup_trace(
75                    &request.session_id,
76                    "query_failure",
77                    &format!("error={} content_redacted=true", err),
78                );
79                return MonthlyRollupResult {
80                    error: Some(format!("QueryFailure: {err}")),
81                    ..MonthlyRollupResult::default()
82                };
83            }
84        };
85
86        emit_rollup_trace(
87            &request.session_id,
88            "query_result",
89            &format!(
90                "source_nodes={} source_session_id={} window={}..{}",
91                nodes.len(),
92                request.source_session_id.as_deref().unwrap_or("all"),
93                request.start_utc.to_rfc3339(),
94                request.end_utc.to_rfc3339()
95            ),
96        );
97
98        if nodes.is_empty() {
99            emit_rollup_trace(
100                &request.session_id,
101                "no_source_nodes",
102                "query returned zero nodes; verify upstream store acceptance and filters",
103            );
104            return MonthlyRollupResult {
105                error: Some("NoSourceNodes: no nodes found in the requested range.".to_string()),
106                ..MonthlyRollupResult::default()
107            };
108        }
109
110        let mut ordered_nodes = nodes;
111        ordered_nodes.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
112
113        let user_nodes = ordered_nodes
114            .iter()
115            .filter(|n| n.user_avec.psi() > 0.0)
116            .collect::<Vec<_>>();
117        let model_nodes = ordered_nodes
118            .iter()
119            .filter(|n| n.model_avec.psi() > 0.0)
120            .collect::<Vec<_>>();
121        let compression_nodes = ordered_nodes
122            .iter()
123            .filter(|n| n.compression_avec.map(|avec| avec.psi()).unwrap_or(0.0) > 0.0)
124            .collect::<Vec<_>>();
125
126        let user_average = average_avec(user_nodes.iter().map(|n| n.user_avec));
127        let model_average = average_avec(model_nodes.iter().map(|n| n.model_avec));
128        let compression_average =
129            average_avec(compression_nodes.iter().filter_map(|n| n.compression_avec));
130
131        let rho_range = range_for(ordered_nodes.iter().map(|n| n.rho));
132        let kappa_range = range_for(ordered_nodes.iter().map(|n| n.kappa));
133        let psi_range = range_for(ordered_nodes.iter().map(|n| n.psi));
134        let rho_bands = bands_for(ordered_nodes.iter().map(|n| n.rho));
135        let kappa_bands = bands_for(ordered_nodes.iter().map(|n| n.kappa));
136        let active_days = ordered_nodes
137            .iter()
138            .map(|n| n.timestamp.date_naive())
139            .collect::<HashSet<_>>()
140            .len();
141        let parent_reference = request
142            .parent_node_id
143            .clone()
144            .unwrap_or_else(|| ordered_nodes[0].session_id.clone());
145
146        let raw_node = build_monthly_node(
147            &request,
148            &parent_reference,
149            ordered_nodes.len(),
150            user_nodes.len(),
151            active_days,
152            user_average,
153            model_average,
154            compression_average,
155            rho_range,
156            kappa_range,
157            psi_range,
158            rho_bands,
159            kappa_bands,
160        );
161
162        let validation = self.validator.validate(&raw_node);
163        if !validation.is_valid {
164            emit_rollup_trace(
165                &request.session_id,
166                "validation_failure",
167                &format!(
168                    "reason={} error={} content_redacted=true",
169                    validation.reason,
170                    validation.error.as_deref().unwrap_or_default()
171                ),
172            );
173            return MonthlyRollupResult {
174                raw_node,
175                source_nodes: ordered_nodes.len(),
176                parent_reference: Some(parent_reference),
177                error: Some(format!(
178                    "ValidationFailure: {}: {}",
179                    validation.reason,
180                    validation.error.unwrap_or_default()
181                )),
182                ..MonthlyRollupResult::default()
183            };
184        }
185
186        let parse_result = self.parser.try_parse(&raw_node, &request.session_id);
187        if !parse_result.success {
188            emit_rollup_trace(
189                &request.session_id,
190                "parse_failure",
191                &format!(
192                    "profile={:?} strict_valid={} diagnostics_count={} error={} content_redacted=true",
193                    parse_result.profile,
194                    parse_result.strict_valid,
195                    parse_result.diagnostics.len(),
196                    parse_result.error.as_deref().unwrap_or_default()
197                ),
198            );
199            return MonthlyRollupResult {
200                raw_node,
201                source_nodes: ordered_nodes.len(),
202                parent_reference: Some(parent_reference),
203                error: Some(format!(
204                    "ParseFailure: {}",
205                    parse_result.error.unwrap_or_default()
206                )),
207                ..MonthlyRollupResult::default()
208            };
209        }
210
211        let mut node_id = String::new();
212        if request.persist {
213            let Some(parsed_node) = parse_result.node else {
214                emit_rollup_trace(
215                    &request.session_id,
216                    "parse_failure",
217                    "missing parsed node after successful parse result",
218                );
219                return MonthlyRollupResult {
220                    raw_node,
221                    source_nodes: ordered_nodes.len(),
222                    parent_reference: Some(parent_reference),
223                    error: Some("ParseFailure: missing parsed node".to_string()),
224                    ..MonthlyRollupResult::default()
225                };
226            };
227
228            match self.store.store_async(parsed_node).await {
229                Ok(id) => {
230                    emit_rollup_trace(
231                        &request.session_id,
232                        "store_success",
233                        &format!("node_id={} source_nodes={} content_redacted=true", id, ordered_nodes.len()),
234                    );
235                    node_id = id
236                }
237                Err(err) => {
238                    emit_rollup_trace(
239                        &request.session_id,
240                        "store_failure",
241                        &format!("error={} content_redacted=true", err),
242                    );
243                    return MonthlyRollupResult {
244                        raw_node,
245                        source_nodes: ordered_nodes.len(),
246                        parent_reference: Some(parent_reference),
247                        user_average,
248                        model_average,
249                        compression_average,
250                        rho_range,
251                        kappa_range,
252                        psi_range,
253                        rho_bands,
254                        kappa_bands,
255                        error: Some(format!("StoreFailure: {err}")),
256                        ..MonthlyRollupResult::default()
257                    };
258                }
259            }
260        }
261
262        emit_rollup_trace(
263            &request.session_id,
264            "create_async_complete",
265            &format!(
266                "success=true source_nodes={} parent_reference={} persist={}",
267                ordered_nodes.len(),
268                parent_reference,
269                request.persist
270            ),
271        );
272
273        MonthlyRollupResult {
274            success: true,
275            node_id,
276            raw_node,
277            source_nodes: ordered_nodes.len(),
278            parent_reference: Some(parent_reference),
279            user_average,
280            model_average,
281            compression_average,
282            rho_range,
283            kappa_range,
284            psi_range,
285            rho_bands,
286            kappa_bands,
287            ..MonthlyRollupResult::default()
288        }
289    }
290}
291
/// Write one rollup trace line to standard error.
///
/// The line carries the session id, an event name, and a free-form detail string.
fn emit_rollup_trace(session_id: &str, event: &str, detail: &str) {
    let line = format!(
        "[sttp_rollup_trace] session_id={} event={} detail={}",
        session_id, event, detail
    );
    eprintln!("{line}");
}
298
299#[allow(clippy::too_many_arguments)]
300fn build_monthly_node(
301    request: &MonthlyRollupRequest,
302    parent_reference: &str,
303    source_nodes: usize,
304    source_user_avec_nodes: usize,
305    active_days: usize,
306    user_average: AvecState,
307    model_average: AvecState,
308    compression_average: AvecState,
309    rho_range: NumericRange,
310    kappa_range: NumericRange,
311    psi_range: NumericRange,
312    rho_bands: ConfidenceBandSummary,
313    kappa_bands: ConfidenceBandSummary,
314) -> String {
315    let timestamp = Utc::now().to_rfc3339();
316    let start = request.start_utc.format("%Y-%m-%d").to_string();
317    let end = request.end_utc.format("%Y-%m-%d").to_string();
318    let source_session_token = request
319        .source_session_id
320        .as_deref()
321        .filter(|s| !s.trim().is_empty())
322        .map(slug)
323        .unwrap_or_else(|| "all_sessions".to_string());
324
325    let template = r#"⊕⟨ ⏣0{ trigger: manual, response_format: temporal_node, origin_session: "__SESSION_ID__", compression_depth: 2, parent_node: ref:__PARENT_REFERENCE__, prime: { attractor_config: { stability: __USER_STABILITY__, friction: __USER_FRICTION__, logic: __USER_LOGIC__, autonomy: __USER_AUTONOMY__ }, context_summary: monthly_rollup_across_stored_sttp_nodes_with_average_state_and_confidence_spread, relevant_tier: monthly, retrieval_budget: 16 } } ⟩
326⦿⟨ ⏣0{ timestamp: "__TIMESTAMP__", tier: monthly, session_id: "__SESSION_ID__", schema_version: "sttp-1.0", user_avec: { stability: __USER_STABILITY__, friction: __USER_FRICTION__, logic: __USER_LOGIC__, autonomy: __USER_AUTONOMY__, psi: __USER_PSI__ }, model_avec: { stability: __MODEL_STABILITY__, friction: __MODEL_FRICTION__, logic: __MODEL_LOGIC__, autonomy: __MODEL_AUTONOMY__, psi: __MODEL_PSI__ } } ⟩
327◈⟨ ⏣0{ source_nodes(.99): __SOURCE_NODES__, source_user_avec_nodes(.97): __SOURCE_USER_AVEC_NODES__, active_days(.95): __ACTIVE_DAYS__, date_span(.99): __START___to___END__, source_session_filter(.78): __SOURCE_SESSION_TOKEN__, parent_anchor(.99): __PARENT_ANCHOR__, activity_shape(.83): burst_work_pattern_with_gaps_between_deep_sessions, monthly_arc(.86): stabilization_then_design_then_implementation_then_synthesis, behavioral_signature(.84): high_stability_high_logic_high_autonomy_with_low_to_moderate_friction, user_avec_average(.99): { stability: __USER_STABILITY__, friction: __USER_FRICTION__, logic: __USER_LOGIC__, autonomy: __USER_AUTONOMY__, psi: __USER_PSI__ }, model_avec_average(.97): { stability: __MODEL_STABILITY__, friction: __MODEL_FRICTION__, logic: __MODEL_LOGIC__, autonomy: __MODEL_AUTONOMY__, psi: __MODEL_PSI__ }, compression_avec_average(.96): { stability: __COMP_STABILITY__, friction: __COMP_FRICTION__, logic: __COMP_LOGIC__, autonomy: __COMP_AUTONOMY__, psi: __COMP_PSI__ }, confidence_ranges(.94): { rho_avg: __RHO_AVG__, rho_min: __RHO_MIN__, rho_max: __RHO_MAX__, kappa_avg: __KAPPA_AVG__, kappa_min: __KAPPA_MIN__, kappa_max: __KAPPA_MAX__, psi_avg: __PSI_AVG__, psi_min: __PSI_MIN__, psi_max: __PSI_MAX__ }, confidence_bands(.71): { rho_low: __RHO_LOW__, rho_medium: __RHO_MEDIUM__, rho_high: __RHO_HIGH__, kappa_low: __KAPPA_LOW__, kappa_medium: __KAPPA_MEDIUM__, kappa_high: __KAPPA_HIGH__ }, uncertainty(.41): interpretive_fields_carry_lower_confidence_than_numeric_rollups } ⟩
328⍉⟨ ⏣0{ rho: __RHO_AVG__, kappa: __KAPPA_AVG__, psi: __PSI_AVG__, compression_avec: { stability: __COMP_STABILITY__, friction: __COMP_FRICTION__, logic: __COMP_LOGIC__, autonomy: __COMP_AUTONOMY__, psi: __COMP_PSI__ } } ⟩"#;
329
330    template
331        .replace("__SESSION_ID__", &request.session_id)
332        .replace("__PARENT_REFERENCE__", parent_reference)
333        .replace("__TIMESTAMP__", &timestamp)
334        .replace("__SOURCE_NODES__", &source_nodes.to_string())
335        .replace(
336            "__SOURCE_USER_AVEC_NODES__",
337            &source_user_avec_nodes.to_string(),
338        )
339        .replace("__ACTIVE_DAYS__", &active_days.to_string())
340        .replace("__START__", &start)
341        .replace("__END__", &end)
342        .replace("__SOURCE_SESSION_TOKEN__", &source_session_token)
343        .replace("__PARENT_ANCHOR__", &slug(parent_reference))
344        .replace("__USER_STABILITY__", &format_float(user_average.stability))
345        .replace("__USER_FRICTION__", &format_float(user_average.friction))
346        .replace("__USER_LOGIC__", &format_float(user_average.logic))
347        .replace("__USER_AUTONOMY__", &format_float(user_average.autonomy))
348        .replace("__USER_PSI__", &format_float(user_average.psi()))
349        .replace(
350            "__MODEL_STABILITY__",
351            &format_float(model_average.stability),
352        )
353        .replace("__MODEL_FRICTION__", &format_float(model_average.friction))
354        .replace("__MODEL_LOGIC__", &format_float(model_average.logic))
355        .replace("__MODEL_AUTONOMY__", &format_float(model_average.autonomy))
356        .replace("__MODEL_PSI__", &format_float(model_average.psi()))
357        .replace(
358            "__COMP_STABILITY__",
359            &format_float(compression_average.stability),
360        )
361        .replace(
362            "__COMP_FRICTION__",
363            &format_float(compression_average.friction),
364        )
365        .replace("__COMP_LOGIC__", &format_float(compression_average.logic))
366        .replace(
367            "__COMP_AUTONOMY__",
368            &format_float(compression_average.autonomy),
369        )
370        .replace("__COMP_PSI__", &format_float(compression_average.psi()))
371        .replace("__RHO_AVG__", &format_float(rho_range.average))
372        .replace("__RHO_MIN__", &format_float(rho_range.min))
373        .replace("__RHO_MAX__", &format_float(rho_range.max))
374        .replace("__KAPPA_AVG__", &format_float(kappa_range.average))
375        .replace("__KAPPA_MIN__", &format_float(kappa_range.min))
376        .replace("__KAPPA_MAX__", &format_float(kappa_range.max))
377        .replace("__PSI_AVG__", &format_float(psi_range.average))
378        .replace("__PSI_MIN__", &format_float(psi_range.min))
379        .replace("__PSI_MAX__", &format_float(psi_range.max))
380        .replace("__RHO_LOW__", &rho_bands.low.to_string())
381        .replace("__RHO_MEDIUM__", &rho_bands.medium.to_string())
382        .replace("__RHO_HIGH__", &rho_bands.high.to_string())
383        .replace("__KAPPA_LOW__", &kappa_bands.low.to_string())
384        .replace("__KAPPA_MEDIUM__", &kappa_bands.medium.to_string())
385        .replace("__KAPPA_HIGH__", &kappa_bands.high.to_string())
386}
387
388fn average_avec<I>(states: I) -> AvecState
389where
390    I: IntoIterator<Item = AvecState>,
391{
392    let values = states.into_iter().collect::<Vec<_>>();
393    if values.is_empty() {
394        return AvecState::zero();
395    }
396
397    let len = values.len() as f32;
398    let stability = values.iter().map(|s| s.stability).sum::<f32>() / len;
399    let friction = values.iter().map(|s| s.friction).sum::<f32>() / len;
400    let logic = values.iter().map(|s| s.logic).sum::<f32>() / len;
401    let autonomy = values.iter().map(|s| s.autonomy).sum::<f32>() / len;
402
403    AvecState {
404        stability,
405        friction,
406        logic,
407        autonomy,
408    }
409}
410
411fn range_for<I>(values: I) -> NumericRange
412where
413    I: IntoIterator<Item = f32>,
414{
415    let values = values.into_iter().collect::<Vec<_>>();
416    if values.is_empty() {
417        return NumericRange::default();
418    }
419
420    let min = values
421        .iter()
422        .fold(f32::INFINITY, |acc, value| acc.min(*value));
423    let max = values
424        .iter()
425        .fold(f32::NEG_INFINITY, |acc, value| acc.max(*value));
426    let average = values.iter().sum::<f32>() / values.len() as f32;
427
428    NumericRange { min, max, average }
429}
430
431fn bands_for<I>(values: I) -> ConfidenceBandSummary
432where
433    I: IntoIterator<Item = f32>,
434{
435    let values = values.into_iter().collect::<Vec<_>>();
436    ConfidenceBandSummary {
437        low: values.iter().filter(|v| **v < 0.5).count(),
438        medium: values.iter().filter(|v| **v >= 0.5 && **v < 0.85).count(),
439        high: values.iter().filter(|v| **v >= 0.85).count(),
440    }
441}
442
/// Format a float with up to ten decimal places, dropping trailing zeros and a
/// dangling decimal point (so `1.0` becomes `1` and `0.5` stays `0.5`).
///
/// Text without a decimal point (such as the non-finite renderings) is returned as is.
fn format_float(value: f32) -> String {
    let fixed = format!("{value:.10}");

    // Only strip zeros that sit after a decimal point; the point itself stops the trim.
    let compact = if fixed.contains('.') {
        fixed.trim_end_matches('0').trim_end_matches('.')
    } else {
        fixed.as_str()
    };

    match compact {
        "" => "0".to_string(),
        digits => digits.to_string(),
    }
}
453
/// Turn arbitrary text into a lowercase identifier-like token.
///
/// Alphanumeric characters are lowercased, every other character becomes `_`,
/// and leading/trailing underscores are removed. Interior runs are not collapsed.
fn slug(value: &str) -> String {
    let mut normalized = String::with_capacity(value.len());

    value.chars().for_each(|symbol| {
        if symbol.is_alphanumeric() {
            // Lowercasing may expand to several chars, so extend rather than push.
            normalized.extend(symbol.to_lowercase());
        } else {
            normalized.push('_');
        }
    });

    normalized.trim_matches('_').to_owned()
}