diff options
| author | Aaron LI <aaronly.me@outlook.com> | 2016-11-09 20:22:49 +0800 | 
|---|---|---|
| committer | Aaron LI <aaronly.me@outlook.com> | 2016-11-09 20:22:49 +0800 | 
| commit | 0c2ab928af57178e6d7d3265709c3f1c3b654013 (patch) | |
| tree | ddf17af2dcdc6e52653aac4b9d8cb9e93efd1061 /fg21sim | |
| parent | 3113e2809249e1fa9a1c71f3e210d4d74ae47502 (diff) | |
| download | fg21sim-0c2ab928af57178e6d7d3265709c3f1c3b654013.tar.bz2 | |
webui: Add "ConsoleHandler" to handle the "console" type message
XXX/FIXME:
The console task will BLOCK the tornado, which should be FIXED!
However, the `WebSocket.on_message` currently may NOT be a coroutine
(as of Tornado v4.3), so another way should be taken to solve this
problem in order to call the console task asynchronously!
Diffstat (limited to 'fg21sim')
| -rw-r--r-- | fg21sim/webui/consolehandler.py | 199 | ||||
| -rw-r--r-- | fg21sim/webui/websocket.py | 32 | 
2 files changed, 219 insertions, 12 deletions
| diff --git a/fg21sim/webui/consolehandler.py b/fg21sim/webui/consolehandler.py new file mode 100644 index 0000000..0184a93 --- /dev/null +++ b/fg21sim/webui/consolehandler.py @@ -0,0 +1,199 @@ +# Copyright (c) 2016 Weitian LI <liweitianux@live.com> +# MIT license + +""" +Handle the "console" type of messages from the client. +""" + +import json +import logging +from concurrent.futures import ThreadPoolExecutor + +import tornado.ioloop +import tornado.gen + +from .loghandler import WebSocketLogHandler + + +logger = logging.getLogger(__name__) + + +class ConsoleHandler: +    """ +    Handle the "console" type of messages from the client. + +    XXX/TODO: +    * How to kill the submitted task? (force kill thread?) + +    Parameters +    ---------- +    websocket : `~tornado.websocket.WebSocketHandler` +        An `~tornado.websocket.WebSocketHandler` instance. +        The ``WebSocketLogHandler`` requires this to push logging messages +        to the client. +    max_workers : int, optional +        Maximum number of workers/threads for the execution pool +    onetask_only : bool, optional +        Whether to allow only one task running at the same time? + +    Attributes +    ---------- +    websocket : `~tornado.websocket.WebSocketHandler` +        An `~tornado.websocket.WebSocketHandler` instance, which is used by +        the ``WebSocketLogHandler`` to push logging messages to the client. +    wsloghandler : `~WebSocketLogHandler` +        Push logging messages to the client through WebSocket. +    executor : `~concurrent.futures.ThreadPoolExecutor` +        Where to submit the synchronous task and make it perform +        asynchronously using threads. +    io_loop : `~tornado.ioloop.IOLoop` +        Used to communicate with the main thread (e.g., callback) from the +        submitted task, which is executed on a different thread. +    onetask_only : bool +        Whether to allow only one task running at the same time? +    status : dict +        Whether the task is running and/or finished? +        There may be 4 possible status: +        1. running=False, finished=False: not started +        2. running=False, finished=True:  finished +        3. running=True,  finished=False: running +        4. running=True,  finished=True:  ?? error ?? +    """ +    def __init__(self, websocket, max_workers=3, onetask_only=False): +        self.websocket = websocket +        self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") +        self.executor = ThreadPoolExecutor(max_workers=1) +        # NOTE: +        # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we +        # will need to communicate with the main thread from another thread. +        self.io_loop = tornado.ioloop.IOLoop.instance() +        self.onetask_only = onetask_only +        self.status = {"running": False, "finished": False} + +    def handle_message(self, msg): +        try: +            msg_type = msg["type"] +            msg_action = msg["action"] +            response = {"type": msg_type, "action": msg_action} +            logger.info("WebSocket: handle message: " + +                        "type: {0}, action: {1}".format(msg_type, msg_action)) +            if msg_action == "start": +                # Start the task asynchronously +                future = self._start(msg["time"]) +                self.io_loop.add_future(future, self._response_future) +                response["success"] = True +                response["status"] = "future" +            elif msg_action == "get_status": +                response["success"] = True +                response["action"] = "push" +                response["status"] = self.status +            else: +                logger.warning("WebSocket: " + +                               "unknown action: {0}".format(msg_action)) +                response["success"] = False +                response["error"] = "unknown action: {0}".format(msg_action) +        except KeyError: +            # Received message has wrong syntax/format +            response = {"success": False, +                        "type": msg_type, +                        "error": "no action specified"} +        # +        logger.debug("WebSocket: response: {0}".format(response)) +        return response + +    # FIXME/XXX: +    # * How to call this task asynchronously ?? +    def _start(self, *args, **kwargs): +        """ +        Start the task by submitting it to the executor + +        Returns +        ------- +        success : bool +            Whether success without any errors +        error : str +            Detail of the error if not succeed + +        """ +        if self.onetask_only and self.status["running"]: +            logger.warning("Task already running, and only one task allowed") +            success = False +            error = "already running and only one task allowed" +        else: +            logger.info("Start the task on the executor ...") +            self.status["running"] = True +            self.status["finished"] = False +            # Also push the logging messages to the client +            self._add_wsloghandler() +            future = self.executor.submit(self._task, *args, **kwargs) +            self.io_loop.add_future(future, self._task_callback) +            success, error = future.result() +        return (success, error) + +    def _response_future(self, future): +        """ +        Callback function which will be called when the caller finishes +        in order to response the results to the client. +        """ +        response = {"type": "console", "action": "future"} +        success, error = future.result() +        response["success"] = success +        if not success: +            response["error"] = error +        logger.debug("WebSocket: future response: {0}".format(response)) +        msg_response = json.dumps(response) +        self.websocket.write_message(msg_response) + +    def _add_wsloghandler(self): +        """Add the ``self.wsloghandler`` to the logging handlers""" +        root_logger = logging.getLogger() +        root_logger.addHandler(self.wsloghandler) +        logger.info("Added the WebSocket logging handler") + +    def _remove_wsloghandler(self): +        """Remove the ``self.wsloghandler`` from the logging handlers""" +        root_logger = logging.getLogger() +        root_logger.removeHandler(self.wsloghandler) +        logger.info("Removed the WebSocket logging handler") + +    def _task_callback(self, future): +        """Callback function executed when the task finishes""" +        logger.info("Console task finished! Callback ...") +        self.status["running"] = False +        self.status["finished"] = True +        self._remove_wsloghandler() +        # +        response = {"type": "console", "action": "push"} +        response["success"] = True +        response["status"] = self.status +        logger.debug("WebSocket: future response: {0}".format(response)) +        msg_response = json.dumps(response) +        self.websocket.write_message(msg_response) + +    def _task(self, *args, **kwargs): +        """ +        The task this console to manage. + +        Returns +        ------- +        success : bool +            Whether success without any errors +        error : str +            Detail of the error if not succeed + +        NOTE +        ---- +        The task is synchronous and may be computationally intensive +        (i.e., CPU-bound rather than IO/event-bound), therefore, +        threads (or processes) are required to make it non-blocking +        (i.e., asynchronous). + +        Credit: https://stackoverflow.com/a/32164711/4856091 +        """ +        import time +        logger.info("console task: START") +        for i in range(args[0]): +            logger.info("console task: slept {0} seconds ...".format(i)) +            time.sleep(1) +        logger.info("console task: DONE!") +        return (True, None) diff --git a/fg21sim/webui/websocket.py b/fg21sim/webui/websocket.py index fffc19c..d205004 100644 --- a/fg21sim/webui/websocket.py +++ b/fg21sim/webui/websocket.py @@ -24,9 +24,10 @@ import logging  import tornado.websocket  from tornado.options import options +from .consolehandler import ConsoleHandler +from .utils import get_host_ip, ip_in_network  from ..configs import ConfigManager  from ..errors import ConfigError -from .utils import get_host_ip, ip_in_network  logger = logging.getLogger(__name__) @@ -58,7 +59,6 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):      """      name = "fg21sim"      from_localhost = None -    configs = ConfigManager()      def check_origin(self, origin):          """Check the origin of the WebSocket access. @@ -98,6 +98,12 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):      def open(self):          """Invoked when a new WebSocket is opened by the client.""" +        # FIXME: +        # * better to move to the `Application` class ?? +        # * or create a ``ConfigsHandler`` similar to the ``ConsoleHandler`` +        self.configs = ConfigManager() +        self.console_handler = ConsoleHandler(websocket=self) +        #          logger.info("WebSocket: {0}: opened".format(self.name))          logger.info("Allowed hosts: {0}".format(options.hosts_allowed)) @@ -111,6 +117,12 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):          logger.info("WebSocket: {0}: closed by client: {1}, {2}".format(              self.name, code, reason)) +    # FIXME/XXX: +    # * How to be non-blocking ?? +    # NOTE: WebSocket.on_message: may NOT be a coroutine at the moment (v4.3) +    # References: +    # [1] https://stackoverflow.com/a/35543856/4856091 +    # [2] https://stackoverflow.com/a/33724486/4856091      def on_message(self, message):          """Handle incoming messages and dispatch task according to the          message type. @@ -156,7 +168,9 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):                  response = self._handle_configs(msg)              elif msg_type == "console":                  # Control the simulation tasks, or request logging messages -                response = self._handle_console(msg) +                # FIXME/XXX: +                # * How to make this asynchronously ?? +                response = self.console_handler.handle_message(msg)              elif msg_type == "results":                  # Request the simulation results                  response = self._handle_results(msg) @@ -175,7 +189,9 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):          """Handle the message of type "configs", which request to get or          set some configurations by the client. -        TODO: improve the description ... +        TODO: +        * improve the description ... +        * split these handling functions into a separate class in a module          Parameters          ---------- @@ -407,14 +423,6 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):              error = str(e)          return (success, error) -    def _handle_console(self, msg): -        # Got a message of supported types -        msg_type = msg["type"] -        logger.info("WebSocket: {0}: ".format(self.name) + -                    "handle message of type: {0}".format(msg_type)) -        response = {"success": True, "type": msg_type} -        return response -      def _handle_results(self, msg):          # Got a message of supported types          msg_type = msg["type"] | 
