Source code for hypernets.hyperctl.server

# -*- encoding: utf-8 -*-
import json
import sys
from typing import Optional, Awaitable

from tornado.log import app_log
from tornado.web import RequestHandler, Finish, HTTPError, Application

from hypernets.hyperctl.batch import Batch
from hypernets.hyperctl.batch import _ShellJob
from hypernets.hyperctl.executor import RemoteSSHExecutorManager
from hypernets.hyperctl.scheduler import JobScheduler
from hypernets.hyperctl.utils import http_portal
from hypernets.utils import logging as hyn_logging
import copy

# Module-level logger obtained from the hypernets logging helper, keyed by this module's name.
logger = hyn_logging.getLogger(__name__)


class RestResult:
    """Envelope for a REST reply: a numeric result code plus a payload.

    Attributes are stored exactly as given:
        code: result code (see ``RestCode`` for the values used in this module).
        body: payload placed under the ``"data"`` key when serialized.
    """

    def __init__(self, code, body):
        self.code = code
        self.body = body

    def to_dict(self):
        """Return the envelope as ``{"code": <code>, "data": <body>}``."""
        return dict(code=self.code, data=self.body)

    def to_json(self):
        """Return the envelope from :meth:`to_dict` serialized as a JSON string."""
        envelope = self.to_dict()
        return json.dumps(envelope)
class RestCode:
    """Result codes carried in the ``"code"`` field of a REST reply."""

    # The request was handled normally.
    Success = 0

    # The request failed; the reply's data describes the error.
    # (This class attribute only shadows the builtin name inside the class body.)
    Exception = -1
class BaseHandler(RequestHandler):
    """Shared base for the hyperctl REST handlers.

    Provides helpers that wrap payloads in a ``RestResult`` envelope and
    reports request exceptions as a JSON error body.
    """

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        """Accept a streamed body chunk and do nothing with it."""

    def _handle_request_exception(self, e):
        """Turn an exception raised while handling a request into a reply.

        ``Finish`` is treated as a normal early exit. Any other exception is
        logged and, unless the request has already been finished, its string
        form is sent back through :meth:`send_error_content`.
        """
        if isinstance(e, Finish):
            # Not an error: just complete the request, without logging.
            if not self._finished:
                self.finish(*e.args)
            return

        try:
            self.log_exception(*sys.exc_info())
        except Exception:
            # A failing logger must not stop the best-effort error reply
            # below, otherwise the connection could be leaked.
            app_log.error("Error in exception logger", exc_info=True)

        if not self._finished:
            # HTTPError and every other exception type get the same reply.
            # Once the request is finished there is nothing left to send.
            self.send_error_content(str(e))

    def send_error_content(self, msg):
        """Finish the request with a JSON envelope carrying ``RestCode.Exception``.

        Args:
            msg: error description; converted with ``str`` before being sent.
        """
        failure = RestResult(RestCode.Exception, str(msg))
        self.set_header("Content-Type", "application/json")
        self.finish(failure.to_json())

    def response(self, result: dict = None, code=RestCode.Success):
        """Write ``result`` wrapped in a ``RestResult`` envelope.

        Args:
            result: payload placed under ``"data"``.
            code: result code placed under ``"code"``; defaults to success.
        """
        self.response_json(RestResult(code, result).to_dict())

    def response_json(self, response_dict):
        """Write ``response_dict`` as indented JSON with a JSON content type."""
        self.set_header("Content-Type", "application/json")
        payload = json.dumps(response_dict, indent=4)
        self.write(payload)

    def get_request_as_dict(self):
        """Return the request body parsed by ``json.loads``."""
        return json.loads(self.request.body)
class IndexHandler(BaseHandler):
    """Handler that answers GET requests with a plain welcome message."""

    def get(self, *args, **kwargs):
        """Finish the request with the welcome text and return ``finish``'s result."""
        greeting = "Welcome to hyperctl."
        return self.finish(greeting)
def to_job_detail(job, batch):
    """Build the dict describing a job: its config plus its current status.

    Args:
        job: object providing ``to_config()`` and a ``status`` attribute.
        batch: not used by this function.

    Returns:
        A shallow copy of ``job.to_config()`` with an added ``'status'`` key,
        so the object returned by ``to_config()`` itself is not modified.
    """
    detail = copy.copy(job.to_config())
    detail['status'] = job.status
    return detail
