From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from dpdk.org (dpdk.org [92.243.14.124]) by dpdk.space (Postfix) with ESMTP id 1C3F5A0096 for ; Fri, 7 Jun 2019 08:03:39 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 778B52B82; Fri, 7 Jun 2019 08:03:38 +0200 (CEST) Received: from EUR01-DB5-obe.outbound.protection.outlook.com (mail-eopbgr150071.outbound.protection.outlook.com [40.107.15.71]) by dpdk.org (Postfix) with ESMTP id 435252B82 for ; Fri, 7 Jun 2019 08:03:37 +0200 (CEST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=armh.onmicrosoft.com; s=selector2-armh-onmicrosoft-com; h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-SenderADCheck; bh=84gPa5KWAto4sbXkAhWZCNq2q7i4oib5j/sijLGvHSc=; b=tcl1tF2zvYvBof+jtS0rb7E64RwzOPTZ6p6V7smC2UUjLQ4h09gFCdS4jVPY4r1pjznk0RsBhHmcuxxgrvSskxIlFQxwvHbJY47/ph1JLxQThE+8sn2QS0dx08SaUb1R2G1U1f2AwaCS+zuWVJHeGvxacfPX2Uxz4S8lVidiB9o= Received: from VE1PR08MB5149.eurprd08.prod.outlook.com (20.179.30.152) by VE1PR08MB5120.eurprd08.prod.outlook.com (20.179.30.75) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.1965.12; Fri, 7 Jun 2019 06:03:35 +0000 Received: from VE1PR08MB5149.eurprd08.prod.outlook.com ([fe80::9983:2882:a24:c0b0]) by VE1PR08MB5149.eurprd08.prod.outlook.com ([fe80::9983:2882:a24:c0b0%5]) with mapi id 15.20.1965.011; Fri, 7 Jun 2019 06:03:35 +0000 From: Honnappa Nagarahalli To: Vipin Varghese , "olivier.matz@6wind.com" , "reshma.pattan@intel.com" , "keith.wiles@intel.com" , "dev@dpdk.org" CC: "sanjay.padubidri@intel.com" , Honnappa Nagarahalli , nd , nd Thread-Topic: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck Thread-Index: AQHVHJZWfdT7ASJYqEOhdZNDSUTduKaPr92g Date: Fri, 7 Jun 2019 06:03:34 +0000 Message-ID: References: <20190606183355.56734-1-vipin.varghese@intel.com> In-Reply-To: <20190606183355.56734-1-vipin.varghese@intel.com> Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: x-ts-tracking-id: 9380c122-adb3-415a-ac83-bfc5d3bdf879.0 x-checkrecipientchecked: true authentication-results: spf=none (sender IP is ) smtp.mailfrom=Honnappa.Nagarahalli@arm.com; x-originating-ip: [217.140.111.135] x-ms-publictraffictype: Email x-ms-office365-filtering-correlation-id: a2acff42-acd0-43a5-0a2b-08d6eb0de065 x-ms-office365-filtering-ht: Tenant x-microsoft-antispam: BCL:0; PCL:0; RULEID:(2390118)(7020095)(4652040)(8989299)(4534185)(4627221)(201703031133081)(201702281549075)(8990200)(5600148)(711020)(4605104)(1401327)(4618075)(2017052603328)(7193020); SRVR:VE1PR08MB5120; x-ms-traffictypediagnostic: VE1PR08MB5120: nodisclaimer: True x-microsoft-antispam-prvs: x-ms-oob-tlc-oobclassifiers: OLM:4125; x-forefront-prvs: 0061C35778 x-forefront-antispam-report: SFV:NSPM; SFS:(10009020)(396003)(136003)(346002)(376002)(366004)(39860400002)(189003)(199004)(8676002)(186003)(64756008)(71190400001)(26005)(71200400001)(76176011)(66946007)(3846002)(14444005)(316002)(81166006)(2201001)(66066001)(81156014)(66446008)(446003)(6506007)(6116002)(102836004)(8936002)(7696005)(66556008)(76116006)(25786009)(4326008)(72206003)(6246003)(256004)(305945005)(66476007)(33656002)(229853002)(74316002)(11346002)(2906002)(52536014)(55016002)(53936002)(99286004)(110136005)(68736007)(486006)(54906003)(2501003)(14454004)(478600001)(476003)(73956011)(86362001)(9686003)(7736002)(5660300002)(6436002); DIR:OUT; SFP:1101; SCL:1; SRVR:VE1PR08MB5120; H:VE1PR08MB5149.eurprd08.prod.outlook.com; FPR:; SPF:None; LANG:en; PTR:InfoNoRecords; MX:1; A:1; received-spf: None (protection.outlook.com: arm.com does not designate permitted sender hosts) x-ms-exchange-senderadcheck: 1 x-microsoft-antispam-message-info: teNiwyYid9Q89NQlTbnFBxyKGBU2gnvIOGFzU8ZYMb2I9NBb+cG6gdVcbnAQ6lG08oe9AJX6J2MH+MYMvbVwC5a7Tj0iXJ3xmQG5v8kswjSqMnizuoL3NqBr+LoBMqT3IBE/Dsift6W+HG+rFWIYvfwcbjPTpI2PKDBREzZdMkWLDvD9ngaAOiYerNuwgBO9GhTdPCv2hj7kdQ2pftQZpcaeZN7PCPeb6ldf3xjCBfwyyuI5gZ67+ZXIIqULQ660/WVcbZ852jQYCkoVH/SLG5etEtch6xtnErxoJ+7UASUK+ZsAQhHmIhTifEZcQOfqkB5gdDmLlmiGDV7HOkZZs2tfDN2m8/94cCmvXA/xDJwKXzcwusI4vvN5Ez4TFvL2vBVNfxTVw58YBxODHhVJm616g0lKdd1OR8epfrqlVwo= Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: quoted-printable MIME-Version: 1.0 X-OriginatorOrg: arm.com X-MS-Exchange-CrossTenant-Network-Message-Id: a2acff42-acd0-43a5-0a2b-08d6eb0de065 X-MS-Exchange-CrossTenant-originalarrivaltime: 07 Jun 2019 06:03:35.0084 (UTC) X-MS-Exchange-CrossTenant-fromentityheader: Hosted X-MS-Exchange-CrossTenant-id: f34e5979-57d9-4aaa-ad4d-b122a662184d X-MS-Exchange-CrossTenant-mailboxtype: HOSTED X-MS-Exchange-CrossTenant-userprincipalname: Honnappa.Nagarahalli@arm.com X-MS-Exchange-Transport-CrossTenantHeadersStamped: VE1PR08MB5120 Subject: Re: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" >=20 > Add callback event handler for enqueue dequeue operation on ring. > The pre-enqueue and post-dequeue operation on ring is selected to invoke > user callback handler. Can you provide a use case for this to better understand the need? >=20 > Signed-off-by: Vipin Varghese > --- > config/common_base | 1 + > lib/librte_ring/Makefile | 1 + > lib/librte_ring/meson.build | 2 + > lib/librte_ring/rte_ring.c | 187 +++++++++++++++++++++++++++ > lib/librte_ring/rte_ring.h | 117 ++++++++++++++++- > lib/librte_ring/rte_ring_version.map | 9 ++ > 6 files changed, 316 insertions(+), 1 deletion(-) >=20 > diff --git a/config/common_base b/config/common_base index > ec29455d2..022734f19 100644 > --- a/config/common_base > +++ b/config/common_base > @@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=3Dn > CONFIG_RTE_LIBRTE_PMD_RING=3Dy > CONFIG_RTE_PMD_RING_MAX_RX_RINGS=3D16 > CONFIG_RTE_PMD_RING_MAX_TX_RINGS=3D16 > +CONFIG_RTE_RING_ENQDEQ_CALLBACKS=3Dn >=20 > # > # Compile SOFTNIC PMD > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index > 21a36770d..4f086e687 100644 > --- a/lib/librte_ring/Makefile > +++ b/lib/librte_ring/Makefile > @@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk # library name LIB = =3D > librte_ring.a >=20 > +CFLAGS +=3D -DALLOW_EXPERIMENTAL_API > CFLAGS +=3D $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS +=3D -lrte_eal >=20 > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build in= dex > ab8b0b469..b92dcf027 100644 > --- a/lib/librte_ring/meson.build > +++ b/lib/librte_ring/meson.build > @@ -2,6 +2,8 @@ > # Copyright(c) 2017 Intel Corporation >=20 > version =3D 2 > +allow_experimental_apis =3D true > + > sources =3D files('rte_ring.c') > headers =3D files('rte_ring.h', > 'rte_ring_c11_mem.h', > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c inde= x > b89ecf999..ee740c401 100644 > --- a/lib/librte_ring/rte_ring.c > +++ b/lib/librte_ring/rte_ring.c > @@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) > /* true if x is a power of 2 */ > #define POWEROF2(x) ((((x)-1) & (x)) =3D=3D 0) >=20 > +/* spinlock for pre-enqueue callback */ rte_spinlock_t > +rte_ring_preenq_cb_lock =3D RTE_SPINLOCK_INITIALIZER; > +/* spinlock for post-dequeue callback */ rte_spinlock_t > +rte_ring_pstdeq_cb_lock =3D RTE_SPINLOCK_INITIALIZER; > + > /* return the size of memory occupied by a ring */ ssize_t > rte_ring_get_memsize(unsigned count) @@ -103,6 +108,9 @@ > rte_ring_init(struct rte_ring *r, const char *name, unsigned count, > r->prod.head =3D r->cons.head =3D 0; > r->prod.tail =3D r->cons.tail =3D 0; >=20 > + TAILQ_INIT(&(r->enq_cbs)); > + TAILQ_INIT(&(r->deq_cbs)); > + > return 0; > } >=20 > @@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r) > rte_free(te); > } >=20 > +int __rte_experimental > +rte_ring_preenq_callback_register(struct rte_ring *r, > + rte_ring_cb_fn cb_fn, void *cb_arg) > +{ > +#ifndef RTE_RING_ENQDEQ_CALLBACKS > + rte_errno =3D ENOTSUP; > + return -ENOTSUP; > +#endif > + > + struct rte_ring_callback *user_cb; > + rte_spinlock_t *lock =3D &rte_ring_preenq_cb_lock; > + > + if (!cb_fn) > + return -EINVAL; > + > + if (!rte_ring_get_capacity(r)) { > + RTE_LOG(ERR, RING, "Invalid ring=3D%p", r); > + return -EINVAL; > + } > + > + rte_spinlock_lock(lock); > + > + TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) { > + if ((void *) user_cb->cb_fn =3D=3D (void *) cb_fn && > + user_cb->cb_arg =3D=3D cb_arg) > + break; > + } > + > + /* create a new callback. */ > + if (user_cb =3D=3D NULL) { > + user_cb =3D rte_zmalloc("RING_USER_CALLBACK", > + sizeof(struct rte_ring_callback), 0); > + if (user_cb !=3D NULL) { > + user_cb->cb_fn =3D cb_fn; > + user_cb->cb_arg =3D cb_arg; > + TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next); > + } > + } > + > + rte_spinlock_unlock(lock); > + > + return (user_cb =3D=3D NULL) ? -ENOMEM : 0; } > + > +int __rte_experimental > +rte_ring_pstdeq_callback_register(struct rte_ring *r, > + rte_ring_cb_fn cb_fn, void *cb_arg) > +{ > +#ifndef RTE_RING_ENQDEQ_CALLBACKS > + rte_errno =3D ENOTSUP; > + return -ENOTSUP; > +#endif > + > + struct rte_ring_callback *user_cb; > + rte_spinlock_t *lock =3D &rte_ring_pstdeq_cb_lock; > + > + if (!cb_fn) > + return -EINVAL; > + > + if (!rte_ring_get_capacity(r)) { > + RTE_LOG(ERR, RING, "Invalid ring=3D%p", r); > + return -EINVAL; > + } > + > + rte_spinlock_lock(lock); > + > + TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) { > + if ((void *) user_cb->cb_fn =3D=3D (void *) cb_fn && > + user_cb->cb_arg =3D=3D cb_arg) > + break; > + } > + > + /* create a new callback. */ > + if (user_cb =3D=3D NULL) { > + user_cb =3D rte_zmalloc("RING_USER_CALLBACK", > + sizeof(struct rte_ring_callback), 0); > + if (user_cb !=3D NULL) { > + user_cb->cb_fn =3D cb_fn; > + user_cb->cb_arg =3D cb_arg; > + TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next); > + } > + } > + > + rte_spinlock_unlock(lock); > + > + return (user_cb =3D=3D NULL) ? -ENOMEM : 0; } > + > +int __rte_experimental > +rte_ring_preenq_callback_unregister(struct rte_ring *r, > + rte_ring_cb_fn cb_fn, void *cb_arg) > +{ > +#ifndef RTE_RING_ENQDEQ_CALLBACKS > + rte_errno =3D ENOTSUP; > + return -ENOTSUP; > +#endif > + > + int ret =3D 0; > + struct rte_ring_callback *cb, *next; > + rte_spinlock_t *lock =3D &rte_ring_preenq_cb_lock; > + > + if (!cb_fn) > + return -EINVAL; > + > + if (!rte_ring_get_capacity(r)) { > + RTE_LOG(ERR, RING, "Invalid ring=3D%p", r); > + return -EINVAL; > + } > + > + rte_spinlock_lock(lock); > + > + ret =3D -EINVAL; > + for (cb =3D TAILQ_FIRST(&r->enq_cbs); cb !=3D NULL; cb =3D next) { > + next =3D TAILQ_NEXT(cb, next); > + > + if (cb->cb_fn !=3D cb_fn || cb->cb_arg !=3D cb_arg) > + continue; > + > + if (cb->active =3D=3D 0) { > + TAILQ_REMOVE(&(r->enq_cbs), cb, next); > + rte_free(cb); > + ret =3D 0; > + } else { > + ret =3D -EAGAIN; > + } > + } > + > + rte_spinlock_unlock(lock); > + > + return ret; > +} > + > +int __rte_experimental > +rte_ring_pstdeq_callback_unregister(struct rte_ring *r, > + rte_ring_cb_fn cb_fn, void *cb_arg) > +{ > +#ifndef RTE_RING_ENQDEQ_CALLBACKS > + rte_errno =3D ENOTSUP; > + return -ENOTSUP; > +#endif > + > + int ret =3D 0; > + struct rte_ring_callback *cb, *next; > + rte_spinlock_t *lock =3D &rte_ring_pstdeq_cb_lock; > + > + if (!cb_fn) > + return -EINVAL; > + > + if (!rte_ring_get_capacity(r)) { > + RTE_LOG(ERR, RING, "Invalid ring=3D%p", r); > + return -EINVAL; > + } > + > + rte_spinlock_lock(lock); > + > + ret =3D -EINVAL; > + for (cb =3D TAILQ_FIRST(&r->deq_cbs); cb !=3D NULL; cb =3D next) { > + next =3D TAILQ_NEXT(cb, next); > + > + if (cb->cb_fn !=3D cb_fn || cb->cb_arg !=3D cb_arg) > + continue; > + > + if (cb->active =3D=3D 0) { > + TAILQ_REMOVE(&(r->deq_cbs), cb, next); > + rte_free(cb); > + ret =3D 0; > + } else { > + ret =3D -EAGAIN; > + } > + } > + > + rte_spinlock_unlock(lock); > + > + return ret; > + > + return 0; > +} > + > + > /* dump the status of the ring on the console */ void rte_ring_dump(FI= LE *f, > const struct rte_ring *r) diff --git a/lib/librte_ring/rte_ring.h > b/lib/librte_ring/rte_ring.h index e265e9479..fb0f3efb5 100644 > --- a/lib/librte_ring/rte_ring.h > +++ b/lib/librte_ring/rte_ring.h > @@ -63,6 +63,11 @@ enum rte_ring_queue_behavior { >=20 > struct rte_memzone; /* forward declaration, so as not to require memzone= .h > */ >=20 > +struct rte_ring_callback; > + > +TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback); > +TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback); > + > /* structure to hold a pair of head/tail values and other metadata */ s= truct > rte_ring_headtail { > volatile uint32_t head; /**< Prod/consumer head. */ @@ -103,6 > +108,20 @@ struct rte_ring { > /** Ring consumer status. */ > struct rte_ring_headtail cons __rte_cache_aligned; > char pad2 __rte_cache_aligned; /**< empty cache line */ > + > + struct rte_ring_enq_cb_list enq_cbs; > + struct rte_ring_deq_cb_list deq_cbs; > +}; This breaks ABI compatibility > + > +typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r, > + void *obj_table, unsigned int n, > + void *cb_arg); > + > +struct rte_ring_callback { > + TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */ > + rte_ring_cb_fn cb_fn; /* Callback address */ > + void *cb_arg; /* Parameter for callback */ > + uint32_t active; /* Callback is executing */ > }; >=20 > #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single- > producer". */ @@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct > rte_ring *r, void * const *obj_table, > * - n: Actual number of objects enqueued. > */ > static __rte_always_inline unsigned > -rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, > +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table, > unsigned int n, unsigned int *free_space) { > +#ifdef RTE_RING_ENQDEQ_CALLBACKS > + struct rte_ring_callback *cb =3D NULL; > + > + TAILQ_FOREACH(cb, &(r->enq_cbs), next) { Need to take the TAILQ lock before this. For ex: what happens if a un-regis= ter is called concurrently? Also, traversing a linked list for every enqueue call would be too costly. = May be, understanding the use case will help. > + if (cb->cb_fn =3D=3D NULL) > + continue; > + > + cb->active =3D 1; > + n =3D cb->cb_fn(r, obj_table, n, cb->cb_arg); > + cb->active =3D 0; > + } > + > +#endif > + > return __rte_ring_do_enqueue(r, obj_table, n, > RTE_RING_QUEUE_VARIABLE, > r->prod.single, free_space); > }