opendal/services/sqlite/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::task::Context;
23use std::task::Poll;
24
25use futures::Stream;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use ouroboros::self_referencing;
29use sqlx::SqlitePool;
30use sqlx::sqlite::SqliteConnectOptions;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::SqliteConfig;
36use crate::*;
37
38#[doc = include_str!("docs.md")]
39#[derive(Default)]
40pub struct SqliteBuilder {
41 pub(super) config: SqliteConfig,
42}
43
44impl Debug for SqliteBuilder {
45 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46 let mut ds = f.debug_struct("SqliteBuilder");
47
48 ds.field("config", &self.config);
49 ds.finish()
50 }
51}
52
53impl SqliteBuilder {
54 pub fn connection_string(mut self, v: &str) -> Self {
68 if !v.is_empty() {
69 self.config.connection_string = Some(v.to_string());
70 }
71 self
72 }
73
74 pub fn root(mut self, root: &str) -> Self {
78 self.config.root = if root.is_empty() {
79 None
80 } else {
81 Some(root.to_string())
82 };
83
84 self
85 }
86
87 pub fn table(mut self, table: &str) -> Self {
89 if !table.is_empty() {
90 self.config.table = Some(table.to_string());
91 }
92 self
93 }
94
95 pub fn key_field(mut self, key_field: &str) -> Self {
99 if !key_field.is_empty() {
100 self.config.key_field = Some(key_field.to_string());
101 }
102 self
103 }
104
105 pub fn value_field(mut self, value_field: &str) -> Self {
109 if !value_field.is_empty() {
110 self.config.value_field = Some(value_field.to_string());
111 }
112 self
113 }
114}
115
116impl Builder for SqliteBuilder {
117 type Config = SqliteConfig;
118
119 fn build(self) -> Result<impl Access> {
120 let conn = match self.config.connection_string {
121 Some(v) => v,
122 None => {
123 return Err(Error::new(
124 ErrorKind::ConfigInvalid,
125 "connection_string is required but not set",
126 )
127 .with_context("service", Scheme::Sqlite));
128 }
129 };
130
131 let config = SqliteConnectOptions::from_str(&conn).map_err(|err| {
132 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
133 .with_context("service", Scheme::Sqlite)
134 .set_source(err)
135 })?;
136
137 let table = match self.config.table {
138 Some(v) => v,
139 None => {
140 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
141 .with_context("service", Scheme::Sqlite));
142 }
143 };
144
145 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
146
147 let value_field = self
148 .config
149 .value_field
150 .unwrap_or_else(|| "value".to_string());
151
152 let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
153
154 Ok(SqliteBackend::new(Adapter {
155 pool: OnceCell::new(),
156 config,
157 table,
158 key_field,
159 value_field,
160 })
161 .with_normalized_root(root))
162 }
163}
164
165pub type SqliteBackend = kv::Backend<Adapter>;
166
167#[derive(Debug, Clone)]
168pub struct Adapter {
169 pool: OnceCell<SqlitePool>,
170 config: SqliteConnectOptions,
171
172 table: String,
173 key_field: String,
174 value_field: String,
175}
176
177impl Adapter {
178 async fn get_client(&self) -> Result<&SqlitePool> {
179 self.pool
180 .get_or_try_init(|| async {
181 let pool = SqlitePool::connect_with(self.config.clone())
182 .await
183 .map_err(parse_sqlite_error)?;
184 Ok(pool)
185 })
186 .await
187 }
188}
189
190#[self_referencing]
191pub struct SqliteScanner {
192 pool: SqlitePool,
193 query: String,
194
195 #[borrows(pool, query)]
196 #[covariant]
197 stream: BoxStream<'this, Result<String>>,
198}
199
200impl Stream for SqliteScanner {
201 type Item = Result<String>;
202
203 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204 self.with_stream_mut(|s| s.poll_next_unpin(cx))
205 }
206}
207
208unsafe impl Sync for SqliteScanner {}
209
210impl kv::Scan for SqliteScanner {
211 async fn next(&mut self) -> Result<Option<String>> {
212 <Self as StreamExt>::next(self).await.transpose()
213 }
214}
215
216impl kv::Adapter for Adapter {
217 type Scanner = SqliteScanner;
218
219 fn info(&self) -> kv::Info {
220 kv::Info::new(
221 Scheme::Sqlite,
222 &self.table,
223 Capability {
224 read: true,
225 write: true,
226 delete: true,
227 list: true,
228 shared: false,
229 ..Default::default()
230 },
231 )
232 }
233
234 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
235 let pool = self.get_client().await?;
236
237 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
238 "SELECT `{}` FROM `{}` WHERE `{}` = $1 LIMIT 1",
239 self.value_field, self.table, self.key_field
240 ))
241 .bind(path)
242 .fetch_optional(pool)
243 .await
244 .map_err(parse_sqlite_error)?;
245
246 Ok(value.map(Buffer::from))
247 }
248
249 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
250 let pool = self.get_client().await?;
251
252 sqlx::query(&format!(
253 "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
254 self.table, self.key_field, self.value_field,
255 ))
256 .bind(path)
257 .bind(value.to_vec())
258 .execute(pool)
259 .await
260 .map_err(parse_sqlite_error)?;
261
262 Ok(())
263 }
264
265 async fn delete(&self, path: &str) -> Result<()> {
266 let pool = self.get_client().await?;
267
268 sqlx::query(&format!(
269 "DELETE FROM `{}` WHERE `{}` = $1",
270 self.table, self.key_field
271 ))
272 .bind(path)
273 .execute(pool)
274 .await
275 .map_err(parse_sqlite_error)?;
276
277 Ok(())
278 }
279
280 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
281 let pool = self.get_client().await?;
282 let stream = SqliteScannerBuilder {
283 pool: pool.clone(),
284 query: format!(
285 "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1",
286 self.key_field, self.table, self.key_field
287 ),
288 stream_builder: |pool, query| {
289 sqlx::query_scalar(query)
290 .bind(format!("{path}%"))
291 .fetch(pool)
292 .map(|v| v.map_err(parse_sqlite_error))
293 .boxed()
294 },
295 }
296 .build();
297
298 Ok(stream)
299 }
300}
301
302fn parse_sqlite_error(err: sqlx::Error) -> Error {
303 let is_temporary = matches!(
304 &err,
305 sqlx::Error::Database(db_err) if db_err.code().is_some_and(|c| c == "5" || c == "6")
306 );
307
308 let message = if is_temporary {
309 "database is locked or busy"
310 } else {
311 "unhandled error from sqlite"
312 };
313
314 let mut error = Error::new(ErrorKind::Unexpected, message).set_source(err);
315 if is_temporary {
316 error = error.set_temporary();
317 }
318 error
319}