Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分。Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今已经成为了在企业中广泛使用的流处理平台。在2016年7月,Spark2.0版本中引入了Structured Streaming,并在Spark2.2版本中达到了生产级别,Structured Streaming是构建在Spark SQL之上的流处理引擎,用户可以使用DataSet/DataFreame API进行流处理,目前Structured Streaming在不同的版本中发展速度很快。值得注意的是,本文不会对Structured Streaming做过多讲解,主要针对Spark Streaming进行讨论,包括以下内容:

  • Spark Streaming介绍
  • Transformations与Output Operations
  • Spark Streaming数据源(Sources)
  • Spark Streaming 数据汇(Sinks)

Spark Streaming介绍

什么是DStream

Spark Streaming是构建在Spark Core的RDD基础之上的,与此同时Spark Streaming引入了一个新的概念:DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。DStream抽象是Spark Streaming的流处理模型,在内部实现上,Spark Streaming会对输入数据按照时间间隔(如1秒)分段,每一段数据转换为Spark中的RDD,这些分段就是Dstream,并且对DStream的操作都最终转变为对相应的RDD的操作。如下图所示:

如上图,这些底层的RDD转换操作是由Spark引擎来完成的,DStream的操作屏蔽了许多底层的细节,为用户提供了比较方便使用的高级API。

计算模型

在Flink中,批处理是流处理的特例,所以Flink是天然的流处理引擎。而Spark Streaming则不然,Spark Streaming认为流处理是批处理的特例,即Spark Streaming并不是纯实时的流处理引擎,在其内部使用的是microBatch模型,即将流处理看做是在较小时间间隔内(batch interval)的一些列的批处理。关于时间间隔的设定,需要结合具体的业务延迟需求,可以实现秒级或者分钟级的间隔。

Spark Streaming会将每个短时间间隔内接收的数据存储在集群中,然后对其作用一系列的算子操作(map,reduce, groupBy等)。执行过程见下图:

如上图:Spark Streaming会将输入的数据流分割成一个个小的batch,每一个batch都代表这一些列的RDD,然后将这些batch存储在内存中。通过启动Spark作业来处理这些batch数据,从而实现一个流处理应用。

Spark Streaming的工作机制

概览

  • 在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上
  • 每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)
  • Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据

执行细节

  • 1.启动StreamingContext
  • 2.StreamingContext启动receiver,该receiver会一直运行在Executor的task中。用于连续不断地接收数据源,有两种主要的reciver,一种是可靠的reciver,当数据被接收并且存储到spark,发送回执确认,另一种是不可靠的reciver,对于数据源不发送回执确认。接收的数据会被缓存到work节点内存中,也会被复制到其他executor的所在的节点内存中,用于容错处理。
  • 3.Streaming context周期触发job(根据batch-interval时间间隔)进行数据处理。
  • 4.将数据输出。

Spark Streaming编程步骤

经过上面的分析,对Spark Streaming有了初步的认识。那么该如何编写一个Spark Streaming应用程序呢?一个Spark Streaming一般包括一下几个步骤:

  • 1.创建StreamingContext
  • 2.创建输入DStream来定义输入源
  • 3.通过对DStream应用转换操作和输出操作来定义处理逻辑
  • 4.用streamingContext.start()来开始接收数据和处理流程
  • 5.streamingContext.awaitTermination()方法来等待处理结束
  object StartSparkStreaming {    def main(args: Array[String]): Unit = {      val conf = new SparkConf()        .setMaster("local[2]")        .setAppName("Streaming")      // 1.创建StreamingContext      val ssc = new StreamingContext(conf, Seconds(5))      Logger.getLogger("org.apache.spark").setLevel(Level.OFF)      Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)      // 2.创建DStream      val lines = ssc.socketTextStream("localhost", 9999)      // 3.定义流计算处理逻辑      val count = lines.flatMap(_.split(" "))        .map(word => (word, 1))        .reduceByKey(_ + _)      // 4.输出结果      count.print()      // 5.启动      ssc.start()      // 6.等待执行      ssc.awaitTermination()    }  }

Transformations与Output Operations

DStream是不可变的, 这意味着不能直接改变它们的内容,而是通过对DStream进行一系列转换(Transformation)来实现预期的应用程序逻辑。每次转换都会创建一个新的DStream,该DStream表示来自父DStream的转换后的数据。DStream转换是惰性(lazy)的,这意味只有执行output操作之后,才会去执行转换操作,这些触发执行的操作称之为output operation

Transformations

Spark Streaming提供了丰富的transformation操作,这些transformation又分为了有状态的transformation无状态的transformation。除此之外,Spark Streaming也提供了一些window操作,值得注意的是window操作也是有状态的。具体细节如下:

无状态的transformation

无状态的transformation是指每一个micro-batch的处理是相互独立的,即当前的计算结果不受之前计算结果的影响,Spark Streaming的大部分算子都是无状态的,比如常见的map(),flatMap(),reduceByKey()等等。

  • map(func)

对源DStream的每个元素,采用func函数进行转换,得到一个新的Dstream

    /** Return a new DStream by applying a function to all elements of this DStream. */    def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {      new MappedDStream(this, context.sparkContext.clean(mapFunc))    }
  • flatMap(func)

与map相似,但是每个输入项可用被映射为0个或者多个输出项

  /**   * Return a new DStream by applying a function to all elements of this DStream,   * and then flattening the results   */  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))  }
  • filter(func)

