作者 | 陈屹       责编 | 欧阳姝黎

近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求。目前实现千万级高并发海量数据请求的服务器设计在”套路“上比较成熟,基本做法是形成服务器集群,然后将海量请求分发到集群中的各个服务器,使得服务器面对的请求数量不再“海量”,本质上就是采用分而治之,各个击破的思维来破解高并发的数据请求。

后台服务器实现的难点之一在于,当服务器程序运行在不同机器上时,服务器之间的数据通信则成为技术难点。假设客户端要上传一张图片,它会将图片数据发送给API服务器程序,后者从数据库服务器集群中选择一台,然后将图片数据发送给数据库服务器进行存储,此时API服务器和数据库服务器之间就发生了相互通讯的需求。在处理海量级别的高并发请求时,例如在微信上一秒钟内,用户可能会上传几十万张图片,于是服务器集群中,不同服务器程序之间的通讯的量级同样也是一秒内几十万分发,因此实现服务器进程间的高并发通讯是让后台能承载海量级请求的关键。

还在于满足这种需求的中间件也很成熟,目前有很多高并发消息队列组件就用于承担这种责任,其中阿帕奇的kafka就是其中佼佼者。消息队列的使用除了能够满足服务器进程之间的高并发通讯外,它还能够实现不同进程之间的解耦合,于是不同后台进程之间在实现时根本无需考虑对方的实现机制,只要确定双方通讯的消息或数据格式即可,这点很类似于面向对象中的接口机制。

我们先从感性上认识kafka的基本功能,也就是跑一次基于kafka的”hello world”。这里我们看的是kafka在mac上的运行。首先从https://kafka.apache.org/downloads下载kafka中间件的运行脚本,下载到本地后是一个tgz压缩包,解压后打开控制台,通过cd命令进入解压的文件路径。kafka的运行要基于服务器集群的控制程序叫zookeeper,因此我们先通过如下命令行启动它:

sh bin/zookeeper-server-start.sh config/zookeeper.properties

接下来要做的就是启动kafka的服务器进程,重新打开新的控制台窗口,cd到指定目录,然后执行下面命令:

 sh bin/kafka-server-start.sh config/server.properties

执行上面命令后,kafka消息队列中间件就启动了。现在我们需要做的是让一个进程往队列里发送消息,然后另一个进程从队列中获取消息从而完成不同进程之间的数据通信。发消息的进程叫做生产者,获取或接收消息的进程叫消费者,如果你看过操作系统原理这类书,你一定了解到所谓的生产者-消费者模型。

首先我们启动生产者进程:

sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

这个命令的大概意思是,生产者进程启动了一个消息队列叫“test”, 这个队列的数据将从端口9092发出,消费者要想获得生产者放入到队列中的数据,它就必须跟生产者通过端口9092建立连接,上面命令执行后,控制台会出现字符”<”,也就是进入等待输入状态,这时候我们就可以通过键盘输入字符串信息。我们看消费者的启动命令:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

通过该命令,消费者就与生产者在端口9092建立连接,我们可以想象消费者和生产者在河岸的两端,队列就是在两岸建立起一座桥梁,汽车从河岸一段上桥后抵达另一端就等同于消息从生产者进程推送到消费者进程,此时我们在生产者进程的控制台窗口输入信息:

然后按下回车后,我们在消费者进程对应的控制台窗口就可以接收到相应的内容:

接下来我们看看如何通过python代码的方式实现上面功能,首先要安装相应的python程序库:

pip install kafka-python

然后我们先看生产者对应代码:

from kafka import KafkaProducer
from time import sleepdef start_producer():producer = KafkaProducer(bootstrap_servers='localhost:9092')for i in range(0,100000):msg = 'msg is ' + str(i)producer.send('my_favorite_topic2', msg.encode('utf-8'))print('send: ', msg)sleep(3)if __name__ == '__main__':start_producer()

代码很简单,但是有几点需要注意,kafka队列中间件服务器的端口默认是9092,我原来以为任何端口都可以,于是改成9091结果代码运行就错误,类似的我们完成消费者代码如下:

from kafka import KafkaConsumer
import timedef start_consumer():print('run kafka consumer...')consumer = KafkaConsumer('my_favorite_topic2', bootstrap_servers = 'localhost:9092')for msg in consumer:print(msg)print("topic = %s" % msg.topic) # topic default is stringprint("partition = %d" % msg.offset)print("value = %s" % msg.value.decode()) # bytes to stringprint("timestamp = %d" % msg.timestamp)print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( msg.timestamp/1000 )) )if __name__ == '__main__':start_consumer()

完成代码后,先运行生产者就可以得到如下输出:

topic = my_favorite_topic2
partition = 62
value = msg is 3
timestamp = 1620183897409
time =  2021-05-05 11:04:57
ConsumerRecord(topic='my_favorite_topic2', partition=0, offset=63, timestamp=1620183900412, timestamp_type=0, key=None, value=b'msg is 4', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=8, serialized_header_size=-1)

输出结果中有不少参数,例如partition之类的可以先不用关心。类似kafka这里消息队列中间件除了实现高并发的消息发送外,还采取了很多机制来保证消息必须发送成功,机制之一就是把发送的消息写入到文件或数据库中,发送方必须确认接收方收到消息后才将写入的数据擦除,同时它还能保证消息只会被对方接收一次。

同理运行消费者对应的代码后,所得结果如下:

ConsumerRecord(topic='my_favorite_topic2', partition=0, offset=62, timestamp=1620183897409, timestamp_type=0, key=None, value=b'msg is 3', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=8, serialized_header_size=-1)
topic = my_favorite_topic2
partition = 62
value = msg is 3
timestamp = 1620183897409
time =  2021-05-05 11:04:57
ConsumerRecord(topic='my_favorite_topic2', partition=0, offset=63, timestamp=1620183900412, timestamp_type=0, key=None, value=b'msg is 4', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=8, serialized_header_size=-1)

