From mboxrd@z Thu Jan  1 00:00:00 1970
From: "Ananyev, Konstantin" <konstantin.ananyev@intel.com>
To: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>,
 "olivier.matz@6wind.com" <olivier.matz@6wind.com>
CC: "gavin.hu@arm.com" <gavin.hu@arm.com>, "dev@dpdk.org" <dev@dpdk.org>,
 "nd@arm.com" <nd@arm.com>
Date: Wed, 26 Feb 2020 20:38:45 +0000
Message-ID: <SN6PR11MB25583DDB5F09983CCE9DB5769AEA0@SN6PR11MB2558.namprd11.prod.outlook.com>
References: <20200224203931.21256-1-honnappa.nagarahalli@arm.com>
 <20200224203931.21256-2-honnappa.nagarahalli@arm.com>
In-Reply-To: <20200224203931.21256-2-honnappa.nagarahalli@arm.com>
Subject: Re: [dpdk-dev] [RFC 1/1] lib/ring: add scatter gather and serial
	dequeue APIs


Hi Honnappa,

> Add scatter gather APIs to avoid intermediate memcpy. Serial
> dequeue APIs are added to support access to ring elements
> before actual dequeue.
>=20
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
> ---
>  lib/librte_ring/Makefile           |   1 +
>  lib/librte_ring/meson.build        |   1 +
>  lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
>  lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_generic.h |  93 +++++++
>  5 files changed, 610 insertions(+)

As you already noticed, this patch overlaps quite a bit with another one:
http://patches.dpdk.org/patch/66006/

Though it seems there are a few significant differences in
our approaches (both API and implementation).
So we probably need to come up with some common
view first, before moving forward with a unified version.
To start the discussion, I have put together some comments, please see below.

I don't see changes in rte_ring.h itself, but I suppose
that's just because it is an RFC and they would be added in later versions?
Another similar question: there seems to be only the _bulk_
(RTE_RING_QUEUE_FIXED) mode; I suppose _burst_ will also be added in later
versions?

> diff --git a/lib/librte_ring/rte_ring_elem_sg.h b/lib/librte_ring/rte_ring_elem_sg.h
> new file mode 100644
> index 000000000..a73f4fbfe
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem_sg.h
> @@ -0,0 +1,417 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2020 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_SG_H_
> +#define _RTE_RING_ELEM_SG_H_
> +
> +/**
> + * @file
> + * RTE Ring with
> + * 1) user defined element size
> + * 2) scatter gather feature to copy objects to/from the ring
> + * 3) ability to reserve, consume/discard elements in the ring
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +#include "rte_ring_elem.h"
> +
> +/* Between load and load. there might be cpu reorder in weak model
> + * (powerpc/arm).
> + * There are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direction load_acquire/store_release barrier,defined by
> + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> + * It depends on performance test results.
> + * By default, move common functions to rte_ring_generic.h
> + */
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include "rte_ring_c11_mem.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	uint64_t *ring = (uint64_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	if (esize == 8)
> +		return __rte_ring_get_elem_addr_64(r, head,
> +						num, dst1, n1, dst2);
> +	else if (esize == 16)
> +		return __rte_ring_get_elem_addr_128(r, head,
> +						num, dst1, n1, dst2);
> +	else {
> +		uint32_t idx, scale, nr_idx;
> +		uint32_t *ring = (uint32_t *)&r[1];
> +
> +		/* Normalize to uint32_t */
> +		scale = esize / sizeof(uint32_t);
> +		idx = head & r->mask;
> +		nr_idx = idx * scale;
> +
> +		*dst1 = ring + nr_idx;
> +		*n1 = num;
> +
> +		if (idx + num > r->size) {
> +			*n1 = num - (r->size - idx - 1);
> +			*dst2 = ring;
> +		}
> +	}
> +}
> +
> +/**
> + * @internal Reserve ring elements to enqueue several objects on the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of elements to reserve in the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer reserve
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param free_space
> + *   returns the amount of space after the reserve operation has finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst1
> + *   Pointer to location in the ring to copy the data.
> + * @param n1
> + *   Number of elements to copy at dst1
> + * @param dst2
> + *   In case of ring wrap around, this pointer provides the location to
> + *   copy the remaining elements. The number of elements to copy at this
> + *   location is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,


I do understand the purpose of reserve, then either commit/abort, for the
serial sync mode, but what is the purpose of the non-serial version of
reserve/commit?
In the non-serial MP/MC case, after _reserve_(n) you always have to do
_commit_(n) - you can't reduce the number of elements, or do an _abort_.
Again, you cannot avoid the memcpy(n) here anyhow.
So what is the point of these functions for the non-serial case?
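
In other words, in the non-serial mode the pair necessarily collapses
into a plain enqueue; a rough sketch using the signatures from this patch
(error handling trimmed):

unsigned int old_head, new_head, free_space;
void *dst;

if (rte_ring_mp_enqueue_elem_reserve(r, esize, &old_head, &new_head,
		&free_space, &dst) == 0) {
	/* fill the reserved slot */
	memcpy(dst, &obj, esize);
	/* has to follow unconditionally, for the same reservation */
	rte_ring_mp_enqueue_elem_commit(r, old_head, new_head);
}
/* ...which is just rte_ring_mp_enqueue_elem(r, &obj, esize) in two steps */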

BTW, I think it would be good to have a serial version of _enqueue_ too.

> +		unsigned int n, enum rte_ring_queue_behavior behavior,
> +		unsigned int is_sp, unsigned int *old_head,
> +		unsigned int *new_head, unsigned int *free_space,
> +		void **dst1, unsigned int *n1, void **dst2)

I do understand the intention to avoid memcpy(), but the proposed API
seems overcomplicated, error-prone, and not very convenient for the user.
I don't think that avoiding memcpy() will save us that many cycles here,
so it is probably better to keep the API model a bit more regular:

n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space);
...
/* performs actual memcpy(), m <= n */
rte_ring_mp_serial_enqueue_bulk_commit(ring, obj, m);

/* performs actual memcpy() for num elems */
n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space);
....
/* m <= n */
rte_ring_mp_serial_dequeue_bulk_commit(ring, obj, m);

Plus, we can have the usual enqueue/dequeue API for the serial sync mode:
rte_ring_serial_(enqueue/dequeue)_(bulk/burst)
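
The serial dequeue model above would also cover the "examine before
dequeue" use case directly; a rough usage sketch (names follow the
snippet above; struct elem, BURST and process() are just placeholders
for app code):

unsigned int n, m, avail;
struct elem obj[BURST];

/* reserve copies up to BURST elements out of the ring */
n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, BURST, &avail);
/* the app examines them and consumes only m <= n */
m = process(obj, n);
rte_ring_mp_serial_dequeue_bulk_commit(ring, obj, m);
/* the remaining n - m elements stay on the ring */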

> +{
> +	uint32_t free_entries;
> +
> +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> +			old_head, new_head, &free_entries);
> +
> +	if (n == 0)
> +		goto end;

Here and in other similar places, why not just 'return 0;'?

> +
> +	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
> +
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +
> +end:
> +	return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for enqueue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
> +		unsigned int old_head, unsigned int new_head,
> +		unsigned int is_sp)
> +{
> +	update_tail(&r->prod, old_head, new_head, is_sp, 1);
> +}
> +
> +/**
> + * Reserve one element for enqueuing one object on a ring
> + * (multi-producers safe). Application must call
> + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Producer's head index before reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param new_head
> + *   Producer's head index after reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param free_space
> + *   Returns the amount of space after the reservation operation has finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst
> + *   Pointer to location in the ring to copy the data.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to reserve; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> +		unsigned int *old_head, unsigned int *new_head,
> +		unsigned int *free_space, void **dst)
> +{
> +	unsigned int n;
> +
> +	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
> +			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
> +			free_space, dst, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for enqueue) in a ring
> + * (multi-producers safe). This API completes the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + * @param new_head
> + *   Producer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +		unsigned int new_head)
> +{
> +	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
> +}
> +
> +/**
> + * @internal Reserve elements to dequeue several objects on the ring.
> + * This function blocks if there are elements reserved already.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to reserve in the ring
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src1
> + *   Pointer to location in the ring to copy the data from.
> + * @param n1
> + *   Number of elements to copy from src1
> + * @param src2
> + *   In case of wrap around in the ring, this pointer provides the location
> + *   to copy the remaining elements from. The number of elements to copy from
> + *   this pointer is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
> +		unsigned int esize, unsigned int n,
> +		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> +		unsigned int *old_head, unsigned int *new_head,
> +		unsigned int *available, void **src1, unsigned int *n1,
> +		void **src2)
> +{
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
> +			old_head, new_head, &entries);
> +
> +	if (n == 0)
> +		goto end;
> +
> +	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +
> +end:
> +	return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for dequeue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> +		unsigned int old_head, unsigned int new_head,
> +		unsigned int is_sc)
> +{

I think it is a bit dangerous and error-prone to let the user
specify old_head/new_head manually.
It seems better to have just _commit(ring, num) - see above.
That way the user doesn't have to calculate the new head manually,
plus we can have a check that ring.head - ring.tail >= num.
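
A minimal sketch of what such a commit could look like (illustrative
only, consumer side, barriers simplified; in the serial mode the
reserved count is cons.head - cons.tail):

/* consume 'num' out of the previously reserved elements */
static __rte_always_inline int
rte_ring_serial_dequeue_commit(struct rte_ring *r, unsigned int num)
{
	uint32_t tail = r->cons.tail;

	/* can't commit more than was reserved */
	if (num > r->cons.head - tail)
		return -EINVAL;

	/* shrink the reservation if the user commits less than reserved */
	r->cons.head = tail + num;
	/* ensure all reads of ring data complete before freeing the slots */
	rte_smp_rmb();
	r->cons.tail = tail + num;
	return 0;
}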

> +	update_tail(&r->cons, old_head, new_head, is_sc, 1);

I think update_tail() is not enough here,
as in some cases we need to update ring.head also:
let's say the user reserved 2 elems, but then decided to commit only one.
So I think we need a special new function instead of update_tail() here.
BTW, in HTS I use an atomic 64-bit read/write to get/set both head and tail
in one go.
This is not really required - two 32-bit ops would work too, I think.
As usual, both ways have some pros and cons:
using 64-bit ops might be faster on a 64-bit target, plus it is less
error-prone (no need to think about head/tail read/write ordering, etc.),
though for a 32-bit target it would mean some extra overhead.
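
Roughly the idea (a sketch only, not the exact definitions from the
HTS patch; the names are illustrative):

/* head and tail packed into one 64-bit word, so both can be read or
 * written with a single atomic operation */
union ring_ht_pos {
	uint64_t raw;
	struct {
		uint32_t head;
		uint32_t tail;
	} pos;
};

static inline void
ht_snapshot(const union ring_ht_pos *ht, union ring_ht_pos *p)
{
	/* one acquire-load yields a mutually consistent head/tail pair */
	p->raw = __atomic_load_n(&ht->raw, __ATOMIC_ACQUIRE);
}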

> +}
> +
> +/**
> + * Reserve one element on a ring for dequeue. This function blocks if there
> + * are elements reserved already. Application must call
> + * 'rte_ring_do_dequeue_elem_commit' or
> + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Consumer's head index before reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param new_head
> + *   Consumer's head index after reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src
> + *   Pointer to location in the ring to copy the data from.
> + * @return
> + *   - 0: Success; elements reserved
> + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
> +		unsigned int *old_head, unsigned int *new_head,
> +		unsigned int *available, void **src)
> +{
> +	unsigned int n;
> +
> +	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> +			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> +			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for dequeue) in a ring
> + * (multi-consumer safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + * @param new_head
> + *   Consumer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +		unsigned int new_head)
> +{
> +	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
> +						r->cons.single);
> +}
> +
> +/**
> + * Discard previously reserved elements (for dequeue) in a ring.
> + *
> + * @warning
> + * This API can be called only if the ring elements were reserved
> + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
> +{
> +	__rte_ring_revert_head(&r->cons);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_ELEM_SG_H_ */
> diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
> index 953cdbbd5..8d7a7ffcc 100644
> --- a/lib/librte_ring/rte_ring_generic.h
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
>  	return n;
>  }
>
> +/**
> + * @internal This function updates the consumer head if there are no
> + * prior reserved elements on the ring.
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to dequeue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
> +		unsigned int n, enum rte_ring_queue_behavior behavior,
> +		uint32_t *old_head, uint32_t *new_head,
> +		uint32_t *entries)
> +{
> +	unsigned int max = n;
> +	int success;
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = max;
> +
> +		*old_head = r->cons.head;
> +
> +		/* add rmb barrier to avoid load/load reorder in weak
> +		 * memory model. It is noop on x86
> +		 */
> +		rte_smp_rmb();
> +
> +		/* Ensure that cons.tail and cons.head are the same */
> +		if (*old_head != r->cons.tail) {
> +			rte_pause();
> +
> +			success = 0;
> +			continue;
> +		}
> +
> +		/* The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = (r->prod.tail - *old_head);
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +		if (unlikely(n == 0))
> +			return 0;
> +
> +		*new_head = *old_head + n;
> +		if (is_sc) {
> +			r->cons.head = *new_head;
> +			rte_smp_rmb();
> +			success = 1;

I don't think we need to worry about the SC case in this function.
For sc(/sp)_serial (if we need such a mode) we can probably use the normal
move_(cons/prod)_head().

> +		} else {
> +			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +					*new_head);
> +		}
> +	} while (unlikely(success == 0));
> +	return n;
> +}
> +
> +/**
> + * @internal This function updates the head to match the tail
> + *
> + * @param ht
> + *   A pointer to the ring's head-tail structure
> + */
> +static __rte_always_inline void
> +__rte_ring_revert_head(struct rte_ring_headtail *ht)
> +{
> +	/* Discard the reserved ring elements. */
> +	ht->head = ht->tail;
> +}
> +
>  #endif /* _RTE_RING_GENERIC_H_ */
> --
> 2.17.1