如图所示,多个channel输入通过smux合并在一个连接中,后端服务将连接中的channel分离出来进行处理

smux.jpg

场景分析

假设一个简单的使用场景,一个apiservice网关服务对外提供HTTP接口,后面还有一个rand随机数服务,对内提供随机数TCP接口。

客户端访问apiservice接口,apiservice连接randservice服务获取数据并返回。如果不做多路复用的话,apiservice和randservice之间的连接数就是客户端请求数,这样apiservice和randservice之间连接过多会导致性能问题。

n link n link

+-----------+ +-------------+ +---------------+

| |

| client apiservice randservice |

| |

+-----------+ +-------------+ +---------------+

经过多路复用后,apiservice和randservice之间只有一个连接,这样无论多少个客户端请求都不会导致连接过多问题。

n link 1 link

+-----------+ +-------------+ +---------------+

| | | |

| client apiservice randservice |

| | | |

+-----------+ +-------------+ +---------------+

(当然这只是个示例场景而已,生产中apiservice和randservice之间使用RPC框架即可,不用我们手动写socket通信)

代码示例

1.随机数服务 randservice.go

package main

import (

"bytes"

"encoding/binary"

"fmt"

"github.com/rs/zerolog"

"github.com/rs/zerolog/log"

"github.com/xtaci/smux"

"math/rand"

"net"

"runtime"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

/**

一个生成随机数的tcp服务

客户端发送'R', 'A', 'N', 'D',服务返回一个随机数

*/

func main() {

listener, err := net.Listen("tcp", ":9000")

if err != nil {

panic(err)

}

log.Info().Msg("随机数服务启动,监听9000端口")

defer listener.Close()

for {

conn, err := listener.Accept()

if err != nil {

fmt.Println(err.Error())

continue

}

go SessionHandler(conn)

}

}

/**

处理会话

每个tcp连接生成一个会话session

*/

func SessionHandler(conn net.Conn) {

session, err := smux.Server(conn, nil)

if err != nil {

panic(err)

}

log.Info().Msgf("收到客户端连接,创建新会话,对端地址:%s", session.RemoteAddr().String())

for !session.IsClosed() {

stream, err := session.AcceptStream()

if err != nil {

fmt.Println(err.Error())

break

}

go StreamHandler(stream)

}

log.Info().Msgf("客户端连接断开,销毁会话,对端地址:%s", session.RemoteAddr().String())

}

/**

流数据处理

*/

func StreamHandler(stream *smux.Stream) {

buffer := make([]byte, 1024)

n, err := stream.Read(buffer)

if err != nil {

log.Error().Msgf("流id:%d,异常信息:%s", stream.ID(), err.Error())

stream.Close()

return

}

cmd := buffer[:n]

if bytes.Equal(cmd, []byte{'R', 'A', 'N', 'D'}) {

rand := rand.Uint64()

response := make([]byte, 8)

binary.BigEndian.PutUint64(response, rand)

stream.Write(response)

log.Debug().Msgf("收到客户端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)

} else {

log.Warn().Msgf("收到未知请求命令,流id:%d,请求命令:%v", stream.ID(), cmd)

}

}

2.api接口服务 apiservice.go

package main

import (

"encoding/binary"

"fmt"

"github.com/rs/zerolog"

"github.com/rs/zerolog/log"

"github.com/xtaci/smux"

"net"

"net/http"

"runtime"

)

/**

随机数服务客户端连接

*/

var randClient *smux.Session

func init() {

//连接后端随机数服务

conn, err := net.Dial("tcp", ":9000")

if err != nil {

log.Warn().Msg("随机数服务未启动")

panic(err)

}

session, err := smux.Client(conn, nil)

if err != nil {

log.Error().Msg("打开会话失败")

panic(err)

}

randClient = session

}

/**

一个api网关,对外提供api接口

调用随机数服务来获取随机数

*/

func main() {

defer randClient.Close()

http.HandleFunc("/rand", RandHandler)

http.ListenAndServe(":8080", nil)

}

/**

随机数接口

*/

func RandHandler(w http.ResponseWriter, r *http.Request) {

stream, err := randClient.OpenStream()

if err != nil {

w.WriteHeader(500)

fmt.Fprint(w, err.Error())

} else {

log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID())

defer stream.Close()

stream.Write([]byte{'R', 'A', 'N', 'D'})

buffer := make([]byte, 1024)

n, err := stream.Read(buffer)

if err != nil {

w.WriteHeader(500)

fmt.Fprint(w, err.Error())

} else {

response := buffer[:n]

var rand = binary.BigEndian.Uint64(response)

log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)

fmt.Fprintf(w, "%d", rand)

}

}

}

