记录下备忘,年前,公司业务需求,需要搭建一套相对通用的游戏后台,来提供给其它游戏项目使用。最开始的逻辑如下,本次主要是涉及到红框部分:

1、多台研发服务器把相关数据,通过http方式请求到后台的API服务器中。(后续扩展也可以按实际情况使用日志文件或者同步读取研发只读库方式来进行数据传递)。

2、API服务器再把接收到的日志写到日志文件中来,然后通过【crontab+rsync】把日志文件同步到数据处理机子上。

3、数据处理机子,再定时读取收到的日志文件,格式化清理补全并写入到数据库中来。

整套简化逻辑大概就这样了,以前也很多系统使用类似的方式在跑着。可是后面demo测试的时候就感觉数据不够即时,比如创建个角色,要几分钟后才能在后台看到。小伙伴们决定要解决这个问题,起初有几个方案:

a、rsync配合上infotify来更实时的同步数据
b、API在写数据的时候,同时写一份数据到数据库上,后台部分实时业务读实时数据库来展示

c、使用kafka来记录和同步数据

对比了以上几个,a方案虽然同步实时了点,但是读取时候还得判断是否数据完整等等,并且有数据丢失的风险。b方案就治标不治本的方法了,以前部分项目其实也有用过。c方案是最终选定的,觉得这个比较符合需求,能够比较实时的同步数据,而且有比较强的容灾和扩展性。说干就干,首先是机器上装上相对于的环境,API机子需要安装kafak,数据处理机子不需要kafka,但使用php来读取数据,需要装扩展(api机子是go来做接口,相对方便了一点),文末有相关参考教程链接

一、环境安装:

【jdk】

1.去Oracle官网下载需要安装的jdk版本,我这里用的是jdk-8u144-linux-x64.tar.gz

2.解压后移动到喜欢的位置

tar zxvf jdk-8u144-linux-x64.tar.gz
mv jdk1.8.0_144 /usr/local/jdk/

3、编辑下面文件,在末尾增加下面几行命令,配置环境变量
vim /etc/profile

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_144
export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/
export PATH=$PATH:$JAVA_HOME/bin

4、重加载配置文件生效后,查看是否安装成功

source /etc/profile
java -version

【kafka_2.13-3.1.0】

1、到官网去下载对应版本,这边用的是kafka_2.13-3.1.0,其中2.13是zookeeper的版本,3.1.0是kafka的版本,新的已经集成在一起了。

wget --no-check-certificat https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz

2、解压到对应目录,就可以使用了

tar -zxvf kafka_2.13-3.1.0.tgz
mv kafka_2.13-3.1.0 /usr/local/
cd /usr/local/kafka_2.13-3.1.0

【php_rdkafka扩展】

1、这里用到的是php-rdkafka扩展,相对php-kafka强大一点。不过在装扩展前,需要先安装个librdkafka库。记得要找合适php版本的。sfdfsdfsdf

https://github.com/edenhill/librdkafka  //librdkafka地址
https://github.com/arnaud-lb/php-rdkafka  //php-rdkafka下载地址

2、安装librdkafka

unzip librdkafka-master.zip
cd librdkafka-master
./configure
make && make install

3、再安装rdkafka

unzip php-rdkafka-master.zip
cd php-rdkafka-master
phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make && make install

4、修改配置文件,加上扩展,重启php

vim /usr/local/php/etc/php.ini   //对应配置文件添加下面代码extension=/usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so
service php-fpm restart  //重启PHP

【supervisorctl安装】

1、supervisorctl主要是用来监控进程健康,自动重启等

二、kafka集群配置

相关教程可以到文末链接了解,这边主要是暂时先用两台机子来集群,分别使用两个9091,9092两个端口来运行着。

1、使用端口,防火墙相关也需要配置好

2、kafka/config/server.properties配置文件,主要配置集群id和相关链接端口等

3、部分参考命令记录

启动 ZK    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties   Kafka 安装包自带 ZK,可以单节点启动
启动 Kafka 服务器    bin/kafka-server-start.sh -daemon config/server.properties
创建 Topic(test)    bin/kafka-topics.sh --create -bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Topic 列表    bin/kafka-topics.sh --list  -bootstrap-server localhost:9092
启动 Producer bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动 Consumer bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Topic 相关信息(test)  bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test

三、代码摘录

1、初始化创建topic,根据具体业务分了多个topic,每个topic里面也可能存在相关的几个日志

bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic roleCreate --partitions 2
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic roleEnter --partitions 4
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic roleOther --partitions 6
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic task --partitions 4
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic goods --partitions 6
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic partner --partitions 6
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic orderCreate --partitions 2
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic orderFinish --partitions 2
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic union --partitions 6
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic chat --partitions 4
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic otherLog --partitions 4
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic account --partitions 2
bin/kafka-topics.sh --create -bootstrap-server 10.16.0.7:9091 --replication-factor 2 --topic recoverLog --partitions 2

1、API使用go记录日志

