管道机制的主体是系统调用pipe,但是由pipe所建立的管道的两端都在同一进程中,所以必须在fork的配合下,才能在父子进程之间或两个子进程之间建立起进程间的通信管道。

我们先来看系统调用pipe。

由于管道两端都是以(已打开)文件的形式出现在相关的进程中,在具体实现上也是作为匿名文件来实现的,所以pipe的代码与文件系统密切相关。

先看系统调用的入口sys_pipe,其代码如下:


/** sys_pipe() is the normal C calling standard for creating* a pipe. It's not the way Unix traditionally does this, though.*/
asmlinkage int sys_pipe(unsigned long * fildes)
{int fd[2];int error;error = do_pipe(fd);if (!error) {if (copy_to_user(fildes, fd, 2*sizeof(int)))error = -EFAULT;}return error;
}

这里由do_pipe建立起一个管道,通过作为调用参数的数组fd返回带抱着管道两端的两个已打开文件号,再由copy_to_user将数组fd复制到用户空间。显然,do_pipe是这个系统调用的主体,其代码在fs/pipe.c中,我们分段阅读:

sys_pipe=>do_pipe

int do_pipe(int *fd)
{struct qstr this;char name[32];struct dentry *dentry;struct inode * inode;struct file *f1, *f2;int error;int i,j;error = -ENFILE;f1 = get_empty_filp();if (!f1)goto no_files;f2 = get_empty_filp();if (!f2)goto close_f1;inode = get_pipe_inode();if (!inode)goto close_f12;error = get_unused_fd();if (error < 0)goto close_f12_inode;i = error;error = get_unused_fd();if (error < 0)goto close_f12_inode_i;j = error;

在文件系统系列中读者已经看到,进程对每个已打开文件的操作都是通过一个file数据结构进行的,只有在由同一进程按相同模式重复打开同一文件时才共享同一个数据结构。一个管道实际上就是一个无形(只存在于内存中)的文件,对这个文件的操作要通过两个已打开文件进行,分别代表该管道的两端。虽然最初创建时一个管道的两端都在同一进程中,但是在实际使用时却总是分别在两个不同的进程(通常是父子进程)中,所以,管道的两端不能共享同一个file数据结构,而要为之各分配一个file数据结构。代码汇总520行和524行调用get_empty_filp的目的就是为管道的两端f1和f2各分配一个file数据结构。get_empty_filp的代码以及file结构的定义可参看文件系统系列,这里不再重复。只是要指出,这个数据结构只是代表这个一个特定进程对某个文件操作的现状,而并不代表这个文件本身的状态。例如,结构中的成分f_pos就表示该进程在此文件中即将进行读写的起始位置,当不同的进程分别打开同一文件进行读写时,最初此位置都是0,以后就可能各不相同了。

同时,每个文件都是由一个inode数据结构代表的。虽然一个管道实际上是一个无形的文件,它也得要有一个inode数据结构。由于这个文件在创建管道之前并不存在,所以需要在创建管道时临时创建其一个inode结构,这就是代码中528行调用get_pipe_inode的目的。实际上,创建一个管道的过程主要就是创建这么一个文件的过程,函数get_pipe_inode的代码如下:

sys_pipe=>do_pipe=>get_pipe_inode


static struct inode * get_pipe_inode(void)
{struct inode *inode = get_empty_inode();if (!inode)goto fail_inode;if(!pipe_new(inode))goto fail_iput;PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;inode->i_fop = &rdwr_pipe_fops;inode->i_sb = pipe_mnt->mnt_sb;/** Mark the inode dirty from the very beginning,* that way it will never be moved to the dirty* list because "mark_inode_dirty()" will think* that it already _is_ on the dirty list.*/inode->i_state = I_DIRTY;inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;inode->i_uid = current->fsuid;inode->i_gid = current->fsgid;inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;inode->i_blksize = PAGE_SIZE;return inode;fail_iput:iput(inode);
fail_inode:return NULL;
}

先在478行分配一个空闲的inode数据结构。这是一个比较复杂的数据结构,我们已在文件系统中加以详细介绍。对于管道的创建和使用,我们关系的只是其中少数几个成分。第一个成分i_pipe是指向一个pipe_inode_info数据结构的指针,只有当由一个inode数据结构所代表的文件时用来实现一个管道的时候才使用它,否则就把这个指针设为NULL。pipe_inode_info的数据结构定义如下:

struct pipe_inode_info {wait_queue_head_t wait;char *base;unsigned int start;unsigned int readers;unsigned int writers;unsigned int waiting_readers;unsigned int waiting_writers;unsigned int r_counter;unsigned int w_counter;
};

同一文件中还定义了一些宏操作,下面我们就要用这些宏定义。

/* Differs from PIPE_BUF in that PIPE_SIZE is the length of the actualmemory allocation, whereas PIPE_BUF makes atomicity guarantees.  */
#define PIPE_SIZE       PAGE_SIZE#define PIPE_SEM(inode)        (&(inode).i_sem)
#define PIPE_WAIT(inode)    (&(inode).i_pipe->wait)
#define PIPE_BASE(inode)    ((inode).i_pipe->base)
#define PIPE_START(inode)   ((inode).i_pipe->start)
#define PIPE_LEN(inode)     ((inode).i_size)
#define PIPE_READERS(inode) ((inode).i_pipe->readers)
#define PIPE_WRITERS(inode) ((inode).i_pipe->writers)
#define PIPE_WAITING_READERS(inode) ((inode).i_pipe->waiting_readers)
#define PIPE_WAITING_WRITERS(inode) ((inode).i_pipe->waiting_writers)
#define PIPE_RCOUNTER(inode)    ((inode).i_pipe->r_counter)
#define PIPE_WCOUNTER(inode)    ((inode).i_pipe->w_counter)#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0)
#define PIPE_FULL(inode)    (PIPE_LEN(inode) == PIPE_SIZE)
#define PIPE_FREE(inode)    (PIPE_SIZE - PIPE_LEN(inode))
#define PIPE_END(inode) ((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1))
#define PIPE_MAX_RCHUNK(inode)  (PIPE_SIZE - PIPE_START(inode))
#define PIPE_MAX_WCHUNK(inode)  (PIPE_SIZE - PIPE_END(inode))

前面get_pipe_inode的代码中就引用了PIPE_READERS和PIPE_WRITERS。分配了inode数据结构以后,483行又通过pipe_new分配所需的缓冲区。这个函数的代码定义如下:

sys_pipe=>do_pipe=>get_pipe_inode=>pipe_new


struct inode* pipe_new(struct inode* inode)
{unsigned long page;page = __get_free_page(GFP_USER);if (!page)return NULL;inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);if (!inode->i_pipe)goto fail_page;init_waitqueue_head(PIPE_WAIT(*inode));PIPE_BASE(*inode) = (char*) page;PIPE_START(*inode) = PIPE_LEN(*inode) = 0;PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0;PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;return inode;
fail_page:free_page(page);return NULL;
}

