From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id E9EF9A0C56; Wed, 8 Sep 2021 12:31:29 +0200 (CEST) Received: from [217.70.189.124] (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 25A12411D6; Wed, 8 Sep 2021 12:30:44 +0200 (CEST) Received: from mga06.intel.com (mga06.intel.com [134.134.136.31]) by mails.dpdk.org (Postfix) with ESMTP id 182B5411D6 for ; Wed, 8 Sep 2021 12:30:41 +0200 (CEST) X-IronPort-AV: E=McAfee;i="6200,9189,10100"; a="281461969" X-IronPort-AV: E=Sophos;i="5.85,277,1624345200"; d="scan'208";a="281461969" Received: from orsmga001.jf.intel.com ([10.7.209.18]) by orsmga104.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 08 Sep 2021 03:30:41 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.85,277,1624345200"; d="scan'208";a="513213905" Received: from silpixa00401122.ir.intel.com ([10.55.128.10]) by orsmga001.jf.intel.com with ESMTP; 08 Sep 2021 03:30:39 -0700 From: Kevin Laatz To: dev@dpdk.org Cc: bruce.richardson@intel.com, fengchengwen@huawei.com, jerinj@marvell.com, conor.walsh@intel.com, Kevin Laatz Date: Wed, 8 Sep 2021 10:30:10 +0000 Message-Id: <20210908103016.1661914-12-kevin.laatz@intel.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20210908103016.1661914-1-kevin.laatz@intel.com> References: <20210903105001.1179328-1-kevin.laatz@intel.com> <20210908103016.1661914-1-kevin.laatz@intel.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [dpdk-dev] [PATCH v3 11/17] dma/idxd: add data-path job completion functions X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Add the data path functions for gathering completed operations. 
Signed-off-by: Bruce Richardson
Signed-off-by: Kevin Laatz
---
v2:
   - fixed typo in docs
   - add completion status for invalid opcode
---
 doc/guides/dmadevs/idxd.rst      |  25 ++++
 drivers/dma/idxd/idxd_common.c   | 266 +++++++++++++++++++++++++++++++
 drivers/dma/idxd/idxd_internal.h |   5 +
 3 files changed, 296 insertions(+)

diff --git a/doc/guides/dmadevs/idxd.rst b/doc/guides/dmadevs/idxd.rst
index 0c4c105e0f..b0b5632b48 100644
--- a/doc/guides/dmadevs/idxd.rst
+++ b/doc/guides/dmadevs/idxd.rst
@@ -209,6 +209,31 @@ device and start the hardware processing of them:
    }
    rte_dmadev_submit(dev_id, vchan);
 
+To retrieve information about completed copies, ``rte_dmadev_completed()`` and
+``rte_dmadev_completed_status()`` APIs should be used. ``rte_dmadev_completed()``
+will return the number of completed operations, along with the index of the last
+successfully completed operation and whether or not an error was encountered. If an
+error was encountered, ``rte_dmadev_completed_status()`` must be used to kick the
+device off to continue processing operations and also to gather the status of each
+individual operation, which is filled into the ``status`` array provided as a
+parameter by the application.
+
+The following code shows how to retrieve the number of successfully completed
+copies within a burst and then use ``rte_dmadev_completed_status()`` to check
+which operation failed and kick off the device to continue processing operations:
+
+.. code-block:: C
+
+   enum rte_dma_status_code status[COMP_BURST_SZ];
+   uint16_t count, idx, status_count;
+   bool error = false;
+
+   count = rte_dmadev_completed(dev_id, vchan, COMP_BURST_SZ, &idx, &error);
+
+   if (error) {
+      status_count = rte_dmadev_completed_status(dev_id, vchan, COMP_BURST_SZ, &idx, status);
+   }
+
 Filling an Area of Memory
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/drivers/dma/idxd/idxd_common.c b/drivers/dma/idxd/idxd_common.c
index 69851defba..8eb73fdcc6 100644
--- a/drivers/dma/idxd/idxd_common.c
+++ b/drivers/dma/idxd/idxd_common.c
@@ -143,6 +143,270 @@ idxd_submit(struct rte_dmadev *dev, uint16_t qid __rte_unused)
 	return 0;
 }
 
