2019独角兽企业重金招聘Python工程师标准>>>

基本概念

Wangle中的Pipeline和Netty中的Pipeline是很相似的,既可以将它看为一种职责链模式的实现也可以看作是Handler的容器。Pipeline中的handler都是串行化执行的,前一个handler完成自己的工作之后把事件传递给下一个handler,理论上Pipeline中的所有handler都是在同一个IO线程中执行的,但是为了防止某些handler(比如序列化、编解码handler等)耗时过长,Netty中允许为某些handler指定其它线程(eventloop)异步执行,类似的功能在Wangle中也有体现,只是在实现方式上有些区别。和Netty中一个较大的区别是,Wangle中并没有专门的Channel定义,Wangle中的Pipeline兼有了Channel的角色和功能。下面分别就Pipeline、Handler和Context的顺序进行源码分析。

Pipeline

PipelineBase作为Pipeline的基类,提供了一些最为通用、核心的api实现,比如对handler的操作:addBack及其变体、addFront及其变体、remove及其变体等,下面看一下addBack的一个实现版本:

template <class H>
PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {typedef typename ContextType<H>::type Context;// 声明Conetxt类型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一// 使用Context包装Handler后,将其添加到pipeline中,Context中还持有pipeline的引用return addHelper(std::make_shared<Context>(shared_from_this(), std::move(handler)),false);// false标识添加到尾部
}

首先,会根据要添加的handler类型定义一个Context(Context可以看成是Handler的外套,后面还会单独介绍)类型,然后根据这个Context类型创建一个Context:参数为Pipeline指针和handler,最终addHelper会将Context添加到容器管理起来:

template <class Context>
PipelineBase& PipelineBase::addHelper(std::shared_ptr<Context>&& ctx,bool front) {// 先加入总的Context (std::vector<std::shared_ptr<PipelineContext>>)// 该vector种使用的是智能指针,可以保持对Context的引用ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);// 然后根据方向(BOTH、IN、OUT分别加入相应的vector中)// std::vector<PipelineContext*> 这里放的是Context的指针,因为引用在上面的容器中已经保持if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());}if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());}return *this;
}

Context内部包含了Pipeline、Handler,和Handler一样,Context也有方向:BOTH、IN、OUT,首先,无论Context 什么方向,都会在ctxs_容器上添加这个Context,然后会根据Context方向的不同,分别在inCtxs_和outCtxs_上添加该Context。接下来看一下这三个容器的定义:

  std::vector<std::shared_ptr<PipelineContext>> ctxs_;  // 所有的PipelineContextstd::vector<PipelineContext*> inCtxs_;  // inbound 类型的PipelineContextstd::vector<PipelineContext*> outCtxs_; // outbound 类型的PipelineContext

由于handler的其他操作(addFront、remove等)都是对这三个容器的增删操作,原理一样,此处不再赘述。

PipelineBase中还提供了设置PipelineManager的接口,从字面理解,PipelineManager就是管理Pipeline的接口,其定义如下:

class PipelineManager {public:virtual ~PipelineManager() = default;virtual void deletePipeline(PipelineBase* pipeline) = 0;virtual void refreshTimeout() {};
};

其中,deletePipeline会在显示调用一个pipeline的close方法时被调用,一般用来完成该Pipeline相关的资源释放,而refreshTimeout主要在Pipeline发生读写事件时被回调,主要用来刷新Pipeline的空闲时间。因此,如果你需要监听Pipeline的delete和refresh事件,那么可以自己实现一个PipelineManager并设置到Pipeline上。

在Wangle中没有定义专门的Channel结构,其实Wangle中的Pipeline兼有Channel的功能,比如要判断一个Channel是否还处于连接状态,在Netty中代码如下:

channel.isConnected();

那么Wangle中的Pipeline并没有此类方法可供使用,怎么办呢?其实,Wangle的Pipeline提供了一个更强大的方法:getTransport,该方法可以获得一个底层的AsyncTransport,而该AsyncTransport拥有所有的底层连接信息,比如(仅列出主要接口):

class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
public:typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;virtual void close() = 0;virtual void closeNow() = 0;virtual void closeWithReset() {closeNow();}virtual void shutdownWrite() = 0;virtual void shutdownWriteNow() = 0;virtual bool good() const = 0;virtual bool readable() const = 0;virtual bool isPending() const {return readable();}virtual bool connecting() const = 0;virtual bool error() const = 0;virtual void attachEventBase(EventBase* eventBase) = 0;virtual void detachEventBase() = 0;virtual bool isDetachable() const = 0;virtual void setSendTimeout(uint32_t milliseconds) = 0;virtual uint32_t getSendTimeout() const = 0;virtual void getLocalAddress(SocketAddress* address) const = 0;virtual void getAddress(SocketAddress* address) const {getLocalAddress(address);}virtual void getPeerAddress(SocketAddress* address) const = 0;virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; }
};

至此,PipelineBase中的主要功能分析完毕。

Pipeline是PipelineBase的子类,其具体定义如下:

template <class R, class W = folly::Unit>
class Pipeline : public PipelineBase {public:using Ptr = std::shared_ptr<Pipeline>;static Ptr create() {return std::shared_ptr<Pipeline>(new Pipeline());}~Pipeline();// 模板方法template <class T = R>typename std::enable_if<!std::is_same<T, folly::Unit>::value>::typeread(R msg);//front_->read(std::forward<R>(msg)); --> this->handler_->read(this, std::forward<Rin>(msg));template <class T = R>typename std::enable_if<!std::is_same<T, folly::Unit>::value>::typereadEOF();//front_->readEOF();template <class T = R>typename std::enable_if<!std::is_same<T, folly::Unit>::value>::typereadException(folly::exception_wrapper e);//front_->readException(std::move(e));template <class T = R>typename std::enable_if<!std::is_same<T, folly::Unit>::value>::typetransportActive();// front_->transportActive();template <class T = R>typename std::enable_if<!std::is_same<T, folly::Unit>::value>::typetransportInactive();//front_->transportInactive();template <class T = W>typename std::enable_if<!std::is_same<T, folly::Unit>::value,folly::Future<folly::Unit>>::typewrite(W msg);//back_->write(std::forward<W>(msg));template <class T = W>typename std::enable_if<!std::is_same<T, folly::Unit>::value,folly::Future<folly::Unit>>::typewriteException(folly::exception_wrapper e);//back_->writeException(std::move(e));template <class T = W>typename std::enable_if<!std::is_same<T, folly::Unit>::value,folly::Future<folly::Unit>>::typeclose();//back_->close()void finalize() override;protected:Pipeline();explicit Pipeline(bool isStatic);private:bool isStatic_{false};InboundLink<R>* front_{nullptr};// inbound类型Context(read)OutboundLink<W>* back_{nullptr};// outbound类型Context (write)
};

可以看到,Pipeline主要定义和实现了一些和Handler对应的常用方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close。同时,Pipeline还定义了两个私有成员:front_和back_,从类型可以看出这是两个不同的方向,首先看一下InboundLink定义:

template <class In>
class InboundLink {
public:virtual ~InboundLink() = default;virtual void read(In msg) = 0;virtual void readEOF() = 0;virtual void readException(folly::exception_wrapper e) = 0;virtual void transportActive() = 0;virtual void transportInactive() = 0;
};

可以看出,InboundLink只是把Pipeline主要方法中的IN方向单独抽象出来,都是一个IN事件(输入事件),那么可想而知OutboundLink的定义:

template <class Out>
class OutboundLink {
public:virtual ~OutboundLink() = default;virtual folly::Future<folly::Unit> write(Out msg) = 0;virtual folly::Future<folly::Unit> writeException(folly::exception_wrapper e) = 0;virtual folly::Future<folly::Unit> close() = 0;
};

的确,OutboundLink定义的都是OUT事件类型的操作。

前文在讲PipelineBase时,addBack之类的操作都是只针对那三个容器进行的,没有地方对front_链表和back_链表进行操作啊?其实,front_链表和back_链表的设置是在Pipeline的finalize中完成的:

template <class R, class W>
void Pipeline<R, W>::finalize() {front_ = nullptr;if (!inCtxs_.empty()) {front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());for (size_t i = 0; i < inCtxs_.size() - 1; i++) {inCtxs_[i]->setNextIn(inCtxs_[i + 1]);}inCtxs_.back()->setNextIn(nullptr);}back_ = nullptr;if (!outCtxs_.empty()) {back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());for (size_t i = outCtxs_.size() - 1; i > 0; i--) {outCtxs_[i]->setNextOut(outCtxs_[i - 1]);}outCtxs_.front()->setNextOut(nullptr);}if (!front_) {detail::logWarningIfNotUnit<R>("No inbound handler in Pipeline, inbound operations will throw ""std::invalid_argument");}if (!back_) {detail::logWarningIfNotUnit<W>("No outbound handler in Pipeline, outbound operations will throw ""std::invalid_argument");}for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {(*it)->attachPipeline();}
}

代码很简单,以IN方向为例,遍历inCtxs_容器,对容器中的每一个Context调用其setNextIn方法将Context组成一个单向链表front_。同理,outCtxs_最终会变为back_单向链表。最后,还会遍历Context的总容器ctxs_,为每一个Context调用attachPipeline方法,该方法主要工作就是把Context绑定到对应的Handler上(最终是Context和Handler都互相持有对方的引用),还会回调Handler的attachPipeline方法。

