hander

  • 前言
  • hander 定义
  • 方法
    • 1. 注册服务
      • 本地服
      • 远程服务
    • 2. 获取服务
      • 本地
      • 远程
    • 3. 查找远程服务
    • 4. 处理调用
      • 包的处理
      • 消息处理
      • 本地调用处理
      • 远程调用处理
  • 下一章

前言

package cluster

注册服务

头文件 hander.go

hander 定义

type LocalHandler struct {localServices map[string]*component.Service // all registered servicelocalHandlers map[string]*component.Handler // all handler methodmu             sync.RWMutexremoteServices map[string][]*clusterpb.MemberInfopipeline    pipeline.PipelinecurrentNode *Node
}func NewHandler(currentNode *Node, pipeline pipeline.Pipeline) *LocalHandler {h := &LocalHandler{localServices:  make(map[string]*component.Service),localHandlers:  make(map[string]*component.Handler),remoteServices: map[string][]*clusterpb.MemberInfo{},pipeline:       pipeline,currentNode:    currentNode,}return h
}

// 握手和心跳消息

var (// cached serialized datahrd []byte // handshake response datahbd []byte // heartbeat packet data
)type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool)func cache() {data, err := json.Marshal(map[string]interface{}{"code": 200,"sys":  map[string]float64{"heartbeat": env.Heartbeat.Seconds()},})if err != nil {panic(err)}hrd, err = codec.Encode(packet.Handshake, data)if err != nil {panic(err)}hbd, err = codec.Encode(packet.Heartbeat, nil)if err != nil {panic(err)}
}

方法

1. 注册服务

本地服

务和处理逻辑的hander

func (h *LocalHandler) register(comp component.Component, opts []component.Option) error {//注册新的服务s := component.NewService(comp, opts)//是否已经注册if _, ok := h.localServices[s.Name]; ok {return fmt.Errorf("handler: service already defined: %s", s.Name)}//解析符合条件的handerif err := s.ExtractHandler(); err != nil {return err}// register all localHandlersh.localServices[s.Name] = sfor name, handler := range s.Handlers {n := fmt.Sprintf("%s.%s", s.Name, name)log.Println("Register local handler", n)h.localHandlers[n] = handler}return nil
}

远程服务

func (h *LocalHandler) initRemoteService(members []*clusterpb.MemberInfo) {for _, m := range members {h.addRemoteService(m)}
}func (h *LocalHandler) addRemoteService(member *clusterpb.MemberInfo) {h.mu.Lock()defer h.mu.Unlock()for _, s := range member.Services {log.Println("Register remote service", s)h.remoteServices[s] = append(h.remoteServices[s], member)}
}

2. 获取服务

本地

func (h *LocalHandler) LocalService() []string {var result []stringfor service := range h.localServices {result = append(result, service)}sort.Strings(result)return result
}

远程

func (h *LocalHandler) RemoteService() []string {h.mu.RLock()defer h.mu.RUnlock()var result []stringfor service := range h.remoteServices {result = append(result, service)}sort.Strings(result)return result
}

3. 查找远程服务

func (h *LocalHandler) findMembers(service string) []*clusterpb.MemberInfo {h.mu.RLock()defer h.mu.RUnlock()return h.remoteServices[service]
}

4. 处理调用

包的处理

