// main.go

// 代码仅供参考,无法直接运行.

package main

import (

"bytes"

"encoding/csv"

"fmt"

MQTT "github.com/eclipse/paho.mqtt.golang"

"github.com/myzhan/boomer"

"io"

"io/ioutil"

"log"

"os"

"strconv"

"strings"

"sync"

"time"

)

var rows [][]string // 读取csv文件保存到这里

var clientTopic []map[string]MQTT.Client

var conn = 0 // 调试用

var failCount = 0 // 初始化失败数量

var i = 0 // 控制并发

var j = 1 // 记录消息发送成功

var f = 1 // 记录消息发送失败

var nowStr = strconv.Itoa(int(time.Now().Unix())) // 当前时间戳,用来做后续查询的消息的标识符

func newConn(c MQTT.Client, clientId string, group *sync.WaitGroup) {

defer func() {

group.Add(-1)

err := recover()

if err != nil {

failCount++

fmt.Println("login fail clientId: ", clientId)

}

}()

token := c.Connect()

if token.Wait() && token.Error() != nil {

panic(token.Error())

}

// 组装topic

topic := fmt.Sprintf("msg/%s/supply", clientId)

temp := make(map[string]MQTT.Client)

temp[topic] = c

clientTopic = append(clientTopic, temp)

conn++ // 调试用

}

func initClients() {

var wg sync.WaitGroup

server := "server_ip:1883"

for i := 0; i < len(rows); i++ {

wg.Add(1)

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

opts.SetKeepAlive(300 * time.Second)

c := MQTT.NewClient(opts)

go newConn(c, clientId, &wg)

}

wg.Wait() // 等到所有协程执行完成

fmt.Printf("init finish, clients len is %d \n", len(clientTopic))

fmt.Printf("conn: %d \n", conn)

fmt.Printf("failCount: %d \n", failCount)

}

func initCsvData() {

pwd, _ := os.Getwd()

b, err := ioutil.ReadFile(pwd + "/clients.csv")

fs := bytes.NewBuffer(b)

if err != nil {

log.Fatalf("can not open the file, err is %+v", err)

}

r := csv.NewReader(fs)

//针对大文件,一行一行的读取文件

for {

row, err := r.Read()

if err != nil && err != io.EOF {

log.Fatalf("can not read, err is %+v", err)

}

if err == io.EOF {

break

}

rows = append(rows, row)

}

}

