From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <dev-bounces@dpdk.org>
Received: from dpdk.org (dpdk.org [92.243.14.124])
	by inbox.dpdk.org (Postfix) with ESMTP id B955BA04B6;
	Tue, 13 Oct 2020 00:31:43 +0200 (CEST)
Received: from [92.243.14.124] (localhost [127.0.0.1])
	by dpdk.org (Postfix) with ESMTP id 212561D9E7;
	Tue, 13 Oct 2020 00:31:41 +0200 (CEST)
Received: from EUR04-HE1-obe.outbound.protection.outlook.com
 (mail-eopbgr70053.outbound.protection.outlook.com [40.107.7.53])
 by dpdk.org (Postfix) with ESMTP id E8F861D99C
 for <dev@dpdk.org>; Tue, 13 Oct 2020 00:31:38 +0200 (CEST)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=armh.onmicrosoft.com; 
 s=selector2-armh-onmicrosoft-com;
 h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-SenderADCheck;
 bh=4i2R8ET7+DhkpQpEnuKVK51ba80+U7f2DjNz0YGoY78=;
 b=oi3z1rdOb1qOG8OH/F6NypXP4v3agfHKs9iRsrsUEi3lIEqbufl3x/81TEmcW8TD0apTI+JuXO1gcJdHHuOS3pwbSBoqUJ2yoc9If3/CUCzi1ZKRMAp4gmGDxFhCVzEMCDImulBmEiUnzfHxWiIlQlkUiBQUP1uQ4mdEEO0QwqQ=
