redis数据倾斜的原因以及应对方案 JD开源hotkey的源码解析

1 前言
之前旁边的小伙伴问我热点数据相关问题,在给他粗略的讲解一波redis数据倾斜的案例之后,自己也顺道回顾了一些关于热点数据处理的方法论,同时也想起去年所学习jd开源项目hotkey——专门用来解决热点数据问题的框架。在这里结合两者所关联到的知识点,通过几个小图和部分粗略的讲解,来让大家了解相关方法论以及hotkey的源码解析。
2 redis数据倾斜
2.1 定义与危害
先说说数据倾斜的定义,借用百度词条的解释:
对于集群系统,一般缓存是分布式的,即不同节点负责一定范围的缓存数据。我们把缓存数据分散度不够,导致大量的缓存数据集中到了一台或者几台服务节点上,称为数据倾斜。一般来说数据倾斜是由于负载均衡实施的效果不好引起的。
从上面的定义中可以得知,数据倾斜的原因一般是因为lb的效果不好,导致部分节点数据量非常集中。
那这又会有什么危害呢?
如果发生了数据倾斜,那么保存了大量数据,或者是保存了热点数据的实例的处理压力就会增大,速度变慢,甚至还可能会引起这个实例的内存资源耗尽,从而崩溃。这是我们在应用切片集群时要避免的。
2.2 数据倾斜的分类
2.2.1 数据量倾斜(写入倾斜)
1.图示
如图,在某些情况下,实例上的数据分布不均衡,某个实例上的数据特别多。
2.bigkey导致倾斜
某个实例上正好保存了 bigkey。bigkey 的 value 值很大(string 类型),或者是 bigkey 保存了大量集合元素(集合类型),会导致这个实例的数据量增加,内存资源消耗也相应增加。
应对方法
在业务层生成数据时,要尽量避免把过多的数据保存在同一个键值对中。
如果 bigkey 正好是集合类型,还有一个方法,就是把 bigkey 拆分成很多个小的集合类型数据,分散保存在不同的实例上。
3.slot分配不均导致倾斜
先简单的介绍一下slot的概念,slot其实全名是hash slot(哈希槽),在redis cluster切片集群中一共有16384 个 slot,这些哈希槽类似于数据分区,每个键值对都会根据它的 key,被映射到一个哈希槽中。redis cluster 方案采用哈希槽来处理数据和实例之间的映射关系。
一张图来解释,数据、哈希槽、实例这三者的映射分布情况。
这里的crc16(city)%16384可以简单的理解为将key1根据crc16算法取hash值然后对slot个数取模,得到的就是slot位置为14484,他所对应的实例节点是第三个。
运维在构建切片集群时候,需要手动分配哈希槽,并且把16384 个槽都分配完,否则 redis 集群无法正常工作。由于是手动分配,则可能会导致部分实例所分配的slot过多,导致数据倾斜。
应对方法
使用cluster slots 命令来查看slot分配情况,使用cluster setslot,cluster getkeysinslot,migrate这三个命令来进行slot数据的迁移,具体内容不再这里细说,感兴趣的同学可以自行学习一下。
4.hash tag导致倾斜
hash tag 定义 :指当一个key包含 {} 的时候,就不对整个key做hash,而仅对 {} 包括的字符串做hash。
假设hash算法为sha1。对user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。
hash tag 优势 :如果不同 key 的 hash tag 内容都是一样的,那么,这些 key 对应的数据会被映射到同一个 slot 中,同时会被分配到同一个实例上。
hash tag 劣势 :如果不合理使用,会导致大量的数据可能被集中到一个实例上发生数据倾斜,集群中的负载不均衡。
2.2.2 数据访问倾斜(读取倾斜-热key问题)
一般来说数据访问倾斜就是热key问题导致的,如何处理redis热key问题也是面试中常会问到的。所以了解相关概念及方法论也是不可或缺的一环。
1.图示
如图,虽然每个集群实例上的数据量相差不大,但是某个实例上的数据是热点数据,被访问得非常频繁。
但是为啥会有热点数据的产生呢?
2.产生热key的原因及危害
1)用户消费的数据远大于生产的数据(热卖商品、热点新闻、热点评论、明星直播)。
在日常工作生活中一些突发的的事件,例如:双十一期间某些热门商品的降价促销,当这其中的某一件商品被数万次点击浏览或者购买时,会形成一个较大的需求量,这种情况下就会造成热点问题。
同理,被大量刊发、浏览的热点新闻、热点评论、明星直播等,这些典型的读多写少的场景也会产生热点问题。
2)请求分片集中,超过单 server 的性能极限。
在服务端读数据进行访问时,往往会对数据进行分片切分,此过程中会在某一主机 server 上对相应的 key 进行访问,当访问超过 server 极限时,就会导致热点 key 问题的产生。
如果热点过于集中,热点 key 的缓存过多,超过目前的缓存容量时,就会导致缓存分片服务被打垮现象的产生。当缓存服务崩溃后,此时再有请求产生,会缓存到后台 db 上,由于db 本身性能较弱,在面临大请求时很容易发生请求穿透现象,会进一步导致雪崩现象,严重影响设备的性能。
3.常用的热key问题解决办法:
解决方案一: 备份热key
可以把热点数据复制多份,在每一个数据副本的 key 中增加一个随机后缀,让它和其它副本数据不会被映射到同一个 slot 中。
这里相当于把一份数据复制到其他实例上,这样在访问的时候也增加随机前缀,将对一个实例的访问压力,均摊到其他实例上
例如:
我们在放入缓存时就将对应业务的缓存key拆分成多个不同的key。如下图所示,我们首先在更新缓存的一侧,将key拆成n份,比如一个key名字叫做”good_100”,那我们就可以把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增时都需要去改动这n个key,这一步就是拆key。
对于service端来讲,我们就需要想办法尽量将自己访问的流量足够的均匀。
如何给自己即将访问的热key上加入后缀?几种办法,根据本机的ip或mac地址做hash,之后的值与拆key的数量做取余,最终决定拼接成什么样的key后缀,从而打到哪台机器上;服务启动时的一个随机数对拆key的数量做取余。
伪代码如下:
const m = n * 2
//生成随机数
random = genrandom(0, m)
//构造备份新key
bakhotkey = hotkey + “_” + random
data = redis.get(bakhotkey)
if data == null {
data = getfromdb()
redis.set(bakhotkey, expiretime + genrandom(0,5))
}
解决方案二: 本地缓存+动态计算自动发现热点缓存 基本流程图
该方案通过主动发现热点并对其进行存储来解决热点 key 的问题。首先 client 也会访问 slb,并且通过 slb 将各种请求分发至 proxy 中,proxy 会按照基于路由的方式将请求转发至后端的 redis 中。 在热点 key 的解决上是采用在服务端增加缓存的方式进行。具体来说就是在 proxy 上增加本地缓存,本地缓存采用 lru 算法来缓存热点数据,后端节点增加热点数据计算模块来返回热点数据。
proxy 架构的主要有以下优点:
proxy 本地缓存热点,读能力可水平扩展
db 节点定时计算热点数据集合
db 反馈 proxy 热点数据
对客户端完全透明,不需做任何兼容
热点数据的发现与存储
对于热点数据的发现,首先会在一个周期内对 key 进行请求统计,在达到请求量级后会对热点 key 进行热点定位,并将所有的热点 key 放入一个小的 lru 链表内,在通过 proxy 请求进行访问时,若 redis 发现待访点是一个热点,就会进入一个反馈阶段,同时对该数据进行标记。 可以使用一个etcd或者zk集群来存储反馈的热点数据,然后本地所有节点监听该热点数据,进而加载到本地jvm缓存中。
热点数据的获取
在热点 key 的处理上主要分为写入跟读取两种形式,在数据写入过程当 slb 收到数据 k1 并将其通过某一个 proxy 写入一个 redis,完成数据的写入。 假若经过后端热点模块计算发现 k1 成为热点 key 后, proxy 会将该热点进行缓存,当下次客户端再进行访问 k1 时,可以不经 redis。 最后由于 proxy 是可以水平扩充的,因此可以任意增强热点数据的访问能力。
最佳成熟方案: jd开源hotkey 这是目前较为成熟的自动探测热key、分布式一致性缓存解决方案。原理就是在client端做洞察,然后上报对应hotkey,server端检测到后,将对应hotkey下发到对应服务端做本地缓存,并且能保证本地缓存和远程缓存的一致性。
在这里咱们就不细谈了,这篇文章的第三部分:jd开源hotkey源码解析里面会带领大家了解其整体原理。
3 jd开源hotkey—自动探测热key、分布式一致性缓存解决方案
3.1 解决痛点
从上面可知,热点key问题在并发量比较高的系统中(特别是做秒杀活动)出现的频率会比较高,对系统带来的危害也很大。 那么针对此,hotkey诞生的目的是什么?需要解决的痛点是什么?以及它的实现原理。
在这里引用项目上的一段话来概述: 对任意突发性的无法预先感知的热点数据,包括并不限于热点数据(如突发大量请求同一个商品)、热用户(如恶意爬虫刷子)、热接口(突发海量请求同一个接口)等,进行毫秒级精准探测到。然后对这些热数据、热用户等,推送到所有服务端jvm内存中,以大幅减轻对后端数据存储层的冲击,并可以由使用者决定如何分配、使用这些热key(譬如对热商品做本地缓存、对热用户进行拒绝访问、对热接口进行熔断或返回默认值)。这些热数据在整个服务端集群内保持一致性,并且业务隔离。
核心功能:热数据探测并推送至集群各个服务器
3.2 集成方式
集成方式在这里就不详述了,感兴趣的同学可以自行搜索。
3.3 源码解析
3.3.1 架构简介
1.全景图一览
流程介绍:
客户端通过引用hotkey的client包,在启动的时候上报自己的信息给worker,同时和worker之间建立长连接。定时拉取配置中心上面的规则信息和worker集群信息。
客户端调用hotkey的ishot()的方法来首先匹配规则,然后统计是不是热key。
通过定时任务把热key数据上传到worker节点。
worker集群在收取到所有关于这个key的数据以后(因为通过hash来决定key 上传到哪个worker的,所以同一个key只会在同一个worker节点上),在和定义的规则进行匹配后判断是不是热key,如果是则推送给客户端,完成本地缓存。
2.角色构成
这里直接借用作者的描述: 1)etcd集群 etcd作为一个高性能的配置中心,可以以极小的资源占用,提供高效的监听订阅服务。主要用于存放规则配置,各worker的ip地址,以及探测出的热key、手工添加的热key等。
2)client端jar包 就是在服务中添加的引用jar,引入后,就可以以便捷的方式去判断某key是否热key。同时,该jar完成了key上报、监听etcd里的rule变化、worker信息变化、热key变化,对热key进行本地caffeine缓存等。
3) worker端集群 worker端是一个独立部署的java程序,启动后会连接etcd,并定期上报自己的ip信息,供client端获取地址并进行长连接。之后,主要就是对各个client发来的待测key进行累加计算,当达到etcd里设定的rule阈值后,将热key推送到各个client。
4) dashboard控制台 控制台是一个带可视化界面的java程序,也是连接到etcd,之后在控制台设置各个app的key规则,譬如2秒20次算热。然后当worker探测出来热key后,会将key发往etcd,dashboard也会监听热key信息,进行入库保存记录。同时,dashboard也可以手工添加、删除热key,供各个client端监听。
3.hotkey工程结构
3.3.2 client端
主要从下面三个方面来解析源码:
4.客户端启动器
1)启动方式
@postconstruct
public void init() {
clientstarter.builder builder = new clientstarter.builder();
clientstarter starter = builder.setappname(appname).setetcdserver(etcd).build();
starter.startpipeline();
}
appname:是这个应用的名称,一般为${spring.application.name}的值,后续所有的配置都以此为开头 etcd:是etcd集群的地址,用逗号分隔,配置中心。 还可以看到clientstarter实现了建造者模式,使代码更为简介。
2)核心入口 com.jd.platform.hotkey.client.clientstarter#startpipeline
/**
* 启动监听etcd
*/
public void startpipeline() {
jdlogger.info(getclass(), etcdserver: + etcdserver);
//设置caffeine的最大容量
context.caffeine_size = caffeinesize;
//设置etcd地址
etcdconfigfactory.buildconfigcenter(etcdserver);
//开始定时推送
pushschedulerstarter.startpusher(pushperiod);
pushschedulerstarter.startcountpusher(10);
//开启worker重连器
workerretryconnector.retryconnectworkers();
registeventbus();
etcdstarter starter = new etcdstarter();
//与etcd相关的监听都开启
starter.start();
}
该方法主要有五个功能:
① 设置本地缓存(caffeine)的最大值,并创建etcd实例
//设置caffeine的最大容量
context.caffeine_size = caffeinesize;
//设置etcd地址
etcdconfigfactory.buildconfigcenter(etcdserver);
caffeinesize是本地缓存的最大值,在启动的时候可以设置,不设置默认为200000。 etcdserver是上面说的etcd集群地址。
context可以理解为一个配置类,里面就包含两个字段:
public class context {
public static string app_name;
public static int caffeine_size;
}
etcdconfigfactory是ectd配置中心的工厂类
public class etcdconfigfactory {
private static iconfigcenter configcenter;
private etcdconfigfactory() {}
public static iconfigcenter configcenter() {
return configcenter;
}
public static void buildconfigcenter(string etcdserver) {
//连接多个时,逗号分隔
configcenter = jdetcdbuilder.build(etcdserver);
}
}
通过其configcenter()方法获取创建etcd实例对象,iconfigcenter接口封装了etcd实例对象的行为(包括基本的crud、监控、续约等)
② 创建并启动定时任务:pushschedulerstarter
//开始定时推送
pushschedulerstarter.startpusher(pushperiod);//每0.5秒推送一次待测key
pushschedulerstarter.startcountpusher(10);//每10秒推送一次数量统计,不可配置
pushperiod是推送的间隔时间,可以再启动的时候设置,最小为0.05s,推送越快,探测的越密集,会越快探测出来,但对client资源消耗相应增大
pushschedulerstarter类
/**
* 每0.5秒推送一次待测key
*/
public static void startpusher(long period) {
if (period == null || period {
//热key的收集器
ikeycollector collecthk = keyhandlerfactory.getcollector();
//这里相当于每0.5秒,通过netty来给worker来推送收集到的热key的信息,主要是一些热key的元数据信息(热key来源的app和key的类型和是否是删除事件,还有该热key的上报次数)
//这里面还有就是该热key在每次上报的时候都会生成一个全局的唯一id,还有该热key每次上报的创建时间是在netty发送的时候来生成,同一批次的热key时间是相同的
list hotkeymodels = collecthk.lockandgetresult();
if(collectionutil.isnotempty(hotkeymodels)){
//积攒了半秒的key集合,按照hash分发到不同的worker
keyhandlerfactory.getpusher().send(context.app_name, hotkeymodels);
collecthk.finishonce();
}
},0, period, timeunit.milliseconds);
}
/**
* 每10秒推送一次数量统计
*/
public static void startcountpusher(integer period) {
if (period == null || period {
ikeycollector collecthk = keyhandlerfactory.getcounter();
list keycountmodels = collecthk.lockandgetresult();
if(collectionutil.isnotempty(keycountmodels)){
//积攒了10秒的数量,按照hash分发到不同的worker
keyhandlerfactory.getpusher().sendcount(context.app_name, keycountmodels);
collecthk.finishonce();
}
},0, period, timeunit.seconds);
}
从上面两个方法可知,都是通过定时线程池来实现定时任务的,都是守护线程。
咱们重点关注一下keyhandlerfactory类,它是client端设计的一个比较巧妙的地方,从类名上直译为key处理工厂。具体的实例对象是defaultkeyhandler:
public class defaultkeyhandler {
//推送hotkeymsg消息到netty的推送者
private ikeypusher ikeypusher = new nettykeypusher();
//待测key的收集器,这里面包含两个map,key主要是热key的名字,value主要是热key的元数据信息(比如:热key来源的app和key的类型和是否是删除事件)
private ikeycollector ikeycollector = new turnkeycollector();
//数量收集器,这里面包含两个map,这里面key是相应的规则,hitcount里面是这个规则的总访问次数和热后访问次数
private ikeycollector ikeycounter = new turncountcollector();
public ikeypusher keypusher() {
return ikeypusher;
}
public ikeycollector keycollector() {
return ikeycollector;
}
public ikeycollector keycounter() {
return ikeycounter;
}
}
这里面有三个成员对象,分别是封装推送消息到netty的nettykeypusher、待测key收集器turnkeycollector、数量收集器turncountcollector,其中后两者都实现了接口ikeycollector,能对hotkey的处理起到有效的聚合,充分体现了代码的高内聚。 先来看看封装推送消息到netty的nettykeypusher:
/**
* 将msg推送到netty的pusher
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public class nettykeypusher implements ikeypusher {
@override
public void send(string appname, list list) {
//积攒了半秒的key集合,按照hash分发到不同的worker
long now = system.currenttimemillis();
map map = new hashmap();
for(hotkeymodel model : list) {
model.setcreatetime(now);
channel channel = workerinfoholder.choosechannel(model.getkey());
if (channel == null) {
continue;
}
list newlist = map.computeifabsent(channel, k -> new arraylist());
newlist.add(model);
}
for (channel channel : map.keyset()) {
try {
list batch = map.get(channel);
hotkeymsg hotkeymsg = new hotkeymsg(messagetype.request_new_key, context.app_name);
hotkeymsg.sethotkeymodels(batch);
channel.writeandflush(hotkeymsg).sync();
} catch (exception e) {
try {
inetsocketaddress insocket = (inetsocketaddress) channel.remoteaddress();
jdlogger.error(getclass(),flush error + insocket.getaddress().gethostaddress());
} catch (exception ex) {
jdlogger.error(getclass(),flush error);
}
}
}
}
@override
public void sendcount(string appname, list list) {
//积攒了10秒的数量,按照hash分发到不同的worker
long now = system.currenttimemillis();
map map = new hashmap();
for(keycountmodel model : list) {
model.setcreatetime(now);
channel channel = workerinfoholder.choosechannel(model.getrulekey());
if (channel == null) {
continue;
}
list newlist = map.computeifabsent(channel, k -> new arraylist());
newlist.add(model);
}
for (channel channel : map.keyset()) {
try {
list batch = map.get(channel);
hotkeymsg hotkeymsg = new hotkeymsg(messagetype.request_hit_count, context.app_name);
hotkeymsg.setkeycountmodels(batch);
channel.writeandflush(hotkeymsg).sync();
} catch (exception e) {
try {
inetsocketaddress insocket = (inetsocketaddress) channel.remoteaddress();
jdlogger.error(getclass(),flush error + insocket.getaddress().gethostaddress());
} catch (exception ex) {
jdlogger.error(getclass(),flush error);
}
}
}
}
}
send(string appname, list list) 主要是将turnkeycollector收集的待测key通过netty推送给worker,hotkeymodel对象主要是一些热key的元数据信息(热key来源的app和key的类型和是否是删除事件,还有该热key的上报次数) sendcount(string appname, list list) 主要是将turncountcollector收集的规则所对应的key通过netty推送给worker,keycountmodel对象主要是一些key所对应的规则信息以及访问次数等 workerinfoholder.choosechannel(model.getrulekey()) 根据hash算法获取key对应的服务器,分发到对应服务器相应的channel 连接,所以服务端可以水平无限扩容,毫无压力问题。
再来分析一下key收集器:turnkeycollector与turncountcollector: 实现ikeycollector接口:
/**
* 对hotkey进行聚合
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public interface ikeycollector {
/**
* 锁定后的返回值
*/
list lockandgetresult();
/**
* 输入的参数
*/
void collect(t t);
void finishonce();
}
lockandgetresult() 主要是获取返回collect方法收集的信息,并将本地暂存的信息清空,方便下个统计周期积攒数据。 collect(t t) 顾名思义他是收集api调用的时候,将收集的到key信息放到本地存储。 finishonce() 该方法目前实现都是空,无需关注。
待测key收集器:turnkeycollector
public class turnkeycollector implements ikeycollector {
//这map里面的key主要是热key的名字,value主要是热key的元数据信息(比如:热key来源的app和key的类型和是否是删除事件)
private concurrenthashmap map0 = new concurrenthashmap();
private concurrenthashmap map1 = new concurrenthashmap();
private atomiclong atomiclong = new atomiclong(0);
@override
public list lockandgetresult() {
//自增后,对应的map就会停止被写入,等待被读取
atomiclong.addandget(1);
list list;
//可以观察这里与collect方法里面的相同位置,会发现一个是操作map0一个是操作map1,这样保证在读map的时候,不会阻塞写map,
//两个map同时提供轮流提供读写能力,设计的很巧妙,值得学习
if (atomiclong.get() % 2 == 0) {
list = get(map1);
map1.clear();
} else {
list = get(map0);
map0.clear();
}
return list;
}
private list get(concurrenthashmap map) {
return collectionutil.list(false, map.values());
}
@override
public void collect(hotkeymodel hotkeymodel) {
string key = hotkeymodel.getkey();
if (strutil.isempty(key)) {
return;
}
if (atomiclong.get() % 2 == 0) {
//不存在时返回null并将key-value放入,已有相同key时,返回该key对应的value,并且不覆盖
hotkeymodel model = map0.putifabsent(key, hotkeymodel);
if (model != null) {
//增加该hotmey上报的次数
model.add(hotkeymodel.getcount());
}
} else {
hotkeymodel model = map1.putifabsent(key, hotkeymodel);
if (model != null) {
model.add(hotkeymodel.getcount());
}
}
}
@override
public void finishonce() {}
}
可以看到该类中有两个concurrenthashmap和一个atomiclong,通过对atomiclong来自增,然后对2取模,来分别控制两个map的读写能力,保证每个map都能做读写,并且同一个map不能同时读写,这样可以避免并发集合读写不阻塞,这一块无锁化的设计还是非常巧妙的,极大的提高了收集的吞吐量。 key数量收集器:turncountcollector 这里的设计与turnkeycollector大同小异,咱们就不细谈了。值得一提的是它里面有个并行处理的机制,当收集的数量超过data_convert_switch_threshold=5000的阈值时,lockandgetresult处理是使用java stream并行流处理,提升处理的效率。
③ 开启worker重连器
//开启worker重连器
workerretryconnector.retryconnectworkers();
public class workerretryconnector {
/**
* 定时去重连没连上的workers
*/
public static void retryconnectworkers() {
@suppresswarnings(pmd.threadpoolcreationrule)
scheduledexecutorservice scheduledexecutorservice = executors.newsinglethreadscheduledexecutor(new namedthreadfactory(worker-retry-connector-service-executor, true));
//开启拉取etcd的worker信息,如果拉取失败,则定时继续拉取
scheduledexecutorservice.scheduleatfixedrate(workerretryconnector::reconnectworkers, 30, 30, timeunit.seconds);
}
private static void reconnectworkers() {
list nonlist = workerinfoholder.getnonconnectedworkers();
if (nonlist.size() == 0) {
return;
}
jdlogger.info(workerretryconnector.class, trying to reconnect to these workers : + nonlist);
nettyclient.getinstance().connect(nonlist);//这里会触发netty连接方法channelactive
}
}
也是通过定时线程来执行,默认时间间隔是30s,不可设置。 通过workerinfoholder来控制client的worker连接信息,连接信息是个list,用的copyonwritearraylist,毕竟是一个读多写少的场景,类似与元数据信息。
/**
* 保存worker的ip地址和channel的映射关系,这是有序的。每次client发送消息时,都会根据该map的size进行hash
* 如key-1就发送到workerholder的第1个channel去,key-2就发到第2个channel去
*/
private static final list worker_holder = new copyonwritearraylist();
④ 注册eventbus事件订阅者
private void registeventbus() {
//netty连接器会关注workerinfochangeevent事件
eventbuscenter.register(new workerchangesubscriber());
//热key探测回调关注热key事件
eventbuscenter.register(new receivenewkeysubscribe());
//rule的变化的事件
eventbuscenter.register(new keyruleholder());
}
使用guava的eventbus事件消息总线,利用发布/订阅者模式来对项目进行解耦。它可以利用很少的代码,来实现多组件间通信。 基本原理图如下:
监听worker信息变动:workerchangesubscriber
/**
* 监听worker信息变动
*/
@subscribe
public void connectall(workerinfochangeevent event) {
list addresses = event.getaddresses();
if (addresses == null) {
addresses = new arraylist();
}
workerinfoholder.mergeandconnectnew(addresses);
}
/**
* 当client与worker的连接断开后,删除
*/
@subscribe
public void channelinactive(channelinactiveevent inactiveevent) {
//获取断线的channel
channel channel = inactiveevent.getchannel();
inetsocketaddress socketaddress = (inetsocketaddress) channel.remoteaddress();
string address = socketaddress.gethostname() + : + socketaddress.getport();
jdlogger.warn(getclass(), this channel is inactive : + socketaddress + trying to remove this connection);
workerinfoholder.dealchannelinactive(address);
}
监听热key回调事件:receivenewkeysubscribe
private receivenewkeylistener receivenewkeylistener = new defaultnewkeylistener();
@subscribe
public void newkeycoming(receivenewkeyevent event) {
hotkeymodel hotkeymodel = event.getmodel();
if (hotkeymodel == null) {
return;
}
//收到新key推送
if (receivenewkeylistener != null) {
receivenewkeylistener.newkey(hotkeymodel);
}
}
该方法会收到新的热key订阅事件之后,会将其加入到keyhandlerfactory的收集器里面处理。
核心处理逻辑
@override
public void newkey(hotkeymodel hotkeymodel) {
long now = system.currenttimemillis();
//如果key到达时已经过去1秒了,记录一下。手工删除key时,没有createtime
if (hotkeymodel.getcreatetime() != 0 && math.abs(now - hotkeymodel.getcreatetime()) > 1000) {
jdlogger.warn(getclass(), the key comes too late : + hotkeymodel.getkey() + now +
+now + keycreateat + hotkeymodel.getcreatetime());
}
if (hotkeymodel.isremove()) {
//如果是删除事件,就直接删除
deletekey(hotkeymodel.getkey());
return;
}
//已经是热key了,又推过来同样的热key,做个日志记录,并刷新一下
if (jdhotkeystore.ishot(hotkeymodel.getkey())) {
jdlogger.warn(getclass(), receive repeat hot key : + hotkeymodel.getkey() + at + now);
}
addkey(hotkeymodel.getkey());
}
private void deletekey(string key) {
cachefactory.getnonnullcache(key).delete(key);
}
private void addkey(string key) {
valuemodel valuemodel = valuemodel.defaultvalue(key);
if (valuemodel == null) {
//不符合任何规则
deletekey(key);
return;
}
//如果原来该key已经存在了,那么value就被重置,过期时间也会被重置。如果原来不存在,就新增的热key
jdhotkeystore.setvaluedirectly(key, valuemodel);
}
如果该hotkeymodel里面是删除事件,则获取rule_cache_map里面该key超时时间对应的caffeine,然后从中删除该key缓存,然后返回(这里相当于删除了本地缓存)。
如果不是删除事件,则在rule_cache_map对应的caffeine缓存中添加该key的缓存。
这里有个注意点,如果不为删除事件,调用addkey()方法在caffeine增加缓存的时候,value是一个魔术值0x12fcf76,这个值只代表加了这个缓存,但是这个缓存在查询的时候相当于为null。
监听rule的变化事件:keyruleholder
可以看到里面有两个成员属性:rule_cache_map,key_rules
/**
* 保存超时时间和caffeine的映射,key是超时时间,value是caffeine[(string,object)]
*/
private static final concurrenthashmap rule_cache_map = new concurrenthashmap();
/**
* 这里key_rules是保存etcd里面该appname所对应的所有rule
*/
private static final list key_rules = new arraylist();
concurrenthashmap rule_cache_map:
保存超时时间和caffeine的映射,key是超时时间,value是caffeine[(string,object)]。
巧妙的设计:这里将key的过期时间作为分桶策略,这样同一个过期时间的key就会在一个桶(caffeine)里面,这里面每一个caffeine都是client的本地缓存,也就是说hotkey的本地缓存的kv实际上是存储在这里面的。
list key_rules:
这里key_rules是保存etcd里面该appname所对应的所有rule。
具体监听keyruleinfochangeevent事件方法:
@subscribe
public void rulechange(keyruleinfochangeevent event) {
jdlogger.info(getclass(), new rules info is : + event.getkeyrules());
list rulelist = event.getkeyrules();
if (rulelist == null) {
return;
}
putrules(rulelist);
}
核心处理逻辑
/**
* 所有的规则,如果规则的超时时间变化了,会重建caffeine
*/
public static void putrules(list keyrules) {
synchronized (key_rules) {
//如果规则为空,清空规则表
if (collectionutil.isempty(keyrules)) {
key_rules.clear();
rule_cache_map.clear();
return;
}
key_rules.clear();
key_rules.addall(keyrules);
set durationset = keyrules.stream().map(keyrule::getduration).collect(collectors.toset());
for (integer duration : rule_cache_map.keyset()) {
//先清除掉那些在rule_cache_map里存的,但是rule里已没有的
if (!durationset.contains(duration)) {
rule_cache_map.remove(duration);
}
}
//遍历所有的规则
for (keyrule keyrule : keyrules) {
int duration = keyrule.getduration();
//这里如果rule_cache_map里面没有超时时间为duration的value,则新建一个放入到rule_cache_map里面
//比如rule_cache_map本来就是空的,则在这里来构建rule_cache_map的映射关系
//todo 如果keyrules里面包含相同duration的keyrule,则也只会建一个key为duration,value为caffeine,其中caffeine是(string,object)
if (rule_cache_map.get(duration) == null) {
localcache cache = cachefactory.build(duration);
rule_cache_map.put(duration, cache);
}
}
}
}
使用synchronized关键字来保证线程安全;
如果规则为空,清空规则表(rule_cache_map、key_rules);
使用传递进来的keyrules来覆盖key_rules;
清除掉rule_cache_map里面在keyrules没有的映射关系;
遍历所有的keyrules,如果rule_cache_map里面没有相关的超时时间key,则在里面赋值;
⑤ 启动etcdstarter(etcd连接管理器)
etcdstarter starter = new etcdstarter();
//与etcd相关的监听都开启
starter.start();
public void start() {
fetchworkerinfo();
fetchrule();
startwatchrule();
//监听热key事件,只监听手工添加、删除的key
startwatchhotkey();
}
fetchworkerinfo() 从etcd里面拉取worker集群地址信息alladdress,并更新workerinfoholder里面的worker_holder
/**
* 每隔30秒拉取worker信息
*/
private void fetchworkerinfo() {
scheduledexecutorservice scheduledexecutorservice = executors.newsinglethreadscheduledexecutor();
//开启拉取etcd的worker信息,如果拉取失败,则定时继续拉取
scheduledexecutorservice.scheduleatfixedrate(() -> {
jdlogger.info(getclass(), trying to connect to etcd and fetch worker info);
fetch();
}, 0, 30, timeunit.seconds);
}
使用定时线程池来执行,单线程。
定时从etcd里面获取,地址/jd/workers/+$appname或default,时间间隔不可设置,默认30秒,这里面存储的是worker地址的ip+port。
发布workerinfochangeevent事件。
备注:地址有$appname或default,在worker里面配置,如果把worker放到某个appname下,则该worker只会参与该app的计算。
fetchrule() 定时线程来执行,单线程,时间间隔不可设置,默认是5秒,当拉取规则配置和手动配置的hotkey成功后,该线程被终止(也就是说只会成功执行一次),执行失败继续执行
private void fetchrule() {
scheduledexecutorservice scheduledexecutorservice = executors.newsinglethreadscheduledexecutor();
//开启拉取etcd的worker信息,如果拉取失败,则定时继续拉取
scheduledexecutorservice.scheduleatfixedrate(() -> {
jdlogger.info(getclass(), trying to connect to etcd and fetch rule info);
boolean success = fetchrulefrometcd();
if (success) {
//拉取已存在的热key
fetchexisthotkey();
//这里如果拉取规则和拉取手动配置的hotkey成功之后,则该定时执行线程停止
scheduledexecutorservice.shutdown();
}
}, 0, 5, timeunit.seconds);
}
fetchrulefrometcd()
从etcd里面获取该appname配置的rule规则,地址/jd/rules/+$appname。
如果查出来规则rules为空,会通过发布keyruleinfochangeevent事件来清空本地的rule配置缓存和所有的规则key缓存。
发布keyruleinfochangeevent事件。
fetchexisthotkey()
从etcd里面获取该appname手动配置的热key,地址/jd/hotkeys/+$appname。
发布receivenewkeyevent事件,并且内容hotkeymodel不是删除事件。
startwatchrule()
/**
* 异步监听rule规则变化
*/
private void startwatchrule() {
executorservice executorservice = executors.newsinglethreadexecutor();
executorservice.submit(() -> {
jdlogger.info(getclass(), --- begin watch rule change ----);
try {
iconfigcenter configcenter = etcdconfigfactory.configcenter();
kvclient.watchiterator watchiterator = configcenter.watch(configconstant.rulepath + context.app_name);
//如果有新事件,即rule的变更,就重新拉取所有的信息
while (watchiterator.hasnext()) {
//这句必须写,next会让他卡住,除非真的有新rule变更
watchupdate watchupdate = watchiterator.next();
list eventlist = watchupdate.getevents();
jdlogger.info(getclass(), rules info changed. begin to fetch new infos. rule change is + eventlist);
//全量拉取rule信息
fetchrulefrometcd();
}
} catch (exception e) {
jdlogger.error(getclass(), watch err);
}
});
}
异步监听rule规则变化,使用etcd监听地址为/jd/rules/+$appname的节点变化。
使用线程池,单线程,异步监听rule规则变化,如果有事件变化,则调用fetchrulefrometcd()方法。
startwatchhotkey() 异步开始监听热key变化信息,使用etcd监听地址前缀为/jd/hotkeys/+$appname
/**
* 异步开始监听热key变化信息,该目录里只有手工添加的key信息
*/
private void startwatchhotkey() {
executorservice executorservice = executors.newsinglethreadexecutor();
executorservice.submit(() -> {
jdlogger.info(getclass(), --- begin watch hotkey change ----);
iconfigcenter configcenter = etcdconfigfactory.configcenter();
try {
kvclient.watchiterator watchiterator = configcenter.watchprefix(configconstant.hotkeypath + context.app_name);
//如果有新事件,即新key产生或删除
while (watchiterator.hasnext()) {
watchupdate watchupdate = watchiterator.next();
list eventlist = watchupdate.getevents();
keyvalue keyvalue = eventlist.get(0).getkv();
event.eventtype eventtype = eventlist.get(0).gettype();
try {
//从这个地方可以看出,etcd给的返回是节点的全路径,而我们需要的key要去掉前缀
string key = keyvalue.getkey().tostringutf8().replace(configconstant.hotkeypath + context.app_name + /, );
//如果是删除key,就立刻删除
if (event.eventtype.delete == eventtype) {
hotkeymodel model = new hotkeymodel();
model.setremove(true);
model.setkey(key);
eventbuscenter.getinstance().post(new receivenewkeyevent(model));
} else {
hotkeymodel model = new hotkeymodel();
model.setremove(false);
string value = keyvalue.getvalue().tostringutf8();
//新增热key
jdlogger.info(getclass(), etcd receive new key : + key + --value: + value);
//如果这是一个删除指令,就什么也不干
//todo 这里有个疑问,监听到worker自动探测发出的惰性删除指令,这里之间跳过了,但是本地缓存没有更新吧?
//todo 所以我猜测在客户端使用判断缓存是否存在的api里面,应该会判断相关缓存的value值是否为#[delete]#删除标记
//解疑:这里确实只监听手工配置的hotkey,etcd的/jd/hotkeys/+$appname该地址只是手动配置hotkey,worker自动探测的hotkey是直接通过netty通道来告知client的
if (constant.default_delete_value.equals(value)) {
continue;
}
//手工创建的value是时间戳
model.setcreatetime(long.valueof(keyvalue.getvalue().tostringutf8()));
model.setkey(key);
eventbuscenter.getinstance().post(new receivenewkeyevent(model));
}
} catch (exception e) {
jdlogger.error(getclass(), new key err : + keyvalue);
}
}
} catch (exception e) {
jdlogger.error(getclass(), watch err);
}
});
}
使用线程池,单线程,异步监听热key变化
使用etcd监听前缀地址的当前节点以及子节点的所有变化值
删除节点动作
发布receivenewkeyevent事件,并且内容hotkeymodel是删除事件
新增or更新节点动作
事件变化的value值为删除标记#[delete]#
如果是删除标记的话,代表是worker自动探测或者client需要删除的指令。
如果是删除标记则什么也不做,直接跳过(这里从hotkeypusher#push方法可以看到,做删除事件的操作时候,他会给/jd/hotkeys/+$appname的节点里面增加一个值为删除标记的节点,然后再删除相同路径的节点,这样就可以触发上面的删除节点事件,所以这里判断如果是删除标记直接跳过)。
不为删除标记
发布receivenewkeyevent事件,事件内容hotkeymodel里面的createtime是kv对应的时间戳
疑问: 这里代码注释里面说只监听手工添加或者删除的hotkey,难道说/jd/hotkeys/+$appname地址只是手工配置的地址吗? 解疑: 这里确实只监听手工配置的hotkey,etcd的/jd/hotkeys/+$appname该地址只是手动配置hotkey,worker自动探测的hotkey是直接通过netty通道来告知client的
5.api解析
1)流程图示 ① 查询流程
② 删除流程:
从上面的流程图中,大家应该知道该热点key在代码中是如何扭转的,这里再给大家讲解一下核心api的源码解析,限于篇幅的原因,咱们不一个个贴相关源码了,只是单纯的告诉你它的内部逻辑是怎么样的。 2)核心类:jdhotkeystore
jdhotkeystore是封装client调用的api核心类,包含上面10个公共方法,咱们重点解析其中6个重要方法: ① ishotkey(string key) 判断是否在规则内,如果不在返回false 判断是否是热key,如果不是或者是且过期时间在2s内,则给turnkeycollector#collect收集 最后给turncountcollector#collect做统计收集 ② get(string key) 从本地caffeine取值 如果取到的value是个魔术值,只代表加入到caffeine缓存里面了,查询的话为null ③ smartset(string key, object value) 判断是否是热key,这里不管它在不在规则内,如果是热key,则给value赋值,如果不为热key什么也不做 ④ forceset(string key, object value) 强制给value赋值 如果该key不在规则配置内,则传递的value不生效,本地缓存的赋值value会被变为null ⑤ getvalue(string key, keytype keytype) 获取value,如果value不存在则调用hotkeypusher#push方法发往netty 如果没有为该key配置规则,就不用上报key,直接返回null 如果取到的value是个魔术值,只代表加入到caffeine缓存里面了,查询的话为null ⑥ remove(string key) 删除某key(本地的caffeine缓存),会通知整个集群删除(通过etcd来通知集群删除) 3)client上传热key入口调用类:hotkeypusher 核心方法:
public static void push(string key, keytype keytype, int count, boolean remove) {
if (count {
try {
//取etcd的是否开启日志配置,地址/jd/logon
string loggeron = configcenter.get(configconstant.logtoggle);
logger_on = true.equals(loggeron) || 1.equals(loggeron);
} catch (statusruntimeexception ex) {
logger.error(etcd_down);
}
//监听etcd地址/jd/logon是否开启日志配置,并实时更改开关
kvclient.watchiterator watchiterator = configcenter.watch(configconstant.logtoggle);
while (watchiterator.hasnext()) {
watchupdate watchupdate = watchiterator.next();
list eventlist = watchupdate.getevents();
keyvalue keyvalue = eventlist.get(0).getkv();
logger.info(log toggle changed : + keyvalue);
string value = keyvalue.getvalue().tostringutf8();
logger_on = true.equals(value) || 1.equals(value);
}
});
}
放到线程池里面异步执行
取etcd的是否开启日志配置,地址/jd/logon,默认true
监听etcd地址/jd/logon是否开启日志配置,并实时更改开关
由于有etcd的监听,所以会一直执行,而不是执行一次结束
② 第二个@postconstruct:watch()
/**
* 启动回调监听器,监听rule变化
*/
@postconstruct
public void watch() {
asyncpool.asyncdo(() -> {
kvclient.watchiterator watchiterator;
if (isforsingle()) {
watchiterator = configcenter.watch(configconstant.rulepath + workerpath);
} else {
watchiterator = configcenter.watchprefix(configconstant.rulepath);
}
while (watchiterator.hasnext()) {
watchupdate watchupdate = watchiterator.next();
list eventlist = watchupdate.getevents();
keyvalue keyvalue = eventlist.get(0).getkv();
logger.info(rule changed : + keyvalue);
try {
rulechange(keyvalue);
} catch (exception e) {
e.printstacktrace();
}
}
});
}
/**
* rule发生变化时,更新缓存的rule
*/
private synchronized void rulechange(keyvalue keyvalue) {
string appname = keyvalue.getkey().tostringutf8().replace(configconstant.rulepath, );
if (strutil.isempty(appname)) {
return;
}
string rulejson = keyvalue.getvalue().tostringutf8();
list keyrules = fastjsonutils.tolist(rulejson, keyrule.class);
keyruleholder.put(appname, keyrules);
}
通过etcd.workerpath配置,来判断该worker是否为某个app单独服务的,默认为”default”,如果是默认值,代表该worker参与在etcd上所有app client的计算,否则只为某个app来服务计算 使用etcd来监听rule规则变化,如果是共享的worker,监听地址前缀为”/jd/rules/“,如果为某个app独享,监听地址为”/jd/rules/“+$etcd.workerpath 如果规则变化,则修改对应app在本地存储的rule缓存,同时清理该app在本地存储的kv缓存
keyruleholder:rule缓存本地存储
map,>
相对于client的keyruleholder的区别:worker是存储所有app规则,每个app对应一个规则桶,所以用map
caffeinecacheholder:key缓存本地存储
map,>
相对于client的caffeine,第一是worker没有做缓存接口比如localcache,第二是client的map的kv分别是超时时间、以及相同超时时间所对应key的缓存桶
放到线程池里面异步执行,由于有etcd的监听,所以会一直执行,而不是执行一次结束
③ 第三个@postconstruct:watchwhitelist()
/**
* 启动回调监听器,监听白名单变化,只监听自己所在的app,白名单key不参与热key计算,直接忽略
*/
@postconstruct
public void watchwhitelist() {
asyncpool.asyncdo(() -> {
//从etcd配置中获取所有白名单
fetchwhite();
kvclient.watchiterator watchiterator = configcenter.watch(configconstant.whitelistpath + workerpath);
while (watchiterator.hasnext()) {
watchupdate watchupdate = watchiterator.next();
logger.info(whitelist changed );
try {
fetchwhite();
} catch (exception e) {
e.printstacktrace();
}
}
});
}
拉取并监听etcd白名单key配置,地址为/jd/whitelist/+$etcd.workerpath
在白名单的key,不参与热key计算,直接忽略
放到线程池里面异步执行,由于有etcd的监听,所以会一直执行,而不是执行一次结束 ④ 第四个@postconstruct:makesureselfon()
/**
* 每隔一会去check一下,自己还在不在etcd里
*/
@postconstruct
public void makesureselfon() {
//开启上传worker信息
scheduledexecutorservice scheduledexecutorservice = executors.newsinglethreadscheduledexecutor();
scheduledexecutorservice.scheduleatfixedrate(() -> {
try {
if (canupload) {
uploadselfinfo();
}
} catch (exception e) {
//do nothing
}
}, 0, 5, timeunit.seconds);
}
在线程池里面异步执行,定时执行,时间间隔为5s
将本机woker的hostname,ip+port以kv的形式定时上报给etcd,地址为/jd/workers/+$etcd.workpath+”/“+$hostname,续期时间为8s
有一个canupload的开关来控制worker是否向etcd来定时续期,如果这个开关关闭了,代表worker不向etcd来续期,这样当上面地址的kv到期之后,etcd会删除该节点,这样client循环判断worker信息变化了
2)将热key推送到dashboard供入库:dashboardpusher ① 第五个@postconstruct:uploadtodashboard()
@component
public class dashboardpusher implements ipusher {
/**
* 热key集中营
*/
private static linkedblockingqueue hotkeystorequeue = new linkedblockingqueue();
@postconstruct
public void uploadtodashboard() {
asyncpool.asyncdo(() -> {
while (true) {
try {
//要么key达到1千个,要么达到1秒,就汇总上报给etcd一次
list tempmodels = new arraylist();
queues.drain(hotkeystorequeue, tempmodels, 1000, 1, timeunit.seconds);
if (collectionutil.isempty(tempmodels)) {
continue;
}
//将热key推到dashboard
dashboardholder.flushtodashboard(fastjsonutils.convertobjecttojson(tempmodels));
} catch (exception e) {
e.printstacktrace();
}
}
});
}
}
当热key的数量达到1000或者每隔1s,把热key的数据通过与dashboard的netty通道来发送给dashboard,数据类型为request_hot_key
linkedblockingqueue
 hotkeystorequeue:worker计算的给dashboard热key的集中营,所有给dashboard推送热key存储在里面 3)推送到各客户端服务器:appserverpusher ① 第六个@postconstruct:batchpushtoclient()
public class appserverpusher implements ipusher {
/**
* 热key集中营
*/
private static linkedblockingqueue hotkeystorequeue = new linkedblockingqueue();
/**
* 和dashboard那边的推送主要区别在于,给app推送每10ms一次,dashboard那边1s一次
*/
@postconstruct
public void batchpushtoclient() {
asyncpool.asyncdo(() -> {
while (true) {
try {
list tempmodels = new arraylist();
//每10ms推送一次
queues.drain(hotkeystorequeue, tempmodels, 10, 10, timeunit.milliseconds);
if (collectionutil.isempty(tempmodels)) {
continue;
}
map allapphotkeymodels = new hashmap();
//拆分出每个app的热key集合,按app分堆
for (hotkeymodel hotkeymodel : tempmodels) {
list oneappmodels = allapphotkeymodels.computeifabsent(hotkeymodel.getappname(), (key) -> new arraylist());
oneappmodels.add(hotkeymodel);
}
//遍历所有app,进行推送
for (appinfo appinfo : clientinfoholder.apps) {
list list = allapphotkeymodels.get(appinfo.getappname());
if (collectionutil.isempty(list)) {
continue;
}
hotkeymsg hotkeymsg = new hotkeymsg(messagetype.response_new_key);
hotkeymsg.sethotkeymodels(list);
//整个app全部发送
appinfo.grouppush(hotkeymsg);
}
//推送完,及时清理不使用内存
allapphotkeymodels = null;
} catch (exception e) {
e.printstacktrace();
}
}
});
}
}
会按照key的appname来进行分组,然后通过对应app的channelgroup来推送
当热key的数量达到10或者每隔10ms,把热key的数据通过与app的netty通道来发送给app,数据类型为response_new_key
linkedblockingqueue
 hotkeystorequeue:worker计算的给client热key的集中营,所有给client推送热key存储在里面 4)client实例节点处理:nodesserverstarter ① 第七个@postconstruct:start()
