TP5使用RabbitMQ实现消息队列的项目实践
发布:smiling 来源: PHP粉丝网 添加日期:2023-10-16 18:26:28 浏览: 评论:0
在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ。
1、安装扩展
进入TP5 更目录下,输入命令安装:
composer require php-amqplib/php-amqplib
2、自定义命令
TP5 的自定义命令,这里也简单说下。
第一步:
创建命令类文件,新建 application/api/command/Test.php。
- <?php
- namespace app\api\command;
- use think\console\Command;
- use think\console\Input;
- use think\console\Output;
- /**
- * 自定义命令测试
- */
- class Test extends Command
- {
- /**
- * 配置
- */
- protected function configure()
- {
- // 设置命令的名称和描述
- $this->setName('test')->setDescription('这是一个测试命令');
- }
- /**
- * 执行
- */
- protected function execute(Input $input, Output $output)
- {
- $output->writeln("测试命令");
- }
- }
这个文件定义了一个叫test的命令,备注为 这是一个测试命令,执行命令会输出:test command。
第二步:
配置 command.php文件,在 application/command.php文件中添加命令。
- <?php
- return [
- 'app\api\command\Test',
- ];
第三步:
测试命令,在项目根目录下输入命令:
php think test
回车运行之后输出:
test command
到这里,自定义命令就结束了,test命令就自定义成功了。
3、rabbitmq服务端
下来我们自定义 RabbitMQ 启动命令,守护进程运行,启动 rabbirmq 服务端接收消息。
在 application/api/command 目录下,新建 Ramq.php 文件,在执行命令的方法中,调用 RabbitMQ 启动守护进程方法即可。
- <?php
- namespace app\api\command;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use think\console\Command;
- use think\console\Input;
- use think\console\Output;
- /**
- * RabbitMq 启动命令
- */
- class Ramq extends Command
- {
- protected $consumerTag = 'customer';
- protected $exchange = 'xcuser';
- protected $queue = 'xcmsg';
- protected function configure()
- {
- $this->setName('ramq')->setDescription('rabbitmq');
- }
- protected function execute(Input $input, Output $output)
- {
- $output->writeln("消息队列开始");
- $this->start();
- // 指令输出
- $output->writeln('消费队列结束');
- }
- /**
- * 关闭
- */
- function shutdown($channel, $connection)
- {
- $channel->close();
- $connection->close();
- }
- /**
- * 回调处理信息
- */
- function process_message($message)
- {
- if ($message->body !== 'quit') {
- echo $message->body;
- }
- //手动应答
- $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
- if ($message->body === 'quit') {
- $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
- }
- }
- /**
- * 启动 守护进程运行
- */
- public function start()
- {
- $host = '127.0.0.1';
- $port = 5672;
- $user = 'guest';
- $pwd = 'guest';
- $vhost = '/';
- $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost);
- $channel = $connection->channel();
- $channel->queue_declare($this->queue, false, true, false, false);
- $channel->exchange_declare($this->exchange, 'direct', false, true, false);
- $channel->queue_bind($this->queue, $this->exchange);
- $channel->basic_consume($this->queue, $this->consumerTag, false, false, false, false, array($this, 'process_message'));
- register_shutdown_function(array($this, 'shutdown'), $channel, $connection);
- while (count($channel->callbacks)) {
- $channel->wait();
- }
- }
- }
在application/command.php文件中,添加rabbitmq自定义命令。
- return [
- 'app\api\command\Ramq',// rabbitmq
- ];
4、发送端
最后,我们再写发送消息的控制器,实现消息队列,具体代码如下:
- <?php
- namespace app\api\controller;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use PhpAmqpLib\Message\AMQPMessage;
- use think\Controller;
- /**
- * 发送端
- */
- class MessageQueue extends Controller
- {
- const exchange = 'xcuser';
- const queue = 'xcmsg';
- /**
- * 发送消息
- */
- public function pushMessage($data)
- {
- $host = '127.0.0.1';
- $port = 5672;
- $user = 'guest';
- $pwd = 'guest';
- $vhost = '/';
- $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost);
- $channel = $connection->channel();
- $channel->exchange_declare(self::exchange, 'direct', false, true, false);
- $channel->queue_declare(self::queue, false, true, false, false);
- $channel->queue_bind(self::queue, self::exchange);
- $messageBody = $data;
- $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
- $channel->basic_publish($message, self::exchange);
- $channel->close();
- $connection->close();
- echo 'ok';
- }
- /**
- * 执行
- */
- public function index()
- {
- $data = json_encode(['msg' => '测试数据', 'id' => '15']);
- $this->pushMessage($data);
- }
- }
5、验证
先执行自定义命令,启动 rabbitmq 守护进程。在项目更目录下打开命令行,输入下面命令:
php think ramq
然后在浏览器访问发送信息的方法,http://你的域名/api/message/index,你发送一次消息,在命令行就会输出一条消息。这样我们就用 RabbitMQ 实现了一个简单的消息队列。
Tags: RabbitMQ TP5消息队列
- 上一篇:ThinkPHP部署Workerman的成功使用示例
- 下一篇:最后一页
推荐文章
热门文章
最新评论文章
- 写给考虑创业的年轻程序员(10)
- PHP新手上路(一)(7)
- 惹恼程序员的十件事(5)
- PHP邮件发送例子,已测试成功(5)
- 致初学者:PHP比ASP优秀的七个理由(4)
- PHP会被淘汰吗?(4)
- PHP新手上路(四)(4)
- 如何去学习PHP?(2)
- 简单入门级php分页代码(2)
- php中邮箱email 电话等格式的验证(2)