1use std::str::FromStr;
19use std::sync::Arc;
20
21use sqlx::sqlite::SqliteConnectOptions;
22use tokio::sync::OnceCell;
23
24use super::SQLITE_SCHEME;
25use super::config::SqliteConfig;
26use super::core::SqliteCore;
27use super::deleter::SqliteDeleter;
28use super::writer::SqliteWriter;
29use crate::raw::oio;
30use crate::raw::*;
31use crate::*;
32
33#[doc = include_str!("docs.md")]
34#[derive(Debug, Default)]
35pub struct SqliteBuilder {
36 pub(super) config: SqliteConfig,
37}
38
39impl SqliteBuilder {
40 pub fn connection_string(mut self, v: &str) -> Self {
54 if !v.is_empty() {
55 self.config.connection_string = Some(v.to_string());
56 }
57 self
58 }
59
60 pub fn root(mut self, root: &str) -> Self {
64 self.config.root = if root.is_empty() {
65 None
66 } else {
67 Some(root.to_string())
68 };
69
70 self
71 }
72
73 pub fn table(mut self, table: &str) -> Self {
75 if !table.is_empty() {
76 self.config.table = Some(table.to_string());
77 }
78 self
79 }
80
81 pub fn key_field(mut self, key_field: &str) -> Self {
85 if !key_field.is_empty() {
86 self.config.key_field = Some(key_field.to_string());
87 }
88 self
89 }
90
91 pub fn value_field(mut self, value_field: &str) -> Self {
95 if !value_field.is_empty() {
96 self.config.value_field = Some(value_field.to_string());
97 }
98 self
99 }
100}
101
102impl Builder for SqliteBuilder {
103 type Config = SqliteConfig;
104
105 fn build(self) -> Result<impl Access> {
106 let conn = match self.config.connection_string {
107 Some(v) => v,
108 None => {
109 return Err(Error::new(
110 ErrorKind::ConfigInvalid,
111 "connection_string is required but not set",
112 )
113 .with_context("service", SQLITE_SCHEME));
114 }
115 };
116
117 let config = SqliteConnectOptions::from_str(&conn).map_err(|err| {
118 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
119 .with_context("service", SQLITE_SCHEME)
120 .set_source(err)
121 })?;
122
123 let table = match self.config.table {
124 Some(v) => v,
125 None => {
126 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
127 .with_context("service", SQLITE_SCHEME));
128 }
129 };
130
131 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
132
133 let value_field = self
134 .config
135 .value_field
136 .unwrap_or_else(|| "value".to_string());
137
138 let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
139
140 Ok(SqliteBackend::new(SqliteCore {
141 pool: OnceCell::new(),
142 config,
143 table,
144 key_field,
145 value_field,
146 })
147 .with_normalized_root(root))
148 }
149}
150
151pub fn parse_sqlite_error(err: sqlx::Error) -> Error {
152 let is_temporary = matches!(
153 &err,
154 sqlx::Error::Database(db_err) if db_err.code().is_some_and(|c| c == "5" || c == "6")
155 );
156
157 let message = if is_temporary {
158 "database is locked or busy"
159 } else {
160 "unhandled error from sqlite"
161 };
162
163 let mut error = Error::new(ErrorKind::Unexpected, message).set_source(err);
164 if is_temporary {
165 error = error.set_temporary();
166 }
167 error
168}
169
170#[derive(Debug, Clone)]
172pub struct SqliteBackend {
173 core: Arc<SqliteCore>,
174 root: String,
175 info: Arc<AccessorInfo>,
176}
177
178impl SqliteBackend {
179 fn new(core: SqliteCore) -> Self {
180 let info = AccessorInfo::default();
181 info.set_scheme(SQLITE_SCHEME);
182 info.set_name(&core.table);
183 info.set_root("/");
184 info.set_native_capability(Capability {
185 read: true,
186 write: true,
187 create_dir: true,
188 delete: true,
189 stat: true,
190 write_can_empty: true,
191 list: false,
192 shared: false,
193 ..Default::default()
194 });
195
196 Self {
197 core: Arc::new(core),
198 root: "/".to_string(),
199 info: Arc::new(info),
200 }
201 }
202
203 fn with_normalized_root(mut self, root: String) -> Self {
204 self.info.set_root(&root);
205 self.root = root;
206 self
207 }
208}
209
210impl Access for SqliteBackend {
211 type Reader = Buffer;
212 type Writer = SqliteWriter;
213 type Lister = ();
214 type Deleter = oio::OneShotDeleter<SqliteDeleter>;
215
216 fn info(&self) -> Arc<AccessorInfo> {
217 self.info.clone()
218 }
219
220 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
221 let p = build_abs_path(&self.root, path);
222
223 if p == build_abs_path(&self.root, "") {
224 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
225 } else {
226 let bs = self.core.get(&p).await?;
227 match bs {
228 Some(bs) => Ok(RpStat::new(
229 Metadata::new(EntryMode::from_path(&p)).with_content_length(bs.len() as u64),
230 )),
231 None => {
232 let dir_path = if p.ends_with('/') {
234 p.clone()
235 } else {
236 format!("{}/", p)
237 };
238 let count: i64 = sqlx::query_scalar(&format!(
239 "SELECT COUNT(*) FROM `{}` WHERE `{}` LIKE $1 LIMIT 1",
240 self.core.table, self.core.key_field
241 ))
242 .bind(format!("{}%", dir_path))
243 .fetch_one(self.core.get_client().await?)
244 .await
245 .map_err(crate::services::sqlite::backend::parse_sqlite_error)?;
246
247 if count > 0 {
248 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
250 } else {
251 Err(Error::new(ErrorKind::NotFound, "key not found in sqlite"))
252 }
253 }
254 }
255 }
256 }
257
258 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
259 let p = build_abs_path(&self.root, path);
260
261 let range = args.range();
262 let buffer = if range.is_full() {
263 match self.core.get(&p).await? {
265 Some(bs) => bs,
266 None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
267 }
268 } else {
269 let start = range.offset() as isize;
271 let limit = match range.size() {
272 Some(size) => size as isize,
273 None => -1, };
275
276 match self.core.get_range(&p, start, limit).await? {
277 Some(bs) => bs,
278 None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
279 }
280 };
281
282 Ok((RpRead::new(), buffer))
283 }
284
285 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
286 let p = build_abs_path(&self.root, path);
287 Ok((RpWrite::new(), SqliteWriter::new(self.core.clone(), &p)))
288 }
289
290 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
291 Ok((
292 RpDelete::default(),
293 oio::OneShotDeleter::new(SqliteDeleter::new(self.core.clone(), self.root.clone())),
294 ))
295 }
296
297 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
298 let p = build_abs_path(&self.root, path);
299
300 let dir_path = if p.ends_with('/') {
302 p
303 } else {
304 format!("{}/", p)
305 };
306
307 self.core.set(&dir_path, Buffer::new()).await?;
309
310 Ok(RpCreateDir::default())
311 }
312}
313
#[cfg(test)]
mod test {
    use super::*;
    use sqlx::SqlitePool;

