Skip to main content

locus_gateway/
gateway.rs

1use std::collections::{BTreeMap, HashMap};
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use anyhow::{Result, anyhow};
6use axum::extract::{Query, State};
7use axum::http::{HeaderMap, Method, StatusCode};
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::{DateTime, Utc};
11use clap::Parser;
12use serde_json::{Value, json};
13use tokio::net::TcpListener;
14use tonic::transport::Server;
15use tonic::{Request, Response, Status};
16use tower_http::cors::{Any, CorsLayer};
17use tracing::{error, info};
18use tracing_subscriber::EnvFilter;
19
20use locus_core_rs::domain::models::{
21    self as core_models, ConfidenceBandSummary, MonthlyRollupRequest, NumericRange, PsiRange,
22};
23use locus_core_rs::domain::contracts::EmbeddingProvider;
24use locus_sdk::application::memory_find::MemoryFindService;
25use locus_sdk::application::memory_recall::MemoryRecallService;
26use locus_sdk::application::memory_transform::MemoryTransformService;
27use locus_sdk::domain::memory::{
28    FallbackPolicy, MemoryFilter, MemoryFindRequest, MemoryPage, MemoryRecallRequest,
29    MemoryScope, MemoryScoring, MemoryTransformOperation, MemoryTransformRequest,
30};
31use locus_sdk::infrastructure::registry::InMemoryAiProviderRegistry;
32use locus_sdk::infrastructure::sttp_native::embedding_provider_adapter::SttpEmbeddingProviderAdapter;
33
34use crate::app_state::AppState;
35use crate::constants::{
36    DEFAULT_HYBRID_ALPHA, DEFAULT_HYBRID_BETA, FILE_DESCRIPTOR_SET, TENANT_SCAN_LIMIT,
37};
38use crate::gateway_args::GatewayArgs;
39use crate::http_models::*;
40use crate::orchestration::{build_state, parse_cors_allowed_origins, shutdown_signal};
41#[cfg(test)]
42use crate::orchestration::build_in_memory_state;
43use crate::providers::resolve_query_embedding;
44use crate::tenant::{
45    display_session_id, normalize_node_for_tenant, normalize_tenant_value, resolve_grpc_tenant,
46    resolve_http_tenant, scope_session_id,
47};
48
/// Rust types and service traits generated from the `sttp.v1` protobuf package,
/// pulled in at compile time through `tonic::include_proto!`.
pub mod proto {
    tonic::include_proto!("sttp.v1");
}
52
53pub(crate) async fn run() -> Result<()> {
54    tracing_subscriber::fmt()
55        .with_env_filter(EnvFilter::from_default_env())
56        .init();
57
58    let args = GatewayArgs::parse();
59
60    if args.http_port == args.grpc_port {
61        return Err(anyhow!(
62            "--http-port and --grpc-port must be different values"
63        ));
64    }
65
66    let state = Arc::new(build_state(&args).await?);
67
68    let base_router = Router::new()
69        .route("/health", get(health_handler))
70        .route("/api/v1/calibrate", post(calibrate_handler))
71        .route("/api/v1/store", post(store_context_handler))
72        .route("/api/store", post(store_context_handler))
73        .route("/store", post(store_context_handler))
74        .route("/api/v1/avec/score", post(score_avec_handler))
75        .route("/api/avec/score", post(score_avec_handler))
76        .route("/avec/score", post(score_avec_handler))
77        .route("/api/v1/session/rename", post(rename_session_handler))
78        .route("/api/session/rename", post(rename_session_handler))
79        .route("/session/rename", post(rename_session_handler))
80        .route("/api/v1/context", post(get_context_handler))
81        .route(
82            "/api/v1/context/embeddings",
83            post(get_embedding_context_handler),
84        )
85        .route(
86            "/api/context/embeddings",
87            post(get_embedding_context_handler),
88        )
89        .route("/context/embeddings", post(get_embedding_context_handler))
90        .route("/api/v1/nodes", get(list_nodes_handler))
91        .route("/api/nodes", get(list_nodes_handler))
92        .route("/nodes", get(list_nodes_handler))
93        .route("/api/v1/graph", get(graph_handler))
94        .route("/api/graph", get(graph_handler))
95        .route("/graph", get(graph_handler))
96        .route("/api/v1/moods", get(get_moods_handler))
97        .route("/api/v1/rekey", post(batch_rekey_handler))
98        .route(
99            "/api/v1/rollups/monthly",
100            post(create_monthly_rollup_handler),
101        )
102        .route(
103            "/api/v1/embeddings/migration/preview",
104            post(preview_embedding_migration_handler),
105        )
106        .route(
107            "/api/v1/embeddings/migration/run",
108            post(run_embedding_migration_handler),
109        )
110        .with_state(state.clone());
111
112    let http_router = if args.cors_enabled {
113        let allowed_origins = parse_cors_allowed_origins(&args.cors_allowed_origins)?;
114        let cors_base = CorsLayer::new()
115            .allow_methods([Method::GET, Method::POST, Method::PATCH, Method::OPTIONS])
116            .allow_headers(Any);
117
118        let cors = match allowed_origins {
119            CorsAllowedOrigins::Any => cors_base.allow_origin(Any),
120            CorsAllowedOrigins::Explicit(origins) => cors_base.allow_origin(origins),
121        };
122
123        base_router.layer(cors)
124    } else {
125        base_router
126    };
127
128    let grpc_service = GrpcGatewayService::new(state);
129
130    let grpc_addr = SocketAddr::from(([0, 0, 0, 0], args.grpc_port));
131    let http_listener = TcpListener::bind(("0.0.0.0", args.http_port)).await?;
132
133    let reflection_service = tonic_reflection::server::Builder::configure()
134        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
135        .build_v1()?;
136
137    info!(
138        http_port = args.http_port,
139        grpc_port = args.grpc_port,
140        cors_enabled = args.cors_enabled,
141        cors_allowed_origins = %args.cors_allowed_origins,
142        "Starting locus-gateway"
143    );
144
145    let http_server =
146        axum::serve(http_listener, http_router).with_graceful_shutdown(shutdown_signal());
147    let grpc_server = Server::builder()
148        .add_service(reflection_service)
149        .add_service(
150            proto::sttp_gateway_service_server::SttpGatewayServiceServer::new(grpc_service),
151        )
152        .serve_with_shutdown(grpc_addr, shutdown_signal());
153
154    let (http_result, grpc_result) = tokio::join!(http_server, grpc_server);
155    if let Err(err) = http_result {
156        error!(error = %err, "HTTP server exited with error");
157        return Err(err.into());
158    }
159    if let Err(err) = grpc_result {
160        error!(error = %err, "gRPC server exited with error");
161        return Err(err.into());
162    }
163
164    Ok(())
165}
166
167async fn health_handler() -> Json<Value> {
168    Json(json!({ "status": "ok", "transport": "http+grpc" }))
169}
170
171async fn calibrate_handler(
172    State(state): State<Arc<AppState>>,
173    headers: HeaderMap,
174    Json(request): Json<CalibrateSessionHttpRequest>,
175) -> ApiResult<CalibrationResultDto> {
176    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
177    let scoped_session_id = scope_session_id(&tenant, &request.session_id);
178
179    let trigger = request
180        .trigger
181        .as_deref()
182        .filter(|v| !v.trim().is_empty())
183        .unwrap_or("manual");
184
185    let result = state
186        .calibration
187        .calibrate_async(
188            &scoped_session_id,
189            request.stability,
190            request.friction,
191            request.logic,
192            request.autonomy,
193            trigger,
194        )
195        .await
196        .map_err(internal_error)?;
197
198    Ok(Json(CalibrationResultDto {
199        previous_avec: to_avec_dto(result.previous_avec),
200        delta: result.delta,
201        drift_classification: format!("{:?}", result.drift_classification),
202        trigger: result.trigger,
203        trigger_history: result.trigger_history,
204        is_first_calibration: result.is_first_calibration,
205    }))
206}
207
208async fn store_context_handler(
209    State(state): State<Arc<AppState>>,
210    headers: HeaderMap,
211    Json(request): Json<StoreContextHttpRequest>,
212) -> ApiResult<StoreResultDto> {
213    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
214    let scoped_session_id = scope_session_id(&tenant, &request.session_id);
215
216    let result = state
217        .store_context
218        .store_async(&request.node, &scoped_session_id)
219        .await;
220
221    Ok(Json(StoreResultDto {
222        node_id: result.node_id,
223        psi: result.psi,
224        valid: result.valid,
225        validation_error: result.validation_error,
226        duplicate_skipped: false,
227        upsert_status: if result.valid {
228            "created".to_string()
229        } else {
230            "skipped".to_string()
231        },
232    }))
233}
234
235async fn score_avec_handler(
236    State(state): State<Arc<AppState>>,
237    headers: HeaderMap,
238    Json(request): Json<ScoreAvecHttpRequest>,
239) -> ApiResult<ScoreAvecResultDto> {
240    let _tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
241
242    if request.text.trim().is_empty() {
243        return Err(bad_request("text cannot be empty"));
244    }
245
246    let scorer = state.avec_scorer.as_ref().ok_or_else(|| {
247        bad_request("AVEC scoring is disabled; enable LOCUS_GATEWAY_AVEC_SCORING_ENABLED")
248    })?;
249
250    let avec = scorer
251        .score_async(request.text.trim())
252        .await
253        .map_err(internal_error)?;
254
255    Ok(Json(ScoreAvecResultDto {
256        provider: scorer.provider_name().to_string(),
257        model: scorer.model_name().to_string(),
258        avec: to_avec_dto(avec),
259    }))
260}
261
262async fn rename_session_handler(
263    State(state): State<Arc<AppState>>,
264    headers: HeaderMap,
265    Json(request): Json<RenameSessionHttpRequest>,
266) -> ApiResult<RenameSessionResultDto> {
267    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
268    let source_session_id = request.source_session_id.trim();
269    let target_session_id = request.target_session_id.trim();
270
271    if source_session_id.is_empty() || target_session_id.is_empty() {
272        return Err(bad_request(
273            "sourceSessionId and targetSessionId are required",
274        ));
275    }
276
277    if source_session_id == target_session_id {
278        return Ok(Json(RenameSessionResultDto {
279            source_session_id: source_session_id.to_string(),
280            target_session_id: target_session_id.to_string(),
281            moved_nodes: 0,
282            moved_calibrations: 0,
283            scopes_applied: 0,
284        }));
285    }
286
287    let scoped_source_session_id = scope_session_id(&tenant, source_session_id);
288    let scoped_target_session_id = scope_session_id(&tenant, target_session_id);
289
290    let source_nodes = state
291        .node_store
292        .query_nodes_async(core_models::NodeQuery {
293            limit: 10_000,
294            session_id: Some(scoped_source_session_id.clone()),
295            from_utc: None,
296            to_utc: None,
297            tiers: None,
298        })
299        .await
300        .map_err(internal_error)?;
301
302    if source_nodes.is_empty() {
303        return Err(bad_request(format!(
304            "source session not found: {source_session_id}"
305        )));
306    }
307
308    let mut anchor_node_ids = Vec::with_capacity(source_nodes.len());
309    for node in source_nodes {
310        let upsert = state
311            .node_store
312            .upsert_node_async(node)
313            .await
314            .map_err(internal_error)?;
315        anchor_node_ids.push(upsert.node_id);
316    }
317    anchor_node_ids.sort();
318    anchor_node_ids.dedup();
319
320    let rekey_result = state
321        .rekey_scope
322        .rekey_async(
323            anchor_node_ids,
324            &tenant,
325            &scoped_target_session_id,
326            false,
327            request.allow_merge.unwrap_or(false),
328        )
329        .await
330        .map_err(internal_error)?;
331
332    if let Some(conflict) = rekey_result.scopes.iter().find(|scope| scope.conflict) {
333        return Err(bad_request(
334            conflict
335                .message
336                .clone()
337                .unwrap_or_else(|| "target session already exists".to_string()),
338        ));
339    }
340
341    let scopes_applied = rekey_result
342        .scopes
343        .iter()
344        .filter(|scope| scope.applied)
345        .count();
346
347    Ok(Json(RenameSessionResultDto {
348        source_session_id: source_session_id.to_string(),
349        target_session_id: target_session_id.to_string(),
350        moved_nodes: rekey_result.temporal_nodes_updated,
351        moved_calibrations: rekey_result.calibrations_updated,
352        scopes_applied,
353    }))
354}
355
356async fn get_context_handler(
357    State(state): State<Arc<AppState>>,
358    headers: HeaderMap,
359    Json(request): Json<GetContextHttpRequest>,
360) -> ApiResult<RetrieveResultDto> {
361    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
362    let scoped_session_id = scope_session_id(&tenant, &request.session_id);
363
364    let limit = request.limit.unwrap_or(5);
365    let tiers = normalize_request_tiers(request.tiers.as_deref());
366    let query_embedding = resolve_query_embedding(
367        state.embedding_provider.as_ref(),
368        request.query_text.as_deref(),
369        request.query_embedding.as_deref(),
370    )
371    .await;
372    let recall_service = MemoryRecallService::new(state.node_store.clone());
373    let recall_result = recall_service
374        .execute(&MemoryRecallRequest {
375            scope: MemoryScope {
376                tenant_id: None,
377                session_ids: Some(vec![scoped_session_id]),
378                tiers,
379                from_utc: request.from_utc,
380                to_utc: request.to_utc,
381            },
382            page: MemoryPage {
383                limit,
384                cursor: None,
385            },
386            scoring: MemoryScoring {
387                alpha: request
388                    .alpha
389                    .unwrap_or(DEFAULT_HYBRID_ALPHA)
390                    .clamp(0.0, 1.0),
391                beta: request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
392                fallback_policy: FallbackPolicy::Never,
393                ..Default::default()
394            },
395            current_avec: Some(core_models::AvecState {
396                stability: request.stability,
397                friction: request.friction,
398                logic: request.logic,
399                autonomy: request.autonomy,
400            }),
401            query_text: request.query_text,
402            query_embedding,
403            ..Default::default()
404        })
405        .await
406        .map_err(internal_error)?;
407
408    Ok(Json(RetrieveResultDto {
409        nodes: recall_result.nodes.iter().map(to_node_dto).collect(),
410        retrieved: recall_result.retrieved,
411        psi_range: PsiRangeDto {
412            min: recall_result.psi_range.min,
413            max: recall_result.psi_range.max,
414            average: recall_result.psi_range.average,
415        },
416    }))
417}
418
419async fn get_embedding_context_handler(
420    State(state): State<Arc<AppState>>,
421    headers: HeaderMap,
422    Json(request): Json<GetEmbeddingContextHttpRequest>,
423) -> ApiResult<RetrieveResultDto> {
424    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
425    let scoped_session_id = scope_session_id(&tenant, &request.session_id);
426    let limit = request.limit.unwrap_or(5);
427    let tiers = normalize_request_tiers(request.tiers.as_deref());
428
429    let rag_embedding = resolve_query_embedding(
430        state.embedding_provider.as_ref(),
431        request.rag_query_text.as_deref(),
432        request.rag_embedding.as_deref(),
433    )
434    .await;
435
436    let avec_embedding = resolve_query_embedding(
437        state.embedding_provider.as_ref(),
438        request.avec_query_text.as_deref(),
439        request.avec_embedding.as_deref(),
440    )
441    .await;
442
443    let fused_embedding = fuse_weighted_embeddings(
444        rag_embedding.as_deref(),
445        avec_embedding.as_deref(),
446        request.rag_weight.unwrap_or(0.7),
447        request.avec_weight.unwrap_or(0.3),
448    )?;
449
450    if fused_embedding.is_empty() {
451        return Err(bad_request(
452            "Provide ragEmbedding/ragQueryText and/or avecEmbedding/avecQueryText",
453        ));
454    }
455
456    let result = state
457        .context_query
458        .get_context_hybrid_scoped_filtered_async(
459            Some(&scoped_session_id),
460            request.stability,
461            request.friction,
462            request.logic,
463            request.autonomy,
464            request.from_utc,
465            request.to_utc,
466            tiers.as_deref(),
467            Some(fused_embedding.as_slice()),
468            request
469                .alpha
470                .unwrap_or(DEFAULT_HYBRID_ALPHA)
471                .clamp(0.0, 1.0),
472            request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
473            limit,
474        )
475        .await;
476
477    Ok(Json(RetrieveResultDto {
478        nodes: result.nodes.iter().map(to_node_dto).collect(),
479        retrieved: result.retrieved,
480        psi_range: PsiRangeDto {
481            min: result.psi_range.min,
482            max: result.psi_range.max,
483            average: result.psi_range.average,
484        },
485    }))
486}
487
488fn fuse_weighted_embeddings(
489    rag_embedding: Option<&[f32]>,
490    avec_embedding: Option<&[f32]>,
491    rag_weight: f32,
492    avec_weight: f32,
493) -> Result<Vec<f32>, (StatusCode, Json<ErrorResponse>)> {
494    let rag_weight = rag_weight.clamp(0.0, 1.0);
495    let avec_weight = avec_weight.clamp(0.0, 1.0);
496
497    match (rag_embedding, avec_embedding) {
498        (Some(rag), Some(avec)) => {
499            if rag.len() != avec.len() {
500                return Err(bad_request(
501                    "rag embedding and avec embedding must have the same dimensions",
502                ));
503            }
504
505            let sum = rag_weight + avec_weight;
506            let denom = if sum > f32::EPSILON { sum } else { 1.0 };
507            let fused = rag
508                .iter()
509                .zip(avec.iter())
510                .map(|(r, a)| ((r * rag_weight) + (a * avec_weight)) / denom)
511                .collect::<Vec<_>>();
512            Ok(fused)
513        }
514        (Some(rag), None) => Ok(rag.to_vec()),
515        (None, Some(avec)) => Ok(avec.to_vec()),
516        (None, None) => Ok(Vec::new()),
517    }
518}
519
520async fn list_nodes_handler(
521    State(state): State<Arc<AppState>>,
522    headers: HeaderMap,
523    Query(query): Query<ListNodesQuery>,
524) -> ApiResult<ListNodesResultDto> {
525    let tenant = resolve_http_tenant(query.tenant_id.as_deref(), &headers);
526    let requested_limit = query.limit.unwrap_or(50).clamp(1, TENANT_SCAN_LIMIT);
527    let scoped_session_filter = query
528        .session_id
529        .as_deref()
530        .map(|session_id| scope_session_id(&tenant, session_id));
531    let backend_limit = if scoped_session_filter.is_some() {
532        requested_limit
533    } else {
534        TENANT_SCAN_LIMIT
535    };
536
537    let find_service = MemoryFindService::new(state.node_store.clone());
538    let result = find_service
539        .execute(&MemoryFindRequest {
540            scope: MemoryScope {
541                tenant_id: None,
542                session_ids: scoped_session_filter.map(|session| vec![session]),
543                tiers: None,
544                from_utc: None,
545                to_utc: None,
546            },
547            page: MemoryPage {
548                limit: backend_limit,
549                cursor: None,
550            },
551            ..Default::default()
552        })
553        .await
554        .map_err(internal_error)?;
555
556    let nodes = result
557        .nodes
558        .into_iter()
559        .filter_map(|node| normalize_node_for_tenant(node, &tenant))
560        .take(requested_limit)
561        .collect::<Vec<_>>();
562
563    Ok(Json(ListNodesResultDto {
564        nodes: nodes.iter().map(to_node_dto).collect(),
565        retrieved: nodes.len(),
566    }))
567}
568
569async fn graph_handler(
570    State(state): State<Arc<AppState>>,
571    headers: HeaderMap,
572    Query(query): Query<GraphQuery>,
573) -> ApiResult<GraphResponse> {
574    let tenant = resolve_http_tenant(query.tenant_id.as_deref(), &headers);
575    let capped_limit = query.limit.unwrap_or(1000).clamp(1, 5000);
576    let scoped_session_filter = query
577        .session_id
578        .as_deref()
579        .map(|session_id| scope_session_id(&tenant, session_id));
580    let backend_limit = if scoped_session_filter.is_some() {
581        capped_limit
582    } else {
583        TENANT_SCAN_LIMIT
584    };
585
586    let find_service = MemoryFindService::new(state.node_store.clone());
587    let result = find_service
588        .execute(&MemoryFindRequest {
589            scope: MemoryScope {
590                tenant_id: None,
591                session_ids: scoped_session_filter.map(|session| vec![session]),
592                tiers: None,
593                from_utc: None,
594                to_utc: None,
595            },
596            page: MemoryPage {
597                limit: backend_limit,
598                cursor: None,
599            },
600            ..Default::default()
601        })
602        .await
603        .map_err(internal_error)?;
604
605    let mut ordered_nodes = result
606        .nodes
607        .into_iter()
608        .filter_map(|node| normalize_node_for_tenant(node, &tenant))
609        .take(capped_limit)
610        .collect::<Vec<_>>();
611    ordered_nodes.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
612
613    #[derive(Clone)]
614    struct SessionGroup {
615        id: String,
616        label: String,
617        nodes: Vec<core_models::SttpNode>,
618        node_count: usize,
619        avg_psi: f32,
620        last_modified: DateTime<Utc>,
621        size: usize,
622    }
623
624    let mut grouped_map: BTreeMap<String, Vec<core_models::SttpNode>> = BTreeMap::new();
625    for node in &ordered_nodes {
626        grouped_map
627            .entry(node.session_id.clone())
628            .or_default()
629            .push(node.clone());
630    }
631
632    let mut grouped = grouped_map
633        .into_iter()
634        .map(|(id, mut nodes)| {
635            nodes.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
636            let node_count = nodes.len();
637            let avg_psi = if node_count == 0 {
638                0.0
639            } else {
640                nodes.iter().map(|n| n.psi).sum::<f32>() / node_count as f32
641            };
642            let last_modified = nodes.first().map(|n| n.timestamp).unwrap_or_else(Utc::now);
643            let size = 16 + std::cmp::min(28, node_count * 2);
644
645            SessionGroup {
646                label: id.clone(),
647                id,
648                nodes,
649                node_count,
650                avg_psi,
651                last_modified,
652                size,
653            }
654        })
655        .collect::<Vec<_>>();
656
657    grouped.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
658
659    let node_by_id = ordered_nodes
660        .iter()
661        .map(|node| (graph_node_id(node), node.clone()))
662        .collect::<HashMap<_, _>>();
663
664    let sessions = grouped
665        .iter()
666        .map(|session| {
667            json!({
668                "id": format!("s:{}", session.id),
669                "label": session.label,
670                "nodeCount": session.node_count,
671                "avgPsi": session.avg_psi,
672                "lastModified": session.last_modified.to_rfc3339(),
673                "size": session.size
674            })
675        })
676        .collect::<Vec<_>>();
677
678    let nodes = ordered_nodes
679        .iter()
680        .map(|node| {
681            json!({
682                "id": graph_node_id(node),
683                "sessionId": node.session_id,
684                "label": format!("{} {}", node.tier, node.timestamp.format("%m-%d %H:%M")),
685                "tier": node.tier,
686                "timestamp": node.timestamp.to_rfc3339(),
687                "psi": node.psi,
688                "parentNodeId": node.parent_node_id,
689                "size": 9
690            })
691        })
692        .collect::<Vec<_>>();
693
694    let mut edges = Vec::new();
695
696    for i in 0..grouped.len().saturating_sub(1) {
697        edges.push(json!({
698            "id": format!("t-{i}"),
699            "source": format!("s:{}", grouped[i].id),
700            "target": format!("s:{}", grouped[i + 1].id),
701            "kind": "timeline"
702        }));
703    }
704
705    for i in 0..grouped.len() {
706        let from = &grouped[i];
707        let mut nearest: Option<usize> = None;
708        let mut nearest_distance = f32::MAX;
709
710        for (j, other) in grouped.iter().enumerate() {
711            if i == j {
712                continue;
713            }
714            let distance = (from.avg_psi - other.avg_psi).abs();
715            if distance < nearest_distance {
716                nearest_distance = distance;
717                nearest = Some(j);
718            }
719        }
720
721        if let Some(nearest_index) = nearest {
722            if i < nearest_index {
723                edges.push(json!({
724                    "id": format!("s-{i}-{nearest_index}"),
725                    "source": format!("s:{}", from.id),
726                    "target": format!("s:{}", grouped[nearest_index].id),
727                    "kind": "similarity"
728                }));
729            }
730        }
731    }
732
733    for session in &grouped {
734        for i in 0..session.nodes.len() {
735            let current = &session.nodes[i];
736            let current_id = graph_node_id(current);
737
738            edges.push(json!({
739                "id": format!("m-{}-{i}", session.id),
740                "source": format!("s:{}", session.id),
741                "target": current_id,
742                "kind": "membership"
743            }));
744
745            if i + 1 < session.nodes.len() {
746                let older = &session.nodes[i + 1];
747                edges.push(json!({
748                    "id": format!("nt-{}-{i}", session.id),
749                    "source": current_id,
750                    "target": graph_node_id(older),
751                    "kind": "node_timeline"
752                }));
753            }
754
755            if let Some(parent) = current.parent_node_id.as_ref() {
756                if node_by_id.contains_key(parent) {
757                    edges.push(json!({
758                        "id": format!("l-{}-{i}", session.id),
759                        "source": current_id,
760                        "target": parent,
761                        "kind": "lineage"
762                    }));
763                }
764            }
765        }
766    }
767
768    Ok(Json(GraphResponse {
769        sessions,
770        nodes,
771        edges,
772        retrieved: ordered_nodes.len(),
773    }))
774}
775
776async fn get_moods_handler(
777    State(state): State<Arc<AppState>>,
778    Query(query): Query<GetMoodsQuery>,
779) -> ApiResult<MoodCatalogResultDto> {
780    let result = state.mood_catalog.get(
781        query.target_mood.as_deref(),
782        query.blend.unwrap_or(1.0),
783        query.current_stability,
784        query.current_friction,
785        query.current_logic,
786        query.current_autonomy,
787    );
788
789    Ok(Json(to_mood_catalog_dto(result)))
790}
791
792async fn create_monthly_rollup_handler(
793    State(state): State<Arc<AppState>>,
794    headers: HeaderMap,
795    Json(request): Json<CreateMonthlyRollupHttpRequest>,
796) -> ApiResult<MonthlyRollupResultDto> {
797    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
798
799    let rollup_request = MonthlyRollupRequest {
800        session_id: scope_session_id(&tenant, &request.session_id),
801        start_utc: request.start_date_utc,
802        end_utc: request.end_date_utc,
803        source_session_id: request
804            .source_session_id
805            .map(|session_id| scope_session_id(&tenant, &session_id)),
806        parent_node_id: request.parent_node_id,
807        persist: request.persist.unwrap_or(true),
808        limit: request.limit.unwrap_or(5000),
809    };
810
811    let result = state.monthly_rollup.create_async(rollup_request).await;
812    Ok(Json(to_monthly_rollup_dto(result)))
813}
814
815async fn batch_rekey_handler(
816    State(state): State<Arc<AppState>>,
817    headers: HeaderMap,
818    Json(request): Json<BatchRekeyHttpRequest>,
819) -> ApiResult<BatchRekeyResultDto> {
820    if request.node_ids.is_empty() {
821        return Err(bad_request("nodeIds must contain at least one value"));
822    }
823
824    if request.target_session_id.trim().is_empty() {
825        return Err(bad_request("targetSessionId cannot be empty"));
826    }
827
828    let target_tenant = resolve_http_tenant(request.target_tenant_id.as_deref(), &headers);
829    let scoped_target_session = scope_session_id(&target_tenant, request.target_session_id.trim());
830
831    let result = state
832        .rekey_scope
833        .rekey_async(
834            request.node_ids,
835            &target_tenant,
836            &scoped_target_session,
837            request.dry_run.unwrap_or(true),
838            request.allow_merge.unwrap_or(false),
839        )
840        .await
841        .map_err(internal_error)?;
842
843    Ok(Json(to_batch_rekey_dto(result)))
844}
845
846async fn preview_embedding_migration_handler(
847    State(state): State<Arc<AppState>>,
848    headers: HeaderMap,
849    Json(request): Json<EmbeddingMigrationPreviewHttpRequest>,
850) -> ApiResult<EmbeddingMigrationPreviewResultDto> {
851    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
852    let sample_limit = request.sample_limit.unwrap_or(20).clamp(1, 200);
853    let max_nodes = request.max_nodes.unwrap_or(5_000).clamp(1, 50_000);
854    let (scope, filter, sync_keys) = scoped_memory_filter(request.filter, &tenant);
855
856    let find_service = MemoryFindService::new(state.node_store.clone());
857    let find_result = find_service
858        .execute(&MemoryFindRequest {
859            scope,
860            filter,
861            page: MemoryPage {
862                limit: max_nodes,
863                cursor: None,
864            },
865            ..Default::default()
866        })
867        .await
868        .map_err(internal_error)?;
869
870    let mut nodes = find_result.nodes;
871    if let Some(sync_keys) = sync_keys {
872        nodes.retain(|node| sync_keys.iter().any(|key| key == &node.sync_key));
873    }
874
875    let total_candidates = nodes.len();
876    let sample = nodes
877        .into_iter()
878        .take(sample_limit)
879        .map(|sample| EmbeddingMigrationSampleDto {
880            sync_key: sample.sync_key,
881            session_id: display_session_id(&sample.session_id),
882            tier: sample.tier,
883            has_embedding: sample
884                .embedding
885                .as_ref()
886                .is_some_and(|values| !values.is_empty()),
887            embedding_model: sample.embedding_model,
888            embedding_dimensions: sample.embedding_dimensions,
889            embedded_at: sample.embedded_at,
890            updated_at: sample.updated_at,
891            context_summary: sample.context_summary,
892        })
893        .collect::<Vec<_>>();
894
895    Ok(Json(EmbeddingMigrationPreviewResultDto {
896        total_candidates,
897        sample,
898        provider_available: state.embedding_provider.is_some(),
899        provider_model: state
900            .embedding_provider
901            .as_ref()
902            .map(|provider| provider.model_name().to_string()),
903    }))
904}
905
906async fn run_embedding_migration_handler(
907    State(state): State<Arc<AppState>>,
908    headers: HeaderMap,
909    Json(request): Json<EmbeddingMigrationRunHttpRequest>,
910) -> ApiResult<EmbeddingMigrationRunResultDto> {
911    let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
912    let mode = match request
913        .mode
914        .unwrap_or(EmbeddingMigrationModeHttp::MissingOnly)
915    {
916        EmbeddingMigrationModeHttp::MissingOnly => MemoryTransformOperation::EmbedBackfill,
917        EmbeddingMigrationModeHttp::ReindexAll => MemoryTransformOperation::ReindexEmbeddings,
918    };
919    let dry_run = request.dry_run.unwrap_or(true);
920    let batch_size = request.batch_size.unwrap_or(100).clamp(1, 500);
921    let max_nodes = request.max_nodes.unwrap_or(5_000).clamp(1, 50_000);
922    let (scope, filter, _sync_keys) = scoped_memory_filter(request.filter, &tenant);
923
924    let providers = build_gateway_provider_registry(state.embedding_provider.clone());
925    let transform_service = MemoryTransformService::new(state.node_store.clone(), providers);
926
927    let result = transform_service
928        .execute(&MemoryTransformRequest {
929            scope,
930            filter,
931            operation: mode,
932            dry_run,
933            batch_size,
934            max_nodes,
935            provider_id: state
936                .embedding_provider
937                .as_ref()
938                .map(|_| "gateway-embedding".to_string()),
939            model: state
940                .embedding_provider
941                .as_ref()
942                .map(|provider| provider.model_name().to_string()),
943        })
944        .await
945        .map_err(internal_error)?;
946
947    Ok(Json(to_embedding_migration_run_dto(
948        result,
949        mode,
950        dry_run,
951        state
952            .embedding_provider
953            .as_ref()
954            .map(|provider| provider.model_name().to_string()),
955    )))
956}
957
958fn bad_request(message: impl Into<String>) -> (StatusCode, Json<ErrorResponse>) {
959    (
960        StatusCode::BAD_REQUEST,
961        Json(ErrorResponse {
962            error: message.into(),
963        }),
964    )
965}
966
967fn internal_error(error: impl std::fmt::Display) -> (StatusCode, Json<ErrorResponse>) {
968    (
969        StatusCode::INTERNAL_SERVER_ERROR,
970        Json(ErrorResponse {
971            error: error.to_string(),
972        }),
973    )
974}
975
976fn graph_node_id(node: &core_models::SttpNode) -> String {
977    format!(
978        "n:{}|{}|{}|{:.4}",
979        node.session_id,
980        node.timestamp.to_rfc3339(),
981        node.compression_depth,
982        node.psi
983    )
984}
985
986fn to_avec_dto(value: core_models::AvecState) -> AvecStateDto {
987    AvecStateDto {
988        stability: value.stability,
989        friction: value.friction,
990        logic: value.logic,
991        autonomy: value.autonomy,
992        psi: value.psi(),
993    }
994}
995
996fn to_node_dto(value: &core_models::SttpNode) -> SttpNodeDto {
997    SttpNodeDto {
998        raw: value.raw.clone(),
999        session_id: display_session_id(&value.session_id),
1000        tier: value.tier.clone(),
1001        timestamp: value.timestamp,
1002        compression_depth: value.compression_depth,
1003        parent_node_id: value.parent_node_id.clone(),
1004        user_avec: to_avec_dto(value.user_avec),
1005        model_avec: to_avec_dto(value.model_avec),
1006        compression_avec: value.compression_avec.map(to_avec_dto),
1007        rho: value.rho,
1008        kappa: value.kappa,
1009        psi: value.psi,
1010        sync_key: value.sync_key.clone(),
1011        synthetic_id: graph_node_id(value),
1012    }
1013}
1014
1015fn to_mood_catalog_dto(result: core_models::MoodCatalogResult) -> MoodCatalogResultDto {
1016    MoodCatalogResultDto {
1017        presets: result
1018            .presets
1019            .into_iter()
1020            .map(|preset| MoodPresetDto {
1021                name: preset.name,
1022                description: preset.description,
1023                avec: to_avec_dto(preset.avec),
1024            })
1025            .collect(),
1026        apply_guide: result.apply_guide,
1027        swap_preview: result.swap_preview.map(|preview| MoodSwapPreviewDto {
1028            target_mood: preview.target_mood,
1029            blend: preview.blend,
1030            current: to_avec_dto(preview.current),
1031            target: to_avec_dto(preview.target),
1032            blended: to_avec_dto(preview.blended),
1033        }),
1034    }
1035}
1036
1037fn to_monthly_rollup_dto(result: core_models::MonthlyRollupResult) -> MonthlyRollupResultDto {
1038    MonthlyRollupResultDto {
1039        success: result.success,
1040        node_id: result.node_id,
1041        raw_node: result.raw_node,
1042        error: result.error,
1043        source_nodes: result.source_nodes,
1044        parent_reference: result.parent_reference,
1045        user_average: to_avec_dto(result.user_average),
1046        model_average: to_avec_dto(result.model_average),
1047        compression_average: to_avec_dto(result.compression_average),
1048        rho_range: to_numeric_range_dto(result.rho_range),
1049        kappa_range: to_numeric_range_dto(result.kappa_range),
1050        psi_range: to_numeric_range_dto(result.psi_range),
1051        rho_bands: to_confidence_bands_dto(result.rho_bands),
1052        kappa_bands: to_confidence_bands_dto(result.kappa_bands),
1053    }
1054}
1055
1056fn to_batch_rekey_dto(result: core_models::BatchRekeyResult) -> BatchRekeyResultDto {
1057    let updated_scopes = result.scopes.iter().filter(|scope| scope.applied).count();
1058    let conflict_scopes = result.scopes.iter().filter(|scope| scope.conflict).count();
1059
1060    BatchRekeyResultDto {
1061        dry_run: result.dry_run,
1062        requested_node_ids: result.requested_node_ids,
1063        resolved_node_ids: result.resolved_node_ids,
1064        missing_node_ids: result.missing_node_ids,
1065        scopes: result
1066            .scopes
1067            .into_iter()
1068            .map(|scope| ScopeRekeyResultDto {
1069                source_tenant_id: scope.source_tenant_id,
1070                source_session_id: display_session_id(&scope.source_session_id),
1071                target_tenant_id: scope.target_tenant_id,
1072                target_session_id: display_session_id(&scope.target_session_id),
1073                temporal_nodes: scope.temporal_nodes,
1074                calibrations: scope.calibrations,
1075                target_temporal_nodes: scope.target_temporal_nodes,
1076                target_calibrations: scope.target_calibrations,
1077                applied: scope.applied,
1078                conflict: scope.conflict,
1079                message: scope.message,
1080            })
1081            .collect(),
1082        temporal_nodes_updated: result.temporal_nodes_updated,
1083        calibrations_updated: result.calibrations_updated,
1084        updated_scopes,
1085        conflict_scopes,
1086    }
1087}
1088
1089fn scoped_memory_filter(
1090    request_filter: Option<EmbeddingMigrationFilterHttp>,
1091    tenant: &str,
1092) -> (MemoryScope, MemoryFilter, Option<Vec<String>>) {
1093    let filter = request_filter.unwrap_or(EmbeddingMigrationFilterHttp {
1094        session_id: None,
1095        from_utc: None,
1096        to_utc: None,
1097        tiers: None,
1098        has_embedding: None,
1099        embedding_model: None,
1100        sync_keys: None,
1101    });
1102
1103    (
1104        MemoryScope {
1105            tenant_id: None,
1106            session_ids: filter
1107                .session_id
1108                .map(|session_id| vec![scope_session_id(tenant, &session_id)]),
1109            tiers: filter.tiers,
1110            from_utc: filter.from_utc,
1111            to_utc: filter.to_utc,
1112        },
1113        MemoryFilter {
1114            has_embedding: filter.has_embedding,
1115            embedding_model: filter.embedding_model,
1116            ..Default::default()
1117        },
1118        filter.sync_keys,
1119    )
1120}
1121
1122fn build_gateway_provider_registry(
1123    provider: Option<Arc<dyn EmbeddingProvider>>,
1124) -> Arc<InMemoryAiProviderRegistry> {
1125    let mut registry = InMemoryAiProviderRegistry::new();
1126    if let Some(provider) = provider {
1127        registry.register(SttpEmbeddingProviderAdapter::new("gateway-embedding", provider));
1128    }
1129    Arc::new(registry)
1130}
1131
1132fn to_embedding_migration_run_dto(
1133    result: locus_sdk::domain::memory::MemoryTransformResult,
1134    mode: MemoryTransformOperation,
1135    dry_run: bool,
1136    provider_model: Option<String>,
1137) -> EmbeddingMigrationRunResultDto {
1138    EmbeddingMigrationRunResultDto {
1139        scanned: result.scanned,
1140        selected: result.selected,
1141        updated: result.updated,
1142        skipped: result.skipped,
1143        failed: result.failed,
1144        duplicate: result.duplicate,
1145        started_at: result.started_at,
1146        completed_at: result.completed_at,
1147        provider_model,
1148        dry_run,
1149        mode: match mode {
1150            MemoryTransformOperation::EmbedBackfill => "missing_only".to_string(),
1151            MemoryTransformOperation::ReindexEmbeddings => "reindex_all".to_string(),
1152        },
1153        failure_reasons: result.failures,
1154    }
1155}
1156
1157fn to_numeric_range_dto(value: NumericRange) -> NumericRangeDto {
1158    NumericRangeDto {
1159        min: value.min,
1160        max: value.max,
1161        average: value.average,
1162    }
1163}
1164
1165fn to_confidence_bands_dto(value: ConfidenceBandSummary) -> ConfidenceBandSummaryDto {
1166    ConfidenceBandSummaryDto {
1167        low: value.low,
1168        medium: value.medium,
1169        high: value.high,
1170    }
1171}
1172
1173#[derive(Clone)]
1174struct GrpcGatewayService {
1175    state: Arc<AppState>,
1176}
1177
1178impl GrpcGatewayService {
1179    fn new(state: Arc<AppState>) -> Self {
1180        Self { state }
1181    }
1182}
1183
1184#[tonic::async_trait]
1185impl proto::sttp_gateway_service_server::SttpGatewayService for GrpcGatewayService {
1186    async fn calibrate_session(
1187        &self,
1188        request: Request<proto::CalibrateSessionRequest>,
1189    ) -> Result<Response<proto::CalibrateSessionReply>, Status> {
1190        let tenant = resolve_grpc_tenant(request.metadata());
1191        let request = request.into_inner();
1192        let trigger = if request.trigger.trim().is_empty() {
1193            "manual"
1194        } else {
1195            &request.trigger
1196        };
1197        let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1198
1199        let result = self
1200            .state
1201            .calibration
1202            .calibrate_async(
1203                &scoped_session_id,
1204                request.stability,
1205                request.friction,
1206                request.logic,
1207                request.autonomy,
1208                trigger,
1209            )
1210            .await
1211            .map_err(|err| Status::internal(err.to_string()))?;
1212
1213        let reply = proto::CalibrateSessionReply {
1214            previous_avec: Some(to_grpc_avec(result.previous_avec)),
1215            delta: result.delta,
1216            drift_classification: format!("{:?}", result.drift_classification),
1217            trigger: result.trigger,
1218            trigger_history: result.trigger_history,
1219            is_first_calibration: result.is_first_calibration,
1220        };
1221
1222        Ok(Response::new(reply))
1223    }
1224
1225    async fn store_context(
1226        &self,
1227        request: Request<proto::StoreContextRequest>,
1228    ) -> Result<Response<proto::StoreContextReply>, Status> {
1229        let tenant = resolve_grpc_tenant(request.metadata());
1230        let request = request.into_inner();
1231        let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1232
1233        let result = self
1234            .state
1235            .store_context
1236            .store_async(&request.node, &scoped_session_id)
1237            .await;
1238
1239        let reply = proto::StoreContextReply {
1240            node_id: result.node_id,
1241            psi: result.psi,
1242            valid: result.valid,
1243            validation_error: result.validation_error,
1244        };
1245
1246        Ok(Response::new(reply))
1247    }
1248
1249    async fn get_context(
1250        &self,
1251        request: Request<proto::GetContextRequest>,
1252    ) -> Result<Response<proto::GetContextReply>, Status> {
1253        let tenant = resolve_grpc_tenant(request.metadata());
1254        let request = request.into_inner();
1255        let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1256
1257        let query_embedding = resolve_query_embedding(
1258            self.state.embedding_provider.as_ref(),
1259            request.query_text.as_deref(),
1260            if request.query_embedding.is_empty() {
1261                None
1262            } else {
1263                Some(request.query_embedding.as_slice())
1264            },
1265        )
1266        .await;
1267
1268        let limit = if request.limit <= 0 {
1269            5
1270        } else {
1271            request.limit as usize
1272        };
1273        let tiers = normalize_request_tiers(Some(&request.tiers));
1274        let recall_service = MemoryRecallService::new(self.state.node_store.clone());
1275        let recall_result = recall_service
1276            .execute(&MemoryRecallRequest {
1277                scope: MemoryScope {
1278                    tenant_id: None,
1279                    session_ids: Some(vec![scoped_session_id]),
1280                    tiers,
1281                    from_utc: timestamp_from_proto_optional(request.from_utc)?,
1282                    to_utc: timestamp_from_proto_optional(request.to_utc)?,
1283                },
1284                page: MemoryPage {
1285                    limit,
1286                    cursor: None,
1287                },
1288                scoring: MemoryScoring {
1289                    alpha: request
1290                        .alpha
1291                        .unwrap_or(DEFAULT_HYBRID_ALPHA)
1292                        .clamp(0.0, 1.0),
1293                    beta: request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
1294                    fallback_policy: FallbackPolicy::Never,
1295                    ..Default::default()
1296                },
1297                current_avec: Some(core_models::AvecState {
1298                    stability: request.stability,
1299                    friction: request.friction,
1300                    logic: request.logic,
1301                    autonomy: request.autonomy,
1302                }),
1303                query_text: request.query_text,
1304                query_embedding,
1305                ..Default::default()
1306            })
1307            .await
1308            .map_err(|err| Status::internal(err.to_string()))?;
1309
1310        let nodes = recall_result
1311            .nodes
1312            .iter()
1313            .cloned()
1314            .filter_map(|node| normalize_node_for_tenant(node, &tenant))
1315            .collect::<Vec<_>>();
1316
1317        let reply = proto::GetContextReply {
1318            nodes: nodes.iter().map(to_grpc_node).collect(),
1319            retrieved: clamp_usize_to_i32(nodes.len()),
1320            psi_range: Some(to_grpc_psi_range(recall_result.psi_range)),
1321        };
1322
1323        Ok(Response::new(reply))
1324    }
1325
1326    async fn get_embedding_context(
1327        &self,
1328        request: Request<proto::GetEmbeddingContextRequest>,
1329    ) -> Result<Response<proto::GetContextReply>, Status> {
1330        let tenant = resolve_grpc_tenant(request.metadata());
1331        let request = request.into_inner();
1332        let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1333
1334        let rag_embedding = resolve_query_embedding(
1335            self.state.embedding_provider.as_ref(),
1336            request.rag_query_text.as_deref(),
1337            if request.rag_embedding.is_empty() {
1338                None
1339            } else {
1340                Some(request.rag_embedding.as_slice())
1341            },
1342        )
1343        .await;
1344
1345        let avec_embedding = resolve_query_embedding(
1346            self.state.embedding_provider.as_ref(),
1347            request.avec_query_text.as_deref(),
1348            if request.avec_embedding.is_empty() {
1349                None
1350            } else {
1351                Some(request.avec_embedding.as_slice())
1352            },
1353        )
1354        .await;
1355
1356        let fused_embedding = fuse_weighted_embeddings(
1357            rag_embedding.as_deref(),
1358            avec_embedding.as_deref(),
1359            request.rag_weight.unwrap_or(0.7),
1360            request.avec_weight.unwrap_or(0.3),
1361        )
1362        .map_err(|(_, payload)| Status::invalid_argument(payload.0.error))?;
1363
1364        if fused_embedding.is_empty() {
1365            return Err(Status::invalid_argument(
1366                "Provide rag_embedding/rag_query_text and/or avec_embedding/avec_query_text",
1367            ));
1368        }
1369
1370        let limit = if request.limit <= 0 {
1371            5
1372        } else {
1373            request.limit as usize
1374        };
1375        let tiers = normalize_request_tiers(Some(&request.tiers));
1376
1377        let result = self
1378            .state
1379            .context_query
1380            .get_context_hybrid_scoped_filtered_async(
1381                Some(&scoped_session_id),
1382                request.stability,
1383                request.friction,
1384                request.logic,
1385                request.autonomy,
1386                timestamp_from_proto_optional(request.from_utc)?,
1387                timestamp_from_proto_optional(request.to_utc)?,
1388                tiers.as_deref(),
1389                Some(fused_embedding.as_slice()),
1390                request
1391                    .alpha
1392                    .unwrap_or(DEFAULT_HYBRID_ALPHA)
1393                    .clamp(0.0, 1.0),
1394                request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
1395                limit,
1396            )
1397            .await;
1398
1399        let nodes = result
1400            .nodes
1401            .iter()
1402            .cloned()
1403            .filter_map(|node| normalize_node_for_tenant(node, &tenant))
1404            .collect::<Vec<_>>();
1405
1406        let reply = proto::GetContextReply {
1407            nodes: nodes.iter().map(to_grpc_node).collect(),
1408            retrieved: clamp_usize_to_i32(nodes.len()),
1409            psi_range: Some(to_grpc_psi_range(result.psi_range)),
1410        };
1411
1412        Ok(Response::new(reply))
1413    }
1414
1415    async fn list_nodes(
1416        &self,
1417        request: Request<proto::ListNodesRequest>,
1418    ) -> Result<Response<proto::ListNodesReply>, Status> {
1419        let tenant = resolve_grpc_tenant(request.metadata());
1420        let request = request.into_inner();
1421        let requested_limit = if request.limit <= 0 {
1422            50
1423        } else {
1424            request.limit as usize
1425        }
1426        .clamp(1, TENANT_SCAN_LIMIT);
1427        let scoped_session_filter = request
1428            .session_id
1429            .as_deref()
1430            .map(|session_id| scope_session_id(&tenant, session_id));
1431        let backend_limit = if scoped_session_filter.is_some() {
1432            requested_limit
1433        } else {
1434            TENANT_SCAN_LIMIT
1435        };
1436
1437        let find_service = MemoryFindService::new(self.state.node_store.clone());
1438        let result = find_service
1439            .execute(&MemoryFindRequest {
1440                scope: MemoryScope {
1441                    tenant_id: None,
1442                    session_ids: scoped_session_filter.map(|session| vec![session]),
1443                    tiers: None,
1444                    from_utc: None,
1445                    to_utc: None,
1446                },
1447                page: MemoryPage {
1448                    limit: backend_limit,
1449                    cursor: None,
1450                },
1451                ..Default::default()
1452            })
1453            .await
1454            .map_err(|err| Status::internal(err.to_string()))?;
1455
1456        let nodes = result
1457            .nodes
1458            .into_iter()
1459            .filter_map(|node| normalize_node_for_tenant(node, &tenant))
1460            .take(requested_limit)
1461            .collect::<Vec<_>>();
1462
1463        let reply = proto::ListNodesReply {
1464            nodes: nodes.iter().map(to_grpc_node).collect(),
1465            retrieved: clamp_usize_to_i32(nodes.len()),
1466        };
1467
1468        Ok(Response::new(reply))
1469    }
1470
1471    async fn get_moods(
1472        &self,
1473        request: Request<proto::GetMoodsRequest>,
1474    ) -> Result<Response<proto::GetMoodsReply>, Status> {
1475        let request = request.into_inner();
1476        let result = self.state.mood_catalog.get(
1477            request.target_mood.as_deref(),
1478            request.blend,
1479            request.current_stability,
1480            request.current_friction,
1481            request.current_logic,
1482            request.current_autonomy,
1483        );
1484
1485        let reply = proto::GetMoodsReply {
1486            presets: result
1487                .presets
1488                .into_iter()
1489                .map(|preset| proto::MoodPreset {
1490                    name: preset.name,
1491                    description: preset.description,
1492                    avec: Some(to_grpc_avec(preset.avec)),
1493                })
1494                .collect(),
1495            apply_guide: result.apply_guide,
1496            swap_preview: result.swap_preview.map(|preview| proto::MoodSwapPreview {
1497                target_mood: preview.target_mood,
1498                blend: preview.blend,
1499                current: Some(to_grpc_avec(preview.current)),
1500                target: Some(to_grpc_avec(preview.target)),
1501                blended: Some(to_grpc_avec(preview.blended)),
1502            }),
1503        };
1504
1505        Ok(Response::new(reply))
1506    }
1507
1508    async fn batch_rekey(
1509        &self,
1510        request: Request<proto::BatchRekeyRequest>,
1511    ) -> Result<Response<proto::BatchRekeyReply>, Status> {
1512        let metadata_tenant = resolve_grpc_tenant(request.metadata());
1513        let request = request.into_inner();
1514
1515        if request.node_ids.is_empty() {
1516            return Err(Status::invalid_argument(
1517                "node_ids must contain at least one value",
1518            ));
1519        }
1520
1521        if request.target_session_id.trim().is_empty() {
1522            return Err(Status::invalid_argument(
1523                "target_session_id cannot be empty",
1524            ));
1525        }
1526
1527        let target_tenant = request
1528            .target_tenant_id
1529            .as_deref()
1530            .and_then(normalize_tenant_value)
1531            .unwrap_or(metadata_tenant);
1532        let scoped_target_session =
1533            scope_session_id(&target_tenant, request.target_session_id.trim());
1534
1535        let result = self
1536            .state
1537            .rekey_scope
1538            .rekey_async(
1539                request.node_ids,
1540                &target_tenant,
1541                &scoped_target_session,
1542                request.dry_run.unwrap_or(true),
1543                request.allow_merge.unwrap_or(false),
1544            )
1545            .await
1546            .map_err(|err| Status::internal(err.to_string()))?;
1547
1548        Ok(Response::new(to_grpc_batch_rekey_reply(result)))
1549    }
1550
1551    async fn create_monthly_rollup(
1552        &self,
1553        request: Request<proto::CreateMonthlyRollupRequest>,
1554    ) -> Result<Response<proto::CreateMonthlyRollupReply>, Status> {
1555        let tenant = resolve_grpc_tenant(request.metadata());
1556        let request = request.into_inner();
1557
1558        let start_utc = timestamp_from_proto(request.start_utc)?;
1559        let end_utc = timestamp_from_proto(request.end_utc)?;
1560
1561        let monthly_request = MonthlyRollupRequest {
1562            session_id: scope_session_id(&tenant, &request.session_id),
1563            start_utc,
1564            end_utc,
1565            source_session_id: request
1566                .source_session_id
1567                .map(|session_id| scope_session_id(&tenant, &session_id)),
1568            parent_node_id: request.parent_node_id,
1569            persist: request.persist,
1570            limit: if request.limit <= 0 {
1571                5000
1572            } else {
1573                request.limit as usize
1574            },
1575        };
1576
1577        let result = self
1578            .state
1579            .monthly_rollup
1580            .create_async(monthly_request)
1581            .await;
1582
1583        let reply = proto::CreateMonthlyRollupReply {
1584            success: result.success,
1585            node_id: result.node_id,
1586            raw_node: result.raw_node,
1587            error: result.error,
1588            source_nodes: clamp_usize_to_i32(result.source_nodes),
1589            parent_reference: result.parent_reference,
1590            user_average: Some(to_grpc_avec(result.user_average)),
1591            model_average: Some(to_grpc_avec(result.model_average)),
1592            compression_average: Some(to_grpc_avec(result.compression_average)),
1593            rho_range: Some(to_grpc_numeric_range(result.rho_range)),
1594            kappa_range: Some(to_grpc_numeric_range(result.kappa_range)),
1595            psi_range: Some(to_grpc_numeric_range(result.psi_range)),
1596            rho_bands: Some(to_grpc_confidence_bands(result.rho_bands)),
1597            kappa_bands: Some(to_grpc_confidence_bands(result.kappa_bands)),
1598        };
1599
1600        Ok(Response::new(reply))
1601    }
1602}
1603
1604fn to_grpc_avec(value: core_models::AvecState) -> proto::AvecState {
1605    proto::AvecState {
1606        stability: value.stability,
1607        friction: value.friction,
1608        logic: value.logic,
1609        autonomy: value.autonomy,
1610        psi: value.psi(),
1611    }
1612}
1613
1614fn to_grpc_node(value: &core_models::SttpNode) -> proto::SttpNode {
1615    proto::SttpNode {
1616        raw: value.raw.clone(),
1617        session_id: display_session_id(&value.session_id),
1618        tier: value.tier.clone(),
1619        timestamp: Some(timestamp_to_proto(value.timestamp)),
1620        compression_depth: value.compression_depth,
1621        parent_node_id: value.parent_node_id.clone(),
1622        user_avec: Some(to_grpc_avec(value.user_avec)),
1623        model_avec: Some(to_grpc_avec(value.model_avec)),
1624        compression_avec: value.compression_avec.map(to_grpc_avec),
1625        rho: value.rho,
1626        kappa: value.kappa,
1627        psi: value.psi,
1628    }
1629}
1630
1631fn to_grpc_psi_range(value: PsiRange) -> proto::PsiRange {
1632    proto::PsiRange {
1633        min: value.min,
1634        max: value.max,
1635        average: value.average,
1636    }
1637}
1638
1639fn to_grpc_numeric_range(value: NumericRange) -> proto::NumericRange {
1640    proto::NumericRange {
1641        min: value.min,
1642        max: value.max,
1643        average: value.average,
1644    }
1645}
1646
1647fn to_grpc_confidence_bands(value: ConfidenceBandSummary) -> proto::ConfidenceBandSummary {
1648    proto::ConfidenceBandSummary {
1649        low: clamp_usize_to_i32(value.low),
1650        medium: clamp_usize_to_i32(value.medium),
1651        high: clamp_usize_to_i32(value.high),
1652    }
1653}
1654
1655fn to_grpc_scope_rekey_result(value: core_models::ScopeRekeyResult) -> proto::ScopeRekeyResult {
1656    proto::ScopeRekeyResult {
1657        source_tenant_id: value.source_tenant_id,
1658        source_session_id: display_session_id(&value.source_session_id),
1659        target_tenant_id: value.target_tenant_id,
1660        target_session_id: display_session_id(&value.target_session_id),
1661        temporal_nodes: clamp_usize_to_i32(value.temporal_nodes),
1662        calibrations: clamp_usize_to_i32(value.calibrations),
1663        target_temporal_nodes: clamp_usize_to_i32(value.target_temporal_nodes),
1664        target_calibrations: clamp_usize_to_i32(value.target_calibrations),
1665        applied: value.applied,
1666        conflict: value.conflict,
1667        message: value.message,
1668    }
1669}
1670
1671fn to_grpc_batch_rekey_reply(value: core_models::BatchRekeyResult) -> proto::BatchRekeyReply {
1672    let updated_scopes = value.scopes.iter().filter(|scope| scope.applied).count();
1673    let conflict_scopes = value.scopes.iter().filter(|scope| scope.conflict).count();
1674
1675    proto::BatchRekeyReply {
1676        dry_run: value.dry_run,
1677        requested_node_ids: clamp_usize_to_i32(value.requested_node_ids),
1678        resolved_node_ids: clamp_usize_to_i32(value.resolved_node_ids),
1679        missing_node_ids: value.missing_node_ids,
1680        scopes: value
1681            .scopes
1682            .into_iter()
1683            .map(to_grpc_scope_rekey_result)
1684            .collect(),
1685        temporal_nodes_updated: clamp_usize_to_i32(value.temporal_nodes_updated),
1686        calibrations_updated: clamp_usize_to_i32(value.calibrations_updated),
1687        updated_scopes: clamp_usize_to_i32(updated_scopes),
1688        conflict_scopes: clamp_usize_to_i32(conflict_scopes),
1689    }
1690}
1691
1692fn timestamp_to_proto(value: DateTime<Utc>) -> prost_types::Timestamp {
1693    prost_types::Timestamp {
1694        seconds: value.timestamp(),
1695        nanos: value.timestamp_subsec_nanos() as i32,
1696    }
1697}
1698
1699fn timestamp_from_proto(value: Option<prost_types::Timestamp>) -> Result<DateTime<Utc>, Status> {
1700    let value = value.ok_or_else(|| Status::invalid_argument("missing timestamp"))?;
1701    DateTime::<Utc>::from_timestamp(value.seconds, value.nanos as u32)
1702        .ok_or_else(|| Status::invalid_argument("invalid timestamp"))
1703}
1704
1705fn timestamp_from_proto_optional(
1706    value: Option<prost_types::Timestamp>,
1707) -> Result<Option<DateTime<Utc>>, Status> {
1708    value
1709        .map(|timestamp| timestamp_from_proto(Some(timestamp)))
1710        .transpose()
1711}
1712
/// Normalizes a caller-supplied tier list.
///
/// Each entry is trimmed and ASCII-lowercased; entries that are empty after
/// trimming are dropped. Returns `None` when no entry survives (including when
/// `tiers` itself is `None`), otherwise the surviving entries in input order.
fn normalize_request_tiers(tiers: Option<&[String]>) -> Option<Vec<String>> {
    let cleaned: Vec<String> = tiers
        .into_iter()
        .flatten()
        .filter_map(|tier| {
            let trimmed = tier.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_ascii_lowercase())
        })
        .collect();

    (!cleaned.is_empty()).then_some(cleaned)
}
1727
/// Converts a `usize` to `i32`, saturating at `i32::MAX` when the value does
/// not fit.
fn clamp_usize_to_i32(value: usize) -> i32 {
    // After the `min`, the value is at most `i32::MAX`, so the cast is lossless.
    value.min(i32::MAX as usize) as i32
}
1735
#[cfg(test)]
mod tests {
    //! Tests that drive the HTTP handlers and the gRPC service directly
    //! against the in-memory state, plus a few checks of helper functions.

