Spark Streaming实例分析

2015-02-02 21:00 4343人阅读 评论(0) 收藏 举报
 分类:
spark(11) 

转载地址:http://www.aboutyun.com/thread-8901-1-1.html

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程讲解 》。

Example代码分析

  1. val ssc = new StreamingContext(sparkConf, Seconds(1));
  2. // 获得一个DStream负责连接 监听端口:地址
  3. val lines = ssc.socketTextStream(serverIP, serverPort);
  4. // 对每一行数据执行Split操作
  5. val words = lines.flatMap(_.split(" "));
  6. // 统计word的数量
  7. val pairs = words.map(word => (word, 1));
  8. val wordCounts = pairs.reduceByKey(_ + _);
  9. // 输出结果
  10. wordCounts.print();
  11. ssc.start();             // 开始
  12. ssc.awaitTermination();  // 计算完毕退出

复制代码

1、首先实例化一个StreamingContext

2、调用StreamingContext的socketTextStream

3、对获得的DStream进行处理

4、调用StreamingContext是start方法,然后等待

我们看StreamingContext的socketTextStream方法吧。

  1. def socketTextStream(
  2. hostname: String,
  3. port: Int,
  4. storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  5. ): ReceiverInputDStream[String] = {
  6. socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  7. }

复制代码

1、StoageLevel是StorageLevel.MEMORY_AND_DISK_SER_2

2、使用SocketReceiver的bytesToLines把输入流转换成可遍历的数据

继续看socketStream方法,它直接new了一个

  1. new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

复制代码

继续深入挖掘SocketInputDStream,追述一下它的继承关系,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。

具体实现ReceiverInputDStream的类有好几个,基本上都是从网络端来数据的。

它实现了ReceiverInputDStream的getReceiver方法,实例化了一个SocketReceiver来接收数据。

SocketReceiver的onStart方法里面调用了receive方法,处理代码如下:

  1. socket = new Socket(host, port)
  2. val iterator = bytesToObjects(socket.getInputStream())
  3. while(!isStopped && iterator.hasNext) {
  4. store(iterator.next)
  5. }

复制代码

1、new了一个Socket来结束数据,用bytesToLines方法把InputStream转换成一行一行的字符串。

2、把每一行数据用store方法保存起来,store方法是从SocketReceiver的父类Receiver继承而来,内部实现是:

  1. def store(dataItem: T) {
  2. executor.pushSingle(dataItem)
  3. }

复制代码

executor是ReceiverSupervisor类型,Receiver的操作都是由它来处理。这里先不深纠,后面我们再说这个pushSingle的实现。

到这里我们知道lines的类型是SocketInputDStream,然后对它是一顿的转换,flatMap、map、reduceByKey、print,这些方法都不是RDD的那种方法,而是DStream独有的。

讲到上面这几个方法,我们开始转入DStream了,flatMap、map、reduceByKey、print方法都涉及到DStream的转换,这和RDD的转换是类似的。我们讲一下reduceByKey和print。

reduceByKey方法和RDD一样,调用的combineByKey方法实现的,不一样的是它直接new了一个ShuffledDStream了,我们接着看一下它的实现吧。

  1. override def compute(validTime: Time): Option[RDD[(K,C)]] = {
  2. parent.getOrCompute(validTime) match {
  3. case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
  4. case None => None
  5. }
  6. }

复制代码

在compute阶段,对通过Time获得的rdd进行reduceByKey操作。接下来的print方法也是一个转换:

  1. new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()

复制代码

打印前十个,超过10个打印"..."。需要注意register方法。

  1. ssc.graph.addOutputStream(this)

复制代码

它会把代码插入到当前的DStream添加到outputStreams里面,后面输出的时候如果没有outputStream就不会有输出,这个需要记住哦!

启动过程分析

前戏结束之后,ssc.start() 高潮开始了。 start方法很小,最核心的一句是JobScheduler的start方法。我们得转到JobScheduler方法上面去。
下面是start方法的代码:
  1. def start(): Unit = synchronized {
  2.   // 接受到JobSchedulerEvent就处理事件
  3. eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
  4. def receive = {
  5. case event: JobSchedulerEvent => processEvent(event)
  6. }
  7. }), "JobScheduler")
  8. listenerBus.start()
  9. receiverTracker = new ReceiverTracker(ssc)
  10. receiverTracker.start()
  11. jobGenerator.start()
  12. }