此处还有一个细节,Pipeline是一个模板类,具有两个模板参数template <class R, class W = folly::Unit>,分别代表Pipeline的 read(IN事件)的数据类型和write(out事件)数据类型,这些类型的设置要和Pipeline中的handler类型向匹配(后文还会详细讲解)。

下面就以Pipeline中的write方法来看一下事件的流动过程:

template <class R, class W>
template <class T>
typename std::enable_if < !std::is_same<T, folly::Unit>::value,folly::Future<folly::Unit >>::type
Pipeline<R, W>::write(W msg) {if (!back_) {throw std::invalid_argument("write(): no outbound handler in Pipeline");}return back_->write(std::forward<W>(msg));
}

Pipeline的write方法只是简单的调用back_的wirte方法,也就是OUT类型的事件会从Pipeline的最后一个Context依次向前传递(只传递给OUT类型的handler)。

Handler

Handler在继承层次上类似于Pipeline,首先有一个基类HandlerBase,其定义如下:

template <class Context>
class HandlerBase {
public:virtual ~HandlerBase() = default;virtual void attachPipeline(Context* /*ctx*/) {}virtual void detachPipeline(Context* /*ctx*/) {}// 获取绑定的ContextContext* getContext() {if (attachCount_ != 1) {return nullptr;}CHECK(ctx_);return ctx_;}private:friend PipelineContext;    // 设置PipelineContext为友元类,便于PipelineContext操作自己uint64_t attachCount_{0};  // 绑定计数,同一个handler可以被同时绑定到不同的pipeline中Context* ctx_{nullptr};    // 该Handler绑定的Context
};

HandlerBase内部组合了一个绑定的Context指针,并提供了getContext接口用于获取这个Handler绑定的Context。

Handler作为HandlerBase的子类,它具有四个模板参数: Rin、Rout、Win、Wout,其中Rin作为Handler和Context中read方法中消息的数据类型,Rout是作为Context中fireRead方法的参数类型。同理,Win是作为Handler和Context中wirte方法的消息参数类型,而Wout是作为Context中fireWrite的消息参数类型。可以这么理解:Xout是作为以fire开头的事件方法的参数类型。

template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
public:static const HandlerDir dir = HandlerDir::BOTH; // 方向为双向typedef Rin rin;typedef Rout rout;typedef Win win;typedef Wout wout;typedef HandlerContext<Rout, Wout> Context;  // 声明该HandlerContext类型virtual ~Handler() = default;// inbound类型事件virtual void read(Context* ctx, Rin msg) = 0;virtual void readEOF(Context* ctx) {ctx->fireReadEOF();}virtual void readException(Context* ctx, folly::exception_wrapper e) {ctx->fireReadException(std::move(e));}virtual void transportActive(Context* ctx) {ctx->fireTransportActive();}virtual void transportInactive(Context* ctx) {ctx->fireTransportInactive();}// outbound类型事件virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0;virtual folly::Future<folly::Unit> writeException(Context* ctx,folly::exception_wrapper e) {return ctx->fireWriteException(std::move(e));}virtual folly::Future<folly::Unit> close(Context* ctx) {return ctx->fireClose();}
};

类似于Pipeline,Handler也相应的定义了inbound类型和outbound类型事件,分别对应方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close(这些方法和Pipeline中一一对应)。其中,除了read和write两个方法是纯虚接口之外,其他的方法都提供了默认实现:就是将事件进行透传(调用Context里fireXxx方法)。

同理,根据事件类型的不同,还可以进一步细分Handler类型,比如InboundHandler类型为:

// inbound类型的Handler (默认情况下读入和读出的类型是一致)
template <class Rin, class Rout = Rin>
class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
public:static const HandlerDir dir = HandlerDir::IN;  // 方向为输入typedef Rin rin;typedef Rout rout;typedef folly::Unit win;typedef folly::Unit wout;typedef InboundHandlerContext<Rout> Context; // 声明inbound类型的InboundHandlerContextvirtual ~InboundHandler() = default;// 纯虚函数。由子类实现virtual void read(Context* ctx, Rin msg) = 0;// 下面的默认实现都是事件的透传virtual void readEOF(Context* ctx) {ctx->fireReadEOF();}virtual void readException(Context* ctx, folly::exception_wrapper e) {ctx->fireReadException(std::move(e));// std::move}virtual void transportActive(Context* ctx) {ctx->fireTransportActive();}virtual void transportInactive(Context* ctx) {ctx->fireTransportInactive();}
};

相应的,OutboundHandler类型定义为:

// outbound类型的Handler (默认写入类型和写出类型一致,如果不一致就会产生很多的转换)
template <class Win, class Wout = Win>
class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> {
public:static const HandlerDir dir = HandlerDir::OUT; // 方向为输出typedef folly::Unit rin;typedef folly::Unit rout;typedef Win win;typedef Wout wout;typedef OutboundHandlerContext<Wout> Context;virtual ~OutboundHandler() = default;// 纯虚函数。由子类实现virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0;// 下面的默认实现都是事件的透传virtual folly::Future<folly::Unit> writeException(Context* ctx, folly::exception_wrapper e) {return ctx->fireWriteException(std::move(e));}virtual folly::Future<folly::Unit> close(Context* ctx) {return ctx->fireClose();}
};

前文所说,Handler所有的事件方法中只有read和write是纯虚接口,这样用户每次实现自己的Handler时都需要override这两个方法(即使只是完成简单的事件透传),因此,为了方便用户编写自己的Handler,Wangle提供了HandlerAdapter,HandlerAdapter其实很简单,就是以事件透传的方式重写(override)了read个write两个方法。代码如下:

// Handler适配器
template <class R, class W = R>
class HandlerAdapter : public Handler<R, R, W, W> {
public:typedef typename Handler<R, R, W, W>::Context Context;// 将read事件直接进行透传void read(Context* ctx, R msg) override {ctx->fireRead(std::forward<R>(msg));}// 将write事件直接进行透传folly::Future<folly::Unit> write(Context* ctx, W msg) override {return ctx->fireWrite(std::forward<W>(msg));}
};

Context

如前文所述,Pipeline中直接管理的并不是Handler,而是Context,为了便于理解,此处再把Pipeline中的addBack源码列出来:

template <class H>
PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {typedef typename ContextType<H>::type Context;// 声明Conetxt类型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一// 使用Context包装Handler后,将其添加到pipeline中,Context中还持有pipeline的引用return addHelper(std::make_shared<Context>(shared_from_this(), std::move(handler)),false);// false标识添加到尾部
}

其中,ContextType的定义如下,它会根据Handler的类型(具体来说是方向)决定Context的类型,如果Handler是双向的,那么Context类型为ContextImpl<Handler>,如果Handler的方向为IN,那么Context类型为InboundContextImpl<Handler>,如果Handler的方向为OUT,那么Context类型为OutboundContextImpl<Handler>。

template <class Handler>
struct ContextType {// template< bool B, class T, class F >// type T if B == true, F if B == falsetypedef typename std::conditional <Handler::dir == HandlerDir::BOTH,        //如果是双向ContextImpl<Handler>,            //类型就是ContextImpl<Handler>typename std::conditional<       //如果不是双向,那么还需要细分Handler::dir == HandlerDir::IN,  //如果是IN类型InboundContextImpl<Handler>,     //那么类型就是InboundContextImpl<Handler>OutboundContextImpl<Handler>     //否则就是OutboundContextImpl<Handler>>::type >::typetype;                            // Context类型
};

其实,InboundContextImpl和OutboundContextImpl都是ContextImpl的子类,ContextImpl的继承关系为:

template <class H>
class ContextImpl: public HandlerContext<typename H::rout, typename H::wout>,public InboundLink<typename H::rin>,public OutboundLink<typename H::win>,public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>> 

可以看到,ContextImpl一个继承自四个父类:HandlerContext、InboundLink、OutboundLink和ContextImplBase,其中HandlerContext中主要定义了以fire开头的事件传递方法;InboundLink和OutboundLink分别定义了Handler中Inbound和Outbound类型的方法接口,还记得Pipeline中用于管理IN方向和OUT方向的两个链表:front_和back_,它们就分别是InboundLink和OutboundLink类型;ContextImplBase主要提供了Pipeline中Context在组装链表时的接口,比如:setNextIn、setNextOut,以及用于将Context绑定到handler上的attachPipeline方法。

首先来看HandlerContext基类:

// HandlerContext定义(集inbound和outbound类型于一身)
// 以fire开始的方法都是Context中的事件方法
template <class In, class Out>
class HandlerContext {
public:virtual ~HandlerContext() = default;// inbound类型事件接口virtual void fireRead(In msg) = 0;virtual void fireReadEOF() = 0;virtual void fireReadException(folly::exception_wrapper e) = 0;virtual void fireTransportActive() = 0;virtual void fireTransportInactive() = 0;// outbound类型事件接口virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0;virtual folly::Future<folly::Unit> fireWriteException(folly::exception_wrapper e) = 0;virtual folly::Future<folly::Unit> fireClose() = 0;virtual PipelineBase* getPipeline() = 0;virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0;std::shared_ptr<folly::AsyncTransport> getTransport() {return getPipeline()->getTransport();}virtual void setWriteFlags(folly::WriteFlags flags) = 0;virtual folly::WriteFlags getWriteFlags() = 0;virtual void setReadBufferSettings(uint64_t minAvailable,uint64_t allocationSize) = 0;virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
};

