当前位置:首页 > PHP教程 > php高级应用 > 列表

教你使用mixphp打造多进程异步邮件发送

发布:smiling 来源: PHP粉丝网  添加日期:2022-07-26 12:40:44 浏览: 评论:0 

注意:这个是 MixPHP V1 的范例

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

异步

消息队列

多进程

守护进程

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

redis

rabbitmq

kafka

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

  1. // 入列 
  2.  
  3. $redis->lpush($key$data); 
  4.  
  5. // 出列 
  6.  
  7. $data = $redis->rpop($key); 
  8.  
  9. // 阻塞出列 
  10.  
  11. $data = $redis->brpop($key, 10); 

架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer

生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。

  1. // 连接 
  2.  
  3. $redis = new \Redis(); 
  4.  
  5. if (!$redis->connect('127.0.0.1', 6379)) { 
  6.  
  7.     throw new \Exception('Redis Connect Failure'); 
  8.  
  9.  
  10. $redis->auth(''); 
  11.  
  12. $redis->select(0); 
  13.  
  14. // 投递任务 
  15.  
  16. $data = [ 
  17.  
  18.     'to'      => ['***@qq.com' => 'A name'], 
  19.  
  20.     'body'    => 'Here is the message itself'
  21.  
  22.     'subject' => 'The title content'
  23.  
  24. ]; 
  25.  
  26. $redis->lpush('queue:email', serialize($data)); 

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

左进程:负责从消息队列取出任务数据,投放给中进程。

中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:

  1. <?php 
  2.  
  3. namespace apps\daemon\commands; 
  4.  
  5. use mix\console\ExitCode; 
  6.  
  7. use mix\facades\Input; 
  8.  
  9. use mix\facades\Redis; 
  10.  
  11. use mix\task\CenterProcess; 
  12.  
  13. use mix\task\LeftProcess; 
  14.  
  15. use mix\task\TaskExecutor; 
  16.  
  17.  
  18.  
  19. /** 
  20.  
  21.  * 推送模式范例 
  22.  
  23.  * @author 刘健 <coder.liu@qq.com> 
  24.  
  25.  */ 
  26.  
  27. class PushCommand extends BaseCommand 
  28.  
  29.  
  30.  
  31.  
  32.     // 配置信息 
  33.  
  34.     const HOST = 'smtpdm.aliyun.com'
  35.  
  36.     const PORT = 465; 
  37.  
  38.     const SECURITY = 'ssl'
  39.  
  40.     const USERNAME = '****@email.***.com'
  41.  
  42.     const PASSWORD = '****'
  43.  
  44.  
  45.  
  46.     // 初始化事件 
  47.  
  48.     public function onInitialize() 
  49.  
  50.     { 
  51.  
  52.         parent::onInitialize(); // TODO: Change the autogenerated stub 
  53.  
  54.         // 获取程序名称 
  55.  
  56.         $this->programName = Input::getCommandName(); 
  57.  
  58.         // 设置pidfile 
  59.  
  60.         $this->pidFile = "/var/run/{$this->programName}.pid"
  61.  
  62.     } 
  63.  
  64.  
  65.  
  66.     /** 
  67.  
  68.      * 获取服务 
  69.  
  70.      * @return TaskExecutor 
  71.  
  72.      */ 
  73.  
  74.     public function getTaskService() 
  75.  
  76.     { 
  77.  
  78.         return create_object( 
  79.  
  80.             [ 
  81.  
  82.                 // 类路径 
  83.  
  84.                 'class'         => 'mix\task\TaskExecutor'
  85.  
  86.                 // 服务名称 
  87.  
  88.                 'name'          => "mix-daemon: {$this->programName}"
  89.  
  90.                 // 执行类型 
  91.  
  92.                 'type'          => \mix\task\TaskExecutor::TYPE_DAEMON, 
  93.  
  94.                 // 执行模式 
  95.  
  96.                 'mode'          => \mix\task\TaskExecutor::MODE_PUSH, 
  97.  
  98.                 // 左进程数 
  99.  
  100.                 'leftProcess'   => 1, 
  101.  
  102.                 // 中进程数 
  103.  
  104.                 'centerProcess' => 5, 
  105.  
  106.                 // 任务超时时间 (秒) 
  107.  
  108.                 'timeout'       => 5, 
  109.  
  110.             ] 
  111.  
  112.         ); 
  113.  
  114.     } 
  115.  
  116.  
  117.  
  118.     // 启动 
  119.  
  120.     public function actionStart() 
  121.  
  122.     { 
  123.  
  124.         // 预处理 
  125.  
  126.         if (!parent::actionStart()) { 
  127.  
  128.             return ExitCode::UNSPECIFIED_ERROR; 
  129.  
  130.         } 
  131.  
  132.         // 启动服务 
  133.  
  134.         $service = $this->getTaskService(); 
  135.  
  136.         $service->on('LeftStart', [$this'onLeftStart']); 
  137.  
  138.         $service->on('CenterStart', [$this'onCenterStart']); 
  139.  
  140.         $service->start(); 
  141.  
  142.         // 返回退出码 
  143.  
  144.         return ExitCode::OK; 
  145.  
  146.     } 
  147.  
  148.  
  149.  
  150.     // 左进程启动事件回调函数 
  151.  
  152.     public function onLeftStart(LeftProcess $worker
  153.  
  154.     { 
  155.  
  156.         try { 
  157.  
  158.             // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线 
  159.  
  160.             $queueModel = Redis::getInstance(); 
  161.  
  162.             // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出 
  163.  
  164.             for ($j = 0; $j < 16000; $j++) { 
  165.  
  166.                 // 从消息队列中间件阻塞获取一条消息 
  167.  
  168.                 $data = $queueModel->brpop('queue:email', 10); 
  169.  
  170.                 if (emptyempty($data)) { 
  171.  
  172.                     continue
  173.  
  174.                 } 
  175.  
  176.                 list(, $data) = $data
  177.  
  178.                 // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html) 
  179.  
  180.                 $worker->push($data, false); 
  181.  
  182.             } 
  183.  
  184.         } catch (\Exception $e) { 
  185.  
  186.             // 休息一会,避免 CPU 出现 100% 
  187.  
  188.             sleep(1); 
  189.  
  190.             // 抛出错误 
  191.  
  192.             throw $e
  193.  
  194.         } 
  195.  
  196.     } 
  197.  
  198.  
  199.  
  200.     // 中进程启动事件回调函数 
  201.  
  202.     public function onCenterStart(CenterProcess $worker
  203.  
  204.     { 
  205.  
  206.         // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出 
  207.  
  208.         for ($j = 0; $j < 16000; $j++) { 
  209.  
  210.             // 从进程消息队列中抢占一条消息 
  211.  
  212.             $data = $worker->pop(); 
  213.  
  214.             if (emptyempty($data)) { 
  215.  
  216.                 continue
  217.  
  218.             } 
  219.  
  220.             // 处理消息 
  221.  
  222.             try { 
  223.  
  224.                 // 处理消息,比如:发送短信、发送邮件、微信推送 
  225.  
  226.                 var_dump($data); 
  227.  
  228.                 $ret = self::sendEmail($data); 
  229.  
  230.                 var_dump($ret); 
  231.  
  232.             } catch (\Exception $e) { 
  233.  
  234.                 // 回退数据到消息队列 
  235.  
  236.                 $worker->rollback($data); 
  237.  
  238.                 // 休息一会,避免 CPU 出现 100% 
  239.  
  240.                 sleep(1); 
  241.  
  242.                 // 抛出错误 
  243.  
  244.                 throw $e
  245.  
  246.             } 
  247.  
  248.         } 
  249.  
  250.     } 
  251.  
  252.  
  253.  
  254.     // 发送邮件 
  255.  
  256.     public static function sendEmail($data
  257.  
  258.     { 
  259.  
  260.         // Create the Transport 
  261.  
  262.         $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) 
  263.  
  264.             ->setUsername(self::USERNAME) 
  265.  
  266.             ->setPassword(self::PASSWORD); 
  267.  
  268.         // Create the Mailer using your created Transport 
  269.  
  270.         $mailer = new \Swift_Mailer($transport); 
  271.  
  272.         // Create a message 
  273.  
  274.         $message = (new \Swift_Message($data['subject'])) 
  275.  
  276.             ->setFrom([self::USERNAME => '**网']) 
  277.  
  278.             ->setTo($data['to']) 
  279.  
  280.             ->setBody($data['body']); 
  281.  
  282.         // Send the message 
  283.  
  284.         $result = $mailer->send($message); 
  285.  
  286.         return $result
  287.  
  288.     } 
  289.  
  290.  
  291.  

测试

1.在 shell 中启动 push 常驻程序。

[root@localhost bin]# ./mix-daemon push start

mix-daemon 'push' start successed.

1.调用接口往消息队列投放任务。

此时 shell 终端将打印:

教你使用mixphp打造多进程异步邮件发送

成功收到测试邮件:

教你使用mixphp打造多进程异步邮件发送

MixPHP

GitHub: https://github.com/mix-php/mix

官网:http://www.mixphp.cn/

Tags: mixphp多进程 php异步邮件发送

分享到: