目录

  • 一、无业务流程
  • 二、求和sum
  • 三、flume+KafkaStreaming

kafka-Streaming即将一个topic的数据经过业务处理后传输到另一个topic中


一、无业务流程

  1. 创建topic
kafka-topics.sh --zookeeper 192.168.184.40:2181 --create --topic mystreamin --partitions 3 --replication-factor 1kafka-topics.sh --zookeeper 192.168.184.40:2181 --create --topic mystreamout --partitions 3 --replication-factor 1cala
  1. kafkaStreaming代码除业务部分格式固定
package kafka_stream;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import scala.collection.immutable.Stream;import java.util.Properties;
import java.util.concurrent.CountDownLatch;/**
* @Author Cai
* @date 2020/12/15
* @Des
*/
public class MyStream {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.40:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());//创建流构造器StreamsBuilder builder = new StreamsBuilder();//构建好builder 将topic的数据写入另一个topic中,无业务流程builder.stream("mystreamin").to("mystreamout");Topology topo = builder.build();final KafkaStreams streams = new KafkaStreams(topo, prop);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("stream"){@Overridepublic void run() {streams.close();latch.countDown();}});streams.start();try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.exit(0);}
}
  1. producer
kafka-console-producer.sh --topic mystreamin --broker-list 127.0.0.1:9092
  1. consumer
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mystreamout --from-beginning
  1. 运行代码

二、求和sum

package kafka_stream;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KTable;import java.util.Properties;
import java.util.concurrent.CountDownLatch;/**
* @Author Cai
* @date 2020/12/15
* @Des
*/
public class sumStream {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"sum");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.40:9092");prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000);prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); //消费位置prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");   //自动提交设置prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());//创建流构造器/*5  5,5*5  5,10*/StreamsBuilder builder = new StreamsBuilder();KTable<String, String> source = builder.stream("sum_in")    //从kafka一条一条取数据.map((k, v) -> {return new KeyValue<>("sum", v.toString());}).groupByKey().reduce((x, y) -> {System.out.println("x:"+x+",y:"+y);Integer sum = Integer.parseInt(x) + Integer.parseInt(y);System.out.println("sum:"+sum);return sum.toString();});source.toStream().to("sum_out");Topology topo = builder.build();final KafkaStreams streams = new KafkaStreams(topo,prop);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("stream"){@Overridepublic void run() {streams.close();latch.countDown();}});streams.start();try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.exit(0);}
}

三、flume+KafkaStreaming

  1. 创建kafka的topic
kafka-topics.sh --zookeeper 192.168.184.40:2181 --create --topic event_attendees_raw --partitions 1 --replication-factor 1kafka-topics.sh --zookeeper 192.168.184.40:2181 --create --topic event_attendees --partitions 1 --replication-factor 1
  1. 编写flume配置
    vi attendees-flume-kafka.conf
attendees.sources=attendeesSource
attendees.channels=attendeesChannel
attendees.sinks=attendeesSinkattendees.sources.attendeesSource.type=spooldir
attendees.sources.attendeesSource.spoolDir=/opt/flume/conf/job/dataSourceFile/attendees
attendees.sources.attendeesSource.deserializer=LINE
attendees.sources.attendeesSource.deserializer.maxLineLength=200000
attendees.sources.attendeesSource.includePattern=event_attendees_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
attendees.sources.attendeesSource.interceptors=head_filter
attendees.sources.attendeesSource.interceptors.head_filter.type=regex_filter
attendees.sources.attendeesSource.interceptors.head_filter.regex=^event*
attendees.sources.attendeesSource.interceptors.head_filter.excludeEvents=trueattendees.channels.attendeesChannel.type=file
attendees.channels.attendeesChannel.checkpointDir=/opt/flume/conf/job/checkpointFile/attendees
attendees.channels.attendeesChannel.dataDirs=/opt/flume/conf/job/dataChannelFile/attendeesattendees.sinks.attendeesSink.type=org.apache.flume.sink.kafka.KafkaSink
attendees.sinks.attendeesSink.batchSize=640
attendees.sinks.attendeesSink.brokerList=192.168.184.40:9092
attendees.sinks.attendeesSink.topic=event_attendees_rawattendees.sources.attendeesSource.channels=attendeesChannel
attendees.sinks.attendeesSink.channel=attendeesChannel
  1. Java代码
package kafka_stream;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;/**
* @Author Cai
* @date 2020/12/18
* @Des
*/
public class EventAttendStream {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"EventAttendStream1");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.40:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());//创建流构造器StreamsBuilder builder = new StreamsBuilder();KStream<Object, Object> ear = builder.stream("event_attendees_raw");KStream<String, String> eventKStream = ear.flatMap((k, v) -> {  //event,yes,maybe,invited,noSystem.out.println(k + " " + v);String[] split = v.toString().split(",");ArrayList<KeyValue<String, String>> list = new ArrayList<>();if (split.length >= 2 && split[1].trim().length() > 0) {String[] yes = split[1].split("\\s+");for (String y : yes) {list.add(new KeyValue<String, String>(null, split[0] + "," + y + ",yes"));}}if (split.length>=3 && split[2].trim().length()>0){String[] maybe = split[2].split("\\s+");for (String mb : maybe) {list.add(new KeyValue<String, String>(null,split[0]+","+mb+",maybe"));}}if (split.length>=4 && split[3].trim().length()>0){String[] invited = split[3].split("\\s+");for (String in : invited) {list.add(new KeyValue<String, String>(null,split[0]+","+in+",invited"));}}if (split.length==5 && split[4].trim().length()>0){String[] no = split[4].split("\\s+");for (String n : no) {list.add(new KeyValue<String, String>(null,split[0]+","+n+",no"));}}return list;});eventKStream.to("event_attendees");Topology topo = builder.build();final KafkaStreams streams = new KafkaStreams(topo, prop);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("stream"){@Overridepublic void run() {streams.close();latch.countDown();}});streams.start();try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.exit(0);}
}
  1. kafka消费
