Python操作Kafka原理及使用详解

一、什么是Kafka

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制

二、Kafka的基本概念

kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。

kafka有以下一些基本概念:

Producer - 消息生产者,就是向kafka broker发消息的客户端。

Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

Partition - 消息分区,一个topic可以分为多个 partition,每个

partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

三、Kafka分布式架构

如上图所示,kafka将topic中的消息存在不同的partition中。如果存在键值(key),消息按照键值(key)做分类存在不同的partiition中,如果不存在键值(key),消息按照轮询(Round Robin)机制存在不同的partition中。默认情况下,键值(key)决定了一条消息会被存在哪个partition中。

partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)来指定消息的位置。一个topic的一个partition只能被一个consumer group中的一个consumer消费,多个consumer消费同一个partition中的数据是不允许的,但是一个consumer可以消费多个partition中的数据。

kafka将partition的数据复制到不同的broker,提供了partition数据的备份。每一个partition都有一个broker作为leader,若干个broker作为follower。所有的数据读写都通过leader所在的服务器进行,并且leader在不同broker之间复制数据。

上图中,对于Partition 0,broker 1是它的leader,broker 2和broker 3是follower。对于Partition 1,broker 2是它的leader,broker 1和broker 3是follower。

在上图中,当有Client(也就是Producer)要写入数据到Partition 0时,会写入到leader Broker 1,Broker 1再将数据复制到follower Broker 2和Broker 3。

在上图中,Client向Partition 1中写入数据时,会写入到Broker 2,因为Broker 2是Partition 1的Leader,然后Broker 2再将数据复制到follower Broker 1和Broker 3中。

上图中的topic一共有3个partition,对每个partition的读写都由不同的broker处理,因此总的吞吐量得到了提升。

四、kafka-python实现生产者消费者

kafka-python是一个python的Kafka客户端,可以用来向kafka的topic发送消息、消费消息。

这个实验会实现一个producer和一个consumer,producer向kafka发送消息,consumer从topic中消费消息。结构如下图

producer代码

consumer代码

接下来创建test topic

打开两个窗口中,我们在window1中运行producer,如下

在window2中运行consumer,如下

可以看到window2中的consumer成功的读到了producer写入的数据

五、消费组实现容错性机制

这个实验将展示消费组的容错性的特点。这个实验中将创建一个有2个partition的topic,和2个consumer,这2个consumer共同消费同一个topic中的数据。结构如下所示

producer部分代码和实验一相同,这里不再重复。consumer需要指定所属的consumer group,代码如下

接下来我们创建topic,名字test,设置partition数量为2

打开三个窗口,一个窗口运行producer,还有两个窗口运行consumer。

运行consumer的两个窗口的输出如下:

可以看到两个consumer同时运行的情况下,它们分别消费不同partition中的数据。window1中的consumer消费partition 0中的数据,window2中的consumer消费parition 1中的数据。

我们尝试关闭window1中的consumer,可以看到如下结果

刚开始window2中的consumer只消费partition1中的数据,当window1中的consumer退出后,window2中的consumer中也开始消费partition 0中的数据了。

六、offset管理

kafka允许consumer将当前消费的消息的offset提交到kafka中,这样如果consumer因异常退出后,下次启动仍然可以从上次记录的offset开始向后继续消费消息。

这个实验的结构和实验一的结构是一样的,使用一个producer,一个consumer,test topic的partition数量设为1。

producer的代码和实验一中的一样,这里不再重复。consumer的代码稍作修改,这里consumer中打印出下一个要被消费的消息的offset。consumer代码如下

在一个窗口中启动producer,在另一个窗口并且启动consumer。consumer的输出如下

可以尝试退出consumer,再启动consumer。每一次重新启动,consumer都是从offset=98的消息开始消费的。

修改consumer的代码如下,在consumer消费每一条消息后将offset提交回kafka

启动consumer

可以看到consumer从offset=98的消息开始消费,到offset=829时,我们Ctrl+C退出consumer。

我们再次启动consumer

可以看到重新启动后,consumer从上一次记录的offset开始继续消费消息。之后每一次consumer重新启动,consumer都会从上一次停止的地方继续开始消费。

