5.5 Redis 对 epoll 的封装

Redis 的作者和 Nginx 的作者一样,不喜欢引入第三方的库,比如 libevent、libev 来做事件处理,而是自己封装了 epoll,不像 Memcachd 的 I/O 模型还得依赖 libevent。Redis 的 I/O 模型针对不同系统做了不同的实现,比如 Linux 中的实现是对 epoll 的封装,BSD 中的实现是对 kqueue 的封装。针对 Linux 的实现,我们来看其核心的 ae_epoll.c:

aeApiState 封装了 epoll_event:

typedef struct aeApiState {int epfd;struct epoll_event *events;
} aeApiState;

aeApiCreate 用于调用 epoll_create 创建 epoll:

static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));…state->epfd = epoll_create(1024); // 1024是内核设置的默认值…
}

aeApiAddEvent 用于向 epoll 中注册事件:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee;…ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}

aeApiDelEvent 用于从 epoll 中删除事件:

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee;int mask = eventLoop->events[fd].mask & (~delmask);ee.events = 0;if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.u64 = 0; /* avoid valgrind warning */ee.data.fd = fd;if (mask != AE_NONE) {epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);} else {// 注意, Kernel < 2.6.9 EPOLL_CTL_DEL 需要一个非空的事件指针epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);}
}

aeApiPoll 通过调用 epoll_wait 等待 epoll 事件就绪:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);if (retval > 0) {int j;numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}

其中:

  • aeApiCreate:调用 epoll_create 创建了一个 epoll 池子。

  • aeApiAddEvent:调用 epoll_ctl 向 epoll 中注册事件。

  • aeApiPoll:通过调用 epoll_wait 来获取已经响应的事件。

那么这个过程是如何呢?我们来一步一步看 server epoll 初始化过程:

首先在 initServer 函数执行的时候初始化了 epoll:

server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

接着设置回调函数:

aeSetBeforeSleepProc(server.el,beforeSleep);

再来看主循环中的 aeMain 函数:

void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS);}
}

最后循环调用 aeProcessEvents 来进行事件处理(见图5-10)。

图5-10 Redis epoll 主循和事件的关系

可以看到 eventLoop 会对两类事件进行处理,定时器事件和 file 事件。

最后我们来看 aeProcessEvents 函数:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;…aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {long now_sec, now_ms;aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {// AE_DONT_WAIT 标志置位,则设置超时时间为0if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {// 否则会发生阻塞tvp = NULL;         // 一直等待}}numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;if (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed;                 // 返回需要处理的 file/time 事件数量
}

这个函数大致上分为以下几个步骤:

1)aeSearchNearestTimer 查找是否有要优先处理的定时器任务,如果有就先处理。

2)假如没有,则执行 aeApiPoll 来处理 epoll 中的就绪事件,而且是无限等待的哦:

    if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {tvp = NULL;}

3)处理定时器任务。

最后我们来看一下 Redis 的整体事件处理流程(见图5-11),由于 Redis 本身是单线程的,没有锁的竞争,为了提高处理的吞吐量,Redis 把工作的流程拆成了很多步,每步都是通过 epoll 的机制来回调,这样尽量不让一个请求 hold 住主线程,让系统的吞吐量得到有效的提升。

图5-11 Redis 的整体事件处理流程图

5.6 Nginx 文件异步 I/O

为了提升对 I/O 事件的及时响应速度,Linux 提供了 aio 机制,该机制实现了真正的异步 I/O 响应处理,不像 libc 的 aio 是异步线程伪装的。

因为 Linux 的 aio 对缓存不支持,所以在 Nginx 中,仅仅对读文件做了 aio 的支持。

aio 的使用可以分为以下几个步骤:

1)io_setup:初始化异步 I/O 上下文,类似于 epoll_create。

2)io_submit:注册异步事件和回调 handler。

ngx_epoll_module 在初始化的时候,会先进行 aio 的初始化:

ngx_epoll_aio_init(ngx_cycle_t *cycle, ngx_epoll_conf_t *epcf)
{int                    n;struct epoll_event  ee;#if (NGX_HAVE_SYS_EVENTFD_H)ngx_eventfd = eventfd(0, 0);
#elsengx_eventfd = syscall(SYS_eventfd, 0);
#endif
…n = 1;if (ioctl(ngx_eventfd, FIONBIO, &n) == -1) {ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,"ioctl(eventfd, FIONBIO) failed");goto failed;}if (io_setup(epcf->aio_requests, &ngx_aio_ctx) == -1) {ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,"io_setup() failed");goto failed;}ngx_eventfd_event.data = &ngx_eventfd_conn;ngx_eventfd_event.handler = ngx_epoll_eventfd_handler;ngx_eventfd_event.log = cycle->log;ngx_eventfd_event.active = 1;ngx_eventfd_conn.fd = ngx_eventfd;ngx_eventfd_conn.read = &ngx_eventfd_event;ngx_eventfd_conn.log = cycle->log;ee.events = EPOLLIN|EPOLLET;ee.data.ptr = &ngx_eventfd_conn;if (epoll_ctl(ep, EPOLL_CTL_ADD, ngx_eventfd, &ee) != -1) {return;}
…
}

然后 Nginx 会在读取文件的时候调用 ngx_file_aio_read 函数进行异步读取:

ssize_t
ngx_file_aio_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,ngx_pool_t *pool)
{ngx_err_t         err;struct iocb      *piocb[1];ngx_event_t      *ev;ngx_event_aio_t  *aio;...aio = file->aio;ev = &aio->event;...if (ev->complete) {ev->active = 0;ev->complete = 0;if (aio->res >= 0) {ngx_set_errno(0);return aio->res;}...}ngx_memzero(&aio->aiocb, sizeof(struct iocb));aio->aiocb.aio_data = (uint64_t) (uintptr_t) ev;aio->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;aio->aiocb.aio_fildes = file->fd;aio->aiocb.aio_buf = (uint64_t) (uintptr_t) buf;aio->aiocb.aio_nbytes = size;aio->aiocb.aio_offset = offset;aio->aiocb.aio_flags = IOCB_FLAG_RESFD;aio->aiocb.aio_resfd = ngx_eventfd;ev->handler = ngx_file_aio_event_handler;piocb[0] = &aio->aiocb;if (io_submit(ngx_aio_ctx, 1, piocb) == 1) {ev->active = 1;ev->ready = 0;ev->complete = 0;return NGX_AGAIN;}
...
}

5.7 tail 指令为何牛

因为需要采集线上环境的数据,我开发了一个 Java agent 程序来采集相关信息。跑了一段时间后发现一个问题,假如采集数据量压力过大的话,会产生该进程占用 CPU 过高,例如100%以上的情况。

首先来看代理程序的伪代码:

public void run() {try {accesslog.seek(accesslog.length());int i = 0;while (!Thread.currentThread().isInterrupted()) {String line = accesslog.readLine();if (line != null) {try {parseLineAndLog(line);} catch (Exception ex) {LOGGER.error("parseLineAndLog log error:", ex);}try {if (i++ % 100 == 0) {Thread.sleep(100);}} catch (InterruptedException e) {e.printStackTrace();}}}} catch (IOException e) {LOGGER.error("read ngx access log error:", e);}
}

我们首先通过 top-H-p${pid}观察该进程中的具体哪个线程占用 CPU 比较高。找到后,通过以下指令把 pid 转换成16进制的数据:

awk '{printf("%x",1234)}'

然后我们通过如下命令产生线程堆栈信息:

jstack ${pid} > stack.log

再通过刚才 awk 的16进制进程号查看,发现 jstack 中该进程一直在做如下操作:

String line = accesslog.readLine();

因为上面代码采用的 accesslog 其实是:

this.accesslog = new RandomAccessFile(path, "r");

而它的 readLine()方法是不阻塞的,轮询必然导致 CPU 占用率的提升。

那如何解决呢?我使用了 tail 指令:

 Process p = Runtime.getRuntime().exec("tail -n 1 -F " + path);br = new BufferedReader(new InputStreamReader(p.getInputStream()));

然后在 while 循环中用 br.readline 来解决问题。

上线后再观察,神奇的事情发生了,CPU 占用率始终控制在5%以下。

那么一条 tail 指令为什么能那么神奇呢?我们根据 Linus 大神的指示:从代码中寻找答案。

首先我们找到 tail 的源码:

http:// git.savannah.gnu.org/cgit/coreutils.git/tree/src/tail.c

经过一步一步分析后,我们发现,最终会调用 tail_forever_inotify 函数。

在2.6内核之后,Linux 提供了 inotify 功能,内核通过监控文件系统的变更来反向通知用户,这样减少了轮询的开销。我们来看其实现:

