java kafka分布式_JavaWeb项目架构之Kafka分布式日志队列
架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了。
kafka介绍
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
特性
Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
支持通过Kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
主要功能
发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因
以容错的方式记录消息流,kafka以文件的方式来存储消息流
可以再消息发布的时候进行处理
使用场景
在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能
构建实时的流数据处理程序来变换或处理数据流,数据处理功能
消息传输流程
相关术语介绍
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
Kafka安装
环境
Linux、JDK、Zookeeper
下载二进制程序
wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
安装
tar -zxvf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1
目录说明
bin 启动,停止等命令
config 配置文件
libs 类库
参数说明
#########################参数解释##############################
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.170 #这个参数默认是关闭的
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #设置zookeeper的连接端口、如果非集群配置一个地址即可
#########################参数解释##############################
启动kafka
启动kafka之前要启动相应的zookeeper集群、自行安装,这里不做说明。
#进入到kafka的bin目录
./kafka-server-start.sh -daemon ../config/server.properties
Kafka集成
环境
spring-boot、elasticsearch、kafka
pom.xml引入:
org.springframework.kafka
spring-kafka
1.1.1.RELEASE
生产者
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* 生产者
* 创建者 科帮网
* 创建时间2018年2月4日
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
public Map producerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
消费者
mport java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
/**
* 消费者
* 创建者 科帮网
* 创建时间2018年2月4日
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map consumerConfigs() {
Map propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
日志监听
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.itstyle.es.common.utils.JsonMapper;
import com.itstyle.es.log.entity.SysLogs;
import com.itstyle.es.log.repository.ElasticLogRepository;
/**
* 扫描监听
* 创建者 科帮网
* 创建时间2018年2月4日
*/
@Component
public class Listener {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ElasticLogRepository elasticLogRepository;
@KafkaListener(topics = {"itstyle"})
public void listen(ConsumerRecord, ?> record) {
logger.info("kafka的key: " + record.key());
logger.info("kafka的value: " + record.value());
if(record.key().equals("itstyle_log")){
try {
SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class);
logger.info("kafka保存日志: " + log.getUsername());
elasticLogRepository.save(log);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
测试日志传输
/**
* kafka 日志队列测试接口
*/
@GetMapping(value="kafkaLog")
public @ResponseBody String kafkaLog() {
SysLogs log = new SysLogs();
log.setUsername("红薯");
log.setOperation("开源中国社区");
log.setMethod("com.itstyle.es.log.controller.kafkaLog()");
log.setIp("192.168.1.80");
log.setGmtCreate(new Timestamp(new Date().getTime()));
log.setExceptionDetail("开源中国社区");
log.setParams("{'name':'码云','type':'开源'}");
log.setDeviceType((short)1);
log.setPlatFrom((short)1);
log.setLogType((short)1);
log.setDeviceType((short)1);
log.setId((long)200000);
log.setUserId((long)1);
log.setTime((long)1);
//模拟日志队列实现
String json = JsonMapper.toJsonString(log);
kafkaTemplate.send("itstyle", "itstyle_log",json);
return "success";
}
Kafka与Redis
之前简单的介绍过,JavaWeb项目架构之Redis分布式日志队列,有小伙伴们聊到, Redis PUB/SUB没有任何可靠性保障,也不会持久化。当然了,原项目中仅仅是记录日志,并不是十分重要的信息,可以有一定程度上的丢失
Kafka与Redis PUB/SUB之间最大的区别在于Kafka是一个完整的分布式发布订阅消息系统,而Redis PUB/SUB只是一个组件而已。
使用场景
Redis PUB/SUB
消息持久性需求不高、吞吐量要求不高、可以忍受数据丢失
Kafka
高可用、高吞吐、持久性、多样化的消费处理模型
java kafka分布式_JavaWeb项目架构之Kafka分布式日志队列相关推荐
- java文件服务器_JavaWeb项目架构之NFS文件服务器
NFS简介 NFS(Network File System)即网络文件系统. 主要功能:通过网络(局域网)让不同的主机系统之间可以共享文件或目录. 主要用途:NFS网络文件系统一般被用来存储共享视频, ...
- java seo优化_JavaWeb 项目如果从技术选型的角度来做 Seo 优化
很久以前就开始想这个问题了,一直不知道 Seo 这块怎么做,感觉针对 Java 项目 Seo 优化这块网上资料挺匮乏的,所以就厚着脸皮来问各位前辈. 问题 在下想知道开发一个 javaweb 项目如何 ...
- java web插件_javaweb项目插件实现机制
如题,java开发web程序想实现插件机制有什么办法? 就比如:一个论坛,里面有签到,积分,第三方登录,编辑器选择等等的功能,现在我想把他们都抽出来,当成插件,论坛核心只保留用户的登录,注册,发帖,回 ...
- java成绩查询_JavaWeb项目第三次总结_成绩查询的实现
查询图书的功能实现 如何知道浏览器往服务器传入的参数 1.在编写好查询页面后,使用火狐浏览器的friebug (全部->POST->参数) 2.编写GradeListServlet,重写d ...
- java 插件原理_javaweb项目插件实现机制
如题,java开发web程序想实现插件机制有什么办法? 就比如:一个论坛,里面有签到,积分,第三方登录,编辑器选择等等的功能,现在我想把他们都抽出来,当成插件,论坛核心只保留用户的登录,注册,发帖,回 ...
- 分布式文件系统FastDFS架构辨析,分布式文件系统FastDFS_V4.06安装部署
FastDFS是一款类Google FS的开源分布式文件系统,它用纯C语言实现,支持Linux.FreeBSD.AIX等UNIX系统.它只能通过专有API对文件进行存取访问,不支持POSIX接口方式, ...
- java log 断点_项目中常见的log日志调用
第一种用法:引用org.apache.commons.logging.Log. import org.apache.commons.logging.Log; import org.apache.com ...
- 大数据开发hadoop核心的分布式消息系统:Apache Kafka 你知道吗
简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...
- 大规模分布式与并行数据库架构
大规模分布式与并行数据库架构 注意区分分布式和并行数据库之间的差别,不要混淆.分布式和并行同时出现时,两者特点容易使人犯迷糊. 分布式 最基本特征:本地自治,非集中式管理;分布透明性的组成:位置,数据 ...
最新文章
- 超越时代的天才——图灵
- Seaborn在图像内自定义图例(legend)位置实战
- Activity中与ListActivity中使用listview区别
- ARM汇编:乘法指令集
- 1-36随机生成6个不重复的数
- 在C语言中是怎么存储的,在C语言中,串的存储方式是()。
- 有小数点是什么类型_为什么0.1+0.2不等于0.3?原来编程语言是这么算的……
- NetBeans 7.2 beta:更快,更有用
- linux的yum命令无法使用在哪里下载_Linux 知识分享:为Linux的cp和mv命令添加进度条...
- HTTP 错误 403.6 - Forbidden 解决方案
- 这届互联网公司月饼:阿里卡哇伊,百度酷炫风,京东乾隆审美……
- SQLServer如何取得随机获取的数据库记录
- cPanel虚拟主机上运行Python的方法
- makefile初步制作,arm-linux- (gcc/ld/objcopy/objdump)详解
- AI连围棋都可以大胜,何况游戏
- 在Qt工程中调用GmSSL
- 黑眼圈订单系统_大熊猫黑眼圈订单后台
- 导线测量步骤c语言程序,基于excel表的附合导线计算程序.doc
- python tkinter treeview制作_python-3.x – Tkinter Treeview标题样式
- Shell修改命令提示符
热门文章
- li标签横向排列_lt;bdigt; | HTML5 双向隔离标签
- 玩转 SpringBoot 2.x 之 快速集成 Jedis客户端(普通版)
- Cron 触发器及相关内容 (第三部分)
- Android动画定义知识小结
- 基于JAVA+SpringMVC+Mybatis+MYSQL的学生信息与选课系统
- linux终端安装mingw编译器_C/C++编译器MinGW的安装与配置
- [******] 链表问题:将单向链表按某值划分成左边小、中间相等、右边大的形式...
- ASP.NET Core学习——7
- [转]MySQL索引背后的数据结构及算法原理
- [转] Linux C语言 段错误bug的调试