DPDK patches and discussions
* [dpdk-dev] [PATCH 0/2] add implicit release disable capability
@ 2017-11-30  4:20 Gage Eads
  2017-11-30  4:20 ` [dpdk-dev] [PATCH 1/2] eventdev: " Gage Eads
  2017-11-30  4:20 ` [dpdk-dev] [PATCH 2/2] event/sw: simplify credit scheme Gage Eads
  0 siblings, 2 replies; 10+ messages in thread
From: Gage Eads @ 2017-11-30  4:20 UTC (permalink / raw)
  To: dev
  Cc: jerin.jacob, harry.van.haaren, bruce.richardson, hemant.agrawal,
	nipun.gupta, santosh.shukla

This patch set introduces an event device capability for disabling implicit
release -- in which rte_event_dequeue_burst() releases any unreleased,
previously dequeued events -- on a per-port basis. The port configuration
struct is likewise extended with a disable_implicit_release option, for PMDs
that support the capability. If the option is set for a port, the application
is responsible for releasing the port's dequeued events (either with
RTE_EVENT_OP_RELEASE or RTE_EVENT_OP_FORWARD).
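
As a rough illustration of the semantics described above, here is a toy model
in plain C. It is not the eventdev API: every toy_* name is a hypothetical
stand-in, and the model only tracks how many dequeued events a port still
holds. It assumes a setup-time capability check and a dequeue that clears
leftovers only when implicit release is enabled.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE. */
#define TOY_CAP_IMPLICIT_RELEASE_DISABLE (1ULL << 5)

/* Hypothetical port state; not the real struct sw_port. */
struct toy_port {
	uint16_t outstanding;     /* dequeued events not yet released */
	uint8_t implicit_release; /* 1: dequeue releases leftovers first */
};

/* Reject the option when the device lacks the capability. */
static int
toy_port_setup(struct toy_port *p, uint64_t dev_caps, uint8_t disable)
{
	assert(p != NULL);
	if (disable && !(dev_caps & TOY_CAP_IMPLICIT_RELEASE_DISABLE))
		return -EINVAL;
	p->outstanding = 0;
	p->implicit_release = !disable;
	return 0;
}

/* Dequeue n events; with implicit release, leftovers are dropped first. */
static uint16_t
toy_dequeue(struct toy_port *p, uint16_t n)
{
	if (p->implicit_release)
		p->outstanding = 0;
	p->outstanding += n;
	return n;
}

/* Explicit RELEASE or FORWARD of n events by the application. */
static void
toy_release(struct toy_port *p, uint16_t n)
{
	assert(n <= p->outstanding);
	p->outstanding -= n;
}
```

With implicit release enabled, two back-to-back toy_dequeue() calls leave only
the second burst outstanding. With it disabled, the count accumulates until
the application calls toy_release().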

This behavior was previously discussed
(http://dpdk.org/ml/archives/dev/2017-November/081037.html) as a way to avoid
flow migration problems when using eventdev with look-aside hardware (such as
cryptodev). For instance, if each worker puts dequeued events in a per-worker
crypto request queue and periodically polls the response queue to forward the
packets, it shouldn't release the events until the crypto operation completes.
By disabling the implicit release, it can dequeue further events and process
them while the crypto operation(s) execute in parallel.
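
The look-aside pattern above can be sketched with a small per-worker FIFO of
events that are still waiting on crypto. This is a simplified model under
stated assumptions, not cryptodev or eventdev code: the toy_* names, the
fixed queue size, and in-order completion are all hypothetical.

```c
#include <assert.h>
#include <stdint.h>

#define TOY_PENDING_MAX 8

/* Hypothetical per-worker state: events handed to the crypto device that
 * still hold their scheduler context.
 */
struct toy_worker {
	uint32_t pending[TOY_PENDING_MAX]; /* FIFO of in-flight event IDs */
	uint16_t head;                     /* index of the oldest entry */
	uint16_t count;                    /* number of in-flight events */
	uint16_t forwarded;                /* events released via FORWARD */
};

/* Called after a dequeue: queue the event for crypto, do NOT release it. */
static int
toy_submit_crypto(struct toy_worker *w, uint32_t ev_id)
{
	assert(w->count <= TOY_PENDING_MAX);
	if (w->count == TOY_PENDING_MAX)
		return -1; /* request queue full; caller retries later */
	w->pending[(w->head + w->count) % TOY_PENDING_MAX] = ev_id;
	w->count++;
	return 0;
}

/* Called when polling the response queue: the n oldest operations are done,
 * so only now are their events forwarded (which releases them).
 */
static uint16_t
toy_complete_crypto(struct toy_worker *w, uint16_t n)
{
	uint16_t done = n < w->count ? n : w->count;

	w->head = (uint16_t)((w->head + done) % TOY_PENDING_MAX);
	w->count = (uint16_t)(w->count - done);
	w->forwarded = (uint16_t)(w->forwarded + done);
	return done;
}
```

The point of the model is that events from several dequeue bursts can sit in
the pending FIFO at once, which is only safe when the dequeue itself does not
release them.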

The only PMD that supports this capability is the sw PMD. For all PMDs, the
default port configuration is implicit releases enabled.

The second patch in the series isn't directly related to implicit release,
but the code changes depend on the first patch.

Gage Eads (2):
  eventdev: add implicit release disable capability
  event/sw: simplify credit scheme

 drivers/event/dpaa2/dpaa2_eventdev.c       |  2 ++
 drivers/event/octeontx/ssovf_evdev.c       |  1 +
 drivers/event/skeleton/skeleton_eventdev.c |  1 +
 drivers/event/sw/sw_evdev.c                | 10 +++++---
 drivers/event/sw/sw_evdev.h                |  1 +
 drivers/event/sw/sw_evdev_worker.c         | 38 +++++++++++++++---------------
 examples/eventdev_pipeline_sw_pmd/main.c   | 24 ++++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c         |  9 +++++++
 lib/librte_eventdev/rte_eventdev.h         | 23 +++++++++++++++---
 test/test/test_eventdev.c                  |  9 +++++++
 test/test/test_eventdev_sw.c               | 20 ++++++++++++++--
 11 files changed, 110 insertions(+), 28 deletions(-)

-- 
2.7.4

* [dpdk-dev] [PATCH 1/2] eventdev: add implicit release disable capability
  2017-11-30  4:20 [dpdk-dev] [PATCH 0/2] add implicit release disable capability Gage Eads
@ 2017-11-30  4:20 ` Gage Eads
  2017-12-11 12:36   ` Van Haaren, Harry
  2017-12-11 17:56   ` [dpdk-dev] [PATCH v2 " Gage Eads
  2017-11-30  4:20 ` [dpdk-dev] [PATCH 2/2] event/sw: simplify credit scheme Gage Eads
  1 sibling, 2 replies; 10+ messages in thread
From: Gage Eads @ 2017-11-30  4:20 UTC (permalink / raw)
  To: dev
  Cc: jerin.jacob, harry.van.haaren, bruce.richardson, hemant.agrawal,
	nipun.gupta, santosh.shukla

This commit introduces a capability for disabling the "implicit" release
functionality for a port, which prevents the eventdev PMD from issuing
outstanding releases for previously dequeued events when dequeuing a new
batch of events.

If a PMD does not support this capability, the application will receive an
error if it attempts to set up a port with implicit releases disabled.
Otherwise, if the port is configured with implicit releases disabled, the
application must release each dequeued event by invoking
rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
RTE_EVENT_OP_FORWARD.

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 drivers/event/dpaa2/dpaa2_eventdev.c       |  2 ++
 drivers/event/octeontx/ssovf_evdev.c       |  1 +
 drivers/event/skeleton/skeleton_eventdev.c |  1 +
 drivers/event/sw/sw_evdev.c                | 10 +++++++---
 drivers/event/sw/sw_evdev.h                |  1 +
 drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
 examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
 lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
 test/test/test_eventdev.c                  |  9 +++++++++
 test/test/test_eventdev_sw.c               | 20 ++++++++++++++++++--
 11 files changed, 99 insertions(+), 17 deletions(-)

diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c
index eeeb231..236b211 100644
--- a/drivers/event/dpaa2/dpaa2_eventdev.c
+++ b/drivers/event/dpaa2/dpaa2_eventdev.c
@@ -440,6 +440,8 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
 	port_conf->enqueue_depth =
 		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
+	port_conf->disable_implicit_release =
+		0;
 }
 
 static void
diff --git a/drivers/event/octeontx/ssovf_evdev.c b/drivers/event/octeontx/ssovf_evdev.c
index 117b145..b80a6c0 100644
--- a/drivers/event/octeontx/ssovf_evdev.c
+++ b/drivers/event/octeontx/ssovf_evdev.c
@@ -252,6 +252,7 @@ ssovf_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = edev->max_num_events;
 	port_conf->dequeue_depth = 1;
 	port_conf->enqueue_depth = 1;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/skeleton/skeleton_eventdev.c b/drivers/event/skeleton/skeleton_eventdev.c
index bb554c3..8b62361 100644
--- a/drivers/event/skeleton/skeleton_eventdev.c
+++ b/drivers/event/skeleton/skeleton_eventdev.c
@@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 32 * 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = false;
 }
 
 static void
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index fd11079..64b02d1 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -181,6 +181,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	}
 
 	p->inflight_max = conf->new_event_threshold;
+	p->implicit_release = !conf->disable_implicit_release;
 
 	/* check if ring exists, same as rx_worker above */
 	snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
@@ -402,6 +403,7 @@ sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static int
@@ -450,9 +452,11 @@ sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
 			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
 			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
 			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
-			.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
-					RTE_EVENT_DEV_CAP_BURST_MODE |
-					RTE_EVENT_DEV_CAP_EVENT_QOS),
+			.event_dev_cap = (
+				RTE_EVENT_DEV_CAP_QUEUE_QOS |
+				RTE_EVENT_DEV_CAP_BURST_MODE |
+				RTE_EVENT_DEV_CAP_EVENT_QOS |
+				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE),
 	};
 
 	*info = evdev_sw_info;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index e0dec91..3b5f64f 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -201,6 +201,7 @@ struct sw_port {
 	uint16_t outstanding_releases __rte_cache_aligned;
 	uint16_t inflight_max; /* app requested max inflights for this port */
 	uint16_t inflight_credits; /* num credits this port has right now */
+	uint8_t implicit_release; /* release events before dequeueing */
 
 	uint16_t last_dequeue_burst_sz; /* how big the burst was */
 	uint64_t last_dequeue_ticks; /* used to track burst processing time */
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index b3b3b17..93cd29b 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -109,7 +109,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 			return 0;
 	}
 
-	uint32_t forwards = 0;
+	uint32_t completions = 0;
 	for (i = 0; i < num; i++) {
 		int op = ev[i].op;
 		int outstanding = p->outstanding_releases > 0;
@@ -118,7 +118,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
 		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
 					outstanding;
-		forwards += (op == RTE_EVENT_OP_FORWARD);
 
 		new_ops[i] = sw_qe_flag_map[op];
 		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
@@ -127,8 +126,10 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		 * correct usage of the API), providing very high correct
 		 * prediction rate.
 		 */
-		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
 			p->outstanding_releases--;
+			completions++;
+		}
 
 		/* error case: branch to avoid touching p->stats */
 		if (unlikely(invalid_qid)) {
@@ -137,8 +138,8 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		}
 	}
 
-	/* handle directed port forward credits */
-	p->inflight_credits -= forwards * p->is_directed;
+	/* handle directed port forward and release credits */
+	p->inflight_credits -= completions * p->is_directed;
 
 	/* returns number of events actually enqueued */
 	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
@@ -172,7 +173,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
-	if (!p->is_directed) {
+	if (p->implicit_release && !p->is_directed) {
 		uint16_t out_rels = p->outstanding_releases;
 		uint16_t i;
 		for (i = 0; i < out_rels; i++)
@@ -182,7 +183,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	/* returns number of events actually dequeued */
 	uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
 	if (unlikely(ndeq == 0)) {
-		p->outstanding_releases = 0;
 		p->zero_polls++;
 		p->total_polls++;
 		goto end;
@@ -190,7 +190,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 
 	/* only add credits for directed ports - LB ports send RELEASEs */
 	p->inflight_credits += ndeq * p->is_directed;
-	p->outstanding_releases = ndeq;
+	p->outstanding_releases += ndeq;
 	p->last_dequeue_burst_sz = ndeq;
 	p->last_dequeue_ticks = rte_get_timer_cycles();
 	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index 5f431d8..3910b53 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -62,6 +62,7 @@ struct prod_data {
 struct cons_data {
 	uint8_t dev_id;
 	uint8_t port_id;
+	bool release;
 } __rte_cache_aligned;
 
 static struct prod_data prod_data;
@@ -167,6 +168,18 @@ consumer(void)
 		uint8_t outport = packets[i].mbuf->port;
 		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
 				packets[i].mbuf);
+
+		packets[i].op = RTE_EVENT_OP_RELEASE;
+	}
+
+	if (cons_data.release) {
+		uint16_t nb_tx;
+
+		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
+		while (nb_tx < n)
+			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+							 packets + nb_tx,
+							 n - nb_tx);
 	}
 
 	/* Print out mpps every 1<22 packets */
@@ -702,6 +715,7 @@ setup_eventdev(struct prod_data *prod_data,
 	};
 
 	struct port_link worker_queues[MAX_NUM_STAGES];
+	bool disable_implicit_release;
 	struct port_link tx_queue;
 	unsigned int i;
 
@@ -715,6 +729,12 @@ setup_eventdev(struct prod_data *prod_data,
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 
+	disable_implicit_release = (dev_info.event_dev_cap &
+				    RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+	wkr_p_conf.disable_implicit_release = disable_implicit_release;
+	tx_p_conf.disable_implicit_release = disable_implicit_release;
+
 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
 		config.nb_event_port_dequeue_depth =
@@ -823,6 +843,7 @@ setup_eventdev(struct prod_data *prod_data,
 			.dequeue_depth = 8,
 			.enqueue_depth = 8,
 			.new_event_threshold = 1200,
+			.disable_implicit_release = disable_implicit_release,
 	};
 
 	if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
@@ -839,7 +860,8 @@ setup_eventdev(struct prod_data *prod_data,
 					.port_id = i + 1,
 					.qid = cdata.qid[0] };
 	*cons_data = (struct cons_data){.dev_id = dev_id,
-					.port_id = i };
+					.port_id = i,
+					.release = disable_implicit_release };
 
 	ret = rte_event_dev_service_id_get(dev_id,
 				&fdata->evdev_service_id);
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index ce6a5dc..eea8b7a 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -686,6 +686,15 @@ rte_event_port_setup(uint8_t dev_id, uint8_t port_id,
 		return -EINVAL;
 	}
 
+	if (port_conf && port_conf->disable_implicit_release &&
+	    !(dev->data->event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		RTE_EDEV_LOG_ERR(
+		   "dev%d port%d Implicit release disable not supported",
+			dev_id, port_id);
+		return -EINVAL;
+	}
+
 	if (dev->data->dev_started) {
 		RTE_EDEV_LOG_ERR(
 		    "device %d must be stopped to allow port setup", dev_id);
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index f1949ff..8377b3f 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -282,6 +282,16 @@ struct rte_mbuf; /* we just use mbuf pointers; no need to include rte_mbuf.h */
  *
  * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
  */
+#define RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE    (1ULL << 5)
+/**< Event device ports support disabling the implicit release feature, in
+ * which the port will release all unreleased events in its dequeue operation.
+ * If this capability is set and the port is configured with implicit release
+ * disabled, the application is responsible for explicitly releasing events
+ * using either the RTE_EVENT_OP_FORWARD or the RTE_EVENT_OP_RELEASE event
+ * enqueue operations.
+ *
+ * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
+ */
 
 /* Event device priority levels */
 #define RTE_EVENT_DEV_PRIORITY_HIGHEST   0
@@ -686,6 +696,13 @@ struct rte_event_port_conf {
 	 * which previously supplied to rte_event_dev_configure().
 	 * Ignored when device is not RTE_EVENT_DEV_CAP_BURST_MODE capable.
 	 */
+	uint8_t disable_implicit_release;
+	/**< Configure the port not to release outstanding events in
+	 * rte_event_dequeue_burst(). If true, all events received through
+	 * the port must be explicitly released with RTE_EVENT_OP_RELEASE or
+	 * RTE_EVENT_OP_FORWARD. Must be false when the device is not
+	 * RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capable.
+	 */
 };
 
 /**
@@ -1381,9 +1398,9 @@ rte_event_dequeue_timeout_ticks(uint8_t dev_id, uint64_t ns,
  *
  * The number of events dequeued is the number of scheduler contexts held by
  * this port. These contexts are automatically released in the next
- * rte_event_dequeue_burst() invocation, or invoking rte_event_enqueue_burst()
- * with RTE_EVENT_OP_RELEASE operation can be used to release the
- * contexts early.
+ * rte_event_dequeue_burst() invocation if the port supports implicit
+ * releases, or invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE
+ * operation can be used to release the contexts early.
  *
  * Event operations RTE_EVENT_OP_FORWARD and RTE_EVENT_OP_RELEASE must only be
  * enqueued to the same port that their associated events were dequeued from.
diff --git a/test/test/test_eventdev.c b/test/test/test_eventdev.c
index ba39cba..85fe631 100644
--- a/test/test/test_eventdev.c
+++ b/test/test/test_eventdev.c
@@ -581,6 +581,15 @@ test_eventdev_port_setup(void)
 	ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
 
+	if (!(info.event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		pconf.enqueue_depth = info.max_event_port_enqueue_depth;
+		pconf.disable_implicit_release = 1;
+		ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
+		TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
+		pconf.disable_implicit_release = 0;
+	}
+
 	ret = rte_event_port_setup(TEST_DEV_ID, info.max_event_ports,
 					&pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c
index f524b6f..0f99160 100644
--- a/test/test/test_eventdev_sw.c
+++ b/test/test/test_eventdev_sw.c
@@ -200,6 +200,7 @@ create_ports(struct test *t, int num_ports)
 			.new_event_threshold = 1024,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (num_ports > MAX_PORTS)
 		return -1;
@@ -1256,6 +1257,7 @@ port_reconfig_credits(struct test *t)
 				.new_event_threshold = 128,
 				.dequeue_depth = 32,
 				.enqueue_depth = 64,
+				.disable_implicit_release = 0,
 		};
 		if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 			printf("%d Error setting up port\n", __LINE__);
@@ -1345,6 +1347,7 @@ port_single_lb_reconfig(struct test *t)
 		.new_event_threshold = 128,
 		.dequeue_depth = 32,
 		.enqueue_depth = 64,
+		.disable_implicit_release = 0,
 	};
 	if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 		printf("%d Error setting up port\n", __LINE__);
@@ -2940,7 +2943,7 @@ worker_loopback_producer_fn(void *arg)
 }
 
 static int
-worker_loopback(struct test *t)
+worker_loopback(struct test *t, uint8_t disable_implicit_release)
 {
 	/* use a single producer core, and a worker core to see what happens
 	 * if the worker loops packets back multiple times
@@ -2966,6 +2969,7 @@ worker_loopback(struct test *t)
 	 * only be initialized once - and this needs to be set for multiple runs
 	 */
 	conf.new_event_threshold = 512;
+	conf.disable_implicit_release = disable_implicit_release;
 
 	if (rte_event_port_setup(evdev, 0, &conf) < 0) {
 		printf("Error setting up RX port\n");
@@ -3240,7 +3244,7 @@ test_sw_eventdev(void)
 	}
 	if (rte_lcore_count() >= 3) {
 		printf("*** Running Worker loopback test...\n");
-		ret = worker_loopback(t);
+		ret = worker_loopback(t, 0);
 		if (ret != 0) {
 			printf("ERROR - Worker loopback test FAILED.\n");
 			return ret;
@@ -3249,6 +3253,18 @@ test_sw_eventdev(void)
 		printf("### Not enough cores for worker loopback test.\n");
 		printf("### Need at least 3 cores for test.\n");
 	}
+	if (rte_lcore_count() >= 3) {
+		printf("*** Running Worker loopback test (implicit release disabled)...\n");
+		ret = worker_loopback(t, 1);
+		if (ret != 0) {
+			printf("ERROR - Worker loopback test FAILED.\n");
+			return ret;
+		}
+	} else {
+		printf("### Not enough cores for worker loopback test.\n");
+		printf("### Need at least 3 cores for test.\n");
+	}
+
 	/*
 	 * Free test instance, leaving mempool initialized, and a pointer to it
 	 * in static eventdev_func_mempool, as it is re-used on re-runs
-- 
2.7.4

* [dpdk-dev] [PATCH 2/2] event/sw: simplify credit scheme
  2017-11-30  4:20 [dpdk-dev] [PATCH 0/2] add implicit release disable capability Gage Eads
  2017-11-30  4:20 ` [dpdk-dev] [PATCH 1/2] eventdev: " Gage Eads
@ 2017-11-30  4:20 ` Gage Eads
  2017-12-11 13:42   ` Van Haaren, Harry
  1 sibling, 1 reply; 10+ messages in thread
From: Gage Eads @ 2017-11-30  4:20 UTC (permalink / raw)
  To: dev
  Cc: jerin.jacob, harry.van.haaren, bruce.richardson, hemant.agrawal,
	nipun.gupta, santosh.shukla

This commit modifies the sw PMD credit scheme such that credits are
consumed when enqueueing a NEW event and released when an event is
released -- typically, the beginning and end of a pipeline. Workers that
simply forward events do not interact with the credit pool.
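
A minimal single-threaded sketch of that accounting, assuming one shared pool
and a per-port credit cache. The toy_* names are hypothetical; the real driver
works on bursts and updates the shared count atomically, which this model
leaves out.

```c
#include <assert.h>
#include <stdint.h>

enum toy_op { TOY_OP_NEW, TOY_OP_FORWARD, TOY_OP_RELEASE };

/* Hypothetical device-wide credit pool. */
struct toy_pool {
	uint32_t inflights; /* credits currently handed out to ports */
	uint32_t limit;     /* device-wide cap on inflights */
	uint32_t quanta;    /* credits moved per pool transaction */
};

/* Hypothetical per-port credit cache. */
struct toy_credit_port {
	uint32_t credits;
};

/* Account for one enqueued event; returns -1 if the pool is exhausted. */
static int
toy_credit_enqueue(struct toy_pool *pool, struct toy_credit_port *p,
		   enum toy_op op)
{
	assert(pool->quanta > 0);

	if (op == TOY_OP_NEW) {
		/* NEW consumes a credit, fetching a quanta if the cache is dry */
		if (p->credits == 0) {
			if (pool->inflights + pool->quanta > pool->limit)
				return -1;
			pool->inflights += pool->quanta;
			p->credits += pool->quanta;
		}
		p->credits--;
	} else if (op == TOY_OP_RELEASE) {
		/* RELEASE hands a credit back to the port's cache */
		p->credits++;
	}
	/* FORWARD leaves the credit count untouched */

	/* Return a quanta to the pool once enough releases have piled up */
	if (p->credits >= 2 * pool->quanta) {
		assert(pool->inflights >= pool->quanta);
		pool->inflights -= pool->quanta;
		p->credits -= pool->quanta;
	}
	return 0;
}
```

In this model a producer port only ever draws credits, a forwarding worker
never changes its count, and a consumer port that releases events gradually
returns credits to the pool.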

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 drivers/event/sw/sw_evdev_worker.c | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index 93cd29b..766c836 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -85,6 +85,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 	struct sw_port *p = port;
 	struct sw_evdev *sw = (void *)p->sw;
 	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
+	uint32_t credit_update_quanta = sw->credit_update_quanta;
 	int new = 0;
 
 	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
@@ -98,7 +99,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 
 	if (p->inflight_credits < new) {
 		/* check if event enqueue brings port over max threshold */
-		uint32_t credit_update_quanta = sw->credit_update_quanta;
 		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
 			return 0;
 
@@ -109,7 +109,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 			return 0;
 	}
 
-	uint32_t completions = 0;
 	for (i = 0; i < num; i++) {
 		int op = ev[i].op;
 		int outstanding = p->outstanding_releases > 0;
@@ -126,21 +125,16 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		 * correct usage of the API), providing very high correct
 		 * prediction rate.
 		 */
-		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
 			p->outstanding_releases--;
-			completions++;
-		}
 
 		/* error case: branch to avoid touching p->stats */
-		if (unlikely(invalid_qid)) {
+		if (unlikely(invalid_qid && op != RTE_EVENT_OP_RELEASE)) {
 			p->stats.rx_dropped++;
 			p->inflight_credits++;
 		}
 	}
 
-	/* handle directed port forward and release credits */
-	p->inflight_credits -= completions * p->is_directed;
-
 	/* returns number of events actually enqueued */
 	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
 					     new_ops);
@@ -153,6 +147,13 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
 		p->last_dequeue_ticks = 0;
 	}
+
+	/* Replenish credits if enough releases are performed */
+	if (p->inflight_credits >= credit_update_quanta * 2) {
+		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
+		p->inflight_credits -= credit_update_quanta;
+	}
+
 	return enq;
 }
 
@@ -168,16 +169,22 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 {
 	RTE_SET_USED(wait);
 	struct sw_port *p = (void *)port;
-	struct sw_evdev *sw = (void *)p->sw;
 	struct rte_event_ring *ring = p->cq_worker_ring;
-	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
-	if (p->implicit_release && !p->is_directed) {
+	if (p->implicit_release) {
+		struct sw_evdev *sw = (void *)p->sw;
+		uint32_t credit_update_quanta = sw->credit_update_quanta;
 		uint16_t out_rels = p->outstanding_releases;
 		uint16_t i;
 		for (i = 0; i < out_rels; i++)
 			sw_event_release(p, i);
+
+		/* Replenish credits if enough releases are performed */
+		if (p->inflight_credits >= credit_update_quanta * 2) {
+			rte_atomic32_sub(&sw->inflights, credit_update_quanta);
+			p->inflight_credits -= credit_update_quanta;
+		}
 	}
 
 	/* returns number of events actually dequeued */
@@ -188,8 +195,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 		goto end;
 	}
 
-	/* only add credits for directed ports - LB ports send RELEASEs */
-	p->inflight_credits += ndeq * p->is_directed;
 	p->outstanding_releases += ndeq;
 	p->last_dequeue_burst_sz = ndeq;
 	p->last_dequeue_ticks = rte_get_timer_cycles();
@@ -197,11 +202,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	p->total_polls++;
 
 end:
-	if (p->inflight_credits >= credit_update_quanta * 2 &&
-			p->inflight_credits > credit_update_quanta + ndeq) {
-		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
-		p->inflight_credits -= credit_update_quanta;
-	}
 	return ndeq;
 }
 
-- 
2.7.4

* Re: [dpdk-dev] [PATCH 1/2] eventdev: add implicit release disable capability
  2017-11-30  4:20 ` [dpdk-dev] [PATCH 1/2] eventdev: " Gage Eads
@ 2017-12-11 12:36   ` Van Haaren, Harry
  2017-12-11 17:39     ` Eads, Gage
  2017-12-11 17:56   ` [dpdk-dev] [PATCH v2 " Gage Eads
  1 sibling, 1 reply; 10+ messages in thread
From: Van Haaren, Harry @ 2017-12-11 12:36 UTC (permalink / raw)
  To: Eads, Gage, dev
  Cc: jerin.jacob, Richardson, Bruce, hemant.agrawal, nipun.gupta,
	santosh.shukla

> -----Original Message-----
> From: Eads, Gage
> Sent: Thursday, November 30, 2017 4:21 AM
> To: dev@dpdk.org
> Cc: jerin.jacob@caviumnetworks.com; Van Haaren, Harry
> <harry.van.haaren@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com;
> santosh.shukla@caviumnetworks.com
> Subject: [PATCH 1/2] eventdev: add implicit release disable capability
> 
> This commit introduces a capability for disabling the "implicit" release
> functionality for a port, which prevents the eventdev PMD from issuing
> outstanding releases for previously dequeued events when dequeuing a new
> batch of events.
> 
> If a PMD does not support this capability, the application will receive an
> error if it attempts to setup a port with implicit releases disabled.
> Otherwise, if the port is configured with implicit releases disabled, the
> application must release each dequeued event by invoking
> rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
> RTE_EVENT_OP_FORWARD.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>

Some comments inline. In general, I think this makes sense, and is the easiest solution that I can see.


>  drivers/event/dpaa2/dpaa2_eventdev.c       |  2 ++
>  drivers/event/octeontx/ssovf_evdev.c       |  1 +
>  drivers/event/skeleton/skeleton_eventdev.c |  1 +
>  drivers/event/sw/sw_evdev.c                | 10 +++++++---
>  drivers/event/sw/sw_evdev.h                |  1 +
>  drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
>  examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
>  lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
>  lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
>  test/test/test_eventdev.c                  |  9 +++++++++
>  test/test/test_eventdev_sw.c               | 20 ++++++++++++++++++--
>  11 files changed, 99 insertions(+), 17 deletions(-)
> 
> diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c
> index eeeb231..236b211 100644
> --- a/drivers/event/dpaa2/dpaa2_eventdev.c
> +++ b/drivers/event/dpaa2/dpaa2_eventdev.c
> @@ -440,6 +440,8 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
>  		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
>  	port_conf->enqueue_depth =
>  		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
> +	port_conf->disable_implicit_release =
> +		0;

Merge "0;" onto previous line?

<snip>

> --- a/drivers/event/skeleton/skeleton_eventdev.c
> +++ b/drivers/event/skeleton/skeleton_eventdev.c
> @@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
>  	port_conf->new_event_threshold = 32 * 1024;
>  	port_conf->dequeue_depth = 16;
>  	port_conf->enqueue_depth = 16;
> +	port_conf->disable_implicit_release = false;

Prefer 0 to false.

<snip>

> diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
> index 5f431d8..3910b53 100644
> --- a/examples/eventdev_pipeline_sw_pmd/main.c
> +++ b/examples/eventdev_pipeline_sw_pmd/main.c
> @@ -62,6 +62,7 @@ struct prod_data {
>  struct cons_data {
>  	uint8_t dev_id;
>  	uint8_t port_id;
> +	bool release;

Personally, I'd make this a uint8_t instead of bool, which requires <stdbool.h>. I haven't seen stdbool.h in other DPDK headers, so I suggest sticking with the good old byte-sized integers for flags.


>  } __rte_cache_aligned;
> 
>  static struct prod_data prod_data;
> @@ -167,6 +168,18 @@ consumer(void)
>  		uint8_t outport = packets[i].mbuf->port;
>  		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
>  				packets[i].mbuf);
> +
> +		packets[i].op = RTE_EVENT_OP_RELEASE;
> +	}
> +
> +	if (cons_data.release) {
> +		uint16_t nb_tx;
> +
> +		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
> +		while (nb_tx < n)
> +			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> +							 packets + nb_tx,
> +							 n - nb_tx);
>  	}
> 
>  	/* Print out mpps every 1<22 packets */
> @@ -702,6 +715,7 @@ setup_eventdev(struct prod_data *prod_data,
>  	};
> 
>  	struct port_link worker_queues[MAX_NUM_STAGES];
> +	bool disable_implicit_release;

Same uint8_t over stdbool.h comment as above


> @@ -3240,7 +3244,7 @@ test_sw_eventdev(void)
>  	if (rte_lcore_count() >= 3) {
>  		printf("*** Running Worker loopback test...\n");
> -		ret = worker_loopback(t);
> +		ret = worker_loopback(t, 0);
>  		if (ret != 0) {
>  			printf("ERROR - Worker loopback test FAILED.\n");
>  			return ret;
> @@ -3249,6 +3253,18 @@ test_sw_eventdev(void)
>  		printf("### Not enough cores for worker loopback test.\n");
>  		printf("### Need at least 3 cores for test.\n");
>  	}
> +	if (rte_lcore_count() >= 3) {
> +		printf("*** Running Worker loopback test (implicit release disabled)...\n");
> +		ret = worker_loopback(t, 1);
> +		if (ret != 0) {
> +			printf("ERROR - Worker loopback test FAILED.\n");
> +			return ret;
> +		}
> +	} else {
> +		printf("### Not enough cores for worker loopback test.\n");
> +		printf("### Need at least 3 cores for test.\n");
> +	}

The double if (count >= 3) here looks like it could be removed, with the worker_loopback(t, 1) call added to the first if(), but otherwise the logic is fine.
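
A sketch of the merged form, assuming a simplified test hook that takes only
the disable_implicit_release flag. The toy_* names and the stub are
hypothetical stand-ins so the snippet compiles on its own; they are not code
from the patch.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical signature: the real worker_loopback() also takes the test
 * context.
 */
typedef int (*toy_loopback_fn)(uint8_t disable_implicit_release);

/* Records which variants ran: bit 0 = enabled, bit 1 = disabled. */
static int toy_calls;

/* Stub standing in for the real test body; always reports success. */
static int
toy_loopback_stub(uint8_t disable_implicit_release)
{
	toy_calls |= 1 << disable_implicit_release;
	return 0;
}

/* Run both loopback variants under a single core-count check. */
static int
toy_run_loopback_tests(unsigned int lcore_count, toy_loopback_fn loopback)
{
	int ret;

	assert(loopback != NULL);
	if (lcore_count >= 3) {
		printf("*** Running Worker loopback test...\n");
		ret = loopback(0);
		if (ret != 0) {
			printf("ERROR - Worker loopback test FAILED.\n");
			return ret;
		}

		printf("*** Running Worker loopback test (implicit release disabled)...\n");
		ret = loopback(1);
		if (ret != 0) {
			printf("ERROR - Worker loopback test FAILED.\n");
			return ret;
		}
	} else {
		printf("### Not enough cores for worker loopback test.\n");
		printf("### Need at least 3 cores for test.\n");
	}
	return 0;
}
```

This keeps one "not enough cores" message and runs both variants back to back
when three or more cores are available.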


With the above changes, this looks good to me!

Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

* Re: [dpdk-dev] [PATCH 2/2] event/sw: simplify credit scheme
  2017-11-30  4:20 ` [dpdk-dev] [PATCH 2/2] event/sw: simplify credit scheme Gage Eads
@ 2017-12-11 13:42   ` Van Haaren, Harry
  2017-12-16  8:54     ` Jerin Jacob
  0 siblings, 1 reply; 10+ messages in thread
From: Van Haaren, Harry @ 2017-12-11 13:42 UTC (permalink / raw)
  To: Eads, Gage, dev
  Cc: jerin.jacob, Richardson, Bruce, hemant.agrawal, nipun.gupta,
	santosh.shukla

> -----Original Message-----
> From: Eads, Gage
> Sent: Thursday, November 30, 2017 4:21 AM
> To: dev@dpdk.org
> Cc: jerin.jacob@caviumnetworks.com; Van Haaren, Harry
> <harry.van.haaren@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com;
> santosh.shukla@caviumnetworks.com
> Subject: [PATCH 2/2] event/sw: simplify credit scheme
> 
> This commit modifies the sw PMD credit scheme such that credits are
> consumed when enqueueing a NEW event and released when an event is
> released -- typically, the beginning and end of a pipeline. Workers that
> simply forward events do not interact with the credit pool.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>

Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

* Re: [dpdk-dev] [PATCH 1/2] eventdev: add implicit release disable capability
  2017-12-11 12:36   ` Van Haaren, Harry
@ 2017-12-11 17:39     ` Eads, Gage
  0 siblings, 0 replies; 10+ messages in thread
From: Eads, Gage @ 2017-12-11 17:39 UTC (permalink / raw)
  To: Van Haaren, Harry, dev
  Cc: jerin.jacob, Richardson, Bruce, hemant.agrawal, nipun.gupta,
	santosh.shukla

Sure, will make all suggested changes in v2.

> -----Original Message-----
> From: Van Haaren, Harry
> Sent: Monday, December 11, 2017 6:36 AM
> To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> Cc: jerin.jacob@caviumnetworks.com; Richardson, Bruce
> <bruce.richardson@intel.com>; hemant.agrawal@nxp.com;
> nipun.gupta@nxp.com; santosh.shukla@caviumnetworks.com
> Subject: RE: [PATCH 1/2] eventdev: add implicit release disable capability
> 
> > -----Original Message-----
> > From: Eads, Gage
> > Sent: Thursday, November 30, 2017 4:21 AM
> > To: dev@dpdk.org
> > Cc: jerin.jacob@caviumnetworks.com; Van Haaren, Harry
> > <harry.van.haaren@intel.com>; Richardson, Bruce
> > <bruce.richardson@intel.com>; hemant.agrawal@nxp.com;
> > nipun.gupta@nxp.com; santosh.shukla@caviumnetworks.com
> > Subject: [PATCH 1/2] eventdev: add implicit release disable capability
> >
> > This commit introduces a capability for disabling the "implicit"
> > release functionality for a port, which prevents the eventdev PMD from
> > issuing outstanding releases for previously dequeued events when
> > dequeuing a new batch of events.
> >
> > If a PMD does not support this capability, the application will
> > receive an error if it attempts to setup a port with implicit releases disabled.
> > Otherwise, if the port is configured with implicit releases disabled,
> > the application must release each dequeued event by invoking
> > rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
> > RTE_EVENT_OP_FORWARD.
> >
> > Signed-off-by: Gage Eads <gage.eads@intel.com>
> 
> Some comments inline. In general, I think this makes sense, and is the easiest
> solution that I can see.
> 
> 
> >  drivers/event/dpaa2/dpaa2_eventdev.c       |  2 ++
> >  drivers/event/octeontx/ssovf_evdev.c       |  1 +
> >  drivers/event/skeleton/skeleton_eventdev.c |  1 +
> >  drivers/event/sw/sw_evdev.c                | 10 +++++++---
> >  drivers/event/sw/sw_evdev.h                |  1 +
> >  drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
> >  examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
> >  lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
> >  lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
> >  test/test/test_eventdev.c                  |  9 +++++++++
> >  test/test/test_eventdev_sw.c               | 20 ++++++++++++++++++--
> >  11 files changed, 99 insertions(+), 17 deletions(-)
> >
> > diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c
> > index eeeb231..236b211 100644
> > --- a/drivers/event/dpaa2/dpaa2_eventdev.c
> > +++ b/drivers/event/dpaa2/dpaa2_eventdev.c
> > @@ -440,6 +440,8 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
> >  		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
> >  	port_conf->enqueue_depth =
> >  		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
> > +	port_conf->disable_implicit_release =
> > +		0;
> 
> Merge "0;" onto previous line?
> 
> <snip>
> 
> > --- a/drivers/event/skeleton/skeleton_eventdev.c
> > +++ b/drivers/event/skeleton/skeleton_eventdev.c
> > @@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
> >  	port_conf->new_event_threshold = 32 * 1024;
> >  	port_conf->dequeue_depth = 16;
> >  	port_conf->enqueue_depth = 16;
> > +	port_conf->disable_implicit_release = false;
> 
> Prefer 0 to false.
> 
> <snip>
> 
> > diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
> > index 5f431d8..3910b53 100644
> > --- a/examples/eventdev_pipeline_sw_pmd/main.c
> > +++ b/examples/eventdev_pipeline_sw_pmd/main.c
> > @@ -62,6 +62,7 @@ struct prod_data {
> >  struct cons_data {
> >  	uint8_t dev_id;
> >  	uint8_t port_id;
> > +	bool release;
> 
> I'd personally use uint8_t here instead of bool, which requires <stdbool.h>. I
> haven't seen stdbool.h in other DPDK headers, so I suggest sticking with good
> old byte-sized integers for flags.
> 
> 
> >  } __rte_cache_aligned;
> >
> >  static struct prod_data prod_data;
> > @@ -167,6 +168,18 @@ consumer(void)
> >  		uint8_t outport = packets[i].mbuf->port;
> >  		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
> >  				packets[i].mbuf);
> > +
> > +		packets[i].op = RTE_EVENT_OP_RELEASE;
> > +	}
> > +
> > +	if (cons_data.release) {
> > +		uint16_t nb_tx;
> > +
> > +		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
> > +		while (nb_tx < n)
> > +			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> > +							 packets + nb_tx,
> > +							 n - nb_tx);
> >  	}
> >
> >  	/* Print out mpps every 1<22 packets */
> > @@ -702,6 +715,7 @@ setup_eventdev(struct prod_data *prod_data,
> >  	};
> >
> >  	struct port_link worker_queues[MAX_NUM_STAGES];
> > +	bool disable_implicit_release;
> 
> Same uint8_t over stdbool.h comment as above
> 
> 
> > @@ -3240,7 +3244,7 @@ test_sw_eventdev(void)
> >  	if (rte_lcore_count() >= 3) {
> >  		printf("*** Running Worker loopback test...\n");
> > -		ret = worker_loopback(t);
> > +		ret = worker_loopback(t, 0);
> >  		if (ret != 0) {
> >  			printf("ERROR - Worker loopback test FAILED.\n");
> >  			return ret;
> > @@ -3249,6 +3253,18 @@ test_sw_eventdev(void)
> >  		printf("### Not enough cores for worker loopback test.\n");
> >  		printf("### Need at least 3 cores for test.\n");
> >  	}
> > +	if (rte_lcore_count() >= 3) {
> > +		printf("*** Running Worker loopback test (implicit release disabled)...\n");
> > +		ret = worker_loopback(t, 1);
> > +		if (ret != 0) {
> > +			printf("ERROR - Worker loopback test FAILED.\n");
> > +			return ret;
> > +		}
> > +	} else {
> > +		printf("### Not enough cores for worker loopback test.\n");
> > +		printf("### Need at least 3 cores for test.\n");
> > +	}
> 
> The double if (count >= 3) here looks like it could be removed and the
> loopback(t, 1) added to the first if()- but otherwise the logic is fine.
> 
> 
> With the above changes, this looks good to me!
> 
> Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

* [dpdk-dev] [PATCH v2 1/2] eventdev: add implicit release disable capability
  2017-11-30  4:20 ` [dpdk-dev] [PATCH 1/2] eventdev: " Gage Eads
  2017-12-11 12:36   ` Van Haaren, Harry
@ 2017-12-11 17:56   ` Gage Eads
  2017-12-11 17:56     ` [dpdk-dev] [PATCH v2 2/2] event/sw: simplify credit scheme Gage Eads
  2017-12-16  8:50     ` [dpdk-dev] [PATCH v2 1/2] eventdev: add implicit release disable capability Jerin Jacob
  1 sibling, 2 replies; 10+ messages in thread
From: Gage Eads @ 2017-12-11 17:56 UTC (permalink / raw)
  To: dev
  Cc: jerin.jacob, harry.van.haaren, bruce.richardson, hemant.agrawal,
	nipun.gupta, santosh.shukla

This commit introduces a capability for disabling the "implicit" release
functionality for a port, which prevents the eventdev PMD from issuing
outstanding releases for previously dequeued events when dequeuing a new
batch of events.

If a PMD does not support this capability, the application will receive an
error if it attempts to set up a port with implicit releases disabled.
Otherwise, if the port is configured with implicit releases disabled, the
application must release each dequeued event by invoking
rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
RTE_EVENT_OP_FORWARD.

Signed-off-by: Gage Eads <gage.eads@intel.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
---
v2:
 - Merged two lines into one in dpaa2_eventdev_port_def_conf().
 - Used '0' instead of 'false' in skeleton_eventdev_port_def_conf().
 - Replaced instances of 'bool' with 'uint8_t' in
   eventdev_pipeline_sw_pmd/main.c.
 - Put both worker_loopback() invocations in the same if block in
   test_sw_eventdev().
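
To illustrate the port behaviour described in the commit message, here is a
minimal, self-contained sketch in plain C. It is a toy model, not DPDK code:
the rel_* names, the struct layout and the capability constant are
hypothetical stand-ins, and the model only tracks the outstanding-release
count in roughly the way the sw PMD changes below do.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>

/* Hypothetical stand-in for RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE. */
#define REL_CAP_IMPLICIT_RELEASE_DISABLE (1ULL << 5)

/* Hypothetical stand-ins for the RTE_EVENT_OP_* operations. */
enum rel_op { REL_OP_NEW, REL_OP_FORWARD, REL_OP_RELEASE };

struct rel_port {
	uint8_t implicit_release;      /* release leftovers on dequeue */
	uint16_t outstanding_releases; /* dequeued but not yet released */
};

/* Port setup: reject the option if the device lacks the capability. */
static int
rel_port_setup(struct rel_port *p, uint64_t dev_cap,
	       uint8_t disable_implicit_release)
{
	if (disable_implicit_release &&
	    !(dev_cap & REL_CAP_IMPLICIT_RELEASE_DISABLE))
		return -EINVAL;

	p->implicit_release = !disable_implicit_release;
	p->outstanding_releases = 0;
	return 0;
}

/* Dequeue n events; with implicit release, leftovers are released first. */
static uint16_t
rel_dequeue(struct rel_port *p, uint16_t n)
{
	if (p->implicit_release)
		p->outstanding_releases = 0;

	p->outstanding_releases += n;
	return n;
}

/* FORWARD and RELEASE each complete one previously dequeued event. */
static void
rel_enqueue(struct rel_port *p, enum rel_op op)
{
	if (op != REL_OP_NEW && p->outstanding_releases > 0)
		p->outstanding_releases--;
}

/* Walk through the explicit-release mode; returns the events still held. */
static uint16_t
rel_demo(void)
{
	struct rel_port p;
	int ret;

	/* Without the capability, disabling implicit release is refused. */
	ret = rel_port_setup(&p, 0, 1);
	assert(ret == -EINVAL);

	/* With it, events stay held across dequeues until released. */
	ret = rel_port_setup(&p, REL_CAP_IMPLICIT_RELEASE_DISABLE, 1);
	assert(ret == 0);
	(void)ret;

	rel_dequeue(&p, 4);
	rel_dequeue(&p, 4);
	assert(p.outstanding_releases == 8);

	rel_enqueue(&p, REL_OP_FORWARD);
	rel_enqueue(&p, REL_OP_RELEASE);
	return p.outstanding_releases;
}
```

In this model a port set up with implicit release left enabled would hold
only the most recent burst after two dequeues, whereas the explicit-release
port above keeps all eight events until the application completes them one
by one.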

 drivers/event/dpaa2/dpaa2_eventdev.c       |  1 +
 drivers/event/octeontx/ssovf_evdev.c       |  1 +
 drivers/event/skeleton/skeleton_eventdev.c |  1 +
 drivers/event/sw/sw_evdev.c                | 10 +++++++---
 drivers/event/sw/sw_evdev.h                |  1 +
 drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
 examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
 lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
 test/test/test_eventdev.c                  |  9 +++++++++
 test/test/test_eventdev_sw.c               | 20 ++++++++++++++++----
 11 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c
index eeeb231..13e7122 100644
--- a/drivers/event/dpaa2/dpaa2_eventdev.c
+++ b/drivers/event/dpaa2/dpaa2_eventdev.c
@@ -440,6 +440,7 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
 	port_conf->enqueue_depth =
 		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/octeontx/ssovf_evdev.c b/drivers/event/octeontx/ssovf_evdev.c
index 117b145..b80a6c0 100644
--- a/drivers/event/octeontx/ssovf_evdev.c
+++ b/drivers/event/octeontx/ssovf_evdev.c
@@ -252,6 +252,7 @@ ssovf_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = edev->max_num_events;
 	port_conf->dequeue_depth = 1;
 	port_conf->enqueue_depth = 1;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/skeleton/skeleton_eventdev.c b/drivers/event/skeleton/skeleton_eventdev.c
index bb554c3..3b448b2 100644
--- a/drivers/event/skeleton/skeleton_eventdev.c
+++ b/drivers/event/skeleton/skeleton_eventdev.c
@@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 32 * 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 5b4a208..1ef6340 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -191,6 +191,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	}
 
 	p->inflight_max = conf->new_event_threshold;
+	p->implicit_release = !conf->disable_implicit_release;
 
 	/* check if ring exists, same as rx_worker above */
 	snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
@@ -413,6 +414,7 @@ sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static int
@@ -482,9 +484,11 @@ sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
 			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
 			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
 			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
-			.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
-					RTE_EVENT_DEV_CAP_BURST_MODE |
-					RTE_EVENT_DEV_CAP_EVENT_QOS),
+			.event_dev_cap = (
+				RTE_EVENT_DEV_CAP_QUEUE_QOS |
+				RTE_EVENT_DEV_CAP_BURST_MODE |
+				RTE_EVENT_DEV_CAP_EVENT_QOS |
+				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE),
 	};
 
 	*info = evdev_sw_info;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 0859388..d08f7d0 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -213,6 +213,7 @@ struct sw_port {
 	uint16_t outstanding_releases __rte_cache_aligned;
 	uint16_t inflight_max; /* app requested max inflights for this port */
 	uint16_t inflight_credits; /* num credits this port has right now */
+	uint8_t implicit_release; /* release events before dequeueing */
 
 	uint16_t last_dequeue_burst_sz; /* how big the burst was */
 	uint64_t last_dequeue_ticks; /* used to track burst processing time */
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index b3b3b17..93cd29b 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -109,7 +109,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 			return 0;
 	}
 
-	uint32_t forwards = 0;
+	uint32_t completions = 0;
 	for (i = 0; i < num; i++) {
 		int op = ev[i].op;
 		int outstanding = p->outstanding_releases > 0;
@@ -118,7 +118,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
 		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
 					outstanding;
-		forwards += (op == RTE_EVENT_OP_FORWARD);
 
 		new_ops[i] = sw_qe_flag_map[op];
 		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
@@ -127,8 +126,10 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		 * correct usage of the API), providing very high correct
 		 * prediction rate.
 		 */
-		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
 			p->outstanding_releases--;
+			completions++;
+		}
 
 		/* error case: branch to avoid touching p->stats */
 		if (unlikely(invalid_qid)) {
@@ -137,8 +138,8 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		}
 	}
 
-	/* handle directed port forward credits */
-	p->inflight_credits -= forwards * p->is_directed;
+	/* handle directed port forward and release credits */
+	p->inflight_credits -= completions * p->is_directed;
 
 	/* returns number of events actually enqueued */
 	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
@@ -172,7 +173,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
-	if (!p->is_directed) {
+	if (p->implicit_release && !p->is_directed) {
 		uint16_t out_rels = p->outstanding_releases;
 		uint16_t i;
 		for (i = 0; i < out_rels; i++)
@@ -182,7 +183,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	/* returns number of events actually dequeued */
 	uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
 	if (unlikely(ndeq == 0)) {
-		p->outstanding_releases = 0;
 		p->zero_polls++;
 		p->total_polls++;
 		goto end;
@@ -190,7 +190,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 
 	/* only add credits for directed ports - LB ports send RELEASEs */
 	p->inflight_credits += ndeq * p->is_directed;
-	p->outstanding_releases = ndeq;
+	p->outstanding_releases += ndeq;
 	p->last_dequeue_burst_sz = ndeq;
 	p->last_dequeue_ticks = rte_get_timer_cycles();
 	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index 5f431d8..2e9a6d2 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -62,6 +62,7 @@ struct prod_data {
 struct cons_data {
 	uint8_t dev_id;
 	uint8_t port_id;
+	uint8_t release;
 } __rte_cache_aligned;
 
 static struct prod_data prod_data;
@@ -167,6 +168,18 @@ consumer(void)
 		uint8_t outport = packets[i].mbuf->port;
 		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
 				packets[i].mbuf);
+
+		packets[i].op = RTE_EVENT_OP_RELEASE;
+	}
+
+	if (cons_data.release) {
+		uint16_t nb_tx;
+
+		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
+		while (nb_tx < n)
+			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+							 packets + nb_tx,
+							 n - nb_tx);
 	}
 
 	/* Print out mpps every 1<22 packets */
@@ -702,6 +715,7 @@ setup_eventdev(struct prod_data *prod_data,
 	};
 
 	struct port_link worker_queues[MAX_NUM_STAGES];
+	uint8_t disable_implicit_release;
 	struct port_link tx_queue;
 	unsigned int i;
 
@@ -715,6 +729,12 @@ setup_eventdev(struct prod_data *prod_data,
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 
+	disable_implicit_release = (dev_info.event_dev_cap &
+				    RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+	wkr_p_conf.disable_implicit_release = disable_implicit_release;
+	tx_p_conf.disable_implicit_release = disable_implicit_release;
+
 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
 		config.nb_event_port_dequeue_depth =
@@ -823,6 +843,7 @@ setup_eventdev(struct prod_data *prod_data,
 			.dequeue_depth = 8,
 			.enqueue_depth = 8,
 			.new_event_threshold = 1200,
+			.disable_implicit_release = disable_implicit_release,
 	};
 
 	if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
@@ -839,7 +860,8 @@ setup_eventdev(struct prod_data *prod_data,
 					.port_id = i + 1,
 					.qid = cdata.qid[0] };
 	*cons_data = (struct cons_data){.dev_id = dev_id,
-					.port_id = i };
+					.port_id = i,
+					.release = disable_implicit_release };
 
 	ret = rte_event_dev_service_id_get(dev_id,
 				&fdata->evdev_service_id);
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index e17f8fc..3e93c03 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -686,6 +686,15 @@ rte_event_port_setup(uint8_t dev_id, uint8_t port_id,
 		return -EINVAL;
 	}
 
+	if (port_conf && port_conf->disable_implicit_release &&
+	    !(dev->data->event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		RTE_EDEV_LOG_ERR(
+		   "dev%d port%d Implicit release disable not supported",
+			dev_id, port_id);
+		return -EINVAL;
+	}
+
 	if (dev->data->dev_started) {
 		RTE_EDEV_LOG_ERR(
 		    "device %d must be stopped to allow port setup", dev_id);
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index f1949ff..8377b3f 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -282,6 +282,16 @@ struct rte_mbuf; /* we just use mbuf pointers; no need to include rte_mbuf.h */
  *
  * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
  */
+#define RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE    (1ULL << 5)
+/**< Event device ports support disabling the implicit release feature, in
+ * which the port will release all unreleased events in its dequeue operation.
+ * If this capability is set and the port is configured with implicit release
+ * disabled, the application is responsible for explicitly releasing events
+ * using either the RTE_EVENT_OP_FORWARD or the RTE_EVENT_OP_RELEASE event
+ * enqueue operations.
+ *
+ * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
+ */
 
 /* Event device priority levels */
 #define RTE_EVENT_DEV_PRIORITY_HIGHEST   0
@@ -686,6 +696,13 @@ struct rte_event_port_conf {
 	 * which previously supplied to rte_event_dev_configure().
 	 * Ignored when device is not RTE_EVENT_DEV_CAP_BURST_MODE capable.
 	 */
+	uint8_t disable_implicit_release;
+	/**< Configure the port not to release outstanding events in
+	 * rte_event_dequeue_burst(). If true, all events received through
+	 * the port must be explicitly released with RTE_EVENT_OP_RELEASE or
+	 * RTE_EVENT_OP_FORWARD. Must be false when the device is not
+	 * RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capable.
+	 */
 };
 
 /**
@@ -1381,9 +1398,9 @@ rte_event_dequeue_timeout_ticks(uint8_t dev_id, uint64_t ns,
  *
  * The number of events dequeued is the number of scheduler contexts held by
  * this port. These contexts are automatically released in the next
- * rte_event_dequeue_burst() invocation, or invoking rte_event_enqueue_burst()
- * with RTE_EVENT_OP_RELEASE operation can be used to release the
- * contexts early.
+ * rte_event_dequeue_burst() invocation if the port supports implicit
+ * releases, or invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE
+ * operation can be used to release the contexts early.
  *
  * Event operations RTE_EVENT_OP_FORWARD and RTE_EVENT_OP_RELEASE must only be
  * enqueued to the same port that their associated events were dequeued from.
diff --git a/test/test/test_eventdev.c b/test/test/test_eventdev.c
index 1ed2a1d..f8ee1be 100644
--- a/test/test/test_eventdev.c
+++ b/test/test/test_eventdev.c
@@ -581,6 +581,15 @@ test_eventdev_port_setup(void)
 	ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
 
+	if (!(info.event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		pconf.enqueue_depth = info.max_event_port_enqueue_depth;
+		pconf.disable_implicit_release = 1;
+		ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
+		TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
+		pconf.disable_implicit_release = 0;
+	}
+
 	ret = rte_event_port_setup(TEST_DEV_ID, info.max_event_ports,
 					&pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c
index 96ed920..4108b00 100644
--- a/test/test/test_eventdev_sw.c
+++ b/test/test/test_eventdev_sw.c
@@ -200,6 +200,7 @@ create_ports(struct test *t, int num_ports)
 			.new_event_threshold = 1024,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (num_ports > MAX_PORTS)
 		return -1;
@@ -1254,6 +1255,7 @@ port_reconfig_credits(struct test *t)
 				.new_event_threshold = 128,
 				.dequeue_depth = 32,
 				.enqueue_depth = 64,
+				.disable_implicit_release = 0,
 		};
 		if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 			printf("%d Error setting up port\n", __LINE__);
@@ -1343,6 +1345,7 @@ port_single_lb_reconfig(struct test *t)
 		.new_event_threshold = 128,
 		.dequeue_depth = 32,
 		.enqueue_depth = 64,
+		.disable_implicit_release = 0,
 	};
 	if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 		printf("%d Error setting up port\n", __LINE__);
@@ -2935,7 +2938,7 @@ worker_loopback_producer_fn(void *arg)
 }
 
 static int
-worker_loopback(struct test *t)
+worker_loopback(struct test *t, uint8_t disable_implicit_release)
 {
 	/* use a single producer core, and a worker core to see what happens
 	 * if the worker loops packets back multiple times
@@ -2961,6 +2964,7 @@ worker_loopback(struct test *t)
 	 * only be initialized once - and this needs to be set for multiple runs
 	 */
 	conf.new_event_threshold = 512;
+	conf.disable_implicit_release = disable_implicit_release;
 
 	if (rte_event_port_setup(evdev, 0, &conf) < 0) {
 		printf("Error setting up RX port\n");
@@ -3235,15 +3239,23 @@ test_sw_eventdev(void)
 	}
 	if (rte_lcore_count() >= 3) {
 		printf("*** Running Worker loopback test...\n");
-		ret = worker_loopback(t);
+		ret = worker_loopback(t, 0);
+		if (ret != 0) {
+			printf("ERROR - Worker loopback test FAILED.\n");
+			return ret;
+		}
+
+		printf("*** Running Worker loopback test (implicit release disabled)...\n");
+		ret = worker_loopback(t, 1);
 		if (ret != 0) {
 			printf("ERROR - Worker loopback test FAILED.\n");
 			return ret;
 		}
 	} else {
-		printf("### Not enough cores for worker loopback test.\n");
-		printf("### Need at least 3 cores for test.\n");
+		printf("### Not enough cores for worker loopback tests.\n");
+		printf("### Need at least 3 cores for the tests.\n");
 	}
+
 	/*
 	 * Free test instance, leaving mempool initialized, and a pointer to it
 	 * in static eventdev_func_mempool, as it is re-used on re-runs
-- 
2.7.4

* [dpdk-dev] [PATCH v2 2/2] event/sw: simplify credit scheme
  2017-12-11 17:56   ` [dpdk-dev] [PATCH v2 " Gage Eads
@ 2017-12-11 17:56     ` Gage Eads
  2017-12-16  8:50     ` [dpdk-dev] [PATCH v2 1/2] eventdev: add implicit release disable capability Jerin Jacob
  1 sibling, 0 replies; 10+ messages in thread
From: Gage Eads @ 2017-12-11 17:56 UTC (permalink / raw)
  To: dev
  Cc: jerin.jacob, harry.van.haaren, bruce.richardson, hemant.agrawal,
	nipun.gupta, santosh.shukla

This commit modifies the sw PMD credit scheme such that credits are
consumed when enqueueing a NEW event and released when an event is
released -- typically, the beginning and end of a pipeline. Workers that
simply forward events do not interact with the credit pool.

Signed-off-by: Gage Eads <gage.eads@intel.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
---
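
As a rough illustration of the credit scheme described above, the following
self-contained sketch models it in plain C. It is a simplified toy, not the
sw PMD code: the cred_* names are hypothetical, it handles one event per call
rather than a burst, it uses a plain counter where the driver uses an atomic,
and it credits every RELEASE without checking for an outstanding dequeue.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the RTE_EVENT_OP_* operations. */
enum cred_op { CRED_OP_NEW, CRED_OP_FORWARD, CRED_OP_RELEASE };

struct cred_dev {
	uint32_t inflights; /* credits currently handed out to ports */
	uint32_t limit;     /* cap on inflights (cf. nb_events_limit) */
	uint32_t quanta;    /* batch size (cf. credit_update_quanta) */
};

struct cred_port {
	struct cred_dev *dev;
	uint32_t credits;   /* cf. inflight_credits */
};

/* Returns 1 if the event is accepted, 0 if no credit is available. */
static int
cred_enqueue(struct cred_port *p, enum cred_op op)
{
	struct cred_dev *d = p->dev;

	if (op == CRED_OP_NEW) {
		if (p->credits == 0) {
			/* Take a batch from the pool, if the limit allows. */
			if (d->inflights + d->quanta > d->limit)
				return 0;
			d->inflights += d->quanta;
			p->credits += d->quanta;
		}
		p->credits--;
	} else if (op == CRED_OP_RELEASE) {
		p->credits++;
	}
	/* CRED_OP_FORWARD leaves the credit count untouched. */

	/* Hand a batch back once the port holds two batches' worth. */
	if (p->credits >= d->quanta * 2) {
		d->inflights -= d->quanta;
		p->credits -= d->quanta;
	}
	return 1;
}

/* Producer -> worker -> consumer pipeline; returns the pool usage. */
static uint32_t
cred_demo(void)
{
	struct cred_dev dev = { .inflights = 0, .limit = 64, .quanta = 32 };
	struct cred_port prod = { &dev, 0 };
	struct cred_port wrk = { &dev, 0 };
	struct cred_port cons = { &dev, 0 };
	int i, ok;

	/* The producer can inject 64 NEW events, then hits the limit. */
	for (i = 0; i < 64; i++) {
		ok = cred_enqueue(&prod, CRED_OP_NEW);
		assert(ok == 1);
	}
	ok = cred_enqueue(&prod, CRED_OP_NEW);
	assert(ok == 0);
	(void)ok;

	/* Forwarding is credit-neutral; releasing earns credits back. */
	for (i = 0; i < 64; i++) {
		cred_enqueue(&wrk, CRED_OP_FORWARD);
		cred_enqueue(&cons, CRED_OP_RELEASE);
	}
	assert(wrk.credits == 0);
	return dev.inflights;
}
```

In the walk-through, the consumer returns one batch to the pool once it has
accumulated two batches of credits, which in turn lets the producer enqueue
NEW events again.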
 drivers/event/sw/sw_evdev_worker.c | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index 93cd29b..766c836 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -85,6 +85,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 	struct sw_port *p = port;
 	struct sw_evdev *sw = (void *)p->sw;
 	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
+	uint32_t credit_update_quanta = sw->credit_update_quanta;
 	int new = 0;
 
 	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
@@ -98,7 +99,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 
 	if (p->inflight_credits < new) {
 		/* check if event enqueue brings port over max threshold */
-		uint32_t credit_update_quanta = sw->credit_update_quanta;
 		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
 			return 0;
 
@@ -109,7 +109,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 			return 0;
 	}
 
-	uint32_t completions = 0;
 	for (i = 0; i < num; i++) {
 		int op = ev[i].op;
 		int outstanding = p->outstanding_releases > 0;
@@ -126,21 +125,16 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		 * correct usage of the API), providing very high correct
 		 * prediction rate.
 		 */
-		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
 			p->outstanding_releases--;
-			completions++;
-		}
 
 		/* error case: branch to avoid touching p->stats */
-		if (unlikely(invalid_qid)) {
+		if (unlikely(invalid_qid && op != RTE_EVENT_OP_RELEASE)) {
 			p->stats.rx_dropped++;
 			p->inflight_credits++;
 		}
 	}
 
-	/* handle directed port forward and release credits */
-	p->inflight_credits -= completions * p->is_directed;
-
 	/* returns number of events actually enqueued */
 	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
 					     new_ops);
@@ -153,6 +147,13 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
 		p->last_dequeue_ticks = 0;
 	}
+
+	/* Replenish credits if enough releases are performed */
+	if (p->inflight_credits >= credit_update_quanta * 2) {
+		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
+		p->inflight_credits -= credit_update_quanta;
+	}
+
 	return enq;
 }
 
@@ -168,16 +169,22 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 {
 	RTE_SET_USED(wait);
 	struct sw_port *p = (void *)port;
-	struct sw_evdev *sw = (void *)p->sw;
 	struct rte_event_ring *ring = p->cq_worker_ring;
-	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
-	if (p->implicit_release && !p->is_directed) {
+	if (p->implicit_release) {
+		struct sw_evdev *sw = (void *)p->sw;
+		uint32_t credit_update_quanta = sw->credit_update_quanta;
 		uint16_t out_rels = p->outstanding_releases;
 		uint16_t i;
 		for (i = 0; i < out_rels; i++)
 			sw_event_release(p, i);
+
+		/* Replenish credits if enough releases are performed */
+		if (p->inflight_credits >= credit_update_quanta * 2) {
+			rte_atomic32_sub(&sw->inflights, credit_update_quanta);
+			p->inflight_credits -= credit_update_quanta;
+		}
 	}
 
 	/* returns number of events actually dequeued */
@@ -188,8 +195,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 		goto end;
 	}
 
-	/* only add credits for directed ports - LB ports send RELEASEs */
-	p->inflight_credits += ndeq * p->is_directed;
 	p->outstanding_releases += ndeq;
 	p->last_dequeue_burst_sz = ndeq;
 	p->last_dequeue_ticks = rte_get_timer_cycles();
@@ -197,11 +202,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	p->total_polls++;
 
 end:
-	if (p->inflight_credits >= credit_update_quanta * 2 &&
-			p->inflight_credits > credit_update_quanta + ndeq) {
-		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
-		p->inflight_credits -= credit_update_quanta;
-	}
 	return ndeq;
 }
 
-- 
2.7.4

* Re: [dpdk-dev] [PATCH v2 1/2] eventdev: add implicit release disable capability
  2017-12-11 17:56   ` [dpdk-dev] [PATCH v2 " Gage Eads
  2017-12-11 17:56     ` [dpdk-dev] [PATCH v2 2/2] event/sw: simplify credit scheme Gage Eads
@ 2017-12-16  8:50     ` Jerin Jacob
  1 sibling, 0 replies; 10+ messages in thread
From: Jerin Jacob @ 2017-12-16  8:50 UTC (permalink / raw)
  To: Gage Eads
  Cc: dev, harry.van.haaren, bruce.richardson, hemant.agrawal,
	nipun.gupta, santosh.shukla

-----Original Message-----
> Date: Mon, 11 Dec 2017 11:56:31 -0600
> From: Gage Eads <gage.eads@intel.com>
> To: dev@dpdk.org
> CC: jerin.jacob@caviumnetworks.com, harry.van.haaren@intel.com,
>  bruce.richardson@intel.com, hemant.agrawal@nxp.com, nipun.gupta@nxp.com,
>  santosh.shukla@caviumnetworks.com
> Subject: [PATCH v2 1/2] eventdev: add implicit release disable capability
> X-Mailer: git-send-email 2.7.4
> 
> This commit introduces a capability for disabling the "implicit" release
> functionality for a port, which prevents the eventdev PMD from issuing
> outstanding releases for previously dequeued events when dequeuing a new
> batch of events.
> 
> If a PMD does not support this capability, the application will receive an
> error if it attempts to setup a port with implicit releases disabled.
> Otherwise, if the port is configured with implicit releases disabled, the
> application must release each dequeued event by invoking
> rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
> RTE_EVENT_OP_FORWARD.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

With respect to specification changes:
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>

* Re: [dpdk-dev] [PATCH 2/2] event/sw: simplify credit scheme
  2017-12-11 13:42   ` Van Haaren, Harry
@ 2017-12-16  8:54     ` Jerin Jacob
  0 siblings, 0 replies; 10+ messages in thread
From: Jerin Jacob @ 2017-12-16  8:54 UTC (permalink / raw)
  To: Van Haaren, Harry
  Cc: Eads, Gage, dev, Richardson, Bruce, hemant.agrawal, nipun.gupta,
	santosh.shukla

-----Original Message-----
> Date: Mon, 11 Dec 2017 13:42:08 +0000
> From: "Van Haaren, Harry" <harry.van.haaren@intel.com>
> To: "Eads, Gage" <gage.eads@intel.com>, "dev@dpdk.org" <dev@dpdk.org>
> CC: "jerin.jacob@caviumnetworks.com" <jerin.jacob@caviumnetworks.com>,
>  "Richardson, Bruce" <bruce.richardson@intel.com>, "hemant.agrawal@nxp.com"
>  <hemant.agrawal@nxp.com>, "nipun.gupta@nxp.com" <nipun.gupta@nxp.com>,
>  "santosh.shukla@caviumnetworks.com" <santosh.shukla@caviumnetworks.com>
> Subject: RE: [PATCH 2/2] event/sw: simplify credit scheme
> 
> > -----Original Message-----
> > From: Eads, Gage
> > Sent: Thursday, November 30, 2017 4:21 AM
> > To: dev@dpdk.org
> > Cc: jerin.jacob@caviumnetworks.com; Van Haaren, Harry
> > <harry.van.haaren@intel.com>; Richardson, Bruce
> > <bruce.richardson@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com;
> > santosh.shukla@caviumnetworks.com
> > Subject: [PATCH 2/2] event/sw: simplify credit scheme
> > 
> > This commit modifies the sw PMD credit scheme such that credits are
> > consumed when enqueueing a NEW event and released when an event is
> > released -- typically, the beginning and end of a pipeline. Workers that
> > simply forward events do not interact with the credit pool.
> > 
> > Signed-off-by: Gage Eads <gage.eads@intel.com>
> 
> Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

Applied this series to dpdk-next-eventdev/master. Thanks.

Thread overview: 10+ messages
2017-11-30  4:20 [dpdk-dev] [PATCH 0/2] add implicit release disable capability Gage Eads
2017-11-30  4:20 ` [dpdk-dev] [PATCH 1/2] eventdev: " Gage Eads
2017-12-11 12:36   ` Van Haaren, Harry
2017-12-11 17:39     ` Eads, Gage
2017-12-11 17:56   ` [dpdk-dev] [PATCH v2 " Gage Eads
2017-12-11 17:56     ` [dpdk-dev] [PATCH v2 2/2] event/sw: simplify credit scheme Gage Eads
2017-12-16  8:50     ` [dpdk-dev] [PATCH v2 1/2] eventdev: add implicit release disable capability Jerin Jacob
2017-11-30  4:20 ` [dpdk-dev] [PATCH 2/2] event/sw: simplify credit scheme Gage Eads
2017-12-11 13:42   ` Van Haaren, Harry
2017-12-16  8:54     ` Jerin Jacob