From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mga12.intel.com (mga12.intel.com [192.55.52.136]) by dpdk.org (Postfix) with ESMTP id BBF182965 for ; Tue, 13 Mar 2018 12:34:13 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga004.fm.intel.com ([10.253.24.48]) by fmsmga106.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 13 Mar 2018 04:34:12 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.47,464,1515484800"; d="scan'208";a="36904152" Received: from silpixa00398162.ir.intel.com (HELO silpixa00398162.ger.corp.intel.com) ([10.237.223.171]) by fmsmga004.fm.intel.com with ESMTP; 13 Mar 2018 04:34:11 -0700 From: Liang Ma To: jerin.jacob@caviumnetworks.com Cc: dev@dpdk.org, harry.van.haaren@intel.com, deepak.k.jain@intel.com, john.geary@intel.com, peter.mccarthy@intel.com Date: Tue, 13 Mar 2018 11:34:13 +0000 Message-Id: <1520940853-56748-1-git-send-email-liang.j.ma@intel.com> X-Mailer: git-send-email 2.7.5 Subject: [dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 13 Mar 2018 11:34:15 -0000 If an application links one atomic queue to multiple ports, and each worker core updates the flow_id, there is a chance of hitting a race condition that leads to the same event being processed twice. This fix solves the problem and eliminates the race condition. 
Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library") Signed-off-by: Liang Ma Signed-off-by: Peter Mccarthy --- drivers/event/opdl/opdl_evdev_init.c | 3 ++ drivers/event/opdl/opdl_ring.c | 95 +++++++++++++++++++++++++----------- drivers/event/opdl/opdl_ring.h | 16 +++++- 3 files changed, 85 insertions(+), 29 deletions(-) diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c index 1454de5..582ad69 100644 --- a/drivers/event/opdl/opdl_evdev_init.c +++ b/drivers/event/opdl/opdl_evdev_init.c @@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev) queue->ports[queue->nb_ports] = port; port->instance_id = queue->nb_ports; queue->nb_ports++; + opdl_stage_set_queue_id(stage_inst, + port->queue_id); + } else if (queue->q_pos == OPDL_Q_POS_END) { /* tx port */ diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c index eca7712..0f40d31 100644 --- a/drivers/event/opdl/opdl_ring.c +++ b/drivers/event/opdl/opdl_ring.c @@ -25,7 +25,10 @@ #define OPDL_NAME_SIZE 64 -#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL) +#define OPDL_EVENT_MASK (0x00000000000FFFFFULL) +#define OPDL_FLOWID_MASK (0xFFFFF) +#define OPDL_OPA_MASK (0xFF) +#define OPDL_OPA_OFFSET (0x38) int opdl_logtype_driver; @@ -86,7 +89,6 @@ struct opdl_stage { */ uint32_t available_seq; uint32_t head; /* Current head for single-thread operation */ - uint32_t shadow_head; /* Shadow head for single-thread operation */ uint32_t nb_instance; /* Number of instances */ uint32_t instance_id; /* ID of this stage instance */ uint16_t num_claimed; /* Number of slots claimed */ @@ -102,6 +104,9 @@ struct opdl_stage { /* For managing disclaims in multi-threaded processing stages */ struct claim_manager pending_disclaims[RTE_MAX_LCORE] __rte_cache_aligned; + uint32_t shadow_head; /* Shadow head for single-thread operation */ + uint32_t queue_id; /* ID of Queue which is assigned to this stage */ + uint32_t pos; /* Atomic scan position */ } 
__rte_cache_aligned; /* Context for opdl_ring */ @@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, uint32_t num_entries, uint32_t *seq, bool block, bool atomic) { uint32_t i = 0, j = 0, offset; + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; void *get_slots; struct rte_event *ev; RTE_SET_USED(seq); @@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, for (j = 0; j < num_entries; j++) { ev = (struct rte_event *)get_slot(t, s->head+j); - if ((ev->flow_id%s->nb_instance) == s->instance_id) { + + event = __atomic_load_n(&(ev->event), + __ATOMIC_ACQUIRE); + + opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK&event; + + if (opa_id >= s->queue_id) + continue; + + if ((flow_id%s->nb_instance) == s->instance_id) { memcpy(entries_offset, ev, t->slot_size); entries_offset += t->slot_size; i++; @@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, s->head += num_entries; s->num_claimed = num_entries; s->num_event = i; + s->pos = 0; /* automatically disclaim entries if number of rte_events is zero */ if (unlikely(i == 0)) @@ -953,14 +972,19 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index) } bool -opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, +opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, uint32_t index, bool atomic) { - uint32_t i = 0, j = 0, offset; + uint32_t i = 0, offset; struct opdl_ring *t = s->t; struct rte_event *ev_orig = NULL; bool ev_updated = false; - uint64_t ev_temp = 0; + uint64_t ev_temp = 0; + uint64_t ev_update = 0; + + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; if (index > s->num_event) { PMD_DRV_LOG(ERR, "index is overflow"); @@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, } } else { - for (i = 0; i < s->num_claimed; i++) { + for (i = s->pos; i < s->num_claimed; i++) 
{ ev_orig = (struct rte_event *) get_slot(t, s->shadow_head+i); - if ((ev_orig->flow_id%s->nb_instance) == - s->instance_id) { - - if (j == index) { - if ((ev_orig->event&OPDL_EVENT_MASK) != - ev_temp) { - ev_orig->event = ev->event; - ev_updated = true; - } - if (ev_orig->u64 != ev->u64) { - ev_orig->u64 = ev->u64; - ev_updated = true; - } - - break; + event = __atomic_load_n(&(ev_orig->event), + __ATOMIC_ACQUIRE); + + opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK&event; + + if (opa_id >= s->queue_id) + continue; + + if ((flow_id%s->nb_instance) == s->instance_id) { + ev_update = s->queue_id; + ev_update = (ev_update<event; + + s->pos = i+1; + + if ((event&OPDL_EVENT_MASK) != + ev_temp) { + __atomic_store_n(&(ev_orig->event), + ev_update, + __ATOMIC_RELEASE); + ev_updated = true; } - j++; + if (ev_orig->u64 != ev->u64) { + ev_orig->u64 = ev->u64; + ev_updated = true; + } + + break; } } @@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[], return -EINVAL; } } - if (num_deps > t->num_stages) { - PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)", - num_deps, t->num_stages); - return -EINVAL; - } + return 0; } @@ -1154,6 +1186,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s) } void +opdl_stage_set_queue_id(struct opdl_stage *s, + uint32_t queue_id) +{ + s->queue_id = queue_id; +} + +void opdl_ring_dump(const struct opdl_ring *t, FILE *f) { uint32_t i; diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h index 9e8c33e..751a59d 100644 --- a/drivers/event/opdl/opdl_ring.h +++ b/drivers/event/opdl/opdl_ring.h @@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries); struct opdl_stage * opdl_stage_create(struct opdl_ring *t, bool threadsafe); + +/** + * Set the internal queue id for each stage instance. + * + * @param s + * The pointer of stage instance. + * + * @param queue_id + * The value of internal queue id. 
+ */ +void +opdl_stage_set_queue_id(struct opdl_stage *s, + uint32_t queue_id); + /** * Prints information on opdl_ring instance and all its stages * @@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe); */ bool -opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, +opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, uint32_t index, bool atomic); #ifdef __cplusplus -- 2.7.5