Kafka原理+操作+实战

前面我和大家交流了kafka的部署安装。对于部署安装这都是小意思,不值得太多的提及。重点还是需要知道kafka原理、熟练掌握kafka命令以及灵活用于kafka场景。 干货走起~

#回顾4种部署kafka方案, 如下链接:
《大数据组件运维—kafka安装部署》-大宁。
https://blog.csdn.net/myself_ning/article/details/125564263?spm=1001.2014.3001.5501

接下来,就按照 kakfa原理kafka操作命令kafka场景应用 逐个和大家进行交流。

一、kafka原理
二、kafka操作命令
三、kafka场景应用

一、kafka原理

kafka简单介绍

#1、kafka是一个分布式的基于发布/订阅模式的消息队列(MQ), 主要应用于大数据实时处理领域。​  通常消息队列有两种模式,‘点对点模式’和‘发布订阅模式’#2、‘点对点模式’和‘发布订阅模式’有什么区别呢?​   点对点模式——消费者主动拉取数据,消息收到后消息清除。(既,消息生产者生成数据发送到Queue中,然后消息消费者从Queue中提取并且消费消息,消息被消费之后,queue中不再有储存。所以对于一条消息而言,只能被一个消费者消费)​ 发布订阅模式——生产者将消息发布到topic中,同时有多个消费者消费消息,和点对点不同,发布到topic的消息会被所有的订阅者消费。#3.kafka消息队列异步通信的优点:​ 解耦:两个不必同时在线。​    削峰/缓冲:解决发的快收的慢的情况,又叫 “削峰”, 相当于缓冲。​ 可恢复性:系统一部分组件失效时候,不会影响整个系统。​   灵活性:因为很多消息队列是分布式的,所以可以动态的增减机器。(例如618双11,增加机器)​  异步通信:很多时候,用户不需要立马得到回复.#4.对消息队列的理解​    消息队列仅仅是消息队列,不是存储文件的系统,所以数据一般会设置保留时间长度​    发布订阅模式,也就是一个生产者多个消费者模型​  一般情况,点对点模式是消费者主动拉取数据,发布订阅模式是主动推给消费者。但是kafka考虑到发布订阅模式如果主动推数据的话,当下游来不及消费完毕那么就会把下游干崩了。所以kafka的消费订阅模式是主动拉取数据的这个机制。#kakfa存在着一定的缺点。​    kafka由于是主动去队列中拉取数据的,所以kafka要经常去询问队列是否有新的数据生成,这样就会造成不必要的资源浪费,如果长时间没有数据进入队列,那么kafka会做很多次无意义的访问。 但是目前kafka的机制是,如果访问一定次数之后还是没有新数据,他就会延长下一次访问的间隔时间。

Kafka与其他消息队列技术的比较

#Kafka与常见消息队列的对比
RabbitMQ​   Erlang编写​   支持很多的协议:AMQP,XMPP, SMTP, STOMP​   非常重量级,更适合于企业级的开发​    发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。Redis​    基于Key-Value对的NoSQL数据库​  入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;​ 出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。ZeroMQ​ 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。​  高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。​   具有一个独特的非中间件的模式,不需要安装和运行一个消息服务器或中间件​  ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。ActiveMQ​    类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。​   类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。Kafka/Jafka​ 高性能跨语言分布式发布/订阅消息队列系统​   快速持久化,可以在O(1)的系统开销下进行消息持久化;​  高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;​   完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;​ 支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。​    一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。​   采用零拷贝技术

kafka原理架构图分析

