aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fg21sim/webui/consolehandler.py199
-rw-r--r--fg21sim/webui/websocket.py32
2 files changed, 219 insertions, 12 deletions
diff --git a/fg21sim/webui/consolehandler.py b/fg21sim/webui/consolehandler.py
new file mode 100644
index 0000000..0184a93
--- /dev/null
+++ b/fg21sim/webui/consolehandler.py
@@ -0,0 +1,199 @@
+# Copyright (c) 2016 Weitian LI <liweitianux@live.com>
+# MIT license
+
+"""
+Handle the "console" type of messages from the client.
+"""
+
+import json
+import logging
+from concurrent.futures import ThreadPoolExecutor
+
+import tornado.ioloop
+import tornado.gen
+
+from .loghandler import WebSocketLogHandler
+
+
+logger = logging.getLogger(__name__)
+
+
+class ConsoleHandler:
+ """
+ Handle the "console" type of messages from the client.
+
+ XXX/TODO:
+ * How to kill the submitted task? (force kill thread?)
+
+ Parameters
+ ----------
+ websocket : `~tornado.websocket.WebSocketHandler`
+ An `~tornado.websocket.WebSocketHandler` instance.
+ The ``WebSocketLogHandler`` requires this to push logging messages
+ to the client.
+ max_workers : int, optional
+ Maximum number of workers/threads for the execution pool
+ onetask_only : bool, optional
+ Whether to allow only one task running at the same time?
+
+ Attributes
+ ----------
+ websocket : `~tornado.websocket.WebSocketHandler`
+ An `~tornado.websocket.WebSocketHandler` instance, which is used by
+ the ``WebSocketLogHandler`` to push logging messages to the client.
+ wsloghandler : `~WebSocketLogHandler`
+ Push logging messages to the client through WebSocket.
+ executor : `~concurrent.futures.ThreadPoolExecutor`
+ Where to submit the synchronous task and make it perform
+ asynchronously using threads.
+ io_loop : `~tornado.ioloop.IOLoop`
+ Used to communicate with the main thread (e.g., callback) from the
+ submitted task, which is executed on a different thread.
+ onetask_only : bool
+ Whether to allow only one task running at the same time?
+ status : dict
+ Whether the task is running and/or finished?
+ There may be 4 possible status:
+ 1. running=False, finished=False: not started
+ 2. running=False, finished=True: finished
+ 3. running=True, finished=False: running
+ 4. running=True, finished=True: ?? error ??
+ """
+ def __init__(self, websocket, max_workers=3, onetask_only=False):
+ self.websocket = websocket
+ self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console")
+ self.executor = ThreadPoolExecutor(max_workers=1)
+ # NOTE:
+ # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we
+ # will need to communicate with the main thread from another thread.
+ self.io_loop = tornado.ioloop.IOLoop.instance()
+ self.onetask_only = onetask_only
+ self.status = {"running": False, "finished": False}
+
+ def handle_message(self, msg):
+ try:
+ msg_type = msg["type"]
+ msg_action = msg["action"]
+ response = {"type": msg_type, "action": msg_action}
+ logger.info("WebSocket: handle message: " +
+ "type: {0}, action: {1}".format(msg_type, msg_action))
+ if msg_action == "start":
+ # Start the task asynchronously
+ future = self._start(msg["time"])
+ self.io_loop.add_future(future, self._response_future)
+ response["success"] = True
+ response["status"] = "future"
+ elif msg_action == "get_status":
+ response["success"] = True
+ response["action"] = "push"
+ response["status"] = self.status
+ else:
+ logger.warning("WebSocket: " +
+ "unknown action: {0}".format(msg_action))
+ response["success"] = False
+ response["error"] = "unknown action: {0}".format(msg_action)
+ except KeyError:
+ # Received message has wrong syntax/format
+ response = {"success": False,
+ "type": msg_type,
+ "error": "no action specified"}
+ #
+ logger.debug("WebSocket: response: {0}".format(response))
+ return response
+
+ # FIXME/XXX:
+ # * How to call this task asynchronously ??
+ def _start(self, *args, **kwargs):
+ """
+ Start the task by submitting it to the executor
+
+ Returns
+ -------
+ success : bool
+ Whether success without any errors
+ error : str
+ Detail of the error if not succeed
+
+ """
+ if self.onetask_only and self.status["running"]:
+ logger.warning("Task already running, and only one task allowed")
+ success = False
+ error = "already running and only one task allowed"
+ else:
+ logger.info("Start the task on the executor ...")
+ self.status["running"] = True
+ self.status["finished"] = False
+ # Also push the logging messages to the client
+ self._add_wsloghandler()
+ future = self.executor.submit(self._task, *args, **kwargs)
+ self.io_loop.add_future(future, self._task_callback)
+ success, error = future.result()
+ return (success, error)
+
+ def _response_future(self, future):
+ """
+ Callback function which will be called when the caller finishes
+ in order to response the results to the client.
+ """
+ response = {"type": "console", "action": "future"}
+ success, error = future.result()
+ response["success"] = success
+ if not success:
+ response["error"] = error
+ logger.debug("WebSocket: future response: {0}".format(response))
+ msg_response = json.dumps(response)
+ self.websocket.write_message(msg_response)
+
+ def _add_wsloghandler(self):
+ """Add the ``self.wsloghandler`` to the logging handlers"""
+ root_logger = logging.getLogger()
+ root_logger.addHandler(self.wsloghandler)
+ logger.info("Added the WebSocket logging handler")
+
+ def _remove_wsloghandler(self):
+ """Remove the ``self.wsloghandler`` from the logging handlers"""
+ root_logger = logging.getLogger()
+ root_logger.removeHandler(self.wsloghandler)
+ logger.info("Removed the WebSocket logging handler")
+
+ def _task_callback(self, future):
+ """Callback function executed when the task finishes"""
+ logger.info("Console task finished! Callback ...")
+ self.status["running"] = False
+ self.status["finished"] = True
+ self._remove_wsloghandler()
+ #
+ response = {"type": "console", "action": "push"}
+ response["success"] = True
+ response["status"] = self.status
+ logger.debug("WebSocket: future response: {0}".format(response))
+ msg_response = json.dumps(response)
+ self.websocket.write_message(msg_response)
+
+ def _task(self, *args, **kwargs):
+ """
+ The task this console to manage.
+
+ Returns
+ -------
+ success : bool
+ Whether success without any errors
+ error : str
+ Detail of the error if not succeed
+
+ NOTE
+ ----
+ The task is synchronous and may be computationally intensive
+ (i.e., CPU-bound rather than IO/event-bound), therefore,
+ threads (or processes) are required to make it non-blocking
+ (i.e., asynchronous).
+
+ Credit: https://stackoverflow.com/a/32164711/4856091
+ """
+ import time
+ logger.info("console task: START")
+ for i in range(args[0]):
+ logger.info("console task: slept {0} seconds ...".format(i))
+ time.sleep(1)
+ logger.info("console task: DONE!")
+ return (True, None)
diff --git a/fg21sim/webui/websocket.py b/fg21sim/webui/websocket.py
index fffc19c..d205004 100644
--- a/fg21sim/webui/websocket.py
+++ b/fg21sim/webui/websocket.py
@@ -24,9 +24,10 @@ import logging
import tornado.websocket
from tornado.options import options
+from .consolehandler import ConsoleHandler
+from .utils import get_host_ip, ip_in_network
from ..configs import ConfigManager
from ..errors import ConfigError
-from .utils import get_host_ip, ip_in_network
logger = logging.getLogger(__name__)
@@ -58,7 +59,6 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):
"""
name = "fg21sim"
from_localhost = None
- configs = ConfigManager()
def check_origin(self, origin):
"""Check the origin of the WebSocket access.
@@ -98,6 +98,12 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):
def open(self):
"""Invoked when a new WebSocket is opened by the client."""
+ # FIXME:
+ # * better to move to the `Application` class ??
+ # * or create a ``ConfigsHandler`` similar to the ``ConsoleHandler``
+ self.configs = ConfigManager()
+ self.console_handler = ConsoleHandler(websocket=self)
+ #
logger.info("WebSocket: {0}: opened".format(self.name))
logger.info("Allowed hosts: {0}".format(options.hosts_allowed))
@@ -111,6 +117,12 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):
logger.info("WebSocket: {0}: closed by client: {1}, {2}".format(
self.name, code, reason))
+ # FIXME/XXX:
+ # * How to be non-blocking ??
+ # NOTE: WebSocket.on_message: may NOT be a coroutine at the moment (v4.3)
+ # References:
+ # [1] https://stackoverflow.com/a/35543856/4856091
+ # [2] https://stackoverflow.com/a/33724486/4856091
def on_message(self, message):
"""Handle incoming messages and dispatch task according to the
message type.
@@ -156,7 +168,9 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):
response = self._handle_configs(msg)
elif msg_type == "console":
# Control the simulation tasks, or request logging messages
- response = self._handle_console(msg)
+ # FIXME/XXX:
+ # * How to make this asynchronously ??
+ response = self.console_handler.handle_message(msg)
elif msg_type == "results":
# Request the simulation results
response = self._handle_results(msg)
@@ -175,7 +189,9 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):
"""Handle the message of type "configs", which request to get or
set some configurations by the client.
- TODO: improve the description ...
+ TODO:
+ * improve the description ...
+ * split these handling functions into a separate class in a module
Parameters
----------
@@ -407,14 +423,6 @@ class FG21simWSHandler(tornado.websocket.WebSocketHandler):
error = str(e)
return (success, error)
- def _handle_console(self, msg):
- # Got a message of supported types
- msg_type = msg["type"]
- logger.info("WebSocket: {0}: ".format(self.name) +
- "handle message of type: {0}".format(msg_type))
- response = {"success": True, "type": msg_type}
- return response
-
def _handle_results(self, msg):
# Got a message of supported types
msg_type = msg["type"]