From mboxrd@z Thu Jan  1 00:00:00 1970
From: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
To: "Ananyev, Konstantin", "olivier.matz@6wind.com"
CC: Gavin Hu, "dev@dpdk.org", nd
Date: Fri, 28 Feb 2020 00:18:36 +0000
Subject: Re: [dpdk-dev] [RFC 1/1] lib/ring: add scatter gather and serial
 dequeue APIs
References: <20200224203931.21256-1-honnappa.nagarahalli@arm.com>
 <20200224203931.21256-2-honnappa.nagarahalli@arm.com>
List-Id: DPDK patches and discussions

>
>
> Hi Honnappa,
Thanks Konstantin for the comments.
>
> > Add scatter gather APIs to avoid intermediate memcpy. Serial dequeue
> > APIs are added to support access to ring elements before actual
> > dequeue.
> >
> > Signed-off-by: Honnappa Nagarahalli
> > Reviewed-by: Gavin Hu
> > Reviewed-by: Ola Liljedahl
> > ---
> >  lib/librte_ring/Makefile           |   1 +
> >  lib/librte_ring/meson.build        |   1 +
> >  lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
> >  lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
> >  lib/librte_ring/rte_ring_generic.h |  93 +++++++
> >  5 files changed, 610 insertions(+)
>
> As was already noticed by you, this patch overlaps quite a bit with another
> one:
> http://patches.dpdk.org/patch/66006/
I took a cursory look at this. I need to take a detailed look; I plan to do so soon.

> Though it seems there are a few significant differences in our approaches (both
> API and implementation).
> So we probably need to come up with some common view first, before
> moving forward with some unified version.
> To start a discussion, I produced some comments, pls see below.
>
> I don't see changes in rte_ring.h itself, but I suppose that's just because it is an
> RFC and it would be added in later versions?
I did not plan to add them. IMO, we should not add new APIs to that list. We should encourage using the rte_ring_xxx_elem APIs going forward. They are interoperable (I mean, the application can call a mix of APIs).

> Another similar question: there seems to be only _bulk_ (RTE_RING_QUEUE_FIXED)
> mode, I suppose _burst_ will also be added in later versions?
Here, I was trying to avoid providing APIs without a clear need (_bulk_ is enough for RCU for now). If you see the need, I can add them.

>
> > diff --git a/lib/librte_ring/rte_ring_elem_sg.h
> > b/lib/librte_ring/rte_ring_elem_sg.h
> > new file mode 100644
> > index 000000000..a73f4fbfe
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_elem_sg.h
> > @@ -0,0 +1,417 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2020 Arm Limited
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_ELEM_SG_H_
> > +#define _RTE_RING_ELEM_SG_H_
> > +
> > +/**
> > + * @file
> > + * RTE Ring with
> > + * 1) user defined element size
> > + * 2) scatter gather feature to copy objects to/from the ring
> > + * 3) ability to reserve, consume/discard elements in the ring
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +#include
> > +
> > +#include "rte_ring.h"
> > +#include "rte_ring_elem.h"
> > +
> > +/* Between load and load, there might be cpu reorder in weak model
> > + * (powerpc/arm).
> > + * There are 2 choices for the users
> > + * 1.use rmb() memory barrier
> > + * 2.use one-direction load_acquire/store_release barrier, defined by
> > + *   CONFIG_RTE_USE_C11_MEM_MODEL=y
> > + * It depends on performance test results.
> > + * By default, move common functions to rte_ring_generic.h
> > + */
> > +#ifdef RTE_USE_C11_MEM_MODEL
> > +#include "rte_ring_c11_mem.h"
> > +#else
> > +#include "rte_ring_generic.h"
> > +#endif
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> > +{
> > +	uint32_t idx = head & r->mask;
> > +	uint64_t *ring = (uint64_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> > +{
> > +	uint32_t idx = head & r->mask;
> > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> > +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> > +{
> > +	if (esize == 8)
> > +		return __rte_ring_get_elem_addr_64(r, head,
> > +						num, dst1, n1, dst2);
> > +	else if (esize == 16)
> > +		return __rte_ring_get_elem_addr_128(r, head,
> > +						num, dst1, n1, dst2);
> > +	else {
> > +		uint32_t idx, scale, nr_idx;
> > +		uint32_t *ring = (uint32_t *)&r[1];
> > +
> > +		/* Normalize to uint32_t */
> > +		scale = esize / sizeof(uint32_t);
> > +		idx = head & r->mask;
> > +		nr_idx = idx * scale;
> > +
> > +		*dst1 = ring + nr_idx;
> > +		*n1 = num;
> > +
> > +		if (idx + num > r->size) {
> > +			*n1 = num - (r->size - idx - 1);
> > +			*dst2 = ring;
> > +		}
> > +	}
> > +}
> > +
> > +/**
> > + * @internal Reserve ring elements to enqueue several objects on the ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of elements to reserve in the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer reserve
> > + * @param old_head
> > + *   Producer's head index before reservation.
> > + * @param new_head
> > + *   Producer's head index after reservation.
> > + * @param free_space
> > + *   returns the amount of space after the reserve operation has finished.
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param dst1
> > + *   Pointer to location in the ring to copy the data.
> > + * @param n1
> > + *   Number of elements to copy at dst1
> > + * @param dst2
> > + *   In case of ring wrap around, this pointer provides the location to
> > + *   copy the remaining elements. The number of elements to copy at this
> > + *   location is equal to (number of elements reserved - n1)
> > + * @return
> > + *   Actual number of elements reserved.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
>
> I do understand the purpose of reserve, then either commit/abort for serial
> sync mode, but what is the purpose of the non-serial version of reserve/commit?
In RCU, I have the need for the scatter-gather feature, i.e. the data in the ring element is coming from multiple sources (the 'token' is generated by the RCU library and the application provides additional data). If I do not provide reserve/commit, I need to introduce an intermediate memcpy to get the two pieces of data contiguous before copying to the ring element. The sequence is 'reserve(1), memcpy1, memcpy2, commit(1)'. Hence, you do not see the abort API for the enqueue.
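
A minimal sketch of that 'reserve(1), memcpy1, memcpy2, commit(1)'
sequence with the proposed single-element API (the element layout and
the token source are hypothetical, just to show the intent):

struct dq_elem {
	/* one ring element, esize == sizeof(struct dq_elem) */
	uint64_t token;		/* generated by the RCU library */
	uint64_t user_data;	/* provided by the application */
};

static int
dq_enqueue(struct rte_ring *r, struct rte_rcu_qsbr *v, uint64_t app_data)
{
	unsigned int old_head, new_head, free_space;
	void *dst;
	struct dq_elem *e;

	/* reserve(1): get the address of the slot inside the ring */
	if (rte_ring_mp_enqueue_elem_reserve(r, sizeof(struct dq_elem),
			&old_head, &new_head, &free_space, &dst) != 0)
		return -ENOBUFS;

	/* memcpy1/memcpy2: both sources are written straight into the
	 * ring element, no intermediate buffer is needed
	 */
	e = dst;
	e->token = rte_rcu_qsbr_start(v);
	e->user_data = app_data;

	/* commit(1): make the element visible to consumers */
	rte_ring_mp_enqueue_elem_commit(r, old_head, new_head);
	return 0;
}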

> In serial MP/MC case, after _reserve_(n) you always have to do
> _commit_(n) - you can't reduce the number of elements, or do _abort_.
Agree, the intention here is to provide the scatter/gather feature.

> Again, you cannot avoid memcpy(n) here anyhow.
> So what is the point of these functions for the non-serial case?
It avoids an intermediate memcpy when the data is coming from multiple sources.

>
> BTW, I think it would be good to have a serial version of _enqueue_ too.
If there is a good use case, they should be provided. I did not come across a good use case.

>
> > +	unsigned int n, enum rte_ring_queue_behavior behavior,
> > +	unsigned int is_sp, unsigned int *old_head,
> > +	unsigned int *new_head, unsigned int *free_space,
> > +	void **dst1, unsigned int *n1, void **dst2)
>
> I do understand the intention to avoid memcpy(), but the proposed API seems
> overcomplicated, error prone, and not very convenient for the user.
The issue is the need to handle the wrap-around in the ring storage array, i.e. when space is reserved for more than 1 ring element, a wrap-around might happen.

> I don't think that avoiding memcpy() will save us that many cycles here, so
This depends on the amount of data being copied.

> probably better to keep the API model a bit more regular:
>
> n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space);
> ...
> /* performs actual memcpy(), m<=n */
> rte_ring_mp_serial_enqueue_bulk_commit(ring, obj, m);
These do not take care of the wrap-around case, or I am not able to understand your comment.
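
To make the wrap-around point concrete: when a multi-element reservation
crosses the end of the ring array, the reserve call hands back two
segments and the caller has to split the copy. A sketch, assuming the
internal reserve/commit from this patch (the wrapper name is hypothetical):

static unsigned int
enqueue_bulk_sg(struct rte_ring *r, const void *obj, unsigned int esize,
		unsigned int n)
{
	unsigned int old_head, new_head, free_space, n1;
	void *dst1, *dst2 = NULL;

	n = __rte_ring_do_enqueue_elem_reserve(r, esize, n,
			RTE_RING_QUEUE_FIXED, 0, &old_head, &new_head,
			&free_space, &dst1, &n1, &dst2);
	if (n != 0) {
		/* first segment: reserved slot up to the end of the array */
		memcpy(dst1, obj, (size_t)n1 * esize);
		/* second segment: wrapped to the start of the array */
		if (n1 != n)
			memcpy(dst2, (const char *)obj + (size_t)n1 * esize,
					(size_t)(n - n1) * esize);
		__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
	}
	return n;
}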

>
> /* performs actual memcpy for num elems */
> n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space);
> ...
> /* m<=n */
> rte_ring_mp_serial_dequeue_bulk_commit(ring, obj, m);
>
> Plus, we can have the usual enqueue/dequeue API for serial sync mode:
> rte_ring_serial_(enqueue/dequeue)_(bulk/burst)
It would be good to understand the use cases. IMO, if we do not have use cases, we should not add them for now. We can add them as and when the use cases are understood.

>
> > +{
> > +	uint32_t free_entries;
> > +
> > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > +			old_head, new_head, &free_entries);
> > +
> > +	if (n == 0)
> > +		goto end;
>
> Here and in other similar places, why not just 'return 0;'?
Yes, should be possible.

>
> > +
> > +	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
> > +
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +
> > +end:
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Consume previously reserved ring elements (for enqueue)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Producer's head index before reservation.
> > + * @param new_head
> > + *   Producer's head index after reservation.
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head update
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
> > +	unsigned int old_head, unsigned int new_head,
> > +	unsigned int is_sp)
> > +{
> > +	update_tail(&r->prod, old_head, new_head, is_sp, 1);
> > +}
> > +
> > +/**
> > + * Reserve one element for enqueuing one object on a ring
> > + * (multi-producers safe). Application must call
> > + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param old_head
> > + *   Producer's head index before reservation. The same should be passed to
> > + *   'rte_ring_mp_enqueue_elem_commit' function.
> > + * @param new_head
> > + *   Producer's head index after reservation. The same should be passed to
> > + *   'rte_ring_mp_enqueue_elem_commit' function.
> > + * @param free_space
> > + *   Returns the amount of space after the reservation operation has finished.
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param dst
> > + *   Pointer to location in the ring to copy the data.
> > + * @return
> > + *   - 0: Success; objects enqueued.
> > + *   - -ENOBUFS: Not enough room in the ring to reserve; no element is reserved.
> > + */
> > +static __rte_always_inline int
> > +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> > +	unsigned int *old_head, unsigned int *new_head,
> > +	unsigned int *free_space, void **dst)
> > +{
> > +	unsigned int n;
> > +
> > +	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
> > +			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
> > +			free_space, dst, &n, NULL) ? 0 : -ENOBUFS;
> > +}
> > +
> > +/**
> > + * Consume previously reserved elements (for enqueue) in a ring
> > + * (multi-producers safe). This API completes the enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Producer's head index before reservation. This value was returned
> > + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> > + * @param new_head
> > + *   Producer's head index after reservation. This value was returned
> > + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
> > +	unsigned int new_head)
> > +{
> > +	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
> > +}
> > +
> > +/**
> > + * @internal Reserve elements to dequeue several objects on the ring.
> > + * This function blocks if there are elements reserved already.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of objects to reserve in the ring
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements in a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
> > + * @param is_sc
> > + *   Indicates whether to use single consumer or multi-consumer head update
> > + * @param old_head
> > + *   Consumer's head index before reservation.
> > + * @param new_head
> > + *   Consumer's head index after reservation.
> > + * @param available
> > + *   returns the number of remaining ring elements after the reservation
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param src1
> > + *   Pointer to location in the ring to copy the data from.
> > + * @param n1
> > + *   Number of elements to copy from src1
> > + * @param src2
> > + *   In case of wrap around in the ring, this pointer provides the location
> > + *   to copy the remaining elements from. The number of elements to copy from
> > + *   this pointer is equal to (number of elements reserved - n1)
> > + * @return
> > + *   Actual number of elements reserved.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
> > +	unsigned int esize, unsigned int n,
> > +	enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> > +	unsigned int *old_head, unsigned int *new_head,
> > +	unsigned int *available, void **src1, unsigned int *n1,
> > +	void **src2)
> > +{
> > +	uint32_t entries;
> > +
> > +	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
> > +			old_head, new_head, &entries);
> > +
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
> > +
> > +	if (available != NULL)
> > +		*available = entries - n;
> > +
> > +end:
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Consume previously reserved ring elements (for dequeue)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Consumer's head index before reservation.
> > + * @param new_head
> > + *   Consumer's head index after reservation.
> > + * @param is_sc
> > + *   Indicates whether to use single consumer or multi-consumer head update
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> > +	unsigned int old_head, unsigned int new_head,
> > +	unsigned int is_sc)
> > +{
>
> I think it is a bit dangerous and error-prone approach to let the user specify
> old_head/new_head manually.
old_head and new_head are local to the thread in the non-serial MP/MC case. Hence, they need to be returned back to the caller.

> Seems better just _commit(ring, num) - see above.
This would not work for the non-serial cases.

> That way the user doesn't have to calculate the new head manually,
I do not understand the 'calculate' part. The user has to provide the same values that were returned.

> plus we can have a check that ring.tail - ring.head >= num.
>
> > +	update_tail(&r->cons, old_head, new_head, is_sc, 1);
>
> I think update_tail() is not enough here.
> As in some cases we need to update ring.head also:
> let's say the user reserved 2 elems, but then decided to commit only one.
This patch does not address that use case. Do you see use cases for this?

> So I think we need a special new function instead of update_tail() here.
> BTW, in HTS I use atomic 64-bit read/write to get/set both head and tail in
> one go.
> This is not really required - two 32-bit ops would work too, I think.
> As usual, both ways have some pros and cons:
> using 64-bit ops might be faster on a 64-bit target, plus it is less error prone (no
> need to think about head/tail read/write ordering, etc.), though for a 32-bit
> target it would mean some extra overhead.
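
For reference, a sketch of the 64-bit variant (only an illustration of
the idea, not the actual HTS patch code; it assumes head and tail are
adjacent uint32_t fields, as in struct rte_ring_headtail):

union ht_snapshot {
	uint64_t raw;
	struct {
		uint32_t head;
		uint32_t tail;
	} pos;
};

static inline int
ht_no_pending_reservation(const uint64_t *ht)
{
	union ht_snapshot s;

	/* a single atomic 64-bit load returns a consistent head/tail
	 * pair, so there is no load/load ordering to reason about
	 */
	s.raw = __atomic_load_n(ht, __ATOMIC_ACQUIRE);
	return s.pos.head == s.pos.tail;
}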
>
> > +}
> > +
> > +/**
> > + * Reserve one element on a ring for dequeue. This function blocks if there
> > + * are elements reserved already. Application must call
> > + * 'rte_ring_do_dequeue_elem_commit' or
> > + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param old_head
> > + *   Consumer's head index before reservation. The same should be passed to
> > + *   'rte_ring_dequeue_elem_commit' function.
> > + * @param new_head
> > + *   Consumer's head index after reservation. The same should be passed to
> > + *   'rte_ring_dequeue_elem_commit' function.
> > + * @param available
> > + *   returns the number of remaining ring elements after the reservation
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param src
> > + *   Pointer to location in the ring to copy the data from.
> > + * @return
> > + *   - 0: Success; elements reserved
> > + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> > + */
> > +static __rte_always_inline int
> > +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
> > +	unsigned int *old_head, unsigned int *new_head,
> > +	unsigned int *available, void **src)
> > +{
> > +	unsigned int n;
> > +
> > +	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> > +			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> > +			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
> > +}
> > +
> > +/**
> > + * Consume previously reserved elements (for dequeue) in a ring
> > + * (multi-consumer safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Consumer's head index before reservation. This value was returned
> > + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> > + * @param new_head
> > + *   Consumer's head index after reservation. This value was returned
> > + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
> > +	unsigned int new_head)
> > +{
> > +	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
> > +			r->cons.single);
> > +}
> > +
> > +/**
> > + * Discard previously reserved elements (for dequeue) in a ring.
> > + *
> > + * @warning
> > + * This API can be called only if the ring elements were reserved
> > + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
> > +{
> > +	__rte_ring_revert_head(&r->cons);
> > +}
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RING_ELEM_SG_H_ */
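
Taken together, the serial reserve/commit/revert calls above enable a
"peek" style consumer. A sketch of how the RCU use case could look
(struct dq_elem as in the earlier sketch; the readiness check is a
hypothetical predicate, not part of this patch):

static void
dq_reclaim_one(struct rte_ring *r)
{
	unsigned int old_head, new_head, avail;
	void *src;
	struct dq_elem e;

	if (rte_ring_dequeue_elem_reserve_serial(r, sizeof(e),
			&old_head, &new_head, &avail, &src) != 0)
		return;			/* ring empty */

	memcpy(&e, src, sizeof(e));	/* inspect before dequeuing */

	if (grace_period_over(e.token))
		/* safe to reclaim: complete the dequeue */
		rte_ring_dequeue_elem_commit(r, old_head, new_head);
	else
		/* not ready yet: discard the reservation, retry later */
		rte_ring_dequeue_elem_revert_serial(r);
}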
> > diff --git a/lib/librte_ring/rte_ring_generic.h
> > b/lib/librte_ring/rte_ring_generic.h
> > index 953cdbbd5..8d7a7ffcc 100644
> > --- a/lib/librte_ring/rte_ring_generic.h
> > +++ b/lib/librte_ring/rte_ring_generic.h
> > @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
> >  	return n;
> >  }
> >
> > +/**
> > + * @internal This function updates the consumer head if there are no
> > + * prior reserved elements on the ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sc
> > + *   Indicates whether multi-consumer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to dequeue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where dequeue starts
> > + * @param new_head
> > + *   Returns the current/new head value, i.e. where dequeue finishes
> > + * @param entries
> > + *   Returns the number of entries in the ring BEFORE head was moved
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
> > +	unsigned int n, enum rte_ring_queue_behavior behavior,
> > +	uint32_t *old_head, uint32_t *new_head,
> > +	uint32_t *entries)
> > +{
> > +	unsigned int max = n;
> > +	int success;
> > +
> > +	/* move cons.head atomically */
> > +	do {
> > +		/* Restore n as it may change every loop */
> > +		n = max;
> > +
> > +		*old_head = r->cons.head;
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/* Ensure that cons.tail and cons.head are the same */
> > +		if (*old_head != r->cons.tail) {
> > +			rte_pause();
> > +
> > +			success = 0;
> > +			continue;
> > +		}
> > +
> > +		/* The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * cons_head > prod_tail). So 'entries' is always between 0
> > +		 * and size(ring)-1.
> > +		 */
> > +		*entries = (r->prod.tail - *old_head);
> > +
> > +		/* Set the actual entries for dequeue */
> > +		if (n > *entries)
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> > +
> > +		if (unlikely(n == 0))
> > +			return 0;
> > +
> > +		*new_head = *old_head + n;
> > +		if (is_sc) {
> > +			r->cons.head = *new_head;
> > +			rte_smp_rmb();
> > +			success = 1;
>
> I don't think we need to worry about the SC case in this function.
> For sc(/sp)_serial (if we need such a mode) - we probably can use the normal
> move_(cons/prod)_head().
Agree

>
> > +		} else {
> > +			success = rte_atomic32_cmpset(&r->cons.head,
> > +					*old_head, *new_head);
> > +		}
> > +	} while (unlikely(success == 0));
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal This function updates the head to match the tail
> > + *
> > + * @param ht
> > + *   A pointer to the ring's head-tail structure
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_revert_head(struct rte_ring_headtail *ht)
> > +{
> > +	/* Discard the reserved ring elements. */
> > +	ht->head = ht->tail;
> > +}
> > +
> >  #endif /* _RTE_RING_GENERIC_H_ */
> > --
> > 2.17.1