Redoing RabbitMQ's tutorial - part 2
Sunday, January 27, 2013 9:46:05 PM
And here we go with RabbitMQ's tutorial 2: work queues. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.
As previously, I'll just show the finished code. Please refer to the tutorial and to Net::RabbitMQ's documentation for the details.
new_task.pl
#!/usr/bin/perl
use strict ;
use warnings ;
use Net::RabbitMQ ;
{
    # Private counter shared only with nextchan(): the enclosing bare
    # block keeps it out of reach of the rest of the script.
    my $next_id = 1 ;

    # Hand out the current channel ID, then advance the counter so the
    # next call gets a fresh one. The first call returns 1.
    sub nextchan {
        my $id = $next_id ;
        $next_id += 1 ;
        return $id ;
    }
}
### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver      = q{gravity} ;
my %qparms       = () ;
my %declare_opts = ( durable => 1, auto_delete => 0 ) ;
my $qname        = q{gravity.checks} ;
# Number of tasks to publish, taken from the first command-line argument.
my $count        = $ARGV[0] ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

# Fail early with a usage message when the task count is missing or not a
# non-negative integer; otherwise the loop below would compare against an
# undefined/non-numeric value, emit warnings and silently send nothing.
die qq{Usage: $0 <number_of_tasks>\n}
    unless defined $count and $count =~ m{\A\d+\z} ;

# Connect, open a channel and make sure the work queue exists.
my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->queue_declare($chanID,$qname,\%declare_opts,) ;

# Publish $count tasks, each announcing a random duration of 1..10 seconds
# that the worker extracts from the message body.
for ( 1 .. $count )
{
    my $sec     = 1+int(rand(10)) ;
    my $message = qq{This task will last for $sec seconds} ;
    # Publish through the default exchange (empty name), using the queue
    # name as routing key. delivery_mode => 2 marks the message as
    # persistent, so that tasks sitting in the durable queue are not lost
    # if the broker restarts.
    $mq->publish(
        $chanID, $qname, $message,
        { exchange      => "" },
        { delivery_mode => 2  },
    ) ;
    print STDERR qq{Message "$message" sent to queue $qname\n} ;
}
$mq->disconnect ;
worker.pl
#!/usr/bin/perl
use strict ;
use warnings ;
use Net::RabbitMQ ;
{
    # Last channel ID handed out so far; visible only to nextchan()
    # thanks to the enclosing bare block. Zero means "none issued yet".
    my $last_issued = 0 ;

    # Return a channel ID that has not been returned before: 1 on the
    # first call, 2 on the second, and so on.
    sub nextchan {
        return ++$last_issued ;
    }
}
### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms  = () ;
my %consume_opts = (
    consumer_tag => "worker_$$",   # tag carries the PID of this worker
    no_ack       => 0,             # messages are acknowledged explicitly below
    exclusive    => 0,
) ;
my %declare_opts = ( durable => 1, auto_delete => 0 ) ;
my $qname = q{gravity.checks} ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

# Connect, open a channel and make sure the work queue exists.
my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
# Limit this worker to one unacknowledged message at a time, so a busy
# worker is not handed further tasks while idle ones are available.
$mq->basic_qos($chanID,{ prefetch_count => 1 }) ;
$mq->queue_declare($chanID,$qname,\%declare_opts,) ;
# A single consume() is enough; every recv() below pulls from it.
$mq->consume($chanID,$qname,\%consume_opts) ;

# NOTE THAT recv() is BLOCKING!!! get wasn't!
# The loop therefore only ends if recv() returns a false value; an idle
# worker simply waits here for the next task.
while ( my $payload = $mq->recv() )
{
    my $body = $payload->{body} ;
    my $dtag = $payload->{delivery_tag} ;
    # The task duration is the first run of digits in the body. A body
    # without any digits is treated as a zero-second task, instead of
    # leaving $sec undefined and triggering warnings in sleep and print.
    my $sec = $body =~ m{(\d+)} ? $1 : 0 ;
    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;
    # Acknowledge only after the work is done, so an interrupted task is
    # not marked as completed.
    $mq->ack($chanID,$dtag,) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}
output
bronto@cooper:~/Lab/gravity/tutorial-2$ ./worker.pl & ./worker.pl &
[1] 8079
[2] 8080
bronto@cooper:~/Lab/gravity/tutorial-2$ ./new_task.pl 5
Message "This task will last for 8 seconds" sent to queue gravity.checks
Message "This task will last for 2 seconds" sent to queue gravity.checks
Message "This task will last for 6 seconds" sent to queue gravity.checks
Message "This task will last for 8 seconds" sent to queue gravity.checks
Message "This task will last for 6 seconds" sent to queue gravity.checks
Worker 8080: Received from queue gravity.checks: This task will last for 8 seconds
Worker 8079: Received from queue gravity.checks: This task will last for 2 seconds
bronto@cooper:~/Lab/gravity/tutorial-2$ Worker 8079: Work done in 2 seconds
Worker 8079: Received from queue gravity.checks: This task will last for 6 seconds
Worker 8080: Work done in 8 seconds
Worker 8080: Received from queue gravity.checks: This task will last for 8 seconds
Worker 8079: Work done in 6 seconds
Worker 8079: Received from queue gravity.checks: This task will last for 6 seconds
Worker 8079: Work done in 6 seconds
Worker 8080: Work done in 8 seconds
bronto@cooper:~/Lab/gravity/tutorial-2$ kill %1 %2
bronto@cooper:~/Lab/gravity/tutorial-2$
[1]- Terminated ./worker.pl
[2]+ Terminated ./worker.pl
bronto@cooper:~/Lab/gravity/tutorial-2$
Note that:
- only one call of consume() is needed, you don't have to repeat it before each recv();
- recv() is blocking: the workers don't stop running when the queue is empty, so I had to kill them to stop them.






