Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 4.0.x will end 8 May 2023 (12 months).
  • Bug fixes for security issues in 4.0.x will end 13 November 2023 (18 months).
  • PHP version: minimum PHP 7.3.0 Note: the minimum PHP version has increased since Moodle 3.10. PHP 7.4.x is also supported.

Differences Between: [Versions 310 and 400] [Versions 311 and 400] [Versions 39 and 400] [Versions 400 and 401] [Versions 400 and 402] [Versions 400 and 403]

   1  <?php
   2  // This file is part of Moodle - http://moodle.org/
   3  //
   4  // Moodle is free software: you can redistribute it and/or modify
   5  // it under the terms of the GNU General Public License as published by
   6  // the Free Software Foundation, either version 3 of the License, or
   7  // (at your option) any later version.
   8  //
   9  // Moodle is distributed in the hope that it will be useful,
  10  // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11  // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12  // GNU General Public License for more details.
  13  //
  14  // You should have received a copy of the GNU General Public License
  15  // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
  16  
  17  /**
  18   * Native pgsql class representing moodle database interface.
  19   *
  20   * @package    core_dml
  21   * @copyright  2008 Petr Skoda (http://skodak.org)
  22   * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
  23   */
  24  
  25  defined('MOODLE_INTERNAL') || die();
  26  
  27  require_once (__DIR__.'/moodle_database.php');
  28  require_once (__DIR__.'/moodle_read_slave_trait.php');
  29  require_once (__DIR__.'/pgsql_native_moodle_recordset.php');
  30  require_once (__DIR__.'/pgsql_native_moodle_temptables.php');
  31  
  32  /**
  33   * Native pgsql class representing moodle database interface.
  34   *
  35   * @package    core_dml
  36   * @copyright  2008 Petr Skoda (http://skodak.org)
  37   * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
  38   */
  39  class pgsql_native_moodle_database extends moodle_database {
  40      use moodle_read_slave_trait {
  41          select_db_handle as read_slave_select_db_handle;
  42          can_use_readonly as read_slave_can_use_readonly;
  43          query_start as read_slave_query_start;
  44          query_end as read_slave_query_end;
  45      }
  46  
  47      /** @var array $dbhcursor keep track of open cursors */
  48      private $dbhcursor = [];
  49  
  50      /** @var resource $pgsql database resource */
  51      protected $pgsql     = null;
  52  
  53      protected $last_error_reporting; // To handle pgsql driver default verbosity
  54  
  55      /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
  56      protected $savepointpresent = false;
  57  
  58      /** @var int Number of cursors used (for constructing a unique ID) */
  59      protected $cursorcount = 0;
  60  
  61      /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
  62      const DEFAULT_FETCH_BUFFER_SIZE = 100000;
  63  
  64      /**
  65       * Detects if all needed PHP stuff installed.
  66       * Note: can be used before connect()
  67       * @return mixed true if ok, string if something
  68       */
  69      public function driver_installed() {
  70          if (!extension_loaded('pgsql')) {
  71              return get_string('pgsqlextensionisnotpresentinphp', 'install');
  72          }
  73          return true;
  74      }
  75  
  76      /**
  77       * Returns database family type - describes SQL dialect
  78       * Note: can be used before connect()
  79       * @return string db family name (mysql, postgres, mssql, oracle, etc.)
  80       */
  81      public function get_dbfamily() {
  82          return 'postgres';
  83      }
  84  
  85      /**
  86       * Returns more specific database driver type
  87       * Note: can be used before connect()
  88       * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
  89       */
  90      protected function get_dbtype() {
  91          return 'pgsql';
  92      }
  93  
  94      /**
  95       * Returns general database library name
  96       * Note: can be used before connect()
  97       * @return string db type pdo, native
  98       */
  99      protected function get_dblibrary() {
 100          return 'native';
 101      }
 102  
 103      /**
 104       * Returns localised database type name
 105       * Note: can be used before connect()
 106       * @return string
 107       */
 108      public function get_name() {
 109          return get_string('nativepgsql', 'install');
 110      }
 111  
 112      /**
 113       * Returns localised database configuration help.
 114       * Note: can be used before connect()
 115       * @return string
 116       */
 117      public function get_configuration_help() {
 118          return get_string('nativepgsqlhelp', 'install');
 119      }
 120  
 121      /**
 122       * Connect to db
 123       * @param string $dbhost The database host.
 124       * @param string $dbuser The database username.
 125       * @param string $dbpass The database username's password.
 126       * @param string $dbname The name of the database being connected to.
 127       * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
 128       * @param array $dboptions driver specific options
 129       * @return bool true
 130       * @throws dml_connection_exception if error
 131       */
 132      public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, array $dboptions=null): bool {
 133          if ($prefix == '' and !$this->external) {
 134              //Enforce prefixes for everybody but mysql
 135              throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
 136          }
 137  
 138          $driverstatus = $this->driver_installed();
 139  
 140          if ($driverstatus !== true) {
 141              throw new dml_exception('dbdriverproblem', $driverstatus);
 142          }
 143  
 144          $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
 145  
 146          $pass = addcslashes($this->dbpass, "'\\");
 147  
 148          // Unix socket connections should have lower overhead
 149          if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
 150              $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
 151              if (strpos($this->dboptions['dbsocket'], '/') !== false) {
 152                  // A directory was specified as the socket location.
 153                  $connection .= " host='".$this->dboptions['dbsocket']."'";
 154              }
 155              if (!empty($this->dboptions['dbport'])) {
 156                  // A port as specified, add it to the connection as it's used as part of the socket path.
 157                  $connection .= " port ='".$this->dboptions['dbport']."'";
 158              }
 159          } else {
 160              $this->dboptions['dbsocket'] = '';
 161              if (empty($this->dbname)) {
 162                  // probably old style socket connection - do not add port
 163                  $port = "";
 164              } else if (empty($this->dboptions['dbport'])) {
 165                  $port = "port ='5432'";
 166              } else {
 167                  $port = "port ='".$this->dboptions['dbport']."'";
 168              }
 169              $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
 170          }
 171  
 172          if (!empty($this->dboptions['connecttimeout'])) {
 173              $connection .= " connect_timeout=".$this->dboptions['connecttimeout'];
 174          }
 175  
 176          if (empty($this->dboptions['dbhandlesoptions'])) {
 177              // ALTER USER and ALTER DATABASE are overridden by these settings.
 178              $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
 179              // Select schema if specified, otherwise the first one wins.
 180              if (!empty($this->dboptions['dbschema'])) {
 181                  $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
 182              }
 183  
 184              $connection .= " options='" . implode(' ', $options) . "'";
 185          }
 186  
 187          ob_start();
 188          // It seems that pg_connect() handles some errors differently.
 189          // For example, name resolution error will raise an exception, and non-existing
 190          // database or wrong credentials will just return false.
 191          // We need to cater for both.
 192          try {
 193              if (empty($this->dboptions['dbpersist'])) {
 194                  $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
 195              } else {
 196                  $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
 197              }
 198              $dberr = ob_get_contents();
 199          } catch (\Exception $e) {
 200              $dberr = $e->getMessage();
 201          }
 202          ob_end_clean();
 203  
 204          $status = $this->pgsql ? pg_connection_status($this->pgsql) : false;
 205  
 206          if ($status === false or $status === PGSQL_CONNECTION_BAD) {
 207              $this->pgsql = null;
 208              throw new dml_connection_exception($dberr);
 209          }
 210  
 211          if (!empty($this->dboptions['dbpersist'])) {
 212              // There are rare situations (such as PHP out of memory errors) when open cursors may
 213              // not be closed at the end of a connection. When using persistent connections, the
 214              // cursors remain open and 'get in the way' of future connections. To avoid this
 215              // problem, close all cursors here.
 216              $result = pg_query($this->pgsql, 'CLOSE ALL');
 217              if ($result) {
 218                  pg_free_result($result);
 219              }
 220          }
 221  
 222          if (!empty($this->dboptions['dbhandlesoptions'])) {
 223              /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
 224               * These functions do not talk to the server, they use the client library knowledge to determine state.
 225               */
 226              if (!empty($this->dboptions['dbschema'])) {
 227                  throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
 228              }
 229              if (pg_client_encoding($this->pgsql) != 'UTF8') {
 230                  throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
 231              }
 232              if (pg_escape_string($this->pgsql, '\\') != '\\') {
 233                  throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
 234              }
 235          }
 236  
 237          // Connection stabilised and configured, going to instantiate the temptables controller
 238          $this->temptables = new pgsql_native_moodle_temptables($this);
 239  
 240          return true;
 241      }
 242  
 243      /**
 244       * Close database connection and release all resources
 245       * and memory (especially circular memory references).
 246       * Do NOT use connect() again, create a new instance if needed.
 247       */
 248      public function dispose() {
 249          parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
 250          if ($this->pgsql) {
 251              pg_close($this->pgsql);
 252              $this->pgsql = null;
 253          }
 254      }
 255  
 256      /**
 257       * Gets db handle currently used with queries
 258       * @return resource
 259       */
 260      protected function get_db_handle() {
 261          return $this->pgsql;
 262      }
 263  
 264      /**
 265       * Sets db handle to be used with subsequent queries
 266       * @param resource $dbh
 267       * @return void
 268       */
 269      protected function set_db_handle($dbh): void {
 270          $this->pgsql = $dbh;
 271      }
 272  
 273      /**
 274       * Select appropriate db handle - readwrite or readonly
 275       * @param int $type type of query
 276       * @param string $sql
 277       * @return void
 278       */
 279      protected function select_db_handle(int $type, string $sql): void {
 280          $this->read_slave_select_db_handle($type, $sql);
 281  
 282          if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) {
 283              $cursor = $match[1];
 284              $this->dbhcursor[$cursor] = $this->pgsql;
 285          }
 286          if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) {
 287              $cursor = $match[1];
 288              $this->pgsql = $this->dbhcursor[$cursor];
 289          }
 290      }
 291  
 292      /**
 293       * Check if The query qualifies for readonly connection execution
 294       * Logging queries are exempt, those are write operations that circumvent
 295       * standard query_start/query_end paths.
 296       * @param int $type type of query
 297       * @param string $sql
 298       * @return bool
 299       */
 300      protected function can_use_readonly(int $type, string $sql): bool {
 301          // ... pg_*lock queries always go to master.
 302          if (preg_match('/\bpg_\w*lock/', $sql)) {
 303              return false;
 304          }
 305  
 306          // ... a nuisance - temptables use this.
 307          if (preg_match('/\bpg_catalog/', $sql) && $this->temptables->get_temptables()) {
 308              return false;
 309          }
 310  
 311          return $this->read_slave_can_use_readonly($type, $sql);
 312  
 313      }
 314  
 315      /**
 316       * Called before each db query.
 317       * @param string $sql
 318       * @param array|null $params An array of parameters.
 319       * @param int $type type of query
 320       * @param mixed $extrainfo driver specific extra information
 321       * @return void
 322       */
 323      protected function query_start($sql, ?array $params, $type, $extrainfo=null) {
 324          $this->read_slave_query_start($sql, $params, $type, $extrainfo);
 325          // pgsql driver tends to send debug to output, we do not need that.
 326          $this->last_error_reporting = error_reporting(0);
 327      }
 328  
 329      /**
 330       * Called immediately after each db query.
 331       * @param mixed db specific result
 332       * @return void
 333       */
 334      protected function query_end($result) {
 335          // reset original debug level
 336          error_reporting($this->last_error_reporting);
 337          try {
 338              $this->read_slave_query_end($result);
 339              if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) {
 340                  $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
 341                  if ($res) {
 342                      pg_free_result($res);
 343                  }
 344              }
 345          } catch (Exception $e) {
 346              if ($this->savepointpresent) {
 347                  $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
 348                  if ($res) {
 349                      pg_free_result($res);
 350                  }
 351              }
 352              throw $e;
 353          }
 354      }
 355  
 356      /**
 357       * Returns database server info array
 358       * @return array Array containing 'description' and 'version' info
 359       */
 360      public function get_server_info() {
 361          static $info;
 362          if (!$info) {
 363              $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
 364              $info = pg_version($this->pgsql);
 365              $this->query_end(true);
 366          }
 367          return array('description'=>$info['server'], 'version'=>$info['server']);
 368      }
 369  
 370      /**
 371       * Returns supported query parameter types
 372       * @return int bitmask of accepted SQL_PARAMS_*
 373       */
 374      protected function allowed_param_types() {
 375          return SQL_PARAMS_DOLLAR;
 376      }
 377  
 378      /**
 379       * Returns last error reported by database engine.
 380       * @return string error message
 381       */
 382      public function get_last_error() {
 383          return pg_last_error($this->pgsql);
 384      }
 385  
 386      /**
 387       * Return tables in database WITHOUT current prefix.
 388       * @param bool $usecache if true, returns list of cached tables.
 389       * @return array of table names in lowercase and without prefix
 390       */
 391      public function get_tables($usecache=true) {
 392          if ($usecache and $this->tables !== null) {
 393              return $this->tables;
 394          }
 395          $this->tables = array();
 396          $prefix = str_replace('_', '|_', $this->prefix);
 397          $sql = "SELECT c.relname
 398                    FROM pg_catalog.pg_class c
 399                    JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
 400                   WHERE c.relname LIKE '$prefix%' ESCAPE '|'
 401                         AND c.relkind = 'r'
 402                         AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
 403          $this->query_start($sql, null, SQL_QUERY_AUX);
 404          $result = pg_query($this->pgsql, $sql);
 405          $this->query_end($result);
 406  
 407          if ($result) {
 408              while ($row = pg_fetch_row($result)) {
 409                  $tablename = reset($row);
 410                  if ($this->prefix !== false && $this->prefix !== '') {
 411                      if (strpos($tablename, $this->prefix) !== 0) {
 412                          continue;
 413                      }
 414                      $tablename = substr($tablename, strlen($this->prefix));
 415                  }
 416                  $this->tables[$tablename] = $tablename;
 417              }
 418              pg_free_result($result);
 419          }
 420          return $this->tables;
 421      }
 422  
 423      /**
 424       * Constructs 'IN()' or '=' sql fragment
 425       *
 426       * Method overriding {@see moodle_database::get_in_or_equal} to be able to use
 427       * more than 65535 elements in $items array.
 428       *
 429       * @param mixed $items A single value or array of values for the expression.
 430       * @param int $type Parameter bounding type : SQL_PARAMS_QM or SQL_PARAMS_NAMED.
 431       * @param string $prefix Named parameter placeholder prefix (a unique counter value is appended to each parameter name).
 432       * @param bool $equal True means we want to equate to the constructed expression, false means we don't want to equate to it.
 433       * @param mixed $onemptyitems This defines the behavior when the array of items provided is empty. Defaults to false,
 434       *              meaning throw exceptions. Other values will become part of the returned SQL fragment.
 435       * @throws coding_exception | dml_exception
 436       * @return array A list containing the constructed sql fragment and an array of parameters.
 437       */
 438      public function get_in_or_equal($items, $type=SQL_PARAMS_QM, $prefix='param', $equal=true, $onemptyitems=false): array {
 439          // We only interfere if number of items in expression exceeds 16 bit value.
 440          if (!is_array($items) || count($items) < 65535) {
 441              return parent::get_in_or_equal($items, $type, $prefix,  $equal, $onemptyitems);
 442          }
 443  
 444          // Determine the type from the first value. We don't need to be very smart here,
 445          // it is developer's responsibility to make sure that variable type is matching
 446          // field type, if not the case, DB engine will hint. Also mixing types won't work
 447          // here anyway, so we ignore NULL or boolean (unlikely you need 56k values of
 448          // these types only).
 449          $cast = is_string(current($items)) ? '::text' : '::bigint';
 450  
 451          if ($type == SQL_PARAMS_QM) {
 452              if ($equal) {
 453                  $sql = 'IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
 454              } else {
 455                  $sql = 'NOT IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
 456              }
 457              $params = array_values($items);
 458          } else if ($type == SQL_PARAMS_NAMED) {
 459              if (empty($prefix)) {
 460                  $prefix = 'param';
 461              }
 462              $params = [];
 463              $sql = [];
 464              foreach ($items as $item) {
 465                  $param = $prefix.$this->inorequaluniqueindex++;
 466                  $params[$param] = $item;
 467                  $sql[] = ':'.$param.$cast;
 468              }
 469              if ($equal) {
 470                  $sql = 'IN (VALUES ('.implode('),(', $sql).'))';
 471              } else {
 472                  $sql = 'NOT IN (VALUES ('.implode('),(', $sql).'))';
 473              }
 474          } else {
 475              throw new dml_exception('typenotimplement');
 476          }
 477          return [$sql, $params];
 478      }
 479  
 480      /**
 481       * Return table indexes - everything lowercased.
 482       * @param string $table The table we want to get indexes from.
 483       * @return array of arrays
 484       */
 485      public function get_indexes($table) {
 486          $indexes = array();
 487          $tablename = $this->prefix.$table;
 488  
 489          $sql = "SELECT i.*
 490                    FROM pg_catalog.pg_indexes i
 491                    JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
 492                   WHERE i.tablename = '$tablename'
 493                         AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
 494  
 495          $this->query_start($sql, null, SQL_QUERY_AUX);
 496          $result = pg_query($this->pgsql, $sql);
 497          $this->query_end($result);
 498  
 499          if ($result) {
 500              while ($row = pg_fetch_assoc($result)) {
 501                  // The index definition could be generated schema-qualifying the target table name
 502                  // for safety, depending on the pgsql version (CVE-2018-1058).
 503                  if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
 504                      continue;
 505                  }
 506                  if ($matches[5] === 'id') {
 507                      continue;
 508                  }
 509                  $columns = explode(',', $matches[5]);
 510                  foreach ($columns as $k=>$column) {
 511                      $column = trim($column);
 512                      if ($pos = strpos($column, ' ')) {
 513                          // index type is separated by space
 514                          $column = substr($column, 0, $pos);
 515                      }
 516                      $columns[$k] = $this->trim_quotes($column);
 517                  }
 518                  $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
 519                                                'columns'=>$columns);
 520              }
 521              pg_free_result($result);
 522          }
 523          return $indexes;
 524      }
 525  
 526      /**
 527       * Returns detailed information about columns in table.
 528       *
 529       * @param string $table name
 530       * @return database_column_info[] array of database_column_info objects indexed with column names
 531       */
 532      protected function fetch_columns(string $table): array {
 533          $structure = array();
 534  
 535          $tablename = $this->prefix.$table;
 536  
 537          $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
 538                         CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) END AS adsrc
 539                    FROM pg_catalog.pg_class c
 540                    JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
 541                    JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
 542                    JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
 543               LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
 544                   WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
 545                         AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
 546                ORDER BY a.attnum";
 547  
 548          $this->query_start($sql, null, SQL_QUERY_AUX);
 549          $result = pg_query($this->pgsql, $sql);
 550          $this->query_end($result);
 551  
 552          if (!$result) {
 553              return array();
 554          }
 555          while ($rawcolumn = pg_fetch_object($result)) {
 556  
 557              $info = new stdClass();
 558              $info->name = $rawcolumn->field;
 559              $matches = null;
 560  
 561              if ($rawcolumn->type === 'varchar') {
 562                  $info->type          = 'varchar';
 563                  $info->meta_type     = 'C';
 564                  $info->max_length    = $rawcolumn->atttypmod - 4;
 565                  $info->scale         = null;
 566                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 567                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 568                  if ($info->has_default) {
 569                      $parts = explode('::', $rawcolumn->adsrc);
 570                      if (count($parts) > 1) {
 571                          $info->default_value = reset($parts);
 572                          $info->default_value = trim($info->default_value, "'");
 573                      } else {
 574                          $info->default_value = $rawcolumn->adsrc;
 575                      }
 576                  } else {
 577                      $info->default_value = null;
 578                  }
 579                  $info->primary_key   = false;
 580                  $info->binary        = false;
 581                  $info->unsigned      = null;
 582                  $info->auto_increment= false;
 583                  $info->unique        = null;
 584  
 585              } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
 586                  $info->type = 'int';
 587                  if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
 588                      $info->primary_key   = true;
 589                      $info->meta_type     = 'R';
 590                      $info->unique        = true;
 591                      $info->auto_increment= true;
 592                      $info->has_default   = false;
 593                  } else {
 594                      $info->primary_key   = false;
 595                      $info->meta_type     = 'I';
 596                      $info->unique        = null;
 597                      $info->auto_increment= false;
 598                      $info->has_default   = ($rawcolumn->atthasdef === 't');
 599                  }
 600                  // Return number of decimals, not bytes here.
 601                  if ($matches[1] >= 8) {
 602                      $info->max_length = 18;
 603                  } else if ($matches[1] >= 4) {
 604                      $info->max_length = 9;
 605                  } else if ($matches[1] >= 2) {
 606                      $info->max_length = 4;
 607                  } else if ($matches[1] >= 1) {
 608                      $info->max_length = 2;
 609                  } else {
 610                      $info->max_length = 0;
 611                  }
 612                  $info->scale         = null;
 613                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 614                  if ($info->has_default) {
 615                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 616                      $parts = explode('::', $rawcolumn->adsrc);
 617                      if (count($parts) > 1) {
 618                          $info->default_value = reset($parts);
 619                      } else {
 620                          $info->default_value = $rawcolumn->adsrc;
 621                      }
 622                      $info->default_value = trim($info->default_value, "()'");
 623                  } else {
 624                      $info->default_value = null;
 625                  }
 626                  $info->binary        = false;
 627                  $info->unsigned      = false;
 628  
 629              } else if ($rawcolumn->type === 'numeric') {
 630                  $info->type = $rawcolumn->type;
 631                  $info->meta_type     = 'N';
 632                  $info->primary_key   = false;
 633                  $info->binary        = false;
 634                  $info->unsigned      = null;
 635                  $info->auto_increment= false;
 636                  $info->unique        = null;
 637                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 638                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 639                  if ($info->has_default) {
 640                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 641                      $parts = explode('::', $rawcolumn->adsrc);
 642                      if (count($parts) > 1) {
 643                          $info->default_value = reset($parts);
 644                      } else {
 645                          $info->default_value = $rawcolumn->adsrc;
 646                      }
 647                      $info->default_value = trim($info->default_value, "()'");
 648                  } else {
 649                      $info->default_value = null;
 650                  }
 651                  $info->max_length    = $rawcolumn->atttypmod >> 16;
 652                  $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
 653  
 654              } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
 655                  $info->type = 'float';
 656                  $info->meta_type     = 'N';
 657                  $info->primary_key   = false;
 658                  $info->binary        = false;
 659                  $info->unsigned      = null;
 660                  $info->auto_increment= false;
 661                  $info->unique        = null;
 662                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 663                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 664                  if ($info->has_default) {
 665                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 666                      $parts = explode('::', $rawcolumn->adsrc);
 667                      if (count($parts) > 1) {
 668                          $info->default_value = reset($parts);
 669                      } else {
 670                          $info->default_value = $rawcolumn->adsrc;
 671                      }
 672                      $info->default_value = trim($info->default_value, "()'");
 673                  } else {
 674                      $info->default_value = null;
 675                  }
 676                  // just guess expected number of deciaml places :-(
 677                  if ($matches[1] == 8) {
 678                      // total 15 digits
 679                      $info->max_length = 8;
 680                      $info->scale      = 7;
 681                  } else {
 682                      // total 6 digits
 683                      $info->max_length = 4;
 684                      $info->scale      = 2;
 685                  }
 686  
 687              } else if ($rawcolumn->type === 'text') {
 688                  $info->type          = $rawcolumn->type;
 689                  $info->meta_type     = 'X';
 690                  $info->max_length    = -1;
 691                  $info->scale         = null;
 692                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 693                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 694                  if ($info->has_default) {
 695                      $parts = explode('::', $rawcolumn->adsrc);
 696                      if (count($parts) > 1) {
 697                          $info->default_value = reset($parts);
 698                          $info->default_value = trim($info->default_value, "'");
 699                      } else {
 700                          $info->default_value = $rawcolumn->adsrc;
 701                      }
 702                  } else {
 703                      $info->default_value = null;
 704                  }
 705                  $info->primary_key   = false;
 706                  $info->binary        = false;
 707                  $info->unsigned      = null;
 708                  $info->auto_increment= false;
 709                  $info->unique        = null;
 710  
 711              } else if ($rawcolumn->type === 'bytea') {
 712                  $info->type          = $rawcolumn->type;
 713                  $info->meta_type     = 'B';
 714                  $info->max_length    = -1;
 715                  $info->scale         = null;
 716                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 717                  $info->has_default   = false;
 718                  $info->default_value = null;
 719                  $info->primary_key   = false;
 720                  $info->binary        = true;
 721                  $info->unsigned      = null;
 722                  $info->auto_increment= false;
 723                  $info->unique        = null;
 724  
 725              }
 726  
 727              $structure[$info->name] = new database_column_info($info);
 728          }
 729  
 730          pg_free_result($result);
 731  
 732          return $structure;
 733      }
 734  
 735      /**
 736       * Normalise values based in RDBMS dependencies (booleans, LOBs...)
 737       *
 738       * @param database_column_info $column column metadata corresponding with the value we are going to normalise
 739       * @param mixed $value value we are going to normalise
 740       * @return mixed the normalised value
 741       */
 742      protected function normalise_value($column, $value) {
 743          $this->detect_objects($value);
 744  
 745          if (is_bool($value)) { // Always, convert boolean to int
 746              $value = (int)$value;
 747  
 748          } else if ($column->meta_type === 'B') {
 749              if (!is_null($value)) {
 750                  // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
 751                  // \ and produce data errors.  This is set on the connection.
 752                  $value = pg_escape_bytea($this->pgsql, $value);
 753              }
 754  
 755          } else if ($value === '') {
 756              if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
 757                  $value = 0; // prevent '' problems in numeric fields
 758              }
 759          }
 760          return $value;
 761      }
 762  
 763      /**
 764       * Is db in unicode mode?
 765       * @return bool
 766       */
 767      public function setup_is_unicodedb() {
 768          // Get PostgreSQL server_encoding value
 769          $sql = "SHOW server_encoding";
 770          $this->query_start($sql, null, SQL_QUERY_AUX);
 771          $result = pg_query($this->pgsql, $sql);
 772          $this->query_end($result);
 773  
 774          if (!$result) {
 775              return false;
 776          }
 777          $rawcolumn = pg_fetch_object($result);
 778          $encoding = $rawcolumn->server_encoding;
 779          pg_free_result($result);
 780  
 781          return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
 782      }
 783  
 784      /**
 785       * Do NOT use in code, to be used by database_manager only!
 786       * @param string|array $sql query
 787       * @param array|null $tablenames an array of xmldb table names affected by this request.
 788       * @return bool true
 789       * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
 790       */
 791      public function change_database_structure($sql, $tablenames = null) {
 792          $this->get_manager(); // Includes DDL exceptions classes ;-)
 793          if (is_array($sql)) {
 794              $sql = implode("\n;\n", $sql);
 795          }
 796          if (!$this->is_transaction_started()) {
 797              // It is better to do all or nothing, this helps with recovery...
 798              $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
 799          }
 800  
 801          try {
 802              $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
 803              $result = pg_query($this->pgsql, $sql);
 804              $this->query_end($result);
 805              pg_free_result($result);
 806          } catch (ddl_change_structure_exception $e) {
 807              if (!$this->is_transaction_started()) {
 808                  $result = @pg_query($this->pgsql, "ROLLBACK");
 809                  @pg_free_result($result);
 810              }
 811              $this->reset_caches($tablenames);
 812              throw $e;
 813          }
 814  
 815          $this->reset_caches($tablenames);
 816          return true;
 817      }
 818  
 819      /**
 820       * Execute general sql query. Should be used only when no other method suitable.
 821       * Do NOT use this to make changes in db structure, use database_manager methods instead!
 822       * @param string $sql query
 823       * @param array $params query parameters
 824       * @return bool true
 825       * @throws dml_exception A DML specific exception is thrown for any errors.
 826       */
 827      public function execute($sql, array $params=null) {
 828          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
 829  
 830          if (strpos($sql, ';') !== false) {
 831              throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
 832          }
 833  
 834          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
 835          $result = pg_query_params($this->pgsql, $sql, $params);
 836          $this->query_end($result);
 837  
 838          pg_free_result($result);
 839          return true;
 840      }
 841  
 842      /**
 843       * Get a number of records as a moodle_recordset using a SQL statement.
 844       *
 845       * Since this method is a little less readable, use of it should be restricted to
 846       * code where it's possible there might be large datasets being returned.  For known
 847       * small datasets use get_records_sql - it leads to simpler code.
 848       *
 849       * The return type is like:
 850       * @see function get_recordset.
 851       *
 852       * @param string $sql the SQL select query to execute.
 853       * @param array $params array of sql parameters
 854       * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 855       * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 856       * @return moodle_recordset instance
 857       * @throws dml_exception A DML specific exception is thrown for any errors.
 858       */
 859      public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
 860  
 861          list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
 862  
 863          if ($limitnum) {
 864              $sql .= " LIMIT $limitnum";
 865          }
 866          if ($limitfrom) {
 867              $sql .= " OFFSET $limitfrom";
 868          }
 869  
 870          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
 871  
 872          // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
 873          // loading the entire thing (unless the config setting is turned off).
 874          $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
 875          if ($usecursors) {
 876              // Work out the cursor unique identifer. This is based on a simple count used which
 877              // should be OK because the identifiers only need to be unique within the current
 878              // transaction.
 879              $this->cursorcount++;
 880              $cursorname = 'crs' . $this->cursorcount;
 881  
 882              // Do the query to a cursor.
 883              $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
 884          } else {
 885              $cursorname = '';
 886          }
 887  
 888          $this->query_start($sql, $params, SQL_QUERY_SELECT);
 889  
 890          $result = pg_query_params($this->pgsql, $sql, $params);
 891  
 892          $this->query_end($result);
 893          if ($usecursors) {
 894              pg_free_result($result);
 895              $result = null;
 896          }
 897  
 898          return new pgsql_native_moodle_recordset($result, $this, $cursorname);
 899      }
 900  
 901      /**
 902       * Gets size of fetch buffer used for recordset queries.
 903       *
 904       * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
 905       * memory as needed for the Postgres library to hold the entire query results in memory.
 906       *
 907       * @return int Fetch buffer size or 0 indicating not to use cursors
 908       */
 909      protected function get_fetch_buffer_size() {
 910          if (array_key_exists('fetchbuffersize', $this->dboptions)) {
 911              return (int)$this->dboptions['fetchbuffersize'];
 912          } else {
 913              return self::DEFAULT_FETCH_BUFFER_SIZE;
 914          }
 915      }
 916  
 917      /**
 918       * Retrieves data from cursor. For use by recordset only; do not call directly.
 919       *
 920       * Return value contains the next batch of Postgres data, and a boolean indicating if this is
 921       * definitely the last batch (if false, there may be more)
 922       *
 923       * @param string $cursorname Name of cursor to read from
 924       * @return array Array with 2 elements (next data batch and boolean indicating last batch)
 925       */
 926      public function fetch_from_cursor($cursorname) {
 927          $count = $this->get_fetch_buffer_size();
 928  
 929          $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
 930  
 931          $this->query_start($sql, [], SQL_QUERY_AUX);
 932          $result = pg_query($this->pgsql, $sql);
 933          $last = pg_num_rows($result) !== $count;
 934  
 935          $this->query_end($result);
 936  
 937          return [$result, $last];
 938      }
 939  
 940      /**
 941       * Closes a cursor. For use by recordset only; do not call directly.
 942       *
 943       * @param string $cursorname Name of cursor to close
 944       * @return bool True if we actually closed one, false if the transaction was cancelled
 945       */
 946      public function close_cursor($cursorname) {
 947          // If the transaction got cancelled, then ignore this request.
 948          $sql = 'CLOSE ' . $cursorname;
 949          $this->query_start($sql, [], SQL_QUERY_AUX);
 950          $result = pg_query($this->pgsql, $sql);
 951          $this->query_end($result);
 952          if ($result) {
 953              pg_free_result($result);
 954          }
 955          return true;
 956      }
 957  
 958      /**
 959       * A faster version of pg_field_type
 960       *
 961       * The pg_field_type function in the php postgres driver internally makes an sql call
 962       * to get the list of field types which it statically caches only for a single request.
 963       * This wraps it in a cache keyed by oid to avoid these DB calls on every request.
 964       *
 965       * @param resource $result
 966       * @param int $fieldnumber
 967       * @return string Field type
 968       */
 969      public function pg_field_type($result, int $fieldnumber) {
 970          static $map;
 971          $cache = $this->get_metacache();
 972  
 973          // Getting the oid doesn't make an internal query.
 974          $oid = pg_field_type_oid($result, $fieldnumber);
 975          if (!$map) {
 976              $map = $cache->get('oid2typname');
 977          }
 978          if ($map === false) {
 979              $map = [];
 980          }
 981          if (isset($map[$oid])) {
 982              return $map[$oid];
 983          }
 984          $map[$oid] = pg_field_type($result, $fieldnumber);
 985          $cache->set('oid2typname', $map);
 986          return $map[$oid];
 987      }
 988  
 989      /**
 990       * Get a number of records as an array of objects using a SQL statement.
 991       *
 992       * Return value is like:
 993       * @see function get_records.
 994       *
 995       * @param string $sql the SQL select query to execute. The first column of this SELECT statement
 996       *   must be a unique value (usually the 'id' field), as it will be used as the key of the
 997       *   returned array.
 998       * @param array $params array of sql parameters
 999       * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
1000       * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
1001       * @return array of objects, or empty array if no records were found
1002       * @throws dml_exception A DML specific exception is thrown for any errors.
1003       */
1004      public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) {
1005          list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
1006  
1007          if ($limitnum) {
1008              $sql .= " LIMIT $limitnum";
1009          }
1010          if ($limitfrom) {
1011              $sql .= " OFFSET $limitfrom";
1012          }
1013  
1014          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1015          $this->query_start($sql, $params, SQL_QUERY_SELECT);
1016          $result = pg_query_params($this->pgsql, $sql, $params);
1017          $this->query_end($result);
1018  
1019          // find out if there are any blobs
1020          $numfields = pg_num_fields($result);
1021          $blobs = array();
1022          for ($i = 0; $i < $numfields; $i++) {
1023              $type = $this->pg_field_type($result, $i);
1024              if ($type == 'bytea') {
1025                  $blobs[] = pg_field_name($result, $i);
1026              }
1027          }
1028  
1029          $return = [];
1030          while ($row = pg_fetch_assoc($result)) {
1031              $id = reset($row);
1032              if ($blobs) {
1033                  foreach ($blobs as $blob) {
1034                      $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
1035                  }
1036              }
1037              if (isset($return[$id])) {
1038                  $colname = key($row);
1039                  debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
1040              }
1041              $return[$id] = (object) $row;
1042          }
1043  
1044          return $return;
1045      }
1046  
1047      /**
1048       * Selects records and return values (first field) as an array using a SQL statement.
1049       *
1050       * @param string $sql The SQL query
1051       * @param array $params array of sql parameters
1052       * @return array of values
1053       * @throws dml_exception A DML specific exception is thrown for any errors.
1054       */
1055      public function get_fieldset_sql($sql, array $params=null) {
1056          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1057  
1058          $this->query_start($sql, $params, SQL_QUERY_SELECT);
1059          $result = pg_query_params($this->pgsql, $sql, $params);
1060          $this->query_end($result);
1061  
1062          $return = pg_fetch_all_columns($result, 0);
1063  
1064          if ($this->pg_field_type($result, 0) == 'bytea') {
1065              foreach ($return as $key => $value) {
1066                  $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
1067              }
1068          }
1069  
1070          pg_free_result($result);
1071  
1072          return $return;
1073      }
1074  
1075      /**
1076       * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
1077       * @param string $table name
1078       * @param mixed $params data record as object or array
1079       * @param bool $returnit return it of inserted record
1080       * @param bool $bulk true means repeated inserts expected
1081       * @param bool $customsequence true if 'id' included in $params, disables $returnid
1082       * @return bool|int true or new id
1083       * @throws dml_exception A DML specific exception is thrown for any errors.
1084       */
1085      public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
1086          if (!is_array($params)) {
1087              $params = (array)$params;
1088          }
1089  
1090          $returning = "";
1091  
1092          if ($customsequence) {
1093              if (!isset($params['id'])) {
1094                  throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
1095              }
1096              $returnid = false;
1097          } else {
1098              if ($returnid) {
1099                  $returning = "RETURNING id";
1100                  unset($params['id']);
1101              } else {
1102                  unset($params['id']);
1103              }
1104          }
1105  
1106          if (empty($params)) {
1107              throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
1108          }
1109  
1110          $fields = implode(',', array_keys($params));
1111          $values = array();
1112          $i = 1;
1113          foreach ($params as $value) {
1114              $this->detect_objects($value);
1115              $values[] = "\$".$i++;
1116          }
1117          $values = implode(',', $values);
1118  
1119          $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
1120          $this->query_start($sql, $params, SQL_QUERY_INSERT);
1121          $result = pg_query_params($this->pgsql, $sql, $params);
1122          $this->query_end($result);
1123  
1124          if ($returning !== "") {
1125              $row = pg_fetch_assoc($result);
1126              $params['id'] = reset($row);
1127          }
1128          pg_free_result($result);
1129  
1130          if (!$returnid) {
1131              return true;
1132          }
1133  
1134          return (int)$params['id'];
1135      }
1136  
1137      /**
1138       * Insert a record into a table and return the "id" field if required.
1139       *
1140       * Some conversions and safety checks are carried out. Lobs are supported.
1141       * If the return ID isn't required, then this just reports success as true/false.
1142       * $data is an object containing needed data
1143       * @param string $table The database table to be inserted into
1144       * @param object|array $dataobject A data object with values for one or more fields in the record
1145       * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
1146       * @return bool|int true or new id
1147       * @throws dml_exception A DML specific exception is thrown for any errors.
1148       */
1149      public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
1150          $dataobject = (array)$dataobject;
1151  
1152          $columns = $this->get_columns($table);
1153          if (empty($columns)) {
1154              throw new dml_exception('ddltablenotexist', $table);
1155          }
1156  
1157          $cleaned = array();
1158  
1159          foreach ($dataobject as $field=>$value) {
1160              if ($field === 'id') {
1161                  continue;
1162              }
1163              if (!isset($columns[$field])) {
1164                  continue;
1165              }
1166              $column = $columns[$field];
1167              $cleaned[$field] = $this->normalise_value($column, $value);
1168          }
1169  
1170          return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1171  
1172      }
1173  
1174      /**
1175       * Insert multiple records into database as fast as possible.
1176       *
1177       * Order of inserts is maintained, but the operation is not atomic,
1178       * use transactions if necessary.
1179       *
1180       * This method is intended for inserting of large number of small objects,
1181       * do not use for huge objects with text or binary fields.
1182       *
1183       * @since Moodle 2.7
1184       *
1185       * @param string $table  The database table to be inserted into
1186       * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1187       * @return void does not return new record ids
1188       *
1189       * @throws coding_exception if data objects have different structure
1190       * @throws dml_exception A DML specific exception is thrown for any errors.
1191       */
1192      public function insert_records($table, $dataobjects) {
1193          if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1194              throw new coding_exception('insert_records() passed non-traversable object');
1195          }
1196  
1197          // PostgreSQL does not seem to have problems with huge queries.
1198          $chunksize = 500;
1199          if (!empty($this->dboptions['bulkinsertsize'])) {
1200              $chunksize = (int)$this->dboptions['bulkinsertsize'];
1201          }
1202  
1203          $columns = $this->get_columns($table, true);
1204  
1205          $fields = null;
1206          $count = 0;
1207          $chunk = array();
1208          foreach ($dataobjects as $dataobject) {
1209              if (!is_array($dataobject) and !is_object($dataobject)) {
1210                  throw new coding_exception('insert_records() passed invalid record object');
1211              }
1212              $dataobject = (array)$dataobject;
1213              if ($fields === null) {
1214                  $fields = array_keys($dataobject);
1215                  $columns = array_intersect_key($columns, $dataobject);
1216                  unset($columns['id']);
1217              } else if ($fields !== array_keys($dataobject)) {
1218                  throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1219              }
1220  
1221              $count++;
1222              $chunk[] = $dataobject;
1223  
1224              if ($count === $chunksize) {
1225                  $this->insert_chunk($table, $chunk, $columns);
1226                  $chunk = array();
1227                  $count = 0;
1228              }
1229          }
1230  
1231          if ($count) {
1232              $this->insert_chunk($table, $chunk, $columns);
1233          }
1234      }
1235  
1236      /**
1237       * Insert records in chunks, strict param types...
1238       *
1239       * Note: can be used only from insert_records().
1240       *
1241       * @param string $table
1242       * @param array $chunk
1243       * @param database_column_info[] $columns
1244       */
1245      protected function insert_chunk($table, array $chunk, array $columns) {
1246          $i = 1;
1247          $params = array();
1248          $values = array();
1249          foreach ($chunk as $dataobject) {
1250              $vals = array();
1251              foreach ($columns as $field => $column) {
1252                  $params[] = $this->normalise_value($column, $dataobject[$field]);
1253                  $vals[] = "\$".$i++;
1254              }
1255              $values[] = '('.implode(',', $vals).')';
1256          }
1257  
1258          $fieldssql = '('.implode(',', array_keys($columns)).')';
1259          $valuessql = implode(',', $values);
1260  
1261          $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1262          $this->query_start($sql, $params, SQL_QUERY_INSERT);
1263          $result = pg_query_params($this->pgsql, $sql, $params);
1264          $this->query_end($result);
1265          pg_free_result($result);
1266      }
1267  
1268      /**
1269       * Import a record into a table, id field is required.
1270       * Safety checks are NOT carried out. Lobs are supported.
1271       *
1272       * @param string $table name of database table to be inserted into
1273       * @param object $dataobject A data object with values for one or more fields in the record
1274       * @return bool true
1275       * @throws dml_exception A DML specific exception is thrown for any errors.
1276       */
1277      public function import_record($table, $dataobject) {
1278          $dataobject = (array)$dataobject;
1279  
1280          $columns = $this->get_columns($table);
1281          $cleaned = array();
1282  
1283          foreach ($dataobject as $field=>$value) {
1284              $this->detect_objects($value);
1285              if (!isset($columns[$field])) {
1286                  continue;
1287              }
1288              $column = $columns[$field];
1289              $cleaned[$field] = $this->normalise_value($column, $value);
1290          }
1291  
1292          return $this->insert_record_raw($table, $cleaned, false, true, true);
1293      }
1294  
1295      /**
1296       * Update record in database, as fast as possible, no safety checks, lobs not supported.
1297       * @param string $table name
1298       * @param mixed $params data record as object or array
1299       * @param bool true means repeated updates expected
1300       * @return bool true
1301       * @throws dml_exception A DML specific exception is thrown for any errors.
1302       */
1303      public function update_record_raw($table, $params, $bulk=false) {
1304          $params = (array)$params;
1305  
1306          if (!isset($params['id'])) {
1307              throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1308          }
1309          $id = $params['id'];
1310          unset($params['id']);
1311  
1312          if (empty($params)) {
1313              throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1314          }
1315  
1316          $i = 1;
1317  
1318          $sets = array();
1319          foreach ($params as $field=>$value) {
1320              $this->detect_objects($value);
1321              $sets[] = "$field = \$".$i++;
1322          }
1323  
1324          $params[] = $id; // last ? in WHERE condition
1325  
1326          $sets = implode(',', $sets);
1327          $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1328  
1329          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1330          $result = pg_query_params($this->pgsql, $sql, $params);
1331          $this->query_end($result);
1332  
1333          pg_free_result($result);
1334          return true;
1335      }
1336  
1337      /**
1338       * Update a record in a table
1339       *
1340       * $dataobject is an object containing needed data
1341       * Relies on $dataobject having a variable "id" to
1342       * specify the record to update
1343       *
1344       * @param string $table The database table to be checked against.
1345       * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
1346       * @param bool true means repeated updates expected
1347       * @return bool true
1348       * @throws dml_exception A DML specific exception is thrown for any errors.
1349       */
1350      public function update_record($table, $dataobject, $bulk=false) {
1351          $dataobject = (array)$dataobject;
1352  
1353          $columns = $this->get_columns($table);
1354          $cleaned = array();
1355  
1356          foreach ($dataobject as $field=>$value) {
1357              if (!isset($columns[$field])) {
1358                  continue;
1359              }
1360              $column = $columns[$field];
1361              $cleaned[$field] = $this->normalise_value($column, $value);
1362          }
1363  
1364          $this->update_record_raw($table, $cleaned, $bulk);
1365  
1366          return true;
1367      }
1368  
1369      /**
1370       * Set a single field in every table record which match a particular WHERE clause.
1371       *
1372       * @param string $table The database table to be checked against.
1373       * @param string $newfield the field to set.
1374       * @param string $newvalue the value to set the field to.
1375       * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1376       * @param array $params array of sql parameters
1377       * @return bool true
1378       * @throws dml_exception A DML specific exception is thrown for any errors.
1379       */
1380      public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1381  
1382          if ($select) {
1383              $select = "WHERE $select";
1384          }
1385          if (is_null($params)) {
1386              $params = array();
1387          }
1388          list($select, $params, $type) = $this->fix_sql_params($select, $params);
1389          $i = count($params)+1;
1390  
1391          // Get column metadata
1392          $columns = $this->get_columns($table);
1393          $column = $columns[$newfield];
1394  
1395          $normalisedvalue = $this->normalise_value($column, $newvalue);
1396  
1397          $newfield = "$newfield = \$" . $i;
1398          $params[] = $normalisedvalue;
1399          $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1400  
1401          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1402          $result = pg_query_params($this->pgsql, $sql, $params);
1403          $this->query_end($result);
1404  
1405          pg_free_result($result);
1406  
1407          return true;
1408      }
1409  
1410      /**
1411       * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1412       *
1413       * @param string $table The database table to be checked against.
1414       * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1415       * @param array $params array of sql parameters
1416       * @return bool true
1417       * @throws dml_exception A DML specific exception is thrown for any errors.
1418       */
1419      public function delete_records_select($table, $select, array $params=null) {
1420          if ($select) {
1421              $select = "WHERE $select";
1422          }
1423          $sql = "DELETE FROM {$this->prefix}$table $select";
1424  
1425          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1426  
1427          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1428          $result = pg_query_params($this->pgsql, $sql, $params);
1429          $this->query_end($result);
1430  
1431          pg_free_result($result);
1432  
1433          return true;
1434      }
1435  
1436      /**
1437       * Returns 'LIKE' part of a query.
1438       *
1439       * @param string $fieldname usually name of the table column
1440       * @param string $param usually bound query parameter (?, :named)
1441       * @param bool $casesensitive use case sensitive search
1442       * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1443       * @param bool $notlike true means "NOT LIKE"
1444       * @param string $escapechar escape char for '%' and '_'
1445       * @return string SQL code fragment
1446       */
1447      public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1448          if (strpos($param, '%') !== false) {
1449              debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1450          }
1451  
1452          // postgresql does not support accent insensitive text comparisons, sorry
1453          if ($casesensitive) {
1454              $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1455          } else {
1456              $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1457          }
1458          return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1459      }
1460  
1461      public function sql_bitxor($int1, $int2) {
1462          return '((' . $int1 . ') # (' . $int2 . '))';
1463      }
1464  
1465      public function sql_cast_char2int($fieldname, $text=false) {
1466          return ' CAST(' . $fieldname . ' AS INT) ';
1467      }
1468  
1469      public function sql_cast_char2real($fieldname, $text=false) {
1470          return " $fieldname::real ";
1471      }
1472  
1473      public function sql_concat() {
1474          $arr = func_get_args();
1475          $s = implode(' || ', $arr);
1476          if ($s === '') {
1477              return " '' ";
1478          }
1479          // Add always empty string element so integer-exclusive concats
1480          // will work without needing to cast each element explicitly
1481          return " '' || $s ";
1482      }
1483  
1484      public function sql_concat_join($separator="' '", $elements=array()) {
1485          for ($n=count($elements)-1; $n > 0 ; $n--) {
1486              array_splice($elements, $n, 0, $separator);
1487          }
1488          $s = implode(' || ', $elements);
1489          if ($s === '') {
1490              return " '' ";
1491          }
1492          return " $s ";
1493      }
1494  
1495      /**
1496       * Return SQL for performing group concatenation on given field/expression
1497       *
1498       * @param string $field
1499       * @param string $separator
1500       * @param string $sort
1501       * @return string
1502       */
1503      public function sql_group_concat(string $field, string $separator = ', ', string $sort = ''): string {
1504          $fieldsort = $sort ? "ORDER BY {$sort}" : '';
1505          return "STRING_AGG(CAST({$field} AS VARCHAR), '{$separator}' {$fieldsort})";
1506      }
1507  
1508      public function sql_regex_supported() {
1509          return true;
1510      }
1511  
1512      public function sql_regex($positivematch = true, $casesensitive = false) {
1513          if ($casesensitive) {
1514              return $positivematch ? '~' : '!~';
1515          } else {
1516              return $positivematch ? '~*' : '!~*';
1517          }
1518      }
1519  
1520      /**
1521       * Does this driver support tool_replace?
1522       *
1523       * @since Moodle 2.6.1
1524       * @return bool
1525       */
1526      public function replace_all_text_supported() {
1527          return true;
1528      }
1529  
1530      public function session_lock_supported() {
1531          return true;
1532      }
1533  
1534      /**
1535       * Obtain session lock
1536       * @param int $rowid id of the row with session record
1537       * @param int $timeout max allowed time to wait for the lock in seconds
1538       * @return bool success
1539       */
1540      public function get_session_lock($rowid, $timeout) {
1541          // NOTE: there is a potential locking problem for database running
1542          //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1543          //       luckily there is not a big chance that they would collide
1544          if (!$this->session_lock_supported()) {
1545              return;
1546          }
1547  
1548          parent::get_session_lock($rowid, $timeout);
1549  
1550          $timeoutmilli = $timeout * 1000;
1551  
1552          $sql = "SET statement_timeout TO $timeoutmilli";
1553          $this->query_start($sql, null, SQL_QUERY_AUX);
1554          $result = pg_query($this->pgsql, $sql);
1555          $this->query_end($result);
1556  
1557          if ($result) {
1558              pg_free_result($result);
1559          }
1560  
1561          $sql = "SELECT pg_advisory_lock($rowid)";
1562          $this->query_start($sql, null, SQL_QUERY_AUX);
1563          $start = time();
1564          $result = pg_query($this->pgsql, $sql);
1565          $end = time();
1566          try {
1567              $this->query_end($result);
1568          } catch (dml_exception $ex) {
1569              if ($end - $start >= $timeout) {
1570                  throw new dml_sessionwait_exception();
1571              } else {
1572                  throw $ex;
1573              }
1574          }
1575  
1576          if ($result) {
1577              pg_free_result($result);
1578          }
1579  
1580          $sql = "SET statement_timeout TO DEFAULT";
1581          $this->query_start($sql, null, SQL_QUERY_AUX);
1582          $result = pg_query($this->pgsql, $sql);
1583          $this->query_end($result);
1584  
1585          if ($result) {
1586              pg_free_result($result);
1587          }
1588      }
1589  
1590      public function release_session_lock($rowid) {
1591          if (!$this->session_lock_supported()) {
1592              return;
1593          }
1594          if (!$this->used_for_db_sessions) {
1595              return;
1596          }
1597  
1598          parent::release_session_lock($rowid);
1599  
1600          $sql = "SELECT pg_advisory_unlock($rowid)";
1601          $this->query_start($sql, null, SQL_QUERY_AUX);
1602          $result = pg_query($this->pgsql, $sql);
1603          $this->query_end($result);
1604  
1605          if ($result) {
1606              pg_free_result($result);
1607          }
1608      }
1609  
1610      /**
1611       * Driver specific start of real database transaction,
1612       * this can not be used directly in code.
1613       * @return void
1614       */
1615      protected function begin_transaction() {
1616          $this->savepointpresent = true;
1617          $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1618          $this->query_start($sql, null, SQL_QUERY_AUX);
1619          $result = pg_query($this->pgsql, $sql);
1620          $this->query_end($result);
1621  
1622          pg_free_result($result);
1623      }
1624  
1625      /**
1626       * Driver specific commit of real database transaction,
1627       * this can not be used directly in code.
1628       * @return void
1629       */
1630      protected function commit_transaction() {
1631          $this->savepointpresent = false;
1632          $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1633          $this->query_start($sql, null, SQL_QUERY_AUX);
1634          $result = pg_query($this->pgsql, $sql);
1635          $this->query_end($result);
1636  
1637          pg_free_result($result);
1638      }
1639  
1640      /**
1641       * Driver specific abort of real database transaction,
1642       * this can not be used directly in code.
1643       * @return void
1644       */
1645      protected function rollback_transaction() {
1646          $this->savepointpresent = false;
1647          $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1648          $this->query_start($sql, null, SQL_QUERY_AUX);
1649          $result = pg_query($this->pgsql, $sql);
1650          $this->query_end($result);
1651  
1652          pg_free_result($result);
1653      }
1654  
1655      /**
1656       * Helper function trimming (whitespace + quotes) any string
1657       * needed because PG uses to enclose with double quotes some
1658       * fields in indexes definition and others
1659       *
1660       * @param string $str string to apply whitespace + quotes trim
1661       * @return string trimmed string
1662       */
1663      private function trim_quotes($str) {
1664          return trim(trim($str), "'\"");
1665      }
1666  
1667      /**
1668       * Postgresql supports full-text search indexes.
1669       *
1670       * @return bool
1671       */
1672      public function is_fulltext_search_supported() {
1673          return true;
1674      }
1675  }