locus_gateway/
surreal_client.rs1use anyhow::{Context, Result};
2use serde_json::Value;
3use locus_core_rs::storage::surrealdb::QueryParams;
4use locus_core_rs::storage::{SurrealDbClient, SurrealDbRuntimeOptions};
5use surrealdb::engine::any::{Any, connect};
6use surrealdb::opt::auth::Root;
7use tracing::{debug, error};
8
9pub struct RuntimeSurrealDbClient {
10 db: surrealdb::Surreal<Any>,
11}
12
13impl RuntimeSurrealDbClient {
14 pub async fn connect(
15 runtime: &SurrealDbRuntimeOptions,
16 user: Option<&str>,
17 password: Option<&str>,
18 ) -> Result<Self> {
19 let db = connect(runtime.endpoint.as_str()).await.with_context(|| {
20 format!(
21 "failed to connect to SurrealDB endpoint '{}'",
22 runtime.endpoint
23 )
24 })?;
25
26 if runtime.use_remote {
27 let username = user.filter(|v| !v.trim().is_empty()).unwrap_or("root");
28 let password = password.filter(|v| !v.trim().is_empty()).unwrap_or("root");
29
30 db.signin(Root {
31 username: username.to_string(),
32 password: password.to_string(),
33 })
34 .await
35 .context("failed to authenticate against remote SurrealDB")?;
36 } else if let (Some(username), Some(password)) = (
37 user.filter(|v| !v.trim().is_empty()),
38 password.filter(|v| !v.trim().is_empty()),
39 ) {
40 let _ = db
41 .signin(Root {
42 username: username.to_string(),
43 password: password.to_string(),
44 })
45 .await;
46 }
47
48 db.use_ns(runtime.namespace.as_str())
49 .use_db(runtime.database.as_str())
50 .await
51 .with_context(|| {
52 format!(
53 "failed to select namespace '{}' and database '{}'",
54 runtime.namespace, runtime.database
55 )
56 })?;
57
58 Ok(Self { db })
59 }
60
/// Reports whether `query` starts with the keyword `SELECT`.
///
/// Leading whitespace is skipped and ASCII case is ignored. Only the prefix
/// is inspected: nothing after the first six bytes influences the result, so
/// statements other than `SELECT` are never classified as reads.
fn is_read_query(query: &str) -> bool {
    const KEYWORD: &[u8] = b"SELECT";

    // Compare just the leading bytes instead of upper-casing a copy of the
    // entire query text.
    query
        .trim_start()
        .as_bytes()
        .get(..KEYWORD.len())
        .is_some_and(|head| head.eq_ignore_ascii_case(KEYWORD))
}
67}
68
69#[tonic::async_trait]
70impl SurrealDbClient for RuntimeSurrealDbClient {
71 async fn raw_query(&self, query: &str, parameters: QueryParams) -> Result<Vec<Value>> {
72 let operation = query
73 .split_whitespace()
74 .next()
75 .unwrap_or("UNKNOWN")
76 .to_ascii_uppercase();
77 let is_read_query = Self::is_read_query(query);
78 let has_parameters = !parameters.is_empty();
79
80 let response = if parameters.is_empty() {
81 self.db.query(query).await?
82 } else {
83 self.db.query(query).bind(parameters).await?
84 };
85
86 let mut response = match response.check() {
87 Ok(value) => value,
88 Err(err) => {
89 error!(
90 operation = %operation,
91 read_query = is_read_query,
92 has_parameters,
93 error = %err,
94 "Surreal query failed"
95 );
96 return Err(err.into());
97 }
98 };
99
100 debug!(
101 operation = %operation,
102 read_query = is_read_query,
103 has_parameters,
104 "Surreal query succeeded"
105 );
106
107 if !is_read_query {
108 return Ok(Vec::new());
109 }
110
111 if let Ok(rows) = response.take::<Vec<Value>>(0) {
112 debug!(operation = %operation, row_count = rows.len(), "Surreal read query returned rows");
113 return Ok(rows);
114 }
115
116 if let Ok(Some(row)) = response.take::<Option<Value>>(0) {
117 debug!(operation = %operation, row_count = 1, "Surreal read query returned a single row");
118 return Ok(vec![row]);
119 }
120
121 debug!(operation = %operation, row_count = 0, "Surreal read query returned no rows");
122
123 Ok(Vec::new())
124 }
125}