PHP kafka消息队列的使用
PHP kafka消息队列的使用
1.kafka安装
kafka下载地址https://mirror.bit.edu.cn/apache/kafka/,这里我下载的是最新版本2.5.0,这里演示的是kafka单机单分区的情况:
cd /opt/
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -zxvf kafka_2.12-2.5.0.tgz
mv kafka_2.12-2.5.0 kafka
cd kafka# 编辑配置文件中的数据路径即可,这里只是演示单机单分区的情况
vim config/server.properties
log.dir=/opt/data_kafka#保存退出
配置就可以了,下面启动kafka,kafka是依赖zookeeper,所以需要先启动自带的zookeeper,再启动kafka:
#启动zookeeper -deamon表示后台启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#启动kafka -deamon表示后台启动
bin/kafka-server-start.sh -daemon config/server.properties
注意:启动参数-daemon表示后台启动,启动失败是没有任何信息的
测试kafka是否安装以及启动成功:
#创建topic
##--partitions 分区数
##--replication-factor 副本数
##--topic 主题名称
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test1#测试生产/消费
##生产
bin/kafka-console-producer.sh --topic test1 --broker-list localhost:9092
##消费 --from-beginning表示从头开始拉取消息
bin/kafka-console-consumer.sh --topic test1 --bootstrap-server localhost:9092 --from-beginning#其他命令:
#查看主题列表
bin/kafka-topics.sh --list --zookeeper localhost:2181#删除主题
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test_name_1#查看主题详细信息
bin/kafka-topics.sh --describe --zookeeper localhost --topic test1
2.安装php-kafka拓展
cd /opt/#安装librdkafka 库
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install#安装php-rdkafka 扩展
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make
make installvim /usr/local/php/etc/php.ini
#加入这行
extension=rdkafka.so
重启php-fpm
service php-fpm restart
检查是否安装成功:
php -m
#看到rdkafka表示安装成功
3.PHP kafka的使用
生产者Mkafka类:
<?phpclass Mkafka{private $kafka;private $broker_list = 'localhost:9092';private $topic_list = array();function __construct() {$this->kafka = new RdKafka\Producer();$this->kafka->addBrokers($this->broker_list);}public function getTopicByType($type = 0) {$topic_obj;$topic_key;switch ($type) {case 0:$topic_key = 'user_visit';if(empty($this->topic_list[$topic_key])) {$this->topic_list[$topic_key] = $this->kafka->newTopic($topic_key);}$topic_obj = $this->topic_list[$topic_key];break;}return $topic_obj;}/** 创建消息* type 类型 0:用户访问记录*/public function createMessage($type, $data = array()) {$data = json_encode($data);$topic_obj = $this->getTopicByType($type);$topic_obj->produce(0, RD_KAFKA_MSG_F_BLOCK, $data);$this->kafka->poll(0);$result = $this->kafka->flush(10000);return RD_KAFKA_RESP_ERR_NO_ERROR !== $result ? false : true;}}
调用生产者:
$kafka = new \Mkafka();
$result = $kafka->createMessage(0, array('name' => 'fdl'));
这时可以命令行打开消费者查看是否能消费新增的消息:
bin/kafka-console-consumer.sh --topic user_visit --bootstrap-server localhost:9092 --from-beginning
看到消息表示成功:
消费者代码:
<?phpclass MkafkaConsumer{private $kafka;private $config;private $broker_list = 'localhost';private $is_stop;/** 初始化* type 类型 0:用户访问*/function __construct($type) {$group_id;$topic;switch ($type) {case 0:$group_id = 'user_visit_consumer';$topic = 'user_visit';break;default:throwErr('type err');break;}$this->config = new RdKafka\Conf();$this->config->set('group.id', $group_id);$this->config->set('metadata.broker.list', $this->broker_list);$this->kafka = new RdKafka\KafkaConsumer($this->config);$this->kafka->subscribe([$topic]);}/** 消费消息*/public function consumeMessage() {$this->is_stop = false;while ($this->is_stop == false) {$message = $this->kafka->consume(120*1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:echo 'success.';var_dump($message);echo '=======<br>';break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "No more messages; will wait for more<br>";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "Timed out<br>";break;default:echo 'err.' . $message->errstr() . ' == ' . $message->err;break;}if($this->is_stop == true) return true;}}/* 停止消费 */public function stopConsumeFromKafka() {$this->is_stop = true;}}
总结:网上很多文章都是丢数据的,composer也有kafka的库但是已经不能使用,最好的解决办法还是参考官方文档:
https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html
PHP kafka消息队列的使用相关推荐
- kafka消息队列的概念理解
kafka在大数据.分布式架构中都很流行.kafka可以进行流式计算,也可以做为日志系统,还可以用于消息队列. kafka作为消息队列的优点: 分布式的系统 高吞吐量.即使存储了许多TB的消息,它也保 ...
- 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯
作者 | 陈屹 责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...
- 19 kafka消息队列
文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...
- kafka 消息队列
kafka 消息队列 kafka 架构原理 大数据时代来临,如果你还不知道Kafka那就真的out了!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP ...
- Java+Kafka消息队列
本文主要针对,Java端对Kafka消息队列的生产和消费.Kafka的安装部署,请看查看相关文章. 笔者最近所用的是Spring mvc,监听文件路径,然后将读取到的文件内容发送到消息队列中.由另外系 ...
- SpringBoot集成Kafka消息队列
1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...
- kafka消息队列应用总结
kafka官网: Apache Kafka 公司使用阿里云提供的kafka消息队列服务,分别为测试环境与生产环境,部署了多个集群. 使用场景:应用对外提供API接口调用,同时支持kafka增量消息推送 ...
- Flink使用KafkaSource从Kafka消息队列中读取数据
Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...
- Kafka—消息队列
Kafka-消息队列(理论部分) 一.Kafka概述 1.1.简介 kafka是一个分布式的基于发布/订阅模式的消息队列 主要应用场景:大数据实时处理领域 1.2.什么是消息队列? 消息队列 = 消息 ...
- kafka消息队列使用场景
一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...
最新文章
- html5设置文字不能复制,网页文字不能复制?巧解网页文字不能复制
- PyTorch的计算图和自动求导机制
- GDI+中发生一般性错误的解决办法 from http://www.cnblogs.com/winzheng/archive/2008/12/23/1360440.html...
- Spring Async和Java的8 CompletableFuture
- 不能以根用户身份运行 Google Chrome 浏览器
- lvs-dr模式原理详解和可能存在的“假负载均衡”
- 图像处理中的卷积与模板
- 运行opencv保存视频时出现错误的解决方法
- HtmlDom 基础
- 如果苹果公司允许其他手机厂商使用iOS系统,对苹果公司来说会有什么好处?
- NET开发资源站点和部分优秀.NET开源项目
- 微信小程序如何实现登陆功能
- 为何天搜科技这些互联网企业对杭州情有独钟?
- Windows Filtering Platform Windows筛选平台
- 随着人工智能发展的少儿编程教育
- 使用docker安装mysql8及mysql5.7
- windows的由来与详细介绍
- 64位linux安装adobe flash play插件
- 技术点:手写axios
- ERP采购管理系统软件
热门文章
- 哪吒之魔童降世视听语言影评_《哪吒之魔童降世》影评:生而孤独,从不认命...
- feign调用https接口_SpringCloudFeign远程调用
- MySQL查询日期类数据常用函数
- TypeScript:函数基础
- 前端一键复制粘贴插件——clipboard.js的使用
- 设计模式(2)——观察者模式
- bootstrap select2 动态从后台Ajax动态获取数据
- 论文笔记_S2D.73_2019_BTS_从大到小:多尺度局部平面引导的单目深度估计
- Asp.net(C#)年月日时分秒毫秒
- HDU-1863-畅通工程