复制代码

1、启动了一个Actor来处理JobScheduler的JobStarted、JobCompleted、ErrorReported事件。

2、启动StreamingListenerBus作为监听器。

3、启动ReceiverTracker。

4、启动JobGenerator。

我们接下来看看ReceiverTracker的start方法。

  1. def start() = synchronized {if (!receiverInputStreams.isEmpty) {
  2. actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
  3. receiverExecutor.start()
  4. }
  5. }

复制代码

1、首先判断了一下receiverInputStreams不能为空,那receiverInputStreams是怎么时候写入值的呢?答案在SocketInputDStream的父类InputDStream当中,当实例化InputDStream的时候会在DStreamGraph里面添加InputStream。

  1. abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
  2. ssc.graph.addInputStream(this)
  3. //....
  4. }

复制代码

2、实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。

3、启动receiverExecutor(实际类是ReceiverLauncher,这名字起得。。),它主要负责启动Receiver,start方法里面调用了startReceivers方法吧。

  1. private def startReceivers() {
  2.      // 对应着上面的那个例子,getReceiver方法获得是SocketReceiver
  3. val receivers = receiverInputStreams.map(nis => {
  4. val rcvr = nis.getReceiver()
  5. rcvr.setReceiverId(nis.id)
  6. rcvr
  7. })
  8. // 查看是否所有的receivers都有优先选择机器,这个需要重写Receiver的preferredLocation方法,目前只有FlumeReceiver重写了
  9. val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
  10. // 创建一个并行receiver集合的RDD, 把它们分散到各个worker节点上
  11. val tempRDD =
  12. if (hasLocationPreferences) {
  13. val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
  14. ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
  15. } else {
  16. ssc.sc.makeRDD(receivers, receivers.size)
  17. }
  18. // 在worker节点上启动Receiver的方法,遍历所有Receiver,然后启动
  19. val startReceiver = (iterator: Iterator[Receiver[_]]) => {
  20. if (!iterator.hasNext) {
  21. throw new SparkException("Could not start receiver as object not found.")
  22. }
  23. val receiver = iterator.next()
  24. val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
  25. executor.start()
  26. executor.awaitTermination()
  27. }
  28. // 运行这个重复的作业来确保所有的slave都已经注册了,避免所有的receivers都到一个节点上
  29. if (!ssc.sparkContext.isLocal) {
  30. ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  31. }
  32. // 把receivers分发出去,启动
  33. ssc.sparkContext.runJob(tempRDD, startReceiver)
  34. }

复制代码

1、遍历receiverInputStreams获取所有的Receiver。
2、查看这些Receiver是否全都有优先选择机器。
3、把SparkContext的makeRDD方法把所有Receiver包装到ParallelCollectionRDD里面,并行度是Receiver的数量。
4、发个小任务给确保所有的slave节点都已经注册了(这个小任务有点儿莫名其妙,感觉怪怪的)。
5、提交作业,启动所有Receiver。
Spark写得实在是太巧妙了,居然可以把Receiver包装在RDD里面,当做是数据来处理!
启动Receiver的时候,new了一个ReceiverSupervisorImpl,然后调的start方法,主要干了这么三件事情,代码就不贴了。
1、启动BlockGenerator。
2、调用Receiver的OnStart方法,开始接受数据,并把数据写入到ReceiverSupervisor。
3、调用onReceiverStart方法,发送RegisterReceiver消息给driver报告自己启动了。

保存接收到的数据

ok,到了这里,重点落到了BlockGenerator。前面说到SocketReceiver把接受到的数据调用ReceiverSupervisor的pushSingle方法保存。
  1. // 这是ReceiverSupervisorImpl的方法
  2. def pushSingle(data: Any) {
  3. blockGenerator += (data)
  4. }
  5. // 这是BlockGenerator的方法
  6. def += (data: Any): Unit = synchronized {
  7. currentBuffer += data
  8. }

