Redis笔记

1.Redis简介

Redis 是我们在互联网应用中使用最广泛的一个 NoSQL 数据库,基于 C 开发的键值对存储数据库,
Redis 这个名字是 Remote Dictionary Service 字母缩写。
很多人想到 Redis,就想到缓存。但实际上 Redis 除了缓存之外,还有许多更加丰富的使用场景。比如
分布式锁,限流。

特点:

  • 支持数据持久化
  • 支持多种不同的数据结构类型之间的映射
  • 支持主从模式的数据备份
  • 自带了发布订阅系统
  • 定时器、计数器

2.Redis安装

四种方式获取一个 Redis:

    1. 直接编译安装(推荐使用)
    1. 使用 Docker
    1. 也可以直接安装
    1. 还有一个在线体验的方式,通过在线体验,可以直接使用 Redis 的功能 http://try.redis.io

2.1 直接安装编译

准备gcc环境

1
yum install gcc-c++

下载并安装redis

1
2
3
4
5
wget http://download.redis.io/releases/redis-5.0.7.tar.gz
tar -zxvf redis-5.0.7.tar.gz
cd redis-5.0.7/
make
make install

安装完成,启动redis

1
redis-server redis.conf

2.2 通过 Docker 安装

Docker 安装好之后,启动 Docker ,直接运行安装命令即可:

1
docker run --name wangts-redis -d -p 6379:6379 redis --requirepass 123

Docker 上的 Redis 启动成功之后,可以从宿主机上连接(前提是宿主机上存在 redis-cli):

1
redis-cli -a 123

如果宿主机上没有安装 Redis,那么也可以进入到 Docker 容器种去操作 Redis:

1
docker exec -it wangts-redis redis-cli -a 123

2.3 直接安装

CENTOS:

1
yum install redis

Ubuntu:

1
apt-get install redis

Mac:

1
brew install redis

3. Redis 五种基本数据类型

3.1 Reids启动

首先,修改 redis.conf 配置文件:

配置完成后,保存退出,再次通过 redis-server redis.conf 命令启动 Redis,此时,就是在后台启
动了。

3.2 String

String 是 Redis 里边最最简单的一种数据结构。在 Redis 中,所以的 key 都是字符串,但是,不同的
key 对应的 value 则具备不同的数据结构,我们所说的五种不同的数据类型,主要是指 value 的数据类
型不同。

Redis 中的字符串是动态字符串,内部是可以修改的,像 Java 中的 StringBuffer,它采用分配冗余空间
的方式来减少内存的频繁分配。在 Redis 内部结构中,一般实际分配的内存会大于需要的内存,当字符
串小于 1M 的时候,扩容都是在现有的空间基础上加倍,扩容每次扩 1M 空间,最大 512M。

  • set
    set 就是给一个 key 赋值的。
  • append
    使用 append 命令时,如果 key 已经存在,则直接在对应的 value 后追加值,否则就创建新的键值对。
  • decr
    可以实现对 value 的减 1 操作(前提是 value 是一个数字),如果 value 不是数字,会报错,如果
    value 不存在,则会给一个默认的值为 0,在默认值的基础上减一。
  • decrby
    和 decr 类似,但是可以自己设置步长,该命令第二个参数就是步长。
  • get
    get 用来获取一个 key 的 value。
  • getrange
    getrange 可以用来返回 key 对应的 value 的子串,这有点类似于 Java 里边的 substring。这个命令第
    二个和第三个参数就是截取的起始和终止位置,其中,-1 表示最后一个字符串,-2 表示倒数第二个字符
    串,以此类推…
  • getset
    获取并更新某一个 key。
  • incr
    给某一个 key 的 value 自增。
  • incrby
    给某一个 key 的 value 自增,同时还可以设置步长。
  • incrbyfloat
    和 incrby 类似,但是自增的步长可以设置为浮点数。
  • mget 和 mset
    批量获取和批量存储
  • ttl
    查看 key 的有效期
  • setex
    在给 key 设置 value 的同时,还设置过期时间。
  • psetex
    和 setex 类似,只不过这里的时间单位是毫秒。
  • setnx
    默认情况下, set 命令会覆盖已经存在的 key,setnx 则不会。
  • msetnx
    批量设置。
  • setrange
    覆盖一个已经存在的 key 的value。
  • strlen
    查看字符串长度

3.2.1

BIT命令
在 Redis 中,字符串都是以二进制的方式来存储的。例如 set k1 a,a 对应的 ASCII 码是 97,97 转为
二进制是 01100001,BIT 相关的命令就是对二进制进行操作的。

  • getbit
    key 对应的 value 在 offset 处的 bit 值。
  • setbit
    修改 key 对应的 value 在 offset 处的 bit 值
  • bitcount
    统计二进制数据中 1 的个数

3.3 List

  • lpush
    将所有指定的值插入到存于 key 的列表的头部。如果 key 不存在,那么在进行 push 操作前会创建一个
    空列表。 如果 key 对应的值不是一个 list 的话,那么会返回一个错误。
  • lrange
    返回列表指定区间内的元素。
  • rpush
    向存于 key 的列表的尾部插入所有指定的值。
  • rpop
    移除并返回列表的尾元素。
  • lpop
    移除并返回列表的头元素。
  • lindex
    返回列表中,下标为 index 的元素。
  • ltrim
    ltrim 可以对一个列表进行修剪。
  • blpop
    阻塞式的弹出,相当于 lpop 的阻塞版

3.4 Set

  • sadd
    添加元素到一个 key 中
  • smembers
    获取一个 key 下的所有元素
  • srem
    移除指定的元素
  • sismemeber
    返回某一个成员是否在集合中
  • scard
    返回集合的数量
  • srandmember
    随机返回一个元素
  • spop
    随机返回并且出栈一个元素。
  • smove
    把一个元素从一个集合移到另一个集合中去。
  • sdiff
    返回两个集合的差集。
  • sinter
    返回两个集合的交集。
  • sdiffstore
    这个类似于 sdiff ,不同的是,计算出来的结果会保存在一个新的集合中。
  • sinterstore
    类似于 sinter,只是将计算出来的交集保存到一个新的集合中。
  • sunion
    求并集。
  • sunionstore
    求并集并且将结果保存到新的集合中。

3.5 Hash

在 hash 结构中,key 是一个字符串,value 则是一个 key/value 键值对。

  • hset
    添加值。
  • hget
    获取值
  • hmset
    批量设置
  • hmget
    批量获取
  • hdel
    删除一个指定的 field
  • hsetnx
    默认情况下,如果 key 和 field 相同,会覆盖掉已有的 value,hsetnx 则不会。
  • hvals
    获取所有的 value
  • hkeys
    获取所有的 key
  • hgetall
    同时获取所有的 key 和 value
  • hexists
    返回 field 是否存在
  • hincrby
    给指定的 value 自增
  • hincrbyfloat
    可以自增一个浮点数
  • hlen
    返回 某一个 key 中 value 的数量
  • hstrlen
    返回某一个 key 中的某一个 field 的字符串长度

3.6 Zset

  • zadd
    将指定的元素添加到有序集合中。
  • zscore
    返回 member 的 score 值
  • zrange
    返回集合中的一组元素。
  • zrevrange
    返回一组元素,但是是倒序。
  • zcard
    返回元素个数
  • zcount
    返回 score 在某一个区间内的元素。
  • zrangebyscore
    按照 score 的范围返回元素。
  • zrank
    返回元素的排名(从小到大
  • zrevrank
    返回元素排名(从大到小
  • zincrbyscore
    自增
  • zinterstore
    给两个集合求交集。
  • zrem
    弹出一个元素
  • zlexcount
    计算有序集合中成员数量
  • zrangebylex
    返回指定区间内的成员。

    3.7 key

  • del
    删除一个 key/value
  • dump
    序列化给定的 key
  • exists
    判断一个 key 是否存在
  • ttl
    查看一个 key 的有效期
  • expire
    给一个 key 设置有效期,如果 key 在过期之前被重新 set 了,则过期时间会失效。
  • persist
    移除一个 key 的过期时间
  • keys *
    查看所有的 key
  • pttl
    和 ttl 一样,只不过这里返回的是毫秒

4 Redis 的 Java 客户端

4.1 开启远程连接

Redis 默认是不支持远程连接的,需要手动开启。
一共修改两个地方:

  1. 注释掉 bind: 127.0.0.1
  2. 开启密码校验,去掉 requirepass 的注释
    改完之后,保存退出,启动 Redis

4.2 Jedis

4.2.1 基本使用

Jedis 的 GitHub 地址:https://github.com/xetorthio/jedis
首先创建一个普通的 Maven 项目。
项目创建成功后,添加Jedis 依赖:

1
2
3
4
5
6
7
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>

然后创建一个测试方法。

1
2
3
4
5
6
7
8
9
10
11
12
13

public class MyJedis {
public static void main(String[] args) {
//1.构造一个 Jedis 对象,因为这里使用的默认端口 6379,所以不用配置端口
Jedis jedis = new Jedis("192.168.91.128");
//2.密码认证
jedis.auth("wangts");
//3.测试是否连接成功
String ping = jedis.ping();
//4.返回 pong 表示连接成功
System.out.println(ping);
}
}

对于 Jedis 而言,一旦连接上 Redis 服务端,剩下的操作都很容易了。
在 Jedis 中,由于方法的 API 和 Redis 的命令高度一致,所以,Jedis 中的方法见名知意,直接使用即
可。

4.2.2 连接池

在实际应用中,Jedis 实例我们一般都是通过连接池来获取,由于 Jedis 对象不是线程安全的,所以,当
我们使用 Jedis 对象时,从连接池获取 Jedis,使用完成之后,再还给连接池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public class JedisPoolTest {
public static void main(String[] args) {
//1. 构造一个 Jedis 连接池
JedisPool pool = new JedisPool("192.168.91.128", 6379);
//2. 从连接池中获取一个 Jedis 连接
Jedis jedis = pool.getResource();
//3. Jedis 操作
String ping = jedis.ping();
System.out.println(ping);
//4. 归还连接
jedis.close();
}
}

如果第三步抛出异常的话,会导致第四步无法执行,所以,我们要对代码进行改进,确保第四步能够执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

public class JedisPoolTest {
public static void main(String[] args) {
Jedis jedis = null;
//1. 构造一个 Jedis 连接池
JedisPool pool = new JedisPool("192.168.91.128", 6379);
//2. 从连接池中获取一个 Jedis 连接
jedis = pool.getResource();
jedis.auth("wangts");
try {
//3. Jedis 操作
String ping = jedis.ping();
System.out.println(ping);
} catch (Exception e) {
e.printStackTrace();
} finally {
//4. 归还连接
if (jedis != null) {
jedis.close();
}
}
}
}

通过 finally 我们可以确保 jedis 一定被关闭。
利用 JDK1.7 中的 try-with-resource 特性,可以对上面的代码进行改造:

1
2
3
4
5
6
7
8
9
10
11

public class JedisPoolTest {
public static void main(String[] args) {
JedisPool pool = new JedisPool("192.168.91.128");
try(Jedis jedis = pool.getResource()) {
jedis.auth("wangts");
String ping = jedis.ping();
System.out.println(ping);
}
}
}

这段代码的作用和上面的是一致的。
但是,上面这段代码无法实现强约束。我们可以做进一步的改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

public interface CallWithJedis {
void call(Jedis jedis);
}
}
public class Redis {
private JedisPool pool;
public Redis() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
//连接池最大空闲数
config.setMaxIdle(300);
//最大连接数
config.setMaxTotal(1000);
//连接最大等待时间,如果是 -1 表示没有限制
config.setMaxWaitMillis(30000);
//在空闲时检查有效性
config.setTestOnBorrow(true);
/**
* 1. Redis 地址
* 2. Redis 端口
* 3. 连接超时时间
* 4. 密码
*/
pool = new JedisPool(config, "192.168.91.128", 6379, 30000, "wangts");
}
public void execute(CallWithJedis callWithJedis) {
try (Jedis jedis = pool.getResource()) {
callWithJedis.call(jedis);
}
}
}
Redis redis = new Redis();
redis.execute(jedis -> {
System.out.println(jedis.ping());
});