func login() {

server := "server_ip:port"

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

start := time.Now()

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

c := MQTT.NewClient(opts)

token := c.Connect()

elapsed := time.Since(start)

if token.Error() == nil {

log.Println("success" + strconv.Itoa(j))

boomer.RecordSuccess("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))

} else {

log.Println(token.Error())

boomer.RecordFailure("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), clientId)

}

c.Disconnect(5)

// avoid out of array

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

j++

}

func sendMsg() {

start := time.Now()

msgId := "msg" + strconv.Itoa(i)

var clientId string

var topic string

var c MQTT.Client

for k, v := range clientTopic[i] {

clientId = k[6:19]

topic = k

c = v // v就是一个connected的client

}

deviceTime := nowStr

str := []string{msgId, clientId, deviceTime}

msgPayload := strings.Join(str, "|")

if c.IsConnected() == true {

token := c.Publish(topic, 1, false, msgPayload)

token.Wait() 等待消息发送完成,虽然会拉低并发,但必须要这么做,确保消息发送成功

elapsed := time.Since(start)

if token.Error() == nil {

fmt.Printf("this topic name is: %s \n", topic)

fmt.Printf("this topic payload is: %s \n", msgPayload)

fmt.Printf("success msg index: %v elapsed: %v \n", j, elapsed)

j++ // 消息发送成功, 记录一条,并且也给locust记录一条,方便后续校对数据量

boomer.RecordSuccess("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), int64(j))

// 避免数组越界

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

} else {

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

fmt.Printf("发送失败, fail msg index: %v \n", f)

}

} else {

if token := c.Connect(); token.Wait() && token.Error() != nil {

elapsed := time.Since(start)

fmt.Printf("fail msg index: %v \n", f)

f++

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

}

}

}

func main() {

initCsvData()

initClients()

task1 := &boomer.Task{

Name: "myTask",

Weight: 1,

Fn: sendMsg,

}

//task2 := &boomer.Task{

// Name: "login",

// Weight: 1,

// Fn: login,

//}

boomer.Run(task1)

}

java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现相关推荐

  1. 早知道早幸福——从压测工具谈并发、压力、吞吐量

    目前腾讯WeTest服务器性能测试已经正式对外开放, 点击 链接:http://wetest.qq.com/gaps/ 立即体验! 导语   这篇文章其实憋了很久,最初仅仅是对吞吐量计算的个人兴趣研究 ...

  2. 【腾讯优测干货分享】从压测工具谈并发、压力、吞吐量

    本文来自于腾讯bugly开发者社区,非经作者同意,请勿转载,原文地址:http://dev.qq.com/topic/580d914e07b7fc1c26a0cf7c 前言 随着部门业务的拓展,我们有 ...

  3. java模拟数据库压测_写并发压测 java 脚本你必须会的 3 个类

    性能测试做到后面,一些特殊的场景利用常用的现成工具满足不了需求,所以你需要学习java写一些特定协议的压测脚本,那你不得不研究多线程或线程池,而此时你也一定会遇到java并发编程中的几个类,今天重点讲 ...

  4. 阿里巴巴高可用技术专家襄玲:压测环境的设计和搭建

    性能压测,是保障服务可用性和稳定性过程中,不可或缺的一环,但是有关性能压测的体系化分享并不多.从本期开始,我们将推出 <Performance Test Together> (简称PTT) ...

  5. locust压测mqtt

    Locust与Jmeter对比: 发压能力:相同并发下,Locust(使用FastHttpLocust)> Jmeter 并发能力:Locust和Jmeter旗鼓相当,都能满足工作需求,Jmet ...

  6. 一、实战-压测流程,总概述

    性能压测背景: 小型电商公司,平常访问量不大,但是一旦涉及引流搞活动时,网站明显性能有问题,平常上线功能中基本上不太考虑性能,长期已久后,线上性能问题成为一个隐形问题,不知道站点流量上限在哪里,故需要 ...

  7. JAVA读取jtl文件不完整,Jmeter压测之jtl文件解析

    一.背景 最近在做性能压测方面的工作,用的压测工具是Jmeter.Jmeter中有一个插件叫 jp@gc Throughput Shaping Timer.翻译过来就是吞吐量整形定时器.如下图: Th ...

  8. 压测中提高并发数后服务端处理时间增长的原因分析

    后端逻辑:  接收请求.分词匹配.读redis做一些过滤策略 在不断提高并发数的压测过程中,发现一个问题,后台服务对一个请求的处理时间也在增高, 而且增高的主要时间是消耗在读redis过滤那块 过滤那 ...

  9. 我的世界服务器在线压测,我朋友的我的世界服务器一直被压测怎么办啊

    我的世界开服时遇到压测攻击可以说是非常习以为常的事情,而压测的防御其实也并不是很简单,目前比较常用的防压测方法是物理防御及通过插件避免压测程序频繁的获取服务器详情,以下是一套卓有成效的防压测方案以及对 ...

最新文章

  1. 【OpenCV 】Sobel 导数/Laplace 算子/Canny 边缘检测
  2. 电信应在短时间内放弃CDMA网络
  3. 信息系统项目管理师-第5章:项目范围管理-重点汇总
  4. linux下配置DHCP中继代理
  5. C 的16个大坑,你能躲过几个?
  6. linux下杀死进程全权讲解
  7. python-模块的嵌套调用-随堂草稿件
  8. linux+读取初始化文件,Linux 初始化系统 SystemV Upstart
  9. IntelliJ idea 添加参数
  10. 【M1芯片兼容】MP3 Audio Recorder for Mac - MP3录音工具
  11. VIsualSVNSever 和SVN安装教程
  12. 一键拼接所有微信好友头像
  13. 信息学奥赛一本通答案-1006:A+B问题
  14. 006 管理Ceph的RBD块设备
  15. 互联网时代,传统企业如何做引流拓客?
  16. 【转】最全前端面试问题及答案总结
  17. java查找图片_用java查找图片中的图片?
  18. 微信昵称中表情保存到数据库问题
  19. 期货近远月价差的含义(期货远月比近月价格高)
  20. yaml简介读取及修改

热门文章

  1. for循环里面有异步操作_JS 线程与异步的那些事
  2. python缩进格式错误的是_19个常见的python错误和异常
  3. 这些黑科技让百姓安心、安全过年
  4. 别把数学想得太难,数学是一场游戏
  5. 如何理解马尔可夫决策过程?
  6. idea创建springboot项目,一直在reading pom.xml
  7. ConcurrentLinkedQueue常用方法
  8. C语言fgets函数了解
  9. gSoap客户端调用WebService完成后注意内存释放顺序
  10. 基于MaxCompute SQL 的半结构化数据处理实践