消息队列-发送消息_tx_queue_send

1,发送消息会插入到队列尾部。
2,如果消息队列有挂起的接收线程,发送消息时,可以直接把消息放到接收线程的缓冲中,这可以降低消息传递延时。
TX_THREAD线程控制块中tx_additional_suspend_info域用于存储接收线程缓冲区地址。
3,如果消息队列已满,发送线程调用_tx_queue_send(wait_option不为0)发送消息时,线程会被挂起到队列tx_queue_suspension_list。其它线程接收消息时,会恢复挂起的线程。

source_ptr参数指向发送消息的指针

UINT    _tx_queue_send(TX_QUEUE *queue_ptr, VOID *source_ptr, ULONG wait_option)
{TX_INTERRUPT_SAVE_AREAREG_1   UINT        status;                 /* Return status           */REG_2   TX_THREAD   *thread_ptr;            /* Working thread pointer  */REG_3   ULONG_PTR   source;                 /* Source pointer          */REG_4   ULONG_PTR   destination;            /* Destination pointer     *//* Disable interrupts to place message in the queue.  */#def 禁止中断,防止下面操作被打断TX_DISABLE/* Determine if there is room in the queue.  */if (queue_ptr -> tx_queue_available_storage){#def 队列中有可用空间/* Now determine if there is a thread waiting for a message.  */if (!queue_ptr -> tx_queue_suspension_list){#def 挂起list中没有挂起线程,直接把消息插入队列尾部/* No thread suspended while waiting for a message fromthis queue.  *//* Simply place the message in the queue.  *//* Reduce the amount of available storage.  */#def 可用容量减1queue_ptr -> tx_queue_available_storage--;/* Increase the enqueued count.  */#def 队列中当前消息个数加1queue_ptr -> tx_queue_enqueued++;/* Setup source and destination pointers.  */#def source_ptr为发送消息的指针   tx_queue_write为队列中可写空间首地址,把消息放到这个地址开始处source = (ULONG_PTR) source_ptr;destination = (ULONG_PTR) queue_ptr -> tx_queue_write;/* Copy the message into the queue.  */#def 根据消息大小tx_queue_message_size占用的word个数进行copy,这里采用了特殊处理,没有breakswitch (queue_ptr -> tx_queue_message_size){#def tx_queue_message_size为16是,需要copy 16此default:/* Copy a sixteen longword message into the queue.  */*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;/* Fall through to copy the remaining eight longwords.  */#def tx_queue_message_size为8时,copy 8次,case TX_8_ULONG:/* Copy an eight longword message into the queue.  */*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;/* Fall through to copy the remaining four longwords.  */case TX_4_ULONG:/* Copy a four longword message into the queue.  */*destination++ =  *source++;*destination++ =  *source++;/* Fall through to copy the remaining two longwords.  */case TX_2_ULONG:/* Copy a two longword message into the queue.  */*destination++ =  *source++;/* Fall through to copy the remaining longword.  */case TX_1_ULONG:/* Copy single longword message into the queue.  */*destination =  *source;}/* Adjust the write pointer.  */#def tx_queue_write 指针迁移queue_ptr -> tx_queue_write =queue_ptr -> tx_queue_write + queue_ptr -> tx_queue_message_size;/* Determine if we are at the end.  */#def tx_queue_write 指针如果到了队列末尾,那么重头开始if (queue_ptr -> tx_queue_write >= queue_ptr -> tx_queue_end)/* Yes, wrap around to the beginning.  */queue_ptr -> tx_queue_write =  queue_ptr -> tx_queue_start;/* Set status to success.  */status =  TX_SUCCESS;}else{#def 消息队列中有可用空间,并且挂起队列中线程等待,说明消息队列中没有任何消息,有消息的话不可能挂起线程等待#def 这种情况下,没有把消息放到消息队列tx_queue_write 处,而是直接放到了等待线程的缓冲区#def 等待线程在调度_tx_queue_receive时,发现队列中没有消息,就自我挂起到了tx_queue_suspension_list中,#def 并且把_tx_queue_receive 接收缓冲区指针存储到了TX_THREAD控制块的tx_additional_suspend_info域/* Thread suspended waiting for a message.  Remove it and copy this messageinto its storage area.  */#def 取tx_queue_suspension_list队列中最前面线程,FIFO规则,并没有按照优先级高低顺序thread_ptr =  queue_ptr -> tx_queue_suspension_list;/* See if this is the only suspended thread on the list.  */#def 从tx_queue_suspension_list 移除线程if (thread_ptr == thread_ptr -> tx_suspended_next){/* Yes, the only suspended thread.  *//* Update the head pointer.  */queue_ptr -> tx_queue_suspension_list =  TX_NULL;}else{/* At least one more thread is on the same expiration list.  *//* Update the list head pointer.  */queue_ptr -> tx_queue_suspension_list =  thread_ptr -> tx_suspended_next;/* Update the links of the adjacent threads.  */(thread_ptr -> tx_suspended_next) -> tx_suspended_previous =thread_ptr -> tx_suspended_previous;(thread_ptr -> tx_suspended_previous) -> tx_suspended_next =thread_ptr -> tx_suspended_next;}/* Decrement the suspension count.  */queue_ptr -> tx_queue_suspended_count--;/* Prepare for resumption of the thread.  *//* Clear cleanup routine to avoid timeout.  */thread_ptr -> tx_suspend_cleanup =  TX_NULL;/* Temporarily disable preemption.  */_tx_thread_preempt_disable++;/* Restore interrupts.  */TX_RESTORE/* Setup source and destination pointers.  */source = (ULONG_PTR) source_ptr;#def tx_additional_suspend_info中存储了之前接收线程的接收缓冲区指针destination = (ULONG_PTR) thread_ptr -> tx_additional_suspend_info;#def 把发送消息source_ptr copy 到tx_additional_suspend_info 接收缓冲区/* Copy the message into the thread's destination.  */switch (queue_ptr -> tx_queue_message_size){default:/* Copy a sixteen longword message into the thread's destination.  */*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;/* Fall through to copy the remaining eight longwords.  */case TX_8_ULONG:/* Copy an eight longword message into the thread's destination.  */*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;*destination++ =  *source++;/* Fall through to copy the remaining four longwords.  */case TX_4_ULONG:/* Copy a four longword message into the thread's destination.  */*destination++ =  *source++;*destination++ =  *source++;/* Fall through to copy the remaining two longwords.  */case TX_2_ULONG:/* Copy a two longword message into the thread's destination.  */*destination++ =  *source++;/* Fall through to copy the remaining longword.  */case TX_1_ULONG:/* Copy single longword message into the thread's destination.  */*destination =  *source;}/* Deactivate the timeout timer if necessary.  */if (thread_ptr -> tx_thread_timer.tx_list_head){/* Deactivate the thread's timeout timer.  */_tx_timer_deactivate(&(thread_ptr -> tx_thread_timer));}else{/* Clear the remaining time to ensure timer doesn't get activated.  */thread_ptr -> tx_thread_timer.tx_remaining_ticks =  0;}/* Put return status into the thread control block.  */#def _tx_queue_receive函数要使用的变量, 这个挂起线程挂起,现在恢复后,作为_tx_queue_receive函数返回值thread_ptr -> tx_suspend_status =  TX_SUCCESS;/* Resume thread.  */if (_tx_thread_resume(thread_ptr))/* Preemption is required, transfer control back tosystem.  */_tx_thread_system_return();/* Return successful status.  */return (TX_SUCCESS);}}else{#def 消息队列已经没有可用空间 ,消息队列是满的,需要挂起这个发送线程,等有空间在发送消息#def 由于消息队列是满,肯定没有接收线程挂起。 所以挂起到tx_queue_suspension_list的一定是发送线程#def 挂起时,把要发送的消息指针source_ptr,存放到了tx_additional_suspend_info 中,所以tx_additional_suspend_info 这个变量可以存储接收线程的接收缓冲区指针,也可以存储发送消息指针/* Determine if the request specifies suspension.  */if (wait_option){#def wait_option不为0才挂起发送线程,为0,函数就直接返回失败/* Prepare for suspension of this thread.  *//* Pickup thread pointer.  */thread_ptr =  _tx_thread_current_ptr;/* Setup cleanup routine pointer.  */thread_ptr -> tx_suspend_cleanup =  _tx_queue_cleanup;/* Setup cleanup information, i.e. this queue controlblock and the source pointer.  */thread_ptr -> tx_suspend_control_block = (VOID_PTR) queue_ptr;#def 发送消息指针source_ptr暂存到TX_THREAD控制块的tx_additional_suspend_info 域thread_ptr -> tx_additional_suspend_info = (VOID_PTR) source_ptr;/* Setup suspension list.  */#def 把发送线程插入到tx_queue_suspension_listif (queue_ptr -> tx_queue_suspension_list){/* This list is not NULL, add current thread to the end. */thread_ptr -> tx_suspended_next =queue_ptr -> tx_queue_suspension_list;thread_ptr -> tx_suspended_previous =(queue_ptr -> tx_queue_suspension_list) -> tx_suspended_previous;((queue_ptr -> tx_queue_suspension_list) -> tx_suspended_previous) -> tx_suspended_next =thread_ptr;(queue_ptr -> tx_queue_suspension_list) -> tx_suspended_previous =   thread_ptr;}else{/* No other threads are suspended.  Setup the head pointer andjust setup this threads pointers to itself.  */queue_ptr -> tx_queue_suspension_list =  thread_ptr;thread_ptr -> tx_suspended_next =        thread_ptr;thread_ptr -> tx_suspended_previous =    thread_ptr;}/* Increment the suspended thread count.  */queue_ptr -> tx_queue_suspended_count++;/* Set the state to suspended.  */thread_ptr -> tx_state =    TX_QUEUE_SUSP;/* Set the suspending flag.  */#def 开始挂起过程,所以这里必须设置tx_suspending 标志,_tx_thread_suspend中会检查thread_ptr -> tx_suspending =  TX_TRUE;/* Temporarily disable preemption.  */_tx_thread_preempt_disable++;/* Save the timeout value.  */thread_ptr -> tx_thread_timer.tx_remaining_ticks =  wait_option;/* Restore interrupts.  */TX_RESTORE/* See if we need to start a timer.  */#def 不为TX_WAIT_FOREVER开启定时器,定时器超时会把这个线程从挂起队列移除if (wait_option != TX_WAIT_FOREVER){/* A timeout is required.  *//* Activate the thread timer for timeout.  */_tx_timer_activate(&(thread_ptr -> tx_thread_timer));}/* Call actual thread suspension routine.  */#def 自我挂起_tx_thread_suspend(thread_ptr);/* Return the completion status.  */#def 前面挂起,线程切换到其它线程,恢复后,从这里开始执行,返回值tx_suspend_status在_tx_queue_receive函数中设置return (thread_ptr -> tx_suspend_status);}else/* Immediate return, return error completion.  */status =  TX_QUEUE_FULL;}/* Restore interrupts.  */TX_RESTORE/* Return completion status.  */return (status);
}

