Processes
Control over the timing of execution of activities is another area that differentiates workflows from traditional programs. A cloud workflow allows for describing explicitly how multiple activities are orchestrated: whether they run sequentially or concurrently and when to synchronize.
The building blocks for describing the sequencing are the sub and concurrent expressions. All sub-expressions of a sub expression execute sequentially while all sub-expressions of a concurrent expression execute concurrently. sub expressions can be nested inside concurrent expressions and vice versa providing the mean for describing what runs concurrently and when to synchronize.
concurrent do # the following two blocks execute concurrently 
  sub do # Expressions below (until 'end') execute sequentially 
    @servers = rs_cm.servers.get(filter: ["name==my_server"])
    @instances = @servers.launch()
    sleep_until(all?(@instances.state, "operational"))
  end 
  sub do 
    @servers = rs_cm.servers.get(filter: ["name==my_other_server"])
    @instances = @servers.launch()
    sleep_until(all?(@instances.state, "operational"))
  end 
end # Marks the synchronization point for the concurrence 
# At this point all servers are operational: both concurrent sequences are finished. 
Process Tasks
All activities taking place in a process occur in tasks. Each iteration of a concurrent foreach expression runs in its own task. Processes start with one task: the main task. The task_name attribute can be used to adorn sub-expressions of the concurrent expression to name tasks. This attribute can be specified on any expression (changing the name of the current task each time). However, the convention is to adorn the outer sub expression if there is one.
That name can then be used to wait for that task:
# Update code on app servers 
define update_code(@app_servers, $branch) do 
  # Obtain the current branch (through a tag in one of the app server) 
  call get_current_branch(first(@app_servers)) retrieve $previous_branch 
 
  # Concurrently pull code on all app servers 
  concurrent foreach @server in @app_servers task_prefix: "codepull", timeout: 10m do 
    call run_recipe(@server, "app::pull_code", { branch: $branch })
  end 
 
  # Perform a rolling update on all app servers (detech from loadbalancer, restart service, attach to load balancer) 
  foreach @server in @app_servers task_prefix: "restart", on_rollback: rollback_code_update(), on_error: handle_update_error() do 
    call run_recipe(@server, "lb::detach", {})
    call run_recipe(@server, "app::restart", {})
    call run_recipe(@server, "lb::attach", {})
  end 
end 
 
define rollback_code_update() do 
  call run_recipe(@server, "app:pull_code", { branch: $previous_branch })
  call run_recipe(@server, "app::restart", {})
  call run_recipe(@server, "lb::attach", {})
end 
 
define handle_update_error() do 
  cancel_task
end 
 
...
The snippet above pulls code on all application services concurrently. Once the code pull is completed, the service is restarted on each application service sequentially. Any error during this process will cancel the task using cancel_task keyword which rolls the code back to the previous known working version.
Click the appropriate link below to learn more about process tasks.
Task Names
Tasks can be referred to using two different names: the local name (used in the example above) is the name used with the task_name attribute. This name can only be used to refer to a task that is a sibling, that is a task that was launched by the same task that also launched the task using the name. The other way to address a task is to use its global name: this name is defined using the parent tasks names recursively (excluding the main task) combined with the current task name using / as separator:
concurrent do 
  sub task_name: "grand_parent" do 
    concurrent do 
      sub task_name: "parent" do 
        concurrent do 
          sub task_name: "child" do 
            # do something 
          end 
          concurrent do 
            sub do 
              wait_task grand_parent/parent/child # cannot use local name for 'child' task because 
                                                  # not in a sibling task 
              # do something 
            end 
            sub do
              # do something 
            end 
          end 
        end 
      end 
    end 
  end 
end 
Tasks that are not explicitly named using the task_name attribute get assigned a unique name by the engine. The task_name() function (functions are covered in the next section) returns the global name of the current task.
Task names must be strings or variables containing strings:
$name = "foo" 
concurrent do 
  sub task_name: $name do 
    # do something 
  end 
  sub  do  
    wait_task $name 
    # do something 
  end 