4.3 Lettuce

GitHub:https://github.com/lettuce-io/lettuce-core
Lettuce 和 Jedis 的一个比较:

  1. Jedis 在实现的过程中是直接连接 Redis 的,在多个线程之间共享一个 Jedis 实例,这是线程不安全的,
    如果想在多线程场景下使用 Jedis,就得使用连接池,这样,每个线程都有自己的 Jedis 实例。
  2. Lettuce 基于目前很火的 Netty NIO 框架来构建,所以克服了 Jedis 中线程不安全的问题,Lettuce
    支持同步、异步 以及 响应式调用,多个线程可以共享一个连接实例。
    使用 Lettuce,首先创建一个普通的 Maven 项目,添加 Lettuce 依赖:
1
2
3
4
5
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.2.2.RELEASE</version>
</dependency>

然后来一个简单的测试案例:

1
2
3
4
5
6
7
8
9
10
11

public class LettuceTest {
public static void main(String[] args) {
RedisClient redisClient = RedisClient.create("redis://wangts@192.168.91.128");
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommands<String, String> sync = connect.sync();
sync.set("name", "wangts");
String name = sync.get("name");
System.out.println(name);
}
}

注意这里的密码传递方式,密码直接写在连接地址里边。

5 Redis 做分布式锁

问题场景:

例如一个简单的用户操作,一个线程去修改用户的状态,首先从数据库中读出用户的状态,然后
在内存中进行修改,修改完成后,再存回去。在单线程中,这个操作没有问题,但是在多线程
中,由于读取、修改、存 这是三个操作,不是原子操作,所以在多线程中,这样会出问题。
对于这种问题,我们可以使用分布式锁来限制程序的并发执行。

5.1 基本用法

分布式锁实现的思路很简单,就是进来一个线程先占位,当别的线程进来操作时,发现已经有人占位了,就会放弃或者稍后再试。

在 Redis 中,占位一般使用 setnx 指令,先进来的线程先占位,线程的操作执行完成后,再调用 del 指令释放位子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

public class LockTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis->{
Long setnx = jedis.setnx("k1", "v1");
if (setnx == 1) {
//没人占位
jedis.set("name", "wangts");
String name = jedis.get("name");
System.out.println(name);
jedis.del("k1");//释放资源
}else{
//有人占位,停止/暂缓 操作
}
});
}
}

上面的代码存在一个小小问题:如果代码业务执行的过程中抛异常或者挂了,这样会导致 del 指令没有
被调用,这样,k1 无法释放,后面来的请求全部堵塞在这里,锁也永远得不到释放。

要解决这个问题,我们可以给锁添加一个过期时间,确保锁在一定的时间之后,能够得到释放。改进后
的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

pulic class LockTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis->{
Long setnx = jedis.setnx("k1", "v1");
if (setnx == 1) {
//给锁添加一个过期时间,防止应用在运行过程中抛出异常导致锁无法及时得到释放
jedis.expire("k1", 5);
//没人占位
jedis.set("name", "wangts");
String name = jedis.get("name");
System.out.println(name);
jedis.del("k1");//释放资源
}else{
//有人占位,停止/暂缓 操作
}
});
}
}

这样改造之后,还有一个问题,就是在获取锁和设置过期时间之间如果如果服务器突然挂掉了,这个时
候锁被占用,无法及时得到释放,也会造成死锁,因为获取锁和设置过期时间是两个操作,不具备原子性。
为了解决这个问题,从 Redis2.8 开始,setnx 和 expire 可以通过一个命令一起来执行了,我们对上述
代码再做改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LockTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis->{
String set = jedis.set("k1", "v1", new SetParams().nx().ex(5));
if (set !=null && "OK".equals(set)) {
//给锁添加一个过期时间,防止应用在运行过程中抛出异常导致锁无法及时得到释放
jedis.expire("k1", 5);
//没人占位
jedis.set("name", "wangts");
String name = jedis.get("name");
System.out.println(name);
jedis.del("k1");//释放资源
}else{
//有人占位,停止/暂缓 操作
}
});
}
}

5.2 解决超时问题

为了防止业务代码在执行的时候抛出异常,我们给每一个锁添加了一个超时时间,超时之后,锁会被自
动释放,但是这也带来了一个新的问题:如果要执行的业务非常耗时,可能会出现紊乱。举个例子:第
一个线程首先获取到锁,然后开始执行业务代码,但是业务代码比较耗时,执行了 8 秒,这样,会在第
一个线程的任务还未执行成功锁就会被释放了,此时第二个线程会获取到锁开始执行,在第二个线程刚
执行了 3 秒,第一个线程也执行完了,此时第一个线程会释放锁,但是注意,它释放的第二个线程的锁,
释放之后,第三个线程进来。
对于这个问题,我们可以从两个角度入手:

  • 尽量避免在获取锁之后,执行耗时操作。
  • 可以在锁上面做文章,将锁的 value 设置为一个随机字符串,每次释放锁的时候,都去比较随机字符串是否一致,
    如果一致,再去释放,否则,不释放。

对于第二种方案,由于释放锁的时候,要去查看锁的 value,第二个比较 value 的值是否正确,第三步
释放锁,有三个步骤,很明显三个步骤不具备原子性,为了解决这个问题,我们得引入 Lua 脚本。
Lua 脚本的优势:

  • 使用方便,Redis 中内置了对 Lua 脚本的支持。
  • Lua 脚本可以在 Redis 服务端原子的执行多个 Redis 命令。
  • 由于网络在很大程度上会影响到 Redis 性能,而使用 Lua 脚本可以让多个命令一次执行,可以有效解决网络给 Redis 带来的性能问题。

