From mboxrd@z Thu Jan  1 00:00:00 1970
From: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
To: pbhagavatula@caviumnetworks.com
Cc: dev@dpdk.org, jerin.jacob@caviumnetworks.com, nipun.gupta@nxp.com, hemant.agrawal@nxp.com
Date: Wed, 10 Jan 2018 18:21:06 -0600
Message-Id: <1515630074-29020-16-git-send-email-erik.g.carrillo@intel.com>
X-Mailer: git-send-email 1.7.10
In-Reply-To: <1515630074-29020-1-git-send-email-erik.g.carrillo@intel.com>
References: <1512158458-22661-1-git-send-email-erik.g.carrillo@intel.com> <1515630074-29020-1-git-send-email-erik.g.carrillo@intel.com>
Subject: [dpdk-dev] [PATCH v6 15/23] eventtimer: add buffering of timer expiry events

Buffer timer expiry events generated while walking a "run list" in
rte_timer_manage, and burst-enqueue them to an event device to the
extent possible.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 118 +++++++++++++++++++++++---
 1 file changed, 108 insertions(+), 10 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 8bd9ebc..176cc5b 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -447,7 +447,93 @@ rte_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
 }
 
 /*
- * Software event timer adapter ops definitions
+ * Software event timer adapter buffer helper functions
+ */
+
+#define EVENT_BUFFER_SZ 1024
+#if EVENT_BUFFER_SZ < 1 || EVENT_BUFFER_SZ > 65536
+#error "EVENT_BUFFER_SZ must be between 1 and 65536"
+#endif
+
+#define EVENT_BUFFER_BATCHSZ 32
+
+struct event_buffer {
+	uint16_t head;
+	uint16_t tail;
+	uint16_t count;
+	struct rte_event events[EVENT_BUFFER_SZ];
+} __rte_cache_aligned;
+
+static inline bool
+event_buffer_full(struct event_buffer *bufp)
+{
+	return (bufp->tail + 1) % EVENT_BUFFER_SZ == bufp->head;
+}
+
+static inline bool
+event_buffer_batch_ready(struct event_buffer *bufp)
+{
+	return bufp->count >= EVENT_BUFFER_BATCHSZ;
+}
+
+static void
+event_buffer_init(struct event_buffer *bufp)
+{
+	bufp->head = bufp->tail = bufp->count = 0;
+	memset(&bufp->events, 0, sizeof(struct rte_event) * EVENT_BUFFER_SZ);
+}
+
+static int
+event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
+{
+	uint16_t *tailp = &bufp->tail;
+	struct rte_event *buf_eventp;
+
+	if (event_buffer_full(bufp))
+		return -1;
+
+	buf_eventp = &bufp->events[*tailp];
+	rte_memcpy(buf_eventp, eventp, sizeof(struct rte_event));
+
+	*tailp = (*tailp + 1) % EVENT_BUFFER_SZ;
+	bufp->count++;
+
+	return 0;
+}
+
+static void
+event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
+		   uint16_t *nb_events_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_event *events = bufp->events;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = EVENT_BUFFER_SZ - *headp;
+	else { /* buffer empty */
+		*nb_events_flushed = 0;
+		return;
+	}
+
+	*nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
+						     &events[*headp], n);
+	if (*nb_events_flushed != n && rte_errno == -EINVAL) {
+		/* We must have hit a bad event...skip it to ensure we don't
+		 * hang on it.
+		 */
+		(*nb_events_flushed)++;
+	}
+
+	*headp = (*headp + *nb_events_flushed) % EVENT_BUFFER_SZ;
+	bufp->count -= *nb_events_flushed;
+}
+
+/*
+ * Software event timer adapter implementation
  */
 
 struct rte_event_timer_adapter_sw_data {
@@ -461,6 +547,8 @@ struct rte_event_timer_adapter_sw_data {
 	struct rte_mempool *msg_pool;
 	/* Mempool containing timer objects */
 	struct rte_mempool *tim_pool;
+	/* Buffered timer expiry events to be enqueued to an event device. */
+	struct event_buffer buffer;
 };
 
 enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
@@ -473,7 +561,8 @@ struct msg {
 static void
 sw_event_timer_cb(struct rte_timer *tim, void *arg)
 {
-	uint16_t n;
+	uint16_t nb_evs_flushed;
+	int ret;
 	struct rte_event_timer *evtim;
 	struct rte_event_timer_adapter *adapter;
 	struct rte_event_timer_adapter_sw_data *sw_data;
@@ -482,14 +571,10 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
 	adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
 	sw_data = adapter->data->adapter_priv;
 
-	n = rte_event_enqueue_burst(adapter->data->event_dev_id,
-				    adapter->data->event_port_id,
-				    &evtim->ev,
-				    1);
-	if (n != 1 && rte_errno == -ENOSPC) {
-		/* If we couldn't enqueue because the event port was
-		 * backpressured, put the timer back in the skiplist with an
-		 * immediate expiry value so we can process it again on the
+	ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+	if (ret < 0) {
+		/* If event buffer is full, put timer back in list with
+		 * immediate expiry value, so that we process it again on the
 		 * next iteration.
 		 */
 		rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
@@ -500,6 +585,13 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
 		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
 		rte_mempool_put(sw_data->tim_pool, (void **)&tim);
 	}
+
+	if (event_buffer_batch_ready(&sw_data->buffer)) {
+		event_buffer_flush(&sw_data->buffer,
+				   adapter->data->event_dev_id,
+				   adapter->data->event_port_id,
+				   &nb_evs_flushed);
+	}
 }
 
 static __rte_always_inline uint64_t
@@ -658,10 +750,14 @@ sw_event_timer_adapter_service_func(void *arg)
 
 	rte_timer_manage();
 
+	event_buffer_flush(&sw_data->buffer, adapter->data->event_dev_id,
+			   adapter->data->event_port_id, &nb_events);
+
 	/* Could use for stats */
 	RTE_SET_USED(nb_events);
 	RTE_SET_USED(ret);
 
+
 	return 0;
 }
 
@@ -726,6 +822,8 @@ sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
 		return -1;
 	}
 
+	event_buffer_init(&sw_data->buffer);
+
 	/* Register a service component */
 	memset(&service, 0, sizeof(service));
 	snprintf(service.name, RTE_SERVICE_NAME_MAX,
-- 
2.6.4
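
As an illustration only (not part of the patch), below is a minimal,
self-contained sketch of the head/tail ring-buffer batching scheme the
diff above implements. Every name in it (evbuf, evbuf_add, evbuf_flush,
stub_enqueue_burst, EVBUF_SZ, EVBUF_BATCH) is hypothetical, and
stub_enqueue_burst() merely stands in for rte_event_enqueue_burst() by
accepting every event it is offered; the authoritative code is the diff
itself.

/* evbuf_demo.c - hypothetical, self-contained sketch of ring-buffer batching */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define EVBUF_SZ    8	/* tiny size so wrap-around is exercised */
#define EVBUF_BATCH 4	/* flush once this many events are buffered */

struct evbuf {
	uint16_t head;			/* index of oldest buffered event */
	uint16_t tail;			/* index of next free slot */
	uint16_t count;			/* number of buffered events */
	uint32_t events[EVBUF_SZ];	/* stand-in for struct rte_event */
};

/* Stand-in for rte_event_enqueue_burst(); accepts every event offered. */
static uint16_t
stub_enqueue_burst(const uint32_t *evs, uint16_t n)
{
	uint16_t i;

	for (i = 0; i < n; i++)
		printf("enqueued event %u\n", (unsigned int)evs[i]);
	return n;
}

/* Append one event; returns -1 when full (one slot is left unused). */
static int
evbuf_add(struct evbuf *b, uint32_t ev)
{
	if ((uint16_t)((b->tail + 1) % EVBUF_SZ) == b->head)
		return -1;
	b->events[b->tail] = ev;
	b->tail = (b->tail + 1) % EVBUF_SZ;
	b->count++;
	return 0;
}

/* Flush the contiguous region starting at head; a wrapped buffer is
 * drained over two successive calls, mirroring the patch's flush logic.
 */
static uint16_t
evbuf_flush(struct evbuf *b)
{
	uint16_t n;

	if (b->tail > b->head)
		n = b->tail - b->head;
	else if (b->tail < b->head)
		n = EVBUF_SZ - b->head;
	else
		return 0;	/* buffer empty */

	n = stub_enqueue_burst(&b->events[b->head], n);
	b->head = (b->head + n) % EVBUF_SZ;
	b->count -= n;
	return n;
}

int
main(void)
{
	struct evbuf b;
	uint32_t ev;

	memset(&b, 0, sizeof(b));

	for (ev = 0; ev < 10; ev++) {
		while (evbuf_add(&b, ev) < 0)
			evbuf_flush(&b);	/* full: drain, then retry */
		if (b.count >= EVBUF_BATCH)
			evbuf_flush(&b);	/* batch threshold reached */
	}
	while (evbuf_flush(&b) > 0)
		;				/* final drain on exit */
	return 0;
}

One difference worth noting: the retry loop in main() above spins until
the buffer has room, whereas the patch never blocks in the timer
callback. When event_buffer_add() fails, sw_event_timer_cb() re-arms the
timer with an immediate expiry so the event is retried on the next
rte_timer_manage() pass, and event_buffer_flush() additionally steps past
an event the device rejects so the buffer cannot wedge on it.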