这里先分配一个内存页面用作管道的缓冲区,再分配一个缓冲区用作pipe_inode_info数据结构。前面讲过,用来实现管道的文件是无形的,它并不出现在磁盘或其他的文件系统存储介质上,而只存在于内存空间,其他进程也无从打开或访问这个文件。所以,这个所谓文件实质上只是一个用作缓冲区的内存页面,只是把它纳入了文件系统的机制,借用了文件系统的各种数据结构和操作加以管理而已。

在前一章中已经讲过,inode数据结构中有个重要的成分i_fop,是指向一个file_operations数据结构的指针。在这个数据结构中给出了用于该文件的每种操作的函数指针。对于管道(见上面get_pipe_inode中的486行),这个数据结构是rdwr_pipe_fops,也是在pipe.c中定义的:


struct file_operations rdwr_pipe_fops = {llseek:       pipe_lseek,read:        pipe_read,write:        pipe_write,poll:        pipe_poll,ioctl:        pipe_ioctl,open:        pipe_rdwr_open,release: pipe_rdwr_release,
};

结合上列的宏定义,读者应该不难理解get_pipe_inode中自分配了inode结构以后的一些初始化操作,我们就不多讲了。值得注意的是,代码汇总并没有设置inode结构中的inode_operations结构指针i_op,所以该指针为0。可见,对于用来实现管道的inode并不允许对这里的inode进行常规操作,只有当inode代表着有形的文件时才使用。

从get_pipe_inode返回到do_pipe中时,必须的数据结构都已经齐全了。但是,还要为代表着管道两端的两个已打开文件分别分配打开文件号,这是通过调用get_unused_fd完成的。

让我们在do_pipe中继续往下看:

sys_pipe=>do_pipe

 error = -ENOMEM;sprintf(name, "[%lu]", inode->i_ino);this.name = name;this.len = strlen(name);this.hash = inode->i_ino; /* will go */dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &this);if (!dentry)goto close_f12_inode_i_j;dentry->d_op = &pipefs_dentry_operations;d_add(dentry, inode);f1->f_vfsmnt = f2->f_vfsmnt = mntget(mntget(pipe_mnt));f1->f_dentry = f2->f_dentry = dget(dentry);/* read file */f1->f_pos = f2->f_pos = 0;f1->f_flags = O_RDONLY;f1->f_op = &read_pipe_fops;f1->f_mode = 1;f1->f_version = 0;/* write file */f2->f_flags = O_WRONLY;f2->f_op = &write_pipe_fops;f2->f_mode = 2;f2->f_version = 0;fd_install(i, f1);fd_install(j, f2);fd[0] = i;fd[1] = j;return 0;

在正常的情况下,每个文件都至少有一个目录项,代表这个文件的一个路径名;而每个目录项则只描述一个文件,在dentry数据结构中有个指针指向相应的inode结构。因此,在file数据结构中有个指针f_dentry指向所打开文件的目录项dentry数据结构,这样,从file结构开始就可以一路通到文件的inode结构。对于管道来说,由于文件是无形的,本来并不非得有个目录项不可。可是,在file数据结构中并没有直接指向相应inode结构的指针,一定要经过一个目录项中转一下才行。而inode结构又是各种文件操作的枢纽。这么一来,对于管道就也得有一个目录项了。所以代码中的547行调用d_alloc分配,然后通过d_add使已经分配的inode结构与这个目录项互相挂上钩,并且让两个已打开文件结构中的f_dentry指针都指向这个目录项。

