DPDK patches and discussions
From: "Ananyev, Konstantin" <konstantin.ananyev@intel.com>
To: "david.marchand@redhat.com" <david.marchand@redhat.com>,
	"thomas@monjalon.net" <thomas@monjalon.net>,
	"nhorman@tuxdriver.com" <nhorman@tuxdriver.com>
Cc: "honnappa.nagarahalli@arm.com" <honnappa.nagarahalli@arm.com>,
	"dev@dpdk.org" <dev@dpdk.org>,
	"Richardson, Bruce" <bruce.richardson@intel.com>,
	"Yigit, Ferruh" <ferruh.yigit@intel.com>
Subject: [dpdk-dev] /validate-abi.sh complains [PATCH v1 3/8] ring: introduce RTS ring mode
Date: Tue, 31 Mar 2020 17:05:24 +0000	[thread overview]
Message-ID: <SN6PR11MB2558241135DA9A3C218F49BD9AC80@SN6PR11MB2558.namprd11.prod.outlook.com> (raw)

Hi everyone,

I have a question regarding validate-abi.sh.
It complains about the following changes in that patch:

@@ -111,11 +129,21 @@ struct rte_ring {
        char pad0 __rte_cache_aligned; /**< empty cache line */

        /** Ring producer status. */
-       struct rte_ring_headtail prod __rte_cache_aligned;
+       RTE_STD_C11
+       union {
+               struct rte_ring_headtail prod;
+               struct rte_ring_rts_headtail rts_prod;
+       }  __rte_cache_aligned;
+
        char pad1 __rte_cache_aligned; /**< empty cache line */

        /** Ring consumer status. */
-       struct rte_ring_headtail cons __rte_cache_aligned;
+       RTE_STD_C11
+       union {
+               struct rte_ring_headtail cons;
+               struct rte_ring_rts_headtail rts_cons;
+       }  __rte_cache_aligned;
+

Complaints:
rte_ring.h
[-] struct rte_ring (2)

1. Change: Field cons has been removed from this type.
   Effect: Applications will access incorrect memory when attempting to access this field.
2. Change: Field prod has been removed from this type.
   Effect: Applications will access incorrect memory when attempting to access this field.

From my perspective this looks like a false positive:
the *prod* and *cons* fields are still there,
and their format, size and offset within rte_ring remain the same.
Is that a limitation of the tool, or am I missing something here?
Thanks
Konstantin 

> -----Original Message-----
> From: Ananyev, Konstantin <konstantin.ananyev@intel.com>
> Sent: Tuesday, March 31, 2020 5:43 PM
> To: dev@dpdk.org
> Cc: honnappa.nagarahalli@arm.com; Ananyev, Konstantin <konstantin.ananyev@intel.com>
> Subject: [PATCH v1 3/8] ring: introduce RTS ring mode
> 
> Introduce relaxed tail sync (RTS) mode for MT ring synchronization.
> The aim is to reduce stall times when the ring is used on
> overcommitted CPUs (multiple active threads on the same CPU).
> The main difference from the original MP/MC algorithm is that
> the tail value is increased not by every thread that finishes
> enqueue/dequeue, but only by the last one.
> That allows threads to avoid spinning on the ring tail value,
> leaving the actual tail update to the last thread in the update queue.
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
>  lib/librte_ring/Makefile               |   5 +-
>  lib/librte_ring/meson.build            |   5 +-
>  lib/librte_ring/rte_ring.c             | 100 +++++++-
>  lib/librte_ring/rte_ring.h             | 109 ++++++++-
>  lib/librte_ring/rte_ring_elem.h        |  86 ++++++-
>  lib/librte_ring/rte_ring_rts.h         | 316 +++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_rts_elem.h    | 205 ++++++++++++++++
>  lib/librte_ring/rte_ring_rts_generic.h | 210 ++++++++++++++++
>  8 files changed, 1007 insertions(+), 29 deletions(-)
>  create mode 100644 lib/librte_ring/rte_ring_rts.h
>  create mode 100644 lib/librte_ring/rte_ring_rts_elem.h
>  create mode 100644 lib/librte_ring/rte_ring_rts_generic.h
> 
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> index 917c560ad..8f5c284cc 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -18,6 +18,9 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
>  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
>  					rte_ring_elem.h \
>  					rte_ring_generic.h \
> -					rte_ring_c11_mem.h
> +					rte_ring_c11_mem.h \
> +					rte_ring_rts.h \
> +					rte_ring_rts_elem.h \
> +					rte_ring_rts_generic.h
> 
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> index f2f3ccc88..612936afb 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -5,7 +5,10 @@ sources = files('rte_ring.c')
>  headers = files('rte_ring.h',
>  		'rte_ring_elem.h',
>  		'rte_ring_c11_mem.h',
> -		'rte_ring_generic.h')
> +		'rte_ring_generic.h',
> +		'rte_ring_rts.h',
> +		'rte_ring_rts_elem.h',
> +		'rte_ring_rts_generic.h')
> 
>  # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
>  allow_experimental_apis = true
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index fa5733907..222eec0fb 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -45,6 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>  /* true if x is a power of 2 */
>  #define POWEROF2(x) ((((x)-1) & (x)) == 0)
> 
> +/* by default set head/tail distance as 1/8 of ring capacity */
> +#define HTD_MAX_DEF	8
> +
>  /* return the size of memory occupied by a ring */
>  ssize_t
>  rte_ring_get_memsize_elem(unsigned int esize, unsigned int count)
> @@ -79,11 +82,84 @@ rte_ring_get_memsize(unsigned int count)
>  	return rte_ring_get_memsize_elem(sizeof(void *), count);
>  }
> 
> +/*
> + * internal helper function to reset prod/cons head-tail values.
> + */
> +static void
> +reset_headtail(void *p)
> +{
> +	struct rte_ring_headtail *ht;
> +	struct rte_ring_rts_headtail *ht_rts;
> +
> +	ht = p;
> +	ht_rts = p;
> +
> +	switch (ht->sync_type) {
> +	case RTE_RING_SYNC_MT:
> +	case RTE_RING_SYNC_ST:
> +		ht->head = 0;
> +		ht->tail = 0;
> +		break;
> +	case RTE_RING_SYNC_MT_RTS:
> +		ht_rts->head.raw = 0;
> +		ht_rts->tail.raw = 0;
> +		break;
> +	default:
> +		/* unknown sync mode */
> +		RTE_ASSERT(0);
> +	}
> +}
> +
>  void
>  rte_ring_reset(struct rte_ring *r)
>  {
> -	r->prod.head = r->cons.head = 0;
> -	r->prod.tail = r->cons.tail = 0;
> +	reset_headtail(&r->prod);
> +	reset_headtail(&r->cons);
> +}
> +
> +/*
> + * helper function, calculates sync_type values for prod and cons
> + * based on input flags. Returns zero at success or negative
> + * errno value otherwise.
> + */
> +static int
> +get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
> +	enum rte_ring_sync_type *cons_st)
> +{
> +	static const uint32_t prod_st_flags =
> +		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ);
> +	static const uint32_t cons_st_flags =
> +		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ);
> +
> +	switch (flags & prod_st_flags) {
> +	case 0:
> +		*prod_st = RTE_RING_SYNC_MT;
> +		break;
> +	case RING_F_SP_ENQ:
> +		*prod_st = RTE_RING_SYNC_ST;
> +		break;
> +	case RING_F_MP_RTS_ENQ:
> +		*prod_st = RTE_RING_SYNC_MT_RTS;
> +		break;
> +	default:
> +		return -EINVAL;
> +	}
> +
> +	switch (flags & cons_st_flags) {
> +	case 0:
> +		*cons_st = RTE_RING_SYNC_MT;
> +		break;
> +	case RING_F_SC_DEQ:
> +		*cons_st = RTE_RING_SYNC_ST;
> +		break;
> +	case RING_F_MC_RTS_DEQ:
> +		*cons_st = RTE_RING_SYNC_MT_RTS;
> +		break;
> +	default:
> +		return -EINVAL;
> +	}
> +
> +	return 0;
>  }
> 
>  int
> @@ -100,16 +176,20 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>  	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
>  			  RTE_CACHE_LINE_MASK) != 0);
> 
> +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
> +		offsetof(struct rte_ring_rts_headtail, sync_type));
> +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
> +		offsetof(struct rte_ring_rts_headtail, tail.val.pos));
> +
>  	/* init the ring structure */
>  	memset(r, 0, sizeof(*r));
>  	ret = strlcpy(r->name, name, sizeof(r->name));
>  	if (ret < 0 || ret >= (int)sizeof(r->name))
>  		return -ENAMETOOLONG;
>  	r->flags = flags;
> -	r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> -		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> -	r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> -		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> +	ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type);
> +	if (ret != 0)
> +		return ret;
> 
>  	if (flags & RING_F_EXACT_SZ) {
>  		r->size = rte_align32pow2(count + 1);
> @@ -126,8 +206,12 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>  		r->mask = count - 1;
>  		r->capacity = r->mask;
>  	}
> -	r->prod.head = r->cons.head = 0;
> -	r->prod.tail = r->cons.tail = 0;
> +
> +	/* set default values for head-tail distance */
> +	if (flags & RING_F_MP_RTS_ENQ)
> +		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
> +	if (flags & RING_F_MC_RTS_DEQ)
> +		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
> 
>  	return 0;
>  }
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index d4775a063..2b42a0211 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -65,10 +65,13 @@ enum rte_ring_queue_behavior {
>  enum rte_ring_sync_type {
>  	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
>  	RTE_RING_SYNC_ST,     /**< single thread only */
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
> +#endif
>  };
> 
>  /**
> - * structure to hold a pair of head/tail values and other metadata.
> + * structures to hold a pair of head/tail values and other metadata.
>   * Depending on sync_type format of that structure might be different,
>   * but offset for *sync_type* and *tail* values should remain the same.
>   */
> @@ -84,6 +87,21 @@ struct rte_ring_headtail {
>  	};
>  };
> 
> +union rte_ring_ht_poscnt {
> +	uint64_t raw;
> +	struct {
> +		uint32_t cnt; /**< head/tail reference counter */
> +		uint32_t pos; /**< head/tail position */
> +	} val;
> +};
> +
> +struct rte_ring_rts_headtail {
> +	volatile union rte_ring_ht_poscnt tail;
> +	enum rte_ring_sync_type sync_type;  /**< sync type of prod/cons */
> +	uint32_t htd_max;   /**< max allowed distance between head/tail */
> +	volatile union rte_ring_ht_poscnt head;
> +};
> +
>  /**
>   * An RTE ring structure.
>   *
> @@ -111,11 +129,21 @@ struct rte_ring {
>  	char pad0 __rte_cache_aligned; /**< empty cache line */
> 
>  	/** Ring producer status. */
> -	struct rte_ring_headtail prod __rte_cache_aligned;
> +	RTE_STD_C11
> +	union {
> +		struct rte_ring_headtail prod;
> +		struct rte_ring_rts_headtail rts_prod;
> +	}  __rte_cache_aligned;
> +
>  	char pad1 __rte_cache_aligned; /**< empty cache line */
> 
>  	/** Ring consumer status. */
> -	struct rte_ring_headtail cons __rte_cache_aligned;
> +	RTE_STD_C11
> +	union {
> +		struct rte_ring_headtail cons;
> +		struct rte_ring_rts_headtail rts_cons;
> +	}  __rte_cache_aligned;
> +
>  	char pad2 __rte_cache_aligned; /**< empty cache line */
>  };
> 
> @@ -132,6 +160,9 @@ struct rte_ring {
>  #define RING_F_EXACT_SZ 0x0004
>  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> 
> +#define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */
> +#define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */
> +
>  #define __IS_SP RTE_RING_SYNC_ST
>  #define __IS_MP RTE_RING_SYNC_MT
>  #define __IS_SC RTE_RING_SYNC_ST
> @@ -461,6 +492,10 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			RTE_RING_SYNC_ST, free_space);
>  }
> 
> +#ifdef ALLOW_EXPERIMENTAL_API
> +#include <rte_ring_rts.h>
> +#endif
> +
>  /**
>   * Enqueue several objects on a ring.
>   *
> @@ -484,8 +519,21 @@ static __rte_always_inline unsigned int
>  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			r->prod.sync_type, free_space);
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n,
> +			free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  /**
> @@ -619,8 +667,20 @@ static __rte_always_inline unsigned int
>  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
>  		unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -				r->cons.sync_type, available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  /**
> @@ -940,8 +1000,21 @@ static __rte_always_inline unsigned
>  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
> -			r->prod.sync_type, free_space);
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_burst(r, obj_table, n,
> +			free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  /**
> @@ -1020,9 +1093,21 @@ static __rte_always_inline unsigned
>  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.sync_type, available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_burst(r, obj_table, n,
> +			available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> index 28f9836e6..5de0850dc 100644
> --- a/lib/librte_ring/rte_ring_elem.h
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -542,6 +542,8 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
>  			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
>  }
> 
> +#include <rte_ring_rts_elem.h>
> +
>  /**
>   * Enqueue several objects on a ring.
>   *
> @@ -571,6 +573,24 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
>  {
> -	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
> +
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n,
> +			free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n,
> +			free_space);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n,
> +			free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (free_space != NULL)
> +		*free_space = 0;
> +	return 0;
>  }
> 
>  /**
> @@ -733,8 +755,25 @@ static __rte_always_inline unsigned int
>  rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, r->cons.sync_type, available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n,
> +			available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n,
> +			available);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize,
> +			n, available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (available != NULL)
> +		*available = 0;
> +	return 0;
>  }
> 
>  /**
> @@ -901,8 +940,25 @@ static __rte_always_inline unsigned
>  rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space);
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n,
> +			free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n,
> +			free_space);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize,
> +			n, free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (free_space != NULL)
> +		*free_space = 0;
> +	return 0;
>  }
> 
>  /**
> @@ -993,9 +1049,25 @@ static __rte_always_inline unsigned int
>  rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.sync_type, available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n,
> +			available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n,
> +			available);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize,
> +			n, available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (available != NULL)
> +		*available = 0;
> +	return 0;
>  }
> 
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_rts.h b/lib/librte_ring/rte_ring_rts.h
> new file mode 100644
> index 000000000..18404fe48
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_rts.h
> @@ -0,0 +1,316 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_RTS_H_
> +#define _RTE_RING_RTS_H_
> +
> +/**
> + * @file rte_ring_rts.h
> + * @b EXPERIMENTAL: this API may change without prior notice.
> + * It is not recommended to include this file directly.
> + * Please include <rte_ring.h> instead.
> + *
> + * Contains functions for Relaxed Tail Sync (RTS) ring mode.
> + * The main idea remains the same as for our original MP/MC synchronization
> + * mechanism.
> + * The main difference is that the tail value is increased not
> + * by every thread that finishes an enqueue/dequeue,
> + * but only by the last one to do so.
> + * That allows threads to skip spinning on the tail value,
> + * leaving the actual tail update to the last thread in the update queue.
> + * RTS requires two 64-bit CAS operations per enqueue/dequeue:
> + * one for the head update, a second for the tail update.
> + * In return, it allows threads to avoid spinning/waiting on the tail value.
> + * In comparison, the original MP/MC algorithm requires one 32-bit CAS
> + * for the head update plus waiting/spinning on the tail value.
> + *
> + * Brief outline:
> + *  - introduce a refcnt for both head and tail
> + *  - increment head.refcnt on each head.value update
> + *  - write head.value and head.refcnt atomically (64-bit CAS)
> + *  - move tail.value ahead only when tail.refcnt + 1 == head.refcnt
> + *  - increment tail.refcnt when each enqueue/dequeue op finishes
> + *    (whether or not tail.value is going to change)
> + *  - write tail.value and tail.refcnt atomically (64-bit CAS)
> + *
> + * To avoid producer/consumer starvation:
> + *  - limit max allowed distance between head and tail value (HTD_MAX).
> + *    i.e. a thread is allowed to proceed with changing head.value
> + *    only when:  head.value - tail.value <= HTD_MAX.
> + * HTD_MAX is an optional parameter.
> + * With HTD_MAX == 0 we get a fully serialized ring -
> + * i.e. only one thread at a time will be able to enqueue/dequeue
> + * to/from the ring.
> + * With HTD_MAX >= ring.capacity there is no limitation.
> + * By default HTD_MAX == ring.capacity / 8.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring_rts_generic.h>
> +
> +/**
> + * @internal Enqueue several objects on the RTS ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_rts_enqueue(struct rte_ring *r, void * const *obj_table,
> +		uint32_t n, enum rte_ring_queue_behavior behavior,
> +		uint32_t *free_space)
> +{
> +	uint32_t free, head;
> +
> +	n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
> +
> +	if (n != 0) {
> +		ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> +		__rte_ring_rts_update_tail(&r->rts_prod);
> +	}
> +
> +	if (free_space != NULL)
> +		*free_space = free - n;
> +	return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the RTS ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_rts_dequeue(struct rte_ring *r, void **obj_table,
> +		uint32_t n, enum rte_ring_queue_behavior behavior,
> +		uint32_t *available)
> +{
> +	uint32_t entries, head;
> +
> +	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
> +
> +	if (n != 0) {
> +		DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> +		__rte_ring_rts_update_tail(&r->rts_cons);
> +	}
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space)
> +{
> +	return __rte_ring_do_rts_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> +			free_space);
> +}
> +
> +/**
> + * Dequeue several objects from an RTS ring (multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available)
> +{
> +	return __rte_ring_do_rts_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> +			available);
> +}
> +
> +/**
> + * Return producer max Head-Tail-Distance (HTD).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   Producer HTD value, if producer is set in appropriate sync mode,
> + *   or UINT32_MAX otherwise.
> + */
> +__rte_experimental
> +static inline uint32_t
> +rte_ring_get_prod_htd_max(const struct rte_ring *r)
> +{
> +	if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
> +		return r->rts_prod.htd_max;
> +	return UINT32_MAX;
> +}
> +
> +/**
> + * Set producer max Head-Tail-Distance (HTD).
> + * Note that producer has to use appropriate sync mode (RTS).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param v
> + *   new HTD value to setup.
> + * @return
> + *   Zero on success, or negative error code otherwise.
> + */
> +__rte_experimental
> +static inline int
> +rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
> +{
> +	if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
> +		return -ENOTSUP;
> +
> +	r->rts_prod.htd_max = v;
> +	return 0;
> +}
> +
> +/**
> + * Return consumer max Head-Tail-Distance (HTD).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   Consumer HTD value, if consumer is set in appropriate sync mode,
> + *   or UINT32_MAX otherwise.
> + */
> +__rte_experimental
> +static inline uint32_t
> +rte_ring_get_cons_htd_max(const struct rte_ring *r)
> +{
> +	if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
> +		return r->rts_cons.htd_max;
> +	return UINT32_MAX;
> +}
> +
> +/**
> + * Set consumer max Head-Tail-Distance (HTD).
> + * Note that consumer has to use appropriate sync mode (RTS).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param v
> + *   new HTD value to setup.
> + * @return
> + *   Zero on success, or negative error code otherwise.
> + */
> +__rte_experimental
> +static inline int
> +rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
> +{
> +	if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
> +		return -ENOTSUP;
> +
> +	r->rts_cons.htd_max = v;
> +	return 0;
> +}
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned
> +rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space)
> +{
> +	return __rte_ring_do_rts_enqueue(r, obj_table, n,
> +			RTE_RING_QUEUE_VARIABLE, free_space);
> +}
> +
> +/**
> + * Dequeue several objects from an RTS ring (multi-consumers safe).
> + * When more objects are requested than are available,
> + * only the available number of objects is dequeued.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned
> +rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available)
> +{
> +	return __rte_ring_do_rts_dequeue(r, obj_table, n,
> +			RTE_RING_QUEUE_VARIABLE, available);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_RTS_H_ */
> diff --git a/lib/librte_ring/rte_ring_rts_elem.h b/lib/librte_ring/rte_ring_rts_elem.h
> new file mode 100644
> index 000000000..71a331b23
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_rts_elem.h
> @@ -0,0 +1,205 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_RTS_ELEM_H_
> +#define _RTE_RING_RTS_ELEM_H_
> +
> +/**
> + * @file rte_ring_rts_elem.h
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * It is not recommended to include this file directly.
> + * Please include <rte_ring_elem.h> instead.
> + * Contains *ring_elem* functions for Relaxed Tail Sync (RTS) ring mode.
> + * For more details, please refer to <rte_ring_rts.h>.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring_rts_generic.h>
> +
> +/**
> + * @internal Enqueue several objects on the RTS ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, void * const *obj_table,
> +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *free_space)
> +{
> +	uint32_t free, head;
> +
> +	n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
> +
> +	if (n != 0) {
> +		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
> +		__rte_ring_rts_update_tail(&r->rts_prod);
> +	}
> +
> +	if (free_space != NULL)
> +		*free_space = free - n;
> +	return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the RTS ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param available
> + *   Returns the number of remaining ring entries after the dequeue has
> + *   finished.
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void **obj_table,
> +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *available)
> +{
> +	uint32_t entries, head;
> +
> +	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
> +
> +	if (n != 0) {
> +		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
> +		__rte_ring_rts_update_tail(&r->rts_cons);
> +	}
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
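The RTE_RING_QUEUE_FIXED vs RTE_RING_QUEUE_VARIABLE semantics that both helpers above take through the `behavior` argument boil down to one clipping rule applied inside the head-move functions. As a minimal standalone sketch (hypothetical names, not the DPDK code):

```c
#include <assert.h>
#include <stdint.h>

enum queue_behavior { QUEUE_FIXED, QUEUE_VARIABLE };

/* Clip a requested count n against the available room:
 * FIXED is all-or-nothing (0 or n), VARIABLE takes as many as fit. */
static uint32_t
clip_count(uint32_t n, uint32_t avail, enum queue_behavior b)
{
	if (n > avail)
		n = (b == QUEUE_FIXED) ? 0 : avail;
	return n;
}
```

So a FIXED bulk call that cannot fully fit returns 0, while a VARIABLE burst call degrades to whatever room is left.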
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, void * const *obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *free_space)
> +{
> +	return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, free_space);
> +}
> +
> +/**
> + * Dequeue several objects from an RTS ring (multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void **obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *available)
> +{
> +	return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
> +		RTE_RING_QUEUE_FIXED, available);
> +}
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, void * const *obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *free_space)
> +{
> +	return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, free_space);
> +}
> +
> +/**
> + * Dequeue several objects from an RTS ring (multi-consumers safe).
> + * When the requested objects are more than the available objects,
> + * only dequeue the actual number of objects.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void **obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *available)
> +{
> +	return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, available);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_RTS_ELEM_H_ */
> diff --git a/lib/librte_ring/rte_ring_rts_generic.h b/lib/librte_ring/rte_ring_rts_generic.h
> new file mode 100644
> index 000000000..31a37924c
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_rts_generic.h
> @@ -0,0 +1,210 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_RTS_GENERIC_H_
> +#define _RTE_RING_RTS_GENERIC_H_
> +
> +/**
> + * @file rte_ring_rts_generic.h
> + * It is not recommended to include this file directly,
> + * include <rte_ring.h> instead.
> + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
> + * For more information please refer to <rte_ring_rts.h>.
> + */
> +
> +/**
> + * @internal This function updates tail values.
> + */
> +static __rte_always_inline void
> +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
> +{
> +	union rte_ring_ht_poscnt h, ot, nt;
> +
> +	/*
> +	 * If there are other enqueues/dequeues in progress that
> +	 * might precede us, then don't update the tail with a new value.
> +	 */
> +
> +	do {
> +		ot.raw = ht->tail.raw;
> +		rte_smp_rmb();
> +
> +		/* on 32-bit systems we have to do atomic read here */
> +		h.raw = rte_atomic64_read((rte_atomic64_t *)
> +			(uintptr_t)&ht->head.raw);
> +
> +		nt.raw = ot.raw;
> +		if (++nt.val.cnt == h.val.cnt)
> +			nt.val.pos = h.val.pos;
> +
> +	} while (rte_atomic64_cmpset(&ht->tail.raw, ot.raw, nt.raw) == 0);
> +}
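The position+counter packing that this tail update relies on can be shown single-threaded. The sketch below is illustrative (field and function names are mine, not DPDK's): the head counter records how many head moves have occurred, and the tail position may only jump forward once the tail counter catches up with it.

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative 64-bit word packing a 32-bit counter and position,
 * in the spirit of the patch's rte_ring_ht_poscnt union. */
union poscnt {
	uint64_t raw;
	struct {
		uint32_t cnt; /* number of head moves acknowledged */
		uint32_t pos; /* ring position */
	} val;
};

/* One (non-atomic) step of the tail-update rule: bump the tail
 * counter; if it now matches the head counter, all in-flight
 * operations are accounted for and the tail position may jump
 * to the head position. */
static union poscnt
tail_step(union poscnt tail, union poscnt head)
{
	union poscnt nt = tail;

	if (++nt.val.cnt == head.val.cnt)
		nt.val.pos = head.val.pos;
	return nt;
}
```

In the real code this step runs inside a cmpset loop, so concurrent finishers retry rather than lose an update.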
> +
> +/**
> + * @internal This function waits until the head/tail distance does not
> + * exceed the pre-defined maximum value.
> + */
> +static __rte_always_inline void
> +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
> +	union rte_ring_ht_poscnt *h)
> +{
> +	uint32_t max;
> +
> +	max = ht->htd_max;
> +	h->raw = ht->head.raw;
> +	rte_smp_rmb();
> +
> +	while (h->val.pos - ht->tail.val.pos > max) {
> +		rte_pause();
> +		h->raw = ht->head.raw;
> +		rte_smp_rmb();
> +	}
> +}
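The wait predicate above can be exercised in isolation. A hypothetical standalone sketch (names are not DPDK's), showing that the unsigned subtraction keeps the head/tail distance correct even across 32-bit wraparound:

```c
#include <assert.h>
#include <stdint.h>

/* The head may run ahead of its tail by at most max positions.
 * Unsigned 32-bit subtraction wraps, so the distance stays correct
 * after the indices pass UINT32_MAX. */
static int
head_must_wait(uint32_t head_pos, uint32_t tail_pos, uint32_t max)
{
	return (head_pos - tail_pos) > max;
}
```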
> +
> +/**
> + * @internal This function updates the producer head for enqueue.
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param num
> + *   The number of elements we want to enqueue, i.e. how far the
> + *   head should be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue starts
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline uint32_t
> +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *free_entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_poscnt nh, oh;
> +
> +	const uint32_t capacity = r->capacity;
> +
> +	do {
> +		/* Reset n to the initial burst count */
> +		n = num;
> +
> +		/* read prod head (may spin on prod tail) */
> +		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
> +
> +		/* Add an rmb barrier to avoid load/load reordering on
> +		 * weakly ordered memory models. It is a no-op on x86.
> +		 */
> +		rte_smp_rmb();
> +
> +		/*
> +		 * The subtraction is done between two unsigned 32-bit values
> +		 * (the result is always modulo 32 bits even if we have
> +		 * *old_head > cons_tail). So 'free_entries' is always between 0
> +		 * and capacity (which is < size).
> +		 */
> +		*free_entries = capacity + r->cons.tail - oh.val.pos;
> +
> +		/* check that we have enough room in ring */
> +		if (unlikely(n > *free_entries))
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> +					0 : *free_entries;
> +
> +		if (n == 0)
> +			return 0;
> +
> +		nh.val.pos = oh.val.pos + n;
> +		nh.val.cnt = oh.val.cnt + 1;
> +
> +	} while (rte_atomic64_cmpset(&r->rts_prod.head.raw,
> +			oh.raw, nh.raw) == 0);
> +
> +	*old_head = oh.val.pos;
> +	return n;
> +}
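The modulo-32-bit argument in the comment above can be checked with small concrete values (a hypothetical sketch, not the DPDK code):

```c
#include <assert.h>
#include <stdint.h>

/* Free-entry computation as in the patch:
 * capacity + cons_tail - prod_head, all in unsigned 32-bit
 * arithmetic. Wraparound keeps the result correct even after
 * the indices have crossed UINT32_MAX. */
static uint32_t
ring_free(uint32_t capacity, uint32_t cons_tail, uint32_t prod_head)
{
	return capacity + cons_tail - prod_head;
}
```

With 10 entries in use out of 16 the result is 6; the same holds when prod_head has wrapped past UINT32_MAX while cons_tail has not yet.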
> +
> +/**
> + * @internal This function updates the consumer head for dequeue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param num
> + *   The number of elements we want to dequeue, i.e. how far the
> + *   head should be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_poscnt nh, oh;
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = num;
> +
> +		/* read cons head (may spin on cons tail) */
> +		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
> +
> +		/* Add an rmb barrier to avoid load/load reordering on
> +		 * weakly ordered memory models. It is a no-op on x86.
> +		 */
> +		rte_smp_rmb();
> +
> +		/* The subtraction is done between two unsigned 32-bit values
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = r->prod.tail - oh.val.pos;
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +		if (unlikely(n == 0))
> +			return 0;
> +
> +		nh.val.pos = oh.val.pos + n;
> +		nh.val.cnt = oh.val.cnt + 1;
> +
> +	} while (rte_atomic64_cmpset(&r->rts_cons.head.raw,
> +			oh.raw, nh.raw) == 0);
> +
> +	*old_head = oh.val.pos;
> +	return n;
> +}
> +
> +#endif /* _RTE_RING_RTS_GENERIC_H_ */
> --
> 2.17.1


Thread overview: 13+ messages
2020-03-31 17:05 Ananyev, Konstantin [this message]
2020-03-31 19:22 ` Thomas Monjalon
2020-03-31 19:24   ` Thomas Monjalon
2020-04-01 12:52     ` Neil Horman
2020-04-06 14:02       ` Thomas Monjalon
2020-04-06 14:36         ` Neil Horman
2020-04-07  7:32           ` Ray Kinsella
2020-04-07 11:31             ` Neil Horman
2020-04-01  6:38 ` David Marchand
2020-04-02 12:36   ` Ananyev, Konstantin
2020-04-02 13:13     ` Dodji Seketeli
2020-04-02 14:27     ` David Marchand
2020-04-02 15:25       ` Ananyev, Konstantin
