Rust轻量级I/O库mio

mio是rust实现的一个轻量级的I/O库。其实现基本上就是对不同操作系统底层相关API的封装,抽象出统一的接口供上层使用。Linux下为epoll,Windows下为IOCP,OS X下为kqueue。

一、关于mio

1、重要特性

  • 非阻塞TCP,UDP
  • I/O事件通知epoll,kqeue,IOCP实现
  • 运行时零分配
  • 平台可扩展

2、基础用法

其使用方法与Linux中epoll差不多,mio底层封装了epoll,使用步骤思路:

  1. 创建Poll
  2. 注册事件
  3. 事件循环等待与处理事件

mio提供可跨平台的sytem selector访问,不同平台如下表,都可调用相同的API。不同平台使用的API开销不尽相同。由于mio是基于readiness(就绪状态)的API,与Linux epoll相似,可以看到很多API在Linux上都可以一对一映射。相比之下,Windows IOCP是基于完成(completion-based)而非基于就绪的API,所以两者间会有较多桥接。 同时mio提供自身版本的TcpListener、TcpStream、UdpSocket,这些API封装了底层平台相关API,并设为非阻塞且实现Evented trait。

OS Selector
Linux epoll
OS X, iOS kqueue
Windows IOCP
FreeBSD kqueue
Android epoll

mio实现的是一个单线程事件循环,并没有实现线程池及多线程事件循环,如果需要线程池及多线程事件循环等需要自己实现。

二、源码分析

先给出mio的源码目录结构,只列出了关键的部分,如下所示:

mio代码目录结构
mio
|---->test
|---->src
|-------->deprecated         //事件循环代码
|-------------->event_loop.rs        //EventLoop的实现,内部封装了Poll     【1】
|-------------->handler.rs           //供上层实现的接口
|-------->net
|------------>mod.rs
|------------>tcp.rs
|------------>udp.rs
|-------->sys                        //不同系统下的实现
|------------>mod.rs
|------------>fuchsia
|------------>unix               //Linux下封装的epoll
|------------------>mod.rs
|------------------>epoll.rs                     【3】
|------------------>awakener.rs
|------------>windows            //windows下封装的iocp
|-------->lib.rs
|-------->poll.rs            //定义Poll            【2】
|-------->channel.rs     【4】
|-------->event_imp.rs
|-------->timer.rs       【5】
|-------->......
复制代码

对涉及不同操作系统的部分代码,以Linux操作系统为例。在Linux操作系统中,mio封装了epoll。后面会给出相应的代码。

【1】Eventloop代码分析

结合前面的代码示例给出相应的关键代码如下: EventLoop事件循环定义,可以看到里面封装了Poll,以Linux系统举例,Poll又封装了epoll。在使用Poll或Linux中epoll时,最重要的代码是epoll_wait()等待事件Event并针对每个Event进行不同的处理。这里EventLoopepoll_create()epoll_wait()epoll_ctl()进行进一步的封装,将对Event的处理抽象成Handler,供上层实现具体的逻辑处理。

// Single threaded IO event loop.        //这里是单线程事件循环,更多的时候我们需要加线程池,以此为基础,再进行一次封装,供上层使用
pub struct EventLoop<H: Handler> {run: bool,poll: Poll,       events: Events,     //对应epoll中的epoll_eventtimer: Timer<H::Timeout>,notify_tx: channel::SyncSender<H::Message>,notify_rx: channel::Receiver<H::Message>,config: Config,
}
复制代码

抽象出接口供上层应用实现不同事件的逻辑处理。这里有点类似于回调函数,上层用户需要在此实现业务逻辑代码,实际运行时需要将函数指针传递给底层事件循环,底层事件循环运行时会调用用户传递过来的函数。在Rust中,可能描述的不是很精准,不过可以这样理解。

pub trait Handler: Sized {type Timeout;type Message;/// Invoked when the socket represented by `token` is ready to be operated/// on. `events` indicates the specific operations that are/// ready to be performed./// This function will only be invoked a single time per socket per event/// loop tick.fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {}        //【1】/// Invoked when a message has been received via the event loop's channel.fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {}     //【2】/// Invoked when a timeout has completed.fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {}      //【3】/// Invoked when `EventLoop` has been interrupted by a signal interrupt.fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {}     //【4】/// Invoked at the end of an event loop tick.fn tick(&mut self, event_loop: &mut EventLoop<Self>) {}     //【5】
}
复制代码

