前提

1、 安装openresty,记得安装nginx的监控模块
2、 安装kafka
3、 下载lua+kafka插件:https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
4、 解压插件,将lua-resty-kafka-master\lib\resty\kafka文件夹放到openresty/lualib/resty下

首先修改openresty的配置文件中localtion位置,引入外部lua文件,这样修改lua文件会比较方便

  location / {default_type text/html;content_by_lua_file /usr/local/openresty/tmp.lua;}

案例

1 先获取kafka的实例
2 通过实例获取连接
3设置分区发送策略
4调用send方法发送数据
5启动一个kafka消费测试,验证是否发送成功

lua代码

---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by NH55.
--- DateTime: 2020/12/11 11:48
---
--- 数据采集运行线程阈值监控,如果超过了我们设置的最大阈值,那么就等待不send数据,下个批次再次执行
local DEFAULT_THRESH = 100
-- 编写kafka相关配置
-- 配置broker地址local BROKER_LIST = {{ host = "192.168.xx.101", port = 9092 },{ host = "192.168.xx.102", port = 9092 },{ host = "192.168.xx.103", port = 9092 }}
-- kafka分区数
local PARTITION_NUM = 3
-- kafka的topic
local TOPIC = "csdn"
-- producerConfig
local CONNECT_PARAMS = {producer_type = "async", socket_timeout = 30000,flush_time = 10000, request_timeout = 20000
}
-- 默认分区
local function default_partitioner(key, num, correlation_id)local id = key and crc32(key) or correlation_id-- partition_id is continuous and start from 0return id % num
end
--- 我们为了让数据均匀分布到每一个分区内,这里我们使用轮询方式发送消息至Kafka分区中
--- 相当于自定义分区的模式,当然你也可以不用这种方式,使用默认的分区也行
-- 获取共享内存数据
local shared_data = ngx.shared.shared_data
-- 设置共享内存的变量(Key)
local sharedKey = "shared_Key"
local key_val = shared_data:get(sharedKey)
if not key_val thenkey_val = 1shared_data:set(sharedKey,key_val)
end
-- 计算消息发送分区
local partition_id = ""..tonumber(key_val%PARTITION_NUM)
--每个key的value要自增
shared_data:incr(sharedKey,1)
-- 变量监控
local isGone = true
-- 进行阈值判断
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESH) thenisGone =false
end
-- 满足条件true执行,反之不执行
if isGone then-- 获取页面数据信息local time_local = ngx.var.time_localif time_local == nil thentime_local = ""endlocal request = ngx.var.requestif request == nil thenrequest = ""endlocal request_method = ngx.var.request_methodif request_method == nil thenrequest_method = ""endlocal content_type = ngx.var.content_typeif content_type == nil thencontent_type = ""endngx.req.read_body()local request_body = ngx.var.request_bodyif request_body == nil thenrequest_body = ""endlocal http_referer = ngx.var.http_refererif http_referer == nil thenhttp_referer = ""endlocal remote_addr = ngx.var.remote_addrif remote_addr == nil thenremote_addr = ""endlocal http_user_agent = ngx.var.http_user_agentif http_user_agent == nil thenhttp_user_agent = ""endlocal time_iso8601 = ngx.var.time_iso8601if time_iso8601 == nil thentime_iso8601 = ""endlocal server_addr = ngx.var.server_addrif server_addr == nil thenserver_addr = ""endlocal http_cookie = ngx.var.http_cookieif http_cookie == nil thenhttp_cookie = ""end--封装数据local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#"..content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#"..remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#"..server_addr .."#CS#".. http_cookie;-- 引入生产者模块创建实例local producerDic = require "resty.kafka.producer"-- 创建实例local producer = producerDic:new(BROKER_LIST,CONNECT_PARAMS)-- 调用发送方法sendlocal ok,err = producer:send(TOPIC,partition_id,message)-- 判断发送消息是否成功打印日志if not ok thenngx.log("kafka send message err:",err)end
end

之后打开消费者

kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic csdn

刷新nginx监听的网页,在消费者端就可以收到内容
tt