python使用kafka原理详解_Python操作Kafka原理及使用详解相关推荐

  1. python对sqlite增删改查_Python操作SQLite数据库的方法详解【导入,创建,游标,增删改查等】...

    本文实例讲述了python操作SQLite数据库的方法.分享给大家供大家参考,具体如下: SQLite简介 SQLite,是一款轻型的数据库,是遵守ACID的关系型数据库管理系统,它包含在一个相对小的 ...

  2. python excel详解_python操作excel详解

    前提: python操作excel需要使用的模块有xlrd.xlwt.xlutils.对excel进行读.写.更新操作.操作excel时需要先导入这些模块,demo如下: excel-读操作知识点: ...

  3. python xlwt xlrd模块详解_python操作excel之xlrd、xlwt模块详解

    python操作excel主要用到xlrd和xlwt这两个库,即xlrd是读excel,xlwt是写excel的库. 可从这里下载https://pypi.python.org/pypi.下面分别记录 ...

  4. python数据清理的实践总结_python 数据的清理行为实例详解

    python 数据的清理行为实例详解 数据清洗主要是指填充缺失数据,消除噪声数据等操作,主要还是通过分析"脏数据"产生的原因和存在形式,利用现有的数据挖掘手段去清洗"脏数 ...

  5. python标准类型内建模块_Python内建模块struct实例详解

    本文研究的主要是Python内建模块struct的相关内容,具体如下. Python中变量的类型只有列表.元祖.字典.集合等高级抽象类型,并没有像c中定义了位.字节.整型等底层初级类型.因为Pytho ...

  6. python中configparser详解_Python中的ConfigParser模块使用详解

    1.基本的读取配置文件 -read(filename) 直接读取ini文件内容 -sections() 得到所有的section,并以列表的形式返回 -options(section) 得到该sect ...

  7. python分析方向的第三方库_Python标准库与第三方库详解

    干货大礼包!21天带你轻松学Python(文末领取更多福利) 点击查看课程视频地址 本课程来自于千锋教育在阿里云开发者社区学习中心上线课程<Python入门2020最新大课>,主讲人姜伟. ...

  8. python简单计算器综合实验报告_Python实现的简单计算器功能详解

    本文实例讲述了Python实现的简单计算器功能.分享给大家供大家参考,具体如下: 使用python编写一款简易的计算器 计算器效果图 首先搭建计算器的面板: 计算器面板结构 建造一个继承于wx.Fra ...

  9. python爬虫多线程是什么意思_python爬虫中多线程的使用详解

    queue介绍 queue是python的标准库,俗称队列.可以直接import引用,在python2.x中,模块名为Queue.python3直接queue即可 在python中,多个线程之间的数据 ...

最新文章

  1. 求1+2+……+N的和
  2. .NET Core跨平台的奥秘[中篇]:复用之殇
  3. guice 实例_使用Google Guice消除实例之间的歧义
  4. 关于摄像头的一些零碎知识
  5. mac cad石材填充图案_CAD电视背景墙画法步骤
  6. 1.4 如何学习设计模式
  7. 【博客美化】09.评论带头像,且支持旋转
  8. 一体机的扫描至网络共享的设置
  9. OLEDB, ODEB, ADO.NET Abbreviation
  10. jquery中checkbox全选失效的解决方法
  11. 蓝桥练习 之 单词个数统计
  12. 莫烦Tensorflow学习笔记(10-12)——构建简单的神经网络及其可视化
  13. Spring中@Component,@Service等注解如何被解析?
  14. 开源 MQTT 服务器
  15. Android Root
  16. R语言 RStudio快捷键
  17. JavaScript-事件和事件对象、实现键盘打字小游戏
  18. Python——列表与元组
  19. 算法小记(1)--判断三个数的最大,最小
  20. 漫步数理统计三十——依概率收敛

热门文章

  1. 集中式开发和分布式开发的区别
  2. 深耳道微型无线蓝牙耳机
  3. JavaScript,Mysql,Java中substring,substr区别
  4. jquery插件猫冬formValidator3.X版本中ajaxValidator问题
  5. 百度地图重磅发布《2019年春运出行预测报告》
  6. Linux zip 压缩,解压
  7. 东方博宜oj部分答案
  8. Python学习笔记 | 练习1:数字形式转换 I
  9. UE5实现建筑剖切效果
  10. 面试美团,完全实况30+面试真题与答案公布。不得不说细节拉满想要拿到一个大厂offer还真不容易。