part of the flow

The motivation for writing this piece of documentation came from a discussion on the mailing list.

The classical block participant example is quite synchronous: the ruote (OpenWFEru) engine feeds a workitem to the participant, which does something with it and gives it back to the engine almost immediately.

Ruote is an asynchronous beast. Workflow engines are asynchronous beasts. They cannot get stuck waiting for one single reply.

The engine itself cannot distinguish between local and remote participants. It only hands the workitem to the participant and, some time later, when an updated version of the workitem comes back, it lets the flow resume where it stopped, at the participant expression.

The participant / engine interaction begins when a participant expression gets applied. Typically, when launching this process

  OpenWFE.process_definition :name => 'accident investigation case' do
    sequence do
      concurrence do
        participant 'reporter 1', :activity => 'gather info on accident site'
        participant 'reporter 2', :activity => 'gather info about customer'
      end
      participant 'hq', :activity => 'handle case'
    end
  end

the engine will end up applying the participants ‘reporter 1’ and ‘reporter 2’ concurrently.

The participant expression looks up a registered participant instance and makes it consume the workitem. What happens next is outside the scope of the engine: the participant expression simply waits for a reply from the participant (the expression may eventually time out).

‘local’ participants

These are participants whose activity is limited to the Ruby runtime of ruote. They usually reply to the engine by themselves, whereas ‘remote’ participants do it via a listener of some sort.

  1. participant expression gets applied
  2. the participant expression gets a reference to the participant instance from the participant map and hands the workitem to that instance
  3. the participant instance, once its job is done for the workitem, will hand back the workitem to the engine, which will call the reply method of the participant expression
  4. the participant expression resumes the flow by replying to its parent expression
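
The four steps above can be sketched with a toy model in plain Ruby. None of the classes below belong to ruote: ToyParticipantMap, ToyParticipantExpression and UpcaseParticipant are hypothetical stand-ins that only mirror the order of the calls. As a simplification, the participant replies straight to the expression instead of going through the engine, and an array plays the role of the parent expression.

```ruby
# Toy model of the 'local' participant round trip.
# All names are hypothetical, this is not ruote's implementation.

# Step 2 needs a participant map : name => participant instance.
class ToyParticipantMap
  def initialize
    @participants = {}
  end

  def register(name, participant)
    @participants[name] = participant
  end

  def lookup(name)
    @participants.fetch(name)
  end
end

# A 'local' participant : does its job, then replies by itself.
class UpcaseParticipant
  def consume(workitem, expression)
    workitem['subject'] = workitem['subject'].upcase
    expression.reply(workitem) # step 3 : hand the workitem back
  end
end

class ToyParticipantExpression
  def initialize(name, map, parent)
    @name = name
    @map = map
    @parent = parent
  end

  # Step 1 : the expression gets applied.
  def apply(workitem)
    @map.lookup(@name).consume(workitem, self) # step 2
  end

  # Step 4 : resume the flow by replying to the parent expression.
  def reply(workitem)
    @parent << workitem
  end
end

map = ToyParticipantMap.new
map.register('reporter 1', UpcaseParticipant.new)

parent_inbox = [] # stands in for the parent expression
expression = ToyParticipantExpression.new('reporter 1', map, parent_inbox)
expression.apply({ 'subject' => 'accident' })

puts parent_inbox.first['subject'] # prints "ACCIDENT"
```

In this sketch everything happens in a single thread, which is exactly the "quite synchronous" behaviour of a block participant.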

As said earlier, the vanilla example of a ‘local’ participant is the block participant. It executes a block of Ruby code (with access to the workitem) and then implicitly replies to the engine.

Here is an example of a block participant that uses Prawn to generate a PDF report:

  require 'prawn' # PDF generation library

  engine.register_participant :generate_report do |workitem|

    path = "report-#{workitem.fei.workflow_instance_id}.pdf"

    Prawn::Document.generate(path) do
      font 'Helvetica'
      text "about : #{workitem.subject}"
      text "date : #{Time.now}"
      text "summary : "
      text workitem.summary
    end

    workitem.report_path = path

    # block participants implicitly reply to the engine
  end

Most of the time, a block participant isn’t ideal. Some participants have an activity that lasts longer than a direct ask/reply exchange, and that activity should be cancellable. The following participant implements both ‘consume’ and ‘cancel’:

  require 'fileutils'
  require 'yaml'
  require 'openwfe/participants/participant'

  # An inbox where workitems stay only for 3 hours.
  # Workitems are then removed and their processes resume.
  #
  class SimpleInboxParticipant
    include OpenWFE::LocalParticipant

    def initialize(options = {})
      @dir = options[:inbox_dir] || 'work/inbox/'
      FileUtils.mkdir_p(@dir) # no-op when the directory already exists
    end

    # this is the method called when the participant expression hands a workitem
    # to this participant
    #
    def consume(workitem)

      # 'w' : open for writing (File.open is read-only by default)
      File.open(determine_file_name(workitem), 'w') do |f|
        f.write(YAML.dump(workitem))
      end

      sleep 3 * 3600 # expose workitem for 3 hours

      cancel(workitem) # delete file
      reply_to_engine(workitem) # let flow resume
    end

    # this method is called when a segment or the whole process which has
    # sent a workitem to this participant gets cancelled.
    #
    def cancel(cancelitem)
      # rm_f ignores errors : gone is gone
      FileUtils.rm_f(determine_file_name(cancelitem))
    end

    protected

    def determine_file_name(workitem)
      File.join(@dir, "#{workitem.fei.wfid}_#{workitem.fei.expid}.yaml")
    end
  end

  engine.register_participant :alice, SimpleInboxParticipant, :inbox_dir => 'work/alice'
  engine.register_participant :bob, SimpleInboxParticipant, :inbox_dir => 'work/bob'

This participant simply dumps each workitem as YAML in a file, leaves it there for three hours, then deletes the file and replies to the engine.

‘remote’ participants

These are participants that act as proxies for ‘real’ participants somewhere outside of the Ruby runtime.

  1. participant expression gets applied
  2. participant instance receives workitem
  3. participant dispatches to ‘real’ participant
  4. ‘real’ participant does its job with the workitem, and some time later, hands it back to the engine via a listener
  5. listener gives back the workitem to the engine which triggers the reply method of the waiting participant expression
  6. the participant expression resumes the flow by replying to its parent expression
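
The remote round trip listed above could look like the following toy model. Everything in it is an assumption made for illustration: ToyProxyParticipant is a hypothetical class, two in-memory queues stand in for whatever transport a real setup would use (mail, files, a message queue, ...) and a third queue stands in for the flow resuming. The point to notice is that ‘consume’ returns immediately and that the reply comes back through a different door, the listener.

```ruby
# Toy model of the 'remote' participant round trip.
# All names are hypothetical, this is not ruote's implementation.

outbound = Queue.new # proxy participant -> 'real' participant
inbound  = Queue.new # 'real' participant -> listener
resumed  = Queue.new # stands in for the flow resuming

# Steps 2 and 3 : the proxy only dispatches, it never replies itself.
class ToyProxyParticipant
  def initialize(channel)
    @channel = channel
  end

  def consume(workitem)
    @channel << workitem
  end
end

# Step 4 : the 'real' participant, living outside of the engine.
remote = Thread.new do
  workitem = outbound.pop
  workitem['handled_by'] = 'remote worker'
  inbound << workitem
end

# Steps 5 and 6, collapsed : the listener hands the workitem back
# and the flow resumes.
listener = Thread.new do
  resumed << inbound.pop
end

ToyProxyParticipant.new(outbound).consume({ 'subject' => 'accident' })
# consume has already returned here, the reply arrives on its own schedule

result = resumed.pop
[remote, listener].each(&:join)

puts result['handled_by'] # prints "remote worker"
```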

TODO : continue me …

participant timeout

In the process definition, it’s easy to set a timeout when ‘invoking’ a participant:

  OpenWFE.process_definition :name => 'reporting' do
    sequence do
      participant 'alice', :activity => 'gather information', :timeout => '2d'
      participant 'bob', :activity => 'write report'
      participant 'charly', :activity => 'distribute report'
    end
  end

Alice has two days to gather the information for the report. If she hasn’t replied/proceeded after two days, the engine will cancel her task and make the process resume (to Bob).
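
To make the cancel-then-resume behaviour concrete, here is a toy model in plain Ruby where a fraction of a second stands in for the two days. SlowParticipant and apply_with_timeout are hypothetical names, and blocking a thread on the reply is a shortcut taken for the sketch, not a description of how the engine tracks timeouts. Which workitem travels on to Bob after a timeout is also an assumption here (the one originally handed to Alice).

```ruby
require 'timeout'

# Toy model of a participant timeout.
# All names are hypothetical, this is not ruote's implementation.

class SlowParticipant
  attr_reader :cancelled

  def initialize(replies, delay)
    @replies = replies
    @delay = delay
    @cancelled = false
  end

  # Works in the background and replies once done.
  def consume(workitem)
    @worker = Thread.new do
      sleep @delay
      @replies << workitem.merge('answered' => true)
    end
  end

  # Called when the task is cancelled : stop working.
  def cancel
    @worker.kill
    @cancelled = true
  end
end

# Waits for a reply for at most 'limit' seconds. On timeout, cancels the
# participant and lets the flow resume with the workitem as it was.
def apply_with_timeout(participant, replies, workitem, limit)
  participant.consume(workitem)
  Timeout.timeout(limit) { replies.pop }
rescue Timeout::Error
  participant.cancel
  workitem
end

replies = Queue.new
alice = SlowParticipant.new(replies, 5) # alice would need 5 seconds
workitem = { 'activity' => 'gather information' }

result = apply_with_timeout(alice, replies, workitem, 0.2)

puts alice.cancelled         # prints "true"
puts result.key?('answered') # prints "false"
```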

participant dispatch thread

By default, when a participant expression hands a workitem to a participant, it does so in a separate thread, so that the thread that applied the expression is immediately relieved (it goes on to resume other process instances).

If a participant has a ‘do_not_thread’ method and this method returns true, no new thread gets created upon dispatching. This can be worthwhile for tiny, immediate-reply participants. If your participant’s ‘activity’ is a potential bottleneck (disk/network access, long computation), it’s recommended to stick to the default: the distinct, separate thread.

  require 'openwfe/participants/participant'

  class TinyCounterParticipant
    include OpenWFE::LocalParticipant

    def consume (workitem)
      @counter = (@counter || 0) + 1
      workitem.tiny_count = @counter
      reply_to_engine(workitem)
    end

    def do_not_thread
      true
    end
  end

This transient counter is the ideal candidate for ‘do_not_thread’: it simply does an addition and replies immediately. The cost of spawning a new thread just for that could become ‘visible’.
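
The dispatching rule itself fits in a few lines. The ‘dispatch’ method below is a hypothetical sketch of that rule, not the engine’s actual code, and RecordingParticipant / InlineParticipant are made-up classes that only remember in which thread they were called.

```ruby
# Toy dispatcher mirroring the 'do_not_thread' rule.
# All names are hypothetical, this is not ruote's implementation.

def dispatch(participant, workitem)
  if participant.respond_to?(:do_not_thread) && participant.do_not_thread
    participant.consume(workitem) # runs in the caller's thread
    nil
  else
    Thread.new { participant.consume(workitem) } # default : separate thread
  end
end

# Remembers the thread in which consume got called.
class RecordingParticipant
  attr_reader :consumed_in

  def consume(_workitem)
    @consumed_in = Thread.current
  end
end

class InlineParticipant < RecordingParticipant
  def do_not_thread
    true
  end
end

threaded = RecordingParticipant.new
inline = InlineParticipant.new

dispatch(threaded, {}).join # wait for the spawned thread to finish
dispatch(inline, {})

puts threaded.consumed_in == Thread.current # prints "false"
puts inline.consumed_in == Thread.current   # prints "true"
```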

writing custom participants

TODO : a few notes about writing custom participants (especially #initialize)