为什么80%的码农都做不了架构师?>>>   

本文主要演示一下storm drpc实例

配置

version: '2'
services:supervisor:image: stormcontainer_name: supervisorcommand: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774depends_on:- nimbus- zookeeperlinks:- nimbus- zookeeperrestart: alwaysports:- 6700:6700- 6701:6701- 6702:6702- 6703:6703- 8000:8000drpc:image: stormcontainer_name: drpccommand: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774depends_on:- nimbus- supervisor- zookeeperlinks:- nimbus- supervisor- zookeeperrestart: alwaysports:- 3772:3772- 3773:3773- 3774:3774
  • 这里对supervisor配置drpc.servers及drpc.port、drpc.invocations.port,好让worker通过drpc.invocations.port去访问drpc节点
  • 对于drpc服务,则暴露drpc.port(好让外部的DRPCClient访问)、drpc.invocations.port(让worker访问)

TridentTopology

    @Testpublic void testDeployDRPCStateQuery() throws InterruptedException, TException {TridentTopology topology = new TridentTopology();FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,new Values("the cow jumped over the moon"),new Values("the man went to the store and bought some candy"),new Values("four score and seven years ago"),new Values("how many apples can you eat"));spout.setCycle(true);TridentState wordCounts =topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word"))//NOTE transforms a Stream into a TridentState object.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(6);topology.newDRPCStream("words").each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));StormTopology stormTopology = topology.build();//远程提交 mvn clean package -Dmaven.test.skip=true//storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交System.setProperty("storm.jar",TOPOLOGY_JAR);Config conf = new Config();conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);}
  • 这里newStream创建了一个TridentState,然后newDRPCStream创建了一个DRPCStream,其stateQuery指定为前面创建的TridentState
  • 由于TridentState把结果存储到了MemoryMapState,因而这里的DRPCStream通过drpc进行stateQuery

DRPCClient

    @Testpublic void testLaunchDrpcClient() throws TException {Config conf = new Config();//NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,不然client直接跑空指针conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100MDRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);System.out.println(client.execute("words", "cat dog the man"));}
  • 注意这里的配置项不能少,否则会引发空指针
  • Config.DRPC_THRIFT_TRANSPORT_PLUGIN这里使用的是SimpleTransportPlugin.class.getName(),虽然该类被废弃了,不过还可以跑通
  • 由于使用了SimpleTransportPlugin.class,因而这里要配置Config.DRPC_MAX_BUFFER_SIZE
  • DRPCClient配置了drpc的地址及port
  • client.execute这里要传入newDRPCStream指定的function名称

小结

  • 使用drpc的时候,需要通过storm drpc启动drpc server服务节点,另外要暴露两个端口,一个drpc.port是供外部DRPCClient调用,一个drpc.invocations.port是给worker来访问;drpc.http.port端口是暴露给http协议调用的(DRPCClient使用的是thrift协议调用)
  • supervisor要配置drpc.servers、drpc.invocations.port,好让worker去访问到drpc server
  • DRPCClient使用drpc.port指定的端口来访问,另外client.execute这里要传入newDRPCStream指定的function名称

doc

  • Trident Tutorial
  • Distributed RPC
  • Running Apache Storm Securely

转载于:https://my.oschina.net/go4it/blog/2250581

storm drpc实例相关推荐

  1. storm drpc

    转 http://www.cnblogs.com/panfeng412/archive/2012/07/02/storm-common-patterns-of-distributed-rpc.html ...

  2. Storm DRPC环境搭建笔记

    Storm DRPC环境搭建笔记 By Mickey.Pro 1. 安装系统 CentOS 6.3 64bit minimal http://www.osyunwei.com/archives/475 ...

  3. Storm DRPC 使用及访问C++ Bolt问题的解决方法

    原创文章,欢迎转载,转载请注明出处:http://blog.csdn.net/jmppok/article/details/16840231 参考1: storm下运行C++程序(一) 参考2: St ...

  4. Twitter Storm: DRPC学习

    学习途径 作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明 网址: http://xumingming.sinaapp.com/756/twitte ...

  5. storm DRPC问题

    一.配置集群storm.yaml文件,配置drpc.server. 二.开启drpc服务,storm drpc. 三.编写DrpcTopology程序.如下: <span style=" ...

  6. storm DRPC例子

    1,DRPC原理 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数.实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流.每个函数调用被DRPC服务器 ...

  7. storm drpc学习

    storm drpc 是什么?咋一听觉得挺高大上的,其实也就是那么一回事.就是storm的topology 提供了很多函数,并且函数名唯一,函数里面封装了一些算法操作.只需要在调用的时候指定函数名和传 ...

  8. storm DRPC指南

    storm DRPC指南 @(STORM)[storm] storm DRPC指南 一什么是DRPC 二LocalDPRC的例子 三RemoteDRPC将DPRC拓扑提交至集群 1启动DRPC服务器 ...

  9. Storm DRPC 使用

    来源:http://blog.csdn.net/jmppok/article/details/16839363 1. DRPC介绍 Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理 ...

最新文章

  1. 雷军的最后一次 重 大 创 业
  2. 分布式b2b b2c o2o电子商务云平台
  3. Leetcode 451. 根据字符出现频率排序 解题思路及C++实现
  4. matlab 老照片处理,matlab实现PS算法之百叶窗、老照片
  5. python 禁用网卡_如何编程实现启用禁用网卡
  6. 以太坊 node data write error_Node之 创建服务器与客户端
  7. android 开源组件合集-UI篇(2013-11-07更新)
  8. 开源app之MyHearts
  9. 用户生命周期运营白皮书2.0
  10. Android学习系列(41)--Android Studio简单使用
  11. 调整oracle scn,在Oracle中增进SCN及案例介绍
  12. 为什么有些工厂,3000块一个月不包吃住还能招到工人?
  13. 【Nginx探究系列二】Nginx配置篇之客户Nginx白名单访问配置
  14. Android群英传笔记——第十二章:Android5.X 新特性详解,Material Design UI的新体验...
  15. iOS获取DSYM文件
  16. JAVA远程声卡,Delphi带多声道声卡(ASIO)
  17. 投掷骰子的python代码_模拟骰子(Python),掷骰子
  18. type-aliases-package不生效问题记录
  19. 探索未知领域,是我犯错了么?
  20. 大数据时代:我们的邮件被谁偷看了?

热门文章

  1. pdf文档如何修改乱序页面
  2. 怎么修改PDF,PDF怎么编辑图片
  3. 对话朱嘉明:元宇宙的商业前景、技术路径和治理规则
  4. Pgbouncer最佳实践:系列一
  5. java web 编辑器_基于Java+web的在线Java编辑器 PDF 下载
  6. 手表怎么升级鸿蒙,华为鸿蒙2.0系统可更新设备一览表,鸿蒙2.0系统更新教程说明[图]...
  7. 遥感IDL二次开发(叶面积指数LAI计算)
  8. 读《DevOps实践指南》笔记三
  9. 使用人脸关键点检测(68点)模型进行标注
  10. [情人节]今天你用了那个“约会神器”