DPDK patches and discussions
 help / color / mirror / Atom feed
* [dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue
@ 2018-03-13 11:34 Liang Ma
  2018-03-26 12:29 ` Van Haaren, Harry
  2018-03-27 14:18 ` [dpdk-dev] [PATCH v2] " Liang Ma
  0 siblings, 2 replies; 6+ messages in thread
From: Liang Ma @ 2018-03-13 11:34 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, deepak.k.jain, john.geary, peter.mccarthy

If application link one atomic queue to multiple ports,
and each worker core update flow_id, there will have a
chance to hit race condition issue and lead to double processing
same event. This fix solve the problem and eliminate
the race condition issue.

Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/opdl_evdev_init.c |  3 ++
 drivers/event/opdl/opdl_ring.c       | 95 +++++++++++++++++++++++++-----------
 drivers/event/opdl/opdl_ring.h       | 16 +++++-
 3 files changed, 85 insertions(+), 29 deletions(-)

diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
index 1454de5..582ad69 100644
--- a/drivers/event/opdl/opdl_evdev_init.c
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev)
 				queue->ports[queue->nb_ports] = port;
 				port->instance_id = queue->nb_ports;
 				queue->nb_ports++;
+				opdl_stage_set_queue_id(stage_inst,
+						port->queue_id);
+
 			} else if (queue->q_pos == OPDL_Q_POS_END) {
 
 				/* tx port  */
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
index eca7712..0f40d31 100644
--- a/drivers/event/opdl/opdl_ring.c
+++ b/drivers/event/opdl/opdl_ring.c
@@ -25,7 +25,10 @@
 #define OPDL_NAME_SIZE 64
 
 
-#define OPDL_EVENT_MASK  (0xFFFF0000000FFFFFULL)
+#define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
+#define OPDL_FLOWID_MASK (0xFFFFF)
+#define OPDL_OPA_MASK    (0xFF)
+#define OPDL_OPA_OFFSET  (0x38)
 
 int opdl_logtype_driver;
 
@@ -86,7 +89,6 @@ struct opdl_stage {
 	 */
 	uint32_t available_seq;
 	uint32_t head;  /* Current head for single-thread operation */
-	uint32_t shadow_head;  /* Shadow head for single-thread operation */
 	uint32_t nb_instance;  /* Number of instances */
 	uint32_t instance_id;  /* ID of this stage instance */
 	uint16_t num_claimed;  /* Number of slots claimed */
@@ -102,6 +104,9 @@ struct opdl_stage {
 	/* For managing disclaims in multi-threaded processing stages */
 	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
 					       __rte_cache_aligned;
+	uint32_t shadow_head;  /* Shadow head for single-thread operation */
+	uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
+	uint32_t pos;		/* Atomic scan position */
 } __rte_cache_aligned;
 
 /* Context for opdl_ring */
@@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
 {
 	uint32_t i = 0, j = 0,  offset;
+	uint32_t opa_id   = 0;
+	uint32_t flow_id  = 0;
+	uint64_t event    = 0;
 	void *get_slots;
 	struct rte_event *ev;
 	RTE_SET_USED(seq);
@@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 
 		for (j = 0; j < num_entries; j++) {
 			ev = (struct rte_event *)get_slot(t, s->head+j);
-			if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+
+			event  = __atomic_load_n(&(ev->event),
+					__ATOMIC_ACQUIRE);
+
+			opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET);
+			flow_id  = OPDL_FLOWID_MASK&event;
+
+			if (opa_id >= s->queue_id)
+				continue;
+
+			if ((flow_id%s->nb_instance) == s->instance_id) {
 				memcpy(entries_offset, ev, t->slot_size);
 				entries_offset += t->slot_size;
 				i++;
@@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 	s->head += num_entries;
 	s->num_claimed = num_entries;
 	s->num_event = i;
+	s->pos = 0;
 
 	/* automatically disclaim entries if number of rte_events is zero */
 	if (unlikely(i == 0))
@@ -953,14 +972,19 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
 }
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic)
 {
-	uint32_t i = 0, j = 0, offset;
+	uint32_t i = 0, offset;
 	struct opdl_ring *t = s->t;
 	struct rte_event *ev_orig = NULL;
 	bool ev_updated = false;
-	uint64_t  ev_temp = 0;
+	uint64_t ev_temp    = 0;
+	uint64_t ev_update  = 0;
+
+	uint32_t opa_id   = 0;
+	uint32_t flow_id  = 0;
+	uint64_t event    = 0;
 
 	if (index > s->num_event) {
 		PMD_DRV_LOG(ERR, "index is overflow");
@@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
 		}
 
 	} else {
-		for (i = 0; i < s->num_claimed; i++) {
+		for (i = s->pos; i < s->num_claimed; i++) {
 			ev_orig = (struct rte_event *)
 				get_slot(t, s->shadow_head+i);
 
-			if ((ev_orig->flow_id%s->nb_instance) ==
-					s->instance_id) {
-
-				if (j == index) {
-					if ((ev_orig->event&OPDL_EVENT_MASK) !=
-							ev_temp) {
-						ev_orig->event = ev->event;
-						ev_updated = true;
-					}
-					if (ev_orig->u64 != ev->u64) {
-						ev_orig->u64 = ev->u64;
-						ev_updated = true;
-					}
-
-					break;
+			event  = __atomic_load_n(&(ev_orig->event),
+					__ATOMIC_ACQUIRE);
+
+			opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET);
+			flow_id  = OPDL_FLOWID_MASK&event;
+
+			if (opa_id >= s->queue_id)
+				continue;
+
+			if ((flow_id%s->nb_instance) == s->instance_id) {
+				ev_update = s->queue_id;
+				ev_update = (ev_update<<OPDL_OPA_OFFSET)
+					|ev->event;
+
+				s->pos = i+1;
+
+				if ((event&OPDL_EVENT_MASK) !=
+						ev_temp) {
+					__atomic_store_n(&(ev_orig->event),
+							ev_update,
+							__ATOMIC_RELEASE);
+					ev_updated = true;
 				}
-				j++;
+				if (ev_orig->u64 != ev->u64) {
+					ev_orig->u64 = ev->u64;
+					ev_updated = true;
+				}
+
+				break;
 			}
 		}
 
@@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
 			return -EINVAL;
 		}
 	}
-	if (num_deps > t->num_stages) {
-		PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
-				num_deps, t->num_stages);
-		return -EINVAL;
-	}
+
 	return 0;
 }
 
@@ -1154,6 +1186,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s)
 }
 
 void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id)
+{
+	s->queue_id = queue_id;
+}
+
+void
 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
 {
 	uint32_t i;
diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
index 9e8c33e..751a59d 100644
--- a/drivers/event/opdl/opdl_ring.h
+++ b/drivers/event/opdl/opdl_ring.h
@@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
 struct opdl_stage *
 opdl_stage_create(struct opdl_ring *t,  bool threadsafe);
 
+
+/**
+ * Set the internal queue id for each stage instance.
+ *
+ * @param s
+ *   The pointer of  stage instance.
+ *
+ * @param queue_id
+ *    The value of internal queue id.
+ */
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id);
+
 /**
  * Prints information on opdl_ring instance and all its stages
  *
@@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
  */
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic);
 
 #ifdef __cplusplus
-- 
2.7.5

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue
  2018-03-13 11:34 [dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue Liang Ma
@ 2018-03-26 12:29 ` Van Haaren, Harry
  2018-03-26 13:32   ` Liang, Ma
  2018-03-27 14:18 ` [dpdk-dev] [PATCH v2] " Liang Ma
  1 sibling, 1 reply; 6+ messages in thread
From: Van Haaren, Harry @ 2018-03-26 12:29 UTC (permalink / raw)
  To: Ma, Liang J
  Cc: dev, Jain, Deepak K, Geary, John, Mccarthy, Peter, jerin.jacob

> From: Ma, Liang J
> Sent: Tuesday, March 13, 2018 11:34 AM
> To: jerin.jacob@caviumnetworks.com
> Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>; Jain, Deepak
> K <deepak.k.jain@intel.com>; Geary, John <john.geary@intel.com>; Mccarthy,
> Peter <peter.mccarthy@intel.com>
> Subject: [PATCH] event/opdl: fix atomic queue race condition issue
> 
> If application link one atomic queue to multiple ports,
> and each worker core update flow_id, there will have a
> chance to hit race condition issue and lead to double processing
> same event. This fix solve the problem and eliminate
> the race condition issue.
> 
> Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
> 

General notes
- Spaces around & % << >> and other bitwise manipulations (https://dpdk.org/doc/guides/contributing/coding_style.html#operators)
- I've noted a few below, but there are more
- Usually checkpatch flags these - I'm curious why it didn't in this case

It would be nice if we didn't have to rely on __atomic_load_n() and friends,
however I don't see a better alternative. Given other DPDK components are
also using __atomic_* functions, no objection here.


<snip>

> @@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void
> *entries,
> 
>  		for (j = 0; j < num_entries; j++) {
>  			ev = (struct rte_event *)get_slot(t, s->head+j);
> -			if ((ev->flow_id%s->nb_instance) == s->instance_id) {

Spaces around the %

> +
> +			event  = __atomic_load_n(&(ev->event),
> +					__ATOMIC_ACQUIRE);
> +
> +			opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET);

Spaces &

> +			flow_id  = OPDL_FLOWID_MASK&event;

Spaces &

> +
> +			if (opa_id >= s->queue_id)
> +				continue;
> +
> +			if ((flow_id%s->nb_instance) == s->instance_id) {

Spaces %

<snip rest of patch>


Will re-review v2. Cheers, -Harry

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue
  2018-03-26 12:29 ` Van Haaren, Harry
@ 2018-03-26 13:32   ` Liang, Ma
  0 siblings, 0 replies; 6+ messages in thread
From: Liang, Ma @ 2018-03-26 13:32 UTC (permalink / raw)
  To: Van Haaren, Harry
  Cc: dev, Jain, Deepak K, Geary, John, Mccarthy, Peter, jerin.jacob

On 26 Mar 05:29, Van Haaren, Harry wrote:
> > From: Ma, Liang J
> > Sent: Tuesday, March 13, 2018 11:34 AM
> > To: jerin.jacob@caviumnetworks.com
> > Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>; Jain, Deepak
> > K <deepak.k.jain@intel.com>; Geary, John <john.geary@intel.com>; Mccarthy,
> > Peter <peter.mccarthy@intel.com>
> > Subject: [PATCH] event/opdl: fix atomic queue race condition issue
> > 
> > If application link one atomic queue to multiple ports,
> > and each worker core update flow_id, there will have a
> > chance to hit race condition issue and lead to double processing
> > same event. This fix solve the problem and eliminate
> > the race condition issue.
> > 
> > Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
> > 
> 
> General notes
> - Spaces around & % << >> and other bitwise manipulations (https://dpdk.org/doc/guides/contributing/coding_style.html#operators)
> - I've noted a few below, but there are more
> - Usually checkpatch flags these - I'm curious why it didn't in this case
> 
> It would be nice if we didn't have to rely on __atomic_load_n() and friends,
> however I don't see a better alternative. Given other DPDK components are
> also using __atomic_* functions, no objection here.
> 
> 
> <snip>
> 
> > @@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void
> > *entries,
> > 
> >  		for (j = 0; j < num_entries; j++) {
> >  			ev = (struct rte_event *)get_slot(t, s->head+j);
> > -			if ((ev->flow_id%s->nb_instance) == s->instance_id) {
> 
> Spaces around the %
> 
> > +
> > +			event  = __atomic_load_n(&(ev->event),
> > +					__ATOMIC_ACQUIRE);
> > +
> > +			opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET);
> 
> Spaces &
> 
> > +			flow_id  = OPDL_FLOWID_MASK&event;
> 
> Spaces &
> 
> > +
> > +			if (opa_id >= s->queue_id)
> > +				continue;
> > +
> > +			if ((flow_id%s->nb_instance) == s->instance_id) {
> 
> Spaces %
> 
> <snip rest of patch>
> 
> 
> Will re-review v2. Cheers, -Harry
Thanks, I will send V2 later. 

^ permalink raw reply	[flat|nested] 6+ messages in thread

* [dpdk-dev] [PATCH v2] event/opdl: fix atomic queue race condition issue
  2018-03-13 11:34 [dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue Liang Ma
  2018-03-26 12:29 ` Van Haaren, Harry
@ 2018-03-27 14:18 ` Liang Ma
  2018-03-29 14:48   ` Van Haaren, Harry
  1 sibling, 1 reply; 6+ messages in thread
From: Liang Ma @ 2018-03-27 14:18 UTC (permalink / raw)
  To: jerin.jacob
  Cc: dev, harry.van.haaren, deepak.k.jain, john.geary, peter.mccarthy

If application link one atomic queue to multiple ports,
and each worker core update flow_id, there will have a
chance to hit race condition issue and lead to double processing
same event. This fix solve the problem and eliminate
the race condition issue.

Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/opdl_evdev_init.c |  3 ++
 drivers/event/opdl/opdl_ring.c       | 97 +++++++++++++++++++++++++-----------
 drivers/event/opdl/opdl_ring.h       | 16 +++++-
 3 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
index 1454de5..582ad69 100644
--- a/drivers/event/opdl/opdl_evdev_init.c
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev)
 				queue->ports[queue->nb_ports] = port;
 				port->instance_id = queue->nb_ports;
 				queue->nb_ports++;
