Mojolicous, render_later and weaken transactions

When using render_later method, did you ever encountered this Mojolicious error?

Can't call method "res" on an undefined value at /home/max/perl5/lib/perl5/Mojolicious/Controller.pm line 275.

Read the rest of this entry »

REFCOUNTers and AnyEvent…

Perl use reference counting to destroy unused objects. Each times an object is referenced, the reference counter of the object is increased by one. At the opposite, when a reference to the object is destroyed, the reference counter of the object is decreased by one. When the reference counter reaches 0, the object is destroyed.

This is a cool feature but sometimes, we have to be very careful especially when we over-abuse of closures as we like to do it in AnyEvent, for example…

Because when an object references another one, that itself references the previous one, the two objects will never be destroyed automatically… With closures it can happen very quickly…

Take that code:

use 5.010;
use strict;
use warnings;

use AnyEvent;

my $cv = AE::cv;

{
    # Instanciate an object ($obj.REFCOUNT=1)
    my $obj = Test->new;

    # Create a new reference on it ($obj.REFCOUNT=2)
    my $obj_dup = $obj;

    # Create a timer: every seconds, print a counter
    # ($obj.COUNTER=3 since it is used in the timer callback/closure)
    $obj->{watcher} = AE::timer(1, 1, sub { say ++$obj_dup->{count} });

    # Keep the main condvar reference so we will be able to stop the
    # event loop when the instance won't be in use by anyone (see Test
    # DESTROY method)...
    $obj->{condvar} = $cv;
}
# $obj.REFCOUNT=1 since $obj and $obj_dup no longer exist, but the
# callback is still active since it remains a reference to the Test
# instance in the timer callback/closure.

# THE event loop...
$cv->recv;


package Test;

use Carp;

sub new
{
    return bless {}, shift;
}

# Called when perl automatically destroy the object instance
sub DESTROY
{
    my $self = shift;

    # Stop the event loop...
    $self->{condvar}->send;

    carp "DESTROYed!";
}

If you launch it, you will see that it will never end… Printing 1, 2, 3… The timer callback never stops. Even when $obj and $obj_dup go out of scope, the Test instance remains alive…

The reason is that, after the creation of the two first references $obj and $obj_dup, the instance used in the timer closure increased the reference counter by one, so it became 3. When the two references $obj and $obj_dup went out of scope, the reference counter decreased by 2, so it became 1… Forever…

We have here a cyclic reference:

Test instance -> timer watcher -> closure -> Test instance -> etc.

So the Test instance can not be destroyed…

The solution: use a weak reference.

A weak reference is like a normal reference, but it does not increase the reference counter. And when a reference counter of an object decreases to 0, perl automatically set to undef all weak references that referenced it…

To create a weak reference, you can use the weaken() function of Scalar::Util module. So the previous code becomes:

use 5.010;
use strict;
use warnings;

use Scalar::Util;

use AnyEvent;

my $cv = AE::cv;

# Yes, now we declare this variable out of the following scope, just
# to see the undef effect...
my $obj_dup;

{
    # Instanciate an object ($obj.REFCOUNT=1)
    my $obj = Test->new;

    # Create a new reference on it ($obj.REFCOUNT=2)
    $obj_dup = $obj;

    # Make $obj_dup a weak reference ($obj.REFCOUNT=1)
    Scalar::Util::weaken($obj_dup);

    # Create a timer: every seconds, print a counter
    # As we use a weak reference in the timer callback/closure, the
    # reference counter does not increase.
    $obj->{watcher} = AE::timer(1, 1, sub { say ++$obj_dup->{count} });

    # Keep the main condvar reference so we will be able to stop the
    # event loop when the instance won't be in use by anyone (see Test
    # DESTROY method)...
    $obj->{condvar} = $cv;
}
# $obj.REFCOUNT=0 since $obj no longer exist => Test::DESTROY is called.

# And $obj_dup becomes undef
unless (defined $obj_dup)
{
    warn '$obj_dup now undefined!';
}

# THE event loop...
$cv->recv;


package Test;

use Carp;

sub new
{
    return bless {}, shift;
}

# Called when perl automatically destroy the object instance
sub DESTROY
{
    my $self = shift;

    # Stop the event loop...
    $self->{condvar}->send;

    carp "DESTROYed!";
}

In this case the event loop stops immediately.

When $obj goes out of scope, the reference counter of the instance decrease from 1 to 0, so perl destroys the instance and so calls Test::DESTROY() which prints “DESTROYed! at – line 16”. Then it undefines $obj_dup so we print “$obj_dup now undefined! at – line 39.”.

