opendal/services/redb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use crate::Builder;
23use crate::Error;
24use crate::ErrorKind;
25use crate::Scheme;
26use crate::raw::adapters::kv;
27use crate::raw::*;
28use crate::services::RedbConfig;
29use crate::*;
30
31#[doc = include_str!("docs.md")]
33#[derive(Default, Debug)]
34pub struct RedbBuilder {
35 pub(super) config: RedbConfig,
36
37 pub(super) database: Option<Arc<redb::Database>>,
38}
39
40impl RedbBuilder {
41 pub fn database(mut self, db: Arc<redb::Database>) -> Self {
54 self.database = Some(db);
55 self
56 }
57
58 pub fn datadir(mut self, path: &str) -> Self {
72 self.config.datadir = Some(path.into());
73 self
74 }
75
76 pub fn table(mut self, table: &str) -> Self {
78 self.config.table = Some(table.into());
79 self
80 }
81
82 pub fn root(mut self, path: &str) -> Self {
84 self.config.root = Some(path.into());
85 self
86 }
87}
88
89impl Builder for RedbBuilder {
90 type Config = RedbConfig;
91
92 fn build(self) -> Result<impl Access> {
93 let table_name = self.config.table.ok_or_else(|| {
94 Error::new(ErrorKind::ConfigInvalid, "table is required but not set")
95 .with_context("service", Scheme::Redb)
96 })?;
97
98 let (datadir, db) = if let Some(db) = self.database {
99 (None, db)
100 } else {
101 let datadir = self.config.datadir.ok_or_else(|| {
102 Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
103 .with_context("service", Scheme::Redb)
104 })?;
105
106 let db = redb::Database::create(&datadir)
107 .map_err(parse_database_error)?
108 .into();
109
110 (Some(datadir), db)
111 };
112
113 create_table(&db, &table_name)?;
114
115 Ok(RedbBackend::new(Adapter {
116 datadir,
117 table: table_name,
118 db,
119 })
120 .with_root(self.config.root.as_deref().unwrap_or_default()))
121 }
122}
123
124pub type RedbBackend = kv::Backend<Adapter>;
126
127#[derive(Clone)]
128pub struct Adapter {
129 datadir: Option<String>,
130 table: String,
131 db: Arc<redb::Database>,
132}
133
134impl Debug for Adapter {
135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136 let mut ds = f.debug_struct("Adapter");
137 ds.field("path", &self.datadir);
138 ds.finish()
139 }
140}
141
142impl kv::Adapter for Adapter {
143 type Scanner = ();
144
145 fn info(&self) -> kv::Info {
146 kv::Info::new(
147 Scheme::Redb,
148 &self.table,
149 Capability {
150 read: true,
151 write: true,
152 shared: false,
153 ..Default::default()
154 },
155 )
156 }
157
158 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
159 let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;
160
161 let table_define: redb::TableDefinition<&str, &[u8]> =
162 redb::TableDefinition::new(&self.table);
163
164 let table = read_txn
165 .open_table(table_define)
166 .map_err(parse_table_error)?;
167
168 let result = match table.get(path) {
169 Ok(Some(v)) => Ok(Some(v.value().to_vec())),
170 Ok(None) => Ok(None),
171 Err(e) => Err(parse_storage_error(e)),
172 }?;
173 Ok(result.map(Buffer::from))
174 }
175
176 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
177 let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
178
179 let table_define: redb::TableDefinition<&str, &[u8]> =
180 redb::TableDefinition::new(&self.table);
181
182 {
183 let mut table = write_txn
184 .open_table(table_define)
185 .map_err(parse_table_error)?;
186
187 table
188 .insert(path, &*value.to_vec())
189 .map_err(parse_storage_error)?;
190 }
191
192 write_txn.commit().map_err(parse_commit_error)?;
193 Ok(())
194 }
195
196 async fn delete(&self, path: &str) -> Result<()> {
197 let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
198
199 let table_define: redb::TableDefinition<&str, &[u8]> =
200 redb::TableDefinition::new(&self.table);
201
202 {
203 let mut table = write_txn
204 .open_table(table_define)
205 .map_err(parse_table_error)?;
206
207 table.remove(path).map_err(parse_storage_error)?;
208 }
209
210 write_txn.commit().map_err(parse_commit_error)?;
211 Ok(())
212 }
213}
214
215fn parse_transaction_error(e: redb::TransactionError) -> Error {
216 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
217}
218
219fn parse_table_error(e: redb::TableError) -> Error {
220 match e {
221 redb::TableError::TableDoesNotExist(_) => {
222 Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
223 }
224 _ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e),
225 }
226}
227
228fn parse_storage_error(e: redb::StorageError) -> Error {
229 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
230}
231
232fn parse_database_error(e: redb::DatabaseError) -> Error {
233 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
234}
235
236fn parse_commit_error(e: redb::CommitError) -> Error {
237 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
238}
239
240fn create_table(db: &redb::Database, table: &str) -> Result<()> {
242 {
247 let read_txn = db.begin_read().map_err(parse_transaction_error)?;
248
249 let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);
250
251 match read_txn.open_table(table_define) {
252 Ok(_) => return Ok(()),
253 Err(redb::TableError::TableDoesNotExist(_)) => (),
254 Err(e) => return Err(parse_table_error(e)),
255 }
256 }
257
258 {
259 let write_txn = db.begin_write().map_err(parse_transaction_error)?;
260
261 let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);
262
263 write_txn
264 .open_table(table_define)
265 .map_err(parse_table_error)?;
266 write_txn.commit().map_err(parse_commit_error)?;
267 }
268
269 Ok(())
270}