原理分析

smux将socket连接封装成session,每次请求响应封装成一个stream,通过自定义协议发送数据

VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)

VALUES FOR LATEST VERSION:

VERSION:

1/2

CMD:

cmdSYN(0)

cmdFIN(1)

cmdPSH(2)

cmdNOP(3)

cmdUPD(4) // only supported on version 2

STREAMID:

client use odd numbers starts from 1

server use even numbers starts from 0

cmdUPD:

| CONSUMED(4B) | WINDOW(4B) |

比如我们发送的RAND命令封装成以下数据包发送给服务端,假设请求的STREAMID为11223344

VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | RAND

VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | 0102030405060708

扩展优化

但是这样又导致了另一个问题,由于apiservice和randservice之间只有一个连接,而这一个连接只能由一个goroutine处理,这样就导致性能低下

所以进一步扩展apiservice和randservice之间建立固定数量的连接,如10个连接,用来处理所有的请求,就是通过连接池的方式来性能最大化

改造后的示意图如下:

n link 10 link

+-----------+ +-------------+ +---------------+

| |

| client apiservice randservice |

| |

+-----------+ +-------------+ +---------------+

连接池版代码 apiservicewithpool.go

package main

import (

"context"

"encoding/binary"

"fmt"

cpool "github.com/jolestar/go-commons-pool/v2"

"github.com/rs/zerolog"

"github.com/rs/zerolog/log"

"github.com/xtaci/smux"

"net"

"net/http"

"runtime"

)

var commonPool *cpool.ObjectPool

var ctx = context.Background()

func init() {

factory := cpool.NewPooledObjectFactorySimple(NewSessionCpool)

commonPool = cpool.NewObjectPoolWithDefaultConfig(ctx, factory)

commonPool.Config.MaxTotal = 10

}

/**

连接池生成新会话函数

*/

func NewSessionCpool(ctx context.Context) (interface{}, error) {

log.Debug().Msg("连接池中生成一个连接")

//连接后端随机数服务

conn, err := net.Dial("tcp", ":9000")

if err != nil {

log.Warn().Msg("随机数服务未启动")

panic(err)

}

//随机数服务客户端连接

session, err := smux.Client(conn, nil)

if err != nil {

log.Error().Msg("打开会话失败")

panic(err)

}

return session, err

}

/**

一个api网关,对外提供api接口

调用随机数服务来获取随机数

通过sync.Pool实现“连接池” !!! 不推荐这种方式,sync.Pool的种种特性不适合作为连接池

*/

func main() {

http.HandleFunc("/rand", CommonPoolRandHandler)

http.ListenAndServe(":8080", nil)

}

/**

随机数接口

*/

func CommonPoolRandHandler(w http.ResponseWriter, r *http.Request) {

obj, err := commonPool.BorrowObject(ctx)

if err != nil {

w.WriteHeader(500)

fmt.Fprint(w, err.Error())

return

}

client := obj.(*smux.Session)

stream, err := client.OpenStream()

if err != nil {

w.WriteHeader(500)

fmt.Fprint(w, err.Error())

} else {

log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID())

defer stream.Close()

stream.Write([]byte{'R', 'A', 'N', 'D'})

buffer := make([]byte, 1024)

n, err := stream.Read(buffer)

if err != nil {

w.WriteHeader(500)

fmt.Fprint(w, err.Error())

} else {

response := buffer[:n]

var rand = binary.BigEndian.Uint64(response)

log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)

fmt.Fprintf(w, "%d", rand)

}

}

commonPool.ReturnObject(ctx, obj)

}

经过连接池改造后的模型就像MySQL或Redis的使用场景,每次请求相当于一个stream,多个stream共用一个session,一个session背后有一个socket连接,程序和MySQL或Redis之间创建多个session放入连接池中,每次请求从连接池中拿出session进行读写操作

