Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 4.3.x will end 7 October 2024 (12 months).
  • Bug fixes for security issues in 4.3.x will end 21 April 2025 (18 months).
  • PHP version: minimum PHP 8.0.0 Note: minimum PHP version has increased since Moodle 4.1. PHP 8.2.x is supported too.

Differences Between: [Versions 310 and 403] [Versions 311 and 403] [Versions 39 and 403] [Versions 400 and 403] [Versions 401 and 403] [Versions 402 and 403]

   1  <?php
   2  // This file is part of Moodle - http://moodle.org/
   3  //
   4  // Moodle is free software: you can redistribute it and/or modify
   5  // it under the terms of the GNU General Public License as published by
   6  // the Free Software Foundation, either version 3 of the License, or
   7  // (at your option) any later version.
   8  //
   9  // Moodle is distributed in the hope that it will be useful,
  10  // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11  // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12  // GNU General Public License for more details.
  13  //
  14  // You should have received a copy of the GNU General Public License
  15  // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
  16  
  17  /**
  18   * Native pgsql class representing moodle database interface.
  19   *
  20   * @package    core_dml
  21   * @copyright  2008 Petr Skoda (http://skodak.org)
  22   * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
  23   */
  24  
  25  defined('MOODLE_INTERNAL') || die();
  26  
  27  require_once (__DIR__.'/moodle_database.php');
  28  require_once (__DIR__.'/moodle_read_slave_trait.php');
  29  require_once (__DIR__.'/pgsql_native_moodle_recordset.php');
  30  require_once (__DIR__.'/pgsql_native_moodle_temptables.php');
  31  
  32  /**
  33   * Native pgsql class representing moodle database interface.
  34   *
  35   * @package    core_dml
  36   * @copyright  2008 Petr Skoda (http://skodak.org)
  37   * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
  38   */
  39  class pgsql_native_moodle_database extends moodle_database {
  40      use moodle_read_slave_trait {
  41          select_db_handle as read_slave_select_db_handle;
  42          can_use_readonly as read_slave_can_use_readonly;
  43          query_start as read_slave_query_start;
  44          query_end as read_slave_query_end;
  45      }
  46  
  47      /** @var array $sslmodes */
  48      private static $sslmodes = [
  49          'disable',
  50          'prefer',
  51          'require',
  52          'verify-full'
  53      ];
  54  
  55      /** @var array $serverinfo cache */
  56      private $serverinfo = [];
  57  
  58      /** @var array $dbhcursor keep track of open cursors */
  59      private $dbhcursor = [];
  60  
  61      /** @var resource|PgSql\Connection|null $pgsql database resource */
  62      protected $pgsql     = null;
  63  
  64      protected $last_error_reporting; // To handle pgsql driver default verbosity
  65  
  66      /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
  67      protected $savepointpresent = false;
  68  
  69      /** @var int Number of cursors used (for constructing a unique ID) */
  70      protected $cursorcount = 0;
  71  
  72      /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
  73      const DEFAULT_FETCH_BUFFER_SIZE = 100000;
  74  
  75      /**
  76       * Detects if all needed PHP stuff installed.
  77       * Note: can be used before connect()
  78       * @return mixed true if ok, string if something
  79       */
  80      public function driver_installed() {
  81          if (!extension_loaded('pgsql')) {
  82              return get_string('pgsqlextensionisnotpresentinphp', 'install');
  83          }
  84          return true;
  85      }
  86  
  87      /**
  88       * Returns database family type - describes SQL dialect
  89       * Note: can be used before connect()
  90       * @return string db family name (mysql, postgres, mssql, oracle, etc.)
  91       */
  92      public function get_dbfamily() {
  93          return 'postgres';
  94      }
  95  
  96      /**
  97       * Returns more specific database driver type
  98       * Note: can be used before connect()
  99       * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
 100       */
 101      protected function get_dbtype() {
 102          return 'pgsql';
 103      }
 104  
 105      /**
 106       * Returns general database library name
 107       * Note: can be used before connect()
 108       * @return string db type pdo, native
 109       */
 110      protected function get_dblibrary() {
 111          return 'native';
 112      }
 113  
 114      /**
 115       * Returns localised database type name
 116       * Note: can be used before connect()
 117       * @return string
 118       */
 119      public function get_name() {
 120          return get_string('nativepgsql', 'install');
 121      }
 122  
 123      /**
 124       * Returns localised database configuration help.
 125       * Note: can be used before connect()
 126       * @return string
 127       */
 128      public function get_configuration_help() {
 129          return get_string('nativepgsqlhelp', 'install');
 130      }
 131  
 132      /**
 133       * Connect to db
 134       * @param string $dbhost The database host.
 135       * @param string $dbuser The database username.
 136       * @param string $dbpass The database username's password.
 137       * @param string $dbname The name of the database being connected to.
 138       * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
 139       * @param array $dboptions driver specific options
 140       * @return bool true
 141       * @throws moodle_exception
 142       * @throws dml_connection_exception if error
 143       */
 144      public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, array $dboptions=null): bool {
 145          if ($prefix == '' and !$this->external) {
 146              //Enforce prefixes for everybody but mysql
 147              throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
 148          }
 149  
 150          $driverstatus = $this->driver_installed();
 151  
 152          if ($driverstatus !== true) {
 153              throw new dml_exception('dbdriverproblem', $driverstatus);
 154          }
 155  
 156          $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
 157  
 158          $pass = addcslashes($this->dbpass, "'\\");
 159  
 160          // Unix socket connections should have lower overhead
 161          if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
 162              $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
 163              if (strpos($this->dboptions['dbsocket'], '/') !== false) {
 164                  // A directory was specified as the socket location.
 165                  $connection .= " host='".$this->dboptions['dbsocket']."'";
 166              }
 167              if (!empty($this->dboptions['dbport'])) {
 168                  // A port as specified, add it to the connection as it's used as part of the socket path.
 169                  $connection .= " port ='".$this->dboptions['dbport']."'";
 170              }
 171          } else {
 172              $this->dboptions['dbsocket'] = '';
 173              if (empty($this->dbname)) {
 174                  // probably old style socket connection - do not add port
 175                  $port = "";
 176              } else if (empty($this->dboptions['dbport'])) {
 177                  $port = "port ='5432'";
 178              } else {
 179                  $port = "port ='".$this->dboptions['dbport']."'";
 180              }
 181              $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
 182          }
 183  
 184          if (!empty($this->dboptions['connecttimeout'])) {
 185              $connection .= " connect_timeout=".$this->dboptions['connecttimeout'];
 186          }
 187  
 188          if (empty($this->dboptions['dbhandlesoptions'])) {
 189              // ALTER USER and ALTER DATABASE are overridden by these settings.
 190              $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
 191              // Select schema if specified, otherwise the first one wins.
 192              if (!empty($this->dboptions['dbschema'])) {
 193                  $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
 194              }
 195  
 196              $connection .= " options='" . implode(' ', $options) . "'";
 197          }
 198  
 199          if (isset($this->dboptions['ssl'])) {
 200              $sslmode = $this->dboptions['ssl'];
 201              if (!in_array($sslmode, self::$sslmodes, true)) {
 202                  throw new moodle_exception('validateerrorlist', 'admin', '', "'dboptions''ssl': $sslmode");
 203              }
 204              $connection .= " sslmode=$sslmode";
 205          }
 206  
 207          ob_start();
 208          // It seems that pg_connect() handles some errors differently.
 209          // For example, name resolution error will raise an exception, and non-existing
 210          // database or wrong credentials will just return false.
 211          // We need to cater for both.
 212          try {
 213              if (empty($this->dboptions['dbpersist'])) {
 214                  $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
 215              } else {
 216                  $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
 217              }
 218              $dberr = ob_get_contents();
 219          } catch (\Exception $e) {
 220              $dberr = $e->getMessage();
 221          }
 222          ob_end_clean();
 223  
 224          $status = $this->pgsql ? pg_connection_status($this->pgsql) : false;
 225  
 226          if ($status === false or $status === PGSQL_CONNECTION_BAD) {
 227              $this->pgsql = null;
 228              throw new dml_connection_exception($dberr);
 229          }
 230  
 231          if (!empty($this->dboptions['dbpersist'])) {
 232              // There are rare situations (such as PHP out of memory errors) when open cursors may
 233              // not be closed at the end of a connection. When using persistent connections, the
 234              // cursors remain open and 'get in the way' of future connections. To avoid this
 235              // problem, close all cursors here.
 236              $result = pg_query($this->pgsql, 'CLOSE ALL');
 237              if ($result) {
 238                  pg_free_result($result);
 239              }
 240          }
 241  
 242          if (!empty($this->dboptions['dbhandlesoptions'])) {
 243              /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
 244               * These functions do not talk to the server, they use the client library knowledge to determine state.
 245               */
 246              if (!empty($this->dboptions['dbschema'])) {
 247                  throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
 248              }
 249              if (pg_client_encoding($this->pgsql) != 'UTF8') {
 250                  throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
 251              }
 252              if (pg_escape_string($this->pgsql, '\\') != '\\') {
 253                  throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
 254              }
 255          }
 256  
 257          // Connection stabilised and configured, going to instantiate the temptables controller
 258          $this->temptables = new pgsql_native_moodle_temptables($this);
 259  
 260          return true;
 261      }
 262  
 263      /**
 264       * Close database connection and release all resources
 265       * and memory (especially circular memory references).
 266       * Do NOT use connect() again, create a new instance if needed.
 267       */
 268      public function dispose() {
 269          parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
 270          if ($this->pgsql) {
 271              pg_close($this->pgsql);
 272              $this->pgsql = null;
 273          }
 274      }
 275  
 276      /**
 277       * Gets db handle currently used with queries
 278       * @return resource
 279       */
 280      protected function get_db_handle() {
 281          return $this->pgsql;
 282      }
 283  
 284      /**
 285       * Sets db handle to be used with subsequent queries
 286       * @param resource $dbh
 287       * @return void
 288       */
 289      protected function set_db_handle($dbh): void {
 290          $this->pgsql = $dbh;
 291      }
 292  
 293      /**
 294       * Select appropriate db handle - readwrite or readonly
 295       * @param int $type type of query
 296       * @param string $sql
 297       * @return void
 298       */
 299      protected function select_db_handle(int $type, string $sql): void {
 300          $this->read_slave_select_db_handle($type, $sql);
 301  
 302          if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) {
 303              $cursor = $match[1];
 304              $this->dbhcursor[$cursor] = $this->pgsql;
 305          }
 306          if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) {
 307              $cursor = $match[1];
 308              $this->pgsql = $this->dbhcursor[$cursor];
 309          }
 310      }
 311  
 312      /**
 313       * Check if The query qualifies for readonly connection execution
 314       * Logging queries are exempt, those are write operations that circumvent
 315       * standard query_start/query_end paths.
 316       * @param int $type type of query
 317       * @param string $sql
 318       * @return bool
 319       */
 320      protected function can_use_readonly(int $type, string $sql): bool {
 321          // ... pg_*lock queries always go to master.
 322          if (preg_match('/\bpg_\w*lock/', $sql)) {
 323              return false;
 324          }
 325  
 326          // ... a nuisance - temptables use this.
 327          if (preg_match('/\bpg_catalog/', $sql) && $this->temptables->get_temptables()) {
 328              return false;
 329          }
 330  
 331          return $this->read_slave_can_use_readonly($type, $sql);
 332  
 333      }
 334  
 335      /**
 336       * Called before each db query.
 337       * @param string $sql
 338       * @param array|null $params An array of parameters.
 339       * @param int $type type of query
 340       * @param mixed $extrainfo driver specific extra information
 341       * @return void
 342       */
 343      protected function query_start($sql, ?array $params, $type, $extrainfo=null) {
 344          $this->read_slave_query_start($sql, $params, $type, $extrainfo);
 345          // pgsql driver tends to send debug to output, we do not need that.
 346          $this->last_error_reporting = error_reporting(0);
 347      }
 348  
 349      /**
 350       * Called immediately after each db query.
 351       * @param mixed db specific result
 352       * @return void
 353       */
 354      protected function query_end($result) {
 355          // reset original debug level
 356          error_reporting($this->last_error_reporting);
 357          try {
 358              $this->read_slave_query_end($result);
 359              if ($this->savepointpresent &&
 360                      !in_array(
 361                          $this->last_type,
 362                          [SQL_QUERY_AUX, SQL_QUERY_AUX_READONLY, SQL_QUERY_SELECT],
 363                          true
 364                      )) {
 365                  $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
 366                  if ($res) {
 367                      pg_free_result($res);
 368                  }
 369              }
 370          } catch (Exception $e) {
 371              if ($this->savepointpresent) {
 372                  $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
 373                  if ($res) {
 374                      pg_free_result($res);
 375                  }
 376              }
 377              throw $e;
 378          }
 379      }
 380  
 381      /**
 382       * Returns database server info array
 383       * @return array Array containing 'description' and 'version' info
 384       */
 385      public function get_server_info(): array {
 386          if (empty($this->serverinfo)) {
 387              $this->query_start('--pg_version()', null, SQL_QUERY_AUX);
 388              $this->serverinfo = pg_version($this->pgsql);
 389              $this->query_end(true);
 390          }
 391          return [
 392              'description' => $this->serverinfo['server'],
 393              'version' => $this->serverinfo['server'],
 394          ];
 395      }
 396  
 397      /**
 398       * Returns supported query parameter types
 399       * @return int bitmask of accepted SQL_PARAMS_*
 400       */
 401      protected function allowed_param_types() {
 402          return SQL_PARAMS_DOLLAR;
 403      }
 404  
 405      /**
 406       * Returns last error reported by database engine.
 407       * @return string error message
 408       */
 409      public function get_last_error() {
 410          return pg_last_error($this->pgsql);
 411      }
 412  
 413      /**
 414       * Return tables in database WITHOUT current prefix.
 415       * @param bool $usecache if true, returns list of cached tables.
 416       * @return array of table names in lowercase and without prefix
 417       */
 418      public function get_tables($usecache=true) {
 419          if ($usecache and $this->tables !== null) {
 420              return $this->tables;
 421          }
 422          $this->tables = array();
 423          $prefix = str_replace('_', '|_', $this->prefix);
 424          $sql = "SELECT c.relname
 425                    FROM pg_catalog.pg_class c
 426                    JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
 427                   WHERE c.relname LIKE '$prefix%' ESCAPE '|'
 428                         AND c.relkind = 'r'
 429                         AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
 430          $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
 431          $result = pg_query($this->pgsql, $sql);
 432          $this->query_end($result);
 433  
 434          if ($result) {
 435              while ($row = pg_fetch_row($result)) {
 436                  $tablename = reset($row);
 437                  if ($this->prefix !== false && $this->prefix !== '') {
 438                      if (strpos($tablename, $this->prefix) !== 0) {
 439                          continue;
 440                      }
 441                      $tablename = substr($tablename, strlen($this->prefix));
 442                  }
 443                  $this->tables[$tablename] = $tablename;
 444              }
 445              pg_free_result($result);
 446          }
 447          return $this->tables;
 448      }
 449  
 450      /**
 451       * Constructs 'IN()' or '=' sql fragment
 452       *
 453       * Method overriding {@see moodle_database::get_in_or_equal} to be able to use
 454       * more than 65535 elements in $items array.
 455       *
 456       * @param mixed $items A single value or array of values for the expression.
 457       * @param int $type Parameter bounding type : SQL_PARAMS_QM or SQL_PARAMS_NAMED.
 458       * @param string $prefix Named parameter placeholder prefix (a unique counter value is appended to each parameter name).
 459       * @param bool $equal True means we want to equate to the constructed expression, false means we don't want to equate to it.
 460       * @param mixed $onemptyitems This defines the behavior when the array of items provided is empty. Defaults to false,
 461       *              meaning throw exceptions. Other values will become part of the returned SQL fragment.
 462       * @throws coding_exception | dml_exception
 463       * @return array A list containing the constructed sql fragment and an array of parameters.
 464       */
 465      public function get_in_or_equal($items, $type=SQL_PARAMS_QM, $prefix='param', $equal=true, $onemptyitems=false): array {
 466          // We only interfere if number of items in expression exceeds 16 bit value.
 467          if (!is_array($items) || count($items) < 65535) {
 468              return parent::get_in_or_equal($items, $type, $prefix,  $equal, $onemptyitems);
 469          }
 470  
 471          // Determine the type from the first value. We don't need to be very smart here,
 472          // it is developer's responsibility to make sure that variable type is matching
 473          // field type, if not the case, DB engine will hint. Also mixing types won't work
 474          // here anyway, so we ignore NULL or boolean (unlikely you need 56k values of
 475          // these types only).
 476          $cast = is_string(current($items)) ? '::text' : '::bigint';
 477  
 478          if ($type == SQL_PARAMS_QM) {
 479              if ($equal) {
 480                  $sql = 'IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
 481              } else {
 482                  $sql = 'NOT IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
 483              }
 484              $params = array_values($items);
 485          } else if ($type == SQL_PARAMS_NAMED) {
 486              if (empty($prefix)) {
 487                  $prefix = 'param';
 488              }
 489              $params = [];
 490              $sql = [];
 491              foreach ($items as $item) {
 492                  $param = $prefix.$this->inorequaluniqueindex++;
 493                  $params[$param] = $item;
 494                  $sql[] = ':'.$param.$cast;
 495              }
 496              if ($equal) {
 497                  $sql = 'IN (VALUES ('.implode('),(', $sql).'))';
 498              } else {
 499                  $sql = 'NOT IN (VALUES ('.implode('),(', $sql).'))';
 500              }
 501          } else {
 502              throw new dml_exception('typenotimplement');
 503          }
 504          return [$sql, $params];
 505      }
 506  
 507      /**
 508       * Return table indexes - everything lowercased.
 509       * @param string $table The table we want to get indexes from.
 510       * @return array of arrays
 511       */
 512      public function get_indexes($table) {
 513          $indexes = array();
 514          $tablename = $this->prefix.$table;
 515  
 516          $sql = "SELECT i.*
 517                    FROM pg_catalog.pg_indexes i
 518                    JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
 519                   WHERE i.tablename = '$tablename'
 520                         AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
 521  
 522          $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
 523          $result = pg_query($this->pgsql, $sql);
 524          $this->query_end($result);
 525  
 526          if ($result) {
 527              while ($row = pg_fetch_assoc($result)) {
 528                  // The index definition could be generated schema-qualifying the target table name
 529                  // for safety, depending on the pgsql version (CVE-2018-1058).
 530                  if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
 531                      continue;
 532                  }
 533                  if ($matches[5] === 'id') {
 534                      continue;
 535                  }
 536                  $columns = explode(',', $matches[5]);
 537                  foreach ($columns as $k=>$column) {
 538                      $column = trim($column);
 539                      if ($pos = strpos($column, ' ')) {
 540                          // index type is separated by space
 541                          $column = substr($column, 0, $pos);
 542                      }
 543                      $columns[$k] = $this->trim_quotes($column);
 544                  }
 545                  $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
 546                                                'columns'=>$columns);
 547              }
 548              pg_free_result($result);
 549          }
 550          return $indexes;
 551      }
 552  
 553      /**
 554       * Returns detailed information about columns in table.
 555       *
 556       * @param string $table name
 557       * @return database_column_info[] array of database_column_info objects indexed with column names
 558       */
 559      protected function fetch_columns(string $table): array {
 560          $structure = array();
 561  
 562          $tablename = $this->prefix.$table;
 563  
 564          $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
 565                         CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) ELSE '' END AS adsrc
 566                    FROM pg_catalog.pg_class c
 567                    JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
 568                    JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
 569                    JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
 570               LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
 571                   WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
 572                         AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
 573                ORDER BY a.attnum";
 574  
 575          $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
 576          $result = pg_query($this->pgsql, $sql);
 577          $this->query_end($result);
 578  
 579          if (!$result) {
 580              return array();
 581          }
 582          while ($rawcolumn = pg_fetch_object($result)) {
 583  
 584              $info = new stdClass();
 585              $info->name = $rawcolumn->field;
 586              $matches = null;
 587  
 588              if ($rawcolumn->type === 'varchar') {
 589                  $info->type          = 'varchar';
 590                  $info->meta_type     = 'C';
 591                  $info->max_length    = $rawcolumn->atttypmod - 4;
 592                  $info->scale         = null;
 593                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 594                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 595                  if ($info->has_default) {
 596                      $parts = explode('::', $rawcolumn->adsrc);
 597                      if (count($parts) > 1) {
 598                          $info->default_value = reset($parts);
 599                          $info->default_value = trim($info->default_value, "'");
 600                      } else {
 601                          $info->default_value = $rawcolumn->adsrc;
 602                      }
 603                  } else {
 604                      $info->default_value = null;
 605                  }
 606                  $info->primary_key   = false;
 607                  $info->binary        = false;
 608                  $info->unsigned      = null;
 609                  $info->auto_increment= false;
 610                  $info->unique        = null;
 611  
 612              } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
 613                  $info->type = 'int';
 614                  if (strpos($rawcolumn->adsrc ?? '', 'nextval') === 0) {
 615                      $info->primary_key   = true;
 616                      $info->meta_type     = 'R';
 617                      $info->unique        = true;
 618                      $info->auto_increment= true;
 619                      $info->has_default   = false;
 620                  } else {
 621                      $info->primary_key   = false;
 622                      $info->meta_type     = 'I';
 623                      $info->unique        = null;
 624                      $info->auto_increment= false;
 625                      $info->has_default   = ($rawcolumn->atthasdef === 't');
 626                  }
 627                  // Return number of decimals, not bytes here.
 628                  if ($matches[1] >= 8) {
 629                      $info->max_length = 18;
 630                  } else if ($matches[1] >= 4) {
 631                      $info->max_length = 9;
 632                  } else if ($matches[1] >= 2) {
 633                      $info->max_length = 4;
 634                  } else if ($matches[1] >= 1) {
 635                      $info->max_length = 2;
 636                  } else {
 637                      $info->max_length = 0;
 638                  }
 639                  $info->scale         = null;
 640                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 641                  if ($info->has_default) {
 642                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 643                      $parts = explode('::', $rawcolumn->adsrc);
 644                      if (count($parts) > 1) {
 645                          $info->default_value = reset($parts);
 646                      } else {
 647                          $info->default_value = $rawcolumn->adsrc;
 648                      }
 649                      $info->default_value = trim($info->default_value, "()'");
 650                  } else {
 651                      $info->default_value = null;
 652                  }
 653                  $info->binary        = false;
 654                  $info->unsigned      = false;
 655  
 656              } else if ($rawcolumn->type === 'numeric') {
 657                  $info->type = $rawcolumn->type;
 658                  $info->meta_type     = 'N';
 659                  $info->primary_key   = false;
 660                  $info->binary        = false;
 661                  $info->unsigned      = null;
 662                  $info->auto_increment= false;
 663                  $info->unique        = null;
 664                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 665                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 666                  if ($info->has_default) {
 667                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 668                      $parts = explode('::', $rawcolumn->adsrc);
 669                      if (count($parts) > 1) {
 670                          $info->default_value = reset($parts);
 671                      } else {
 672                          $info->default_value = $rawcolumn->adsrc;
 673                      }
 674                      $info->default_value = trim($info->default_value, "()'");
 675                  } else {
 676                      $info->default_value = null;
 677                  }
 678                  $info->max_length    = $rawcolumn->atttypmod >> 16;
 679                  $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
 680  
 681              } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
 682                  $info->type = 'float';
 683                  $info->meta_type     = 'N';
 684                  $info->primary_key   = false;
 685                  $info->binary        = false;
 686                  $info->unsigned      = null;
 687                  $info->auto_increment= false;
 688                  $info->unique        = null;
 689                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 690                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 691                  if ($info->has_default) {
 692                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 693                      $parts = explode('::', $rawcolumn->adsrc);
 694                      if (count($parts) > 1) {
 695                          $info->default_value = reset($parts);
 696                      } else {
 697                          $info->default_value = $rawcolumn->adsrc;
 698                      }
 699                      $info->default_value = trim($info->default_value, "()'");
 700                  } else {
 701                      $info->default_value = null;
 702                  }
 703                  // just guess expected number of deciaml places :-(
 704                  if ($matches[1] == 8) {
 705                      // total 15 digits
 706                      $info->max_length = 8;
 707                      $info->scale      = 7;
 708                  } else {
 709                      // total 6 digits
 710                      $info->max_length = 4;
 711                      $info->scale      = 2;
 712                  }
 713  
 714              } else if ($rawcolumn->type === 'text') {
 715                  $info->type          = $rawcolumn->type;
 716                  $info->meta_type     = 'X';
 717                  $info->max_length    = -1;
 718                  $info->scale         = null;
 719                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 720                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 721                  if ($info->has_default) {
 722                      $parts = explode('::', $rawcolumn->adsrc);
 723                      if (count($parts) > 1) {
 724                          $info->default_value = reset($parts);
 725                          $info->default_value = trim($info->default_value, "'");
 726                      } else {
 727                          $info->default_value = $rawcolumn->adsrc;
 728                      }
 729                  } else {
 730                      $info->default_value = null;
 731                  }
 732                  $info->primary_key   = false;
 733                  $info->binary        = false;
 734                  $info->unsigned      = null;
 735                  $info->auto_increment= false;
 736                  $info->unique        = null;
 737  
 738              } else if ($rawcolumn->type === 'bytea') {
 739                  $info->type          = $rawcolumn->type;
 740                  $info->meta_type     = 'B';
 741                  $info->max_length    = -1;
 742                  $info->scale         = null;
 743                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 744                  $info->has_default   = false;
 745                  $info->default_value = null;
 746                  $info->primary_key   = false;
 747                  $info->binary        = true;
 748                  $info->unsigned      = null;
 749                  $info->auto_increment= false;
 750                  $info->unique        = null;
 751  
 752              }
 753  
 754              $structure[$info->name] = new database_column_info($info);
 755          }
 756  
 757          pg_free_result($result);
 758  
 759          return $structure;
 760      }
 761  
 762      /**
 763       * Normalise values based in RDBMS dependencies (booleans, LOBs...)
 764       *
 765       * @param database_column_info $column column metadata corresponding with the value we are going to normalise
 766       * @param mixed $value value we are going to normalise
 767       * @return mixed the normalised value
 768       */
 769      protected function normalise_value($column, $value) {
 770          $this->detect_objects($value);
 771  
 772          if (is_bool($value)) { // Always, convert boolean to int
 773              $value = (int)$value;
 774  
 775          } else if ($column->meta_type === 'B') {
 776              if (!is_null($value)) {
 777                  // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
 778                  // \ and produce data errors.  This is set on the connection.
 779                  $value = pg_escape_bytea($this->pgsql, $value);
 780              }
 781  
 782          } else if ($value === '') {
 783              if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
 784                  $value = 0; // prevent '' problems in numeric fields
 785              }
 786          }
 787          return $value;
 788      }
 789  
 790      /**
 791       * Is db in unicode mode?
 792       * @return bool
 793       */
 794      public function setup_is_unicodedb() {
 795          // Get PostgreSQL server_encoding value
 796          $sql = 'SHOW server_encoding';
 797          $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
 798          $result = pg_query($this->pgsql, $sql);
 799          $this->query_end($result);
 800  
 801          if (!$result) {
 802              return false;
 803          }
 804          $rawcolumn = pg_fetch_object($result);
 805          $encoding = $rawcolumn->server_encoding;
 806          pg_free_result($result);
 807  
 808          return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
 809      }
 810  
 811      /**
 812       * Do NOT use in code, to be used by database_manager only!
 813       * @param string|array $sql query
 814       * @param array|null $tablenames an array of xmldb table names affected by this request.
 815       * @return bool true
 816       * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
 817       */
 818      public function change_database_structure($sql, $tablenames = null) {
 819          $this->get_manager(); // Includes DDL exceptions classes ;-)
 820          if (is_array($sql)) {
 821              $sql = implode("\n;\n", $sql);
 822          }
 823          if (!$this->is_transaction_started()) {
 824              // It is better to do all or nothing, this helps with recovery...
 825              $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
 826          }
 827  
 828          try {
 829              $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
 830              $result = pg_query($this->pgsql, $sql);
 831              $this->query_end($result);
 832              pg_free_result($result);
 833          } catch (ddl_change_structure_exception $e) {
 834              if (!$this->is_transaction_started()) {
 835                  $result = @pg_query($this->pgsql, "ROLLBACK");
 836                  @pg_free_result($result);
 837              }
 838              $this->reset_caches($tablenames);
 839              throw $e;
 840          }
 841  
 842          $this->reset_caches($tablenames);
 843          return true;
 844      }
 845  
 846      /**
 847       * Execute general sql query. Should be used only when no other method suitable.
 848       * Do NOT use this to make changes in db structure, use database_manager methods instead!
 849       * @param string $sql query
 850       * @param array $params query parameters
 851       * @return bool true
 852       * @throws dml_exception A DML specific exception is thrown for any errors.
 853       */
 854      public function execute($sql, array $params=null) {
 855          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
 856  
 857          if (strpos($sql, ';') !== false) {
 858              throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
 859          }
 860  
 861          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
 862          $result = pg_query_params($this->pgsql, $sql, $params);
 863          $this->query_end($result);
 864  
 865          pg_free_result($result);
 866          return true;
 867      }
 868  
 869      /**
 870       * Get a number of records as a moodle_recordset using a SQL statement.
 871       *
 872       * Since this method is a little less readable, use of it should be restricted to
 873       * code where it's possible there might be large datasets being returned.  For known
 874       * small datasets use get_records_sql - it leads to simpler code.
 875       *
 876       * The return type is like:
 877       * @see function get_recordset.
 878       *
 879       * @param string $sql the SQL select query to execute.
 880       * @param array $params array of sql parameters
 881       * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 882       * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 883       * @return moodle_recordset instance
 884       * @throws dml_exception A DML specific exception is thrown for any errors.
 885       */
 886      public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
 887  
 888          list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
 889  
 890          if ($limitnum) {
 891              $sql .= " LIMIT $limitnum";
 892          }
 893          if ($limitfrom) {
 894              $sql .= " OFFSET $limitfrom";
 895          }
 896  
 897          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
 898  
 899          // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
 900          // loading the entire thing (unless the config setting is turned off).
 901          $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
 902          if ($usecursors) {
 903              // Work out the cursor unique identifer. This is based on a simple count used which
 904              // should be OK because the identifiers only need to be unique within the current
 905              // transaction.
 906              $this->cursorcount++;
 907              $cursorname = 'crs' . $this->cursorcount;
 908  
 909              // Do the query to a cursor.
 910              $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
 911          } else {
 912              $cursorname = '';
 913          }
 914  
 915          $this->query_start($sql, $params, SQL_QUERY_SELECT);
 916  
 917          $result = pg_query_params($this->pgsql, $sql, $params);
 918  
 919          $this->query_end($result);
 920          if ($usecursors) {
 921              pg_free_result($result);
 922              $result = null;
 923          }
 924  
 925          return new pgsql_native_moodle_recordset($result, $this, $cursorname);
 926      }
 927  
 928      /**
 929       * Gets size of fetch buffer used for recordset queries.
 930       *
 931       * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
 932       * memory as needed for the Postgres library to hold the entire query results in memory.
 933       *
 934       * @return int Fetch buffer size or 0 indicating not to use cursors
 935       */
 936      protected function get_fetch_buffer_size() {
 937          if (array_key_exists('fetchbuffersize', $this->dboptions)) {
 938              return (int)$this->dboptions['fetchbuffersize'];
 939          } else {
 940              return self::DEFAULT_FETCH_BUFFER_SIZE;
 941          }
 942      }
 943  
 944      /**
 945       * Retrieves data from cursor. For use by recordset only; do not call directly.
 946       *
 947       * Return value contains the next batch of Postgres data, and a boolean indicating if this is
 948       * definitely the last batch (if false, there may be more)
 949       *
 950       * @param string $cursorname Name of cursor to read from
 951       * @return array Array with 2 elements (next data batch and boolean indicating last batch)
 952       */
 953      public function fetch_from_cursor($cursorname) {
 954          $count = $this->get_fetch_buffer_size();
 955  
 956          $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
 957  
 958          $this->query_start($sql, [], SQL_QUERY_AUX);
 959          $result = pg_query($this->pgsql, $sql);
 960          $last = pg_num_rows($result) !== $count;
 961  
 962          $this->query_end($result);
 963  
 964          return [$result, $last];
 965      }
 966  
 967      /**
 968       * Closes a cursor. For use by recordset only; do not call directly.
 969       *
 970       * @param string $cursorname Name of cursor to close
 971       * @return bool True if we actually closed one, false if the transaction was cancelled
 972       */
 973      public function close_cursor($cursorname) {
 974          // If the transaction got cancelled, then ignore this request.
 975          $sql = 'CLOSE ' . $cursorname;
 976          $this->query_start($sql, [], SQL_QUERY_AUX);
 977          $result = pg_query($this->pgsql, $sql);
 978          $this->query_end($result);
 979          if ($result) {
 980              pg_free_result($result);
 981          }
 982          return true;
 983      }
 984  
 985      /**
 986       * A faster version of pg_field_type
 987       *
 988       * The pg_field_type function in the php postgres driver internally makes an sql call
 989       * to get the list of field types which it statically caches only for a single request.
 990       * This wraps it in a cache keyed by oid to avoid these DB calls on every request.
 991       *
 992       * @param resource|PgSql\Result $result
 993       * @param int $fieldnumber
 994       * @return string Field type
 995       */
 996      public function pg_field_type($result, int $fieldnumber) {
 997          static $map;
 998          $cache = $this->get_metacache();
 999  
1000          // Getting the oid doesn't make an internal query.
1001          $oid = pg_field_type_oid($result, $fieldnumber);
1002          if (!$map) {
1003              $map = $cache->get('oid2typname');
1004          }
1005          if ($map === false) {
1006              $map = [];
1007          }
1008          if (isset($map[$oid])) {
1009              return $map[$oid];
1010          }
1011          $map[$oid] = pg_field_type($result, $fieldnumber);
1012          $cache->set('oid2typname', $map);
1013          return $map[$oid];
1014      }
1015  
1016      /**
1017       * Get a number of records as an array of objects using a SQL statement.
1018       *
1019       * Return value is like:
1020       * @see function get_records.
1021       *
1022       * @param string $sql the SQL select query to execute. The first column of this SELECT statement
1023       *   must be a unique value (usually the 'id' field), as it will be used as the key of the
1024       *   returned array.
1025       * @param array $params array of sql parameters
1026       * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
1027       * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
1028       * @return array of objects, or empty array if no records were found
1029       * @throws dml_exception A DML specific exception is thrown for any errors.
1030       */
1031      public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) {
1032          list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
1033  
1034          if ($limitnum) {
1035              $sql .= " LIMIT $limitnum";
1036          }
1037          if ($limitfrom) {
1038              $sql .= " OFFSET $limitfrom";
1039          }
1040  
1041          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1042          $this->query_start($sql, $params, SQL_QUERY_SELECT);
1043          $result = pg_query_params($this->pgsql, $sql, $params);
1044          $this->query_end($result);
1045  
1046          // find out if there are any blobs
1047          $numfields = pg_num_fields($result);
1048          $blobs = array();
1049          for ($i = 0; $i < $numfields; $i++) {
1050              $type = $this->pg_field_type($result, $i);
1051              if ($type == 'bytea') {
1052                  $blobs[] = pg_field_name($result, $i);
1053              }
1054          }
1055  
1056          $return = [];
1057          while ($row = pg_fetch_assoc($result)) {
1058              $id = reset($row);
1059              if ($blobs) {
1060                  foreach ($blobs as $blob) {
1061                      $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
1062                  }
1063              }
1064              if (isset($return[$id])) {
1065                  $colname = key($row);
1066                  debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
1067              }
1068              $return[$id] = (object) $row;
1069          }
1070  
1071          return $return;
1072      }
1073  
1074      /**
1075       * Selects records and return values (first field) as an array using a SQL statement.
1076       *
1077       * @param string $sql The SQL query
1078       * @param array $params array of sql parameters
1079       * @return array of values
1080       * @throws dml_exception A DML specific exception is thrown for any errors.
1081       */
1082      public function get_fieldset_sql($sql, array $params=null) {
1083          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1084  
1085          $this->query_start($sql, $params, SQL_QUERY_SELECT);
1086          $result = pg_query_params($this->pgsql, $sql, $params);
1087          $this->query_end($result);
1088  
1089          $return = pg_fetch_all_columns($result, 0);
1090  
1091          if ($this->pg_field_type($result, 0) == 'bytea') {
1092              foreach ($return as $key => $value) {
1093                  $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
1094              }
1095          }
1096  
1097          pg_free_result($result);
1098  
1099          return $return;
1100      }
1101  
1102      /**
1103       * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
1104       * @param string $table name
1105       * @param mixed $params data record as object or array
1106       * @param bool $returnit return it of inserted record
1107       * @param bool $bulk true means repeated inserts expected
1108       * @param bool $customsequence true if 'id' included in $params, disables $returnid
1109       * @return bool|int true or new id
1110       * @throws dml_exception A DML specific exception is thrown for any errors.
1111       */
1112      public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
1113          if (!is_array($params)) {
1114              $params = (array)$params;
1115          }
1116  
1117          $returning = "";
1118  
1119          if ($customsequence) {
1120              if (!isset($params['id'])) {
1121                  throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
1122              }
1123              $returnid = false;
1124          } else {
1125              if ($returnid) {
1126                  $returning = "RETURNING id";
1127                  unset($params['id']);
1128              } else {
1129                  unset($params['id']);
1130              }
1131          }
1132  
1133          if (empty($params)) {
1134              throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
1135          }
1136  
1137          $fields = implode(',', array_keys($params));
1138          $values = array();
1139          $i = 1;
1140          foreach ($params as $value) {
1141              $this->detect_objects($value);
1142              $values[] = "\$".$i++;
1143          }
1144          $values = implode(',', $values);
1145  
1146          $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
1147          $this->query_start($sql, $params, SQL_QUERY_INSERT);
1148          $result = pg_query_params($this->pgsql, $sql, $params);
1149          $this->query_end($result);
1150  
1151          if ($returning !== "") {
1152              $row = pg_fetch_assoc($result);
1153              $params['id'] = reset($row);
1154          }
1155          pg_free_result($result);
1156  
1157          if (!$returnid) {
1158              return true;
1159          }
1160  
1161          return (int)$params['id'];
1162      }
1163  
1164      /**
1165       * Insert a record into a table and return the "id" field if required.
1166       *
1167       * Some conversions and safety checks are carried out. Lobs are supported.
1168       * If the return ID isn't required, then this just reports success as true/false.
1169       * $data is an object containing needed data
1170       * @param string $table The database table to be inserted into
1171       * @param object|array $dataobject A data object with values for one or more fields in the record
1172       * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
1173       * @return bool|int true or new id
1174       * @throws dml_exception A DML specific exception is thrown for any errors.
1175       */
1176      public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
1177          $dataobject = (array)$dataobject;
1178  
1179          $columns = $this->get_columns($table);
1180          if (empty($columns)) {
1181              throw new dml_exception('ddltablenotexist', $table);
1182          }
1183  
1184          $cleaned = array();
1185  
1186          foreach ($dataobject as $field=>$value) {
1187              if ($field === 'id') {
1188                  continue;
1189              }
1190              if (!isset($columns[$field])) {
1191                  continue;
1192              }
1193              $column = $columns[$field];
1194              $cleaned[$field] = $this->normalise_value($column, $value);
1195          }
1196  
1197          return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1198  
1199      }
1200  
1201      /**
1202       * Insert multiple records into database as fast as possible.
1203       *
1204       * Order of inserts is maintained, but the operation is not atomic,
1205       * use transactions if necessary.
1206       *
1207       * This method is intended for inserting of large number of small objects,
1208       * do not use for huge objects with text or binary fields.
1209       *
1210       * @since Moodle 2.7
1211       *
1212       * @param string $table  The database table to be inserted into
1213       * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1214       * @return void does not return new record ids
1215       *
1216       * @throws coding_exception if data objects have different structure
1217       * @throws dml_exception A DML specific exception is thrown for any errors.
1218       */
1219      public function insert_records($table, $dataobjects) {
1220          if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1221              throw new coding_exception('insert_records() passed non-traversable object');
1222          }
1223  
1224          // PostgreSQL does not seem to have problems with huge queries.
1225          $chunksize = 500;
1226          if (!empty($this->dboptions['bulkinsertsize'])) {
1227              $chunksize = (int)$this->dboptions['bulkinsertsize'];
1228          }
1229  
1230          $columns = $this->get_columns($table, true);
1231  
1232          $fields = null;
1233          $count = 0;
1234          $chunk = array();
1235          foreach ($dataobjects as $dataobject) {
1236              if (!is_array($dataobject) and !is_object($dataobject)) {
1237                  throw new coding_exception('insert_records() passed invalid record object');
1238              }
1239              $dataobject = (array)$dataobject;
1240              if ($fields === null) {
1241                  $fields = array_keys($dataobject);
1242                  $columns = array_intersect_key($columns, $dataobject);
1243                  unset($columns['id']);
1244              } else if ($fields !== array_keys($dataobject)) {
1245                  throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1246              }
1247  
1248              $count++;
1249              $chunk[] = $dataobject;
1250  
1251              if ($count === $chunksize) {
1252                  $this->insert_chunk($table, $chunk, $columns);
1253                  $chunk = array();
1254                  $count = 0;
1255              }
1256          }
1257  
1258          if ($count) {
1259              $this->insert_chunk($table, $chunk, $columns);
1260          }
1261      }
1262  
1263      /**
1264       * Insert records in chunks, strict param types...
1265       *
1266       * Note: can be used only from insert_records().
1267       *
1268       * @param string $table
1269       * @param array $chunk
1270       * @param database_column_info[] $columns
1271       */
1272      protected function insert_chunk($table, array $chunk, array $columns) {
1273          $i = 1;
1274          $params = array();
1275          $values = array();
1276          foreach ($chunk as $dataobject) {
1277              $vals = array();
1278              foreach ($columns as $field => $column) {
1279                  $params[] = $this->normalise_value($column, $dataobject[$field]);
1280                  $vals[] = "\$".$i++;
1281              }
1282              $values[] = '('.implode(',', $vals).')';
1283          }
1284  
1285          $fieldssql = '('.implode(',', array_keys($columns)).')';
1286          $valuessql = implode(',', $values);
1287  
1288          $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1289          $this->query_start($sql, $params, SQL_QUERY_INSERT);
1290          $result = pg_query_params($this->pgsql, $sql, $params);
1291          $this->query_end($result);
1292          pg_free_result($result);
1293      }
1294  
1295      /**
1296       * Import a record into a table, id field is required.
1297       * Safety checks are NOT carried out. Lobs are supported.
1298       *
1299       * @param string $table name of database table to be inserted into
1300       * @param object $dataobject A data object with values for one or more fields in the record
1301       * @return bool true
1302       * @throws dml_exception A DML specific exception is thrown for any errors.
1303       */
1304      public function import_record($table, $dataobject) {
1305          $dataobject = (array)$dataobject;
1306  
1307          $columns = $this->get_columns($table);
1308          $cleaned = array();
1309  
1310          foreach ($dataobject as $field=>$value) {
1311              $this->detect_objects($value);
1312              if (!isset($columns[$field])) {
1313                  continue;
1314              }
1315              $column = $columns[$field];
1316              $cleaned[$field] = $this->normalise_value($column, $value);
1317          }
1318  
1319          return $this->insert_record_raw($table, $cleaned, false, true, true);
1320      }
1321  
1322      /**
1323       * Update record in database, as fast as possible, no safety checks, lobs not supported.
1324       * @param string $table name
1325       * @param stdClass|array $params data record as object or array
1326       * @param bool true means repeated updates expected
1327       * @return bool true
1328       * @throws dml_exception A DML specific exception is thrown for any errors.
1329       */
1330      public function update_record_raw($table, $params, $bulk=false) {
1331          $params = (array)$params;
1332  
1333          if (!isset($params['id'])) {
1334              throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1335          }
1336          $id = $params['id'];
1337          unset($params['id']);
1338  
1339          if (empty($params)) {
1340              throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1341          }
1342  
1343          $i = 1;
1344  
1345          $sets = array();
1346          foreach ($params as $field=>$value) {
1347              $this->detect_objects($value);
1348              $sets[] = "$field = \$".$i++;
1349          }
1350  
1351          $params[] = $id; // last ? in WHERE condition
1352  
1353          $sets = implode(',', $sets);
1354          $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1355  
1356          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1357          $result = pg_query_params($this->pgsql, $sql, $params);
1358          $this->query_end($result);
1359  
1360          pg_free_result($result);
1361          return true;
1362      }
1363  
1364      /**
1365       * Update a record in a table
1366       *
1367       * $dataobject is an object containing needed data
1368       * Relies on $dataobject having a variable "id" to
1369       * specify the record to update
1370       *
1371       * @param string $table The database table to be checked against.
1372       * @param stdClass|array $dataobject An object with contents equal to fieldname=>fieldvalue.
1373       *        Must have an entry for 'id' to map to the table specified.
1374       * @param bool true means repeated updates expected
1375       * @return bool true
1376       * @throws dml_exception A DML specific exception is thrown for any errors.
1377       */
1378      public function update_record($table, $dataobject, $bulk=false) {
1379          $dataobject = (array)$dataobject;
1380  
1381          $columns = $this->get_columns($table);
1382          $cleaned = array();
1383  
1384          foreach ($dataobject as $field=>$value) {
1385              if (!isset($columns[$field])) {
1386                  continue;
1387              }
1388              $column = $columns[$field];
1389              $cleaned[$field] = $this->normalise_value($column, $value);
1390          }
1391  
1392          $this->update_record_raw($table, $cleaned, $bulk);
1393  
1394          return true;
1395      }
1396  
1397      /**
1398       * Set a single field in every table record which match a particular WHERE clause.
1399       *
1400       * @param string $table The database table to be checked against.
1401       * @param string $newfield the field to set.
1402       * @param string $newvalue the value to set the field to.
1403       * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1404       * @param array $params array of sql parameters
1405       * @return bool true
1406       * @throws dml_exception A DML specific exception is thrown for any errors.
1407       */
1408      public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1409  
1410          if ($select) {
1411              $select = "WHERE $select";
1412          }
1413          if (is_null($params)) {
1414              $params = array();
1415          }
1416          list($select, $params, $type) = $this->fix_sql_params($select, $params);
1417          $i = count($params)+1;
1418  
1419          // Get column metadata
1420          $columns = $this->get_columns($table);
1421          $column = $columns[$newfield];
1422  
1423          $normalisedvalue = $this->normalise_value($column, $newvalue);
1424  
1425          $newfield = "$newfield = \$" . $i;
1426          $params[] = $normalisedvalue;
1427          $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1428  
1429          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1430          $result = pg_query_params($this->pgsql, $sql, $params);
1431          $this->query_end($result);
1432  
1433          pg_free_result($result);
1434  
1435          return true;
1436      }
1437  
1438      /**
1439       * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1440       *
1441       * @param string $table The database table to be checked against.
1442       * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1443       * @param array $params array of sql parameters
1444       * @return bool true
1445       * @throws dml_exception A DML specific exception is thrown for any errors.
1446       */
1447      public function delete_records_select($table, $select, array $params=null) {
1448          if ($select) {
1449              $select = "WHERE $select";
1450          }
1451          $sql = "DELETE FROM {$this->prefix}$table $select";
1452  
1453          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1454  
1455          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1456          $result = pg_query_params($this->pgsql, $sql, $params);
1457          $this->query_end($result);
1458  
1459          pg_free_result($result);
1460  
1461          return true;
1462      }
1463  
1464      /**
1465       * Returns 'LIKE' part of a query.
1466       *
1467       * @param string $fieldname usually name of the table column
1468       * @param string $param usually bound query parameter (?, :named)
1469       * @param bool $casesensitive use case sensitive search
1470       * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1471       * @param bool $notlike true means "NOT LIKE"
1472       * @param string $escapechar escape char for '%' and '_'
1473       * @return string SQL code fragment
1474       */
1475      public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1476          if (strpos($param, '%') !== false) {
1477              debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1478          }
1479  
1480          // postgresql does not support accent insensitive text comparisons, sorry
1481          if ($casesensitive) {
1482              $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1483          } else {
1484              $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1485          }
1486          return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1487      }
1488  
1489      public function sql_bitxor($int1, $int2) {
1490          return '((' . $int1 . ') # (' . $int2 . '))';
1491      }
1492  
1493      /**
1494       * Return SQL for casting to char of given field/expression
1495       *
1496       * @param string $field Table field or SQL expression to be cast
1497       * @return string
1498       */
1499      public function sql_cast_to_char(string $field): string {
1500          return "CAST({$field} AS VARCHAR)";
1501      }
1502  
1503      public function sql_cast_char2int($fieldname, $text=false) {
1504          return ' CAST(' . $fieldname . ' AS INT) ';
1505      }
1506  
1507      public function sql_cast_char2real($fieldname, $text=false) {
1508          return " $fieldname::real ";
1509      }
1510  
1511      public function sql_concat() {
1512          $arr = func_get_args();
1513          $s = implode(' || ', $arr);
1514          if ($s === '') {
1515              return " '' ";
1516          }
1517          // Add always empty string element so integer-exclusive concats
1518          // will work without needing to cast each element explicitly
1519          return " '' || $s ";
1520      }
1521  
1522      public function sql_concat_join($separator="' '", $elements=array()) {
1523          for ($n=count($elements)-1; $n > 0 ; $n--) {
1524              array_splice($elements, $n, 0, $separator);
1525          }
1526          $s = implode(' || ', $elements);
1527          if ($s === '') {
1528              return " '' ";
1529          }
1530          return " $s ";
1531      }
1532  
1533      /**
1534       * Return SQL for performing group concatenation on given field/expression
1535       *
1536       * @param string $field
1537       * @param string $separator
1538       * @param string $sort
1539       * @return string
1540       */
1541      public function sql_group_concat(string $field, string $separator = ', ', string $sort = ''): string {
1542          $fieldsort = $sort ? "ORDER BY {$sort}" : '';
1543          return "STRING_AGG(" . $this->sql_cast_to_char($field) . ", '{$separator}' {$fieldsort})";
1544      }
1545  
1546      /**
1547       * Returns the SQL text to be used to order by columns, standardising the return
1548       * pattern of null values across database types to sort nulls first when ascending
1549       * and last when descending.
1550       *
1551       * @param string $fieldname The name of the field we need to sort by.
1552       * @param int $sort An order to sort the results in.
1553       * @return string The piece of SQL code to be used in your statement.
1554       */
1555      public function sql_order_by_null(string $fieldname, int $sort = SORT_ASC): string {
1556          return parent::sql_order_by_null($fieldname, $sort) . ' NULLS ' . ($sort == SORT_ASC ? 'FIRST' : 'LAST');
1557      }
1558  
1559      public function sql_regex_supported() {
1560          return true;
1561      }
1562  
1563      public function sql_regex($positivematch = true, $casesensitive = false) {
1564          if ($casesensitive) {
1565              return $positivematch ? '~' : '!~';
1566          } else {
1567              return $positivematch ? '~*' : '!~*';
1568          }
1569      }
1570  
1571      /**
1572       * Does this driver support tool_replace?
1573       *
1574       * @since Moodle 2.6.1
1575       * @return bool
1576       */
1577      public function replace_all_text_supported() {
1578          return true;
1579      }
1580  
1581      public function session_lock_supported() {
1582          return true;
1583      }
1584  
1585      /**
1586       * Obtain session lock
1587       * @param int $rowid id of the row with session record
1588       * @param int $timeout max allowed time to wait for the lock in seconds
1589       * @return bool success
1590       */
1591      public function get_session_lock($rowid, $timeout) {
1592          // NOTE: there is a potential locking problem for database running
1593          //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1594          //       luckily there is not a big chance that they would collide
1595          if (!$this->session_lock_supported()) {
1596              return;
1597          }
1598  
1599          parent::get_session_lock($rowid, $timeout);
1600  
1601          $timeoutmilli = $timeout * 1000;
1602  
1603          $sql = "SET statement_timeout TO $timeoutmilli";
1604          $this->query_start($sql, null, SQL_QUERY_AUX);
1605          $result = pg_query($this->pgsql, $sql);
1606          $this->query_end($result);
1607  
1608          if ($result) {
1609              pg_free_result($result);
1610          }
1611  
1612          $sql = "SELECT pg_advisory_lock($rowid)";
1613          $this->query_start($sql, null, SQL_QUERY_AUX);
1614          $start = time();
1615          $result = pg_query($this->pgsql, $sql);
1616          $end = time();
1617          try {
1618              $this->query_end($result);
1619          } catch (dml_exception $ex) {
1620              if ($end - $start >= $timeout) {
1621                  throw new dml_sessionwait_exception();
1622              } else {
1623                  throw $ex;
1624              }
1625          }
1626  
1627          if ($result) {
1628              pg_free_result($result);
1629          }
1630  
1631          $sql = "SET statement_timeout TO DEFAULT";
1632          $this->query_start($sql, null, SQL_QUERY_AUX);
1633          $result = pg_query($this->pgsql, $sql);
1634          $this->query_end($result);
1635  
1636          if ($result) {
1637              pg_free_result($result);
1638          }
1639      }
1640  
1641      public function release_session_lock($rowid) {
1642          if (!$this->session_lock_supported()) {
1643              return;
1644          }
1645          if (!$this->used_for_db_sessions) {
1646              return;
1647          }
1648  
1649          parent::release_session_lock($rowid);
1650  
1651          $sql = "SELECT pg_advisory_unlock($rowid)";
1652          $this->query_start($sql, null, SQL_QUERY_AUX);
1653          $result = pg_query($this->pgsql, $sql);
1654          $this->query_end($result);
1655  
1656          if ($result) {
1657              pg_free_result($result);
1658          }
1659      }
1660  
1661      /**
1662       * Driver specific start of real database transaction,
1663       * this can not be used directly in code.
1664       * @return void
1665       */
1666      protected function begin_transaction() {
1667          $this->savepointpresent = true;
1668          $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1669          $this->query_start($sql, null, SQL_QUERY_AUX);
1670          $result = pg_query($this->pgsql, $sql);
1671          $this->query_end($result);
1672  
1673          pg_free_result($result);
1674      }
1675  
1676      /**
1677       * Driver specific commit of real database transaction,
1678       * this can not be used directly in code.
1679       * @return void
1680       */
1681      protected function commit_transaction() {
1682          $this->savepointpresent = false;
1683          $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1684          $this->query_start($sql, null, SQL_QUERY_AUX);
1685          $result = pg_query($this->pgsql, $sql);
1686          $this->query_end($result);
1687  
1688          pg_free_result($result);
1689      }
1690  
1691      /**
1692       * Driver specific abort of real database transaction,
1693       * this can not be used directly in code.
1694       * @return void
1695       */
1696      protected function rollback_transaction() {
1697          $this->savepointpresent = false;
1698          $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1699          $this->query_start($sql, null, SQL_QUERY_AUX);
1700          $result = pg_query($this->pgsql, $sql);
1701          $this->query_end($result);
1702  
1703          pg_free_result($result);
1704      }
1705  
1706      /**
1707       * Helper function trimming (whitespace + quotes) any string
1708       * needed because PG uses to enclose with double quotes some
1709       * fields in indexes definition and others
1710       *
1711       * @param string $str string to apply whitespace + quotes trim
1712       * @return string trimmed string
1713       */
1714      private function trim_quotes($str) {
1715          return trim(trim($str), "'\"");
1716      }
1717  
1718      /**
1719       * Postgresql supports full-text search indexes.
1720       *
1721       * @return bool
1722       */
1723      public function is_fulltext_search_supported() {
1724          return true;
1725      }
1726  }