复制代码

我们看一下它的start方法吧。

  1. def start() {
  2. blockIntervalTimer.start()
  3. blockPushingThread.start()
  4. }

复制代码

它启动了一个定时器RecurringTimer和一个线程执行keepPushingBlocks方法。
先看RecurringTimer的实现:
  1. while (!stopped) {
  2. clock.waitTillTime(nextTime)
  3. callback(nextTime)
  4. prevTime = nextTime
  5. nextTime += period
  6. }

复制代码

每隔一段时间就执行callback函数,callback函数是new的时候传进来的,是BlockGenerator的updateCurrentBuffer方法。

  1. private def updateCurrentBuffer(time: Long): Unit = synchronized {
  2. try {
  3. val newBlockBuffer = currentBuffer
  4. currentBuffer = new ArrayBuffer[Any]
  5. if (newBlockBuffer.size > 0) {
  6. val blockId = StreamBlockId(receiverId, time - blockInterval)
  7. val newBlock = new Block(blockId, newBlockBuffer)
  8. blocksForPushing.put(newBlock)
  9. }
  10. } catch {case t: Throwable =>
  11. reportError("Error in block updating thread", t)
  12. }
  13. }

复制代码

它new了一个Block出来,然后添加到blocksForPushing这个ArrayBlockingQueue队列当中。
提到这里,有两个参数需要大家注意的:
  1. spark.streaming.blockInterval   默认值是200
  2. spark.streaming.blockQueueSize  默认值是10

复制代码

这是前面提到的间隔时间和队列的长度,间隔时间默认是200毫秒,队列是最多能容纳10个Block,多了就要阻塞了。

我们接下来看一下BlockGenerator另外启动的那个线程执行的keepPushingBlocks方法到底在干什么?

  1. private def keepPushingBlocks() {
  2.     while(!stopped) {
  3. Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
  4. case Some(block) => pushBlock(block)
  5. case None =>
  6. }
  7. }
  8.    // ...退出之前把剩下的也输出去了
  9. }

复制代码

它在把blocksForPushing中的block不停的拿出来,调用pushBlock方法,这个方法属于在实例化BlockGenerator的时候,从ReceiverSupervisorImpl传进来的BlockGeneratorListener的。

  1. private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
  2. def onError(message: String, throwable: Throwable) {
  3. reportError(message, throwable)
  4. }
  5. def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
  6. pushArrayBuffer(arrayBuffer, None, Some(blockId))
  7. }
  8. }, streamId, env.conf)

复制代码

1、reportError,通过actor向driver发送错误报告消息ReportError。
2、调用pushArrayBuffer保存数据。
下面是pushArrayBuffer方法:
  1. def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
  2. ) {
  3. val blockId = optionalBlockId.getOrElse(nextBlockId)
  4. val time = System.currentTimeMillis
  5. blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
  6. reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
  7. }

复制代码

1、把Block保存到BlockManager当中,序列化方式为之前提到的StorageLevel.MEMORY_AND_DISK_SER_2(内存不够就写入到硬盘,并且在2个节点上保存的方式)。
2、调用reportPushedBlock给driver发送AddBlock消息,报告新添加的Block,ReceiverTracker收到消息之后更新内部的receivedBlockInfo映射关系。

处理接收到的数据

前面只讲了数据的接收和保存,那数据是怎么处理的呢?
之前一直讲ReceiverTracker,而忽略了之前的JobScheduler的start方法里面最后启动的JobGenerator。
  1. def start(): Unit = synchronized {
  2. eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
  3. def receive = {
  4. case event: JobGeneratorEvent =>  processEvent(event)
  5. }
  6. }), "JobGenerator")
  7. if (ssc.isCheckpointPresent) {
  8. restart()
  9. } else {
  10. startFirstTime()
  11. }
  12. }

复制代码

1、启动一个actor处理JobGeneratorEvent事件。

2、如果是已经有CheckPoint了,就接着上次的记录进行处理,否则就是第一次启动。

我们先看startFirstTime吧,CheckPoint以后再说吧,有点儿小复杂。

  1. private def startFirstTime() {
  2. val startTime = new Time(timer.getStartTime())
  3. graph.start(startTime - graph.batchDuration)
  4. timer.start(startTime.milliseconds)
  5. }

