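The metadata service is the BeeGFS service that maintains the file and directory hierarchy together with the attributes of each entry. It is built on an efficient multi-threaded epoll design, and its main flow is as follows:

ConnAcceptor (a PThread; a single thread) listens on the service port, accepts client connections, and writes the connection info (including the accepted socket) into a pipe;

StreamListenerV2 (a PThread; multiple threads, configurable) reads the connection info from the pipe, uses epoll to wait for incoming data, then creates an IncomingPreprocessedMsgWork (a Work) and adds it to the MultiWorkQueue FIFO queue;

Worker (a PThread; multiple threads, configurable) takes the work items from the MultiWorkQueue and processes them.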
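
The three stages above can be pictured with a small stand-alone sketch. It is not BeeGFS code: FifoQueue and runPipeline are hypothetical names, plain integers stand in for connections and work items, and each stage is reduced to a single std::thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>
#include <unistd.h>

// Hypothetical stand-in for MultiWorkQueue: a mutex-protected FIFO.
class FifoQueue
{
   public:
      void push(int item)
      {
         {
            std::lock_guard<std::mutex> lock(mutex);
            items.push_back(item);
         }
         cond.notify_one();
      }

      int pop() // blocks until an item is available
      {
         std::unique_lock<std::mutex> lock(mutex);
         cond.wait(lock, [this] { return !items.empty(); });
         int item = items.front();
         items.pop_front();
         return item;
      }

   private:
      std::mutex mutex;
      std::condition_variable cond;
      std::deque<int> items;
};

// Runs acceptor -> pipe -> listener -> queue -> worker for numConns fake connections
// and returns the IDs in the order in which the worker processed them.
std::vector<int> runPipeline(int numConns)
{
   int pipeFDs[2];
   if(pipe(pipeFDs) != 0)
      return std::vector<int>();

   FifoQueue queue;
   std::vector<int> processed;

   std::thread acceptor([&] { // stage 1: "accept" connections, announce them on the pipe
      for(int id = 1; id <= numConns; id++)
         if(write(pipeFDs[1], &id, sizeof(id) ) != (ssize_t)sizeof(id) )
            break;
      close(pipeFDs[1]); // EOF tells the listener that no more connections will come
   });

   std::thread listener([&] { // stage 2: read announcements and turn them into work
      int id;
      while(read(pipeFDs[0], &id, sizeof(id) ) == (ssize_t)sizeof(id) )
         queue.push(id);
      queue.push(-1); // sentinel: tells the worker to stop
   });

   std::thread worker([&] { // stage 3: process work in FIFO order
      for(int id = queue.pop(); id != -1; id = queue.pop() )
         processed.push_back(id);
   });

   acceptor.join();
   listener.join();
   worker.join();
   close(pipeFDs[0]);

   return processed;
}
```

Calling runPipeline(5) pushes five fake connections through all three stages. The real service differs in that it runs several listeners and workers in parallel and passes socket pointers instead of integers.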

Program initialization

Main function

The App object is created here; it is the main carrier of the program:

// fhgfs_meta\source\program\main.cpp

#include "Program.h"

int main(int argc, char** argv)

{

return Program::main(argc, argv);

}

// fhgfs_meta\source\program\Program.cpp

#include

#include "Program.h"

#include

App* Program::app = NULL;

int Program::main(int argc, char** argv)

{

BuildTypeTk::checkDebugBuildTypes();

AbstractApp::runTimeInitsAndChecks(); // must be called before creating a new App

app = new App(argc, argv);

app->startInCurrentThread();

int appRes = app->getAppResult();

delete app;

return appRes;

}

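Creating the ConnAcceptor

During initialization the main program creates one thread that listens on the service port; this is the job of the ConnAcceptor class: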

// fhgfs_meta\source\app\App.cpp

void App::initComponents(TargetConsistencyState initialConsistencyState)

throw(ComponentInitException)

{

this->log->log(Log_DEBUG, "Initializing components...");

this->dgramListener = new DatagramListener(

netFilter, localNicList, ackStore, cfg->getConnMetaPortUDP() );

if(cfg->getTuneListenerPrioShift() )

dgramListener->setPriorityShift(cfg->getTuneListenerPrioShift() );

streamListenersInit();

unsigned short listenPort = cfg->getConnMetaPortTCP();

this->connAcceptor = new ConnAcceptor(this, localNicList, listenPort);

this->statsCollector = new StatsCollector(workQueue, STATSCOLLECTOR_COLLECT_INTERVAL_MS,

STATSCOLLECTOR_HISTORY_LENGTH);

this->buddyResyncer = new BuddyResyncer();

this->internodeSyncer = new InternodeSyncer(initialConsistencyState);

this->timerQueue = new TimerQueue(1, 1);

this->modificationEventFlusher = new ModificationEventFlusher();

workersInit();

commSlavesInit();

this->log->log(Log_DEBUG, "Components initialized.");

}

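Creating the StreamListeners

Multiple StreamListenerV2 instances are created according to the configuration, one thread per instance. They receive new connections from the ConnAcceptor, read data from those connections, and generate Work: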

// fhgfs_meta\source\app\App.cpp

void App::streamListenersInit() throw(ComponentInitException)

{

this->numStreamListeners = cfg->getTuneNumStreamListeners();

for(unsigned i=0; i < numStreamListeners; i++)

{

StreamListenerV2* listener = new StreamListenerV2(

std::string("StreamLis") + StringTk::uintToStr(i+1), this, workQueue);

if(cfg->getTuneListenerPrioShift() )

listener->setPriorityShift(cfg->getTuneListenerPrioShift() );

if(cfg->getTuneUseAggressiveStreamPoll() )

listener->setUseAggressivePoll();

streamLisVec.push_back(listener);

}

}

Creating the WorkQueue

The WorkQueue is created to hold the Work generated by the StreamListeners:

// fhgfs_meta\source\app\App.cpp

/**

* Init basic shared objects like work queues, node stores etc.

*/

void App::initDataObjects() throw(InvalidConfigException)

{

...

this->workQueue = new MultiWorkQueue();

this->commSlaveQueue = new MultiWorkQueue();

if(cfg->getTuneUsePerUserMsgQueues() )

workQueue->setIndirectWorkList(new UserWorkContainer() );

...

}

Creating the Workers

Worker threads are created according to the configuration; they take Work from the WorkQueue and process it:

// fhgfs_meta\source\app\App.cpp

void App::workersInit() throw(ComponentInitException)

{

unsigned numWorkers = cfg->getTuneNumWorkers();

for(unsigned i=0; i < numWorkers; i++)

{

Worker* worker = new Worker(

std::string("Worker") + StringTk::uintToStr(i+1), workQueue, QueueWorkType_INDIRECT);

worker->setBufLens(cfg->getTuneWorkerBufSize(), cfg->getTuneWorkerBufSize() );

workerList.push_back(worker);

}

for(unsigned i=0; i < APP_WORKERS_DIRECT_NUM; i++)

{

Worker* worker = new Worker(

std::string("DirectWorker") + StringTk::uintToStr(i+1), workQueue, QueueWorkType_DIRECT);

worker->setBufLens(cfg->getTuneWorkerBufSize(), cfg->getTuneWorkerBufSize() );

workerList.push_back(worker);

}

}
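
The code above creates two kinds of workers on the same queue: numWorkers workers of type QueueWorkType_INDIRECT and APP_WORKERS_DIRECT_NUM workers of type QueueWorkType_DIRECT. The following single-threaded sketch shows one way such a two-list queue could behave. TwoListQueue and the serving policy (direct work first, indirect list visible only to indirect workers) are assumptions for illustration and are not taken from the BeeGFS source.

```cpp
#include <cassert>
#include <deque>
#include <string>

enum WorkerType { WorkerType_DIRECT, WorkerType_INDIRECT };

// Hypothetical, single-threaded stand-in for MultiWorkQueue (no locking, no blocking).
class TwoListQueue
{
   public:
      void addDirectWork(const std::string& work) { directList.push_back(work); }
      void addIndirectWork(const std::string& work) { indirectList.push_back(work); }

      // Assumed policy: direct work is served first; the indirect list is only
      // visible to indirect workers. Returns false if nothing suitable is queued.
      bool getWork(WorkerType workerType, std::string* outWork)
      {
         if(!directList.empty() )
            return popFront(&directList, outWork);

         if( (workerType == WorkerType_INDIRECT) && !indirectList.empty() )
            return popFront(&indirectList, outWork);

         return false;
      }

   private:
      std::deque<std::string> directList;
      std::deque<std::string> indirectList;

      static bool popFront(std::deque<std::string>* list, std::string* outWork)
      {
         *outWork = list->front();
         list->pop_front();
         return true;
      }
};
```

Under this assumed policy a direct worker never blocks on a long backlog of ordinary requests, which would keep a small reserved lane open for work that arrives on direct channels.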

Connection listening

The listener class ConnAcceptor

Definition of the ConnAcceptor class:

// fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.h

class ConnAcceptor : public PThread

{

public:

ConnAcceptor(AbstractApp* app, NicAddressList& localNicList, unsigned short listenPort)

throw(ComponentInitException);

virtual ~ConnAcceptor();

private:

AbstractApp* app;

LogContext log;

StandardSocket* tcpListenSock;

StandardSocket* sdpListenSock;

RDMASocket* rdmaListenSock;

int epollFD;

bool initSocks(unsigned short listenPort, NicListCapabilities* localNicCaps);

virtual void run();

void listenLoop();

void onIncomingStandardConnection(StandardSocket* sock);

void onIncomingRDMAConnection(RDMASocket* sock);

void applySocketOptions(StandardSocket* sock);

public:

// getters & setters

};

Connection listen loop

epoll is used to poll the listening sockets and to accept new connections:

// fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.cpp

void ConnAcceptor::run()

{

try

{

registerSignalHandler();

listenLoop();

log.log(Log_DEBUG, "Component stopped.");

}

catch(std::exception& e)

{

PThread::getCurrentThreadApp()->handleComponentException(e);

}

}

void ConnAcceptor::listenLoop()

{

const int epollTimeoutMS = 3000;

struct epoll_event epollEvents[EPOLL_EVENTS_NUM];

// (just to have these values on the stack...)

const int epollFD = this->epollFD;

RDMASocket* rdmaListenSock = this->rdmaListenSock;

StandardSocket* sdpListenSock = this->sdpListenSock;

StandardSocket* tcpListenSock = this->tcpListenSock;

// wait for incoming events and handle them...

while(!getSelfTerminate() )

{

//log.log(Log_DEBUG, std::string("Before poll(). pollArrayLen: ") +

// StringTk::uintToStr(pollArrayLen) );

int epollRes = epoll_wait(epollFD, epollEvents, EPOLL_EVENTS_NUM, epollTimeoutMS);

if(unlikely(epollRes < 0) )

{ // error occurred

if(errno == EINTR) // ignore interruption, because the debugger causes this

continue;

log.logErr(std::string("Unrecoverable epoll_wait error: ") + System::getErrString() );

break;

}

// handle incoming connection attempts

for(size_t i=0; i < (size_t)epollRes; i++)

{

struct epoll_event* currentEvent = &epollEvents[i];

Pollable* currentPollable = (Pollable*)currentEvent->data.ptr;

//log.log(Log_DEBUG, std::string("Incoming data on FD: ") +

// StringTk::intToStr(pollArray[i].fd) ); // debug in

if(currentPollable == rdmaListenSock)

onIncomingRDMAConnection(rdmaListenSock);

else

if(currentPollable == tcpListenSock)

onIncomingStandardConnection(tcpListenSock);

else

if(currentPollable == sdpListenSock)

onIncomingStandardConnection(sdpListenSock);

else

{ // unknown connection => should never happen

log.log(Log_WARNING, "Should never happen: Ignoring event for unknown connection. "

"FD: " + StringTk::uintToStr(currentPollable->getFD() ) );

}

}

}

}
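
The loop above identifies the ready socket by comparing the pointer stored in epoll_event.data.ptr with the known listen sockets. A minimal sketch of that technique, using the plain Linux epoll API with a hypothetical Source struct in place of Pollable:

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical stand-in for a Pollable: anything that owns a file descriptor.
struct Source
{
   int fd;
   const char* name;
};

// Registers src in the epoll set and stores its address in the event, so that
// epoll_wait() hands the same pointer back when the descriptor becomes readable.
bool watch(int epollFD, Source* src)
{
   struct epoll_event ev = {};
   ev.events = EPOLLIN;
   ev.data.ptr = src;
   return epoll_ctl(epollFD, EPOLL_CTL_ADD, src->fd, &ev) == 0;
}

// Waits up to timeoutMS and returns the first ready Source,
// or nullptr on timeout or error (EINTR is not retried in this sketch).
Source* waitForReady(int epollFD, int timeoutMS)
{
   struct epoll_event ev;
   int res = epoll_wait(epollFD, &ev, 1, timeoutMS);
   return (res == 1) ? static_cast<Source*>(ev.data.ptr) : nullptr;
}
```

Because the kernel returns the stored pointer unchanged, the caller can dispatch with a simple pointer comparison instead of looking the descriptor up in a table.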

Handling accepted sockets (dispatch to a stream listener)

The accepted socket is handed over to the selected StreamListener:

// fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.cpp

/**

* Accept the incoming connection and add new socket to StreamListenerV2 queue.

*

* Note: This is for standard sockets like TCP and SDP.

*/

void ConnAcceptor::onIncomingStandardConnection(StandardSocket* sock)

{

try

{

struct sockaddr_in peerAddr;

socklen_t peerAddrLen = sizeof(peerAddr);

StandardSocket* acceptedSock =

(StandardSocket*)sock->accept( (struct sockaddr*)&peerAddr, &peerAddrLen);

// (note: level Log_DEBUG to avoid spamming the log until we have log topics)

log.log(Log_DEBUG, std::string("Accepted new connection from " +

Socket::endpointAddrToString(&peerAddr.sin_addr, ntohs(peerAddr.sin_port) ) ) +

std::string(" [SockFD: ") + StringTk::intToStr(acceptedSock->getFD() ) +

std::string("]") );

applySocketOptions(acceptedSock);

// hand the socket over to a stream listener

StreamListenerV2* listener = app->getStreamListenerByFD(acceptedSock->getFD() );

StreamListenerV2::SockReturnPipeInfo returnInfo(

StreamListenerV2::SockPipeReturn_NEWCONN, acceptedSock);

listener->getSockReturnFD()->write(&returnInfo, sizeof(returnInfo) );

}

catch(SocketException& se)

{

log.logErr(std::string("Trying to continue after connection accept error: ") +

se.what() );

}

}
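
The handover works because both threads live in the same process: the pipe carries a small struct that contains a raw pointer, and the pointer stays valid on the receiving side. A minimal sketch with hypothetical names (PipeInfo, Conn, sendInfo, recvInfo):

```cpp
#include <cassert>
#include <unistd.h>

enum ReturnType { ReturnType_NEWCONN = 1, ReturnType_MSGDONE = 2 }; // hypothetical values

struct Conn { int fd; }; // hypothetical stand-in for a Socket object

// Mirrors the idea of SockReturnPipeInfo: a small POD that is copied through the pipe.
struct PipeInfo
{
   ReturnType returnType;
   Conn* conn;
};

// Writes one record; a write of at most PIPE_BUF bytes to a pipe is atomic,
// so records from several writers are not interleaved.
bool sendInfo(int writeFD, const PipeInfo& info)
{
   return write(writeFD, &info, sizeof(info) ) == (ssize_t)sizeof(info);
}

// Reads one record; returns false if fewer bytes than a full record arrived.
bool recvInfo(int readFD, PipeInfo* outInfo)
{
   return read(readFD, outInfo, sizeof(*outInfo) ) == (ssize_t)sizeof(*outInfo);
}
```

A side benefit of using a pipe rather than a locked list is that its read end is a file descriptor, so the receiving thread can wait for new sockets and for socket data in the same epoll_wait() call.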

Choosing the stream listener

The StreamListener is chosen by taking the fd number modulo the number of stream listeners:

// fhgfs_meta\source\app\App.h

class App : public AbstractApp

{

public:

/**

* Get one of the available stream listeners based on the socket file descriptor number.

* This is to load-balance the sockets over all available stream listeners and ensure that

* sockets are not bouncing between different stream listeners.

*

* Note that IB connections eat two fd numbers, so 2 and multiples of 2 might not be a good

* value for number of stream listeners.

*/

virtual StreamListenerV2* getStreamListenerByFD(int fd)

{

return streamLisVec[fd % numStreamListeners];

}

}
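
The note in the comment about IB connections can be checked with a few lines of arithmetic. The sketch below assumes, purely for illustration, that every connection consumes two descriptor numbers, so that the socket fds advance in steps of two; listenerIndexForFD and countPerListener are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Same mapping as getStreamListenerByFD(): the fd number picks the listener index.
std::size_t listenerIndexForFD(int fd, std::size_t numListeners)
{
   return static_cast<std::size_t>(fd) % numListeners;
}

// Counts how many of the given fds land on each listener.
std::vector<std::size_t> countPerListener(const std::vector<int>& fds,
   std::size_t numListeners)
{
   std::vector<std::size_t> counts(numListeners, 0);

   for(std::size_t i = 0; i < fds.size(); i++)
      counts[listenerIndexForFD(fds[i], numListeners)]++;

   return counts;
}
```

With the fds 10, 12, 14, 16, 18, 20 and two listeners, every socket lands on listener 0 and listener 1 stays idle; with three listeners the same fds are spread evenly, two per listener.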

Packet stream processing

The stream processing class StreamListenerV2

Definition of StreamListenerV2:

// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.h

class StreamListenerV2 : public PThread

{

public:

/**

* This is what we will send over the socket return pipe

*/

struct SockReturnPipeInfo

{

/**

* Standard constructor for creating/sending a returnInfo.

*/

SockReturnPipeInfo(SockPipeReturnType returnType, Socket* sock) :

returnType(returnType), sock(sock) {}

/**

* For receiving only (no initialization of members).

*/

SockReturnPipeInfo() {}

SockPipeReturnType returnType;

Socket* sock;

};

}

Stream processing loop

StreamListenerV2 uses epoll to handle both new connections and incoming data:

// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp

void StreamListenerV2::run()

{

try

{

registerSignalHandler();

listenLoop();

log.log(Log_DEBUG, "Component stopped.");

}

catch(std::exception& e)

{

PThread::getCurrentThreadApp()->handleComponentException(e);

}

}

void StreamListenerV2::listenLoop()

{

const int epollTimeoutMS = useAggressivePoll ? 0 : 3000;

struct epoll_event epollEvents[EPOLL_EVENTS_NUM];

// (just to have these values on the stack...)

const int epollFD = this->epollFD;

FileDescriptor* sockReturnPipeReadEnd = this->sockReturnPipe->getReadFD();

bool runRDMAConnIdleCheck = false; // true just means we call the method (not enforce the check)

// wait for incoming events and handle them...

while(!getSelfTerminate() )

{

//log.log(Log_DEBUG, std::string("Before poll(). pollArrayLen: ") +

// StringTk::uintToStr(pollArrayLen) );

int epollRes = epoll_wait(epollFD, epollEvents, EPOLL_EVENTS_NUM, epollTimeoutMS);

if(unlikely(epollRes < 0) )

{ // error occurred

if(errno == EINTR) // ignore interruption, because the debugger causes this

continue;

log.logErr(std::string("Unrecoverable epoll_wait error: ") + System::getErrString() );

break;

}

else

if(unlikely(!epollRes || (rdmaCheckForceCounter++ > RDMA_CHECK_FORCE_POLLLOOPS) ) )

{ // epollRes==0 is nothing to worry about, just idle

// note: we can't run idle check here directly because the check might modify the

// poll set, which will be accessed in the loop below

runRDMAConnIdleCheck = true;

}

// handle incoming data & connection attempts

for(size_t i=0; i < (size_t)epollRes; i++)

{

struct epoll_event* currentEvent = &epollEvents[i];

Pollable* currentPollable = (Pollable*)currentEvent->data.ptr;

if(currentPollable == sockReturnPipeReadEnd)

onSockReturn();

else

onIncomingData( (Socket*)currentPollable);

}

if(unlikely(runRDMAConnIdleCheck) )

{ // note: whether check actually happens depends on elapsed time since last check

runRDMAConnIdleCheck = false;

rdmaConnIdleCheck();

}

}

}

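Handling new connections

If the socket received through the pipe is a new connection, its fd is added to the epoll set; a socket returned by a worker is re-armed in the set instead: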

// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp

/**

* Receive pointer to returned socket through the sockReturnPipe and re-add it to the pollList.

*/

void StreamListenerV2::onSockReturn()

{

SockReturnPipeInfo returnInfos[SOCKRETURN_SOCKS_NUM];

// try to get multiple returnInfos at once (if available)

ssize_t readRes = sockReturnPipe->getReadFD()->read(&returnInfos, sizeof(SockReturnPipeInfo) );

// loop: walk over each info and handle the contained socket

for(size_t i=0; ; i++)

{

SockReturnPipeInfo& currentReturnInfo = returnInfos[i];

// make sure we have a complete SockReturnPipeInfo

if(unlikely(readRes < (ssize_t)sizeof(SockReturnPipeInfo) ) )

{ // only got a partial SockReturnPipeInfo => recv the rest

char* currentReturnInfoChar = (char*)&currentReturnInfo;

sockReturnPipe->getReadFD()->readExact(

&currentReturnInfoChar[readRes], sizeof(SockReturnPipeInfo)-readRes);

readRes = sizeof(SockReturnPipeInfo);

}

// handle returnInfo depending on contained returnType...

Socket* currentSock = currentReturnInfo.sock;

SockPipeReturnType returnType = currentReturnInfo.returnType;

switch(returnType)

{

case SockPipeReturn_MSGDONE_NOIMMEDIATE:

{ // most likely case: worker is done with a msg and now returns the sock to the epoll set

struct epoll_event epollEvent;

epollEvent.events = EPOLLIN | EPOLLONESHOT | EPOLLET;

epollEvent.data.ptr = currentSock;

int epollRes = epoll_ctl(epollFD, EPOLL_CTL_MOD, currentSock->getFD(), &epollEvent);

if(likely(!epollRes) )

{ // sock was successfully re-armed in epoll set

pollList.add(currentSock);

break; // break out of switch

}

else

if(errno != ENOENT)

{ // error

log.logErr("Unable to re-arm sock in epoll set. "

"FD: " + StringTk::uintToStr(currentSock->getFD() ) + "; "

"SockTypeNum: " + StringTk::uintToStr(currentSock->getSockType() ) + "; "

"SysErr: " + System::getErrString() );

log.log(Log_NOTICE, "Disconnecting: " + currentSock->getPeername() );

delete(currentSock);

break; // break out of switch

}

/* for ENOENT, we fall through to NEWCONN, because this socket apparently wasn't

used with this stream listener yet, so we need to add it (instead of modify it) */

} // might fall through here on ENOENT

case SockPipeReturn_NEWCONN:

{ // new conn from ConnAcceptor (or wasn't used with this stream listener yet)

// add new socket file descriptor to epoll set

struct epoll_event epollEvent;

epollEvent.events = EPOLLIN | EPOLLONESHOT | EPOLLET;

epollEvent.data.ptr = currentSock;

int epollRes = epoll_ctl(epollFD, EPOLL_CTL_ADD, currentSock->getFD(), &epollEvent);

if(likely(!epollRes) )

{ // socket was successfully added to epoll set

pollList.add(currentSock);

}

else

{ // adding to epoll set failed => unrecoverable error

log.logErr("Unable to add sock to epoll set. "

"FD: " + StringTk::uintToStr(currentSock->getFD() ) + " "

"SockTypeNum: " + StringTk::uintToStr(currentSock->getSockType() ) + " "

"SysErr: " + System::getErrString() );

log.log(Log_NOTICE, "Disconnecting: " + currentSock->getPeername() );

delete(currentSock);

}

} break;

case SockPipeReturn_MSGDONE_WITHIMMEDIATE:

{ // special case: worker detected that immediate data is available after msg processing

// data immediately available => recv header and so on

onIncomingData(currentSock);

} break;

default:

{ // should never happen: unknown/unhandled returnType

log.logErr("Should never happen: "

"Unknown socket return type: " + StringTk::uintToStr(returnType) );

} break;

} // end of switch(returnType)

readRes -= sizeof(SockReturnPipeInfo);

if(!readRes)

break; // all received returnInfos have been processed

} // end of "for each received SockReturnPipeInfo" loop

}
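
The core of the function above is the re-arm logic: try EPOLL_CTL_MOD first, and fall back to EPOLL_CTL_ADD when the kernel reports ENOENT. A minimal sketch of just that part, where armOrAdd and ArmResult are hypothetical names:

```cpp
#include <cassert>
#include <cerrno>
#include <sys/epoll.h>
#include <unistd.h>

enum ArmResult { Arm_MODIFIED, Arm_ADDED, Arm_FAILED };

// Re-arms a one-shot descriptor; if it was never registered in this epoll set
// (errno == ENOENT), registers it instead.
ArmResult armOrAdd(int epollFD, int fd, void* userPtr)
{
   struct epoll_event ev = {};
   ev.events = EPOLLIN | EPOLLONESHOT | EPOLLET;
   ev.data.ptr = userPtr;

   if(epoll_ctl(epollFD, EPOLL_CTL_MOD, fd, &ev) == 0)
      return Arm_MODIFIED;

   if(errno != ENOENT)
      return Arm_FAILED; // a real error, e.g. EBADF for a closed descriptor

   return (epoll_ctl(epollFD, EPOLL_CTL_ADD, fd, &ev) == 0) ? Arm_ADDED : Arm_FAILED;
}
```

With EPOLLONESHOT the kernel disables the entry after reporting one event, so no other thread is notified about the socket while a worker is still reading from it. The entry only becomes active again after the explicit re-arm.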

Packet handling (creating work)

A Work item (IncomingPreprocessedMsgWork) is created and put into the queue (MultiWorkQueue):

// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp

/**

* Receive msg header and add the socket to the work queue.

*/

void StreamListenerV2::onIncomingData(Socket* sock)

{

// check whether this is just a false alarm from a RDMASocket

if( (sock->getSockType() == NICADDRTYPE_RDMA) &&

isFalseAlarm( (RDMASocket*)sock) )

{

return;

}

try

{

const int recvTimeoutMS = 5000;

char msgHeaderBuf[NETMSG_HEADER_LENGTH];

NetMessageHeader msgHeader;

// receive & deserialize message header

sock->recvExactT(msgHeaderBuf, NETMSG_HEADER_LENGTH, 0, recvTimeoutMS);

NetMessage::deserializeHeader(msgHeaderBuf, NETMSG_HEADER_LENGTH, &msgHeader);

/* (note on header verification: we leave header verification work to the worker threads to

save CPU cycles in the stream listener and instead just take what we need to know here, no

matter whether the header is valid or not.) */

// create work and add it to queue

//log.log(Log_DEBUG, "Creating new work for to the queue");

IncomingPreprocessedMsgWork* work = new IncomingPreprocessedMsgWork(app, sock, &msgHeader);

int sockFD = sock->getFD(); /* note: we store this here for delayed pollList removal, because

worker thread might disconnect, so the sock gets deleted by the worker and thus "sock->"

pointer becomes invalid */

sock->setHasActivity(); // mark sock as active (for idle disconnect check)

// (note: userID intToStr (not uint) because default userID (~0) looks better this way)

LOG_DEBUG("StreamListenerV2::onIncomingData", Log_DEBUG,

"Incoming message: " + NetMsgStrMapping().defineToStr(msgHeader.msgType) + "; "

"from: " + sock->getPeername() + "; "

"userID: " + StringTk::intToStr(msgHeader.msgUserID) +

(msgHeader.msgTargetID

? "; targetID: " + StringTk::uintToStr(msgHeader.msgTargetID)

: "") );

if (sock->getIsDirect())

getWorkQueue(msgHeader.msgTargetID)->addDirectWork(work, msgHeader.msgUserID);

else

getWorkQueue(msgHeader.msgTargetID)->addIndirectWork(work, msgHeader.msgUserID);

/* notes on sock handling:

*) no need to remove sock from epoll set, because we use edge-triggered mode with

oneshot flag (which disables further events after first one has been reported).

*) a sock that is closed by a worker is not a problem, because it will automatically be

removed from the epoll set by the kernel.

*) we just need to re-arm the epoll entry upon sock return. */

pollList.removeByFD(sockFD);

return;

}

catch(SocketTimeoutException& e)

{

log.log(Log_NOTICE, "Connection timed out: " + sock->getPeername() );

}

catch(SocketDisconnectException& e)

{

// (note: level Log_DEBUG here to avoid spamming the log until we have log topics)

log.log(Log_DEBUG, std::string(e.what() ) );

}

catch(SocketException& e)

{

log.log(Log_NOTICE,

"Connection error: " + sock->getPeername() + ": " + std::string(e.what() ) );

}

// socket exception occurred => cleanup

pollList.removeByFD(sock->getFD() );

IncomingPreprocessedMsgWork::invalidateConnection(sock); // also includes delete(sock)

}
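
The listener reads exactly NETMSG_HEADER_LENGTH bytes before it creates the work item. Reading an exact number of bytes from a stream needs a loop, because a single read may return fewer bytes than requested. The sketch below shows such a loop on a plain file descriptor; readExact is a hypothetical helper and, unlike recvExactT in the code above, has no timeout.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <unistd.h>

// Reads exactly len bytes from fd into buf, looping over short reads.
// Returns false on EOF or on a read error other than EINTR.
bool readExact(int fd, char* buf, std::size_t len)
{
   std::size_t numRead = 0;

   while(numRead < len)
   {
      ssize_t res = read(fd, buf + numRead, len - numRead);

      if(res > 0)
         numRead += static_cast<std::size_t>(res);
      else
      if(res == 0)
         return false; // peer closed before all bytes arrived
      else
      if(errno != EINTR)
         return false; // real error; EINTR just retries the read
   }

   return true;
}
```

Only the fixed-size header is read this way in the listener thread; the payload, whose length comes from the header, is left to the worker.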

Work processing

The worker class (Worker)

// fhgfs_common\source\common\components\worker\Worker.h

#define WORKER_BUFIN_SIZE (1024*1024*4)

#define WORKER_BUFOUT_SIZE WORKER_BUFIN_SIZE

class Worker : public PThread

{

public:

Worker(std::string workerID, MultiWorkQueue* workQueue, QueueWorkType workType)

throw(ComponentInitException);

virtual ~Worker()

{

SAFE_FREE(bufIn);

SAFE_FREE(bufOut);

}

private:

LogContext log;

bool terminateWithFullQueue; // allow self-termination when queue not empty (see setter notes)

size_t bufInLen;

char* bufIn;

size_t bufOutLen;

char* bufOut;

MultiWorkQueue* workQueue;

QueueWorkType workType;

HighResolutionStats stats;

virtual void run();

void workLoopAnyWork();

void workLoopDirectWork();

void initBuffers();

// inliners

bool maySelfTerminateNow()

{

if(terminateWithFullQueue ||

(!workQueue->getDirectWorkListSize() && !workQueue->getIndirectWorkListSize() ) )

return true;

return false;

}

public:

// setters & getters

/**

* Note: Do not use this after the run method of this component has been called!

*/

void setBufLens(size_t bufInLen, size_t bufOutLen)

{

this->bufInLen = bufInLen;

this->bufOutLen = bufOutLen;

}

/**

* WARNING: This will only work if there is only a single worker attached to a queue.

* Otherwise the queue would need a getWorkAndDontWait() method that is used during the

* termination phase of the worker, because the queue might become empty before the worker

* calls waitForWork() after the maySelfTerminateNow check.

*/

void disableTerminationWithFullQueue()

{

this->terminateWithFullQueue = false;

}

};

The work class (Work)

// fhgfs_common\source\common\components\worker\Work.h

class Work

{

public:

Work()

{

HighResolutionStatsTk::resetStats(&stats);

}

virtual ~Work() {}

Work(const Work&) = delete;

Work(Work&&) = delete;

Work& operator=(const Work&) = delete;

Work& operator=(Work&&) = delete;

virtual void process(char* bufIn, unsigned bufInLen, char* bufOut, unsigned bufOutLen) = 0;

protected:

HighResolutionStats stats;

public:

HighResolutionStats* getHighResolutionStats()

{

return &stats;

}

#ifdef BEEGFS_DEBUG_PROFILING

TimeFine* getAgeTime()

{

return &age;

}

private:

TimeFine age;

#endif

};

// fhgfs_common\source\common\components\streamlistenerv2\IncomingPreprocessedMsgWork.h

class IncomingPreprocessedMsgWork : public Work

{

public:

/**

* Note: Be aware that this class is only for stream connections that need to be returned

* to a StreamListenerV2 after processing.

*

* @param msgHeader contents will be copied

*/

IncomingPreprocessedMsgWork(AbstractApp* app, Socket* sock, NetMessageHeader* msgHeader)

{

this->app = app;

this->sock = sock;

this->msgHeader = *msgHeader;

}

virtual void process(char* bufIn, unsigned bufInLen, char* bufOut, unsigned bufOutLen);

static void releaseSocket(AbstractApp* app, Socket** sock, NetMessage* msg);

static void invalidateConnection(Socket* sock);

static bool checkRDMASocketImmediateData(AbstractApp* app, Socket* sock);

private:

AbstractApp* app;

Socket* sock;

NetMessageHeader msgHeader;

};

Work loop

Work is fetched from the WorkQueue and processed:

// fhgfs_common\source\common\components\worker\Worker.cpp

void Worker::workLoop(QueueWorkType workType)

{

LOG(DEBUG, "Ready", as("TID", System::getTID()), workType);

workQueue->incNumWorkers(); // add this worker to queue stats

while(!getSelfTerminate() || !maySelfTerminateNow() )

{

Work* work = waitForWorkByType(stats, personalWorkQueue, workType);

#ifdef BEEGFS_DEBUG_PROFILING

TimeFine workStartTime;

#endif

HighResolutionStatsTk::resetStats(&stats); // prepare stats

// process the work packet

work->process(bufIn, bufInLen, bufOut, bufOutLen);

// update stats

stats.incVals.workRequests = 1;

HighResolutionStatsTk::addHighResIncStats(*work->getHighResolutionStats(), stats);

#ifdef BEEGFS_DEBUG_PROFILING

TimeFine workEndTime;

const auto workElapsedMS = workEndTime.elapsedSinceMS(&workStartTime);

const auto workLatencyMS = workEndTime.elapsedSinceMS(work->getAgeTime());

if (workElapsedMS >= 10)

{

if (workLatencyMS >= 10)

LOG_TOP(WORKQUEUES, DEBUG, "Work processed.",

as("Elapsed ms", workElapsedMS), as("Total latency (ms)", workLatencyMS));

else

LOG_TOP(WORKQUEUES, DEBUG, "Work processed.", as("Elapsed ms", workElapsedMS),

as("Total latency (us)", workEndTime.elapsedSinceMicro(work->getAgeTime())));

}

else

{

if (workLatencyMS >= 10)

{

LOG_TOP(WORKQUEUES, DEBUG, "Work processed.",

as("Elapsed us", workEndTime.elapsedSinceMicro(&workStartTime)),

as("Total latency (ms)", workEndTime.elapsedSinceMS(work->getAgeTime())));

}

else

{

LOG_TOP(WORKQUEUES, DEBUG, "Work processed.",

as("Elapsed us", workEndTime.elapsedSinceMicro(&workStartTime)),

as("Total latency (us)", workEndTime.elapsedSinceMicro(work->getAgeTime())));

}

}

#endif

// cleanup

delete(work);

}

}
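
Stripped of statistics and profiling, the loop above takes a work item, lends it the worker's reusable buffers through the virtual process() call, and then deletes it. A single-threaded sketch of that pattern follows; FillWork and drainQueue are hypothetical, and a plain std::deque replaces the blocking queue.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

// Minimal Work interface: the worker lends its reusable buffers to each work item.
class Work
{
   public:
      virtual ~Work() {}
      virtual void process(char* bufIn, std::size_t bufInLen,
         char* bufOut, std::size_t bufOutLen) = 0;
};

// Hypothetical work item that writes one byte into the worker's output buffer.
class FillWork : public Work
{
   public:
      explicit FillWork(char value) : value(value) {}

      virtual void process(char*, std::size_t, char* bufOut, std::size_t bufOutLen)
      {
         if(bufOutLen > 0)
            bufOut[0] = value;
      }

   private:
      char value;
};

// Drains the queue in FIFO order and returns the number of processed work items.
// The buffers are allocated once by the caller and reused for every item.
unsigned drainQueue(std::deque<std::unique_ptr<Work> >& queue,
   std::vector<char>& bufIn, std::vector<char>& bufOut)
{
   unsigned numProcessed = 0;

   while(!queue.empty() )
   {
      std::unique_ptr<Work> work = std::move(queue.front() );
      queue.pop_front();

      work->process(bufIn.data(), bufIn.size(), bufOut.data(), bufOut.size() );
      numProcessed++;
      // work goes out of scope here and is deleted, like delete(work) in the loop above
   }

   return numProcessed;
}
```

Allocating the buffers once per worker rather than once per message keeps allocation out of the per-request path, which matches the bufIn and bufOut members of the Worker class.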

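Work processing (message creation and handling)

A Work item is processed through the virtual process() method of the Work base class. IncomingPreprocessedMsgWork::process() receives the message payload, builds the NetMessage through the message factory, and then calls the message's processIncoming() method: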

// fhgfs_common\source\common\components\streamlistenerv2\IncomingPreprocessedMsgWork.cpp

void IncomingPreprocessedMsgWork::process(char* bufIn, unsigned bufInLen,

char* bufOut, unsigned bufOutLen)

{

const char* logContextStr = "Work (process incoming msg)";

const int recvTimeoutMS = 5000;

unsigned numReceived = NETMSG_HEADER_LENGTH; // (header actually received by stream listener)

NetMessage* msg = NULL;

try

{

// attach stats to sock (stream listener already received the msg header)

stats.incVals.netRecvBytes += NETMSG_HEADER_LENGTH;

sock->setStats(&stats);

// make sure msg length fits into our receive buffer

unsigned msgLength = msgHeader.msgLength;

unsigned msgPayloadLength = msgLength - numReceived;

if(unlikely(msgPayloadLength > bufInLen) )

{ // message too big

LogContext(logContextStr).log(Log_NOTICE,

std::string("Received a message that is too large. Disconnecting: ") +

sock->getPeername() );

sock->unsetStats();

invalidateConnection(sock);

return;

}

// receive the message payload

if(msgPayloadLength)

sock->recvExactT(bufIn, msgPayloadLength, 0, recvTimeoutMS);

// we got the complete message buffer => create msg object

AbstractApp* app = PThread::getCurrentThreadApp();

ICommonConfig* cfg = app->getCommonConfig();

AbstractNetMessageFactory* netMessageFactory = app->getNetMessageFactory();

msg = netMessageFactory->createFromPreprocessedBuf(&msgHeader, bufIn, msgPayloadLength);

if(unlikely(msg->getMsgType() == NETMSGTYPE_Invalid) )

{ // message invalid

LogContext(logContextStr).log(Log_NOTICE,

std::string("Received an invalid message. Disconnecting: ") + sock->getPeername() );

sock->unsetStats();

invalidateConnection(sock);

delete(msg);

return;

}

// process the received msg

bool processRes = false;

if(likely(!cfg->getConnAuthHash() ||

sock->getIsAuthenticated() ||

(msg->getMsgType() == NETMSGTYPE_AuthenticateChannel) ) )

{ // auth disabled or channel is auth'ed or this is an auth msg => process

NetMessage::ResponseContext rctx(NULL, sock, bufOut, bufOutLen, &stats);

processRes = msg->processIncoming(rctx);

}

else

LogContext(logContextStr).log(Log_NOTICE,

std::string("Rejecting message from unauthenticated peer: ") + sock->getPeername() );

// processing completed => cleanup

bool needSockRelease = msg->getReleaseSockAfterProcessing();

delete(msg);

msg = NULL;

if(!needSockRelease)

return; // sock release was already done within msg->processIncoming() method

if(unlikely(!processRes) )

{ // processIncoming encountered messaging error => invalidate connection

LogContext(logContextStr).log(Log_NOTICE,

std::string("Problem encountered during processing of a message. Disconnecting: ") +

sock->getPeername() );

invalidateConnection(sock);

return;

}

releaseSocket(app, &sock, NULL);

return;

}

catch(SocketTimeoutException& e)

{

LogContext(logContextStr).log(Log_NOTICE,

std::string("Connection timed out: ") + sock->getPeername() );

}

catch(SocketDisconnectException& e)

{

// (note: level Log_DEBUG here to avoid spamming the log until we have log topics)

LogContext(logContextStr).log(Log_DEBUG, std::string(e.what() ) );

}

catch(SocketException& e)

{

LogContext(logContextStr).log(Log_NOTICE,

std::string("Connection error: ") + sock->getPeername() + std::string(": ") +

std::string(e.what() ) );

}

// socket exception occurred => cleanup

if(msg && msg->getReleaseSockAfterProcessing() )

{

sock->unsetStats();

invalidateConnection(sock);

}

SAFE_DELETE(msg);

}
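
One detail of the length check above is worth spelling out. The payload length is computed with unsigned arithmetic from a header that has not been verified yet, so a total length smaller than the header length wraps around to a very large number, which then fails the buffer size check. The sketch below reproduces only this arithmetic; HEADER_LENGTH is a hypothetical value and payloadFits is a hypothetical helper.

```cpp
#include <cassert>

const unsigned HEADER_LENGTH = 40; // hypothetical value; the real one is NETMSG_HEADER_LENGTH

// Returns true if a message with the given total length can be received into a
// buffer of bufInLen bytes. totalLen comes from the (not yet verified) header.
bool payloadFits(unsigned totalLen, unsigned bufInLen, unsigned* outPayloadLen)
{
   // unsigned subtraction wraps around if totalLen < HEADER_LENGTH; the huge
   // result then fails the size check for any realistically sized buffer
   unsigned payloadLen = totalLen - HEADER_LENGTH;

   if(payloadLen > bufInLen)
      return false;

   *outPayloadLen = payloadLen;
   return true;
}
```

A header-only message yields a payload length of zero, which is why the code above guards the recvExactT call with if(msgPayloadLength).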

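Message factory

The message factory class (NetMessageFactory)

When a Worker processes incoming data (in IncomingPreprocessedMsgWork::process()), the message factory is used to create the various message types: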

// fhgfs_meta\source\net\message\NetMessageFactory.h

class NetMessageFactory : public AbstractNetMessageFactory

{

public:

NetMessageFactory() {}

protected:

virtual NetMessage* createFromMsgType(unsigned short msgType);

} ;
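
A factory of this kind maps a numeric type code to a concrete message class and returns a message marked as invalid for unknown codes, so that the caller can check the type instead of handling a null pointer. The sketch below illustrates the pattern with hypothetical type codes and message classes; it returns std::unique_ptr, whereas the BeeGFS factory returns a raw pointer that the caller must delete.

```cpp
#include <cassert>
#include <memory>

// Hypothetical message type codes; the real NETMSGTYPE_* values are defined by BeeGFS.
const unsigned short MSGTYPE_Invalid = 0;
const unsigned short MSGTYPE_Ack = 1;
const unsigned short MSGTYPE_Heartbeat = 2;

struct Msg
{
   explicit Msg(unsigned short msgType) : msgType(msgType) {}
   virtual ~Msg() {}

   unsigned short msgType;
};

struct AckMsg : Msg { AckMsg() : Msg(MSGTYPE_Ack) {} };
struct HeartbeatMsg : Msg { HeartbeatMsg() : Msg(MSGTYPE_Heartbeat) {} };
struct InvalidMsg : Msg { InvalidMsg() : Msg(MSGTYPE_Invalid) {} };

// Creates the message object that belongs to msgType.
std::unique_ptr<Msg> createFromMsgType(unsigned short msgType)
{
   switch(msgType)
   {
      case MSGTYPE_Ack: return std::unique_ptr<Msg>(new AckMsg() );
      case MSGTYPE_Heartbeat: return std::unique_ptr<Msg>(new HeartbeatMsg() );
      default: return std::unique_ptr<Msg>(new InvalidMsg() ); // unknown type => invalid msg
   }
}
```

Each service can then ship its own factory that lists only the message types it is prepared to handle, which is what the per-service NetMessageFactory subclass suggests.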

Message factory initialization

// fhgfs_meta\source\app\App.cpp

/**

* Init basic networking data structures.

*

* Note: no RDMA is detected here, because this needs to be done later

*/

void App::initBasicNetwork()

{

// check if management host is defined

if(!cfg->getSysMgmtdHost().length() )

throw InvalidConfigException("Management host undefined");

// prepare filter for outgoing packets/connections

this->netFilter = new NetFilter(cfg->getConnNetFilterFile() );

this->tcpOnlyFilter = new NetFilter(cfg->getConnTcpOnlyFilterFile() );

// prepare filter for interfaces

StringList allowedInterfaces;

std::string interfacesList = cfg->getConnInterfacesList();

if(!interfacesList.empty() )

{

log->log(Log_DEBUG, "Allowed interfaces: " + interfacesList);

StringTk::explodeEx(interfacesList, ',', true, &allowedInterfaces);

}

// discover local NICs and filter them

NetworkInterfaceCard::findAllInterfaces(allowedInterfaces, cfg->getConnUseSDP(), localNicList);

if(localNicList.empty() )

throw InvalidConfigException("Couldn't find any usable NIC");

localNicList.sort(&NetworkInterfaceCard::nicAddrPreferenceComp);

// prepare factory for incoming messages

this->netMessageFactory = new NetMessageFactory();

}

Creating messages

Which message instance is created is determined by msgType:

// fhgfs_meta\source\net\message\NetMessageFactory.cpp

/**

* @return NetMessage that must be deleted by the caller

* (msg->msgType is NETMSGTYPE_Invalid on error)

*/

NetMessage* NetMessageFactory::createFromMsgType(unsigned short msgType)

{

NetMessage* msg;

switch(msgType)

{

// The following lines are grouped by "type of the message" and ordered alphabetically inside

// the groups. There should always be one message per line to keep a clear layout (although

// this might lead to lines that are longer than usual)

// control messages

case NETMSGTYPE_Ack: { msg = new AckMsgEx(); } break;

case NETMSGTYPE_AuthenticateChannel: { msg = new AuthenticateChannelMsgEx(); } break;

case NETMSGTYPE_GenericResponse: { msg = new GenericResponseMsg(); } break;

case NETMSGTYPE_SetChannelDirect: { msg = new SetChannelDirectMsgEx(); } break;

case NETMSGTYPE_PeerInfo: { msg = new PeerInfoMsgEx(); } break;

// nodes messages

case NETMSGTYPE_ChangeTargetConsistencyStatesResp: { msg = new ChangeTargetConsistencyStatesRespMsg(); } break;

case NETMSGTYPE_GenericDebug: { msg = new GenericDebugMsgEx(); } break;

case NETMSGTYPE_GetClientStats: { msg = new GetClientStatsMsgEx(); } break;

case NETMSGTYPE_GetMirrorBuddyGroupsResp: { msg = new GetMirrorBuddyGroupsRespMsg(); } break;

case NETMSGTYPE_GetNodeCapacityPools: { msg = new GetNodeCapacityPoolsMsgEx(); } break;

case NETMSGTYPE_GetNodeCapacityPoolsResp: { msg = new GetNodeCapacityPoolsRespMsg(); } break;

case NETMSGTYPE_GetNodes: { msg = new GetNodesMsgEx(); } break;

case NETMSGTYPE_GetNodesResp: { msg = new GetNodesRespMsg(); } break;

case NETMSGTYPE_GetStatesAndBuddyGroupsResp: { msg = new GetStatesAndBuddyGroupsRespMsg(); } break;

case NETMSGTYPE_GetTargetMappings: { msg = new GetTargetMappingsMsgEx(); } break;

case NETMSGTYPE_GetTargetMappingsResp: { msg = new GetTargetMappingsRespMsg(); } break;

case NETMSGTYPE_GetTargetStatesResp: { msg = new GetTargetStatesRespMsg(); } break;

case NETMSGTYPE_HeartbeatRequest: { msg = new HeartbeatRequestMsgEx(); } break;

case NETMSGTYPE_Heartbeat: { msg = new HeartbeatMsgEx(); } break;

case NETMSGTYPE_MapTargets: { msg = new MapTargetsMsgEx(); } break;

case NETMSGTYPE_PublishCapacities: { msg = new PublishCapacitiesMsgEx(); } break;

case NETMSGTYPE_RegisterNodeResp: { msg = new RegisterNodeRespMsg(); } break;

case NETMSGTYPE_RemoveNode: { msg = new RemoveNodeMsgEx(); } break;

case NETMSGTYPE_RemoveNodeResp: { msg = new RemoveNodeRespMsg(); } break;

case NETMSGTYPE_RefreshCapacityPools: { msg = new RefreshCapacityPoolsMsgEx(); } break;

case NETMSGTYPE_RefreshTargetStates: { msg = new RefreshTargetStatesMsgEx(); } break;

case NETMSGTYPE_SetMirrorBuddyGroup: { msg = new SetMirrorBuddyGroupMsgEx(); } break;

case NETMSGTYPE_SetRootNodeIDResp: { msg = new SetRootNodeIDRespMsg(); } break;

case NETMSGTYPE_SetTargetConsistencyStates: { msg = new SetTargetConsistencyStatesMsgEx(); } break;

case NETMSGTYPE_SetTargetConsistencyStatesResp: { msg = new SetTargetConsistencyStatesRespMsg(); } break;

// storage messages

case NETMSGTYPE_FindEntryname: { msg = new FindEntrynameMsgEx(); } break;

case NETMSGTYPE_FindLinkOwner: { msg = new FindLinkOwnerMsgEx(); } break;

case NETMSGTYPE_FindOwner: { msg = new FindOwnerMsgEx(); } break;

case NETMSGTYPE_FindOwnerResp: { msg = new FindOwnerRespMsg(); } break;

case NETMSGTYPE_GetChunkFileAttribsResp: { msg = new GetChunkFileAttribsRespMsg(); } break;

case NETMSGTYPE_GetStorageTargetInfo: { msg = new GetStorageTargetInfoMsgEx(); } break;

case NETMSGTYPE_GetEntryInfo: { msg = new GetEntryInfoMsgEx(); } break;

case NETMSGTYPE_GetEntryInfoResp: { msg = new GetEntryInfoRespMsg(); } break;

case NETMSGTYPE_GetHighResStats: { msg = new GetHighResStatsMsgEx(); } break;

case NETMSGTYPE_GetMetaResyncStats: { msg = new GetMetaResyncStatsMsgEx(); } break;

case NETMSGTYPE_RequestExceededQuotaResp: {msg = new RequestExceededQuotaRespMsg(); } break;

case NETMSGTYPE_SetExceededQuota: {msg = new SetExceededQuotaMsgEx(); } break;

case NETMSGTYPE_StorageResyncStarted: { msg = new StorageResyncStartedMsgEx(); } break;

case NETMSGTYPE_StorageResyncStartedResp: { msg = new StorageResyncStartedRespMsg(); } break;

case NETMSGTYPE_GetXAttr: { msg = new GetXAttrMsgEx(); } break;

case NETMSGTYPE_Hardlink: { msg = new HardlinkMsgEx(); } break;

case NETMSGTYPE_HardlinkResp: { msg = new HardlinkRespMsg(); } break;

case NETMSGTYPE_ListDirFromOffset: { msg = new ListDirFromOffsetMsgEx(); } break;

case NETMSGTYPE_ListDirFromOffsetResp: { msg = new ListDirFromOffsetRespMsg(); } break;

case NETMSGTYPE_ListXAttr: { msg = new ListXAttrMsgEx(); } break;

case NETMSGTYPE_LookupIntent: { msg = new LookupIntentMsgEx(); } break;

case NETMSGTYPE_LookupIntentResp: { msg = new LookupIntentRespMsg(); } break;

case NETMSGTYPE_MkDir: { msg = new MkDirMsgEx(); } break;

case NETMSGTYPE_MkDirResp: { msg = new MkDirRespMsg(); } break;

case NETMSGTYPE_MkFile: { msg = new MkFileMsgEx(); } break;

case NETMSGTYPE_MkFileResp: { msg = new MkFileRespMsg(); } break;

case NETMSGTYPE_MkFileWithPattern: { msg = new MkFileWithPatternMsgEx(); } break;

case NETMSGTYPE_MkFileWithPatternResp: { msg = new MkFileWithPatternRespMsg(); } break;

case NETMSGTYPE_MkLocalDir: { msg = new MkLocalDirMsgEx(); } break;

case NETMSGTYPE_MkLocalDirResp: { msg = new MkLocalDirRespMsg(); } break;

case NETMSGTYPE_MkLocalFileResp: { msg = new MkLocalFileRespMsg(); } break;

case NETMSGTYPE_MovingDirInsert: { msg = new MovingDirInsertMsgEx(); } break;

case NETMSGTYPE_MovingDirInsertResp: { msg = new MovingDirInsertRespMsg(); } break;

case NETMSGTYPE_MovingFileInsert: { msg = new MovingFileInsertMsgEx(); } break;

case NETMSGTYPE_MovingFileInsertResp: { msg = new MovingFileInsertRespMsg(); } break;

case NETMSGTYPE_RefreshEntryInfo: { msg = new RefreshEntryInfoMsgEx(); } break;

case NETMSGTYPE_RefreshEntryInfoResp: { msg = new RefreshEntryInfoRespMsg(); } break;

case NETMSGTYPE_ResyncRawInodes: { msg = new ResyncRawInodesMsgEx(); } break;

case NETMSGTYPE_ResyncRawInodesResp: { msg = new ResyncRawInodesRespMsg(); } break;

case NETMSGTYPE_ResyncSessionStore: { msg = new ResyncSessionStoreMsgEx(); } break;

case NETMSGTYPE_ResyncSessionStoreResp: { msg = new ResyncSessionStoreRespMsg(); } break;

case NETMSGTYPE_RemoveXAttr: { msg = new RemoveXAttrMsgEx(); } break;

case NETMSGTYPE_RemoveXAttrResp: { msg = new RemoveXAttrRespMsg(); } break;

case NETMSGTYPE_Rename: { msg = new RenameV2MsgEx(); } break;

case NETMSGTYPE_RenameResp: { msg = new RenameRespMsg(); } break;

case NETMSGTYPE_RmChunkPathsResp: { msg = new RmChunkPathsRespMsg(); } break;

case NETMSGTYPE_RmDirEntry: { msg = new RmDirEntryMsgEx(); } break;

case NETMSGTYPE_RmDir: { msg = new RmDirMsgEx(); } break;

case NETMSGTYPE_RmDirResp: { msg = new RmDirRespMsg(); } break;

case NETMSGTYPE_RmLocalDir: { msg = new RmLocalDirMsgEx(); } break;

case NETMSGTYPE_RmLocalDirResp: { msg = new RmLocalDirRespMsg(); } break;

case NETMSGTYPE_SetAttr: { msg = new SetAttrMsgEx(); } break;

case NETMSGTYPE_SetAttrResp: { msg = new SetAttrRespMsg(); } break;

case NETMSGTYPE_SetDirPattern: { msg = new SetDirPatternMsgEx(); } break;

case NETMSGTYPE_SetDirPatternResp: { msg = new SetDirPatternRespMsg(); } break;

case NETMSGTYPE_SetLocalAttrResp: { msg = new SetLocalAttrRespMsg(); } break;

case NETMSGTYPE_SetMetadataMirroring: { msg = new SetMetadataMirroringMsgEx(); } break;

case NETMSGTYPE_SetStorageTargetInfoResp: { msg = new SetStorageTargetInfoRespMsg(); } break;

case NETMSGTYPE_SetXAttr: { msg = new SetXAttrMsgEx(); } break;

case NETMSGTYPE_SetXAttrResp: { msg = new SetXAttrRespMsg(); } break;

case NETMSGTYPE_Stat: { msg = new StatMsgEx(); } break;

case NETMSGTYPE_StatResp: { msg = new StatRespMsg(); } break;

case NETMSGTYPE_StatStoragePath: { msg = new StatStoragePathMsgEx(); } break;

case NETMSGTYPE_StatStoragePathResp: { msg = new StatStoragePathRespMsg(); } break;

case NETMSGTYPE_TruncFile: { msg = new TruncFileMsgEx(); } break;

case NETMSGTYPE_TruncFileResp: { msg = new TruncFileRespMsg(); } break;

case NETMSGTYPE_TruncLocalFileResp: { msg = new TruncLocalFileRespMsg(); } break;

case NETMSGTYPE_UnlinkFile: { msg = new UnlinkFileMsgEx(); } break;

case NETMSGTYPE_UnlinkFileResp: { msg = new UnlinkFileRespMsg(); } break;

case NETMSGTYPE_UnlinkLocalFileResp: { msg = new UnlinkLocalFileRespMsg(); } break;

case NETMSGTYPE_UpdateBacklinkResp: { msg = new UpdateBacklinkRespMsg(); } break;

case NETMSGTYPE_UpdateDirParent: { msg = new UpdateDirParentMsgEx(); } break;

case NETMSGTYPE_UpdateDirParentResp: { msg = new UpdateDirParentRespMsg(); } break;

// session messages

case NETMSGTYPE_BumpFileVersion: { msg = new BumpFileVersionMsgEx(); } break;

case NETMSGTYPE_BumpFileVersionResp: { msg = new BumpFileVersionRespMsg(); } break;

case NETMSGTYPE_OpenFile: { msg = new OpenFileMsgEx(); } break;

case NETMSGTYPE_OpenFileResp: { msg = new OpenFileRespMsg(); } break;

case NETMSGTYPE_CloseFile: { msg = new CloseFileMsgEx(); } break;

case NETMSGTYPE_CloseFileResp: { msg = new CloseFileRespMsg(); } break;

case NETMSGTYPE_CloseChunkFileResp: { msg = new CloseChunkFileRespMsg(); } break;

case NETMSGTYPE_WriteLocalFileResp: { msg = new WriteLocalFileRespMsg(); } break;

case NETMSGTYPE_FSyncLocalFileResp: { msg = new FSyncLocalFileRespMsg(); } break;

case NETMSGTYPE_FLockAppend: { msg = new FLockAppendMsgEx(); } break;

case NETMSGTYPE_FLockEntry: { msg = new FLockEntryMsgEx(); } break;

case NETMSGTYPE_FLockEntryResp: { msg = new FLockEntryRespMsg(); } break;

case NETMSGTYPE_FLockRange: { msg = new FLockRangeMsgEx(); } break;

case NETMSGTYPE_FLockRangeResp: { msg = new FLockRangeRespMsg(); } break;

case NETMSGTYPE_GetFileVersion: { msg = new GetFileVersionMsgEx(); } break;

case NETMSGTYPE_AckNotify: { msg = new AckNotifiyMsgEx(); } break;

case NETMSGTYPE_AckNotifyResp: { msg = new AckNotifiyRespMsg(); } break;

// admon messages

case NETMSGTYPE_RequestMetaData: { msg = new RequestMetaDataMsgEx(); } break;

case NETMSGTYPE_GetNodeInfo: { msg = new GetNodeInfoMsgEx(); } break;

// fsck messages

case NETMSGTYPE_RetrieveDirEntries: { msg = new RetrieveDirEntriesMsgEx(); } break;

case NETMSGTYPE_RetrieveInodes: { msg = new RetrieveInodesMsgEx(); } break;

case NETMSGTYPE_RetrieveFsIDs: { msg = new RetrieveFsIDsMsgEx(); } break;

case NETMSGTYPE_DeleteDirEntries: { msg = new DeleteDirEntriesMsgEx(); } break;

case NETMSGTYPE_CreateDefDirInodes: { msg = new CreateDefDirInodesMsgEx(); } break;

case NETMSGTYPE_FixInodeOwners: { msg = new FixInodeOwnersMsgEx(); } break;

case NETMSGTYPE_FixInodeOwnersInDentry: { msg = new FixInodeOwnersInDentryMsgEx(); } break;

case NETMSGTYPE_LinkToLostAndFound: { msg = new LinkToLostAndFoundMsgEx(); } break;

case NETMSGTYPE_CreateEmptyContDirs: { msg = new CreateEmptyContDirsMsgEx(); } break;

case NETMSGTYPE_UpdateFileAttribs: { msg = new UpdateFileAttribsMsgEx(); } break;

case NETMSGTYPE_UpdateDirAttribs: { msg = new UpdateDirAttribsMsgEx(); } break;

case NETMSGTYPE_RemoveInodes: { msg = new RemoveInodesMsgEx(); } break;

case NETMSGTYPE_RecreateFsIDs: { msg = new RecreateFsIDsMsgEx(); } break;

case NETMSGTYPE_RecreateDentries: { msg = new RecreateDentriesMsgEx(); } break;

case NETMSGTYPE_FsckSetEventLogging: { msg = new FsckSetEventLoggingMsgEx(); } break;

case NETMSGTYPE_AdjustChunkPermissions: { msg = new AdjustChunkPermissionsMsgEx(); } break;

default:

{

msg = new SimpleMsg(NETMSGTYPE_Invalid);

} break;

}

return msg;

}
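
The factory above is a single large switch that maps the numeric message type from the header to a concrete message object, and falls back to a placeholder typed as invalid when the ID is unknown. The pattern can be sketched in a few lines. This is a minimal, self-contained illustration and not BeeGFS code: `SketchMsg`, `SketchStatMsg`, `SketchMkDirMsg`, the `SKETCHMSG_*` IDs, and `createSketchMsg` are all hypothetical stand-ins, and `std::unique_ptr` is used here in place of the raw pointer returned by the original.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical type IDs standing in for the NETMSGTYPE_* constants.
enum SketchMsgType : unsigned short
{
   SKETCHMSG_Invalid = 0,
   SKETCHMSG_Stat = 1,
   SKETCHMSG_MkDir = 2
};

// Hypothetical base class; it doubles as the placeholder for unknown IDs.
class SketchMsg
{
   public:
      explicit SketchMsg(unsigned short msgType) : msgType(msgType) {}
      virtual ~SketchMsg() = default;

      unsigned short getType() const { return msgType; }
      virtual std::string name() const { return "Invalid"; }

   private:
      unsigned short msgType;
};

class SketchStatMsg : public SketchMsg
{
   public:
      SketchStatMsg() : SketchMsg(SKETCHMSG_Stat) {}
      std::string name() const override { return "Stat"; }
};

class SketchMkDirMsg : public SketchMsg
{
   public:
      SketchMkDirMsg() : SketchMsg(SKETCHMSG_MkDir) {}
      std::string name() const override { return "MkDir"; }
};

// Maps a type ID to a concrete message object. Unknown IDs yield a
// placeholder typed as invalid, so the result is never null.
std::unique_ptr<SketchMsg> createSketchMsg(unsigned short msgType)
{
   std::unique_ptr<SketchMsg> msg;

   switch (msgType)
   {
      case SKETCHMSG_Stat: { msg.reset(new SketchStatMsg()); } break;
      case SKETCHMSG_MkDir: { msg.reset(new SketchMkDirMsg()); } break;

      default:
      {
         msg.reset(new SketchMsg(SKETCHMSG_Invalid));
      } break;
   }

   assert(msg != nullptr); // every branch above assigns an object
   return msg;
}
```

Because the default branch always returns an object, a caller in this sketch never has to test for a null pointer; it compares `getType()` against the invalid ID to detect a message type that the factory does not know.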