生产者 --> kafka集群管理消息#Broker 1,2,3 表示一个集群的三台服务器。#数据并不是存在Brocker中的,Brocker里面有多个Topic (例如‘点赞’消息的主题,‘浏览’消息的主题)#每个主题(Topic)的数据会被拆分到多个Partition(分区个数由创Topic时候设定)其中的Partition(分区)为了 提高Topic的负载均衡假设一个Topic有多个分区,则生产者以轮询的方式向队列中发送消息 提高了kafka集群的负载能力,同时也提高了并发度#延申——各个组件中设置分区的作用:MR中分区——提高reduce这边的并发   ; hive分区——查询的时候可以减少读取数据的量。   Kafka的Topic这边的分区是为了 提高Topic的负载均衡#Leader 和 Follower其中还有Leader 指的是分区的Leader不是集群或者kafka的leader  有leader就有Follower, 其中Follower起着备份的作用, 所以说Leader和Follower不在同一台机器的,当leader挂掉,follower转化为leadwer生产者,消费者只连接Leader, Follower只起到备份作用。 只有Leader挂掉了Follower才起作用。总结:Leader和Follower都是针对 {Topic_A , Partition_m}的,也就是说:”每个主题的每个分区来选举出Leader和Follower“。kafka集群管理消息 --> 消费者#Consumer(消费者) 或者 Consumer group(消费者组)来消费数据#消费者组:多个消费者用同一个组名,那么这些消费者就是同一个组的。#注意1:一个Partition中的数据只能被同一个消费者组中的一个消费者消费。因为它是把同一个组看成一个大的消费整体, 消费者组的优点是提高消费能力。所以效率最好的是消费者个数=同一个主题的分区数  的时候#注意2:一个消费者组中的消费者可能会消费多个分区的数据(当 消费者数量<分区数量)。Zookeep注册消息#kafka集群的正常工作 需要依赖ZK,    kafka如果做成集群 只需要将其所用的ZK放在同一个集群即可。#ZK在其中的作用:1)保存一些kafka信息。 2)保存消费者消费的进度信息。#因为消息队列有解耦功能,所以,如果一个消费者挂了,重启之后还可以继续消费, 可以这么做的原因是:消费者里面保存着消费的进度保存肯定不在在内存里的呀,因为重启机器内存清空。0.9版之前消费者offect保存在zk里面呢,0.9版本之后存在kafka中, 准确的说保存在kafka的一个特殊的Topic中,这个Topic由系统自动创建(后面介绍)唉?为啥从zk迁移到kafka中呢?因为和zk之间通信不如跟kafka请求的快,并且多次请求zk对zk是压力啊。尤溪~)

kafka工作流程和文件存储

工作流程

​ 图中 1,2,3,4,5… 表示的是偏移量,不是数据哈。不然容易看不懂或者误解。会误解成”咿~。为啥三个分区数据是一样的呀?不是说轮询着往分区中写入数据的嘛?“ 在此重申:这里的1,2,3,4 理解成这个分区的第1个数据,第2个数据,第3个数据 …
​ 1)如图:这个状态可以看出生产者Producer生产了15条数据(6+4+5)
​ 2)每个分区都有各自独立的偏移量。不要误认为:生产者生产的第一个数据偏移量为1,生产者生产的第二个数据偏移量为2…(错误), 所以不要误认为是全局的偏移量。所以说呀,kafka消费者保证的是分区消费有序性,而不是全局消费有序性。

#隐式创建:当生产者向一个不存在的topic中传数据的时候,系统会默认创建这个topic, 这个topic有一个分区,一个备份。分区数/备份数 这个默认值可以自己改。#副本数为 n: 表示Leader 1个, follower n-1个。 并且n-1个副本不可能有两分在同一台机子上。#偏移量(offset) : 表示生产者所发的消息量 被分在不同分区的偏移量#Follower 主动找到Leader去把数据给备份过来#消费者 主动去找Leader去消费数据。#生产者写数据是往分区询写的,(例如topic设置2个partition,那么数据会轮询写到两个分区(一般是两台机子),其中两个分区分别写Topic_0, Topic_1。 再假设topic设置2个备份, 那么Topic_0, Topic_1会被备份到其他机子上(非本机)。 数据在每个分区可以保证有序性,但是在全局没法保证有序性质。#kafka中消息是以topic进行分类的,生产者产生消息,消费者消费消息,都是面向topic的。 、
#topic是逻辑上的概念,而partition是物理上的概念,每个partition对应一个文件,该log文件中存储的就是parducer产生的数据,partition生产的数据会被不断的追加到该log文件的末端。并且每条数据都有自己的offset, 消费者组中每个消费者都会记录自己消费到哪个offset, 以便错误回复时候,从上次错误处继续消费。#还有个问题,消费者记录着消费到数据的offset,那么这个offset指的是哪一个分区的offset? 还是说是全局offset? 按照道理说应该是全局offset,那么他哪里来的全局offset的呢?
也就是说,消费者不仅要保存offset,还要保存是哪一个分区对应的offset:如下。{分区1:offset1, 分区2:offset2, .... }

