PHP kafka消息队列的使用

1.kafka安装

kafka下载地址https://mirror.bit.edu.cn/apache/kafka/,这里我下载的是最新版本2.5.0,这里演示的是kafka单机单分区的情况:

cd /opt/
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -zxvf kafka_2.12-2.5.0.tgz
mv kafka_2.12-2.5.0 kafka
cd kafka# 编辑配置文件中的数据路径即可,这里只是演示单机单分区的情况
vim config/server.properties
log.dir=/opt/data_kafka#保存退出

配置就可以了,下面启动kafka,kafka是依赖zookeeper,所以需要先启动自带的zookeeper,再启动kafka:

#启动zookeeper -deamon表示后台启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#启动kafka -deamon表示后台启动
bin/kafka-server-start.sh -daemon config/server.properties

注意:启动参数-daemon表示后台启动,启动失败是没有任何信息的

测试kafka是否安装以及启动成功:

#创建topic
##--partitions 分区数
##--replication-factor 副本数
##--topic 主题名称
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test1#测试生产/消费
##生产
bin/kafka-console-producer.sh --topic test1 --broker-list localhost:9092
##消费 --from-beginning表示从头开始拉取消息
bin/kafka-console-consumer.sh --topic test1 --bootstrap-server localhost:9092 --from-beginning#其他命令:
#查看主题列表
bin/kafka-topics.sh --list --zookeeper localhost:2181#删除主题
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test_name_1#查看主题详细信息
bin/kafka-topics.sh --describe --zookeeper localhost --topic test1

2.安装php-kafka拓展

cd /opt/#安装librdkafka 库
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install#安装php-rdkafka 扩展
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make
make installvim /usr/local/php/etc/php.ini
#加入这行
extension=rdkafka.so

重启php-fpm

service php-fpm restart

检查是否安装成功:

php -m
#看到rdkafka表示安装成功

3.PHP kafka的使用

生产者Mkafka类:

<?phpclass Mkafka{private $kafka;private $broker_list = 'localhost:9092';private $topic_list = array();function __construct() {$this->kafka = new RdKafka\Producer();$this->kafka->addBrokers($this->broker_list);}public function getTopicByType($type = 0) {$topic_obj;$topic_key;switch ($type) {case 0:$topic_key = 'user_visit';if(empty($this->topic_list[$topic_key])) {$this->topic_list[$topic_key] = $this->kafka->newTopic($topic_key);}$topic_obj = $this->topic_list[$topic_key];break;}return $topic_obj;}/** 创建消息* type 类型 0:用户访问记录*/public function createMessage($type, $data = array()) {$data = json_encode($data);$topic_obj = $this->getTopicByType($type);$topic_obj->produce(0, RD_KAFKA_MSG_F_BLOCK, $data);$this->kafka->poll(0);$result = $this->kafka->flush(10000);return RD_KAFKA_RESP_ERR_NO_ERROR !== $result ? false : true;}}

调用生产者:

$kafka = new \Mkafka();
$result = $kafka->createMessage(0, array('name' => 'fdl'));

这时可以命令行打开消费者查看是否能消费新增的消息:

bin/kafka-console-consumer.sh --topic user_visit --bootstrap-server localhost:9092 --from-beginning

看到消息表示成功:

消费者代码:

<?phpclass MkafkaConsumer{private $kafka;private $config;private $broker_list = 'localhost';private $is_stop;/** 初始化* type 类型 0:用户访问*/function __construct($type) {$group_id;$topic;switch ($type) {case 0:$group_id = 'user_visit_consumer';$topic = 'user_visit';break;default:throwErr('type err');break;}$this->config = new RdKafka\Conf();$this->config->set('group.id', $group_id);$this->config->set('metadata.broker.list', $this->broker_list);$this->kafka = new RdKafka\KafkaConsumer($this->config);$this->kafka->subscribe([$topic]);}/** 消费消息*/public function consumeMessage() {$this->is_stop = false;while ($this->is_stop == false) {$message = $this->kafka->consume(120*1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:echo 'success.';var_dump($message);echo '=======<br>';break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "No more messages; will wait for more<br>";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "Timed out<br>";break;default:echo 'err.' . $message->errstr() . '  ==  ' . $message->err;break;}if($this->is_stop == true) return true;}}/* 停止消费 */public function stopConsumeFromKafka() {$this->is_stop = true;}}

