Asynchronous

Messages

- Messages driven development -

Speaker

A long time symfony enthusiast from the early days

  • Web Architect @ Ekino
  • Sonata Project lead dev

- Thomas Rabaix / @th0masr

Preface

What's an object ?

What's an object ?

  • An instance of a class definition
  • Hold data
  • A state defined as object's properties
  • Contain methods to interact with its state or alter state of other objects

What's a service ?

  • Service is an object with no state, only configuration parameters
  • It can alter objects or execute external i/o operations
  • No state means: no memory leak and low impact on memory

Asynchronous

In programming, asynchronous events are those occurring independently of the main program flow. Asynchronous actions are actions executed in a non-blocking scheme, allowing the main program flow to continue processing http://msdn2.microsoft.com/en-us/library/7ch3stsw.aspx

I don't care about this, because

  • PHP is awesome!
  • my code is uber cool!
  • I use simple quote for speed!

WRONG!

  • Architecture scale

    • Modern LAMP stack : Nginx, php-fpm, memcached, redis, varnish, mongodb, mysql, solr, elasticsearch, aws, haproxy, RabbitMQ, ZeroMQ, supervisord, statsd, nodejs
    • PHP is A component, not THE component
  • PHP cannot fork or use thread

Real life situation

Boss: A client need a web service to generate a PDF from any url.

Awesome Guy: OK, no problem, I can use wkhtmltopdf to render any url, it is one hour job!

Boss: Awesome!

AG's Solution

<?php
public function generateAction(Request $request, $url)
{
    $tmpfname = sprintf("%s/screeny_output_%s", sys_get_temp_dir(), uniqid(sha1($url), true));

    $this->get('knp_snappy.pdf')->generate($url, $tmpfname);

    $response = new Response(file_get_contents($tmpfname), 200, array(
        'Content-Type'        => 'application/pdf',
        'Content-Disposition' => sprintf('attachment; filename="%s.pdf"', Inflector::tableize($url))
    ));

    unlink($tmpfname);

    return $response;
}

Result : FAIL!

  • It works!
  • ... but the client complains as the solution does not scale<
  • ... some timeout occurs - max execution time


This code cannot be used in production

Real life situation

Boss: We need to find a solution => Use a cache !!!

Awesome Guy: OK!

AG's solution

<?php
public function generateAction(Request $request, $url)
{
    $uniqid = sha1($url);
    $manager = $this->get('sonata.media.manager.media');
    $media = $manager->findOneBy(array('uniqid' => $uniqid));

    if (!$media) {
        $tmpfname = sprintf("%s/screeny_output_%s", sys_get_temp_dir(), uniqid($uniqid, true));
        $this->get('knp_snappy.pdf')->generate($url, $tmpfname);
        $media = $manager->create();
        $media->setUniqid($uniqid);
        $manager->save($media, 'screeny', 'sonata.media.provider.file');
    }

    return $this->get('sonata.media.pool')
        ->getProvider('sonata.media.provider.file')
        ->getDownloadResponse($media, 'reference', 'http', array(
            'Content-Disposition'   => sprintf('attachment; filename="%s"', Inflector::tableize($media->getName()).'.pdf'),
        ));
}

Result : FAIL!

  • It works!
  • ... but the client complains as the solution does not scale<
  • ... less timeout occurs - max execution time


This code cannot be used in production

Real life situation

Boss: argh!!! We need to buy more servers!

Awesome Guy: maybe we can try another strategy using asynchronous messages

Boss: huuuu ! what?!

Messaging Pattern : Work Queues

  • Producer : Create a message
  • Queue : Stack message
  • Consumer : Handle a message

Pattern used in the SonataNotificationBundle



Solution

Integrate a message queue

# install SonataNotificationBundle
composer require sonata-project/notification-bundle

# edit config.yml
sonata_notification:
    backend: sonata.notification.backend.rabbitmq
    backends:
        rabbitmq:
            exchange:     router
            queue:        messages
            connection:
                host:     127.0.0.1
                user:     guest
                pass:     guest
                port:     5672
                vhost:    /screening_master

# install rabbitmq
apt-get install rabbitmq-server

Create a UrlShotManager service

<?php
class UrlShotManager
{
    public function shot($url)
    {
        $uniqid = sha1($url);

        $media = $this->mediaManager->findOneBy(array('uniqid' => $uniqid));

        if (!$media) {
            $tmpfname = sprintf("%s/screeny_output_%s", sys_get_temp_dir(), uniqid($uniqid, true));

            $this->generator->generate($url, $tmpfname);

            $this->mediaManager->save($media, 'screeny', 'sonata.media.provider.file');
        }

        return $media;
    }
}

Create a Consumer service

<?php
class UrlShotConsumer implements ConsumerInterface
{
   public function process(ConsumerEvent $event)
   {
       $url = $event->getMessage()->getValue('url');

       if (!$url) {
           return;
       }

       try {
           $this->manager->shot($url);
       } catch(\Exception $e) {

       }
   }
}

Produce a message

<?php
public function generateAction(Request $request, $url)
{
   $this->get('sonata.notification.backend')
       ->createAndPublish('url_shot', array(
           'url' => $url
       ));

   return new JsonResponse(array('status' => 'OK'));
}

Sonata Notification Bundle

  • One queue with multiple consumers registered through the Symfony Event Dispatcher
  • Multiple Backends: runtime, database and rabbitmq
  • Clean API but implements only 1 pattern

What if the logic becomes more complex:

  • The PDF generator need metrics
  • Add logs on errors
  • Send confirmation emails

UrlShotConsumer

<?php
class UrlShotConsumer implements ConsumerInterface
{
   public function process(ConsumerEvent $event)
   {
       $url = $event->getMessage()->getValue('url');

       if (!$url) { return; }

       try {
           $media = $this->manager->shot($url);
           $this->logger->info(sprintf("OK - The PDF has been generated - media.id = %d - $url", $media->getId(), $url));
           $this->mailer->send("client@service.com", "The PDF has been generated", "message");
           $this->metric->inc('pdf.generated.success')
       } catch(\Exception $e) {

           $this->logger->error(sprintf("ERROR - The PDF has not been generated - media.id = %d - $url", $media->getId(), $url));
           $this->mailer->send("client@service.com", "The PDF has not been generated", "message");
           $this->metric->inc('pdf.generated.error')
       }
   }
}

WRONG!

  • The current implementation works but ...
  • ... too much logic inside the consumer
  • ... what if more logic needs to be added
  • ... how the logging solution works with simultaneous accesses

RabbitMQ

... is open source message broker software (i.e., message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP) standard.
The RabbitMQ server is written in Erlang and is built on the Open Telecom Platform framework for clustering and failover

Definitions

  • Producer : Create a message
  • Exchange : Connect a producer with queues
  • Queue : Stack message
  • Consumer : Handle a message

Exchange types: direct, fanout and topic

Exchange Types


note: for more information, please review http://www.rabbitmq.com/tutorials/amqp-concepts.html

RabbitMQ Bundle

Integrates more RabbitMQ features inside Symfony2,
maintained by Alvaro Videla
- Co author of RabbitMQ in Action and
co author of php-amqplib -


  • Queue declarations
  • Exchanges types definition
  • RPC Call

Configuration

old_sound_rabbit_mq:
  producers:
    call_shot:
      connection: default
      exchange_options: {name: 'shot_direct', type: direct}

  consumers:
    generate_shot:
      connection:       default
      exchange_options: {name: 'shot_direct', type: direct}
      queue_options:    {name: 'generate_shot'}
      callback:         sonata_screeny.old_sound_rabbit_mq.generate_shot

    generate_shot_log:
      connection:       default
      exchange_options: {name: 'shot_direct', type: direct}
      queue_options:    {name: 'generate_shot_log'}
      callback:         sonata_screeny.old_sound_rabbit_mq.generate_shot_log

    generate_shot_mail:
      connection:       default
      exchange_options: {name: 'shot_direct', type: direct}
      queue_options:    {name: 'generate_shot_mail'}
      callback:         sonata_screeny.old_sound_rabbit_mq.generate_shot_mail

Commands

# Start the PDF consumer
app/console rabbitmq:consumer generate_shot --route=shot_direct.start

# Start the log consumer
app/console rabbitmq:consumer generate_shot_log --route=shot_direct.start

# Start the confirmation email consumer
app/console rabbitmq:consumer generate_shot_mail --route=shot_direct.completed

Abstraction is your enemy

  • NotificationBundle or RabbitMQbundle are a good start, not really mature for all use cases
  • Use php-amqplib for more convenience and powerfull workflow
  • Use raw php script or symfony command to access amqplib features!!
  • Use other programming languages as well!

Ping Command

<?php
$con = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/test_payload');

$ch = $connection->channel();
$ch->queue_declare('super_bus', false, true, false, false);
$ch->exchange_declare('starting_point', 'direct', false, true, false);
$ch->queue_bind('super_bus', 'starting_point');

while (1) {
    $body = json_encode(array(
        'command' => 'foobar',
        'sleep'   => 1200,
        'body'     => str_repeat(uniqid(), rand(1, 500)),
    ));

    $amq = new AMQPMessage($body, array(
        'content_type' => 'text/plain',
        'delivery-mode' => 2
    ));

    $channel->basic_publish($amq, 'starting_point');
}

Pong Command

<?php
$conn = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/test_payload');

$ch = $conn->channel();
$ch->queue_declare('super_bus', false, true, false, false);
$ch->exchange_declare('starting_point', 'direct', false, true, false);
$ch->queue_bind('super_bus', 'starting_point');

$ch->basic_consume('super_bus', 'pong_command', false, false, false, false, function(AMQPMessage $message) {
  echo "Payload size" + strlen($message->body) + "\n";

  $message
    ->delivery_info['channel']
    ->basic_ack($message->delivery_info['delivery_tag']);
});

// Loop as long as the channel has callbacks registered
while (count($chan->callbacks)) {
    $channel->wait();
}

Long process and PHP

  • Avoid using stack
    • monolog with crossfinger logger
    • Doctrine ORM (clear the Unit of work)
    • You need to know how lib internally behaves
  • Connection timeout
  • Use a process control system
    • supervisord: simple solution in python
    • god: a more complex solution (CPU and memory control) in ruby

QUESTIONS

?

THE END

References