DPDK usage discussions
 help / color / mirror / Atom feed
From: Volkan Atlı <volkan.atli@b-ulltech.com>
To: "users@dpdk.org" <users@dpdk.org>
Cc: Demir Yavaş <demir.yavas@b-ulltech.com>
Subject: [dpdk-users] rte_hash with rcu usage
Date: Sun, 30 Aug 2020 21:19:44 +0000
Message-ID: <PR2PR08MB47774FF8F7D56DEDBCABA9D6BA500@PR2PR08MB4777.eurprd08.prod.outlook.com> (raw)

Hi

Since I could not find any example code on the internet showing how to use DPDK RCU [1], I wrote a simple application to understand it (pasted below).

I'm trying to track the end of the grace period for each struct ctx, each of which includes its own RCU QS variable. I run the application as two processes, called primary (or controlplane) and secondary (or dataplane). It actually works as expected. The primary process can add, delete or list the ctx objects. The secondary process can list them, and can also process a single ctx object by taking its RCU and then sleeping for 10 seconds to simulate the datapath.

The problem is that, since the DPDK RCU APIs take the RCU QS variable itself as a parameter, I first need to look up the corresponding ctx object, both for the process command in the secondary process and for the del command in the primary process. This defeats the purpose: if both processes obtain the same ctx object from the hash lookup at the same time (a race condition), the del command may free the ctx object without waiting for the grace period. I could use locks here, but then what is the point of RCU? Should I use a single RCU QS variable for the whole hash? How can I use one for each hash object? What is the correct usage?

Thanks in advance

- Volkan

