[business] process administration

The model of a business process usually focuses on a “happy path”, everything happening according to the plan.

A few alternative paths may be added explicitely. Provisions for errors and timeouts may explicitely be added, thanks to :on_error and :on_timeout.

Sometimes, this is not enough. Processes may fall into errors, may stall (a participant reply got lost and there is no timeout) or they may simply become obsolete due to changes in their context.

Deciding whether an exception (error, timeout, …) is dealt explicitely in the process definition often requires experience.

This page is about manipulating process instances.

Here is the span of possible actions:

  • querying the engine about running process instances. Not an action, rather an observation

querying the engine about running process instances

To list all the processes currently running in the engine (rdoc):

              statuses = engine.processes
            

To query about a process, known by its wfid (workflow instance id) (rdoc):

              status = engine.process(wfid)
            

Those two methods return ProcessStatus instances.


cancelling

Cancelling is about terminating the execution of a process instance or of a branch of it.

Cancelling cannot occur in one operation. The engine traverses the tree of execution and cancels alive expressions one by one. Active participants receive a cancel message indicating which the id of the workitem to cancel.

cancelling: process instances

              engine.cancel_process(wfid)
            

Warning: as explained cancelling a process or a branch of a process isn’t instantaneous.

cancelling: expressions

Perhaps, the most common use case for “cancelling an expression” is cancelling a participant expression.

Let’s say you have this process:

              Ruote.process_definition do
                sequence do
                  logistics
                  delivery
                  accounting
                end
              end
            

For some reason, this process was triggered for an online product, which was downloaded successfully. The delivery is not necessary, but the delivery participant has already received the workitem for this process instance.

Cancelling the delivery participant expression would let the flow continue to “accounting”. Programmatically that would look like:

              status = engine.process(wfid)
            
              exp = status.expressions.first
            
              engine.cancel_expression(exp.fei)
            

Warning: as explained cancelling a process or a branch of a process isn’t instantaneous.

Dashboard#cancel(wfid_or_fei)

The cancel method of the dashboard will, when passed a string, order the engine to cancel the process with the given (string) wfid (if it exists). When passed an expression, a FlowExpressionId instance or a Hash containing expid/subid/wfid, it will cancel the given expression (if it exists).

Likewise Dashboard#kill(wfid_or_fei) is as flexible as #cancel.

on_cancel

Most of the time, the process definition attempts to model the “happy path”, the process as it occurs 95% of the time. One may want to go a step further and include provisions for errors (see on_error) and for cancellations.

              Ruote.process_definition do
                sequence :on_cancel => 'admin' do
                  logistics
                  delivery
                  accounting
                end
              end
            

In this process definition, if the process or the sequence gets cancelled, the admin subprocess (or the participant admin) will be applied (or simply receive a workitem).

More info about on_cancel.

killing vs cancelling

Processes (and expressions) can be cancelled or they can be killed. Killing looks much like cancelling, except that any on_cancel will be ignored.


pausing process instance

Pausing a process instance is equivalent to pausing its root expression. Pausing freezes an expression (and all its children expressions that are in a running state (not in error, or being cancelled)).

A paused expression upon receiving a reply will store it and not process it until it gets resumed.

            dashboard.pause('20120618-makumepa')
              # will pause the corresponding process instance
            
            # ...
            
            dashboard.resume('20120618-makumepa')
              # will resume the corresponding process instance
            

pausing expressions

            dashboard.pause(fei)
              # will pause the expression (and its children) behind "fei"
            
            # ...
            
            dashboard.resume(fei)
              # will resume the expressionn (allow it to reply to its parent expression)
            

dashboard.pause(fei, :breakpoint => true)

By default, pausing pauses the target expression and all its children, if one wants to set a “breakpoint”, ie let the flow resume but pause when it reaches back an expression, the breakpoint option can be used.

            dashboard.pause(fei, :breakpoint => true)
            

Only the targetted expression will get paused, not its children. It means that the flow in the branch (the children of the expression) will go on, but once the flow gets back to the expression with the breakpoint, it will stop.

(Not sure if “breakpoint” is the best word for this, since we’re not running a debugger, it’s just a regular workflow run).


re_applying

The Engine#re_apply(fei, options) method cancels an expression and re_applies. It has a few options for dealing with some process repair tasks.

Most of the technique described here are tested in ruote’s ft_14_re_apply.rb

re_applying a stalled [participant] expression

Sometimes participants don’t reply and the participant expression that emitted the workitem to them is stuck waiting. Or simply the engine could have been down when the participant answer came and the answer got lost.

With a re_apply, it’s possible to re-apply the delivery and hopefully restart the interaction (though that could ultimately depend on the participant implementation).

              pdef = Ruote.process_definition do
                sequence do
                  alpha
                  bravo
                end
              end
            
              wfid = engine.launch(pdef)
            
              # ... process stalls at participant alpha ...
            
              stalled_fexp = engine.process(wfid).expressions.last
                #
                # expressions in process status (as returned by Engine#process) are
                # ordered with the root as the first and the leaves in the last positions
            
              engine.re_apply(stalled_fexp.fei)
                #
                # will cancel the expression at 'alpha' and re_apply it, triggering
                # a new dispatch to the participant alpha points to.
            

re_applying a process branch that went wrong

If a process branch went wrong, it’s sometimes necessary to redo it.

