From: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
To: Konstantin Ananyev, dev@dpdk.org
CC: olivier.matz@6wind.com, nd
Subject: Re: [dpdk-dev] [RFC 5/6] ring: introduce HTS ring mode
Date: Wed, 25 Mar 2020 20:44:03 +0000
In-Reply-To: <20200224113515.1744-6-konstantin.ananyev@intel.com>
References: <20200224113515.1744-1-konstantin.ananyev@intel.com> <20200224113515.1744-6-konstantin.ananyev@intel.com>

>
> Introduce head/tail sync mode for MT ring synchronization.
> In that mode enqueue/dequeue operation is fully serialized:
> only one thread at a time is allowed to perform given op.
> Suppose to reduce stall times in case when ring is used on overcommitted
> cpus (multiple active threads on the same cpu).
> As another enhancement provide ability to split enqueue/dequeue operation
> into two phases:
> - enqueue/dequeue start
> - enqueue/dequeue finish
> That allows user to inspect objects in the ring without removing them from it
> (aka MT safe peek).
>
> Signed-off-by: Konstantin Ananyev
> ---
>  lib/librte_ring/Makefile               |   1 +
>  lib/librte_ring/meson.build            |   1 +
>  lib/librte_ring/rte_ring.c             |  15 +-
>  lib/librte_ring/rte_ring.h             | 259 ++++++++++++++++++++++++-
>  lib/librte_ring/rte_ring_hts_generic.h | 228 ++++++++++++++++++++++
>  5 files changed, 500 insertions(+), 4 deletions(-)
>  create mode 100644 lib/librte_ring/rte_ring_hts_generic.h
>
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> index 4f90344f4..0c7f8f918 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -19,6 +19,7 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
>      rte_ring_elem.h \
>      rte_ring_generic.h \
>      rte_ring_c11_mem.h \
> +    rte_ring_hts_generic.h \
>      rte_ring_rts_generic.h
>
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> index dc8d7dbea..5aa673199 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -6,6 +6,7 @@ headers = files('rte_ring.h',
>          'rte_ring_elem.h',
>          'rte_ring_c11_mem.h',
>          'rte_ring_generic.h',
> +        'rte_ring_hts_generic.h',
>          'rte_ring_rts_generic.h')
>
>  # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index 1ce0af3e5..d3b948667 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -102,9 +102,9 @@ static int
>  get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)
>  {
>      static const uint32_t prod_st_flags =
> -        (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ);
> +        (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
>      static const uint32_t cons_st_flags =
> -        (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ);
> +        (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
>
>      switch (flags & prod_st_flags) {
>      case 0:
> @@ -116,6 +116,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)
>      case RING_F_MP_RTS_ENQ:
>          *prod_st = RTE_RING_SYNC_MT_RTS;
>          break;
> +    case RING_F_MP_HTS_ENQ:
> +        *prod_st = RTE_RING_SYNC_MT_HTS;
> +        break;
>      default:
>          return -EINVAL;
>      }
> @@ -130,6 +133,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)
>      case RING_F_MC_RTS_DEQ:
>          *cons_st = RTE_RING_SYNC_MT_RTS;
>          break;
> +    case RING_F_MC_HTS_DEQ:
> +        *cons_st = RTE_RING_SYNC_MT_HTS;
> +        break;
>      default:
>          return -EINVAL;
>      }
> @@ -151,6 +157,11 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>      RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
>          RTE_CACHE_LINE_MASK) != 0);
>
> +    RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
> +        offsetof(struct rte_ring_hts_headtail, sync_type));
> +    RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
> +        offsetof(struct rte_ring_hts_headtail, ht.pos.tail));
> +
>      RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
>          offsetof(struct rte_ring_rts_headtail, sync_type));
>      RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index a130aeb9d..52edcea11 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -66,11 +66,11 @@ enum {
>      RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
>      RTE_RING_SYNC_ST,     /**< single thread only */
>      RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
> +    RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */
>  };
>
>  /**
> - * structure to hold a pair of head/tail values and other metadata.
> - * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types.
> + * Structure to hold a pair of head/tail values and other metadata.
>   * Depending on sync_type format of that structure might differ
>   * depending on the sync mechanism selelcted, but offsets for
>   * *sync_type* and *tail* values should always remain the same.
> @@ -96,6 +96,19 @@ struct rte_ring_rts_headtail {
>      volatile union rte_ring_ht_poscnt head;
>  };
>
> +union rte_ring_ht_pos {
> +    uint64_t raw;
> +    struct {
> +        uint32_t tail; /**< tail position */
> +        uint32_t head; /**< head position */
> +    } pos;
> +};
> +
> +struct rte_ring_hts_headtail {
> +    uint32_t sync_type; /**< sync type of prod/cons */
> +    volatile union rte_ring_ht_pos ht __rte_aligned(8);
> +};
> +
>  /**
>   * An RTE ring structure.
>   *
> @@ -126,6 +139,7 @@ struct rte_ring {
>      RTE_STD_C11
>      union {
>          struct rte_ring_headtail prod;
> +        struct rte_ring_hts_headtail hts_prod;
>          struct rte_ring_rts_headtail rts_prod;
>      } __rte_cache_aligned;
>
> @@ -135,6 +149,7 @@ struct rte_ring {
>      RTE_STD_C11
>      union {
>          struct rte_ring_headtail cons;
> +        struct rte_ring_hts_headtail hts_cons;
>          struct rte_ring_rts_headtail rts_cons;
>      } __rte_cache_aligned;
>
> @@ -157,6 +172,9 @@ struct rte_ring {
>  #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */
>  #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */
>
> +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */
> +#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */
> +
>  #define __IS_SP RTE_RING_SYNC_ST
>  #define __IS_MP RTE_RING_SYNC_MT
>  #define __IS_SC RTE_RING_SYNC_ST
> @@ -513,6 +531,82 @@ __rte_ring_do_rts_dequeue(struct rte_ring *r, void **obj_table,
>      return n;
>  }
>
> +#include <rte_ring_hts_generic.h>
> +
> +/**
> + * @internal Start to enqueue several objects on the HTS ring.
> + * Note that user has to call appropriate enqueue_finish()
> + * to complete given enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_hts_enqueue_start(struct rte_ring *r, void * const *obj_table,
> +    uint32_t n, enum rte_ring_queue_behavior behavior,
> +    uint32_t *free_space)
> +{
> +    uint32_t free, head;
> +
> +    n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
> +
> +    if (n != 0)
> +        ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> +
> +    if (free_space != NULL)
> +        *free_space = free - n;
> +    return n;
> +}

rte_ring.h is becoming too big. Maybe we should move these functions to a separate HTS-specific file, but leave the top-level API in rte_ring.h. Similarly for RTS.
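For instance, something like this (just a sketch; the new file names are illustrative, not part of this patch):

    /*
     * rte_ring.h: keep only the top-level API that dispatches on
     * sync_type, and pull the per-mode inline functions from their
     * own headers:
     */
    #include <rte_ring_hts.h>   /* hypothetical: rte_ring_hts_* public API */
    #include <rte_ring_rts.h>   /* hypothetical: rte_ring_rts_* public API */

    /*
     * rte_ring_hts.h would then contain rte_ring_hts_enqueue_*() and
     * rte_ring_hts_dequeue_*(), and itself include rte_ring_hts_generic.h
     * for the internal head/tail helpers.
     */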
> +
> +/**
> + * @internal Start to dequeue several objects from the HTS ring.
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_hts_dequeue_start(struct rte_ring *r, void **obj_table,
> +    unsigned int n, enum rte_ring_queue_behavior behavior,
> +    unsigned int *available)
> +{
> +    uint32_t entries, head;
> +
> +    n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries);
> +
> +    if (n != 0)
> +        DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> +
> +    if (available != NULL)
> +        *available = entries - n;
> +    return n;
> +}
> +
>  /**
>   * Enqueue several objects on the ring (multi-producers safe).
>   *
> @@ -585,6 +679,47 @@ rte_ring_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>              free_space);
>  }
>
> +/**
> + * Start to enqueue several objects on the HTS ring (multi-producers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_bulk_start(struct rte_ring *r, void * const *obj_table,
> +    unsigned int n, unsigned int *free_space)
> +{
> +    return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
> +            RTE_RING_QUEUE_FIXED, free_space);
> +}

I do not clearly understand the requirements on enqueue_start and enqueue_finish in their current form. IMO, the only requirement for these APIs is to provide the ability to avoid intermediate memcpys.
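In other words, something like this should be possible without a staging buffer (a rough sketch only; the *_zc_* names, the direct slot access, and build_object() are hypothetical, not part of this patch):

    void **slots;
    unsigned int i, n;

    /* hypothetical: reserve up to 32 slots and get direct pointers into
     * the ring, instead of memcpy-ing from a user-supplied obj_table
     * (a real API would have to return two segments to handle the case
     * where the reserved range wraps around the end of the ring) */
    n = rte_ring_hts_enqueue_zc_start(r, 32, &slots, NULL);
    for (i = 0; i != n; i++)
        slots[i] = build_object(i); /* construct objects in place */
    /* hypothetical: publish the reserved slots */
    rte_ring_hts_enqueue_zc_finish(r, n);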
> +
> +static __rte_always_inline void
> +rte_ring_hts_enqueue_finish(struct rte_ring *r, unsigned int n)
> +{
> +    __rte_ring_hts_update_tail(&r->hts_prod, n, 1);
> +}
> +
> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> +    unsigned int n, unsigned int *free_space)
> +{
> +    n = rte_ring_hts_enqueue_bulk_start(r, obj_table, n, free_space);
> +    if (n != 0)
> +        rte_ring_hts_enqueue_finish(r, n);
> +    return n;
> +}
> +
>  /**
>   * Enqueue several objects on the ring.
>   *
> @@ -615,6 +750,8 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>          return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
>      case RTE_RING_SYNC_MT_RTS:
>          return rte_ring_rts_enqueue_bulk(r, obj_table, n, free_space);
> +    case RTE_RING_SYNC_MT_HTS:
> +        return rte_ring_hts_enqueue_bulk(r, obj_table, n, free_space);
>      }
>
>      /* valid ring should never reach this point */
> @@ -753,6 +890,47 @@ rte_ring_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
>              available);
>  }
>
> +/**
> + * Start to dequeue several objects from an HTS ring (multi-consumers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_bulk_start(struct rte_ring *r, void **obj_table,
> +    unsigned int n, unsigned int *available)
> +{
> +    return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
> +            RTE_RING_QUEUE_FIXED, available);
> +}

IMO, we should look to provide the ability to avoid intermediate copies when the data from the ring needs to be distributed to different locations. My proposal in its current form is complicated, but I think that if the return values are abstracted in a structure, it might look much simpler.
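For example, something along these lines (a rough sketch only; the structure, the *_zc_* names and scatter_objects() are hypothetical):

    /* Hypothetical descriptor returned by a zero-copy dequeue_start.
     * Two segments are needed because the dequeued range may wrap
     * around the end of the ring. */
    struct rte_ring_zc_data {
        void *ptr1;   /* first segment of objects */
        uint32_t n1;  /* number of objects in the first segment */
        void *ptr2;   /* second segment, NULL if no wrap-around */
    };

    struct rte_ring_zc_data zcd;
    unsigned int n;

    n = rte_ring_hts_dequeue_zc_start(r, 32, &zcd, NULL); /* hypothetical */
    if (n != 0) {
        /* distribute data straight from ring memory, no obj_table copy */
        scatter_objects(zcd.ptr1, zcd.n1);
        if (zcd.ptr2 != NULL)
            scatter_objects(zcd.ptr2, n - zcd.n1);
        rte_ring_hts_dequeue_zc_finish(r, n); /* hypothetical */
    }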
> +
> +static __rte_always_inline void
> +rte_ring_hts_dequeue_finish(struct rte_ring *r, unsigned int n)
> +{
> +    __rte_ring_hts_update_tail(&r->hts_cons, n, 0);
> +}
> +
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_bulk(struct rte_ring *r, void **obj_table,
> +    unsigned int n, unsigned int *available)
> +{
> +    n = rte_ring_hts_dequeue_bulk_start(r, obj_table, n, available);
> +    if (n != 0)
> +        rte_ring_hts_dequeue_finish(r, n);
> +    return n;
> +}
> +
>  /**
>   * Dequeue several objects from a ring.
>   *
> @@ -783,6 +961,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
>          return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
>      case RTE_RING_SYNC_MT_RTS:
>          return rte_ring_rts_dequeue_bulk(r, obj_table, n, available);
> +    case RTE_RING_SYNC_MT_HTS:
> +        return rte_ring_hts_dequeue_bulk(r, obj_table, n, available);
>      }
>
>      /* valid ring should never reach this point */
> @@ -1111,6 +1291,41 @@ rte_ring_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>              RTE_RING_QUEUE_VARIABLE, free_space);
>  }
>
> +/**
> + * Start to enqueue several objects on the HTS ring (multi-producers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_burst_start(struct rte_ring *r, void * const *obj_table,
> +    unsigned int n, unsigned int *free_space)
> +{
> +    return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
> +            RTE_RING_QUEUE_VARIABLE, free_space);
> +}
> +

rte_ring_hts_enqueue_burst_finish is not implemented. It requires the 'n' returned from rte_ring_hts_enqueue_burst_start to be passed. We cannot completely avoid passing correct information between the xxx_start and xxx_finish APIs.
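In other words, the caller has to carry the returned 'n' through to the finish call, e.g. (sketch, using the functions from this patch):

    void *objs[32];
    unsigned int n;

    /* the burst variant may return fewer objects than requested */
    n = rte_ring_hts_enqueue_burst_start(r, objs, 32, NULL);
    /* ... the ring stays serialized against other producers here ... */
    if (n != 0)
        rte_ring_hts_enqueue_finish(r, n); /* must pass back the same n */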
> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> +    unsigned int n, unsigned int *free_space)
> +{
> +    n = rte_ring_hts_enqueue_burst_start(r, obj_table, n, free_space);
> +    if (n != 0)
> +        rte_ring_hts_enqueue_finish(r, n);
> +    return n;
> +}
> +
>  /**
>   * Enqueue several objects on a ring.
>   *
> @@ -1141,6 +1356,8 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>          return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
>      case RTE_RING_SYNC_MT_RTS:
>          return rte_ring_rts_enqueue_burst(r, obj_table, n, free_space);
> +    case RTE_RING_SYNC_MT_HTS:
> +        return rte_ring_hts_enqueue_burst(r, obj_table, n, free_space);
>      }
>
>      /* valid ring should never reach this point */
> @@ -1225,6 +1442,42 @@ rte_ring_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
>      return __rte_ring_do_rts_dequeue(r, obj_table, n,
>              RTE_RING_QUEUE_VARIABLE, available);
>  }
> +
> +/**
> + * Start to dequeue several objects from an HTS ring (multi-consumers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_burst_start(struct rte_ring *r, void **obj_table,
> +    unsigned int n, unsigned int *available)
> +{
> +    return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
> +            RTE_RING_QUEUE_VARIABLE, available);
> +}
> +
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_burst(struct rte_ring *r, void **obj_table,
> +    unsigned int n, unsigned int *available)
> +{
> +    n = rte_ring_hts_dequeue_burst_start(r, obj_table, n, available);
> +    if (n != 0)
> +        rte_ring_hts_dequeue_finish(r, n);
> +    return n;
> +}
> +
>  /**
>   * Dequeue multiple objects from a ring up to a maximum number.
>   *
> @@ -1255,6 +1508,8 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>          return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
>      case RTE_RING_SYNC_MT_RTS:
>          return rte_ring_rts_dequeue_burst(r, obj_table, n, available);
> +    case RTE_RING_SYNC_MT_HTS:
> +        return rte_ring_hts_dequeue_burst(r, obj_table, n, available);
>      }
>
>      /* valid ring should never reach this point */
> diff --git a/lib/librte_ring/rte_ring_hts_generic.h b/lib/librte_ring/rte_ring_hts_generic.h
> new file mode 100644
> index 000000000..7e447e30b
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_hts_generic.h
> @@ -0,0 +1,228 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2020 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_HTS_GENERIC_H_
> +#define _RTE_RING_HTS_GENERIC_H_
> +
> +/**
> + * @file rte_ring_hts_generic.h
> + * It is not recommended to include this file directly,
> + * include <rte_ring.h> instead.
> + * Contains internal helper functions for head/tail sync (HTS) ring mode.
> + * In that mode enqueue/dequeue operation is fully serialized:
> + * only one thread at a time is allowed to perform given op.
> + * This is achieved by thread is allowed to proceed with changing head.value
> + * only when head.value == tail.value.
> + * Both head and tail values are updated atomically (as one 64-bit value).
> + * As another enhancement that provides ability to split enqueue/dequeue
> + * operation into two phases:
> + * - enqueue/dequeue start
> + * - enqueue/dequeue finish
> + * That allows user to inspect objects in the ring without removing
> + * them from it (aka MT safe peek).
> + * As an example:
> + * // read 1 elem from the ring:
> + * n = rte_ring_hts_dequeue_bulk_start(ring, &obj, 1, NULL);
> + * if (n != 0) {
> + *    //examined object
> + *    if (object_examine(obj) == KEEP)
> + *       //decided to keep it in the ring.
> + *       rte_ring_hts_dequeue_finish(ring, 0);
> + *    else
> + *       //decided to remove it in the ring.
> + *       rte_ring_hts_dequeue_finish(ring, n);
> + * }
> + * Note that between _start_ and _finish_ the ring is sort of locked -
> + * none other thread can proceed with enqueue(/dequeue) operation till
> + * _finish_ will complete.

This means it does not solve the problem for overcommitted systems. Do you agree?
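For example (interleaving sketch, using the functions from this patch):

    /* Consumer thread A on an overcommitted cpu: */
    n = rte_ring_hts_dequeue_bulk_start(r, &obj, 1, NULL);
    /* <-- preempted here: cons head != tail until A runs again */

    /* Consumer thread B meanwhile spins in __rte_ring_hts_head_wait()
     * inside its own dequeue until A is rescheduled and calls
     * rte_ring_hts_dequeue_finish() -- the same kind of lock-style
     * stall that HTS is meant to reduce. */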
> + */ > +static __rte_always_inline void > +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, > + union rte_ring_ht_pos *p) > +{ > + p->raw =3D rte_atomic64_read((rte_atomic64_t *) > + (uintptr_t)&ht->ht.raw); > + > + while (p->pos.head !=3D p->pos.tail) { > + rte_pause(); > + p->raw =3D rte_atomic64_read((rte_atomic64_t *) > + (uintptr_t)&ht->ht.raw); > + } > +} > + > +/** > + * @internal This function updates the producer head for enqueue > + * > + * @param r > + * A pointer to the ring structure > + * @param is_sp > + * Indicates whether multi-producer path is needed or not > + * @param n > + * The number of elements we will want to enqueue, i.e. how far should= the > + * head be moved > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a rin= g > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from > ring > + * @param old_head > + * Returns head value as it was before the move, i.e. where enqueue st= arts > + * @param new_head > + * Returns the current/new head value i.e. where enqueue finishes > + * @param free_entries > + * Returns the amount of free space in the ring BEFORE head was moved > + * @return > + * Actual number of objects enqueued. > + * If behavior =3D=3D RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, > + enum rte_ring_queue_behavior behavior, uint32_t *old_head, > + uint32_t *free_entries) > +{ > + uint32_t n; > + union rte_ring_ht_pos np, op; > + > + const uint32_t capacity =3D r->capacity; > + > + do { > + /* Reset n to the initial burst count */ > + n =3D num; > + > + /* wait for tail to be equal to head */ > + __rte_ring_hts_head_wait(&r->hts_prod, &op); > + > + /* add rmb barrier to avoid load/load reorder in weak > + * memory model. It is noop on x86 > + */ > + rte_smp_rmb(); > + > + /* > + * The subtraction is done between two unsigned 32bits value > + * (the result is always modulo 32 bits even if we have > + * *old_head > cons_tail). So 'free_entries' is always between > 0 > + * and capacity (which is < size). > + */ > + *free_entries =3D capacity + r->cons.tail - op.pos.head; > + > + /* check that we have enough room in ring */ > + if (unlikely(n > *free_entries)) > + n =3D (behavior =3D=3D RTE_RING_QUEUE_FIXED) ? > + 0 : *free_entries; > + > + if (n =3D=3D 0) > + return 0; > + > + np.pos.tail =3D op.pos.tail; > + np.pos.head =3D op.pos.head + n; > + > + } while (rte_atomic64_cmpset(&r->hts_prod.ht.raw, > + op.raw, np.raw) =3D=3D 0); > + > + *old_head =3D op.pos.head; > + return n; > +} > + > +/** > + * @internal This function updates the consumer head for dequeue > + * > + * @param r > + * A pointer to the ring structure > + * @param is_sc > + * Indicates whether multi-consumer path is needed or not > + * @param n > + * The number of elements we will want to enqueue, i.e. how far should= the > + * head be moved > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a rin= g > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from > ring > + * @param old_head > + * Returns head value as it was before the move, i.e. where dequeue st= arts > + * @param new_head > + * Returns the current/new head value i.e. where dequeue finishes > + * @param entries > + * Returns the number of entries in the ring BEFORE head was moved > + * @return > + * - Actual number of objects dequeued. 
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
> +    enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +    uint32_t *entries)
> +{
> +    uint32_t n;
> +    union rte_ring_ht_pos np, op;
> +
> +    /* move cons.head atomically */
> +    do {
> +        /* Restore n as it may change every loop */
> +        n = num;
> +
> +        /* wait for tail to be equal to head */
> +        __rte_ring_hts_head_wait(&r->hts_cons, &op);
> +
> +        /* add rmb barrier to avoid load/load reorder in weak
> +         * memory model. It is noop on x86
> +         */
> +        rte_smp_rmb();
> +
> +        /* The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * cons_head > prod_tail). So 'entries' is always between 0
> +         * and size(ring)-1.
> +         */
> +        *entries = r->prod.tail - op.pos.head;
> +
> +        /* Set the actual entries for dequeue */
> +        if (n > *entries)
> +            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +        if (unlikely(n == 0))
> +            return 0;
> +
> +        np.pos.tail = op.pos.tail;
> +        np.pos.head = op.pos.head + n;
> +
> +    } while (rte_atomic64_cmpset(&r->hts_cons.ht.raw,
> +            op.raw, np.raw) == 0);
> +
> +    *old_head = op.pos.head;
> +    return n;
> +}
> +
> +#endif /* _RTE_RING_HTS_GENERIC_H_ */
> --
> 2.17.1