作者 | 王政英

来源 | 知乎技术专栏


用户模型简介

知乎 AI 用户模型服务于知乎两亿多用户,主要为首页、推荐、广告、知识服务、想法、关注页等业务场景提供数据和服务,例如首页个性化 Feed 的召回和排序、相关回答等用到的用户长期兴趣特征,问题路由、回答排序中用到的 TPR「作者创作权威度」,广告定向投放用到的基础属性等。

主要功能

提供的数据和功能主要有:

  • 用户兴趣:长期兴趣、实时兴趣、分类兴趣、话题兴趣、keyword 兴趣、作者创作权威度等,

  • 用户 Embedding 表示:最近邻用户、人群划分、特定用户圈定等,

  • 用户社交属性:用户亲密度、二度好友、共同好友、相似优秀回答者等,

  • 用户实时属性: LastN 行为、LastLogin 等,

  • 用户基础属性:用户性别预测、年龄段计算、职业预估等。

服务架构

整体主要分为 Streaming / 离线计算、在线服务和 HBase 多集群同步三部分组成,下面将依次进行介绍。

用户模型服务架构图


Streaming / 离线计算

Streaming 计算主要涉及功能 LastRead、LastSearch、LastDisplay,实时话题/ Keyword 兴趣、最后登录时间、最后活跃的省市等。

用户模型实时兴趣计算逻辑图

实时兴趣的计算流程

  1. 相应日志获取。从 CardshowLog、PageshowLog、QueryLog 中抽取<用户,contentToken,actionType >等内容。

  2. 映射到对应的内容维度。对于问题、回答、文章、搜索分别获取对应的 Topic 和 Keyword,搜索内容对应的 Topic。在 Redis 中用 contentToken 置换 contentId 后,请求 ContentProfile 获取其对应话题和关键词;对于 Query,调用 TopicMatch 服务,传递搜索内容给服务,服务返回其对应的 Topic;调用 Znlp 的 KeywordExtractorJar 包,传递搜索内容并获得其对应的 Keyword 。

  3. 用户-内容维度汇总。根据用户的行为,在<用户,topic,actionType>和<用户,keyword,actionType>层面进行 groupBy 聚合汇总后,并以 hashmap 的格式存储到 Redis,作为计算用户实时兴趣的基础数据,按时间衰减系数 timeDecay 进行新旧兴趣的 merge 后存储。

  4. 计算兴趣。在用户的历史基础数据上,按一定的 decay 速度进行衰减,按威尔逊置信区间计算用户兴趣 score,并以 Sortedset 的格式存储到 Redis。

关于兴趣计算,已经优化的地方主要是:如何快速的计算平滑参数 alpha 和 beta,如何 daily_update 平滑参数,以及用卡方计算置信度时,是否加入平滑参数等都会对最终的兴趣分值有很大的影响,当 display 为 1 曝光数量不足的情况下,兴趣 score 和 confidence 计算出现 的 bias 问题等。


在线服务

随之知乎日益增加的用户量,以及不断丰富的业务场景和与之相对应出现的调用量上升等,对线上服务的稳定性和请求时延要求也越来越高。 旧服务本身也存在一些问题,比如:

  1. 在线服务直连 HBase,当数据热点的时候,造成某些 Region Server 的负载很高,P95 上升,轻者造成服务抖动,监控图偶发有「毛刺」现象,重者造成服务几分钟的不可用,需要平台技术人员将 Region 从负载较高的 RegionServer 上移走。

  2. 离线任务每次计算完成后一次大批量同时写入离线和在线集群,会加重 HBase 在线集群Region Server 的负载,增大 HBase get 请求的时延,从而影响线上服务稳定性和 P95。

针对问题一,我们在原来的服务架构中增加缓存机制,以此来增强服务的稳定型、减小 Region Server 的负载。

针对问题二,修改了离线计算和多集群数据同步的方式,详见「HBase多集群存储机制」部分。

Cache机制具体实现

没有 Cache 机制时,所有的 get 和 batchGet 方法直接请求到 HBase,具体如下图:

用户模型服务请求序列图

  1. UserProfileServiceApp 启动服务,将收到的请求交由 UserProfileServiceImpl 具体处理

  2. UserProfileServiceImp 根据请求参数,调用 GetTranslator 将 UserProfileRequest.GetRequest 转化成 HBase 中的 Get Object(在 Map 中维护每个 requestField 对应 HBase 中的 tablename,cf,column,prefix 等信息),以格式Map[String, util.List[(AvailField, Get)]]返回。

  3. UserProfileServiceImp 用 Future 异步向 HBase 发送 get 请求,获取到结果返回。

