opendal/services/etcd/
backend.rs1use std::sync::Arc;
19
20use etcd_client::Certificate;
21use etcd_client::ConnectOptions;
22use etcd_client::Identity;
23use etcd_client::TlsOptions;
24use tokio::sync::OnceCell;
25
26use super::ETCD_SCHEME;
27use super::config::EtcdConfig;
28use super::core::EtcdCore;
29use super::core::constants::DEFAULT_ETCD_ENDPOINTS;
30use super::deleter::EtcdDeleter;
31use super::lister::EtcdLister;
32use super::writer::EtcdWriter;
33use crate::raw::*;
34use crate::*;
35
36#[doc = include_str!("docs.md")]
38#[derive(Debug, Default)]
39pub struct EtcdBuilder {
40 pub(super) config: EtcdConfig,
41}
42
43impl EtcdBuilder {
44 pub fn endpoints(mut self, endpoints: &str) -> Self {
48 if !endpoints.is_empty() {
49 self.config.endpoints = Some(endpoints.to_owned());
50 }
51 self
52 }
53
54 pub fn username(mut self, username: &str) -> Self {
58 if !username.is_empty() {
59 self.config.username = Some(username.to_owned());
60 }
61 self
62 }
63
64 pub fn password(mut self, password: &str) -> Self {
68 if !password.is_empty() {
69 self.config.password = Some(password.to_owned());
70 }
71 self
72 }
73
74 pub fn root(mut self, root: &str) -> Self {
78 self.config.root = if root.is_empty() {
79 None
80 } else {
81 Some(root.to_string())
82 };
83
84 self
85 }
86
87 pub fn ca_path(mut self, ca_path: &str) -> Self {
91 if !ca_path.is_empty() {
92 self.config.ca_path = Some(ca_path.to_string())
93 }
94 self
95 }
96
97 pub fn cert_path(mut self, cert_path: &str) -> Self {
101 if !cert_path.is_empty() {
102 self.config.cert_path = Some(cert_path.to_string())
103 }
104 self
105 }
106
107 pub fn key_path(mut self, key_path: &str) -> Self {
111 if !key_path.is_empty() {
112 self.config.key_path = Some(key_path.to_string())
113 }
114 self
115 }
116}
117
118impl Builder for EtcdBuilder {
119 type Config = EtcdConfig;
120
121 fn build(self) -> Result<impl Access> {
122 let endpoints = self
123 .config
124 .endpoints
125 .clone()
126 .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
127
128 let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
129
130 let mut options = ConnectOptions::new();
131
132 if self.config.ca_path.is_some()
133 && self.config.cert_path.is_some()
134 && self.config.key_path.is_some()
135 {
136 let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
137 let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
138 let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
139
140 let tls_options = TlsOptions::default()
141 .ca_certificate(Certificate::from_pem(ca))
142 .identity(Identity::from_pem(cert, key));
143 options = options.with_tls(tls_options);
144 }
145
146 if let Some(username) = self.config.username.clone() {
147 options = options.with_user(
148 username,
149 self.config.password.clone().unwrap_or("".to_string()),
150 );
151 }
152
153 let root = normalize_root(
154 self.config
155 .root
156 .clone()
157 .unwrap_or_else(|| "/".to_string())
158 .as_str(),
159 );
160
161 let client = OnceCell::new();
162
163 let core = EtcdCore {
164 endpoints,
165 client,
166 options,
167 };
168
169 Ok(EtcdAccessor::new(core, &root))
170 }
171}
172
173impl EtcdBuilder {
174 fn load_pem(&self, path: &str) -> Result<String> {
175 std::fs::read_to_string(path)
176 .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
177 }
178}
179
180#[derive(Debug, Clone)]
181pub struct EtcdAccessor {
182 core: Arc<EtcdCore>,
183 info: Arc<AccessorInfo>,
184}
185
186impl EtcdAccessor {
187 fn new(core: EtcdCore, root: &str) -> Self {
188 let info = AccessorInfo::default();
189 info.set_scheme(ETCD_SCHEME);
190 info.set_name("etcd");
191 info.set_root(root);
192 info.set_native_capability(Capability {
193 read: true,
194
195 write: true,
196 write_can_empty: true,
197
198 delete: true,
199 stat: true,
200 list: true,
201
202 shared: true,
203
204 ..Default::default()
205 });
206
207 Self {
208 core: Arc::new(core),
209 info: Arc::new(info),
210 }
211 }
212}
213
214impl Access for EtcdAccessor {
215 type Reader = Buffer;
216 type Writer = EtcdWriter;
217 type Lister = oio::HierarchyLister<EtcdLister>;
218 type Deleter = oio::OneShotDeleter<EtcdDeleter>;
219
220 fn info(&self) -> Arc<AccessorInfo> {
221 self.info.clone()
222 }
223
224 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
225 let abs_path = build_abs_path(&self.info.root(), path);
226
227 let dir_path = if abs_path.ends_with('/') {
230 abs_path
231 } else {
232 format!("{abs_path}/")
233 };
234
235 self.core.set(&dir_path, Buffer::new()).await?;
237
238 Ok(RpCreateDir::default())
239 }
240
241 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
242 let abs_path = build_abs_path(&self.info.root(), path);
243
244 match self.core.get(&abs_path).await? {
246 Some(buffer) => {
247 let mut metadata = Metadata::new(EntryMode::from_path(&abs_path));
248 metadata.set_content_length(buffer.len() as u64);
249 Ok(RpStat::new(metadata))
250 }
251 None => {
252 let prefix = if abs_path.ends_with('/') {
254 abs_path
255 } else {
256 format!("{abs_path}/")
257 };
258
259 let has_children = self.core.has_prefix(&prefix).await?;
261 if has_children {
262 let metadata = Metadata::new(EntryMode::DIR);
264 Ok(RpStat::new(metadata))
265 } else {
266 Err(Error::new(ErrorKind::NotFound, "path not found"))
267 }
268 }
269 }
270 }
271
272 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
273 let abs_path = build_abs_path(&self.info.root(), path);
274
275 match self.core.get(&abs_path).await? {
276 Some(buffer) => {
277 let range = op.range();
278
279 if range.is_full() {
281 return Ok((RpRead::new(), buffer));
282 }
283
284 let offset = range.offset() as usize;
286 if offset >= buffer.len() {
287 return Err(Error::new(
288 ErrorKind::RangeNotSatisfied,
289 "range start offset exceeds content length",
290 ));
291 }
292
293 let size = range.size().map(|s| s as usize);
294 let end = size.map_or(buffer.len(), |s| (offset + s).min(buffer.len()));
295 let sliced_buffer = buffer.slice(offset..end);
296
297 Ok((RpRead::new(), sliced_buffer))
298 }
299 None => Err(Error::new(ErrorKind::NotFound, "path not found")),
300 }
301 }
302
303 async fn write(&self, path: &str, _op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
304 let abs_path = build_abs_path(&self.info.root(), path);
305 let writer = EtcdWriter::new(self.core.clone(), abs_path);
306 Ok((RpWrite::new(), writer))
307 }
308
309 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
310 let deleter = oio::OneShotDeleter::new(EtcdDeleter::new(
311 self.core.clone(),
312 self.info.root().to_string(),
313 ));
314 Ok((RpDelete::default(), deleter))
315 }
316
317 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
318 let lister = EtcdLister::new(
319 self.core.clone(),
320 self.info.root().to_string(),
321 path.to_string(),
322 )
323 .await?;
324 let lister = oio::HierarchyLister::new(lister, path, args.recursive());
325 Ok((RpList::default(), lister))
326 }
327}