hbase 生成文件

在本文中,我们将探索HBase来存储客户搜索点击事件数据,并利用其基于搜索查询字符串和构面过滤器点击来获取客户行为信息。 我们将介绍使用MiniHBaseCluster,HBase Schema设计,使用HBaseSink与Flume集成以存储JSON数据。

在之前的文章的基础上,

  • 客户产品搜索使用大数据进行点击分析 ,
  • Flume:使用Apache Flume收集客户产品搜索点击数据 ,
  • Hive:使用Apache Hive查询客户最喜欢的搜索查询和产品视图计数 ,
  • ElasticSearch-Hadoop:将产品视图计数和从Hadoop到ElasticSearch的客户顶部搜索查询建立索引 ,
  • Oozie:为Hive分区和ElasticSearch索引安排协调器/捆绑作业 ,
  • Spark:针对大数据的实时分析,可用于热门搜索查询和热门产品视图

我们已经探索了将搜索点击事件数据存储在Hadoop中,并使用不同的技术对其进行查询。 在这里,我们将使用HBase实现相同的目的:

  • HBase小型集群设置
  • 使用Spring Data的HBase模板
  • HBase模式设计
  • 使用HBaseSink进行Flume集成
  • HBaseJsonSerializer序列化json数据
  • 查询过去一个小时的前10个搜索查询字符串
  • 查询过去一个小时的前10个搜索方面过滤器
  • 获取最近30天内客户的最近搜索查询字符串

HBase的

HBase “是Hadoop数据库,一个分布式,可扩展的大数据存储。”

HBaseMiniCluster / MiniZookeperCluster

要设置和启动小型集群,请检查HBaseServiceImpl.java

...miniZooKeeperCluster = new MiniZooKeeperCluster();miniZooKeeperCluster.setDefaultClientPort(10235);miniZooKeeperCluster.startup(new File("taget/zookeper/dfscluster_" + UUID.randomUUID().toString()).getAbsoluteFile());...Configuration config = HBaseConfiguration.create();config.set("hbase.tmp.dir", new File("target/hbasetom").getAbsolutePath());config.set("hbase.master.port", "44335");config.set("hbase.master.info.port", "44345");config.set("hbase.regionserver.port", "44435");config.set("hbase.regionserver.info.port", "44445");config.set("hbase.master.distributed.log.replay", "false");config.set("hbase.cluster.distributed", "false");config.set("hbase.master.distributed.log.splitting", "false");config.set("hbase.zookeeper.property.clientPort", "10235");config.set("zookeeper.znode.parent", "/hbase");miniHBaseCluster = new MiniHBaseCluster(config, 1);miniHBaseCluster.startMaster();...

MiniZookeeprCluster在客户端端口10235上启动,所有客户端连接都将在此端口上。 确保将hbase服务器端口配置为不与其他本地hbase服务器冲突。 在这里,我们仅在测试案例中启动一台hbase区域服务器。

使用Spring数据的HBase模板

我们将使用Spring hbase模板连接到HBase集群:

<hdp:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfiguration" stop-proxy="false" delete-connection="false" zk-quorum="localhost" zk-port="10235"></hdp:hbase-configuration><bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HBaseTemplate" p:configuration-ref="hbaseConfiguration" />

HBase表架构设计

我们具有以下格式的搜索点击事件JSON数据,

{"eventid":"24-1399386809805-629e9b5f-ff4a-4168-8664-6c8df8214aa7","hostedmachinename":"192.168.182.1330","pageurl":"http://blahblah:/5&quot ;,"customerid":24,"sessionid":"648a011d-570e-48ef-bccc-84129c9fa400","querystring":null,"sortorder":"desc","pagenumber":3,"totalhits":28,"hitsshown":7,"createdtimestampinmillis":1399386809805,"clickeddocid":"41","favourite":null,"eventidsuffix":"629e9b5f-ff4a-4168-8664-6c8df8214aa7","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"searchfacettype_age_level_2","value":"12-18 years"}]}