+				opdl_stage_set_queue_id(stage_inst,
+						port->queue_id);
+
 			} else if (queue->q_pos == OPDL_Q_POS_END) {
 
 				/* tx port  */
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
index eca7712..8aca481 100644
--- a/drivers/event/opdl/opdl_ring.c
+++ b/drivers/event/opdl/opdl_ring.c
@@ -25,7 +25,10 @@
 #define OPDL_NAME_SIZE 64
 
 
-#define OPDL_EVENT_MASK  (0xFFFF0000000FFFFFULL)
+#define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
+#define OPDL_FLOWID_MASK (0xFFFFF)
+#define OPDL_OPA_MASK    (0xFF)
+#define OPDL_OPA_OFFSET  (0x38)
 
 int opdl_logtype_driver;
 
@@ -86,7 +89,6 @@ struct opdl_stage {
 	 */
 	uint32_t available_seq;
 	uint32_t head;  /* Current head for single-thread operation */
-	uint32_t shadow_head;  /* Shadow head for single-thread operation */
 	uint32_t nb_instance;  /* Number of instances */
 	uint32_t instance_id;  /* ID of this stage instance */
 	uint16_t num_claimed;  /* Number of slots claimed */
@@ -102,6 +104,9 @@ struct opdl_stage {
 	/* For managing disclaims in multi-threaded processing stages */
 	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
 					       __rte_cache_aligned;
+	uint32_t shadow_head;  /* Shadow head for single-thread operation */
+	uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
+	uint32_t pos;		/* Atomic scan position */
 } __rte_cache_aligned;
 
 /* Context for opdl_ring */
@@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
 {
 	uint32_t i = 0, j = 0,  offset;
+	uint32_t opa_id   = 0;
+	uint32_t flow_id  = 0;
+	uint64_t event    = 0;
 	void *get_slots;
 	struct rte_event *ev;
 	RTE_SET_USED(seq);
@@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 
 		for (j = 0; j < num_entries; j++) {
 			ev = (struct rte_event *)get_slot(t, s->head+j);
-			if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+
+			event  = __atomic_load_n(&(ev->event),
+					__ATOMIC_ACQUIRE);
+
+			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+			flow_id  = OPDL_FLOWID_MASK & event;
+
+			if (opa_id >= s->queue_id)
+				continue;
+
+			if ((flow_id % s->nb_instance) == s->instance_id) {
 				memcpy(entries_offset, ev, t->slot_size);
 				entries_offset += t->slot_size;
 				i++;
@@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 	s->head += num_entries;
 	s->num_claimed = num_entries;
 	s->num_event = i;
+	s->pos = 0;
 
 	/* automatically disclaim entries if number of rte_events is zero */
 	if (unlikely(i == 0))
@@ -953,21 +972,26 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
 }
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic)
 {
-	uint32_t i = 0, j = 0, offset;
+	uint32_t i = 0, offset;
 	struct opdl_ring *t = s->t;
 	struct rte_event *ev_orig = NULL;
 	bool ev_updated = false;
-	uint64_t  ev_temp = 0;
+	uint64_t ev_temp    = 0;
+	uint64_t ev_update  = 0;
+
+	uint32_t opa_id   = 0;
+	uint32_t flow_id  = 0;
+	uint64_t event    = 0;
 
 	if (index > s->num_event) {
 		PMD_DRV_LOG(ERR, "index is overflow");
 		return ev_updated;
 	}
 
-	ev_temp = ev->event&OPDL_EVENT_MASK;
+	ev_temp = ev->event & OPDL_EVENT_MASK;
 
 	if (!atomic) {
 		offset = opdl_first_entry_id(s->seq, s->nb_instance,
@@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
 		}
 
 	} else {
-		for (i = 0; i < s->num_claimed; i++) {
+		for (i = s->pos; i < s->num_claimed; i++) {
 			ev_orig = (struct rte_event *)
 				get_slot(t, s->shadow_head+i);
 
-			if ((ev_orig->flow_id%s->nb_instance) ==
-					s->instance_id) {
-
-				if (j == index) {
-					if ((ev_orig->event&OPDL_EVENT_MASK) !=
-							ev_temp) {
-						ev_orig->event = ev->event;
-						ev_updated = true;
-					}
-					if (ev_orig->u64 != ev->u64) {
-						ev_orig->u64 = ev->u64;
-						ev_updated = true;
-					}
-
-					break;
+			event  = __atomic_load_n(&(ev_orig->event),
+					__ATOMIC_ACQUIRE);
+
+			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+			flow_id  = OPDL_FLOWID_MASK & event;
+
+			if (opa_id >= s->queue_id)
+				continue;
+
+			if ((flow_id % s->nb_instance) == s->instance_id) {
+				ev_update = s->queue_id;
+				ev_update = (ev_update << OPDL_OPA_OFFSET)
+					| ev->event;
+
+				s->pos = i + 1;
+
+				if ((event & OPDL_EVENT_MASK) !=
+						ev_temp) {
+					__atomic_store_n(&(ev_orig->event),
+							ev_update,
+							__ATOMIC_RELEASE);
+					ev_updated = true;
 				}
-				j++;
+				if (ev_orig->u64 != ev->u64) {
+					ev_orig->u64 = ev->u64;
+					ev_updated = true;
+				}
+
+				break;
 			}
 		}
 
@@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
 			return -EINVAL;
 		}
 	}
-	if (num_deps > t->num_stages) {
-		PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
-				num_deps, t->num_stages);
-		return -EINVAL;
-	}
+
 	return 0;
 }
 
@@ -1154,6 +1186,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s)
 }
 
 void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id)
+{
+	s->queue_id = queue_id;
+}
+
+void
 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
 {
 	uint32_t i;
diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
index 9e8c33e..751a59d 100644
--- a/drivers/event/opdl/opdl_ring.h
+++ b/drivers/event/opdl/opdl_ring.h
@@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
 struct opdl_stage *
 opdl_stage_create(struct opdl_ring *t,  bool threadsafe);
 
+
+/**
+ * Set the internal queue id for each stage instance.
+ *
+ * @param s
+ *   The pointer of  stage instance.
+ *
+ * @param queue_id
+ *    The value of internal queue id.
+ */
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id);
+
 /**
  * Prints information on opdl_ring instance and all its stages
  *
@@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
  */
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic);
 
 #ifdef __cplusplus
