Asynchronous programming is very popular nowadays, largely thanks to JavaScript and Node.js. JavaScript runs on a single thread, which leaves no way to do things in parallel. PHP is in the same position: each request is handled by a single process or thread.

The good news is that PHP is not years behind. Async extensions and libraries already exist, for example ReactPHP.

Our goal can easily be reached with ReactPHP using callbacks or promises, but I want to add more magic with RxPHP. It is an outstanding approach that solves daily tasks in a way you might never have thought of before.

So let's state the ultimate goal and move on: efficiently query a remote REST API via HTTP GET.

On the backend you quite often face the problem of doing some work in parallel. Integration with 3rd-party APIs often requires multiple calls to achieve one goal. Latency and a large number of calls can kill the performance of any app built with the standard PHP approach, such as cURL. Ironically, the application spends most of its time idle, waiting for the network.
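
To put rough numbers on that idle time, here is a back-of-the-envelope sketch. The latency and the number of calls are made-up figures, chosen only to show the arithmetic, and the parallel case assumes all follow-up calls overlap completely.

```php
<?php
// Assumed figures, for illustration only.
$latencyMs = 200;     // hypothetical round-trip time of one request
$followUpCalls = 5;   // hypothetical calls that depend on the first one

// Sequential: the first call, then every follow-up call one after another.
$sequentialMs = $latencyMs * (1 + $followUpCalls);

// Parallel: the first call, then all follow-up calls overlapping.
$parallelMs = $latencyMs * 2;

printf("sequential: %d ms, parallel: %d ms\n", $sequentialMs, $parallelMs);
```

With these assumed values the sequential version waits 1200 ms and the parallel one 400 ms, and the gap grows with every extra call.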

# Simple example

To get a better idea of what Rx (ReactiveX) is, let's look at the following example, which shows how Rx works and what the major operators are.

Basically, it just grabs some data from an API, filters it and prints the output.

<?php
/**
 * Simple fetch, filter and map example
 */
require __DIR__ . '/vendor/autoload.php';

use \Rx\Observable as Observable;
use \Rx\React\Http as Http;


Observable::of('https://jsonplaceholder.typicode.com/posts')
    ->flatMap(function ($url) {
        return Http::get($url);
    })
    ->map('json_decode')
    ->flatMap(function($posts) {
        return Observable::fromArray($posts);
    })
    ->filter(function($post) {
        return strlen($post->title) <= 20;
    })
    ->subscribe(function ($post) {
        echo $post->title . "<br/>";
    });

Simple? Yes. We just use a handful of operators: “of”, “flatMap”, “map”, “filter” and “subscribe”, plus “fromArray” to turn the decoded array into a stream of posts. The data follows the chain, which makes the flow very visual.
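
If the operators still look unfamiliar, it may help to see what “filter” and “map” do on a plain PHP array. This is only a synchronous sketch for comparison: the sample posts are made up, and the final “map” to titles is my stand-in for what the example above does with echo inside subscribe.

```php
<?php
// Hypothetical sample data standing in for the decoded API response.
$posts = [
    (object) ['id' => 1, 'title' => 'qui est esse'],
    (object) ['id' => 2, 'title' => 'a considerably longer post title'],
    (object) ['id' => 3, 'title' => 'nesciunt quas odio'],
];

// filter: keep posts whose title is at most 20 characters long.
$short = array_filter($posts, function ($post) {
    return strlen($post->title) <= 20;
});

// map: project each remaining post to its title (array_values reindexes).
$titles = array_values(array_map(function ($post) {
    return $post->title;
}, $short));

print_r($titles);
```

The difference is that the array version needs all the data up front, while the Rx chain handles each item as it arrives.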

You can find more examples and documentation at http://reactivex.io/

# Parallel example

In this example we will query the same “posts” endpoint, but the goal is to extract the email address from every comment. This requires one call for “posts” and multiple calls for “comments”. The “comments” calls do not depend on each other and can run in parallel.

<?php
/**
 * Parallel fetch example
 */

require __DIR__ . '/vendor/autoload.php';

use \Rx\Observable as Observable;
use \Rx\React\Http as Http;

Observable::of('https://jsonplaceholder.typicode.com/posts')
    ->flatMap(function ($url) {
        return Http::get($url);
    })
    ->map('json_decode')
    ->flatMap(function($posts) {
        return Observable::fromArray($posts);
    })
    ->take(5)
    ->flatMap(function ($post) {
        return Http::get(sprintf('https://jsonplaceholder.typicode.com/posts/%d/comments', $post->id));
    })
    ->map('json_decode')
    ->flatMap(function($comments) {
        return Observable::fromArray($comments);
    })
    ->map(function($comment) {
        return $comment->email;
    })
    ->distinct()
    ->subscribe(function($email) {
        echo $email . "<br/>\n";
    });

It is very similar to the first example, with more steps added to make the additional calls. You may notice two new operators. “distinct” is an easy way to exclude duplicates, and “take” means take only the first 5 posts. Why the first 5? Let's move on to the next chapter.

# Throttling

Almost no API server likes many parallel requests, because they put unwanted load on the system. Eventually your app will get blocked if it does not respect connection limits.

In the example above I have “->take(5)”, which takes just the first 5 posts. I did this intentionally, because Rx will start as many connections as there are data items available. That is good for speed, but might get us into trouble.

How do we throttle? A simple approach uses the groupBy operator: all HTTP requests are organized into a fixed number of queues, and concatAll runs the requests within each queue one after another. In this case the number of queues is 2.

->groupBy(function () {
    static $index = 0;
    return $index++ % 2; // Limit to 2 concurrent requests
})
->flatMap(function (Observable\GroupedObservable $go) {
    return $go->concatAll();
})
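
To see how that key selector spreads items across the queues, here is a small sketch in plain PHP with no Rx involved. The post ids are made-up values and `$queues` is just an illustrative array, not something RxPHP exposes.

```php
<?php
// Same idea as the groupBy() key selector: a round-robin counter.
$keySelector = function () {
    static $index = 0;
    return $index++ % 2; // 0, 1, 0, 1, ...
};

// Hypothetical post ids, assigned to queues in arrival order.
$queues = [];
foreach ([1, 2, 3, 4, 5] as $postId) {
    $queues[$keySelector()][] = $postId;
}

print_r($queues);
```

Queue 0 receives posts 1, 3 and 5, and queue 1 receives posts 2 and 4. In the Rx version each queue then works through its requests one at a time, so at most 2 requests are in flight.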

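Full listing, this time with 5 queues. Note that the “comments” requests are now created with “map” rather than “flatMap”, so that concatAll can subscribe to them one at a time within each queue: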

<?php
/**
 * Parallel fetch with connection limiter
 */
require __DIR__ . '/vendor/autoload.php';

use \Rx\Observable as Observable;
use \Rx\React\Http as Http;

Observable::of('https://jsonplaceholder.typicode.com/posts')
    ->flatMap(function ($url) {
        return Http::get($url);
    })
    ->map('json_decode')
    ->flatMap(function($posts) {
        return Observable::fromArray($posts);
    })
    ->map(function ($post) {
        return Http::get(sprintf('https://jsonplaceholder.typicode.com/posts/%d/comments', $post->id));
    })
    ->groupBy(function () {
        static $index = 0;
        return $index++ % 5; // Limit to 5 concurrent requests
    })
    ->flatMap(function (Observable\GroupedObservable $go) {
        return $go->concatAll();
    })
    ->map('json_decode')
    ->flatMap(function($comments) {
        return Observable::fromArray($comments);
    })
    ->map(function($comment) {
        return $comment->email;
    })
    ->distinct()
    ->subscribe(function($email) {
        echo $email . "<br/>\n";
    });

This technique is very useful when accessing remote APIs or scraping data. I hope you learned something new.

And one more thing.

# The Loop

If you try to put the example above into a function and return some result, you will most likely get an empty array, as below:

function () {
    $result = [];

    Observable::of(...)
        ->subscribe(
            function ($post) use (&$result) {
                $result[] = $post->title;
            }
        );

    return $result;
}

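Why? Being asynchronous, Rx works on a global event loop. The loop always has to be created and run. In my simple examples this is done behind the scenes: the loop is created in the first Observable constructor and runs in a script shutdown hook, which is why they work. Inside the function, however, “return $result” executes before the loop has run, so the array is still empty.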
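
The effect can be imitated without any library. The sketch below is a toy stand-in for an event loop, not how RxPHP or ReactPHP are implemented: `$queue` and the sample title are made up for illustration.

```php
<?php
// Toy stand-in for an event loop: callbacks are queued, not run immediately.
$queue = [];
$result = [];

// "Subscribing" only schedules work, much like an async HTTP request.
$queue[] = function () use (&$result) {
    $result[] = 'first title';
};

$before = count($result); // nothing has run yet

// "Running the loop" drains the queue and fires the callbacks.
while ($task = array_shift($queue)) {
    $task();
}

$after = count($result); // the callback has fired

echo "before: $before, after: $after\n";
```

This prints "before: 0, after: 1": reading the result before the loop has run is exactly what the broken function does.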

To fix the function above, we need two more lines of code: one to get the loop and one to run it.

function () {
    $result = [];

    $loop = \EventLoop\getLoop();

    Observable::of(...)
        ->subscribe(
            function ($post) use (&$result) {
                $result[] = $post->title;
            }
        );

    $loop->run();

    return $result;
}

Examples from above

  • https://github.com/ameoba32/RxHTTP-examples
Nest of RxPHP 😃
  • https://github.com/ReactiveX/RxPHP/
Cool RxHTTP implementation
  • https://github.com/RxPHP/RxHttp

# Special thanks to

https://github.com/davidwdan

https://github.com/luijar

https://github.com/mbonneau

Book: PHP Reactive Programming by Martin Sikora