文件存储

cd home/hduser/kafka/data/topic名-0
ll
#我们会发现以下文件【图一】。
#其中:xxxxxxxxxxxxxxxxx.log 存数据log.retention.hours=168log.segment.bytes=1073741824#默认会保留168小时(7天), 也可以将生命周期设置成别的。#默认最大大小1073741824(1G),当这个文件到这么大时候,就会创建一个新的log数据文件。#这样就会遇到两个问题:(1)产生的新的文件是如何命名的呢?(2)消费者如何定位到第二个log文件?#对于上述两个问题由 00000000000000000000.index来解决。#kafka采用分片和索引的机制【图二】
#kafka采用分片和索引的机制,将每个partition分为多个segment, 每个segment对应一组index和log文件。这些文件位于同一个文件夹下。文件夹的命名规则为:topic名称+分区号。(例如 top1-0  top1-1   top1-2 .... 这个之前说过啦)
#其中segment(index+log)的命名规则是”该log中最小的偏移量“+log/index。最小的偏移量指的是:log中最先开始的哪个数据的偏移量。(既:index和log文件以当前segment的第一条消息的offset来命名)xxxxxxxxxxxxxxxxx.index
#index记载于log对应关系:【图三】
#index记载着offset和对应数据的起始位置。例如我想取offset=3 的数据,那么我只需要在index中找到offset=3和offset=4的对应数据起始位置, 然后按照[index[3], index[4])这个区间取log中取数据即可。
#因为,index中所用来记载每个offset的占用尺寸(元数据尺寸)是固定的,假设为m字节,所以当去找offset=n的index数据的时候,直接定位到xxxx.index文件的n*m 字节位置即可。相当于一个指针,直接定位。

生产者—生产者发往分区策略

#分区的原因:1)方便在集群中扩展。(每个Partition可以通过调整来适应它所在的机器,然而一个topic由多个Partition组成,因此整个集群就可以适应任意大小的数据了。)2)可以提高并发。 (因为可以以Partition为单位进行读写了)#分区的原则:1)情况1:生产者直接指明Partition分区。数据直接发往topic的Partition分区2)情况2:生产者没有指明Partition分区,但是有key的情况下。key的hash值 与 topic的分区数据进行求余。得到的结果值 作为生产者数据即将发往的分区。3)情况3:生产者没有指明Partition分区,也没有key的情况下。第一次回随机生成一个整数(往后每生成一个数据,这个值自增)值 与 topic的分区数据进行求余。得到的结果值 作为生产者数据即将       发往的分区。(round-robin算法)

生产者—数据可靠性保证

#根据<生产者发往分区策略>可以知道,生产者产生的数据即将发往哪个分区。  但是topic收到没收到还不知道尼....
所以诞生出ack(acKnowledgement 确认收到)机制#对于 Leader向 Producer发送Ack,什么时候发送Ack合适?1)Leader收到数据时?2)确保所有Follower与Leader同步完成时候?3)部分Follower同步Leader完成时?#什么时候发送Ack合适? 下面具体讨论。结论是:kafka默认选用的是 '2)确保所有Follower与Leader同步完成时候' Leader向Producer发送Ack。 后面<生产者ACK机制选择>中可以了解到,也可以选择别的。

生产者—IRS

