From e74ccdc92478ceb50aa2617e21f6f7eb4bc181de Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Tue, 15 Nov 2016 18:23:00 +0800 Subject: webui: Place handlers under the directory "hnadlers/" --- fg21sim/webui/consolehandler.py | 275 ----------------------- fg21sim/webui/handlers/__init__.py | 1 + fg21sim/webui/handlers/console.py | 275 +++++++++++++++++++++++ fg21sim/webui/handlers/log.py | 61 +++++ fg21sim/webui/handlers/websocket.py | 436 ++++++++++++++++++++++++++++++++++++ fg21sim/webui/loghandler.py | 61 ----- fg21sim/webui/websocket.py | 436 ------------------------------------ 7 files changed, 773 insertions(+), 772 deletions(-) delete mode 100644 fg21sim/webui/consolehandler.py create mode 100644 fg21sim/webui/handlers/console.py create mode 100644 fg21sim/webui/handlers/log.py create mode 100644 fg21sim/webui/handlers/websocket.py delete mode 100644 fg21sim/webui/loghandler.py delete mode 100644 fg21sim/webui/websocket.py (limited to 'fg21sim') diff --git a/fg21sim/webui/consolehandler.py b/fg21sim/webui/consolehandler.py deleted file mode 100644 index 54d4305..0000000 --- a/fg21sim/webui/consolehandler.py +++ /dev/null @@ -1,275 +0,0 @@ -# Copyright (c) 2016 Weitian LI -# MIT license - -""" -Handle the "console" type of messages from the client. -""" - -import json -import logging -from concurrent.futures import ThreadPoolExecutor - -import tornado.ioloop -import tornado.gen - -from .loghandler import WebSocketLogHandler - - -logger = logging.getLogger(__name__) - - -class ConsoleHandler: - """ - Handle the "console" type of messages from the client. - - XXX/TODO: - * How to kill the submitted task? (force kill thread?) - - Parameters - ---------- - websocket : `~tornado.websocket.WebSocketHandler` - An `~tornado.websocket.WebSocketHandler` instance. - The ``WebSocketLogHandler`` requires this to push logging messages - to the client. - max_workers : int, optional - Maximum number of workers/threads for the execution pool - onetask_only : bool, optional - Whether to allow only one task running at the same time? - - Attributes - ---------- - websocket : `~tornado.websocket.WebSocketHandler` - An `~tornado.websocket.WebSocketHandler` instance, which is used by - the ``WebSocketLogHandler`` to push logging messages to the client. - wsloghandler : `~WebSocketLogHandler` - Push logging messages to the client through WebSocket. - executor : `~concurrent.futures.ThreadPoolExecutor` - Where to submit the synchronous task and make it perform - asynchronously using threads. - io_loop : `~tornado.ioloop.IOLoop` - Used to communicate with the main thread (e.g., callback) from the - submitted task, which is executed on a different thread. - onetask_only : bool - Whether to allow only one task running at the same time? - status : dict - Whether the task is running and/or finished? - There may be 4 possible status: - 1. running=False, finished=False: not started - 2. running=False, finished=True: finished - 3. running=True, finished=False: running - 4. running=True, finished=True: ?? error ?? - """ - def __init__(self, websocket, max_workers=3, onetask_only=False): - self.websocket = websocket - self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") - self.executor = ThreadPoolExecutor(max_workers=1) - # NOTE: - # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we - # will need to communicate with the main thread from another thread. - self.io_loop = tornado.ioloop.IOLoop.instance() - self.onetask_only = onetask_only - self.status = {"running": False, "finished": False} - - def handle_message(self, msg): - try: - msg_type = msg["type"] - msg_action = msg["action"] - response = {"type": msg_type, "action": msg_action} - logger.info("WebSocket: handle message: " + - "type: {0}, action: {1}".format(msg_type, msg_action)) - if msg_action == "start": - # FIXME/XXX: This task should be asynchronous! - success, error = self._start() - response["success"] = success - if not success: - response["error"] = error - elif msg_action == "start_test": - # FIXME/XXX: This task should be asynchronous! - success, error = self._start(msg["time"]) - response["success"] = success - if not success: - response["error"] = error - elif msg_action == "get_status": - response["success"] = True - response["action"] = "push" - response["status"] = self.status - else: - logger.warning("WebSocket: " + - "unknown action: {0}".format(msg_action)) - response["success"] = False - response["error"] = "unknown action: {0}".format(msg_action) - except KeyError: - # Received message has wrong syntax/format - response = {"success": False, - "type": msg_type, - "error": "no action specified"} - # - logger.debug("WebSocket: response: {0}".format(response)) - return response - - # FIXME/XXX: - # * How to call this task asynchronously ?? - def _start_test(self, *args, **kwargs): - """ - Start the task by submitting it to the executor - - Returns - ------- - success : bool - Whether success without any errors - error : str - Detail of the error if not succeed - - """ - if self.onetask_only and self.status["running"]: - logger.warning("Task already running, and only one task allowed") - success = False - error = "already running and only one task allowed" - else: - logger.info("Start the task on the executor ...") - self.status["running"] = True - self.status["finished"] = False - # Also push the logging messages to the client - self._add_wsloghandler() - future = self.executor.submit(self._task_test, *args, **kwargs) - self.io_loop.add_future(future, self._task_callback) - success, error = future.result() - return (success, error) - - # FIXME/XXX: - # * How to call this task asynchronously ?? - def _start(self, *args, **kwargs): - """ - Start the task by submitting it to the executor - - Returns - ------- - success : bool - Whether success without any errors - error : str - Detail of the error if not succeed - - """ - if self.onetask_only and self.status["running"]: - logger.warning("Task already running, and only one task allowed") - success = False - error = "already running and only one task allowed" - else: - logger.info("Start the task on the executor ...") - self.status["running"] = True - self.status["finished"] = False - # Also push the logging messages to the client - self._add_wsloghandler() - future = self.executor.submit(self._task, *args, **kwargs) - self.io_loop.add_future(future, self._task_callback) - success, error = future.result() - return (success, error) - - def _response_future(self, future): - """ - Callback function which will be called when the caller finishes - in order to response the results to the client. - """ - response = {"type": "console", "action": "future"} - success, error = future.result() - response["success"] = success - if not success: - response["error"] = error - logger.debug("WebSocket: future response: {0}".format(response)) - msg_response = json.dumps(response) - self.websocket.write_message(msg_response) - - def _add_wsloghandler(self): - """Add the ``self.wsloghandler`` to the logging handlers""" - root_logger = logging.getLogger() - root_logger.addHandler(self.wsloghandler) - logger.info("Added the WebSocket logging handler") - - def _remove_wsloghandler(self): - """Remove the ``self.wsloghandler`` from the logging handlers""" - root_logger = logging.getLogger() - root_logger.removeHandler(self.wsloghandler) - logger.info("Removed the WebSocket logging handler") - - def _task_callback(self, future): - """Callback function executed when the task finishes""" - logger.info("Console task finished! Callback ...") - self.status["running"] = False - self.status["finished"] = True - self._remove_wsloghandler() - # - response = {"type": "console", "action": "push"} - response["success"] = True - response["status"] = self.status - logger.debug("WebSocket: future response: {0}".format(response)) - msg_response = json.dumps(response) - self.websocket.write_message(msg_response) - - def _task_test(self, *args, **kwargs): - """ - The task this console to manage. - - Returns - ------- - success : bool - Whether success without any errors - error : str - Detail of the error if not succeed - - NOTE - ---- - The task is synchronous and may be computationally intensive - (i.e., CPU-bound rather than IO/event-bound), therefore, - threads (or processes) are required to make it non-blocking - (i.e., asynchronous). - - Credit: https://stackoverflow.com/a/32164711/4856091 - """ - import time - logger.info("console task: START") - for i in range(args[0]): - logger.info("console task: slept {0} seconds ...".format(i)) - time.sleep(1) - logger.info("console task: DONE!") - return (True, None) - - def _task(self, *args, **kwargs): - """ - The task this console to manage. - Perform the foregrounds simulations. - - Returns - ------- - success : bool - Whether success without any errors - error : str - Detail of the error if not succeed - - NOTE - ---- - The task is synchronous and may be computationally intensive - (i.e., CPU-bound rather than IO/event-bound), therefore, - threads (or processes) are required to make it non-blocking - (i.e., asynchronous). - - Credit: https://stackoverflow.com/a/32164711/4856091 - """ - logger.info("Preparing to start foregrounds simulations ...") - logger.info("Importing modules + Numba JIT, waiting ...") - - from ..foregrounds import Foregrounds - - # FIXME: This is a hack - configs = self.websocket.configs - logger.info("Checking the configurations ...") - configs.check_all() - - fg = Foregrounds(configs) - fg.preprocess() - fg.simulate() - fg.postprocess() - - logger.info("Foregrounds simulations DONE!") - - # NOTE: Should always return a tuple of (success, error) - return (True, None) diff --git a/fg21sim/webui/handlers/__init__.py b/fg21sim/webui/handlers/__init__.py index f97ef07..0c01ea7 100644 --- a/fg21sim/webui/handlers/__init__.py +++ b/fg21sim/webui/handlers/__init__.py @@ -3,3 +3,4 @@ from .index import IndexHandler from .login import LoginHandler +from .websocket import FG21simWSHandler diff --git a/fg21sim/webui/handlers/console.py b/fg21sim/webui/handlers/console.py new file mode 100644 index 0000000..fe1b500 --- /dev/null +++ b/fg21sim/webui/handlers/console.py @@ -0,0 +1,275 @@ +# Copyright (c) 2016 Weitian LI +# MIT license + +""" +Handle the "console" type of messages from the client. +""" + +import json +import logging +from concurrent.futures import ThreadPoolExecutor + +import tornado.ioloop +import tornado.gen + +from .log import WebSocketLogHandler + + +logger = logging.getLogger(__name__) + + +class ConsoleHandler: + """ + Handle the "console" type of messages from the client. + + XXX/TODO: + * How to kill the submitted task? (force kill thread?) + + Parameters + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance. + The ``WebSocketLogHandler`` requires this to push logging messages + to the client. + max_workers : int, optional + Maximum number of workers/threads for the execution pool + onetask_only : bool, optional + Whether to allow only one task running at the same time? + + Attributes + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance, which is used by + the ``WebSocketLogHandler`` to push logging messages to the client. + wsloghandler : `~WebSocketLogHandler` + Push logging messages to the client through WebSocket. + executor : `~concurrent.futures.ThreadPoolExecutor` + Where to submit the synchronous task and make it perform + asynchronously using threads. + io_loop : `~tornado.ioloop.IOLoop` + Used to communicate with the main thread (e.g., callback) from the + submitted task, which is executed on a different thread. + onetask_only : bool + Whether to allow only one task running at the same time? + status : dict + Whether the task is running and/or finished? + There may be 4 possible status: + 1. running=False, finished=False: not started + 2. running=False, finished=True: finished + 3. running=True, finished=False: running + 4. running=True, finished=True: ?? error ?? + """ + def __init__(self, websocket, max_workers=3, onetask_only=False): + self.websocket = websocket + self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") + self.executor = ThreadPoolExecutor(max_workers=1) + # NOTE: + # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we + # will need to communicate with the main thread from another thread. + self.io_loop = tornado.ioloop.IOLoop.instance() + self.onetask_only = onetask_only + self.status = {"running": False, "finished": False} + + def handle_message(self, msg): + try: + msg_type = msg["type"] + msg_action = msg["action"] + response = {"type": msg_type, "action": msg_action} + logger.info("WebSocket: handle message: " + + "type: {0}, action: {1}".format(msg_type, msg_action)) + if msg_action == "start": + # FIXME/XXX: This task should be asynchronous! + success, error = self._start() + response["success"] = success + if not success: + response["error"] = error + elif msg_action == "start_test": + # FIXME/XXX: This task should be asynchronous! + success, error = self._start(msg["time"]) + response["success"] = success + if not success: + response["error"] = error + elif msg_action == "get_status": + response["success"] = True + response["action"] = "push" + response["status"] = self.status + else: + logger.warning("WebSocket: " + + "unknown action: {0}".format(msg_action)) + response["success"] = False + response["error"] = "unknown action: {0}".format(msg_action) + except KeyError: + # Received message has wrong syntax/format + response = {"success": False, + "type": msg_type, + "error": "no action specified"} + # + logger.debug("WebSocket: response: {0}".format(response)) + return response + + # FIXME/XXX: + # * How to call this task asynchronously ?? + def _start_test(self, *args, **kwargs): + """ + Start the task by submitting it to the executor + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + """ + if self.onetask_only and self.status["running"]: + logger.warning("Task already running, and only one task allowed") + success = False + error = "already running and only one task allowed" + else: + logger.info("Start the task on the executor ...") + self.status["running"] = True + self.status["finished"] = False + # Also push the logging messages to the client + self._add_wsloghandler() + future = self.executor.submit(self._task_test, *args, **kwargs) + self.io_loop.add_future(future, self._task_callback) + success, error = future.result() + return (success, error) + + # FIXME/XXX: + # * How to call this task asynchronously ?? + def _start(self, *args, **kwargs): + """ + Start the task by submitting it to the executor + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + """ + if self.onetask_only and self.status["running"]: + logger.warning("Task already running, and only one task allowed") + success = False + error = "already running and only one task allowed" + else: + logger.info("Start the task on the executor ...") + self.status["running"] = True + self.status["finished"] = False + # Also push the logging messages to the client + self._add_wsloghandler() + future = self.executor.submit(self._task, *args, **kwargs) + self.io_loop.add_future(future, self._task_callback) + success, error = future.result() + return (success, error) + + def _response_future(self, future): + """ + Callback function which will be called when the caller finishes + in order to response the results to the client. + """ + response = {"type": "console", "action": "future"} + success, error = future.result() + response["success"] = success + if not success: + response["error"] = error + logger.debug("WebSocket: future response: {0}".format(response)) + msg_response = json.dumps(response) + self.websocket.write_message(msg_response) + + def _add_wsloghandler(self): + """Add the ``self.wsloghandler`` to the logging handlers""" + root_logger = logging.getLogger() + root_logger.addHandler(self.wsloghandler) + logger.info("Added the WebSocket logging handler") + + def _remove_wsloghandler(self): + """Remove the ``self.wsloghandler`` from the logging handlers""" + root_logger = logging.getLogger() + root_logger.removeHandler(self.wsloghandler) + logger.info("Removed the WebSocket logging handler") + + def _task_callback(self, future): + """Callback function executed when the task finishes""" + logger.info("Console task finished! Callback ...") + self.status["running"] = False + self.status["finished"] = True + self._remove_wsloghandler() + # + response = {"type": "console", "action": "push"} + response["success"] = True + response["status"] = self.status + logger.debug("WebSocket: future response: {0}".format(response)) + msg_response = json.dumps(response) + self.websocket.write_message(msg_response) + + def _task_test(self, *args, **kwargs): + """ + The task this console to manage. + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + NOTE + ---- + The task is synchronous and may be computationally intensive + (i.e., CPU-bound rather than IO/event-bound), therefore, + threads (or processes) are required to make it non-blocking + (i.e., asynchronous). + + Credit: https://stackoverflow.com/a/32164711/4856091 + """ + import time + logger.info("console task: START") + for i in range(args[0]): + logger.info("console task: slept {0} seconds ...".format(i)) + time.sleep(1) + logger.info("console task: DONE!") + return (True, None) + + def _task(self, *args, **kwargs): + """ + The task this console to manage. + Perform the foregrounds simulations. + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + NOTE + ---- + The task is synchronous and may be computationally intensive + (i.e., CPU-bound rather than IO/event-bound), therefore, + threads (or processes) are required to make it non-blocking + (i.e., asynchronous). + + Credit: https://stackoverflow.com/a/32164711/4856091 + """ + logger.info("Preparing to start foregrounds simulations ...") + logger.info("Importing modules + Numba JIT, waiting ...") + + from ..foregrounds import Foregrounds + + # FIXME: This is a hack + configs = self.websocket.configs + logger.info("Checking the configurations ...") + configs.check_all() + + fg = Foregrounds(configs) + fg.preprocess() + fg.simulate() + fg.postprocess() + + logger.info("Foregrounds simulations DONE!") + + # NOTE: Should always return a tuple of (success, error) + return (True, None) diff --git a/fg21sim/webui/handlers/log.py b/fg21sim/webui/handlers/log.py new file mode 100644 index 0000000..0b457b5 --- /dev/null +++ b/fg21sim/webui/handlers/log.py @@ -0,0 +1,61 @@ +# Copyright (c) 2016 Weitian LI +# MIT license + +""" +Custom logging handlers + +WebSocketLogHandler : + Send logging messages to the WebSocket as JSON-encoded string. +""" + + +import logging +import json + + +class WebSocketLogHandler(logging.Handler): + """ + Send logging messages to the WebSocket as JSON-encoded string. + + Parameters + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance, which has + the ``write_message()`` method that will be used to send the + logging messages. + msg_type : str, optional + Set the type of the sent back message, for easier processing + by the client. + + NOTE + ---- + The message sent through the WebSocket is a JSON-encoded string + from a dictionary, e.g., + ``{"type": self.msg_type, + "action": "log", + "levelname": record.levelname, + "levelno": record.levelno, + "name": record.name, + "asctime": record.asctime, + "message": }`` + """ + def __init__(self, websocket, msg_type=None): + super().__init__() + self.websocket = websocket + self.msg_type = msg_type + + def emit(self, record): + try: + message = self.format(record) + msg = json.dumps({ + "type": self.msg_type, + "action": "log", + "levelname": record.levelname, + "levelno": record.levelno, + "name": record.name, + "asctime": record.asctime, + "message": message, + }) + self.websocket.write_message(msg) + except Exception: + self.handleError(record) diff --git a/fg21sim/webui/handlers/websocket.py b/fg21sim/webui/handlers/websocket.py new file mode 100644 index 0000000..7db953c --- /dev/null +++ b/fg21sim/webui/handlers/websocket.py @@ -0,0 +1,436 @@ +# Copyright (c) 2016 Weitian LI +# MIT license + +""" +Communicate with the "fg21sim" simulation program through the Web UI using +the WebSocket_ protocol, which provides full-duplex communication channels +over a single TCP connection. + +.. _WebSocket: https://en.wikipedia.org/wiki/WebSocket + + +References +---------- +- Tornado WebSocket: + http://www.tornadoweb.org/en/stable/websocket.html +- Can I Use: WebSocket: + http://caniuse.com/#feat=websockets +""" + +import os +import json +import logging + +import tornado.websocket +from tornado.options import options + +from .console import ConsoleHandler +from ..utils import get_host_ip, ip_in_network +from ...errors import ConfigError + + +logger = logging.getLogger(__name__) + + +class FG21simWSHandler(tornado.websocket.WebSocketHandler): + """ + WebSocket for bi-directional communication between the Web UI and + the server, which can deal with the configurations and execute the + simulation task. + + Generally, WebSocket send and receive data as *string*. Therefore, + the more complex data are stringified as JSON string before sending, + which will be parsed after receive. + + Each message (as a JSON object or Python dictionary) has a ``type`` + field which will be used to determine the following action to take. + + Attributes + ---------- + name : str + Name to distinguish this WebSocket handle. + from_localhost : bool + Set to ``True`` if the access is from the localhost, + otherwise ``False``. + configs : `~ConfigManager` + A ``ConfigManager`` instance, for configuration manipulations when + communicating with the Web UI. + """ + name = "fg21sim" + from_localhost = None + + def check_origin(self, origin): + """Check the origin of the WebSocket access. + + Attributes + ---------- + from_localhost : bool + Set to ``True`` if the access is from the localhost, + otherwise ``False``. + + NOTE + ---- + Currently, only allow access from the ``localhost`` + (i.e., 127.0.0.1) and local LAN. + """ + self.from_localhost = False + logger.info("WebSocket: {0}: origin: {1}".format(self.name, origin)) + ip = get_host_ip(url=origin) + network = options.hosts_allowed + if ip == "127.0.0.1": + self.from_localhost = True + allow = True + logger.info("WebSocket: %s: origin is localhost" % self.name) + elif network.upper() == "ANY": + # Any hosts are allowed + allow = True + logger.warning("WebSocket: %s: any hosts are allowed" % self.name) + elif ip_in_network(ip, network): + allow = True + logger.info("WebSocket: %s: " % self.name + + "client is in the allowed network: %s" % network) + else: + allow = False + logger.error("WebSocket: %s: " % self.name + + "client is NOT in the allowed network: %s" % network) + return allow + + def open(self): + """Invoked when a new WebSocket is opened by the client.""" + # FIXME: + # * better to move to the `Application` class ?? + # * or create a ``ConfigsHandler`` similar to the ``ConsoleHandler`` + self.configs = self.application.configmanager + self.console_handler = ConsoleHandler(websocket=self) + # + logger.info("WebSocket: {0}: opened".format(self.name)) + logger.info("Allowed hosts: {0}".format(options.hosts_allowed)) + + def on_close(self): + """Invoked when a new WebSocket is closed by the client.""" + code, reason = None, None + if hasattr(self, "close_code"): + code = self.close_code + if hasattr(self, "close_reason"): + reason = self.close_reason + logger.info("WebSocket: {0}: closed by client: {1}, {2}".format( + self.name, code, reason)) + + # FIXME/XXX: + # * How to be non-blocking ?? + # NOTE: WebSocket.on_message: may NOT be a coroutine at the moment (v4.3) + # References: + # [1] https://stackoverflow.com/a/35543856/4856091 + # [2] https://stackoverflow.com/a/33724486/4856091 + def on_message(self, message): + """Handle incoming messages and dispatch task according to the + message type. + + NOTE + ---- + The received message (parsed to a Python dictionary) has a ``type`` + item which will be used to determine the following action to take. + + Currently supported message types are: + ``configs``: + Request or set the configurations + ``console``: + Control the simulation tasks, or request logging messages + ``results``: + Request the simulation results + + The sent message also has a ``type`` item of same value, which the + client can be used to figure out the proper actions. + There is a ``success`` item which indicates the status of the + requested operation, and an ``error`` recording the error message + if ``success=False``. + """ + logger.debug("WebSocket: %s: received: %s" % (self.name, message)) + try: + msg = json.loads(message) + msg_type = msg["type"] + except json.JSONDecodeError: + logger.warning("WebSocket: {0}: ".format(self.name) + + "message is not a valid JSON string") + response = {"success": False, + "type": None, + "error": "message is not a valid JSON string"} + except (KeyError, TypeError): + logger.warning("WebSocket: %s: skip invalid message" % self.name) + response = {"success": False, + "type": None, + "error": "type is missing"} + else: + # Check the message type and dispatch task + if msg_type == "configs": + # Request or set the configurations + response = self._handle_configs(msg) + elif msg_type == "console": + # Control the simulation tasks, or request logging messages + # FIXME/XXX: + # * How to make this asynchronously ?? + response = self.console_handler.handle_message(msg) + elif msg_type == "results": + # Request the simulation results + response = self._handle_results(msg) + else: + # Message of unknown type + logger.warning("WebSocket: {0}: ".format(self.name) + + "unknown message type: {0}".format(msg_type)) + response = {"success": False, + "type": msg_type, + "error": "unknown message type %s" % msg_type} + # + msg_response = json.dumps(response) + self.write_message(msg_response) + + def _handle_configs(self, msg): + """Handle the message of type "configs", which request to get or + set some configurations by the client. + + TODO: + * improve the description ... + * split these handling functions into a separate class in a module + + Parameters + ---------- + msg : dict + A dictionary parsed from the incoming JSON message, which + generally has the following syntax: + ``{"type": "configs", "action": , "data": }`` + where the ```` is ``set`` or ``get``, and the ```` + is a list of config keys or a dict of config key-value pairs. + + Returns + ------- + response : dict + A dictionary parsed from the incoming JSON message, which + generally has the following syntax: + ``{"type": "configs", "action": , + "data": , "errors": }`` + where the ```` is the same as input, the ```` is + a list of config keys or a dict of config key-value pairs, and + ```` contains the error message for the invalid config + values. + """ + try: + msg_type = msg["type"] + msg_action = msg["action"] + response = {"type": msg_type, "action": msg_action} + logger.info("WebSocket: {0}: handle message: ".format(self.name) + + "type: {0}, action: {1}".format(msg_type, msg_action)) + if msg_action == "get": + # Get the values of the specified options + try: + data, errors = self._get_configs(keys=msg["keys"]) + response["success"] = True + response["data"] = data + response["errors"] = errors + except KeyError: + response["success"] = False + response["error"] = "'keys' is missing" + elif msg_action == "set": + # Set the values of the specified options + try: + errors = self._set_configs(data=msg["data"]) + response["success"] = True + response["data"] = {} # be more consistent + response["errors"] = errors + except KeyError: + response["success"] = False + response["error"] = "'data' is missing" + elif msg_action == "reset": + # Reset the configurations to the defaults + self._reset_configs() + response["success"] = True + elif msg_action == "load": + # Load the supplied user configuration file + try: + success, error = self._load_configs(msg["userconfig"]) + response["success"] = success + if not success: + response["error"] = error + except KeyError: + response["success"] = False + response["error"] = "'userconfig' is missing" + elif msg_action == "save": + # Save current configurations to file + try: + success, error = self._save_configs(msg["outfile"], + msg["clobber"]) + response["success"] = success + if not success: + response["error"] = error + except KeyError: + response["success"] = False + response["error"] = "'outfile' or 'clobber' is missing" + else: + logger.warning("WebSocket: {0}: ".format(self.name) + + "unknown action: {0}".format(msg_action)) + response["success"] = False + response["error"] = "unknown action: {0}".format(msg_action) + except KeyError: + # Received message has wrong syntax/format + response = {"success": False, + "type": msg_type, + "error": "no action specified"} + # + logger.debug("WebSocket: {0}: ".format(self.name) + + "response: {0}".format(response)) + return response + + def _get_configs(self, keys=None): + """Get the values of the config options specified by the given keys. + + Parameters + ---------- + keys : list[str], optional + A list of keys specifying the config options whose values will + be obtained. + If ``keys=None``, then all the configurations values are dumped. + + Returns + ------- + data : dict + A dictionary with keys the same as the input keys, and values + the corresponding config option values. + errors : dict + When error occurs (e.g., invalid key), then the specific errors + with details are stored in this dictionary. + + NOTE + ---- + Do not forget the ``userconfig`` option. + """ + if keys is None: + # Dump all the configurations + data = self.configs.dump(flatten=True) + data["userconfig"] = self.configs.userconfig + errors = {} + else: + data = {} + errors = {} + for key in keys: + if key == "userconfig": + data["userconfig"] = self.configs.userconfig + else: + try: + data[key] = self.configs.getn(key) + except KeyError as e: + errors[key] = str(e) + # + return (data, errors) + + def _set_configs(self, data): + """Set the values of the config options specified by the given keys + to the corresponding supplied data. + + NOTE + ---- + The ``userconfig`` needs special handle. + The ``workdir`` and ``configfile`` options should be ignored. + + Parameters + ---------- + data : dict + A dictionary of key-value pairs, with keys specifying the config + options whose value will be changed, and values the new values + to which config options will be set. + NOTE: + If want to set the ``userconfig`` option, an *absolute path* + must be provided. + + Returns + ------- + errors : dict + When error occurs (e.g., invalid key, invalid values), then the + specific errors with details are stored in this dictionary. + """ + errors = {} + for key, value in data.items(): + if key in ["workdir", "configfile"]: + # Ignore "workdir" and "configfile" + continue + elif key == "userconfig": + if os.path.isabs(os.path.expanduser(value)): + self.configs.userconfig = value + else: + errors[key] = "Not an absolute path" + else: + try: + self.configs.setn(key, value) + except KeyError as e: + errors[key] = str(e) + # NOTE: + # Check the whole configurations after all provided options are + # updated, and merge the validation errors. + __, cherr = self.configs.check_all(raise_exception=False) + errors.update(cherr) + return errors + + def _reset_configs(self): + """Reset the configurations to the defaults.""" + self.configs.reset() + + def _load_configs(self, userconfig): + """Load configurations from the provided user configuration file. + + Parameters + ---------- + userconfig: str + The filepath to the user configuration file, which must be + an *absolute path*. + + Returns + ------- + success : bool + ``True`` if the operation succeeded, otherwise, ``False``. + error : str + If failed, this ``error`` saves the details, otherwise, ``None``. + """ + success = False + error = None + if os.path.isabs(os.path.expanduser(userconfig)): + try: + self.configs.read_userconfig(userconfig) + success = True + except ConfigError as e: + error = str(e) + else: + error = "Not an absolute path" + return (success, error) + + def _save_configs(self, outfile, clobber=False): + """Save current configurations to file. + + Parameters + ---------- + outfile: str + The filepath to the output configuration file, which must be + an *absolute path*. + clobber : bool, optional + Whether overwrite the output file if already exists? + + Returns + ------- + success : bool + ``True`` if the operation succeeded, otherwise, ``False``. + error : str + If failed, this ``error`` saves the details, otherwise, ``None``. + """ + success = False + error = None + try: + self.configs.save(outfile, clobber=clobber) + success = True + except (ValueError, OSError) as e: + error = str(e) + return (success, error) + + def _handle_results(self, msg): + # Got a message of supported types + msg_type = msg["type"] + logger.info("WebSocket: {0}: ".format(self.name) + + "handle message of type: {0}".format(msg_type)) + response = {"success": True, "type": msg_type} + return response diff --git a/fg21sim/webui/loghandler.py b/fg21sim/webui/loghandler.py deleted file mode 100644 index 0b457b5..0000000 --- a/fg21sim/webui/loghandler.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright (c) 2016 Weitian LI -# MIT license - -""" -Custom logging handlers - -WebSocketLogHandler : - Send logging messages to the WebSocket as JSON-encoded string. -""" - - -import logging -import json - - -class WebSocketLogHandler(logging.Handler): - """ - Send logging messages to the WebSocket as JSON-encoded string. - - Parameters - ---------- - websocket : `~tornado.websocket.WebSocketHandler` - An `~tornado.websocket.WebSocketHandler` instance, which has - the ``write_message()`` method that will be used to send the - logging messages. - msg_type : str, optional - Set the type of the sent back message, for easier processing - by the client. - - NOTE - ---- - The message sent through the WebSocket is a JSON-encoded string - from a dictionary, e.g., - ``{"type": self.msg_type, - "action": "log", - "levelname": record.levelname, - "levelno": record.levelno, - "name": record.name, - "asctime": record.asctime, - "message": }`` - """ - def __init__(self, websocket, msg_type=None): - super().__init__() - self.websocket = websocket - self.msg_type = msg_type - - def emit(self, record): - try: - message = self.format(record) - msg = json.dumps({ - "type": self.msg_type, - "action": "log", - "levelname": record.levelname, - "levelno": record.levelno, - "name": record.name, - "asctime": record.asctime, - "message": message, - }) - self.websocket.write_message(msg) - except Exception: - self.handleError(record) diff --git a/fg21sim/webui/websocket.py b/fg21sim/webui/websocket.py deleted file mode 100644 index 00bda03..0000000 --- a/fg21sim/webui/websocket.py +++ /dev/null @@ -1,436 +0,0 @@ -# Copyright (c) 2016 Weitian LI -# MIT license - -""" -Communicate with the "fg21sim" simulation program through the Web UI using -the WebSocket_ protocol, which provides full-duplex communication channels -over a single TCP connection. - -.. _WebSocket: https://en.wikipedia.org/wiki/WebSocket - - -References ----------- -- Tornado WebSocket: - http://www.tornadoweb.org/en/stable/websocket.html -- Can I Use: WebSocket: - http://caniuse.com/#feat=websockets -""" - -import os -import json -import logging - -import tornado.websocket -from tornado.options import options - -from .consolehandler import ConsoleHandler -from .utils import get_host_ip, ip_in_network -from ..errors import ConfigError - - -logger = logging.getLogger(__name__) - - -class FG21simWSHandler(tornado.websocket.WebSocketHandler): - """ - WebSocket for bi-directional communication between the Web UI and - the server, which can deal with the configurations and execute the - simulation task. - - Generally, WebSocket send and receive data as *string*. Therefore, - the more complex data are stringified as JSON string before sending, - which will be parsed after receive. - - Each message (as a JSON object or Python dictionary) has a ``type`` - field which will be used to determine the following action to take. - - Attributes - ---------- - name : str - Name to distinguish this WebSocket handle. - from_localhost : bool - Set to ``True`` if the access is from the localhost, - otherwise ``False``. - configs : `~ConfigManager` - A ``ConfigManager`` instance, for configuration manipulations when - communicating with the Web UI. - """ - name = "fg21sim" - from_localhost = None - - def check_origin(self, origin): - """Check the origin of the WebSocket access. - - Attributes - ---------- - from_localhost : bool - Set to ``True`` if the access is from the localhost, - otherwise ``False``. - - NOTE - ---- - Currently, only allow access from the ``localhost`` - (i.e., 127.0.0.1) and local LAN. - """ - self.from_localhost = False - logger.info("WebSocket: {0}: origin: {1}".format(self.name, origin)) - ip = get_host_ip(url=origin) - network = options.hosts_allowed - if ip == "127.0.0.1": - self.from_localhost = True - allow = True - logger.info("WebSocket: %s: origin is localhost" % self.name) - elif network.upper() == "ANY": - # Any hosts are allowed - allow = True - logger.warning("WebSocket: %s: any hosts are allowed" % self.name) - elif ip_in_network(ip, network): - allow = True - logger.info("WebSocket: %s: " % self.name + - "client is in the allowed network: %s" % network) - else: - allow = False - logger.error("WebSocket: %s: " % self.name + - "client is NOT in the allowed network: %s" % network) - return allow - - def open(self): - """Invoked when a new WebSocket is opened by the client.""" - # FIXME: - # * better to move to the `Application` class ?? - # * or create a ``ConfigsHandler`` similar to the ``ConsoleHandler`` - self.configs = self.application.configmanager - self.console_handler = ConsoleHandler(websocket=self) - # - logger.info("WebSocket: {0}: opened".format(self.name)) - logger.info("Allowed hosts: {0}".format(options.hosts_allowed)) - - def on_close(self): - """Invoked when a new WebSocket is closed by the client.""" - code, reason = None, None - if hasattr(self, "close_code"): - code = self.close_code - if hasattr(self, "close_reason"): - reason = self.close_reason - logger.info("WebSocket: {0}: closed by client: {1}, {2}".format( - self.name, code, reason)) - - # FIXME/XXX: - # * How to be non-blocking ?? - # NOTE: WebSocket.on_message: may NOT be a coroutine at the moment (v4.3) - # References: - # [1] https://stackoverflow.com/a/35543856/4856091 - # [2] https://stackoverflow.com/a/33724486/4856091 - def on_message(self, message): - """Handle incoming messages and dispatch task according to the - message type. - - NOTE - ---- - The received message (parsed to a Python dictionary) has a ``type`` - item which will be used to determine the following action to take. - - Currently supported message types are: - ``configs``: - Request or set the configurations - ``console``: - Control the simulation tasks, or request logging messages - ``results``: - Request the simulation results - - The sent message also has a ``type`` item of same value, which the - client can be used to figure out the proper actions. - There is a ``success`` item which indicates the status of the - requested operation, and an ``error`` recording the error message - if ``success=False``. - """ - logger.debug("WebSocket: %s: received: %s" % (self.name, message)) - try: - msg = json.loads(message) - msg_type = msg["type"] - except json.JSONDecodeError: - logger.warning("WebSocket: {0}: ".format(self.name) + - "message is not a valid JSON string") - response = {"success": False, - "type": None, - "error": "message is not a valid JSON string"} - except (KeyError, TypeError): - logger.warning("WebSocket: %s: skip invalid message" % self.name) - response = {"success": False, - "type": None, - "error": "type is missing"} - else: - # Check the message type and dispatch task - if msg_type == "configs": - # Request or set the configurations - response = self._handle_configs(msg) - elif msg_type == "console": - # Control the simulation tasks, or request logging messages - # FIXME/XXX: - # * How to make this asynchronously ?? - response = self.console_handler.handle_message(msg) - elif msg_type == "results": - # Request the simulation results - response = self._handle_results(msg) - else: - # Message of unknown type - logger.warning("WebSocket: {0}: ".format(self.name) + - "unknown message type: {0}".format(msg_type)) - response = {"success": False, - "type": msg_type, - "error": "unknown message type %s" % msg_type} - # - msg_response = json.dumps(response) - self.write_message(msg_response) - - def _handle_configs(self, msg): - """Handle the message of type "configs", which request to get or - set some configurations by the client. - - TODO: - * improve the description ... - * split these handling functions into a separate class in a module - - Parameters - ---------- - msg : dict - A dictionary parsed from the incoming JSON message, which - generally has the following syntax: - ``{"type": "configs", "action": , "data": }`` - where the ```` is ``set`` or ``get``, and the ```` - is a list of config keys or a dict of config key-value pairs. - - Returns - ------- - response : dict - A dictionary parsed from the incoming JSON message, which - generally has the following syntax: - ``{"type": "configs", "action": , - "data": , "errors": }`` - where the ```` is the same as input, the ```` is - a list of config keys or a dict of config key-value pairs, and - ```` contains the error message for the invalid config - values. - """ - try: - msg_type = msg["type"] - msg_action = msg["action"] - response = {"type": msg_type, "action": msg_action} - logger.info("WebSocket: {0}: handle message: ".format(self.name) + - "type: {0}, action: {1}".format(msg_type, msg_action)) - if msg_action == "get": - # Get the values of the specified options - try: - data, errors = self._get_configs(keys=msg["keys"]) - response["success"] = True - response["data"] = data - response["errors"] = errors - except KeyError: - response["success"] = False - response["error"] = "'keys' is missing" - elif msg_action == "set": - # Set the values of the specified options - try: - errors = self._set_configs(data=msg["data"]) - response["success"] = True - response["data"] = {} # be more consistent - response["errors"] = errors - except KeyError: - response["success"] = False - response["error"] = "'data' is missing" - elif msg_action == "reset": - # Reset the configurations to the defaults - self._reset_configs() - response["success"] = True - elif msg_action == "load": - # Load the supplied user configuration file - try: - success, error = self._load_configs(msg["userconfig"]) - response["success"] = success - if not success: - response["error"] = error - except KeyError: - response["success"] = False - response["error"] = "'userconfig' is missing" - elif msg_action == "save": - # Save current configurations to file - try: - success, error = self._save_configs(msg["outfile"], - msg["clobber"]) - response["success"] = success - if not success: - response["error"] = error - except KeyError: - response["success"] = False - response["error"] = "'outfile' or 'clobber' is missing" - else: - logger.warning("WebSocket: {0}: ".format(self.name) + - "unknown action: {0}".format(msg_action)) - response["success"] = False - response["error"] = "unknown action: {0}".format(msg_action) - except KeyError: - # Received message has wrong syntax/format - response = {"success": False, - "type": msg_type, - "error": "no action specified"} - # - logger.debug("WebSocket: {0}: ".format(self.name) + - "response: {0}".format(response)) - return response - - def _get_configs(self, keys=None): - """Get the values of the config options specified by the given keys. - - Parameters - ---------- - keys : list[str], optional - A list of keys specifying the config options whose values will - be obtained. - If ``keys=None``, then all the configurations values are dumped. - - Returns - ------- - data : dict - A dictionary with keys the same as the input keys, and values - the corresponding config option values. - errors : dict - When error occurs (e.g., invalid key), then the specific errors - with details are stored in this dictionary. - - NOTE - ---- - Do not forget the ``userconfig`` option. - """ - if keys is None: - # Dump all the configurations - data = self.configs.dump(flatten=True) - data["userconfig"] = self.configs.userconfig - errors = {} - else: - data = {} - errors = {} - for key in keys: - if key == "userconfig": - data["userconfig"] = self.configs.userconfig - else: - try: - data[key] = self.configs.getn(key) - except KeyError as e: - errors[key] = str(e) - # - return (data, errors) - - def _set_configs(self, data): - """Set the values of the config options specified by the given keys - to the corresponding supplied data. - - NOTE - ---- - The ``userconfig`` needs special handle. - The ``workdir`` and ``configfile`` options should be ignored. - - Parameters - ---------- - data : dict - A dictionary of key-value pairs, with keys specifying the config - options whose value will be changed, and values the new values - to which config options will be set. - NOTE: - If want to set the ``userconfig`` option, an *absolute path* - must be provided. - - Returns - ------- - errors : dict - When error occurs (e.g., invalid key, invalid values), then the - specific errors with details are stored in this dictionary. - """ - errors = {} - for key, value in data.items(): - if key in ["workdir", "configfile"]: - # Ignore "workdir" and "configfile" - continue - elif key == "userconfig": - if os.path.isabs(os.path.expanduser(value)): - self.configs.userconfig = value - else: - errors[key] = "Not an absolute path" - else: - try: - self.configs.setn(key, value) - except KeyError as e: - errors[key] = str(e) - # NOTE: - # Check the whole configurations after all provided options are - # updated, and merge the validation errors. - __, cherr = self.configs.check_all(raise_exception=False) - errors.update(cherr) - return errors - - def _reset_configs(self): - """Reset the configurations to the defaults.""" - self.configs.reset() - - def _load_configs(self, userconfig): - """Load configurations from the provided user configuration file. - - Parameters - ---------- - userconfig: str - The filepath to the user configuration file, which must be - an *absolute path*. - - Returns - ------- - success : bool - ``True`` if the operation succeeded, otherwise, ``False``. - error : str - If failed, this ``error`` saves the details, otherwise, ``None``. - """ - success = False - error = None - if os.path.isabs(os.path.expanduser(userconfig)): - try: - self.configs.read_userconfig(userconfig) - success = True - except ConfigError as e: - error = str(e) - else: - error = "Not an absolute path" - return (success, error) - - def _save_configs(self, outfile, clobber=False): - """Save current configurations to file. - - Parameters - ---------- - outfile: str - The filepath to the output configuration file, which must be - an *absolute path*. - clobber : bool, optional - Whether overwrite the output file if already exists? - - Returns - ------- - success : bool - ``True`` if the operation succeeded, otherwise, ``False``. - error : str - If failed, this ``error`` saves the details, otherwise, ``None``. - """ - success = False - error = None - try: - self.configs.save(outfile, clobber=clobber) - success = True - except (ValueError, OSError) as e: - error = str(e) - return (success, error) - - def _handle_results(self, msg): - # Got a message of supported types - msg_type = msg["type"] - logger.info("WebSocket: {0}: ".format(self.name) + - "handle message of type: {0}".format(msg_type)) - response = {"success": True, "type": msg_type} - return response -- cgit v1.2.2