这些消息队列中间件的诞生,其实跟当下云后台的开发模式有关。当前后台开发喜欢采用所谓的“微服务”模式,我搜索过这个概念发现其没有明确的定义,各家各有说法,莫衷一是。我的理解是,所谓微服务就是把原来服务器程序所实现的各个功能分解开来,独立形成一个服务器小程序,当模块间需要相互配合时,就可以通过消息队列的机制把数据发送给对方。

例如在微信中发送附件给别人时,用户在手机上将文件上传到服务器,此时有一个服务器小程序A来接收用户要上传文件的消息,然后它用消息通知数据库服务器程序B,让后者把附件存储到数据库中,接着接着A又发送一个消息给服务器程序C,让C通知对应的接受者有文件传递给他,这种机制的最大优点就是能将原本衔接在一起的功能模块解耦合,使得每个模块各自为政,于是增强了后台的可扩展性和鲁棒性。

☞不打好评不给用!苹果竟然把这种“流氓” App 都放出来?☞特斯拉已在中国建立数据中心☞21 句话入门机器学习

使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯相关推荐

  1. 了不起的Node.js: 将JavaScript进行到底(Web开发首选,实时,跨多服务器,高并发)...

    了不起的Node.js: 将JavaScript进行到底(Web开发首选,实时,跨多服务器,高并发) Guillermo Rauch 编   赵静 译 ISBN 978-7-121-21769-2 2 ...

  2. ActiveMQ RabbitMQ RokcetMQ Kafka实战 消息队列中间件视频教程

    附上消息队列中间件百度网盘连接: 链接: https://pan.baidu.com/s/1FFZQ5w17e1TlLDSF7yhzmA 密码: hr63 转载于:https://www.cnblog ...

  3. 浅谈消息队列及常见的分布式消息队列中间件

    背景 分布式消息队列中间件是是大型分布式系统不可缺少的中间件,通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息.所以消息队列主要解决应用耦合.异步消 ...

  4. rabbitmq实战:高效部署分布式消息队列_一文看懂消息队列中间件--AMQ及部署介绍...

    概述 最近有个小项目用到了AMQ来做消息队列,之前介绍的主要是rabbitmq,所以今天主要提一下AMQ,也简单介绍下两者的区别~ 消息队列中间件 消息队列中间件(简称消息中间件)是指利用高效可靠的消 ...

  5. 消息中间件系列(七):如何从0到1设计一个消息队列中间件

    消息队列作为系统解耦,流量控制的利器,成为分布式系统核心组件之一. 如果你对消息队列背后的实现原理关注不多,其实了解消息队列背后的实现非常重要. 不仅知其然还要知其所以然,这才是一个优秀的工程师需要具 ...

  6. 基于硬件的消息队列中间件 Solace 简介之二

    小短篇介绍关于Solace https://blog.csdn.net/aqudgv83/article/details/79495489 . 前面简单介绍了Solace来自于哪家公司, 主要能做哪些 ...

  7. MQ消息队列中间件:

    MQ消息队列中间件: 微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应. 异步通讯:就像发信息,不需要马上回复. 同步调用的优点: 时效性较强,可以立即得到结果 同步调用的问题: ...

  8. 消息队列中间件 Message Queue 简称:MQ

    一.什么是消息队列? 消息队列,一般我们会简称它为MQ(Message Queue) 队列是一种先进先出的数据结构. 现有常用的开源消息中间件有Kafka.CMQ.JBoss Messaging.JO ...

  9. NMQ消息队列—中间件

    转自:http://www.cnblogs.com/lushilin/p/6209976.html(鲁仕林) 消息中间件NMQ 1.What is nmq? nmq = new message que ...

最新文章

  1. 【 FPGA 】组合逻辑中的竞争与险象问题(三)
  2. mysql 主主+ Keepalived 高可用
  3. Channel的几种状态
  4. Anaconda中快速安装Tensorflow与Keras并在pycharm中完成相应配置(win10cpu版)
  5. EasyExcel 2 上传 下载
  6. python 编译函数_在Python的Django框架中编写编译函数
  7. 收藏!5V转3.3V电平的19种方法技巧
  8. 美国银行将AI应用于企业应收账款处理
  9. 4位并行加载寄存器设计
  10. 软件测试都有哪些证书,软件测试都有哪些证书呀?有用吗?
  11. 1.3—Spring基础配置—3.AOP
  12. 来点硬件知识吧,今天求职吃亏了!
  13. python numba_如何用numba加速python?
  14. cpu性能参数如何看?
  15. c语言串口接收的字符转int,从串口发送和接收int值
  16. 数据库敏感数据加密技术
  17. java word 分页显示_Java 在Word中插入分页符、分节符
  18. 雷电模拟器连接Android,1.AS连接雷电模拟器
  19. linux qt 多点触摸,Qt 4.6 添加 Multi-touch(多点触摸)支持
  20. 关于2014年相关人脸检测识别的几个论文摘要翻译

热门文章

  1. Pytorch常用总结(持续更新...)
  2. 数据分析5大关键环节
  3. 关于layui.laypage.render 刷新首页没有分页问题
  4. Swift - 项目部署配置(支持的系统,设备和状态条样式等)
  5. 典型测试错误(英中文对照)
  6. 21. 栈的压入、弹出序列
  7. [论文阅读] Stereoscopically Attentive Multi-scale Network for Lightweight Salient Object Detection
  8. 矩池云上安装yolov5并测试
  9. 链表——单链表、单向循环链表、双向链表
  10. LeNet-5实战minist——搭建卷积网络模型