两亿多用户,六大业务场景,知乎AI用户模型服务性能如何优化?

用户模型简介
知乎 ai 用户模型服务于知乎两亿多用户,主要为首页、推荐、广告、知识服务、想法、关注页等业务场景提供数据和服务,例如首页个性化 feed 的召回和排序、相关回答等用到的用户长期兴趣特征,问题路由、回答排序中用到的 tpr「作者创作权威度」,广告定向投放用到的基础属性等。
主要功能
提供的数据和功能主要有:
用户兴趣:长期兴趣、实时兴趣、分类兴趣、话题兴趣、keyword 兴趣、作者创作权威度等,
用户 embedding 表示:最近邻用户、人群划分、特定用户圈定等,
用户社交属性:用户亲密度、二度好友、共同好友、相似优秀回答者等,
用户实时属性: lastn 行为、lastlogin 等,
用户基础属性:用户性别预测、年龄段计算、职业预估等。
服务架构
整体主要分为 streaming / 离线计算、在线服务和 hbase 多集群同步三部分组成,下面将依次进行介绍。
用户模型服务架构图
streaming / 离线计算
streaming 计算主要涉及功能 lastread、lastsearch、lastdisplay,实时话题/ keyword 兴趣、最后登录时间、最后活跃的省市等。
用户模型实时兴趣计算逻辑图
实时兴趣的计算流程
相应日志获取。从 cardshowlog、pageshowlog、querylog 中抽取等内容。
映射到对应的内容维度。对于问题、回答、文章、搜索分别获取对应的 topic 和 keyword,搜索内容对应的 topic。在 redis 中用 contenttoken 置换 contentid 后,请求 contentprofile 获取其对应话题和关键词;对于 query,调用 topicmatch 服务,传递搜索内容给服务,服务返回其对应的 topic;调用 znlp 的 keywordextractorjar 包,传递搜索内容并获得其对应的 keyword 。
用户-内容维度汇总。根据用户的行为,在和层面进行 groupby 聚合汇总后,并以 hashmap 的格式存储到 redis,作为计算用户实时兴趣的基础数据,按时间衰减系数 timedecay 进行新旧兴趣的 merge 后存储。
计算兴趣。在用户的历史基础数据上,按一定的 decay 速度进行衰减,按威尔逊置信区间计算用户兴趣 score,并以 sortedset 的格式存储到 redis。
关于兴趣计算,已经优化的地方主要是:如何快速的计算平滑参数 alpha 和 beta,如何 daily_update 平滑参数,以及用卡方计算置信度时,是否加入平滑参数等都会对最终的兴趣分值有很大的影响,当 display 为 1 曝光数量不足的情况下,兴趣 score 和 confidence 计算出现 的 bias 问题等。
在线服务
随之知乎日益增加的用户量,以及不断丰富的业务场景和与之相对应出现的调用量上升等,对线上服务的稳定性和请求时延要求也越来越高。 旧服务本身也存在一些问题,比如:
在线服务直连 hbase,当数据热点的时候,造成某些 region server 的负载很高,p95 上升,轻者造成服务抖动,监控图偶发有「毛刺」现象,重者造成服务几分钟的不可用,需要平台技术人员将 region 从负载较高的 regionserver 上移走。
离线任务每次计算完成后一次大批量同时写入离线和在线集群,会加重 hbase 在线集群region server 的负载,增大 hbase get 请求的时延,从而影响线上服务稳定性和 p95。
针对问题一,我们在原来的服务架构中增加缓存机制,以此来增强服务的稳定型、减小 region server 的负载。
针对问题二,修改了离线计算和多集群数据同步的方式,详见「hbase多集群存储机制」部分。
cache机制具体实现
没有 cache 机制时,所有的 get 和 batchget 方法直接请求到 hbase,具体如下图:
用户模型服务请求序列图
userprofileserviceapp 启动服务,将收到的请求交由 userprofileserviceimpl 具体处理
userprofileserviceimp 根据请求参数,调用 gettranslator 将 userprofilerequest.getrequest 转化成 hbase 中的 get object(在 map 中维护每个 requestfield 对应 hbase 中的 tablename,cf,column,prefix 等信息),以格式map[string, util.list[(availfield, get)]]返回。
userprofileserviceimp 用 future 异步向 hbase 发送 get 请求,获取到结果返回。
增加 cache 机制的具体方法,在上面的第二步中,增加一个 cachemap,用来维护 get 中 availfield 对应 cache 中的 key,key 的组成格式为:「 tablename 缩写| columnfamily 缩写| columnname 缩写| rowkey 全写」。这里使用的 redis 数据结构主要有两种,sortedset 和 key-value对。服务端收到请求后先去转化 requestfield 为 cache 中的 key,从 cache 中获取数据。对于没有获取到 requestfield 的转化成 getobject,请求 hbase 获取,将结果保存到 cache 中并返回。
最终效果
用户模型的访问量大概为 100k qps,每个请求转化为多个 get 请求。 增加 cache 前 get 请求的 p95 为30ms,增加 cache 后降低到小于 15ms,cache 命中率 90% 以上。
hbase 多集群存储机制
离线任务和 streaming 计算主要采用 spark 计算实现, 结果保存到 hbase 的几种方式:
方法一:每次一条
1. 每次写进一条,调用 api 进行存储的代码如下:
valhbaseconn=connectionfactory.createconnection(hbaseconf)valtable=hbaseconn.gettable(tablename.valueof(word))x.foreach(value=>{varput=newput(bytes.tobytes(value.tostring))put.addcolumn(bytes.tobytes(f1),bytes.tobytes(c1),bytes.tobytes(value.tostring))table.put(put)})
方法二:批量写入
2. 批量写入 hbase,使用的 api:
/***{@inheritdoc}*@throwsioexception*/@overridepublicvoidput(finallistputs)throwsioexception{getbufferedmutator().mutate(puts);if(autoflush){flushcommits();}}
方法三:mapreduce 的 saveasnewapihadoopdataset 方式写入
3. saveasnewapihadoopdataset 是通用的保存到 hadoop 存储系统的方法,调用 org.apache.hadoop.mapreduce.recordwriter 实现。org.apache.hadoop.hbase.mapreduce.tableoutputformat.tablerecordwriter 是其在 hbase 中的实现类。底层通过调用 hbase.client.bufferedmutator.mutate() 方式保存。
valrdd=sc.makerdd(array(1)).flatmap(_=>0to1000000)rdd.map(x=>{varput=newput(bytes.tobytes(x.tostring))put.addcolumn(bytes.tobytes(f1),bytes.tobytes(c1),bytes.tobytes(x.tostring))(newimmutablebyteswritable,put)}).saveashadoopdataset(jobconf)/***writesakey/valuepairintothetable.*@throwsioexceptionwhenwritingfails.*/@overridepublicvoidwrite(keykey,mutationvalue)throwsioexception{if(!(valueinstanceofput)&&!(valueinstanceofdelete)){thrownewioexception(passadeleteoraput);}mutator.mutate(value);}
方法四:bulkload 方式
4. bulkload 方式,创建 hfiles,调用 loadincrementalhfiles 作业将它们移到 hbase 表中。
首先需要根据表名 getregionlocator 得到 regionlocator,根据 regionlocator 得到 partition,因为在 hfile 中是有序的所以,需要调用 rdd.repartitionandsortwithinpartitions(partitioner) 将 rdd 重新排序。
hfileoutputformat2.configureincrementalload(job,table, regionlocator) 进行任务增量load 到具体表的配置 实现并执行映射( 并减少) 作业,使用 hfileoutputformat2 输出格式将有序的放置或者 keyvalue 对象写入hfile文件。reduce阶段通过调用 hfileoutputformat2.configureincrementalload 配置在场景后面。执行loadincrementalhfiles 作业将 hfile 文件移动到系统文件。
staticvoidconfigureincrementalload(jobjob,tabletable,regionlocatorregionlocator,classcls)throwsioexception{configurationconf=job.getconfiguration();job.setoutputkeyclass(immutablebyteswritable.class);job.setoutputvalueclass(keyvalue.class);job.setoutputformatclass(cls);//basedontheconfiguredmapoutputclass,setthecorrectreducertoproperly//sorttheincomingvalues.if(keyvalue.class.equals(job.getmapoutputvalueclass())){job.setreducerclass(keyvaluesortreducer.class);}elseif(put.class.equals(job.getmapoutputvalueclass())){job.setreducerclass(putsortreducer.class);}elseif(text.class.equals(job.getmapoutputvalueclass())){job.setreducerclass(textsortreducer.class);}else{log.warn(unknownmapoutputvaluetype:+job.getmapoutputvalueclass());}conf.setstrings(io.serializations,conf.get(io.serializations),mutationserialization.class.getname(),resultserialization.class.getname(),keyvalueserialization.class.getname());configurepartitioner(job,startkeys);//setcompressionalgorithmsbasedoncolumnfamiliesconfigurecompression(table,conf);configurebloomtype(table,conf);configureblocksize(table,conf);configuredatablockencoding(table,conf);tablemapreduceutil.adddependencyjars(job);tablemapreduceutil.initcredentials(job);log.info(incrementaltable+table.getname()+outputconfigured.);}publicstaticvoidconfigureincrementalload(jobjob,tabletable,regionlocatorregionlocator)throwsioexception{configureincrementalload(job,table,regionlocator,hfileoutputformat2.class);}valhfileloader=newloadincrementalhfiles(conf)hfileloader.dobulkload(hfilepath,newhtable(conf,table.getname))
将 hfile 文件 bulk load 到已存在的表中。 由于 hbase 的 bulkload 方式是绕过了 write to wal,write to memstore 及 flush to disk 的过程,所以并不能通过 wal 来进行一些复制数据的操作。 由于 bulkload 方式还是对集群 regionserver 造成很高的负载,最终采用方案三,下面是两个集群进行数据同步。
存储同步机制
技术选型 hbase 常见的 replication 方法有 snapshot、copytable/export、bulkload、replication、应用层并发读写等。 应用层并发读写 优点:应用层可以自由灵活控制对 hbase写入速度,打开或关闭两个集群间的同步,打开或关闭两个集群间具体到表或者具体到列簇的同步,对 hbase 集群性能的影响最小,缺点是增加了应用层的维护成本。 初期没有更好的集群数据同步方式的时候,用户模型和内容模型自己负责两集群间的数据同步工作。
用户模型存储多机房同步架构图
具体实现细节
第一步:定义用于在 kafka 的 producer 和 consumer 中流转的统一数据 protobuf 格式
messagecolumnvalue{requiredbytesqualifier=1;......}messageputmessage{requiredstringtablename=1;......}
第二步:发送需要同步的数据到 kafka,(如果有必要,需要对数据做相应的格式处理),这里对数据的处理,有两种方式。 第一种:如果程序中有统一的存储到 hbase 的工具(另一个项目是使用自定义的 hbasehandler,业务层面只生成 tablename,rowkey,columnfamily,column 等值,由 hbasehandler 统一构建成 put 对象,并保存 hbase 中),这种方式在业务层面改动较小,理论上可以直接用原来的格式发给 kafka,但是如果 hbasehandler 处理的格式和 putmessage 格式有不符的地方,做下适配即可。
/***tablename:hbasetablename*rdd:rdd[(rowkey,family,column,value)]*/defconvert(tablename:string,rdd:rdd):rdd={rdd.map{case(rowkey:string,family:string,column:string,value:array[byte])=>valmessage=kafkamessages.newbuilder()valcolumnvalue=columnvalue.newbuilder()columnvalue.set......(rowkey,message.build().tobytearray)}}
第二种:程序在 rdd 中直接构建 hbase 的 put 对象,调用 pairrdd 的 saveasnewapihadoopdataset 方法保存到 hbase 中。此种情况,为了兼容已有的代码,做到代码和业务逻辑的改动最小,发送到 kafka 时,需要将 put 对象转换为上面定义的 putmessage protobuf 格式,然后发送给 kafka。
/***tablename:hbasetablenamne*rdd:rdd[(rowkey,put)]*/defconvert(tablename:string,familynames:array[string],rdd:rdd):rdd={rdd.map{case(_,put:put)=>valmessage=putmessage.newbuilder()for(familyname<-familynames){if(put.getfamilymap().get(bytes.tobytes(familyname))!=null){valkeyvaluelist=put.getfamilymap().asinstanceof[java.util.arraylist[keyvalue]].asscalafor(keyvalue{valproducer=getproducer[t](brokers)partitionofrecords.map(r=>newproducerrecord[string,t](topic,r._1,r._2)).foreach(m=>producer.send(m))producer.close()})}
第四步:另启动 streaming consumer 或者服务消费 kafka 中内容,将 putmessage 的 protobuf 格式转成 hbase 的 put 对象,同时写入到在线 hbase 集群中。 streaming 消费kafka ,不同的表发送到不同的 topic,对每个 topic 的消费做监控。
valtohbasetagstopic=validkafkastreamtagstopic.map{record=>valtablename_r=record.gettablename()valput=newput(record.getrowkey.tobytearray)for(cvrdd.saveasnewapihadoopdataset(accessutils.createoutputtableconfiguration(constants.constants.namespace+:+constants.constants.tags_topic_table_name))}}
如下为另一种启动服务消费 kafka 的方式。
valconsumer=newkafkaconsumer[string,array[byte]](probs)consumer.subscribe(topics)valrecords=consumer.poll(100)for(ptry(kafkamessages.parsefrom(r.value()))match{casesuccess(record)=>valtablename=record.gettablenameif(validatetables.contains(tablename)){valmessagetype=record.gettype......try{valcolumns=record.getcolumnslist.map(c=>(c.getcolumn,c.getvalue.tobytearray)).toarrayhbasehandler.write(tablename)......}catch{caseex:throwable=>log.error(writehbasefail)haloclient.increment(scontent_write_hbase_fail)}}else{log.error(stable$tablenameisvalid)}}}//updateoffsetvallastoffset=recordsofpartition.get(recordsofpartition.size-1).offset()consumer.commitsync(java.util.collections.singletonmap(p,newoffsetandmetadata(lastoffset+1)))}
结语
最后,目前采用的由应用控制和管理在线离线集群的同步机制,在随着平台多机房项目的推动下,平台将推出 hbase 的统一同步机制 hrp (hbase replication proxy),届时业务部门可以将更多的时间和精力集中在模型优化层面。

再竖行业新标杆 赛普拉斯发布革命性的Gen5 TrueTouch控制器
oppor11什么时候上市?oppor11配置曝光,携手oppor11 plus一同归来
音圈模在水产品药物残留检测仪的应用
电子芯闻早报:三星Note7永久停产 大陆面板产能明年超韩国
VB集成环境及简单应用程序的建立
两亿多用户,六大业务场景,知乎AI用户模型服务性能如何优化?
泰克Tektronix 示波器电流探头TRCP3000的使用注意事项
智能技术加持,“开灯”是种什么样的体验
欧洲租车龙头订购10万台比亚迪,中国新能源汽车进军欧洲市场!
ARM Cortex-R8使SoC的性能提高一倍
中控智慧科技射频卡读卡器KR801B介绍
利用3D打印机制成的捕鸟蛛六足机器人能借灵活的肢体轻松应对复杂地形
功率放大模块如何选择(安泰功率放大器模块产品介绍)
华为携手Altice,为葡萄牙实现5G网络
Wine更新:支持 Linux 运行 Windows 应用,PE 格式核心模块
10家动力电池企业上半年业绩表现如何?有人欢喜有人忧
htc被收购,htc亏损太厉害放弃手机业务被谷歌收了
逐渐落后的CAN总线
开关稳压器的基础-改善同步整流式的轻负载时效率的功能
稳定币和安全代币将怎样影响下一代数字证券