opendal/services/mongodb/
backend.rs1use std::sync::Arc;
19
20use tokio::sync::OnceCell;
21
22use super::config::MongodbConfig;
23use super::core::*;
24use super::deleter::MongodbDeleter;
25use super::writer::MongodbWriter;
26use crate::raw::*;
27use crate::*;
28
29#[doc = include_str!("docs.md")]
30#[derive(Default)]
31pub struct MongodbBuilder {
32 pub(super) config: MongodbConfig,
33}
34
35impl MongodbBuilder {
36 pub fn connection_string(mut self, v: &str) -> Self {
57 if !v.is_empty() {
58 self.config.connection_string = Some(v.to_string());
59 }
60 self
61 }
62 pub fn root(mut self, root: &str) -> Self {
66 self.config.root = if root.is_empty() {
67 None
68 } else {
69 Some(root.to_string())
70 };
71
72 self
73 }
74
75 pub fn database(mut self, database: &str) -> Self {
77 if !database.is_empty() {
78 self.config.database = Some(database.to_string());
79 }
80 self
81 }
82
83 pub fn collection(mut self, collection: &str) -> Self {
85 if !collection.is_empty() {
86 self.config.collection = Some(collection.to_string());
87 }
88 self
89 }
90
91 pub fn key_field(mut self, key_field: &str) -> Self {
95 if !key_field.is_empty() {
96 self.config.key_field = Some(key_field.to_string());
97 }
98 self
99 }
100
101 pub fn value_field(mut self, value_field: &str) -> Self {
105 if !value_field.is_empty() {
106 self.config.value_field = Some(value_field.to_string());
107 }
108 self
109 }
110}
111
112impl Builder for MongodbBuilder {
113 type Config = MongodbConfig;
114
115 fn build(self) -> Result<impl Access> {
116 let conn = match &self.config.connection_string.clone() {
117 Some(v) => v.clone(),
118 None => {
119 return Err(
120 Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
121 .with_context("service", Scheme::Mongodb),
122 );
123 }
124 };
125 let database = match &self.config.database.clone() {
126 Some(v) => v.clone(),
127 None => {
128 return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
129 .with_context("service", Scheme::Mongodb));
130 }
131 };
132 let collection = match &self.config.collection.clone() {
133 Some(v) => v.clone(),
134 None => {
135 return Err(
136 Error::new(ErrorKind::ConfigInvalid, "collection is required")
137 .with_context("service", Scheme::Mongodb),
138 );
139 }
140 };
141 let key_field = match &self.config.key_field.clone() {
142 Some(v) => v.clone(),
143 None => "key".to_string(),
144 };
145 let value_field = match &self.config.value_field.clone() {
146 Some(v) => v.clone(),
147 None => "value".to_string(),
148 };
149 let root = normalize_root(
150 self.config
151 .root
152 .clone()
153 .unwrap_or_else(|| "/".to_string())
154 .as_str(),
155 );
156 Ok(MongodbBackend::new(MongodbCore {
157 connection_string: conn,
158 database,
159 collection,
160 collection_instance: OnceCell::new(),
161 key_field,
162 value_field,
163 })
164 .with_normalized_root(root))
165 }
166}
167
168#[derive(Clone, Debug)]
170pub struct MongodbBackend {
171 core: Arc<MongodbCore>,
172 root: String,
173 info: Arc<AccessorInfo>,
174}
175
176impl MongodbBackend {
177 pub fn new(core: MongodbCore) -> Self {
178 let info = AccessorInfo::default();
179 info.set_scheme(Scheme::Mongodb.into_static());
180 info.set_name(&format!("{}/{}", core.database, core.collection));
181 info.set_root("/");
182 info.set_native_capability(Capability {
183 read: true,
184 stat: true,
185 write: true,
186 write_can_empty: true,
187 delete: true,
188 shared: true,
189 ..Default::default()
190 });
191
192 Self {
193 core: Arc::new(core),
194 root: "/".to_string(),
195 info: Arc::new(info),
196 }
197 }
198
199 fn with_normalized_root(mut self, root: String) -> Self {
200 self.info.set_root(&root);
201 self.root = root;
202 self
203 }
204}
205
206impl Access for MongodbBackend {
207 type Reader = Buffer;
208 type Writer = MongodbWriter;
209 type Lister = ();
210 type Deleter = oio::OneShotDeleter<MongodbDeleter>;
211
212 fn info(&self) -> Arc<AccessorInfo> {
213 self.info.clone()
214 }
215
216 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
217 let p = build_abs_path(&self.root, path);
218
219 if p == build_abs_path(&self.root, "") {
220 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
221 } else {
222 let bs = self.core.get(&p).await?;
223 match bs {
224 Some(bs) => Ok(RpStat::new(
225 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
226 )),
227 None => Err(Error::new(ErrorKind::NotFound, "kv not found in mongodb")),
228 }
229 }
230 }
231
232 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
233 let p = build_abs_path(&self.root, path);
234 let bs = match self.core.get(&p).await? {
235 Some(bs) => bs,
236 None => {
237 return Err(Error::new(ErrorKind::NotFound, "kv not found in mongodb"));
238 }
239 };
240 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
241 }
242
243 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
244 let p = build_abs_path(&self.root, path);
245 Ok((RpWrite::new(), MongodbWriter::new(self.core.clone(), p)))
246 }
247
248 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
249 Ok((
250 RpDelete::default(),
251 oio::OneShotDeleter::new(MongodbDeleter::new(self.core.clone(), self.root.clone())),
252 ))
253 }
254
255 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
256 let _ = build_abs_path(&self.root, path);
257 Ok((RpList::default(), ()))
258 }
259}