我们先来看下简单的kafka生产者和消费者模式代码:

生产者KafkaProducer

/**
 * @author xiaofeng
 * @version V1.0
 * @title: KafkaProducer.java
 * @package: com.yingda.xsignal.app.test
 * @description: kafka生产者demo
 * @date 2018/4/4 0004 上午 11:20
 */
public class KafkaProducer extends Thread {private String topic;

    public KafkaProducer(String topic) {super();
        this.topic = topic;
    }@Override
    public void run() {Producer producer = createProducer();
        int i = 0;
        while (true) {String msg = "message";
            producer.send(new KeyedMessage<Integer, String>(topic, msg + (i++)));
            try {TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {e.printStackTrace();
            }}}private Producer createProducer() {Properties properties = new Properties();
        //声明zk
        properties.put("zookeeper.connect", "10.0.2.22:2181");
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("metadata.broker.list", "10.0.2.22:9092");
        properties.put("batch.size", 4096);
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }public static void main(String[] args) {new KafkaProducer("TEST_TOPIC").start();
    }}  

消费者KafkaConsumer

/**
 * @author xiaofeng
 * @version V1.0
 * @title: KafkaConsumer.java
 * @package: com.yingda.xsignal.app.test
 * @description: 单线程消费模式
 * @date 2018/4/4 0004 上午 11:18
 */
public class KafkaConsumer extends Thread {private String topic;

    public KafkaConsumer(String topic) {super();
        this.topic = topic;
    }@Override
    public void run() {ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = Maps.newHashMap();
        // 一次从主题中获取一个数据
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        // 获取每次接收到的这个数据
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {String message = new String(iterator.next().message());
            System.out.println("接收到: " + message);
        }}private ConsumerConnector createConsumer() {Properties properties = new Properties();
//        //声明zk
        properties.put("zookeeper.connect", "10.0.2.22:2181");
//        // 消费组
        properties.put("group.id", "test-group");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }public static void main(String[] args) {new KafkaConsumer("TEST_TOPIC").start();
    }

分别启动producer 和consumer

查看消费者控制台:

虽然已成功生产和消费,但是这种消费模式很明显是单个topic和单线程的形式,那么如果我一次性要订阅多个topic 而且需要多线程消费该怎样做呢?接下来让我们一探究竟吧!

构建多线程消费KafkaConsumer

/**
 * @author xiaofeng
 * @version V1.0
 * @title: OrderBackConsumer.java
 * @package: com.yingda.xsignal.app.consumer
 * @description: 订单备份消费者
 * @date 2018/3/16 0016 下午 4:46
 */
public class OrderBackConsumer extends BaseSpringApp {protected static final Logger logger = LoggerFactory.getLogger(OrderBackConsumer.class);

    private final ConsumerConnector consumer;
    private final String signalTopic = "SIGNAL_ORDERINFO";
    private final String followTopic = "FOLLOW_ORDERINFO";
    private final String signalHisTopic = "HIS_ORDERINFO";
    private final String followHisTopic = "FOLLOW_HIS_ORDERINFO";

    private ConsumerConfig consumerConfig;

    private static int threadNum = 6;

    /**
     * Set the ThreadPoolExecutor's core pool size.
     */
    private int corePoolSize = 6;
    /**
     * Set the ThreadPoolExecutor's maximum pool size.
     */
    private int maxPoolSize = 200;
    /**
     * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
     */
    private int queueCapacity = 1024;
    /**
     * thread prefix name
     */
    private String ThreadNamePrefix = "kafka-consumer-pool-%d";

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(ThreadNamePrefix).build();

