DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH] event/dlb2: Optimize Dequeue Operations
@ 2021-03-17 17:02 Timothy McDaniel
  2021-03-21 11:10 ` Jerin Jacob
                   ` (2 more replies)
  0 siblings, 3 replies; 9+ messages in thread
From: Timothy McDaniel @ 2021-03-17 17:02 UTC (permalink / raw)
  To: Bruce Richardson
  Cc: dev, jerinj, harry.van.haaren, mdr, nhorman, nikhil.rao,
	erik.g.carrillo, abhinandan.gujjar, pbhagavatula, hemant.agrawal,
	mattias.ronnblom, peter.mccarthy

Convert code to use x86 vector instructions, thereby significantly
improving dequeue performance.

Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
---
 config/rte_config.h            |    1 +
 drivers/event/dlb2/dlb2.c      |  607 ++++++++++++++++++++++++++++++++++++----
 drivers/event/dlb2/dlb2_priv.h |   19 +-
 3 files changed, 574 insertions(+), 53 deletions(-)

diff --git a/config/rte_config.h b/config/rte_config.h
index aedb68c..133ca35 100644
--- a/config/rte_config.h
+++ b/config/rte_config.h
@@ -144,5 +144,6 @@
 #undef RTE_LIBRTE_PMD_DLB2_QUELL_STATS
 #define RTE_LIBRTE_PMD_DLB2_SW_CREDIT_QUANTA 32
 #define RTE_PMD_DLB2_DEFAULT_DEPTH_THRESH 256
+#define RTE_LIBRTE_PMD_DLB2_VECTOR_CODE 1
 
 #endif /* _RTE_CONFIG_H_ */
diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c
index a4a7db4..adcd3dd 100644
--- a/drivers/event/dlb2/dlb2.c
+++ b/drivers/event/dlb2/dlb2.c
@@ -1186,6 +1186,37 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 					 const struct rte_event events[],
 					 uint16_t num);
 
+/* Generate the required bitmask for rotate-style expected QE gen bits.
+ * The pattern is cq_depth 0's followed by cq_depth 1's, repeated across
+ * cq_rolling_mask and then cq_rolling_mask_2. The masks are left untouched
+ * when cq_depth is 0, above 64 or not a power of 2 (scalar path only).
+ */
+static void
+dlb2_hw_cq_bitmask_init(struct dlb2_port *qm_port, uint32_t cq_depth)
+{
+	uint64_t cq_build_mask = 0;
+	uint32_t i;
+
+	/* 0 would never advance the loop; non-pow2 would shift by >= 64 */
+	if (cq_depth == 0 || cq_depth > 64 || (cq_depth & (cq_depth - 1)))
+		return; /* need to fall back to scalar code */
+
+	/*
+	 * all zeros in first u64, all 1's in 2nd is correct bit pattern to
+	 * start. Special casing == 64 easier than adapting complex loop logic.
+	 */
+	if (cq_depth == 64) {
+		qm_port->cq_rolling_mask = 0;
+		qm_port->cq_rolling_mask_2 = -1;
+		return;
+	}
+	/* cq_depth <= 32 from here on, so i + cq_depth is at most 63 */
+	for (i = 0; i < 64; i += (cq_depth * 2))
+		cq_build_mask |= ((1ULL << cq_depth) - 1) << (i + cq_depth);
+
+	qm_port->cq_rolling_mask = cq_build_mask;
+	qm_port->cq_rolling_mask_2 = cq_build_mask;
+}
+
 static int
 dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 			struct dlb2_eventdev_port *ev_port,
@@ -1303,6 +1334,8 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 	/* starting value of gen bit - it toggles at wrap time */
 	qm_port->gen_bit = 1;
 
+	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
+
 	qm_port->int_armed = false;
 
 	/* Save off for later use in info and lookup APIs. */
@@ -1354,6 +1387,17 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 			     dequeue_depth,
 			     qm_port->credits);
 	}
+
+	qm_port->use_scalar = false;
+
+#if (!defined RTE_ARCH_X86_64) || (!defined RTE_LIBRTE_PMD_DLB2_VECTOR_CODE)
+	qm_port->use_scalar = true;
+#else
+	if ((qm_port->cq_depth > 64) ||
+	    (!rte_is_power_of_2(qm_port->cq_depth)))
+		qm_port->use_scalar = true;
+#endif
+
 	rte_spinlock_unlock(&handle->resource_lock);
 
 	return 0;
@@ -1499,6 +1543,7 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 	qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
 	/* starting value of gen bit - it toggles at wrap time */
 	qm_port->gen_bit = 1;
+	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
 
 	qm_port->int_armed = false;
 
@@ -1539,6 +1584,15 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 			     dequeue_depth,
 			     credit_high_watermark);
 	}
+
+#if (!defined RTE_ARCH_X86_64) || (!defined RTE_LIBRTE_PMD_DLB2_VECTOR_CODE)
+	qm_port->use_scalar = true;
+#else
+	if ((qm_port->cq_depth > 64) ||
+	    (!rte_is_power_of_2(qm_port->cq_depth)))
+		qm_port->use_scalar = true;
+#endif
+
 	rte_spinlock_unlock(&handle->resource_lock);
 
 	return 0;
@@ -2424,6 +2478,203 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 }
 
 static inline void
+dlb2_event_build_hcws_vec(struct dlb2_port *qm_port,
+			  const struct rte_event ev[],
+			  int num,
+			  uint8_t *sched_type,
+			  uint8_t *queue_id)
+{
+	const __m128i *v_evs = (const __m128i *)ev;
+
+	/* Shared constants for mask creation while building qe4[0..3] */
+	const __m128i v_zeros = _mm_setzero_si128();
+	const __m128i v_ones = _mm_cmpeq_epi8(v_zeros, v_zeros);
+
+	/* LSB lanes are pkt 0, then pkt 1, pkt 2, MSB lanes are pkt 3 */
+	__m128i v_unpack_02 = _mm_unpacklo_epi32(v_evs[0], v_evs[2]);
+	__m128i v_unpack_13 = _mm_unpacklo_epi32(v_evs[1], v_evs[3]);
+	__m128i v_unpack_b32_63 = _mm_unpackhi_epi32(v_unpack_02, v_unpack_13);
+	__m128i v_unpack_b00_31 = _mm_unpacklo_epi32(v_unpack_02, v_unpack_13);
+
+	/* HCW contents for 4x QEs built up in here at u32 per QE */
+	__m128i v_qe_hcws;
+
+	/* Event OP processing to cmd_byte:
+	 * qe[i].cmd_byte = cmd_byte_map[qm_port->is_directed][ev[i].op];
+	 */
+	{
+		__m128i v_op_bits = _mm_srli_epi32(v_ones, 30);
+		__m128i v_ev_op = _mm_and_si128(v_op_bits, v_unpack_b32_63);
+
+		/*
+		 * Is directed -> shuffle index:
+		 * - OP field is 2 bits, so indexes 0,1,2,3 are used by LB.
+		 * - DIR results stored in 4,5,6,7, so bit OR in a bit to
+		 *   select the higher shuffle-mask range if the enqueues are
+		 *   for DIR ports.
+		 */
+		uint32_t port_is_directed = qm_port->is_directed;
+		__m128i v_is_dir = _mm_insert_epi32(v_zeros,
+						    0xffffff00 |
+						    port_is_directed << 2, 0);
+		__m128i v_dir_inject = _mm_shuffle_epi32(v_is_dir, 0);
+		__m128i v_ev_op_w_dir = _mm_or_si128(v_dir_inject, v_ev_op);
+
+		/* Load shuffle mask, perform shuffle for CMD byte */
+		__m128i v_op_shuffle = _mm_loadu_si128((__m128i *)cmd_byte_map);
+		__m128i v_qe_op_preshift = _mm_shuffle_epi8(v_op_shuffle,
+							    v_ev_op_w_dir);
+
+		/* move to MSB u8 lane where cmd_byte is located */
+		__m128i v_cmd_byte = _mm_slli_epi32(v_qe_op_preshift, 24);
+
+
+		/* Use burst size as provided in "num" to ZERO the cmd_bytes
+		 * of QEs at index >= num. If num == 4 no zeroing is needed;
+		 * num > 4 is invalid. All four ev[]/queue_id[]/sched_type[]
+		 * slots are still read regardless of num. With num < 4 the
+		 * 64-bit shift below (num * 16 <= 48) stays in range.
+		 */
+		if (num < 4) {
+			uint64_t burst_mask = (1ULL << (num * 16)) - 1;
+			__m128i v_mask_u64 = _mm_insert_epi64(v_zeros,
+							      burst_mask, 0);
+			__m128i v_burst_mask = _mm_unpacklo_epi8(v_mask_u64,
+								 v_mask_u64);
+			v_cmd_byte = _mm_blendv_epi8(v_zeros, v_cmd_byte,
+						     v_burst_mask);
+		}
+		v_qe_hcws = v_cmd_byte;
+	}
+
+	/* Priority:
+	 * Generate 3-bit wide mask in correct place, shift prio value itself
+	 * between lanes with a u32 shift, then AND then OR into HCW result.
+	 */
+	{
+		__m128i v_prio_mask = _mm_srli_epi32(v_ones, 29);
+		v_prio_mask = _mm_slli_epi32(v_prio_mask, 2 + 8);
+
+		__m128i v_ev_prio = _mm_srli_epi32(v_unpack_b32_63, 3 + 8);
+		__m128i v_prio_final = _mm_and_si128(v_ev_prio, v_prio_mask);
+		v_qe_hcws = _mm_or_si128(v_qe_hcws, v_prio_final);
+	}
+
+	/* Scalar Loading & inserts:
+	 * (Re-use scalar provided version until "prep_enq" is SSE)
+	 * SCHED_TYPE:
+	 * - Load 4x u8 as u32, insert
+	 * QUEUE ID:
+	 * - Load 4x u8 as u32, insert
+	 * Finalize:
+	 * - Shuffle u8 values to correct locations, OR into HCWs
+	 */
+	/* Expose for re-use in SCHED==DIRECTED mask creation later */
+	__m128i v_4x_qid_sched;
+	{
+		uint32_t qtmp = queue_id[0] |
+				queue_id[1] << 8 |
+				queue_id[2] << 8*2 |
+				queue_id[3] << 8*3;
+		__m128i v_4x_qid = _mm_insert_epi32(_mm_setzero_si128(),
+						qtmp, 0);
+
+		uint32_t stmp = sched_type[0] |
+				sched_type[1] << 8 |
+				sched_type[2] << 8*2 |
+				sched_type[3] << 8*3;
+		v_4x_qid_sched = _mm_insert_epi32(v_4x_qid, stmp, 1);
+
+		static const uint8_t data_move[] = {
+			 0, 4, 0xFF, 0xFF,
+			 1, 5, 0xFF, 0xFF,
+			 2, 6, 0xFF, 0xFF,
+			 3, 7, 0xFF, 0xFF,
+		};
+		__m128i v_qid_sched_move = _mm_loadu_si128((const void *)
+							   data_move);
+		__m128i v_qid_sched_done = _mm_shuffle_epi8(v_4x_qid_sched,
+							    v_qid_sched_move);
+		v_qe_hcws = _mm_or_si128(v_qe_hcws, v_qid_sched_done);
+	}
+
+	/* FlowID, SubEv Type:
+	 * - Process 4x in single reg
+	 * - Shift by 4 bits, blend to align for shuffle mask
+	 * - Shuffle FID vs sub/ev type order
+	 * - unpack as u16s to interleave with previous work
+	 */
+	__m128i v_sev_fid_done;
+	{
+		__m128i v_subevt_shift = _mm_srli_epi16(v_unpack_b00_31, 4);
+		__m128i v_subevt_fixed = _mm_blend_epi16(v_unpack_b00_31,
+							 v_subevt_shift, 0xAA);
+		static const uint8_t data_move[16] = {
+			 3,  2,  0,  1,
+			 7,  6,  4,  5,
+			11, 10,  8,  9,
+			15, 14, 12, 13,
+		};
+		__m128i v_data_move = _mm_loadu_si128((const void *)data_move);
+		v_sev_fid_done = _mm_shuffle_epi8(v_subevt_fixed, v_data_move);
+	}
+
+	/* If sched type == DIRECTED, copy QID/SCHED/PRIO fields to FLOWID u16:
+	 * v_qe_hcws u16 lane 0 gets written to v_sev_fid_done u16 lane 1
+	 * (x4 for QEs) blend results based on == DIR compare
+	 */
+	{
+		__m128i v_q_sp_shift = _mm_slli_epi32(v_qe_hcws, 16);
+		__m128i v_sched_dir = _mm_insert_epi32(v_zeros,
+						       0x3 | 0x3 << 8 |
+						       0x3 << 16 | 0x3 << 24,
+						       1);
+		__m128i v_dir_sched_mask = _mm_cmpeq_epi8(v_4x_qid_sched,
+							  v_sched_dir);
+
+		/* Duplicate cmp mask to u16 as required for larger blend */
+		static const uint8_t data_move[] = {
+			 0xFF, 0xFF, 4, 4,
+			 0xFF, 0xFF, 5, 5,
+			 0xFF, 0xFF, 6, 6,
+			 0xFF, 0xFF, 7, 7,
+		};
+		__m128i v_dir_sched_move = _mm_loadu_si128((const void *)
+							   data_move);
+		__m128i v_dir_sched_mask_done = _mm_shuffle_epi8(
+							v_dir_sched_mask,
+							v_dir_sched_move);
+
+		v_sev_fid_done = _mm_blendv_epi8(v_sev_fid_done,
+						 v_q_sp_shift,
+						 v_dir_sched_mask_done);
+	}
+
+	/* Unpacks from 2 regs to single u64 for each QE */
+	struct dlb2_enqueue_qe *qe = qm_port->qe4;
+	__m128i v_qe_01_u64s = _mm_unpacklo_epi16(v_sev_fid_done, v_qe_hcws);
+	__m128i v_qe_23_u64s = _mm_unpackhi_epi16(v_sev_fid_done, v_qe_hcws);
+
+	/* QE 0 */
+	__m128i v_qe0 = _mm_alignr_epi8(v_qe_01_u64s, v_evs[0], 8);
+	_mm_storeu_si128((void *)&qe[0], v_qe0);
+	/* QE 1 */
+	__m128i v_qe1 = _mm_blend_epi16(v_qe_01_u64s, _mm_alignr_epi8(v_evs[1],
+								      v_evs[1],
+								      8),
+					0x0F);
+	_mm_storeu_si128((void *)&qe[1], v_qe1);
+	/* QE 2 */
+	__m128i v_qe2 = _mm_alignr_epi8(v_qe_23_u64s, v_evs[2], 8);
+	_mm_storeu_si128((void *)&qe[2], v_qe2);
+	/* QE 3 */
+	__m128i v_qe3 = _mm_blend_epi16(v_qe_23_u64s,
+					_mm_alignr_epi8(v_evs[3], v_evs[3], 8),
+					0x0F);
+	_mm_storeu_si128((void *)&qe[3], v_qe3);
+}
+
+static inline void
 dlb2_construct_token_pop_qe(struct dlb2_port *qm_port, int idx)
 {
 	struct dlb2_cq_pop_qe *qe = (void *)qm_port->qe4;
@@ -2932,10 +3183,11 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 		int j = 0;
 
 		/* Zero-out QEs */
-		qm_port->qe4[0].cmd_byte = 0;
-		qm_port->qe4[1].cmd_byte = 0;
-		qm_port->qe4[2].cmd_byte = 0;
-		qm_port->qe4[3].cmd_byte = 0;
+		_mm_storeu_si128((void *)&qm_port->qe4[0], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[1], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[2], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[3], _mm_setzero_si128());
+
 
 		for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
 			int16_t thresh = qm_port->token_pop_thresh;
@@ -2965,7 +3217,7 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 
 sw_credit_update:
 	/* each release returns one credit */
-	if (!ev_port->outstanding_releases) {
+	if (unlikely(!ev_port->outstanding_releases)) {
 		DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
 			     __func__);
 		return;
@@ -3082,7 +3334,7 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 	return 0;
 }
 
-static inline int
+static __rte_noinline int
 dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
 			 struct dlb2_port *qm_port,
 			 struct rte_event *events,
@@ -3351,8 +3603,7 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 
 	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
 
-	idx = qm_port->cq_idx;
-
+	idx = qm_port->cq_idx_unmasked & qm_port->cq_depth_mask;
 	/* Load the next 4 QEs */
 	addr[0] = (uintptr_t)&cq_addr[idx];
 	addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
@@ -3398,6 +3649,272 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 }
 
 static inline void
