#!/usr/bin/env python3
# Provenance (mailing-list patch this module was posted in):
#   From: Brandon Lo <blo@iol.unh.edu>
#   To: ci@dpdk.org
#   Cc: Brandon Lo <blo@iol.unh.edu>
#   Subject: [PATCH 1/4] tools: add acvp_tool
#   Date: Mon, 24 Jan 2022 14:09:20 -0500
#   Message-ID: <20220124190923.745577-2-blo@iol.unh.edu>
#   In-Reply-To: <20220124190923.745577-1-blo@iol.unh.edu>
#
#   This tool is used to interact with the ACVP API.
#
#   Signed-off-by: Brandon Lo <blo@iol.unh.edu>
#
#   The patch creates tools/acvp/__init__.py (empty) and
#   tools/acvp/acvp_tool.py (this module). Patch trailer: 2.25.1
"""Command-line tool to fetch tests from, and upload results to, the ACVP API.

The tool either starts a new test session and saves the downloaded vector
sets to a request file (--request), or reads a response file (--response)
to upload answers (--upload) and/or download verdicts (--verdict).

Credentials are taken from the ACVP_CERT_FILE, ACVP_KEY_FILE and
ACVP_SEED_FILE environment variables.
"""
import hashlib
import sys
import time
import base64
import argparse
import os
import json
from typing import Tuple, Optional, Any, Dict, List

import pyotp
import requests

from tools.ci_logging import logging


def _log_error_response(response: requests.Response) -> None:
    """Log the body of a failed API response.

    The body is pretty-printed when it is valid JSON. Otherwise the raw
    text is logged, so that a non-JSON error body cannot raise from inside
    an error-handling path.

    @param response: The failed response to report.
    """
    try:
        body = json.dumps(response.json(), indent=4)
    except ValueError:
        body = response.text
    logging.error(body)


