Parallel execution

To tell the engine to process tasks in parallel, you have to mark a transition as async and provide an AsyncTransition implementation. It could be a solution based on process forking or queue messaging.
PVM provides an integration with the Enqueue messaging library to execute tasks in parallel. Some extra packages have to be installed.
In the example below, the a_task node is executed in the same process, whereas async_task is not: instead, a message is sent to the queue. The message is then picked up by a consumer and processed.

Setup


# Create a scratch project directory for the example and switch into it.
mkdir ~/pvm-parallel-execution
cd ~/pvm-parallel-execution

# Install PVM together with the libraries the example scripts import (Makasim\Values, Makasim\Yadm).
composer req formapro/pvm:0.4.x-dev makasim/values:0.5.x-dev makasim/yadm:0.5.x-dev
# Install the Enqueue packages used to deliver async transitions through a queue.
# NOTE(review): the sample output below shows an amqp:// DSN, while only the
# filesystem transport (enqueue/fs) is installed here - confirm which transport is intended.
composer req enqueue/simple-client:^0.8 enqueue/fs:^0.8 enqueue/enqueue:^0.8.33

# Create parallel-execution.php and handle-async-transition.php with the code shown below.
# Both scripts read the MONGO_DSN and ENQUEUE_DSN environment variables.
# Run the producer first, then the consumer that handles the async transition.
php parallel-execution.php
php handle-async-transition.php

Result

parallel-execution.php

Connected to mongodb://mongo/pvm_demo
Connected to amqp://guest:guest@rabbitmq:5672/pvm_demo
a_task

handle-async-transition.php

Connected to mongodb://mongo/pvm_demo
Connected to amqp://guest:guest@rabbitmq:5672/pvm_demo
async_task

parallel-execution.php


<?php
/**
 * parallel-execution.php
 *
 * Builds a two-node process (a_task -> async_task) whose only transition
 * between the nodes is marked async, stores it in MongoDB and starts it.
 *
 * Configuration is read from the environment:
 *  - MONGO_DSN:   DSN passed to \MongoDB\Client and the Yadm collection factory.
 *  - ENQUEUE_DSN: DSN passed to the Enqueue SimpleClient.
 */
use Enqueue\Client\Config;
use Enqueue\SimpleClient\SimpleClient;
use Formapro\Pvm\DefaultBehaviorRegistry;
use Formapro\Pvm\Enqueue\AsyncTransition;
use Formapro\Pvm\Enqueue\HandleAsyncTransitionProcessor;
use Formapro\Pvm\Process;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\Token;
use Formapro\Pvm\ProcessBuilder;
use Formapro\Pvm\Yadm\InProcessDAL;
use Formapro\Pvm\Yadm\TokenLocker;
use Makasim\Yadm\CollectionFactory;
use Makasim\Yadm\Hydrator;
use Makasim\Yadm\PessimisticLock;
use Makasim\Yadm\Storage;

require_once __DIR__.'/vendor/autoload.php';

// Storage for process documents ("pvm_process") and for token locks ("pvm_token_lock").
$mongoDsn = getenv('MONGO_DSN');
$mongoClient = new \MongoDB\Client($mongoDsn);
$processCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_process');
$processStorage = new Storage($processCollection, new Hydrator(Process::class));

$processLockCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_token_lock');
echo 'Connected to '.$mongoDsn.PHP_EOL;

// Queue client used to publish the async transition message.
$enqueueDsn = getenv('ENQUEUE_DSN');
$enqueueClient = new SimpleClient($enqueueDsn);
echo 'Connected to '.$enqueueDsn.PHP_EOL;

// Process definition: start -> a_task -(async)-> async_task.
// Both nodes use the "print_label" behavior registered below.
$process = (new ProcessBuilder())
    ->createNode('a_task', 'print_label')->end()
    ->createNode('async_task', 'print_label')->end()
    ->createTransition('a_task', 'async_task')
        ->setAsync(true)
    ->end()
    ->createStartTransition('a_task')->end()
    ->getProcess()
;

$processStorage->insert($process);

// "print_label" echoes the id of the node the token is arriving at.
$registry = new DefaultBehaviorRegistry([
    'print_label' => function (Token $token) {
        echo $token->getTo()->getId().PHP_EOL;
    },
]);

$dal = new InProcessDAL($processStorage);
$tokenLocker = new TokenLocker(new PessimisticLock($processLockCollection));

// The engine is given an Enqueue-backed AsyncTransition built on the client's producer.
$engine = new ProcessEngine($registry, $dal, new AsyncTransition($enqueueClient->getProducer()));

// NOTE(review): the processor is bound here as well as in the consumer script,
// presumably so setupBroker() can declare the routing for the command - confirm.
$enqueueClient->bind(
    Config::COMMAND_TOPIC,
    HandleAsyncTransitionProcessor::COMMAND,
    new HandleAsyncTransitionProcessor($engine, $tokenLocker)
);
$enqueueClient->setupBroker();

// Start the process from its start transition.
$token = $engine->createTokenFor($process->getStartTransition());
$engine->proceed($token);

handle-async-transition.php


<?php
/**
 * handle-async-transition.php
 *
 * Consumer side of the example: binds HandleAsyncTransitionProcessor to the
 * Enqueue client and consumes messages for about three seconds.
 *
 * Configuration is read from the environment:
 *  - MONGO_DSN:   DSN passed to \MongoDB\Client and the Yadm collection factory.
 *  - ENQUEUE_DSN: DSN passed to the Enqueue SimpleClient.
 */
