当前位置:首页 > PHP教程 > php高级应用 > 列表

php测试kafka项目示例

发布:smiling 来源: PHP粉丝网  添加日期:2022-02-10 09:33:07 浏览: 评论:0 

本文实例讲述了php测试kafka项目,分享给大家供大家参考,具体如下:

概述

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

安装kafka-php项目依赖

composer require nmred/kafka-php

produce.php

  1. <?php 
  2. require './vendor/autoload.php'
  3. date_default_timezone_set('PRC'); 
  4. $config = \Kafka\ProducerConfig::getInstance(); 
  5. $config->setMetadataRefreshIntervalMs(10000); 
  6. $config->setMetadataBrokerList('127.0.0.1:9092'); 
  7. $config->setBrokerVersion('0.10.2.1'); 
  8. $config->setRequiredAck(1); 
  9. $config->setIsAsyn(false); 
  10. $config->setProduceInterval(500); 
  11. $producer = new \Kafka\Producer(function() { 
  12.  $t = time(); 
  13.  return array
  14.  array
  15.   'topic' => 'test'
  16.   'value' => $t
  17.   'key' => $t
  18.  ), 
  19.  ); 
  20. }); 
  21. $producer->success(function($result) { 
  22.  var_export($result); 
  23. }); 
  24. $producer->error(function($errorCode) { 
  25.  var_dump('error'$errorCode); 
  26. }); 
  27. $producer->send(); 

consumer.php

  1. <?php 
  2. require './vendor/autoload.php'
  3. date_default_timezone_set('PRC'); 
  4. $config = \Kafka\ConsumerConfig::getInstance(); 
  5. $config->setMetadataRefreshIntervalMs(10000); 
  6. $config->setMetadataBrokerList('127.0.0.1:9092'); 
  7. $config->setGroupId('test'); 
  8. $config->setBrokerVersion('0.10.2.1'); 
  9. $config->setTopics(array('test')); 
  10. $consumer = new \Kafka\Consumer(); 
  11. $consumer->start(function($topic$part$message) { 
  12.  var_dump($message); 
  13. }); 

测试生产者

php produce.php

测试消费者

php consumer.php

Tags: php测试kafka

分享到: