配置选项名称

配置选项作用

topology.max.task.parallelism

每个Topology运行时最大的executor数目

topology.workers

每个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖

storm.zookeeper.servers

zookeeper集群的节点列表

storm.local.dir

Storm用于存储jar包和临时文件的本地存储目录

storm.zookeeper.root

Storm在zookeeper集群中的根目录,默认是“/”

ui.port

Storm集群的UI地址端口号,默认是8080

nimbus.host:

Nimbus节点的host

supervisor.slots.ports

Supervisor 节点的worker占位槽,集群中的所有Topology公用这些槽位数,即使提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来 进行分配,当所有的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,如果有,就再次给新提交的 Topology分配

supervisor.worker.timeout.secs

Worker的超时时间,单位为秒,超时后,Storm认为当前worker进程死掉,会重新分配其运行着的task任务

drpc.servers

在使用drpc服务时,drpc server的服务器列表

drpc.port

在使用drpc服务时,drpc server的服务端口

本地模式下, 基本并发度控制

conf.setMaxTaskParallelism(5);   本地模式下一个组件能够运行的最大线程数

builder.setSpout("spout", new RandomSentenceSpout(), 10);  最后的参数parallelism_hint 表示executor的数目,每个作为一个thread在work下工作,  但是如果超过setMaxTaskParallelism定义的上限,则使用setMaxTaskParallelism设置的TOPOLOGY_MAX_TASK_PARALLELISM

builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);  ,task的数目,默认和executor是1:1 的关系,就是每个task运行在一个物理线程上,

在这里设置的是taskNum为2,executor 是5,表示RandomSentenceSpout创建2次,实际只有两个2个executor,  executor不能超过NumTask

builder.setSpout("spout", new RandomSentenceSpout(), 2).setNumTasks(5);

在这里设置的是taskNum为5,executor 是2, 表示RandomSentenceSpout创建5次,2个executor在两个物理线程上执行,  每个executor执行1/2的任务

这么写感觉意义都不大, 只是个人为了理解storm executor task概念, 在0.8以后,几个executor有可能是共用一个物理线程,由上面测试能看出。

突然想起这个其实还是有好处的,因为在storm中 TaskNum是静态的, executor是动态的, 比如tasknum是5,exector是2,这时候是在两个物理线程执行, 如果我们将executor改成3,  这时会变成在3个物理线程上执行,提高了并发性. 物理线程公式应该Min(executor, tasknum),  这个未在任何文档上见过,个人的一个推断.

动态调整参数

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout"); 这里和上面一样,会负载均衡地放入一个线程中运行

conf.setDebug(true);                         // 
conf.setMaxSpoutPending(2);          //  这个设置一个spout task上面最多有多少个没有处理(ack/fail)的tuple,防止tuple队列过大, 只对可靠任务起作用
conf.setMessageTimeoutSecs(1);    //  消息处理延时, 就是消息超过延时后, emit发射源会认为是fail , storm默认是30秒,如果实现的为Irichbolt接口,没有ack和ack延时都会触发,这个时间过短的话,如果自定义重发,bolt可能会多处理,tuple在发射过程中, 但是还没有到达bolt, 但是已经延时了,emit发射源会认为已经失败了,但是bolt还是收到这个tuple, 所以storm引入了事务拓扑,0.8以后叫trident. 如果实现的为IBaseBolt,则只会在延时情况下触发, 默认会调用ack,但是这个ack如果有再次发射, 这个ack就会自动锚定了.

根据具体业务需求选择合适的Bolt
conf.setNumAckers(2);                     //  消息处理的acker数量.默认1,可以根据实际处理情况调大

真实环境

conf.setNumWorkers(5); // 设置工作进程 ,  如果不添加端口, 默认会是4个worker进程

需要在storm.yaml下添加端口

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704

每个worker使用一个端口.

在uI窗口是spout bolt acker几个的累加.

storm.yaml参数参考