在 Redis 中,使用 Lua 脚本,大致上两种思路:

  1. 提前在 Redis 服务端写好 Lua 脚本,然后在 Java 客户端去调用脚本(推荐)。
  2. 可以直接在 Java 端去写 Lua 脚本,写好之后,需要执行时,每次将脚本发送到 Redis 上去执行。

首先在 Redis 服务端创建 Lua 脚本,内容如下:

1
2
3
4
5
if redis.call("get",KEYS[1])==ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

接下来,可以给 Lua 脚本求一个 SHA1 和,命令如下:

1
cat lua/releasewherevalueequal.lua | redis-cli -a javaboy script load --pipe

script load 这个命令会在 Redis 服务器中缓存 Lua 脚本,并返回脚本内容的 SHA1 校验和,然后在
Java 端调用时,传入 SHA1 校验和作为参数,这样 Redis 服务端就知道执行哪个脚本了。
接下来,在 Java 端调用这个脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public class LuaTest {
public static void main(String[] args) {
Redis redis = new Redis();
for (int i = 0; i < 2; i++) {
redis.execute(jedis -> {
//1.先获取一个随机字符串
String value = UUID.randomUUID().toString();
//2.获取锁
String k1 = jedis.set("k1", value, new SetParams().nx().ex(5));
//3.判断是否成功拿到锁
if (k1 != null && "OK".equals(k1)) {
//4. 具体的业务操作
jedis.set("site", "www.wangts.site");
String site = jedis.get("site");
System.out.println(site);
//5.释放锁
jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8",
Arrays.asList("k1"), Arrays.asList(value));
} else {
System.out.println("没拿到锁");
}
});
}
}
}

6 Redis单线程如何处理高并发

6.1 阻塞 IO 与非阻塞 IO

Java 在 JDK1.4 中引入 NIO,但是也有很多人在使用阻塞 IO,这两种 IO 有什么区别?
在阻塞模式下,如果你从数据流中读取不到指定大小的数据两,IO 就会阻塞。比如已知会有 10 个字节
发送过来,但是我目前只收到 4 个,还剩六个,此时就会发生阻塞。如果是非阻塞模式,虽然此时只收
到4 个字节,
但是读到 4 个字节就会立即返回,不会傻傻等着,等另外 6 个字节来的时候,再去继续读取。
所以阻塞 IO 性能低于 非阻塞 IO。
如果有一个 Web 服务器,使用阻塞 IO 来处理请求,那么每一个请求都需要开启一个新的线程;但是如
果使用了非阻塞 IO,基本上一个小小线程池就够用了,因为不会发生阻塞,每一个线程都能够高效利用。

6.2 Redis 的线程模型

首先一点,Redis 是单线程。单线程如何解决高并发问题的?
实际上,能够处理高并发的单线程应用不仅仅是 Redis,除了 Redis 之外,还有 NodeJS、Nginx 等等
也是单线程。
Redis 虽然是单线程,但是运行很快,主要有如下几方面原因:

  1. Redis 中的所有数据都是基于内存的,所有的计算也都是内存级别的计算,所以快。
  2. Redis 是单线程的,所以有一些时间复杂度高的指令,可能会导致 Redis 卡顿,例如 keys。
  3. Redis 在处理并发的客户端连接时,使用了非阻塞 IO。
    在使用非阻塞 IO 时,有一个问题,就是线程如何知道剩下的数据来了?
    这里就涉及到一个新的概念叫做多路复用,本质上就是一个事件轮询 API。
  4. Redis 会给每一个客户端指令通过队列来排队进行顺序处理。
  5. Redis 做出响应时,也会有一个响应的队列。

6.3 Redis 通信协议

Redis 通信使用了文本协议,文本协议比较费流量,但是 Redis 作者认为数据库的瓶颈不在于网络流量,
而在于内部逻辑,所以采用了这样一个费流量的文本协议。
这个文本协议叫做 Redis Serialization Protocol,简称 RESP。
Redis 协议将传输的数据结构分为 5 种最小单元,单元结束时,加上回车换行符 \r\n。

  1. 单行字符串以 + 开始,例如 +wangts.site\r\n
  2. 多行字符串以 $ 开始,后面加上字符串长度,例如 $11\r\nwangts.site\r\n
  3. 整数值以: 开始,例如 :1024\r\n
  4. 错误消息以 - 开始
  5. 数组以 * 开始,后面加上数组长度。
    需要注意的是,如果是客户端连接服务端,只能使用第 5 种。

接下来,我们通过 Socket+RESP 来定义两个最最常见的命令 set 和 get。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class JavaboyRedisClient {
private Socket socket;
public JavaboyRedisClient() {
try {
socket = new Socket("192.168.91.128", 6379);
} catch (IOException e) {
e.printStackTrace();
System.out.println("Redis 连接失败");
}
}

/**
* 执行 Redis 中的 set 命令 [set,key,value]
* @param key
* @param value
* @return
*/
public String set(String key, String value) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("*3")
.append("\r\n")
.append("$")
.append("set".length())
.append("\r\n")
.append("set")
.append("\r\n")
.append("$")
.append(key.getBytes().length)
.append("\r\n")
.append(key)
.append("\r\n")
.append("$")
.append(value.getBytes().length)
.append("\r\n")
.append(value)
.append("\r\n");
System.out.println(sb.toString());
socket.getOutputStream().write(sb.toString().getBytes());
byte[] buf = new byte[1024];
socket.getInputStream().read(buf);
return new String(buf);
} /**
* 执行 Redis 中的 get 命令 [get,key]
* @param key
* @return
*/
public String get(String key) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("*2")
.append("\r\n")
.append("$")
.append("get".length())
.append("\r\n")
.append("get")
.append("\r\n")
.append("$")
.append(key.getBytes().length)
.append("\r\n")
.append(key)
.append("\r\n");
socket.getOutputStream().write(sb.toString().getBytes());
byte[] buf = new byte[1024];
socket.getInputStream().read(buf);
return new String(buf);
}
public static void main(String[] args) throws IOException {
String set = new JavaboyRedisClient().set("k1", "面朝大海");
System.out.println(set);
String k1 = new JavaboyRedisClient().get("k1");
System.out.println(k1);
}
}