HandlerContext主要定义了以fire开头的事件传播方法:fireRead、fireReadEOF、fireReadException、fireTransportActive、fireTransportInactive、fireWrite、fireWriteException、fireClose,以及getPipeline用于获取Context绑定的Pipeline、getPipelineShared以智能指针的形式获取Pipeline、getTransport用于获取Pipeline对应的Transport。

根据事件流向的不同,Context也可以细分定义,InboundHandlerContext定义为:

// inbound 类型的InboundHandlerContext
template <class In>
class InboundHandlerContext {
public:virtual ~InboundHandlerContext() = default;virtual void fireRead(In msg) = 0;virtual void fireReadEOF() = 0;virtual void fireReadException(folly::exception_wrapper e) = 0;virtual void fireTransportActive() = 0;virtual void fireTransportInactive() = 0;virtual PipelineBase* getPipeline() = 0;virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0;std::shared_ptr<folly::AsyncTransport> getTransport() {return getPipeline()->getTransport();}
};

同理,OutboundHandlerContext定义为:

// outbound 类型的OutboundHandlerContext
template <class Out>
class OutboundHandlerContext {
public:virtual ~OutboundHandlerContext() = default;virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0;virtual folly::Future<folly::Unit> fireWriteException(folly::exception_wrapper e) = 0;virtual folly::Future<folly::Unit> fireClose() = 0;virtual PipelineBase* getPipeline() = 0;virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0;std::shared_ptr<folly::AsyncTransport> getTransport() {return getPipeline()->getTransport();}
};

如前文所述,PipelineContext主要定义了如何在Pipeline中组织Context链表的操作接口,比如setNextIn用于设置下一个IN类型的Context,setNextOut用来设置下一个OUT类型Context,具体定义如下:

class PipelineContext {
public:virtual ~PipelineContext() = default;// 依附到一个pipeline中virtual void attachPipeline() = 0;// 从pipeline中分离virtual void detachPipeline() = 0;// 将一个HandlerContext绑定到handler上template <class H, class HandlerContext>void attachContext(H* handler, HandlerContext* ctx) {// 只有第一次绑定的时候才会设置if (++handler->attachCount_ == 1) {handler->ctx_ = ctx;} else {// 为何在此设置的时候就为nullptrhandler->ctx_ = nullptr;}}// 设置下一个inbound类型的Contextvirtual void setNextIn(PipelineContext* ctx) = 0;// 设置下一个outbound类型的Contextvirtual void setNextOut(PipelineContext* ctx) = 0;// 获取方向(Context方向依赖于Handler方向)virtual HandlerDir getDirection() = 0;
};

ContextImplBase主要实现了PipelineContext接口方法,同时它的两个成员:nextIn_和nextOut_就是链表的指针,用来串联起整个Context。

template <class H, class Context>
class ContextImplBase : public PipelineContext {
public:~ContextImplBase() = default;// 获取Context绑定的HandlerH* getHandler() {return handler_.get();}// Context初始化,参数为Context所属的Pipeline weak_ptr,Context要绑定的Handler  shared_ptrvoid initialize(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) {pipelineWeak_ = pipeline;pipelineRaw_ = pipeline.lock().get();//裸指针handler_ = std::move(handler);}// PipelineContext overridesvoid attachPipeline() override {// 如果该Context还没有被绑定if (!attached_) {this->attachContext(handler_.get(), impl_);// 将该Context绑定到handler上handler_->attachPipeline(impl_); // 调用Handler的attachPipeline,有具体的Handler实现attached_ = true;//标记Context已经attached到一个pipeline中}}// 从pipeline中分离void detachPipeline() override {handler_->detachPipeline(impl_);// 调用Handler的detachPipeline,有具体的Handler实现// 依附标志位为falseattached_ = false;}void setNextIn(PipelineContext* ctx) override {if (!ctx) {nextIn_ = nullptr;return;}// 转成InboundLink,因为Context是InboundLink子类auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);if (nextIn) {nextIn_ = nextIn;} else {throw std::invalid_argument(folly::sformat("inbound type mismatch after {}", folly::demangle(typeid(H))));}}void setNextOut(PipelineContext* ctx) override {if (!ctx) {nextOut_ = nullptr;return;}auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);if (nextOut) {nextOut_ = nextOut;} else {throw std::invalid_argument(folly::sformat("outbound type mismatch after {}", folly::demangle(typeid(H))));}}// 获取Context的方向HandlerDir getDirection() override {return H::dir;}protected:Context* impl_;                                    // 具体的Context实现std::weak_ptr<PipelineBase> pipelineWeak_;         //PipelineBase* pipelineRaw_;                        // 该Context绑定的pipelinestd::shared_ptr<H> handler_;                       // 该Context包含的HandlerInboundLink<typename H::rout>* nextIn_{nullptr};   // 下一个inbound类型的Context地址OutboundLink<typename H::wout>* nextOut_{nullptr}; // 下一个outbound类型的Context地址private:bool attached_{false}; // 这个Context是否已经被绑定
};

