版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

传送门:大数据系列文章目录

目录

  • scala的 泛型
    • 给方法定义泛型
    • 给类定义泛型
    • 泛型的上下界
    • 泛型中 非变 协变 和 逆变
  • scala中actor相关内容
    • actor基本介绍
    • actor的入门案例
    • 基于actor模拟消息发送和接收
    • 基于actor实现自定义消息传递
    • wordCount整体实现操作
  • akka
    • akka的基本介绍
    • 基于akka实现进程之间的通信
    • 基于akka实现一个spark内部通信案例
      • 通信的流程
      • 准备工作
      • 代码操作
        • spark-master
        • spark-slave

scala的 泛型

给方法定义泛型

格式:

def 方法名[泛型名称](..) = {//...
}

示例

示例说明

用一个方法来获取任意类型数组的中间的元素

  • 不考虑泛型直接实现(基于Array[Int]实现)
  • 加入泛型支持
package com.lee
// 不使用泛型来实现的操作: 整体的操作比较繁琐的
object Demo01 {/*用一个方法来获取任意类型数组的中间的元素- 不考虑泛型直接实现(基于Array[Int]实现)- 加入泛型支持*/def getMiddleData(array:Array[Int]) = {array((array.length-1) /2)}def getMiddleData(array:Array[String]) = {array((array.length-1) /2)}def getMiddleData(array:Array[Double]) = {array((array.length-1) /2)}def main(args: Array[String]): Unit = {val value1 = getMiddleData(Array(1,2,3,4,5))val value2 = getMiddleData(Array("A","B","C","D","E"))val value3 = getMiddleData(Array(1.2,2.2,3.2,5.2,6.2))println(value1)}
}

实现泛型来实现

package com.lee// 不使用泛型来实现的操作: 整体的操作比较繁琐的
object Demo02 {/*用一个方法来获取任意类型数组的中间的元素- 不考虑泛型直接实现(基于Array[Int]实现)- 加入泛型支持*/def getMiddleData[T](array:Array[T]) = {array((array.length-1) /2)}def main(args: Array[String]): Unit = {val value1 = getMiddleData(Array(1,2,3,4,5))val value2 = getMiddleData(Array("A","B","C","D","E"))val value3 = getMiddleData(Array(1.2,2.2,3.2,5.2,6.2))println(value1)}
}

给类定义泛型

语法结构:

class 类[T](val 变量名: T)

示例

示例说明

  • 实现一个Pair泛型类
  • Pair类包含两个字段,而且两个字段的类型不固定
  • 创建不同类型泛型类对象,并打印
package com.leeobject Demo03 {/*- 实现一个Pair泛型类- Pair类包含两个字段,而且两个字段的类型不固定- 创建不同类型泛型类对象,并打印*/class  Pair[T] {var array:Array[T] = _var list1:List[T] = _def eat[T](value:T) = {value match {case  _ : Int => println("这是一个Int类型数据")case  _ : String => println("这是一个String类型数据")case _  =>println("不知道什么类型")}}}def main(args: Array[String]): Unit = {val pair = new Pair[String]()pair.array = Array("张三","李四")pair.eat(10)}
}

说明:
​在对类定义泛型后, 泛型仅对成员变量是生效的, 而方法是可以单独在重新定义的

泛型的上下界

  • 上界
    使用<: 类型名表示给类型添加一个上界,表示泛型参数必须是该类型的子类或者本身
[T <: 类型]

上界设置操作