11/Dec/2020:19:53:58 +0800#CS#GET / HTTP/1.1#CS#GET#CS##CS##CS##CS#192.168.xx.1#CS#Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 Edg/87.0.664.57#CS#2020-12-11T19:53:58+08:00#CS#192.168.xx.101#CS#
11/Dec/2020:19:54:01 +0800#CS#GET /

注意事项

kafka server.properties 需开启如下选项
集群的每台机器都需要打开

advertised.listeners=PLAINTEXT://192.168.xx.103:9092

openresty lua集成kafka相关推荐

  1. SpringBoot笔记:SpringBoot2.3集成Kafka组件配置

    文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...

  2. springboot集成kafka及kafka web UI的使用

    springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...

  3. .Net Core 集成 Kafka

    最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰.考察了一些产品,最终决定使用kafka来当做消息队列.以下是关于kafka的一些知识的整理笔记. kafka kafka 是分布式流式平 ...

  4. Splunk集成Kafka配置方法

    [摘要]Splunk是业界赫赫有名的数据分析工具,比较擅长BI和安全分析,我司很多部门都有购买其产品和服务.最近有个需求要把Splunk和分部署消息队列Kafka做个集成,Splunk官方提供的一个K ...

  5. storm如何集成kafka

    之前的kafka案例:http://blog.csdn.net/weixin_35757704/article/details/77196539 之前的storm案例:http://blog.csdn ...

  6. SpringCloud学习之SpringCloudStream集成kafka

    一.关于Spring-Cloud-Stream Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框 ...

  7. Kafka09:【案例】Flume集成Kafka

    一.Flume集成Kafka 在实际工作中flume和kafka会深度结合使用 1:flume采集数据,将数据实时写入kafka 2:flume从kafka中消费数据,保存到hdfs,做数据备份 下面 ...

  8. OpenResty+Lua实现WAF防火墙

    OpenResty+Lua实现WAF防火墙 WAF功能 白名单 黑名单 防止SQL注入 防止CC攻击 若服务器收到CC攻击,一分钟内请求数大于60,则会将对方ip封进黑名单 文件结构 waf 项目文件 ...

  9. springboot 集成kafka 实现多个customer不同group

    springboot正常集成kafka 这个网上很多资料都有些集成,我就不浪费太多篇幅和时间了,笔者找了篇还算很容易理解的博客,自行学习 https://blog.csdn.net/tzs_10412 ...

最新文章

  1. python安装后怎么配置环境变量_Python安装与环境变量的配置
  2. Linux 操作系统原理 — 文件系统 — 存储布局
  3. 【转】学习asp.net比较完整的流程
  4. LoadRunner监控Linux的三种方法
  5. 聊一聊Python的变量类型判断type和isinstance
  6. 有关Drools业务规则引擎的完整教程
  7. java chsftp.get 追加_Java SFTP上传使用JSch,但如何覆盖当前文件?
  8. 【转】1.9 Asp.Net Core 轻松学-多线程之取消令牌(
  9. CIO启示:转向数字业务为传统IT带来颠覆性影响
  10. python使用virtualenvwrapper
  11. 三大框架ssh整合(一)
  12. spotify电脑下载歌曲_我来简单说一下Apple Music和Spotify的下载方法
  13. Windows Server 2008搭建FTP服务
  14. 讯飞语音测评的简单demo演示
  15. 自定义ListView实现任意View跑马灯效果
  16. pycharm中导入pyQt无法使用
  17. 记github学生认证
  18. parseFloat() 小数点后不为0,就保留2位。否则为整数
  19. 树莓派:基于物联网的指纹打卡器
  20. 西北农林科技大学linux实验,2021双非科班调剂985(一志愿中科大,调剂上岸西北农林科技大学)初试复试经验帖...

热门文章

  1. 服务器经过交换机传递文件丢失,H3C交换机bin文件丢失后补救方案
  2. python自带的解释器和编辑器叫什么_(四)python自带解释器(LDIE)的使用
  3. GAN用数学语言描述
  4. vue封装echarts示例
  5. cesium加载arcgis切片
  6. vue封装websocket_Vue中使用websocket
  7. frpc客户端 linux安装,centos配置frp服务端,与客户端
  8. java sleep方法_6种快速统计代码执行时间的方法,真香!(史上最全)
  9. java map 缓存数据_Map方式实现JAVA数据缓存
  10. 快速ping_PING你真的会用么?