继上一篇博文初步搭建好kafka+zookeeper+filebeat的集群平台后,这次我们继续给此集群添加功能并完善.

数据入库(python实现)

需求分析

需要nginx日志ip时间带宽字段
将ip字段解析成相应的省份运营商
存入数据库的字段: id, 时间, 省份, 运营商, 带宽

实验步骤

1、创建数据表
2、编写python脚本, 从kafka获取nginx日志
3、获取好的nginx日志,提取出ip,时间,带宽字段
4、提取出的ip字段通过淘宝的一个接口解析出省份和运营商
(url = “https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=”)
5、格式化时间字段 “2021-10-12 12:00:00”
6、存入数据库

第一步:
将上一次实践的集群都开启,开一台有mysql的服务器.进行第一步,创建consumers数据库和nginxlog表.

CREATE DATABASE consumers;
USE consumers;
CREATE TABLE nginxlog (id INT NOT NULL AUTO_INCREMENT,dt DATETIME NOT NULL,prov VARCHAR(20) NOT NULL,isp VARCHAR(50) NOT NULL,bd INT NOT NULL,PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

第二步:
将2,3,4,5,6步用Python写好一个程序,直接执行即可.(编写消费者程序)

vim python_consumers.py
import json
import requests
import time
import pymysqltaobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="# 查询ip地址的信息(省份和运营商isp),通过taobao网的接口
def resolv_ip(ip):response = requests.get(taobao_url+ip)if response.status_code == 200:tmp_dict = json.loads(response.text)prov = tmp_dict["data"]["region"]isp = tmp_dict["data"]["isp"]return prov, ispreturn None, None# 将日志里读取的格式转换为我们指定的格式
def trans_time(dt):# 把字符串转成时间格式timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")# timeStamp = int(time.mktime(timeArray))# 把时间格式转成字符串new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)return new_time# 从kafka里获取数据,清洗为我们需要的ip,时间,带宽
from pykafka import KafkaClientclient = KafkaClient(hosts="192.168.1.94:9092,192.168.1.95:9092,192.168.1.96:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(consumer_group='testgroup',# 自动提交offsetauto_commit_enable=True,zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
)# 建立数据库连接
db = pymysql.connect(host="192.168.1.150", user="jiangda330", password="123456", port=3306, db="consumers", charset="utf8")
cursor = db.cursor()for message in balanced_consumer:if message is not None:try:line = json.loads(message.value.decode("utf-8"))log = line["message"]tmp_lst = log.split()ip = tmp_lst[0]dt = tmp_lst[3].replace("[","")bt = tmp_lst[9]dt = trans_time(dt)prov, isp = resolv_ip(ip)if prov and isp:print(prov, isp, dt, bt)# 使用占位符,防止SQL注入攻击cursor.execute('insert into nginxlog(dt,prov,isp,bd) values(%s, %s, %s, %s)', (dt, prov, isp, bt))db.commit()print("save successfully!")except json.JSONDecodeError as err:print("JSON decode error:", err)except IndexError as err:print("Index error:", err)except requests.exceptions.RequestException as err:print("Request error:", err)except pymysql.Error as err:print("Database error:", err)# 关闭数据库连接
db.close()

将数据库连接放在了for循环外面,避免了重复建立和释放数据库连接的情况。

在进行数据库操作时,使用了try…except语句来处理异常,更好地定位错误。

在进行数据库操作时,使用了参数化查询,以避免SQL注入攻击。

调整了代码的缩进,符合PEP8规范,使用了4个空格的缩进方式。

请注意,在消费数据时,我将consumer改为了balanced_consumer.consume(),这是因为使用SimpleConsumer时,可能会出现消费速度跟不上生产速度的情况,导致消息堆积。而使用balanced_consumer可以动态平衡分配消费者,更合理地分配负载,避免消息堆积的情况。

第三步:执行

python3 python_consumers.py

第四步:验证:查看是否消费成功
访问自己的Nginx服务器.这样就算成功了,当然你还可以直接登陆数据库,看看数据有没有传进去.


多说一些:
针对于分表,我们的脚本就要进行相应的修改。
数据插入nginxlog表之前
1、获取prov_index和isp_index表里的数据
2、判断要插入的prov和isp在不在表的数据里
3、如果prov_index和isp_index数据存在,就获取相应的id,再进行存储
4、如果prov_index和isp_index数据不存在,就把新的数据插入这两张表,然后再获取id字段,进行存储到数据表nginxlog

Celery

celery 是由python开发的 ,简单、灵活、可靠的分布式任务处理框架.

celery 的5个角色:
Task
就是任务,有异步任务和定时任务

Broker
中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。
Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。

Worker
执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。

Beat
定时任务调度器,根据配置定时将任务发送给Broker。

Backend

实验

安装redis
yum  install epel-release -y
yum install redis -y
修改监听ip
vim /etc/redis.confbind 0.0.0.0
启动redis
[root@nginx-kafka01 ~]# systemctl start redis

memcached vs redis

redis是一种键值存储的数据库。
redis开启持久化的两种模式:
AOF 全持久化模式 每一次操作都会同步到磁盘
RDB 半持久化模式 定时的将内存内容快照写入磁盘

redis数据库 0-15号库

为什么用kafka做日志收集而不用redis?
kafka适用于做日志收集平台
redis大多数使用在做kv数据存储上
redis也有一个queue的数据类型,用来做发布/订阅系统。
它里面的消息只能消费一次,kafka可以通过offset的设置来重复消费。
redis只有单一的消费者,不能分成多个消费组。
redis数据存储在内存的,kafka存储在磁盘的。

安装celery
pip install  celery
pip install  redis

编辑celery 参照flask_log/celery_app

配置celery
vim config.py
from celery.schedules import crontab#配置消息中间件的地址
BROKER_URL = "redis://192.168.77.132:6379/1"#配置结果存放地址
CELERY_RESULT_BACKEND = "redis://192.168.77.132:6379/2"#启动celery时,导入任务, 只有导入任务才能执行
CELERY_IMPORTS = {'celery_tasks'   #存放celery要执行的任务
}#时区
CELERY_TIMEZONE = "Asia/Shanghai"#设置定时任务
CELERYBEAT_SCHEDULE = {'log-every-minute': {'task' : 'celery_tasks.scheduled_task','schedule': crontab(minute='*/1')}
}
vim app.py
from celery import Celery#实例化celery对象,传入一个名字即可
celery_app = Celery('celery_app')celery_app.config_from_object('config')#############celery_tasks.py 存放任务的文件from app import celery_app@celery_app.task
def scheduled_task(*args, **kwargs):print("this is schedule task")
启动worker
[root@nginx-kafka01 flask_log]# celery -A app.celery_app worker --loglevel=INFO -n node1
启动beat
[root@nginx-kafka01 flask_log]# celery -A app.celery_app beat --loglevel=INFO

开两个bash进程,可以看到:


大功告成!
就到这把~
接下来就是总结了,我感觉完成后回头一看,发现也不过如此,甚至觉得没那么负责,可是在做的时候,积累的问题真的很多很多.勤总结!!!继续加油~

【Kafka】kafka日志收集平台搭建(二)相关推荐

  1. ELK+Kafka 企业日志收集平台(一)

    背景: 最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项:所以最近将Redis ...

  2. ELK+Kafka 企业日志收集平台(二)这是原版

    上篇博文主要总结了一下elk.基于kafka的zookeeper集群搭建,以及系统日志通过zookeeper集群达到我们集群的整个过程.下面我们接着下面这个未完成的几个主题 4.Kibana部署; 5 ...

  3. ELK+Kafka 企业日志收集平台(二)

    上篇博文主要总结了一下elk.基于kafka的zookeeper集群搭建,以及系统日志通过zookeeper集群达到我们集群的整个过程.下面我们接着下面这个未完成的几个主题 4.Kibana部署; 5 ...

  4. 搭建elk+logstash+kafka+filebeat日志收集平台

    文章目录 前言 组件介绍 原理图 环境介绍 安装 日志收集与展示 前言 在日常的运维过程中,对系统日志和业务日志的处理比较重要,对于以后的数据分析.排查异常问题有很重的作用.今天就分享一个自己基于ka ...

  5. ELK+Kafka分布式日志收集系统环境搭建

    一.ES与kafka环境搭建 1.使用Docker搭建Elasticsearch Docker安装ES 2.使用Docker搭建Kafka,因为这里是演示,所以Kafka没有搭建集群. Docker安 ...

  6. 实战:kafka实现日志收集系统

    实战:kafka实现日志收集系统 一.Kafka案例需求概述 1.1 需要收集的信息: 用户唯一标识 用户操作时间 具体用户操作 1.2 工作流程: 用户的操作会由Web服务器进行响应. 同时用户的操 ...

  7. EFK6.3+kafka+logstash日志分析平台集群

    转载来源 :EFK6.3+kafka+logstash日志分析平台集群 :https://www.jianshu.com/p/f956ebbb2499 架构解读 : 第一层.数据采集层 安装fileb ...

  8. 深入浅出ELK日志收集系统搭建

    先看一下目录图 背景 试想这么一种场景:Nginx负载了2个Tomcat,那么日志查看就很麻烦了,每次查看日志都要登录2台服务器,挨个搜索,2台还好,如果5台呢?10台呢?那查看日志就可费劲了,所以需 ...

  9. 网页版本的飞行日志分析平台是_一个轻便的实时日志收集平台wslog

    一个轻便的实时日志收集平台wslog wslog原理 利用github.com上无数的slack hook 日志工具sdk 遵循 slack hook API 规范 https://api.slack ...

最新文章

  1. 第166天:canvas绘制饼状图动画
  2. android gradle is插件,android gradle 插件创建 configuration
  3. ArcGIS实验教程——实验十四:空间数据库的建立
  4. 与计算机交朋友优秀教案,《与计算机交朋友》教学设计-20210608120218.pdf-原创力文档...
  5. ssm radis mysql_SSM完美整合Redis
  6. python中宽度是什么意思_在Python中,高度还是宽度优先?
  7. Intellij Idea 15 下新建 Hibernate 项目以及如何添加配置
  8. 文件不混淆_Python代码保护 | pyc 混淆从入门到工具实现
  9. Exp3免杀原理与实践 20154326杨茜
  10. 产品需求文档(PRD,Product Requirement Document)模板
  11. 在Word中快速插入分隔线
  12. RobotStudio知识你知多少?
  13. 变压器励磁模型 Matlab/simulink 可用于模拟电压暂降等电能质量问题
  14. 用JS实现贪吃蛇小游戏
  15. 我的AI转型之路与AI之我见
  16. JS 日期的获取和计算 ios不兼容问题
  17. 人人美剧迅雷链接多线程和多进程爬虫分析
  18. Spring boot集成Redis实现sessions共享时,sessions过期时间问题分析
  19. 大数据【企业级360°全方位用户画像】项目介绍
  20. 安格最近推出的AG6202来设计一款HDMI 1.4转VGA的产品|AG6202设计应用

热门文章

  1. php and mysql登录注册页面
  2. Java IDE介绍和使用
  3. Java——IDEA
  4. 力扣 704.二分查找
  5. HDU4544 湫湫系列故事——消灭兔子
  6. 实现连续签到案例的可能(使用)
  7. linux LED设备驱动文件
  8. css里banner是什么,div+css banner的问题
  9. 1天1个岗位画像洞察-无线DPM岗位
  10. 12306官方抢票服务,铁路候补购票服务扩大到全部旅客列车!