From mboxrd@z Thu Jan 1 00:00:00 1970
Received: from mga03.intel.com (mga03.intel.com [134.134.136.65])
 by dpdk.org (Postfix) with ESMTP id D73C69237
 for <dev@dpdk.org>; Tue, 5 Jan 2016 07:41:41 +0100 (CET)
Received: from fmsmga004.fm.intel.com ([10.253.24.48])
 by orsmga103.jf.intel.com with ESMTP; 04 Jan 2016 22:41:40 -0800
X-ExtLoop1: 1
X-IronPort-AV: E=Sophos;i="5.20,524,1444719600"; d="scan'208";a="23741237"
Received: from dpdk15.sh.intel.com ([10.239.129.25])
 by fmsmga004.fm.intel.com with ESMTP; 04 Jan 2016 22:41:38 -0800
From: Huawei Xie <huawei.xie@intel.com>
To: dev@dpdk.org
Date: Mon, 4 Jan 2016 22:46:27 +0800
Message-Id: <1451918787-85887-1-git-send-email-huawei.xie@intel.com>
X-Mailer: git-send-email 1.8.1.4
Cc: ann.zhuangyanying@huawei.com
Subject: [dpdk-dev] [PATCH] vhost: remove lockless enqueue to the virtio ring
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: patches and discussions about DPDK
X-List-Received-Date: Tue, 05 Jan 2016 06:41:42 -0000

This patch removes the internal lockless enqueue implementation. DPDK
does not support receiving/transmitting packets on the same queue from
multiple threads concurrently. The vhost PMD wraps a vhost device as a
normal DPDK port, and DPDK applications normally use their own locking
when they enqueue packets to the same queue of a port. The atomic
cmpset is a costly operation, so this patch should help performance a
bit. (A sketch of such application-side locking is appended after the
patch for reference.)

Signed-off-by: Huawei Xie <huawei.xie@intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 86 +++++++++++++------------------------------
 1 file changed, 25 insertions(+), 61 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index bbf3fac..26a1b9c 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -69,10 +69,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	uint64_t buff_hdr_addr = 0;
 	uint32_t head[MAX_PKT_BURST];
 	uint32_t head_idx, packet_success = 0;
-	uint16_t avail_idx, res_cur_idx;
-	uint16_t res_base_idx, res_end_idx;
+	uint16_t avail_idx, res_cur_idx, res_end_idx;
 	uint16_t free_entries;
-	uint8_t success = 0;
 
 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -88,29 +86,18 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
 
-	/*
-	 * As many data cores may want access to available buffers,
-	 * they need to be reserved.
-	 */
-	do {
-		res_base_idx = vq->last_used_idx_res;
-		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
-		free_entries = (avail_idx - res_base_idx);
-		/*check that we have enough buffers*/
-		if (unlikely(count > free_entries))
-			count = free_entries;
-
-		if (count == 0)
-			return 0;
-
-		res_end_idx = res_base_idx + count;
-		/* vq->last_used_idx_res is atomically updated. */
-		/* TODO: Allow to disable cmpset if no concurrency in application. */
-		success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-				res_base_idx, res_end_idx);
-	} while (unlikely(success == 0));
-	res_cur_idx = res_base_idx;
+	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+	free_entries = (avail_idx - vq->last_used_idx_res);
+	/*check that we have enough buffers*/
+	if (unlikely(count > free_entries))
+		count = free_entries;
+	if (count == 0)
+		return 0;
+
+	res_cur_idx = vq->last_used_idx_res;
+	res_end_idx = res_cur_idx + count;
+	vq->last_used_idx_res = res_end_idx;
+
 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
 			dev->device_fh, res_cur_idx, res_end_idx);
 
@@ -230,10 +217,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	rte_compiler_barrier();
 
-	/* Wait until it's our turn to add our buffer to the used ring. */
-	while (unlikely(vq->last_used_idx != res_base_idx))
-		rte_pause();
-
 	*(volatile uint16_t *)&vq->used->idx += count;
 	vq->last_used_idx = res_end_idx;
 
@@ -474,7 +457,6 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 	uint32_t pkt_idx = 0, entry_success = 0;
 	uint16_t avail_idx;
 	uint16_t res_base_idx, res_cur_idx;
-	uint8_t success = 0;
 
 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
 		dev->device_fh);
@@ -496,46 +478,28 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
+		uint32_t secure_len = 0;
+		uint32_t vec_idx = 0;
 
-		do {
-			/*
-			 * As many data cores may want access to available
-			 * buffers, they need to be reserved.
-			 */
-			uint32_t secure_len = 0;
-			uint32_t vec_idx = 0;
-
-			res_base_idx = vq->last_used_idx_res;
-			res_cur_idx = res_base_idx;
+		res_base_idx = res_cur_idx = vq->last_used_idx_res;
 
-			do {
-				avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-				if (unlikely(res_cur_idx == avail_idx))
-					goto merge_rx_exit;
+		do {
+			avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+			if (unlikely(res_cur_idx == avail_idx))
+				goto merge_rx_exit;
 
-				update_secure_len(vq, res_cur_idx,
-					&secure_len, &vec_idx);
-				res_cur_idx++;
-			} while (pkt_len > secure_len);
+			update_secure_len(vq, res_cur_idx,
+				&secure_len, &vec_idx);
+			res_cur_idx++;
+		} while (pkt_len > secure_len);
 
-			/* vq->last_used_idx_res is atomically updated. */
-			success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-							res_base_idx,
-							res_cur_idx);
-		} while (success == 0);
+		vq->last_used_idx_res = res_cur_idx;
 
 		entry_success = copy_from_mbuf_to_vring(dev, queue_id,
 			res_base_idx, res_cur_idx, pkts[pkt_idx]);
 
 		rte_compiler_barrier();
 
-		/*
-		 * Wait until it's our turn to add our buffer
-		 * to the used ring.
-		 */
-		while (unlikely(vq->last_used_idx != res_base_idx))
-			rte_pause();
-
 		*(volatile uint16_t *)&vq->used->idx += entry_success;
 		vq->last_used_idx = res_cur_idx;
 	}
-- 
1.8.1.4
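
Below is a minimal, hypothetical sketch of the application-side locking
the commit message assumes, i.e. callers serializing enqueue to a given
vhost queue themselves. The wrapper name, lock and device/queue handles
are made up for illustration and are not part of the patch; the call
shown is rte_vhost_enqueue_burst() as declared in rte_virtio_net.h at
the time of this patch.

/*
 * Illustration only (not part of the patch): serialize enqueue to one
 * vhost queue at the application level, which is what the vhost
 * library assumes once the internal cmpset reservation is removed.
 * The names below are hypothetical.
 */
#include <rte_mbuf.h>
#include <rte_spinlock.h>
#include <rte_virtio_net.h>

/* One lock per vhost RX queue that several lcores may feed. */
static rte_spinlock_t vhost_rxq_lock = RTE_SPINLOCK_INITIALIZER;

static uint16_t
app_vhost_enqueue(struct virtio_net *dev, uint16_t queue_id,
		struct rte_mbuf **pkts, uint16_t count)
{
	uint16_t sent;

	/* The library no longer resolves races; callers must not race here. */
	rte_spinlock_lock(&vhost_rxq_lock);
	sent = rte_vhost_enqueue_burst(dev, queue_id, pkts, count);
	rte_spinlock_unlock(&vhost_rxq_lock);

	return sent;
}

With the reservation cmpset removed from the library, correctness
depends on the application never driving the enqueue path of one queue
from two lcores at once; a per-queue lock like the one above, or a
run-to-completion design with one lcore per queue, provides that
guarantee.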