当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP 消息队列 Kafka 使用

发布:smiling 来源: PHP粉丝网  添加日期:2022-06-14 08:40:57 浏览: 评论:0 

安装 Kafka 服务

直接到 kafka 官网 , 下载最新的

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

解压,进入目录

tar -zxvf kafka_2.13-2.5.0.tgz

cd kafka_2.13-2.5.0

启动 Kafka 服务

使用安装包中的脚本启动单节点 Zookeeper 实例

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

使用 kafka-server-start.sh 启动 kafka 服务

bin/kafka-server-start.sh config/server.properties

创建 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看 topic 列表,检查是否创建成功

bin/kafka-topics.sh --list --zookeeper localhost:2181

$ test

生产者,发送消息

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

服务方面到这里就差不多了,接下来就是 php 的事了。

安装 PHP 扩展

rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka

  1. git clone https://github.com/edenhill/librdkafka.git 
  2.  
  3. cd librdkafka 
  4.  
  5. ./configure 
  6.  
  7. make && make install 

安装 php-rdkafka 扩展

  1. git clone https://github.com/arnaud-lb/php-rdkafka.git 
  2.  
  3. cd php-rdkafka 
  4.  
  5. phpize 
  6.  
  7. ./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## 这里根据自己的情况填写路径 
  8.  
  9. make && make install 

在 php-ini 加上

extension=rdkafka.so

重启,php-fpm,就应该可以看到该扩展。

使用Kafka

创建一个生产者类

  1. <?php 
  2.  
  3. class KafkaProducer 
  4.  
  5.  
  6.     public static $brokerList = '127.0.0.1:9092'
  7.  
  8.     public static function send($message$topic
  9.  
  10.     { 
  11.  
  12.         self::producer($message$topic); 
  13.  
  14.     } 
  15.  
  16.     public static function producer($message$topic = 'test'
  17.  
  18.     { 
  19.  
  20.         $conf = new \RdKafka\Conf(); 
  21.  
  22.         $conf->set('metadata.broker.list', self::$brokerList); 
  23.  
  24.         $producer = new \RdKafka\Producer($conf); 
  25.  
  26.         $topic = $producer->newTopic($topic); 
  27.  
  28.         $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message)); 
  29.  
  30.         $producer->poll(0); 
  31.  
  32.         $result = $producer->flush(10000); 
  33.  
  34.         if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { 
  35.  
  36.             throw new \RuntimeException('Was unable to flush, messages might be lost!'); 
  37.  
  38.         } 
  39.  
  40.     } 
  41.  

创建一个消费类

  1. <?php 
  2.  
  3. class KafkaConsumer 
  4.  
  5.  
  6.     public static $brokerList = '127.0.0.1:9092'
  7.  
  8.       public static function consumer() 
  9.  
  10.     { 
  11.  
  12.         $conf = new \RdKafka\Conf(); 
  13.  
  14.         $conf->set('group.id''test'); 
  15.  
  16.         $rk = new \RdKafka\Consumer($conf); 
  17.  
  18.         $rk->addBrokers("127.0.0.1"); 
  19.  
  20.         $topicConf = new \RdKafka\TopicConf(); 
  21.  
  22.         $topicConf->set('auto.commit.interval.ms', 100); 
  23.  
  24.         $topicConf->set('offset.store.method''broker'); 
  25.  
  26.         $topicConf->set('auto.offset.reset''smallest'); 
  27.  
  28.         $topic = $rk->newTopic('test'$topicConf); 
  29.  
  30.         $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); 
  31.  
  32.         while (true) { 
  33.  
  34.             $message = $topic->consume(0, 120*10000); 
  35.  
  36.             switch ($message->err) { 
  37.  
  38.                 case RD_KAFKA_RESP_ERR_NO_ERROR: 
  39.  
  40.                     var_dump($message); 
  41.  
  42.                     break
  43.  
  44.                 case RD_KAFKA_RESP_ERR__PARTITION_EOF: 
  45.  
  46.                     echo "No more messages; will wait for more\n"
  47.  
  48.                     break
  49.  
  50.                 case RD_KAFKA_RESP_ERR__TIMED_OUT: 
  51.  
  52.                     echo "Timed out\n"
  53.  
  54.                     break
  55.  
  56.                 default
  57.  
  58.                     throw new \Exception($message->errstr(), $message->err); 
  59.  
  60.                     break
  61.  
  62.             } 
  63.  
  64.         } 
  65.  
  66.     } 
  67.  

问题汇总

1、 No Java runtime present, requesting install

因为 kafka 需要 java 环境支持,所以安装 java 环境。可以到 javase-jdk14-downloads 选择自己的版本进行下载安装。

2、创建 topic 出现:Replication factor: 1 larger than available brokers: 0

意思是至少有一个 brokers. 也就是说并没有有效的 brokers 可以用。你要确保你的 kafka 已经启动了。

Tags: PHP消息队列 Kafka

分享到: