* [dpdk-dev] [RFC PATCH DRAFT 1/2] ethdev: add buffered single pkt TX function to API
2014-06-24 22:32 [dpdk-dev] [RFC PATCH DRAFT 0/2] ethdev: Proposal to expand API for single-pkt-tx Bruce Richardson
@ 2014-06-24 22:32 ` Bruce Richardson
2014-06-24 23:05 ` Stephen Hemminger
2014-06-24 23:07 ` Stephen Hemminger
2014-06-24 22:32 ` [dpdk-dev] [RFC PATCH DRAFT 2/2] l2fwd: update l2fwd to use tx_buffer API Bruce Richardson
1 sibling, 2 replies; 7+ messages in thread
From: Bruce Richardson @ 2014-06-24 22:32 UTC (permalink / raw)
To: dev
Many sample apps include internal buffering for single-packet-at-a-time
operation. Since this is such a common paradigm, this functionality is
better suited to being inside the core ethdev API.
The new APIs include three functions:
* rte_eth_tx_buffer - buffer up a single packet for future transmission
* rte_eth_tx_buffer_flush - flush any unsent buffered packets
* rte_eth_tx_buffer_set_err_callback - set up a callback to be called in
case transmitting a buffered burst fails. By default, we just free the
unsent packets.
---
config/common_bsdapp | 1 +
config/common_linuxapp | 1 +
lib/librte_ether/rte_ethdev.c | 55 +++++++++++++++++++++++++++++++++++--
lib/librte_ether/rte_ethdev.h | 63 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 118 insertions(+), 2 deletions(-)
diff --git a/config/common_bsdapp b/config/common_bsdapp
index 989e1da..98da1f5 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -138,6 +138,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
CONFIG_RTE_MAX_ETHPORTS=32
CONFIG_RTE_LIBRTE_IEEE1588=n
CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
#
# Compile burst-oriented IGB & EM PMD drivers
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 5b896c3..0f509d0 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -161,6 +161,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
CONFIG_RTE_MAX_ETHPORTS=32
CONFIG_RTE_LIBRTE_IEEE1588=n
CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
#
# Support NIC bypass logic
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 7256841..68d4d22 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -397,11 +397,32 @@ rte_eth_dev_tx_queue_stop(uint8_t port_id, uint16_t tx_queue_id)
}
+static void
+free_unsent_pkts(struct rte_mbuf **pkts, uint16_t unsent,
+ void *userdata __rte_unused)
+{
+ unsigned i;
+ for (i = 0; i < unsent; i++)
+ rte_pktmbuf_free(pkts[i]);
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+ buffer_tx_error_fn cbfn, void *userdata)
+{
+ struct rte_eth_dev_data *dev_data = rte_eth_devices[port_id].data;
+ struct rte_eth_dev_tx_buffer *buf = dev_data->tx_queues[queue_id];
+
+ buf->userdata = userdata;
+ buf->flush_cb = cbfn;
+}
+
static int
rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
{
uint16_t old_nb_queues = dev->data->nb_tx_queues;
void **txq;
+ struct rte_eth_dev_tx_buffer *new_bufs;
unsigned i;
if (dev->data->tx_queues == NULL) { /* first time configuration */
@@ -412,24 +433,54 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
dev->data->nb_tx_queues = 0;
return -(ENOMEM);
}
+
+ dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",
+ sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+ if (dev->data->txq_bufs == NULL) {
+ dev->data->nb_tx_queues = 0;
+ rte_free(dev->data->tx_queues);
+ return -(ENOMEM);
+ }
+ for (i = 0; i < nb_queues; i++ )
+ dev->data->txq_bufs[i].flush_cb = free_unsent_pkts;
} else { /* re-configure */
+
+ /* flush the packets queued for all queues*/
+ for (i = 0; i < old_nb_queues; i++)
+ rte_eth_tx_buffer_flush(dev->data->port_id, i);
+
FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+ /* get new buffer space first, but keep old space around */
+ new_bufs = rte_zmalloc("ethdev->txq_bufs",
+ sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+ if (new_bufs == NULL)
+ return -(ENOMEM);
+
txq = dev->data->tx_queues;
for (i = nb_queues; i < old_nb_queues; i++)
(*dev->dev_ops->tx_queue_release)(txq[i]);
txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
CACHE_LINE_SIZE);
- if (txq == NULL)
+ if (txq == NULL) {
+ rte_free(new_bufs);
return -(ENOMEM);
+ }
- if (nb_queues > old_nb_queues)
+ if (nb_queues > old_nb_queues) {
memset(txq + old_nb_queues, 0,
sizeof(txq[0]) * (nb_queues - old_nb_queues));
+ for (i = old_nb_queues; i < nb_queues; i++)
+ dev->data->txq_bufs[i].flush_cb =
+ free_unsent_pkts;
+ }
dev->data->tx_queues = txq;
+ /* now replace old buffers with new */
+ rte_free(dev->data->txq_bufs);
+ dev->data->txq_bufs = new_bufs;
}
dev->data->nb_tx_queues = nb_queues;
return (0);
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 2406e45..0ec7076 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -176,6 +176,7 @@ extern "C" {
#include <rte_interrupts.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
+#include <rte_branch_prediction.h>
#include "rte_ether.h"
/**
@@ -1497,6 +1498,16 @@ struct rte_eth_dev_sriov {
};
#define RTE_ETH_DEV_SRIOV(dev) ((dev)->data->sriov)
+typedef void (*buffer_tx_error_fn)(struct rte_mbuf **unsent, uint16_t count,
+ void *userdata);
+
+struct rte_eth_dev_tx_buffer {
+ struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
+ unsigned nb_pkts;
+ buffer_tx_error_fn flush_cb; /* callback for when tx_burst fails */
+ void *userdata; /* userdata for callback */
+};
+
/**
* @internal
* The data part, with no function pointers, associated with each ethernet device.
@@ -1534,6 +1545,9 @@ struct rte_eth_dev_data {
scattered_rx : 1, /**< RX of scattered packets is ON(1) / OFF(0) */
all_multicast : 1, /**< RX all multicast mode ON(1) / OFF(0). */
dev_started : 1; /**< Device state: STARTED(1) / STOPPED(0). */
+
+ struct rte_eth_dev_tx_buffer
+ *txq_bufs; /**< space to allow buffered transmits */
};
/**
@@ -2426,6 +2440,55 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
}
#endif
+static inline uint16_t
+rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id, struct rte_mbuf *tx_pkt)
+{
+ struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+ struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+ qbuf->pkts[qbuf->nb_pkts++] = tx_pkt;
+ if (qbuf->nb_pkts < RTE_ETHDEV_TX_BUFSIZE)
+ return 0;
+
+ const uint16_t sent = (*dev->tx_pkt_burst)(
+ dev->data->tx_queues[queue_id], qbuf->pkts,
+ RTE_ETHDEV_TX_BUFSIZE);
+
+ qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+ * callback below */
+ if (unlikely(sent != RTE_ETHDEV_TX_BUFSIZE))
+ qbuf->flush_cb(&qbuf->pkts[sent], RTE_ETHDEV_TX_BUFSIZE - sent,
+ qbuf->userdata);
+
+ return sent;
+}
+
+static inline uint16_t
+rte_eth_tx_buffer_flush(uint8_t port_id, uint16_t queue_id)
+{
+ struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+ struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+ if (qbuf->nb_pkts != 0)
+ return 0;
+
+ const uint16_t to_send = qbuf->nb_pkts;
+ const uint16_t sent = (*dev->tx_pkt_burst)(
+ dev->data->tx_queues[queue_id], qbuf->pkts, to_send);
+
+ qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+ * callback below */
+ if (unlikely(sent != qbuf->nb_pkts))
+ qbuf->flush_cb(&qbuf->pkts[sent], to_send - sent,
+ qbuf->userdata);
+
+ return sent;
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+ buffer_tx_error_fn cbfn, void *userdata);
+
/**
* Setup a new signature filter rule on an Ethernet device
*
--
1.9.3
^ permalink raw reply [flat|nested] 7+ messages in thread
* [dpdk-dev] [RFC PATCH DRAFT 2/2] l2fwd: update l2fwd to use tx_buffer API
2014-06-24 22:32 [dpdk-dev] [RFC PATCH DRAFT 0/2] ethdev: Proposal to expand API for single-pkt-tx Bruce Richardson
2014-06-24 22:32 ` [dpdk-dev] [RFC PATCH DRAFT 1/2] ethdev: add buffered single pkt TX function to API Bruce Richardson
@ 2014-06-24 22:32 ` Bruce Richardson
1 sibling, 0 replies; 7+ messages in thread
From: Bruce Richardson @ 2014-06-24 22:32 UTC (permalink / raw)
To: dev
The internal buffering of packets for TX done in l2fwd is no longer
needed, so replace this code with calls to the new rte_eth_tx_buffer*
APIs.
---
examples/l2fwd/main.c | 61 ++++++---------------------------------------------
1 file changed, 7 insertions(+), 54 deletions(-)
diff --git a/examples/l2fwd/main.c b/examples/l2fwd/main.c
index 4069d7c..5ca5709 100644
--- a/examples/l2fwd/main.c
+++ b/examples/l2fwd/main.c
@@ -231,52 +231,6 @@ print_stats(void)
printf("\n====================================================\n");
}
-/* Send the burst of packets on an output interface */
-static int
-l2fwd_send_burst(struct lcore_queue_conf *qconf, unsigned n, uint8_t port)
-{
- struct rte_mbuf **m_table;
- unsigned ret;
- unsigned queueid =0;
-
- m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
-
- ret = rte_eth_tx_burst(port, (uint16_t) queueid, m_table, (uint16_t) n);
- port_statistics[port].tx += ret;
- if (unlikely(ret < n)) {
- port_statistics[port].dropped += (n - ret);
- do {
- rte_pktmbuf_free(m_table[ret]);
- } while (++ret < n);
- }
-
- return 0;
-}
-
-/* Enqueue packets for TX and prepare them to be sent */
-static int
-l2fwd_send_packet(struct rte_mbuf *m, uint8_t port)
-{
- unsigned lcore_id, len;
- struct lcore_queue_conf *qconf;
-
- lcore_id = rte_lcore_id();
-
- qconf = &lcore_queue_conf[lcore_id];
- len = qconf->tx_mbufs[port].len;
- qconf->tx_mbufs[port].m_table[len] = m;
- len++;
-
- /* enough pkts to be sent */
- if (unlikely(len == MAX_PKT_BURST)) {
- l2fwd_send_burst(qconf, MAX_PKT_BURST, port);
- len = 0;
- }
-
- qconf->tx_mbufs[port].len = len;
- return 0;
-}
-
static void
l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
{
@@ -294,7 +248,7 @@ l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
/* src addr */
ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], ð->s_addr);
- l2fwd_send_packet(m, (uint8_t) dst_port);
+ port_statistics[dst_port].tx += rte_eth_tx_buffer(dst_port, 0, m);
}
/* main processing loop */
@@ -339,13 +293,12 @@ l2fwd_main_loop(void)
diff_tsc = cur_tsc - prev_tsc;
if (unlikely(diff_tsc > drain_tsc)) {
- for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
- if (qconf->tx_mbufs[portid].len == 0)
- continue;
- l2fwd_send_burst(&lcore_queue_conf[lcore_id],
- qconf->tx_mbufs[portid].len,
- (uint8_t) portid);
- qconf->tx_mbufs[portid].len = 0;
+ for (i = 0; i < qconf->n_rx_port; i++) {
+
+ portid = qconf->rx_port_list[i];
+ portid = l2fwd_dst_ports[portid];
+ port_statistics[portid].tx +=
+ rte_eth_tx_buffer_flush(portid, 0);
}
/* if timer is enabled */
--
1.9.3
^ permalink raw reply [flat|nested] 7+ messages in thread