Revert "Worker exception handling policy"
[qa-tools:ots.git] / ots.worker / ots / worker / task_broker.py
1 # ***** BEGIN LICENCE BLOCK *****
2
3 # This file is part of OTS
4 #
5 # Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies).
6 #
7 # Contact: Mikko Makinen <mikko.al.makinen@nokia.com>
8 #
9 # This library is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # version 2.1 as published by the Free Software Foundation.
12 #
13 # This library is distributed in the hope that it will be useful, but
14 # WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
21 # 02110-1301 USA
22 # ***** END LICENCE BLOCK *****
23
24 """
25 The Task Broker is the heart of the Worker.
26
27 Listen for messages containing Tasks 
28 from RabbitMQ and dispatch
29 the Tasks as (Blocking) Processes.
30
31 The simple implementation will hold as long as 
32 the following assumptions holding true:
33
34 1. There is only a single resource providing i/o (RabbitMQ)
35 2. Tasks are blocking
36 3. Tasks are run in Serial 
37 """
38
39 #Disable spurious pylint warnings
40
41 #pylint: disable-msg=E0611
42 #pylint: disable-msg=F0401
43
44 import os
45 import sys
46 from time import sleep
47 import logging
48 from itertools import cycle
49
50 from ots.common.amqp.api import unpack_message, pack_message
51 from ots.common.dto.api import StateChangeMessage, TaskCondition
52 from ots.common.routing.api import get_queues
53
54 import ots.worker
55 from ots.worker.command import Command
56 from ots.worker.command import SoftTimeoutException,  HardTimeoutException
57 from ots.worker.command import CommandFailed
58
59
60 LOGGER = logging.getLogger(__name__)
61
62 STOP_SIGNAL_FILE = "/tmp/stop_ots_worker"
63
64 TASK_CONDITION_RESPONSES = [TaskCondition.START,
65                             TaskCondition.FINISH]
66
67 class NotConnectedError(Exception):
68     """Exception raised if not connected to amqp"""
69     pass
70
71 ########################################
72 # Command Class to Function
73 ########################################
74
75 def _start_process(command):
76     """
77     Starts the specified process
78
79     @type command: string
80     @param command: The CL params for the Process to be run as a Task 
81     """
82     task = Command(command)
83     task.execute()
84
85
86 ##############################
87 # TASK_BROKER
88 ##############################
89
90 class TaskBroker(object):
91     """
92     Listens to a Queue of the given Routing Key.
93     Pulls messages containing Tasks from AMQP 
94     Dispatch the Tasks as a process
95     """   
96     def __init__(self, connection, device_properties):
97         """
98         device_properties have magic keys that
99         are dependent on the rules set out 
100         in ots.common.routing.routing 
101
102         @type connection : L{ots.worker.connection.Connection} 
103         @param connection : The connection 
104
105         @type device_properties : C{dict}
106         @param device_properties : The device_properties
107         """
108         self._connection = connection 
109         self._queues = get_queues(device_properties)
110         self._keep_looping = True
111         self._consumer_tags = dict()
112
113         self._task_state = cycle(TASK_CONDITION_RESPONSES)
114         self._amqp_log_handler = None
115
116     ############################################
117     # LOG HANDLER
118     ############################################
119
120     def _set_amqp_log_handler(self, amqp_log_handler):
121         """
122         @type amqp_log_handler : L{AMQPLogHandler}
123         @param amqp_log_handler: The AMQP Log Handler
124         """
125         self._amqp_log_handler = amqp_log_handler
126         self._amqp_log_handler.channel = self.channel
127
128     amqp_log_handler = property(None, _set_amqp_log_handler)
129
130     #############################################
131     # AMQP CONNECTION PROPERTIES 
132     #############################################
133
134     @property 
135     def channel(self):
136         """amqp channel"""
137         channel = self._connection.channel
138         if channel is not None:
139             return channel
140         else:
141             raise NotConnectedError()
142
143     ##############################################
144     # AMQP Configuration
145     ##############################################
146
147     def _start_consume(self):
148         """
149         Start consuming messages from the queue
150         """
151         basic_consume = self.channel.basic_consume
152         for queue in self._queues:
153             LOGGER.info("start consume on queue: %s" % queue)
154             self._consumer_tags[queue] = basic_consume(queue = queue,
155                                               callback = self._on_message)
156             
157     def _stop_consume(self):
158         """
159         Cancel consuming from queues. This is needed for proper load balancing.
160         Otherwise the server will push next task to the consumer as soon as the
161         ongoing is acked.
162         """
163         for queue in self._queues:
164             self.channel.basic_cancel(self._consumer_tags[queue])
165             LOGGER.info("stop consume on queue: %s" % queue)
166
167     def _init_connection(self):
168         """
169         Initialise the connection to AMQP.
170         Queue and Services Exchange are both durable
171         """
172         for queue in self._queues:
173             LOGGER.info("Initialising queue: %s" % queue)
174             self.channel.queue_declare(queue = queue, 
175                                        durable = True,
176                                        exclusive = False, 
177                                        auto_delete = False)
178             self.channel.exchange_declare(exchange = queue,
179                                           type = 'direct', 
180                                           durable = True,
181                                           auto_delete = False)
182             self.channel.queue_bind(queue = queue,
183                                     exchange = queue,
184                                     routing_key = queue)
185         self.channel.basic_qos(0, 1, False)
186
187     ###############################################
188     # LOOPING / HANDLING / DISPATCHING
189     ###############################################
190
191     def _cancel(self):
192         self.channel.basic_cancel(self._consumer_tag)
193
194     def _loop(self):
195         """
196         The main loop
197         Continually listen for messages coming from RabbitMQ
198         """
199         LOGGER.debug("Starting main loop...")
200         while self._keep_looping:
201             try:
202                 if not self._stop_file_exists():
203                     self.channel.wait()
204                 else:
205                     self._keep_looping = False
206             except Exception:
207                 LOGGER.exception("_loop() failed")
208                 #FIXME Check logs to see what exceptions are raised here
209                 self._try_reconnect()
210         self._clean_up()
211     
212     def _handle_message(self, message):
213         """
214         The Message Handler. 
215         
216         @type message: amqplib.client_0_8.basic_message.Message 
217         @param message: A message containing a pickled dictionary
218
219         This turns off the connection on receipt of a message.
220         Once the Task has run the connection is reestablished.
221
222         Response Queue is kept informed of the status
223         """
224         
225         self._stop_consume()
226         self.channel.basic_ack(delivery_tag = message.delivery_tag)
227         #
228         cmd_msg = unpack_message(message)
229         task_id = cmd_msg.task_id
230         response_queue = cmd_msg.response_queue
231        
232         self._set_log_handler(response_queue)
233         self._publish_task_state_change(task_id, response_queue)
234         #
235         try:
236             self._dispatch(cmd_msg)
237         except CommandFailed:
238             exception = sys.exc_info()[1]
239             exception.task_id = task_id 
240             self._publish_exception(response_queue,
241                                     exception)
242         finally:
243             self._set_log_handler(None)
244             self._publish_task_state_change(task_id, response_queue)
245             self._start_consume()
246
247     def _on_message(self, message):
248         """
249         The High Level Message Handler. 
250         Handles messages if the Worker is version compatible 
251         
252         @type message: amqplib.client_0_8.basic_message.Message 
253         @param message: A message containing a pickled dictionary
254         """
255         LOGGER.debug("Received Message")
256         if self._is_version_compatible(message):
257             self._handle_message(message)
258         else:
259             LOGGER.debug("Worker not version compatible")
260             #Close the connection makes message available to other Workers
261             self._clean_up()
262
263     def _dispatch(self, cmd_msg):
264         """
265         Dispatch the Task. Currently as a Process (Blocking)
266                 
267         @type message: C{ots.common.amqp.messages.CommandMessage
268         @param message: The CL params for the Process to be run as a Task 
269         """
270
271         if cmd_msg.is_quit:
272             LOGGER.debug("Received QUIT command")
273             self._keep_looping = False
274         elif not cmd_msg.is_ignore:
275             LOGGER.debug("Running command: '%s'"%(cmd_msg.command))
276             _start_process(command = cmd_msg.command)
277             
278     ########################################
279     # MESSAGE PUBLISHING
280     ########################################
281
282     def _publish_task_state_change(self, task_id, response_queue):
283
284         """
285         Inform the response queue of the status of the Task
286
287         @type response_queue: string
288         @param response_queue: The name of the response queue 
289         """
290         state = self._task_state.next()
291         LOGGER.debug("Task in state: '%s'"%(state))
292         state_msg = StateChangeMessage(task_id, state)
293         amqp_message = pack_message(state_msg) 
294         self.channel.basic_publish(amqp_message,
295                                    mandatory = True,
296                                    exchange = response_queue,
297                                    routing_key = response_queue)
298
299
300     def _publish_exception(self, response_queue, exception):
301         """
302         Put an Exception on the response queue 
303
304         @type response_queue: C{str}
305         @param response_queue: The name of the response queue 
306
307         @type exception: L{OTSException}
308         @param exception: An OTSException 
309
310         """
311         message = pack_message(exception)
312         self.channel.basic_publish(message,
313                                    mandatory = True,
314                                    exchange = response_queue,
315                                    routing_key = response_queue)
316
317     #######################################
318     # HELPERS
319     #######################################
320
321     def _is_version_compatible(self, message):
322         """
323         Is the Worker version compatible
324
325         @type message: amqplib.client_0_8.basic_message.Message 
326         @param message: A message containing a pickled dictionary
327
328         @rtype: C{bool}
329         @rparam: Returns True if compatible otherwise false
330         """
331         ret_val = True
332         cmd_msg = unpack_message(message)
333         min_worker_version = cmd_msg.min_worker_version
334
335         if min_worker_version is not None:
336             major_minor, revision = ots.worker.__VERSION__.split("r")
337             LOGGER.debug("Min version: %s. Worker version: %s"%
338                          (min_worker_version, major_minor))
339             ret_val = float(major_minor) >= float(min_worker_version)
340         return ret_val
341
342     def _set_log_handler(self, queue):
343         """
344         Set the AMQP Log Handler to use the queue
345         or None to stop it logging
346
347         @type queue : C{str} or None
348         @param queue : The name of the queue 
349         """
350         if self._amqp_log_handler is not None:
351             self._amqp_log_handler.queue = queue
352             self._amqp_log_handler.exchange = queue
353         
354     def _try_reconnect(self):
355        """
356        A poorly implemented reconnect to AMQP
357        """
358        #FIXME: Move out into own connection module.
359        #Implement with a exponential backoff with max retries.
360        LOGGER.exception("Error. Waiting 5s then retrying")
361        sleep(5)
362        try:
363            LOGGER.info("Trying to reconnect...")
364            self._connection.connect()
365            self._init_connection()
366            self._start_consume()
367        except Exception:
368            #If rabbit is still down, we expect this to fail
369            LOGGER.exception("Reconnecting failed...")
370
371     def _clean_up(self):
372         """
373         Delegate to connection cleanup
374         """
375         try:
376             self._stop_consume()
377         except:
378             pass
379         if self._connection:
380             self._connection.clean_up()
381
382     def __del__(self):
383         """
384         Destructor
385         """
386         self._clean_up()
387
388     @staticmethod
389     def _stop_file_exists():
390         """
391         Check whether the stop file is in place
392         
393         @rtype stop: C{bool} 
394         @return stop: Is stop file present
395         """
396         stop = False
397         if os.path.exists(STOP_SIGNAL_FILE):
398             os.system("rm -fr "+STOP_SIGNAL_FILE)
399             LOGGER.info("Worker was asked to stop after testrun ready.")
400             stop = True
401         return stop
402       
403     ################################
404     # PUBLIC METHODS
405     ################################
406         
407     def run(self):
408         """ 
409         Polls RabbitMQ for Task Messages and runs the Tasks.
410
411         Initialises the AMQP connections and run the forever loop.
412         """
413         self._init_connection()
414         self._start_consume()
415         self._loop()