opendal_core/services/redis/
backend.rs1use std::path::PathBuf;
19use std::sync::Arc;
20
21use http::Uri;
22use redis::Client;
23use redis::ConnectionAddr;
24use redis::ConnectionInfo;
25use redis::IntoConnectionInfo;
26use redis::ProtocolVersion;
27use redis::RedisConnectionInfo;
28use redis::cluster::ClusterClientBuilder;
29
30use super::REDIS_SCHEME;
31use super::config::RedisConfig;
32use super::core::*;
33use super::delete::RedisDeleter;
34use super::writer::RedisWriter;
35use crate::raw::*;
36use crate::*;
37
// Endpoint `build` falls back to when no single-node endpoint has been configured.
38const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
// Port used by `get_connection_info` when the endpoint URI does not carry one.
39const DEFAULT_REDIS_PORT: u16 = 6379;
40
// Builder for the redis service. The rustdoc text for this item is pulled in
// from `docs.md` by the attribute below, so only plain comments are used here.
41#[doc = include_str!("docs.md")]
43#[derive(Debug, Default)]
44pub struct RedisBuilder {
// Settings accumulated by the builder methods and read back by `build`.
45 pub(super) config: RedisConfig,
46}
47
48impl RedisBuilder {
49 pub fn endpoint(mut self, endpoint: &str) -> Self {
57 if !endpoint.is_empty() {
58 self.config.endpoint = Some(endpoint.to_owned());
59 }
60 self
61 }
62
63 pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
72 if !cluster_endpoints.is_empty() {
73 self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
74 }
75 self
76 }
77
78 pub fn username(mut self, username: &str) -> Self {
82 if !username.is_empty() {
83 self.config.username = Some(username.to_owned());
84 }
85 self
86 }
87
88 pub fn password(mut self, password: &str) -> Self {
92 if !password.is_empty() {
93 self.config.password = Some(password.to_owned());
94 }
95 self
96 }
97
98 pub fn db(mut self, db: i64) -> Self {
102 self.config.db = db;
103 self
104 }
105
106 pub fn default_ttl(mut self, ttl: Duration) -> Self {
110 self.config.default_ttl = Some(ttl);
111 self
112 }
113
114 pub fn root(mut self, root: &str) -> Self {
118 self.config.root = if root.is_empty() {
119 None
120 } else {
121 Some(root.to_string())
122 };
123
124 self
125 }
126
127 #[must_use]
135 pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
136 assert!(max_size > 0, "max_size must be greater than zero!");
137 self.config.connection_pool_max_size = Some(max_size);
138 self
139 }
140}
141
142impl Builder for RedisBuilder {
143 type Config = RedisConfig;
144
145 fn build(self) -> Result<impl Access> {
146 let root = normalize_root(
147 self.config
148 .root
149 .clone()
150 .unwrap_or_else(|| "/".to_string())
151 .as_str(),
152 );
153
154 if let Some(endpoints) = self.config.cluster_endpoints.clone() {
155 let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
156 for endpoint in endpoints.split(',') {
157 cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
158 }
159 let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
160 if let Some(username) = &self.config.username {
161 client_builder = client_builder.username(username.clone());
162 }
163 if let Some(password) = &self.config.password {
164 client_builder = client_builder.password(password.clone());
165 }
166 let client = client_builder.build().map_err(format_redis_error)?;
167
168 Ok(RedisBackend::new(RedisCore::new(
169 endpoints,
170 None,
171 Some(client),
172 self.config.default_ttl,
173 self.config.connection_pool_max_size,
174 ))
175 .with_normalized_root(root))
176 } else {
177 let endpoint = self
178 .config
179 .endpoint
180 .clone()
181 .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
182
183 let client =
184 Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
185 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
186 .with_context("service", REDIS_SCHEME)
187 .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
188 .with_context("db", self.config.db.to_string())
189 .set_source(e)
190 })?;
191
192 Ok(RedisBackend::new(RedisCore::new(
193 endpoint,
194 Some(client),
195 None,
196 self.config.default_ttl,
197 self.config.connection_pool_max_size,
198 ))
199 .with_normalized_root(root))
200 }
201 }
202}
203
204impl RedisBuilder {
205 fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
206 let ep_url = endpoint.parse::<Uri>().map_err(|e| {
207 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
208 .with_context("service", REDIS_SCHEME)
209 .with_context("endpoint", endpoint)
210 .set_source(e)
211 })?;
212
213 let con_addr = match ep_url.scheme_str() {
214 Some("tcp") | Some("redis") | None => {
215 let host = ep_url
216 .host()
217 .map(|h| h.to_string())
218 .unwrap_or_else(|| "127.0.0.1".to_string());
219 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
220 ConnectionAddr::Tcp(host, port)
221 }
222 Some("rediss") => {
223 let host = ep_url
224 .host()
225 .map(|h| h.to_string())
226 .unwrap_or_else(|| "127.0.0.1".to_string());
227 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
228 ConnectionAddr::TcpTls {
229 host,
230 port,
231 insecure: false,
232 tls_params: None,
233 }
234 }
235 Some("unix") | Some("redis+unix") => {
236 let path = PathBuf::from(ep_url.path());
237 ConnectionAddr::Unix(path)
238 }
239 Some(s) => {
240 return Err(
241 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
242 .with_context("service", REDIS_SCHEME)
243 .with_context("scheme", s),
244 );
245 }
246 };
247
248 let mut redis_info = RedisConnectionInfo::default()
249 .set_db(self.config.db)
250 .set_protocol(ProtocolVersion::RESP2);
251 if let Some(username) = &self.config.username {
252 redis_info = redis_info.set_username(username);
253 }
254 if let Some(password) = &self.config.password {
255 redis_info = redis_info.set_password(password);
256 }
257 let connection_info = con_addr
258 .clone()
259 .into_connection_info()
260 .map_err(|err| {
261 Error::new(ErrorKind::ConfigInvalid, "invalid connection address")
262 .with_context("service", REDIS_SCHEME)
263 .with_context("address", con_addr)
264 .with_context("error", err)
265 })?
266 .set_redis_settings(redis_info);
267
268 Ok(connection_info)
269 }
270}
271
// Accessor implementation backed by redis; cloning shares the same core and info.
272#[derive(Debug, Clone)]
274pub struct RedisBackend {
// Shared redis core used for lookups and handed to the writers and deleters it creates.
275 core: Arc<RedisCore>,
// Root that every incoming path is joined with via `build_abs_path`.
276 root: String,
// Accessor metadata (scheme, name, root, capabilities) returned by `info()`.
277 info: Arc<AccessorInfo>,
278}
279
280impl RedisBackend {
281 fn new(core: RedisCore) -> Self {
282 let info = AccessorInfo::default();
283 info.set_scheme(REDIS_SCHEME);
284 info.set_name(core.addr());
285 info.set_root("/");
286 info.set_native_capability(Capability {
287 read: true,
288 write: true,
289 delete: true,
290 stat: true,
291 write_can_empty: true,
292 shared: true,
293 ..Default::default()
294 });
295
296 Self {
297 core: Arc::new(core),
298 root: "/".to_string(),
299 info: Arc::new(info),
300 }
301 }
302
303 fn with_normalized_root(mut self, root: String) -> Self {
304 self.info.set_root(&root);
305 self.root = root;
306 self
307 }
308}
309
310impl Access for RedisBackend {
311 type Reader = Buffer;
312 type Writer = RedisWriter;
313 type Lister = ();
314 type Deleter = oio::OneShotDeleter<RedisDeleter>;
315
316 fn info(&self) -> Arc<AccessorInfo> {
317 self.info.clone()
318 }
319
320 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
321 let p = build_abs_path(&self.root, path);
322
323 if p == build_abs_path(&self.root, "") {
324 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
325 } else {
326 let bs = self.core.get(&p).await?;
327 match bs {
328 Some(bs) => Ok(RpStat::new(
329 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
330 )),
331 None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
332 }
333 }
334 }
335
336 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
337 let p = build_abs_path(&self.root, path);
338
339 let range = args.range();
340 let buffer = if range.is_full() {
341 match self.core.get(&p).await? {
343 Some(bs) => bs,
344 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
345 }
346 } else {
347 let start = range.offset() as isize;
349 let end = match range.size() {
350 Some(size) => (range.offset() + size - 1) as isize,
351 None => -1, };
353
354 match self.core.get_range(&p, start, end).await? {
355 Some(bs) => bs,
356 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
357 }
358 };
359
360 Ok((RpRead::new(), buffer))
361 }
362
363 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
364 let p = build_abs_path(&self.root, path);
365 Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
366 }
367
368 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
369 Ok((
370 RpDelete::default(),
371 oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
372 ))
373 }
374}