文章目录

  • 19 kafka消息队列
  • 一、kafka介绍
    • 1、消息队列基本介绍
    • 2、常用的消息队列介绍
    • 3、消息队列的应用场景
    • 4、消息队列的两种模式
    • 5、kafka的基本介绍
    • 6、kafka的架构介绍
    • 7、kafka架构内部细节剖析
    • 8、kafka主要组件说明
  • 二、kafka集群环境搭建
    • 1、初始化环境准备
    • 2、下载安装包并上传解压
    • 3、node1服务器修改kafka配置文件
    • 4、安装包分发到其他服务器上面去
    • 5、node2与node3服务器修改配置文件
    • 6、kafka集群启动与停止
  • 三、Kafka集群操作
    • 1.基本命令
    • 2.kafka的JavaAPI操作
      • 1、创建maven工程并添加jar包
      • 2、生产者代码
      • 3、消费者代码
        • 3.1、自动提交offset
        • 3.2、手动提交offset
        • 3.3、消费完每个分区之后手动提交offset
        • 3.4、指定分区数据进行消费
        • 3.5、重复消费与数据丢失
        • 3.6、consumer消费者消费数据流程
      • 4、kafka Streams API开发
        • 第一步:创建一个topic
        • 第二步:开发StreamAPI
        • 第三步:生产数据
        • 第四步:消费数据
    • 3.kafka的log存储以及查询机制
    • 4.kafka当中的数据不丢失机制
    • 5.kafka 压力测试
    • 6.kafka的配置文件说明
  • 四、flume整合kafka
    • 第一步:flume下载地址
    • 第二步:上传解压flume
    • 第三步:配置flume.conf
    • 第四步:启动flume
    • 第五步:消费Kafka内数据
  • 五、CAP理论以及kafka当中的CAP机制
  • 六、kafka监控及运维
    • 1、kafka-eagle概述
    • 2、环境和安装
      • 2.1、环境要求
      • 2.2、安装步骤
        • 2.2.1、下载源码包
        • 2.2.2、解压
        • 2.2.3、准备数据库
        • 2.2.4、修改kafak-eagle配置文件
        • 2.2.5、配置环境变量
        • 2.2.6、启动kafka-eagle
        • 2.2.7、主界面
  • 七、实时看板案例
    • 1、项目需求梳理
    • 2、项目架构模型
    • 3、订单数据模型
    • 4、指标需求
    • 5、kafka 当中的topic创建,以及模拟消息生产程序
    • 6、代码实现
      • 消息生产代码实现
      • 第一步:创建我们的订单实体类
      • 第二步:定义log4j.properties配置文件
      • 第三步:开发日志生产代码
      • 第四步:将程序打包并上传服务器运行
      • 第五步:运行jar包
      • 第六步:开发flume配置文件,实现收集数据到kafka
      • 第七步:kafka启动console控制台,消费数据以验证数据进入kafka
      • 消息消费代码实现
      • 定义redis工具类
      • 开发kafka消费代码

19 kafka消息队列

一、kafka介绍

1、消息队列基本介绍

消息:在应用系统之间,传递的数据叫做消息
队列:排队的模型 先进先出 类似于火车进隧道
消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在

2、常用的消息队列介绍

标注的消息队列实现
RabbitMQ: rabbit message queue
ActiveMQ:支持消息队列当中事务处理
RocketMQ: 阿里开源的消息队列 rocket
消息队列的模型:主要是基于pub/sub publish 、subscribe 发布与订阅模型

kafka:linkedin 公司开源提供的 吞吐量非常高,而且消息的处理速度非常快 大数据领域里面大部分都是使用kafka
kafka不是一个标准的消息队列的实现
消息队列模型:主要是基于push/poll 推送与拉取

3、消息队列的应用场景

消息队列在实际应用中包括如下四个场景:

应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;

 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;

消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;具体场景:用户新上传了一批照片, 人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用户的
人脸索引(加快查询)。这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,
后一个阶段从队列中获取消息继续处理。

该方法有如下优点:
避免了直接调用下一个系统导致当前系统失败;
每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按
不同处理速度处理;

4、消息队列的两种模式

点对点(point to point, queue):两个人之间互相通信,都是点对点这种模型

点对点模式特点:

1.每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
2.发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
3.接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

发布订阅(publish/subscribe,topic):群聊

发布/订阅模式特点:

1.每个消息可以有多个订阅者;
2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
3.为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

5、kafka的基本介绍

官网:http://kafka.apache.org/
kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的日志系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等

kafka是一个分布式的消息队列系统

分布式就是由多个节点组成,一个节点就是一个服务器
在kafka当中节点叫做broker ,一个节点就是一个broker,一个broker就是一个服务器
hadoop当中节点  datanode
hbase当中当中 HMaster以及HRegionServer

kafka的好处

可靠性:分布式的,分区,复制和容错的。
可扩展性:kafka消息传递系统轻松缩放,无需停机。
耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。
性能:kafka对于发布和定于消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。
kafka非常快:保证零停机和零数据丢失。
*磁盘顺序读写*

分布式的发布与订阅系统

apache kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个端点传递到
另一个端点,kafka适合离线和在线消息消费。kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在
zookeeper同步服务之上。它与apache和spark非常好的集成,应用于实时流式数据分析。

kafka的主要应用场景
指标分析

kafka   通常用于操作监控数据。这设计聚合来自分布式应用程序的统计信息,   以产生操作的数据集中反馈

日志聚合解决方法

kafka可用于跨组织从多个服务器收集日志,并使他们以标准的合适提供给多个服务器。

流式处理:实时处理 数据从出现到产生,在一秒钟以内能够处理完成
流式计算:程序一旦启动,就会一直运行下去,一旦有数据,就能够马上被处理掉

生产者生产数据到kafka里面去  ,然后通过一些实时处理的框架例如storm或者sparkStreaming或者flink等等
一些实时处理的框架去处理kafka里面的数据

6、kafka的架构介绍

分布式:肯定是多节点,多台服务器,组织到一起形成一个集群

