This article takes a look at MaxwellKafkaProducer.

MaxwellKafkaProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

```java
public class MaxwellKafkaProducer extends AbstractProducer {
	private final ArrayBlockingQueue<RowMap> queue;
	private final MaxwellKafkaProducerWorker worker;

	public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
		super(context);
		this.queue = new ArrayBlockingQueue<>(100);
		this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue);
		Thread thread = new Thread(this.worker, "maxwell-kafka-worker");
		thread.setDaemon(true);
		thread.start();
	}

	@Override
	public void push(RowMap r) throws Exception {
		this.queue.put(r);
	}

	@Override
	public StoppableTask getStoppableTask() {
		return this.worker;
	}

	@Override
	public KafkaProducerDiagnostic getDiagnostic() {
		return new KafkaProducerDiagnostic(worker, context.getConfig(), context.getPositionStoreThread());
	}
}
```

MaxwellKafkaProducer extends AbstractProducer. Its constructor creates an ArrayBlockingQueue (capacity 100) and a MaxwellKafkaProducerWorker, then starts the worker on a daemon thread; its push method puts rows into the queue.
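The hand-off pattern above can be reduced to a small sketch. The class and field names below are hypothetical stand-ins, not Maxwell's actual types: push() only enqueues onto a bounded queue, and a daemon worker thread drains it, so a full queue back-pressures the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal sketch (hypothetical names) of the hand-off used by MaxwellKafkaProducer:
// push() only enqueues; a daemon worker thread takes rows and processes them.
public class QueueHandOffSketch {
	private final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
	final CountDownLatch consumed = new CountDownLatch(1);

	public QueueHandOffSketch() {
		Thread worker = new Thread(() -> {
			try {
				String row = queue.take();   // blocks until push() supplies a row
				System.out.println("worker got: " + row);
				consumed.countDown();
			} catch (InterruptedException ignored) {}
		}, "sketch-worker");
		worker.setDaemon(true);              // daemon: does not keep the JVM alive
		worker.start();
	}

	// Like MaxwellKafkaProducer.push: put() blocks once the queue holds 100 rows,
	// which back-pressures the producer of rows.
	public void push(String row) throws InterruptedException {
		queue.put(row);
	}

	public static void main(String[] args) throws Exception {
		QueueHandOffSketch p = new QueueHandOffSketch();
		p.push("row-1");
		p.consumed.await(5, TimeUnit.SECONDS);
	}
}
```

The bounded capacity is the point of the design: a slow Kafka broker eventually fills the queue and stalls push(), rather than letting rows pile up in memory.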

MaxwellKafkaProducerWorker

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

```java
class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
	static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);

	private final Producer<String, String> kafka;
	private final String topic;
	private final String ddlTopic;
	private final MaxwellKafkaPartitioner partitioner;
	private final MaxwellKafkaPartitioner ddlPartitioner;
	private final KeyFormat keyFormat;
	private final boolean interpolateTopic;
	private final ArrayBlockingQueue<RowMap> queue;
	private Thread thread;
	private StoppableTaskState taskState;
	private String deadLetterTopic;
	private final ConcurrentLinkedQueue<Pair<ProducerRecord<String, String>, KafkaCallback>> deadLetterQueue;

	public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) {
		if ( partitionKey.equals("table") ) {
			return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database");
		} else {
			return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null);
		}
	}

	public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue<RowMap> queue,
									Producer<String, String> producer)
	{
		super(context);
		if ( kafkaTopic == null ) {
			this.topic = "maxwell";
		} else {
			this.topic = kafkaTopic;
		}
		this.interpolateTopic = this.topic.contains("%{");
		this.kafka = producer;
		String hash = context.getConfig().kafkaPartitionHash;
		String partitionKey = context.getConfig().producerPartitionKey;
		String partitionColumns = context.getConfig().producerPartitionColumns;
		String partitionFallback = context.getConfig().producerPartitionFallback;
		this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback);
		this.ddlPartitioner = makeDDLPartitioner(hash, partitionKey);
		this.ddlTopic = context.getConfig().ddlKafkaTopic;
		this.deadLetterTopic = context.getConfig().deadLetterTopic;
		this.deadLetterQueue = new ConcurrentLinkedQueue<>();
		if ( context.getConfig().kafkaKeyFormat.equals("hash") )
			keyFormat = KeyFormat.HASH;
		else
			keyFormat = KeyFormat.ARRAY;
		this.queue = queue;
		this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");
	}

	public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic,
									ArrayBlockingQueue<RowMap> queue)
	{
		this(context, kafkaTopic, queue,
			new KafkaProducer<String, String>(kafkaProperties, new StringSerializer(), new StringSerializer()));
	}

	@Override
	public void run() {
		this.thread = Thread.currentThread();
		while ( true ) {
			try {
				drainDeadLetterQueue();
				RowMap row = queue.take();
				if (!taskState.isRunning()) {
					taskState.stopped();
					return;
				}
				this.push(row);
			} catch ( Exception e ) {
				taskState.stopped();
				context.terminate(e);
				return;
			}
		}
	}

	void drainDeadLetterQueue() {
		Pair<ProducerRecord<String, String>, KafkaCallback> pair;
		while ((pair = deadLetterQueue.poll()) != null) {
			sendAsync(pair.getLeft(), pair.getRight());
		}
	}

	//......
}
```

MaxwellKafkaProducerWorker extends AbstractAsyncProducer and implements the Runnable and StoppableTask interfaces. Its run method loops forever, executing drainDeadLetterQueue, queue.take(), and this.push(row); the drainDeadLetterQueue method polls (record, callback) pairs off deadLetterQueue and re-sends them via sendAsync.
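The drain step can be sketched on its own. Everything below uses hypothetical stand-in types (a Runnable instead of KafkaCallback, a String instead of ProducerRecord); the point is the pattern: callbacks on other threads enqueue failed pairs into a lock-free ConcurrentLinkedQueue, and the worker re-sends them on its own thread before taking the next row, so all sends stay single-threaded.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch (hypothetical types) of the drainDeadLetterQueue pattern.
public class DeadLetterDrainSketch {
	// Kafka callbacks (other threads) enqueue failed (record, callback) pairs here.
	final ConcurrentLinkedQueue<Entry<String, Runnable>> deadLetterQueue =
			new ConcurrentLinkedQueue<>();
	final List<String> resent = new ArrayList<>();

	// Stand-in for sendAsync(record, callback) -> kafka.send(...)
	void sendAsync(String record, Runnable callback) {
		resent.add(record);
	}

	// Mirrors MaxwellKafkaProducerWorker.drainDeadLetterQueue: poll() returns
	// null when empty, so the loop drains whatever has accumulated and exits.
	void drainDeadLetterQueue() {
		Entry<String, Runnable> pair;
		while ((pair = deadLetterQueue.poll()) != null) {
			sendAsync(pair.getKey(), pair.getValue());
		}
	}

	public static void main(String[] args) {
		DeadLetterDrainSketch w = new DeadLetterDrainSketch();
		w.deadLetterQueue.add(new SimpleEntry<>("failed-record", (Runnable) () -> {}));
		w.drainDeadLetterQueue();
		System.out.println(w.resent); // [failed-record]
	}
}
```

Draining before queue.take() means retries never starve: each new row is preceded by a sweep of anything that failed since the last iteration.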

AbstractAsyncProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java

```java
public abstract class AbstractAsyncProducer extends AbstractProducer {
	public class CallbackCompleter {
		private InflightMessageList inflightMessages;
		private final MaxwellContext context;
		private final MaxwellConfig config;
		private final Position position;
		private final boolean isTXCommit;
		private final long messageID;

		public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {
			this.inflightMessages = inflightMessages;
			this.context = context;
			this.config = context.getConfig();
			this.position = position;
			this.isTXCommit = isTXCommit;
			this.messageID = messageID;
		}

		public void markCompleted() {
			inflightMessages.freeSlot(messageID);
			if(isTXCommit) {
				InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

				if (message != null) {
					context.setPosition(message.position);

					long currentTime = System.currentTimeMillis();
					long age = currentTime - message.sendTimeMS;
					messagePublishTimer.update(age, TimeUnit.MILLISECONDS);
					messageLatencyTimer.update(Math.max(0L, currentTime - message.eventTimeMS - 500L), TimeUnit.MILLISECONDS);

					if (age > config.metricsAgeSlo) {
						messageLatencySloViolationCount.inc();
					}
				}
			}
		}
	}

	private InflightMessageList inflightMessages;

	public AbstractAsyncProducer(MaxwellContext context) {
		super(context);

		this.inflightMessages = new InflightMessageList(context);

		Metrics metrics = context.getMetrics();
		String gaugeName = metrics.metricName("inflightmessages", "count");
		metrics.register(gaugeName, (Gauge<Long>) () -> (long) inflightMessages.size());
	}

	public abstract void sendAsync(RowMap r, CallbackCompleter cc) throws Exception;

	@Override
	public final void push(RowMap r) throws Exception {
		Position position = r.getNextPosition();
		// Rows that do not get sent to a target will be automatically marked as complete.
		// We will attempt to commit a checkpoint up to the current row.
		if(!r.shouldOutput(outputConfig)) {
			if ( position != null ) {
				inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);
				InflightMessageList.InflightMessage completed = inflightMessages.completeMessage(position);

				if (completed != null) {
					context.setPosition(completed.position);
				}
			}
			return;
		}

		// back-pressure from slow producers
		long messageID = inflightMessages.waitForSlot();

		if(r.isTXCommit()) {
			inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
		}
		CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);

		sendAsync(r, cc);
	}
}
```

AbstractAsyncProducer extends AbstractProducer. Its push method mainly executes inflightMessages.addMessage and sendAsync: rows rejected by shouldOutput are immediately marked complete, inflightMessages.waitForSlot() applies back-pressure to slow producers, and the CallbackCompleter advances the stored position once a transaction-commit row is acknowledged.
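The checkpointing idea behind InflightMessageList can be illustrated with a simplified sketch (hypothetical class, long positions instead of Position objects): acknowledgements may arrive out of order, but the checkpoint only advances past a contiguous completed prefix, so a crash never skips a row whose send was still unacknowledged.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch (hypothetical) of contiguous-prefix checkpointing,
// the idea behind InflightMessageList / completeMessage.
public class InflightSketch {
	// Insertion order = send order; value = completed?
	private final LinkedHashMap<Long, Boolean> inflight = new LinkedHashMap<>();
	long checkpoint = -1;

	void addMessage(long position) {
		inflight.put(position, false);
	}

	// Mark one message complete, then advance the checkpoint while the
	// head of the list (the oldest in-flight message) is also complete.
	long completeMessage(long position) {
		inflight.put(position, true);
		Iterator<Map.Entry<Long, Boolean>> it = inflight.entrySet().iterator();
		while (it.hasNext()) {
			Map.Entry<Long, Boolean> head = it.next();
			if (!head.getValue())
				break;                 // oldest message still unacked: stop here
			checkpoint = head.getKey();
			it.remove();
		}
		return checkpoint;
	}

	public static void main(String[] args) {
		InflightSketch s = new InflightSketch();
		s.addMessage(1); s.addMessage(2); s.addMessage(3);
		System.out.println(s.completeMessage(2)); // -1: position 1 still in flight
		System.out.println(s.completeMessage(1)); // 2: prefix {1, 2} now complete
	}
}
```

This is why CallbackCompleter only calls context.setPosition when completeMessage returns non-null: an out-of-order acknowledgement alone is not safe to checkpoint.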

Summary

MaxwellKafkaProducer extends AbstractProducer; its constructor creates an ArrayBlockingQueue and a MaxwellKafkaProducerWorker, and its push method puts rows into the queue. MaxwellKafkaProducerWorker extends AbstractAsyncProducer and implements the Runnable and StoppableTask interfaces; its run method loops forever over drainDeadLetterQueue, queue.take(), and this.push(row), while drainDeadLetterQueue polls pairs off deadLetterQueue and re-sends them via sendAsync.

doc

Author: codecraft