ContextImpl就是最终的Context实现,也就是要被添加到Pipeline中(比如使用addBack)的容器(ctxs_,inCtxs_,outCtxs_)的最终Context,在最后的finalize方法中还会进一步将容器中的Context组装成front_和back_单向链表。

ContextImpl的主要功能就是实现了各种事件传递方法(以fire开头的方法),以fireRead为例,这是一个IN类型的事件,由于Context中持有的Pipeline是一个weak类型的指针,因此先尝试lock,保证在事件传播阶段这个Pipeline不会销毁,然后会去调用下一个IN类型的Context的read方法。read方法是InboundLink中定义的接口(注意这里的read不是Handler中的也不是Pipeline中的),ContextImpl的也实现了这个read方法,它的功能很简单,首先还是先lock住这个Pipeline,然后直接调用Context内部包含的Handler的read方法。

template <class H>
class ContextImpl: public HandlerContext<typename H::rout, typename H::wout>,public InboundLink<typename H::rin>,public OutboundLink<typename H::win>,public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>> {
public:typedef typename H::rin Rin;typedef typename H::rout Rout;typedef typename H::win Win;typedef typename H::wout Wout;static const HandlerDir dir = HandlerDir::BOTH;explicit ContextImpl(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) {this->impl_ = this;//实现就是自己this->initialize(pipeline, std::move(handler));//初始化}// For StaticPipelineContextImpl() {this->impl_ = this;}~ContextImpl() = default;// HandlerContext overrides// Inbound类型的事件:read事件void fireRead(Rout msg) override {auto guard = this->pipelineWeak_.lock();// 锁住,确保一旦锁住成功,在操作期间,pipeline不会被销毁// 如果还没有到最后if (this->nextIn_) {//  将事件继续向下传播(传给下一个Inbound类型的Context)//  注意:这里调用的是下一个Contex的read而不是fireRead//  即调用下一个Context里面的Handler方法this->nextIn_->read(std::forward<Rout>(msg));} else {LOG(WARNING) << "read reached end of pipeline";}}void fireReadEOF() override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->readEOF();} else {LOG(WARNING) << "readEOF reached end of pipeline";}}void fireReadException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->readException(std::move(e));} else {LOG(WARNING) << "readException reached end of pipeline";}}void fireTransportActive() override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->transportActive();}}void fireTransportInactive() override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->transportInactive();}}//Outbound类型的事件传播folly::Future<folly::Unit> fireWrite(Wout msg) override {auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {return this->nextOut_->write(std::forward<Wout>(msg));} else {LOG(WARNING) << "write reached end of pipeline";// 如果到了最后,返回一个futurereturn folly::makeFuture();}}folly::Future<folly::Unit> fireWriteException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {return this->nextOut_->writeException(std::move(e));} else {LOG(WARNING) << "close reached end of pipeline";return folly::makeFuture();}}folly::Future<folly::Unit> fireClose() override {auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {return this->nextOut_->close();} else {LOG(WARNING) << "close reached end of pipeline";return folly::makeFuture();}}// 获取Context绑定的pipeline指针PipelineBase* getPipeline() override {return this->pipelineRaw_;}// 获取Context绑定的pipeline引用std::shared_ptr<PipelineBase> getPipelineShared() override {return this->pipelineWeak_.lock();}// 设置和获取wirte标志位void setWriteFlags(folly::WriteFlags flags) override {this->pipelineRaw_->setWriteFlags(flags);}folly::WriteFlags getWriteFlags() override {return this->pipelineRaw_->getWriteFlags();}// 设置read缓冲区参数 minAvailable、allocationSizevoid setReadBufferSettings(uint64_t minAvailable,uint64_t allocationSize) override {this->pipelineRaw_->setReadBufferSettings(minAvailable, allocationSize);}std::pair<uint64_t, uint64_t> getReadBufferSettings() override {return this->pipelineRaw_->getReadBufferSettings();}// InboundLink overridesvoid read(Rin msg) override {// 保证pipeline不会被删除auto guard = this->pipelineWeak_.lock();// 调用该Context绑定的Handler的read方法,至于事件是都需要继续传播,完全受read中的实现this->handler_->read(this, std::forward<Rin>(msg));}void readEOF() override {auto guard = this->pipelineWeak_.lock();this->handler_->readEOF(this);}void readException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();this->handler_->readException(this, std::move(e));}void transportActive() override {auto guard = this->pipelineWeak_.lock();this->handler_->transportActive(this);}void transportInactive() override {auto guard = this->pipelineWeak_.lock();this->handler_->transportInactive(this);}// OutboundLink overridesfolly::Future<folly::Unit> write(Win msg) override {auto guard = this->pipelineWeak_.lock();return this->handler_->write(this, std::forward<Win>(msg));}folly::Future<folly::Unit> writeException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();return this->handler_->writeException(this, std::move(e));}folly::Future<folly::Unit> close() override {auto guard = this->pipelineWeak_.lock();return this->handler_->close(this);}
};

同样,Context也可以根据传输方向进行细分,首先是InboundContextImpl:

template <class H>
class InboundContextImpl: public InboundHandlerContext<typename H::rout>,public InboundLink<typename H::rin>,public ContextImplBase<H, InboundHandlerContext<typename H::rout>> {
public:typedef typename H::rin Rin;typedef typename H::rout Rout;typedef typename H::win Win;typedef typename H::wout Wout;static const HandlerDir dir = HandlerDir::IN;explicit InboundContextImpl(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) {this->impl_ = this;this->initialize(pipeline, std::move(handler));}// For StaticPipelineInboundContextImpl() {this->impl_ = this;}~InboundContextImpl() = default;// InboundHandlerContext overridesvoid fireRead(Rout msg) override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->read(std::forward<Rout>(msg));} else {LOG(WARNING) << "read reached end of pipeline";}}void fireReadEOF() override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->readEOF();} else {LOG(WARNING) << "readEOF reached end of pipeline";}}void fireReadException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->readException(std::move(e));} else {LOG(WARNING) << "readException reached end of pipeline";}}void fireTransportActive() override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->transportActive();}}void fireTransportInactive() override {auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {this->nextIn_->transportInactive();}}PipelineBase* getPipeline() override {return this->pipelineRaw_;}std::shared_ptr<PipelineBase> getPipelineShared() override {return this->pipelineWeak_.lock();}// InboundLink overridesvoid read(Rin msg) override {auto guard = this->pipelineWeak_.lock();this->handler_->read(this, std::forward<Rin>(msg));}void readEOF() override {auto guard = this->pipelineWeak_.lock();this->handler_->readEOF(this);}void readException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();this->handler_->readException(this, std::move(e));}void transportActive() override {auto guard = this->pipelineWeak_.lock();this->handler_->transportActive(this);}void transportInactive() override {auto guard = this->pipelineWeak_.lock();this->handler_->transportInactive(this);}
};

其次是OutboundContextImpl:

template <class H>
class OutboundContextImpl: public OutboundHandlerContext<typename H::wout>,public OutboundLink<typename H::win>,public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> {
public:typedef typename H::rin Rin;typedef typename H::rout Rout;typedef typename H::win Win;typedef typename H::wout Wout;static const HandlerDir dir = HandlerDir::OUT;explicit OutboundContextImpl(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) {this->impl_ = this;this->initialize(pipeline, std::move(handler));}// For StaticPipelineOutboundContextImpl() {this->impl_ = this;}~OutboundContextImpl() = default;// OutboundHandlerContext overridesfolly::Future<folly::Unit> fireWrite(Wout msg) override {auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {return this->nextOut_->write(std::forward<Wout>(msg));} else {LOG(WARNING) << "write reached end of pipeline";return folly::makeFuture();}}folly::Future<folly::Unit> fireWriteException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {return this->nextOut_->writeException(std::move(e));} else {LOG(WARNING) << "close reached end of pipeline";return folly::makeFuture();}}folly::Future<folly::Unit> fireClose() override {auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {return this->nextOut_->close();} else {LOG(WARNING) << "close reached end of pipeline";return folly::makeFuture();}}PipelineBase* getPipeline() override {return this->pipelineRaw_;}std::shared_ptr<PipelineBase> getPipelineShared() override {return this->pipelineWeak_.lock();}// OutboundLink overridesfolly::Future<folly::Unit> write(Win msg) override {auto guard = this->pipelineWeak_.lock();return this->handler_->write(this, std::forward<Win>(msg));}folly::Future<folly::Unit> writeException(folly::exception_wrapper e) override {auto guard = this->pipelineWeak_.lock();return this->handler_->writeException(this, std::move(e));}folly::Future<folly::Unit> close() override {auto guard = this->pipelineWeak_.lock();return this->handler_->close(this);}
};

按照惯例,还是来一张图总结一下吧:

本系列文章

Wangle源码分析:Service

Wangle源码分析:ServerBootstrap

Wangle源码分析:编解码Handler

Wangle源码分析:EventBaseHandler、AsyncSocketHandler

Wangle源码分析:Pipeline、Handler、Context

Wangle源码分析:ClientBootstrap

转载于:https://my.oschina.net/fileoptions/blog/881236

Wangle源码分析:Pipeline、Handler、Context相关推荐

