采集指定目录下文本数据到kafka

package com.shenyuchong;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
/** 用途:*         用于收集多个文件夹下文件内容到kafka;*         文件一行一行发送;*         支持发送完成后发出通知*         文件发送完成后会将文件添加.COMPLETED后缀*         支持采集指定后缀(多个)*         支持对行进行正则,不匹配的行丢弃*         仅支持对行进行分隔符切分*         支持将切分后的字段按新分隔符重组* * 用法:*         mvn package打包成jar包:*             file2kafka-2.0.jar*         编写配置文件xxx.conf内容如下:*             ip=192.168.1.91*             threadnum=20*             port=9092*             topic=customertopic*             path=/home/ftpuser/customer*             includesuffix=txt*             lineregex=^#\d.*$*             delimiter=\s+*             noticeurl=http://192.168.1.92:6009/schedule/customer*             fieldsquence=id,name,score*         执行:*             java -jar file2kafka-2.0.jar xxx.conf*         建议:用linux crontab进行定时执行(对同一个目录进行多次采集不会造成数据重复发送)*/
public class App {public static String fieldSquence = "";public static int    fieldNum = 0;public static String ip = "";public static String port = "";public static String path = "";public static String threadNum = "5";public static String topic = "";public static String lineRegex = "^.*$";public static String delimiter = "\\s+";public static String delimiter2 = "|||";public static String includeSuffix = "aSuffix,bSuffix";public static Pattern linePattern =null;public static Properties props =null;public static String noticeUrl;public static void main(String[] args) {/** 配置文件若不存在则抛出异常*/if(args.length<1){try {throw new Exception("无配置文件");} catch (Exception e) {e.printStackTrace();}}try {BufferedReader br = new BufferedReader(new FileReader(new File(args[0])));String line="";while((line=br.readLine())!=null){line = line.replaceAll("\\s+", "");if(line.indexOf("=")!=-1){String[] kv=line.split("=");String k= kv[0];String v= kv[1];if (k.equals("port"))          port = v;            //kafka 端口if (k.equals("ip"))            ip = v;              //kafka 主机地址if (k.equals("topic"))         topic = v;           //kafka 主题if (k.equals("fieldsquence"))  fieldSquence = v;    //字段序列,逗号隔开if (k.equals("threadnum"))     threadNum = v;       //采集线程数if (k.equals("path"))          path = v;            //采集的目录,多目录逗号隔开if (k.equals("lineregex"))     lineRegex=v;         //行正则,不匹配的行数据丢弃if (k.equals("delimiter"))     delimiter=v;         //字段分隔符if (k.equals("delimiter2"))    delimiter2=v;        //重组分隔符(发送到Kafka)if (k.equals("includesuffix")) includeSuffix=v;     //包含文件的后缀if (k.equals("noticeurl"))     noticeUrl=v;         //采集完成通知的接口
}}br.close();} catch (IOException e1) {e1.printStackTrace();}/** kafka配置*/props = new Properties();props.put("bootstrap.servers", ip+":"+port);props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/** 将字段序列按逗号分隔,并获取字段序数目*/fieldNum = fieldSquence.split(",").length;/** 行数据正则Pattern*/linePattern= Pattern.compile(lineRegex);/** 线程池*/ExecutorService es = Executors.newFixedThreadPool(Integer.valueOf(threadNum));/** 根据path目录获取文件* 根据includesuffix选中文件调用send(file)* 每个文件创建一个线程(线程实际总数由threadNum决定)*/for(String path:path.split(",")){File dir=new File(path);File[] files = dir.listFiles();for(final File file:files){for(String suffix:includeSuffix.split(",")){if(file.getAbsolutePath().endsWith(suffix)){es.submit(new Runnable() {@Overridepublic void run() {send(file);                            }});}}}}/** 关闭线程池*/es.shutdown();/** 线程池停止后通知后续服务直到后续服务接受了请求*/boolean stop=false,noticed=false;try {while(!stop||!noticed){if (es.isTerminated()) { stop=true;} Thread.sleep(2000);if(stop){noticed = connectSuccess(noticeUrl);}}} catch (Exception e) {e.printStackTrace();}}/** 读取文件并发送到kafka,文件内容发送完成后将文件添加.COMPLETED后缀*/public static void send(File file){BufferedReader bf =null;StringBuffer sb = null;try {            bf = new BufferedReader(new FileReader(file));String line = null;Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());while((line = bf.readLine())!=null){sb = new StringBuffer();line = line.trim();if(linePattern.matcher(line).matches()){String[] fields = line.split(delimiter);if(fields.length<fieldNum){}else{for(String fieldValue:fields)sb.append(fieldValue).append(delimiter2);sb.append(file.getAbsolutePath());producer.send(new ProducerRecord<String, String>(topic, String.valueOf((new Date()).getTime()), sb.toString()),new Callback() {@Overridepublic void onCompletion(RecordMetadata rm, Exception e) {if(e!=null)System.out.println("send fail"+rm.toString()+",e:"+e.getMessage());}});}}else{}}producer.close();} catch (Exception e) {System.out.println(e.toString());}finally {if(bf!=null)try {bf.close();} catch (Exception e) {e.printStackTrace();}}file.renameTo(new File(file.getAbsolutePath()+".COMPLETED"));}/** 根据地址请求服务,请求成功则返回true*/public static boolean connectSuccess(String path){URL url;try {url = new URL(noticeUrl);HttpURLConnection con = (HttpURLConnection) url.openConnection();if(con.getResponseCode()==200) return true;} catch (Exception e) {return false;}return false;}
}

  配置文件编写customer2kafka.conf