[1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html
[https://doc.dpdk.org/guides/_images/rcu_general_info.svg]<https://doc.dpdk.org/guides/prog_guide/rcu_lib.html>
6. RCU Library — Data Plane Development Kit 20.08.0 documentation - DPDK<https://doc.dpdk.org/guides/prog_guide/rcu_lib.html>
6.4. How to use this library. The application must allocate memory and initialize a QS variable. Applications can call rte_rcu_qsbr_get_memsize() to calculate the size of memory to allocate. This API takes a maximum number of reader threads, using this variable, as a parameter.
doc.dpdk.org
----------------------------------------------------------------

#include <stdio.h>
#include <rte_pause.h>
#include <rte_rcu_qsbr.h>
#include <rte_hash.h>
#include <rte_jhash.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_random.h>
#include <unistd.h>

#include <inttypes.h>
#include <stdlib.h>
#include <stdio.h>
#include <netinet/in.h>

#include <rte_hash.h>
#include <rte_atomic.h>

#define DATAPLANE_LCORE_ID 3
#define DEV_ID  0

/*
 * Terminate the process when `cond` holds.
 *
 * Prints "ERROR <file>:<line>: <message>" to stdout, where <message> is the
 * printf-style format `str` expanded with the optional trailing arguments,
 * and then calls exit(EXIT_FAILURE). Does nothing when `cond` is false.
 */
#define rcu_qsbr_exit_if(cond, str, ...) do { \
    if (!(cond)) \
        break; \
    printf("ERROR %s:%d: " str "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
    exit(EXIT_FAILURE); \
} while (0)

/* One context object; stored as the data pointer of a hash entry keyed by tid. */
struct ctx {
    /* Identifier of this context; the same value is used as the hash key. */
    uint32_t tid;
    /* Value supplied with the "add" command and printed by "list". */
    uint32_t ms_addr;

    /* QS variable belonging to this context only; set up by init_ctx(). */
    struct rte_rcu_qsbr *rcu;
};

/* Per-device state created by init_dev(). */
struct dev {
    /* Hash table mapping a uint32_t tid to its struct ctx pointer. */
    struct rte_hash *h_ctx;
};

/* Number of entries (.entries) requested for the ctx hash table. */
#define TOTAL_CTX_ENTRY (1024 * 32)

/*
 * Allocate and initialise an RCU QS variable sized for RTE_MAX_LCORE reader
 * threads.
 *
 * The memory is zeroed and cache-line aligned (rte_zmalloc). The process is
 * terminated through rcu_qsbr_exit_if() when the allocation or the
 * initialisation fails, so the returned pointer is never NULL.
 *
 * Returns the initialised QS variable; release it with uninit_rcu().
 */
static struct rte_rcu_qsbr *init_rcu(void)
{
    size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);

    struct rte_rcu_qsbr *rcu_qsbr = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
    rcu_qsbr_exit_if(rcu_qsbr == NULL, "No memory");

    /* rte_rcu_qsbr_init() returns 0 on success and non-zero on error. */
    rcu_qsbr_exit_if(rte_rcu_qsbr_init(rcu_qsbr, RTE_MAX_LCORE) != 0,
                     "RCU QS variable init failed");

    return rcu_qsbr;
}

/*
 * Release a QS variable by handing it to rte_free().
 *
 * Always reports success (0).
 */
static int uninit_rcu(struct rte_rcu_qsbr *rcu)
{
    const int status = 0;

    rte_free(rcu);
    return status;
}

/*
 * Allocate a context object together with its own RCU QS variable.
 *
 * tid:     identifier of the context; stored in a 32-bit field.
 * ms_addr: value stored verbatim in the context.
 *
 * Terminates the process through rcu_qsbr_exit_if() when either the context
 * or its QS variable cannot be set up, so the result is never NULL and
 * ctx->rcu is always valid. The caller owns the result and must free both
 * ctx->rcu and the ctx itself.
 */
static struct ctx *init_ctx(uint64_t tid, uint32_t ms_addr)
{
    struct ctx *ctx = rte_malloc(NULL, sizeof(*ctx), 0);
    rcu_qsbr_exit_if(ctx == NULL, "No memory");

    /* struct ctx keeps a 32-bit tid; the upper half of the argument is dropped. */
    ctx->tid = (uint32_t)tid;
    ctx->ms_addr = ms_addr;

    /* Never publish a ctx whose QS variable could not be created. */
    ctx->rcu = init_rcu();
    rcu_qsbr_exit_if(ctx->rcu == NULL, "RCU QS variable init failed");

    return ctx;
}

/*
 * Create the per-device state and its ctx hash table.
 *
 * hash_id: device index; the hash table is named "dev<hash_id>".
 *
 * The table is keyed by a uint32_t, hashed with rte_jhash and created with
 * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF. Terminates the process through
 * rcu_qsbr_exit_if() on any failure, so the result is never NULL.
 */
static struct dev *init_dev(int hash_id)
{
    struct dev *dev = rte_malloc(NULL, sizeof(*dev), 0);
    rcu_qsbr_exit_if(dev == NULL, "No memory");

    /* "dev" + the longest decimal int ("-2147483648") + NUL needs 15 bytes. */
    char hash_name[16];
    int len = snprintf(hash_name, sizeof(hash_name), "dev%d", hash_id);
    rcu_qsbr_exit_if(len < 0 || (size_t)len >= sizeof(hash_name), "Hash name too long");

    struct rte_hash_parameters hash_params = {
        .entries = TOTAL_CTX_ENTRY,
        .key_len = sizeof(uint32_t),
        .hash_func_init_val = 0,
        .socket_id = rte_socket_id(),
        .hash_func = rte_jhash,
        .extra_flag = RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF,
        .name = hash_name,
    };

    dev->h_ctx = rte_hash_create(&hash_params);
    rcu_qsbr_exit_if(dev->h_ctx == NULL, "Hash create failed");

    return dev;
}

static void dataplane(int dev_id)
{
    char hash_name[8];
    sprintf(hash_name, "dev%d", dev_id);

    struct rte_hash *hash = rte_hash_find_existing(hash_name);
    rcu_qsbr_exit_if(hash == NULL, "Hash not found, please check primary process");

    char *p;

    char line[100];
    do {
        printf("ulak_dp> ");
        fflush(stdout);

        p = fgets(line, sizeof(line) - 1, stdin);

        char command[8];
        uint32_t tid;
        int istatus = sscanf(line, "%7s %u", command, &tid);
        if (istatus == 1 && (strcmp(command, "list") == 0)) {
            const void *next_key;
            void *next_data;
            uint32_t iter = 0;

            if (rte_hash_count(hash) == 0) {
                printf("no entry exits\n");
                continue;
            }

            /* Iterate through the hash table */
            uint32_t line_n = 0;
            while (rte_hash_iterate(hash, &next_key, &next_data, &iter) >= 0) {
                printf("#%u tid=%u ms_addr=%u ptr(ctx)=%p\n", line_n++, *(uint32_t *)next_key, ((struct ctx *)next_data)->ms_addr, next_data);
            }
        } else if (istatus == 2 && (strcmp(command, "process") == 0)) {
            uint32_t *pdata;
            if (rte_hash_lookup_data(hash, &tid, (void **)&pdata) != -ENOENT) {
                struct rte_rcu_qsbr *temp = ((struct ctx *) pdata)->rcu;
                rte_rcu_qsbr_thread_online(temp, DATAPLANE_LCORE_ID);

                /* This API is provided to aid debugging, it will do nothing. */
                rte_rcu_qsbr_lock(temp, DATAPLANE_LCORE_ID);

                printf("processing begin (tid = %u)\n", tid);
                rte_delay_us_sleep(10000000);
                printf("processing end (tid = %u)\n", tid);

                /* This API is provided to aid debugging, it will do nothing. */
                rte_rcu_qsbr_unlock(temp, DATAPLANE_LCORE_ID);

                rte_rcu_qsbr_thread_offline(temp, DATAPLANE_LCORE_ID);

                /* Update quiescent state counter */
                rte_rcu_qsbr_quiescent(temp, DATAPLANE_LCORE_ID);
            } else {
                printf("no entry found (tid = %u)\n", tid);
            }

        }

    } while (p != NULL);
}


static void controlplane(struct dev *dev)
{
    struct rte_hash *hash = dev->h_ctx;
    char *p;

    char line[100];
    do {
        printf("ulak_cp> ");
        fflush(stdout);

        p = fgets(line, sizeof(line) - 1, stdin);

        char command[8];
        uint32_t tid, ms_addr;
        int istatus = sscanf(line, "%7s %u %u", command, &tid, &ms_addr);
        if (istatus == 1 && (strcmp(command, "list") == 0)) {
            const void *next_key;
            void *next_data;
            uint32_t iter = 0;

            if (rte_hash_count(hash) == 0) {
                printf("no entry exits\n");
                continue;
            }

            /* Iterate through the hash table */
            uint32_t line_n = 0;
            while (rte_hash_iterate(hash, &next_key, &next_data, &iter) >= 0) {
                printf("#%u tid=%u ms_addr=%u ptr(ctx)=%p\n", line_n++, *(uint32_t *)next_key, ((struct ctx *)next_data)->ms_addr, next_data);
            }

        } else if (istatus == 2 && (strcmp(command, "del") == 0)) {
            uint32_t *pdata;
            if (rte_hash_lookup_data(hash, &tid, (void **)&pdata) != -ENOENT) {
                struct ctx *ctx = (struct ctx *)pdata;

                int32_t pos = rte_hash_del_key(hash, &tid);

                printf("grace period (ctx->tid = %u)\n", ctx->tid);

                /* Start the quiescent state query process */
                uint64_t token = rte_rcu_qsbr_start(ctx->rcu);
                /* Check the quiescent state status */
                rte_rcu_qsbr_check(ctx->rcu, token, true);

                rcu_qsbr_exit_if(rte_hash_free_key_with_position(hash, pos) < 0, "Failed to free the key #%d\n", tid);

                rte_free(ctx->rcu);
                rte_free(ctx);

                printf("deleted #%d\n", tid);

            } else {
                printf("no entry found (tid = %u)\n", tid);
            }


        } else if (istatus == 3 && (strcmp(command, "add") == 0)) {
            struct ctx *ctx = init_ctx(tid, ms_addr);
            rte_rcu_qsbr_thread_register(ctx->rcu, DATAPLANE_LCORE_ID);

            uint32_t *pdata;
            if (rte_hash_lookup_data(dev->h_ctx, &tid, (void **)&pdata) == -ENOENT) {
                if (rte_hash_add_key_data(dev->h_ctx, &tid, (void *)((uintptr_t)ctx)) < 0) {
                    printf("Hash key add failed\n");
                }
            } else {
                printf("Hash key (tid = %u) already exist\n", tid);
            }
        }
    } while (p != NULL);

    printf("done\n");
}

/*
 * Program entry point.
 *
 * Initialises the EAL with the command-line arguments, then runs the
 * control-plane shell in the primary process and the data-plane shell in
 * any other process type. Returns EXIT_SUCCESS once the shell ends.
 */
int main(int argc, char **argv)
{
    if (rte_eal_init(argc, argv) < 0)
        rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");

    switch (rte_eal_process_type()) {
    case RTE_PROC_PRIMARY:
        /* The primary process owns the hash table and edits it. */
        controlplane(init_dev(DEV_ID));
        break;
    default:
        /* Every other process attaches to the existing table. */
        dataplane(DEV_ID);
        break;
    }

    printf("exiting\n");

    return EXIT_SUCCESS;
}


- Volkan

                 reply	other threads:[~2020-08-30 21:19 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=PR2PR08MB47774FF8F7D56DEDBCABA9D6BA500@PR2PR08MB4777.eurprd08.prod.outlook.com \
    --to=volkan.atli@b-ulltech.com \
    --cc=demir.yavas@b-ulltech.com \
    --cc=users@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

DPDK usage discussions

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://inbox.dpdk.org/users/0 users/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 users users/ https://inbox.dpdk.org/users \
		users@dpdk.org
	public-inbox-index users

Example config snippet for mirrors.
Newsgroup available over NNTP:
	nntp://inbox.dpdk.org/inbox.dpdk.users


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git