The job is done, but be careful!!! :)

Proxy dispatcher for HTTP/SSL *and* SSH

Peteris Krumins just told us how he helps one of his friends to bypass a firewall to do SSH through the port 443 (HTTP/SSL one).

Last year, I did a proof of concept of a proxy that will listen on port 443 and forward the data on the internal HTTP server or SSH server, based on the client behavior without decoding anything.

To achieve this, I used AnyEvent, fantastic event loop manager…

One things to know is that when doing HTTP or HTTP over SSL, it is the client that first talk to the server, doing like:

GET /index.html HTTP/1.1
Host: www.ijenko.com
...

With SSH, the server announces itself, like that:

SSH-2.0-OpenSSH_5.4p1 FreeBSD-20100308

waiting then for client data…

So our proxy just has to wait a little bit after accepting the client connection (here we wait 0.5 seconds) before deciding what to do.

If the client talk during this time, it probably wants to do HTTP, if not it probably wants to do SSH.

The delay only impact SSH connections and only at the first step.

So reconfigure your HTTP server to only listen on localhost, then launch the proxy with the network side address.

Note that you can change the proxy to connect to different hosts than the local one (here 127.1), it’s up to you.

Enjoy… :-)

Just keep in mind that all connections to your internal HTTP and SSH servers will be coming from the proxy, you will not be able to know the real source, only the proxy knows…

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;

die "usage: $0 BIND_IP_ADDRESS\n" if @ARGV != 1;

my $ip_address = shift;

use constant DEBUG => 1;

use constant {
    BIND_PORT   => 443,

    SSL_PORT    => 443,
    SSH_PORT    => 22,
};

tcp_server($ip_address, BIND_PORT, sub
           {
               my($fh, $host, $port) = @_;

               my $cnx = Cnx->new;

               $cnx->client_handle(
                   AnyEvent::Handle->new(
                       fh          => $fh,
                       rtimeout    => 0.5,
                       on_error    => $cnx->on_error,
                       # Client didn't say anything after initial timeout => SSH
                       on_rtimeout => $cnx->on_init_action(SSH_PORT),
                       # Client talk immediately => SSL
                       on_read     => $cnx->on_init_action(SSL_PORT)));

               warn "$host:$port connected.\n" if DEBUG;
           });


package Cnx;

use Scalar::Util qw(refaddr);

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;

use Carp;

my %CONNECTIONS;

sub new
{
    my($class, %opt) = @_;

    my $self = bless \%opt, $class;

    $CONNECTIONS{refaddr $self} = $self;

    return $self;
}


sub DESTROY
{
    my $self = shift;

    delete $CONNECTIONS{refaddr $self};

    warn "$self DESTROYed\n" if main::DEBUG;
}

# Create two accessors/mutators for attributes...
foreach my $attribute (qw(client_handle serv_handle))
{
    no strict 'refs';

    *$attribute = sub
    {
        if (@_ == 1)
        {
            return $_[0]{$attribute};
        }

        if (@_ == 2)
        {
            return $_[0]{$attribute} = $_[1];
        }

        carp "$attribute miscalled...";
    };
}


sub close_all
{
    my $self = shift;

    if (defined(my $handle = $self->client_handle))
    {
        $handle->destroy;
        $self->client_handle(undef);
    }

    if (defined(my $handle = $self->serv_handle))
    {
        $handle->destroy;
        $self->serv_handle(undef);
    }

    delete $CONNECTIONS{refaddr $self};
}


sub on_error
{
    my $self = shift;

    return sub
    {
        $self // return;

        my($handle, undef, $message) = @_;

        warn "CLIENT got error $message\n" if main::DEBUG;

        $self->close_all;
    };
}


sub on_init_action
{
    my($self, $port) = @_;

    # Something happens during the probe period
    return sub
    {
        my($handle, undef, $message) = @_;

        warn "$self on_init_action(PORT=$port).\n" if main::DEBUG;

        unless (defined $self->serv_handle)
        {
            # We cancel the timeout and we connect to the internal service
            $self->client_handle->rtimeout(0);

            tcp_connect('127.1', $port, $self->on_serv_connected($port));
        }
    };
}


sub on_client_read
{
    my $self = shift;

    # Client talk after the connection to the internal service
    return sub
    {
        my $handle = shift;

        warn "CLIENT -> serv: " . length($handle->{rbuf}) . " bytes\n"
            if main::DEBUG;

        $self->serv_handle->push_write(delete $handle->{rbuf});
    };
}


