From 979309e46dc5cea35da067559d67971d3ce09d3e Mon Sep 17 00:00:00 2001 From: lilac Date: Mon, 18 Mar 2024 17:11:10 +0800 Subject: [PATCH 01/13] Convert ycsb to python 3 via `2to3 bin/ycsb -w` --- bin/ycsb | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/bin/ycsb b/bin/ycsb index b5c85e35bb..549e1c2d34 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -28,7 +28,7 @@ try: mod = __import__('argparse') import argparse except ImportError: - print >> sys.stderr, '[ERROR] argparse not found. Try installing it via "pip".' + print('[ERROR] argparse not found. Try installing it via "pip".', file=sys.stderr) exit(1) BASE_URL = "https://github.com/brianfrankcooper/YCSB/tree/master/" @@ -117,26 +117,26 @@ OPTIONS = { def usage(): output = io.BytesIO() - print >> output, "%s command database [options]" % sys.argv[0] + print("%s command database [options]" % sys.argv[0], file=output) - print >> output, "\nCommands:" + print("\nCommands:", file=output) for command in sorted(COMMANDS.keys()): - print >> output, " %s %s" % (command.ljust(14), - COMMANDS[command]["description"]) + print(" %s %s" % (command.ljust(14), + COMMANDS[command]["description"]), file=output) - print >> output, "\nDatabases:" + print("\nDatabases:", file=output) for db in sorted(DATABASES.keys()): - print >> output, " %s %s" % (db.ljust(14), BASE_URL + - db.split("-")[0]) + print(" %s %s" % (db.ljust(14), BASE_URL + + db.split("-")[0]), file=output) - print >> output, "\nOptions:" + print("\nOptions:", file=output) for option in sorted(OPTIONS.keys()): - print >> output, " %s %s" % (option.ljust(14), OPTIONS[option]) + print(" %s %s" % (option.ljust(14), OPTIONS[option]), file=output) - print >> output, """\nWorkload Files: + print("""\nWorkload Files: There are various predefined workloads under workloads/ directory. See https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties - for the list of workload properties.""" + for the list of workload properties.""", file=output) return output.getvalue() @@ -177,13 +177,13 @@ def check_output(*popenargs, **kwargs): return output def debug(message): - print >> sys.stderr, "[DEBUG] ", message + print("[DEBUG] ", message, file=sys.stderr) def warn(message): - print >> sys.stderr, "[WARN] ", message + print("[WARN] ", message, file=sys.stderr) def error(message): - print >> sys.stderr, "[ERROR] ", message + print("[ERROR] ", message, file=sys.stderr) def find_jars(dir, glob='*.jar'): jars = [] @@ -220,7 +220,7 @@ def get_classpath_from_maven(module): # the last module will be the datastore binding line = [x for x in mvn_output.splitlines() if x.startswith("classpath=")][-1:] return line[0][len("classpath="):] - except subprocess.CalledProcessError, err: + except subprocess.CalledProcessError as err: error("Attempting to generate a classpath from Maven failed " "with return code '" + str(err.returncode) + "'. The output from " "Maven follows, try running " @@ -311,7 +311,7 @@ def main(): main_classname, "-db", db_classname] + remaining) if command: ycsb_command.append(command) - print >> sys.stderr, " ".join(ycsb_command) + print(" ".join(ycsb_command), file=sys.stderr) try: return subprocess.call(ycsb_command) except OSError as e: From a3c54169c1156b7c02a3d7fc3f7d0999724fe45f Mon Sep 17 00:00:00 2001 From: lilac Date: Mon, 18 Mar 2024 17:28:05 +0800 Subject: [PATCH 02/13] Fix errors due to bytes/string type error --- bin/ycsb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/ycsb b/bin/ycsb index 549e1c2d34..484d27298b 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -116,7 +116,7 @@ OPTIONS = { } def usage(): - output = io.BytesIO() + output = io.StringIO() print("%s command database [options]" % sys.argv[0], file=output) print("\nCommands:", file=output) @@ -164,7 +164,7 @@ def check_output(*popenargs, **kwargs): """ if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it will be overridden.') - process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs) + process = subprocess.Popen(stdout=subprocess.PIPE, text=True, *popenargs, **kwargs) output, unused_err = process.communicate() retcode = process.poll() if retcode: From 2b1959068006584efb8a65ed83dc47007abf4c20 Mon Sep 17 00:00:00 2001 From: lilac Date: Sun, 28 Apr 2024 21:55:48 +0800 Subject: [PATCH 03/13] Support Redis SSL (#1) * Support Redis SSL * Change to python 3 * Fix Redis client * Update readme with new Redis connection parameters --- bin/ycsb | 2 +- pom.xml | 2 +- redis/README.md | 3 ++ .../main/java/site/ycsb/db/RedisClient.java | 43 +++++++++++-------- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/bin/ycsb b/bin/ycsb index 484d27298b..719bd5fcaf 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Copyright (c) 2012 - 2020 YCSB contributors. All rights reserved. # diff --git a/pom.xml b/pom.xml index de2d5eea23..c065b8d6cb 100644 --- a/pom.xml +++ b/pom.xml @@ -138,7 +138,7 @@ LICENSE file. 2.1.1 2.2.37 UTF-8 - 2.9.0 + 5.1.2 2.0.5 6.2.2 1.10.20 diff --git a/redis/README.md b/redis/README.md index d21118eb7a..1766396be8 100644 --- a/redis/README.md +++ b/redis/README.md @@ -42,6 +42,9 @@ Set host, port, password, and cluster mode in the workload you plan to run. - `redis.cluster` * Set the cluster parameter to `true` if redis cluster mode is enabled. * Default is `false`. +- `redis.ssl` + * Set the ssl parameter to `true` if redis instance has enabled SSL. + * Default is `false`. Or, you can set configs with the shell command, EG: diff --git a/redis/src/main/java/site/ycsb/db/RedisClient.java b/redis/src/main/java/site/ycsb/db/RedisClient.java index 2de9ad2329..8b53d47606 100644 --- a/redis/src/main/java/site/ycsb/db/RedisClient.java +++ b/redis/src/main/java/site/ycsb/db/RedisClient.java @@ -29,15 +29,14 @@ import site.ycsb.DBException; import site.ycsb.Status; import site.ycsb.StringByteIterator; -import redis.clients.jedis.BasicCommands; +import redis.clients.jedis.DefaultJedisClientConfig; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisCommands; +import redis.clients.jedis.commands.JedisCommands; import redis.clients.jedis.Protocol; +import redis.clients.jedis.DefaultJedisClientConfig.Builder; -import java.io.Closeable; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.HashSet; @@ -61,6 +60,7 @@ public class RedisClient extends DB { public static final String PASSWORD_PROPERTY = "redis.password"; public static final String CLUSTER_PROPERTY = "redis.cluster"; public static final String TIMEOUT_PROPERTY = "redis.timeout"; + public static final String SSL_PROPERTY = "redis.ssl"; public static final String INDEX_KEY = "_indices"; @@ -77,30 +77,36 @@ public void init() throws DBException { String host = props.getProperty(HOST_PROPERTY); boolean clusterEnabled = Boolean.parseBoolean(props.getProperty(CLUSTER_PROPERTY)); + boolean sslEnabled = Boolean.parseBoolean(props.getProperty(SSL_PROPERTY)); + String password = props.getProperty(PASSWORD_PROPERTY); if (clusterEnabled) { Set jedisClusterNodes = new HashSet<>(); jedisClusterNodes.add(new HostAndPort(host, port)); - jedis = new JedisCluster(jedisClusterNodes); + Builder builder = DefaultJedisClientConfig.builder().ssl(sslEnabled); + if (password != null) { + builder = builder.password(password); + } + jedis = new JedisCluster(jedisClusterNodes, builder.build()); } else { String redisTimeout = props.getProperty(TIMEOUT_PROPERTY); - if (redisTimeout != null){ - jedis = new Jedis(host, port, Integer.parseInt(redisTimeout)); + Jedis jedisServer; + if (redisTimeout != null) { + jedisServer = new Jedis(host, port, Integer.parseInt(redisTimeout), sslEnabled); } else { - jedis = new Jedis(host, port); + jedisServer = new Jedis(host, port, sslEnabled); + } + jedisServer.connect(); + jedis = jedisServer; + if (password != null) { + jedisServer.auth(password); } - ((Jedis) jedis).connect(); - } - - String password = props.getProperty(PASSWORD_PROPERTY); - if (password != null) { - ((BasicCommands) jedis).auth(password); } } public void cleanup() throws DBException { try { - ((Closeable) jedis).close(); - } catch (IOException e) { + ((AutoCloseable) jedis).close(); + } catch (Exception e) { throw new DBException("Closing connection failed."); } } @@ -123,8 +129,7 @@ public Status read(String table, String key, Set fields, if (fields == null) { StringByteIterator.putAllAsByteIterators(result, jedis.hgetAll(key)); } else { - String[] fieldArray = - (String[]) fields.toArray(new String[fields.size()]); + String[] fieldArray = (String[]) fields.toArray(new String[fields.size()]); List values = jedis.hmget(key, fieldArray); Iterator fieldIterator = fields.iterator(); @@ -166,7 +171,7 @@ public Status update(String table, String key, @Override public Status scan(String table, String startkey, int recordcount, Set fields, Vector> result) { - Set keys = jedis.zrangeByScore(INDEX_KEY, hash(startkey), + List keys = jedis.zrangeByScore(INDEX_KEY, hash(startkey), Double.POSITIVE_INFINITY, 0, recordcount); HashMap values; From 14b68a8169699570677cc81f5db4a2a5d88d3691 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Fri, 17 May 2024 17:57:08 +0800 Subject: [PATCH 04/13] add binding for Redis using Lettuce --- pom.xml | 2 + redislettuce/pom.xml | 45 +++ .../java/site/ycsb/db/RedisLettuceClient.java | 334 ++++++++++++++++++ .../main/java/site/ycsb/db/package-info.java | 22 ++ 4 files changed, 403 insertions(+) create mode 100644 redislettuce/pom.xml create mode 100644 redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java create mode 100644 redislettuce/src/main/java/site/ycsb/db/package-info.java diff --git a/pom.xml b/pom.xml index c065b8d6cb..3fa040cce8 100644 --- a/pom.xml +++ b/pom.xml @@ -139,6 +139,7 @@ LICENSE file. 2.2.37 UTF-8 5.1.2 + 6.2.4.RELEASE 2.0.5 6.2.2 1.10.20 @@ -192,6 +193,7 @@ LICENSE file. postgrenosql rados redis + redislettuce rest riak rocksdb diff --git a/redislettuce/pom.xml b/redislettuce/pom.xml new file mode 100644 index 0000000000..51f5609ad3 --- /dev/null +++ b/redislettuce/pom.xml @@ -0,0 +1,45 @@ + + + + + 4.0.0 + + site.ycsb + binding-parent + 0.18.0-SNAPSHOT + ../binding-parent + + + redislettuce-binding + Redis DB Binding with Lettuce + jar + + + + io.lettuce + lettuce-core + ${redis.lettuce.version} + + + site.ycsb + core + ${project.version} + provided + + + diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java new file mode 100644 index 0000000000..64fb9f1019 --- /dev/null +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -0,0 +1,334 @@ +package site.ycsb.db; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import site.ycsb.ByteIterator; +import site.ycsb.DB; +import site.ycsb.DBException; +import site.ycsb.Status; +import site.ycsb.StringByteIterator; + +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.sync.RedisClusterCommands; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.masterreplica.MasterReplica; +import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection; +import io.lettuce.core.resource.DefaultClientResources; +import io.lettuce.core.resource.DirContextDnsResolver; +import io.lettuce.core.KeyValue; +import io.lettuce.core.Limit; +import io.lettuce.core.Range; +import io.lettuce.core.ReadFrom; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.Vector; + +/** + * YCSB binding for Redis using Lettuce client. + */ +public class RedisLettuceClient extends DB { + + public static final String HOST_PROPERTY = "redis.host"; + public static final String PORT_PROPERTY = "redis.port"; + public static final int DEFAULT_PORT = 6379; + public static final String PASSWORD_PROPERTY = "redis.password"; + public static final String CLUSTER_PROPERTY = "redis.cluster"; + public static final String READ_FROM = "redis.readfrom"; + public static final String TIMEOUT_PROPERTY = "redis.timeout"; + public static final String SSL_PROPERTY = "redis.ssl"; + + public static final String INDEX_KEY = "_indices"; + + private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + + private static AbstractRedisClient client = null; + private static StatefulConnection stringConnection = null; + private static boolean isCluster = false; + + /** + * for redis cluster instances. + * @param host + * @param port + * @param enableSSL + * @return + */ + private RedisClusterClient createClusterClient(String host, int port, boolean enableSSL) { + DefaultClientResources resources = DefaultClientResources.builder() + .dnsResolver(new DirContextDnsResolver()) + .build(); + RedisURI primaryNode = RedisURI.builder() + .withSsl(enableSSL) + .withHost(host) + .withPort(port) + .build(); + + RedisClusterClient clusterClient = RedisClusterClient.create(resources, primaryNode); + return clusterClient; + } + + /** + * for redis instances not in a cluster. + * @param host + * @param port + * @param enableSSL + * @return + */ + private RedisClient createClient() { + DefaultClientResources resources = DefaultClientResources.builder() + .dnsResolver(new DirContextDnsResolver()) + .build(); + RedisClient redisClient = RedisClient.create(resources); + return redisClient; + } + + private StatefulRedisMasterReplicaConnection createConnection(RedisClient redisClient, + List hosts, int port, boolean enableSSL, ReadFrom readFrom) { + List nodes = new ArrayList(); + for (String host : hosts) { + RedisURI node = RedisURI.builder() + .withSsl(enableSSL) + .withHost(host) + .withPort(port) + .build(); + nodes.add(node); + } + StatefulRedisMasterReplicaConnection masterReplicaConnection = + MasterReplica.connect(redisClient, StringCodec.UTF8, nodes); + if (readFrom != null) { + masterReplicaConnection.setReadFrom(readFrom); + } + return masterReplicaConnection; + } + + private void setupConnection() throws DBException { + if (client != null) { + return; + } + + Properties props = getProperties(); + int port; + + String portString = props.getProperty(PORT_PROPERTY); + if (portString != null) { + port = Integer.parseInt(portString); + } else { + port = DEFAULT_PORT; + } + String host = props.getProperty(HOST_PROPERTY); + + boolean clusterEnabled = Boolean.parseBoolean(props.getProperty(CLUSTER_PROPERTY)); + boolean sslEnabled = Boolean.parseBoolean(props.getProperty(SSL_PROPERTY, "false")); + + ReadFrom readFrom = null; + String readFromString = props.getProperty(READ_FROM); + if (readFromString != null) { + if ("replica_preferred".equals(readFromString)) { + readFrom = ReadFrom.REPLICA_PREFERRED; + } else if ("master_preferred".equals(readFromString)) { + readFrom = ReadFrom.MASTER_PREFERRED; + } else if ("master".equals(readFromString)) { + readFrom = ReadFrom.MASTER; + } else if ("replica".equals(readFromString)) { + readFrom = ReadFrom.REPLICA; + } else { + throw new DBException("readfrom " + readFromString + " not support"); + } + } + + if (clusterEnabled) { + RedisClusterClient clusterClient = createClusterClient(host, port, sslEnabled); + StatefulRedisClusterConnection clusterConnection = clusterClient.connect(StringCodec.UTF8); + if (readFrom != null) { + clusterConnection.setReadFrom(readFrom); + } + client = clusterClient; + stringConnection = clusterConnection; + isCluster = true; + } else { + List hosts = Arrays.asList(host.split(",")); + RedisClient redisClient = createClient(); + StatefulRedisMasterReplicaConnection masterReplicaConnection = createConnection(redisClient, hosts, + port, sslEnabled, readFrom); + client = redisClient; + stringConnection = masterReplicaConnection; + isCluster = false; + } + } + + private void shutdownConnection() throws DBException { + stringConnection.close(); + stringConnection = null; + + client.close(); + client = null; + } + + private RedisClusterCommands getRedisClusterCommands() { + RedisClusterCommands cmds = null; + if (stringConnection != null) { + if (isCluster) { + cmds = ((StatefulRedisClusterConnection)stringConnection).sync(); + } else { + cmds = ((StatefulRedisMasterReplicaConnection)stringConnection).sync(); + } + } + return cmds; + } + + /** + * Initialize any state for this DB. + * Called once per DB instance; there is one DB instance per client thread. + */ + public void init() throws DBException { + // Keep track of number of calls to init (for later cleanup) + INIT_COUNT.incrementAndGet(); + + // Synchronized so that we only have a single + // connection instance for all the threads. + synchronized (INIT_COUNT) { + setupConnection(); + } + } + + /** + * Cleanup any state for this DB. + * Called once per DB instance; there is one DB instance per client thread. + */ + public void cleanup() throws DBException { + synchronized (INIT_COUNT) { + final int curInitCount = INIT_COUNT.decrementAndGet(); + if (curInitCount <= 0) { + shutdownConnection(); + } + if (curInitCount < 0) { + // This should never happen. + throw new DBException( + String.format("initCount is negative: %d", curInitCount)); + } + } + } + + /** + * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return The result of the operation. + */ + @Override + public Status read(String table, String key, Set fields, Map result) { + RedisClusterCommands cmds = getRedisClusterCommands(); + if (fields == null) { + StringByteIterator.putAllAsByteIterators(result, cmds.hgetall(key)); + } else { + String[] fieldArray = (String[]) fields.toArray(new String[fields.size()]); + List> values = cmds.hmget(key, fieldArray); + + Iterator> fieldValueIterator = values.iterator(); + + while (fieldValueIterator.hasNext()) { + KeyValue fieldValue = fieldValueIterator.next(); + result.put(fieldValue.getKey(), + new StringByteIterator(fieldValue.getValue())); + } + } + return result.isEmpty() ? Status.ERROR : Status.OK; + } + + private double hash(String key) { + return key.hashCode(); + } + + /** + * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored + * in a HashMap. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record + * @return The result of the operation. + */ + @Override + public Status scan(String table, String startkey, int recordcount, Set fields, + Vector> result) { + RedisClusterCommands cmds = getRedisClusterCommands(); + List keys = cmds.zrangebyscore(INDEX_KEY, + Range.from(Range.Boundary.excluding(hash(startkey)), Range.Boundary.excluding(Double.POSITIVE_INFINITY)), + Limit.from(recordcount)); + + HashMap values; + for (String key : keys) { + values = new HashMap(); + read(table, key, fields, values); + result.add(values); + } + + return Status.OK; + } + + /** + * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the + * record with the specified record key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return The result of the operation. + */ + @Override + public Status update(String table, String key, Map values) { + RedisClusterCommands cmds = getRedisClusterCommands(); + return cmds.hmset(key, StringByteIterator.getStringMap(values)) + .equals("OK") ? Status.OK : Status.ERROR; + } + + /** + * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the + * record with the specified record key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return The result of the operation. + */ + @Override + public Status insert(String table, String key, Map values) { + RedisClusterCommands cmds = getRedisClusterCommands(); + if (cmds.hmset(key, StringByteIterator.getStringMap(values)) + .equals("OK")) { + cmds.zadd(INDEX_KEY, hash(key), key); + return Status.OK; + } + return Status.ERROR; + } + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. + * @return The result of the operation. + */ + @Override + public Status delete(String table, String key) { + RedisClusterCommands cmds = getRedisClusterCommands(); + return cmds.del(key) == 0 && cmds.zrem(INDEX_KEY, key) == 0 ? Status.ERROR + : Status.OK; + } + +} diff --git a/redislettuce/src/main/java/site/ycsb/db/package-info.java b/redislettuce/src/main/java/site/ycsb/db/package-info.java new file mode 100644 index 0000000000..6144d5aba6 --- /dev/null +++ b/redislettuce/src/main/java/site/ycsb/db/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for Redis. + */ +package site.ycsb.db; + From e6a79ffd86c13ec850b537c960181f8484d2f0c8 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Fri, 17 May 2024 18:52:54 +0800 Subject: [PATCH 05/13] add redislettuce db option in ycsb script --- bin/ycsb | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/ycsb b/bin/ycsb index 719bd5fcaf..5cdab4f826 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -93,6 +93,7 @@ DATABASES = { "postgrenosql" : "site.ycsb.postgrenosql.PostgreNoSQLDBClient", "rados" : "site.ycsb.db.RadosClient", "redis" : "site.ycsb.db.RedisClient", + "redislettuce" : "site.ycsb.db.RedisLettuceClient", "rest" : "site.ycsb.webservice.rest.RestClient", "riak" : "site.ycsb.db.riak.RiakKVClient", "rocksdb" : "site.ycsb.db.rocksdb.RocksDBClient", From 92615b9cdee2f511c157ba9abb7ec0dbff1dfec0 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Tue, 21 May 2024 22:01:04 +0800 Subject: [PATCH 06/13] refactor and add multiple connections mode for redislettuce --- .../java/site/ycsb/db/RedisLettuceClient.java | 154 ++++++++++++++++-- 1 file changed, 137 insertions(+), 17 deletions(-) diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index 64fb9f1019..01594422af 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.ArrayList; import java.util.Properties; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import site.ycsb.ByteIterator; import site.ycsb.DB; @@ -44,16 +45,25 @@ public class RedisLettuceClient extends DB { public static final int DEFAULT_PORT = 6379; public static final String PASSWORD_PROPERTY = "redis.password"; public static final String CLUSTER_PROPERTY = "redis.cluster"; - public static final String READ_FROM = "redis.readfrom"; + public static final String READ_FROM = "redis.readfrom"; // replica_preferred, replica, master_preferred, master public static final String TIMEOUT_PROPERTY = "redis.timeout"; public static final String SSL_PROPERTY = "redis.ssl"; + public static final String CONNECTION_PROPERTY = "redis.connection"; // connection mode + public static final String CONNECTION_SINGLE = "single"; // single connection for all threads + public static final String CONNECTION_MULTIPLE = "multi"; // multiple connections for all threads + public static final String MULTI_SIZE_PROPERTY = "multi.size"; // connections amount for multi connection model + + enum ConnectionMode { + SINGLE, MULTIPLE + } + public static final String INDEX_KEY = "_indices"; private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); private static AbstractRedisClient client = null; - private static StatefulConnection stringConnection = null; + private static ConnectionProvider stringConnectionProvider = null; private static boolean isCluster = false; /** @@ -63,7 +73,7 @@ public class RedisLettuceClient extends DB { * @param enableSSL * @return */ - private RedisClusterClient createClusterClient(String host, int port, boolean enableSSL) { + static RedisClusterClient createClusterClient(String host, int port, boolean enableSSL) { DefaultClientResources resources = DefaultClientResources.builder() .dnsResolver(new DirContextDnsResolver()) .build(); @@ -77,6 +87,15 @@ private RedisClusterClient createClusterClient(String host, int port, boolean en return clusterClient; } + static StatefulRedisClusterConnection createConnection( + RedisClusterClient clusterClient, ReadFrom readFrom) { + StatefulRedisClusterConnection clusterConnection = clusterClient.connect(StringCodec.UTF8); + if (readFrom != null) { + clusterConnection.setReadFrom(readFrom); + } + return clusterConnection; + } + /** * for redis instances not in a cluster. * @param host @@ -84,7 +103,7 @@ private RedisClusterClient createClusterClient(String host, int port, boolean en * @param enableSSL * @return */ - private RedisClient createClient() { + static RedisClient createClient() { DefaultClientResources resources = DefaultClientResources.builder() .dnsResolver(new DirContextDnsResolver()) .build(); @@ -92,8 +111,13 @@ private RedisClient createClient() { return redisClient; } - private StatefulRedisMasterReplicaConnection createConnection(RedisClient redisClient, + static StatefulRedisMasterReplicaConnection createConnection(RedisClient redisClient, List hosts, int port, boolean enableSSL, ReadFrom readFrom) { + List nodes = getNodes(hosts, port, enableSSL); + return createConnection(redisClient, nodes, readFrom); + } + + static List getNodes(List hosts, int port, boolean enableSSL) { List nodes = new ArrayList(); for (String host : hosts) { RedisURI node = RedisURI.builder() @@ -103,6 +127,11 @@ private StatefulRedisMasterReplicaConnection createConnection(Re .build(); nodes.add(node); } + return nodes; + } + + static StatefulRedisMasterReplicaConnection createConnection(RedisClient redisClient, + List nodes, ReadFrom readFrom) { StatefulRedisMasterReplicaConnection masterReplicaConnection = MasterReplica.connect(redisClient, StringCodec.UTF8, nodes); if (readFrom != null) { @@ -142,39 +171,72 @@ private void setupConnection() throws DBException { } else if ("replica".equals(readFromString)) { readFrom = ReadFrom.REPLICA; } else { - throw new DBException("readfrom " + readFromString + " not support"); + throw new DBException("unknown readfrom: " + readFromString); } } + ConnectionMode connectionMode; + String connectionModeString = props.getProperty(CONNECTION_PROPERTY); + if (CONNECTION_SINGLE.equals(connectionModeString)) { + connectionMode = ConnectionMode.SINGLE; + } else if (CONNECTION_MULTIPLE.equals(connectionModeString)) { + connectionMode = ConnectionMode.MULTIPLE; + } else { + throw new DBException("unknown connectionMode: " + connectionModeString); + } + if (clusterEnabled) { RedisClusterClient clusterClient = createClusterClient(host, port, sslEnabled); - StatefulRedisClusterConnection clusterConnection = clusterClient.connect(StringCodec.UTF8); - if (readFrom != null) { - clusterConnection.setReadFrom(readFrom); + ConnectionProvider connectionProvider = null; + if (connectionMode == ConnectionMode.SINGLE) { + connectionProvider = new SingleConnectionProvider(clusterClient, readFrom); + } else if (connectionMode == ConnectionMode.MULTIPLE) { + int processors = Runtime.getRuntime().availableProcessors(); + int amount = Integer.parseInt(props.getProperty(MULTI_SIZE_PROPERTY, Integer.toString(processors))); + connectionProvider = new MultipleConnectionsProvider(clusterClient, readFrom, amount); } + client = clusterClient; - stringConnection = clusterConnection; + stringConnectionProvider = connectionProvider; isCluster = true; } else { List hosts = Arrays.asList(host.split(",")); RedisClient redisClient = createClient(); - StatefulRedisMasterReplicaConnection masterReplicaConnection = createConnection(redisClient, hosts, - port, sslEnabled, readFrom); + List nodes = getNodes(hosts, port, sslEnabled); + + ConnectionProvider connectionProvider = null; + if (connectionMode == ConnectionMode.SINGLE) { + connectionProvider = new SingleConnectionProvider(redisClient, nodes, readFrom); + } else if (connectionMode == ConnectionMode.MULTIPLE) { + int processors = Runtime.getRuntime().availableProcessors(); + int amount = Integer.parseInt(props.getProperty(MULTI_SIZE_PROPERTY, Integer.toString(processors))); + connectionProvider = new MultipleConnectionsProvider(redisClient, nodes, readFrom, amount); + } + client = redisClient; - stringConnection = masterReplicaConnection; + stringConnectionProvider = connectionProvider; isCluster = false; } } private void shutdownConnection() throws DBException { - stringConnection.close(); - stringConnection = null; + if (stringConnectionProvider != null) { + try { + stringConnectionProvider.close(); + } catch(Exception ex) { + // ignore + } + stringConnectionProvider = null; + } - client.close(); - client = null; + if (client != null) { + client.close(); + client = null; + } } private RedisClusterCommands getRedisClusterCommands() { + StatefulConnection stringConnection = stringConnectionProvider.getConnection(); RedisClusterCommands cmds = null; if (stringConnection != null) { if (isCluster) { @@ -331,4 +393,62 @@ public Status delete(String table, String key) { : Status.OK; } + private interface ConnectionProvider extends AutoCloseable { + StatefulConnection getConnection(); + } + + private static class SingleConnectionProvider implements ConnectionProvider { + + private StatefulConnection stringConnection = null; + + public SingleConnectionProvider(RedisClusterClient clusterClient, ReadFrom readFrom) { + this.stringConnection = createConnection(clusterClient, readFrom); + } + + public SingleConnectionProvider(RedisClient redisClient, List nodes, ReadFrom readFrom) { + this.stringConnection = createConnection(redisClient, nodes, readFrom); + } + + @Override + public StatefulConnection getConnection() { + return stringConnection; + } + + @Override + public void close() throws Exception { + stringConnection.close(); + } + } + + private static class MultipleConnectionsProvider implements ConnectionProvider { + private List> stringConnections = + new ArrayList>(); + + public MultipleConnectionsProvider(RedisClusterClient clusterClient, ReadFrom readFrom, int amount) { + for (int i = 0; i < amount; i++) { + this.stringConnections.add(createConnection(clusterClient, readFrom)); + } + } + + public MultipleConnectionsProvider(RedisClient redisClient, List nodes, ReadFrom readFrom, int amount) { + for (int i = 0; i < amount; i++) { + this.stringConnections.add(createConnection(redisClient, nodes, readFrom)); + } + } + + @Override + public StatefulConnection getConnection() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + return this.stringConnections.get(random.nextInt(0, this.stringConnections.size())); + } + + @Override + public void close() throws Exception { + for (StatefulConnection stringConnection: stringConnections) { + stringConnection.close(); + } + stringConnections.clear(); + } + } + } From 62262ae17eba507ecc066c94f88da5fdbf5bdb86 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Thu, 23 May 2024 18:04:48 +0800 Subject: [PATCH 07/13] add client options in RedisLettuceClient which are consistent with our application --- .../java/site/ycsb/db/RedisLettuceClient.java | 53 ++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index 01594422af..f0cf9132a6 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -1,11 +1,13 @@ package site.ycsb.db; +import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import site.ycsb.ByteIterator; import site.ycsb.DB; @@ -15,6 +17,8 @@ import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; @@ -22,6 +26,7 @@ import io.lettuce.core.masterreplica.MasterReplica; import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection; import io.lettuce.core.resource.DefaultClientResources; +import io.lettuce.core.resource.Delay; import io.lettuce.core.resource.DirContextDnsResolver; import io.lettuce.core.KeyValue; import io.lettuce.core.Limit; @@ -29,6 +34,8 @@ import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; +import io.lettuce.core.SocketOptions; +import io.lettuce.core.TimeoutOptions; import java.util.HashMap; import java.util.Map; @@ -49,11 +56,18 @@ public class RedisLettuceClient extends DB { public static final String TIMEOUT_PROPERTY = "redis.timeout"; public static final String SSL_PROPERTY = "redis.ssl"; - public static final String CONNECTION_PROPERTY = "redis.connection"; // connection mode + public static final String CONNECTION_PROPERTY = "redis.con"; // connection mode public static final String CONNECTION_SINGLE = "single"; // single connection for all threads public static final String CONNECTION_MULTIPLE = "multi"; // multiple connections for all threads public static final String MULTI_SIZE_PROPERTY = "multi.size"; // connections amount for multi connection model + // default Redis Settings which are consistent with our application + private static final long DEFAULT_CONNECT_TIMEOUT_SECONDS = 10; // 10 seconds + private static final int DEFAULT_REQUEST_QUEUE_SIZE = 65536; // 2^16 default is 2^31-1 + private static final int DEFAULT_RECONNECT_DELAY_SECONDS = 1; +// private static final long DEFAULT_TOPOLOGY_REFRESH_PERIOD = 1 * 60 * 60 * 1000L; + private static final long DEFAULT_COMMAND_TIMEOUT_MILLIS = 100L; // 100ms + enum ConnectionMode { SINGLE, MULTIPLE } @@ -76,6 +90,7 @@ enum ConnectionMode { static RedisClusterClient createClusterClient(String host, int port, boolean enableSSL) { DefaultClientResources resources = DefaultClientResources.builder() .dnsResolver(new DirContextDnsResolver()) + .reconnectDelay(Delay.constant(DEFAULT_RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS)) .build(); RedisURI primaryNode = RedisURI.builder() .withSsl(enableSSL) @@ -84,6 +99,23 @@ static RedisClusterClient createClusterClient(String host, int port, boolean ena .build(); RedisClusterClient clusterClient = RedisClusterClient.create(resources, primaryNode); + + ClusterClientOptions clientOptions = ClusterClientOptions.builder() + .requestQueueSize(DEFAULT_REQUEST_QUEUE_SIZE) + .socketOptions( + SocketOptions.builder() + .connectTimeout(DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .build() + ) + .timeoutOptions( + TimeoutOptions.builder() + .timeoutCommands(true) + .fixedTimeout(Duration.ofMillis(DEFAULT_COMMAND_TIMEOUT_MILLIS)) + .build() + ) + .build(); + clusterClient.setOptions(clientOptions); + return clusterClient; } @@ -108,6 +140,23 @@ static RedisClient createClient() { .dnsResolver(new DirContextDnsResolver()) .build(); RedisClient redisClient = RedisClient.create(resources); + + ClientOptions clientOptions = ClientOptions.builder() + .requestQueueSize(DEFAULT_REQUEST_QUEUE_SIZE) + .socketOptions( + SocketOptions.builder() + .connectTimeout(DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .build() + ) + .timeoutOptions( + TimeoutOptions.builder() + .timeoutCommands(true) + .fixedTimeout(Duration.ofMillis(DEFAULT_COMMAND_TIMEOUT_MILLIS)) + .build() + ) + .build(); + redisClient.setOptions(clientOptions); + return redisClient; } @@ -176,7 +225,7 @@ private void setupConnection() throws DBException { } ConnectionMode connectionMode; - String connectionModeString = props.getProperty(CONNECTION_PROPERTY); + String connectionModeString = props.getProperty(CONNECTION_PROPERTY, CONNECTION_SINGLE); if (CONNECTION_SINGLE.equals(connectionModeString)) { connectionMode = ConnectionMode.SINGLE; } else if (CONNECTION_MULTIPLE.equals(connectionModeString)) { From 43930ab202bc7aeb56209d973bff38ae522dca49 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Mon, 27 May 2024 17:59:15 +0800 Subject: [PATCH 08/13] update timeout related code for redislettuce --- .../java/site/ycsb/db/RedisLettuceClient.java | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index f0cf9132a6..57b4d7d015 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -19,6 +19,7 @@ import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ClientOptions; import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; @@ -54,6 +55,8 @@ public class RedisLettuceClient extends DB { public static final String CLUSTER_PROPERTY = "redis.cluster"; public static final String READ_FROM = "redis.readfrom"; // replica_preferred, replica, master_preferred, master public static final String TIMEOUT_PROPERTY = "redis.timeout"; + public static final String CONNECTION_TIMEOUT_PROPERTY = "redis.con.timeout"; + public static final String COMMAND_TIMEOUT_PROPERTY = "redis.cmd.timeout"; public static final String SSL_PROPERTY = "redis.ssl"; public static final String CONNECTION_PROPERTY = "redis.con"; // connection mode @@ -61,12 +64,13 @@ public class RedisLettuceClient extends DB { public static final String CONNECTION_MULTIPLE = "multi"; // multiple connections for all threads public static final String MULTI_SIZE_PROPERTY = "multi.size"; // connections amount for multi connection model - // default Redis Settings which are consistent with our application - private static final long DEFAULT_CONNECT_TIMEOUT_SECONDS = 10; // 10 seconds + // default Redis Settings + private static final long DEFAULT_TIMEOUT_MILLIS = 2000L; // 2 seconds +// private static final long DEFAULT_CONNECT_TIMEOUT_SECONDS = 2; // 2 seconds private static final int DEFAULT_REQUEST_QUEUE_SIZE = 65536; // 2^16 default is 2^31-1 private static final int DEFAULT_RECONNECT_DELAY_SECONDS = 1; -// private static final long DEFAULT_TOPOLOGY_REFRESH_PERIOD = 1 * 60 * 60 * 1000L; - private static final long DEFAULT_COMMAND_TIMEOUT_MILLIS = 100L; // 100ms + private static final int DEFAULT_TOPOLOGY_REFRESH_PERIOD_MINUTES = 60; // 1 hour +// private static final long DEFAULT_COMMAND_TIMEOUT_MILLIS = 2000L; // 2 seconds enum ConnectionMode { SINGLE, MULTIPLE @@ -87,7 +91,8 @@ enum ConnectionMode { * @param enableSSL * @return */ - static RedisClusterClient createClusterClient(String host, int port, boolean enableSSL) { + static RedisClusterClient createClusterClient(String host, int port, boolean enableSSL, + long connectTimeoutMillis, long commandTimeoutMillis) { DefaultClientResources resources = DefaultClientResources.builder() .dnsResolver(new DirContextDnsResolver()) .reconnectDelay(Delay.constant(DEFAULT_RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS)) @@ -100,19 +105,25 @@ static RedisClusterClient createClusterClient(String host, int port, boolean ena RedisClusterClient clusterClient = RedisClusterClient.create(resources, primaryNode); + ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() + .enableAllAdaptiveRefreshTriggers() + .enablePeriodicRefresh(Duration.ofMinutes(DEFAULT_TOPOLOGY_REFRESH_PERIOD_MINUTES)) + .build(); + ClusterClientOptions clientOptions = ClusterClientOptions.builder() .requestQueueSize(DEFAULT_REQUEST_QUEUE_SIZE) .socketOptions( SocketOptions.builder() - .connectTimeout(DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .connectTimeout(Duration.ofMillis(connectTimeoutMillis)) .build() ) .timeoutOptions( TimeoutOptions.builder() .timeoutCommands(true) - .fixedTimeout(Duration.ofMillis(DEFAULT_COMMAND_TIMEOUT_MILLIS)) + .fixedTimeout(Duration.ofMillis(commandTimeoutMillis)) .build() ) + .topologyRefreshOptions(topologyRefreshOptions) .build(); clusterClient.setOptions(clientOptions); @@ -135,7 +146,7 @@ static StatefulRedisClusterConnection createConnection( * @param enableSSL * @return */ - static RedisClient createClient() { + static RedisClient createClient(long connectTimeoutMillis, long commandTimeoutMillis) { DefaultClientResources resources = DefaultClientResources.builder() .dnsResolver(new DirContextDnsResolver()) .build(); @@ -145,13 +156,13 @@ static RedisClient createClient() { .requestQueueSize(DEFAULT_REQUEST_QUEUE_SIZE) .socketOptions( SocketOptions.builder() - .connectTimeout(DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .connectTimeout(Duration.ofMillis(connectTimeoutMillis)) .build() ) .timeoutOptions( TimeoutOptions.builder() .timeoutCommands(true) - .fixedTimeout(Duration.ofMillis(DEFAULT_COMMAND_TIMEOUT_MILLIS)) + .fixedTimeout(Duration.ofMillis(commandTimeoutMillis)) .build() ) .build(); @@ -208,6 +219,11 @@ private void setupConnection() throws DBException { boolean clusterEnabled = Boolean.parseBoolean(props.getProperty(CLUSTER_PROPERTY)); boolean sslEnabled = Boolean.parseBoolean(props.getProperty(SSL_PROPERTY, "false")); + String timeoutString = props.getProperty(TIMEOUT_PROPERTY, Long.toString(DEFAULT_TIMEOUT_MILLIS)); + long timeout = Long.parseLong(timeoutString); + long conTimeout = Long.parseLong(props.getProperty(CONNECTION_TIMEOUT_PROPERTY, Long.toString(timeout))); + long cmdTimeout = Long.parseLong(props.getProperty(COMMAND_TIMEOUT_PROPERTY, Long.toString(timeout))); + ReadFrom readFrom = null; String readFromString = props.getProperty(READ_FROM); if (readFromString != null) { @@ -235,7 +251,7 @@ private void setupConnection() throws DBException { } if (clusterEnabled) { - RedisClusterClient clusterClient = createClusterClient(host, port, sslEnabled); + RedisClusterClient clusterClient = createClusterClient(host, port, sslEnabled, conTimeout, cmdTimeout); ConnectionProvider connectionProvider = null; if (connectionMode == ConnectionMode.SINGLE) { connectionProvider = new SingleConnectionProvider(clusterClient, readFrom); @@ -250,7 +266,7 @@ private void setupConnection() throws DBException { isCluster = true; } else { List hosts = Arrays.asList(host.split(",")); - RedisClient redisClient = createClient(); + RedisClient redisClient = createClient(conTimeout, cmdTimeout); List nodes = getNodes(hosts, port, sslEnabled); ConnectionProvider connectionProvider = null; From 79a3c1a082686e61b2903afefce704ebcaa8affc Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Mon, 27 May 2024 18:15:17 +0800 Subject: [PATCH 09/13] add TIMEOUT Status, and catch RedisCommandTimeoutException in redislettuce --- core/src/main/java/site/ycsb/Status.java | 1 + .../java/site/ycsb/db/RedisLettuceClient.java | 83 ++++++++++++------- 2 files changed, 53 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/site/ycsb/Status.java b/core/src/main/java/site/ycsb/Status.java index 8a27cc9334..88b8a30457 100644 --- a/core/src/main/java/site/ycsb/Status.java +++ b/core/src/main/java/site/ycsb/Status.java @@ -95,6 +95,7 @@ public boolean isOk() { public static final Status OK = new Status("OK", "The operation completed successfully."); public static final Status ERROR = new Status("ERROR", "The operation failed."); + public static final Status TIMEOUT = new Status("TIMEOUT", "The operation timeout"); public static final Status NOT_FOUND = new Status("NOT_FOUND", "The requested record was not found."); public static final Status NOT_IMPLEMENTED = new Status("NOT_IMPLEMENTED", "The operation is not " + "implemented for the current binding."); diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index 57b4d7d015..d797c9a8b1 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -34,6 +34,7 @@ import io.lettuce.core.Range; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisURI; import io.lettuce.core.SocketOptions; import io.lettuce.core.TimeoutOptions; @@ -358,21 +359,25 @@ public void cleanup() throws DBException { @Override public Status read(String table, String key, Set fields, Map result) { RedisClusterCommands cmds = getRedisClusterCommands(); - if (fields == null) { - StringByteIterator.putAllAsByteIterators(result, cmds.hgetall(key)); - } else { - String[] fieldArray = (String[]) fields.toArray(new String[fields.size()]); - List> values = cmds.hmget(key, fieldArray); + try { + if (fields == null) { + StringByteIterator.putAllAsByteIterators(result, cmds.hgetall(key)); + } else { + String[] fieldArray = (String[]) fields.toArray(new String[fields.size()]); + List> values = cmds.hmget(key, fieldArray); - Iterator> fieldValueIterator = values.iterator(); + Iterator> fieldValueIterator = values.iterator(); - while (fieldValueIterator.hasNext()) { - KeyValue fieldValue = fieldValueIterator.next(); - result.put(fieldValue.getKey(), - new StringByteIterator(fieldValue.getValue())); + while (fieldValueIterator.hasNext()) { + KeyValue fieldValue = fieldValueIterator.next(); + result.put(fieldValue.getKey(), + new StringByteIterator(fieldValue.getValue())); + } } + return result.isEmpty() ? Status.ERROR : Status.OK; + } catch (RedisCommandTimeoutException e) { + return Status.TIMEOUT; } - return result.isEmpty() ? Status.ERROR : Status.OK; } private double hash(String key) { @@ -394,18 +399,22 @@ private double hash(String key) { public Status scan(String table, String startkey, int recordcount, Set fields, Vector> result) { RedisClusterCommands cmds = getRedisClusterCommands(); - List keys = cmds.zrangebyscore(INDEX_KEY, - Range.from(Range.Boundary.excluding(hash(startkey)), Range.Boundary.excluding(Double.POSITIVE_INFINITY)), - Limit.from(recordcount)); - - HashMap values; - for (String key : keys) { - values = new HashMap(); - read(table, key, fields, values); - result.add(values); - } + try { + List keys = cmds.zrangebyscore(INDEX_KEY, + Range.from(Range.Boundary.excluding(hash(startkey)), Range.Boundary.excluding(Double.POSITIVE_INFINITY)), + Limit.from(recordcount)); + + HashMap values; + for (String key : keys) { + values = new HashMap(); + read(table, key, fields, values); + result.add(values); + } - return Status.OK; + return Status.OK; + } catch (RedisCommandTimeoutException e) { + return Status.TIMEOUT; + } } /** @@ -420,8 +429,12 @@ public Status scan(String table, String startkey, int recordcount, Set f @Override public Status update(String table, String key, Map values) { RedisClusterCommands cmds = getRedisClusterCommands(); - return cmds.hmset(key, StringByteIterator.getStringMap(values)) - .equals("OK") ? Status.OK : Status.ERROR; + try { + return cmds.hmset(key, StringByteIterator.getStringMap(values)) + .equals("OK") ? Status.OK : Status.ERROR; + } catch (RedisCommandTimeoutException e) { + return Status.TIMEOUT; + } } /** @@ -436,12 +449,16 @@ public Status update(String table, String key, Map values) @Override public Status insert(String table, String key, Map values) { RedisClusterCommands cmds = getRedisClusterCommands(); - if (cmds.hmset(key, StringByteIterator.getStringMap(values)) - .equals("OK")) { - cmds.zadd(INDEX_KEY, hash(key), key); - return Status.OK; + try { + if (cmds.hmset(key, StringByteIterator.getStringMap(values)) + .equals("OK")) { + cmds.zadd(INDEX_KEY, hash(key), key); + return Status.OK; + } + return Status.ERROR; + } catch (RedisCommandTimeoutException e) { + return Status.TIMEOUT; } - return Status.ERROR; } /** @@ -454,8 +471,12 @@ public Status insert(String table, String key, Map values) @Override public Status delete(String table, String key) { RedisClusterCommands cmds = getRedisClusterCommands(); - return cmds.del(key) == 0 && cmds.zrem(INDEX_KEY, key) == 0 ? Status.ERROR - : Status.OK; + try { + return cmds.del(key) == 0 && cmds.zrem(INDEX_KEY, key) == 0 ? Status.ERROR + : Status.OK; + } catch (RedisCommandTimeoutException e) { + return Status.TIMEOUT; + } } private interface ConnectionProvider extends AutoCloseable { From 818f7f0b59f20ff5767bd573aeeb1214d7f4aba7 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Mon, 27 May 2024 21:24:55 +0800 Subject: [PATCH 10/13] add pooling connection mode for redislettuce --- pom.xml | 1 + redislettuce/pom.xml | 5 + .../java/site/ycsb/db/RedisLettuceClient.java | 222 ++++++++++++------ 3 files changed, 161 insertions(+), 67 deletions(-) diff --git a/pom.xml b/pom.xml index 3fa040cce8..ee40ced5e1 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,7 @@ LICENSE file. UTF-8 5.1.2 6.2.4.RELEASE + 2.11.1 2.0.5 6.2.2 1.10.20 diff --git a/redislettuce/pom.xml b/redislettuce/pom.xml index 51f5609ad3..4b38a5774b 100644 --- a/redislettuce/pom.xml +++ b/redislettuce/pom.xml @@ -35,6 +35,11 @@ LICENSE file. lettuce-core ${redis.lettuce.version} + + org.apache.commons + commons-pool2 + ${commons.pool2.version} + site.ycsb core diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index d797c9a8b1..2511c1942d 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -1,5 +1,6 @@ package site.ycsb.db; +import io.lettuce.core.support.ConnectionPoolSupport; import java.time.Duration; import java.util.Arrays; import java.util.Iterator; @@ -9,6 +10,9 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import site.ycsb.ByteIterator; import site.ycsb.DB; import site.ycsb.DBException; @@ -63,7 +67,9 @@ public class RedisLettuceClient extends DB { public static final String CONNECTION_PROPERTY = "redis.con"; // connection mode public static final String CONNECTION_SINGLE = "single"; // single connection for all threads public static final String CONNECTION_MULTIPLE = "multi"; // multiple connections for all threads - public static final String MULTI_SIZE_PROPERTY = "multi.size"; // connections amount for multi connection model + public static final String MULTI_SIZE_PROPERTY = "multi.size"; // connections amount for multi connection mode + public static final String CONNECTION_POOLING = "pool"; // connection pool for all threads + public static final String POOL_SIZE_PROPERTY = "pool.size"; // connection pool size for pooling mode // default Redis Settings private static final long DEFAULT_TIMEOUT_MILLIS = 2000L; // 2 seconds @@ -74,7 +80,7 @@ public class RedisLettuceClient extends DB { // private static final long DEFAULT_COMMAND_TIMEOUT_MILLIS = 2000L; // 2 seconds enum ConnectionMode { - SINGLE, MULTIPLE + SINGLE, MULTIPLE, POOLING } public static final String INDEX_KEY = "_indices"; @@ -145,7 +151,6 @@ static StatefulRedisClusterConnection createConnection( * @param host * @param port * @param enableSSL - * @return */ static RedisClient createClient(long connectTimeoutMillis, long commandTimeoutMillis) { DefaultClientResources resources = DefaultClientResources.builder() @@ -247,6 +252,8 @@ private void setupConnection() throws DBException { connectionMode = ConnectionMode.SINGLE; } else if (CONNECTION_MULTIPLE.equals(connectionModeString)) { connectionMode = ConnectionMode.MULTIPLE; + } else if (CONNECTION_POOLING.equals(connectionModeString)) { + connectionMode = ConnectionMode.POOLING; } else { throw new DBException("unknown connectionMode: " + connectionModeString); } @@ -260,6 +267,10 @@ private void setupConnection() throws DBException { int processors = Runtime.getRuntime().availableProcessors(); int amount = Integer.parseInt(props.getProperty(MULTI_SIZE_PROPERTY, Integer.toString(processors))); connectionProvider = new MultipleConnectionsProvider(clusterClient, readFrom, amount); + } else if (connectionMode == ConnectionMode.POOLING) { + int processors = Runtime.getRuntime().availableProcessors(); + int poolSize = Integer.parseInt(props.getProperty(POOL_SIZE_PROPERTY, Integer.toString(processors))); + connectionProvider = new PoolingConnectionsProvider(clusterClient, readFrom, poolSize); } client = clusterClient; @@ -277,6 +288,10 @@ private void setupConnection() throws DBException { int processors = Runtime.getRuntime().availableProcessors(); int amount = Integer.parseInt(props.getProperty(MULTI_SIZE_PROPERTY, Integer.toString(processors))); connectionProvider = new MultipleConnectionsProvider(redisClient, nodes, readFrom, amount); + } else if (connectionMode == ConnectionMode.POOLING) { + int processors = Runtime.getRuntime().availableProcessors(); + int poolSize = Integer.parseInt(props.getProperty(POOL_SIZE_PROPERTY, Integer.toString(processors))); + connectionProvider = new PoolingConnectionsProvider(redisClient, nodes, readFrom, poolSize); } client = redisClient; @@ -301,8 +316,8 @@ private void shutdownConnection() throws DBException { } } - private RedisClusterCommands getRedisClusterCommands() { - StatefulConnection stringConnection = stringConnectionProvider.getConnection(); + private RedisClusterCommands getRedisClusterCommands( + StatefulConnection stringConnection) { RedisClusterCommands cmds = null; if (stringConnection != null) { if (isCluster) { @@ -314,6 +329,22 @@ private RedisClusterCommands getRedisClusterCommands() { return cmds; } + private Status withRedisClusterCommands(Function function) { + try { + StatefulConnection stringConnection = stringConnectionProvider.getConnection(); + try { + RedisClusterCommands cmds = getRedisClusterCommands(stringConnection); + return function.apply(cmds); + } catch (RedisCommandTimeoutException e) { + return Status.TIMEOUT; + } finally { + stringConnectionProvider.returnConnection(stringConnection); + } + } catch (Exception e) { // getConnection exception + return Status.ERROR; + } + } + /** * Initialize any state for this DB. * Called once per DB instance; there is one DB instance per client thread. @@ -358,26 +389,24 @@ public void cleanup() throws DBException { */ @Override public Status read(String table, String key, Set fields, Map result) { - RedisClusterCommands cmds = getRedisClusterCommands(); - try { - if (fields == null) { - StringByteIterator.putAllAsByteIterators(result, cmds.hgetall(key)); - } else { - String[] fieldArray = (String[]) fields.toArray(new String[fields.size()]); - List> values = cmds.hmget(key, fieldArray); - - Iterator> fieldValueIterator = values.iterator(); - - while (fieldValueIterator.hasNext()) { - KeyValue fieldValue = fieldValueIterator.next(); - result.put(fieldValue.getKey(), - new StringByteIterator(fieldValue.getValue())); + return withRedisClusterCommands((cmds) -> { + if (fields == null) { + StringByteIterator.putAllAsByteIterators(result, cmds.hgetall(key)); + } else { + String[] fieldArray = (String[]) fields.toArray(new String[fields.size()]); + List> values = cmds.hmget(key, fieldArray); + + Iterator> fieldValueIterator = values.iterator(); + + while (fieldValueIterator.hasNext()) { + KeyValue fieldValue = fieldValueIterator.next(); + result.put(fieldValue.getKey(), + new StringByteIterator(fieldValue.getValue())); + } } + return result.isEmpty() ? Status.ERROR : Status.OK; } - return result.isEmpty() ? Status.ERROR : Status.OK; - } catch (RedisCommandTimeoutException e) { - return Status.TIMEOUT; - } + ); } private double hash(String key) { @@ -398,23 +427,21 @@ private double hash(String key) { @Override public Status scan(String table, String startkey, int recordcount, Set fields, Vector> result) { - RedisClusterCommands cmds = getRedisClusterCommands(); - try { - List keys = cmds.zrangebyscore(INDEX_KEY, - Range.from(Range.Boundary.excluding(hash(startkey)), Range.Boundary.excluding(Double.POSITIVE_INFINITY)), - Limit.from(recordcount)); - - HashMap values; - for (String key : keys) { - values = new HashMap(); - read(table, key, fields, values); - result.add(values); - } + return withRedisClusterCommands((cmds) -> { + List keys = cmds.zrangebyscore(INDEX_KEY, + Range.from(Range.Boundary.excluding(hash(startkey)), Range.Boundary.excluding(Double.POSITIVE_INFINITY)), + Limit.from(recordcount)); + + HashMap values; + for (String key : keys) { + values = new HashMap(); + read(table, key, fields, values); + result.add(values); + } - return Status.OK; - } catch (RedisCommandTimeoutException e) { - return Status.TIMEOUT; - } + return Status.OK; + } + ); } /** @@ -428,13 +455,11 @@ public Status scan(String table, String startkey, int recordcount, Set f */ @Override public Status update(String table, String key, Map values) { - RedisClusterCommands cmds = getRedisClusterCommands(); - try { - return cmds.hmset(key, StringByteIterator.getStringMap(values)) - .equals("OK") ? Status.OK : Status.ERROR; - } catch (RedisCommandTimeoutException e) { - return Status.TIMEOUT; - } + return withRedisClusterCommands((cmds) -> { + return cmds.hmset(key, StringByteIterator.getStringMap(values)) + .equals("OK") ? Status.OK : Status.ERROR; + } + ); } /** @@ -448,17 +473,15 @@ public Status update(String table, String key, Map values) */ @Override public Status insert(String table, String key, Map values) { - RedisClusterCommands cmds = getRedisClusterCommands(); - try { - if (cmds.hmset(key, StringByteIterator.getStringMap(values)) - .equals("OK")) { - cmds.zadd(INDEX_KEY, hash(key), key); - return Status.OK; + return withRedisClusterCommands((cmds) -> { + if (cmds.hmset(key, StringByteIterator.getStringMap(values)) + .equals("OK")) { + cmds.zadd(INDEX_KEY, hash(key), key); + return Status.OK; + } + return Status.ERROR; } - return Status.ERROR; - } catch (RedisCommandTimeoutException e) { - return Status.TIMEOUT; - } + ); } /** @@ -470,17 +493,16 @@ public Status insert(String table, String key, Map values) */ @Override public Status delete(String table, String key) { - RedisClusterCommands cmds = getRedisClusterCommands(); - try { - return cmds.del(key) == 0 && cmds.zrem(INDEX_KEY, key) == 0 ? Status.ERROR - : Status.OK; - } catch (RedisCommandTimeoutException e) { - return Status.TIMEOUT; - } + return withRedisClusterCommands((cmds) -> { + return cmds.del(key) == 0 && cmds.zrem(INDEX_KEY, key) == 0 ? Status.ERROR + : Status.OK; + } + ); } private interface ConnectionProvider extends AutoCloseable { - StatefulConnection getConnection(); + StatefulConnection getConnection() throws Exception; + void returnConnection(StatefulConnection connection); } private static class SingleConnectionProvider implements ConnectionProvider { @@ -496,13 +518,21 @@ public SingleConnectionProvider(RedisClient redisClient, List nodes, R } @Override - public StatefulConnection getConnection() { + public StatefulConnection getConnection() throws Exception { return stringConnection; } + @Override + public void returnConnection(StatefulConnection connection) { + // do nothing + } + @Override public void close() throws Exception { - stringConnection.close(); + if (stringConnection != null) { + stringConnection.close(); + stringConnection = null; + } } } @@ -523,11 +553,16 @@ public MultipleConnectionsProvider(RedisClient redisClient, List nodes } @Override - public StatefulConnection getConnection() { + public StatefulConnection getConnection() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); return this.stringConnections.get(random.nextInt(0, this.stringConnections.size())); } + @Override + public void returnConnection(StatefulConnection connection) { + // do nothing + } + @Override public void close() throws Exception { for (StatefulConnection stringConnection: stringConnections) { @@ -537,4 +572,57 @@ public void close() throws Exception { } } + private static class PoolingConnectionsProvider implements ConnectionProvider { + + private GenericObjectPool> clusterConnectionPool = null; + private GenericObjectPool> masterReplicaConnectionPool = null; + private boolean isCluster = false; + + + public PoolingConnectionsProvider(RedisClusterClient clusterClient, ReadFrom readFrom, int poolSize) { + GenericObjectPoolConfig> poolConfig = + new GenericObjectPoolConfig>(); + poolConfig.setMaxTotal(poolSize); + poolConfig.setMaxIdle(poolSize); + clusterConnectionPool = ConnectionPoolSupport.createGenericObjectPool( + () -> { return createConnection(clusterClient, readFrom); }, poolConfig); + isCluster = true; + } + + public PoolingConnectionsProvider(RedisClient redisClient, List nodes, ReadFrom readFrom, int poolSize) { + GenericObjectPoolConfig> poolConfig = + new GenericObjectPoolConfig>(); + poolConfig.setMaxTotal(poolSize); + poolConfig.setMaxIdle(poolSize); + masterReplicaConnectionPool = ConnectionPoolSupport.createGenericObjectPool( + () -> { return createConnection(redisClient, nodes, readFrom); }, poolConfig); + } + + @Override + public StatefulConnection getConnection() throws Exception { + return isCluster ? clusterConnectionPool.borrowObject() : masterReplicaConnectionPool.borrowObject(); + } + + @Override + public void returnConnection(StatefulConnection connection) { + if (isCluster) { + clusterConnectionPool.returnObject((StatefulRedisClusterConnection)connection); + } else { + masterReplicaConnectionPool.returnObject((StatefulRedisMasterReplicaConnection)connection); + } + } + + @Override + public void close() throws Exception { + if (clusterConnectionPool != null) { + clusterConnectionPool.close(); + clusterConnectionPool = null; + } + if (masterReplicaConnectionPool != null) { + masterReplicaConnectionPool.close(); + masterReplicaConnectionPool = null; + } + } + } + } From b093607695a6638d611e29116dbcd4ee18e6cc4c Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Thu, 13 Jun 2024 14:09:06 +0800 Subject: [PATCH 11/13] update lettuce configuration for redis cluster --- .../java/site/ycsb/db/RedisLettuceClient.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index 2511c1942d..33b70c8a73 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -27,6 +27,7 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.masterreplica.MasterReplica; import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection; @@ -102,7 +103,9 @@ static RedisClusterClient createClusterClient(String host, int port, boolean ena long connectTimeoutMillis, long commandTimeoutMillis) { DefaultClientResources resources = DefaultClientResources.builder() .dnsResolver(new DirContextDnsResolver()) - .reconnectDelay(Delay.constant(DEFAULT_RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS)) + .reconnectDelay( + Delay.fullJitter(Duration.ofMillis(100), Duration.ofSeconds(10), 100, TimeUnit.MILLISECONDS) + ) .build(); RedisURI primaryNode = RedisURI.builder() .withSsl(enableSSL) @@ -114,7 +117,7 @@ static RedisClusterClient createClusterClient(String host, int port, boolean ena ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enableAllAdaptiveRefreshTriggers() - .enablePeriodicRefresh(Duration.ofMinutes(DEFAULT_TOPOLOGY_REFRESH_PERIOD_MINUTES)) + .enablePeriodicRefresh() // Duration.ofMinutes(DEFAULT_TOPOLOGY_REFRESH_PERIOD_MINUTES)) .build(); ClusterClientOptions clientOptions = ClusterClientOptions.builder() @@ -122,6 +125,7 @@ static RedisClusterClient createClusterClient(String host, int port, boolean ena .socketOptions( SocketOptions.builder() .connectTimeout(Duration.ofMillis(connectTimeoutMillis)) + .keepAlive(true) .build() ) .timeoutOptions( @@ -131,6 +135,13 @@ static RedisClusterClient createClusterClient(String host, int port, boolean ena .build() ) .topologyRefreshOptions(topologyRefreshOptions) + .nodeFilter(it -> + !(it.is(RedisClusterNode.NodeFlag.FAIL) + || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) + || it.is(RedisClusterNode.NodeFlag.HANDSHAKE) + || it.is(RedisClusterNode.NodeFlag.NOADDR)) + ) + .validateClusterNodeMembership(false) .build(); clusterClient.setOptions(clientOptions); From e9c34c871ae9bf429966df9283394d446489fb82 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Fri, 14 Jun 2024 11:32:34 +0800 Subject: [PATCH 12/13] add more readFrom parameter support in redislettuce --- .../java/site/ycsb/db/RedisLettuceClient.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index 33b70c8a73..d2d3398738 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -80,6 +80,17 @@ public class RedisLettuceClient extends DB { private static final int DEFAULT_TOPOLOGY_REFRESH_PERIOD_MINUTES = 60; // 1 hour // private static final long DEFAULT_COMMAND_TIMEOUT_MILLIS = 2000L; // 2 seconds + private static final Map READ_FROM_MAP = new HashMap(); + static { + READ_FROM_MAP.put("replica_preferred", ReadFrom.REPLICA_PREFERRED); + READ_FROM_MAP.put("replica", ReadFrom.REPLICA); + READ_FROM_MAP.put("any_replica", ReadFrom.ANY_REPLICA); + READ_FROM_MAP.put("master", ReadFrom.MASTER); + READ_FROM_MAP.put("master_preferred", ReadFrom.MASTER_PREFERRED); + READ_FROM_MAP.put("any", ReadFrom.ANY); + READ_FROM_MAP.put("lowest_latency", ReadFrom.LOWEST_LATENCY); + } + enum ConnectionMode { SINGLE, MULTIPLE, POOLING } @@ -244,16 +255,9 @@ private void setupConnection() throws DBException { ReadFrom readFrom = null; String readFromString = props.getProperty(READ_FROM); if (readFromString != null) { - if ("replica_preferred".equals(readFromString)) { - readFrom = ReadFrom.REPLICA_PREFERRED; - } else if ("master_preferred".equals(readFromString)) { - readFrom = ReadFrom.MASTER_PREFERRED; - } else if ("master".equals(readFromString)) { - readFrom = ReadFrom.MASTER; - } else if ("replica".equals(readFromString)) { - readFrom = ReadFrom.REPLICA; - } else { - throw new DBException("unknown readfrom: " + readFromString); + readFrom = READ_FROM_MAP.get(readFromString); + if (readFrom == null) { + throw new DBException("do not support readfrom: " + readFromString); } } From 9c71a5be2b69a43e821c3615d269b76a08aee283 Mon Sep 17 00:00:00 2001 From: Huiyong Wang Date: Fri, 14 Jun 2024 15:45:57 +0800 Subject: [PATCH 13/13] update cluster topology periodic refresh interval and add comments --- .../main/java/site/ycsb/db/RedisLettuceClient.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java index d2d3398738..db233afdc0 100644 --- a/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java +++ b/redislettuce/src/main/java/site/ycsb/db/RedisLettuceClient.java @@ -128,7 +128,17 @@ static RedisClusterClient createClusterClient(String host, int port, boolean ena ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enableAllAdaptiveRefreshTriggers() - .enablePeriodicRefresh() // Duration.ofMinutes(DEFAULT_TOPOLOGY_REFRESH_PERIOD_MINUTES)) + // The topology periodic refresh interval impacts the behavior of reads if readFrom=replica_preferred. + // The default interval in ClusterTopologyRefreshOptions is one minute, and in common-redis lib + // we set it as one hour in default. + // For readFrom=replica_preferred, the read connection always uses the first replica node of the + // received topology data. But when the topology data is refreshed in the background, the order of + // replicas may change. + // Here we use one hour as the periodic refresh interval, which means within one hour the topology + // data does not change at all, so if the testing is within one hour, the reads always come from + // the same replica node for readFrom=replica_preferred even if we have 2 or more replicas. + // If we want to distribute read load evenly from all replica nodes, we should use readFrom=any_replica. + .enablePeriodicRefresh(Duration.ofMinutes(DEFAULT_TOPOLOGY_REFRESH_PERIOD_MINUTES)) .build(); ClusterClientOptions clientOptions = ClusterClientOptions.builder()