最近在做点监控系统,先后采用了两套方案:
方案一:Nginx记录日志 --> tail语句扫描日志到Kafka --> Java站点消费Kafka消息后存储到Mysql数据库 --> Mysql数据库 --> 数据库定时汇聚数据 --> 界面呈现
方案一遇到的问题:
1.1 面对海量数据,日志文件增长很快,磁盘占用大
1.2 Java站点消费Kafka消息后存储到Mysql数据库 太慢

为了解决上述两个问题,采用方案二:
方案二:Nginx+Lua直接向Kafa发送数据  --> Java站点消费Kafka消息后存储到Mysql数据库 --> Mysql数据库 --> 数据库定时汇聚数据 --> 界面呈现
2.1 Nginx+Lua直接向Kafa发送数据: Nginx不记录日志,直接发送到Kafka,不占用磁盘空间
2.2 Java站点消费Kafka消息后存储到Mysql数据库: 合并更新操作,如对同一个sessionid的更新操作有100条,合并后只有一条更新语句
方案二遇到的问题:
2.3 Nginx可以直接用完整的 OpenResty,也可以在现有的 Nginx基础上添加插件
    我采用的是在现有的Nginx基础上添加插件

方案一具体如下:
Nginx1.9.9
kafka_2.11-0.9.0.1

1.1、Nginx配置文件里的配置:
log_format ht-video  '$request|$msec|$http_x_forwarded_for|$http_user_agent|$request_body';

server{
                server_name lc.zhihuishu.com;

location / {
                        root /usr/local/nginxnew/html/view;
                        index index.html;
                }
                location /ht{
                        #root   html;
                        #index  index.html index.htm;
                        if ( $request ~ "GET" ) {
                                #access_log logs/ht-video.log ht-video;
                                                access_log /htlog/ht-video.log ht-video;
                        }
                        proxy_set_header Host $http_host;
                        proxy_pass http://localhost:6099;
                        client_max_body_size 8000M;
                        #error_page 405 =200 $1;
                }
        }

