性能优化的前提是工具,所谓:工欲善其事必先利其器

坦白的讲,kafka-producer-perf-test.sh这个脚本也可以做压测。但是使用这个工具有一个问题,那就是发送消息的格式无法做自定义。还有一点,作为程序员,使用现有的工具就像带着T XX一样,纵然这工具再怎么美味多姿,却感觉始终达不到内心的G点。

人活着图个啥? 不就是个爽嘛。

调侃完毕,开正题。


接下来我基于golang手把手教大家实现kafka发送压测工具。

kafka的发送分两种,一种是同步,一种是异步。

同步就是说,我先发一个,这个发完了,我再发下一个。所以同步的方式比较慢,在我的例子里,采用同步方式,大概每秒可以发送2000条消息;

异步就是说,我一直发,啥时候你发完了,通知我一下让我知道即可。所以,异步的方式比较快,在我的例子里,采用异步的方式,大概每秒可以发送10000条以上的消息。

那接下来,咱们上代码:

先看同步方式:

sync.go

package main

import (

"fmt"

"github.com/Shopify/sarama"

"github.com/json-iterator/go"

"time"

)

type Message struct {

Log string `json:"log"`

Stream string `json:"stream"`

Time string `json:"time"`

}

var sasl_enabled = true

func main() {

config := sarama.NewConfig()

// WaitForAll waits for all in-sync replicas to commit before responding.

config.Producer.RequiredAcks = sarama.WaitForAll

// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.

config.Producer.Partitioner = sarama.NewRandomPartitioner

config.Producer.Return.Successes = true

if sasl_enabled {

config.Net.SASL.Enable = true

config.Net.SASL.User = "admin"

config.Net.SASL.Password = "admin-secret"

config.Net.SASL.Mechanism = "PLAIN"

}

client, err := sarama.NewSyncProducer([]string{"172.21.92.170:31090"}, config)

if err != nil {

fmt.Println("producer close, err:", err)

return

}

defer client.Close()

myMesg := Message{"error", "this is a log", "2020"}

var json= jsoniter.ConfigCompatibleWithStandardLibrary

myStr, err := json.MarshalToString(myMesg)

if err != nil {

fmt.Println("marshal error")

return

}

var n int = 0

start := time.Now()

defer func() {

cost := time.Since(start)

fmt.Println("cost=", cost)

}()

for n < 360000{

msg := &sarama.ProducerMessage{}

msg.Topic = "advlog"

msg.Key = sarama.StringEncoder("miles")

msg.Value = sarama.StringEncoder(myStr)

pid, offset, err := client.SendMessage(msg)

if err != nil {

fmt.Println("send message failed,", err)

return

}

fmt.Printf("pid:%v offset:%v", pid, offset)

n++

}

}

编译:

# export GOPATH=你的目录

# go build sync.go

再看异步方式:

package main

import (

"fmt"

"github.com/Shopify/sarama"

"github.com/json-iterator/go"

"log"

"runtime"

"sync"

"time"

)

type AMessage struct {

Log string `json:"log"`

Stream string `json:"stream"`

Time string `json:"time"`

}

var use_sasl = true

const total int = 1000000000

const interval_us time.Duration = 10

func main() {

runtime.GOMAXPROCS(runtime.NumCPU())

wg := &sync.WaitGroup{}

config := sarama.NewConfig()

config.Producer.Return.Successes = true

config.Producer.Return.Errors = true

if use_sasl {

config.Net.SASL.Enable = true

config.Net.SASL.User = "admin"

config.Net.SASL.Password = "admin-secret"

config.Net.SASL.Mechanism = "PLAIN"

}

producer, err := sarama.NewAsyncProducer([]string{"172.21.92.170:31090"}, config)

if err != nil {

fmt.Println("producer close, err:", err)

return

}

defer producer.AsyncClose()

go ProcessResponse(producer)

var topic string = "advlog"

var logMsg string = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"

myMesg := AMessage{logMsg, "stream", "2020"}

var json= jsoniter.ConfigCompatibleWithStandardLibrary

myStr, err := json.MarshalToString(myMesg)

if err != nil {

fmt.Println("marshal error")

return

}

var n int = 0

start := time.Now()

defer func() {

cost := time.Since(start)

fmt.Println("cost=", cost)

}()

for n < total {

producer.Input()

time.Sleep(interval_us * time.Microsecond)

n++

}

wg.Add(1)

fmt.Println("running...")

wg.Wait()

fmt.Printf("finished...")

}

func ProcessResponse(producer sarama.AsyncProducer) {

for {

select {

case result :=

//log.Printf("> message: "%s" sent to partition %d at offset %d", result.Value, result.Partition, result.Offset)

log.Printf("> message: sent to partition %d at offset %d", result.Partition, result.Offset)

case err :=

log.Println("Failed to produce message", err)

}

}

}

乖乖,看到代码了没?

我测试了10亿条数据。

我跑了一晚上,发送了10亿条数据。

稳着呢。


一大波儿性能测试工具和组件优化方案正在火速赶来,欢迎关注。

