注: 本篇翻译自 scotch.io 。

Apache Kafka 是什么?

Kafka 是一个开源的分布式流处理平台,其简化了不同数据系统的集成。流指的是一个数据管道,应用能够通过流不断地接收数据。Kafka 作为流处理系统主要有两个用处:

数据集成: Kafka 捕捉事件流或数据变化流,并将这些数据送给其它数据系统,如关系型数据库,键值对数据库或者数据仓库。

流处理:Kafka接收事件流并保存在一个只能追加的队列里,该队列称为日志(log)。日志里的信息是不可变的,因此支持连续实时的数据处理和流转换,并使结果在系统级别可访问。

相比于其它技术,Kafka 拥有更高的吞吐量,内置分区,副本和容错率。这些使得 Kafka 成为大规模消息处理应用的良好解决方案。

Kafka 系统有三个主要的部分:

生产者(Producer): 产生原始数据的服务。

中间人(Broker): Kafka 是生产者和消费者之间的中间人,它使用API来获取和发布数据。

消费者(Consumer): 使用中间人发布的数据的服务。

访问 Kafka 官网获取更详细信息。

项目

我们将要搭建一个简单的流处理应用,其中生产者将把一个视频文件流传送给消费者,最后在浏览器页面上显示。该项目是为了展示Kafka的数据集成和流处理。

依赖

该项目介绍了 Kafka 和 消息处理。读者需要对 Python 有基本的了解。

Python 基础知识

Python 3

Kafka

Virtualenv

pip

安装 Kafka

Mac 安装: brew install kafka安装后启动: brew services start kafka

Linux 安装指南

Kafka 默认端口为 9092

配置环境

我们的项目将包括:

视频文件: 作为源数据,建议使用 5mb 以下的 mp4 文件。

简单的生产者,能将视频图像发送给 Kafka

消费者: 获取数据并展示在浏览器页面上

Kafka: 作为中间人

创建项目路径:

$ mkdir kafka && cd kafka

创建虚拟环境并启用:

$ virtualenv env && source env/bin/activate

安装需要的依赖包,我们将使用 Flask 和 opencv

pip install kafka-python opencv-python Flask

创建生产者

生产者是给 Kafka 中间人发送消息的服务。值得注意的是,生产者并不关注最终消费或加载数据的消费者。

创建生产者:

创建一个 producer.py 文件并添加如下代码:

# producer.py

import time

import cv2

from kafka import SimpleProducer, KafkaClient

# connect to Kafka

kafka = KafkaClient('localhost:9092')

producer = SimpleProducer(kafka)

# Assign a topic

topic = 'my-topic'

创建消息:

消息将由二进制的图片组成。 OpenCV 能够读取视频文件并转换成二进制。我们需要创建一个函数,用于在发送消息给 Kafka 前读取视频文件并转换成二进制。将视频文件放置在与生产者相同路径下。

发送消息:

Kafka 消息是二进制字符串格式(byte),所以图像在发送前需要被编码。

以下是完整的生产者代码:

# producer.py

import time

import cv2

from kafka import SimpleProducer, KafkaClient

# connect to Kafka

kafka = KafkaClient('localhost:9092')

producer = SimpleProducer(kafka)

# Assign a topic

topic = 'my-topic'

def video_emitter(video):

# Open the video

video = cv2.VideoCapture(video)

print(' emitting.....')

# read the file

while (video.isOpened):

# read the image in each frame

success, image = video.read()

# check if the file has read to the end

if not success:

break

# convert the image png

ret, jpeg = cv2.imencode('.png', image)

# Convert the image to bytes and send to kafka

producer.send_messages(topic, jpeg.tobytes())

# To reduce CPU usage create sleep time of 0.2sec

time.sleep(0.2)

# clear the capture

video.release()

print('done emitting')

if __name__ == '__main__':

video_emitter('video.mp4')

生产者就完成了。

创建消费者

消费者监听并消费来自 Kafka 中间人的消息。我们的消费者应该监听 my-topic 主题的消息并将消息展示。我们将使用 Flask 微框架来展示接收到的视频图像。

