aboutsummaryrefslogtreecommitdiffstats
path: root/fg21sim/webui/handlers/console.py
blob: fece851597752c3b22a89826a12b079533a77309 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Copyright (c) 2016 Weitian LI <liweitianux@live.com>
# MIT license

"""
Handle the AJAX requests from the client to control the tasks.
"""

import logging
import time

import tornado.ioloop
from tornado.escape import json_decode, json_encode

from .base import BaseRequestHandler
from .log import WebSocketLogHandler


logger = logging.getLogger(__name__)


class ConsoleAJAXHandler(BaseRequestHandler):
    """
    Handle the AJAX requests from the client to control the tasks.

    ``GET`` requests report the current task status, while ``POST``
    requests (authenticated) start a task through the application's
    executor.  While a task is running, a WebSocket logging handler is
    attached to the root logger so that the logging messages are also
    pushed to the connected clients.

    Attributes
    ----------
    onetask_only : bool
        If ``True``, refuse to start a task while another one is running.
    """
    # Allow only one task running at the same time
    onetask_only = True

    def initialize(self):
        """Hook for subclass initialization.  Called for each request."""
        self.configs = self.application.configmanager
        self.status = self.application.task_status
        # NOTE:
        # Use ``IOLoop.instance`` instead of ``IOLoop.current``, since we
        # will need to communicate with the main thread (e.g., callback)
        # from another thread, which executes the submitted task.
        self.io_loop = tornado.ioloop.IOLoop.instance()

    def get(self):
        """
        Handle the READ-ONLY tasks operations.

        Supported actions:
        - get: Get the current status of tasks.

        Any other action is answered with a ``400`` error.
        """
        action = self.get_argument("action", "get")
        if action == "get":
            self._send_status()
        else:
            # ERROR: bad action
            self._send_failure("Bad request action: {0}".format(action))

    # NOTE(review): this module only imports ``tornado.ioloop``;
    # ``tornado.web`` is presumably loaded through ``.base`` -- consider
    # importing it explicitly.
    @tornado.web.authenticated
    def post(self):
        """
        Handle the READ-WRITE task operations.

        The request body must be a JSON object with an ``action`` key and,
        for the ``start`` action, optional ``task`` and ``kwargs`` keys.
        A malformed body, an unknown action, or a failed task is answered
        with a ``400`` error.

        XXX/TODO:
        * How to kill the submitted task? (force kill thread?)

        Supported actions:
        - start: Start the default or specified task.
        - stop: Stop the running task (TODO/XXX)
        """
        try:
            request = json_decode(self.request.body)
        except ValueError:
            # Malformed JSON (or undecodable bytes) is a client error
            self._send_failure("Bad request body: invalid JSON")
            return
        if not isinstance(request, dict):
            self._send_failure("Bad request body: expected a JSON object")
            return
        logger.debug("Received request: {0}".format(request))
        action = request.get("action")
        if action == "start":
            success, reason = self._start_task(task=request.get("task"),
                                               kwargs=request.get("kwargs"))
        elif action == "stop":
            success = False
            reason = "NOT implemented action: {0}".format(action)
        else:
            # ERROR: bad action
            success = False
            reason = "Bad request action: {0}".format(action)
        #
        if success:
            self._send_status()
        else:
            self._send_failure(reason)

    def _send_status(self):
        """Write the current task status to the client as JSON."""
        response = {"status": self.status}
        logger.debug("Response: {0}".format(response))
        self.set_header("Content-Type", "application/json; charset=UTF-8")
        self.write(json_encode(response))

    def _send_failure(self, reason):
        """
        Log the failure and answer the request with a ``400`` error.

        Parameters
        ----------
        reason : str or None
            Failure description, used as the HTTP reason phrase.
        """
        if reason is not None:
            # The reason may embed client-supplied values; collapse all
            # whitespace (including newlines) since control characters
            # are not valid in an HTTP reason phrase.
            reason = " ".join(str(reason).split())
        logger.warning("Request failed: {0}".format(reason))
        self.send_error(400, reason=reason)

    # FIXME/XXX:
    # * How to call this task asynchronously ??
    def _start_task(self, task=None, kwargs=None):
        """
        Start the task by submitting it to the executor.

        NOTE:
        This method waits on ``future.result()``, i.e., it blocks the
        calling thread until the submitted task has finished.

        Parameters
        ----------
        task : str, optional
            The name of the task to be started.
            If not specified, then start the default task.
            NOTE:
            Currently only support the default (``None``) and ``test`` tasks.
        kwargs : dict, optional
            Keyword arguments to be passed to the task.
            ``None`` (default) means no keyword arguments.

        Returns
        -------
        success : bool
            Whether the task successfully finished?
        error : str
            Error message if the task failed
        """
        TASKS = {
            None: self._task_default,  # default task
            "test": self._task_test,
        }
        try:
            f_task = TASKS[task]
        except (KeyError, TypeError):
            # ``TypeError``: unhashable task name (e.g., a JSON list/object)
            error = "Unknown task: {0}".format(task)
            logger.warning(error)
            return (False, error)
        #
        # Validate the keyword arguments *before* touching the shared
        # status, otherwise a failing ``**kwargs`` expansion would leave
        # the status stuck at "running".
        if kwargs is None:
            kwargs = {}
        elif not isinstance(kwargs, dict):
            error = "Bad task kwargs: expected a mapping of keyword arguments"
            logger.warning(error)
            return (False, error)
        #
        if self.onetask_only and self.status["running"]:
            error = "Task already running and only one task allowed"
            logger.warning(error)
            return (False, error)
        #
        logger.info("Submit the task to the executor ...")
        self.status["running"] = True
        self.status["finished"] = False
        # Also push the logging messages to clients through WebSocket
        self._add_wsloghandler()
        try:
            future = self.application.executor.submit(f_task, **kwargs)
        except Exception:
            # Nothing was submitted, thus the callback will never fire:
            # undo the state changes made above.
            logger.exception("Failed to submit the task to the executor")
            self.status["running"] = False
            self._remove_wsloghandler()
            return (False, "Failed to submit the task to the executor")
        # The callback resets the status and detaches the log handler
        self.io_loop.add_future(future, self._task_callback)
        try:
            success, error = future.result()
        except Exception as exc:
            # Report a crashed task through the ``(success, error)``
            # contract instead of letting the exception escape.
            logger.exception("Task raised an exception")
            success = False
            error = "Task failed: {0}: {1}".format(type(exc).__name__, exc)
        return (success, error)

    def _task_default(self, **kwargs):
        """
        The default task that this console manages, which performs
        the foregrounds simulations.

        Parameters
        ----------
        **kwargs :
            Accepted for a uniform task signature; not used by this task.

        Returns
        -------
        success : bool
            Whether the task successfully finished?
        error : str
            Error message if the task failed

        NOTE
        ----
        The task is synchronous and may be computationally intensive
        (i.e., CPU-bound rather than IO/event-bound), therefore,
        threads (or processes) are required to make it non-blocking
        (i.e., asynchronous).

        References:
        [1] https://stackoverflow.com/a/32164711/4856091
        """
        t1_start = time.perf_counter()
        t2_start = time.process_time()
        logger.info("Console DEFAULT task: START ...")
        logger.info("Preparing to start foregrounds simulations ...")
        logger.info("Checking the configurations ...")
        self.configs.check_all()
        #
        logger.info("Importing modules + Numba JIT, waiting ...")
        # Imported here (rather than at module level) so that the import
        # happens inside the task, right after the log message above.
        from ...foregrounds import Foregrounds
        #
        fg = Foregrounds(self.configs)
        fg.preprocess()
        fg.simulate()
        fg.postprocess()
        logger.info("Foregrounds simulations DONE!")
        logger.info("Console DEFAULT task: DONE!")
        t1_stop = time.perf_counter()
        t2_stop = time.process_time()
        logger.info("Elapsed time: {0:.3f} (s)".format(t1_stop - t1_start))
        logger.info("CPU process time: {0:.3f} (s)".format(t2_stop - t2_start))
        # NOTE: always return a tuple of (success, error)
        return (True, None)

    def _task_test(self, **kwargs):
        """
        Test task that sleeps for the requested number of seconds,
        logging a message every second.

        Parameters
        ----------
        time : int
            Number of seconds to sleep (passed as a keyword argument).

        Returns
        -------
        success : bool
            Whether the task successfully finished?
        error : str
            Error message if the task failed
        """
        try:
            ticks = range(kwargs["time"])
        except (KeyError, TypeError):
            # Missing or non-integer ``time``: report it through the
            # ``(success, error)`` contract instead of raising.
            error = ("Console TEST task: missing or invalid 'time' "
                     "(expected integer seconds)")
            logger.warning(error)
            return (False, error)
        #
        t1_start = time.perf_counter()
        t2_start = time.process_time()
        logger.info("Console TEST task: START ...")
        for i in ticks:
            logger.info("Console TEST task: slept {0} seconds ...".format(i))
            time.sleep(1)
        logger.info("Console TEST task: DONE!")
        t1_stop = time.perf_counter()
        t2_stop = time.process_time()
        logger.info("Elapsed time: {0:.3f} (s)".format(t1_stop - t1_start))
        logger.info("CPU process time: {0:.3f} (s)".format(t2_stop - t2_start))
        return (True, None)

    def _task_callback(self, future):
        """
        Callback function executed when the task finishes.

        Mark the task as finished (no longer running) and detach the
        WebSocket logging handler.

        Parameters
        ----------
        future :
            The future of the finished task (not inspected here).
        """
        logger.info("Task finished! Callback ...")
        self.status["running"] = False
        self.status["finished"] = True
        self._remove_wsloghandler()
        logger.info("Callback DONE!")

    def _add_wsloghandler(self):
        """
        Add a WebSocket handler to the logging handlers, which will capture
        all the logging messages and push them to the client.
        """
        self.wsloghandler = WebSocketLogHandler(self.application.websockets,
                                                msg_type="console")
        root_logger = logging.getLogger()
        root_logger.addHandler(self.wsloghandler)
        logger.info("Added the WebSocket logging handler")

    def _remove_wsloghandler(self):
        """
        Remove the WebSocket logging handler (previously attached by
        ``_add_wsloghandler``) from the root logger.
        """
        root_logger = logging.getLogger()
        root_logger.removeHandler(self.wsloghandler)
        logger.info("Removed the WebSocket logging handler")