1use std::collections::{BTreeMap, HashMap};
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use anyhow::{Result, anyhow};
6use axum::extract::{Query, State};
7use axum::http::{HeaderMap, Method, StatusCode};
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::{DateTime, Utc};
11use clap::Parser;
12use serde_json::{Value, json};
13use tokio::net::TcpListener;
14use tonic::transport::Server;
15use tonic::{Request, Response, Status};
16use tower_http::cors::{Any, CorsLayer};
17use tracing::{error, info};
18use tracing_subscriber::EnvFilter;
19
20use locus_core_rs::domain::models::{
21 self as core_models, ConfidenceBandSummary, MonthlyRollupRequest, NumericRange, PsiRange,
22};
23use locus_core_rs::domain::contracts::EmbeddingProvider;
24use locus_sdk::application::memory_find::MemoryFindService;
25use locus_sdk::application::memory_recall::MemoryRecallService;
26use locus_sdk::application::memory_transform::MemoryTransformService;
27use locus_sdk::domain::memory::{
28 FallbackPolicy, MemoryFilter, MemoryFindRequest, MemoryPage, MemoryRecallRequest,
29 MemoryScope, MemoryScoring, MemoryTransformOperation, MemoryTransformRequest,
30};
31use locus_sdk::infrastructure::registry::InMemoryAiProviderRegistry;
32use locus_sdk::infrastructure::sttp_native::embedding_provider_adapter::SttpEmbeddingProviderAdapter;
33
34use crate::app_state::AppState;
35use crate::constants::{
36 DEFAULT_HYBRID_ALPHA, DEFAULT_HYBRID_BETA, FILE_DESCRIPTOR_SET, TENANT_SCAN_LIMIT,
37};
38use crate::gateway_args::GatewayArgs;
39use crate::http_models::*;
40use crate::orchestration::{build_state, parse_cors_allowed_origins, shutdown_signal};
41#[cfg(test)]
42use crate::orchestration::build_in_memory_state;
43use crate::providers::resolve_query_embedding;
44use crate::tenant::{
45 display_session_id, normalize_node_for_tenant, normalize_tenant_value, resolve_grpc_tenant,
46 resolve_http_tenant, scope_session_id,
47};
48
/// Generated gRPC bindings for the `sttp.v1` protobuf package, included via
/// `tonic::include_proto!`.
pub mod proto {
    tonic::include_proto!("sttp.v1");
}
52
53pub(crate) async fn run() -> Result<()> {
54 tracing_subscriber::fmt()
55 .with_env_filter(EnvFilter::from_default_env())
56 .init();
57
58 let args = GatewayArgs::parse();
59
60 if args.http_port == args.grpc_port {
61 return Err(anyhow!(
62 "--http-port and --grpc-port must be different values"
63 ));
64 }
65
66 let state = Arc::new(build_state(&args).await?);
67
68 let base_router = Router::new()
69 .route("/health", get(health_handler))
70 .route("/api/v1/calibrate", post(calibrate_handler))
71 .route("/api/v1/store", post(store_context_handler))
72 .route("/api/store", post(store_context_handler))
73 .route("/store", post(store_context_handler))
74 .route("/api/v1/avec/score", post(score_avec_handler))
75 .route("/api/avec/score", post(score_avec_handler))
76 .route("/avec/score", post(score_avec_handler))
77 .route("/api/v1/session/rename", post(rename_session_handler))
78 .route("/api/session/rename", post(rename_session_handler))
79 .route("/session/rename", post(rename_session_handler))
80 .route("/api/v1/context", post(get_context_handler))
81 .route(
82 "/api/v1/context/embeddings",
83 post(get_embedding_context_handler),
84 )
85 .route(
86 "/api/context/embeddings",
87 post(get_embedding_context_handler),
88 )
89 .route("/context/embeddings", post(get_embedding_context_handler))
90 .route("/api/v1/nodes", get(list_nodes_handler))
91 .route("/api/nodes", get(list_nodes_handler))
92 .route("/nodes", get(list_nodes_handler))
93 .route("/api/v1/graph", get(graph_handler))
94 .route("/api/graph", get(graph_handler))
95 .route("/graph", get(graph_handler))
96 .route("/api/v1/moods", get(get_moods_handler))
97 .route("/api/v1/rekey", post(batch_rekey_handler))
98 .route(
99 "/api/v1/rollups/monthly",
100 post(create_monthly_rollup_handler),
101 )
102 .route(
103 "/api/v1/embeddings/migration/preview",
104 post(preview_embedding_migration_handler),
105 )
106 .route(
107 "/api/v1/embeddings/migration/run",
108 post(run_embedding_migration_handler),
109 )
110 .with_state(state.clone());
111
112 let http_router = if args.cors_enabled {
113 let allowed_origins = parse_cors_allowed_origins(&args.cors_allowed_origins)?;
114 let cors_base = CorsLayer::new()
115 .allow_methods([Method::GET, Method::POST, Method::PATCH, Method::OPTIONS])
116 .allow_headers(Any);
117
118 let cors = match allowed_origins {
119 CorsAllowedOrigins::Any => cors_base.allow_origin(Any),
120 CorsAllowedOrigins::Explicit(origins) => cors_base.allow_origin(origins),
121 };
122
123 base_router.layer(cors)
124 } else {
125 base_router
126 };
127
128 let grpc_service = GrpcGatewayService::new(state);
129
130 let grpc_addr = SocketAddr::from(([0, 0, 0, 0], args.grpc_port));
131 let http_listener = TcpListener::bind(("0.0.0.0", args.http_port)).await?;
132
133 let reflection_service = tonic_reflection::server::Builder::configure()
134 .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
135 .build_v1()?;
136
137 info!(
138 http_port = args.http_port,
139 grpc_port = args.grpc_port,
140 cors_enabled = args.cors_enabled,
141 cors_allowed_origins = %args.cors_allowed_origins,
142 "Starting locus-gateway"
143 );
144
145 let http_server =
146 axum::serve(http_listener, http_router).with_graceful_shutdown(shutdown_signal());
147 let grpc_server = Server::builder()
148 .add_service(reflection_service)
149 .add_service(
150 proto::sttp_gateway_service_server::SttpGatewayServiceServer::new(grpc_service),
151 )
152 .serve_with_shutdown(grpc_addr, shutdown_signal());
153
154 let (http_result, grpc_result) = tokio::join!(http_server, grpc_server);
155 if let Err(err) = http_result {
156 error!(error = %err, "HTTP server exited with error");
157 return Err(err.into());
158 }
159 if let Err(err) = grpc_result {
160 error!(error = %err, "gRPC server exited with error");
161 return Err(err.into());
162 }
163
164 Ok(())
165}
166
167async fn health_handler() -> Json<Value> {
168 Json(json!({ "status": "ok", "transport": "http+grpc" }))
169}
170
171async fn calibrate_handler(
172 State(state): State<Arc<AppState>>,
173 headers: HeaderMap,
174 Json(request): Json<CalibrateSessionHttpRequest>,
175) -> ApiResult<CalibrationResultDto> {
176 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
177 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
178
179 let trigger = request
180 .trigger
181 .as_deref()
182 .filter(|v| !v.trim().is_empty())
183 .unwrap_or("manual");
184
185 let result = state
186 .calibration
187 .calibrate_async(
188 &scoped_session_id,
189 request.stability,
190 request.friction,
191 request.logic,
192 request.autonomy,
193 trigger,
194 )
195 .await
196 .map_err(internal_error)?;
197
198 Ok(Json(CalibrationResultDto {
199 previous_avec: to_avec_dto(result.previous_avec),
200 delta: result.delta,
201 drift_classification: format!("{:?}", result.drift_classification),
202 trigger: result.trigger,
203 trigger_history: result.trigger_history,
204 is_first_calibration: result.is_first_calibration,
205 }))
206}
207
208async fn store_context_handler(
209 State(state): State<Arc<AppState>>,
210 headers: HeaderMap,
211 Json(request): Json<StoreContextHttpRequest>,
212) -> ApiResult<StoreResultDto> {
213 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
214 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
215
216 let result = state
217 .store_context
218 .store_async(&request.node, &scoped_session_id)
219 .await;
220
221 Ok(Json(StoreResultDto {
222 node_id: result.node_id,
223 psi: result.psi,
224 valid: result.valid,
225 validation_error: result.validation_error,
226 duplicate_skipped: false,
227 upsert_status: if result.valid {
228 "created".to_string()
229 } else {
230 "skipped".to_string()
231 },
232 }))
233}
234
235async fn score_avec_handler(
236 State(state): State<Arc<AppState>>,
237 headers: HeaderMap,
238 Json(request): Json<ScoreAvecHttpRequest>,
239) -> ApiResult<ScoreAvecResultDto> {
240 let _tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
241
242 if request.text.trim().is_empty() {
243 return Err(bad_request("text cannot be empty"));
244 }
245
246 let scorer = state.avec_scorer.as_ref().ok_or_else(|| {
247 bad_request("AVEC scoring is disabled; enable LOCUS_GATEWAY_AVEC_SCORING_ENABLED")
248 })?;
249
250 let avec = scorer
251 .score_async(request.text.trim())
252 .await
253 .map_err(internal_error)?;
254
255 Ok(Json(ScoreAvecResultDto {
256 provider: scorer.provider_name().to_string(),
257 model: scorer.model_name().to_string(),
258 avec: to_avec_dto(avec),
259 }))
260}
261
262async fn rename_session_handler(
263 State(state): State<Arc<AppState>>,
264 headers: HeaderMap,
265 Json(request): Json<RenameSessionHttpRequest>,
266) -> ApiResult<RenameSessionResultDto> {
267 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
268 let source_session_id = request.source_session_id.trim();
269 let target_session_id = request.target_session_id.trim();
270
271 if source_session_id.is_empty() || target_session_id.is_empty() {
272 return Err(bad_request(
273 "sourceSessionId and targetSessionId are required",
274 ));
275 }
276
277 if source_session_id == target_session_id {
278 return Ok(Json(RenameSessionResultDto {
279 source_session_id: source_session_id.to_string(),
280 target_session_id: target_session_id.to_string(),
281 moved_nodes: 0,
282 moved_calibrations: 0,
283 scopes_applied: 0,
284 }));
285 }
286
287 let scoped_source_session_id = scope_session_id(&tenant, source_session_id);
288 let scoped_target_session_id = scope_session_id(&tenant, target_session_id);
289
290 let source_nodes = state
291 .node_store
292 .query_nodes_async(core_models::NodeQuery {
293 limit: 10_000,
294 session_id: Some(scoped_source_session_id.clone()),
295 from_utc: None,
296 to_utc: None,
297 tiers: None,
298 })
299 .await
300 .map_err(internal_error)?;
301
302 if source_nodes.is_empty() {
303 return Err(bad_request(format!(
304 "source session not found: {source_session_id}"
305 )));
306 }
307
308 let mut anchor_node_ids = Vec::with_capacity(source_nodes.len());
309 for node in source_nodes {
310 let upsert = state
311 .node_store
312 .upsert_node_async(node)
313 .await
314 .map_err(internal_error)?;
315 anchor_node_ids.push(upsert.node_id);
316 }
317 anchor_node_ids.sort();
318 anchor_node_ids.dedup();
319
320 let rekey_result = state
321 .rekey_scope
322 .rekey_async(
323 anchor_node_ids,
324 &tenant,
325 &scoped_target_session_id,
326 false,
327 request.allow_merge.unwrap_or(false),
328 )
329 .await
330 .map_err(internal_error)?;
331
332 if let Some(conflict) = rekey_result.scopes.iter().find(|scope| scope.conflict) {
333 return Err(bad_request(
334 conflict
335 .message
336 .clone()
337 .unwrap_or_else(|| "target session already exists".to_string()),
338 ));
339 }
340
341 let scopes_applied = rekey_result
342 .scopes
343 .iter()
344 .filter(|scope| scope.applied)
345 .count();
346
347 Ok(Json(RenameSessionResultDto {
348 source_session_id: source_session_id.to_string(),
349 target_session_id: target_session_id.to_string(),
350 moved_nodes: rekey_result.temporal_nodes_updated,
351 moved_calibrations: rekey_result.calibrations_updated,
352 scopes_applied,
353 }))
354}
355
356async fn get_context_handler(
357 State(state): State<Arc<AppState>>,
358 headers: HeaderMap,
359 Json(request): Json<GetContextHttpRequest>,
360) -> ApiResult<RetrieveResultDto> {
361 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
362 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
363
364 let limit = request.limit.unwrap_or(5);
365 let tiers = normalize_request_tiers(request.tiers.as_deref());
366 let query_embedding = resolve_query_embedding(
367 state.embedding_provider.as_ref(),
368 request.query_text.as_deref(),
369 request.query_embedding.as_deref(),
370 )
371 .await;
372 let recall_service = MemoryRecallService::new(state.node_store.clone());
373 let recall_result = recall_service
374 .execute(&MemoryRecallRequest {
375 scope: MemoryScope {
376 tenant_id: None,
377 session_ids: Some(vec![scoped_session_id]),
378 tiers,
379 from_utc: request.from_utc,
380 to_utc: request.to_utc,
381 },
382 page: MemoryPage {
383 limit,
384 cursor: None,
385 },
386 scoring: MemoryScoring {
387 alpha: request
388 .alpha
389 .unwrap_or(DEFAULT_HYBRID_ALPHA)
390 .clamp(0.0, 1.0),
391 beta: request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
392 fallback_policy: FallbackPolicy::Never,
393 ..Default::default()
394 },
395 current_avec: Some(core_models::AvecState {
396 stability: request.stability,
397 friction: request.friction,
398 logic: request.logic,
399 autonomy: request.autonomy,
400 }),
401 query_text: request.query_text,
402 query_embedding,
403 ..Default::default()
404 })
405 .await
406 .map_err(internal_error)?;
407
408 Ok(Json(RetrieveResultDto {
409 nodes: recall_result.nodes.iter().map(to_node_dto).collect(),
410 retrieved: recall_result.retrieved,
411 psi_range: PsiRangeDto {
412 min: recall_result.psi_range.min,
413 max: recall_result.psi_range.max,
414 average: recall_result.psi_range.average,
415 },
416 }))
417}
418
419async fn get_embedding_context_handler(
420 State(state): State<Arc<AppState>>,
421 headers: HeaderMap,
422 Json(request): Json<GetEmbeddingContextHttpRequest>,
423) -> ApiResult<RetrieveResultDto> {
424 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
425 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
426 let limit = request.limit.unwrap_or(5);
427 let tiers = normalize_request_tiers(request.tiers.as_deref());
428
429 let rag_embedding = resolve_query_embedding(
430 state.embedding_provider.as_ref(),
431 request.rag_query_text.as_deref(),
432 request.rag_embedding.as_deref(),
433 )
434 .await;
435
436 let avec_embedding = resolve_query_embedding(
437 state.embedding_provider.as_ref(),
438 request.avec_query_text.as_deref(),
439 request.avec_embedding.as_deref(),
440 )
441 .await;
442
443 let fused_embedding = fuse_weighted_embeddings(
444 rag_embedding.as_deref(),
445 avec_embedding.as_deref(),
446 request.rag_weight.unwrap_or(0.7),
447 request.avec_weight.unwrap_or(0.3),
448 )?;
449
450 if fused_embedding.is_empty() {
451 return Err(bad_request(
452 "Provide ragEmbedding/ragQueryText and/or avecEmbedding/avecQueryText",
453 ));
454 }
455
456 let result = state
457 .context_query
458 .get_context_hybrid_scoped_filtered_async(
459 Some(&scoped_session_id),
460 request.stability,
461 request.friction,
462 request.logic,
463 request.autonomy,
464 request.from_utc,
465 request.to_utc,
466 tiers.as_deref(),
467 Some(fused_embedding.as_slice()),
468 request
469 .alpha
470 .unwrap_or(DEFAULT_HYBRID_ALPHA)
471 .clamp(0.0, 1.0),
472 request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
473 limit,
474 )
475 .await;
476
477 Ok(Json(RetrieveResultDto {
478 nodes: result.nodes.iter().map(to_node_dto).collect(),
479 retrieved: result.retrieved,
480 psi_range: PsiRangeDto {
481 min: result.psi_range.min,
482 max: result.psi_range.max,
483 average: result.psi_range.average,
484 },
485 }))
486}
487
488fn fuse_weighted_embeddings(
489 rag_embedding: Option<&[f32]>,
490 avec_embedding: Option<&[f32]>,
491 rag_weight: f32,
492 avec_weight: f32,
493) -> Result<Vec<f32>, (StatusCode, Json<ErrorResponse>)> {
494 let rag_weight = rag_weight.clamp(0.0, 1.0);
495 let avec_weight = avec_weight.clamp(0.0, 1.0);
496
497 match (rag_embedding, avec_embedding) {
498 (Some(rag), Some(avec)) => {
499 if rag.len() != avec.len() {
500 return Err(bad_request(
501 "rag embedding and avec embedding must have the same dimensions",
502 ));
503 }
504
505 let sum = rag_weight + avec_weight;
506 let denom = if sum > f32::EPSILON { sum } else { 1.0 };
507 let fused = rag
508 .iter()
509 .zip(avec.iter())
510 .map(|(r, a)| ((r * rag_weight) + (a * avec_weight)) / denom)
511 .collect::<Vec<_>>();
512 Ok(fused)
513 }
514 (Some(rag), None) => Ok(rag.to_vec()),
515 (None, Some(avec)) => Ok(avec.to_vec()),
516 (None, None) => Ok(Vec::new()),
517 }
518}
519
520async fn list_nodes_handler(
521 State(state): State<Arc<AppState>>,
522 headers: HeaderMap,
523 Query(query): Query<ListNodesQuery>,
524) -> ApiResult<ListNodesResultDto> {
525 let tenant = resolve_http_tenant(query.tenant_id.as_deref(), &headers);
526 let requested_limit = query.limit.unwrap_or(50).clamp(1, TENANT_SCAN_LIMIT);
527 let scoped_session_filter = query
528 .session_id
529 .as_deref()
530 .map(|session_id| scope_session_id(&tenant, session_id));
531 let backend_limit = if scoped_session_filter.is_some() {
532 requested_limit
533 } else {
534 TENANT_SCAN_LIMIT
535 };
536
537 let find_service = MemoryFindService::new(state.node_store.clone());
538 let result = find_service
539 .execute(&MemoryFindRequest {
540 scope: MemoryScope {
541 tenant_id: None,
542 session_ids: scoped_session_filter.map(|session| vec![session]),
543 tiers: None,
544 from_utc: None,
545 to_utc: None,
546 },
547 page: MemoryPage {
548 limit: backend_limit,
549 cursor: None,
550 },
551 ..Default::default()
552 })
553 .await
554 .map_err(internal_error)?;
555
556 let nodes = result
557 .nodes
558 .into_iter()
559 .filter_map(|node| normalize_node_for_tenant(node, &tenant))
560 .take(requested_limit)
561 .collect::<Vec<_>>();
562
563 Ok(Json(ListNodesResultDto {
564 nodes: nodes.iter().map(to_node_dto).collect(),
565 retrieved: nodes.len(),
566 }))
567}
568
569async fn graph_handler(
570 State(state): State<Arc<AppState>>,
571 headers: HeaderMap,
572 Query(query): Query<GraphQuery>,
573) -> ApiResult<GraphResponse> {
574 let tenant = resolve_http_tenant(query.tenant_id.as_deref(), &headers);
575 let capped_limit = query.limit.unwrap_or(1000).clamp(1, 5000);
576 let scoped_session_filter = query
577 .session_id
578 .as_deref()
579 .map(|session_id| scope_session_id(&tenant, session_id));
580 let backend_limit = if scoped_session_filter.is_some() {
581 capped_limit
582 } else {
583 TENANT_SCAN_LIMIT
584 };
585
586 let find_service = MemoryFindService::new(state.node_store.clone());
587 let result = find_service
588 .execute(&MemoryFindRequest {
589 scope: MemoryScope {
590 tenant_id: None,
591 session_ids: scoped_session_filter.map(|session| vec![session]),
592 tiers: None,
593 from_utc: None,
594 to_utc: None,
595 },
596 page: MemoryPage {
597 limit: backend_limit,
598 cursor: None,
599 },
600 ..Default::default()
601 })
602 .await
603 .map_err(internal_error)?;
604
605 let mut ordered_nodes = result
606 .nodes
607 .into_iter()
608 .filter_map(|node| normalize_node_for_tenant(node, &tenant))
609 .take(capped_limit)
610 .collect::<Vec<_>>();
611 ordered_nodes.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
612
613 #[derive(Clone)]
614 struct SessionGroup {
615 id: String,
616 label: String,
617 nodes: Vec<core_models::SttpNode>,
618 node_count: usize,
619 avg_psi: f32,
620 last_modified: DateTime<Utc>,
621 size: usize,
622 }
623
624 let mut grouped_map: BTreeMap<String, Vec<core_models::SttpNode>> = BTreeMap::new();
625 for node in &ordered_nodes {
626 grouped_map
627 .entry(node.session_id.clone())
628 .or_default()
629 .push(node.clone());
630 }
631
632 let mut grouped = grouped_map
633 .into_iter()
634 .map(|(id, mut nodes)| {
635 nodes.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
636 let node_count = nodes.len();
637 let avg_psi = if node_count == 0 {
638 0.0
639 } else {
640 nodes.iter().map(|n| n.psi).sum::<f32>() / node_count as f32
641 };
642 let last_modified = nodes.first().map(|n| n.timestamp).unwrap_or_else(Utc::now);
643 let size = 16 + std::cmp::min(28, node_count * 2);
644
645 SessionGroup {
646 label: id.clone(),
647 id,
648 nodes,
649 node_count,
650 avg_psi,
651 last_modified,
652 size,
653 }
654 })
655 .collect::<Vec<_>>();
656
657 grouped.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
658
659 let node_by_id = ordered_nodes
660 .iter()
661 .map(|node| (graph_node_id(node), node.clone()))
662 .collect::<HashMap<_, _>>();
663
664 let sessions = grouped
665 .iter()
666 .map(|session| {
667 json!({
668 "id": format!("s:{}", session.id),
669 "label": session.label,
670 "nodeCount": session.node_count,
671 "avgPsi": session.avg_psi,
672 "lastModified": session.last_modified.to_rfc3339(),
673 "size": session.size
674 })
675 })
676 .collect::<Vec<_>>();
677
678 let nodes = ordered_nodes
679 .iter()
680 .map(|node| {
681 json!({
682 "id": graph_node_id(node),
683 "sessionId": node.session_id,
684 "label": format!("{} {}", node.tier, node.timestamp.format("%m-%d %H:%M")),
685 "tier": node.tier,
686 "timestamp": node.timestamp.to_rfc3339(),
687 "psi": node.psi,
688 "parentNodeId": node.parent_node_id,
689 "size": 9
690 })
691 })
692 .collect::<Vec<_>>();
693
694 let mut edges = Vec::new();
695
696 for i in 0..grouped.len().saturating_sub(1) {
697 edges.push(json!({
698 "id": format!("t-{i}"),
699 "source": format!("s:{}", grouped[i].id),
700 "target": format!("s:{}", grouped[i + 1].id),
701 "kind": "timeline"
702 }));
703 }
704
705 for i in 0..grouped.len() {
706 let from = &grouped[i];
707 let mut nearest: Option<usize> = None;
708 let mut nearest_distance = f32::MAX;
709
710 for (j, other) in grouped.iter().enumerate() {
711 if i == j {
712 continue;
713 }
714 let distance = (from.avg_psi - other.avg_psi).abs();
715 if distance < nearest_distance {
716 nearest_distance = distance;
717 nearest = Some(j);
718 }
719 }
720
721 if let Some(nearest_index) = nearest {
722 if i < nearest_index {
723 edges.push(json!({
724 "id": format!("s-{i}-{nearest_index}"),
725 "source": format!("s:{}", from.id),
726 "target": format!("s:{}", grouped[nearest_index].id),
727 "kind": "similarity"
728 }));
729 }
730 }
731 }
732
733 for session in &grouped {
734 for i in 0..session.nodes.len() {
735 let current = &session.nodes[i];
736 let current_id = graph_node_id(current);
737
738 edges.push(json!({
739 "id": format!("m-{}-{i}", session.id),
740 "source": format!("s:{}", session.id),
741 "target": current_id,
742 "kind": "membership"
743 }));
744
745 if i + 1 < session.nodes.len() {
746 let older = &session.nodes[i + 1];
747 edges.push(json!({
748 "id": format!("nt-{}-{i}", session.id),
749 "source": current_id,
750 "target": graph_node_id(older),
751 "kind": "node_timeline"
752 }));
753 }
754
755 if let Some(parent) = current.parent_node_id.as_ref() {
756 if node_by_id.contains_key(parent) {
757 edges.push(json!({
758 "id": format!("l-{}-{i}", session.id),
759 "source": current_id,
760 "target": parent,
761 "kind": "lineage"
762 }));
763 }
764 }
765 }
766 }
767
768 Ok(Json(GraphResponse {
769 sessions,
770 nodes,
771 edges,
772 retrieved: ordered_nodes.len(),
773 }))
774}
775
776async fn get_moods_handler(
777 State(state): State<Arc<AppState>>,
778 Query(query): Query<GetMoodsQuery>,
779) -> ApiResult<MoodCatalogResultDto> {
780 let result = state.mood_catalog.get(
781 query.target_mood.as_deref(),
782 query.blend.unwrap_or(1.0),
783 query.current_stability,
784 query.current_friction,
785 query.current_logic,
786 query.current_autonomy,
787 );
788
789 Ok(Json(to_mood_catalog_dto(result)))
790}
791
792async fn create_monthly_rollup_handler(
793 State(state): State<Arc<AppState>>,
794 headers: HeaderMap,
795 Json(request): Json<CreateMonthlyRollupHttpRequest>,
796) -> ApiResult<MonthlyRollupResultDto> {
797 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
798
799 let rollup_request = MonthlyRollupRequest {
800 session_id: scope_session_id(&tenant, &request.session_id),
801 start_utc: request.start_date_utc,
802 end_utc: request.end_date_utc,
803 source_session_id: request
804 .source_session_id
805 .map(|session_id| scope_session_id(&tenant, &session_id)),
806 parent_node_id: request.parent_node_id,
807 persist: request.persist.unwrap_or(true),
808 limit: request.limit.unwrap_or(5000),
809 };
810
811 let result = state.monthly_rollup.create_async(rollup_request).await;
812 Ok(Json(to_monthly_rollup_dto(result)))
813}
814
815async fn batch_rekey_handler(
816 State(state): State<Arc<AppState>>,
817 headers: HeaderMap,
818 Json(request): Json<BatchRekeyHttpRequest>,
819) -> ApiResult<BatchRekeyResultDto> {
820 if request.node_ids.is_empty() {
821 return Err(bad_request("nodeIds must contain at least one value"));
822 }
823
824 if request.target_session_id.trim().is_empty() {
825 return Err(bad_request("targetSessionId cannot be empty"));
826 }
827
828 let target_tenant = resolve_http_tenant(request.target_tenant_id.as_deref(), &headers);
829 let scoped_target_session = scope_session_id(&target_tenant, request.target_session_id.trim());
830
831 let result = state
832 .rekey_scope
833 .rekey_async(
834 request.node_ids,
835 &target_tenant,
836 &scoped_target_session,
837 request.dry_run.unwrap_or(true),
838 request.allow_merge.unwrap_or(false),
839 )
840 .await
841 .map_err(internal_error)?;
842
843 Ok(Json(to_batch_rekey_dto(result)))
844}
845
846async fn preview_embedding_migration_handler(
847 State(state): State<Arc<AppState>>,
848 headers: HeaderMap,
849 Json(request): Json<EmbeddingMigrationPreviewHttpRequest>,
850) -> ApiResult<EmbeddingMigrationPreviewResultDto> {
851 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
852 let sample_limit = request.sample_limit.unwrap_or(20).clamp(1, 200);
853 let max_nodes = request.max_nodes.unwrap_or(5_000).clamp(1, 50_000);
854 let (scope, filter, sync_keys) = scoped_memory_filter(request.filter, &tenant);
855
856 let find_service = MemoryFindService::new(state.node_store.clone());
857 let find_result = find_service
858 .execute(&MemoryFindRequest {
859 scope,
860 filter,
861 page: MemoryPage {
862 limit: max_nodes,
863 cursor: None,
864 },
865 ..Default::default()
866 })
867 .await
868 .map_err(internal_error)?;
869
870 let mut nodes = find_result.nodes;
871 if let Some(sync_keys) = sync_keys {
872 nodes.retain(|node| sync_keys.iter().any(|key| key == &node.sync_key));
873 }
874
875 let total_candidates = nodes.len();
876 let sample = nodes
877 .into_iter()
878 .take(sample_limit)
879 .map(|sample| EmbeddingMigrationSampleDto {
880 sync_key: sample.sync_key,
881 session_id: display_session_id(&sample.session_id),
882 tier: sample.tier,
883 has_embedding: sample
884 .embedding
885 .as_ref()
886 .is_some_and(|values| !values.is_empty()),
887 embedding_model: sample.embedding_model,
888 embedding_dimensions: sample.embedding_dimensions,
889 embedded_at: sample.embedded_at,
890 updated_at: sample.updated_at,
891 context_summary: sample.context_summary,
892 })
893 .collect::<Vec<_>>();
894
895 Ok(Json(EmbeddingMigrationPreviewResultDto {
896 total_candidates,
897 sample,
898 provider_available: state.embedding_provider.is_some(),
899 provider_model: state
900 .embedding_provider
901 .as_ref()
902 .map(|provider| provider.model_name().to_string()),
903 }))
904}
905
906async fn run_embedding_migration_handler(
907 State(state): State<Arc<AppState>>,
908 headers: HeaderMap,
909 Json(request): Json<EmbeddingMigrationRunHttpRequest>,
910) -> ApiResult<EmbeddingMigrationRunResultDto> {
911 let tenant = resolve_http_tenant(request.tenant_id.as_deref(), &headers);
912 let mode = match request
913 .mode
914 .unwrap_or(EmbeddingMigrationModeHttp::MissingOnly)
915 {
916 EmbeddingMigrationModeHttp::MissingOnly => MemoryTransformOperation::EmbedBackfill,
917 EmbeddingMigrationModeHttp::ReindexAll => MemoryTransformOperation::ReindexEmbeddings,
918 };
919 let dry_run = request.dry_run.unwrap_or(true);
920 let batch_size = request.batch_size.unwrap_or(100).clamp(1, 500);
921 let max_nodes = request.max_nodes.unwrap_or(5_000).clamp(1, 50_000);
922 let (scope, filter, _sync_keys) = scoped_memory_filter(request.filter, &tenant);
923
924 let providers = build_gateway_provider_registry(state.embedding_provider.clone());
925 let transform_service = MemoryTransformService::new(state.node_store.clone(), providers);
926
927 let result = transform_service
928 .execute(&MemoryTransformRequest {
929 scope,
930 filter,
931 operation: mode,
932 dry_run,
933 batch_size,
934 max_nodes,
935 provider_id: state
936 .embedding_provider
937 .as_ref()
938 .map(|_| "gateway-embedding".to_string()),
939 model: state
940 .embedding_provider
941 .as_ref()
942 .map(|provider| provider.model_name().to_string()),
943 })
944 .await
945 .map_err(internal_error)?;
946
947 Ok(Json(to_embedding_migration_run_dto(
948 result,
949 mode,
950 dry_run,
951 state
952 .embedding_provider
953 .as_ref()
954 .map(|provider| provider.model_name().to_string()),
955 )))
956}
957
958fn bad_request(message: impl Into<String>) -> (StatusCode, Json<ErrorResponse>) {
959 (
960 StatusCode::BAD_REQUEST,
961 Json(ErrorResponse {
962 error: message.into(),
963 }),
964 )
965}
966
967fn internal_error(error: impl std::fmt::Display) -> (StatusCode, Json<ErrorResponse>) {
968 (
969 StatusCode::INTERNAL_SERVER_ERROR,
970 Json(ErrorResponse {
971 error: error.to_string(),
972 }),
973 )
974}
975
976fn graph_node_id(node: &core_models::SttpNode) -> String {
977 format!(
978 "n:{}|{}|{}|{:.4}",
979 node.session_id,
980 node.timestamp.to_rfc3339(),
981 node.compression_depth,
982 node.psi
983 )
984}
985
986fn to_avec_dto(value: core_models::AvecState) -> AvecStateDto {
987 AvecStateDto {
988 stability: value.stability,
989 friction: value.friction,
990 logic: value.logic,
991 autonomy: value.autonomy,
992 psi: value.psi(),
993 }
994}
995
996fn to_node_dto(value: &core_models::SttpNode) -> SttpNodeDto {
997 SttpNodeDto {
998 raw: value.raw.clone(),
999 session_id: display_session_id(&value.session_id),
1000 tier: value.tier.clone(),
1001 timestamp: value.timestamp,
1002 compression_depth: value.compression_depth,
1003 parent_node_id: value.parent_node_id.clone(),
1004 user_avec: to_avec_dto(value.user_avec),
1005 model_avec: to_avec_dto(value.model_avec),
1006 compression_avec: value.compression_avec.map(to_avec_dto),
1007 rho: value.rho,
1008 kappa: value.kappa,
1009 psi: value.psi,
1010 sync_key: value.sync_key.clone(),
1011 synthetic_id: graph_node_id(value),
1012 }
1013}
1014
1015fn to_mood_catalog_dto(result: core_models::MoodCatalogResult) -> MoodCatalogResultDto {
1016 MoodCatalogResultDto {
1017 presets: result
1018 .presets
1019 .into_iter()
1020 .map(|preset| MoodPresetDto {
1021 name: preset.name,
1022 description: preset.description,
1023 avec: to_avec_dto(preset.avec),
1024 })
1025 .collect(),
1026 apply_guide: result.apply_guide,
1027 swap_preview: result.swap_preview.map(|preview| MoodSwapPreviewDto {
1028 target_mood: preview.target_mood,
1029 blend: preview.blend,
1030 current: to_avec_dto(preview.current),
1031 target: to_avec_dto(preview.target),
1032 blended: to_avec_dto(preview.blended),
1033 }),
1034 }
1035}
1036
1037fn to_monthly_rollup_dto(result: core_models::MonthlyRollupResult) -> MonthlyRollupResultDto {
1038 MonthlyRollupResultDto {
1039 success: result.success,
1040 node_id: result.node_id,
1041 raw_node: result.raw_node,
1042 error: result.error,
1043 source_nodes: result.source_nodes,
1044 parent_reference: result.parent_reference,
1045 user_average: to_avec_dto(result.user_average),
1046 model_average: to_avec_dto(result.model_average),
1047 compression_average: to_avec_dto(result.compression_average),
1048 rho_range: to_numeric_range_dto(result.rho_range),
1049 kappa_range: to_numeric_range_dto(result.kappa_range),
1050 psi_range: to_numeric_range_dto(result.psi_range),
1051 rho_bands: to_confidence_bands_dto(result.rho_bands),
1052 kappa_bands: to_confidence_bands_dto(result.kappa_bands),
1053 }
1054}
1055
1056fn to_batch_rekey_dto(result: core_models::BatchRekeyResult) -> BatchRekeyResultDto {
1057 let updated_scopes = result.scopes.iter().filter(|scope| scope.applied).count();
1058 let conflict_scopes = result.scopes.iter().filter(|scope| scope.conflict).count();
1059
1060 BatchRekeyResultDto {
1061 dry_run: result.dry_run,
1062 requested_node_ids: result.requested_node_ids,
1063 resolved_node_ids: result.resolved_node_ids,
1064 missing_node_ids: result.missing_node_ids,
1065 scopes: result
1066 .scopes
1067 .into_iter()
1068 .map(|scope| ScopeRekeyResultDto {
1069 source_tenant_id: scope.source_tenant_id,
1070 source_session_id: display_session_id(&scope.source_session_id),
1071 target_tenant_id: scope.target_tenant_id,
1072 target_session_id: display_session_id(&scope.target_session_id),
1073 temporal_nodes: scope.temporal_nodes,
1074 calibrations: scope.calibrations,
1075 target_temporal_nodes: scope.target_temporal_nodes,
1076 target_calibrations: scope.target_calibrations,
1077 applied: scope.applied,
1078 conflict: scope.conflict,
1079 message: scope.message,
1080 })
1081 .collect(),
1082 temporal_nodes_updated: result.temporal_nodes_updated,
1083 calibrations_updated: result.calibrations_updated,
1084 updated_scopes,
1085 conflict_scopes,
1086 }
1087}
1088
1089fn scoped_memory_filter(
1090 request_filter: Option<EmbeddingMigrationFilterHttp>,
1091 tenant: &str,
1092) -> (MemoryScope, MemoryFilter, Option<Vec<String>>) {
1093 let filter = request_filter.unwrap_or(EmbeddingMigrationFilterHttp {
1094 session_id: None,
1095 from_utc: None,
1096 to_utc: None,
1097 tiers: None,
1098 has_embedding: None,
1099 embedding_model: None,
1100 sync_keys: None,
1101 });
1102
1103 (
1104 MemoryScope {
1105 tenant_id: None,
1106 session_ids: filter
1107 .session_id
1108 .map(|session_id| vec![scope_session_id(tenant, &session_id)]),
1109 tiers: filter.tiers,
1110 from_utc: filter.from_utc,
1111 to_utc: filter.to_utc,
1112 },
1113 MemoryFilter {
1114 has_embedding: filter.has_embedding,
1115 embedding_model: filter.embedding_model,
1116 ..Default::default()
1117 },
1118 filter.sync_keys,
1119 )
1120}
1121
1122fn build_gateway_provider_registry(
1123 provider: Option<Arc<dyn EmbeddingProvider>>,
1124) -> Arc<InMemoryAiProviderRegistry> {
1125 let mut registry = InMemoryAiProviderRegistry::new();
1126 if let Some(provider) = provider {
1127 registry.register(SttpEmbeddingProviderAdapter::new("gateway-embedding", provider));
1128 }
1129 Arc::new(registry)
1130}
1131
1132fn to_embedding_migration_run_dto(
1133 result: locus_sdk::domain::memory::MemoryTransformResult,
1134 mode: MemoryTransformOperation,
1135 dry_run: bool,
1136 provider_model: Option<String>,
1137) -> EmbeddingMigrationRunResultDto {
1138 EmbeddingMigrationRunResultDto {
1139 scanned: result.scanned,
1140 selected: result.selected,
1141 updated: result.updated,
1142 skipped: result.skipped,
1143 failed: result.failed,
1144 duplicate: result.duplicate,
1145 started_at: result.started_at,
1146 completed_at: result.completed_at,
1147 provider_model,
1148 dry_run,
1149 mode: match mode {
1150 MemoryTransformOperation::EmbedBackfill => "missing_only".to_string(),
1151 MemoryTransformOperation::ReindexEmbeddings => "reindex_all".to_string(),
1152 },
1153 failure_reasons: result.failures,
1154 }
1155}
1156
1157fn to_numeric_range_dto(value: NumericRange) -> NumericRangeDto {
1158 NumericRangeDto {
1159 min: value.min,
1160 max: value.max,
1161 average: value.average,
1162 }
1163}
1164
1165fn to_confidence_bands_dto(value: ConfidenceBandSummary) -> ConfidenceBandSummaryDto {
1166 ConfidenceBandSummaryDto {
1167 low: value.low,
1168 medium: value.medium,
1169 high: value.high,
1170 }
1171}
1172
/// gRPC-facing gateway service.
///
/// Holds the shared application state behind an `Arc`; `Clone` is derived, so
/// cloning the service clones that handle rather than the state itself.
#[derive(Clone)]
struct GrpcGatewayService {
    /// Shared application state read by the RPC handlers.
    state: Arc<AppState>,
}
1177
1178impl GrpcGatewayService {
1179 fn new(state: Arc<AppState>) -> Self {
1180 Self { state }
1181 }
1182}
1183
1184#[tonic::async_trait]
1185impl proto::sttp_gateway_service_server::SttpGatewayService for GrpcGatewayService {
1186 async fn calibrate_session(
1187 &self,
1188 request: Request<proto::CalibrateSessionRequest>,
1189 ) -> Result<Response<proto::CalibrateSessionReply>, Status> {
1190 let tenant = resolve_grpc_tenant(request.metadata());
1191 let request = request.into_inner();
1192 let trigger = if request.trigger.trim().is_empty() {
1193 "manual"
1194 } else {
1195 &request.trigger
1196 };
1197 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1198
1199 let result = self
1200 .state
1201 .calibration
1202 .calibrate_async(
1203 &scoped_session_id,
1204 request.stability,
1205 request.friction,
1206 request.logic,
1207 request.autonomy,
1208 trigger,
1209 )
1210 .await
1211 .map_err(|err| Status::internal(err.to_string()))?;
1212
1213 let reply = proto::CalibrateSessionReply {
1214 previous_avec: Some(to_grpc_avec(result.previous_avec)),
1215 delta: result.delta,
1216 drift_classification: format!("{:?}", result.drift_classification),
1217 trigger: result.trigger,
1218 trigger_history: result.trigger_history,
1219 is_first_calibration: result.is_first_calibration,
1220 };
1221
1222 Ok(Response::new(reply))
1223 }
1224
1225 async fn store_context(
1226 &self,
1227 request: Request<proto::StoreContextRequest>,
1228 ) -> Result<Response<proto::StoreContextReply>, Status> {
1229 let tenant = resolve_grpc_tenant(request.metadata());
1230 let request = request.into_inner();
1231 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1232
1233 let result = self
1234 .state
1235 .store_context
1236 .store_async(&request.node, &scoped_session_id)
1237 .await;
1238
1239 let reply = proto::StoreContextReply {
1240 node_id: result.node_id,
1241 psi: result.psi,
1242 valid: result.valid,
1243 validation_error: result.validation_error,
1244 };
1245
1246 Ok(Response::new(reply))
1247 }
1248
1249 async fn get_context(
1250 &self,
1251 request: Request<proto::GetContextRequest>,
1252 ) -> Result<Response<proto::GetContextReply>, Status> {
1253 let tenant = resolve_grpc_tenant(request.metadata());
1254 let request = request.into_inner();
1255 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1256
1257 let query_embedding = resolve_query_embedding(
1258 self.state.embedding_provider.as_ref(),
1259 request.query_text.as_deref(),
1260 if request.query_embedding.is_empty() {
1261 None
1262 } else {
1263 Some(request.query_embedding.as_slice())
1264 },
1265 )
1266 .await;
1267
1268 let limit = if request.limit <= 0 {
1269 5
1270 } else {
1271 request.limit as usize
1272 };
1273 let tiers = normalize_request_tiers(Some(&request.tiers));
1274 let recall_service = MemoryRecallService::new(self.state.node_store.clone());
1275 let recall_result = recall_service
1276 .execute(&MemoryRecallRequest {
1277 scope: MemoryScope {
1278 tenant_id: None,
1279 session_ids: Some(vec![scoped_session_id]),
1280 tiers,
1281 from_utc: timestamp_from_proto_optional(request.from_utc)?,
1282 to_utc: timestamp_from_proto_optional(request.to_utc)?,
1283 },
1284 page: MemoryPage {
1285 limit,
1286 cursor: None,
1287 },
1288 scoring: MemoryScoring {
1289 alpha: request
1290 .alpha
1291 .unwrap_or(DEFAULT_HYBRID_ALPHA)
1292 .clamp(0.0, 1.0),
1293 beta: request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
1294 fallback_policy: FallbackPolicy::Never,
1295 ..Default::default()
1296 },
1297 current_avec: Some(core_models::AvecState {
1298 stability: request.stability,
1299 friction: request.friction,
1300 logic: request.logic,
1301 autonomy: request.autonomy,
1302 }),
1303 query_text: request.query_text,
1304 query_embedding,
1305 ..Default::default()
1306 })
1307 .await
1308 .map_err(|err| Status::internal(err.to_string()))?;
1309
1310 let nodes = recall_result
1311 .nodes
1312 .iter()
1313 .cloned()
1314 .filter_map(|node| normalize_node_for_tenant(node, &tenant))
1315 .collect::<Vec<_>>();
1316
1317 let reply = proto::GetContextReply {
1318 nodes: nodes.iter().map(to_grpc_node).collect(),
1319 retrieved: clamp_usize_to_i32(nodes.len()),
1320 psi_range: Some(to_grpc_psi_range(recall_result.psi_range)),
1321 };
1322
1323 Ok(Response::new(reply))
1324 }
1325
1326 async fn get_embedding_context(
1327 &self,
1328 request: Request<proto::GetEmbeddingContextRequest>,
1329 ) -> Result<Response<proto::GetContextReply>, Status> {
1330 let tenant = resolve_grpc_tenant(request.metadata());
1331 let request = request.into_inner();
1332 let scoped_session_id = scope_session_id(&tenant, &request.session_id);
1333
1334 let rag_embedding = resolve_query_embedding(
1335 self.state.embedding_provider.as_ref(),
1336 request.rag_query_text.as_deref(),
1337 if request.rag_embedding.is_empty() {
1338 None
1339 } else {
1340 Some(request.rag_embedding.as_slice())
1341 },
1342 )
1343 .await;
1344
1345 let avec_embedding = resolve_query_embedding(
1346 self.state.embedding_provider.as_ref(),
1347 request.avec_query_text.as_deref(),
1348 if request.avec_embedding.is_empty() {
1349 None
1350 } else {
1351 Some(request.avec_embedding.as_slice())
1352 },
1353 )
1354 .await;
1355
1356 let fused_embedding = fuse_weighted_embeddings(
1357 rag_embedding.as_deref(),
1358 avec_embedding.as_deref(),
1359 request.rag_weight.unwrap_or(0.7),
1360 request.avec_weight.unwrap_or(0.3),
1361 )
1362 .map_err(|(_, payload)| Status::invalid_argument(payload.0.error))?;
1363
1364 if fused_embedding.is_empty() {
1365 return Err(Status::invalid_argument(
1366 "Provide rag_embedding/rag_query_text and/or avec_embedding/avec_query_text",
1367 ));
1368 }
1369
1370 let limit = if request.limit <= 0 {
1371 5
1372 } else {
1373 request.limit as usize
1374 };
1375 let tiers = normalize_request_tiers(Some(&request.tiers));
1376
1377 let result = self
1378 .state
1379 .context_query
1380 .get_context_hybrid_scoped_filtered_async(
1381 Some(&scoped_session_id),
1382 request.stability,
1383 request.friction,
1384 request.logic,
1385 request.autonomy,
1386 timestamp_from_proto_optional(request.from_utc)?,
1387 timestamp_from_proto_optional(request.to_utc)?,
1388 tiers.as_deref(),
1389 Some(fused_embedding.as_slice()),
1390 request
1391 .alpha
1392 .unwrap_or(DEFAULT_HYBRID_ALPHA)
1393 .clamp(0.0, 1.0),
1394 request.beta.unwrap_or(DEFAULT_HYBRID_BETA).clamp(0.0, 1.0),
1395 limit,
1396 )
1397 .await;
1398
1399 let nodes = result
1400 .nodes
1401 .iter()
1402 .cloned()
1403 .filter_map(|node| normalize_node_for_tenant(node, &tenant))
1404 .collect::<Vec<_>>();
1405
1406 let reply = proto::GetContextReply {
1407 nodes: nodes.iter().map(to_grpc_node).collect(),
1408 retrieved: clamp_usize_to_i32(nodes.len()),
1409 psi_range: Some(to_grpc_psi_range(result.psi_range)),
1410 };
1411
1412 Ok(Response::new(reply))
1413 }
1414
1415 async fn list_nodes(
1416 &self,
1417 request: Request<proto::ListNodesRequest>,
1418 ) -> Result<Response<proto::ListNodesReply>, Status> {
1419 let tenant = resolve_grpc_tenant(request.metadata());
1420 let request = request.into_inner();
1421 let requested_limit = if request.limit <= 0 {
1422 50
1423 } else {
1424 request.limit as usize
1425 }
1426 .clamp(1, TENANT_SCAN_LIMIT);
1427 let scoped_session_filter = request
1428 .session_id
1429 .as_deref()
1430 .map(|session_id| scope_session_id(&tenant, session_id));
1431 let backend_limit = if scoped_session_filter.is_some() {
1432 requested_limit
1433 } else {
1434 TENANT_SCAN_LIMIT
1435 };
1436
1437 let find_service = MemoryFindService::new(self.state.node_store.clone());
1438 let result = find_service
1439 .execute(&MemoryFindRequest {
1440 scope: MemoryScope {
1441 tenant_id: None,
1442 session_ids: scoped_session_filter.map(|session| vec![session]),
1443 tiers: None,
1444 from_utc: None,
1445 to_utc: None,
1446 },
1447 page: MemoryPage {
1448 limit: backend_limit,
1449 cursor: None,
1450 },
1451 ..Default::default()
1452 })
1453 .await
1454 .map_err(|err| Status::internal(err.to_string()))?;
1455
1456 let nodes = result
1457 .nodes
1458 .into_iter()
1459 .filter_map(|node| normalize_node_for_tenant(node, &tenant))
1460 .take(requested_limit)
1461 .collect::<Vec<_>>();
1462
1463 let reply = proto::ListNodesReply {
1464 nodes: nodes.iter().map(to_grpc_node).collect(),
1465 retrieved: clamp_usize_to_i32(nodes.len()),
1466 };
1467
1468 Ok(Response::new(reply))
1469 }
1470
1471 async fn get_moods(
1472 &self,
1473 request: Request<proto::GetMoodsRequest>,
1474 ) -> Result<Response<proto::GetMoodsReply>, Status> {
1475 let request = request.into_inner();
1476 let result = self.state.mood_catalog.get(
1477 request.target_mood.as_deref(),
1478 request.blend,
1479 request.current_stability,
1480 request.current_friction,
1481 request.current_logic,
1482 request.current_autonomy,
1483 );
1484
1485 let reply = proto::GetMoodsReply {
1486 presets: result
1487 .presets
1488 .into_iter()
1489 .map(|preset| proto::MoodPreset {
1490 name: preset.name,
1491 description: preset.description,
1492 avec: Some(to_grpc_avec(preset.avec)),
1493 })
1494 .collect(),
1495 apply_guide: result.apply_guide,
1496 swap_preview: result.swap_preview.map(|preview| proto::MoodSwapPreview {
1497 target_mood: preview.target_mood,
1498 blend: preview.blend,
1499 current: Some(to_grpc_avec(preview.current)),
1500 target: Some(to_grpc_avec(preview.target)),
1501 blended: Some(to_grpc_avec(preview.blended)),
1502 }),
1503 };
1504
1505 Ok(Response::new(reply))
1506 }
1507
1508 async fn batch_rekey(
1509 &self,
1510 request: Request<proto::BatchRekeyRequest>,
1511 ) -> Result<Response<proto::BatchRekeyReply>, Status> {
1512 let metadata_tenant = resolve_grpc_tenant(request.metadata());
1513 let request = request.into_inner();
1514
1515 if request.node_ids.is_empty() {
1516 return Err(Status::invalid_argument(
1517 "node_ids must contain at least one value",
1518 ));
1519 }
1520
1521 if request.target_session_id.trim().is_empty() {
1522 return Err(Status::invalid_argument(
1523 "target_session_id cannot be empty",
1524 ));
1525 }
1526
1527 let target_tenant = request
1528 .target_tenant_id
1529 .as_deref()
1530 .and_then(normalize_tenant_value)
1531 .unwrap_or(metadata_tenant);
1532 let scoped_target_session =
1533 scope_session_id(&target_tenant, request.target_session_id.trim());
1534
1535 let result = self
1536 .state
1537 .rekey_scope
1538 .rekey_async(
1539 request.node_ids,
1540 &target_tenant,
1541 &scoped_target_session,
1542 request.dry_run.unwrap_or(true),
1543 request.allow_merge.unwrap_or(false),
1544 )
1545 .await
1546 .map_err(|err| Status::internal(err.to_string()))?;
1547
1548 Ok(Response::new(to_grpc_batch_rekey_reply(result)))
1549 }
1550
1551 async fn create_monthly_rollup(
1552 &self,
1553 request: Request<proto::CreateMonthlyRollupRequest>,
1554 ) -> Result<Response<proto::CreateMonthlyRollupReply>, Status> {
1555 let tenant = resolve_grpc_tenant(request.metadata());
1556 let request = request.into_inner();
1557
1558 let start_utc = timestamp_from_proto(request.start_utc)?;
1559 let end_utc = timestamp_from_proto(request.end_utc)?;
1560
1561 let monthly_request = MonthlyRollupRequest {
1562 session_id: scope_session_id(&tenant, &request.session_id),
1563 start_utc,
1564 end_utc,
1565 source_session_id: request
1566 .source_session_id
1567 .map(|session_id| scope_session_id(&tenant, &session_id)),
1568 parent_node_id: request.parent_node_id,
1569 persist: request.persist,
1570 limit: if request.limit <= 0 {
1571 5000
1572 } else {
1573 request.limit as usize
1574 },
1575 };
1576
1577 let result = self
1578 .state
1579 .monthly_rollup
1580 .create_async(monthly_request)
1581 .await;
1582
1583 let reply = proto::CreateMonthlyRollupReply {
1584 success: result.success,
1585 node_id: result.node_id,
1586 raw_node: result.raw_node,
1587 error: result.error,
1588 source_nodes: clamp_usize_to_i32(result.source_nodes),
1589 parent_reference: result.parent_reference,
1590 user_average: Some(to_grpc_avec(result.user_average)),
1591 model_average: Some(to_grpc_avec(result.model_average)),
1592 compression_average: Some(to_grpc_avec(result.compression_average)),
1593 rho_range: Some(to_grpc_numeric_range(result.rho_range)),
1594 kappa_range: Some(to_grpc_numeric_range(result.kappa_range)),
1595 psi_range: Some(to_grpc_numeric_range(result.psi_range)),
1596 rho_bands: Some(to_grpc_confidence_bands(result.rho_bands)),
1597 kappa_bands: Some(to_grpc_confidence_bands(result.kappa_bands)),
1598 };
1599
1600 Ok(Response::new(reply))
1601 }
1602}
1603
1604fn to_grpc_avec(value: core_models::AvecState) -> proto::AvecState {
1605 proto::AvecState {
1606 stability: value.stability,
1607 friction: value.friction,
1608 logic: value.logic,
1609 autonomy: value.autonomy,
1610 psi: value.psi(),
1611 }
1612}
1613
1614fn to_grpc_node(value: &core_models::SttpNode) -> proto::SttpNode {
1615 proto::SttpNode {
1616 raw: value.raw.clone(),
1617 session_id: display_session_id(&value.session_id),
1618 tier: value.tier.clone(),
1619 timestamp: Some(timestamp_to_proto(value.timestamp)),
1620 compression_depth: value.compression_depth,
1621 parent_node_id: value.parent_node_id.clone(),
1622 user_avec: Some(to_grpc_avec(value.user_avec)),
1623 model_avec: Some(to_grpc_avec(value.model_avec)),
1624 compression_avec: value.compression_avec.map(to_grpc_avec),
1625 rho: value.rho,
1626 kappa: value.kappa,
1627 psi: value.psi,
1628 }
1629}
1630
1631fn to_grpc_psi_range(value: PsiRange) -> proto::PsiRange {
1632 proto::PsiRange {
1633 min: value.min,
1634 max: value.max,
1635 average: value.average,
1636 }
1637}
1638
1639fn to_grpc_numeric_range(value: NumericRange) -> proto::NumericRange {
1640 proto::NumericRange {
1641 min: value.min,
1642 max: value.max,
1643 average: value.average,
1644 }
1645}
1646
1647fn to_grpc_confidence_bands(value: ConfidenceBandSummary) -> proto::ConfidenceBandSummary {
1648 proto::ConfidenceBandSummary {
1649 low: clamp_usize_to_i32(value.low),
1650 medium: clamp_usize_to_i32(value.medium),
1651 high: clamp_usize_to_i32(value.high),
1652 }
1653}
1654
1655fn to_grpc_scope_rekey_result(value: core_models::ScopeRekeyResult) -> proto::ScopeRekeyResult {
1656 proto::ScopeRekeyResult {
1657 source_tenant_id: value.source_tenant_id,
1658 source_session_id: display_session_id(&value.source_session_id),
1659 target_tenant_id: value.target_tenant_id,
1660 target_session_id: display_session_id(&value.target_session_id),
1661 temporal_nodes: clamp_usize_to_i32(value.temporal_nodes),
1662 calibrations: clamp_usize_to_i32(value.calibrations),
1663 target_temporal_nodes: clamp_usize_to_i32(value.target_temporal_nodes),
1664 target_calibrations: clamp_usize_to_i32(value.target_calibrations),
1665 applied: value.applied,
1666 conflict: value.conflict,
1667 message: value.message,
1668 }
1669}
1670
1671fn to_grpc_batch_rekey_reply(value: core_models::BatchRekeyResult) -> proto::BatchRekeyReply {
1672 let updated_scopes = value.scopes.iter().filter(|scope| scope.applied).count();
1673 let conflict_scopes = value.scopes.iter().filter(|scope| scope.conflict).count();
1674
1675 proto::BatchRekeyReply {
1676 dry_run: value.dry_run,
1677 requested_node_ids: clamp_usize_to_i32(value.requested_node_ids),
1678 resolved_node_ids: clamp_usize_to_i32(value.resolved_node_ids),
1679 missing_node_ids: value.missing_node_ids,
1680 scopes: value
1681 .scopes
1682 .into_iter()
1683 .map(to_grpc_scope_rekey_result)
1684 .collect(),
1685 temporal_nodes_updated: clamp_usize_to_i32(value.temporal_nodes_updated),
1686 calibrations_updated: clamp_usize_to_i32(value.calibrations_updated),
1687 updated_scopes: clamp_usize_to_i32(updated_scopes),
1688 conflict_scopes: clamp_usize_to_i32(conflict_scopes),
1689 }
1690}
1691
1692fn timestamp_to_proto(value: DateTime<Utc>) -> prost_types::Timestamp {
1693 prost_types::Timestamp {
1694 seconds: value.timestamp(),
1695 nanos: value.timestamp_subsec_nanos() as i32,
1696 }
1697}
1698
1699fn timestamp_from_proto(value: Option<prost_types::Timestamp>) -> Result<DateTime<Utc>, Status> {
1700 let value = value.ok_or_else(|| Status::invalid_argument("missing timestamp"))?;
1701 DateTime::<Utc>::from_timestamp(value.seconds, value.nanos as u32)
1702 .ok_or_else(|| Status::invalid_argument("invalid timestamp"))
1703}
1704
1705fn timestamp_from_proto_optional(
1706 value: Option<prost_types::Timestamp>,
1707) -> Result<Option<DateTime<Utc>>, Status> {
1708 value
1709 .map(|timestamp| timestamp_from_proto(Some(timestamp)))
1710 .transpose()
1711}
1712
/// Normalizes a list of tier names supplied by a request.
///
/// Each entry is trimmed and ASCII-lowercased; entries that end up empty are
/// dropped. Returns `None` when no input was given or nothing survives,
/// otherwise the cleaned entries in their original order.
fn normalize_request_tiers(tiers: Option<&[String]>) -> Option<Vec<String>> {
    let mut cleaned = Vec::new();

    for raw in tiers.into_iter().flatten() {
        let tier = raw.trim().to_ascii_lowercase();
        if !tier.is_empty() {
            cleaned.push(tier);
        }
    }

    (!cleaned.is_empty()).then_some(cleaned)
}
1727
/// Narrows a `usize` to `i32`, saturating at `i32::MAX` instead of wrapping.
fn clamp_usize_to_i32(value: usize) -> i32 {
    i32::try_from(value).unwrap_or(i32::MAX)
}
1735
1736#[cfg(test)]
1737mod tests {
1738 use super::*;
1739 use crate::gateway::proto::sttp_gateway_service_server::SttpGatewayService;
1740 use crate::gateway_args::{EmbeddingsProviderKind, GatewayBackend};
1741 use crate::providers::parse_avec_state_from_text;
1742 use crate::tenant::session_belongs_to_tenant;
1743 use locus_core_rs::storage::{
1744 SurrealDbEndpointsSettings, SurrealDbRuntimeOptions, SurrealDbSettings,
1745 };
1746
    /// Builds the raw node text used as a fixture by the store/list/context tests.
    ///
    /// `session_id` is interpolated into both the `origin_session` and the
    /// `session_id` fields; every other value in the node is fixed. The doubled
    /// braces are `format!` escapes for literal `{` / `}`.
    fn sample_node(session_id: &str) -> String {
        format!(
            r#"
⊕⟨ {{ trigger: manual, response_format: temporal_node, origin_session: "{session_id}", compression_depth: 1, parent_node: null, prime: {{ attractor_config: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70 }}, context_summary: "gateway test", relevant_tier: raw, retrieval_budget: 3 }} }} ⟩
⦿⟨ {{ timestamp: "2026-03-05T06:30:00Z", tier: raw, session_id: "{session_id}", user_avec: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70, psi: 2.60 }}, model_avec: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70, psi: 2.60 }} }} ⟩
◈⟨ {{ test(.99): "gateway parser check" }} ⟩
⍉⟨ {{ rho: 0.96, kappa: 0.94, psi: 2.60, compression_avec: {{ stability: 0.85, friction: 0.25, logic: 0.80, autonomy: 0.70, psi: 2.60 }} }} ⟩
"#
        )
    }