复制代码

1、timer.getStartTime计算出来下一个周期的到期时间,计算公式:(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以当前的时间/除以间隔时间,再用math.floor求出它的上一个整数(即上一个周期的到期时间点),加上1,再乘以周期就等于下一个周期的到期时间。
2、启动DStreamGraph,启动时间=startTime - graph.batchDuration。
3、启动Timer,我们看看它的定义:
  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  2. longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

复制代码

到这里就清楚了,DStreamGraph的间隔时间就是timer的间隔时间,启动时间要设置成比Timer早一个时间间隔,原因再慢慢探究。
可以看出来每隔一段时间,Timer给eventActor发送GenerateJobs消息,我们直接去看它的处理方法generateJobs吧,中间忽略了一步,大家自己看。
  1. private def processEvent(event: JobGeneratorEvent) {
  2. event match {
  3. case GenerateJobs(time) => generateJobs(time)
  4. case ClearMetadata(time) => clearMetadata(time)
  5. case DoCheckpoint(time) => doCheckpoint(time)
  6. case ClearCheckpointData(time) => clearCheckpointData(time)
  7. }
  8. }

复制代码

下面是generateJobs方法。

  1. private def generateJobs(time: Time) {
  2. SparkEnv.set(ssc.env)
  3. Try(graph.generateJobs(time)) match {
  4. case Success(jobs) =>
  5. val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
  6. val streamId = stream.id
  7. val receivedBlockInfo = stream.getReceivedBlockInfo(time)
  8. (streamId, receivedBlockInfo)
  9. }.toMap
  10. jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
  11. case Failure(e) =>
  12. jobScheduler.reportError("Error generating jobs for time " + time, e)
  13. }
  14. eventActor ! DoCheckpoint(time)
  15. }

复制代码

1、DStreamGraph生成jobs。
2、从stream那里获取接收到的Block信息。
3、调用submitJobSet方法提交作业。
4、提交完作业之后,做一个CheckPoint。
先看DStreamGraph是怎么生成的jobs。
  1. def generateJobs(time: Time): Seq[Job] = {
  2. val jobs = this.synchronized {
  3. outputStreams.flatMap(outputStream => outputStream.generateJob(time))
  4. }
  5. jobs
  6. }

复制代码

outputStreams在这个例子里面是print这个方法里面添加的,这个在前面说了,我们继续看DStream的generateJob。

  1. private[streaming] def generateJob(time: Time): Option[Job] = {
  2. getOrCompute(time) match {
  3. case Some(rdd) => {
  4. val jobFunc = () => {
  5. val emptyFunc = { (iterator: Iterator[T]) => {} }
  6. context.sparkContext.runJob(rdd, emptyFunc)
  7. }
  8. Some(new Job(time, jobFunc))
  9. }
  10. case None => None
  11. }
  12. }

复制代码

1、调用getOrCompute方法获得RDD
2、new了一个方法去提交这个作业,缺什么都不做
为什么呢?这是直接跳转的错误,呵呵,因为这个outputStream是print方法返回的,它应该是ForEachDStream,所以我们应该看的是它里面的generateJob方法。
  1. override def generateJob(time: Time): Option[Job] = {
  2. parent.getOrCompute(time) match {
  3. case Some(rdd) =>
  4. val jobFunc = () => {
  5. foreachFunc(rdd, time)
  6. }
  7. Some(new Job(time, jobFunc))
  8. case None => None
  9. }
  10. }

复制代码

