From 0c2ab928af57178e6d7d3265709c3f1c3b654013 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Wed, 9 Nov 2016 20:22:49 +0800 Subject: webui: Add "ConsoleHandler" to handle the "console" type message XXX/FIXME: The console task will BLOCK the tornado, which should be FIXED! However, the `WebSocket.on_message` currently may NOT be a coroutine (as of Tornado v4.3), so another way should be taken to solve this problem in order to call the console task asynchronously! --- fg21sim/webui/consolehandler.py | 199 ++++++++++++++++++++++++++++++++++++++++ fg21sim/webui/websocket.py | 32 ++++--- 2 files changed, 219 insertions(+), 12 deletions(-) create mode 100644 fg21sim/webui/consolehandler.py (limited to 'fg21sim/webui') diff --git a/fg21sim/webui/consolehandler.py b/fg21sim/webui/consolehandler.py new file mode 100644 index 0000000..0184a93 --- /dev/null +++ b/fg21sim/webui/consolehandler.py @@ -0,0 +1,199 @@ +# Copyright (c) 2016 Weitian LI +# MIT license + +""" +Handle the "console" type of messages from the client. +""" + +import json +import logging +from concurrent.futures import ThreadPoolExecutor + +import tornado.ioloop +import tornado.gen + +from .loghandler import WebSocketLogHandler + + +logger = logging.getLogger(__name__) + + +class ConsoleHandler: + """ + Handle the "console" type of messages from the client. + + XXX/TODO: + * How to kill the submitted task? (force kill thread?) + + Parameters + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance. + The ``WebSocketLogHandler`` requires this to push logging messages + to the client. + max_workers : int, optional + Maximum number of workers/threads for the execution pool + onetask_only : bool, optional + Whether to allow only one task running at the same time? + + Attributes + ---------- + websocket : `~tornado.websocket.WebSocketHandler` + An `~tornado.websocket.WebSocketHandler` instance, which is used by + the ``WebSocketLogHandler`` to push logging messages to the client. + wsloghandler : `~WebSocketLogHandler` + Push logging messages to the client through WebSocket. + executor : `~concurrent.futures.ThreadPoolExecutor` + Where to submit the synchronous task and make it perform + asynchronously using threads. + io_loop : `~tornado.ioloop.IOLoop` + Used to communicate with the main thread (e.g., callback) from the + submitted task, which is executed on a different thread. + onetask_only : bool + Whether to allow only one task running at the same time? + status : dict + Whether the task is running and/or finished? + There may be 4 possible status: + 1. running=False, finished=False: not started + 2. running=False, finished=True: finished + 3. running=True, finished=False: running + 4. running=True, finished=True: ?? error ?? + """ + def __init__(self, websocket, max_workers=3, onetask_only=False): + self.websocket = websocket + self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") + self.executor = ThreadPoolExecutor(max_workers=1) + # NOTE: + # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we + # will need to communicate with the main thread from another thread. + self.io_loop = tornado.ioloop.IOLoop.instance() + self.onetask_only = onetask_only + self.status = {"running": False, "finished": False} + + def handle_message(self, msg): + try: + msg_type = msg["type"] + msg_action = msg["action"] + response = {"type": msg_type, "action": msg_action} + logger.info("WebSocket: handle message: " + + "type: {0}, action: {1}".format(msg_type, msg_action)) + if msg_action == "start": + # Start the task asynchronously + future = self._start(msg["time"]) + self.io_loop.add_future(future, self._response_future) + response["success"] = True + response["status"] = "future" + elif msg_action == "get_status": + response["success"] = True + response["action"] = "push" + response["status"] = self.status + else: + logger.warning("WebSocket: " + + "unknown action: {0}".format(msg_action)) + response["success"] = False + response["error"] = "unknown action: {0}".format(msg_action) + except KeyError: + # Received message has wrong syntax/format + response = {"success": False, + "type": msg_type, + "error": "no action specified"} + # + logger.debug("WebSocket: response: {0}".format(response)) + return response + + # FIXME/XXX: + # * How to call this task asynchronously ?? + def _start(self, *args, **kwargs): + """ + Start the task by submitting it to the executor + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + """ + if self.onetask_only and self.status["running"]: + logger.warning("Task already running, and only one task allowed") + success = False + error = "already running and only one task allowed" + else: + logger.info("Start the task on the executor ...") + self.status["running"] = True + self.status["finished"] = False + # Also push the logging messages to the client + self._add_wsloghandler() + future = self.executor.submit(self._task, *args, **kwargs) + self.io_loop.add_future(future, self._task_callback) + success, error = future.result() + return (success, error) + + def _response_future(self, future): + """ + Callback function which will be called when the caller finishes + in order to response the results to the client. + """ + response = {"type": "console", "action": "future"} + success, error = future.result() + response["success"] = success + if not success: + response["error"] = error + logger.debug("WebSocket: future response: {0}".format(response)) + msg_response = json.dumps(response) + self.websocket.write_message(msg_response) + + def _add_wsloghandler(self): + """Add the ``self.wsloghandler`` to the logging handlers""" + root_logger = logging.getLogger() + root_logger.addHandler(self.wsloghandler) + logger.info("Added the WebSocket logging handler") + + def _remove_wsloghandler(self): + """Remove the ``self.wsloghandler`` from the logging handlers""" + root_logger = logging.getLogger() + root_logger.removeHandler(self.wsloghandler) + logger.info("Removed the WebSocket logging handler") + + def _task_callback(self, future): + """Callback function executed when the task finishes""" + logger.info("Console task finished! Callback ...") + self.status["running"] = False + self.status["finished"] = True + self._remove_wsloghandler() + # + response = {"type": "console", "action": "push"} + response["success"] = True + response["status"] = self.status + logger.debug("WebSocket: future response: {0}".format(response)) + msg_response = json.dumps(response) + self.websocket.write_message(msg_response) + + def _task(self, *args, **kwargs): + """ + The task this console to manage. + + Returns + ------- + success : bool + Whether success without any errors + error : str + Detail of the error if not succeed + + NOTE + ---- + The task is synchronous and may be computationally intensive + (i.e., CPU-bound rather than IO/event-bound), therefore, + threads (or processes) are required to make it non-blocking + (i.e., asynchronous). + + Credit: https://stackoverflow.com/a/32164711/4856091 + """ + import time + logger.info("console task: START") + for i in range(args[0]): + logger.info("console task: slept {0} seconds ...".format(i)) + time.sleep(1) + logger.info("console task: DONE!") + return (True, None) diff --git a/fg21sim/webui/websocket.py b/fg21sim/webui/websocket.py index fffc19c..d205004 100644 --- a/fg21sim/webui/websocket.py +++ b/fg21sim/webui/websocket.py @@ -24,9 +24,10 @@ import logging import tornado.websocket from tornado.options import options +from .consolehandler import ConsoleHandler +from .utils import get_host_ip, ip_in_network from ..configs import ConfigManager from ..errors import ConfigError -from .utils import get_host_ip, ip_in_network logger = logging.getLogger(__name__) @@ -58,7 +59,6 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler): """ name = "fg21sim" from_localhost = None - configs = ConfigManager() def check_origin(self, origin): """Check the origin of the WebSocket access. @@ -98,6 +98,12 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler): def open(self): """Invoked when a new WebSocket is opened by the client.""" + # FIXME: + # * better to move to the `Application` class ?? + # * or create a ``ConfigsHandler`` similar to the ``ConsoleHandler`` + self.configs = ConfigManager() + self.console_handler = ConsoleHandler(websocket=self) + # logger.info("WebSocket: {0}: opened".format(self.name)) logger.info("Allowed hosts: {0}".format(options.hosts_allowed)) @@ -111,6 +117,12 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler): logger.info("WebSocket: {0}: closed by client: {1}, {2}".format( self.name, code, reason)) + # FIXME/XXX: + # * How to be non-blocking ?? + # NOTE: WebSocket.on_message: may NOT be a coroutine at the moment (v4.3) + # References: + # [1] https://stackoverflow.com/a/35543856/4856091 + # [2] https://stackoverflow.com/a/33724486/4856091 def on_message(self, message): """Handle incoming messages and dispatch task according to the message type. @@ -156,7 +168,9 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler): response = self._handle_configs(msg) elif msg_type == "console": # Control the simulation tasks, or request logging messages - response = self._handle_console(msg) + # FIXME/XXX: + # * How to make this asynchronously ?? + response = self.console_handler.handle_message(msg) elif msg_type == "results": # Request the simulation results response = self._handle_results(msg) @@ -175,7 +189,9 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler): """Handle the message of type "configs", which request to get or set some configurations by the client. - TODO: improve the description ... + TODO: + * improve the description ... + * split these handling functions into a separate class in a module Parameters ---------- @@ -407,14 +423,6 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler): error = str(e) return (success, error) - def _handle_console(self, msg): - # Got a message of supported types - msg_type = msg["type"] - logger.info("WebSocket: {0}: ".format(self.name) + - "handle message of type: {0}".format(msg_type)) - response = {"success": True, "type": msg_type} - return response - def _handle_results(self, msg): # Got a message of supported types msg_type = msg["type"] -- cgit v1.2.2