这里把Poll进行了封装,主要实现了Eventloop::new()---->Poll::new()---->epoll_create()Eventloop::run()--->Selecter::select()---->epoll_wait(),还有register()reregister()deregister()等等......

impl<H: Handler> EventLoop<H> {/// Constructs a new `EventLoop` using the default configuration values./// The `EventLoop` will not be running.pub fn new() -> io::Result<EventLoop<H>> {EventLoop::configured(Config::default())}fn configured(config: Config) -> io::Result<EventLoop<H>> {// Create the IO pollerlet poll = Poll::new()?;      //Linux内部调用epoll_create()let timer = timer::Builder::default().tick_duration(config.timer_tick).num_slots(config.timer_wheel_size).capacity(config.timer_capacity).build();// Create cross thread notification queuelet (tx, rx) = channel::sync_channel(config.notify_capacity);  //这里创建的是同步管道,可配置同步管道内部的buffer queue bound size.// Register the notification wakeup FD with the IO pollerpoll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;  //NOTIFY和TIMER由mio实现poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;Ok(EventLoop {run: true,poll: poll,timer: timer,notify_tx: tx,notify_rx: rx,config: config,events: Events::with_capacity(1024),})}/// Keep spinning the event loop indefinitely, and notify the handler whenever/// any of the registered handles are ready.pub fn run(&mut self, handler: &mut H) -> io::Result<()> {self.run = true;while self.run {// Execute ticks as long as the event loop is runningself.run_once(handler, None)?;   //Linux下调用epoll_wait()}Ok(())}pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {trace!("event loop tick");// Check the registered IO handles for any new events. Each poll// is for one second, so a shutdown request can last as long as// one second before it takes effect.let events = match self.io_poll(timeout) {Ok(e) => e,Err(err) => {if err.kind() == io::ErrorKind::Interrupted {handler.interrupted(self);     //调用Handler::interrupted() 【4】0} else {return Err(err);}}};self.io_process(handler, events);    //处理就绪的事件,handler为如何处理各种事件的实例handler.tick(self); //一轮事件处理后,最后调用Handler::tick()    调用【5】Ok(())}#[inline]fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {self.poll.poll(&mut self.events, timeout)}// Process IO events that have been previously polledfn io_process(&mut self, handler: &mut H, cnt: usize) {let mut i = 0;trace!("io_process(..); cnt={}; len={}", cnt, self.events.len());// Iterate over the notifications. Each event provides the token// it was registered with (which usually represents, at least, the// handle that the event is about) as well as information about// what kind of event occurred (readable, writable, signal, etc.)while i < cnt {       //遍历所有就绪的事件,进行处理let evt = self.events.get(i).unwrap();trace!("event={:?}; idx={:?}", evt, i);// mio在epoll之上,增加了NOTIFY和TIMERmatch evt.token() {NOTIFY => self.notify(handler),            //channel处理 ,这个epoll中是没有的,mio实现TIMER => self.timer_process(handler),  //Timer处理, 这个epoll中也是没有的,mio实现_ => self.io_event(handler, evt)        //IO事件的处理, 这个epoll有}i += 1;}}fn io_event(&mut self, handler: &mut H, evt: Event) {handler.ready(self, evt.token(), evt.readiness());   //调用Handler::ready() 【1】}fn notify(&mut self, handler: &mut H) {for _ in 0..self.config.messages_per_tick {match self.notify_rx.try_recv() {    //从channel中接收数据,内部实现是std::sync::mpsc::sync_channel()Ok(msg) => handler.notify(self, msg),    //调用Handler::notify()   【2】_ => break,}}// Re-registerlet _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());  //PollOpt::oneshot(),必须重新reregister.}fn timer_process(&mut self, handler: &mut H) {while let Some(t) = self.timer.poll() {handler.timeout(self, t);    //调用Handler::timeout() 【3】}}/// Registers an IO handle with the event loop.pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>where E: Evented{self.poll.register(io, token, interest, opt)}/// Re-Registers an IO handle with the event loop.pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>where E: Evented{self.poll.reregister(io, token, interest, opt)}/// Deregisters an IO handle with the event loop.pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {self.poll.deregister(io)}/// Returns a sender that allows sending messages to the event loop in a/// thread-safe way, waking up the event loop if needed.pub fn channel(&self) -> Sender<H::Message> {Sender::new(self.notify_tx.clone())}/// Schedules a timeout after the requested time interval. When the/// duration has been reached,pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {self.timer.set_timeout(delay, token)}/// If the supplied timeout has not been triggered, cancel it such that it/// will not be triggered in the future.pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {self.timer.cancel_timeout(&timeout).is_some()}/// Tells the event loop to exit after it is done handling all events in the current iteration.pub fn shutdown(&mut self) {self.run = false;}/// Indicates whether the event loop is currently running. If it's not it has either/// stopped or is scheduled to stop on the next tick.pub fn is_running(&self) -> bool {self.run}
}
复制代码