kafka-console-consumer.sh --topic event_attendees --bootstrap-server 192.168.184.40:9092 --from-beginning
  1. 运行java代码

kafkaStreaming相关推荐

  1. spark streaming kafka Couldn't find leader

    问题描述: 使用spark streaming接受kafka数据(使用direct方式)报错 Couldn't find leader offsets for Set([test,0], [test, ...

  2. Spark之SparkStreaming数据源

    SparkStreaming的数据源 文件 Flume Kafka: DStreams输入 Spark Streaming原生支持一些不同的数据源.一些"核心"数据源已经被打包到S ...

  3. Educoder中Spark算子--java版本

    第1关:QueueStream 编程要求 在右侧编辑器补充代码,完成以下需求: 将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss ) 提取数据中的起始URL(切割符为空 ...

  4. Kafka 处理器客户端介绍

    [编者按]本文作者为 Bill Bejeck,主要介绍如何有效利用新的 Apache Kafka 客户端来满足数据处理需求.文章系国内 ITOM 管理平台 OneAPM 编译呈现,以下为正文. 如果你 ...

  5. SparkStreaming之Offset管理、胖包和瘦包提交

    1.Offset管理 Kafka从0.10.x开始Offset偏移量就自从维护在Kafka内部中,看下面代码. 注意,我们使用的是earliest从头开始消费,也就是说如果你的SparkStreami ...

  6. Apache Storm 2.0.0 LowLevel 新版

    Apache Storm 2.0(新版) 作者:jiangzz 电话:15652034180 微信:jiangzz_wx 微信公众账号:jiangzz_wy 百知教育 Storm是什么? Storm是 ...

  7. 阿里大佬倾情力荐:Java全线成长宝典,从P5到P8一应俱全

    前言 对于大部分的程序员来说,清晰地规划自己的职业发展并不是一件容易的事情.作为一个曾经底子比较差,从底层摸爬滚打多年走过来的程序员,在这里分享一下对我帮助很大的一份宝典,希望同行们能快速掌握这些技术 ...

  8. 阿里倾情力荐:Java全线成长宝典,从P5到P8一应俱全

    前言 对于大部分的程序员来说,清晰地规划自己的职业发展并不是一件容易的事情.作为一个曾经底子比较差,从底层摸爬滚打多年走过来的程序员,在这里分享一下对我帮助很大的一份宝典,希望同行们能快速掌握这些技术 ...

  9. 阿里资深架构师倾情力荐:Java 全线成长宝典,P5 到 P8 一应俱全

    前言 对于大部分的程序员来说,清晰地规划自己的职业发展并不是一件容易的事情.作为一个曾经底子比较差,从底层摸爬滚打多年走过来的程序员,在这里分享一下对我帮助很大的一份宝典,希望同行们能快速掌握这些技术 ...

最新文章

  1. strtok()思考
  2. #翻译NO.4# --- Spring Integration Framework
  3. 沃尔沃挖机计算机故障,沃尔沃挖掘机常见故障及原因总结,用户们可以看看
  4. 【记录】idea创建springboot多模块项目
  5. 洛谷——P1161 开灯
  6. 9000多篇投稿,接收率只有15%,今年的AAAI你中了吗?
  7. Python自然语言处理学习笔记(60):7.1 信息抽取
  8. 基于FTP服务器搭建yum源
  9. 浅析StackTrace
  10. Python的学习笔记案例4--52周存钱挑战2.0
  11. 相关系数计算机计算方法,计算相关系数的公式(相关系数的计算方法)
  12. QT笔记——Qt动态属性 之 unpolish() 和 polish()
  13. 【RASA】NLU模块组件分析
  14. HTML制作WORD表格
  15. 修改html2canvas生成图片的dpi
  16. mysql 导入tsv文件_HBase数据迁移(2)- 使用bulk load 工具从TSV文件中导入数据
  17. java 同时返回两个参数,如何在java中返回两个参数
  18. Oracle数据库练习2
  19. 富婆套路深,代码才是真...
  20. sas如何显示行数_SAS中获取数据集观测值个数

热门文章

  1. (4)Reactor 3快速上手——响应式Spring的道法术器
  2. “顶梁柱”落地青川,救助一个人,撑起一个家,换你稳稳的幸福!
  3. PHP JSON使用
  4. Bishop 模式识别与机器学习读书笔记 || 线性分类模型之判别函数的几何建模
  5. python列表增删改查函数_Python之List列表的增删改查
  6. 瑞星杀毒软件、奇虎360杀毒软件、360卫士、百度卫士联手,搞不定弹出广告 恶意广告图标
  7. 购物车模块的功能设计
  8. 计算机毕业设计Java“花园街道”社区医院服务系统(源码+系统+mysql数据库+lw文档)
  9. Android简易音乐重构MVVM Java版 -搭建项目(八)
  10. 淘宝api开放平台SDK调用对接淘宝或天猫