DPDK usage discussions
 help / color / mirror / Atom feed
* [dpdk-users] rte_hash with rcu usage
@ 2020-08-30 21:19 Volkan Atlı
  0 siblings, 0 replies; only message in thread
From: Volkan Atlı @ 2020-08-30 21:19 UTC (permalink / raw)
  To: users; +Cc: Demir Yavaş

Hi

Since I could not find any example code on the internet showing how to use DPDK RCU [1], I wrote a simple application to understand it (pasted below).

I'm trying to track the end of the grace period for each struct ctx, each of which contains its own RCU-QS variable. I run the application as two processes, called primary (or controlplane) and secondary (or dataplane). It actually works as expected. The primary process can add, delete or list the ctx objects. The secondary process can list them, and can also process a single ctx object by entering the RCU read side and then sleeping for 10 seconds to simulate the datapath.

The problem is that, since the DPDK RCU APIs take the RCU variable itself as a parameter, I first need to look up the corresponding ctx object both in the process command of the secondary process and in the del command of the primary process. Hence, this does not make sense: if both processes get the same ctx object from the hash lookup at the same time (a race condition), the del command may free the ctx object without waiting for the grace period. I could use locks here, but then what is the point of RCU? Should I use a single RCU-QS variable for the whole hash? How can I use one for each hash object? What is the correct usage?

Thanks in advance

- Volkan