配置项 配置说明
storm.zookeeper.servers ZooKeeper服务器列表
storm.zookeeper.port ZooKeeper连接端口
storm.local.dir storm使用的本地文件系统目录(必须存在并且storm进程可读写)
storm.cluster.mode Storm集群运行模式([distributed|local])
storm.local.mode.zmq Local模式下是否使用ZeroMQ作消息系统,如果设置为false则使用java消息系统。默认为false
storm.zookeeper.root ZooKeeper中Storm的根目录位置
storm.zookeeper.session.timeout 客户端连接ZooKeeper超时时间
storm.id 运行中拓扑的id,由storm name和一个唯一随机数组成。
nimbus.host nimbus服务器地址
nimbus.thrift.port nimbus的thrift监听端口
nimbus.childopts 通过storm-deploy项目部署时指定给nimbus进程的jvm选项
nimbus.task.timeout.secs 心跳超时时间,超时后nimbus会认为task死掉并重分配给另一个地址。
nimbus.monitor.freq.secs nimbus检查心跳和重分配任务的时间间隔.注意如果是机器宕掉nimbus会立即接管并处理。
nimbus.supervisor.timeout.secs supervisor的心跳超时时间,一旦超过nimbus会认为该supervisor已死并停止为它分发新任务.
nimbus.task.launch.secs task启动时的一个特殊超时设置.在启动后第一次心跳前会使用该值来临时替代nimbus.task.timeout.secs.
nimbus.reassign 当发现task失败时nimbus是否重新分配执行。默认为真,不建议修改。
nimbus.file.copy.expiration.secs nimbus判断上传/下载链接的超时时间,当空闲时间超过该设定时nimbus会认为链接死掉并主动断开
ui.port Storm UI的服务端口
drpc.servers DRPC服务器列表,以便DRPCSpout知道和谁通讯
drpc.port Storm DRPC的服务端口
supervisor.slots.ports supervisor上能够运行workers的端口列表.每个worker占用一个端口,且每个端口只运行一个worker.通过这项配置可以调整每台机器上运行的worker数.(调整slot数/每机)
supervisor.childopts 在storm-deploy项目中使用,用来配置supervisor守护进程的jvm选项
supervisor.worker.timeout.secs supervisor中的worker心跳超时时间,一旦超时supervisor会尝试重启worker进程.
supervisor.worker.start.timeout.secs supervisor初始启动时,worker的心跳超时时间,当超过该时间supervisor会尝试重启worker。因为JVM初始启动和配置会带来的额外消耗,从而使得第一次心跳会超过supervisor.worker.timeout.secs的设定
supervisor.enable supervisor是否应当运行分配给他的workers.默认为true,该选项用来进行Storm的单元测试,一般不应修改.
supervisor.heartbeat.frequency.secs supervisor心跳发送频率(多久发送一次)
supervisor.monitor.frequency.secs supervisor检查worker心跳的频率
worker.childopts supervisor启动worker时使用的jvm选项.所有的”%ID%”字串会被替换为对应worker的标识符
worker.heartbeat.frequency.secs worker的心跳发送时间间隔
task.heartbeat.frequency.secs task汇报状态心跳时间间隔
task.refresh.poll.secs task与其他tasks之间链接同步的频率.(如果task被重分配,其他tasks向它发送消息需要刷新连接).一般来讲,重分配发生时其他tasks会理解得到通知。该配置仅仅为了防止未通知的情况。
topology.debug 如果设置成true,Storm将记录发射的每条信息。
topology.optimize master是否在合适时机通过在单个线程内运行多个task以达到优化topologies的目的.
topology.workers 执行该topology集群中应当启动的进程数量.每个进程内部将以线程方式执行一定数目的tasks.topology的组件结合该参数和并行度提示来优化性能
topology.ackers topology中启动的acker任务数.Acker保存由spout发送的tuples的记录,并探测tuple何时被完全处理.当Acker探测到tuple被处理完毕时会向spout发送确认信息.通常应当根据topology的吞吐量来确定acker的数目,但一般不需要太多.当设置为0时,相当于禁用了消息可靠性,storm会在spout发送tuples后立即进行确认.
topology.message.timeout.secs topology中spout发送消息的最大处理超时时间.如果一条消息在该时间窗口内未被成功ack,Storm会告知spout这条消息失败。而部分spout实现了失败消息重播功能。
topology.kryo.register 注册到Kryo(Storm底层的序列化框架)的序列化方案列表.序列化方案可以是一个类名,或者是com.esotericsoftware.kryo.Serializer的实现.
topology.skip.missing.kryo.registrations Storm是否应该跳过它不能识别的kryo序列化方案.如果设置为否task可能会装载失败或者在运行时抛出错误.
topology.max.task.parallelism 在一个topology中能够允许的最大组件并行度.该项配置主要用在本地模式中测试线程数限制.
topology.max.spout.pending 一个spout task中处于pending状态的最大的tuples数量.该配置应用于单个task,而不是整个spouts或topology.
topology.state.synchronization.timeout.secs 组件同步状态源的最大超时时间(保留选项,暂未使用)
topology.stats.sample.rate 用来产生task统计信息的tuples抽样百分比
topology.fall.back.on.java.serialization topology中是否使用java的序列化方案
zmq.threads 每个worker进程内zeromq通讯用到的线程数
zmq.linger.millis 当连接关闭时,链接尝试重新发送消息到目标主机的持续时长.这是一个不常用的高级选项,基本上可以忽略.
java.library.path JVM启动(如Nimbus,Supervisor和workers)时的java.library.path设置.该选项告诉JVM在哪些路径下定位本地库.
storm内默认参数
java.library.path:"/usr/local/lib:/opt/local/lib:/usr/lib"
   
  ### storm.* configs are general configurations
  # the local dir is where jars are kept
  storm.local.dir: "storm-local"
  storm.zookeeper.servers:
  - "localhost"
  storm.zookeeper.port: 2181
  storm.zookeeper.root: "/storm"
  storm.zookeeper.session.timeout: 20000
  storm.zookeeper.connection.timeout: 15000
  storm.zookeeper.retry.times: 5
  storm.zookeeper.retry.interval: 1000
  storm.zookeeper.retry.intervalceiling.millis: 30000
  storm.cluster.mode: "distributed" # can be distributed or local
  storm.local.mode.zmq: false
  storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
  storm.messaging.transport: "backtype.storm.messaging.netty.Context"
  storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
   
  ### nimbus.* configs are for the master
  nimbus.host: "localhost"
  nimbus.thrift.port: 6627
  nimbus.thrift.max_buffer_size: 1048576
  nimbus.childopts: "-Xmx1024m"
  nimbus.task.timeout.secs: 30
  nimbus.supervisor.timeout.secs: 60
  nimbus.monitor.freq.secs: 10
  nimbus.cleanup.inbox.freq.secs: 600
  nimbus.inbox.jar.expiration.secs: 3600
  nimbus.task.launch.secs: 120
  nimbus.reassign: true
  nimbus.file.copy.expiration.secs: 600
  nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"
   
  ### ui.* configs are for the master
  ui.port: 8080
  ui.childopts: "-Xmx768m"
   
  logviewer.port: 8000
  logviewer.childopts: "-Xmx128m"
  logviewer.appender.name: "A1"
   
   
  drpc.port: 3772
  drpc.worker.threads: 64
  drpc.queue.size: 128
  drpc.invocations.port: 3773
  drpc.request.timeout.secs: 600
  drpc.childopts: "-Xmx768m"
   
  transactional.zookeeper.root: "/transactional"
  transactional.zookeeper.servers: null
  transactional.zookeeper.port: null
   
  ### supervisor.* configs are for node supervisors
  # Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
  supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
  supervisor.childopts: "-Xmx256m"
  #how long supervisor will wait to ensure that a worker process is started
  supervisor.worker.start.timeout.secs: 120
  #how long between heartbeats until supervisor considers that worker dead and tries to restart it
  supervisor.worker.timeout.secs: 30
  #how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
  supervisor.monitor.frequency.secs: 3
  #how frequently the supervisor heartbeats to the cluster state (for nimbus)
  supervisor.heartbeat.frequency.secs: 5
  supervisor.enable: true
   
  ### worker.* configs are for task workers
  worker.childopts: "-Xmx768m"
  worker.heartbeat.frequency.secs: 1
   
  # control how many worker receiver threads we need per worker
  topology.worker.receiver.thread.count: 1
   
  task.heartbeat.frequency.secs: 3
  task.refresh.poll.secs: 10
   
  zmq.threads: 1
  zmq.linger.millis: 5000
  zmq.hwm: 0
   
   
  storm.messaging.netty.server_worker_threads: 1
  storm.messaging.netty.client_worker_threads: 1
  storm.messaging.netty.buffer_size: 5242880 #5MB buffer
  # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.
  storm.messaging.netty.max_retries: 300
  storm.messaging.netty.max_wait_ms: 1000
  storm.messaging.netty.min_wait_ms: 100
   
  # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
  storm.messaging.netty.transfer.batch.size: 262144
   
  # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
  storm.messaging.netty.flush.check.interval.ms: 10
   
  ### topology.* configs are for specific executing storms
  topology.enable.message.timeouts: true
  topology.debug: false
  topology.workers: 1
  topology.acker.executors: null
  topology.tasks: null
  # maximum amount of time a message has to complete before it's considered failed
  topology.message.timeout.secs: 30
  topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
  topology.skip.missing.kryo.registrations: false
  topology.max.task.parallelism: null
  topology.max.spout.pending: null
  topology.state.synchronization.timeout.secs: 60
  topology.stats.sample.rate: 0.05
  topology.builtin.metrics.bucket.size.secs: 60
  topology.fall.back.on.java.serialization: true
  topology.worker.childopts: null
  topology.executor.receive.buffer.size: 1024 #batched
  topology.executor.send.buffer.size: 1024 #individual messages
  topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
  topology.transfer.buffer.size: 1024 # batched
  topology.tick.tuple.freq.secs: null
  topology.worker.shared.thread.pool.size: 4
  topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
  topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
  topology.sleep.spout.wait.strategy.time.ms: 1
  topology.error.throttle.interval.secs: 10
  topology.max.error.report.per.interval: 5
  topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
  topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
  topology.trident.batch.emit.interval.millis: 500
  topology.classpath: null
  topology.environment: null
   
  dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

