Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 4.3.x will end 7 October 2024 (12 months).
  • Bug fixes for security issues in 4.3.x will end 21 April 2025 (18 months).
  • PHP version: minimum PHP 8.0.0 Note: minimum PHP version has increased since Moodle 4.1. PHP 8.2.x is supported too.
<?php
/**
 * ADOdb Load Balancer
 *
 * ADOdbLoadBalancer is a class that allows the user to do read/write splitting
 * and load balancing across multiple servers. It can handle and load balance
 * any number of write capable (AKA: master) or readonly (AKA: slave) connections,
 * including dealing with connection failures and retrying queries on a different
 * connection instead.
 *
 * This file is part of ADOdb, a Database Abstraction Layer library for PHP.
 *
 * @package ADOdb
 * @link https://adodb.org Project's web site and documentation
 * @link https://github.com/ADOdb/ADOdb Source code and issue tracker
 *
 * The ADOdb Library is dual-licensed, released under both the BSD 3-Clause
 * and the GNU Lesser General Public Licence (LGPL) v2.1 or, at your option,
 * any later version. This means you can use it in proprietary products.
 * See the LICENSE.md file distributed with this source code for details.
 * @license BSD-3-Clause
 * @license LGPL-2.1-or-later
 *
 * @copyright 2016 Mike Benoit and the ADOdb community
 */

/**
 * Class ADOdbLoadBalancer
 */
class ADOdbLoadBalancer
{
    /**
     * @var bool    Once a write or readonly connection is made, stick to that connection for the entire request.
     */
    public $enable_sticky_sessions = true;


    /**
     * @var bool|array    All connections to each database.
     */
    protected $connections = false;

    /**
     * @var bool|array    Just connections to the write capable database.
     */
    protected $connections_write = false;

    /**
     * @var bool|array    Just connections to the readonly database.
     */
    protected $connections_readonly = false;

    /**
     * @var array    Counts of all connections and their types.
     */
    protected $total_connections = array('all' => 0, 'write' => 0, 'readonly' => 0);

    /**
     * @var array    Weights of all connections for each type.
     */
    protected $total_connection_weights = array('all' => 0, 'write' => 0, 'readonly' => 0);

    /**
     * @var bool    When in transactions, always use this connection.
     */
    protected $pinned_connection_id = false;

    /**
     * @var array    Last connection_id for each database type.
     */
    protected $last_connection_id = array('write' => false, 'readonly' => false, 'all' => false);

    /**
     * @var bool    Session variables that must be maintained across all connections, ie: SET TIME ZONE.
     */
    protected $session_variables = false;

    /**
     * @var bool    Called immediately after connecting to any DB.
     */
    protected $user_defined_session_init_sql = false;


    /**
     * Defines SQL queries that are executed each time a new database connection is established.
     *
     * @param  $sql
     * @return bool
     */
    public function setSessionInitSQL($sql)
    {
        $this->user_defined_session_init_sql[] = $sql;

        return true;
    }

    /**
     * Adds a new database connection to the pool, but no actual connection is made until its needed.
     *
     * @param  $obj
     * @return bool
     * @throws Exception
     */
    public function addConnection($obj)
    {
        if ($obj instanceof ADOdbLoadBalancerConnection) {
            $this->connections[] = $obj;
            end($this->connections);
            $i = key($this->connections);

            $this->total_connections[$obj->type]++;
            $this->total_connections['all']++;

            $this->total_connection_weights[$obj->type] += abs($obj->weight);
            $this->total_connection_weights['all'] += abs($obj->weight);

            if ($obj->type == 'write') {
                $this->connections_write[] = $i;
            } else {
                $this->connections_readonly[] = $i;
            }

            return true;
        }

        throw new Exception('Connection object is not an instance of ADOdbLoadBalancerConnection');
    }

