Differences Between: [Versions 311 and 402] [Versions 400 and 402]
<?php
/**
 * ADOdb Load Balancer
 *
 * ADOdbLoadBalancer is a class that allows the user to do read/write splitting
 * and load balancing across multiple servers. It can handle and load balance
 * any number of write capable (AKA: master) or readonly (AKA: slave) connections,
 * including dealing with connection failures and retrying queries on a different
 * connection instead.
 *
 * This file is part of ADOdb, a Database Abstraction Layer library for PHP.
 *
 * @package ADOdb
 * @link https://adodb.org Project's web site and documentation
 * @link https://github.com/ADOdb/ADOdb Source code and issue tracker
 *
 * The ADOdb Library is dual-licensed, released under both the BSD 3-Clause
 * and the GNU Lesser General Public Licence (LGPL) v2.1 or, at your option,
 * any later version. This means you can use it in proprietary products.
 * See the LICENSE.md file distributed with this source code for details.
 * @license BSD-3-Clause
 * @license LGPL-2.1-or-later
 *
 * @copyright 2016 Mike Benoit and the ADOdb community
 */

/**
 * Class ADOdbLoadBalancer
 *
 * Keeps a pool of ADOdbLoadBalancerConnection objects and hands out the
 * underlying ADOdb object for each call, choosing between write capable and
 * readonly connections.
 */
class ADOdbLoadBalancer
{
    /**
     * @var bool Once a write or readonly connection is made, stick to that connection for the entire request.
     */
    public $enable_sticky_sessions = true;


    /**
     * @var bool|array All connections to each database, keyed by connection id. False until the first one is added.
     */
    protected $connections = false;

    /**
     * @var bool|array Connection ids of the write capable connections.
     */
    protected $connections_write = false;

    /**
     * @var bool|array Connection ids of the readonly connections.
     */
    protected $connections_readonly = false;

    /**
     * @var array Counts of all connections and their types.
     */
    protected $total_connections = array('all' => 0, 'write' => 0, 'readonly' => 0);

    /**
     * @var array Weights of all connections for each type.
     */
    protected $total_connection_weights = array('all' => 0, 'write' => 0, 'readonly' => 0);

    /**
     * @var bool|int|string When in transactions, always use this connection id. False when nothing is pinned.
     */
    protected $pinned_connection_id = false;

    /**
     * @var array Last connection_id for each database type.
     */
    protected $last_connection_id = array('write' => false, 'readonly' => false, 'all' => false);

    /**
     * @var bool|array Session variables that must be maintained across all connections, ie: SET TIME ZONE.
     */
    protected $session_variables = false;

    /**
     * @var bool|array SQL statements executed immediately after connecting to any DB.
     */
    protected $user_defined_session_init_sql = false;


    /**
     * Defines SQL queries that are executed each time a new database connection is established.
     *
     * @param string $sql SQL statement to queue.
     * @return bool Always true.
     */
    public function setSessionInitSQL($sql)
    {
        // The property starts out as false; appending to false is deprecated since PHP 8.1.
        if (!is_array($this->user_defined_session_init_sql)) {
            $this->user_defined_session_init_sql = array();
        }

        $this->user_defined_session_init_sql[] = $sql;

        return true;
    }

    /**
     * Adds a new database connection to the pool, but no actual connection is made until its needed.
     *
     * @param ADOdbLoadBalancerConnection $obj
     * @return bool Always true.
     * @throws Exception When $obj is not an ADOdbLoadBalancerConnection.
     */
    public function addConnection($obj)
    {
        if (!($obj instanceof ADOdbLoadBalancerConnection)) {
            throw new Exception('Connection object is not an instance of ADOdbLoadBalancerConnection');
        }

        // The pool properties start out as false; appending to false is deprecated since PHP 8.1.
        if (!is_array($this->connections)) {
            $this->connections = array();
        }

        $this->connections[] = $obj;
        end($this->connections);
        $i = key($this->connections); // Id that was just assigned to $obj.

        $this->total_connections[$obj->type]++;
        $this->total_connections['all']++;

        $this->total_connection_weights[$obj->type] += abs($obj->weight);
        $this->total_connection_weights['all'] += abs($obj->weight);

        if ($obj->type == 'write') {
            if (!is_array($this->connections_write)) {
                $this->connections_write = array();
            }
            $this->connections_write[] = $i;
        } else {
            if (!is_array($this->connections_readonly)) {
                $this->connections_readonly = array();
            }
            $this->connections_readonly[] = $i;
        }

        return true;
    }