package com.leeimport com.lee.Person// 演示 泛型的上界
object Demo04 {trait  Personclass Stu extends Personclass Temp[T <: Person] {var list:List[T] = _}def main(args: Array[String]): Unit = {val temp = new Temp[Stu]  // 经过测试, 发现 可以传递 Stu 或者 Person, 说明设置上界为 Person 生效了}}
  • 下界:
    上界是要求必须是某个类的子类,或者必须从某个类继承,而下界是必须是某个类的父类(或本身)
[T >: 类型]
package com.lee// 演示 泛型的下界
object Demo05 {trait  Personclass Stu extends Personclass zhangsan extends Stuclass Temp[T >: Stu] {var list:List[T] = _}def main(args: Array[String]): Unit = {val temp = new Temp[Person]  // 经过测试, 发现 可以传递 Stu 或者 Person, 说明设置下界为 Stu 生效了 . 只能传递本身和父类}}

泛型中 非变 协变 和 逆变

相关操作:

package com.leeobject Demo06 {class Superclass Sub extends Superclass Temp1[T]    // 演示 非变class Temp2[+T]   // 演示 协变  子类转换为父类接收class Temp3[-T]   // 演示 逆变def main(args: Array[String]): Unit = {// 演示 非变val temp1: Temp1[Sub] = new Temp1[Sub]// val temp:Temp1[Super] = temp1  直接编译报错  无法转换// 演示协变val temp2: Temp2[Sub] = new Temp2[Sub] // val temp:Temp2[Super] = temp2  正常  说明 可以使用父类泛型 接收子类泛型val temp3: Temp2[Super] = new Temp2[Super]// val temp:Temp2[Sub] = temp3  编译报错, 不支持 使用子类泛型 接收父类泛型// 演示逆变val temp4: Temp3[Super] = new Temp3[Super]//  val temp:Temp3[Sub] = temp4  正常  说明可以使用子类泛型, 接收父类泛型}
}

scala中actor相关内容

actor基本介绍

scala的actor类似于java中线程(Thread),但是要比java的线程的效率要高的多

在java的线程中, 存在比较大问题, 在操作共享数据的时候, 为了保证线程的安全, 一般都需要进行加锁来处理, 而一旦加锁后, 就会导致只有一个线程可以操作, 其他的线程只能等待, 这样就会导致资源浪费, 同时执行效率也会下降, 同时还有可能出现死锁情况

Actor并发编程模型是一种不共享数据,依赖消息传递的一种并发编程模式,有效避免资源争夺、死锁等情况。

发送消息的方式

! 发送异步消息,没有返回值。
!? 发送同步消息,等待返回值。
!! 发送异步消息,返回值是 Future[Any]。

注意:

Future 表示一个异步操作的结果状态,可能还没有实际完成的异步任务的结果

Any是所有类的超类,Future[Any]的泛型是异步操作结果的类型。

例如:

要给actor1发送一个异步字符串消息,使用以下代码:

actor1 ! "你好!"

接收消息

Actor中使用receive方法来接收消息,需要给receive方法传入一个偏函数

{case 变量名1:消息类型1 => 业务处理1,case 变量名2:消息类型2 => 业务处理2,...
}

[!NOTE]

receive方法只接收一次消息,接收完后继续执行act方法

actor的入门案例

导入相关的pom依赖

