From: Robert Sanford
To: "Zhou, Danny", "dev@dpdk.org"
Date: Wed, 19 Nov 2014 15:48:33 -0500
Subject: Re: [dpdk-dev] Enhance KNI DPDK-app-side to be Multi-Producer/Consumer

Hi Danny,

On Fri, Nov 14, 2014 at 7:04 PM, Zhou, Danny wrote:

> It will always be good if you can submit the RFC patch in terms of KNI
> optimization.
>
> On the other hand, do you have any perf. data to prove that your patchset
> could improve KNI performance, which is the concern that most customers
> care about? We introduced multi-threaded KNI kernel support last year;
> if I remember correctly, the key performance bottleneck we found is the
> skb alloc/free and the memcpy between skb and mbuf. Would be very happy
> if your patchset can prove me wrong.

This is not an attempt to improve raw performance. Our modest goal is to
make librte_kni's RX/TX burst APIs multithreaded, without changing
rte_kni.ko.

In this RFC patch, we make it possible for multiple cores to concurrently
invoke rte_kni_tx_burst (or rte_kni_rx_burst) for the same KNI device. At
the moment, multiple cores invoking rte_kni_tx_burst for the same device
cannot function correctly, because the rte_kni_fifo structures (memory
shared between the app and the kernel driver) are single-producer,
single-consumer.

The following patch supplements the rte_kni_fifo structure with an
additional structure that is private to the application, and we borrow
librte_ring's MP/MC enqueue/dequeue logic.

Here is a patch for 1.8. We have only tested a 1.7.1 version. Please have
a look and let us know whether you think something like this would be
useful.
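To illustrate the intended usage, here is a minimal sketch (not part of the
patch) of the pattern this change is meant to allow, assuming the patch
below is applied: two worker lcores transmit into the same KNI device
through rte_kni_tx_burst(). The lcore ids, the burst size, and the setup of
"kni" and "pool" are illustrative placeholders.

/* Usage sketch only -- assumes the MP/MC change below is applied; "kni"
 * and "pool" come from the application's normal rte_kni_alloc() and
 * mempool setup. */
#include <rte_launch.h>
#include <rte_mbuf.h>
#include <rte_kni.h>

#define SKETCH_TX_BURST 32

struct kni_tx_args {
	struct rte_kni *kni;        /* one KNI device shared by all workers */
	struct rte_mempool *pool;   /* pktmbuf pool for the test traffic */
};

static int
kni_tx_worker(void *arg)
{
	struct kni_tx_args *a = arg;
	struct rte_mbuf *pkts[SKETCH_TX_BURST];
	unsigned i, n, sent;
	int iter;

	for (iter = 0; iter < 1000; iter++) {
		/* Build a burst of dummy 60-byte frames. */
		for (n = 0; n < SKETCH_TX_BURST; n++) {
			pkts[n] = rte_pktmbuf_alloc(a->pool);
			if (pkts[n] == NULL)
				break;
			rte_pktmbuf_append(pkts[n], 60);
		}

		/* Several lcores may sit in this call at the same time;
		 * that is exactly what the MP fifo logic makes safe. */
		sent = rte_kni_tx_burst(a->kni, pkts, n);

		/* Free whatever the fifo could not absorb. */
		for (i = sent; i < n; i++)
			rte_pktmbuf_free(pkts[i]);
	}
	return 0;
}

/* From main(), after EAL init and KNI setup, something like:
 *
 *	struct kni_tx_args args = { kni, pool };
 *	rte_eal_remote_launch(kni_tx_worker, &args, 1);
 *	rte_eal_remote_launch(kni_tx_worker, &args, 2);
 *	rte_eal_mp_wait_lcore();
 */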
--
Thanks,
Robert

Signed-off-by: Robert Sanford
---
 lib/librte_kni/rte_kni.c      |  21 +++++-
 lib/librte_kni/rte_kni_fifo.h | 131 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+), 4 deletions(-)

diff --git a/lib/librte_kni/rte_kni.c b/lib/librte_kni/rte_kni.c
index fdb7509..8009173 100644
--- a/lib/librte_kni/rte_kni.c
+++ b/lib/librte_kni/rte_kni.c
@@ -76,6 +76,11 @@ struct rte_kni {
 	struct rte_kni_fifo *alloc_q;       /**< Allocated mbufs queue */
 	struct rte_kni_fifo *free_q;        /**< To be freed mbufs queue */
 
+	struct rte_kni_fifo_multi tx_q_mc;    /**< Make tx_q multi-consumer */
+	struct rte_kni_fifo_multi alloc_q_mp; /**< Make alloc_q multi-producer */
+	struct rte_kni_fifo_multi rx_q_mp;    /**< Make rx_q multi-producer */
+	struct rte_kni_fifo_multi free_q_mc;  /**< Make free_q multi-consumer */
+
 	/* For request & response */
 	struct rte_kni_fifo *req_q;         /**< Request queue */
 	struct rte_kni_fifo *resp_q;        /**< Response queue */
@@ -414,6 +419,11 @@ rte_kni_alloc(struct rte_mempool *pktmbuf_pool,
 	kni_fifo_init(ctx->free_q, KNI_FIFO_COUNT_MAX);
 	dev_info.free_phys = mz->phys_addr;
 
+	kni_fifo_multi_init(&ctx->tx_q_mc, KNI_FIFO_COUNT_MAX);
+	kni_fifo_multi_init(&ctx->alloc_q_mp, KNI_FIFO_COUNT_MAX);
+	kni_fifo_multi_init(&ctx->rx_q_mp, KNI_FIFO_COUNT_MAX);
+	kni_fifo_multi_init(&ctx->free_q_mc, KNI_FIFO_COUNT_MAX);
+
 	/* Request RING */
 	mz = slot->m_req_q;
 	ctx->req_q = mz->addr;
@@ -557,7 +567,8 @@ rte_kni_handle_request(struct rte_kni *kni)
 unsigned
 rte_kni_tx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 {
-	unsigned ret = kni_fifo_put(kni->rx_q, (void **)mbufs, num);
+	unsigned ret = kni_fifo_put_mp(kni->rx_q, &kni->rx_q_mp, (void **)mbufs,
+			num);
 
 	/* Get mbufs from free_q and then free them */
 	kni_free_mbufs(kni);
@@ -568,7 +579,8 @@ rte_kni_tx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 unsigned
 rte_kni_rx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 {
-	unsigned ret = kni_fifo_get(kni->tx_q, (void **)mbufs, num);
+	unsigned ret = kni_fifo_get_mc(kni->tx_q, &kni->tx_q_mc,
+			(void **)mbufs, num);
 
 	/* Allocate mbufs and then put them into alloc_q */
 	kni_allocate_mbufs(kni);
@@ -582,7 +594,8 @@ kni_free_mbufs(struct rte_kni *kni)
 	int i, ret;
 	struct rte_mbuf *pkts[MAX_MBUF_BURST_NUM];
 
-	ret = kni_fifo_get(kni->free_q, (void **)pkts, MAX_MBUF_BURST_NUM);
+	ret = kni_fifo_get_mc(kni->free_q, &kni->free_q_mc, (void **)pkts,
+			MAX_MBUF_BURST_NUM);
 	if (likely(ret > 0)) {
 		for (i = 0; i < ret; i++)
 			rte_pktmbuf_free(pkts[i]);
@@ -629,7 +642,7 @@ kni_allocate_mbufs(struct rte_kni *kni)
 	if (i <= 0)
 		return;
 
-	ret = kni_fifo_put(kni->alloc_q, (void **)pkts, i);
+	ret = kni_fifo_put_mp(kni->alloc_q, &kni->alloc_q_mp, (void **)pkts, i);
 
 	/* Check if any mbufs not put into alloc_q, and then free them */
 	if (ret >= 0 && ret < i && ret < MAX_MBUF_BURST_NUM) {
diff --git a/lib/librte_kni/rte_kni_fifo.h b/lib/librte_kni/rte_kni_fifo.h
index 8cb8587..7dccba2 100644
--- a/lib/librte_kni/rte_kni_fifo.h
+++ b/lib/librte_kni/rte_kni_fifo.h
@@ -91,3 +91,134 @@ kni_fifo_get(struct rte_kni_fifo *fifo, void **data, unsigned num)
 	fifo->read = new_read;
 	return i;
 }
+
+
+/**
+ * Supplemental members to facilitate MP/MC access to KNI FIFOs.
+ */
+struct rte_kni_fifo_multi {
+	volatile uint32_t head;
+	volatile uint32_t tail;
+	uint32_t mask;
+	uint32_t size;
+};
+
+/**
+ * Initialize a kni fifo MP/MC struct.
+ */
+static void
+kni_fifo_multi_init(struct rte_kni_fifo_multi *multi, unsigned size)
+{
+	multi->head = 0;
+	multi->tail = 0;
+	multi->mask = (typeof(multi->mask))(size - 1);
+	multi->size = (typeof(multi->size))size;
+}
+
+/**
+ * Adds num elements into the fifo. Return the number actually written.
+ *
+ * Multiple-producer version, modeled after __rte_ring_mp_do_enqueue().
+ */
+static inline unsigned
+kni_fifo_put_mp(struct rte_kni_fifo *fifo, struct rte_kni_fifo_multi *prod,
+		void **data, unsigned n)
+{
+	uint32_t prod_head, prod_next;
+	uint32_t cons_tail, free_entries;
+	const unsigned max = n;
+	int success;
+	unsigned i;
+	const uint32_t mask = prod->mask;
+
+	/* Move prod->head atomically. */
+	do {
+		/* Reset n to the initial burst count. */
+		n = max;
+
+		prod_head = prod->head;
+		cons_tail = fifo->read;
+
+		free_entries = (mask + cons_tail - prod_head) & mask;
+
+		/* Check that we have enough room in ring. */
+		if (unlikely(n > free_entries)) {
+			if (unlikely(free_entries == 0))
+				return 0;
+			n = free_entries;
+		}
+
+		prod_next = (prod_head + n) & mask;
+		success = rte_atomic32_cmpset(&prod->head, prod_head, prod_next);
+	} while (unlikely(success == 0));
+
+	/* Write entries in ring. */
+	for (i = 0; i < n; i++) {
+		fifo->buffer[(prod_head + i) & mask] = data[i];
+	}
+	rte_compiler_barrier();
+
+	/* If there are other enqueues in progress that preceded us,
+	 * we need to wait for them to complete.
+	 */
+	while (unlikely(prod->tail != prod_head))
+		rte_pause();
+
+	prod->tail = prod_next;
+	fifo->write = prod_next;
+	return n;
+}
+
+/**
+ * Get up to num elements from the fifo. Return the number actually read.
+ *
+ * Multiple-consumer version, modeled after __rte_ring_mc_do_dequeue().
+ */
+static inline unsigned
+kni_fifo_get_mc(struct rte_kni_fifo *fifo, struct rte_kni_fifo_multi *cons,
+		void **data, unsigned n)
+{
+	uint32_t cons_head, prod_tail;
+	uint32_t cons_next, entries;
+	const unsigned max = n;
+	int success;
+	unsigned i;
+	const uint32_t mask = cons->mask;
+
+	/* Move cons->head atomically. */
+	do {
+		/* Restore n as it may change every loop. */
+		n = max;
+
+		cons_head = cons->head;
+		prod_tail = fifo->write;
+
+		entries = (prod_tail - cons_head + mask + 1) & mask;
+
+		/* Set the actual entries for dequeue. */
+		if (n > entries) {
+			if (unlikely(entries == 0))
+				return 0;
+			n = entries;
+		}
+
+		cons_next = (cons_head + n) & mask;
+		success = rte_atomic32_cmpset(&cons->head, cons_head, cons_next);
+	} while (unlikely(success == 0));
+
+	/* Copy entries from ring. */
+	for (i = 0; i < n; i++) {
+		data[i] = fifo->buffer[(cons_head + i) & mask];
+	}
+	rte_compiler_barrier();
+
+	/* If there are other dequeues in progress that preceded us,
+	 * we need to wait for them to complete.
+	 */
+	while (unlikely(cons->tail != cons_head))
+		rte_pause();
+
+	cons->tail = cons_next;
+	fifo->read = cons_next;
+	return n;
+}
--
1.7.1
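A note on the free-slot arithmetic in kni_fifo_put_mp(): the KNI fifo uses
masked indices and always keeps one slot empty, so that read == write
unambiguously means "fifo empty" and the usable capacity is mask rather
than size. The small standalone check below (illustration only, not part
of the patch) works through the numbers; it assumes KNI_FIFO_COUNT_MAX is
1024, as defined in rte_kni.c.

/* Standalone check of the free-slot arithmetic (illustration only). */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Mirrors the expression used in kni_fifo_put_mp(). */
static uint32_t
fifo_free_entries(uint32_t prod_head, uint32_t cons_tail, uint32_t mask)
{
	return (mask + cons_tail - prod_head) & mask;
}

int
main(void)
{
	const uint32_t mask = 1023;	/* KNI_FIFO_COUNT_MAX - 1, assuming 1024 */

	/* Empty fifo (read == write): all "mask" slots are free, i.e. the
	 * usable capacity is size - 1 because one slot always stays empty. */
	assert(fifo_free_entries(0, 0, mask) == mask);

	/* 1010 entries in flight (prod_head 1020, cons_tail 10): 13 free. */
	assert(fifo_free_entries(1020, 10, mask) == 13);

	printf("free-entries arithmetic holds\n");
	return 0;
}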