Storm处理流程, 基本参数配置相关推荐

  1. 5G RRC——为NAS层提供连接管理,消息传递等服务; 对接入网的底层协议实体提供参数配置的功能; 负责UE移动性管理相关的测量、控制等功能...

    from:http://www.cnblogs.com/kkdd-2013/p/3868676.html 1 RRC协议功能 为NAS层提供连接管理,消息传递等服务: 对接入网的底层协议实体提供参数配 ...

  2. 5G-NR非连续接收DRX参数配置详解

    5G-NR系统配置中的非连续接收系统 DRX:Discontinuous Reception 5G终端商用在即,根据前期测试及部分5G友好用户反馈,"5G终端功耗大,待机差"问题特 ...

  3. soot基础 -- 常用参数配置

    在soot的学习过程中会遇到大量的和配置相关的一些内容,这些内容设置的不正确会很让人感到苦恼,为此做出以下总结. 使用soot进行编程的流程 基本的流程图如下: [类加载]在类加载阶段,可以设置加载哪 ...

  4. 闪聚支付 第2章-支付参数配置

    需求概述 基础概念 理解应用 商户资质审核通过后就可以使用闪聚支付平台提供的服务,闪聚支付平台所提供的基础服务正是聚合支付.聚合支付就是将微信.支付宝等支付渠道汇聚为一个支付通道供商户使用,如下图: ...

  5. 服务器部署的参数文档,服务器参数配置

    服务器参数配置 内容精选 换一换 源端服务器迁移至华为云后,最终将迁移到弹性云服务器上.因此在迁移前,您需要在华为云中创建一个或多个弹性云服务器.进入"弹性云服务器"页面.关于参数 ...

  6. 参数配置类毕业论文文献都有哪些?

    本文是为大家整理的参数配置主题相关的10篇毕业论文文献,包括5篇期刊论文和5篇学位论文,为参数配置选题相关人员撰写毕业论文提供参考. 1.[期刊论文]柔性网空中联合组网参数配置优化算法 期刊:< ...

  7. 掌握JedisPoolConfig参数配置,学会调优技能

    点击上方☝Java编程技术乐园,轻松关注! 及时获取有趣有料的技术文章 做一个积极的人 编码.改bug.提升自己 我有一个乐园,面向编程,春暖花开! 你好,JedisPoolConfig Java中使 ...

  8. Storm之——Storm2.0.0配置详解

    转载请注明出处:https://blog.csdn.net/l1028386804/article/details/99126674 Storm2.0.0中defaults.yaml文件的配置如下(链 ...

  9. mysql属性配置提高查询_MYSQL性能优化-安装时优化参数配置提高服务性能

    MYSQL性能优化一直是个头痛的问题,目前大多都是直接把页面html静态页面或直接使用了缓存技术,下面我就mysql本身的性能优化来分享一下. 安装时优化参数配置提高服务性能 在Linux下安装Mys ...

  10. php7+的php-fpm参数配置,注意事项

    安装php7+的,如果php-fpm的这几个参数设置不当了,会导致php-fpm启动不了,nginx站点不能解析php文件,报404错误. 相关命令: centos7+,启动php-fpm: syst ...

