linux内核管道pipe实现详解

(文件系统暂时不是很了解,文件系统部分暂时不做解释,此文仅解释关键流程,系统调用部分请参考前面已经发布的文章,这里不做展开)

1、管道系统调用(SyS_pipe)

1.1、SyS_pipe

/** sys_pipe() is the normal C calling standard for creating* a pipe. It's not the way Unix traditionally does this, though.*/
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{struct file *files[2];int fd[2];int error;error = __do_pipe_flags(fd, files, flags); // glibc管道创建函数原型"int pipe(int pipefd[2])",内核需要创建两个管道文件描述符(fd, file)if (!error) {if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) { // 拷贝文件描述符到用户空间fput(files[0]);fput(files[1]);put_unused_fd(fd[0]);put_unused_fd(fd[1]);error = -EFAULT;} else {fd_install(fd[0], files[0]); // 安装文件描述符,将fd放置到当前任务files->fd数组中,fd与file关联(fd是个整形数据,file包括了文件信息及操作函数信息,应用程系统调用时传的是fd,最终需要找的file来获取文件真正信息及读写函数等)fd_install(fd[1], files[1]); // 安装文件描述符,将fd放置到当前任务files->fd数组中}}return error;
}SYSCALL_DEFINE1(pipe, int __user *, fildes)
{return sys_pipe2(fildes, 0);
}

1.2、create_pipe_files


struct file {union {struct llist_node   fu_llist;struct rcu_head    fu_rcuhead;} f_u;struct path        f_path;struct inode     *f_inode;   /* cached value */const struct file_operations  *f_op; // 文件操作函数指针(read, write等)/** Protects f_ep_links, f_flags.* Must not be taken from IRQ context.*/spinlock_t      f_lock;atomic_long_t        f_count;unsigned int        f_flags;fmode_t         f_mode;struct mutex     f_pos_lock;loff_t           f_pos;struct fown_struct    f_owner;const struct cred   *f_cred;struct file_ra_state    f_ra;u64            f_version;
\#ifdef CONFIG_SECURITYvoid         *f_security;
\#endif/* needed for tty driver, and maybe others */void            *private_data;\#ifdef CONFIG_EPOLL/* Used by fs/eventpoll.c to link all the hooks to this file */struct list_head   f_ep_links;struct list_head f_tfile_llink;
\#endif /* #ifdef CONFIG_EPOLL */struct address_space   *f_mapping;
} __attribute__((aligned(4)));  /* lest something weird decides that 2 is OK */
static int __do_pipe_flags(int *fd, struct file **files, int flags)
{int error;int fdw, fdr;if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT))return -EINVAL;error = create_pipe_files(files, flags); // // 创建2个管道file文件结构体,主要绑定pipe_read、pipe_write即管道读写等相关函数if (error)return error;error = get_unused_fd_flags(flags); // 获取当前进程未使用的文件描述符作为读管道描述符if (error < 0)goto err_read_pipe;fdr = error;error = get_unused_fd_flags(flags); // 获取当前进程未使用的文件描述符作为写管道描述符if (error < 0)goto err_fdr;fdw = error;audit_fd_pair(fdr, fdw);fd[0] = fdr; // 返回给用户的管道文件描述符fd[1] = fdw; // 返回给用户的管道文件描述符return 0;err_fdr:put_unused_fd(fdr);err_read_pipe:fput(files[0]);fput(files[1]);return error;
}
int get_unused_fd_flags(unsigned flags)
{return __alloc_fd(current->files, 0, rlimit(RLIMIT_NOFILE), flags); // current是个宏,通过sp低13位清零获取到当前线程的thread_info,再通过thread_info获取到当前任务的task信息,task_struct->files
}
int create_pipe_files(struct file **res, int flags)
{int err;struct inode *inode = get_pipe_inode();struct file *f;struct path path;static struct qstr name = { .name = "" };if (!inode)return -ENFILE;err = -ENOMEM;path.dentry = d_alloc_pseudo(pipe_mnt->mnt_sb, &name);if (!path.dentry)goto err_inode;path.mnt = mntget(pipe_mnt);d_instantiate(path.dentry, inode);f = alloc_file(&path, FMODE_WRITE, &pipefifo_fops); // 创建file结构体(res[1]),用pipfifo_fops初始化if (IS_ERR(f)) {err = PTR_ERR(f);goto err_dentry;}f->f_flags = O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT));f->private_data = inode->i_pipe;res[0] = alloc_file(&path, FMODE_READ, &pipefifo_fops); // 创建file结构体(res[0]),用pipfifo_fops初始化if (IS_ERR(res[0])) {err = PTR_ERR(res[0]);goto err_file;}path_get(&path);res[0]->private_data = inode->i_pipe;res[0]->f_flags = O_RDONLY | (flags & O_NONBLOCK);res[1] = f;return 0;err_file:put_filp(f);
err_dentry:free_pipe_info(inode->i_pipe);path_put(&path);return err;err_inode:free_pipe_info(inode->i_pipe);iput(inode);return err;
}

