当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP队列场景以及实现代码实例详解

发布:smiling 来源: PHP粉丝网  添加日期:2022-04-13 15:51:44 浏览: 评论:0 

为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作。在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况,即事务的ACID特性需要在各个不同的数据库实例中保证。比如更新db1库的A表时,必须同步更新db2库的B表,两个更新形成一个事务,要么都成功,要么都失败。

那么我们如何利用mysql实现分布式数据库的事务呢?

mysql是从5.0开始支持分布式事务

这里先声明两个概念:

资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径。数据库就是一种资源管理器。资源管理还应该具有管理事务提交或回滚的能力。

事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器(resource

manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识。

mysql在执行分布式事务(外部XA)的时候,mysql服务器相当于xa事务资源管理器,与mysql链接的客户端相当于事务管理器。

分布式事务原理:分段式提交

分布式事务通常采用2PC协议,全称Two Phase Commitment Protocol。该协议主要为了解决在分布式数据库场景下,所有节点间数据一致性的问题。分布式事务通过2PC协议将提交分成两个阶段:

prepare;commit/rollback

阶段一为准备(prepare)阶段。即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向transaction manager报告已准备就绪。

阶段二为提交阶段(commit)。当transaction manager确认所有参与者都ready后,向所有参与者发送commit命令。

事务协调者transaction manager

因为XA 事务是基于两阶段提交协议的,所以需要有一个事务协调者(transaction manager)来保证所有的事务参与者都完成了准备工作(第一阶段)。如果事务协调者(transaction manager)收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了(第二阶段)。MySQL 在这个XA事务中扮演的是参与者的角色,而不是事务协调者(transaction manager)。

Mysql的XA事务分为外部XA和内部XA

外部XA用于跨多MySQL实例的分布式事务,需要应用层作为协调者,通俗的说就是比如我们在PHP中写代码,那么PHP书写的逻辑就是协调者。应用层负责决定提交还是回滚,崩溃时的悬挂事务。MySQL数据库外部XA可以用在分布式数据库代理层,实现对MySQL数据库的分布式事务支持,例如开源的代理工具:网易的DDB,淘宝的TDDL等等。

内部XA事务用于同一实例下跨多引擎事务,由Binlog作为协调者,比如在一个存储引擎提交时,需要将提交信息写入二进制日志,这就是一个分布式内部XA事务,只不过二进制日志的参与者是MySQL本身。Binlog作为内部XA的协调者,在binlog中出现的内部xid,在crash recover时,由binlog负责提交。(这是因为,binlog不进行prepare,只进行commit,因此在binlog中出现的内部xid,一定能够保证其在底层各存储引擎中已经完成prepare)。

mysql xa事务的语法

1、首先要确保mysql开启XA事务支持

SHOW VARIABLES LIKE '%xa%'

如果innodb_support_xa的值是ON就说明mysql已经开启对XA事务的支持了。 如果不是就执行:

SET innodb_support_xa = ON

主要有:

XA START 'any_unique_id'; // 'any_unique_id' 是用户给的,全局唯一在一台mysql中开启一个XA事务

XA END 'any_unique_id '; //标识XA事务的操作结束

XA PREPARE 'any_unique_id'; //告知mysql 准备提交这个xa事务

XA COMMIT 'any_unique_id'; //告知mysql提交这个xa事务

XA ROLLBACK 'any_unique_id'; //告知mysql回滚这个xa事务

XA RECOVER;//查看本机mysql目前有哪些xa事务处于prepare状态

XA事务恢复

如果执行分布式事务的mysql crash了,mysql 按照如下逻辑进行恢复:

a. 如果这个xa事务commit了,那么什么也不用做

b. 如果这个xa事务还没有prepare,那么直接回滚它

c. 如果这个xa事务prepare了,还没commit, 那么把它恢复到prepare的状态,由用户去决定commit或rollback

当mysql crash后重新启动之后,执行“XA RECOVER;”查看当前处于prepare状态的xa事务,然后commit或rollback它们。

使用限制

a. XA事务和本地事务以及锁表操作是互斥的

开启了xa事务就无法使用本地事务和锁表操作

  1. mysql> xa start 't1xa'
  2. Query OK, 0 rows affected (0.04 sec) 
  3. mysql> begin; 
  4. ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state 
  5. mysql> lock table t1 read; 
  6. ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state 

开启了本地事务就无法使用xa事务

  1. mysql> begin; 
  2. Query OK, 0 rows affected (0.00 sec) 
  3. mysql> xa start 'rrrr'
  4. ERROR 1400 (XAE09): XAER_OUTSIDE: Some work is done outside global transaction 

b. xa start 之后必须xa end, 否则不能执行xa commit 和xa rollback

所以如果在执行xa事务过程中有语句出错了,你也需要先xa end一下,然后才能xarollback。

注意事项

a. mysql只是提供了xa事务的接口,分布式事务中的mysql实例之间是互相独立的不感知的。 所以用户必须自己实现分布式事务的调度器

b. xa事务有一些使用上的bug, 参考http://www.mysqlops.com/2012/02/24/mysql-xa-optimize.html

主要是

“MySQL数据库的主备数据库的同步,通过Binlog的复制完成。而Binlog是MySQL数据库内部XA事务的协调者,并且MySQL数据库为binlog做了优化——binlog不写prepare日志,只写commit日志。

所有的参与节点prepare完成,在进行xa commit前crash。crash recover如果选择commit此事务。由于binlog在prepare阶段未写,因此主库中看来,此分布式事务最终提交了,但是此事务的操作并未 写到binlog中,因此也就未能成功复制到备库,从而导致主备库数据不一致的情况出现。

而crash recover如果选rollback, 那么就会出现全局不一致(该分布式事务对应的节点,部分已经提交,无法回滚,而部分节点回滚。最终导致同一分布式事务,在各参与节点,最终状态不一致)”

参考的那篇blog中给出的办法是修改mysql代码,这个无法在DBScale中使用。 所以可选的替代方案是不使用

主从复制进行备份,而是直接使用xa事务实现同步写来作为备份。

php+mysql实现分布式事务案例

保证数据表是innodb的

  1. //db_finance库下 
  2. CREATE TABLE `t_user_account` ( 
  3.  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id'
  4.  `username` varchar(255) NOT NULL DEFAULT '' COMMENT '用户名'
  5.  `money` int(11) NOT NULL DEFAULT '0' COMMENT '账户金额'
  6.  PRIMARY KEY (`id`) 
  7. ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;<br type="_moz" data-filtered="filtered"
  8.  
  9. //db_order库下 
  10. CREATE TABLE `t_user_orders` ( 
  11.  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键'
  12.  `username` varchar(255) NOT NULL DEFAULT ''
  13.  `money` int(11) NOT NULL DEFAULT '0' COMMENT '订单扣款金额'
  14.  PRIMARY KEY (`id`) 
  15. ) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8; 

php代码

  1. $username = '公众号PHP开源社区'
  2. $order_money = 100; 
  3.  
  4. $addOrder_success = addOrder($username,$order_money); 
  5. $upAccount_success = updateAccount($username,$order_money); 
  6.  
  7.  
  8. if($addOrder_success['state'] =="yes" && $upAccount_success['state']=="yes"){ 
  9.   commitdb($addOrder_success['xa']); 
  10.   commitdb1($upAccount_success['xa']); 
  11. }else
  12.   rollbackdb($addOrder_success['xa']); 
  13.   rollbackdb1($upAccount_success['xa']); 
  14. die
  15. function addOrder ($username$order_money){ 
  16.  
  17.   $xa = uniqid(""); 
  18.  
  19.   $sql_xa = "XA START '$xa'"
  20.   $db = Yii::app()->dborder_readonly; 
  21.   $db->createCommand($sql_xa)->execute(); 
  22.  
  23.   $insert_sql = "INSERT INTO t_user_orders (`username`,`money`) VALUES ($username,$order_money)"
  24.   $id = $db->createCommand($insert_sql)->execute(); 
  25.  
  26.   $db->createCommand("XA END '$xa'")->execute(); 
  27.   if ($id) { 
  28.  
  29.     $db->createCommand("XA PREPARE '$xa'")->execute(); 
  30.     return ['state' => 'yes''xa' => $xa]; 
  31.   }else { 
  32.     return ['state' => 'no''xa' => $xa]; 
  33.   } 
  34.  
  35.  
  36. function updateAccount($username$order_money){ 
  37.   $xa = uniqid(""); 
  38.   $sql_xa = "XA START '$xa'"
  39.   $db = Yii::app()->db_finance; 
  40.   $db->createCommand($sql_xa)->execute(); 
  41.  
  42.   $sql = "update t_user_account set money=money-".$order_money." where username='$username'"
  43.  
  44.  
  45.   $id = $db->createCommand($sql)->execute(); 
  46.  
  47.   $db->createCommand("XA END '$xa'")->execute(); 
  48.   if ($id) { 
  49.     $db->createCommand("XA PREPARE '$xa'")->execute(); 
  50.     return ['state' => 'yes''xa' => $xa]; 
  51.   }else { 
  52.     return ['state' => 'no''xa' => $xa]; 
  53.   } 
  54.  
  55.  
  56.  
  57. //提交事务! 
  58. function commitdb($xa){ 
  59.  
  60.   $db = Yii::app()->dborder_readonly; 
  61.   return $db->createCommand("XA COMMIT '$xa'")->execute(); 
  62.  
  63. //回滚事务 
  64. function rollbackdb($xa){ 
  65.  
  66.   $db = Yii::app()->dborder_readonly; 
  67.   return $db->createCommand("XA COMMIT '$xa'")->execute(); 
  68.  
  69. //提交事务! 
  70. function commitdb1($xa){ 
  71.  
  72.   $db = Yii::app()->db_finance; 
  73.   return $db->createCommand("XA COMMIT '$xa'")->execute(); 
  74.  
  75. //回滚事务 
  76. function rollbackdb1($xa){ 
  77.   $db = Yii::app()->db_finance; 
  78.   return $db->createCommand("XA ROLLBACK '$xa'")->execute();

Tags: PHP队列场景

分享到: