From: Konstantin Ananyev <konstantin.ananyev@intel.com>
To: dev@dpdk.org
To: dev@dpdk.org
Cc: Konstantin Ananyev <konstantin.ananyev@intel.com>
Subject: [dpdk-dev] [RFC PATCH 1/3] ethdev: introduce eth_queue_local
Date: Fri, 1 Dec 2017 14:48:00 +0000 [thread overview]
Message-ID: <1512139682-11114-2-git-send-email-konstantin.ananyev@intel.com> (raw)
In-Reply-To: <1512139682-11114-1-git-send-email-konstantin.ananyev@intel.com>
Introduce a separate data structure (rte_eth_queue_local)
to store information local to the given process (i.e. non-shareable)
for each configured rx/tx queue.
Memory for that structure is allocated/freed dynamically during
rte_eth_dev_configure().
Put placeholder pointers for queue specific (rx|tx)_pkt_burst(),
tx_pkt_prepare() functions inside that structure.
Move rx/tx callback related information inside that structure.
That introduces a change in current behavior: all callbacks for
un-configured queues will be automatically removed.
Let's say: rte_eth_dev_configure(port, 0, 0, ...);
would wipe out all installed callbacks for that port.
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
lib/librte_ether/rte_ethdev.c | 228 +++++++++++++++++++++++++++++-------------
lib/librte_ether/rte_ethdev.h | 85 +++++++++++-----
2 files changed, 216 insertions(+), 97 deletions(-)
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 318af2869..ff8571d60 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -241,6 +241,14 @@ rte_eth_dev_allocate(const char *name)
return eth_dev;
}
+static struct rte_eth_queue_local *
+alloc_queue_local(uint16_t nb_queues)
+{
+ struct rte_eth_queue_local *ql;
+ ql = rte_zmalloc(NULL, sizeof(ql[0]) * nb_queues, RTE_CACHE_LINE_SIZE);
+ return ql;
+}
+
/*
* Attach to a port already registered by the primary process, which
* makes sure that the same device would have the same port id both
@@ -269,6 +277,16 @@ rte_eth_dev_attach_secondary(const char *name)
eth_dev = eth_dev_get(i);
RTE_ASSERT(eth_dev->data->port_id == i);
+ /*
+ * obviously it is quite error prone:
+ * if the primary process will decide to (re)configure device later,
+ * there is no way for secondary one to notice that and update
+ * its local data (queue_local, rx|tx_pkt_burst, etc.).
+ * We probably need an eth_dev_configure_secondary() or so.
+ */
+ eth_dev->rx_ql = alloc_queue_local(eth_dev->data->nb_rx_queues);
+ eth_dev->tx_ql = alloc_queue_local(eth_dev->data->nb_tx_queues);
+
return eth_dev;
}
@@ -444,52 +462,92 @@ rte_eth_dev_detach(uint16_t port_id, char *name)
return ret;
}
+/*
+ * Helper routine - removes all registered RX/TX queue callbacks
+ * from the FIFO list.
+ * It is the caller's responsibility to make sure no actual RX/TX happens
+ * simultaneously on that queue.
+ */
+static void
+free_rxtx_cbs(struct rte_eth_rxtx_callback *cbs)
+{
+ struct rte_eth_rxtx_callback *cb, *next;
+
+ for (cb = cbs; cb != NULL; cb = next) {
+ next = cb->next;
+ rte_free(cb);
+ }
+}
+
+static void
+reset_queue_local(struct rte_eth_queue_local *ql)
+{
+ free_rxtx_cbs(ql->cbs);
+ memset(ql, 0, sizeof(*ql));
+}
+
static int
rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
{
- uint16_t old_nb_queues = dev->data->nb_rx_queues;
+ uint32_t diff, i, old_nb_queues;
+ struct rte_eth_queue_local *rlq;
void **rxq;
- unsigned i;
- if (dev->data->rx_queues == NULL && nb_queues != 0) { /* first time configuration */
- dev->data->rx_queues = rte_zmalloc("ethdev->rx_queues",
- sizeof(dev->data->rx_queues[0]) * nb_queues,
- RTE_CACHE_LINE_SIZE);
- if (dev->data->rx_queues == NULL) {
- dev->data->nb_rx_queues = 0;
- return -(ENOMEM);
- }
- } else if (dev->data->rx_queues != NULL && nb_queues != 0) { /* re-configure */
- RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
+ old_nb_queues = dev->data->nb_rx_queues;
+
+ /* same # of queues, nothing needs to be done */
+ if (nb_queues == old_nb_queues)
+ return 0;
+
+ rxq = dev->data->rx_queues;
+ rlq = dev->rx_ql;
- rxq = dev->data->rx_queues;
+ /* reduce # of queues */
+ if (old_nb_queues > nb_queues) {
- for (i = nb_queues; i < old_nb_queues; i++)
+ RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
+ -ENOTSUP);
+
+ dev->data->nb_rx_queues = nb_queues;
+
+ /* free resources used by the queues we are going to remove */
+ for (i = nb_queues; i < old_nb_queues; i++) {
+ reset_queue_local(rlq + i);
(*dev->dev_ops->rx_queue_release)(rxq[i]);
- rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
- RTE_CACHE_LINE_SIZE);
- if (rxq == NULL)
- return -(ENOMEM);
- if (nb_queues > old_nb_queues) {
- uint16_t new_qs = nb_queues - old_nb_queues;
-
- memset(rxq + old_nb_queues, 0,
- sizeof(rxq[0]) * new_qs);
+ rxq[i] = 0;
}
- dev->data->rx_queues = rxq;
+ if (nb_queues == 0) {
+ dev->data->rx_queues = NULL;
+ dev->rx_ql = NULL;
+ rte_free(rxq);
+ rte_free(rlq);
+ }
+
+ return 0;
+ }
- } else if (dev->data->rx_queues != NULL && nb_queues == 0) {
- RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
+ /* increase # of queues */
- rxq = dev->data->rx_queues;
+ diff = nb_queues - old_nb_queues;
- for (i = nb_queues; i < old_nb_queues; i++)
- (*dev->dev_ops->rx_queue_release)(rxq[i]);
+ rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
+ RTE_CACHE_LINE_SIZE);
+ rlq = rte_realloc(rlq, sizeof(rlq[0]) * nb_queues,
+ RTE_CACHE_LINE_SIZE);
- rte_free(dev->data->rx_queues);
- dev->data->rx_queues = NULL;
+ if (rxq != NULL) {
+ memset(rxq + old_nb_queues, 0, sizeof(rxq[0]) * diff);
+ dev->data->rx_queues = rxq;
+ }
+ if (rlq != NULL) {
+ memset(rlq + old_nb_queues, 0, sizeof(rlq[0]) * diff);
+ dev->rx_ql = rlq;
}
+
+ if (rxq == NULL || rlq == NULL)
+ return -ENOMEM;
+
dev->data->nb_rx_queues = nb_queues;
return 0;
}
@@ -601,49 +659,65 @@ rte_eth_dev_tx_queue_stop(uint16_t port_id, uint16_t tx_queue_id)
static int
rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
{
- uint16_t old_nb_queues = dev->data->nb_tx_queues;
+ uint32_t diff, i, old_nb_queues;
+ struct rte_eth_queue_local *tlq;
void **txq;
- unsigned i;
- if (dev->data->tx_queues == NULL && nb_queues != 0) { /* first time configuration */
- dev->data->tx_queues = rte_zmalloc("ethdev->tx_queues",
- sizeof(dev->data->tx_queues[0]) * nb_queues,
- RTE_CACHE_LINE_SIZE);
- if (dev->data->tx_queues == NULL) {
- dev->data->nb_tx_queues = 0;
- return -(ENOMEM);
- }
- } else if (dev->data->tx_queues != NULL && nb_queues != 0) { /* re-configure */
- RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+ old_nb_queues = dev->data->nb_tx_queues;
+
+ /* same # of queues, nothing needs to be done */
+ if (nb_queues == old_nb_queues)
+ return 0;
+
+ txq = dev->data->tx_queues;
+ tlq = dev->tx_ql;
+
+ /* reduce # of queues */
+ if (old_nb_queues > nb_queues) {
+
+ RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release,
+ -ENOTSUP);
- txq = dev->data->tx_queues;
+ dev->data->nb_tx_queues = nb_queues;
- for (i = nb_queues; i < old_nb_queues; i++)
+ /* free resources used by the queues we are going to remove */
+ for (i = nb_queues; i < old_nb_queues; i++) {
+ reset_queue_local(tlq + i);
(*dev->dev_ops->tx_queue_release)(txq[i]);
- txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
- RTE_CACHE_LINE_SIZE);
- if (txq == NULL)
- return -ENOMEM;
- if (nb_queues > old_nb_queues) {
- uint16_t new_qs = nb_queues - old_nb_queues;
-
- memset(txq + old_nb_queues, 0,
- sizeof(txq[0]) * new_qs);
+ txq[i] = 0;
}
- dev->data->tx_queues = txq;
+ if (nb_queues == 0) {
+ dev->data->tx_queues = NULL;
+ dev->tx_ql = NULL;
+ rte_free(txq);
+ rte_free(tlq);
+ }
+
+ return 0;
+ }
- } else if (dev->data->tx_queues != NULL && nb_queues == 0) {
- RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+ /* increase # of queues */
- txq = dev->data->tx_queues;
+ diff = nb_queues - old_nb_queues;
- for (i = nb_queues; i < old_nb_queues; i++)
- (*dev->dev_ops->tx_queue_release)(txq[i]);
+ txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
+ RTE_CACHE_LINE_SIZE);
+ tlq = rte_realloc(tlq, sizeof(tlq[0]) * nb_queues,
+ RTE_CACHE_LINE_SIZE);
- rte_free(dev->data->tx_queues);
- dev->data->tx_queues = NULL;
+ if (txq != NULL) {
+ memset(txq + old_nb_queues, 0, sizeof(txq[0]) * diff);
+ dev->data->tx_queues = txq;
+ }
+ if (tlq != NULL) {
+ memset(tlq + old_nb_queues, 0, sizeof(tlq[0]) * diff);
+ dev->tx_ql = tlq;
}
+
+ if (txq == NULL || tlq == NULL)
+ return -ENOMEM;
+
dev->data->nb_tx_queues = nb_queues;
return 0;
}
@@ -1084,6 +1158,7 @@ void
rte_eth_dev_close(uint16_t port_id)
{
struct rte_eth_dev *dev;
+ uint32_t i;
RTE_ETH_VALID_PORTID_OR_RET(port_id);
dev = &rte_eth_devices[port_id];
@@ -1092,12 +1167,25 @@ rte_eth_dev_close(uint16_t port_id)
dev->data->dev_started = 0;
(*dev->dev_ops->dev_close)(dev);
+ /* free local resources for RX/TX queues */
+
+ for (i = 0; i != dev->data->nb_rx_queues; i++)
+ reset_queue_local(dev->rx_ql + i);
+
+ for (i = 0; i != dev->data->nb_tx_queues; i++)
+ reset_queue_local(dev->tx_ql + i);
+
dev->data->nb_rx_queues = 0;
rte_free(dev->data->rx_queues);
dev->data->rx_queues = NULL;
+ rte_free(dev->rx_ql);
+ dev->rx_ql = NULL;
+
dev->data->nb_tx_queues = 0;
rte_free(dev->data->tx_queues);
dev->data->tx_queues = NULL;
+ rte_free(dev->tx_ql);
+ dev->tx_ql = NULL;
}
int
@@ -3111,10 +3199,10 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
rte_spinlock_lock(&rte_eth_rx_cb_lock);
/* Add the callbacks in fifo order. */
struct rte_eth_rxtx_callback *tail =
- rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+ rte_eth_devices[port_id].rx_ql[queue_id].cbs;
if (!tail) {
- rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+ rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
} else {
while (tail->next)
@@ -3153,9 +3241,9 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
rte_spinlock_lock(&rte_eth_rx_cb_lock);
/* Add the callbacks at fisrt position*/
- cb->next = rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+ cb->next = rte_eth_devices[port_id].rx_ql[queue_id].cbs;
rte_smp_wmb();
- rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+ rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
rte_spinlock_unlock(&rte_eth_rx_cb_lock);
return cb;
@@ -3189,10 +3277,10 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
rte_spinlock_lock(&rte_eth_tx_cb_lock);
/* Add the callbacks in fifo order. */
struct rte_eth_rxtx_callback *tail =
- rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
+ rte_eth_devices[port_id].tx_ql[queue_id].cbs;
if (!tail) {
- rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id] = cb;
+ rte_eth_devices[port_id].tx_ql[queue_id].cbs = cb;
} else {
while (tail->next)
@@ -3223,7 +3311,7 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
int ret = -EINVAL;
rte_spinlock_lock(&rte_eth_rx_cb_lock);
- prev_cb = &dev->post_rx_burst_cbs[queue_id];
+ prev_cb = &dev->rx_ql[queue_id].cbs;
for (; *prev_cb != NULL; prev_cb = &cb->next) {
cb = *prev_cb;
if (cb == user_cb) {
@@ -3257,7 +3345,7 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t queue_id,
struct rte_eth_rxtx_callback **prev_cb;
rte_spinlock_lock(&rte_eth_tx_cb_lock);
- prev_cb = &dev->pre_tx_burst_cbs[queue_id];
+ prev_cb = &dev->tx_ql[queue_id].cbs;
for (; *prev_cb != NULL; prev_cb = &cb->next) {
cb = *prev_cb;
if (cb == user_cb) {
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 341c2d624..d62e1bcc3 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1694,6 +1694,10 @@ typedef uint16_t (*rte_tx_callback_fn)(uint16_t port, uint16_t queue,
* @internal
* Structure used to hold information about the callbacks to be called for a
* queue on RX and TX.
+ * On RX: user-supplied functions called from rte_eth_rx_burst to post-process
+ * received packets before passing them to the user
+ * On TX: user-supplied functions called from rte_eth_tx_burst to pre-process
+ * outgoing packets before passing them to the driver for transmission.
*/
struct rte_eth_rxtx_callback {
struct rte_eth_rxtx_callback *next;
@@ -1715,6 +1719,24 @@ enum rte_eth_dev_state {
/**
* @internal
+ * Structure to hold process non-shareable information
+ * about RX/TX queue of the ethernet device.
+ */
+struct rte_eth_queue_local {
+ /**
+ * placeholder for queue specific rx/tx and tx_prepare
+ * functions pointers
+ */
+ eth_rx_burst_t rx_pkt_burst; /**< receive function pointer. */
+ eth_tx_burst_t tx_pkt_burst; /**< transmit function pointer. */
+ eth_tx_prep_t tx_pkt_prepare; /**< transmit prepare function pointer. */
+
+ struct rte_eth_rxtx_callback *cbs;
+ /**< list of user supplied callbacks */
+} __rte_cache_aligned;
+
+/**
+ * @internal
* The generic data structure associated with each ethernet device.
*
* Pointers to burst-oriented packet receive and transmit functions are
@@ -1730,19 +1752,13 @@ struct rte_eth_dev {
struct rte_eth_dev_data *data; /**< Pointer to device data */
const struct eth_dev_ops *dev_ops; /**< Functions exported by PMD */
struct rte_device *device; /**< Backing device */
+
+ struct rte_eth_queue_local *rx_ql; /**< RX queues local data */
+ struct rte_eth_queue_local *tx_ql; /**< TX queues local data */
+
struct rte_intr_handle *intr_handle; /**< Device interrupt handle */
/** User application callbacks for NIC interrupts */
struct rte_eth_dev_cb_list link_intr_cbs;
- /**
- * User-supplied functions called from rx_burst to post-process
- * received packets before passing them to the user
- */
- struct rte_eth_rxtx_callback *post_rx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
- /**
- * User-supplied functions called from tx_burst to pre-process
- * received packets before passing them to the driver for transmission.
- */
- struct rte_eth_rxtx_callback *pre_tx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
enum rte_eth_dev_state state; /**< Flag indicating the port state */
void *security_ctx; /**< Context for security ops */
} __rte_cache_aligned;
@@ -2883,29 +2899,37 @@ static inline uint16_t
rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
{
+ uint16_t nb_rx;
struct rte_eth_dev *dev = &rte_eth_devices[port_id];
#ifdef RTE_LIBRTE_ETHDEV_DEBUG
RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
- if (queue_id >= dev->data->nb_rx_queues) {
+ if (queue_id >= dev->data->nb_rx_queues || dev->rx_ql == NULL) {
RTE_PMD_DEBUG_TRACE("Invalid RX queue_id=%d\n", queue_id);
return 0;
}
#endif
- int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
+ nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
rx_pkts, nb_pkts);
#ifdef RTE_ETHDEV_RXTX_CALLBACKS
- struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
+ {
+ struct rte_eth_queue_local *ql;
+ struct rte_eth_rxtx_callback *cb;
- if (unlikely(cb != NULL)) {
- do {
- nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
+ ql = dev->rx_ql + queue_id;
+ cb = ql->cbs;
+
+ if (unlikely(cb != NULL)) {
+ do {
+ nb_rx = cb->fn.rx(port_id, queue_id,
+ rx_pkts, nb_rx,
nb_pkts, cb->param);
- cb = cb->next;
- } while (cb != NULL);
+ cb = cb->next;
+ } while (cb != NULL);
+ }
}
#endif
@@ -3151,21 +3175,28 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
- if (queue_id >= dev->data->nb_tx_queues) {
+ if (queue_id >= dev->data->nb_tx_queues || dev->tx_ql == NULL) {
RTE_PMD_DEBUG_TRACE("Invalid TX queue_id=%d\n", queue_id);
return 0;
}
#endif
#ifdef RTE_ETHDEV_RXTX_CALLBACKS
- struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
-
- if (unlikely(cb != NULL)) {
- do {
- nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
- cb->param);
- cb = cb->next;
- } while (cb != NULL);
+ {
+ struct rte_eth_queue_local *ql;
+ struct rte_eth_rxtx_callback *cb;
+
+ ql = dev->tx_ql + queue_id;
+ cb = ql->cbs;
+
+ if (unlikely(cb != NULL)) {
+ do {
+ nb_pkts = cb->fn.tx(port_id, queue_id,
+ tx_pkts, nb_pkts,
+ cb->param);
+ cb = cb->next;
+ } while (cb != NULL);
+ }
}
#endif
--
2.13.5
next prev parent reply other threads:[~2017-12-01 14:48 UTC|newest]
Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top
2017-12-01 14:47 [dpdk-dev] [RFC PATCH 0/3] *** SUBJECT HERE *** Konstantin Ananyev
2017-12-01 14:48 ` Konstantin Ananyev [this message]
2017-12-01 14:48 ` [dpdk-dev] [RFC PATCH 2/3] ethdev: make it safe to remove rx/tx callback at runtime Konstantin Ananyev
2017-12-01 14:48 ` [dpdk-dev] [RFC PATCH 3/3] doc: ethdev ABI change deprecation notice Konstantin Ananyev
2017-12-12 14:55 ` Kovacevic, Marko
2017-12-01 14:56 ` [dpdk-dev] [RFC PATCH 0/3] *** SUBJECT HERE *** Ananyev, Konstantin
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1512139682-11114-2-git-send-email-konstantin.ananyev@intel.com \
--to=konstantin.ananyev@intel.com \
--cc=dev@dpdk.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).