1. kafka安装与配置

  • 官网下载安装解压缩:http://kafka.apache.org/downloads
  • 下载解压启动
启动命令:bin/kafka-server-start.sh  -daemon config/server.properties关闭命令:./bin/kafka-server-stop.shserver.properties配置中需要关注以下几个参数:
broker.id=0 表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置
的不同
listeners=PLAINTEXT://:9092 brokder对外提供的服务入口地址
log.dirs=/tmp/kafka/log 设置存放消息日志文件的地址
zookeeper.connect=localhost:2181 Kafka所需Zookeeper集群地址,教学中Zookeeper和
Kafka都安装本机

2. Kafka测试消息生产与消费

  • 首先创建一个主题
    命令如下:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1--zookeeper:指定了Kafka所连接的Zookeeper服务地址
--topic:指定了所要创建主题的名称
--partitions:指定了分区个数
--replication-factor:指定了副本因子
--create:创建主题的动作指令
  • 报错
  • 解决
    解决方案:
    在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。
bin/kafka-topics.sh --bootstrap-server localhost:9092  --create --topic heima --partitions 2 --replication-factor 1

  • 展示所有主题
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 查看主题详情
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic heima
  • 启动消费端接收消息
    命令:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima

–bootstrap-server 指定了连接Kafka集群的地址
–topic 指定了消费端订阅的主题

itcast@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-console-consumer.sh -- bootstrap-server localhost:9092 --topic heima Hello,Kafka!
  • 生产端发送消息
    命令:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic heima

–broker-list 指定了连接的Kafka集群的地址
–topic 指定了发送消息时的主题

itcast@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-console-producer.sh -- broker-list localhost:9092 --topic heima >Hello,Kafka!
  • 生产者在linux输入发送消息的内容,消费者可以查收到


3. java连接Kafka

  • Producer
