From mboxrd@z Thu Jan 1 00:00:00 1970
From: "Ananyev, Konstantin" <konstantin.ananyev@intel.com>
To: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>, olivier.matz@6wind.com
CC: gavin.hu@arm.com, dev@dpdk.org, nd@arm.com
Date: Wed, 26 Feb 2020 20:38:45 +0000
References: <20200224203931.21256-1-honnappa.nagarahalli@arm.com>
	<20200224203931.21256-2-honnappa.nagarahalli@arm.com>
In-Reply-To: <20200224203931.21256-2-honnappa.nagarahalli@arm.com>
Subject: Re: [dpdk-dev]
 [RFC 1/1] lib/ring: add scatter gather and serial dequeue APIs

Hi Honnappa,

> Add scatter gather APIs to avoid intermediate memcpy. Serial
> dequeue APIs are added to support access to ring elements
> before actual dequeue.
>
> Signed-off-by: Honnappa Nagarahalli
> Reviewed-by: Gavin Hu
> Reviewed-by: Ola Liljedahl
> ---
>  lib/librte_ring/Makefile           |   1 +
>  lib/librte_ring/meson.build        |   1 +
>  lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
>  lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_generic.h |  93 +++++++
>  5 files changed, 610 insertions(+)

As you already noticed, this patch overlaps quite a bit with another one:
http://patches.dpdk.org/patch/66006/
There are a few significant differences in our approaches, though (both API
and implementation), so we probably need to come up with some common view
first, before moving forward with a unified version.
To start the discussion, I put some comments inline, please see below.

I don't see changes in rte_ring.h itself, but I suppose that's just because
this is an RFC and they will be added in later versions?
A similar question: there seems to be only the _bulk_ (RTE_RING_QUEUE_FIXED)
mode; I suppose _burst_ will also be added in later versions?

> diff --git a/lib/librte_ring/rte_ring_elem_sg.h b/lib/librte_ring/rte_ring_elem_sg.h
> new file mode 100644
> index 000000000..a73f4fbfe
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem_sg.h
> @@ -0,0 +1,417 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2020 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_SG_H_
> +#define _RTE_RING_ELEM_SG_H_
> +
> +/**
> + * @file
> + * RTE Ring with
> + * 1) user defined element size
> + * 2) scatter gather feature to copy objects to/from the ring
> + * 3) ability to reserve, consume/discard elements in the ring
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +#include
> +
> +#include "rte_ring.h"
> +#include "rte_ring_elem.h"
> +
> +/* Between load and load. there might be cpu reorder in weak model
> + * (powerpc/arm).
> + * There are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direction load_acquire/store_release barrier,defined by
> + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> + * It depends on performance test results.
> + * By default, move common functions to rte_ring_generic.h
> + */
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include "rte_ring_c11_mem.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	uint64_t *ring = (uint64_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	if (esize == 8)
> +		return __rte_ring_get_elem_addr_64(r, head,
> +			num, dst1, n1, dst2);
> +	else if (esize == 16)
> +		return __rte_ring_get_elem_addr_128(r, head,
> +			num, dst1, n1, dst2);
> +	else {
> +		uint32_t idx, scale, nr_idx;
> +		uint32_t *ring = (uint32_t *)&r[1];
> +
> +		/* Normalize to uint32_t */
> +		scale = esize / sizeof(uint32_t);
> +		idx = head & r->mask;
> +		nr_idx = idx * scale;
> +
> +		*dst1 = ring + nr_idx;
> +		*n1 = num;
> +
> +		if (idx + num > r->size) {
> +			*n1 = num - (r->size - idx - 1);
> +			*dst2 = ring;
> +		}
> +	}
> +}
> +
> +/**
> + * @internal Reserve ring elements to enqueue several objects on the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of elements to reserve in the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer reserve
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param free_space
> + *   returns the amount of space after the reserve operation has finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst1
> + *   Pointer to location in the ring to copy the data.
> + * @param n1
> + *   Number of elements to copy at dst1
> + * @param dst2
> + *   In case of ring wrap around, this pointer provides the location to
> + *   copy the remaining elements. The number of elements to copy at this
> + *   location is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,

I do understand the purpose of reserve followed by either commit or abort
for the serial sync mode, but what is the purpose of the non-serial version
of reserve/commit?
In the serial MP/MC case, after _reserve_(n) you always have to do _commit_(n) -
you can't reduce the number of elements or do an _abort_.
And you cannot avoid the memcpy(n) here anyhow.
So what is the point of these functions for the non-serial case?

BTW, I think it would be good to have a serial version of _enqueue_ too.

> +		unsigned int n, enum rte_ring_queue_behavior behavior,
> +		unsigned int is_sp, unsigned int *old_head,
> +		unsigned int *new_head, unsigned int *free_space,
> +		void **dst1, unsigned int *n1, void **dst2)

I do understand the intention to avoid memcpy(), but the proposed API seems
overcomplicated, error-prone, and not very convenient for the user.
I don't think that avoiding memcpy() will save us that many cycles here,
so it is probably better to keep the API model a bit more regular
(see also the rough usage sketch at the end of this mail):

n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space);
...
/* performs the actual memcpy(), m <= n */
rte_ring_mp_serial_enqueue_bulk_commit(ring, obj, m);

/* performs the actual memcpy for num elems */
n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space);
...
/* m <= n */
rte_ring_mp_serial_dequeue_bulk_commit(ring, obj, m);

Plus, we can have the usual enqueue/dequeue API for the serial sync mode:
rte_ring_serial_(enqueue/dequeue)_(bulk/burst)

> +{
> +	uint32_t free_entries;
> +
> +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> +			old_head, new_head, &free_entries);
> +
> +	if (n == 0)
> +		goto end;

Here and in other similar places, why not just 'return 0;'?

> +
> +	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
> +
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +
> +end:
> +	return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for enqueue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
> +	unsigned int old_head, unsigned int new_head,
> +	unsigned int is_sp)
> +{
> +	update_tail(&r->prod, old_head, new_head, is_sp, 1);
> +}
> +
> +/**
> + * Reserve one element for enqueuing one object on a ring
> + * (multi-producers safe). Application must call
> + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Producer's head index before reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param new_head
> + *   Producer's head index after reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param free_space
> + *   Returns the amount of space after the reservation operation has finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst
> + *   Pointer to location in the ring to copy the data.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to reserve; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> +	unsigned int *old_head, unsigned int *new_head,
> +	unsigned int *free_space, void **dst)
> +{
> +	unsigned int n;
> +
> +	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
> +			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
> +			free_space, dst, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for enqueue) in a ring
> + * (multi-producers safe). This API completes the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + * @param new_head
> + *   Producer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +	unsigned int new_head)
> +{
> +	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
> +}
> +
> +/**
> + * @internal Reserve elements to dequeue several objects on the ring.
> + * This function blocks if there are elements reserved already.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to reserve in the ring
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src1
> + *   Pointer to location in the ring to copy the data from.
> + * @param n1
> + *   Number of elements to copy from src1
> + * @param src2
> + *   In case of wrap around in the ring, this pointer provides the location
> + *   to copy the remaining elements from. The number of elements to copy from
> + *   this pointer is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
> +	unsigned int esize, unsigned int n,
> +	enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> +	unsigned int *old_head, unsigned int *new_head,
> +	unsigned int *available, void **src1, unsigned int *n1,
> +	void **src2)
> +{
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
> +			old_head, new_head, &entries);
> +
> +	if (n == 0)
> +		goto end;
> +
> +	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +
> +end:
> +	return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for dequeue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> +	unsigned int old_head, unsigned int new_head,
> +	unsigned int is_sc)
> +{

I think it is a bit dangerous and error-prone to let the user specify
old_head/new_head manually. It seems better to have just _commit(ring, num) -
see above. That way the user doesn't have to calculate the new head manually,
plus we can add a check that ring.tail - ring.head >= num.

> +	update_tail(&r->cons, old_head, new_head, is_sc, 1);

I think update_tail() is not enough here, as in some cases we also need to
update ring.head: say the user reserved 2 elements, but then decided to
commit only one.
So I think we need a special new function here instead of update_tail()
(a rough sketch of what I mean is further down in this mail).

BTW, in HTS I use an atomic 64-bit read/write to get/set both head and tail
in one go. This is not strictly required - two 32-bit ops would work too,
I think. As usual, both ways have some pros and cons: using 64-bit ops might
be faster on a 64-bit target, plus it is less error-prone (no need to think
about head/tail read/write ordering, etc.), though for a 32-bit target it
would mean some extra overhead.

> +}
> +
> +/**
> + * Reserve one element on a ring for dequeue. This function blocks if there
> + * are elements reserved already. Application must call
> + * 'rte_ring_do_dequeue_elem_commit' or
> + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Consumer's head index before reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param new_head
> + *   Consumer's head index after reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src
> + *   Pointer to location in the ring to copy the data from.
> + * @return
> + *   - 0: Success; elements reserved
> + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
> +	unsigned int *old_head, unsigned int *new_head,
> +	unsigned int *available, void **src)
> +{
> +	unsigned int n;
> +
> +	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> +			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> +			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for dequeue) in a ring
> + * (multi-consumer safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + * @param new_head
> + *   Consumer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +	unsigned int new_head)
> +{
> +	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
> +			r->cons.single);
> +}
> +
> +/**
> + * Discard previously reserved elements (for dequeue) in a ring.
> + *
> + * @warning
> + * This API can be called only if the ring elements were reserved
> + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
> +{
> +	__rte_ring_revert_head(&r->cons);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_ELEM_SG_H_ */
> diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
> index 953cdbbd5..8d7a7ffcc 100644
> --- a/lib/librte_ring/rte_ring_generic.h
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
>  	return n;
>  }
>
> +/**
> + * @internal This function updates the consumer head if there are no
> + * prior reserved elements on the ring.
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to dequeue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
> +	unsigned int n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *old_head, uint32_t *new_head,
> +	uint32_t *entries)
> +{
> +	unsigned int max = n;
> +	int success;
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = max;
> +
> +		*old_head = r->cons.head;
> +
> +		/* add rmb barrier to avoid load/load reorder in weak
> +		 * memory model. It is noop on x86
> +		 */
> +		rte_smp_rmb();
> +
> +		/* Ensure that cons.tail and cons.head are the same */
> +		if (*old_head != r->cons.tail) {
> +			rte_pause();
> +
> +			success = 0;
> +			continue;
> +		}
> +
> +		/* The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = (r->prod.tail - *old_head);
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +		if (unlikely(n == 0))
> +			return 0;
> +
> +		*new_head = *old_head + n;
> +		if (is_sc) {
> +			r->cons.head = *new_head;
> +			rte_smp_rmb();
> +			success = 1;

I don't think we need to worry about the SC case in this function.
For sc(/sp)_serial (if we need such a mode) we can probably use the normal
move_(cons/prod)_head().
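Coming back to the commit discussion above: below is a very rough, untested
sketch of the kind of dequeue commit helper I have in mind. The name, the
check and the details are placeholders only, just to illustrate the idea:
the caller passes only the number of elements it actually consumed, and the
head is rewound when that is less than what was reserved.

static __rte_always_inline void
__rte_ring_do_dequeue_elem_commit_n(struct rte_ring *r, unsigned int num,
	unsigned int is_sc)
{
	/* In the serial scheme cons.tail still holds the head value from
	 * before the reserve, so the size of the outstanding reservation
	 * is cons.head - cons.tail.
	 */
	uint32_t old_head = r->cons.tail;
	uint32_t reserved = r->cons.head - old_head;

	RTE_ASSERT(num <= reserved);

	/* Committing fewer elements than were reserved: move the head
	 * back so the remaining elements stay in the ring.
	 */
	if (num != reserved)
		r->cons.head = old_head + num;

	update_tail(&r->cons, old_head, old_head + num, is_sc, 0);
}

With something along those lines the old_head/new_head output parameters of
the reserve functions would not be needed at all.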
> +		} else {
> +			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +					*new_head);
> +		}
> +	} while (unlikely(success == 0));
> +	return n;
> +}
> +
> +/**
> + * @internal This function updates the head to match the tail
> + *
> + * @param ht
> + *   A pointer to the ring's head-tail structure
> + */
> +static __rte_always_inline void
> +__rte_ring_revert_head(struct rte_ring_headtail *ht)
> +{
> +	/* Discard the reserved ring elements. */
> +	ht->head = ht->tail;
> +}
> +
>  #endif /* _RTE_RING_GENERIC_H_ */
> --
> 2.17.1
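Finally, to make the API model I am arguing for above a bit more concrete,
here is a rough consumer-side usage sketch for the serial dequeue ("peek")
case. None of these functions exist today - the names are placeholders that
just follow the naming scheme proposed earlier in this mail (with _mc_ for
the consumer side), and the sketch is not meant to compile as is:

struct my_elem {
	uint32_t type;
	uint32_t len;
	uint64_t data;
};

static int
process_burst(struct rte_ring *r)
{
	struct my_elem objs[32];
	unsigned int n, m, avail;

	/* Reserve up to 32 elements; they are copied into objs[], but remain
	 * "locked" in the ring until commit/abort (serial mode).
	 */
	n = rte_ring_mc_serial_dequeue_bulk_reserve(r, objs, RTE_DIM(objs), &avail);
	if (n == 0)
		return 0;

	/* Inspect the elements before deciding how many to actually consume. */
	for (m = 0; m < n; m++)
		if (objs[m].type == 0) /* application specific stop condition */
			break;

	if (m == 0) {
		/* Consume nothing, make all reserved elements visible again. */
		rte_ring_mc_serial_dequeue_bulk_abort(r);
		return 0;
	}

	/* Consume only the first m (m <= n) elements. */
	rte_ring_mc_serial_dequeue_bulk_commit(r, objs, m);
	return m;
}

The same reserve/commit pair on the producer side would cover the enqueue
case, and the usual enqueue/dequeue calls would remain available for the
serial sync mode.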