| Class | OpenWFE::Scheduler |
| In: | lib/openwfe/util/scheduler.rb |
| Parent: | Object |
The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs. ‘at’ jobs execute once at a given point in time. ‘cron’ jobs execute at specified intervals. The two main methods are thus schedule_at() and schedule().
schedule_at() and schedule() expect either a Schedulable instance and params (usually an array or nil) or a block, which is the more idiomatic Ruby approach.
scheduler.schedule_in("3d") do
  regenerate_monthly_report()
end
#
# will call the regenerate_monthly_report method
# in 3 days from now
scheduler.schedule "0 22 * * 1-5" do
  log.info "activating security system..."
  activate_security_system()
end
job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
  init_self_destruction_sequence()
end
An example that uses a Schedulable class :
class Regenerator < Schedulable
  def trigger (frequency)
    self.send(frequency)
  end
  def monthly
    # ...
  end
  def yearly
    # ...
  end
end
regenerator = Regenerator.new
scheduler.schedule_in("4d", regenerator)
#
# will regenerate the report in four days
scheduler.schedule_in(
  "5d",
  { :schedulable => regenerator, :scope => :month })
#
# will regenerate the monthly report in 5 days
There is also schedule_every() :
scheduler.schedule_every("1h20m") do
  regenerate_latest_report()
end
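The duration strings used in these examples ("3d", "1h20m") pair a count with a unit letter. The sketch below shows one way such strings could be converted to seconds. It is not the library's own duration_to_f: the helper name parse_duration and the unit table are assumptions made for illustration, and only the d, h, m and s suffixes that appear on this page are handled.

```ruby
# Hypothetical helper, NOT the library's duration_to_f.
# Only the suffixes seen in this page's examples are supported (assumption).
UNIT_SECONDS = { "d" => 86_400, "h" => 3_600, "m" => 60, "s" => 1 }.freeze

def parse_duration(str)
  unless str =~ /\A(\d+[dhms])+\z/
    raise ArgumentError, "not a duration string: #{str.inspect}"
  end

  # Sum every <count><unit> pair found in the string.
  str.scan(/(\d+)([dhms])/).inject(0.0) do |total, (count, unit)|
    total + count.to_i * UNIT_SECONDS[unit]
  end
end

parse_duration("3d")     # => 259200.0
parse_duration("1h20m")  # => 4800.0
```

Returning a Float keeps the result directly addable to Time.now.to_f, which is how the source listings further down compute trigger times.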
The scheduler has an "exit_when_no_more_jobs" attribute. When set to ‘true’, the scheduler will exit as soon as there are no more jobs to run. Use it with care: a newly created scheduler has no jobs, so if you set this attribute to true and then start the scheduler, it will exit immediately. This attribute is best used indirectly : the method join_until_no_more_jobs() wraps it.
The :scheduler_precision can be set when instantiating the scheduler.
scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500)
scheduler.start
#
# instantiates a scheduler that checks its jobs twice per second
# (the default is 4 times per second (0.250))
Since OpenWFEru 0.9.16, tags can be attached to scheduled jobs :
scheduler.schedule_in "2h", :tags => "backup" do
  init_backup_sequence()
end
scheduler.schedule "0 24 * * *", :tags => "new_day" do
  do_this_or_that()
end
jobs = scheduler.find_jobs 'backup'
jobs.each { |job| job.unschedule }
Multiple tags may be attached to a single job :
scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
  init_backup_sequence()
end
The vanilla case for tags assumes they are String instances, but nothing prevents you from using anything else. The scheduler has no persistence by itself, so there is no serialization issue.
Since OpenWFEru 0.9.16, a cron schedule can be set at the second level :
scheduler.schedule "7 * * * * *" do
  puts "it's now the seventh second of the minute"
end
The OpenWFEru scheduler recognizes an optional first column for second scheduling. Like the other columns, it can specify a value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
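To make the three column forms concrete, here is a minimal sketch of how a cron column could be expanded into the list of values it covers, and how a five-column line could be told apart from a six-column one. The names parse_cron_field and split_cron_line are hypothetical and the logic is an illustration only, not the scheduler's actual cron parser.

```ruby
# Hypothetical helpers for illustration; not the scheduler's own parser.

# Expands one cron column ("*", "7", "7,8,9,27" or "7-12") into integers.
def parse_cron_field(field, min, max)
  return (min..max).to_a if field == "*"

  field.split(",").flat_map do |item|
    if (m = item.match(/\A(\d+)-(\d+)\z/))
      (m[1].to_i..m[2].to_i).to_a      # range form
    else
      [Integer(item, 10)]              # single value
    end
  end.uniq.sort
end

# Names the columns of a cron line; :second is nil for a 5-column line.
def split_cron_line(line)
  fields = line.split
  fields.unshift(nil) if fields.size == 5
  raise ArgumentError, "expected 5 or 6 columns" unless fields.size == 6

  names = [:second, :minute, :hour, :day, :month, :weekday]
  Hash[names.zip(fields)]
end

parse_cron_field("7-12", 0, 59)            # => [7, 8, 9, 10, 11, 12]
split_cron_line("7 * * * * *")[:second]    # => "7"
split_cron_line("0 22 * * 1-5")[:second]   # => nil
```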
By default, the OpenWFEru scheduler prints a stack trace to STDOUT when a job raises an exception. There are two ways to change that behaviour.
# 1 - providing a lwarn method to the scheduler instance :
class << scheduler
  def lwarn (&block)
    puts "oops, something wrong happened : "
    puts block.call
  end
end
# 2 - overriding the [protected] method log_exception(e) :
class << scheduler
  def log_exception (e)
    puts "something wrong happened : "+e.to_s
  end
end
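Both options rely on Ruby's singleton class: `class << obj` redefines a method for one instance only. The self-contained sketch below demonstrates the second option on a stand-in class. ToyScheduler and run_job are made up for this illustration and do not reflect how the real scheduler triggers jobs.

```ruby
# Stand-in class for illustration only; not OpenWFE::Scheduler.
class ToyScheduler
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Runs the block and routes any error to log_exception.
  def run_job
    yield
  rescue StandardError => e
    log_exception(e)
  end

  protected

  # Default behaviour: dump the backtrace to STDOUT.
  def log_exception(e)
    puts e.backtrace
  end
end

toy = ToyScheduler.new

# Override log_exception for this one instance.
class << toy
  def log_exception(e)
    @errors << "something wrong happened : " + e.to_s
  end
end

toy.run_job { raise "boom" }
toy.errors  # => ["something wrong happened : boom"]
```

Other ToyScheduler instances keep the default logging, since the override lives on the singleton class of `toy` alone.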
Every job can reschedule or unschedule itself. A reschedule example :
scheduler.schedule_every "5h" do |job_id, at, params|
  mails = $inbox.fetch_mails
  mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
  params[:every] = if mails.size > 100
    "1h" # lots of spam, check every hour
  else
    "5h" # normal schedule, every 5 hours
  end
end
Unschedule example :
scheduler.schedule_every "10s" do |job_id, at, params|
  #
  # polls every 10 seconds until a mail arrives
  $mail = $inbox.fetch_last_mail
  params[:dont_reschedule] = true if $mail
end
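The effect of changing params[:every] or setting params[:dont_reschedule] from inside the block can be traced with a toy loop. The sketch below is a simplified stand-in with no threads and no real clock; simulate_every is a hypothetical name, and the intervals are plain numbers of seconds rather than duration strings, purely to keep the example short.

```ruby
# Toy stand-in for an 'every' job; not the scheduler's implementation.
# Returns the list of simulated trigger times.
def simulate_every(every, max_runs)
  params = { :every => every }
  at = 0
  trace = []

  max_runs.times do
    at += params[:every]          # next trigger time
    yield at, params              # the job may mutate params
    trace << at
    break if params[:dont_reschedule]
  end

  trace
end

trace = simulate_every(5, 10) do |at, params|
  params[:every] = 1 if at >= 10                # speed up after t=10
  params[:dont_reschedule] = true if at >= 12   # then stop
end

trace  # => [5, 10, 11, 12]
```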
| precision | [RW] | By default, the precision is 0.250, which means the scheduler checks for jobs to execute 4 times per second. |
| stopped | [RW] | True while the scheduler is stopped, which is its state until it is started. |
Returns true if the given string seems to be a cron string.
# File lib/openwfe/util/scheduler.rb, line 624
def Scheduler.is_cron_string (s)

  s.match(".+ .+ .+ .+ .+")
end
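The check above is deliberately loose: String#match converts its string argument to a Regexp, so any string containing at least five space-separated chunks passes, and the result is a MatchData or nil rather than true or false. The sketch below reproduces the same pattern in a standalone predicate; the name cron_string? is made up for this example.

```ruby
# Standalone copy of the pattern for experimentation.
# cron_string? is a hypothetical name; it returns a strict boolean.
def cron_string?(s)
  !s.match(".+ .+ .+ .+ .+").nil?
end

cron_string?("0 22 * * 1-5")   # => true  (five columns)
cron_string?("7 * * * * *")    # => true  (six columns, with seconds)
cron_string?("1h20m")          # => false (a duration, no spaces)
cron_string?("a b c d e")      # => true  (not a valid cron line, but it matches)
```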
# File lib/openwfe/util/scheduler.rb, line 225
def initialize (params={})

  super()

  @pending_jobs = []
  @cron_jobs = {}

  @schedule_queue = Queue.new
  @unschedule_queue = Queue.new
  #
  # sync between the step() method and the [un]schedule
  # methods is done via these queues, no more mutex

  @scheduler_thread = nil

  @precision = 0.250
  # every 250ms, the scheduler wakes up (default value)
  begin
    @precision = Float(params[:scheduler_precision])
  rescue Exception => e
    # let precision at its default value
  end

  @exit_when_no_more_jobs = false
  @dont_reschedule_every = false

  @last_cron_second = -1

  @stopped = true
end
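The precision handling in this constructor depends on Kernel#Float raising when it is given nil or a non-numeric string, so that the default survives. A minimal standalone sketch of that fallback follows; precision_from is a hypothetical helper, and it rescues only the two error classes Float() raises instead of Exception.

```ruby
# Hypothetical helper mirroring the constructor's fallback logic.
def precision_from(params, default = 0.250)
  Float(params[:scheduler_precision])
rescue ArgumentError, TypeError
  # Float(nil) raises TypeError, Float("abc") raises ArgumentError
  default
end

precision_from({})                                # => 0.25
precision_from({ :scheduler_precision => 0.5 })   # => 0.5
precision_from({ :scheduler_precision => "abc" }) # => 0.25
```

With a precision of 0.5 the scheduler wakes up 1 / 0.5 = 2 times per second, and with the default 0.250 it wakes up 4 times per second.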
Returns the current count of ‘at’ jobs scheduled (not ‘every’).
# File lib/openwfe/util/scheduler.rb, line 616
def at_job_count

  @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
end
Returns the number of cron jobs currently active in this scheduler.
# File lib/openwfe/util/scheduler.rb, line 600
def cron_job_count

  @cron_jobs.size
end
Returns the current count of ‘every’ jobs scheduled.
# File lib/openwfe/util/scheduler.rb, line 608
def every_job_count

  @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
end
Returns an array of jobs that have the given tag.
# File lib/openwfe/util/scheduler.rb, line 555
def find_jobs (tag)

  result = @cron_jobs.values.find_all do |job|
    job.has_tag?(tag)
  end

  result + @pending_jobs.find_all do |job|
    job.has_tag?(tag)
  end
end
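The lookup above searches two collections: the cron jobs (a hash keyed by job id) and the pending jobs (an array). The sketch below replays that logic on stand-in objects. ToyJob and find_tagged are hypothetical, and the way has_tag? compares tags here (wrapping a single tag or a list with Array()) is an assumption, since the real job classes are not shown on this page.

```ruby
# Stand-in job type for illustration; the real job classes are not shown here.
ToyJob = Struct.new(:job_id, :tags) do
  # Accepts a single tag, a list of tags, or nil (assumed behaviour).
  def has_tag?(tag)
    Array(tags).include?(tag)
  end
end

# Same shape as find_jobs: cron jobs first, then pending jobs.
def find_tagged(cron_jobs, pending_jobs, tag)
  (cron_jobs.values + pending_jobs).select { |job| job.has_tag?(tag) }
end

cron_jobs = { "c1" => ToyJob.new("c1", "new_day") }
pending   = [
  ToyJob.new("a1", ["backup", "important"]),
  ToyJob.new("a2", nil)
]

find_tagged(cron_jobs, pending, "backup").map(&:job_id)   # => ["a1"]
find_tagged(cron_jobs, pending, "new_day").map(&:job_id)  # => ["c1"]
```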
Finds the jobs with the given tag and then returns an array of the wrapped Schedulable objects. Jobs that do not wrap a Schedulable are not included in the result.
# File lib/openwfe/util/scheduler.rb, line 572
def find_schedulables (tag)

  #jobs = find_jobs(tag)
  #result = []
  #jobs.each do |job|
  #  result.push(job.schedulable) if job.respond_to?(:schedulable)
  #end
  #result

  # collect the schedulable of each matching job that has one
  find_jobs(tag).inject([]) do |result, job|

    result.push(job.schedulable) if job.respond_to?(:schedulable)
    result
  end
end
Returns the job corresponding to job_id. The returned job is an instance of AtJob or CronJob, or nil if no job matches.
# File lib/openwfe/util/scheduler.rb, line 527
def get_job (job_id)

  job = @cron_jobs[job_id]
  return job if job

  @pending_jobs.find do |job|
    job.job_id == job_id
  end
end
Finds a job (via get_job()) and then returns the wrapped schedulable if any.
# File lib/openwfe/util/scheduler.rb, line 541
def get_schedulable (job_id)

  #return nil unless job_id

  j = get_job(job_id)

  return j.schedulable if j.respond_to?(:schedulable)

  nil
end
Joins on the scheduler thread, blocking the caller until that thread exits.
# File lib/openwfe/util/scheduler.rb, line 299
def join

  @scheduler_thread.join
end
Like join() but takes care of setting the ‘exit_when_no_more_jobs’ attribute of this scheduler to true before joining. Thus the scheduler will exit (and the join terminates) as soon as there are no more ‘at’ (or ‘every’) jobs in the scheduler.
Currently used only in unit tests.
# File lib/openwfe/util/scheduler.rb, line 312
def join_until_no_more_jobs

  @exit_when_no_more_jobs = true
  join
end
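The interplay between an "exit when nothing is left" flag and Thread#join can be tried out without the scheduler. In the sketch below a worker thread drains a list of jobs and leaves its loop once the list is empty, while the caller blocks on join. run_until_empty is a hypothetical helper written for this page, not part of OpenWFEru.

```ruby
# Hypothetical illustration of "join until no more jobs".
# jobs is an array of callables; returns the values they produced.
def run_until_empty(jobs, precision = 0.01)
  ran = []

  worker = Thread.new do
    loop do
      break if jobs.empty?      # exit as soon as nothing is left
      ran << jobs.shift.call
      sleep precision           # wake-up interval between checks
    end
  end

  worker.join                   # blocks until the worker exits
  ran
end

run_until_empty([-> { :a }, -> { :b }])  # => [:a, :b]
run_until_empty([])                      # => [] (worker exits immediately)
```

The second call shows the pitfall mentioned earlier on this page: with nothing scheduled, the loop ends on its first pass.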
Returns the number of currently pending jobs in this scheduler (‘at’ jobs and ‘every’ jobs).
# File lib/openwfe/util/scheduler.rb, line 592
def pending_job_count

  @pending_jobs.size
end
Schedules a cron job. The ‘cron_line’ is a string following the Unix cron standard (see "man 5 crontab" on your command line, or www.google.com/search?q=man%205%20crontab).
For example :
scheduler.schedule("5 0 * * *", s)
# will trigger the schedulable s every day
# five minutes after midnight
scheduler.schedule("15 14 1 * *", s)
# will trigger s at 14:15 on the first of every month
scheduler.schedule("0 22 * * 1-5") do
  puts "it's break time..."
end
# outputs a message every weekday at 10pm
Returns the job id attributed to this ‘cron job’. This id can be used to unschedule() the job.
# File lib/openwfe/util/scheduler.rb, line 469
def schedule (cron_line, params={}, &block)

  params = prepare_params(params)

  #
  # is a job with the same id already scheduled ?

  cron_id = params[:cron_id]
  cron_id = params[:job_id] unless cron_id

  #unschedule(cron_id) if cron_id
  @unschedule_queue << [ :cron, cron_id ]

  #
  # schedule

  b = to_block(params, &block)
  job = CronJob.new(self, cron_id, cron_line, params, &b)

  #@cron_jobs[job.job_id] = job
  @schedule_queue << job

  job.job_id
end
Schedules a job by specifying at which time it should trigger.
This method returns a job identifier which can be used to unschedule() the job.
If the given time is in the past, the job will be triggered immediately but not scheduled. To avoid the triggering, the parameter :discard_past may be set to true :
jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
  puts "you'll never read this message"
end
‘jobid’ will then hold nil (not scheduled).
# File lib/openwfe/util/scheduler.rb, line 343
def schedule_at (at, params={}, &block)

  do_schedule_at(
    at,
    prepare_params(params),
    &block)
end
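The handling of past times is delegated to do_schedule_at, which is not listed on this page. As a rough sketch of the decision described in the text, the hypothetical helper below classifies a trigger time as "schedule", "trigger now" or "discard". The name past_job_action, the symbols it returns and the treatment of a time equal to now are all assumptions for illustration.

```ruby
# Hypothetical decision helper; do_schedule_at itself is not shown here.
# at and now are times expressed as floats (seconds since the epoch).
def past_job_action(at, now, params = {})
  return :schedule if at > now                      # future: queue the job
  params[:discard_past] ? :discard : :trigger_now   # past: run now or drop
end

now = Time.now.to_f
yesterday = now - 24 * 3600

past_job_action(now + 60, now)                            # => :schedule
past_job_action(yesterday, now)                           # => :trigger_now
past_job_action(yesterday, now, :discard_past => true)    # => :discard
```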
Schedules a job in a loop. After an execution, the job will not execute again before the time specified in ‘freq’ has elapsed.
This method returns a job identifier which can be used to unschedule() the job.
If the job raises an exception, it will still be rescheduled. If you don’t want the job to be rescheduled in that case, set the parameter :try_again to false.
scheduler.schedule_every "500", :try_again => false do
  do_some_prone_to_error_stuff()
  # won't get rescheduled in case of exception
end
# File lib/openwfe/util/scheduler.rb, line 382
def schedule_every (freq, params={}, &block)

  f = duration_to_f freq

  params = prepare_params params
  schedulable = params[:schedulable]
  params[:every] = freq

  last_at = params[:last_at]
  next_at = if last_at
    last_at + f
  else
    Time.now.to_f + f
  end

  do_schedule_at(next_at, params) do |job_id, at|

    #
    # trigger ...

    hit_exception = false

    begin

      if schedulable
        schedulable.trigger params
      else
        block.call job_id, at, params
      end

    rescue Exception => e

      log_exception e

      hit_exception = true
    end

    # cannot use a return here !!! (block)

    unless \
      @dont_reschedule_every or
      (params[:dont_reschedule] == true) or
      (hit_exception and params[:try_again] == false)

      #
      # ok, reschedule ...

      params[:job_id] = job_id
      params[:last_at] = at

      schedule_every params[:every], params, &block
      #
      # yes, this is a kind of recursion

      # note that params[:every] might have been changed
      # by the block/schedulable code
    end

    job_id
  end
end
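The `unless` condition in the listing above decides whether the job gets another run. Extracted into a standalone predicate it is easier to test against the cases the documentation mentions. The helper name reschedule? is hypothetical, and the instance variable @dont_reschedule_every is passed as a plain argument here.

```ruby
# Standalone restatement of the reschedule condition; the name is made up.
def reschedule?(params, hit_exception, dont_reschedule_every = false)
  !(dont_reschedule_every ||
    params[:dont_reschedule] == true ||
    (hit_exception && params[:try_again] == false))
end

reschedule?({}, false)                          # => true  (normal run)
reschedule?({}, true)                           # => true  (exception, retried by default)
reschedule?({ :try_again => false }, true)      # => false (exception, no retry)
reschedule?({ :try_again => false }, false)     # => true  (no exception, so it continues)
reschedule?({ :dont_reschedule => true }, false) # => false (job unscheduled itself)
```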
Schedules a job by stating in how much time it should trigger.
This method returns a job identifier which can be used to unschedule() the job.
# File lib/openwfe/util/scheduler.rb, line 359
def schedule_in (duration, params={}, &block)

  do_schedule_at(
    Time.new.to_f + duration_to_f(duration),
    prepare_params(params),
    &block)
end
Starts this scheduler (or restarts it if it was previously stopped).
# File lib/openwfe/util/scheduler.rb, line 259
def sstart

  @stopped = false

  @scheduler_thread = Thread.new do

    if defined?(JRUBY_VERSION)

      require 'java'

      java.lang.Thread.current_thread.name = \
        "openwferu scheduler (Ruby Thread)"
    end

    loop do

      break if @stopped

      step

      sleep @precision
      # TODO : adjust precision
    end
  end
end