Hello World

先决条件

本教程假定RabbitMQ在localhost默认端口(5672)上安装和运行。如果您使用其他主机,端口或凭据,连接设置将需要调整。

在哪里获得帮助

如果您在阅读本教程时遇到问题,可以通过邮件列表与我们联系。

简介

RabbitMQ是一个消息代理。主要想法很简单:它接受和转发消息。你可以把它当成一个邮局:当你发送邮件到邮政信箱,你很确定邮递员最终将邮件发送给你的收件人。使用这个隐喻RabbitMQ是一个邮政信箱,邮局和邮递员。

RabbitMQ和邮局之间的主要区别是它不处理纸张,而是接受,存储和转发二进制数据块的消息。

在使用RabbitMQ前,先让我们了解一些基本术语:

  • Producing:发送消息,发送消息的程序是生产者(producer),我们用P来表示它:

  • Queue: queue 是邮箱的名称,它们存在于RabbitMQ中,虽然消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。队列不受任何限制,它可以存储尽可能多的消息,你喜欢 - 它本质上是一个无限缓冲区。许多生产者可以发送去往一个队列的消息,许多消费者可以尝试从一个队列接收数据。

  • Consuming: 消费与接收具有相似的含义。消费者(consumer)是大多数等待接收消息的程序。

生产者(producer),消费者(consumer)和代理(broker)不必驻留在同一台机器上;事实上在大多数应用中他们没有。

Hello World!

使用php-amqplib

在这一部分,我们将在php中编写两个应用程序:发送单个消息的生产者、以及接收消息并将其打印出来的消费者,我们将讨论php-amqplib API中的一些细节,集中讨论这个非常简单的事情。这是一个“Hello World”的消息。

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列--RabbitMQ代表消费者保存的消息缓冲区。

php-amqplib 客户端库

RabbitMQ支持多种协议,本教程包含AMQP 0-9-1,它是一个用于消息传递的开放式协议。 RabbitMQ有许多不同语言的客户端。我们将在本教程中使用php-amqplib,并使用Composer进行依赖关系管理。

在你的项目根目录中添加 composer.json文件,并加入下列内容:

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

接下来在终端中执行下列命令来安装php-amqplib:

$ composer install

如果你没有安装composer,请自行安装。

现在我们已经安装了php-amqplib库,我们可以编写一些代码。

Sending

我们将发送消息的程序命名为 send.php,和接收消息的程序命名为 receive.php,发送方链接到RabbitMQ,发送单条消息,然后退出。

在send.php中,我们需要包含库并使用必要的类

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

接下来,我们创建一个到服务器的链接:

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$connection 连接socket链接,并为我们处理协议版本和认证等等。在这个案例中,我们连接到本地的broker,所以这里是localhost。如果我们想连接到其它机器上的broker,只需要提供它的名字或者IP地址。

然后我们创建一个通道(channel),这里有大多数的API来完成我们的任务。

要发送消息,我们必须先声明一个队列(queue),然后才可以向队列发送消息:

$channel->queue_declare('hello',false,false,false,false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg,'','hello');

echo " [x] Sent 'Hello World!' \n";

声明队列是幂等的,它只有在它不存在时才会被创建。

消息内容是一个字节数组,所以你可以把任何想发送的内容编码后发送过来。

最后我们要关闭$channel 和$connection来释放资源:

$channel->close();
$connection->close();

上面就是整个 send.php的代码了。

发送程序无法运行?

如果你第一次使用RabbitMQ,并且没有看到程序最后输出的Sent来告知你成功了,你可能会挠着头来思考到底是哪出错了? 可能是broker没有足够的可用空间(默认情况下,它至少需要1GB的空间来运行),因此拒绝接受消息。你可以检查下broker的日志文件来确定问题的原因,并在必要时减少该限制,配置文档里面有教你如何配置disk_free_limit.

Receiving

以上就是我们的sender了,我们的接收器是推送来自RabbitMQ的消息,因此与发送单个消息的sender是不一样的。我们继续运行监听消息并将其打印出来。

receive.php 中同样需要include类库和use必要的类:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

设置与sender 相同,我们打开一个连接(connection)和一个通道(channel),并且声明一个我们将要使用的队列,这里要注意到队列必须和sender发送的队列是同一个。

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_declare('hello',false,false,false,false);

echo ' [*] Waiting for messages. To exit press CTRL+C',"\n";

注意:我们在这里声明队列,是因为我们可能在发送方之前启动接收方,所以我们希望在尝试使用消息前确保队列存在。并且因为这里声明是幂等的,所以在发送方声明时,则可能不会创建新的队列,而是打开现有的队列

我们告诉服务器使用队列来传递消息。我们定义一个php回调(PHP callable),它来接收服务器发送的消息。

$callback = function($msg) {
    echo " [x] Received ",$msg->body,"\n";
}

$channel->basic_consume('hello','',false,true,false,false,$callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

上述代码会阻塞住程序结束,而 $channel有一个回调函数,每当我们收到一个程序时,我们的$callback函数将被传递接收道德消息。

以上就是receive.php的全部代码了

将它们放在一起

现在我们可以运行这两个脚本了,在终端里,运行发送程序:

$ php send.php

然后运行接收程序

$ php receive.php

接收方将通过RabbitMQ接收到的消息打印出来,接收者将继续运行,等待消息(使用Ctrl-C停止它),所以尝试从另一个终端运行发送者。

如果你想检查队列,你可以运行在RabbitMQ目录下的sbin目录中的rabbitmqctl:

$ rabbitmqctl list_queues.

你可以将rabbitmqctl加入到你的环境变量中,那样你就可以直接运行rabbitmqctl,而不用进入RabbitMQ目录中。

输出:Hello World!

results matching ""

    No results matching ""