    <properties><project.build.source.Encoding>UTF-8</project.build.source.Encoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.12</version></dependency><!-- 演示Scala-actor并发编程时需要的依赖,因为过期了在2.12中被移除了,所以引入2.11的--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>2.11.8</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.11</artifactId><version>2.3.14</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.11</artifactId><version>2.3.14</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!-- Scala编译插件--><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><id>scala-compile-first</id><goals><goal>compile</goal></goals><configuration><includes><include>**/*.scala</include></includes></configuration></execution><execution><id>scala-test-compile</id><goals><goal>testCompile</goal></goals></execution></executions></plugin><!-- maven编译插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
  • 代码实现
package com.leeimport scala.actors.Actor// 1) actor的入门案例
object Demo01 {/*需求: 创建两个actor, 一个actor打印 1~10  一个actor打印 11~201、定义一个class/object继承Actor特质,注意导包import scala.actors.Actor2、重写对应的act方法3、调用Actor的start方法执行Actor4、当act方法执行完成,整个程序运行结束*/class Actor1 extends Actor {override def act(): Unit = (1 to 10).foreach(println(_))}class Actor2 extends Actor {override def act(): Unit = (11 to 20).foreach(println(_))}def main(args: Array[String]): Unit = {new Actor1().start()new Actor2().start()}}

基于actor模拟消息发送和接收

package com.leeimport scala.actors.Actorobject Demo02 {/*需求: 实现两个actor之间数据发送和接收操作1、创建两个Actor(ActorSender、ActorReceiver)2、ActorSender发送一个异步字符串消息给ActorReceiver3、ActorReceive接收到该消息后,打印出来*/// 发送者object ActorSender extends Actor {override def act(): Unit = {println("发送者执行数据发送:")ActorReceiver ! "我是发送者, 你收到了吗?"   // 中缀调用法receive{case message:String => println("接收到消息:"+ message)}}}//接收者object ActorReceiver extends Actor {override def act(): Unit = {receive {case i:String =>println("我已经接收到消息了,消息为:"+ i)// 偏函数// 响应我已经接收到了数据ActorSender ! "我已经接收到了...."}}}def main(args: Array[String]): Unit = {ActorSender.start()ActorReceiver.start()}
}

发现的问题: receiver操作 只能接收到一次消息, 整个actor直接退出了

如何进行循环的接收消息呢?

只需要 在receiver的方法外面套上 while(true) 让其一直不断的接收消息即可

while(true){receive {case i:String =>println("我已经接收到消息了,消息为:"+ i)// 偏函数// 响应我已经接收到了数据ActorSender ! "我已经接收到了...."}
}

说明: 在使用receiver方法在接收消息的时候, actor默认 每接收一次消息, 相当于就是重新创建一个线程

存在弊端: 如果消息比较多, 导致线程被频繁的创建和销毁, 对执行效率 以及资源都是一种浪费

那么如何解决呢? scala为了解决这个问题, 提供一种: loop和react 优化消息的接收模型

// 持续接收消息
loop {react {case msg:String => println("接收到消息:" + msg)}
}

基于actor实现自定义消息传递

package com.leeimport scala.actors.{Actor, Future}//3 模拟自定义的数据 消息传递工作
object Demo03 {// 样例类: 用于封装数据操作case class MessageData(id:Int,msg:String)object Actor01 extends Actor {override def act(): Unit = {receive {case  MessageData(x,y) =>println("接收到消息: "+x +"  "+y)Thread.sleep(5000)sender ! MessageData(2,"我很好...")}}}def main(args: Array[String]): Unit = {Actor01.start()// Actor01 ! MessageData(1,"你好吗? 我是main") // 异步无返回值, 适用于发送过去 接受者不需要返回消息情况//val a: Any = Actor01 !? MessageData(1,"你好吗? 我是main") // 当使用同步方式, 会阻塞等待接收者返回消息val a: Future[Any] = Actor01 !! MessageData(1, "你好吗? 我是main") // 异步有返回值 ,发送过去 立即就会返回, 不去等待返回值// 如果是立即返回, 那么就有可能出现接受者还没有讲消息返回来, 此时 返回值 Future类型, 此对象可以用来获取返回的状态信息// 如果获取数据// asInstanceOf 用于进行类型的转换操作 val data: MessageData = a.apply().asInstanceOf[MessageData]println(data.id + "  "+data.msg)}
}

说明:

​ 通过Future的apply方法获取返回数据的时候,如果数据没有返回回来, 此时apply方法会自动阻塞 直到有数据返回

Future的isset的方法, 用于判断数据是否已经返回, 如果返回就会为true 如果没有返回就为false

wordCount整体实现操作

  • 代码实现
package com.lee.actorimport java.io.Fileimport scala.actors.Actor
import scala.io.Sourceobject WordCountDemo06 {def main(args: Array[String]): Unit = {//1.创建一个File对象, 指定对应目录位置val file = new File("D:\\scala\\data")//2. 从这个file对象中,获取所有的文件val files: Array[File] = file.listFiles()//3. 获取每一个文件对应的路径val filePaths: Array[String] = files.map(_.getPath)//4. 根据文件的数量, 创建对应多个actor, 并将文件路径配置一对一的分配给actor操作(分配任务)val actorsAndFilePath: Array[(WordCountActor, String)] = filePaths.map(fileName => new WordCountActor -> fileName)//5. 启动每一个actor, 将文件路径传递给对应actor操作val futureArray = actorsAndFilePath.map(actorAndFilePath => {actorAndFilePath._1.start() // 启动actoractorAndFilePath._1 !! actorAndFilePath._2})//7. 进行最终聚合统计操作/*Map(elk -> 3, kafka -> 3, hbase -> 3, redis -> 6)Map(hadoop -> 2, impala -> 2, hive -> 1, flume -> 1)Map(hue -> 4, hive -> 8, flume -> 8, hbase -> 4)Map(hue -> 1, hadoop -> 1, spark -> 1, hive -> 1, scala -> 1, kudu -> 1)*///7.1: 获取每一个actor返回的结果//最终想要的结果:// Array((hadoop,2), (impala,2), (hive,1), (flume,1), (hue,1), (hadoop,1), (spark,1), (hive,1), (scala,1), (kudu,1), (elk,3), (kafka,3), (hbase,3), (redis,6), (hue,4), (hive,8), (flume,8), (hbase,4))val wordsAndNums = futureArray.flatMap(future => future.apply().asInstanceOf[Map[String,Int]])//7.2 按照元组的第一个进行分组, 然后统计计算即可//Map( hadoop->List((hadoop,2),(hadoop,1)) ,impala->List((impala,2)), hive->List((hive,1),(hive,1),(hive,8)) ...)val groupVal = wordsAndNums.groupBy(_._1)// 7.3 进行统计操作val wordCount = groupVal.map(wordAndNum => {//val sum = wordAndNum._2.map(num => num._2).sumwordAndNum._1 -> sum})//7.4 进行打印最终结果wordCount.foreach(println(_))}// 此actor用于处理每个文件的局部统计操作class WordCountActor extends Actor{override def act(): Unit = {//6. 接收到分配过来filePathreceive{case filePath:String =>//6.1. 根据文件路径读取文件数据 (一行一行读取)// hadoop hive impala hadoopval lineData = Source.fromFile(filePath).getLines()//6.2. 切割数据, 将其变更为一个个单词// [hadoop,hive,impala,hadoop]val words: Iterator[String] = lineData.flatMap(_.split(" "))//6.3. 将每一个单词, 设置为 (单词,1) ,(单词,1) 方案// [(hadoop->1),(hive->1),(impala->1),(hadoop->1)]val wordNum: Iterator[(String, Int)] = words.map( _ -> 1 )//6.4. 执行分组操作, 将相同单词聚合在一起// map( hadoop -> [(hadoop->1),(hadoop->1)] , hive -> [(hive -> 1)],impala -> [(impala ->1)] )val groupData: Map[String, List[(String, Int)]] = wordNum.toList.groupBy(  _._1 )//6.5. 进行求和操作// [(hadoop->2),(hive->1),(impala ->1)]val wordCount = groupData.map(wordAndNum => {//val sum = wordAndNum._2.map(num => num._2).sumwordAndNum._1 -> sum})//6. 打印://println(wordCount)// 6.6. 将计算完的结果, 返回给main方法sender ! wordCount}}}}

akka

akka的基本介绍

 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用的工具包。Akka是使用scala开发的库,同时可以使用scala和Java语言来开发基于Akka的应用程序。
  • 提供基于异步非阻塞、高性能的事件驱动编程模型

  • 内置容错机制,允许Actor在出错时进行恢复或者重置操作

  • 超级轻量级的事件处理(每GB堆内存几百万Actor)

  • 使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序

单机高并发: 在一个节点上 有多个线程的操作

​网络构造高并发程序: 在多个节点上, 基于进程通信构建

  • 核心的API
1) 如何获取ActorSystem对象 :context.system 获取
2) 如何获取actor的引用:ActorSystem.actorOf(Props(Actor对象), "Actor名字")3) 如何实现一个akka的actor: 3.1) 创建一个类, 继承 Actor (Actor需要导入akka下的包)3.2) 重写 receiver方法, 在这个方法内接受消息即可 (akka默认实现循环接收操作)3.3) 可选: 重写preStart() 方法, 这个方法是当actor被创建后, 此方法会被自动调用, 而且只会调用一次4) akka的定时周期执行:4.1 导入相关的隐式转换import scala.concurrent.duration._      // 设置时间间隔的隐式转换import context.dispatcher   // 这是定时器必须导入隐式转换4.2 使用定时器: context.system.scheduler.schedule(延迟多久执行0 millis, 每隔多久执行15000 millis, 给谁发self, 定期执行的函数CheckTimeOutWorker)5) 远端地址(URL): 本地Actor: akka://actorSystem名称/user/Actor名称  示例: akka://SimpleAkkaDemo/user/senderActor  远程Actor:     akka.tcp://my-sys@ip地址:port/user/Actor名称   示例: akka.tcp://192.168.10.17:5678/user/service-b