Threadx 消息队列-发送消息_tx_queue_send相关推荐

  1. linux 消息队列_Linux消息队列

    消息队列,Unix的通信机制之一,可以理解为是一个存放消息(数据)容器.将消息写入消息队列,然后再从消息队列中取消息,一般来说是先进先出的顺序.可以解决两个进程的读写速度不同(处理数据速度不同),系统 ...

  2. 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 )

    文章目录 一.MessageQueue 消息队列存储消息 二.MessageQueue 消息队列取出消息 三.消息队列完整代码 一.MessageQueue 消息队列存储消息 Message 链表 : ...

  3. 阿里Java面试题剖析:为什么使用消息队列?消息队列有什么优点和缺点?

    面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区别,以及适合哪些场景? 面试官心理分析 其实面试官主要是想看看: ...

  4. 消息队列_消息队列:kafka

    概念 kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域. 要理解kafka首先要有分布式的概念,要有消息队列的概念.分布式系统最大的优势就是解耦和削峰,这种情况下,A系 ...

  5. 消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点?

    消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点? 面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区 ...

  6. POSIX标准总体分析 执行调度 消息传递 调度参数 进程调度函数 关闭消息队列 得到消息队列参数 设置调度参数 时钟和定时器  时钟和定时器函数 消息传递函数 打开消息队列 设置消息队列参数

    粉丝不过w 调度参数 一个调度参数结构 sched_param 包括了调度策略所支持的执行者所需要的调度参数,它在头文件<sched.h>中定义 执行者可根据规对该结构进行扩展 调度策略 ...

  7. 面试题:为什么使用消息队列?消息队列有什么优缺点?

    目录 1. 面试题 2. 面试官心理分析 3. 面试题剖析 3.1. 为什么使用消息队列 3.2. 消息队列有什么优缺点 3.3. Kafka.ActiveMQ.RabbitMQ.RocketMQ 有 ...

  8. 消息队列把消息弄丢了怎么办?

    消息队列会丢失消息吗? 答案是肯定的,所以对于业务严谨的数据,我们要确保其在消息队列中的安全,不能丢. 要想解决不丢的问题,首先要弄清楚 消息是怎么丢的呢? 丢消息的关键点有3个: Producer ...

  9. 消息队列把消息弄丢了怎么办

    消息队列把消息弄丢了怎么办 消息队列会丢失消息吗? 答案是肯定的,所以对于业务严谨的数据,我们要确保其在消息队列中的安全,不能丢. 要想解决不丢的问题,首先要弄清楚 消息是怎么丢的呢? 丢消息的关键点 ...