【2】Poll代码分析

Poll屏蔽了不同系统的实现,给出了统一的抽象。Poll的实现代码这里只能列出较为重要的部分代码,有一部分代码省略掉了,详细代码可查看mio/src/poll.rs:

pub struct Poll {// Platform specific IO selectorselector: sys::Selector,    // Custom readiness queue// The second readiness queue is implemented in user space by `ReadinessQueue`. It provides a way to implement purely user space `Evented` types.readiness_queue: ReadinessQueue,  //区别于系统就绪队列(sys::Selector),这是上层自己实现的就绪队列// Use an atomic to first check if a full lock will be required. This is a// fast-path check for single threaded cases avoiding the extra syscalllock_state: AtomicUsize,// Sequences concurrent calls to `Poll::poll`lock: Mutex<()>,// Wakeup the next waitercondvar: Condvar,
}impl Poll {/// Return a new `Poll` handle.pub fn new() -> io::Result<Poll> {is_send::<Poll>();is_sync::<Poll>();let poll = Poll {selector: sys::Selector::new()?,readiness_queue: ReadinessQueue::new()?,lock_state: AtomicUsize::new(0),lock: Mutex::new(()),condvar: Condvar::new(),};// Register the notification wakeup FD with the IO pollerpoll.readiness_queue.inner.awakener.register(&poll, AWAKEN, Ready::readable(), PollOpt::edge())?;Ok(poll)}/// Wait for readiness events////// Blocks the current thread and waits for readiness events for any of the/// `Evented` handles that have been registered with this `Poll` instance./// The function will block until either at least one readiness event has/// been received or `timeout` has elapsed. A `timeout` of `None` means that/// `poll` will block until a readiness event has been received.pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {self.poll1(events, timeout, false)        //Poll::poll()非常最重要的一个方法, poll()-->poll1()-->poll2()}fn poll1(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {let zero = Some(Duration::from_millis(0));let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);if 0 != curr { ... } //{ ... }代表中间有很多代码被省略掉了.let ret = self.poll2(events, timeout, interruptible);// Release the lockif 1 != self.lock_state.fetch_and(!1, Release) { ... }    //{ ... }代表中间有很多代码被省略掉了.ret}#[inline]fn poll2(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {// Compute the timeout value passed to the system selector. If the// readiness queue has pending nodes, we still want to poll the system// selector for new events, but we don't want to block the thread to// wait for new events.if timeout == Some(Duration::from_millis(0)) {// If blocking is not requested, then there is no need to prepare// the queue for sleep//// The sleep_marker should be removed by readiness_queue.poll().} else if self.readiness_queue.prepare_for_sleep() {// The readiness queue is empty. The call to `prepare_for_sleep`// inserts `sleep_marker` into the queue. This signals to any// threads setting readiness that the `Poll::poll` is going to// sleep, so the awakener should be used.} else {// The readiness queue is not empty, so do not block the thread.timeout = Some(Duration::from_millis(0));}//poll系统就绪队列loop {let now = Instant::now();// First get selector eventslet res = self.selector.select(&mut events.inner, AWAKEN, timeout);   //Linux下调用epoll_wait(),就绪事件放入events中match res {Ok(true) => {// Some awakeners require reading from a FD.self.readiness_queue.inner.awakener.cleanup();break;}Ok(false) => break,Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => {// Interrupted by a signal; update timeout if necessary and retryif let Some(to) = timeout {let elapsed = now.elapsed();if elapsed >= to {break;} else {timeout = Some(to - elapsed);}}}Err(e) => return Err(e),}}// Poll custom event queueself.readiness_queue.poll(&mut events.inner);   //Poll用户就绪队列// Return number of polled eventsOk(events.inner.len())}/// Register an `Evented` handle with the `Poll` instance.pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>where E: Evented {validate_args(token)?;// Register interests for this sockethandle.register(self, token, interest, opts)?;Ok(())}/// Re-register an `Evented` handle with the `Poll` instance.pub fn reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>where E: Evented {validate_args(token)?;// Register interests for this sockethandle.reregister(self, token, interest, opts)?;Ok(())}/// Deregister an `Evented` handle with the `Poll` instance.pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>where E: Evented {// Deregister interests for this sockethandle.deregister(self)?;Ok(())}
}
复制代码