生产者:producer 主要负责生产数据到 topic里面去
topic:虚拟的概念,某一类消息的主题,某一类消息都是存放在某一个topic当中
一个topic有多个partition:一个partition里面有多个segment段,每个segment默认1GB
一个segment: 一个.index文件 + 一个.log文件
.log:存放用户真实的产生的数据
.index 存放的是.log文件的索引数据
消费者:consumer 主要就是消费topic里面的数据
conusmer消费到哪一条数据需要进行记录:offset来进行记录 数据的偏移量 每条数据都有唯一的offset
.index文件:存放的索引文件,用于查找.log文件里面的数据

7、kafka架构内部细节剖析


kafka需要依赖zk保存一些节点信息 kakfa紧耦合zookeeper
kafka当中数据消费的时候,消费者都需要指定属于哪一个消费组
一个消费组里面,可以有多个消费者

消费组:同一时间,一个分区里面的数据,只能被一个消费组里面的一个线程进行消费
调大分区的个数:可以加快数据的消费的速度

任意时刻,一个分区里面的数据,只能被一个消费组里面的一个线程进行消费
kafka当中的数据消费出现延迟:加大消费者线程数量,加大分区的个数

8、kafka主要组件说明

1、kafka当中的producer说明

producer主要是用于生产消息,是kafka当中的消息生产者,生产的消息通过topic进行归类,保存到kafka的
broker里面去

2、kafka当中的topic说明

1、kafka将消息以topic为单位进行归类
2、topic特指kafka处理的消息源(feeds of messages)的不同分类。
3、topic是一种分类或者发布的一些列记录的名义上的名字。kafka主题始终是支持多用户订阅的;也就是说,一 个主题可以有零个,一个或者多个消费者订阅写入的数据。
4、在kafka集群中,可以有无数的主题。
5、生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。

3、kafka当中的partition说明

kafka当中,topic是消息的归类,一个topic可以有多个分区,每个分区保存部分topic的数据,所有的partition当中的数据全部合并起来,就是一个topic当中的所有的数据,
一个broker服务下,是否可以创建多个分区?
可以的,broker数与分区数没有关系; 在kafka中,每一个分区会有一个编号:编号从0开始
每一个分区的数据是有序的
说明-数据是有序 如何保证一个主题下的数据是有序的?(生产是什么样的顺序,那么消费的时候也是什么样的顺序)

topic的Partition数量在创建topic时配置。

Partition数量决定了每个Consumer group中并发消费者的最大数量。

Consumer group A 有两个消费者来读取4个partition中数据;
Consumer group B有四个消费者来读取4个 partition中的数据

partition的个数与线程的个数
partition个数  = 线程的个数  刚刚好,一个线程消费一个分区
partition个数 >  线程的个数  有线程需要去消费多个分区里面的数据
partition个数  < 线程的个数  有线程在闲置

4、kafka当中partition的副本数说明

副本数(replication-factor):控制消息保存在几个broker(服务器)上,一般情况下等于broker的个数

kakfa当中副本的策略:使用isr这种策略来维护一个副本列表isr  synchronize  replication :同步完成的副本列表
主分区:可以有多个副本 ,为了最大程度的同步完成数据,使用多个副本,每个副本都启动线程去复制主分区上面的数据
尽量的保证副本分区当中的数据与主分区当中的数据一致的
如果副本分区当中的数据与主分区当中的数据差别太大,将副本分区移除ISR列表
如果副本分区的心跳时间比较久远,也会将副本分区移除ISR列表

5、kafka当中的segment说明

一个partition当中由多个segment文件组成,每个segment文件,包含两部分,
一个是.log文件,另外一个是.index文件,
其中.log文件包含了我们发送的数据存储,
.index文件,记录的是我们.log文件的数据索引值,以便于我们加快数据的查询速度索引文件与数据文件的关系
比如索引文件中3,497代表:数据文件中的第三个message,它的偏移地址为497。
再来看数据文件中,Message 368772表示:在全局partiton中是第368772个message。注:segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,
稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,
但查找起来需要消耗更多的时间。

二、kafka集群环境搭建

1、初始化环境准备

安装jdk,安装zookeeper并保证zk服务正常启动

2、下载安装包并上传解压

通过以下地址进行下载安装包
node1执行以下命令,下载并解压

cd /export/softwares
wget http://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar –zxvf  kafka_2.11-1.0.0.tgz -C /export/servers/

3、node1服务器修改kafka配置文件

node1执行以下命令进入到kafka的配置文件目录,修改配置文件
node1执行以下命令创建数据文件存放目录

mkdir -p  /export/servers/kafka_2.11-1.0.0/logs
cd /export/servers/kafka_2.11-1.0.0/config
vim server.properties
broker.id=0 #
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs #
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181 #
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true #
host.name=node1 #

4、安装包分发到其他服务器上面去

node1执行以下命令,将node1服务器的kafka安装包发送到node2和node3服务器上面去

cd /export/servers/
scp -r kafka_2.11-1.0.0/ node02:$PWD
scp -r kafka_2.11-1.0.0/ node03:$PWD

5、node2与node3服务器修改配置文件

node2使用以下命令修改kafka配置文件(server.properties)

broker.id=1
host.name=node2

node3使用以下命令修改kafka配置文件(server.properties)

broker.id=2
host.name=node3

6、kafka集群启动与停止

注意事项:在kafka启动前,一定要让zookeeper启动起来。
node1执行以下命令将kafka进程启动在后台

cd /export/servers/kafka_2.11-1.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

node2执行以下命令将kafka进程启动在后台

cd /export/servers/kafka_2.11-1.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

node3执行以下命令将kafka进程启动在后台

cd /export/servers/kafka_2.11-1.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

三台机器也可以执行以下命令停止kafka集群

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-server-stop.sh

三、Kafka集群操作

1.基本命令

1、创建topic
创建一个名字为test的主题, 有三个分区,有两个副本
node1执行以下命令来创建topic

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node1:2181,node2:2181,node3:2181

2、查看主题命令
查看kafka当中存在的主题
node1使用以下命令来查看kafka当中存在的topic主题

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh  --list --zookeeper node1:2181,node2:2181,node3:2181