class ACVPProxy:
    """Thin wrapper around the ACVP API calls used by this tool."""

    def __init__(self, cert_path: Optional[str], key_path: Optional[str],
                 totp_path: Optional[str], config_path: str):
        """ACVP Proxy used to abstract API calls.

        Exits the process with status 1 if the certificate, key or
        one-time password seed path is missing.

        @param cert_path: Path to the client certificate.
        @param key_path: Path to the client key.
        @param totp_path: Path to the one-time password seed.
        @param config_path: Path to the configuration for the session.
        """
        if cert_path is None or key_path is None:
            logging.error('Missing certificate/key file.')
            sys.exit(1)

        # Checked here so that a missing seed is reported up front instead
        # of failing later when the seed file is opened.
        if totp_path is None:
            logging.error('Missing TOTP seed file.')
            sys.exit(1)

        # (certificate, key) pair passed as `cert=` to every request.
        self.cert: Tuple[str, str] = (cert_path, key_path)
        self.totp_path: str = totp_path
        # Second element of the login response, with 'accessToken'
        # renamed to 'jwt'. None until login() succeeds.
        self.login_data: Optional[Dict[str, Any]] = None
        # Second element of the test session response, with 'accessToken'
        # renamed to 'jwt'. Set by register() or loaded from a response
        # file by main().
        self.session_data: Optional[Dict[str, Any]] = None

        with open(config_path, 'r') as f:
            # The code reads the "url" and "algorithms" keys from this.
            self.config: Any = json.load(f)

    def __get_totp(self) -> str:
        """Get the current one-time password.

        Uses the totp_path argument when instantiating to
        read the TOTP seed.

        @return: String containing the password.
        """
        with open(self.totp_path, 'r') as f:
            seed = f.read().strip()
        # The seed file is decoded as base64 and re-encoded as base32
        # before being handed to pyotp.
        base32_seed = base64.b32encode(base64.b64decode(seed)).decode('utf-8')
        totp = pyotp.TOTP(s=base32_seed, digits=8, digest=hashlib.sha256,
                          interval=30)
        return totp.now()

    def __fetch_vector_set(self, url: str) -> Optional[Dict]:
        """Fetch the vector set object from a URL.

        Keeps polling for as long as the server answers with a 'retry'
        field, sleeping for the number of seconds it gives.

        @param url: URL of the vector set given by NIST.
        @return: Dictionary of the response from the NIST API, or None if
                 the request failed.
                 The returned dictionary comes without the session
                 information.
        """
        logging.info(f'Fetching vector set {url}')
        token = self.session_data['jwt']
        while True:
            response = requests.get(
                f'{self.config["url"]}{url}',
                cert=self.cert,
                headers={'Authorization': f'Bearer {token}'}
            )
            if not response.ok:
                logging.error(f'Failed to fetch vector set {url}')
                _log_error_response(response)
                return None

            # The payload is a two-element list; the vector set is the
            # second element.
            vector_set_json = response.json()[1]
            if 'retry' in vector_set_json:
                duration = vector_set_json['retry']
                logging.info(f'Server says retry in {duration} seconds...')
                time.sleep(duration)
                continue

            logging.info(f'Downloaded vector set {url}')
            return vector_set_json

    def login(self) -> bool:
        """Log into the API server.

        Uses the instance's current TOTP seed and certificate file paths
        to authenticate with the API.

        If successful, the access token of the account will be stored in
        the instance.

        @return: True if authentication succeeded, false otherwise.
        """
        response = requests.post(
            url=f'{self.config["url"]}/acvp/v1/login',
            json=[
                {'acvVersion': '1.0'},
                {'password': self.__get_totp()},
            ],
            cert=self.cert,
        )

        if not response.ok:
            logging.error('Failed to log in.')
            _log_error_response(response)
            return False

        self.login_data = response.json()[1]
        # Renamed 'accessToken' to 'jwt' in the json object
        # to stay consistent with libacvp
        self.login_data['jwt'] = self.login_data.pop('accessToken')
        return True

    def register(self) -> Optional[List[Any]]:
        """Register a new test session.

        This requires the ACVPProxy instance to be authenticated (use .login).

        @return: If registration succeeded, it will return the list
                 containing session information and vector sets.
                 None is returned if the proxy is not logged in, if the
                 registration request fails, or if any vector set cannot
                 be downloaded.
        """
        if self.login_data is None:
            logging.error('ACVP proxy cannot register a test session without '
                          'logging in first.')
            return None

        response = requests.post(
            url=f'{self.config["url"]}/acvp/v1/testSessions',
            json=[
                {'acvVersion': '1.0'},
                {
                    'isSample': False,
                    'algorithms': self.config['algorithms']
                }
            ],
            cert=self.cert,
            headers={'Authorization': f'Bearer {self.login_data["jwt"]}'}
        )

        if not response.ok:
            logging.error('Unable to register.')
            _log_error_response(response)
            return None

        self.session_data = response.json()[1]
        # Renamed 'accessToken' to 'jwt' in the json object
        # to stay consistent with libacvp
        self.session_data['jwt'] = self.session_data.pop('accessToken')
        write_data = [self.session_data]

        for url in self.session_data['vectorSetUrls']:
            vector_set = self.__fetch_vector_set(url)
            if vector_set is None:
                # The failure was already logged by the fetch. Do not hand
                # back a request list containing a null vector set.
                return None
            write_data.append(vector_set)

        return write_data

    def fetch_verdict(self, vector_results: List[Any]) -> List[Any]:
        """Fetch verdict for a list of vector sets.

        Keeps polling each vector set for as long as the server answers
        with a 'retry' field, sleeping for the number of seconds it gives.

        @param vector_results: List of [version, vector set] pairs, where
                               the vector set dictionaries contain answers.
        @return: A list containing the session information and verdicts,
                 or an empty list if a verdict could not be downloaded.
        """
        session_url = self.session_data['url']
        write_data = [self.session_data]
        for _, vector_set in vector_results:
            vector_set_id = vector_set['vsId']
            logging.info(f'Downloading verdict for vector set {vector_set_id}')
            while True:
                result = requests.get(
                    f'{self.config["url"]}{session_url}'
                    f'/vectorSets/{vector_set_id}/results',
                    cert=self.cert,
                    headers={
                        'Authorization': f'Bearer {self.session_data["jwt"]}'
                    }
                )
                if not result.ok:
                    logging.error(f'Failed to fetch verdict for vector set '
                                  f'{vector_set_id}')
                    _log_error_response(result)
                    # An empty list is falsy, so callers testing the
                    # result can detect the failure.
                    return []

                version, result_json = result.json()
                if 'retry' in result_json:
                    duration = result_json['retry']
                    logging.info(f'Vector set verdict not ready, waiting '
                                 f'{duration} seconds...')
                    time.sleep(duration)
                    continue

                write_data.append([version, result_json])
                break
        return write_data

    def upload(self, vector_sets: List[Any]) -> bool:
        """Upload the given vector sets.

        Every vector set is attempted even if an earlier upload failed.

        @param vector_sets: List of [version, vector set] pairs, where the
                            vector set dictionaries contain answers.
        @return: True if uploading succeeded for all vector sets.
        """
        has_error = False
        session_url = self.session_data['url']

        for version, vector_set in vector_sets:
            response = requests.post(
                f'{self.config["url"]}{session_url}/vectorSets/'
                f'{vector_set["vsId"]}/results',
                json=[version, vector_set],
                cert=self.cert,
                headers={'Authorization': f'Bearer {self.session_data["jwt"]}'}
            )

            if not response.ok:
                has_error = True
                logging.error(f'Could not upload vector set response for '
                              f'vector set ID {vector_set["vsId"]}.')
                _log_error_response(response)

        return not has_error