【3】Selector代码分析

下面这段代码出自mio/src/sys/unix/epoll.rs是对底层Linux系统epoll的封装抽象,可以看到Selector::new()内部实际上调用了epoll_create()Selector::select()内部实际上调用了epoll_wait()register()reregister()deregister()实内部实际上调用了epoll_ctl()。如果你非常熟悉epoll,就会感觉下面的代码很熟悉,详细代码如下:

pub struct Selector {id: usize,epfd: RawFd,
}impl Selector {pub fn new() -> io::Result<Selector> {let epfd = unsafe {// Emulate `epoll_create` by using `epoll_create1` if it's available// and otherwise falling back to `epoll_create` followed by a call to// set the CLOEXEC flag.dlsym!(fn epoll_create1(c_int) -> c_int);match epoll_create1.get() {Some(epoll_create1_fn) => {cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?}None => {let fd = cvt(libc::epoll_create(1024))?;drop(set_cloexec(fd));fd}}};// offset by 1 to avoid choosing 0 as the id of a selectorlet id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;Ok(Selector {id: id,epfd: epfd,})}pub fn id(&self) -> usize {self.id}/// Wait for events from the OSpub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {let timeout_ms = timeout.map(|to| cmp::min(millis(to), i32::MAX as u64) as i32).unwrap_or(-1);// Wait for epoll events for at most timeout_ms millisecondsevts.clear();unsafe {let cnt = cvt(libc::epoll_wait(self.epfd,evts.events.as_mut_ptr(),evts.events.capacity() as i32,timeout_ms))?;let cnt = cnt as usize;evts.events.set_len(cnt);for i in 0..cnt {if evts.events[i].u64 as usize == awakener.into() {evts.events.remove(i);return Ok(true);}}}Ok(false)}/// Register event interests for the given IO handle with the OSpub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {let mut info = libc::epoll_event {events: ioevent_to_epoll(interests, opts),u64: usize::from(token) as u64};unsafe {cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;Ok(())}}/// Register event interests for the given IO handle with the OSpub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {let mut info = libc::epoll_event {events: ioevent_to_epoll(interests, opts),u64: usize::from(token) as u64};unsafe {cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;Ok(())}}/// Deregister event interests for the given IO handle with the OSpub fn deregister(&self, fd: RawFd) -> io::Result<()> {// The &info argument should be ignored by the system,// but linux < 2.6.9 required it to be not null.// For compatibility, we provide a dummy EpollEvent.let mut info = libc::epoll_event {events: 0,u64: 0,};unsafe {cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;Ok(())}}
}
复制代码

【4】Notify channel代码分析

这个涉及的代码比较多,比较杂,也较为难以理解。

// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
// list nodes. This significantly reduces the number of required allocations.
// Each `Registration` / `SetReadiness` pair allocates a single readiness node
// that is used for the lifetime of the registration.
//
// The readiness node also includes a single atomic variable, `state` that
// tracks most of the state associated with the registration. This includes the
// current readiness, interest, poll options, and internal state. When the node
// state is mutated, it is queued in the MPSC channel. A call to
// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
// still be mutated while it is queued in the channel for processing.
// Intermediate state values do not matter as long as the final state is
// included in the call to `poll`. This is the eventually consistent nature of
// the readiness queue.
//
// The readiness node is ref counted using the `ref_count` field. On creation,
// the ref_count is initialized to 3: one `Registration` handle, one
// `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
// doesn't *always* hold a handle to the node, we don't use the Arc type for
// managing ref counts (this is to avoid constantly incrementing and
// decrementing the ref count when pushing & popping from the queue). When the
// `Registration` handle is dropped, the `dropped` flag is set on the node, then
// the node is pushed into the registration queue. When Poll::poll pops the
// node, it sees the drop flag is set, and decrements it's ref count.
//
// The MPSC queue is a modified version of the intrusive MPSC node based queue
// described by 1024cores [1].
#[derive(Clone)]
struct ReadinessQueue {inner: Arc<ReadinessQueueInner>,
}struct ReadinessQueueInner {// Used to wake up `Poll` when readiness is set in another thread.awakener: sys::Awakener,// Head of the MPSC queue used to signal readiness to `Poll::poll`.head_readiness: AtomicPtr<ReadinessNode>,// Tail of the readiness queue.//// Only accessed by Poll::poll. Coordination will be handled by the poll fntail_readiness: UnsafeCell<*mut ReadinessNode>,// Fake readiness node used to punctuate the end of the readiness queue.// Before attempting to read from the queue, this node is inserted in order// to partition the queue between nodes that are "owned" by the dequeue end// and nodes that will be pushed on by producers.end_marker: Box<ReadinessNode>,// Similar to `end_marker`, but this node signals to producers that `Poll`// has gone to sleep and must be woken up.sleep_marker: Box<ReadinessNode>,// Similar to `end_marker`, but the node signals that the queue is closed.// This happens when `ReadyQueue` is dropped and signals to producers that// the nodes should no longer be pushed into the queue.closed_marker: Box<ReadinessNode>,
}
复制代码
/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
/// queued into the MPSC channel.
struct ReadinessNode {// Node state, see struct docs for `ReadinessState`//// This variable is the primary point of coordination between all the// various threads concurrently accessing the node.state: AtomicState,// The registration token cannot fit into the `state` variable, so it is// broken out here. In order to atomically update both the state and token// we have to jump through a few hoops.//// First, `state` includes `token_read_pos` and `token_write_pos`. These can// either be 0, 1, or 2 which represent a token slot. `token_write_pos` is// the token slot that contains the most up to date registration token.// `token_read_pos` is the token slot that `poll` is currently reading from.//// When a call to `update` includes a different token than the one currently// associated with the registration (token_write_pos), first an unused token// slot is found. The unused slot is the one not represented by// `token_read_pos` OR `token_write_pos`. The new token is written to this// slot, then `state` is updated with the new `token_write_pos` value. This// requires that there is only a *single* concurrent call to `update`.//// When `poll` reads a node state, it checks that `token_read_pos` matches// `token_write_pos`. If they do not match, then it atomically updates// `state` such that `token_read_pos` is set to `token_write_pos`. It will// then read the token at the newly updated `token_read_pos`.token_0: UnsafeCell<Token>,token_1: UnsafeCell<Token>,token_2: UnsafeCell<Token>,// Used when the node is queued in the readiness linked list. Accessing// this field requires winning the "queue" locknext_readiness: AtomicPtr<ReadinessNode>,// Ensures that there is only one concurrent call to `update`.//// Each call to `update` will attempt to swap `update_lock` from `false` to// `true`. If the CAS succeeds, the thread has obtained the update lock. If// the CAS fails, then the `update` call returns immediately and the update// is discarded.update_lock: AtomicBool,// Pointer to Arc<ReadinessQueueInner>readiness_queue: AtomicPtr<()>,// Tracks the number of `ReadyRef` pointersref_count: AtomicUsize,
}
复制代码
/// Handle to a user space `Poll` registration.
///
/// `Registration` allows implementing [`Evented`] for types that cannot work
/// with the [system selector]. A `Registration` is always paired with a
/// `SetReadiness`, which allows updating the registration's readiness state.
/// When [`set_readiness`] is called and the `Registration` is associated with a
/// [`Poll`] instance, a readiness event will be created and eventually returned
/// by [`poll`].
pub struct Registration {inner: RegistrationInner,
}
复制代码
/// Updates the readiness state of the associated `Registration`.
#[derive(Clone)]
pub struct SetReadiness {inner: RegistrationInner,
}
复制代码

未完,待续......

参考文档:Intrusive MPSC node-based queue

【5】Timer定时器代码分析

pub struct Timer<T> {// Size of each tick in millisecondstick_ms: u64,// Slab of timeout entriesentries: Slab<Entry<T>>,// Timeout wheel. Each tick, the timer will look at the next slot for// timeouts that match the current tick.wheel: Vec<WheelEntry>,// Tick 0's time instantstart: Instant,// The current ticktick: Tick,// The next entry to possibly timeoutnext: Token,// Masks the target tick to get the slotmask: u64,// Set on registration with Pollinner: LazyCell<Inner>,
}
复制代码

未完,待续......

三、mio用法示例

下面的2个示例都很简单,其实直接看mio的测试代码mio/test/就好了,不用看下面的2个示例。

1、代码示例1

直接使用Poll示例如下:

#[macro_use]
extern crate log;
extern crate simple_logger;
extern crate mio;use mio::*;
use mio::tcp::{TcpListener, TcpStream};
use std::io::{Read,Write};fn main() {simple_logger::init().unwrap();// Setup some tokens to allow us to identify which event is for which socket.const SERVER: Token = Token(0);const CLIENT: Token = Token(1);let addr = "127.0.0.1:12345".parse().unwrap();// Setup the server socketlet server = TcpListener::bind(&addr).unwrap();// Create a poll instancelet poll = Poll::new().unwrap();// Start listening for incoming connectionspoll.register(&server, SERVER, Ready::readable(), PollOpt::edge()).unwrap();// Setup the client socketlet sock = TcpStream::connect(&addr).unwrap();// Register the socketpoll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()).unwrap();// Create storage for eventslet mut events = Events::with_capacity(1024);loop {poll.poll(&mut events, None).unwrap();for event in events.iter() {match event.token() {SERVER => {// Accept and drop the socket immediately, this will close// the socket and notify the client of the EOF.let (stream,addr) = server.accept().unwrap();info!("Listener accept {:?}",addr);},CLIENT => {// The server just shuts down the socket, let's just exit// from our event loop.info!("client response.");return;},_ => unreachable!(),}}}
}
复制代码

通过上面的代码示例1,我们可以看到其用法与epoll非常相似。

2、代码示例2

上面的代码编程时较为麻烦,下面使用事件循环EventLoop的方式,代码能看起来更清晰一些(相对的):

#[macro_use]
extern crate log;
extern crate simple_logger;
extern crate mio;use mio::*;
use mio::timer::{Timeout};
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
use std::thread;
use std::time::Duration;fn main() {simple_logger::init().unwrap();let mut event_loop=EventLoop::new().unwrap();let channel_sender=event_loop.channel();thread::spawn(move ||{channel_sender.send(IoMessage::Notify);thread::sleep_ms(5*1000);channel_sender.send(IoMessage::End);});let timeout = event_loop.timeout(Token(123), Duration::from_millis(3000)).unwrap();let mut handler=MioHandler::new();let _ = event_loop.run(&mut handler).unwrap();
}pub enum IoMessage{Notify,End,
}pub struct MioHandler{
}impl MioHandler{pub fn new()->Self{MioHandler{}}
}impl Handler for MioHandler {type Timeout = Token;type Message = IoMessage;/// Invoked when the socket represented by `token` is ready to be operated on.fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {}/// Invoked when a message has been received via the event loop's channel.fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {match msg {IoMessage::Notify=>info!("channel notify"),IoMessage::End=>{info!("shutdown eventloop.");event_loop.shutdown();}}}/// Invoked when a timeout has completed.fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {match timeout{Token(123)=>info!("time out."),Token(_)=>{},}}/// Invoked when `EventLoop` has been interrupted by a signal interrupt.fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {}/// Invoked at the end of an event loop tick.fn tick(&mut self, event_loop: &mut EventLoop<Self>) {}
}
复制代码

这个示例说明了超时及channel,围绕EventLoop编程,其实与上一个例子没有什么不同,只是EventLoopPoll做了封装。

参考文档:
【譯】Tokio 內部機制:從頭理解 Rust 非同步 I/O 框架
使用mio开发web framework - base
My Basic Understanding of mio and Asynchronous IO
MIO for Rust
mio-github

Rust轻量级I/O库mio相关推荐

  1. Datenlord | Rust实现RDMA异步编程(二):async Rust 封装 UCX 通信库

    UCX 是一个高性能网络通信库,它作为 MPI 所依赖的通信模块之一在高性能计算领域得到广泛的使用.UCX 使用 C 语言编写,为了在 Rust 项目中使用它,我们需要将它的 C 接口包装成 Rust ...

  2. 找个轻量级的Log库还挺难

    这两天一直希望找个可以移植到VxWorks上的Log库,早就知道大名鼎鼎的Log4c,但一直想找个更好的,本来看上了Pantheios,觉得它的架构非常清晰,使用也很简便,特别是其网站上宣传它的性能非 ...

  3. 【一起学Rust | 进阶篇 | thesaurus-rs库】Rust 的离线同义词库——thesaurus-rs

    文章目录 前言 后端比较 一.安装与引用 1. 使用WordNet后端 2. 使用moby后端 3. build下载crate 二.使用步骤 1.获取命令行参数 2.取到同义词 补充 3.输出结果 4 ...

  4. golang byte转string_Golang和Rust语言常见功能/库

    时下最流行.最具发展前途的的两门语言是Golang和Rust.Golang语言简洁.高效.并发.并且有个强大的囊括了常见功能标准库.与之相对比,Rust语言则主要是安全.高性能.虽然Rust没有gol ...

  5. python中的轻量级定时任务调度库:schedule

    提到定时任务调度的时候,相信很多人会想到celery,要么就写个脚本塞到crontab中.不过,一个小的定时脚本,要用celery的话太"重"了.所以,我找到了一个轻量级的定时任务 ...

  6. 12 个轻量级的 JavaScript 库

    1. meSing.js meSing.js是一个一个JavaScript歌唱合成库,它使用Web Audio API的DSP功能与meSpeak.js语音合成库结合,为Web提供声乐合成器. 2. ...

  7. rust第三人称视角插件_一个第三人称游戏相机的实现(基于rust语言和cgmath库)...

    我在学校时就对角色扮演类3D游戏感兴趣,毕业那会研究过第三人称视角的游戏相机的实现(基于D3D),但由于没有想到合适的计算方法,最后实现出来的程序有BUG,并且没找出原因. 最近看到git有rust写 ...

  8. rust超低配置补丁_腐蚀rust游戏必备运行库下载_rust腐蚀启动插件下载-游迅网

    不知道怎么下载?点我 游戏介绍 <腐蚀>这款游戏有些玩家首次下载安装的时候,可能无法启动,那是因为电脑上缺少<腐蚀>游戏必备运行库,插件只有一个,玩家很快就能够下载安装吧,然后 ...

  9. date-fns日期格式化_date-fns简介–轻量级JavaScript日期库

    date-fns日期格式化 在JavaScript中使用日期非常麻烦. 本地日期方法通常很冗长, 有时也不一致 -这也使它们易于出错. 但是,好消息即将到来. 有几个库可以消除过时的痛苦. 这些库是J ...

最新文章

  1. 数据结构与算法分析-第一章Java类(04)
  2. 2017-2018-1 20155231 《信息安全系统设计基础》第6周学习总结
  3. html一个div浮动在另一div上,css – 在另一个DIV的顶部浮动DIV
  4. mysql中find_in_set结合GROUP_CONCAT使用
  5. 【MIPS汇编】ADDI,ADDIU,ADD,ADDU的区别、有符号无符号的谬误
  6. 解决eclipse + pydev 编译过程中有中文的问题
  7. Wireshark文档阅读笔记-TCP 3 way handshaking解析与实例
  8. 在vs中使用python
  9. opencv移植到ubuntu
  10. Atitit 粘贴路径参数法 跨进程通讯法 目录 1. .IPC(Inter-Process Communication,跨进程通信) 1 1.1. .IPC的使用场景: 2 2. 传统的进程间通
  11. python中用rdflib生成rdf,用sparql查询
  12. Android 面(被)试(锤)现场还原~
  13. uni-app 生成安卓证书
  14. 手撸一个外卖点餐系统后台,可以写上简历的实战项目!
  15. php 防挂马,织梦dedecms安全设置防挂马教程
  16. 怎么用优动漫PAINT做出色彩的朦胧感?
  17. 比较两个路径的几种方式
  18. yum是干什么的_linux下的yum命令详解
  19. 蓝桥杯C/C++程序设计 往届真题汇总(基础篇)
  20. Excel根据出生日期判断生肖,Leo老师来教你!

热门文章

  1. QQ红包终于支持微信支付了
  2. 微软服务器认证找工作,微软认证技术工程师MCTS
  3. android平板苹果,苹果界面让人生厌 5款Android平板推荐
  4. 搭建恋爱话术库一个月赚5万,一年全款车!投入不到两千
  5. iOS中CAShapeLayer用法
  6. mac版phpstorm中文切换为英文
  7. 国内网站用香港服务器会被封吗?
  8. composer 2.0 报错:No composer.json present in the current directory, this may be the cause of the foll
  9. 【软件测试】简历中的项目经历可以怎么写?
  10. 160个CrackMe 028 Cosh.2