7 Redis持久化

Redis 是一个缓存工具,也叫做 NoSQL 数据库,既然是数据库,必然支持数据的持久化操作。在 Redis
中,数据库持久化一共有两种方案:

  1. 快照方式
  2. AOF 日志

7.1 快照

7.1.1 快照原理

Redis 使用操作系统的多进程机制来实现快照持久化:Redis 在持久化时,会调用 glibc 函数 fork 一个
子进程,然后将快照持久化操作完全交给子进程去处理,而父进程则继续处理客户端请求。在这个过程
中,子进程能够看到的内存中的数据在子进程产生的一瞬间就固定下来了,再也不会改变,也就是为什
么 Redis 持久化叫做 快照。

7.1.2 具体配置

在 Redis 中,默认情况下,快照持久化的方式就是开启的。
默认情况下会产生一个 dump.rdb 文件,这个文件就是备份下来的文件。当 Redis 启动时,会自动的去
加载这个 rdb 文件,从该文件中恢复数据。
具体的配置,在 redis.conf中:

1
2
3
4
5
6
7
8
9
10
11
12
# 表示快照的频率,第一个表示 900 秒内如果有一个键被修改,则进行快照
save 900 1
save 300 10
save 60 10000
# 快照执行出错后,是否继续处理客户端的写命令
stop-writes-on-bgsave-error yes
# 是否对快照文件进行压缩
rdbcompression yes
# 表示生成的快照文件名
dbfilename dump.rdb
# 表示生成的快照文件位置
dir ./

7.1.3 备份流程

  1. 在 Redis 运行过程中,我们可以向 Redis 发送一条 save 命令来创建一个快照。但是需要注意,
    save 是一个阻塞命令,Redis 在收到 save 命令开始处理备份操作之后,在处理完成之前,将不再
    处理其他的请求。其他命令会被挂起,所以 save 使用的并不多。
  2. 我们一般可以使用 bgsave,bgsave 会 fork 一个子进程去处理备份的事情,不影响父进程处理客
    户端请求。
  3. 我们定义的备份规则,如果有规则满足,也会自动触发 bgsave。
  4. 另外,当我们执行 shutdown 命令时,也会触发 save 命令,备份工作完成后,Redis 才会关闭。
  5. 用 Redis 搭建主从复制时,在 从机连上主机之后,会自动发送一条 sync 同步命令,主机收到命令
    之后,首先执行 bgsave 对数据进行快照,然后才会给从机发送快照数据进行同步。

7.2 AOF

与快照持久化不同,AOF 持久化是将被执行的命令追加到 aof 文件末尾,在恢复时,只需要把记录下来
的命令从头到尾执行一遍即可。
默认情况下,AOF 是没有开启的。我们需要手动开启:

1
2
3
4
5
6
7
8
9
10
11
12
# 开启 aof 配置
appendonly yes
# AOF 文件名
appendfilename "appendonly.aof"
# 备份的时机,下面的配置表示每秒钟备份一次
appendfsync everysec
# 表示 aof 文件在压缩时,是否还继续进行同步操作
no-appendfsync-on-rewrite no
# 表示当目前 aof 文件大小超过上一次重写时的 aof 文件大小的百分之多少的时候,再次进行重写
auto-aof-rewrite-percentage 100
# 如果之前没有重写过,则以启动时的 aof 大小为依据,同时要求 aof 文件至少要大于 64M
auto-aof-rewrite-min-size 64mb

同时为了避免快照备份的影响,记得将快照备份关闭:

1
2
3
4
save ""
#save 900 1
#save 300 10
#save 60 10000

8 Redis事务

正常来说,一个可以商用的数据库往往都有比较完善的事务支持,Redis 当然也不例外。相对于 关系型
数据库中的事务模型,Redis 中的事务要简单很多。因为简单,所以 Redis 中的事务模型不太严格,所
以我们不能像使用关系型数据库中的事务那样来使用 Redis。
在关系型数据库中,和事务相关的三个指令分别是:

  • begin
  • commit
  • rollback
    在 Redis 中,当然也有对应的指令:
  • multi
  • exec
  • discard

8.1 不具原子性

注意,Redis 中的事务并不能算作原子性。它仅仅具备隔离性,也就是说当前的事务可以不被其他事务
打断。
由于每一次事务操作涉及到的指令还是比较多的,为了提高执行效率,我们在使用客户端的时候,可以
通过 pipeline 来优化指令的执行。
Redis 中还有一个 watch 指令,watch 可以用来监控一个 key,通过这种监控,我们可以确保在 exec
之前,watch 的键的没有被修改过。

8.2 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TransactionTest {
public static void main(String[] args) {
new Redis().execute(jedis -> {
new TransactionTest().saveMoney(jedis, "javaboy", 1000);
});
}
public Integer saveMoney(Jedis jedis, String userId, Integer money) {
while (true) {
jedis.watch(userId);
int v = Integer.parseInt(jedis.get(userId)) + money;
Transaction tx = jedis.multi();
tx.set(userId, String.valueOf(v));
List<Object> exec = tx.exec();
if (exec != null) {
break;
}
}
return Integer.parseInt(jedis.get(userId));
}
}

9 Redis主从同步

9.1 CAP

在分布式环境下,CAP 原理是一个非常基础的东西,所有的分布式存储系统,都只能在 CAP 中选择两项实现。

c:consistent 一致性
a:availability 可用性
p:partition tolerance 分布式容忍性
在一个分布式系统中,这三个只能满足两个:在一个分布式系统中,P 肯定是要实现的,c 和 a 只能选
择其中一个。大部分情况下,大多数网站架构选择了 ap。
在 Redis 中,实际上就是保证最终一致。
Redis 中,当搭建了主从服务之后,如果主从之间的连接断开了,Redis 依然是可以操作的,相当于它
满足可用性,但是此时主从两个节点中的数据会有差异,相当于牺牲了一致性。但是 Redis 保证最终一
致,就是说当网络恢复的时候,从机会追赶主机,尽量保持数据一致。

9.2 主从复制

