1. Objective - Kafka Clients

In this Kafka Clients article, we will learn how to create Apache Kafka clients using the Kafka API. There are several ways to build Kafka clients, matching different message-processing needs: at most once, at least once, and exactly once. So, in this Kafka Clients tutorial, we will cover all three approaches in detail. Moreover, we will see how to use an Avro client.

So, let's start the Kafka Clients tutorial.

How to Create Kafka Clients: Avro Producer and Consumer Client

2. What are Kafka Clients?

  • Prerequisites for Creating Kafka Clients
  1. First, to create Kafka clients, we must set up the Apache Kafka middleware on our local machine.
  2. Also, before starting to create Kafka clients, a locally installed single-node Kafka instance must be running on our local machine, with a running ZooKeeper and a running Kafka node (see the start commands below).
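
If ZooKeeper and the Kafka broker are not running yet, a single-node setup can typically be started with the scripts shipped in the Kafka distribution (the paths below assume the default download layout; adjust them to your installation):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties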

Moreover, create a topic named normal-topic with two partitions for the Kafka clients, using the command:

bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1


Further, execute the following command to check the status of the created topic:

bin/kafka-topics --list --topic normal-topic --zookeeper localhost:2181


Also, if the topic needs to be altered to increase the number of partitions, execute the following command:

bin/kafka-topics.sh --alter --topic normal-topic --zookeeper localhost:2181 --partitions 2

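To verify the change, describe the topic afterwards (assuming the same ZooKeeper-based CLI used above):

bin/kafka-topics --describe --topic normal-topic --zookeeper localhost:2181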

3. Kafka Producer Client

Here is the code to implement a Kafka producer client. It sends text messages, and the loop can be adjusted to control how many messages are sent:

import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, String> producer = createProducer();
        sendMessages(producer);
        // Allow the producer to complete sending of the messages before program exit.
        Thread.sleep(20);
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        // Controls how many bytes the sender waits to batch up before publishing to Kafka.
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    private static void sendMessages(Producer<String, String> producer) {
        String topic = "normal-topic";
        int partition = 0;
        long record = 1;
        for (int i = 1; i <= 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, partition,
                    Long.toString(record), Long.toString(record++)));
        }
    }
}

4. Ways a Consumer Can Register with Kafka

First, let's learn the ways in which a Kafka consumer client can register with a Kafka broker. Specifically, there are two ways: using the subscribe method call or using the assign method call. Let's learn both of these Kafka client methods in detail.

a. Using the Subscribe Method Call

When the subscribe method call is used, Kafka automatically rebalances the available consumers whenever topics or partitions are added or removed, or whenever a consumer is added or removed.

b. Using the Assign Method Call

However, when a consumer registers using the assign method call, the Kafka client does not offer automatic rebalancing of the consumers.
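
To make the difference concrete, here is a minimal sketch of both registration styles against a KafkaConsumer (the topic name and partition number are placeholders, not taken from the original article):

// Dynamic registration: Kafka assigns partitions and rebalances them automatically.
consumer.subscribe(Arrays.asList("normal-topic"));
// Static registration: the application picks the partitions itself; no automatic rebalancing.
consumer.assign(Arrays.asList(new TopicPartition("normal-topic", 0)));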
Either of the above registration options can be used by at-most-once, at-least-once, or exactly-once consumers.
i. At-Most-Once Kafka Consumer (Zero or More Deliveries)
Basically, this is the default behavior of a Kafka consumer.
To configure this type of consumer in the Kafka client, follow these steps:

  • First, set 'enable.auto.commit' to true.
  • Also, set 'auto.commit.interval.ms' to a lower time range.
  • Make sure not to call consumer.commitSync() from the consumer. With this configuration, Kafka will auto-commit the offsets at the specified interval.

However, a consumer configured this way can exhibit either at-most-once or at-least-once behavior. Still, let's declare this consumer as at-most-once, because at-most-once is the lower messaging guarantee. Let's discuss both consumer behaviors in detail:

  • At-most-once scenario

This scenario happens the moment the commit interval elapses and triggers Kafka to automatically commit the last used offset. Now suppose the consumer crashes in between processing the messages. When the consumer restarts, it starts receiving messages from the last committed offset; meanwhile, the consumer can lose a few messages.

  • At-least-once scenario

This scenario happens when a consumer processes a message and commits it into its persistent store, but then crashes at that moment. Suppose Kafka has not had a chance to commit the offset to the broker because the commit interval has not elapsed. Then, when the consumer restarts, it receives some older messages again from the last committed offset.
Kafka consumer code:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtMostOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in the topic. 'assign' could be used here
        // instead of 'subscribe' to subscribe to a specific partition.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Auto commit interval; Kafka commits the offset at this interval.
        props.put("auto.commit.interval.ms", "101");
        // This is how to control the amount of data being read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        // Set this if you want to always read from the beginning.
        // props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the message.
        Thread.sleep(20);
    }
}

ii. At-Least-Once Kafka Consumer (One or More Message Deliveries, Duplicates Possible)
To configure this type of consumer, follow these steps:

  • First, set 'enable.auto.commit' to false, or
  • Alternatively, set 'enable.auto.commit' to true and set 'auto.commit.interval.ms' to a higher number.

The consumer should now take control of committing the message offsets to Kafka by calling consumer.commitSync().
Moreover, to avoid reprocessing duplicate messages, implement 'idempotent' behavior within the consumer, especially for this type of consumer, because duplicate message delivery is possible in the scenario described above; a minimal idempotence sketch follows.
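
As an illustration only (this helper is not part of the original article, and the ProcessedOffsetStore name is hypothetical), one simple way to make processing idempotent is to remember the highest offset already applied per partition and skip anything at or below it:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: skips records whose offsets were already applied, so messages that are
// redelivered after a crash are not processed twice. A real implementation would persist this map
// atomically together with the processing result instead of keeping it only in memory.
public class ProcessedOffsetStore {

    private final Map<TopicPartition, Long> lastApplied = new HashMap<>();

    public boolean alreadyProcessed(ConsumerRecord<String, String> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        Long last = lastApplied.get(tp);
        return last != null && record.offset() <= last;
    }

    public void markProcessed(ConsumerRecord<String, String> record) {
        lastApplied.put(new TopicPartition(record.topic(), record.partition()), record.offset());
    }
}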
Code:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AutoOffsetGuaranteedAtLeastOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in the topic. 'assign' could be used here
        // instead of 'subscribe' to subscribe to a specific partition.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property, if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Make the auto commit interval a big number (within the int range this setting accepts)
        // so that auto commit effectively does not happen; we control the offset commit via
        // consumer.commitSync() after processing the message.
        props.put("auto.commit.interval.ms", "999999999");
        // This is how to control the amount of data being read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
            // The call below is important to control the offset commit. Make this call after you
            // finish processing the business logic.
            consumer.commitSync();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the record.
        Thread.sleep(20);
    }
}

iii. Exactly-Once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)
Here, the consumer registers with Kafka via the 'subscribe' (1, a) registration method call.
Make sure the offsets are managed manually in this case. To set up an exactly-once scenario in the Kafka client, follow these steps:

  • First, set enable.auto.commit = false.
  • Also, do not call consumer.commitSync() after processing the message.
  • Moreover, register the consumer to the topics by making a 'subscribe' call.
  • To start reading from a specific offset of that topic/partition, implement a ConsumerRebalanceListener. Also, perform consumer.seek(topicPartition, offset) within the listener.
  • As a safety net, implement idempotence.

Code:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ExactlyOnceDynamicConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Manually control the offsets, but register the consumer to the topic to get dynamically
        // assigned partitions. Inside MyConsumerRebalancerListener, use
        // consumer.seek(topicPartition, offset) to control from which offset messages are read.
        consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        // To turn off auto commit, the setting below is the key one.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data returned on each poll; make sure this value is bigger than
        // the maximum single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                // Save the processed offset in external storage.
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * The partition offsets are stored in an external storage, in this case in a local file system
 * where the program runs.
 */
public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}

iv. Exactly-Once Kafka Static Consumer via Assign (One and Only One Message Delivery)
Here, the consumer registers with Kafka via the 'assign' (2) registration method call.
Make sure the offsets are managed manually in this case. To set up the exactly-once Kafka static consumer via assign, follow these steps:

  • First, set enable.auto.commit = false.
  • Remember, after processing the message, do not call consumer.commitSync().
  • Moreover, register the consumer to the specific partition by making an 'assign' call.
  • On startup of the consumer, seek to the specific message offset by calling consumer.seek(topicPartition, offset).
  • Also, as a safety net, implement idempotence.

Code:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage1");

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ExactlyOnceStaticConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException, IOException {
        KafkaConsumer<String, String> consumer = createConsumer();
        String topic = "normal-topic";
        int partition = 1;
        TopicPartition topicPartition = registerConsumerToSpecificPartition(consumer, topic, partition);
        // Read the offset for the topic and partition from external storage.
        long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
        // Use seek to go to the exact offset for that topic and partition.
        consumer.seek(topicPartition, offset);
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg2";
        props.put("group.id", consumeGroup);
        // To turn off auto commit, the setting below is the key one.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data returned on each poll; make sure this value is bigger than
        // the maximum single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Manually listens to a specific topic partition. For an example of how to dynamically listen
     * to partitions while still controlling the offsets manually, see ExactlyOnceDynamicConsumer.java.
     */
    private static TopicPartition registerConsumerToSpecificPartition(KafkaConsumer<String, String> consumer, String topic, int partition) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Arrays.asList(topicPartition);
        consumer.assign(partitions);
        return topicPartition;
    }

    /**
     * Processes data and stores the offset in an external store. Best practice is to do these
     * operations atomically.
     */
    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

5. Avro Producer and Consumer

Avro, by definition, is an open-source binary message exchange protocol. Basically, we use it to send optimized messages over the wire, which also reduces network overhead. Moreover, Avro can enforce a schema for messages, and schemas can be defined using JSON. From these schemas, Avro can generate binding objects in various programming languages. Using Avro with Kafka is natively supported and highly recommended.
Below is a simple Avro consumer and producer.

import java.util.Arrays;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AvroConsumerExample {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AutoOffsetAvroConsumerExample ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, byte[]> consumer = createConsumer();
        // Assign to a specific topic and partition.
        consumer.assign(Arrays.asList(new TopicPartition("avro-topic", 0)));
        processRecords(consumer);
    }

    private static void processRecords(KafkaConsumer<String, byte[]> consumer) {
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, byte[]> record : records) {
                GenericRecord genericRecord = AvroSupport.byteArrayToData(AvroSupport.getSchema(), record.value());
                String firstName = AvroSupport.getValue(genericRecord, "firstName", String.class);
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), firstName);
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            consumer.commitSync();
        }
    }

    private static KafkaConsumer<String, byte[]> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "100");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<String, byte[]>(props);
    }
}
import java.io.IOException;
import java.util.Properties;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerAvroExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, byte[]> producer = createProducer();
        sendRecords(producer);
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(props);
    }

    private static void sendRecords(Producer<String, byte[]> producer) throws IOException {
        String topic = "avro-topic";
        int partition = 0;
        while (true) {
            for (int i = 1; i < 100; i++) {
                producer.send(new ProducerRecord<String, byte[]>(topic, partition, Integer.toString(0), record(i + "")));
            }
        }
    }

    private static byte[] record(String name) throws IOException {
        GenericRecord record = new GenericData.Record(AvroSupport.getSchema());
        record.put("firstName", name);
        return AvroSupport.dataToByteArray(AvroSupport.getSchema(), record);
    }
}
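
The examples above call an AvroSupport helper class that the article never shows. Purely as an illustrative sketch (assuming a record schema with a single firstName string field; this class is not taken from the original source), it could be implemented with the standard Avro GenericDatumWriter/GenericDatumReader APIs like this:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroSupport {

    // Hypothetical schema: a record with a single string field named "firstName".
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"}]}");

    public static Schema getSchema() {
        return SCHEMA;
    }

    // Serializes a GenericRecord into the raw Avro binary encoding.
    public static byte[] dataToByteArray(Schema schema, GenericRecord record) {
        try {
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Deserializes raw Avro binary bytes back into a GenericRecord.
    public static GenericRecord byteArrayToData(Schema schema, byte[] bytes) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Reads a single field; Avro strings come back as Utf8, so convert when a String is requested.
    @SuppressWarnings("unchecked")
    public static <T> T getValue(GenericRecord record, String field, Class<T> type) {
        Object value = record.get(field);
        if (value != null && type == String.class) {
            return (T) value.toString();
        }
        return (T) value;
    }
}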

So, this was all about Kafka clients. Hope you liked our explanation of how to create Kafka clients.

6. Conclusion

Hence, we have seen all the ways to create Kafka clients using the Kafka API. Moreover, in this Kafka Clients tutorial, we discussed the Kafka Producer Client and the Kafka Consumer Client. Along with that, we also learned about the Avro Kafka producer and consumer clients. However, if any doubt occurs regarding Kafka clients, feel free to ask in the comments section.

Reposted from: https://www.cnblogs.com/a00ium/p/10852433.html