    /**
     * Removes a database connection from the pool.
     *
     * Every sticky or pinned reference to the removed connection is cleared as
     * well, so later lookups cannot keep returning an id that no longer exists.
     *
     * @param int|string $i Connection id.
     * @return bool True when the connection was removed, false when the id is unknown.
     */
    public function removeConnection($i)
    {
        if (!isset($this->connections[$i])) {
            return false;
        }

        $obj = $this->connections[$i];

        $this->total_connections[$obj->type]--;
        $this->total_connections['all']--;

        $this->total_connection_weights[$obj->type] -= abs($obj->weight);
        $this->total_connection_weights['all'] -= abs($obj->weight);

        if ($obj->type == 'write') {
            $this->connections_write = $this->removeIdFromList($this->connections_write, $i);
        } else {
            $this->connections_readonly = $this->removeIdFromList($this->connections_readonly, $i);
        }

        // Remove any sticky connections as well. A readonly request may be
        // stuck on a write capable connection, so every type has to be checked,
        // not just the type of the connection being removed.
        foreach ($this->last_connection_id as $sticky_type => $sticky_id) {
            if ($sticky_id !== false && $sticky_id == $i) {
                $this->last_connection_id[$sticky_type] = false;
            }
        }

        // A pinned connection that no longer exists can never be returned again.
        if ($this->pinned_connection_id !== false && $this->pinned_connection_id == $i) {
            $this->pinned_connection_id = false;
        }

        unset($this->connections[$i]);

        return true;
    }

    /**
     * Removes a connection id from a list of ids and reindexes the list.
     *
     * @param array|bool $list Id list, or false when the list was never initialised.
     * @param int|string $connection_id Id to remove.
     * @return array|bool The reindexed list, or $list untouched when it is not an array.
     */
    private function removeIdFromList($list, $connection_id)
    {
        if (!is_array($list)) {
            return $list;
        }

        $position = array_search($connection_id, $list);
        // Only unset on a real hit: unset($list[false]) would drop key 0.
        if ($position !== false) {
            unset($list[$position]);
        }

        return array_values($list);
    }

    /**
     * Returns a database connection of the specified type.
     *
     * Takes into account the connection weight for load balancing. Readonly
     * requests may be served by any connection, every other type only by write
     * capable connections. Connections with a weight of zero or less are only
     * used when no positively weighted connection of a suitable type exists.
     *
     * @param string $type Type of database connection, either: 'write' capable or 'readonly'
     * @return bool|int|string Connection id, or false when no suitable connection exists.
     */
    public function getConnectionByWeight($type)
    {
        if (!is_array($this->connections)) {
            return false;
        }

        $weighted = array();   // connection id => weight, positively weighted candidates only.
        $fallback_id = false;  // Last connection of a suitable type, regardless of weight.

        foreach ($this->connections as $id => $connection_obj) {
            if ($type != 'readonly' && $connection_obj->type != 'write') {
                continue;
            }

            $fallback_id = $id;
            if ($connection_obj->weight > 0) {
                $weighted[$id] = $connection_obj->weight;
            }
        }

        if (count($weighted) == 0) {
            return $fallback_id;
        }

        // Draw from 1..total so each connection is hit proportionally to its
        // weight; starting at 0 would favour the first connection.
        $num = mt_rand(1, max(1, (int)array_sum($weighted)));

        $n = 0;
        $selected_id = false;
        foreach ($weighted as $id => $weight) {
            $selected_id = $id;
            $n += $weight;
            if ($n >= $num) {
                break;
            }
        }

        return $selected_id;
    }

    /**
     * Returns the proper database connection when taking into account sticky sessions and load balancing.
     *
     * @param string $type Either 'write' or 'readonly'.
     * @return bool|int|mixed|string Connection id, or false when none is available.
     */
    public function getLoadBalancedConnection($type)
    {
        if ($this->enable_sticky_sessions == true
            && isset($this->last_connection_id[$type])
            && $this->last_connection_id[$type] !== false
        ) {
            return $this->last_connection_id[$type];
        }

        // With a single write connection there is nothing to balance.
        if ($type == 'write' && $this->total_connections['write'] == 1) {
            return $this->connections_write[0];
        }

        return $this->getConnectionByWeight($type);
    }

