* [RFC 1/6] dumpcap: handle primary process exit
2025-08-11 21:34 [RFC 0/6] Packet capture fixes Stephen Hemminger
@ 2025-08-11 21:34 ` Stephen Hemminger
2025-08-11 21:35 ` [RFC 2/6] pdump: " Stephen Hemminger
` (4 subsequent siblings)
5 siblings, 0 replies; 7+ messages in thread
From: Stephen Hemminger @ 2025-08-11 21:34 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, stable
If primary process exits, then it is not possible (or needed)
to cleanup resources. Instead just exit after closing the
capture file.
Bugzilla ID: 1760
Fixes: cbb44143be74 ("app/dumpcap: add new packet capture application")
Cc: stable@dpdk.org
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
app/dumpcap/main.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/app/dumpcap/main.c b/app/dumpcap/main.c
index 3d3c0dbc66..7b19c830b5 100644
--- a/app/dumpcap/main.c
+++ b/app/dumpcap/main.c
@@ -1058,6 +1058,10 @@ int main(int argc, char **argv)
else
pcap_dump_close(out.dumper);
+ /* If primary has exited, do not try and communicate with it */
+ if (!rte_eal_primary_proc_alive(NULL))
+ return 0;
+
cleanup_pdump_resources();
rte_ring_free(r);
--
2.47.2
^ permalink raw reply [flat|nested] 7+ messages in thread
* [RFC 2/6] pdump: handle primary process exit
2025-08-11 21:34 [RFC 0/6] Packet capture fixes Stephen Hemminger
2025-08-11 21:34 ` [RFC 1/6] dumpcap: handle primary process exit Stephen Hemminger
@ 2025-08-11 21:35 ` Stephen Hemminger
2025-08-11 21:35 ` [RFC 3/6] pdump: fix races in callbacks Stephen Hemminger
` (3 subsequent siblings)
5 siblings, 0 replies; 7+ messages in thread
From: Stephen Hemminger @ 2025-08-11 21:35 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, stable
If primary process exits, then it is not possible (or needed)
to cleanup resources. Instead just exit after closing the
capture file.
Bugzilla ID: 1760
Fixes: a99a311ba101 ("app/pdump: exit with primary process")
Cc: stable@dpdk.org
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
app/pdump/main.c | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/app/pdump/main.c b/app/pdump/main.c
index fa85859703..1741d7e709 100644
--- a/app/pdump/main.c
+++ b/app/pdump/main.c
@@ -1028,13 +1028,15 @@ main(int argc, char **argv)
dump_packets();
disable_primary_monitor();
- cleanup_pdump_resources();
+
/* dump debug stats */
print_pdump_stats();
- ret = rte_eal_cleanup();
- if (ret)
- printf("Error from rte_eal_cleanup(), %d\n", ret);
+ /* If primary has exited, do not try and communicate with it */
+ if (!rte_eal_primary_proc_alive(NULL))
+ return 0;
- return 0;
+ cleanup_pdump_resources();
+
+ return rte_eal_cleanup() ? EXIT_FAILURE : 0;
}
--
2.47.2
^ permalink raw reply [flat|nested] 7+ messages in thread
* [RFC 3/6] pdump: fix races in callbacks
2025-08-11 21:34 [RFC 0/6] Packet capture fixes Stephen Hemminger
2025-08-11 21:34 ` [RFC 1/6] dumpcap: handle primary process exit Stephen Hemminger
2025-08-11 21:35 ` [RFC 2/6] pdump: " Stephen Hemminger
@ 2025-08-11 21:35 ` Stephen Hemminger
2025-08-11 21:35 ` [RFC 4/6] dumpcap: handle pdump requests from primary Stephen Hemminger
` (2 subsequent siblings)
5 siblings, 0 replies; 7+ messages in thread
From: Stephen Hemminger @ 2025-08-11 21:35 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger
The pdump callback can race with other cpu's in the datapath.
Handle this by using reference counts and LSB in manner
similar to seqcount and bpf code.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/pdump/rte_pdump.c | 48 +++++++++++++++++++++++++++++++++++++++++--
1 file changed, 46 insertions(+), 2 deletions(-)
diff --git a/lib/pdump/rte_pdump.c b/lib/pdump/rte_pdump.c
index ba75b828f2..bfd63fa8c2 100644
--- a/lib/pdump/rte_pdump.c
+++ b/lib/pdump/rte_pdump.c
@@ -12,6 +12,7 @@
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
+#include <rte_pause.h>
#include <rte_pcapng.h>
#include "rte_pdump.h"
@@ -62,6 +63,7 @@ static struct pdump_rxtx_cbs {
const struct rte_bpf *filter;
enum pdump_version ver;
uint32_t snaplen;
+ RTE_ATOMIC(uint32_t) use_count;
} rx_cbs[RTE_MAX_ETHPORTS][RTE_MAX_QUEUES_PER_PORT],
tx_cbs[RTE_MAX_ETHPORTS][RTE_MAX_QUEUES_PER_PORT];
@@ -78,6 +80,36 @@ static struct {
const struct rte_memzone *mz;
} *pdump_stats;
+static void
+pdump_cb_wait(struct pdump_rxtx_cbs *cbs)
+{
+ /* make sure the data loads happens before the use count load */
+ rte_atomic_thread_fence(rte_memory_order_acquire);
+
+ /* wait until use_count is even (not in use) */
+ RTE_WAIT_UNTIL_MASKED(&cbs->use_count, 1, ==, 0, rte_memory_order_relaxed);
+}
+
+static __rte_always_inline void
+pdump_cb_hold(struct pdump_rxtx_cbs *cbs)
+{
+ uint32_t count = cbs->use_count + 1;
+
+ rte_atomic_store_explicit(&cbs->use_count, count, rte_memory_order_relaxed);
+
+ /* prevent stores after this from happening before the use_count update */
+ rte_atomic_thread_fence(rte_memory_order_release);
+}
+
+static __rte_always_inline void
+pdump_cb_release(struct pdump_rxtx_cbs *cbs)
+{
+ uint32_t count = cbs->use_count + 1;
+
+ /* Synchronizes-with the load acquire in pdump_cb_wait */
+ rte_atomic_store_explicit(&cbs->use_count, count, rte_memory_order_release);
+}
+
/* Create a clone of mbuf to be placed into ring. */
static void
pdump_copy(uint16_t port_id, uint16_t queue,
@@ -146,11 +178,14 @@ pdump_rx(uint16_t port, uint16_t queue,
struct rte_mbuf **pkts, uint16_t nb_pkts,
uint16_t max_pkts __rte_unused, void *user_params)
{
- const struct pdump_rxtx_cbs *cbs = user_params;
+ struct pdump_rxtx_cbs *cbs = user_params;
struct rte_pdump_stats *stats = &pdump_stats->rx[port][queue];
+ pdump_cb_hold(cbs);
pdump_copy(port, queue, RTE_PCAPNG_DIRECTION_IN,
pkts, nb_pkts, cbs, stats);
+ pdump_cb_release(cbs);
+
return nb_pkts;
}
@@ -158,14 +193,18 @@ static uint16_t
pdump_tx(uint16_t port, uint16_t queue,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
{
- const struct pdump_rxtx_cbs *cbs = user_params;
+ struct pdump_rxtx_cbs *cbs = user_params;
struct rte_pdump_stats *stats = &pdump_stats->tx[port][queue];
+ pdump_cb_hold(cbs);
pdump_copy(port, queue, RTE_PCAPNG_DIRECTION_OUT,
pkts, nb_pkts, cbs, stats);
+ pdump_cb_release(cbs);
+
return nb_pkts;
}
+
static int
pdump_register_rx_callbacks(enum pdump_version ver,
uint16_t end_q, uint16_t port, uint16_t queue,
@@ -186,6 +225,7 @@ pdump_register_rx_callbacks(enum pdump_version ver,
port, qid);
return -EEXIST;
}
+ cbs->use_count = 0;
cbs->ver = ver;
cbs->ring = ring;
cbs->mp = mp;
@@ -218,6 +258,7 @@ pdump_register_rx_callbacks(enum pdump_version ver,
-ret);
return ret;
}
+ pdump_cb_wait(cbs);
cbs->cb = NULL;
}
}
@@ -246,6 +287,7 @@ pdump_register_tx_callbacks(enum pdump_version ver,
port, qid);
return -EEXIST;
}
+ cbs->use_count = 0;
cbs->ver = ver;
cbs->ring = ring;
cbs->mp = mp;
@@ -277,6 +319,8 @@ pdump_register_tx_callbacks(enum pdump_version ver,
-ret);
return ret;
}
+
+ pdump_cb_wait(cbs);
cbs->cb = NULL;
}
}
--
2.47.2
^ permalink raw reply [flat|nested] 7+ messages in thread
* [RFC 4/6] dumpcap: handle pdump requests from primary
2025-08-11 21:34 [RFC 0/6] Packet capture fixes Stephen Hemminger
` (2 preceding siblings ...)
2025-08-11 21:35 ` [RFC 3/6] pdump: fix races in callbacks Stephen Hemminger
@ 2025-08-11 21:35 ` Stephen Hemminger
2025-08-11 21:35 ` [RFC 5/6] pdump: " Stephen Hemminger
2025-08-11 21:35 ` [RFC 6/6] pdump: forward callback enable to secondary Stephen Hemminger
5 siblings, 0 replies; 7+ messages in thread
From: Stephen Hemminger @ 2025-08-11 21:35 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger
The primary process will start to notify all secondary processes
about pdump changes. The dumpcap secondary process can just call
rte_pdump_init() and it take care of that.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
app/dumpcap/main.c | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/app/dumpcap/main.c b/app/dumpcap/main.c
index 7b19c830b5..c734fc7b9d 100644
--- a/app/dumpcap/main.c
+++ b/app/dumpcap/main.c
@@ -528,6 +528,8 @@ cleanup_pdump_resources(void)
if (intf->opts.promisc_mode)
rte_eth_promiscuous_disable(intf->port);
}
+
+ rte_pdump_uninit();
}
/* Alarm signal handler, used to check that primary process */
@@ -659,6 +661,14 @@ static void dpdk_init(void)
if (rte_eal_init(eal_argc, eal_argv) < 0)
rte_exit(EXIT_FAILURE, "EAL init failed: is primary process running?\n");
+ /*
+ * Register pdump callback handler.
+ * Primary will notify all secondary processes of change.
+ * No impact for this application, but need to reply.
+ */
+ if (rte_pdump_init() < 0)
+ rte_exit(EXIT_FAILURE, "EAL pdump init failed\n");
+
/*
* If no lcore argument was specified, then run this program as a normal process
* which can be scheduled on any non-isolated CPU.
--
2.47.2
^ permalink raw reply [flat|nested] 7+ messages in thread
* [RFC 5/6] pdump: handle pdump requests from primary
2025-08-11 21:34 [RFC 0/6] Packet capture fixes Stephen Hemminger
` (3 preceding siblings ...)
2025-08-11 21:35 ` [RFC 4/6] dumpcap: handle pdump requests from primary Stephen Hemminger
@ 2025-08-11 21:35 ` Stephen Hemminger
2025-08-11 21:35 ` [RFC 6/6] pdump: forward callback enable to secondary Stephen Hemminger
5 siblings, 0 replies; 7+ messages in thread
From: Stephen Hemminger @ 2025-08-11 21:35 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger
The primary process will start to notify all secondary processes
about pdump changes. The pdump secondary process can just call
rte_pdump_init() and it take care of that.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
app/pdump/main.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/app/pdump/main.c b/app/pdump/main.c
index 1741d7e709..626ba0ce93 100644
--- a/app/pdump/main.c
+++ b/app/pdump/main.c
@@ -552,6 +552,7 @@ cleanup_pdump_resources(void)
}
}
+ rte_pdump_uninit();
cleanup_rings();
}
@@ -822,6 +823,9 @@ enable_pdump(void)
struct pdump_tuples *pt;
int ret = 0, ret1 = 0;
+ if (rte_pdump_init() < 0)
+ rte_exit(EXIT_FAILURE, "pdump init failed\n");
+
for (i = 0; i < num_tuples; i++) {
pt = &pdump_t[i];
if (pt->dir == RTE_PDUMP_FLAG_RXTX) {
--
2.47.2
^ permalink raw reply [flat|nested] 7+ messages in thread
* [RFC 6/6] pdump: forward callback enable to secondary
2025-08-11 21:34 [RFC 0/6] Packet capture fixes Stephen Hemminger
` (4 preceding siblings ...)
2025-08-11 21:35 ` [RFC 5/6] pdump: " Stephen Hemminger
@ 2025-08-11 21:35 ` Stephen Hemminger
5 siblings, 0 replies; 7+ messages in thread
From: Stephen Hemminger @ 2025-08-11 21:35 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger
When packet capture is enabled, need to also notify
secondary processes to force them to do the callbacks.
Requires that all secondary processes also call rte_pdump_init()
or there will be warning about not responding secondary.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/pdump/rte_pdump.c | 213 ++++++++++++++++++++++++++++++++++++------
1 file changed, 185 insertions(+), 28 deletions(-)
diff --git a/lib/pdump/rte_pdump.c b/lib/pdump/rte_pdump.c
index bfd63fa8c2..b95f7f863d 100644
--- a/lib/pdump/rte_pdump.c
+++ b/lib/pdump/rte_pdump.c
@@ -5,6 +5,7 @@
#include <stdlib.h>
#include <eal_export.h>
+#include <rte_alarm.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>
#include <rte_lcore.h>
@@ -26,11 +27,28 @@ RTE_LOG_REGISTER_DEFAULT(pdump_logtype, NOTICE);
/* Used for the multi-process communication */
#define PDUMP_MP "mp_pdump"
+/* Overly generous timeout for secondary to respond */
+#define MP_TIMEOUT_S 5
+
enum pdump_operation {
DISABLE = 1,
ENABLE = 2
};
+static inline const char *
+pdump_opname(enum pdump_operation op)
+{
+ static char buf[32];
+
+ if (op == DISABLE)
+ return "disable";
+ if (op == ENABLE)
+ return "enable";
+
+ snprintf(buf, sizeof(buf), "op%u", op);
+ return buf;
+}
+
/* Internal version number in request */
enum pdump_version {
V1 = 1, /* no filtering or snap */
@@ -56,6 +74,11 @@ struct pdump_response {
int32_t err_value;
};
+struct pdump_bundle {
+ struct rte_mp_msg msg;
+ char peer[];
+};
+
static struct pdump_rxtx_cbs {
struct rte_ring *ring;
struct rte_mempool *mp;
@@ -432,34 +455,150 @@ set_pdump_rxtx_cbs(const struct pdump_request *p)
return ret;
}
+static void
+pdump_request_to_secondary(const struct pdump_request *req)
+{
+ struct rte_mp_msg mp_req = { };
+ struct rte_mp_reply mp_reply;
+ struct timespec ts = {.tv_sec = MP_TIMEOUT_S, .tv_nsec = 0};
+
+ PDUMP_LOG_LINE(DEBUG, "forward req %s to secondary", pdump_opname(req->op));
+
+ memcpy(mp_req.param, req, sizeof(*req));
+ strlcpy(mp_req.name, PDUMP_MP, sizeof(mp_req.name));
+ mp_req.len_param = sizeof(*req);
+
+ if (rte_mp_request_sync(&mp_req, &mp_reply, &ts) != 0)
+ PDUMP_LOG_LINE(ERR, "rte_mp_request_sync failed");
+
+ else if (mp_reply.nb_sent != mp_reply.nb_received)
+ PDUMP_LOG_LINE(ERR, "not all secondary's replied (sent %u recv %u)",
+ mp_reply.nb_sent, mp_reply.nb_received);
+
+ free(mp_reply.msgs);
+}
+
+/* Allocate temporary storage for passing state to the alarm thread for deferred handling */
+static struct pdump_bundle *
+pdump_bundle_alloc(const struct rte_mp_msg *mp_msg, const char *peer)
+{
+ struct pdump_bundle *bundle;
+ size_t peer_len = strlen(peer) + 1;
+
+ /* peer is the unix domain socket path */
+ bundle = malloc(sizeof(*bundle) + peer_len);
+ if (bundle == NULL)
+ return NULL;
+
+ bundle->msg = *mp_msg;
+ memcpy(bundle->peer, peer, peer_len);
+ return bundle;
+}
+
+/* Send response to peer */
static int
-pdump_server(const struct rte_mp_msg *mp_msg, const void *peer)
+pdump_send_response(const struct pdump_request *req, int result, const void *peer)
{
- struct rte_mp_msg mp_resp;
- const struct pdump_request *cli_req;
- struct pdump_response *resp = (struct pdump_response *)&mp_resp.param;
+ struct rte_mp_msg mp_resp = { };
+ struct pdump_response *resp = (struct pdump_response *)mp_resp.param;
+ int ret;
- /* recv client requests */
- if (mp_msg->len_param != sizeof(*cli_req)) {
- PDUMP_LOG_LINE(ERR, "failed to recv from client");
- resp->err_value = -EINVAL;
- } else {
- cli_req = (const struct pdump_request *)mp_msg->param;
- resp->ver = cli_req->ver;
- resp->res_op = cli_req->op;
- resp->err_value = set_pdump_rxtx_cbs(cli_req);
+ if (req) {
+ resp->ver = req->ver;
+ resp->res_op = req->op;
}
+ resp->err_value = result;
rte_strscpy(mp_resp.name, PDUMP_MP, RTE_MP_MAX_NAME_LEN);
mp_resp.len_param = sizeof(*resp);
- mp_resp.num_fds = 0;
- if (rte_mp_reply(&mp_resp, peer) < 0) {
- PDUMP_LOG_LINE(ERR, "failed to send to client:%s",
+
+ ret = rte_mp_reply(&mp_resp, peer);
+ if (ret != 0)
+ PDUMP_LOG_LINE(ERR, "failed to send response: %s",
strerror(rte_errno));
- return -1;
+ return ret;
+}
+
+/* Callback from MP request handler in secondary process */
+static int
+pdump_handle_primary_request(const struct rte_mp_msg *mp_msg, const void *peer)
+{
+ const struct pdump_request *req = NULL;
+ int ret;
+
+ if (mp_msg->len_param != sizeof(*req)) {
+ PDUMP_LOG_LINE(ERR, "invalid request from primary");
+ ret = -EINVAL;
+ } else {
+ req = (const struct pdump_request *)mp_msg->param;
+ PDUMP_LOG_LINE(DEBUG, "secondary pdump %s", pdump_opname(req->op));
+
+ /* Can just do it now, no need for interrupt thread */
+ ret = set_pdump_rxtx_cbs(req);
}
+ return pdump_send_response(req, ret, peer);
+
+}
+
+/* Callback from the alarm handler (in interrupt thread) which does actual change */
+static void
+__pdump_request(void *param)
+{
+ struct pdump_bundle *bundle = param;
+ struct rte_mp_msg *msg = &bundle->msg;
+ const struct pdump_request *req =
+ (const struct pdump_request *)msg->param;
+ int ret;
+
+ PDUMP_LOG_LINE(DEBUG, "primary pdump %s", pdump_opname(req->op));
+
+ ret = set_pdump_rxtx_cbs(req);
+ ret = pdump_send_response(req, ret, bundle->peer);
+
+ /* Primary process is responsible for broadcasting request to all secondaries */
+ if (ret == 0)
+ pdump_request_to_secondary(req);
+
+ free(bundle);
+}
+
+/* Callback from MP request handler in primary process */
+static int
+pdump_handle_secondary_request(const struct rte_mp_msg *mp_msg, const void *peer)
+{
+ struct pdump_bundle *bundle = NULL;
+ const struct pdump_request *req = NULL;
+ int ret;
+
+ if (mp_msg->len_param != sizeof(*req)) {
+ PDUMP_LOG_LINE(ERR, "invalid request from secondary");
+ ret = -EINVAL;
+ goto error;
+ }
+
+ req = (const struct pdump_request *)mp_msg->param;
+
+ bundle = pdump_bundle_alloc(mp_msg, peer);
+ if (bundle == NULL) {
+ PDUMP_LOG_LINE(ERR, "not enough memory");
+ ret = -ENOMEM;
+ goto error;
+ }
+
+ /*
+ * We are in IPC callback thread, sync IPC is not possible
+ * since sending to secondary would cause livelock.
+ * Delegate the task to interrupt thread.
+ */
+ ret = rte_eal_alarm_set(1, __pdump_request, bundle);
+ if (ret != 0)
+ goto error;
return 0;
+
+error:
+ free(bundle);
+ return pdump_send_response(req, ret, peer);
}
RTE_EXPORT_SYMBOL(rte_pdump_init)
@@ -469,19 +608,36 @@ rte_pdump_init(void)
const struct rte_memzone *mz;
int ret;
- mz = rte_memzone_reserve(MZ_RTE_PDUMP_STATS, sizeof(*pdump_stats),
- SOCKET_ID_ANY, 0);
- if (mz == NULL) {
- PDUMP_LOG_LINE(ERR, "cannot allocate pdump statistics");
- rte_errno = ENOMEM;
- return -1;
+ if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+ ret = rte_mp_action_register(PDUMP_MP, pdump_handle_secondary_request);
+ if (ret && rte_errno != ENOTSUP)
+ return -1;
+
+ mz = rte_memzone_reserve(MZ_RTE_PDUMP_STATS, sizeof(*pdump_stats),
+ SOCKET_ID_ANY, 0);
+ if (mz == NULL) {
+ PDUMP_LOG_LINE(ERR, "cannot allocate pdump statistics");
+ rte_mp_action_unregister(PDUMP_MP);
+ rte_errno = ENOMEM;
+ return -1;
+ }
+ } else {
+ ret = rte_mp_action_register(PDUMP_MP, pdump_handle_primary_request);
+ if (ret && rte_errno != ENOTSUP)
+ return -1;
+
+ mz = rte_memzone_lookup(MZ_RTE_PDUMP_STATS);
+ if (mz == NULL) {
+ PDUMP_LOG_LINE(ERR, "cannot find pdump statistics");
+ rte_mp_action_unregister(PDUMP_MP);
+ rte_errno = ENOENT;
+ return -1;
+ }
}
+
pdump_stats = mz->addr;
pdump_stats->mz = mz;
- ret = rte_mp_action_register(PDUMP_MP, pdump_server);
- if (ret && rte_errno != ENOTSUP)
- return -1;
return 0;
}
@@ -491,7 +647,7 @@ rte_pdump_uninit(void)
{
rte_mp_action_unregister(PDUMP_MP);
- if (pdump_stats != NULL) {
+ if (rte_eal_process_type() == RTE_PROC_PRIMARY && pdump_stats != NULL) {
rte_memzone_free(pdump_stats->mz);
pdump_stats = NULL;
}
@@ -580,11 +736,12 @@ pdump_prepare_client_request(const char *device, uint16_t queue,
int ret = -1;
struct rte_mp_msg mp_req, *mp_rep;
struct rte_mp_reply mp_reply;
- struct timespec ts = {.tv_sec = 5, .tv_nsec = 0};
+ struct timespec ts = {.tv_sec = MP_TIMEOUT_S, .tv_nsec = 0};
struct pdump_request *req = (struct pdump_request *)mp_req.param;
struct pdump_response *resp;
if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+ /* FIXME */
PDUMP_LOG_LINE(ERR,
"pdump enable/disable not allowed in primary process");
return -EINVAL;
--
2.47.2
^ permalink raw reply [flat|nested] 7+ messages in thread