kafka测试工具_kafka压测工具:同步方式2000+、异步方式10000+、带源码相关推荐

  1. web版本 开源压测工具_Web压测工具之Webbench和http_load

    Webbench简介 是知名的网站压力测试工具,能测试处在相同硬件上,不同服务的性能以及不同硬件上同一个服务的运行状况. webbench的标准测试可以向我们展示服务器的两项内容:每秒钟相应请求数和每 ...

  2. 基准测试工具(压测工具):wrk---高并发、损耗低,安装简单 (一)

    基准测试工具:Wrk初识   最近和同事聊起常用的一些压测工具,谈到了Apache ab.阿里云的PTS.Jmeter.Locust以及wrk各自的一些优缺点和适用的场景类型. 这篇博客,简单介绍下H ...

  3. jdk自带压测工具_压测工具JMeter的使用

    最近接了压测的需求,顺带熟悉下压测工具的使用.这里推荐JMeter,安装快捷.请求方便,省了一大堆麻烦的事情.Apache JMeter是Apache组织开发的基于Java的压力测试工具.用于对软件做 ...

  4. jdk自带压测工具_jmeter压测工具

    一.目录文件讲解 目录 bin:核心可执行文件,包含配置 mac/linux:使用jmeter启动 windows:使用jmeter.bat启动 jmeter-server:mac/linux分布式压 ...

  5. linux压测接口工具,jmeter压测工具

    一.目录文件讲解 目录 bin:核心可执行文件,包含配置 mac/linux:使用jmeter启动 windows:使用jmeter.bat启动 jmeter-server:mac/linux分布式压 ...

  6. 压测工具 Jmeter 压测工具 apache bench

    Jmeter下载地址 Jmeter参考地址 一.Jmeter下载安装 下载地址:http://jmeter.apache.org/download_jmeter.cgi windows选择zip,li ...

  7. mysql 并发 压测工具_MySQL压测工具mysqlslap的介绍与使用

    一.Mysqlslap介绍 mysqlslap是MySQL5.1之后自带的benchmark基准测试工具,类似Apache Bench负载产生工具,生成schema,装载数据,执行benckmark和 ...

  8. 不看我真的会很伤心【压测工具:提升系统性能的利器】,查看TPS,计算TPS,计算压测指标、压测名词解释、教大家如何压测

    目录 前言 一.压测是什么? 二.为什么要压测? 三. 压测名词解释 1.压测类型解释 2.压测名词解释 3.机器性能指标解释 4.访问指标解释 四.如何计算压测指标 五.常见的压测工具 1.JMet ...

  9. MySQL测试工具之-TPCC(业界通用的压测工具)

    TPCC业界通用的压测工具,主要是压数据库性能. 首先安装tpcc 官网地址:https://github.com/Percona-Lab/tpcc-mysql [root@test3 src]# u ...

  10. jmeter,TCPCopy,loadrunner 等测试压测工具使用教程

    2.JMeter环境设置 – Jmeter中文网 Apache JMeter - User's Manual: Getting Started JMeter学习(一)工具简单介绍 - 阳光温暖了心情 ...

最新文章

  1. 百度成立国内首个深度学习教育联盟,将制定行业标准
  2. IDE:集成开发环境(Integrated Development Environment)
  3. 奇异递归模板模式(Curiously Recurring Template Pattern,CRTP)
  4. EF中Take和Skip的区别
  5. 什么原因接触接触impala的
  6. 网页弹出窗口代码【来源于网络】
  7. 使用jquery datatables插件遇到fnReloadAjax的问题
  8. 借助共享缓存redis实现分布式锁
  9. jQuery学习(十一)— 常用的删除方法
  10. android逆向工程dex2jar使用
  11. Win7 + VirtualBox安装Mac OS X雪豹操作系统图文详解[转]
  12. TensorFlow系列——一些api的使用场景及方式
  13. c语言浮典型数据类型,C语言的数据类型→浮点型数据
  14. 佛说,是我们自己苦了自己~
  15. 龙蜥社区新增100+家合作伙伴,堡塔、东方通、宝德等头部企业均已加入
  16. S32K系列S32K144学习笔记——ADC
  17. 弱酸阳离子树脂去除硫酸锂溶液中的钙镁离子技术
  18. Android开源控件收集整理
  19. C++程序设计原理与实践电子书pdf下载
  20. 4K超高清电视观看距离

热门文章

  1. unity Rigidbogy组件
  2. atitit 数字音频技术概论 艾提拉著 目录 1. 声学基础 2 1.1. 1.2人耳的听觉效应9 2 2. 第1章数字音频基础 2 2.1. 1.1音频的发展历史 2 2.2. 1.2音频的发展
  3. Atitit 流水线子线程异常处理 1.1. 大概原理是 FutureTask排除异常 FutureTask.get can throw ExecutionException,can catc
  4. atitit.获取connection hibernate4
  5. paip.提升用户体验---注册异常记录
  6. paip.asp 项目流程及管理工具总结
  7. 解密游走于法律边缘的爬虫技术
  8. Redis: 为行情数据库设计键值
  9. 深度 | EB级规模大数据平台核心技术揭秘(下)
  10. MoveIt China Developer Workshop