Source code for hypernets.hyperctl.batch

# -*- encoding: utf-8 -*-
import json
import tempfile
import os
from collections import OrderedDict
from pathlib import Path
from typing import Dict, Optional

import psutil

from hypernets.hyperctl import consts
from hypernets.utils import logging
from hypernets.utils.common import generate_short_id

logger = logging.getLogger(__name__)


class _ShellJob:  # internal class

    STATUS_INIT = 'init'
    STATUS_RUNNING = 'running'
    STATUS_SUCCEED = 'succeed'
    STATUS_FAILED = 'failed'

    FINAL_STATUS = [STATUS_SUCCEED, STATUS_FAILED]

    def __init__(self, *, name, batch, params, working_dir=None, assets=None, resource=None):

        self.name = name
        self.batch = batch
        self.params = params
        self.resource = resource

        # write job files to tmp
        self.data_dir_path = Path(tempfile.gettempdir()) \
                             / f"{consts.JOB_DATA_DIR_PREFIX}{self.batch.name}_{self.name}_{generate_short_id()}"

        if working_dir is None:
            self.working_dir = self.data_dir_path.as_posix()
        else:
            self.working_dir = working_dir

        self.assets = [] if assets is None else assets

        self.start_datetime = None
        self.end_datetime = None

        self._status = _ShellJob.STATUS_INIT
        self._ext = {}

    @property
    def ext(self):
        return self._ext

    def set_ext(self, ext):
        self._ext = ext

    def state_data_file(self):  # on master node
        return (self.batch.data_dir_path / f"{self.name}.json").as_posix()

    def state_data(self):
        with open(self.state_data_file(), 'r') as f:
            return json.load(f)

    def set_status(self, status):
        self._status = status

    @property
    def status(self):
        return self._status

    @property
    def run_file(self):
        return (self.data_dir_path / "run.sh").as_posix()

    @property
    def resources_path(self):
        return self.data_dir_path / "resources"  # resources should be copied to working dir

    def to_dict(self):
        import copy
        config_dict = copy.copy(self.to_config())
        return config_dict

    def to_config(self):
        return {
            "name": self.name,
            "params": self.params,
            "resource": self.resource,
            "working_dir": self.working_dir,
            "assets": self.assets
        }

    @property
    def elapsed(self):
        if self.start_datetime is not None and self.end_datetime is not None:
            return self.end_datetime - self.start_datetime
        else:
            return None


class ServerConf:  # API server conf

    def __init__(self, host="localhost", port=8060, exit_on_finish=False):
        self.host = host
        self.port = port
        self.exit_on_finish = exit_on_finish

    @property
    def portal(self):
        return f"http://{self.host}:{self.port}"

    def to_config(self):
        return {
            "host": self.host,
            "port": self.port,
            "exit_on_finish": self.exit_on_finish
        }


class BackendConf:

    def __init__(self, type='local', conf: Dict = None):
        self.type = type
        if conf is None:
            conf = {}
        self.conf = conf

    def to_config(self):
        return {
            "type": self.type,
            "conf": self.conf
        }
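

# Illustrative sketch (not part of the library source): how ServerConf and
# BackendConf might be constructed and serialized into plain dicts for a
# batch configuration. The helper name `_example_confs` is hypothetical.
def _example_confs():
    server = ServerConf(host="localhost", port=8060, exit_on_finish=True)
    backend = BackendConf(type='local')
    # `portal` is derived from host and port
    assert server.portal == "http://localhost:8060"
    # both objects flatten to dicts suitable for JSON serialization
    return {"server": server.to_config(), "backend": backend.to_config()}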


class Batch:

    FILE_CONFIG = "config.json"
    FILE_PID = "server.pid"

    STATUS_NOT_START = "NOT_START"
    STATUS_RUNNING = "RUNNING"
    STATUS_FINISHED = "FINISHED"

    def __init__(self, *, name, job_command, data_dir: str):
        self.name = name
        self.job_command = job_command
        self.data_dir = data_dir

        self._jobs_dict = OrderedDict()

        self.start_time = None
        self.end_time = None

    @property
    def data_dir_path(self):
        return Path(self.data_dir)

    @property
    def jobs(self):
        return list(self._jobs_dict.values())

    def job_status_file_path(self, job_name, status):
        return (self.data_dir_path / f"{job_name}.{status}").as_posix()

    def job_state_data_file_path(self, job_name):
        return (self.data_dir_path / f"{job_name}.json").as_posix()

    def status_files(self):
        return self._status_files([_ShellJob.STATUS_FAILED, _ShellJob.STATUS_SUCCEED, _ShellJob.STATUS_RUNNING])

    def _status_files(self, statuses):
        return {status: f"{self.name}.{status}" for status in statuses}

    def get_persisted_job_status(self, job_name):
        # collect the status files that exist on disk for this job
        exists_statuses = []
        for status_value, status_file in self.status_files().items():
            abs_status_file = self.job_status_file_path(job_name, status_value)
            if os.path.exists(abs_status_file):
                exists_statuses.append((status_value, status_file))

        status_len = len(exists_statuses)
        if status_len > 1:
            files_msg = ",".join(map(lambda _: _[1], exists_statuses))
            raise ValueError(f"Invalid status, multiple status files exist: {files_msg}")
        elif status_len == 1:
            return exists_statuses[0][0]
        else:  # no status file yet
            return _ShellJob.STATUS_INIT

    def add_job(self, name, **kwargs):
        assert name not in self._jobs_dict, f'job {name} already exists'  # job names must be unique
        self._jobs_dict[name] = _ShellJob(name=name, batch=self, **kwargs)

    def status(self):
        pid = self.pid()
        if pid is None:
            return self.STATUS_NOT_START
        try:
            psutil.Process(pid)  # raises if the server process is gone
            return self.STATUS_RUNNING
        except Exception:
            return self.STATUS_FINISHED

    def is_finished(self):
        exists_status = set(job.status for job in self.jobs)
        return exists_status.issubset(set(_ShellJob.FINAL_STATUS))

    def config_file_path(self):
        return self.data_dir_path / self.FILE_CONFIG

    def pid_file_path(self):
        return self.data_dir_path / self.FILE_PID

    def pid(self):
        pid_file_path = self.pid_file_path()
        if pid_file_path.exists():
            with open(pid_file_path, 'r') as f:
                return int(f.read())
        else:
            return None

    def get_job_by_name(self, job_name) -> Optional[_ShellJob]:
        for job in self.jobs:
            if job.name == job_name:
                return job
        return None

    def summary(self):
        batch = self

        def _filter_jobs(status):
            return list(filter(lambda j: j.status == status, batch.jobs))

        def cnt(status):
            return len(_filter_jobs(status))

        return {
            "name": batch.name,
            'status': batch.status(),
            'total': len(batch.jobs),
            _ShellJob.STATUS_FAILED: cnt(_ShellJob.STATUS_FAILED),
            _ShellJob.STATUS_INIT: cnt(_ShellJob.STATUS_INIT),
            _ShellJob.STATUS_SUCCEED: cnt(_ShellJob.STATUS_SUCCEED),
            _ShellJob.STATUS_RUNNING: cnt(_ShellJob.STATUS_RUNNING),
        }

    @property
    def elapsed(self):
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        else:
            return None
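

# Illustrative sketch (not part of the library source): assembling a Batch,
# registering two jobs, and inspecting its summary. The function name
# `_example_batch`, the job command, data_dir and parameter names are hypothetical.
def _example_batch():
    batch = Batch(name="demo-batch",
                  job_command="python -m my_train",  # hypothetical command
                  data_dir="/tmp/hyperctl-demo")
    batch.add_job(name="job-1", params={"learning_rate": 0.1})
    batch.add_job(name="job-2", params={"learning_rate": 0.01})
    # jobs stay in the 'init' state until a status file is persisted in data_dir
    assert batch.get_job_by_name("job-1").status == _ShellJob.STATUS_INIT
    # summary() reports the batch process state plus per-status job counts
    return batch.summary()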