tail_forever_inotify (int wd, struct File_spec *f, size_t n_files,double sleep_interval)
{
...
f[i].wd = inotify_add_watch (wd, f[i].name, inotify_wd_mask);
...if (pid){if (writer_is_dead)exit (EXIT_SUCCESS);writer_is_dead = (kill (pid, 0) != 0 && errno != EPERM);struct timeval delay; // 等待文件变化的时间if (writer_is_dead)delay.tv_sec = delay.tv_usec = 0;else{delay.tv_sec = (time_t) sleep_interval;delay.tv_usec = 1000000 * (sleep_interval - delay.tv_sec);}fd_set rfd;FD_ZERO (&rfd);FD_SET (wd, &rfd);int file_change = select (wd + 1, &rfd, NULL, NULL, &delay);if (file_change == 0)continue;else if (file_change == -1)die (EXIT_FAILURE, errno, _("error monitoring inotify event"));}
...len = safe_read (wd, evbuf, evlen);
...

所以以上步骤主要分为三步:

1)注册 inotify 的 watch。

2)用 select 等待 watch 事件发生。

3)用 safe_read 读取准备好的数据。

5.8 零拷贝技术应用分析

在常见的 I/O 场景中,都是先通过 read+write(或 send)的方式来完成的,如图5-12所示,read 调用先从用户态切换到内核态,然后从文件中读取了数据,存储到内核的缓冲区中,然后再把数据从内核态缓冲区拷贝到用户态,同时从内核态切换到用户态。

接着用 send 写入到指定文件也是类似的过程,这里存在4次上下文切换和4次缓冲区的拷贝。

图5-12 一次 read/send 的过程

为了优化这个缓冲区拷贝和上下文切换的次数,Linux 提供了几种方案,下面分别介绍。

5.8.1 mmap

假如仅仅是把数据写入到文件,Linux 提供了 mmap 的方式来共享内存虚拟地址空间,这样只要写共享内存就是写文件,读共享内存就是读文件,减少了缓冲区拷贝的次数。

mmap 的实现最终通过 do_mmap 函数来实现:

unsigned long do_mmap(struct file *file, unsigned long addr,unsigned long len, unsigned long prot,unsigned long flags, vm_flags_t vm_flags,unsigned long pgoff, unsigned long *populate)
{struct mm_struct *mm = current->mm;                        // 当前进程的 mm…if ((prot & PROT_READ) && (current->personality & READ_IMPLIES_EXEC))// 是否隐藏了可执行属性if (!(file && path_noexec(&file->f_path)))prot |= PROT_EXEC;if (!(flags & MAP_FIXED))        // MAP_FIXED没有设置addr = round_hint_to_min(addr);        // 判断输入的欲映射的起始地址是否小于最小映射地址,如果小于,将 addr 修改为最小地址len = PAGE_ALIGN(len);        // 检测 len 是否越界…if ((pgoff + (len >> PAGE_SHIFT)) < pgoff)                // 再次检测是否越界return -EOVERFLOW;if (mm->map_count > sysctl_max_map_count)        // 超过一个进程中对于 mmap 的最大个数限制return -ENOMEM;addr = get_unmapped_area(file, addr, len, pgoff, flags);        // 获取没有映射的地址(查询 mm 中空闲的内存地址)…// 设置 vm_flags,根据传入的 port 和 flags 以及 mm 自己的 flag 来设置vm_flags |= calc_vm_prot_bits(prot) | calc_vm_flag_bits(flags) |mm->def_flags | VM_MAYREAD | VM_MAYWRITE | VM_MAYEXEC;…if (file) {struct inode *inode = file_inode(file);switch (flags & MAP_TYPE) {case MAP_SHARED:if ((prot&PROT_WRITE) && !(file->f_mode&FMODE_WRITE))// file 应该被打开并允许写入return -EACCES;if (IS_APPEND(inode) && (file->f_mode & FMODE_WRITE))// 不能写入一个只允许写追加的文件return -EACCES;if (locks_verify_locked(file))                // 文件被强制锁定return -EAGAIN;vm_flags |= VM_SHARED | VM_MAYSHARE        // 尝试允许其他进程共享if (!(file->f_mode & FMODE_WRITE))                // 如果 file 不允许写,取消共享vm_flags &= ~(VM_MAYWRITE | VM_SHARED);…}}…addr = mmap_region(file, addr, len, vm_flags, pgoff); // 建立从文件到虚存区间的映射…return addr;
}

