mosquitto Source Code Analysis

  • I. Introduction to mosquitto
  • II. Main directories
  • III. Client source code
    • Structures
    • Key functions
      • 1. mosquitto_lib_init
      • 2. mosquitto_new
      • 3. mosquitto_connect
      • 4. mosquitto_disconnect
      • 5. mosquitto_destroy
      • 6. mosquitto_lib_cleanup
    • Network loop functions
      • 1. mosquitto_loop
      • 2. mosquitto_loop_forever
      • 3. mosquitto_loop_start
      • 4. mosquitto_loop_stop
    • Subscribe and publish functions
      • 1. mosquitto_publish
      • 2. mosquitto_subscribe
      • 3. mosquitto_unsubscribe

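I. Introduction to mosquitto

mosquitto is an open-source message broker that implements the MQTT messaging protocol (current releases support MQTT 5.0, 3.1.1 and 3.1). It provides a lightweight publish/subscribe messaging model that makes short-message communication between devices simple, for example between low-power sensors, mobile phones, embedded computers and microcontrollers.
mosquitto implements MQTT with the publish/subscribe pattern. This design moves the management of the relationships between communicating endpoints into the broker, which greatly reduces the development and maintenance work on the client side.

Detailed introduction to the MQTT protocol: http://t.csdn.cn/IVtqa
Source code: https://github.com/eclipse/mosquitto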

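II. Main directories

The directories that matter most are /mosquitto/src, /mosquitto/lib and /mosquitto/client.

/src holds the broker (server) implementation, and /lib holds the client library together with low-level networking code; the client directory contains the source of the subscribe and publish command-line clients.

mosquitto_internal.h: defines the various data structures
mosquitto: the public API
memory_mosq: memory allocation wrappers, which can also track memory usage
net_mosq: basic network operations (creating and closing TCP connections, and so on), plus packing and unpacking, that is, writing data to and reading data from a _mosquitto_packet
send_mosq: the request-sending logic (protocol packet assembly) for the individual commands
send_client_mosq: similar to send_mosq, but covers the requests that clients use most often
messages_mosq: message handling (PUBLISH, PUBACK, PUBREL, ...)
read_handle: handles received packets and dispatches on the packet type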

III. Client source code

Structures

struct mosq_config holds the MQTT configuration of the command-line clients. It is defined in client_shared.h.

struct mosq_config {
    char *id;
    char *id_prefix;
    int protocol_version;
    int keepalive;
    char *host;
    int port;
    int qos;
    bool retain;
    int pub_mode; /* pub, rr */
    char *file_input; /* pub, rr */
    char *message; /* pub, rr */
    int msglen; /* pub, rr */
    char *topic; /* pub, rr */
    char *bind_address;
    int repeat_count; /* pub */
    struct timeval repeat_delay; /* pub */
#ifdef WITH_SRV
    bool use_srv;
#endif
    bool debug;
    bool quiet;
    unsigned int max_inflight;
    char *username;
    char *password;
    char *will_topic;
    char *will_payload;
    int will_payloadlen;
    int will_qos;
    bool will_retain;
#ifdef WITH_TLS
    char *cafile;
    char *capath;
    char *certfile;
    char *keyfile;
    char *ciphers;
    bool insecure;
    char *tls_alpn;
    char *tls_version;
    char *tls_engine;
    char *tls_engine_kpass_sha1;
    char *keyform;
    bool tls_use_os_certs;
#  ifdef FINAL_WITH_TLS_PSK
    char *psk;
    char *psk_identity;
#  endif
#endif
    bool clean_session;
    char **topics; /* sub, rr */
    int topic_count; /* sub, rr */
    bool exit_after_sub; /* sub */
    bool no_retain; /* sub */
    bool retained_only; /* sub */
    bool remove_retained; /* sub */
    char **filter_outs; /* sub */
    int filter_out_count; /* sub */
    char **unsub_topics; /* sub */
    int unsub_topic_count; /* sub */
    bool verbose; /* sub */
    bool eol; /* sub */
    int msg_count; /* sub */
    char *format; /* sub, rr */
    bool pretty; /* sub, rr */
    unsigned int timeout; /* sub */
    int sub_opts; /* sub */
    long session_expiry_interval;
    int random_filter; /* sub */
#ifdef WITH_SOCKS
    char *socks5_host;
    int socks5_port;
    char *socks5_username;
    char *socks5_password;
#endif
    mosquitto_property *connect_props;
    mosquitto_property *publish_props;
    mosquitto_property *subscribe_props;
    mosquitto_property *unsubscribe_props;
    mosquitto_property *disconnect_props;
    mosquitto_property *will_props;
    bool have_topic_alias; /* pub */
    char *response_topic; /* rr */
    bool tcp_nodelay;
};

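struct mosquitto holds all the information about one client connection, such as the username, the password, the client ID and the messages to be sent to that client.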

struct mosquitto {
    mosq_sock_t sock;
#ifndef WITH_BROKER
    mosq_sock_t sockpairR, sockpairW;
#endif
#if defined(__GLIBC__) && defined(WITH_ADNS)
    struct gaicb *adns; /* For getaddrinfo_a */
#endif
    enum _mosquitto_protocol protocol;
    char *address;
    char *id;
    char *username;
    char *password;
    uint16_t keepalive;
    uint16_t last_mid;
    enum mosquitto_client_state state;
    time_t last_msg_in;
    time_t next_msg_out;
    time_t ping_t;
    struct _mosquitto_packet in_packet;
    struct _mosquitto_packet *current_out_packet;
    struct _mosquitto_packet *out_packet;
    struct mosquitto_message *will;
#ifdef WITH_TLS
    SSL *ssl;
    SSL_CTX *ssl_ctx;
    char *tls_cafile;
    char *tls_capath;
    char *tls_certfile;
    char *tls_keyfile;
    int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);
    char *tls_version;
    char *tls_ciphers;
    char *tls_psk;
    char *tls_psk_identity;
    int tls_cert_reqs;
    bool tls_insecure;
#endif
    bool want_write;
    bool want_connect;
#if defined(WITH_THREADING) && !defined(WITH_BROKER)
    pthread_mutex_t callback_mutex;
    pthread_mutex_t log_callback_mutex;
    pthread_mutex_t msgtime_mutex;
    pthread_mutex_t out_packet_mutex;
    pthread_mutex_t current_out_packet_mutex;
    pthread_mutex_t state_mutex;
    pthread_mutex_t in_message_mutex;
    pthread_mutex_t out_message_mutex;
    pthread_mutex_t mid_mutex;
    pthread_t thread_id;
#endif
    bool clean_session;
#ifdef WITH_BROKER
    bool is_dropping;
    bool is_bridge;
    struct _mqtt3_bridge *bridge;
    struct mosquitto_client_msg *msgs;
    struct mosquitto_client_msg *last_msg;
    int msg_count;
    int msg_count12;
    struct _mosquitto_acl_user *acl_list;
    struct _mqtt3_listener *listener;
    time_t disconnect_t;
    struct _mosquitto_packet *out_packet_last;
    struct _mosquitto_subhier **subs;
    int sub_count;
    int pollfd_index;
#  ifdef WITH_WEBSOCKETS
#    if defined(LWS_LIBRARY_VERSION_NUMBER)
    struct lws *wsi;
#    else
    struct libwebsocket_context *ws_context;
    struct libwebsocket *wsi;
#    endif
#  endif
    bool ws_want_write;
#else
#  ifdef WITH_SOCKS
    char *socks5_host;
    int socks5_port;
    char *socks5_username;
    char *socks5_password;
#  endif
    void *userdata;
    bool in_callback;
    unsigned int message_retry;
    time_t last_retry_check;
    struct mosquitto_message_all *in_messages;
    struct mosquitto_message_all *in_messages_last;
    struct mosquitto_message_all *out_messages;
    struct mosquitto_message_all *out_messages_last;
    void (*on_connect)(struct mosquitto *, void *userdata, int rc);
    void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
    void (*on_publish)(struct mosquitto *, void *userdata, int mid);
    void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
    void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
    void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
    void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
    //void (*on_error)();
    char *host;
    int port;
    int in_queue_len;
    int out_queue_len;
    char *bind_address;
    unsigned int reconnect_delay;
    unsigned int reconnect_delay_max;
    bool reconnect_exponential_backoff;
    char threaded;
    struct _mosquitto_packet *out_packet_last;
    int inflight_messages;
    int max_inflight_messages;
#  ifdef WITH_SRV
    ares_channel achan;
#  endif
#endif

#ifdef WITH_BROKER
    UT_hash_handle hh_id;
    UT_hash_handle hh_sock;
    struct mosquitto *for_free_next;
#endif
};

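client_config_load loads the client configuration.
Its second parameter selects whether the client is a PUB or a SUB client.

client_config_load is defined in client_shared.c.

The init_config function, also in client_shared.c, sets the initial default configuration values.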

Key functions

1. mosquitto_lib_init

Defined in lib/mosquitto.c. It initialises the network resources the library needs. On the first call it also seeds the random number generator; later calls only increase a reference count.

int mosquitto_lib_init(void)
{
    int rc;

    if (init_refcount == 0) {
#ifdef WIN32
        srand((unsigned int)GetTickCount64());
#elif _POSIX_TIMERS>0 && defined(_POSIX_MONOTONIC_CLOCK)
        struct timespec tp;

        clock_gettime(CLOCK_MONOTONIC, &tp);
        srand((unsigned int)tp.tv_nsec);
#elif defined(__APPLE__)
        uint64_t ticks;

        ticks = mach_absolute_time();
        srand((unsigned int)ticks);
#else
        struct timeval tv;

        gettimeofday(&tv, NULL);
        srand(tv.tv_sec*1000 + tv.tv_usec/1000);
#endif

        rc = net__init();
        if (rc != MOSQ_ERR_SUCCESS) {
            return rc;
        }
    }

    init_refcount++;
    return MOSQ_ERR_SUCCESS;
}

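client_id_generate generates the client ID.
This is the identifier that appears in the extra line the broker logs when a client subscribes to a topic.
In the log line below, mosqsub|2431-ubuntu is the client ID, and producing it is what this function does.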

1502159601: New client connected from 127.0.0.1 as mosqsub|2431-ubuntu (c1, k60)
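The shape of that ID can be illustrated with a minimal sketch. The format "<prefix>|<pid>-<hostname>" is inferred only from the log line above, and make_client_id is a hypothetical helper, not the actual client_id_generate implementation.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: writes "<prefix>|<pid>-<hostname>" into buf.
 * Returns 0 on success, -1 if buf is too small for the whole ID. */
int make_client_id(char *buf, size_t buflen, const char *prefix,
                   long pid, const char *hostname)
{
    int n;

    assert(buf != NULL && buflen > 0);
    assert(prefix != NULL && hostname != NULL);

    n = snprintf(buf, buflen, "%s|%ld-%s", prefix, pid, hostname);
    if(n < 0 || (size_t)n >= buflen){
        buf[0] = '\0'; /* never hand back a truncated ID */
        return -1;
    }
    return 0;
}
```

A caller would typically pass the process ID from getpid() and the host name from gethostname(). Rejecting a truncated ID, rather than silently shortening it, is a choice made for this sketch.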

2. mosquitto_new

Defined in lib/mosquitto.c.

It allocates a struct mosquitto and then calls mosquitto_reinitialise, which resets the members of the structure to their initial default values.

struct mosquitto *mosquitto_new(const char *id, bool clean_start, void *userdata)
{
    struct mosquitto *mosq = NULL;
    int rc;

    if(clean_start == false && id == NULL){
        errno = EINVAL;
        return NULL;
    }

#ifndef WIN32
    signal(SIGPIPE, SIG_IGN);
#endif

    mosq = (struct mosquitto *)mosquitto__calloc(1, sizeof(struct mosquitto));
    if(mosq){
        mosq->sock = INVALID_SOCKET;
#ifdef WITH_THREADING
        mosq->thread_id = pthread_self();
#endif
        mosq->sockpairR = INVALID_SOCKET;
        mosq->sockpairW = INVALID_SOCKET;
        rc = mosquitto_reinitialise(mosq, id, clean_start, userdata);
        if(rc){
            mosquitto_destroy(mosq);
            if(rc == MOSQ_ERR_INVAL){
                errno = EINVAL;
            }else if(rc == MOSQ_ERR_NOMEM){
                errno = ENOMEM;
            }
            return NULL;
        }
    }else{
        errno = ENOMEM;
    }
    return mosq;
}

3. mosquitto_connect

Defined in mosquitto-2.0.14/lib/connect.c. mosquitto_connect and mosquitto_connect_bind are thin wrappers; the work is done in mosquitto_connect_bind_v5.

int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
    return mosquitto_connect_bind(mosq, host, port, keepalive, NULL);
}

int mosquitto_connect_bind(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
{
    return mosquitto_connect_bind_v5(mosq, host, port, keepalive, bind_address, NULL);
}

int mosquitto_connect_bind_v5(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address, const mosquitto_property *properties)
{
    int rc;

    if(bind_address){
        rc = mosquitto_string_option(mosq, MOSQ_OPT_BIND_ADDRESS, bind_address);
        if(rc) return rc;
    }

    mosquitto_property_free_all(&mosq->connect_properties);
    if(properties){
        rc = mosquitto_property_check_all(CMD_CONNECT, properties);
        if(rc) return rc;

        rc = mosquitto_property_copy_all(&mosq->connect_properties, properties);
        if(rc) return rc;
        mosq->connect_properties->client_generated = true;
    }

    rc = mosquitto__connect_init(mosq, host, port, keepalive);
    if(rc) return rc;

    mosquitto__set_state(mosq, mosq_cs_new);

    return mosquitto__reconnect(mosq, true);
}

4. mosquitto_disconnect

Defined in mosquitto-2.0.14/lib/connect.c.

int mosquitto_disconnect(struct mosquitto *mosq)
{
    return mosquitto_disconnect_v5(mosq, 0, NULL);
}

int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties)
{
    const mosquitto_property *outgoing_properties = NULL;
    mosquitto_property local_property;
    int rc;

    if(!mosq) return MOSQ_ERR_INVAL;
    if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
    if(reason_code < 0 || reason_code > UINT8_MAX) return MOSQ_ERR_INVAL;

    if(properties){
        if(properties->client_generated){
            outgoing_properties = properties;
        }else{
            memcpy(&local_property, properties, sizeof(mosquitto_property));
            local_property.client_generated = true;
            local_property.next = NULL;
            outgoing_properties = &local_property;
        }
        rc = mosquitto_property_check_all(CMD_DISCONNECT, outgoing_properties);
        if(rc) return rc;
    }

    mosquitto__set_state(mosq, mosq_cs_disconnected);
    if(mosq->sock == INVALID_SOCKET){
        return MOSQ_ERR_NO_CONN;
    }else{
        return send__disconnect(mosq, (uint8_t)reason_code, outgoing_properties);
    }
}

5. mosquitto_destroy

Releases the thread resources, destroys the mutexes and frees the resources held in the client context. The listing below is the internal helper mosquitto__destroy, which carries out this work.

void mosquitto__destroy(struct mosquitto *mosq)
{
    if(!mosq) return;

#ifdef WITH_THREADING
#  ifdef HAVE_PTHREAD_CANCEL
    if(mosq->threaded == mosq_ts_self && !pthread_equal(mosq->thread_id, pthread_self())){
        pthread_cancel(mosq->thread_id);
        pthread_join(mosq->thread_id, NULL);
        mosq->threaded = mosq_ts_none;
    }
#  endif

    if(mosq->id){
        /* If mosq->id is not NULL then the client has already been initialised
         * and so the mutexes need destroying. If mosq->id is NULL, the mutexes
         * haven't been initialised. */
        pthread_mutex_destroy(&mosq->callback_mutex);
        pthread_mutex_destroy(&mosq->log_callback_mutex);
        pthread_mutex_destroy(&mosq->state_mutex);
        pthread_mutex_destroy(&mosq->out_packet_mutex);
        pthread_mutex_destroy(&mosq->current_out_packet_mutex);
        pthread_mutex_destroy(&mosq->msgtime_mutex);
        pthread_mutex_destroy(&mosq->msgs_in.mutex);
        pthread_mutex_destroy(&mosq->msgs_out.mutex);
        pthread_mutex_destroy(&mosq->mid_mutex);
    }
#endif
    if(mosq->sock != INVALID_SOCKET){
        net__socket_close(mosq);
    }
    message__cleanup_all(mosq);
    will__clear(mosq);
#ifdef WITH_TLS
    if(mosq->ssl){
        SSL_free(mosq->ssl);
    }
    if(mosq->ssl_ctx){
        SSL_CTX_free(mosq->ssl_ctx);
    }
    mosquitto__free(mosq->tls_cafile);
    mosquitto__free(mosq->tls_capath);
    mosquitto__free(mosq->tls_certfile);
    mosquitto__free(mosq->tls_keyfile);
    if(mosq->tls_pw_callback) mosq->tls_pw_callback = NULL;
    mosquitto__free(mosq->tls_version);
    mosquitto__free(mosq->tls_ciphers);
    mosquitto__free(mosq->tls_psk);
    mosquitto__free(mosq->tls_psk_identity);
    mosquitto__free(mosq->tls_alpn);
#endif

    mosquitto__free(mosq->address);
    mosq->address = NULL;

    mosquitto__free(mosq->id);
    mosq->id = NULL;

    mosquitto__free(mosq->username);
    mosq->username = NULL;

    mosquitto__free(mosq->password);
    mosq->password = NULL;

    mosquitto__free(mosq->host);
    mosq->host = NULL;

    mosquitto__free(mosq->bind_address);
    mosq->bind_address = NULL;

    mosquitto_property_free_all(&mosq->connect_properties);

    packet__cleanup_all_no_locks(mosq);

    packet__cleanup(&mosq->in_packet);
    if(mosq->sockpairR != INVALID_SOCKET){
        COMPAT_CLOSE(mosq->sockpairR);
        mosq->sockpairR = INVALID_SOCKET;
    }
    if(mosq->sockpairW != INVALID_SOCKET){
        COMPAT_CLOSE(mosq->sockpairW);
        mosq->sockpairW = INVALID_SOCKET;
    }
}

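6. mosquitto_lib_cleanup

Shuts down what mosquitto_lib_init started and releases the memory it used. The actual cleanup runs only when the last outstanding initialisation is released.

int mosquitto_lib_cleanup(void)
{
    if (init_refcount == 1) {
        net__cleanup();
    }

    if (init_refcount > 0) {
        --init_refcount;
    }

    return MOSQ_ERR_SUCCESS;
}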
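The pairing of mosquitto_lib_init and mosquitto_lib_cleanup through init_refcount can be illustrated with a minimal standalone sketch. All demo_* names are hypothetical, and the boolean flag merely stands in for whatever net__init and net__cleanup set up and tear down.

```c
#include <assert.h>
#include <stdbool.h>

static int demo_refcount = 0;       /* outstanding demo_lib_init() calls */
static bool demo_net_ready = false; /* stand-in for the state owned by net__init() */

int demo_lib_init(void)
{
    if(demo_refcount == 0){
        demo_net_ready = true;  /* one-time setup, first caller only */
    }
    demo_refcount++;
    return 0;
}

int demo_lib_cleanup(void)
{
    if(demo_refcount == 1){
        demo_net_ready = false; /* last user is leaving: tear down */
    }
    if(demo_refcount > 0){
        demo_refcount--;        /* extra cleanup calls are harmless */
    }
    return 0;
}

bool demo_lib_is_ready(void)
{
    assert(demo_refcount >= 0);
    return demo_net_ready;
}
```

With this scheme two independent components can each call init and cleanup in pairs, and the shared resources are released only when the second one finishes.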

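Network loop functions

1. mosquitto_loop

Defined in mosquitto-2.0.14/lib/loop.c. It waits with select() (or pselect()) on the client socket and on sockpairR, then reads, writes and runs the housekeeping in mosquitto_loop_misc.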

int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
{
#ifdef HAVE_PSELECTstruct timespec local_timeout;
#elsestruct timeval local_timeout;
#endiffd_set readfds, writefds;int fdcount;int rc;char pairbuf;int maxfd = 0;time_t now;time_t timeout_ms;if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
#ifndef WIN32if(mosq->sock >= FD_SETSIZE || mosq->sockpairR >= FD_SETSIZE){return MOSQ_ERR_INVAL;}
#endifFD_ZERO(&readfds);FD_ZERO(&writefds);if(mosq->sock != INVALID_SOCKET){maxfd = mosq->sock;FD_SET(mosq->sock, &readfds);pthread_mutex_lock(&mosq->current_out_packet_mutex);pthread_mutex_lock(&mosq->out_packet_mutex);if(mosq->out_packet || mosq->current_out_packet){FD_SET(mosq->sock, &writefds);}
#ifdef WITH_TLSif(mosq->ssl){if(mosq->want_write){FD_SET(mosq->sock, &writefds);}else if(mosq->want_connect){/* Remove possible FD_SET from above, we don't want* for writing if we are still connecting, unless w* definitely set. The presence of outgoing packets* matter yet. */FD_CLR(mosq->sock, &writefds);}}
#endifpthread_mutex_unlock(&mosq->out_packet_mutex);pthread_mutex_unlock(&mosq->current_out_packet_mutex);}else{
#ifdef WITH_SRVif(mosq->achan){if(mosquitto__get_state(mosq) == mosq_cs_connect_srv){rc = ares_fds(mosq->achan, &readfds, &writefds);if(rc > maxfd){maxfd = rc;}}else{return MOSQ_ERR_NO_CONN;}}
#elsereturn MOSQ_ERR_NO_CONN;
#endif}if(mosq->sockpairR != INVALID_SOCKET){/* sockpairR is used to break out of select() before the timeout, o* call to publish() etc. */FD_SET(mosq->sockpairR, &readfds);if((int)mosq->sockpairR > maxfd){maxfd = mosq->sockpairR;}}timeout_ms = timeout;if(timeout_ms < 0){timeout_ms = 1000;}now = mosquitto_time();if(mosq->next_msg_out && now + timeout_ms/1000 > mosq->next_msg_out){timeout_ms = (mosq->next_msg_out - now)*1000;}if(timeout_ms < 0){/* There has been a delay somewhere which means we should have alre* sent a message. */timeout_ms = 0;}local_timeout.tv_sec = timeout_ms/1000;
#ifdef HAVE_PSELECTlocal_timeout.tv_nsec = (timeout_ms-local_timeout.tv_sec*1000)*1000000;
#elselocal_timeout.tv_usec = (timeout_ms-local_timeout.tv_sec*1000)*1000;
#endif#ifdef HAVE_PSELECTfdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL)
#elsefdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
#endifif(fdcount == -1){
#ifdef WIN32errno = WSAGetLastError();
#endifif(errno == EINTR){return MOSQ_ERR_SUCCESS;}else{return MOSQ_ERR_ERRNO;}}else{if(mosq->sock != INVALID_SOCKET){if(FD_ISSET(mosq->sock, &readfds)){rc = mosquitto_loop_read(mosq, max_packets);if(rc || mosq->sock == INVALID_SOCKET){return rc;}}if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sock
#ifndef WIN32if(read(mosq->sockpairR, &pairbuf, 1) == 0){}
#elserecv(mosq->sockpairR, &pairbuf, 1, 0);
#endif/* Fake write possible, to stimulate output write e* we didn't ask for it, because at that point the* other command wasn't present. */if(mosq->sock != INVALID_SOCKET)FD_SET(mosq->sock, &writefds);}if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &wr
#ifdef WITH_TLSif(mosq->want_connect){rc = net__socket_connect_tls(mosq);if(rc) return rc;}else
#endif{rc = mosquitto_loop_write(mosq, max_packetsif(rc || mosq->sock == INVALID_SOCKET){return rc;}}}}
#ifdef WITH_SRVif(mosq->achan){ares_process(mosq->achan, &readfds, &writefds);}
#endif}return mosquitto_loop_misc(mosq);
}
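Two details of the listing above lend themselves to a small POSIX sketch: splitting a millisecond timeout into the seconds/microseconds pair that select() takes, and using a socket pair to break out of select() before the timeout. The function names are hypothetical, and the sketch assumes descriptors below FD_SETSIZE.

```c
#include <assert.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

/* Split a millisecond timeout into the fields select() expects. */
struct timeval ms_to_timeval(long timeout_ms)
{
    struct timeval tv;

    assert(timeout_ms >= 0);
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms - tv.tv_sec * 1000) * 1000;
    return tv;
}

/* Block until wake_fd is readable or the timeout expires.
 * Returns 1 if woken (one byte is drained), 0 on timeout, -1 on error. */
int wait_for_wakeup(int wake_fd, long timeout_ms)
{
    fd_set readfds;
    struct timeval tv = ms_to_timeval(timeout_ms);
    char byte;
    int rc;

    FD_ZERO(&readfds);
    FD_SET(wake_fd, &readfds);
    rc = select(wake_fd + 1, &readfds, NULL, NULL, &tv);
    if(rc < 0) return -1;
    if(rc == 0) return 0;
    if(read(wake_fd, &byte, 1) != 1) return -1;
    return 1;
}

/* Wake the waiter by writing one byte to the other end of the pair. */
int send_wakeup(int wake_write_fd)
{
    char byte = 0;

    return write(wake_write_fd, &byte, 1) == 1 ? 0 : -1;
}
```

A pair created with socketpair(AF_UNIX, SOCK_STREAM, 0, sp) gives sp[0] as the read end and sp[1] as the write end, so another thread can call send_wakeup(sp[1]) to interrupt a pending wait_for_wakeup(sp[0], ...).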

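2. mosquitto_loop_forever

Defined in mosquitto-2.0.14/lib/loop.c. It calls mosquitto_loop repeatedly, returns on fatal errors, and otherwise waits and reconnects until the client is disconnected.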

int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
{int run = 1;int rc = MOSQ_ERR_SUCCESS;unsigned long reconnect_delay;enum mosquitto_client_state state;if(!mosq) return MOSQ_ERR_INVAL;mosq->reconnects = 0;while(run){do{
#ifdef HAVE_PTHREAD_CANCELpthread_testcancel();
#endifrc = mosquitto_loop(mosq, timeout, max_packets);}while(run && rc == MOSQ_ERR_SUCCESS);/* Quit after fatal errors. */switch(rc){case MOSQ_ERR_NOMEM:case MOSQ_ERR_PROTOCOL:case MOSQ_ERR_INVAL:case MOSQ_ERR_NOT_FOUND:case MOSQ_ERR_TLS:case MOSQ_ERR_PAYLOAD_SIZE:case MOSQ_ERR_NOT_SUPPORTED:case MOSQ_ERR_AUTH:case MOSQ_ERR_ACL_DENIED:case MOSQ_ERR_UNKNOWN:case MOSQ_ERR_EAI:case MOSQ_ERR_PROXY:return rc;case MOSQ_ERR_ERRNO:break;}if(errno == EPROTO){return rc;}do{
#ifdef HAVE_PTHREAD_CANCELpthread_testcancel();
#endifrc = MOSQ_ERR_SUCCESS;state = mosquitto__get_state(mosq);if(state == mosq_cs_disconnecting || state == mosq_cs_discorun = 0;}else{if(mosq->reconnect_delay_max > mosq->reconnect_delaif(mosq->reconnect_exponential_backoff){reconnect_delay = mosq->reconnect_d}else{reconnect_delay = mosq->reconnect_d}}else{reconnect_delay = mosq->reconnect_delay;}if(reconnect_delay > mosq->reconnect_delay_max){reconnect_delay = mosq->reconnect_delay_max}else{mosq->reconnects++;}rc = interruptible_sleep(mosq, (time_t)reconnect_deif(rc) return rc;state = mosquitto__get_state(mosq);if(state == mosq_cs_disconnecting || state == mosq_run = 0;}else{rc = mosquitto_reconnect(mosq);}}}while(run && rc != MOSQ_ERR_SUCCESS);}return rc;
}
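The listing is cut off where the reconnect delay is computed, so the exact formula is not visible here. As a generic illustration of a capped reconnect delay with a linear and an exponential mode, here is a sketch under stated assumptions: reconnect_delay_for and its doubling policy are hypothetical and not necessarily what mosquitto does.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical policy: delay before reconnect attempt number `attempt`
 * (0-based). Linear mode grows as base * (attempt + 1); exponential mode
 * doubles on every attempt. The result never exceeds max_delay. */
unsigned int reconnect_delay_for(unsigned int base, unsigned int max_delay,
                                 unsigned int attempt, bool exponential)
{
    unsigned int delay = base;
    unsigned int i;

    assert(base > 0 && max_delay >= base);

    for(i = 0; i < attempt; i++){
        if(exponential){
            /* Stop before doubling would pass the cap; this also avoids overflow. */
            if(delay > max_delay / 2) return max_delay;
            delay *= 2;
        }else{
            if(delay > max_delay - base) return max_delay;
            delay += base;
        }
    }
    return delay;
}
```

Capping inside the loop keeps the arithmetic safe even for very large attempt counts, which matters for a client that may retry for days.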

3. mosquitto_loop_start

Defined in mosquitto-2.0.14/lib/thread_mosq.c. It starts a thread that runs the network loop, and is available only when the library is built with threading support.

int mosquitto_loop_start(struct mosquitto *mosq)
{
#if defined(WITH_THREADING)
    if(!mosq || mosq->threaded != mosq_ts_none) return MOSQ_ERR_INVAL;

    mosq->threaded = mosq_ts_self;
    if(!pthread_create(&mosq->thread_id, NULL, mosquitto__thread_main, mosq)){
#if defined(__linux__)
        pthread_setname_np(mosq->thread_id, "mosquitto loop");
#elif defined(__NetBSD__)
        pthread_setname_np(mosq->thread_id, "%s", "mosquitto loop");
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
        pthread_set_name_np(mosq->thread_id, "mosquitto loop");
#endif
        return MOSQ_ERR_SUCCESS;
    }else{
        return MOSQ_ERR_ERRNO;
    }
#else
    UNUSED(mosq);
    return MOSQ_ERR_NOT_SUPPORTED;
#endif
}

4. mosquitto_loop_stop

Defined in mosquitto-2.0.14/lib/thread_mosq.c. It wakes the loop thread by writing one byte to sockpairW, optionally cancels it, and then joins it.

int mosquitto_loop_stop(struct mosquitto *mosq, bool force)
{
#if defined(WITH_THREADING)
#  ifndef WITH_BROKER
    char sockpair_data = 0;
#  endif

    if(!mosq || mosq->threaded != mosq_ts_self) return MOSQ_ERR_INVAL;

    /* Write a single byte to sockpairW (connected to sockpairR) to break out
     * of select() if in threaded mode. */
    if(mosq->sockpairW != INVALID_SOCKET){
#ifndef WIN32
        if(write(mosq->sockpairW, &sockpair_data, 1)){
        }
#else
        send(mosq->sockpairW, &sockpair_data, 1, 0);
#endif
    }

#ifdef HAVE_PTHREAD_CANCEL
    if(force){
        pthread_cancel(mosq->thread_id);
    }
#endif
    pthread_join(mosq->thread_id, NULL);
    mosq->thread_id = pthread_self();
    mosq->threaded = mosq_ts_none;

    return MOSQ_ERR_SUCCESS;
#else
    UNUSED(mosq);
    UNUSED(force);
    return MOSQ_ERR_NOT_SUPPORTED;
#endif
}

Subscribe and publish functions

1. mosquitto_publish

Defined in mosquitto-2.0.14/lib/actions.c. mosquitto_publish forwards to mosquitto_publish_v5, which validates its arguments, sends QoS 0 messages immediately and queues QoS 1 and QoS 2 messages.

int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
{
    return mosquitto_publish_v5(mosq, mid, topic, payloadlen, payload, qos, retain, NULL);
}

int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain, const mosquitto_property *properties)
{
    struct mosquitto_message_all *message;
    uint16_t local_mid;
    const mosquitto_property *p;
    const mosquitto_property *outgoing_properties = NULL;
    mosquitto_property *properties_copy = NULL;
    mosquitto_property local_property;
    bool have_topic_alias;
    int rc;
    size_t tlen = 0;
    uint32_t remaining_length;

    if(!mosq || qos<0 || qos>2) return MOSQ_ERR_INVAL;
    if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
    if(qos > mosq->max_qos) return MOSQ_ERR_QOS_NOT_SUPPORTED;

    if(!mosq->retain_available){
        retain = false;
    }

    if(properties){
        if(properties->client_generated){
            outgoing_properties = properties;
        }else{
            memcpy(&local_property, properties, sizeof(mosquitto_property));
            local_property.client_generated = true;
            local_property.next = NULL;
            outgoing_properties = &local_property;
        }
        rc = mosquitto_property_check_all(CMD_PUBLISH, outgoing_properties);
        if(rc) return rc;
    }

    if(!topic || STREMPTY(topic)){
        if(topic) topic = NULL;

        if(mosq->protocol == mosq_p_mqtt5){
            p = outgoing_properties;
            have_topic_alias = false;
            while(p){
                if(p->identifier == MQTT_PROP_TOPIC_ALIAS){
                    have_topic_alias = true;
                    break;
                }
                p = p->next;
            }
            if(have_topic_alias == false){
                return MOSQ_ERR_INVAL;
            }
        }else{
            return MOSQ_ERR_INVAL;
        }
    }else{
        tlen = strlen(topic);
        if(mosquitto_validate_utf8(topic, (int)tlen)) return MOSQ_ERR_MALFORMED_UTF8;
        if(payloadlen < 0 || payloadlen > (int)MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
        if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){
            return MOSQ_ERR_INVAL;
        }
    }

    if(mosq->maximum_packet_size > 0){
        remaining_length = 1 + 2+(uint32_t)tlen + (uint32_t)payloadlen + property__get_length_all(outgoing_properties);
        if(qos > 0){
            remaining_length++;
        }
        if(packet__check_oversize(mosq, remaining_length)){
            return MOSQ_ERR_OVERSIZE_PACKET;
        }
    }

    local_mid = mosquitto__mid_generate(mosq);
    if(mid){
        *mid = local_mid;
    }

    if(qos == 0){
        return send__publish(mosq, local_mid, topic, (uint32_t)payloadlen, payload, (uint8_t)qos, retain, false, outgoing_properties, NULL, 0);
    }else{
        if(outgoing_properties){
            rc = mosquitto_property_copy_all(&properties_copy, outgoing_properties);
            if(rc) return rc;
        }
        message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all));
        if(!message){
            mosquitto_property_free_all(&properties_copy);
            return MOSQ_ERR_NOMEM;
        }

        message->next = NULL;
        message->timestamp = mosquitto_time();
        message->msg.mid = local_mid;
        if(topic){
            message->msg.topic = mosquitto__strdup(topic);
            if(!message->msg.topic){
                message__cleanup(&message);
                mosquitto_property_free_all(&properties_copy);
                return MOSQ_ERR_NOMEM;
            }
        }
        if(payloadlen){
            message->msg.payloadlen = payloadlen;
            message->msg.payload = mosquitto__malloc((unsigned int)payloadlen*sizeof(uint8_t));
            if(!message->msg.payload){
                message__cleanup(&message);
                mosquitto_property_free_all(&properties_copy);
                return MOSQ_ERR_NOMEM;
            }
            memcpy(message->msg.payload, payload, (uint32_t)payloadlen*sizeof(uint8_t));
        }else{
            message->msg.payloadlen = 0;
            message->msg.payload = NULL;
        }
        message->msg.qos = (uint8_t)qos;
        message->msg.retain = retain;
        message->dup = false;
        message->properties = properties_copy;

        pthread_mutex_lock(&mosq->msgs_out.mutex);
        message->state = mosq_ms_invalid;
        rc = message__queue(mosq, message, mosq_md_out);
        pthread_mutex_unlock(&mosq->msgs_out.mutex);
        return rc;
    }
}
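The listing validates a non-empty topic with mosquitto_pub_topic_check before publishing. The rule behind such a check is that a topic used for publishing must not contain the subscription wildcards '+' and '#'. The following is a simplified stand-in written for illustration, with hypothetical names, not the library function itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_MAX_TOPIC_LEN 65535 /* MQTT strings carry a 16-bit length prefix */

/* Simplified stand-in for a publish-topic check. The caller is expected to
 * have handled the NULL or empty topic case (topic alias) beforehand, as the
 * listing does. Returns true if the topic may be used for publishing. */
bool demo_pub_topic_ok(const char *topic)
{
    size_t len;

    assert(topic != NULL);

    for(len = 0; topic[len] != '\0'; len++){
        /* Wildcards are only meaningful in subscription filters. */
        if(topic[len] == '+' || topic[len] == '#') return false;
        /* A further character at this index would exceed the length limit. */
        if(len >= DEMO_MAX_TOPIC_LEN) return false;
    }
    return true;
}
```

For example, "sensors/kitchen/temp" passes, while "sensors/+/temp" and "sensors/#" are rejected because they are filters rather than concrete topics.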

2. mosquitto_subscribe

Defined in mosquitto-2.0.14/lib/actions.c. mosquitto_subscribe and mosquitto_subscribe_v5 both forward to mosquitto_subscribe_multiple with a single topic filter.

int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
{
    return mosquitto_subscribe_multiple(mosq, mid, 1, (char *const *const)&sub, qos, 0, NULL);
}

int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, int options, const mosquitto_property *properties)
{
    return mosquitto_subscribe_multiple(mosq, mid, 1, (char *const *const)&sub, qos, options, properties);
}

int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, int options, const mosquitto_property *properties)
{
    const mosquitto_property *outgoing_properties = NULL;
    mosquitto_property local_property;
    int i;
    int rc;
    uint32_t remaining_length = 0;
    int slen;

    if(!mosq || !sub_count || !sub) return MOSQ_ERR_INVAL;
    if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
    if(qos < 0 || qos > 2) return MOSQ_ERR_INVAL;
    if((options & 0x30) == 0x30 || (options & 0xC0) != 0) return MOSQ_ERR_INVAL;
    if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;

    if(properties){
        if(properties->client_generated){
            outgoing_properties = properties;
        }else{
            memcpy(&local_property, properties, sizeof(mosquitto_property));
            local_property.client_generated = true;
            local_property.next = NULL;
            outgoing_properties = &local_property;
        }
        rc = mosquitto_property_check_all(CMD_SUBSCRIBE, outgoing_properties);
        if(rc) return rc;
    }

    for(i=0; i<sub_count; i++){
        if(mosquitto_sub_topic_check(sub[i])) return MOSQ_ERR_INVAL;
        slen = (int)strlen(sub[i]);
        if(mosquitto_validate_utf8(sub[i], slen)) return MOSQ_ERR_MALFORMED_UTF8;
        remaining_length += 2+(uint32_t)slen + 1;
    }

    if(mosq->maximum_packet_size > 0){
        remaining_length += 2 + property__get_length_all(outgoing_properties);
        if(packet__check_oversize(mosq, remaining_length)){
            return MOSQ_ERR_OVERSIZE_PACKET;
        }
    }
    if(mosq->protocol == mosq_p_mqtt311 || mosq->protocol == mosq_p_mqtt31){
        options = 0;
    }

    return send__subscribe(mosq, mid, sub_count, sub, qos|options, outgoing_properties);
}
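Two pieces of arithmetic in mosquitto_subscribe_multiple can be isolated in a small sketch: the bit-mask check on the subscription options, and the per-filter contribution to the packet length (2-byte length prefix, the filter text, 1 options byte). The demo_* names are hypothetical, and the code only mirrors the expressions visible in the listing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Mirrors the option check in the listing: bits 4-5 (retain handling) must
 * not both be set, and bits 6-7 are reserved and must be zero. */
bool demo_sub_options_ok(int options)
{
    if((options & 0x30) == 0x30) return false;
    if((options & 0xC0) != 0) return false;
    return true;
}

/* Bytes the topic filters contribute to a SUBSCRIBE packet: for each filter
 * a 2-byte length prefix, the filter itself, and 1 options byte. */
uint32_t demo_sub_filters_length(const char *const *filters, int count)
{
    uint32_t total = 0;
    int i;

    assert(filters != NULL && count > 0);

    for(i = 0; i < count; i++){
        total += 2 + (uint32_t)strlen(filters[i]) + 1;
    }
    return total;
}
```

For the two filters "a/b" and "c" the function yields (2+3+1) + (2+1+1) = 10 bytes. The listing then adds 2 more bytes plus the property length before comparing against the maximum packet size.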

3. mosquitto_unsubscribe

Defined in mosquitto-2.0.14/lib/actions.c. As with subscribing, the single-topic variants forward to mosquitto_unsubscribe_multiple.

int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub)
{
    return mosquitto_unsubscribe_multiple(mosq, mid, 1, (char *const *const)&sub, NULL);
}

int mosquitto_unsubscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, const mosquitto_property *properties)
{
    return mosquitto_unsubscribe_multiple(mosq, mid, 1, (char *const *const)&sub, properties);
}

int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, const mosquitto_property *properties)
{
    const mosquitto_property *outgoing_properties = NULL;
    mosquitto_property local_property;
    int rc;
    int i;
    uint32_t remaining_length = 0;
    int slen;

    if(!mosq) return MOSQ_ERR_INVAL;
    if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
    if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;

    if(properties){
        if(properties->client_generated){
            outgoing_properties = properties;
        }else{
            memcpy(&local_property, properties, sizeof(mosquitto_property));
            local_property.client_generated = true;
            local_property.next = NULL;
            outgoing_properties = &local_property;
        }
        rc = mosquitto_property_check_all(CMD_UNSUBSCRIBE, outgoing_properties);
        if(rc) return rc;
    }

    for(i=0; i<sub_count; i++){
        if(mosquitto_sub_topic_check(sub[i])) return MOSQ_ERR_INVAL;
        slen = (int)strlen(sub[i]);
        if(mosquitto_validate_utf8(sub[i], slen)) return MOSQ_ERR_MALFORMED_UTF8;
        remaining_length += 2U + (uint32_t)slen;
    }

    if(mosq->maximum_packet_size > 0){
        remaining_length += 2U + property__get_length_all(outgoing_properties);
        if(packet__check_oversize(mosq, remaining_length)){
            return MOSQ_ERR_OVERSIZE_PACKET;
        }
    }

    return send__unsubscribe(mosq, mid, sub_count, sub, outgoing_properties);
}
