To tell the engine to process in parallel you have to mark a transition async and provide an AsyncTransition implementation.
It could be a solution based on process forking or queue messaging.
PVM provides an integration with Enqueue Messaging Library to execute tasks in parallel.
Some extra packages have to be installed.
In the example above the foo task is executed in same process where bar is not, instead, the message is sent to the queue.
Then the message is picked up by a consumer and gets processed.
mkdir ~/pvm-parallel-execution
cd ~/pvm-parallel-execution
composer req formapro/pvm:0.4.x-dev makasim/values:0.5.x-dev makasim/yadm:0.5.x-dev
composer req enqueue/simple-client:^0.8 enqueue/fs:^0.8 enqueue/enqueue:^0.8.33
# create such files with the code below
php parallel-execution.php
php handle-async-transition.php
parallel-execution.php
Connected to mongodb://mongo/pvm_demo
Connected to amqp://guest:guest@rabbitmq:5672/pvm_demo
a_task
handle-async-transition.php
Connected to mongodb://mongo/pvm_demo
Connected to amqp://guest:guest@rabbitmq:5672/pvm_demo
async_task
parallel-execution.php
<?php
use Enqueue\Client\Config;
use Enqueue\SimpleClient\SimpleClient;
use Formapro\Pvm\DefaultBehaviorRegistry;
use Formapro\Pvm\Enqueue\AsyncTransition;
use Formapro\Pvm\Enqueue\HandleAsyncTransitionProcessor;
use Formapro\Pvm\Process;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\Token;
use Formapro\Pvm\ProcessBuilder;
use Formapro\Pvm\Yadm\InProcessDAL;
use Formapro\Pvm\Yadm\TokenLocker;
use Makasim\Yadm\CollectionFactory;
use Makasim\Yadm\Hydrator;
use Makasim\Yadm\PessimisticLock;
use Makasim\Yadm\Storage;
require_once __DIR__.'/vendor/autoload.php';
$mongoDsn = getenv('MONGO_DSN');
$mongoClient = new \MongoDB\Client($mongoDsn);
$processCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_process');
$processStorage = new Storage($processCollection, new Hydrator(Process::class));
$processLockCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_token_lock');
echo 'Connected to '.$mongoDsn.PHP_EOL;
$enqueueDsn = getenv('ENQUEUE_DSN');
$enqueueClient = new SimpleClient($enqueueDsn);
echo 'Connected to '.$enqueueDsn.PHP_EOL;
$process = (new ProcessBuilder())
->createNode('a_task', 'print_label')->end()
->createNode('async_task', 'print_label')->end()
->createTransition('a_task', 'async_task')
->setAsync(true)
->end()
->createStartTransition('a_task')->end()
->getProcess()
;
$processStorage->insert($process);
$registry = new DefaultBehaviorRegistry([
'print_label' => function(Token $token) {
echo $token->getTo()->getId().PHP_EOL;
},
]);
$dal = new InProcessDAL($processStorage);
$tokenLocker = new TokenLocker(new PessimisticLock($processLockCollection));
$engine = new ProcessEngine($registry, $dal, new AsyncTransition($enqueueClient->getProducer()));
$enqueueClient->bind(
Config::COMMAND_TOPIC,
HandleAsyncTransitionProcessor::COMMAND,
new HandleAsyncTransitionProcessor($engine, $tokenLocker)
);
$enqueueClient->setupBroker();
$token = $engine->createTokenFor($process->getStartTransition());
$engine->proceed($token);
handle-async-transition.php
<?php
use Enqueue\Client\Config;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\SimpleClient\SimpleClient;
use Formapro\Pvm\DefaultBehaviorRegistry;
use Formapro\Pvm\Enqueue\AsyncTransition;
use Formapro\Pvm\Enqueue\HandleAsyncTransitionProcessor;
use Formapro\Pvm\Process;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\Token;
use Formapro\Pvm\Yadm\InProcessDAL;
use Formapro\Pvm\Yadm\TokenLocker;
use Makasim\Yadm\CollectionFactory;
use Makasim\Yadm\Hydrator;
use Makasim\Yadm\PessimisticLock;
use Makasim\Yadm\Storage;
require_once __DIR__.'/vendor/autoload.php';
$mongoDsn = getenv('MONGO_DSN');
$mongoClient = new \MongoDB\Client($mongoDsn);
$processCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_process');
$processStorage = new Storage($processCollection, new Hydrator(Process::class));
$processLockCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_token_lock');
echo 'Connected to '.$mongoDsn.PHP_EOL;
$enqueueDsn = getenv('ENQUEUE_DSN');
$enqueueClient = new SimpleClient($enqueueDsn);
echo 'Connected to '.$enqueueDsn.PHP_EOL;
$registry = new DefaultBehaviorRegistry([
'print_label' => function(Token $token) {
echo $token->getTo()->getId().PHP_EOL;
},
]);
$dal = new InProcessDAL($processStorage);
$tokenLocker = new TokenLocker(new PessimisticLock($processLockCollection));
$engine = new ProcessEngine($registry, $dal, new AsyncTransition($enqueueClient->getProducer()));
$enqueueClient->bind(
Config::COMMAND_TOPIC,
HandleAsyncTransitionProcessor::COMMAND,
new HandleAsyncTransitionProcessor($engine, $tokenLocker)
);
$enqueueClient->setupBroker();
$enqueueClient->consume(new ChainExtension([
new LimitConsumptionTimeExtension(new \DateTime('now + 3 seconds'))
]));
{ "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Process.json", "id": "dcc5390b-880a-4ff3-b478-effcb3c0e072", "nodes": { "a_task": { "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Node.json", "id": "a_task", "behavior": "print_label" }, "async_task": { "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Node.json", "id": "async_task", "behavior": "print_label" } }, "transitions": { "a5a72c48-ab53-48f9-848c-374a590df8b5": { "id": "a5a72c48-ab53-48f9-848c-374a590df8b5", "weight": 1, "async": true, "active": true, "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Transition.json", "from": "a_task", "to": "async_task" }, "af994117-917a-4670-abb1-9110aea54933": { "id": "af994117-917a-4670-abb1-9110aea54933", "weight": 1, "async": false, "active": true, "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Transition.json", "to": "a_task" } }, "outTransitions": { "a_task": [ "a5a72c48-ab53-48f9-848c-374a590df8b5" ] }, "inTransitions": { "async_task": [ "a5a72c48-ab53-48f9-848c-374a590df8b5" ], "a_task": [ "af994117-917a-4670-abb1-9110aea54933" ] }, "tokens": { "c635fff1-a471-49a0-904f-6eb3db512f70": { "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Token.json", "id": "c635fff1-a471-49a0-904f-6eb3db512f70", "transitions": [ { "schema": "http:\/\/pvm.forma-pro.com\/schemas\/TokenTransition.json", "id": "716d8779-412c-40a9-920a-bb9aa36fe19c", "transitionId": "af994117-917a-4670-abb1-9110aea54933", "weight": 1, "state": "opened", "time": 16107338820149 }, { "schema": "http:\/\/pvm.forma-pro.com\/schemas\/TokenTransition.json", "id": "cac5891b-d81b-4a0f-b8dd-ff029f91e7db", "transitionId": "af994117-917a-4670-abb1-9110aea54933", "weight": 1, "state": "passed", "time": 16107338820154 }, { "schema": "http:\/\/pvm.forma-pro.com\/schemas\/TokenTransition.json", "id": "7f11cf01-f1dc-4f6e-ae96-5545988a19f8", "transitionId": "a5a72c48-ab53-48f9-848c-374a590df8b5", "weight": 1, "state": "opened", "time": 16107338820156 } ] } } }