kafka简介(摘自百度百科)

一、简介:

详见:https://blog.csdn.net/Beyond_F4/article/details/80310507

二、安装

详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689

三、按照官网的样例,先跑一个应用

1、生产者:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]

for i in range(3):

msg = "msg%d" % i

producer.send('test', msg)

producer.close()

2、消费者(简单demo):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

启动后生产者、消费者可以正常消费。

3、消费者(消费群组)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',

group_id='my-group',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力

4、消费者(读取目前最早可读的消息)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',

auto_offset_reset='earliest',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest

源码定义:{'smallest': 'earliest', 'largest': 'latest'}

5、消费者(手动设置偏移量)

from kafka import KafkaConsumer

from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',

bootstrap_servers=['172.21.10.136:9092'])

print consumer.partitions_for_topic("test") #获取test主题的分区信息

print consumer.topics() #获取主题列表

print consumer.subscription() #获取当前消费者订阅的主题

print consumer.assignment() #获取当前消费者topic、分区信息

print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量

consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第5个偏移量消费

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

6、消费者(订阅多个主题)

from kafka import KafkaConsumer

from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test','test0')) #订阅要消费的主题

print consumer.topics()

print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

7、消费者(手动拉取消息)

from kafka import KafkaConsumer

import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test','test0'))

while True:

msg = consumer.poll(timeout_ms=5) #从kafka获取消息

print msg

time.sleep(1)

8、消费者(消息挂起与恢复)

from kafka import KafkaConsumer

from kafka.structs import TopicPartition

import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test'))

consumer.topics()

consumer.pause(TopicPartition(topic=u'test', partition=0))

num = 0

while True:

print num

print consumer.paused() #获取当前挂起的消费者

msg = consumer.poll(timeout_ms=5)

print msg

time.sleep(2)

num = num + 1

if num == 10:

print "resume..."

consumer.resume(TopicPartition(topic=u'test', partition=0))

print "resume......"

pause执行后,consumer不能读取,直到调用resume后恢复。

如果对您有帮助,记得给我点赞诺

如果对您有帮助,记得给我点赞诺

python消费kafka_Python脚本消费kafka数据相关推荐

  1. kafka 的pom文件_Flink 消费 Kafka 数据

    kafka核心概念: Kafka 是一个消息队列,生产者向消息队列中写入数据,消费者从队列中获取数据并进行消费.可以认为一个 Topic 就是一个队列,每个 Topic 又会被分成多个 Partiti ...

  2. sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案

    下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...

  3. 客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中

    目录 将消费的kafka数据同步到Kudu中 一.导入表名映射关系类

  4. Storm 消费Kafka数据及相关异常解决

    Storm 消费Kafka数据及相关异常解决 问题 KafkaTopoDemo类 bolt类 问题 storm报错:Exception in thread "main" java. ...

  5. java消费kafka数据之后,进行堆积之后在插入数据库

    java高频的获取kafka数据,导致数据库数据一致在高频读写,为了降低数据库的高频连接搞高频读写,可以将数据堆积一段时间之后,进行插入数据库操作. 主要采用了队列和缓存,将获取到的数据放入java队 ...

  6. SparkStreaming安全消费Kafka数据

    前言 在这之前做SparkStreaming连接Kafka,我会这么写: val sparkConf = new SparkConf().setAppName("Spark2Kafka&qu ...

  7. java debug非同期ski,简记kafka group id相同导致的不同consumers启动后不消费和延时消费问题...

    场景: 在一个线程内,使用相同的brokers和group id等配置,根据传入的topic数量N,分别定义N个consumer,按定义顺序先后调用consumers消费 现象: 程序启动后,kafk ...

  8. kafka控制台模拟消费_Flink初试——对接Kafka

    本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...

  9. mac系统下使用flink消费docker运行的kafka

    版本 flink 1.12.0 scala 2.11 java 1.8 kafka 2.0.2 首先使用maven创建一个新的工程 mvn archetype:generate -Darchetype ...

  10. 【python】使用python脚本将LFW数据中1672组同一个人多张照片拷贝出来

    使用python脚本将LFW数据中1672组同一个人多张照片拷贝出来 dataCleaning4multiple.py 源码如下: import os, random, shutil import s ...

最新文章

  1. 为什么神经网络会把乌龟识别成步枪?现在的 AI 值得信任吗?
  2. 哈利波特 pdf_干货!哈利波特英文原版pdf免费领,(含音频)词汇量大于新概念!...
  3. dec++如何查看机器指令_机器指令到汇编再到高级编程语言!
  4. 谈谈对Canal( 增量数据订阅与消费 )的理解
  5. 修正 010 Editor 模板文件 MachO.bt 的错误
  6. Web Service security UserNameToken 使用
  7. 管理exchange 2010用户邮箱本地移动请求
  8. 一些html5和css3的一些常见面试题
  9. 从泰勒展开到牛顿迭代
  10. 如何安全地终止线程interrupt()、isInterrupted()、interrupted()的区别与使用
  11. html安装方正兰亭,方正兰亭字体
  12. 如何设计带限流功能的5V供电电路?快来学!
  13. 神经网络ANN——SPSS实现
  14. ruby与ruby on rails环境部署
  15. 各种UML图的应用场景
  16. 基于C++和QT实现的简单数独游戏软件
  17. java后端处理Apple Pay流程
  18. Ubuntu-18.04版本网络配置,连接网络的方法
  19. matlab在线_正版MATLAB向中国人民大学全校师生免费开放!
  20. 串口工具推荐——串口监视精灵v4.0

热门文章

  1. 《CLR via C#》笔记——运行时序列化(2)
  2. [日志]怎样的男人才会讨美女喜欢?
  3. Ajax局部刷新例子
  4. linux学习 建立静态库,动态库,写简单的makefile
  5. poj 2352 Stars 树状数组
  6. indesign软件教程,如何更改内容颜色?
  7. iOS开发之cocoapods安装(2017)
  8. Mac操作指南:废纸篓里的文件无法清除如何解决?
  9. Mac系统如何使用文件标记功能?
  10. 对Mac硬盘重新分区后如何恢复丢失的数据?