package com.hisign.xzxt2.kafka.learn.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class ProducerDemo {public static void main(String[] args) {Properties properties = new Properties();//broker的地址清单,建议至少填写两个,避免宕机properties.put("bootstrap.servers", "192.168.42.160:9092");//acks指定必须有多少个分区副本接收消息,生产者才认为消息写入成功,用户检测数据丢失的可能性//acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应。无法监控数据是否发送成功,但可以以网络能够支持的最大速度发送消息,达到很高的吞吐量。//acks=1:只要集群的首领节点收到消息,生产者就会收到来自服务器的成功响应。//acks=all:只有所有参与复制的节点全部收到消息时,生产者才会收到来自服务器的成功响应。这种模式是最安全的,properties.put("acks", "all");//retries:生产者从服务器收到的错误有可能是临时性的错误的次数properties.put("retries", 0);//batch.size:该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)properties.put("batch.size", 16384);//linger.ms:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,增加延迟,提高吞吐量properties.put("linger.ms", 1);//buffer.memory该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。properties.put("buffer.memory", 33554432);//key和value的序列化properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(properties);try {//producer = new KafkaProducer<>(properties);for (int i = 0; i < 10; i++) {String values = "value" + i + "\t";producer.send(new ProducerRecord<String, String>("heima", "key" + Integer.toString(i), values));Thread.sleep(500);System.out.println("Sent:" + values);}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}
}
  • Consumer
package com.hisign.xzxt2.kafka.learn.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class ConsumerDemo {public static void main(String[] args) throws Exception {String topicName = "heima";Properties props = new Properties();props.put("bootstrap.servers", "192.168.42.160:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//Kafka Consumer subscribes list of topics here.consumer.subscribe(Arrays.asList(topicName));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)// print the offset,key and value for the consumer records.System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());}}}
  • 报错
connection to node -1 (/192.168.42.161:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  • 解决
    网上好多是修改server.properties文件中绑定的ip地址,但是我的修改完后还是报错
    最后检查了一下防火墙的设置,是开启状态的,最后关闭防火墙,成功连接


Linux Kafka安装与启动相关推荐

  1. 大数据互联网架构阶段 Linux下安装mysql启动的常见问题

    Linux下安装mysql启动的常见问题 1.PID file could not be found mysql无法启动ERROR! MySQL is running but PID file cou ...

  2. kafka 安装和启动

    kafka中文教程 学习记录 首先准备好linux机器环境,参考此篇文章:快速搭建Linux服务器环境 单机版 1. 下载代码 https://kafka.apache.org/downloads.h ...

  3. Tengine的Linux下安装和启动

    选择的 Linux 系统为 CentOS7 1.下载tengine-2.3.3 2.解压tengine-2.3.3.tar.gz 一.安装 nginx 环境 执行下面4个命令 1. yum insta ...

  4. linux下安装nginx启动,Linux下安装启动nginx的过程

    1.首先将nginx的安装包传到虚拟机里的/home目录下 2.为了方便nginx运行而不影响linux安全需创建组合用户 groupadd -r nginx useradd -r -g nginx  ...

  5. Linux kafka安装

    一  JDK安装: 1.jdk1.7.0_55.tar.gz 准备安装包 2.tar -zxvf 安装包进行当前位置解压(一般解压在根目录opt下) 3.vi /etc/profile 编辑文本最下面 ...

  6. Linux下安装配置启动redis

    Linux下安装redis Linux版本: Centos7 1. 下载redis并解压 去reids官网下载看一下最新的版本 http://download.redis.io/releases 复制 ...

  7. 查询linux kafka安装目录,Linux下安装并(单节点)配置启动Kafka

    1. 从Kafka官网下载最新的Kafka,目前最新版本为0.9.0.1 2. 下载完毕后,上传到Linux服务器,并解压tar -xzf kafka_2.11-0.9.0.1.tgz 3. 修改Zo ...

  8. 查询linux kafka安装目录,Kafka 1.0.0安装和配置--Linux篇

    阅读目录: 1. 关闭防火墙和Selinux 2. 安装所需环境JDK,Zookeeper 3. 下载Kafka 1.0.0版本 4. 配置Kafka 5. 启动Kafka并验证 6. 报错及解决 7 ...

  9. Linux下安装Tomcat启动报错

    一.报以下错误: Using CATALINA_BASE:   /home/apache-tomcat-7.0.72 Using CATALINA_HOME:   /home/apache-tomca ...

最新文章

  1. Nauuo and Votes
  2. Language-Directed Hardware Design for Network Performance Monitoring——Marple
  3. Spring快速开启计划任务
  4. Ajax学习总结(1)——Ajax实例讲解与技术原理
  5. mysql ibd文件还原_MySQL
  6. SVM原理,及和逻辑回归区别
  7. python服务器搭建nginx_从0开始在腾讯云服务器上搭建python3+flask+uwsgi+nginx服务器...
  8. Raki的读paper小记:Soft Gazetteers for Low-Resource Named Entity Recognition
  9. lua实现xxTea加解密
  10. phyton的函数与类的学习
  11. 一键生成IOS App Icon工具
  12. 2020,我不想奋斗了
  13. YOLOv3使用笔记
  14. 【无标题】有向图的创建、求度和遍历
  15. Hyper-V虚拟机安装Linux后修改Linux的屏幕分辨率
  16. JAVA——JSch
  17. Java最大值最小值问题(用户输入)
  18. Android 生成自己的签名key(releasekey platform shared media networkstack verify等)
  19. 电影院3d是什么模式的_3D的完整形式是什么?
  20. Moments无法识别人像解决方法

热门文章

  1. 百度蜘蛛index.php,百度蜘蛛抓的这些404链接,不知道啥链接
  2. 2019年数学建模国赛赛前讲座收获
  3. 【CRM】浅析奔驰汽车的客户关系管理
  4. 用C语言写一个日期计算器
  5. 故障:卡死原因及解决
  6. [百度解析工具]利用IDM工具在线提速下载
  7. uvalive 4997 ABCD Tiles
  8. Microsoft Windows Server 2008 Standard Edition激活码申请方法
  9. Mentor Tanner EDA Tools version 16.30模拟/混合信号集成电路设计
  10. Ubuntu安装QT Creater和配置