kafka准备工作:

开启zookeeper服务和kafka服务

zkServer.sh startkafka-server-start.sh /opt/soft/kafka211/config/server.properties

创建副本为1,分区为4的topic:mydemo1

kafka-topics.sh --zookeeper 192.168.181.132:2181 --create --topic mydemo1 --replication-factor 1 --partitions 4

JAVA开多线程写入数据:

接口:

package org.example.commons;import java.util.Map;public interface Dbinfo {public String getIp();public int getPort();public String getDbName();public Map<String,String>getOther();}

kafka配置类:

package org.example.commons;import java.util.Map;public class KafkaConfiguration implements Dbinfo
{private String ip;private int port;private String Dbname;public void setIp(String ip) {this.ip = ip;}public void setPort(int port) {this.port = port;}public String getDbname() {return Dbname;}public void setDbname(String dbname) {this.Dbname = dbname;}@Overridepublic String getIp() {return this.ip;}@Overridepublic int getPort() {return this.port;}@Overridepublic String getDbName() {return this.Dbname;}@Overridepublic Map<String, String> getOther() {return null;}
}

kafkaConnector类:

package org.example.commons;import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.StringSerializer;import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.*;
import java.util.concurrent.ExecutionException;public class KafkaConnector {private Dbinfo info;private int totalRow;private List<Long> rowSize = new ArrayList<>();Properties prop = new Properties();public KafkaConnector(Dbinfo info) {this.info = info;prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,info.getIp()+":"+info.getPort());prop.put(ProducerConfig.ACKS_CONFIG,"all");prop.put(ProducerConfig.RETRIES_CONFIG,"0");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getTypeName());}public void sendMessage(String path) throws FileNotFoundException {getFileinfo(path);int psize = getTopicPartitionNumber();Map<Long, Integer> threadParams = calcPosAndRow(psize);int thn = 0;for (Long key : threadParams.keySet()) {new CustomerKafkaProducer(prop,path,key,threadParams.get(key),info.getDbName(),thn+"").start();thn++;}}private Map<Long,Integer> calcPosAndRow(int partitionnum){Map<Long,Integer> result = new HashMap<>();int row = totalRow/partitionnum;for (int i = 0; i < partitionnum; i++) {if (i==(partitionnum-1)) {result.put(getPos(row*i+1),row+totalRow%partitionnum);}else{result.put(getPos(row*i+1),row);}}return result;}private Long getPos(int row){return rowSize.get(row-1)+(row-1);}private int getTopicPartitionNumber(){AdminClient client = KafkaAdminClient.create(prop);DescribeTopicsResult result = client.describeTopics(Arrays.asList(info.getDbName()));KafkaFuture<Map<String, TopicDescription>> kf = result.all();int num = 0;try {num=kf.get().get(info.getDbName()).partitions().size();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return num;}private void getFileinfo(String path) throws FileNotFoundException {LineNumberReader reader = new LineNumberReader(new FileReader(path));try {String str = null;int total = 0;while((str=reader.readLine())!=null){total+=str.getBytes().length+1;rowSize.add((long)total);}totalRow= reader.getLineNumber();rowSize.add(0,0L);} catch (IOException e) {e.printStackTrace();}finally {try {reader.close();} catch (IOException e) {e.printStackTrace();}}}
}

simplePartition类:

package org.example.commons;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class SimplePartitioner implements Partitioner
{@Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {return Integer.parseInt(o.toString());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

kafkaProducer类:

package org.example.commons;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Properties;public class CustomerKafkaProducer extends Thread  {private Properties prop;private String path;private long pos;private int rows;private String topics;public CustomerKafkaProducer(Properties prop, String path, long pos, int rows, String topics,String threadName) {this.prop = prop;this.path = path;this.pos = pos;this.rows = rows;this.topics = topics;this.setName(threadName);}@Overridepublic void run() {prop.put("partitioner.class",SimplePartitioner.class.getTypeName());KafkaProducer producer = new KafkaProducer(prop);try {RandomAccessFile raf = new RandomAccessFile(new File(path), "r");raf.seek(pos);for (int i = 0; i < rows; i++) {String ln= new String(raf.readLine().getBytes("iso-8859-1"),"UTF-8");ProducerRecord pr = new ProducerRecord(topics, Thread.currentThread().getName(),ln);producer.send(pr);}producer.close();raf.close();} catch (IOException e) {}}
}

App运行类:

package org.example.commons;import java.io.FileNotFoundException;public class App {public static void main(String[] args) throws FileNotFoundException {KafkaConfiguration kc = new KafkaConfiguration();kc.setIp("192.168.181.132");kc.setPort(9092);kc.setDbname("mydemo1");new KafkaConnector(kc).sendMessage("D:\\log_2020-01-01.log");}
}

运行App类,然后去kafka查看topic中是否写入数据

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.181.132:9092 --topic mydemo1 --time -1

本地D盘中的log日志一共一万条数据,被四条线程写入了四个分区中:

如果需要把以上的代码打成jar包执行:

写一个Myexecute类:

public class Myexecute {public static void main(String[] args) throws Exception {if (args.length!=0) {KafkaConfiguration kc = new KafkaConfiguration();kc.setIp(args[0]);kc.setPort(Integer.parseInt(args[1]));kc.setDbname(args[2]);new KafkaConnector(kc).sendkafka(args[3]);}else{throw new Exception("The arguments need 4 but"+args.length);}}
}

pom文件中修改builder内容,可以同时打胖包和瘦包:

  <build><finalName>mykafka</finalName><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>org.njbaqn.common.Myexecute(写自己的主类名称)</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

maven–>Lifecycle–>clean–>package

把生成的jar包复制到D盘下,运行cmd

java -jar mykafka-jar-with-dependencies.jar 192.168.181.132 9092 mydemo1 D:\log_2020-01-01.log

执行之后可以用kafka命令实时监控写入的信息:

kafka-console-consumer.sh --bootstrap-server 192.168.181.132.9092 --topic mydemo1 --from-beginning

JAVA多线程向kafka的topic各分区中写入本地数据相关推荐

  1. java cassandra连接池_Cassandra Java驱动程序的最佳设置只能写入本地数据中心

    我最近开始为我们的Cassandra用例使用Datastax Java驱动程序-我们将使用Datastax Java驱动程序读取/写入Cassandra - 我成功地可以使用Datastax Java ...

  2. kafka的topic和分区策略——log entry和消息id索引文件

    Topic在逻辑上可以被认为是一个在的queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里. 为了使得Kafka的吞吐率可以水平扩展,物理上把topic分 ...

  3. 【kafka运维】分区副本重分配、数据迁移、副本扩缩容 (附教学视频)

    日常运维.问题排查=> 滴滴开源LogiKM一站式Kafka监控与管控平台 (后续的视频会在 公众号[首发].CSDN.B站等各平台同名号[石臻臻的杂货铺]上上传 ) 分区副本重分配+注意事项+ ...

  4. java多线程同步的四种方法_java中实现多线程的两种方法

    java多线程有几种实现方法,都是什么?同步有几种实java中多线程的实现方法有两种:1.直接继承thread类:2.实现runnable接口:同步的实现方法有五种:1.同步方法:2.同步代码块:3. ...

  5. Java多线程复习:3(在操作系统中查看和杀死进程线程)

    此博客是记录自己学习过程的记录,仅做参考 Windows下查看和杀死进程线程 查看所有进程 tasklist 查看指定关键字的进程 tasklist | findstr + 关键字 Java程序运行的 ...

  6. Java多线程之常见锁策略与CAS中的ABA问题

    ⭐️前面的话⭐️ 本篇文章将介绍常见的锁策略以及CAS中的ABA问题,前面介绍使用synchronized关键字来保证线程的安全性,本质上就是对对象进行加锁操作,synchronized所加的锁到底是 ...

  7. 用java实现动态排序_关于Java动态分组排序的问题(Android中需要将数据排序给RecyclerView使用)...

    问题描述 现在有这样一个TestNotification里面定义了这些字段 private long time; private String pkg; private String content; ...

  8. Java kafka监控 topic的数据量count情况,每个topic的Summed Recent Offsets(总结最近的偏移量)

    Java使用kafka的API来监控kafka的某些topic的数据量增量,offset,定时查总量之后,然后计算差值,然后就可以算单位间隔的每个topic的增量,kafka监控一般都是监控的吞吐量, ...

  9. kafka:topic为什么要进行分区?副本机制是如何做的?

    kafka为什么要在topic里加入分区的概念?如果没有分区,topic中的segment消息写满后,直接给订阅者不是也可以吗? Kafka可以将主题划分为多个分区(Partition),会根据分区规 ...

最新文章

  1. spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式
  2. R语言使用trimws函数:trimws函数去除(删除、remove)字符串头尾的空格
  3. Java并发编程:并发容器之CopyOnWriteArrayList(转载)
  4. c++判断正在使用的显卡_7°C警告:廉价硅脂害死显卡系列!还在用¥5块钱一大碗的导热膏吗...
  5. python自动化从零开始_从零开始学Selenium自动化测试:基于Python:视频教学版
  6. redis缓存设计要点随谈
  7. python需要什么包装_python学习之包装与授权
  8. Openshift3.9部署手册
  9. 软工网络15团队作业4-DAY5
  10. 左右侧边栏固定宽,中间宽度自适应
  11. BFC和haslayout(IE6-7)(待总结。。。)
  12. 富文本编辑器 CKeditor 配置使用 (带附件)
  13. Linux程序设计-1-Linux基础
  14. 入门级微单反性能对比
  15. Oracle中Blob转换成Clob
  16. 蔚来智驾功能大更新:与其叫NOP+,不如叫NAD-
  17. 指定IDEA的字符编码
  18. 嵌入式linux智能小车ppt,基于FPGA嵌入式系统的智能小车全面解析
  19. mysql报08s01的错误_MYSQL报08S01的异常
  20. VMware虚拟机修改BIOS启动项

热门文章

  1. mac如何用移动随E行连接有线网络(中国计量大学)
  2. OpenCV的viz库学习(一)
  3. ProcessOn在线画流程图介绍
  4. 文件管理android2.3,华为手机文件管理器(com.huawei.hidisk) - 10.11.11.301 - 应用 - 酷安...
  5. 网桥、交换机、路由器等的区别
  6. 郭麒麟任《最强大脑》见证官,住杭州的台湾人清华学霸吴哲维来了
  7. Lucene.net和盘古分词使用小结
  8. Mongodb副本集部署(Centos7)
  9. 一文看懂 DDD(领域驱动设计)、CQRS和Event Souring与分层架构
  10. openscad螺栓数据生成2