From: Jerin Jacob
Date: Sat, 31 Oct 2020 15:22:22 +0530
To: Timothy McDaniel
Cc: dpdk-dev, Erik Gabriel Carrillo, Gage Eads, "Van Haaren, Harry", Jerin Jacob, Thomas Monjalon
Subject: Re: [dpdk-dev] [PATCH v8 17/23] event/dlb2: add enqueue and its burst variants
In-Reply-To: <1604109710-31624-18-git-send-email-timothy.mcdaniel@intel.com>
References: <1602958879-8558-2-git-send-email-timothy.mcdaniel@intel.com> <1604109710-31624-1-git-send-email-timothy.mcdaniel@intel.com> <1604109710-31624-18-git-send-email-timothy.mcdaniel@intel.com>

On Sat, Oct 31, 2020 at 7:36 AM Timothy McDaniel wrote:
>
> Add support for enqueue and its variants.
> > Signed-off-by: Timothy McDaniel > Reviewed-by: Gage Eads > --- > doc/guides/eventdevs/dlb2.rst | 118 +++++++++ > drivers/event/dlb2/dlb2.c | 578 ++++++++++++++++++++++++++++++++++++++++++ > 2 files changed, 696 insertions(+) Found following build error at this patch with clang [2262/2494] Linking target drivers/librte_bus_vmbus.so.21.0 [2263/2494] Compiling C object drivers/libtmp_rte_event_dlb2.a.p/event_dlb2_dlb2.c.o FAILED: drivers/libtmp_rte_event_dlb2.a.p/event_dlb2_dlb2.c.o ccache clang -Idrivers/libtmp_rte_event_dlb2.a.p -Idrivers -I../drivers -Idrivers/event/dlb2 -I../drivers/event/dlb2 -Ilib/librte_eventdev -I../lib/librte_eventdev -I. -I.. -Iconfig -I../config -Ilib/librte_eal/include -I../lib/librte_eal/i nclude -Ilib/librte_eal/linux/include -I../lib/librte_eal/linux/include -Ilib/librte_eal/x86/include -I../lib/librte_eal/x86/include -Ilib/librte_eal/common -I../lib/librte_eal/common -Ilib/librte_eal -I../lib/librte_eal -Ilib/librte_kvargs -I../lib/librte_kvargs -Ilib/librte_metrics -I../lib/librte_metrics -Ilib/librte_telemetry -I../lib/librte_telemetry -Ilib/librte_ring -I../lib/librte_ring -Ilib/librte_ethdev -I../lib/librte_ethdev -Ilib/librte_net -I../lib/librte_net -Il ib/librte_mbuf -I../lib/librte_mbuf -Ilib/librte_mempool -I../lib/librte_mempool -Ilib/librte_meter -I../lib/librte_meter -Ilib/librte_hash -I../lib/librte_hash -Ilib/librte_rcu -I../lib/librte_rcu -Ilib/librte_timer -I../lib/librte_timer - Ilib/librte_cryptodev -I../lib/librte_cryptodev -Ilib/librte_pci -I../lib/librte_pci -Idrivers/bus/pci -I../drivers/bus/pci -I../drivers/bus/pci/linux -Xclang -fcolor-diagnostics -pipe -D_FILE_OFFSET_BITS=64 -Wall -Winvalid-pch -Werror -O2 -g -include rte_config.h -Wextra -Wcast-qual -Wdeprecated -Wformat-nonliteral -Wformat-security -Wmissing-declarations -Wmissing-prototypes -Wnested-externs -Wold-style-definition -Wpointer-arith -Wsign-compare -Wstrict-prototypes -Wundef - Wwrite-strings -Wno-address-of-packed-member -Wno-missing-field-initializers -D_GNU_SOURCE -fPIC -march=native -DALLOW_EXPERIMENTAL_API -DALLOW_INTERNAL_API -MD -MQ drivers/libtmp_rte_event_dlb2.a.p/event_dlb2_dlb2.c.o -MF drivers/libtmp_rt e_event_dlb2.a.p/event_dlb2_dlb2.c.o.d -o drivers/libtmp_rte_event_dlb2.a.p/event_dlb2_dlb2.c.o -c ../drivers/event/dlb2/dlb2.c ../drivers/event/dlb2/dlb2.c:2231:1: error: unused function 'dlb2_consume_qe_immediate' [-Werror,-Wunused-function] dlb2_consume_qe_immediate(struct dlb2_port *qm_port, int num) ^ ../drivers/event/dlb2/dlb2.c:2271:1: error: unused function 'dlb2_construct_token_pop_qe' [-Werror,-Wunused-function] dlb2_construct_token_pop_qe(struct dlb2_port *qm_port, int idx) ^ 2 errors generated. [2264/2494] Generating rte_event_octeontx.sym_chk with a meson_exe.py custom command [2265/2494] Generating rte_baseband_fpga_lte_fec.sym_chk with a meson_exe.py custom command [2266/2494] Generating rte_event_octeontx2.sym_chk with a meson_exe.py custom command [2267/2494] Generating rte_event_sw.sym_chk with a meson_exe.py custom command [2268/2494] Linking target lib/librte_stack.so.21.0 [2269/2494] Linking target lib/librte_jobstats.so.21.0 [2270/2494] Linking target lib/librte_cfgfile.so.21.0 > > diff --git a/doc/guides/eventdevs/dlb2.rst b/doc/guides/eventdevs/dlb2.rst > index bbd6ac8..aa8bf01 100644 > --- a/doc/guides/eventdevs/dlb2.rst > +++ b/doc/guides/eventdevs/dlb2.rst > @@ -163,6 +163,124 @@ Flow ID > The flow ID field is preserved in the event when it is scheduled in the > DLB2. 
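Regarding the clang -Wunused-function errors reported above: dlb2_consume_qe_immediate() and dlb2_construct_token_pop_qe() presumably only gain callers in a later patch of the series (the dequeue path). If so, the cleanest fix is to introduce them in the same patch as their first caller; alternatively, the definitions can be temporarily tagged __rte_unused so each intermediate patch still builds under -Werror. A minimal stand-in sketch (the struct is left opaque and the body is a placeholder, not the real driver code):

    #include <rte_common.h>   /* __rte_unused, RTE_SET_USED */

    struct dlb2_port;         /* real definition lives in the driver headers */

    static inline __rte_unused int
    dlb2_consume_qe_immediate(struct dlb2_port *qm_port, int num)
    {
            RTE_SET_USED(qm_port);
            return num;
    }

The attribute would then be dropped in the patch that adds the dequeue path.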
> > +Hardware Credits > +~~~~~~~~~~~~~~~~ > + > +DLB2 uses a hardware credit scheme to prevent software from overflowing hardware > +event storage, with each unit of storage represented by a credit. A port spends > +a credit to enqueue an event, and hardware refills the ports with credits as the > +events are scheduled to ports. Refills come from credit pools, and each port is > +a member of a load-balanced credit pool and a directed credit pool. The > +load-balanced credits are used to enqueue to load-balanced queues, and directed > +credits are used for directed queues. > + > +A DLB2 eventdev contains one load-balanced and one directed credit pool. These > +pools' sizes are controlled by the nb_events_limit field in struct > +rte_event_dev_config. The load-balanced pool is sized to contain > +nb_events_limit credits, and the directed pool is sized to contain > +nb_events_limit/4 credits. The directed pool size can be overridden with the > +num_dir_credits vdev argument, like so: > + > + .. code-block:: console > + > + --vdev=dlb2_event,num_dir_credits=<value> > + > +This can be used if the default allocation is too low or too high for the > +specific application needs. The PMD also supports a vdev arg that limits the > +max_num_events reported by rte_event_dev_info_get(): > + > + .. code-block:: console > + > + --vdev=dlb2_event,max_num_events=<value> > + > +By default, max_num_events is reported as the total available load-balanced > +credits. If multiple DLB2-based applications are being used, it may be desirable > +to control how many load-balanced credits each application uses, particularly > +when the applications are written to configure nb_events_limit equal to the > +reported max_num_events. > + > +Each port is a member of both credit pools. A port's credit allocation is > +defined by its low watermark, high watermark, and refill quanta. These three > +parameters are calculated by the DLB2 PMD like so: > + > +- The load-balanced high watermark is set to the port's enqueue_depth. > + The directed high watermark is set to the minimum of the enqueue_depth and > + the directed pool size divided by the total number of ports. > +- The refill quanta is set to half the high watermark. > +- The low watermark is set to the minimum of 16 and the refill quanta. > + > +When the eventdev is started, each port is pre-allocated a high watermark's > +worth of credits. For example, if an eventdev contains four ports with enqueue > +depths of 32 and a load-balanced credit pool size of 4096, each port will start > +with 32 load-balanced credits, and there will be 3968 credits available to > +replenish the ports. Thus, a single port is not capable of enqueueing up to the > +nb_events_limit (without any events being dequeued), since the other ports are > +retaining their initial credit allocation; in short, all ports must enqueue in > +order to reach the limit. > + > +If a port attempts to enqueue and has no credits available, the enqueue > +operation will fail and the application must retry the enqueue. Credits are > +replenished asynchronously by the DLB2 hardware. > + > +Software Credits > +~~~~~~~~~~~~~~~~ > + > +The DLB2 is a "closed system" event dev, and the DLB2 PMD layers a software > +credit scheme on top of the hardware credit scheme in order to comply with > +the per-port backpressure described in the eventdev API.
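As an editorial aside on the hardware credit sizing rules above: the pool and watermark arithmetic works out to roughly the following. This is an illustrative sketch only, not PMD code; port_credit_cfg and dir_credit_cfg are made-up names for the example.

    #include <stdint.h>
    #include <rte_common.h>   /* RTE_MIN */

    struct port_credit_cfg {
            uint32_t hwm;     /* high watermark */
            uint32_t quanta;  /* refill quanta */
            uint32_t lwm;     /* low watermark */
    };

    /* Directed-credit parameters for one port, per the documented rules. */
    static struct port_credit_cfg
    dir_credit_cfg(uint32_t enqueue_depth, uint32_t nb_events_limit,
                   uint32_t num_ports)
    {
            struct port_credit_cfg cfg;
            uint32_t dir_pool = nb_events_limit / 4; /* default directed pool size */

            cfg.hwm = RTE_MIN(enqueue_depth, dir_pool / num_ports);
            cfg.quanta = cfg.hwm / 2;
            cfg.lwm = RTE_MIN(16u, cfg.quanta);
            return cfg;
    }

For a load-balanced port the high watermark is simply the enqueue_depth, with the same refill-quanta and low-watermark rules applied.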
> + > +The DLB2's hardware scheme is local to a queue/pipeline stage: a port spends a > +credit when it enqueues to a queue, and credits are later replenished after the > +events are dequeued and released. > + > +In the software credit scheme, a credit is consumed when a new (.op = > +RTE_EVENT_OP_NEW) event is injected into the system, and the credit is > +replenished when the event is released from the system (either explicitly with > +RTE_EVENT_OP_RELEASE or implicitly in dequeue_burst()). > + > +In this model, an event is "in the system" from its first enqueue into eventdev > +until it is last dequeued. If the event goes through multiple event queues, it > +is still considered "in the system" while a worker thread is processing it. > + > +A port will fail to enqueue if the number of events in the system exceeds its > +``new_event_threshold`` (specified at port setup time). A port will also fail > +to enqueue if it lacks enough hardware credits to enqueue; load-balanced > +credits are used to enqueue to a load-balanced queue, and directed credits are > +used to enqueue to a directed queue. > + > +The out-of-credit situations are typically transient, and an eventdev > +application using the DLB2 ought to retry its enqueues if they fail. > +If enqueue fails, the DLB2 PMD sets rte_errno as follows: > + > +- -ENOSPC: Credit exhaustion (either hardware or software) > +- -EINVAL: Invalid argument, such as port ID, queue ID, or sched_type. > + > +Depending on the pipeline the application has constructed, it's possible to > +enter a credit deadlock scenario wherein the worker thread lacks the credit > +to enqueue an event, and it must dequeue an event before it can recover the > +credit. If the worker thread retries its enqueue indefinitely, it will not > +make forward progress. Such deadlock is possible if the application has event > +"loops", in which an event is dequeued from queue A and later enqueued back to > +queue A. > + > +Due to this, workers should stop retrying after a time, release the events they > +are attempting to enqueue, and dequeue more events. It is important that the > +worker release the events and not simply set them aside to retry the enqueue > +again later, because the port has a limited history list size (by default, twice > +the port's dequeue_depth). > + > +Priority > +~~~~~~~~ > + > +The DLB2 supports event priority and per-port queue service priority, as > +described in the eventdev header file. The DLB2 does not support 'global' event > +queue priority established at queue creation time. > + > +DLB2 supports 8 event and queue service priority levels. For both priority > +types, the PMD uses the upper three bits of the priority field to determine the > +DLB2 priority, discarding the five least significant bits. The five least significant > +event priority bits are not preserved when an event is enqueued.
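To make the credit-deadlock guidance above concrete: a worker's enqueue path can bound its retries and fall back to releasing the event. The sketch below is written against the generic eventdev API only; MAX_ENQ_RETRIES, dev_id and port_id are placeholders, and a real application would choose its own retry policy.

    #include <rte_eventdev.h>
    #include <rte_errno.h>

    #define MAX_ENQ_RETRIES 100

    static void
    worker_enqueue(uint8_t dev_id, uint8_t port_id, struct rte_event *ev)
    {
            int retries = 0;

            while (rte_event_enqueue_burst(dev_id, port_id, ev, 1) == 0) {
                    if (rte_errno == -ENOSPC && ++retries < MAX_ENQ_RETRIES)
                            continue; /* transient credit exhaustion: retry */

                    /* Out of retries (or a non-credit error): release the
                     * event rather than parking it, so the port's history
                     * list entry is returned and the pipeline keeps moving.
                     */
                    ev->op = RTE_EVENT_OP_RELEASE;
                    rte_event_enqueue_burst(dev_id, port_id, ev, 1);
                    break;
            }
    }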
> + > Reconfiguration > ~~~~~~~~~~~~~~~ > > diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c > index 051836d..7d2cf76 100644 > --- a/drivers/event/dlb2/dlb2.c > +++ b/drivers/event/dlb2/dlb2.c > @@ -2089,6 +2089,578 @@ dlb2_eventdev_start(struct rte_eventdev *dev) > return 0; > } > > +static uint8_t cmd_byte_map[DLB2_NUM_PORT_TYPES][DLB2_NUM_HW_SCHED_TYPES] = { > + { > + /* Load-balanced cmd bytes */ > + [RTE_EVENT_OP_NEW] = DLB2_NEW_CMD_BYTE, > + [RTE_EVENT_OP_FORWARD] = DLB2_FWD_CMD_BYTE, > + [RTE_EVENT_OP_RELEASE] = DLB2_COMP_CMD_BYTE, > + }, > + { > + /* Directed cmd bytes */ > + [RTE_EVENT_OP_NEW] = DLB2_NEW_CMD_BYTE, > + [RTE_EVENT_OP_FORWARD] = DLB2_NEW_CMD_BYTE, > + [RTE_EVENT_OP_RELEASE] = DLB2_NOOP_CMD_BYTE, > + }, > +}; > + > +static inline uint32_t > +dlb2_port_credits_get(struct dlb2_port *qm_port, > + enum dlb2_hw_queue_types type) > +{ > + uint32_t credits = *qm_port->credit_pool[type]; > + uint32_t batch_size = DLB2_SW_CREDIT_BATCH_SZ; > + > + if (unlikely(credits < batch_size)) > + batch_size = credits; > + > + if (likely(credits && > + __atomic_compare_exchange_n( > + qm_port->credit_pool[type], > + &credits, credits - batch_size, false, > + __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) > + return batch_size; > + else > + return 0; > +} > + > +static inline void > +dlb2_replenish_sw_credits(struct dlb2_eventdev *dlb2, > + struct dlb2_eventdev_port *ev_port) > +{ > + uint16_t quanta = ev_port->credit_update_quanta; > + > + if (ev_port->inflight_credits >= quanta * 2) { > + /* Replenish credits, saving one quanta for enqueues */ > + uint16_t val = ev_port->inflight_credits - quanta; > + > + __atomic_fetch_sub(&dlb2->inflights, val, __ATOMIC_SEQ_CST); > + ev_port->inflight_credits -= val; > + } > +} > + > +static inline int > +dlb2_check_enqueue_sw_credits(struct dlb2_eventdev *dlb2, > + struct dlb2_eventdev_port *ev_port) > +{ > + uint32_t sw_inflights = __atomic_load_n(&dlb2->inflights, > + __ATOMIC_SEQ_CST); > + const int num = 1; > + > + if (unlikely(ev_port->inflight_max < sw_inflights)) { > + DLB2_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1); > + rte_errno = -ENOSPC; > + return 1; > + } > + > + if (ev_port->inflight_credits < num) { > + /* check if event enqueue brings ev_port over max threshold */ > + uint32_t credit_update_quanta = ev_port->credit_update_quanta; > + > + if (sw_inflights + credit_update_quanta > > + dlb2->new_event_limit) { > + DLB2_INC_STAT( > + ev_port->stats.traffic.tx_nospc_new_event_limit, > + 1); > + rte_errno = -ENOSPC; > + return 1; > + } > + > + __atomic_fetch_add(&dlb2->inflights, credit_update_quanta, > + __ATOMIC_SEQ_CST); > + ev_port->inflight_credits += (credit_update_quanta); > + > + if (ev_port->inflight_credits < num) { > + DLB2_INC_STAT( > + ev_port->stats.traffic.tx_nospc_inflight_credits, > + 1); > + rte_errno = -ENOSPC; > + return 1; > + } > + } > + > + return 0; > +} > + > +static inline int > +dlb2_check_enqueue_hw_ldb_credits(struct dlb2_port *qm_port) > +{ > + if (unlikely(qm_port->cached_ldb_credits == 0)) { > + qm_port->cached_ldb_credits = > + dlb2_port_credits_get(qm_port, > + DLB2_LDB_QUEUE); > + if (unlikely(qm_port->cached_ldb_credits == 0)) { > + DLB2_INC_STAT( > + qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits, > + 1); > + DLB2_LOG_DBG("ldb credits exhausted\n"); > + return 1; /* credits exhausted */ > + } > + } > + > + return 0; > +} > + > +static inline int > +dlb2_check_enqueue_hw_dir_credits(struct dlb2_port *qm_port) > +{ > + if (unlikely(qm_port->cached_dir_credits == 0)) { > + 
qm_port->cached_dir_credits = > + dlb2_port_credits_get(qm_port, > + DLB2_DIR_QUEUE); > + if (unlikely(qm_port->cached_dir_credits == 0)) { > + DLB2_INC_STAT( > + qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits, > + 1); > + DLB2_LOG_DBG("dir credits exhausted\n"); > + return 1; /* credits exhausted */ > + } > + } > + > + return 0; > +} > + > +static __rte_always_inline void > +dlb2_pp_write(struct dlb2_enqueue_qe *qe4, > + struct process_local_port_data *port_data) > +{ > + dlb2_movdir64b(port_data->pp_addr, qe4); > +} > + > +static inline int > +dlb2_consume_qe_immediate(struct dlb2_port *qm_port, int num) > +{ > + struct process_local_port_data *port_data; > + struct dlb2_cq_pop_qe *qe; > + > + RTE_ASSERT(qm_port->config_state == DLB2_CONFIGURED); > + > + qe = qm_port->consume_qe; > + > + qe->tokens = num - 1; > + > + /* No store fence needed since no pointer is being sent, and CQ token > + * pops can be safely reordered with other HCWs. > + */ > + port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)]; > + > + dlb2_movntdq_single(port_data->pp_addr, qe); > + > + DLB2_LOG_DBG("dlb2: consume immediate - %d QEs\n", num); > + > + qm_port->owed_tokens = 0; > + > + return 0; > +} > + > +static inline void > +dlb2_hw_do_enqueue(struct dlb2_port *qm_port, > + bool do_sfence, > + struct process_local_port_data *port_data) > +{ > + /* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that > + * application writes complete before enqueueing the QE. > + */ > + if (do_sfence) > + rte_wmb(); > + > + dlb2_pp_write(qm_port->qe4, port_data); > +} > + > +static inline void > +dlb2_construct_token_pop_qe(struct dlb2_port *qm_port, int idx) > +{ > + struct dlb2_cq_pop_qe *qe = (void *)qm_port->qe4; > + int num = qm_port->owed_tokens; > + > + qe[idx].cmd_byte = DLB2_POP_CMD_BYTE; > + qe[idx].tokens = num - 1; > + > + qm_port->owed_tokens = 0; > +} > + > +static inline void > +dlb2_event_build_hcws(struct dlb2_port *qm_port, > + const struct rte_event ev[], > + int num, > + uint8_t *sched_type, > + uint8_t *queue_id) > +{ > + struct dlb2_enqueue_qe *qe; > + uint16_t sched_word[4]; > + __m128i sse_qe[2]; > + int i; > + > + qe = qm_port->qe4; > + > + sse_qe[0] = _mm_setzero_si128(); > + sse_qe[1] = _mm_setzero_si128(); > + > + switch (num) { > + case 4: > + /* Construct the metadata portion of two HCWs in one 128b SSE > + * register. 
HCW metadata is constructed in the SSE registers > + * like so: > + * sse_qe[0][63:0]: qe[0]'s metadata > + * sse_qe[0][127:64]: qe[1]'s metadata > + * sse_qe[1][63:0]: qe[2]'s metadata > + * sse_qe[1][127:64]: qe[3]'s metadata > + */ > + > + /* Convert the event operation into a command byte and store it > + * in the metadata: > + * sse_qe[0][63:56] = cmd_byte_map[is_directed][ev[0].op] > + * sse_qe[0][127:120] = cmd_byte_map[is_directed][ev[1].op] > + * sse_qe[1][63:56] = cmd_byte_map[is_directed][ev[2].op] > + * sse_qe[1][127:120] = cmd_byte_map[is_directed][ev[3].op] > + */ > +#define DLB2_QE_CMD_BYTE 7 > + sse_qe[0] = _mm_insert_epi8(sse_qe[0], > + cmd_byte_map[qm_port->is_directed][ev[0].op], > + DLB2_QE_CMD_BYTE); > + sse_qe[0] = _mm_insert_epi8(sse_qe[0], > + cmd_byte_map[qm_port->is_directed][ev[1].op], > + DLB2_QE_CMD_BYTE + 8); > + sse_qe[1] = _mm_insert_epi8(sse_qe[1], > + cmd_byte_map[qm_port->is_directed][ev[2].op], > + DLB2_QE_CMD_BYTE); > + sse_qe[1] = _mm_insert_epi8(sse_qe[1], > + cmd_byte_map[qm_port->is_directed][ev[3].op], > + DLB2_QE_CMD_BYTE + 8); > + > + /* Store priority, scheduling type, and queue ID in the sched > + * word array because these values are re-used when the > + * destination is a directed queue. > + */ > + sched_word[0] = EV_TO_DLB2_PRIO(ev[0].priority) << 10 | > + sched_type[0] << 8 | > + queue_id[0]; > + sched_word[1] = EV_TO_DLB2_PRIO(ev[1].priority) << 10 | > + sched_type[1] << 8 | > + queue_id[1]; > + sched_word[2] = EV_TO_DLB2_PRIO(ev[2].priority) << 10 | > + sched_type[2] << 8 | > + queue_id[2]; > + sched_word[3] = EV_TO_DLB2_PRIO(ev[3].priority) << 10 | > + sched_type[3] << 8 | > + queue_id[3]; > + > + /* Store the event priority, scheduling type, and queue ID in > + * the metadata: > + * sse_qe[0][31:16] = sched_word[0] > + * sse_qe[0][95:80] = sched_word[1] > + * sse_qe[1][31:16] = sched_word[2] > + * sse_qe[1][95:80] = sched_word[3] > + */ > +#define DLB2_QE_QID_SCHED_WORD 1 > + sse_qe[0] = _mm_insert_epi16(sse_qe[0], > + sched_word[0], > + DLB2_QE_QID_SCHED_WORD); > + sse_qe[0] = _mm_insert_epi16(sse_qe[0], > + sched_word[1], > + DLB2_QE_QID_SCHED_WORD + 4); > + sse_qe[1] = _mm_insert_epi16(sse_qe[1], > + sched_word[2], > + DLB2_QE_QID_SCHED_WORD); > + sse_qe[1] = _mm_insert_epi16(sse_qe[1], > + sched_word[3], > + DLB2_QE_QID_SCHED_WORD + 4); > + > + /* If the destination is a load-balanced queue, store the lock > + * ID. If it is a directed queue, DLB places this field in > + * bytes 10-11 of the received QE, so we format it accordingly: > + * sse_qe[0][47:32] = dir queue ? sched_word[0] : flow_id[0] > + * sse_qe[0][111:96] = dir queue ? sched_word[1] : flow_id[1] > + * sse_qe[1][47:32] = dir queue ? sched_word[2] : flow_id[2] > + * sse_qe[1][111:96] = dir queue ? sched_word[3] : flow_id[3] > + */ > +#define DLB2_QE_LOCK_ID_WORD 2 > + sse_qe[0] = _mm_insert_epi16(sse_qe[0], > + (sched_type[0] == DLB2_SCHED_DIRECTED) ? > + sched_word[0] : ev[0].flow_id, > + DLB2_QE_LOCK_ID_WORD); > + sse_qe[0] = _mm_insert_epi16(sse_qe[0], > + (sched_type[1] == DLB2_SCHED_DIRECTED) ? > + sched_word[1] : ev[1].flow_id, > + DLB2_QE_LOCK_ID_WORD + 4); > + sse_qe[1] = _mm_insert_epi16(sse_qe[1], > + (sched_type[2] == DLB2_SCHED_DIRECTED) ? > + sched_word[2] : ev[2].flow_id, > + DLB2_QE_LOCK_ID_WORD); > + sse_qe[1] = _mm_insert_epi16(sse_qe[1], > + (sched_type[3] == DLB2_SCHED_DIRECTED) ? 
> + sched_word[3] : ev[3].flow_id, > + DLB2_QE_LOCK_ID_WORD + 4); > + > + /* Store the event type and sub event type in the metadata: > + * sse_qe[0][15:0] = flow_id[0] > + * sse_qe[0][79:64] = flow_id[1] > + * sse_qe[1][15:0] = flow_id[2] > + * sse_qe[1][79:64] = flow_id[3] > + */ > +#define DLB2_QE_EV_TYPE_WORD 0 > + sse_qe[0] = _mm_insert_epi16(sse_qe[0], > + ev[0].sub_event_type << 8 | > + ev[0].event_type, > + DLB2_QE_EV_TYPE_WORD); > + sse_qe[0] = _mm_insert_epi16(sse_qe[0], > + ev[1].sub_event_type << 8 | > + ev[1].event_type, > + DLB2_QE_EV_TYPE_WORD + 4); > + sse_qe[1] = _mm_insert_epi16(sse_qe[1], > + ev[2].sub_event_type << 8 | > + ev[2].event_type, > + DLB2_QE_EV_TYPE_WORD); > + sse_qe[1] = _mm_insert_epi16(sse_qe[1], > + ev[3].sub_event_type << 8 | > + ev[3].event_type, > + DLB2_QE_EV_TYPE_WORD + 4); > + > + /* Store the metadata to memory (use the double-precision > + * _mm_storeh_pd because there is no integer function for > + * storing the upper 64b): > + * qe[0] metadata = sse_qe[0][63:0] > + * qe[1] metadata = sse_qe[0][127:64] > + * qe[2] metadata = sse_qe[1][63:0] > + * qe[3] metadata = sse_qe[1][127:64] > + */ > + _mm_storel_epi64((__m128i *)&qe[0].u.opaque_data, sse_qe[0]); > + _mm_storeh_pd((double *)&qe[1].u.opaque_data, > + (__m128d)sse_qe[0]); > + _mm_storel_epi64((__m128i *)&qe[2].u.opaque_data, sse_qe[1]); > + _mm_storeh_pd((double *)&qe[3].u.opaque_data, > + (__m128d)sse_qe[1]); > + > + qe[0].data = ev[0].u64; > + qe[1].data = ev[1].u64; > + qe[2].data = ev[2].u64; > + qe[3].data = ev[3].u64; > + > + break; > + case 3: > + case 2: > + case 1: > + /* At least one QE will be valid, so only zero out three */ > + qe[1].cmd_byte = 0; > + qe[2].cmd_byte = 0; > + qe[3].cmd_byte = 0; > + > + for (i = 0; i < num; i++) { > + qe[i].cmd_byte = > + cmd_byte_map[qm_port->is_directed][ev[i].op]; > + qe[i].sched_type = sched_type[i]; > + qe[i].data = ev[i].u64; > + qe[i].qid = queue_id[i]; > + qe[i].priority = EV_TO_DLB2_PRIO(ev[i].priority); > + qe[i].lock_id = ev[i].flow_id; > + if (sched_type[i] == DLB2_SCHED_DIRECTED) { > + struct dlb2_msg_info *info = > + (struct dlb2_msg_info *)&qe[i].lock_id; > + > + info->qid = queue_id[i]; > + info->sched_type = DLB2_SCHED_DIRECTED; > + info->priority = qe[i].priority; > + } > + qe[i].u.event_type.major = ev[i].event_type; > + qe[i].u.event_type.sub = ev[i].sub_event_type; > + } > + break; > + } > +} > + > +static inline int > +dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port, > + struct dlb2_port *qm_port, > + const struct rte_event ev[], > + uint8_t *sched_type, > + uint8_t *queue_id) > +{ > + struct dlb2_eventdev *dlb2 = ev_port->dlb2; > + struct dlb2_eventdev_queue *ev_queue; > + uint16_t *cached_credits = NULL; > + struct dlb2_queue *qm_queue; > + > + ev_queue = &dlb2->ev_queues[ev->queue_id]; > + qm_queue = &ev_queue->qm_queue; > + *queue_id = qm_queue->id; > + > + /* Ignore sched_type and hardware credits on release events */ > + if (ev->op == RTE_EVENT_OP_RELEASE) > + goto op_check; > + > + if (!qm_queue->is_directed) { > + /* Load balanced destination queue */ > + > + if (dlb2_check_enqueue_hw_ldb_credits(qm_port)) { > + rte_errno = -ENOSPC; > + return 1; > + } > + cached_credits = &qm_port->cached_ldb_credits; > + > + switch (ev->sched_type) { > + case RTE_SCHED_TYPE_ORDERED: > + DLB2_LOG_DBG("dlb2: put_qe: RTE_SCHED_TYPE_ORDERED\n"); > + if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) { > + DLB2_LOG_ERR("dlb2: tried to send ordered event to unordered queue %d\n", > + *queue_id); > + rte_errno = -EINVAL; > + 
return 1; > + } > + *sched_type = DLB2_SCHED_ORDERED; > + break; > + case RTE_SCHED_TYPE_ATOMIC: > + DLB2_LOG_DBG("dlb2: put_qe: RTE_SCHED_TYPE_ATOMIC\n"); > + *sched_type = DLB2_SCHED_ATOMIC; > + break; > + case RTE_SCHED_TYPE_PARALLEL: > + DLB2_LOG_DBG("dlb2: put_qe: RTE_SCHED_TYPE_PARALLEL\n"); > + if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED) > + *sched_type = DLB2_SCHED_ORDERED; > + else > + *sched_type = DLB2_SCHED_UNORDERED; > + break; > + default: > + DLB2_LOG_ERR("Unsupported LDB sched type in put_qe\n"); > + DLB2_INC_STAT(ev_port->stats.tx_invalid, 1); > + rte_errno = -EINVAL; > + return 1; > + } > + } else { > + /* Directed destination queue */ > + > + if (dlb2_check_enqueue_hw_dir_credits(qm_port)) { > + rte_errno = -ENOSPC; > + return 1; > + } > + cached_credits = &qm_port->cached_dir_credits; > + > + DLB2_LOG_DBG("dlb2: put_qe: RTE_SCHED_TYPE_DIRECTED\n"); > + > + *sched_type = DLB2_SCHED_DIRECTED; > + } > + > +op_check: > + switch (ev->op) { > + case RTE_EVENT_OP_NEW: > + /* Check that a sw credit is available */ > + if (dlb2_check_enqueue_sw_credits(dlb2, ev_port)) { > + rte_errno = -ENOSPC; > + return 1; > + } > + ev_port->inflight_credits--; > + (*cached_credits)--; > + break; > + case RTE_EVENT_OP_FORWARD: > + /* Check for outstanding_releases underflow. If this occurs, > + * the application is not using the EVENT_OPs correctly; for > + * example, forwarding or releasing events that were not > + * dequeued. > + */ > + RTE_ASSERT(ev_port->outstanding_releases > 0); > + ev_port->outstanding_releases--; > + qm_port->issued_releases++; > + (*cached_credits)--; > + break; > + case RTE_EVENT_OP_RELEASE: > + ev_port->inflight_credits++; > + /* Check for outstanding_releases underflow. If this occurs, > + * the application is not using the EVENT_OPs correctly; for > + * example, forwarding or releasing events that were not > + * dequeued. 
> + */ > + RTE_ASSERT(ev_port->outstanding_releases > 0); > + ev_port->outstanding_releases--; > + qm_port->issued_releases++; > + > + /* Replenish s/w credits if enough are cached */ > + dlb2_replenish_sw_credits(dlb2, ev_port); > + break; > + } > + > + DLB2_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1); > + DLB2_INC_STAT(ev_port->stats.traffic.tx_ok, 1); > + > +#ifndef RTE_LIBRTE_PMD_DLB2_QUELL_STATS > + if (ev->op != RTE_EVENT_OP_RELEASE) { > + DLB2_INC_STAT(ev_port->stats.queue[ev->queue_id].enq_ok, 1); > + DLB2_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1); > + } > +#endif > + > + return 0; > +} > + > +static inline uint16_t > +dlb2_event_enqueue_burst(void *event_port, > + const struct rte_event events[], > + uint16_t num) > +{ > + struct dlb2_eventdev_port *ev_port = event_port; > + struct dlb2_port *qm_port = &ev_port->qm_port; > + struct process_local_port_data *port_data; > + int i, cnt; > + > + RTE_ASSERT(ev_port->enq_configured); > + RTE_ASSERT(events != NULL); > + > + cnt = 0; > + > + port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)]; > + > + for (i = 0; i < num; i += DLB2_NUM_QES_PER_CACHE_LINE) { > + uint8_t sched_types[DLB2_NUM_QES_PER_CACHE_LINE]; > + uint8_t queue_ids[DLB2_NUM_QES_PER_CACHE_LINE]; > + int j = 0; > + > + for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) { > + const struct rte_event *ev = &events[i + j]; > + > + if (dlb2_event_enqueue_prep(ev_port, qm_port, ev, > + &sched_types[j], > + &queue_ids[j])) > + break; > + } > + > + if (j == 0) > + break; > + > + dlb2_event_build_hcws(qm_port, &events[i], j, > + sched_types, queue_ids); > + > + dlb2_hw_do_enqueue(qm_port, i == 0, port_data); > + > + cnt += j; > + > + if (j < DLB2_NUM_QES_PER_CACHE_LINE) > + break; > + } > + > + return cnt; > +} > + > +static inline uint16_t > +dlb2_event_enqueue(void *event_port, > + const struct rte_event events[]) > +{ > + return dlb2_event_enqueue_burst(event_port, events, 1); > +} > + > +static uint16_t > +dlb2_event_enqueue_new_burst(void *event_port, > + const struct rte_event events[], > + uint16_t num) > +{ > + return dlb2_event_enqueue_burst(event_port, events, num); > +} > + > +static uint16_t > +dlb2_event_enqueue_forward_burst(void *event_port, > + const struct rte_event events[], > + uint16_t num) > +{ > + return dlb2_event_enqueue_burst(event_port, events, num); > +} > + > static void > dlb2_entry_points_init(struct rte_eventdev *dev) > { > @@ -2112,7 +2684,13 @@ dlb2_entry_points_init(struct rte_eventdev *dev) > .xstats_reset = dlb2_eventdev_xstats_reset, > }; > > + /* Expose PMD's eventdev interface */ > + > dev->dev_ops = &dlb2_eventdev_entry_ops; > + dev->enqueue = dlb2_event_enqueue; > + dev->enqueue_burst = dlb2_event_enqueue_burst; > + dev->enqueue_new_burst = dlb2_event_enqueue_new_burst; > + dev->enqueue_forward_burst = dlb2_event_enqueue_forward_burst; > } > > int > -- > 2.6.4 >
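One caller-side note on the burst enqueue path added above: rte_event_enqueue_burst() can accept only part of a burst when credits run out partway through, so the return count has to be honored and the tail resubmitted. A minimal sketch against the generic eventdev API (dev_id, port_id, events[] and nb are placeholders; a real worker would also bound its retries as described in the Software Credits section of the doc):

    #include <rte_eventdev.h>
    #include <rte_errno.h>
    #include <rte_pause.h>

    static uint16_t
    enqueue_all(uint8_t dev_id, uint8_t port_id,
                const struct rte_event events[], uint16_t nb)
    {
            uint16_t sent = 0;

            while (sent < nb) {
                    uint16_t n = rte_event_enqueue_burst(dev_id, port_id,
                                                         &events[sent],
                                                         nb - sent);

                    sent += n;
                    if (n == 0) {
                            if (rte_errno == -ENOSPC)
                                    rte_pause(); /* credits exhausted: back off */
                            else
                                    break;       /* e.g. -EINVAL: do not spin */
                    }
            }

            return sent;
    }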