1757
1758 fn surreal_test_args() -> GatewayArgs {
1759 GatewayArgs {
1760 http_port: 8080,
1761 grpc_port: 8081,
1762 backend: GatewayBackend::Surreal,
1763 root_dir_name: ".locus-gateway".to_string(),
1764 remote: true,
1765 cors_enabled: true,
1766 cors_allowed_origins: "*".to_string(),
1767 surreal_embedded_endpoint: None,
1768 surreal_remote_endpoint: Some("ws://127.0.0.1:8000/rpc".to_string()),
1769 surreal_namespace: "entasis".to_string(),
1770 surreal_database: "locus_gateway".to_string(),
1771 surreal_user: "root".to_string(),
1772 surreal_password: "root".to_string(),
1773 embeddings_enabled: false,
1774 embeddings_provider: EmbeddingsProviderKind::Ollama,
1775 embeddings_endpoint: "http://127.0.0.1:11434/api/embeddings".to_string(),
1776 embeddings_model: "sttp-encoder".to_string(),
1777 embeddings_repo: "sentence-transformers/all-MiniLM-L6-v2".to_string(),
1778 avec_scoring_enabled: false,
1779 avec_scoring_endpoint: "http://127.0.0.1:11434/api/chat".to_string(),
1780 avec_scoring_model: "qwen2.5:0.5b".to_string(),
1781 }
1782 }
1783
1784 #[test]
1785 fn parse_avec_state_accepts_plain_json() {
1786 let value = parse_avec_state_from_text(
1787 r#"{"stability":0.8,"friction":0.2,"logic":0.9,"autonomy":0.7}"#,
1788 )
1789 .expect("plain JSON should parse");
1790
1791 assert!((value.stability - 0.8).abs() < f32::EPSILON);
1792 assert!((value.friction - 0.2).abs() < f32::EPSILON);
1793 assert!((value.logic - 0.9).abs() < f32::EPSILON);
1794 assert!((value.autonomy - 0.7).abs() < f32::EPSILON);
1795 }
1796
1797 #[test]
1798 fn parse_avec_state_extracts_embedded_json() {
1799 let value = parse_avec_state_from_text(
1800 "Here is the answer: {\"stability\":1.1,\"friction\":-0.1,\"logic\":0.5,\"autonomy\":0.6}",
1801 )
1802 .expect("embedded JSON should parse");
1803
1804 assert!((value.stability - 1.0).abs() < f32::EPSILON);
1805 assert!((value.friction - 0.0).abs() < f32::EPSILON);
1806 assert!((value.logic - 0.5).abs() < f32::EPSILON);
1807 assert!((value.autonomy - 0.6).abs() < f32::EPSILON);
1808 }
1809
1810 #[tokio::test]
1811 async fn score_avec_returns_bad_request_when_disabled() {
1812 let state = Arc::new(build_in_memory_state().await.expect("state should build"));
1813
1814 let err = score_avec_handler(
1815 State(state),
1816 HeaderMap::new(),
1817 Json(ScoreAvecHttpRequest {
1818 text: "hello world".to_string(),
1819 tenant_id: None,
1820 }),
1821 )
1822 .await
1823 .expect_err("scoring should fail when disabled");
1824
1825 assert_eq!(err.0, StatusCode::BAD_REQUEST);
1826 }
1827
1828 #[test]
1829 fn surreal_runtime_options_are_derived_from_gateway_args() {
1830 let args = surreal_test_args();
1831 let mut settings = SurrealDbSettings::default();
1832 settings.endpoints = SurrealDbEndpointsSettings {
1833 embedded: args.surreal_embedded_endpoint.clone(),
1834 remote: args.surreal_remote_endpoint.clone(),
1835 };
1836 settings.namespace = args.surreal_namespace.clone();
1837 settings.database = args.surreal_database.clone();
1838
1839 let runtime_args = vec!["--remote".to_string()];
1840 let runtime = SurrealDbRuntimeOptions::from_args(
1841 &runtime_args,
1842 &settings,
1843 Some(args.root_dir_name.as_str()),
1844 )
1845 .expect("runtime options should be computed");
1846
1847 assert!(runtime.use_remote);
1848 assert_eq!(runtime.endpoint, "ws://127.0.0.1:8000/rpc");
1849 assert_eq!(runtime.namespace, "entasis");
1850 assert_eq!(runtime.database, "locus_gateway");
1851 }
1852
1853 #[test]
1854 fn tenant_scoping_is_applied_only_for_non_default_tenant() {
1855 let scoped = scope_session_id("acme", "session-1");
1856 assert_eq!(scoped, "tenant:acme::session:session-1");
1857 assert!(session_belongs_to_tenant(&scoped, "acme"));
1858 assert!(!session_belongs_to_tenant(&scoped, "default"));
1859 assert_eq!(display_session_id(&scoped), "session-1");
1860
1861 let legacy = scope_session_id("default", "session-1");
1862 assert_eq!(legacy, "session-1");
1863 assert!(session_belongs_to_tenant(&legacy, "default"));
1864 }
1865
1866 #[tokio::test]
1867 async fn http_calibrate_defaults_trigger_to_manual() {
1868 let state = Arc::new(build_in_memory_state().await.expect("state should build"));
1869
1870 let request = CalibrateSessionHttpRequest {
1871 session_id: "http-calibrate-session".to_string(),
1872 tenant_id: None,
1873 stability: 0.8,
1874 friction: 0.2,
1875 logic: 0.8,
1876 autonomy: 0.7,
1877 trigger: None,
1878 };
1879
1880 let Json(reply) = calibrate_handler(State(state), HeaderMap::new(), Json(request))
1881 .await
1882 .expect("calibrate should succeed");
1883
1884 assert_eq!(reply.trigger, "manual");
1885 assert!(reply.is_first_calibration);
1886 }
1887
1888 #[tokio::test]
1889 async fn http_store_then_get_context_roundtrip() {
1890 let state = Arc::new(build_in_memory_state().await.expect("state should build"));
1891 let session_id = "http-store-session";
1892
1893 let Json(store_reply) = store_context_handler(
1894 State(state.clone()),
1895 HeaderMap::new(),
1896 Json(StoreContextHttpRequest {
1897 node: sample_node(session_id),
1898 session_id: session_id.to_string(),
1899 tenant_id: None,
1900 }),
1901 )
1902 .await
1903 .expect("store should succeed");
1904
1905 assert!(store_reply.valid);
1906 assert!(!store_reply.node_id.is_empty());
1907
1908 let Json(context_reply) = get_context_handler(
1909 State(state),
1910 HeaderMap::new(),
1911 Json(GetContextHttpRequest {
1912 session_id: session_id.to_string(),
1913 tenant_id: None,
1914 stability: 0.85,
1915 friction: 0.25,
1916 logic: 0.80,
1917 autonomy: 0.70,
1918 limit: Some(5),
1919 from_utc: None,
1920 to_utc: None,
1921 tiers: None,
1922 query_text: Some("gateway parser check".to_string()),
1923 query_embedding: None,
1924 alpha: None,
1925 beta: None,
1926 }),
1927 )
1928 .await
1929 .expect("get_context should succeed");
1930
1931 assert!(context_reply.retrieved >= 1);
1932 assert!(
1933 context_reply
1934 .nodes
1935 .iter()
1936 .any(|node| node.session_id == session_id)
1937 );
1938 }
1939
1940 #[tokio::test]
1941 async fn http_embedding_context_roundtrip_with_direct_embeddings() {
1942 let state = Arc::new(build_in_memory_state().await.expect("state should build"));
1943 let session_id = "http-embedding-context-session";
1944
1945 let Json(store_reply) = store_context_handler(
1946 State(state.clone()),
1947 HeaderMap::new(),
1948 Json(StoreContextHttpRequest {
1949 node: sample_node(session_id),
1950 session_id: session_id.to_string(),
1951 tenant_id: None,
1952 }),
1953 )
1954 .await
1955 .expect("store should succeed");
1956
1957 assert!(store_reply.valid);
1958
1959 let Json(context_reply) = get_embedding_context_handler(
1960 State(state),
1961 HeaderMap::new(),
1962 Json(GetEmbeddingContextHttpRequest {
1963 session_id: session_id.to_string(),
1964 tenant_id: None,
1965 stability: 0.85,
1966 friction: 0.25,
1967 logic: 0.80,
1968 autonomy: 0.70,
1969 limit: Some(5),
1970 from_utc: None,
1971 to_utc: None,
1972 tiers: None,
1973 rag_query_text: None,
1974 rag_embedding: Some(vec![0.1, 0.2, 0.3]),
1975 avec_query_text: None,
1976 avec_embedding: Some(vec![0.2, 0.2, 0.2]),
1977 rag_weight: Some(0.7),
1978 avec_weight: Some(0.3),
1979 alpha: Some(0.65),
1980 beta: Some(0.35),
1981 }),
1982 )
1983 .await
1984 .expect("embedding context should succeed");
1985
1986 assert!(context_reply.retrieved >= 1);
1987 }
1988
1989 #[tokio::test]
1990 async fn http_embedding_context_rejects_mismatched_embedding_dimensions() {
1991 let state = Arc::new(build_in_memory_state().await.expect("state should build"));
1992
1993 let err = get_embedding_context_handler(
1994 State(state),
1995 HeaderMap::new(),
1996 Json(GetEmbeddingContextHttpRequest {
1997 session_id: "session".to_string(),
1998 tenant_id: None,
1999 stability: 0.8,
2000 friction: 0.2,
2001 logic: 0.8,
2002 autonomy: 0.7,
2003 limit: Some(5),
2004 from_utc: None,
2005 to_utc: None,
2006 tiers: None,
2007 rag_query_text: None,
2008 rag_embedding: Some(vec![0.1, 0.2, 0.3]),
2009 avec_query_text: None,
2010 avec_embedding: Some(vec![0.3, 0.2]),
2011 rag_weight: Some(0.7),
2012 avec_weight: Some(0.3),
2013 alpha: None,
2014 beta: None,
2015 }),
2016 )
2017 .await
2018 .expect_err("mismatched dimensions should fail");
2019
2020 assert_eq!(err.0, StatusCode::BAD_REQUEST);
2021 }
2022
2023 #[tokio::test]
2024 async fn grpc_service_roundtrip_for_calibrate_store_and_list() {
2025 let state = Arc::new(build_in_memory_state().await.expect("state should build"));
2026 let service = GrpcGatewayService::new(state);
2027 let session_id = "grpc-store-session";
2028
2029 let calibrate_reply = service
2030 .calibrate_session(Request::new(proto::CalibrateSessionRequest {
2031 session_id: session_id.to_string(),
2032 stability: 0.8,
2033 friction: 0.2,
2034 logic: 0.8,
2035 autonomy: 0.7,
2036 trigger: String::new(),
2037 }))
2038 .await
2039 .expect("gRPC calibrate should succeed")
2040 .into_inner();
2041
2042 assert_eq!(calibrate_reply.trigger, "manual");
2043
2044 let store_reply = service
2045 .store_context(Request::new(proto::StoreContextRequest {
2046 node: sample_node(session_id),
2047 session_id: session_id.to_string(),
2048 }))
2049 .await
2050 .expect("gRPC store should succeed")
2051 .into_inner();
2052
2053 assert!(store_reply.valid);
2054
2055 let rekey_reply = service
2056 .batch_rekey(Request::new(proto::BatchRekeyRequest {
2057 node_ids: vec![store_reply.node_id.clone()],
2058 target_session_id: session_id.to_string(),
2059 target_tenant_id: Some("default".to_string()),
2060 dry_run: Some(true),
2061 allow_merge: Some(false),
2062 }))
2063 .await
2064 .expect("gRPC batch_rekey should succeed")
2065 .into_inner();
2066
2067 assert_eq!(rekey_reply.requested_node_ids, 1);
2068 assert_eq!(rekey_reply.resolved_node_ids, 1);
2069 assert!(rekey_reply.scopes.iter().any(|scope| !scope.applied));
2070
2071 let list_reply = service
2072 .list_nodes(Request::new(proto::ListNodesRequest {
2073 limit: 50,
2074 session_id: Some(session_id.to_string()),
2075 }))
2076 .await
2077 .expect("gRPC list_nodes should succeed")
2078 .into_inner();
2079
2080 assert!(list_reply.retrieved >= 1);
2081 assert!(
2082 list_reply
2083 .nodes
2084 .iter()
2085 .any(|node| node.session_id == session_id)
2086 );
2087
2088 let context_reply = service
2089 .get_context(Request::new(proto::GetContextRequest {
2090 session_id: session_id.to_string(),
2091 stability: 0.8,
2092 friction: 0.2,
2093 logic: 0.8,
2094 autonomy: 0.7,
2095 limit: 5,
2096 query_text: None,
2097 query_embedding: vec![0.1, 0.2, 0.3],
2098 alpha: Some(0.7),
2099 beta: Some(0.3),
2100 from_utc: None,
2101 to_utc: None,
2102 tiers: Vec::new(),
2103 }))
2104 .await
2105 .expect("gRPC get_context should succeed")
2106 .into_inner();
2107
2108 assert!(context_reply.retrieved >= 1);
2109
2110 let embedding_context_reply = service
2111 .get_embedding_context(Request::new(proto::GetEmbeddingContextRequest {
2112 session_id: session_id.to_string(),
2113 stability: 0.8,
2114 friction: 0.2,
2115 logic: 0.8,
2116 autonomy: 0.7,
2117 limit: 5,
2118 rag_query_text: None,
2119 rag_embedding: vec![0.1, 0.2, 0.3],
2120 avec_query_text: None,
2121 avec_embedding: vec![0.2, 0.2, 0.2],
2122 rag_weight: Some(0.7),
2123 avec_weight: Some(0.3),
2124 alpha: Some(0.7),
2125 beta: Some(0.3),
2126 from_utc: None,
2127 to_utc: None,
2128 tiers: Vec::new(),
2129 }))
2130 .await
2131 .expect("gRPC get_embedding_context should succeed")
2132 .into_inner();
2133
2134 assert!(embedding_context_reply.retrieved >= 1);
2135
2136 let err = service
2137 .get_embedding_context(Request::new(proto::GetEmbeddingContextRequest {
2138 session_id: session_id.to_string(),
2139 stability: 0.8,
2140 friction: 0.2,
2141 logic: 0.8,
2142 autonomy: 0.7,
2143 limit: 5,
2144 rag_query_text: None,
2145 rag_embedding: vec![0.1, 0.2, 0.3],
2146 avec_query_text: None,
2147 avec_embedding: vec![0.2, 0.2],
2148 rag_weight: Some(0.7),
2149 avec_weight: Some(0.3),
2150 alpha: None,
2151 beta: None,
2152 from_utc: None,
2153 to_utc: None,
2154 tiers: Vec::new(),
2155 }))
2156 .await
2157 .expect_err("gRPC get_embedding_context should reject invalid dimensions");
2158
2159 assert_eq!(err.code(), Status::invalid_argument("x").code());
2160 }
2161}