  1. Wangle源码分析:编解码Handler

    2019独角兽企业重金招聘Python工程师标准>>> 前言 编解码是协议相关的,如果没有编解码Handler,那么在处理网络的粘包.拆包时会变得很复杂.除了http之类的公有协议之 ...

  2. Wangle源码分析:Service

    2019独角兽企业重金招聘Python工程师标准>>> 前言 Wangle中的Service代表一个远程服务(方法),熟悉RPC的朋友肯定知道这就是一个简单的RPC,当然,和一些常见 ...

  3. Wangle源码分析:ClientBootstrap

    2019独角兽企业重金招聘Python工程师标准>>> ClientBootstrap介绍 ClientBootstrap是wangle作为Client端的一个快速启动辅助类,在经过 ...

  4. Wangle源码分析:ServerBootstrap

    2019独角兽企业重金招聘Python工程师标准>>> ServerBootstrap介绍       ServerBootstrap,顾名思义,它是作为Wangle服务端的一个启动 ...

  5. Wangle源码分析:EventBaseHandler、AsyncSocketHandler

    2019独角兽企业重金招聘Python工程师标准>>> 前言 前面的Wangle源码分析系列文章详细的分析了Pipeline.Handler等实现原理,细心的读者可能发现,每次在构造 ...

  6. 源码分析Android Handler是如何实现线程间通信的

