#include<pthread.h>
#include "errors.h"typedef struct stage_tag{<span style="white-space:pre">           </span>//流水线的工作单元结构体pthread_mutex_t   mutex;<span style="white-space:pre">       </span>//保护当前工作单元数据的互斥锁pthread_cond_t         avail;<span style="white-space:pre">   </span>//等待当前工作单元存储数据可用的条件变量pthread_cond_t        ready;<span style="white-space:pre">   </span>//等待当前工作单元可处理新数据的条件变量int           data_ready;<span style="white-space:pre">      </span>//表示当前工作单元存放数据的状态(0表示data为过期数据,1表示data为有效数据)long           data;<span style="white-space:pre">        </span>//当前工作单元存放的数据pthread_t         thread;<span style="white-space:pre">      </span>//当前工作单元所在的线程IDstruct stage_tag    *next;<span style="white-space:pre">       </span>//流水线中指向下一工作单单元的指针
}stage_t;typedef struct pipe_tag{<span style="white-space:pre">            </span>//流水线结构体pthread_mutex_t    mutex;<span style="white-space:pre">       </span>//保护流水线加入新数据或读出流水线工作结果的数据stage_t       *head;<span style="white-space:pre">           </span>//流水线的第一个工作单原指针stage_t     *tail;<span style="white-space:pre">           </span>//流水线的最后一个工作单原的指针(保存数据经过流水线处理之后所得的结果数据)int         stages;<span style="white-space:pre">              </span>//流水线中工作单元的数量int       active;<span style="white-space:pre">              </span>//流水线中正在处理数据的工作单元的数量
}pipe_t;int pipe_send(stage_t *stage,long data){<span style="white-space:pre">                         </span>//向stage所指向的工作单原传送新数据data的函数int status;<span style="white-space:pre"> </span>//保存线程函数的调用状态(返回值为0表示调用成功,否则调用失败就打印错误信息)status = pthread_mutex_lock(&stage->mutex);<span style="white-space:pre">                 </span>//试图锁定stage指向的工作单原的互斥锁if(status != 0)return status;while(stage->data_ready){<span style="white-space:pre">                                    </span>//等待条件变量ready发送当前工作单原可处理新数据的信号status = pthread_cond_wait(&stage->ready,&stage->mutex); //<span style="font-family: Arial, Helvetica, sans-serif;">如果data_ready为1,即当前工作单原存储的数据为有效数据则等待处理完毕</span>if(status != 0){<span style="white-space:pre">                                         </span>//<span style="font-family: Arial, Helvetica, sans-serif;">否则,表示当前工作单元存储的数据为过期数据,可以处理新传入的数据data</span>pthread_mutex_unlock(&stage->mutex);return status;}}stage->data = data;<span style="white-space:pre">                                     </span>//更新当前工作单原存储的数据为datastage->data_ready = 1;<span style="white-space:pre">                                      </span>//更新后的数据为有效数据,data_ready设置为1status = pthread_cond_signal(&stage->avail);<span style="white-space:pre">                    </span>//并且发送avail信号通知等待中的线程该工作单元的数据为可用的有效数据if(status != 0){pthread_mutex_unlock(&stage->mutex);return status;}status = pthread_mutex_unlock(&stage->mutex);<span style="white-space:pre">               </span>//解锁当前工作单元的互斥锁return status;<span style="white-space:pre">                                                </span>//返回函数调用状态
}void *pipe_stage(void *arg){<span style="white-space:pre">                                    </span>//线程入口函数,arg为当前的工作单元结构体的指针stage_t *stage = (stage_t*)arg;<span style="white-space:pre">                              </span>//j将arg转换成stage_t*类型并赋值给stagestage_t *next_stage = stage->next;<span style="white-space:pre">                         </span>//将stage的下一个工作单元的指针赋值给stage_nextint status;<span style="white-space:pre">     </span>//同上的statusstatus = pthread_mutex_lock(&stage->mutex);<span style="white-space:pre">                  </span>//开始工作,试图锁定当前互斥锁if(status != 0)err_abort(status,"Lock pipe stage");while(true){<span style="white-space:pre">                                              </span>//循环处理当前工作单原内的数据while(stage->data_ready != 1){<span style="white-space:pre">                          </span>//等待条件变量avail发送当前数据可用的信号status = pthread_cond_wait(&stage->avail,&stage->mutex);//等待的开始时会先解锁绑定的互斥锁if(status != 0)err_abort(status,"Wait for previous stage");}pipe_send(next_stage, stage->data + 1);<span style="white-space:pre">                 </span>//将对当前数据工作(这里是对data+1)后的结果发送给下一个工作单元处理stage->data_ready = 0;<span style="white-space:pre">                                   </span>//数据传送给下一个工作单元后当前数矩即过期,data_ready设置为0status = pthread_cond_signal(&stage->ready);<span style="white-space:pre">               </span>//并发送ready信号通知等待中的线程,该工作单元可以接收并处理新的数据if(status != 0)err_abort(status,"Wake next stage");}
}int pipe_create(pipe_t * pipe,int stages){<span style="white-space:pre">                          </span>//创建工作流的函数 stages表示工作单元数量int pipe_index;<span style="white-space:pre">        </span>//当前创建的工作单原的索引stage_t **link = &pipe->head, *new_stage, *stage;//用于创建链表的各个指针变量int status;<span style="white-space:pre">   </span>//同上的statusstatus = pthread_mutex_init(&pipe->mutex,NULL);<span style="white-space:pre">              </span>//初始化保护工作流链表的互斥锁if(status != 0)err_abort(status , "Init pipe mutex");pipe->stages = stages;pipe->active = 0;<span style="white-space:pre">                                         </span>//当前正在处理数据的工作单元数初始化为0for(pipe_index = 0; pipe_index <= stages; pipe_index++){<span style="white-space:pre">            </span>//循环创建工作单元的数据节点new_stage = (stage_t*)malloc(sizeof(stage_t));<span style="white-space:pre">              </span>//分配内存空间if(new_stage == NULL)errno_abort("Allocate stage");status = pthread_mutex_init(&new_stage->mutex,NULL);<span style="white-space:pre">     </span>//下面是一些初始化工作if(status != 0)err_abort(status,"Init stage mutex");status = pthread_cond_init(&new_stage->avail,NULL);if(status != 0)err_abort(status,"Init avail condition");status = pthread_cond_init(&new_stage->ready,NULL);if(status != 0)err_abort(status, "Init ready condition");new_stage->data_ready = 0;*link = new_stage;link = &new_stage->next;}*link = (stage_t*)NULL;<span style="white-space:pre">  </span>pipe->tail = new_stage;for(stage = pipe->head;stage->next != NULL;stage = stage->next){<span style="white-space:pre">         </span>//为每个工作单元创建线程status = pthread_create(&stage->thread,NULL,pipe_stage,(void*)stage);if(status != 0)err_abort(status,"Create pipe stage");}return 0;
}int pipe_start(pipe_t *pipe, long value){<span style="white-space:pre">                   </span>//将新加入的数据发送给工作流处理int status;status = pthread_mutex_lock(&pipe->mutex);<span style="white-space:pre">          </span>//试图锁定互斥锁以加入新数据if(status != 0)err_abort(status,"Lock pipe mutex");pipe->active++;<span style="white-space:pre">                                   </span>//如果锁定了互斥锁,说明当前第一个工作单元可以工作,将工作的计数量加一status = pthread_mutex_unlock(&pipe->mutex);<span style="white-space:pre">        </span>//解锁互斥锁if(status != 0)err_abort(status, "Unlock pipe mutex");pipe_send(pipe->head,value);<span style="white-space:pre">                     </span>//将数据发送给第一个工作单元处理return 0;
}int pipe_result(pipe_t *pipe,long *result){<span style="white-space:pre">                 </span>//获取处理结果的函数stage_t *tail = pipe->tail;long value;<span style="white-space:pre">   </span>//保存结果数据int empty = 0;<span style="white-space:pre">     </span>//判断工作流中数据是否为空的flagint status;status = pthread_mutex_lock(&pipe->mutex);<span style="white-space:pre">            </span>//试图锁定流水线的互斥锁if(status != 0)err_abort(status, "Lock pipe mutex");if(pipe->active <= 0)<span style="white-space:pre">                                </span>//如果当前工作中的工作单元小于等于0empty = 1;<span style="white-space:pre">                                  </span>//将empty置为1elsepipe->active--;<span style="white-space:pre">                               </span>//否则减少一个工作中的工作单元数(因为取出来来一个数据嘛)status = pthread_mutex_unlock(&pipe->mutex);<span style="white-space:pre">      </span>//解锁流水线的互斥锁if(status != 0)err_abort(status, "Unlock pipe mutex");if(empty)<span style="white-space:pre">                                           </span>//如果流水线中的数据为空,则获取数据失败,函数直接返回0return 0;pthread_mutex_lock(&tail->mutex);<span style="white-space:pre">                  </span>//试图锁定最后一个工作单元的互斥锁while(!tail->data_ready)<span style="white-space:pre">                               </span>//等待最后一个工作单元发送其中存放的数据为可用的有效数矩的信号pthread_cond_wait(&tail->avail,&tail->mutex);*result = tail->data;<span style="white-space:pre">                                </span>//将最后一个工作单元中的有效数据取出存入result中tail->data_ready = 0;<span style="white-space:pre">                               </span>//最后一个工作单原的数据已取出,则其中的data为过期数据,将data_ready置为0pthread_cond_signal(&tail->ready);<span style="white-space:pre">                  </span>//并发送信号给其他正在等待中的线程,最后一个工作单元可以处理新的数据了pthread_mutex_unlock(&tail->mutex);<span style="white-space:pre">              </span>//锁定最后一个工作单元的互斥锁return 1;<span style="white-space:pre">                                           </span>//获取数据成功,返回1
}int main(int argc, char *argv[]){<span style="white-space:pre">                           </span>//主函数pipe_t my_pipe;<span style="white-space:pre">    </span>//定义工作流变量long value,result;<span style="white-space:pre"> </span>//value为需要处理的数据,result为处理结果int status;char line[128];<span style="white-space:pre">   </span>//输入缓存pipe_create(&my_pipe, 10);<span style="white-space:pre">                            </span>//创建新的工作流(默认创建10个工作单元的工作流)printf("Enter integer values, or \"=\" for next result\n");<span style="white-space:pre">  </span>//提示信息while(true){<span style="white-space:pre">                                      </span>//循环等待用户输入并处理数据printf("Data>");<span style="white-space:pre">    </span>//提示符if(fgets(line,sizeof(line),stdin) == NULL)exit(0);<span style="white-space:pre">   </span>//用户输入结束退出if(strlen(line) <= 1) continue;if(strlen(line) <= 2 && line[0] == '='){<span style="white-space:pre">              </span>//如果输入 = 表示读取流水线中的处理结果if(pipe_result(&my_pipe,&result))<span style="white-space:pre">            </span>//如果读取结果成功printf("Result is %ld\n",result);<span style="white-space:pre">           </span>//则将结果打印出来else<span style="white-space:pre">                                      </span>//否则读取失败printf("Pipe is empty\n");<span style="white-space:pre">                </span>//打印提示,工作流为空}else{if(sscanf(line,"%ld",&value) < 1)<span style="white-space:pre">                </span>//将缓存中的数据读出fprintf(stderr, "Enter an integer value\n");else pipe_start(&my_pipe,value);<span style="white-space:pre">               </span>//调用函数,处理新加入的数据}}
}

