kafka身份认证 maxwell_聊聊MaxwellKafkaProducer
序
本文主要研究一下MaxwellKafkaProducer
MaxwellKafkaProducer
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java
public class MaxwellKafkaProducer extends AbstractProducer {
private final ArrayBlockingQueue queue;
private final MaxwellKafkaProducerWorker worker;
public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
super(context);
this.queue = new ArrayBlockingQueue<>(100);
this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue);
Thread thread = new Thread(this.worker, "maxwell-kafka-worker");
thread.setDaemon(true);
thread.start();
}
@Override
public void push(RowMap r) throws Exception {
this.queue.put(r);
}
@Override
public StoppableTask getStoppableTask() {
return this.worker;
}
@Override
public KafkaProducerDiagnostic getDiagnostic() {
return new KafkaProducerDiagnostic(worker, context.getConfig(), context.getPositionStoreThread());
}
}
MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建ArrayBlockingQueue、ArrayBlockingQueue;其push方法则往queue中put数据
MaxwellKafkaProducerWorker
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java
class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);
private final Producer kafka;
private final String topic;
private final String ddlTopic;
private final MaxwellKafkaPartitioner partitioner;
private final MaxwellKafkaPartitioner ddlPartitioner;
private final KeyFormat keyFormat;
private final boolean interpolateTopic;
private final ArrayBlockingQueue queue;
private Thread thread;
private StoppableTaskState taskState;
private String deadLetterTopic;
private final ConcurrentLinkedQueue, KafkaCallback>> deadLetterQueue;
public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) {
if ( partitionKey.equals("table") ) {
return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database");
} else {
return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null);
}
}
public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue queue,
Producer producer)
{
super(context);
if ( kafkaTopic == null ) {
this.topic = "maxwell";
} else {
this.topic = kafkaTopic;
}
this.interpolateTopic = this.topic.contains("%{");
this.kafka = producer;
String hash = context.getConfig().kafkaPartitionHash;
String partitionKey = context.getConfig().producerPartitionKey;
String partitionColumns = context.getConfig().producerPartitionColumns;
String partitionFallback = context.getConfig().producerPartitionFallback;
this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback);
this.ddlPartitioner = makeDDLPartitioner(hash, partitionKey);
this.ddlTopic = context.getConfig().ddlKafkaTopic;
this.deadLetterTopic = context.getConfig().deadLetterTopic;
this.deadLetterQueue = new ConcurrentLinkedQueue<>();
if ( context.getConfig().kafkaKeyFormat.equals("hash") )
keyFormat = KeyFormat.HASH;
else
keyFormat = KeyFormat.ARRAY;
this.queue = queue;
this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");
}
public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic,
ArrayBlockingQueue queue)
{
this(context, kafkaTopic, queue,
new KafkaProducer(kafkaProperties, new StringSerializer(), new StringSerializer()));
}
@Override
public void run() {
this.thread = Thread.currentThread();
while ( true ) {
try {
drainDeadLetterQueue();
RowMap row = queue.take();
if (!taskState.isRunning()) {
taskState.stopped();
return;
}
this.push(row);
} catch ( Exception e ) {
taskState.stopped();
context.terminate(e);
return;
}
}
}
void drainDeadLetterQueue() {
Pair, KafkaCallback> pair;
while ((pair = deadLetterQueue.poll()) != null) {
sendAsync(pair.getLeft(), pair.getRight());
}
}
//......
}
MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送
AbstractAsyncProducer
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java
public abstract class AbstractAsyncProducer extends AbstractProducer {
public class CallbackCompleter {
private InflightMessageList inflightMessages;
private final MaxwellContext context;
private final MaxwellConfig config;
private final Position position;
private final boolean isTXCommit;
private final long messageID;
public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {
this.inflightMessages = inflightMessages;
this.context = context;
this.config = context.getConfig();
this.position = position;
this.isTXCommit = isTXCommit;
this.messageID = messageID;
}
public void markCompleted() {
inflightMessages.freeSlot(messageID);
if(isTXCommit) {
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);
if (message != null) {
context.setPosition(message.position);
long currentTime = System.currentTimeMillis();
long age = currentTime - message.sendTimeMS;
messagePublishTimer.update(age, TimeUnit.MILLISECONDS);
messageLatencyTimer.update(Math.max(0L, currentTime - message.eventTimeMS - 500L), TimeUnit.MILLISECONDS);
if (age > config.metricsAgeSlo) {
messageLatencySloViolationCount.inc();
}
}
}
}
}
private InflightMessageList inflightMessages;
public AbstractAsyncProducer(MaxwellContext context) {
super(context);
this.inflightMessages = new InflightMessageList(context);
Metrics metrics = context.getMetrics();
String gaugeName = metrics.metricName("inflightmessages", "count");
metrics.register(gaugeName, (Gauge) () -> (long) inflightMessages.size());
}
public abstract void sendAsync(RowMap r, CallbackCompleter cc) throws Exception;
@Override
public final void push(RowMap r) throws Exception {
Position position = r.getNextPosition();
// Rows that do not get sent to a target will be automatically marked as complete.
// We will attempt to commit a checkpoint up to the current row.
if(!r.shouldOutput(outputConfig)) {
if ( position != null ) {
inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);
InflightMessageList.InflightMessage completed = inflightMessages.completeMessage(position);
if (completed != null) {
context.setPosition(completed.position);
}
}
return;
}
// back-pressure from slow producers
long messageID = inflightMessages.waitForSlot();
if(r.isTXCommit()) {
inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
}
CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);
sendAsync(r, cc);
}
}
AbstractAsyncProducer继承了AbstractProducer,其push方法主要执行inflightMessages.addMessage及sendAsync
小结
MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建ArrayBlockingQueue、ArrayBlockingQueue;其push方法则往queue中put数据;MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送
doc
作者:codecraft
kafka身份认证 maxwell_聊聊MaxwellKafkaProducer相关推荐
- kafka身份认证 maxwell_Kafka 使用SASL / SCRAM进行身份验证
使用SASL / SCRAM进行身份验证 请先在不配置任何身份验证的情况下启动Kafka 1. 创建SCRAM Credentials 1.1 创建broker通信用户(或称超级用户) bash Em ...
- Window下kafka 单机SASL_SCRAM加密及身份认证
KAFKA加密认证机制中的SASL主要包括SASL_PLAINTEXT SASL_GSSAPI SASL_SCRAM.这里主要记录一下Windows下搭建配置单机sasl_scram环境. 一.前情提 ...
- 深入聊聊微服务架构的身份认证问题
从单体应用架构到分布式应用架构再到微服务架构,应用的安全访问在不断的经受考验.为了适应架构的变化.需求的变化,身份认证与鉴权方案也在不断的变革.面对数十个甚至上百个微服务之间的调用,如何保证高效安全的 ...
- 芯盾时代:致力于身份认证安全的领军者
本文讲的是芯盾时代:致力于身份认证安全的领军者 信息安全技术的关注点,一直在随着互联网的发展而变化. 互联网兴起时,作为流量聚集地的门户网站,主要面向大众提供信息,缺乏个性化色彩,防火墙.IDS/IP ...
- Python+OpenCv实现AI人脸识别身份认证系统(2)——人脸数据采集、存储
原 Python+OpenCv实现AI人脸识别身份认证系统(2)--人脸数据采集.存储 2019年07月02日 08:47:52 不脱发的程序猿 阅读数 602更多 所属专栏: 人脸识别身份认证系统设 ...
- Shiro(三) 身份认证源码分析与 MD5 盐值加密
文章目录 1. 身份认证 2. 身份验证的基本流程 3. 身份验证实现 3.1 在 `login.jsp` 添加登录表单 3.2 添加表单提交的 Controller 3.3 完善 Realm 的身份 ...
- 开启kafka密码认证
Kafka默认未开启密码认证,可以免密登录,太不安全,因此需要开启密码认证. 一 kafka认证方式类型 kafka提供了多种安全认证机制,主要分为SSL和SASL大类.其中SASL/PLAIN是基于 ...
- 大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)
文章目录 一.概述 1)SASL认证概述 2)Delegation Token认证概述 3)SSL认证概述(本章实现) 二.各种安全认证机制对比和使用场景 三.Kafka SSL认证实现 1)创建ss ...
- Shiro01 功能点框图、架构图、身份认证逻辑、身份认证代码实现
基本功能点 功能点框图 功能点说明 1.Authentication:身份认证/登录,验证用户是不是拥有相应的身份: 2.Authorization:授权,即权限验证,验证某个已认证的用户是否拥有某个 ...
最新文章
- Python标准库:内置函数tuple([iterable])
- 上海python培训比较好的机构-上海Python培训机构推荐
- Java开发前景好,3大从业方向供你选择
- 如何比较 Java 的字符串
- 1114. 按序打印
- LeetCode 979. 在二叉树中分配硬币(DFS)
- iOS7应用开发2、关于新版的IDE:XCode 5
- .net发送带附件邮件
- TensorFlow神经网络(九)VGG net论文阅读笔记
- html表格基础及案例示图代码。
- Visio 2013—安装步骤说明
- python中一元二次方程的判别式_【Python算法作业】解一元二次方程
- 最简单的单片机c语言程序,单片机的C语言编程基础知识(初学注意)
- 鸿蒙哦叟,苕木匠时评:说鸿蒙“
- linux双系统无u盘安装教程视频教程,U盘安装Windows和Ubuntu 15.04双系统图解教程
- 21秋信源编码技术作业(1)——使用Audacity软件绘制清浊音频谱图并进行分析
- 手把手教你撸一个Web汇率计算器
- 运动会分数统计系统(数据结构)C++
- 堆栈与动态分配内存空间
- HTML canvas系列-画圆(4)
热门文章
- 基于vivado的fir ip核的重采样设计与实现
- 修改can接口波特率_CAN总线分析仪使用
- android切换字体颜色,Android开发实现按钮点击切换背景并修改文字颜色的方法
- 推荐陈永真著作《高效率开关电源设计与制作》
- python从list列表中选出一个数和其对应的坐标
- numpy.loadtxt画功率谱图
- django写项目的详细步骤
- Beta--冲刺阶段合集
- 《数据分析实战 基于EXCEL和SPSS系列工具的实践》一3.4 数据量太大了怎么办
- OSX Yosemite,pod install报错RPC failed; result=52,