[1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html
[https://doc.dpdk.org/guides/_images/rcu_general_info.svg]<https://doc.dpdk.org/guides/prog_guide/rcu_lib.html>
6. RCU Library — Data Plane Development Kit 20.08.0 documentation - DPDK<https://doc.dpdk.org/guides/prog_guide/rcu_lib.html>
6.4. How to use this library. The application must allocate memory and initialize a QS variable. Applications can call rte_rcu_qsbr_get_memsize() to calculate the size of memory to allocate. This API takes a maximum number of reader threads, using this variable, as a parameter.
doc.dpdk.org
----------------------------------------------------------------

#include <errno.h>
#include <inttypes.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_hash.h>
#include <rte_jhash.h>
#include <rte_malloc.h>
#include <rte_pause.h>
#include <rte_random.h>
#include <rte_rcu_qsbr.h>

#define DATAPLANE_LCORE_ID 3
#define DEV_ID  0

/*
 * Fatal-error guard.
 *
 * When `cond` evaluates to true, print "ERROR <file>:<line>: <message>"
 * followed by a newline to stdout and terminate the process with
 * exit(EXIT_FAILURE). `str` must be a string literal used as a printf-style
 * format; any further arguments are forwarded to printf.
 * When `cond` is false the macro does nothing.
 */
#define rcu_qsbr_exit_if(cond, str, ...) \
    do { \
        if (!(cond)) \
            break; \
        printf("ERROR %s:%d: " str "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
        exit(EXIT_FAILURE); \
    } while (0)

/*
 * Per-entry context. Pointers to these objects are stored as the data of
 * the hash table entries, and each context carries its own QSBR variable
 * (set up in init_ctx()).
 */
struct ctx {
    uint32_t tid;       /* identifier given on "add"; the callers also use it as the hash key */
    uint32_t ms_addr;   /* second value given on "add"; printed by the "list" command */

    struct rte_rcu_qsbr *rcu;   /* QSBR variable obtained from init_rcu() for this context */
};

/* Device state: currently only the hash table created in init_dev(). */
struct dev {
    struct rte_hash *h_ctx;     /* hash table mapping a 32-bit tid to a struct ctx pointer */
};

/* Number of entries requested for the hash table (rte_hash_parameters.entries in init_dev()). */
#define TOTAL_CTX_ENTRY (1024 * 32)

/*
 * Allocate and initialise a QSBR variable sized for RTE_MAX_LCORE reader
 * threads.
 *
 * The memory is obtained from rte_zmalloc() with cache-line alignment and
 * must eventually be released with rte_free() (see uninit_rcu()).
 *
 * Returns the initialised variable. Allocation or initialisation failure
 * terminates the process through rcu_qsbr_exit_if(), the same policy
 * init_ctx() and init_dev() use, so the result is never NULL.
 */
static struct rte_rcu_qsbr *init_rcu(void)
{
    size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);

    struct rte_rcu_qsbr *rcu_qsbr = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
    rcu_qsbr_exit_if(rcu_qsbr == NULL, "No memory");

    /* rte_rcu_qsbr_init() returns 0 on success and non-zero on error. */
    rcu_qsbr_exit_if(rte_rcu_qsbr_init(rcu_qsbr, RTE_MAX_LCORE) != 0,
                     "RCU QSBR init failed");

    return rcu_qsbr;
}

/*
 * Release a QSBR variable with rte_free().
 *
 * rcu: pointer to free.
 *
 * Always returns 0.
 */
static int uninit_rcu(struct rte_rcu_qsbr *rcu)
{
    rte_free(rcu);

    return 0;
}

/*
 * Allocate a context and give it its own QSBR variable.
 *
 * tid:     identifier of the context; must fit in 32 bits because
 *          struct ctx stores it as uint32_t.
 * ms_addr: value stored verbatim in the context.
 *
 * Returns the new context. The caller owns it and releases it with
 * rte_free(ctx->rcu) followed by rte_free(ctx). Any failure (out-of-range
 * tid, allocation failure, QSBR setup failure) terminates the process via
 * rcu_qsbr_exit_if(), so the result is never NULL.
 */
static struct ctx *init_ctx(uint64_t tid, uint32_t ms_addr)
{
    /* Refuse ids that would otherwise be silently truncated on assignment. */
    rcu_qsbr_exit_if(tid > UINT32_MAX, "tid %" PRIu64 " out of range", tid);

    struct ctx *ctx = rte_malloc(NULL, sizeof(*ctx), 0);
    rcu_qsbr_exit_if(ctx == NULL, "No memory");

    ctx->tid = (uint32_t)tid;
    ctx->ms_addr = ms_addr;

    ctx->rcu = init_rcu();
    /* Guard here as well so later rte_rcu_qsbr_* calls never see NULL. */
    rcu_qsbr_exit_if(ctx->rcu == NULL, "RCU QSBR init failed");

    return ctx;
}

static struct dev *init_dev(int hash_id)
{
    struct dev *dev = rte_malloc(NULL, sizeof(struct dev), 0);
    rcu_qsbr_exit_if(dev == NULL, "No memory");

    char hash_name[8];
    sprintf(hash_name, "dev%d", hash_id);
    struct rte_hash_parameters hash_params = {
        .entries = TOTAL_CTX_ENTRY,
        .key_len = sizeof(uint32_t),
        .hash_func_init_val = 0,
        .socket_id = rte_socket_id(),
        .hash_func = rte_jhash,
        .extra_flag = RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF,
        .name = hash_name,
    };

    dev->h_ctx = rte_hash_create(&hash_params);
    rcu_qsbr_exit_if(dev->h_ctx == NULL, "Hash create failed");

    return dev;
}

static void dataplane(int dev_id)
{
    char hash_name[8];
    sprintf(hash_name, "dev%d", dev_id);

    struct rte_hash *hash = rte_hash_find_existing(hash_name);
    rcu_qsbr_exit_if(hash == NULL, "Hash not found, please check primary process");

    char *p;

    char line[100];
    do {
        printf("ulak_dp> ");
        fflush(stdout);

        p = fgets(line, sizeof(line) - 1, stdin);

        char command[8];
        uint32_t tid;
        int istatus = sscanf(line, "%7s %u", command, &tid);
        if (istatus == 1 && (strcmp(command, "list") == 0)) {
            const void *next_key;
            void *next_data;
            uint32_t iter = 0;

            if (rte_hash_count(hash) == 0) {
                printf("no entry exits\n");
                continue;
            }

            /* Iterate through the hash table */
            uint32_t line_n = 0;
            while (rte_hash_iterate(hash, &next_key, &next_data, &iter) >= 0) {
                printf("#%u tid=%u ms_addr=%u ptr(ctx)=%p\n", line_n++, *(uint32_t *)next_key, ((struct ctx *)next_data)->ms_addr, next_data);
            }
        } else if (istatus == 2 && (strcmp(command, "process") == 0)) {
            uint32_t *pdata;
            if (rte_hash_lookup_data(hash, &tid, (void **)&pdata) != -ENOENT) {
                struct rte_rcu_qsbr *temp = ((struct ctx *) pdata)->rcu;
                rte_rcu_qsbr_thread_online(temp, DATAPLANE_LCORE_ID);

                /* This API is provided to aid debugging, it will do nothing. */
                rte_rcu_qsbr_lock(temp, DATAPLANE_LCORE_ID);

                printf("processing begin (tid = %u)\n", tid);
                rte_delay_us_sleep(10000000);
                printf("processing end (tid = %u)\n", tid);

                /* This API is provided to aid debugging, it will do nothing. */
                rte_rcu_qsbr_unlock(temp, DATAPLANE_LCORE_ID);

                rte_rcu_qsbr_thread_offline(temp, DATAPLANE_LCORE_ID);

                /* Update quiescent state counter */
                rte_rcu_qsbr_quiescent(temp, DATAPLANE_LCORE_ID);
            } else {
                printf("no entry found (tid = %u)\n", tid);
            }

        }

    } while (p != NULL);
}


/*
 * Interactive writer loop run by the primary process.
 *
 * dev: device whose hash table (dev->h_ctx) is managed.
 *
 * Commands read from stdin, one per line:
 *   list                 print every entry of the hash table
 *   add <tid> <ms_addr>  create a context, register the dataplane thread id
 *                        on its QSBR variable and insert it under <tid>
 *   del <tid>            unlink <tid>, wait for the grace period on that
 *                        context's QSBR variable, then free key slot,
 *                        QSBR variable and context
 *
 * The loop ends on EOF or a read error, after which "done" is printed.
 */
static void controlplane(struct dev *dev)
{
    struct rte_hash *hash = dev->h_ctx;

    char line[100];
    for (;;) {
        printf("ulak_cp> ");
        fflush(stdout);

        /*
         * Stop before parsing when nothing was read: on EOF/error `line`
         * is either uninitialised or still holds the previous command.
         */
        if (fgets(line, sizeof(line), stdin) == NULL)
            break;

        char command[8];
        uint32_t tid, ms_addr;
        int istatus = sscanf(line, "%7s %u %u", command, &tid, &ms_addr);

        if (istatus == 1 && strcmp(command, "list") == 0) {
            if (rte_hash_count(hash) == 0) {
                printf("no entry exits\n");
                continue;
            }

            /* Iterate through the hash table */
            const void *next_key;
            void *next_data;
            uint32_t iter = 0;
            uint32_t line_n = 0;
            while (rte_hash_iterate(hash, &next_key, &next_data, &iter) >= 0) {
                printf("#%u tid=%u ms_addr=%u ptr(ctx)=%p\n", line_n++, *(uint32_t *)next_key, ((struct ctx *)next_data)->ms_addr, next_data);
            }
        } else if (istatus == 2 && strcmp(command, "del") == 0) {
            void *data = NULL;

            /* Any negative result (-ENOENT, -EINVAL) leaves `data` unset. */
            if (rte_hash_lookup_data(hash, &tid, &data) < 0) {
                printf("no entry found (tid = %u)\n", tid);
                continue;
            }
            struct ctx *ctx = data;

            /* Unlink first so that no new lookup can return this context. */
            int32_t pos = rte_hash_del_key(hash, &tid);
            if (pos < 0) {
                printf("Hash key delete failed (tid = %u)\n", tid);
                continue;
            }

            printf("grace period (ctx->tid = %u)\n", ctx->tid);

            /*
             * NOTE(review): a reader that fetched this context from the
             * hash but has not yet gone online on ctx->rcu is not waited
             * for, so the frees below can race with it. This cannot be
             * solved with a QSBR variable stored inside the context.
             */
            /* Start the quiescent state query process */
            uint64_t token = rte_rcu_qsbr_start(ctx->rcu);
            /* Block until every online registered reader has reported. */
            rte_rcu_qsbr_check(ctx->rcu, token, true);

            /* The table is lock-free, so the key slot is recycled explicitly. */
            rcu_qsbr_exit_if(rte_hash_free_key_with_position(hash, pos) < 0, "Failed to free the key #%u\n", tid);

            rte_free(ctx->rcu);
            rte_free(ctx);

            printf("deleted #%u\n", tid);
        } else if (istatus == 3 && strcmp(command, "add") == 0) {
            void *data = NULL;

            /* Check for a duplicate before allocating anything. */
            if (rte_hash_lookup_data(hash, &tid, &data) >= 0) {
                printf("Hash key (tid = %u) already exist\n", tid);
                continue;
            }

            struct ctx *ctx = init_ctx(tid, ms_addr);

            /* Register the reader before the context becomes visible. */
            if (rte_rcu_qsbr_thread_register(ctx->rcu, DATAPLANE_LCORE_ID) != 0) {
                printf("RCU thread register failed (tid = %u)\n", tid);
                rte_free(ctx->rcu);
                rte_free(ctx);
                continue;
            }

            if (rte_hash_add_key_data(hash, &tid, ctx) < 0) {
                printf("Hash key add failed\n");
                /* Never published, so it can be released immediately. */
                rte_free(ctx->rcu);
                rte_free(ctx);
            }
        }
    }

    printf("done\n");
}

/*
 * Entry point.
 *
 * Initialises the EAL with the command-line arguments, then runs the
 * control-plane loop when started as the primary process and the
 * data-plane loop otherwise. Both sides use device index DEV_ID.
 *
 * Returns EXIT_SUCCESS once the selected loop ends; an EAL initialisation
 * failure terminates the process through rte_exit().
 */
int main(int argc, char **argv)
{
    if (rte_eal_init(argc, argv) < 0)
        rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");

    if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
        /* Any non-primary process attaches to the existing hash table. */
        dataplane(DEV_ID);
    } else {
        /* The primary process creates the device and owns the writer side. */
        controlplane(init_dev(DEV_ID));
    }

    printf("exiting\n");

    return EXIT_SUCCESS;
}


- Volkan

^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2020-08-30 21:19 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-08-30 21:19 [dpdk-users] rte_hash with rcu usage Volkan Atlı

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).