From: "Ananyev, Konstantin" <konstantin.ananyev@intel.com>
To: Honnappa Nagarahalli, "dev@dpdk.org"
CC: "david.marchand@redhat.com", "jielong.zjl@antfin.com", nd
Subject: Re: [PATCH v3 3/9] ring: introduce RTS ring mode
Date: Thu, 9 Apr 2020 14:52:17 +0000
References: <20200402220959.29885-1-konstantin.ananyev@intel.com>
 <20200403174235.23308-1-konstantin.ananyev@intel.com>
 <20200403174235.23308-4-konstantin.ananyev@intel.com>
> > Introduce relaxed tail sync (RTS) mode for MT ring synchronization.
> > Aim to reduce stall times in case when ring is used on overcommitted cpus
> > (multiple active threads on the same cpu).
> > The main difference from original MP/MC algorithm is that tail value is
> > increased not by every thread that finished enqueue/dequeue, but only by
> > the last one.
> > That allows threads to avoid spinning on ring tail value, leaving actual
> > tail value change to the last thread in the update queue.
> >
> > check-abi.sh reports what I believe is a false-positive about ring cons/prod
> > changes. As a workaround, devtools/libabigail.abignore is updated to suppress
> > *struct ring* related errors.
> This can be removed from the commit message.

> >
> > Signed-off-by: Konstantin Ananyev
> > ---
> >  devtools/libabigail.abignore           |   7 +
> >  lib/librte_ring/Makefile               |   5 +-
> >  lib/librte_ring/meson.build            |   5 +-
> >  lib/librte_ring/rte_ring.c             | 100 +++++++-
> >  lib/librte_ring/rte_ring.h             | 110 ++++++++-
> >  lib/librte_ring/rte_ring_elem.h        |  86 ++++++-
> >  lib/librte_ring/rte_ring_rts.h         | 316 +++++++++++++++++++++++++
> >  lib/librte_ring/rte_ring_rts_elem.h    | 205 ++++++++++++++++
> >  lib/librte_ring/rte_ring_rts_generic.h | 210 ++++++++++++++++
> >  9 files changed, 1015 insertions(+), 29 deletions(-)
> >  create mode 100644 lib/librte_ring/rte_ring_rts.h
> >  create mode 100644 lib/librte_ring/rte_ring_rts_elem.h
> >  create mode 100644 lib/librte_ring/rte_ring_rts_generic.h
> >
> > diff --git a/devtools/libabigail.abignore b/devtools/libabigail.abignore
> > index a59df8f13..cd86d89ca 100644
> > --- a/devtools/libabigail.abignore
> > +++ b/devtools/libabigail.abignore
> > @@ -11,3 +11,10 @@
> >          type_kind = enum
> >          name = rte_crypto_asym_xform_type
> >          changed_enumerators = RTE_CRYPTO_ASYM_XFORM_TYPE_LIST_END
> > +; Ignore updates of ring prod/cons
> > +[suppress_type]
> > +        type_kind = struct
> > +        name = rte_ring
> > +[suppress_type]
> > +        type_kind = struct
> > +        name = rte_event_ring
> Does this block the reporting of these structures forever?
Until we have a fix in libabigail; then we can remove these lines.
I don't know any better alternative.

> > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > index 917c560ad..8f5c284cc 100644
> > --- a/lib/librte_ring/Makefile
> > +++ b/lib/librte_ring/Makefile
> > @@ -18,6 +18,9 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> >  					rte_ring_elem.h \
> >  					rte_ring_generic.h \
> > -					rte_ring_c11_mem.h
> > +					rte_ring_c11_mem.h \
> > +					rte_ring_rts.h \
> > +					rte_ring_rts_elem.h \
> > +					rte_ring_rts_generic.h
> >
> >  include $(RTE_SDK)/mk/rte.lib.mk
> > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> > index f2f3ccc88..612936afb 100644
> > --- a/lib/librte_ring/meson.build
> > +++ b/lib/librte_ring/meson.build
> > @@ -5,7 +5,10 @@ sources = files('rte_ring.c')
> >  headers = files('rte_ring.h',
> >  		'rte_ring_elem.h',
> >  		'rte_ring_c11_mem.h',
> > -		'rte_ring_generic.h')
> > +		'rte_ring_generic.h',
> > +		'rte_ring_rts.h',
> > +		'rte_ring_rts_elem.h',
> > +		'rte_ring_rts_generic.h')
> >
> >  # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> >  allow_experimental_apis = true
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > index fa5733907..222eec0fb 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -45,6 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> >  /* true if x is a power of 2 */
> >  #define POWEROF2(x) ((((x)-1) & (x)) == 0)
> >
> > +/* by default set head/tail distance as 1/8 of ring capacity */
> > +#define HTD_MAX_DEF	8
> > +
> >  /* return the size of memory occupied by a ring */
> >  ssize_t
> >  rte_ring_get_memsize_elem(unsigned int esize, unsigned int count)
> > @@ -79,11 +82,84 @@ rte_ring_get_memsize(unsigned int count)
> >  	return rte_ring_get_memsize_elem(sizeof(void *), count);
> >  }
> >
> > +/*
> > + * internal helper function to reset prod/cons head-tail values.
> > + */
> > +static void
> > +reset_headtail(void *p)
> > +{
> > +	struct rte_ring_headtail *ht;
> > +	struct rte_ring_rts_headtail *ht_rts;
> > +
> > +	ht = p;
> > +	ht_rts = p;
> > +
> > +	switch (ht->sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_ST:
> > +		ht->head = 0;
> > +		ht->tail = 0;
> > +		break;
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		ht_rts->head.raw = 0;
> > +		ht_rts->tail.raw = 0;
> > +		break;
> > +	default:
> > +		/* unknown sync mode */
> > +		RTE_ASSERT(0);
> > +	}
> > +}
> > +
> >  void
> >  rte_ring_reset(struct rte_ring *r)
> >  {
> > -	r->prod.head = r->cons.head = 0;
> > -	r->prod.tail = r->cons.tail = 0;
> > +	reset_headtail(&r->prod);
> > +	reset_headtail(&r->cons);
> > +}
> > +
> > +/*
> > + * helper function, calculates sync_type values for prod and cons
> > + * based on input flags. Returns zero at success or negative
> > + * errno value otherwise.
> > + */
> > +static int
> > +get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
> > +	enum rte_ring_sync_type *cons_st)
> > +{
> > +	static const uint32_t prod_st_flags =
> > +		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ);
> > +	static const uint32_t cons_st_flags =
> > +		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ);
> > +
> > +	switch (flags & prod_st_flags) {
> > +	case 0:
> > +		*prod_st = RTE_RING_SYNC_MT;
> > +		break;
> > +	case RING_F_SP_ENQ:
> > +		*prod_st = RTE_RING_SYNC_ST;
> > +		break;
> > +	case RING_F_MP_RTS_ENQ:
> > +		*prod_st = RTE_RING_SYNC_MT_RTS;
> > +		break;
> > +	default:
> > +		return -EINVAL;
> > +	}
> > +
> > +	switch (flags & cons_st_flags) {
> > +	case 0:
> > +		*cons_st = RTE_RING_SYNC_MT;
> > +		break;
> > +	case RING_F_SC_DEQ:
> > +		*cons_st = RTE_RING_SYNC_ST;
> > +		break;
> > +	case RING_F_MC_RTS_DEQ:
> > +		*cons_st = RTE_RING_SYNC_MT_RTS;
> > +		break;
> > +	default:
> > +		return -EINVAL;
> > +	}
> > +
> > +	return 0;
> >  }
> >
> >  int
> > @@ -100,16 +176,20 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> >  	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
> >  			  RTE_CACHE_LINE_MASK) != 0);
> >
> > +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
> > +		offsetof(struct rte_ring_rts_headtail, sync_type));
> > +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
> > +		offsetof(struct rte_ring_rts_headtail, tail.val.pos));
> > +
> >  	/* init the ring structure */
> >  	memset(r, 0, sizeof(*r));
> >  	ret = strlcpy(r->name, name, sizeof(r->name));
> >  	if (ret < 0 || ret >= (int)sizeof(r->name))
> >  		return -ENAMETOOLONG;
> >  	r->flags = flags;
> > -	r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> > -		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> > -	r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> > -		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> > +	ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type);
> > +	if (ret != 0)
> > +		return ret;
> >
> >  	if (flags & RING_F_EXACT_SZ) {
> >  		r->size = rte_align32pow2(count + 1);
> > @@ -126,8 +206,12 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> >  		r->mask = count - 1;
> >  		r->capacity = r->mask;
> >  	}
> > -	r->prod.head = r->cons.head = 0;
> > -	r->prod.tail = r->cons.tail = 0;
> > +
> > +	/* set default values for head-tail distance */
> > +	if (flags & RING_F_MP_RTS_ENQ)
> > +		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
> > +	if (flags & RING_F_MC_RTS_DEQ)
> > +		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
> >
> >  	return 0;
> >  }
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index d4775a063..f6f084d79 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -48,6 +48,7 @@ extern "C" {
> >  #include
> >  #include
> >  #include
> > +#include
> >
> >  #define RTE_TAILQ_RING_NAME "RTE_RING"
> >
> > @@ -65,10 +66,13 @@ enum rte_ring_queue_behavior {
> >  enum rte_ring_sync_type {
> >  	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
> >  	RTE_RING_SYNC_ST,     /**< single thread only */
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
> >  #endif
> >  };
> >
> >  /**
> > - * structure to hold a pair of head/tail values and other metadata.
> > + * structures to hold a pair of head/tail values and other metadata.
> >   * Depending on sync_type format of that structure might be different,
> >   * but offset for *sync_type* and *tail* values should remain the same.
> >   */
> > @@ -84,6 +88,21 @@ struct rte_ring_headtail {
> >  	};
> >  };
> >
> > +union rte_ring_ht_poscnt {
> nit, this is specific to RTS, may be change this to rte_ring_rts_ht_poscnt?
Ok.

> > +	uint64_t raw;
> > +	struct {
> > +		uint32_t cnt; /**< head/tail reference counter */
> > +		uint32_t pos; /**< head/tail position */
> > +	} val;
> > +};
> > +
> > +struct rte_ring_rts_headtail {
> > +	volatile union rte_ring_ht_poscnt tail;
> > +	enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */
> > +	uint32_t htd_max; /**< max allowed distance between head/tail */
> > +	volatile union rte_ring_ht_poscnt head;
> > +};
> > +
> >  /**
> >   * An RTE ring structure.
> >   *
> > @@ -111,11 +130,21 @@ struct rte_ring {
> >  	char pad0 __rte_cache_aligned; /**< empty cache line */
> >
> >  	/** Ring producer status. */
> > -	struct rte_ring_headtail prod __rte_cache_aligned;
> > +	RTE_STD_C11
> > +	union {
> > +		struct rte_ring_headtail prod;
> > +		struct rte_ring_rts_headtail rts_prod;
> > +	} __rte_cache_aligned;
> > +
> >  	char pad1 __rte_cache_aligned; /**< empty cache line */
> >
> >  	/** Ring consumer status. */
> > -	struct rte_ring_headtail cons __rte_cache_aligned;
> > +	RTE_STD_C11
> > +	union {
> > +		struct rte_ring_headtail cons;
> > +		struct rte_ring_rts_headtail rts_cons;
> > +	} __rte_cache_aligned;
> > +
> >  	char pad2 __rte_cache_aligned; /**< empty cache line */
> >  };
> >
> > @@ -132,6 +161,9 @@ struct rte_ring {
> >  #define RING_F_EXACT_SZ 0x0004
> >  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> >
> > +#define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */
> > +#define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */
> > +
> >  #define __IS_SP RTE_RING_SYNC_ST
> >  #define __IS_MP RTE_RING_SYNC_MT
> >  #define __IS_SC RTE_RING_SYNC_ST
> > @@ -461,6 +493,10 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> >  			RTE_RING_SYNC_ST, free_space);
> >  }
> >
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +#include
> > +#endif
> > +
> >  /**
> >   * Enqueue several objects on a ring.
> >   *
> > @@ -484,8 +520,21 @@ static __rte_always_inline unsigned int
> >  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> >  		unsigned int n, unsigned int *free_space)
> >  {
> > -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> > -			r->prod.sync_type, free_space);
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
> Have you validated if these affect the performance for the existing APIs?
I ran ring_pmd_perf_autotest (AFAIK, that's the only one of our perf tests
that calls rte_ring_enqueue/dequeue), and didn't see any real difference in
perf numbers.
> I am also wondering why should we support these new modes in the legacy APIs?
The majority of DPDK users still use the legacy API, and I am not sure all
of them will be happy to switch to the _elem_ one manually.
Plus I can't see how we can justify that after, let's say:
rte_ring_init(ring, ..., RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ);
returns with success, a valid call to rte_ring_enqueue(ring, ...) should fail.
> I think users should move to use rte_ring_xxx_elem APIs. If users want to
> use RTS/HTS it will be a good time for them to move to new APIs.
If they use rte_ring_enqueue/dequeue, all they have to do is change flags
in the ring_create/ring_init call.
With what you suggest, they have to change every rte_ring_enqueue/dequeue
to rte_ring_elem_enqueue/dequeue. That's much bigger code churn.
> They anyway have to test their code for RTS/HTS, might as well make the
> change to new APIs and test both.
> It will be less code to maintain for the community as well.
That's true, right now there is a lot of duplication between _elem_ and
legacy code.
Actually the only real difference between them is the actual copying of the
objects.
But I thought we were going to deal with that by changing, one day, all
legacy API to wrappers around _elem_ calls, i.e. something like:

static __rte_always_inline unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
		unsigned int n, unsigned int *free_space)
{
	return rte_ring_enqueue_elem_bulk(r, obj_table, sizeof(uintptr_t), n,
		free_space);
}

That way users will switch to the new API automatically, without any extra
effort from them, and we will be able to remove the legacy code.
Do you have some other thoughts here on how to deal with this legacy/elem
conversion?
>
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n,
> > +			free_space);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	return 0;
> >  }
> >
> >  /**
> > @@ -619,8 +668,20 @@ static __rte_always_inline unsigned int
> >  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
> >  		unsigned int *available)
> >  {
> > -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> > -			r->cons.sync_type, available);
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n,
> > +			available);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	return 0;
> >  }
> >
> >  /**
> > @@ -940,8 +1001,21 @@ static __rte_always_inline unsigned
> >  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> >  		unsigned int n, unsigned int *free_space)
> >  {
> > -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
> > -			r->prod.sync_type, free_space);
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mp_rts_enqueue_burst(r, obj_table, n,
> > +			free_space);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	return 0;
> >  }
> >
> >  /**
> > @@ -1020,9 +1094,21 @@ static __rte_always_inline unsigned
> >  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
> >  		unsigned int n, unsigned int *available)
> >  {
> > -	return __rte_ring_do_dequeue(r, obj_table, n,
> > -			RTE_RING_QUEUE_VARIABLE,
> > -			r->cons.sync_type, available);
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mc_rts_dequeue_burst(r, obj_table, n,
> > +			available);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	return 0;
> >  }
> >
> >  #ifdef __cplusplus
> > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> > index 28f9836e6..5de0850dc 100644
> > --- a/lib/librte_ring/rte_ring_elem.h
> > +++ b/lib/librte_ring/rte_ring_elem.h
> > @@ -542,6 +542,8 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
> >  			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
> >  }
> >
> > +#include
> > +
> >  /**
> >   * Enqueue several objects on a ring.
> >   *
> > @@ -571,6 +573,26 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
> >  {
> >  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> >  			RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
> > +
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n,
> > +			free_space);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n,
> > +			free_space);
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n,
> > +			free_space);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	if (free_space != NULL)
> > +		*free_space = 0;
> > +	return 0;
> >  }
> >
> >  /**
> > @@ -733,8 +755,25 @@ static __rte_always_inline unsigned int
> >  rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> >  		unsigned int esize, unsigned int n, unsigned int *available)
> >  {
> > -	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> > -			RTE_RING_QUEUE_FIXED, r->cons.sync_type, available);
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n,
> > +			available);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n,
> > +			available);
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize,
> > +			n, available);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	if (available != NULL)
> > +		*available = 0;
> > +	return 0;
> >  }
> >
> >  /**
> > @@ -901,8 +940,25 @@ static __rte_always_inline unsigned
> >  rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
> >  		unsigned int esize, unsigned int n, unsigned int *free_space)
> >  {
> > -	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> > -			RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space);
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n,
> > +			free_space);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n,
> > +			free_space);
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize,
> > +			n, free_space);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	if (free_space != NULL)
> > +		*free_space = 0;
> > +	return 0;
> >  }
> >
> >  /**
> > @@ -993,9 +1049,25 @@ static __rte_always_inline unsigned int
> >  rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> >  		unsigned int esize, unsigned int n, unsigned int *available)
> >  {
> > -	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> > -			RTE_RING_QUEUE_VARIABLE,
> > -			r->cons.sync_type, available);
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_MT:
> > +		return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n,
> > +			available);
> > +	case RTE_RING_SYNC_ST:
> > +		return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n,
> > +			available);
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +	case RTE_RING_SYNC_MT_RTS:
> > +		return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize,
> > +			n, available);
> > +#endif
> > +	}
> > +
> > +	/* valid ring should never reach this point */
> > +	RTE_ASSERT(0);
> > +	if (available != NULL)
> > +		*available = 0;
> > +	return 0;
> >  }
> >
> >  #ifdef __cplusplus
> > diff --git a/lib/librte_ring/rte_ring_rts.h b/lib/librte_ring/rte_ring_rts.h
> > new file mode 100644
> > index 000000000..18404fe48
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_rts.h
> IMO, we should not provide these APIs.
You mean only _elem_ ones, as discussed above?
>
> > @@ -0,0 +1,316 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2010-2017 Intel Corporation
> nit, the year should change to 2020? Look at others too.
ack, will do.

> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_RTS_H_
> > +#define _RTE_RING_RTS_H_
> > +
> > +/**
> > + * @file rte_ring_rts.h
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + * It is not recommended to include this file directly.
> > + * Please include <rte_ring.h> instead.
> > + *
> > + * Contains functions for Relaxed Tail Sync (RTS) ring mode.
> > + * The main idea remains the same as for our original MP/MC
>                                          ^^^ the
> > + * synchronization mechanism.
> > + * The main difference is that tail value is increased not
> > + * by every thread that finished enqueue/dequeue,
> > + * but only by the last one doing enqueue/dequeue.
> should we say 'current last' or 'last thread at a given instance'?
>
> > + * That allows threads to skip spinning on tail value,
> > + * leaving actual tail value change to last thread in the update queue.
> nit, I understand what you mean by 'update queue' here. IMO, we should
> remove it as it might confuse some.
>
> > + * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation:
> > + * one for head update, second for tail update.
> > + * As a gain it allows thread to avoid spinning/waiting on tail value.
> > + * In comparison the original MP/MC algorithm requires one 32-bit CAS
> > + * for head update and waiting/spinning on tail value.
> > + *
> > + * Brief outline:
> > + *  - introduce refcnt for both head and tail.
> Suggesting using the same names as used in the structures.
>
> > + *  - increment head.refcnt for each head.value update
> > + *  - write head:value and head:refcnt atomically (64-bit CAS)
> > + *  - move tail.value ahead only when tail.refcnt + 1 == head.refcnt
> May be add '(indicating that this is the last thread updating the tail)'
>
> > + *  - increment tail.refcnt when each enqueue/dequeue op finishes
> May be add 'otherwise' at the beginning.
>
> > + *    (no matter is tail:value going to change or not)
> nit          ^^ if
> > + *  - write tail.value and tail.refcnt atomically (64-bit CAS)
> > + *
> > + * To avoid producer/consumer starvation:
> > + *  - limit max allowed distance between head and tail value (HTD_MAX).
> > + *    I.E. thread is allowed to proceed with changing head.value,
> > + *    only when:  head.value - tail.value <= HTD_MAX
> > + * HTD_MAX is an optional parameter.
> > + * With HTD_MAX == 0 we'll have fully serialized ring -
> > + * i.e. only one thread at a time will be able to enqueue/dequeue
> > + * to/from the ring.
> > + * With HTD_MAX >= ring.capacity - no limitation.
> > + * By default HTD_MAX == ring.capacity / 8.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include
> > +
> > +/**
> > + * @internal Enqueue several objects on the RTS ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_rts_enqueue(struct rte_ring *r, void * const *obj_table,
> > +	uint32_t n, enum rte_ring_queue_behavior behavior,
> > +	uint32_t *free_space)
> > +{
> > +	uint32_t free, head;
> > +
> > +	n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
> > +
> > +	if (n != 0) {
> > +		ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> > +		__rte_ring_rts_update_tail(&r->rts_prod);
> > +	}
> > +
> > +	if (free_space != NULL)
> > +		*free_space = free - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Dequeue several objects from the RTS ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to pull from the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> > + * @param available
> > + *   returns the number of remaining ring entries after the dequeue has
> > + *   finished
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_rts_dequeue(struct rte_ring *r, void **obj_table,
> > +	uint32_t n, enum rte_ring_queue_behavior behavior,
> > +	uint32_t *available)
> > +{
> > +	uint32_t entries, head;
> > +
> > +	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
> > +
> > +	if (n != 0) {
> > +		DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> > +		__rte_ring_rts_update_tail(&r->rts_cons);
> > +	}
> > +
> > +	if (available != NULL)
> > +		*available = entries - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * Enqueue several objects on the RTS ring (multi-producers safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   enqueue operation has finished.
> > + * @return
> > + *   The number of objects enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> > +	unsigned int n, unsigned int *free_space)
> > +{
> > +	return __rte_ring_do_rts_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> > +			free_space);
> > +}
> > +
> > +/**
> > + * Dequeue several objects from an RTS ring (multi-consumers safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > + * @param n
> > + *   The number of objects to dequeue from the ring to the obj_table.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
> > +	unsigned int n, unsigned int *available)
> > +{
> > +	return __rte_ring_do_rts_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> > +			available);
> > +}
> > +
> > +/**
> > + * Return producer max Head-Tail-Distance (HTD).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @return
> > + *   Producer HTD value, if producer is set in appropriate sync mode,
> > + *   or UINT32_MAX otherwise.
> > + */
> > +__rte_experimental
> > +static inline uint32_t
> > +rte_ring_get_prod_htd_max(const struct rte_ring *r)
> > +{
> > +	if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
> > +		return r->rts_prod.htd_max;
> > +	return UINT32_MAX;
> > +}
> > +
> > +/**
> > + * Set producer max Head-Tail-Distance (HTD).
> > + * Note that producer has to use appropriate sync mode (RTS).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param v
> > + *   new HTD value to setup.
> > + * @return
> > + *   Zero on success, or negative error code otherwise.
> > + */
> > +__rte_experimental
> > +static inline int
> > +rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
> > +{
> > +	if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
> > +		return -ENOTSUP;
> > +
> > +	r->rts_prod.htd_max = v;
> > +	return 0;
> > +}
> > +
> > +/**
> > + * Return consumer max Head-Tail-Distance (HTD).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @return
> > + *   Consumer HTD value, if consumer is set in appropriate sync mode,
> > + *   or UINT32_MAX otherwise.
> > + */
> > +__rte_experimental
> > +static inline uint32_t
> > +rte_ring_get_cons_htd_max(const struct rte_ring *r)
> > +{
> > +	if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
> > +		return r->rts_cons.htd_max;
> > +	return UINT32_MAX;
> > +}
> > +
> > +/**
> > + * Set consumer max Head-Tail-Distance (HTD).
> > + * Note that consumer has to use appropriate sync mode (RTS).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param v
> > + *   new HTD value to setup.
> > + * @return
> > + *   Zero on success, or negative error code otherwise.
> > + */
> > +__rte_experimental
> > +static inline int
> > +rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
> > +{
> > +	if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
> > +		return -ENOTSUP;
> > +
> > +	r->rts_cons.htd_max = v;
> > +	return 0;
> > +}
> > +
> > +/**
> > + * Enqueue several objects on the RTS ring (multi-producers safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   enqueue operation has finished.
> > + * @return
> > + *   - n: Actual number of objects enqueued.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned
> > +rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> > +	unsigned int n, unsigned int *free_space)
> > +{
> > +	return __rte_ring_do_rts_enqueue(r, obj_table, n,
> > +			RTE_RING_QUEUE_VARIABLE, free_space);
> > +}
> > +
> > +/**
> > + * Dequeue several objects from an RTS ring (multi-consumers safe).
> > + * When the requested objects are more than the available objects,
> > + * only dequeue the actual number of objects.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > + * @param n
> > + *   The number of objects to dequeue from the ring to the obj_table.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   - n: Actual number of objects dequeued, 0 if ring is empty
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned
> > +rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
> > +	unsigned int n, unsigned int *available)
> > +{
> > +	return __rte_ring_do_rts_dequeue(r, obj_table, n,
> > +			RTE_RING_QUEUE_VARIABLE, available);
> > +}
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RING_RTS_H_ */
> > diff --git a/lib/librte_ring/rte_ring_rts_elem.h
> > b/lib/librte_ring/rte_ring_rts_elem.h
> > new file mode 100644
> > index 000000000..71a331b23
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_rts_elem.h
> > @@ -0,0 +1,205 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_RTS_ELEM_H_
> > +#define _RTE_RING_RTS_ELEM_H_
> > +
> > +/**
> > + * @file rte_ring_rts_elem.h
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * It is not recommended to include this file directly.
> > + * Please include <rte_ring_elem.h> instead.
> > + * Contains *ring_elem* functions for Relaxed Tail Sync (RTS) ring mode.
> > + * for more details please refer to <rte_ring_rts.h>.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <rte_ring_rts_generic.h>
> > +
> > +/**
> > + * @internal Enqueue several objects on the RTS ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, void * const *obj_table,
> obj_table should be of type 'const void * obj_table' (looks like copy paste error). Please check the other APIs below too.
>
> > +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> 'esize' is not documented in the comments above the function. You can copy the header from rte_ring_elem.h file. Please check other APIs as well.
Ack to both, will fix.
>
> > +	uint32_t *free_space)
> > +{
> > +	uint32_t free, head;
> > +
> > +	n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
> > +
> > +	if (n != 0) {
> > +		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
> > +		__rte_ring_rts_update_tail(&r->rts_prod);
> > +	}
> > +
> > +	if (free_space != NULL)
> > +		*free_space = free - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Dequeue several objects from the RTS ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to pull from the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> > + * @param available
> > + *   returns the number of remaining ring entries after the dequeue has finished
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void **obj_table,
> > +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> > +	uint32_t *available)
> > +{
> > +	uint32_t entries, head;
> > +
> > +	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
> > +
> > +	if (n != 0) {
> > +		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
> > +		__rte_ring_rts_update_tail(&r->rts_cons);
> > +	}
> > +
> > +	if (available != NULL)
> > +		*available = entries - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * Enqueue several objects on the RTS ring (multi-producers safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   enqueue operation has finished.
> > + * @return
> > + *   The number of objects enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, void * const *obj_table,
> > +	unsigned int esize, unsigned int n, unsigned int *free_space)
> > +{
> > +	return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
> > +			RTE_RING_QUEUE_FIXED, free_space);
> > +}
> > +
> > +/**
> > + * Dequeue several objects from an RTS ring (multi-consumers safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > + * @param n
> > + *   The number of objects to dequeue from the ring to the obj_table.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void **obj_table,
> > +	unsigned int esize, unsigned int n, unsigned int *available)
> > +{
> > +	return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
> > +			RTE_RING_QUEUE_FIXED, available);
> > +}
> > +
> > +/**
> > + * Enqueue several objects on the RTS ring (multi-producers safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   enqueue operation has finished.
> > + * @return
> > + *   - n: Actual number of objects enqueued.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned
> > +rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, void * const *obj_table,
> > +	unsigned int esize, unsigned int n, unsigned int *free_space)
> > +{
> > +	return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
> > +			RTE_RING_QUEUE_VARIABLE, free_space);
> > +}
> > +
> > +/**
> > + * Dequeue several objects from an RTS ring (multi-consumers safe).
> > + * When the requested objects are more than the available objects,
> > + * only dequeue the actual number of objects.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > + * @param n
> > + *   The number of objects to dequeue from the ring to the obj_table.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   - n: Actual number of objects dequeued, 0 if ring is empty
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned
> > +rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void **obj_table,
> > +	unsigned int esize, unsigned int n, unsigned int *available)
> > +{
> > +	return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
> > +			RTE_RING_QUEUE_VARIABLE, available);
> > +}
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RING_RTS_ELEM_H_ */
> > diff --git a/lib/librte_ring/rte_ring_rts_generic.h
> > b/lib/librte_ring/rte_ring_rts_generic.h
> > new file mode 100644
> > index 000000000..f88460d47
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_rts_generic.h
> I do not see the benefit of providing the generic version. Do you know why this was done in the legacy APIs?

I think at first we had the generic API only, then C11 was added later.
As I remember, the C11 one was measured on IA as a bit slower than generic,
so it was decided to keep both.

> If there is no performance difference between the generic and C11 versions, should we just skip the generic version?
> The oldest compilers in CI are GCC 4.8.5 and Clang 3.4.2, and C11 built-ins are supported earlier than these compiler versions.
> I feel the code is growing exponentially in the rte_ring library and we should try to cut non-value-add code/APIs aggressively.

I'll check whether there is a perf difference for RTS and HTS between the generic and C11 versions on IA.
Meanwhile, please have a proper look at the C11 implementation, as I am not that familiar with C11 atomics yet.
If there are no problems with it and no noticeable diff in performance, I am ok to have C11 versions only for the RTS/HTS modes.
>
> > @@ -0,0 +1,210 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_RTS_GENERIC_H_
> > +#define _RTE_RING_RTS_GENERIC_H_
> > +
> > +/**
> > + * @file rte_ring_rts_generic.h
> > + * It is not recommended to include this file directly,
> > + * include <rte_ring.h> instead.
> > + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
> > + * For more information please refer to <rte_ring_rts.h>.
> > + */
> > +
> > +/**
> > + * @internal This function updates tail values.
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
> > +{
> > +	union rte_ring_ht_poscnt h, ot, nt;
> > +
> > +	/*
> > +	 * If there are other enqueues/dequeues in progress that
> > +	 * might preceded us, then don't update tail with new value.
> > +	 */
> > +
> > +	do {
> > +		ot.raw = ht->tail.raw;
> > +		rte_smp_rmb();
> > +
> > +		/* on 32-bit systems we have to do atomic read here */
> > +		h.raw = rte_atomic64_read((rte_atomic64_t *)
> > +			(uintptr_t)&ht->head.raw);
> > +
> > +		nt.raw = ot.raw;
> > +		if (++nt.val.cnt == h.val.cnt)
> > +			nt.val.pos = h.val.pos;
> > +
> > +	} while (rte_atomic64_cmpset(&ht->tail.raw, ot.raw, nt.raw) == 0);
> > +}
> > +
> > +/**
> > + * @internal This function waits till head/tail distance wouldn't
> > + * exceed pre-defined max value.
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
> > +	union rte_ring_ht_poscnt *h)
> > +{
> > +	uint32_t max;
> > +
> > +	max = ht->htd_max;
> > +	h->raw = ht->head.raw;
> > +	rte_smp_rmb();
> > +
> > +	while (h->val.pos - ht->tail.val.pos > max) {
> > +		rte_pause();
> > +		h->raw = ht->head.raw;
> > +		rte_smp_rmb();
> > +	}
> > +}
> > +
> > +/**
> > + * @internal This function updates the producer head for enqueue.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sp
> > + *   Indicates whether multi-producer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to enqueue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where enqueue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where enqueue finishes
> > + * @param free_entries
> > + *   Returns the amount of free space in the ring BEFORE head was moved
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline uint32_t
> > +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *free_entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_poscnt nh, oh;
> > +
> > +	const uint32_t capacity = r->capacity;
> > +
> > +	do {
> > +		/* Reset n to the initial burst count */
> > +		n = num;
> > +
> > +		/* read prod head (may spin on prod tail) */
> > +		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model.
> > +		 * It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/*
> > +		 * The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * *old_head > cons_tail). So 'free_entries' is always between 0
> > +		 * and capacity (which is < size).
> > +		 */
> > +		*free_entries = capacity + r->cons.tail - oh.val.pos;
> > +
> > +		/* check that we have enough room in ring */
> > +		if (unlikely(n > *free_entries))
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > +					0 : *free_entries;
> > +
> > +		if (n == 0)
> > +			break;
> > +
> > +		nh.val.pos = oh.val.pos + n;
> > +		nh.val.cnt = oh.val.cnt + 1;
> > +
> > +	} while (rte_atomic64_cmpset(&r->rts_prod.head.raw,
> > +			oh.raw, nh.raw) == 0);
> > +
> > +	*old_head = oh.val.pos;
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal This function updates the consumer head for dequeue
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sc
> > + *   Indicates whether multi-consumer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to enqueue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where dequeue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where dequeue finishes
> > + * @param entries
> > + *   Returns the number of entries in the ring BEFORE head was moved
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_poscnt nh, oh;
> > +
> > +	/* move cons.head atomically */
> > +	do {
> > +		/* Restore n as it may change every loop */
> > +		n = num;
> > +
> > +		/* read cons head (may spin on cons tail) */
> > +		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
> > +
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/* The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * cons_head > prod_tail). So 'entries' is always between 0
> > +		 * and size(ring)-1.
> > +		 */
> > +		*entries = r->prod.tail - oh.val.pos;
> > +
> > +		/* Set the actual entries for dequeue */
> > +		if (n > *entries)
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> > +
> > +		if (unlikely(n == 0))
> > +			break;
> > +
> > +		nh.val.pos = oh.val.pos + n;
> > +		nh.val.cnt = oh.val.cnt + 1;
> > +
> > +	} while (rte_atomic64_cmpset(&r->rts_cons.head.raw,
> > +			oh.raw, nh.raw) == 0);
> > +
> > +	*old_head = oh.val.pos;
> > +	return n;
> > +}
> > +
> > +#endif /* _RTE_RING_RTS_GENERIC_H_ */
> > --
> > 2.17.1