返回一个新的DStream,仅包含源DStream中满足函数func的项

  /** Return a new DStream containing only the elements that satisfy a predicate. */  def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {    new FilteredDStream(this, context.sparkContext.clean(filterFunc))  }
  • repartition(numPartitions)

通过创建更多或者更少的分区改变DStream的并行程度

/**   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the   * returned DStream has exactly numPartitions partitions.   */  def repartition(numPartitions: Int): DStream[T] = ssc.withScope {    this.transform(_.repartition(numPartitions))  }

  • reduce(func)

利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream

  /**   * Return a new DStream in which each RDD has a single element generated by reducing each RDD   * of this DStream.   */  def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {    this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)  }

  • count()

统计源DStream中每个RDD的元素数量

/**   * Return a new DStream in which each RDD has a single element generated by counting each RDD   * of this DStream.   */  def count(): DStream[Long] = ssc.withScope {    this.map(_ => (null, 1L))        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))        .reduceByKey(_ + _)        .map(_._2)  }
  • union(otherStream)

返回一个新的DStream,包含源DStream和其他DStream的元素

/**   * Return a new DStream by unifying data of another DStream with this DStream.   * @param that Another DStream having the same slideDuration as this DStream.   */  def union(that: DStream[T]): DStream[T] = ssc.withScope {    new UnionDStream[T](Array(this, that))  }
  • countByValue()

应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数,比如lines.flatMap(_.split(" ")).countByValue().print(),对于输入:spark spark flink,将输出:(spark,2),(flink,1),即按照元素值进行分组,然后统计每个分组的元素个数。

从源码可以看出:底层实现为map((_,1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions),即先按当前的元素映射为一个tuple,其中key即为当前元素的值,然后再按照key做汇总。

/**   * Return a new DStream in which each RDD contains the counts of each distinct value in   * each RDD of this DStream. Hash partitioning is used to generate   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if   * `numPartitions` not specified).   */  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)      : DStream[(T, Long)] = ssc.withScope {    this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)  }

  • reduceByKey(func, [numTasks])

当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来

比如:lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).print()

对于输入:spark spark flink,将输出:(spark,2),(flink,1)

  /**   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are   * merged using the associative and commutative reduce function. Hash partitioning is used to   * generate the RDDs with Spark's default number of partitions.   */  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {    reduceByKey(reduceFunc, defaultPartitioner())  }

  • join(otherStream, [numTasks])

当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新Dstream

  /**   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.   */  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {    join[W](other, defaultPartitioner())  }
  • cogroup(otherStream, [numTasks])

当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组

// 输入:spark// 输出:(spark,(CompactBuffer(1),CompactBuffer(1)))val DS1 = lines.flatMap(_.split(" ")).map((_,1))val DS2 = lines.flatMap(_.split(" ")).map((_,1))DS1.cogroup(DS2).print()
  /**   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.   * Hash partitioning is used to generate the RDDs with Spark's default number   * of partitions.   */  def cogroup[W: ClassTag](      other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {    cogroup(other, defaultPartitioner())  }
  • transform(func)

通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作

// 输入:spark spark flink// 输出:(spark,2)、(flink,1)val lines = ssc.socketTextStream("localhost", 9999)val resultDStream = lines.transform(rdd => {rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)})resultDStream.print()
  /**   * Return a new DStream in which each RDD is generated by applying a function   * on each RDD of 'this' DStream.   */  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {    val cleanedF = context.sparkContext.clean(transformFunc, false)    transform((r: RDD[T], _: Time) => cleanedF(r))  }

有状态的transformation

有状态的transformation是指每个micro-batch的处理不是相互独立的,即当前的micro-batch处理依赖于之前的micro-batch计算结果。常见的有状态的transformation主要有countByValueAndWindow, reduceByKeyAndWindow , mapWithState, updateStateByKey等等。其实所有的基于window的操作都是有状态的,因为追踪整个窗口内的数据。

关于有状态的transformation和Window Operations,参见下文。

Output Operations

使用Output operations可以将DStream写入多外部存储设备或打印到控制台。上文提到,Spark Streaming的transformation是lazy的,因此需要Output Operation进行触发计算,其功能类似于RDD的action操作。具体详见下文Spark Streaming 数据汇(Sinks)。

Spark Streaming数据源

Spark Streaming的目的是成为一个通用的流处理框架,为了实现这一目标,Spark Streaming使用Receiver来集成各种各样的数据源。但是,对于有些数据源(如kafka),Spark Streaming支持使用Direct的方式去接收数据,这种方式比Receiver方式性能要好。

基于Receiver的方式

Receiver的作用是从数据源收集数据,然后将数据传送给Spark Streaming。基本原理是:随着数据的不断到来,在相对应的batch interval时间间隔内,这些数据会被收集并且打包成block,只要等到batch interval时间完成了,收集的数据block会被发送给spark进行处理。

如上图:当Spark Streaming启动时,receiver开始收集数据。在t0的batch interval结束时(即收集完了该时间段内的数据),收集到的block #0会被发送到Spark进行处理。在t2时刻,Spark会处理t1的batch interval的数据block,与此同时会不停地收集t2的batch interval对应的block**#2**。

常见的基于Receiver的数据源包括:Kafka, Kinesis, Flume,Twitter。除此之外,用户也可以通过继承 Receiver抽象类,实现onStart()onStop()两个方法,进行自定义Receiver。本文不会对基于Receiver的数据源做过多讨论,主要针对基于Direct的Kafka数据源进行详细解释。

基于Direct的方式

Spark 1.3中引入了这种新的无Receiver的Direct方法,以确保更强的端到端保证。该方法不是使用Receiver来接收数据,而是定期查询Kafka每个topic+partition中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。启动用于处理数据的作业时,Kafka的简单consumer API用于读取Kafka定义的偏移量范围(类似于从文件系统读取文件)。请注意,此功能是在Scala和Java API的Spark 1.3引入的,在Python API的Spark 1.4中引入的。

基于Direct的方式具有以下优点:

  • 简化并行读取

如果要读取多个partition,不需要创建多个输入DStream然后对他们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从kafka中读取数据。所以在kafka partition和RDD partition之间,有一一对应的关系。

  • 高性能

如果要保证数据零丢失,在基于Receiver的方式中,需要开启WAL机制。这种方式其实效率很低,因为数据实际被复制了两份,kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于Direct的方式,不依赖于Receiver,不需要开启WAL机制,只要kafka中做了数据的复制,那么就可以通过kafka的副本进行恢复。

  • Exactly-once语义

基于Receiver的方式,使用kafka的高阶API来在Zookeeper中保存消费过的offset。这是消费kafka数据的传统方式。这种方式配合WAL机制,可以保证数据零丢失的高可靠性,但是却无法保证Exactly-once语义(Spark和Zookeeper之间可能是不同步的)。基于Direct的方式,使用kafka的简单API,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据时消费一次且仅消费一次。

Spark Streaming集成kafka

使用方式

使用KafkaUtils添加Kafka数据源,源码如下:

  def createDirectStream[K, V](      ssc: StreamingContext,      locationStrategy: LocationStrategy,      consumerStrategy: ConsumerStrategy[K, V]    ): InputDStream[ConsumerRecord[K, V]] = {    val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)    createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)  }

