# -*- encoding: utf-8 -*-
import argparse
import itertools
import json
import os
from pathlib import Path
import prettytable as pt
from hypernets import __version__ as current_version
from hypernets.hyperctl import api
from hypernets.hyperctl import consts
from hypernets.hyperctl.appliation import BatchApplication
from hypernets.hyperctl.batch import Batch
from hypernets.hyperctl.utils import load_yaml, load_json, copy_item
from hypernets.utils import logging as hyn_logging, common as common_util

logger = hyn_logging.getLogger(__name__)

def get_default_batches_data_dir():
    """Return the batches data dir: the value of the ``consts.KEY_ENV_BATCHES_DATA_DIR``
    environment variable if set, otherwise ``~/hyperctl-batches-data-dir``.
    """
    bdd_env = os.environ.get(consts.KEY_ENV_BATCHES_DATA_DIR)
    if bdd_env is None:
        bdd_default = Path("~/hyperctl-batches-data-dir").expanduser().as_posix()
        return bdd_default
    else:
        return bdd_env

def run_batch_config(config_dict, batches_data_dir):
    # generate a batch name if absent and write it back into the config
    batch_name = config_dict.get('name')
    if batch_name is None:
        batch_name = common_util.generate_short_id()
        logger.debug(f"generated batch name {batch_name}")
        config_dict['name'] = batch_name

    # generate job names if absent
    jobs_dict = config_dict['jobs']
    for job_dict in jobs_dict:
        if job_dict.get('name') is None:
            job_name = common_util.generate_short_id()
            logger.debug(f"generated job name {job_name}")
            job_dict['name'] = job_name

    app = BatchApplication.load(config_dict, batches_data_dir)
    app.start()
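

# A minimal config accepted by run_batch_config, for illustration only: the
# names and params below are hypothetical, and a real config may carry more
# top-level keys (e.g. 'server', 'scheduler', 'backend'; see run_generate_job_specs).
_EXAMPLE_BATCH_CONFIG = {
    "name": "local-batch-example",  # optional, auto-generated when missing
    "jobs": [
        {"name": "job1", "params": {"learning_rate": 0.1}},
        {"params": {"learning_rate": 0.2}},  # job name will be auto-generated
    ],
}
# Usage sketch (commented out so importing this module stays side-effect free):
# run_batch_config(_EXAMPLE_BATCH_CONFIG, get_default_batches_data_dir())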

def run_generate_job_specs(template, output):
    yaml_file = template
    output_path = Path(output)

    # 1. validation
    # 1.1. check the output file does not already exist
    if output_path.exists():
        raise FileExistsError(output)

    # 1.2. load the template file
    config_dict = load_yaml(yaml_file)

    # 1.3. check every param value is a list
    assert "params" in config_dict
    params = config_dict['params']
    for k, v in params.items():
        if not isinstance(v, list):
            raise ValueError(f"Value of param '{k}' should be a list")

    # 1.4. check the command exists; 'data_dir' and 'working_dir' are optional
    assert "command" in config_dict

    # 2. combine params to generate jobs: one job per element
    # of the cartesian product of all param value lists
    job_param_names = params.keys()
    param_values = [params[_] for _ in job_param_names]

    def make_job_dict(job_param_values):
        job_params_dict = dict(zip(job_param_names, job_param_values))
        job_dict = {
            "name": common_util.generate_short_id(),
            "params": job_params_dict
        }
        copy_item(config_dict, job_dict, 'resource')
        copy_item(config_dict, job_dict, 'data_dir')
        copy_item(config_dict, job_dict, 'working_dir')
        return job_dict

    jobs = [make_job_dict(_) for _ in itertools.product(*param_values)]

    # 3. merge into a batch spec
    batch_spec = {
        "jobs": jobs,
        'name': config_dict.get('name', common_util.generate_short_id()),
        "version": config_dict.get('version', current_version)
    }
    copy_item(config_dict, batch_spec, 'server')
    copy_item(config_dict, batch_spec, 'scheduler')
    copy_item(config_dict, batch_spec, 'backend')

    # 4. write to file
    os.makedirs(output_path.parent, exist_ok=True)
    with open(output_path, 'w', newline='\n') as f:
        f.write(json.dumps(batch_spec, indent=4))

    return batch_spec
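

# An illustrative template for run_generate_job_specs, shown here as the dict
# that load_yaml would produce; the command and param names are hypothetical.
_EXAMPLE_TEMPLATE = {
    "command": "python train.py",   # required, asserted above
    "params": {                     # required, every value must be a list
        "learning_rate": [0.1, 0.2],
        "max_depth": [3],
    },
}
# itertools.product expands the param lists into 2 x 1 = 2 jobs, with params
# {'learning_rate': 0.1, 'max_depth': 3} and {'learning_rate': 0.2, 'max_depth': 3},
# each named with a generated short id.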

def _load_batch_data_dir(batches_data_dir: str, batch_name) -> BatchApplication:
    batch_data_dir_path = Path(batches_data_dir) / batch_name
    spec_file_path = batch_data_dir_path / Batch.FILE_CONFIG
    if not spec_file_path.exists():
        raise RuntimeError(f"batch {batch_name} does not exist")
    batch_spec_dict = load_json(spec_file_path)
    return BatchApplication.load(batch_spec_dict, batch_data_dir_path)
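
# For reference, the on-disk layout assumed by _load_batch_data_dir (names illustrative):
#   <batches-data-dir>/
#       <batch-name>/
#           <Batch.FILE_CONFIG>   # the persisted batch spec, loaded with load_json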

def run_show_jobs(batch_name, batches_data_dir):
    batch_app = _load_batch_data_dir(batches_data_dir, batch_name)
    batch = batch_app.batch
    if batch.STATUS_RUNNING != batch.status():
        raise RuntimeError(f"batch {batch_name} is not running")
    api_server_portal = batch_app.web_app.portal
    jobs_dict = api.list_jobs(api_server_portal)

    headers = ['name', 'status']
    tb = pt.PrettyTable(headers)
    for job_dict in jobs_dict:
        row = [job_dict.get(header) for header in headers]
        tb.add_row(row)
    print(tb)
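
# Sample output of run_show_jobs (the status values depend on the scheduler
# and are illustrative only):
#   +------+----------+
#   | name |  status  |
#   +------+----------+
#   | job1 | running  |
#   | job2 | finished |
#   +------+----------+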

def run_kill_job(batch_name, job_name, batches_data_dir):
    batch_app = _load_batch_data_dir(batches_data_dir, batch_name)
    batch = batch_app.batch
    if batch.STATUS_RUNNING != batch.status():
        raise RuntimeError(f"batch {batch_name} is not running")
    api_server_portal = batch_app.web_app.portal
    jobs_dict = api.kill_job(api_server_portal, job_name)
    print(json.dumps(jobs_dict))

def show_job(batch_name, job_name, batches_data_dir):
    batch_app = _load_batch_data_dir(batches_data_dir, batch_name)
    batch = batch_app.batch
    if batch.STATUS_RUNNING != batch.status():
        raise RuntimeError(f"batch {batch_name} is not running")
    api_server_portal = batch_app.web_app.portal
    job_dict = api.get_job(job_name, api_server_portal)
    job_desc = json.dumps(job_dict, indent=4)
    print(job_desc)

def run_batch_config_file(config, batches_data_dir):
    config_dict = load_json(config)
    run_batch_config(config_dict, batches_data_dir)

def run_show_batches(batches_data_dir):
    batches_data_dir_path = Path(batches_data_dir)
    if not batches_data_dir_path.exists():
        print("null")
        return

    batches_summary = []
    for filename in os.listdir(batches_data_dir):
        if (batches_data_dir_path / filename).is_dir():
            batch_app = _load_batch_data_dir(batches_data_dir, filename)
            batch_summary = batch_app.summary_batch()
            batches_summary.append(batch_summary)

    if len(batches_summary) == 0:
        print("empty")
        return

    # render the summaries as a table
    headers = batches_summary[0].keys()
    tb = pt.PrettyTable(headers)
    for batch_summary_ in batches_summary:
        row = [batch_summary_.get(header) for header in headers]
        tb.add_row(row)
    print(tb)

def main():
    """
    Examples:
        cd hypernets/tests/hyperctl/
        hyperctl run --config ./local_batch.json
        hyperctl batch list
        hyperctl job list --batch-name=local-batch-example
        hyperctl job describe --job-name=job1 --batch-name=local-batch-example
        hyperctl job kill --job-name=job1 --batch-name=local-batch-example
        hyperctl job kill --job-name=job2 --batch-name=local-batch-example
        hyperctl batch list
    """
    bdd_help = f"batches data dir, by default read from environment variable {consts.KEY_ENV_BATCHES_DATA_DIR}"
    default_batches_data_dir = get_default_batches_data_dir()

    def setup_global_args(global_parser):
        # console output
        logging_group = global_parser.add_argument_group('Console outputs')
        logging_group.add_argument('--log-level', type=str, default='INFO',
                                   help='logging level, default is %(default)s')
        logging_group.add_argument('-error', dest='log_level', action='store_const', const='ERROR',
                                   help='alias of "--log-level=ERROR"')
        logging_group.add_argument('-warn', dest='log_level', action='store_const', const='WARN',
                                   help='alias of "--log-level=WARN"')
        logging_group.add_argument('-info', dest='log_level', action='store_const', const='INFO',
                                   help='alias of "--log-level=INFO"')
        logging_group.add_argument('-debug', dest='log_level', action='store_const', const='DEBUG',
                                   help='alias of "--log-level=DEBUG"')

    def setup_batch_parser(operation_parser):
        exec_parser = operation_parser.add_parser("batch", help="batch operations")
        batch_subparsers = exec_parser.add_subparsers(dest="batch_operation")
        batch_list_parse = batch_subparsers.add_parser("list", help="list batches")
        batch_list_parse.add_argument("--batches-data-dir", help=bdd_help,
                                      default=default_batches_data_dir, required=False)

    def setup_job_parser(operation_parser):
        exec_parser = operation_parser.add_parser("job", help="job operations")
        batch_subparsers = exec_parser.add_subparsers(dest="job_operation")
        job_list_parse = batch_subparsers.add_parser("list", help="list jobs")
        job_list_parse.add_argument("-b", "--batch-name", help="batch name", default=None, required=True)
        job_list_parse.add_argument("--batches-data-dir", help=bdd_help,
                                    default=default_batches_data_dir, required=False)

        def add_job_spec_args(parser_):
            parser_.add_argument("-b", "--batch-name", help="batch name", default=None, required=True)
            parser_.add_argument("-j", "--job-name", help="job name", default=None, required=True)
            parser_.add_argument("--batches-data-dir", help=bdd_help, default=default_batches_data_dir,
                                 required=False)

        job_kill_parse = batch_subparsers.add_parser("kill", help="kill a job")
        add_job_spec_args(job_kill_parse)

        job_describe_parse = batch_subparsers.add_parser("describe", help="describe a job")
        add_job_spec_args(job_describe_parse)

    def setup_run_parser(operation_parser):
        exec_parser = operation_parser.add_parser("run", help="run jobs")
        exec_parser.add_argument("-c", "--config", help="jobs json file to run", default=None, required=True)
        exec_parser.add_argument("--batches-data-dir", help=bdd_help, default=default_batches_data_dir,
                                 required=False)

    def setup_generate_parser(operation_parser):
        exec_parser = operation_parser.add_parser("generate", help="generate a jobs json file from a template")
        exec_parser.add_argument("-t", "--template", help="template yaml file", default=None, required=True)
        exec_parser.add_argument("-o", "--output", help="output json file", default="batch.json", required=False)

    parser = argparse.ArgumentParser(prog="hyperctl",
                                     description='hyperctl command is used to manage jobs', add_help=True)
    setup_global_args(parser)
    subparsers = parser.add_subparsers(dest="operation")
    setup_run_parser(subparsers)
    setup_generate_parser(subparsers)
    setup_batch_parser(subparsers)
    setup_job_parser(subparsers)

    args_namespace = parser.parse_args()
    kwargs = args_namespace.__dict__.copy()

    log_level = kwargs.pop('log_level')
    if log_level is None:
        log_level = hyn_logging.INFO
    hyn_logging.set_level(log_level)

    operation = kwargs.pop('operation')
    if operation == 'run':
        run_batch_config_file(**kwargs)
    elif operation == 'generate':
        run_generate_job_specs(**kwargs)
    elif operation == 'batch':
        batch_operation = kwargs.pop('batch_operation')
        if batch_operation == 'list':
            run_show_batches(**kwargs)
        else:
            raise ValueError(f"unknown batch operation: {batch_operation}")
    elif operation == 'job':
        job_operation = kwargs.pop('job_operation')
        if job_operation == 'list':
            run_show_jobs(**kwargs)
        elif job_operation == 'kill':
            run_kill_job(**kwargs)
        elif job_operation == 'describe':
            show_job(**kwargs)
        else:
            raise ValueError(f"unknown job operation: {job_operation}")
    else:
        parser.print_help()


if __name__ == '__main__':
    main()