+/* Translate an idxd completion record status into an rte_dma_status_code. */
+static enum rte_dma_status_code
+get_comp_status(struct idxd_completion *c)
+{
+	uint8_t st = c->status;
+	switch (st) {
+	/* successful descriptors are not written back normally */
+	case IDXD_COMP_STATUS_INCOMPLETE:
+	case IDXD_COMP_STATUS_SUCCESS:
+		return RTE_DMA_STATUS_SUCCESSFUL;
+	case IDXD_COMP_STATUS_INVALID_OPCODE:
+		return RTE_DMA_STATUS_INVALID_OPCODE;
+	case IDXD_COMP_STATUS_INVALID_SIZE:
+		return RTE_DMA_STATUS_INVALID_LENGTH;
+	case IDXD_COMP_STATUS_SKIPPED:
+		return RTE_DMA_STATUS_NOT_ATTEMPTED;
+	default:
+		return RTE_DMA_STATUS_ERROR_UNKNOWN;
+	}
+}
+
+/*
+ * Hand back handles of successfully completed jobs: first any left over from a
+ * previous call, otherwise those of the next batch if it completed without error.
+ * When status is non-NULL, a success code is stored for each handle returned.
+ * max_ops is 16 bits wide so that burst sizes above 255 are not truncated.
+ * Returns the number of handles handed back (0 if none are ready), or -1 if the
+ * next batch completed with an error.
+ */
+static __rte_always_inline int
+batch_ok(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
+{
+	uint16_t ret;
+	uint8_t bstatus;
+
+	if (max_ops == 0)
+		return 0;
+
+	/* first check if there are any unreturned handles from last time */
+	if (idxd->ids_avail != idxd->ids_returned) {
+		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
+		idxd->ids_returned += ret;
+		/* NOTE(review): memset assumes RTE_DMA_STATUS_SUCCESSFUL is 0 - confirm */
+		if (status)
+			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
+		return ret;
+	}
+
+	if (idxd->batch_idx_read == idxd->batch_idx_write)
+		return 0;
+
+	bstatus = idxd->batch_comp_ring[idxd->batch_idx_read].status;
+	/* now check if next batch is complete and successful */
+	if (bstatus == IDXD_COMP_STATUS_SUCCESS) {
+		/* since the batch idx ring stores the start of each batch, pre-increment to lookup
+		 * start of next batch.
+		 */
+		if (++idxd->batch_idx_read > idxd->max_batches)
+			idxd->batch_idx_read = 0;
+		idxd->ids_avail = idxd->batch_idx_ring[idxd->batch_idx_read];
+
+		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
+		idxd->ids_returned += ret;
+		if (status)
+			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
+		return ret;
+	}
+	/* check if batch is incomplete */
+	else if (bstatus == IDXD_COMP_STATUS_INCOMPLETE)
+		return 0;
+
+	return -1; /* error case */
+}
+
+/*
+ * Completion gathering for idxd_completed(): return the number of jobs completed
+ * successfully, and set *has_error when the point of failure is reached. Jobs from
+ * the failing one onwards are left for the completed_status path to collect.
+ */
+static inline uint16_t
+batch_completed(struct idxd_dmadev *idxd, uint16_t max_ops, bool *has_error)
+{
+	uint16_t i;
+	uint16_t b_start, b_end, next_batch;
+
+	int ret = batch_ok(idxd, max_ops, NULL);
+	if (ret >= 0)
+		return ret;
+
+	/* ERROR case, not successful, not incomplete */
+	/* Get the batch size, and special case size 1.
+	 * once we identify the actual failure job, return other jobs, then update
+	 * the batch ring indexes to make it look like the first job of the batch has failed.
+	 * Subsequent calls here will always return zero packets, and the error must be cleared by
+	 * calling the completed_status() function.
+	 */
+	next_batch = (idxd->batch_idx_read + 1);
+	if (next_batch > idxd->max_batches)
+		next_batch = 0;
+	b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
+	b_end = idxd->batch_idx_ring[next_batch];
+
+	if (b_end - b_start == 1) { /* not a batch */
+		*has_error = true;
+		return 0;
+	}
+
+	for (i = b_start; i < b_end; i++) {
+		struct idxd_completion *c = (void *)&idxd->desc_ring[i & idxd->desc_ring_mask];
+		if (c->status > IDXD_COMP_STATUS_SUCCESS) /* ignore incomplete(0) and success(1) */
+			break;
+	}
+	ret = RTE_MIN((uint16_t)(i - idxd->ids_returned), max_ops);
+	if (ret < max_ops)
+		*has_error = true; /* we got up to the point of error */
+	idxd->ids_avail = idxd->ids_returned += ret;
+
+	/* to ensure we can call twice and just return 0, set start of batch to where we finished */
+	idxd->batch_comp_ring[idxd->batch_idx_read].completed_size -= ret;
+	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
+	if (idxd->batch_idx_ring[next_batch] - idxd->batch_idx_ring[idxd->batch_idx_read] == 1) {
+		/* copy over the descriptor status to the batch ring as if no batch */
+		uint16_t d_idx = idxd->batch_idx_ring[idxd->batch_idx_read] & idxd->desc_ring_mask;
+		struct idxd_completion *desc_comp = (void *)&idxd->desc_ring[d_idx];
+		idxd->batch_comp_ring[idxd->batch_idx_read].status = desc_comp->status;
+	}
+
+	return ret;
+}
+
+/*
+ * Completion gathering for idxd_completed_status(): return up to max_ops jobs and
+ * write one status code per job into status[]. Failed and not-attempted jobs are
+ * returned as well, so the read position advances past an errored batch.
+ */
+static uint16_t
+batch_completed_status(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
+{
+	uint16_t next_batch;
+
+	int ret = batch_ok(idxd, max_ops, status);
+	if (ret >= 0)
+		return ret;
+
+	/* ERROR case, not successful, not incomplete */
+	/* Get the batch size, and special case size 1.
+	 */
+	next_batch = (idxd->batch_idx_read + 1);
+	if (next_batch > idxd->max_batches)
+		next_batch = 0;
+	const uint16_t b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
+	const uint16_t b_end = idxd->batch_idx_ring[next_batch];
+	const uint16_t b_len = b_end - b_start;
+	if (b_len == 1) {/* not a batch */
+		*status = get_comp_status(&idxd->batch_comp_ring[idxd->batch_idx_read]);
+		idxd->ids_avail++;
+		idxd->ids_returned++;
+		idxd->batch_idx_read = next_batch;
+		return 1;
+	}
+
+	/* not a single-element batch, need to process more.
+	 * Scenarios:
+	 * 1. max_ops >= batch_size - can fit everything, simple case
+	 *   - loop through completed ops and then add on any not-attempted ones
+	 * 2. max_ops < batch_size - can't fit everything, more complex case
+	 *   - loop through completed/incomplete and stop when hit max_ops
+	 *   - adjust the batch descriptor to update where we stopped, with appropriate bcount
+	 *   - if bcount is to be exactly 1, update the batch descriptor as it will be treated as
+	 *     non-batch next time.
+	 */
+	const uint16_t bcount = idxd->batch_comp_ring[idxd->batch_idx_read].completed_size;
+	for (ret = 0; ret < b_len && ret < max_ops; ret++) {
+		struct idxd_completion *c = (void *)
+				&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
+		status[ret] = (ret < bcount) ? get_comp_status(c) : RTE_DMA_STATUS_NOT_ATTEMPTED;
+	}
+	idxd->ids_avail = idxd->ids_returned += ret;
+
+	/* everything fit */
+	if (ret == b_len) {
+		idxd->batch_idx_read = next_batch;
+		return ret;
+	}
+
+	/* set up for next time, update existing batch descriptor & start idx at batch_idx_read */
+	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
+	if (ret > bcount) {
+		/* we have only incomplete ones - set batch completed size to 0 */
+		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
+		comp->completed_size = 0;
+		/* if there is only one descriptor left, job skipped so set flag appropriately */
+		if (b_len - ret == 1)
+			comp->status = IDXD_COMP_STATUS_SKIPPED;
+	} else {
+		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
+		comp->completed_size -= ret;
+		/* if there is only one descriptor left, copy status info straight to desc */
+		if (comp->completed_size == 1) {
+			struct idxd_completion *c = (void *)
+					&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
+			comp->status = c->status;
+			/* individual descs can be ok without writeback, but not batches */
+			if (comp->status == IDXD_COMP_STATUS_INCOMPLETE)
+				comp->status = IDXD_COMP_STATUS_SUCCESS;
+		} else if (bcount == b_len) {
+			/* check if we still have an error, and clear flag if not */
+			uint16_t i;
+			for (i = b_start + ret; i < b_end; i++) {
+				struct idxd_completion *c = (void *)
+						&idxd->desc_ring[i & idxd->desc_ring_mask];
+				if (c->status > IDXD_COMP_STATUS_SUCCESS)
+					break;
+			}
+			if (i == b_end) /* no errors */
+				comp->status = IDXD_COMP_STATUS_SUCCESS;
+		}
+	}
+
+	return ret;
+}
+
+/*
+ * Gather up to max_ops successfully completed jobs, stopping at the first error.
+ * *last_idx receives the index of the last job returned. *has_error is tested by
+ * the loop and is only ever set to true here, so the caller must initialise it.
+ */
+uint16_t
+idxd_completed(struct rte_dmadev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
+		uint16_t *last_idx, bool *has_error)
+{
+	struct idxd_dmadev *idxd = dev->dev_private;
+	uint16_t batch, ret = 0;
+
+	do {
+		batch = batch_completed(idxd, max_ops - ret, has_error);
+		ret += batch;
+	} while (batch > 0 && *has_error == false);
+
+	*last_idx = idxd->ids_returned - 1;
+	return ret;
+}
+
+/*
+ * Gather up to max_ops completed jobs of any outcome, storing one status code per
+ * job in status[]. *last_idx receives the index of the last job returned.
+ */
+uint16_t
+idxd_completed_status(struct rte_dmadev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
+		uint16_t *last_idx, enum rte_dma_status_code *status)
+{
+	struct idxd_dmadev *idxd = dev->dev_private;
+
+	uint16_t batch, ret = 0;
+
+	do {
+		batch = batch_completed_status(idxd, max_ops - ret, &status[ret]);
+		ret += batch;
+	} while (batch > 0);
+
+	*last_idx = idxd->ids_returned - 1;
+	return ret;
+}
+
 int
 idxd_dump(const struct rte_dmadev *dev, FILE *f)
 {
@@ -277,6 +541,8 @@ idxd_dmadev_create(const char *name, struct rte_device *dev,
 	dmadev->copy = idxd_enqueue_copy;
 	dmadev->fill = idxd_enqueue_fill;
 	dmadev->submit = idxd_submit;
+	dmadev->completed = idxd_completed;
+	dmadev->completed_status = idxd_completed_status;
 
 	idxd = rte_malloc_socket(NULL, sizeof(struct idxd_dmadev), 0, dev->numa_node);
 	if (idxd == NULL) {
diff --git a/drivers/dma/idxd/idxd_internal.h b/drivers/dma/idxd/idxd_internal.h
index 7017d252b4..84d45a09d6 100644
--- a/drivers/dma/idxd/idxd_internal.h
+++ b/drivers/dma/idxd/idxd_internal.h
@@ -93,5 +93,10 @@ int idxd_enqueue_copy(struct rte_dmadev *dev, uint16_t qid, rte_iova_t src,
 int idxd_enqueue_fill(struct rte_dmadev *dev, uint16_t qid, uint64_t pattern,
 		rte_iova_t dst, unsigned int length, uint64_t flags);
 int idxd_submit(struct rte_dmadev *dev, uint16_t qid);
+uint16_t idxd_completed(struct rte_dmadev *dev, uint16_t qid, uint16_t max_ops,
+		uint16_t *last_idx, bool *has_error);
+uint16_t idxd_completed_status(struct rte_dmadev *dev, uint16_t qid,
+		uint16_t max_ops, uint16_t *last_idx,
+		enum rte_dma_status_code *status);
 
 #endif /* _IDXD_INTERNAL_H_ */
-- 
2.30.2