#ISR(in-sync replica set)同步副本#ISR的作用:1)为了完成发送ack的这个点#ISR具体细节: 看完下图,然后再解释。假设有10个副本(1个Leader, 9个fellower。), 选择4个follower放在ISR中(什么样的follower才能被选中? 选中——1.通信快,2.同步的量差越小的【在kafka高版本中不考虑这个】)第2个选中因素,在kafka高版本中不考虑这个,因为:生产者往Leader发数据是成批[batch]的发送数据,假如batch数据量 > kafka设置的最低同步差异量的时候,岂不是ISR中所有follower都被踢出ISR, 所以这写ISR中的folower会被频繁的踢出/拉入ISR, [因为ISR中follower本身就通信快,所以除去后,又以优势被拉进来]。ISR的最终目的是防止Leader故障,而从选新的Leader。#ISR+Ack是如何保证 produce将数据发到topic的?答:Leader只要保证再ISR中的Lollower同步完毕,即可向Producer发送Ack

数据一致性

#说明
HW(高水位):副本保存最小的数据量。 HW之前的数据才对消费者(Consumer)可见。
LEO:指的是每个副本最大的offset。#HW之前的数据才对消费者(Consumer)可见。保证了消费者消费数据的一致性这也就印证了,为什么每次生产者输入数据,消费者并不能立马消费的现象。就是因为所有Follower还没同步完毕,以至于HW(高水位)还没后移。所以,即使Leader挂掉了选举新的Leader也不影响HW(高水位的位置)#HW的作用之一:消费者消费一致性. (注意HW并不是保证数据不丢失,数据不丢失是ACK来管的)#HW作用综合:(1)保证了消费者消费数据的一致性(2)实施了leader故障新leader选举的时候, 告诉其他follower都截取到HW, 然后follower从新的Leader同步数据。#疑问:
#在新选取出Leader的时候将所有Follower在HW之后进行截断 然后从新的Leader中同步数据这一步骤中 有个疑问:如果新Leader数据量<旧Leader数据量的时候,那么整体的数据不会丢失嘛?
答疑:答案是不会丢失数据的。因为如果旧Leader数据量 > 新Leader数据量,那也就是说旧Leader在接收完最后一批数据(也就是多出来的部分)后其他Follower还没来得及同步,旧Leader就挂了,进而说明旧Leader没有向Producer发送Ack就挂了。所以新选Leader后,Producer还会把这批数据再发一遍滴。

一次性语义

#At Least Once : 至少一次
#At Most Once : 至多一次
#Exactly Once : 精准一次性Exaclty Once语句:保证下游消费者数据既不重复也不丢失。
#Exaclty Once = ACK=-1(至少一次) + 幂等性(去重)
#幂等性:#幂等性,是在kafka集群中,对重复的数据进行去重。 #开启幂等性,只需要将Producer中enable.idompotence设置为True (次数kafka中默认ACK=-1)
#幂等性原理:无论当前数据发送多少次,在此仅仅保留一次。Produce(生产者)链接Kafka集群后,kafka会给Produce分配个PID(既producer的 ID),    当Producer发送消息时,会同时对每个消息编一个SeqNumber(序列化号),Broker端会对<PID, Partition, SeqNumber>做缓        存,当同一个客户端发来的数据的PID,SeqNumber相同时候,这时候要做去重。当PID, Partition, SeqNumber都一样的时候,可以看    成同一条数据,则不做写入。#问题:当Producer挂掉又恢复后,其会从新申请一个PID。 那么在此发送相同数据后,则不会被去重。因此,幂等性只能解决 单次会话,单个分区的数据重复问题。

二、kafka操作命令

日志

#kafka的日志保存在: ./logs/server.log 中
#如果启动后,jps发现进程没起来,我们就要看看这个log

topic操作

#如下 --zookeeper nodex:2181 都可以。这里看成像集群哪一台机子的zookeeper进行交流。#查看当前服务器中所有topic ============================================
./bin/kafka-topics.sh \
--zookeeper node1:2181 \
--list#创建一个 topic主题【包含主题名字,分区数,副本数】========================
./bin/kafka-topics.sh \
--zookeeper node1:2181 \
--create \
--topic 主题名字 \
--partitions 2 \
--replication-factor 2 \
#创建成功后,控制台会打印:Created topic 主题名字.
#这时候  ls kafka/logs/ 我们发现有个节点有会有“主题-0”,有个节点有会有“主题-1”。然后再随机找1个“非己”节点备份自己分区主题“主题-x”。 另外leader和flower也是根据某种机制选择的(后面介绍)#删除topic ========================================================
./bin/kafka-topics.sh \
--zookeeper node1:2181 \
--delete \
--topic 主题名字
#反馈如下:Topic t1 is marked for deletion.Note: This will have no impact if delete.topic.enable is not set   to   true.#显示t1 这个topic已经被标记删除, 下一行告诉我们delete.topic.enable 设置为true才会被真正删除,这里只是告知,即使我们已经这样设置,它还是会告诉我们滴。#查看topic的详情 ====================================================
./bin/kafka-topics.sh \
--zookeeper node1:2181 \
--describe \
--topic 主题名字
#反馈显示如下图:
第一行反馈 topic的名字, 分区数, 备份数, 相关配置
下面两行是展示topic在每个分区的信息:topic名字, 分区名字, Leader机, 备份机, ISR    

