diff options
Diffstat (limited to 'fg21sim/webui/handlers')
| -rw-r--r-- | fg21sim/webui/handlers/console.py | 343 | 
1 files changed, 145 insertions, 198 deletions
diff --git a/fg21sim/webui/handlers/console.py b/fg21sim/webui/handlers/console.py index fe1b500..793198c 100644 --- a/fg21sim/webui/handlers/console.py +++ b/fg21sim/webui/handlers/console.py @@ -5,216 +5,158 @@  Handle the "console" type of messages from the client.  """ -import json  import logging -from concurrent.futures import ThreadPoolExecutor  import tornado.ioloop  import tornado.gen +from tornado.escape import json_decode, json_encode +from .base import BaseRequestHandler  from .log import WebSocketLogHandler  logger = logging.getLogger(__name__) -class ConsoleHandler: +class ConsoleAJAXHandler(BaseRequestHandler):      """ -    Handle the "console" type of messages from the client. - -    XXX/TODO: -    * How to kill the submitted task? (force kill thread?) - -    Parameters -    ---------- -    websocket : `~tornado.websocket.WebSocketHandler` -        An `~tornado.websocket.WebSocketHandler` instance. -        The ``WebSocketLogHandler`` requires this to push logging messages -        to the client. -    max_workers : int, optional -        Maximum number of workers/threads for the execution pool -    onetask_only : bool, optional -        Whether to allow only one task running at the same time? - -    Attributes -    ---------- -    websocket : `~tornado.websocket.WebSocketHandler` -        An `~tornado.websocket.WebSocketHandler` instance, which is used by -        the ``WebSocketLogHandler`` to push logging messages to the client. -    wsloghandler : `~WebSocketLogHandler` -        Push logging messages to the client through WebSocket. -    executor : `~concurrent.futures.ThreadPoolExecutor` -        Where to submit the synchronous task and make it perform -        asynchronously using threads. -    io_loop : `~tornado.ioloop.IOLoop` -        Used to communicate with the main thread (e.g., callback) from the -        submitted task, which is executed on a different thread. -    onetask_only : bool -        Whether to allow only one task running at the same time? -    status : dict -        Whether the task is running and/or finished? -        There may be 4 possible status: -        1. running=False, finished=False: not started -        2. running=False, finished=True:  finished -        3. running=True,  finished=False: running -        4. running=True,  finished=True:  ?? error ?? +    Handle the AJAX requests from the client to control the tasks.      """ -    def __init__(self, websocket, max_workers=3, onetask_only=False): -        self.websocket = websocket -        self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") -        self.executor = ThreadPoolExecutor(max_workers=1) +    # Allow only one task running at the same time +    onetask_only = True + +    def initialize(self): +        """Hook for subclass initialization.  Called for each request.""" +        self.configs = self.application.configmanager +        self.status = self.application.task_status          # NOTE:          # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we -        # will need to communicate with the main thread from another thread. +        # will need to communicate with the main thread (e.g., callback) +        # from another thread, which executes the submitted task.          self.io_loop = tornado.ioloop.IOLoop.instance() -        self.onetask_only = onetask_only -        self.status = {"running": False, "finished": False} -    def handle_message(self, msg): -        try: -            msg_type = msg["type"] -            msg_action = msg["action"] -            response = {"type": msg_type, "action": msg_action} -            logger.info("WebSocket: handle message: " + -                        "type: {0}, action: {1}".format(msg_type, msg_action)) -            if msg_action == "start": -                # FIXME/XXX: This task should be asynchronous! -                success, error = self._start() -                response["success"] = success -                if not success: -                    response["error"] = error -            elif msg_action == "start_test": -                # FIXME/XXX: This task should be asynchronous! -                success, error = self._start(msg["time"]) -                response["success"] = success -                if not success: -                    response["error"] = error -            elif msg_action == "get_status": -                response["success"] = True -                response["action"] = "push" -                response["status"] = self.status -            else: -                logger.warning("WebSocket: " + -                               "unknown action: {0}".format(msg_action)) -                response["success"] = False -                response["error"] = "unknown action: {0}".format(msg_action) -        except KeyError: -            # Received message has wrong syntax/format -            response = {"success": False, -                        "type": msg_type, -                        "error": "no action specified"} +    @tornado.web.authenticated +    def get(self): +        """ +        Handle the READ-ONLY tasks operations. + +        Supported actions: +        - get: Get the current status of tasks. +        """ +        action = self.get_argument("action", "get") +        if action == "get": +            response = {"status": self.status} +            success = True +        else: +            # ERROR: bad action +            success = False +            reason = "Bad request action: {0}".format(action)          # -        logger.debug("WebSocket: response: {0}".format(response)) -        return response +        if success: +            logger.debug("Response: {0}".format(response)) +            self.set_header("Content-Type", "application/json; charset=UTF-8") +            self.write(json_encode(response)) +        else: +            logger.warning("Request failed: {0}".format(reason)) +            self.send_error(400, reason=reason) -    # FIXME/XXX: -    # * How to call this task asynchronously ?? -    def _start_test(self, *args, **kwargs): +    @tornado.web.authenticated +    def post(self):          """ -        Start the task by submitting it to the executor +        Handle the READ-WRITE task operations. -        Returns -        ------- -        success : bool -            Whether success without any errors -        error : str -            Detail of the error if not succeed +        XXX/TODO: +        * How to kill the submitted task? (force kill thread?) +        Supported actions: +        - start: Start the default or specified task. +        - stop: Stop the running task (TODO/XXX)          """ -        if self.onetask_only and self.status["running"]: -            logger.warning("Task already running, and only one task allowed") +        request = json_decode(self.request.body) +        logger.debug("Received request: {0}".format(request)) +        action = request.get("action") +        if action == "start": +            task = request.get("task") +            kwargs = request.get("kwargs", {}) +            success, reason = self._start_task(task=task, kwargs=kwargs) +        elif action == "stop":              success = False -            error = "already running and only one task allowed" +            reason = "NOT implemented action: {0}".format(action)          else: -            logger.info("Start the task on the executor ...") -            self.status["running"] = True -            self.status["finished"] = False -            # Also push the logging messages to the client -            self._add_wsloghandler() -            future = self.executor.submit(self._task_test, *args, **kwargs) -            self.io_loop.add_future(future, self._task_callback) -            success, error = future.result() -        return (success, error) +            # ERROR: bad action +            success = False +            reason = "Bad request action: {0}".format(action) +        # +        if success: +            response = {"status": self.status} +            logger.debug("Response: {0}".format(response)) +            self.set_header("Content-Type", "application/json; charset=UTF-8") +            self.write(json_encode(response)) +        else: +            logger.warning("Request failed: {0}".format(reason)) +            self.send_error(400, reason=reason)      # FIXME/XXX:      # * How to call this task asynchronously ?? -    def _start(self, *args, **kwargs): +    def _start_task(self, task=None, kwargs={}):          """ -        Start the task by submitting it to the executor +        Start the task by submitting it to the executor. + +        Parameters +        ---------- +        task : str, optional +            The name of the task to be started. +            If not specified, then start the default task. +            NOTE: +            Currently only support the default (``None``) and ``test`` tasks. +        kwargs : dict, optional +            Keyword arguments to be passed to the task.          Returns          -------          success : bool -            Whether success without any errors +            Whether the task successfully finished?          error : str -            Detail of the error if not succeed - +            Error message if the task failed          """ +        TASKS = { +            None: self._task_default,  # default task +            "test": self._task_test, +        } +        try: +            f_task = TASKS[task] +        except KeyError: +            success = False +            error = "Unknown task: {0}".format(task) +            logger.warning(error) +            return (success, error) +        #          if self.onetask_only and self.status["running"]: -            logger.warning("Task already running, and only one task allowed")              success = False -            error = "already running and only one task allowed" +            error = "Task already running and only one task allowed" +            logger.warning(error)          else: -            logger.info("Start the task on the executor ...") +            logger.info("Submit the task to the executor ...")              self.status["running"] = True              self.status["finished"] = False -            # Also push the logging messages to the client +            # Also push the logging messages to clients through WebSocket              self._add_wsloghandler() -            future = self.executor.submit(self._task, *args, **kwargs) +            future = self.application.executor.submit(f_task, **kwargs)              self.io_loop.add_future(future, self._task_callback)              success, error = future.result()          return (success, error) -    def _response_future(self, future): -        """ -        Callback function which will be called when the caller finishes -        in order to response the results to the client. -        """ -        response = {"type": "console", "action": "future"} -        success, error = future.result() -        response["success"] = success -        if not success: -            response["error"] = error -        logger.debug("WebSocket: future response: {0}".format(response)) -        msg_response = json.dumps(response) -        self.websocket.write_message(msg_response) - -    def _add_wsloghandler(self): -        """Add the ``self.wsloghandler`` to the logging handlers""" -        root_logger = logging.getLogger() -        root_logger.addHandler(self.wsloghandler) -        logger.info("Added the WebSocket logging handler") - -    def _remove_wsloghandler(self): -        """Remove the ``self.wsloghandler`` from the logging handlers""" -        root_logger = logging.getLogger() -        root_logger.removeHandler(self.wsloghandler) -        logger.info("Removed the WebSocket logging handler") - -    def _task_callback(self, future): -        """Callback function executed when the task finishes""" -        logger.info("Console task finished! Callback ...") -        self.status["running"] = False -        self.status["finished"] = True -        self._remove_wsloghandler() -        # -        response = {"type": "console", "action": "push"} -        response["success"] = True -        response["status"] = self.status -        logger.debug("WebSocket: future response: {0}".format(response)) -        msg_response = json.dumps(response) -        self.websocket.write_message(msg_response) - -    def _task_test(self, *args, **kwargs): +    def _task_default(self, **kwargs):          """ -        The task this console to manage. +        The default task that this console manages, which performs +        the foregrounds simulations.          Returns          -------          success : bool -            Whether success without any errors +            Whether the task successfully finished?          error : str -            Detail of the error if not succeed +            Error message if the task failed          NOTE          ---- @@ -223,53 +165,58 @@ class ConsoleHandler:          threads (or processes) are required to make it non-blocking          (i.e., asynchronous). -        Credit: https://stackoverflow.com/a/32164711/4856091 -        """ -        import time -        logger.info("console task: START") -        for i in range(args[0]): -            logger.info("console task: slept {0} seconds ...".format(i)) -            time.sleep(1) -        logger.info("console task: DONE!") -        return (True, None) - -    def _task(self, *args, **kwargs): -        """ -        The task this console to manage. -        Perform the foregrounds simulations. - -        Returns -        ------- -        success : bool -            Whether success without any errors -        error : str -            Detail of the error if not succeed - -        NOTE -        ---- -        The task is synchronous and may be computationally intensive -        (i.e., CPU-bound rather than IO/event-bound), therefore, -        threads (or processes) are required to make it non-blocking -        (i.e., asynchronous). - -        Credit: https://stackoverflow.com/a/32164711/4856091 +        References: +        [1] https://stackoverflow.com/a/32164711/4856091          """ +        logger.info("Console DEFAULT task: START ...")          logger.info("Preparing to start foregrounds simulations ...")          logger.info("Importing modules + Numba JIT, waiting ...") -          from ..foregrounds import Foregrounds - -        # FIXME: This is a hack -        configs = self.websocket.configs +        #          logger.info("Checking the configurations ...") -        configs.check_all() - -        fg = Foregrounds(configs) +        self.configs.check_all() +        fg = Foregrounds(self.configs)          fg.preprocess()          fg.simulate()          fg.postprocess() -          logger.info("Foregrounds simulations DONE!") +        logger.info("Console DEFAULT task: DONE!") +        # NOTE: always return a tuple of (success, error) +        return (True, None) -        # NOTE: Should always return a tuple of (success, error) +    def _task_test(self, **kwargs): +        """ +        Test task ... +        """ +        import time +        logger.info("Console TEST task: START ...") +        for i in range(kwargs["time"]): +            logger.info("Console TEST task: slept {0} seconds ...".format(i)) +            time.sleep(1) +        logger.info("Console TEST task: DONE!")          return (True, None) + +    def _task_callback(self, future): +        """Callback function executed when the task finishes""" +        logger.info("Task finished! Callback ...") +        self.status["running"] = False +        self.status["finished"] = True +        self._remove_wsloghandler() +        logger.info("Callback DONE!") + +    def _add_wsloghandler(self): +        """ +        Add a WebSocket handler to the logging handlers, which will capture +        all the logging messages and push them to the client. +        """ +        self.wsloghandler = WebSocketLogHandler(self.application.websockets, +                                                msg_type="console") +        root_logger = logging.getLogger() +        root_logger.addHandler(self.wsloghandler) +        logger.info("Added the WebSocket logging handler") + +    def _remove_wsloghandler(self): +        """Remove the WebSocket logging handler""" +        root_logger = logging.getLogger() +        root_logger.removeHandler(self.wsloghandler) +        logger.info("Removed the WebSocket logging handler")  | 