    /**
     * Returns the ADODB connection object by connection_id.
     *
     * Ensures that it's connected and the session variables are executed.
     *
     * @param int|string $connection_id
     * @return bool|ADOConnection False when the connection attempt reports failure
     *                            or the connection test callback rejects the connection.
     * @throws Exception When the id is unknown, or when the ADOdb object throws while connecting.
     */
    public function getConnectionById($connection_id)
    {
        if (!isset($this->connections[$connection_id])) {
            throw new Exception('Unable to return Connection object...');
        }

        $connection_obj = $this->connections[$connection_id];
        /** @var ADOConnection $adodb_obj */
        $adodb_obj = $connection_obj->getADOdbObject();

        if (is_object($adodb_obj) && $adodb_obj->_connectionID == false) {
            // Not connected yet. Exceptions thrown here propagate to the caller,
            // which decides whether other connections are left to try.
            if ($connection_obj->persistent_connection == true) {
                $connected = $adodb_obj->PConnect(
                    $connection_obj->host,
                    $connection_obj->user,
                    $connection_obj->password,
                    $connection_obj->database
                );
            } else {
                $connected = $adodb_obj->Connect(
                    $connection_obj->host,
                    $connection_obj->user,
                    $connection_obj->password,
                    $connection_obj->database
                );
            }

            // A failed attempt that did not throw must not be handed out as usable.
            if ($connected === false) {
                return false;
            }

            // Check to see if a connection test callback was defined, and if so execute it.
            // This is useful for testing replication lag and such to ensure the connection is suitable to be used.
            $test_connection_callback = $connection_obj->getConnectionTestCallback();
            if (is_callable($test_connection_callback)
                && $test_connection_callback($connection_obj, $adodb_obj) !== true
            ) {
                return false;
            }

            if (is_array($this->user_defined_session_init_sql)) {
                foreach ($this->user_defined_session_init_sql as $session_init_sql) {
                    $adodb_obj->Execute($session_init_sql);
                }
            }
            $this->executeSessionVariables($adodb_obj);
        }

        return $adodb_obj;
    }

    /**
     * Returns the ADODB connection object by database type.
     *
     * Ensures that it's connected and the session variables are executed.
     * Connections that fail are removed from the pool and another one is tried
     * until none of a suitable type is left.
     *
     * @param string $type Either 'write' or 'readonly'.
     * @param null|bool $pin_connection True pins the returned connection, false unpins
     *                                  (only when at most one transaction level deep), null leaves pinning alone.
     * @return ADOConnection|bool
     * @throws Exception When no usable connection is available.
     */
    public function getConnection($type = 'write', $pin_connection = null)
    {
        while (($type == 'write' && $this->total_connections['write'] > 0)
            || ($type == 'readonly' && $this->total_connections['all'] > 0)
        ) {
            if ($this->pinned_connection_id !== false) {
                $connection_id = $this->pinned_connection_id;
            } else {
                $connection_id = $this->getLoadBalancedConnection($type);
            }

            if ($connection_id === false) {
                throw new Exception('Connection ID is invalid!');
            }

            try {
                $adodb_obj = $this->getConnectionById($connection_id);
                if (is_object($adodb_obj)) {
                    break; //Found valid connection, continue with it.
                }

                throw new Exception('ADODB Connection Object does not exist. Perhaps LoadBalancer Database Connection Test Failed?');
            } catch (Exception $e) {
                // Connection error, see if there are other connections to try still.
                $removed = $this->removeConnection($connection_id);

                // If nothing could be removed the pool did not shrink, and
                // retrying would loop forever on the same id.
                if ($removed === false
                    || ($type == 'write' && $this->total_connections['write'] == 0)
                    || ($type == 'readonly' && $this->total_connections['all'] == 0)
                ) {
                    throw $e;
                }
            }
        }

        if (!isset($connection_id)) {
            throw new Exception('No connection available to use at this time! Type: ' . $type);
        }

        $this->last_connection_id[$type] = $connection_id;

        if ($pin_connection === true) {
            $this->pinned_connection_id = $connection_id;
        } elseif ($pin_connection === false && $adodb_obj->transOff <= 1) {
            // UnPin connection only if we are 1 level deep in a transaction.
            $this->pinned_connection_id = false;

            // When unpinning connection, reset last_connection_id so readonly
            // queries don't get stuck on the write capable connection.
            $this->last_connection_id['write'] = false;
            $this->last_connection_id['readonly'] = false;
        }

        return $adodb_obj;
    }