#注意1. 如果topic的 备份数(副本数) > 机器数 , 会报错:
Replication factor: 4 larger than available brokers: 3.
#但是分区数可以 > 机器数; 因为一个机器可以有多个分区啊

生产者、消费者基本操作

#生产者连接topic 【注意生产者需要连接topic和broker】====================
./bin/kafka-console-producer.sh \
--topic t1 \
--broker-list node1:9092
#这时候会出现 “>” 这表示在此可以往队列中输入东西了。如下图:

#消费者连接订阅的主题
./bin/kafka-console-consumer.sh \
--bootstrap-server node1:9092 \
--topic t1
#这时候在生产者 >输入数据时候,可以被该消费者接收。#因为kafka是异步的发布订阅模式, 所以生产者和消费者可以不同时在线,当后来有新的生产者的时候,我们可以让其从头消费,只需要加上--from-beginning 即可。
./bin/kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic t1 \
--from-beginning#注意1:在9.0版本之前,前面我们说过,消费者的消费记录都是保存在zookeeper中,但是之后的版本都是保存在kafka中的,所以,
#如果是9.0版本之前 上面的命令中--bootstrap-server node1:9091 要改成 --zookeeper node1:8082#注意3:消费者要想--from-beginning从头消费数据,必须在主题的有效期之内,默认是七天,前面安装的时候配置过:log.retention.hours=168  (168小时)#注意2:生产者发出一些消息之后,根据上面所说的0.9版本kafka之后,消费者消费信息是保存在kafka中(也就是说是kafka的本地)的。我们可以去三台机子的kafka/logs/下面看,可以看到三台机子轮询的保存着一些消费记录(消费者偏移量):如下图。
__consumer_offsets-x 就是保存消费者消费偏移量的。

生产者、消费者所有操作