end 
Controlling Tasks
As mentioned earlier tasks can be canceled or aborted. The respective keywords are cancel_task and abort_task and they will apply to the task executing this action.
A process can also be canceled or aborted in its entirety using respectively the cancel or abort keyword. Executing any of these has the same effect as executing the corresponding task version in all currently running tasks. In particular, this means that canceling or aborting a process will take effect once all tasks have finished running their current expression. The exact behavior of canceling and aborting are described below in Ending Processes.
# If an error occurs during launch then cancel all launching tasks and terminate all servers. 
define cancel_launch(@servers) do 
  # Wait for the other task to finish before terminating the servers and canceling the process. 
  if task_name() == "/root/launch_app_server" 
    wait_task launch_app_fe 
  else 
    wait_task launch_app_server 
  end 
  @servers.terminate() 
  cancel 
end 
 
...
@servers = rs_cm.deployments.get(filter: ["name==default"]).servers() # Retrieve all servers of "default" deployment 
concurrent return @app_servers do 
  sub task_name: "launch_app_server", on_error: cancel_launch(@servers) do 
    @app_servers = select(@servers, { "name": "app_server" }) # Select servers named "app_server" 
    @app_servers.launch()                                     # and launch them 
  end 
  sub task_name: "launch_app_fe", on_error: cancel_launch(@servers) do 
    @fe_servers = select(@servers, { "name": "front_end" })  # Select servers named "front_end" 
    @fe_servers.launch()                                     # and launch them 
  end 
end 
sleep_until(all?(@app_servers.state[], "operational"))       # Wait until all app servers are operational 
Checking for Tasks
As covered earlier a task can be waited on using the wait_task keyword. The current task blocks until the given task finishes (i.e. completes, fails, is canceled, or is aborted). If the given task is not even started, the current task will wait until it is started before waiting for it to finish. This keyword will have no effect if the task has already completed, but will raise an error if there is no task with the given name.
@servers = rs_cm.deployments.get(filter: ["name==default"]).servers()
concurrent do 
  sub task_name: "launch_app_server" do 
    @servers = select(@servers, { name: "app_server" })
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
    # Wait for load balancers to become available 
    wait_task launch_app_fe
    call run_recipe(@servers, "lb::attach", {})
  end 
  sub task_name: "launch_app_fe" do 
    @servers = select(@servers, { name: "front_end" })
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end 
end 
wait_task can also be used with a number indicating the number of tasks that should be waited on. The task running the wait_task expression blocks until the given number of tasks complete. Note that this form is mostly useful when used as an attribute on a concurrent expression to indicate how many concurrent tasks should complete before the next expression runs.
Finally, wait_task also accepts an array of task names corresponding to the tasks that should complete prior to the execution resuming. This form can also be used as an attribute:
@servers = rs_cm.deployments.get(filter: "name==default").servers()
concurrent wait_task: [launch_app_server, launch_app_fe] do 
  sub task_name: "launch_app_server" do 
    @servers = select(@servers, { "name": "app_server" })
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end 
  sub task_name: "launch_app_fe" do 
    @servers = select(@servers, { "name": "front_end" })
    @servers.launch()
  end 
  sub do 
    @servers = select(@servers, { "name": "diagnostics_servers" })
    @servers.launch()
  end 
end 
# At this point the diagnostics servers may not have been launched yet (the last sub block may not have completed) 
Synchronization Primitives
The most basic synchronization primitive is a bare concurrent expression. Each sub-expression of concurrent expression runs in its own task. This expression will block until all sub-expressions have completed. Sometimes more control is needed. For example, it may suffice for one of the concurrent expressions to finish before proceeding. The concurrent expression wait_task attribute can be used in two different ways to provide the additional control:
- 
When wait_taskis followed by an integer, the concurrent expression will return after the corresponding number of tasks have completed.
- 
When wait_taskis followed by a list of task names, the concurrent expression will return after the corresponding tasks have completed.
In the following example:
concurrent wait_task: 1 do 
  sub do 
    @servers = find("servers", "app_server_1")
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end 
  sub do 
    @servers = find("servers", "app_server_2")
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end 
end 
 