    /**
     * Common Thread Pool
     */
    ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    public OrderBackConsumer(String[] args) {super(args, "classpath:app-KafkaConsumer.xml");
        Properties properties = new Properties();
        //开发环境:10.0.2.22:2181
        properties.put("zookeeper.connect", "10.0.2.22:2181");
        // 组名称
        properties.put("group.id", "back_consumer_group");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        consumerConfig = new ConsumerConfig(properties);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
    }@Override
    public void shutdown() {if (consumer != null) {consumer.shutdown();
        }if (pool != null) {pool.shutdown();
        }try {if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly");
        }}public void run(int numThreads) {Map<String, Integer> topicCountMap = Maps.newHashMap();
        topicCountMap.put(signalTopic, new Integer(numThreads));
        topicCountMap.put(followTopic, new Integer(numThreads));
        topicCountMap.put(signalHisTopic, new Integer(numThreads));
        topicCountMap.put(followHisTopic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        consumerMap.values().stream().forEach(value -> {List<KafkaStream<byte[], byte[]>> streams = value;
            int threadNumber = 0;
            /**
             * 可以为每隔topic创建一个线程池,因为每个topic我设置的partition=6
             * kafka consumer通过增加线程数来增加消费能力,但是需要足够的分区,如目前我设置的partition=6,那么并发可以启动6个线程同时消费)
             * ExecutorService pool = createThreadPool();
             */
            for (final KafkaStream stream : streams) {pool.submit(new KafkaOrderConsumer(stream, threadNumber));
                threadNumber++;
            }});
    }/**
     * 创建线程池
     *
     * @return
     */
    private ExecutorService createThreadPool() {ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        return pool;
    }public static void main(String[] args) {int threads = 1;
        if (args.length < 1) {threads = threadNum;
        } else {threads = Integer.parseInt(args[0]);
        }OrderBackConsumer example = new OrderBackConsumer(args);
        example.run(threads);

        try {Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException ie) {}example.shutdown();
    }
/**
 * @author xiaofeng
 * @version V1.0
 * @title: KafkaOrderConsumer.java
 * @package: com.yingda.xsignal.app.service.impl
 * @description: kafka消费服务
 * @date 2018/3/20 0020 下午 8:03
 */
public class KafkaOrderConsumer implements Runnable {protected static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumer.class);

    private KafkaStream stream;
    private int threadNumber;

    public KafkaOrderConsumer(KafkaStream stream, int threadNumber) {this.stream = stream;
        this.threadNumber = threadNumber;
    }@Override
    public void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {//消费队列内容
            final MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
            try {final byte[] messageBytes = (byte[]) messageAndMetadata.message();
                if (messageBytes != null && messageBytes.length > 0) {String content = new String(messageBytes);
                    logger.info("message:'" + content + "'");
                    /**
                     * @// TODO: 2018/3/20 0020 消费入库
                     */
                }} catch (Exception e) {logger.error("kafka back order consumer error", e);
            }}logger.info("Shutting down Thread: " + threadNumber);
    }
}

以上代码中,我们创建了一个线程池,线程数为6,因为我设置的partition=6,而且一次性订阅了4个topic(当然这些topic要真实存在哦),测试的时候随便往哪个topic中写数据都可以收到相应的消费数据哦。

java kafka 多线程消费相关推荐

  1. kafka多线程消费

    1.zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper 2.kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN ...

  2. 【Kafka笔记】5.Kafka 多线程消费消息

    Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...

  3. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  4. kafka 多线程消费

    一. 1.Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费).即消费并行度和分区数一致. ...

  5. 几种kafka多线程消费方式

    kafka API   https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/Kafka ...

  6. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  7. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  8. kafka消费者(Consumer)端多线程消费的实现方案

    kafka消费者(Consumer)端多线程消费的实现方案 kafka Java consumer设计原理 设计原理 为什么用单线程设计 多线程方案: 方案一: 方案二: 两个方案的优缺点: kafk ...

  9. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

最新文章

  1. Kali Linux 安全渗透教程第五更1.4 安装Kali Linux
  2. 【floyd存字典序路径】【HDU1385】【Minimum Transport Cost】
  3. nginx for discuz 伪静态规则
  4. Java序列化接口Serializable接口的作用总结
  5. 2015级C++第2周实践项目
  6. Eclipse中的控制台
  7. # 异运算_人教版六年级数学下册第29课数的运算(P7680)图文视频辅导
  8. [数据恢复答疑]删除了WINDOWS桌面上的文件,该如何恢复数据
  9. 海康威视mp4html播放器,videoJS 网页视频播放器支持MP4
  10. python UI自动化无界面运行
  11. 最新Apple苹果开发者账号AppleID注册流程
  12. EXCEL长数字显示和转化为文本
  13. 程序员简历模版【A4纸正反两面】(20220511)
  14. 调试python程序---pdb
  15. poj 2152 树形dp(建立消防站)
  16. 为视图或函数指定的列名比其定义中的列多
  17. keystone的详细功能
  18. 初识二维码 第二十讲 二维码解码程序的组件之一 摄像头拍照功能
  19. INS/GPS 制导的 SDB 炸弹投放域计算与分析
  20. 常用坐标系及SuperMap投影转换

热门文章

  1. mysql怎么备份和恢复_如何优雅的备份和恢复Mysql数据库?
  2. 2 image pil 转_pdf转图片,php语言和Java语言的两种方法
  3. 使用MailMessage.AlternateViews时遇到的小问题
  4. 硬链接与软链接的区别
  5. LeetCode每日一题: 缺失数字(No.268)
  6. (效果一)js实现上拉加载
  7. 某些书籍翻译的太屎了,误导人!
  8. iOS 日期格式的转换
  9. int 小数_[LeetCode] 166. 分数到小数
  10. 用计算机画关于科技的画,用计算机鉴识画作