实时流处理简单概述:实时是说整个流处理相应时间较短,流式技算是说数据是源源不断的,没有尽头的。实时流处理一般是将业务系统产生的数据进行实时收集,交由流处理框架进行数据清洗,统计,入库,并可以通过可视化的方式对统计结果进行实时的展示。本文涉及到的框架或技术有 Flume,Logstash,kafka,Storm, SparkStreaming等。

实时流处理的的流程与技术选型 :

一、日志收集

由于业务系统一般是游离与流处理集群如SparkStreaming、Storm之外的,所以我们需要对业务系统的数据进行实时收集。这就用到了日志收集框架,日志收集框架主要需要解决三个问题:数据从哪儿来,数据到哪儿去,实时收集。因为在流处理中为了防止突发或激增流量压垮流处理集群,通常将收集过后的数据输出到kafka分布式消息系统,然后流处理集群去消费kafka中的数据,下面介绍两种常用的日志收集框架以及他们如何对接kafka.

1).Apache Flume

这是一个apache的顶级项目,所以他的域名为flume.apache.org, 下面是官网上的原理图,Flume框架把每个收集任务都定义为一个Agent(这是一个JAVA进程),他有三个基本组件Source、Channel、Sink。

source:收集数据,可以对接各种常用数据源,如文件(exec source),kafka(kafka source),jms(java消息系统)等。 
      channel:source组件把数据收集来以后,临时存放在channel(管道)中,即channel组件在agent中是专门用来存放临时数据的,并起到数据缓冲的作用。常用的channel有memory chanel 、jdbc chanel 、file channel 等等。

sink:sink组件是用于从channel中取数据并送到目的地的组件,目的地包括hdfs、logger、avro、thrift、file、hbase等。

其实flume的使用就是编写配置文件,下面是使用flume将Nginx的日志对接kafka的配置文件,我们将该收集任务命名为

exec-memory-kafka,只需如下编写:

#配置source、sink、channel

exec-memory-kafka.sources = exec-source  #指定source (数据从哪儿来),可以指定多个数据源,用逗号分隔。
exec-memory-kafka.sinks = kafka-sink #指定sink(数据到哪儿去)
exec-memory-kafka.channels = memory-channel #指定channel

#source详细配置
exec-memory-kafka.sources.exec-source.type = exec  执行操作系统命令
exec-memory-kafka.sources.exec-source.command = sudo tail -F /var/log/nginx/access.log #监控Nginx日志文件
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c  #shell命令的前缀

#channel 详细配置
exec-memory-kafka.channels.memory-channel.type = memory  #内存channel

#sink详细配置
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink #类型 为kafka sink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092  #kafaka服务的地址,多个用逗号分隔
exec-memory-kafka.sinks.kafka-sink.topic = test1 #指定主题
exec-memory-kafka.sinks.kafka-sink.batchSize = 5  #指定每多少条收集一次,这里是每5条发送一次。
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1 #使kafka对是否收到数据进行确认,确保数据不会丢失

#为sink和source指定channel

exec-memory-kafka.sources.exec-source.channels = memory-channel

exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

编写好配置文件后切换到flume的bin目录下执行:

flume-ng agent --conf 配置文件的目录--conf-file 配置文件的全路径--name exec-memory-kafka -Dflume.root.logger=INFO,console

即可开启收集任务(进程的方式)

2).ELK技术栈的Logstash

Logstash 是一个开源的数据收集引擎,它具有备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。Logstash使用时也是编写配置文件,下面是如何使用配置文件的方式将Nginx日志输出到Kafka。

#定义数据源

input{

#这里是Nginx日志文件

file{

path =>"/var/log/nginx/access.log"
   }
}

#数据发到哪,这里是kafka
output{
kafka{
    topic_id => "test1"  #指定topic
    codec=>plain{
    format=>"%{message}"  #输出的格式,这里表示只输出消息,不输出其他信息,如版本信息等。
}
    bootstrap_servers=>"hadoop000:9092"  #kafka服务的地址
    batch_size=>1  #每几条数据发送一次
}
}