处理数据的一种方法是将其直接存储在一个列族和json列下。 这样扫描json数据将变得不那么容易和灵活。 另一种选择是将其存储在一个列族下,但具有不同的列。 但是将过滤器数据存储在单列中将很难进行扫描。 下面的混合方法是将其划分为多个列族,并动态生成用于过滤器数据的列。

转换后的架构为:

{
"client:eventid" => "24-1399386809805-629e9b5f-ff4a-4168-8664-6c8df8214aa7",
"client:eventidsuffix" => "629e9b5f-ff4a-4168-8664-6c8df8214aa7",
"client:hostedmachinename" => "192.168.182.1330",
"client:pageurl" => "http://blahblah:/5",
"client:createdtimestampinmillis" => 1399386809805,
"client:cutomerid" => 24,
"client:sessionid" => "648a011d-570e-48ef-bccc-84129c9fa400",
"search:querystring" => null,
"search:sortorder" => desc,
"search:pagenumber" => 3,
"search:totalhits" => 28,
"search:hitsshown" => 7,
"search:clickeddocid" => "41",
"search:favourite" => null,
"filters:searchfacettype_color_level_2" => "Blue",
"filters:searchfacettype_age_level_2" => "12-18 years"
}

将创建以下三列系列:

  • 客户 :存储事件的客户和客户数据特定信息。
  • search :与查询字符串和分页信息有关的搜索信息存储在此处。
  • 过滤器:为了支持将来的其他构面等以及更灵活的数据扫描,将基于构面名称/代码动态创建列名称,并将列值存储为构面过滤器值。

要创建hbase表,

...TableName name = TableName.valueOf("searchclicks");HTableDescriptor desc = new HTableDescriptor(name);desc.addFamily(new HColumnDescriptor(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES));desc.addFamily(new HColumnDescriptor(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES));desc.addFamily(new HColumnDescriptor(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES));try {HBaseAdmin hBaseAdmin = new HBaseAdmin(miniHBaseCluster.getConf());hBaseAdmin.createTable(desc);hBaseAdmin.close();} catch (IOException e) {throw new RuntimeException(e);}...

在创建表时已添加了相关列族,以支持新的数据结构。 通常,建议尽量减少列族的数量,请记住如何根据使用情况来构造数据。 根据以上示例,我们将扫描场景保持为:

  • 如果您想根据网站上的总访问量信息来检索客户或客户信息,请扫描客户家庭。
  • 扫描搜索信息,以查看最终客户正在寻找哪些免费文本搜索,而导航搜索无法满足这些需求。 请参阅在哪个页面上单击了相关产品,您是否需要加强应用才能将产品推高。
  • 扫描过滤器系列,以了解导航搜索如何为您服务。 是否为最终客户提供他们想要的产品。 查看更多点击哪些构面过滤器,您是否需要在订购中提高一点以便于客户轻松使用。
  • 应避免在家庭之间进行扫描,而应使用行键设计来获得特定的客户信息。

行键设计信息

在我们的例子中,行键设计基于customerId-timestamp -randomuuid 。 由于所有列族的行键均相同,因此我们可以使用“前缀过滤器”对仅与特定客户相关的行进行过滤。

final String eventId = customerId + "-" +  searchQueryInstruction.getCreatedTimeStampInMillis() + "-" + searchQueryInstruction.getEventIdSuffix();
...
byte[] rowKey = searchQueryInstruction.getEventId().getBytes(CHARSET_DEFAULT);
...
# 24-1399386809805-629e9b5f-ff4a-4168-8664-6c8df8214aa7

这里的每个列族都有相同的行键,并且您可以使用前缀过滤器仅扫描特定客户的行。

水槽整合

HBaseSink用于将搜索事件数据直接存储到HBase。 检查详细信息FlumeHBaseSinkServiceImpl.java

