* [RFC PATCH] usertools: add telemetry exporter @ 2023-09-26 16:34 Robin Jarry 2023-11-20 13:26 ` Robin Jarry ` (2 more replies) 0 siblings, 3 replies; 7+ messages in thread From: Robin Jarry @ 2023-09-26 16:34 UTC (permalink / raw) To: dev; +Cc: Robin Jarry For now the telemetry socket is local to the machine running a DPDK application. Also, there is no official "schema" for the exposed metrics. Add a framework and a script to collect and expose these metrics to telemetry and observability agree gators such as Prometheus, Carbon or Influxdb. The exposed data must be done with end-users in mind, some DPDK terminology or internals may not make sense to everyone. The script only serves as an entry point and does not know anything about any specific metrics nor JSON data structures exposed in the telemetry socket. It uses dynamically loaded endpoint exporters which are basic python files that must implement two functions: def info() -> dict[MetricName, MetricInfo]: Mapping of metric names to their description and type. def metrics(sock: TelemetrySocket) -> list[MetricValue]: Request data from sock and return it as metric values. A metric value is a 3-tuple: (name: str, value: any, labels: dict). Each name must be present in info(). The sock argument passed to metrics() has a single method: def cmd(self, uri: str, arg: any = None) -> dict | list: Request JSON data to the telemetry socket and parse it to python values. The main script invokes endpoints and exports the data into an output format. For now, only two formats are implemented: * openmetrics/prometheus: text based format exported via a local HTTP server. * carbon/graphite: binary (python pickle) format exported to a distant carbon TCP server. As a starting point, 3 built-in endpoints are implemented: * counters: ethdev hardware counters * cpu: lcore usage * memory: overall memory usage The goal is to keep all built-in endpoints in the DPDK repository so that they can be updated along with the telemetry JSON data structures. Example output for the openmetrics:// format: ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 & INFO using endpoint: counters (from .../telemetry-endpoints/counters.py) INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py) INFO using endpoint: memory (from .../telemetry-endpoints/memory.py) INFO listening on port 9876 [1] 838829 ~$ curl http://127.0.0.1:9876/ # HELP dpdk_cpu_total_cycles Total number of CPU cycles. # TYPE dpdk_cpu_total_cycles counter # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles. # TYPE dpdk_cpu_busy_cycles counter dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980 dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860 dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740 dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860 dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540 dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160 dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320 dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860 # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes. # TYPE dpdk_memory_total_bytes gauge # HELP dpdk_memory_used_bytes The currently used memory in bytes. # TYPE dpdk_memory_used_bytes gauge dpdk_memory_total_bytes 1073741824 dpdk_memory_used_bytes 794197376 Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus Signed-off-by: Robin Jarry <rjarry@redhat.com> --- Notes: v1: * Ideally, this script should be tested in CI to avoid breakage when the telemetry data structures change. I don't know where such a test could be done. * There was work done 3/4 years ago in collectd to add DPDK telemetry support but I think this has now been abandoned. https://github.com/collectd/collectd/blob/main/src/dpdk_telemetry.c I think that keeping the exporters in the DPDK repository makes more sense from a maintainability perspective. usertools/dpdk-telemetry-exporter.py | 376 ++++++++++++++++++++++ usertools/meson.build | 6 + usertools/telemetry-endpoints/counters.py | 47 +++ usertools/telemetry-endpoints/cpu.py | 29 ++ usertools/telemetry-endpoints/memory.py | 37 +++ 5 files changed, 495 insertions(+) create mode 100755 usertools/dpdk-telemetry-exporter.py create mode 100644 usertools/telemetry-endpoints/counters.py create mode 100644 usertools/telemetry-endpoints/cpu.py create mode 100644 usertools/telemetry-endpoints/memory.py diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py new file mode 100755 index 000000000000..6c1495a9e1e8 --- /dev/null +++ b/usertools/dpdk-telemetry-exporter.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +''' +DPDK telemetry exporter. + +It uses dynamically loaded endpoint exporters which are basic python files that +must implement two functions: + + def info() -> dict[MetricName, MetricInfo]: + """ + Mapping of metric names to their description and type. + """ + + def metrics(sock: TelemetrySocket) -> list[MetricValue]: + """ + Request data from sock and return it as metric values. A metric value + is a 3-tuple: (name: str, value: any, labels: dict). Each name must be + present in info(). + """ + +The sock argument passed to metrics() has a single method: + + def cmd(self, uri, arg=None) -> dict | list: + """ + Request JSON data to the telemetry socket and parse it to python + values. + """ + +See existing endpoints for examples. + +The exporter supports multiple output formats: + +prometheus://ADDRESS:PORT +openmetrics://ADDRESS:PORT + Expose the enabled endpoints via a local HTTP server listening on the + specified address and port. GET requests on that server are served with + text/plain responses in the prometheus/openmetrics format. + + More details: + https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format + +carbon://ADDRESS:PORT +graphite://ADDRESS:PORT + Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle + carbon format. + + More details: + https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol +''' + +import argparse +from http import HTTPStatus, server +import importlib.util +import json +import logging +import os +import pickle +import re +import socket +import struct +import sys +import time +import typing +from urllib.parse import urlparse + + +LOG = logging.getLogger(__name__) +# Use local endpoints path only when running from source +LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints") +DEFAULT_LOAD_PATHS = [] +if os.path.isdir(LOCAL): + DEFAULT_LOAD_PATHS.append(LOCAL) +DEFAULT_LOAD_PATHS += [ + "/usr/local/share/dpdk/telemetry-endpoints", + "/usr/share/dpdk/telemetry-endpoints", +] +DEFAULT_OUTPUT = "openmetrics://:9876" + + +def main(): + logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "-o", + "--output", + metavar="FORMAT://PARAMETERS", + default=urlparse(DEFAULT_OUTPUT), + type=urlparse, + help=f""" + Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format, + URL elements have different meanings. By default, the exporter starts a + local HTTP server on port 9876 that serves requests in the + prometheus/openmetrics plain text format. + """, + ) + parser.add_argument( + "-p", + "--load-path", + dest="load_paths", + type=lambda v: v.split(os.pathsep), + default=DEFAULT_LOAD_PATHS, + help=f""" + The list of paths from which to disvover endpoints. + (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}"). + """, + ) + parser.add_argument( + "-e", + "--endpoint", + dest="endpoints", + action="append", + help=""" + Telemetry endpoint to export (by default, all discovered endpoints are + enabled). This option can be specified more than once. + """, + ) + parser.add_argument( + "-l", + "--list", + action="store_true", + help=""" + Only list detected endpoints and exit. + """, + ) + parser.add_argument( + "-s", + "--socket-path", + default="/run/dpdk/rte/dpdk_telemetry.v2", + help=""" + The DPDK telemetry socket path (default: "%(default)s"). + """, + ) + args = parser.parse_args() + output = OUTPUT_FORMATS.get(args.output.scheme) + if output is None: + parser.error(f"unsupported output format: {args.output.scheme}://") + try: + endpoints = load_endpoints(args.load_paths, args.endpoints) + if args.list: + return + output(args, endpoints) + except KeyboardInterrupt: + pass + except Exception: + LOG.exception("") + + +class TelemetrySocket: + """ + Abstraction of the DPDK telemetry socket. + """ + + def __init__(self, path: str): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + self.sock.connect(path) + data = json.loads(self.sock.recv(1024).decode()) + self.max_output_len = data["max_output_len"] + + def cmd( + self, uri: str, arg: typing.Any = None + ) -> typing.Optional[typing.Union[dict, list]]: + """ + Request JSON data to the telemetry socket and parse it to python + values. + """ + if arg is not None: + u = f"{uri},{arg}" + else: + u = uri + self.sock.send(u.encode("utf-8")) + data = self.sock.recv(self.max_output_len) + return json.loads(data.decode("utf-8"))[uri] + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.sock.close() + + +MetricDescription = str +MetricType = str +MetricName = str +MetricLabels = typing.Dict[str, typing.Any] +MetricInfo = typing.Tuple[MetricDescription, MetricType] +MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels] + + +class TelemetryEndpoint: + """ + Placeholder class only used for typing annotations. + """ + + @staticmethod + def info() -> typing.Dict[MetricName, MetricInfo]: + """ + Mapping of metric names to their description and type. + """ + raise NotImplementedError() + + @staticmethod + def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]: + """ + Request data from sock and return it as metric values. Each metric + name must be present in info(). + """ + raise NotImplementedError() + + +def load_endpoints( + paths: typing.List[str], names: typing.List[str] +) -> typing.List[TelemetryEndpoint]: + """ + Load selected telemetry endpoints from the specified paths. + """ + + endpoints = {} + dwb = sys.dont_write_bytecode + sys.dont_write_bytecode = True # never generate .pyc files for endpoints + + for p in paths: + if not os.path.isdir(p): + continue + for fname in os.listdir(p): + f = os.path.join(p, fname) + if os.path.isdir(f): + continue + try: + name, _ = os.path.splitext(fname) + if names is not None and name not in names: + # not selected by user + continue + if name in endpoints: + # endpoint with same name already loaded + continue + spec = importlib.util.spec_from_file_location(name, f) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + endpoints[name] = module + except Exception: + LOG.exception("parsing endpoint: %s", f) + + sys.dont_write_bytecode = dwb + + modules = [] + info = {} + for name, module in sorted(endpoints.items()): + LOG.info("using endpoint: %s (from %s)", name, module.__file__) + try: + for metric, (description, type_) in module.info().items(): + info[(name, metric)] = (description, type_) + modules.append(module) + except Exception: + LOG.exception("getting endpoint info: %s", name) + return modules + + +def serve_openmetrics( + args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint] +): + """ + Start an HTTP server and serve requests in the openmetrics/prometheus + format. + """ + listen = (args.output.hostname or "", int(args.output.port or 80)) + with server.HTTPServer(listen, OpenmetricsHandler) as httpd: + httpd.dpdk_socket_path = args.socket_path + httpd.telemetry_endpoints = endpoints + LOG.info("listening on port %s", httpd.server_port) + httpd.serve_forever() + + +class OpenmetricsHandler(server.BaseHTTPRequestHandler): + """ + Basic HTTP handler that returns prometheus/openmetrics formatted responses. + """ + + CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8" + + def escape(self, value: typing.Any) -> str: + """ + Escape a metric label value. + """ + value = str(value) + value = value.replace('"', '\\"') + value = value.replace("\\", "\\\\") + return value.replace("\n", "\\n") + + def do_GET(self): + """ + Called uppon GET requests. + """ + try: + lines = [] + metrics_names = set() + with TelemetrySocket(self.server.dpdk_socket_path) as sock: + for e in self.server.telemetry_endpoints: + info = e.info() + metrics_lines = [] + for name, value, labels in e.metrics(sock): + fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}") + labels = ", ".join( + f'{k}="{self.escape(v)}"' for k, v in labels.items() + ) + if labels: + labels = f"{{{labels}}}" + metrics_lines.append(f"{fullname}{labels} {value}") + if fullname not in metrics_names: + metrics_names.add(fullname) + desc, metric_type = info[name] + lines += [ + f"# HELP {fullname} {desc}", + f"# TYPE {fullname} {metric_type}", + ] + lines += metrics_lines + body = "\n".join(lines).encode("utf-8") + b"\n" + self.send_response(HTTPStatus.OK) + self.send_header("Content-Type", self.CONTENT_TYPE) + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + LOG.info("%s %s", self.address_string(), self.requestline) + + except Exception as e: + if isinstance(e, (FileNotFoundError, ConnectionRefusedError)): + self.send_error(HTTPStatus.SERVICE_UNAVAILABLE) + else: + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) + LOG.exception("%s %s", self.address_string(), self.requestline) + + def log_message(self, fmt, *args): + pass # disable built-in logger + + +def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]): + """ + Collect all metrics and export them to a carbon server in the pickle format. + """ + addr = (args.output.hostname or "", int(args.output.port or 80)) + with TelemetrySocket(args.socket_path) as dpdk: + with socket.socket() as carbon: + carbon.connect(addr) + metrics = [] + for e in endpoints: + for name, value, labels in e.metrics(dpdk): + fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}") + for key, val in labels.items(): + val = str(val).replace(";", "") + fullname += f";{key}={val}" + metrics.append((fullname, (time.time(), value))) + payload = pickle.dumps(metrics, protocol=2) + header = struct.pack("!L", len(payload)) + buf = header + payload + carbon.sendall(buf) + + +OUTPUT_FORMATS = { + "openmetrics": serve_openmetrics, + "prometheus": serve_openmetrics, + "carbon": export_carbon, + "graphite": export_carbon, +} + + +if __name__ == "__main__": + main() diff --git a/usertools/meson.build b/usertools/meson.build index 740b4832f36d..eb48e2f4403f 100644 --- a/usertools/meson.build +++ b/usertools/meson.build @@ -11,5 +11,11 @@ install_data([ 'dpdk-telemetry.py', 'dpdk-hugepages.py', 'dpdk-rss-flows.py', + 'dpdk-telemetry-exporter.py', ], install_dir: 'bin') + +install_subdir( + 'telemetry-endpoints', + install_dir: 'share/dpdk', + strip_directory: false) diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py new file mode 100644 index 000000000000..e17cffb43b2c --- /dev/null +++ b/usertools/telemetry-endpoints/counters.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +RX_PACKETS = "rx_packets" +RX_BYTES = "rx_bytes" +RX_MISSED = "rx_missed" +RX_NOMBUF = "rx_nombuf" +RX_ERRORS = "rx_errors" +TX_PACKETS = "tx_packets" +TX_BYTES = "tx_bytes" +TX_ERRORS = "tx_errors" + + +def info() -> "dict[Name, tuple[Description, Type]]": + return { + RX_PACKETS: ("Number of successfully received packets.", "counter"), + RX_BYTES: ("Number of successfully received bytes.", "counter"), + RX_MISSED: ( + "Number of packets dropped by the HW because Rx queues are full.", + "counter", + ), + RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"), + RX_ERRORS: ("Number of erroneous received packets.", "counter"), + TX_PACKETS: ("Number of successfully transmitted packets.", "counter"), + TX_BYTES: ("Number of successfully transmitted bytes.", "counter"), + TX_ERRORS: ("Number of packet transmission failures.", "counter"), + } + + +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": + out = [] + for port_id in sock.cmd("/ethdev/list"): + port = sock.cmd("/ethdev/info", port_id) + stats = sock.cmd("/ethdev/stats", port_id) + labels = {"port": port["name"]} + out += [ + (RX_PACKETS, stats["ipackets"], labels), + (RX_PACKETS, stats["ipackets"], labels), + (RX_BYTES, stats["ibytes"], labels), + (RX_MISSED, stats["imissed"], labels), + (RX_NOMBUF, stats["rx_nombuf"], labels), + (RX_ERRORS, stats["ierrors"], labels), + (TX_PACKETS, stats["opackets"], labels), + (TX_BYTES, stats["obytes"], labels), + (TX_ERRORS, stats["oerrors"], labels), + ] + return out diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py new file mode 100644 index 000000000000..d38d8d6e2558 --- /dev/null +++ b/usertools/telemetry-endpoints/cpu.py @@ -0,0 +1,29 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +CPU_TOTAL = "total_cycles" +CPU_BUSY = "busy_cycles" + + +def info() -> "dict[Name, tuple[Description, Type]]": + return { + CPU_TOTAL: ("Total number of CPU cycles.", "counter"), + CPU_BUSY: ("Number of busy CPU cycles.", "counter"), + } + + +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": + out = [] + for lcore_id in sock.cmd("/eal/lcore/list"): + lcore = sock.cmd("/eal/lcore/info", lcore_id) + cpu = ",".join(str(c) for c in lcore.get("cpuset", [])) + total = lcore.get("total_cycles") + busy = lcore.get("busy_cycles", 0) + if not (cpu and total): + continue + labels = {"cpu": cpu, "numa": lcore.get("socket", 0)} + out += [ + (CPU_TOTAL, total, labels), + (CPU_BUSY, busy, labels), + ] + return out diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py new file mode 100644 index 000000000000..32cce1e59382 --- /dev/null +++ b/usertools/telemetry-endpoints/memory.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +MEM_TOTAL = "total_bytes" +MEM_USED = "used_bytes" + + +def info() -> "dict[Name, tuple[Description, Type]]": + return { + MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"), + MEM_USED: ("The currently used memory in bytes.", "gauge"), + } + + +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": + zones = {} + used = 0 + for zone in sock.cmd("/eal/memzone_list") or []: + z = sock.cmd("/eal/memzone_info", zone) + start = int(z["Hugepage_base"], 16) + end = start + (z["Hugepage_size"] * z["Hugepage_used"]) + used += z["Length"] + for s, e in list(zones.items()): + if s < start < e < end: + zones[s] = end + break + if start < s < end < e: + del zones[s] + zones[start] = e + break + else: + zones[start] = end + + return [ + (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}), + (MEM_USED, max(0, used), {}), + ] -- 2.41.0 ^ permalink raw reply [flat|nested] 7+ messages in thread
* Re: [RFC PATCH] usertools: add telemetry exporter 2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry @ 2023-11-20 13:26 ` Robin Jarry 2024-03-27 15:18 ` Anthony Harivel 2024-04-16 13:46 ` [PATCH v2] " Robin Jarry 2 siblings, 0 replies; 7+ messages in thread From: Robin Jarry @ 2023-11-20 13:26 UTC (permalink / raw) To: dev Gentle ping. Is anybody interested in this? ^ permalink raw reply [flat|nested] 7+ messages in thread
* [RFC PATCH] usertools: add telemetry exporter 2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry 2023-11-20 13:26 ` Robin Jarry @ 2024-03-27 15:18 ` Anthony Harivel 2024-04-01 21:28 ` Robin Jarry 2024-04-16 13:46 ` [PATCH v2] " Robin Jarry 2 siblings, 1 reply; 7+ messages in thread From: Anthony Harivel @ 2024-03-27 15:18 UTC (permalink / raw) To: rjarry; +Cc: dev Hi Robin, Thanks for this patch. I did test it and it works as expected. Nonetheless, maybe we can improve on some parts. In 'class TelemetrySocket', there is: ... self.sock.connect(path) data = json.loads(self.sock.recv(1024).decode()) ... Maybe we can improve with something like: try: rcv_data = self.sock.recv(1024) if rcv_data: data = json.loads(rcv_data.decode()) else: print("No data received from socket.") except json.JSONDecodeError as e: print("Error decoding JSON:", e) except Exception as e: print("An error occurred:", e) So that it handles a bit better the error cases. In the same way to implement more robust error handling mechanisms in: def load_endpoints ... except Exception as e: LOG.error("Failed to load endpoint module '%s' from '%s': %s", name, f, e) ... For example, you might catch FileNotFoundError, ImportError, or SyntaxError. That could help to debug! About TelemetryEndpoint I would see something like: class TelemetryEndpoint: """ Placeholder class only used for typing annotations. """ @staticmethod def info() -> typing.Dict[MetricName, MetricInfo]: """ Mapping of metric names to their description and type. """ raise NotImplementedError() @staticmethod def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]: """ Request data from sock and return it as metric values. Each metric name must be present in info(). """ try: metrics = [] metrics_data = sock.fetch_metrics_data() for metric_name, metric_value in metrics_data.items(): metrics.append((metric_name, metric_value, {})) return metrics except Exception as e: LOG.error("Failed to fetch metrics data: %s", e) # If unable to fetch metrics data, return an empty list return [] With these changes, the metrics method of the TelemetryEndpoint class could handle errors better and the exporter can continue functioning even if there are issues with fetching metrics data. I don't know if all of that makes sens or if it's just nitpicking ! I can also propose an enhanced version of your patch if you prefer. Regards, Anthony ^ permalink raw reply [flat|nested] 7+ messages in thread
* Re: [RFC PATCH] usertools: add telemetry exporter 2024-03-27 15:18 ` Anthony Harivel @ 2024-04-01 21:28 ` Robin Jarry 0 siblings, 0 replies; 7+ messages in thread From: Robin Jarry @ 2024-04-01 21:28 UTC (permalink / raw) To: Anthony Harivel; +Cc: dev Anthony Harivel, Mar 27, 2024 at 16:18: > Hi Robin, > > Thanks for this patch. I did test it and it works as expected. > Nonetheless, maybe we can improve on some parts. Hey Anthony, thanks a lot for testing! > In 'class TelemetrySocket', there is: > ... > self.sock.connect(path) > data = json.loads(self.sock.recv(1024).decode()) > ... > > Maybe we can improve with something like: > > try: > rcv_data = self.sock.recv(1024) > if rcv_data: > data = json.loads(rcv_data.decode()) > else: > print("No data received from socket.") > except json.JSONDecodeError as e: > print("Error decoding JSON:", e) > except Exception as e: > print("An error occurred:", e) > > So that it handles a bit better the error cases. This is undesired as it would actually mask the errors from the calling code. Unless you can do something about the error, printing it should be left to the calling code. I will handle these errors better in v2. > In the same way to implement more robust error handling mechanisms in: > def load_endpoints > ... > except Exception as e: > LOG.error("Failed to load endpoint module '%s' from '%s': %s", name, f, e) > ... > > For example, you might catch FileNotFoundError, ImportError, or SyntaxError. > That could help to debug! We could print the whole stack trace but I don't see what special handling could be done depending on the exception. I will try to make this better for v2. > About TelemetryEndpoint I would see something like: > > class TelemetryEndpoint: > """ > Placeholder class only used for typing annotations. > """ > > @staticmethod > def info() -> typing.Dict[MetricName, MetricInfo]: > """ > Mapping of metric names to their description and type. > """ > raise NotImplementedError() > > @staticmethod > def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]: > """ > Request data from sock and return it as metric values. Each metric > name must be present in info(). > """ > try: > metrics = [] > metrics_data = sock.fetch_metrics_data() > for metric_name, metric_value in metrics_data.items(): > metrics.append((metric_name, metric_value, {})) > return metrics > except Exception as e: > LOG.error("Failed to fetch metrics data: %s", e) > # If unable to fetch metrics data, return an empty list > return [] > > With these changes, the metrics method of the TelemetryEndpoint class > could handle errors better and the exporter can continue functioning > even if there are issues with fetching metrics data. > > I don't know if all of that makes sens or if it's just nitpicking! > I can also propose an enhanced version of your patch if you prefer. As indicated in the docstring, this class is merely a placeholder. Its code is never executed. It only stands to represent what functions must be implemented in endpoints. However, your point is valid. I will update my code to handle errors in endpoints more gracefully and avoid failing the whole request if there is only one error. Thanks for the thorough review! ^ permalink raw reply [flat|nested] 7+ messages in thread
* [PATCH v2] usertools: add telemetry exporter 2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry 2023-11-20 13:26 ` Robin Jarry 2024-03-27 15:18 ` Anthony Harivel @ 2024-04-16 13:46 ` Robin Jarry 2024-04-22 7:17 ` Anthony Harivel 2024-07-09 4:50 ` David Marchand 2 siblings, 2 replies; 7+ messages in thread From: Robin Jarry @ 2024-04-16 13:46 UTC (permalink / raw) To: dev; +Cc: Anthony Harivel For now the telemetry socket is local to the machine running a DPDK application. Also, there is no official "schema" for the exposed metrics. Add a framework and a script to collect and expose these metrics to telemetry and observability agree gators such as Prometheus, Carbon or Influxdb. The exposed data must be done with end-users in mind, some DPDK terminology or internals may not make sense to everyone. The script only serves as an entry point and does not know anything about any specific metrics nor JSON data structures exposed in the telemetry socket. It uses dynamically loaded endpoint exporters which are basic python files that must implement two functions: def info() -> dict[MetricName, MetricInfo]: Mapping of metric names to their description and type. def metrics(sock: TelemetrySocket) -> list[MetricValue]: Request data from sock and return it as metric values. A metric value is a 3-tuple: (name: str, value: any, labels: dict). Each name must be present in info(). The sock argument passed to metrics() has a single method: def cmd(self, uri: str, arg: any = None) -> dict | list: Request JSON data to the telemetry socket and parse it to python values. The main script invokes endpoints and exports the data into an output format. For now, only two formats are implemented: * openmetrics/prometheus: text based format exported via a local HTTP server. * carbon/graphite: binary (python pickle) format exported to a distant carbon TCP server. As a starting point, 3 built-in endpoints are implemented: * counters: ethdev hardware counters * cpu: lcore usage * memory: overall memory usage The goal is to keep all built-in endpoints in the DPDK repository so that they can be updated along with the telemetry JSON data structures. Example output for the openmetrics:// format: ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 & INFO using endpoint: counters (from .../telemetry-endpoints/counters.py) INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py) INFO using endpoint: memory (from .../telemetry-endpoints/memory.py) INFO listening on port 9876 [1] 838829 ~$ curl http://127.0.0.1:9876/ # HELP dpdk_cpu_total_cycles Total number of CPU cycles. # TYPE dpdk_cpu_total_cycles counter # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles. # TYPE dpdk_cpu_busy_cycles counter dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980 dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860 dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740 dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860 dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540 dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160 dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320 dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860 # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes. # TYPE dpdk_memory_total_bytes gauge # HELP dpdk_memory_used_bytes The currently used memory in bytes. # TYPE dpdk_memory_used_bytes gauge dpdk_memory_total_bytes 1073741824 dpdk_memory_used_bytes 794197376 Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus Signed-off-by: Robin Jarry <rjarry@redhat.com> --- Notes: v2: * Refuse to run if no endpoints are enabled. * Handle endpoint errors gracefully without failing the whole query. usertools/dpdk-telemetry-exporter.py | 405 ++++++++++++++++++++++ usertools/meson.build | 6 + usertools/telemetry-endpoints/counters.py | 47 +++ usertools/telemetry-endpoints/cpu.py | 29 ++ usertools/telemetry-endpoints/memory.py | 37 ++ 5 files changed, 524 insertions(+) create mode 100755 usertools/dpdk-telemetry-exporter.py create mode 100644 usertools/telemetry-endpoints/counters.py create mode 100644 usertools/telemetry-endpoints/cpu.py create mode 100644 usertools/telemetry-endpoints/memory.py diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py new file mode 100755 index 000000000000..f8d873ad856c --- /dev/null +++ b/usertools/dpdk-telemetry-exporter.py @@ -0,0 +1,405 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +r''' +DPDK telemetry exporter. + +It uses dynamically loaded endpoint exporters which are basic python files that +must implement two functions: + + def info() -> dict[MetricName, MetricInfo]: + """ + Mapping of metric names to their description and type. + """ + + def metrics(sock: TelemetrySocket) -> list[MetricValue]: + """ + Request data from sock and return it as metric values. A metric value + is a 3-tuple: (name: str, value: any, labels: dict). Each name must be + present in info(). + """ + +The sock argument passed to metrics() has a single method: + + def cmd(self, uri, arg=None) -> dict | list: + """ + Request JSON data to the telemetry socket and parse it to python + values. + """ + +See existing endpoints for examples. + +The exporter supports multiple output formats: + +prometheus://ADDRESS:PORT +openmetrics://ADDRESS:PORT + Expose the enabled endpoints via a local HTTP server listening on the + specified address and port. GET requests on that server are served with + text/plain responses in the prometheus/openmetrics format. + + More details: + https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format + +carbon://ADDRESS:PORT +graphite://ADDRESS:PORT + Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle + carbon format. + + More details: + https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol +''' + +import argparse +import importlib.util +import json +import logging +import os +import pickle +import re +import socket +import struct +import sys +import time +import typing +from http import HTTPStatus, server +from urllib.parse import urlparse + +LOG = logging.getLogger(__name__) +# Use local endpoints path only when running from source +LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints") +DEFAULT_LOAD_PATHS = [] +if os.path.isdir(LOCAL): + DEFAULT_LOAD_PATHS.append(LOCAL) +DEFAULT_LOAD_PATHS += [ + "/usr/local/share/dpdk/telemetry-endpoints", + "/usr/share/dpdk/telemetry-endpoints", +] +DEFAULT_OUTPUT = "openmetrics://:9876" + + +def main(): + logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "-o", + "--output", + metavar="FORMAT://PARAMETERS", + default=urlparse(DEFAULT_OUTPUT), + type=urlparse, + help=f""" + Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format, + URL elements have different meanings. By default, the exporter starts a + local HTTP server on port 9876 that serves requests in the + prometheus/openmetrics plain text format. + """, + ) + parser.add_argument( + "-p", + "--load-path", + dest="load_paths", + type=lambda v: v.split(os.pathsep), + default=DEFAULT_LOAD_PATHS, + help=f""" + The list of paths from which to disvover endpoints. + (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}"). + """, + ) + parser.add_argument( + "-e", + "--endpoint", + dest="endpoints", + metavar="ENDPOINT", + action="append", + help=""" + Telemetry endpoint to export (by default, all discovered endpoints are + enabled). This option can be specified more than once. + """, + ) + parser.add_argument( + "-l", + "--list", + action="store_true", + help=""" + Only list detected endpoints and exit. + """, + ) + parser.add_argument( + "-s", + "--socket-path", + default="/run/dpdk/rte/dpdk_telemetry.v2", + help=""" + The DPDK telemetry socket path (default: "%(default)s"). + """, + ) + args = parser.parse_args() + output = OUTPUT_FORMATS.get(args.output.scheme) + if output is None: + parser.error(f"unsupported output format: {args.output.scheme}://") + + try: + endpoints = load_endpoints(args.load_paths, args.endpoints) + if args.list: + return + except Exception as e: + parser.error(str(e)) + + output(args, endpoints) + + +class TelemetrySocket: + """ + Abstraction of the DPDK telemetry socket. + """ + + def __init__(self, path: str): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + self.sock.connect(path) + data = json.loads(self.sock.recv(1024).decode()) + self.max_output_len = data["max_output_len"] + + def cmd( + self, uri: str, arg: typing.Any = None + ) -> typing.Optional[typing.Union[dict, list]]: + """ + Request JSON data to the telemetry socket and parse it to python + values. + """ + if arg is not None: + u = f"{uri},{arg}" + else: + u = uri + self.sock.send(u.encode("utf-8")) + data = self.sock.recv(self.max_output_len) + return json.loads(data.decode("utf-8"))[uri] + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.sock.close() + + +MetricDescription = str +MetricType = str +MetricName = str +MetricLabels = typing.Dict[str, typing.Any] +MetricInfo = typing.Tuple[MetricDescription, MetricType] +MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels] + + +class TelemetryEndpoint: + """ + Placeholder class only used for typing annotations. + """ + + @staticmethod + def info() -> typing.Dict[MetricName, MetricInfo]: + """ + Mapping of metric names to their description and type. + """ + raise NotImplementedError() + + @staticmethod + def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]: + """ + Request data from sock and return it as metric values. Each metric + name must be present in info(). + """ + raise NotImplementedError() + + +def load_endpoints( + paths: typing.List[str], names: typing.List[str] +) -> typing.List[TelemetryEndpoint]: + """ + Load selected telemetry endpoints from the specified paths. + """ + + endpoints = {} + dwb = sys.dont_write_bytecode + sys.dont_write_bytecode = True # never generate .pyc files for endpoints + + for p in paths: + if not os.path.isdir(p): + continue + for fname in os.listdir(p): + f = os.path.join(p, fname) + if os.path.isdir(f): + continue + try: + name, _ = os.path.splitext(fname) + if names is not None and name not in names: + # not selected by user + continue + if name in endpoints: + # endpoint with same name already loaded + continue + spec = importlib.util.spec_from_file_location(name, f) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + endpoints[name] = module + except Exception: + LOG.exception("parsing endpoint: %s", f) + + if not endpoints: + raise Exception("no telemetry endpoints detected/selected") + + sys.dont_write_bytecode = dwb + + modules = [] + info = {} + for name, module in sorted(endpoints.items()): + LOG.info("using endpoint: %s (from %s)", name, module.__file__) + try: + for metric, (description, type_) in module.info().items(): + info[(name, metric)] = (description, type_) + modules.append(module) + except Exception: + LOG.exception("getting endpoint info: %s", name) + return modules + + +def serve_openmetrics( + args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint] +): + """ + Start an HTTP server and serve requests in the openmetrics/prometheus + format. + """ + listen = (args.output.hostname or "", int(args.output.port or 80)) + with server.HTTPServer(listen, OpenmetricsHandler) as httpd: + httpd.dpdk_socket_path = args.socket_path + httpd.telemetry_endpoints = endpoints + LOG.info("listening on port %s", httpd.server_port) + try: + httpd.serve_forever() + except KeyboardInterrupt: + LOG.info("shutting down") + + +class OpenmetricsHandler(server.BaseHTTPRequestHandler): + """ + Basic HTTP handler that returns prometheus/openmetrics formatted responses. + """ + + CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8" + + def escape(self, value: typing.Any) -> str: + """ + Escape a metric label value. + """ + value = str(value) + value = value.replace('"', '\\"') + value = value.replace("\\", "\\\\") + return value.replace("\n", "\\n") + + def do_GET(self): + """ + Called uppon GET requests. + """ + try: + lines = [] + metrics_names = set() + with TelemetrySocket(self.server.dpdk_socket_path) as sock: + for e in self.server.telemetry_endpoints: + info = e.info() + metrics_lines = [] + try: + metrics = e.metrics(sock) + except Exception: + LOG.exception("%s: metrics collection failed", e.__name__) + continue + for name, value, labels in metrics: + fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}") + labels = ", ".join( + f'{k}="{self.escape(v)}"' for k, v in labels.items() + ) + if labels: + labels = f"{{{labels}}}" + metrics_lines.append(f"{fullname}{labels} {value}") + if fullname not in metrics_names: + metrics_names.add(fullname) + desc, metric_type = info[name] + lines += [ + f"# HELP {fullname} {desc}", + f"# TYPE {fullname} {metric_type}", + ] + lines += metrics_lines + if not lines: + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) + LOG.error( + "%s %s: no metrics collected", + self.address_string(), + self.requestline, + ) + body = "\n".join(lines).encode("utf-8") + b"\n" + self.send_response(HTTPStatus.OK) + self.send_header("Content-Type", self.CONTENT_TYPE) + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + LOG.info("%s %s", self.address_string(), self.requestline) + + except (FileNotFoundError, ConnectionRefusedError): + self.send_error(HTTPStatus.SERVICE_UNAVAILABLE) + LOG.exception( + "%s %s: telemetry socket not available", + self.address_string(), + self.requestline, + ) + except Exception: + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) + LOG.exception("%s %s", self.address_string(), self.requestline) + + def log_message(self, fmt, *args): + pass # disable built-in logger + + +def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]): + """ + Collect all metrics and export them to a carbon server in the pickle format. + """ + addr = (args.output.hostname or "", int(args.output.port or 80)) + with TelemetrySocket(args.socket_path) as dpdk: + with socket.socket() as carbon: + carbon.connect(addr) + all_metrics = [] + for e in endpoints: + try: + metrics = e.metrics(dpdk) + except Exception: + LOG.exception("%s: metrics collection failed", e.__name__) + continue + for name, value, labels in metrics: + fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}") + for key, val in labels.items(): + val = str(val).replace(";", "") + fullname += f";{key}={val}" + all_metrics.append((fullname, (time.time(), value))) + if not all_metrics: + raise Exception("no metrics collected") + payload = pickle.dumps(all_metrics, protocol=2) + header = struct.pack("!L", len(payload)) + buf = header + payload + carbon.sendall(buf) + + +OUTPUT_FORMATS = { + "openmetrics": serve_openmetrics, + "prometheus": serve_openmetrics, + "carbon": export_carbon, + "graphite": export_carbon, +} + + +if __name__ == "__main__": + main() diff --git a/usertools/meson.build b/usertools/meson.build index 740b4832f36d..eb48e2f4403f 100644 --- a/usertools/meson.build +++ b/usertools/meson.build @@ -11,5 +11,11 @@ install_data([ 'dpdk-telemetry.py', 'dpdk-hugepages.py', 'dpdk-rss-flows.py', + 'dpdk-telemetry-exporter.py', ], install_dir: 'bin') + +install_subdir( + 'telemetry-endpoints', + install_dir: 'share/dpdk', + strip_directory: false) diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py new file mode 100644 index 000000000000..e17cffb43b2c --- /dev/null +++ b/usertools/telemetry-endpoints/counters.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +RX_PACKETS = "rx_packets" +RX_BYTES = "rx_bytes" +RX_MISSED = "rx_missed" +RX_NOMBUF = "rx_nombuf" +RX_ERRORS = "rx_errors" +TX_PACKETS = "tx_packets" +TX_BYTES = "tx_bytes" +TX_ERRORS = "tx_errors" + + +def info() -> "dict[Name, tuple[Description, Type]]": + return { + RX_PACKETS: ("Number of successfully received packets.", "counter"), + RX_BYTES: ("Number of successfully received bytes.", "counter"), + RX_MISSED: ( + "Number of packets dropped by the HW because Rx queues are full.", + "counter", + ), + RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"), + RX_ERRORS: ("Number of erroneous received packets.", "counter"), + TX_PACKETS: ("Number of successfully transmitted packets.", "counter"), + TX_BYTES: ("Number of successfully transmitted bytes.", "counter"), + TX_ERRORS: ("Number of packet transmission failures.", "counter"), + } + + +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": + out = [] + for port_id in sock.cmd("/ethdev/list"): + port = sock.cmd("/ethdev/info", port_id) + stats = sock.cmd("/ethdev/stats", port_id) + labels = {"port": port["name"]} + out += [ + (RX_PACKETS, stats["ipackets"], labels), + (RX_PACKETS, stats["ipackets"], labels), + (RX_BYTES, stats["ibytes"], labels), + (RX_MISSED, stats["imissed"], labels), + (RX_NOMBUF, stats["rx_nombuf"], labels), + (RX_ERRORS, stats["ierrors"], labels), + (TX_PACKETS, stats["opackets"], labels), + (TX_BYTES, stats["obytes"], labels), + (TX_ERRORS, stats["oerrors"], labels), + ] + return out diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py new file mode 100644 index 000000000000..d38d8d6e2558 --- /dev/null +++ b/usertools/telemetry-endpoints/cpu.py @@ -0,0 +1,29 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +CPU_TOTAL = "total_cycles" +CPU_BUSY = "busy_cycles" + + +def info() -> "dict[Name, tuple[Description, Type]]": + return { + CPU_TOTAL: ("Total number of CPU cycles.", "counter"), + CPU_BUSY: ("Number of busy CPU cycles.", "counter"), + } + + +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": + out = [] + for lcore_id in sock.cmd("/eal/lcore/list"): + lcore = sock.cmd("/eal/lcore/info", lcore_id) + cpu = ",".join(str(c) for c in lcore.get("cpuset", [])) + total = lcore.get("total_cycles") + busy = lcore.get("busy_cycles", 0) + if not (cpu and total): + continue + labels = {"cpu": cpu, "numa": lcore.get("socket", 0)} + out += [ + (CPU_TOTAL, total, labels), + (CPU_BUSY, busy, labels), + ] + return out diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py new file mode 100644 index 000000000000..32cce1e59382 --- /dev/null +++ b/usertools/telemetry-endpoints/memory.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Robin Jarry + +MEM_TOTAL = "total_bytes" +MEM_USED = "used_bytes" + + +def info() -> "dict[Name, tuple[Description, Type]]": + return { + MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"), + MEM_USED: ("The currently used memory in bytes.", "gauge"), + } + + +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": + zones = {} + used = 0 + for zone in sock.cmd("/eal/memzone_list") or []: + z = sock.cmd("/eal/memzone_info", zone) + start = int(z["Hugepage_base"], 16) + end = start + (z["Hugepage_size"] * z["Hugepage_used"]) + used += z["Length"] + for s, e in list(zones.items()): + if s < start < e < end: + zones[s] = end + break + if start < s < end < e: + del zones[s] + zones[start] = e + break + else: + zones[start] = end + + return [ + (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}), + (MEM_USED, max(0, used), {}), + ] -- 2.44.0 ^ permalink raw reply [flat|nested] 7+ messages in thread
* Re: [PATCH v2] usertools: add telemetry exporter 2024-04-16 13:46 ` [PATCH v2] " Robin Jarry @ 2024-04-22 7:17 ` Anthony Harivel 2024-07-09 4:50 ` David Marchand 1 sibling, 0 replies; 7+ messages in thread From: Anthony Harivel @ 2024-04-22 7:17 UTC (permalink / raw) To: Robin Jarry, dev Hi Robin, I've tested your patch and this is all good for me. The errors are better handled. The most common one is when the socket telemetry is closed and you get: 2024-04-19 15:22:00 ERROR 192.168.122.116 GET /metrics HTTP/1.1: telemetry socket not available Traceback (most recent call last): File "/usr/bin/dpdk-telemetry-exporter.py", line 312, in do_GET with TelemetrySocket(self.server.dpdk_socket_path) as sock: File "/usr/bin/dpdk-telemetry-exporter.py", line 165, in __init__ self.sock.connect(path) FileNotFoundError: [Errno 2] No such file or directory You get the Traceback of Python which is a bit useless for the user but at least you have at the first line the root cause: "telemetry socket not available" which is IMO the most important. Thanks for you patch ! Tested-by: Anthony Harivel <aharivel@redhat.com> Regards, Anthony Robin Jarry, Apr 16, 2024 at 15:46: > For now the telemetry socket is local to the machine running a DPDK > application. Also, there is no official "schema" for the exposed > metrics. Add a framework and a script to collect and expose these > metrics to telemetry and observability agree gators such as Prometheus, > Carbon or Influxdb. The exposed data must be done with end-users in > mind, some DPDK terminology or internals may not make sense to everyone. > > The script only serves as an entry point and does not know anything > about any specific metrics nor JSON data structures exposed in the > telemetry socket. > > It uses dynamically loaded endpoint exporters which are basic python > files that must implement two functions: > > def info() -> dict[MetricName, MetricInfo]: > Mapping of metric names to their description and type. > > def metrics(sock: TelemetrySocket) -> list[MetricValue]: > Request data from sock and return it as metric values. A metric > value is a 3-tuple: (name: str, value: any, labels: dict). Each > name must be present in info(). > > The sock argument passed to metrics() has a single method: > > def cmd(self, uri: str, arg: any = None) -> dict | list: > Request JSON data to the telemetry socket and parse it to python > values. > > The main script invokes endpoints and exports the data into an output > format. For now, only two formats are implemented: > > * openmetrics/prometheus: text based format exported via a local HTTP > server. > * carbon/graphite: binary (python pickle) format exported to a distant > carbon TCP server. > > As a starting point, 3 built-in endpoints are implemented: > > * counters: ethdev hardware counters > * cpu: lcore usage > * memory: overall memory usage > > The goal is to keep all built-in endpoints in the DPDK repository so > that they can be updated along with the telemetry JSON data structures. > > Example output for the openmetrics:// format: > > ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 & > INFO using endpoint: counters (from .../telemetry-endpoints/counters.py) > INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py) > INFO using endpoint: memory (from .../telemetry-endpoints/memory.py) > INFO listening on port 9876 > [1] 838829 > > ~$ curl http://127.0.0.1:9876/ > # HELP dpdk_cpu_total_cycles Total number of CPU cycles. > # TYPE dpdk_cpu_total_cycles counter > # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles. > # TYPE dpdk_cpu_busy_cycles counter > dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980 > dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860 > dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740 > dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860 > dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540 > dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160 > dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320 > dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860 > # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes. > # TYPE dpdk_memory_total_bytes gauge > # HELP dpdk_memory_used_bytes The currently used memory in bytes. > # TYPE dpdk_memory_used_bytes gauge > dpdk_memory_total_bytes 1073741824 > dpdk_memory_used_bytes 794197376 > > Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format > Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format > Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol > Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus > Signed-off-by: Robin Jarry <rjarry@redhat.com> > --- > > Notes: > v2: > > * Refuse to run if no endpoints are enabled. > * Handle endpoint errors gracefully without failing the whole query. > > usertools/dpdk-telemetry-exporter.py | 405 ++++++++++++++++++++++ > usertools/meson.build | 6 + > usertools/telemetry-endpoints/counters.py | 47 +++ > usertools/telemetry-endpoints/cpu.py | 29 ++ > usertools/telemetry-endpoints/memory.py | 37 ++ > 5 files changed, 524 insertions(+) > create mode 100755 usertools/dpdk-telemetry-exporter.py > create mode 100644 usertools/telemetry-endpoints/counters.py > create mode 100644 usertools/telemetry-endpoints/cpu.py > create mode 100644 usertools/telemetry-endpoints/memory.py > > diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py > new file mode 100755 > index 000000000000..f8d873ad856c > --- /dev/null > +++ b/usertools/dpdk-telemetry-exporter.py > @@ -0,0 +1,405 @@ > +#!/usr/bin/env python3 > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +r''' > +DPDK telemetry exporter. > + > +It uses dynamically loaded endpoint exporters which are basic python files that > +must implement two functions: > + > + def info() -> dict[MetricName, MetricInfo]: > + """ > + Mapping of metric names to their description and type. > + """ > + > + def metrics(sock: TelemetrySocket) -> list[MetricValue]: > + """ > + Request data from sock and return it as metric values. A metric value > + is a 3-tuple: (name: str, value: any, labels: dict). Each name must be > + present in info(). > + """ > + > +The sock argument passed to metrics() has a single method: > + > + def cmd(self, uri, arg=None) -> dict | list: > + """ > + Request JSON data to the telemetry socket and parse it to python > + values. > + """ > + > +See existing endpoints for examples. > + > +The exporter supports multiple output formats: > + > +prometheus://ADDRESS:PORT > +openmetrics://ADDRESS:PORT > + Expose the enabled endpoints via a local HTTP server listening on the > + specified address and port. GET requests on that server are served with > + text/plain responses in the prometheus/openmetrics format. > + > + More details: > + https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format > + > +carbon://ADDRESS:PORT > +graphite://ADDRESS:PORT > + Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle > + carbon format. > + > + More details: > + https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol > +''' > + > +import argparse > +import importlib.util > +import json > +import logging > +import os > +import pickle > +import re > +import socket > +import struct > +import sys > +import time > +import typing > +from http import HTTPStatus, server > +from urllib.parse import urlparse > + > +LOG = logging.getLogger(__name__) > +# Use local endpoints path only when running from source > +LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints") > +DEFAULT_LOAD_PATHS = [] > +if os.path.isdir(LOCAL): > + DEFAULT_LOAD_PATHS.append(LOCAL) > +DEFAULT_LOAD_PATHS += [ > + "/usr/local/share/dpdk/telemetry-endpoints", > + "/usr/share/dpdk/telemetry-endpoints", > +] > +DEFAULT_OUTPUT = "openmetrics://:9876" > + > + > +def main(): > + logging.basicConfig( > + stream=sys.stdout, > + level=logging.INFO, > + format="%(asctime)s %(levelname)s %(message)s", > + datefmt="%Y-%m-%d %H:%M:%S", > + ) > + parser = argparse.ArgumentParser( > + description=__doc__, > + formatter_class=argparse.RawDescriptionHelpFormatter, > + ) > + parser.add_argument( > + "-o", > + "--output", > + metavar="FORMAT://PARAMETERS", > + default=urlparse(DEFAULT_OUTPUT), > + type=urlparse, > + help=f""" > + Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format, > + URL elements have different meanings. By default, the exporter starts a > + local HTTP server on port 9876 that serves requests in the > + prometheus/openmetrics plain text format. > + """, > + ) > + parser.add_argument( > + "-p", > + "--load-path", > + dest="load_paths", > + type=lambda v: v.split(os.pathsep), > + default=DEFAULT_LOAD_PATHS, > + help=f""" > + The list of paths from which to disvover endpoints. > + (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}"). > + """, > + ) > + parser.add_argument( > + "-e", > + "--endpoint", > + dest="endpoints", > + metavar="ENDPOINT", > + action="append", > + help=""" > + Telemetry endpoint to export (by default, all discovered endpoints are > + enabled). This option can be specified more than once. > + """, > + ) > + parser.add_argument( > + "-l", > + "--list", > + action="store_true", > + help=""" > + Only list detected endpoints and exit. > + """, > + ) > + parser.add_argument( > + "-s", > + "--socket-path", > + default="/run/dpdk/rte/dpdk_telemetry.v2", > + help=""" > + The DPDK telemetry socket path (default: "%(default)s"). > + """, > + ) > + args = parser.parse_args() > + output = OUTPUT_FORMATS.get(args.output.scheme) > + if output is None: > + parser.error(f"unsupported output format: {args.output.scheme}://") > + > + try: > + endpoints = load_endpoints(args.load_paths, args.endpoints) > + if args.list: > + return > + except Exception as e: > + parser.error(str(e)) > + > + output(args, endpoints) > + > + > +class TelemetrySocket: > + """ > + Abstraction of the DPDK telemetry socket. > + """ > + > + def __init__(self, path: str): > + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) > + self.sock.connect(path) > + data = json.loads(self.sock.recv(1024).decode()) > + self.max_output_len = data["max_output_len"] > + > + def cmd( > + self, uri: str, arg: typing.Any = None > + ) -> typing.Optional[typing.Union[dict, list]]: > + """ > + Request JSON data to the telemetry socket and parse it to python > + values. > + """ > + if arg is not None: > + u = f"{uri},{arg}" > + else: > + u = uri > + self.sock.send(u.encode("utf-8")) > + data = self.sock.recv(self.max_output_len) > + return json.loads(data.decode("utf-8"))[uri] > + > + def __enter__(self): > + return self > + > + def __exit__(self, *args, **kwargs): > + self.sock.close() > + > + > +MetricDescription = str > +MetricType = str > +MetricName = str > +MetricLabels = typing.Dict[str, typing.Any] > +MetricInfo = typing.Tuple[MetricDescription, MetricType] > +MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels] > + > + > +class TelemetryEndpoint: > + """ > + Placeholder class only used for typing annotations. > + """ > + > + @staticmethod > + def info() -> typing.Dict[MetricName, MetricInfo]: > + """ > + Mapping of metric names to their description and type. > + """ > + raise NotImplementedError() > + > + @staticmethod > + def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]: > + """ > + Request data from sock and return it as metric values. Each metric > + name must be present in info(). > + """ > + raise NotImplementedError() > + > + > +def load_endpoints( > + paths: typing.List[str], names: typing.List[str] > +) -> typing.List[TelemetryEndpoint]: > + """ > + Load selected telemetry endpoints from the specified paths. > + """ > + > + endpoints = {} > + dwb = sys.dont_write_bytecode > + sys.dont_write_bytecode = True # never generate .pyc files for endpoints > + > + for p in paths: > + if not os.path.isdir(p): > + continue > + for fname in os.listdir(p): > + f = os.path.join(p, fname) > + if os.path.isdir(f): > + continue > + try: > + name, _ = os.path.splitext(fname) > + if names is not None and name not in names: > + # not selected by user > + continue > + if name in endpoints: > + # endpoint with same name already loaded > + continue > + spec = importlib.util.spec_from_file_location(name, f) > + module = importlib.util.module_from_spec(spec) > + spec.loader.exec_module(module) > + endpoints[name] = module > + except Exception: > + LOG.exception("parsing endpoint: %s", f) > + > + if not endpoints: > + raise Exception("no telemetry endpoints detected/selected") > + > + sys.dont_write_bytecode = dwb > + > + modules = [] > + info = {} > + for name, module in sorted(endpoints.items()): > + LOG.info("using endpoint: %s (from %s)", name, module.__file__) > + try: > + for metric, (description, type_) in module.info().items(): > + info[(name, metric)] = (description, type_) > + modules.append(module) > + except Exception: > + LOG.exception("getting endpoint info: %s", name) > + return modules > + > + > +def serve_openmetrics( > + args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint] > +): > + """ > + Start an HTTP server and serve requests in the openmetrics/prometheus > + format. > + """ > + listen = (args.output.hostname or "", int(args.output.port or 80)) > + with server.HTTPServer(listen, OpenmetricsHandler) as httpd: > + httpd.dpdk_socket_path = args.socket_path > + httpd.telemetry_endpoints = endpoints > + LOG.info("listening on port %s", httpd.server_port) > + try: > + httpd.serve_forever() > + except KeyboardInterrupt: > + LOG.info("shutting down") > + > + > +class OpenmetricsHandler(server.BaseHTTPRequestHandler): > + """ > + Basic HTTP handler that returns prometheus/openmetrics formatted responses. > + """ > + > + CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8" > + > + def escape(self, value: typing.Any) -> str: > + """ > + Escape a metric label value. > + """ > + value = str(value) > + value = value.replace('"', '\\"') > + value = value.replace("\\", "\\\\") > + return value.replace("\n", "\\n") > + > + def do_GET(self): > + """ > + Called uppon GET requests. > + """ > + try: > + lines = [] > + metrics_names = set() > + with TelemetrySocket(self.server.dpdk_socket_path) as sock: > + for e in self.server.telemetry_endpoints: > + info = e.info() > + metrics_lines = [] > + try: > + metrics = e.metrics(sock) > + except Exception: > + LOG.exception("%s: metrics collection failed", e.__name__) > + continue > + for name, value, labels in metrics: > + fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}") > + labels = ", ".join( > + f'{k}="{self.escape(v)}"' for k, v in labels.items() > + ) > + if labels: > + labels = f"{{{labels}}}" > + metrics_lines.append(f"{fullname}{labels} {value}") > + if fullname not in metrics_names: > + metrics_names.add(fullname) > + desc, metric_type = info[name] > + lines += [ > + f"# HELP {fullname} {desc}", > + f"# TYPE {fullname} {metric_type}", > + ] > + lines += metrics_lines > + if not lines: > + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) > + LOG.error( > + "%s %s: no metrics collected", > + self.address_string(), > + self.requestline, > + ) > + body = "\n".join(lines).encode("utf-8") + b"\n" > + self.send_response(HTTPStatus.OK) > + self.send_header("Content-Type", self.CONTENT_TYPE) > + self.send_header("Content-Length", str(len(body))) > + self.end_headers() > + self.wfile.write(body) > + LOG.info("%s %s", self.address_string(), self.requestline) > + > + except (FileNotFoundError, ConnectionRefusedError): > + self.send_error(HTTPStatus.SERVICE_UNAVAILABLE) > + LOG.exception( > + "%s %s: telemetry socket not available", > + self.address_string(), > + self.requestline, > + ) > + except Exception: > + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) > + LOG.exception("%s %s", self.address_string(), self.requestline) > + > + def log_message(self, fmt, *args): > + pass # disable built-in logger > + > + > +def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]): > + """ > + Collect all metrics and export them to a carbon server in the pickle format. > + """ > + addr = (args.output.hostname or "", int(args.output.port or 80)) > + with TelemetrySocket(args.socket_path) as dpdk: > + with socket.socket() as carbon: > + carbon.connect(addr) > + all_metrics = [] > + for e in endpoints: > + try: > + metrics = e.metrics(dpdk) > + except Exception: > + LOG.exception("%s: metrics collection failed", e.__name__) > + continue > + for name, value, labels in metrics: > + fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}") > + for key, val in labels.items(): > + val = str(val).replace(";", "") > + fullname += f";{key}={val}" > + all_metrics.append((fullname, (time.time(), value))) > + if not all_metrics: > + raise Exception("no metrics collected") > + payload = pickle.dumps(all_metrics, protocol=2) > + header = struct.pack("!L", len(payload)) > + buf = header + payload > + carbon.sendall(buf) > + > + > +OUTPUT_FORMATS = { > + "openmetrics": serve_openmetrics, > + "prometheus": serve_openmetrics, > + "carbon": export_carbon, > + "graphite": export_carbon, > +} > + > + > +if __name__ == "__main__": > + main() > diff --git a/usertools/meson.build b/usertools/meson.build > index 740b4832f36d..eb48e2f4403f 100644 > --- a/usertools/meson.build > +++ b/usertools/meson.build > @@ -11,5 +11,11 @@ install_data([ > 'dpdk-telemetry.py', > 'dpdk-hugepages.py', > 'dpdk-rss-flows.py', > + 'dpdk-telemetry-exporter.py', > ], > install_dir: 'bin') > + > +install_subdir( > + 'telemetry-endpoints', > + install_dir: 'share/dpdk', > + strip_directory: false) > diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py > new file mode 100644 > index 000000000000..e17cffb43b2c > --- /dev/null > +++ b/usertools/telemetry-endpoints/counters.py > @@ -0,0 +1,47 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +RX_PACKETS = "rx_packets" > +RX_BYTES = "rx_bytes" > +RX_MISSED = "rx_missed" > +RX_NOMBUF = "rx_nombuf" > +RX_ERRORS = "rx_errors" > +TX_PACKETS = "tx_packets" > +TX_BYTES = "tx_bytes" > +TX_ERRORS = "tx_errors" > + > + > +def info() -> "dict[Name, tuple[Description, Type]]": > + return { > + RX_PACKETS: ("Number of successfully received packets.", "counter"), > + RX_BYTES: ("Number of successfully received bytes.", "counter"), > + RX_MISSED: ( > + "Number of packets dropped by the HW because Rx queues are full.", > + "counter", > + ), > + RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"), > + RX_ERRORS: ("Number of erroneous received packets.", "counter"), > + TX_PACKETS: ("Number of successfully transmitted packets.", "counter"), > + TX_BYTES: ("Number of successfully transmitted bytes.", "counter"), > + TX_ERRORS: ("Number of packet transmission failures.", "counter"), > + } > + > + > +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": > + out = [] > + for port_id in sock.cmd("/ethdev/list"): > + port = sock.cmd("/ethdev/info", port_id) > + stats = sock.cmd("/ethdev/stats", port_id) > + labels = {"port": port["name"]} > + out += [ > + (RX_PACKETS, stats["ipackets"], labels), > + (RX_PACKETS, stats["ipackets"], labels), > + (RX_BYTES, stats["ibytes"], labels), > + (RX_MISSED, stats["imissed"], labels), > + (RX_NOMBUF, stats["rx_nombuf"], labels), > + (RX_ERRORS, stats["ierrors"], labels), > + (TX_PACKETS, stats["opackets"], labels), > + (TX_BYTES, stats["obytes"], labels), > + (TX_ERRORS, stats["oerrors"], labels), > + ] > + return out > diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py > new file mode 100644 > index 000000000000..d38d8d6e2558 > --- /dev/null > +++ b/usertools/telemetry-endpoints/cpu.py > @@ -0,0 +1,29 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +CPU_TOTAL = "total_cycles" > +CPU_BUSY = "busy_cycles" > + > + > +def info() -> "dict[Name, tuple[Description, Type]]": > + return { > + CPU_TOTAL: ("Total number of CPU cycles.", "counter"), > + CPU_BUSY: ("Number of busy CPU cycles.", "counter"), > + } > + > + > +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": > + out = [] > + for lcore_id in sock.cmd("/eal/lcore/list"): > + lcore = sock.cmd("/eal/lcore/info", lcore_id) > + cpu = ",".join(str(c) for c in lcore.get("cpuset", [])) > + total = lcore.get("total_cycles") > + busy = lcore.get("busy_cycles", 0) > + if not (cpu and total): > + continue > + labels = {"cpu": cpu, "numa": lcore.get("socket", 0)} > + out += [ > + (CPU_TOTAL, total, labels), > + (CPU_BUSY, busy, labels), > + ] > + return out > diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py > new file mode 100644 > index 000000000000..32cce1e59382 > --- /dev/null > +++ b/usertools/telemetry-endpoints/memory.py > @@ -0,0 +1,37 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +MEM_TOTAL = "total_bytes" > +MEM_USED = "used_bytes" > + > + > +def info() -> "dict[Name, tuple[Description, Type]]": > + return { > + MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"), > + MEM_USED: ("The currently used memory in bytes.", "gauge"), > + } > + > + > +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": > + zones = {} > + used = 0 > + for zone in sock.cmd("/eal/memzone_list") or []: > + z = sock.cmd("/eal/memzone_info", zone) > + start = int(z["Hugepage_base"], 16) > + end = start + (z["Hugepage_size"] * z["Hugepage_used"]) > + used += z["Length"] > + for s, e in list(zones.items()): > + if s < start < e < end: > + zones[s] = end > + break > + if start < s < end < e: > + del zones[s] > + zones[start] = e > + break > + else: > + zones[start] = end > + > + return [ > + (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}), > + (MEM_USED, max(0, used), {}), > + ] > -- > 2.44.0 ^ permalink raw reply [flat|nested] 7+ messages in thread
* Re: [PATCH v2] usertools: add telemetry exporter 2024-04-16 13:46 ` [PATCH v2] " Robin Jarry 2024-04-22 7:17 ` Anthony Harivel @ 2024-07-09 4:50 ` David Marchand 1 sibling, 0 replies; 7+ messages in thread From: David Marchand @ 2024-07-09 4:50 UTC (permalink / raw) To: Robin Jarry; +Cc: dev, Anthony Harivel On Tue, Apr 16, 2024 at 3:47 PM Robin Jarry <rjarry@redhat.com> wrote: > > For now the telemetry socket is local to the machine running a DPDK > application. Also, there is no official "schema" for the exposed > metrics. Add a framework and a script to collect and expose these > metrics to telemetry and observability agree gators such as Prometheus, > Carbon or Influxdb. The exposed data must be done with end-users in > mind, some DPDK terminology or internals may not make sense to everyone. > > The script only serves as an entry point and does not know anything > about any specific metrics nor JSON data structures exposed in the > telemetry socket. > > It uses dynamically loaded endpoint exporters which are basic python > files that must implement two functions: > > def info() -> dict[MetricName, MetricInfo]: > Mapping of metric names to their description and type. > > def metrics(sock: TelemetrySocket) -> list[MetricValue]: > Request data from sock and return it as metric values. A metric > value is a 3-tuple: (name: str, value: any, labels: dict). Each > name must be present in info(). > > The sock argument passed to metrics() has a single method: > > def cmd(self, uri: str, arg: any = None) -> dict | list: > Request JSON data to the telemetry socket and parse it to python > values. > > The main script invokes endpoints and exports the data into an output > format. For now, only two formats are implemented: > > * openmetrics/prometheus: text based format exported via a local HTTP > server. > * carbon/graphite: binary (python pickle) format exported to a distant > carbon TCP server. > > As a starting point, 3 built-in endpoints are implemented: > > * counters: ethdev hardware counters > * cpu: lcore usage > * memory: overall memory usage > > The goal is to keep all built-in endpoints in the DPDK repository so > that they can be updated along with the telemetry JSON data structures. > > Example output for the openmetrics:// format: > > ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 & > INFO using endpoint: counters (from .../telemetry-endpoints/counters.py) > INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py) > INFO using endpoint: memory (from .../telemetry-endpoints/memory.py) > INFO listening on port 9876 > [1] 838829 > > ~$ curl http://127.0.0.1:9876/ > # HELP dpdk_cpu_total_cycles Total number of CPU cycles. > # TYPE dpdk_cpu_total_cycles counter > # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles. > # TYPE dpdk_cpu_busy_cycles counter > dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980 > dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860 > dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740 > dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860 > dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540 > dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160 > dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320 > dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860 > # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes. > # TYPE dpdk_memory_total_bytes gauge > # HELP dpdk_memory_used_bytes The currently used memory in bytes. > # TYPE dpdk_memory_used_bytes gauge > dpdk_memory_total_bytes 1073741824 > dpdk_memory_used_bytes 794197376 > > Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format > Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format > Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol > Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus > Signed-off-by: Robin Jarry <rjarry@redhat.com> Applied, thanks. -- David Marchand ^ permalink raw reply [flat|nested] 7+ messages in thread
end of thread, other threads:[~2024-07-09 4:50 UTC | newest] Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2023-09-26 16:34 [RFC PATCH] usertools: add telemetry exporter Robin Jarry 2023-11-20 13:26 ` Robin Jarry 2024-03-27 15:18 ` Anthony Harivel 2024-04-01 21:28 ` Robin Jarry 2024-04-16 13:46 ` [PATCH v2] " Robin Jarry 2024-04-22 7:17 ` Anthony Harivel 2024-07-09 4:50 ` David Marchand
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox; as well as URLs for NNTP newsgroup(s).