# At this point at least one of the sequences above has completed 
@servers = @servers = find("servers", "front_end")
@servers.launch()
The front-ends will be launched as soon as either all servers tagged with app:role=app_server_1 or servers tagged with app:role=app_server_2 are operational. As stated above tasks can be waited on using their names:
concurrent wait_task: databases, app_servers do 
  sub task_name: "databases" do 
    @servers = find("servers", "database")
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end 
  sub task_name: "app_servers" do 
    @servers = find("servers", "app_server")
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end 
  sub task_name: "additional" 
    @servers = find("servers", "additional")
    @servers.launch()
  end 
end 
 
# At this point the databases and app_servers tasks have completed 
@servers = find("servers", "front_end")
@servers.launch()
One interesting application of the wait_task attribute is when used in conjunction with the number 0 as follows:
concurrent wait_task: 0 do 
  sub do 
    @servers = find("servers", "database")
    @servers.launch()
  end 
  sub do 
    @servers = find("servers", "app_server")
    @servers.launch()
  end 
end 
 
# At this point tasks above have not completed 
@servers = find("servers", "diag_server")
@servers.launch()
In this case, the process proceeds past the concurrent expression without waiting for any of the launched tasks. This is the same behavior as wrapping the whole definition extract above in an outer concurrent.
Tasks and State
Whenever a task is launched it gets its own copy of the parent task state. This includes all references and all variables currently defined in the parent task.
$n = 3 
@servers = rs_cm.deployments.get(filter: ["name==default"]).servers()
concurrent do 
  sub do 
    # $n is equal to 3 and @servers contain all servers in the "default" deployment 
    $n = 5 
    @servers = rs_cm.deployments.get(filter: ["name==other"]).servers()
    # $n is equal to 5 and @servers contain all servers in the "other" deployment 
  end 
  sub do 
    # $n is equal to 3 and @servers contain all servers in the "default" deployment 
  end 
end 
Once a task finishes its state is discarded, however, it is sometimes necessary to retrieve state from a different task. RCL provides two mechanisms to share state across tasks:
- The values for global references and variables are stored in the process, they can be written to and read from by any task.
- A concurrentsub expression may optionallyreturnlocal variables or references. Such values are merged back into the parent task. If multiple tasks in the concurrence return the same value then the behavior is undefined; in other words the code needs to use different names for values returned by different tasks.
Here is an example using the return keyword:
define main(@server1, @server2) return @server1, @server2 do 
  concurrent return @server1, @server2 do 
    sub do 
      provision(@server1)
    end 
    sub do 
      provision(@server2)
    end 
  end 
  # @server1 and @server2 are now operational 
end 
concurrent foreach
Another way to create tasks in a process apart from concurrent is through the concurrent foreach expression. This expression runs all sub-expressions in sequence on all resources in the given resources collection concurrently. In other words a task is created for each resource in the collection:
@instances = rs_cm.get(href: "/api/deployments/123").servers().current_instance()
concurrent foreach @instance in @instances do 
  @instance.run_executable(recipe_name: "cassandra::default")
  @instance.run_executable(recipe_name: "cassandra::configure")
  @instance.run_executable(recipe_name: "cassandra::restart")
end \
In the snippet above, the three RightScripts get run sequentially on all servers in the collection at once. If the @servers collection in the example above contained two resources the following would have the same effect:
@instances = rs_cm.get(href: "/api/deployments/123").servers().current_instance()
concurrent do 
  sub do 
    @instance = @instances[0]
    @instance.run_executable(recipe_name: "cassandra::default")
    @instance.run_executable(recipe_name: "cassandra::configure")
    @instance.run_executable(recipe_name: "cassandra::restart")
  end 
  sub do 
    @instance = @instance[1]
    @instance.run_executable(recipe_name: "cassandra::default")
    @instance.run_executable(recipe_name: "cassandra::configure")
    @instance.run_executable(recipe_name: "cassandra::restart")
  end 