package utilsimport ("errors""goapi/library/global""log""sync""go.uber.org/zap""github.com/confluentinc/confluent-kafka-go/kafka"
)const (INT32_MAX = 2147483647 - 1000
)var KafkaProdct *kafka.Producer
var KafkaLock sync.RWMutextype KafkaConfig struct {BootstrapServers string `json:"bootstrap.servers"`SecurityProtocol string `json:"security.protocol"`SslCaLocation    string `json:"ssl.ca.location"`SaslMechanism    string `json:"sasl.mechanism"`SaslUsername     string `json:"sasl.username"`SaslPassword     string `json:"sasl.password"`
}// config should be a pointer to structure, if not, panic
func loadJsonConfig() *KafkaConfig {var config = &KafkaConfig{BootstrapServers: global.G_CONFIG.Kafka.BootstrapServers,}return config
}func doInitProducer(cfg *KafkaConfig) (producer *kafka.Producer, err error) {global.GetLogger().Info("init kafka producer, it may take a few seconds to init the connection")//common argumentsvar kafkaconf = &kafka.ConfigMap{"api.version.request":           "true","linger.ms":                     100,"sticky.partitioning.linger.ms": 1000,"delivery.timeout.ms":           1000,"request.timeout.ms":            300,"batch.size":                    "32k","acks":                          "1",}kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)cfg.SecurityProtocol = "PLAINTEXT"switch cfg.SecurityProtocol {case "PLAINTEXT":kafkaconf.SetKey("security.protocol", "plaintext")case "SASL_SSL":kafkaconf.SetKey("security.protocol", "sasl_ssl")kafkaconf.SetKey("ssl.ca.location", "conf/ca-cert.pem")kafkaconf.SetKey("sasl.username", cfg.SaslUsername)kafkaconf.SetKey("sasl.password", cfg.SaslPassword)kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)case "SASL_PLAINTEXT":kafkaconf.SetKey("security.protocol", "sasl_plaintext")kafkaconf.SetKey("sasl.username", cfg.SaslUsername)kafkaconf.SetKey("sasl.password", cfg.SaslPassword)kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)default:panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))}producer, err = kafka.NewProducer(kafkaconf)if err != nil {return producer, errors.New("初始化producer失败")}global.GetLogger().Info("init kafka producer success")return producer, err
}func ProducerMessage() (producer *kafka.Producer, err error) {KafkaLock.Lock()defer KafkaLock.Unlock()if KafkaProdct != nil {return KafkaProdct, err}cfg := loadJsonConfig()KafkaProdct, err = doInitProducer(cfg)if err != nil {return KafkaProdct, err}go func() {for e := range KafkaProdct.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {global.GetLogger().Error("KAFKA数据写入失败", zap.Error(ev.TopicPartition.Error))} else {log.Printf("Send OK topic:%v partition:%v offset:%v content:%s\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset, ev.Value)}}}}()return KafkaProdct, err
}func GetInstance() (producer *kafka.Producer, err error) {if KafkaProdct == nil {return ProducerMessage()}// 用于判断KafkaProdct连接是否正常,如果不正常则重连if KafkaProdct.GetFatalError() != nil {return ProducerMessage()}return KafkaProdct, err
}// Log 发送log
func KafkaLog(logName string, topic string, value string, key string) (err error) {if topic == "" {return errors.New("topic不能为空")}header := make([]kafka.Header, 0)header = append(header, kafka.Header{Key:   "log_type",Value: []byte(logName),})msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(value),Headers:        header,}if key != "" {msg.Key = []byte(key)}client, err := GetInstance()if err != nil {return err}// 创建通道用于接受kafka返回结果deliveryChan := make(chan kafka.Event)defer close(deliveryChan)err = client.Produce(msg, deliveryChan)if err != nil {return}e := <-deliveryChanm := e.(*kafka.Message)if m.TopicPartition.Error != nil {return m.TopicPartition.Error}return
}

2、php读取kafka上的数据并进行处理,这边通过header来区分topic下不同的日志。这个可以php放后台一直执行着,基本日志进kafka,很快就能读出来处理。然后用【sudo supervisorctl restart load_base_log_kafka】来监控脚本的执行和自动重启

<?php
include 'classes/loadLogBaseKafka.php';
include_once 'classes/config.php';
$topic_arr = ['account','roleCreate','roleEnter','roleOther','task','goods','partner','orderCreate','orderFinish','union','chat','otherLog','recoverLog'    //写入失败后重新写入的topic
];
$conf = new RdKafka\Conf();
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}
});
$conf->set('group.id', KAFKA_GROUP_ID);    //定义消费者组
$conf->set('metadata.broker.list', KAFKA_HOST);    //定义kafka链接数组
$conf->set('auto.offset.reset', 'latest');    //定义消费模式为最新
$consumer = new RdKafka\KafkaConsumer($conf);    //创建消费者
$consumer->subscribe($topic_arr);     //定义需要消费的主题
$obj = new LoadLogBaseKafka();
while (true) {$message = $consumer->consume(120*1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:load_message_to_sql($message,$obj);break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}
}
function load_message_to_sql($message,$obj){$headers = $message->headers;if(isset($headers['log_type'])){$obj->exec($headers['log_type'],$message->payload);    //数据清洗入库等业务处理}
}

四、查阅资料链接

kafka中文教程

kafka安装教程

php-rdkafka使用说明

游戏数据后台,kafka代替rsync同步日志数据相关推荐

  1. cdc工具 postgresql_SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS-阿里云开发者社区...

    背景 SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中. 开启cdc的源表在插入INSERT.更新UPDATE和删除DELETE ...

  2. elasticsearch通过logstash同步mysql数据(中文分词)

    Elasticsearch 目录 概述 索引(Index) 类型(Type) 文档(Document) 倒排索引(Inverted Index) 节点(Node) 安装 启动 DSL(查询语句) 官方 ...

  3. 基于深度学习的日志数据异常检测

    基于深度学习的日志数据异常检测 数据对象 智能运维(AIOps)是通过机器学习等算法分析来自于多种运维工具和设备的大规模数据.智能运维的分析数据对象多源运维数据包括系统运行时数据和历史记录数据,历史记 ...

  4. es对日志数据进行索引生命周期管理

    前言 在采用ELK分布式日志采集平台的时候,一般都会采用ES来存储采集的日志信息.日志信息一般都是持续增长的,是典型的时序数据. 如果不对采集的日志数据做生命周期管理,很容易导致单个索引体积持续增长. ...

  5. 数据中台交付专家告诉你,数据架构的分层怎样更加合理?

    作者:柯根 从整体上看,数据中台体系架构可分为:数据采集层.数据计算层.数据服务层三大层次.通过这三大层次对上层数据应用提供数据支撑. 数据采集层 对于企业来说,每时每刻都在产生海量的数据,数据采集作 ...

  6. Linux下Rsync+Inotify-tools实现数据实时同步

    说明: 操作系统:CentOS 5.X 源服务器:192.168.21.129 目标服务器:192.168.21.127,192.168.21.128 目的:把源服务器上/home/www.osyun ...

  7. rsync+inotify 实现数据实时同步

    Rsync (Rmote sync)是一款非常好的数据同步工具,能够通过对比同步双方的数据变动,实现增量同步,还可以通过LAN/WAN实现远程多台主机间文件的同步,还能结合crond任务计划来执行自动 ...

  8. 日志数据如何同步到MaxCompute

    摘要:日常工作中,企业需要将通过ECS.容器.移动端.开源软件.网站服务.JS等接入的实时日志数据进行应用开发.包括对日志实时查询与分析.采集与消费.数据清洗与流计算.数据仓库对接等场景.本次分享主要 ...

  9. linux云计算架构师:Rsync+sersync实现数据实时同步

    全套学习资料移步至公众号[学神来啦]更多学习资料添加扣扣资源群:661308959 本节所讲内容:   2.1  Rsync概述   2.2  Rsync服务安装   2.3  Rsync+sersy ...

最新文章

  1. 我的Pandas学习经历及动手实践
  2. 给Linux系统/网络管理员的nmap的29个实用例子
  3. python怎么学最快-怎么快速自学python
  4. 小米开源文件管理器MiCodeFileExplorer-源码研究(8)-文件排序工具类FileSortHelper
  5. 通用双向链表的设计(参考Linux系统中的实现)
  6. 按下enter键禁止页面刷新
  7. 28岁学python转行_28岁转行程序员,学Java还是Python?码农:想快点月薪过万就选它...
  8. java jqgrid json格式_jqGrid 数据之 Json
  9. unknow ’query_cache_size’报错的解决方法吗
  10. 央视牵手搜狗,AI合成主播为3·15晚会预热
  11. 二元序列游程编码c语言,基于游程编码数据压缩算法设计与实现.doc
  12. 领导人怎样带领好团队
  13. c 语言 百钱买百鸡
  14. #微信小程序# 在小程序里面退出退出小程序(navigator以及API--wx.exitMiniProgram)
  15. WEB 3D JS 使用【3Dmol.js】
  16. powerdesigner中name属性变为注释_se7en3_新浪博客
  17. 阿里巴巴CEO马云曾经注册过的N个搞笑商标 只有你想不到
  18. Android集成Facebook 事件统计
  19. 智能家居和物联网的未来:AIoT
  20. xna api大全(二)

热门文章

  1. 惯性导航工作原理及系统分类
  2. Root后安装Xposed框架卡Logo问题
  3. Spring Cloud Alibaba 实战 | 第十二篇: 微服务整合Sentinel的流控、熔断降级,赋能拥有降级功能的Feign新技能熔断,实现熔断降级双剑合璧(JMeter模拟测试)
  4. 文件(file)操作
  5. python Beautiful Soup常用过滤方法
  6. Python 用pygame 做一个游戏的开始界面(小白第一篇博客)
  7. seedlab:V** tunneling The Container Version
  8. 面部识别预处理全家桶:mtcnn人脸捕捉、人脸点对齐、仿射运算
  9. 【推荐系统】协同过滤
  10. 【分享】“小鹅通“在集简云平台集成应用的常见问题与解决方案