class JobHandler(BaseHandler):
    """Handler that returns the detail of a single job looked up by name."""

    def initialize(self, batch: Batch):
        """Store the batch whose jobs this handler serves."""
        self.batch = batch

    def get(self, job_name, **kwargs):
        """Reply with the job's detail, or an error envelope if it is unknown.

        Args:
            job_name: name passed to ``batch.get_job_by_name``.
        """
        found = self.batch.get_job_by_name(job_name)
        if found is None:
            self.response({"msg": "resource not found"}, RestCode.Exception)
            return None
        return self.response(to_job_detail(found, self.batch))
class JobOperationHandler(BaseHandler):
    """Handler that applies an operation (currently only ``kill``) to a job."""

    # The only operation name accepted by ``post``.
    OPT_KILL = 'kill'

    def initialize(self, batch: Batch, job_scheduler: JobScheduler):
        """Store the batch and the scheduler used to carry out operations."""
        self.batch = batch
        self.job_scheduler = job_scheduler

    def post(self, job_name, operation, **kwargs):
        """Run ``operation`` on the job named ``job_name``.

        Args:
            job_name: name handed to ``job_scheduler.kill_job``.
            operation: requested operation; must equal ``OPT_KILL``.

        Raises:
            ValueError: if ``operation`` is anything other than ``OPT_KILL``.
        """
        if operation != self.OPT_KILL:
            raise ValueError(f"unknown operation {operation} ")

        # kill job
        self.job_scheduler.kill_job(job_name)
        self.response({"msg": f"{job_name} killed"})
class JobListHandler(BaseHandler):
    """Handler that lists the detail of every job in the batch."""

    def initialize(self, batch: Batch):
        """Store the batch whose jobs are listed."""
        self.batch = batch

    def get(self, *args, **kwargs):
        """Reply with ``{"jobs": [...]}``, one detail dict per job in ``batch.jobs``."""
        details = [to_job_detail(item, self.batch) for item in self.batch.jobs]
        self.response({"jobs": details})
class HyperctlWebApplication(Application):
    """Tornado application that also remembers the host and port it is meant for.

    Args:
        host: host name stored on the instance; defaults to ``"localhost"``.
        port: port stored on the instance; defaults to ``8060``.
        **kwargs: forwarded unchanged to ``Application.__init__``.
    """

    def __init__(self, host="localhost", port=8060, **kwargs):
        # Record the address before delegating to the base initializer.
        self.host, self.port = host, port
        super().__init__(**kwargs)

    @property
    def portal(self):
        """Result of ``http_portal(host, port)`` for the stored address.

        NOTE(review): presumably the HTTP base URL of this server — confirm
        against ``hypernets.hyperctl.utils.http_portal``.
        """
        return http_portal(self.host, self.port)
def create_batch_manage_webapp(server_host, server_port, batch, job_scheduler) -> HyperctlWebApplication:
    """Create the web application exposing the hyperctl job-management routes.

    Args:
        server_host: host stored on the application.
        server_port: port stored on the application.
        batch: batch passed to the handlers.
        job_scheduler: scheduler passed to the job-operation handler.

    Returns:
        A ``HyperctlWebApplication`` configured with the handlers from
        ``create_hyperctl_handlers``.
    """
    return HyperctlWebApplication(
        host=server_host,
        port=server_port,
        handlers=create_hyperctl_handlers(batch, job_scheduler),
    )
def create_hyperctl_handlers(batch, job_scheduler):
    """Return the route table for the hyperctl REST API.

    Each entry is ``(pattern, handler_class[, init_kwargs])``. The entries are
    listed from the most specific job route to the least specific one.

    Args:
        batch: batch handed to the job handlers' ``initialize``.
        job_scheduler: scheduler handed to ``JobOperationHandler.initialize``.
    """
    operation_route = (
        r'/hyperctl/api/job/(?P<job_name>.+)/(?P<operation>.+)',
        JobOperationHandler,
        {"batch": batch, "job_scheduler": job_scheduler},
    )
    job_route = (
        r'/hyperctl/api/job/(?P<job_name>.+)',
        JobHandler,
        {"batch": batch},
    )
    job_list_route = (r'/hyperctl/api/job', JobListHandler, {"batch": batch})
    index_route = (r'/hyperctl', IndexHandler)

    return [operation_route, job_route, job_list_route, index_route]