3、生产者生产数据
模拟生产者来生产数据
node1服务器执行以下命令来模拟生产者进行生产数据

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test

4、消费者消费数据
node2服务器执行以下命令来模拟消费者进行消费数据

cd /export/servers/kafka_2.11-1.0.0
using the new consumer by passing [bootstrap-server] instead of [zookeeper]
bin/kafka-console-consumer.sh --from-beginning --topic test  --zookeeper node01:2181,node02:2181,node03:2181
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic test

5、运行describe topics命令

node1执行以下命令运行describe查看topic的相关信息

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test

6、增加topic分区数
任意kafka服务器执行以下命令可以增加topic分区数

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --partitions 4


7、增加配置
动态修改kakfa的配置
任意kafka服务器执行以下命令可以增加topic分区数

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --config flush.messages=1

8、删除配置
动态删除kafka集群配置

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --delete-config flush.messages

9、删除topic
目前删除topic在默认情况下知识打上一个删除的标记,在重新启动kafka后才删除。如果需要立即删除,则需要在

server.properties中配置:
delete.topic.enable=true
然后执行以下命令进行删除topic
kafka-topics.sh --zookeeper node1:2181--delete --topic test

2.kafka的JavaAPI操作

1、创建maven工程并添加jar包

创建maven工程并添加以下依赖jar包的坐标到pom.xml

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version>
</dependency>    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>1.0.0</version></dependency></dependencies><build><plugins><!-- java编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins>
</build>

2、生产者代码