end 
Sometimes it is necessary to explicitly refer to one of the tasks that was spawned from the concurrent foreach execution. The task_prefix attribute is only valid for the concurrent foreach expression and allows defining a common prefix for all generated tasks. The task names are built from the prefix and the index of the resource in the collection:
concurrent do
  sub task_name: "run_scripts" do 
    @servers = rs_cm.get(href: "/api/deployments/123").servers()
    concurrent foreach @server in @servers task_prefix: cassandra_setup do 
      @instance = @server.current_instance()
      @instance.run_executable(recipe_name: "cassandra::default")
      @instance.run_executable(recipe_name: "cassandra::configure")
      @instance.run_executable(recipe_name: "cassandra::restart")
    end 
  end 
  sub do 
    wait_task run_scripts/cassandra_setup0 # Wait for execution of scripts on first server in collection above to complete 
  end 
end
In the example above, cassandra_setup0 refers to the task generated to run the concurrent foreach sub-expressions on the first resource in the @servers collection.
concurrent map
Apart from concurrent and concurrent foreach, concurrent map is the only other way to create tasks in a process. A concurrent map works as expected: each iteration runs concurrently and the resulting collections are built from the results of each iteration.
Even though the resulting collections are built concurrently, concurrent map guarantees that the order of elements in the final collection(s) match the order of the collection being iterated on.
So, for example:
@servers = rs_cm.get(href: "/api/deployments/123").servers()
 
# Launch all servers concurrently and conditionally run a script on the resulting 
# instance once it is operational. 
@instances = concurrent map @server in @servers return @instance do 
  @instance = @server.launch()
  sleep_until(@instance.state == "operational")
  if @instance.name =~ "/^app_/" 
    @instance.run_executable(recipe_name: "app::connect")
  end 
end 
In the example above the instances in the @instances collection will be ordered identically to the servers in the @servers collection (that is, the instance at a given index in the @instances collection will correspond to the server at the same index in the @servers collection).
Ending Processes
A process ends once all its tasks end. There are four conditions that will cause the execution of a task to end:
- Completing the task: the task has no more expressions to run.
- Failing the task: an expression raised an error that was not handled.
- Canceling the task: this can be done through the cancelandcancel_taskkeywords.
- Aborting the task: this can be done through the abortandabort_taskkeywords.
For more information, see:
Canceling a Task or a Process
Canceling a task can be done at any time in any task using the cancel_task keyword. This provides a way to finish cleanly a task that still has expressions to run. The cloud workflow can define rollback handlers that get triggered when the task cancels. These handlers behave much like timeout or error handlers: they may take arbitrary arguments and inherit the local variables and references of the caller. Nested rollback handlers are executed in reverse order as shown in this example:
define delete_deployment($deployment_name) do 
  @deployment = rs_cm.deployments.get(filter: ["name==" + $deployment_name])
  @deployment.destroy()
end 
 
define delete_servers($server_names) do 
  foreach $name in $server_names do 
    @server = rs_cm.servers.get(filter: ["name==" + $name])
    @sever.destroy()
  end 
end 
sub on_rollback: delete_deployment($deployment_name) do# Assumes $deployment_name exists 
  rs_cm.deployments.create(deployment: { "name": $deployment_name })
  # ... do more stuff 
  sub on_rollback: delete_servers($server_names) do    # Assumes $server_names exists 
    foreach $name in $server_names do 
      # Assumes $server_params exists and is a hash of all required params to create a server 
      rs_cm.servers.create(server: { "name": $name } + $server_params)
      # ... do more stuff 
    end 
    # ... do more stuff, initialize $has_errors 
    if $has_errors 
      cancel_task # Will call both delete_servers and delete_deployment in this order 
    end 
  end 
