opendal/services/etcd/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use etcd_client::Certificate;
23use etcd_client::ConnectOptions;
24use etcd_client::Identity;
25use etcd_client::TlsOptions;
26use tokio::sync::OnceCell;
27
28use super::ETCD_SCHEME;
29use super::core::EtcdCore;
30use super::core::constants::DEFAULT_ETCD_ENDPOINTS;
31use super::deleter::EtcdDeleter;
32use super::lister::EtcdLister;
33use super::writer::EtcdWriter;
34use crate::raw::*;
35use crate::services::EtcdConfig;
36use crate::*;
37
38#[doc = include_str!("docs.md")]
40#[derive(Clone, Default)]
41pub struct EtcdBuilder {
42 pub(super) config: EtcdConfig,
43}
44
45impl Debug for EtcdBuilder {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 let mut ds = f.debug_struct("Builder");
48
49 ds.field("config", &self.config);
50 ds.finish()
51 }
52}
53
54impl EtcdBuilder {
55 pub fn endpoints(mut self, endpoints: &str) -> Self {
59 if !endpoints.is_empty() {
60 self.config.endpoints = Some(endpoints.to_owned());
61 }
62 self
63 }
64
65 pub fn username(mut self, username: &str) -> Self {
69 if !username.is_empty() {
70 self.config.username = Some(username.to_owned());
71 }
72 self
73 }
74
75 pub fn password(mut self, password: &str) -> Self {
79 if !password.is_empty() {
80 self.config.password = Some(password.to_owned());
81 }
82 self
83 }
84
85 pub fn root(mut self, root: &str) -> Self {
89 self.config.root = if root.is_empty() {
90 None
91 } else {
92 Some(root.to_string())
93 };
94
95 self
96 }
97
98 pub fn ca_path(mut self, ca_path: &str) -> Self {
102 if !ca_path.is_empty() {
103 self.config.ca_path = Some(ca_path.to_string())
104 }
105 self
106 }
107
108 pub fn cert_path(mut self, cert_path: &str) -> Self {
112 if !cert_path.is_empty() {
113 self.config.cert_path = Some(cert_path.to_string())
114 }
115 self
116 }
117
118 pub fn key_path(mut self, key_path: &str) -> Self {
122 if !key_path.is_empty() {
123 self.config.key_path = Some(key_path.to_string())
124 }
125 self
126 }
127}
128
129impl Builder for EtcdBuilder {
130 type Config = EtcdConfig;
131
132 fn build(self) -> Result<impl Access> {
133 let endpoints = self
134 .config
135 .endpoints
136 .clone()
137 .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
138
139 let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
140
141 let mut options = ConnectOptions::new();
142
143 if self.config.ca_path.is_some()
144 && self.config.cert_path.is_some()
145 && self.config.key_path.is_some()
146 {
147 let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
148 let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
149 let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
150
151 let tls_options = TlsOptions::default()
152 .ca_certificate(Certificate::from_pem(ca))
153 .identity(Identity::from_pem(cert, key));
154 options = options.with_tls(tls_options);
155 }
156
157 if let Some(username) = self.config.username.clone() {
158 options = options.with_user(
159 username,
160 self.config.password.clone().unwrap_or("".to_string()),
161 );
162 }
163
164 let root = normalize_root(
165 self.config
166 .root
167 .clone()
168 .unwrap_or_else(|| "/".to_string())
169 .as_str(),
170 );
171
172 let client = OnceCell::new();
173
174 let core = EtcdCore {
175 endpoints,
176 client,
177 options,
178 };
179
180 Ok(EtcdAccessor::new(core, &root))
181 }
182}
183
184impl EtcdBuilder {
185 fn load_pem(&self, path: &str) -> Result<String> {
186 std::fs::read_to_string(path)
187 .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
188 }
189}
190
191#[derive(Debug, Clone)]
192pub struct EtcdAccessor {
193 core: Arc<EtcdCore>,
194 info: Arc<AccessorInfo>,
195}
196
197impl EtcdAccessor {
198 fn new(core: EtcdCore, root: &str) -> Self {
199 let info = AccessorInfo::default();
200 info.set_scheme(ETCD_SCHEME);
201 info.set_name("etcd");
202 info.set_root(root);
203 info.set_native_capability(Capability {
204 read: true,
205
206 write: true,
207 write_can_empty: true,
208
209 delete: true,
210 stat: true,
211 list: true,
212
213 shared: true,
214
215 ..Default::default()
216 });
217
218 Self {
219 core: Arc::new(core),
220 info: Arc::new(info),
221 }
222 }
223}
224
225impl Access for EtcdAccessor {
226 type Reader = Buffer;
227 type Writer = EtcdWriter;
228 type Lister = oio::HierarchyLister<EtcdLister>;
229 type Deleter = oio::OneShotDeleter<EtcdDeleter>;
230
231 fn info(&self) -> Arc<AccessorInfo> {
232 self.info.clone()
233 }
234
235 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
236 let abs_path = build_abs_path(&self.info.root(), path);
237
238 let dir_path = if abs_path.ends_with('/') {
241 abs_path
242 } else {
243 format!("{abs_path}/")
244 };
245
246 self.core.set(&dir_path, Buffer::new()).await?;
248
249 Ok(RpCreateDir::default())
250 }
251
252 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
253 let abs_path = build_abs_path(&self.info.root(), path);
254
255 match self.core.get(&abs_path).await? {
257 Some(buffer) => {
258 let mut metadata = Metadata::new(EntryMode::from_path(&abs_path));
259 metadata.set_content_length(buffer.len() as u64);
260 Ok(RpStat::new(metadata))
261 }
262 None => {
263 let prefix = if abs_path.ends_with('/') {
265 abs_path
266 } else {
267 format!("{abs_path}/")
268 };
269
270 let has_children = self.core.has_prefix(&prefix).await?;
272 if has_children {
273 let metadata = Metadata::new(EntryMode::DIR);
275 Ok(RpStat::new(metadata))
276 } else {
277 Err(Error::new(ErrorKind::NotFound, "path not found"))
278 }
279 }
280 }
281 }
282
283 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
284 let abs_path = build_abs_path(&self.info.root(), path);
285
286 match self.core.get(&abs_path).await? {
287 Some(buffer) => {
288 let range = op.range();
289
290 if range.is_full() {
292 return Ok((RpRead::new(), buffer));
293 }
294
295 let offset = range.offset() as usize;
297 if offset >= buffer.len() {
298 return Err(Error::new(
299 ErrorKind::RangeNotSatisfied,
300 "range start offset exceeds content length",
301 ));
302 }
303
304 let size = range.size().map(|s| s as usize);
305 let end = size.map_or(buffer.len(), |s| (offset + s).min(buffer.len()));
306 let sliced_buffer = buffer.slice(offset..end);
307
308 Ok((RpRead::new(), sliced_buffer))
309 }
310 None => Err(Error::new(ErrorKind::NotFound, "path not found")),
311 }
312 }
313
314 async fn write(&self, path: &str, _op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
315 let abs_path = build_abs_path(&self.info.root(), path);
316 let writer = EtcdWriter::new(self.core.clone(), abs_path);
317 Ok((RpWrite::new(), writer))
318 }
319
320 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
321 let deleter = oio::OneShotDeleter::new(EtcdDeleter::new(
322 self.core.clone(),
323 self.info.root().to_string(),
324 ));
325 Ok((RpDelete::default(), deleter))
326 }
327
328 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
329 let lister = EtcdLister::new(
330 self.core.clone(),
331 self.info.root().to_string(),
332 path.to_string(),
333 )
334 .await?;
335 let lister = oio::HierarchyLister::new(lister, path, args.recursive());
336 Ok((RpList::default(), lister))
337 }
338}