持续监听:

消费者将持续监听来自 Kafka 的消息更新和创建广播。我们将使用生成器来保持连接。生成器是用来生成结果序列而非单个结果的循环。由于图像是被序列地发送,我们的响应也将使用 multipart/x-mixed-replace mime type。

以下是消费者代码(consumer.py):

from flask import Flask, Response

from kafka import KafkaConsumer

#connect to Kafka server and pass the topic we want to consume

consumer = KafkaConsumer('my-topic', group_id='view', bootstrap_servers=['0.0.0.0:9092'])

#Continuously listen to the connection and print messages as recieved

app = Flask(__name__)

@app.route('/')

def index():

# return a multipart response

return Response(kafkastream(),

mimetype='multipart/x-mixed-replace; boundary=frame')

def kafkastream():

for msg in consumer:

yield (b'--frame\r\n'

b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')

if __name__ == '__main__':

app.run(host='0.0.0.0', debug=True)

运行项目

确保 Kafka 在运行: brew services start kafka

打开两个终端,在第一个终端中运行生产者:

(env)$ python producer.py

在第二个终端运行消费者:

(env)$ python consumer.py

这将会运行我们的 Flask 服务器。接下来打开浏览器查看链 http://0.0.0.0:5000 。

观察结果

刷新浏览器并不会重头开始播放视频。 Kafka 使用 offset 来记录消费者的日志读取位置。

如果浏览器在播放视频的时候被关闭,下次再打开浏览器输入链接后,视频将会从断点继续播放。

播放视频的时候不需要生产者在运行, Kafka 将保存消息,并当消费者需要的时候提供消息。

当生产者和消费者同时运行的时候图像几乎是实时传送到消费者。

视频处理是线性的。

消息共享能够减少生产者需要发送图像的次数。

何处使用 Kafka?

微框架: Kafka 是众多需要彼此持续异步通信的微服务间的最好管道。

数据库: 为了防止将数据仓库中的数据整个导出,可以创建 Kafka 生产者和消费者用于保存和监测数据库发生的改变。

数据收集处理:网站内置的生产者可以实时收集点击事件或访问量。

传感器和其它硬件数据。

证券报价机

总结

Kafka 是一个快速、可扩展、使用简单的分布式流处理系统。要使用 Kafka 需要知道:

生产者将发布消息给中间人的消息主题。

消费者需要监听中间人发布的消息主题。

我们创建了一个简单的流应用,展示了使用流数据的好处,如此使用如何快速以及 Kafka 如何被作为中间人。

希望你已经知道了如何使用 Kafka。

python消费kafka逻辑处理导致cpu升高_用Apache Kafka 和 Python 搭建分布式流处理系统[翻译]...相关推荐

  1. python消费kafka逻辑处理导致cpu升高_爬虫架构|利用Kafka处理数据推送问题(1)

    如下图1-1所示,我们之前爬虫集群在采集完数据之后是直接插入到MySQL数据库中,分发服务再消费MySQL里面的数据.这样的设计会有两个主要的问题: 随着数据量越来越大,数据保存和数据存取的响应效率是 ...

  2. python消费kafka逻辑处理导致cpu升高_大数据技术之一次KAFKA消费者异常引起的思考...

    本篇教程探讨了大数据技术之一次KAFKA消费者异常引起的思考,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入. 问题描述: 线上出现一台服务器特别慢,于是关闭了服务器上的kafka ...

  3. python消费kafka逻辑处理导致cpu升高_请教:Python模块KafkaConsumer会被Kerberos的状态影响嘛?...

    请教大家一下Kafka队列和Kerberos票据的问题. 我在运行一段python代码的时候, from kafka import KafkaConsumer, KafkaProducer impor ...

  4. python消费kafka逻辑处理导致cpu升高_Kafka 消费迟滞监控工具 Burrow

    Kafka 官方对于自身的 LAG 监控并没有太好的方法,虽然Kafka broker 自带有 kafka-topic.sh, kafka-consumer-groups.sh, kafka-cons ...

  5. python流处理框架_Python操作分布式流处理系统Kafka

    什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制. Kafka的基本概念 kafka ...

  6. python 死循环程序能占满cpu吗_运行tensorflow python程序,限制对GPU和CPU的占用操作...

    一般情况下,运行tensorflow时,默认会占用可以看见的所有GPU,那么就会导致其它用户或程序无GPU可用,那么就需要限制程序对GPU的占用.并且,一般我们的程序也用不了所有的GPU资源,只是强行 ...

  7. kafka 发布-订阅模式_使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...

    kafka 发布-订阅模式 发布-订阅消息系统在任何企业体系结构中都起着重要作用,因为它可以实现可靠的集成而无需紧密耦合应用程序. 在解耦的系统之间共享数据的能力并不是一个容易解决的问题. 考虑一个企 ...

  8. kafka数据不丢失不重复_如何配置 KAFKA 使其消息不会丢失

    不可靠的KAFKA 这里的不可靠是指代KAFKA其设计之初就为高性能而设计,其是允许消息丢失的,但经过多个版本的升级之后,通过KAFKA的相关配置,我们可以将其作为可靠的队列(不丢消息的队列). 在本 ...

  9. python c++情侣网名是什么意思_网友:c++与Python,究竟谁才是大哥?

    一直以来,在 TIOBE 编程语言排行榜中,简单易用的新贵 Python 和老将 C++ 是强劲的竞争对手,不过 C++ 和 Python 都是非常流行的编程语言,对于开发者而言,在选择语言利器时究竟 ...

  10. kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?

    问题描述 今天现场实施同事说Kafka Manager上显示有3500w条记录,但使用我们的平台落地后,一统计发现只有2200w条记录,这是不是说明我们的平台存在丢数据的可能. 经了解,对接方是通过如 ...

最新文章

  1. 数据合并设计_八秒搞定合并相同内容的单元格,你却加班了1小时
  2. dynamic关键字
  3. Java功底之Reference
  4. xss_url通关_1-10
  5. Oracle数据库基础入门《一》Oracle服务器的构成
  6. Golang——Json的序列化和反序列化
  7. UITableView 系列四 :项目中行的操作 (添加移动和删除)(实例)
  8. [jQuery基础] jQuery核心函数和工具方法
  9. python import random_python import random 后一直无法使用解决方法
  10. 使用phpExcel实现Excel数据的导入导出(完全步骤)
  11. [C#]用Forms.TreeView显示Icon会有黑边
  12. 利用java反射根据方法名称字符串调用方法
  13. struts2 跳转类型 result type=chain、dispatcher、redirect(redirect-action)
  14. ThinkPHP的pathinfo模式、路径访问模式及URL重写
  15. 使用BeautifulSoup爬取百度图片
  16. 聊聊两个状态管理库 Redux Recoil
  17. [渝粤教育] 西南科技大学 电子测量与仪表 在线考试复习资料(1)
  18. 嵌入式Linux红外遥控,一个简单的IAL分析(红外遥控)(转)
  19. Adobe安装程序无法初始化的解决方案
  20. ICSharpCode.SharpZipLib压缩解压

热门文章

  1. 【绿豆识别】基于matlab形态学绿豆计数【含Matlab源码 1113期】
  2. 【三维路径规划】基于matlab人工蜂群算法无人机三维路径规划【含Matlab源码 021期】
  3. 统一修改gcd高程文字样式_样式与格式的使用
  4. ai人工智能的本质和未来_什么是人工智能,它将如何塑造我们的未来?
  5. 会不会导致内存泄漏_使用ThreadLocal不当可能会导致内存泄露
  6. TypeError: '' not supported between instances of 'float' and 'str'
  7. 掌握6大技巧,让python编程健步如飞!
  8. Varnish 安装部署
  9. 基础学习笔记之opencv(2):haartraining前将统一图片尺寸方法
  10. 活动目录管理之批量创建域用户