    源码分析Android Handler是如何实现线程间通信的 Handler作为Android消息通信的基础,它的使用是每一个开发者都必须掌握的.开发者从一开始就被告知必须在主线程中进行UI操作.但H ...

  7. Android源码分析-全面理解Context

    前言 Context在android中的作用不言而喻,当我们访问当前应用的资源,启动一个新的activity的时候都需要提供Context,而这个Context到底是什么呢,这个问题好像很好回答又好像 ...

  8. android的消息处理机制(图文+源码分析)—Looper/Handler/Message[转]

    from:http://www.jb51.net/article/33514.htm 作为一个大三的预备程序员,我学习android的一大乐趣是可以通过源码学习google大牛们的设计思想.andro ...

  9. Android -- 消息处理机制源码分析(Looper,Handler,Message)

    android的消息处理有三个核心类:Looper,Handler和Message.其实还有一个Message Queue(消息队列),但是MQ被封装到Looper里面了,我们不会直接与MQ打交道,因 ...

最新文章

  1. 学习的本质在于触发了你的思考
  2. netstat [选项]
  3. opengl如何画出一个球_OpenGL-Controlling and Monitoring the Pipeline
  4. android 添加新用户,华为手机怎么添加新用户?华为手机添加新用户的方法
  5. vba窗体 点击增加减少_EXCEL之VBA-窗体实例多页控件的基础应用
  6. Atitit 得到mybatis 实际 sql 1.1. 使用mybatis工具提供的,只能出现问号一大堆不行 1 1.2. 配置log 打印sql依然不行,里面有问号。。 4 1.3. 配置p
  7. 数据集:高考录取分数
  8. Android 阿里云推送集成指南
  9. VR测试视频源,双目立体视觉测试视频,大分辨率2880x1440-25fps
  10. MQTT系列 | Retained消息和LWT和Keep Alive
  11. 傲腾这么厉害?QLC闪存笑了!
  12. 高效通过,PMI-ACP 备考知识突击(九阴真经版)
  13. html制作洋葱皮,洋葱皮作用做漂染复活节彩蛋教程
  14. 使用HttpClient下载图片常用代码,以及下载失败原因
  15. Maven插件wagon-maven-plugin自动化部署Java项目到Linux远程服务器
  16. MATLAB函数downsample的用法详解
  17. mysql靶场_搭建sqli靶场
  18. 发送邮件的JavaMail和Spring提供的MailSender比较分析
  19. Excel小技巧(随机点名)
  20. UIApp教程(全网最详细的教程来啦)

热门文章

  1. 特征工程中的数据标准化
  2. Docker mongo:5.0
  3. 80%中国男人不敢主动和女人搭讪
  4. [转] 快速掌握一个语言最常用的50%
  5. man page 中代号的意思
  6. 机电工程专业成功转行IT的真实经历
  7. 【C语言】BC90小乐乐算多少人被请家长(DAY 5)
  8. 【LeetCode 深度优先搜索专项】不同岛屿的数量 II(711)
  9. 立创eda专业版学习笔记(3)(隐藏部分飞线)
  10. wishbone协议(B.3)下载地址