最新文章

  1. 什么是Hystrix
  2. Redis和Memcached的一些区别
  3. java两个函数名字相同_为什么C不允许两个具有相同名称的函数/类模板,只有非类型模板参数(整数类型)的类型不同?...
  4. 【数据结构与算法】之深入解析“求根节点到叶节点数字之和”的求解思路与算法示例
  5. JavaWeb——c:forEach varStatus=status
  6. 阿里云esc服务器和mysql_解决远程链接阿里云esc服务器的mysql数据库
  7. BZOJ 1898: [Zjoi2005]Swamp 沼泽鳄鱼 [矩阵乘法]
  8. 消息中间件Client模块划分
  9. Django Model设计详解
  10. Excel曲线拟合及拟合公式不正确问题
  11. 卡卡通小熊win7桌面主题+非主流win7主题下载
  12. excel有多行不同内容需要向下填充
  13. Quantization and Training of Neural Networks for Efficient Integer-Arithmetic-Only Inference (纯整数计算)
  14. 自学Python:快速查找文件或文件夹
  15. 网易云信IM即时通讯PHP接口开发
  16. 给一个字符串数组,判断其是否是首尾相连的
  17. creo扫描选择多条链作为轨迹_CREO/PROE四芯花线建模,灵活使用关系式控制扫描截面就成了...
  18. html入门笔记,HTML入门笔记(附完整代码)
  19. Everest——Linux发行版本
  20. python123字符串替代_python字符串批量替换

热门文章

  1. 阶段3 2.Spring_08.面向切面编程 AOP_6 四种常用通知类型
  2. ultraedit激活
  3. 【PAT】1009. 说反话 (20)
  4. C#面向对象架构总结
  5. selenium RC优化代码3
  6. iOS-更新CocoaPods出现错误 提示重复文件
  7. 1 / 1 / 2016
  8. BZOJ 1412: [ZJOI2009]狼和羊的故事( 最小割 )
  9. [Python]关键字is和操作符==
  10. MySQL 存储过程参数IN OUT INOUT区别