-- 
2.7.5

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [dpdk-dev] [PATCH v2] event/opdl: fix atomic queue race condition issue
  2018-03-27 14:18 ` [dpdk-dev] [PATCH v2] " Liang Ma
@ 2018-03-29 14:48   ` Van Haaren, Harry
  2018-04-02  4:09     ` Jerin Jacob
  0 siblings, 1 reply; 6+ messages in thread
From: Van Haaren, Harry @ 2018-03-29 14:48 UTC (permalink / raw)
  To: Ma, Liang J, jerin.jacob
  Cc: dev, Jain, Deepak K, Geary, John, Mccarthy, Peter

> From: Ma, Liang J
> Sent: Tuesday, March 27, 2018 3:18 PM
> To: jerin.jacob@caviumnetworks.com
> Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>; Jain,
> Deepak K <deepak.k.jain@intel.com>; Geary, John <john.geary@intel.com>;
> Mccarthy, Peter <peter.mccarthy@intel.com>
> Subject: [PATCH v2] event/opdl: fix atomic queue race condition issue
> 
> If application link one atomic queue to multiple ports,
> and each worker core update flow_id, there will have a
> chance to hit race condition issue and lead to double processing
> same event. This fix solve the problem and eliminate
> the race condition issue.
> 
> Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
> 
> Signed-off-by: Liang Ma <liang.j.ma@intel.com>
> Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>

Note; compilation flagged as failing on patchwork, but its due to master build broken not this patch.

Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [dpdk-dev] [PATCH v2] event/opdl: fix atomic queue race condition issue
  2018-03-29 14:48   ` Van Haaren, Harry
