opendal_core/services/redis/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use http::Uri;
22use redis::Client;
23use redis::ConnectionAddr;
24use redis::ConnectionInfo;
25use redis::IntoConnectionInfo;
26use redis::ProtocolVersion;
27use redis::RedisConnectionInfo;
28use redis::cluster::ClusterClientBuilder;
29
30use super::REDIS_SCHEME;
31use super::config::RedisConfig;
32use super::core::*;
33use super::delete::RedisDeleter;
34use super::writer::RedisWriter;
35use crate::raw::*;
36use crate::*;
37
38const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
39const DEFAULT_REDIS_PORT: u16 = 6379;
40
41/// [Redis](https://redis.io/) services support.
42#[doc = include_str!("docs.md")]
43#[derive(Debug, Default)]
44pub struct RedisBuilder {
45    pub(super) config: RedisConfig,
46}
47
48impl RedisBuilder {
49    /// set the network address of redis service.
50    ///
51    /// currently supported schemes:
52    /// - no scheme: will be seen as "tcp"
53    /// - "tcp" or "redis": unsecured redis connections
54    /// - "rediss": secured redis connections
55    /// - "unix" or "redis+unix": unix socket connection
56    pub fn endpoint(mut self, endpoint: &str) -> Self {
57        if !endpoint.is_empty() {
58            self.config.endpoint = Some(endpoint.to_owned());
59        }
60        self
61    }
62
63    /// set the network address of redis cluster service.
64    /// This parameter is mutually exclusive with the endpoint parameter.
65    ///
66    /// currently supported schemes:
67    /// - no scheme: will be seen as "tcp"
68    /// - "tcp" or "redis": unsecured redis connections
69    /// - "rediss": secured redis connections
70    /// - "unix" or "redis+unix": unix socket connection
71    pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
72        if !cluster_endpoints.is_empty() {
73            self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
74        }
75        self
76    }
77
78    /// set the username for redis
79    ///
80    /// default: no username
81    pub fn username(mut self, username: &str) -> Self {
82        if !username.is_empty() {
83            self.config.username = Some(username.to_owned());
84        }
85        self
86    }
87
88    /// set the password for redis
89    ///
90    /// default: no password
91    pub fn password(mut self, password: &str) -> Self {
92        if !password.is_empty() {
93            self.config.password = Some(password.to_owned());
94        }
95        self
96    }
97
98    /// set the db used in redis
99    ///
100    /// default: 0
101    pub fn db(mut self, db: i64) -> Self {
102        self.config.db = db;
103        self
104    }
105
106    /// Set the default ttl for redis services.
107    ///
108    /// If set, we will specify `EX` for write operations.
109    pub fn default_ttl(mut self, ttl: Duration) -> Self {
110        self.config.default_ttl = Some(ttl);
111        self
112    }
113
114    /// set the working directory, all operations will be performed under it.
115    ///
116    /// default: "/"
117    pub fn root(mut self, root: &str) -> Self {
118        self.config.root = if root.is_empty() {
119            None
120        } else {
121            Some(root.to_string())
122        };
123
124        self
125    }
126
127    /// Sets the maximum number of connections managed by the pool.
128    ///
129    /// Defaults to 10.
130    ///
131    /// # Panics
132    ///
133    /// Will panic if `max_size` is 0.
134    #[must_use]
135    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
136        assert!(max_size > 0, "max_size must be greater than zero!");
137        self.config.connection_pool_max_size = Some(max_size);
138        self
139    }
140}
141
142impl Builder for RedisBuilder {
143    type Config = RedisConfig;
144
145    fn build(self) -> Result<impl Access> {
146        let root = normalize_root(
147            self.config
148                .root
149                .clone()
150                .unwrap_or_else(|| "/".to_string())
151                .as_str(),
152        );
153
154        if let Some(endpoints) = self.config.cluster_endpoints.clone() {
155            let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
156            for endpoint in endpoints.split(',') {
157                cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
158            }
159            let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
160            if let Some(username) = &self.config.username {
161                client_builder = client_builder.username(username.clone());
162            }
163            if let Some(password) = &self.config.password {
164                client_builder = client_builder.password(password.clone());
165            }
166            let client = client_builder.build().map_err(format_redis_error)?;
167
168            Ok(RedisBackend::new(RedisCore::new(
169                endpoints,
170                None,
171                Some(client),
172                self.config.default_ttl,
173                self.config.connection_pool_max_size,
174            ))
175            .with_normalized_root(root))
176        } else {
177            let endpoint = self
178                .config
179                .endpoint
180                .clone()
181                .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
182
183            let client =
184                Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
185                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
186                        .with_context("service", REDIS_SCHEME)
187                        .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
188                        .with_context("db", self.config.db.to_string())
189                        .set_source(e)
190                })?;
191
192            Ok(RedisBackend::new(RedisCore::new(
193                endpoint,
194                Some(client),
195                None,
196                self.config.default_ttl,
197                self.config.connection_pool_max_size,
198            ))
199            .with_normalized_root(root))
200        }
201    }
202}
203
204impl RedisBuilder {
205    fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
206        let ep_url = endpoint.parse::<Uri>().map_err(|e| {
207            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
208                .with_context("service", REDIS_SCHEME)
209                .with_context("endpoint", endpoint)
210                .set_source(e)
211        })?;
212
213        let con_addr = match ep_url.scheme_str() {
214            Some("tcp") | Some("redis") | None => {
215                let host = ep_url
216                    .host()
217                    .map(|h| h.to_string())
218                    .unwrap_or_else(|| "127.0.0.1".to_string());
219                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
220                ConnectionAddr::Tcp(host, port)
221            }
222            Some("rediss") => {
223                let host = ep_url
224                    .host()
225                    .map(|h| h.to_string())
226                    .unwrap_or_else(|| "127.0.0.1".to_string());
227                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
228                ConnectionAddr::TcpTls {
229                    host,
230                    port,
231                    insecure: false,
232                    tls_params: None,
233                }
234            }
235            Some("unix") | Some("redis+unix") => {
236                let path = PathBuf::from(ep_url.path());
237                ConnectionAddr::Unix(path)
238            }
239            Some(s) => {
240                return Err(
241                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
242                        .with_context("service", REDIS_SCHEME)
243                        .with_context("scheme", s),
244                );
245            }
246        };
247
248        let mut redis_info = RedisConnectionInfo::default()
249            .set_db(self.config.db)
250            .set_protocol(ProtocolVersion::RESP2);
251        if let Some(username) = &self.config.username {
252            redis_info = redis_info.set_username(username);
253        }
254        if let Some(password) = &self.config.password {
255            redis_info = redis_info.set_password(password);
256        }
257        let connection_info = con_addr
258            .clone()
259            .into_connection_info()
260            .map_err(|err| {
261                Error::new(ErrorKind::ConfigInvalid, "invalid connection address")
262                    .with_context("service", REDIS_SCHEME)
263                    .with_context("address", con_addr)
264                    .with_context("error", err)
265            })?
266            .set_redis_settings(redis_info);
267
268        Ok(connection_info)
269    }
270}
271
272/// RedisBackend implements Access trait directly
273#[derive(Debug, Clone)]
274pub struct RedisBackend {
275    core: Arc<RedisCore>,
276    root: String,
277    info: Arc<AccessorInfo>,
278}
279
280impl RedisBackend {
281    fn new(core: RedisCore) -> Self {
282        let info = AccessorInfo::default();
283        info.set_scheme(REDIS_SCHEME);
284        info.set_name(core.addr());
285        info.set_root("/");
286        info.set_native_capability(Capability {
287            read: true,
288            write: true,
289            delete: true,
290            stat: true,
291            write_can_empty: true,
292            shared: true,
293            ..Default::default()
294        });
295
296        Self {
297            core: Arc::new(core),
298            root: "/".to_string(),
299            info: Arc::new(info),
300        }
301    }
302
303    fn with_normalized_root(mut self, root: String) -> Self {
304        self.info.set_root(&root);
305        self.root = root;
306        self
307    }
308}
309
310impl Access for RedisBackend {
311    type Reader = Buffer;
312    type Writer = RedisWriter;
313    type Lister = ();
314    type Deleter = oio::OneShotDeleter<RedisDeleter>;
315
316    fn info(&self) -> Arc<AccessorInfo> {
317        self.info.clone()
318    }
319
320    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
321        let p = build_abs_path(&self.root, path);
322
323        if p == build_abs_path(&self.root, "") {
324            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
325        } else {
326            let bs = self.core.get(&p).await?;
327            match bs {
328                Some(bs) => Ok(RpStat::new(
329                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
330                )),
331                None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
332            }
333        }
334    }
335
336    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
337        let p = build_abs_path(&self.root, path);
338
339        let range = args.range();
340        let buffer = if range.is_full() {
341            // Full read - use GET
342            match self.core.get(&p).await? {
343                Some(bs) => bs,
344                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
345            }
346        } else {
347            // Range read - use GETRANGE
348            let start = range.offset() as isize;
349            let end = match range.size() {
350                Some(size) => (range.offset() + size - 1) as isize,
351                None => -1, // Redis uses -1 for end of string
352            };
353
354            match self.core.get_range(&p, start, end).await? {
355                Some(bs) => bs,
356                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
357            }
358        };
359
360        Ok((RpRead::new(), buffer))
361    }
362
363    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
364        let p = build_abs_path(&self.root, path);
365        Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
366    }
367
368    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
369        Ok((
370            RpDelete::default(),
371            oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
372        ))
373    }
374}