public class nodesserverstarter {
@value(${netty.port})
private int port;
private logger logger = loggerfactory.getlogger(getclass());
@resource
private iclientchangelistener iclientchangelistener;
@resource
private list messagefilters;
@postconstruct
public void start() {
asyncpool.asyncdo(() -> {
logger.info(netty server is starting);
nodesserver nodesserver = new nodesserver();
nodesserver.setclientchangelistener(iclientchangelistener);
nodesserver.setmessagefilters(messagefilters);
try {
nodesserver.startnettyserver(port);
} catch (exception e) {
e.printstacktrace();
}
});
}
}
线程池里面异步执行,启动client端的nettyserver
iclientchangelistener和messagefilters这两个依赖最终会被传递到netty消息处理器里面,iclientchangelistener会作为channel下线处理来删除clientinfoholder下线或者超时的通道,messagefilters会作为netty收到事件消息的处理过滤器(责任链模式) ② 依赖的bean:iclientchangelistener iclientchangelistener
public interface iclientchangelistener {
/**
* 发现新连接
*/
void newclient(string appname, string channelid, channelhandlercontext ctx);
/**
* 客户端掉线
*/
void loseclient(channelhandlercontext ctx);
}
对客户端的管理,新来(newclient)(会触发netty的连接方法channelactive)、断线(loseclient)(会触发netty的断连方法channelinactive())的管理 client的连接信息主要是在clientinfoholder里面
list
 apps,这里面的appinfo主要是appname和对应的channelgroup
