From 4c4531530eaa94be2abeec2b28e343c312183334 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Thu, 17 Nov 2016 22:28:07 +0800 Subject: webui: console.py: Change to request/response instead of WebSocket * Rename to "ConsoleAJAXHandler" * Change to use the request/response model, which accepts the AJAX request and then response. * Update logging handler to push messages to all connected clients * Many simplifications and rewrites. --- fg21sim/webui/handlers/console.py | 343 ++++++++++++++++---------------------- 1 file changed, 145 insertions(+), 198 deletions(-) diff --git a/fg21sim/webui/handlers/console.py b/fg21sim/webui/handlers/console.py index fe1b500..793198c 100644 --- a/fg21sim/webui/handlers/console.py +++ b/fg21sim/webui/handlers/console.py @@ -5,216 +5,158 @@ Handle the "console" type of messages from the client. """ -import json import logging -from concurrent.futures import ThreadPoolExecutor import tornado.ioloop import tornado.gen +from tornado.escape import json_decode, json_encode +from .base import BaseRequestHandler from .log import WebSocketLogHandler logger = logging.getLogger(__name__) -class ConsoleHandler: +class ConsoleAJAXHandler(BaseRequestHandler): """ - Handle the "console" type of messages from the client. - - XXX/TODO: - * How to kill the submitted task? (force kill thread?) - - Parameters - ---------- - websocket : `~tornado.websocket.WebSocketHandler` - An `~tornado.websocket.WebSocketHandler` instance. - The ``WebSocketLogHandler`` requires this to push logging messages - to the client. - max_workers : int, optional - Maximum number of workers/threads for the execution pool - onetask_only : bool, optional - Whether to allow only one task running at the same time? - - Attributes - ---------- - websocket : `~tornado.websocket.WebSocketHandler` - An `~tornado.websocket.WebSocketHandler` instance, which is used by - the ``WebSocketLogHandler`` to push logging messages to the client. - wsloghandler : `~WebSocketLogHandler` - Push logging messages to the client through WebSocket. - executor : `~concurrent.futures.ThreadPoolExecutor` - Where to submit the synchronous task and make it perform - asynchronously using threads. - io_loop : `~tornado.ioloop.IOLoop` - Used to communicate with the main thread (e.g., callback) from the - submitted task, which is executed on a different thread. - onetask_only : bool - Whether to allow only one task running at the same time? - status : dict - Whether the task is running and/or finished? - There may be 4 possible status: - 1. running=False, finished=False: not started - 2. running=False, finished=True: finished - 3. running=True, finished=False: running - 4. running=True, finished=True: ?? error ?? + Handle the AJAX requests from the client to control the tasks. """ - def __init__(self, websocket, max_workers=3, onetask_only=False): - self.websocket = websocket - self.wsloghandler = WebSocketLogHandler(websocket, msg_type="console") - self.executor = ThreadPoolExecutor(max_workers=1) + # Allow only one task running at the same time + onetask_only = True + + def initialize(self): + """Hook for subclass initialization. Called for each request.""" + self.configs = self.application.configmanager + self.status = self.application.task_status # NOTE: # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we - # will need to communicate with the main thread from another thread. + # will need to communicate with the main thread (e.g., callback) + # from another thread, which executes the submitted task. self.io_loop = tornado.ioloop.IOLoop.instance() - self.onetask_only = onetask_only - self.status = {"running": False, "finished": False} - def handle_message(self, msg): - try: - msg_type = msg["type"] - msg_action = msg["action"] - response = {"type": msg_type, "action": msg_action} - logger.info("WebSocket: handle message: " + - "type: {0}, action: {1}".format(msg_type, msg_action)) - if msg_action == "start": - # FIXME/XXX: This task should be asynchronous! - success, error = self._start() - response["success"] = success - if not success: - response["error"] = error - elif msg_action == "start_test": - # FIXME/XXX: This task should be asynchronous! - success, error = self._start(msg["time"]) - response["success"] = success - if not success: - response["error"] = error - elif msg_action == "get_status": - response["success"] = True - response["action"] = "push" - response["status"] = self.status - else: - logger.warning("WebSocket: " + - "unknown action: {0}".format(msg_action)) - response["success"] = False - response["error"] = "unknown action: {0}".format(msg_action) - except KeyError: - # Received message has wrong syntax/format - response = {"success": False, - "type": msg_type, - "error": "no action specified"} + @tornado.web.authenticated + def get(self): + """ + Handle the READ-ONLY tasks operations. + + Supported actions: + - get: Get the current status of tasks. + """ + action = self.get_argument("action", "get") + if action == "get": + response = {"status": self.status} + success = True + else: + # ERROR: bad action + success = False + reason = "Bad request action: {0}".format(action) # - logger.debug("WebSocket: response: {0}".format(response)) - return response + if success: + logger.debug("Response: {0}".format(response)) + self.set_header("Content-Type", "application/json; charset=UTF-8") + self.write(json_encode(response)) + else: + logger.warning("Request failed: {0}".format(reason)) + self.send_error(400, reason=reason) - # FIXME/XXX: - # * How to call this task asynchronously ?? - def _start_test(self, *args, **kwargs): + @tornado.web.authenticated + def post(self): """ - Start the task by submitting it to the executor + Handle the READ-WRITE task operations. - Returns - ------- - success : bool - Whether success without any errors - error : str - Detail of the error if not succeed + XXX/TODO: + * How to kill the submitted task? (force kill thread?) + Supported actions: + - start: Start the default or specified task. + - stop: Stop the running task (TODO/XXX) """ - if self.onetask_only and self.status["running"]: - logger.warning("Task already running, and only one task allowed") + request = json_decode(self.request.body) + logger.debug("Received request: {0}".format(request)) + action = request.get("action") + if action == "start": + task = request.get("task") + kwargs = request.get("kwargs", {}) + success, reason = self._start_task(task=task, kwargs=kwargs) + elif action == "stop": success = False - error = "already running and only one task allowed" + reason = "NOT implemented action: {0}".format(action) else: - logger.info("Start the task on the executor ...") - self.status["running"] = True - self.status["finished"] = False - # Also push the logging messages to the client - self._add_wsloghandler() - future = self.executor.submit(self._task_test, *args, **kwargs) - self.io_loop.add_future(future, self._task_callback) - success, error = future.result() - return (success, error) + # ERROR: bad action + success = False + reason = "Bad request action: {0}".format(action) + # + if success: + response = {"status": self.status} + logger.debug("Response: {0}".format(response)) + self.set_header("Content-Type", "application/json; charset=UTF-8") + self.write(json_encode(response)) + else: + logger.warning("Request failed: {0}".format(reason)) + self.send_error(400, reason=reason) # FIXME/XXX: # * How to call this task asynchronously ?? - def _start(self, *args, **kwargs): + def _start_task(self, task=None, kwargs={}): """ - Start the task by submitting it to the executor + Start the task by submitting it to the executor. + + Parameters + ---------- + task : str, optional + The name of the task to be started. + If not specified, then start the default task. + NOTE: + Currently only support the default (``None``) and ``test`` tasks. + kwargs : dict, optional + Keyword arguments to be passed to the task. Returns ------- success : bool - Whether success without any errors + Whether the task successfully finished? error : str - Detail of the error if not succeed - + Error message if the task failed """ + TASKS = { + None: self._task_default, # default task + "test": self._task_test, + } + try: + f_task = TASKS[task] + except KeyError: + success = False + error = "Unknown task: {0}".format(task) + logger.warning(error) + return (success, error) + # if self.onetask_only and self.status["running"]: - logger.warning("Task already running, and only one task allowed") success = False - error = "already running and only one task allowed" + error = "Task already running and only one task allowed" + logger.warning(error) else: - logger.info("Start the task on the executor ...") + logger.info("Submit the task to the executor ...") self.status["running"] = True self.status["finished"] = False - # Also push the logging messages to the client + # Also push the logging messages to clients through WebSocket self._add_wsloghandler() - future = self.executor.submit(self._task, *args, **kwargs) + future = self.application.executor.submit(f_task, **kwargs) self.io_loop.add_future(future, self._task_callback) success, error = future.result() return (success, error) - def _response_future(self, future): - """ - Callback function which will be called when the caller finishes - in order to response the results to the client. - """ - response = {"type": "console", "action": "future"} - success, error = future.result() - response["success"] = success - if not success: - response["error"] = error - logger.debug("WebSocket: future response: {0}".format(response)) - msg_response = json.dumps(response) - self.websocket.write_message(msg_response) - - def _add_wsloghandler(self): - """Add the ``self.wsloghandler`` to the logging handlers""" - root_logger = logging.getLogger() - root_logger.addHandler(self.wsloghandler) - logger.info("Added the WebSocket logging handler") - - def _remove_wsloghandler(self): - """Remove the ``self.wsloghandler`` from the logging handlers""" - root_logger = logging.getLogger() - root_logger.removeHandler(self.wsloghandler) - logger.info("Removed the WebSocket logging handler") - - def _task_callback(self, future): - """Callback function executed when the task finishes""" - logger.info("Console task finished! Callback ...") - self.status["running"] = False - self.status["finished"] = True - self._remove_wsloghandler() - # - response = {"type": "console", "action": "push"} - response["success"] = True - response["status"] = self.status - logger.debug("WebSocket: future response: {0}".format(response)) - msg_response = json.dumps(response) - self.websocket.write_message(msg_response) - - def _task_test(self, *args, **kwargs): + def _task_default(self, **kwargs): """ - The task this console to manage. + The default task that this console manages, which performs + the foregrounds simulations. Returns ------- success : bool - Whether success without any errors + Whether the task successfully finished? error : str - Detail of the error if not succeed + Error message if the task failed NOTE ---- @@ -223,53 +165,58 @@ class ConsoleHandler: threads (or processes) are required to make it non-blocking (i.e., asynchronous). - Credit: https://stackoverflow.com/a/32164711/4856091 - """ - import time - logger.info("console task: START") - for i in range(args[0]): - logger.info("console task: slept {0} seconds ...".format(i)) - time.sleep(1) - logger.info("console task: DONE!") - return (True, None) - - def _task(self, *args, **kwargs): - """ - The task this console to manage. - Perform the foregrounds simulations. - - Returns - ------- - success : bool - Whether success without any errors - error : str - Detail of the error if not succeed - - NOTE - ---- - The task is synchronous and may be computationally intensive - (i.e., CPU-bound rather than IO/event-bound), therefore, - threads (or processes) are required to make it non-blocking - (i.e., asynchronous). - - Credit: https://stackoverflow.com/a/32164711/4856091 + References: + [1] https://stackoverflow.com/a/32164711/4856091 """ + logger.info("Console DEFAULT task: START ...") logger.info("Preparing to start foregrounds simulations ...") logger.info("Importing modules + Numba JIT, waiting ...") - from ..foregrounds import Foregrounds - - # FIXME: This is a hack - configs = self.websocket.configs + # logger.info("Checking the configurations ...") - configs.check_all() - - fg = Foregrounds(configs) + self.configs.check_all() + fg = Foregrounds(self.configs) fg.preprocess() fg.simulate() fg.postprocess() - logger.info("Foregrounds simulations DONE!") + logger.info("Console DEFAULT task: DONE!") + # NOTE: always return a tuple of (success, error) + return (True, None) - # NOTE: Should always return a tuple of (success, error) + def _task_test(self, **kwargs): + """ + Test task ... + """ + import time + logger.info("Console TEST task: START ...") + for i in range(kwargs["time"]): + logger.info("Console TEST task: slept {0} seconds ...".format(i)) + time.sleep(1) + logger.info("Console TEST task: DONE!") return (True, None) + + def _task_callback(self, future): + """Callback function executed when the task finishes""" + logger.info("Task finished! Callback ...") + self.status["running"] = False + self.status["finished"] = True + self._remove_wsloghandler() + logger.info("Callback DONE!") + + def _add_wsloghandler(self): + """ + Add a WebSocket handler to the logging handlers, which will capture + all the logging messages and push them to the client. + """ + self.wsloghandler = WebSocketLogHandler(self.application.websockets, + msg_type="console") + root_logger = logging.getLogger() + root_logger.addHandler(self.wsloghandler) + logger.info("Added the WebSocket logging handler") + + def _remove_wsloghandler(self): + """Remove the WebSocket logging handler""" + root_logger = logging.getLogger() + root_logger.removeHandler(self.wsloghandler) + logger.info("Removed the WebSocket logging handler") -- cgit v1.2.2