对目录项的操作时通过一个dentry_operations数据结构定义的。具体到管道文件的目录项,这个数据结构是pipefs_dentry_operations,这是在550行中设置的,定义如下:

static struct dentry_operations pipefs_dentry_operations = {d_delete:   pipefs_delete_dentry,
};

就是说,对于管道的目录项只允许一种操作,那就是pipefs_delete_dentry,即把它删除。

对于管道的两端来说,管道是单向的,所以其f1一端设置成只读(O_RDONLY)。同时,两端的文件操作也分别设置成read_pipe_fops和write_pipe_fops,定义如下:

struct file_operations read_pipe_fops = {llseek:        pipe_lseek,read:        pipe_read,write:        bad_pipe_w,poll:        pipe_poll,ioctl:        pipe_ioctl,open:        pipe_read_open,release: pipe_read_release,
};struct file_operations write_pipe_fops = {llseek:        pipe_lseek,read:        bad_pipe_r,write:       pipe_write,poll:        pipe_poll,ioctl:        pipe_ioctl,open:        pipe_write_open,release:    pipe_write_release,
};

比较一下,就可以发现,在read_pipe_fops中的写操作函数为bad_pipe_w,而在write_pipe_fops中的读操作函数为bad_pipe_r,分别用来返回一个出错代码。读者可能会问,前面在管道的inode数据结构中将指针i_fop设置成指向rdwr_pipe_fops,那显然是双向的,这里不是有矛盾吗?其实不然。对于代表着管道两端的两个已打开文件来说,一个只能写而另一个只能读,这是事情的一个方面。可是,另一方面,这两个逻辑上的已打开文件都通向同一个inode、同一个物理上存在的文件,即用作管道的缓冲区;显然这个缓冲区应该既支持写又支持读,这才能使数据流通。读者在文件系统系列博客中看到,file结构中的指针f_op一般都来自inode结构中的指针i_fop,都指向同一个file_operations结构。而这里,对于管道这么一种特殊的文件,则使管道两端的file结构各自指向不同的file_operations结构,以此来确保一端只能读而另一端只能写。

管道时一种特殊的文件,它并不属于某种特定的文件系统(如ext2),而是自己构成一种独立的文件系统,也有自身的数据结构pipe_fs_type:

static DECLARE_FSTYPE(pipe_fs_type, "pipefs", pipefs_read_super,FS_NOMOUNT|FS_SINGLE);

系统在初始化时通过kern_mount安装这个特殊的文件系统,并让一个指针pipe_mnt指向安装时的vfsmount数据结构:

static struct vfsmount *pipe_mnt;

现在,代表着管道两端的两个文件既然都属于这个文件系统,它们各自的file结构中的指针f_vfsmnt就要指向安装该文件系统的vfsmount数据结构,而这个数据结构也就多了两个使用者,所以要调用mntget两次(见522行),使其使用计数加2。

最后,do_pipe中的568行和569行两个已打开文件结构跟分配的打开文件号挂上钩来(注意,打开文件号只在一个进程的范围内有效);并且将两个打开文件号填入数组fd中,使得fd[0]为管道读出端的打开文件号,而fd[1]为写入端的打开文件号。这个数组随后在sys_pipe中被复制到当前进程的用户空间。

显然,管道的两端在创建之初都在同一进程中,这样是起不到进程间通信的作用的。那么,怎样才能将管道用于进程间通信呢?下面就是一个典型的过程。

  1. 进程A在创建一个管道,创建完成时代表管道两端的两个已打开文件都在进程A中。
  2. 进程A通过fork创建出进程B,在fork的过程中进程A的打开文件表按原样复制到进程B中。
  3. 进程A关闭管道的读端,而进程B关闭管道的写段。于是,管道的写端在进程A中而读端在进程B中,称为父子进程之间的通信管道。
  4. 进程A又通过fork创建进程C,然后关闭其管道写端而与管道脱离关心,使得管道的写端在进程C中而读端在进程B中,成为两个兄弟进程之间的管道。
  5. 进程C和进程B各自通过exec执行各自的目标程序,并通过管道进行单向通信。

下面我们看一个实例:

#include<stdio.h>
int main()
{int child_B, child_C;int pipefds[2];/*pipefds[0] for read, pipefds[1] for write*/char * args1[] = {"/bin/wc", NULL};char * args2[] = {"/usr/bin/ls", "-l", NULL};/*process A*/pipe(pipefds);if (!(child_B = fork())) /*fork process B*/{/* Process B*/close(pipefds[1]);/* close the write end*//* redirect stdin*/close(0);dup2(pipefds[0], 0);close(pipefds[0]);/*exec the target*/execve("/usr/bin/wc", args1, NULL); /*no return if success*/printf("pid %d: I am back, something is wrong!\n", getpid());}/* process A continue*/close(pipefds[0]);/*close the read end */if (!(child_C = fork())) /*fork process C*/{/*process C*//*redirect stdout*/close(1);dup2(pipefds[1], 1);close(pipefds[1]);/*exec the target*/execve("/bin/ls", args2, NULL); /* no return if success*/printf("pid :%d: I am back, something is wrong!\n", getpid());}/*process A continue*/close(pipefds[1]);/*close the write end */wait4(child_B, NULL, 0, NULL); /* wait for process B to finish*/printf("Done!\n");return 0;
}

结果如下(ubuntu中运行):

程序中调用的dup2是个系统调用,它将一个已打开文件的file结构指针复制到由指定的打开文件号所对应的通道。在进程B中,先把打开文件号0(即标准输入)关闭,然后把管道的读端复制到文件号0所对应的通道,这就完成了对标准输入的重定向。但是原先的管道读端既然已经复制就没用用处了,所以也将其关闭。进程C的重定向与此相似,只不过所重定向的是标准输出、除此之蛙,就与前面所述的过程完全一样了。

从进程间通信的角度来说,这种标准输入和标准输出的重定向并非必须,直接使用管道原先的写端和读端也能在进程之间传递数据。但是,应该承认这种将标准输入输出重定向与管道结合使用的办法非常巧妙的,不这样就难以达到将可执行程序在启动执行时动态地加以组合的灵活性。另一方面,一旦将标准输入和标注输出分别重定向到管道的读端和写端以后,两个进程就都像对普通文件一样地读写。事实上,它们并不知道自己在读写的到底是一个文件、一个设备、还是一个管道?

但是,我们知道当读一个文件到达末尾的时候会碰到EOF,从而知道已经读完了,可是当从管道中读时应该读到什么时候为止呢?下面读者将会看到,向管道写入的进程在完成其使命以后就会关闭管道的写端,一旦管道没有了写端就相当于达到了文件的末尾。

从管道所传递数据的角度看,两端的两个进程之间是一种典型的生产者消费者关系。一旦生产者停止了生产并关闭了管道的写入端,消费者就没有东西可消费了,这时候就到了文件(管道)的末尾。在典型的情况下,生产者总是在完成了使命,调用exit之前关闭其所有的已打开文件,包括管道,而消费者则总是在得知已经到达了输入文件末尾时才调用exit,所以一般总是生产者调用exit之前,消费者调用exit在后。但是,在特殊的条件下,也会有消费者exit在前的情况发生(里偶然消费者进程发生了非法越界访问),而使得管道的读端关闭在前。在这种情况下内核会向生产者进程发出一个SIGPIPE信号,表示管道已经断裂,而生产者进程在接收到这种信号时通常就会调用exit。

下面,我们进一步看看对管道的关闭以及读、写操作的源代码,以加深对管道机制的了解和理解。

先看管道的关闭。当一个进程通过系统调用close关闭代表着管道一段的已打开文件时,在内核中经由如下的执行路线而达到fput:

sys_close=>filp_close=>fput

关于这条执行路线的详情可参阅文件系统系列博客。每当对一个已打开文件执行关闭操作时,在fput中将相应file数据结构中的共享计数减1。如果减1以后该共享计数变成了0,就进而通过具体文件系统提供的release操作,释放对dentry数据结构的使用,并释放file数据结构。

在最初打开一个文件时,内核为之分配一个file数据结构,并将共享计数设置成1.那么,在什么情况下这个共享计数会变成大于1,从而使得在一次调用fput后共享计数不变成0呢?

第一种情况是在fork一个进程的时候,读者在前面的博客中看到过do_fork中要调用一个函数copy_files;里面有个for循环,对所有已打开文件的file结构调用get_file递增其共享计数。所有,在fork出来一个子进程以后,若父进程先关闭一个已打开文件,便只会使其共享计数减1,而并不会使计数达到0,因而也就不会最终地关文件。

第二种情况是通过系统调用dup或dup2复制一个打开文件号。这与将同一个文件再打开一次是不同的,它只是使一个已经存在的file数据结构和本进程的另一个打开文件号建立联系而已。因此,前面所举的例子中将标准输入重定向到一个管道时,先是dup2然后close,并不会使其file结构中的共享计数变成0。

也就是说,只有在一个file结构的最后一个用户通向该结构的最后一条通路也被关闭时,才会调用具体文件系统提供的release操作并最终释放file数据结构。

函数fput所处理的对象是所关闭的文件相联系的dentry等数据结构。在do_pipe的代码中,我们已经看到管道两端的文件操作结构(file_operations结构)被分别设置成read_pipe_fops和write_pipe_ops。两个数据结构中对应于release的函数指针分别为pipe_read_release和pipe_write_release。所以,在fput采用这些指针来调用相应的函数时就会执行pipe_read_release或pipe_write_release、这两个函数都是通向pipe_release的中转站,或者说是pipe_release的外层,继续沿着pipe.c往下看:

sys_close=>filp_close=>fput=>pipe_read_release


static int
pipe_read_release(struct inode *inode, struct file *filp)
{return pipe_release(inode, 1, 0);
}static int
pipe_write_release(struct inode *inode, struct file *filp)
{return pipe_release(inode, 0, 1);
}

其主体为函数pipe_release,源代码在pipe.c中。

结合这两个函数以及前面所列的pipe_fs_i.h中的一些宏定义,pipe_release的代码就不难读懂了。

sys_close=>filp_close=>fput=>pipe_read_release=>pipe_release


static int
pipe_release(struct inode *inode, int decr, int decw)
{down(PIPE_SEM(*inode));PIPE_READERS(*inode) -= decr;PIPE_WRITERS(*inode) -= decw;if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) {struct pipe_inode_info *info = inode->i_pipe;inode->i_pipe = NULL;free_page((unsigned long) info->base);kfree(info);} else {wake_up_interruptible(PIPE_WAIT(*inode));}up(PIPE_SEM(*inode));return 0;
}

就像在file结构中有个共享计数一样,在由inode->i_pipe所指向的pipe_inode_info结构中也有共享计数,而且有两个,一个是readership,一个是writers。这两个共享计数在创建管道时在get_pipe_inode中都被设置成1(见pipe.c:get_pipe_inode中的485行)。然后,当关闭管道的读端是,pipe_read_release调用pipe_release,使共享计数readers减1;而关闭管道的写端时则使writers减1.当二者都减到了0时,整个管道就完成了使命,此时应将用作缓冲区的存储页面以及pipe_inode_info数据结构释放。在常规的文件操作中,文件的inode存在于磁盘(或其他介质)上,所以在最后关闭时要将内存中的inode数据结构写回到磁盘上。但是,管道并不是常规意义上的文件,磁盘上并没有相应的索引节点,所以最后只是将分配的inode数据结构(以及dentry结构)释放了事,而并没有什么磁盘操作。这一点从用于管道的inode数据结构中的inode_operations结构指针为0可以看出。

再看管道的读操作。在典型的应用中,管道的读端总是处于一个循环中,通过系统调用read从管道中读,读了就处理,处理完又读。对管道的读操作,在内核中经过sys_read和数据结构read_pipe_fops中的函数指针而到达pipe_read。这个函数的代码在pipe.c中,让我我们逐段地往下看:

sys_read=>pipe_read

static ssize_t
pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos)
{struct inode *inode = filp->f_dentry->d_inode;ssize_t size, read, ret;/* Seeks are not allowed on pipes.  */ret = -ESPIPE;read = 0;if (ppos != &filp->f_pos)goto out_nolock;

这里44行的注释解说seek操作在管道上是不允许的,这当然是对的,事实上函数pipe_lseek只是返回一个出错代码。注意47行的检验所针对的只是参数ppos,那是个指针,必须指向file->f_pos本身。沿着pipe.c再往下看:

sys_read=>pipe_read

 /* Always return 0 on null read.  */ret = 0;if (count == 0)goto out_nolock;/* Get the pipe semaphore */ret = -ERESTARTSYS;if (down_interruptible(PIPE_SEM(*inode)))goto out_nolock;if (PIPE_EMPTY(*inode)) {
do_more_read:ret = 0;if (!PIPE_WRITERS(*inode))goto out;ret = -EAGAIN;if (filp->f_flags & O_NONBLOCK)goto out;for (;;) {PIPE_WAITING_READERS(*inode)++;pipe_wait(inode);PIPE_WAITING_READERS(*inode)--;ret = -ERESTARTSYS;if (signal_pending(current))goto out;ret = 0;if (!PIPE_EMPTY(*inode))break;if (!PIPE_WRITERS(*inode))goto out;}}

如果读的时候管道里已经有数据在缓冲区中,这一段程序就会被跳过了。可是,如果管道缓冲区中没有数据,那一版就要睡眠等待了,但是有两种例外的情况。第一种情况是管道的writers计数已经为0,也就是说已经没有生产者会向管道中写了,这时候当然不能再等待。第二种情况是管道创建时设置了标志位O_NONBLOCK,表示在读不到东西时,当前进程不应该被阻塞(也就是在睡眠中等待),而要立即返回。只要不属于这两种特殊情况,那就要通过pipe_wait在睡眠中等待了。函数pipe_wait的代码也在同一文件中:

sys_read=>pipe_read=>pipe_wait

/** We use a start+len construction, which provides full use of the * allocated memory.* -- Florian Coosmann (FGC)* * Reads with count = 0 should always return 0.* -- Julian Bradfield 1999-06-07.*//* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct inode * inode)
{DECLARE_WAITQUEUE(wait, current);current->state = TASK_INTERRUPTIBLE;add_wait_queue(PIPE_WAIT(*inode), &wait);up(PIPE_SEM(*inode));schedule();remove_wait_queue(PIPE_WAIT(*inode), &wait);current->state = TASK_RUNNING;down(PIPE_SEM(*inode));
}

注意,与这里的up操作配对的是down_interruptible,是在pipe_read代码中的57行,一个在for循环外面,一个在for循环里面。实际上,pipe_read中的临界区时从57行至127行(见下面的代码),但是睡眠时必须要退出临界区,而到被唤醒后再进入临界区。为什么要把pipe_wait放在一个循环中呢?这是因为睡眠中的进程被唤醒的原因不一定就是有进程往管道中写,也可能是收到了信号。而且,即使是因为有进程往管道中写而唤醒,也不能保证每个被唤醒的进程都能读到数据,因为等待着从管道中读数据的进程可能不止一个。因此,要将睡眠等待的过程放在一个循环中,并且在唤醒以后还要再检验所等待的条件是否得到满足,以及是否发生了例外的情况。对于在生产者、消费者模型中消费者一方的等待过程,这是一种典型的设计。在正常的情况下,这个循环一般都是因为管道中有了数据而结束(见78和79行),于是具体从管道中读取数据的操作就开始了:

sys_read=>pipe_read

 /* Read what data is available.  */ret = -EFAULT;while (count > 0 && (size = PIPE_LEN(*inode))) {char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);ssize_t chars = PIPE_MAX_RCHUNK(*inode);if (chars > count)chars = count;if (chars > size)chars = size;if (copy_to_user(buf, pipebuf, chars))goto out;read += chars;PIPE_START(*inode) += chars;PIPE_START(*inode) &= (PIPE_SIZE - 1);PIPE_LEN(*inode) -= chars;count -= chars;buf += chars;}/* Cache behaviour optimization */if (!PIPE_LEN(*inode))PIPE_START(*inode) = 0;if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) {/** We know that we are going to sleep: signal* writers synchronously that there is more* room.*/wake_up_interruptible_sync(PIPE_WAIT(*inode));if (!PIPE_EMPTY(*inode))BUG();goto do_more_read;}/* Signal writers asynchronously that there is more room.  */wake_up_interruptible(PIPE_WAIT(*inode));ret = read;
