python消费kafka逻辑处理导致cpu升高_用Apache Kafka 和 Python 搭建分布式流处理系统[翻译]...
注: 本篇翻译自 scotch.io 。
Apache Kafka 是什么?
Kafka 是一个开源的分布式流处理平台,其简化了不同数据系统的集成。流指的是一个数据管道,应用能够通过流不断地接收数据。Kafka 作为流处理系统主要有两个用处:
数据集成: Kafka 捕捉事件流或数据变化流,并将这些数据送给其它数据系统,如关系型数据库,键值对数据库或者数据仓库。
流处理:Kafka接收事件流并保存在一个只能追加的队列里,该队列称为日志(log)。日志里的信息是不可变的,因此支持连续实时的数据处理和流转换,并使结果在系统级别可访问。
相比于其它技术,Kafka 拥有更高的吞吐量,内置分区,副本和容错率。这些使得 Kafka 成为大规模消息处理应用的良好解决方案。
Kafka 系统有三个主要的部分:
生产者(Producer): 产生原始数据的服务。
中间人(Broker): Kafka 是生产者和消费者之间的中间人,它使用API来获取和发布数据。
消费者(Consumer): 使用中间人发布的数据的服务。
访问 Kafka 官网获取更详细信息。
项目
我们将要搭建一个简单的流处理应用,其中生产者将把一个视频文件流传送给消费者,最后在浏览器页面上显示。该项目是为了展示Kafka的数据集成和流处理。
依赖
该项目介绍了 Kafka 和 消息处理。读者需要对 Python 有基本的了解。
Python 基础知识
Python 3
Kafka
Virtualenv
pip
安装 Kafka
Mac 安装: brew install kafka安装后启动: brew services start kafka
Linux 安装指南
Kafka 默认端口为 9092
配置环境
我们的项目将包括:
视频文件: 作为源数据,建议使用 5mb 以下的 mp4 文件。
简单的生产者,能将视频图像发送给 Kafka
消费者: 获取数据并展示在浏览器页面上
Kafka: 作为中间人
创建项目路径:
$ mkdir kafka && cd kafka
创建虚拟环境并启用:
$ virtualenv env && source env/bin/activate
安装需要的依赖包,我们将使用 Flask 和 opencv
pip install kafka-python opencv-python Flask
创建生产者
生产者是给 Kafka 中间人发送消息的服务。值得注意的是,生产者并不关注最终消费或加载数据的消费者。
创建生产者:
创建一个 producer.py 文件并添加如下代码:
# producer.py
import time
import cv2
from kafka import SimpleProducer, KafkaClient
# connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'
创建消息:
消息将由二进制的图片组成。 OpenCV 能够读取视频文件并转换成二进制。我们需要创建一个函数,用于在发送消息给 Kafka 前读取视频文件并转换成二进制。将视频文件放置在与生产者相同路径下。
发送消息:
Kafka 消息是二进制字符串格式(byte),所以图像在发送前需要被编码。
以下是完整的生产者代码:
# producer.py
import time
import cv2
from kafka import SimpleProducer, KafkaClient
# connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'
def video_emitter(video):
# Open the video
video = cv2.VideoCapture(video)
print(' emitting.....')
# read the file
while (video.isOpened):
# read the image in each frame
success, image = video.read()
# check if the file has read to the end
if not success:
break
# convert the image png
ret, jpeg = cv2.imencode('.png', image)
# Convert the image to bytes and send to kafka
producer.send_messages(topic, jpeg.tobytes())
# To reduce CPU usage create sleep time of 0.2sec
time.sleep(0.2)
# clear the capture
video.release()
print('done emitting')
if __name__ == '__main__':
video_emitter('video.mp4')
生产者就完成了。
创建消费者
消费者监听并消费来自 Kafka 中间人的消息。我们的消费者应该监听 my-topic 主题的消息并将消息展示。我们将使用 Flask 微框架来展示接收到的视频图像。
持续监听:
消费者将持续监听来自 Kafka 的消息更新和创建广播。我们将使用生成器来保持连接。生成器是用来生成结果序列而非单个结果的循环。由于图像是被序列地发送,我们的响应也将使用 multipart/x-mixed-replace mime type。
以下是消费者代码(consumer.py):
from flask import Flask, Response
from kafka import KafkaConsumer
#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic', group_id='view', bootstrap_servers=['0.0.0.0:9092'])
#Continuously listen to the connection and print messages as recieved
app = Flask(__name__)
@app.route('/')
def index():
# return a multipart response
return Response(kafkastream(),
mimetype='multipart/x-mixed-replace; boundary=frame')
def kafkastream():
for msg in consumer:
yield (b'--frame\r\n'
b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
运行项目
确保 Kafka 在运行: brew services start kafka
打开两个终端,在第一个终端中运行生产者:
(env)$ python producer.py
在第二个终端运行消费者:
(env)$ python consumer.py
这将会运行我们的 Flask 服务器。接下来打开浏览器查看链 http://0.0.0.0:5000 。
观察结果
刷新浏览器并不会重头开始播放视频。 Kafka 使用 offset 来记录消费者的日志读取位置。
如果浏览器在播放视频的时候被关闭,下次再打开浏览器输入链接后,视频将会从断点继续播放。
播放视频的时候不需要生产者在运行, Kafka 将保存消息,并当消费者需要的时候提供消息。
当生产者和消费者同时运行的时候图像几乎是实时传送到消费者。
视频处理是线性的。
消息共享能够减少生产者需要发送图像的次数。
何处使用 Kafka?
微框架: Kafka 是众多需要彼此持续异步通信的微服务间的最好管道。
数据库: 为了防止将数据仓库中的数据整个导出,可以创建 Kafka 生产者和消费者用于保存和监测数据库发生的改变。
数据收集处理:网站内置的生产者可以实时收集点击事件或访问量。
传感器和其它硬件数据。
证券报价机
总结
Kafka 是一个快速、可扩展、使用简单的分布式流处理系统。要使用 Kafka 需要知道:
生产者将发布消息给中间人的消息主题。
消费者需要监听中间人发布的消息主题。
我们创建了一个简单的流应用,展示了使用流数据的好处,如此使用如何快速以及 Kafka 如何被作为中间人。
希望你已经知道了如何使用 Kafka。
python消费kafka逻辑处理导致cpu升高_用Apache Kafka 和 Python 搭建分布式流处理系统[翻译]...相关推荐
- python消费kafka逻辑处理导致cpu升高_爬虫架构|利用Kafka处理数据推送问题(1)
如下图1-1所示,我们之前爬虫集群在采集完数据之后是直接插入到MySQL数据库中,分发服务再消费MySQL里面的数据.这样的设计会有两个主要的问题: 随着数据量越来越大,数据保存和数据存取的响应效率是 ...
- python消费kafka逻辑处理导致cpu升高_大数据技术之一次KAFKA消费者异常引起的思考...
本篇教程探讨了大数据技术之一次KAFKA消费者异常引起的思考,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入. 问题描述: 线上出现一台服务器特别慢,于是关闭了服务器上的kafka ...
- python消费kafka逻辑处理导致cpu升高_请教:Python模块KafkaConsumer会被Kerberos的状态影响嘛?...
请教大家一下Kafka队列和Kerberos票据的问题. 我在运行一段python代码的时候, from kafka import KafkaConsumer, KafkaProducer impor ...
- python消费kafka逻辑处理导致cpu升高_Kafka 消费迟滞监控工具 Burrow
Kafka 官方对于自身的 LAG 监控并没有太好的方法,虽然Kafka broker 自带有 kafka-topic.sh, kafka-consumer-groups.sh, kafka-cons ...
- python流处理框架_Python操作分布式流处理系统Kafka
什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制. Kafka的基本概念 kafka ...
- python 死循环程序能占满cpu吗_运行tensorflow python程序,限制对GPU和CPU的占用操作...
一般情况下,运行tensorflow时,默认会占用可以看见的所有GPU,那么就会导致其它用户或程序无GPU可用,那么就需要限制程序对GPU的占用.并且,一般我们的程序也用不了所有的GPU资源,只是强行 ...
- kafka 发布-订阅模式_使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...
kafka 发布-订阅模式 发布-订阅消息系统在任何企业体系结构中都起着重要作用,因为它可以实现可靠的集成而无需紧密耦合应用程序. 在解耦的系统之间共享数据的能力并不是一个容易解决的问题. 考虑一个企 ...
- kafka数据不丢失不重复_如何配置 KAFKA 使其消息不会丢失
不可靠的KAFKA 这里的不可靠是指代KAFKA其设计之初就为高性能而设计,其是允许消息丢失的,但经过多个版本的升级之后,通过KAFKA的相关配置,我们可以将其作为可靠的队列(不丢消息的队列). 在本 ...
- python c++情侣网名是什么意思_网友:c++与Python,究竟谁才是大哥?
一直以来,在 TIOBE 编程语言排行榜中,简单易用的新贵 Python 和老将 C++ 是强劲的竞争对手,不过 C++ 和 Python 都是非常流行的编程语言,对于开发者而言,在选择语言利器时究竟 ...
- kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?
问题描述 今天现场实施同事说Kafka Manager上显示有3500w条记录,但使用我们的平台落地后,一统计发现只有2200w条记录,这是不是说明我们的平台存在丢数据的可能. 经了解,对接方是通过如 ...
最新文章
- 数据合并设计_八秒搞定合并相同内容的单元格,你却加班了1小时
- dynamic关键字
- Java功底之Reference
- xss_url通关_1-10
- Oracle数据库基础入门《一》Oracle服务器的构成
- Golang——Json的序列化和反序列化
- UITableView 系列四 :项目中行的操作 (添加移动和删除)(实例)
- [jQuery基础] jQuery核心函数和工具方法
- python import random_python import random 后一直无法使用解决方法
- 使用phpExcel实现Excel数据的导入导出(完全步骤)
- [C#]用Forms.TreeView显示Icon会有黑边
- 利用java反射根据方法名称字符串调用方法
- struts2 跳转类型 result type=chain、dispatcher、redirect(redirect-action)
- ThinkPHP的pathinfo模式、路径访问模式及URL重写
- 使用BeautifulSoup爬取百度图片
- 聊聊两个状态管理库 Redux Recoil
- [渝粤教育] 西南科技大学 电子测量与仪表 在线考试复习资料(1)
- 嵌入式Linux红外遥控,一个简单的IAL分析(红外遥控)(转)
- Adobe安装程序无法初始化的解决方案
- ICSharpCode.SharpZipLib压缩解压
热门文章
- 【绿豆识别】基于matlab形态学绿豆计数【含Matlab源码 1113期】
- 【三维路径规划】基于matlab人工蜂群算法无人机三维路径规划【含Matlab源码 021期】
- 统一修改gcd高程文字样式_样式与格式的使用
- ai人工智能的本质和未来_什么是人工智能,它将如何塑造我们的未来?
- 会不会导致内存泄漏_使用ThreadLocal不当可能会导致内存泄露
- TypeError: '' not supported between instances of 'float' and 'str'
- 掌握6大技巧,让python编程健步如飞!
- Varnish 安装部署
- 基础学习笔记之opencv(2):haartraining前将统一图片尺寸方法
- 活动目录管理之批量创建域用户