以上过程最重要的两步是:

1)get_unmapped_area 查询并获取当前进程虚拟地址空间中空闲的没有映射的地址。

2)mmap_region 建立从文件到虚存区间的映射。

最终映射后的关系如图5-13所示。

图5-13 文件和虚拟地址空间映射后的关系

5.8.2 sendfile

假如需要从一个文件读数据,并且写入到另一个文件,mmap 的方式还是会存在2次系统调用4次上下文切换,所以 Linux 又提供了 sendfile 的调用,1次系统调用搞定(见图5-14)。

图5-14 sendfile 调用过程

下面我们来分析一下 sendfile 的实现,sendfile 调用最终会调用 do_sendfile 函数:

static ssize_t do_sendfile(int out_fd, int in_fd, loff_t *ppos,size_t count, loff_t max)
{file_start_write(out.file);retval = do_splice_direct(in.file, &pos, out.file, &out_pos, count, fl);file_end_write(out.file);
…return retval;
}

其中最关键的一行是 do_splice_direct:

long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,loff_t *opos, size_t len, unsigned int flags)
{struct splice_desc sd = {.len        = len,.total_len      = len,.flags          = flags,.pos            = *ppos,.u.file         = out,.opos           = opos,};long ret;…ret = splice_direct_to_actor(in, &sd, direct_splice_actor);if (ret > 0)*ppos = sd.pos;return ret;
}ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,splice_direct_actor *actor)
{struct pipe_inode_info *pipe;long ret, bytes;umode_t i_mode;size_t len;int i, flags, more;…pipe = current->splice_pipe;if (unlikely(!pipe)) {pipe = alloc_pipe_info();…pipe->readers = 1;current->splice_pipe = pipe;}// 进行拼接ret = 0;bytes = 0;len = sd->total_len;flags = sd->flags;// 不要在输出的时候阻塞,我们需要清空 direct pipesd->flags &= ~SPLICE_F_NONBLOCK;more = sd->flags & SPLICE_F_MORE;while (len) {size_t read_len;loff_t pos = sd->pos, prev_pos = pos;ret = do_splice_to(in, &pos, pipe, len, flags);…ret = actor(pipe, sd);if (unlikely(ret <= 0)) {sd->pos = prev_pos;goto out_release;}bytes += ret;len -= ret;sd->pos = pos;if (ret < read_len) {sd->pos = prev_pos + ret;goto out_release;}}done:pipe->nrbufs = pipe->curbuf = 0;file_accessed(in);return bytes;
…
}

在上述代码中,总结起来就三个步骤:

1)alloc_pipe_info 分配 pipe 对象,pipe 其实就是个缓冲区。

2)do_splice_to 把 in 文件的数据读入到缓冲区。

3)actor 把缓冲区的数据读到 out 文件中。

5.8.3 mmap 和 sendfile 在开源软件中的使用

在 MongoDB 中,使用了操作系统底层提供的内存映射机制,即 mmap,数据文件使用 mmap 映射到内存空间进行管理,内存的管理(哪些数据何时换入/换出)完全交给 OS 管理。

MongoDB 对不同操作系统的 MemoryMappedFile 有不同的实现,我们这里针对 Linux 操作系统的实现来分析 MongoDB 中把文件数据映射到进程地址空间的操作:

void* MemoryMappedFile::map(const char *filename, unsigned long long &length, int options) {setFilename(filename);FileAllocator::get()->allocateAsap( filename, length );len = length;…unsigned long long filelen = lseek(fd, 0, SEEK_END);…lseek( fd, 0, SEEK_SET );void * view = mmap(NULL, length, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);…views.push_back( view );return view;
}

MongoDB 通知操作系统去映射所有数据文件到内存,操作系统使用 mmap()系统调用来完成。从这一点看,数据文件,包括所有的 docments、collections 及其索引,都会被操作系统通过页(page)的方式交换到内存。如果有足够的内存,所有数据文件最终都会加载到内存中。

当内存发生了改变,比如一个写操作,产生的变化将会异步刷新到磁盘,但写操作仍是很快的,直接操作内存。数据量可以适应内存大小,从而达到一个理想状况——对磁盘的操作达到最小量。但是如果数据量超出内存,页面访问错误(page faults)将会悄悄上来,那么系统就会频繁访问内存,读写操作要慢很多。最糟糕的状况是数据量远大于内存,读写不稳定,性能急剧下降。