基于akka实现进程之间的通信

基于Akka实现在两个进程间发送、接收消息。Worker启动后去连接Master,并发送消息,Master接收到消息后,再回复Worker消息。

操作步骤

    1. 创建两个maven的项目: 一个叫 akka-master 一个叫akka-worker
    1. 导入相关的依赖 : 此依赖于在actor导入依赖是一致的
<properties><project.build.source.Encoding>UTF-8</project.build.source.Encoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.11</version></dependency>--><!-- 演示Scala-actor并发编程时需要的依赖,因为过期了在2.12中被移除了,所以引入2.11的--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>2.11.8</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.11</artifactId><version>2.3.14</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.11</artifactId><version>2.3.14</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!-- Scala编译插件--><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><id>scala-compile-first</id><goals><goal>compile</goal></goals><configuration><includes><include>**/*.scala</include></includes></configuration></execution><execution><id>scala-test-compile</id><goals><goal>testCompile</goal></goals></execution></executions></plugin><!-- maven编译插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
    1. 在项目中创建scala的目录, 创建后选择maven进行一下刷新即可
    1. 导入akka的配置文件(两个项目都需要创建): application.conf

将这个配置文件导入到项目的resources目录下 导入后, 不要忘记修改端口号

master

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7000"

worker

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7001"
    1. 在master项目中完成akka的基础环境准备

创建一个MasterActor线程:

import akka.actor.Actor/*** @Author: lwh* @Date: 2022/11/29* @Description:**/
object MasterActor extends Actor {override def receive: Receive = {case msg: String => {println("master接受的消息:" + msg)val selection = context.actorSelection("akka.tcp://actorSystem@127.0.0.1:7001/user/workerActor")selection ! "success"}}
}