end 
In this snippet, if $has_errors gets initialized then the process is canceled and both the delete_servers and the delete_deployment get run in that order.
Canceling a process is done using the cancel keyword. This causes all the running tasks to be canceled and follow the same logic as above, potentially executing multiple rollback handlers concurrently. Once all rollback handlers finish then the process ends and the status of all its tasks is set to canceled.
Aborting a Task or a Process
Tasks can also be terminated through the abort_task keyword. This causes the task to finish immediately bypassing all rollback handlers. The abort keyword causes all the tasks in the current process to be aborted. The process thus finishes immediately and the status of all its tasks is set to aborted.
Concurrency and Expressions
As described in Cloud Workflow Language Overview and Definitions, a definition consists of a sequence of statements. Each statement is in turn made of expressions. The engine makes the following guarantee: expressions always run atomically.
In particular, if we consider any expression running in a concurrence (inside a concurrent, concurrent foreach, or concurrent map), then the rule above dictates that each concurrent expression runs atomically. So if we consider:
concurrent do 
  sub do 
    @servers = rs_cm.servers.get(filter: "name==app_server")
    @@instances << @servers.launch() # (1) 
  end 
  sub do 
    @@instances << rs_cm.get(href: "/api/servers/123").launch() # (2) 
  end 
end 
In the definition above, statement (1) is composed of two expressions: the call to the launch() action followed by the assignment to @@instances. Statement (2) is composed of 3 expressions: the call to get() followed by the call to launch() and finally the append to the @@instances collection. Since expressions run atomically the definition above guarantees that the @@instances collection will end-up with all instances, there is no need to explicitly synchronize the appends to @@instances. There is no guarantee about ordering though so it could be that the single instance retrieved in statement 2 is first in the collection.
Note that the following could generate inconsistent results:
# DO NOT DO THIS 
@instances = rs_cm.get(href: "/api/deployments/123").servers(filter: ["state==operational"]).current_instance()
 
$$failed = 0 
 
concurrent foreach @instance in @instances do 
  @task = @instance.run_executable(recipe_name: "sys:do_reconverge_list_disable")
  sleep_until(@task.summary =~ "/^completed|^failed/")
  if @task.summary =~ "/^failed" 
    $$failed = $$failed + 1 # (1) Oops, $$failed can be overridden concurrently 
  end 
end 
In the example above, statement (1) is comprised of two expressions: the increment and the assignment. If two tasks were to increment concurrently after reading the same value then one of the increments would get lost (both tasks would write back the same value to $$failed). The concurrent map expression should be used instead to build results concurrently:
# DO THIS INSTEAD 
@instances = rs_cm.get(href: "/api/deployments/123").servers(filter: ["state==operational"]).current_instance()
 
$failed_ones = concurrent map @instance in @instances return $failed_one do 
  @task = @instance.run_executable(recipe_name: "sys:do_reconverge_list_disable")
  sleep_until(@task.summary =~ "/^completed|^failed/")
  if @task.summary =~ "/^failed" 
    $failed_one = 1 
  end # Do not return anything on success 
end 
$failed = size($failed_ones)
The concurrent map expression takes care of building the resulting array from the results returned by each concurrent execution. There is no problem of the task overriding values concurrently in this case.
Processes Summary
A process may run one or more tasks concurrently at any time. Cloud Workflow Language allows for describing how these tasks should be synchronized by providing both synchronization primitives and keywords for controlling tasks individually. A process ends once all its tasks end. A task ends when it completes (no more expression to execute), fails (an expression raises an error that is not handled), is canceled or is aborted. Canceling a task will cause any rollback handler to trigger and do additional processing before the task ends.
The concept of tasks and definitions are completely disjoint and should not be confused: a definition always runs in the task that ran the call expression. In other words, simply using call does not create a new task.