    /**
     * This is a hack to work around pass by reference error.
     *
     * Parameter 1 to ADOConnection::GetInsertSQL() expected to be a reference,
     * value given in adodb-loadbalancer.inc.php on line 83
     *
     * @param array $arr
     * @return array Same keys as $arr, every value being a reference.
     */
    private function makeValuesReferenced($arr)
    {
        $refs = array();

        foreach ($arr as $key => $value) {
            $refs[$key] = &$arr[$key];
        }

        return $refs;
    }

    /**
     * Allow setting session variables that are maintained across connections.
     *
     * Its important that these are set using name/value, so it can determine
     * if the same variable is set multiple times causing bloat/clutter when
     * new connections are established. For example if the time_zone is set to
     * many different ones through the course of a single connection, a new
     * connection should only set it to the most recent value.
     *
     * @param string $name
     * @param string $value
     * @param bool $execute_immediately
     * @return array|bool|mixed
     * @throws Exception
     */
    public function setSessionVariable($name, $value, $execute_immediately = true)
    {
        // The property starts out as false; writing a key into false is deprecated since PHP 8.1.
        if (!is_array($this->session_variables)) {
            $this->session_variables = array();
        }

        $this->session_variables[$name] = $value;

        if ($execute_immediately == true) {
            return $this->executeSessionVariables();
        }

        return true;
    }

    /**
     * Executes the session variables on a given ADODB object.
     *
     * Each variable is sent as its own statement. Execution stops at the first
     * statement whose result is exactly false.
     *
     * @param ADOConnection|bool $adodb_obj Connection to run on, or false to run on the whole cluster.
     * @return array|bool|mixed Result of the last statement executed, false when there are no session variables.
     * @throws Exception
     */
    private function executeSessionVariables($adodb_obj = false)
    {
        $result = false;

        if (is_array($this->session_variables)) {
            foreach ($this->session_variables as $name => $value) {
                // MySQL uses: SET SESSION foo_bar='foo'
                // PGSQL uses: SET SESSION foo_bar 'foo'
                // So leave it up to the user to pass the proper value with '=' if needed.
                // This may be a candidate to move into ADOdb proper.
                $sql = 'SET SESSION ' . $name . ' ' . $value;

                if ($adodb_obj !== false) {
                    $result = $adodb_obj->Execute($sql);
                } else {
                    $result = $this->clusterExecute($sql);
                }

                if ($result === false) {
                    break;
                }
            }
        }

        return $result;
    }

    /**
     * Executes the same SQL QUERY on the entire cluster of connections.
     * Would be used for things like SET SESSION TIME ZONE calls and such.
     *
     * @param string|array $sql
     * @param bool|array $inputarr
     * @param bool $return_all_results When true, always return the list of all results.
     * @param bool $existing_connections_only When true, only connections that are already established are used.
     * @return array|bool|mixed False when the pool is empty, true when no connection was
     *                          eligible, otherwise the first result or the list of all results.
     * @throws Exception
     */
    public function clusterExecute(
        $sql,
        $inputarr = false,
        $return_all_results = false,
        $existing_connections_only = true
    ) {
        if (!is_array($this->connections) || count($this->connections) == 0) {
            return false;
        }

        $result_arr = array();
        foreach ($this->connections as $key => $connection_obj) {
            if ($existing_connections_only == true) {
                $existing_obj = $connection_obj->getADOdbObject();
                if (!is_object($existing_obj) || $existing_obj->_connectionID === false) {
                    continue; // Not connected yet, leave it alone.
                }
            }

            $adodb_obj = $this->getConnectionById($key);
            if (is_object($adodb_obj)) {
                $result_arr[] = $adodb_obj->Execute($sql, $inputarr);
            }
        }

        if (count($result_arr) == 0) {
            // When using lazy connections, there are cases where
            // setSessionVariable() is called early on, but there are
            // no connections to execute the queries on yet.
            // This captures that case and forces a RETURN TRUE to occur.
            // As likely the queries will be executed as soon as a
            // connection is established.
            return true;
        }

        if ($return_all_results == true) {
            return $result_arr;
        }

        // If any of the results failed return all of them so the caller can
        // inspect each one, otherwise the first result is returned.
        foreach ($result_arr as $result) {
            if ($result == false) {
                return $result_arr;
            }
        }

        return $result_arr[0];
    }