最新文章

  1. Elixir: 开发和发布Elixir库
  2. 服务器市场步步为营:Intel发布新款至强Xeon E5-4600v4四路处理器
  3. 【图像分割模型】感受野与分辨率的控制术—空洞卷积
  4. NeHe OpenGL课程 网址整理
  5. 图Graph--最短路径算法(Shortest Path Algorithm)
  6. python用outlook自动发邮件_python使用两种发邮件的方式smtp和outlook示例
  7. 12699元的iPhone 11 Pro Max物料成本却不足3500元,苹果赚疯了?
  8. 每首歌都保存着一份记忆
  9. Linux在文件第一列添加字段,linux – 如何将file1的每一列追加到file2的特定字段并创建一个新的输出文件?...
  10. 深入理解jQuery插件开发(讲解jQuery开发的好文)
  11. 坚持写博客,果然有人找吾进行技术合作
  12. 如何查找求职简历模板及pdf编辑器去水印
  13. Maven使用详解,非常详细
  14. 申请高德地图API【流程记录】
  15. leapmotion 导入 unity 3D 教程
  16. 卸载rasing,瑞星
  17. 程序员颈椎病康复秘籍,你值得拥有!
  18. 高校体育场地预约管理系统(Java Web毕业设计)
  19. 华为、小米轮番超越三星激动人心,且莫沸腾,三星和苹果优势仍在
  20. php判断域名是否解析源码,检测域名是否被墙API接口源码

热门文章

  1. 【ML】机器学习模型之PMML--概述
  2. 点定位(五):处理退化情况·续(Point Location: handle degenerate cases)
  3. 皮尔洛和c罗讲什么语言,够胆量!皮尔洛换下进球功臣C罗,赛后说出这么做的真正原因...
  4. 【深度学习】神经网络入门(最通俗的理解神经网络)
  5. Button英文字符自动大写的问题
  6. 策略游戏中的道理--金铲铲之战
  7. win7下格式化Linux格式硬盘,linux格式化硬盘 【应对指南】
  8. mac电脑怎么使用自带截图?
  9. mysql无法导出表解决方案
  10. [附源码]JAVA+ssm基于JAVA语言的国货美妆店管理系统(程序+Lw)