func (h *LocalHandler) processPacket(agent *agent, p *packet.Packet) error {switch p.Type {case packet.Handshake:if _, err := agent.conn.Write(hrd); err != nil {return err}agent.setStatus(statusHandshake)if env.Debug {log.Println(fmt.Sprintf("Session handshake Id=%d, Remote=%s", agent.session.ID(), agent.conn.RemoteAddr()))}case packet.HandshakeAck:agent.setStatus(statusWorking)if env.Debug {log.Println(fmt.Sprintf("Receive handshake ACK Id=%d, Remote=%s", agent.session.ID(), agent.conn.RemoteAddr()))}case packet.Data:if agent.status() < statusWorking {return fmt.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",agent.conn.RemoteAddr().String())}msg, err := message.Decode(p.Data)if err != nil {return err}h.processMessage(agent, msg)case packet.Heartbeat:// expected}agent.lastAt = time.Now().Unix()return nil
}

消息处理

//根据消息处理调用,然后根据是本地调用还是远程调用

func (h *LocalHandler) processMessage(agent *agent, msg *message.Message) {var lastMid uint64switch msg.Type {case message.Request:lastMid = msg.IDcase message.Notify:lastMid = 0default:log.Println("Invalid message type: " + msg.Type.String())return}handler, found := h.localHandlers[msg.Route]if !found {h.remoteProcess(agent.session, msg, false)} else {h.localProcess(handler, lastMid, agent.session, msg)}
}

本地调用处理

func (h *LocalHandler) localProcess(handler *component.Handler, lastMid uint64, session *session.Session, msg *message.Message) {if pipe := h.pipeline; pipe != nil {err := pipe.Inbound().Process(session, msg)if err != nil {log.Println("Pipeline process failed: " + err.Error())return}}var payload = msg.Datavar data interface{}if handler.IsRawArg {data = payload} else {data = reflect.New(handler.Type.Elem()).Interface()err := env.Serializer.Unmarshal(payload, data)if err != nil {log.Println(fmt.Sprintf("Deserialize to %T failed: %+v (%v)", data, err, payload))return}}if env.Debug {log.Println(fmt.Sprintf("UID=%d, Message={%s}, Data=%+v", session.UID(), msg.String(), data))}args := []reflect.Value{handler.Receiver, reflect.ValueOf(session), reflect.ValueOf(data)}task := func() {switch v := session.NetworkEntity().(type) {case *agent:v.lastMid = lastMidcase *acceptor:v.lastMid = lastMid}result := handler.Method.Func.Call(args)if len(result) > 0 {if err := result[0].Interface(); err != nil {log.Println(fmt.Sprintf("Service %s error: %+v", msg.Route, err))}}}index := strings.LastIndex(msg.Route, ".")if index < 0 {log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route))return}// A message can be dispatch to global thread or a user customized threadservice := msg.Route[:index]if s, found := h.localServices[service]; found && s.SchedName != "" {sched := session.Value(s.SchedName)if sched == nil {log.Println(fmt.Sprintf("nanl/handler: cannot found `schedular.LocalScheduler` by %s", s.SchedName))return}local, ok := sched.(scheduler.LocalScheduler)if !ok {log.Println(fmt.Sprintf("nanl/handler: Type %T does not implement the `schedular.LocalScheduler` interface",sched))return}local.Schedule(task)} else {scheduler.PushTask(task)}
}

远程调用处理

func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Message, noCopy bool) {index := strings.LastIndex(msg.Route, ".")if index < 0 {log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route))return}service := msg.Route[:index]members := h.findMembers(service)if len(members) == 0 {log.Println(fmt.Sprintf("nano/handler: %s not found(forgot registered?)", msg.Route))return}// Select a remote service address// 1. Use the service address directly if the router contains binding item// 2. Select a remote service address randomly and bind to routervar remoteAddr stringif addr, found := session.Router().Find(service); found {remoteAddr = addr} else {remoteAddr = members[rand.Intn(len(members))].ServiceAddrsession.Router().Bind(service, remoteAddr)}pool, err := h.currentNode.rpcClient.getConnPool(remoteAddr)if err != nil {log.Println(err)return}var data = msg.Dataif !noCopy && len(msg.Data) > 0 {data = make([]byte, len(msg.Data))copy(data, msg.Data)}// Retrieve gate address and session idgateAddr := h.currentNode.ServiceAddrsessionId := session.ID()switch v := session.NetworkEntity().(type) {case *acceptor:gateAddr = v.gateAddrsessionId = v.sid}client := clusterpb.NewMemberClient(pool.Get())switch msg.Type {case message.Request:request := &clusterpb.RequestMessage{GateAddr:  gateAddr,SessionId: sessionId,Id:        msg.ID,Route:     msg.Route,Data:      data,}_, err = client.HandleRequest(context.Background(), request)case message.Notify:request := &clusterpb.NotifyMessage{GateAddr:  gateAddr,SessionId: sessionId,Route:     msg.Route,Data:      data,}_, err = client.HandleNotify(context.Background(), request)}if err != nil {log.Println(fmt.Sprintf("Process remote message (%d:%s) error: %+v", msg.ID, msg.Route, err))}
}

下一章

下一章看mesage消息