这个例程也是 POSIX多线程程序设计 一书中说明工作流方式的例程, 反复看了很多遍才理清楚其中的逻辑,特别是两个条件变量ready   avail  和  data_ready这个flag的关系.

有几点需要说明的:

1>因为工作流默认创建10个工作单元,每个工作单元独占一个线程(最后一个工作单元除外,它只负责保存处理结果),所以一共可以同时处理11个数据,如果输入更多数据,整个工作流中的工作单元都在等待下一个工作单元通过ready发出可以接收新数据的信号,而tail所指的工作单原则在等待主线程读出数据,而此时主线程因为加入来新数据而堵塞在等待head所指的工作单元通过ready发出可以接受新数据的信号,从而出现死锁现象,整个进程都被挂起。

2> 每个stage_t中的互斥锁只保护当前工作单元的数据,条件变量也是. 其中ready条件变量是由自己发出给上一个工作单元接收信号的,代表当前工作单原中的数据已经过期,通知上一个工作单原可以将新的数据交给其处理了; avail条件变量是由自己发出给下一个工作单原接收信号的,代表当前工作单元中的数据已经处理完毕,通知下一个数据单元可以拿去继续下一次处理来. 这可能有点绕,但也很好理解:B->C这两个工作单元,如果C中处理完毕的数据还未交给下一个工作单元,也即C中的数据未过期,则B将阻塞,等待C中的数据传送给下一个工作单元处理(此时C中的数据过期),并通过ready发送信号通知B可以将新的数据交给C处理来,这时B将解除阻塞,继续执行.如果C中的数据已经过期,而B中的数据还未处理好(比如没有新的数据加入,或者正在处理新的数据的过程中),则C会阻塞一直等待B通过avail发送信号通知C,B有新的处理好的数据(即有效数据)可以交给C处理来,此时C才会接触阻塞继续工作.    所以ready和avail这两个条件变量是相关联的,最后组合起来就是一个完整的工作流的信号传输体系.

