* [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1 implementation to ring port
2015-04-30 11:58 [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Michal Jastrzebski
@ 2015-04-30 11:58 ` Michal Jastrzebski
2015-05-17 21:44 ` Thomas Monjalon
2015-04-30 11:58 ` [dpdk-dev] [PATCH v2 2/3] port: added ethdev_writer_nodrop port Michal Jastrzebski
` (2 subsequent siblings)
3 siblings, 1 reply; 8+ messages in thread
From: Michal Jastrzebski @ 2015-04-30 11:58 UTC (permalink / raw)
To: dev
From: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
Added a better-optimized tx_bulk implementation for the ring writer
port, based on a similar solution in the ethdev_writer port. The new
implementation sends the burst without copying data to the internal
buffer when possible.
Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
---
lib/librte_port/rte_port_ring.c | 59 +++++++++++++++++++++++++++++++++++++++
1 file changed, 59 insertions(+)
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index fa3d77b..ba2eeb3 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -96,11 +96,14 @@ rte_port_ring_reader_free(void *port)
/*
* Port RING Writer
*/
+#define RTE_PORT_RING_WRITER_APPROACH 1
+
struct rte_port_ring_writer {
struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
struct rte_ring *ring;
uint32_t tx_burst_sz;
uint32_t tx_buf_count;
+ uint64_t bsz_mask;
};
static void *
@@ -130,6 +133,7 @@ rte_port_ring_writer_create(void *params, int socket_id)
port->ring = conf->ring;
port->tx_burst_sz = conf->tx_burst_sz;
port->tx_buf_count = 0;
+ port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
return port;
}
@@ -160,6 +164,8 @@ rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
return 0;
}
+#if RTE_PORT_RING_WRITER_APPROACH == 0
+
static int
rte_port_ring_writer_tx_bulk(void *port,
struct rte_mbuf **pkts,
@@ -194,6 +200,59 @@ rte_port_ring_writer_tx_bulk(void *port,
return 0;
}
+#elif RTE_PORT_RING_WRITER_APPROACH == 1
+
+static int
+rte_port_ring_writer_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ring_writer *p =
+ (struct rte_port_ring_writer *) port;
+
+ uint64_t bsz_mask = p->bsz_mask;
+ uint32_t tx_buf_count = p->tx_buf_count;
+ uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+ ((pkts_mask & bsz_mask) ^ bsz_mask);
+
+ if (expr == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t n_pkts_ok;
+
+ if (tx_buf_count)
+ send_burst(p);
+
+ n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+
+ for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
+ struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
+ rte_pktmbuf_free(pkt);
+ }
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[tx_buf_count++] = pkt;
+ pkts_mask &= ~pkt_mask;
+ }
+
+ p->tx_buf_count = tx_buf_count;
+ if (tx_buf_count >= p->tx_burst_sz)
+ send_burst(p);
+ }
+
+ return 0;
+}
+
+#else
+
+#error Invalid value for RTE_PORT_RING_WRITER_APPROACH
+
+#endif
+
static int
rte_port_ring_writer_flush(void *port)
{
--
1.7.9.5
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1 implementation to ring port
2015-04-30 11:58 ` [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1 implementation to ring port Michal Jastrzebski
@ 2015-05-17 21:44 ` Thomas Monjalon
2015-05-19 22:49 ` Dumitrescu, Cristian
0 siblings, 1 reply; 8+ messages in thread
From: Thomas Monjalon @ 2015-05-17 21:44 UTC (permalink / raw)
To: Michal Jastrzebski, Maciej Gajdzica; +Cc: dev
2015-04-30 13:58, Michal Jastrzebski:
> From: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
>
> Added a better-optimized tx_bulk implementation for the ring writer
> port, based on a similar solution in the ethdev_writer port. The new
> implementation sends the burst without copying data to the internal
> buffer when possible.
>
> Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
[...]
> +#if RTE_PORT_RING_WRITER_APPROACH == 0
Nack
Maybe you missed the previous comment:
http://dpdk.org/ml/archives/dev/2015-March/015999.html
* Re: [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1 implementation to ring port
2015-05-17 21:44 ` Thomas Monjalon
@ 2015-05-19 22:49 ` Dumitrescu, Cristian
2015-05-19 23:19 ` Thomas Monjalon
0 siblings, 1 reply; 8+ messages in thread
From: Dumitrescu, Cristian @ 2015-05-19 22:49 UTC (permalink / raw)
To: Thomas Monjalon, Jastrzebski, MichalX K, Gajdzica, MaciejX T; +Cc: dev
Hi Thomas and David,
We can remove one of the code branches if you guys feel strongly about it.
We recommended keeping both because both of them work, and we wanted to keep them for a while to get feedback from other people about which one they prefer. Some apps might prefer one over the other for performance reasons. But again, we can resend this patch with only one code path present.
Regards,
Cristian
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Thomas Monjalon
> Sent: Sunday, May 17, 2015 10:45 PM
> To: Jastrzebski, MichalX K; Gajdzica, MaciejX T
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1
> implementation to ring port
>
> 2015-04-30 13:58, Michal Jastrzebski:
> > From: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
> >
> > Added a better-optimized tx_bulk implementation for the ring writer
> > port, based on a similar solution in the ethdev_writer port. The new
> > implementation sends the burst without copying data to the internal
> > buffer when possible.
> >
> > Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
> [...]
> > +#if RTE_PORT_RING_WRITER_APPROACH == 0
>
> Nack
> Maybe you missed the previous comment:
> http://dpdk.org/ml/archives/dev/2015-March/015999.html
* Re: [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1 implementation to ring port
2015-05-19 22:49 ` Dumitrescu, Cristian
@ 2015-05-19 23:19 ` Thomas Monjalon
0 siblings, 0 replies; 8+ messages in thread
From: Thomas Monjalon @ 2015-05-19 23:19 UTC (permalink / raw)
To: Dumitrescu, Cristian; +Cc: dev
Hi Cristian,
2015-05-19 22:49, Dumitrescu, Cristian:
> Hi Thomas and David,
>
> We can remove one of the code branches if you guys feel strongly about it.
We don't feel strongly, as we are neither the authors nor the testers of this new code.
But we want to avoid the maintenance nightmare of #ifdef.
> The reason we recommended to keep both is because both of them are working,
> and we wanted to keep both of them for a while to get some feedback from
> other people about which one they prefer. It might be that some apps would
> prefer one over the other for performance reasons. But again, we can resend
> this patch with only one code path present.
In general, RFC patches are used to request feedback.
I think it's better to have only one implementation at a time.
Comments are welcome.
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Thomas Monjalon
> > Sent: Sunday, May 17, 2015 10:45 PM
> > To: Jastrzebski, MichalX K; Gajdzica, MaciejX T
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1
> > implementation to ring port
> >
> > 2015-04-30 13:58, Michal Jastrzebski:
> > > From: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
> > >
> > > Added a better-optimized tx_bulk implementation for the ring writer
> > > port, based on a similar solution in the ethdev_writer port. The new
> > > implementation sends the burst without copying data to the internal
> > > buffer when possible.
> > >
> > > Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
> > [...]
> > > +#if RTE_PORT_RING_WRITER_APPROACH == 0
> >
> > Nack
> > Maybe you missed the previous comment:
> > http://dpdk.org/ml/archives/dev/2015-March/015999.html
* [dpdk-dev] [PATCH v2 2/3] port: added ethdev_writer_nodrop port
2015-04-30 11:58 [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Michal Jastrzebski
2015-04-30 11:58 ` [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1 implementation to ring port Michal Jastrzebski
@ 2015-04-30 11:58 ` Michal Jastrzebski
2015-04-30 11:58 ` [dpdk-dev] [PATCH v2 3/3] port: added ring_writer_nodrop port Michal Jastrzebski
2015-05-05 15:06 ` [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Dumitrescu, Cristian
3 siblings, 0 replies; 8+ messages in thread
From: Michal Jastrzebski @ 2015-04-30 11:58 UTC (permalink / raw)
To: dev
From: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
When the ethdev_writer_nodrop port fails to send data, it retries. The
operation is aborted when the maximum number of retries is reached.
Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
---
lib/librte_port/rte_port_ethdev.c | 230 +++++++++++++++++++++++++++++++++++++
lib/librte_port/rte_port_ethdev.h | 19 +++
2 files changed, 249 insertions(+)
diff --git a/lib/librte_port/rte_port_ethdev.c b/lib/librte_port/rte_port_ethdev.c
index d014913..1f77cb5 100644
--- a/lib/librte_port/rte_port_ethdev.c
+++ b/lib/librte_port/rte_port_ethdev.c
@@ -31,6 +31,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <string.h>
+#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>
@@ -288,6 +289,227 @@ rte_port_ethdev_writer_free(void *port)
}
/*
+ * Port ETHDEV Writer Nodrop
+ */
+#define RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH 1
+
+struct rte_port_ethdev_writer_nodrop {
+ struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
+ uint32_t tx_burst_sz;
+ uint16_t tx_buf_count;
+ uint64_t bsz_mask;
+ uint64_t n_retries;
+ uint16_t queue_id;
+ uint8_t port_id;
+};
+
+static void *
+rte_port_ethdev_writer_nodrop_create(void *params, int socket_id)
+{
+ struct rte_port_ethdev_writer_nodrop_params *conf =
+ (struct rte_port_ethdev_writer_nodrop_params *) params;
+ struct rte_port_ethdev_writer_nodrop *port;
+
+ /* Check input parameters */
+ if ((conf == NULL) ||
+ (conf->tx_burst_sz == 0) ||
+ (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
+ (!rte_is_power_of_2(conf->tx_burst_sz))) {
+ RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
+ return NULL;
+ }
+
+ /* Memory allocation */
+ port = rte_zmalloc_socket("PORT", sizeof(*port),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+ return NULL;
+ }
+
+ /* Initialization */
+ port->port_id = conf->port_id;
+ port->queue_id = conf->queue_id;
+ port->tx_burst_sz = conf->tx_burst_sz;
+ port->tx_buf_count = 0;
+ port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
+
+ /*
+ * When n_retries is 0, wait for every packet to be sent, no matter
+ * how many retries it takes. To avoid extra branching in the fast
+ * path, we use UINT64_MAX in that case.
+ */
+ port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
+
+ return port;
+}
+
+static inline void
+send_burst_nodrop(struct rte_port_ethdev_writer_nodrop *p)
+{
+ uint32_t nb_tx = 0, i;
+
+ nb_tx = rte_eth_tx_burst(p->port_id, p->queue_id, p->tx_buf,
+ p->tx_buf_count);
+
+ /* We sent all the packets on the first try */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+
+ for (i = 0; i < p->n_retries; i++) {
+ nb_tx += rte_eth_tx_burst(p->port_id, p->queue_id,
+ p->tx_buf + nb_tx, p->tx_buf_count - nb_tx);
+
+ /* We sent all the packets after one or more retries */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+ }
+
+ /* We didn't send all the packets within the maximum allowed attempts */
+ for ( ; nb_tx < p->tx_buf_count; nb_tx++)
+ rte_pktmbuf_free(p->tx_buf[nb_tx]);
+
+ p->tx_buf_count = 0;
+}
+
+static int
+rte_port_ethdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+
+ return 0;
+}
+
+#if RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 0
+
+static int
+rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ if ((pkts_mask & (pkts_mask + 1)) == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t i;
+
+ for (i = 0; i < n_pkts; i++) {
+ struct rte_mbuf *pkt = pkts[i];
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ }
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ pkts_mask &= ~pkt_mask;
+ }
+ }
+
+ return 0;
+}
+
+#elif RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 1
+
+static int
+rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ uint64_t bsz_mask = p->bsz_mask;
+ uint32_t tx_buf_count = p->tx_buf_count;
+ uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+ ((pkts_mask & bsz_mask) ^ bsz_mask);
+
+ if (expr == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t n_pkts_ok;
+
+ if (tx_buf_count)
+ send_burst_nodrop(p);
+
+ n_pkts_ok = rte_eth_tx_burst(p->port_id, p->queue_id, pkts,
+ n_pkts);
+
+ if (n_pkts_ok >= n_pkts)
+ return 0;
+
+ /*
+ * If we didn't manage to send all packets in a single burst, move
+ * the remaining packets to the buffer and send the burst again.
+ */
+ for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
+ struct rte_mbuf *pkt = pkts[n_pkts_ok];
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ }
+ send_burst_nodrop(p);
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[tx_buf_count++] = pkt;
+ pkts_mask &= ~pkt_mask;
+ }
+
+ p->tx_buf_count = tx_buf_count;
+ if (tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ }
+
+ return 0;
+}
+
+#else
+
+#error Invalid value for RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH
+
+#endif
+
+static int
+rte_port_ethdev_writer_nodrop_flush(void *port)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ if (p->tx_buf_count > 0)
+ send_burst_nodrop(p);
+
+ return 0;
+}
+
+static int
+rte_port_ethdev_writer_nodrop_free(void *port)
+{
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
+ return -EINVAL;
+ }
+
+ rte_port_ethdev_writer_nodrop_flush(port);
+ rte_free(port);
+
+ return 0;
+}
+
+/*
* Summary of port operations
*/
struct rte_port_in_ops rte_port_ethdev_reader_ops = {
@@ -303,3 +525,11 @@ struct rte_port_out_ops rte_port_ethdev_writer_ops = {
.f_tx_bulk = rte_port_ethdev_writer_tx_bulk,
.f_flush = rte_port_ethdev_writer_flush,
};
+
+struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops = {
+ .f_create = rte_port_ethdev_writer_nodrop_create,
+ .f_free = rte_port_ethdev_writer_nodrop_free,
+ .f_tx = rte_port_ethdev_writer_nodrop_tx,
+ .f_tx_bulk = rte_port_ethdev_writer_nodrop_tx_bulk,
+ .f_flush = rte_port_ethdev_writer_nodrop_flush,
+};
diff --git a/lib/librte_port/rte_port_ethdev.h b/lib/librte_port/rte_port_ethdev.h
index af67a12..201a79e 100644
--- a/lib/librte_port/rte_port_ethdev.h
+++ b/lib/librte_port/rte_port_ethdev.h
@@ -79,6 +79,25 @@ struct rte_port_ethdev_writer_params {
/** ethdev_writer port operations */
extern struct rte_port_out_ops rte_port_ethdev_writer_ops;
+/** ethdev_writer_nodrop port parameters */
+struct rte_port_ethdev_writer_nodrop_params {
+ /** NIC TX port ID */
+ uint8_t port_id;
+
+ /** NIC TX queue ID */
+ uint16_t queue_id;
+
+ /** Recommended burst size to NIC TX queue. The actual burst size can be
+ bigger or smaller than this value. */
+ uint32_t tx_burst_sz;
+
+ /** Maximum number of retries, 0 for no limit */
+ uint32_t n_retries;
+};
+
+/** ethdev_writer_nodrop port operations */
+extern struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops;
+
#ifdef __cplusplus
}
#endif
--
1.7.9.5
* [dpdk-dev] [PATCH v2 3/3] port: added ring_writer_nodrop port
2015-04-30 11:58 [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Michal Jastrzebski
2015-04-30 11:58 ` [dpdk-dev] [PATCH v2 1/3] port: added WRITER_APPROACH == 1 implementation to ring port Michal Jastrzebski
2015-04-30 11:58 ` [dpdk-dev] [PATCH v2 2/3] port: added ethdev_writer_nodrop port Michal Jastrzebski
@ 2015-04-30 11:58 ` Michal Jastrzebski
2015-05-05 15:06 ` [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Dumitrescu, Cristian
3 siblings, 0 replies; 8+ messages in thread
From: Michal Jastrzebski @ 2015-04-30 11:58 UTC (permalink / raw)
To: dev
From: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
When the ring_writer_nodrop port fails to send data, it retries. The
operation is aborted when the maximum number of retries is reached.
Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
---
lib/librte_port/rte_port_ring.c | 226 +++++++++++++++++++++++++++++++++++++++
lib/librte_port/rte_port_ring.h | 16 +++
2 files changed, 242 insertions(+)
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index ba2eeb3..371bb1e 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -31,6 +31,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <string.h>
+#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_ring.h>
@@ -279,6 +280,223 @@ rte_port_ring_writer_free(void *port)
}
/*
+ * Port RING Writer Nodrop
+ */
+#define RTE_PORT_RING_WRITER_NODROP_APPROACH 1
+
+struct rte_port_ring_writer_nodrop {
+ struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
+ struct rte_ring *ring;
+ uint32_t tx_burst_sz;
+ uint32_t tx_buf_count;
+ uint64_t bsz_mask;
+ uint64_t n_retries;
+};
+
+static void *
+rte_port_ring_writer_nodrop_create(void *params, int socket_id)
+{
+ struct rte_port_ring_writer_nodrop_params *conf =
+ (struct rte_port_ring_writer_nodrop_params *) params;
+ struct rte_port_ring_writer_nodrop *port;
+
+ /* Check input parameters */
+ if ((conf == NULL) ||
+ (conf->ring == NULL) ||
+ (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
+ RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
+ return NULL;
+ }
+
+ /* Memory allocation */
+ port = rte_zmalloc_socket("PORT", sizeof(*port),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+ return NULL;
+ }
+
+ /* Initialization */
+ port->ring = conf->ring;
+ port->tx_burst_sz = conf->tx_burst_sz;
+ port->tx_buf_count = 0;
+ port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
+
+ /*
+ * When n_retries is 0, wait for every packet to be sent, no matter
+ * how many retries it takes. To avoid extra branching in the fast
+ * path, we use UINT64_MAX in that case.
+ */
+ port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
+
+ return port;
+}
+
+static inline void
+send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
+{
+ uint32_t nb_tx = 0, i;
+
+ nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
+ p->tx_buf_count);
+
+ /* We sent all the packets on the first try */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+
+ for (i = 0; i < p->n_retries; i++) {
+ nb_tx += rte_ring_sp_enqueue_burst(p->ring,
+ (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+
+ /* We sent all the packets after one or more retries */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+ }
+
+ /* We didn't send all the packets within the maximum allowed attempts */
+ for ( ; nb_tx < p->tx_buf_count; nb_tx++)
+ rte_pktmbuf_free(p->tx_buf[nb_tx]);
+
+ p->tx_buf_count = 0;
+}
+
+static int
+rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
+{
+ struct rte_port_ring_writer_nodrop *p =
+ (struct rte_port_ring_writer_nodrop *) port;
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+
+ return 0;
+}
+
+#if RTE_PORT_RING_WRITER_NODROP_APPROACH == 0
+
+static int
+rte_port_ring_writer_nodrop_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ring_writer_nodrop *p =
+ (struct rte_port_ring_writer_nodrop *) port;
+
+ if ((pkts_mask & (pkts_mask + 1)) == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t i;
+
+ for (i = 0; i < n_pkts; i++) {
+ struct rte_mbuf *pkt = pkts[i];
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ }
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ pkts_mask &= ~pkt_mask;
+ }
+ }
+
+ return 0;
+}
+
+#elif RTE_PORT_RING_WRITER_NODROP_APPROACH == 1
+
+static int
+rte_port_ring_writer_nodrop_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ring_writer_nodrop *p =
+ (struct rte_port_ring_writer_nodrop *) port;
+
+ uint64_t bsz_mask = p->bsz_mask;
+ uint32_t tx_buf_count = p->tx_buf_count;
+ uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+ ((pkts_mask & bsz_mask) ^ bsz_mask);
+
+ if (expr == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t n_pkts_ok;
+
+ if (tx_buf_count)
+ send_burst_nodrop(p);
+
+ n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+
+ if (n_pkts_ok >= n_pkts)
+ return 0;
+
+ /*
+ * If we didn't manage to send all packets in a single burst, move
+ * the remaining packets to the buffer and send the burst again.
+ */
+ for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
+ struct rte_mbuf *pkt = pkts[n_pkts_ok];
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ }
+ send_burst_nodrop(p);
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[tx_buf_count++] = pkt;
+ pkts_mask &= ~pkt_mask;
+ }
+
+ p->tx_buf_count = tx_buf_count;
+ if (tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ }
+
+ return 0;
+}
+
+#else
+
+#error Invalid value for RTE_PORT_RING_WRITER_NODROP_APPROACH
+
+#endif
+
+static int
+rte_port_ring_writer_nodrop_flush(void *port)
+{
+ struct rte_port_ring_writer_nodrop *p =
+ (struct rte_port_ring_writer_nodrop *) port;
+
+ if (p->tx_buf_count > 0)
+ send_burst_nodrop(p);
+
+ return 0;
+}
+
+static int
+rte_port_ring_writer_nodrop_free(void *port)
+{
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
+ return -EINVAL;
+ }
+
+ rte_port_ring_writer_nodrop_flush(port);
+ rte_free(port);
+
+ return 0;
+}
+
+/*
* Summary of port operations
*/
struct rte_port_in_ops rte_port_ring_reader_ops = {
@@ -294,3 +512,11 @@ struct rte_port_out_ops rte_port_ring_writer_ops = {
.f_tx_bulk = rte_port_ring_writer_tx_bulk,
.f_flush = rte_port_ring_writer_flush,
};
+
+struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
+ .f_create = rte_port_ring_writer_nodrop_create,
+ .f_free = rte_port_ring_writer_nodrop_free,
+ .f_tx = rte_port_ring_writer_nodrop_tx,
+ .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
+ .f_flush = rte_port_ring_writer_nodrop_flush,
+};
diff --git a/lib/librte_port/rte_port_ring.h b/lib/librte_port/rte_port_ring.h
index 009dcf8..ea5ad92 100644
--- a/lib/librte_port/rte_port_ring.h
+++ b/lib/librte_port/rte_port_ring.h
@@ -75,6 +75,22 @@ struct rte_port_ring_writer_params {
/** ring_writer port operations */
extern struct rte_port_out_ops rte_port_ring_writer_ops;
+/** ring_writer_nodrop port parameters */
+struct rte_port_ring_writer_nodrop_params {
+ /** Underlying single producer ring that has to be pre-initialized */
+ struct rte_ring *ring;
+
+ /** Recommended burst size to ring. The actual burst size can be
+ bigger or smaller than this value. */
+ uint32_t tx_burst_sz;
+
+ /** Maximum number of retries, 0 for no limit */
+ uint32_t n_retries;
+};
+
+/** ring_writer_nodrop port operations */
+extern struct rte_port_out_ops rte_port_ring_writer_nodrop_ops;
+
#ifdef __cplusplus
}
#endif
--
1.7.9.5
* Re: [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports
2015-04-30 11:58 [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and ring_writer_nodrop ports Michal Jastrzebski
` (2 preceding siblings ...)
2015-04-30 11:58 ` [dpdk-dev] [PATCH v2 3/3] port: added ring_writer_nodrop port Michal Jastrzebski
@ 2015-05-05 15:06 ` Dumitrescu, Cristian
3 siblings, 0 replies; 8+ messages in thread
From: Dumitrescu, Cristian @ 2015-05-05 15:06 UTC (permalink / raw)
To: Jastrzebski, MichalX K, dev
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Michal Jastrzebski
> Sent: Thursday, April 30, 2015 12:58 PM
> To: dev@dpdk.org
> Subject: [dpdk-dev] [PATCH v2 0/3] port: added ethdev_writer_nodrop and
> ring_writer_nodrop ports
>
> From: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
>
> When a nodrop writer port fails to send data, it retries until the maximum
> number of retries is reached. Also added a new tx_bulk implementation for
> the ring writer port.
>
> Maciej Gajdzica (3):
> port: added WRITER_APPROACH == 1 implementation to ring port
> port: added ethdev_writer_nodrop port
> port: added ring_writer_nodrop port
>
> lib/librte_port/rte_port_ethdev.c | 230
> ++++++++++++++++++++++++++++++
> lib/librte_port/rte_port_ethdev.h | 19 +++
> lib/librte_port/rte_port_ring.c | 285
> +++++++++++++++++++++++++++++++++++++
> lib/librte_port/rte_port_ring.h | 16 +++
> 4 files changed, 550 insertions(+)
>
> --
> 1.7.9.5
Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>