@ 2018-04-02  4:09     ` Jerin Jacob
  0 siblings, 0 replies; 6+ messages in thread
From: Jerin Jacob @ 2018-04-02  4:09 UTC (permalink / raw)
  To: Van Haaren, Harry
  Cc: Ma, Liang J, dev, Jain, Deepak K, Geary, John, Mccarthy, Peter

-----Original Message-----
> Date: Thu, 29 Mar 2018 14:48:26 +0000
> From: "Van Haaren, Harry" <harry.van.haaren@intel.com>
> To: "Ma, Liang J" <liang.j.ma@intel.com>, "jerin.jacob@caviumnetworks.com"
>  <jerin.jacob@caviumnetworks.com>
> CC: "dev@dpdk.org" <dev@dpdk.org>, "Jain, Deepak K"
>  <deepak.k.jain@intel.com>, "Geary, John" <john.geary@intel.com>,
>  "Mccarthy, Peter" <peter.mccarthy@intel.com>
> Subject: RE: [PATCH v2] event/opdl: fix atomic queue race condition issue
> 
> > From: Ma, Liang J
> > Sent: Tuesday, March 27, 2018 3:18 PM
> > To: jerin.jacob@caviumnetworks.com
> > Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>; Jain,
> > Deepak K <deepak.k.jain@intel.com>; Geary, John <john.geary@intel.com>;
> > Mccarthy, Peter <peter.mccarthy@intel.com>
> > Subject: [PATCH v2] event/opdl: fix atomic queue race condition issue
> > 
> > If application link one atomic queue to multiple ports,
> > and each worker core update flow_id, there will have a
> > chance to hit race condition issue and lead to double processing
> > same event. This fix solve the problem and eliminate
> > the race condition issue.
> > 
> > Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
> > 
> > Signed-off-by: Liang Ma <liang.j.ma@intel.com>
> > Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
> 
> Note; compilation flagged as failing on patchwork, but its due to master build broken not this patch.
> 
> Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

Cc: stable@dpdk.org

Applied to dpdk-next-eventdev/master. Thanks.

> 

^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2018-04-02  4:09 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-03-13 11:34 [dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue Liang Ma
2018-03-26 12:29 ` Van Haaren, Harry
2018-03-26 13:32   ` Liang, Ma
2018-03-27 14:18 ` [dpdk-dev] [PATCH v2] " Liang Ma
2018-03-29 14:48   ` Van Haaren, Harry
2018-04-02  4:09     ` Jerin Jacob

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).