    /// Open an in-memory SQLite pool and wrap it in an already-filled cell.
    async fn build_client() -> OnceCell<SqlitePool> {
        let options = SqliteConnectOptions::from_str("sqlite::memory:").unwrap();
        OnceCell::new_with(Some(SqlitePool::connect_with(options).await.unwrap()))
    }

    /// Build the core shared by the tests: table `test`, columns `key`/`value`.
    async fn build_core() -> SqliteCore {
        SqliteCore {
            pool: build_client().await,
            config: Default::default(),
            table: "test".to_string(),
            key_field: "key".to_string(),
            value_field: "value".to_string(),
        }
    }

    #[tokio::test]
    async fn test_sqlite_accessor_creation() {
        let accessor = SqliteBackend::new(build_core().await);

        // A freshly created backend is rooted at `/`.
        assert_eq!(accessor.root, "/");
        assert_eq!(accessor.info.scheme(), SQLITE_SCHEME);

        let capability = accessor.info.native_capability();
        assert!(capability.read);
        assert!(capability.write);
        assert!(capability.delete);
        assert!(capability.stat);
    }

    #[tokio::test]
    async fn test_sqlite_accessor_with_root() {
        let accessor =
            SqliteBackend::new(build_core().await).with_normalized_root("/test/".to_string());

        // Both the backend and its accessor info must see the new root.
        assert_eq!(accessor.root, "/test/");
        assert_eq!(accessor.info.root(), "/test/".into());
    }
}
361}