...channel = new MemoryChannel();Map<String, String> channelParamters = new HashMap<>();channelParamters.put("capacity", "100000");channelParamters.put("transactionCapacity", "1000");Context channelContext = new Context(channelParamters);Configurables.configure(channel, channelContext);channel.setName("HBaseSinkChannel-" + UUID.randomUUID());sink = new HBaseSink();sink.setName("HBaseSink-" + UUID.randomUUID());Map<String, String> paramters = new HashMap<>();paramters.put(HBaseSinkConfigurationConstants.CONFIG_TABLE, "searchclicks");paramters.put(HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY, new String(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES));paramters.put(HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, "1000");paramters.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER, HBaseJsonEventSerializer.class.getName());Context sinkContext = new Context(paramters);sink.configure(sinkContext);sink.setChannel(channel);sink.start();channel.start();...

客户端列系列仅用于HBaseSink的验证。

HBaseJsonEventSerializer

创建自定义序列化器以存储JSON数据:

public class HBaseJsonEventSerializer implements HBaseEventSerializer {public static final byte[] COLUMFAMILY_CLIENT_BYTES = "client".getBytes();public static final byte[] COLUMFAMILY_SEARCH_BYTES = "search".getBytes();public static final byte[] COLUMFAMILY_FILTERS_BYTES = "filters".getBytes();...byte[] rowKey = searchQueryInstruction.getEventId().getBytes(CHARSET_DEFAULT);Put put = new Put(rowKey);// Client Inforput.add(COLUMFAMILY_CLIENT_BYTES, "eventid".getBytes(), searchQueryInstruction.getEventId().getBytes());...if (searchQueryInstruction.getFacetFilters() != null) {for (SearchQueryInstruction.FacetFilter filter : searchQueryInstruction.getFacetFilters()) {put.add(COLUMFAMILY_FILTERS_BYTES, filter.getCode().getBytes(),filter.getValue().getBytes());}}...

检查更多详细信息, HBaseJsonEventSerializer.java

事件主体从Json转换为Java bean,并进一步处理数据以在相关的列系列中进行序列化。

查询原始单元格数据

要查询原始单元格数据:

...Scan scan = new Scan();scan.addFamily(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES);scan.addFamily(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES);scan.addFamily(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES);List<String> rows = hbaseTemplate.find("searchclicks", scan,new RowMapper<String>() {@Overridepublic String mapRow(Result result, int rowNum) throws Exception {return Arrays.toString(result.rawCells());}});for (String row : rows) {LOG.debug("searchclicks table content, Table returned row: {}", row);}

有关详细信息,请检查HBaseServiceImpl.java 。

数据以以下格式存储在hbase中:

searchclicks table content, Table returned row: [84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:createdtimestampinmillis/1404832918166/Put/vlen=13/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:customerid/1404832918166/Put/vlen=2/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:eventid/1404832918166/Put/vlen=53/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:hostedmachinename/1404832918166/Put/vlen=16/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:pageurl/1404832918166/Put/vlen=19/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:sessionid/1404832918166/Put/vlen=36/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/filters:searchfacettype_product_type_level_2/1404832918166/Put/vlen=7/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:hitsshown/1404832918166/Put/vlen=2/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:pagenumber/1404832918166/Put/vlen=1/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:querystring/1404832918166/Put/vlen=13/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:sortorder/1404832918166/Put/vlen=3/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:totalhits/1404832918166/Put/vlen=2/mvcc=0]

查询过去一个小时的前10个搜索查询字符串

要仅查询搜索字符串,我们只需要搜索列族。 要在时间范围内进行扫描,我们可以使用client列系列创建的timestampinmillis列,但这将是扩展扫描。

...Scan scan = new Scan();scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES, Bytes.toBytes("createdtimestampinmillis"));scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES, Bytes.toBytes("querystring"));List<String> rows = hbaseTemplate.find("searchclicks", scan,new RowMapper<String>() {@Overridepublic String mapRow(Result result, int rowNum) throws Exception {String createdtimestampinmillis = new String(result.getValue(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES, Bytes.toBytes("createdtimestampinmillis")));byte[] value = result.getValue(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES, Bytes.toBytes("querystring"));String querystring = null;if (value != null) {querystring = new String(value);}if (new DateTime(Long.valueOf(createdtimestampinmillis)).plusHours(1).compareTo(new DateTime()) == 1 && querystring != null) {return querystring;}return null;}});...//sort the keys, based on counts collection of the query strings.List<String> sortedKeys = Ordering.natural().onResultOf(Functions.forMap(counts)).immutableSortedCopy(counts.keySet());...