    use super::*;
    use crate::gateway::proto::sttp_gateway_service_server::SttpGatewayService;
    use crate::gateway_args::{EmbeddingsProviderKind, GatewayBackend};
    use crate::providers::parse_avec_state_from_text;
    use crate::tenant::session_belongs_to_tenant;
    use locus_core_rs::storage::{
        SurrealDbEndpointsSettings, SurrealDbRuntimeOptions, SurrealDbSettings,
    };

    /// Asserts that two `f32` values differ by less than `f32::EPSILON`.
    fn assert_close(actual: f32, expected: f32) {
        assert!((actual - expected).abs() < f32::EPSILON);
    }

    /// Builds the node payload used by the store tests, with `session_id`
    /// interpolated into both the `origin_session` and `session_id` fields.
    fn sample_node(session_id: &str) -> String {
        format!(
            r#"
⊕⟨ {{ trigger: manual, response_format: temporal_node, origin_session: "{session_id}", compression_depth: 1, parent_node: null, prime: {{ attractor_config: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70 }}, context_summary: "gateway test", relevant_tier: raw, retrieval_budget: 3 }} }} ⟩
⦿⟨ {{ timestamp: "2026-03-05T06:30:00Z", tier: raw, session_id: "{session_id}", user_avec: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70, psi: 2.60 }}, model_avec: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70, psi: 2.60 }} }} ⟩
◈⟨ {{ test(.99): "gateway parser check" }} ⟩
⍉⟨ {{ rho: 0.96, kappa: 0.94, psi: 2.60, compression_avec: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70, psi: 2.60 }} }} ⟩
"#
        )
    }

    /// Gateway arguments selecting the Surreal backend in remote mode, with
    /// embeddings and AVEC scoring both disabled.
    fn surreal_test_args() -> GatewayArgs {
        GatewayArgs {
            http_port: 8080,
            grpc_port: 8081,
            backend: GatewayBackend::Surreal,
            root_dir_name: ".locus-gateway".to_string(),
            remote: true,
            cors_enabled: true,
            cors_allowed_origins: "*".to_string(),
            surreal_embedded_endpoint: None,
            surreal_remote_endpoint: Some("ws://127.0.0.1:8000/rpc".to_string()),
            surreal_namespace: "entasis".to_string(),
            surreal_database: "locus_gateway".to_string(),
            surreal_user: "root".to_string(),
            surreal_password: "root".to_string(),
            embeddings_enabled: false,
            embeddings_provider: EmbeddingsProviderKind::Ollama,
            embeddings_endpoint: "http://127.0.0.1:11434/api/embeddings".to_string(),
            embeddings_model: "sttp-encoder".to_string(),
            embeddings_repo: "sentence-transformers/all-MiniLM-L6-v2".to_string(),
            avec_scoring_enabled: false,
            avec_scoring_endpoint: "http://127.0.0.1:11434/api/chat".to_string(),
            avec_scoring_model: "qwen2.5:0.5b".to_string(),
        }
    }

    /// A bare JSON object is parsed and its values are returned as given.
    #[test]
    fn parse_avec_state_accepts_plain_json() {
        let payload = r#"{"stability":0.8,"friction":0.2,"logic":0.9,"autonomy":0.7}"#;
        let parsed = parse_avec_state_from_text(payload).expect("plain JSON should parse");

        assert_close(parsed.stability, 0.8);
        assert_close(parsed.friction, 0.2);
        assert_close(parsed.logic, 0.9);
        assert_close(parsed.autonomy, 0.7);
    }

    /// A JSON object embedded in surrounding prose is still extracted, and
    /// the out-of-range inputs (1.1 and -0.1) come back as 1.0 and 0.0.
    #[test]
    fn parse_avec_state_extracts_embedded_json() {
        let payload = "Here is the answer: {\"stability\":1.1,\"friction\":-0.1,\"logic\":0.5,\"autonomy\":0.6}";
        let parsed = parse_avec_state_from_text(payload).expect("embedded JSON should parse");

        assert_close(parsed.stability, 1.0);
        assert_close(parsed.friction, 0.0);
        assert_close(parsed.logic, 0.5);
        assert_close(parsed.autonomy, 0.6);
    }

    /// With the in-memory state, the scoring endpoint answers 400.
    #[tokio::test]
    async fn score_avec_returns_bad_request_when_disabled() {
        let state = Arc::new(build_in_memory_state().await.expect("state should build"));
        let request = ScoreAvecHttpRequest {
            text: "hello world".to_string(),
            tenant_id: None,
        };

        let rejection = score_avec_handler(State(state), HeaderMap::new(), Json(request))
            .await
            .expect_err("scoring should fail when disabled");

        assert_eq!(rejection.0, StatusCode::BAD_REQUEST);
    }

    /// Settings populated from the gateway arguments, combined with a
    /// `--remote` runtime flag, resolve to the remote endpoint.
    #[test]
    fn surreal_runtime_options_are_derived_from_gateway_args() {
        let gateway_args = surreal_test_args();

        let mut settings = SurrealDbSettings::default();
        settings.endpoints = SurrealDbEndpointsSettings {
            embedded: gateway_args.surreal_embedded_endpoint.clone(),
            remote: gateway_args.surreal_remote_endpoint.clone(),
        };
        settings.namespace = gateway_args.surreal_namespace.clone();
        settings.database = gateway_args.surreal_database.clone();

        let cli_flags = vec!["--remote".to_string()];
        let root_dir = Some(gateway_args.root_dir_name.as_str());
        let resolved = SurrealDbRuntimeOptions::from_args(&cli_flags, &settings, root_dir)
            .expect("runtime options should be computed");

        assert!(resolved.use_remote);
        assert_eq!(resolved.endpoint, "ws://127.0.0.1:8000/rpc");
        assert_eq!(resolved.namespace, "entasis");
        assert_eq!(resolved.database, "locus_gateway");
    }

    /// Non-default tenants get a prefixed session id; the `default` tenant
    /// keeps the bare session id.
    #[test]
    fn tenant_scoping_is_applied_only_for_non_default_tenant() {
        let acme_scoped = scope_session_id("acme", "session-1");
        assert_eq!(acme_scoped, "tenant:acme::session:session-1");
        assert!(session_belongs_to_tenant(&acme_scoped, "acme"));
        assert!(!session_belongs_to_tenant(&acme_scoped, "default"));
        assert_eq!(display_session_id(&acme_scoped), "session-1");

        let default_scoped = scope_session_id("default", "session-1");
        assert_eq!(default_scoped, "session-1");
        assert!(session_belongs_to_tenant(&default_scoped, "default"));
    }

    /// Omitting `trigger` in an HTTP calibrate request yields `"manual"`.
    #[tokio::test]
    async fn http_calibrate_defaults_trigger_to_manual() {
        let state = Arc::new(build_in_memory_state().await.expect("state should build"));

        let calibrate_request = CalibrateSessionHttpRequest {
            session_id: "http-calibrate-session".to_string(),
            tenant_id: None,
            stability: 0.8,
            friction: 0.2,
            logic: 0.8,
            autonomy: 0.7,
            trigger: None,
        };

        let Json(calibration) =
            calibrate_handler(State(state), HeaderMap::new(), Json(calibrate_request))
                .await
                .expect("calibrate should succeed");

        assert_eq!(calibration.trigger, "manual");
        assert!(calibration.is_first_calibration);
    }

    /// A node stored over HTTP is returned by a later context query for the
    /// same session.
    #[tokio::test]
    async fn http_store_then_get_context_roundtrip() {
        let state = Arc::new(build_in_memory_state().await.expect("state should build"));
        let session_id = "http-store-session";

        let store_request = StoreContextHttpRequest {
            node: sample_node(session_id),
            session_id: session_id.to_string(),
            tenant_id: None,
        };
        let Json(stored) = store_context_handler(
            State(Arc::clone(&state)),
            HeaderMap::new(),
            Json(store_request),
        )
        .await
        .expect("store should succeed");

        assert!(stored.valid);
        assert!(!stored.node_id.is_empty());

        let context_request = GetContextHttpRequest {
            session_id: session_id.to_string(),
            tenant_id: None,
            stability: 0.85,
            friction: 0.25,
            logic: 0.80,
            autonomy: 0.70,
            limit: Some(5),
            from_utc: None,
            to_utc: None,
            tiers: None,
            query_text: Some("gateway parser check".to_string()),
            query_embedding: None,
            alpha: None,
            beta: None,
        };
        let Json(context) =
            get_context_handler(State(state), HeaderMap::new(), Json(context_request))
                .await
                .expect("get_context should succeed");

        assert!(context.retrieved >= 1);
        let found_session = context
            .nodes
            .iter()
            .any(|entry| entry.session_id == session_id);
        assert!(found_session);
    }

    /// Embedding-context retrieval succeeds when both embeddings are passed
    /// directly and have the same length.
    #[tokio::test]
    async fn http_embedding_context_roundtrip_with_direct_embeddings() {
        let state = Arc::new(build_in_memory_state().await.expect("state should build"));
        let session_id = "http-embedding-context-session";

        let store_request = StoreContextHttpRequest {
            node: sample_node(session_id),
            session_id: session_id.to_string(),
            tenant_id: None,
        };
        let Json(stored) = store_context_handler(
            State(Arc::clone(&state)),
            HeaderMap::new(),
            Json(store_request),
        )
        .await
        .expect("store should succeed");

        assert!(stored.valid);

        let context_request = GetEmbeddingContextHttpRequest {
            session_id: session_id.to_string(),
            tenant_id: None,
            stability: 0.85,
            friction: 0.25,
            logic: 0.80,
            autonomy: 0.70,
            limit: Some(5),
            from_utc: None,
            to_utc: None,
            tiers: None,
            rag_query_text: None,
            rag_embedding: Some(vec![0.1, 0.2, 0.3]),
            avec_query_text: None,
            avec_embedding: Some(vec![0.2, 0.2, 0.2]),
            rag_weight: Some(0.7),
            avec_weight: Some(0.3),
            alpha: Some(0.65),
            beta: Some(0.35),
        };
        let Json(context) =
            get_embedding_context_handler(State(state), HeaderMap::new(), Json(context_request))
                .await
                .expect("embedding context should succeed");

        assert!(context.retrieved >= 1);
    }

    /// A three-element RAG embedding paired with a two-element AVEC
    /// embedding is answered with 400.
    #[tokio::test]
    async fn http_embedding_context_rejects_mismatched_embedding_dimensions() {
        let state = Arc::new(build_in_memory_state().await.expect("state should build"));

        let mismatched_request = GetEmbeddingContextHttpRequest {
            session_id: "session".to_string(),
            tenant_id: None,
            stability: 0.8,
            friction: 0.2,
            logic: 0.8,
            autonomy: 0.7,
            limit: Some(5),
            from_utc: None,
            to_utc: None,
            tiers: None,
            rag_query_text: None,
            rag_embedding: Some(vec![0.1, 0.2, 0.3]),
            avec_query_text: None,
            avec_embedding: Some(vec![0.3, 0.2]),
            rag_weight: Some(0.7),
            avec_weight: Some(0.3),
            alpha: None,
            beta: None,
        };

        let rejection = get_embedding_context_handler(
            State(state),
            HeaderMap::new(),
            Json(mismatched_request),
        )
        .await
        .expect_err("mismatched dimensions should fail");

        assert_eq!(rejection.0, StatusCode::BAD_REQUEST);
    }

    /// Walks the gRPC service through calibrate, store, dry-run rekey, list,
    /// context and embedding-context calls, then checks that mismatched
    /// embedding lengths are rejected with `InvalidArgument`.
    #[tokio::test]
    async fn grpc_service_roundtrip_for_calibrate_store_and_list() {
        let state = Arc::new(build_in_memory_state().await.expect("state should build"));
        let service = GrpcGatewayService::new(state);
        let session_id = "grpc-store-session";

        // An empty trigger string is expected to come back as "manual".
        let calibrate_request = proto::CalibrateSessionRequest {
            session_id: session_id.to_string(),
            stability: 0.8,
            friction: 0.2,
            logic: 0.8,
            autonomy: 0.7,
            trigger: String::new(),
        };
        let calibration = service
            .calibrate_session(Request::new(calibrate_request))
            .await
            .expect("gRPC calibrate should succeed")
            .into_inner();

        assert_eq!(calibration.trigger, "manual");

        let store_request = proto::StoreContextRequest {
            node: sample_node(session_id),
            session_id: session_id.to_string(),
        };
        let stored = service
            .store_context(Request::new(store_request))
            .await
            .expect("gRPC store should succeed")
            .into_inner();

        assert!(stored.valid);

        // Dry-run rekey of the stored node onto its own session and tenant.
        let rekey_request = proto::BatchRekeyRequest {
            node_ids: vec![stored.node_id],
            target_session_id: session_id.to_string(),
            target_tenant_id: Some("default".to_string()),
            dry_run: Some(true),
            allow_merge: Some(false),
        };
        let rekeyed = service
            .batch_rekey(Request::new(rekey_request))
            .await
            .expect("gRPC batch_rekey should succeed")
            .into_inner();

        assert_eq!(rekeyed.requested_node_ids, 1);
        assert_eq!(rekeyed.resolved_node_ids, 1);
        assert!(rekeyed.scopes.iter().any(|entry| !entry.applied));

        let list_request = proto::ListNodesRequest {
            limit: 50,
            session_id: Some(session_id.to_string()),
        };
        let listed = service
            .list_nodes(Request::new(list_request))
            .await
            .expect("gRPC list_nodes should succeed")
            .into_inner();

        assert!(listed.retrieved >= 1);
        let found_session = listed
            .nodes
            .iter()
            .any(|entry| entry.session_id == session_id);
        assert!(found_session);

        let context_request = proto::GetContextRequest {
            session_id: session_id.to_string(),
            stability: 0.8,
            friction: 0.2,
            logic: 0.8,
            autonomy: 0.7,
            limit: 5,
            query_text: None,
            query_embedding: vec![0.1, 0.2, 0.3],
            alpha: Some(0.7),
            beta: Some(0.3),
            from_utc: None,
            to_utc: None,
            tiers: Vec::new(),
        };
        let context = service
            .get_context(Request::new(context_request))
            .await
            .expect("gRPC get_context should succeed")
            .into_inner();

        assert!(context.retrieved >= 1);

        let matching_request = proto::GetEmbeddingContextRequest {
            session_id: session_id.to_string(),
            stability: 0.8,
            friction: 0.2,
            logic: 0.8,
            autonomy: 0.7,
            limit: 5,
            rag_query_text: None,
            rag_embedding: vec![0.1, 0.2, 0.3],
            avec_query_text: None,
            avec_embedding: vec![0.2, 0.2, 0.2],
            rag_weight: Some(0.7),
            avec_weight: Some(0.3),
            alpha: Some(0.7),
            beta: Some(0.3),
            from_utc: None,
            to_utc: None,
            tiers: Vec::new(),
        };
        let embedding_context = service
            .get_embedding_context(Request::new(matching_request))
            .await
            .expect("gRPC get_embedding_context should succeed")
            .into_inner();

        assert!(embedding_context.retrieved >= 1);

        // Same query, but the AVEC embedding is one element shorter than the
        // RAG embedding.
        let mismatched_request = proto::GetEmbeddingContextRequest {
            session_id: session_id.to_string(),
            stability: 0.8,
            friction: 0.2,
            logic: 0.8,
            autonomy: 0.7,
            limit: 5,
            rag_query_text: None,
            rag_embedding: vec![0.1, 0.2, 0.3],
            avec_query_text: None,
            avec_embedding: vec![0.2, 0.2],
            rag_weight: Some(0.7),
            avec_weight: Some(0.3),
            alpha: None,
            beta: None,
            from_utc: None,
            to_utc: None,
            tiers: Vec::new(),
        };
        let rejection = service
            .get_embedding_context(Request::new(mismatched_request))
            .await
            .expect_err("gRPC get_embedding_context should reject invalid dimensions");

        assert_eq!(rejection.code(), tonic::Code::InvalidArgument);
    }
}