go开源网络库nano(6)-hander逻辑相关推荐

  1. 直接拿来用!谷歌开源网络库 TensorNetwork,GPU 处理提升 100 倍

    编译 |  琥珀 出品 | AI科技大本营(ID:rgznai100) 世界上许多最严峻的科学挑战,如开发高温超导体和理解时空的本质,都涉及处理量子系统的复杂性.然而,这些系统中量子态的数量程指数级增 ...

  2. 唯快不破:开源网络库的分析libevent muduo nginx

    每一个开源项目存在都有它的道理和意义,不同的思想有不同的优缺点. libevent:这是一个用纯C写的开源库,属于一个轻量级的网络中间件.其中用到的基本数据结构也是非常巧妙.展现反应堆模型的基本使用方 ...

  3. 开源网络库Alamofire的安装及简单使用

    方式一.使用CocoaPods添加Alamofire依赖 1. 安装CocoaPods(一种第三方依赖管理工具) 网址:COCOAPODS 打开电脑上的终端(Terminal) 在终端输入COCOAP ...

  4. 收藏的博客 -- 高性能Linux/Windows服务器/C++网络库(★★★★★)

    免费的跨平台SSH和SFTP工具: https://www.putty.org/ -- Windows https://www.chiark.greenend.org.uk/~sgtatham/put ...

  5. C++常用库之网络库

    C++一个很大的用途就是作为网络层组件的开发语言.C++开发的第三方网络库也比较多.其实,c语言下的网络库也不少.现在简单介绍一下. ACE库 ACE是一个大型的中间件产品,代码有几十万行,非常宏大, ...

  6. linux tcp server开源,GitHub - 06linux/cellnet: 高性能,简单,方便的开源服务器网络库...

    cellnet cellnet是一个高性能,简单,方便的开源服务器网络库 自由混合编码,业务代码无需调整. TCP和html5的应用都可以直接使用cellnet迅速搭建服务器框架. 与Java的Net ...

  7. 国产网络库libhv开源四周年回顾

    libhv是一个跨平台的c/c++网络库,本文写在libhv开源四周年之际,借机回顾了libhv的发展历程. github地址:https://github.com/ithewei/libhv 文章目 ...

  8. libgo高性能网络服务器,【开源】gnet: 一个轻量级且高性能的 Golang 网络库

    ![](https://ask.qcloudimg.com/http-save/1303222/sipe2g9n9h.png) # Github 主页 [https://github.com/panj ...

  9. 【开源推荐】gnet: 一个轻量级且高性能的 Go 网络库

    Github 主页 https://github.com/panjf2000/gnet 欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦. 简介 gnet 是一个基于事件驱 ...

最新文章

  1. 入住两年的CSDN,在今天2020年8月27日,成为CSDN博客专家
  2. 大剑无锋之post那么多优点,为什么还用get
  3. 那些年,我们见过的Java服务端乱象
  4. 控制附件的大小 php,wordpress如何修改默认上传附件限制大小
  5. Python精通-Python字典操作
  6. 理解SQL Server中的权限体系(下)----安全对象和权限
  7. (转)HDOJ 4006 The kth great number(优先队列)
  8. 新手在前期应该怎样发“外链”(4)之终级外链法
  9. html中id和name的异同
  10. 网站开发之HTML基础表格Table和表单Form(三)
  11. CRMEB Min电商系统商城源码 v4.3.2
  12. 门禁系统java_java实现门禁系统
  13. 漫步数理统计三十一——依分布收敛
  14. python四分位数_python 计算箱线图、中位数、上下四分位数等
  15. HTML在列表中加图片,HTML + JS 列表显示图片
  16. 数据结构-图的应用-最小生成树(类C语言版)
  17. 135、137、138、139、445等端口解释和关闭方法
  18. Vue3定义全局变量/方法
  19. 解决虚拟机打不开Ubuntu的问题:
  20. echarts动态滑动平均滤波

热门文章

  1. Java项目论文+PPT+源码等]基于javaweb的网上订餐管理系统|点餐餐饮餐厅
  2. 集显、独显、核显、SOC、POP、微型计算机树莓派的资料
  3. 键盘上F1~F12各个功能键的作用
  4. 学习日记day17 ps
  5. vim配置参考备忘-------嵌入式
  6. 3.1 Vue实战--电商后台管理系统 的登录功能 补充了 加密功能
  7. 增强型MOS管工作相关问题
  8. Dynamics 365 OP V9.1启用邮箱失败问题
  9. 官网购买的个人版 ArcGIS Pro安装授权问题汇总(持续更新)
  10. 2021京东Java面试真题:c和java哪个更适合开发游戏