增加 Cache 机制的具体方法,在上面的第二步中,增加一个 CacheMap,用来维护 get 中 AvailField 对应 Cache 中的 key,key 的组成格式为:「 tablename 缩写| columnfamily 缩写| columnname 缩写| rowkey 全写」。这里使用的 Redis 数据结构主要有两种,SortedSet 和 Key-Value对。服务端收到请求后先去转化 requestField 为 Cache 中的 key,从 Cache 中获取数据。对于没有获取到 requestField 的转化成 GetObject,请求 HBase 获取,将结果保存到 Cache 中并返回。

最终效果

用户模型的访问量大概为 100K QPS,每个请求转化为多个 get 请求。 增加 Cache 前 get 请求的 P95 为30ms,增加 Cache 后降低到小于 15ms,Cache 命中率 90% 以上。


HBase 多集群存储机制

离线任务和 Streaming 计算主要采用 Spark 计算实现, 结果保存到 HBase 的几种方式:

方法一:每次一条

1. 每次写进一条,调用 API 进行存储的代码如下:


val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
val table = hbaseConn.getTable(TableName.valueOf("word"))
x.foreach(value => {
    var put = new Put(Bytes.toBytes(value.toString))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(value.toString))
    table.put(put)
})

方法二:批量写入

2. 批量写入 HBase,使用的 API:


/**
   * {@inheritDoc}
   * @throws IOException
   */

  @Override
  public void put(final List<Put> puts) throws IOException{
    getBufferedMutator().mutate(puts);
    if (autoFlush) {
      flushCommits();
    }
  }

方法三:MapReduce 的 saveAsNewAPIHadoopDataset 方式写入

3. saveAsNewAPIHadoopDataset 是通用的保存到 Hadoop 存储系统的方法,调用 org.apache.hadoop.mapreduce.RecordWriter 实现。org.apache.hadoop.hbase.mapreduce.TableOutputFormat.TableRecordWriter 是其在 HBase 中的实现类。底层通过调用 hbase.client.BufferedMutator.mutate() 方式保存。


val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
rdd.map(x => {
  var put = new Put(Bytes.toBytes(x.toString))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x.toString))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)

/**
 * Writes a key/value pair into the table.
 * @throws IOException When writing fails.
 */

@Override
public void write(KEY key, Mutation value)
throws IOException {
  if (!(value instanceof Put) && !(value instanceof Delete)) {
    throw new IOException("Pass a Delete or a Put");
  }
  mutator.mutate(value);
}

方法四:BulkLoad 方式

4. BulkLoad 方式,创建 HFiles,调用 LoadIncrementalHFiles 作业将它们移到 HBase 表中。

首先需要根据表名 getRegionLocator 得到 RegionLocator,根据 RegionLocator 得到 partition,因为在 HFile 中是有序的所以,需要调用 rdd.repartitionAndSortWithinPartitions(partitioner) 将 rdd 重新排序。

HFileOutputFormat2.configureIncrementalLoad(job,table, regionLocator) 进行任务增量Load 到具体表的配置 实现并执行映射( 并减少) 作业,使用 HFileOutputFormat2 输出格式将有序的放置或者 KeyValue 对象写入HFile文件。Reduce阶段通过调用 HFileOutputFormat2.configureIncrementalLoad 配置在场景后面。执行LoadIncrementalHFiles 作业将 HFile 文件移动到系统文件。


static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
  Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();

job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);

// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(KeyValueSortReducer.class);
else if (Put.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(PutSortReducer.class);
else if (Text.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(TextSortReducer.class);
else {
  LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}

conf.setStrings("io.serializations", conf.get("io.serializations"),
    MutationSerialization.class.getName(), ResultSerialization.class.getName(),
    KeyValueSerialization.class.getName());

configurePartitioner(job, startKeys);
// Set compression algorithms based on column families
configureCompression(table, conf);
configureBloomType(table, conf);
configureBlockSize(table, conf);
configureDataBlockEncoding(table, conf);

TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + table.getName() + " output configured.");
}

public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
  throws IOException {
configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
}
val hFileLoader = new LoadIncrementalHFiles(conf)
hFileLoader.doBulkLoad(hFilePath, new HTable(conf, table.getName))