ip=192.168.1.91
threadnum=20
port=9092
topic=customertopic
path=/home/ftpuser/customer
includesuffix=txt
lineregex=^#\d.*$
delimiter=\s+
noticeurl=http://192.168.1.92:6009/schedule/customer
fieldsquence=id,name,score

  maven打包执行:

java -jar file2kafka-2.0.jar /opt/app/file2kafka/customer2kafka.conf

  pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.shenyuchong</groupId>
<artifactId>file2kafka</artifactId>
<version>2.0</version>
<packaging>jar</packaging>

<name>file2hive</name>
<url>http://maven.apache.org</url>

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version><!--$NO-MVN-MAN-VER$ -->
  </dependency>
</dependencies>
<build>
  <sourceDirectory>src</sourceDirectory>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>com.gbd.App</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
        <id>make-assembly</id>
        <phase>package</phase>
        <goals>
          <goal>assembly</goal>
        </goals>
      </execution>
    </executions>
</plugin>

</plugins>
</build>
</project>

转载于:https://www.cnblogs.com/shenyuchong/p/11454506.html

采集文件到kafka相关推荐

  1. 1. 使用 fluent-bit 采集文件

    1. 使用 fluent-bit 采集文件 简介 Fluent Bit是一款快速.灵活的日志处理器,旨在收集.解析.过滤日志,并将日志发送到远程数据库,以便执行数据分析. 数据分析通常发生在数据存储和 ...

  2. Flume使用Spooling Directory Source采集文件夹数据到hdfs

    一.需求说明 flume监控linux上一个目录(/home/flume_data)下进入的文件,并写入hdfs的相应目录下(hdfs://master:9000/flume/spool/%Y%m%d ...

  3. Flume04:【案例】使用Flume采集文件内容上传至HDFS

    案例:采集文件内容上传至HDFS 接下来我们来看一个工作中的典型案例: 采集文件内容上传至HDFS 需求:采集目录中已有的文件内容,存储到HDFS 分析:source是要基于目录的,channel建议 ...

  4. 【采集层】Kafka 与 Flume 如何选择

    2019独角兽企业重金招聘Python工程师标准>>> 采集层 主要可以使用Flume, Kafka两种技术. Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过 ...

  5. Flume采集日志到Kafka经典案例

    环境准备: 涉及到的技术有flume,Kafka,zookeeper. 操作步骤: 1.构建agent train.sources=trainSource train.channels=trainCh ...

  6. Kubernetes日志收集:log-pilot采集日志到kafka

    1.log-pilot配置环境变量 需要修改log-pilot环境变量的两个参数 LOGGING_OUTPUT的值配置为kafka 若将日志采集到kafka,则需要新增KAFKA_BROKERS环境变 ...

  7. Flume采集文件内容上传至HDFS

    需求:采集目录中已有的文件内容,存储到HDFS 分析:source是要基于目录的,channel建议使用file,可以保证不丢数据,sink使用hdfs 下面要做的就是配置Agent了,可以把exam ...

  8. hdfs文件写入kafka集群

    1. 场景描述 因新增Kafka集群,需要将hdfs文件写入到新增的Kafka集群中,后来发现文件不多,就直接下载文件到本地,通过Main函数写入了,假如需要部署到服务器上执行,需将文件读取这块稍做修 ...

  9. php curl 采集文件,curl获取远程文件内容

    /** 获取远程文件内容 @param $url 文件http地址 */ function fopen_url($url) { if (function_exists('file_get_conten ...

最新文章

  1. doc2vec介绍和实践
  2. 选择排序算法,只需这篇文章就够了
  3. 使用RMAN备份控制文件(control file)和系统参数文件(spfile)
  4. 2017级面向对象程序设计——团队作业1
  5. 聊一聊声明式接口调用与Nacos的结合使用
  6. 让ASP.NET Core支持GraphQL之-GraphQL的实现原理
  7. python多线程和异步性能对比_python对比线程,进程,携程,异步,哪个快
  8. 徐州医科大学计算机报名,2019年徐州医科大学计算机等级考试准考证打印
  9. 官方中文文档上线了!Python各种教程已汉化。
  10. PyTorch入门-深度学习回顾和PyTorch简介
  11. 通过的镜像源安装python包
  12. java不同项目加token访问_利用JWT实现前后端分离的Token验证
  13. 100个模具设计常用基本知识,你懂得几个?设计师干货
  14. 【转】中华吸血鬼分析
  15. 老男孩Python高级全栈开发工程师【高清全套完整】
  16. ABB机器人示教器上人机界面的功能
  17. 信息系统项目干系人管理
  18. 为佳作喝彩: Google Play 2022 年度中国开发者最佳榜单
  19. 解决The following packages have unmet dependencies问题!!!
  20. 怎么切换双显示屏的左右显示器

热门文章

  1. 调整分区后盘符丢失的资料怎么寻回
  2. linux下的连接文件——软连接和硬连接的区别
  3. maven上传本地仓库
  4. 博为峰Java技术题 ——JavaSE 类加载器Ⅰ
  5. 程序局部性原理的一些思考
  6. 大厂php怎么做前端,大厂前端经典面试问题精选(附答案)
  7. Ubuntu解决gedit warning问题的方法
  8. Ubuntu配置远程访问的xrdp协议和teamviewer软件
  9. Leetcode 257. 二叉树的所有路径 解题思路及C++实现
  10. 1.6 字符串的比较