采集文件到kafka
采集指定目录下文本数据到kafka
package com.shenyuchong; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; import java.util.Date; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.regex.Pattern; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; /** 用途:* 用于收集多个文件夹下文件内容到kafka;* 文件一行一行发送;* 支持发送完成后发出通知* 文件发送完成后会将文件添加.COMPLETED后缀* 支持采集指定后缀(多个)* 支持对行进行正则,不匹配的行丢弃* 仅支持对行进行分隔符切分* 支持将切分后的字段按新分隔符重组* * 用法:* mvn package打包成jar包:* file2kafka-2.0.jar* 编写配置文件xxx.conf内容如下:* ip=192.168.1.91* threadnum=20* port=9092* topic=customertopic* path=/home/ftpuser/customer* includesuffix=txt* lineregex=^#\d.*$* delimiter=\s+* noticeurl=http://192.168.1.92:6009/schedule/customer* fieldsquence=id,name,score* 执行:* java -jar file2kafka-2.0.jar xxx.conf* 建议:用linux crontab进行定时执行(对同一个目录进行多次采集不会造成数据重复发送)*/ public class App {public static String fieldSquence = "";public static int fieldNum = 0;public static String ip = "";public static String port = "";public static String path = "";public static String threadNum = "5";public static String topic = "";public static String lineRegex = "^.*$";public static String delimiter = "\\s+";public static String delimiter2 = "|||";public static String includeSuffix = "aSuffix,bSuffix";public static Pattern linePattern =null;public static Properties props =null;public static String noticeUrl;public static void main(String[] args) {/** 配置文件若不存在则抛出异常*/if(args.length<1){try {throw new Exception("无配置文件");} catch (Exception e) {e.printStackTrace();}}try {BufferedReader br = new BufferedReader(new FileReader(new File(args[0])));String line="";while((line=br.readLine())!=null){line = line.replaceAll("\\s+", "");if(line.indexOf("=")!=-1){String[] kv=line.split("=");String k= kv[0];String v= kv[1];if (k.equals("port")) port = v; //kafka 端口if (k.equals("ip")) ip = v; //kafka 主机地址if (k.equals("topic")) topic = v; //kafka 主题if (k.equals("fieldsquence")) fieldSquence = v; //字段序列,逗号隔开if (k.equals("threadnum")) threadNum = v; //采集线程数if (k.equals("path")) path = v; //采集的目录,多目录逗号隔开if (k.equals("lineregex")) lineRegex=v; //行正则,不匹配的行数据丢弃if (k.equals("delimiter")) delimiter=v; //字段分隔符if (k.equals("delimiter2")) delimiter2=v; //重组分隔符(发送到Kafka)if (k.equals("includesuffix")) includeSuffix=v; //包含文件的后缀if (k.equals("noticeurl")) noticeUrl=v; //采集完成通知的接口 }}br.close();} catch (IOException e1) {e1.printStackTrace();}/** kafka配置*/props = new Properties();props.put("bootstrap.servers", ip+":"+port);props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/** 将字段序列按逗号分隔,并获取字段序数目*/fieldNum = fieldSquence.split(",").length;/** 行数据正则Pattern*/linePattern= Pattern.compile(lineRegex);/** 线程池*/ExecutorService es = Executors.newFixedThreadPool(Integer.valueOf(threadNum));/** 根据path目录获取文件* 根据includesuffix选中文件调用send(file)* 每个文件创建一个线程(线程实际总数由threadNum决定)*/for(String path:path.split(",")){File dir=new File(path);File[] files = dir.listFiles();for(final File file:files){for(String suffix:includeSuffix.split(",")){if(file.getAbsolutePath().endsWith(suffix)){es.submit(new Runnable() {@Overridepublic void run() {send(file); }});}}}}/** 关闭线程池*/es.shutdown();/** 线程池停止后通知后续服务直到后续服务接受了请求*/boolean stop=false,noticed=false;try {while(!stop||!noticed){if (es.isTerminated()) { stop=true;} Thread.sleep(2000);if(stop){noticed = connectSuccess(noticeUrl);}}} catch (Exception e) {e.printStackTrace();}}/** 读取文件并发送到kafka,文件内容发送完成后将文件添加.COMPLETED后缀*/public static void send(File file){BufferedReader bf =null;StringBuffer sb = null;try { bf = new BufferedReader(new FileReader(file));String line = null;Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());while((line = bf.readLine())!=null){sb = new StringBuffer();line = line.trim();if(linePattern.matcher(line).matches()){String[] fields = line.split(delimiter);if(fields.length<fieldNum){}else{for(String fieldValue:fields)sb.append(fieldValue).append(delimiter2);sb.append(file.getAbsolutePath());producer.send(new ProducerRecord<String, String>(topic, String.valueOf((new Date()).getTime()), sb.toString()),new Callback() {@Overridepublic void onCompletion(RecordMetadata rm, Exception e) {if(e!=null)System.out.println("send fail"+rm.toString()+",e:"+e.getMessage());}});}}else{}}producer.close();} catch (Exception e) {System.out.println(e.toString());}finally {if(bf!=null)try {bf.close();} catch (Exception e) {e.printStackTrace();}}file.renameTo(new File(file.getAbsolutePath()+".COMPLETED"));}/** 根据地址请求服务,请求成功则返回true*/public static boolean connectSuccess(String path){URL url;try {url = new URL(noticeUrl);HttpURLConnection con = (HttpURLConnection) url.openConnection();if(con.getResponseCode()==200) return true;} catch (Exception e) {return false;}return false;} }
配置文件编写customer2kafka.conf
ip=192.168.1.91 threadnum=20 port=9092 topic=customertopic path=/home/ftpuser/customer includesuffix=txt lineregex=^#\d.*$ delimiter=\s+ noticeurl=http://192.168.1.92:6009/schedule/customer fieldsquence=id,name,score
maven打包执行:
java -jar file2kafka-2.0.jar /opt/app/file2kafka/customer2kafka.conf
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.shenyuchong</groupId>
<artifactId>file2kafka</artifactId>
<version>2.0</version>
<packaging>jar</packaging>
<name>file2hive</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version><!--$NO-MVN-MAN-VER$ -->
</dependency>
</dependencies>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.gbd.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
转载于:https://www.cnblogs.com/shenyuchong/p/11454506.html
采集文件到kafka相关推荐
- 1. 使用 fluent-bit 采集文件
1. 使用 fluent-bit 采集文件 简介 Fluent Bit是一款快速.灵活的日志处理器,旨在收集.解析.过滤日志,并将日志发送到远程数据库,以便执行数据分析. 数据分析通常发生在数据存储和 ...
- Flume使用Spooling Directory Source采集文件夹数据到hdfs
一.需求说明 flume监控linux上一个目录(/home/flume_data)下进入的文件,并写入hdfs的相应目录下(hdfs://master:9000/flume/spool/%Y%m%d ...
- Flume04:【案例】使用Flume采集文件内容上传至HDFS
案例:采集文件内容上传至HDFS 接下来我们来看一个工作中的典型案例: 采集文件内容上传至HDFS 需求:采集目录中已有的文件内容,存储到HDFS 分析:source是要基于目录的,channel建议 ...
- 【采集层】Kafka 与 Flume 如何选择
2019独角兽企业重金招聘Python工程师标准>>> 采集层 主要可以使用Flume, Kafka两种技术. Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过 ...
- Flume采集日志到Kafka经典案例
环境准备: 涉及到的技术有flume,Kafka,zookeeper. 操作步骤: 1.构建agent train.sources=trainSource train.channels=trainCh ...
- Kubernetes日志收集:log-pilot采集日志到kafka
1.log-pilot配置环境变量 需要修改log-pilot环境变量的两个参数 LOGGING_OUTPUT的值配置为kafka 若将日志采集到kafka,则需要新增KAFKA_BROKERS环境变 ...
- Flume采集文件内容上传至HDFS
需求:采集目录中已有的文件内容,存储到HDFS 分析:source是要基于目录的,channel建议使用file,可以保证不丢数据,sink使用hdfs 下面要做的就是配置Agent了,可以把exam ...
- hdfs文件写入kafka集群
1. 场景描述 因新增Kafka集群,需要将hdfs文件写入到新增的Kafka集群中,后来发现文件不多,就直接下载文件到本地,通过Main函数写入了,假如需要部署到服务器上执行,需将文件读取这块稍做修 ...
- php curl 采集文件,curl获取远程文件内容
/** 获取远程文件内容 @param $url 文件http地址 */ function fopen_url($url) { if (function_exists('file_get_conten ...
最新文章
- doc2vec介绍和实践
- 选择排序算法,只需这篇文章就够了
- 使用RMAN备份控制文件(control file)和系统参数文件(spfile)
- 2017级面向对象程序设计——团队作业1
- 聊一聊声明式接口调用与Nacos的结合使用
- 让ASP.NET Core支持GraphQL之-GraphQL的实现原理
- python多线程和异步性能对比_python对比线程,进程,携程,异步,哪个快
- 徐州医科大学计算机报名,2019年徐州医科大学计算机等级考试准考证打印
- 官方中文文档上线了!Python各种教程已汉化。
- PyTorch入门-深度学习回顾和PyTorch简介
- 通过的镜像源安装python包
- java不同项目加token访问_利用JWT实现前后端分离的Token验证
- 100个模具设计常用基本知识,你懂得几个?设计师干货
- 【转】中华吸血鬼分析
- 老男孩Python高级全栈开发工程师【高清全套完整】
- ABB机器人示教器上人机界面的功能
- 信息系统项目干系人管理
- 为佳作喝彩: Google Play 2022 年度中国开发者最佳榜单
- 解决The following packages have unmet dependencies问题!!!
- 怎么切换双显示屏的左右显示器