2019独角兽企业重金招聘Python工程师标准>>>

版本号:

maven3.5.0     scala IDE for Eclipse:版本(4.6.1)    spark-2.1.1-bin-hadoop2.7    kafka_2.11-0.8.2.1   JDK1.8

基础环境:

Maven3.5.0安装与配置+Eclipse应用

Maven下载项目依赖jar包和使用方法

maven中把依赖的JAR包一起打包

MAVEN Scope使用

一、指定JDK为1.8

在pom.xml配置文件中添加以下参数即可:

 
  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <encoding>UTF-8</encoding>
  4. <java.version>1.8</java.version>
  5. <maven.compiler.source>1.8</maven.compiler.source>
  6. <maven.compiler.target>1.8</maven.compiler.target>
  7. </properties>
 
  1. <plugin>
  2. <groupId>org.apache.maven.plugins</groupId>
  3. <artifactId>maven-compiler-plugin</artifactId>
  4. <configuration>
  5. <source>1.8</source>
  6. <target>1.8</target>
  7. </configuration>
  8. </plugin>

配置之后的pom.xml文件如下:

 
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>Test</groupId>
  5. <artifactId>test</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <packaging>jar</packaging>
  8. <name>test</name>
  9. <url>http://maven.apache.org</url>
  10. <properties>
  11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12. <encoding>UTF-8</encoding>
  13. <!-- 配置JDK为1.8 -->
  14. <java.version>1.8</java.version>
  15. <maven.compiler.source>1.8</maven.compiler.source>
  16. <maven.compiler.target>1.8</maven.compiler.target>
  17. </properties>
  18. <dependencies>
  19. <dependency>
  20. <groupId>junit</groupId>
  21. <artifactId>junit</artifactId>
  22. <version>3.8.1</version>
  23. <scope>test</scope>
  24. </dependency>
  25. </dependencies>
  26. <build>
  27. <plugins>
  28. <!-- 配置JDK为1.8 -->
  29. <plugin>
  30. <groupId>org.apache.maven.plugins</groupId>
  31. <artifactId>maven-compiler-plugin</artifactId>
  32. <configuration>
  33. <source>1.8</source>
  34. <target>1.8</target>
  35. </configuration>
  36. </plugin>
  37. <!-- 配置打包依赖包maven-assembly-plugin -->
  38. <plugin>
  39. <artifactId> maven-assembly-plugin </artifactId>
  40. <configuration>
  41. <descriptorRefs>
  42. <descriptorRef>jar-with-dependencies</descriptorRef>
  43. </descriptorRefs>
  44. <archive>
  45. <manifest>
  46. <mainClass></mainClass>
  47. </manifest>
  48. </archive>
  49. </configuration>
  50. <executions>
  51. <execution>
  52. <id>make-assembly</id>
  53. <phase>package</phase>
  54. <goals>
  55. <goal>assembly</goal>
  56. </goals>
  57. </execution>
  58. </executions>
  59. </plugin>
  60. </plugins>
  61. </build>
  62. </project>

二、配置Spark依赖包

查看spark-2.1.1-bin-hadoop2.7/jars目录下的jar包版本

到maven远程仓库http://mvnrepository.com中搜索对应jar包即可。

1、配置spark-core_2.11-2.1.1.jar

往pom.xml文件中添加以下配置:

 
  1. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.11</artifactId>
  5. <version>2.1.1</version>
  6. <scope>runtime</scope>
  7. </dependency>

为了后面打包时把依赖包也一起打包,需要把<scope>provided</scope>配置成<scope>runtime</scope>。

2、配置spark-streaming_2.11-2.1.1.jar

往pom.xml文件中添加以下配置:

 
  1. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-streaming_2.11</artifactId>
  5. <version>2.1.1</version>
  6. <scope>runtime</scope>
  7. </dependency>

为了后面打包时把依赖包也一起打包,需要把<scope>provided</scope>配置成<scope>runtime</scope>。

三、配置Spark+Kafka

1、配置spark-streaming-kafka-0-8_2.11-2.1.1.jar

往pom.xml文件中添加以下配置:

 
  1. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  5. <version>2.1.1</version>
  6. </dependency>

四、pom.xml完整配置内容

 
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>Test</groupId>
  5. <artifactId>test</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <packaging>jar</packaging>
  8. <name>test</name>
  9. <url>http://maven.apache.org</url>
  10. <properties>
  11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12. <encoding>UTF-8</encoding>
  13. <!-- 配置JDK为1.8 -->
  14. <java.version>1.8</java.version>
  15. <maven.compiler.source>1.8</maven.compiler.source>
  16. <maven.compiler.target>1.8</maven.compiler.target>
  17. </properties>
  18. <dependencies>
  19. <dependency>
  20. <groupId>junit</groupId>
  21. <artifactId>junit</artifactId>
  22. <version>3.8.1</version>
  23. <scope>test</scope>
  24. </dependency>
  25. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  26. <dependency>
  27. <groupId>org.apache.spark</groupId>
  28. <artifactId>spark-core_2.11</artifactId>
  29. <version>2.1.1</version>
  30. <scope>runtime</scope>
  31. </dependency>
  32. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
  33. <dependency>
  34. <groupId>org.apache.spark</groupId>
  35. <artifactId>spark-streaming_2.11</artifactId>
  36. <version>2.1.1</version>
  37. <scope>runtime</scope>
  38. </dependency>
  39. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
  40. <dependency>
  41. <groupId>org.apache.spark</groupId>
  42. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  43. <version>2.1.1</version>
  44. </dependency>
  45. </dependencies>
  46. <build>
  47. <plugins>
  48. <!-- 配置JDK为1.8 -->
  49. <plugin>
  50. <groupId>org.apache.maven.plugins</groupId>
  51. <artifactId>maven-compiler-plugin</artifactId>
  52. <configuration>
  53. <source>1.8</source>
  54. <target>1.8</target>
  55. </configuration>
  56. </plugin>
  57. <!-- 配置打包依赖包maven-assembly-plugin -->
  58. <plugin>
  59. <artifactId> maven-assembly-plugin </artifactId>
  60. <configuration>
  61. <descriptorRefs>
  62. <descriptorRef>jar-with-dependencies</descriptorRef>
  63. </descriptorRefs>
  64. <archive>
  65. <manifest>
  66. <mainClass></mainClass>
  67. </manifest>
  68. </archive>
  69. </configuration>
  70. <executions>
  71. <execution>
  72. <id>make-assembly</id>
  73. <phase>package</phase>
  74. <goals>
  75. <goal>assembly</goal>
  76. </goals>
  77. </execution>
  78. </executions>
  79. </plugin>
  80. </plugins>
  81. </build>
  82. </project>

五、本地开发spark代码上传spark集群服务并运行

JavaDirectKafkaCompare.java

 
  1. package com.spark.main;
  2. import java.util.HashMap;
  3. import java.util.HashSet;
  4. import java.util.Arrays;
  5. import java.util.Iterator;
  6. import java.util.Map;
  7. import java.util.Set;
  8. import java.util.regex.Pattern;
  9. import scala.Tuple2;
  10. import kafka.serializer.StringDecoder;
  11. import org.apache.spark.SparkConf;
  12. import org.apache.spark.api.java.function.*;
  13. import org.apache.spark.streaming.api.java.*;
  14. import org.apache.spark.streaming.kafka.KafkaUtils;
  15. import org.apache.spark.streaming.Durations;
  16. public class JavaDirectKafkaCompare {
  17. public static void main(String[] args) throws Exception {
  18. /**
  19. * setMaster("local[2]"),至少要指定两个线程,一条用于用于接收消息,一条线程用于处理消息
  20. *  Durations.seconds(2)每两秒读取一次kafka
  21. */
  22. SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
  23. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  24. /**
  25. * checkpoint("hdfs://192.168.168.200:9000/checkpoint")防止数据丢包
  26. */
  27. jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint");
  28. /**
  29. * 配置连接kafka的相关参数
  30. */
  31. Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
  32. Map<String, String> kafkaParams = new HashMap<>();
  33. kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");
  34. // Create direct kafka stream with brokers and topics
  35. JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
  36. jssc,
  37. String.class,
  38. String.class,
  39. StringDecoder.class,
  40. StringDecoder.class,
  41. kafkaParams,
  42. topicsSet
  43. );
  44. // Get the lines, split them into words, count the words and print
  45. /**
  46. * _2()获取第二个对象的值
  47. */
  48. JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  49. @Override
  50. public String call(Tuple2<String, String> tuple2) {
  51. return tuple2._2();
  52. }
  53. });
  54. String sfzh = "432922196105276721";
  55. JavaDStream<String> wordCounts = lines.filter(new Function<String, Boolean>(){
  56. @Override
  57. public Boolean call(String s) throws Exception {
  58. // TODO Auto-generated method stub
  59. /**
  60. * 通过身份证号筛选出相关数据
  61. */
  62. if(s.contains(sfzh)){
  63. System.out.println("比对出来的结果:" + s);
  64. return true;
  65. }
  66. return false;
  67. }
  68. });
  69. wordCounts.print();
  70. // Start the computation
  71. jssc.start();
  72. jssc.awaitTermination();
  73. }
  74. }

右键Run As ------>Maven install,运行成功之后,会在target目录生成一个test-0.0.1-SNAPSHOT-jar-with-dependencies.jar,把该jar包复制到LInux集群环境下的SPARK_HOME/myApp目录下:

执行命令:

 
  1. cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7;
  2. bin/spark-submit --class "com.spark.main.JavaDirectKafkaCompare" --master local[4] myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar;

六、附上离线Maven仓库

下载地址:  链接:http://pan.baidu.com/s/1eS7Ywme 密码:y3qz

转载于:https://my.oschina.net/u/3346994/blog/1596851

Maven+Eclipse+SparkStreaming+Kafka整合相关推荐

  1. SparkStreaming+kafka+flume+hbase日志实时流处理项目

    1.项目背景: 互联网访问日志概述 为什么要记录访问日志的行为呢? 通过日志我们可以得到网站页面的访问量,网站的黏性,推荐用户行为分析,是指在获得网站访问量基本数据的情况下,对有关数据进行统计.分析, ...

  2. spark第十篇:Spark与Kafka整合

    spark与kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8和spark-str ...

  3. 【ssh从零单排】eclipse和tomcat整合配置

    (转载请注明出处:http://blog.csdn.net/buptgshengod) 1.Tomcat配置       Tomcat是现在比较流行的一种web局域网调试用服务器(我的理解可能不准确) ...

  4. sparkStreaming+kafka SparkException: java.nio.channels.ClosedChannelException异常报错

    在运行sparkStreaming+kafka的时候报错 java io报错, 如果broker-list的端口不对或者kafka服务端未启动,会遇到以下错误: Exception in thread ...

  5. 【Kafka】测试Kafka整合Flume

    本文简单测试Kafka整合Flume,从而实现"日志 -> Flume -> Kafka". 操作环境: Kafka版本:1.0.1 Flume版本:1.6.0 测试前 ...

  6. 大数据———Flume与Kafka整合

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Flume 1.8.0 http://flume.apache.org/download.html Kafka 2.11 http: ...

  7. maven环境下SSH整合

    目录 Maven环境下SSH整合 1 目录结构: 1 1. 导入jar包 2 1.1搭建maven环境 2 1.2配置pom.xml文件 2 1.2.1 pom.xml 2 2. 搭建struts2环 ...

  8. 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...

  9. 超详细!一文详解 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 出品 | CSDN(ID:CSDNnews) 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲 ...

最新文章

  1. html5圆形图片轮播,jQuery超酷响应式圆形图片轮播图特效
  2. python括号匹配算法_使用Python的栈实现括号匹配算法
  3. iOS 快速定位到系统设置界面
  4. 第6章-一阶多智体系统一致性-->6.5 带有领航者系统一致性
  5. java 数据结构 快速入门_Java 数据结构快速入门
  6. PHP大批量正则,php – 正则表达式匹配无限数量的选项
  7. linux server文件,linux两台server远程copy文件
  8. python输入流和输出流_python读写gbk、utf-8等输入输出流
  9. android 快速布局,快速实现android的协同布局CoordinatorLayout
  10. Linux基础命令---间歇执行命令watch
  11. 求教一个关于网站抓取生成地图的问题
  12. 编译OpenJDK8:configure error /usr/lib64/ccache/gcc is a symbolic link to ccache
  13. php刷屏代码,PHP防止刷屏
  14. 文字转语音软件哪个好,这一款值得推荐
  15. 如何将电子签名透明化处理
  16. 八字易经算法之用JAVA实现完整排盘系统
  17. SQL查询以某个字母开头
  18. mysql procedure 存储过程
  19. “二舅”火了,自媒体短视频“爆火”的基本要素,你知道吗?
  20. 【无标题】win7系统支持node14以上的版本

热门文章

  1. jmeter+接口测试练习+接口关联+Json提取
  2. 使用Docker+Grafana+InfluxDB可视化展示Jenkins构建信息
  3. 从事测试的第6年 , 开工第二天五千字总结..我不平凡的2021
  4. 软件测试必学之python+unittest+requests+HTMLRunner编写接口自动化测试集
  5. 软件测试人不得不读的经典书籍推荐
  6. java流有什么用_在Java中,流比循环有什么优势?
  7. idea findbugs使用_IDEA如何协同开发统一代码风格?编码不规范如何解决?
  8. realtime multi-person 2D pose estimation using part affinity fields
  9. 计算机终止程序按钮,怎样在VisualBasic中终止计算机系统呢?
  10. java. tcp. 权限,java - tcpdump的不能够写PCAP文件。没有权限 - SO中文参考 - www.soinside.com...