merged cont.
[opensuse:yast-rest-service.git] / webyast / vendor / plugins / delayed_job / lib / delayed / job.rb
1 module Delayed
2
3   class DeserializationError < StandardError
4   end
5
6   # A job object that is persisted to the database.
7   # Contains the work object as a YAML field.
8   class Job < ActiveRecord::Base
9     MAX_ATTEMPTS = 25
10     MAX_RUN_TIME = 4.hours
11     set_table_name :delayed_jobs
12
13     # By default failed jobs are destroyed after too many attempts.
14     # If you want to keep them around (perhaps to inspect the reason
15     # for the failure), set this to false.
16     cattr_accessor :destroy_failed_jobs
17     self.destroy_failed_jobs = true
18
19     # Every worker has a unique name which by default is the pid of the process.
20     # There are some advantages to overriding this with something which survives worker retarts:
21     # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
22     cattr_accessor :worker_name
23     self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
24
25     NextTaskSQL         = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
26     NextTaskOrder       = 'priority DESC, run_at ASC'
27
28     ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
29
30     cattr_accessor :min_priority, :max_priority
31     self.min_priority = nil
32     self.max_priority = nil
33
34     # When a worker is exiting, make sure we don't have any locked jobs.
35     def self.clear_locks!
36       update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
37     end
38
39     def failed?
40       failed_at
41     end
42     alias_method :failed, :failed?
43
44     def payload_object
45       @payload_object ||= deserialize(self['handler'])
46     end
47
48     def name
49       @name ||= begin
50         payload = payload_object
51         if payload.respond_to?(:display_name)
52           payload.display_name
53         else
54           payload.class.name
55         end
56       end
57     end
58
59     def payload_object=(object)
60       self['handler'] = object.to_yaml
61     end
62
63     # Reschedule the job in the future (when a job fails).
64     # Uses an exponential scale depending on the number of failed attempts.
65     def reschedule(message, backtrace = [], time = nil)
66       if self.attempts < MAX_ATTEMPTS
67         time ||= Job.db_time_now + (attempts ** 4) + 5
68
69         self.attempts    += 1
70         self.run_at       = time
71         self.last_error   = message + "\n" + backtrace.join("\n")
72         self.unlock
73         save!
74       else
75         logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
76         destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
77       end
78     end
79
80
81     # Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.
82     def run_with_lock(max_run_time, worker_name)
83       logger.info "* [JOB] aquiring lock on #{name}"
84       unless lock_exclusively!(max_run_time, worker_name)
85         # We did not get the lock, some other worker process must have
86         logger.warn "* [JOB] failed to aquire exclusive lock for #{name}"
87         return nil # no work done
88       end
89
90       begin
91         runtime =  Benchmark.realtime do
92           invoke_job # TODO: raise error if takes longer than max_run_time
93           destroy
94         end
95         # TODO: warn if runtime > max_run_time ?
96         logger.info "* [JOB] #{name} completed after %.4f" % runtime
97         return true  # did work
98       rescue Exception => e
99         reschedule e.message, e.backtrace
100         log_exception(e)
101         return false  # work failed
102       end
103     end
104
105     # Add a job to the queue
106     def self.enqueue(*args, &block)
107       object = block_given? ? EvaledJob.new(&block) : args.shift
108
109       unless object.respond_to?(:perform) || block_given?
110         raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
111       end
112     
113       priority = args.first || 0
114       run_at   = args[1]
115
116       Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
117     end
118
119     # Find a few candidate jobs to run (in case some immediately get locked by others).
120     # Return in random order prevent everyone trying to do same head job at once.
121     def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
122
123       time_now = db_time_now
124
125       sql = NextTaskSQL.dup
126
127       conditions = [time_now, time_now - max_run_time, worker_name]
128
129       if self.min_priority
130         sql << ' AND (priority >= ?)'
131         conditions << min_priority
132       end
133
134       if self.max_priority
135         sql << ' AND (priority <= ?)'
136         conditions << max_priority
137       end
138
139       conditions.unshift(sql)
140
141       records = ActiveRecord::Base.silence do
142         find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
143       end
144
145       records.sort_by { rand() }
146     end
147
148     # Run the next job we can get an exclusive lock on.
149     # If no jobs are left we return nil
150     def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)
151
152       # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
153       # this leads to a more even distribution of jobs across the worker processes
154       find_available(5, max_run_time).each do |job|
155         t = job.run_with_lock(max_run_time, worker_name)
156         return t unless t == nil  # return if we did work (good or bad)
157       end
158
159       nil # we didn't do any work, all 5 were not lockable
160     end
161
162     # Lock this job for this worker.
163     # Returns true if we have the lock, false otherwise.
164     def lock_exclusively!(max_run_time, worker = worker_name)
165       now = self.class.db_time_now
166       affected_rows = if locked_by != worker
167         # We don't own this job so we will update the locked_by name and the locked_at
168         self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
169       else
170         # We already own this job, this may happen if the job queue crashes.
171         # Simply resume and update the locked_at
172         self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
173       end
174       if affected_rows == 1
175         self.locked_at    = now
176         self.locked_by    = worker
177         return true
178       else
179         return false
180       end
181     end
182
183     # Unlock this job (note: not saved to DB)
184     def unlock
185       self.locked_at    = nil
186       self.locked_by    = nil
187     end
188
189     # This is a good hook if you need to report job processing errors in additional or different ways
190     def log_exception(error)
191       logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
192       logger.error(error)
193     end
194
195     # Do num jobs and return stats on success/failure.
196     # Exit early if interrupted.
197     def self.work_off(num = 100)
198       success, failure = 0, 0
199
200       num.times do
201         case self.reserve_and_run_one_job
202         when true
203             success += 1
204         when false
205             failure += 1
206         else
207           break  # leave if no work could be done
208         end
209         break if $exit # leave if we're exiting
210       end
211
212       return [success, failure]
213     end
214
215     # Moved into its own method so that new_relic can trace it.
216     def invoke_job
217       payload_object.perform
218     end
219
220   private
221
222     def deserialize(source)
223       handler = YAML.load(source) rescue nil
224
225       unless handler.respond_to?(:perform)
226         if handler.nil? && source =~ ParseObjectFromYaml
227           handler_class = $1
228         end
229         attempt_to_load(handler_class || handler.class)
230         handler = YAML.load(source)
231       end
232
233       return handler if handler.respond_to?(:perform)
234
235       raise DeserializationError,
236         'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
237     rescue TypeError, LoadError, NameError => e
238       raise DeserializationError,
239         "Job failed to load: #{e.message}. Try to manually require the required file."
240     end
241
242     # Constantize the object so that ActiveSupport can attempt
243     # its auto loading magic. Will raise LoadError if not successful.
244     def attempt_to_load(klass)
245        klass.constantize
246     end
247
248     # Get the current time (GMT or local depending on DB)
249     # Note: This does not ping the DB to get the time, so all your clients
250     # must have syncronized clocks.
251     def self.db_time_now
252       (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.zone.now
253     end
254
255   protected
256
257     def before_save
258       self.run_at ||= self.class.db_time_now
259     end
260
261   end
262
263   class EvaledJob
264     def initialize
265       @job = yield
266     end
267
268     def perform
269       eval(@job)
270     end
271   end
272 end