这里请大家千万要注意,不要在这块被卡住了。
我们看看它这个RDD是怎么出来的吧。
  1. private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  2. // If this DStream was not initialized (i.e., zeroTime not set), then do it
  3. // If RDD was already generated, then retrieve it from HashMap
  4. generatedRDDs.get(time) match {
  5. // 这个RDD已经被生成过了,直接用就是了
  6. case Some(oldRDD) => Some(oldRDD)
  7. // 还没生成过,就调用compte函数生成一个
  8. case None => {
  9. if (isTimeValid(time)) {
  10. compute(time) match {
  11. case Some(newRDD) =>
  12.          // 设置保存的级别
  13. if (storageLevel != StorageLevel.NONE) {
  14. newRDD.persist(storageLevel)
  15. }
  16.          // 如果现在需要,就做CheckPoint
  17. if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
  18. newRDD.checkpoint()
  19. }
  20.          // 添加到generatedRDDs里面去,可以再次利用
  21. generatedRDDs.put(time, newRDD)
  22. Some(newRDD)
  23. case None =>
  24. None
  25. }
  26. } else {
  27. None
  28. }
  29. }
  30. }
  31. }

复制代码

从上面的方法可以看出来它是通过每个DStream自己实现的compute函数得出来的RDD。我们找到SocketInputDStream,没有compute函数,在父类ReceiverInputDStream里面找到了。

  1. override def compute(validTime: Time): Option[RDD[T]] = {
  2. // 如果出现了时间比startTime早的话,就返回一个空的RDD,因为这个很可能是master挂了之后的错误恢复
  3. if (validTime >= graph.startTime) {
  4. val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
  5. receivedBlockInfo(validTime) = blockInfo
  6. val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
  7. Some(new BlockRDD[T](ssc.sc, blockIds))
  8. } else {
  9. Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
  10. }
  11. }

复制代码

通过DStream的id把receiverTracker当中把接收到的block信息全部拿出来,记录到ReceiverInputDStream自身的receivedBlockInfo这个HashMap里面,就把RDD返回了,RDD里面实际包含的是Block的id的集合。
现在我们就可以回到之前JobGenerator的generateJobs方法,我们就清楚它这句是提交的什么了。
  1. jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))

复制代码

JobSet是记录Job的完成情况的,直接看submitJobSet方法吧。
  1. def submitJobSet(jobSet: JobSet) {
  2. if (jobSet.jobs.isEmpty) {
  3. } else {
  4. jobSets.put(jobSet.time, jobSet)
  5. jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  6. }
  7. }

复制代码

遍历jobSet里面的所有jobs,通过jobExecutor这个线程池提交。我们看一下JobHandler就知道了。

  1. private class JobHandler(job: Job) extends Runnable {
  2. def run() {
  3. eventActor ! JobStarted(job)
  4. job.run()
  5. eventActor ! JobCompleted(job)
  6. }
  7. }

复制代码

1、通知eventActor处理JobStarted事件。
2、运行job。
3、通知eventActor处理JobCompleted事件。
这里的重点是job.run,事件处理只是更新相关的job信息。
  1. def run() {
  2. result = Try(func())
  3. }

复制代码

在遍历BlockRDD的时候,在compute函数获取该Block(详细请看BlockRDD),然后对这个RDD的结果进行打印。
到这里就算结束了,最后来个总结吧,图例在下一章补上,这一章只是过程分析:
1、可以有多个输入,我们可以通过StreamingContext定义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于print方法,还可以有别的方法,saveAsTextFiles和saveAsObjectFiles。这块的设计是支持共享StreamingContext的。
2、StreamingContext启动了JobScheduler,JobScheduler启动ReceiverTracker和JobGenerator。
3、ReceiverTracker是通过把Receiver包装成RDD的方式,发送到Executor端运行起来的,Receiver起来之后向ReceiverTracker发送RegisterReceiver消息。
3、Receiver把接收到的数据,通过ReceiverSupervisor保存。
4、ReceiverSupervisorImpl把数据写入到BlockGenerator的一个ArrayBuffer当中。
5、BlockGenerator内部每个一段时间(默认是200毫秒)就把这个ArrayBuffer构造成Block添加到blocksForPushing当中。
6、BlockGenerator的另外一条线程则不断的把加入到blocksForPushing当中的Block写入到BlockManager当中,并向ReceiverTracker发送AddBlock消息。
7、JobGenerator内部有个定时器,定期生成Job,通过DStream的id,把ReceiverTracker接收到的Block信息从BlockManager上抓取下来进行处理,这个间隔时间是我们在实例化StreamingContext的时候传进去的那个时间,在这个例子里面是Seconds(1)。

0

转载于:https://www.cnblogs.com/think90/p/6024490.html

