packaging for, based on
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

56 lines
1.9 KiB

%%% @doc
%%% This module contains a small parallel dispatch queue that allows
%%% to take a list of jobs and run as many of them in parallel as there
%%% are schedulers ongoing.
%%% Original design by Max Fedorov in the rebar compiler, then generalised
%%% and extracted here to be reused in other circumstances.
%%% @end
queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Parent = self(),
Worker = fun() -> worker(Parent, WorkF, WArgs) end,
Jobs = min(length(Tasks), erlang:system_info(schedulers)),
?DIAGNOSTIC("Starting ~B worker(s)", [Jobs]),
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
parallel_dispatch(Tasks, Pids, Handler, HArgs).
parallel_dispatch([], [], _, _) ->
parallel_dispatch(Targets, Pids, Handler, Args) ->
{ready, Worker} when is_pid(Worker), Targets =:= [] ->
Worker ! empty,
parallel_dispatch(Targets, Pids, Handler, Args);
{ready, Worker} when is_pid(Worker) ->
[Task|Tasks] = Targets,
Worker ! {task, Task},
parallel_dispatch(Tasks, Pids, Handler, Args);
{'DOWN', Mref, _, Pid, normal} ->
NewPids = lists:delete({Pid, Mref}, Pids),
parallel_dispatch(Targets, NewPids, Handler, Args);
{'DOWN', _Mref, _, _Pid, Info} ->
?ERROR("Task failed: ~p", [Info]),
{result, Result} ->
case Handler(Result, Args) of
ok ->
parallel_dispatch(Targets, Pids, Handler, Args);
{ok, Acc} ->
[Acc | parallel_dispatch(Targets, Pids, Handler, Args)]
worker(QueuePid, F, Args) ->
QueuePid ! {ready, self()},
{task, Task} ->
QueuePid ! {result, F(Task, Args)},
worker(QueuePid, F, Args);
empty ->