主从复制可以在一定程度上扩展 redis 性能,redis 的主从复制和关系型数据库的主从复制类似,从机
能够精确的复制主机上的内容。实现了主从复制之后,一方面能够实现数据的读写分离,降低master的
压力,另一方面也能实现数据的备份。

9.2.1 配置方式

假设我有三个redis实例,地址分别如下:

1
2
3
192.168.91.128:6379  
192.168.91.128:6380
192.168.91.128:6381

即同一台服务器上三个实例,配置方式如下:

  1. 将 redis.conf 文件更名为 redis6379.conf,方便我们区分,然后把 redis6379.conf 再复制两份,
    分别为 redis6380.conf 和 redis6381.conf。如下:
  2. 打开 redis6379.conf,将如下配置均加上 6379,(默认是6379的不用修改),如下:
    1
    2
    3
    4
    5
    port 6379
    pidfile /var/run/redis_6379.pid
    logfile "6379.log"
    dbfilename dump6379.rdb
    appendfilename "appendonly6379.aof"
  3. 同理,分别打开 redis6380.conf 和 redis6381.conf 两个配置文件,将第二步涉及到 6379 的分别
    改为 6380 和 6381。
  4. 输入如下命令,启动三个redis实例:
    1
    2
    3
    [root@localhost redis-4.0.8]# redis-server redis6379.conf
    [root@localhost redis-4.0.8]# redis-server redis6380.conf
    [root@localhost redis-4.0.8]# redis-server redis6381.conf
  5. 输入如下命令,分别进入三个实例的控制台:
    1
    2
    3
    [root@localhost redis-4.0.8]# redis-cli -p 6379
    [root@localhost redis-4.0.8]# redis-cli -p 6380
    [root@localhost redis-4.0.8]# redis-cli -p 6381
    此时我就成功配置了三个redis实例了。
  6. 假设在这三个实例中,6379 是主机,即 master,6380 和 6381 是从机,即 slave,那么如何配
    置这种实例关系呢,很简单,分别在 6380 和 6381 上执行如下命令:
    1
    2
    127.0.0.1:6381> SLAVEOF 127.0.0.1 6379
    OK
    这一步也可以通过在两个从机的 redis.conf 中添加如下配置来解决:
    1
    slaveof 127.0.0.1 6379
    OK,主从关系搭建好后,我们可以通过如下命令可以查看每个实例当前的状态,如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    127.0.0.1:6379> INFO replication
    # Replication
    role:master
    connected_slaves:2
    slave0:ip=127.0.0.1,port=6380,state=online,offset=56,lag=1
    slave1:ip=127.0.0.1,port=6381,state=online,offset=56,lag=0
    master_replid:26ca818360d6510b717e471f3f0a6f5985b6225d
    master_replid2:0000000000000000000000000000000000000000
    master_repl_offset:56
    second_repl_offset:-1
    repl_backlog_active:1
    repl_backlog_size:1048576
    repl_backlog_first_byte_offset:1
    repl_backlog_histlen:56
    我们可以看到 6379 是一个主机,上面挂了两个从机,两个从机的地址、端口等信息都展现出来了。如
    果我们在 6380 上执行 INFO replication,显示信息如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    127.0.0.1:6380> INFO replication
    # Replication
    role:slave
    master_host:127.0.0.1
    master_port:6379
    master_link_status:up
    master_last_io_seconds_ago:6
    master_sync_in_progress:0
    slave_repl_offset:630
    slave_priority:100
    slave_read_only:1
    connected_slaves:0
    master_replid:26ca818360d6510b717e471f3f0a6f5985b6225d
    master_replid2:0000000000000000000000000000000000000000
    master_repl_offset:630
    second_repl_offset:-1
    repl_backlog_active:1
    repl_backlog_size:1048576
    repl_backlog_first_byte_offset:1
    repl_backlog_histlen:630
    我们可以看到 6380 是一个从机,从机的信息以及它的主机的信息都展示出来了。
  7. 此时,我们在主机中存储一条数据,在从机中就可以 get 到这条数据了。

9.2.2 主从复制注意点

  1. 如果主机已经运行了一段时间了,并且了已经存储了一些数据了,此时从机连上来,那么从机会将
    主机上所有的数据进行备份,而不是从连接的那个时间点开始备份。
  2. 配置了主从复制之后,主机上可读可写,但是从机只能读取不能写入(可以通过修改redis.conf
    中 slave-read-only 的值让从机也可以执行写操作)。
  3. 在整个主从结构运行过程中,如果主机不幸挂掉,重启之后,他依然是主机,主从复制操作也能够
    继续进行。

9.2.3 主从复制原理

每一个 master 都有一个 replication ID,这是一个较大的伪随机字符串,标记了一个给定的数据集。每
个 master 也持有一个偏移量,master 将自己产生的复制流发送给 slave 时,发送多少个字节的数据,
自身的偏移量就会增加多少,目的是当有新的操作修改自己的数据集时,它可以以此更新 slave 的状
态。复制偏移量即使在没有一个 slave 连接到 master 时,也会自增,所以基本上每一对给定的
Replication ID, offset 都会标识一个 master 数据集的确切版本。当 slave 连接到 master 时,它们使用
PSYNC 命令来发送它们记录的旧的 master replication ID 和它们至今为止处理的偏移量。通过这种方
式,master 能够仅发送 slave 所需的增量部分。但是如果 master 的缓冲区中没有足够的命令积压缓冲
记录,或者如果 slave 引用了不再知道的历史记录(replication ID),则会转而进行一个全量重同步:
在这种情况下,slave 会得到一个完整的数据集副本,从头开始(参考redis官网)。
简单来说,就是以下几个步骤:

  1. slave 启动成功连接到 master 后会发送一个 sync 命令。
  2. Master 接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令。
  3. 在后台进程执行完毕之后,master 将传送整个数据文件到 slave,以完成一次完全同步。
  4. 全量复制:而 slave 服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  5. 增量复制:Master 继续将新的所有收集到的修改命令依次传给 slave,完成同步。
  6. 但是只要是重新连接 master,一次完全同步(全量复制)将被自动执行。

