DPDK patches and discussions
* [RFC PATCH] usertools: add telemetry exporter
@ 2023-09-26 16:34 Robin Jarry
  2023-11-20 13:26 ` Robin Jarry
                   ` (2 more replies)
  0 siblings, 3 replies; 7+ messages in thread
From: Robin Jarry @ 2023-09-26 16:34 UTC (permalink / raw)
  To: dev; +Cc: Robin Jarry

For now the telemetry socket is local to the machine running a DPDK
application. Also, there is no official "schema" for the exposed
metrics. Add a framework and a script to collect and expose these
metrics to telemetry and observability aggregators such as Prometheus,
Carbon or InfluxDB. The exposed data must be designed with end-users in
mind; some DPDK terminology or internals may not make sense to everyone.

The script only serves as an entry point and does not know anything
about any specific metrics or JSON data structures exposed on the
telemetry socket.

It uses dynamically loaded endpoint exporters which are basic python
files that must implement two functions:

 def info() -> dict[MetricName, MetricInfo]:
     Mapping of metric names to their description and type.

 def metrics(sock: TelemetrySocket) -> list[MetricValue]:
     Request data from sock and return it as metric values. A metric
     value is a 3-tuple: (name: str, value: any, labels: dict). Each
     name must be present in info().

The sock argument passed to metrics() has a single method:

 def cmd(self, uri: str, arg: any = None) -> dict | list:
     Request JSON data from the telemetry socket and parse it into
     Python values.
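
For illustration only, a minimal endpoint honoring this contract could
look like the sketch below; the metric name and telemetry URI are
hypothetical, not part of this patch:

```python
# Hypothetical endpoint module sketch. EXAMPLE_METRIC and the
# "/example/stat" URI are illustrative, not actual DPDK telemetry names.

EXAMPLE_METRIC = "example_value"


def info():
    # Map each metric name to a (description, type) tuple.
    return {
        EXAMPLE_METRIC: ("An example value.", "gauge"),
    }


def metrics(sock):
    # Query the telemetry socket and return 3-tuples of
    # (name, value, labels). Every name must appear in info().
    data = sock.cmd("/example/stat")
    return [
        (EXAMPLE_METRIC, data["value"], {"id": data["id"]}),
    ]
```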

The main script invokes endpoints and exports the data into an output
format. For now, only two formats are implemented:

* openmetrics/prometheus: text-based format exported via a local HTTP
  server.
* carbon/graphite: binary (Python pickle) format exported to a remote
  carbon TCP server.
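
The carbon/graphite pickle protocol frames each batch as a 4-byte
big-endian length header followed by a pickled list of
(path, (timestamp, value)) tuples. A rough self-contained sketch of
that framing (the metric path shown is illustrative):

```python
import pickle
import struct
import time


def frame_carbon_batch(metrics):
    # metrics: list of (path, (timestamp, value)) tuples, the shape
    # expected by carbon's pickle receiver.
    payload = pickle.dumps(metrics, protocol=2)
    # Prefix with the payload length as a 4-byte big-endian integer.
    header = struct.pack("!L", len(payload))
    return header + payload


buf = frame_carbon_batch([("dpdk.cpu.busy_cycles", (time.time(), 123))])
```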

As a starting point, 3 built-in endpoints are implemented:

* counters: ethdev hardware counters
* cpu: lcore usage
* memory: overall memory usage

The goal is to keep all built-in endpoints in the DPDK repository so
that they can be updated along with the telemetry JSON data structures.

Example output for the openmetrics:// format:

 ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 &
 INFO using endpoint: counters (from .../telemetry-endpoints/counters.py)
 INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py)
 INFO using endpoint: memory (from .../telemetry-endpoints/memory.py)
 INFO listening on port 9876
 [1] 838829

 ~$ curl http://127.0.0.1:9876/
 # HELP dpdk_cpu_total_cycles Total number of CPU cycles.
 # TYPE dpdk_cpu_total_cycles counter
 # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles.
 # TYPE dpdk_cpu_busy_cycles counter
 dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980
 dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860
 dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740
 dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860
 dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540
 dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160
 dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320
 dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860
 # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes.
 # TYPE dpdk_memory_total_bytes gauge
 # HELP dpdk_memory_used_bytes The currently used memory in bytes.
 # TYPE dpdk_memory_used_bytes gauge
 dpdk_memory_total_bytes 1073741824
 dpdk_memory_used_bytes 794197376

Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format
Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus
Signed-off-by: Robin Jarry <rjarry@redhat.com>
---

Notes:
    v1:
    
    * Ideally, this script should be tested in CI to avoid breakage when the
      telemetry data structures change. I don't know where such a test could
      be done.
    
    * There was work done 3/4 years ago in collectd to add DPDK telemetry
      support but I think this has now been abandoned.
    
      https://github.com/collectd/collectd/blob/main/src/dpdk_telemetry.c
    
      I think that keeping the exporters in the DPDK repository makes more
      sense from a maintainability perspective.

 usertools/dpdk-telemetry-exporter.py      | 376 ++++++++++++++++++++++
 usertools/meson.build                     |   6 +
 usertools/telemetry-endpoints/counters.py |  47 +++
 usertools/telemetry-endpoints/cpu.py      |  29 ++
 usertools/telemetry-endpoints/memory.py   |  37 +++
 5 files changed, 495 insertions(+)
 create mode 100755 usertools/dpdk-telemetry-exporter.py
 create mode 100644 usertools/telemetry-endpoints/counters.py
 create mode 100644 usertools/telemetry-endpoints/cpu.py
 create mode 100644 usertools/telemetry-endpoints/memory.py

diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py
new file mode 100755
index 000000000000..6c1495a9e1e8
--- /dev/null
+++ b/usertools/dpdk-telemetry-exporter.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+'''
+DPDK telemetry exporter.
+
+It uses dynamically loaded endpoint exporters which are basic python files that
+must implement two functions:
+
+    def info() -> dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+
+    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
+        """
+        Request data from sock and return it as metric values. A metric value
+        is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
+        present in info().
+        """
+
+The sock argument passed to metrics() has a single method:
+
+    def cmd(self, uri, arg=None) -> dict | list:
+        """
+        Request JSON data from the telemetry socket and parse it into
+        Python values.
+        """
+
+See existing endpoints for examples.
+
+The exporter supports multiple output formats:
+
+prometheus://ADDRESS:PORT
+openmetrics://ADDRESS:PORT
+  Expose the enabled endpoints via a local HTTP server listening on the
+  specified address and port. GET requests on that server are served with
+  text/plain responses in the prometheus/openmetrics format.
+
+  More details:
+  https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
+
+carbon://ADDRESS:PORT
+graphite://ADDRESS:PORT
+  Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle
+  carbon format.
+
+  More details:
+  https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
+'''
+
+import argparse
+from http import HTTPStatus, server
+import importlib.util
+import json
+import logging
+import os
+import pickle
+import re
+import socket
+import struct
+import sys
+import time
+import typing
+from urllib.parse import urlparse
+
+
+LOG = logging.getLogger(__name__)
+# Use local endpoints path only when running from source
+LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
+DEFAULT_LOAD_PATHS = []
+if os.path.isdir(LOCAL):
+    DEFAULT_LOAD_PATHS.append(LOCAL)
+DEFAULT_LOAD_PATHS += [
+    "/usr/local/share/dpdk/telemetry-endpoints",
+    "/usr/share/dpdk/telemetry-endpoints",
+]
+DEFAULT_OUTPUT = "openmetrics://:9876"
+
+
+def main():
+    logging.basicConfig(
+        stream=sys.stdout,
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument(
+        "-o",
+        "--output",
+        metavar="FORMAT://PARAMETERS",
+        default=urlparse(DEFAULT_OUTPUT),
+        type=urlparse,
+        help=f"""
+        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
+        URL elements have different meanings. By default, the exporter starts a
+        local HTTP server on port 9876 that serves requests in the
+        prometheus/openmetrics plain text format.
+        """,
+    )
+    parser.add_argument(
+        "-p",
+        "--load-path",
+        dest="load_paths",
+        type=lambda v: v.split(os.pathsep),
+        default=DEFAULT_LOAD_PATHS,
+        help=f"""
+        The list of paths from which to discover endpoints.
+        (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}").
+        """,
+    )
+    parser.add_argument(
+        "-e",
+        "--endpoint",
+        dest="endpoints",
+        action="append",
+        help="""
+        Telemetry endpoint to export (by default, all discovered endpoints are
+        enabled). This option can be specified more than once.
+        """,
+    )
+    parser.add_argument(
+        "-l",
+        "--list",
+        action="store_true",
+        help="""
+        Only list detected endpoints and exit.
+        """,
+    )
+    parser.add_argument(
+        "-s",
+        "--socket-path",
+        default="/run/dpdk/rte/dpdk_telemetry.v2",
+        help="""
+        The DPDK telemetry socket path (default: "%(default)s").
+        """,
+    )
+    args = parser.parse_args()
+    output = OUTPUT_FORMATS.get(args.output.scheme)
+    if output is None:
+        parser.error(f"unsupported output format: {args.output.scheme}://")
+    try:
+        endpoints = load_endpoints(args.load_paths, args.endpoints)
+        if args.list:
+            return
+        output(args, endpoints)
+    except KeyboardInterrupt:
+        pass
+    except Exception:
+        LOG.exception("")
+
+
+class TelemetrySocket:
+    """
+    Abstraction of the DPDK telemetry socket.
+    """
+
+    def __init__(self, path: str):
+        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        self.sock.connect(path)
+        data = json.loads(self.sock.recv(1024).decode())
+        self.max_output_len = data["max_output_len"]
+
+    def cmd(
+        self, uri: str, arg: typing.Any = None
+    ) -> typing.Optional[typing.Union[dict, list]]:
+        """
+        Request JSON data from the telemetry socket and parse it into
+        Python values.
+        """
+        if arg is not None:
+            u = f"{uri},{arg}"
+        else:
+            u = uri
+        self.sock.send(u.encode("utf-8"))
+        data = self.sock.recv(self.max_output_len)
+        return json.loads(data.decode("utf-8"))[uri]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.sock.close()
+
+
+MetricDescription = str
+MetricType = str
+MetricName = str
+MetricLabels = typing.Dict[str, typing.Any]
+MetricInfo = typing.Tuple[MetricDescription, MetricType]
+MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]
+
+
+class TelemetryEndpoint:
+    """
+    Placeholder class only used for typing annotations.
+    """
+
+    @staticmethod
+    def info() -> typing.Dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
+        """
+        Request data from sock and return it as metric values. Each metric
+        name must be present in info().
+        """
+        raise NotImplementedError()
+
+
+def load_endpoints(
+    paths: typing.List[str], names: typing.List[str]
+) -> typing.List[TelemetryEndpoint]:
+    """
+    Load selected telemetry endpoints from the specified paths.
+    """
+
+    endpoints = {}
+    dwb = sys.dont_write_bytecode
+    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
+
+    for p in paths:
+        if not os.path.isdir(p):
+            continue
+        for fname in os.listdir(p):
+            f = os.path.join(p, fname)
+            if os.path.isdir(f):
+                continue
+            try:
+                name, _ = os.path.splitext(fname)
+                if names is not None and name not in names:
+                    # not selected by user
+                    continue
+                if name in endpoints:
+                    # endpoint with same name already loaded
+                    continue
+                spec = importlib.util.spec_from_file_location(name, f)
+                module = importlib.util.module_from_spec(spec)
+                spec.loader.exec_module(module)
+                endpoints[name] = module
+            except Exception:
+                LOG.exception("parsing endpoint: %s", f)
+
+    sys.dont_write_bytecode = dwb
+
+    modules = []
+    info = {}
+    for name, module in sorted(endpoints.items()):
+        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
+        try:
+            for metric, (description, type_) in module.info().items():
+                info[(name, metric)] = (description, type_)
+            modules.append(module)
+        except Exception:
+            LOG.exception("getting endpoint info: %s", name)
+    return modules
+
+
+def serve_openmetrics(
+    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
+):
+    """
+    Start an HTTP server and serve requests in the openmetrics/prometheus
+    format.
+    """
+    listen = (args.output.hostname or "", int(args.output.port or 80))
+    with server.HTTPServer(listen, OpenmetricsHandler) as httpd:
+        httpd.dpdk_socket_path = args.socket_path
+        httpd.telemetry_endpoints = endpoints
+        LOG.info("listening on port %s", httpd.server_port)
+        httpd.serve_forever()
+
+
+class OpenmetricsHandler(server.BaseHTTPRequestHandler):
+    """
+    Basic HTTP handler that returns prometheus/openmetrics formatted responses.
+    """
+
+    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"
+
+    def escape(self, value: typing.Any) -> str:
+        """
+        Escape a metric label value.
+        """
+        value = str(value)
+        value = value.replace("\\", "\\\\")  # escape backslashes first
+        value = value.replace('"', '\\"')
+        return value.replace("\n", "\\n")
+
+    def do_GET(self):
+        """
+        Called upon GET requests.
+        """
+        try:
+            lines = []
+            metrics_names = set()
+            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
+                for e in self.server.telemetry_endpoints:
+                    info = e.info()
+                    metrics_lines = []
+                    for name, value, labels in e.metrics(sock):
+                        fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
+                        labels = ", ".join(
+                            f'{k}="{self.escape(v)}"' for k, v in labels.items()
+                        )
+                        if labels:
+                            labels = f"{{{labels}}}"
+                        metrics_lines.append(f"{fullname}{labels} {value}")
+                        if fullname not in metrics_names:
+                            metrics_names.add(fullname)
+                            desc, metric_type = info[name]
+                            lines += [
+                                f"# HELP {fullname} {desc}",
+                                f"# TYPE {fullname} {metric_type}",
+                            ]
+                    lines += metrics_lines
+            body = "\n".join(lines).encode("utf-8") + b"\n"
+            self.send_response(HTTPStatus.OK)
+            self.send_header("Content-Type", self.CONTENT_TYPE)
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+            LOG.info("%s %s", self.address_string(), self.requestline)
+
+        except Exception as e:
+            if isinstance(e, (FileNotFoundError, ConnectionRefusedError)):
+                self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
+            else:
+                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
+            LOG.exception("%s %s", self.address_string(), self.requestline)
+
+    def log_message(self, fmt, *args):
+        pass  # disable built-in logger
+
+
+def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
+    """
+    Collect all metrics and export them to a carbon server in the pickle format.
+    """
+    addr = (args.output.hostname or "", int(args.output.port or 80))
+    with TelemetrySocket(args.socket_path) as dpdk:
+        with socket.socket() as carbon:
+            carbon.connect(addr)
+            metrics = []
+            for e in endpoints:
+                for name, value, labels in e.metrics(dpdk):
+                    fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
+                    for key, val in labels.items():
+                        val = str(val).replace(";", "")
+                        fullname += f";{key}={val}"
+                    metrics.append((fullname, (time.time(), value)))
+            payload = pickle.dumps(metrics, protocol=2)
+            header = struct.pack("!L", len(payload))
+            buf = header + payload
+            carbon.sendall(buf)
+
+
+OUTPUT_FORMATS = {
+    "openmetrics": serve_openmetrics,
+    "prometheus": serve_openmetrics,
+    "carbon": export_carbon,
+    "graphite": export_carbon,
+}
+
+
+if __name__ == "__main__":
+    main()
diff --git a/usertools/meson.build b/usertools/meson.build
index 740b4832f36d..eb48e2f4403f 100644
--- a/usertools/meson.build
+++ b/usertools/meson.build
@@ -11,5 +11,11 @@ install_data([
             'dpdk-telemetry.py',
             'dpdk-hugepages.py',
             'dpdk-rss-flows.py',
+            'dpdk-telemetry-exporter.py',
         ],
         install_dir: 'bin')
+
+install_subdir(
+        'telemetry-endpoints',
+        install_dir: 'share/dpdk',
+        strip_directory: false)
diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py
new file mode 100644
index 000000000000..e17cffb43b2c
--- /dev/null
+++ b/usertools/telemetry-endpoints/counters.py
@@ -0,0 +1,47 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+RX_PACKETS = "rx_packets"
+RX_BYTES = "rx_bytes"
+RX_MISSED = "rx_missed"
+RX_NOMBUF = "rx_nombuf"
+RX_ERRORS = "rx_errors"
+TX_PACKETS = "tx_packets"
+TX_BYTES = "tx_bytes"
+TX_ERRORS = "tx_errors"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        RX_PACKETS: ("Number of successfully received packets.", "counter"),
+        RX_BYTES: ("Number of successfully received bytes.", "counter"),
+        RX_MISSED: (
+            "Number of packets dropped by the HW because Rx queues are full.",
+            "counter",
+        ),
+        RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"),
+        RX_ERRORS: ("Number of erroneous received packets.", "counter"),
+        TX_PACKETS: ("Number of successfully transmitted packets.", "counter"),
+        TX_BYTES: ("Number of successfully transmitted bytes.", "counter"),
+        TX_ERRORS: ("Number of packet transmission failures.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for port_id in sock.cmd("/ethdev/list"):
+        port = sock.cmd("/ethdev/info", port_id)
+        stats = sock.cmd("/ethdev/stats", port_id)
+        labels = {"port": port["name"]}
+        out += [
+            (RX_PACKETS, stats["ipackets"], labels),
+            (RX_BYTES, stats["ibytes"], labels),
+            (RX_MISSED, stats["imissed"], labels),
+            (RX_NOMBUF, stats["rx_nombuf"], labels),
+            (RX_ERRORS, stats["ierrors"], labels),
+            (TX_PACKETS, stats["opackets"], labels),
+            (TX_BYTES, stats["obytes"], labels),
+            (TX_ERRORS, stats["oerrors"], labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py
new file mode 100644
index 000000000000..d38d8d6e2558
--- /dev/null
+++ b/usertools/telemetry-endpoints/cpu.py
@@ -0,0 +1,29 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+CPU_TOTAL = "total_cycles"
+CPU_BUSY = "busy_cycles"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        CPU_TOTAL: ("Total number of CPU cycles.", "counter"),
+        CPU_BUSY: ("Number of busy CPU cycles.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for lcore_id in sock.cmd("/eal/lcore/list"):
+        lcore = sock.cmd("/eal/lcore/info", lcore_id)
+        cpu = ",".join(str(c) for c in lcore.get("cpuset", []))
+        total = lcore.get("total_cycles")
+        busy = lcore.get("busy_cycles", 0)
+        if not (cpu and total):
+            continue
+        labels = {"cpu": cpu, "numa": lcore.get("socket", 0)}
+        out += [
+            (CPU_TOTAL, total, labels),
+            (CPU_BUSY, busy, labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py
new file mode 100644
index 000000000000..32cce1e59382
--- /dev/null
+++ b/usertools/telemetry-endpoints/memory.py
@@ -0,0 +1,37 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+MEM_TOTAL = "total_bytes"
+MEM_USED = "used_bytes"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"),
+        MEM_USED: ("The currently used memory in bytes.", "gauge"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    zones = {}
+    used = 0
+    for zone in sock.cmd("/eal/memzone_list") or []:
+        z = sock.cmd("/eal/memzone_info", zone)
+        start = int(z["Hugepage_base"], 16)
+        end = start + (z["Hugepage_size"] * z["Hugepage_used"])
+        used += z["Length"]
+        for s, e in list(zones.items()):
+            if s < start < e < end:
+                zones[s] = end
+                break
+            if start < s < end < e:
+                del zones[s]
+                zones[start] = e
+                break
+        else:
+            zones[start] = end
+
+    return [
+        (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}),
+        (MEM_USED, max(0, used), {}),
+    ]
-- 
2.41.0


^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [RFC PATCH] usertools: add telemetry exporter
  2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry
@ 2023-11-20 13:26 ` Robin Jarry
  2024-03-27 15:18 ` Anthony Harivel
  2024-04-16 13:46 ` [PATCH v2] " Robin Jarry
  2 siblings, 0 replies; 7+ messages in thread
From: Robin Jarry @ 2023-11-20 13:26 UTC (permalink / raw)
  To: dev

Gentle ping.

Is anybody interested in this?



* [RFC PATCH] usertools: add telemetry exporter
  2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry
  2023-11-20 13:26 ` Robin Jarry
@ 2024-03-27 15:18 ` Anthony Harivel
  2024-04-01 21:28   ` Robin Jarry
  2024-04-16 13:46 ` [PATCH v2] " Robin Jarry
  2 siblings, 1 reply; 7+ messages in thread
From: Anthony Harivel @ 2024-03-27 15:18 UTC (permalink / raw)
  To: rjarry; +Cc: dev

Hi Robin, 

Thanks for this patch. I did test it and it works as expected. 
Nonetheless, maybe we can improve on some parts. 

In 'class  TelemetrySocket', there is:
...
self.sock.connect(path)
data = json.loads(self.sock.recv(1024).decode())
...

Maybe we can improve with something like: 

        try:
            rcv_data = self.sock.recv(1024)

            if rcv_data:
                data = json.loads(rcv_data.decode())
            else:
                print("No data received from socket.")
        except json.JSONDecodeError as e:
                print("Error decoding JSON:", e)
        except Exception as e:
                print("An error occurred:", e)

So that it handles the error cases a bit better.

In the same way to implement more robust error handling mechanisms in:
def load_endpoints
...
except Exception as e:
    LOG.error("Failed to load endpoint module '%s' from '%s': %s", name, f, e)
...

For example, you might catch FileNotFoundError, ImportError, or SyntaxError.
That could help to debug!


About TelemetryEndpoint I would see something like: 

class TelemetryEndpoint:
    """
    Placeholder class only used for typing annotations.
    """

    @staticmethod
    def info() -> typing.Dict[MetricName, MetricInfo]:
        """
        Mapping of metric names to their description and type.
        """
        raise NotImplementedError()

    @staticmethod
    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
        """
        Request data from sock and return it as metric values. Each metric
        name must be present in info().
        """
        try:
            metrics = []
            metrics_data = sock.fetch_metrics_data()
            for metric_name, metric_value in metrics_data.items():
                metrics.append((metric_name, metric_value, {}))
            return metrics
        except Exception as e:
            LOG.error("Failed to fetch metrics data: %s", e)
            # If unable to fetch metrics data, return an empty list
            return []

With these changes, the metrics method of the TelemetryEndpoint class could
handle errors better and the exporter can continue functioning even if there
are issues with fetching metrics data.

I don't know if all of that makes sense or if it's just nitpicking!
I can also propose an enhanced version of your patch if you prefer.

Regards,
Anthony



* Re: [RFC PATCH] usertools: add telemetry exporter
  2024-03-27 15:18 ` Anthony Harivel
@ 2024-04-01 21:28   ` Robin Jarry
  0 siblings, 0 replies; 7+ messages in thread
From: Robin Jarry @ 2024-04-01 21:28 UTC (permalink / raw)
  To: Anthony Harivel; +Cc: dev

Anthony Harivel, Mar 27, 2024 at 16:18:
> Hi Robin,
>
> Thanks for this patch. I did test it and it works as expected. 
> Nonetheless, maybe we can improve on some parts.

Hey Anthony, thanks a lot for testing!

> In 'class  TelemetrySocket', there is:
> ...
> self.sock.connect(path)
> data = json.loads(self.sock.recv(1024).decode())
> ...
>
> Maybe we can improve with something like:
>
>         try:
>             rcv_data = self.sock.recv(1024)
>             if rcv_data:
>                 data = json.loads(rcv_data.decode())
>             else:
>                 print("No data received from socket.")
>         except json.JSONDecodeError as e:
>                 print("Error decoding JSON:", e)
>         except Exception as e:
>                 print("An error occurred:", e)
>
> So that it handles a bit better the error cases.

This is undesired as it would actually mask the errors from the calling 
code. Unless you can do something about the error, printing it should be 
left to the calling code. I will handle these errors better in v2.

> In the same way to implement more robust error handling mechanisms in:
> def load_endpoints
> ...
> except Exception as e:
>     LOG.error("Failed to load endpoint module '%s' from '%s': %s", name, f, e)
> ...
>
> For example, you might catch FileNotFoundError, ImportError, or SyntaxError.
> That could help to debug!

We could print the whole stack trace but I don't see what special 
handling could be done depending on the exception. I will try to make 
this better for v2.

> About TelemetryEndpoint I would see something like:
>
> class TelemetryEndpoint:
>     """
>     Placeholder class only used for typing annotations.
>     """
>
>     @staticmethod
>     def info() -> typing.Dict[MetricName, MetricInfo]:
>         """
>         Mapping of metric names to their description and type.
>         """
>         raise NotImplementedError()
>
>     @staticmethod
>     def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
>         """
>         Request data from sock and return it as metric values. Each metric
>         name must be present in info().
>         """
>         try:
>             metrics = []
>             metrics_data = sock.fetch_metrics_data()
>             for metric_name, metric_value in metrics_data.items():
>                 metrics.append((metric_name, metric_value, {}))
>             return metrics
>         except Exception as e:
>             LOG.error("Failed to fetch metrics data: %s", e)
>             # If unable to fetch metrics data, return an empty list
>             return []
>
> With these changes, the metrics method of the TelemetryEndpoint class 
> could handle errors better and the exporter can continue functioning 
> even if there are issues with fetching metrics data.
>
> I don't know if all of that makes sense or if it's just nitpicking!
> I can also propose an enhanced version of your patch if you prefer.

As indicated in the docstring, this class is merely a placeholder. Its 
code is never executed. It only stands to represent what functions must 
be implemented in endpoints.

However, your point is valid. I will update my code to handle errors in 
endpoints more gracefully and avoid failing the whole request if there 
is only one error.

Thanks for the thorough review!



* [PATCH v2] usertools: add telemetry exporter
  2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry
  2023-11-20 13:26 ` Robin Jarry
  2024-03-27 15:18 ` Anthony Harivel
@ 2024-04-16 13:46 ` Robin Jarry
  2024-04-22  7:17   ` Anthony Harivel
  2024-07-09  4:50   ` David Marchand
  2 siblings, 2 replies; 7+ messages in thread
From: Robin Jarry @ 2024-04-16 13:46 UTC (permalink / raw)
  To: dev; +Cc: Anthony Harivel

For now the telemetry socket is local to the machine running a DPDK
application. Also, there is no official "schema" for the exposed
metrics. Add a framework and a script to collect and expose these
metrics to telemetry and observability aggregators such as Prometheus,
Carbon or InfluxDB. The exposed data must be designed with end-users in
mind; some DPDK terminology or internals may not make sense to everyone.

The script only serves as an entry point and does not know anything
about any specific metrics or JSON data structures exposed on the
telemetry socket.

It uses dynamically loaded endpoint exporters which are basic python
files that must implement two functions:

 def info() -> dict[MetricName, MetricInfo]:
     Mapping of metric names to their description and type.

 def metrics(sock: TelemetrySocket) -> list[MetricValue]:
     Request data from sock and return it as metric values. A metric
     value is a 3-tuple: (name: str, value: any, labels: dict). Each
     name must be present in info().

The sock argument passed to metrics() has a single method:

 def cmd(self, uri: str, arg: any = None) -> dict | list:
     Request JSON data from the telemetry socket and parse it into
     Python values.

The main script invokes endpoints and exports the data into an output
format. For now, only two formats are implemented:

* openmetrics/prometheus: text-based format exported via a local HTTP
  server.
* carbon/graphite: binary (Python pickle) format exported to a remote
  carbon TCP server.

As a starting point, 3 built-in endpoints are implemented:

* counters: ethdev hardware counters
* cpu: lcore usage
* memory: overall memory usage

The goal is to keep all built-in endpoints in the DPDK repository so
that they can be updated along with the telemetry JSON data structures.

Example output for the openmetrics:// format:

 ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 &
 INFO using endpoint: counters (from .../telemetry-endpoints/counters.py)
 INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py)
 INFO using endpoint: memory (from .../telemetry-endpoints/memory.py)
 INFO listening on port 9876
 [1] 838829

 ~$ curl http://127.0.0.1:9876/
 # HELP dpdk_cpu_total_cycles Total number of CPU cycles.
 # TYPE dpdk_cpu_total_cycles counter
 # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles.
 # TYPE dpdk_cpu_busy_cycles counter
 dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980
 dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860
 dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740
 dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860
 dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540
 dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160
 dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320
 dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860
 # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes.
 # TYPE dpdk_memory_total_bytes gauge
 # HELP dpdk_memory_used_bytes The currently used memory in bytes.
 # TYPE dpdk_memory_used_bytes gauge
 dpdk_memory_total_bytes 1073741824
 dpdk_memory_used_bytes 794197376

Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format
Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus
Signed-off-by: Robin Jarry <rjarry@redhat.com>
---

Notes:
    v2:
    
    * Refuse to run if no endpoints are enabled.
    * Handle endpoint errors gracefully without failing the whole query.

 usertools/dpdk-telemetry-exporter.py      | 405 ++++++++++++++++++++++
 usertools/meson.build                     |   6 +
 usertools/telemetry-endpoints/counters.py |  47 +++
 usertools/telemetry-endpoints/cpu.py      |  29 ++
 usertools/telemetry-endpoints/memory.py   |  37 ++
 5 files changed, 524 insertions(+)
 create mode 100755 usertools/dpdk-telemetry-exporter.py
 create mode 100644 usertools/telemetry-endpoints/counters.py
 create mode 100644 usertools/telemetry-endpoints/cpu.py
 create mode 100644 usertools/telemetry-endpoints/memory.py

diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py
new file mode 100755
index 000000000000..f8d873ad856c
--- /dev/null
+++ b/usertools/dpdk-telemetry-exporter.py
@@ -0,0 +1,405 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+r'''
+DPDK telemetry exporter.
+
+It uses dynamically loaded endpoint exporters which are basic python files that
+must implement two functions:
+
+    def info() -> dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+
+    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
+        """
+        Request data from sock and return it as metric values. A metric value
+        is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
+        present in info().
+        """
+
+The sock argument passed to metrics() has a single method:
+
+    def cmd(self, uri, arg=None) -> dict | list:
+        """
+        Request JSON data from the telemetry socket and parse it into
+        python values.
+        """
+
+See existing endpoints for examples.
+
+The exporter supports multiple output formats:
+
+prometheus://ADDRESS:PORT
+openmetrics://ADDRESS:PORT
+  Expose the enabled endpoints via a local HTTP server listening on the
+  specified address and port. GET requests on that server are served with
+  text/plain responses in the prometheus/openmetrics format.
+
+  More details:
+  https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
+
+carbon://ADDRESS:PORT
+graphite://ADDRESS:PORT
+  Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle
+  carbon format.
+
+  More details:
+  https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
+'''
+
+import argparse
+import importlib.util
+import json
+import logging
+import os
+import pickle
+import re
+import socket
+import struct
+import sys
+import time
+import typing
+from http import HTTPStatus, server
+from urllib.parse import urlparse
+
+LOG = logging.getLogger(__name__)
+# Use local endpoints path only when running from source
+LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
+DEFAULT_LOAD_PATHS = []
+if os.path.isdir(LOCAL):
+    DEFAULT_LOAD_PATHS.append(LOCAL)
+DEFAULT_LOAD_PATHS += [
+    "/usr/local/share/dpdk/telemetry-endpoints",
+    "/usr/share/dpdk/telemetry-endpoints",
+]
+DEFAULT_OUTPUT = "openmetrics://:9876"
+
+
+def main():
+    logging.basicConfig(
+        stream=sys.stdout,
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument(
+        "-o",
+        "--output",
+        metavar="FORMAT://PARAMETERS",
+        default=urlparse(DEFAULT_OUTPUT),
+        type=urlparse,
+        help=f"""
+        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
+        URL elements have different meanings. By default, the exporter starts a
+        local HTTP server on port 9876 that serves requests in the
+        prometheus/openmetrics plain text format.
+        """,
+    )
+    parser.add_argument(
+        "-p",
+        "--load-path",
+        dest="load_paths",
+        type=lambda v: v.split(os.pathsep),
+        default=DEFAULT_LOAD_PATHS,
+        help=f"""
+        The list of paths from which to discover endpoints.
+        (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}").
+        """,
+    )
+    parser.add_argument(
+        "-e",
+        "--endpoint",
+        dest="endpoints",
+        metavar="ENDPOINT",
+        action="append",
+        help="""
+        Telemetry endpoint to export (by default, all discovered endpoints are
+        enabled). This option can be specified more than once.
+        """,
+    )
+    parser.add_argument(
+        "-l",
+        "--list",
+        action="store_true",
+        help="""
+        Only list detected endpoints and exit.
+        """,
+    )
+    parser.add_argument(
+        "-s",
+        "--socket-path",
+        default="/run/dpdk/rte/dpdk_telemetry.v2",
+        help="""
+        The DPDK telemetry socket path (default: "%(default)s").
+        """,
+    )
+    args = parser.parse_args()
+    output = OUTPUT_FORMATS.get(args.output.scheme)
+    if output is None:
+        parser.error(f"unsupported output format: {args.output.scheme}://")
+
+    try:
+        endpoints = load_endpoints(args.load_paths, args.endpoints)
+        if args.list:
+            return
+    except Exception as e:
+        parser.error(str(e))
+
+    output(args, endpoints)
+
+
+class TelemetrySocket:
+    """
+    Abstraction of the DPDK telemetry socket.
+    """
+
+    def __init__(self, path: str):
+        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        self.sock.connect(path)
+        data = json.loads(self.sock.recv(1024).decode())
+        self.max_output_len = data["max_output_len"]
+
+    def cmd(
+        self, uri: str, arg: typing.Any = None
+    ) -> typing.Optional[typing.Union[dict, list]]:
+        """
+        Request JSON data from the telemetry socket and parse it into
+        python values.
+        """
+        if arg is not None:
+            u = f"{uri},{arg}"
+        else:
+            u = uri
+        self.sock.send(u.encode("utf-8"))
+        data = self.sock.recv(self.max_output_len)
+        return json.loads(data.decode("utf-8"))[uri]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.sock.close()
+
+
+MetricDescription = str
+MetricType = str
+MetricName = str
+MetricLabels = typing.Dict[str, typing.Any]
+MetricInfo = typing.Tuple[MetricDescription, MetricType]
+MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]
+
+
+class TelemetryEndpoint:
+    """
+    Placeholder class only used for typing annotations.
+    """
+
+    @staticmethod
+    def info() -> typing.Dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
+        """
+        Request data from sock and return it as metric values. Each metric
+        name must be present in info().
+        """
+        raise NotImplementedError()
+
+
+def load_endpoints(
+    paths: typing.List[str], names: typing.List[str]
+) -> typing.List[TelemetryEndpoint]:
+    """
+    Load selected telemetry endpoints from the specified paths.
+    """
+
+    endpoints = {}
+    dwb = sys.dont_write_bytecode
+    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
+
+    for p in paths:
+        if not os.path.isdir(p):
+            continue
+        for fname in os.listdir(p):
+            f = os.path.join(p, fname)
+            if os.path.isdir(f):
+                continue
+            try:
+                name, _ = os.path.splitext(fname)
+                if names is not None and name not in names:
+                    # not selected by user
+                    continue
+                if name in endpoints:
+                    # endpoint with same name already loaded
+                    continue
+                spec = importlib.util.spec_from_file_location(name, f)
+                module = importlib.util.module_from_spec(spec)
+                spec.loader.exec_module(module)
+                endpoints[name] = module
+            except Exception:
+                LOG.exception("parsing endpoint: %s", f)
+
+    sys.dont_write_bytecode = dwb
+
+    if not endpoints:
+        raise Exception("no telemetry endpoints detected/selected")
+
+    modules = []
+    info = {}
+    for name, module in sorted(endpoints.items()):
+        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
+        try:
+            for metric, (description, type_) in module.info().items():
+                info[(name, metric)] = (description, type_)
+            modules.append(module)
+        except Exception:
+            LOG.exception("getting endpoint info: %s", name)
+    return modules
+
+
+def serve_openmetrics(
+    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
+):
+    """
+    Start an HTTP server and serve requests in the openmetrics/prometheus
+    format.
+    """
+    listen = (args.output.hostname or "", int(args.output.port or 80))
+    with server.HTTPServer(listen, OpenmetricsHandler) as httpd:
+        httpd.dpdk_socket_path = args.socket_path
+        httpd.telemetry_endpoints = endpoints
+        LOG.info("listening on port %s", httpd.server_port)
+        try:
+            httpd.serve_forever()
+        except KeyboardInterrupt:
+            LOG.info("shutting down")
+
+
+class OpenmetricsHandler(server.BaseHTTPRequestHandler):
+    """
+    Basic HTTP handler that returns prometheus/openmetrics formatted responses.
+    """
+
+    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"
+
+    def escape(self, value: typing.Any) -> str:
+        """
+        Escape a metric label value.
+        """
+        value = str(value)
+        value = value.replace("\\", "\\\\")
+        value = value.replace('"', '\\"')
+        return value.replace("\n", "\\n")
+
+    def do_GET(self):
+        """
+        Called upon GET requests.
+        """
+        try:
+            lines = []
+            metrics_names = set()
+            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
+                for e in self.server.telemetry_endpoints:
+                    info = e.info()
+                    metrics_lines = []
+                    try:
+                        metrics = e.metrics(sock)
+                    except Exception:
+                        LOG.exception("%s: metrics collection failed", e.__name__)
+                        continue
+                    for name, value, labels in metrics:
+                        fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
+                        labels = ", ".join(
+                            f'{k}="{self.escape(v)}"' for k, v in labels.items()
+                        )
+                        if labels:
+                            labels = f"{{{labels}}}"
+                        metrics_lines.append(f"{fullname}{labels} {value}")
+                        if fullname not in metrics_names:
+                            metrics_names.add(fullname)
+                            desc, metric_type = info[name]
+                            lines += [
+                                f"# HELP {fullname} {desc}",
+                                f"# TYPE {fullname} {metric_type}",
+                            ]
+                    lines += metrics_lines
+            if not lines:
+                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
+                LOG.error(
+                    "%s %s: no metrics collected",
+                    self.address_string(),
+                    self.requestline,
+                )
+                return
+            body = "\n".join(lines).encode("utf-8") + b"\n"
+            self.send_response(HTTPStatus.OK)
+            self.send_header("Content-Type", self.CONTENT_TYPE)
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+            LOG.info("%s %s", self.address_string(), self.requestline)
+
+        except (FileNotFoundError, ConnectionRefusedError):
+            self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
+            LOG.exception(
+                "%s %s: telemetry socket not available",
+                self.address_string(),
+                self.requestline,
+            )
+        except Exception:
+            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
+            LOG.exception("%s %s", self.address_string(), self.requestline)
+
+    def log_message(self, fmt, *args):
+        pass  # disable built-in logger
+
+
+def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
+    """
+    Collect all metrics and export them to a carbon server in the pickle format.
+    """
+    addr = (args.output.hostname or "", int(args.output.port or 80))
+    with TelemetrySocket(args.socket_path) as dpdk:
+        with socket.socket() as carbon:
+            carbon.connect(addr)
+            all_metrics = []
+            for e in endpoints:
+                try:
+                    metrics = e.metrics(dpdk)
+                except Exception:
+                    LOG.exception("%s: metrics collection failed", e.__name__)
+                    continue
+                for name, value, labels in metrics:
+                    fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
+                    for key, val in labels.items():
+                        val = str(val).replace(";", "")
+                        fullname += f";{key}={val}"
+                    all_metrics.append((fullname, (time.time(), value)))
+            if not all_metrics:
+                raise Exception("no metrics collected")
+            payload = pickle.dumps(all_metrics, protocol=2)
+            header = struct.pack("!L", len(payload))
+            buf = header + payload
+            carbon.sendall(buf)
+
+
+OUTPUT_FORMATS = {
+    "openmetrics": serve_openmetrics,
+    "prometheus": serve_openmetrics,
+    "carbon": export_carbon,
+    "graphite": export_carbon,
+}
+
+
+if __name__ == "__main__":
+    main()
diff --git a/usertools/meson.build b/usertools/meson.build
index 740b4832f36d..eb48e2f4403f 100644
--- a/usertools/meson.build
+++ b/usertools/meson.build
@@ -11,5 +11,11 @@ install_data([
             'dpdk-telemetry.py',
             'dpdk-hugepages.py',
             'dpdk-rss-flows.py',
+            'dpdk-telemetry-exporter.py',
         ],
         install_dir: 'bin')
+
+install_subdir(
+        'telemetry-endpoints',
+        install_dir: 'share/dpdk',
+        strip_directory: false)
diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py
new file mode 100644
index 000000000000..e17cffb43b2c
--- /dev/null
+++ b/usertools/telemetry-endpoints/counters.py
@@ -0,0 +1,47 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+RX_PACKETS = "rx_packets"
+RX_BYTES = "rx_bytes"
+RX_MISSED = "rx_missed"
+RX_NOMBUF = "rx_nombuf"
+RX_ERRORS = "rx_errors"
+TX_PACKETS = "tx_packets"
+TX_BYTES = "tx_bytes"
+TX_ERRORS = "tx_errors"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        RX_PACKETS: ("Number of successfully received packets.", "counter"),
+        RX_BYTES: ("Number of successfully received bytes.", "counter"),
+        RX_MISSED: (
+            "Number of packets dropped by the HW because Rx queues are full.",
+            "counter",
+        ),
+        RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"),
+        RX_ERRORS: ("Number of erroneous received packets.", "counter"),
+        TX_PACKETS: ("Number of successfully transmitted packets.", "counter"),
+        TX_BYTES: ("Number of successfully transmitted bytes.", "counter"),
+        TX_ERRORS: ("Number of packet transmission failures.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for port_id in sock.cmd("/ethdev/list"):
+        port = sock.cmd("/ethdev/info", port_id)
+        stats = sock.cmd("/ethdev/stats", port_id)
+        labels = {"port": port["name"]}
+        out += [
+            (RX_PACKETS, stats["ipackets"], labels),
+            (RX_BYTES, stats["ibytes"], labels),
+            (RX_MISSED, stats["imissed"], labels),
+            (RX_NOMBUF, stats["rx_nombuf"], labels),
+            (RX_ERRORS, stats["ierrors"], labels),
+            (TX_PACKETS, stats["opackets"], labels),
+            (TX_BYTES, stats["obytes"], labels),
+            (TX_ERRORS, stats["oerrors"], labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py
new file mode 100644
index 000000000000..d38d8d6e2558
--- /dev/null
+++ b/usertools/telemetry-endpoints/cpu.py
@@ -0,0 +1,29 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+CPU_TOTAL = "total_cycles"
+CPU_BUSY = "busy_cycles"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        CPU_TOTAL: ("Total number of CPU cycles.", "counter"),
+        CPU_BUSY: ("Number of busy CPU cycles.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for lcore_id in sock.cmd("/eal/lcore/list"):
+        lcore = sock.cmd("/eal/lcore/info", lcore_id)
+        cpu = ",".join(str(c) for c in lcore.get("cpuset", []))
+        total = lcore.get("total_cycles")
+        busy = lcore.get("busy_cycles", 0)
+        if not (cpu and total):
+            continue
+        labels = {"cpu": cpu, "numa": lcore.get("socket", 0)}
+        out += [
+            (CPU_TOTAL, total, labels),
+            (CPU_BUSY, busy, labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py
new file mode 100644
index 000000000000..32cce1e59382
--- /dev/null
+++ b/usertools/telemetry-endpoints/memory.py
@@ -0,0 +1,37 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+MEM_TOTAL = "total_bytes"
+MEM_USED = "used_bytes"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"),
+        MEM_USED: ("The currently used memory in bytes.", "gauge"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    zones = {}
+    used = 0
+    for zone in sock.cmd("/eal/memzone_list") or []:
+        z = sock.cmd("/eal/memzone_info", zone)
+        start = int(z["Hugepage_base"], 16)
+        end = start + (z["Hugepage_size"] * z["Hugepage_used"])
+        used += z["Length"]
+        for s, e in list(zones.items()):
+            if s < start < e < end:
+                zones[s] = end
+                break
+            if start < s < end < e:
+                del zones[s]
+                zones[start] = e
+                break
+        else:
+            zones[start] = end
+
+    return [
+        (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}),
+        (MEM_USED, max(0, used), {}),
+    ]
-- 
2.44.0


^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH v2] usertools: add telemetry exporter
  2024-04-16 13:46 ` [PATCH v2] " Robin Jarry
@ 2024-04-22  7:17   ` Anthony Harivel
  2024-07-09  4:50   ` David Marchand
  1 sibling, 0 replies; 7+ messages in thread
From: Anthony Harivel @ 2024-04-22  7:17 UTC (permalink / raw)
  To: Robin Jarry, dev

Hi Robin,

I've tested your patch and it is all good for me.
The errors are handled better now.

The most common error is when the telemetry socket is closed and you
get:

2024-04-19 15:22:00 ERROR 192.168.122.116 GET /metrics HTTP/1.1: telemetry socket not available
Traceback (most recent call last):                            
  File "/usr/bin/dpdk-telemetry-exporter.py", line 312, in do_GET
    with TelemetrySocket(self.server.dpdk_socket_path) as sock: 
  File "/usr/bin/dpdk-telemetry-exporter.py", line 165, in __init__
    self.sock.connect(path)                                   
FileNotFoundError: [Errno 2] No such file or directory

You get the Python traceback, which is a bit useless for the user, but
at least the first line gives the root cause:
"telemetry socket not available", which is IMO the most important part.

Thanks for your patch!

Tested-by: Anthony Harivel <aharivel@redhat.com>

Regards,
Anthony

Robin Jarry, Apr 16, 2024 at 15:46:
> For now the telemetry socket is local to the machine running a DPDK
> application. Also, there is no official "schema" for the exposed
> metrics. Add a framework and a script to collect and expose these
> metrics to telemetry and observability aggregators such as Prometheus,
> Carbon or InfluxDB. The exposed data must be designed with end-users in
> mind; some DPDK terminology or internals may not make sense to everyone.
>
> The script only serves as an entry point and does not know anything
> about any specific metrics nor JSON data structures exposed in the
> telemetry socket.
>
> It uses dynamically loaded endpoint exporters which are basic python
> files that must implement two functions:
>
>  def info() -> dict[MetricName, MetricInfo]:
>      Mapping of metric names to their description and type.
>
>  def metrics(sock: TelemetrySocket) -> list[MetricValue]:
>      Request data from sock and return it as metric values. A metric
>      value is a 3-tuple: (name: str, value: any, labels: dict). Each
>      name must be present in info().
>
> The sock argument passed to metrics() has a single method:
>
>  def cmd(self, uri: str, arg: any = None) -> dict | list:
>      Request JSON data to the telemetry socket and parse it to python
>      values.
>
> The main script invokes endpoints and exports the data into an output
> format. For now, only two formats are implemented:
>
> * openmetrics/prometheus: text based format exported via a local HTTP
>   server.
> * carbon/graphite: binary (python pickle) format exported to a distant
>   carbon TCP server.
>
> As a starting point, 3 built-in endpoints are implemented:
>
> * counters: ethdev hardware counters
> * cpu: lcore usage
> * memory: overall memory usage
>
> The goal is to keep all built-in endpoints in the DPDK repository so
> that they can be updated along with the telemetry JSON data structures.
>
> Example output for the openmetrics:// format:
>
>  ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 &
>  INFO using endpoint: counters (from .../telemetry-endpoints/counters.py)
>  INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py)
>  INFO using endpoint: memory (from .../telemetry-endpoints/memory.py)
>  INFO listening on port 9876
>  [1] 838829
>
>  ~$ curl http://127.0.0.1:9876/
>  # HELP dpdk_cpu_total_cycles Total number of CPU cycles.
>  # TYPE dpdk_cpu_total_cycles counter
>  # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles.
>  # TYPE dpdk_cpu_busy_cycles counter
>  dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980
>  dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860
>  dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740
>  dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860
>  dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540
>  dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160
>  dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320
>  dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860
>  # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes.
>  # TYPE dpdk_memory_total_bytes gauge
>  # HELP dpdk_memory_used_bytes The currently used memory in bytes.
>  # TYPE dpdk_memory_used_bytes gauge
>  dpdk_memory_total_bytes 1073741824
>  dpdk_memory_used_bytes 794197376
>
> Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
> Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format
> Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
> Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus
> Signed-off-by: Robin Jarry <rjarry@redhat.com>
> ---
>
> Notes:
>     v2:
>     
>     * Refuse to run if no endpoints are enabled.
>     * Handle endpoint errors gracefully without failing the whole query.
>
>  usertools/dpdk-telemetry-exporter.py      | 405 ++++++++++++++++++++++
>  usertools/meson.build                     |   6 +
>  usertools/telemetry-endpoints/counters.py |  47 +++
>  usertools/telemetry-endpoints/cpu.py      |  29 ++
>  usertools/telemetry-endpoints/memory.py   |  37 ++
>  5 files changed, 524 insertions(+)
>  create mode 100755 usertools/dpdk-telemetry-exporter.py
>  create mode 100644 usertools/telemetry-endpoints/counters.py
>  create mode 100644 usertools/telemetry-endpoints/cpu.py
>  create mode 100644 usertools/telemetry-endpoints/memory.py
>
> diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py
> new file mode 100755
> index 000000000000..f8d873ad856c
> --- /dev/null
> +++ b/usertools/dpdk-telemetry-exporter.py
> @@ -0,0 +1,405 @@
> +#!/usr/bin/env python3
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright (c) 2023 Robin Jarry
> +
> +r'''
> +DPDK telemetry exporter.
> +
> +It uses dynamically loaded endpoint exporters which are basic python files that
> +must implement two functions:
> +
> +    def info() -> dict[MetricName, MetricInfo]:
> +        """
> +        Mapping of metric names to their description and type.
> +        """
> +
> +    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
> +        """
> +        Request data from sock and return it as metric values. A metric value
> +        is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
> +        present in info().
> +        """
> +
> +The sock argument passed to metrics() has a single method:
> +
> +    def cmd(self, uri, arg=None) -> dict | list:
> +        """
> +        Request JSON data to the telemetry socket and parse it to python
> +        values.
> +        """
> +
> +See existing endpoints for examples.
> +
> +The exporter supports multiple output formats:
> +
> +prometheus://ADDRESS:PORT
> +openmetrics://ADDRESS:PORT
> +  Expose the enabled endpoints via a local HTTP server listening on the
> +  specified address and port. GET requests on that server are served with
> +  text/plain responses in the prometheus/openmetrics format.
> +
> +  More details:
> +  https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
> +
> +carbon://ADDRESS:PORT
> +graphite://ADDRESS:PORT
> +  Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle
> +  carbon format.
> +
> +  More details:
> +  https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
> +'''
> +
> +import argparse
> +import importlib.util
> +import json
> +import logging
> +import os
> +import pickle
> +import re
> +import socket
> +import struct
> +import sys
> +import time
> +import typing
> +from http import HTTPStatus, server
> +from urllib.parse import urlparse
> +
> +LOG = logging.getLogger(__name__)
> +# Use local endpoints path only when running from source
> +LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
> +DEFAULT_LOAD_PATHS = []
> +if os.path.isdir(LOCAL):
> +    DEFAULT_LOAD_PATHS.append(LOCAL)
> +DEFAULT_LOAD_PATHS += [
> +    "/usr/local/share/dpdk/telemetry-endpoints",
> +    "/usr/share/dpdk/telemetry-endpoints",
> +]
> +DEFAULT_OUTPUT = "openmetrics://:9876"
> +
> +
> +def main():
> +    logging.basicConfig(
> +        stream=sys.stdout,
> +        level=logging.INFO,
> +        format="%(asctime)s %(levelname)s %(message)s",
> +        datefmt="%Y-%m-%d %H:%M:%S",
> +    )
> +    parser = argparse.ArgumentParser(
> +        description=__doc__,
> +        formatter_class=argparse.RawDescriptionHelpFormatter,
> +    )
> +    parser.add_argument(
> +        "-o",
> +        "--output",
> +        metavar="FORMAT://PARAMETERS",
> +        default=urlparse(DEFAULT_OUTPUT),
> +        type=urlparse,
> +        help=f"""
> +        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
> +        URL elements have different meanings. By default, the exporter starts a
> +        local HTTP server on port 9876 that serves requests in the
> +        prometheus/openmetrics plain text format.
> +        """,
> +    )
> +    parser.add_argument(
> +        "-p",
> +        "--load-path",
> +        dest="load_paths",
> +        type=lambda v: v.split(os.pathsep),
> +        default=DEFAULT_LOAD_PATHS,
> +        help=f"""
> +        The list of paths from which to discover endpoints.
> +        (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}").
> +        """,
> +    )
> +    parser.add_argument(
> +        "-e",
> +        "--endpoint",
> +        dest="endpoints",
> +        metavar="ENDPOINT",
> +        action="append",
> +        help="""
> +        Telemetry endpoint to export (by default, all discovered endpoints are
> +        enabled). This option can be specified more than once.
> +        """,
> +    )
> +    parser.add_argument(
> +        "-l",
> +        "--list",
> +        action="store_true",
> +        help="""
> +        Only list detected endpoints and exit.
> +        """,
> +    )
> +    parser.add_argument(
> +        "-s",
> +        "--socket-path",
> +        default="/run/dpdk/rte/dpdk_telemetry.v2",
> +        help="""
> +        The DPDK telemetry socket path (default: "%(default)s").
> +        """,
> +    )
> +    args = parser.parse_args()
> +    output = OUTPUT_FORMATS.get(args.output.scheme)
> +    if output is None:
> +        parser.error(f"unsupported output format: {args.output.scheme}://")
> +
> +    try:
> +        endpoints = load_endpoints(args.load_paths, args.endpoints)
> +        if args.list:
> +            return
> +    except Exception as e:
> +        parser.error(str(e))
> +
> +    output(args, endpoints)
> +
> +
> +class TelemetrySocket:
> +    """
> +    Abstraction of the DPDK telemetry socket.
> +    """
> +
> +    def __init__(self, path: str):
> +        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
> +        self.sock.connect(path)
> +        data = json.loads(self.sock.recv(1024).decode())
> +        self.max_output_len = data["max_output_len"]
> +
> +    def cmd(
> +        self, uri: str, arg: typing.Any = None
> +    ) -> typing.Optional[typing.Union[dict, list]]:
> +        """
> +        Request JSON data from the telemetry socket and parse it into
> +        Python values.
> +        """
> +        if arg is not None:
> +            u = f"{uri},{arg}"
> +        else:
> +            u = uri
> +        self.sock.send(u.encode("utf-8"))
> +        data = self.sock.recv(self.max_output_len)
> +        return json.loads(data.decode("utf-8"))[uri]
> +
> +    def __enter__(self):
> +        return self
> +
> +    def __exit__(self, *args, **kwargs):
> +        self.sock.close()
> +
> +
> +MetricDescription = str
> +MetricType = str
> +MetricName = str
> +MetricLabels = typing.Dict[str, typing.Any]
> +MetricInfo = typing.Tuple[MetricDescription, MetricType]
> +MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]
> +
> +
> +class TelemetryEndpoint:
> +    """
> +    Placeholder class only used for typing annotations.
> +    """
> +
> +    @staticmethod
> +    def info() -> typing.Dict[MetricName, MetricInfo]:
> +        """
> +        Mapping of metric names to their description and type.
> +        """
> +        raise NotImplementedError()
> +
> +    @staticmethod
> +    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
> +        """
> +        Request data from sock and return it as metric values. Each metric
> +        name must be present in info().
> +        """
> +        raise NotImplementedError()
> +
> +
> +def load_endpoints(
> +    paths: typing.List[str], names: typing.List[str]
> +) -> typing.List[TelemetryEndpoint]:
> +    """
> +    Load selected telemetry endpoints from the specified paths.
> +    """
> +
> +    endpoints = {}
> +    dwb = sys.dont_write_bytecode
> +    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
> +
> +    for p in paths:
> +        if not os.path.isdir(p):
> +            continue
> +        for fname in os.listdir(p):
> +            f = os.path.join(p, fname)
> +            if os.path.isdir(f):
> +                continue
> +            try:
> +                name, _ = os.path.splitext(fname)
> +                if names is not None and name not in names:
> +                    # not selected by user
> +                    continue
> +                if name in endpoints:
> +                    # endpoint with same name already loaded
> +                    continue
> +                spec = importlib.util.spec_from_file_location(name, f)
> +                module = importlib.util.module_from_spec(spec)
> +                spec.loader.exec_module(module)
> +                endpoints[name] = module
> +            except Exception:
> +                LOG.exception("parsing endpoint: %s", f)
> +
> +    if not endpoints:
> +        raise Exception("no telemetry endpoints detected/selected")
> +
> +    sys.dont_write_bytecode = dwb
> +
> +    modules = []
> +    info = {}
> +    for name, module in sorted(endpoints.items()):
> +        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
> +        try:
> +            for metric, (description, type_) in module.info().items():
> +                info[(name, metric)] = (description, type_)
> +            modules.append(module)
> +        except Exception:
> +            LOG.exception("getting endpoint info: %s", name)
> +    return modules
> +
> +
> +def serve_openmetrics(
> +    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
> +):
> +    """
> +    Start an HTTP server and serve requests in the openmetrics/prometheus
> +    format.
> +    """
> +    listen = (args.output.hostname or "", int(args.output.port or 80))
> +    with server.HTTPServer(listen, OpenmetricsHandler) as httpd:
> +        httpd.dpdk_socket_path = args.socket_path
> +        httpd.telemetry_endpoints = endpoints
> +        LOG.info("listening on port %s", httpd.server_port)
> +        try:
> +            httpd.serve_forever()
> +        except KeyboardInterrupt:
> +            LOG.info("shutting down")
> +
> +
> +class OpenmetricsHandler(server.BaseHTTPRequestHandler):
> +    """
> +    Basic HTTP handler that returns prometheus/openmetrics formatted responses.
> +    """
> +
> +    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"
> +
> +    def escape(self, value: typing.Any) -> str:
> +        """
> +        Escape a metric label value.
> +        """
> +        value = str(value)
> +        value = value.replace("\\", "\\\\")
> +        value = value.replace('"', '\\"')
> +        return value.replace("\n", "\\n")
> +
> +    def do_GET(self):
> +        """
> +        Called upon GET requests.
> +        """
> +        try:
> +            lines = []
> +            metrics_names = set()
> +            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
> +                for e in self.server.telemetry_endpoints:
> +                    info = e.info()
> +                    metrics_lines = []
> +                    try:
> +                        metrics = e.metrics(sock)
> +                    except Exception:
> +                        LOG.exception("%s: metrics collection failed", e.__name__)
> +                        continue
> +                    for name, value, labels in metrics:
> +                        fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
> +                        labels = ", ".join(
> +                            f'{k}="{self.escape(v)}"' for k, v in labels.items()
> +                        )
> +                        if labels:
> +                            labels = f"{{{labels}}}"
> +                        metrics_lines.append(f"{fullname}{labels} {value}")
> +                        if fullname not in metrics_names:
> +                            metrics_names.add(fullname)
> +                            desc, metric_type = info[name]
> +                            lines += [
> +                                f"# HELP {fullname} {desc}",
> +                                f"# TYPE {fullname} {metric_type}",
> +                            ]
> +                    lines += metrics_lines
> +            if not lines:
> +                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
> +                LOG.error(
> +                    "%s %s: no metrics collected",
> +                    self.address_string(),
> +                    self.requestline,
> +                )
> +                return
> +            body = "\n".join(lines).encode("utf-8") + b"\n"
> +            self.send_response(HTTPStatus.OK)
> +            self.send_header("Content-Type", self.CONTENT_TYPE)
> +            self.send_header("Content-Length", str(len(body)))
> +            self.end_headers()
> +            self.wfile.write(body)
> +            LOG.info("%s %s", self.address_string(), self.requestline)
> +
> +        except (FileNotFoundError, ConnectionRefusedError):
> +            self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
> +            LOG.exception(
> +                "%s %s: telemetry socket not available",
> +                self.address_string(),
> +                self.requestline,
> +            )
> +        except Exception:
> +            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
> +            LOG.exception("%s %s", self.address_string(), self.requestline)
> +
> +    def log_message(self, fmt, *args):
> +        pass  # disable built-in logger
> +
> +
> +def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
> +    """
> +    Collect all metrics and export them to a carbon server in the pickle format.
> +    """
> +    addr = (args.output.hostname or "", int(args.output.port or 2004))
> +    with TelemetrySocket(args.socket_path) as dpdk:
> +        with socket.socket() as carbon:
> +            carbon.connect(addr)
> +            all_metrics = []
> +            for e in endpoints:
> +                try:
> +                    metrics = e.metrics(dpdk)
> +                except Exception:
> +                    LOG.exception("%s: metrics collection failed", e.__name__)
> +                    continue
> +                for name, value, labels in metrics:
> +                    fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
> +                    for key, val in labels.items():
> +                        val = str(val).replace(";", "")
> +                        fullname += f";{key}={val}"
> +                    all_metrics.append((fullname, (time.time(), value)))
> +            if not all_metrics:
> +                raise Exception("no metrics collected")
> +            payload = pickle.dumps(all_metrics, protocol=2)
> +            header = struct.pack("!L", len(payload))
> +            buf = header + payload
> +            carbon.sendall(buf)
> +
> +
> +OUTPUT_FORMATS = {
> +    "openmetrics": serve_openmetrics,
> +    "prometheus": serve_openmetrics,
> +    "carbon": export_carbon,
> +    "graphite": export_carbon,
> +}
> +
> +
> +if __name__ == "__main__":
> +    main()
> diff --git a/usertools/meson.build b/usertools/meson.build
> index 740b4832f36d..eb48e2f4403f 100644
> --- a/usertools/meson.build
> +++ b/usertools/meson.build
> @@ -11,5 +11,11 @@ install_data([
>              'dpdk-telemetry.py',
>              'dpdk-hugepages.py',
>              'dpdk-rss-flows.py',
> +            'dpdk-telemetry-exporter.py',
>          ],
>          install_dir: 'bin')
> +
> +install_subdir(
> +        'telemetry-endpoints',
> +        install_dir: 'share/dpdk',
> +        strip_directory: false)
> diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py
> new file mode 100644
> index 000000000000..e17cffb43b2c
> --- /dev/null
> +++ b/usertools/telemetry-endpoints/counters.py
> @@ -0,0 +1,47 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright (c) 2023 Robin Jarry
> +
> +RX_PACKETS = "rx_packets"
> +RX_BYTES = "rx_bytes"
> +RX_MISSED = "rx_missed"
> +RX_NOMBUF = "rx_nombuf"
> +RX_ERRORS = "rx_errors"
> +TX_PACKETS = "tx_packets"
> +TX_BYTES = "tx_bytes"
> +TX_ERRORS = "tx_errors"
> +
> +
> +def info() -> "dict[Name, tuple[Description, Type]]":
> +    return {
> +        RX_PACKETS: ("Number of successfully received packets.", "counter"),
> +        RX_BYTES: ("Number of successfully received bytes.", "counter"),
> +        RX_MISSED: (
> +            "Number of packets dropped by the HW because Rx queues are full.",
> +            "counter",
> +        ),
> +        RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"),
> +        RX_ERRORS: ("Number of erroneous received packets.", "counter"),
> +        TX_PACKETS: ("Number of successfully transmitted packets.", "counter"),
> +        TX_BYTES: ("Number of successfully transmitted bytes.", "counter"),
> +        TX_ERRORS: ("Number of packet transmission failures.", "counter"),
> +    }
> +
> +
> +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
> +    out = []
> +    for port_id in sock.cmd("/ethdev/list"):
> +        port = sock.cmd("/ethdev/info", port_id)
> +        stats = sock.cmd("/ethdev/stats", port_id)
> +        labels = {"port": port["name"]}
> +        out += [
> +            (RX_PACKETS, stats["ipackets"], labels),
> +            (RX_BYTES, stats["ibytes"], labels),
> +            (RX_MISSED, stats["imissed"], labels),
> +            (RX_NOMBUF, stats["rx_nombuf"], labels),
> +            (RX_ERRORS, stats["ierrors"], labels),
> +            (TX_PACKETS, stats["opackets"], labels),
> +            (TX_BYTES, stats["obytes"], labels),
> +            (TX_ERRORS, stats["oerrors"], labels),
> +        ]
> +    return out
> diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py
> new file mode 100644
> index 000000000000..d38d8d6e2558
> --- /dev/null
> +++ b/usertools/telemetry-endpoints/cpu.py
> @@ -0,0 +1,29 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright (c) 2023 Robin Jarry
> +
> +CPU_TOTAL = "total_cycles"
> +CPU_BUSY = "busy_cycles"
> +
> +
> +def info() -> "dict[Name, tuple[Description, Type]]":
> +    return {
> +        CPU_TOTAL: ("Total number of CPU cycles.", "counter"),
> +        CPU_BUSY: ("Number of busy CPU cycles.", "counter"),
> +    }
> +
> +
> +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
> +    out = []
> +    for lcore_id in sock.cmd("/eal/lcore/list"):
> +        lcore = sock.cmd("/eal/lcore/info", lcore_id)
> +        cpu = ",".join(str(c) for c in lcore.get("cpuset", []))
> +        total = lcore.get("total_cycles")
> +        busy = lcore.get("busy_cycles", 0)
> +        if not (cpu and total):
> +            continue
> +        labels = {"cpu": cpu, "numa": lcore.get("socket", 0)}
> +        out += [
> +            (CPU_TOTAL, total, labels),
> +            (CPU_BUSY, busy, labels),
> +        ]
> +    return out
> diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py
> new file mode 100644
> index 000000000000..32cce1e59382
> --- /dev/null
> +++ b/usertools/telemetry-endpoints/memory.py
> @@ -0,0 +1,37 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright (c) 2023 Robin Jarry
> +
> +MEM_TOTAL = "total_bytes"
> +MEM_USED = "used_bytes"
> +
> +
> +def info() -> "dict[Name, tuple[Description, Type]]":
> +    return {
> +        MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"),
> +        MEM_USED: ("The currently used memory in bytes.", "gauge"),
> +    }
> +
> +
> +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
> +    zones = {}
> +    used = 0
> +    for zone in sock.cmd("/eal/memzone_list") or []:
> +        z = sock.cmd("/eal/memzone_info", zone)
> +        start = int(z["Hugepage_base"], 16)
> +        end = start + (z["Hugepage_size"] * z["Hugepage_used"])
> +        used += z["Length"]
> +        for s, e in list(zones.items()):
> +            if s < start < e < end:
> +                zones[s] = end
> +                break
> +            if start < s < end < e:
> +                del zones[s]
> +                zones[start] = e
> +                break
> +        else:
> +            zones[start] = end
> +
> +    return [
> +        (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}),
> +        (MEM_USED, max(0, used), {}),
> +    ]
> -- 
> 2.44.0


^ permalink raw reply	[flat|nested] 7+ messages in thread
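The endpoint contract described in the patch above (a plain Python file exposing `info()` and `metrics()`) can be sketched as a minimal custom endpoint. This is an illustrative sketch, not part of the patch: the `heaps.py` file name, the `heap_count` metric and the use of the `/eal/heap_list` telemetry command are assumptions made for the example.

```python
# Hypothetical usertools/telemetry-endpoints/heaps.py -- a minimal custom
# endpoint following the exporter contract: info() maps metric names to a
# (description, type) tuple, metrics() queries the telemetry socket and
# returns (name, value, labels) tuples whose names all appear in info().
# The "/eal/heap_list" command is an assumption for illustration.

HEAP_COUNT = "heap_count"


def info() -> "dict[Name, tuple[Description, Type]]":
    return {
        HEAP_COUNT: ("Number of EAL malloc heaps.", "gauge"),
    }


def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
    heaps = sock.cmd("/eal/heap_list") or []
    return [(HEAP_COUNT, len(heaps), {})]
```

Dropping such a file in one of the `--load-path` directories should be enough for the exporter to discover it; no registration step is described beyond the two functions.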

* Re: [PATCH v2] usertools: add telemetry exporter
  2024-04-16 13:46 ` [PATCH v2] " Robin Jarry
  2024-04-22  7:17   ` Anthony Harivel
@ 2024-07-09  4:50   ` David Marchand
  1 sibling, 0 replies; 7+ messages in thread
From: David Marchand @ 2024-07-09  4:50 UTC (permalink / raw)
  To: Robin Jarry; +Cc: dev, Anthony Harivel

On Tue, Apr 16, 2024 at 3:47 PM Robin Jarry <rjarry@redhat.com> wrote:
>
> For now the telemetry socket is local to the machine running a DPDK
> application. Also, there is no official "schema" for the exposed
> metrics. Add a framework and a script to collect and expose these
> metrics to telemetry and observability aggregators such as Prometheus,
> Carbon or Influxdb. The exposed data must be designed with end-users in
> mind; some DPDK terminology or internals may not make sense to everyone.
>
> The script only serves as an entry point and does not know anything
> about any specific metrics or JSON data structures exposed in the
> telemetry socket.
>
> It uses dynamically loaded endpoint exporters which are basic python
> files that must implement two functions:
>
>  def info() -> dict[MetricName, MetricInfo]:
>      Mapping of metric names to their description and type.
>
>  def metrics(sock: TelemetrySocket) -> list[MetricValue]:
>      Request data from sock and return it as metric values. A metric
>      value is a 3-tuple: (name: str, value: any, labels: dict). Each
>      name must be present in info().
>
> The sock argument passed to metrics() has a single method:
>
>  def cmd(self, uri: str, arg: any = None) -> dict | list:
>      Request JSON data from the telemetry socket and parse it into
>      Python values.
>
> The main script invokes endpoints and exports the data into an output
> format. For now, only two formats are implemented:
>
> * openmetrics/prometheus: text based format exported via a local HTTP
>   server.
> * carbon/graphite: binary (python pickle) format exported to a remote
>   carbon TCP server.
>
> As a starting point, 3 built-in endpoints are implemented:
>
> * counters: ethdev hardware counters
> * cpu: lcore usage
> * memory: overall memory usage
>
> The goal is to keep all built-in endpoints in the DPDK repository so
> that they can be updated along with the telemetry JSON data structures.
>
> Example output for the openmetrics:// format:
>
>  ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 &
>  INFO using endpoint: counters (from .../telemetry-endpoints/counters.py)
>  INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py)
>  INFO using endpoint: memory (from .../telemetry-endpoints/memory.py)
>  INFO listening on port 9876
>  [1] 838829
>
>  ~$ curl http://127.0.0.1:9876/
>  # HELP dpdk_cpu_total_cycles Total number of CPU cycles.
>  # TYPE dpdk_cpu_total_cycles counter
>  # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles.
>  # TYPE dpdk_cpu_busy_cycles counter
>  dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980
>  dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860
>  dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740
>  dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860
>  dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540
>  dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160
>  dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320
>  dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860
>  # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes.
>  # TYPE dpdk_memory_total_bytes gauge
>  # HELP dpdk_memory_used_bytes The currently used memory in bytes.
>  # TYPE dpdk_memory_used_bytes gauge
>  dpdk_memory_total_bytes 1073741824
>  dpdk_memory_used_bytes 794197376
>
> Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
> Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format
> Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
> Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus
> Signed-off-by: Robin Jarry <rjarry@redhat.com>

Applied, thanks.


-- 
David Marchand


^ permalink raw reply	[flat|nested] 7+ messages in thread
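The carbon/graphite output mentioned in the commit message uses the pickle wire format: a 4-byte big-endian length header followed by a pickled list of (path, (timestamp, value)) tuples, as in the patch's `export_carbon()`. A standalone sketch of that framing (the metric name below is invented for the example):

```python
import pickle
import struct
import time


def carbon_frame(metrics: "list[tuple[str, float]]") -> bytes:
    """Frame (path, value) pairs for a carbon pickle receiver:
    a 4-byte big-endian payload length, then the pickled
    [(path, (timestamp, value)), ...] list (protocol 2, as in the patch)."""
    now = time.time()
    payload = pickle.dumps(
        [(path, (now, value)) for path, value in metrics], protocol=2
    )
    return struct.pack("!L", len(payload)) + payload


# e.g. carbon_sock.sendall(carbon_frame([("dpdk.cpu.busy_cycles;cpu=8", 123)]))
```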

end of thread, other threads:[~2024-07-09  4:50 UTC | newest]

Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry
2023-11-20 13:26 ` Robin Jarry
2024-03-27 15:18 ` Anthony Harivel
2024-04-01 21:28   ` Robin Jarry
2024-04-16 13:46 ` [PATCH v2] " Robin Jarry
2024-04-22  7:17   ` Anthony Harivel
2024-07-09  4:50   ` David Marchand