具体参数解释:

  • K:Kafka消息key的类型

  • V:Kafka消息value的类型

  • ssc:StreamingContext

  • locationStrategy: LocationStrategy,根据Executor中的主题的分区来调度consumer,即尽可能地让consumer靠近leader partition。该配置可以提升性能,但对于location的选择只是一种参考,并不是绝对的。可以选择如下方式:

    注意:多数情况下使用PreferConsisten,其他两种方式只是在特定的场景使用。这种配置只是一种参考,具体的情况还是会根据集群的资源自动调整。

    • PreferBrokers:Spark和Kafka运行在同一个节点上,可以使用此种方式
    • PreferConsistent:大部分情况使用此方式,它将一致地在所有Executor之间分配分区
    • PreferFixed:将特定的主题分区放置到特定的主机上,在数据负载不均衡时使用
  • consumerStrategy:消费策略,主要有下面三种方式:

    注意:大多数情况下使用Subscribe方式。

    • Subscribe:订阅指定主题名称的主题集合
    • SubscribePattern:通过正则匹配,订阅相匹配的主题数据
    • Assign:订阅一个主题+分区的集合

使用案例

object TolerateWCTest {

  def createContext(checkpointDirectory: String): StreamingContext = {

    val sparkConf = new SparkConf()      .set("spark.streaming.backpressure.enabled", "true")      //每秒钟从kafka分区中读取的records数量,默认not set      .set("spark.streaming.kafka.maxRatePerPartition", "1000") //      //Driver为了获取每个leader分区的最近offsets,连续进行重试的次数,      //默认是1,表示最多重试2次,仅仅适用于 new Kafka direct stream API      .set("spark.streaming.kafka.maxRetries", "2")      .setAppName("TolerateWCTest")

    val ssc = new StreamingContext(sparkConf, Seconds(3))    ssc.checkpoint(checkpointDirectory)    val topic = Array("testkafkasource2")    val kafkaParam = Map[String, Object](      "bootstrap.servers" -> "kms-1:9092",      "key.deserializer" -> classOf[StringDeserializer],      "value.deserializer" -> classOf[StringDeserializer],      "group.id" -> "group0",      "auto.offset.reset" -> "latest", //默认latest,      "enable.auto.commit" -> (false: java.lang.Boolean)) //默认true,false:手动提交

    val lines = KafkaUtils.createDirectStream(      ssc,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String, String](topic, kafkaParam))

    val words = lines.flatMap(_.value().split(" "))    val wordDstream = words.map(x => (x, 1))    val stateDstream = wordDstream.reduceByKey(_ + _)

    stateDstream.cache()    //参照batch interval设置,    //不得低于batch interval,否则会报错,    //设为batch interval的2倍    stateDstream.checkpoint(Seconds(6))

    //把DStream保存到MySQL数据库中    stateDstream.foreachRDD(rdd =>      rdd.foreachPartition { record =>        var conn: Connection = null        var stmt: PreparedStatement = null        // 给每个partition,获取一个连接        conn = ConnectionPool.getConnection        // 遍历partition中的数据,使用一个连接,插入数据库

        while (record.hasNext) {          val wordcounts = record.next()          val sql = "insert into wctbl(word,count) values (?,?)"          stmt = conn.prepareStatement(sql);          stmt.setString(1, wordcounts._1.trim)          stmt.setInt(2, wordcounts._2.toInt)          stmt.executeUpdate()        }        // 用完以后,将连接还回去        ConnectionPool.returnConnection(conn)      })    ssc  }

  def main(args: Array[String]) {

    val checkpointDirectory = "hdfs://kms-1:8020/docheckpoint"

    val ssc = StreamingContext.getOrCreate(      checkpointDirectory,      () => createContext(checkpointDirectory))    ssc.start()    ssc.awaitTermination()  }}

Spark Streaming 数据汇(Sinks)

Output Operation介绍

Spark Streaming提供了下面内置的Output Operation,如下:

  • print()

打印数据数据到标准输出,如果不传递参数,默认打印前10个元素

