Class OpenWFE::Scheduler
In: lib/openwfe/util/scheduler.rb
Parent: Object

The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs. ‘at’ jobs execute once at a given point in time. ‘cron’ jobs execute at specified intervals. The two main methods are thus schedule_at() and schedule().

schedule_at() and schedule() accept either a Schedulable instance and params (usually an array or nil), or a block, which is more in the Ruby way.

Examples

   scheduler.schedule_in("3d") do
       regenerate_monthly_report()
   end
       #
       # will call the regenerate_monthly_report method
       # in 3 days from now

    scheduler.schedule "0 22 * * 1-5" do
        log.info "activating security system..."
        activate_security_system()
    end

    job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
        init_self_destruction_sequence()
    end

An example that uses a Schedulable class :

   class Regenerator < Schedulable
       def trigger (frequency)
           self.send(frequency)
       end
       def monthly
           # ...
       end
       def yearly
           # ...
       end
   end

   regenerator = Regenerator.new

   scheduler.schedule_in("4d", regenerator)
       #
       # will regenerate the report in four days

   scheduler.schedule_in(
       "5d",
       { :schedulable => regenerator, :scope => :month })
           #
           # will regenerate the monthly report in 5 days

There is also schedule_every() :

    scheduler.schedule_every("1h20m") do
        regenerate_latest_report()
    end
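
The duration strings used above ("3d", "4d", "1h20m") combine a count with a unit. The following is a minimal sketch of how such a string could be turned into seconds. The helper name duration_to_seconds and the set of supported units (s, m, h, d) are assumptions made for illustration; this is not the library's own duration_to_f, whose source is not shown on this page.

```ruby
# Hypothetical helper, NOT the library's duration_to_f: converts a
# duration string such as "1h20m" into a number of seconds.
# Only the units s, m, h and d are handled in this sketch.
UNIT_SECONDS = { "s" => 1, "m" => 60, "h" => 3600, "d" => 86_400 }.freeze

def duration_to_seconds(duration)
  # Reject anything that is not a run of <digits><unit> pairs.
  unless duration =~ /\A(?:\d+[smhd])+\z/
    raise ArgumentError, "unsupported duration: #{duration.inspect}"
  end

  # Sum the seconds contributed by each <digits><unit> pair.
  duration.scan(/(\d+)([smhd])/).inject(0.0) do |total, (count, unit)|
    total + count.to_i * UNIT_SECONDS[unit]
  end
end

puts duration_to_seconds("3d")     # three days, expressed in seconds
puts duration_to_seconds("1h20m")  # one hour twenty, expressed in seconds
```

Rejecting unknown input up front keeps a typo such as "3 days" from silently scheduling a job at the wrong time.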

The scheduler has an "exit_when_no_more_jobs" attribute. When set to ‘true’, the scheduler will exit as soon as there are no more jobs to run. Use it with care though: if you create a scheduler, set this attribute to true and start the scheduler, the scheduler will immediately exit. This attribute is best used indirectly : the method join_until_no_more_jobs() wraps it.

The :scheduler_precision can be set when instantiating the scheduler.

    scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500)
    scheduler.start
        #
        # instantiates a scheduler that checks its jobs twice per second
        # (the default is 4 times per second (0.250))

Tags

Since OpenWFEru 0.9.16, tags can be attached to scheduled jobs :

    scheduler.schedule_in "2h", :tags => "backup" do
        init_backup_sequence()
    end

    scheduler.schedule "0 0 * * *", :tags => "new_day" do
        do_this_or_that()
    end

    jobs = scheduler.find_jobs 'backup'
    jobs.each { |job| job.unschedule }

Multiple tags may be attached to a single job :

    scheduler.schedule_in "2h", :tags => [ "backup", "important" ]  do
        init_backup_sequence()
    end

The vanilla case for tags assumes they are String instances, but nothing prevents you from using anything else. The scheduler has no persistence by itself, so there is no serialization issue.

Cron up to the second

Since OpenWFEru 0.9.16, a cron schedule can be set at the second level :

    scheduler.schedule "7 * * * * *" do
        puts "it's now the seventh second of the minute"
    end

The OpenWFEru scheduler recognizes an optional first column for second scheduling. Like the other columns, this column can specify a value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
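
To make the three column forms concrete, here is a small sketch that expands one cron column into the values it matches. The helper name expand_cron_field is hypothetical and the code is not taken from the scheduler; it only illustrates the value, list and range notations described above, plus "*" for "any value".

```ruby
# Hypothetical helper: expands one cron column into the list of values
# it matches. Returns nil for "*", meaning "any value".
def expand_cron_field(field)
  return nil if field == "*"

  field.split(",").flat_map do |item|
    if item.include?("-")
      # A range such as "7-12" expands to every value in between.
      first, last = item.split("-", 2).map { |v| Integer(v, 10) }
      (first..last).to_a
    else
      # A single value; base 10 so that "08" is not read as octal.
      [Integer(item, 10)]
    end
  end
end

p expand_cron_field("7")
p expand_cron_field("7,8,9,27")
p expand_cron_field("7-12")
```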

Exceptions

The OpenWFEru scheduler will output a stacktrace to STDOUT when a job raises an exception. There are two ways to change that behaviour.

    # 1 - providing a lwarn method to the scheduler instance :

    class << scheduler
        def lwarn (&block)
            puts "oops, something wrong happened : "
            puts block.call
        end
    end

    # 2 - overriding the [protected] method log_exception(e) :

    class << scheduler
        def log_exception (e)
            puts "something wrong happened : "+e.to_s
        end
    end
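
Both approaches rely on Ruby singleton methods: the override applies to one instance and leaves the class untouched. The sketch below shows the technique on a stand-in class, so it can be run without OpenWFEru. TinyRunner and its run method are invented for this illustration and are not part of the scheduler.

```ruby
# Stand-in for the scheduler, used only to demonstrate the override.
class TinyRunner
  # Runs the block and hands any error to log_exception.
  def run
    yield
  rescue StandardError => e
    log_exception(e)
  end

  protected

  # Default behaviour: print the error to standard output.
  def log_exception(e)
    puts "#{e.class}: #{e.message}"
  end
end

runner = TinyRunner.new

# Redefine log_exception for this one instance only.
class << runner
  attr_reader :errors

  def log_exception(e)
    (@errors ||= []) << e.message
  end
end

runner.run { raise "boom" }
p runner.errors
```

Other TinyRunner instances keep the default logging, which is the point of overriding on the instance rather than reopening the class.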

‘Every jobs’ and rescheduling

‘Every’ jobs can reschedule or unschedule themselves. A reschedule example :

    scheduler.schedule_every "5h" do |job_id, at, params|

        mails = $inbox.fetch_mails
        mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }

        params[:every] = if mails.size > 100
            "1h" # lots of spam, check every hour
        else
            "5h" # normal schedule, every 5 hours
        end
    end

Unschedule example :

    scheduler.schedule_every "10s" do |job_id, at, params|
        #
        # polls every 10 seconds until a mail arrives

        $mail = $inbox.fetch_last_mail

        params[:dont_reschedule] = true if $mail
    end

Methods

Attributes

precision  [RW]  By default, the precision is 0.250, which means the scheduler will check for jobs to execute 4 times per second.
stopped  [RW]  As its name implies: true while the scheduler is stopped (the initial state, until start() is called).

Public Class methods

is_cron_string(s)

Returns true if the given string seems to be a cron string.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 624
     def Scheduler.is_cron_string (s)

         s.match(".+ .+ .+ .+ .+")
     end
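
The pattern used here is deliberately loose, which is why the description says "seems to be". The sketch below restates it as a standalone predicate (the name cron_string? is made up for this example) to show what it accepts: any string containing at least five space-separated chunks, including six-column cron lines and, as a side effect, some date strings.

```ruby
# Same loose pattern as in the listing above, restated as a standalone
# predicate. The name cron_string? is hypothetical.
def cron_string?(s)
  # String#match converts the string pattern to a Regexp.
  !s.match(".+ .+ .+ .+ .+").nil?
end

p cron_string?("0 22 * * 1-5")   # five columns
p cron_string?("7 * * * * *")    # six columns (seconds first)
p cron_string?("3d")             # a duration, not a cron line
p cron_string?("Sun Oct 07 14:24:01 +0900 2009")  # also has five+ chunks
```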

new(params={})

[Source]

     # File lib/openwfe/util/scheduler.rb, line 225
     def initialize (params={})

         super()

         @pending_jobs = []
         @cron_jobs = {}

         @schedule_queue = Queue.new
         @unschedule_queue = Queue.new
             #
             # sync between the step() method and the [un]schedule
             # methods is done via these queues, no more mutex

         @scheduler_thread = nil

         @precision = 0.250
             # every 250ms, the scheduler wakes up (default value)
         begin
             @precision = Float(params[:scheduler_precision])
         rescue Exception => e
             # leave precision at its default value
         end

         @exit_when_no_more_jobs = false
         @dont_reschedule_every = false

         @last_cron_second = -1

         @stopped = true
     end
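
The precision fallback in the constructor relies on Float() raising when it is given nil or a non-numeric string. The sketch below isolates that behaviour. The helper name resolve_precision and the constant are invented for illustration, and the sketch rescues only ArgumentError and TypeError, which is narrower than the rescue in the listing above.

```ruby
# Hypothetical helper isolating the precision fallback.
DEFAULT_PRECISION = 0.250

def resolve_precision(params = {})
  # Float(nil) raises TypeError; Float("fast") raises ArgumentError.
  Float(params[:scheduler_precision])
rescue ArgumentError, TypeError
  DEFAULT_PRECISION
end

p resolve_precision                                  # no option given
p resolve_precision(:scheduler_precision => 0.500)   # numeric value
p resolve_precision(:scheduler_precision => "0.1")   # numeric string
p resolve_precision(:scheduler_precision => "fast")  # not a number
```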

Public Instance methods

at_job_count()

Returns the current count of ‘at’ jobs scheduled (not ‘every’).

[Source]

     # File lib/openwfe/util/scheduler.rb, line 616
     def at_job_count

         @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
     end

cron_job_count()

Returns the number of cron jobs currently active in this scheduler.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 600
     def cron_job_count

         @cron_jobs.size
     end

every_job_count()

Returns the current count of ‘every’ jobs scheduled.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 608
     def every_job_count

         @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
     end
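
The two counters use different type tests: at_job_count uses instance_of? while every_job_count uses is_a?. That difference matters if ‘every’ jobs are implemented as a subclass of ‘at’ jobs, which is an assumption here since the job classes are not shown on this page. The sketch below uses stub classes to show how the two tests would then behave.

```ruby
# Stub classes for illustration only. The subclass relationship is an
# assumption, not something confirmed by the listings on this page.
class StubAtJob; end
class StubEveryJob < StubAtJob; end

pending = [StubAtJob.new, StubEveryJob.new, StubEveryJob.new]

# instance_of? matches the exact class only, so subclasses are excluded.
at_count = pending.count { |j| j.instance_of?(StubAtJob) }

# is_a? matches the class and any of its subclasses.
every_count = pending.count { |j| j.is_a?(StubEveryJob) }
loose_count = pending.count { |j| j.is_a?(StubAtJob) }

puts "at: #{at_count}, every: #{every_count}, is_a?(StubAtJob): #{loose_count}"
```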

find_jobs(tag)

Returns an array of jobs that have the given tag.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 555
     def find_jobs (tag)

         result = @cron_jobs.values.find_all do |job|
             job.has_tag?(tag)
         end

         result + @pending_jobs.find_all do |job|
             job.has_tag?(tag)
         end
     end
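
The lookup walks two collections: cron jobs kept in a hash and pending jobs kept in an array. The sketch below reproduces that shape with a stub job type so it can be run standalone. StubJob, its has_tag? implementation and jobs_tagged are all invented for this example; the real job classes may store and compare tags differently.

```ruby
# Stub job for illustration; accepts a single tag, a list of tags or nil.
StubJob = Struct.new(:job_id, :tags) do
  def has_tag?(tag)
    Array(tags).include?(tag)
  end
end

cron_jobs = { "c1" => StubJob.new("c1", ["new_day"]) }

pending_jobs = [
  StubJob.new("a1", ["backup", "important"]),
  StubJob.new("a2", "backup"),
  StubJob.new("a3", nil)
]

# Hypothetical equivalent of find_jobs over the two collections.
def jobs_tagged(tag, cron_jobs, pending_jobs)
  (cron_jobs.values + pending_jobs).select { |job| job.has_tag?(tag) }
end

p jobs_tagged("backup", cron_jobs, pending_jobs).map(&:job_id)
p jobs_tagged("new_day", cron_jobs, pending_jobs).map(&:job_id)
```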

find_schedulables(tag)

Finds the jobs with the given tag and then returns an array of the wrapped Schedulable objects. Jobs that have no wrapped Schedulable won't be included in the result.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 572
     def find_schedulables (tag)

         #jobs = find_jobs(tag)
         #result = []
         #jobs.each do |job|
         #    result.push(job.schedulable) if job.respond_to?(:schedulable)
         #end
         #result

         find_jobs(tag).inject([]) do |result, job|

             result.push(job.schedulable) if job.respond_to?(:schedulable)
             result
         end
     end

get_job(job_id)

Returns the job corresponding to job_id. An instance of AtJob or CronJob will be returned, or nil if no job matches.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 527
     def get_job (job_id)

         job = @cron_jobs[job_id]
         return job if job

         @pending_jobs.find do |job|
             job.job_id == job_id
         end
     end

get_schedulable(job_id)

Finds a job (via get_job()) and then returns the wrapped schedulable if any.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 541
     def get_schedulable (job_id)

         #return nil unless job_id

         j = get_job(job_id)

         return j.schedulable if j.respond_to?(:schedulable)

         nil
     end

join()

Joins on the scheduler thread.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 299
     def join

         @scheduler_thread.join
     end

join_until_no_more_jobs()

Like join() but takes care of setting the ‘exit_when_no_more_jobs’ attribute of this scheduler to true before joining. Thus the scheduler will exit (and the join terminates) as soon as there are no more ‘at’ (or ‘every’) jobs in the scheduler.

Currently used only in unit tests.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 312
     def join_until_no_more_jobs

         @exit_when_no_more_jobs = true
         join
     end

pending_job_count()

Returns the number of currently pending jobs in this scheduler (‘at’ jobs and ‘every’ jobs).

[Source]

     # File lib/openwfe/util/scheduler.rb, line 592
     def pending_job_count

         @pending_jobs.size
     end

schedule(cron_line, params={}, &block)

Schedules a cron job. The ‘cron_line’ is a string following the Unix cron standard (see "man 5 crontab" on your command line, or www.google.com/search?q=man%205%20crontab).

For example :

   scheduler.schedule("5 0 * * *", s)
       # will trigger the schedulable s every day
       # five minutes after midnight

   scheduler.schedule("15 14 1 * *", s)
       # will trigger s at 14:15 on the first of every month

   scheduler.schedule("0 22 * * 1-5") do
       puts "it's break time..."
   end
       # outputs a message every weekday at 10pm

Returns the job id attributed to this ‘cron job’; this identifier can be used to unschedule() the job.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 469
     def schedule (cron_line, params={}, &block)

         params = prepare_params(params)

         #
         # is a job with the same id already scheduled ?

         cron_id = params[:cron_id]
         cron_id = params[:job_id] unless cron_id

         #unschedule(cron_id) if cron_id
         @unschedule_queue << [ :cron, cron_id ]

         #
         # schedule

         b = to_block(params, &block)
         job = CronJob.new(self, cron_id, cron_line, params, &b)

         #@cron_jobs[job.job_id] = job
         @schedule_queue << job

         job.job_id
     end

schedule_at(at, params={}, &block)

Schedules a job by specifying at which time it should trigger. Returns a job identifier which can be used to unschedule() the job.

If the job is specified in the past, it will be triggered immediately but not scheduled. To avoid the triggering, the parameter :discard_past may be set to true :

    jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
        puts "you'll never read this message"
    end

And ‘jobid’ will hold a nil (not scheduled).

[Source]

     # File lib/openwfe/util/scheduler.rb, line 343
     def schedule_at (at, params={}, &block)

         do_schedule_at(
             at,
             prepare_params(params),
             &block)
     end

schedule_every(freq, params={}, &block)

Schedules a job in a loop. After an execution, it will not execute again before the time specified in ‘freq’ has elapsed.

This method returns a job identifier which can be used to unschedule() the job.

In case of exception in the job, it will be rescheduled. If you don't want the job to be rescheduled, set the parameter :try_again to false.

    scheduler.schedule_every "500", :try_again => false do
        do_some_prone_to_error_stuff()
            # won't get rescheduled in case of exception
    end

[Source]

     # File lib/openwfe/util/scheduler.rb, line 382
     def schedule_every (freq, params={}, &block)

         f = duration_to_f freq

         params = prepare_params params
         schedulable = params[:schedulable]
         params[:every] = freq

         last_at = params[:last_at]
         next_at = if last_at
             last_at + f
         else
             Time.now.to_f + f
         end

         do_schedule_at(next_at, params) do |job_id, at|

             #
             # trigger ...

             hit_exception = false

             begin

                 if schedulable
                     schedulable.trigger params
                 else
                     block.call job_id, at, params
                 end

             rescue Exception => e

                 log_exception e

                 hit_exception = true
             end

             # cannot use a return here !!! (block)

             unless \
                 @dont_reschedule_every or
                 (params[:dont_reschedule] == true) or
                 (hit_exception and params[:try_again] == false)

                 #
                 # ok, reschedule ...

                 params[:job_id] = job_id
                 params[:last_at] = at

                 schedule_every params[:every], params, &block
                     #
                     # yes, this is a kind of recursion

                     # note that params[:every] might have been changed
                     # by the block/schedulable code
             end

             job_id
         end
     end
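
The rescheduling decision in the listing above is a three-way condition. The sketch below restates it as a standalone predicate so the individual cases are easier to see. The method name reschedule? and its argument order are invented for this example; only the condition itself is taken from the listing.

```ruby
# Restates the "unless ..." condition of schedule_every as a predicate.
# The name and signature are hypothetical.
def reschedule?(params, hit_exception, dont_reschedule_every = false)
  # Scheduler-wide switch: no 'every' job is rescheduled.
  return false if dont_reschedule_every

  # The job asked not to be rescheduled.
  return false if params[:dont_reschedule] == true

  # The job failed and opted out of retries.
  return false if hit_exception && params[:try_again] == false

  true
end

p reschedule?({}, false)                           # normal run
p reschedule?({}, true)                            # failed, retried by default
p reschedule?({ :try_again => false }, true)       # failed, no retry
p reschedule?({ :dont_reschedule => true }, false) # job unscheduled itself
```

Note that :try_again only matters when the job actually raised; a job with :try_again => false that runs cleanly keeps being rescheduled.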

schedule_in(duration, params={}, &block)

Schedules a job by stating in how much time it should trigger. Returns a job identifier which can be used to unschedule() the job.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 359
     def schedule_in (duration, params={}, &block)

         do_schedule_at(
             Time.new.to_f + duration_to_f(duration),
             prepare_params(params),
             &block)
     end

sstart()

Starts this scheduler (or restarts it if it was previously stopped).

[Source]

     # File lib/openwfe/util/scheduler.rb, line 259
     def sstart

         @stopped = false

         @scheduler_thread = Thread.new do

             if defined?(JRUBY_VERSION)

                 require 'java'

                 java.lang.Thread.current_thread.name = \
                     "openwferu scheduler (Ruby Thread)"
             end

             loop do

                 break if @stopped

                 step

                 sleep @precision
                     # TODO : adjust precision
             end
         end
     end
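
The start/stop mechanism is a background thread that loops until a flag is set, sleeping for the precision interval between passes. The sketch below reproduces that pattern in a standalone class. TinyTicker is invented for this illustration, and it counts passes where the scheduler would call step.

```ruby
# Minimal stand-in showing the loop / stop-flag / join pattern.
class TinyTicker
  attr_reader :ticks

  def initialize(precision)
    @precision = precision
    @stopped = true
    @ticks = 0
    @thread = nil
  end

  def start
    @stopped = false
    @thread = Thread.new do
      loop do
        break if @stopped
        @ticks += 1      # the scheduler would call step here
        sleep @precision
      end
    end
  end

  def stop
    @stopped = true
  end

  def join
    @thread.join
  end
end

ticker = TinyTicker.new(0.01)
ticker.start
sleep 0.01 while ticker.ticks.zero?  # wait for at least one pass
ticker.stop
ticker.join
puts "passes: #{ticker.ticks}"
```

Because the flag is only checked once per pass, stopping takes effect within roughly one precision interval rather than immediately.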

sstop()

The scheduler is stoppable via sstop().

[Source]

     # File lib/openwfe/util/scheduler.rb, line 288
     def sstop

         @stopped = true
     end

start()

Alias for sstart

stop()

Alias for sstop

unschedule(job_id)

Unschedules an ‘at’ or a ‘cron’ job identified by the id it was given at schedule time.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 504
     def unschedule (job_id)

         @unschedule_queue << [ :at, job_id ]
     end

unschedule_cron_job(job_id)

Unschedules a cron job.

[Source]

     # File lib/openwfe/util/scheduler.rb, line 512
     def unschedule_cron_job (job_id)

         @unschedule_queue << [ :cron, job_id ]
     end
