--files path启动加载配置文件

在spark-streaming程序中需要配置文件中的数据来完成某项统计时,需要把配置文件打到工程里,maven的配置如下:

 
  1. <build>

  2. <plugins>

  3. <plugin>

  4. <groupId>org.apache.maven.plugins</groupId>

  5. <artifactId>maven-surefire-plugin</artifactId>

  6. <configuration>

  7. <skip>true</skip>

  8. </configuration>

  9. </plugin>

  10. </plugins>

  11. <resources>

  12. <resource>

  13. <directory>src/main/resources</directory>

  14. <includes>

  15. <include>**/*.txt</include>

  16. <include>*.txt</include>

  17. </includes>

  18. <filtering>true</filtering>

  19. </resource>

  20. </resources>

  21. </build>

这样在local模式下运行时没问题的,但是要放在yarn集群上就会出问题,需要用如下方式来调用:

 
  1. spark-submit --class com.kingsoft.server.KssNodeStreaming

  2. --master yarn-cluster

  3. --driver-memory 2G

  4. --executor-memory 5G

  5. --num-executors 10

  6. --jars /home/hadoop/spark-streaming-flume_2.10-1.0.1.jar,/home/hadoop/avro-ipc-1.7.5-cdh5.1.0.jar,/home/hadoop/flume-ng-sdk-1.5.0.1.jar,/home/hadoop/fastjson-1.1.41.jar

  7. --files /home/hadoop/idc_ip.txt,/home/hadoop/ipdata.txt

  8. /home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar

  9. 0.0.0.0

  10. 58006

Spark中addFile加载配置文件

我们在使用Spark的时候有时候需要将一些数据分发到计算节点中。一种方法是将这些文件上传到HDFS上,然后计算节点从HDFS上获取这些数据。当然我们也可以使用addFile函数来分发这些文件。注意,如果是spark程序通过yarn集群上加载配置文件,path必须是集群hdfs的绝对路径,如:viewfs://58-cluster//home/hdp_lbg_supin/resultdata/zhaopin/recommend/config/redis.properties。

addFile

  addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夹(如果是文件夹,必须是HDFS路径),然后Spark的Driver和Exector可以通过SparkFiles.get()方法来获取文件的绝对路径(Get the absolute path of a file added through SparkContext.addFile()),addFile的函数原型如下:

 
  1. def addFile(path: String): Unit

  2. def addFile(path: String, recursive: Boolean): Unit

addFile把添加的本地文件传送给所有的Worker,这样能够保证在每个Worker上正确访问到文件。另外,Worker会把文件放在临时目录下。因此,比较适合用于文件比较小,计算比较复杂的场景。如果文件比较大,网络传送的消耗时间也会增长。

path:可以是local、hdfs(任何hadoop支持的文件系统)、HTTP、HTTPS、FTP等。local方式时,在windows下使用绝对路径时需要加个“/”,如“d:/iteblog.data”得写成“/d:/iteblog.data”或“file:///d:/iteblog.data”。

   recursive:如果path是一个目录,那么我们可以设置recursive为true,这样Spark会递归地分发这个路径下面的所有文件到计算节点的临时目录。

   通过SparkFiles.get(path:String)获取添加的文件路径。

 
  1. var path = "/user/iteblog/ip.txt"

  2. sc.addFile(path)

  3. val rdd = sc.textFile(SparkFiles.get(path))

上面的实例展示了如何在Driver中获取分发出去的文件,我们还可以在Exector获取到分发的文件:

 
  1. var path = "/user/iteblog/ip.txt"

  2. sc.addFile(path)

  3. val rdd = sc.parallelize((0 to 10))

  4. rdd.foreach{ index =>

  5. val path = SparkFiles.get(path)

  6. ......

  7. }

如果我们添加的是压缩文件,比如.tar.gz.tgz或者.tar,Spark会调用Linux的解压缩命令tar去解压缩这些文件。

addJar

  addJar添加在这个SparkContext实例运行的作业所依赖的jar。,其函数原型如下:

def addJar(path: String)

path:可以是本地文件(local file)、HDFS文件(其他所有的Hadoop支持的文件系统也可以)、HTTP、 HTTPS 或者是FTP URI文件等等。

   其实Spark内部通过spark.jars参数以及spark.yarn.dist.jars函数传进去的Jar都是通过这个函数分发到Task的。

将配置文件打到工程jar包里

注意:IntelliJ IDEA创建Maven项目时,必须是Java项目,否则不能将配置文件打到工程jar包。

redis.properties配置文件示例如下:

 
  1. redis.host=xx.xx.xxx.x

  2. redis.port=6380

  3. redis.password=6f3d16c5119bb946

  4. redis.maxActive=500

  5. redis.maxWait=10000

  6. redis.maxidle=500

  7. redis.minidle=10

  8. redis.maxtotal=500

RedisClient.scala加载配置文件示例如下:

 
  1. package com.bj58.adsp.dp.files.redis

  2. import java.io.{InputStream, BufferedInputStream, FileInputStream}

  3. import java.util.Properties

  4. import org.apache.commons.pool2.impl.GenericObjectPoolConfig

  5. import redis.clients.jedis.JedisPool

  6. import scala.collection.JavaConversions._

  7. /**

  8. * Created by Administrator on 2015/10/23.

  9. */

  10. object RedisClient extends Serializable {

  11. val properties: Properties = new Properties

  12. val in: InputStream = getClass.getResourceAsStream("/redis.properties")

  13. properties.load(new BufferedInputStream(in))

  14. val redisHost = properties.getProperty("redis.host")

  15. val redisPort = properties.getProperty("redis.port")

  16. val redisPwd = properties.getProperty("redis.password")

  17. val redisTimeout = 30000

  18. val config = new GenericObjectPoolConfig()

  19. config.setTestOnBorrow(true)

  20. config.setMaxIdle(properties.getProperty("redis.maxidle").toInt)

  21. config.setMinIdle(properties.getProperty("redis.minidle").toInt)

  22. config.setMaxTotal(properties.getProperty("redis.maxtotal").toInt)

  23. lazy val pool = new JedisPool(config, redisHost, redisPort.toInt, redisTimeout, redisPwd)

  24. lazy val hook = new Thread {

  25. override def run = {

  26. println("Execute hook thread: " + this)

  27. pool.destroy()

  28. }

  29. }

  30. sys.addShutdownHook(hook.run)

  31. }