切换到logstash的bin目录,执行以下命令即可开始收集任务:

logstash -f 你的配置文件的位置。

二、kafka

kafka是一个分布式的流处理平台,在流处理中,我们通常使用他作为一个消息系统来使用,他是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。

kafka作为消息系统时相比其他消息中间件主要有4大优势:

- 可扩展性:可以通过增加broker的方式水平扩展kafka集群

- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

- 容错性:最大限度的容灾,允许集群中节点失败,包括主节点,是高可用的。

- 高并发

几个重要的角色:

Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群,一台机器可以启动多个broker在不同的端口上。

Topic:消息系统中的主题,生产者和消费者共同关注的部分。

Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列

Segment:partition物理上由多个segment组成,每个Segment存着message信息,以文件的形式

Producer :生产者, 生产message发送到topic

Consumer : 消费者,订阅topic消费message, consumer作为一个线程来消费

具体的使用可参照官网,有详细的介绍:

http://kafka.apache.org/quickstart

三、流处理框架

日志信息输出到kafka后,需要使用流处理框架作为消费者去消费kafka中的数据,下面是Storm和Spark的基本原理及其如何使用。

1 .Storm

apache的顶级项目,官网是storm.apache.org ,他是一个免费的,开源的,分布式的实时计算系统。

Storm有很多用处:如实时计算分析,在线机器学习,分布式RPC即DRPC,作为ETL工具等,

Storm特点:处理速度快、可扩展 、容灾与高可用的,能够实现高频数据和大规模数据的实时处理。

Storm中几个核心的概念:

Topologies:拓扑,将整个流处理流程串起来,每个storm应用程序都需要定义Toplogies,由spout和bolt组成的。

Streams:消息流,抽象概念,由没有边界的Tuple构成

Spouts:消息流的源头,Topology的消息生产者。产生数据的组件,比如我们要对接kafka,我们就要定义一个kafka Spout

Bolts:消息处理单元,可以做过滤、聚合、查询/写数据库等操作。

Tuple:具体的数据,传递的基本单元。

Storm架构:

类似于Hadoop的架构,也是主从架构(Master/Slave),所有节点都是无状态的,在他们上面的信息(元数据)会存储在zookeeper中

Nimbus: 集群的主节点,负责任务(task)的指派和分发、资源的分配

Supervisor: 从节点,可以启动多个Worker,可以通过配置来指定一个Topo运行在多个Worker之上,也可以通过配置来指定集群的从节点(负责干活的),Supervisor节点负责执行任务的具体部分,启动和停止自己管理的Worker进程等,一个Supervisor默认启动4个Worker进程

Worker: 运行具体组件逻辑(Spout/Bolt)的进程,这是一个进程,一个Work进程只为一个Topology服务。
Task: Worker中每一个Spout和Bolt的线程称为一个Task,他是最终运行spout或者bolt代码的最小执行单元

executor:是一个被worker进程启动的单独线程,Spout和bolt和共享一个executor,而且一个executor可以运行多个Task。

下面是各个组件职责的示意图:

编码时几个核心的角色:

1).  ISpout:核心接口(interface),负责将数据发送到topology中去处理,Storm会跟踪Spout发出去的tuple的,通过ack/fail机制,对Spout发送成功或失败时做处理,没条数据即Tuple都有自己的message id,而且ack/fail/nextTuple是在同一个线程中执行的,所以不用考虑线程安全方面。

核心方法

open: 初始化操作

close: 资源释放操作

nextTuple: 发送数据   
ack: tuple处理成功,storm会反馈给spout一个成功消息
fail:tuple处理失败,storm会发送一个消息给spout,处理失败
        实现类:
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
public interface IRichSpout extends ISpout, IComponent {}

我们定义Spout时只需要继承BaseRichSpout这个类,并实现其中的方法即可。