Spark Streaming实例相关推荐

  1. Spark Streaming实例分析

    这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照< Spark Streaming编程讲解 >. Example代码分析 val ssc = new St ...

  2. Spark Streaming处理Socket流简单实例

    在本文中我将在IDEA工具中开发一个SparkStream程序用于监听本机9999端口所接收的数据 首先,我们将Spark Streaming类的名称以及从StreamingContext进行的一些隐 ...

  3. spark streaming python实例_kafka+spark streaming代码实例(pyspark+python)

    一.系统准备 1.启动zookeeper:bin/zkServer.cmd start 2.启动kafka:bin/kafka-server-start.sh -daemon config/serve ...

  4. 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例

    文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...

  5. 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)

    导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...

  6. Spark Streaming与Kafka Streaming对比

    ♚ 叙述 对流处理的需求每天都在增加.原因是,处理大量数据通常是不够的. 必须快速处理数据,以便公司能够对不断变化的业务条件作出实时反应. 流处理是对数据进行连续.并行的实时处理. 流式处理是处理数据 ...

  7. Spark Streaming学习笔记

    特点: Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性. Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Tw ...

  8. 通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验

    本期内容 : spark streaming另类在线实验 瞬间理解spark streaming本质 一.  我们最开始将从Spark Streaming入手 为何从Spark Streaming切入 ...

  9. Spark详解(十三):Spark Streaming 运行架构分析

    1. 运行架构 SparkStreaming的主要功能包括流处理引擎的流数据接收与存储以及批处理作业的生成与管理,而Spark核心负责处理Spark Streaming发送过来的作用.Spark St ...

  10. Spark详解(十二):Spark Streaming原理和实现

    1 简介 SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka.Flume.Twitter.Zero和TCP 套接字 ...

最新文章

  1. python使用pandas通过聚合获取时序数据的最后一个指标数据(例如长度指标、时间指标)生成标签并与原表连接(join)进行不同标签特征的可视化分析
  2. Python生成随机五位数——模仿手机验证码
  3. 利用MATLAB帮助求解作业中的Laplace变换和Z变换
  4. RoIPooling
  5. 系列文章--jQuery教程
  6. python split(), os.path.split()和os.path.splitext()函数的区别
  7. python bottle web框架上传静态文件与加载静态文件
  8. hyperopt中文文档:Installation-Notes安装说明
  9. 浅析HTML、CSS、JavaScript之间的联系与区别
  10. 阿里巴巴证实全资收购协作软件平台 Teambition
  11. 在B/S系统中引入定时器的功能
  12. 【TWVRP】基于matlab遗传算法求解带时间窗的车辆路径问题【含Matlab源码 002期】
  13. 支持向量回归(多核函数)
  14. Windows批处理添加注释
  15. SIF协议(一线通)
  16. 编码通信与魔术初步(六)——经典魔术《傅氏幻术》赏析和《我的心灵感应》...
  17. 10.13 写一个用矩形法求定积分的通用函数,分别求∫_0^1▒sinxdx 、∫_0^1▒cosxdx、∫_0^1▒〖e^x dx〗的值。
  18. mysql存储过程算四分位
  19. 阿里百川淘宝联盟私域会员对接
  20. 什么级别的企业可以进行数字化转型?

热门文章

  1. C++ set 排序 修改元素之后不会改变原来的排序
  2. 【Django 2021年最新版教程25】模板语言 前端for循环怎么用 实例
  3. 【Django 2021年最新版教程13】Cookie是什么 如何使用
  4. centos 6.5 编译php mysql5.6_CentOS6.5 编译安装PHP5.6(apache模块)
  5. php点击表格单元格链接,详解PhpSpreadsheet单元格设置样式、图片、超链接等
  6. Unity MRTK 制作按钮调整大小
  7. java ctr v,解决 ctrol c ctrol v 复制 粘贴 不好用的问题 只复制一次的问题
  8. 二十三、Oracle学习笔记:综合案例
  9. 新xp系统如何链接网络连接服务器地址,xp系统如何设置宽带连接
  10. arduino 读取串口信息hex_进阶教程1:Arduino串口通信与电脑控制LED