3> 根据第二点就很容易说明第一点的原因来: 因为当前工作流中有11个数据而且从未读出,则最先加入工作流的数据d1在tail所指的工作单元G11中等待读出, 第二加入工作流的数据d2则在第十个工作单原G10中等待G11通过ready发送可以处理新数据的信号,所以G10所在的线程处于阻塞状态.而此时d3所在的G9也同样在等待G10发送ready信号以接收d3进行处理,所以G9所在的线程同样处于阻塞状态,依次类推,d11所在的G1在等待G2的ready信号,线程G1阻塞,因此,此时用户输入的第12个数据d12在主函数中调用pipe_start()欲进行处理时pipe_start()会将数据交给pipe_send(pipe->head,d12)处理,而pipe_send会阻塞在等待G1发出ready的信号的条件变量上,即主线程被阻塞,整个进程被挂起。

本人还在学习中,鉴于水平有限,如果有错误还请指教,谢谢!

POSIX多线程程序设计_流水线工作例程相关推荐

  1. 《POSIX多线程程序设计》读书笔记

    <POSIX多线程程序设计>读书笔记 一.      概述 1.    一个UNIX进程可以理解为一个线程加上地址空间.文件描述符和其他数据: 2.    多个线程可以共享一个地址空间,而 ...

  2. 《win32多线程程序设计》学习笔记

    写于2016年6月24日 " vim:fen:fdm=marker:fmr={{{,}}}:fdl=0:fdc=1:ts=2:sw=2:sts=2 "第一章 为什么千头万绪 {{{ ...

  3. 深入浅出Win32多线程程序设计

    引言 从单进程单线程到多进程多线程是操作系统发展的一种必然趋势,当年的DOS系统属于单任务操作系统,最优秀的成员员也只能通过驻留内存的方式实现所谓的"多任务",而如今的Win32操 ...

  4. c语言判断s1是否大于s2,C语言程序设计_复习资料一.doc

    C语言程序设计_复习资料一 -- <程序设计基础> 院(系) 班级 学号 姓名 试卷卷面成绩占课程考核成绩 %平时 成绩占 %课程考核成绩题号一二三四五六七八九十小计得分 得 分一.单项选 ...

  5. c语言中,x-y,'105',ab,7f8那个是正确的,C语言程序设计_第三章 数据.ppt

    C语言程序设计_第三章 数据 * 运算符功能 与运算量关系 要求运算量个数 要求运算量类型 运算符优先级别 结合方向 结果的类型 学习运算符应注意 * 基本算术运算符: + - * / % 结合方向: ...

  6. Linux Qt使用POSIX多线程条件变量、互斥锁(量)

    今天团建,但是文章也要写.酒要喝好,文要写美,方为我辈程序员的全才之路.嘎嘎 之前一直在看POSIX的多线程编程,上个周末结合自己的理解,写了一个基于Qt的用条件变量同步线程的例子.故此来和大家一起分 ...

  7. c语言编程杭电1008,C语言程序设计_杭州电子科技大学cyy1_3

    <C语言程序设计_杭州电子科技大学cyy1_3>由会员分享,可在线阅读,更多相关<C语言程序设计_杭州电子科技大学cyy1_3(8页珍藏版)>请在人人文库网上搜索. 1.1,可 ...

  8. java大作业设计_Java程序设计_大作业.doc

    Java程序设计_大作业.doc Java程序设计_大作业 专业:计算机科学与技术专业 学号:1245713131 姓名: 2014年12月10日 目录 作业内容:2 1.IPublisherDao接 ...

  9. c语言开发题库管理系统,c语言程序设计_题库管理系统.doc

    c语言程序设计_题库管理系统 程序设计基础课程设计报告 班 级: 计算机科学与技术1103班 姓 名: 杨广宇 指导教师: 胡宏涛 完成日期: 2012年9月6日 (题目) 1. 设计题目与要求 (简 ...

最新文章

  1. 微信小程序登录,后端获取信息的问题
  2. pycharm奇技淫巧 直接通过代码输出函数 refactor —— extract method
  3. activiti idea 请假流程_IDEA开发流程Activiti需要注意的一些坑
  4. Spring Boot 2.1.5 正式发布,1.5.x 即将结束使命!
  5. JavaIO15FileReader和FileWriter源码分析及介绍使用
  6. swift-自定义Alert
  7. 用C语言编写99乘法表
  8. winPE4.0制作过程
  9. JavaScript设计模式——状态模式
  10. 如何解决网页无法复制文字问题
  11. ecshop+ectouch LANP伪静态
  12. itunes更新失败卸载失败(AppleMobileDeviceSupport6464安装时回滚的解决办法)
  13. 我是不是该安静的走开
  14. git从远程仓库拉取指定日期版本的代码到本地
  15. 360权重是什么,360权重怎么查询
  16. Java中xmp标签的作用_html 中 xmp标记
  17. D3.js 中Bubble Chart详解
  18. python编程免费小说_使用Python开发小说下载器,不再为下载小说而发愁
  19. 基于百度paddlehub多种海洋鱼类的智能分类识别
  20. ROS .bashrc笔记

热门文章

  1. 1662_MIT 6.828 JOS check_page_free_list实现分析以及boot_alloc问题修复
  2. android 电话接通时震动
  3. tmodjs+artTemplate用法,简单的实例
  4. 普中科技HC6800-EM3 V3.0单片机开发板资料
  5. scala学习--面向对象(OOP)
  6. 自己觉得喜欢的2个项目,慢慢进步吧,呵呵
  7. python完全卸载教程
  8. microbit python编译器_micro:bit MakeCode 2020 (V3.0) 测试版功能介绍
  9. 修改texworks安装时的中文路径后,对texworks注册表数据值的修改方式
  10. TexLive+TexWorks安装