Kafak概述

kafka用于构建实时数据管道的应用中常被用到,可以横向扩展,容错,可以快速的运行在数以千计的公司的产品。可以像消息系统一样读写数据流,可以可靠的处理流相关的应用,可以安全的使用,即,kafka为一个消息中间件。

场景比喻

我。生产者 你,消费者 馒头。数据流,消息 正常情况下,生产一个,消费一个。其他情况:
1)一直生产,你吃到某一个馒头时,你卡主(机器故障), 馒头就丢失了 2)一直生产,做馒头速度快,你吃来不及,馒头也就丢失了 为了放着其他生产情况的出现,我们可以拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃,而这篮子/框就可以为:Kafka。当篮子满了,馒头就装不下了,咋办?多准备几个篮子 === Kafka的扩容

架构与概念

其中 producer:生产者,就是生产馒头 consumer:消费者,就是吃馒头的(你) broker:篮子 topic:主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃

即 1.Kafka可以作为集群运行在一台或者多个服务器上面;

2.Kafka集群可以分类地存储记录流,以打标签的方式,就是采用topics,每个broker可以打个topic,这样能保证消费者可以根据topic选择性消费;

3.每个记录由Key、Value、timestamp构成。

快速使用

这里使用到了zookeeper

下载Kafka

打开链接https://www.apache.org/dyn/closer.cgi?path=/kafka/2.4.0/kafka_2.12-2.4.0.tgz

下载kafka的tgz安装包,

然后解压并进入kafka路径下,

> tar -xzf kafka_2.12-2.4.0.tgz
> cd kafka_2.12-2.4.0

启动服务

使用Kafka之前需要先启动一个ZooKeeper服务,这里直接使用Kafka中包含的脚本即可, 因为在前面下载的压缩包中已经包含了zookeeper相关的安装包。

> bin/zookeeper-server-start.sh config/zookeeper.properties

启动ZooKeeper服务之后再启动Kafka服务,

> bin/kafka-server-start.sh config/server.properties

需要强调一下,config/server.properties是Kafka的配置文件,可以用于配置监听的host、port、broker等。

默认的ZooKeeper连接服务为localhost:2181,

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

另外,producer和consumer的监听端口为9092,如果需要更改server的host和port端口可以通过修改config/server.properties进行配置。

基本使用

命令行

命令行的使用方式相对简单,通过前面的Kafka配置之后可以直接在命令行下进行使用。

创建Topic

使用Kafka,我们首先需要创建一个Topic,这样后续消息生产者和消息消费者才能针对性的发送和消费数据,

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

这样我们就创建了一个名为test的Topic。

发送消息

启动一个终端A,执行下面命令

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello world
>

当执行producer脚本后,会出现消息输入提示符,这是我们可以输入消息(数据),然后它会发送到对应的服务器(Broker)。

消费消息

现在管道中已经有了数据,接下来我就可以使用消费者去读取数据。

另外启动一个终端B,执行下面命令,

> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello world

Java

项目使用spring-boot和和spring-kafka,pom文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.jc.demo</groupId><artifactId>jc-demo-kafka</artifactId><version>0.0.1.0-SNAPSHOT</version><packaging>jar</packaging><name>oneapp-archetype-test</name><url>http://www.jevoncode.com</url><properties><!-- Every File in Project Enconding --><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!-- Compiling Time Enconding --><maven.compiler.encoding>UTF-8</maven.compiler.encoding><!-- Compiling Time JDK Version --><java.version>1.7</java.version><!-- Test --><junit.version>4.12</junit.version><!-- Logging --><slf4j.version>1.7.21</slf4j.version><logback.version>1.1.7</logback.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.3.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- http://kafka.apache.org/documentation.html#producerapi--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>${junit.version}</version><scope>test</scope></dependency><!-- Log依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><!-- logback --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>${logback.version}</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>${logback.version}</version></dependency></dependencies></project>

配置文件

application.properties

#kafka地址
jc.kaHost=s1.jevoncode.com:9092,s2.jevoncode.com:9092,s3.jevoncode.com:9092

Topic配置

package com.jc.demo.springboot.config;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaAdmin;import java.util.HashMap;
import java.util.Map;/*** 创建kafka的topic* 如果kafka不存在此topic则会自动创建,存在则不改变kafka的topic*/
@Configuration
@EnableKafka
public class TopicConfig {public static final String TOPIC_JC_KAFKA_DEMO = "jc-demo-kafka";@Value("${jc.kaHost}")String kaHost;@Beanpublic KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);return new KafkaAdmin(configs);}@Beanpublic NewTopic foo() {//第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数//当broker个数为1个时会创建topic失败,//提示:replication factor: 2 larger than available brokers: 1//只有在集群中才能使用kafka的备份功能//以kafka的分区机制来说,我将其numParitions个数设置为broker个数,也就是3return new NewTopic(TOPIC_JC_KAFKA_DEMO, 3, (short) 2);}//
//    @Bean
//    public NewTopic topic1(){
//        return new NewTopic("jc-demo-kafka2", 10, (short) 2);
//    }
}