Some process developers like to include those “something went wrong” paths as extension to the happy path and they use the cursor expression with some rewind magic. But sometimes it’s really necessary to rewind/redo manually.

            require 'rubygems'
            require 'yajl'
            require 'ruote'
            
            engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
            
            pdef = Ruote.define do
              sequence do
                alpha
                bravo
                zebulon
              end
            end
            
            class ThisParticipant
              include Ruote::LocalParticipant
              def consume(workitem)
                puts "* #{workitem.participant_name}"
                reply_to_engine(workitem)
              end
            end
            class ThatParticipant
              include Ruote::LocalParticipant
              def consume(workitem)
                puts ". #{workitem.participant_name}"
                # no reply to engine
              end
              def cancel (fei, flavour)
                # ok, let pass
              end
              def do_not_thread; true; end
            end
            
            engine.register_participant 'alpha', ThisParticipant
            engine.register_participant 'bravo', ThisParticipant
            engine.register_participant 'charlie', ThisParticipant
            engine.register_participant 'zebulon', ThatParticipant
            
            #engine.context.logger.noisy = true
            
            wfid = engine.launch(pdef)
            
            engine.wait_for(:zebulon)
            
            # ... process reached 'zebulon', but there something wrong
            #     we need to redo that sequence differently ...
            
            ps = engine.process(wfid)
            
            sequence = ps.expressions.find { |fexp| fexp.fei.expid == '0_0' }
              # first child '0' after the root '0' => '0_0'
            
            puts "re_applying..."
            
            engine.re_apply(sequence.fei, :tree => Ruote.to_tree do
              sequence do
                charlie
                bravo
                alpha
              end
            end)
            
            engine.wait_for(wfid)
            

(gist at http://gist.github.com/485559)

This self-containing example will output:

            * alpha
            * bravo
            . zebulon
            re_applying...
            * charlie
            * bravo
            * alpha
            

The key here is the :tree option passed to re_apply, it gives that has to be applied instead of the target expression.

re_applying and changing the [workitem] fields

When re_applying, the workitem used for the re_apply is the one used to originally apply.

The re_apply method accepts the :fields and :merge_in_fields options. :fields completely changes the workitem payload used for the re_apply, while the :merge_in_fields, allows to add new fields to the workitem payload.

A few examples:

            engine.re_apply(fei, :fields => { 'customer' => 'Alfred' })
            
            engine.re_apply(fei, :merge_in_fields => { 'customer' => 'Alfred' })
            
            engine.re_apply(
              fei,
              :fields => { 'customer' => 'Alfred', 'items' => [ 124, 356, 798 ] },
              :tree => Ruote.to_tree { sequence { print_invoice; package; ship } })
            

 


process errors

Each time a process encounters an error (most likely in a participant implementation), the process stops and ‘documents’ the error.

If a process is in two concurrent branches, only the execution of the branch with the error will stop. A process instance can have as many errors has it has concurrent branches.

Here is a program that runs a process with a unreliable participant:

            require 'rubygems'
            require 'ruote'
            
            class TestParticipant
              include Ruote::LocalParticipant
            
              def consume(workitem)
                raise "Houston, something is wrong"
              end
            end
            
            engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
            
            engine.register do
              catchall TestParticipant
            end
            
            pdef = Ruote.process_definition do
              bad
            end
            
            wfid = engine.launch(pdef)
            
            engine.wait_for(wfid)
              # blocks until the process terminates or gets into an error
            
            err = engine.process(wfid).errors.first
            puts "intercepted an error: #{err.message}..."
            

It’s possible to query the engine about the status of a process instance and its errors.

Once a process got into an error, there are 3 possibilities:

plus one that has to be prepared in advance:

  • having acknowledged that such an error might occur, set an :on_error attribute on the branch where the error might occur

Which possibility to use depends on your process and context.

errors: cancelling the whole process

The car won’t start ? Let’s dump it.

              engine.cancel_process(wfid)
            

errors: cancelling

First variant, cancelling at the error point:

              ps = engine.process(wfid)
              engine.cancel_expression(ps.errors.first.fei)
            

This will make the flow resume right after the expression in error.

Second variant, cancelling upstream. It sounds nice but it actually requires knowledge of the process definition.

Consider this process definition:

              Ruote.process_definition do
                concurrence do
                  alpha
                  bravo
                  sequence do
                    charly
                    delta
                  end
                end
              end
            

If an error occurred at charly, you could cancel at charly directly, or the sequence, or the concurrence.

Without looking at the process definition and knowing the participants involved, it’s hard to cancel “upstreams”.

This alpha-bravo-charly example is expanded at https://github.com/jmettraux/ruote/blob/ruote2.1/snips/where_to_cancel.rb

errors: replaying

Launching a process, intercepting the error, fixing its cause and replaying:

            require 'rubygems'
            require 'ruote'
            
            $mode = :fail
            
            class TestParticipant
              include Ruote::LocalParticipant
            
              def consume(workitem)
                raise "Houston, something is wrong" if $mode == :fail
                reply_to_engine(workitem)
              end
            end
            
            engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
            
            engine.register do
              catchall TestParticipant
            end
            
            pdef = Ruote.process_definition do
              bad
            end
            
            wfid = engine.launch(pdef)
            
            engine.wait_for(wfid)
            
            err = engine.process(wfid).errors.first
            puts "intercepted an error: #{err.message}... fixing..."
            
            $mode = :success
              # fixing the cause of the error
            
            engine.replay_at_error(err)
            
            engine.wait_for(wfid)
            
            p engine.process(wfid)
              # => nil (the process is over)
            

on_error

The :on_error attribute is explained in the “common attributes” page.

(isn’t prevention of errors better than dealing with them automatically ?)