2).IComponent接口
概述:public interface IComponent extends Serializable
  他为topology中所有可能的组件提供公用的方法
        如 void declareOutputFields(OutputFieldsDeclarer declarer);
此方法用于声明当前Spout/Bolt发送的tuple的名称,使用OutputFieldsDeclarer配合使用
实现类:
public abstract class BaseComponent implements IComponent

IBolt接口:

概述职责:接收tuple处理,并进行相应的处理(filter/join/....),IBolt会在一个运行的机器上创建,使用Java序列化它,然后提交到主节点(nimbus)上去执行,nimbus会启动worker来反序列化,调用prepare方法,然后才开始处理tuple处理
方法:
prepare:初始化
execute:处理一个tuple数据,tuple对象中包含了元数据信息
cleanup:shutdown之前的资源清理操作
实现类:
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
public interface IRichBolt extends IBolt, IComponent

RichShellBolt

我们定义Bolt时只需继承BaseRichBolt并实现其中的方法即可。

以下是Storm对kafka的消息进行实时打印的代码实现。Storm官网有许多对接主流框架的介绍,引入所需jar包,就可以使用写好的KafkaSpout,而无需自己定义KafkaSpout类了。

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>${storm.version}</version>
</dependency>
public class StormKafkaTopology {public static class LogBolt extends BaseRichBolt {private OutputCollector outputCollector;public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.outputCollector = outputCollector;}public void execute(Tuple tuple) {try {byte[] bytes = tuple.getBinaryByField("bytes");String value = new String(bytes);System.out.println("value :" + value);this.outputCollector.ack(tuple);} catch (Exception e) {this.outputCollector.fail(tuple);}}//无后续bolt,无需声明public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();//kafka的配置String topicName = "project_topic";BrokerHosts hosts = new ZkHosts("hadoop000:2181");SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());//从上次收集的位置开始,而不是从头开始spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();//创建kafkaSpoutKafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);builder.setSpout("KafkaSpout", kafkaSpout);//设置Boltbuilder.setBolt("LogBolt", new LogBolt()).shuffleGrouping("KafkaSpout");//本地运行Storm任务LocalCluster cluster = new LocalCluster();cluster.submitTopology("StormKafkaTopology", new Config(), builder.createTopology());}
}}

2.SparkStreaming

官网上的介绍如下:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like mapreducejoin and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

即:Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可以使用Spark的其他子框架,如集群学习、图计算等,对流数据进行处理。

Spark严格意义上来说并不能算实时流处理,他粗粒度的工作原理为:将实时接收的数据,根据一定的时间间隔拆成一批批的数据,具体来说是一批批RDD(分布式弹性数据集,Spark中的核心概念),然后通过SparkEngine来处理这些数据,可能是一些transformation和action操作,最后得到一批批的处理结果。

Strom和SparkStreaming的对比:

1).Strom是真正意义上的的流处理,时延相比SparkStreaming较低,而SparkStremming是将接受的实时流数据,按照指定的时间间隔拆成一个个RDD,在每个RDD中以批处理的形式处理数据。本质上还是批处理。

2).Storm会通过messageId的方式全局追踪和记录每一条记录,并通过ack/fail机制确保每条数据至少被处理一次(也可能是多次),而SparkStream应用程序只需要批处理级别对记录进行追踪,他能保证每个批处理记录仅仅被处理一次。

3).由于SparkStreming是运行在Spark平台上的无需单独安装,可以和批处理SparkSql,机器学习等其他其框架结合起来使用。

下面使用scala语言将SparkStreming对接kafka并对图书点击量进行实时统计的应用代码:将kafka中收集到的日志进行清洗,并转换成ClikcLog对象,并实时统计的结果转化成BookClick对象并写入Hbase,Nginx日志结构如下:

192.168.126.1 - - [2017-12-02 19:20:28] "GET /books/1 HTTP/1.1" 200 2403 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" "-"
object BookCount {def main(args: Array[String]): Unit = {//以参数的形式运行SparkStreming应用程序 四个参数为zk地址 ,用户组, 主题,线程数if (args.length != 4) {System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")}val Array(zkQuorum, group, topics, numThreads) = argsval sparkConf = new SparkConf()//构造StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(5))val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap// Spark Streaming对接Kafkaval messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)val logs = messages.map(_._2)// 192.168.126.1 - - [2017-12-02 19:20:28] "GET /books/1 HTTP/1.1" 200 2403 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" "-"//  0            1 2   3             4        5    6        7     8val cleanData = logs.map(line => {val infos = line.split(" ")val url = infos(6)var bookId = 0val time = infos(3).substring(1) + " " + infos(4).substring(0, 7)if (url.startsWith("/books/")) {//只关注以books/开头的请求bookId = url.split("/")(2).toInt}ClickLog(infos(0), TimeUtil.newTime(time), bookId, infos(8).toInt)}).filter(clickLog => clickLog.bookId != 0)//为零表示不满足要求,忽略。//cleanData.print()cleanData.map(x => {(x.time.substring(0, 8) + "_" + x.bookId, 1)}).reduceByKey(_ + _).foreachRDD(rdd => {rdd.foreachPartition(record => {val list= new ListBuffer[BookClick]record.foreach(pair => {list.append(BookClick(pair._1,pair._2))})BookClickDao.put(list)})})ssc.start()ssc.awaitTermination()}
}
case class ClickLog(ip:String,time:String,bookId:Int,statusCode:Int)
case class BookClick(day_id:String,click_count:Int)
object BookClickDao {val tableName = "book_clickcount"val cf = "info"val colume = "click_count"def put(list: ListBuffer[BookClick]): Unit = {val table = HbaseUtils.getInstance().getTable(tableName)for (ele <- list) {table.incrementColumnValue(Bytes.toBytes(ele.day_id), Bytes.toBytes(cf), Bytes.toBytes(colume), ele.click_count)}}def get(day_id: String): Long = {val table = HbaseUtils.getInstance().getTable(tableName)val get = new Get(Bytes.toBytes(day_id))val value = table.get(get).getValue(cf.getBytes, colume.getBytes)if (value == null)0lelseBytes.toLong(value)}}
object TimeUtil {val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")val TARGET_TIME = FastDateFormat.getInstance("yyyyMMddHHmmss")def passTime(time: String)={YYYYMMDDHHMMSS_FORMAT.parse(time)}def newTime(time:String)={TARGET_TIME.format(passTime(time))}def main(args: Array[String]): Unit = {println(newTime("2017-12-02 19:13:25"))}
}

因为流处理框架本身不具备存储能力,最后需要将统计结果入库,并可通过百度的Echart或者阿里的DataV等数据可视化工具,定义sql和时间间隔,对统计结果进行实时的展示。

大数据之实时流处理常用框架相关推荐

  1. 什么是大数据「实时流计算」?深度解析它的4大应用及4个特点

    导读:火灾已经爆发后才知道救火,交通已经阻塞后才知道疏通,羊毛已经被"羊毛党"薅光后才知道堵上漏洞,股价已经拉升后才知道后悔--为什么我们不能在这些事情发生之前,或者至少是刚刚发生 ...

  2. 大数据准实时流式系统设计(一)——基于大数据框架设计

    前段时间负责了公司一个新的项目,项目不属于直接面向用户的线上实时响应系统,要求做到尽快毫秒级或者秒级响应的准实时系统.结合以前学习的一些大数据理论方面和参与的准实时系统方面的经验,对准实时系统架构设计 ...

  3. 连载:阿里巴巴大数据实践—实时技术

    简介:相对于离线批处理技术,流式实时处理技术作为一个非常重要的技术补充,在阿里巴巴集团内被广泛使用. 前言: -更多关于数智化转型.数据中台内容请加入阿里云数据中台交流群-数智俱乐部 和关注官方微信公 ...

  4. 流媒体服务器在大屏系统,实时流(直播流)播放、上墙(大屏播放)解决方案...