#先查看一下有哪些参数kafka-console-consumer --helpkafka-console-producer --help#####################################################################################################
# kafka-console-producer --help
#####################################################################################################
This tool helps to read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a singlebatch if they are not being sentsynchronously. (default: 200)
--broker-list <String: broker-list>      REQUIRED: The broker list string inthe form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.If specified without value, then itdefaults to 'gzip'
--help                                   Print usage information.
--line-reader <String: reader_class>     The class name of the class to use forreading lines from standard in. Bydefault each line is read as aseparate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer willsend>                                    block for during a send request(default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producerin bytes>                                to buffer records waiting to be sentto the server. (default: 33554432)
--max-partition-memory-bytes <Long:      The buffer size allocated for amemory in bytes per partition>           partition. When records are receivedwhich are smaller than this size theproducer will attempt tooptimistically group them togetheruntil this size is reached.(default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the messagefor multiple reasons, and beingunavailable transiently is just oneof them. This property specifies thenumber of retires before theproducer give up and drop thismessage. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in millisecondsexpiration interval>                     after which we force a refresh ofmetadata even if we haven't seen anyleadership changes. (default: 300000)
--producer-property <String:             A mechanism to pass user-definedproducer_prop>                           properties in the form key=value tothe producer.
--producer.config <String: config file>  Producer config properties file. Notethat [producer-property] takesprecedence over this config.
--property <String: prop>                A mechanism to pass user-definedproperties in the form key=value tothe message reader. This allowscustom configuration for a user-defined message reader.
--request-required-acks <String:         The required acks of the producerrequest required acks>                   requests (default: 1)
--request-timeout-ms <Integer: request   The ack timeout of the producertimeout ms>                              requests. Value must be non-negativeand non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producerrefreshes the metadata of relevanttopics. Since leader election takesa bit of time, this propertyspecifies the amount of time thatthe producer waits before refreshingthe metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.(default: 102400)
--sync                                   If set message send requests to thebrokers are synchronously, one at atime as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running inasynchronous mode, this gives themaximum amount of time a messagewill queue awaiting sufficient batchsize. The value is given in ms.(default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to producemessages to.#####################################################################################################
# kafka-console-consumer --help
#####################################################################################################
This tool helps to read data from Kafka topics and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to.connect to>
--consumer-property <String:             A mechanism to pass user-definedconsumer_prop>                           properties in the form key=value tothe consumer.
--consumer.config <String: config file>  Consumer config properties file. Notethat [consumer-property] takesprecedence over this config.
--enable-systest-events                  Log lifecycle events of the consumerin addition to logging consumedmessages. (This is specific forsystem tests.)
--formatter <String: class>              The name of a class to use forformatting kafka messages fordisplay. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning                         If the consumer does not already havean established offset to consumefrom, start with the earliestmessage present in the log ratherthan the latest message.
--group <String: consumer group id>      The consumer group id of the consumer.
--help                                   Print usage information.
--isolation-level <String>               Set to read_committed in order tofilter out transactional messageswhich are not committed. Set toread_uncommittedto read allmessages. (default: read_uncommitted)
--key-deserializer <String:deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages toconsume before exiting. If not set,consumption is continual.
--offset <String: consume offset>        The offset id to consume from (a non-negative number), or 'earliest'which means from beginning, or'latest' which means from end(default: latest)
--partition <Integer: partition>         The partition to consume from.Consumption starts from the end ofthe partition unless '--offset' isspecified.
--property <String: prop>                The properties to initialize themessage formatter. Defaultproperties include:print.timestamp=true|falseprint.key=true|falseprint.value=true|falsekey.separator=<key.separator>line.separator=<line.separator>key.deserializer=<key.deserializer>value.deserializer=<value.deserializer>Users can also pass in customizedproperties for their formatter; morespecifically, users can pass inproperties keyed with 'key.deserializer.' and 'value.deserializer.' prefixes to configuretheir deserializers.
--skip-message-on-error                  If there is an error when processing amessage, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message isavailable for consumption for thespecified interval.
--topic <String: topic>                  The topic id to consume on.
--value-deserializer <String:deserializer for values>
--whitelist <String: whitelist>          Regular expression specifyingwhitelist of topics to include forconsumption.

kafka的log和data分开的操作

#step1 : 先删除三台机子的 logs;   (补充这里的logs文件是系统自动创的,放kafka启动的时候,如果发现kafka/下面没有logs 就会自动创建。)#注意:kafka在启动的时候,会先去zk上进行broker id的注册。   通常我们会遇到一种情况:我们把kafka中log数据删除了,接下来却没法启动kafka了。因为虽然kafka中logs数据删除了,但是元数据还在zk中还没删除呢,所以kafka启动不了,通常的解决办法是1)修改kafka的broker id从新启动。或者2)再去zk中删除之前数据的元数据信息。rm -rf kafka/logs#step2 : 删除其在zk之中的信息
cd /home/hduser/zookeeper/zookeeper-3.4.9/
./bin/zkCli.sh  或者CDH中 zookeeper-client
#进入控制台
ls /    #查看
#删除如下这些属于kafka在zk上注册的东西。
rmr /cluster
rmr /brokers
rmr /controller_epoch
rmr /admin
rmr /isr_change_notification
rmr /consumers
rmr /latest_producer_id_block
rmr /config
ls /
#也可以在stop ZK之后,去删除/home/hduser/zookeeper/zookeeper-3.4.9/zkdatas/version-2 ; 但是删除这个相当于ZK重装了,所以如果ZK还被其他组件依赖的时候,别这么做。接下来重启zk, 就和上面rmr操作的结果是一样的了。#step3 : 修改配置文件 kafka/config/kserver.properties
#三台机子都要改
vim /home/hduser/kafka/config/server.properties
由log.dirs=/home/hduser/kafka/log
改成log.dirs=/home/hduser/kafka/data#step4 : 启动三台机子的kafka; 再创建topic时候,数据就和log进行了分离。 不妨去ls logs 和ls data 进行查看一下。#补充:我们可以去data中一个topic下面看,可以发现 数据实际存放地叫做 0000000000000.log注意,这里是数据,不是log, 疑惑的地点就是,人家数据的文件后缀就是log, 你能咋办?你能做的就是接受,并且记住。如下第一个图    并且我们通过生产者往这个topic中写数据,看看这个log文件中记载什么? 如下第二个图,可以看到数据是保存在这里面了,只是它被序列化了(我们看起来好像是乱码。)

三、kafka api应用案例

生产

#实验代码:
import java.util.Propertiesimport org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}object test1 {def main(args: Array[String]): Unit = {val properties = new Properties()properties.setProperty("bootstrap.servers", "139.196.237.231:9092")   //指定连接的kafka集群properties.setProperty("acks", "all")   //ack应答级别选取properties.setProperty("retries","3")   //重试次数//当数据到达16k或者等待1ms 发送一次数据。properties.setProperty("batch.size","16384")  //批次大小字节(16384B=16k)properties.setProperty("linger.ms","1") //等待时间//properties.setProperty("buffer.memory","33554432")  //RecordAccumulator缓冲区大小(33554432B=32M)//指定key,value的序列类。properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")println(properties)//创建生产者val producer = new KafkaProducer[String,String](properties)   //可以ctrl+Q 看看要传哪些东西。for( i <- 1 to 20){//producer调用send() 发送数据val record = new ProducerRecord[String, String]("ln1","sssss"+i.toString)producer.send(record, new Callback {override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {if(exception == null){println(s"发往分区:${metadata.partition()}\t偏移量:${metadata.offset()}")}}}) //发送数据}producer.close()  //关闭producer。}
}

消费

import java.util
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}object test2 {def main(args: Array[String]): Unit = {/** KafkaConsumer kafka消费者.*  KafkaConsumer[String,String]中传入[key类型,value类型]*  1.创建消费者配置信息:(可以用字符串,也可以用消费者类)(properties.setProper或者用properties.put)*    连接集群信息*    开启自动提交*    自动提交的延迟时间(提交的是offect. 消费到哪个offect就提交哪个值. 默认1000(ms))*    key的反序列化*    value的反序列化*    消费者组*  2.创建消费者*    val consumer = new KafkaConsumer[String,String](properties)*  3.订阅主题**  4.获取数据** *///1.创建消费者配置信息val properties = new Properties()properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "139.196.237.231:9091,139.196.237.231:9092,139.196.237.231:9093")properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000")properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g1")/*properties.setProperty("bootstrap.servers", "139.196.237.231:9091,139.196.237.231:9092,139.196.237.231:9093")properties.setProperty("enable.auto.commit","true")properties.setProperty("auto.commit.interval.ms", "1000")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("group.id","g1")*///2.创建消费者val consumer = new KafkaConsumer[String,String](properties)   //自动不全功能:ctrl+alt+v//3.订阅主题//consumer.subscribe(Collections.singletonList("ln1"))   //订阅一个主题.consumer.subscribe(util.Arrays.asList("ln1","ln2"))   //订阅多个主题. 订阅一个主题也可以用这个//4.获取数据while(true){  //注意要把消费者获取数据步骤放在死循环里.不如一开启scala程序就执行完关闭了.val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)val ite = consumerRecords.iterator()while(ite.hasNext){val consumerRecord = ite.next()//可以获得 key,value,partition,offect,timestamp......println(s"topic:${consumerRecord.topic()}\tpartition:${consumerRecord.partition()}\tkey:${consumerRecord.key()}\tvalue:${consumerRecord.value()}\toffect:${consumerRecord.offset()}")}}//关闭consumer.close()}
}

Kafka原理+操作+实战相关推荐

  1. java进阶Kafka集群实战之原理分析及优化教程全在这里

    我不去想是否能够成功 既然选择了Java 便只顾风雨兼程 我不去想能否征服Kafka集群 既然钟情于Java 就勇敢地追随千锋 我不去想Kafka集群有多么晦涩难懂 既然目标是远方 留给世界的只能是努 ...

  2. kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2

    1.JAVA API操作kafka  修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...

  3. Kafka原理--时间轮(延时操作)

    原文网址:Kafka原理--时间轮(延时操作)_IT利刃出鞘的博客-CSDN博客 简介 说明         本文介绍Kafka的时间轮的原理. Kafka没有延迟队列功能供用户使用,本文介绍的延时操 ...

  4. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

  5. Kafka原理篇:图解kakfa架构原理

    我想很多同学之前可能已经看过很多 Kafka 原理相关的文章,但往往看时"牛逼"声连连,激情满满,总觉得自己又学习到了各种"吊炸天"的技术.但很多同学往往是不觉 ...

  6. Kafka原理篇:图解kafka架构原理

    今天我们来深入讲解 Kafka 的架构和实现原理.[码哥]将从架构和细节入手,以生动的图深入讲解 Kafka 的实现原理. 我想很多同学之前可能已经看过很多 Kafka 原理相关的文章,但往往看时&q ...

  7. 读书笔记-SpringCloudAlibaba微服务原理与实战-谭锋-【未完待续】

    SpringCloudAlibaba微服务原理与实战 谭锋 电子工业出版社 ISBN-9787121388248 仅供参考, 自建索引, 以备后查 一.应用架构演进.微服务发展史 1.单体架构 一般来 ...

  8. 第三章:Python基础の函数和文件操作实战

    本課主題 Set 集合和操作实战 函数介紹和操作实战 参数的深入介绍和操作实战 format 函数操作实战 lambda 表达式介绍 文件操作函数介紹和操作实战 本周作业 Set 集合和操作实战 Se ...

  9. 【赠书】五一假期福利,OpenCV4最新原理与实战书籍

    五一假期快要到了,本次给大家赠送3本本月新书,这次赠送的书籍是<OpenCV 4机器学习算法原理与编程实战>. 这是一本什么样的书 OpenCV(Open Source Computer ...

最新文章

  1. 医学影像AI:全球市场展望
  2. 如何格式化电脑_Mac苹果电脑如何格式化?
  3. 利用libevent 和线程池实现高并发服务器的设计
  4. IOS审核的各个状态的时间
  5. 成功解决Both binary classification-only and multiclassification-only loss function or metrics specified
  6. mysql 备库 hang住_mysql主键的缺少导致备库hang住
  7. 闲聊Linux内存管理(1)
  8. 《Linux内核设计与实现》读书笔记(一)-内核简介
  9. boost::alignment_of相关的测试程序
  10. 十年经验工程师为何被裁?
  11. 解决the resource is not on the build path of a java project
  12. 100款违法违规APP下架整改:微店、更美等在列
  13. Julia 语言可重用性高竟源于缺陷和不完美?
  14. 【翻译】BCGControlBar Professional Edition for MFC v 29.0重大更新
  15. 电工模拟接线软件 app_图文详解:户内配电箱的安装及接线方法 ,电气初学者必看!...
  16. 用GaussView,Gaussian软件演示小分子的振动和红外波数
  17. linux离线安装pg数据库
  18. 一个nginx小白的vue项目部署的成功!
  19. 圣诞表白html,Pyhton表白代码——浪漫圣诞节
  20. Typora MarkDown语法

热门文章

  1. @Valid 注解详解 Java Bean Validation的前世今生
  2. 免费css代码下载-Free Css Templates
  3. ViewFlipper和ViewPager
  4. Quartz配置参考
  5. Opencv2.4.9源码分析——Stitching(五)
  6. 一、计算机基础: 特点、数制、编码、组成
  7. VR科技赋能智慧冬奥
  8. HTTPS文件服务器搭建,搭建一个简易的https
  9. CNDS 创建属于自己的专栏
  10. js调用本地摄像头拍照截图,提交后台