From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail-wr0-f196.google.com (mail-wr0-f196.google.com [209.85.128.196]) by dpdk.org (Postfix) with ESMTP id 73C8B10BD for ; Mon, 30 Apr 2018 16:43:46 +0200 (CEST) Received: by mail-wr0-f196.google.com with SMTP id p18-v6so8293167wrm.1 for ; Mon, 30 Apr 2018 07:43:46 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=i8kOiqnxz15vqGHownXtq+V35f6x9mlqHsjvO9Zr6Wg=; b=V66q5kq76qVneORmbZRYRHbccEBj05UPpQ0TX1m6qxfaTt14LEzUxhRgFK/9Ez355B UCB/c9jsh5btVgCUP/zCsJYVhXDTF3dhVKNCuSEf53gw2JqhPivH8+4QyACP6OW0vS/k 0aMulOpn2bv35BSEmEMDLyRk/4LqwJ6INjVF/w/SrCTz0vsneWSXgr19/kbLEAh2lglF o3NY7XF5jZE8pyHvZ3Gz5dY0a5c+Cif7NsyVW97ltYTrWB2br7DI8cuW2bOIf7bEGd+o qTTnk99gPVqC2kH2ABR7JVTCKJauO/QwPeAMPivENegZ08xauZQC9FT0DgNsNrWy16Lj Jk9g== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=i8kOiqnxz15vqGHownXtq+V35f6x9mlqHsjvO9Zr6Wg=; b=AOVcfp5Z50GH5LUUMdmLrEHv51Yba1h8EXBGK8TKoSHf5Akhs9J2+SeZnIUzMgjIQC 0AJrzXUnpTaKpT5pMzjEBPaIiyZUu//6ZS7q6VilWdPl4oXi7Zqjoov4yB+FhbieDSZS tFKah6jvGW/p2lTCUJMQNOk0wTnF4/Ci1mx5MFh9vHzZiL/9pqWvQyyKHsNDmHkSJk60 jQq3lq9GRH4nUH7xn8TmqS0tIuhtnpyMK/K9w3u3F3NerZ8bFNSa8Tler2C6rivkmVqn 0Sg08L6nO4HdbrL9mxF31VaxKEv9MDC4BTg21aaNn8LuedQkoxSj9nkrLlfDBW8q1a9E ktRQ== X-Gm-Message-State: ALQs6tDP4cgqYbJ+ZpZ2uOyxRjb9qpXm9MkFo79H9zGCrK/aaYMgaqkK jBgY9ZqukgUmNjVRoq8tfEmCvsz8m5o= X-Google-Smtp-Source: AB8JxZqs6AsUSg0MoBL5goh6KIErs672B3BViFjK4lJRWKnRdMFW5+3uAjyw6M6clFX3grduhbjedQ== X-Received: by 2002:adf:b850:: with SMTP id u16-v6mr8975811wrf.64.1525099426051; Mon, 30 Apr 2018 07:43:46 -0700 (PDT) Received: from localhost ([2a00:23c5:be9a:5200:ce4c:82c0:d567:ecbb]) by smtp.gmail.com with ESMTPSA id v12-v6sm6505414wrm.68.2018.04.30.07.43.44 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Mon, 30 Apr 2018 07:43:45 -0700 (PDT) From: luca.boccassi@gmail.com To: Liang Ma Cc: Peter Mccarthy , Harry van Haaren , dpdk stable Date: Mon, 30 Apr 2018 15:41:05 +0100 Message-Id: <20180430144223.18657-50-luca.boccassi@gmail.com> X-Mailer: git-send-email 2.14.2 In-Reply-To: <20180430144223.18657-1-luca.boccassi@gmail.com> References: <20180430140606.4615-80-luca.boccassi@gmail.com> <20180430144223.18657-1-luca.boccassi@gmail.com> Subject: [dpdk-stable] patch 'event/opdl: fix atomic queue race condition' has been queued to stable release 18.02.2 X-BeenThere: stable@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: patches for DPDK stable branches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 30 Apr 2018 14:43:46 -0000 Hi, FYI, your patch has been queued to stable release 18.02.2 Note it hasn't been pushed to http://dpdk.org/browse/dpdk-stable yet. It will be pushed if I get no objections before 05/02/18. So please shout if anyone has objections. Thanks. Luca Boccassi --- >>From 459f030c20513c4f26d1d6504538374ff41a4d3d Mon Sep 17 00:00:00 2001 From: Liang Ma Date: Tue, 27 Mar 2018 15:18:12 +0100 Subject: [PATCH] event/opdl: fix atomic queue race condition [ upstream commit b770f952de678b4964d05ce49b0518919f152b83 ] If application link one atomic queue to multiple ports, and each worker core update flow_id, there will have a chance to hit race condition issue and lead to double processing same event. This fix solve the problem and eliminate the race condition issue. Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library") Signed-off-by: Liang Ma Signed-off-by: Peter Mccarthy Acked-by: Harry van Haaren --- drivers/event/opdl/opdl_evdev_init.c | 3 ++ drivers/event/opdl/opdl_ring.c | 93 +++++++++++++++++++++++++----------- drivers/event/opdl/opdl_ring.h | 16 ++++++- 3 files changed, 84 insertions(+), 28 deletions(-) diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c index 1454de533..582ad698f 100644 --- a/drivers/event/opdl/opdl_evdev_init.c +++ b/drivers/event/opdl/opdl_evdev_init.c @@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev) queue->ports[queue->nb_ports] = port; port->instance_id = queue->nb_ports; queue->nb_ports++; + opdl_stage_set_queue_id(stage_inst, + port->queue_id); + } else if (queue->q_pos == OPDL_Q_POS_END) { /* tx port */ diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c index eca7712bd..8aca481c9 100644 --- a/drivers/event/opdl/opdl_ring.c +++ b/drivers/event/opdl/opdl_ring.c @@ -25,7 +25,10 @@ #define OPDL_NAME_SIZE 64 -#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL) +#define OPDL_EVENT_MASK (0x00000000000FFFFFULL) +#define OPDL_FLOWID_MASK (0xFFFFF) +#define OPDL_OPA_MASK (0xFF) +#define OPDL_OPA_OFFSET (0x38) int opdl_logtype_driver; @@ -86,7 +89,6 @@ struct opdl_stage { */ uint32_t available_seq; uint32_t head; /* Current head for single-thread operation */ - uint32_t shadow_head; /* Shadow head for single-thread operation */ uint32_t nb_instance; /* Number of instances */ uint32_t instance_id; /* ID of this stage instance */ uint16_t num_claimed; /* Number of slots claimed */ @@ -102,6 +104,9 @@ struct opdl_stage { /* For managing disclaims in multi-threaded processing stages */ struct claim_manager pending_disclaims[RTE_MAX_LCORE] __rte_cache_aligned; + uint32_t shadow_head; /* Shadow head for single-thread operation */ + uint32_t queue_id; /* ID of Queue which is assigned to this stage */ + uint32_t pos; /* Atomic scan position */ } __rte_cache_aligned; /* Context for opdl_ring */ @@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, uint32_t num_entries, uint32_t *seq, bool block, bool atomic) { uint32_t i = 0, j = 0, offset; + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; void *get_slots; struct rte_event *ev; RTE_SET_USED(seq); @@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, for (j = 0; j < num_entries; j++) { ev = (struct rte_event *)get_slot(t, s->head+j); - if ((ev->flow_id%s->nb_instance) == s->instance_id) { + + event = __atomic_load_n(&(ev->event), + __ATOMIC_ACQUIRE); + + opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK & event; + + if (opa_id >= s->queue_id) + continue; + + if ((flow_id % s->nb_instance) == s->instance_id) { memcpy(entries_offset, ev, t->slot_size); entries_offset += t->slot_size; i++; @@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, s->head += num_entries; s->num_claimed = num_entries; s->num_event = i; + s->pos = 0; /* automatically disclaim entries if number of rte_events is zero */ if (unlikely(i == 0)) @@ -953,21 +972,26 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index) } bool -opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, +opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, uint32_t index, bool atomic) { - uint32_t i = 0, j = 0, offset; + uint32_t i = 0, offset; struct opdl_ring *t = s->t; struct rte_event *ev_orig = NULL; bool ev_updated = false; - uint64_t ev_temp = 0; + uint64_t ev_temp = 0; + uint64_t ev_update = 0; + + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; if (index > s->num_event) { PMD_DRV_LOG(ERR, "index is overflow"); return ev_updated; } - ev_temp = ev->event&OPDL_EVENT_MASK; + ev_temp = ev->event & OPDL_EVENT_MASK; if (!atomic) { offset = opdl_first_entry_id(s->seq, s->nb_instance, @@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, } } else { - for (i = 0; i < s->num_claimed; i++) { + for (i = s->pos; i < s->num_claimed; i++) { ev_orig = (struct rte_event *) get_slot(t, s->shadow_head+i); - if ((ev_orig->flow_id%s->nb_instance) == - s->instance_id) { + event = __atomic_load_n(&(ev_orig->event), + __ATOMIC_ACQUIRE); - if (j == index) { - if ((ev_orig->event&OPDL_EVENT_MASK) != - ev_temp) { - ev_orig->event = ev->event; - ev_updated = true; - } - if (ev_orig->u64 != ev->u64) { - ev_orig->u64 = ev->u64; - ev_updated = true; - } + opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK & event; - break; + if (opa_id >= s->queue_id) + continue; + + if ((flow_id % s->nb_instance) == s->instance_id) { + ev_update = s->queue_id; + ev_update = (ev_update << OPDL_OPA_OFFSET) + | ev->event; + + s->pos = i + 1; + + if ((event & OPDL_EVENT_MASK) != + ev_temp) { + __atomic_store_n(&(ev_orig->event), + ev_update, + __ATOMIC_RELEASE); + ev_updated = true; } - j++; + if (ev_orig->u64 != ev->u64) { + ev_orig->u64 = ev->u64; + ev_updated = true; + } + + break; } } @@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[], return -EINVAL; } } - if (num_deps > t->num_stages) { - PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)", - num_deps, t->num_stages); - return -EINVAL; - } + return 0; } @@ -1153,6 +1185,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s) return s->t; } +void +opdl_stage_set_queue_id(struct opdl_stage *s, + uint32_t queue_id) +{ + s->queue_id = queue_id; +} + void opdl_ring_dump(const struct opdl_ring *t, FILE *f) { diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h index 9e8c33e68..751a59dbc 100644 --- a/drivers/event/opdl/opdl_ring.h +++ b/drivers/event/opdl/opdl_ring.h @@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries); struct opdl_stage * opdl_stage_create(struct opdl_ring *t, bool threadsafe); + +/** + * Set the internal queue id for each stage instance. + * + * @param s + * The pointer of stage instance. + * + * @param queue_id + * The value of internal queue id. + */ +void +opdl_stage_set_queue_id(struct opdl_stage *s, + uint32_t queue_id); + /** * Prints information on opdl_ring instance and all its stages * @@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe); */ bool -opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, +opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, uint32_t index, bool atomic); #ifdef __cplusplus -- 2.14.2