From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga05.intel.com (mga05.intel.com [192.55.52.43]) by dpdk.org (Postfix) with ESMTP id 2CB201D004 for ; Fri, 8 Jun 2018 20:15:02 +0200 (CEST) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga002.jf.intel.com ([10.7.209.21]) by fmsmga105.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 08 Jun 2018 11:15:02 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.49,490,1520924400"; d="scan'208";a="65468575" Received: from unknown (HELO localhost.localdomain.localdomain) ([10.224.122.193]) by orsmga002.jf.intel.com with ESMTP; 08 Jun 2018 11:15:00 -0700 From: Nikhil Rao To: jerin.jacob@caviumnetworks.com Cc: dev@dpdk.org, Nikhil Rao Date: Fri, 8 Jun 2018 23:45:18 +0530 Message-Id: <1528481718-7241-6-git-send-email-nikhil.rao@intel.com> X-Mailer: git-send-email 1.8.3.1 In-Reply-To: <1528481718-7241-1-git-send-email-nikhil.rao@intel.com> References: <1528481718-7241-1-git-send-email-nikhil.rao@intel.com> Subject: [dpdk-dev] [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 08 Jun 2018 18:15:04 -0000 Add support for interrupt driven queues when eth device is configured for rxq interrupts and servicing weight for the queue is configured to be zero. A interrupt driven packet received counter has been added to rte_event_eth_rx_adapter_stats. Signed-off-by: Nikhil Rao --- lib/librte_eventdev/rte_event_eth_rx_adapter.h | 5 +- lib/librte_eventdev/rte_event_eth_rx_adapter.c | 1049 +++++++++++++++++++- test/test/test_event_eth_rx_adapter.c | 261 ++++- .../prog_guide/event_ethernet_rx_adapter.rst | 24 + config/common_base | 1 + lib/librte_eventdev/Makefile | 4 +- 6 files changed, 1296 insertions(+), 48 deletions(-) diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h index 307b2b5..97f25e9 100644 --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h @@ -64,8 +64,7 @@ * the service function ID of the adapter in this case. * * Note: - * 1) Interrupt driven receive queues are currently unimplemented. - * 2) Devices created after an instance of rte_event_eth_rx_adapter_create + * 1) Devices created after an instance of rte_event_eth_rx_adapter_create * should be added to a new instance of the rx adapter. */ @@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats { * block cycles can be used to compute the percentage of * cycles the service is blocked by the event device. */ + uint64_t rx_intr_packets; + /**< Received packet count for interrupt mode Rx queues */ }; /** diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c index 40e9bc9..d038ee4 100644 --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c @@ -2,6 +2,8 @@ * Copyright(c) 2017 Intel Corporation. * All rights reserved. */ +#include +#include #include #include #include @@ -11,6 +13,7 @@ #include #include #include +#include #include "rte_eventdev.h" #include "rte_eventdev_pmd.h" @@ -24,6 +27,36 @@ #define ETH_RX_ADAPTER_MEM_NAME_LEN 32 #define RSS_KEY_SIZE 40 +/* value written to intr thread pipe to signal thread exit */ +#define ETH_BRIDGE_INTR_THREAD_EXIT 1 +/* Sentinel value to detect initialized file handle */ +#define INIT_FD -1 + +/* + * Used to communicate exit notification to interrupt thread + */ +union intr_pipefds { + RTE_STD_C11 + struct { + int pipefd[2]; + }; + struct { + int readfd; + int writefd; + }; +}; + +/* + * Used to store port and queue ID of interrupting Rx queue + */ +union queue_data { + RTE_STD_C11 + void *ptr; + struct { + uint16_t port; + uint16_t queue; + }; +}; /* * There is an instance of this struct per polled Rx queue added to the @@ -75,6 +108,34 @@ struct rte_event_eth_rx_adapter { uint16_t enq_block_count; /* Block start ts */ uint64_t rx_enq_block_start_ts; + /* epoll fd used to wait for Rx interrupts */ + int epd; + /* Num of interrupt driven interrupt queues */ + uint32_t num_rx_intr; + /* Used to send of interrupting Rx queues from + * the interrupt thread to the Rx thread + */ + struct rte_ring *intr_ring; + /* Rx Queue data (dev id, queue id) for the last non-empty + * queue polled + */ + union queue_data qd; + /* queue_data is valid */ + int qd_valid; + /* Interrupt ring lock, synchronizes Rx thread + * and interrupt thread + */ + rte_spinlock_t intr_ring_lock; + /* event array passed to rte_poll_wait */ + struct rte_epoll_event *epoll_events; + /* Count of interrupt vectors in use */ + uint32_t num_intr_vec; + /* fd used to send intr thread an exit notification */ + union intr_pipefds intr_pipe; + /* Event used in exit notification for intr thread */ + struct rte_epoll_event exit_ev; + /* Thread blocked on Rx interrupts */ + pthread_t rx_intr_thread; /* Configuration callback for rte_service configuration */ rte_event_eth_rx_adapter_conf_cb conf_cb; /* Configuration callback argument */ @@ -111,19 +172,40 @@ struct eth_device_info { uint8_t dev_rx_started; /* Number of queues added for this device */ uint16_t nb_dev_queues; - /* If nb_rx_poll > 0, the start callback will + /* Number of poll based queues + * If nb_rx_poll > 0, the start callback will * be invoked if not already invoked */ uint16_t nb_rx_poll; + /* Number of interrupt based queues + * If nb_rx_intr > 0, the start callback will + * be invoked if not already invoked. + */ + uint16_t nb_rx_intr; + /* Number of queues that use the shared interrupt */ + uint16_t nb_shared_intr; /* sum(wrr(q)) for all queues within the device * useful when deleting all device queues */ uint32_t wrr_len; + /* Intr based queue index to start polling from, this is used + * if the number of shared interrupts is non-zero + */ + uint16_t next_q_idx; + /* Intr based queue indices */ + uint16_t *intr_queue; + /* device generates per Rx queue interrupt for queue index + * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1 + */ + int multi_intr_cap; + /* shared interrupt enabled */ + int shared_intr_enabled; }; /* Per Rx queue */ struct eth_rx_queue_info { int queue_enabled; /* True if added */ + int intr_enabled; uint16_t wt; /* Polling weight */ uint8_t event_queue_id; /* Event queue to enqueue packets to */ uint8_t sched_type; /* Sched type for events */ @@ -150,7 +232,7 @@ struct eth_rx_queue_info { static inline int rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter) { - return rx_adapter->num_rx_polled; + return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr; } /* Greatest common divisor */ @@ -195,6 +277,28 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) } static inline int +rxa_shared_intr(struct eth_device_info *dev_info, + int rx_queue_id) +{ + int multi_intr_cap = + rte_intr_cap_multiple(dev_info->dev->intr_handle); + return !multi_intr_cap || + rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1; +} + +static inline int +rxa_intr_queue(struct eth_device_info *dev_info, + int rx_queue_id) +{ + struct eth_rx_queue_info *queue_info; + + queue_info = &dev_info->rx_queue[rx_queue_id]; + return dev_info->rx_queue && + !dev_info->internal_event_port && + queue_info->queue_enabled && queue_info->wt == 0; +} + +static inline int rxa_polled_queue(struct eth_device_info *dev_info, int rx_queue_id) { @@ -206,6 +310,95 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) queue_info->queue_enabled && queue_info->wt != 0; } +/* Calculate change in number of vectors after Rx queue ID is add/deleted */ +static int +rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add) +{ + uint16_t i; + int n, s; + uint16_t nbq; + + nbq = dev_info->dev->data->nb_rx_queues; + n = 0; /* non shared count */ + s = 0; /* shared count */ + + if (rx_queue_id == -1) { + for (i = 0; i < nbq; i++) { + if (!rxa_shared_intr(dev_info, i)) + n += add ? !rxa_intr_queue(dev_info, i) : + rxa_intr_queue(dev_info, i); + else + s += add ? !rxa_intr_queue(dev_info, i) : + rxa_intr_queue(dev_info, i); + } + + if (s > 0) { + if ((add && dev_info->nb_shared_intr == 0) || + (!add && dev_info->nb_shared_intr)) + n += 1; + } + } else { + if (!rxa_shared_intr(dev_info, rx_queue_id)) + n = add ? !rxa_intr_queue(dev_info, rx_queue_id) : + rxa_intr_queue(dev_info, rx_queue_id); + else + n = add ? !dev_info->nb_shared_intr : + dev_info->nb_shared_intr == 1; + } + + return add ? n : -n; +} + +/* Calculate nb_rx_intr after deleting interrupt mode rx queues + */ +static void +rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_intr) +{ + uint32_t intr_diff; + + if (rx_queue_id == -1) + intr_diff = dev_info->nb_rx_intr; + else + intr_diff = rxa_intr_queue(dev_info, rx_queue_id); + + *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff; +} + +/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added + * interrupt queues could currently be poll mode Rx queues + */ +static void +rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, + uint32_t *nb_wrr) +{ + uint32_t intr_diff; + uint32_t poll_diff; + uint32_t wrr_len_diff; + + if (rx_queue_id == -1) { + intr_diff = dev_info->dev->data->nb_rx_queues - + dev_info->nb_rx_intr; + poll_diff = dev_info->nb_rx_poll; + wrr_len_diff = dev_info->wrr_len; + } else { + intr_diff = !rxa_intr_queue(dev_info, rx_queue_id); + poll_diff = rxa_polled_queue(dev_info, rx_queue_id); + wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt : + 0; + } + + *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff; + *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff; + *nb_wrr = rx_adapter->wrr_len - wrr_len_diff; +} + /* Calculate size of the eth_rx_poll and wrr_sched arrays * after deleting poll mode rx queues */ @@ -240,17 +433,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) int rx_queue_id, uint16_t wt, uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, uint32_t *nb_wrr) { + uint32_t intr_diff; uint32_t poll_diff; uint32_t wrr_len_diff; if (rx_queue_id == -1) { + intr_diff = dev_info->nb_rx_intr; poll_diff = dev_info->dev->data->nb_rx_queues - dev_info->nb_rx_poll; wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues - dev_info->wrr_len; } else { + intr_diff = rxa_intr_queue(dev_info, rx_queue_id); poll_diff = !rxa_polled_queue(dev_info, rx_queue_id); wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ? wt - dev_info->rx_queue[rx_queue_id].wt : @@ -258,6 +455,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) } *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff; + *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff; *nb_wrr = rx_adapter->wrr_len + wrr_len_diff; } @@ -268,10 +466,15 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) int rx_queue_id, uint16_t wt, uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, uint32_t *nb_wrr) { - rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id, - wt, nb_rx_poll, nb_wrr); + if (wt != 0) + rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id, + wt, nb_rx_poll, nb_rx_intr, nb_wrr); + else + rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id, + nb_rx_poll, nb_rx_intr, nb_wrr); } /* Calculate nb_rx_* after deleting rx_queue_id */ @@ -280,10 +483,13 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) struct eth_device_info *dev_info, int rx_queue_id, uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, uint32_t *nb_wrr) { rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll, nb_wrr); + rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id, + nb_rx_intr); } /* @@ -622,7 +828,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) uint16_t port_id, uint16_t queue_id, uint32_t rx_count, - uint32_t max_rx) + uint32_t max_rx, + int *rxq_empty) { struct rte_mbuf *mbufs[BATCH_SIZE]; struct rte_eth_event_enqueue_buffer *buf = @@ -632,6 +839,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) uint16_t n; uint32_t nb_rx = 0; + if (rxq_empty) + *rxq_empty = 0; /* Don't do a batch dequeue from the rx queue if there isn't * enough space in the enqueue buffer. */ @@ -641,8 +850,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) stats->rx_poll_count++; n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE); - if (unlikely(!n)) + if (unlikely(!n)) { + if (rxq_empty) + *rxq_empty = 1; break; + } rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n); nb_rx += n; if (rx_count + nb_rx > max_rx) @@ -655,6 +867,237 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) return nb_rx; } +static inline void +rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter, + void *data) +{ + uint16_t port_id; + uint16_t queue; + int err; + union queue_data qd; + struct eth_device_info *dev_info; + struct eth_rx_queue_info *queue_info; + int *intr_enabled; + + qd.ptr = data; + port_id = qd.port; + queue = qd.queue; + + dev_info = &rx_adapter->eth_devices[port_id]; + queue_info = &dev_info->rx_queue[queue]; + rte_spinlock_lock(&rx_adapter->intr_ring_lock); + if (rxa_shared_intr(dev_info, queue)) + intr_enabled = &dev_info->shared_intr_enabled; + else + intr_enabled = &queue_info->intr_enabled; + + if (*intr_enabled) { + *intr_enabled = 0; + err = rte_ring_enqueue(rx_adapter->intr_ring, data); + /* Entry should always be available. + * The ring size equals the maximum number of interrupt + * vectors supported (an interrupt vector is shared in + * case of shared interrupts) + */ + if (err) + RTE_EDEV_LOG_ERR("Failed to enqueue interrupt" + " to ring: %s", strerror(err)); + else + rte_eth_dev_rx_intr_disable(port_id, queue); + } + rte_spinlock_unlock(&rx_adapter->intr_ring_lock); +} + +static int +rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter, + uint32_t num_intr_vec) +{ + if (rx_adapter->num_intr_vec + num_intr_vec > + RTE_EVENT_ETH_INTR_RING_SIZE) { + RTE_EDEV_LOG_ERR("Exceeded intr ring slots current" + " %d needed %d limit %d", rx_adapter->num_intr_vec, + num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE); + return -ENOSPC; + } + + return 0; +} + +/* Delete entries for (dev, queue) from the interrupt ring */ +static void +rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + int i, n; + union queue_data qd; + + rte_spinlock_lock(&rx_adapter->intr_ring_lock); + + n = rte_ring_count(rx_adapter->intr_ring); + for (i = 0; i < n; i++) { + rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr); + if (!rxa_shared_intr(dev_info, rx_queue_id)) { + if (qd.port == dev_info->dev->data->port_id && + qd.queue == rx_queue_id) + continue; + } else { + if (qd.port == dev_info->dev->data->port_id) + continue; + } + rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr); + } + + rte_spinlock_unlock(&rx_adapter->intr_ring_lock); +} + +/* pthread callback handling interrupt mode receive queues + * After receiving an Rx interrupt, it enqueues the port id and queue id of the + * interrupting queue to the adapter's ring buffer for interrupt events. + * These events are picked up by rxa_intr_ring_dequeue() which is invoked from + * the adapter service function. + */ +static void * +rxa_intr_thread(void *arg) +{ + struct rte_event_eth_rx_adapter *rx_adapter = arg; + struct rte_epoll_event *epoll_events = rx_adapter->epoll_events; + int n, i; + uint8_t val; + ssize_t bytes_read; + + while (1) { + n = rte_epoll_wait(rx_adapter->epd, epoll_events, + RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1); + if (unlikely(n < 0)) + RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d", + n); + for (i = 0; i < n; i++) { + if (epoll_events[i].fd == rx_adapter->intr_pipe.readfd) + goto done; + rxa_intr_ring_enqueue(rx_adapter, + epoll_events[i].epdata.data); + } + } + +done: + bytes_read = read(rx_adapter->intr_pipe.readfd, &val, sizeof(val)); + if (bytes_read != sizeof(val)) + RTE_EDEV_LOG_ERR("Failed to read from pipe %s", + strerror(errno)); + return NULL; +} + +/* Dequeue from interrupt ring and enqueue received + * mbufs to eventdev + */ +static inline uint32_t +rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter) +{ + uint32_t n; + uint32_t nb_rx = 0; + int rxq_empty; + struct rte_eth_event_enqueue_buffer *buf; + rte_spinlock_t *ring_lock; + uint8_t max_done = 0; + + if (rx_adapter->num_rx_intr == 0) + return 0; + + if (rte_ring_count(rx_adapter->intr_ring) == 0 + && !rx_adapter->qd_valid) + return 0; + + buf = &rx_adapter->event_enqueue_buffer; + ring_lock = &rx_adapter->intr_ring_lock; + + if (buf->count >= BATCH_SIZE) + rxa_flush_event_buffer(rx_adapter); + + while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) { + struct eth_device_info *dev_info; + uint16_t port; + uint16_t queue; + union queue_data qd = rx_adapter->qd; + int err; + + if (!rx_adapter->qd_valid) { + struct eth_rx_queue_info *queue_info; + + rte_spinlock_lock(ring_lock); + err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr); + if (err) { + rte_spinlock_unlock(ring_lock); + break; + } + + port = qd.port; + queue = qd.queue; + rx_adapter->qd = qd; + rx_adapter->qd_valid = 1; + dev_info = &rx_adapter->eth_devices[port]; + if (rxa_shared_intr(dev_info, queue)) + dev_info->shared_intr_enabled = 1; + else { + queue_info = &dev_info->rx_queue[queue]; + queue_info->intr_enabled = 1; + } + rte_eth_dev_rx_intr_enable(port, queue); + rte_spinlock_unlock(ring_lock); + } else { + port = qd.port; + queue = qd.queue; + + dev_info = &rx_adapter->eth_devices[port]; + } + + if (rxa_shared_intr(dev_info, queue)) { + uint16_t i; + uint16_t nb_queues; + + nb_queues = dev_info->dev->data->nb_rx_queues; + n = 0; + for (i = dev_info->next_q_idx; i < nb_queues; i++) { + uint8_t enq_buffer_full; + + if (!rxa_intr_queue(dev_info, i)) + continue; + n = rxa_eth_rx(rx_adapter, port, i, nb_rx, + rx_adapter->max_nb_rx, + &rxq_empty); + nb_rx += n; + + enq_buffer_full = !rxq_empty && n == 0; + max_done = nb_rx > rx_adapter->max_nb_rx; + + if (enq_buffer_full || max_done) { + dev_info->next_q_idx = i; + goto done; + } + } + + rx_adapter->qd_valid = 0; + + /* Reinitialize for next interrupt */ + dev_info->next_q_idx = dev_info->multi_intr_cap ? + RTE_MAX_RXTX_INTR_VEC_ID - 1 : + 0; + } else { + n = rxa_eth_rx(rx_adapter, port, queue, nb_rx, + rx_adapter->max_nb_rx, + &rxq_empty); + rx_adapter->qd_valid = !rxq_empty; + nb_rx += n; + if (nb_rx > rx_adapter->max_nb_rx) + break; + } + } + +done: + rx_adapter->stats.rx_intr_packets += nb_rx; + return nb_rx; +} + /* * Polls receive queues added to the event adapter and enqueues received * packets to the event device. @@ -668,7 +1111,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) * the hypervisor's switching layer where adjustments can be made to deal with * it. */ -static inline void +static inline uint32_t rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) { uint32_t num_queue; @@ -676,7 +1119,6 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) struct rte_eth_event_enqueue_buffer *buf; uint32_t wrr_pos; uint32_t max_nb_rx; - struct rte_event_eth_rx_adapter_stats *stats; wrr_pos = rx_adapter->wrr_pos; max_nb_rx = rx_adapter->max_nb_rx; @@ -696,10 +1138,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) rxa_flush_event_buffer(rx_adapter); if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) { rx_adapter->wrr_pos = wrr_pos; - break; + return nb_rx; } - nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx); + nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx, + NULL); if (nb_rx > max_nb_rx) { rx_adapter->wrr_pos = (wrr_pos + 1) % rx_adapter->wrr_len; @@ -709,14 +1152,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) if (++wrr_pos == rx_adapter->wrr_len) wrr_pos = 0; } - - stats->rx_packets += nb_rx; + return nb_rx; } static int rxa_service_func(void *args) { struct rte_event_eth_rx_adapter *rx_adapter = args; + struct rte_event_eth_rx_adapter_stats *stats; if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0) return 0; @@ -724,7 +1167,10 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) return 0; rte_spinlock_unlock(&rx_adapter->rx_lock); } - rxa_poll(rx_adapter); + + stats = &rx_adapter->stats; + stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter); + stats->rx_packets += rxa_poll(rx_adapter); rte_spinlock_unlock(&rx_adapter->rx_lock); return 0; } @@ -809,6 +1255,443 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) } static int +rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int err; + + if (rx_adapter->epd != INIT_FD) + return 0; + + rx_adapter->epd = epoll_create1(EPOLL_CLOEXEC); + if (rx_adapter->epd < 0) { + RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", errno); + return -errno; + } + + if (pipe(rx_adapter->intr_pipe.pipefd) < 0) { + err = -errno; + RTE_EDEV_LOG_ERR("pipe() error, err %d", errno); + goto error_0; + } + + rx_adapter->exit_ev.epdata.event = EPOLLIN; + rx_adapter->exit_ev.epdata.cb_fun = NULL; + err = rte_epoll_ctl(rx_adapter->epd, EPOLL_CTL_ADD, + rx_adapter->intr_pipe.readfd, &rx_adapter->exit_ev); + if (err) { + RTE_EDEV_LOG_ERR("Failed to add epoll instance, err %d", err); + goto error_1; + } + + return 0; + +error_1: + close(rx_adapter->intr_pipe.writefd); + close(rx_adapter->intr_pipe.readfd); + rx_adapter->intr_pipe.writefd = INIT_FD; + rx_adapter->intr_pipe.readfd = INIT_FD; + +error_0: + close(rx_adapter->epd); + rx_adapter->epd = INIT_FD; + + return err; +} + +/* Affinitize interrupt thread to the same cores as the service function + * If the service function has not been mapped to cores then affinitize + * the interrupt thread to the master lcore + */ +static int +rxa_affinitize_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter) +{ + rte_cpuset_t rx_cpuset; + uint32_t i; + uint32_t n_sc; + uint32_t service_cores[RTE_MAX_LCORE]; + int err; + + CPU_ZERO(&rx_cpuset); + n_sc = rte_service_lcore_list(service_cores, RTE_MAX_LCORE); + + for (i = 0; i < n_sc; i++) { + struct lcore_config *c = &lcore_config[service_cores[i]]; + if (rte_service_map_lcore_get(rx_adapter->service_id, + service_cores[i])) + CPU_OR(&rx_cpuset, &rx_cpuset, &c->cpuset); + } + + if (CPU_COUNT(&rx_cpuset) == 0) { + struct rte_config *cfg = rte_eal_get_configuration(); + struct lcore_config *c = &lcore_config[cfg->master_lcore]; + CPU_OR(&rx_cpuset, &rx_cpuset, &c->cpuset); + } + + err = pthread_setaffinity_np(rx_adapter->rx_intr_thread, + sizeof(cpu_set_t), + &rx_cpuset); + if (err != 0) + RTE_EDEV_LOG_ERR("pthread_setaffinity_np() failed, err %d", + err); + return -err; +} + +static int +rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int err; + uint8_t val; + char thread_name[RTE_MAX_THREAD_NAME_LEN]; + + if (rx_adapter->intr_ring) + return 0; + + rx_adapter->intr_ring = rte_ring_create("intr_ring", + RTE_EVENT_ETH_INTR_RING_SIZE, + rte_socket_id(), 0); + if (!rx_adapter->intr_ring) + return -ENOMEM; + + rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name, + (RTE_EVENT_ETH_INTR_RING_SIZE + 1) * + sizeof(struct rte_epoll_event), + RTE_CACHE_LINE_SIZE, + rx_adapter->socket_id); + if (!rx_adapter->epoll_events) { + err = -ENOMEM; + goto error; + } + + rte_spinlock_init(&rx_adapter->intr_ring_lock); + + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, + "rx-intr-thread"); + err = pthread_create(&rx_adapter->rx_intr_thread, NULL, + rxa_intr_thread, rx_adapter); + if (err) { + err = -err; + goto error; + } + + err = rxa_affinitize_intr_thread(rx_adapter); + if (!err) { + rte_thread_setname(rx_adapter->rx_intr_thread, thread_name); + return 0; + } + + val = ETH_BRIDGE_INTR_THREAD_EXIT; + /* This write wakes up the interrupt thread that is + * blocked in rte_epoll_wait() + */ + if (write(rx_adapter->intr_pipe.writefd, &val, + sizeof(val)) <= 0) { + RTE_EDEV_LOG_ERR("Failed to notify intr rx thread %s", + strerror(errno)); + err = -errno; + } else { + err = pthread_join(rx_adapter->rx_intr_thread, NULL); + if (err) { + RTE_EDEV_LOG_ERR("pthread_join failed, err %d", err); + err = -err; + } + } + +error: + rte_ring_free(rx_adapter->intr_ring); + rx_adapter->intr_ring = NULL; + rx_adapter->epoll_events = NULL; + return err; +} + +static int +rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int err; + uint8_t val = ETH_BRIDGE_INTR_THREAD_EXIT; + + /* This write wakes up the interrupt thread that is + * blocked in rte_epoll_wait() + */ + if (write(rx_adapter->intr_pipe.writefd, &val, + sizeof(val)) <= 0) { + RTE_EDEV_LOG_ERR("Failed to notify intr rx thread %s", + strerror(errno)); + return -errno; + } + + err = pthread_join(rx_adapter->rx_intr_thread, NULL); + if (err != 0) { + RTE_EDEV_LOG_ERR("Failed to join thread err: %d\n", err); + return -err; + } + + rte_free(rx_adapter->epoll_events); + rte_ring_free(rx_adapter->intr_ring); + rx_adapter->intr_ring = NULL; + rx_adapter->epoll_events = NULL; + return 0; +} + +static int +rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int ret; + + if (rx_adapter->num_rx_intr == 0) + return 0; + + ret = rxa_destroy_intr_thread(rx_adapter); + if (ret) + return ret; + ret = rte_epoll_ctl(rx_adapter->epd, EPOLL_CTL_DEL, + rx_adapter->intr_pipe.readfd, &rx_adapter->exit_ev); + if (ret) + RTE_EDEV_LOG_ERR("Failed to delete fd from epoll err: %d\n", + ret); + + close(rx_adapter->intr_pipe.writefd); + close(rx_adapter->intr_pipe.readfd); + close(rx_adapter->epd); + + rx_adapter->intr_pipe.writefd = INIT_FD; + rx_adapter->intr_pipe.readfd = INIT_FD; + rx_adapter->epd = INIT_FD; + + return ret; +} + +static int +rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + int err; + uint16_t eth_dev_id = dev_info->dev->data->port_id; + int sintr = rxa_shared_intr(dev_info, rx_queue_id); + + err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id); + if (err) { + RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u", + rx_queue_id); + return err; + } + + err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_DEL, + 0); + if (err) + RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err); + + if (sintr) + dev_info->rx_queue[rx_queue_id].intr_enabled = 0; + else + dev_info->shared_intr_enabled = 0; + return err; +} + +static int +rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id) +{ + int err; + int i; + int s; + + if (dev_info->nb_rx_intr == 0) + return 0; + + err = 0; + if (rx_queue_id == -1) { + s = dev_info->nb_shared_intr; + for (i = 0; i < dev_info->nb_rx_intr; i++) { + int sintr; + uint16_t q; + + q = dev_info->intr_queue[i]; + sintr = rxa_shared_intr(dev_info, q); + s -= sintr; + + if (!sintr || s == 0) { + + err = rxa_disable_intr(rx_adapter, dev_info, + q); + if (err) + return err; + rxa_intr_ring_del_entries(rx_adapter, dev_info, + q); + } + } + } else { + if (!rxa_intr_queue(dev_info, rx_queue_id)) + return 0; + if (!rxa_shared_intr(dev_info, rx_queue_id) || + dev_info->nb_shared_intr == 1) { + err = rxa_disable_intr(rx_adapter, dev_info, + rx_queue_id); + if (err) + return err; + rxa_intr_ring_del_entries(rx_adapter, dev_info, + rx_queue_id); + } + + for (i = 0; i < dev_info->nb_rx_intr; i++) { + if (dev_info->intr_queue[i] == rx_queue_id) { + for (; i < dev_info->nb_rx_intr - 1; i++) + dev_info->intr_queue[i] = + dev_info->intr_queue[i + 1]; + break; + } + } + } + + return err; +} + +static int +rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + int err, err1; + uint16_t eth_dev_id = dev_info->dev->data->port_id; + union queue_data qd; + int init_fd; + uint16_t *intr_queue; + int sintr = rxa_shared_intr(dev_info, rx_queue_id); + + if (rxa_intr_queue(dev_info, rx_queue_id)) + return 0; + + intr_queue = dev_info->intr_queue; + if (dev_info->intr_queue == NULL) { + size_t len = + dev_info->dev->data->nb_rx_queues * sizeof(uint16_t); + dev_info->intr_queue = + rte_zmalloc_socket( + rx_adapter->mem_name, + len, + 0, + rx_adapter->socket_id); + if (dev_info->intr_queue == NULL) + return -ENOMEM; + } + + init_fd = rx_adapter->epd; + err = rxa_init_epd(rx_adapter); + if (err) + goto err_free_queue; + + qd.port = eth_dev_id; + qd.queue = rx_queue_id; + + err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_ADD, + qd.ptr); + if (err) { + RTE_EDEV_LOG_ERR("Failed to add interrupt event for" + " Rx Queue %u err %d", rx_queue_id, err); + goto err_del_fd; + } + + err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id); + if (err) { + RTE_EDEV_LOG_ERR("Could not enable interrupt for" + " Rx Queue %u err %d", rx_queue_id, err); + + goto err_del_event; + } + + err = rxa_create_intr_thread(rx_adapter); + if (!err) { + if (sintr) + dev_info->shared_intr_enabled = 1; + else + dev_info->rx_queue[rx_queue_id].intr_enabled = 1; + return 0; + } + + + err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id); + if (err) + RTE_EDEV_LOG_ERR("Could not disable interrupt for" + " Rx Queue %u err %d", rx_queue_id, err); +err_del_event: + err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_DEL, + 0); + if (err1) { + RTE_EDEV_LOG_ERR("Could not delete event for" + " Rx Queue %u err %d", rx_queue_id, err1); + } +err_del_fd: + if (init_fd == INIT_FD) { + close(rx_adapter->epd); + rx_adapter->epd = -1; + } +err_free_queue: + if (intr_queue == NULL) + rte_free(dev_info->intr_queue); + + return err; +} + +static int +rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id) + +{ + int i, j, err; + int si = -1; + int shared_done = (dev_info->nb_shared_intr > 0); + + if (rx_queue_id != -1) { + if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done) + return 0; + return rxa_config_intr(rx_adapter, dev_info, rx_queue_id); + } + + err = 0; + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) { + + if (rxa_shared_intr(dev_info, i) && shared_done) + continue; + + err = rxa_config_intr(rx_adapter, dev_info, i); + + shared_done = err == 0 && rxa_shared_intr(dev_info, i); + if (shared_done) { + si = i; + dev_info->shared_intr_enabled = 1; + } + if (err) + break; + } + + if (err == 0) + return 0; + + shared_done = (dev_info->nb_shared_intr > 0); + for (j = 0; j < i; j++) { + if (rxa_intr_queue(dev_info, j)) + continue; + if (rxa_shared_intr(dev_info, j) && si != j) + continue; + err = rxa_disable_intr(rx_adapter, dev_info, j); + if (err) + break; + + } + + return err; +} + + +static int rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) { int ret; @@ -843,6 +1726,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) rx_adapter->event_port_id = rx_adapter_conf.event_port_id; rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx; rx_adapter->service_inited = 1; + rx_adapter->epd = INIT_FD; return 0; err_done: @@ -886,6 +1770,9 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) int32_t rx_queue_id) { int pollq; + int intrq; + int sintrq; + if (rx_adapter->nb_queues == 0) return; @@ -901,9 +1788,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) } pollq = rxa_polled_queue(dev_info, rx_queue_id); + intrq = rxa_intr_queue(dev_info, rx_queue_id); + sintrq = rxa_shared_intr(dev_info, rx_queue_id); rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0); rx_adapter->num_rx_polled -= pollq; dev_info->nb_rx_poll -= pollq; + rx_adapter->num_rx_intr -= intrq; + dev_info->nb_rx_intr -= intrq; + dev_info->nb_shared_intr -= intrq && sintrq; } static void @@ -915,6 +1807,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) struct eth_rx_queue_info *queue_info; const struct rte_event *ev = &conf->ev; int pollq; + int intrq; + int sintrq; if (rx_queue_id == -1) { uint16_t nb_rx_queues; @@ -927,6 +1821,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) } pollq = rxa_polled_queue(dev_info, rx_queue_id); + intrq = rxa_intr_queue(dev_info, rx_queue_id); + sintrq = rxa_shared_intr(dev_info, rx_queue_id); queue_info = &dev_info->rx_queue[rx_queue_id]; queue_info->event_queue_id = ev->queue_id; @@ -944,6 +1840,24 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) if (rxa_polled_queue(dev_info, rx_queue_id)) { rx_adapter->num_rx_polled += !pollq; dev_info->nb_rx_poll += !pollq; + rx_adapter->num_rx_intr -= intrq; + dev_info->nb_rx_intr -= intrq; + dev_info->nb_shared_intr -= intrq && sintrq; + } + + if (rxa_intr_queue(dev_info, rx_queue_id)) { + rx_adapter->num_rx_polled -= pollq; + dev_info->nb_rx_poll -= pollq; + rx_adapter->num_rx_intr += !intrq; + dev_info->nb_rx_intr += !intrq; + dev_info->nb_shared_intr += !intrq && sintrq; + if (dev_info->nb_shared_intr == 1) { + if (dev_info->multi_intr_cap) + dev_info->next_q_idx = + RTE_MAX_RXTX_INTR_VEC_ID - 1; + else + dev_info->next_q_idx = 0; + } } } @@ -960,24 +1874,24 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, uint32_t *rx_wrr; uint16_t nb_rx_queues; uint32_t nb_rx_poll, nb_wrr; + uint32_t nb_rx_intr; + int num_intr_vec; + uint16_t wt; if (queue_conf->servicing_weight == 0) { - struct rte_eth_dev_data *data = dev_info->dev->data; - if (data->dev_conf.intr_conf.rxq) { - RTE_EDEV_LOG_ERR("Interrupt driven queues" - " not supported"); - return -ENOTSUP; - } - temp_conf = *queue_conf; - /* If Rx interrupts are disabled set wt = 1 */ - temp_conf.servicing_weight = 1; + temp_conf = *queue_conf; + if (!data->dev_conf.intr_conf.rxq) { + /* If Rx interrupts are disabled set wt = 1 */ + temp_conf.servicing_weight = 1; + } queue_conf = &temp_conf; } nb_rx_queues = dev_info->dev->data->nb_rx_queues; rx_queue = dev_info->rx_queue; + wt = queue_conf->servicing_weight; if (dev_info->rx_queue == NULL) { dev_info->rx_queue = @@ -993,13 +1907,64 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id, queue_conf->servicing_weight, - &nb_rx_poll, &nb_wrr); + &nb_rx_poll, &nb_rx_intr, &nb_wrr); + + dev_info->multi_intr_cap = + rte_intr_cap_multiple(dev_info->dev->intr_handle); ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr, &rx_poll, &rx_wrr); if (ret) goto err_free_rxqueue; + if (wt == 0) { + num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1); + + ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec); + if (ret) + goto err_free_rxqueue; + + ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id); + if (ret) + goto err_free_rxqueue; + } else { + + num_intr_vec = 0; + if (rx_adapter->num_rx_intr > nb_rx_intr) { + num_intr_vec = rxa_nb_intr_vect(dev_info, + rx_queue_id, 0); + /* interrupt based queues are being converted to + * poll mode queues, delete the interrupt configuration + * for those. + */ + ret = rxa_del_intr_queue(rx_adapter, + dev_info, rx_queue_id); + if (ret) + goto err_free_rxqueue; + } + } + + if (nb_rx_intr == 0) { + ret = rxa_free_intr_resources(rx_adapter); + if (ret) + goto err_free_rxqueue; + } + + if (wt == 0) { + uint16_t i; + + if (rx_queue_id == -1) { + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) + dev_info->intr_queue[i] = i; + } else { + if (!rxa_intr_queue(dev_info, rx_queue_id)) + dev_info->intr_queue[nb_rx_intr - 1] = + rx_queue_id; + } + } + + + rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf); rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr); @@ -1009,6 +1974,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, rx_adapter->eth_rx_poll = rx_poll; rx_adapter->wrr_sched = rx_wrr; rx_adapter->wrr_len = nb_wrr; + rx_adapter->num_intr_vec += num_intr_vec; return 0; err_free_rxqueue: @@ -1301,8 +2267,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, uint32_t cap; uint32_t nb_rx_poll = 0; uint32_t nb_wrr = 0; + uint32_t nb_rx_intr; struct eth_rx_poll_entry *rx_poll = NULL; uint32_t *rx_wrr = NULL; + int num_intr_vec; RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL); @@ -1345,29 +2313,59 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, } } else { rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id, - &nb_rx_poll, &nb_wrr); + &nb_rx_poll, &nb_rx_intr, &nb_wrr); + ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr, &rx_poll, &rx_wrr); if (ret) return ret; rte_spinlock_lock(&rx_adapter->rx_lock); + + num_intr_vec = 0; + if (rx_adapter->num_rx_intr > nb_rx_intr) { + + num_intr_vec = rxa_nb_intr_vect(dev_info, + rx_queue_id, 0); + ret = rxa_del_intr_queue(rx_adapter, dev_info, + rx_queue_id); + if (ret) + goto unlock_ret; + } + + if (nb_rx_intr == 0) { + ret = rxa_free_intr_resources(rx_adapter); + if (ret) + goto unlock_ret; + } + rxa_sw_del(rx_adapter, dev_info, rx_queue_id); rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr); rte_free(rx_adapter->eth_rx_poll); rte_free(rx_adapter->wrr_sched); + if (nb_rx_intr == 0) { + rte_free(dev_info->intr_queue); + dev_info->intr_queue = NULL; + } + rx_adapter->eth_rx_poll = rx_poll; - rx_adapter->num_rx_polled = nb_rx_poll; rx_adapter->wrr_sched = rx_wrr; rx_adapter->wrr_len = nb_wrr; + rx_adapter->num_intr_vec += num_intr_vec; if (dev_info->nb_dev_queues == 0) { rte_free(dev_info->rx_queue); dev_info->rx_queue = NULL; } +unlock_ret: rte_spinlock_unlock(&rx_adapter->rx_lock); + if (ret) { + rte_free(rx_poll); + rte_free(rx_wrr); + return ret; + } rte_service_component_runstate_set(rx_adapter->service_id, rxa_sw_adapter_queue_count(rx_adapter)); @@ -1376,7 +2374,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, return ret; } - int rte_event_eth_rx_adapter_start(uint8_t id) { diff --git a/test/test/test_event_eth_rx_adapter.c b/test/test/test_event_eth_rx_adapter.c index dee632b..d247c0a 100644 --- a/test/test/test_event_eth_rx_adapter.c +++ b/test/test/test_event_eth_rx_adapter.c @@ -25,28 +25,17 @@ struct event_eth_rx_adapter_test_params { struct rte_mempool *mp; uint16_t rx_rings, tx_rings; uint32_t caps; + int rx_intr_port_inited; + uint16_t rx_intr_port; }; static struct event_eth_rx_adapter_test_params default_params; static inline int -port_init(uint8_t port, struct rte_mempool *mp) +port_init_common(uint8_t port, const struct rte_eth_conf *port_conf, + struct rte_mempool *mp) { - static const struct rte_eth_conf port_conf_default = { - .rxmode = { - .mq_mode = ETH_MQ_RX_RSS, - .max_rx_pkt_len = ETHER_MAX_LEN - }, - .rx_adv_conf = { - .rss_conf = { - .rss_hf = ETH_RSS_IP | - ETH_RSS_TCP | - ETH_RSS_UDP, - } - } - }; const uint16_t rx_ring_size = 512, tx_ring_size = 512; - struct rte_eth_conf port_conf = port_conf_default; int retval; uint16_t q; struct rte_eth_dev_info dev_info; @@ -54,7 +43,7 @@ struct event_eth_rx_adapter_test_params { if (!rte_eth_dev_is_valid_port(port)) return -1; - retval = rte_eth_dev_configure(port, 0, 0, &port_conf); + retval = rte_eth_dev_configure(port, 0, 0, port_conf); rte_eth_dev_info_get(port, &dev_info); @@ -64,7 +53,7 @@ struct event_eth_rx_adapter_test_params { /* Configure the Ethernet device. */ retval = rte_eth_dev_configure(port, default_params.rx_rings, - default_params.tx_rings, &port_conf); + default_params.tx_rings, port_conf); if (retval != 0) return retval; @@ -104,6 +93,77 @@ struct event_eth_rx_adapter_test_params { return 0; } +static inline int +port_init_rx_intr(uint8_t port, struct rte_mempool *mp) +{ + static const struct rte_eth_conf port_conf_default = { + .rxmode = { + .mq_mode = ETH_MQ_RX_RSS, + .max_rx_pkt_len = ETHER_MAX_LEN + }, + .intr_conf = { + .rxq = 1, + }, + }; + + return port_init_common(port, &port_conf_default, mp); +} + +static inline int +port_init(uint8_t port, struct rte_mempool *mp) +{ + static const struct rte_eth_conf port_conf_default = { + .rxmode = { + .mq_mode = ETH_MQ_RX_RSS, + .max_rx_pkt_len = ETHER_MAX_LEN + }, + .rx_adv_conf = { + .rss_conf = { + .rss_hf = ETH_RSS_IP | + ETH_RSS_TCP | + ETH_RSS_UDP, + } + } + }; + + return port_init_common(port, &port_conf_default, mp); +} + +static int +init_port_rx_intr(int num_ports) +{ + int retval; + uint16_t portid; + int err; + + default_params.mp = rte_pktmbuf_pool_create("packet_pool", + NB_MBUFS, + MBUF_CACHE_SIZE, + MBUF_PRIV_SIZE, + RTE_MBUF_DEFAULT_BUF_SIZE, + rte_socket_id()); + if (!default_params.mp) + return -ENOMEM; + + RTE_ETH_FOREACH_DEV(portid) { + retval = port_init_rx_intr(portid, default_params.mp); + if (retval) + continue; + err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID, portid, + &default_params.caps); + if (err) + continue; + if (!(default_params.caps & + RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { + default_params.rx_intr_port_inited = 1; + default_params.rx_intr_port = portid; + return 0; + } + rte_eth_dev_stop(portid); + } + return 0; +} + static int init_ports(int num_ports) { @@ -175,6 +235,57 @@ struct event_eth_rx_adapter_test_params { return err; } +static int +testsuite_setup_rx_intr(void) +{ + int err; + uint8_t count; + struct rte_event_dev_info dev_info; + + count = rte_event_dev_count(); + if (!count) { + printf("Failed to find a valid event device," + " testing with event_skeleton device\n"); + rte_vdev_init("event_skeleton", NULL); + } + + struct rte_event_dev_config config = { + .nb_event_queues = 1, + .nb_event_ports = 1, + }; + + err = rte_event_dev_info_get(TEST_DEV_ID, &dev_info); + config.nb_event_queue_flows = dev_info.max_event_queue_flows; + config.nb_event_port_dequeue_depth = + dev_info.max_event_port_dequeue_depth; + config.nb_event_port_enqueue_depth = + dev_info.max_event_port_enqueue_depth; + config.nb_events_limit = + dev_info.max_num_events; + + err = rte_event_dev_configure(TEST_DEV_ID, &config); + TEST_ASSERT(err == 0, "Event device initialization failed err %d\n", + err); + + /* + * eth devices like octeontx use event device to receive packets + * so rte_eth_dev_start invokes rte_event_dev_start internally, so + * call init_ports after rte_event_dev_configure + */ + err = init_port_rx_intr(rte_eth_dev_count_total()); + TEST_ASSERT(err == 0, "Port initialization failed err %d\n", err); + + if (!default_params.rx_intr_port_inited) + return 0; + + err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID, + default_params.rx_intr_port, + &default_params.caps); + TEST_ASSERT(err == 0, "Failed to get adapter cap err %d\n", err); + + return err; +} + static void testsuite_teardown(void) { @@ -185,6 +296,16 @@ struct event_eth_rx_adapter_test_params { rte_mempool_free(default_params.mp); } +static void +testsuite_teardown_rx_intr(void) +{ + if (!default_params.rx_intr_port_inited) + return; + + rte_eth_dev_stop(default_params.rx_intr_port); + rte_mempool_free(default_params.mp); +} + static int adapter_create(void) { @@ -333,6 +454,89 @@ struct event_eth_rx_adapter_test_params { } static int +adapter_intr_queue_add_del(void) +{ + int err; + struct rte_event ev; + uint32_t cap; + uint16_t eth_port; + struct rte_event_eth_rx_adapter_queue_conf queue_config; + + if (!default_params.rx_intr_port_inited) + return 0; + + eth_port = default_params.rx_intr_port; + err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID, eth_port, &cap); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + ev.queue_id = 0; + ev.sched_type = RTE_SCHED_TYPE_ATOMIC; + ev.priority = 0; + + queue_config.rx_queue_flags = 0; + queue_config.ev = ev; + + /* weight = 0 => interrupt mode */ + queue_config.servicing_weight = 0; + + /* add queue 0 */ + err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID, + TEST_ETHDEV_ID, 0, + &queue_config); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + /* add all queues */ + queue_config.servicing_weight = 0; + err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID, + TEST_ETHDEV_ID, + -1, + &queue_config); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + /* del queue 0 */ + err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID, + TEST_ETHDEV_ID, + 0); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + /* del remaining queues */ + err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID, + TEST_ETHDEV_ID, + -1); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + /* add all queues */ + queue_config.servicing_weight = 0; + err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID, + TEST_ETHDEV_ID, + -1, + &queue_config); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + /* intr -> poll mode queue */ + queue_config.servicing_weight = 1; + err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID, + TEST_ETHDEV_ID, + 0, + &queue_config); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID, + TEST_ETHDEV_ID, + -1, + &queue_config); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + /* del queues */ + err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID, + TEST_ETHDEV_ID, + -1); + TEST_ASSERT(err == 0, "Expected 0 got %d", err); + + return TEST_SUCCESS; +} + +static int adapter_start_stop(void) { int err; @@ -402,7 +606,7 @@ struct event_eth_rx_adapter_test_params { return TEST_SUCCESS; } -static struct unit_test_suite service_tests = { +static struct unit_test_suite event_eth_rx_tests = { .suite_name = "rx event eth adapter test suite", .setup = testsuite_setup, .teardown = testsuite_teardown, @@ -416,11 +620,30 @@ struct event_eth_rx_adapter_test_params { } }; +static struct unit_test_suite event_eth_rx_intr_tests = { + .suite_name = "rx event eth adapter test suite", + .setup = testsuite_setup_rx_intr, + .teardown = testsuite_teardown_rx_intr, + .unit_test_cases = { + TEST_CASE_ST(adapter_create, adapter_free, + adapter_intr_queue_add_del), + TEST_CASES_END() /**< NULL terminate unit test array */ + } +}; + static int test_event_eth_rx_adapter_common(void) { - return unit_test_suite_runner(&service_tests); + return unit_test_suite_runner(&event_eth_rx_tests); +} + +static int +test_event_eth_rx_intr_adapter_common(void) +{ + return unit_test_suite_runner(&event_eth_rx_intr_tests); } REGISTER_TEST_COMMAND(event_eth_rx_adapter_autotest, test_event_eth_rx_adapter_common); +REGISTER_TEST_COMMAND(event_eth_rx_intr_adapter_autotest, + test_event_eth_rx_intr_adapter_common); diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst index 319e4f0..2f055ec 100644 --- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst +++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst @@ -144,3 +144,27 @@ enqueued event counts are a sum of the counts from the eventdev PMD callbacks if the callback is supported, and the counts maintained by the service function, if one exists. The service function also maintains a count of cycles for which it was not able to enqueue to the event device. + +Interrupt Based Rx Queues +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The service core function is typically set up to poll ethernet Rx queues for +packets. Certain queues may have low packet rates and it would be more +efficient to enable the Rx queue interrupt and read packets after receiving +the interrupt. + +The servicing_weight member of struct rte_event_eth_rx_adapter_queue_conf +is applicable when the adapter uses a service core function. The application +has to enable Rx queue interrupts when configuring the ethernet device +uing the ``rte_eth_dev_configue()`` function and then use a servicing_weight +of zero when addding the Rx queue to the adapter. + +The adapter creates a thread blocked on the interrupt, on an interrupt this +thread enqueues the port id and the queue id to a ring buffer. The adapter +service function dequeues the port id and queue id from the ring buffer, +invokes the ``rte_eth_rx_burst()`` to receive packets on the queue and +converts the received packets to events in the same manner as packets +received on a polled Rx queue. The interrupt thread is affinitized to the same +CPUs as the lcores of the Rx adapter service function, if the Rx adapter +service function has not been mapped to any lcores, the interrupt thread +is mapped to the master lcore. diff --git a/config/common_base b/config/common_base index 6b0d1cb..bc32956 100644 --- a/config/common_base +++ b/config/common_base @@ -597,6 +597,7 @@ CONFIG_RTE_LIBRTE_EVENTDEV_DEBUG=n CONFIG_RTE_EVENT_MAX_DEVS=16 CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64 CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32 +CONFIG_RTE_EVENT_ETH_INTR_RING_SIZE=1024 CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32 # diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile index b3e2546..e269357 100644 --- a/lib/librte_eventdev/Makefile +++ b/lib/librte_eventdev/Makefile @@ -8,14 +8,16 @@ include $(RTE_SDK)/mk/rte.vars.mk LIB = librte_eventdev.a # library version -LIBABIVER := 4 +LIBABIVER := 5 # build flags CFLAGS += -DALLOW_EXPERIMENTAL_API +CFLAGS += -D_GNU_SOURCE CFLAGS += -O3 CFLAGS += $(WERROR_FLAGS) LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash -lrte_mempool -lrte_timer LDLIBS += -lrte_mbuf -lrte_cryptodev +LDLIBS += -lpthread # library source files SRCS-y += rte_eventdev.c -- 1.8.3.1