对apps的add和remove主要是通过新来(newclient)、断线(loseclient) ③ 依赖的bean:list
 messagefilters
/**
* 对netty来的消息,进行过滤处理
* @author wuweifeng wrote on 2019-12-11
* @version 1.0
*/
public interface inettymsgfilter {
boolean chain(hotkeymsg message, channelhandlercontext ctx);
}
对client发给worker的netty消息,进行过滤处理,共有四个实现类,也就是说底下四个过滤器都是收到client发送的netty消息来做处理 ④ 各个消息处理的类型:messagetype
app_name((byte) 1),
request_new_key((byte) 2),
response_new_key((byte) 3),
request_hit_count((byte) 7), //命中率
request_hot_key((byte) 8), //热key,worker->dashboard
ping((byte) 4), pong((byte) 5),
empty((byte) 6);
顺序1:heartbeatfilter
当消息类型为ping,则给对应的client示例返回pong
顺序2:appnamefilter
当消息类型为app_name,代表client与worker建立连接成功,然后调用iclientchangelistener的newclient方法增加apps元数据信息
顺序3:hotkeyfilter
处理接收消息类型为request_new_key
先给hotkeyfilter.totalreceivekeycount原子类增1,该原子类代表worker实例接收到的key的总数
publishmsg方法,将消息通过自建的生产者消费者模型(keyproducer,keyconsumer),来把消息给发到生产者中分发消费
接收到的消息hotkeymsg里面list
首先判断hotkeymodel里面的key是否在白名单内,如果在则跳过,否则将hotkeymodel通过keyproducer发送
顺序4:keycounterfilter
处理接收类型为request_hit_count
这个过滤器是专门给dashboard来汇算key的,所以这个appname直接设置为该worker配置的appname
该过滤器的数据来源都是client的nettykeypusher#sendcount(string appname, list
 list),这里面的数据都是默认积攒10s的,这个10s是可以配置的,这一点在client里面有讲
