From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id B955BA04B6; Tue, 13 Oct 2020 00:31:43 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 212561D9E7; Tue, 13 Oct 2020 00:31:41 +0200 (CEST) Received: from EUR04-HE1-obe.outbound.protection.outlook.com (mail-eopbgr70053.outbound.protection.outlook.com [40.107.7.53]) by dpdk.org (Postfix) with ESMTP id E8F861D99C for ; Tue, 13 Oct 2020 00:31:38 +0200 (CEST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=armh.onmicrosoft.com; s=selector2-armh-onmicrosoft-com; h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-SenderADCheck; bh=4i2R8ET7+DhkpQpEnuKVK51ba80+U7f2DjNz0YGoY78=; b=oi3z1rdOb1qOG8OH/F6NypXP4v3agfHKs9iRsrsUEi3lIEqbufl3x/81TEmcW8TD0apTI+JuXO1gcJdHHuOS3pwbSBoqUJ2yoc9If3/CUCzi1ZKRMAp4gmGDxFhCVzEMCDImulBmEiUnzfHxWiIlQlkUiBQUP1uQ4mdEEO0QwqQ= Received: from MRXP264CA0018.FRAP264.PROD.OUTLOOK.COM (2603:10a6:500:15::30) by VE1PR08MB4734.eurprd08.prod.outlook.com (2603:10a6:802:a1::16) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.3455.26; Mon, 12 Oct 2020 22:31:35 +0000 Received: from VE1EUR03FT047.eop-EUR03.prod.protection.outlook.com (2603:10a6:500:15:cafe::f7) by MRXP264CA0018.outlook.office365.com (2603:10a6:500:15::30) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.3455.21 via Frontend Transport; Mon, 12 Oct 2020 22:31:35 +0000 X-MS-Exchange-Authentication-Results: spf=pass (sender IP is 63.35.35.123) smtp.mailfrom=arm.com; dpdk.org; dkim=pass (signature was verified) header.d=armh.onmicrosoft.com;dpdk.org; dmarc=pass action=none header.from=arm.com; Received-SPF: Pass (protection.outlook.com: domain of arm.com designates 63.35.35.123 as permitted sender) receiver=protection.outlook.com; client-ip=63.35.35.123; helo=64aa7808-outbound-1.mta.getcheckrecipient.com; Received: from 64aa7808-outbound-1.mta.getcheckrecipient.com (63.35.35.123) by VE1EUR03FT047.mail.protection.outlook.com (10.152.19.218) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.3455.23 via Frontend Transport; Mon, 12 Oct 2020 22:31:34 +0000 Received: ("Tessian outbound a0bffebca527:v64"); Mon, 12 Oct 2020 22:31:33 +0000 X-CR-MTA-TID: 64aa7808 Received: from 11bcddf45002.2 by 64aa7808-outbound-1.mta.getcheckrecipient.com id 0CFB80BB-EF60-40EE-86B3-A9A288844F85.1; Mon, 12 Oct 2020 22:31:28 +0000 Received: from EUR04-HE1-obe.outbound.protection.outlook.com by 64aa7808-outbound-1.mta.getcheckrecipient.com with ESMTPS id 11bcddf45002.2 (version=TLSv1.2 cipher=ECDHE-RSA-AES256-GCM-SHA384); Mon, 12 Oct 2020 22:31:28 +0000 ARC-Seal: i=1; a=rsa-sha256; s=arcselector9901; d=microsoft.com; cv=none; b=Ze5Z1YBYV98+x9x2PwwoTycavx3jhOBAla30AT13sT4CszmyGaYXZQdlzGjHSi6NVQd42x3UdllK7EiiOtQOXpDtqU53QfoLxYxBoBACH35m2t47IPCqjpvBiylEKd3Cq2HB8PmX3UEXJlRKzWIZomNlkv6/rY1CUl7IqLf2xWlXaWTCHUhoC3Cgtl/3nZ+5zzGSJoW6GEzv3bG4tHnTFuyFnFdOHZwp6rdg9ITjoLFKExWCofI0/iNpV8N/1XzFYyUbNhBi6iKoWRDFIGsWUyh2nz3+S1L1zw6NBfu+rZstccsmquYOWYAUU0AEqGD/RSfpi0ye25Cp8Dy8CCl4qQ== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=microsoft.com; s=arcselector9901; h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-SenderADCheck; bh=4i2R8ET7+DhkpQpEnuKVK51ba80+U7f2DjNz0YGoY78=; b=XY/BFuNn609An9z0vvLIm06sYRz8GfHnvXE4yrajNdcAHEXr0tjHI79APZuinfWJfreDm3V54PbqTFnk30zk/rr0bwcZwpang99THPkX+8VbnfdgkmydrERa0rDA5jqp3heeB2kiLDXgx3IoO/H1kBBeb+4YxoFRXWzlAREnjCT05UCHGVRzgFlyoAu6X70433mier7Y7of+WPEmaCCfDKoAkFqvl6KOH1fW0sKfAx3IB/B3WFcUyR7YjtExyTcmJ49wfpZbfQwz7kO5nZbXLzCKXT+/VK/asahCBAHNSMTjVGF4bpeIvfIxU1BZ7hm4ixEuQ6SHianMIsJdCbtEiA== ARC-Authentication-Results: i=1; mx.microsoft.com 1; spf=pass smtp.mailfrom=arm.com; dmarc=pass action=none header.from=arm.com; dkim=pass header.d=arm.com; arc=none DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=armh.onmicrosoft.com; s=selector2-armh-onmicrosoft-com; h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-SenderADCheck; bh=4i2R8ET7+DhkpQpEnuKVK51ba80+U7f2DjNz0YGoY78=; b=oi3z1rdOb1qOG8OH/F6NypXP4v3agfHKs9iRsrsUEi3lIEqbufl3x/81TEmcW8TD0apTI+JuXO1gcJdHHuOS3pwbSBoqUJ2yoc9If3/CUCzi1ZKRMAp4gmGDxFhCVzEMCDImulBmEiUnzfHxWiIlQlkUiBQUP1uQ4mdEEO0QwqQ= Received: from DBAPR08MB5814.eurprd08.prod.outlook.com (2603:10a6:10:1b1::6) by DB8PR08MB5241.eurprd08.prod.outlook.com (2603:10a6:10:e2::17) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.3455.28; Mon, 12 Oct 2020 22:31:25 +0000 Received: from DBAPR08MB5814.eurprd08.prod.outlook.com ([fe80::7814:9c1:781f:475d]) by DBAPR08MB5814.eurprd08.prod.outlook.com ([fe80::7814:9c1:781f:475d%4]) with mapi id 15.20.3455.029; Mon, 12 Oct 2020 22:31:25 +0000 From: Honnappa Nagarahalli To: "Ananyev, Konstantin" , "dev@dpdk.org" CC: "olivier.matz@6wind.com" , "david.marchand@redhat.com" , nd , Honnappa Nagarahalli , nd Thread-Topic: [RFC v2 1/1] lib/ring: add scatter gather APIs Thread-Index: AQHWoLO/yqeV7FdtXkWOtLPuFIVaL6mUZvWA Date: Mon, 12 Oct 2020 22:31:25 +0000 Message-ID: References: <20200224203931.21256-1-honnappa.nagarahalli@arm.com> <20201006132905.46205-1-honnappa.nagarahalli@arm.com> <20201006132905.46205-2-honnappa.nagarahalli@arm.com> In-Reply-To: Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: x-ts-tracking-id: DA2C060E71E74746AF3FCD0A80744800.0 x-checkrecipientchecked: true Authentication-Results-Original: intel.com; dkim=none (message not signed) header.d=none;intel.com; dmarc=none action=none header.from=arm.com; x-originating-ip: [107.77.217.158] x-ms-publictraffictype: Email X-MS-Office365-Filtering-HT: Tenant X-MS-Office365-Filtering-Correlation-Id: ecd543e2-d966-4627-03ec-08d86efe9358 x-ms-traffictypediagnostic: DB8PR08MB5241:|VE1PR08MB4734: x-ms-exchange-transport-forked: True X-Microsoft-Antispam-PRVS: x-checkrecipientrouted: true nodisclaimer: true x-ms-oob-tlc-oobclassifiers: OLM:10000;OLM:10000; X-MS-Exchange-SenderADCheck: 1 X-Microsoft-Antispam-Untrusted: BCL:0; X-Microsoft-Antispam-Message-Info-Original: cZ3rVsyL69c31s69SXjtu2IgPYQj7RQqcpqJjNBs7r70qIWzWDxjIWdaHrC440uwzoY3WvEkBZSrTIkTXPM6oPwcSm5dYeqhrI/+kUYBn/vwDC4e/mepNqItorCg+CJFLX/kcmTXX8kQxGQj+tTwPSd+iBXhrMfuO+rBzV1OyaadomLpM69SBQXDYKWoKi4Bx9RJHOruShDEvVseredUJ7BBJ4WXBbcTlHy9qjgYLyA+aWcVkUF3qpYDd+mdgvPuHkPN0mDq9oIgFWxFuAvfVyE24SB+E4kz1mN6zdX6tbJUJ+pssXhx/ou3mFe0w9AP X-Forefront-Antispam-Report-Untrusted: CIP:255.255.255.255; CTRY:; LANG:en; SCL:1; SRV:; IPV:NLI; SFV:NSPM; H:DBAPR08MB5814.eurprd08.prod.outlook.com; PTR:; CAT:NONE; SFS:(4636009)(376002)(346002)(39850400004)(396003)(136003)(366004)(478600001)(76116006)(33656002)(4326008)(2906002)(7696005)(6506007)(86362001)(52536014)(8936002)(55016002)(316002)(110136005)(66556008)(8676002)(26005)(66446008)(66946007)(54906003)(66476007)(186003)(30864003)(5660300002)(9686003)(71200400001)(83380400001)(64756008)(579004)(559001); DIR:OUT; SFP:1101; x-ms-exchange-antispam-messagedata: YAO+wZuS6WOQG7Bk8p1MXjaq26ds6ox07PtR6oRlr5o7QkgfUryTqT3okyD8WBNzaF5yyZXm0hVseouIAJBYJn2v00XAR1sUF6YmXhP/ClCoa6lv8iEp2MV37fa2trLXL/JTwqXsUkijZp/rkzpXhDqMxXVFdjdPAkt/WNGRAfVzt+VsE3sC02NsB0KWL4iV2PSkRuRh1k+5fvsvyOXP861lhPuIzfXQzzCQDW+qo0g16MOWFHPWjBbRtA/nLvGqNn3fP1oP67FuaTUMzBDjFHM9a/lc76RxoUt4y5DS3kNEKf+0w0khlHzlF9WDkLqqT+wTymw0bCJwiAR9i7efLIWM6XPhSXAfW0PaI2MKMGPw+iP0dgqc65v1D1uKuGh/LGlnrxdm8A3TfW8DQfC+RuqnBqqcdgeRI631FJzdd4aLcrlXEPKT3bJ/O66rInVWQGEuXm/WepsmNvEBwh7D4uPMt58MbVKT720r4v8F/CMz4tdnFgfpm/AkGeuzY2aTh8g1VIM0Zso/miVrpG4PlzYGtt6iQrJQl37kPV3oq6adnEyP4tevWMzhwUtFZEv/Pnnldprx35nRwqdM16mqbdUdaBvop2I/Zw9XDmdoDOh8AqbNfNznUYhef6r1M0oh2Xhrt3GU7hBcXQ46NbwEiQ== Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: quoted-printable MIME-Version: 1.0 X-MS-Exchange-Transport-CrossTenantHeadersStamped: DB8PR08MB5241 Original-Authentication-Results: intel.com; dkim=none (message not signed) header.d=none;intel.com; dmarc=none action=none header.from=arm.com; X-EOPAttributedMessage: 0 X-MS-Exchange-Transport-CrossTenantHeadersStripped: VE1EUR03FT047.eop-EUR03.prod.protection.outlook.com X-MS-Office365-Filtering-Correlation-Id-Prvs: 45557d2e-252b-4848-0577-08d86efe8e03 X-Microsoft-Antispam: BCL:0; X-Microsoft-Antispam-Message-Info: spZwkvAZ0MQw/4mzGT5esn0O2+Y0Xpp0zDIE8SEdE8tVBLLsZEQ3DiZHBDxjW5a9IpxorGz24m8ztF9V4HmMlburmYD7+Fa7qzbH+LuAAp0CC8IhxqUglSWgeGCmOkSjau1Zd8AQWQxxkiisJGnRCBzyN8Ck+q58rAnDILo9qWZbZxK4+h2PoPO34WqcOaorSLIFha08HAA+WsBDATGGkq6oSqw1lqdzp9SXg4ZkHxJk50dkvFXnsYCeY0TDCb/nx/88TnYQYKmPjIatuIfxUJ8y7+CP8yij6oPoDF5rd0+qfn8XZkxoTwc0m6k6RzYBxLyIwUQ+CGvbEL0rh+r398xktFCAkXNjbnD3ladl5As9VTU8SCn1SK8cy1oQlePSnWT4U8TY60cjUXriF+Njjg== X-Forefront-Antispam-Report: CIP:63.35.35.123; CTRY:IE; LANG:en; SCL:1; SRV:; IPV:CAL; SFV:NSPM; H:64aa7808-outbound-1.mta.getcheckrecipient.com; PTR:ec2-63-35-35-123.eu-west-1.compute.amazonaws.com; CAT:NONE; SFS:(4636009)(396003)(376002)(346002)(136003)(39850400004)(46966005)(7696005)(47076004)(83380400001)(8936002)(6506007)(4326008)(82310400003)(8676002)(30864003)(82740400003)(33656002)(478600001)(5660300002)(110136005)(81166007)(70586007)(316002)(70206006)(54906003)(356005)(186003)(336012)(2906002)(9686003)(26005)(52536014)(86362001)(55016002); DIR:OUT; SFP:1101; X-OriginatorOrg: arm.com X-MS-Exchange-CrossTenant-OriginalArrivalTime: 12 Oct 2020 22:31:34.4163 (UTC) X-MS-Exchange-CrossTenant-Network-Message-Id: ecd543e2-d966-4627-03ec-08d86efe9358 X-MS-Exchange-CrossTenant-Id: f34e5979-57d9-4aaa-ad4d-b122a662184d X-MS-Exchange-CrossTenant-OriginalAttributedTenantConnectingIp: TenantId=f34e5979-57d9-4aaa-ad4d-b122a662184d; Ip=[63.35.35.123]; Helo=[64aa7808-outbound-1.mta.getcheckrecipient.com] X-MS-Exchange-CrossTenant-AuthSource: VE1EUR03FT047.eop-EUR03.prod.protection.outlook.com X-MS-Exchange-CrossTenant-AuthAs: Anonymous X-MS-Exchange-CrossTenant-FromEntityHeader: HybridOnPrem X-MS-Exchange-Transport-CrossTenantHeadersStamped: VE1PR08MB4734 Subject: Re: [dpdk-dev] [RFC v2 1/1] lib/ring: add scatter gather APIs X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Hi Konstantin, Appreciate your feedback. >=20 >=20 > > Add scatter gather APIs to avoid intermediate memcpy. Use cases that > > involve copying large amount of data to/from the ring can benefit from > > these APIs. > > > > Signed-off-by: Honnappa Nagarahalli > > --- > > lib/librte_ring/meson.build | 3 +- > > lib/librte_ring/rte_ring_elem.h | 1 + > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > +++++++++++++++++++++++++++++ > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 > > lib/librte_ring/rte_ring_peek_sg.h >=20 > As a generic one - need to update ring UT both func and perf to > test/measure this new API. Yes, will add. >=20 > > > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > > index 31c0b4649..377694713 100644 > > --- a/lib/librte_ring/meson.build > > +++ b/lib/librte_ring/meson.build > > @@ -12,4 +12,5 @@ headers =3D files('rte_ring.h', > > 'rte_ring_peek.h', > > 'rte_ring_peek_c11_mem.h', > > 'rte_ring_rts.h', > > - 'rte_ring_rts_c11_mem.h') > > + 'rte_ring_rts_c11_mem.h', > > + 'rte_ring_peek_sg.h') > > diff --git a/lib/librte_ring/rte_ring_elem.h > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 > > --- a/lib/librte_ring/rte_ring_elem.h > > +++ b/lib/librte_ring/rte_ring_elem.h > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, > > void *obj_table, > > > > #ifdef ALLOW_EXPERIMENTAL_API > > #include > > +#include > > #endif > > > > #include > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > b/lib/librte_ring/rte_ring_peek_sg.h > > new file mode 100644 > > index 000000000..97d5764a6 > > --- /dev/null > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > @@ -0,0 +1,552 @@ > > +/* SPDX-License-Identifier: BSD-3-Clause > > + * > > + * Copyright (c) 2020 Arm > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > + * All rights reserved. > > + * Derived from FreeBSD's bufring.h > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > + */ > > + > > +#ifndef _RTE_RING_PEEK_SG_H_ > > +#define _RTE_RING_PEEK_SG_H_ > > + > > +/** > > + * @file > > + * @b EXPERIMENTAL: this API may change without prior notice > > + * It is not recommended to include this file directly. > > + * Please include instead. > > + * > > + * Ring Peek Scatter Gather APIs > > + * Introduction of rte_ring with scatter gather serialized > > +producer/consumer > > + * (HTS sync mode) makes it possible to split public enqueue/dequeue > > +API > > + * into 3 phases: > > + * - enqueue/dequeue start > > + * - copy data to/from the ring > > + * - enqueue/dequeue finish > > + * Along with the advantages of the peek APIs, these APIs provide the > > +ability > > + * to avoid copying of the data to temporary area. > > + * > > + * Note that right now this new API is available only for two sync mod= es: > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > > + * It is a user responsibility to create/init ring with appropriate > > +sync > > + * modes selected. > > + * > > + * Example usage: > > + * // read 1 elem from the ring: > > + * n =3D rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > + * if (n !=3D 0) { > > + * //Copy objects in the ring > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > + * if (n !=3D sgd->n1) > > + * //Second memcpy because of wrapround > > + * n2 =3D n - sgd->n1; > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > + * rte_ring_dequeue_sg_finish(ring, n); >=20 > It is not clear from the example above why do you need SG(ZC) API. > Existing peek API would be able to handle such situation (just copy will = be > done internally). Probably better to use examples you provided in your la= st > reply to Olivier. Agree, not a good example, will change it. >=20 > > + * } > > + * > > + * Note that between _start_ and _finish_ none other thread can > > + proceed > > + * with enqueue(/dequeue) operation till _finish_ completes. > > + */ > > + > > +#ifdef __cplusplus > > +extern "C" { > > +#endif > > + > > +#include > > + > > +/* Rock that needs to be passed between reserve and commit APIs */ > > +struct rte_ring_sg_data { > > + /* Pointer to the first space in the ring */ > > + void **ptr1; > > + /* Pointer to the second space in the ring if there is wrap-around */ > > + void **ptr2; > > + /* Number of elements in the first pointer. If this is equal to > > + * the number of elements requested, then ptr2 is NULL. > > + * Otherwise, subtracting n1 from number of elements requested > > + * will give the number of elements available at ptr2. > > + */ > > + unsigned int n1; > > +}; >=20 > I wonder what is the primary goal of that API? > The reason I am asking: from what I understand with this patch ZC API wil= l > work only for ST and HTS modes (same as peek API). > Though, I think it is possible to make it work for any sync model, by cha= nging Agree, the functionality can be extended to other modes as well. I added th= ese 2 modes as I found the use cases for these. > API a bit: instead of returning sg_data to the user, force him to provide > function to read/write elems from/to the ring. > Just a schematic one, to illustrate the idea: >=20 > typedef void (*write_ring_func_t)(void *elem, /*pointer to first elem to > update inside the ring*/ > uint32_t num, /* number of elems to update > */ > uint32_t esize, > void *udata /* caller provide data */); >=20 > rte_ring_enqueue_zc_bulk_elem(struct rte_ring *r, unsigned int esize, > unsigned int n, unsigned int *free_space, write_ring_func_t wf, void > *udata) { > struct rte_ring_sg_data sgd; > ..... > n =3D move_head_tail(r, ...); >=20 > /* get sgd data based on n */ > get_elem_addr(r, ..., &sgd); >=20 > /* call user defined function to fill reserved elems */ > wf(sgd.p1, sgd.n1, esize, udata); > if (n !=3D n1) > wf(sgd.p2, sgd.n2, esize, udata); >=20 > .... > return n; > } >=20 I think the call back function makes it difficult to use the API. The call = back function would be a wrapper around another function or API which will = have its own arguments. Now all those parameters have to passed using the '= udata'. For ex: in the 2nd example that I provided earlier, the user has to= create a wrapper around 'rte_eth_rx_burst' API and then provide the parame= ters to 'rte_eth_rx_burst' through 'udata'. 'udata' would need a structure = definition as well. > If we want ZC peek API also - some extra work need to be done with > introducing return value for write_ring_func() and checking it properly, = but I > don't see any big problems here too. > That way ZC API can support all sync models, plus we don't need to expose > sg_data to the user directly. Other modes can be supported with the method used in this patch as well. If= you see a need, I can add them. IMO, only issue with exposing sg_data is ABI compatibility in the future. I= think, we can align the 'struct rte_ring_sg_data' to cache line boundary a= nd it should provide ability to extend it in the future without affecting t= he ABI compatibility. > Also, in future, we probably can de-duplicate the code by making our non-= ZC > API to use that one internally (pass ring_enqueue_elems()/ob_table as a > parameters). >=20 > > + > > +static __rte_always_inline void > > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head, > > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) { > > + uint32_t idx =3D head & r->mask; > > + uint64_t *ring =3D (uint64_t *)&r[1]; > > + > > + *dst1 =3D ring + idx; > > + *n1 =3D num; > > + > > + if (idx + num > r->size) { > > + *n1 =3D num - (r->size - idx - 1); > > + *dst2 =3D ring; > > + } > > +} > > + > > +static __rte_always_inline void > > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head, > > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) { > > + uint32_t idx =3D head & r->mask; > > + rte_int128_t *ring =3D (rte_int128_t *)&r[1]; > > + > > + *dst1 =3D ring + idx; > > + *n1 =3D num; > > + > > + if (idx + num > r->size) { > > + *n1 =3D num - (r->size - idx - 1); > > + *dst2 =3D ring; > > + } > > +} > > + > > +static __rte_always_inline void > > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head, > > + uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void > > +**dst2) { > > + if (esize =3D=3D 8) > > + __rte_ring_get_elem_addr_64(r, head, > > + num, dst1, n1, dst2); > > + else if (esize =3D=3D 16) > > + __rte_ring_get_elem_addr_128(r, head, > > + num, dst1, n1, dst2); >=20 >=20 > I don't think we need that special handling for 8/16B sizes. > In all functions esize is an input parameter. > If user will specify is as a constant - compiler will be able to convert = multiply > to shift and add ops. Ok, I will check this out. >=20 > > + else { > > + uint32_t idx, scale, nr_idx; > > + uint32_t *ring =3D (uint32_t *)&r[1]; > > + > > + /* Normalize to uint32_t */ > > + scale =3D esize / sizeof(uint32_t); > > + idx =3D head & r->mask; > > + nr_idx =3D idx * scale; > > + > > + *dst1 =3D ring + nr_idx; > > + *n1 =3D num; > > + > > + if (idx + num > r->size) { > > + *n1 =3D num - (r->size - idx - 1); > > + *dst2 =3D ring; > > + } > > + } > > +} > > + > > +/** > > + * @internal This function moves prod head value. > > + */ > > +static __rte_always_inline unsigned int > > +__rte_ring_do_enqueue_sg_elem_start(struct rte_ring *r, unsigned int > esize, > > + uint32_t n, enum rte_ring_queue_behavior behavior, > > + struct rte_ring_sg_data *sgd, unsigned int *free_space) { > > + uint32_t free, head, next; > > + > > + switch (r->prod.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n =3D __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n, > > + behavior, &head, &next, &free); > > + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd- > >ptr1, > > + &sgd->n1, (void **)&sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n =3D __rte_ring_hts_move_prod_head(r, n, behavior, &head, > &free); > > + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd- > >ptr1, > > + &sgd->n1, (void **)&sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + n =3D 0; > > + free =3D 0; > > + } > > + > > + if (free_space !=3D NULL) > > + *free_space =3D free - n; > > + return n; > > +} > > + > > +/** > > + * Start to enqueue several objects on the ring. > > + * Note that no actual objects are put in the queue by this function, > > + * it just reserves space for the user on the ring. > > + * User has to copy objects into the queue using the returned pointers= . > > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete > > +the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_bulk_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*free_space) { > > + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_FIXED, sgd, free_space); } > > + > > +/** > > + * Start to enqueue several pointers to objects on the ring. > > + * Note that no actual pointers are put in the queue by this > > +function, > > + * it just reserves space for the user on the ring. > > + * User has to copy pointers to objects into the queue using the > > + * returned pointers. > > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_bulk_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *free_space) { > > + return rte_ring_enqueue_sg_bulk_elem_start(r, sizeof(uintptr_t), n, > > + sgd, free_space); > > +} > > +/** > > + * Start to enqueue several objects on the ring. > > + * Note that no actual objects are put in the queue by this function, > > + * it just reserves space for the user on the ring. > > + * User has to copy objects into the queue using the returned pointers= . > > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete > > +the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_burst_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*free_space) { > > + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_VARIABLE, sgd, free_space); } > > + > > +/** > > + * Start to enqueue several pointers to objects on the ring. > > + * Note that no actual pointers are put in the queue by this > > +function, > > + * it just reserves space for the user on the ring. > > + * User has to copy pointers to objects into the queue using the > > + * returned pointers. > > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_burst_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *free_space) { > > + return rte_ring_enqueue_sg_burst_elem_start(r, sizeof(uintptr_t), > n, > > + sgd, free_space); > > +} > > + > > +/** > > + * Complete enqueuing several objects on the ring. > > + * Note that number of objects to enqueue should not exceed previous > > + * enqueue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to add to the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_enqueue_sg_elem_finish(struct rte_ring *r, unsigned int n) { > > + uint32_t tail; > > + > > + switch (r->prod.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n =3D __rte_ring_st_get_tail(&r->prod, &tail, n); > > + __rte_ring_st_set_head_tail(&r->prod, tail, n, 1); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n =3D __rte_ring_hts_get_tail(&r->hts_prod, &tail, n); > > + __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + } > > +} > > + > > +/** > > + * Complete enqueuing several pointers to objects on the ring. > > + * Note that number of objects to enqueue should not exceed previous > > + * enqueue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of pointers to objects to add to the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_enqueue_sg_finish(struct rte_ring *r, unsigned int n) { > > + rte_ring_enqueue_sg_elem_finish(r, n); } > > + > > +/** > > + * @internal This function moves cons head value and copies up to *n* > > + * objects from the ring to the user provided obj_table. > > + */ > > +static __rte_always_inline unsigned int > > +__rte_ring_do_dequeue_sg_elem_start(struct rte_ring *r, > > + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, > > + struct rte_ring_sg_data *sgd, unsigned int *available) { > > + uint32_t avail, head, next; > > + > > + switch (r->cons.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n =3D __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n, > > + behavior, &head, &next, &avail); > > + __rte_ring_get_elem_addr(r, head, esize, n, > > + sgd->ptr1, &sgd->n1, sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n =3D __rte_ring_hts_move_cons_head(r, n, behavior, > > + &head, &avail); > > + __rte_ring_get_elem_addr(r, head, esize, n, > > + sgd->ptr1, &sgd->n1, sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + n =3D 0; > > + avail =3D 0; > > + } > > + > > + if (available !=3D NULL) > > + *available =3D avail - n; > > + return n; > > +} > > + > > +/** > > + * Start to dequeue several objects from the ring. > > + * Note that no actual objects are copied from the queue by this funct= ion. > > + * User has to copy objects from the queue using the returned pointers= . > > + * User should call rte_ring_dequeue_sg_bulk_elem_finish to complete > > +the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * @param n > > + * The number of objects to remove from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after t= he > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_bulk_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*available) { > > + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_FIXED, sgd, available); } > > + > > +/** > > + * Start to dequeue several pointers to objects from the ring. > > + * Note that no actual pointers are removed from the queue by this > function. > > + * User has to copy pointers to objects from the queue using the > > + * returned pointers. > > + * User should call rte_ring_dequeue_sg_bulk_finish to complete the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after t= he > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_bulk_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *available) { > > + return rte_ring_dequeue_sg_bulk_elem_start(r, sizeof(uintptr_t), > > + n, sgd, available); > > +} > > + > > +/** > > + * Start to dequeue several objects from the ring. > > + * Note that no actual objects are copied from the queue by this funct= ion. > > + * User has to copy objects from the queue using the returned pointers= . > > + * User should call rte_ring_dequeue_sg_burst_elem_finish to complete > > +the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * This must be the same value used while creating the ring. Otherwi= se > > + * the results are undefined. > > + * @param n > > + * The number of objects to dequeue from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after t= he > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_burst_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*available) { > > + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_VARIABLE, sgd, available); } > > + > > +/** > > + * Start to dequeue several pointers to objects from the ring. > > + * Note that no actual pointers are removed from the queue by this > function. > > + * User has to copy pointers to objects from the queue using the > > + * returned pointers. > > + * User should call rte_ring_dequeue_sg_burst_finish to complete the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after t= he > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_burst_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *available) { > > + return rte_ring_dequeue_sg_burst_elem_start(r, sizeof(uintptr_t), > n, > > + sgd, available); > > +} > > + > > +/** > > + * Complete dequeuing several objects from the ring. > > + * Note that number of objects to dequeued should not exceed previous > > + * dequeue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_dequeue_sg_elem_finish(struct rte_ring *r, unsigned int n) { > > + uint32_t tail; > > + > > + switch (r->cons.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n =3D __rte_ring_st_get_tail(&r->cons, &tail, n); > > + __rte_ring_st_set_head_tail(&r->cons, tail, n, 0); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n =3D __rte_ring_hts_get_tail(&r->hts_cons, &tail, n); > > + __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + } > > +} > > + > > +/** > > + * Complete dequeuing several objects from the ring. > > + * Note that number of objects to dequeued should not exceed previous > > + * dequeue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_dequeue_sg_finish(struct rte_ring *r, unsigned int n) { > > + rte_ring_dequeue_elem_finish(r, n); > > +} > > + > > +#ifdef __cplusplus > > +} > > +#endif > > + > > +#endif /* _RTE_RING_PEEK_SG_H_ */ > > -- > > 2.17.1