opendal/services/etcd/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use etcd_client::Certificate;
21use etcd_client::ConnectOptions;
22use etcd_client::Identity;
23use etcd_client::TlsOptions;
24use tokio::sync::OnceCell;
25
26use super::ETCD_SCHEME;
27use super::config::EtcdConfig;
28use super::core::EtcdCore;
29use super::core::constants::DEFAULT_ETCD_ENDPOINTS;
30use super::deleter::EtcdDeleter;
31use super::lister::EtcdLister;
32use super::writer::EtcdWriter;
33use crate::raw::*;
34use crate::*;
35
36/// [Etcd](https://etcd.io/) services support.
37#[doc = include_str!("docs.md")]
38#[derive(Debug, Default)]
39pub struct EtcdBuilder {
40    pub(super) config: EtcdConfig,
41}
42
43impl EtcdBuilder {
44    /// set the network address of etcd service.
45    ///
46    /// default: "http://127.0.0.1:2379"
47    pub fn endpoints(mut self, endpoints: &str) -> Self {
48        if !endpoints.is_empty() {
49            self.config.endpoints = Some(endpoints.to_owned());
50        }
51        self
52    }
53
54    /// set the username for etcd
55    ///
56    /// default: no username
57    pub fn username(mut self, username: &str) -> Self {
58        if !username.is_empty() {
59            self.config.username = Some(username.to_owned());
60        }
61        self
62    }
63
64    /// set the password for etcd
65    ///
66    /// default: no password
67    pub fn password(mut self, password: &str) -> Self {
68        if !password.is_empty() {
69            self.config.password = Some(password.to_owned());
70        }
71        self
72    }
73
74    /// set the working directory, all operations will be performed under it.
75    ///
76    /// default: "/"
77    pub fn root(mut self, root: &str) -> Self {
78        self.config.root = if root.is_empty() {
79            None
80        } else {
81            Some(root.to_string())
82        };
83
84        self
85    }
86
87    /// Set the certificate authority file path.
88    ///
89    /// default is None
90    pub fn ca_path(mut self, ca_path: &str) -> Self {
91        if !ca_path.is_empty() {
92            self.config.ca_path = Some(ca_path.to_string())
93        }
94        self
95    }
96
97    /// Set the certificate file path.
98    ///
99    /// default is None
100    pub fn cert_path(mut self, cert_path: &str) -> Self {
101        if !cert_path.is_empty() {
102            self.config.cert_path = Some(cert_path.to_string())
103        }
104        self
105    }
106
107    /// Set the key file path.
108    ///
109    /// default is None
110    pub fn key_path(mut self, key_path: &str) -> Self {
111        if !key_path.is_empty() {
112            self.config.key_path = Some(key_path.to_string())
113        }
114        self
115    }
116}
117
118impl Builder for EtcdBuilder {
119    type Config = EtcdConfig;
120
121    fn build(self) -> Result<impl Access> {
122        let endpoints = self
123            .config
124            .endpoints
125            .clone()
126            .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
127
128        let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
129
130        let mut options = ConnectOptions::new();
131
132        if self.config.ca_path.is_some()
133            && self.config.cert_path.is_some()
134            && self.config.key_path.is_some()
135        {
136            let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
137            let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
138            let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
139
140            let tls_options = TlsOptions::default()
141                .ca_certificate(Certificate::from_pem(ca))
142                .identity(Identity::from_pem(cert, key));
143            options = options.with_tls(tls_options);
144        }
145
146        if let Some(username) = self.config.username.clone() {
147            options = options.with_user(
148                username,
149                self.config.password.clone().unwrap_or("".to_string()),
150            );
151        }
152
153        let root = normalize_root(
154            self.config
155                .root
156                .clone()
157                .unwrap_or_else(|| "/".to_string())
158                .as_str(),
159        );
160
161        let client = OnceCell::new();
162
163        let core = EtcdCore {
164            endpoints,
165            client,
166            options,
167        };
168
169        Ok(EtcdAccessor::new(core, &root))
170    }
171}
172
173impl EtcdBuilder {
174    fn load_pem(&self, path: &str) -> Result<String> {
175        std::fs::read_to_string(path)
176            .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
177    }
178}
179
180#[derive(Debug, Clone)]
181pub struct EtcdAccessor {
182    core: Arc<EtcdCore>,
183    info: Arc<AccessorInfo>,
184}
185
186impl EtcdAccessor {
187    fn new(core: EtcdCore, root: &str) -> Self {
188        let info = AccessorInfo::default();
189        info.set_scheme(ETCD_SCHEME);
190        info.set_name("etcd");
191        info.set_root(root);
192        info.set_native_capability(Capability {
193            read: true,
194
195            write: true,
196            write_can_empty: true,
197
198            delete: true,
199            stat: true,
200            list: true,
201
202            shared: true,
203
204            ..Default::default()
205        });
206
207        Self {
208            core: Arc::new(core),
209            info: Arc::new(info),
210        }
211    }
212}
213
214impl Access for EtcdAccessor {
215    type Reader = Buffer;
216    type Writer = EtcdWriter;
217    type Lister = oio::HierarchyLister<EtcdLister>;
218    type Deleter = oio::OneShotDeleter<EtcdDeleter>;
219
220    fn info(&self) -> Arc<AccessorInfo> {
221        self.info.clone()
222    }
223
224    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
225        let abs_path = build_abs_path(&self.info.root(), path);
226
227        // In etcd, we simulate directory creation by storing an empty value
228        // with the directory path (ensuring it ends with '/')
229        let dir_path = if abs_path.ends_with('/') {
230            abs_path
231        } else {
232            format!("{abs_path}/")
233        };
234
235        // Store an empty buffer to represent the directory
236        self.core.set(&dir_path, Buffer::new()).await?;
237
238        Ok(RpCreateDir::default())
239    }
240
241    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
242        let abs_path = build_abs_path(&self.info.root(), path);
243
244        // First check if it's a direct key
245        match self.core.get(&abs_path).await? {
246            Some(buffer) => {
247                let mut metadata = Metadata::new(EntryMode::from_path(&abs_path));
248                metadata.set_content_length(buffer.len() as u64);
249                Ok(RpStat::new(metadata))
250            }
251            None => {
252                // Check if it's a directory by looking for keys with this prefix
253                let prefix = if abs_path.ends_with('/') {
254                    abs_path
255                } else {
256                    format!("{abs_path}/")
257                };
258
259                // Use etcd prefix query to check if any keys exist with this prefix
260                let has_children = self.core.has_prefix(&prefix).await?;
261                if has_children {
262                    // Has children, it's a directory
263                    let metadata = Metadata::new(EntryMode::DIR);
264                    Ok(RpStat::new(metadata))
265                } else {
266                    Err(Error::new(ErrorKind::NotFound, "path not found"))
267                }
268            }
269        }
270    }
271
272    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
273        let abs_path = build_abs_path(&self.info.root(), path);
274
275        match self.core.get(&abs_path).await? {
276            Some(buffer) => {
277                let range = op.range();
278
279                // If range is full, return the buffer directly
280                if range.is_full() {
281                    return Ok((RpRead::new(), buffer));
282                }
283
284                // Handle range requests
285                let offset = range.offset() as usize;
286                if offset >= buffer.len() {
287                    return Err(Error::new(
288                        ErrorKind::RangeNotSatisfied,
289                        "range start offset exceeds content length",
290                    ));
291                }
292
293                let size = range.size().map(|s| s as usize);
294                let end = size.map_or(buffer.len(), |s| (offset + s).min(buffer.len()));
295                let sliced_buffer = buffer.slice(offset..end);
296
297                Ok((RpRead::new(), sliced_buffer))
298            }
299            None => Err(Error::new(ErrorKind::NotFound, "path not found")),
300        }
301    }
302
303    async fn write(&self, path: &str, _op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
304        let abs_path = build_abs_path(&self.info.root(), path);
305        let writer = EtcdWriter::new(self.core.clone(), abs_path);
306        Ok((RpWrite::new(), writer))
307    }
308
309    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
310        let deleter = oio::OneShotDeleter::new(EtcdDeleter::new(
311            self.core.clone(),
312            self.info.root().to_string(),
313        ));
314        Ok((RpDelete::default(), deleter))
315    }
316
317    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
318        let lister = EtcdLister::new(
319            self.core.clone(),
320            self.info.root().to_string(),
321            path.to_string(),
322        )
323        .await?;
324        let lister = oio::HierarchyLister::new(lister, path, args.recursive());
325        Ok((RpList::default(), lister))
326    }
327}