    场景描述 将实时流采集终端的视频数据实时推送到另外一个(多个)播放终端,完成远距离实时视频播放的功能.典型场景: (1)远程查看监控摄像头.选择指定摄像头,将该摄像头采集到的实时数据推送到指定播放终端 ...

  5. 携程是如何把大数据用于实时风控的

    携程是如何把大数据用于实时风控的 大数据 风控 携程 阅读20608  本文由携程技术中心投递,ID:ctriptech.作者:郁伟,携程技术中心风险控制部高级开发经理.2010加入携程,参与了携程结 ...

  6. JStorm—实时流式计算框架入门介绍

    JStorm介绍   JStorm是参考storm基于Java语言重写的实时流式计算系统框架,做了很多改进.如解决了之前的Storm nimbus节点的单点问题.   JStorm类似于Hadoop ...

  7. Java IO流大闯关--IO流的常用实现类

    这个系列的博客主要是对Java高级编程中IO流相关的知识点做一个梳理,内容主要包括File类.IO流原理及流的分类.文件流.缓冲流.转换流.标准输入输出流.打印流.数据流.对象流.随机存取文件流.NI ...

  8. 大数据Hadoop集群中常用的任务调度框架

    在大数据的集群环境中,经常用到的任务调度框架有如下几个,根据公司的业务的需要选择适合自己的业务调度的框架, 调度框架anzkaban,crontab(Linux自带).zeus(Alibaba).Oo ...

  9. 数据湖:流计算处理框架Flink概述

    系列专题:数据湖系列文章 大数据计算引擎分为离线计算和实时计算,离线计算就是我们通常说的批计算,代表是Hadoop MapReduce.Hive等大数据技术.实时计算也被称作流计算,代表是Storm. ...

  10. 阿里巴巴大数据实践—实时技术

    来源:数智化转型俱乐部 数据价值是具有时效性的,在一条数据产生的时候,如果不能及时处理并在业务系统中使用,就不能让数据保持最高的"新鲜度"和价值最大化. 相对于离线批处理技术,流式 ...

最新文章

  1. Java发送邮件工具类(可发送匿名邮件)
  2. java中的几种对话框_Java中弹出对话框中的几种方式
  3. 自助建站软件越来越友好,三大优点值得用心体会
  4. Cygwin,Nutch安装配置,检验是否正确(对网友守望者博客的修改---在此感谢守望者)2
  5. hdu 1525 Euclid‘s Game
  6. ssm使用全注解实现增删改查案例——IDeptService
  7. discuz设置用户每天回帖数_[建站教程]Discuz3.4设置QQ互联登陆教程
  8. LeetCode 561. Array Partition I
  9. B树插入和删除的各种情况分析
  10. node mysql和koa_node+koa2+mysql搭建博客后台
  11. CentOS永久修改主机名
  12. django登录验证码操作
  13. M语言中的操作符说明:数字以及时间相关的操作符
  14. Java代码注释加入图片和表格
  15. 有哪些英文论文查重软件值得推荐?
  16. 用这几款软件轻松自动识别图片文字,快码住
  17. XP系统最大能支持多少内存
  18. 在线支付功能的设计及其实现
  19. 网卡驱动:传输超时watchdog_timeo/ndo_tx_timeout
  20. mysql字符串分割为数组_mysql下将分隔字符串转换为数组

热门文章

  1. 使用IDA静态分析解密《舰娘Collection》的lua脚本
  2. 分享最新win7旗舰版/专业版企业版激活密钥和激活方法哦
  3. Mac安装PyQt4
  4. Laravel框架教程 入门篇(一)
  5. 湖北省襄阳市谷歌高清卫星地图下载
  6. DotNetBar 使用教程
  7. 数学建模学习:蒙特卡洛模拟
  8. 基于ThinkPHP5+MySQL的超市进销存管理系统
  9. DTFT和DFT有何区别?一文为你讲解清楚
  10. dos2unix命令详解