查询过去一个小时的前10个搜索方面过滤器

基于动态列创建,您可以扫描数据以返回点击次数最高的构面过滤器。

动态列将基于您的方面代码,该代码可以是以下任何一种:

#searchfacettype_age_level_1#searchfacettype_color_level_2#searchfacettype_brand_level_2#searchfacettype_age_level_2for (String facetField : SearchFacetName.categoryFacetFields) {scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES, Bytes.toBytes(facetField));}

检索到:

...hbaseTemplate.find("searchclicks", scan, new RowMapper<String>() {@Overridepublic String mapRow(Result result, int rowNum) throws Exception {for (String facetField : SearchFacetName.categoryFacetFields) {byte[] value = result.getValue(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES, Bytes.toBytes(facetField));if (value != null) {String facetValue = new String(value);List<String> list = columnData.get(facetField);if (list == null) {list = new ArrayList<>();list.add(facetValue);columnData.put(facetField, list);} else {list.add(facetValue);}}}return null;}});...

您将获得所有构面的完整列表,可以进一步处理数据以计算顶面并对其进行排序。 有关完整的详细信息,请检查HBaseServiceImpl.findTopTenSearchFiltersForLastAnHour

获取客户的最近搜索查询字符串

如果需要检查客户正在寻找什么,我们可以在“客户”和“搜索”之间的两个列族之间创建扫描。 或者,另一种方式是设计行键,以便为您提供相关信息。 在我们的例子中,行键设计基于CustomerId_timestamp _randomuuid。 由于所有列族的行键均相同,因此我们可以使用“前缀过滤器”对仅与特定客户相关的行进行过滤。

final String eventId = customerId + "-" +  searchQueryInstruction.getCreatedTimeStampInMillis() + "-" + searchQueryInstruction.getEventIdSuffix();
...
byte[] rowKey = searchQueryInstruction.getEventId().getBytes(CHARSET_DEFAULT);
...
# 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923

要扫描特定客户的数据,

...Scan scan = new Scan();scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES, Bytes.toBytes("customerid"));Filter filter = new PrefixFilter(Bytes.toBytes(customerId + "-"));scan.setFilter(filter);...

有关详细信息,请检查HBaseServiceImpl.getAllSearchQueryStringsByCustomerInLastOneMonth

希望这可以帮助您入门HBase模式设计和处理数据。

翻译自: https://www.javacodegeeks.com/2014/07/hbase-generating-search-click-events-statistics-for-customer-behavior.html

hbase 生成文件

