* [dpdk-dev] [PATCH 1/3] port: added WRITER_APPROACH == 1 implementation to ring port
2015-03-30 9:56 [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Maciej Gajdzica
@ 2015-03-30 9:56 ` Maciej Gajdzica
2015-03-30 20:50 ` David Marchand
2015-03-30 9:56 ` [dpdk-dev] [PATCH 2/3] port: added ethdev_writer_nodrop port Maciej Gajdzica
` (2 subsequent siblings)
3 siblings, 1 reply; 8+ messages in thread
From: Maciej Gajdzica @ 2015-03-30 9:56 UTC (permalink / raw)
To: dev
Added better optimized implementation of tx_bulk for ring writer port based on
similar solution in ethdev_writer port. New implementation sends burst without
copying data to internal buffer if it is possible.
---
lib/librte_port/rte_port_ring.c | 59 +++++++++++++++++++++++++++++++++++++++
1 file changed, 59 insertions(+)
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index fa3d77b..ba2eeb3 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -96,11 +96,14 @@ rte_port_ring_reader_free(void *port)
/*
* Port RING Writer
*/
+#define RTE_PORT_RING_WRITER_APPROACH 1
+
struct rte_port_ring_writer {
struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
struct rte_ring *ring;
uint32_t tx_burst_sz;
uint32_t tx_buf_count;
+ /* Single bit at position (tx_burst_sz - 1), set at port creation; tx_bulk
+  * tests it to detect a packet mask carrying at least tx_burst_sz packets. */
+ uint64_t bsz_mask;
};
static void *
@@ -130,6 +133,7 @@ rte_port_ring_writer_create(void *params, int socket_id)
port->ring = conf->ring;
port->tx_burst_sz = conf->tx_burst_sz;
port->tx_buf_count = 0;
+ port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
return port;
}
@@ -160,6 +164,8 @@ rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
return 0;
}
+#if RTE_PORT_RING_WRITER_APPROACH == 0
+
static int
rte_port_ring_writer_tx_bulk(void *port,
struct rte_mbuf **pkts,
@@ -194,6 +200,59 @@ rte_port_ring_writer_tx_bulk(void *port,
return 0;
}
+#elif RTE_PORT_RING_WRITER_APPROACH == 1
+
+/*
+ * Bulk TX for the ring writer port (RTE_PORT_RING_WRITER_APPROACH == 1).
+ *
+ * port:      the struct rte_port_ring_writer of this port.
+ * pkts:      packet array; pkts[i] is valid when bit i of pkts_mask is set.
+ * pkts_mask: bitmask of the valid entries of pkts.
+ *
+ * Fast path: when pkts_mask is a contiguous run of bits starting at bit 0
+ * and bit (tx_burst_sz - 1) is set, i.e. the burst holds at least tx_burst_sz
+ * packets, the burst is enqueued to the ring directly from pkts[] without
+ * being copied into tx_buf. Packets the ring does not accept are freed.
+ *
+ * Slow path: every selected packet is appended to tx_buf and the buffer is
+ * flushed once it holds tx_burst_sz packets or more.
+ *
+ * Always returns 0.
+ */
+static int
+rte_port_ring_writer_tx_bulk(void *port,
+		struct rte_mbuf **pkts,
+		uint64_t pkts_mask)
+{
+	struct rte_port_ring_writer *p =
+		(struct rte_port_ring_writer *) port;
+
+	/*
+	 * Keep the mask 64-bit wide: p->bsz_mask is 1 << (tx_burst_sz - 1), so
+	 * a 32-bit copy would be truncated to 0 for tx_burst_sz > 32 and the
+	 * "at least tx_burst_sz packets" test below would always pass.
+	 */
+	uint64_t bsz_mask = p->bsz_mask;
+	uint32_t tx_buf_count = p->tx_buf_count;
+	/*
+	 * First term is 0 only for a contiguous mask starting at bit 0,
+	 * second term is 0 only when bit (tx_burst_sz - 1) is set.
+	 */
+	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+			((pkts_mask & bsz_mask) ^ bsz_mask);
+
+	if (expr == 0) {
+		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+		uint32_t n_pkts_ok;
+
+		/*
+		 * Packets buffered by earlier calls go first. send_burst() is
+		 * defined outside this hunk; presumably it empties tx_buf.
+		 */
+		if (tx_buf_count)
+			send_burst(p);
+
+		n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+
+		/* Drop whatever the ring did not take */
+		for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
+			struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
+			rte_pktmbuf_free(pkt);
+		}
+	} else {
+		/*
+		 * NOTE(review): up to 64 packets are appended on top of whatever
+		 * is already buffered before the flush check below; confirm
+		 * that tx_buf is large enough for that worst case.
+		 */
+		for ( ; pkts_mask; ) {
+			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+			uint64_t pkt_mask = 1LLU << pkt_index;
+			struct rte_mbuf *pkt = pkts[pkt_index];
+
+			p->tx_buf[tx_buf_count++] = pkt;
+			pkts_mask &= ~pkt_mask;
+		}
+
+		p->tx_buf_count = tx_buf_count;
+		if (tx_buf_count >= p->tx_burst_sz)
+			send_burst(p);
+	}
+
+	return 0;
+}
+
+#else
+
+#error Invalid value for RTE_PORT_RING_WRITER_APPROACH
+
+#endif
+
static int
rte_port_ring_writer_flush(void *port)
{
--
1.7.9.5
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [dpdk-dev] [PATCH 1/3] port: added WRITER_APPROACH == 1 implementation to ring port
2015-03-30 9:56 ` [dpdk-dev] [PATCH 1/3] port: added WRITER_APPROACH == 1 implementation to ring port Maciej Gajdzica
@ 2015-03-30 20:50 ` David Marchand
0 siblings, 0 replies; 8+ messages in thread
From: David Marchand @ 2015-03-30 20:50 UTC (permalink / raw)
To: Maciej Gajdzica; +Cc: dev
On Mon, Mar 30, 2015 at 11:56 AM, Maciej Gajdzica <
maciejx.t.gajdzica@intel.com> wrote:
> Added better optimized implementation of tx_bulk for ring writer port
> based on
> similar solution in ethdev_writer port. New implementation sends burst
> without
> copying data to internal buffer if it is possible.
>
Well, if this is such a better implementation, then remove the old one.
We don't want dead code.
>
> ---
> lib/librte_port/rte_port_ring.c | 59
> +++++++++++++++++++++++++++++++++++++++
> 1 file changed, 59 insertions(+)
>
> diff --git a/lib/librte_port/rte_port_ring.c
> b/lib/librte_port/rte_port_ring.c
> index fa3d77b..ba2eeb3 100644
> --- a/lib/librte_port/rte_port_ring.c
> +++ b/lib/librte_port/rte_port_ring.c
> @@ -96,11 +96,14 @@ rte_port_ring_reader_free(void *port)
> /*
> * Port RING Writer
> */
> +#define RTE_PORT_RING_WRITER_APPROACH 1
> +
>
Seriously, can't we just drop this ?
Having build options everywhere is a pain.
Having this kind of stuff is worse ...
I can see other places that do the same, are these parts maintained ?
--
David Marchand
^ permalink raw reply [flat|nested] 8+ messages in thread
* [dpdk-dev] [PATCH 2/3] port: added ethdev_writer_nodrop port
2015-03-30 9:56 [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Maciej Gajdzica
2015-03-30 9:56 ` [dpdk-dev] [PATCH 1/3] port: added WRITER_APPROACH == 1 implementation to ring port Maciej Gajdzica
@ 2015-03-30 9:56 ` Maciej Gajdzica
2015-03-30 10:22 ` Walukiewicz, Miroslaw
2015-03-30 9:56 ` [dpdk-dev] [PATCH 3/3] port: added ring_writer_nodrop port Maciej Gajdzica
2015-03-30 11:54 ` [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Dumitrescu, Cristian
3 siblings, 1 reply; 8+ messages in thread
From: Maciej Gajdzica @ 2015-03-30 9:56 UTC (permalink / raw)
To: dev
When ethdev_writer_nodrop port fails to send data, it tries to resend. Operation
is aborted when maximum number of retries is reached.
---
lib/librte_port/rte_port_ethdev.c | 230 +++++++++++++++++++++++++++++++++++++
lib/librte_port/rte_port_ethdev.h | 19 +++
2 files changed, 249 insertions(+)
diff --git a/lib/librte_port/rte_port_ethdev.c b/lib/librte_port/rte_port_ethdev.c
index d014913..1f77cb5 100644
--- a/lib/librte_port/rte_port_ethdev.c
+++ b/lib/librte_port/rte_port_ethdev.c
@@ -31,6 +31,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <string.h>
+#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>
@@ -288,6 +289,227 @@ rte_port_ethdev_writer_free(void *port)
}
/*
+ * Port ETHDEV Writer Nodrop
+ */
+#define RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH 1
+
+/* Run-time state of an ethdev_writer_nodrop output port */
+struct rte_port_ethdev_writer_nodrop {
+ /* Packets waiting to be handed to rte_eth_tx_burst() */
+ struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
+ /* Number of buffered packets that triggers a flush */
+ uint32_t tx_burst_sz;
+ /* Number of valid entries at the start of tx_buf */
+ uint16_t tx_buf_count;
+ /* Single bit at position (tx_burst_sz - 1), used by tx_bulk */
+ uint64_t bsz_mask;
+ /* Maximum number of re-send attempts; UINT64_MAX stands for "no limit" */
+ uint64_t n_retries;
+ /* TX queue and port passed to rte_eth_tx_burst() */
+ uint16_t queue_id;
+ uint8_t port_id;
+};
+
+/*
+ * Allocate and initialise an ethdev_writer_nodrop port.
+ *
+ * params:    points to a struct rte_port_ethdev_writer_nodrop_params.
+ * socket_id: socket passed to rte_zmalloc_socket() for the allocation.
+ *
+ * tx_burst_sz has to be a non-zero power of two not larger than
+ * RTE_PORT_IN_BURST_SIZE_MAX.
+ *
+ * Returns the new port, or NULL (after logging an error) when the parameters
+ * are invalid or the allocation fails.
+ */
+static void *
+rte_port_ethdev_writer_nodrop_create(void *params, int socket_id)
+{
+	struct rte_port_ethdev_writer_nodrop_params *conf = params;
+	struct rte_port_ethdev_writer_nodrop *port;
+	int params_ok;
+
+	/* Check input parameters */
+	params_ok = (conf != NULL) &&
+		(conf->tx_burst_sz != 0) &&
+		(conf->tx_burst_sz <= RTE_PORT_IN_BURST_SIZE_MAX) &&
+		rte_is_power_of_2(conf->tx_burst_sz);
+	if (!params_ok) {
+		RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
+		return NULL;
+	}
+
+	/* Memory allocation */
+	port = rte_zmalloc_socket("PORT", sizeof(*port),
+			RTE_CACHE_LINE_SIZE, socket_id);
+	if (port == NULL) {
+		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+		return NULL;
+	}
+
+	/* Initialization */
+	port->tx_buf_count = 0;
+	port->tx_burst_sz = conf->tx_burst_sz;
+	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
+	port->queue_id = conf->queue_id;
+	port->port_id = conf->port_id;
+
+	/*
+	 * A configured value of 0 means "retry until everything is sent".
+	 * It is stored as UINT64_MAX so that the retry loop needs no extra
+	 * branch for the unlimited case.
+	 */
+	if (conf->n_retries == 0)
+		port->n_retries = UINT64_MAX;
+	else
+		port->n_retries = conf->n_retries;
+
+	return port;
+}
+
+/*
+ * Hand all buffered packets to the NIC TX queue, retrying while the queue
+ * does not accept them.
+ *
+ * After the first attempt, up to p->n_retries further attempts are made for
+ * the packets still pending (n_retries == UINT64_MAX never terminates the
+ * loop by count, i.e. it retries until everything is sent). Packets still
+ * unsent after the last attempt are freed.
+ *
+ * On return the buffer is always empty (p->tx_buf_count == 0), whichever
+ * path was taken: callers append new packets starting at index 0 right after
+ * this call, so leaving the old count in place would re-send packets that
+ * are already owned by the driver and eventually overrun tx_buf.
+ */
+static inline void
+send_burst_nodrop(struct rte_port_ethdev_writer_nodrop *p)
+{
+	uint32_t nb_tx = 0, i;
+
+	nb_tx = rte_eth_tx_burst(p->port_id, p->queue_id, p->tx_buf,
+			p->tx_buf_count);
+
+	/* We sent all the packets in a first try */
+	if (nb_tx >= p->tx_buf_count) {
+		p->tx_buf_count = 0;
+		return;
+	}
+
+	for (i = 0; i < p->n_retries; i++) {
+		nb_tx += rte_eth_tx_burst(p->port_id, p->queue_id,
+				p->tx_buf + nb_tx, p->tx_buf_count - nb_tx);
+
+		/* We sent all the packets in more than one try */
+		if (nb_tx >= p->tx_buf_count) {
+			p->tx_buf_count = 0;
+			return;
+		}
+	}
+
+	/* We didn't send the packets in maximum allowed attempts */
+	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
+		rte_pktmbuf_free(p->tx_buf[nb_tx]);
+
+	p->tx_buf_count = 0;
+}
+
+/*
+ * Single-packet TX: append pkt to the port buffer and flush the buffer
+ * through send_burst_nodrop() once it holds tx_burst_sz packets.
+ * Always returns 0.
+ */
+static int
+rte_port_ethdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
+{
+	struct rte_port_ethdev_writer_nodrop *writer = port;
+
+	writer->tx_buf[writer->tx_buf_count++] = pkt;
+	if (writer->tx_buf_count < writer->tx_burst_sz)
+		return 0;
+
+	send_burst_nodrop(writer);
+	return 0;
+}
+
+#if RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 0
+
+/* Append one packet to the port buffer; flush when tx_burst_sz is reached */
+static inline void
+ethdev_writer_nodrop_buffer_pkt(struct rte_port_ethdev_writer_nodrop *writer,
+		struct rte_mbuf *pkt)
+{
+	writer->tx_buf[writer->tx_buf_count++] = pkt;
+	if (writer->tx_buf_count >= writer->tx_burst_sz)
+		send_burst_nodrop(writer);
+}
+
+/*
+ * Bulk TX (RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 0): every packet
+ * selected by pkts_mask is copied into the port buffer, one at a time, in
+ * increasing index order. A mask made of contiguous bits starting at bit 0
+ * is walked by index, any other mask bit by bit. Always returns 0.
+ */
+static int
+rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
+		struct rte_mbuf **pkts,
+		uint64_t pkts_mask)
+{
+	struct rte_port_ethdev_writer_nodrop *writer = port;
+	uint64_t count, pos;
+
+	if ((pkts_mask & (pkts_mask + 1)) != 0) {
+		/* Sparse mask: visit the set bits from the lowest one up */
+		while (pkts_mask != 0) {
+			uint32_t idx = __builtin_ctzll(pkts_mask);
+
+			ethdev_writer_nodrop_buffer_pkt(writer, pkts[idx]);
+			pkts_mask &= ~(1LLU << idx);
+		}
+		return 0;
+	}
+
+	/* Contiguous mask: the valid packets are pkts[0 .. count - 1] */
+	count = __builtin_popcountll(pkts_mask);
+	for (pos = 0; pos < count; pos++)
+		ethdev_writer_nodrop_buffer_pkt(writer, pkts[pos]);
+
+	return 0;
+}
+
+#elif RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 1
+
+/*
+ * Bulk TX for the ethdev_writer_nodrop port
+ * (RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 1).
+ *
+ * port:      the struct rte_port_ethdev_writer_nodrop of this port.
+ * pkts:      packet array; pkts[i] is valid when bit i of pkts_mask is set.
+ * pkts_mask: bitmask of the valid entries of pkts.
+ *
+ * Fast path: when pkts_mask is a contiguous run of bits starting at bit 0
+ * and bit (tx_burst_sz - 1) is set, i.e. the burst holds at least tx_burst_sz
+ * packets, the burst is sent directly from pkts[]. Only the packets the NIC
+ * did not accept are copied into tx_buf and retried by send_burst_nodrop().
+ *
+ * Slow path: every selected packet is appended to tx_buf and the buffer is
+ * flushed once it holds tx_burst_sz packets or more.
+ *
+ * Always returns 0.
+ */
+static int
+rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
+		struct rte_mbuf **pkts,
+		uint64_t pkts_mask)
+{
+	struct rte_port_ethdev_writer_nodrop *p =
+		(struct rte_port_ethdev_writer_nodrop *) port;
+
+	/*
+	 * Keep the mask 64-bit wide: p->bsz_mask is 1 << (tx_burst_sz - 1), so
+	 * a 32-bit copy would be truncated to 0 for tx_burst_sz > 32 and the
+	 * "at least tx_burst_sz packets" test below would always pass.
+	 */
+	uint64_t bsz_mask = p->bsz_mask;
+	uint32_t tx_buf_count = p->tx_buf_count;
+	/*
+	 * First term is 0 only for a contiguous mask starting at bit 0,
+	 * second term is 0 only when bit (tx_burst_sz - 1) is set.
+	 */
+	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+			((pkts_mask & bsz_mask) ^ bsz_mask);
+
+	if (expr == 0) {
+		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+		uint32_t n_pkts_ok;
+
+		/* Packets buffered by earlier calls go out first */
+		if (tx_buf_count)
+			send_burst_nodrop(p);
+
+		n_pkts_ok = rte_eth_tx_burst(p->port_id, p->queue_id, pkts,
+				n_pkts);
+
+		if (n_pkts_ok >= n_pkts)
+			return 0;
+
+		/*
+		 * If we didn't manage to send all packets in a single burst,
+		 * move the remaining packets to the buffer and call send
+		 * burst. This assumes the buffer is empty at this point, i.e.
+		 * that send_burst_nodrop() leaves p->tx_buf_count at 0.
+		 */
+		for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
+			struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
+			p->tx_buf[p->tx_buf_count++] = pkt;
+		}
+		send_burst_nodrop(p);
+	} else {
+		for ( ; pkts_mask; ) {
+			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+			uint64_t pkt_mask = 1LLU << pkt_index;
+			struct rte_mbuf *pkt = pkts[pkt_index];
+
+			p->tx_buf[tx_buf_count++] = pkt;
+			pkts_mask &= ~pkt_mask;
+		}
+
+		p->tx_buf_count = tx_buf_count;
+		if (tx_buf_count >= p->tx_burst_sz)
+			send_burst_nodrop(p);
+	}
+
+	return 0;
+}
+
+#else
+
+#error Invalid value for RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH
+
+#endif
+
+/*
+ * Flush: push out whatever is currently buffered, if anything.
+ * Always returns 0.
+ */
+static int
+rte_port_ethdev_writer_nodrop_flush(void *port)
+{
+	struct rte_port_ethdev_writer_nodrop *writer = port;
+
+	if (writer->tx_buf_count == 0)
+		return 0;
+
+	send_burst_nodrop(writer);
+	return 0;
+}
+
+/*
+ * Destroy the port: flush the pending packets, then release the port memory.
+ * Returns 0 on success, -EINVAL (after logging an error) when port is NULL.
+ */
+static int
+rte_port_ethdev_writer_nodrop_free(void *port)
+{
+	if (port != NULL) {
+		rte_port_ethdev_writer_nodrop_flush(port);
+		rte_free(port);
+		return 0;
+	}
+
+	RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
+	return -EINVAL;
+}
+
+/*
* Summary of port operations
*/
struct rte_port_in_ops rte_port_ethdev_reader_ops = {
@@ -303,3 +525,11 @@ struct rte_port_out_ops rte_port_ethdev_writer_ops = {
.f_tx_bulk = rte_port_ethdev_writer_tx_bulk,
.f_flush = rte_port_ethdev_writer_flush,
};
+
+/*
+ * Output port operations of the ethdev_writer_nodrop port: binds each
+ * rte_port_out_ops callback to its ethdev_writer_nodrop implementation.
+ */
+struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops = {
+ .f_create = rte_port_ethdev_writer_nodrop_create,
+ .f_free = rte_port_ethdev_writer_nodrop_free,
+ .f_tx = rte_port_ethdev_writer_nodrop_tx,
+ .f_tx_bulk = rte_port_ethdev_writer_nodrop_tx_bulk,
+ .f_flush = rte_port_ethdev_writer_nodrop_flush,
+};
diff --git a/lib/librte_port/rte_port_ethdev.h b/lib/librte_port/rte_port_ethdev.h
index af67a12..201a79e 100644
--- a/lib/librte_port/rte_port_ethdev.h
+++ b/lib/librte_port/rte_port_ethdev.h
@@ -79,6 +79,25 @@ struct rte_port_ethdev_writer_params {
/** ethdev_writer port operations */
extern struct rte_port_out_ops rte_port_ethdev_writer_ops;
+/** ethdev_writer_nodrop port parameters */
+struct rte_port_ethdev_writer_nodrop_params {
+ /** NIC TX port ID */
+ uint8_t port_id;
+
+ /** NIC TX queue ID */
+ uint16_t queue_id;
+
+ /** Recommended burst size to NIC TX queue. The actual burst size can be
+ bigger or smaller than this value. Port creation rejects values that are
+ zero, not a power of two or larger than RTE_PORT_IN_BURST_SIZE_MAX. */
+ uint32_t tx_burst_sz;
+
+ /** Maximum number of retries, 0 for no limit */
+ uint32_t n_retries;
+};
+
+/** ethdev_writer_nodrop port operations */
+extern struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops;
+
#ifdef __cplusplus
}
#endif
--
1.7.9.5
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [dpdk-dev] [PATCH 2/3] port: added ethdev_writer_nodrop port
2015-03-30 9:56 ` [dpdk-dev] [PATCH 2/3] port: added ethdev_writer_nodrop port Maciej Gajdzica
@ 2015-03-30 10:22 ` Walukiewicz, Miroslaw
0 siblings, 0 replies; 8+ messages in thread
From: Walukiewicz, Miroslaw @ 2015-03-30 10:22 UTC (permalink / raw)
To: Gajdzica, MaciejX T, dev
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Maciej Gajdzica
> Sent: Monday, March 30, 2015 11:57 AM
> To: dev@dpdk.org
> Subject: [dpdk-dev] [PATCH 2/3] port: added ethdev_writer_nodrop port
>
> When ethdev_writer_nodrop port fails to send data, it tries to resend.
> Operation
> is aborted when maximum number of retries is reached.
>
> ---
> lib/librte_port/rte_port_ethdev.c | 230
> +++++++++++++++++++++++++++++++++++++
> lib/librte_port/rte_port_ethdev.h | 19 +++
> 2 files changed, 249 insertions(+)
>
> diff --git a/lib/librte_port/rte_port_ethdev.c
> b/lib/librte_port/rte_port_ethdev.c
> index d014913..1f77cb5 100644
> --- a/lib/librte_port/rte_port_ethdev.c
> +++ b/lib/librte_port/rte_port_ethdev.c
> @@ -31,6 +31,7 @@
> * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> */
> #include <string.h>
> +#include <stdint.h>
>
> #include <rte_mbuf.h>
> #include <rte_ethdev.h>
> @@ -288,6 +289,227 @@ rte_port_ethdev_writer_free(void *port)
> }
>
> /*
> + * Port ETHDEV Writer Nodrop
> + */
> +#define RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH 1
> +
> +struct rte_port_ethdev_writer_nodrop {
> + struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
> + uint32_t tx_burst_sz;
> + uint16_t tx_buf_count;
> + uint64_t bsz_mask;
> + uint64_t n_retries;
> + uint16_t queue_id;
> + uint8_t port_id;
> +};
> +
> +static void *
> +rte_port_ethdev_writer_nodrop_create(void *params, int socket_id)
> +{
> + struct rte_port_ethdev_writer_nodrop_params *conf =
> + (struct rte_port_ethdev_writer_nodrop_params *)
> params;
> + struct rte_port_ethdev_writer_nodrop *port;
> +
> + /* Check input parameters */
> + if ((conf == NULL) ||
> + (conf->tx_burst_sz == 0) ||
> + (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
> + (!rte_is_power_of_2(conf->tx_burst_sz))) {
> + RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n",
> __func__);
> + return NULL;
> + }
> +
> + /* Memory allocation */
> + port = rte_zmalloc_socket("PORT", sizeof(*port),
> + RTE_CACHE_LINE_SIZE, socket_id);
> + if (port == NULL) {
> + RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n",
> __func__);
> + return NULL;
> + }
> +
> + /* Initialization */
> + port->port_id = conf->port_id;
> + port->queue_id = conf->queue_id;
> + port->tx_burst_sz = conf->tx_burst_sz;
> + port->tx_buf_count = 0;
> + port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
> +
> + /*
> + * When n_retries is 0 it means that we should wait for every packet
> to
> + * send no matter how many retries should it take. To limit number of
> + * branches in fast path, we use UINT64_MAX instead of branching.
> + */
> + port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf-
> >n_retries;
> +
> + return port;
> +}
> +
> +static inline void
> +send_burst_nodrop(struct rte_port_ethdev_writer_nodrop *p)
> +{
> + uint32_t nb_tx = 0, i;
> +
> + nb_tx = rte_eth_tx_burst(p->port_id, p->queue_id, p->tx_buf,
> + p->tx_buf_count);
> +
> + /* We sent all the packets in a first try */
> + if (nb_tx >= p->tx_buf_count)
> + return;
> +
> + for (i = 0; i < p->n_retries; i++) {
> + nb_tx += rte_eth_tx_burst(p->port_id, p->queue_id,
> + p->tx_buf + nb_tx, p-
> >tx_buf_count - nb_tx);
> +
> + /* We sent all the packets in more than one try */
> + if (nb_tx >= p->tx_buf_count)
> + return;
> +
I think this method will work only for a single output port in the pipeline.
In multi-port configurations, especially when the TX HW rings are continuously overloaded,
the driver will spend all of its time waiting in this loop.
As an example, imagine that you have a single TX thread in a DPDK pipeline and multiple output Ethernet ports with different speeds,
10G and 40G, both overloaded and both calling send_burst_nodrop. The 10G port will wait most of the time and limit the speed of the 40G port,
so the expected effect is that the 40G port works at 10G speed ;-(
A similar effect is possible when ports of the same speed are accessed by send_burst_nodrop and DCB or any other L2 flow control that limits the TX speed is enabled:
the speed of the whole system will be the speed of the slowest link.
}
> +
> + /* We didn't send the packets in maximum allowed attempts */
> + for ( ; nb_tx < p->tx_buf_count; nb_tx++)
> + rte_pktmbuf_free(p->tx_buf[nb_tx]);
> +
> + p->tx_buf_count = 0;
> +}
> +
> +static int
> +rte_port_ethdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + if (p->tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> +
> + return 0;
> +}
> +
> +#if RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 0
> +
> +static int
> +rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
> + struct rte_mbuf **pkts,
> + uint64_t pkts_mask)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + if ((pkts_mask & (pkts_mask + 1)) == 0) {
> + uint64_t n_pkts = __builtin_popcountll(pkts_mask);
> + uint32_t i;
> +
> + for (i = 0; i < n_pkts; i++) {
> + struct rte_mbuf *pkt = pkts[i];
> +
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + if (p->tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> + }
> + } else {
> + for ( ; pkts_mask; ) {
> + uint32_t pkt_index = __builtin_ctzll(pkts_mask);
> + uint64_t pkt_mask = 1LLU << pkt_index;
> + struct rte_mbuf *pkt = pkts[pkt_index];
> +
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + if (p->tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> + pkts_mask &= ~pkt_mask;
> + }
> + }
> +
> + return 0;
> +}
> +
> +#elif RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 1
> +
> +static int
> +rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
> + struct rte_mbuf **pkts,
> + uint64_t pkts_mask)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + uint32_t bsz_mask = p->bsz_mask;
> + uint32_t tx_buf_count = p->tx_buf_count;
> + uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
> + ((pkts_mask & bsz_mask) ^ bsz_mask);
> +
> + if (expr == 0) {
> + uint64_t n_pkts = __builtin_popcountll(pkts_mask);
> + uint32_t n_pkts_ok;
> +
> + if (tx_buf_count)
> + send_burst_nodrop(p);
> +
> + n_pkts_ok = rte_eth_tx_burst(p->port_id, p->queue_id, pkts,
> + n_pkts);
> +
> + if (n_pkts_ok >= n_pkts)
> + return 0;
> +
> + /*
> + * If we didnt manage to send all packets in single burst, move
> + * remaining packets to the buffer and call send burst.
> + */
> + for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
> + struct rte_mbuf *pkt = pkts[n_pkts_ok];
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + }
> + send_burst_nodrop(p);
> + } else {
> + for ( ; pkts_mask; ) {
> + uint32_t pkt_index = __builtin_ctzll(pkts_mask);
> + uint64_t pkt_mask = 1LLU << pkt_index;
> + struct rte_mbuf *pkt = pkts[pkt_index];
> +
> + p->tx_buf[tx_buf_count++] = pkt;
> + pkts_mask &= ~pkt_mask;
> + }
> +
> + p->tx_buf_count = tx_buf_count;
> + if (tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> + }
> +
> + return 0;
> +}
> +
> +#else
> +
> +#error Invalid value for RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH
> +
> +#endif
> +
> +static int
> +rte_port_ethdev_writer_nodrop_flush(void *port)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + if (p->tx_buf_count > 0)
> + send_burst_nodrop(p);
> +
> + return 0;
> +}
> +
> +static int
> +rte_port_ethdev_writer_nodrop_free(void *port)
> +{
> + if (port == NULL) {
> + RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
> + return -EINVAL;
> + }
> +
> + rte_port_ethdev_writer_nodrop_flush(port);
> + rte_free(port);
> +
> + return 0;
> +}
> +
> +/*
> * Summary of port operations
> */
> struct rte_port_in_ops rte_port_ethdev_reader_ops = {
> @@ -303,3 +525,11 @@ struct rte_port_out_ops
> rte_port_ethdev_writer_ops = {
> .f_tx_bulk = rte_port_ethdev_writer_tx_bulk,
> .f_flush = rte_port_ethdev_writer_flush,
> };
> +
> +struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops = {
> + .f_create = rte_port_ethdev_writer_nodrop_create,
> + .f_free = rte_port_ethdev_writer_nodrop_free,
> + .f_tx = rte_port_ethdev_writer_nodrop_tx,
> + .f_tx_bulk = rte_port_ethdev_writer_nodrop_tx_bulk,
> + .f_flush = rte_port_ethdev_writer_nodrop_flush,
> +};
> diff --git a/lib/librte_port/rte_port_ethdev.h
> b/lib/librte_port/rte_port_ethdev.h
> index af67a12..201a79e 100644
> --- a/lib/librte_port/rte_port_ethdev.h
> +++ b/lib/librte_port/rte_port_ethdev.h
> @@ -79,6 +79,25 @@ struct rte_port_ethdev_writer_params {
> /** ethdev_writer port operations */
> extern struct rte_port_out_ops rte_port_ethdev_writer_ops;
>
> +/** ethdev_writer_nodrop port parameters */
> +struct rte_port_ethdev_writer_nodrop_params {
> + /** NIC RX port ID */
> + uint8_t port_id;
> +
> + /** NIC RX queue ID */
> + uint16_t queue_id;
> +
> + /** Recommended burst size to NIC TX queue. The actual burst size
> can be
> + bigger or smaller than this value. */
> + uint32_t tx_burst_sz;
> +
> + /** Maximum number of retries, 0 for no limit */
> + uint32_t n_retries;
> +};
> +
> +/** ethdev_writer_nodrop port operations */
> +extern struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops;
> +
> #ifdef __cplusplus
> }
> #endif
> --
> 1.7.9.5
^ permalink raw reply [flat|nested] 8+ messages in thread
* [dpdk-dev] [PATCH 3/3] port: added ring_writer_nodrop port
2015-03-30 9:56 [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Maciej Gajdzica
2015-03-30 9:56 ` [dpdk-dev] [PATCH 1/3] port: added WRITER_APPROACH == 1 implementation to ring port Maciej Gajdzica
2015-03-30 9:56 ` [dpdk-dev] [PATCH 2/3] port: added ethdev_writer_nodrop port Maciej Gajdzica
@ 2015-03-30 9:56 ` Maciej Gajdzica
2015-03-30 11:54 ` [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Dumitrescu, Cristian
3 siblings, 0 replies; 8+ messages in thread
From: Maciej Gajdzica @ 2015-03-30 9:56 UTC (permalink / raw)
To: dev
When the ring_writer_nodrop port fails to enqueue packets to the ring, it retries.
The operation is aborted, and the remaining packets are dropped, when the maximum number of retries is reached.
---
lib/librte_port/rte_port_ring.c | 226 +++++++++++++++++++++++++++++++++++++++
lib/librte_port/rte_port_ring.h | 16 +++
2 files changed, 242 insertions(+)
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index ba2eeb3..371bb1e 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -31,6 +31,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <string.h>
+#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_ring.h>
@@ -279,6 +280,223 @@ rte_port_ring_writer_free(void *port)
}
/*
+ * Port RING Writer Nodrop
+ */
+#define RTE_PORT_RING_WRITER_NODROP_APPROACH 1
+
+/* Run-time state of a ring_writer_nodrop output port */
+struct rte_port_ring_writer_nodrop {
+	/*
+	 * Packets waiting to be enqueued to the ring. Sized at twice the
+	 * maximum burst, like the ethdev_writer_nodrop port: tx_bulk appends
+	 * every packet of a 64-bit mask on top of what is already buffered
+	 * and only then checks whether a flush is due, so a buffer of
+	 * RTE_PORT_IN_BURST_SIZE_MAX entries can be overrun.
+	 * NOTE(review): assumes RTE_PORT_IN_BURST_SIZE_MAX covers a full
+	 * 64-packet mask - confirm against its definition.
+	 */
+	struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
+	/* Destination ring (single-producer enqueue is used) */
+	struct rte_ring *ring;
+	/* Number of buffered packets that triggers a flush */
+	uint32_t tx_burst_sz;
+	/* Number of valid entries at the start of tx_buf */
+	uint32_t tx_buf_count;
+	/* Single bit at position (tx_burst_sz - 1), used by tx_bulk */
+	uint64_t bsz_mask;
+	/* Maximum number of re-send attempts; UINT64_MAX stands for "no limit" */
+	uint64_t n_retries;
+};
+
+/*
+ * Allocate and initialise a ring_writer_nodrop port.
+ *
+ * params:    points to a struct rte_port_ring_writer_nodrop_params.
+ * socket_id: socket passed to rte_zmalloc_socket() for the allocation.
+ *
+ * Returns the new port, or NULL (after logging an error) when the parameters
+ * are invalid or the allocation fails.
+ */
+static void *
+rte_port_ring_writer_nodrop_create(void *params, int socket_id)
+{
+	struct rte_port_ring_writer_nodrop_params *conf =
+		(struct rte_port_ring_writer_nodrop_params *) params;
+	struct rte_port_ring_writer_nodrop *port;
+
+	/*
+	 * Check input parameters. tx_burst_sz == 0 has to be rejected: the
+	 * bsz_mask computation below would otherwise shift by
+	 * (uint32_t)(0 - 1) bits, which is undefined behaviour.
+	 */
+	if ((conf == NULL) ||
+		(conf->ring == NULL) ||
+		(conf->tx_burst_sz == 0) ||
+		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
+		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
+		return NULL;
+	}
+
+	/* Memory allocation */
+	port = rte_zmalloc_socket("PORT", sizeof(*port),
+			RTE_CACHE_LINE_SIZE, socket_id);
+	if (port == NULL) {
+		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+		return NULL;
+	}
+
+	/* Initialization */
+	port->ring = conf->ring;
+	port->tx_burst_sz = conf->tx_burst_sz;
+	port->tx_buf_count = 0;
+	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
+
+	/*
+	 * When n_retries is 0 it means that we should wait for every packet to
+	 * send no matter how many retries should it take. To limit number of
+	 * branches in fast path, we use UINT64_MAX instead of branching.
+	 */
+	port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
+
+	return port;
+}
+
+/*
+ * Enqueue all buffered packets to the ring, retrying while the ring does not
+ * accept them.
+ *
+ * After the first attempt, up to p->n_retries further attempts are made for
+ * the packets still pending (n_retries == UINT64_MAX never terminates the
+ * loop by count, i.e. it retries until everything is enqueued). Packets
+ * still pending after the last attempt are freed.
+ *
+ * On return the buffer is always empty (p->tx_buf_count == 0), whichever
+ * path was taken: callers append new packets right after this call, so
+ * leaving the old count in place would enqueue the same mbufs a second time
+ * and eventually overrun tx_buf.
+ */
+static inline void
+send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
+{
+	uint32_t nb_tx = 0, i;
+
+	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
+			p->tx_buf_count);
+
+	/* We sent all the packets in a first try */
+	if (nb_tx >= p->tx_buf_count) {
+		p->tx_buf_count = 0;
+		return;
+	}
+
+	for (i = 0; i < p->n_retries; i++) {
+		nb_tx += rte_ring_sp_enqueue_burst(p->ring,
+				(void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+
+		/* We sent all the packets in more than one try */
+		if (nb_tx >= p->tx_buf_count) {
+			p->tx_buf_count = 0;
+			return;
+		}
+	}
+
+	/* We didn't send the packets in maximum allowed attempts */
+	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
+		rte_pktmbuf_free(p->tx_buf[nb_tx]);
+
+	p->tx_buf_count = 0;
+}
+
+/*
+ * Single-packet TX: append pkt to the port buffer and flush the buffer
+ * through send_burst_nodrop() once it holds tx_burst_sz packets.
+ * Always returns 0.
+ */
+static int
+rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
+{
+	struct rte_port_ring_writer_nodrop *writer = port;
+
+	writer->tx_buf[writer->tx_buf_count++] = pkt;
+	if (writer->tx_buf_count < writer->tx_burst_sz)
+		return 0;
+
+	send_burst_nodrop(writer);
+	return 0;
+}
+
+#if RTE_PORT_RING_WRITER_NODROP_APPROACH == 0
+
+/* Append one packet to the port buffer; flush when tx_burst_sz is reached */
+static inline void
+ring_writer_nodrop_buffer_pkt(struct rte_port_ring_writer_nodrop *writer,
+		struct rte_mbuf *pkt)
+{
+	writer->tx_buf[writer->tx_buf_count++] = pkt;
+	if (writer->tx_buf_count >= writer->tx_burst_sz)
+		send_burst_nodrop(writer);
+}
+
+/*
+ * Bulk TX (RTE_PORT_RING_WRITER_NODROP_APPROACH == 0): every packet selected
+ * by pkts_mask is copied into the port buffer, one at a time, in increasing
+ * index order. A mask made of contiguous bits starting at bit 0 is walked by
+ * index, any other mask bit by bit. Always returns 0.
+ */
+static int
+rte_port_ring_writer_nodrop_tx_bulk(void *port,
+		struct rte_mbuf **pkts,
+		uint64_t pkts_mask)
+{
+	struct rte_port_ring_writer_nodrop *writer = port;
+	uint64_t count, pos;
+
+	if ((pkts_mask & (pkts_mask + 1)) != 0) {
+		/* Sparse mask: visit the set bits from the lowest one up */
+		while (pkts_mask != 0) {
+			uint32_t idx = __builtin_ctzll(pkts_mask);
+
+			ring_writer_nodrop_buffer_pkt(writer, pkts[idx]);
+			pkts_mask &= ~(1LLU << idx);
+		}
+		return 0;
+	}
+
+	/* Contiguous mask: the valid packets are pkts[0 .. count - 1] */
+	count = __builtin_popcountll(pkts_mask);
+	for (pos = 0; pos < count; pos++)
+		ring_writer_nodrop_buffer_pkt(writer, pkts[pos]);
+
+	return 0;
+}
+
+#elif RTE_PORT_RING_WRITER_NODROP_APPROACH == 1
+
+/*
+ * Bulk TX for the ring_writer_nodrop port
+ * (RTE_PORT_RING_WRITER_NODROP_APPROACH == 1).
+ *
+ * port:      the struct rte_port_ring_writer_nodrop of this port.
+ * pkts:      packet array; pkts[i] is valid when bit i of pkts_mask is set.
+ * pkts_mask: bitmask of the valid entries of pkts.
+ *
+ * Fast path: when pkts_mask is a contiguous run of bits starting at bit 0
+ * and bit (tx_burst_sz - 1) is set, i.e. the burst holds at least tx_burst_sz
+ * packets, the burst is enqueued directly from pkts[]. Only the packets the
+ * ring did not accept are copied into tx_buf and retried by
+ * send_burst_nodrop().
+ *
+ * Slow path: every selected packet is appended to tx_buf and the buffer is
+ * flushed once it holds tx_burst_sz packets or more.
+ *
+ * Always returns 0.
+ */
+static int
+rte_port_ring_writer_nodrop_tx_bulk(void *port,
+		struct rte_mbuf **pkts,
+		uint64_t pkts_mask)
+{
+	struct rte_port_ring_writer_nodrop *p =
+		(struct rte_port_ring_writer_nodrop *) port;
+
+	/*
+	 * Keep the mask 64-bit wide: p->bsz_mask is 1 << (tx_burst_sz - 1), so
+	 * a 32-bit copy would be truncated to 0 for tx_burst_sz > 32 and the
+	 * "at least tx_burst_sz packets" test below would always pass.
+	 */
+	uint64_t bsz_mask = p->bsz_mask;
+	uint32_t tx_buf_count = p->tx_buf_count;
+	/*
+	 * First term is 0 only for a contiguous mask starting at bit 0,
+	 * second term is 0 only when bit (tx_burst_sz - 1) is set.
+	 */
+	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+			((pkts_mask & bsz_mask) ^ bsz_mask);
+
+	if (expr == 0) {
+		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+		uint32_t n_pkts_ok;
+
+		/* Packets buffered by earlier calls go out first */
+		if (tx_buf_count)
+			send_burst_nodrop(p);
+
+		n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+
+		if (n_pkts_ok >= n_pkts)
+			return 0;
+
+		/*
+		 * If we didn't manage to send all packets in a single burst,
+		 * move the remaining packets to the buffer and call send
+		 * burst. This assumes the buffer is empty at this point, i.e.
+		 * that send_burst_nodrop() leaves p->tx_buf_count at 0.
+		 */
+		for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
+			struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
+			p->tx_buf[p->tx_buf_count++] = pkt;
+		}
+		send_burst_nodrop(p);
+	} else {
+		/*
+		 * NOTE(review): up to 64 packets are appended on top of whatever
+		 * is already buffered before the flush check below; confirm
+		 * that tx_buf is large enough for that worst case.
+		 */
+		for ( ; pkts_mask; ) {
+			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+			uint64_t pkt_mask = 1LLU << pkt_index;
+			struct rte_mbuf *pkt = pkts[pkt_index];
+
+			p->tx_buf[tx_buf_count++] = pkt;
+			pkts_mask &= ~pkt_mask;
+		}
+
+		p->tx_buf_count = tx_buf_count;
+		if (tx_buf_count >= p->tx_burst_sz)
+			send_burst_nodrop(p);
+	}
+
+	return 0;
+}
+
+#else
+
+#error Invalid value for RTE_PORT_RING_WRITER_NODROP_APPROACH
+
+#endif
+
+/*
+ * Flush: push out whatever is currently buffered, if anything.
+ * Always returns 0.
+ */
+static int
+rte_port_ring_writer_nodrop_flush(void *port)
+{
+	struct rte_port_ring_writer_nodrop *writer = port;
+
+	if (writer->tx_buf_count == 0)
+		return 0;
+
+	send_burst_nodrop(writer);
+	return 0;
+}
+
+/*
+ * Destroy the port: flush the pending packets, then release the port memory.
+ * Returns 0 on success, -EINVAL (after logging an error) when port is NULL.
+ */
+static int
+rte_port_ring_writer_nodrop_free(void *port)
+{
+	if (port != NULL) {
+		rte_port_ring_writer_nodrop_flush(port);
+		rte_free(port);
+		return 0;
+	}
+
+	RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
+	return -EINVAL;
+}
+
+/*
* Summary of port operations
*/
struct rte_port_in_ops rte_port_ring_reader_ops = {
@@ -294,3 +512,11 @@ struct rte_port_out_ops rte_port_ring_writer_ops = {
.f_tx_bulk = rte_port_ring_writer_tx_bulk,
.f_flush = rte_port_ring_writer_flush,
};
+
+/*
+ * Output port operations of the ring_writer_nodrop port: binds each
+ * rte_port_out_ops callback to its ring_writer_nodrop implementation.
+ */
+struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
+ .f_create = rte_port_ring_writer_nodrop_create,
+ .f_free = rte_port_ring_writer_nodrop_free,
+ .f_tx = rte_port_ring_writer_nodrop_tx,
+ .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
+ .f_flush = rte_port_ring_writer_nodrop_flush,
+};
diff --git a/lib/librte_port/rte_port_ring.h b/lib/librte_port/rte_port_ring.h
index 009dcf8..ea5ad92 100644
--- a/lib/librte_port/rte_port_ring.h
+++ b/lib/librte_port/rte_port_ring.h
@@ -75,6 +75,22 @@ struct rte_port_ring_writer_params {
/** ring_writer port operations */
extern struct rte_port_out_ops rte_port_ring_writer_ops;
+/** ring_writer_nodrop port parameters */
+struct rte_port_ring_writer_nodrop_params {
+ /** Underlying single producer ring that has to be pre-initialized */
+ struct rte_ring *ring;
+
+ /** Recommended burst size to ring. The actual burst size can be
+ bigger or smaller than this value. Port creation rejects values larger
+ than RTE_PORT_IN_BURST_SIZE_MAX. */
+ uint32_t tx_burst_sz;
+
+ /** Maximum number of retries, 0 for no limit */
+ uint32_t n_retries;
+};
+
+/** ring_writer_nodrop port operations */
+extern struct rte_port_out_ops rte_port_ring_writer_nodrop_ops;
+
#ifdef __cplusplus
}
#endif
--
1.7.9.5
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports
2015-03-30 9:56 [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Maciej Gajdzica
` (2 preceding siblings ...)
2015-03-30 9:56 ` [dpdk-dev] [PATCH 3/3] port: added ring_writer_nodrop port Maciej Gajdzica
@ 2015-03-30 11:54 ` Dumitrescu, Cristian
3 siblings, 0 replies; 8+ messages in thread
From: Dumitrescu, Cristian @ 2015-03-30 11:54 UTC (permalink / raw)
To: Gajdzica, MaciejX T, dev
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Maciej Gajdzica
> Sent: Monday, March 30, 2015 10:57 AM
> To: dev@dpdk.org
> Subject: [dpdk-dev] [PATCH 0/3] port: added ethdev_writer_nodrop and
> ring_writer_nodrop ports
>
> When nodrop writer port fails to send data, it retries until reach maximum
> number of retries. Also added new tx_bulk implementation for ring writer
> port.
>
> Maciej Gajdzica (3):
> port: added WRITER_APPROACH == 1 implementation to ring port
> port: added ethdev_writer_nodrop port
> port: added ring_writer_nodrop port
>
> lib/librte_port/rte_port_ethdev.c | 230
> ++++++++++++++++++++++++++++++
> lib/librte_port/rte_port_ethdev.h | 19 +++
> lib/librte_port/rte_port_ring.c | 285
> +++++++++++++++++++++++++++++++++++++
> lib/librte_port/rte_port_ring.h | 16 +++
> 4 files changed, 550 insertions(+)
>
> --
> 1.7.9.5
Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
^ permalink raw reply [flat|nested] 8+ messages in thread