9.2.4 一场接力赛

在上篇文章中,我们搭建的主从复制模式是下面这样的:

实际上,一主二仆的主从复制,我们可以搭建成下面这种结构:

搭建方式很简单,在前文基础上,我们只需要修改 6381 的 master 即可,在 6381 实例上执行如下命
令,让 6381 从 6380 实例上复制数据,如下:

1
2
127.0.0.1:6381> SLAVEOF 127.0.0.1 6380
OK

此时,我们再看 6379 的 slave,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=0,lag=1
master_replid:4a38bbfa37586c29139b4ca1e04e8a9c88793651
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:0

只有一个 slave,就是 6380,我们再看 6380 的信息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
127.0.0.1:6380> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:1
master_sync_in_progress:0
slave_repl_offset:70
slave_priority:100
slave_read_only:1
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=70,lag=0
master_replid:4a38bbfa37586c29139b4ca1e04e8a9c88793651
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:70
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:70

6380 此时的角色是一个从机,它的主机是 6379,但是 6380 自己也有一个从机,那就是 6381.
此时我们的主从结构如下图:

9.2.5 哨兵模式

结合上篇文章,我们一共介绍了两种主从模式了,但是这两种,不管是哪一种,都会存在这样一个问
题,那就是当主机宕机时,就会发生群龙无首的情况,如果在主机宕机时,能够从从机中选出一个来充
当主机,那么就不用我们每次去手动重启主机了,这就涉及到一个新的话题,那就是哨兵模式。
所谓的哨兵模式,其实并不复杂,我们还是在我们前面的基础上来搭建哨兵模式。假设现在我的
master 是 6379,两个从机分别是 6380 和 6381,两个从机都是从 6379 上复制数据。先按照上文的步
骤,我们配置好一主二仆,然后在 redis 目录下打开 sentinel.conf 文件,做如下配置:

1
sentinel monitor mymaster 127.0.0.1 6379 1

其中 mymaster 是给要监控的主机取的名字,随意取,后面是主机地址,最后面的 2 表示有多少个
sentinel 认为主机挂掉了,就进行切换(我这里只有一个,因此设置为1)。好了,配置完成后,输入
如下命令启动哨兵:

1
redis-sentinel sentinel.conf

然后启动我们的一主二仆架构,启动成功后,关闭 master,观察哨兵窗口输出的日志,如下:

可以看到,6379 挂掉之后,redis 内部重新举行了选举,6380 重新上位。此时,如果 6379
重启,也不再是扛把子了,只能屈身做一个 slave 了。

注意:
由于所有的写操作都是先在 Master 上操作,然后同步更新到 Slave 上,所以从 Master 同步到 Slave
机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave 机器数量的增加也会使这个问
题更加严重。因此我们还需要集群来进一步提升 redis 性能,这个问题我们将在后面说到。

9.3 Jedis 操作哨兵模式