server {
                listen       6099;
                server_name  localhost;
                location /ht {
                root   /usr/local/nginxnew/html/hightower;
                index  index.html index.htm;
                client_max_body_size 8000M;
                error_page 405 =200 $1;
        }

1.2、修改本机Hosts文件
vi /etc/hosts

10.252.126.242  kafka1.livecourse.com
10.252.126.242  zookeeper1.livecourse.com

1.3、修改Kafka文件
vi /usr/local/kafka_2.11-0.9.0.1/config/server.properties 
   这个文件里要改下面5点:
broker.id=0
port=9092
host.name=10.252.126.242
log.dirs=/able4/kafka-logs
zookeeper.connect=zookeeper1.livecourse.com:2181

vi /usr/local/kafka_2.11-0.9.0.1/bin/kafka-run-class.sh 
  添加两行:
export JAVA_HOME=/usr/local/java/jdk1.7.0_79
export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre

1.4、启动zookeeper和kafka
/usr/local/kafka_2.11-0.9.0.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/zookeeper.properties
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties

1.5、扫描Nginx日志发送到kafka
tail -n 0 -f  /usr/local/nginxnew/logs/ht-video.log | /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1.livecourse.com:9092 --topic HT-VIDEO

方案二具体如下:
Nginx1.9.9
LuaJIT-2.0.4
lua-nginx-module-0.10.2
ngx_devel_kit-0.3.0rc1
lua-resty-kafka
kafka_2.11-0.9.0.1

参考的这几篇文章方成此方案

Nginx与Lua

基于Lua+Kafka+Heka的Nginx Log实时监控系统

Lua如何读取nginx内置对象和方法

Kafka官方文档

nginx+lua+kafka实现日志统一收集汇总

1 安装LuaJIT
下载http://luajit.org/download.html
http://luajit.org/install.html
  命令如下:
tar zxf LuaJIT-2.0.4.tar.gz
cd LuaJIT-2.0.4
make PREFIX=/usr/local/LuaJIT
make install PREFIX=/usr/local/LuaJIT
echo "/usr/local/LuaJIT/lib" > /etc/ld.so.conf.d/usr_local_luajit_lib.conf
ldconfig
#注意环境变量!
export LUAJIT_LIB=/usr/local/LuaJIT/lib
export LUAJIT_INC=/usr/local/LuaJIT/include/luajit-2.0

2 安装lua-nginx-module
https://github.com/openresty/lua-nginx-module/tags
cd /usr/local
tar zxvf lua-nginx-module-0.10.2.tar.gz

3 安装ngx_devel_kit

https://github.com/simpl/ngx_devel_kit/tags

http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtml
cd /usr/local
tar zxvf ngx_devel_kit-0.3.0rc1.tar.gz

4 安装编译Nginx
http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtml
给Nginx添加下面的参数,如果已经安装了Nginx则用
  nginx -V
   --add-module=/usr/local/lua-nginx-module-0.10.2 --add-module=/usr/local/ngx_devel_kit-0.3.0rc1
如:
cd /usr/local/nginx-1.9.9
./configure  --prefix=/usr/local/nginxnew/ --with-http_stub_status_module --with-http_ssl_module --with-http_realip_module --add-module=/root/install/ngx_log_if-master --add-module=/usr/local/lua-nginx-module-0.10.2 --add-module=/usr/local/ngx_devel_kit-0.3.0rc1

5 lua插件lua-resty-kafka
https://github.com/doujiang24/lua-resty-kafka
mkdir /usr/local/lua
上传lua-cjson-2.1.0.3.tar.gz到/usr/local/lua
上传lua-resty-kafka到/usr/local/lua
这里遇到些问题,我改写了其中的client.lua文件的两个方法:
方法1:
function _M.new(self, broker_list, client_config)
    local opts = client_config or {}
    local socket_config = {
        socket_timeout = opts.socket_timeout or 3000,
        keepalive_timeout = opts.keepalive_timeout or 600 * 1000,   -- 10 min
        keepalive_size = opts.keepalive_size or 2,
socket_config
    }
    --start 0 == wangsihong 2016-4-16
    local broker_list_host_ip = opts.broker_list_host_ip or {}

local cli = setmetatable({
        broker_list = broker_list,
broker_list_host_ip = broker_list_host_ip,
        topic_partitions = {},
        brokers = {},
        client_id = "worker" .. pid(),
        socket_config = socket_config,
    }, mt)
    --end 0 == wangsihong 2016-4-16

if opts.refresh_interval then
        meta_refresh(nil, cli, opts.refresh_interval / 1000) -- in ms
    end

return cli
end

方法2:
function _M.choose_broker(self, topic, partition_id)
    local brokers, partitions = self:fetch_metadata(topic)

if not brokers then
        return nil, partitions
    end

local partition = partitions[partition_id]
    if not partition then
        return nil, "not found partition"
    end

local config = brokers[partition.leader]
    if not config then
        return nil, "not found broker"
    end

--start 1 == wangsihong 2016-4-16
    local broker_list_host_ip = self.broker_list_host_ip
    for k = 1, #broker_list_host_ip do
         local hostip = broker_list_host_ip[k]
         if config ~= nil and hostip ~= nil and config.host == hostip.host then
             config.host = broker_list_host_ip[k].ip
         end
     end
    --end  1 == wangsihong 2016-4-16

return config
end

6 lua插件cjson
http://www.kyne.com.au/~mark/software/lua-cjson-manual.html
cd /usr/local/lua
tar zxvf lua-cjson-2.1.0.3.tar.gz

7 安装Nginx1.9.9好后修改nginx.conf

http{
#    lua_package_path "/usr/local/lua/lua-resty-kafka/lib/?.lua;/usr/local/lua/lua-cjson-2.1.0.3/lua/?.lua;;";
#    lua_package_cpath '/usr/local/LuaJIT/lib/lua/5.1/?.so;';
    lua_package_path "/usr/local/lua/lua-resty-kafka/lib/?.lua;;";

server{
#为了方便调试,关闭了lua_code_cache,如果是生产环境,应该开启它。
lua_code_cache off;

listen 80;
server_name localhost;
location = /lua-v {
            content_by_lua '
                ngx.header.content_type = "text/plain";
                if jit then
                        ngx.say(jit.version)
                else
                        ngx.say(_VERSION)
                end
             ';
        }

location /ht3/video/p {
            content_by_lua '
                ngx.header.content_type = "text/plain";
                --local cjson = require "cjson"
                --local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

local broker_list = {
                    { host = "10.252.126.242", port = 9092 },
                    { host = "10.252.126.242", port = 9093 },
                    { host = "10.252.126.242", port = 9094 },
                    { host = "10.252.126.242", port = 9095 },
                    { host = "10.252.126.242", port = 9096 },
                }
local broker_list_host_ip = {
   { host = "kafka1.livecourse.com", ip = "10.252.126.242" },
}

local key = "key"
                --local message = $request|$msec|$remote_addrx|$http_user_agent|$request_body

local myIP = ngx.req.get_headers()["X-Real-IP"]
                if myIP == nil then
                        myIP = ngx.req.get_headers()["x_forwarded_for"]
                end
                if myIP == nil then
                        myIP = ngx.var.remote_addr
                end
                local h = ngx.req.get_headers()

local message = ngx.req.get_method() .. " " .. ngx.var.uri
if ngx.var.args  ~= nil then
    message = message .. "?" .. ngx.var.args .. "|"
end
message = message .. ngx.now() .. "|"
message = message .. myIP .. "|"
message = message .. h["user-agent"] .. "|"
local bodyData = ngx.req.get_body_data()
if bodyData  == nil then
bodyData = "-"
end
message = message .. bodyData

-- 定义kafka异步生产者
                -- 发送日志消息,send第二个参数key,用于kafka路由控制:
                -- key为nill(空)时,一段时间向同一partition写入数据
                -- 指定key,按照key的hash写入到对应的partition

-- 指定key,按照key的hash写入到对应的partition

local key = nil

-- this is async producer_type and bp will be reused in the whole nginx worker
                local bp = producer:new(broker_list, { producer_type = "async" , broker_list_host_ip = broker_list_host_ip,refresh_interval = 3000})

local ok, err = bp:send("HT-VIDEO", key, message)
                if not ok then
                    --ngx.say("send err:", err)
   ngx.say("void(1);");
                    return
                end
ngx.say("void(0);");
                --ngx.say("send success, ok:", ok)
            ';
        }
       
    }

}

8 完成

请求Url:http://localhost/ht3/video/p?a=bbbb 就能将日志内容发送到kafka

基于Nginx1.9+LuaJIT+Kafka的点播监控系统实战(上海卓越智慧树网点播监控系统)相关推荐

  1. 【学员源鑫笔记】韦东山物联网流媒体实战项目-智慧家居视频监控系统(值得收藏)...

    点击上方「嵌入式云IOT技术圈」,选择「置顶公众号」第一时间查看嵌入式笔记! 作者 | 嵌入式云IOT技术圈 编排 | 嵌入式云IOT技术圈 嵌入式行业的大佬韦东山老师一贯是授人以鱼不如授人以渔,作为 ...

  2. 系统linux/redhat6.5 zabbix 2.47监控nginx1.8.0 (下)

    2019独角兽企业重金招聘Python工程师标准>>> 6  zabbix_get获取数据(服务器端) (测试是否能接受到数据) zabbix_get -s 172.16.145.7 ...

  3. Python 基于pykafka简单实现KAFKA消费者

    基于pykafka简单实现KAFKA消费者   By: 授客 QQ:1033553122         1.测试环境 python 3.4 zookeeper-3.4.13.tar.gz 下载地址1 ...

  4. 【树莓派-网络监控(5)前端搭建】基于iframe标签,集成监控实时画面与遥控功能,完成网络监控的搭建与调试

    前期内容提要: [树莓派-网络监控(1)分析准备]基于树莓派搭建可视化可远程遥控网络监控--工程分析及前期准备 [树莓派-网络监控(2)画面传输]基于mjpg-stream实现监控画面的传输 [树莓派 ...

  5. Kafka万亿级消息实战

    作者:vivo互联网服务器团队-Yang Yijun 一.Kafka应用 本文主要总结当Kafka集群流量达到 万亿级记录/天或者十万亿级记录/天  甚至更高后,我们需要具备哪些能力才能保障集群高可用 ...

  6. Linux实战教学笔记49:Zabbix监控平台3.2.4(一)搭建部署与概述

    https://www.cnblogs.com/chensiqiqi/p/9162986.html 一,Zabbix架构 zabbix 是一个基于 WEB 界面的提供分布式系统监视以及网络监视功能的企 ...

  7. 一篇运维老司机的大数据平台监控宝典(1)-联通大数据集群平台监控体系进程详解

    一篇运维老司机的大数据平台监控宝典(1)-联通大数据集群平台监控体系进程详解 "如果你是一个经验丰富的运维开发人员,那么你一定知道ganglia.nagios.zabbix.elastics ...

  8. 视频系统 流媒体 rtsp hls h264 h265 aac 高并发 低延时 系统 设计 录像 视频合成 转发 点播 快进 快退 单步播放 分布式集群

    系统改名为:,升级包改使用jpg图像封装,从2.124版本开始,1.*的升级包停止使用 系统工具            下载地址(2019-04-19) :https://pan.baidu.com/ ...

  9. 【监控】Prometheus(普罗米修斯)监控概述

    文章目录 一.监控系统概论 二.基础资源监控 2.1.网络监控 2.2.存储监控 2.3.服务器监控 2.4.中间件监控 2.5.应用程序监控(APM) 三.Prometheus 简介 3.1.什么是 ...

最新文章

  1. vue项目中使用element的dialog中引入ztree却不能初始化解决办法
  2. uva - Broken Keyboard (a.k.a. Beiju Text)(链表)
  3. CentOS5.8分区与文件系统
  4. 高斯加权滤波matlab,简单易懂的高斯滤波
  5. VC编程使计算机休眠
  6. 【单片机仿真】(十一)指令系统逻辑运算指令 — 逻辑与指令ANL、逻辑或指令ORL
  7. 计算机和数学 论文参考文献,数学与计算机参考文献外国 数学与计算机核心期刊参考文献哪里找...
  8. 麻省理工遍地走,6年经验安卓程序员面试微软,靠这份思维脑图拿下Offer!
  9. 用计算机计算2的31次方,2的31次方,用什么方法可以最快算出来呢
  10. 如何使用shell限制指定用户shell程序的网络带宽
  11. SFP光模块相关知识
  12. 怎样使用计算机网络,手机使用电脑网络怎么操作 手机使用电脑网络操作方法...
  13. 【Processing】行走的行星 动态海报 processing艺术与科技
  14. 趣味数学(各大经典数学问题)
  15. 编辑距离(Edit Distance) 一文读懂(Python实现)
  16. 怎么删除github项目/仓库中已经上传的代码
  17. 使用cocoscreator接入google AdSence广告
  18. JavaScript提升(你不知道的JavaScript)
  19. jquery插件——仿新浪微博限制输入字数的textarea
  20. 负载均衡手段之DNS轮询

热门文章

  1. 查询每个部门中入职日期一致的员工信息
  2. 训练AI数据模型所需要的高性能计算机配置
  3. 使用simsun的黑体
  4. Tensorflow实现变分自编码器
  5. 图标转换成web字体
  6. Docker版本变化和新版安装
  7. html 计算两数之和,百度web前端面试题之求两个数的最大公约数和最小公倍数
  8. 疫情防控之外来人员登记表模板
  9. 表空间配额和UNLIMITED TABLESPACE权限
  10. 最新IGS MGEX测站数据选择及下载详细教程(附IGS登录方法)