def main(request_path: Optional[str],
         response_path: Optional[str],
         verdict_path: Optional[str],
         do_upload: bool,
         config_path: str):
    """Run the tool: log in, then handle a request or a response file.

    Exactly one of request_path and response_path must be given. Any
    failure is logged and ends the process with exit status 1.

    @param request_path: Where to write a newly registered test session
                         and its vector sets.
    @param response_path: Response file holding the session information
                          followed by the answered vector sets.
    @param verdict_path: Where to write the downloaded verdicts
                         (only used together with response_path).
    @param do_upload: Upload the response file's vector sets
                      (only used together with response_path).
    @param config_path: Path of the configuration file.
    """
    if request_path and response_path:
        logging.error('You cannot use both a request and a response file.')
        sys.exit(1)

    if not any([request_path, response_path]):
        logging.error('You must specify either a request or a response file.')
        sys.exit(1)

    proxy = ACVPProxy(
        cert_path=os.getenv('ACVP_CERT_FILE'),
        key_path=os.getenv('ACVP_KEY_FILE'),
        totp_path=os.getenv('ACVP_SEED_FILE'),
        config_path=config_path,
    )

    logging.info('Attempting to log in...')
    if not proxy.login():
        logging.error('Could not log in.')
        sys.exit(1)
    logging.info('Successfully logged in.')

    if request_path:
        if do_upload or verdict_path:
            logging.info('Ignoring --upload/--verdict: they require a '
                         'response file (--response).')

        logging.info('Creating a new test session and downloading vectors...')
        test_session = proxy.register()
        if not test_session:
            logging.error('Could not create a new test session.')
            sys.exit(1)

        with open(request_path, 'w') as f:
            json.dump(test_session, f, indent=4)
    elif response_path:
        logging.info('Using response file...')
        with open(response_path, 'r') as upload_file:
            upload_json: List[Any] = json.load(upload_file)
        # First entry is the session information; the rest are the
        # answered vector sets.
        proxy.session_data = upload_json[0]

        # Upload and verdict both need the response file contents, so
        # they only run on this branch.
        if do_upload:
            logging.info('Uploading response file...')
            if not proxy.upload(upload_json[1:]):
                logging.error('Could not successfully upload results file.')
                sys.exit(1)

        if verdict_path:
            logging.info('Fetching verdict...')
            verdict = proxy.fetch_verdict(upload_json[1:])
            if not verdict:
                logging.error('Could not successfully fetch verdict file.')
                sys.exit(1)
            with open(verdict_path, 'w') as f:
                json.dump(verdict, f, indent=4)

    logging.info('Done')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='ACVP tool to fetch tests '
                                                 'and upload results.')
    parser.add_argument('--request', '-r',
                        help='Path to download a request file. '
                             'If specified, the tool start a new test session '
                             'and download the test information into the '
                             'given path.')
    parser.add_argument('--response', '-s',
                        help='Path of the response file. '
                             'If specified, the tool will use this file '
                             'to determine session information. '
                             'This argument must be used when uploading '
                             'results or downloading verdicts.')
    parser.add_argument('--verdict', '-v',
                        help='Download the verdict to the specified path. '
                             'If this flag is set, the tool will download the '
                             'verdict for the given response file '
                             '(--response).')
    parser.add_argument('--upload', '-u',
                        help='Upload the given response file to the API. '
                             'If this flag is set, the tool will upload the '
                             'given response file (--response).',
                        action='store_true')
    parser.add_argument('--config', '-c',
                        help='Path of the configuration file. '
                             '(Default: acvp_config.json)',
                        default='acvp_config.json')

    args = parser.parse_args()

    main(
        request_path=args.request,
        response_path=args.response,
        verdict_path=args.verdict,
        do_upload=args.upload,
        config_path=args.config,
    )
next prev parent reply other threads:[~2022-01-24 19:09 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-01-24 19:09 [PATCH 0/4] Add ACVP API tool Brandon Lo
2022-01-24 19:09 ` Brandon Lo [this message]
2022-01-24 20:24 ` [PATCH 1/4] tools: add acvp_tool Thomas Monjalon
2022-01-25 19:07 ` Brandon Lo
2022-01-25 20:33 ` Thomas Monjalon
2022-01-24 19:09 ` [PATCH 2/4] tools: add default config file for acvp_tool Brandon Lo
2022-01-24 19:09 ` [PATCH 3/4] tools: add requirements " Brandon Lo
2022-01-24 19:09 ` [PATCH 4/4] doc: add readme " Brandon Lo
Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220124190923.745577-2-blo@iol.unh.edu \
    --to=blo@iol.unh.edu \
    --cc=ci@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
DPDK CI discussions

This inbox may be cloned and mirrored by anyone:

    git clone --mirror http://inbox.dpdk.org/ci/0 ci/git/0.git

    # If you have public-inbox 1.1+ installed, you may
    # initialize and index your mirror using the following commands:
    public-inbox-init -V2 ci ci/ http://inbox.dpdk.org/ci \
        ci@dpdk.org
    public-inbox-index ci

Example config snippet for mirrors.
Newsgroup available over NNTP:
    nntp://inbox.dpdk.org/inbox.dpdk.ci

AGPL code for this site: git clone https://public-inbox.org/public-inbox.git