From: Brandon Lo <blo@iol.unh.edu>
To: ci@dpdk.org
Cc: Brandon Lo <blo@iol.unh.edu>
Subject: [PATCH v3 1/4] tools: add acvp_tool
Date: Wed, 26 Jan 2022 13:56:43 -0500 [thread overview]
Message-ID: <20220126185643.775332-1-blo@iol.unh.edu> (raw)
In-Reply-To: <CAOeXdvYL_T-ug8zq_ixCnaWYWMh-QXkQ6beE1dd4VPfL4nowbA@mail.gmail.com>
This tool is used to interact with the ACVP API.
Signed-off-by: Brandon Lo <blo@iol.unh.edu>
---
v3:
* Removed changes to root requirements
v2:
* Added SPDX and copyright line
tools/acvp/__init__.py | 0
tools/acvp/acvp_tool.py | 319 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 319 insertions(+)
create mode 100644 tools/acvp/__init__.py
create mode 100644 tools/acvp/acvp_tool.py
diff --git a/tools/acvp/__init__.py b/tools/acvp/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tools/acvp/acvp_tool.py b/tools/acvp/acvp_tool.py
new file mode 100644
index 0000000..40d2f2f
--- /dev/null
+++ b/tools/acvp/acvp_tool.py
@@ -0,0 +1,319 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright 2022 The University of New Hampshire
+
+import hashlib
+import sys
+import time
+import base64
+import argparse
+import os
+import json
+import logging
+from typing import Tuple, Optional, Any, Dict, List
+
+import pyotp
+import requests
+
+
+class ACVPProxy:
+ def __init__(self, cert_path: str, key_path: str, totp_path: str,
+ config_path: str):
+ """ACVP Proxy used to abstract API calls.
+
+ @param cert_path: Path to the client certificate.
+ @param key_path: Path to the client key.
+ @param totp_path: Path to the one-time password seed.
+ @param config_path: Path to the configuration for the session.
+ """
+ self.cert: Tuple[str, str] = (cert_path, key_path)
+
+ if None in self.cert:
+ logging.error('Missing certificate/key file.')
+ sys.exit(1)
+
+ self.totp_path: str = totp_path
+ self.login_data: Optional[Dict[str, Any]] = None
+ self.session_data: Optional[Dict[str, Any]] = None
+
+ with open(config_path, 'r') as f:
+ self.config: Any = json.load(f)
+
+ def __get_totp(self) -> str:
+ """Get the current one-time password.
+
+ Uses the totp_path argument when instantiating to
+ read the TOTP seed.
+
+ @return: String containing the password.
+ """
+ with open(self.totp_path, 'r') as f:
+ seed = f.read().strip()
+ base64_seed = base64.b32encode(base64.b64decode(seed)).decode('utf-8')
+ totp = pyotp.TOTP(s=base64_seed, digits=8, digest=hashlib.sha256,
+ interval=30)
+ return totp.now()
+
+ def __fetch_vector_set(self, url: str) -> Optional[Dict]:
+ """Fetch the vector set object from a URL.
+
+ @param url: URL of the vector set given by NIST.
+ @return: Dictionary of the response from the NIST API.
+ The returned dictionary comes without the session information.
+ """
+ logging.info(f'Fetching vector set {url}')
+ token = self.session_data['jwt']
+ while True:
+ response = requests.get(
+ f'{self.config["url"]}{url}',
+ cert=self.cert,
+ headers={'Authorization': f'Bearer {token}'}
+ )
+ if not response.ok:
+ logging.error(f'Failed to fetch vector set {url}')
+ logging.error(json.dumps(response.json(), indent=4))
+ return None
+
+ vector_set_json = response.json()[1]
+ if 'retry' in vector_set_json:
+ duration = vector_set_json['retry']
+ logging.info(f'Server says retry in {duration} seconds...')
+ time.sleep(duration)
+ continue
+
+ logging.info(f'Downloaded vector set {url}')
+ return vector_set_json
+
+ def login(self) -> bool:
+ """Log into the API server.
+
+ Uses the instance's current TOTP seed and certificate file paths
+ to authenticate with the API.
+
+ If successful, the access token of the account will be stored in
+ the instance.
+
+ @return: True if authentication succeeded, false otherwise.
+ """
+ response = requests.post(
+ url=f'{self.config["url"]}/acvp/v1/login',
+ json=[
+ {'acvVersion': '1.0'},
+ {'password': self.__get_totp()},
+ ],
+ cert=self.cert,
+ )
+
+ if not response.ok:
+ logging.error('Failed to log in.')
+ logging.error(json.dumps(response.json(), indent=4))
+ return False
+
+ self.login_data = response.json()[1]
+ # Renamed 'accessToken' to 'jwt' in the json object
+ # to stay consistent with libacvp
+ self.login_data['jwt'] = self.login_data.pop('accessToken')
+ return True
+
+ def register(self) -> Optional[List[Any]]:
+ """Register a new test session.
+
+ This requires the ACVPProxy instance to be authenticated (use .login).
+
+ @return: If registration succeeded, it will return the list
+ containing session information and vector sets.
+ """
+ if self.login_data is None:
+ logging.error('ACVP proxy cannot register a test session without '
+ 'logging in first.')
+ return None
+
+ response = requests.post(
+ url=f'{self.config["url"]}/acvp/v1/testSessions',
+ json=[
+ {'acvVersion': '1.0'},
+ {
+ 'isSample': False,
+ 'algorithms': self.config['algorithms']
+ }
+ ],
+ cert=self.cert,
+ headers={'Authorization': f'Bearer {self.login_data["jwt"]}'}
+ )
+
+ if not response.ok:
+ logging.error('Unable to register.')
+ logging.error(json.dumps(response.json(), indent=4))
+ return None
+
+ self.session_data = response.json()[1]
+ # Renamed 'accessToken' to 'jwt' in the json object
+ # to stay consistent with libacvp
+ self.session_data['jwt'] = self.session_data.pop('accessToken')
+ write_data = [self.session_data]
+
+ for url in self.session_data['vectorSetUrls']:
+ write_data.append(self.__fetch_vector_set(url))
+
+ return write_data
+
+ def fetch_verdict(self, vector_results: List[Any]) -> List[Any]:
+ """Fetch verdict for a list of vector sets.
+
+ @param vector_results: List of vector set dictionaries with answers.
+ @return: A list containing the session information and verdicts.
+ """
+ session_url = self.session_data['url']
+ write_data = [self.session_data]
+ for _, vector_set in vector_results:
+ vector_set_id = vector_set['vsId']
+ logging.info(f'Downloading verdict for vector set {vector_set_id}')
+ while True:
+ result = requests.get(
+ f'{self.config["url"]}{session_url}'
+ f'/vectorSets/{vector_set_id}/results',
+ cert=self.cert,
+ headers={
+ 'Authorization': f'Bearer {self.session_data["jwt"]}'
+ }
+ )
+ version, result_json = result.json()
+ if 'retry' in result_json:
+ duration = result_json['retry']
+ logging.info(f'Vector set verdict not ready, waiting '
+ f'{duration} seconds...')
+ time.sleep(duration)
+ continue
+
+ write_data.append([version, result_json])
+ break
+ return write_data
+
+ def upload(self, vector_sets: List[Any]) -> bool:
+ """Upload the given vector sets.
+
+ @param vector_sets: List of vector set dictionaries with answers.
+ @return: True if uploading succeeded.
+ """
+ has_error = False
+ session_url = self.session_data['url']
+
+ for version, vector_set in vector_sets:
+ response = requests.post(
+ f'{self.config["url"]}{session_url}/vectorSets/'
+ f'{vector_set["vsId"]}/results',
+ json=[version, vector_set],
+ cert=self.cert,
+ headers={'Authorization': f'Bearer {self.session_data["jwt"]}'}
+ )
+
+ if not response.ok:
+ has_error = True
+ logging.error(f'Could not upload vector set response for '
+ f'vector set ID {vector_set["vsId"]}.')
+ logging.error(json.dumps(response.json(), indent=4))
+ continue
+
+ return not has_error
+
+
+def main(request_path: Optional[str],
+ response_path: Optional[str],
+ verdict_path: Optional[str],
+ do_upload: bool,
+ config_path: str):
+
+ if request_path and response_path:
+ logging.error('You cannot use both a request and a response file.')
+ sys.exit(1)
+
+ if not any([request_path, response_path]):
+ logging.error('You must specify either a request or a response file.')
+ sys.exit(1)
+
+ proxy = ACVPProxy(
+ cert_path=os.getenv('ACVP_CERT_FILE'),
+ key_path=os.getenv('ACVP_KEY_FILE'),
+ totp_path=os.getenv('ACVP_SEED_FILE'),
+ config_path=config_path,
+ )
+
+ logging.info('Attempting to log in...')
+ if not proxy.login():
+ logging.error('Could not log in.')
+ sys.exit(1)
+ logging.info('Successfully logged in.')
+
+ if request_path:
+ logging.info('Creating a new test session and downloading vectors...')
+ test_session = proxy.register()
+ if not test_session:
+ logging.error('Could not create a new test session.')
+ sys.exit(1)
+
+ with open(request_path, 'w') as f:
+ json.dump(test_session, f, indent=4)
+ elif response_path:
+ logging.info('Using response file...')
+ with open(response_path, 'r') as upload_file:
+ upload_json: List[Any] = json.load(upload_file)
+ proxy.session_data = upload_json[0]
+
+ if do_upload:
+ logging.info('Uploading response file...')
+ if not proxy.upload(upload_json[1:]):
+ logging.error('Could not successfully upload results file.')
+ sys.exit(1)
+
+ if verdict_path:
+ logging.info('Fetching verdict...')
+ verdict = proxy.fetch_verdict(upload_json[1:])
+ if not verdict:
+ logging.error('Could not successfully fetch verdict file.')
+ sys.exit(1)
+ with open(verdict_path, 'w') as f:
+ json.dump(verdict, f, indent=4)
+
+ logging.info('Done')
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ parser = argparse.ArgumentParser(description='ACVP tool to fetch tests '
+ 'and upload results.')
+ parser.add_argument('--request', '-r',
+ help='Path to download a request file. '
+ 'If specified, the tool start a new test session '
+ 'and download the test information into the '
+ 'given path.')
+ parser.add_argument('--response', '-s',
+ help='Path of the response file. '
+ 'If specified, the tool will use this file '
+ 'to determine session information. '
+ 'This argument must be used when uploading '
+ 'results or downloading verdicts.')
+ parser.add_argument('--verdict', '-v',
+ help='Download the verdict to the specified path. '
+ 'If this flag is set, the tool will download the '
+ 'verdict for the given response file '
+ '(--response).')
+ parser.add_argument('--upload', '-u',
+ help='Upload the given response file to the API. '
+ 'If this flag is set, the tool will upload the '
+ 'given response file (--response).',
+ action='store_true')
+ parser.add_argument('--config', '-c',
+ help='Path of the configuration file. '
+ '(Default: acvp_config.json)',
+ default='acvp_config.json')
+
+ args = parser.parse_args()
+
+ main(
+ request_path=args.request,
+ response_path=args.response,
+ verdict_path=args.verdict,
+ do_upload=args.upload,
+ config_path=args.config,
+ )
--
2.25.1
next prev parent reply other threads:[~2022-01-26 18:56 UTC|newest]
Thread overview: 20+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-01-26 18:16 [PATCH v2 0/4] Add ACVP tool Brandon Lo
2022-01-26 18:16 ` [PATCH v2 1/4] tools: add acvp_tool Brandon Lo
2022-01-26 18:25 ` Brandon Lo
2022-01-26 18:56 ` Brandon Lo [this message]
2022-01-26 18:16 ` [PATCH v2 2/4] tools: add default config file for acvp_tool Brandon Lo
2022-01-26 18:16 ` [PATCH v2 3/4] tools: add requirements " Brandon Lo
2022-01-26 18:16 ` [PATCH v2 4/4] doc: add readme " Brandon Lo
2022-02-02 15:04 ` [PATCH v3 0/4] Add ACVP tool Brandon Lo
2022-02-02 15:04 ` [PATCH v3 1/4] tools: add acvp_tool Brandon Lo
2022-04-16 10:34 ` Ali Alnubani
2022-02-02 15:04 ` [PATCH v3 2/4] tools: add default config file for acvp_tool Brandon Lo
2022-02-02 15:04 ` [PATCH v3 3/4] tools: add requirements " Brandon Lo
2022-02-02 15:04 ` [PATCH v3 4/4] doc: add readme " Brandon Lo
2022-02-17 14:27 ` [PATCH v3 0/4] Add ACVP tool Brandon Lo
2022-04-16 10:35 ` Ali Alnubani
2022-04-18 13:36 ` [PATCH v4 " Brandon Lo
2022-04-18 13:36 ` [PATCH v4 1/4] tools: add acvp_tool Brandon Lo
2022-04-18 13:36 ` [PATCH v4 2/4] tools: add default config file for acvp_tool Brandon Lo
2022-04-18 13:36 ` [PATCH v4 3/4] tools: add requirements " Brandon Lo
2022-04-18 13:36 ` [PATCH v4 4/4] doc: add readme " Brandon Lo
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20220126185643.775332-1-blo@iol.unh.edu \
--to=blo@iol.unh.edu \
--cc=ci@dpdk.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).