Golang 连接Kafka
Kafka介绍
Kafka是Apache软件基金会开发的一个开源流处理平台,由Java和Scala编写;Kafka是一种高吞吐、分布式、基于订阅发布的消息系统。
Kafka名称解释
- Producer:生产者
- Consumer:消费者
- Topic:消息主题,每一类的消息称之为一个主题
- Broker:Kafka以集群的方式运行,可以由一个或多个服务器组成,每个服务器叫做一个broker
- Partition:物理概念上的分区,为了提供系统吞吐量,在物理上每个Topic会分为一个或多个Partition
Kafka架构图
一个典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。
Kafka通过Zookeeper管理集群配置及服务协同,Producer使用push模式将消息发布到broker,Consumer通过监听使用pull模式从broker订阅并消费消息。
图上有个细节需要注意,producer给broker的过程是push,也就是有数据就推送给broker,而consumer给broker的过程是pull,是通过consumer主动去拉数据的,而不是broker把数据主动发送给consumer端的。
Kafka与RabbitMQ比较
- Kafka比RabbitMQ性能要高
- RabbitMQ比Kafka可靠性要高
- 因此在金融支付领域使用RabbitMQ居多,而在日志处理、大数据等方面Kafka使用居多。
Kafka安装
第一步 下载Kafka:
地址 http://kafka.apache.org/downloads
第二步 解压Kafka:
tar -zxvf kafka.tgz -C /usr/local/kafka
第三步 运行Zookeeper:
以后台方式运行 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & zookeeper端口 2181
第四步 运行Kafka:
以后台方式运行 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties kafka端口 9092
Kafka图形管理工具
http://www.kafkatool.com/download.html
Go语言中使用Kafka
Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).
安装sarama
go get github.com/Shopify/sarama
Producer
package mainimport ("fmt""github.com/Shopify/sarama" )func main() {// 新建一个arama配置实例config := sarama.NewConfig()// WaitForAll waits for all in-sync replicas to commit before responding.config.Producer.RequiredAcks = sarama.WaitForAll// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.config.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = true// 新建一个同步生产者client, err := sarama.NewSyncProducer([]string{"172.16.65.210:9092"}, config)if err != nil {fmt.Println("producer close, err:", err)return}defer client.Close()// 定义一个生产消息,包括Topic、消息内容、msg := &sarama.ProducerMessage{}msg.Topic = "revolution"msg.Key = sarama.StringEncoder("miles")msg.Value = sarama.StringEncoder("hello world...")// 发送消息pid, offset, err := client.SendMessage(msg)msg2 := &sarama.ProducerMessage{}msg2.Topic = "revolution"msg2.Key = sarama.StringEncoder("monroe")msg2.Value = sarama.StringEncoder("hello world2...")pid2, offset2, err := client.SendMessage(msg2)if err != nil {fmt.Println("send message failed,", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)fmt.Printf("pid2:%v offset2:%v\n", pid2, offset2) }
Consumer
package mainimport ("sync""github.com/Shopify/sarama""fmt" )var wg sync.WaitGroupfunc main() {consumer, err := sarama.NewConsumer([]string{"172.16.65.210:9092"}, nil)if err != nil {fmt.Println("consumer connect error:", err)return}fmt.Println("connnect success...")defer consumer.Close()partitions, err := consumer.Partitions("revolution")if err != nil {fmt.Println("geet partitions failed, err:", err)return}for _, p := range partitions {partitionConsumer, err := consumer.ConsumePartition("revolution", p, sarama.OffsetOldest)if err != nil {fmt.Println("partitionConsumer err:", err)continue}wg.Add(1)go func(){for m := range partitionConsumer.Messages() {fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)}wg.Done()}()}wg.Wait() }
转载于:https://www.cnblogs.com/vincenshen/p/9824486.html
Golang 连接Kafka相关推荐
- Golang连接kafka报错: Errorkafka: client has run out of available brokers to talk to
用到的go包: "github.com/Shopify/sarama" 详细的报错内容如下: 2022/10/28 15:39:25 Error creating consumer ...
- go连接Kafka报错kafka: client has run out of available brokers to talk to
问题出现的场景: 有个go写的项目,原来是用go vendor来管理依赖包的,现在改为是用go module方式了,转换之后,编译成功,发现了上面的错误.很显然连接kafka集群报错了. 问题的原因: ...
- go 连接 kafka 写 mysql
引言 上一篇中提到了,go 连接 kafka 进行消息的生产和消费过程.在这一篇中,将对 go 连接 kafka 写 mysql 进行简单的设计和实现. 本文主要针对的是 Mac 系统,如果使用其它操 ...
- golang 接口_「实战」助力数据库开发之接口篇 - Golang 连接 Greenplum
Greenplum 作为一款强大的 HTAP 数据库,针对大多数流行语言都有相应的连接库.大部分均是与 PostgreSQL 采用相同的接口,但是也有部分接口是 Greenplum 专门优化后用于自身 ...
- 64位oracle客户端_开发小记-golang连接Oracle数据库配置
项目需求golang连接Orale数据库,使用mattn/go-oci8包,github地址 https://github.com/mattn/go-oci8 过程 Oracle Client和SDK ...
- 物联网架构----EMQ-Hook了解、连接Kafka发送消息
物联网架构----EMQ-Hook了解.连接Kafka发送消息 1. 前言 按照我自己设计的物联网框架,对于MQTT集群中的所有消息,是要持久化到磁盘的,这里采用一个消息队列中间件Kafka作为数据缓 ...
- golang连接mysql操作示例增删改查
golang本身没有提供连接mysql的驱动,但是定义了标准接口供第三方开发驱动.这里连接mysql可以使用第三方库,第三方库推荐使用https://github.com/Go-SQL-Driver/ ...
- flink连接kafka报错
问题描述: 通过本地idea方式连接kafka,报如下错误: 可能原因:本地程序会查询本地配置,看是否存在ip映射,可能是本地ip映射不存在,导致连接kafka失败.(主要是由于kafka的serve ...
- 如何通过golang 连接阿里云的redis实例(golang带账号密码访问redis)
前言: 随着云技术和5G的到来,高并发.低延时.高扩展的需求必定会越来越高.并且现在的云技术实在超过了绝大多数小公司的技术积累,所以笔者认为将来必定会越来越多的公司会将自己的业务挪到云上来,也就是说将 ...
最新文章
- 自定义DateField,带时分秒
- github中SSH的Key
- 服务器连接异常系统无法登录,win10系统无法登录LOL提示“服务器连接异常”的解决方法...
- Boost:使用OpenCV在图像或相机框架上应用sobel过滤器
- php如何实现添加到购物车_PHP实现添加购物车功能
- 解决创建maven项目后,不能创建scala
- Codeforces Round #736 (Div. 1Div2)
- Java VisualVM无法检测到本地java程序 的 解决办法
- Windows2003开机后进不了系统
- 滤波器m矩阵 awr 不一样_云南tte滤波器_灿勤科技
- Unity加载GIf动画
- 易点易动助力叮咚买菜数字化管理固定资产和易耗品
- js中字符串方法集合
- 江枫谈淘宝“双十一”事件中的数据库架构优化
- 2021-2027年全球与中国彩色隐形眼镜行业市场前瞻与投资战略规划分析报告
- 如何零基础制作一款自己的游戏!(一)
- Linux学习日记15——exec函数族、回收子进程
- 【GBASE培训】GBase数据库2022年第6期培训圆满结束
- Springboot毕设项目酒店地下停车场管理系统47g66java+VUE+Mybatis+Maven+Mysql+sprnig)
- 什么是验厂什么是认证
热门文章
- 394. Decode String
- 中国大学MOOC 计算机组成原理第4章 测试(中)
- 2014\Province_C_C++_B\1 啤酒和饮料
- Process 'command '/usr/lib/jvm/java-11-openjdk-amd64/bin/java'' finished with non-zero exit value 1
- 信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1082:求小数的某一位
- 【机器视觉】 import算子
- python小黄人程序_python signal信号
- 计算机密码都有什么用,要不是他,你根本不会忘记密码。
- mysql useing查询,MySQL数据库之多表查询using优化与案例
- 通俗易懂量子计算的原理