DPDK patches and discussions
From: Timothy McDaniel <timothy.mcdaniel@intel.com>
Cc: dev@dpdk.org, erik.g.carrillo@intel.com, gage.eads@intel.com,
	harry.van.haaren@intel.com, jerinj@marvell.com
Subject: [dpdk-dev] [PATCH v5 16/22] event/dlb: add dequeue and its burst variants
Date: Sat, 17 Oct 2020 14:04:10 -0500	[thread overview]
Message-ID: <1602961456-17392-17-git-send-email-timothy.mcdaniel@intel.com> (raw)
In-Reply-To: <1602961456-17392-1-git-send-email-timothy.mcdaniel@intel.com>

Add support for dequeue, dequeue_burst, and their sparse CQ variants.

DLB does not currently support interrupts; instead, when the
processor supports them, the PMD uses the umonitor/umwait
instructions, which let software monitor and wait on writes to a
cache line (sketched below).
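
As a rough illustration (not part of this patch), the wait boils down
to the WAITPKG monitor/wait pair. The helper below is a hedged
sketch: wait_for_cq_write() and its arguments are hypothetical, and
the driver itself goes through its dlb_umonitor()/dlb_umwait()
wrappers with a configurable control state rather than a hard-coded
0:

  #include <stdint.h>
  #include <immintrin.h> /* _umonitor/_umwait; build with -mwaitpkg */

  static inline void
  wait_for_cq_write(volatile void *cq_slot, uint64_t tsc_deadline)
  {
  	/* Arm address monitoring on the CQ cache line */
  	_umonitor((void *)cq_slot);

  	/* Sleep until the line is written or the TSC deadline
  	 * passes. Control value 0 requests the deeper C0.2 state.
  	 */
  	_umwait(0, tsc_deadline);
  }

(The driver additionally re-checks that the CQ is still empty between
the two calls, to avoid sleeping after a write has already landed.)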

DLB supports two CQ modes: normal and sparse. In normal mode the
hardware packs four QEs into each cache line; in sparse mode it
populates only one QE per cache line. Software must detect which mode
the CQ is in and advance its CQ index accordingly, as sketched below.
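
To make the consequence concrete: in sparse mode each valid QE is the
first 16B of its own cache line, so the CQ index must advance by four
slots per QE instead of one. A minimal sketch, with a hypothetical
advance_cq_idx() helper (gen-bit maintenance, handled by
dlb_inc_cq_idx() in this patch, is omitted):

  #include <stdint.h>
  #include <stdbool.h>

  #define QES_PER_CACHE_LINE 4 /* mirrors DLB_NUM_QES_PER_CACHE_LINE */

  static inline uint16_t
  advance_cq_idx(uint16_t idx, uint16_t depth_mask, int nb_qes,
  	       bool sparse)
  {
  	/* Normal mode: QEs are packed, one 16B slot per QE.
  	 * Sparse mode: one QE per cache line, so each QE occupies
  	 * four slots; hence dlb_hw_dequeue_sparse() advances with
  	 * dlb_inc_cq_idx(qm_port, num_avail << 2).
  	 */
  	int step = sparse ? nb_qes * QES_PER_CACHE_LINE : nb_qes;

  	return (idx + step) & depth_mask;
  }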

Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
---
 drivers/event/dlb/dlb.c | 728 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 728 insertions(+)

diff --git a/drivers/event/dlb/dlb.c b/drivers/event/dlb/dlb.c
index e314af1c..2b6a4d0 100644
--- a/drivers/event/dlb/dlb.c
+++ b/drivers/event/dlb/dlb.c
@@ -2812,9 +2812,728 @@ dlb_event_enqueue_forward_burst_delayed(void *event_port,
 	return __dlb_event_enqueue_burst(event_port, events, num);
 }
 
+static __rte_always_inline int
+dlb_recv_qe(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe,
+	    uint8_t *offset)
+{
+	uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
+				   {0x00, 0x01, 0x03, 0x07} };
+	uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
+	volatile struct dlb_dequeue_qe *cq_addr;
+	__m128i *qes = (__m128i *)qe;
+	uint64_t *cache_line_base;
+	uint8_t gen_bits;
+
+	cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+	cq_addr = &cq_addr[qm_port->cq_idx];
+
+	cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
+	*offset = ((uintptr_t)cq_addr & 0x30) >> 4;
+
+	/* Load the next CQ cache line from memory. Pack these reads as tight
+	 * as possible to reduce the chance that DLB invalidates the line while
+	 * the CPU is reading it. Read the cache line backwards to ensure that
+	 * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
+	 *
+	 * (Valid QEs start at &qe[offset])
+	 */
+	qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
+	qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
+	qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
+	qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
+
+	/* Evict the cache line ASAP */
+	dlb_cldemote(cache_line_base);
+
+	/* Extract and combine the gen bits */
+	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
+		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
+		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
+		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
+
+	/* XOR the combined bits such that a 1 represents a valid QE */
+	gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
+
+	/* Mask off gen bits we don't care about */
+	gen_bits &= and_mask[*offset];
+
+	return __builtin_popcount(gen_bits);
+}
+
+static inline void
+dlb_inc_cq_idx(struct dlb_port *qm_port, int cnt)
+{
+	uint16_t idx = qm_port->cq_idx_unmasked + cnt;
+
+	qm_port->cq_idx_unmasked = idx;
+	qm_port->cq_idx = idx & qm_port->cq_depth_mask;
+	qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
+}
+
+static inline int
+dlb_process_dequeue_qes(struct dlb_eventdev_port *ev_port,
+			struct dlb_port *qm_port,
+			struct rte_event *events,
+			struct dlb_dequeue_qe *qes,
+			int cnt)
+{
+	uint8_t *qid_mappings = qm_port->qid_mappings;
+	int i, num;
+
+	RTE_SET_USED(ev_port); /* only used when stats are compiled in */
+
+	for (i = 0, num = 0; i < cnt; i++) {
+		struct dlb_dequeue_qe *qe = &qes[i];
+		int sched_type_map[4] = {
+			[DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+			[DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+			[DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+			[DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+		};
+
+		DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
+			    (long long)qe->data, qe->qid,
+			    qe->u.event_type.major,
+			    qe->u.event_type.sub,
+			    qe->pp_id, qe->sched_type, qe->qid, qe->error);
+
+		/* Fill in event information.
+		 * Note that flow_id must be embedded in the data by
+		 * the app, such as the mbuf RSS hash field if the data
+		 * buffer is a mbuf.
+		 */
+		if (unlikely(qe->error)) {
+			DLB_LOG_ERR("QE error bit ON\n");
+			DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
+			dlb_consume_qe_immediate(qm_port, 1);
+			continue; /* Ignore */
+		}
+
+		events[num].u64 = qe->data;
+		events[num].queue_id = qid_mappings[qe->qid];
+		events[num].priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
+		events[num].event_type = qe->u.event_type.major;
+		events[num].sub_event_type = qe->u.event_type.sub;
+		events[num].sched_type = sched_type_map[qe->sched_type];
+		DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
+		num++;
+	}
+	DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num);
+
+	return num;
+}
+
+static inline int
+dlb_process_dequeue_four_qes(struct dlb_eventdev_port *ev_port,
+			     struct dlb_port *qm_port,
+			     struct rte_event *events,
+			     struct dlb_dequeue_qe *qes)
+{
+	int sched_type_map[] = {
+		[DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+		[DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+		[DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+		[DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+	};
+	const int num_events = DLB_NUM_QES_PER_CACHE_LINE;
+	uint8_t *qid_mappings = qm_port->qid_mappings;
+	__m128i sse_evt[2];
+	int i;
+
+	/* In the unlikely case that any of the QE error bits are set, process
+	 * them one at a time.
+	 */
+	if (unlikely(qes[0].error || qes[1].error ||
+		     qes[2].error || qes[3].error))
+		return dlb_process_dequeue_qes(ev_port, qm_port, events,
+					       qes, num_events);
+
+	for (i = 0; i < DLB_NUM_QES_PER_CACHE_LINE; i++) {
+		DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
+			    (long long)qes[i].data, qes[i].qid,
+			    qes[i].u.event_type.major,
+			    qes[i].u.event_type.sub,
+			    qes[i].pp_id, qes[i].sched_type, qes[i].qid,
+			    qes[i].error);
+	}
+
+	events[0].u64 = qes[0].data;
+	events[1].u64 = qes[1].data;
+	events[2].u64 = qes[2].data;
+	events[3].u64 = qes[3].data;
+
+	/* Construct the metadata portion of two struct rte_events
+	 * in one 128b SSE register. Event metadata is constructed in the SSE
+	 * registers like so:
+	 * sse_evt[0][63:0]:   event[0]'s metadata
+	 * sse_evt[0][127:64]: event[1]'s metadata
+	 * sse_evt[1][63:0]:   event[2]'s metadata
+	 * sse_evt[1][127:64]: event[3]'s metadata
+	 */
+	sse_evt[0] = _mm_setzero_si128();
+	sse_evt[1] = _mm_setzero_si128();
+
+	/* Convert the hardware queue ID to an event queue ID and store it in
+	 * the metadata:
+	 * sse_evt[0][47:40]   = qid_mappings[qes[0].qid]
+	 * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
+	 * sse_evt[1][47:40]   = qid_mappings[qes[2].qid]
+	 * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
+	 */
+#define RTE_EVENT_QUEUE_ID_BYTE 5
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+				     qid_mappings[qes[0].qid],
+				     RTE_EVENT_QUEUE_ID_BYTE);
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+				     qid_mappings[qes[1].qid],
+				     RTE_EVENT_QUEUE_ID_BYTE + 8);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+				     qid_mappings[qes[2].qid],
+				     RTE_EVENT_QUEUE_ID_BYTE);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+				     qid_mappings[qes[3].qid],
+				     RTE_EVENT_QUEUE_ID_BYTE + 8);
+
+	/* Convert the hardware priority to an event priority and store it in
+	 * the metadata:
+	 * sse_evt[0][55:48]   = DLB_TO_EV_PRIO(qes[0].priority)
+	 * sse_evt[0][119:112] = DLB_TO_EV_PRIO(qes[1].priority)
+	 * sse_evt[1][55:48]   = DLB_TO_EV_PRIO(qes[2].priority)
+	 * sse_evt[1][119:112] = DLB_TO_EV_PRIO(qes[3].priority)
+	 */
+#define RTE_EVENT_PRIO_BYTE 6
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+				     DLB_TO_EV_PRIO((uint8_t)qes[0].priority),
+				     RTE_EVENT_PRIO_BYTE);
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+				     DLB_TO_EV_PRIO((uint8_t)qes[1].priority),
+				     RTE_EVENT_PRIO_BYTE + 8);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+				     DLB_TO_EV_PRIO((uint8_t)qes[2].priority),
+				     RTE_EVENT_PRIO_BYTE);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+				     DLB_TO_EV_PRIO((uint8_t)qes[3].priority),
+				     RTE_EVENT_PRIO_BYTE + 8);
+
+	/* Write the event type and sub event type to the event metadata. Leave
+	 * flow ID unspecified, since the hardware does not maintain it during
+	 * scheduling:
+	 * sse_evt[0][31:0]   = qes[0].u.event_type.major << 28 |
+	 *			qes[0].u.event_type.sub << 20;
+	 * sse_evt[0][95:64]  = qes[1].u.event_type.major << 28 |
+	 *			qes[1].u.event_type.sub << 20;
+	 * sse_evt[1][31:0]   = qes[2].u.event_type.major << 28 |
+	 *			qes[2].u.event_type.sub << 20;
+	 * sse_evt[1][95:64]  = qes[3].u.event_type.major << 28 |
+	 *			qes[3].u.event_type.sub << 20;
+	 */
+#define RTE_EVENT_EV_TYPE_DW 0
+#define RTE_EVENT_EV_TYPE_SHIFT 28
+#define RTE_EVENT_SUB_EV_TYPE_SHIFT 20
+	sse_evt[0] = _mm_insert_epi32(sse_evt[0],
+			qes[0].u.event_type.major << RTE_EVENT_EV_TYPE_SHIFT |
+			qes[0].u.event_type.sub << RTE_EVENT_SUB_EV_TYPE_SHIFT,
+			RTE_EVENT_EV_TYPE_DW);
+	sse_evt[0] = _mm_insert_epi32(sse_evt[0],
+			qes[1].u.event_type.major << RTE_EVENT_EV_TYPE_SHIFT |
+			qes[1].u.event_type.sub <<  RTE_EVENT_SUB_EV_TYPE_SHIFT,
+			RTE_EVENT_EV_TYPE_DW + 2);
+	sse_evt[1] = _mm_insert_epi32(sse_evt[1],
+			qes[2].u.event_type.major << RTE_EVENT_EV_TYPE_SHIFT |
+			qes[2].u.event_type.sub <<  RTE_EVENT_SUB_EV_TYPE_SHIFT,
+			RTE_EVENT_EV_TYPE_DW);
+	sse_evt[1] = _mm_insert_epi32(sse_evt[1],
+			qes[3].u.event_type.major << RTE_EVENT_EV_TYPE_SHIFT  |
+			qes[3].u.event_type.sub << RTE_EVENT_SUB_EV_TYPE_SHIFT,
+			RTE_EVENT_EV_TYPE_DW + 2);
+
+	/* Write the sched type to the event metadata. 'op' and 'rsvd' are not
+	 * set:
+	 * sse_evt[0][39:32]  = sched_type_map[qes[0].sched_type] << 6
+	 * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
+	 * sse_evt[1][39:32]  = sched_type_map[qes[2].sched_type] << 6
+	 * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
+	 */
+#define RTE_EVENT_SCHED_TYPE_BYTE 4
+#define RTE_EVENT_SCHED_TYPE_SHIFT 6
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+		sched_type_map[qes[0].sched_type] << RTE_EVENT_SCHED_TYPE_SHIFT,
+		RTE_EVENT_SCHED_TYPE_BYTE);
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+		sched_type_map[qes[1].sched_type] << RTE_EVENT_SCHED_TYPE_SHIFT,
+		RTE_EVENT_SCHED_TYPE_BYTE + 8);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+		sched_type_map[qes[2].sched_type] << RTE_EVENT_SCHED_TYPE_SHIFT,
+		RTE_EVENT_SCHED_TYPE_BYTE);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+		sched_type_map[qes[3].sched_type] << RTE_EVENT_SCHED_TYPE_SHIFT,
+		RTE_EVENT_SCHED_TYPE_BYTE + 8);
+
+	/* Store the metadata to the event (use the double-precision
+	 * _mm_storeh_pd because there is no integer function for storing the
+	 * upper 64b):
+	 * events[0].event = sse_evt[0][63:0]
+	 * events[1].event = sse_evt[0][127:64]
+	 * events[2].event = sse_evt[1][63:0]
+	 * events[3].event = sse_evt[1][127:64]
+	 */
+	_mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
+	_mm_storeh_pd((double *)&events[1].event, _mm_castsi128_pd(sse_evt[0]));
+	_mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
+	_mm_storeh_pd((double *)&events[3].event, _mm_castsi128_pd(sse_evt[1]));
+
+	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
+	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
+	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
+	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
+
+	DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
+
+	return num_events;
+}
+
+static inline bool
+dlb_cq_is_empty(struct dlb_port *qm_port)
+{
+	volatile struct dlb_dequeue_qe *qe_ptr;
+	struct dlb_dequeue_qe qe;
+
+	qe_ptr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+	qe = qe_ptr[qm_port->cq_idx];
+
+	return (qe.cq_gen != qm_port->gen_bit);
+}
+
+static inline int
+dlb_dequeue_wait(struct dlb_eventdev *dlb,
+		 struct dlb_eventdev_port *ev_port,
+		 struct dlb_port *qm_port,
+		 uint64_t timeout,
+		 uint64_t start_ticks)
+{
+	struct process_local_port_data *port_data;
+	uint64_t elapsed_ticks;
+
+	port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
+
+	elapsed_ticks = rte_get_timer_cycles() - start_ticks;
+
+	/* Wait/poll time expired */
+	if (elapsed_ticks >= timeout) {
+		/* Interrupts not supported by PF PMD */
+		return 1;
+	} else if (dlb->umwait_allowed) {
+		volatile struct dlb_dequeue_qe *cq_base;
+
+		cq_base = port_data->cq_base;
+
+		/* Block on cache line write to CQ. Note: it's
+		 * safe to access the per-process cq_base
+		 * address here, since the PMD has already
+		 * attempted at least one CQ dequeue.
+		 */
+		dlb_umonitor(&cq_base[qm_port->cq_idx]);
+
+		/* Avoid race condition. Check if still empty */
+		if (dlb_cq_is_empty(qm_port)) {
+			dlb_umwait(RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE,
+				   timeout + start_ticks);
+			DLB_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait,
+				     1);
+		}
+	} else {
+		uint64_t poll_interval = RTE_LIBRTE_PMD_DLB_POLL_INTERVAL;
+		uint64_t curr_ticks = rte_get_timer_cycles();
+		uint64_t init_ticks = curr_ticks;
+
+		while ((curr_ticks - start_ticks < timeout) &&
+		       (curr_ticks - init_ticks < poll_interval))
+			curr_ticks = rte_get_timer_cycles();
+	}
+
+	return 0;
+}
+
+static inline int16_t
+dlb_hw_dequeue(struct dlb_eventdev *dlb,
+	       struct dlb_eventdev_port *ev_port,
+	       struct rte_event *events,
+	       uint16_t max_num,
+	       uint64_t dequeue_timeout_ticks)
+{
+	uint64_t timeout;
+	uint64_t start_ticks = 0ULL;
+	struct dlb_port *qm_port;
+	int num = 0;
+
+	qm_port = &ev_port->qm_port;
+
+	/* If the eventdev is configured for per-dequeue wait, use the
+	 * wait value passed to this API. Otherwise, use the global
+	 * value specified at eventdev configuration time.
+	 */
+	if (!dlb->global_dequeue_wait)
+		timeout = dequeue_timeout_ticks;
+	else
+		timeout = dlb->global_dequeue_wait_ticks;
+
+	if (timeout)
+		start_ticks = rte_get_timer_cycles();
+
+	while (num < max_num) {
+		struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
+		uint8_t offset;
+		int num_avail;
+
+		/* Copy up to 4 QEs from the current cache line into qes */
+		num_avail = dlb_recv_qe(qm_port, qes, &offset);
+
+		/* But don't process more than the user requested */
+		num_avail = RTE_MIN(num_avail, max_num - num);
+
+		dlb_inc_cq_idx(qm_port, num_avail);
+
+		if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
+			num += dlb_process_dequeue_four_qes(ev_port,
+							     qm_port,
+							     &events[num],
+							     &qes[offset]);
+		else if (num_avail)
+			num += dlb_process_dequeue_qes(ev_port,
+							qm_port,
+							&events[num],
+							&qes[offset],
+							num_avail);
+		else if ((timeout == 0) || (num > 0))
+			/* Not waiting in any form, or 1+ events received? */
+			break;
+		else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
+					  timeout, start_ticks))
+			break;
+	}
+
+	qm_port->owed_tokens += num;
+
+	dlb_consume_qe_immediate(qm_port, num);
+
+	ev_port->outstanding_releases += num;
+
+	return num;
+}
+
+static inline int
+dlb_process_dequeue_qe(struct dlb_eventdev_port *ev_port __rte_unused,
+		       struct dlb_port *qm_port,
+		       struct rte_event *event,
+		       struct dlb_dequeue_qe *qe)
+{
+	int sched_type_map[4] = {
+		[DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+		[DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+		[DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+		[DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+	};
+	uint8_t *qid_mappings = qm_port->qid_mappings;
+
+	DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
+		    (long long)qe->data, qe->qid,
+		    qe->u.event_type.major,
+		    qe->u.event_type.sub,
+		    qe->pp_id, qe->sched_type, qe->qid, qe->error);
+
+	/* Fill in event information.
+	 * Note that flow_id must be embedded in the data by
+	 * the app, such as the mbuf RSS hash field if the data
+	 * buffer is a mbuf.
+	 */
+	if (unlikely(qe->error)) {
+		DLB_LOG_ERR("QE error bit ON\n");
+		DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
+		dlb_consume_qe_immediate(qm_port, 1);
+		return 0;
+	}
+
+	event->u64 = qe->data;
+	event->queue_id = qid_mappings[qe->qid];
+	event->priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
+	event->event_type = qe->u.event_type.major;
+	event->sub_event_type = qe->u.event_type.sub;
+	event->sched_type = sched_type_map[qe->sched_type];
+	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
+
+	DLB_INC_STAT(ev_port->stats.traffic.rx_ok, 1);
+
+	return 1;
+}
+
+static __rte_always_inline int
+dlb_recv_qe_sparse(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe)
+{
+	volatile struct dlb_dequeue_qe *cq_addr;
+	uint8_t xor_mask[2] = {0x0F, 0x00};
+	const uint8_t and_mask = 0x0F;
+	__m128i *qes = (__m128i *)qe;
+	uint8_t gen_bits, gen_bit;
+	uintptr_t addr[4];
+	uint16_t idx;
+
+	cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+
+	idx = qm_port->cq_idx;
+
+	/* Load the next 4 QEs */
+	addr[0] = (uintptr_t)&cq_addr[idx];
+	addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
+	addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
+	addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
+
+	/* Prefetch next batch of QEs (all CQs occupy minimum 8 cache lines) */
+	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
+
+	/* Correct the xor_mask for wrap-around QEs */
+	gen_bit = qm_port->gen_bit;
+	xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
+	xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
+	xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
+
+	/* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
+	 * valid, then QEs[0:N-1] are too.
+	 */
+	qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
+	rte_compiler_barrier();
+	qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
+	rte_compiler_barrier();
+	qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
+	rte_compiler_barrier();
+	qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
+
+	/* Extract and combine the gen bits */
+	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
+		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
+		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
+		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
+
+	/* XOR the combined bits such that a 1 represents a valid QE */
+	gen_bits ^= xor_mask[gen_bit];
+
+	/* Mask off gen bits we don't care about */
+	gen_bits &= and_mask;
+
+	return __builtin_popcount(gen_bits);
+}
+
+static inline int16_t
+dlb_hw_dequeue_sparse(struct dlb_eventdev *dlb,
+		      struct dlb_eventdev_port *ev_port,
+		      struct rte_event *events,
+		      uint16_t max_num,
+		      uint64_t dequeue_timeout_ticks)
+{
+	uint64_t timeout;
+	uint64_t start_ticks = 0ULL;
+	struct dlb_port *qm_port;
+	int num = 0;
+
+	qm_port = &ev_port->qm_port;
+
+	/* If the eventdev is configured for per-dequeue wait, use the
+	 * wait value passed to this API. Otherwise, use the global
+	 * value specified at eventdev configuration time.
+	 */
+	if (!dlb->global_dequeue_wait)
+		timeout = dequeue_timeout_ticks;
+	else
+		timeout = dlb->global_dequeue_wait_ticks;
+
+	if (timeout)
+		start_ticks = rte_get_timer_cycles();
+
+	while (num < max_num) {
+		struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
+		int num_avail;
+
+		/* Copy up to 4 QEs from the current cache line into qes */
+		num_avail = dlb_recv_qe_sparse(qm_port, qes);
+
+		/* But don't process more than the user requested */
+		num_avail = RTE_MIN(num_avail, max_num - num);
+
+		dlb_inc_cq_idx(qm_port, num_avail << 2);
+
+		if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
+			num += dlb_process_dequeue_four_qes(ev_port,
+							     qm_port,
+							     &events[num],
+							     &qes[0]);
+		else if (num_avail)
+			num += dlb_process_dequeue_qes(ev_port,
+							qm_port,
+							&events[num],
+							&qes[0],
+							num_avail);
+		else if ((timeout == 0) || (num > 0))
+			/* Not waiting in any form, or 1+ events received? */
+			break;
+		else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
+					  timeout, start_ticks))
+			break;
+	}
+
+	qm_port->owed_tokens += num;
+
+	dlb_consume_qe_immediate(qm_port, num);
+
+	ev_port->outstanding_releases += num;
+
+	return num;
+}
+
+static int
+dlb_event_release(struct dlb_eventdev *dlb, uint8_t port_id, int n)
+{
+	struct process_local_port_data *port_data;
+	struct dlb_eventdev_port *ev_port;
+	struct dlb_port *qm_port;
+	int i;
+
+	if (port_id >= dlb->num_ports) {
+		DLB_LOG_ERR("Invalid port id %d in dlb_event_release\n",
+			    port_id);
+		rte_errno = -EINVAL;
+		return rte_errno;
+	}
+
+	ev_port = &dlb->ev_ports[port_id];
+	qm_port = &ev_port->qm_port;
+	port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
+
+	i = 0;
+
+	if (qm_port->is_directed) {
+		i = n;
+		goto sw_credit_update;
+	}
+
+	while (i < n) {
+		int pop_offs = 0;
+		int j = 0;
+
+		/* Zero-out QEs */
+		qm_port->qe4[0].cmd_byte = 0;
+		qm_port->qe4[1].cmd_byte = 0;
+		qm_port->qe4[2].cmd_byte = 0;
+		qm_port->qe4[3].cmd_byte = 0;
+
+		for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
+
+			qm_port->qe4[j].cmd_byte = DLB_COMP_CMD_BYTE;
+			qm_port->issued_releases++;
+		}
+
+		dlb_hw_do_enqueue(qm_port, i == 0, port_data);
+
+		/* Don't include the token pop QE in the release count */
+		i += j - pop_offs;
+	}
+
+sw_credit_update:
+	/* Each release returns one credit */
+	if (!ev_port->outstanding_releases) {
+		DLB_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
+		rte_errno = -ENOTRECOVERABLE;
+		return rte_errno;
+	}
+
+	ev_port->outstanding_releases -= i;
+	ev_port->inflight_credits += i;
+
+	/* Replenish s/w credits if enough releases are performed */
+	dlb_replenish_sw_credits(dlb, ev_port);
+	return 0;
+}
+
+static uint16_t
+dlb_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
+			uint64_t wait)
+{
+	struct dlb_eventdev_port *ev_port = event_port;
+	struct dlb_eventdev *dlb = ev_port->dlb;
+	uint16_t cnt;
+	int ret;
+
+	rte_errno = 0;
+
+	RTE_ASSERT(ev_port->setup_done);
+	RTE_ASSERT(ev != NULL);
+
+	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
+		uint16_t out_rels = ev_port->outstanding_releases;
+
+		ret = dlb_event_release(dlb, ev_port->id, out_rels);
+		if (ret)
+			return ret;
+
+		DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
+	}
+
+	cnt = dlb_hw_dequeue(dlb, ev_port, ev, num, wait);
+
+	DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
+	DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
+	return cnt;
+}
+
+static uint16_t
+dlb_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
+{
+	return dlb_event_dequeue_burst(event_port, ev, 1, wait);
+}
+
+static uint16_t
+dlb_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
+			       uint16_t num, uint64_t wait)
+{
+	struct dlb_eventdev_port *ev_port = event_port;
+	struct dlb_eventdev *dlb = ev_port->dlb;
+	uint16_t cnt;
+	int ret;
+
+	rte_errno = 0;
+
+	RTE_ASSERT(ev_port->setup_done);
+	RTE_ASSERT(ev != NULL);
+
+	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
+		uint16_t out_rels = ev_port->outstanding_releases;
+
+		ret = dlb_event_release(dlb, ev_port->id, out_rels);
+		if (ret)
+			return ret;
+
+		DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
+	}
+
+	cnt = dlb_hw_dequeue_sparse(dlb, ev_port, ev, num, wait);
+
+	DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
+	DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
+	return cnt;
+}
+
+static uint16_t
+dlb_event_dequeue_sparse(void *event_port, struct rte_event *ev, uint64_t wait)
+{
+	return dlb_event_dequeue_burst_sparse(event_port, ev, 1, wait);
+}
+
 void
 dlb_entry_points_init(struct rte_eventdev *dev)
 {
+	struct dlb_eventdev *dlb;
+
 	static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
 		.dev_infos_get    = dlb_eventdev_info_get,
 		.dev_configure    = dlb_eventdev_configure,
@@ -2841,6 +3560,15 @@ dlb_entry_points_init(struct rte_eventdev *dev)
 	dev->enqueue_burst = dlb_event_enqueue_burst;
 	dev->enqueue_new_burst = dlb_event_enqueue_new_burst;
 	dev->enqueue_forward_burst = dlb_event_enqueue_forward_burst;
+	dev->dequeue = dlb_event_dequeue;
+	dev->dequeue_burst = dlb_event_dequeue_burst;
+
+	dlb = dev->data->dev_private;
+
+	if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE) {
+		dev->dequeue = dlb_event_dequeue_sparse;
+		dev->dequeue_burst = dlb_event_dequeue_burst_sparse;
+	}
 }
 
 int
-- 
2.6.4