  • saveAsTextFiles(prefix, [suffix])

将DStream内容存储到文件系统,每个batch interval的文件名称为`prefix-TIME_IN_MS[.suffix]

  • saveAsObjectFiles(prefix, [suffix])

将DStream的内容保存为序列化的java对象的SequenceFile,每个batch interval的文件名称为prefix-TIME_IN_MS[.suffix],Python API不支持此方法。

  • saveAsHadoopFiles(prefix, [suffix])

将DStream内容保存为Hadoop文件,每个batch interval的文件名称为prefix-TIME_IN_MS[.suffix],Python API不支持此方法。

  • foreachRDD(func)

通用的数据输出算子,func函数将每个RDD的数据输出到外部存储设备,比如将RDD写入到文件或者数据库。

 /**   * Apply a function to each RDD in this DStream. This is an output operator, so   * 'this' DStream will be registered as an output stream and therefore materialized.   */  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {    val cleanedF = context.sparkContext.clean(foreachFunc, false)    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)  }

  /**   * Apply a function to each RDD in this DStream. This is an output operator, so   * 'this' DStream will be registered as an output stream and therefore materialized.   */  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {    // because the DStream is reachable from the outer object here, and because    // DStreams can't be serialized with closures, we can't proactively check    // it for serializability and so we pass the optional false to SparkContext.clean    foreachRDD(foreachFunc, displayInnerRDDOps = true)  }

  private def foreachRDD(      foreachFunc: (RDD[T], Time) => Unit,      displayInnerRDDOps: Boolean): Unit = {    new ForEachDStream(this,      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()  }

foreachRDD是一个非常重要的操作,用户可以使用它将处理的数据输出到外部存储设备。关于foreachRDD的使用,需要特点别注意一些细节问题。具体分析如下:

如果将数据写入到MySQL,需要获取连接Connection。用户可能不经意的在Spark Driver中创建一个连接对象,然后在Work中使用它将数据写入外部设备,代码如下:

dstream.foreachRDD { rdd =>  val connection = createNewConnection()  // ①注意:该段代码在driver上执行  rdd.foreach { record =>    connection.send(record) // ②注意:该段代码在worker上执行  }}

尖叫提示:上面的使用方式是错误的,因为需要将connection对象进行序列化,然后发送到driver节点,而这种connection对象是不能被序列化,所以不能跨节点传输。上面代码会报序列化错误,正确的使用方式是在worker节点创建connection,即在rdd.foreach内部创建connection。方式如下:

dstream.foreachRDD { rdd =>  rdd.foreach { record =>    val connection = createNewConnection()    connection.send(record)    connection.close()  }}

上面的方式解决了不能序列化的问题,但是会为每个RDD的record创建一个connection,通常创建一个connection对象是会存在一定性能开销的,所以频繁创建和销毁connection对象会造成整体的吞吐量降低。一个比较好的做法是将rdd.foreach替换为``rdd.foreachPartition,这样就不用频繁为每个record创建connection,而是为RDD的partition创建connection,大大减少了创建connection带来的开销。

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

其实上面的使用方式还可以进一步优化,可以通过在多个RDD或者批数据间重用连接对象。用户可以维护一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开销:

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)    }}

使用案例

  • 模拟数据库连接池
/** * 简易版的连接池 */public class ConnectionPool {

    // 静态的Connection队列    private static LinkedList connectionQueue;/**     * 加载驱动     */static {try {            Class.forName("com.mysql.jdbc.Driver");        } catch (ClassNotFoundException e) {            e.printStackTrace();        }    }/**     * 获取连接,多线程访问并发控制     *     * @return     */public synchronized static Connection getConnection() {try {if (connectionQueue == null) {                connectionQueue = new LinkedList();for (int i = 0; i 10; i++) {                    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/wordcount", "root","123qwe");                    connectionQueue.push(conn);                }            }        } catch (Exception e) {            e.printStackTrace();        }return connectionQueue.poll();    }/**     * 用完之后,返回一个连接     */public static void returnConnection(Connection conn) {        connectionQueue.push(conn);    }}
  • 实时统计写入MySQL
object WordCount {  def main(args: Array[String]): Unit = {    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")    val ssc = new StreamingContext(sparkConf, Seconds(5))    val lines = ssc.socketTextStream("localhost", 9999)    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)    wordCounts.print()    // 存储到MySQL    wordCounts.foreachRDD { rdd =>      rdd.foreachPartition { partition =>        var conn: Connection = null        var stmt: PreparedStatement = null        // 给每个partition,获取一个连接        conn = ConnectionPool.getConnection        // 遍历partition中的数据,使用一个连接,插入数据库        while (partition.hasNext) {          val wordcounts = partition.next()          val sql = "insert into wctbl(word,count) values (?,?)"          stmt = conn.prepareStatement(sql);          stmt.setString(1, wordcounts._1.trim)          stmt.setInt(2, wordcounts._2.toInt)          stmt.executeUpdate()

        }        // 用完以后,将连接还回去        ConnectionPool.returnConnection(conn)      }    }    ssc.start()    ssc.awaitTermination()  }}

总结

由于篇幅限制,本文主要对Spark Streaming执行机制、Transformations与Output Operations、Spark Streaming数据源(Sources)、Spark Streaming 数据汇(Sinks)进行了讨论。下一篇将分享基于时间的窗口操作有状态的计算检查点Checkpoint性能调优等内容。

往期精彩回顾第一篇|Spark概览
第二篇|Spark core编程指南
第三篇|Spark SQL编程指南在看是一种动力 分享是一种美德

编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)相关推荐

  1. 编程实现将rdd转换为dataframe:源文件内容如下(_大数据 什么是RDD?可以干什么?为什么要有RDD?...

    什么是RDD 弹性分布式数据集(Resilient Distributed Dataset,RDD)是 Spark 中的核心概念. RDD在抽象上来讲是一种抽象的分布式的数据集.它是被分区的,每个分区 ...

  2. hive编程指南电子版_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  3. spark-sql建表语句限制_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  4. hive编程指南_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  5. Spark15:Spark SQL:DataFrame常见算子操作、DataFrame的sql操作、RDD转换为DataFrame、load和save操作、SaveMode、内置函数

    前面我们学习了Spark中的Spark core,离线数据计算,下面我们来学习一下Spark中的Spark SQL. 一.Spark SQL Spark SQL和我们之前讲Hive的时候说的hive ...

  6. RDD转换为DataFrame的两种方式详解

    Spark支持两种方法将存在的RDD转换为DataFrame(SchemaRDD),后面附完整样例代码 元数据:person.txt 1 zhangsan 20 2 lisi 29 3 wangwu ...

  7. dataframe 转rdd java,在pyspark中将RDD转换为Dataframe

    我想在pyspark中将我的RDD转换为Dataframe . 我的RDD: [(['abc', '1,2'], 0), (['def', '4,6,7'], 1)] 我希望RDD以Dataframe ...

  8. Spark Streaming 编程指南[中英对照]

    2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...

  9. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

最新文章

  1. 在Android工程中加入AIDL文件时,gen目录生成的文件报错-问题解决
  2. linux下使用idl生成h文件,LINIUX下IDL的安装
  3. php fpm 日志记录,如何解决nginx下php-fpm不记录php报错日志的问题
  4. [算法笔记]二叉树基础
  5. 大学计算机基础知识点图文,大学计算机基础知识点超详细总结
  6. Windows下的SQL Server备份文件BAK在Linux环境下还原遇到的问题
  7. Python Django之路由系统
  8. 【Java游戏合集】手把手教你制作游戏
  9. 老外码农酒后吐槽,该说的不该说的全说了!!
  10. 正则匹配某字符前的内容
  11. iOS小技能:监听H5页面goBack返回事件 网页监听APP返回键 (NavigationBackItemInjection)
  12. c程序设计语言 qsort,【程序设计基础_C语言】北理工的恶龙(附qsort范例)
  13. Apache的配置文件详细解释
  14. 完全背包问题完全背包求具体方案
  15. 热分析技术清单:导热材料热扩散系数闪光法测量中的样品厚度选择
  16. Dropout技术之随机神经元与随机深度
  17. QtAndroid详解(2):startActivity和它的小伙伴们
  18. mongodb--读操作
  19. 打印机 HP LaserJet 1018安装教程
  20. WCS系统结构逻辑实现

热门文章

  1. 【ElasticSearch】Es 源码之 ActionModule 源码解读
  2. 【Clickhouse】Clickhouse 分析函数 window functions 窗口函数
  3. 60-100-240-使用-DataSource-JDBC相关-JDBC读取各种数据源
  4. linux下-bash: ls: command not found
  5. List 集合的常用方法
  6. 小林求职记(六)踩过Dubbo坑,回答印象深,干货整理
  7. php开发中常用函数总结,PHP开发中常用函数总结
  8. MySQL分页查询方法及优化
  9. 第四章 前端开发——JQuery库
  10. js_DOM读写节点