mysql sock golang_golang socket连接复用 - smux相关推荐

  1. mysql sock golang_golang thrift 总结一下网络上的一些坑

    我们以hello world来大概分析一下golang中的thrift包,并且扒一扒网络上有关thrift的一些坑 查看源码,服务器定义如下:(详见simple_server.go文件) type T ...

  2. cant connect local mysql to_连接Mysql提示Can't connect to local MySQL server through socket的解决方法...

    mysql,mysqldump,mysqldump,php连接mysql服务常会提示下面错误: ERROR 2002 (HY000): Can't connect to local MySQL ser ...

  3. 连接Mysql提示Can’t connect to local MySQL server through socket的解决方法

    mysql,mysqldump,Mysqladmin,php连接mysql服务常会提示下面错误: ERROR 2002 (HY000): Can't connect to local MySQL se ...

  4. cant connect local mysql to_连接Mysql提示Can’t connect to local MySQL server through socket的解决方法...

    mysql.mysqldump.mysqladmin.php连接mysql服务常会提示下面错误:ERROR 2002 (HY000): Can't connect to local MySQL ser ...

  5. mysql.sock 文件解析

    在观察MySQL本地连接的时候,发现对mysql.sock是个啥我不明白,于是我提出了一个问题:mysql.sock到底存了什么信息? 根据多方查资料和自我思考,我有了自己的一些认识和结论,但结论并不 ...

  6. ERROR 2002 (HY000): Can't connect to local MySQL server through socket '/var/lib/mysql/mysq

    http://hi.baidu.com/magecommerce/item/962c5a329db1eef1a9842809 配置文件设置不正确,也可能导致mysql.sock文件无法创建,消除错误日 ...

  7. ERROR 2002 (HY000): Cant connect to local MySQL server through socket的解决方法

    连接MySQL提示ERROR 2002 (HY000): Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2 ...

  8. MySQL找不到mysql.sock文件的临时解

    连接数据库时报错:Can 't connect to local MySQL server through socket '/tmp/mysql.sock '(2) "; 原因:1)mysq ...

  9. mysql.sock的问题

    关于mysql.sock的一些问题: 连接数据库时报错:Can 't connect to local MySQL server through socket '/tmp/mysql.sock '(2 ...

  10. 解决问题Can’t connect to local MySQL server through socket

    不幸遇到MySQL出现ERROR 2002 (HY000): Can't connect to local mysql server through socket '/tmp/mysql.sock'错 ...

最新文章

  1. 推荐8个轻巧强大的办公工具,高效实用,不容错过
  2. SAP Spartacus i18n 的文本,和翻译相关的话题:internationalization
  3. C++学习——C++中的四种类型转换
  4. 华为手机系统更新后有什么大的变化?
  5. 老是原罪?技术圈为何不待见大龄企业家
  6. AcWing 1776. 牛的基因组学(STL+枚举)
  7. python jdk安装_环境搭建:1.JDK安装配置
  8. MCSE 2012 R2之工作文件夹Word Folders(2)
  9. Stylus Loader has been initialized using an options object that does not match the API schema.
  10. 油猴(Tampermonkey)插件+脚本+IDM=百度网盘高速下载
  11. 2023年东北大学外国语言学及应用语言学考研上岸经验贴
  12. 基于51单片机流水灯仿真与程序设计
  13. uni-app微信小程序,写一个级联查询
  14. 如何从google play下载apk
  15. 365资讯简报 每日精选12条新闻简报 每天一分钟 知晓天下事10月12日
  16. 小知识·typec耳机原理
  17. Oracle11G的数据库数据导入导出(由11g上导出导入10g数据库等)
  18. Vue:Vue的element组件中的el-row的属性gutter什么意思?
  19. 搭建CocoaPods私有库
  20. 基于Python的淘宝自动回复助手

热门文章

  1. 全国计算机等级考试三级网络技术知识点考点
  2. 苹果手机怎么编辑word文档_怎么用苹果手机扫描文件转换成Word?这个方法我一定要告诉你...
  3. 计算机包括台式机和笔记本,笔记本电脑与台式机怎样连接
  4. C语言控制51单片机音乐报告,51单片机_音乐_天空之城_C语言
  5. 说说数据一致性有哪几种?
  6. 服务器安全防护措施有哪些?
  7. 计算机认知矫正治疗游戏,CCRT认知矫正系统_计算机认知矫正系统_认知行为矫正治疗系统-3618医疗器械网...
  8. html炫酷动态时钟代码,js动态炫酷数字时钟
  9. 北京市电动自行车产品目录 汇总查询
  10. java 中抽象类的继承_java抽象类 继承