Received: from MRXP264CA0018.FRAP264.PROD.OUTLOOK.COM (2603:10a6:500:15::30)
 by VE1PR08MB4734.eurprd08.prod.outlook.com (2603:10a6:802:a1::16) with
 Microsoft SMTP Server (version=TLS1_2,
 cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.3455.26; Mon, 12 Oct
 2020 22:31:35 +0000
Received: from VE1EUR03FT047.eop-EUR03.prod.protection.outlook.com
 (2603:10a6:500:15:cafe::f7) by MRXP264CA0018.outlook.office365.com
 (2603:10a6:500:15::30) with Microsoft SMTP Server (version=TLS1_2,
 cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.3455.21 via Frontend
 Transport; Mon, 12 Oct 2020 22:31:35 +0000
X-MS-Exchange-Authentication-Results: spf=pass (sender IP is 63.35.35.123)
 smtp.mailfrom=arm.com; dpdk.org; dkim=pass (signature was verified)
 header.d=armh.onmicrosoft.com;dpdk.org; dmarc=pass action=none
 header.from=arm.com;
Received-SPF: Pass (protection.outlook.com: domain of arm.com designates
 63.35.35.123 as permitted sender) receiver=protection.outlook.com;
 client-ip=63.35.35.123; helo=64aa7808-outbound-1.mta.getcheckrecipient.com;
Received: from 64aa7808-outbound-1.mta.getcheckrecipient.com (63.35.35.123) by
 VE1EUR03FT047.mail.protection.outlook.com (10.152.19.218) with
 Microsoft SMTP
 Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id
 15.20.3455.23 via Frontend Transport; Mon, 12 Oct 2020 22:31:34 +0000
Received: ("Tessian outbound a0bffebca527:v64");
 Mon, 12 Oct 2020 22:31:33 +0000
X-CR-MTA-TID: 64aa7808
Received: from 11bcddf45002.2
 by 64aa7808-outbound-1.mta.getcheckrecipient.com id
 0CFB80BB-EF60-40EE-86B3-A9A288844F85.1; 
 Mon, 12 Oct 2020 22:31:28 +0000
Received: from EUR04-HE1-obe.outbound.protection.outlook.com
 by 64aa7808-outbound-1.mta.getcheckrecipient.com with ESMTPS id 11bcddf45002.2
 (version=TLSv1.2 cipher=ECDHE-RSA-AES256-GCM-SHA384);
 Mon, 12 Oct 2020 22:31:28 +0000
ARC-Seal: i=1; a=rsa-sha256; s=arcselector9901; d=microsoft.com; cv=none;
 b=Ze5Z1YBYV98+x9x2PwwoTycavx3jhOBAla30AT13sT4CszmyGaYXZQdlzGjHSi6NVQd42x3UdllK7EiiOtQOXpDtqU53QfoLxYxBoBACH35m2t47IPCqjpvBiylEKd3Cq2HB8PmX3UEXJlRKzWIZomNlkv6/rY1CUl7IqLf2xWlXaWTCHUhoC3Cgtl/3nZ+5zzGSJoW6GEzv3bG4tHnTFuyFnFdOHZwp6rdg9ITjoLFKExWCofI0/iNpV8N/1XzFYyUbNhBi6iKoWRDFIGsWUyh2nz3+S1L1zw6NBfu+rZstccsmquYOWYAUU0AEqGD/RSfpi0ye25Cp8Dy8CCl4qQ==
ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=microsoft.com; 
 s=arcselector9901;
 h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-SenderADCheck;
 bh=4i2R8ET7+DhkpQpEnuKVK51ba80+U7f2DjNz0YGoY78=;
 b=XY/BFuNn609An9z0vvLIm06sYRz8GfHnvXE4yrajNdcAHEXr0tjHI79APZuinfWJfreDm3V54PbqTFnk30zk/rr0bwcZwpang99THPkX+8VbnfdgkmydrERa0rDA5jqp3heeB2kiLDXgx3IoO/H1kBBeb+4YxoFRXWzlAREnjCT05UCHGVRzgFlyoAu6X70433mier7Y7of+WPEmaCCfDKoAkFqvl6KOH1fW0sKfAx3IB/B3WFcUyR7YjtExyTcmJ49wfpZbfQwz7kO5nZbXLzCKXT+/VK/asahCBAHNSMTjVGF4bpeIvfIxU1BZ7hm4ixEuQ6SHianMIsJdCbtEiA==
ARC-Authentication-Results: i=1; mx.microsoft.com 1; spf=pass
 smtp.mailfrom=arm.com; dmarc=pass action=none header.from=arm.com; dkim=pass
 header.d=arm.com; arc=none
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=armh.onmicrosoft.com; 
 s=selector2-armh-onmicrosoft-com;
 h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-SenderADCheck;
 bh=4i2R8ET7+DhkpQpEnuKVK51ba80+U7f2DjNz0YGoY78=;
 b=oi3z1rdOb1qOG8OH/F6NypXP4v3agfHKs9iRsrsUEi3lIEqbufl3x/81TEmcW8TD0apTI+JuXO1gcJdHHuOS3pwbSBoqUJ2yoc9If3/CUCzi1ZKRMAp4gmGDxFhCVzEMCDImulBmEiUnzfHxWiIlQlkUiBQUP1uQ4mdEEO0QwqQ=
Received: from DBAPR08MB5814.eurprd08.prod.outlook.com (2603:10a6:10:1b1::6)
 by DB8PR08MB5241.eurprd08.prod.outlook.com (2603:10a6:10:e2::17) with
 Microsoft SMTP Server (version=TLS1_2,
 cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.3455.28; Mon, 12 Oct
 2020 22:31:25 +0000
Received: from DBAPR08MB5814.eurprd08.prod.outlook.com
 ([fe80::7814:9c1:781f:475d]) by DBAPR08MB5814.eurprd08.prod.outlook.com
 ([fe80::7814:9c1:781f:475d%4]) with mapi id 15.20.3455.029; Mon, 12 Oct 2020
 22:31:25 +0000
From: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
To: "Ananyev, Konstantin" <konstantin.ananyev@intel.com>, "dev@dpdk.org"
 <dev@dpdk.org>
CC: "olivier.matz@6wind.com" <olivier.matz@6wind.com>,
 "david.marchand@redhat.com" <david.marchand@redhat.com>, nd <nd@arm.com>,
 Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>, nd <nd@arm.com>
Thread-Topic: [RFC v2 1/1] lib/ring: add scatter gather APIs
Thread-Index: AQHWoLO/yqeV7FdtXkWOtLPuFIVaL6mUZvWA
Date: Mon, 12 Oct 2020 22:31:25 +0000
Message-ID: <DBAPR08MB58141A517542254AB3B4671498070@DBAPR08MB5814.eurprd08.prod.outlook.com>
References: <20200224203931.21256-1-honnappa.nagarahalli@arm.com>
 <20201006132905.46205-1-honnappa.nagarahalli@arm.com>
 <20201006132905.46205-2-honnappa.nagarahalli@arm.com>
 <BYAPR11MB3301820B94597437B6F968F59A070@BYAPR11MB3301.namprd11.prod.outlook.com>
In-Reply-To: <BYAPR11MB3301820B94597437B6F968F59A070@BYAPR11MB3301.namprd11.prod.outlook.com>
Accept-Language: en-US
Content-Language: en-US
X-MS-Has-Attach: 
X-MS-TNEF-Correlator: 
x-ts-tracking-id: DA2C060E71E74746AF3FCD0A80744800.0
x-checkrecipientchecked: true
Authentication-Results-Original: intel.com; dkim=none (message not signed)
 header.d=none;intel.com; dmarc=none action=none header.from=arm.com;
x-originating-ip: [107.77.217.158]
x-ms-publictraffictype: Email
X-MS-Office365-Filtering-HT: Tenant
X-MS-Office365-Filtering-Correlation-Id: ecd543e2-d966-4627-03ec-08d86efe9358
x-ms-traffictypediagnostic: DB8PR08MB5241:|VE1PR08MB4734:
x-ms-exchange-transport-forked: True
X-Microsoft-Antispam-PRVS: <VE1PR08MB4734074F0322A717768F7D9298070@VE1PR08MB4734.eurprd08.prod.outlook.com>
x-checkrecipientrouted: true
nodisclaimer: true
x-ms-oob-tlc-oobclassifiers: OLM:10000;OLM:10000;
X-MS-Exchange-SenderADCheck: 1
X-Microsoft-Antispam-Untrusted: BCL:0;
X-Microsoft-Antispam-Message-Info-Original: cZ3rVsyL69c31s69SXjtu2IgPYQj7RQqcpqJjNBs7r70qIWzWDxjIWdaHrC440uwzoY3WvEkBZSrTIkTXPM6oPwcSm5dYeqhrI/+kUYBn/vwDC4e/mepNqItorCg+CJFLX/kcmTXX8kQxGQj+tTwPSd+iBXhrMfuO+rBzV1OyaadomLpM69SBQXDYKWoKi4Bx9RJHOruShDEvVseredUJ7BBJ4WXBbcTlHy9qjgYLyA+aWcVkUF3qpYDd+mdgvPuHkPN0mDq9oIgFWxFuAvfVyE24SB+E4kz1mN6zdX6tbJUJ+pssXhx/ou3mFe0w9AP
X-Forefront-Antispam-Report-Untrusted: CIP:255.255.255.255; CTRY:; LANG:en;
 SCL:1; SRV:; IPV:NLI; SFV:NSPM; H:DBAPR08MB5814.eurprd08.prod.outlook.com;
 PTR:; CAT:NONE;
 SFS:(4636009)(376002)(346002)(39850400004)(396003)(136003)(366004)(478600001)(76116006)(33656002)(4326008)(2906002)(7696005)(6506007)(86362001)(52536014)(8936002)(55016002)(316002)(110136005)(66556008)(8676002)(26005)(66446008)(66946007)(54906003)(66476007)(186003)(30864003)(5660300002)(9686003)(71200400001)(83380400001)(64756008)(579004)(559001);
 DIR:OUT; SFP:1101; 
x-ms-exchange-antispam-messagedata: YAO+wZuS6WOQG7Bk8p1MXjaq26ds6ox07PtR6oRlr5o7QkgfUryTqT3okyD8WBNzaF5yyZXm0hVseouIAJBYJn2v00XAR1sUF6YmXhP/ClCoa6lv8iEp2MV37fa2trLXL/JTwqXsUkijZp/rkzpXhDqMxXVFdjdPAkt/WNGRAfVzt+VsE3sC02NsB0KWL4iV2PSkRuRh1k+5fvsvyOXP861lhPuIzfXQzzCQDW+qo0g16MOWFHPWjBbRtA/nLvGqNn3fP1oP67FuaTUMzBDjFHM9a/lc76RxoUt4y5DS3kNEKf+0w0khlHzlF9WDkLqqT+wTymw0bCJwiAR9i7efLIWM6XPhSXAfW0PaI2MKMGPw+iP0dgqc65v1D1uKuGh/LGlnrxdm8A3TfW8DQfC+RuqnBqqcdgeRI631FJzdd4aLcrlXEPKT3bJ/O66rInVWQGEuXm/WepsmNvEBwh7D4uPMt58MbVKT720r4v8F/CMz4tdnFgfpm/AkGeuzY2aTh8g1VIM0Zso/miVrpG4PlzYGtt6iQrJQl37kPV3oq6adnEyP4tevWMzhwUtFZEv/Pnnldprx35nRwqdM16mqbdUdaBvop2I/Zw9XDmdoDOh8AqbNfNznUYhef6r1M0oh2Xhrt3GU7hBcXQ46NbwEiQ==
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: quoted-printable
MIME-Version: 1.0
X-MS-Exchange-Transport-CrossTenantHeadersStamped: DB8PR08MB5241
Original-Authentication-Results: intel.com; dkim=none (message not signed)
 header.d=none;intel.com; dmarc=none action=none header.from=arm.com;
X-EOPAttributedMessage: 0
X-MS-Exchange-Transport-CrossTenantHeadersStripped: VE1EUR03FT047.eop-EUR03.prod.protection.outlook.com
X-MS-Office365-Filtering-Correlation-Id-Prvs: 45557d2e-252b-4848-0577-08d86efe8e03
X-Microsoft-Antispam: BCL:0;
X-Microsoft-Antispam-Message-Info: spZwkvAZ0MQw/4mzGT5esn0O2+Y0Xpp0zDIE8SEdE8tVBLLsZEQ3DiZHBDxjW5a9IpxorGz24m8ztF9V4HmMlburmYD7+Fa7qzbH+LuAAp0CC8IhxqUglSWgeGCmOkSjau1Zd8AQWQxxkiisJGnRCBzyN8Ck+q58rAnDILo9qWZbZxK4+h2PoPO34WqcOaorSLIFha08HAA+WsBDATGGkq6oSqw1lqdzp9SXg4ZkHxJk50dkvFXnsYCeY0TDCb/nx/88TnYQYKmPjIatuIfxUJ8y7+CP8yij6oPoDF5rd0+qfn8XZkxoTwc0m6k6RzYBxLyIwUQ+CGvbEL0rh+r398xktFCAkXNjbnD3ladl5As9VTU8SCn1SK8cy1oQlePSnWT4U8TY60cjUXriF+Njjg==
X-Forefront-Antispam-Report: CIP:63.35.35.123; CTRY:IE; LANG:en; SCL:1; SRV:;
 IPV:CAL; SFV:NSPM; H:64aa7808-outbound-1.mta.getcheckrecipient.com;
 PTR:ec2-63-35-35-123.eu-west-1.compute.amazonaws.com; CAT:NONE;
 SFS:(4636009)(396003)(376002)(346002)(136003)(39850400004)(46966005)(7696005)(47076004)(83380400001)(8936002)(6506007)(4326008)(82310400003)(8676002)(30864003)(82740400003)(33656002)(478600001)(5660300002)(110136005)(81166007)(70586007)(316002)(70206006)(54906003)(356005)(186003)(336012)(2906002)(9686003)(26005)(52536014)(86362001)(55016002);
 DIR:OUT; SFP:1101; 
X-OriginatorOrg: arm.com
X-MS-Exchange-CrossTenant-OriginalArrivalTime: 12 Oct 2020 22:31:34.4163 (UTC)
X-MS-Exchange-CrossTenant-Network-Message-Id: ecd543e2-d966-4627-03ec-08d86efe9358
X-MS-Exchange-CrossTenant-Id: f34e5979-57d9-4aaa-ad4d-b122a662184d
X-MS-Exchange-CrossTenant-OriginalAttributedTenantConnectingIp: TenantId=f34e5979-57d9-4aaa-ad4d-b122a662184d; Ip=[63.35.35.123];
 Helo=[64aa7808-outbound-1.mta.getcheckrecipient.com]
X-MS-Exchange-CrossTenant-AuthSource: VE1EUR03FT047.eop-EUR03.prod.protection.outlook.com
X-MS-Exchange-CrossTenant-AuthAs: Anonymous
X-MS-Exchange-CrossTenant-FromEntityHeader: HybridOnPrem
X-MS-Exchange-Transport-CrossTenantHeadersStamped: VE1PR08MB4734
Subject: Re: [dpdk-dev] [RFC v2 1/1] lib/ring: add scatter gather APIs
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions <dev.dpdk.org>
List-Unsubscribe: <https://mails.dpdk.org/options/dev>,
 <mailto:dev-request@dpdk.org?subject=unsubscribe>
List-Archive: <http://mails.dpdk.org/archives/dev/>
List-Post: <mailto:dev@dpdk.org>
List-Help: <mailto:dev-request@dpdk.org?subject=help>
List-Subscribe: <https://mails.dpdk.org/listinfo/dev>,
 <mailto:dev-request@dpdk.org?subject=subscribe>
Errors-To: dev-bounces@dpdk.org
Sender: "dev" <dev-bounces@dpdk.org>

Hi Konstantin,
	Appreciate your feedback.

<snip>

>=20
>=20
> > Add scatter gather APIs to avoid intermediate memcpy. Use cases that
> > involve copying large amount of data to/from the ring can benefit from
> > these APIs.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > ---
> >  lib/librte_ring/meson.build        |   3 +-
> >  lib/librte_ring/rte_ring_elem.h    |   1 +
> >  lib/librte_ring/rte_ring_peek_sg.h | 552
> > +++++++++++++++++++++++++++++
> >  3 files changed, 555 insertions(+), 1 deletion(-)  create mode 100644
> > lib/librte_ring/rte_ring_peek_sg.h
>=20
> As a generic one - need to update ring UT both func and perf to
> test/measure this new API.
Yes, will add.

>=20
> >
> > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> > index 31c0b4649..377694713 100644
> > --- a/lib/librte_ring/meson.build
> > +++ b/lib/librte_ring/meson.build
> > @@ -12,4 +12,5 @@ headers =3D files('rte_ring.h',
> >  		'rte_ring_peek.h',
> >  		'rte_ring_peek_c11_mem.h',
> >  		'rte_ring_rts.h',
> > -		'rte_ring_rts_c11_mem.h')
> > +		'rte_ring_rts_c11_mem.h',
> > +		'rte_ring_peek_sg.h')
> > diff --git a/lib/librte_ring/rte_ring_elem.h
> > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644
> > --- a/lib/librte_ring/rte_ring_elem.h
> > +++ b/lib/librte_ring/rte_ring_elem.h
> > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r,
> > void *obj_table,
> >
> >  #ifdef ALLOW_EXPERIMENTAL_API
> >  #include <rte_ring_peek.h>
> > +#include <rte_ring_peek_sg.h>
> >  #endif
> >
> >  #include <rte_ring.h>
> > diff --git a/lib/librte_ring/rte_ring_peek_sg.h
> > b/lib/librte_ring/rte_ring_peek_sg.h
> > new file mode 100644
> > index 000000000..97d5764a6
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_peek_sg.h
> > @@ -0,0 +1,552 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2020 Arm
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_PEEK_SG_H_
> > +#define _RTE_RING_PEEK_SG_H_
> > +
> > +/**
> > + * @file
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + * It is not recommended to include this file directly.
> > + * Please include <rte_ring_elem.h> instead.
> > + *
> > + * Ring Peek Scatter Gather APIs
> > + * Introduction of rte_ring with scatter gather serialized
> > +producer/consumer
> > + * (HTS sync mode) makes it possible to split public enqueue/dequeue
> > +API
> > + * into 3 phases:
> > + * - enqueue/dequeue start
> > + * - copy data to/from the ring
> > + * - enqueue/dequeue finish
> > + * Along with the advantages of the peek APIs, these APIs provide the
> > +ability
> > + * to avoid copying of the data to temporary area.
> > + *
> > + * Note that right now this new API is available only for two sync mod=
es:
> > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
> > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
> > + * It is a user responsibility to create/init ring with appropriate
> > +sync
> > + * modes selected.
> > + *
> > + * Example usage:
> > + * // read 1 elem from the ring:
> > + * n =3D rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL);
> > + * if (n !=3D 0) {
> > + *	//Copy objects in the ring
> > + *	memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t));
> > + *	if (n !=3D sgd->n1)
> > + *		//Second memcpy because of wrapround
> > + *		n2 =3D n - sgd->n1;
> > + *		memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t));
> > + *	rte_ring_dequeue_sg_finish(ring, n);
>=20
> It is not clear from the example above why do you need SG(ZC) API.
> Existing peek API would be able to handle such situation (just copy will =
be
> done internally). Probably better to use examples you provided in your la=
st
> reply to Olivier.
Agree, not a good example, will change it.

>=20
> > + * }
> > + *
> > + * Note that between _start_ and _finish_ none other thread can
> > + proceed
> > + * with enqueue(/dequeue) operation till _finish_ completes.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <rte_ring_peek_c11_mem.h>
> > +
> > +/* Rock that needs to be passed between reserve and commit APIs */
> > +struct rte_ring_sg_data {
> > +	/* Pointer to the first space in the ring */
> > +	void **ptr1;
> > +	/* Pointer to the second space in the ring if there is wrap-around */
> > +	void **ptr2;
> > +	/* Number of elements in the first pointer. If this is equal to
> > +	 * the number of elements requested, then ptr2 is NULL.
> > +	 * Otherwise, subtracting n1 from number of elements requested
> > +	 * will give the number of elements available at ptr2.
> > +	 */
> > +	unsigned int n1;
> > +};
>=20
> I wonder what is the primary goal of that API?
> The reason I am asking: from what I understand with this patch ZC API wil=
l
> work only for ST and HTS modes (same as peek API).
> Though, I think it is possible to make it work for any sync model, by cha=
nging
Agree, the functionality can be extended to other modes as well. I added th=
ese 2 modes as I found the use cases for these.

> API a bit: instead of returning sg_data to the user, force him to provide
> function to read/write elems from/to the ring.
> Just a schematic one, to illustrate the idea:
>=20
> typedef void (*write_ring_func_t)(void *elem, /*pointer to first elem to
> update inside the ring*/
> 				uint32_t num, /* number of elems to update
> */
> 				uint32_t esize,
> 				void *udata  /* caller provide data */);
>=20
> rte_ring_enqueue_zc_bulk_elem(struct rte_ring *r, unsigned int esize,
> 	unsigned int n, unsigned int *free_space, write_ring_func_t wf, void
> *udata) {
> 	struct rte_ring_sg_data sgd;
> 	.....
> 	n =3D move_head_tail(r, ...);
>=20
> 	/* get sgd data based on n */
> 	get_elem_addr(r, ..., &sgd);
>=20
> 	/* call user defined function to fill reserved elems */
> 	wf(sgd.p1, sgd.n1, esize, udata);
> 	if (n !=3D n1)
> 		wf(sgd.p2, sgd.n2, esize, udata);
>=20
> 	....
> 	return n;
> }
>=20
I think the call back function makes it difficult to use the API. The call =
back function would be a wrapper around another function or API which will =
have its own arguments. Now all those parameters have to passed using the '=
udata'. For ex: in the 2nd example that I provided earlier, the user has to=
 create a wrapper around 'rte_eth_rx_burst' API and then provide the parame=
ters to 'rte_eth_rx_burst' through 'udata'. 'udata' would need a structure =
definition as well.

> If we want ZC peek API also - some extra work need to be done with
> introducing return value for write_ring_func() and checking it properly, =
but I
> don't see any big problems here too.
> That way ZC API can support all sync models, plus we don't need to expose
> sg_data to the user directly.
Other modes can be supported with the method used in this patch as well. If=
 you see a need, I can add them.
IMO, only issue with exposing sg_data is ABI compatibility in the future. I=
 think, we can align the 'struct rte_ring_sg_data' to cache line boundary a=
nd it should provide ability to extend it in the future without affecting t=
he ABI compatibility.

> Also, in future, we probably can de-duplicate the code by making our non-=
ZC
> API to use that one internally (pass ring_enqueue_elems()/ob_table as a
> parameters).
>=20
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx =3D head & r->mask;
> > +	uint64_t *ring =3D (uint64_t *)&r[1];
> > +
> > +	*dst1 =3D ring + idx;
> > +	*n1 =3D num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 =3D num - (r->size - idx - 1);
> > +		*dst2 =3D ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx =3D head & r->mask;
> > +	rte_int128_t *ring =3D (rte_int128_t *)&r[1];
> > +
> > +	*dst1 =3D ring + idx;
> > +	*n1 =3D num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 =3D num - (r->size - idx - 1);
> > +		*dst2 =3D ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> > +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void
> > +**dst2) {
> > +	if (esize =3D=3D 8)
> > +		__rte_ring_get_elem_addr_64(r, head,
> > +						num, dst1, n1, dst2);
> > +	else if (esize =3D=3D 16)
> > +		__rte_ring_get_elem_addr_128(r, head,
> > +						num, dst1, n1, dst2);
>=20
>=20
> I don't think we need that special handling for 8/16B sizes.
> In all functions esize is an input parameter.
> If user will specify is as a constant - compiler will be able to convert =
multiply
> to shift and add ops.
Ok, I will check this out.

>=20
> > +	else {
> > +		uint32_t idx, scale, nr_idx;
> > +		uint32_t *ring =3D (uint32_t *)&r[1];
> > +
> > +		/* Normalize to uint32_t */
> > +		scale =3D esize / sizeof(uint32_t);
> > +		idx =3D head & r->mask;
> > +		nr_idx =3D idx * scale;
> > +
> > +		*dst1 =3D ring + nr_idx;
> > +		*n1 =3D num;
> > +
> > +		if (idx + num > r->size) {
> > +			*n1 =3D num - (r->size - idx - 1);
> > +			*dst2 =3D ring;
> > +		}
> > +	}
> > +}
> > +
> > +/**
> > + * @internal This function moves prod head value.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_sg_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +		uint32_t n, enum rte_ring_queue_behavior behavior,
> > +		struct rte_ring_sg_data *sgd, unsigned int *free_space) {
> > +	uint32_t free, head, next;
> > +
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n =3D __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
> > +			behavior, &head, &next, &free);
> > +		__rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd-
> >ptr1,
> > +			&sgd->n1, (void **)&sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n =3D __rte_ring_hts_move_prod_head(r, n, behavior, &head,
> &free);
> > +		__rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd-
> >ptr1,
> > +			&sgd->n1, (void **)&sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +		n =3D 0;
> > +		free =3D 0;
> > +	}
> > +
> > +	if (free_space !=3D NULL)
> > +		*free_space =3D free - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * Start to enqueue several objects on the ring.
> > + * Note that no actual objects are put in the queue by this function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy objects into the queue using the returned pointers=
.
> > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete
> > +the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_bulk_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*free_space) {
> > +	return __rte_ring_do_enqueue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_FIXED, sgd, free_space); }
> > +
> > +/**
> > + * Start to enqueue several pointers to objects on the ring.
> > + * Note that no actual pointers are put in the queue by this
> > +function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy pointers to objects into the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_bulk_start(struct rte_ring *r, unsigned int n,
> > +	struct rte_ring_sg_data *sgd, unsigned int *free_space) {
> > +	return rte_ring_enqueue_sg_bulk_elem_start(r, sizeof(uintptr_t), n,
> > +							sgd, free_space);
> > +}
> > +/**
> > + * Start to enqueue several objects on the ring.
> > + * Note that no actual objects are put in the queue by this function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy objects into the queue using the returned pointers=
.
> > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete
> > +the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_burst_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*free_space) {
> > +	return __rte_ring_do_enqueue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_VARIABLE, sgd, free_space); }
> > +
> > +/**
> > + * Start to enqueue several pointers to objects on the ring.
> > + * Note that no actual pointers are put in the queue by this
> > +function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy pointers to objects into the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_burst_start(struct rte_ring *r, unsigned int n,
> > +	struct rte_ring_sg_data *sgd, unsigned int *free_space) {
> > +	return rte_ring_enqueue_sg_burst_elem_start(r, sizeof(uintptr_t),
> n,
> > +							sgd, free_space);
> > +}
> > +
> > +/**
> > + * Complete enqueuing several objects on the ring.
> > + * Note that number of objects to enqueue should not exceed previous
> > + * enqueue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to add to the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_enqueue_sg_elem_finish(struct rte_ring *r, unsigned int n) {
> > +	uint32_t tail;
> > +
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n =3D __rte_ring_st_get_tail(&r->prod, &tail, n);
> > +		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n =3D __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
> > +		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +	}
> > +}
> > +
> > +/**
> > + * Complete enqueuing several pointers to objects on the ring.
> > + * Note that number of objects to enqueue should not exceed previous
> > + * enqueue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of pointers to objects to add to the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_enqueue_sg_finish(struct rte_ring *r, unsigned int n) {
> > +	rte_ring_enqueue_sg_elem_finish(r, n); }
> > +
> > +/**
> > + * @internal This function moves cons head value and copies up to *n*
> > + * objects from the ring to the user provided obj_table.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_dequeue_sg_elem_start(struct rte_ring *r,
> > +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> > +	struct rte_ring_sg_data *sgd, unsigned int *available) {
> > +	uint32_t avail, head, next;
> > +
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n =3D __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
> > +			behavior, &head, &next, &avail);
> > +		__rte_ring_get_elem_addr(r, head, esize, n,
> > +					sgd->ptr1, &sgd->n1, sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n =3D __rte_ring_hts_move_cons_head(r, n, behavior,
> > +			&head, &avail);
> > +		__rte_ring_get_elem_addr(r, head, esize, n,
> > +					sgd->ptr1, &sgd->n1, sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +		n =3D 0;
> > +		avail =3D 0;
> > +	}
> > +
> > +	if (available !=3D NULL)
> > +		*available =3D avail - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * Start to dequeue several objects from the ring.
> > + * Note that no actual objects are copied from the queue by this funct=
ion.
> > + * User has to copy objects from the queue using the returned pointers=
.
> > + * User should call rte_ring_dequeue_sg_bulk_elem_finish to complete
> > +the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after t=
he
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_bulk_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*available) {
> > +	return __rte_ring_do_dequeue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_FIXED, sgd, available); }
> > +
> > +/**
> > + * Start to dequeue several pointers to objects from the ring.
> > + * Note that no actual pointers are removed from the queue by this
> function.
> > + * User has to copy pointers to objects from the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_dequeue_sg_bulk_finish to complete the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after t=
he
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_bulk_start(struct rte_ring *r, unsigned int n,
> > +	struct rte_ring_sg_data *sgd, unsigned int *available) {
> > +	return rte_ring_dequeue_sg_bulk_elem_start(r, sizeof(uintptr_t),
> > +		n, sgd, available);
> > +}
> > +
> > +/**
> > + * Start to dequeue several objects from the ring.
> > + * Note that no actual objects are copied from the queue by this funct=
ion.
> > + * User has to copy objects from the queue using the returned pointers=
.
> > + * User should call rte_ring_dequeue_sg_burst_elem_finish to complete
> > +the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwi=
se
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of objects to dequeue from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after t=
he
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_burst_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*available) {
> > +	return __rte_ring_do_dequeue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_VARIABLE, sgd, available); }
> > +
> > +/**
> > + * Start to dequeue several pointers to objects from the ring.
> > + * Note that no actual pointers are removed from the queue by this
> function.
> > + * User has to copy pointers to objects from the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_dequeue_sg_burst_finish to complete the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after t=
he
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_burst_start(struct rte_ring *r, unsigned int n,
> > +		struct rte_ring_sg_data *sgd, unsigned int *available) {
> > +	return rte_ring_dequeue_sg_burst_elem_start(r, sizeof(uintptr_t),
> n,
> > +			sgd, available);
> > +}
> > +
> > +/**
> > + * Complete dequeuing several objects from the ring.
> > + * Note that number of objects to dequeued should not exceed previous
> > + * dequeue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_dequeue_sg_elem_finish(struct rte_ring *r, unsigned int n) {
> > +	uint32_t tail;
> > +
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n =3D __rte_ring_st_get_tail(&r->cons, &tail, n);
> > +		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n =3D __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
> > +		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +	}
> > +}
> > +
> > +/**
> > + * Complete dequeuing several objects from the ring.
> > + * Note that number of objects to dequeued should not exceed previous
> > + * dequeue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_dequeue_sg_finish(struct rte_ring *r, unsigned int n) {
> > +	rte_ring_dequeue_elem_finish(r, n);
> > +}
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RING_PEEK_SG_H_ */
> > --
> > 2.17.1