+_process_deq_qes_vec_impl(struct dlb2_port *qm_port,
+			  struct rte_event *events,
+			  __m128i v_qe_3,
+			  __m128i v_qe_2,
+			  __m128i v_qe_1,
+			  __m128i v_qe_0,
+			  __m128i v_qe_meta,
+			  __m128i v_qe_status,
+			  uint32_t valid_events)
+{
+	/* Look up the event QIDs, using the hardware QIDs to index the
+	 * port's QID mapping.
+	 *
+	 * Each v_qe_[0-3] is just a 16-byte load of the whole QE. It is
+	 * passed along in registers as the QE data is required later.
+	 *
+	 * v_qe_meta is an u32 unpack of all 4x QEs. Aka, it contains one
+	 * 32-bit slice of each QE, so makes up a full SSE register. This
+	 * allows parallel processing of 4x QEs in a single register.
+	 */
+
+	__m128i v_qid_done = {0};
+	int hw_qid0 = _mm_extract_epi8(v_qe_meta, 2);
+	int hw_qid1 = _mm_extract_epi8(v_qe_meta, 6);
+	int hw_qid2 = _mm_extract_epi8(v_qe_meta, 10);
+	int hw_qid3 = _mm_extract_epi8(v_qe_meta, 14);
+
+	int ev_qid0 = qm_port->qid_mappings[hw_qid0];
+	int ev_qid1 = qm_port->qid_mappings[hw_qid1];
+	int ev_qid2 = qm_port->qid_mappings[hw_qid2];
+	int ev_qid3 = qm_port->qid_mappings[hw_qid3];
+
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid0, 2);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid1, 6);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid2, 10);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid3, 14);
+
+	/* Schedule field remapping using byte shuffle
+	 * - Full byte containing sched field handled here (op, rsvd are zero)
+	 * - Note sanitizing the register requires two masking ANDs:
+	 *   1) to strip prio/msg_type from byte for correct shuffle lookup
+	 *   2) to strip any non-sched-field lanes from any results to OR later
+	 * - Final byte result is >> 10 to another byte-lane inside the u32.
+	 *   This makes the final combination OR easier to make the rte_event.
+	 */
+	__m128i v_sched_done;
+	__m128i v_sched_bits;
+	{
+		static const uint8_t sched_type_map[16] = {
+			[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+			[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+			[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+			[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+		};
+		static const uint8_t sched_and_mask[16] = {
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+		};
+		const __m128i v_sched_map = _mm_loadu_si128(
+					     (const __m128i *)sched_type_map);
+		__m128i v_sched_mask = _mm_loadu_si128(
+					     (const __m128i *)&sched_and_mask);
+		v_sched_bits = _mm_and_si128(v_qe_meta, v_sched_mask);
+		__m128i v_sched_remapped = _mm_shuffle_epi8(v_sched_map,
+							    v_sched_bits);
+		__m128i v_preshift = _mm_and_si128(v_sched_remapped,
+						   v_sched_mask);
+		v_sched_done = _mm_srli_epi32(v_preshift, 10);
+	}
+
+	/* Priority handling
+	 * - QE provides 3 bits of priority
+	 * - Shift << 3 to move to MSBs for byte-prio in rte_event
+	 * - Mask bits to avoid pollution, leaving only 3 prio MSBs in reg
+	 */
+	__m128i v_prio_done;
+	{
+		static const uint8_t prio_mask[16] = {
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+		};
+		__m128i v_prio_mask  = _mm_loadu_si128(
+						(const __m128i *)prio_mask);
+		__m128i v_prio_shifted = _mm_slli_epi32(v_qe_meta, 3);
+		v_prio_done = _mm_and_si128(v_prio_shifted, v_prio_mask);
+	}
+
+	/* Event Sub/Type and flow ID handling:
+	 * take the 4-bit type and 8-bit sub type from v_qe_meta, shift the
+	 * pair up by 20 bits into the rte_event location (clearing the lower
+	 * 20 bits), then OR in the low 16 bits of v_qe_status as flow ID.
+	 */
+	__m128i v_types_done;
+	{
+		static const uint8_t event_mask[16] = {
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+		};
+		static const uint8_t sub_event_mask[16] = {
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+		};
+		static const uint8_t flow_mask[16] = {
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+		};
+		__m128i v_event_mask  = _mm_loadu_si128(
+					(const __m128i *)event_mask);
+		__m128i v_sub_event_mask  = _mm_loadu_si128(
+					(const __m128i *)sub_event_mask);
+		__m128i v_flow_mask  = _mm_loadu_si128(
+				       (const __m128i *)flow_mask);
+		__m128i v_sub = _mm_srli_epi32(v_qe_meta, 8);
+		v_sub = _mm_and_si128(v_sub, v_sub_event_mask);
+		__m128i v_type = _mm_and_si128(v_qe_meta, v_event_mask);
+		v_type = _mm_slli_epi32(v_type, 8);
+		v_types_done = _mm_or_si128(v_type, v_sub);
+		v_types_done = _mm_slli_epi32(v_types_done, 20);
+		__m128i v_flow = _mm_and_si128(v_qe_status, v_flow_mask);
+		v_types_done = _mm_or_si128(v_types_done, v_flow);
+	}
+
+	/* Combine QID, Sched and Prio fields, then Shift >> 8 bits to align
+	 * with the rte_event, allowing unpacks to move/blend with payload.
+	 */
+	__m128i v_q_s_p_done;
+	{
+		__m128i v_qid_sched = _mm_or_si128(v_qid_done, v_sched_done);
+		__m128i v_q_s_prio = _mm_or_si128(v_qid_sched, v_prio_done);
+		v_q_s_p_done = _mm_srli_epi32(v_q_s_prio, 8);
+	}
+
+	__m128i v_unpk_ev_23, v_unpk_ev_01, v_ev_2, v_ev_3, v_ev_0, v_ev_1;
+
+	/* Unpack evs into u64 metadata, then indiv events */
+	v_unpk_ev_23 = _mm_unpackhi_epi32(v_types_done, v_q_s_p_done);
+	v_unpk_ev_01 = _mm_unpacklo_epi32(v_types_done, v_q_s_p_done);
+
+	switch (valid_events) { /* 0 or > 4: nothing is stored */
+	case 4:
+		v_ev_3 = _mm_blend_epi16(v_unpk_ev_23, v_qe_3, 0x0F);
+		v_ev_3 = _mm_alignr_epi8(v_ev_3, v_ev_3, 8);
+		_mm_storeu_si128((__m128i *)&events[3], v_ev_3);
+		/* fallthrough */
+	case 3:
+		v_ev_2 = _mm_unpacklo_epi64(v_unpk_ev_23, v_qe_2);
+		_mm_storeu_si128((__m128i *)&events[2], v_ev_2);
+		/* fallthrough */
+	case 2:
+		v_ev_1 = _mm_blend_epi16(v_unpk_ev_01, v_qe_1, 0x0F);
+		v_ev_1 = _mm_alignr_epi8(v_ev_1, v_ev_1, 8);
+		_mm_storeu_si128((__m128i *)&events[1], v_ev_1);
+		/* fallthrough */
+	case 1:
+		v_ev_0 = _mm_unpacklo_epi64(v_unpk_ev_01, v_qe_0);
+		_mm_storeu_si128((__m128i *)&events[0], v_ev_0);
+	}
+}
+
+static __rte_always_inline int
+dlb2_recv_qe_sparse_vec(struct dlb2_port *qm_port, void *events,
+			uint32_t max_events)
+{
+	/* Using unmasked idx for perf, and masking manually */
+	uint16_t idx = qm_port->cq_idx_unmasked;
+	volatile struct dlb2_dequeue_qe *cq_addr;
+
+	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+
+	uintptr_t qe_ptr_3 = (uintptr_t)&cq_addr[(idx + 12) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_2 = (uintptr_t)&cq_addr[(idx +  8) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_1 = (uintptr_t)&cq_addr[(idx +  4) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_0 = (uintptr_t)&cq_addr[(idx +  0) &
+						 qm_port->cq_depth_mask];
+
+	/* Load QEs from CQ: use compiler barriers to avoid load reordering */
+	__m128i v_qe_3 = _mm_loadu_si128((const __m128i *)qe_ptr_3);
+	rte_compiler_barrier();
+	__m128i v_qe_2 = _mm_loadu_si128((const __m128i *)qe_ptr_2);
+	rte_compiler_barrier();
+	__m128i v_qe_1 = _mm_loadu_si128((const __m128i *)qe_ptr_1);
+	rte_compiler_barrier();
+	__m128i v_qe_0 = _mm_loadu_si128((const __m128i *)qe_ptr_0);
+
+	/* Generate the pkt_shuffle mask;
+	 * - Avoids load in otherwise load-heavy section of code
+	 * - Moves bytes 3,7,11,15 (gen bit bytes) to LSB bytes in XMM
+	 */
+	const uint32_t stat_shuf_bytes = (15 << 24) | (11 << 16) | (7 << 8) | 3;
+	__m128i v_zeros = _mm_setzero_si128();
+	__m128i v_ffff = _mm_cmpeq_epi8(v_zeros, v_zeros);
+	__m128i v_stat_shuf_mask = _mm_insert_epi32(v_ffff, stat_shuf_bytes, 0);
+
+	/* Extract u32 components required from the QE
+	 * - QE[64 to 95 ] for metadata (qid, sched, prio, event type, ...)
+	 * - QE[96 to 127] for status (cq gen bit, error)
+	 *
+	 * Note that stage 1 of the unpacking is re-used for both u32 extracts
+	 */
+	__m128i v_qe_02 = _mm_unpackhi_epi32(v_qe_0, v_qe_2);
+	__m128i v_qe_13 = _mm_unpackhi_epi32(v_qe_1, v_qe_3);
+	__m128i v_qe_status = _mm_unpackhi_epi32(v_qe_02, v_qe_13);
+	__m128i v_qe_meta   = _mm_unpacklo_epi32(v_qe_02, v_qe_13);
+
+	/* Status byte (gen_bit, error) handling:
+	 * - Shuffle to lanes 0,1,2,3, clear all others
+	 * - Shift left by 7 for gen bit to MSB, movemask to scalar
+	 * - NOTE: the error bit is not extracted or checked in this function
+	 */
+	__m128i v_qe_shuffled = _mm_shuffle_epi8(v_qe_status, v_stat_shuf_mask);
+	__m128i v_qes_shift_gen_bit = _mm_slli_epi32(v_qe_shuffled, 7);
+	int32_t qe_gen_bits = _mm_movemask_epi8(v_qes_shift_gen_bit) & 0xf;
+
+	/* Expected vs Reality of QE Gen bits
+	 * - cq_rolling_mask provides expected bits
+	 * - QE loads, unpacks/shuffle and movemask provides reality
+	 * - XOR of the two gives bitmask of new packets
+	 * - POPCNT to get the number of new events
+	 */
+	uint64_t rolling = qm_port->cq_rolling_mask & 0xF;
+	uint64_t qe_xor_bits = (qe_gen_bits ^ rolling);
+	uint32_t count_new = __builtin_popcount(qe_xor_bits);
+	count_new = RTE_MIN(count_new, max_events);
+	if (!count_new)
+		return 0;
+
+	/* emulate a 128 bit rotate using 2x 64-bit numbers and bit-shifts;
+	 * count_new is 1..4 here, so 64 - count_new is a valid shift */
+	uint64_t m_rshift = qm_port->cq_rolling_mask >> count_new;
+	uint64_t m_lshift = qm_port->cq_rolling_mask << (64 - count_new);
+	uint64_t m2_rshift = qm_port->cq_rolling_mask_2 >> count_new;
+	uint64_t m2_lshift = qm_port->cq_rolling_mask_2 << (64 - count_new);
+
+	/* shifted out of m2 into MSB of m */
+	qm_port->cq_rolling_mask = (m_rshift | m2_lshift);
+
+	/* shifted out of m "looped back" into MSB of m2 */
+	qm_port->cq_rolling_mask_2 = (m2_rshift | m_lshift);
+
+	/* Prefetch the next QEs - should run as IPC instead of cycles */
+	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
+
+	/* Convert QEs from XMM regs to events and store events directly */
+	_process_deq_qes_vec_impl(qm_port, events, v_qe_3, v_qe_2, v_qe_1,
+				  v_qe_0, v_qe_meta, v_qe_status, count_new);
+
+	return count_new;
+}
+
+static inline void
 dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
 {
 	uint16_t idx = qm_port->cq_idx_unmasked + cnt;
@@ -3414,25 +3931,15 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 		       uint16_t max_num,
 		       uint64_t dequeue_timeout_ticks)
 {
-	uint64_t timeout;
 	uint64_t start_ticks = 0ULL;
 	struct dlb2_port *qm_port;
 	int num = 0;
+	bool use_scalar;
+	uint64_t timeout;
 
 	qm_port = &ev_port->qm_port;
+	use_scalar = qm_port->use_scalar;
 
-	/* We have a special implementation for waiting. Wait can be:
-	 * 1) no waiting at all
-	 * 2) busy poll only
-	 * 3) wait for interrupt. If wakeup and poll time
-	 * has expired, then return to caller
-	 * 4) umonitor/umwait repeatedly up to poll time
-	 */
-
-	/* If configured for per dequeue wait, then use wait value provided
-	 * to this API. Otherwise we must use the global
-	 * value from eventdev config time.
-	 */
 	if (!dlb2->global_dequeue_wait)
 		timeout = dequeue_timeout_ticks;
 	else
@@ -3440,35 +3947,41 @@ static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
 
 	start_ticks = rte_get_timer_cycles();
 
+	use_scalar = use_scalar || (max_num & 0x3);
+
 	while (num < max_num) {
 		struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
 		int num_avail;
-
-		/* Copy up to 4 QEs from the current cache line into qes */
-		num_avail = dlb2_recv_qe_sparse(qm_port, qes);
-
-		/* But don't process more than the user requested */
-		num_avail = RTE_MIN(num_avail, max_num - num);
-
-		dlb2_inc_cq_idx(qm_port, num_avail << 2);
-
-		if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
-			num += dlb2_process_dequeue_four_qes(ev_port,
-							      qm_port,
-							      &events[num],
-							      &qes[0]);
-		else if (num_avail)
-			num += dlb2_process_dequeue_qes(ev_port,
-							 qm_port,
-							 &events[num],
-							 &qes[0],
-							 num_avail);
-		else if ((timeout == 0) || (num > 0))
-			/* Not waiting in any form, or 1+ events received? */
-			break;
-		else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
-					   timeout, start_ticks))
-			break;
+		if (use_scalar) {
+			num_avail = dlb2_recv_qe_sparse(qm_port, qes);
+			num_avail = RTE_MIN(num_avail, max_num - num);
+			dlb2_inc_cq_idx(qm_port, num_avail << 2);
+			if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
+				num += dlb2_process_dequeue_four_qes(ev_port,
+								  qm_port,
+								  &events[num],
+								  &qes[0]);
+			else if (num_avail)
+				num += dlb2_process_dequeue_qes(ev_port,
+								qm_port,
+								&events[num],
+								&qes[0],
+								num_avail);
+		} else { /* !use_scalar */
+			num_avail = dlb2_recv_qe_sparse_vec(qm_port,
+							    &events[num],
+							    max_num - num);
+			num += num_avail;
+			dlb2_inc_cq_idx(qm_port, num_avail << 2);
+			DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_avail);
+		}
+		if (!num_avail) {
+			if (num > 0)
+				break;
+			else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
+						   timeout, start_ticks))
+				break;
+		}
 	}
 
 	qm_port->owed_tokens += num;
diff --git a/drivers/event/dlb2/dlb2_priv.h b/drivers/event/dlb2/dlb2_priv.h
index ad663a3..5693448 100644
--- a/drivers/event/dlb2/dlb2_priv.h
+++ b/drivers/event/dlb2/dlb2_priv.h
@@ -199,9 +199,9 @@ enum dlb2_enqueue_type {
 /* hw-specific format - do not change */
 
 struct dlb2_event_type {
-	uint8_t major:4;
-	uint8_t unused:4;
-	uint8_t sub;
+	uint16_t major:4;
+	uint16_t unused:4;
+	uint16_t sub:8;
 };
 
 union dlb2_opaque_data {
@@ -345,6 +345,12 @@ struct dlb2_port {
 	uint16_t cq_idx_unmasked;
 	uint16_t cq_depth_mask;
 	uint16_t gen_bit_shift;
+	uint64_t cq_rolling_mask; /*
+				   * rotate to always have right expected
+				   * gen bits
+				   */
+	uint64_t cq_rolling_mask_2;
+	void *cq_addr_cached; /* avoid multiple refs */
 	enum dlb2_port_state state;
 	enum dlb2_configuration_state config_state;
 	int num_mapped_qids;
@@ -354,6 +360,7 @@ struct dlb2_port {
 	struct dlb2_cq_pop_qe *consume_qe;
 	struct dlb2_eventdev *dlb2; /* back ptr */
 	struct dlb2_eventdev_port *ev_port; /* back ptr */
+	bool use_scalar; /* force usage of scalar code */
 };
 
 /* Per-process per-port mmio and memory pointers */
@@ -507,9 +514,9 @@ struct dlb2_queue {
 	uint32_t num_qid_inflights; /* User config */
 	uint32_t num_atm_inflights; /* User config */
 	enum dlb2_configuration_state config_state;
-	int sched_type; /* LB queue only */
-	uint32_t id;
-	bool is_directed;
+	int  sched_type; /* LB queue only */
+	uint8_t id;
+	bool	 is_directed;
 };
 
 struct dlb2_eventdev_queue {
-- 
1.7.10


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [PATCH] event/dlb2: Optimize Dequeue Operations
  2021-03-17 17:02 [dpdk-dev] [PATCH] event/dlb2: Optimize Dequeue Operations Timothy McDaniel
@ 2021-03-21 11:10 ` Jerin Jacob
  2021-04-13 20:30 ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Timothy McDaniel
  2021-05-01 19:07 ` [dpdk-dev] [PATCH v3 0/1] Optimize DLB2 Dequeue Operations McDaniel, Timothy
  2 siblings, 0 replies; 9+ messages in thread
From: Jerin Jacob @ 2021-03-21 11:10 UTC (permalink / raw)
  To: Timothy McDaniel
  Cc: Bruce Richardson, dpdk-dev, Jerin Jacob, Van Haaren, Harry,
	Ray Kinsella, Neil Horman, Nikhil Rao, Erik Gabriel Carrillo,
	Gujjar, Abhinandan S, Pavan Nikhilesh, Hemant Agrawal,
	Mattias Rönnblom, Peter Mccarthy

On Wed, Mar 17, 2021 at 10:33 PM Timothy McDaniel
<timothy.mcdaniel@intel.com> wrote:
>
> Convert code to use x86 vector instructions, thereby significantly
> improving dequeue performance.
>
> Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
> ---
>  config/rte_config.h            |    1 +
>  drivers/event/dlb2/dlb2.c      |  607 ++++++++++++++++++++++++++++++++++++----
>  drivers/event/dlb2/dlb2_priv.h |   19 +-
>  3 files changed, 574 insertions(+), 53 deletions(-)
>
> diff --git a/config/rte_config.h b/config/rte_config.h
> index aedb68c..133ca35 100644
> --- a/config/rte_config.h
> +++ b/config/rte_config.h
> @@ -144,5 +144,6 @@
>  #undef RTE_LIBRTE_PMD_DLB2_QUELL_STATS
>  #define RTE_LIBRTE_PMD_DLB2_SW_CREDIT_QUANTA 32
>  #define RTE_PMD_DLB2_DEFAULT_DEPTH_THRESH 256
> +#define RTE_LIBRTE_PMD_DLB2_VECTOR_CODE 1

This is not required. Please expose as devargs option.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue
  2021-03-17 17:02 [dpdk-dev] [PATCH] event/dlb2: Optimize Dequeue Operations Timothy McDaniel
  2021-03-21 11:10 ` Jerin Jacob
@ 2021-04-13 20:30 ` Timothy McDaniel
  2021-04-13 20:30   ` [dpdk-dev] [PATCH v2 1/1] event/dlb: optimize Dequeue Operations Timothy McDaniel
  2021-04-29  7:20   ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Jerin Jacob
  2021-05-01 19:07 ` [dpdk-dev] [PATCH v3 0/1] Optimize DLB2 Dequeue Operations McDaniel, Timothy
  2 siblings, 2 replies; 9+ messages in thread
From: Timothy McDaniel @ 2021-04-13 20:30 UTC (permalink / raw)
  Cc: dev, erik.g.carrillo, gage.eads, harry.van.haaren, jerinj, thomas

This commit optimizes dequeue performance by using x86 vector
instructions.

Changes since V1:
Added devargs interface to disable optimization

Depends-on: patch-16345 ("Add DLB 2.5")

Timothy McDaniel (1):
  event/dlb: optimize Dequeue Operations

 drivers/event/dlb/dlb2.c      | 445 ++++++++++++++++++++++++++++++----
 drivers/event/dlb/dlb2_priv.h |  22 +-
 2 files changed, 414 insertions(+), 53 deletions(-)

-- 
2.23.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [dpdk-dev] [PATCH v2 1/1] event/dlb: optimize Dequeue Operations
  2021-04-13 20:30 ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Timothy McDaniel
@ 2021-04-13 20:30   ` Timothy McDaniel
  2021-04-29  7:20   ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Jerin Jacob
  1 sibling, 0 replies; 9+ messages in thread
From: Timothy McDaniel @ 2021-04-13 20:30 UTC (permalink / raw)
  Cc: dev, erik.g.carrillo, gage.eads, harry.van.haaren, jerinj, thomas

Convert code to use x86 vector instructions, thereby significantly
improving dequeue performance.

Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
Signed-off-by: Harry Van Haaren <harry.van.haaren@intel.com>
---
 drivers/event/dlb/dlb2.c      | 445 ++++++++++++++++++++++++++++++----
 drivers/event/dlb/dlb2_priv.h |  22 +-
 2 files changed, 414 insertions(+), 53 deletions(-)

diff --git a/drivers/event/dlb/dlb2.c b/drivers/event/dlb/dlb2.c
index 818b1c367..c8a50cddf 100644
--- a/drivers/event/dlb/dlb2.c
+++ b/drivers/event/dlb/dlb2.c
@@ -375,6 +375,26 @@ set_default_depth_thresh(const char *key __rte_unused,
 	return 0;
 }
 
+static int
+set_vector_opts_disab(const char *key __rte_unused,
+	const char *value,
+	void *opaque)
+{
+	/* opaque is treated as the bool flag that receives the result */
+	bool *disable_flag = opaque;
+
+	if (!value || !disable_flag) {
+		DLB2_LOG_ERR("NULL pointer\n");
+		return -EINVAL;
+	}
+	/*
+	 * Only the first character of value is inspected: 'y' or 'Y' sets
+	 * the flag, anything else clears it.
+	 */
+	*disable_flag = (*value == 'y' || *value == 'Y');
+	return 0;
+}
+
 static int
 set_qid_depth_thresh(const char *key __rte_unused,
 		     const char *value,
@@ -1240,6 +1260,37 @@ dlb2_event_enqueue_forward_burst_delayed(void *event_port,
 					 const struct rte_event events[],
 					 uint16_t num);
 
+/* Generate the required bitmask for rotate-style expected QE gen bits.
+ * This requires a pattern of 1's and zeros, starting with expected as
+ * 1 bits, so when hardware writes 0's they're "new". This requires the
+ * ring size to be powers of 2 to wrap correctly.
+ */
+static void
+dlb2_hw_cq_bitmask_init(struct dlb2_port *qm_port, uint32_t cq_depth)
+{
+	uint64_t cq_build_mask = 0;
+	uint32_t i;
+
+	if (cq_depth > 64)
+		return; /* need to fall back to scalar code */
+
+	/*
+	 * all zeros in first u64, all 1's in second is correct bit pattern to
+	 * start. Special casing == 64 easier than adapting complex loop logic.
+	 */
+	if (cq_depth == 64) {
+		qm_port->cq_rolling_mask = 0;
+		qm_port->cq_rolling_mask_2 = -1;
+		return;
+	}
+
+	for (i = 0; i < 64; i += (cq_depth * 2))
+		cq_build_mask |= ((1ULL << cq_depth) - 1) << (i + cq_depth);
+
+	qm_port->cq_rolling_mask = cq_build_mask;
+	qm_port->cq_rolling_mask_2 = cq_build_mask;
+}
+
 static int
 dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 			struct dlb2_eventdev_port *ev_port,
@@ -1357,6 +1408,8 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 	/* starting value of gen bit - it toggles at wrap time */
 	qm_port->gen_bit = 1;
 
+	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
+
 	qm_port->int_armed = false;
 
 	/* Save off for later use in info and lookup APIs. */
@@ -1408,6 +1461,18 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 			     dequeue_depth,
 			     qm_port->credits);
 	}
+
+	qm_port->use_scalar = false;
+
+#if (!defined RTE_ARCH_X86_64)
+	qm_port->use_scalar = true;
+#else
+	if ((qm_port->cq_depth > 64) ||
+	    (!rte_is_power_of_2(qm_port->cq_depth)) ||
+	    (dlb2->vector_opts_disabled == true))
+		qm_port->use_scalar = true;
+#endif
+
 	rte_spinlock_unlock(&handle->resource_lock);
 
 	return 0;
@@ -1553,6 +1618,7 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
 	qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
 	/* starting value of gen bit - it toggles at wrap time */
 	qm_port->gen_bit = 1;
+	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
 
 	qm_port->int_armed = false;
 
@@ -1593,6 +1659,16 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
 			     dequeue_depth,
 			     credit_high_watermark);
 	}
+
+#if (!defined RTE_ARCH_X86_64)
+	qm_port->use_scalar = true;
+#else
+	if ((qm_port->cq_depth > 64) ||
+	    (!rte_is_power_of_2(qm_port->cq_depth)) ||
+	    (dlb2->vector_opts_disabled == true))
+		qm_port->use_scalar = true;
+#endif
+
 	rte_spinlock_unlock(&handle->resource_lock);
 
 	return 0;
@@ -2987,10 +3063,11 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
 		int j = 0;
 
 		/* Zero-out QEs */
-		qm_port->qe4[0].cmd_byte = 0;
-		qm_port->qe4[1].cmd_byte = 0;
-		qm_port->qe4[2].cmd_byte = 0;
-		qm_port->qe4[3].cmd_byte = 0;
+		_mm_storeu_si128((void *)&qm_port->qe4[0], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[1], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[2], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[3], _mm_setzero_si128());
+
 
 		for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
 			int16_t thresh = qm_port->token_pop_thresh;
@@ -3020,7 +3097,7 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
 
 sw_credit_update:
 	/* each release returns one credit */
-	if (!ev_port->outstanding_releases) {
+	if (unlikely(!ev_port->outstanding_releases)) {
 		DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
 			     __func__);
 		return;
@@ -3137,7 +3214,7 @@ dlb2_dequeue_wait(struct dlb2_eventdev *dlb2,
 	return 0;
 }
 
-static inline int
+static __rte_noinline int
 dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
 			 struct dlb2_port *qm_port,
 			 struct rte_event *events,
@@ -3406,8 +3483,7 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
 
 	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
 
-	idx = qm_port->cq_idx;
-
+	idx = qm_port->cq_idx_unmasked & qm_port->cq_depth_mask;
 	/* Load the next 4 QEs */
 	addr[0] = (uintptr_t)&cq_addr[idx];
 	addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
@@ -3452,6 +3528,272 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
 	return __builtin_popcount(gen_bits);
 }
 
+static inline void
+_process_deq_qes_vec_impl(struct dlb2_port *qm_port,
+			  struct rte_event *events,
+			  __m128i v_qe_3,
+			  __m128i v_qe_2,
+			  __m128i v_qe_1,
+			  __m128i v_qe_0,
+			  __m128i v_qe_meta,
+			  __m128i v_qe_status,
+			  uint32_t valid_events)
+{
+	/* Look up the event QIDs, using the hardware QIDs to index the
+	 * port's QID mapping.
+	 *
+	 * Each v_qe_[0-3] is just a 16-byte load of the whole QE. It is
+	 * passed along in registers as the QE data is required later.
+	 *
+	 * v_qe_meta is an u32 unpack of all 4x QEs. Aka, it contains one
+	 * 32-bit slice of each QE, so makes up a full SSE register. This
+	 * allows parallel processing of 4x QEs in a single register.
+	 */
+
+	__m128i v_qid_done = {0};
+	int hw_qid0 = _mm_extract_epi8(v_qe_meta, 2);
+	int hw_qid1 = _mm_extract_epi8(v_qe_meta, 6);
+	int hw_qid2 = _mm_extract_epi8(v_qe_meta, 10);
+	int hw_qid3 = _mm_extract_epi8(v_qe_meta, 14);
+
+	int ev_qid0 = qm_port->qid_mappings[hw_qid0];
+	int ev_qid1 = qm_port->qid_mappings[hw_qid1];
+	int ev_qid2 = qm_port->qid_mappings[hw_qid2];
+	int ev_qid3 = qm_port->qid_mappings[hw_qid3];
+
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid0, 2);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid1, 6);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid2, 10);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid3, 14);
+
+	/* Schedule field remapping using byte shuffle
+	 * - Full byte containing sched field handled here (op, rsvd are zero)
+	 * - Note sanitizing the register requires two masking ANDs:
+	 *   1) to strip prio/msg_type from byte for correct shuffle lookup
+	 *   2) to strip any non-sched-field lanes from any results to OR later
+	 * - Final byte result is >> 10 to another byte-lane inside the u32.
+	 *   This makes the final combination OR easier to make the rte_event.
+	 */
+	__m128i v_sched_done;
+	__m128i v_sched_bits;
+	{
+		static const uint8_t sched_type_map[16] = {
+			[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+			[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+			[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+			[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+		};
+		static const uint8_t sched_and_mask[16] = {
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+		};
+		const __m128i v_sched_map = _mm_loadu_si128(
+					     (const __m128i *)sched_type_map);
+		__m128i v_sched_mask = _mm_loadu_si128(
+					     (const __m128i *)&sched_and_mask);
+		v_sched_bits = _mm_and_si128(v_qe_meta, v_sched_mask);
+		__m128i v_sched_remapped = _mm_shuffle_epi8(v_sched_map,
+							    v_sched_bits);
+		__m128i v_preshift = _mm_and_si128(v_sched_remapped,
+						   v_sched_mask);
+		v_sched_done = _mm_srli_epi32(v_preshift, 10);
+	}
+
+	/* Priority handling
+	 * - QE provides 3 bits of priority
+	 * - Shift << 3 to move to MSBs for byte-prio in rte_event
+	 * - Mask bits to avoid pollution, leaving only 3 prio MSBs in reg
+	 */
+	__m128i v_prio_done;
+	{
+		static const uint8_t prio_mask[16] = {
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+		};
+		__m128i v_prio_mask  = _mm_loadu_si128(
+						(const __m128i *)prio_mask);
+		__m128i v_prio_shifted = _mm_slli_epi32(v_qe_meta, 3);
+		v_prio_done = _mm_and_si128(v_prio_shifted, v_prio_mask);
+	}
+
+	/* Event Sub/Type handling:
+	 * we want to keep the lower 12 bits of each QE. Shift up by 20 bits
+	 * to get the sub/ev type data into rte_event location, clearing the
+	 * lower 20 bits in the process.
+	 */
+	__m128i v_types_done;
+	{
+		static const uint8_t event_mask[16] = {
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+		};
+		static const uint8_t sub_event_mask[16] = {
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+		};
+		static const uint8_t flow_mask[16] = {
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+		};
+		__m128i v_event_mask  = _mm_loadu_si128(
+					(const __m128i *)event_mask);
+		__m128i v_sub_event_mask  = _mm_loadu_si128(
+					(const __m128i *)sub_event_mask);
+		__m128i v_flow_mask  = _mm_loadu_si128(
+				       (const __m128i *)flow_mask);
+		__m128i v_sub = _mm_srli_epi32(v_qe_meta, 8);
+		v_sub = _mm_and_si128(v_sub, v_sub_event_mask);
+		__m128i v_type = _mm_and_si128(v_qe_meta, v_event_mask);
+		v_type = _mm_slli_epi32(v_type, 8);
+		v_types_done = _mm_or_si128(v_type, v_sub);
+		v_types_done = _mm_slli_epi32(v_types_done, 20);
+		__m128i v_flow = _mm_and_si128(v_qe_status, v_flow_mask);
+		v_types_done = _mm_or_si128(v_types_done, v_flow);
+	}
+
+	/* Combine QID, Sched and Prio fields, then Shift >> 8 bits to align
+	 * with the rte_event, allowing unpacks to move/blend with payload.
+	 */
+	__m128i v_q_s_p_done;
+	{
+		__m128i v_qid_sched = _mm_or_si128(v_qid_done, v_sched_done);
+		__m128i v_q_s_prio = _mm_or_si128(v_qid_sched, v_prio_done);
+		v_q_s_p_done = _mm_srli_epi32(v_q_s_prio, 8);
+	}
+
+	__m128i v_unpk_ev_23, v_unpk_ev_01, v_ev_2, v_ev_3, v_ev_0, v_ev_1;
+
+	/* Unpack evs into u64 metadata, then indiv events */
+	v_unpk_ev_23 = _mm_unpackhi_epi32(v_types_done, v_q_s_p_done);
+	v_unpk_ev_01 = _mm_unpacklo_epi32(v_types_done, v_q_s_p_done);
+
+	switch (valid_events) {
+	case 4:
+		v_ev_3 = _mm_blend_epi16(v_unpk_ev_23, v_qe_3, 0x0F);
+		v_ev_3 = _mm_alignr_epi8(v_ev_3, v_ev_3, 8);
+		_mm_storeu_si128((__m128i *)&events[3], v_ev_3);
+		/* fallthrough */
+	case 3:
+		v_ev_2 = _mm_unpacklo_epi64(v_unpk_ev_23, v_qe_2);
+		_mm_storeu_si128((__m128i *)&events[2], v_ev_2);
+		/* fallthrough */
+	case 2:
+		v_ev_1 = _mm_blend_epi16(v_unpk_ev_01, v_qe_1, 0x0F);
+		v_ev_1 = _mm_alignr_epi8(v_ev_1, v_ev_1, 8);
+		_mm_storeu_si128((__m128i *)&events[1], v_ev_1);
+		/* fallthrough */
+	case 1:
+		v_ev_0 = _mm_unpacklo_epi64(v_unpk_ev_01, v_qe_0);
+		_mm_storeu_si128((__m128i *)&events[0], v_ev_0);
+	}
+}
+
+static __rte_always_inline int
+dlb2_recv_qe_sparse_vec(struct dlb2_port *qm_port, void *events,
+			uint32_t max_events)
+{
+	/* Using unmasked idx for perf, and masking manually */
+	uint16_t idx = qm_port->cq_idx_unmasked;
+	volatile struct dlb2_dequeue_qe *cq_addr;
+
+	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+
+	uintptr_t qe_ptr_3 = (uintptr_t)&cq_addr[(idx + 12) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_2 = (uintptr_t)&cq_addr[(idx +  8) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_1 = (uintptr_t)&cq_addr[(idx +  4) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_0 = (uintptr_t)&cq_addr[(idx +  0) &
+						 qm_port->cq_depth_mask];
+
+	/* Load QEs from CQ: use compiler barriers to avoid load reordering */
+	__m128i v_qe_3 = _mm_loadu_si128((const __m128i *)qe_ptr_3);
+	rte_compiler_barrier();
+	__m128i v_qe_2 = _mm_loadu_si128((const __m128i *)qe_ptr_2);
+	rte_compiler_barrier();
+	__m128i v_qe_1 = _mm_loadu_si128((const __m128i *)qe_ptr_1);
+	rte_compiler_barrier();
+	__m128i v_qe_0 = _mm_loadu_si128((const __m128i *)qe_ptr_0);
+
+	/* Generate the pkt_shuffle mask;
+	 * - Avoids load in otherwise load-heavy section of code
+	 * - Moves bytes 3,7,11,15 (gen bit bytes) to LSB bytes in XMM
+	 */
+	const uint32_t stat_shuf_bytes = (15 << 24) | (11 << 16) | (7 << 8) | 3;
+	__m128i v_zeros = _mm_setzero_si128();
+	__m128i v_ffff = _mm_cmpeq_epi8(v_zeros, v_zeros);
+	__m128i v_stat_shuf_mask = _mm_insert_epi32(v_ffff, stat_shuf_bytes, 0);
+
+	/* Extract u32 components required from the QE
+	 * - QE[64 to 95 ] for metadata (qid, sched, prio, event type, ...)
+	 * - QE[96 to 127] for status (cq gen bit, error)
+	 *
+	 * Note that stage 1 of the unpacking is re-used for both u32 extracts
+	 */
+	__m128i v_qe_02 = _mm_unpackhi_epi32(v_qe_0, v_qe_2);
+	__m128i v_qe_13 = _mm_unpackhi_epi32(v_qe_1, v_qe_3);
+	__m128i v_qe_status = _mm_unpackhi_epi32(v_qe_02, v_qe_13);
+	__m128i v_qe_meta   = _mm_unpacklo_epi32(v_qe_02, v_qe_13);
+
+	/* Status byte (gen_bit, error) handling:
+	 * - Shuffle to lanes 0,1,2,3, clear all others
+	 * - Shift left by 7 for gen bit to MSB, movemask to scalar
+	 * - Shift right by 2 for error bit to MSB, movemask to scalar
+	 */
+	__m128i v_qe_shuffled = _mm_shuffle_epi8(v_qe_status, v_stat_shuf_mask);
+	__m128i v_qes_shift_gen_bit = _mm_slli_epi32(v_qe_shuffled, 7);
+	int32_t qe_gen_bits = _mm_movemask_epi8(v_qes_shift_gen_bit) & 0xf;
+
+	/* Expected vs Reality of QE Gen bits
+	 * - cq_rolling_mask provides expected bits
+	 * - QE loads, unpacks/shuffle and movemask provides reality
+	 * - XOR of the two gives bitmask of new packets
+	 * - POPCNT to get the number of new events
+	 */
+	uint64_t rolling = qm_port->cq_rolling_mask & 0xF;
+	uint64_t qe_xor_bits = (qe_gen_bits ^ rolling);
+	uint32_t count_new = __builtin_popcount(qe_xor_bits);
+	count_new = RTE_MIN(count_new, max_events);
+	if (!count_new)
+		return 0;
+
+	/* emulate a 128 bit rotate using 2x 64-bit numbers and bit-shifts */
+
+	uint64_t m_rshift = qm_port->cq_rolling_mask >> count_new;
+	uint64_t m_lshift = qm_port->cq_rolling_mask << (64 - count_new);
+	uint64_t m2_rshift = qm_port->cq_rolling_mask_2 >> count_new;
+	uint64_t m2_lshift = qm_port->cq_rolling_mask_2 << (64 - count_new);
+
+	/* shifted out of m2 into MSB of m */
+	qm_port->cq_rolling_mask = (m_rshift | m2_lshift);
+
+	/* shifted out of m "looped back" into MSB of m2 */
+	qm_port->cq_rolling_mask_2 = (m2_rshift | m_lshift);
+
+	/* Prefetch the next QEs - should run as IPC instead of cycles */
+	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
+
+	/* Convert QEs from XMM regs to events and store events directly */
+	_process_deq_qes_vec_impl(qm_port, events, v_qe_3, v_qe_2, v_qe_1,
+				  v_qe_0, v_qe_meta, v_qe_status, count_new);
+
+	return count_new;
+}
+
 static inline void
 dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
 {
@@ -3469,25 +3811,15 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
 		       uint16_t max_num,
 		       uint64_t dequeue_timeout_ticks)
 {
-	uint64_t timeout;
 	uint64_t start_ticks = 0ULL;
 	struct dlb2_port *qm_port;
 	int num = 0;
+	bool use_scalar;
+	uint64_t timeout;
 
 	qm_port = &ev_port->qm_port;
+	use_scalar = qm_port->use_scalar;
 
-	/* We have a special implementation for waiting. Wait can be:
-	 * 1) no waiting at all
-	 * 2) busy poll only
-	 * 3) wait for interrupt. If wakeup and poll time
-	 * has expired, then return to caller
-	 * 4) umonitor/umwait repeatedly up to poll time
-	 */
-
-	/* If configured for per dequeue wait, then use wait value provided
-	 * to this API. Otherwise we must use the global
-	 * value from eventdev config time.
-	 */
 	if (!dlb2->global_dequeue_wait)
 		timeout = dequeue_timeout_ticks;
 	else
@@ -3495,35 +3827,41 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
 
 	start_ticks = rte_get_timer_cycles();
 
+	use_scalar = use_scalar || (max_num & 0x3);
+
 	while (num < max_num) {
 		struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
 		int num_avail;
-
-		/* Copy up to 4 QEs from the current cache line into qes */
-		num_avail = dlb2_recv_qe_sparse(qm_port, qes);
-
-		/* But don't process more than the user requested */
-		num_avail = RTE_MIN(num_avail, max_num - num);
-
-		dlb2_inc_cq_idx(qm_port, num_avail << 2);
-
-		if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
-			num += dlb2_process_dequeue_four_qes(ev_port,
-							      qm_port,
-							      &events[num],
-							      &qes[0]);
-		else if (num_avail)
-			num += dlb2_process_dequeue_qes(ev_port,
-							 qm_port,
-							 &events[num],
-							 &qes[0],
-							 num_avail);
-		else if ((timeout == 0) || (num > 0))
-			/* Not waiting in any form, or 1+ events received? */
-			break;
-		else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
-					   timeout, start_ticks))
-			break;
+		if (use_scalar) {
+			num_avail = dlb2_recv_qe_sparse(qm_port, qes);
+			num_avail = RTE_MIN(num_avail, max_num - num);
+			dlb2_inc_cq_idx(qm_port, num_avail << 2);
+			if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
+				num += dlb2_process_dequeue_four_qes(ev_port,
+								  qm_port,
+								  &events[num],
+								  &qes[0]);
+			else if (num_avail)
+				num += dlb2_process_dequeue_qes(ev_port,
+								qm_port,
+								&events[num],
+								&qes[0],
+								num_avail);
+		} else { /* !use_scalar */
+			num_avail = dlb2_recv_qe_sparse_vec(qm_port,
+							    &events[num],
+							    max_num - num);
+			num += num_avail;
+			dlb2_inc_cq_idx(qm_port, num_avail << 2);
+			DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_avail);
+		}
+		if (!num_avail) {
+			if (num > 0)
+				break;
+			else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
+						   timeout, start_ticks))
+				break;
+		}
 	}
 
 	qm_port->owed_tokens += num;
@@ -4083,6 +4421,7 @@ dlb2_primary_eventdev_probe(struct rte_eventdev *dev,
 	dlb2->poll_interval = dlb2_args->poll_interval;
 	dlb2->sw_credit_quanta = dlb2_args->sw_credit_quanta;
 	dlb2->default_depth_thresh = dlb2_args->default_depth_thresh;
+	dlb2->vector_opts_disabled = dlb2_args->vector_opts_disabled;
 
 	err = dlb2_iface_open(&dlb2->qm_instance, name);
 	if (err < 0) {
@@ -4186,6 +4525,7 @@ dlb2_parse_params(const char *params,
 					     DLB2_POLL_INTERVAL_ARG,
 					     DLB2_SW_CREDIT_QUANTA_ARG,
 					     DLB2_DEPTH_THRESH_ARG,
+					     DLB2_VECTOR_OPTS_DISAB_ARG,
 					     NULL };
 
 	if (params != NULL && params[0] != '\0') {
@@ -4299,6 +4639,17 @@ dlb2_parse_params(const char *params,
 				return ret;
 			}
 
+			ret = rte_kvargs_process(kvlist,
+					DLB2_VECTOR_OPTS_DISAB_ARG,
+					set_vector_opts_disab,
+					&dlb2_args->vector_opts_disabled);
+			if (ret != 0) {
+				DLB2_LOG_ERR("%s: Error parsing vector opts disabled",
+					     name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
 			rte_kvargs_free(kvlist);
 		}
 	}
diff --git a/drivers/event/dlb/dlb2_priv.h b/drivers/event/dlb/dlb2_priv.h
index 3c540a264..8b38d04fb 100644
--- a/drivers/event/dlb/dlb2_priv.h
+++ b/drivers/event/dlb/dlb2_priv.h
@@ -39,6 +39,7 @@
 #define DLB2_POLL_INTERVAL_ARG "poll_interval"
 #define DLB2_SW_CREDIT_QUANTA_ARG "sw_credit_quanta"
 #define DLB2_DEPTH_THRESH_ARG "default_depth_thresh"
+#define DLB2_VECTOR_OPTS_DISAB_ARG "vector_opts_disable"
 
 /* Begin HW related defines and structs */
 
@@ -206,9 +207,9 @@ enum dlb2_enqueue_type {
 /* hw-specific format - do not change */
 
 struct dlb2_event_type {
-	uint8_t major:4;
-	uint8_t unused:4;
-	uint8_t sub;
+	uint16_t major:4;
+	uint16_t unused:4;
+	uint16_t sub:8;
 };
 
 union dlb2_opaque_data {
@@ -352,6 +353,12 @@ struct dlb2_port {
 	uint16_t cq_idx_unmasked;
 	uint16_t cq_depth_mask;
 	uint16_t gen_bit_shift;
+	uint64_t cq_rolling_mask; /*
+				   * rotate to always have right expected
+				   * gen bits
+				   */
+	uint64_t cq_rolling_mask_2;
+	void *cq_addr_cached; /* avoid multiple refs */
 	enum dlb2_port_state state;
 	enum dlb2_configuration_state config_state;
 	int num_mapped_qids;
@@ -361,6 +368,7 @@ struct dlb2_port {
 	struct dlb2_cq_pop_qe *consume_qe;
 	struct dlb2_eventdev *dlb2; /* back ptr */
 	struct dlb2_eventdev_port *ev_port; /* back ptr */
+	bool use_scalar; /* force usage of scalar code */
 };
 
 /* Per-process per-port mmio and memory pointers */
@@ -514,9 +522,9 @@ struct dlb2_queue {
 	uint32_t num_qid_inflights; /* User config */
 	uint32_t num_atm_inflights; /* User config */
 	enum dlb2_configuration_state config_state;
-	int sched_type; /* LB queue only */
-	uint32_t id;
-	bool is_directed;
+	int  sched_type; /* LB queue only */
+	uint8_t id;
+	bool	 is_directed;
 };
 
 struct dlb2_eventdev_queue {
@@ -559,6 +567,7 @@ struct dlb2_eventdev {
 	uint32_t new_event_limit;
 	int max_num_events_override;
 	int num_dir_credits_override;
+	bool vector_opts_disabled;
 	volatile enum dlb2_run_state run_state;
 	uint16_t num_dir_queues; /* total num of evdev dir queues requested */
 	union {
@@ -618,6 +627,7 @@ struct dlb2_devargs {
 	int poll_interval;
 	int sw_credit_quanta;
 	int default_depth_thresh;
+	bool vector_opts_disabled;
 };
 
 /* End Eventdev related defines and structs */
-- 
2.23.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue
  2021-04-13 20:30 ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Timothy McDaniel
  2021-04-13 20:30   ` [dpdk-dev] [PATCH v2 1/1] event/dlb: optimize Dequeue Operations Timothy McDaniel
@ 2021-04-29  7:20   ` Jerin Jacob
  2021-04-29 13:45     ` McDaniel, Timothy
  1 sibling, 1 reply; 9+ messages in thread
From: Jerin Jacob @ 2021-04-29  7:20 UTC (permalink / raw)
  To: Timothy McDaniel
  Cc: dpdk-dev, Erik Gabriel Carrillo, Gage Eads, Van Haaren, Harry,
	Jerin Jacob, Thomas Monjalon

On Wed, Apr 14, 2021 at 2:02 AM Timothy McDaniel
<timothy.mcdaniel@intel.com> wrote:
>
> This commit optimizes dequeue performance by using x86 vector
> instructions.
>
> Changes since V1:
> Added devargs interface to disable optimization
>
> Depends-on: patch-16345 ("Add DLB 2.5")

Now that series needs rework. Please rebase this series.
Marking as "Change Requested"  for this patch.

>
> Timothy McDaniel (1):
>   event/dlb: optimize Dequeue Operations
>
>  drivers/event/dlb/dlb2.c      | 445 ++++++++++++++++++++++++++++++----
>  drivers/event/dlb/dlb2_priv.h |  22 +-
>  2 files changed, 414 insertions(+), 53 deletions(-)
>
> --
> 2.23.0
>

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue
  2021-04-29  7:20   ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Jerin Jacob
@ 2021-04-29 13:45     ` McDaniel, Timothy
  0 siblings, 0 replies; 9+ messages in thread
From: McDaniel, Timothy @ 2021-04-29 13:45 UTC (permalink / raw)
  To: Jerin Jacob
  Cc: dpdk-dev, Carrillo, Erik G, Gage Eads, Van Haaren, Harry,
	Jerin Jacob, Thomas Monjalon



> -----Original Message-----
> From: Jerin Jacob <jerinjacobk@gmail.com>
> Sent: Thursday, April 29, 2021 2:21 AM
> To: McDaniel, Timothy <timothy.mcdaniel@intel.com>
> Cc: dpdk-dev <dev@dpdk.org>; Carrillo, Erik G <erik.g.carrillo@intel.com>; Gage
> Eads <gage.eads@intel.com>; Van Haaren, Harry
> <harry.van.haaren@intel.com>; Jerin Jacob <jerinj@marvell.com>; Thomas
> Monjalon <thomas@monjalon.net>
> Subject: Re: [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue
> 
> On Wed, Apr 14, 2021 at 2:02 AM Timothy McDaniel
> <timothy.mcdaniel@intel.com> wrote:
> >
> > This commit optimizes dequeue performance by using x86 vector
> > instructions.
> >
> > Changes since V1:
> > Added devargs interface to disable optimization
> >
> > Depends-on: patch-16345 ("Add DLB 2.5")
> 
> Now that series needs rework. Please rebase this series.
> Marking as "Change Requested"  for this patch.
> 
> >
> > Timothy McDaniel (1):
> >   event/dlb: optimize Dequeue Operations
> >
> >  drivers/event/dlb/dlb2.c      | 445 ++++++++++++++++++++++++++++++----
> >  drivers/event/dlb/dlb2_priv.h |  22 +-
> >  2 files changed, 414 insertions(+), 53 deletions(-)
> >
> > --
> > 2.23.0
> >

I will rebase and resubmit.

Thanks,
Tim

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [dpdk-dev] [PATCH v3 0/1] Optimize DLB2 Dequeue Operations
  2021-03-17 17:02 [dpdk-dev] [PATCH] event/dlb2: Optimize Dequeue Operations Timothy McDaniel
  2021-03-21 11:10 ` Jerin Jacob
  2021-04-13 20:30 ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Timothy McDaniel
@ 2021-05-01 19:07 ` McDaniel, Timothy
  2021-05-01 19:07   ` [dpdk-dev] [PATCH v3 1/1] event/dlb2: optimize " McDaniel, Timothy
  2 siblings, 1 reply; 9+ messages in thread
From: McDaniel, Timothy @ 2021-05-01 19:07 UTC (permalink / raw)
  Cc: dev, erik.g.carrillo, harry.van.haaren, jerinj, thomas, Timothy McDaniel

From: Timothy McDaniel <timothy.mcdaniel@intel.com>

This patch converts the PMD to use x86 vector instructions, thereby
significantly improving dequeue performance.

Changes since V2:
1) Rebased patch on top of dpdk-next-eventdev
2) Use drivers/event/dlb2 as source directory to patch,
   thereby taking into account that the PMD retains its
   original name (dlb2_event).

Timothy McDaniel (1):
  event/dlb2: optimize Dequeue Operations

 drivers/event/dlb2/dlb2.c      | 445 +++++++++++++++++++++++++++++----
 drivers/event/dlb2/dlb2_priv.h |  22 +-
 2 files changed, 414 insertions(+), 53 deletions(-)

-- 
2.23.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [dpdk-dev] [PATCH v3 1/1] event/dlb2: optimize Dequeue Operations
  2021-05-01 19:07 ` [dpdk-dev] [PATCH v3 0/1] Optimize DLB2 Dequeue Operations McDaniel, Timothy
@ 2021-05-01 19:07   ` McDaniel, Timothy
  2021-05-04  8:29     ` Jerin Jacob
  0 siblings, 1 reply; 9+ messages in thread
From: McDaniel, Timothy @ 2021-05-01 19:07 UTC (permalink / raw)
  Cc: dev, erik.g.carrillo, harry.van.haaren, jerinj, thomas, Timothy McDaniel

From: Timothy McDaniel <timothy.mcdaniel@intel.com>

Convert code to use x86 vector instructions, thereby significantly
improving dequeue performance.

Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
Signed-off-by: Harry Van Haaren <harry.van.haaren@intel.com>
---
 drivers/event/dlb2/dlb2.c      | 445 +++++++++++++++++++++++++++++----
 drivers/event/dlb2/dlb2_priv.h |  22 +-
 2 files changed, 414 insertions(+), 53 deletions(-)

diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c
index 818b1c367..c8a50cddf 100644
--- a/drivers/event/dlb2/dlb2.c
+++ b/drivers/event/dlb2/dlb2.c
@@ -375,6 +375,26 @@ set_default_depth_thresh(const char *key __rte_unused,
 	return 0;
 }
 
+static int
+set_vector_opts_disab(const char *key __rte_unused,
+	const char *value,
+	void *opaque)
+{
+	bool *dlb2_vector_opts_disabled = opaque;
+
+	if (value == NULL || opaque == NULL) {
+		DLB2_LOG_ERR("NULL pointer\n");
+		return -EINVAL;
+	}
+
+	if ((*value == 'y') || (*value == 'Y'))
+		*dlb2_vector_opts_disabled = true;
+	else
+		*dlb2_vector_opts_disabled = false;
+
+	return 0;
+}
+
 static int
 set_qid_depth_thresh(const char *key __rte_unused,
 		     const char *value,
@@ -1240,6 +1260,37 @@ dlb2_event_enqueue_forward_burst_delayed(void *event_port,
 					 const struct rte_event events[],
 					 uint16_t num);
 
+/* Generate the required bitmask for rotate-style expected QE gen bits.
+ * This requires a pattern of 1's and zeros, starting with expected as
+ * 1 bits, so when hardware writes 0's they're "new". This requires the
+ * ring size to be powers of 2 to wrap correctly.
+ */
+static void
+dlb2_hw_cq_bitmask_init(struct dlb2_port *qm_port, uint32_t cq_depth)
+{
+	uint64_t cq_build_mask = 0;
+	uint32_t i;
+
+	if (cq_depth > 64)
+		return; /* need to fall back to scalar code */
+
+	/*
+	 * all zeros in first u64, all 1's in second is correct bit pattern to
+	 * start. Special casing == 64 easier than adapting complex loop logic.
+	 */
+	if (cq_depth == 64) {
+		qm_port->cq_rolling_mask = 0;
+		qm_port->cq_rolling_mask_2 = -1;
+		return;
+	}
+
+	for (i = 0; i < 64; i += (cq_depth * 2))
+		cq_build_mask |= ((1ULL << cq_depth) - 1) << (i + cq_depth);
+
+	qm_port->cq_rolling_mask = cq_build_mask;
+	qm_port->cq_rolling_mask_2 = cq_build_mask;
+}
+
 static int
 dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 			struct dlb2_eventdev_port *ev_port,
@@ -1357,6 +1408,8 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 	/* starting value of gen bit - it toggles at wrap time */
 	qm_port->gen_bit = 1;
 
+	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
+
 	qm_port->int_armed = false;
 
 	/* Save off for later use in info and lookup APIs. */
@@ -1408,6 +1461,18 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 			     dequeue_depth,
 			     qm_port->credits);
 	}
+
+	qm_port->use_scalar = false;
+
+#if (!defined RTE_ARCH_X86_64)
+	qm_port->use_scalar = true;
+#else
+	if ((qm_port->cq_depth > 64) ||
+	    (!rte_is_power_of_2(qm_port->cq_depth)) ||
+	    (dlb2->vector_opts_disabled == true))
+		qm_port->use_scalar = true;
+#endif
+
 	rte_spinlock_unlock(&handle->resource_lock);
 
 	return 0;
@@ -1553,6 +1618,7 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
 	qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
 	/* starting value of gen bit - it toggles at wrap time */
 	qm_port->gen_bit = 1;
+	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
 
 	qm_port->int_armed = false;
 
@@ -1593,6 +1659,16 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
 			     dequeue_depth,
 			     credit_high_watermark);
 	}
+
+#if (!defined RTE_ARCH_X86_64)
+	qm_port->use_scalar = true;
+#else
+	if ((qm_port->cq_depth > 64) ||
+	    (!rte_is_power_of_2(qm_port->cq_depth)) ||
+	    (dlb2->vector_opts_disabled == true))
+		qm_port->use_scalar = true;
+#endif
+
 	rte_spinlock_unlock(&handle->resource_lock);
 
 	return 0;
@@ -2987,10 +3063,11 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
 		int j = 0;
 
 		/* Zero-out QEs */
-		qm_port->qe4[0].cmd_byte = 0;
-		qm_port->qe4[1].cmd_byte = 0;
-		qm_port->qe4[2].cmd_byte = 0;
-		qm_port->qe4[3].cmd_byte = 0;
+		_mm_storeu_si128((void *)&qm_port->qe4[0], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[1], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[2], _mm_setzero_si128());
+		_mm_storeu_si128((void *)&qm_port->qe4[3], _mm_setzero_si128());
+
 
 		for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
 			int16_t thresh = qm_port->token_pop_thresh;
@@ -3020,7 +3097,7 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
 
 sw_credit_update:
 	/* each release returns one credit */
-	if (!ev_port->outstanding_releases) {
+	if (unlikely(!ev_port->outstanding_releases)) {
 		DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
 			     __func__);
 		return;
@@ -3137,7 +3214,7 @@ dlb2_dequeue_wait(struct dlb2_eventdev *dlb2,
 	return 0;
 }
 
-static inline int
+static __rte_noinline int
 dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
 			 struct dlb2_port *qm_port,
 			 struct rte_event *events,
@@ -3406,8 +3483,7 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
 
 	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
 
-	idx = qm_port->cq_idx;
-
+	idx = qm_port->cq_idx_unmasked & qm_port->cq_depth_mask;
 	/* Load the next 4 QEs */
 	addr[0] = (uintptr_t)&cq_addr[idx];
 	addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
@@ -3452,6 +3528,272 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
 	return __builtin_popcount(gen_bits);
 }
 
+static inline void
+_process_deq_qes_vec_impl(struct dlb2_port *qm_port,
+			  struct rte_event *events,
+			  __m128i v_qe_3,
+			  __m128i v_qe_2,
+			  __m128i v_qe_1,
+			  __m128i v_qe_0,
+			  __m128i v_qe_meta,
+			  __m128i v_qe_status,
+			  uint32_t valid_events)
+{
+	/* Look up the event QIDs, using the hardware QIDs to index the
+	 * port's QID mapping.
+	 *
+	 * Each v_qe_[0-3] is just a 16-byte load of the whole QE. It is
+	 * passed along in registers as the QE data is required later.
+	 *
+	 * v_qe_meta is a u32 unpack of all 4x QEs. Aka, it contains one
+	 * 32-bit slice of each QE, so makes up a full SSE register. This
+	 * allows parallel processing of 4x QEs in a single register.
+	 */
+
+	__m128i v_qid_done = {0};
+	int hw_qid0 = _mm_extract_epi8(v_qe_meta, 2);
+	int hw_qid1 = _mm_extract_epi8(v_qe_meta, 6);
+	int hw_qid2 = _mm_extract_epi8(v_qe_meta, 10);
+	int hw_qid3 = _mm_extract_epi8(v_qe_meta, 14);
+
+	int ev_qid0 = qm_port->qid_mappings[hw_qid0];
+	int ev_qid1 = qm_port->qid_mappings[hw_qid1];
+	int ev_qid2 = qm_port->qid_mappings[hw_qid2];
+	int ev_qid3 = qm_port->qid_mappings[hw_qid3];
+
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid0, 2);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid1, 6);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid2, 10);
+	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid3, 14);
+
+	/* Schedule field remapping using byte shuffle
+	 * - Full byte containing sched field handled here (op, rsvd are zero)
+	 * - Note sanitizing the register requires two masking ANDs:
+	 *   1) to strip prio/msg_type from byte for correct shuffle lookup
+	 *   2) to strip any non-sched-field lanes from any results to OR later
+	 * - Final byte result is >> 10 to another byte-lane inside the u32.
+	 *   This makes the final combination OR easier to make the rte_event.
+	 */
+	__m128i v_sched_done;
+	__m128i v_sched_bits;
+	{
+		static const uint8_t sched_type_map[16] = {
+			[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+			[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+			[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+			[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+		};
+		static const uint8_t sched_and_mask[16] = {
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+			0x00, 0x00, 0x00, 0x03,
+		};
+		const __m128i v_sched_map = _mm_loadu_si128(
+					     (const __m128i *)sched_type_map);
+		__m128i v_sched_mask = _mm_loadu_si128(
+					     (const __m128i *)&sched_and_mask);
+		v_sched_bits = _mm_and_si128(v_qe_meta, v_sched_mask);
+		__m128i v_sched_remapped = _mm_shuffle_epi8(v_sched_map,
+							    v_sched_bits);
+		__m128i v_preshift = _mm_and_si128(v_sched_remapped,
+						   v_sched_mask);
+		v_sched_done = _mm_srli_epi32(v_preshift, 10);
+	}
+
+	/* Priority handling
+	 * - QE provides 3 bits of priority
+	 * - Shift << 3 to move to MSBs for byte-prio in rte_event
+	 * - Mask bits to avoid pollution, leaving only 3 prio MSBs in reg
+	 */
+	__m128i v_prio_done;
+	{
+		static const uint8_t prio_mask[16] = {
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+			0x00, 0x00, 0x00, 0x07 << 5,
+		};
+		__m128i v_prio_mask  = _mm_loadu_si128(
+						(const __m128i *)prio_mask);
+		__m128i v_prio_shifted = _mm_slli_epi32(v_qe_meta, 3);
+		v_prio_done = _mm_and_si128(v_prio_shifted, v_prio_mask);
+	}
+
+	/* Event Sub/Type handling:
+	 * we want to keep the lower 12 bits of each QE. Shift up by 20 bits
+	 * to get the sub/ev type data into rte_event location, clearing the
+	 * lower 20 bits in the process.
+	 */
+	__m128i v_types_done;
+	{
+		static const uint8_t event_mask[16] = {
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+			0x0f, 0x00, 0x00, 0x00,
+		};
+		static const uint8_t sub_event_mask[16] = {
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+			0xff, 0x00, 0x00, 0x00,
+		};
+		static const uint8_t flow_mask[16] = {
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+			0xff, 0xff, 0x00, 0x00,
+		};
+		__m128i v_event_mask  = _mm_loadu_si128(
+					(const __m128i *)event_mask);
+		__m128i v_sub_event_mask  = _mm_loadu_si128(
+					(const __m128i *)sub_event_mask);
+		__m128i v_flow_mask  = _mm_loadu_si128(
+				       (const __m128i *)flow_mask);
+		__m128i v_sub = _mm_srli_epi32(v_qe_meta, 8);
+		v_sub = _mm_and_si128(v_sub, v_sub_event_mask);
+		__m128i v_type = _mm_and_si128(v_qe_meta, v_event_mask);
+		v_type = _mm_slli_epi32(v_type, 8);
+		v_types_done = _mm_or_si128(v_type, v_sub);
+		v_types_done = _mm_slli_epi32(v_types_done, 20);
+		__m128i v_flow = _mm_and_si128(v_qe_status, v_flow_mask);
+		v_types_done = _mm_or_si128(v_types_done, v_flow);
+	}
+
+	/* Combine QID, Sched and Prio fields, then Shift >> 8 bits to align
+	 * with the rte_event, allowing unpacks to move/blend with payload.
+	 */
+	__m128i v_q_s_p_done;
+	{
+		__m128i v_qid_sched = _mm_or_si128(v_qid_done, v_sched_done);
+		__m128i v_q_s_prio = _mm_or_si128(v_qid_sched, v_prio_done);
+		v_q_s_p_done = _mm_srli_epi32(v_q_s_prio, 8);
+	}
+
+	__m128i v_unpk_ev_23, v_unpk_ev_01, v_ev_2, v_ev_3, v_ev_0, v_ev_1;
+
+	/* Unpack evs into u64 metadata, then indiv events */
+	v_unpk_ev_23 = _mm_unpackhi_epi32(v_types_done, v_q_s_p_done);
+	v_unpk_ev_01 = _mm_unpacklo_epi32(v_types_done, v_q_s_p_done);
+
+	switch (valid_events) {
+	case 4:
+		v_ev_3 = _mm_blend_epi16(v_unpk_ev_23, v_qe_3, 0x0F);
+		v_ev_3 = _mm_alignr_epi8(v_ev_3, v_ev_3, 8);
+		_mm_storeu_si128((__m128i *)&events[3], v_ev_3);
+		/* fallthrough */
+	case 3:
+		v_ev_2 = _mm_unpacklo_epi64(v_unpk_ev_23, v_qe_2);
+		_mm_storeu_si128((__m128i *)&events[2], v_ev_2);
+		/* fallthrough */
+	case 2:
+		v_ev_1 = _mm_blend_epi16(v_unpk_ev_01, v_qe_1, 0x0F);
+		v_ev_1 = _mm_alignr_epi8(v_ev_1, v_ev_1, 8);
+		_mm_storeu_si128((__m128i *)&events[1], v_ev_1);
+		/* fallthrough */
+	case 1:
+		v_ev_0 = _mm_unpacklo_epi64(v_unpk_ev_01, v_qe_0);
+		_mm_storeu_si128((__m128i *)&events[0], v_ev_0);
+	}
+}
+
+static __rte_always_inline int
+dlb2_recv_qe_sparse_vec(struct dlb2_port *qm_port, void *events,
+			uint32_t max_events)
+{
+	/* Using unmasked idx for perf, and masking manually */
+	uint16_t idx = qm_port->cq_idx_unmasked;
+	volatile struct dlb2_dequeue_qe *cq_addr;
+
+	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+
+	uintptr_t qe_ptr_3 = (uintptr_t)&cq_addr[(idx + 12) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_2 = (uintptr_t)&cq_addr[(idx +  8) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_1 = (uintptr_t)&cq_addr[(idx +  4) &
+						 qm_port->cq_depth_mask];
+	uintptr_t qe_ptr_0 = (uintptr_t)&cq_addr[(idx +  0) &
+						 qm_port->cq_depth_mask];
+
+	/* Load QEs from CQ: use compiler barriers to avoid load reordering */
+	__m128i v_qe_3 = _mm_loadu_si128((const __m128i *)qe_ptr_3);
+	rte_compiler_barrier();
+	__m128i v_qe_2 = _mm_loadu_si128((const __m128i *)qe_ptr_2);
+	rte_compiler_barrier();
+	__m128i v_qe_1 = _mm_loadu_si128((const __m128i *)qe_ptr_1);
+	rte_compiler_barrier();
+	__m128i v_qe_0 = _mm_loadu_si128((const __m128i *)qe_ptr_0);
+
+	/* Generate the pkt_shuffle mask;
+	 * - Avoids load in otherwise load-heavy section of code
+	 * - Moves bytes 3,7,11,15 (gen bit bytes) to LSB bytes in XMM
+	 */
+	const uint32_t stat_shuf_bytes = (15 << 24) | (11 << 16) | (7 << 8) | 3;
+	__m128i v_zeros = _mm_setzero_si128();
+	__m128i v_ffff = _mm_cmpeq_epi8(v_zeros, v_zeros);
+	__m128i v_stat_shuf_mask = _mm_insert_epi32(v_ffff, stat_shuf_bytes, 0);
+
+	/* Extract u32 components required from the QE
+	 * - QE[64 to 95 ] for metadata (qid, sched, prio, event type, ...)
+	 * - QE[96 to 127] for status (cq gen bit, error)
+	 *
+	 * Note that stage 1 of the unpacking is re-used for both u32 extracts
+	 */
+	__m128i v_qe_02 = _mm_unpackhi_epi32(v_qe_0, v_qe_2);
+	__m128i v_qe_13 = _mm_unpackhi_epi32(v_qe_1, v_qe_3);
+	__m128i v_qe_status = _mm_unpackhi_epi32(v_qe_02, v_qe_13);
+	__m128i v_qe_meta   = _mm_unpacklo_epi32(v_qe_02, v_qe_13);
+
+	/* Status byte (gen_bit, error) handling:
+	 * - Shuffle to lanes 0,1,2,3, clear all others
+	 * - Shift left by 7 for gen bit to MSB, movemask to scalar
+	 * - Shift right by 2 for error bit to MSB, movemask to scalar
+	 */
+	__m128i v_qe_shuffled = _mm_shuffle_epi8(v_qe_status, v_stat_shuf_mask);
+	__m128i v_qes_shift_gen_bit = _mm_slli_epi32(v_qe_shuffled, 7);
+	int32_t qe_gen_bits = _mm_movemask_epi8(v_qes_shift_gen_bit) & 0xf;
+
+	/* Expected vs Reality of QE Gen bits
+	 * - cq_rolling_mask provides expected bits
+	 * - QE loads, unpacks/shuffle and movemask provides reality
+	 * - XOR of the two gives bitmask of new packets
+	 * - POPCNT to get the number of new events
+	 */
+	uint64_t rolling = qm_port->cq_rolling_mask & 0xF;
+	uint64_t qe_xor_bits = (qe_gen_bits ^ rolling);
+	uint32_t count_new = __builtin_popcount(qe_xor_bits);
+	count_new = RTE_MIN(count_new, max_events);
+	if (!count_new)
+		return 0;
+
+	/* emulate a 128 bit rotate using 2x 64-bit numbers and bit-shifts */
+
+	uint64_t m_rshift = qm_port->cq_rolling_mask >> count_new;
+	uint64_t m_lshift = qm_port->cq_rolling_mask << (64 - count_new);
+	uint64_t m2_rshift = qm_port->cq_rolling_mask_2 >> count_new;
+	uint64_t m2_lshift = qm_port->cq_rolling_mask_2 << (64 - count_new);
+
+	/* shifted out of m2 into MSB of m */
+	qm_port->cq_rolling_mask = (m_rshift | m2_lshift);
+
+	/* shifted out of m "looped back" into MSB of m2 */
+	qm_port->cq_rolling_mask_2 = (m2_rshift | m_lshift);
+
+	/* Prefetch the next QEs - should run as IPC instead of cycles */
+	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
+
+	/* Convert QEs from XMM regs to events and store events directly */
+	_process_deq_qes_vec_impl(qm_port, events, v_qe_3, v_qe_2, v_qe_1,
+				  v_qe_0, v_qe_meta, v_qe_status, count_new);
+
+	return count_new;
+}
+
 static inline void
 dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
 {
@@ -3469,25 +3811,15 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
 		       uint16_t max_num,
 		       uint64_t dequeue_timeout_ticks)
 {
-	uint64_t timeout;
 	uint64_t start_ticks = 0ULL;
 	struct dlb2_port *qm_port;
 	int num = 0;
+	bool use_scalar;
+	uint64_t timeout;
 
 	qm_port = &ev_port->qm_port;
+	use_scalar = qm_port->use_scalar;
 
-	/* We have a special implementation for waiting. Wait can be:
-	 * 1) no waiting at all
-	 * 2) busy poll only
-	 * 3) wait for interrupt. If wakeup and poll time
-	 * has expired, then return to caller
-	 * 4) umonitor/umwait repeatedly up to poll time
-	 */
-
-	/* If configured for per dequeue wait, then use wait value provided
-	 * to this API. Otherwise we must use the global
-	 * value from eventdev config time.
-	 */
 	if (!dlb2->global_dequeue_wait)
 		timeout = dequeue_timeout_ticks;
 	else
@@ -3495,35 +3827,41 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
 
 	start_ticks = rte_get_timer_cycles();
 
+	use_scalar = use_scalar || (max_num & 0x3);
+
 	while (num < max_num) {
 		struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
 		int num_avail;
-
-		/* Copy up to 4 QEs from the current cache line into qes */
-		num_avail = dlb2_recv_qe_sparse(qm_port, qes);
-
-		/* But don't process more than the user requested */
-		num_avail = RTE_MIN(num_avail, max_num - num);
-
-		dlb2_inc_cq_idx(qm_port, num_avail << 2);
-
-		if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
-			num += dlb2_process_dequeue_four_qes(ev_port,
-							      qm_port,
-							      &events[num],
-							      &qes[0]);
-		else if (num_avail)
-			num += dlb2_process_dequeue_qes(ev_port,
-							 qm_port,
-							 &events[num],
-							 &qes[0],
-							 num_avail);
-		else if ((timeout == 0) || (num > 0))
-			/* Not waiting in any form, or 1+ events received? */
-			break;
-		else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
-					   timeout, start_ticks))
-			break;
+		if (use_scalar) {
+			num_avail = dlb2_recv_qe_sparse(qm_port, qes);
+			num_avail = RTE_MIN(num_avail, max_num - num);
+			dlb2_inc_cq_idx(qm_port, num_avail << 2);
+			if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
+				num += dlb2_process_dequeue_four_qes(ev_port,
+								  qm_port,
+								  &events[num],
+								  &qes[0]);
+			else if (num_avail)
+				num += dlb2_process_dequeue_qes(ev_port,
+								qm_port,
+								&events[num],
+								&qes[0],
+								num_avail);
+		} else { /* !use_scalar */
+			num_avail = dlb2_recv_qe_sparse_vec(qm_port,
+							    &events[num],
+							    max_num - num);
+			num += num_avail;
+			dlb2_inc_cq_idx(qm_port, num_avail << 2);
+			DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_avail);
+		}
+		if (!num_avail) {
+			if (num > 0)
+				break;
+			else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
+						   timeout, start_ticks))
+				break;
+		}
 	}
 
 	qm_port->owed_tokens += num;
@@ -4083,6 +4421,7 @@ dlb2_primary_eventdev_probe(struct rte_eventdev *dev,
 	dlb2->poll_interval = dlb2_args->poll_interval;
 	dlb2->sw_credit_quanta = dlb2_args->sw_credit_quanta;
 	dlb2->default_depth_thresh = dlb2_args->default_depth_thresh;
+	dlb2->vector_opts_disabled = dlb2_args->vector_opts_disabled;
 
 	err = dlb2_iface_open(&dlb2->qm_instance, name);
 	if (err < 0) {
@@ -4186,6 +4525,7 @@ dlb2_parse_params(const char *params,
 					     DLB2_POLL_INTERVAL_ARG,
 					     DLB2_SW_CREDIT_QUANTA_ARG,
 					     DLB2_DEPTH_THRESH_ARG,
+					     DLB2_VECTOR_OPTS_DISAB_ARG,
 					     NULL };
 
 	if (params != NULL && params[0] != '\0') {
@@ -4299,6 +4639,17 @@ dlb2_parse_params(const char *params,
 				return ret;
 			}
 
+			ret = rte_kvargs_process(kvlist,
+					DLB2_VECTOR_OPTS_DISAB_ARG,
+					set_vector_opts_disab,
+					&dlb2_args->vector_opts_disabled);
+			if (ret != 0) {
+				DLB2_LOG_ERR("%s: Error parsing vector opts disabled",
+					     name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
 			rte_kvargs_free(kvlist);
 		}
 	}
diff --git a/drivers/event/dlb2/dlb2_priv.h b/drivers/event/dlb2/dlb2_priv.h
index cf120c92d..3140764a5 100644
--- a/drivers/event/dlb2/dlb2_priv.h
+++ b/drivers/event/dlb2/dlb2_priv.h
@@ -38,6 +38,7 @@
 #define DLB2_POLL_INTERVAL_ARG "poll_interval"
 #define DLB2_SW_CREDIT_QUANTA_ARG "sw_credit_quanta"
 #define DLB2_DEPTH_THRESH_ARG "default_depth_thresh"
+#define DLB2_VECTOR_OPTS_DISAB_ARG "vector_opts_disable"
 
 /* Begin HW related defines and structs */
 
@@ -205,9 +206,9 @@ enum dlb2_enqueue_type {
 /* hw-specific format - do not change */
 
 struct dlb2_event_type {
-	uint8_t major:4;
-	uint8_t unused:4;
-	uint8_t sub;
+	uint16_t major:4;
+	uint16_t unused:4;
+	uint16_t sub:8;
 };
 
 union dlb2_opaque_data {
@@ -351,6 +352,12 @@ struct dlb2_port {
 	uint16_t cq_idx_unmasked;
 	uint16_t cq_depth_mask;
 	uint16_t gen_bit_shift;
+	uint64_t cq_rolling_mask; /*
+				   * rotate to always have right expected
+				   * gen bits
+				   */
+	uint64_t cq_rolling_mask_2;
+	void *cq_addr_cached; /* avoid multiple refs */
 	enum dlb2_port_state state;
 	enum dlb2_configuration_state config_state;
 	int num_mapped_qids;
@@ -360,6 +367,7 @@ struct dlb2_port {
 	struct dlb2_cq_pop_qe *consume_qe;
 	struct dlb2_eventdev *dlb2; /* back ptr */
 	struct dlb2_eventdev_port *ev_port; /* back ptr */
+	bool use_scalar; /* force usage of scalar code */
 };
 
 /* Per-process per-port mmio and memory pointers */
@@ -513,9 +521,9 @@ struct dlb2_queue {
 	uint32_t num_qid_inflights; /* User config */
 	uint32_t num_atm_inflights; /* User config */
 	enum dlb2_configuration_state config_state;
-	int sched_type; /* LB queue only */
-	uint32_t id;
-	bool is_directed;
+	int  sched_type; /* LB queue only */
+	uint8_t id;
+	bool	 is_directed;
 };
 
 struct dlb2_eventdev_queue {
@@ -558,6 +566,7 @@ struct dlb2_eventdev {
 	uint32_t new_event_limit;
 	int max_num_events_override;
 	int num_dir_credits_override;
+	bool vector_opts_disabled;
 	volatile enum dlb2_run_state run_state;
 	uint16_t num_dir_queues; /* total num of evdev dir queues requested */
 	union {
@@ -617,6 +626,7 @@ struct dlb2_devargs {
 	int poll_interval;
 	int sw_credit_quanta;
 	int default_depth_thresh;
+	bool vector_opts_disabled;
 };
 
 /* End Eventdev related defines and structs */
-- 
2.23.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [dpdk-dev] [PATCH v3 1/1] event/dlb2: optimize Dequeue Operations
  2021-05-01 19:07   ` [dpdk-dev] [PATCH v3 1/1] event/dlb2: optimize " McDaniel, Timothy
@ 2021-05-04  8:29     ` Jerin Jacob
  0 siblings, 0 replies; 9+ messages in thread
From: Jerin Jacob @ 2021-05-04  8:29 UTC (permalink / raw)
  To: McDaniel, Timothy
  Cc: dpdk-dev, Erik Gabriel Carrillo, Van Haaren, Harry, Jerin Jacob,
	Thomas Monjalon

On Sun, May 2, 2021 at 12:39 AM McDaniel, Timothy
<timothy.mcdaniel@intel.com> wrote:
>
> From: Timothy McDaniel <timothy.mcdaniel@intel.com>
>
> Convert code to use x86 vector instructions, thereby significantly
> improving dequeue performance.
>
> Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
> Signed-off-by: Harry Van Haaren <harry.van.haaren@intel.com>


Applied to dpdk-next-eventdev/for-main. Thanks.



> ---
>  drivers/event/dlb2/dlb2.c      | 445 +++++++++++++++++++++++++++++----
>  drivers/event/dlb2/dlb2_priv.h |  22 +-
>  2 files changed, 414 insertions(+), 53 deletions(-)
>
> diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c
> index 818b1c367..c8a50cddf 100644
> --- a/drivers/event/dlb2/dlb2.c
> +++ b/drivers/event/dlb2/dlb2.c
> @@ -375,6 +375,26 @@ set_default_depth_thresh(const char *key __rte_unused,
>         return 0;
>  }
>
> +static int
> +set_vector_opts_disab(const char *key __rte_unused,
> +       const char *value,
> +       void *opaque)
> +{
> +       bool *dlb2_vector_opts_disabled = opaque;
> +
> +       if (value == NULL || opaque == NULL) {
> +               DLB2_LOG_ERR("NULL pointer\n");
> +               return -EINVAL;
> +       }
> +
> +       if ((*value == 'y') || (*value == 'Y'))
> +               *dlb2_vector_opts_disabled = true;
> +       else
> +               *dlb2_vector_opts_disabled = false;
> +
> +       return 0;
> +}
> +
>  static int
>  set_qid_depth_thresh(const char *key __rte_unused,
>                      const char *value,
> @@ -1240,6 +1260,37 @@ dlb2_event_enqueue_forward_burst_delayed(void *event_port,
>                                          const struct rte_event events[],
>                                          uint16_t num);
>
> +/* Generate the required bitmask for rotate-style expected QE gen bits.
> + * This requires a pattern of 1's and zeros, starting with expected as
> + * 1 bits, so when hardware writes 0's they're "new". This requires the
> + * ring size to be powers of 2 to wrap correctly.
> + */
> +static void
> +dlb2_hw_cq_bitmask_init(struct dlb2_port *qm_port, uint32_t cq_depth)
> +{
> +       uint64_t cq_build_mask = 0;
> +       uint32_t i;
> +
> +       if (cq_depth > 64)
> +               return; /* need to fall back to scalar code */
> +
> +       /*
> +        * all 1's in first u64, all zeros in second is correct bit pattern to
> +        * start. Special casing == 64 easier than adapting complex loop logic.
> +        */
> +       if (cq_depth == 64) {
> +               qm_port->cq_rolling_mask = 0;
> +               qm_port->cq_rolling_mask_2 = -1;
> +               return;
> +       }
> +
> +       for (i = 0; i < 64; i += (cq_depth * 2))
> +               cq_build_mask |= ((1ULL << cq_depth) - 1) << (i + cq_depth);
> +
> +       qm_port->cq_rolling_mask = cq_build_mask;
> +       qm_port->cq_rolling_mask_2 = cq_build_mask;
> +}
> +
>  static int
>  dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
>                         struct dlb2_eventdev_port *ev_port,
> @@ -1357,6 +1408,8 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
>         /* starting value of gen bit - it toggles at wrap time */
>         qm_port->gen_bit = 1;
>
> +       dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
> +
>         qm_port->int_armed = false;
>
>         /* Save off for later use in info and lookup APIs. */
> @@ -1408,6 +1461,18 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
>                              dequeue_depth,
>                              qm_port->credits);
>         }
> +
> +       qm_port->use_scalar = false;
> +
> +#if (!defined RTE_ARCH_X86_64)
> +       qm_port->use_scalar = true;
> +#else
> +       if ((qm_port->cq_depth > 64) ||
> +           (!rte_is_power_of_2(qm_port->cq_depth)) ||
> +           (dlb2->vector_opts_disabled == true))
> +               qm_port->use_scalar = true;
> +#endif
> +
>         rte_spinlock_unlock(&handle->resource_lock);
>
>         return 0;
> @@ -1553,6 +1618,7 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
>         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
>         /* starting value of gen bit - it toggles at wrap time */
>         qm_port->gen_bit = 1;
> +       dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
>
>         qm_port->int_armed = false;
>
> @@ -1593,6 +1659,16 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
>                              dequeue_depth,
>                              credit_high_watermark);
>         }
> +
> +#if (!defined RTE_ARCH_X86_64)
> +       qm_port->use_scalar = true;
> +#else
> +       if ((qm_port->cq_depth > 64) ||
> +           (!rte_is_power_of_2(qm_port->cq_depth)) ||
> +           (dlb2->vector_opts_disabled == true))
> +               qm_port->use_scalar = true;
> +#endif
> +
>         rte_spinlock_unlock(&handle->resource_lock);
>
>         return 0;
> @@ -2987,10 +3063,11 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
>                 int j = 0;
>
>                 /* Zero-out QEs */
> -               qm_port->qe4[0].cmd_byte = 0;
> -               qm_port->qe4[1].cmd_byte = 0;
> -               qm_port->qe4[2].cmd_byte = 0;
> -               qm_port->qe4[3].cmd_byte = 0;
> +               _mm_storeu_si128((void *)&qm_port->qe4[0], _mm_setzero_si128());
> +               _mm_storeu_si128((void *)&qm_port->qe4[1], _mm_setzero_si128());
> +               _mm_storeu_si128((void *)&qm_port->qe4[2], _mm_setzero_si128());
> +               _mm_storeu_si128((void *)&qm_port->qe4[3], _mm_setzero_si128());
> +
>
>                 for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
>                         int16_t thresh = qm_port->token_pop_thresh;
> @@ -3020,7 +3097,7 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
>
>  sw_credit_update:
>         /* each release returns one credit */
> -       if (!ev_port->outstanding_releases) {
> +       if (unlikely(!ev_port->outstanding_releases)) {
>                 DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
>                              __func__);
>                 return;
> @@ -3137,7 +3214,7 @@ dlb2_dequeue_wait(struct dlb2_eventdev *dlb2,
>         return 0;
>  }
>
> -static inline int
> +static __rte_noinline int
>  dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
>                          struct dlb2_port *qm_port,
>                          struct rte_event *events,
> @@ -3406,8 +3483,7 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
>
>         cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
>
> -       idx = qm_port->cq_idx;
> -
> +       idx = qm_port->cq_idx_unmasked & qm_port->cq_depth_mask;
>         /* Load the next 4 QEs */
>         addr[0] = (uintptr_t)&cq_addr[idx];
>         addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
> @@ -3452,6 +3528,272 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
>         return __builtin_popcount(gen_bits);
>  }
>
> +static inline void
> +_process_deq_qes_vec_impl(struct dlb2_port *qm_port,
> +                         struct rte_event *events,
> +                         __m128i v_qe_3,
> +                         __m128i v_qe_2,
> +                         __m128i v_qe_1,
> +                         __m128i v_qe_0,
> +                         __m128i v_qe_meta,
> +                         __m128i v_qe_status,
> +                         uint32_t valid_events)
> +{
> +       /* Look up the event QIDs, using the hardware QIDs to index the
> +        * port's QID mapping.
> +        *
> +        * Each v_qe_[0-3] is just a 16-byte load of the whole QE. It is
> +        * passed along in registers as the QE data is required later.
> +        *
> +        * v_qe_meta is a u32 unpack of all 4x QEs. Aka, it contains one
> +        * 32-bit slice of each QE, so makes up a full SSE register. This
> +        * allows parallel processing of 4x QEs in a single register.
> +        */
> +
> +       __m128i v_qid_done = {0};
> +       int hw_qid0 = _mm_extract_epi8(v_qe_meta, 2);
> +       int hw_qid1 = _mm_extract_epi8(v_qe_meta, 6);
> +       int hw_qid2 = _mm_extract_epi8(v_qe_meta, 10);
> +       int hw_qid3 = _mm_extract_epi8(v_qe_meta, 14);
> +
> +       int ev_qid0 = qm_port->qid_mappings[hw_qid0];
> +       int ev_qid1 = qm_port->qid_mappings[hw_qid1];
> +       int ev_qid2 = qm_port->qid_mappings[hw_qid2];
> +       int ev_qid3 = qm_port->qid_mappings[hw_qid3];
> +
> +       v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid0, 2);
> +       v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid1, 6);
> +       v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid2, 10);
> +       v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid3, 14);
> +
> +       /* Schedule field remapping using byte shuffle
> +        * - Full byte containing sched field handled here (op, rsvd are zero)
> +        * - Note sanitizing the register requires two masking ANDs:
> +        *   1) to strip prio/msg_type from byte for correct shuffle lookup
> +        *   2) to strip any non-sched-field lanes from any results to OR later
> +        * - Final byte result is >> 10 to another byte-lane inside the u32.
> +        *   This makes the final combination OR easier to make the rte_event.
> +        */
> +       __m128i v_sched_done;
> +       __m128i v_sched_bits;
> +       {
> +               static const uint8_t sched_type_map[16] = {
> +                       [DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
> +                       [DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
> +                       [DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
> +                       [DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
> +               };
> +               static const uint8_t sched_and_mask[16] = {
> +                       0x00, 0x00, 0x00, 0x03,
> +                       0x00, 0x00, 0x00, 0x03,
> +                       0x00, 0x00, 0x00, 0x03,
> +                       0x00, 0x00, 0x00, 0x03,
> +               };
> +               const __m128i v_sched_map = _mm_loadu_si128(
> +                                            (const __m128i *)sched_type_map);
> +               __m128i v_sched_mask = _mm_loadu_si128(
> +                                            (const __m128i *)&sched_and_mask);
> +               v_sched_bits = _mm_and_si128(v_qe_meta, v_sched_mask);
> +               __m128i v_sched_remapped = _mm_shuffle_epi8(v_sched_map,
> +                                                           v_sched_bits);
> +               __m128i v_preshift = _mm_and_si128(v_sched_remapped,
> +                                                  v_sched_mask);
> +               v_sched_done = _mm_srli_epi32(v_preshift, 10);
> +       }
> +
> +       /* Priority handling
> +        * - QE provides 3 bits of priority
> +        * - Shift << 3 to move to MSBs for byte-prio in rte_event
> +        * - Mask bits to avoid pollution, leaving only 3 prio MSBs in reg
> +        */
> +       __m128i v_prio_done;
> +       {
> +               static const uint8_t prio_mask[16] = {
> +                       0x00, 0x00, 0x00, 0x07 << 5,
> +                       0x00, 0x00, 0x00, 0x07 << 5,
> +                       0x00, 0x00, 0x00, 0x07 << 5,
> +                       0x00, 0x00, 0x00, 0x07 << 5,
> +               };
> +               __m128i v_prio_mask  = _mm_loadu_si128(
> +                                               (const __m128i *)prio_mask);
> +               __m128i v_prio_shifted = _mm_slli_epi32(v_qe_meta, 3);
> +               v_prio_done = _mm_and_si128(v_prio_shifted, v_prio_mask);
> +       }
> +
> +       /* Event Sub/Type handling:
> +        * we want to keep the lower 12 bits of each QE. Shift up by 20 bits
> +        * to get the sub/ev type data into rte_event location, clearing the
> +        * lower 20 bits in the process.
> +        */
> +       __m128i v_types_done;
> +       {
> +               static const uint8_t event_mask[16] = {
> +                       0x0f, 0x00, 0x00, 0x00,
> +                       0x0f, 0x00, 0x00, 0x00,
> +                       0x0f, 0x00, 0x00, 0x00,
> +                       0x0f, 0x00, 0x00, 0x00,
> +               };
> +               static const uint8_t sub_event_mask[16] = {
> +                       0xff, 0x00, 0x00, 0x00,
> +                       0xff, 0x00, 0x00, 0x00,
> +                       0xff, 0x00, 0x00, 0x00,
> +                       0xff, 0x00, 0x00, 0x00,
> +               };
> +               static const uint8_t flow_mask[16] = {
> +                       0xff, 0xff, 0x00, 0x00,
> +                       0xff, 0xff, 0x00, 0x00,
> +                       0xff, 0xff, 0x00, 0x00,
> +                       0xff, 0xff, 0x00, 0x00,
> +               };
> +               __m128i v_event_mask  = _mm_loadu_si128(
> +                                       (const __m128i *)event_mask);
> +               __m128i v_sub_event_mask  = _mm_loadu_si128(
> +                                       (const __m128i *)sub_event_mask);
> +               __m128i v_flow_mask  = _mm_loadu_si128(
> +                                      (const __m128i *)flow_mask);
> +               __m128i v_sub = _mm_srli_epi32(v_qe_meta, 8);
> +               v_sub = _mm_and_si128(v_sub, v_sub_event_mask);
> +               __m128i v_type = _mm_and_si128(v_qe_meta, v_event_mask);
> +               v_type = _mm_slli_epi32(v_type, 8);
> +               v_types_done = _mm_or_si128(v_type, v_sub);
> +               v_types_done = _mm_slli_epi32(v_types_done, 20);
> +               __m128i v_flow = _mm_and_si128(v_qe_status, v_flow_mask);
> +               v_types_done = _mm_or_si128(v_types_done, v_flow);
> +       }
> +
> +       /* Combine QID, Sched and Prio fields, then Shift >> 8 bits to align
> +        * with the rte_event, allowing unpacks to move/blend with payload.
> +        */
> +       __m128i v_q_s_p_done;
> +       {
> +               __m128i v_qid_sched = _mm_or_si128(v_qid_done, v_sched_done);
> +               __m128i v_q_s_prio = _mm_or_si128(v_qid_sched, v_prio_done);
> +               v_q_s_p_done = _mm_srli_epi32(v_q_s_prio, 8);
> +       }
> +
> +       __m128i v_unpk_ev_23, v_unpk_ev_01, v_ev_2, v_ev_3, v_ev_0, v_ev_1;
> +
> +       /* Unpack evs into u64 metadata, then indiv events */
> +       v_unpk_ev_23 = _mm_unpackhi_epi32(v_types_done, v_q_s_p_done);
> +       v_unpk_ev_01 = _mm_unpacklo_epi32(v_types_done, v_q_s_p_done);
> +
> +       switch (valid_events) {
> +       case 4:
> +               v_ev_3 = _mm_blend_epi16(v_unpk_ev_23, v_qe_3, 0x0F);
> +               v_ev_3 = _mm_alignr_epi8(v_ev_3, v_ev_3, 8);
> +               _mm_storeu_si128((__m128i *)&events[3], v_ev_3);
> +               /* fallthrough */
> +       case 3:
> +               v_ev_2 = _mm_unpacklo_epi64(v_unpk_ev_23, v_qe_2);
> +               _mm_storeu_si128((__m128i *)&events[2], v_ev_2);
> +               /* fallthrough */
> +       case 2:
> +               v_ev_1 = _mm_blend_epi16(v_unpk_ev_01, v_qe_1, 0x0F);
> +               v_ev_1 = _mm_alignr_epi8(v_ev_1, v_ev_1, 8);
> +               _mm_storeu_si128((__m128i *)&events[1], v_ev_1);
> +               /* fallthrough */
> +       case 1:
> +               v_ev_0 = _mm_unpacklo_epi64(v_unpk_ev_01, v_qe_0);
> +               _mm_storeu_si128((__m128i *)&events[0], v_ev_0);
> +       }
> +}
> +
> +static __rte_always_inline int
> +dlb2_recv_qe_sparse_vec(struct dlb2_port *qm_port, void *events,
> +                       uint32_t max_events)
> +{
> +       /* Using unmasked idx for perf, and masking manually */
> +       uint16_t idx = qm_port->cq_idx_unmasked;
> +       volatile struct dlb2_dequeue_qe *cq_addr;
> +
> +       cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
> +
> +       uintptr_t qe_ptr_3 = (uintptr_t)&cq_addr[(idx + 12) &
> +                                                qm_port->cq_depth_mask];
> +       uintptr_t qe_ptr_2 = (uintptr_t)&cq_addr[(idx +  8) &
> +                                                qm_port->cq_depth_mask];
> +       uintptr_t qe_ptr_1 = (uintptr_t)&cq_addr[(idx +  4) &
> +                                                qm_port->cq_depth_mask];
> +       uintptr_t qe_ptr_0 = (uintptr_t)&cq_addr[(idx +  0) &
> +                                                qm_port->cq_depth_mask];
> +
> +       /* Load QEs from CQ: use compiler barriers to avoid load reordering */
> +       __m128i v_qe_3 = _mm_loadu_si128((const __m128i *)qe_ptr_3);
> +       rte_compiler_barrier();
> +       __m128i v_qe_2 = _mm_loadu_si128((const __m128i *)qe_ptr_2);
> +       rte_compiler_barrier();
> +       __m128i v_qe_1 = _mm_loadu_si128((const __m128i *)qe_ptr_1);
> +       rte_compiler_barrier();
> +       __m128i v_qe_0 = _mm_loadu_si128((const __m128i *)qe_ptr_0);
> +
> +       /* Generate the pkt_shuffle mask;
> +        * - Avoids load in otherwise load-heavy section of code
> +        * - Moves bytes 3,7,11,15 (gen bit bytes) to LSB bytes in XMM
> +        */
> +       const uint32_t stat_shuf_bytes = (15 << 24) | (11 << 16) | (7 << 8) | 3;
> +       __m128i v_zeros = _mm_setzero_si128();
> +       __m128i v_ffff = _mm_cmpeq_epi8(v_zeros, v_zeros);
> +       __m128i v_stat_shuf_mask = _mm_insert_epi32(v_ffff, stat_shuf_bytes, 0);
> +
> +       /* Extract u32 components required from the QE
> +        * - QE[64 to 95 ] for metadata (qid, sched, prio, event type, ...)
> +        * - QE[96 to 127] for status (cq gen bit, error)
> +        *
> +        * Note that stage 1 of the unpacking is re-used for both u32 extracts
> +        */
> +       __m128i v_qe_02 = _mm_unpackhi_epi32(v_qe_0, v_qe_2);
> +       __m128i v_qe_13 = _mm_unpackhi_epi32(v_qe_1, v_qe_3);
> +       __m128i v_qe_status = _mm_unpackhi_epi32(v_qe_02, v_qe_13);
> +       __m128i v_qe_meta   = _mm_unpacklo_epi32(v_qe_02, v_qe_13);
> +
> +       /* Status byte (gen_bit, error) handling:
> +        * - Shuffle to lanes 0,1,2,3, clear all others
> +        * - Shift left by 7 for gen bit to MSB, movemask to scalar
> +        * - Shift right by 2 for error bit to MSB, movemask to scalar
> +        */
> +       __m128i v_qe_shuffled = _mm_shuffle_epi8(v_qe_status, v_stat_shuf_mask);
> +       __m128i v_qes_shift_gen_bit = _mm_slli_epi32(v_qe_shuffled, 7);
> +       int32_t qe_gen_bits = _mm_movemask_epi8(v_qes_shift_gen_bit) & 0xf;
> +
> +       /* Expected vs Reality of QE Gen bits
> +        * - cq_rolling_mask provides expected bits
> +        * - QE loads, unpacks/shuffle and movemask provides reality
> +        * - XOR of the two gives bitmask of new packets
> +        * - POPCNT to get the number of new events
> +        */
> +       uint64_t rolling = qm_port->cq_rolling_mask & 0xF;
> +       uint64_t qe_xor_bits = (qe_gen_bits ^ rolling);
> +       uint32_t count_new = __builtin_popcount(qe_xor_bits);
> +       count_new = RTE_MIN(count_new, max_events);
> +       if (!count_new)
> +               return 0;
> +
> +       /* emulate a 128 bit rotate using 2x 64-bit numbers and bit-shifts */
> +
> +       uint64_t m_rshift = qm_port->cq_rolling_mask >> count_new;
> +       uint64_t m_lshift = qm_port->cq_rolling_mask << (64 - count_new);
> +       uint64_t m2_rshift = qm_port->cq_rolling_mask_2 >> count_new;
> +       uint64_t m2_lshift = qm_port->cq_rolling_mask_2 << (64 - count_new);
> +
> +       /* shifted out of m2 into MSB of m */
> +       qm_port->cq_rolling_mask = (m_rshift | m2_lshift);
> +
> +       /* shifted out of m "looped back" into MSB of m2 */
> +       qm_port->cq_rolling_mask_2 = (m2_rshift | m_lshift);
> +
> +       /* Prefetch the next QEs - should overlap with other work, not add cycles */
> +       rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
> +       rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
> +       rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
> +       rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
> +
> +       /* Convert QEs from XMM regs to events and store events directly */
> +       _process_deq_qes_vec_impl(qm_port, events, v_qe_3, v_qe_2, v_qe_1,
> +                                 v_qe_0, v_qe_meta, v_qe_status, count_new);
> +
> +       return count_new;
> +}
> +
>  static inline void
>  dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
>  {
> @@ -3469,25 +3811,15 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
>                        uint16_t max_num,
>                        uint64_t dequeue_timeout_ticks)
>  {
> -       uint64_t timeout;
>         uint64_t start_ticks = 0ULL;
>         struct dlb2_port *qm_port;
>         int num = 0;
> +       bool use_scalar;
> +       uint64_t timeout;
>
>         qm_port = &ev_port->qm_port;
> +       use_scalar = qm_port->use_scalar;
>
> -       /* We have a special implementation for waiting. Wait can be:
> -        * 1) no waiting at all
> -        * 2) busy poll only
> -        * 3) wait for interrupt. If wakeup and poll time
> -        * has expired, then return to caller
> -        * 4) umonitor/umwait repeatedly up to poll time
> -        */
> -
> -       /* If configured for per dequeue wait, then use wait value provided
> -        * to this API. Otherwise we must use the global
> -        * value from eventdev config time.
> -        */
>         if (!dlb2->global_dequeue_wait)
>                 timeout = dequeue_timeout_ticks;
>         else
> @@ -3495,35 +3827,41 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
>
>         start_ticks = rte_get_timer_cycles();
>
> +       use_scalar = use_scalar || (max_num & 0x3);
> +
>         while (num < max_num) {
>                 struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
>                 int num_avail;
> -
> -               /* Copy up to 4 QEs from the current cache line into qes */
> -               num_avail = dlb2_recv_qe_sparse(qm_port, qes);
> -
> -               /* But don't process more than the user requested */
> -               num_avail = RTE_MIN(num_avail, max_num - num);
> -
> -               dlb2_inc_cq_idx(qm_port, num_avail << 2);
> -
> -               if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
> -                       num += dlb2_process_dequeue_four_qes(ev_port,
> -                                                             qm_port,
> -                                                             &events[num],
> -                                                             &qes[0]);
> -               else if (num_avail)
> -                       num += dlb2_process_dequeue_qes(ev_port,
> -                                                        qm_port,
> -                                                        &events[num],
> -                                                        &qes[0],
> -                                                        num_avail);
> -               else if ((timeout == 0) || (num > 0))
> -                       /* Not waiting in any form, or 1+ events received? */
> -                       break;
> -               else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
> -                                          timeout, start_ticks))
> -                       break;
> +               if (use_scalar) {
> +                       num_avail = dlb2_recv_qe_sparse(qm_port, qes);
> +                       num_avail = RTE_MIN(num_avail, max_num - num);
> +                       dlb2_inc_cq_idx(qm_port, num_avail << 2);
> +                       if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
> +                               num += dlb2_process_dequeue_four_qes(ev_port,
> +                                                                 qm_port,
> +                                                                 &events[num],
> +                                                                 &qes[0]);
> +                       else if (num_avail)
> +                               num += dlb2_process_dequeue_qes(ev_port,
> +                                                               qm_port,
> +                                                               &events[num],
> +                                                               &qes[0],
> +                                                               num_avail);
> +               } else { /* !use_scalar */
> +                       num_avail = dlb2_recv_qe_sparse_vec(qm_port,
> +                                                           &events[num],
> +                                                           max_num - num);
> +                       num += num_avail;
> +                       dlb2_inc_cq_idx(qm_port, num_avail << 2);
> +                       DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_avail);
> +               }
> +               if (!num_avail) {
> +                       if (num > 0)
> +                               break;
> +                       else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
> +                                                  timeout, start_ticks))
> +                               break;
> +               }
>         }
>
>         qm_port->owed_tokens += num;
> @@ -4083,6 +4421,7 @@ dlb2_primary_eventdev_probe(struct rte_eventdev *dev,
>         dlb2->poll_interval = dlb2_args->poll_interval;
>         dlb2->sw_credit_quanta = dlb2_args->sw_credit_quanta;
>         dlb2->default_depth_thresh = dlb2_args->default_depth_thresh;
> +       dlb2->vector_opts_disabled = dlb2_args->vector_opts_disabled;
>
>         err = dlb2_iface_open(&dlb2->qm_instance, name);
>         if (err < 0) {
> @@ -4186,6 +4525,7 @@ dlb2_parse_params(const char *params,
>                                              DLB2_POLL_INTERVAL_ARG,
>                                              DLB2_SW_CREDIT_QUANTA_ARG,
>                                              DLB2_DEPTH_THRESH_ARG,
> +                                            DLB2_VECTOR_OPTS_DISAB_ARG,
>                                              NULL };
>
>         if (params != NULL && params[0] != '\0') {
> @@ -4299,6 +4639,17 @@ dlb2_parse_params(const char *params,
>                                 return ret;
>                         }
>
> +                       ret = rte_kvargs_process(kvlist,
> +                                       DLB2_VECTOR_OPTS_DISAB_ARG,
> +                                       set_vector_opts_disab,
> +                                       &dlb2_args->vector_opts_disabled);
> +                       if (ret != 0) {
> +                               DLB2_LOG_ERR("%s: Error parsing vector opts disabled",
> +                                            name);
> +                               rte_kvargs_free(kvlist);
> +                               return ret;
> +                       }
> +
>                         rte_kvargs_free(kvlist);
>                 }
>         }
> diff --git a/drivers/event/dlb2/dlb2_priv.h b/drivers/event/dlb2/dlb2_priv.h
> index cf120c92d..3140764a5 100644
> --- a/drivers/event/dlb2/dlb2_priv.h
> +++ b/drivers/event/dlb2/dlb2_priv.h
> @@ -38,6 +38,7 @@
>  #define DLB2_POLL_INTERVAL_ARG "poll_interval"
>  #define DLB2_SW_CREDIT_QUANTA_ARG "sw_credit_quanta"
>  #define DLB2_DEPTH_THRESH_ARG "default_depth_thresh"
> +#define DLB2_VECTOR_OPTS_DISAB_ARG "vector_opts_disable"
>
>  /* Begin HW related defines and structs */
>
> @@ -205,9 +206,9 @@ enum dlb2_enqueue_type {
>  /* hw-specific format - do not change */
>
>  struct dlb2_event_type {
> -       uint8_t major:4;
> -       uint8_t unused:4;
> -       uint8_t sub;
> +       uint16_t major:4;
> +       uint16_t unused:4;
> +       uint16_t sub:8;
>  };
>
>  union dlb2_opaque_data {
> @@ -351,6 +352,12 @@ struct dlb2_port {
>         uint16_t cq_idx_unmasked;
>         uint16_t cq_depth_mask;
>         uint16_t gen_bit_shift;
> +       uint64_t cq_rolling_mask; /*
> +                                  * rotate to always have right expected
> +                                  * gen bits
> +                                  */
> +       uint64_t cq_rolling_mask_2;
> +       void *cq_addr_cached; /* avoid multiple refs */
>         enum dlb2_port_state state;
>         enum dlb2_configuration_state config_state;
>         int num_mapped_qids;
> @@ -360,6 +367,7 @@ struct dlb2_port {
>         struct dlb2_cq_pop_qe *consume_qe;
>         struct dlb2_eventdev *dlb2; /* back ptr */
>         struct dlb2_eventdev_port *ev_port; /* back ptr */
> +       bool use_scalar; /* force usage of scalar code */
>  };
>
>  /* Per-process per-port mmio and memory pointers */
> @@ -513,9 +521,9 @@ struct dlb2_queue {
>         uint32_t num_qid_inflights; /* User config */
>         uint32_t num_atm_inflights; /* User config */
>         enum dlb2_configuration_state config_state;
> -       int sched_type; /* LB queue only */
> -       uint32_t id;
> -       bool is_directed;
> +       int  sched_type; /* LB queue only */
> +       uint8_t id;
> +       bool     is_directed;
>  };
>
>  struct dlb2_eventdev_queue {
> @@ -558,6 +566,7 @@ struct dlb2_eventdev {
>         uint32_t new_event_limit;
>         int max_num_events_override;
>         int num_dir_credits_override;
> +       bool vector_opts_disabled;
>         volatile enum dlb2_run_state run_state;
>         uint16_t num_dir_queues; /* total num of evdev dir queues requested */
>         union {
> @@ -617,6 +626,7 @@ struct dlb2_devargs {
>         int poll_interval;
>         int sw_credit_quanta;
>         int default_depth_thresh;
> +       bool vector_opts_disabled;
>  };
>
>  /* End Eventdev related defines and structs */
> --
> 2.23.0
>

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2021-05-04  8:29 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-03-17 17:02 [dpdk-dev] [PATCH] event/dlb2: Optimize Dequeue Operations Timothy McDaniel
2021-03-21 11:10 ` Jerin Jacob
2021-04-13 20:30 ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Timothy McDaniel
2021-04-13 20:30   ` [dpdk-dev] [PATCH v2 1/1] event/dlb: optimize Dequeue Operations Timothy McDaniel
2021-04-29  7:20   ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Jerin Jacob
2021-04-29 13:45     ` McDaniel, Timothy
2021-05-01 19:07 ` [dpdk-dev] [PATCH v3 0/1] Optimize DLB2 Dequeue Operations McDaniel, Timothy
2021-05-01 19:07   ` [dpdk-dev] [PATCH v3 1/1] event/dlb2: optimize " McDaniel, Timothy
2021-05-04  8:29     ` Jerin Jacob

DPDK patches and discussions

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://inbox.dpdk.org/dev/0 dev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 dev dev/ https://inbox.dpdk.org/dev \
		dev@dpdk.org
	public-inbox-index dev

Example config snippet for mirrors.
Newsgroup available over NNTP:
	nntp://inbox.dpdk.org/inbox.dpdk.dev


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git