out:up(PIPE_SEM(*inode));
out_nolock:if (read)ret = read;return ret;
}

每个管道只有一个页面用作缓冲区,该页面时按唤醒缓冲区的方式来使用的。就是说,每当读写到了页面的末端就又要回到页面的始端(见下图),这样,管道中的数据就有可能要分两段读出,所以要由一个循环来完成。

结合本博客前面所列的宏定义,这段代码应该是不难理解的。循环结束以后的情况有以下几种可能:

  1. 读到了所要求的的长度,所以count减到了0,同时管道中的数据也正好读完了,所以管道中的数据长度变成了0。此时函数的返回值为所要求的的长度。
  2. 管道中的数据已经读完,但还没有达到所要求的的长度,函数返回实际读出的长度。
  3. 读到了所要求的的长度,但管道中的数据还有剩余,此时函数也是返回所要求的长度。

在前两种情况下,管道中的数据都已读完,但指示着下一次读写的起始点start,在不同的条件下有可能在页面中的任何位置上。可是,既然管道中已经空了,那就不如把起始点start设置到页面的开头,这样可以减少下一次读写必须分成两段进行的可能性,这就是108行和109行所做优化的目的。

由于管道的换乘区只限于一个页面,当生产者进程有大量数据要写时,每当写满了一个页面(分一段或两段)就得停下来睡眠等待,等到消费者进程从管道中读走了一些数据而腾出一些空间时才能继续。所以,消费者进程在读出了一些数据以后要唤醒可能正在睡眠中的生产者进程。最后,只要读出的长度不为0,就要更新inode的受访问时间印记。

所以这些操作,包括从管道中读出,复制到用户空间,更新inode的受访问时间印记等等,都是不能容许其他进程打扰的,所以都是放在临界区中进行。而57行处的down_interruptible和127行处的up正是界定了这样一个临界区。

与读操作相似,对管道的写操作也是在sys_write中通过file结构中的指针f_op找到file_operations数据结构write_pipe_fops,再通过其函数指针write调用pipe_write。这个函数也是在pipe.c中定义的,我们还是逐段来解读:

sys_write->pipe_write

static ssize_t
pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos)
{struct inode *inode = filp->f_dentry->d_inode;ssize_t free, written, ret;/* Seeks are not allowed on pipes.  */ret = -ESPIPE;written = 0;if (ppos != &filp->f_pos)goto out_nolock;/* Null write succeeds.  */ret = 0;if (count == 0)goto out_nolock;

显然,这一段与pipe_read的开头一段完全相同。继续往下读:

sys_write->pipe_write

 ret = -ERESTARTSYS;if (down_interruptible(PIPE_SEM(*inode)))goto out_nolock;/* No readers yields SIGPIPE.  */if (!PIPE_READERS(*inode))goto sigpipe;/* If count <= PIPE_BUF, we have to make it atomic.  */free = (count <= PIPE_BUF ? count : 1);/* Wait, or check for, available space.  */if (filp->f_flags & O_NONBLOCK) {ret = -EAGAIN;if (PIPE_FREE(*inode) < free)goto out;} else {while (PIPE_FREE(*inode) < free) {PIPE_WAITING_WRITERS(*inode)++;pipe_wait(inode);PIPE_WAITING_WRITERS(*inode)--;ret = -ERESTARTSYS;if (signal_pending(current))goto out;if (!PIPE_READERS(*inode))goto sigpipe;}}

如果管道的读端已经全部关闭,那就表示已经没有消费者了。既然没有了消费者,那么生产者的存在以及继续生产就失去意义了;所以此时转到标号sigpipe处,向当前进程发送一个SIGPIPE信号,表示管道已经断裂:

sys_write->pipe_write

sigpipe:if (written)goto out;up(PIPE_SEM(*inode));send_sig(SIGPIPE, current, 0);return -EPIPE;
}

