go开源网络库nano(6)-hander逻辑
hander
- 前言
- hander 定义
- 方法
- 1. 注册服务
- 本地服
- 远程服务
- 2. 获取服务
- 本地
- 远程
- 3. 查找远程服务
- 4. 处理调用
- 包的处理
- 消息处理
- 本地调用处理
- 远程调用处理
- 下一章
前言
package cluster
注册服务
头文件 hander.go
hander 定义
type LocalHandler struct {localServices map[string]*component.Service // all registered servicelocalHandlers map[string]*component.Handler // all handler methodmu sync.RWMutexremoteServices map[string][]*clusterpb.MemberInfopipeline pipeline.PipelinecurrentNode *Node
}func NewHandler(currentNode *Node, pipeline pipeline.Pipeline) *LocalHandler {h := &LocalHandler{localServices: make(map[string]*component.Service),localHandlers: make(map[string]*component.Handler),remoteServices: map[string][]*clusterpb.MemberInfo{},pipeline: pipeline,currentNode: currentNode,}return h
}
// 握手和心跳消息
var (// cached serialized datahrd []byte // handshake response datahbd []byte // heartbeat packet data
)type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool)func cache() {data, err := json.Marshal(map[string]interface{}{"code": 200,"sys": map[string]float64{"heartbeat": env.Heartbeat.Seconds()},})if err != nil {panic(err)}hrd, err = codec.Encode(packet.Handshake, data)if err != nil {panic(err)}hbd, err = codec.Encode(packet.Heartbeat, nil)if err != nil {panic(err)}
}
方法
1. 注册服务
本地服
务和处理逻辑的hander
func (h *LocalHandler) register(comp component.Component, opts []component.Option) error {//注册新的服务s := component.NewService(comp, opts)//是否已经注册if _, ok := h.localServices[s.Name]; ok {return fmt.Errorf("handler: service already defined: %s", s.Name)}//解析符合条件的handerif err := s.ExtractHandler(); err != nil {return err}// register all localHandlersh.localServices[s.Name] = sfor name, handler := range s.Handlers {n := fmt.Sprintf("%s.%s", s.Name, name)log.Println("Register local handler", n)h.localHandlers[n] = handler}return nil
}
远程服务
func (h *LocalHandler) initRemoteService(members []*clusterpb.MemberInfo) {for _, m := range members {h.addRemoteService(m)}
}func (h *LocalHandler) addRemoteService(member *clusterpb.MemberInfo) {h.mu.Lock()defer h.mu.Unlock()for _, s := range member.Services {log.Println("Register remote service", s)h.remoteServices[s] = append(h.remoteServices[s], member)}
}
2. 获取服务
本地
func (h *LocalHandler) LocalService() []string {var result []stringfor service := range h.localServices {result = append(result, service)}sort.Strings(result)return result
}
远程
func (h *LocalHandler) RemoteService() []string {h.mu.RLock()defer h.mu.RUnlock()var result []stringfor service := range h.remoteServices {result = append(result, service)}sort.Strings(result)return result
}
3. 查找远程服务
func (h *LocalHandler) findMembers(service string) []*clusterpb.MemberInfo {h.mu.RLock()defer h.mu.RUnlock()return h.remoteServices[service]
}
4. 处理调用
包的处理
func (h *LocalHandler) processPacket(agent *agent, p *packet.Packet) error {switch p.Type {case packet.Handshake:if _, err := agent.conn.Write(hrd); err != nil {return err}agent.setStatus(statusHandshake)if env.Debug {log.Println(fmt.Sprintf("Session handshake Id=%d, Remote=%s", agent.session.ID(), agent.conn.RemoteAddr()))}case packet.HandshakeAck:agent.setStatus(statusWorking)if env.Debug {log.Println(fmt.Sprintf("Receive handshake ACK Id=%d, Remote=%s", agent.session.ID(), agent.conn.RemoteAddr()))}case packet.Data:if agent.status() < statusWorking {return fmt.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",agent.conn.RemoteAddr().String())}msg, err := message.Decode(p.Data)if err != nil {return err}h.processMessage(agent, msg)case packet.Heartbeat:// expected}agent.lastAt = time.Now().Unix()return nil
}
消息处理
//根据消息处理调用,然后根据是本地调用还是远程调用
func (h *LocalHandler) processMessage(agent *agent, msg *message.Message) {var lastMid uint64switch msg.Type {case message.Request:lastMid = msg.IDcase message.Notify:lastMid = 0default:log.Println("Invalid message type: " + msg.Type.String())return}handler, found := h.localHandlers[msg.Route]if !found {h.remoteProcess(agent.session, msg, false)} else {h.localProcess(handler, lastMid, agent.session, msg)}
}
本地调用处理
func (h *LocalHandler) localProcess(handler *component.Handler, lastMid uint64, session *session.Session, msg *message.Message) {if pipe := h.pipeline; pipe != nil {err := pipe.Inbound().Process(session, msg)if err != nil {log.Println("Pipeline process failed: " + err.Error())return}}var payload = msg.Datavar data interface{}if handler.IsRawArg {data = payload} else {data = reflect.New(handler.Type.Elem()).Interface()err := env.Serializer.Unmarshal(payload, data)if err != nil {log.Println(fmt.Sprintf("Deserialize to %T failed: %+v (%v)", data, err, payload))return}}if env.Debug {log.Println(fmt.Sprintf("UID=%d, Message={%s}, Data=%+v", session.UID(), msg.String(), data))}args := []reflect.Value{handler.Receiver, reflect.ValueOf(session), reflect.ValueOf(data)}task := func() {switch v := session.NetworkEntity().(type) {case *agent:v.lastMid = lastMidcase *acceptor:v.lastMid = lastMid}result := handler.Method.Func.Call(args)if len(result) > 0 {if err := result[0].Interface(); err != nil {log.Println(fmt.Sprintf("Service %s error: %+v", msg.Route, err))}}}index := strings.LastIndex(msg.Route, ".")if index < 0 {log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route))return}// A message can be dispatch to global thread or a user customized threadservice := msg.Route[:index]if s, found := h.localServices[service]; found && s.SchedName != "" {sched := session.Value(s.SchedName)if sched == nil {log.Println(fmt.Sprintf("nanl/handler: cannot found `schedular.LocalScheduler` by %s", s.SchedName))return}local, ok := sched.(scheduler.LocalScheduler)if !ok {log.Println(fmt.Sprintf("nanl/handler: Type %T does not implement the `schedular.LocalScheduler` interface",sched))return}local.Schedule(task)} else {scheduler.PushTask(task)}
}
远程调用处理
func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Message, noCopy bool) {index := strings.LastIndex(msg.Route, ".")if index < 0 {log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route))return}service := msg.Route[:index]members := h.findMembers(service)if len(members) == 0 {log.Println(fmt.Sprintf("nano/handler: %s not found(forgot registered?)", msg.Route))return}// Select a remote service address// 1. Use the service address directly if the router contains binding item// 2. Select a remote service address randomly and bind to routervar remoteAddr stringif addr, found := session.Router().Find(service); found {remoteAddr = addr} else {remoteAddr = members[rand.Intn(len(members))].ServiceAddrsession.Router().Bind(service, remoteAddr)}pool, err := h.currentNode.rpcClient.getConnPool(remoteAddr)if err != nil {log.Println(err)return}var data = msg.Dataif !noCopy && len(msg.Data) > 0 {data = make([]byte, len(msg.Data))copy(data, msg.Data)}// Retrieve gate address and session idgateAddr := h.currentNode.ServiceAddrsessionId := session.ID()switch v := session.NetworkEntity().(type) {case *acceptor:gateAddr = v.gateAddrsessionId = v.sid}client := clusterpb.NewMemberClient(pool.Get())switch msg.Type {case message.Request:request := &clusterpb.RequestMessage{GateAddr: gateAddr,SessionId: sessionId,Id: msg.ID,Route: msg.Route,Data: data,}_, err = client.HandleRequest(context.Background(), request)case message.Notify:request := &clusterpb.NotifyMessage{GateAddr: gateAddr,SessionId: sessionId,Route: msg.Route,Data: data,}_, err = client.HandleNotify(context.Background(), request)}if err != nil {log.Println(fmt.Sprintf("Process remote message (%d:%s) error: %+v", msg.ID, msg.Route, err))}
}
下一章
下一章看mesage消息
go开源网络库nano(6)-hander逻辑相关推荐
- 直接拿来用!谷歌开源网络库 TensorNetwork,GPU 处理提升 100 倍
编译 | 琥珀 出品 | AI科技大本营(ID:rgznai100) 世界上许多最严峻的科学挑战,如开发高温超导体和理解时空的本质,都涉及处理量子系统的复杂性.然而,这些系统中量子态的数量程指数级增 ...
- 唯快不破:开源网络库的分析libevent muduo nginx
每一个开源项目存在都有它的道理和意义,不同的思想有不同的优缺点. libevent:这是一个用纯C写的开源库,属于一个轻量级的网络中间件.其中用到的基本数据结构也是非常巧妙.展现反应堆模型的基本使用方 ...
- 开源网络库Alamofire的安装及简单使用
方式一.使用CocoaPods添加Alamofire依赖 1. 安装CocoaPods(一种第三方依赖管理工具) 网址:COCOAPODS 打开电脑上的终端(Terminal) 在终端输入COCOAP ...
- 收藏的博客 -- 高性能Linux/Windows服务器/C++网络库(★★★★★)
免费的跨平台SSH和SFTP工具: https://www.putty.org/ -- Windows https://www.chiark.greenend.org.uk/~sgtatham/put ...
- C++常用库之网络库
C++一个很大的用途就是作为网络层组件的开发语言.C++开发的第三方网络库也比较多.其实,c语言下的网络库也不少.现在简单介绍一下. ACE库 ACE是一个大型的中间件产品,代码有几十万行,非常宏大, ...
- linux tcp server开源,GitHub - 06linux/cellnet: 高性能,简单,方便的开源服务器网络库...
cellnet cellnet是一个高性能,简单,方便的开源服务器网络库 自由混合编码,业务代码无需调整. TCP和html5的应用都可以直接使用cellnet迅速搭建服务器框架. 与Java的Net ...
- 国产网络库libhv开源四周年回顾
libhv是一个跨平台的c/c++网络库,本文写在libhv开源四周年之际,借机回顾了libhv的发展历程. github地址:https://github.com/ithewei/libhv 文章目 ...
- libgo高性能网络服务器,【开源】gnet: 一个轻量级且高性能的 Golang 网络库
![](https://ask.qcloudimg.com/http-save/1303222/sipe2g9n9h.png) # Github 主页 [https://github.com/panj ...
- 【开源推荐】gnet: 一个轻量级且高性能的 Go 网络库
Github 主页 https://github.com/panjf2000/gnet 欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦. 简介 gnet 是一个基于事件驱 ...
最新文章
- 入住两年的CSDN,在今天2020年8月27日,成为CSDN博客专家
- 大剑无锋之post那么多优点,为什么还用get
- 那些年,我们见过的Java服务端乱象
- 控制附件的大小 php,wordpress如何修改默认上传附件限制大小
- Python精通-Python字典操作
- 理解SQL Server中的权限体系(下)----安全对象和权限
- (转)HDOJ 4006 The kth great number(优先队列)
- 新手在前期应该怎样发“外链”(4)之终级外链法
- html中id和name的异同
- 网站开发之HTML基础表格Table和表单Form(三)
- CRMEB Min电商系统商城源码 v4.3.2
- 门禁系统java_java实现门禁系统
- 漫步数理统计三十一——依分布收敛
- python四分位数_python 计算箱线图、中位数、上下四分位数等
- HTML在列表中加图片,HTML + JS 列表显示图片
- 数据结构-图的应用-最小生成树(类C语言版)
- 135、137、138、139、445等端口解释和关闭方法
- Vue3定义全局变量/方法
- 解决虚拟机打不开Ubuntu的问题:
- echarts动态滑动平均滤波