准备工作:

  1. 所有的实例均配置 masterauth (在 redis.conf 配置文件中)
  2. 所有实例均需要配置绑定地址:bind 192.168.91.128
    另外,哨兵配置的时候,监控的 master 也不要直接写 127.0.0.1,按如下方式写:
    1
    sentinel monitor mymaster 192.168.91.128 6380 1
    做好准备工作,然后启动三个 redis 实例,同时启动哨兵。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class Sentinel {
    public static void main(String[] args) {
    JedisPoolConfig config = new JedisPoolConfig();
    config.setMaxTotal(10);
    config.setMaxWaitMillis(1000);
    String master = "mymaster";
    Set<String> sentinels = new HashSet<>();
    sentinels.add("192.168.91.128:26379");
    JedisSentinelPool sentinelPool = new JedisSentinelPool(master,
    sentinels, config, "javaboy");
    Jedis jedis = null;
    while (true) {
    try {
    jedis = sentinelPool.getResource();
    String k1 = jedis.get("k1");
    System.out.println(k1);
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    if (jedis != null) {
    jedis.close();
    }
    try {
    Thread.sleep(5000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
    }

9.4 SpringBoot操作哨兵模式

配置 Redis 连接:

1
2
3
4
5
6
7
spring:
redis:
password: javaboy
timeout: 5000
sentinel:
master: mymaster
nodes: 192.168.91.128:26379

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SpringBootTest
class SentinelApplicationTests {
@Autowired
StringRedisTemplate redisTemplate;
@Test
void contextLoads() {
while (true) {
try {
String k1 = redisTemplate.opsForValue().get("k1");
System.out.println(k1);
} catch (Exception e) {
} finally {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

10 Redis集群

集群原理

Redis 集群运行原理如下:

  1. 所有的 Redis 节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽
  2. 节点的 fail 是通过集群中超过半数的节点检测失效时才生效
  3. 客户端与 Redis 节点直连,不需要中间 proxy 层,客户端不需要连接集群所有节点,连接集群中任
    何一个可用节点即可
  4. Redis-cluster 把所有的物理节点映射到 [0-16383]slot 上,cluster (簇)负责维护 node<->slot<-

    value 。Redis 集群中内置了 16384 个哈希槽,当需要在 Redis 集群中放置一个key-value 时,
    Redis 先对 key 使用 crc16 算法算出一个结果,然后把结果对 16384 求余数,这样每个 key 都会
    对应一个编号在 0-16383 之间的哈希槽,Redis 会根据节点数量大致均等的将哈希槽映射到不同
    的节点

怎么样投票

投票过程是集群中所有 master 参与,如果半数以上 master 节点与 master 节点通信超过 clusternode-timeout 设置的时间,
认为当前 master 节点挂掉。

怎样判断节点不可用

  1. 如果集群任意 master 挂掉,且当前 master 没有 slave.集群进入 fail 状态,也可以理解成集群的 slot
    映射 [0-16383] 不完整时进入 fail 状态。
  2. 如果集群超过半数以上 master 挂掉,无论是否有 slave,集群进入 fail 状态,当集群不可用时,所有
    对集群的操作做都不可用,收到((error) CLUSTERDOWN The cluster is down)错误

ruby

Redis 集群管理工具 redis-trib.rb 依赖 ruby 环境,首先需要安装 ruby 环境:
安装 ruby:

1
2
yum install ruby
yum install rubygems

集群搭建

首先我们对集群做一个简单规划,假设我的集群中一共有三个节点,每个节点一个主机一个从机,这样
我一共需要 6 个 Redis 实例。首先创建 redis-cluster 文件夹,在该文件夹下分别创建 7001、7002、
7003、7004、7005、7006 文件夹,用来存放我的 Redis 配置文件,如下:

将 Redis 也在 redis-cluster 目录下安装一份,然后将 redis.conf 文件向 7001-7006 这 6 个文件夹中分
别拷贝一份,拷贝完成后,分别修改如下参数:

1
2
3
4
5
6
port 7001
#bind 127.0.0.1
cluster-enabled yes
cluster-config-XXXXX7001.conf
protected no
daemonize yes

这是 7001 目录下的配置,其他的文件夹将 7001 改为对应的数字即可。修改完成后,进入到 redis 安
装目录中,分别启动各个 redis,使用刚刚修改过的配置文件,如下:

执行启动脚本,启动成功后接下来我们就可以进行集群的创建了,首先将 redis/src 目录下的
redis-trib.rb 文件拷贝到 redis-cluster 目录下,然后在 redis-cluster 目录下执行如下命令:

1
2
3
./redis-trib.rb create --replicas 1 192.168.248.128:7001 192.168.248.128:7002
192.168.248.128:7003 192.168.248.128:7004 192.168.248.128:7005
192.168.248.128:7006

注意,replicas 后面的 1 表示每个主机都带有 1 个从机,执行过程如下:

注意创建过程的日志,每个redis都获得了一个编号,同时日志也说明了哪些实例做主机,哪些实例做从
机,每个从机的主机是谁,每个主机所分配到的hash槽范围等等。

查询集群信息

集群创建成功后,我们可以登录到 Redis 控制台查看集群信息,注意登录时要添加 -c 参数,表示以集
群方式连接,如下:

1
2
> cluster nodes 
> cluster info

添加主节点

首先我们准备一个端口为 7007 的主节点并启动,准备方式和前面步骤一样,启动成功后,通过如下命
令添加主节点:

1
./redis-trib.rb add-node 127.0.0.1:7007 127.0.0.1:7001

主节点添加之后,我们可以通过 cluster nodes 命令查看主节点是否添加成功,此时我们发现新添加的
节点没有分配到 slot,如下:

没有分配到 slot 将不能存储数据,此时我们需要手动分配 slot,分配命令如下:

1
./redis-trib.rb reshard 127.0.0.1:7001

后面的地址为任意一个节点地址,在分配的过程中,我们一共要输入如下几个参数:

  1. 一共要划分多少个 hash 槽出来?就是我们总共要给新添加的节点分多少 hash 槽,这个参数依实
    际情况而定,如下:

    How many slots do you want to move (from 1 to 16384)?

  2. 这些划分出来的槽要给谁,这里输入 7007 节点的编号,如下:

    what is the receiving node ID? ed23rfdsar23rfds23t23rfewff23fr236456

  3. 要让谁出血?因为 hash 槽目前已经全部分配完毕,要重新从已经分好的节点中拿出来一部分给
    7007,必然要让另外三个节点把吃进去的吐出来,这里我们可以输入多个节点的编号,每次输完
    一个点击回车,输完所有的输入 done 表示输入完成,这样就让这几个节点让出部分 slot,如果要
    让所有具有 slot 的节点都参与到此次 slot 重新分配的活动中,那么这里直接输入 all 即可,如下: OK,主要就是这几个参数,输完之后进入到 slot 重新分配环节,分配完成后,通过 cluster nodes 命
    令,我们可以发现 7007 已经具有 slot 了,如下: OK,刚刚我们是添加主节点,我们也可以添加从节点,比如我要把 7008 作为 7007 的从节点,添加方式
    如下:
    1
    2
    3
    ./redis-trib.rb add-node --slave --master-id
    79bbb30bba66b4997b9360dd09849c67d2d02bb9 192.168.31.135:7008
    192.168.31.135:7007
    其中 79bbb30bba66b4997b9360dd09849c67d2d02bb9 是 7007 的编号。

删除节点

删除节点也比较简单,如下:

1
./redis-trib.rb del-node 127.0.0.1:7005 4b45eb75c8b428fbd77ab979b85080146a9bc017

注意 4b45eb75c8b428fbd77ab979b85080146a9bc017 是要删除节点的编号。
再注意:删除已经占有 hash 槽的结点会失败,报错如下:

1
[ERR] Node 127.0.0.1:7005 is not empty! Reshard data away and try again

需要将该节点占用的 hash 槽分配出去(分配方式与上文一致,不赘述)。

Jedis操作RedisCluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RedisCluster {
public static void main(String[] args) {
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("192.168.91.128", 7001));
nodes.add(new HostAndPort("192.168.91.128", 7002));
nodes.add(new HostAndPort("192.168.91.128", 7003));
nodes.add(new HostAndPort("192.168.91.128", 7004));
nodes.add(new HostAndPort("192.168.91.128", 7005));
nodes.add(new HostAndPort("192.168.91.128", 7006));
nodes.add(new HostAndPort("192.168.91.128", 7007));
JedisPoolConfig config = new JedisPoolConfig();
//连接池最大空闲数
config.setMaxIdle(300);
//最大连接数
config.setMaxTotal(1000);
//连接最大等待时间,如果是 -1 表示没有限制
config.setMaxWaitMillis(30000);
//在空闲时检查有效性
config.setTestOnBorrow(true);
JedisCluster cluster = new JedisCluster(nodes, 15000, 15000, 5,
"javaboy", config);
String set = cluster.set("k1", "v1");
System.out.println(set);
String k1 = cluster.get("k1");
System.out.println(k1);
}
}
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2019-2021 Wangts
  • 访问人数: | 浏览次数:

加个好友呗~

微信