2、管道写入(SyS_write)

SYSCALL_DEFINE3(write, unsigned int, fd, const char __user *, buf,size_t, count)
{struct fd f = fdget_pos(fd); // 获取文件file等信息(file包括文件操作函数、文件信息等),对于应用程序,用户并不关注文件时管道还是设备,一律按找文件操作,有vfs来区分具体的文件操作ssize_t ret = -EBADF;if (f.file) {loff_t pos = file_pos_read(f.file); // 获取文件偏移ret = vfs_write(f.file, buf, count, &pos); // vfs写文件if (ret >= 0)file_pos_write(f.file, pos); // 有数据写入文件,更新文件偏移fdput_pos(f);}return ret;
}
ssize_t vfs_write(struct file *file, const char __user *buf, size_t count, loff_t *pos)
{ssize_t ret;if (!(file->f_mode & FMODE_WRITE)) // 写权限检查return -EBADF;if (!(file->f_mode & FMODE_CAN_WRITE)) // 写权限检查return -EINVAL;if (unlikely(!access_ok(VERIFY_READ, buf, count)))return -EFAULT;ret = rw_verify_area(WRITE, file, pos, count);if (ret >= 0) {count = ret;file_start_write(file);ret = __vfs_write(file, buf, count, pos); // 写文件if (ret > 0) {fsnotify_modify(file);add_wchar(current, ret);}inc_syscw(current);file_end_write(file);}return ret;
}
ssize_t __vfs_write(struct file *file, const char __user *p, size_t count,loff_t *pos)
{if (file->f_op->write)return file->f_op->write(file, p, count, pos); // new_sync_writeelse if (file->f_op->write_iter)return new_sync_write(file, p, count, pos);elsereturn -EINVAL;
}
static ssize_t new_sync_write(struct file *filp, const char __user *buf, size_t len, loff_t *ppos)
{struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = len };struct kiocb kiocb;struct iov_iter iter;ssize_t ret;init_sync_kiocb(&kiocb, filp);kiocb.ki_pos = *ppos;iov_iter_init(&iter, WRITE, &iov, 1, len);ret = filp->f_op->write_iter(&kiocb, &iter); // pipe_writeBUG_ON(ret == -EIOCBQUEUED);if (ret > 0)*ppos = kiocb.ki_pos;return ret;
}
struct iov_iter {int type;size_t iov_offset;size_t count; // 需要写入管道的数据长度union {const struct iovec *iov; // iov->iov_base保存需要写入管道的数据,iov->iov_len保存需要写入管道的数据长度const struct kvec *kvec;const struct bio_vec *bvec;};unsigned long nr_segs;
};
static ssize_t
pipe_write(struct kiocb *iocb, struct iov_iter *from)
{struct file *filp = iocb->ki_filp;struct pipe_inode_info *pipe = filp->private_data;ssize_t ret = 0;int do_wakeup = 0;size_t total_len = iov_iter_count(from); // 需要写入管道的数据长度ssize_t chars;/* Null write succeeds. */if (unlikely(total_len == 0))return 0;__pipe_lock(pipe);if (!pipe->readers) { // 读管道端已经关闭,写管道无意义,返回-EPIPEsend_sig(SIGPIPE, current, 0);ret = -EPIPE;goto out;}/* We try to merge small writes */chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */ // 不是PAGE_SIZE大小的数据if (pipe->nrbufs && chars != 0) {int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &(pipe->buffers - 1); // curbuf: the current pipe buffer entry,nrbufs: the number of non-empty pipe buffers in this pipe,buffers: total number of buffers (should be a power of 2),获取下一个可用buff,这条语句类似取余,假如管道buff是从0-9,有效数据起始buff是5,有效buff数据是6,那么存储管道数据的buff依次为"5,6,7,8,9,0",下一个可用的buff就是1((5 + 6)/10),把10位清0了,&运算符也是把二进制的高位清0了,实现了一个类似的循环链表操作struct pipe_buffer *buf = pipe->bufs + lastbuf; // 获取下一个可以buffer的地址(上一行代码只是获取了索引,类似数组的下标)const struct pipe_buf_operations *ops = buf->ops;int offset = buf->offset + buf->len; // offset: offset of data inside the @page,len: length of data inside the @page,目前数据在page中的有效起始地址 + 有效数据长度 = 下一个可存放数据的地址 (管道是从前往后读的,并没规定读写大小,有可能只读取了page的前一部分,中间部分尚未读取,但是写的时候必须从中间有效数据后继续写)if (ops->can_merge && offset + chars <= PAGE_SIZE) { // 当前需要写入的数据 + 已有的数据 没有超过PAGE_SIZE大小,可以拷贝到page里面ret = ops->confirm(pipe, buf);if (ret)goto out;ret = copy_page_from_iter(buf->page, offset, chars, from); // 从from(用户写的数据)拷贝到offset处if (unlikely(ret < chars)) {ret = -EFAULT; // 写的数据少于预期的,出错返回(前面已经计算空间足够,要是写入与预期不一致,肯定出问题了)goto out;}do_wakeup = 1;buf->len += ret; // 该page已经写入了ret字节数据,更新有效数据长度if (!iov_iter_count(from))goto out; // 拷贝之后,from里面已经没有数据需要写了,全部数据已经写入到管道了,不需要再写}}for (;;) {int bufs;if (!pipe->readers) {send_sig(SIGPIPE, current, 0);if (!ret)ret = -EPIPE;break;}bufs = pipe->nrbufs; // // for循换之前写小于PAGE_SIZE的数据,并没有写在新的page里面,此处获取当前管道有多少有效bufsif (bufs < pipe->buffers) { // 有效bufs小于管道总buffersint newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1); // 获取下一有效buff index,前面已经解释了,此处实现的类似循环链表功能struct pipe_buffer *buf = pipe->bufs + newbuf; // 获取下一有效pipe_buffer地址(起始地址+索引)struct page *page = pipe->tmp_page;int copied;if (!page) {page = alloc_page(GFP_HIGHUSER);if (unlikely(!page)) {ret = ret ? : -ENOMEM;break;}pipe->tmp_page = page;}/* Always wake up, even if the copy fails. Otherwise* we lock up (O_NONBLOCK-)readers that sleep due to* syscall merging.* FIXME! Is this really true?*/do_wakeup = 1;copied = copy_page_from_iter(page, 0, PAGE_SIZE, from); // 前面获取buff的时候获取的是完全没有数据的buff,因此此处拷贝到0偏移处,PAGE_SIZE大小数据if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {if (!ret)ret = -EFAULT; // from剩余数据不为0,但是拷贝的数据不足PAGE_SIZE,也就是空间足够但是数据写入失败,出错,退出拷贝break;}ret += copied; // 已拷贝的数据增加,需要返回给应用程序,告诉应用程序实际写了多少数据/* Insert it into the buffer array */buf->page = page; // buf->tmp_page -> buf->pagebuf->ops = &anon_pipe_buf_ops;buf->offset = 0; // 有效数据的偏移地址buf->len = copied; // 有效数据的长度buf->flags = 0;if (is_packetized(filp)) {buf->ops = &packet_pipe_buf_ops;buf->flags = PIPE_BUF_FLAG_PACKET;}pipe->nrbufs = ++bufs; // 管道有效bufs数目增加pipe->tmp_page = NULL;if (!iov_iter_count(from))break; // from没有数据,所有数据已经写入管道}if (bufs < pipe->buffers) // 有效bufs小于管道总buffers,还有bufs可以写数据,继续将用户数据写入管道continue;if (filp->f_flags & O_NONBLOCK) {if (!ret)ret = -EAGAIN; // 剩余bufs不够,但是是非阻塞方式调用,返回给应用程序-EAGAIN,表示让重新尝试,而不是失败break;}if (signal_pending(current)) {if (!ret)ret = -ERESTARTSYS;break;}if (do_wakeup) {wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM); // 唤醒等待列表(此处有个疑问,管道通常是一端读一端写,是否存在多进程读写的情况,按照一端读,一端写的情况,被唤醒的肯定只有读进程,还没看到过多进程读写的情况,细节后续再看,暂时先写到这里)kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);do_wakeup = 0;}pipe->waiting_writers++; // buffer不够,等待buffer被读取后再继续写入,等待写入计数器加1,再其他任务读buffer数据之后唤醒等待写buffer的任务pipe_wait(pipe); // 设置当前任务状态为TASK_INTERRUPTIBLE,将当前任务添加到等待列表,释放互斥锁等,执行调度,切换到其他任务pipe->waiting_writers--; // 等待写计数器减1,重新尝试写管道}
out:__pipe_unlock(pipe);if (do_wakeup) {wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);}if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {int err = file_update_time(filp);if (err)ret = err;sb_end_write(file_inode(filp)->i_sb);}return ret;
}

