opendal/services/gridfs/
backend.rs1use std::sync::Arc;
19
20use tokio::sync::OnceCell;
21
22use super::config::GridfsConfig;
23use super::core::*;
24use super::deleter::GridfsDeleter;
25use super::writer::GridfsWriter;
26use crate::raw::*;
27use crate::*;
28
29#[doc = include_str!("docs.md")]
30#[derive(Default)]
31pub struct GridfsBuilder {
32 pub(super) config: GridfsConfig,
33}
34
35impl GridfsBuilder {
36 pub fn connection_string(mut self, v: &str) -> Self {
57 if !v.is_empty() {
58 self.config.connection_string = Some(v.to_string());
59 }
60 self
61 }
62
63 pub fn root(mut self, root: &str) -> Self {
67 self.config.root = if root.is_empty() {
68 None
69 } else {
70 Some(root.to_string())
71 };
72
73 self
74 }
75
76 pub fn database(mut self, database: &str) -> Self {
78 if !database.is_empty() {
79 self.config.database = Some(database.to_string());
80 }
81 self
82 }
83
84 pub fn bucket(mut self, bucket: &str) -> Self {
88 if !bucket.is_empty() {
89 self.config.bucket = Some(bucket.to_string());
90 }
91 self
92 }
93
94 pub fn chunk_size(mut self, chunk_size: u32) -> Self {
98 if chunk_size > 0 {
99 self.config.chunk_size = Some(chunk_size);
100 }
101 self
102 }
103}
104
105impl Builder for GridfsBuilder {
106 type Config = GridfsConfig;
107
108 fn build(self) -> Result<impl Access> {
109 let conn = match &self.config.connection_string.clone() {
110 Some(v) => v.clone(),
111 None => {
112 return Err(
113 Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
114 .with_context("service", Scheme::Gridfs),
115 );
116 }
117 };
118 let database = match &self.config.database.clone() {
119 Some(v) => v.clone(),
120 None => {
121 return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
122 .with_context("service", Scheme::Gridfs));
123 }
124 };
125 let bucket = match &self.config.bucket.clone() {
126 Some(v) => v.clone(),
127 None => "fs".to_string(),
128 };
129 let chunk_size = self.config.chunk_size.unwrap_or(255);
130
131 let root = normalize_root(
132 self.config
133 .root
134 .clone()
135 .unwrap_or_else(|| "/".to_string())
136 .as_str(),
137 );
138
139 Ok(GridfsBackend::new(GridfsCore {
140 connection_string: conn,
141 database,
142 bucket,
143 chunk_size,
144 bucket_instance: OnceCell::new(),
145 })
146 .with_normalized_root(root))
147 }
148}
149
150#[derive(Clone, Debug)]
152pub struct GridfsBackend {
153 core: Arc<GridfsCore>,
154 root: String,
155 info: Arc<AccessorInfo>,
156}
157
158impl GridfsBackend {
159 pub fn new(core: GridfsCore) -> Self {
160 let info = AccessorInfo::default();
161 info.set_scheme(Scheme::Gridfs.into_static());
162 info.set_name(&format!("{}/{}", core.database, core.bucket));
163 info.set_root("/");
164 info.set_native_capability(Capability {
165 read: true,
166 stat: true,
167 write: true,
168 write_can_empty: true,
169 delete: true,
170 shared: true,
171 ..Default::default()
172 });
173
174 Self {
175 core: Arc::new(core),
176 root: "/".to_string(),
177 info: Arc::new(info),
178 }
179 }
180
181 fn with_normalized_root(mut self, root: String) -> Self {
182 self.info.set_root(&root);
183 self.root = root;
184 self
185 }
186}
187
188impl Access for GridfsBackend {
189 type Reader = Buffer;
190 type Writer = GridfsWriter;
191 type Lister = ();
192 type Deleter = oio::OneShotDeleter<GridfsDeleter>;
193
194 fn info(&self) -> Arc<AccessorInfo> {
195 self.info.clone()
196 }
197
198 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
199 let p = build_abs_path(&self.root, path);
200
201 if p == build_abs_path(&self.root, "") {
202 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
203 } else {
204 let bs = self.core.get(&p).await?;
205 match bs {
206 Some(bs) => Ok(RpStat::new(
207 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
208 )),
209 None => Err(Error::new(ErrorKind::NotFound, "kv not found in gridfs")),
210 }
211 }
212 }
213
214 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
215 let p = build_abs_path(&self.root, path);
216 let bs = match self.core.get(&p).await? {
217 Some(bs) => bs,
218 None => {
219 return Err(Error::new(ErrorKind::NotFound, "kv not found in gridfs"));
220 }
221 };
222 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
223 }
224
225 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
226 let p = build_abs_path(&self.root, path);
227 Ok((RpWrite::new(), GridfsWriter::new(self.core.clone(), p)))
228 }
229
230 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
231 Ok((
232 RpDelete::default(),
233 oio::OneShotDeleter::new(GridfsDeleter::new(self.core.clone(), self.root.clone())),
234 ))
235 }
236
237 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
238 let _ = build_abs_path(&self.root, path);
239 Ok((RpList::default(), ()))
240 }
241}