    /**
     * Determines if a SQL query is read-only or not.
     *
     * Only plain SELECT statements count as read-only. SELECT ... FOR UPDATE,
     * SELECT ... INTO and SELECT ... LOCK IN can write and are therefore not.
     * Anything that is not a string (for example a prepared statement) is
     * treated as not read-only.
     *
     * @param string $sql SQL Query to test.
     * @return bool
     */
    public function isReadOnlyQuery($sql)
    {
        if (!is_string($sql)) {
            return false;
        }

        // Leading whitespace must not hide the SELECT keyword.
        $sql = ltrim($sql);

        if (stripos($sql, 'SELECT') === 0
            && stripos($sql, 'FOR UPDATE') === false
            && stripos($sql, ' INTO ') === false
            && stripos($sql, 'LOCK IN') === false
        ) {
            return true;
        }

        return false;
    }

    /**
     * Use this instead of __call() as it significantly reduces the overhead of call_user_func_array().
     *
     * @param string|array $sql
     * @param bool|array $inputarr
     * @return array|bool|mixed
     * @throws Exception
     */
    public function execute($sql, $inputarr = false)
    {
        $type = 'write';
        $pin_connection = null;

        if (is_string($sql)) {
            // Prevent leading spaces from causing isReadOnlyQuery/stripos from failing.
            // Non-string SQL is passed through untouched, trim() would reject it.
            $sql = trim($sql);

            if ($this->isReadOnlyQuery($sql) == true) {
                $type = 'readonly';
            } elseif (stripos($sql, 'SET') === 0) {
                // SET SQL statements should likely use setSessionVariable() instead,
                // so state is properly maintained across connections, especially when they are lazily created.
                return $this->clusterExecute($sql, $inputarr);
            }
        }

        $adodb_obj = $this->getConnection($type, $pin_connection);
        if ($adodb_obj !== false) {
            return $adodb_obj->Execute($sql, $inputarr);
        }

        return false;
    }

    /**
     * Magic method to intercept method and callback to the proper ADODB object for write/readonly connections.
     *
     * @param string $method ADODB method to call.
     * @param array $args Arguments to the ADODB method.
     * @return bool|mixed
     * @throws Exception
     */
    public function __call($method, $args)
    {
        $type = 'write';
        $pin_connection = null;

        // Intercept specific methods to determine if they are read-only or not.
        $method = strtolower($method);
        switch ($method) {
            // 'execute' is handled by the dedicated execute() method above.
            case 'getone':
            case 'getrow':
            case 'getall':
            case 'getcol':
            case 'getassoc':
            case 'selectlimit':
                if (isset($args[0]) && $this->isReadOnlyQuery($args[0]) == true) {
                    $type = 'readonly';
                }
                break;
            case 'cachegetone':
            case 'cachegetrow':
            case 'cachegetall':
            case 'cachegetcol':
            case 'cachegetassoc':
            case 'cacheexecute':
            case 'cacheselect':
            case 'pageexecute':
            case 'cachepageexecute':
                $type = 'readonly';
                break;

            // Manual transactions
            case 'begintrans':
            case 'settransactionmode':
                $pin_connection = true;
                break;
            case 'rollbacktrans':
            case 'committrans':
                $pin_connection = false;
                break;
            // Smart transactions
            case 'starttrans':
                $pin_connection = true;
                break;
            case 'completetrans':
            case 'failtrans':
                // getConnection() will only unpin the transaction if we're exiting the last nested transaction
                $pin_connection = false;
                break;

            // Functions that don't require any connection and therefore
            // shouldn't force a connection be established before they run.
            case 'qstr':
            case 'escape':
            case 'binddate':
            case 'bindtimestamp':
            case 'setfetchmode':
            case 'setcustommetatype':
                $type = false; // No connection necessary.
                break;

            // Default to assuming write connection is required to be on the safe side.
            default:
                break;
        }

        if ($type === false) {
            if (is_array($this->connections) && count($this->connections) > 0) {
                foreach ($this->connections as $key => $connection_obj) {
                    $adodb_obj = $connection_obj->getADOdbObject();
                    // Just makes the function call on the first object.
                    return call_user_func_array(array($adodb_obj, $method), $this->makeValuesReferenced($args));
                }
            }
        } else {
            $adodb_obj = $this->getConnection($type, $pin_connection);
            if (is_object($adodb_obj)) {
                return call_user_func_array(array($adodb_obj, $method), $this->makeValuesReferenced($args));
            }
        }

        return false;
    }