Kafka 是 Apache 社区下的消息中间件,在 Kafka 上,有两个原因可能导致低效:1)太多的网络请求;2)过多的字节拷贝。为了提高效率,Kafka 把 message 分成一组一组的,每次请求会把一组 message 发给相应的 consumer。此外,为了减少字节拷贝,采用了 sendfile 系统调用。

Kafka 设计了一种“标准字节消息”,Producer、Broker、Consumer 共享这一种消息格式。Kakfa 的消息日志在 broker 端就是一些目录文件,这些日志文件都是 MessageSet 按照这种“标准字节消息”格式写入磁盘的。

维持这种通用的格式对这些操作的优化尤为重要:持久化 log 块的网络传输。流行的 Unix 操作系统提供了一种非常高效的途径来实现页面缓存和 socket 之间的数据传递。在 Linux 操作系统中,这种方式称作:sendfile 系统调用(Java 提供了访问这个系统调用的方法:FileChannel.transferTo api)。

下面我们来分析在 Kafka 中的零拷贝流程。

首先我们来看 kafka 的服务端 socketServer 逻辑:

override def run() {startupComplete()while(isRunning) {try {// 配置任意一个新的可以用来排队的连接configureNewConnections()// 注册一个新的请求用来写processNewResponses()try {selector.poll(300)} catch {case...}

SocketServer 会 poll 队列,一旦对应的 KafkaChannel 写操作准备好了,就会调用 KafkaChannel 的 write 方法:

// KafkaChannel.scala
public Send write() throws IOException {if (send != null && send(send))
}
// KafkaChannel.scala
private boolean send(Send send) throws IOException {send.writeTo(transportLayer);if (send.completed())transportLayer.removeInterestOps(SelectionKey.OP_WRITE);return send.completed();
}

其中 write 会调用 send 方法,对应的 Send 对象其实就是上面我们注册的 FetchRes-ponseSend 对象。

这段代码里实际发送数据的代码是 send.writeTo(transportLayer),对应的 writeTo 方法为:

private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {case(topic, data) => new TopicDataSend(dest, TopicData(topic,data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))}))
override def writeTo(channel: GatheringByteChannel): Long = {…written += sends.writeTo(channel)…
}

这里最后调用了 sends 的 writeTo 方法,而 sends 其实是个 MultiSend。MultiSend 里有两个东西:

  • topicAndPartition.partition:分区。

  • message:FetchResponsePartitionData。还记得这个 FetchResponsePartitionData 吗?我们的 MessageSet 就放在了这个对象里。

TopicDataSend 也包含了 sends,该 sends 包含了 PartitionDataSend,而 PartitionDataSend 则包含了 FetchResponsePartitionData。

最后进行 writeTo 的时候,其实是调用了:

// partitionData 就是 FetchResponsePartitionData,messages 就是 FileMessageSet
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)

FileMessageSet 也有个 writeTo 方法,就是我们之前已经提到过的那段代码:

def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {...val bytesTransferred = (destChannel match {case tl: TransportLayer => tl.transferFrom(channel, position, count)case dc => channel.transferTo(position, count, dc)}).toIntbytesTransferred
}

最后通过 tl.transferFrom(channel,position,count)来完成最后的数据发送的。trans-ferFrom 其实是 Kafka 自己封装的一个方法,最终里面调用的也是 transerTo:

public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {return fileChannel.transferTo(position, count, socketChannel);
}

5.9 本章小结

我们编写的应用程序或多或少都会涉及 I/O,比如读写数据库、读写网络等,总会遇到很多问题,例如在高并发场景下,如何编写高性能的服务端和客户端程序。对 I/O 的理解是否深入,关系到写出来的应用对性能的影响程度。

从狭义的角度来讲,I/O 就是 in 和 out 两条输入输出的汇编指令。但是从广义角度讲,I/O 可以涉及操作系统整个 I/O 模型的构建,系统从分层的角度,将数据从写文件开始最终转换成数据块并落入磁盘。从更深入的视角看,会涉及 epoll 这样的 I/O 多路复用模型。

因此,只有从操作系统的角度来理解 I/O,才能真正编写出高性能的应用程序。

