From: "Ananyev, Konstantin"
To: Honnappa Nagarahalli, "dev@dpdk.org"
CC: "olivier.matz@6wind.com", "david.marchand@redhat.com", "nd@arm.com"
Subject: Re: [dpdk-dev] [RFC v2 1/1] lib/ring: add scatter gather APIs
Date: Mon, 12 Oct 2020 16:20:56 +0000
In-Reply-To: <20201006132905.46205-2-honnappa.nagarahalli@arm.com>
References: <20200224203931.21256-1-honnappa.nagarahalli@arm.com>
 <20201006132905.46205-1-honnappa.nagarahalli@arm.com>
 <20201006132905.46205-2-honnappa.nagarahalli@arm.com>

> Add scatter gather APIs to avoid intermediate memcpy. Use cases
> that involve copying large amount of data to/from the ring
> can benefit from these APIs.
>
> Signed-off-by: Honnappa Nagarahalli
> ---
>  lib/librte_ring/meson.build        |   3 +-
>  lib/librte_ring/rte_ring_elem.h    |   1 +
>  lib/librte_ring/rte_ring_peek_sg.h | 552 +++++++++++++++++++++++++++++
>  3 files changed, 555 insertions(+), 1 deletion(-)
>  create mode 100644 lib/librte_ring/rte_ring_peek_sg.h

As a general comment - the ring UT (both functional and perf) needs to be
updated to test/measure this new API.
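Something along these lines for the functional part, for example (a rough
sketch only, assuming the usual test_ring.c includes; the helper name below
is made up and not part of this patch):

/* Sketch of a functional check: reserve space via the new SG API,
 * copy data into the returned ring space (handling a possible wrap-around),
 * then read it back through the existing dequeue API and compare.
 */
static int
test_ring_sg_enqueue_dequeue(struct rte_ring *r)
{
	uintptr_t src[32], dst[32];
	struct rte_ring_sg_data sgd;
	unsigned int n, i;

	for (i = 0; i != RTE_DIM(src); i++)
		src[i] = i;

	/* reserve space on the ring, no copy is done yet */
	n = rte_ring_enqueue_sg_bulk_start(r, RTE_DIM(src), &sgd, NULL);
	if (n == 0)
		return -1;

	/* copy into the reserved space, second chunk only if it wrapped */
	memcpy(sgd.ptr1, src, sgd.n1 * sizeof(src[0]));
	if (n != sgd.n1)
		memcpy(sgd.ptr2, src + sgd.n1, (n - sgd.n1) * sizeof(src[0]));
	rte_ring_enqueue_sg_finish(r, n);

	/* read back through the existing API and verify the contents */
	if (rte_ring_dequeue_bulk(r, (void **)dst, n, NULL) != n)
		return -1;
	return memcmp(src, dst, n * sizeof(src[0])) == 0 ? 0 : -1;
}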
>
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> index 31c0b4649..377694713 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -12,4 +12,5 @@ headers = files('rte_ring.h',
>  		'rte_ring_peek.h',
>  		'rte_ring_peek_c11_mem.h',
>  		'rte_ring_rts.h',
> -		'rte_ring_rts_c11_mem.h')
> +		'rte_ring_rts_c11_mem.h',
> +		'rte_ring_peek_sg.h')
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> index 938b398fc..7d3933f15 100644
> --- a/lib/librte_ring/rte_ring_elem.h
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
>
>  #ifdef ALLOW_EXPERIMENTAL_API
>  #include <rte_ring_peek.h>
> +#include <rte_ring_peek_sg.h>
>  #endif
>
>  #include <rte_ring.h>
> diff --git a/lib/librte_ring/rte_ring_peek_sg.h b/lib/librte_ring/rte_ring_peek_sg.h
> new file mode 100644
> index 000000000..97d5764a6
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_peek_sg.h
> @@ -0,0 +1,552 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2020 Arm
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_PEEK_SG_H_
> +#define _RTE_RING_PEEK_SG_H_
> +
> +/**
> + * @file
> + * @b EXPERIMENTAL: this API may change without prior notice
> + * It is not recommended to include this file directly.
> + * Please include <rte_ring_elem.h> instead.
> + *
> + * Ring Peek Scatter Gather APIs
> + * Introduction of rte_ring with scatter gather serialized producer/consumer
> + * (HTS sync mode) makes it possible to split public enqueue/dequeue API
> + * into 3 phases:
> + * - enqueue/dequeue start
> + * - copy data to/from the ring
> + * - enqueue/dequeue finish
> + * Along with the advantages of the peek APIs, these APIs provide the ability
> + * to avoid copying of the data to temporary area.
> + *
> + * Note that right now this new API is available only for two sync modes:
> + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
> + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
> + * It is a user responsibility to create/init ring with appropriate sync
> + * modes selected.
> + *
> + * Example usage:
> + * // read 1 elem from the ring:
> + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL);
> + * if (n != 0) {
> + *	//Copy objects in the ring
> + *	memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t));
> + *	if (n != sgd->n1)
> + *		//Second memcpy because of wrapround
> + *		n2 = n - sgd->n1;
> + *		memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t));
> + *	rte_ring_dequeue_sg_finish(ring, n);

It is not clear from the example above why you need the SG (ZC) API.
The existing peek API would be able to handle such a situation
(the copy would just be done internally).
Probably better to use the examples you provided in your last reply to Olivier.
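I.e. the benefit only shows up when the data is produced straight into the
ring space, so there is no source buffer for the existing API to copy from.
Something like this, just a hypothetical sketch ('struct rec',
'produce_records' and 'fill_record' are made-up names, not from the patch):

/* Producer writes each fixed-size record directly into the ring space
 * reserved by _start_, so the intermediate buffer and the extra memcpy
 * that the peek/enqueue API would need simply disappear.
 */
struct rec {
	uint8_t data[64];
};

static unsigned int
produce_records(struct rte_ring *r, unsigned int n,
	void (*fill_record)(struct rec *))
{
	struct rte_ring_sg_data sgd;
	struct rec *p;
	unsigned int i, m;

	m = rte_ring_enqueue_sg_burst_elem_start(r, sizeof(struct rec), n,
		&sgd, NULL);
	if (m == 0)
		return 0;

	p = (struct rec *)sgd.ptr1;
	for (i = 0; i != m; i++) {
		if (i == sgd.n1)	/* wrap-around: switch to second chunk */
			p = (struct rec *)sgd.ptr2;
		fill_record(p++);
	}

	rte_ring_enqueue_sg_elem_finish(r, m);
	return m;
}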
> + * }
> + *
> + * Note that between _start_ and _finish_ none other thread can proceed
> + * with enqueue(/dequeue) operation till _finish_ completes.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include
> +
> +/* Rock that needs to be passed between reserve and commit APIs */
> +struct rte_ring_sg_data {
> +	/* Pointer to the first space in the ring */
> +	void **ptr1;
> +	/* Pointer to the second space in the ring if there is wrap-around */
> +	void **ptr2;
> +	/* Number of elements in the first pointer. If this is equal to
> +	 * the number of elements requested, then ptr2 is NULL.
> +	 * Otherwise, subtracting n1 from number of elements requested
> +	 * will give the number of elements available at ptr2.
> +	 */
> +	unsigned int n1;
> +};

I wonder what the primary goal of that API is?
The reason I am asking: from what I understand, with this patch the ZC API
will work only for ST and HTS modes (same as the peek API).
Though I think it is possible to make it work for any sync model by changing
the API a bit: instead of returning sg_data to the user, force the caller to
provide a function to read/write elems from/to the ring.
Just a schematic one, to illustrate the idea:

typedef void (*write_ring_func_t)(void *elem,	/* pointer to first elem to update inside the ring */
	uint32_t num,	/* number of elems to update */
	uint32_t esize,
	void *udata	/* caller provided data */);

rte_ring_enqueue_zc_bulk_elem(struct rte_ring *r, unsigned int esize,
	unsigned int n, unsigned int *free_space, write_ring_func_t wf, void *udata)
{
	struct rte_ring_sg_data sgd;
	.....
	n = move_head_tail(r, ...);

	/* get sgd data based on n */
	get_elem_addr(r, ..., &sgd);

	/* call user defined function to fill reserved elems */
	wf(sgd.p1, sgd.n1, esize, udata);
	if (n != n1)
		wf(sgd.p2, sgd.n2, esize, udata);

	....
	return n;
}

If we want a ZC peek API as well, some extra work needs to be done to
introduce a return value for write_ring_func() and check it properly,
but I don't see any big problems there either.
That way the ZC API can support all sync models, plus we don't need to
expose sg_data to the user directly.
Also, in future, we could probably de-duplicate the code by making our
non-ZC API use that one internally (pass ring_enqueue_elems()/obj_table
as the parameters).
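To make that last point concrete, the writer callback could be as trivial as
the sketch below (made-up names, written against the hypothetical prototype
above, so purely illustrative):

/* Writer callback: copies objects from the caller's obj_table (passed via
 * udata as a moving cursor) into the ring space; called once, or twice when
 * the reservation wraps around the end of the ring.
 */
static void
copy_to_ring(void *elem, uint32_t num, uint32_t esize, void *udata)
{
	const char **src = udata;

	memcpy(elem, *src, (size_t)num * esize);
	*src += (size_t)num * esize;	/* advance past what was written */
}

/* The existing non-ZC bulk enqueue could then be expressed in terms of the
 * ZC variant, which is the de-duplication mentioned above.
 */
static unsigned int
enqueue_bulk_via_zc(struct rte_ring *r, const void *obj_table,
	unsigned int esize, unsigned int n, unsigned int *free_space)
{
	const char *src = obj_table;

	return rte_ring_enqueue_zc_bulk_elem(r, esize, n, free_space,
		copy_to_ring, &src);
}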
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	uint64_t *ring = (uint64_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	if (esize == 8)
> +		__rte_ring_get_elem_addr_64(r, head,
> +			num, dst1, n1, dst2);
> +	else if (esize == 16)
> +		__rte_ring_get_elem_addr_128(r, head,
> +			num, dst1, n1, dst2);

I don't think we need that special handling for 8/16B sizes.
In all these functions esize is an input parameter.
If the user specifies it as a constant, the compiler will be able to
convert the multiply into shift and add ops.
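I.e. only the generic branch below should be needed; schematically the
address computation boils down to something like this (a simplified sketch,
'__ring_elem_addr' is a made-up name, not a real function):

/* With esize as a plain parameter, a constant value at the call site lets
 * the compiler strength-reduce the multiply: esize == 8 becomes (idx << 3),
 * esize == 16 becomes (idx << 4), etc. - no special cases required.
 */
static __rte_always_inline void *
__ring_elem_addr(struct rte_ring *r, uint32_t head, uint32_t esize)
{
	uint32_t idx = head & r->mask;

	return (void *)((uintptr_t)&r[1] + (uintptr_t)idx * esize);
}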
> +	else {
> +		uint32_t idx, scale, nr_idx;
> +		uint32_t *ring = (uint32_t *)&r[1];
> +
> +		/* Normalize to uint32_t */
> +		scale = esize / sizeof(uint32_t);
> +		idx = head & r->mask;
> +		nr_idx = idx * scale;
> +
> +		*dst1 = ring + nr_idx;
> +		*n1 = num;
> +
> +		if (idx + num > r->size) {
> +			*n1 = num - (r->size - idx - 1);
> +			*dst2 = ring;
> +		}
> +	}
> +}
> +
> +/**
> + * @internal This function moves prod head value.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue_sg_elem_start(struct rte_ring *r, unsigned int esize,
> +	uint32_t n, enum rte_ring_queue_behavior behavior,
> +	struct rte_ring_sg_data *sgd, unsigned int *free_space)
> +{
> +	uint32_t free, head, next;
> +
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_ST:
> +		n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
> +			behavior, &head, &next, &free);
> +		__rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd->ptr1,
> +			&sgd->n1, (void **)&sgd->ptr2);
> +		break;
> +	case RTE_RING_SYNC_MT_HTS:
> +		n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
> +		__rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd->ptr1,
> +			&sgd->n1, (void **)&sgd->ptr2);
> +		break;
> +	case RTE_RING_SYNC_MT:
> +	case RTE_RING_SYNC_MT_RTS:
> +	default:
> +		/* unsupported mode, shouldn't be here */
> +		RTE_ASSERT(0);
> +		n = 0;
> +		free = 0;
> +	}
> +
> +	if (free_space != NULL)
> +		*free_space = free - n;
> +	return n;
> +}
> +
> +/**
> + * Start to enqueue several objects on the ring.
> + * Note that no actual objects are put in the queue by this function,
> + * it just reserves space for the user on the ring.
> + * User has to copy objects into the queue using the returned pointers.
> + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete the
> + * enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to add in the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   reservation operation has finished.
> + * @return
> + *   The number of objects that can be enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_enqueue_sg_bulk_elem_start(struct rte_ring *r, unsigned int esize,
> +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *free_space)
> +{
> +	return __rte_ring_do_enqueue_sg_elem_start(r, esize, n,
> +		RTE_RING_QUEUE_FIXED, sgd, free_space);
> +}
> +
> +/**
> + * Start to enqueue several pointers to objects on the ring.
> + * Note that no actual pointers are put in the queue by this function,
> + * it just reserves space for the user on the ring.
> + * User has to copy pointers to objects into the queue using the
> + * returned pointers.
> + * User should call rte_ring_enqueue_sg_bulk_finish to complete the
> + * enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of objects to add in the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   reservation operation has finished.
> + * @return
> + *   The number of objects that can be enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_enqueue_sg_bulk_start(struct rte_ring *r, unsigned int n,
> +	struct rte_ring_sg_data *sgd, unsigned int *free_space)
> +{
> +	return rte_ring_enqueue_sg_bulk_elem_start(r, sizeof(uintptr_t), n,
> +		sgd, free_space);
> +}
> +/**
> + * Start to enqueue several objects on the ring.
> + * Note that no actual objects are put in the queue by this function,
> + * it just reserves space for the user on the ring.
> + * User has to copy objects into the queue using the returned pointers.
> + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete the
> + * enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to add in the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   reservation operation has finished.
> + * @return
> + *   The number of objects that can be enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_enqueue_sg_burst_elem_start(struct rte_ring *r, unsigned int esize,
> +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *free_space)
> +{
> +	return __rte_ring_do_enqueue_sg_elem_start(r, esize, n,
> +		RTE_RING_QUEUE_VARIABLE, sgd, free_space);
> +}
> +
> +/**
> + * Start to enqueue several pointers to objects on the ring.
> + * Note that no actual pointers are put in the queue by this function,
> + * it just reserves space for the user on the ring.
> + * User has to copy pointers to objects into the queue using the
> + * returned pointers.
> + * User should call rte_ring_enqueue_sg_bulk_finish to complete the
> + * enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of objects to add in the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   reservation operation has finished.
> + * @return
> + *   The number of objects that can be enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_enqueue_sg_burst_start(struct rte_ring *r, unsigned int n,
> +	struct rte_ring_sg_data *sgd, unsigned int *free_space)
> +{
> +	return rte_ring_enqueue_sg_burst_elem_start(r, sizeof(uintptr_t), n,
> +		sgd, free_space);
> +}
> +
> +/**
> + * Complete enqueuing several objects on the ring.
> + * Note that number of objects to enqueue should not exceed previous
> + * enqueue_start return value.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of objects to add to the ring.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_ring_enqueue_sg_elem_finish(struct rte_ring *r, unsigned int n)
> +{
> +	uint32_t tail;
> +
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_ST:
> +		n = __rte_ring_st_get_tail(&r->prod, &tail, n);
> +		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
> +		break;
> +	case RTE_RING_SYNC_MT_HTS:
> +		n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
> +		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
> +		break;
> +	case RTE_RING_SYNC_MT:
> +	case RTE_RING_SYNC_MT_RTS:
> +	default:
> +		/* unsupported mode, shouldn't be here */
> +		RTE_ASSERT(0);
> +	}
> +}
> +
> +/**
> + * Complete enqueuing several pointers to objects on the ring.
> + * Note that number of objects to enqueue should not exceed previous
> + * enqueue_start return value.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of pointers to objects to add to the ring.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_ring_enqueue_sg_finish(struct rte_ring *r, unsigned int n)
> +{
> +	rte_ring_enqueue_sg_elem_finish(r, n);
> +}
> +
> +/**
> + * @internal This function moves cons head value and copies up to *n*
> + * objects from the ring to the user provided obj_table.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue_sg_elem_start(struct rte_ring *r,
> +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	struct rte_ring_sg_data *sgd, unsigned int *available)
> +{
> +	uint32_t avail, head, next;
> +
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_ST:
> +		n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
> +			behavior, &head, &next, &avail);
> +		__rte_ring_get_elem_addr(r, head, esize, n,
> +			sgd->ptr1, &sgd->n1, sgd->ptr2);
> +		break;
> +	case RTE_RING_SYNC_MT_HTS:
> +		n = __rte_ring_hts_move_cons_head(r, n, behavior,
> +			&head, &avail);
> +		__rte_ring_get_elem_addr(r, head, esize, n,
> +			sgd->ptr1, &sgd->n1, sgd->ptr2);
> +		break;
> +	case RTE_RING_SYNC_MT:
> +	case RTE_RING_SYNC_MT_RTS:
> +	default:
> +		/* unsupported mode, shouldn't be here */
> +		RTE_ASSERT(0);
> +		n = 0;
> +		avail = 0;
> +	}
> +
> +	if (available != NULL)
> +		*available = avail - n;
> +	return n;
> +}
> +
> +/**
> + * Start to dequeue several objects from the ring.
> + * Note that no actual objects are copied from the queue by this function.
> + * User has to copy objects from the queue using the returned pointers.
> + * User should call rte_ring_dequeue_sg_bulk_elem_finish to complete the
> + * dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to remove from the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects that can be dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_dequeue_sg_bulk_elem_start(struct rte_ring *r, unsigned int esize,
> +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *available)
> +{
> +	return __rte_ring_do_dequeue_sg_elem_start(r, esize, n,
> +		RTE_RING_QUEUE_FIXED, sgd, available);
> +}
> +
> +/**
> + * Start to dequeue several pointers to objects from the ring.
> + * Note that no actual pointers are removed from the queue by this function.
> + * User has to copy pointers to objects from the queue using the
> + * returned pointers.
> + * User should call rte_ring_dequeue_sg_bulk_finish to complete the
> + * dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of objects to remove from the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects that can be dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_dequeue_sg_bulk_start(struct rte_ring *r, unsigned int n,
> +	struct rte_ring_sg_data *sgd, unsigned int *available)
> +{
> +	return rte_ring_dequeue_sg_bulk_elem_start(r, sizeof(uintptr_t),
> +		n, sgd, available);
> +}
> +
> +/**
> + * Start to dequeue several objects from the ring.
> + * Note that no actual objects are copied from the queue by this function.
> + * User has to copy objects from the queue using the returned pointers.
> + * User should call rte_ring_dequeue_sg_burst_elem_finish to complete the
> + * dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects that can be dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_dequeue_sg_burst_elem_start(struct rte_ring *r, unsigned int esize,
> +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *available)
> +{
> +	return __rte_ring_do_dequeue_sg_elem_start(r, esize, n,
> +		RTE_RING_QUEUE_VARIABLE, sgd, available);
> +}
> +
> +/**
> + * Start to dequeue several pointers to objects from the ring.
> + * Note that no actual pointers are removed from the queue by this function.
> + * User has to copy pointers to objects from the queue using the
> + * returned pointers.
> + * User should call rte_ring_dequeue_sg_burst_finish to complete the
> + * dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of objects to remove from the ring.
> + * @param sgd
> + *   The scatter-gather data containing pointers for copying data.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects that can be dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_dequeue_sg_burst_start(struct rte_ring *r, unsigned int n,
> +	struct rte_ring_sg_data *sgd, unsigned int *available)
> +{
> +	return rte_ring_dequeue_sg_burst_elem_start(r, sizeof(uintptr_t), n,
> +		sgd, available);
> +}
> +
> +/**
> + * Complete dequeuing several objects from the ring.
> + * Note that number of objects to dequeued should not exceed previous
> + * dequeue_start return value.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of objects to remove from the ring.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_ring_dequeue_sg_elem_finish(struct rte_ring *r, unsigned int n)
> +{
> +	uint32_t tail;
> +
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_ST:
> +		n = __rte_ring_st_get_tail(&r->cons, &tail, n);
> +		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
> +		break;
> +	case RTE_RING_SYNC_MT_HTS:
> +		n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
> +		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
> +		break;
> +	case RTE_RING_SYNC_MT:
> +	case RTE_RING_SYNC_MT_RTS:
> +	default:
> +		/* unsupported mode, shouldn't be here */
> +		RTE_ASSERT(0);
> +	}
> +}
> +
> +/**
> + * Complete dequeuing several objects from the ring.
> + * Note that number of objects to dequeued should not exceed previous
> + * dequeue_start return value.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param n
> + *   The number of objects to remove from the ring.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_ring_dequeue_sg_finish(struct rte_ring *r, unsigned int n)
> +{
> +	rte_ring_dequeue_elem_finish(r, n);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_PEEK_SG_H_ */
> --
> 2.17.1