创建一个MasterMain 入口类

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory/*** @Author: lwh* @Date: 2022/11/29* @Description:**/
object MasterMain {def main(args: Array[String]): Unit = {val masterSystem = ActorSystem("actorSystem", ConfigFactory.load())val masterActor = masterSystem.actorOf(Props(MasterActor), "masterActor")}}
    1. 在worker项目中完成akka的基础环境准备

创建WrokerActor

import akka.actor.Actor/*** @Author: lwh* @Date: 2022/11/29* @Description:**/
object WorkerActor extends Actor{override def receive: Receive = {case "setup" =>println("接收到setup消息")val selection = context.actorSelection("akka.tcp://actorSystem@127.0.0.1:7000/user/masterActor")selection ! "connect"case "success" =>println("接收到成功注册消息....")}
}

创建WorkerMain 入口类

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory/*** @Author: lwh* @Date: 2022/11/29* @Description:**/
object WorkerMain {def main(args: Array[String]): Unit = {val actorSystemSystem = ActorSystem("actorSystem",ConfigFactory.load())val workerActor = actorSystemSystem.actorOf(Props(WorkerActor),"workerActor")workerActor ! "setup"}}
    1. 分别启动master 和 worker的入口类, 如果启动没有报错 说明基础环境配置成功了

基于akka实现一个spark内部通信案例

通信的流程

准备工作

  1. 创建三个maven的项目: 一个项目为 spark-master 一个项目为: spark-slave 一个项目为: spark-common
    common项目 主要是用于放置一些公共的样例类和配置信息
  2. 让 spark-master 和 spark-slave 都要依赖于 spark-common
    在spark-master和 spark-slave分别 导入以下依赖
    <dependencies><dependency><groupId>com.lee</groupId><artifactId>scala_spark_common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>
  1. spark-master 和 spark-slave 导入和 akka入门案例一致的依赖信息(此处放置包含上面依赖所有内容)
 <properties><project.build.source.Encoding>UTF-8</project.build.source.Encoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.11</version></dependency>--><!-- 演示Scala-actor并发编程时需要的依赖,因为过期了在2.12中被移除了,所以引入2.11的--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>2.11.8</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.11</artifactId><version>2.3.14</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.11</artifactId><version>2.3.14</version></dependency><dependency><groupId>com.lee</groupId><artifactId>scala_spark_common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!-- Scala编译插件--><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><id>scala-compile-first</id><goals><goal>compile</goal></goals><configuration><includes><include>**/*.scala</include></includes></configuration></execution><execution><id>scala-test-compile</id><goals><goal>testCompile</goal></goals></execution></executions></plugin><!-- maven编译插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
  1. 在spark-common中导入相关依赖:
 <properties><project.build.source.Encoding>UTF-8</project.build.source.Encoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.12</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!-- Scala编译插件--><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><id>scala-compile-first</id><goals><goal>compile</goal></goals><configuration><includes><include>**/*.scala</include></includes></configuration></execution><execution><id>scala-test-compile</id><goals><goal>testCompile</goal></goals></execution></executions></plugin><!-- maven编译插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
  1. 分别在每个项目中创建scala目录结构
  2. 在每个项目中创建包结构: com.lee.akka.spark
  3. 将样例类和配置文件导入到对应的项目中: pom除外
  4. 编写akka的基本入门准备工作: 与 入门案例的akka准备工作是一致的

