Elixir/OTP : Basics of Tasks

Arunmuthuram M
24 min read · Feb 9, 2024


Tasks in Elixir are abstractions built on top of processes. Unlike GenServers, which are mostly used as long-lived state machines, tasks are short-lived processes spawned to perform a single operation without holding state or communicating extensively with other processes. They are mainly used to execute different operations concurrently instead of performing them sequentially.

Task.start

A task can be started in many ways using the functions of the Task module. The Task.start/1 takes in a zero-arg anonymous function, spawns a process and executes the given anonymous function in the spawned process, which exits as soon as the execution is complete. The Task.start/1 immediately returns a tuple in the format {:ok, task_pid} after spawning the task process.

{:ok, task_pid} = Task.start(fn -> IO.puts("executing task") end)
executing task

Process.alive?(task_pid)
false

There is no link between the caller process and the spawned process. Hence, even if one of the processes exits due to a crash, the other process is unaffected. If the spawned process fails and exits due to an error while executing the anonymous function, an error will be logged.

self()
#PID<0.109.0>

{:ok, task_pid} = Task.start(fn -> raise("error") end)
18:05:44.110 [error] Task #PID<0.149.0> started from #PID<0.109.0> terminating
** (RuntimeError) error
(elixir 1.16.0) src/elixir.erl:405: :elixir.eval_external_handler/3
Function: #Function<43.125776118/0 in :erl_eval.expr/6>
Args: []

self()
#PID<0.109.0> # task process crash does not affect the caller process
---------------------------------------------------------------------------
self()
#PID<0.109.0>

{:ok, task_pid} = Task.start(fn -> Process.sleep(60000) end)

Process.exit(self(), :kill) # caller process crash
** (EXIT from #PID<0.109.0>) shell process exited with reason: killed

self()
#PID<0.111.0>

Process.alive?(task_pid) # Task process unaffected by caller process's crash
true

The Task module also offers an alternate version Task.start/3, which takes in a module, function atom and a list of arguments instead of an anonymous function. The Task.start/1 will internally call the Task.start/3 function using :erlang.apply/2 as follows.

Task.start(anonymous_function)
||
\/
Task.start(:erlang, :apply, [anonymous_function, []])

The Task.start function is mainly used for performing operations with side effects and in situations where the caller process does not require the result of the task operation.
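As a quick sketch of the MFA variant, with IO.puts/1 standing in for any side-effecting operation:

{:ok, task_pid} = Task.start(IO, :puts, ["executing task via mfa"])
executing task via mfa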

The Task module also offers the Task.start_link/1 and Task.start_link/3 functions that are similar to the start function, but they create a process link between the caller and the spawned process. Hence if one of the processes crashes abnormally, an exit signal will be sent to the other process, making it also crash if it does not explicitly trap exits. These functions are used to create tasks under a supervision tree which will be discussed in detail in another article.

self()
#PID<0.109.0>

{:ok, task_pid} = Task.start_link(fn -> raise("error") end)
18:05:44.110 [error] Task #PID<0.149.0> started from #PID<0.109.0> terminating
** (RuntimeError) error
(elixir 1.16.0) src/elixir.erl:405: :elixir.eval_external_handler/3
Function: #Function<43.125776118/0 in :erl_eval.expr/6>
Args: []
** (EXIT from #PID<0.109.0>) shell process exited with reason: an exception was raised:
** (RuntimeError) error
(elixir 1.16.0) src/elixir.erl:405: :elixir.eval_external_handler/3

self()
#PID<0.111.0> # task process crash crashes the caller process
---------------------------------------------------------------------------
self()
#PID<0.109.0>

{:ok, task_pid} = Task.start_link(fn -> Process.sleep(60000) end)

Process.exit(self(), :kill) # caller process crash
** (EXIT from #PID<0.109.0>) shell process exited with reason: killed

self()
#PID<0.111.0>

Process.alive?(task_pid) # Task process killed by the caller process's crash
false

Task.async

The Task.async/1 function is used when the caller process requires the result of a spawned task. Similar to the Task.start function, it takes a no-arg anonymous function as its argument. It spawns a process, creates a process link, enables process monitoring on the task process, initiates execution of the anonymous function in the task process and returns a Task struct to the caller process. After the anonymous function finishes execution in the task process, the task process returns the result of the execution back to the caller process through a message, and then exits normally.

Unlike the Task.start function that returns {:ok, task_pid}, the Task.async call immediately returns a Task struct containing the module, function name and arity being executed by the task process (:mfa), the pid of the caller process (:owner), the pid of the task process (:pid) and the unique task monitor reference created when process monitoring was enabled on the task process (:ref).

Task.async(fn -> Process.sleep(4000); IO.puts("task execution") end)
%Task{ # returned immediately
mfa: {:erlang, :apply, 2},
owner: #PID<0.111.0>,
pid: #PID<0.112.0>,
ref: #Reference<0.0.14211.3176322712.2278621188.70801>
}

task execution # printed after 4 seconds

Since process linking is enabled, if one of the processes crashes and if the other process is not trapping exits, then the other process will also crash due to the propagated exit signal. Since process monitoring is enabled for the task process, the caller process will receive a :DOWN message when the task process finishes execution and exits normally or when it crashes and exits abnormally.
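As a rough sketch of this behaviour, if the caller traps exits, a crashing async task leaves both an :EXIT message (from the link) and a :DOWN message (from the monitor) in the caller's mailbox instead of crashing it:

Process.flag(:trap_exit, true)

task = Task.async(fn -> raise("error") end)

# the caller stays alive; its mailbox now holds an {:EXIT, task_pid, reason}
# message from the link and a {:DOWN, task_ref, :process, task_pid, reason}
# message from the monitor
Process.info(self(), :messages)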

Similar to the Task.start function, the Task module offers a Task.async/3 function that takes in a module, function and an arguments list instead of an anonymous function. Task.async/1 internally calls the three-arity version using :erlang.apply/2.
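For example, the MFA variant can be used with any existing function; the snippet below simply sums a list with Enum.sum/1:

task = Task.async(Enum, :sum, [[1, 2, 3]])

Task.await(task)
6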

Task.await

As we have seen above, the Task.async executes an anonymous function in a spawned process and returns the result back to the caller process through messages.

 Task.async(fn -> 2 + 2 end)
%Task{
mfa: {:erlang, :apply, 2},
owner: #PID<0.115.0>,
pid: #PID<0.116.0>,
ref: #Reference<0.0.14723.3176322712.2278621188.70902>
}

Process.info(self(), :messages)
{:messages,
[
{#Reference<0.0.14723.3176322712.2278621188.70902>, 4},
{:DOWN, #Reference<0.0.14723.3176322712.2278621188.70902>, :process,
#PID<0.116.0>, :normal}
]}

As you can see above, after the task process finishes execution, there are two messages sent from the task process to the caller process. The first message contains the result of the task execution and the second message indicates that the task process has exited after execution. Instead of manually reading the result message from the mailbox, Task.await/2 can be used to achieve the same. If you look closely at the code above, both messages sent by the task process contain the same unique task monitor reference present in the Task struct that was returned when the task was created. Task.await internally uses this unique reference to pattern match the correct result message sent from a task process.
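To make this concrete, here is a rough sketch of the matching that Task.await performs, done manually with receive against the task's monitor reference (Task.await additionally handles the timeout and the :DOWN message for you):

task = Task.async(fn -> 2 + 2 end)
ref = task.ref

receive do
  {^ref, result} -> result
end
4

Process.demonitor(ref, [:flush]) # what Task.await also does after a successful match
true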

Task.await/2 takes two arguments: the Task struct that was returned when the task was created and an optional timeout in milliseconds, which defaults to 5000. The Task.await call is synchronous and hence blocks the caller process until it is done. As soon as Task.await is called on a task struct, a timer is started for the default 5000 milliseconds or for the timeout explicitly passed as the second argument. The caller process then goes through its messages and tries to find a result message that matches the unique task monitor reference present in the task struct. If there is no such message yet, the caller process keeps waiting for it for the provided time. If the timeout occurs before a result message is matched, the caller process will crash, unless the exit is caught. Since process linking is enabled, the task process will also crash in this case if it does not trap exits. If the caller process does find a matching result message before the timeout, it extracts the second element of the message tuple and returns it from the Task.await call. When a result message is found, the task process is demonitored and the :DOWN message sent by the task process on exit is also removed from the mailbox, if present.

task = Task.async(fn -> 3 + 3 end)

Task.await(task)
6
---------------------------------------------------------------------------
task = Task.async(fn -> Process.sleep(60000) end)

Task.await(task, 1000)
** (exit) exited in: Task.await(%Task{mfa: {:erlang, :apply, 2}, owner: #PID<0.115.0>, pid: #PID<0.127.0>, ref: #Reference<0.0.14723.3176322712.2278621188.71245>}, 1000)
** (EXIT) time out
(elixir 1.16.0) lib/task.ex:874: Task.await_receive/3
iex:20: (file) # caller process exits after waiting for 1000 milliseconds

Process.alive?(task.pid) # task process exits due to process linking
false

# please note that the iex shell process catches exits by default and so will
# not crash in the above scenario. But when the above code is executed in a
# non-iex shell process, it will crash as expected

Task.async and Task.await can be used to perform operations concurrently instead of a sequential execution as shown below.

defmodule Test do
  def sequential() do                                  # takes a total of 8 seconds
    result1 = http_request(:get, "https://url_1.com")  # takes 2 seconds
    result2 = http_request(:get, "https://url_2.com")  # takes 2 seconds
    do_something_else()                                # takes 2 seconds
    process_results(result1, result2)                  # takes 2 seconds
  end

  def tasks() do                                       # takes a total of 4 seconds
    task1 = Task.async(fn -> http_request(:get, "https://url_1.com") end)
    task2 = Task.async(fn -> http_request(:get, "https://url_2.com") end)
    do_something_else()                                # takes 2 seconds
    process_results(Task.await(task1), Task.await(task2)) # takes 2 seconds
  end
end
---------------------------------------------------------------------------

#Sequential
|result1|
--------|result2|
----------------|do_something_else|
----------------------------------|process_results|
#concurrent
| task1 |
| task2 |
|do_something_else|
------------------|process_results|

In the code above, the tasks/0 function starts two tasks, one per http request, and each Task.async call returns its task struct immediately without blocking the caller, since the requests are performed concurrently by separate task processes. While do_something_else is executing, the http requests are also being executed concurrently in the two tasks. By the time do_something_else completes, the http requests have also completed and their results have been sent back to the caller process. When Task.await is then called on both tasks, the result messages are already present in the mailbox, so the results of the http requests can be passed straight into the process_results function, cutting down the total time the function takes to finish.

In a GenServer process, any received message that is not a call or cast request is handled by the handle_info callback. Hence, when Task.async is used from within a GenServer process, the result of the task can be read in the handle_info callback. Since Task.await is synchronous, using it inside a GenServer to read the result of a task would block the GenServer process, making it unable to process the other messages in its mailbox and ultimately causing performance issues. Instead, when using Task.async in GenServers, the returned task structs can be stored in the process state and the result messages can be matched in handle_info clauses using the unique task monitor reference. The :DOWN messages delivered when the task process exits, due to process monitoring, can also be matched and handled in handle_info. Please note that since process linking is enabled when using Task.async, a crash in the task process will lead to a crash in the GenServer process. This can be avoided by trapping exits in the GenServer process or by using Task.Supervisor.async_nolink/3, which will be discussed in detail in another article on supervisors.

defmodule Test do
  use GenServer

  def start(), do: GenServer.start(__MODULE__, nil, name: __MODULE__)

  def request(url), do: GenServer.cast(__MODULE__, {:get, url})

  defp get_request(_) do
    random = Enum.random(1..10)
    if random < 8, do: "result#{random}", else: raise "error#{random}"
  end

  @impl true
  def init(_) do
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:get, url}, state) do
    task = Task.async(fn -> get_request(url) end)
    {:noreply, Map.put(state, task.ref, url)}
  end

  @impl true
  def handle_info({ref, result}, state) do
    # demonitors the task process and removes the task's
    # normal-exit :DOWN message from the mailbox
    Process.demonitor(ref, [:flush])
    url = Map.get(state, ref)
    IO.inspect(result, label: url)
    {:noreply, Map.delete(state, ref)}
  end

  @impl true
  def handle_info({:DOWN, ref, _, _, _reason}, state) do # handles task failure
    url = Map.get(state, ref)
    IO.inspect("Error occurred", label: url)
    {:noreply, Map.delete(state, ref)}
  end

  @impl true
  def handle_info(_, state), do: {:noreply, state}
end

---------------------------------------------------------------------------
Test.start()

Test.request("url1.com")
:ok
url1.com: result3

Test.request("url2.com")
:ok
url2.com: result6

Test.request("url3.com")
:ok
url3.com: Error occurred

Test.request("url4.com")
:ok
url4.com: result5

Task.yield

The Task.yield/2 function is used as an alternative to Task.await/2 in scenarios where you need to check the result of a task multiple times. Task.await/2 cannot be used multiple times, since the caller process will exit if it does not find a result message in the mailbox before the timeout occurs, and the exit has to be explicitly caught for the caller process to stay alive. Task.yield/2 behaves the same way as await, but on timeout it does not crash the caller process and instead just returns nil. Hence it can be called multiple times on the same task until it finds the result message from the task.

Similar to Task.await/2, the Task.yield/2 takes in a task struct as its first argument and an optional timeout in milliseconds whose default value is 5000. Once the function is called, it starts a timer, starts looking for the result message by using the task monitor reference. If it finds a matching message, then it returns {:ok, result}, else it waits for the message until the timeout occurs, and then returns nil. If the task process has already exited normally or due to an error, then Task.yield/2 returns {:exit, reason}, provided that the caller process traps exits or has itself unlinked from the task process. Similar to Task.await, if a result message is found, the task process will be demonitored and its :DOWN message will be removed from the mailbox.

task = Task.async(fn -> 3 + 3 end)

Task.yield(task)
{:ok, 6}
---------------------------------------------------------------------------
task = Task.async(fn -> Process.sleep(6000); :result end)

Task.yield(task, 2000) # waits for 2 seconds
nil

Task.yield(task, 2000)# waits for 2 seconds
nil

Task.yield(task, 2000)
{:ok, :result}

Task.shutdown

Task.shutdown/2 is used to explicitly terminate a running task process. It is commonly used along with the Task.yield function. It takes in a task struct as its first argument and a timeout in milliseconds as its second argument, whose default value is 5000. The function sends an exit signal to the task process with :shutdown as the reason and waits for the provided time for the task process to terminate. If the task process is still alive after the timeout, another exit signal is sent with :kill as the reason to abruptly kill the process. The value :brutal_kill can also be passed as the second argument instead of a timeout in milliseconds to kill the task process directly without giving it time to terminate. In addition to killing the task process, Task.shutdown also checks for the result message one last time in the mailbox and returns the result if found. It returns {:ok, result} if a result message is found in the mailbox, {:exit, reason} if there is no result message and the task process has already exited, or nil if there is no result message and the task process did not send one before its shutdown.

task = Task.async(fn -> 3 + 3 end)

Task.shutdown(task) # result is available in the mailbox, task is dead
{:ok, 6}

Task.shutdown(task) # task is already dead and the result has already been read
{:exit, :noproc}

task = Task.async(fn -> Process.sleep(600000) end)

Task.shutdown(task) # task did not return result before its shutdown
nil
Process.alive?(task.pid) # task process terminated by the shutdown function
false
----------------------------------------------------------------------------
task = Task.async(fn -> Process.sleep(6000); :result end)
result = Task.yield(task) || Task.shutdown(task)
nil # waits 5 seconds for the result and shuts down the task since
    # there is no result

task = Task.async(fn -> Process.sleep(4000); :result end)
result = Task.yield(task) || Task.shutdown(task)
{:ok, :result} # the result arrives before 5 seconds, so it is returned and
               # Task.shutdown will not be called. The task will exit
               # normally after returning the result.

Task.ignore

The Task.ignore/1 function is used to remove the process link and the monitoring of the task process without shutting it down explicitly. The task process will still be alive after calling Task.ignore, but it will not send a result message back to the caller once it has finished execution. Since the processes are unlinked, a crash in one process will not affect the other, and since the task process is demonitored, the caller process will not receive a :DOWN message when the task process exits. The Task.ignore/1 function takes in a task struct and returns {:ok, result} if there is already a result message in the mailbox, {:exit, reason} if there is no result message but there is a :DOWN message in the mailbox indicating that the process is already dead, or nil if there is neither a result message nor a :DOWN message in the mailbox. Once a task process has been ignored, Task.await and Task.yield, when called, will fail to find a result message, as the task process will not be sending any. Task.shutdown can still be called on the ignored process to kill it, if it is still alive.

task = Task.async(fn -> 3 + 3 end)

Task.ignore(task)
{:ok, 6} # the result message is already present in the mailbox
-------------------------------------------------------------------------
task = Task.async(fn -> exit(:normal) end)

Process.info(self(), :messages)
{:messages,
[
{:DOWN, #Reference<0.0.13955.866113612.3498901505.113859>, :process,
#PID<0.151.0>, :normal}
]}

Task.ignore(task) # no result message, but a matching :DOWN message
{:exit, :normal}
--------------------------------------------------------------------------
task = Task.async(fn -> 3 + 3 end)

Task.await(task)
6

Process.info(self(), :messages)
{:messages, []}

Task.ignore(task) # no result message and no :DOWN message
nil
-------------------------------------------------------------------------
task = Task.async(fn -> Process.sleep(10000); :result end)

Task.ignore(task) # no result message and no :DOWN message;
nil # task process is unlinked and demonitored


Process.alive?(task.pid) # task process still alive
true

Task.yield(task, 10000) # task process will not send result message even
nil # after successful execution

Process.alive?(task.pid) # task process exited normally after execution
false

Process.info(self(), :messages) # no result and :DOWN messages from the
{:messages, []} # task process

Task.await_many

Task.await_many/2 does the same thing as Task.await, but for multiple tasks. It takes in a list of task structs as its first argument and an optional timeout in milliseconds as its second argument, whose default value is 5000. It waits for the result messages of the provided tasks and collects them as they arrive. Once it has collected all the results, it returns them as a list in the same order as the input. If the timeout occurs before the caller process collects all the results, the caller process will exit. This will in turn crash all the tasks linked to the caller process, if they do not trap exits.

task1 = Task.async(fn -> 3 + 3 end)
task2 = Task.async(fn -> 4 + 4 end)
task3 = Task.async(fn -> 5 + 5 end)

Task.await_many([task1, task2, task3])
[6, 8, 10]
---------------------------------------------------------------------------
task1 = Task.async(fn -> 3 + 3 end)
task2 = Task.async(fn -> Process.sleep(10000); 4 + 4 end)
task3 = Task.async(fn -> 5 + 5 end)

Task.await_many([task1, task2, task3], 2000) # task 2 times out
** (exit) exited in: Task.await_many([%Task{mfa: {:erlang, :apply, 2}, owner: #PID<0.109.0>, pid: #PID<0.179.0>, ref: #Reference<0.0.13955.866113612.3498901505.114461>}, %Task{mfa: {:erlang, :apply, 2}, owner: #PID<0.109.0>, pid: #PID<0.180.0>, ref: #Reference<0.0.13955.866113612.3498901505.114479>}, %Task{mfa: {:erlang, :apply, 2}, owner: #PID<0.109.0>, pid: #PID<0.181.0>, ref: #Reference<0.0.13955.866113612.3498901505.114497>}], 2000)
** (EXIT) time out
(elixir 1.16.0) lib/task.ex:997: Task.await_many/5
(elixir 1.16.0) lib/task.ex:981: Task.await_many/2
iex:188: (file)

Process.alive?(task2.pid) # linked tasks have been terminated
false

Process.alive?(task3.pid)
false

# please note that the iex shell process catches exits by default and so will
# not crash in the above scenario. But when the above code is executed in a
# non-iex shell process, it will crash as expected

Task.yield_many

Similar to Task.await_many, the Task.yield_many/2 function performs Task.yield for multiple tasks. It takes in a list of task structs as its first argument and an optional timeout value in milliseconds as its second argument, whose default value is 5000. It can also take a keyword list of options such as :limit, :timeout and :on_timeout as its second argument. Once it is called, it waits for the result messages of all the tasks and collects them when they arrive. When it has collected all the results or when the timeout occurs, it returns the result of all the tasks in the same input order as a list of two-element tuples. The first element will be the task struct and the second element will be its associated result which can be either {:ok, result}, {:exit, reason} or nil depending on whether the result was found for the task.

task1 = Task.async(fn -> 3 + 3 end)
task2 = Task.async(fn -> Process.sleep(20000); 4 + 4 end)
task3 = Task.async(fn -> exit(:normal) end)

Task.yield_many([task1, task2, task3])
[ # results after 5 seconds
{%Task{
mfa: {:erlang, :apply, 2},
owner: #PID<0.182.0>,
pid: #PID<0.247.0>,
ref: #Reference<0.0.23299.866113612.3498901505.115291>
}, {:ok, 6}},
{%Task{
mfa: {:erlang, :apply, 2},
owner: #PID<0.182.0>,
pid: #PID<0.248.0>,
ref: #Reference<0.0.23299.866113612.3498901505.115309>
}, nil},
{%Task{
mfa: {:erlang, :apply, 2},
owner: #PID<0.182.0>,
pid: #PID<0.249.0>,
ref: #Reference<0.0.23299.866113612.3498901505.115327>
}, {:exit, :normal}}
]

The :limit option takes in a positive integer which is the maximum number of results that the function call will collect. As soon as the number of collected results equals the limit, the function will return the results with nil as the result for the rest of the tasks.

task1 = Task.async(fn -> 3 + 3 end)
task2 = Task.async(fn -> 4 + 4 end)
task3 = Task.async(fn -> 5 + 5 end)

Task.yield_many([task1, task2, task3], limit: 1)
[
{%Task{
mfa: {:erlang, :apply, 2},
owner: #PID<0.182.0>,
pid: #PID<0.253.0>,
ref: #Reference<0.0.23299.866113612.3498901505.115468>
}, {:ok, 6}},
{%Task{
mfa: {:erlang, :apply, 2},
owner: #PID<0.182.0>,
pid: #PID<0.254.0>,
ref: #Reference<0.0.23299.866113612.3498901505.115486>
}, nil},
{%Task{
mfa: {:erlang, :apply, 2},
owner: #PID<0.182.0>,
pid: #PID<0.255.0>,
ref: #Reference<0.0.23299.866113612.3498901505.115504>
}, nil}
]

The :timeout option takes in milliseconds or :infinity, and it is the total time for which the caller process will collect results. As soon as the timeout occurs, the collected results are returned, with nil as the result for tasks whose result message was not found.

The :on_timeout option determines what happens to the tasks that fail to return results within the timeout. It can take one of three values: :nothing (the default), :ignore and :kill_task. With :nothing, nothing is done to the tasks and nil is returned as the result for those that fail to return a result message within the provided time. With :ignore, Task.ignore/1 is called on the tasks that fail to return results within the given time, and with :kill_task, Task.shutdown/2 is called on them.
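As a rough sketch of combining these options (the sleep duration is arbitrary), the slow task below is shut down once the timeout elapses instead of being left running:

tasks = [
  Task.async(fn -> 1 end),
  Task.async(fn -> Process.sleep(10000); 2 end)
]

# wait at most 1 second in total; tasks that have not replied by then are killed
Task.yield_many(tasks, timeout: 1000, on_timeout: :kill_task)
# the first task is reported with {:ok, 1}; the slow one is killed and
# reported without an :ok result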

Task.completed

The Task.completed/1 function is used to create a dummy task that does not spawn a task process. It takes in the result for the task as its argument, sends a dummy result message containing the given result to the caller process itself and returns a task struct. When Task.await and Task.yield are called on a task created using Task.completed, they work the same as for any other task and return the result from the dummy result message. Since there is no task process, Task.ignore will not perform any unlinking or demonitoring and Task.shutdown will not send any exit signal.

task = Task.completed(:dummy_result)
%Task{
mfa: {Task, :completed, 1},
owner: #PID<0.109.0>,
pid: nil,
ref: #Reference<0.1796000674.3522691076.36618>
}

Process.info(self(), :messages)
{:messages, [{#Reference<0.1796000674.3522691076.36618>, :dummy_result}]}

Task.await(task)
:dummy_result

Task.yield(task)
nil

Task.ignore(task)
nil

Task.shutdown(task)
nil

Task.completed is mostly used when creating multiple tasks based on input data and then awaiting/yielding on them later. If any of the input data is invalid, you could spawn a task process and return an error from there, but this is redundant since you already know the result of the task. You could instead avoid creating a task for the invalid data, but you would then have to manually inject the appropriate error result into the collection that holds the results of the other, valid tasks. Task.completed lets you avoid both approaches and handle invalid entries with the same flow that you use for valid tasks.

input_data = [2, nil, 4]

tasks = for num <- input_data do
  if is_integer(num),
    do: Task.async(fn -> num * num end),
    else: Task.completed({:error, :nan})
end

Task.await_many(tasks)
[4, {:error, :nan}, 16]

Task.async_stream

Task.async_stream/3 takes in an enumerable as its first argument and a one-arg anonymous function as its second argument. It returns a stream which is lazily evaluated when required. Whenever the stream is run, a task process is spawned for each element that needs to be processed, a process link is created, the anonymous function is executed in the spawned task process with the element as its argument and the result is sent back as a message. The received results are then collected and returned as a list of 2-element tuples, with the first element being :ok or :exit and the second element being the result or exit reason.

stream = Task.async_stream([1,2,3,4,5], fn x -> x * x end)
Enum.take(stream, 3)
[ok: 1, ok: 4, ok: 9]

In the above code, only the first 3 results are required and hence only 3 tasks will be spawned each returning the result for one of the three elements.

The Task.async_stream can also take in an optional keyword list of options as its third argument that can include :timeout, :on_timeout, :max_concurrency, :ordered and :zip_input_on_exit.

The :timeout option takes in milliseconds as its value and its default value is 5000. It is the maximum amount of time that each task is allowed to run once it has been spawned. If a task fails to finish execution and return the result message within the given time, the caller process will exit by default. This will in turn crash all the linked task processes that are currently alive.

func = fn x -> Process.sleep(x * 1000); x end

stream = Task.async_stream([1, 2, 1], func, timeout: 2500)

Enum.to_list(stream)
[ok: 1, ok: 2, ok: 1]

stream = Task.async_stream([1, 2, 1, 3], func, timeout: 2500)

Enum.to_list(stream) # caller process crashes since the final task times out
** (exit) exited in: Task.Supervised.stream(2500)
** (EXIT) time out
(elixir 1.16.0) lib/task/supervised.ex:314: Task.Supervised.stream_reduce/7
(elixir 1.16.0) lib/enum.ex:4399: Enum.reverse/1
(elixir 1.16.0) lib/enum.ex:3728: Enum.to_list/1
iex:45: (file)

The :on_timeout option determines what happens when the timeout occurs for any of the tasks. It takes either :exit (the default) or :kill_task as its value. The default value :exit crashes the caller process when any of the tasks times out. The value :kill_task shuts down the tasks that timed out and emits {:exit, :timeout} as their result. The caller process will not crash in this case.

func = fn x -> Process.sleep(x * 1000); x end

stream = Task.async_stream([1, 2, 1, 3], func, timeout: 2500)

Enum.to_list(stream) # caller process crashes since the final task times out
** (exit) exited in: Task.Supervised.stream(2500)
** (EXIT) time out
(elixir 1.16.0) lib/task/supervised.ex:314: Task.Supervised.stream_reduce/7
(elixir 1.16.0) lib/enum.ex:4399: Enum.reverse/1
(elixir 1.16.0) lib/enum.ex:3728: Enum.to_list/1
iex:45: (file)
-----------------------------------------------------------------------------
opts = [timeout: 2500, on_timeout: :kill_task]

stream = Task.async_stream([1, 2, 1, 3], func, opts)

Enum.to_list(stream)
[ok: 1, ok: 2, ok: 1, exit: :timeout]

The :zip_input_on_exit option takes in a boolean value and adds the input element to the result of failed tasks. The default value is false; with true, the input element for which the task failed is added to the result as {:exit, {input, reason}}.

func = fn x -> Process.sleep(x * 1000); x end

opts = [timeout: 2500, on_timeout: :kill_task, zip_input_on_exit: true]

stream = Task.async_stream([1, 2, 1, 3], func, opts)

Enum.to_list(stream)
[ok: 1, ok: 2, ok: 1, exit: {3, :timeout}]

The :max_concurrency option takes in a positive integer and determines the maximum number of tasks that can run concurrently at any point in time. Its default value is the number of schedulers available in the BEAM VM. By default, the BEAM VM employs one scheduler per core, and this number can be obtained using the System.schedulers_online/0 function.

func = fn x ->
  start_time = Time.utc_now()
  Process.sleep(x * 1000)
  {start_time, Time.utc_now()}
end

stream = Task.async_stream([1, 1, 1, 1], func, max_concurrency: 1)

:timer.tc(fn -> Enum.to_list(stream) end, :second)
{4,
[
ok: {~T[16:47:43.686000], ~T[16:47:44.704000]},
ok: {~T[16:47:44.704000], ~T[16:47:45.719000]},
ok: {~T[16:47:45.719000], ~T[16:47:46.739000]},
ok: {~T[16:47:46.739000], ~T[16:47:47.748000]}
]}

---------------------------------------------------------------------------
|task 1|
-------|task 2|
--------------|task 3|
---------------------|task 4|

In the above code the :max_concurrency option's value is set to 1. Hence, at any point in time only one task is allowed to run. The results show that the tasks run one by one sequentially, each taking roughly a second, with all four tasks taking roughly 4 seconds to complete.

func = fn x ->
  start_time = Time.utc_now()
  Process.sleep(x * 1000)
  {start_time, Time.utc_now()}
end

stream = Task.async_stream([1, 1, 1, 1], func, max_concurrency: 2)

:timer.tc(fn -> Enum.to_list(stream) end, :second)
{2,
[
ok: {~T[16:53:37.567000], ~T[16:53:38.578000]},
ok: {~T[16:53:37.567000], ~T[16:53:38.578000]},
ok: {~T[16:53:38.578000], ~T[16:53:39.593000]},
ok: {~T[16:53:38.578000], ~T[16:53:39.593000]}
]}

---------------------------------------------------------------------------
|task 1|
|task 2|
-------|task 3|
-------|task 4|

The :ordered option takes in a boolean and determines the order of the results returned from the stream. The default value is true, which ensures that the collected results are returned in the same order as the input enumerable. The value false returns the results in the order in which the tasks complete rather than in input order. Using false is more efficient, since the results do not have to be buffered in order to be emitted in input order. Hence, if you don't require the results to be in the input order, prefer setting :ordered to false.

func = fn x -> Process.sleep(x * 1000); x end

stream = Task.async_stream([4, 3, 2, 1], func)
Enum.to_list(stream)
[ok: 4, ok: 3, ok: 2, ok: 1]

stream = Task.async_stream([4, 3, 2, 1], func, ordered: false)
Enum.to_list(stream)
[ok: 1, ok: 2, ok: 3, ok: 4]

The Task module also offers an alternate version of async_stream, Task.async_stream/5, that takes in a module, function and an arguments list instead of an anonymous function. For every element in the enumerable, the element will be prepended to the arguments list and the mfa will be invoked in the task process.

stream = Task.async_stream([4, 6, 8, 11], Kernel, :div, [2])
Enum.to_list(stream)
[ok: 2, ok: 3, ok: 4, ok: 5]

Task supervisors

A task supervisor is a version of a dynamic supervisor that is used specifically for dynamically creating and supervising tasks as children. Creating tasks under a task supervisor provides more control over supervising and monitoring the spawned task processes, making it possible to retry failed tasks.

Starting a Task supervisor

A task supervisor can be started directly using the Task.Supervisor.start_link/1 function, which takes in a keyword list of options such as :name, :max_restarts, :max_seconds and :max_children. Internally, the DynamicSupervisor.start_link/1 function is called to start a dynamic supervisor, and hence all the options mentioned above behave the same way as when they are used in a dynamic supervisor. The Task.Supervisor module, similar to the DynamicSupervisor module, contains child_spec/1 and init/1 functions, which allow it to be used directly as a child under a higher-level supervisor.

Task.Supervisor.start_link(name: TaskSup, max_children: 1000)
{:ok, #PID<0.120.0>}

Supervisor.start_link([Task.Supervisor], strategy: :one_for_one)
{:ok, #PID<0.121.0>}

Supervisor.start_link([{Task.Supervisor, name: TaskSup1}], strategy: :one_for_one)
{:ok, #PID<0.123.0>}

Process.whereis(TaskSup1)
#PID<0.124.0>

Creating tasks as children

Unlike dynamic supervisors which require child specs to start children, task supervisors take in either anonymous functions or mfas which will be executed in a spawned process. Tasks can be created under a supervisor using different functions present in the Task.Supervisor module.

  • The start_child/3 function takes in a supervisor’s PID or its associated name as the first argument, a no-arg anonymous function as its second argument and an optional keyword list of options as its third argument. The supported options for a task are :restart and :shutdown, which behave the same way as when they are used in a normal supervisor. Unlike normal supervisors, where the default value for the :restart option is :permanent, its default value in this module is :temporary. A task process is spawned as a child under the given supervisor and the provided anonymous function is executed in the spawned process. The spawned task process is linked to the supervisor and not directly to the caller process. This function immediately returns {:ok, child_pid}. It is used for executing background tasks or side effects and is equivalent to using the Task.start function.
Task.Supervisor.start_link(name: TaskSup, max_restarts: 2)

Task.Supervisor.start_child(TaskSup, fn -> IO.puts("Executing Task") end)
Executing Task
{:ok, #PID<0.116.0>}

Please note that using :permanent as the value for the :restart option will continuously restart the task even if it executes the anonymous function successfully and exits normally, ultimately reaching the :max_restarts limit and shutting down the supervisor and all its associated children. The Task.Supervisor module also provides the start_child/5 function, which takes in an MFA instead of the anonymous function, as sketched below.
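A minimal sketch of the MFA variant, with IO.puts/1 standing in for any side-effecting job and TaskSup assumed to be the task supervisor started above:

{:ok, child_pid} = Task.Supervisor.start_child(TaskSup, IO, :puts, ["Executing Task via mfa"])
Executing Task via mfa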

  • The Task.Supervisor.async/3 function is equivalent to using the Task.async function, which sends the result back to the caller process once the anonymous function has been executed in the task process. It takes in a supervisor’s PID or its associated name as its first argument, a no-arg anonymous function as its second argument and an optional keyword list of options as its third argument, and it immediately returns a Task struct. The only option supported by this function is :shutdown, and the :restart value is set to :temporary. The spawned task will still be linked to the caller, and abnormal exits in the task process will terminate the caller process if exits are not trapped. The results can be read using the Task.await or Task.yield functions by passing in the Task struct returned by the async function.
Task.Supervisor.start_link(name: TaskSup)

task = Task.Supervisor.async(TaskSup, fn -> :result end)

Task.await(task)
:result

Similar to start_child, an alternate version async/5 is available that takes in an MFA instead of an anonymous function. The functions Task.Supervisor.async_nolink/3 and Task.Supervisor.async_nolink/5 behave the same as the async function, but the spawned task process will not be directly linked with the caller process. Hence, an abnormal exit in the task process will not affect the caller process, as sketched below.
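A rough sketch of the difference, assuming the task supervisor registered as TaskSup is already running: the caller survives the task's crash because there is no link, and the failure surfaces as an {:exit, reason} result instead.

task = Task.Supervisor.async_nolink(TaskSup, fn -> raise("error") end)

# the caller process is still alive; the monitor set up by async_nolink
# delivers the crash as a result instead of an exit signal
Task.yield(task, 1000)
# returns {:exit, reason} where reason wraps the RuntimeError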

  • The Task.Supervisor.async_stream/4 function is equivalent to using the Task.async_stream function that returns a stream which can be lazily evaluated. It takes in a supervisor’s PID or its associated name as the first argument, an enumerable as its second argument, a one-arg anonymous function as its third argument and a keyword list of options as its fourth argument. It supports all the options from the Task.async_stream function and additionally the :shutdown option. The Task.Supervisor.async_stream/6 function supports an MFA instead of an anonymous function. The tasks created for the elements of the enumerable will be linked to the caller process, similar to the Task.Supervisor.async function. To avoid linking the caller process and the spawned tasks, the functions Task.Supervisor.async_stream_nolink/4 and Task.Supervisor.async_stream_nolink/6 can be used, as sketched after this list.
  • Other than the functions mentioned above that can be used for starting tasks under a supervisor, the Task.Supervisor module offers the children/1 function, which takes in a supervisor’s PID or its associated name and returns a list of the PIDs of all the children that are currently alive under the supervisor. The terminate_child/2 function takes in a supervisor’s PID or its associated name as its first argument and a child task’s PID as its second argument. It terminates the child task associated with the provided PID and returns :ok if the termination is successful.
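A short sketch of supervised streaming and child introspection, again assuming the task supervisor registered as TaskSup is already running:

stream = Task.Supervisor.async_stream(TaskSup, [1, 2, 3], fn x -> x * 10 end, max_concurrency: 2)
Enum.to_list(stream)
[ok: 10, ok: 20, ok: 30]

{:ok, pid} = Task.Supervisor.start_child(TaskSup, fn -> Process.sleep(60000) end)

Task.Supervisor.children(TaskSup) # returns the PIDs of live children, including pid

Task.Supervisor.terminate_child(TaskSup, pid)
:ok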

Statically supervising tasks

Instead of creating a task supervisor and creating tasks dynamically under them, tasks can be created directly under a normal supervisor by using their child specs. Unlike using a task supervisor, statically supervised tasks cannot be awaited by the caller. Hence they are mainly used for performing background jobs and for tasks where the result is not required by the caller.

Since the Task module contains a child_spec/1 function that takes in a zero-arg anonymous function and returns a default child spec map, the tuple {Task, anonymous_function} can be added directly to the child spec list of a supervisor. The supervisor will in turn obtain the child spec map by calling Task.child_spec(anonymous_function) and will start the task by calling Task.start_link(anonymous_function), as specified by the :start key of the returned child spec map.

Task.child_spec(fn -> :ok end)
%{
id: Task,
restart: :temporary,
start: {Task, :start_link, [#Function<43.125776118/0 in :erl_eval.expr/6>]}
}

Supervisor.start_link([{Task, fn -> IO.puts("Task exec") end}], strategy: :one_for_one)
Task exec

User-defined custom modules can also be added directly under a supervisor if they provide a child_spec/1 function and define the function referenced by the :start MFA of the returned child spec map.

defmodule MyTask do
  def child_spec(func) do
    %{
      id: __MODULE__,
      restart: :temporary,
      start: {__MODULE__, :start_link, [func]}
    }
  end

  def start_link(func) do
    Task.start_link(func)
  end
end
---------------------------------------------------------------------------
Supervisor.start_link([{MyTask, fn -> IO.puts("Task exec") end}], strategy: :one_for_one)
Task exec

The use Task macro can be used in user-defined custom modules to inject the default child_spec/1 function definition specified above. The use Task macro also accepts the :id, :restart and :shutdown options, which override the corresponding values in the default child spec map returned by the injected child_spec/1 function, as sketched after the example below.

defmodule MyTask do
  use Task

  def start_link(func) do
    Task.start_link(func)
  end
end
---------------------------------------------------------------------------
Supervisor.start_link([{MyTask, fn -> IO.puts("Task exec") end}], strategy: :one_for_one)
Task exec
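As a hedged sketch of passing options to use Task (MyTransientTask is just a hypothetical module name), the :restart value below overrides the default :temporary in the injected child spec:

defmodule MyTransientTask do
  use Task, restart: :transient

  def start_link(func) do
    Task.start_link(func)
  end
end

MyTransientTask.child_spec(fn -> :ok end)
# returns a child spec map like the ones above, but with restart: :transient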