如果是读取配置文件内容,可以直接:

 
  1. val properties: Properties = new Properties

  2. val in: InputStream = getClass.getResourceAsStream("/redis.properties")

  3. properties.load(new BufferedInputStream(in))

如果是只加载配置文件,可以直接:

 
  1. public static RedisClient init() {

  2. RedisClient client = null;

  3. try {

  4. // 根据配置文件初始化Redis客户端

  5. client = RedisClient.getInstance(getClass.getResource("/redis.properties"));

  6. } catch (Exception e) {

  7. e.printStackTrace();

  8. }

  9. return client;

  10. }

直接将配置文件内容写到程序中

示例如下:

 
  1. lines.foreachRDD(rdd => {

  2. //embedded function

  3. def func(records: Iterator[String]) {

  4. var conn: Connection = null

  5. var stmt: PreparedStatement = null

  6. try {

  7. val url = "jdbc:mysql://xx.xxx.xx.xxx:3307/supindb"

  8. val user = "root"

  9. val password = "root"

  10. conn = DriverManager.getConnection(url, user, password)

  11. records.flatMap(_.split(" ")).foreach(word => {

  12. val sql = "insert into mytable(word) values(?)"

  13. stmt = conn.prepareStatement(sql)

  14. stmt.setString(1, word)

  15. stmt.executeUpdate()

  16. })

  17. } catch {

  18. case e: Exception => e.printStackTrace()

  19. } finally {

  20. if (stmt != null) {

  21. stmt.close()

  22. }

  23. if (conn != null) {

  24. conn.close()

  25. }

  26. }

  27. }

  28. val repartitionedRDD = rdd.repartition(4)

  29. repartitionedRDD.foreachPartition(func)

  30. })

使用addFile("__app__.jar")方式

如果程序中调用其他服务,而其他服务需要加载配置文件,则可以将程序打成jar包,并命名为__app__.jar,使用sc.addFile("__app__.jar")方式即可。

Refer:

http://blog.csdn.net/aaa1117a8w5s6d/article/details/43090017

http://www.iteblog.com/archives/1704