    /**
     * Removes a database connection from the pool.
     *
     * @param  $i
     * @return bool
     */
    public function removeConnection($i)
    {
        if (isset($this->connections[$i])) {
               $obj = $this->connections[ $i ];

               $this->total_connections[ $obj->type ]--;
               $this->total_connections['all']--;

               $this->total_connection_weights[ $obj->type ] -= abs($obj->weight);
               $this->total_connection_weights['all'] -= abs($obj->weight);

            if ($obj->type == 'write') {
                unset($this->connections_write[array_search($i, $this->connections_write)]);
                // Reindex array.
                $this->connections_write = array_values($this->connections_write);
            } else {
                unset($this->connections_readonly[array_search($i, $this->connections_readonly)]);
                // Reindex array.
                $this->connections_readonly = array_values($this->connections_readonly);
            }

            // Remove any sticky connections as well.
            if ($this->last_connection_id[$obj->type] == $i) {
                $this->last_connection_id[$obj->type] = false;
            }

            unset($this->connections[$i]);

            return true;
        }

        return false;
    }

    /**
     * Returns a database connection of the specified type.
     *
     * Takes into account the connection weight for load balancing.
     *
     * @param  string $type Type of database connection, either: 'write' capable or 'readonly'
     * @return bool|int|string
     */
< private function getConnectionByWeight($type)
> public function getConnectionByWeight($type)
{ if ($type == 'readonly') { $total_weight = $this->total_connection_weights['all']; } else { $total_weight = $this->total_connection_weights['write']; } $i = false; if (is_array($this->connections)) { $n = 0; $num = mt_rand(0, $total_weight); foreach ($this->connections as $i => $connection_obj) { if ($connection_obj->weight > 0 && ($type == 'readonly' || $connection_obj->type == 'write')) { $n += $connection_obj->weight; if ($n >= $num) { break; } } } } return $i; } /** * Returns the proper database connection when taking into account sticky sessions and load balancing. * * @param $type * @return bool|int|mixed|string */ public function getLoadBalancedConnection($type) { if ($this->total_connections == 0) { $connection_id = 0; } else { if ($this->enable_sticky_sessions == true && $this->last_connection_id[$type] !== false) { $connection_id = $this->last_connection_id[$type]; } else { if ($type == 'write' && $this->total_connections['write'] == 1) { $connection_id = $this->connections_write[0]; } else { $connection_id = $this->getConnectionByWeight($type); } } } return $connection_id; } /** * Returns the ADODB connection object by connection_id. * * Ensures that it's connected and the session variables are executed. * * @param $connection_id * @return bool|ADOConnection * @throws Exception */
< private function _getConnection($connection_id)
> public function getConnectionById($connection_id)
{ if (isset($this->connections[$connection_id])) { $connection_obj = $this->connections[$connection_id]; /** @var ADOConnection $adodb_obj */ $adodb_obj = $connection_obj->getADOdbObject(); if (is_object($adodb_obj) && $adodb_obj->_connectionID == false) { try { if ($connection_obj->persistent_connection == true) { $adodb_obj->Pconnect( $connection_obj->host, $connection_obj->user, $connection_obj->password, $connection_obj->database ); } else { $adodb_obj->Connect( $connection_obj->host, $connection_obj->user, $connection_obj->password, $connection_obj->database ); } } catch (Exception $e) { // Connection error, see if there are other connections to try still. throw $e; // No connections left, reThrow exception so application can catch it. }
> // Check to see if a connection test callback was defined, and if so execute it. if (is_array($this->user_defined_session_init_sql)) { > // This is useful for testing replication lag and such to ensure the connection is suitable to be used. foreach ($this->user_defined_session_init_sql as $session_init_sql) { > $test_connection_callback = $connection_obj->getConnectionTestCallback(); $adodb_obj->Execute($session_init_sql); > if (is_callable($test_connection_callback) } > && $test_connection_callback($connection_obj, $adodb_obj) !== TRUE } > ) { $this->executeSessionVariables($adodb_obj); > return false; } > } >
return $adodb_obj; } else { throw new Exception('Unable to return Connection object...'); } } /** * Returns the ADODB connection object by database type. * * Ensures that it's connected and the session variables are executed. * * @param string $type * @param null $pin_connection * @return ADOConnection|bool * @throws Exception */ public function getConnection($type = 'write', $pin_connection = null) { while (($type == 'write' && $this->total_connections['write'] > 0) || ($type == 'readonly' && $this->total_connections['all'] > 0) ) { if ($this->pinned_connection_id !== false) { $connection_id = $this->pinned_connection_id; } else { $connection_id = $this->getLoadBalancedConnection($type); } if ($connection_id !== false) { try {
< $adodb_obj = $this->_getConnection($connection_id); < // $connection_obj = $this->connections[$connection_id]; < break;
> $adodb_obj = $this->getConnectionById($connection_id); > if (is_object($adodb_obj)) { > break; //Found valid connection, continue with it. > } else { > throw new Exception('ADODB Connection Object does not exist. Perhaps LoadBalancer Database Connection Test Failed?'); > }
} catch (Exception $e) { // Connection error, see if there are other connections to try still. $this->removeConnection($connection_id); if ( ($type == 'write' && $this->total_connections['write'] == 0) || ($type == 'readonly' && $this->total_connections['all'] == 0) ) { throw $e; } } } else { throw new Exception('Connection ID is invalid!'); } }
> if (!isset($connection_id)) { $this->last_connection_id[$type] = $connection_id; > throw new Exception('No connection available to use at this time! Type: ' . $type); > } if ($pin_connection === true) { >
$this->pinned_connection_id = $connection_id; } elseif ($pin_connection === false && $adodb_obj->transOff <= 1) { // UnPin connection only if we are 1 level deep in a transaction. $this->pinned_connection_id = false; // When unpinning connection, reset last_connection_id so readonly // queries don't get stuck on the write capable connection. $this->last_connection_id['write'] = false; $this->last_connection_id['readonly'] = false; } return $adodb_obj; } /** * This is a hack to work around pass by reference error. * * Parameter 1 to ADOConnection::GetInsertSQL() expected to be a reference, * value given in adodb-loadbalancer.inc.php on line 83 * * @param $arr * @return array */ private function makeValuesReferenced($arr) { $refs = array(); foreach ($arr as $key => $value) { $refs[$key] = &$arr[$key]; } return $refs; } /** * Allow setting session variables that are maintained across connections. * * Its important that these are set using name/value, so it can determine * if the same variable is set multiple times causing bloat/clutter when * new connections are established. For example if the time_zone is set to * many different ones through the course of a single connection, a new * connection should only set it to the most recent value. * * @param $name * @param $value * @param bool $execute_immediately * @return array|bool|mixed * @throws Exception */ public function setSessionVariable($name, $value, $execute_immediately = true) { $this->session_variables[$name] = $value; if ($execute_immediately == true) { return $this->executeSessionVariables(); } else { return true; } } /** * Executes the session variables on a given ADODB object. * * @param ADOConnection|bool $adodb_obj * @return array|bool|mixed * @throws Exception */ private function executeSessionVariables($adodb_obj = false) { if (is_array($this->session_variables)) { $sql = ''; foreach ($this->session_variables as $name => $value) { // $sql .= 'SET SESSION '. $name .' '. $value; // MySQL uses: SET SESSION foo_bar='foo' // PGSQL uses: SET SESSION foo_bar 'foo' // So leave it up to the user to pass the proper value with '=' if needed. // This may be a candidate to move into ADOdb proper. $sql .= 'SET SESSION ' . $name . ' ' . $value; } if ($adodb_obj !== false) { return $adodb_obj->Execute($sql); } else { return $this->ClusterExecute($sql); } } return false; } /** * Executes the same SQL QUERY on the entire cluster of connections. * Would be used for things like SET SESSION TIME ZONE calls and such. * * @param $sql * @param bool $inputarr * @param bool $return_all_results * @param bool $existing_connections_only * @return array|bool|mixed * @throws Exception */ public function clusterExecute( $sql, $inputarr = false, $return_all_results = false, $existing_connections_only = true ) { if (is_array($this->connections) && count($this->connections) > 0) { foreach ($this->connections as $key => $connection_obj) { if ($existing_connections_only == false || ($existing_connections_only == true && $connection_obj->getADOdbObject()->_connectionID !== false ) ) {
< $adodb_obj = $this->_getConnection($key);
> $adodb_obj = $this->getConnectionById($key);
if (is_object($adodb_obj)) { $result_arr[] = $adodb_obj->Execute($sql, $inputarr); } } } if (isset($result_arr) && $return_all_results == true) { return $result_arr; } else { // Loop through all results checking to see if they match, if they do return the first one // otherwise return an array of all results. if (isset($result_arr)) { foreach ($result_arr as $result) { if ($result == false) { return $result_arr; } } return $result_arr[0]; } else { // When using lazy connections, there are cases where // setSessionVariable() is called early on, but there are // no connections to execute the queries on yet. // This captures that case and forces a RETURN TRUE to occur. // As likely the queries will be executed as soon as a // connection is established. return true; } } } return false; } /** * Determines if a SQL query is read-only or not. * * @param string $sql SQL Query to test. * @return bool */ public function isReadOnlyQuery($sql) { if ( stripos($sql, 'SELECT') === 0 && stripos($sql, 'FOR UPDATE') === false && stripos($sql, ' INTO ') === false && stripos($sql, 'LOCK IN') === false ) { return true; } return false; } /** * Use this instead of __call() as it significantly reduces the overhead of call_user_func_array(). * * @param $sql * @param bool $inputarr * @return array|bool|mixed * @throws Exception */ public function execute($sql, $inputarr = false) { $type = 'write'; $pin_connection = null; // Prevent leading spaces from causing isReadOnlyQuery/stripos from failing. $sql = trim($sql); // SELECT queries that can write and therefore must be run on a write capable connection. // SELECT ... FOR UPDATE; // SELECT ... INTO ... // SELECT .. LOCK IN ... (MYSQL) if ($this->isReadOnlyQuery($sql) == true) { $type = 'readonly'; } elseif (stripos($sql, 'SET') === 0) { // SET SQL statements should likely use setSessionVariable() instead, // so state is properly maintained across connections, especially when they are lazily created. return $this->ClusterExecute($sql, $inputarr); } $adodb_obj = $this->getConnection($type, $pin_connection); if ($adodb_obj !== false) { return $adodb_obj->Execute($sql, $inputarr); } return false; } /** * Magic method to intercept method and callback to the proper ADODB object for write/readonly connections. * * @param string $method ADODB method to call. * @param array $args Arguments to the ADODB method. * @return bool|mixed * @throws Exception */ public function __call($method, $args) { $type = 'write'; $pin_connection = null; // Intercept specific methods to determine if they are read-only or not. $method = strtolower($method); switch ($method) { // case 'execute': // This is the direct overloaded function above instead. case 'getone': case 'getrow': case 'getall': case 'getcol': case 'getassoc': case 'selectlimit': if ($this->isReadOnlyQuery(trim($args[0])) == true) { $type = 'readonly'; } break; case 'cachegetone': case 'cachegetrow': case 'cachegetall': case 'cachegetcol': case 'cachegetassoc': case 'cacheexecute': case 'cacheselect': case 'pageexecute': case 'cachepageexecute': $type = 'readonly'; break; // case 'ignoreerrors': // // When ignoreerrors is called, PIN to the connection until its called again. // if (!isset($args[0]) || (isset($args[0]) && $args[0] == FALSE)) { // $pin_connection = TRUE; // } else { // $pin_connection = FALSE; // } // break; // Manual transactions case 'begintrans': case 'settransactionmode': $pin_connection = true; break; case 'rollbacktrans': case 'committrans': $pin_connection = false; break; // Smart transactions case 'starttrans': $pin_connection = true; break; case 'completetrans': case 'failtrans': // getConnection() will only unpin the transaction if we're exiting the last nested transaction $pin_connection = false; break; // Functions that don't require any connection and therefore // shouldn't force a connection be established before they run. case 'qstr': case 'escape': case 'binddate': case 'bindtimestamp': case 'setfetchmode':
> case 'setcustommetatype':
$type = false; // No connection necessary. break; // Default to assuming write connection is required to be on the safe side. default: break; } if ($type === false) { if (is_array($this->connections) && count($this->connections) > 0) { foreach ($this->connections as $key => $connection_obj) { $adodb_obj = $connection_obj->getADOdbObject(); return call_user_func_array(array($adodb_obj, $method), $this->makeValuesReferenced($args)); // Just makes the function call on the first object. } } } else { $adodb_obj = $this->getConnection($type, $pin_connection); if (is_object($adodb_obj)) { $result = call_user_func_array(array($adodb_obj, $method), $this->makeValuesReferenced($args)); return $result; } } return false; } /** * Magic method to proxy property getter calls back to the proper ADODB object currently in use. * * @param $property * @return mixed * @throws Exception */ public function __get($property) { if (is_array($this->connections) && count($this->connections) > 0) { foreach ($this->connections as $key => $connection_obj) { // Just returns the property from the first object. return $connection_obj->getADOdbObject()->$property; } } return false; } /** * Magic method to proxy property setter calls back to the proper ADODB object currently in use. * * @param $property * @param $value * @return mixed * @throws Exception */ public function __set($property, $value) { // Special function to set object properties on all objects // without initiating a connection to the database. if (is_array($this->connections) && count($this->connections) > 0) { foreach ($this->connections as $key => $connection_obj) { $connection_obj->getADOdbObject()->$property = $value; } return true; } return false; } /** * Override the __clone() magic method. */ private function __clone() { } } /** * Class ADOdbLoadBalancerConnection */ class ADOdbLoadBalancerConnection { /** * @var bool ADOdb drive name. */ protected $driver = false; /** * @var bool ADODB object. */ protected $adodb_obj = false; /**
> * @var callable Closure * @var string Type of connection, either 'write' capable or 'readonly' > */ */ > protected $connection_test_callback = NULL; public $type = 'write'; > > /**
/** * @var int Weight of connection, lower receives less queries, higher receives more queries. */ public $weight = 1; /** * @var bool Determines if the connection persistent. */ public $persistent_connection = false; /** * @var string Database connection host */ public $host = ''; /** * @var string Database connection user */ public $user = ''; /** * @var string Database connection password */ public $password = ''; /** * @var string Database connection database name */ public $database = ''; /** * ADOdbLoadBalancerConnection constructor to setup the ADODB object. * * @param $driver * @param string $type * @param int $weight * @param bool $persistent_connection * @param string $argHostname * @param string $argUsername * @param string $argPassword * @param string $argDatabaseName */ public function __construct( $driver, $type = 'write', $weight = 1, $persistent_connection = false, $argHostname = '', $argUsername = '', $argPassword = '', $argDatabaseName = '' ) { if ($type !== 'write' && $type !== 'readonly') { return false; } $this->adodb_obj = ADONewConnection($driver); $this->type = $type; $this->weight = $weight; $this->persistent_connection = $persistent_connection; $this->host = $argHostname; $this->user = $argUsername; $this->password = $argPassword; $this->database = $argDatabaseName; return true;
> } } > > /** /** > * Anonymous function that is called and must return TRUE for the connection to be usable.* * Returns the ADODB object for this connection. > * The first argument is the type of connection to test. * > * Useful to check things like replication lag. * @return bool > * @param callable $callback */ > * @return void public function getADOdbObject() > */ { > function setConnectionTestCallback($callback) { return $this->adodb_obj; > $this->connection_test_callback = $callback; } > } } > > /** > * @return callable|null > */ > function getConnectionTestCallback() { > return $this->connection_test_callback;