Spark Machine Learning 9: Real-Time Machine Learning (Scala with sbt)

1 Online Learning

The model updates itself continuously as new data arrives, instead of being retrained from scratch each time as in offline training. A minimal sketch of one such update follows below.
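For illustration, here is a minimal online update for linear regression under a squared-error loss; this sketch is ours (not a Spark API), and each new example nudges the weights by one stochastic gradient step:

// Minimal online-learning sketch (illustrative, not part of Spark's API):
// one stochastic gradient descent step per incoming example.
def sgdStep(weights: Array[Double], x: Array[Double], y: Double, stepSize: Double): Array[Double] = {
  // prediction with the current weights
  val pred = weights.zip(x).map { case (w, xi) => w * xi }.sum
  val err = pred - y
  // gradient of 0.5 * (pred - y)^2 with respect to w_i is err * x_i
  weights.zip(x).map { case (w, xi) => w - stepSize * err * xi }
}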

2 Spark Streaming

  • Discretized streams (DStreams)
  • Input sources: Akka actors, message queues, Flume, Kafka, ...

http://spark.apache.org/docs/latest/streaming-programming-guide.html

  • Lineage: the set of transformations and actions applied to an RDD, which lets Spark recompute lost partitions; a short illustration follows below.
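For example, in a spark-shell session (where sc is the SparkContext), toDebugString prints the lineage of a derived RDD:

// Illustrative spark-shell snippet: inspect the lineage of a derived RDD
val rdd = sc.parallelize(1 to 100)
val transformed = rdd.map(_ * 2).filter(_ % 3 == 0)
println(transformed.toDebugString) // prints the chain of parent RDDs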

3 MLlib + Streaming Application

3.0 build.sbt

The project depends on Spark MLlib and Spark Streaming:

name := "scala-spark-streaming-app"version := "1.0"scalaVersion := "2.11.7"libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
To speed up dependency resolution, you can use mirror repositories hosted in China:

~/.sbt/repositories

[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots
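Depending on your sbt version, you may also need to launch sbt with -Dsbt.override.build.repos=true so that this repository list takes precedence over the build's default resolvers.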

3.1 Producing Messages

import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

object StreamingProducer {

  def main(args: Array[String]) {
    val random = new Random()

    // Maximum number of events per second
    val MaxEvents = 6

    // Read the list of possible names
    val namesResource = this.getClass.getResourceAsStream("/names.csv")
    val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq

    // Generate a sequence of possible products
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate a number of random product events */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    // create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach { event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
sbt run

Multiple main classes detected, select one to run:

[1] MonitoringStreamingModel
[2] SimpleStreamingApp
[3] SimpleStreamingModel
[4] StreamingAnalyticsApp
[5] StreamingModelProducer
[6] StreamingProducer
[7] StreamingStateApp

Enter number: 6
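Before attaching Spark, you can sanity-check the producer with netcat (assuming it is installed) and watch the comma-separated user,product,price events arrive, a few per second:

nc localhost 9999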

3.2 Printing Messages

import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingApp {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // here we simply print out the first few elements of each batch
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
sbt run
Enter number: 2
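Note the "local[2]" master: the socket receiver occupies one of the local threads, so at least two are required or no thread is left to process the received batches.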

3.3 Streaming Analytics

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingAnalyticsApp {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // create stream of events from raw text elements
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2))
    }

    /*
      We compute and print out stats for each batch.
      Since each batch is an RDD, we call foreachRDD on the DStream,
      and apply the usual RDD functions we used in Chapter 1.
    */
    events.foreachRDD { (rdd, time) =>
      val numPurchases = rdd.count()
      val uniqueUsers = rdd.map { case (user, _, _) => user }.distinct().count()
      val totalRevenue = rdd.map { case (_, _, price) => price.toDouble }.sum()
      val productsByPopularity = rdd
        .map { case (user, product, price) => (product, 1) }
        .reduceByKey(_ + _)
        .collect()
        .sortBy(-_._2)
      val mostPopular = productsByPopularity(0)

      val formatter = new SimpleDateFormat
      val dateStr = formatter.format(new Date(time.milliseconds))
      println(s"== Batch start time: $dateStr ==")
      println("Total purchases: " + numPurchases)
      println("Unique users: " + uniqueUsers)
      println("Total revenue: " + totalRevenue)
      println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
    }

    // start the context
    ssc.start()
    ssc.awaitTermination()
  }
}
sbt run
Enter number: 4
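Each 10-second batch then prints a summary along these lines (the values, and the locale-dependent date format, are illustrative):

== Batch start time: 11/8/15 2:15 PM ==
Total purchases: 15
Unique users: 4
Total revenue: 109.45
Most popular product: Headphones with 5 purchases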

3.4 Stateful Stream Computation

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingStateApp {
  import org.apache.spark.streaming.StreamingContext._

  def updateState(prices: Seq[(String, Double)], currentTotal: Option[(Int, Double)]) = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    // for stateful operations, we need to set a checkpoint location
    ssc.checkpoint("/tmp/sparkstreaming/")
    val stream = ssc.socketTextStream("localhost", 9999)

    // create stream of events from raw text elements
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2).toDouble)
    }

    val users = events.map { case (user, product, price) => (user, (product, price)) }
    val revenuePerUser = users.updateStateByKey(updateState)
    revenuePerUser.print()

    // start the context
    ssc.start()
    ssc.awaitTermination()
  }
}
sbt run
Enter number: 7
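For each user key, updateState receives the sequence of new (product, price) pairs that arrived in the current batch together with the previous (purchase count, revenue) state, and returns the updated running totals. It is this use of updateStateByKey that makes the checkpoint directory mandatory.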

4 Streaming Linear Regression

MLlib provides streaming linear regression via StreamingLinearRegressionWithSGD:

  • trainOn: update the model with each incoming batch of labeled points
  • predictOn: generate predictions for a stream of feature vectors

4.1 Streaming Data Generator

import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

object StreamingModelProducer {
  import breeze.linalg._

  def main(args: Array[String]) {
    // Maximum number of events per second
    val MaxEvents = 100
    val NumFeatures = 100
    val random = new Random()

    /** Function to generate a normally distributed dense vector */
    def generateRandomArray(n: Int) = Array.tabulate(n)(_ => random.nextGaussian())

    // Generate a fixed random model weight vector
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10

    /** Generate a number of random data events */
    def generateNoisyData(n: Int) = {
      (1 to n).map { i =>
        val x = new DenseVector(generateRandomArray(NumFeatures))
        val y: Double = w.dot(x)
        val noisy = y + intercept //+ 0.1 * random.nextGaussian()
        (noisy, x)
      }
    }

    // create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val data = generateNoisyData(num)
            data.foreach { case (y, x) =>
              val xStr = x.data.mkString(",")
              val eventStr = s"$y\t$xStr"
              out.write(eventStr)
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
sbt run
Enter number: 5
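Note that the Gaussian noise term in generateNoisyData is commented out, so each target value is an exact linear function of the features plus a fixed intercept; uncomment it to make the regression problem genuinely noisy.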

4.2 Streaming Regression Model

import breeze.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingModel {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)

    // create a stream of labeled points
    val labeledStream: DStream[LabeledPoint] = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features: Array[Double] = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    // train and test model on the stream, and print predictions for illustrative purposes
    model.trainOn(labeledStream)
    //model.predictOn(labeledStream).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
sbt run
Enter number: 3
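To also print per-batch predictions, uncomment the model.predictOn line; note that in Spark 1.5 predictOn expects a DStream[Vector], so you would pass labeledStream.map(_.features) rather than the labeled stream itself.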

5 Streaming K-Means

  • K-means clustering: StreamingKMeans (a minimal sketch follows below)
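Here is a minimal sketch of how StreamingKMeans could be wired up, assuming the same socket source with one comma-separated feature vector per line; the object name and parameter values are ours, for illustration only:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingKMeans {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "Streaming KMeans", Seconds(10))
    // assume each line is a comma-separated feature vector
    val vectors = ssc.socketTextStream("localhost", 9999)
      .map(line => Vectors.dense(line.split(",").map(_.toDouble)))

    val model = new StreamingKMeans()
      .setK(3)                    // number of clusters
      .setDecayFactor(1.0)        // 1.0 weights all past batches equally
      .setRandomCenters(100, 0.0) // dimension 100, initial center weight 0.0

    model.trainOn(vectors)           // update cluster centers on each batch
    model.predictOn(vectors).print() // print cluster assignments per batch

    ssc.start()
    ssc.awaitTermination()
  }
}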

6 Evaluation

import breeze.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MonitoringStreamingModel {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model1 = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)

    val model2 = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(1.0)

    // create a stream of labeled points
    val labeledStream = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    // train both models on the same stream
    model1.trainOn(labeledStream)
    model2.trainOn(labeledStream)

    // use transform to create a stream with model error rates
    val predsAndTrue = labeledStream.transform { rdd =>
      val latest1 = model1.latestModel()
      val latest2 = model2.latestModel()
      rdd.map { point =>
        val pred1 = latest1.predict(point.features)
        val pred2 = latest2.predict(point.features)
        (pred1 - point.label, pred2 - point.label)
      }
    }

    // print out the MSE and RMSE metrics for each model per batch
    predsAndTrue.foreachRDD { (rdd, time) =>
      val mse1 = rdd.map { case (err1, err2) => err1 * err1 }.mean()
      val rmse1 = math.sqrt(mse1)
      val mse2 = rdd.map { case (err1, err2) => err2 * err2 }.mean()
      val rmse2 = math.sqrt(mse2)
      println(
        s"""
           |-------------------------------------------
           |Time: $time
           |-------------------------------------------
         """.stripMargin)
      println(s"MSE current batch: Model 1: $mse1; Model 2: $mse2")
      println(s"RMSE current batch: Model 1: $rmse1; Model 2: $rmse2")
      println("...\n")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
sbt run
Enter number: 1
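Since the two models differ only in step size (0.01 versus 1.0), the per-batch MSE and RMSE printed here let you watch how the learning rate affects convergence on the same stream of data.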
