rebar3/src/rebar_parallel.erl

57 lines
1.9 KiB
Erlang
Raw Normal View History

2021-12-27 19:09:21 +00:00
%%% @doc
%%% This module contains a small parallel dispatch queue that allows
%%% to take a list of jobs and run as many of them in parallel as there
%%% are schedulers ongoing.
%%%
%%% Original design by Max Fedorov in the rebar compiler, then generalised
%%% and extracted here to be reused in other circumstances.
%%% @end
-module(rebar_parallel).
-export([queue/5]).
-include("rebar.hrl").
queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Parent = self(),
Worker = fun() -> worker(Parent, WorkF, WArgs) end,
Jobs = min(length(Tasks), erlang:system_info(schedulers)),
?DIAGNOSTIC("Starting ~B worker(s)", [Jobs]),
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
parallel_dispatch(Tasks, Pids, Handler, HArgs).
parallel_dispatch([], [], _, _) ->
[];
parallel_dispatch(Targets, Pids, Handler, Args) ->
receive
{ready, Worker} when is_pid(Worker), Targets =:= [] ->
Worker ! empty,
parallel_dispatch(Targets, Pids, Handler, Args);
{ready, Worker} when is_pid(Worker) ->
[Task|Tasks] = Targets,
Worker ! {task, Task},
parallel_dispatch(Tasks, Pids, Handler, Args);
{'DOWN', Mref, _, Pid, normal} ->
NewPids = lists:delete({Pid, Mref}, Pids),
parallel_dispatch(Targets, NewPids, Handler, Args);
{'DOWN', _Mref, _, _Pid, Info} ->
?ERROR("Task failed: ~p", [Info]),
?ABORT;
{result, Result} ->
case Handler(Result, Args) of
ok ->
parallel_dispatch(Targets, Pids, Handler, Args);
{ok, Acc} ->
[Acc | parallel_dispatch(Targets, Pids, Handler, Args)]
end
end.
worker(QueuePid, F, Args) ->
QueuePid ! {ready, self()},
receive
{task, Task} ->
QueuePid ! {result, F(Task, Args)},
worker(QueuePid, F, Args);
empty ->
ok
end.