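Message mechanism

With vhost-user, a Unix domain socket server has to be created on the host to handle the messages that QEMU sends to the host.

A new socket connection means the guest has created a new virtio-net device. The vhost driver creates a matching vhost device for it, and QEMU can then talk to vhost over that socket. When the socket is closed, vhost destroys the corresponding device.

The commonly used messages are: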

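// drivers/net/virtio/virtio_user/vhost_kernel.c
/* vhost kernel ioctls */
#define VHOST_VIRTIO 0xAF
/* Return the subset of virtio-net features that vhost supports */
#define VHOST_GET_FEATURES _IOR(VHOST_VIRTIO, 0x00, __u64)
/* Check the feature mask and set the features shared by vhost and the virtio
 * front end; a feature takes effect only when both sides support it
 */
#define VHOST_SET_FEATURES _IOW(VHOST_VIRTIO, 0x00, __u64)
/* Make the current process the owner of the device */
#define VHOST_SET_OWNER _IO(VHOST_VIRTIO, 0x01)
/* The current process gives up ownership of the device */
#define VHOST_RESET_OWNER _IO(VHOST_VIRTIO, 0x02)
/* Set the memory layout information used for address translation when sending and receiving packets */
#define VHOST_SET_MEM_TABLE _IOW(VHOST_VIRTIO, 0x03, struct vhost_memory_kernel)
/* The next two macros are used for guest live migration */
#define VHOST_SET_LOG_BASE _IOW(VHOST_VIRTIO, 0x04, __u64)
#define VHOST_SET_LOG_FD _IOW(VHOST_VIRTIO, 0x07, int)
/* vhost records the size of each virtqueue */
#define VHOST_SET_VRING_NUM _IOW(VHOST_VIRTIO, 0x10, struct vhost_vring_state)
/* QEMU sends the virtual address of the virtqueue structure; vhost translates
 * that address into a vhost virtual address
 */
#define VHOST_SET_VRING_ADDR _IOW(VHOST_VIRTIO, 0x11, struct vhost_vring_addr)
/* Pass the initial index; vhost uses it to find the initial descriptor */
#define VHOST_SET_VRING_BASE _IOW(VHOST_VIRTIO, 0x12, struct vhost_vring_state)
/* Send the virtqueue's current available index back to QEMU */
#define VHOST_GET_VRING_BASE _IOWR(VHOST_VIRTIO, 0x12, struct vhost_vring_state)
/* Pass an eventfd file descriptor. When the guest has new data to send, it
 * uses this descriptor to tell vhost to pick up the data and send it to its
 * destination. vhost uses the eventfd proxy module to move the descriptor
 * from the QEMU context into its own process context.
 */
#define VHOST_SET_VRING_KICK _IOW(VHOST_VIRTIO, 0x20, struct vhost_vring_file)
/* Also passes an eventfd file descriptor. It lets vhost notify the guest, by
 * way of an interrupt, that newly received packets are ready to be picked up.
 * The eventfd proxy module moves the descriptor from the QEMU context into
 * vhost's own process context.
 */
#define VHOST_SET_VRING_CALL _IOW(VHOST_VIRTIO, 0x21, struct vhost_vring_file)
/* Only defined in the code, never used */
#define VHOST_SET_VRING_ERR _IOW(VHOST_VIRTIO, 0x22, struct vhost_vring_file)
/* Used to support virtio-user */
#define VHOST_NET_SET_BACKEND _IOW(VHOST_VIRTIO, 0x30, struct vhost_vring_file)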

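Address translation and memory mapping

The core of the data exchange through virtqueues and vrings is a mechanism that makes the data buffers visible to the guest and the host at the same time, which avoids the performance cost of copying data. DPDK vhost achieves this with hugepage memory, memory mapping, and the corresponding address translation.

The host therefore must have enough hugepage space, and memory preallocation must be specified. For vhost to access the virtqueues and the packet buffers, the pages that hold every descriptor table and ring address must be mapped into the vhost process's address space.

After receiving a VHOST_SET_MEM_TABLE message, vhost uses the memory layout table carried in the message to do the mapping: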

/* The two structures below record the guest physical addresses and offsets */
/**
 * Information relating to memory regions including offsets to
 * addresses in QEMUs memory file.
 */
struct rte_vhost_mem_region {
    uint64_t guest_phys_addr;
    uint64_t guest_user_addr;
    uint64_t host_user_addr;
    uint64_t size;
    void     *mmap_addr;
    uint64_t mmap_size;
    int fd;
};

/**
 * Memory structure includes region and mapping information.
 */
struct rte_vhost_memory {
    uint32_t nregions;
    struct rte_vhost_mem_region regions[];
};

/*
 * Convert a QEMU virtual address into a vhost virtual address. This function
 * is used to translate the ring addresses into host-side virtual addresses.
 */
static uint64_t
qva_to_vva(struct virtio_net *dev, uint64_t qva)
{
    struct rte_vhost_mem_region *reg;
    uint32_t i;

    /* Find the region where the address lives. */
    for (i = 0; i < dev->mem->nregions; i++) {
        reg = &dev->mem->regions[i];

        if (qva >= reg->guest_user_addr &&
            qva <  reg->guest_user_addr + reg->size) {
            return qva - reg->guest_user_addr +
                   reg->host_user_addr;
        }
    }

    return 0;
}
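
To make the translation concrete, here is a small self-contained sketch of the same lookup on a simplified region table. struct demo_region and demo_qva_to_vva() are stand-ins invented for this example (they are not DPDK types), and the bounds test is written as an offset comparison so that it cannot overflow, which is a choice of this sketch rather than something taken from the code above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for struct rte_vhost_mem_region (illustration only). */
struct demo_region {
    uint64_t guest_user_addr;  /* start of the region in QEMU's address space */
    uint64_t host_user_addr;   /* where the same region is mapped in vhost */
    uint64_t size;             /* length of the region in bytes */
};

/* Translate a QEMU virtual address; returns 0 when no region contains it. */
static uint64_t demo_qva_to_vva(const struct demo_region *regs,
                                size_t nregions, uint64_t qva)
{
    size_t i;

    assert(regs != NULL || nregions == 0);
    for (i = 0; i < nregions; i++) {
        const struct demo_region *r = &regs[i];

        /* Same offset in the region, different base address. */
        if (qva >= r->guest_user_addr && qva - r->guest_user_addr < r->size)
            return qva - r->guest_user_addr + r->host_user_addr;
    }
    return 0;
}
```

With a region that starts at 0x10000 on the QEMU side and is mapped at 0x7f0000000000 in vhost, address 0x10010 would translate to 0x7f0000000010, and any address outside every region yields 0.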

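virtio-net device management

The life cycle of a virtio-net device consists of device creation, configuration, service start, and device destruction.

1. Device creation

A vhost-user device is created through a socket connection. Creating a virtio-net device involves:

  • allocating a new virtio-net device structure and adding it to the device list

  • assigning a processing core to the device and adding the device to the data-plane list

  • allocating, on vhost, the RX/TX queues that serve the virtio-net device

2. Configuration

The VHOST_SET_VRING_* messages tell vhost the size, base index, and location of each virtqueue; vhost maps the virtqueues into its own virtual address space.

3. Service start

vhost starts serving a virtqueue when it receives the VHOST_SET_VRING_KICK message. From then on vhost can poll its receive queue and place the data on the virtio-net device's receive queue. It can also poll the transmit virtqueue for packets waiting to be sent and, if there are any, copy them to the transmit queue.

4. Device destruction

The VHOST_GET_VRING_BASE message tells vhost to stop serving the receive and transmit virtqueues. The processing core assigned to the virtio-net device and the RX and TX queues on the physical NIC are released as well.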
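
One way to picture the four phases is as a small state machine driven by socket events and messages. The sketch below is a deliberate simplification: every enum and function name is hypothetical, and the real library tracks per-queue state rather than one state per device.

```c
#include <assert.h>

/* Hypothetical event names; the comments give the message each one models. */
enum demo_msg {
    DEMO_SOCKET_CONNECTED,  /* new socket connection: device creation */
    DEMO_SET_VRING_NUM,     /* VHOST_SET_VRING_NUM */
    DEMO_SET_VRING_ADDR,    /* VHOST_SET_VRING_ADDR */
    DEMO_SET_VRING_BASE,    /* VHOST_SET_VRING_BASE */
    DEMO_SET_VRING_KICK,    /* VHOST_SET_VRING_KICK: service start */
    DEMO_GET_VRING_BASE,    /* VHOST_GET_VRING_BASE: stop serving */
    DEMO_SOCKET_CLOSED      /* socket closed: device destruction */
};

enum demo_state {
    DEMO_NONE, DEMO_CREATED, DEMO_CONFIGURED, DEMO_RUNNING, DEMO_STOPPED
};

/* Return the state after handling one event; unexpected events are ignored. */
static enum demo_state demo_next_state(enum demo_state s, enum demo_msg m)
{
    assert(s <= DEMO_STOPPED);
    switch (m) {
    case DEMO_SOCKET_CONNECTED:
        return s == DEMO_NONE ? DEMO_CREATED : s;
    case DEMO_SET_VRING_NUM:
    case DEMO_SET_VRING_ADDR:
    case DEMO_SET_VRING_BASE:
        /* Configuration is accepted whenever the queues are not running. */
        return (s == DEMO_CREATED || s == DEMO_CONFIGURED ||
                s == DEMO_STOPPED) ? DEMO_CONFIGURED : s;
    case DEMO_SET_VRING_KICK:
        return s == DEMO_CONFIGURED ? DEMO_RUNNING : s;
    case DEMO_GET_VRING_BASE:
        return s == DEMO_RUNNING ? DEMO_STOPPED : s;
    case DEMO_SOCKET_CLOSED:
        return DEMO_NONE;
    }
    return s;
}
```

Feeding the events in the order connect, SET_VRING_NUM/ADDR/BASE, SET_VRING_KICK, GET_VRING_BASE, close walks the device through created, configured, running, stopped and back to none.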

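Important APIs

Now let's look at the process described above from the code's point of view. The more important APIs are:

Registering the driver

int rte_vhost_driver_register(const char *path, uint64_t flags)

This function registers a vhost driver with the system; path is the path of the socket. In 17.05, the latest release at the time of writing, flags supports the options below (earlier releases did not make them configurable and only supported client mode with reconnect by default):

  • RTE_VHOST_USER_CLIENT: connect to QEMU in client mode

  • RTE_VHOST_USER_NO_RECONNECT: by default the client keeps trying to connect to the server (QEMU) automatically when the server has not started yet or has restarted; this flag turns that behavior off

  • RTE_VHOST_USER_DEQUEUE_ZERO_COPY: an optimization for vm2vm and vm2nic traffic, off by default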

Let's read the code:

int rte_vhost_driver_register(const char *path, uint64_t flags)
{
    int ret = -1;
    ...
    /* Create a vhost-user socket and set its properties according to flags */
    struct vhost_user_socket *vsocket;

    vsocket = malloc(sizeof(struct vhost_user_socket));
    if (!vsocket)
        goto out;
    memset(vsocket, 0, sizeof(struct vhost_user_socket));
    vsocket->path = strdup(path);
    TAILQ_INIT(&vsocket->conn_list);
    pthread_mutex_init(&vsocket->conn_mutex, NULL);
    vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;

    /*
     * Set the built-in supported features; they are transparent to the user
     */
    vsocket->supported_features = VIRTIO_NET_SUPPORTED_FEATURES;
    vsocket->features           = VIRTIO_NET_SUPPORTED_FEATURES;

    if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
        vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
        if (vsocket->reconnect && reconn_tid == 0) {
            /*
             * Create a thread that keeps scanning the global reconn_list
             * in the background and repeatedly tries to connect the
             * sockets on that list to the server
             */
            if (vhost_user_reconnect_init() < 0) {
                free(vsocket->path);
                free(vsocket);
                goto out;
            }
        }
    } else {
        /*
         * This version also supports server mode. QEMU then has to act
         * as the client, which depends on the QEMU version.
         */
        vsocket->is_server = true;
    }

    /* In the end this just creates a unix socket to do the communication */
    ret = create_unix_socket(vsocket);
    if (ret < 0) {
        free(vsocket->path);
        free(vsocket);
        goto out;
    }

    /*
     * Afterwards the socket is stored in the vhost_user.vsockets array so
     * that later operations can look it up; see find_vhost_user_socket().
     * At most 1024 sockets can currently be created.
     */
    vhost_user.vsockets[vhost_user.vsocket_cnt++] = vsocket;
    ...
}

/* A thin wrapper around socket creation; nothing much to say */
int create_unix_socket(struct vhost_user_socket *vsocket)
{
    int fd;
    struct sockaddr_un *un = &vsocket->un;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
        vsocket->is_server ? "server" : "client", fd);

    if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "vhost-user: can't set nonblocking mode for socket, fd: "
            "%d (%s)\n", fd, strerror(errno));
        close(fd);
        return -1;
    }

    memset(un, 0, sizeof(*un));
    un->sun_family = AF_UNIX;
    strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
    un->sun_path[sizeof(un->sun_path) - 1] = '\0';

    vsocket->socket_fd = fd;
    return 0;
}

/* Lookup function */
struct vhost_user_socket *
find_vhost_user_socket(const char *path)
{
    int i;

    /*
     * The lookup walks the array, so it costs O(N). Not many sockets are
     * created, though; this was probably considered and judged not worth
     * optimizing.
     */
    for (i = 0; i < vhost_user.vsocket_cnt; i++) {
        struct vhost_user_socket *vsocket = vhost_user.vsockets[i];

        if (!strcmp(vsocket->path, path))
            return vsocket;
    }

    return NULL;
}

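Setting and enabling features

/* Explicitly set the supported features */
int rte_vhost_driver_set_features(const char *path, uint64_t features)
/* Enable the given features */
int rte_vhost_driver_enable_features(const char *path, uint64_t features)
/* Disable the given features */
int rte_vhost_driver_disable_features(const char *path, uint64_t features)

All of these operate on socket->features to configure features in software, and they work in much the same way. They can be used to fine-tune a driver's features after the driver has been registered.

For example, to turn on the mergeable feature, call rte_vhost_driver_enable_features(file, 1ULL << VIRTIO_NET_F_MRG_RXBUF).

The currently supported features are: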

/* The feature bitmap for virtio net */
#define VIRTIO_NET_F_CSUM   0   /* Host handles pkts w/ partial csum */
#define VIRTIO_NET_F_GUEST_CSUM 1   /* Guest handles pkts w/ partial csum */
#define VIRTIO_NET_F_MTU    3   /* Initial MTU advice. */
#define VIRTIO_NET_F_MAC    5   /* Host has given MAC address. */
#define VIRTIO_NET_F_GUEST_TSO4 7   /* Guest can handle TSOv4 in. */
#define VIRTIO_NET_F_GUEST_TSO6 8   /* Guest can handle TSOv6 in. */
#define VIRTIO_NET_F_GUEST_ECN  9   /* Guest can handle TSO[6] w/ ECN in. */
#define VIRTIO_NET_F_GUEST_UFO  10  /* Guest can handle UFO in. */
#define VIRTIO_NET_F_HOST_TSO4  11  /* Host can handle TSOv4 in. */
#define VIRTIO_NET_F_HOST_TSO6  12  /* Host can handle TSOv6 in. */
#define VIRTIO_NET_F_HOST_ECN   13  /* Host can handle TSO[6] w/ ECN in. */
#define VIRTIO_NET_F_HOST_UFO   14  /* Host can handle UFO in. */
#define VIRTIO_NET_F_MRG_RXBUF  15  /* Host can merge receive buffers. */
#define VIRTIO_NET_F_STATUS 16  /* virtio_net_config.status available */
#define VIRTIO_NET_F_CTRL_VQ    17  /* Control channel available */
#define VIRTIO_NET_F_CTRL_RX    18  /* Control channel RX mode support */
#define VIRTIO_NET_F_CTRL_VLAN  19  /* Control channel VLAN filtering */
#define VIRTIO_NET_F_CTRL_RX_EXTRA 20   /* Extra RX mode control support */
#define VIRTIO_NET_F_GUEST_ANNOUNCE 21  /* Guest can announce device on the
* network */
#define VIRTIO_NET_F_MQ     22  /* Device supports Receive Flow
* Steering */
#define VIRTIO_NET_F_CTRL_MAC_ADDR 23   /* Set MAC address */
/* Do we get callbacks when the ring is completely used, even if we've
* suppressed them? */
#define VIRTIO_F_NOTIFY_ON_EMPTY    24
/* Can the device handle any descriptor layout? */
#define VIRTIO_F_ANY_LAYOUT     27
/* We support indirect buffer descriptors */
#define VIRTIO_RING_F_INDIRECT_DESC 28
#define VIRTIO_F_VERSION_1      32
#define VIRTIO_F_IOMMU_PLATFORM 33

Driver operation callbacks

rte_vhost_driver_callback_register()

int rte_vhost_driver_callback_register(const char *path, struct vhost_device_ops const * const ops)

The second parameter is the important one:

struct vhost_device_ops {
    int (*new_device)(int vid);         /**< Add device. */
    void (*destroy_device)(int vid);    /**< Remove device. */
    int (*vring_state_changed)(int vid, uint16_t queue_id, int enable);
    int (*features_changed)(int vid, uint64_t features);
    void *reserved[4]; /**< Reserved for future extension */
};

new_device()

new_device(int vid)

Called when the virtual device is ready. It creates and initializes the device's configuration, including the virtqueues and virtio_memory, and then inserts the device into a singly linked list for later configuration lookups.

destroy_device()

destroy_device(int vid)

Called when the virtio device is shut down or the connection is dropped.

vring_state_changed()

vring_state_changed(int vid, uint16_t queue_id, int enable)

Called when the state of a particular queue changes, for example when it is enabled or disabled. An application can register it to write a log entry, for instance.

features_changed()

features_changed(int vid, uint64_t features)

Called when the features change, which makes it possible to react dynamically. For example, VHOST_F_LOG_ALL is enabled at the start of a live migration and disabled at the end.
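
The callback table is an ordinary struct of function pointers that the library calls on connection events. The sketch below mimics that pattern with a cut-down table of two hooks. Every demo_* name is hypothetical, and the dispatch helpers only approximate what a library might do internally.

```c
#include <assert.h>
#include <stddef.h>

/* Cut-down mirror of the callback table (two of the four hooks). */
struct demo_device_ops {
    int (*new_device)(int vid);
    void (*destroy_device)(int vid);
};

static int demo_active_devices;  /* application state kept by the callbacks */

static int demo_new_device(int vid)
{
    (void)vid;               /* a real handler would set up per-device data */
    demo_active_devices++;
    return 0;
}

static void demo_destroy_device(int vid)
{
    (void)vid;
    demo_active_devices--;
}

static const struct demo_device_ops demo_ops = {
    .new_device = demo_new_device,
    .destroy_device = demo_destroy_device,
};

/* Library side: call the hook only if the application registered one. */
static int demo_notify_new(const struct demo_device_ops *ops, int vid)
{
    assert(ops != NULL);
    return ops->new_device ? ops->new_device(vid) : 0;
}

static void demo_notify_destroy(const struct demo_device_ops *ops, int vid)
{
    assert(ops != NULL);
    if (ops->destroy_device)
        ops->destroy_device(vid);
}
```

Calling demo_notify_new(&demo_ops, 0) twice with different vids and then demo_notify_destroy(&demo_ops, 0) leaves one active device, which is roughly the bookkeeping an application does in its own new_device and destroy_device handlers.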

Enabling the device

This call triggers the vhost-user negotiation and is the last step of driver initialization.

int rte_vhost_driver_start(const char *path)

Let's study the code:

int rte_vhost_driver_start(const char *path)
{
    struct vhost_user_socket *vsocket;
    static pthread_t fdset_tid;

    /* Find the socket in the array recorded earlier */
    pthread_mutex_lock(&vhost_user.mutex);
    vsocket = find_vhost_user_socket(path);
    pthread_mutex_unlock(&vhost_user.mutex);

    if (!vsocket)
        return -1;

    /* Create the fdset handling thread */
    if (fdset_tid == 0) {
        int ret = pthread_create(&fdset_tid, NULL, fdset_event_dispatch,
                     &vhost_user.fdset);
        /*
         * Note: pthread_create() returns a positive error number on
         * failure, so the check below, quoted as-is, would not catch it.
         */
        if (ret < 0)
            RTE_LOG(ERR, VHOST_CONFIG,
                "failed to create fdset handling thread");
    }

    /* Act according to the mode chosen at startup */
    if (vsocket->is_server)
        return vhost_user_start_server(vsocket);
    else
        return vhost_user_start_client(vsocket);
}

/* Client mode */
static int
vhost_user_start_client(struct vhost_user_socket *vsocket)
{
    int ret;
    int fd = vsocket->socket_fd;
    const char *path = vsocket->path;
    struct vhost_user_reconnect *reconn;

    /*
     * Connect to the server and check whether the connection can be made.
     * The server socket is created on the QEMU side; only the connect is
     * done here.
     */
    ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
                      sizeof(vsocket->un));
    if (ret == 0) {
        /*
         * The check passed: create the vhost_device and the
         * vhost_user_connection and add them to the matching conn_list
         */
        vhost_user_add_connection(fd, vsocket);
        return 0;
    }

    RTE_LOG(WARNING, VHOST_CONFIG,
        "failed to connect to %s: %s\n",
        path, strerror(errno));

    /* On failure, check whether reconnect is configured; if not, give up */
    if (ret == -2 || !vsocket->reconnect) {
        close(fd);
        return -1;
    }

    /*
     * Put the socket on the reconnect list and leave it to the background
     * thread created by vhost_user_reconnect_init()
     */
    RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
    reconn = malloc(sizeof(*reconn));
    if (reconn == NULL) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to allocate memory for reconnect\n");
        close(fd);
        return -1;
    }
    reconn->un = vsocket->un;
    reconn->fd = fd;
    reconn->vsocket = vsocket;
    pthread_mutex_lock(&reconn_list.mutex);
    TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
    pthread_mutex_unlock(&reconn_list.mutex);

    return 0;
}

/* Server mode */
static int
vhost_user_start_server(struct vhost_user_socket *vsocket)
{
    int ret;
    int fd = vsocket->socket_fd;
    const char *path = vsocket->path;

    /* The familiar routine: bind --> listen --> read handler */
    ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
    if (ret < 0) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to bind to %s: %s; remove it and try again\n",
            path, strerror(errno));
        goto err;
    }
    RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);

    ret = listen(fd, MAX_VIRTIO_BACKLOG);
    if (ret < 0)
        goto err;

    /*
     * The real handler: it creates a virtio device for each newly
     * connected socket and queues it on the connection list
     */
    ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
          NULL, vsocket);
    if (ret < 0) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to add listen fd %d to vhost server fdset\n",
            fd);
        goto err;
    }

    return 0;

err:
    close(fd);
    return -1;
}

Packet transfer (enqueue, dequeue)

API:

/* Forward count packets from the host to the guest */
uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id, struct rte_mbuf **pkts, uint16_t count)
/* Receive up to count packets from the guest and store them in pkts */
uint16_t rte_vhost_dequeue_burst(int vid, uint16_t queue_id, struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)

Straight to the code:

rte_vhost_enqueue_burst()

uint16_t
rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
    struct rte_mbuf **pkts, uint16_t count)
{
    /* Get the guest's virtio dev */
    struct virtio_net *dev = get_device(vid);

    if (!dev)
        return 0;

    /* Check whether mergeable buffers are supported and pick the path */
    if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
        return virtio_dev_merge_rx(dev, queue_id, pkts, count);
    else
        return virtio_dev_rx(dev, queue_id, pkts, count);
}

/*
 * Only the simple case is covered here. The mergeable path involves more
 * elaborate optimizations, but the overall framework is much the same.
 * This function puts the pkts received from a physical NIC or from another
 * VM into the RX virtqueue of the virtio dev.
 */
// The optimization starts at the function definition: static & inline
static inline uint32_t __attribute__((always_inline))
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
    struct rte_mbuf **pkts, uint32_t count)
{
    struct vhost_virtqueue *vq;
    uint16_t avail_idx, free_entries, start_idx;
    uint16_t desc_indexes[MAX_PKT_BURST];
    struct vring_desc *descs;
    uint16_t used_idx;
    uint32_t i, sz;

    /* Run a series of sanity checks */
    LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
    if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
        RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.\n",
            dev->vid, __func__, queue_id);
        return 0;
    }

    vq = dev->virtqueue[queue_id];
    if (unlikely(vq->enabled == 0))
        return 0;

    avail_idx = *((volatile uint16_t *)&vq->avail->idx);
    start_idx = vq->last_used_idx;
    free_entries = avail_idx - start_idx;
    count = RTE_MIN(count, free_entries);
    count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
    if (count == 0)
        return 0;

    LOG_DEBUG(VHOST_DATA, "(%d) start_idx %d | end_idx %d\n",
        dev->vid, start_idx, start_idx + count);

    /* Retrieve all of the desc indexes first to avoid caching issues. */
    rte_prefetch0(&vq->avail->ring[start_idx & (vq->size - 1)]);
    for (i = 0; i < count; i++) {
        used_idx = (start_idx + i) & (vq->size - 1);
        desc_indexes[i] = vq->avail->ring[used_idx];
        vq->used->ring[used_idx].id = desc_indexes[i];
        vq->used->ring[used_idx].len = pkts[i]->pkt_len +
                           dev->vhost_hlen;
        vhost_log_used_vring(dev, vq,
            offsetof(struct vring_used, ring[used_idx]),
            sizeof(vq->used->ring[used_idx]));
    }

    rte_prefetch0(&vq->desc[desc_indexes[0]]);
    for (i = 0; i < count; i++) {
        uint16_t desc_idx = desc_indexes[i];
        int err;

        if (vq->desc[desc_idx].flags & VRING_DESC_F_INDIRECT) {
            descs = (struct vring_desc *)(uintptr_t)
                rte_vhost_gpa_to_vva(dev->mem,
                    vq->desc[desc_idx].addr);
            if (unlikely(!descs)) {
                count = i;
                break;
            }

            desc_idx = 0;
            sz = vq->desc[desc_idx].len / sizeof(*descs);
        } else {
            descs = vq->desc;
            sz = vq->size;
        }

        /* Packets are copied into the ring one at a time, which is
         * probably not great for performance
         */
        err = copy_mbuf_to_desc(dev, descs, pkts[i], desc_idx, sz);
        if (unlikely(err)) {
            used_idx = (start_idx + i) & (vq->size - 1);
            vq->used->ring[used_idx].len = dev->vhost_hlen;
            vhost_log_used_vring(dev, vq,
                offsetof(struct vring_used, ring[used_idx]),
                sizeof(vq->used->ring[used_idx]));
        }

        if (i + 1 < count)
            rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
    }

    rte_smp_wmb();

    *(volatile uint16_t *)&vq->used->idx += count;
    vq->last_used_idx += count;
    vhost_log_used_vring(dev, vq,
        offsetof(struct vring_used, idx),
        sizeof(vq->used->idx));

    /* flush used->idx update before we read avail->flags. */
    rte_mb();

    /* Kick the guest if necessary. */
    /* If the conditions are met, send the event notification */
    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
            && (vq->callfd >= 0))
        eventfd_write(vq->callfd, (eventfd_t)1);
    return count;
}
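
Two pieces of index arithmetic in the enqueue path are easy to misread: the number of available entries is a plain uint16_t subtraction that stays correct across wrap-around, and a free-running index is mapped to a ring slot with a mask because the ring size is a power of two. A minimal sketch of both follows, with hypothetical helper names.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Entries the guest has made available and that have not been consumed yet.
 * The result is reduced modulo 2^16, so it is still right after avail_idx
 * has wrapped past 65535.
 */
static uint16_t demo_free_entries(uint16_t avail_idx, uint16_t last_used_idx)
{
    return (uint16_t)(avail_idx - last_used_idx);
}

/* Map a free-running index onto a ring slot; size must be a power of two. */
static uint16_t demo_ring_slot(uint16_t idx, uint16_t size)
{
    assert(size != 0 && (size & (size - 1)) == 0);
    return (uint16_t)(idx & (size - 1));
}
```

For example, with avail_idx equal to 2 and last_used_idx equal to 65534 the helper reports 4 free entries, and index 257 on a 256-entry ring lands in slot 1.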

rte_vhost_dequeue_burst()

uint16_t rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
    struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
    struct virtio_net *dev;
    struct rte_mbuf *rarp_mbuf = NULL;
    struct vhost_virtqueue *vq;
    uint32_t desc_indexes[MAX_PKT_BURST];
    uint32_t used_idx;
    uint32_t i = 0;
    uint16_t free_entries;
    uint16_t avail_idx;

    /* Get the vdevice and run the related checks */
    dev = get_device(vid);
    if (!dev)
        return 0;

    if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
        RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.\n",
            dev->vid, __func__, queue_id);
        return 0;
    }

    vq = dev->virtqueue[queue_id];
    if (unlikely(vq->enabled == 0))
        return 0;

    if (unlikely(dev->dequeue_zero_copy)) {
        struct zcopy_mbuf *zmbuf, *next;
        int nr_updated = 0;

        for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
             zmbuf != NULL; zmbuf = next) {
            next = TAILQ_NEXT(zmbuf, next);

            if (mbuf_is_consumed(zmbuf->mbuf)) {
                used_idx = vq->last_used_idx++ & (vq->size - 1);
                update_used_ring(dev, vq, used_idx,
                         zmbuf->desc_idx);
                nr_updated += 1;

                TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
                rte_pktmbuf_free(zmbuf->mbuf);
                put_zmbuf(zmbuf);
                vq->nr_zmbuf -= 1;
            }
        }

        update_used_idx(dev, vq, nr_updated);
    }

    /*
     * Construct a RARP broadcast packet, and inject it to the "pkts"
     * array, to looks like that guest actually send such packet.
     *
     * Check user_send_rarp() for more information.
     *
     * broadcast_rarp shares a cacheline in the virtio_net structure
     * with some fields that are accessed during enqueue and
     * rte_atomic16_cmpset() causes a write if using cmpxchg. This could
     * result in false sharing between enqueue and dequeue.
     *
     * Prevent unnecessary false sharing by reading broadcast_rarp first
     * and only performing cmpset if the read indicates it is likely to
     * be set.
     */
    /*
     * The first slot is filled with the constructed RARP broadcast packet.
     * The reason for adding such a packet is mainly live migration; see the
     * English comment above for details.
     */
    if (unlikely(rte_atomic16_read(&dev->broadcast_rarp) &&
            rte_atomic16_cmpset((volatile uint16_t *)
                &dev->broadcast_rarp.cnt, 1, 0))) {

        rarp_mbuf = rte_pktmbuf_alloc(mbuf_pool);
        if (rarp_mbuf == NULL) {
            RTE_LOG(ERR, VHOST_DATA,
                "Failed to allocate memory for mbuf.\n");
            return 0;
        }

        if (make_rarp_packet(rarp_mbuf, &dev->mac)) {
            rte_pktmbuf_free(rarp_mbuf);
            rarp_mbuf = NULL;
        } else {
            count -= 1;
        }
    }

    free_entries = *((volatile uint16_t *)&vq->avail->idx) -
            vq->last_avail_idx;
    if (free_entries == 0)
        goto out;

    LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);

    /* Prefetch available and used ring */
    avail_idx = vq->last_avail_idx & (vq->size - 1);
    used_idx  = vq->last_used_idx  & (vq->size - 1);
    rte_prefetch0(&vq->avail->ring[avail_idx]);
    rte_prefetch0(&vq->used->ring[used_idx]);

    count = RTE_MIN(count, MAX_PKT_BURST);
    count = RTE_MIN(count, free_entries);
    LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
            dev->vid, count);

    /* Retrieve all of the head indexes first to avoid caching issues. */
    for (i = 0; i < count; i++) {
        avail_idx = (vq->last_avail_idx + i) & (vq->size - 1);
        used_idx  = (vq->last_used_idx  + i) & (vq->size - 1);
        desc_indexes[i] = vq->avail->ring[avail_idx];

        if (likely(dev->dequeue_zero_copy == 0))
            update_used_ring(dev, vq, used_idx, desc_indexes[i]);
    }

    /* Prefetch descriptor index. */
    rte_prefetch0(&vq->desc[desc_indexes[0]]);
    for (i = 0; i < count; i++) {
        struct vring_desc *desc;
        uint16_t sz, idx;
        int err;

        if (likely(i + 1 < count))
            rte_prefetch0(&vq->desc[desc_indexes[i + 1]]);

        if (vq->desc[desc_indexes[i]].flags & VRING_DESC_F_INDIRECT) {
            desc = (struct vring_desc *)(uintptr_t)
                rte_vhost_gpa_to_vva(dev->mem,
                    vq->desc[desc_indexes[i]].addr);
            if (unlikely(!desc))
                break;

            rte_prefetch0(desc);
            sz = vq->desc[desc_indexes[i]].len / sizeof(*desc);
            idx = 0;
        } else {
            desc = vq->desc;
            sz = vq->size;
            idx = desc_indexes[i];
        }

        pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
        if (unlikely(pkts[i] == NULL)) {
            RTE_LOG(ERR, VHOST_DATA,
                "Failed to allocate memory for mbuf.\n");
            break;
        }

        // Again, packets are copied one at a time
        err = copy_desc_to_mbuf(dev, desc, sz, pkts[i], idx, mbuf_pool);
        if (unlikely(err)) {
            rte_pktmbuf_free(pkts[i]);
            break;
        }

        if (unlikely(dev->dequeue_zero_copy)) {
            struct zcopy_mbuf *zmbuf;

            zmbuf = get_zmbuf(vq);
            if (!zmbuf) {
                rte_pktmbuf_free(pkts[i]);
                break;
            }
            zmbuf->mbuf = pkts[i];
            zmbuf->desc_idx = desc_indexes[i];

            /*
             * Pin lock the mbuf; we will check later to see
             * whether the mbuf is freed (when we are the last
             * user) or not. If that's the case, we then could
             * update the used ring safely.
             */
            rte_mbuf_refcnt_update(pkts[i], 1);

            vq->nr_zmbuf += 1;
            TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
        }
    }
    vq->last_avail_idx += i;

    if (likely(dev->dequeue_zero_copy == 0)) {
        vq->last_used_idx += i;
        update_used_idx(dev, vq, i);
    }

out:
    if (unlikely(rarp_mbuf != NULL)) {
        /*
         * Inject it to the head of "pkts" array, so that switch's mac
         * learning table will get updated first.
         */
        memmove(&pkts[1], pkts, i * sizeof(struct rte_mbuf *));
        pkts[0] = rarp_mbuf;
        i += 1;
    }

    return i;
}
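
The last step of the dequeue path, putting the RARP packet in front of the packets already collected, is a memmove of the pointer array followed by a store to slot 0. The sketch below shows that shift on a generic pointer array. demo_inject_head() is a hypothetical helper, and the explicit capacity check is an addition of this sketch (the code above instead reserves the slot by decrementing count beforehand).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Insert extra at index 0 of pkts, shifting the n existing entries up by one.
 * cap is the capacity of the array. Returns the new number of entries, or n
 * unchanged when there is nothing to insert or no room left.
 */
static size_t demo_inject_head(void **pkts, size_t n, size_t cap, void *extra)
{
    assert(pkts != NULL);
    if (extra == NULL || n >= cap)
        return n;

    /* memmove, not memcpy: source and destination ranges overlap. */
    memmove(&pkts[1], &pkts[0], n * sizeof(pkts[0]));
    pkts[0] = extra;
    return n + 1;
}
```

Starting from an array holding two packets with room for four, injecting a third pointer returns 3 and leaves the new pointer in slot 0 with the original two behind it in their original order.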

OK, that covers most of the important APIs, and the basic principles should be clear by now.

Original link: https://www.geek-share.com/detail/2716296473.html
