diff options
Diffstat (limited to 'fg21sim/webui/handlers')
| -rw-r--r-- | fg21sim/webui/handlers/__init__.py | 1 | ||||
| -rw-r--r-- | fg21sim/webui/handlers/console.py | 275 | ||||
| -rw-r--r-- | fg21sim/webui/handlers/log.py | 61 | ||||
| -rw-r--r-- | fg21sim/webui/handlers/websocket.py | 436 | 
4 files changed, 773 insertions, 0 deletions
diff --git a/fg21sim/webui/handlers/__init__.py b/fg21sim/webui/handlers/__init__.py index f97ef07..0c01ea7 100644 --- a/fg21sim/webui/handlers/__init__.py +++ b/fg21sim/webui/handlers/__init__.py @@ -3,3 +3,4 @@  from .index import IndexHandler  from .login import LoginHandler +from .websocket import FG21simWSHandler diff --git a/fg21sim/webui/handlers/console.py b/fg21sim/webui/handlers/console.py new file mode 100644 index 0000000..fe1b500 --- /dev/null +++ b/fg21sim/webui/handlers/console.py @@ -0,0 +1,275 @@ +# Copyright (c) 2016 Weitian LI <liweitianux@live.com> +# MIT license + +""" +Handle the "console" type of messages from the client. +""" + +import json +import logging +from concurrent.futures import ThreadPoolExecutor + +import tornado.ioloop +import tornado.gen + +from .log import WebSocketLogHandler + + +logger = logging.getLogger(__name__) + + +class ConsoleHandler: +    """ +    Handle the "console" type of messages from the client. + +    XXX/TODO: +    * How to kill the submitted task? (force kill thread?) + +    Parameters +    ---------- +    websocket : `~tornado.websocket.WebSocketHandler` +        An `~tornado.websocket.WebSocketHandler` instance. +        The ``WebSocketLogHandler`` requires this to push logging messages +        to the client. +    max_workers : int, optional +        Maximum number of workers/threads for the execution pool +    onetask_only : bool, optional +        Whether to allow only one task running at the same time? + +    Attributes +    ---------- +    websocket : `~tornado.websocket.WebSocketHandler` +        An `~tornado.websocket.WebSocketHandler` instance, which is used by +        the ``WebSocketLogHandler`` to push logging messages to the client. +    wsloghandler : `~WebSocketLogHandler` +        Push logging messages to the client through WebSocket. +    executor : `~concurrent.futures.ThreadPoolExecutor` +        Where to submit the synchronous task and make it perform +        asynchronously using threads. +    io_loop : `~tornado.ioloop.IOLoop` +        Used to communicate with the main thread (e.g., callback) from the +        submitted task, which is executed on a different thread. +    onetask_only : bool +        Whether to allow only one task running at the same time? +    status : dict +        Whether the task is running and/or finished? +        There may be 4 possible status: +        1. running=False, finished=False: not started +        2. running=False, finished=True:  finished +        3. running=True,  finished=False: running +        4. running=True,  finished=True:  ?? error ?? +    """ +    def __init__(self, websocket, max_workers=3, onetask_only=False): +        self.websocket = websocket +        self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") +        self.executor = ThreadPoolExecutor(max_workers=1) +        # NOTE: +        # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we +        # will need to communicate with the main thread from another thread. +        self.io_loop = tornado.ioloop.IOLoop.instance() +        self.onetask_only = onetask_only +        self.status = {"running": False, "finished": False} + +    def handle_message(self, msg): +        try: +            msg_type = msg["type"] +            msg_action = msg["action"] +            response = {"type": msg_type, "action": msg_action} +            logger.info("WebSocket: handle message: " + +                        "type: {0}, action: {1}".format(msg_type, msg_action)) +            if msg_action == "start": +                # FIXME/XXX: This task should be asynchronous! +                success, error = self._start() +                response["success"] = success +                if not success: +                    response["error"] = error +            elif msg_action == "start_test": +                # FIXME/XXX: This task should be asynchronous! +                success, error = self._start(msg["time"]) +                response["success"] = success +                if not success: +                    response["error"] = error +            elif msg_action == "get_status": +                response["success"] = True +                response["action"] = "push" +                response["status"] = self.status +            else: +                logger.warning("WebSocket: " + +                               "unknown action: {0}".format(msg_action)) +                response["success"] = False +                response["error"] = "unknown action: {0}".format(msg_action) +        except KeyError: +            # Received message has wrong syntax/format +            response = {"success": False, +                        "type": msg_type, +                        "error": "no action specified"} +        # +        logger.debug("WebSocket: response: {0}".format(response)) +        return response + +    # FIXME/XXX: +    # * How to call this task asynchronously ?? +    def _start_test(self, *args, **kwargs): +        """ +        Start the task by submitting it to the executor + +        Returns +        ------- +        success : bool +            Whether success without any errors +        error : str +            Detail of the error if not succeed + +        """ +        if self.onetask_only and self.status["running"]: +            logger.warning("Task already running, and only one task allowed") +            success = False +            error = "already running and only one task allowed" +        else: +            logger.info("Start the task on the executor ...") +            self.status["running"] = True +            self.status["finished"] = False +            # Also push the logging messages to the client +            self._add_wsloghandler() +            future = self.executor.submit(self._task_test, *args, **kwargs) +            self.io_loop.add_future(future, self._task_callback) +            success, error = future.result() +        return (success, error) + +    # FIXME/XXX: +    # * How to call this task asynchronously ?? +    def _start(self, *args, **kwargs): +        """ +        Start the task by submitting it to the executor + +        Returns +        ------- +        success : bool +            Whether success without any errors +        error : str +            Detail of the error if not succeed + +        """ +        if self.onetask_only and self.status["running"]: +            logger.warning("Task already running, and only one task allowed") +            success = False +            error = "already running and only one task allowed" +        else: +            logger.info("Start the task on the executor ...") +            self.status["running"] = True +            self.status["finished"] = False +            # Also push the logging messages to the client +            self._add_wsloghandler() +            future = self.executor.submit(self._task, *args, **kwargs) +            self.io_loop.add_future(future, self._task_callback) +            success, error = future.result() +        return (success, error) + +    def _response_future(self, future): +        """ +        Callback function which will be called when the caller finishes +        in order to response the results to the client. +        """ +        response = {"type": "console", "action": "future"} +        success, error = future.result() +        response["success"] = success +        if not success: +            response["error"] = error +        logger.debug("WebSocket: future response: {0}".format(response)) +        msg_response = json.dumps(response) +        self.websocket.write_message(msg_response) + +    def _add_wsloghandler(self): +        """Add the ``self.wsloghandler`` to the logging handlers""" +        root_logger = logging.getLogger() +        root_logger.addHandler(self.wsloghandler) +        logger.info("Added the WebSocket logging handler") + +    def _remove_wsloghandler(self): +        """Remove the ``self.wsloghandler`` from the logging handlers""" +        root_logger = logging.getLogger() +        root_logger.removeHandler(self.wsloghandler) +        logger.info("Removed the WebSocket logging handler") + +    def _task_callback(self, future): +        """Callback function executed when the task finishes""" +        logger.info("Console task finished! Callback ...") +        self.status["running"] = False +        self.status["finished"] = True +        self._remove_wsloghandler() +        # +        response = {"type": "console", "action": "push"} +        response["success"] = True +        response["status"] = self.status +        logger.debug("WebSocket: future response: {0}".format(response)) +        msg_response = json.dumps(response) +        self.websocket.write_message(msg_response) + +    def _task_test(self, *args, **kwargs): +        """ +        The task this console to manage. + +        Returns +        ------- +        success : bool +            Whether success without any errors +        error : str +            Detail of the error if not succeed + +        NOTE +        ---- +        The task is synchronous and may be computationally intensive +        (i.e., CPU-bound rather than IO/event-bound), therefore, +        threads (or processes) are required to make it non-blocking +        (i.e., asynchronous). + +        Credit: https://stackoverflow.com/a/32164711/4856091 +        """ +        import time +        logger.info("console task: START") +        for i in range(args[0]): +            logger.info("console task: slept {0} seconds ...".format(i)) +            time.sleep(1) +        logger.info("console task: DONE!") +        return (True, None) + +    def _task(self, *args, **kwargs): +        """ +        The task this console to manage. +        Perform the foregrounds simulations. + +        Returns +        ------- +        success : bool +            Whether success without any errors +        error : str +            Detail of the error if not succeed + +        NOTE +        ---- +        The task is synchronous and may be computationally intensive +        (i.e., CPU-bound rather than IO/event-bound), therefore, +        threads (or processes) are required to make it non-blocking +        (i.e., asynchronous). + +        Credit: https://stackoverflow.com/a/32164711/4856091 +        """ +        logger.info("Preparing to start foregrounds simulations ...") +        logger.info("Importing modules + Numba JIT, waiting ...") + +        from ..foregrounds import Foregrounds + +        # FIXME: This is a hack +        configs = self.websocket.configs +        logger.info("Checking the configurations ...") +        configs.check_all() + +        fg = Foregrounds(configs) +        fg.preprocess() +        fg.simulate() +        fg.postprocess() + +        logger.info("Foregrounds simulations DONE!") + +        # NOTE: Should always return a tuple of (success, error) +        return (True, None) diff --git a/fg21sim/webui/handlers/log.py b/fg21sim/webui/handlers/log.py new file mode 100644 index 0000000..0b457b5 --- /dev/null +++ b/fg21sim/webui/handlers/log.py @@ -0,0 +1,61 @@ +# Copyright (c) 2016 Weitian LI <liweitianux@live.com> +# MIT license + +""" +Custom logging handlers + +WebSocketLogHandler : +    Send logging messages to the WebSocket as JSON-encoded string. +""" + + +import logging +import json + + +class WebSocketLogHandler(logging.Handler): +    """ +    Send logging messages to the WebSocket as JSON-encoded string. + +    Parameters +    ---------- +    websocket : `~tornado.websocket.WebSocketHandler` +        An `~tornado.websocket.WebSocketHandler` instance, which has +        the ``write_message()`` method that will be used to send the +        logging messages. +    msg_type : str, optional +        Set the type of the sent back message, for easier processing +        by the client. + +    NOTE +    ---- +    The message sent through the WebSocket is a JSON-encoded string +    from a dictionary, e.g., +    ``{"type": self.msg_type, +       "action": "log", +       "levelname": record.levelname, +       "levelno": record.levelno, +       "name": record.name, +       "asctime": record.asctime, +       "message": <formatted-message>}`` +    """ +    def __init__(self, websocket, msg_type=None): +        super().__init__() +        self.websocket = websocket +        self.msg_type = msg_type + +    def emit(self, record): +        try: +            message = self.format(record) +            msg = json.dumps({ +                "type": self.msg_type, +                "action": "log", +                "levelname": record.levelname, +                "levelno": record.levelno, +                "name": record.name, +                "asctime": record.asctime, +                "message": message, +            }) +            self.websocket.write_message(msg) +        except Exception: +            self.handleError(record) diff --git a/fg21sim/webui/handlers/websocket.py b/fg21sim/webui/handlers/websocket.py new file mode 100644 index 0000000..7db953c --- /dev/null +++ b/fg21sim/webui/handlers/websocket.py @@ -0,0 +1,436 @@ +# Copyright (c) 2016 Weitian LI <liweitianux@live.com> +# MIT license + +""" +Communicate with the "fg21sim" simulation program through the Web UI using +the WebSocket_ protocol, which provides full-duplex communication channels +over a single TCP connection. + +.. _WebSocket: https://en.wikipedia.org/wiki/WebSocket + + +References +---------- +- Tornado WebSocket: +  http://www.tornadoweb.org/en/stable/websocket.html +- Can I Use: WebSocket: +  http://caniuse.com/#feat=websockets +""" + +import os +import json +import logging + +import tornado.websocket +from tornado.options import options + +from .console import ConsoleHandler +from ..utils import get_host_ip, ip_in_network +from ...errors import ConfigError + + +logger = logging.getLogger(__name__) + + +class FG21simWSHandler(tornado.websocket.WebSocketHandler): +    """ +    WebSocket for bi-directional communication between the Web UI and +    the server, which can deal with the configurations and execute the +    simulation task. + +    Generally, WebSocket send and receive data as *string*.  Therefore, +    the more complex data are stringified as JSON string before sending, +    which will be parsed after receive. + +    Each message (as a JSON object or Python dictionary) has a ``type`` +    field which will be used to determine the following action to take. + +    Attributes +    ---------- +    name : str +        Name to distinguish this WebSocket handle. +    from_localhost : bool +        Set to ``True`` if the access is from the localhost, +        otherwise ``False``. +    configs : `~ConfigManager` +        A ``ConfigManager`` instance, for configuration manipulations when +        communicating with the Web UI. +    """ +    name = "fg21sim" +    from_localhost = None + +    def check_origin(self, origin): +        """Check the origin of the WebSocket access. + +        Attributes +        ---------- +        from_localhost : bool +            Set to ``True`` if the access is from the localhost, +            otherwise ``False``. + +        NOTE +        ---- +        Currently, only allow access from the ``localhost`` +        (i.e., 127.0.0.1) and local LAN. +        """ +        self.from_localhost = False +        logger.info("WebSocket: {0}: origin: {1}".format(self.name, origin)) +        ip = get_host_ip(url=origin) +        network = options.hosts_allowed +        if ip == "127.0.0.1": +            self.from_localhost = True +            allow = True +            logger.info("WebSocket: %s: origin is localhost" % self.name) +        elif network.upper() == "ANY": +            # Any hosts are allowed +            allow = True +            logger.warning("WebSocket: %s: any hosts are allowed" % self.name) +        elif ip_in_network(ip, network): +            allow = True +            logger.info("WebSocket: %s: " % self.name + +                        "client is in the allowed network: %s" % network) +        else: +            allow = False +            logger.error("WebSocket: %s: " % self.name + +                         "client is NOT in the allowed network: %s" % network) +        return allow + +    def open(self): +        """Invoked when a new WebSocket is opened by the client.""" +        # FIXME: +        # * better to move to the `Application` class ?? +        # * or create a ``ConfigsHandler`` similar to the ``ConsoleHandler`` +        self.configs = self.application.configmanager +        self.console_handler = ConsoleHandler(websocket=self) +        # +        logger.info("WebSocket: {0}: opened".format(self.name)) +        logger.info("Allowed hosts: {0}".format(options.hosts_allowed)) + +    def on_close(self): +        """Invoked when a new WebSocket is closed by the client.""" +        code, reason = None, None +        if hasattr(self, "close_code"): +            code = self.close_code +        if hasattr(self, "close_reason"): +            reason = self.close_reason +        logger.info("WebSocket: {0}: closed by client: {1}, {2}".format( +            self.name, code, reason)) + +    # FIXME/XXX: +    # * How to be non-blocking ?? +    # NOTE: WebSocket.on_message: may NOT be a coroutine at the moment (v4.3) +    # References: +    # [1] https://stackoverflow.com/a/35543856/4856091 +    # [2] https://stackoverflow.com/a/33724486/4856091 +    def on_message(self, message): +        """Handle incoming messages and dispatch task according to the +        message type. + +        NOTE +        ---- +        The received message (parsed to a Python dictionary) has a ``type`` +        item which will be used to determine the following action to take. + +        Currently supported message types are: +        ``configs``: +            Request or set the configurations +        ``console``: +            Control the simulation tasks, or request logging messages +        ``results``: +            Request the simulation results + +        The sent message also has a ``type`` item of same value, which the +        client can be used to figure out the proper actions. +        There is a ``success`` item which indicates the status of the +        requested operation, and an ``error`` recording the error message +        if ``success=False``. +        """ +        logger.debug("WebSocket: %s: received: %s" % (self.name, message)) +        try: +            msg = json.loads(message) +            msg_type = msg["type"] +        except json.JSONDecodeError: +            logger.warning("WebSocket: {0}: ".format(self.name) + +                           "message is not a valid JSON string") +            response = {"success": False, +                        "type": None, +                        "error": "message is not a valid JSON string"} +        except (KeyError, TypeError): +            logger.warning("WebSocket: %s: skip invalid message" % self.name) +            response = {"success": False, +                        "type": None, +                        "error": "type is missing"} +        else: +            # Check the message type and dispatch task +            if msg_type == "configs": +                # Request or set the configurations +                response = self._handle_configs(msg) +            elif msg_type == "console": +                # Control the simulation tasks, or request logging messages +                # FIXME/XXX: +                # * How to make this asynchronously ?? +                response = self.console_handler.handle_message(msg) +            elif msg_type == "results": +                # Request the simulation results +                response = self._handle_results(msg) +            else: +                # Message of unknown type +                logger.warning("WebSocket: {0}: ".format(self.name) + +                               "unknown message type: {0}".format(msg_type)) +                response = {"success": False, +                            "type": msg_type, +                            "error": "unknown message type %s" % msg_type} +        # +        msg_response = json.dumps(response) +        self.write_message(msg_response) + +    def _handle_configs(self, msg): +        """Handle the message of type "configs", which request to get or +        set some configurations by the client. + +        TODO: +        * improve the description ... +        * split these handling functions into a separate class in a module + +        Parameters +        ---------- +        msg : dict +            A dictionary parsed from the incoming JSON message, which +            generally has the following syntax: +            ``{"type": "configs", "action": <action>, "data": <data>}`` +            where the ``<action>`` is ``set`` or ``get``, and the ``<data>`` +            is a list of config keys or a dict of config key-value pairs. + +        Returns +        ------- +        response : dict +            A dictionary parsed from the incoming JSON message, which +            generally has the following syntax: +            ``{"type": "configs", "action": <action>, +               "data": <data>, "errors": <errors>}`` +            where the ``<action>`` is the same as input, the ``<data>`` is +            a list of config keys or a dict of config key-value pairs, and +            ``<errors>`` contains the error message for the invalid config +            values. +        """ +        try: +            msg_type = msg["type"] +            msg_action = msg["action"] +            response = {"type": msg_type, "action": msg_action} +            logger.info("WebSocket: {0}: handle message: ".format(self.name) + +                        "type: {0}, action: {1}".format(msg_type, msg_action)) +            if msg_action == "get": +                # Get the values of the specified options +                try: +                    data, errors = self._get_configs(keys=msg["keys"]) +                    response["success"] = True +                    response["data"] = data +                    response["errors"] = errors +                except KeyError: +                    response["success"] = False +                    response["error"] = "'keys' is missing" +            elif msg_action == "set": +                # Set the values of the specified options +                try: +                    errors = self._set_configs(data=msg["data"]) +                    response["success"] = True +                    response["data"] = {}  # be more consistent +                    response["errors"] = errors +                except KeyError: +                    response["success"] = False +                    response["error"] = "'data' is missing" +            elif msg_action == "reset": +                # Reset the configurations to the defaults +                self._reset_configs() +                response["success"] = True +            elif msg_action == "load": +                # Load the supplied user configuration file +                try: +                    success, error = self._load_configs(msg["userconfig"]) +                    response["success"] = success +                    if not success: +                        response["error"] = error +                except KeyError: +                    response["success"] = False +                    response["error"] = "'userconfig' is missing" +            elif msg_action == "save": +                # Save current configurations to file +                try: +                    success, error = self._save_configs(msg["outfile"], +                                                        msg["clobber"]) +                    response["success"] = success +                    if not success: +                        response["error"] = error +                except KeyError: +                    response["success"] = False +                    response["error"] = "'outfile' or 'clobber' is missing" +            else: +                logger.warning("WebSocket: {0}: ".format(self.name) + +                               "unknown action: {0}".format(msg_action)) +                response["success"] = False +                response["error"] = "unknown action: {0}".format(msg_action) +        except KeyError: +            # Received message has wrong syntax/format +            response = {"success": False, +                        "type": msg_type, +                        "error": "no action specified"} +        # +        logger.debug("WebSocket: {0}: ".format(self.name) + +                     "response: {0}".format(response)) +        return response + +    def _get_configs(self, keys=None): +        """Get the values of the config options specified by the given keys. + +        Parameters +        ---------- +        keys : list[str], optional +            A list of keys specifying the config options whose values will +            be obtained. +            If ``keys=None``, then all the configurations values are dumped. + +        Returns +        ------- +        data : dict +            A dictionary with keys the same as the input keys, and values +            the corresponding config option values. +        errors : dict +            When error occurs (e.g., invalid key), then the specific errors +            with details are stored in this dictionary. + +        NOTE +        ---- +        Do not forget the ``userconfig`` option. +        """ +        if keys is None: +            # Dump all the configurations +            data = self.configs.dump(flatten=True) +            data["userconfig"] = self.configs.userconfig +            errors = {} +        else: +            data = {} +            errors = {} +            for key in keys: +                if key == "userconfig": +                    data["userconfig"] = self.configs.userconfig +                else: +                    try: +                        data[key] = self.configs.getn(key) +                    except KeyError as e: +                        errors[key] = str(e) +        # +        return (data, errors) + +    def _set_configs(self, data): +        """Set the values of the config options specified by the given keys +        to the corresponding supplied data. + +        NOTE +        ---- +        The ``userconfig`` needs special handle. +        The ``workdir`` and ``configfile`` options should be ignored. + +        Parameters +        ---------- +        data : dict +            A dictionary of key-value pairs, with keys specifying the config +            options whose value will be changed, and values the new values +            to which config options will be set. +            NOTE: +            If want to set the ``userconfig`` option, an *absolute path* +            must be provided. + +        Returns +        ------- +        errors : dict +            When error occurs (e.g., invalid key, invalid values), then the +            specific errors with details are stored in this dictionary. +        """ +        errors = {} +        for key, value in data.items(): +            if key in ["workdir", "configfile"]: +                # Ignore "workdir" and "configfile" +                continue +            elif key == "userconfig": +                if os.path.isabs(os.path.expanduser(value)): +                    self.configs.userconfig = value +                else: +                    errors[key] = "Not an absolute path" +            else: +                try: +                    self.configs.setn(key, value) +                except KeyError as e: +                    errors[key] = str(e) +        # NOTE: +        # Check the whole configurations after all provided options are +        # updated, and merge the validation errors. +        __, cherr = self.configs.check_all(raise_exception=False) +        errors.update(cherr) +        return errors + +    def _reset_configs(self): +        """Reset the configurations to the defaults.""" +        self.configs.reset() + +    def _load_configs(self, userconfig): +        """Load configurations from the provided user configuration file. + +        Parameters +        ---------- +        userconfig: str +            The filepath to the user configuration file, which must be +            an *absolute path*. + +        Returns +        ------- +        success : bool +            ``True`` if the operation succeeded, otherwise, ``False``. +        error : str +            If failed, this ``error`` saves the details, otherwise, ``None``. +        """ +        success = False +        error = None +        if os.path.isabs(os.path.expanduser(userconfig)): +            try: +                self.configs.read_userconfig(userconfig) +                success = True +            except ConfigError as e: +                error = str(e) +        else: +            error = "Not an absolute path" +        return (success, error) + +    def _save_configs(self, outfile, clobber=False): +        """Save current configurations to file. + +        Parameters +        ---------- +        outfile: str +            The filepath to the output configuration file, which must be +            an *absolute path*. +        clobber : bool, optional +            Whether overwrite the output file if already exists? + +        Returns +        ------- +        success : bool +            ``True`` if the operation succeeded, otherwise, ``False``. +        error : str +            If failed, this ``error`` saves the details, otherwise, ``None``. +        """ +        success = False +        error = None +        try: +            self.configs.save(outfile, clobber=clobber) +            success = True +        except (ValueError, OSError) as e: +            error = str(e) +        return (success, error) + +    def _handle_results(self, msg): +        # Got a message of supported types +        msg_type = msg["type"] +        logger.info("WebSocket: {0}: ".format(self.name) + +                    "handle message of type: {0}".format(msg_type)) +        response = {"success": True, "type": msg_type} +        return response  | 