    /**
     * Magic method to proxy property getter calls back to the first ADODB object in the pool.
     *
     * @param string $property
     * @return mixed The property value, or false when the pool is empty.
     * @throws Exception
     */
    public function __get($property)
    {
        if (is_array($this->connections) && count($this->connections) > 0) {
            foreach ($this->connections as $key => $connection_obj) {
                // Just returns the property from the first object.
                return $connection_obj->getADOdbObject()->$property;
            }
        }

        return false;
    }

    /**
     * Magic method to proxy property setter calls to every ADODB object in the pool.
     *
     * @param string $property
     * @param mixed $value
     * @return mixed True when the pool is not empty, false otherwise.
     * @throws Exception
     */
    public function __set($property, $value)
    {
        // Special function to set object properties on all objects
        // without initiating a connection to the database.
        if (is_array($this->connections) && count($this->connections) > 0) {
            foreach ($this->connections as $key => $connection_obj) {
                $connection_obj->getADOdbObject()->$property = $value;
            }

            return true;
        }

        return false;
    }

    /**
     * Override the __clone() magic method.
     *
     * Private, so the load balancer cannot be cloned from outside the class.
     */
    private function __clone()
    {
    }
}

/**
 * Class ADOdbLoadBalancerConnection
 *
 * Describes one database server of the pool: its ADOdb object, its type,
 * weight and the credentials used once a connection is actually needed.
 */
class ADOdbLoadBalancerConnection
{
    /**
     * @var bool ADOdb drive name.
     */
    protected $driver = false;

    /**
     * @var bool|ADOConnection ADODB object, false until the constructor created it.
     */
    protected $adodb_obj = false;

    /**
     * @var callable|null Callback that decides whether a fresh connection is usable.
     */
    protected $connection_test_callback = null;

    /**
     * @var string Type of connection, either 'write' capable or 'readonly'
     */
    public $type = 'write';

    /**
     * @var int Weight of connection, lower receives less queries, higher receives more queries.
     */
    public $weight = 1;

    /**
     * @var bool Determines if the connection persistent.
     */
    public $persistent_connection = false;

    /**
     * @var string Database connection host
     */
    public $host = '';

    /**
     * @var string Database connection user
     */
    public $user = '';

    /**
     * @var string Database connection password
     */
    public $password = '';

    /**
     * @var string Database connection database name
     */
    public $database = '';

    /**
     * ADOdbLoadBalancerConnection constructor to setup the ADODB object.
     *
     * When $type is neither 'write' nor 'readonly' the constructor stops early:
     * no ADOdb object is created and all properties keep their defaults.
     *
     * @param string $driver ADOdb driver name passed to ADONewConnection().
     * @param string $type
     * @param int $weight
     * @param bool $persistent_connection
     * @param string $argHostname
     * @param string $argUsername
     * @param string $argPassword
     * @param string $argDatabaseName
     */
    public function __construct(
        $driver,
        $type = 'write',
        $weight = 1,
        $persistent_connection = false,
        $argHostname = '',
        $argUsername = '',
        $argPassword = '',
        $argDatabaseName = ''
    ) {
        if ($type !== 'write' && $type !== 'readonly') {
            // The value is discarded by "new"; it is kept for explicit __construct() calls.
            return false;
        }

        $this->adodb_obj = ADONewConnection($driver);

        $this->type = $type;
        $this->weight = $weight;
        $this->persistent_connection = $persistent_connection;

        $this->host = $argHostname;
        $this->user = $argUsername;
        $this->password = $argPassword;
        $this->database = $argDatabaseName;

        return true;
    }

    /**
     * Anonymous function that is called and must return TRUE for the connection to be usable.
     *
     * The load balancer calls it with this connection object as first argument
     * and the ADOdb object as second argument.
     * Useful to check things like replication lag.
     *
     * @param callable $callback
     * @return void
     */
    public function setConnectionTestCallback($callback)
    {
        $this->connection_test_callback = $callback;
    }

    /**
     * Returns the connection test callback.
     *
     * @return callable|null
     */
    public function getConnectionTestCallback()
    {
        return $this->connection_test_callback;
    }

    /**
     * Returns the ADODB object for this connection.
     *
     * @return ADOConnection|bool False when no ADOdb object was created.
     */
    public function getADOdbObject()
    {
        return $this->adodb_obj;
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body