将 HFile 文件 Bulk Load 到已存在的表中。 由于 HBase 的 BulkLoad 方式是绕过了 Write to WAL,Write to MemStore 及 Flush to disk 的过程,所以并不能通过 WAL 来进行一些复制数据的操作。 由于 Bulkload 方式还是对集群 RegionServer 造成很高的负载,最终采用方案三,下面是两个集群进行数据同步。

存储同步机制

技术选型 HBase 常见的 Replication 方法有 SnapShot、CopyTable/Export、BulkLoad、Replication、应用层并发读写等。 应用层并发读写 优点:应用层可以自由灵活控制对 HBase写入速度,打开或关闭两个集群间的同步,打开或关闭两个集群间具体到表或者具体到列簇的同步,对 HBase 集群性能的影响最小,缺点是增加了应用层的维护成本。 初期没有更好的集群数据同步方式的时候,用户模型和内容模型自己负责两集群间的数据同步工作。

用户模型存储多机房同步架构图

具体实现细节

第一步:定义用于在 Kafka 的 Producer 和 Consumer 中流转的统一数据 Protobuf 格式


message ColumnValue {
required bytes qualifier = 1;
......
}

message PutMessage {
required string tablename = 1;
......
}

第二步:发送需要同步的数据到 Kafka,(如果有必要,需要对数据做相应的格式处理),这里对数据的处理,有两种方式。 第一种:如果程序中有统一的存储到 HBase 的工具(另一个项目是使用自定义的 HBaseHandler,业务层面只生成 tableName,rowKey,columnFamily,column 等值,由 HBaseHandler 统一构建成 Put 对象,并保存 HBase 中),这种方式在业务层面改动较小,理论上可以直接用原来的格式发给 Kafka,但是如果 HBaseHandler 处理的格式和 PutMessage 格式有不符的地方,做下适配即可。


/**
* tableName: hbase table name
* rdd: RDD[(rowkey, family, column, value)]
*/

def convert(tableName: Stringrdd: RDD): RDD = {
rdd.map {
  case (rowKey: String, family: String, column: String, value: Array[Byte]) =>
    val message = KafkaMessages.newBuilder()
    val columnValue = ColumnValue.newBuilder()
    columnValue.set
     ......
    (rowKey, message.build().toByteArray)
 }
}

第二种:程序在 RDD 中直接构建 HBase 的 Put 对象,调用 PairRDD 的 saveAsNewAPIHadoopDataset 方法保存到 HBase 中。此种情况,为了兼容已有的代码,做到代码和业务逻辑的改动最小,发送到 Kafka 时,需要将 Put 对象转换为上面定义的 PutMessage Protobuf 格式,然后发送给 Kafka。


/**
* tableName: hbase table namne
* rdd: RDD[(rowKey, put)]
*/

def convert(tableName: StringfamilyNamesArray[String], rdd: RDD): RDD = {
rdd.map {
  case (_, put: Put) =>
    val message = PutMessage.newBuilder()
    for(familyName <- familyNames){
      if(put.getFamilyMap().get(Bytes.toBytes(familyName))!=null){
      val keyValueList = put.getFamilyMap()
        .asInstanceOf[java.util.ArrayList[KeyValue]].asScala
        for( keyvalue <- keyValueList){
          message.setRowkey(ByteString.copyFrom(keyvalue.getRow))
        ......
        }
        message.setTablename(tableName)
      }
    }
    (null, message.build().toByteArray)
 }
}

第三步:发送到 Kafka,不同的表发送到不同的 Topic,对每个 Topic 的消费做监控。


/**
*发送 rdd 中的内容到 brokers 的指定 topic 中
*tableName: hbase table namne
*rdd: RDD[(rowKey, put)]
*/
def send[T](brokers: String,
               rdd: RDD[(String, T)],
               topic: String)(implicit cTag: ClassTag[T]): Unit = {
  rdd.foreachPartition(partitionOfRecords => {
      val producer = getProducer[T](brokers)
      partitionOfRecords.map(r => new ProducerRecord[String, T](topic, r._1, r._2))
        .foreach(m => producer.send(m))
      producer.close()
  })
}

第四步:另启动 Streaming Consumer 或者服务消费 Kafka 中内容,将 putMessage 的 Protobuf 格式转成 HBase 的 put 对象,同时写入到在线 HBase 集群中。 Streaming 消费Kafka ,不同的表发送到不同的 Topic,对每个 Topic 的消费做监控。