hbase 生成文件_HBase:为客户行为生成搜索点击事件统计信息相关推荐

  1. HBase:为客户行为生成搜索点击事件统计信息

    在本文中,我们将探索HBase来存储客户搜索点击事件数据,并利用其基于搜索查询字符串和构面过滤器点击来获取客户行为信息. 我们将介绍如何使用MiniHBaseCluster,HBase Schema设 ...

  2. 【语言-批处理】生成文件时,文件名变化生成(例如:yuan1.txt、yuan2.txt、yuan3.txt...)

    按123生成文件.bat ::set a = 1 ::for ::if exist %a%.txt() else ( echo 1>1.txt) ::if exist *.txt( del *. ...

  3. 文件夹自动生成html画廊,【桌面自动生成文件夹】桌面自动生成网站_桌面自动生成tmp文件-系统城...

    2017-01-26 13:59:36 浏览量:11190 有的用户发现自己的电脑系统中,总是会在桌面上生成一个名称为MobileFile的文件夹.其实,这个MobileFile文件夹是QQ相关的文件 ...

  4. oracle spool生成文件,用spool+unix shell生成文本文件

    比较熟悉使用ORACLE的人一般都会用spool命令来生成OS下的文本文件. 例如我们把scott.dept表生成文本文件的语句写成dept.sql,内容如下: set pages 50000; se ...

  5. linux生成文件自带时间,linux生成固定日期文件及删除一定日期前的文件

    一.按照一定日期格式命名文件 1.按照一定的格式输出日期: date +"%y%m%d" 格式说明: % : 印出 % %n : 下一行 %t : 跳格 %H : 小时(00-23 ...

  6. java 清单文件 生成,使用批处理文件生成文件列表清单

    使用批处理文件生成文件列表清单 使用批处理文件生成文件列表清单,友友们都会遇到这样的情况:想要复制一个文件夹下所有文件的名字,或者将所有文件的名字列出一个清单,用手工复制的话吃力不讨好还容易出差错. ...

  7. apache flume_Flume:使用Apache Flume收集客户产品搜索点击数据

    apache flume 这篇文章涵盖了使用Apache flume收集客户产品搜索点击并使用hadoop和elasticsearch接收器存储信息. 数据可能包含不同的产品搜索事件,例如基于不同方面 ...

  8. Flume:使用Apache Flume收集客户产品搜索点击数据

    这篇文章涵盖了使用Apache flume收集客户产品搜索点击并使用hadoop和elasticsearch接收器存储信息. 数据可能包含不同的产品搜索事件,例如基于不同方面的过滤,排序信息,分页信息 ...

  9. linux 进程间通信 dbus-glib【实例】详解四(上) C库 dbus-glib 使用(附代码)(编写接口描述文件.xml,dbus-binding-tool工具生成绑定文件)(列集散集函数)

    linux 进程间通信 dbus-glib[实例]详解一(附代码)(d-feet工具使用) linux 进程间通信 dbus-glib[实例]详解二(上) 消息和消息总线(附代码) linux 进程间 ...

最新文章

  1. ArrayList之坑点
  2. 最后一个 IPV4 地址分配完毕,正式向IPV6过渡!
  3. div+css盒子居中
  4. 怎么判断网络回路_电源纹波要怎么测?
  5. java final修改器_Java中的“ final”关键字如何工作?(我仍然可以修改对象。)...
  6. 台达vfd一ⅴe变频说明书_PLC运动控制实例解析:PLC与变频器系统
  7. 最新美团JS逆向分析(_token参数)
  8. Java开发全套学习!java判断字符串中是否包含中文
  9. python3库安装_Python3 Requirements库安装过程
  10. TCP的三次握手和四次挥手(超详解)
  11. comsol 4.4 matlab,如何使用COMSOL with MATLAB的清单
  12. 今日头条信息流 - 广告策略
  13. 强烈推荐一款好用的API接口
  14. python群发邮件
  15. 让人喷血的92条个性签名!!!
  16. DELL笔记本插入耳机没反应
  17. 设计模式之代理模式(第二篇)
  18. 分享一些可以调研B端产品的网站,建议收藏
  19. 为什么要学网络安全?如何学习网络安全?这3个理由告诉你(自己整理的50G网安资料)
  20. QT 管理win系统服务

热门文章

  1. C. Code a Trie(Trie+dfs+贪心)
  2. 【结论】取石子游戏(jzoj 1211)
  3. 2018 ACM-ICPC World Finals - Beijing
  4. 数学推导题,NTT,快速数论变换,Wannafly-导数卷积
  5. 1、play编程基础
  6. 6、java中的排序算法
  7. Sentinel(二十三)之使用Apollo存储规则
  8. jQuery的三种bind/One/Live/On事件绑定使用方法
  9. jQuery Raty星级评分插件使用方法
  10. Spring Cloud是什么,和Dubbo对比呢?