一般而言,进程在接收到SIGPIPE信号时会调用do_exit来结束其生命。读者也许注意到,这里其实是当前进程自己向自己发SIGPIPE信号。man么为何不直接调用do_exit呢?这里有两方面的考虑:一方面是使程序的结构更好,更整齐划一;另一方面也为进程通过信号机制来改变其在接收到SIGPIPE信号时的行为提供了更多的灵活性的可能性。

160行中的常数PIPE_BUF定义为4096.当要求写入的长度不超过这个数值时,内核保证写入操作的原子性,也就是说,一定要到管道缓冲区中足够容纳这块数据时才开始写。如果超过整个数值,就不保证其原子性了,这时候有多大空间就写多少字节,有一个字节的空间就写一个字节,余下的等消费者独奏一些字节以后以后再继续写。这就是第160行将变量free设置成count或者1的意义。注意变量free表示开始写入前缓冲区中至少要有这么多个空闲的字节,否则就要睡眠等待;所以只是在决定等待与否时使用,而一旦开始写入就不再使用了。读者可以对照前面pipe_read中的代码自行阅读这里的162-179行,应该不会有困难。

一旦生产者进程在等待中被消费者进程唤醒,并且缓冲区中有了足够的空间,或者一开始时缓冲区中就有足够的空间,具体的写入操作就开始了:

sys_write->pipe_write

 /* Copy into available space.  */ret = -EFAULT;while (count > 0) {int space;char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);ssize_t chars = PIPE_MAX_WCHUNK(*inode);if ((space = PIPE_FREE(*inode)) != 0) {if (chars > count)chars = count;if (chars > space)chars = space;if (copy_from_user(pipebuf, buf, chars))goto out;written += chars;PIPE_LEN(*inode) += chars;count -= chars;buf += chars;space = PIPE_FREE(*inode);continue;}ret = written;if (filp->f_flags & O_NONBLOCK)break;do {/** Synchronous wake-up: it knows that this process* is going to give up this CPU, so it doesnt have* to do idle reschedules.*/wake_up_interruptible_sync(PIPE_WAIT(*inode));PIPE_WAITING_WRITERS(*inode)++;pipe_wait(inode);PIPE_WAITING_WRITERS(*inode)--;if (signal_pending(current))goto out;if (!PIPE_READERS(*inode))goto sigpipe;} while (!PIPE_FREE(*inode));ret = -EFAULT;}/* Signal readers asynchronously that there is more data.  */wake_up_interruptible(PIPE_WAIT(*inode));inode->i_ctime = inode->i_mtime = CURRENT_TIME;mark_inode_dirty(inode);out:up(PIPE_SEM(*inode));
out_nolock:if (written)ret = written;return ret;......
}

首先,对照pipe_read中分两段读的情况,即使要求写入的长度小于PIPE_BUF时,也可能会要分两段来写,所以整个写入的过程也放在一个while循环中。另外,要求写入的长度大于PIPE_BUF时,还要分成几次来写,也就是写写入若干字节,然后睡眠等待消费者从缓冲区中读走一些字节而创造出一些空间,再继续写。这就是为什么要有209-223行的do-while循环的原因。这个循环与前面的睡眠等待循环略有不同,这就是当前进程被唤醒时,只检查缓冲区中是否有空间,而不问空间多大。为什么呢?因为此时的宗旨是有一个字节的空间就写一个字节,而既然消费者进程已经读走了若干字节,那么至少已经有一个字节的空间,可以进入while循环体的下一次循环了。对照pipe_read的代码,读者应该可以读懂上面这段代码而不会有太大的困难,我们把它留给读者作为练习。建议读者假设几种不同的数据长度来走过这段程序,并且在纸上记下几种不同情况下的执行路线。阅读时要注意202行的continue语句,当要求写入的数据长度不大于PIPE_BUF但需要分两段(不是两次)写入时,它使执行路线跳过后面的do-while循环。同时,还要注意185行中的宏定义PIPE_END,它使写入的位置pipe_buf回到页面的起点。

