Linux编程之自定义消息队列
我这里要讲的并不是IPC中的消息队列,我要讲的是在进程内部实现自定义的消息队列,让各个线程的消息来推动整个进程的运动。进程间的消息队列用于进程与进程之间的通信,而我将要实现的进程内的消息队列是用于有序妥当处理来自于各个线程请求,避免一窝蜂的请求而导致消息的异常丢失。想想socket编程里的listen函数吧,里面要设置一个队列长度的参数,其实来自网络的请求已经排成一个请求队列了,只是这个队列是系统帮我们做好了,我们看不到而已。如果系统不帮我们做这个等待队列的话,那就需要我们程序员在应用层实现了。
- 自定义消息结构,并构造队列
- 一个线程负责依次从消息队列中取出消息,并处理该消息
- 多个线程产生事件,并将消息放进消息队列,等待处理
typedef struct Msg_Hdr_s { uint32 msg_type; uint32 msg_len; uint32 msg_src; uint32 msg_dst; }Msg_Hdr_t; typedef struct Msg_s { Msg_Hdr_t hdr; uint8 data[100]; } Msg_t;
- msg_type:标记消息类型,当消息接收者看到该msg_type后就知道他要干什么事了
- msg_len:消息长度,待扩展,暂时没用到(以后会扩展为变长消息)
- msg_src:消息的源地址,即消息的发起者
- msg_dst:消息的目的地,即消息的接受者
- data[100]:消息除去消息头外可以携带的信息量,定义为100字节
typedef struct Queue_s { int head; int rear; sem_t sem; Msg_t data[QUEUE_SIZE]; }Queue_t; int MsgQueueInit(Queue_t* Q) { if(!Q) { printf("Invalid Queue!\n"); return -1; } Q->rear = 0; Q->head = 0; sem_init(&Q->sem, 0, 1); return 0; } int MsgDeQueue(Queue_t* Q, Msg_t* msg) { if(!Q) { printf("Invalid Queue!\n"); return -1; } if(Q->rear == Q->head) //only one consumer,no need to lock head { printf("Empty Queue!\n"); return -1; } memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t)); Q->head = (Q->head+1)%QUEUE_SIZE; return 0; } int MsgEnQueue(Queue_t* Q, Msg_t* msg) { if(Q->head == (Q->rear+1)%QUEUE_SIZE) { printf("Full Queue!\n"); return -1; } sem_wait(&Q->sem); memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t)); Q->rear = (Q->rear+1)%QUEUE_SIZE; sem_post(&Q->sem); return 0; }
- 队列中应加入信号量或锁来保证进队时的互斥访问,因为多个消息可能同时进队,互相覆盖其队列节点
- 这里的信号量仅用于进队而没用于出队,理由是消息处理者只有一个,不存在互斥的情形
三、构造消息处理者
if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL)) { printf("create handler thread fail!\n"); return -1; } void msg_printer(Msg_t* msg) { if(!msg) { return; } printf("%s: I have recieved a message!\n", __FUNCTION__); printf("%s: msgtype:%d msg_src:%d dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst); } void msg_handler() { sleep(5); //let's wait 5s when starts while(1) { Msg_t msg; memset(&msg, 0 ,sizeof(Msg_t)); int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg); if(res != 0) { sleep(10); continue; } msg_printer(&msg); sleep(1); } }
enum MSG_TYPE { GO_HOME, GO_TO_BED, GO_TO_LUNCH, GO_TO_CINAMA, GO_TO_SCHOOL, GO_DATEING, GO_TO_WORK,//6 }; void handler() { switch(msgtype) { case GO_HOME: go_home(); break; case GO_TO_BED: go_to_bed(); break; ....... } }
这里的handler就是一个简单的状态机了,根据给定的消息类型(事件)去做特定的事,推动状态机的转动。
四、构造消息生产者
if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL)) { printf("create thread1 fail!\n"); return -1; } if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL)) { printf("create thread2 fail!\n"); return -1; } if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL)) { printf("create thread3 fail!\n"); return -1; } void msg_sender1() { int i = 0; while(1) { if(i > 10) { i = 0; } Msg_t msg; msg.hdr.msg_type = i++; msg.hdr.msg_src = THREAD1; msg.hdr.msg_dst = HANDLER; MsgEnQueue((Queue_t*)&MsgQueue, &msg); printf("%s: Thread1 send a message!\n",__FUNCTION__); sleep(1); } } void msg_sender2() { int i = 0; while(1) { if(i > 10) { i = 0; } Msg_t msg; msg.hdr.msg_type = i++; msg.hdr.msg_src = THREAD2; msg.hdr.msg_dst = HANDLER; MsgEnQueue((Queue_t*)&MsgQueue, &msg); printf("%s: Thread2 send a message!\n",__FUNCTION__); sleep(1); } } void msg_sender3() { int i = 0; while(1) { if(i > 10) { i = 0; } Msg_t msg; msg.hdr.msg_type = i++; msg.hdr.msg_src = THREAD3; msg.hdr.msg_dst = HANDLER; MsgEnQueue((Queue_t*)&MsgQueue, &msg); printf("%s: Thread3 send a message!\n",__FUNCTION__); sleep(1); } }
这里我create了三个线程来模拟消息生产者,每个生产者每隔1秒往消息队列里写消息。
五、跑起来看看
1 #include <stdio.h> 2 #include <pthread.h> 3 #include <semaphore.h> 4 #include <unistd.h> 5 #include <string.h> 6 #include "msg_def.h" 7 8 Queue_t MsgQueue; 9 10 int main(int argc, char* argv[]) 11 { 12 int ret; 13 pthread_t thread1_id; 14 pthread_t thread2_id; 15 pthread_t thread3_id; 16 pthread_t handler_thread_id; 17 18 ret = MsgQueueInit((Queue_t*)&MsgQueue); 19 if(ret != 0) 20 { 21 return -1; 22 } 23 24 if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL)) 25 { 26 printf("create handler thread fail!\n"); 27 return -1; 28 } 29 30 31 if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL)) 32 { 33 printf("create thread1 fail!\n"); 34 return -1; 35 } 36 37 if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL)) 38 { 39 printf("create thread2 fail!\n"); 40 return -1; 41 } 42 43 if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL)) 44 { 45 printf("create thread3 fail!\n"); 46 return -1; 47 } 48 49 50 while(1) 51 { 52 sleep(1); 53 } 54 55 return 0; 56 } 57 58 59 60 61 int MsgQueueInit(Queue_t* Q) 62 { 63 if(!Q) 64 { 65 printf("Invalid Queue!\n"); 66 return -1; 67 } 68 Q->rear = 0; 69 Q->head = 0; 70 sem_init(&Q->sem, 0, 1); 71 return 0; 72 } 73 74 int MsgDeQueue(Queue_t* Q, Msg_t* msg) 75 { 76 if(!Q) 77 { 78 printf("Invalid Queue!\n"); 79 return -1; 80 } 81 if(Q->rear == Q->head) //only one cosumer,no need to lock head 82 { 83 printf("Empty Queue!\n"); 84 return -1; 85 } 86 memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t)); 87 Q->head = (Q->head+1)%QUEUE_SIZE; 88 return 0; 89 90 } 91 92 int MsgEnQueue(Queue_t* Q, Msg_t* msg) 93 { 94 if(Q->head == (Q->rear+1)%QUEUE_SIZE) 95 { 96 printf("Full Queue!\n"); 97 return -1; 98 } 99 sem_wait(&Q->sem); 100 memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t)); 101 Q->rear = (Q->rear+1)%QUEUE_SIZE; 102 sem_post(&Q->sem); 103 return 0; 104 } 105 106 void msg_printer(Msg_t* msg) 107 { 108 if(!msg) 109 { 110 return; 111 } 112 printf("%s: I have recieved a message!\n", __FUNCTION__); 113 printf("%s: msgtype:%d msg_src:%d dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst); 114 115 } 116 117 int msg_send() 118 { 119 120 Msg_t msg; 121 msg.hdr.msg_type = GO_HOME; 122 msg.hdr.msg_src = THREAD1; 123 msg.hdr.msg_dst = HANDLER; 124 return MsgEnQueue((Queue_t*)&MsgQueue, &msg); 125 126 } 127 128 void msg_handler() 129 { 130 sleep(5); //let's wait 5s when starts 131 while(1) 132 { 133 Msg_t msg; 134 memset(&msg, 0 ,sizeof(Msg_t)); 135 int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg); 136 if(res != 0) 137 { 138 sleep(10); 139 continue; 140 } 141 msg_printer(&msg); 142 sleep(1); 143 } 144 } 145 146 147 void msg_sender1() 148 { 149 int i = 0; 150 while(1) 151 { 152 if(i > 10) 153 { 154 i = 0; 155 } 156 Msg_t msg; 157 msg.hdr.msg_type = i++; 158 msg.hdr.msg_src = THREAD1; 159 msg.hdr.msg_dst = HANDLER; 160 MsgEnQueue((Queue_t*)&MsgQueue, &msg); 161 printf("%s: Thread1 send a message!\n",__FUNCTION__); 162 sleep(1); 163 } 164 } 165 166 void msg_sender2() 167 { 168 int i = 0; 169 while(1) 170 { 171 if(i > 10) 172 { 173 i = 0; 174 } 175 Msg_t msg; 176 msg.hdr.msg_type = i++; 177 msg.hdr.msg_src = THREAD2; 178 msg.hdr.msg_dst = HANDLER; 179 MsgEnQueue((Queue_t*)&MsgQueue, &msg); 180 printf("%s: Thread2 send a message!\n",__FUNCTION__); 181 sleep(1); 182 } 183 } 184 185 void msg_sender3() 186 { 187 int i = 0; 188 while(1) 189 { 190 if(i > 10) 191 { 192 i = 0; 193 } 194 Msg_t msg; 195 msg.hdr.msg_type = i++; 196 msg.hdr.msg_src = THREAD3; 197 msg.hdr.msg_dst = HANDLER; 198 MsgEnQueue((Queue_t*)&MsgQueue, &msg); 199 printf("%s: Thread3 send a message!\n",__FUNCTION__); 200 sleep(1); 201 } 202 }
msg_def.h:
1 #include <stdio.h> 2 #include <pthread.h> 3 #include <semaphore.h> 4 5 typedef unsigned char uint8; 6 typedef unsigned short unit16; 7 typedef unsigned int uint32; 8 9 #define QUEUE_SIZE 1000 10 11 typedef struct Msg_Hdr_s 12 { 13 uint32 msg_type; 14 uint32 msg_len; 15 uint32 msg_src; 16 uint32 msg_dst; 17 }Msg_Hdr_t; 18 19 typedef struct Msg_s 20 { 21 Msg_Hdr_t hdr; 22 uint8 data[100]; 23 } Msg_t; 24 25 typedef struct Queue_s 26 { 27 int head; 28 int rear; 29 sem_t sem; 30 Msg_t data[QUEUE_SIZE]; 31 }Queue_t; 32 33 typedef struct Queue_s QueueNode; 34 35 enum MSG_TYPE 36 { 37 GO_HOME, 38 GO_TO_BED, 39 GO_TO_LUNCH, 40 GO_TO_CINAMA, 41 GO_TO_SCHOOL, 42 GO_DATEING, 43 GO_TO_WORK,//6 44 }; 45 46 enum SRC_ADDR 47 { 48 THREAD1, 49 THREAD2, 50 THREAD3, 51 HANDLER, 52 }; 53 54 55 int MsgQueueInit(Queue_t* Q); 56 int MsgDeQueue(Queue_t* Q, Msg_t* msg); 57 int MsgEnQueue(Queue_t* Q, Msg_t* msg); 58 void msg_handler(); 59 void msg_sender1(); 60 void msg_sender2(); 61 void msg_sender3(); 62 void msg_printer(Msg_t* msg); 63 int msg_send();
Linux编程之自定义消息队列相关推荐
- c语言系统编程八:Linux进程间通信之消息队列
Linux进程间通信之消息队列 一 消息队列概述 二 消息队列的特点 三 消息队列的创建和使用 3.1 获取系统唯一的key值 3.2 创建消息队列 3.3 查看消息队列和删除消息队列的shell命令 ...
- Linux的进程间通信-消息队列
Linux的进程间通信-消息队列 微博ID:orroz 微信公众号:Linux系统技术 前言 Linux系统给我们提供了一种可以发送格式化数据流的通信手段,这就是消息队列.使用消息队列无疑在某些场景的 ...
- 嵌入式linux内核oops,Linux编程时遇到Oops提示该如何排查?
各位工程师在Linux下开发程序时,有没有遇到由于系统中存在某些小故障而跳出了"Oops"提示的情况,此时你是如何排查故障?一行行的查看代码吗?其实不用那么复杂,本文将为你介绍一种 ...
- Linux运行项目部分空指针,技术文章—Linux编程时遇到Oops提示该如何排查?
各位工程师在Linux下开发程序时,有没有遇到由于系统中存在某些小故障而跳出了"Oops"提示的情况,此时你是如何排查故障?一行行的查看代码吗?其实不用那么复杂,本文将为你介绍一种 ...
- 牛人整理分享的面试知识:操作系统、计算机网络、设计模式、Linux编程,数据结构总结...
网站地址:http://www.itmian4.com 基础篇:操作系统.计算机网络.设计模式 一:操作系统 1. 进程的有哪几种状态,状态转换图,及导致转换的事件. 2. 进程与线程的区别. 3. ...
- arm linux udp 自发自收_嵌入式linux编程开发必备知识
嵌入式linux是嵌入式开发必不可少的一份子,在科技高速发展的今天,嵌入式已然已经成为了最热门的技术之一了.对于想要学习好嵌入式的学员来说,现在学习好linux是很有必要的,因为这个是嵌入式的核心.那 ...
- linux QT 结束当前进程_嵌入式linux编程开发必备知识
嵌入式linux是嵌入式开发必不可少的一份子,在科技高速发展的今天,嵌入式已然已经成为了最热门的技术之一了.对于想要学习好嵌入式的学员来说,现在学习好linux是很有必要的,因为这个是嵌入式的核心.那 ...
- 计算机基础(八):linux编程规范总结
linux编程规范小结 1.函数:(子)模块接口函数大写字母开头: AddUser() (子)模块内部调用函数小写字母开头:addUser() 接口函数/回调函数以' ...
- 嵌入式linux编程开发必备知识
嵌入式linux是嵌入式开发必不可少的一份子,在科技高速发展的今天,嵌入式已然已经成为了最热门的技术之一了.对于想要学习好嵌入式的学员来说,现在学习好linux是很有必要的,因为这个是嵌入式的核心.那 ...
最新文章
- 软件系统维护是一项不吸引人的工作_测试人员必须了解的软件测试工作规范
- CentOS 6.5 部署 Horizon
- windows server 2008 R2系统安装教程
- Ubuntu通过vnc连接Windows主机的问题解决
- bzoj 1643: [Usaco2007 Oct]Bessie's Secret Pasture 贝茜的秘密草坪(DP)
- windows bat 命令之%~dp0
- windows中bat批处理的注释语句
- 安卓Android Studio开发环境,无需连接外网
- Web开发分享qq、微信组件-百度分享
- cocos2dx 3.17海外sdk接入填坑全纪录 Appodeal(广告) SDK 接入(5)
- 【转】金蝶EAS BOS工作流开发(附带JAVA脚本)
- 安装vue环境,并新建Vue项目
- 1007 Problem H	A+B 输入输出练习VIII
- 这是我见过最接地气的PCB设计指南了!
- 20135203齐岳 信息安全系统设计基础期中总结
- 看见她力量丨中国铁建地产铁姐,内外多面,绽放光彩!
- 【数据结构】赫夫曼树
- java仪表盘_GitHub - Jensenczx/Dashboard: 通过Java实现的仪表盘
- 十进制转化为二进制与十六进制显示(汇编程序)
- Microsoft COCO: Common Objects in Context - 目标检测评估 指标(Detection Evaluation)