From: Liang Ma <liang.j.ma@intel.com>
To: jerin.jacob@caviumnetworks.com
Cc: dev@dpdk.org, harry.van.haaren@intel.com,
deepak.k.jain@intel.com, john.geary@intel.com,
peter.mccarthy@intel.com
Subject: [dpdk-dev] [PATCH v2] event/opdl: fix atomic queue race condition issue
Date: Tue, 27 Mar 2018 15:18:12 +0100 [thread overview]
Message-ID: <1522160292-193409-1-git-send-email-liang.j.ma@intel.com> (raw)
In-Reply-To: <1520940853-56748-1-git-send-email-liang.j.ma@intel.com>
If an application links one atomic queue to multiple ports,
and each worker core updates the flow_id, there is a
chance of hitting a race condition that leads to double
processing of the same event. This fix solves the problem
and eliminates the race condition.
Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
drivers/event/opdl/opdl_evdev_init.c | 3 ++
drivers/event/opdl/opdl_ring.c | 97 +++++++++++++++++++++++++-----------
drivers/event/opdl/opdl_ring.h | 16 +++++-
3 files changed, 86 insertions(+), 30 deletions(-)
diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
index 1454de5..582ad69 100644
--- a/drivers/event/opdl/opdl_evdev_init.c
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev)
queue->ports[queue->nb_ports] = port;
port->instance_id = queue->nb_ports;
queue->nb_ports++;
+ opdl_stage_set_queue_id(stage_inst,
+ port->queue_id);
+
} else if (queue->q_pos == OPDL_Q_POS_END) {
/* tx port */
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
index eca7712..8aca481 100644
--- a/drivers/event/opdl/opdl_ring.c
+++ b/drivers/event/opdl/opdl_ring.c
@@ -25,7 +25,10 @@
#define OPDL_NAME_SIZE 64
-#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL)
+#define OPDL_EVENT_MASK (0x00000000000FFFFFULL)
+#define OPDL_FLOWID_MASK (0xFFFFF)
+#define OPDL_OPA_MASK (0xFF)
+#define OPDL_OPA_OFFSET (0x38)
int opdl_logtype_driver;
@@ -86,7 +89,6 @@ struct opdl_stage {
*/
uint32_t available_seq;
uint32_t head; /* Current head for single-thread operation */
- uint32_t shadow_head; /* Shadow head for single-thread operation */
uint32_t nb_instance; /* Number of instances */
uint32_t instance_id; /* ID of this stage instance */
uint16_t num_claimed; /* Number of slots claimed */
@@ -102,6 +104,9 @@ struct opdl_stage {
/* For managing disclaims in multi-threaded processing stages */
struct claim_manager pending_disclaims[RTE_MAX_LCORE]
__rte_cache_aligned;
+ uint32_t shadow_head; /* Shadow head for single-thread operation */
+ uint32_t queue_id; /* ID of Queue which is assigned to this stage */
+ uint32_t pos; /* Atomic scan position */
} __rte_cache_aligned;
/* Context for opdl_ring */
@@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
{
uint32_t i = 0, j = 0, offset;
+ uint32_t opa_id = 0;
+ uint32_t flow_id = 0;
+ uint64_t event = 0;
void *get_slots;
struct rte_event *ev;
RTE_SET_USED(seq);
@@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
for (j = 0; j < num_entries; j++) {
ev = (struct rte_event *)get_slot(t, s->head+j);
- if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+
+ event = __atomic_load_n(&(ev->event),
+ __ATOMIC_ACQUIRE);
+
+ opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+ flow_id = OPDL_FLOWID_MASK & event;
+
+ if (opa_id >= s->queue_id)
+ continue;
+
+ if ((flow_id % s->nb_instance) == s->instance_id) {
memcpy(entries_offset, ev, t->slot_size);
entries_offset += t->slot_size;
i++;
@@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
s->head += num_entries;
s->num_claimed = num_entries;
s->num_event = i;
+ s->pos = 0;
/* automatically disclaim entries if number of rte_events is zero */
if (unlikely(i == 0))
@@ -953,21 +972,26 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
}
bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
uint32_t index, bool atomic)
{
- uint32_t i = 0, j = 0, offset;
+ uint32_t i = 0, offset;
struct opdl_ring *t = s->t;
struct rte_event *ev_orig = NULL;
bool ev_updated = false;
- uint64_t ev_temp = 0;
+ uint64_t ev_temp = 0;
+ uint64_t ev_update = 0;
+
+ uint32_t opa_id = 0;
+ uint32_t flow_id = 0;
+ uint64_t event = 0;
if (index > s->num_event) {
PMD_DRV_LOG(ERR, "index is overflow");
return ev_updated;
}
- ev_temp = ev->event&OPDL_EVENT_MASK;
+ ev_temp = ev->event & OPDL_EVENT_MASK;
if (!atomic) {
offset = opdl_first_entry_id(s->seq, s->nb_instance,
@@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
}
} else {
- for (i = 0; i < s->num_claimed; i++) {
+ for (i = s->pos; i < s->num_claimed; i++) {
ev_orig = (struct rte_event *)
get_slot(t, s->shadow_head+i);
- if ((ev_orig->flow_id%s->nb_instance) ==
- s->instance_id) {
-
- if (j == index) {
- if ((ev_orig->event&OPDL_EVENT_MASK) !=
- ev_temp) {
- ev_orig->event = ev->event;
- ev_updated = true;
- }
- if (ev_orig->u64 != ev->u64) {
- ev_orig->u64 = ev->u64;
- ev_updated = true;
- }
-
- break;
+ event = __atomic_load_n(&(ev_orig->event),
+ __ATOMIC_ACQUIRE);
+
+ opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+ flow_id = OPDL_FLOWID_MASK & event;
+
+ if (opa_id >= s->queue_id)
+ continue;
+
+ if ((flow_id % s->nb_instance) == s->instance_id) {
+ ev_update = s->queue_id;
+ ev_update = (ev_update << OPDL_OPA_OFFSET)
+ | ev->event;
+
+ s->pos = i + 1;
+
+ if ((event & OPDL_EVENT_MASK) !=
+ ev_temp) {
+ __atomic_store_n(&(ev_orig->event),
+ ev_update,
+ __ATOMIC_RELEASE);
+ ev_updated = true;
}
- j++;
+ if (ev_orig->u64 != ev->u64) {
+ ev_orig->u64 = ev->u64;
+ ev_updated = true;
+ }
+
+ break;
}
}
@@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
return -EINVAL;
}
}
- if (num_deps > t->num_stages) {
- PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
- num_deps, t->num_stages);
- return -EINVAL;
- }
+
return 0;
}
@@ -1154,6 +1186,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s)
}
void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+ uint32_t queue_id)
+{
+ s->queue_id = queue_id;
+}
+
+void
opdl_ring_dump(const struct opdl_ring *t, FILE *f)
{
uint32_t i;
diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
index 9e8c33e..751a59d 100644
--- a/drivers/event/opdl/opdl_ring.h
+++ b/drivers/event/opdl/opdl_ring.h
@@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
struct opdl_stage *
opdl_stage_create(struct opdl_ring *t, bool threadsafe);
+
+/**
+ * Set the internal queue id for each stage instance.
+ *
+ * @param s
+ * The pointer of stage instance.
+ *
+ * @param queue_id
+ * The value of internal queue id.
+ */
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+ uint32_t queue_id);
+
/**
* Prints information on opdl_ring instance and all its stages
*
@@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
*/
bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
uint32_t index, bool atomic);
#ifdef __cplusplus
--
2.7.5
next prev parent reply other threads:[~2018-03-27 14:18 UTC|newest]
Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-03-13 11:34 [dpdk-dev] [PATCH] " Liang Ma
2018-03-26 12:29 ` Van Haaren, Harry
2018-03-26 13:32 ` Liang, Ma
2018-03-27 14:18 ` Liang Ma [this message]
2018-03-29 14:48 ` [dpdk-dev] [PATCH v2] " Van Haaren, Harry
2018-04-02 4:09 ` Jerin Jacob
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1522160292-193409-1-git-send-email-liang.j.ma@intel.com \
--to=liang.j.ma@intel.com \
--cc=deepak.k.jain@intel.com \
--cc=dev@dpdk.org \
--cc=harry.van.haaren@intel.com \
--cc=jerin.jacob@caviumnetworks.com \
--cc=john.geary@intel.com \
--cc=peter.mccarthy@intel.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).