Spark加载外部配置文件相关推荐

  1. @value 静态变量_Spring注解驱动开发之八——@Value属性赋值、@PropertySource 加载外部配置文件...

    本文包含以下内容: 建立新的配置类 建立新的测试方法 通过@Value 进行赋值 通过@PropertySource  加载配置文件,并进行注入 拓展@Value  .@PropertySource ...

  2. 【Spark】SparkStreaming-加载外部配置文件

    SparkStreaming-加载外部配置文件 spark加载配置文件_百度搜索Spark加载外部配置文件 - CSDN博客spark读取配置文件中的配置 - CSDN博客spark加载propert ...

  3. springboot加载外部xml_SpringBoot读取外部配置文件的方法

    1.SpringBoot配置文件 SpringBoot使用一个以application命名的配置文件作为默认的全局配置文件.支持properties后缀结尾的配置文件或者以yml/yaml后缀结尾的Y ...

  4. Spark动态加载外部资源文件

    Spark动态加载外部资源文件 1.spark-submit --files 动态加载外部资源文件 之前做一个关于Spark的项目时,因项目中需要读取某个静态资源文件,然后在本地IDEA测试一切皆正常 ...

  5. 账户的配置使您无法使用该计算机,2个方法解决“user profile service服务未能登录无法加载用户配置文件”...

    win8/10系统:启动电脑显示windows标志时,长按电源键强行关机,重复此操作三次,系统将会进入"自动修复".然后在"自动修复"界面中,选择"高 ...

  6. java配置文件实现方式_java相关:详解Spring加载Properties配置文件的四种方式

    java相关:详解Spring加载Properties配置文件的四种方式 发布于 2020-4-29| 复制链接 摘记: 一.通过 context:property-placeholder 标签实现配 ...

  7. 使用jQuery和YQL,以Ajax方式加载外部内容

    我们来看看怎样使用jQuery,以Ajax方式加载外部(其他域上)的内容.这里的所有代码都可以从GitHub下载,也可以在这个演示页面中获取,因而不用复制粘贴了. OK,Ajax通过jQuery是很容 ...

  8. Spring中加载xml配置文件的六种方式

    Spring中加载xml配置文件的六种方式 博客分类: Spring&EJB XMLSpringWebBeanBlog  因为目前正在从事一个项目,项目中一个需求就是所有的功能都是插件的形式装 ...

  9. ajax如何请求json文件,简单的ajax请求加载外部json文件

    我在学习ajax ....我试图从json文件发出一个基本请求,它与我的index.html位于同一个文件夹中,但由于某种原因它说未定义:(我可以看到错误是可变的人,但我不能赶上为什么它未定义.... ...

最新文章

  1. qsort七种排序方法
  2. Spring Security 实战干货:自定义配置类入口 WebSecurityConfigurerAdapter
  3. 分享产品发布的10条经验
  4. JavaScript的预编译及执行顺序
  5. 简单易懂的自动驾驶科普知识
  6. oracle数据库启动
  7. 我的程序都是这样命名的:openeim001
  8. 中移4G模块-ML302-OpenCpu开发-前端网页搭建
  9. Windows Server 2008更新后不断重启的问题
  10. Camtasia实用技巧之智能聚焦
  11. Word2007发布文章成功
  12. 微信电脑多开,骚操作走起
  13. 联想服务器修改mac,修改Thinkpad E420的无线网卡MAC地址?
  14. 用尽量简单地话,一次讲明白傅里叶级数(FS)、傅里叶变换(FT)、离散时间傅里叶变换(DTFT)、离散傅里叶级数(DFS)、离散傅里叶变换(DFT)以及它们之间的联系和区别。
  15. Android 控件右上角角标的实现方案
  16. 如何用Scrum做变革管理的落地实施
  17. PHP环境搭建(非集成)
  18. ybt1287 最低通行费
  19. 360卫士 是 木马?
  20. 虚拟DOM中的key

热门文章

  1. php中的rtrim_php中ltrim()、rtrim()与trim()删除字符空格实例
  2. ssh连接出现:WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!
  3. 聚合链路出现Destination host unreachable
  4. java怎样调用DLL方法
  5. python命令行进入帮助模式_python命令行模式直接查看帮助
  6. stm32呼吸灯程序_学习STM32从点灯开始!
  7. python自带gui_Python GUI开发工具中五种类型的相关介绍
  8. php 不等于 的函数吗,PHP不常见的函数整理
  9. .net ajax批量删除,asp.net 全部选中与取消操作,选中后的删除(ajax)实现无刷新效果...
  10. matlab非线性回归delta,讲解:Delta-sigma、Matlab、analog-to-digital、MatlabPython|SQ