linux内核分析及应用 -- 输入输出(下)相关推荐

  1. LINUX内核分析第四周——扒开系统调用的三层皮

    LINUX内核分析第四周--扒开系统调用的三层皮 李雪琦 + 原创作品转载请注明出处 + <Linux内核分析>MOOC课程http://mooc.study.163.com/course ...

  2. Linux内核分析——可执行程序的装载

    链接的过程 首先运行C预处理器cpp,将C的源程序(a.c)翻译成ASCII码的中间文件(a.i) 接着C编译器ccl,将a.i翻译成ASCII汇编语言文件a.s 接着运行汇编器as,将a.s翻译成可 ...

  3. LINUX内核分析第二周学习总结——操作系统是如何工作的

    LINUX内核分析第二周学习总结--操作系统是如何工作的 张忻(原创作品转载请注明出处) <Linux内核分析>MOOC课程http://mooc.study.163.com/course ...

  4. linux内核分析 网络九,“Linux内核分析”实验报告(九)

    一 Linux内核分析博客简介及其索引 本次实验简单的分析了计算机如何进行工作,并通过简单的汇编实例进行解释分析 在本次实验中 通过听老师的视频分析,和自己的学习,初步了解了进程切换的原理.操作系统通 ...

  5. Linux内核分析作业第二周

    操作系统是如何工作的 <Linux内核分析>MOOC课程http://mooc.study.163.com/course/USTC-1000029000 一.函数调用堆栈 1.计算机工作三 ...

  6. Linux内核分析:完成一个简单的时间片轮转多道程序内核代码

    PS.贺邦   原创作品转载请注明出处  <Linux内核分析>MOOC课程    http://mooc.study.163.com/course/USTC-1000029000 1.m ...

  7. 《Linux内核分析》实践4

    <Linux内核分析> 实践四--ELF文件格式分析 20135211李行之 一.概述 1.ELF全称Executable and Linkable Format,可执行连接格式,ELF格 ...

  8. Linux内核分析--操作系统是如何工作的

    "平安的祝福 + 原创作品转载请注明出处 + <Linux内核分析>MOOC课程http://mooc.study.163.com/course/USTC-1000029000  ...

  9. 《Linux内核分析》MOOC课程之从汇编语言角度看计算机是如何工作的

    2019独角兽企业重金招聘Python工程师标准>>> piratezgw 原创作品转载请注明出处 <Linux内核分析>MOOC课程http://mooc.study. ...

  10. linux swi 内核sp,Linux内核分析课程8_进程调度与进程切换过程

    8种机械键盘轴体对比 本人程序员,要买一个写代码的键盘,请问红轴和茶轴怎么选? Linux内核课第八周作业.本文在云课堂中实验楼完成. 原创作品转载请注明出处 <Linux内核分析>MOO ...

最新文章

  1. puppet yum模块、配置仓储、mount模块
  2. Python学习【第6篇】:Python之文件操作
  3. freemarker第三篇
  4. java之七 高级类设计
  5. android listview mapview,RelativeLayout和并列ListView/MapView
  6. 前端React结构工程-改写render
  7. 前端入门-day2(常见css问题及解答)
  8. [Java] 蓝桥杯ADV-208 算法提高 矩阵相乘
  9. linux sudo apt-get,linux sudo apt-get用法详解
  10. matlab四宫格画图_科学网—Matlab画图(一):生成高质量的供发表和展示用的图 - 周建锋的博文...
  11. ubuntu 常用软件包安装、卸载和删除的方法
  12. python结束函数_python结束函数
  13. 花几分钟轻松搞定快速排序算法
  14. 关于长江的题目_高中优秀议论文题目【高中关于长江的作文题目加优秀范文】...
  15. SpringBoot基础学习之整合Swagger框架(上篇)
  16. #有关汇编语言的org指令(许多不为人知的故事):
  17. request获取不到参数一种情况
  18. 解决 cannot connect to 192.168.1.136:5555: 由于目标计算机积极拒绝,无法连接。 (10061)
  19. HTML: 点击链接时在新窗口打开
  20. 镂空数学符号空心体数学符号

热门文章

  1. 《构建之法》阅读笔记(三)
  2. 随手一写,简单的四则运算练习
  3. QString字符串拼接【转载】
  4. 韦东山驱动视频笔记——6.输入子系统之编写驱动程序
  5. 对象str()与reper()转换为字符串
  6. Mysql数据库乱码
  7. list遍历_Python遍历list,使用range和enumerate的效率区别
  8. 重构计算力 浪潮M5新一代服务器闪耀登场
  9. 【Win 10应用开发】分阶段进行数据绑定
  10. 解决NTLDR is missing,系统无法启动的方法