val toHBaseTagsTopic = validKafkaStreamTagsTopic.map {
      record =>
        val tableName_r = record.getTablename()
        val put = new Put(record.getRowkey.toByteArray)
        for (cv <- record.getColumnsList) {
          put.addColumn(record.getFamily.toByteArray)
          ......
        }
        if(put.isEmpty){
          (new ImmutableBytesWritable(), null)
        }else{
          (new ImmutableBytesWritable(), put)
        }
    }.filter(_._2!=null)
    if(!isClean) {
      toHbaseTagsTopic.foreachRDD { rdd =>
        rdd.saveAsNewAPIHadoopDataset(
          AccessUtils.createOutputTableConfiguration(
            constants.Constants.NAMESPACE + ":" + constants.Constants.TAGS_TOPIC_TABLE_NAME
          )
        )
      }
   }

如下为另一种启动服务消费 Kafka 的方式。


val consumer = new KafkaConsumer[String, Array[Byte]](probs)
consumer.subscribe(topics)
val records = consumer.poll(100)
for (p <- records.partitions) {
   val recordsOfPartition = records.records(p)
   recordsOfPartition.foreach { r =>
      Try(KafkaMessages.parseFrom(r.value())) match {
         case Success(record) =>
            val tableName = record.getTableName 
            if (validateTables.contains(tableName)) {
               val messageType = record.getType
               ......
               try {
                  val columns = record.getColumnsList.map(c => (c.getColumn, c.getValue.toByteArray)).toArray
                   HBaseHandler.write(tableName)
                ......
               } catch {
                  case ex: Throwable =>
                    LOG.error("write hbase fail")
                    HaloClient.increment(s"content_write_hbase_fail")
               }
            } else {
              LOG.error(s"table $tableName is valid")
            }
         }
      }
      //update offset
      val lastOffset = recordsOfPartition.get(recordsOfPartition.size - 1).offset()
      consumer.commitSync(java.util.Collections.singletonMap(p, new OffsetAndMetadata(lastOffset + 1)))
}

结语

最后,目前采用的由应用控制和管理在线离线集群的同步机制,在随着平台多机房项目的推动下,平台将推出 HBase 的统一同步机制 HRP (HBase Replication Proxy),届时业务部门可以将更多的时间和精力集中在模型优化层面。

Reference

[1]HBase Cluster Replication
[2]通过 BulkLoad 快速将海量数据导入到 HBase[3]HBase Replication 源码分析
[4]HBase 源码之 TableRecordWriter
[5]HBase 源码之 TableOutputFormat
[6]Spark2.1.1写入 HBase 的三种方法性能对比

原文地址:

https://zhuanlan.zhihu.com/p/45907950

(*本文仅代表作者独立观点,转载请联系原作者)

公开课预告

强化学习

本课程是一次理论+实战的结合,将重点介绍强化学习的模型原理以及A3C模型原理,最后通过实践落实强化学习在游戏中的应用。

推荐阅读

  • 网友们票选的2018 Best Paper,你pick谁?

  • 拼多多黄峥给陆奇“兼职”,欲挖掘这类AI人才

  • 五个Python编程Tips,帮你提高编码效率

  • 用这个Python库,训练你的模型成为下一个街头霸王!

  • OpenStack 2018 年终盘点

  • 难逃寒冬裁员的“大追杀”,30 岁女码农该何去何从?

  • 华为员工 iPhone 发文遭罚;百度遭约谈勒令整改;锤子 1577 万元被法院保全 | 极客头条

  • 从倾家荡产到身价百亿,这个85后只用了8年

    行!程序员千万别学算法!