这样,在典型的情景下,生产者和消费者之间互相等待,互相唤醒,协调地向前发展,也就是说:

  • 对生产者而言,缓冲区有空间就往里写,并且唤醒可能正在等待着要从缓冲区中读数据的小费制;没有空间就要睡眠,等待消费者从缓冲区读走数据而腾出空间。
  • 对小辅助而言,缓冲区中有数据就读出,然后唤醒可能正在等待着要往缓冲区写的生产者。如果没有数据就睡眠,等待生产者往缓冲区中写数据。

一句话,管道两端的进程通过管道所形成的是典型的生产者消费者关系和运行模式。

管道与系统调用pipe相关推荐

  1. 1、编制实现管道通信的程序。使用系统调用 pipe()函数建立一条管道线,两个子进程分别向管道各写一句话。Child process 1 is sending a message

    管道创建函数 创建管道可以通过调用 pipe()来实现,表1列出了pipe()函数的语法要点. 表1 pipe()函数语法要点 所需头文件 #include <unistd.h> 函数原型 ...

  2. Linux中的pipe(管道)与named pipe(FIFO 命名管道)

    catalogue 1. pipe匿名管道 2. named pipe(FIFO)有名管道 1. pipe匿名管道 管道是Linux中很重要的一种通信方式,是把一个程序的输出直接连接到另一个程序的输入 ...

  3. linux 进程和线程或线程和线程之间通过管道通信(pipe)

    linux 进程和线程或线程和线程之间通过管道通信(pipe) 转自:http://blog.csdn.net/robertkun/article/details/8095331 线程间通信: [cp ...

  4. Java NIO 学习笔记(五)----路径、文件和管道 Path/Files/Pipe

    目录: Java NIO 学习笔记(一)----概述,Channel/Buffer Java NIO 学习笔记(二)----聚集和分散,通道到通道 Java NIO 学习笔记(三)----Select ...

  5. 命名管道(Named Pipe)服务

    命名管道(Named Pipe)服务 1  命名管道的名称解析 在Windows中,管道的名称遵循Windows统一命名规范(UNC,Universal Naming Convention). 命名管 ...

  6. 使用系统调用pipe建立一条管道线_使用Unixbench对服务器综合性能打分及测试结果...

    Unixbench是一个类unix系统(Unix,BSD,Linux)下的性能测试工具,一个开源工具,被广泛用与测试linux系统主机的性能,简称UB.Unixbench的主要测试项目有:系统调用.读 ...

  7. 使用系统调用pipe建立一条管道线_【Linux系统】Linux进程间通信

    作者:Vamei 出处:http://www.cnblogs.com/vamei 我们在Linux信号基础中已经说明,信号可以看作一种粗糙的进程间通信(IPC, interprocess commun ...

  8. Linux管道指令(pipe)与shell 重定向的区别

    2019独角兽企业重金招聘Python工程师标准>>> 多指令执行 符号 格式 作用 ; comd1;comd2  依次执行 && comd1 && ...

  9. linux通过管道的进程通信,linux 线程或进程之间通过管道通信(pipe)

    线程间通信: #include // printf #include // exit #include // pipe #include // strlen #include // pthread_c ...

  10. Linux管道命令(pipe)

    学习管道之前我们先了解一下linux的命令执行顺序 命令执行顺序控制 通常情况下,我们在终端只能执行一条命令,然后按下回车执行,那么如何执行多条命令呢? 顺序执行多条命令:command1;comma ...

最新文章

  1. HashMap之微代码解析-总结整理
  2. Navicat和DBeaver的查询快捷键
  3. 设计模式之Facade(外观)模式
  4. ubuntu19.10 安装搜狗输入法
  5. Riddle(2018 CCPC (秦皇岛站) I 题)
  6. HTML5公式插件,在HTML5中使用MathML数学公式
  7. wpf展开树节点_回归树分析与sklearn决策树案例,来玩一会
  8. 厦门GDP超过万亿需要多少年时间?
  9. Google AdSense实战宝典
  10. Fl Studio真的不如Cubase或者Logic Pro等电音软件专业吗?
  11. SVN报错 could not connect to server
  12. MAC编译:fatal error: ‘endian.h‘ file not found
  13. Altium Designer中的长度单位如何转换?
  14. 简单粗暴讲述自动化仓储系统
  15. kubectl template 一个例子
  16. 图片加水印怎么加?这篇文章告诉你
  17. 微信40029 code解决办法
  18. 电子学会2022年6月青少年软件编程(图形化)等级考试试卷(二级)答案解析
  19. 湖南大学的计算机网络,林亚平-湖大信息科学与工程学院
  20. html随机出现一张图片,图片随机飘动用html怎么做

热门文章

  1. 今天,你脸上还长痤疮吗?
  2. 模拟电路64(滤波电路)
  3. 今天电脑突然出现问题: 请安装 TCP/IP 协议 错误 10106 【已解决·】
  4. Nginx 转发配置
  5. 快速掌握SOLIDWORKS齿轮转动的应用
  6. 什么是DNS服务器?
  7. DATAGUARD手记(DUPLICATE)(四)
  8. @interface List
  9. 怎样运行一个php的项目,第一章 如何加载运行已发布的PHP项目
  10. 8.4和Apache Geronimo集成