From: "Hunt, David"
Date: Thu, 30 Mar 2017 11:07:11 +0100
To: Harry van Haaren, dev@dpdk.org
Cc: jerin.jacob@caviumnetworks.com, Bruce Richardson, Gage Eads
Subject: Re: [dpdk-dev] [PATCH v6 11/21] event/sw: add scheduling logic
Message-ID: <5233d86d-1485-01f8-9515-fddead36aa20@intel.com>
In-Reply-To: <1490829963-106807-12-git-send-email-harry.van.haaren@intel.com>
References: <1490374395-149320-1-git-send-email-harry.van.haaren@intel.com>
 <1490829963-106807-1-git-send-email-harry.van.haaren@intel.com>
 <1490829963-106807-12-git-send-email-harry.van.haaren@intel.com>

On 30/3/2017 12:25 AM, Harry van Haaren wrote:
> From: Bruce Richardson
>
> Add in the scheduling function which takes the events from the
> producer queues and buffers them before scheduling them to consumer
> queues. The scheduling logic includes support for atomic, reordered,
> and parallel scheduling of flows.
> > Signed-off-by: Bruce Richardson > Signed-off-by: Gage Eads > Signed-off-by: Harry van Haaren > > --- > > v6: > - Fix handling of event priority normalization (Jerin) > --- > drivers/event/sw/Makefile | 1 + > drivers/event/sw/sw_evdev.c | 1 + > drivers/event/sw/sw_evdev.h | 11 + > drivers/event/sw/sw_evdev_scheduler.c | 601 ++++++++++++++++++++++++++++++++++ > 4 files changed, 614 insertions(+) > create mode 100644 drivers/event/sw/sw_evdev_scheduler.c > > diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile > index b6ecd91..a7f5b3d 100644 > --- a/drivers/event/sw/Makefile > +++ b/drivers/event/sw/Makefile > @@ -54,6 +54,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map > # library source files > SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c > SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c > +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_scheduler.c > > # export include files > SYMLINK-y-include += > diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c > index 2c28547..f91a04b 100644 > --- a/drivers/event/sw/sw_evdev.c > +++ b/drivers/event/sw/sw_evdev.c > @@ -557,6 +557,7 @@ sw_probe(const char *name, const char *params) > dev->enqueue_burst = sw_event_enqueue_burst; > dev->dequeue = sw_event_dequeue; > dev->dequeue_burst = sw_event_dequeue_burst; > + dev->schedule = sw_event_schedule; > > if (rte_eal_process_type() != RTE_PROC_PRIMARY) > return 0; > diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h > index ab372fd..7c157c7 100644 > --- a/drivers/event/sw/sw_evdev.h > +++ b/drivers/event/sw/sw_evdev.h > @@ -248,8 +248,18 @@ struct sw_evdev { > /* Cache how many packets are in each cq */ > uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned; > > + /* Array of pointers to load-balanced QIDs sorted by priority level */ > + struct sw_qid *qids_prioritized[RTE_EVENT_MAX_QUEUES_PER_DEV]; > + > + /* Stats */ > + struct sw_point_stats stats __rte_cache_aligned; > + uint64_t sched_called; > int32_t sched_quanta; > + uint64_t sched_no_iq_enqueues; > + uint64_t sched_no_cq_enqueues; > + uint64_t sched_cq_qid_called; > > + uint8_t started; > uint32_t credit_update_quanta; > }; > > @@ -272,5 +282,6 @@ uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[], > uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait); > uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, > uint64_t wait); > +void sw_event_schedule(struct rte_eventdev *dev); > > #endif /* _SW_EVDEV_H_ */ > diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c > new file mode 100644 > index 0000000..c0fe6a3 > --- /dev/null > +++ b/drivers/event/sw/sw_evdev_scheduler.c > @@ -0,0 +1,601 @@ > +/*- > + * BSD LICENSE > + * > + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. > + * > + * Redistribution and use in source and binary forms, with or without > + * modification, are permitted provided that the following conditions > + * are met: > + * > + * * Redistributions of source code must retain the above copyright > + * notice, this list of conditions and the following disclaimer. > + * * Redistributions in binary form must reproduce the above copyright > + * notice, this list of conditions and the following disclaimer in > + * the documentation and/or other materials provided with the > + * distribution. 
> + * * Neither the name of Intel Corporation nor the names of its > + * contributors may be used to endorse or promote products derived > + * from this software without specific prior written permission. > + * > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > + */ > + > +#include > +#include > +#include "sw_evdev.h" > +#include "iq_ring.h" > +#include "event_ring.h" > + > +#define SW_IQS_MASK (SW_IQS_MAX-1) > + > +/* Retrieve the highest priority IQ or -1 if no pkts available. Doing the > + * CLZ twice is faster than caching the value due to data dependencies > + */ > +#define PKT_MASK_TO_IQ(pkts) \ > + (__builtin_ctz(pkts | (1 << SW_IQS_MAX))) > + > +#if SW_IQS_MAX != 4 > +#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change > +#endif > +#define PRIO_TO_IQ(prio) (prio >> 6) > + > +#define MAX_PER_IQ_DEQUEUE 48 > +#define FLOWID_MASK (SW_QID_NUM_FIDS-1) > + > +static inline uint32_t > +sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, > + uint32_t iq_num, unsigned int count) > +{ > + struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */ > + struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE]; > + uint32_t nb_blocked = 0; > + uint32_t i; > + > + if (count > MAX_PER_IQ_DEQUEUE) > + count = MAX_PER_IQ_DEQUEUE; > + > + /* This is the QID ID. 
The QID ID is static, hence it can be > + * used to identify the stage of processing in history lists etc > + */ > + uint32_t qid_id = qid->id; > + > + iq_ring_dequeue_burst(qid->iq[iq_num], qes, count); > + for (i = 0; i < count; i++) { > + const struct rte_event *qe = &qes[i]; > + /* use cheap bit mixing, we only need to lose a few bits */ > + uint32_t flow_id32 = (qes[i].flow_id) ^ (qes[i].flow_id >> 10); > + const uint16_t flow_id = FLOWID_MASK & flow_id32; > + struct sw_fid_t *fid = &qid->fids[flow_id]; > + int cq = fid->cq; > + > + if (cq < 0) { > + uint32_t cq_idx = qid->cq_next_tx++; > + if (qid->cq_next_tx == qid->cq_num_mapped_cqs) > + qid->cq_next_tx = 0; > + cq = qid->cq_map[cq_idx]; > + > + /* find least used */ > + int cq_free_cnt = sw->cq_ring_space[cq]; > + for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs; > + cq_idx++) { > + int test_cq = qid->cq_map[cq_idx]; > + int test_cq_free = sw->cq_ring_space[test_cq]; > + if (test_cq_free > cq_free_cnt) { > + cq = test_cq; > + cq_free_cnt = test_cq_free; > + } > + } > + > + fid->cq = cq; /* this pins early */ > + } > + > + if (sw->cq_ring_space[cq] == 0 || > + sw->ports[cq].inflights == SW_PORT_HIST_LIST) { > + blocked_qes[nb_blocked++] = *qe; > + continue; > + } > + > + struct sw_port *p = &sw->ports[cq]; > + > + /* at this point we can queue up the packet on the cq_buf */ > + fid->pcount++; > + p->cq_buf[p->cq_buf_count++] = *qe; > + p->inflights++; > + sw->cq_ring_space[cq]--; > + > + int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1)); > + p->hist_list[head].fid = flow_id; > + p->hist_list[head].qid = qid_id; > + > + p->stats.tx_pkts++; > + qid->stats.tx_pkts++; > + > + /* if we just filled in the last slot, flush the buffer */ > + if (sw->cq_ring_space[cq] == 0) { > + struct qe_ring *worker = p->cq_worker_ring; > + qe_ring_enqueue_burst(worker, p->cq_buf, > + p->cq_buf_count, > + &sw->cq_ring_space[cq]); > + p->cq_buf_count = 0; > + } > + } > + iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked); > + > + return count - nb_blocked; > +} > + > +static inline uint32_t > +sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, > + uint32_t iq_num, unsigned int count, int keep_order) > +{ > + uint32_t i; > + uint32_t cq_idx = qid->cq_next_tx; > + > + /* This is the QID ID. The QID ID is static, hence it can be > + * used to identify the stage of processing in history lists etc > + */ > + uint32_t qid_id = qid->id; > + > + if (count > MAX_PER_IQ_DEQUEUE) > + count = MAX_PER_IQ_DEQUEUE; > + > + if (keep_order) > + /* only schedule as many as we have reorder buffer entries */ > + count = RTE_MIN(count, > + rte_ring_count(qid->reorder_buffer_freelist)); > + > + for (i = 0; i < count; i++) { > + const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]); > + uint32_t cq_check_count = 0; > + uint32_t cq; > + > + /* > + * for parallel, just send to next available CQ in round-robin > + * fashion. So scan for an available CQ. 
If all CQs are full > + * just return and move on to next QID > + */ > + do { > + if (++cq_check_count > qid->cq_num_mapped_cqs) > + goto exit; > + cq = qid->cq_map[cq_idx]; > + if (++cq_idx == qid->cq_num_mapped_cqs) > + cq_idx = 0; > + } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 || > + sw->ports[cq].inflights == SW_PORT_HIST_LIST); > + > + struct sw_port *p = &sw->ports[cq]; > + if (sw->cq_ring_space[cq] == 0 || > + p->inflights == SW_PORT_HIST_LIST) > + break; > + > + sw->cq_ring_space[cq]--; > + > + qid->stats.tx_pkts++; > + > + const int head = (p->hist_head & (SW_PORT_HIST_LIST-1)); > + > + p->hist_list[head].fid = qe->flow_id; > + p->hist_list[head].qid = qid_id; > + > + if (keep_order) > + rte_ring_sc_dequeue(qid->reorder_buffer_freelist, > + (void *)&p->hist_list[head].rob_entry); > + > + sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe; > + iq_ring_pop(qid->iq[iq_num]); > + > + rte_compiler_barrier(); > + p->inflights++; > + p->stats.tx_pkts++; > + p->hist_head++; > + } > +exit: > + qid->cq_next_tx = cq_idx; > + return i; > +} > + > +static uint32_t > +sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, > + uint32_t iq_num, unsigned int count __rte_unused) > +{ > + uint32_t cq_id = qid->cq_map[0]; > + struct sw_port *port = &sw->ports[cq_id]; > + > + /* get max burst enq size for cq_ring */ > + uint32_t count_free = sw->cq_ring_space[cq_id]; > + if (count_free == 0) > + return 0; > + > + /* burst dequeue from the QID IQ ring */ > + struct iq_ring *ring = qid->iq[iq_num]; > + uint32_t ret = iq_ring_dequeue_burst(ring, > + &port->cq_buf[port->cq_buf_count], count_free); > + port->cq_buf_count += ret; > + > + /* Update QID, Port and Total TX stats */ > + qid->stats.tx_pkts += ret; > + port->stats.tx_pkts += ret; > + > + /* Subtract credits from cached value */ > + sw->cq_ring_space[cq_id] -= ret; > + > + return ret; > +} > + > +static uint32_t > +sw_schedule_qid_to_cq(struct sw_evdev *sw) > +{ > + uint32_t pkts = 0; > + uint32_t qid_idx; > + > + sw->sched_cq_qid_called++; > + > + for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) { > + struct sw_qid *qid = sw->qids_prioritized[qid_idx]; > + > + int type = qid->type; > + int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask); > + > + /* zero mapped CQs indicates directed */ > + if (iq_num >= SW_IQS_MAX) > + continue; > + > + uint32_t pkts_done = 0; > + uint32_t count = iq_ring_count(qid->iq[iq_num]); > + > + if (count > 0) { > + if (type == SW_SCHED_TYPE_DIRECT) > + pkts_done += sw_schedule_dir_to_cq(sw, qid, > + iq_num, count); > + else if (type == RTE_SCHED_TYPE_ATOMIC) > + pkts_done += sw_schedule_atomic_to_cq(sw, qid, > + iq_num, count); > + else > + pkts_done += sw_schedule_parallel_to_cq(sw, qid, > + iq_num, count, > + type == RTE_SCHED_TYPE_ORDERED); > + } > + > + /* Check if the IQ that was polled is now empty, and unset it > + * in the IQ mask if its empty. > + */ > + int all_done = (pkts_done == count); > + > + qid->iq_pkt_mask &= ~(all_done << (iq_num)); > + pkts += pkts_done; > + } > + > + return pkts; > +} > + > +/* This function will perform re-ordering of packets, and injecting into > + * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT* > + * contiguous in that array, this function accepts a "range" of QIDs to scan. 
> + */ > +static uint16_t > +sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end) > +{ > + /* Perform egress reordering */ > + struct rte_event *qe; > + uint32_t pkts_iter = 0; > + > + for (; qid_start < qid_end; qid_start++) { > + struct sw_qid *qid = &sw->qids[qid_start]; > + int i, num_entries_in_use; > + > + if (qid->type != RTE_SCHED_TYPE_ORDERED) > + continue; > + > + num_entries_in_use = rte_ring_free_count( > + qid->reorder_buffer_freelist); > + > + for (i = 0; i < num_entries_in_use; i++) { > + struct reorder_buffer_entry *entry; > + int j; > + > + entry = &qid->reorder_buffer[qid->reorder_buffer_index]; > + > + if (!entry->ready) > + break; > + > + for (j = 0; j < entry->num_fragments; j++) { > + uint16_t dest_qid; > + uint16_t dest_iq; > + > + int idx = entry->fragment_index + j; > + qe = &entry->fragments[idx]; > + > + dest_qid = qe->queue_id; > + dest_iq = PRIO_TO_IQ(qe->priority); > + > + if (dest_qid >= sw->qid_count) { > + sw->stats.rx_dropped++; > + continue; > + } > + > + struct sw_qid *dest_qid_ptr = > + &sw->qids[dest_qid]; > + const struct iq_ring *dest_iq_ptr = > + dest_qid_ptr->iq[dest_iq]; > + if (iq_ring_free_count(dest_iq_ptr) == 0) > + break; > + > + pkts_iter++; > + > + struct sw_qid *q = &sw->qids[dest_qid]; > + struct iq_ring *r = q->iq[dest_iq]; > + > + /* we checked for space above, so enqueue must > + * succeed > + */ > + iq_ring_enqueue(r, qe); > + q->iq_pkt_mask |= (1 << (dest_iq)); > + q->iq_pkt_count[dest_iq]++; > + q->stats.rx_pkts++; > + } > + > + entry->ready = (j != entry->num_fragments); > + entry->num_fragments -= j; > + entry->fragment_index += j; > + > + if (!entry->ready) { > + entry->fragment_index = 0; > + > + rte_ring_sp_enqueue( > + qid->reorder_buffer_freelist, > + entry); > + > + qid->reorder_buffer_index++; > + qid->reorder_buffer_index %= qid->window_size; > + } > + } > + } > + return pkts_iter; > +} > + > +static inline void __attribute__((always_inline)) > +sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port) > +{ > + RTE_SET_USED(sw); > + struct qe_ring *worker = port->rx_worker_ring; > + port->pp_buf_start = 0; > + port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf, > + RTE_DIM(port->pp_buf)); > +} > + > +static inline uint32_t __attribute__((always_inline)) > +__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder) > +{ > + static const struct reorder_buffer_entry dummy_rob; > + uint32_t pkts_iter = 0; > + struct sw_port *port = &sw->ports[port_id]; > + > + /* If shadow ring has 0 pkts, pull from worker ring */ > + if (port->pp_buf_count == 0) > + sw_refill_pp_buf(sw, port); > + > + while (port->pp_buf_count) { > + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start]; > + struct sw_hist_list_entry *hist_entry = NULL; > + uint8_t flags = qe->op; > + const uint16_t eop = !(flags & QE_FLAG_NOT_EOP); > + int needs_reorder = 0; > + /* if no-reordering, having PARTIAL == NEW */ > + if (!allow_reorder && !eop) > + flags = QE_FLAG_VALID; > + > + /* > + * if we don't have space for this packet in an IQ, > + * then move on to next queue. Technically, for a > + * packet that needs reordering, we don't need to check > + * here, but it simplifies things not to special-case > + */ > + uint32_t iq_num = PRIO_TO_IQ(qe->priority); > + struct sw_qid *qid = &sw->qids[qe->queue_id]; > + > + if ((flags & QE_FLAG_VALID) && > + iq_ring_free_count(qid->iq[iq_num]) == 0) > + break; > + > + /* now process based on flags. 
Note that for directed > + * queues, the enqueue_flush masks off all but the > + * valid flag. This makes FWD and PARTIAL enqueues just > + * NEW type, and makes DROPS no-op calls. > + */ > + if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) { > + const uint32_t hist_tail = port->hist_tail & > + (SW_PORT_HIST_LIST - 1); > + > + hist_entry = &port->hist_list[hist_tail]; > + const uint32_t hist_qid = hist_entry->qid; > + const uint32_t hist_fid = hist_entry->fid; > + > + struct sw_fid_t *fid = > + &sw->qids[hist_qid].fids[hist_fid]; > + fid->pcount -= eop; > + if (fid->pcount == 0) > + fid->cq = -1; > + > + if (allow_reorder) { > + /* set reorder ready if an ordered QID */ > + uintptr_t rob_ptr = > + (uintptr_t)hist_entry->rob_entry; > + const uintptr_t valid = (rob_ptr != 0); > + needs_reorder = valid; > + rob_ptr |= > + ((valid - 1) & (uintptr_t)&dummy_rob); > + struct reorder_buffer_entry *tmp_rob_ptr = > + (struct reorder_buffer_entry *)rob_ptr; > + tmp_rob_ptr->ready = eop * needs_reorder; > + } > + > + port->inflights -= eop; > + port->hist_tail += eop; > + } > + if (flags & QE_FLAG_VALID) { > + port->stats.rx_pkts++; > + > + if (allow_reorder && needs_reorder) { > + struct reorder_buffer_entry *rob_entry = > + hist_entry->rob_entry; > + > + /* Although fragmentation not currently > + * supported by eventdev API, we support it > + * here. Open: How do we alert the user that > + * they've exceeded max frags? > + */ > + int num_frag = rob_entry->num_fragments; > + if (num_frag == SW_FRAGMENTS_MAX) > + sw->stats.rx_dropped++; > + else { > + int idx = rob_entry->num_fragments++; > + rob_entry->fragments[idx] = *qe; > + } > + goto end_qe; > + } > + > + /* Use the iq_num from above to push the QE > + * into the qid at the right priority > + */ > + > + qid->iq_pkt_mask |= (1 << (iq_num)); > + iq_ring_enqueue(qid->iq[iq_num], qe); > + qid->iq_pkt_count[iq_num]++; > + qid->stats.rx_pkts++; > + pkts_iter++; > + } > + > +end_qe: > + port->pp_buf_start++; > + port->pp_buf_count--; > + } /* while (avail_qes) */ > + > + return pkts_iter; > +} > + > +static uint32_t > +sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id) > +{ > + return __pull_port_lb(sw, port_id, 1); > +} > + > +static uint32_t > +sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id) > +{ > + return __pull_port_lb(sw, port_id, 0); > +} > + > +static uint32_t > +sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id) > +{ > + uint32_t pkts_iter = 0; > + struct sw_port *port = &sw->ports[port_id]; > + > + /* If shadow ring has 0 pkts, pull from worker ring */ > + if (port->pp_buf_count == 0) > + sw_refill_pp_buf(sw, port); > + > + while (port->pp_buf_count) { > + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start]; > + uint8_t flags = qe->op; > + > + if ((flags & QE_FLAG_VALID) == 0) > + goto end_qe; > + > + uint32_t iq_num = PRIO_TO_IQ(qe->priority); > + struct sw_qid *qid = &sw->qids[qe->queue_id]; > + struct iq_ring *iq_ring = qid->iq[iq_num]; > + > + if (iq_ring_free_count(iq_ring) == 0) > + break; /* move to next port */ > + > + port->stats.rx_pkts++; > + > + /* Use the iq_num from above to push the QE > + * into the qid at the right priority > + */ > + qid->iq_pkt_mask |= (1 << (iq_num)); > + iq_ring_enqueue(iq_ring, qe); > + qid->iq_pkt_count[iq_num]++; > + qid->stats.rx_pkts++; > + pkts_iter++; > + > +end_qe: > + port->pp_buf_start++; > + port->pp_buf_count--; > + } /* while port->pp_buf_count */ > + > + return pkts_iter; > +} > + > +void > +sw_event_schedule(struct 
rte_eventdev *dev) > +{ > + struct sw_evdev *sw = sw_pmd_priv(dev); > + uint32_t in_pkts, out_pkts; > + uint32_t out_pkts_total = 0, in_pkts_total = 0; > + int32_t sched_quanta = sw->sched_quanta; > + uint32_t i; > + > + sw->sched_called++; > + if (!sw->started) > + return; > + > + do { > + uint32_t in_pkts_this_iteration = 0; > + > + /* Pull from rx_ring for ports */ > + do { > + in_pkts = 0; > + for (i = 0; i < sw->port_count; i++) > + if (sw->ports[i].is_directed) > + in_pkts += sw_schedule_pull_port_dir(sw, i); > + else if (sw->ports[i].num_ordered_qids > 0) > + in_pkts += sw_schedule_pull_port_lb(sw, i); > + else > + in_pkts += sw_schedule_pull_port_no_reorder(sw, i); > + > + /* QID scan for re-ordered */ > + in_pkts += sw_schedule_reorder(sw, 0, > + sw->qid_count); > + in_pkts_this_iteration += in_pkts; > + } while (in_pkts > 4 && > + (int)in_pkts_this_iteration < sched_quanta); > + > + out_pkts = 0; > + out_pkts += sw_schedule_qid_to_cq(sw); > + out_pkts_total += out_pkts; > + in_pkts_total += in_pkts_this_iteration; > + > + if (in_pkts == 0 && out_pkts == 0) > + break; > + } while ((int)out_pkts_total < sched_quanta); > + > + /* push all the internal buffered QEs in port->cq_ring to the > + * worker cores: aka, do the ring transfers batched. > + */ > + for (i = 0; i < sw->port_count; i++) { > + struct qe_ring *worker = sw->ports[i].cq_worker_ring; > + qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf, > + sw->ports[i].cq_buf_count, > + &sw->cq_ring_space[i]); > + sw->ports[i].cq_buf_count = 0; > + } > + > + sw->stats.tx_pkts += out_pkts_total; > + sw->stats.rx_pkts += in_pkts_total; > + > + sw->sched_no_iq_enqueues += (in_pkts_total == 0); > + sw->sched_no_cq_enqueues += (out_pkts_total == 0); > + > +}

There are a couple of line-length issues flagged by checkpatch, but the indentation makes them very difficult to resolve, so I would suggest they're OK as they are.

Acked-by: David Hunt