代码操作

spark-master
  • 入口类
package com.lee.akka.sparkimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject SparkMasterMain {def main(args: Array[String]): Unit = {//1. 创建ActorSystem对象val actorSystem = ActorSystem("sparkMasterActorSystem",ConfigFactory.load())//2. 获取自己的actor引用对象val actorRef = actorSystem.actorOf(Props(SparkMasterActor),"sparkMasterActor")//3. 对于master启动后,目前什么都不需要做}
}
  • actor类
package com.lee.akka.sparkimport java.util.Dateimport akka.actor.Actorimport scala.collection.mutableobject SparkMasterActor extends Actor{private val regWorkerMap = mutable.Map[String, WorkerInfo]()//此方法 一启动 就会执行 :定时检测注册列表中是否有超时的从节点注册信息override def preStart(): Unit = {import scala.concurrent.duration._import context.dispatcher// 定时执行心跳操作:context.system.scheduler.schedule(0 second,ConfigUtil.`master.check.heartbeat.interval` second){//1. 从注册列表中 过滤出已经超时的注册列表val timeOutRegData: mutable.Map[String, WorkerInfo] = regWorkerMap.filter(regData => (new Date().getTime - regData._2.lastHeartBeatTime)/1000 > ConfigUtil.`master.check.heartbeat.timeout`)//2. 将这些数据从 regWorkerMap 清理出去timeOutRegData.foreach(timeoutData => regWorkerMap -= timeoutData._1 )//3.遍历打印当前还剩余那些slaveregWorkerMap.foreach(regWork =>println("当前注册的slave有: "+regWork._2))}}override def receive = {case WorkerRegisterMessage(workerid,cpu,mem) =>println("注册信息为:"+workerid+"  "+cpu+"  "+mem)//1. 接收到注册信息后, 然后将注册信息保存到注册列表regWorkerMap += workerid -> WorkerInfo(workerid,cpu,mem,new Date().getTime)//2. 通知slave, 告知注册成功了sender() ! RegisterSuccessMessagecase WorkerHeartBeatMessage(workerid,cpu,mem) =>println("接收到心跳包消息")// 只要key相同, 直接覆盖了regWorkerMap += workerid -> WorkerInfo(workerid,cpu,mem,new Date().getTime)}
}
spark-slave
  • 入口类
package com.lee.akka.sparkimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject SparkSlaveMain {def main(args: Array[String]): Unit = {/*val props = new Properties()props.setProperty("akka.actor.provider","akka.remote.RemoteActorRefProvider")props.setProperty("akka.remote.netty.tcp.hostname","127.0.0.1")props.setProperty("akka.remote.netty.tcp.port","8101")*///1. 创建 actorSystem对象val actorSystem = ActorSystem("sparkSlaveActorSystem",ConfigFactory.load())//2. 获取自己actor引用对象val actorRef = actorSystem.actorOf(Props(SparkSlaveActor) , "sparkSlaveActor")//3. 发送消息: 表示从节点以及启动了//actorRef ! "setup"}
}
  • actor类
package com.lee.akka.sparkimport java.util.UUIDimport akka.actor.{Actor, ActorSelection}import scala.util.Randomobject  SparkSlaveActor extends Actor{private  var workerid:String = _private var cpu:Int = _private var mem:Int = _private  val cpuList = List(2,4,8,12,16,24)private  val memList = List(4,8,12,16,24,28,32,64)private  var masterActorRef: ActorSelection = _override def preStart(): Unit = {// 向master发送注册信息//0 初始化注册信息:workerid = UUID.randomUUID().toStringcpu = cpuList(Random.nextInt(cpuList.size))mem = memList(Random.nextInt(memList.size))//1. 获取到master的地址引用masterActorRef= context.actorSelection("akka.tcp://sparkMasterActorSystem@127.0.0.1:8000/user/sparkMasterActor")//2. 执行发送数据masterActorRef ! WorkerRegisterMessage(workerid,cpu,mem)}override def receive = {case RegisterSuccessMessage =>println("收到注册成功消息")import scala.concurrent.duration._import context.dispatcher// 定时执行心跳操作:context.system.scheduler.schedule(0 second,ConfigUtil.`worker.heartbeat.interval` second){masterActorRef ! WorkerHeartBeatMessage(workerid,cpu,mem)}}
}

Scala系列-5、scala中的泛型、actor、akka相关推荐

  1. akka actor java_Akka:使用非默认构造函数在Scala中定义一个actor并从Java代码创建它 - java...

    Akka Scala演员必须扩展akka.actor.Actor Akka Java actor必须扩展akka.actor.UntypedActor 因此,在使用非默认构造函数定义Scala act ...

  2. Scala系列-3、scala中的类和对象有哪些?

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 目录 如何使用IDEA创建scala项目 创建普通的scala项目 ...

  3. 大数据系列修炼-Scala课程65

    大数据系列修炼-Scala课程65 核心内容:Scala中的隐式转换操作规则与如何更好的使用隐式转换操作代码实战 1.Scala中的隐式转换操作规则与如何更好的使用隐式转换操作代码实战 1>Sc ...

  4. 深圳大数据学习:Scala系列之文件以及正则表达式

    深圳大数据学习:Scala系列之文件以及正则表达式 7.1 读取行 导入scala.io.Source后,即可引用Source中的方法读取文件信息. import scala.io.Source ob ...

  5. 大数据系列修炼-Scala课程96(2)

    大数据系列修炼-Scala课程96(2) 核心内容: 1.Akka第一个案例动手实战代码总结 1.Akka第一个案例动手实战代码总结 Akka案例之WordCount完整代码: import akka ...

  6. 大数据系列修炼-Scala课程101

    大数据系列修炼-Scala课程101 核心内容: 1.使用SBT开发Akka第一个案例源码解析MapActor.ReduceActor.AggregateActor 1.使用SBT开发Akka第一个案 ...

  7. scala 方法重载_Scala中的方法重载

    scala 方法重载 Scala方法重载 (Scala method overloading) Method overloading is a method that is redefined in ...

  8. scala元组 数组_Scala中的数组

    scala元组 数组 Scala中的数组 (Arrays in Scala) An array is a linear data structure with a fixed number of el ...

  9. scala 方法调用_Scala中的方法调用

    scala 方法调用 Scala方法调用 (Scala Method Invocation) Method invocation is the legal and correct technique ...

最新文章

  1. 一个量子物理学家是怎样研究 AI 的? | 8月书讯
  2. AssertionError: Path does not exist: py-faster-rcnn/data/VOCdevkit2007/VOC2007/ImageSets/Main
  3. switch 没有default_「java面试」switch究竟怎么用,不要让它吃灰了!
  4. 基于visual Studio2013解决C语言竞赛题之1091多项式
  5. 的ppt_PPT制作小白,如和做好PPT
  6. 剑指offer面试题36. 二叉搜索树与双向链表(中序遍历)(递归)
  7. Java 相关计数问题及其实现
  8. 类c语言实验报告,内蒙古工业大学 C语言实验报告
  9. 烟雨在线要饭系统v2.0源码
  10. 电子海图制作中坐标转换的应用与实现
  11. UU跑腿前端中台方案
  12. 解决微信浏览器不能自动播放视频
  13. leapftp怎么下载文件,用leapftp怎么下载文件
  14. js html post提交表单,JS动态创建表单post提交
  15. fox邮箱新建的服务器,foxmail新建账户创建其他邮箱的操作步骤
  16. 我的日程安排表(理解代码)学习记录Java
  17. 安知我不知死之乐”的意境
  18. springmvc防xss脚本注入攻击,springmvc过滤html和js标签,html和js标签转义
  19. 【Codeforces 1392F】Omkar and Landslide | 思维、结论
  20. 十二个“一”的特征与剑三中十二门派风格相对应

热门文章

  1. 人类换头术的成功和人造器官制造
  2. linux shell中FS、OFS、RS、ORS图解
  3. 数学分析闭区间套定理_闭区间套定理在数学教学中的一个有趣应用
  4. 回头再说---江湖儿女情
  5. 视觉注意力机制(下)
  6. 区块链技术将成为金融科技的下一个风口
  7. delegate的用法
  8. OData与SAP Annotation
  9. Mysql_function函数介绍
  10. ps—什么是图层组,图层面板中的组是什么