将构造的new keycountitem(appname, models.get(0).getcreatetime(), models)放到阻塞队列linkedblockingqueue
 counter_queue中,然后让counterconsumer来消费处理,消费逻辑是单线程的
counterconsumer:热key统计消费者
放在公共线程池中,来单线程执行
从阻塞队列counter_queue里面取数据,然后将里面的key的统计数据发布到etcd的/jd/keyhitcount/+ appname + “/“ + iputils.getip() + “-“ + system.currenttimemillis()里面,该路径是worker服务的client集群或者default,用来存放客户端hotkey访问次数和总访问次数的path,然后让dashboard来订阅统计展示
2.三个定时任务:3个@scheduled
1)定时任务1:etcdstarter#pullrules()
/**
* 每隔1分钟拉取一次,所有的app的rule
*/
@scheduled(fixedrate = 60000)
public void pullrules() {
try {
if (isforsingle()) {
string value = configcenter.get(configconstant.rulepath + workerpath);
if (!strutil.isempty(value)) {
list keyrules = fastjsonutils.tolist(value, keyrule.class);
keyruleholder.put(workerpath, keyrules);
}
} else {
list keyvalues = configcenter.getprefix(configconstant.rulepath);
for (keyvalue keyvalue : keyvalues) {
rulechange(keyvalue);
}
}
} catch (statusruntimeexception ex) {
logger.error(etcd_down);
}
}
每隔1分钟拉取一次etcd地址为/jd/rules/的规则变化,如果worker所服务的app或者default的rule有变化,则更新规则的缓存,并清空该appname所对应的本地key缓存 2)定时任务2:etcdstarter#uploadclientcount()
/**
* 每隔10秒上传一下client的数量到etcd中
*/
@scheduled(fixedrate = 10000)
public void uploadclientcount() {
try {
string ip = iputils.getip();
for (appinfo appinfo : clientinfoholder.apps) {
string appname = appinfo.getappname();
int count = appinfo.size();
//即便是full gc也不能超过3秒,因为这里给的过期时间是13s,由于该定时任务每隔10s执行一次,如果full gc或者说上报给etcd的时间超过3s,
//则在dashboard查询不到client的数量
configcenter.putandgrant(configconstant.clientcountpath + appname + / + ip, count + , 13);
}
configcenter.putandgrant(configconstant.caffeinesizepath + ip, fastjsonutils.convertobjecttojson(caffeinecacheholder.getsize()), 13);
//上报每秒qps(接收key数量、处理key数量)
string totalcount = fastjsonutils.convertobjecttojson(new totalcount(hotkeyfilter.totalreceivekeycount.get(), totaldealcount.longvalue()));
configcenter.putandgrant(configconstant.totalreceivekeycount + ip, totalcount, 13);
logger.info(totalcount + expirecount: + expiretotalcount + offercount: + totaloffercount);
//如果是稳定一直有key发送的应用,建议开启该监控,以避免可能发生的网络故障
if (openmonitor) {
checkreceivekeycount();
}
// configcenter.putandgrant(configconstant.bufferpoolpath + ip, memorytool.getbufferpool() + , 10);
} catch (exception ex) {
logger.error(etcd_down);
}
}
每个10s将worker计算存储的client信息上报给etcd,来方便dashboard来查询展示,比如/jd/count/对应client数量,/jd/caffeinesize/对应caffeine缓存的大小,/jd/totalkeycount/对应该worker接收的key总量和处理的key总量
可以从代码中看到,上面所有etcd的节点租期时间都是13s,而该定时任务是每10s执行一次,意味着如果full gc或者说上报给etcd的时间超过3s,则在dashboard查询不到client的相关汇算信息
长时间不收到key,判断网络状态不好,断开worker给etcd地址为/jd/workers/+$workerpath节点的续租,因为client会循环判断该地址的节点是否变化,使得client重新连接worker或者断开失联的worker 3)定时任务3:etcdstarter#fetchdashboardip()
/**
* 每隔30秒去获取一下dashboard的地址
*/
@scheduled(fixedrate = 30000)
public void fetchdashboardip() {
try {
//获取dashboardip
list keyvalues = configcenter.getprefix(configconstant.dashboardpath);
//是空,给个警告
if (collectionutil.isempty(keyvalues)) {
logger.warn(very important warn !!! dashboard ip is null!!!);
return;
}
string dashboardip = keyvalues.get(0).getvalue().tostringutf8();
nettyclient.getinstance().connect(dashboardip);
} catch (exception e) {
e.printstacktrace();
}
}
每隔30s拉取一次etcd前缀为/jd/dashboard/的dashboard连接ip的值,并且判断dashboardholder.hasconnected里面是否为未连接状态,如果是则重新连接worker与dashboard的netty通道
3.自建的生产者消费者模型(keyproducer,keyconsumer)
一般生产者消费者模型包含三大元素:生产者、消费者、消息存储队列 这里消息存储队列是dispatcherconfig里面的queue,使用linkedblockingqueue,默认大小为200w 1)keyproducer
@component
public class keyproducer {
public void push(hotkeymodel model, long now) {
if (model == null || model.getkey() == null) {
return;
}
//5秒前的过时消息就不处理了
if (now - model.getcreatetime() > initconstant.timeout) {
expiretotalcount.increment();
return;
}
try {
queue.put(model);
totaloffercount.increment();
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
判断接收到的hotkeymodel是否超出”netty.timeout”配置的时间,如果是将expiretotalcount纪录过期总数给自增,然后返回 2)keyconsumer
public class keyconsumer {
private ikeylistener ikeylistener;
public void setkeylistener(ikeylistener ikeylistener) {
this.ikeylistener = ikeylistener;
}
public void beginconsume() {
while (true) {
try {
//从这里可以看出,这里的生产者消费者模型,本质上还是拉模式,之所以不使用eventbus,是因为需要队列来做缓冲
hotkeymodel model = queue.take();
if (model.isremove()) {
ikeylistener.removekey(model, keyeventoriginal.client);
} else {
ikeylistener.newkey(model, keyeventoriginal.client);
}
//处理完毕,将数量加1
totaldealcount.increment();
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
}
@override
public void removekey(hotkeymodel hotkeymodel, keyeventoriginal original) {
//cache里的key,appname+keytype+key
string key = buildkey(hotkeymodel);
hotcache.invalidate(key);
caffeinecacheholder.getcache(hotkeymodel.getappname()).invalidate(key);
//推送所有client删除
hotkeymodel.setcreatetime(systemclock.now());
logger.info(delete_key_event + hotkeymodel.getkey());
for (ipusher pusher : ipushers) {
//这里可以看到,删除热key的netty消息只给client端发了过去,没有给dashboard发过去(dashboardpusher里面的remove是个空方法)
pusher.remove(hotkeymodel);
}
}
@override
public void newkey(hotkeymodel hotkeymodel, keyeventoriginal original) {
//cache里的key
string key = buildkey(hotkeymodel);
//判断是不是刚热不久
//hotcache对应的caffeine有效期为5s,也就是说该key会保存5s,在5s内不重复处理相同的hotkey。
//毕竟hotkey都是瞬时流量,可以避免在这5s内重复推送给client和dashboard,避免无效的网络开销
object o = hotcache.getifpresent(key);
if (o != null) {
return;
}
//********** watch here ************//
//该方法会被initconstant.threadcount个线程同时调用,存在多线程问题
//下面的那句addcount是加了锁的,代表给key累加数量时是原子性的,不会发生多加、少加的情况,到了设定的阈值一定会hot
//譬如阈值是2,如果多个线程累加,在没hot前,hot的状态肯定是对的,譬如thread1 加1,thread2加1,那么thread2会hot返回true,开启推送
//但是极端情况下,譬如阈值是10,当前是9,thread1走到这里时,加1,返回true,thread2也走到这里,加1,此时是11,返回true,问题来了
//该key会走下面的else两次,也就是2次推送。
//所以出现问题的原因是hotcache.getifpresent(key)这一句在并发情况下,没return掉,放了两个key+1到addcount这一步时,会有问题
//测试代码在testblockqueue类,直接运行可以看到会同时hot
//那么该问题用解决吗,no,不需要解决,1 首先要发生的条件极其苛刻,很难触发,以京东这样高的并发量,线上我也没见过触发连续2次推送同一个key的
//2 即便触发了,后果也是可以接受的,2次推送而已,毫无影响,客户端无感知。但是如果非要解决,就要对slidingwindow实例加锁了,必然有一些开销
//所以只要保证key数量不多计算就可以,少计算了没事。因为热key必然频率高,漏计几次没事。但非热key,多计算了,被干成了热key就不对了
slidingwindow slidingwindow = checkwindow(hotkeymodel, key);//从这里可知,每个app的每个key都会对应一个滑动窗口
//看看hot没
boolean hot = slidingwindow.addcount(hotkeymodel.getcount());
if (!hot) {
//如果没hot,重新put,cache会自动刷新过期时间
caffeinecacheholder.getcache(hotkeymodel.getappname()).put(key, slidingwindow);
} else {
//这里之所以放入的value为1,是因为hotcache是用来专门存储刚生成的hotkey
//hotcache对应的caffeine有效期为5s,也就是说该key会保存5s,在5s内不重复处理相同的hotkey。
//毕竟hotkey都是瞬时流量,可以避免在这5s内重复推送给client和dashboard,避免无效的网络开销
hotcache.put(key, 1);
//删掉该key
//这个key从实际上是专门针对slidingwindow的key,他的组合逻辑是appname+keytype+key,而不是给client和dashboard推送的hotkey
caffeinecacheholder.getcache(hotkeymodel.getappname()).invalidate(key);
//开启推送
hotkeymodel.setcreatetime(systemclock.now());
//当开关打开时,打印日志。大促时关闭日志,就不打印了
if (etcdstarter.logger_on) {
logger.info(new_key_event + hotkeymodel.getkey());
}
//分别推送到各client和etcd
for (ipusher pusher : ipushers) {
pusher.push(hotkeymodel);
}
}
}
“thread.count”配置即为消费者个数,多个消费者共同消费一个queue队列 生产者消费者模型,本质上还是拉模式,之所以不使用eventbus,是因为需要队列来做缓冲 根据hotkeymodel里面是否是删除消息类型
       删除caffeinecacheholder里面对应newkey的滑动窗口缓存。
       向该hotkeymodel对应的app的client推送netty消息,表示新产生hotkey,使得client本地缓存,但是推送的netty消息只代表为热key,client本地缓存不会存储key对应的value值,需要调用jdhotkeystore里面的api来给本地缓存的value赋值
    向dashboard推送hotkeymodel,表示新产生hotkey
删除消息类型
根据hotkeymodel里面的appname+keytype+key的名字,来构建caffeine里面的newkey,该newkey在caffeine里面主要是用来与slidingwindow滑动时间窗对应
删除hotcache里面newkey的缓存,放入的缓存kv分别是newkey和1,hotcache作用是用来存储该生成的热key,hotcache对应的caffeine有效期为5s,也就是说该key会保存5s,在5s内不重复处理相同的hotkey。毕竟hotkey都是瞬时流量,可以避免在这5s内重复推送给client和dashboard,避免无效的网络开销
删除caffeinecacheholder里面对应appname的caffeine里面的newkey,这里面存储的是slidingwindow滑动窗口
推送给该hotkeymodel对应的所有client实例,用来让client删除该hotkeymodel
非删除消息类型
根据hotkeymodel里面的appname+keytype+key的名字,来构建caffeine里面的newkey,该newkey在caffeine里面主要是用来与slidingwindow滑动时间窗对应
通过hotcache来判断该newkey是否刚热不久,如果是则返回
根据滑动时间窗口来计算判断该key是否为hotkey(这里可以学习一下滑动时间窗口的设计),并返回或者生成该newkey对应的滑动窗口
如果没有达到热key的标准
通过caffeinecacheholder重新put,cache会自动刷新过期时间
如果达到了热key标准
向hotcache里面增加newkey对应的缓存,value为1表示刚为热key。
3)计算热key滑动窗口的设计 限于篇幅的原因,这里就不细谈了,直接贴出项目作者对其写的说明文章:java简单实现滑动窗口
3.3.4 dashboard端
这个没啥可说的了,就是连接etcd、mysql,增删改查,不过京东的前端框架很方便,直接返回list就可以成列表。
4 总结
文章第二部分为大家讲解了redis数据倾斜的原因以及应对方案,并对热点问题进行了深入,从发现热key到解决热key的两个关键问题的总结。 文章第三部分是热key问题解决方案——jd开源hotkey的源码解析,分别从client端、worker端、dashboard端来进行全方位讲解,包括其设计、使用及相关原理。 希望通过这篇文章,能够使大家不仅学习到相关方法论,也能明白其方法论具体的落地方案,一起学习,一起成长。


中国信通院发布了2020年我国工业互联网产业经济发展报告
小程序开发还犯迷糊?快戳这份华为云实用技巧效率翻倍
L4960可调输出开关式稳压电源,L4960 switching power supply
诺基亚已经成为了全球第一大IP边缘路由公司
魅族PRO6 Plus的基本参数剖析
redis数据倾斜的原因以及应对方案 JD开源hotkey的源码解析
创新发展成未来竞争新常态
采访Alex Measure:机器学习应用于政府业务场景
安科瑞配电室综合监控系统:电力设备监控的全面解决方案
联想移动加入三大高管,这是要搞事情的节奏嘛?
基于5G与未来6G,AI技术为人工智能和5G的融合注入新动能
能耗居高不下,如何制定数据中心节能方案?
自动拆包破包投料一体机特点与应用流程
先进封装技术的发展趋势(2022)
从国产航母下水看中国航母血泪史
连拓精密提供检测水箱气密性测试方案及操作步骤完整版介绍
奔驰新一代S级汽车采用光波导技术有助于缩小体积
中国又一个科技巨头崛起!还荣登全球第一的宝座!
苹果A14X仿生处理器曝光,浮点性能高于A12Z 34%
垃圾智能分类收集箱的产品特点以及安装要求