sub on_serv_connected
{
    my($self, $port) = @_;

    # We just connected to the internal service (or failed to)
    return sub
    {
        my $fh = shift;

        unless (defined $fh)
        {
            warn "Can't connect to internal service on port $port: $!\n";
            $self->close_all;
            return;
        }

        my $serv_handle = AnyEvent::Handle->new(
            fh => $fh,
            on_error => $self->on_serv_error,
            on_read  => $self->on_serv_read);

        warn "$serv_handle serv_connected\n" if main::DEBUG;

        $self->serv_handle($serv_handle);

        $self->client_handle->on_read($self->on_client_read);
    };
}


sub on_serv_error
{
    my $self = shift;

    # Error from internal service side
    return sub
    {
        my($serv_handle, undef, $msg) = @_;

        warn "SERV got error $msg\n" if main::DEBUG;

        $self->close_all;
    };
}


sub on_serv_read
{
    my $self = shift;

    # Something to read from internal service
    return sub
    {
        my $handle = shift;

        warn "SERV -> client: " . length($handle->{rbuf}) . " bytes\n"
            if main::DEBUG;

        $self->client_handle->push_write(delete $handle->{rbuf});
    };
}


package main;

AnyEvent->condvar->recv;

AnyEvent et POE

AnyEvent et POE sont deux modules Perl permettant de gérer de manière transparente les problématiques de boucles d’événements et de s’affranchir du système d’exploitation utilisé.

Il fournissent une interface uniforme quelle que soit la boucle d’événements sous-jacente. Les boucles d’événements pouvant être utilisées sous POE sont dans le namespace POE::Loop:: tandis que celles pouvant l’être sous AnyEvent sont dans le namespace AnyEvent::Impl::. On peut d’ailleurs voir que AnyEvent peut utiliser POE via AnyEvent::Impl::POE, mais c’est une autre histoire…

Si ces deux modules reconnaissent les événements de type entrées/sorties, les timers et les signaux, ils ont une approche différente de leur gestion :

  • AnyEvent permet d’associer un callback (uniquement une référence sur une fonction) à chaque événement. Point ;
  • POE, quant à lui, ajoute une couche dans laquelle les événements vont être nommés avant d’être distribués. Un callback (référence sur une fonction ou un couple instance / méthode) est associé à un état (un nom), puis chaque événement va être associé à un état. Lorsqu’un événement arrive, il déclenche un état et le callback associé à l’état est appelé. Une sorte d’indirection, en fait. Ces états sont regroupés par session. Chaque session est indépendante, mais peut communiquer avec une autre via des événements que nous qualifierons de « logiques », puisqu’ils n’ont pas le système pour origine (voir les méthodes post() et call() de POE::Kernel). Ce cloisonnement permet d’isoler des composants du programme les uns des autres et ainsi d’éviter que les noms d’état ne se trouvent tous mélangés, avec le risque de collision de nom que cela pourrait entraîner.

Bien entendu AnyEvent comme POE permettent de réaliser n’importe quelle application client/serveur. Pour tous les protocoles orientés ligne, cela ne pose aucun problème.

Là où le bât blesse, c’est lorsque le dialogue utilise des données sérialisées. Le problème n’est pas tant comment les données sont sérialisées, mais plutôt comment ces données sérialisées sont envoyées. Faut-il transmettre la longueur des données sérialisées en tête ? Si oui sous quelle forme ? Si non y-a-t’il une séquence de fin ? Autant d’options qui rendent l’interopérabilité parfois impossible…

AnyEvent va avoir une attitude différente selon la méthode de sérialisation utilisée, partant du principe que certaines méthodes de désérialisation savent détecter toutes seules la fin du flux (JSON ou Data::MessagePack, par exemple). Pour les méthodes n’incluant pas cette possibilité (comme Storable), la longueur va être transmise en tête avec un pack("w", LONGUEUR) par exemple.

POE gère les choses de manière générique. À l’aide du module POE::Filter::Reference, quelle que soit la méthode de sérialisation, la longueur des données sérialisées est passée en tête sous une forme « humaine » suivie d’un \0 comme "128\0..." pour une longueur de 128 octets par exemple.

Donc, ce n’est pas compatible :-(

C’est là qu’intervient le module AnyEvent::POE_Reference. Ce module permet de sérialiser et de désérialiser des données à la mode POE depuis AnyEvent sans nécessiter la présence du module POE.

Merci Perl, merci Ijenko !