package cn.itcast.kafka.demo1;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 向test topic里面发送数据*/
public class MyProducer {public static void main(String[] args) {Properties props = new Properties();//指定Kafka的服务器地址props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//消息确认机制props.put("acks", "all");//重试机制props.put("retries", 0);//批量发送的大小props.put("batch.size", 16384);//消息的延迟props.put("linger.ms", 1);//消息缓冲区大小props.put("buffer.memory", 33554432);//定义key和value的序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//第四种自定义分区需要添加,自定义类props.put("partitioner.class","cn.itcast.kafka.demo1.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);for (int i=0;i<100;i++){//第一种:既没有指定key,也没有指定分区号,使用轮询的方式kafkaProducer.send(new ProducerRecord<>("test","这是第" + i + "条数据"));//第二种:指定数据key,使用key的hashCode码值来进行分区,一定要注意,key要变化kafkaProducer.send(new ProducerRecord<>("test", "mykey"+i,"这是第" + i + "条数据"));//第三种:指定分区号来进行分区kafkaProducer.send(new ProducerRecord<>("test", 1,"mykey"+i,"这是第" + i + "条数据"));//第四种:自定义分区策略,不需要指定分区号,如果指定了分区号还是会发送到指定的分区kafkaProducer.send(new ProducerRecord<>("test","mykey"+i,"这是第" + i + "条数据"));}kafkaProducer.close();}
}
package cn.itcast.kafka.demo1;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {/*** 通过这个方法来自定义我们数据的分区规则* @param topic* @param key* @param keyBytes* @param value* @param valueBytes* @param cluster* @return 返回int值,这个值就是分区号*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 3;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

kafka的数据的分区策略:
kafka五个分区:由于某种原因 0,1,2三个分区里面的数据太多,3,4分区里面的数据太少。

ProducerRecord源码翻译:
如果指定了分区号,直接将数据发送到指定的分区里面去
如果没有指定分区号,数据带了发送的key,通过key取hashCode决定数据究竟发送到哪一个分区里面去
如果既没有指定分区号,也没有指定数据key,使用 round-robin fashion  轮询策略
如果使用key来作为分区的依据,key一定要是变化的,保证数据发送到不同的分区里面去

分区方式:

第一种:既没有指定key,也没有指定分区号,使用轮询的方式
第二种:指定数据key,使用key的hashCode码值来进行分区,一定要注意,key要变化
第三种:指定分区号来进行分区
第四种:自定义分区策略

3、消费者代码

3.1、自动提交offset

package cn.itcast.kafka.demo2;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {Properties props = new Properties();//指定Kafka的服务器地址props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费组的名字props.put("group.id", "testGroup");//允许程序自动提交offset 提交offset保存到了Kafka当中的一个topic中取props.put("enable.auto.commit", "true");//每隔多长时间提交一次offset的值/*** 157 hello offset 上一秒提交的offset** 287 hello world* 295 abc test 900ms 宕机了怎么办?* 351 hello abc 1000ms** 有可能造成重复消费的一些问题**/props.put("auto.commit.interval.ms", "1000");//定义key和value的序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//定义KafkaConsumerKafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);//订阅test这个topic,去消费这个topic里面的数据kafkaConsumer.subscribe(Arrays.asList("test"));//使用死循环拉取数据while(true){//所有拉取的数据都封装在了ConsumerRecordsConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);//consumerRecord就是我们每一条数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {int partition = consumerRecord.partition(); //获取数据对应的分区号String value = consumerRecord.value(); //对应数据值long offset = consumerRecord.offset(); //对应数据的偏移量String key = consumerRecord.key(); //对应数据发送的keySystem.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+offset+"数据的分区为:"+partition);}}}
}

3.2、手动提交offset

package cn.itcast.kafka.demo2;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class CommitOffsetByHand {public static void main(String[] args) {Properties props = new Properties();//指定Kafka的服务器地址props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费组的名字props.put("group.id", "testGroup");//不允许程序自动提交offset,需要我们消费完成后手动提交props.put("enable.auto.commit", "false");//定义key和value的序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//定义KafkaConsumerKafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);//订阅test这个topic,去消费这个topic里面的数据kafkaConsumer.subscribe(Arrays.asList("test"));//使用死循环拉取数据while(true){//所有拉取的数据都封装在了ConsumerRecordsConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);//consumerRecord就是我们每一条数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {int partition = consumerRecord.partition(); //获取数据对应的分区号String value = consumerRecord.value(); //对应数据值long offset = consumerRecord.offset(); //对应数据的偏移量String key = consumerRecord.key(); //对应数据发送的keySystem.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+offset+"数据的分区为:"+partition);}//ConsumerRecords 里面的数据全部消费完了,提交offset//使用异步提交的方式不会阻塞程序的消费kafkaConsumer.commitAsync();// kafkaConsumer.commitSync(); 同步的进行提交,消费数据完成后进行提交offset,完成提交后,才能继续下一次消费}}
}

3.3、消费完每个分区之后手动提交offset

package cn.itcast.kafka.demo2;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.util.*;public class CommitPartition {public static void main(String[] args) {Properties props = new Properties();//指定Kafka的服务器地址props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费组的名字props.put("group.id", "testGroup");//不允许程序自动提交offset,需要我们消费完成后手动提交props.put("enable.auto.commit", "false");//定义key和value的序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//定义KafkaConsumerKafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);//订阅test这个topic,去消费这个topic里面的数据kafkaConsumer.subscribe(Arrays.asList("test"));while (true){//调用poll方法,获取所有的数据,包含了各分区里面的数据都有ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);//获取到一个topic里面的所有分区Set<TopicPartition> partitions = consumerRecords.partitions();for (TopicPartition topicPartition : partitions) {//获取到一个分区里面的所有数据List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);//long lastOffset=0;for (ConsumerRecord<String, String> record : records) {int partition = record.partition(); //获取数据对应的分区号String value = record.value(); //对应数据值long lastOffset = record.offset(); //对应数据的偏移量String key = record.key(); //对应数据发送的keySystem.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+lastOffset+"数据的分区为:"+partition);}//提交partition的offset值//Map<TopicPartition, OffsetAndMetadata> offsets// 获取分区里面数据的最后一条数据的offset的值//1.集合取值long offset = records.get(records.size() - 1).offset();//2.lastOffsetMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset));//处理完成一个分区里面的数据,然后提交offsetkafkaConsumer.commitSync(topicPartitionOffsetAndMetadataMap);}}}
}

3.4、指定分区数据进行消费

package cn.itcast.kafka.demo2;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.util.*;/*** 实现指定分区进行消费**/
public class ConsumerMypartition {public static void main(String[] args) {Properties props = new Properties();//指定Kafka的服务器地址props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费组的名字props.put("group.id", "testGroup2");//不允许程序自动提交offset,需要我们消费完成后手动提交props.put("enable.auto.commit", "false");//定义key和value的序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//定义KafkaConsumerKafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);//订阅test这个topic,去消费这个topic里面的数据//Collection<TopicPartition> partitions//创建一个集合,集合的泛型是topicPartitionTopicPartition topicPartition0 = new TopicPartition("test", 0);TopicPartition topicPartition1 = new TopicPartition("test", 1);List<TopicPartition> topicPartitions = Arrays.asList(topicPartition0, topicPartition1);//通过assign方法来注册我们只消费某些分区里面的数据kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);//获取所有的分区Set<TopicPartition> partitions = consumerRecords.partitions();for (TopicPartition topicPartition : partitions) {//获取到一个分区 里面的数据List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);for (ConsumerRecord<String, String> record : records) {int partition = record.partition(); //获取数据对应的分区号String value = record.value(); //对应数据值long lastOffset = record.offset(); //对应数据的偏移量String key = record.key(); //对应数据发送的keySystem.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+lastOffset+"数据的分区为:"+partition);}//需要提交这个分区的offset值long offset = records.get(records.size() - 1).offset();kafkaConsumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset)));}}}
}

3.5、重复消费与数据丢失


kafka的数据消费模型:

 exactly  once:消费且仅消费一次at  least  once:最少消费一次  出现数据重复消费的问题at  most  once : 至多消费一次  出现数据丢失的问题

数据重复消费或者数据丢失的原因造成:offset没有管理好
将offset的值给保存到redis里面去或者hbase里面去
默认的offset保存在哪里??

 可以保存到zk里面去,也可以保存到kafka自带的一个topic里面去  __consumer_offsets

3.6、consumer消费者消费数据流程

高阶API high level API

将offset的值,保存在zk当中了,早期的kafka版本,默认都是使用high level  api进行消费的

低阶API low level API

将offset的值,保存在kafka的一个默认的topic里面了

新的版本都是使用low level API进行消费,将数据的offset保存到一个topic里面去了

4、kafka Streams API开发

kafka新版本的一个流式计算的模块,主要用于流式计算,实时计算
案例:

1.使用kafka-stream API实现获取test这个topic里面的数据,然后写入到test2这个topic里面去,
2.并且将数据小写转换成为大写

第一步:创建一个topic

node1服务器使用以下命令来常见一个topic 名称为test2

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --create  --partitions 3 --replication-factor 2 --topic test2 --zookeeper node1:2181,node2:2181,node3:2181

第二步:开发StreamAPI

package cn.itcast.kafka.demo1;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 向test topic里面发送数据*/
public class MyProducer {public static void main(String[] args) {Properties props = new Properties();//指定Kafka的服务器地址props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//消息确认机制props.put("acks", "all");//重试机制props.put("retries", 0);//批量发送的大小props.put("batch.size", 16384);//消息的延迟props.put("linger.ms", 1);//消息缓冲区大小props.put("buffer.memory", 33554432);//定义key和value的序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        props.put("partitioner.class","cn.itcast.kafka.demo1.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);for (int i=0;i<100;i++){//            kafkaProducer.send(new ProducerRecord<>("test","这是第" + i + "条数据"));kafkaProducer.send(new ProducerRecord<>("test", "mykey"+i,"hello" + i + "world"));
//            kafkaProducer.send(new ProducerRecord<>("test", 1,"mykey"+i,"这是第" + i + "条数据"));//自定义分区策略,不需要指定分区号,如果指定了分区号还是会发送到指定的分区
//            kafkaProducer.send(new ProducerRecord<>("test","mykey"+i,"这是第" + i + "条数据"));}kafkaProducer.close();}
}
package cn.itcast.kafka.demo3;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;import java.util.Properties;public class StreamAPI {public static void main(String[] args) {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//通过KStreamBuilder来实现将我们数据进行流式处理KStreamBuilder builder = new KStreamBuilder();builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");KafkaStreams streams = new KafkaStreams(builder, props);streams.start();}
}

第三步:生产数据

node1执行以下命令,向test这个topic当中生产数据

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test

第四步:消费数据

node2执行一下命令消费test2这个topic当中的数据

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh --from-beginning  --topic test2 --zookeeper node1:2181,node2:2181,node3:2181

3.kafka的log存储以及查询机制

kafka中log日志目录及组成

kafka在我们指定的log.dir目录下,会创建一些文件夹;名字是【主题名字-分区名】所组成的文件夹。
一个topic由多个partition组成的
一个partition里面有多个segment文件段
一个segment里面有两个文件,
.log文件:存放日志数据的文件
.index文件:索引文件
每当.log文件达到1GB的时候,就会产生一个新的segment第一个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000000000.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000000000.log第二个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000789546.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000789546.log第三个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000874569.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000874569.log第四个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000984561.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000984561.log下一个segment的文件的名字,是上一个segment文件最后一条数据的offset值
查找
654789  offset  在哪个segment文件段里面,文件里里面第多少条数据
折半查找  二分查找 来查找数据的offset究竟在哪一个segment段里面去如果确定了数据的offset在第一个segment里面,怎么继续快速的找到是哪一行数据
.index文件里面存放了一些数据索引值,不会将.log文件里面每一条数据都进行索引,每过一段就索引一次
减少索引文件的大小
索引文件是比较稀疏的,没有将所有的数据都建立索引值 避免索引文件太大offset
157894  在第358行
257894  第 514行
354678  第612行
514789  第 1200行
714895  第1500行还是使用折半查找在segment中寻找数据

kafka的log的寻址机制,背下来
1、
第一步:使用折半查找,找数据属于哪一个segment段
第二步:通过.index文件来查找数据究竟对应哪一条数据

segment段的命名规则:
下一个segment起始的数据name值,是上一个segment文件最后一条数据的offset值

kafka Message的物理结构及介绍

4.kafka当中的数据不丢失机制

kafka当中如何保证数据不丢失:
1、生产者如何保证数据不丢失 使用ack来确认
2、消费者如何保证数据不丢失 使用offset来记录
3、broker如何保证数据不丢失 副本机制

生产者生产数据:同步的发送以及异步发送

同步:发送一批数据给kafka后,等待kafka返回结果
1、生产者等待10s,如果broker没有给出ack相应,就认为失败。
2、生产者重试3次,如果还没有响应,就报错异步:发送一批数据给kafka,只是提供一个回调函数。
1、先将数据保存在生产者端的buffer中。buffer大小是2万条
2、满足数据阈值或者数量阈值其中的一个条件就可以发送数据。
3、发送一批数据的大小是500条producer的buffer缓冲区可以装2W条数据,如果数据一直没有发送出去,
如果buffer满了,我们可以设置,设置生产者阻塞,或者设置清空buffer

Kafka集群:

broker如何保证数据不丢失:使用副本的机制,来同步主分区当中的数据

消费者:优先选择主分区当中的数据进行消费,主分区当中的数据是最完整的

如何记录消费到了哪一条避免重复消费或者数据丢失???
通过offset来进行记录,可以将offset保存到redis或者hbase里面去等等,下次消费的时候就将offset取出来,
去进行消费

5.kafka 压力测试

https://blog.csdn.net/laofashi2015/article/details/81111466

数据的存储,都是存储在磁盘里面了:磁盘文件为什么能够做到速度这么快?
实现每秒过万条数据可以轻松处理?

第一个原因:使用pageCache 页缓存技术
第二个原因:顺序的读写磁盘,顺序的读写磁盘的速度比操作内存更快

6.kafka的配置文件说明

Server.properties配置文件说明

#broker的全局唯一编号,不能重复
broker.id=0#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092#处理网络请求的线程数量
num.network.threads=3#用来处理磁盘IO的线程数量
num.io.threads=8#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小
socket.request.max.bytes=104857600#kafka运行日志存放的路径
log.dirs=/export/data/kafka/#topic在当前broker上的分片个数
num.partitions=2#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1#segment文件保留的最长时间,超时将被删除
log.retention.hours=1#滚动生成新的segment文件的最大时间
log.roll.hours=1#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824#周期性检查文件大小的时间
log.retention.check.interval.ms=300000#日志清理是否打开
log.cleaner.enable=true#broker需要使用zookeeper保存meta数据
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01advertised.host.name=192.168.140.128

日志数据的处理

#日志清理是否打开
log.cleaner.enable=true#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=8
#日志数据保存的时间  168小时  7天
log.retention.hours=168

kafka当中已经消费掉的数据,没有存在的必要,可以对其进行删除,默认是168小时之后就将过期的segment进行删除掉

控制内存当中的数据多长时间刷新一次到磁盘,或者多少条数据刷新一次到磁盘

#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000

producer生产者配置文件说明

#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=node1:9092,node2:9092,node3:9092
# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=kafka.producer.DefaultPartitioner
# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
compression.codec=none
# 指定序列化处理类
serializer.class=kafka.serializer.DefaultEncoder
# 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#compressed.topics=# 设置发送数据是否需要服务端的反馈,有三个值0,1,-1
# 0: producer不会等待broker发送ack
# 1: 当leader接收到消息之后发送ack
# -1: 当所有的follower都同步消息成功后发送ack.
request.required.acks=0 # 在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
# 此值和batch.num.messages协同工作.
queue.buffering.max.ms = 5000# 在async模式下,producer端允许buffer的最大消息量
# 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
# 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=20000# 如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500# 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后
# 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)
# 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
# -1: 无阻塞超时限制,消息不会被抛弃
# 0:立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
# 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
# 有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
# 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
# (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=60000

consumer消费者配置详细说明

# zookeeper连接服务器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=5000
#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000
#指定消费
group.id=itcast
# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
# 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true
# 自动更新时间。默认60 * 1000
auto.commit.interval.ms=1000
# 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx
# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx
# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50
# 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新  的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数.
rebalance.max.retries=5# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600# 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest
# 指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder

四、flume整合kafka

数据采集工具:flume,sqoop

flume三个组件:

source
channel
sink:kafkaSink
使用flume监控一个文件夹,一旦文件夹下面有了数据,就将数据发送到kafka里面去
source  TailDirSource

第一步:flume下载地址

http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.14.0.tar.gz

第二步:上传解压flume

第三步:配置flume.conf

#为我们的source channel  sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flumedata
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
a1.sources.r1.inputCharset = GBK
#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

第四步:启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

第五步:消费Kafka内数据

bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic test

五、CAP理论以及kafka当中的CAP机制

CAP理论:三口锅,但是只有两个锅盖,用于有一口锅盖不住

Consistency:一致性
Availability:可用性
Partition tolerance:分区容错性

在分布式系统当中,这三个指标顶多只能满足其中的两个
所有的分布式系统,都遵循这个CAP定律

hadoop  hbase  redis集群  es  kafka


一致性:研究的是多个节点当中的数据是否一样

强一致性:一旦更新之后,其他的节点可以马上感知得到
弱一致性:一旦更新之后,其他的节点,不用更新
最终一致性:一旦更新之后,最终所有的节点都会更新

可用性:研究的就是服务器是否会给响应
分区容错:研究的就是多个分区实现数据的备份机制

kafka满足的是CAP当中的CA:一致性和可用性
不满足 分区容错性 kafka当中使用ISR尽量的避免分区容错性

ISR列表维护的依据

replica.lag.time.max.ms=10000     副本分区与主分区心跳时间延迟
replica.lag.max.messages=4000    副本分区与主分区消息同步最大差

订单系统:每当有人下单之后,就会打印log4j的日志

六、kafka监控及运维

1、kafka-eagle概述

为了简化开发者和服务工程师维护Kafka集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,

2、环境和安装

2.1、环境要求

需要安装jdk,启动zk以及kafka的服务

2.2、安装步骤

2.2.1、下载源码包

kafka-eagle官网:

http://download.kafka-eagle.org/我们可以从官网上面直接下载最细的安装包即可kafka-eagle-bin-1.3.2.tar.gz这个版本即可

2.2.2、解压

这里我们选择将kafak-eagle安装在第三台
直接将kafka-eagle安装包上传到node03服务器的/export/softwares路径下,然后进行解压
node3服务器执行一下命令进行解压

cd /export/softwares/
tar -zxf kafka-eagle-bin-1.3.2.tar.gz -C /export/servers/
cd /export/servers/kafka-eagle-bin-1.3.2
tar -zxf kafka-eagle-web-1.3.2-bin.tar.gz

2.2.3、准备数据库

kafka-eagle需要使用一个数据库来保存一些元数据信息,我们这里直接使用msyql数据库来保存即可,在node3服务器执行以下命令创建一个mysql数据库即可

进入mysql客户端
mysql -uroot -p
create database eagle;

2.2.4、修改kafak-eagle配置文件

node3执行以下命令修改kafak-eagle配置文件
cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/conf
vim system-config.properties

kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=node1:2181,node2:2181,node3:2181
cluster2.zk.list=node1:2181,node2:2181,node3:2181kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node3:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456

2.2.5、配置环境变量

kafka-eagle必须配置环境变量,node03服务器执行以下命令来进行配置环境变量
vim /etc/profile

export KE_HOME=/export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2
export PATH=:$KE_HOME/bin:$PATH
让修改立即生效,执行
source /etc/profile

2.2.6、启动kafka-eagle

node3执行以下界面启动kafka-eagle

cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start

2.2.7、主界面

访问kafka-eagle

http://node3:8048/ke/account/signin?/ke/
用户名:admin
密码:123456

七、实时看板案例

1、项目需求梳理

根据订单mq,快速计算双11当天的订单量、销售金额

2、项目架构模型

支付系统+kafka+ redis
1、支付系统发送mq到kafka集群中,编写程序消费kafka的数据并计算实时的订单数量、订单数量
2、将计算的实时结果保存在redis中
3、外部程序访问redis的数据实时展示结果

3、订单数据模型


订单编号、订单时间、支付编号、支付时间、商品编号、商家名称、商品价格、优惠价格、支付金额

4、指标需求

平台运维角度统计指标

平台总销售额度 redisRowKey设计  itcast:order:total:price:date
平台今天下单人数
redisRowKey设计  itcast:order:total:user:date
平台商品销售数量
redisRowKey设计  itcast:order:total:num:date

商品销售角度统计指标

每个商品的总销售额
Redis的rowKey设计itcast:order:productId:price:date
每个商品的购买人数
Redis的rowKey设计itcast:order:productId:user:date
每个商品的销售数量
Redis的rowKey设计itcast:order:productId:num:date

店铺销售角度统计指标

每个店铺的总销售额
Redis的rowKey设计itcast:order:shopId:price:date
每个店铺的购买人数
Redis的rowKey设计itcast:order:shopId:user:date
每个店铺的销售数量
Redis的rowKey设计itcast:order:shopId:num:date

5、kafka 当中的topic创建,以及模拟消息生产程序

1、创建我们的topic

bin/kafka-topics.sh  --create --replication-factor 2 --topic itcast_order --zookeeper node1:2181,node2:2181,node3:2181 --partitions 5

2、创建maven项目并导入必须依赖的jar包

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.41</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.itcast.realboard.LogOperate</mainClass></transformer></transformers></configuration></execution></executions></plugin><plugin><artifactId> maven-assembly-plugin </artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.itcast.realboard.LogOperate</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

6、代码实现

消息生产代码实现

第一步:创建我们的订单实体类

package cn.itcast.realboard;import com.alibaba.fastjson.JSONObject;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;public class PaymentInfo {private static final long serialVersionUID = -7958315778386204397L;private String orderId;//订单编号private Date createOrderTime;//订单创建时间private String paymentId;//支付编号private Date paymentTime;//支付时间private String productId;//商品编号private String productName;//商品名称private long productPrice;//商品价格private long promotionPrice;//促销价格private String shopId;//商铺编号private String shopName;//商铺名称private String shopMobile;//商品电话private long payPrice;//订单支付价格private int num;//订单数量/*** <Province>19</Province>* <City>1657</City>* <County>4076</County>*/private String province; //省private String city; //市private String county;//县//102,144,114private String catagorys;public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public String getCounty() {return county;}public void setCounty(String county) {this.county = county;}public String getCatagorys() {return catagorys;}public void setCatagorys(String catagorys) {this.catagorys = catagorys;}public PaymentInfo() {}public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {this.orderId = orderId;this.createOrderTime = createOrderTime;this.paymentId = paymentId;this.paymentTime = paymentTime;this.productId = productId;this.productName = productName;this.productPrice = productPrice;this.promotionPrice = promotionPrice;this.shopId = shopId;this.shopName = shopName;this.shopMobile = shopMobile;this.payPrice = payPrice;this.num = num;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public Date getCreateOrderTime() {return createOrderTime;}public void setCreateOrderTime(Date createOrderTime) {this.createOrderTime = createOrderTime;}public String getPaymentId() {return paymentId;}public void setPaymentId(String paymentId) {this.paymentId = paymentId;}public Date getPaymentTime() {return paymentTime;}public void setPaymentTime(Date paymentTime) {this.paymentTime = paymentTime;}public String getProductId() {return productId;}public void setProductId(String productId) {this.productId = productId;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public long getProductPrice() {return productPrice;}public void setProductPrice(long productPrice) {this.productPrice = productPrice;}public long getPromotionPrice() {return promotionPrice;}public void setPromotionPrice(long promotionPrice) {this.promotionPrice = promotionPrice;}public String getShopId() {return shopId;}public void setShopId(String shopId) {this.shopId = shopId;}public String getShopName() {return shopName;}public void setShopName(String shopName) {this.shopName = shopName;}public String getShopMobile() {return shopMobile;}public void setShopMobile(String shopMobile) {this.shopMobile = shopMobile;}public long getPayPrice() {return payPrice;}public void setPayPrice(long payPrice) {this.payPrice = payPrice;}public int getNum() {return num;}public void setNum(int num) {this.num = num;}@Overridepublic String toString() {return "PaymentInfo{" +"orderId='" + orderId + '\'' +", createOrderTime=" + createOrderTime +", paymentId='" + paymentId + '\'' +", paymentTime=" + paymentTime +", productId='" + productId + '\'' +", productName='" + productName + '\'' +", productPrice=" + productPrice +", promotionPrice=" + promotionPrice +", shopId='" + shopId + '\'' +", shopName='" + shopName + '\'' +", shopMobile='" + shopMobile + '\'' +", payPrice=" + payPrice +", num=" + num +'}';}public String random() throws ParseException {this.orderId = UUID.randomUUID().toString().replaceAll("-", "");this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");this.productPrice = new Random().nextInt(1000);this.promotionPrice = new Random().nextInt(500);this.payPrice = new Random().nextInt(480);this.shopId = new Random().nextInt(200000)+"";this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);this.province = new Random().nextInt(23)+"";this.city = new Random().nextInt(265)+"";this.county = new Random().nextInt(1489)+"";String date = "2015-11-11 12:22:12";SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {this.createOrderTime = simpleDateFormat.parse(date);} catch (ParseException e) {e.printStackTrace();}JSONObject obj = new JSONObject();String jsonString = obj.toJSONString(this);return jsonString;//  return new Gson().toJson(this);}}

第二步:定义log4j.properties配置文件

在项目的src/main/resources路径下创建log4j.properties并进行配置

### 设置###
log4j.rootLogger = debug,stdout,D,E### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n### 输出DEBUG 级别以上的日志到=E://logs/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
#log4j.appender.D.File = E://logs/log.log
log4j.appender.D.File = /export/servers/orderLogs/orderinfo.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
#log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
log4j.appender.D.layout.ConversionPattern = %m%n### 输出ERROR 级别以上的日志到=E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = /export/servers/orderLogs/ordererror.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
#log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
log4j.appender.E.layout.ConversionPattern =  %m%n

第三步:开发日志生产代码

package cn.itcast.realboard;import org.apache.log4j.Logger;import java.text.ParseException;public class LogOperate {private static Logger printLogger = Logger.getLogger("printLogger");public static void main(String[] args) throws ParseException, InterruptedException {PaymentInfo paymentInfo = new PaymentInfo();while (true){String random = paymentInfo.random();System.out.println(random);printLogger.info(random);Thread.sleep(1000);}}
}

第四步:将程序打包并上传服务器运行

将我们的程序进行打包,并上传到node3服务器进行运行,产生日志处理

第五步:运行jar包

node3执行以下命令运行Java程序

java -jar day12_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar

第六步:开发flume配置文件,实现收集数据到kafka

node3执行以下命令,开发flume配置文件

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim file_kafka.conf
#为我们的source channel  sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/servers/orderLogs/orderinfo.log#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = itcast_order
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

启动flume

bin/flume-ng agent -c conf -f conf/file_kafka.conf -n a1

第七步:kafka启动console控制台,消费数据以验证数据进入kafka

node1执行以下命令进入kafka控制台进行消费,消费kafka当中的数据以验证数据计入kafka

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic itcast_order --from-beginning

消息消费代码实现

定义redis工具类

定义redis工具类

package cn.itcast.realboard;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** 主要用于获取jedis的客户端连接**/
public class JedisUtils {private static JedisPool jedisPool;public static JedisPool getJedisPool(){if (null==jedisPool){JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPoolConfig.setMaxIdle(5);jedisPoolConfig.setMaxWaitMillis(3000);jedisPool = new JedisPool(jedisPoolConfig,"node1",6379);}return jedisPool;}public static void main(String[] args) {JedisPool jedisPool = getJedisPool();Jedis resource = jedisPool.getResource();resource.set("setkey","setvalue");resource.close();}}

开发kafka消费代码

package cn.itcast.realboard;import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;import java.util.*;public class MyKafkaConsumer {/*** 消费itcast_order这个topic里面的数据* @param args*/public static void main(String[] args) {Properties props = new Properties();//指定Kafka的服务器地址props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费组的名字props.put("group.id", "testGroup");//允许程序自动提交offset 提交offset保存到了Kafka当中的一个topic中取props.put("enable.auto.commit", "false");//每隔多长时间提交一次offset的值/*** 157 hello offset 上一秒提交的offset** 287 hello world* 295 abc test 900ms 宕机了怎么办?* 351 hello abc 1000ms** 有可能造成重复消费的一些问题**///props.put("auto.commit.interval.ms", "1000");//定义key和value的序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);kafkaConsumer.subscribe(Arrays.asList("itcast_order"));while (true){//获取topic中所有数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);Set<TopicPartition> partitions = consumerRecords.partitions();for (TopicPartition topicPartition : partitions) {List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);for (ConsumerRecord<String, String> record : records) {//处理业务逻辑JedisPool jedisPool = JedisUtils.getJedisPool();Jedis jedis = jedisPool.getResource(); //获取jedis客户端String value = record.value();//获取json格式字符串//将json格式字符串转换成为对象PaymentInfo paymentInfo = JSONObject.parseObject(value, PaymentInfo.class);long payPrice = paymentInfo.getPayPrice();//redis中的key一般都是约定俗成的//求取平台销售总额度jedis.incrBy("itcast:order:total:price:date",payPrice);//平台今天下单人数jedis.incr("itcast:order:total:user:date");//平台商品销售数量 简单认为一个订单就一个商品jedis.incr("itcast:order:total:num:date");//每个商品的总销售额jedis.incrBy("itcast:order:"+paymentInfo.getProductId()+":price:date",payPrice);//每个商品的购买人数jedis.incr("itcast:order:"+paymentInfo.getProductId()+":user:date");//每个商品的销售数量jedis.incr("itcast:order:"+paymentInfo.getProductId()+":num:date");//每个店铺的总销售额jedis.incrBy("itcast:order:"+paymentInfo.getShopId()+":price:date",payPrice);//每个店铺的购买人数jedis.incr("itcast:order:"+paymentInfo.getShopId()+":user:date");//每个店铺的销售数量jedis.incr("itcast:order:"+paymentInfo.getShopId()+":num:date");jedis.close();}//每个分区完成之后提交一次offset值long offset = records.get(records.size() - 1).offset();Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset));kafkaConsumer.commitSync(topicPartitionOffsetAndMetadataMap);}}}
}

成功写入redis数据库模拟实时数据读取

19 kafka消息队列相关推荐

  1. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  2. 【无废话】SpringBoot集成Kafka消息队列

    0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...

  3. kafka消息队列的概念理解

    kafka在大数据.分布式架构中都很流行.kafka可以进行流式计算,也可以做为日志系统,还可以用于消息队列. kafka作为消息队列的优点: 分布式的系统 高吞吐量.即使存储了许多TB的消息,它也保 ...

  4. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯

    作者 | 陈屹       责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...

  5. kafka 消息队列

    kafka 消息队列 kafka 架构原理 大数据时代来临,如果你还不知道Kafka那就真的out了!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP ...

  6. Java+Kafka消息队列

    本文主要针对,Java端对Kafka消息队列的生产和消费.Kafka的安装部署,请看查看相关文章. 笔者最近所用的是Spring mvc,监听文件路径,然后将读取到的文件内容发送到消息队列中.由另外系 ...

  7. kafka消息队列应用总结

    kafka官网: Apache Kafka 公司使用阿里云提供的kafka消息队列服务,分别为测试环境与生产环境,部署了多个集群. 使用场景:应用对外提供API接口调用,同时支持kafka增量消息推送 ...

  8. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  9. Kafka—消息队列

    Kafka-消息队列(理论部分) 一.Kafka概述 1.1.简介 kafka是一个分布式的基于发布/订阅模式的消息队列 主要应用场景:大数据实时处理领域 1.2.什么是消息队列? 消息队列 = 消息 ...

最新文章

  1. 基于SLP协议的BCH黑客马拉松即将开展
  2. 运维中的日志切割操作梳理(Logrotate/python/shell脚本实现)
  3. caffe学习(三):caffe开发环境安装(Ubuntu)
  4. PAT_B_1033_Java(20分)
  5. Windows下显示目录大小及文件个数
  6. reentrantlock非公平锁不会随机挂起线程?_程序员必须要知道的ReentrantLock 及 AQS 实现原理...
  7. POJ 3461Oulipo KMP模板
  8. kafka多个消费者消费一个topic_kafka:一文读懂消费者背后的那点quot;猫腻quot;
  9. 时间序列趋势判断(一)——斜率阈值判断
  10. python接口自动化测试二十二:文件下载
  11. 通过 Powershell 来替换 ARM 模式下虚拟机的网络接口
  12. optisystem中器件的学习(4-Test Sets/Passives Library/Optical Switches)
  13. 廖雪峰git教程总结
  14. Xshell6、Xftp6【官方免费版】下载
  15. 宋人千首绝句【全十卷】
  16. FPGA MF-TDMA SCPC TPC QPSK DVB-S2 IP core
  17. Deel:20个月,ARR 1M to 100M。
  18. 周易六十四卦——风山渐卦
  19. 利用Prometheus(普罗米修斯)Grafana对机器群的运行时各项数据进行监控
  20. 人工智能入门相关书籍

热门文章

  1. html中meta的写法规范,HTML代码meta标签的charset 属性写法及用法
  2. JavaScript----与函数大战的207个回合(来日再战)
  3. 企业常见的数据泄露点梳理
  4. 电子零售商进军百货业务 棘手的供应链(转)
  5. Scrum开发管理方法的由来、团队建设与实施过程
  6. 实验---采用SOM网络进行聚类
  7. matlab如何查看眼图q值,详解:什么是眼图、眼图怎么看?
  8. 全球室内设计界NO.1力作!东呈联合HBA打造柏曼酒店;万豪集团在上海开设第五家福朋喜来登酒店 | 美通社头条...
  9. 《以道御术》荣耀上市,高管书评
  10. 专业卡与游戏卡的区别