PHP使用enqueue/amqp-lib实现rabbitmq任务处理
发布:smiling 来源: PHP粉丝网 添加日期:2024-04-28 11:24:09 浏览: 评论:0
这篇文章主要为大家详细介绍了PHP如何使用enqueue/amqp-lib实现rabbitmq任务处理,文中的示例代码讲解详细,感兴趣的小伙伴可以学习一下。
一:拓展安装
composer require enqueue/amqp-lib
文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md
二:方法介绍
1:连接rabbitmq
- $factory = new AmqpConnectionFactory([
- 'host' => '192.168.6.88',//host
- 'port' => '5672',//端口
- 'vhost' => '/',//虚拟主机
- 'user' => 'admin',//账号
- 'pass' => 'admin',//密码
- ]);
- $context = $factory->createContext();
2:声明主题
- //声明并创建主题
- $exchangeName = 'exchange';
- $fooTopic = $context->createTopic($exchangeName);
- $fooTopic->setType(AmqpTopic::TYPE_FANOUT);
- $context->declareTopic($fooTopic);
- //删除主题
- $context->deleteTopic($fooTopic);
3:声明队列
- //声明并创建队列
- $queueName = 'rabbitmq';
- $fooQueue = $context->createQueue($queueName);
- $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
- $context->declareQueue($fooQueue);
- //删除队列
- $context->deleteQueue($fooQueue);
4:将队列绑定到主题
$context->bind(new AmqpBind($fooTopic, $fooQueue));
5:发送消息
- //向队列发送消息
- $message = $context->createMessage('Hello world!');
- $context->createProducer()->send($fooQueue, $message);
- //向队列发送优先消息
- $queueName = 'rabbitmq';
- $fooQueue = $context->createQueue(queueName);
- $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
- //设置队列的最大优先级
- $fooQueue->setArguments(['x-max-priority' => 10]);
- $context->declareQueue($fooQueue);
- $message = $context->createMessage('Hello world!');
- $context->createProducer()
- ->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者
- ->send($fooQueue, $message);
- //向队列发送延时消息
- $message = $context->createMessage('Hello world!');
- $context->createProducer()
- ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
- ->setDeliveryDelay(5000) //消息延时5秒
- ->send($fooQueue, $message);
6:消费消息【接收消息】
- //消费消息
- $consumer = $context->createConsumer($fooQueue);
- $message = $consumer->receive();
- // process a message
- //业务代码
- $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
- // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
- //订阅消费者
- $fooConsumer = $context->createConsumer($fooQueue);
- $subscriptionConsumer = $context->createSubscriptionConsumer();
- $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
- // process message
- //业务代码
- $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
- // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
- return true;
- });
- $subscriptionConsumer->consume();
- //清除队列消息
- $queueName = 'rabbitmq';
- $queue = $context->createQueue($queueName);
- $context->purgeQueue($queue);
三:简单实现
1:发送消息
- //连接rabbitmq
- $factory = new AmqpConnectionFactory([
- 'host' => '192.168.6.88',
- 'port' => '5672',
- 'vhost' => '/',
- 'user' => 'admin',
- 'pass' => 'admin',
- 'persisted' => false,
- ]);
- $context = $factory->createContext();
- //声明主题
- $exchangeName = 'exchange';
- $fooTopic = $context->createTopic($exchangeName);
- $fooTopic->setType(AmqpTopic::TYPE_FANOUT);
- $context->declareTopic($fooTopic);
- //声明队列
- $queueName = 'rabbitmq';
- $fooQueue = $context->createQueue($queueName);
- $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
- $context->declareQueue($fooQueue);
- //将队列绑定到主题
- $context->bind(new AmqpBind($fooTopic, $fooQueue));
- //发送消息到队列
- $message = $context->createMessage('Hello world!');
- $context->createProducer()->send($fooQueue, $message);
2:消费消息
- $factory = new AmqpConnectionFactory([
- 'host' => '192.168.6.88',
- 'port' => '5672',
- 'vhost' => '/',
- 'user' => 'admin',
- 'pass' => 'admin',
- 'persisted' => false,
- ]);
- $context = $factory->createContext();
- $queueName = 'rabbitmq';
- $fooQueue = $context->createQueue($queueName);
- $fooConsumer = $context->createConsumer($fooQueue);
- $subscriptionConsumer = $context->createSubscriptionConsumer();
- $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
- // process message
- //业务代码
- $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
- // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
- return true;
- });
- $subscriptionConsumer->consume();
Tags: enqueue amqp-lib rabbitmq
- 上一篇:基于PHP实现端口批量查询工具
- 下一篇:最后一页
相关文章
- ·PHP和RabbitMQ实现消息队列的完整代码(2020-02-04)
- ·什么是RabbitMQ?RabbitMQ的简单介绍(2020-02-08)
- ·PHP基于rabbitmq操作类的生产者和消费者功能示例(2021-10-01)
- ·PHP+RabbitMQ实现消息队列的完整代码(2021-11-13)
- ·如何用RabbitMQ和Swoole实现一个异步任务系统(2022-05-01)
- ·如何基于Hyperf实现RabbitMQ+WebSocket消息推送(2022-06-12)
- ·基于 Hyperf + RabbitMQ + WebSocket 实现消息推送(2022-06-19)
- ·以PHP代码为实例详解RabbitMQ消息队列中间件的6种模式(2023-07-12)
推荐文章
热门文章
最新评论文章
- 写给考虑创业的年轻程序员(10)
- PHP新手上路(一)(7)
- 惹恼程序员的十件事(5)
- PHP邮件发送例子,已测试成功(5)
- 致初学者:PHP比ASP优秀的七个理由(4)
- PHP会被淘汰吗?(4)
- PHP新手上路(四)(4)
- 如何去学习PHP?(2)
- 简单入门级php分页代码(2)
- php中邮箱email 电话等格式的验证(2)