From e74ccdc92478ceb50aa2617e21f6f7eb4bc181de Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Tue, 15 Nov 2016 18:23:00 +0800 Subject: webui: Place handlers under the directory "hnadlers/" --- fg21sim/webui/handlers/__init__.py | 1 + fg21sim/webui/handlers/console.py | 275 +++++++++++++++++++++++ fg21sim/webui/handlers/log.py | 61 +++++ fg21sim/webui/handlers/websocket.py | 436 ++++++++++++++++++++++++++++++++++++ 4 files changed, 773 insertions(+) create mode 100644 fg21sim/webui/handlers/console.py create mode 100644 fg21sim/webui/handlers/log.py create mode 100644 fg21sim/webui/handlers/websocket.py (limited to 'fg21sim/webui/handlers') diff --git a/fg21sim/webui/handlers/__init__.py b/fg21sim/webui/handlers/__init__.py index f97ef07..0c01ea7 100644 --- a/fg21sim/webui/handlers/__init__.py +++ b/fg21sim/webui/handlers/__init__.py @@ -3,3 +3,4 @@ from .index import IndexHandler from .login import LoginHandler +from .websocket import FG21simWSHandler diff --git a/fg21sim/webui/handlers/console.py b/fg21sim/webui/handlers/console.py new file mode 100644 index 0000000..fe1b500 --- /dev/null +++ b/fg21sim/webui/handlers/console.py @@ -0,0 +1,275 @@ +# Copyright (c) 2016 Weitian LI +# MIT license + +""" +Handle the "console" type of messages from the client. +""" + +import json +import logging +from concurrent.futures import ThreadPoolExecutor + +import tornado.ioloop +import tornado.gen + +from .log import WebSocketLogHandler + + +logger = logging.getLogger(__name__) + + +class ConsoleHandler: + """ + Handle the "console" type of messages from the client. + + XXX/TODO: + * How to kill the submitted task? (force kill thread?) + + Parameters + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance. + The ``WebSocketLogHandler`` requires this to push logging messages + to the client. + max_workers : int, optional + Maximum number of workers/threads for the execution pool + onetask_only : bool, optional + Whether to allow only one task running at the same time? + + Attributes + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance, which is used by + the ``WebSocketLogHandler`` to push logging messages to the client. + wsloghandler : `~WebSocketLogHandler` + Push logging messages to the client through WebSocket. + executor : `~concurrent.futures.ThreadPoolExecutor` + Where to submit the synchronous task and make it perform + asynchronously using threads. + io_loop : `~tornado.ioloop.IOLoop` + Used to communicate with the main thread (e.g., callback) from the + submitted task, which is executed on a different thread. + onetask_only : bool + Whether to allow only one task running at the same time? + status : dict + Whether the task is running and/or finished? + There may be 4 possible status: + 1. running=False, finished=False: not started + 2. running=False, finished=True: finished + 3. running=True, finished=False: running + 4. running=True, finished=True: ?? error ?? + """ + def __init__(self, websocket, max_workers=3, onetask_only=False): + self.websocket = websocket + self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") + self.executor = ThreadPoolExecutor(max_workers=1) + # NOTE: + # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we + # will need to communicate with the main thread from another thread. + self.io_loop = tornado.ioloop.IOLoop.instance() + self.onetask_only = onetask_only + self.status = {"running": False, "finished": False} + + def handle_message(self, msg): + try: + msg_type = msg["type"] + msg_action = msg["action"] + response = {"type": msg_type, "action": msg_action} + logger.info("WebSocket: handle message: " + + "type: {0}, action: {1}".format(msg_type, msg_action)) + if msg_action == "start": + # FIXME/XXX: This task should be asynchronous! + success, error = self._start() + response["success"] = success + if not success: + response["error"] = error + elif msg_action == "start_test": + # FIXME/XXX: This task should be asynchronous! + success, error = self._start(msg["time"]) + response["success"] = success + if not success: + response["error"] = error + elif msg_action == "get_status": + response["success"] = True + response["action"] = "push" + response["status"] = self.status + else: + logger.warning("WebSocket: " + + "unknown action: {0}".format(msg_action)) + response["success"] = False + response["error"] = "unknown action: {0}".format(msg_action) + except KeyError: + # Received message has wrong syntax/format + response = {"success": False, + "type": msg_type, + "error": "no action specified"} + # + logger.debug("WebSocket: response: {0}".format(response)) + return response + + # FIXME/XXX: + # * How to call this task asynchronously ?? + def _start_test(self, *args, **kwargs): + """ + Start the task by submitting it to the executor + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + """ + if self.onetask_only and self.status["running"]: + logger.warning("Task already running, and only one task allowed") + success = False + error = "already running and only one task allowed" + else: + logger.info("Start the task on the executor ...") + self.status["running"] = True + self.status["finished"] = False + # Also push the logging messages to the client + self._add_wsloghandler() + future = self.executor.submit(self._task_test, *args, **kwargs) + self.io_loop.add_future(future, self._task_callback) + success, error = future.result() + return (success, error) + + # FIXME/XXX: + # * How to call this task asynchronously ?? + def _start(self, *args, **kwargs): + """ + Start the task by submitting it to the executor + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + """ + if self.onetask_only and self.status["running"]: + logger.warning("Task already running, and only one task allowed") + success = False + error = "already running and only one task allowed" + else: + logger.info("Start the task on the executor ...") + self.status["running"] = True + self.status["finished"] = False + # Also push the logging messages to the client + self._add_wsloghandler() + future = self.executor.submit(self._task, *args, **kwargs) + self.io_loop.add_future(future, self._task_callback) + success, error = future.result() + return (success, error) + + def _response_future(self, future): + """ + Callback function which will be called when the caller finishes + in order to response the results to the client. + """ + response = {"type": "console", "action": "future"} + success, error = future.result() + response["success"] = success + if not success: + response["error"] = error + logger.debug("WebSocket: future response: {0}".format(response)) + msg_response = json.dumps(response) + self.websocket.write_message(msg_response) + + def _add_wsloghandler(self): + """Add the ``self.wsloghandler`` to the logging handlers""" + root_logger = logging.getLogger() + root_logger.addHandler(self.wsloghandler) + logger.info("Added the WebSocket logging handler") + + def _remove_wsloghandler(self): + """Remove the ``self.wsloghandler`` from the logging handlers""" + root_logger = logging.getLogger() + root_logger.removeHandler(self.wsloghandler) + logger.info("Removed the WebSocket logging handler") + + def _task_callback(self, future): + """Callback function executed when the task finishes""" + logger.info("Console task finished! Callback ...") + self.status["running"] = False + self.status["finished"] = True + self._remove_wsloghandler() + # + response = {"type": "console", "action": "push"} + response["success"] = True + response["status"] = self.status + logger.debug("WebSocket: future response: {0}".format(response)) + msg_response = json.dumps(response) + self.websocket.write_message(msg_response) + + def _task_test(self, *args, **kwargs): + """ + The task this console to manage. + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + NOTE + ---- + The task is synchronous and may be computationally intensive + (i.e., CPU-bound rather than IO/event-bound), therefore, + threads (or processes) are required to make it non-blocking + (i.e., asynchronous). + + Credit: https://stackoverflow.com/a/32164711/4856091 + """ + import time + logger.info("console task: START") + for i in range(args[0]): + logger.info("console task: slept {0} seconds ...".format(i)) + time.sleep(1) + logger.info("console task: DONE!") + return (True, None) + + def _task(self, *args, **kwargs): + """ + The task this console to manage. + Perform the foregrounds simulations. + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + NOTE + ---- + The task is synchronous and may be computationally intensive + (i.e., CPU-bound rather than IO/event-bound), therefore, + threads (or processes) are required to make it non-blocking + (i.e., asynchronous). + + Credit: https://stackoverflow.com/a/32164711/4856091 + """ + logger.info("Preparing to start foregrounds simulations ...") + logger.info("Importing modules + Numba JIT, waiting ...") + + from ..foregrounds import Foregrounds + + # FIXME: This is a hack + configs = self.websocket.configs + logger.info("Checking the configurations ...") + configs.check_all() + + fg = Foregrounds(configs) + fg.preprocess() + fg.simulate() + fg.postprocess() + + logger.info("Foregrounds simulations DONE!") + + # NOTE: Should always return a tuple of (success, error) + return (True, None) diff --git a/fg21sim/webui/handlers/log.py b/fg21sim/webui/handlers/log.py new file mode 100644 index 0000000..0b457b5 --- /dev/null +++ b/fg21sim/webui/handlers/log.py @@ -0,0 +1,61 @@ +# Copyright (c) 2016 Weitian LI +# MIT license + +""" +Custom logging handlers + +WebSocketLogHandler : + Send logging messages to the WebSocket as JSON-encoded string. +""" + + +import logging +import json + + +class WebSocketLogHandler(logging.Handler): + """ + Send logging messages to the WebSocket as JSON-encoded string. + + Parameters + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance, which has + the ``write_message()`` method that will be used to send the + logging messages. + msg_type : str, optional + Set the type of the sent back message, for easier processing + by the client. + + NOTE + ---- + The message sent through the WebSocket is a JSON-encoded string + from a dictionary, e.g., + ``{"type": self.msg_type, + "action": "log", + "levelname": record.levelname, + "levelno": record.levelno, + "name": record.name, + "asctime": record.asctime, + "message": }`` + """ + def __init__(self, websocket, msg_type=None): + super().__init__() + self.websocket = websocket + self.msg_type = msg_type + + def emit(self, record): + try: + message = self.format(record) + msg = json.dumps({ + "type": self.msg_type, + "action": "log", + "levelname": record.levelname, + "levelno": record.levelno, + "name": record.name, + "asctime": record.asctime, + "message": message, + }) + self.websocket.write_message(msg) + except Exception: + self.handleError(record) diff --git a/fg21sim/webui/handlers/websocket.py b/fg21sim/webui/handlers/websocket.py new file mode 100644 index 0000000..7db953c --- /dev/null +++ b/fg21sim/webui/handlers/websocket.py @@ -0,0 +1,436 @@ +# Copyright (c) 2016 Weitian LI +# MIT license + +""" +Communicate with the "fg21sim" simulation program through the Web UI using +the WebSocket_ protocol, which provides full-duplex communication channels +over a single TCP connection. + +.. _WebSocket: https://en.wikipedia.org/wiki/WebSocket + + +References +---------- +- Tornado WebSocket: + http://www.tornadoweb.org/en/stable/websocket.html +- Can I Use: WebSocket: + http://caniuse.com/#feat=websockets +""" + +import os +import json +import logging + +import tornado.websocket +from tornado.options import options + +from .console import ConsoleHandler +from ..utils import get_host_ip, ip_in_network +from ...errors import ConfigError + + +logger = logging.getLogger(__name__) + + +class FG21simWSHandler(tornado.websocket.WebSocketHandler): + """ + WebSocket for bi-directional communication between the Web UI and + the server, which can deal with the configurations and execute the + simulation task. + + Generally, WebSocket send and receive data as *string*. Therefore, + the more complex data are stringified as JSON string before sending, + which will be parsed after receive. + + Each message (as a JSON object or Python dictionary) has a ``type`` + field which will be used to determine the following action to take. + + Attributes + ---------- + name : str + Name to distinguish this WebSocket handle. + from_localhost : bool + Set to ``True`` if the access is from the localhost, + otherwise ``False``. + configs : `~ConfigManager` + A ``ConfigManager`` instance, for configuration manipulations when + communicating with the Web UI. + """ + name = "fg21sim" + from_localhost = None + + def check_origin(self, origin): + """Check the origin of the WebSocket access. + + Attributes + ---------- + from_localhost : bool + Set to ``True`` if the access is from the localhost, + otherwise ``False``. + + NOTE + ---- + Currently, only allow access from the ``localhost`` + (i.e., 127.0.0.1) and local LAN. + """ + self.from_localhost = False + logger.info("WebSocket: {0}: origin: {1}".format(self.name, origin)) + ip = get_host_ip(url=origin) + network = options.hosts_allowed + if ip == "127.0.0.1": + self.from_localhost = True + allow = True + logger.info("WebSocket: %s: origin is localhost" % self.name) + elif network.upper() == "ANY": + # Any hosts are allowed + allow = True + logger.warning("WebSocket: %s: any hosts are allowed" % self.name) + elif ip_in_network(ip, network): + allow = True + logger.info("WebSocket: %s: " % self.name + + "client is in the allowed network: %s" % network) + else: + allow = False + logger.error("WebSocket: %s: " % self.name + + "client is NOT in the allowed network: %s" % network) + return allow + + def open(self): + """Invoked when a new WebSocket is opened by the client.""" + # FIXME: + # * better to move to the `Application` class ?? + # * or create a ``ConfigsHandler`` similar to the ``ConsoleHandler`` + self.configs = self.application.configmanager + self.console_handler = ConsoleHandler(websocket=self) + # + logger.info("WebSocket: {0}: opened".format(self.name)) + logger.info("Allowed hosts: {0}".format(options.hosts_allowed)) + + def on_close(self): + """Invoked when a new WebSocket is closed by the client.""" + code, reason = None, None + if hasattr(self, "close_code"): + code = self.close_code + if hasattr(self, "close_reason"): + reason = self.close_reason + logger.info("WebSocket: {0}: closed by client: {1}, {2}".format( + self.name, code, reason)) + + # FIXME/XXX: + # * How to be non-blocking ?? + # NOTE: WebSocket.on_message: may NOT be a coroutine at the moment (v4.3) + # References: + # [1] https://stackoverflow.com/a/35543856/4856091 + # [2] https://stackoverflow.com/a/33724486/4856091 + def on_message(self, message): + """Handle incoming messages and dispatch task according to the + message type. + + NOTE + ---- + The received message (parsed to a Python dictionary) has a ``type`` + item which will be used to determine the following action to take. + + Currently supported message types are: + ``configs``: + Request or set the configurations + ``console``: + Control the simulation tasks, or request logging messages + ``results``: + Request the simulation results + + The sent message also has a ``type`` item of same value, which the + client can be used to figure out the proper actions. + There is a ``success`` item which indicates the status of the + requested operation, and an ``error`` recording the error message + if ``success=False``. + """ + logger.debug("WebSocket: %s: received: %s" % (self.name, message)) + try: + msg = json.loads(message) + msg_type = msg["type"] + except json.JSONDecodeError: + logger.warning("WebSocket: {0}: ".format(self.name) + + "message is not a valid JSON string") + response = {"success": False, + "type": None, + "error": "message is not a valid JSON string"} + except (KeyError, TypeError): + logger.warning("WebSocket: %s: skip invalid message" % self.name) + response = {"success": False, + "type": None, + "error": "type is missing"} + else: + # Check the message type and dispatch task + if msg_type == "configs": + # Request or set the configurations + response = self._handle_configs(msg) + elif msg_type == "console": + # Control the simulation tasks, or request logging messages + # FIXME/XXX: + # * How to make this asynchronously ?? + response = self.console_handler.handle_message(msg) + elif msg_type == "results": + # Request the simulation results + response = self._handle_results(msg) + else: + # Message of unknown type + logger.warning("WebSocket: {0}: ".format(self.name) + + "unknown message type: {0}".format(msg_type)) + response = {"success": False, + "type": msg_type, + "error": "unknown message type %s" % msg_type} + # + msg_response = json.dumps(response) + self.write_message(msg_response) + + def _handle_configs(self, msg): + """Handle the message of type "configs", which request to get or + set some configurations by the client. + + TODO: + * improve the description ... + * split these handling functions into a separate class in a module + + Parameters + ---------- + msg : dict + A dictionary parsed from the incoming JSON message, which + generally has the following syntax: + ``{"type": "configs", "action": , "data": }`` + where the ```` is ``set`` or ``get``, and the ```` + is a list of config keys or a dict of config key-value pairs. + + Returns + ------- + response : dict + A dictionary parsed from the incoming JSON message, which + generally has the following syntax: + ``{"type": "configs", "action": , + "data": , "errors": }`` + where the ```` is the same as input, the ```` is + a list of config keys or a dict of config key-value pairs, and + ```` contains the error message for the invalid config + values. + """ + try: + msg_type = msg["type"] + msg_action = msg["action"] + response = {"type": msg_type, "action": msg_action} + logger.info("WebSocket: {0}: handle message: ".format(self.name) + + "type: {0}, action: {1}".format(msg_type, msg_action)) + if msg_action == "get": + # Get the values of the specified options + try: + data, errors = self._get_configs(keys=msg["keys"]) + response["success"] = True + response["data"] = data + response["errors"] = errors + except KeyError: + response["success"] = False + response["error"] = "'keys' is missing" + elif msg_action == "set": + # Set the values of the specified options + try: + errors = self._set_configs(data=msg["data"]) + response["success"] = True + response["data"] = {} # be more consistent + response["errors"] = errors + except KeyError: + response["success"] = False + response["error"] = "'data' is missing" + elif msg_action == "reset": + # Reset the configurations to the defaults + self._reset_configs() + response["success"] = True + elif msg_action == "load": + # Load the supplied user configuration file + try: + success, error = self._load_configs(msg["userconfig"]) + response["success"] = success + if not success: + response["error"] = error + except KeyError: + response["success"] = False + response["error"] = "'userconfig' is missing" + elif msg_action == "save": + # Save current configurations to file + try: + success, error = self._save_configs(msg["outfile"], + msg["clobber"]) + response["success"] = success + if not success: + response["error"] = error + except KeyError: + response["success"] = False + response["error"] = "'outfile' or 'clobber' is missing" + else: + logger.warning("WebSocket: {0}: ".format(self.name) + + "unknown action: {0}".format(msg_action)) + response["success"] = False + response["error"] = "unknown action: {0}".format(msg_action) + except KeyError: + # Received message has wrong syntax/format + response = {"success": False, + "type": msg_type, + "error": "no action specified"} + # + logger.debug("WebSocket: {0}: ".format(self.name) + + "response: {0}".format(response)) + return response + + def _get_configs(self, keys=None): + """Get the values of the config options specified by the given keys. + + Parameters + ---------- + keys : list[str], optional + A list of keys specifying the config options whose values will + be obtained. + If ``keys=None``, then all the configurations values are dumped. + + Returns + ------- + data : dict + A dictionary with keys the same as the input keys, and values + the corresponding config option values. + errors : dict + When error occurs (e.g., invalid key), then the specific errors + with details are stored in this dictionary. + + NOTE + ---- + Do not forget the ``userconfig`` option. + """ + if keys is None: + # Dump all the configurations + data = self.configs.dump(flatten=True) + data["userconfig"] = self.configs.userconfig + errors = {} + else: + data = {} + errors = {} + for key in keys: + if key == "userconfig": + data["userconfig"] = self.configs.userconfig + else: + try: + data[key] = self.configs.getn(key) + except KeyError as e: + errors[key] = str(e) + # + return (data, errors) + + def _set_configs(self, data): + """Set the values of the config options specified by the given keys + to the corresponding supplied data. + + NOTE + ---- + The ``userconfig`` needs special handle. + The ``workdir`` and ``configfile`` options should be ignored. + + Parameters + ---------- + data : dict + A dictionary of key-value pairs, with keys specifying the config + options whose value will be changed, and values the new values + to which config options will be set. + NOTE: + If want to set the ``userconfig`` option, an *absolute path* + must be provided. + + Returns + ------- + errors : dict + When error occurs (e.g., invalid key, invalid values), then the + specific errors with details are stored in this dictionary. + """ + errors = {} + for key, value in data.items(): + if key in ["workdir", "configfile"]: + # Ignore "workdir" and "configfile" + continue + elif key == "userconfig": + if os.path.isabs(os.path.expanduser(value)): + self.configs.userconfig = value + else: + errors[key] = "Not an absolute path" + else: + try: + self.configs.setn(key, value) + except KeyError as e: + errors[key] = str(e) + # NOTE: + # Check the whole configurations after all provided options are + # updated, and merge the validation errors. + __, cherr = self.configs.check_all(raise_exception=False) + errors.update(cherr) + return errors + + def _reset_configs(self): + """Reset the configurations to the defaults.""" + self.configs.reset() + + def _load_configs(self, userconfig): + """Load configurations from the provided user configuration file. + + Parameters + ---------- + userconfig: str + The filepath to the user configuration file, which must be + an *absolute path*. + + Returns + ------- + success : bool + ``True`` if the operation succeeded, otherwise, ``False``. + error : str + If failed, this ``error`` saves the details, otherwise, ``None``. + """ + success = False + error = None + if os.path.isabs(os.path.expanduser(userconfig)): + try: + self.configs.read_userconfig(userconfig) + success = True + except ConfigError as e: + error = str(e) + else: + error = "Not an absolute path" + return (success, error) + + def _save_configs(self, outfile, clobber=False): + """Save current configurations to file. + + Parameters + ---------- + outfile: str + The filepath to the output configuration file, which must be + an *absolute path*. + clobber : bool, optional + Whether overwrite the output file if already exists? + + Returns + ------- + success : bool + ``True`` if the operation succeeded, otherwise, ``False``. + error : str + If failed, this ``error`` saves the details, otherwise, ``None``. + """ + success = False + error = None + try: + self.configs.save(outfile, clobber=clobber) + success = True + except (ValueError, OSError) as e: + error = str(e) + return (success, error) + + def _handle_results(self, msg): + # Got a message of supported types + msg_type = msg["type"] + logger.info("WebSocket: {0}: ".format(self.name) + + "handle message of type: {0}".format(msg_type)) + response = {"success": True, "type": msg_type} + return response -- cgit v1.2.2