Spark Machine Learning 9: Real-Time Machine Learning (Scala with sbt)
1 Online learning
An online model continuously updates itself as new data arrives, instead of being retrained from scratch each time, as in offline (batch) training.
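The idea can be sketched in a few lines of plain Scala (no Spark needed): an online learner folds each new example into its current state in O(1), never revisiting past data. The running mean below is the simplest such learner; the class name is illustrative.

```scala
// Online (streaming) mean: the state is (count, mean); each new value
// updates the state incrementally, without revisiting past data.
final case class MeanState(count: Long, mean: Double) {
  def update(x: Double): MeanState = {
    val n = count + 1
    MeanState(n, mean + (x - mean) / n)
  }
}

val stream = Seq(1.0, 2.0, 3.0, 4.0)
val state = stream.foldLeft(MeanState(0, 0.0))(_.update(_))
// state.mean is the mean of all values seen so far: 2.5
```

The same pattern (fold new data into current state) is what Spark Streaming's stateful operators and MLlib's streaming models do per batch.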
2 Spark Streaming
- Discretized streams (DStream)
- Input sources: Akka actors, message queues, Flume, Kafka, ...
http://spark.apache.org/docs/latest/streaming-programming-guide.html
- Lineage: the set of transformations and actions applied to an RDD
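Conceptually, a DStream is just a sequence of RDDs, one per batch interval, with the same per-batch function applied to each. A rough plain-Scala analogy (with `Seq` standing in for RDD; names are illustrative):

```scala
// A DStream is conceptually a sequence of RDDs, one per batch interval.
// Here Seq stands in for RDD: the same function is applied to every batch.
type Batch = Seq[String]

val dstream: Seq[Batch] = Seq(
  Seq("buy", "view"),
  Seq("buy"),
  Seq("view", "buy", "buy")
)

// foreachRDD-style per-batch processing: count "buy" events per batch
val buysPerBatch = dstream.map(batch => batch.count(_ == "buy"))
// buysPerBatch == Seq(1, 1, 2)
```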
3 MLlib + Streaming application
3.0 build.sbt
Depends on Spark MLlib and Spark Streaming.
```scala
name := "scala-spark-streaming-app"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
```
To use a mirror repository in China, configure ~/.sbt/repositories:

```
[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots
```
3.1 Producing messages
```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

object StreamingProducer {

  def main(args: Array[String]) {
    val random = new Random()

    // Maximum number of events per second
    val MaxEvents = 6

    // Read the list of possible names
    val namesResource = this.getClass.getResourceAsStream("/names.csv")
    val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq

    // Generate a sequence of possible products
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate a number of random product events */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    // create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach { event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
```
```
sbt run

Multiple main classes detected, select one to run:
[1] MonitoringStreamingModel
[2] SimpleStreamingApp
[3] SimpleStreamingModel
[4] StreamingAnalyticsApp
[5] StreamingModelProducer
[6] StreamingProducer
[7] StreamingStateApp

Enter number: 6
```
3.2 Printing messages
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingApp {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // here we simply print out the first few elements of each batch
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
```
sbt run
Enter number: 2
```
3.3 Streaming analytics
```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingAnalyticsApp {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // create stream of events from raw text elements
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2))
    }

    /*
      We compute and print out stats for each batch.
      Since each batch is an RDD, we call foreachRDD on the DStream
      and apply the usual RDD functions we used in Chapter 1.
    */
    events.foreachRDD { (rdd, time) =>
      val numPurchases = rdd.count()
      val uniqueUsers = rdd.map { case (user, _, _) => user }.distinct().count()
      val totalRevenue = rdd.map { case (_, _, price) => price.toDouble }.sum()
      val productsByPopularity = rdd
        .map { case (user, product, price) => (product, 1) }
        .reduceByKey(_ + _)
        .collect()
        .sortBy(-_._2)
      val mostPopular = productsByPopularity(0)

      val formatter = new SimpleDateFormat
      val dateStr = formatter.format(new Date(time.milliseconds))
      println(s"== Batch start time: $dateStr ==")
      println("Total purchases: " + numPurchases)
      println("Unique users: " + uniqueUsers)
      println("Total revenue: " + totalRevenue)
      println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
    }

    // start the context
    ssc.start()
    ssc.awaitTermination()
  }
}
```
```
sbt run
Enter number: 4
```
3.4 Stateful streaming computation
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingStateApp {
  import org.apache.spark.streaming.StreamingContext._

  def updateState(prices: Seq[(String, Double)], currentTotal: Option[(Int, Double)]) = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))

    // for stateful operations, we need to set a checkpoint location
    ssc.checkpoint("/tmp/sparkstreaming/")
    val stream = ssc.socketTextStream("localhost", 9999)

    // create stream of events from raw text elements
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2).toDouble)
    }

    val users = events.map { case (user, product, price) => (user, (product, price)) }
    val revenuePerUser = users.updateStateByKey(updateState)
    revenuePerUser.print()

    // start the context
    ssc.start()
    ssc.awaitTermination()
  }
}
```
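The state-merging logic can be exercised without a cluster: the sketch below re-implements the same `updateState` signature in plain Scala and folds two batches for a single user, which is what `updateStateByKey` does per key across batch intervals.

```scala
// Same logic as updateState above: merge a batch of (product, price)
// pairs into a running (purchase count, revenue) state.
def updateState(prices: Seq[(String, Double)],
                currentTotal: Option[(Int, Double)]): Option[(Int, Double)] = {
  val revenue = prices.map(_._2).sum
  val (n, total) = currentTotal.getOrElse((0, 0.0))
  Some((n + prices.size, revenue + total))
}

val batch1 = Seq(("iPhone Cover", 9.99), ("Headphones", 5.49))
val batch2 = Seq(("iPad Cover", 7.49))

// First batch starts from no state; the second folds into the result
val state = updateState(batch2, updateState(batch1, None))
// state: Some((3, 22.97)) -- 3 purchases, revenue up to float rounding
```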
```
sbt run
Enter number: 7
```
4 Streaming linear regression
Linear regression: StreamingLinearRegressionWithSGD
- trainOn
- predictOn
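StreamingLinearRegressionWithSGD keeps a current weight vector and, for each incoming batch, runs the configured number of SGD iterations starting from it. The plain-Scala sketch below shows a simplified per-example gradient step under squared-error loss; MLlib's actual update averages gradients over the batch and scales the step size, so this is illustrative only (`sgdStep` is a name of my choosing).

```scala
// One SGD pass over a batch for linear regression (squared-error loss).
// Instead of refitting, the weights are nudged: w' = w - stepSize * grad.
def sgdStep(w: Array[Double], batch: Seq[(Double, Array[Double])],
            stepSize: Double): Array[Double] = {
  batch.foldLeft(w) { case (weights, (y, x)) =>
    val pred = weights.zip(x).map { case (wi, xi) => wi * xi }.sum
    val err = pred - y
    weights.zip(x).map { case (wi, xi) => wi - stepSize * err * xi }
  }
}

// Repeated batches drawn from y = 2*x drive the single weight toward 2.0
var w = Array(0.0)
val batch = Seq((2.0, Array(1.0)), (4.0, Array(2.0)))
for (_ <- 1 to 200) w = sgdStep(w, batch, 0.05)
// w(0) converges to approximately 2.0
```

This is the sense in which `trainOn` makes the model "follow" the stream: each batch moves the weights a little, rather than triggering a full refit.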
4.1 Streaming data generator
```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

object StreamingModelProducer {
  import breeze.linalg._

  def main(args: Array[String]) {
    // Maximum number of events per second
    val MaxEvents = 100
    val NumFeatures = 100
    val random = new Random()

    /** Function to generate a normally distributed dense vector */
    def generateRandomArray(n: Int) = Array.tabulate(n)(_ => random.nextGaussian())

    // Generate a fixed random model weight vector
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10

    /** Generate a number of random data points */
    def generateNoisyData(n: Int) = {
      (1 to n).map { i =>
        val x = new DenseVector(generateRandomArray(NumFeatures))
        val y: Double = w.dot(x)
        val noisy = y + intercept //+ 0.1 * random.nextGaussian()
        (noisy, x)
      }
    }

    // create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val data = generateNoisyData(num)
            data.foreach { case (y, x) =>
              val xStr = x.data.mkString(",")
              val eventStr = s"$y\t$xStr"
              out.write(eventStr)
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
```
```
sbt run
Enter number: 5
```
4.2 Streaming regression model
```scala
import breeze.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object SimpleStreamingModel {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)

    // create a stream of labeled points
    val labeledStream: DStream[LabeledPoint] = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features: Array[Double] = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    // train and test model on the stream, and print predictions for illustrative purposes
    model.trainOn(labeledStream)
    //model.predictOn(labeledStream).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
```
sbt run
Enter number: 3
```
5 Streaming K-means
- K-means clustering: StreamingKMeans
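StreamingKMeans updates each cluster center per batch as a weighted average of its previous position and the mean of the new points assigned to it. The plain-Scala sketch below shows that update rule for one-dimensional centers; the function name and the way the decay factor is applied are simplifications of mine, not MLlib's exact API.

```scala
// One streaming k-means update for 1-D centers: assign each point to its
// nearest center, then move each center toward the mean of its assigned
// points, discounting the old position by a decay factor.
def updateCenters(centers: Vector[Double], points: Seq[Double],
                  decay: Double): Vector[Double] = {
  val assigned = points.groupBy { p =>
    centers.indices.minBy(i => math.abs(p - centers(i)))
  }
  centers.zipWithIndex.map { case (c, i) =>
    assigned.get(i) match {
      case Some(ps) =>
        val batchMean = ps.sum / ps.size
        decay * c + (1 - decay) * batchMean
      case None => c // no points this batch: center stays put
    }
  }
}

val updated = updateCenters(Vector(0.0, 10.0), Seq(1.0, 1.0, 9.0, 11.0), decay = 0.5)
// updated == Vector(0.5, 10.0): each center moved halfway to its batch mean
```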
6 Evaluation
```scala
import breeze.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MonitoringStreamingModel {

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model1 = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)

    val model2 = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(1.0)

    // create a stream of labeled points
    val labeledStream = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    // train both models on the same stream
    model1.trainOn(labeledStream)
    model2.trainOn(labeledStream)

    // use transform to create a stream with model error rates
    val predsAndTrue = labeledStream.transform { rdd =>
      val latest1 = model1.latestModel()
      val latest2 = model2.latestModel()
      rdd.map { point =>
        val pred1 = latest1.predict(point.features)
        val pred2 = latest2.predict(point.features)
        (pred1 - point.label, pred2 - point.label)
      }
    }

    // print out the MSE and RMSE metrics for each model per batch
    predsAndTrue.foreachRDD { (rdd, time) =>
      val mse1 = rdd.map { case (err1, err2) => err1 * err1 }.mean()
      val rmse1 = math.sqrt(mse1)
      val mse2 = rdd.map { case (err1, err2) => err2 * err2 }.mean()
      val rmse2 = math.sqrt(mse2)
      println(
        s"""
           |-------------------------------------------
           |Time: $time
           |-------------------------------------------
         """.stripMargin)
      println(s"MSE current batch: Model 1: $mse1; Model 2: $mse2")
      println(s"RMSE current batch: Model 1: $rmse1; Model 2: $rmse2")
      println("...\n")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
```
sbt run
Enter number: 1
```