3、管道读取(SyS_read)

SYSCALL_DEFINE3(read, unsigned int, fd, char __user *, buf, size_t, count)
{struct fd f = fdget_pos(fd);ssize_t ret = -EBADF;if (f.file) {loff_t pos = file_pos_read(f.file);ret = vfs_read(f.file, buf, count, &pos);if (ret >= 0)file_pos_write(f.file, pos);fdput_pos(f);}return ret;
}
static ssize_t
pipe_read(struct kiocb *iocb, struct iov_iter *to)
{size_t total_len = iov_iter_count(to);struct file *filp = iocb->ki_filp;struct pipe_inode_info *pipe = filp->private_data;int do_wakeup;ssize_t ret;/* Null read succeeds. */if (unlikely(total_len == 0))return 0;do_wakeup = 0;ret = 0;__pipe_lock(pipe);for (;;) {int bufs = pipe->nrbufs; // bufs相关操作参考写管道函数,原理与写管道一致if (bufs) {int curbuf = pipe->curbuf;struct pipe_buffer *buf = pipe->bufs + curbuf;const struct pipe_buf_operations *ops = buf->ops;size_t chars = buf->len;size_t written;int error;if (chars > total_len)chars = total_len;error = ops->confirm(pipe, buf);if (error) {if (!ret)ret = error;break;}written = copy_page_to_iter(buf->page, buf->offset, chars, to);if (unlikely(written < chars)) {if (!ret)ret = -EFAULT;break;}ret += chars;buf->offset += chars;buf->len -= chars;/* Was it a packet buffer? Clean up and exit */if (buf->flags & PIPE_BUF_FLAG_PACKET) {total_len = chars;buf->len = 0;}if (!buf->len) {buf->ops = NULL;ops->release(pipe, buf);curbuf = (curbuf + 1) & (pipe->buffers - 1);pipe->curbuf = curbuf;pipe->nrbufs = --bufs;do_wakeup = 1;}total_len -= chars;if (!total_len)break; /* common path: read succeeded */}if (bufs) /* More to do? */ // bufs不为空,继续读continue;if (!pipe->writers) // 管道写端已经关闭,且bufs为空,没有继续等待的必要,永远没法再读到数据break; // 没有等待写管道的进程if (!pipe->waiting_writers) { // 有等待写管道的进程/* syscall merging: Usually we must not sleep* if O_NONBLOCK is set, or if we got some data.* But if a writer sleeps in kernel space, then* we can wait for that data without violating POSIX.*/if (ret)break; // 已经读取到部分数据,直接返回已读取的数据if (filp->f_flags & O_NONBLOCK) {ret = -EAGAIN; // 非阻塞状态,且没有读到数据,返回-EAGAIN,稍微再试break;}}if (signal_pending(current)) {if (!ret)ret = -ERESTARTSYS;break;}if (do_wakeup) {wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM); // 唤醒等待任务链表kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);}pipe_wait(pipe); // 前面管道写端已关闭会返回,还有数据会继续读,没有已读取到数据且没有等待写管道的进程会返回,非阻塞模式会返回,因此阻塞模式下没有读到数据且管道写端没有关闭才会执行这条语句(等待其他进程往管道写数据),将但前task添加到写链表,切换到其他进程}__pipe_unlock(pipe);/* Signal writers asynchronously that there is more room. */if (do_wakeup) {wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);}if (ret > 0)file_accessed(filp);return ret;
}

linux内核管道pipe实现详解相关推荐

  1. 【流媒体服务器Mediasoup】 NodeJs与C++信令通信详解及Linux下管道通信的详解(五)

    目录 前言 匿名管道进程间通信 进程间管道 的创建与图解 MediaSoup中的管道创建 MediaSoup Channel的创建 NodeJs和 C++ 管道通信的过程 MediaSoup 消息确认 ...

  2. Linux内核中sk_buff结构详解

    目录 1.sk_buff结构体 1.1 sk_buff在内核中的结构 1.2 重要的长度len的解析 2. sk_buff数据区 2.1 线性数据区 2.2 非线性数据区 -------------- ...

  3. linux 内核编译 Kconfig文件详解

    Kconfig的格式 下面截取/drivers/net下的Kconfig文件中的部分内容, # Network device configuration menuconfig NETDEVICESde ...

  4. Linux内核线程kernel thread详解--Linux进程的管理与调度(十)【转】

    转自:https://blog.csdn.net/gatieme/article/details/51589205 版权声明:本文为博主原创文章 && 转载请著名出处 @ http:/ ...

  5. Linux内核线程kernel thread详解--Linux进程的管理与调度

    内核线程 为什么需要内核线程 Linux内核可以看作一个服务进程(管理软硬件资源,响应用户进程的种种合理以及不合理的请求). 内核需要多个执行流并行,为了防止可能的阻塞,支持多线程是必要的. 内核线程 ...

  6. linux内核的I2C子系统详解1——I2C总线概览、驱动框架概览

    以下内容源于朱有鹏<物联网大讲堂>课程的学习,如有侵权,请告知删除. 1.I2C总线汇总概览 (1)三根通信线:SCL.SDA.GND: (2)同步.串行.电平.低速(几百k).近距离: ...

  7. linux内核提取ret2usr,Linux内核漏洞利用技术详解 Part 2

    前言 在上一篇文章中,我们不仅为读者详细介绍了如何搭建环境,还通过一个具体的例子演示了最简单的内核漏洞利用技术:ret2usr.在本文中,我们将逐步启用更多的安全防御机制,即SMEP.KPTI和SMA ...

  8. Linux内核IO技术栈详解

    这是<Linux系统调用那些事>高级部分的第一章<聊聊Linux IO>.高级部分的文章均假设读者完整的学习过Linux系统基础以及Linux系统编程相关的内容,并已有一定的工 ...

  9. Linux 内核0.11 系统调用详解(下)

    备注:上讲中,博猪讲到了操作系统是如何让用户程序调用系统函数的,这讲继续接上讲的话题,从一个系统内核系统函数创建的小实验来学习系统内核具体做了些什么.理清下系统调用的整体过程. 实验:在Linux 0 ...

最新文章

  1. [连载]JavaScript讲义(05)--- 数据处理
  2. Effective C++ 学习笔记
  3. [转载]html5教程
  4. Redis命令:SETNX key value(SET if Not eXists)
  5. Element-UI分页组件使用——点第几页查第几页
  6. 深度比较Paxos和Raft
  7. Oracle 11g Java驱动包ojdbc6.jar安装到maven库,并查看jar具体版本号
  8. 代码里配置java代理
  9. JAVA视频系列,学习JAVA常遇到的问题,连载【方法篇】。
  10. 百度DuerOS负责人景鲲晋升副总裁,继续向李彦宏汇报
  11. Quartz的misfire特性
  12. 据说:一个线程性能相当于30%核心
  13. 代码安全之代码混淆及加固(Android)
  14. kali linux ap热点,Kali(debian)创建WIFI AP热点
  15. 天正电气图例_天正电气设计施工图中常用线路敷设方式
  16. xml文件中servlet映射重复问题,也是导致tomcat启动不了,出现在Java 9上运行时,需在JVM命令行参数中添加“-add opens=Java.base/Java.lang=ALL-U
  17. 页面静态化的优点及缺点
  18. android 修改双卡铃声,Android 修改系统来电铃声
  19. 计算机小游戏有哪些,计算机有哪些单机游戏可以耐玩,4 G以下?
  20. 课堂笔记 - 数据库设计

热门文章

  1. Element el-input 输入框详解
  2. stm32的FreeRTOS移植
  3. VScode配置8086汇编环境
  4. 冷却水的循环方式有哪几种_循环冷却水系统,按照通风方式可分为 和 两种。...
  5. linux7防火墙图形界面打不开,centos 7版本防火墙详细说明
  6. My97日历控件用法
  7. 英语阅读的逻辑——唐迟
  8. Windows如何成功下载scipy(包含numpy+mkl的安装下载和百度网盘资源)
  9. 聊聊学历那点事----学历重要还是能力重要?
  10. 1903java全套_#千锋逆战班,java1903#