启动类

package com.jc.demo.springboot;import com.jc.demo.springboot.config.ApplicationConfig;
import com.jc.demo.springboot.service.DoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;@SpringBootApplication
@Import({ApplicationConfig.class})
@PropertySource("classpath:application.properties")
public class DemoApplication implements CommandLineRunner{@Autowiredprivate DoService doService;public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}@Overridepublic void run(String... args) throws Exception {doService.HelloWorld();}
}

生产者

配置类KafkaProducerConfig

package com.jc.demo.springboot.config;import com.jc.demo.springboot.service.MyListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${jc.kaHost}")String kaHost;/* --------------producer configuration-----------------**/@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}/* --------------kafka template configuration-----------------**/@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}}

服务接口和类(生产发送消息)

DoService接口

package com.jc.demo.springboot.service;public interface DoService {void HelloWorld();
}

DoService实现类,调用kafkaTemplate完成发送消息到kafka

package com.jc.demo.springboot.service;import com.jc.demo.springboot.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.math.BigInteger;@Service
public class DoServiceImpl implements  DoService {@AutowiredKafkaTemplate kafkaTemplate;@Overridepublic void HelloWorld() {String phone = "18689206965";while (true) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}kafkaTemplate.send(TopicConfig.TOPIC_JC_KAFKA_DEMO, partition(phone), phone, "jevoncode" + System.currentTimeMillis());}}/*** 根据手机号计算分区* @return*/private int partition(String phone) {int hash = phone.hashCode();int partition = new BigInteger(Integer.toString(hash)).mod(new BigInteger("3")).intValue();     //由于总共有3个分区,所以得去3的模System.out.println(partition);return partition;}}

此时执行DemoApplication的main方法,就可以生产一个字符串"jevoncode"到kafka的0分区上。可以使用命令查看:

[jevoncode@s1 kafka_2.11-1.1.0]$ ./bin/kafka-console-consumer.sh --bootstrap-server s1.jevoncode.com:9092 --topic jc-demo-kafka --from-beginning
jevoncode1531654603522
jevoncode1531654689283
jevoncode1531654690331
jevoncode1531654691332
jevoncode1531654692332
....

消费者

此时application.properties配置文件需增加消费者配置

#kafka地址
jc.kaHost=s1.jevoncode.com:9092,s2.jevoncode.com:9092,s3.jevoncode.com:9092#############以下是消费者端的配置###########################
#kafka消费者 groupId配置
jc.consumer.group.id=jc-consumer-group-1
#kafka消费者 分区配置,这样就可以指定每个消费者所消费的分区,提高吞吐量
jc.consumer.partitions=0,1,2
#一次从kafka拉的最大消息数
jc.max.poll.records=100

消费者配置类

package com.jc.demo.springboot.listener;import com.jc.demo.springboot.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;import java.util.List;public class KafkaListener {private static Logger logger = LoggerFactory.getLogger(KafkaListener.class);@org.springframework.kafka.annotation.KafkaListener(id = "${jc.consumer.group.id}",topicPartitions =//配置topic和分区{ @TopicPartition(topic = TopicConfig.TOPIC_JC_KAFKA_DEMO, partitions ="#{'${jc.consumer.partitions}'.split(',')}")})public void receive(@Payload List<String> messages,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,@Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack) {for (int i = 0; i < messages.size(); i++) {String msg = "message='" + messages.get(i) + "' with partition-offset='" + partitions.get(i) + "-" + offsets.get(i) + "'";logger.debug("receive messages {}",msg);}ack.acknowledge();logger.debug("all batch messages {} consumed",messages.size());}}

消费者监听类

package com.jc.demo.springboot.listener;import com.jc.demo.springboot.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;import java.util.List;public class KafkaListener {private static Logger logger = LoggerFactory.getLogger(KafkaListener.class);@org.springframework.kafka.annotation.KafkaListener(id = "${jc.consumer.group.id}",topicPartitions =//配置topic和分区{ @TopicPartition(topic = TopicConfig.TOPIC_JC_KAFKA_DEMO, partitions ="#{'${jc.consumer.partitions}'.split(',')}")})public void receive(@Payload List<String> messages,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,@Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack) {for (int i = 0; i < messages.size(); i++) {String msg = "message='" + messages.get(i) + "' with partition-offset='" + partitions.get(i) + "-" + offsets.get(i) + "'";logger.debug("receive messages {}",msg);}ack.acknowledge();logger.debug("all batch messages {} consumed",messages.size());}}

运行kafaka

07-15 19:39:25 [jc-consumer-group-1-0-C-1] DEBUG com.jc.demo.springboot.listener.KafkaListener - receive messages message='jevoncode1531654765409' with partition-offset='0-17'
07-15 19:39:25 [jc-consumer-group-1-0-C-1] DEBUG com.jc.demo.springboot.listener.KafkaListener - all batch messages 1 consumed

总结

以上过程完成了基本的kafak的使用

小明菜市场

推荐阅读

● 搭建 | 一步成功搭建Centos + Kubernetes 环境

● 震惊 | 某公司实习生跑路,竟为了学习偷盗面试题

● 警钟 | 还不会Spring Boot集成JWT,你可能错过了大厂的Office了

● 图例 | Java混合模式分析之火焰图实例

● 面试 | 从一个API缓存演化,详细了解Redis各项功能

实践 | kafka 基本使用相关推荐

  1. 实践 | Kafka 不够好,智联招聘基于 Pulsar 打造企业级事件中心

    消息队列作为智联招聘非常重要的平台级服务负责全业务线的消息投递.有很多非常典型的业务场景,我们用一个业务场景简历投递来说明消息队列为业务提供的支持 图 1. 简历投递业务 当 C 端用户发生一次简历投 ...

  2. 大数据最佳实践-kafka

    目录 架构 优缺点 硬件 磁盘存储 kafka机器数 确定topic分区数 如何确定Partition的副本数 同一个组的消费者的数量建议与待消费的Topic下的Partition数保持一致 每个消费 ...

  3. pythonspark实践_基于Python的Spark Streaming Kafka编程实践

    版权声明:本文为CSDN博主原创文章,未经博主允许不得转载. 说明 Spark Streaming的原理说明的文章很多,这里不做介绍.本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些 ...

  4. kafka partition java,kafka中partition数量与消费者对应关系以及Java实践

    kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...

  5. 电影票房数据查询服务高性能与高可用实践

    精选30+云产品,助力企业轻松上云!>>> 点击蓝色"大数据每日哔哔"关注我 加个"星标",第一时间获取大数据架构,实战经验 ▼ 摘要:由于影 ...

  6. 实践 + 理论 | API 接口安全性设计

    这是小小本周的第一篇, 理论 理论主要体现在两个方面,分别是保证数据在传输过程中的安全性,以及数据如何到达服务器端如何获取到数据,如何不被攻击. 数据加密 数据在传输的过程中很容易被抓包,例如通过Ht ...

  7. 分布式消息队列kafka

    文章目录 前言 什么是Kafka? Kafka的特性 Kafka的意义 Kafka工作原理 kafka架构介绍 Producer Broker Consumer Patition Zookeeper ...

  8. 分布式消息中间件应用实践

    背景概述 搜狗商业平台负责搜狗商业广告平台的研发,其广告平台中存在大量的数据,包括广告物料.操作日志.PV 点击.上下线报文等.整个广告平台涉及实时 PV/UV 统计分析.实时安全分析.广告审核.日志 ...

  9. kafka 消费者详解

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? ...

最新文章

  1. 安装class-dump
  2. 3.7 感知器-机器学习笔记-斯坦福吴恩达教授
  3. 加到service中无效_Dataway让SpringBoot不需要Controller、Service、DAO、Mapper
  4. 按钮自动居中布局_CSS布局技巧
  5. 21-特征匹配方法(Brute-Force蛮力匹配)
  6. asp.net core 系列 20 EF基于数据模型创建数据库
  7. OO Summary (Homework 5-7)
  8. 计算机组成原理白中英第四章,白中英计算机组成原理第四章答案.ppt
  9. Java爬虫Jsoup篇
  10. 如何找到chromedriver与chrome的对应版本
  11. 多元线性回归模型矩阵推导(手推带矩阵求导法则)
  12. Shell cace条件语句
  13. 1、结构化、面向对象程序设计差别、类基本概念
  14. 数值积分 (一)| 基本思想 + 一般求积公式
  15. 带宽与码元的关系_比特率与带宽什么关系
  16. AngelScript -- C++程序最好的脚本语言
  17. 南昌大学计算机考研2021,2021南昌大学考研参考书目
  18. linux使用df命令
  19. Moto Defy刷机卡M无法进入RSD状态解决方法
  20. 中国人工智能城市排名榜公布,北京、杭州、深圳居前

热门文章

  1. 关于An association from the tablea refers to an unmapped classB
  2. PostgreSQL SQL OUTLINE插件sr_plan (保存、篡改、固定 执行计划)
  3. React Native移动框架功能研究
  4. IE 8 下面的垂直水平居中
  5. oracle 11g(四)给oracle添加为系统服务(脚本)
  6. Classifier4J的中文支持
  7. 某项目网络实施中的几个关键点解析
  8. JVM虚拟机-Class文件之类索引、父类索引和接口索引集合
  9. php域运算符号,PHP中的域运算符号是()。
  10. Payroll Calculation的Process Rule