use Enqueue\Client\Config;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\SimpleClient\SimpleClient;
use Formapro\Pvm\DefaultBehaviorRegistry;
use Formapro\Pvm\Enqueue\AsyncTransition;
use Formapro\Pvm\Enqueue\HandleAsyncTransitionProcessor;
use Formapro\Pvm\Process;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\Token;
use Formapro\Pvm\Yadm\InProcessDAL;
use Formapro\Pvm\Yadm\TokenLocker;
use Makasim\Yadm\CollectionFactory;
use Makasim\Yadm\Hydrator;
use Makasim\Yadm\PessimisticLock;
use Makasim\Yadm\Storage;

require_once __DIR__.'/vendor/autoload.php';

// Same collections as the producer script: process documents and token locks.
$mongoDsn = getenv('MONGO_DSN');
$mongoClient = new \MongoDB\Client($mongoDsn);
$processCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_process');
$processStorage = new Storage($processCollection, new Hydrator(Process::class));
$processLockCollection = (new CollectionFactory($mongoClient, $mongoDsn))->create('pvm_token_lock');
echo 'Connected to '.$mongoDsn.PHP_EOL;

$enqueueDsn = getenv('ENQUEUE_DSN');
$enqueueClient = new SimpleClient($enqueueDsn);
echo 'Connected to '.$enqueueDsn.PHP_EOL;

// The behavior must be registered here too, under the same "print_label" name
// that the stored process nodes reference. It echoes the id of the node the
// token is arriving at.
$registry = new DefaultBehaviorRegistry([
    'print_label' => function (Token $token) {
        echo $token->getTo()->getId().PHP_EOL;
    },
]);

$dal = new InProcessDAL($processStorage);
$tokenLocker = new TokenLocker(new PessimisticLock($processLockCollection));

$engine = new ProcessEngine($registry, $dal, new AsyncTransition($enqueueClient->getProducer()));

// Route the async-transition command to the processor that uses the engine and token locker.
$enqueueClient->bind(
    Config::COMMAND_TOPIC,
    HandleAsyncTransitionProcessor::COMMAND,
    new HandleAsyncTransitionProcessor($engine, $tokenLocker)
);

$enqueueClient->setupBroker();

// Consume with a time limit of "now + 3 seconds" so the example script terminates.
$enqueueClient->consume(new ChainExtension([
    new LimitConsumptionTimeExtension(new \DateTime('now + 3 seconds'))
]));
{
    "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Process.json",
    "id": "24fecb29-8dc7-4a08-8e94-99b26814582c",
    "nodes": {
        "a_task": {
            "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Node.json",
            "id": "a_task",
            "behavior": "print_label"
        },
        "async_task": {
            "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Node.json",
            "id": "async_task",
            "behavior": "print_label"
        }
    },
    "transitions": {
        "d804d2a8-1251-47e2-9969-8edcb2a4fdb7": {
            "id": "d804d2a8-1251-47e2-9969-8edcb2a4fdb7",
            "weight": 1,
            "async": true,
            "active": true,
            "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Transition.json",
            "from": "a_task",
            "to": "async_task"
        },
        "8d9fbffc-f528-4160-a0d7-ce45ce28ad99": {
            "id": "8d9fbffc-f528-4160-a0d7-ce45ce28ad99",
            "weight": 1,
            "async": false,
            "active": true,
            "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Transition.json",
            "to": "a_task"
        }
    },
    "outTransitions": {
        "a_task": [
            "d804d2a8-1251-47e2-9969-8edcb2a4fdb7"
        ]
    },
    "inTransitions": {
        "async_task": [
            "d804d2a8-1251-47e2-9969-8edcb2a4fdb7"
        ],
        "a_task": [
            "8d9fbffc-f528-4160-a0d7-ce45ce28ad99"
        ]
    },
    "tokens": {
        "ca0e3507-b79d-4bc7-baa7-b8a344452b56": {
            "schema": "http:\/\/pvm.forma-pro.com\/schemas\/Token.json",
            "id": "ca0e3507-b79d-4bc7-baa7-b8a344452b56",
            "transitions": [
                {
                    "schema": "http:\/\/pvm.forma-pro.com\/schemas\/TokenTransition.json",
                    "id": "9e181202-4fcd-4d56-ab1b-aefca69ab8c2",
                    "transitionId": "8d9fbffc-f528-4160-a0d7-ce45ce28ad99",
                    "weight": 1,
                    "state": "opened",
                    "time": 15421485894728
                },
                {
                    "schema": "http:\/\/pvm.forma-pro.com\/schemas\/TokenTransition.json",
                    "id": "6310dcba-820b-4c22-aac9-ce1dd58ab004",
                    "transitionId": "8d9fbffc-f528-4160-a0d7-ce45ce28ad99",
                    "weight": 1,
                    "state": "passed",
                    "time": 15421485894733
                },
                {
                    "schema": "http:\/\/pvm.forma-pro.com\/schemas\/TokenTransition.json",
                    "id": "4af37d8a-3620-494b-8b4d-eb60f6139925",
                    "transitionId": "d804d2a8-1251-47e2-9969-8edcb2a4fdb7",
                    "weight": 1,
                    "state": "opened",
                    "time": 15421485894735
                }
            ]
        }
    }
}