两亿多用户,六大业务场景,知乎AI用户模型服务性能如何优化?相关推荐

  1. 【工业智能】人工智能之于工业,应当是融入者而非颠覆者;记一场工业场景下的AI技术实践

    2018年1月13日,由极客邦科技InfoQ中国主办的AICon全球人工智能与机器学习技术大会在北京国际会议中心召开.此次大会以"助力人工智能落地"为主题,汇聚了国内外知名企业和顶 ...

  2. 最佳案例 | 游戏知几 AI 助手的云原生容器化之路

    作者 张路,运营开发专家工程师,现负责游戏知几 AI 助手后台架构设计和优化工作. 游戏知几 随着业务不断的拓展,游戏知几AI智能问答机器人业务已经覆盖了自研游戏.二方.海外的多款游戏.游戏知几研发团 ...

  3. 软银计划大幅减持阿里巴巴;美国将12家中国芯片贸易商纳入“实体清单”;知乎发布中文大模型“知海图AI”丨每日大事件...

    ‍ ‍数据智能产业创新服务媒体 --聚焦数智 · 改变商业 企业动态 百度Apollo将在上海车展发布智能汽车开放方案 4月13日,据科创板日报报道,百度Apollo将现身即将开幕的上海车展,并于4月 ...

  4. TiDB 在金融关键业务场景的实践

    TiDB 作为一款高效稳定的开源分布式数据库,在国内外的银行.证券.保险.在线支付和金融科技行业得到了普遍应用,并在约 20 多种不同的金融业务场景中支撑着用户的关键计算.本篇文章将为大家介绍分布式关 ...

  5. NLP最新趋势,7个主流业务场景!

    1 深度之眼NLP项目实战安排 ⭐BAT级工程部署 项目意义:工程化部署是程序在开发完成之后,到线上正式运行整个过程中涉及到的多个环节的统称,主要包括:测试.GPU的分配和使用.微服务的封装.Dock ...

  6. 业务分析系列主题:做设计时,怎样理解和构建业务场景闭环?

    在很长一段时期,产品经理和设计师在谈产品体验时,更多的是关注于用户本身,如今开始更加深入地探究用户和产品所处的业务场景,这样视野更大. 将业务场景纳入整个产品设计体系中,其实是将以往设计过程中被忽略的 ...

  7. 数商云SCM系统供应商准入协同业务场景 | 助力建筑建材企业规范供应商准入环节

    建筑建材行业是拉动国民经济发展的重要支柱产业,其上下游产业包括建筑业.房地产业.设备制造业.运输物流业等,其中房地产和建筑业成为直接拉动我国建筑建材产业发展的重点产业,近几年来出现经济不景气.萎靡不振 ...

  8. TiDB 在金融行业关键业务场景的实践(下篇)

    TiDB 作为一款高效稳定的开源分布式数据库,在国内外的银行.证券.保险.在线支付和金融科技行业得到了普遍应用,并在约 20 多种不同的金融业务场景中支撑着用户的关键计算.在TiDB 在金融行业关键业 ...

  9. 【金猿产品展】诸葛用户数据分析平台(Insight)——聚焦业务场景数据应用价值挖掘,赋能精细化运营...

    诸葛io产品 本产品由诸葛io投递并参与"数据猿年度金猿策划活动--2020大数据产业创新服务产品榜单及奖项"评选. 大数据产业创新服务媒体 --聚焦数据 · 改变商业 诸葛用户数 ...

最新文章

  1. 热点 | Excel不“香”了,数据分析首选Pyhton!
  2. tf.train.Saver函数的用法之保存全部变量和模型
  3. form提交后台注解拿不到数据_Form表单详解
  4. c语言程序编写一朵花,一朵花(中英双语)
  5. java oop入门_Java OOP入门起源
  6. vue服务端转html,普通vue-cli初始项目转为服务端渲染SSR
  7. ecshop修改后台登陆密码
  8. asponse.word 设置全局段前段后信息_一步步编写操作系统 12 代码段、数据段、栈和cpu寄存器的关系...
  9. ISO 7064:1983.MOD11-2校验码计算法(身份证18位效验码计算)
  10. B站视频下载(含bv快速变回av)
  11. ruby 之watir
  12. 软件需求包括3个不同的层次 业务需求 用户需求和功能需求
  13. 使用记录6_发布微信小游戏
  14. wi-fi频宽设置_如何设置TP-Link Wi-Fi智能插头
  15. Android apk安全监测及加固方案
  16. python for i in 字符串_python中for in的用法
  17. 我懒蛋又回来了!-PDO
  18. 判断一个人能否胜任团队leader,就看这一点,转载
  19. 一周自学动态网站设计
  20. “时刻准备下岗”的互联网从业者:有何本事留下?

热门文章

  1. 8500WN流畅高速上网高端卡 12核心不锁倍频
  2. 下一代安全威胁的内幕故事
  3. 4g8核支持多少php进程,服务器8核16g内存,同时有1000多人在抢红包,有502怎么办?...
  4. 二维前缀和+差分 HDU6514 Monitor
  5. Spring AOP无法拦截内部方法调用-- expose-proxy=true用法
  6. WC2018 CCF程序设计教学比赛记事
  7. coursera 《现代操作系统》 -- 第十一周 IO系统
  8. 面试题05-UI控件
  9. 深入解析CSS样式层叠权重值
  10. HDU 1556 Color the ball