opendal/services/sqlite/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::str::FromStr;
19use std::sync::Arc;
20
21use sqlx::sqlite::SqliteConnectOptions;
22use tokio::sync::OnceCell;
23
24use super::SQLITE_SCHEME;
25use super::config::SqliteConfig;
26use super::core::SqliteCore;
27use super::deleter::SqliteDeleter;
28use super::writer::SqliteWriter;
29use crate::raw::oio;
30use crate::raw::*;
31use crate::*;
32
33#[doc = include_str!("docs.md")]
34#[derive(Debug, Default)]
35pub struct SqliteBuilder {
36    pub(super) config: SqliteConfig,
37}
38
39impl SqliteBuilder {
40    /// Set the connection_string of the sqlite service.
41    ///
42    /// This connection string is used to connect to the sqlite service. There are url based formats:
43    ///
44    /// ## Url
45    ///
46    /// This format resembles the url format of the sqlite client:
47    ///
48    /// - `sqlite::memory:`
49    /// - `sqlite:data.db`
50    /// - `sqlite://data.db`
51    ///
52    /// For more information, please visit <https://docs.rs/sqlx/latest/sqlx/sqlite/struct.SqliteConnectOptions.html>.
53    pub fn connection_string(mut self, v: &str) -> Self {
54        if !v.is_empty() {
55            self.config.connection_string = Some(v.to_string());
56        }
57        self
58    }
59
60    /// set the working directory, all operations will be performed under it.
61    ///
62    /// default: "/"
63    pub fn root(mut self, root: &str) -> Self {
64        self.config.root = if root.is_empty() {
65            None
66        } else {
67            Some(root.to_string())
68        };
69
70        self
71    }
72
73    /// Set the table name of the sqlite service to read/write.
74    pub fn table(mut self, table: &str) -> Self {
75        if !table.is_empty() {
76            self.config.table = Some(table.to_string());
77        }
78        self
79    }
80
81    /// Set the key field name of the sqlite service to read/write.
82    ///
83    /// Default to `key` if not specified.
84    pub fn key_field(mut self, key_field: &str) -> Self {
85        if !key_field.is_empty() {
86            self.config.key_field = Some(key_field.to_string());
87        }
88        self
89    }
90
91    /// Set the value field name of the sqlite service to read/write.
92    ///
93    /// Default to `value` if not specified.
94    pub fn value_field(mut self, value_field: &str) -> Self {
95        if !value_field.is_empty() {
96            self.config.value_field = Some(value_field.to_string());
97        }
98        self
99    }
100}
101
102impl Builder for SqliteBuilder {
103    type Config = SqliteConfig;
104
105    fn build(self) -> Result<impl Access> {
106        let conn = match self.config.connection_string {
107            Some(v) => v,
108            None => {
109                return Err(Error::new(
110                    ErrorKind::ConfigInvalid,
111                    "connection_string is required but not set",
112                )
113                .with_context("service", SQLITE_SCHEME));
114            }
115        };
116
117        let config = SqliteConnectOptions::from_str(&conn).map_err(|err| {
118            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
119                .with_context("service", SQLITE_SCHEME)
120                .set_source(err)
121        })?;
122
123        let table = match self.config.table {
124            Some(v) => v,
125            None => {
126                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
127                    .with_context("service", SQLITE_SCHEME));
128            }
129        };
130
131        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
132
133        let value_field = self
134            .config
135            .value_field
136            .unwrap_or_else(|| "value".to_string());
137
138        let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
139
140        Ok(SqliteBackend::new(SqliteCore {
141            pool: OnceCell::new(),
142            config,
143            table,
144            key_field,
145            value_field,
146        })
147        .with_normalized_root(root))
148    }
149}
150
151pub fn parse_sqlite_error(err: sqlx::Error) -> Error {
152    let is_temporary = matches!(
153        &err,
154        sqlx::Error::Database(db_err) if db_err.code().is_some_and(|c| c == "5" || c == "6")
155    );
156
157    let message = if is_temporary {
158        "database is locked or busy"
159    } else {
160        "unhandled error from sqlite"
161    };
162
163    let mut error = Error::new(ErrorKind::Unexpected, message).set_source(err);
164    if is_temporary {
165        error = error.set_temporary();
166    }
167    error
168}
169
170/// SqliteBackend implements Access trait directly
171#[derive(Debug, Clone)]
172pub struct SqliteBackend {
173    core: Arc<SqliteCore>,
174    root: String,
175    info: Arc<AccessorInfo>,
176}
177
178impl SqliteBackend {
179    fn new(core: SqliteCore) -> Self {
180        let info = AccessorInfo::default();
181        info.set_scheme(SQLITE_SCHEME);
182        info.set_name(&core.table);
183        info.set_root("/");
184        info.set_native_capability(Capability {
185            read: true,
186            write: true,
187            create_dir: true,
188            delete: true,
189            stat: true,
190            write_can_empty: true,
191            list: false,
192            shared: false,
193            ..Default::default()
194        });
195
196        Self {
197            core: Arc::new(core),
198            root: "/".to_string(),
199            info: Arc::new(info),
200        }
201    }
202
203    fn with_normalized_root(mut self, root: String) -> Self {
204        self.info.set_root(&root);
205        self.root = root;
206        self
207    }
208}
209
210impl Access for SqliteBackend {
211    type Reader = Buffer;
212    type Writer = SqliteWriter;
213    type Lister = ();
214    type Deleter = oio::OneShotDeleter<SqliteDeleter>;
215
216    fn info(&self) -> Arc<AccessorInfo> {
217        self.info.clone()
218    }
219
220    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
221        let p = build_abs_path(&self.root, path);
222
223        if p == build_abs_path(&self.root, "") {
224            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
225        } else {
226            let bs = self.core.get(&p).await?;
227            match bs {
228                Some(bs) => Ok(RpStat::new(
229                    Metadata::new(EntryMode::from_path(&p)).with_content_length(bs.len() as u64),
230                )),
231                None => {
232                    // Check if this might be a directory by looking for keys with this prefix
233                    let dir_path = if p.ends_with('/') {
234                        p.clone()
235                    } else {
236                        format!("{}/", p)
237                    };
238                    let count: i64 = sqlx::query_scalar(&format!(
239                        "SELECT COUNT(*) FROM `{}` WHERE `{}` LIKE $1 LIMIT 1",
240                        self.core.table, self.core.key_field
241                    ))
242                    .bind(format!("{}%", dir_path))
243                    .fetch_one(self.core.get_client().await?)
244                    .await
245                    .map_err(crate::services::sqlite::backend::parse_sqlite_error)?;
246
247                    if count > 0 {
248                        // Directory exists (has children)
249                        Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
250                    } else {
251                        Err(Error::new(ErrorKind::NotFound, "key not found in sqlite"))
252                    }
253                }
254            }
255        }
256    }
257
258    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
259        let p = build_abs_path(&self.root, path);
260
261        let range = args.range();
262        let buffer = if range.is_full() {
263            // Full read - use GET
264            match self.core.get(&p).await? {
265                Some(bs) => bs,
266                None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
267            }
268        } else {
269            // Range read - use GETRANGE
270            let start = range.offset() as isize;
271            let limit = match range.size() {
272                Some(size) => size as isize,
273                None => -1, // Sqlite uses -1 for end of string
274            };
275
276            match self.core.get_range(&p, start, limit).await? {
277                Some(bs) => bs,
278                None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
279            }
280        };
281
282        Ok((RpRead::new(), buffer))
283    }
284
285    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
286        let p = build_abs_path(&self.root, path);
287        Ok((RpWrite::new(), SqliteWriter::new(self.core.clone(), &p)))
288    }
289
290    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
291        Ok((
292            RpDelete::default(),
293            oio::OneShotDeleter::new(SqliteDeleter::new(self.core.clone(), self.root.clone())),
294        ))
295    }
296
297    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
298        let p = build_abs_path(&self.root, path);
299
300        // Ensure path ends with '/' for directory marker
301        let dir_path = if p.ends_with('/') {
302            p
303        } else {
304            format!("{}/", p)
305        };
306
307        // Store directory marker with empty content
308        self.core.set(&dir_path, Buffer::new()).await?;
309
310        Ok(RpCreateDir::default())
311    }
312}
313
314#[cfg(test)]
315mod test {
316    use super::*;
317    use sqlx::SqlitePool;
318
319    async fn build_client() -> OnceCell<SqlitePool> {
320        let config = SqliteConnectOptions::from_str("sqlite::memory:").unwrap();
321        let pool = SqlitePool::connect_with(config).await.unwrap();
322        OnceCell::new_with(Some(pool))
323    }
324
325    #[tokio::test]
326    async fn test_sqlite_accessor_creation() {
327        let core = SqliteCore {
328            pool: build_client().await,
329            config: Default::default(),
330            table: "test".to_string(),
331            key_field: "key".to_string(),
332            value_field: "value".to_string(),
333        };
334
335        let accessor = SqliteBackend::new(core);
336
337        // Verify basic properties
338        assert_eq!(accessor.root, "/");
339        assert_eq!(accessor.info.scheme(), SQLITE_SCHEME);
340        assert!(accessor.info.native_capability().read);
341        assert!(accessor.info.native_capability().write);
342        assert!(accessor.info.native_capability().delete);
343        assert!(accessor.info.native_capability().stat);
344    }
345
346    #[tokio::test]
347    async fn test_sqlite_accessor_with_root() {
348        let core = SqliteCore {
349            pool: build_client().await,
350            config: Default::default(),
351            table: "test".to_string(),
352            key_field: "key".to_string(),
353            value_field: "value".to_string(),
354        };
355
356        let accessor = SqliteBackend::new(core).with_normalized_root("/test/".to_string());
357
358        assert_eq!(accessor.root, "/test/");
359        assert_eq!(accessor.info.root(), "/test/".into());
360    }
361}