总结:网上很多文章都是丢数据的,composer也有kafka的库但是已经不能使用,最好的解决办法还是参考官方文档:

https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html

PHP kafka消息队列的使用相关推荐

  1. kafka消息队列的概念理解

    kafka在大数据.分布式架构中都很流行.kafka可以进行流式计算,也可以做为日志系统,还可以用于消息队列. kafka作为消息队列的优点: 分布式的系统 高吞吐量.即使存储了许多TB的消息,它也保 ...

  2. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯

    作者 | 陈屹       责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...

  3. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  4. kafka 消息队列

    kafka 消息队列 kafka 架构原理 大数据时代来临,如果你还不知道Kafka那就真的out了!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP ...

  5. Java+Kafka消息队列

    本文主要针对,Java端对Kafka消息队列的生产和消费.Kafka的安装部署,请看查看相关文章. 笔者最近所用的是Spring mvc,监听文件路径,然后将读取到的文件内容发送到消息队列中.由另外系 ...

  6. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  7. kafka消息队列应用总结

    kafka官网: Apache Kafka 公司使用阿里云提供的kafka消息队列服务,分别为测试环境与生产环境,部署了多个集群. 使用场景:应用对外提供API接口调用,同时支持kafka增量消息推送 ...

  8. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  9. Kafka—消息队列

    Kafka-消息队列(理论部分) 一.Kafka概述 1.1.简介 kafka是一个分布式的基于发布/订阅模式的消息队列 主要应用场景:大数据实时处理领域 1.2.什么是消息队列? 消息队列 = 消息 ...

  10. kafka消息队列使用场景

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...

最新文章

  1. html5设置文字不能复制,网页文字不能复制?巧解网页文字不能复制
  2. PyTorch的计算图和自动求导机制
  3. GDI+中发生一般性错误的解决办法 from http://www.cnblogs.com/winzheng/archive/2008/12/23/1360440.html...
  4. Spring Async和Java的8 CompletableFuture
  5. 不能以根用户身份运行 Google Chrome 浏览器
  6. lvs-dr模式原理详解和可能存在的“假负载均衡”
  7. 图像处理中的卷积与模板
  8. 运行opencv保存视频时出现错误的解决方法
  9. HtmlDom 基础
  10. 如果苹果公司允许其他手机厂商使用iOS系统,对苹果公司来说会有什么好处?
  11. NET开发资源站点和部分优秀.NET开源项目
  12. 微信小程序如何实现登陆功能
  13. 为何天搜科技这些互联网企业对杭州情有独钟?
  14. Windows Filtering Platform Windows筛选平台
  15. 随着人工智能发展的少儿编程教育
  16. 使用docker安装mysql8及mysql5.7
  17. windows的由来与详细介绍
  18. 64位linux安装adobe flash play插件
  19. 技术点:手写axios
  20. ERP采购管理系统软件

热门文章

  1. 哪吒之魔童降世视听语言影评_《哪吒之魔童降世》影评:生而孤独,从不认命...
  2. feign调用https接口_SpringCloudFeign远程调用
  3. MySQL查询日期类数据常用函数
  4. TypeScript:函数基础
  5. 前端一键复制粘贴插件——clipboard.js的使用
  6. 设计模式(2)——观察者模式
  7. bootstrap select2 动态从后台Ajax动态获取数据
  8. 论文笔记_S2D.73_2019_BTS_从大到小:多尺度局部平面引导的单目深度估计
  9. Asp.net(C#)年月日时分秒毫秒
  10. HDU-1863-畅通工程