Examples

Example #1 AMQP Example

<?php

// Create a connection
$cnn = new AMQPConnection();
$cnn->connect();

// Declare a new exchange
$ex = new AMQPExchange($cnn);
$ex->declare('exchange1', AMQP_EX_TYPE_FANOUT);

// Create a new queue
$q = new AMQPQueue($cnn);
$q->declare('queue1');

// Bind it on the exchange to routing.key
$ex->bind('queue1', 'routing.key');

// Publish a message to the exchange with a routing key
$ex->publish('message', 'routing.key');

// Read from the queue
$msg = $q->consume();

?>
User Contributed Notes 5 notes

jean dot weisbuch at phpnet dot org
2 years ago
The official documentation is not up to date as of June 2012 for the latest stable version (1.0.3), so I had to work out semi-blindly how to use the extension (thanks to some snippets in the User Contributed Notes).

Here is a simple way to publish and retrieve messages (tested with RabbitMQ as the broker) that works on the stable version of the extension:

<?php
function amqp_connection() {
    $amqpConnection = new AMQPConnection();
    $amqpConnection->setLogin("username");
    $amqpConnection->setPassword("123456");
    $amqpConnection->setVhost("virthost");
    $amqpConnection->connect();

    if (!$amqpConnection->isConnected()) {
        die("Cannot connect to the broker, exiting !\n");
    }
    return $amqpConnection;
}

function amqp_receive($exchangeName, $routingKey, $queueName) {
    $amqpConnection = amqp_connection();

    $channel = new AMQPChannel($amqpConnection);
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->bind($exchangeName, $routingKey);

    while ($message = $queue->get()) {
        echo("Message #".$message->getDeliveryTag()." '".$message->getBody()."'");

        if ($message->isRedelivery()) {
            echo("\t(this message has already been delivered)");
        }
        // just for testing purposes: shows how to manually remove a message from the queue
        if (rand(0,6) > 4) {
            $queue->ack($message->getDeliveryTag());
            echo("\t(this message has been removed from the queue)");
        }
        print_r($message->getMessageId());
        echo "\n";
    }

    if (!$amqpConnection->disconnect()) {
        throw new Exception("Could not disconnect !");
    }
}

function amqp_send($text, $routingKey, $exchangeName) {
    $amqpConnection = amqp_connection();

    $channel = new AMQPChannel($amqpConnection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType("fanout");

    // publish() returns a success flag, not the message, so report $text itself
    $sent = $exchange->publish($text, $routingKey);
    if (!$sent) {
        echo "Error: Message '".$text."' was not sent.\n";
    } else {
        echo "Message '".$text."' sent.\n";
    }

    if (!$amqpConnection->disconnect()) {
        throw new Exception("Could not disconnect !");
    }
}

// let's send a message with "random" content (the date)
amqp_send("Message added at this date: ".date(DATE_RFC822), "action", "amq.fanout");

// sleep for 1 second if you want to be able to receive the message you just sent (the delay appears to be a broker-side limitation)
sleep(1);

// now we receive messages from the queue we just sent a message on
amqp_receive("amq.fanout","action","action");
?>

The rand() part of the script randomly acknowledges messages. This makes the script more "lively" when it is run many times, and it shows how to acknowledge a message manually after it has been read.

info at eeasoftware dot com
2 years ago
I spent several hours looking for some examples of sending messages to worker queues vs. fan queues.  The documentation isn't too clear and I had to consult many sources, so here are simple methods for both types of 'producers'.  For brevity I've left out 'safety' checks in the code dealing with whether or not a connection was established, error checking, cleanup, etc.

As further clarification, I'm using RabbitMQ as my service, with Java daemons running on the server as 'consumers'.  These examples are current with the 1.0.0 stable AMQP release of February 2012.

<?php

//set your connection arguments and connect to the server
$conn_args = array('host' => 'your_host', 'port' => 'your_host_port', 'login' => 'your_username', 'password' => 'your_password');
$conn = new AMQPConnection($conn_args);
$conn->connect();

//create your message
$message = 'Hello World!';

//create a channel and exchange
$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);

//Here is where the code splits for the different types of queues:

//-------------------------------------------------
//For Fan Type Queues (One message will be consumed by many consumers - each on a queue of their own)

//set the exchange name and publish the message
$exchange->setName('exchange_name');
$exchange->publish($message, 'your_routing_key');

//for Fan Queue calls not bound to a particular key, the 'routing_key' will simply be ignored

//-------------------------------------------------
//For Worker Type Queues (Many consumers are listening to a single queue, and a single message will be passed only to the next available consumer waiting in line)

//start a transaction
$channel->startTransaction();

//publish your message with 'your_queue_name' as the 'routing_key'
$exchange->publish($message, 'your_queue_name');

//commit your transaction
$channel->commitTransaction();

//-------------------------------------------------

//pseudo code:
//clean up $channel; - probably just unset($channel) will do.
//clean up $exchange; - probably just unset($exchange) will do.

//disconnect from server
$conn->disconnect();

?>
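
The note above shows only the producer side, with the consumers written in Java. For comparison, a PHP consumer for the worker-queue case might look roughly like the sketch below. It uses only calls that appear in the other notes on this page (setName(), declare(), get(), ack(), getBody(), getDeliveryTag()); the function name drain_worker_queue() and its parameters are hypothetical. It assumes the queue can be declared with default flags, and that the installed release still names the method declare() (newer releases of the extension call it declareQueue()).

```php
<?php
// Sketch only: drain_worker_queue() and its parameters are hypothetical names.
// Calling it requires the amqp extension and a reachable broker.
function drain_worker_queue(array $connArgs, $queueName)
{
    $conn = new AMQPConnection($connArgs);
    $conn->connect();

    $channel = new AMQPChannel($conn);
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    // Assumption: the queue was (or can be) declared with default flags.
    $queue->declare();

    $handled = 0;
    // get() without AMQP_AUTOACK leaves each message unacknowledged until
    // ack() is called, so a crash before ack() lets the broker redeliver it.
    while ($envelope = $queue->get()) {
        echo $envelope->getBody(), PHP_EOL;
        $queue->ack($envelope->getDeliveryTag());
        $handled++;
    }

    $conn->disconnect();
    return $handled;
}
```

A call such as drain_worker_queue($conn_args, 'your_queue_name') would then read whatever the worker-queue producer above published with 'your_queue_name' as the routing key. No bind() is needed in that case, because the unnamed default exchange routes by queue name.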

WhatTheWhat
1 year ago
How is SSL handled? I cannot find any documentation on support for SSL or how to use it.

I am trying to connect to CloudAMQP which requires a different port for SSL (easy enough) and it also requires to use amqps:// instead of amqp:// for the AMQP URL.

However, I cannot figure out how to specify amqps://

Thank You
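
One possible starting point, sketched here under assumptions, is to split the amqps:// URL into the separate values that the AMQPConnection constructor array takes (the 'host', 'port', 'login' and 'password' keys shown in another note, plus 'vhost'). The helper name amqp_url_to_args(), the example hostname, and the fallback to 'guest' credentials are illustrative choices, not part of the extension; 5672 and 5671 are the conventional ports for amqp and amqps. The sketch only parses the URL. Whether the installed extension version can actually negotiate TLS on that port is a separate matter that it does not address.

```php
<?php
// Hypothetical helper: split an AMQP URL into the array keys accepted by
// the AMQPConnection constructor. It does not open any connection.
function amqp_url_to_args($url)
{
    $parts = parse_url($url);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException("Not a valid AMQP URL: $url");
    }
    if ($parts['scheme'] !== 'amqp' && $parts['scheme'] !== 'amqps') {
        throw new InvalidArgumentException("Unsupported scheme: " . $parts['scheme']);
    }

    // Conventional default ports: 5672 for amqp, 5671 for amqps.
    $defaultPort = ($parts['scheme'] === 'amqps') ? 5671 : 5672;

    // The URL path carries the vhost. Simplification: a missing or empty
    // path is treated as the default vhost "/".
    $path = isset($parts['path']) ? $parts['path'] : '';
    $vhost = (strlen($path) > 1) ? rawurldecode(substr($path, 1)) : '/';

    return array(
        'host'     => $parts['host'],
        'port'     => isset($parts['port']) ? $parts['port'] : $defaultPort,
        'vhost'    => $vhost,
        'login'    => isset($parts['user']) ? rawurldecode($parts['user']) : 'guest',
        'password' => isset($parts['pass']) ? rawurldecode($parts['pass']) : 'guest',
    );
}

// Example with a made-up URL; pass the result to new AMQPConnection($args).
$args = amqp_url_to_args('amqps://user:secret@broker.example.com/myvhost');
print_r($args);
```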

bradyjvitrano at gmail dot com
2 years ago
This example assumes no previous queue or exchange has been declared.

Send File
<?php
/**
* Filename: send.php
* Purpose:
* Send messages to RabbitMQ server using AMQP extension
* Exchange Name: exchange1
* Exchange Type: fanout
* Queue Name: queue1
*/
$connection = new AMQPConnection();
$connection->connect();
if (!$connection->isConnected()) {
    die('Not connected :(' . PHP_EOL);
}
// Open Channel
$channel    = new AMQPChannel($connection);
// Declare exchange
$exchange   = new AMQPExchange($channel);
$exchange->setName('exchange1');
$exchange->setType('fanout');
$exchange->declare();
// Create Queue
$queue      = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->declare();

$message    = $exchange->publish('Custom Message (ts): '.time(), 'key1');
if (!$message) {
    echo 'Message not sent', PHP_EOL;
} else {
    echo 'Message sent!', PHP_EOL;
}
?>

Receive File
<?php
/**
* Filename: receive.php
* Purpose:
* Receive messages from RabbitMQ server using AMQP extension
* Exchange Name: exchange1
* Exchange Type: fanout
* Queue Name: queue1
*/
$connection = new AMQPConnection();
$connection->connect();
if (!$connection->isConnected()) {
    die('Not connected :(' . PHP_EOL);
}
// Open channel
$channel    = new AMQPChannel($connection);
// Open Queue and bind to exchange
$queue      = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->bind('exchange1', 'key1');
$queue->declare();
// Prevent message redelivery with AMQP_AUTOACK param
while ($envelope = $queue->get(AMQP_AUTOACK)) {
    echo ($envelope->isRedelivery()) ? 'Redelivery' : 'New Message';
    echo PHP_EOL;
    echo $envelope->getBody(), PHP_EOL;
}

?>

Test It
$ php -f send.php
$ php -f receive.php

kurt at surfmerchants dot com
3 years ago
The docs for this extension are a little vague, so I'm adding a few things I found helpful in getting this to work.

Helpful reading:
* http://en.wikipedia.org/wiki/AMQP
* http://www.rabbitmq.com/getstarted.html

Helpful tool:
/usr/sbin/rabbitmqctl list_queues

Here I've split the usage example into a sender and receiver, because that is much more useful (at least to me):

send.php :
#!/usr/bin/php -q
<?php
// config
$exchangeName = 'myexchange';
$routingKey = 'routing.key';
$message = $argv[1];

// connect
$connection = new AMQPConnection();
$connection->connect();

// setup exchange
$ex = new AMQPExchange($connection);
$ex->declare($exchangeName, AMQP_EX_TYPE_FANOUT);

// Publish a message to the exchange with our routing key
$ex->publish($message, $routingKey);

// disconnect
$connection->disconnect();

?>

receive.php:
#!/usr/bin/php -q
<?php
// config
$exchangeName = 'myexchange';
$routingKey = 'routing.key';
$queueName = 'myqueue';

// connect
$connection = new AMQPConnection();
$connection->connect();

// setup our queue
$q = new AMQPQueue($connection);
$q->declare($queueName);

// Bind it on the exchange to routing.key
$q->bind($exchangeName, $routingKey);

// show the message
print_r($q->get());

// disconnect
$connection->disconnect();

?>

The first time you run "./send.php 'hello world'" and then "./receive.php", you will not get the message, because the queue does not exist until the receiver creates it. That's OK. While you *can* create the queue in the sender, this is not so useful, because it's really the receivers that read the queue, and I've found that various clients are very picky about the details of how the queue was created (for example, if I create the queue in my PHP sender and then try to listen with a Python receiver, it throws errors, but if I let the Python receiver create the queue, all is well).
So, the second time you send a message (after running receive.php once), the queue will exist, and you will get the message the next time you run receive.php.

Note also that I'm using $q->get() instead of $q->consume(), because the latter seems to segfault (at least on my CentOS 5.5 system).