Laravel中Kafka的使用详解
发布:smiling 来源: PHP粉丝网 添加日期:2022-04-20 11:48:21 浏览: 评论:0
本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类。
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php
- <?php
- namespace App\Tools;
- use Illuminate\Config\Repository;
- use Illuminate\Support\Facades\DB;
- use Monolog\Logger;
- use Monolog\Handler\StreamHandler;
- use Illuminate\Http\Request;
- class Kafka
- {
- public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka
- public $topic = 'test';//管道名称
- public $partition = 0;
- protected $producer = null;
- protected $consumer = null;
- public function __construct()
- {
- if (emptyempty($this->broker_list)) {
- throw new InvalidConfigException("broker not config");
- }
- $rk = new \RdKafka\Producer();
- if (emptyempty($rk)) {
- throw new InvalidConfigException("producer error");
- }
- $rk->setLogLevel(LOG_DEBUG);
- if (!$rk->addBrokers($this->broker_list)) {
- throw new InvalidConfigException("producer error");
- }
- $this->producer = $rk;
- }
- /**
- * 生产者
- * @param array $messages
- * @return mixed
- */
- public function send($messages = [],$topic)
- {
- $topic = $this->producer->newTopic($topic);
- return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
- }
- /**
- * 消费者
- */
- public function consumer($object, $callback){
- $conf = new \RdKafka\Conf();
- $conf->set('group.id', 0);
- $conf->set('metadata.broker.list', $this->broker_list);
- $topicConf = new \RdKafka\TopicConf();
- $topicConf->set('auto.offset.reset', 'smallest');
- $conf->setDefaultTopicConf($topicConf);
- $consumer = new \RdKafka\KafkaConsumer($conf);
- $consumer->subscribe([$this->topic]);
- echo "waiting for messages.....\n";
- while(true) {
- $message = $consumer->consume(120*1000);
- switch ($message->err) {
- case RD_KAFKA_RESP_ERR_NO_ERROR:
- echo "message payload....";
- $object->$callback($message->payload);
- break;
- }
- sleep(1);
- }
- }
- }
- ?>
在控制器中如何使用:
首先再头部导入这个类:use App\Tools\Kafka;
下面是使用生产者实例:
- public function test(){
- $topic = 'tool';//输入使用管道名称
- $data['shop_id'] = 58;
- $data['bar_code']=586;
- $data['goods_num'] = 1;
- $data['goods_unit'] = '个';
- $Kafka = new Kafka();
- $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
- var_dump($Error_Msg);
- }
下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
- <?php
- $conf = new RdKafka\Conf();
- $conf->set('group.id', 'myConsumerGroup');
- $rk = new RdKafka\Consumer($conf);
- $rk->addBrokers("localhost:9092");
- $topicConf = new RdKafka\TopicConf();
- $topicConf->set('auto.commit.interval.ms', 100);
- $topicConf->set('offset.store.method', 'file');
- $topicConf->set('offset.store.path', sys_get_temp_dir());
- $topicConf->set('auto.offset.reset', 'smallest');
- $topic = $rk->newTopic("tool", $topicConf);//读取的管道
- // Start consuming partition 0
- $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
- while (true) {
- $message = $topic->consume(0, 120*10000);
- switch ($message->err) {
- case RD_KAFKA_RESP_ERR_NO_ERROR:
- //没有错误打印信息
- $message = json_decode(json_encode($message),true);
- $data = json_decode($message['payload'],true);
- var_dump($data);
- break;
- case RD_KAFKA_RESP_ERR__PARTITION_EOF:
- echo "等待接收信息\n";
- break;
- case RD_KAFKA_RESP_ERR__TIMED_OUT:
- echo "超时\n";
- break;
- default:
- throw new \Exception($message->errstr(), $message->err);
- break;
- }
- sleep(1);
- }
- ?>
Tags: Kafka
- 上一篇:YII2 全局异常处理深入讲解
- 下一篇:最后一页
推荐文章
热门文章
最新评论文章
- 写给考虑创业的年轻程序员(10)
- PHP新手上路(一)(7)
- 惹恼程序员的十件事(5)
- PHP邮件发送例子,已测试成功(5)
- 致初学者:PHP比ASP优秀的七个理由(4)
- PHP会被淘汰吗?(4)
- PHP新手上路(四)(4)
- 如何去学习PHP?(2)
- 简单入门级php分页代码(2)
- php中邮箱email 电话等格式的验证(2)