Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 3.11.x will end 14 Nov 2022 (12 months plus 6 months extension).
  • Bug fixes for security issues in 3.11.x will end 13 Nov 2023 (18 months plus 12 months extension).
  • PHP version: minimum PHP 7.3.0. Note: the minimum PHP version has increased since Moodle 3.10. PHP 7.4.x is also supported.

Differences Between: [Versions 310 and 311] [Versions 311 and 400] [Versions 311 and 401] [Versions 311 and 402] [Versions 311 and 403] [Versions 39 and 311]

   1  <?php
   2  // This file is part of Moodle - http://moodle.org/
   3  //
   4  // Moodle is free software: you can redistribute it and/or modify
   5  // it under the terms of the GNU General Public License as published by
   6  // the Free Software Foundation, either version 3 of the License, or
   7  // (at your option) any later version.
   8  //
   9  // Moodle is distributed in the hope that it will be useful,
  10  // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11  // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12  // GNU General Public License for more details.
  13  //
  14  // You should have received a copy of the GNU General Public License
  15  // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
  16  
  17  /**
  18   * Native pgsql class representing moodle database interface.
  19   *
  20   * @package    core_dml
  21   * @copyright  2008 Petr Skoda (http://skodak.org)
  22   * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
  23   */
  24  
  25  defined('MOODLE_INTERNAL') || die();
  26  
  27  require_once (__DIR__.'/moodle_database.php');
  28  require_once (__DIR__.'/moodle_read_slave_trait.php');
  29  require_once (__DIR__.'/pgsql_native_moodle_recordset.php');
  30  require_once (__DIR__.'/pgsql_native_moodle_temptables.php');
  31  
  32  /**
  33   * Native pgsql class representing moodle database interface.
  34   *
  35   * @package    core_dml
  36   * @copyright  2008 Petr Skoda (http://skodak.org)
  37   * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
  38   */
  39  class pgsql_native_moodle_database extends moodle_database {
  40      use moodle_read_slave_trait {
  41          select_db_handle as read_slave_select_db_handle;
  42          can_use_readonly as read_slave_can_use_readonly;
  43          query_start as read_slave_query_start;
  44          query_end as read_slave_query_end;
  45      }
  46  
  47      /** @var array $dbhcursor keep track of open cursors */
  48      private $dbhcursor = [];
  49  
  50      /** @var resource $pgsql database resource */
  51      protected $pgsql     = null;
  52  
  53      protected $last_error_reporting; // To handle pgsql driver default verbosity
  54  
  55      /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
  56      protected $savepointpresent = false;
  57  
  58      /** @var int Number of cursors used (for constructing a unique ID) */
  59      protected $cursorcount = 0;
  60  
  61      /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
  62      const DEFAULT_FETCH_BUFFER_SIZE = 100000;
  63  
  64      /**
  65       * Detects if all needed PHP stuff installed.
  66       * Note: can be used before connect()
  67       * @return mixed true if ok, string if something
  68       */
  69      public function driver_installed() {
  70          if (!extension_loaded('pgsql')) {
  71              return get_string('pgsqlextensionisnotpresentinphp', 'install');
  72          }
  73          return true;
  74      }
  75  
  76      /**
  77       * Returns database family type - describes SQL dialect
  78       * Note: can be used before connect()
  79       * @return string db family name (mysql, postgres, mssql, oracle, etc.)
  80       */
  81      public function get_dbfamily() {
  82          return 'postgres';
  83      }
  84  
  85      /**
  86       * Returns more specific database driver type
  87       * Note: can be used before connect()
  88       * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
  89       */
  90      protected function get_dbtype() {
  91          return 'pgsql';
  92      }
  93  
  94      /**
  95       * Returns general database library name
  96       * Note: can be used before connect()
  97       * @return string db type pdo, native
  98       */
  99      protected function get_dblibrary() {
 100          return 'native';
 101      }
 102  
 103      /**
 104       * Returns localised database type name
 105       * Note: can be used before connect()
 106       * @return string
 107       */
 108      public function get_name() {
 109          return get_string('nativepgsql', 'install');
 110      }
 111  
 112      /**
 113       * Returns localised database configuration help.
 114       * Note: can be used before connect()
 115       * @return string
 116       */
 117      public function get_configuration_help() {
 118          return get_string('nativepgsqlhelp', 'install');
 119      }
 120  
 121      /**
 122       * Connect to db
 123       * @param string $dbhost The database host.
 124       * @param string $dbuser The database username.
 125       * @param string $dbpass The database username's password.
 126       * @param string $dbname The name of the database being connected to.
 127       * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
 128       * @param array $dboptions driver specific options
 129       * @return bool true
 130       * @throws dml_connection_exception if error
 131       */
 132      public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, array $dboptions=null): bool {
 133          if ($prefix == '' and !$this->external) {
 134              //Enforce prefixes for everybody but mysql
 135              throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
 136          }
 137  
 138          $driverstatus = $this->driver_installed();
 139  
 140          if ($driverstatus !== true) {
 141              throw new dml_exception('dbdriverproblem', $driverstatus);
 142          }
 143  
 144          $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
 145  
 146          $pass = addcslashes($this->dbpass, "'\\");
 147  
 148          // Unix socket connections should have lower overhead
 149          if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
 150              $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
 151              if (strpos($this->dboptions['dbsocket'], '/') !== false) {
 152                  // A directory was specified as the socket location.
 153                  $connection .= " host='".$this->dboptions['dbsocket']."'";
 154              }
 155              if (!empty($this->dboptions['dbport'])) {
 156                  // A port as specified, add it to the connection as it's used as part of the socket path.
 157                  $connection .= " port ='".$this->dboptions['dbport']."'";
 158              }
 159          } else {
 160              $this->dboptions['dbsocket'] = '';
 161              if (empty($this->dbname)) {
 162                  // probably old style socket connection - do not add port
 163                  $port = "";
 164              } else if (empty($this->dboptions['dbport'])) {
 165                  $port = "port ='5432'";
 166              } else {
 167                  $port = "port ='".$this->dboptions['dbport']."'";
 168              }
 169              $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
 170          }
 171  
 172          if (!empty($this->dboptions['connecttimeout'])) {
 173              $connection .= " connect_timeout=".$this->dboptions['connecttimeout'];
 174          }
 175  
 176          if (empty($this->dboptions['dbhandlesoptions'])) {
 177              // ALTER USER and ALTER DATABASE are overridden by these settings.
 178              $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
 179              // Select schema if specified, otherwise the first one wins.
 180              if (!empty($this->dboptions['dbschema'])) {
 181                  $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
 182              }
 183  
 184              $connection .= " options='" . implode(' ', $options) . "'";
 185          }
 186  
 187          ob_start();
 188          // It seems that pg_connect() handles some errors differently.
 189          // For example, name resolution error will raise an exception, and non-existing
 190          // database or wrong credentials will just return false.
 191          // We need to cater for both.
 192          try {
 193              if (empty($this->dboptions['dbpersist'])) {
 194                  $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
 195              } else {
 196                  $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
 197              }
 198              $dberr = ob_get_contents();
 199          } catch (\Exception $e) {
 200              $dberr = $e->getMessage();
 201          }
 202          ob_end_clean();
 203  
 204          $status = $this->pgsql ? pg_connection_status($this->pgsql) : false;
 205  
 206          if ($status === false or $status === PGSQL_CONNECTION_BAD) {
 207              $this->pgsql = null;
 208              throw new dml_connection_exception($dberr);
 209          }
 210  
 211          if (!empty($this->dboptions['dbpersist'])) {
 212              // There are rare situations (such as PHP out of memory errors) when open cursors may
 213              // not be closed at the end of a connection. When using persistent connections, the
 214              // cursors remain open and 'get in the way' of future connections. To avoid this
 215              // problem, close all cursors here.
 216              $result = pg_query($this->pgsql, 'CLOSE ALL');
 217              if ($result) {
 218                  pg_free_result($result);
 219              }
 220          }
 221  
 222          if (!empty($this->dboptions['dbhandlesoptions'])) {
 223              /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
 224               * These functions do not talk to the server, they use the client library knowledge to determine state.
 225               */
 226              if (!empty($this->dboptions['dbschema'])) {
 227                  throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
 228              }
 229              if (pg_client_encoding($this->pgsql) != 'UTF8') {
 230                  throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
 231              }
 232              if (pg_escape_string($this->pgsql, '\\') != '\\') {
 233                  throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
 234              }
 235          }
 236  
 237          // Connection stabilised and configured, going to instantiate the temptables controller
 238          $this->temptables = new pgsql_native_moodle_temptables($this);
 239  
 240          return true;
 241      }
 242  
 243      /**
 244       * Close database connection and release all resources
 245       * and memory (especially circular memory references).
 246       * Do NOT use connect() again, create a new instance if needed.
 247       */
 248      public function dispose() {
 249          parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
 250          if ($this->pgsql) {
 251              pg_close($this->pgsql);
 252              $this->pgsql = null;
 253          }
 254      }
 255  
 256      /**
 257       * Gets db handle currently used with queries
 258       * @return resource
 259       */
 260      protected function get_db_handle() {
 261          return $this->pgsql;
 262      }
 263  
 264      /**
 265       * Sets db handle to be used with subsequent queries
 266       * @param resource $dbh
 267       * @return void
 268       */
 269      protected function set_db_handle($dbh): void {
 270          $this->pgsql = $dbh;
 271      }
 272  
 273      /**
 274       * Select appropriate db handle - readwrite or readonly
 275       * @param int $type type of query
 276       * @param string $sql
 277       * @return void
 278       */
 279      protected function select_db_handle(int $type, string $sql): void {
 280          $this->read_slave_select_db_handle($type, $sql);
 281  
 282          if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) {
 283              $cursor = $match[1];
 284              $this->dbhcursor[$cursor] = $this->pgsql;
 285          }
 286          if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) {
 287              $cursor = $match[1];
 288              $this->pgsql = $this->dbhcursor[$cursor];
 289          }
 290      }
 291  
 292      /**
 293       * Check if The query qualifies for readonly connection execution
 294       * Logging queries are exempt, those are write operations that circumvent
 295       * standard query_start/query_end paths.
 296       * @param int $type type of query
 297       * @param string $sql
 298       * @return bool
 299       */
 300      protected function can_use_readonly(int $type, string $sql): bool {
 301          // ... pg_*lock queries always go to master.
 302          if (preg_match('/\bpg_\w*lock/', $sql)) {
 303              return false;
 304          }
 305  
 306          // ... a nuisance - temptables use this.
 307          if (preg_match('/\bpg_catalog/', $sql) && $this->temptables->get_temptables()) {
 308              return false;
 309          }
 310  
 311          return $this->read_slave_can_use_readonly($type, $sql);
 312  
 313      }
 314  
 315      /**
 316       * Called before each db query.
 317       * @param string $sql
 318       * @param array|null $params An array of parameters.
 319       * @param int $type type of query
 320       * @param mixed $extrainfo driver specific extra information
 321       * @return void
 322       */
 323      protected function query_start($sql, ?array $params, $type, $extrainfo=null) {
 324          $this->read_slave_query_start($sql, $params, $type, $extrainfo);
 325          // pgsql driver tends to send debug to output, we do not need that.
 326          $this->last_error_reporting = error_reporting(0);
 327      }
 328  
 329      /**
 330       * Called immediately after each db query.
 331       * @param mixed db specific result
 332       * @return void
 333       */
 334      protected function query_end($result) {
 335          // reset original debug level
 336          error_reporting($this->last_error_reporting);
 337          try {
 338              $this->read_slave_query_end($result);
 339              if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) {
 340                  $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
 341                  if ($res) {
 342                      pg_free_result($res);
 343                  }
 344              }
 345          } catch (Exception $e) {
 346              if ($this->savepointpresent) {
 347                  $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
 348                  if ($res) {
 349                      pg_free_result($res);
 350                  }
 351              }
 352              throw $e;
 353          }
 354      }
 355  
 356      /**
 357       * Returns database server info array
 358       * @return array Array containing 'description' and 'version' info
 359       */
 360      public function get_server_info() {
 361          static $info;
 362          if (!$info) {
 363              $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
 364              $info = pg_version($this->pgsql);
 365              $this->query_end(true);
 366          }
 367          return array('description'=>$info['server'], 'version'=>$info['server']);
 368      }
 369  
 370      /**
 371       * Returns supported query parameter types
 372       * @return int bitmask of accepted SQL_PARAMS_*
 373       */
 374      protected function allowed_param_types() {
 375          return SQL_PARAMS_DOLLAR;
 376      }
 377  
 378      /**
 379       * Returns last error reported by database engine.
 380       * @return string error message
 381       */
 382      public function get_last_error() {
 383          return pg_last_error($this->pgsql);
 384      }
 385  
 386      /**
 387       * Return tables in database WITHOUT current prefix.
 388       * @param bool $usecache if true, returns list of cached tables.
 389       * @return array of table names in lowercase and without prefix
 390       */
 391      public function get_tables($usecache=true) {
 392          if ($usecache and $this->tables !== null) {
 393              return $this->tables;
 394          }
 395          $this->tables = array();
 396          $prefix = str_replace('_', '|_', $this->prefix);
 397          $sql = "SELECT c.relname
 398                    FROM pg_catalog.pg_class c
 399                    JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
 400                   WHERE c.relname LIKE '$prefix%' ESCAPE '|'
 401                         AND c.relkind = 'r'
 402                         AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
 403          $this->query_start($sql, null, SQL_QUERY_AUX);
 404          $result = pg_query($this->pgsql, $sql);
 405          $this->query_end($result);
 406  
 407          if ($result) {
 408              while ($row = pg_fetch_row($result)) {
 409                  $tablename = reset($row);
 410                  if ($this->prefix !== false && $this->prefix !== '') {
 411                      if (strpos($tablename, $this->prefix) !== 0) {
 412                          continue;
 413                      }
 414                      $tablename = substr($tablename, strlen($this->prefix));
 415                  }
 416                  $this->tables[$tablename] = $tablename;
 417              }
 418              pg_free_result($result);
 419          }
 420          return $this->tables;
 421      }
 422  
 423      /**
 424       * Constructs 'IN()' or '=' sql fragment
 425       *
 426       * Method overriding {@see moodle_database::get_in_or_equal} to be able to use
 427       * more than 65535 elements in $items array.
 428       *
 429       * @param mixed $items A single value or array of values for the expression.
 430       * @param int $type Parameter bounding type : SQL_PARAMS_QM or SQL_PARAMS_NAMED.
 431       * @param string $prefix Named parameter placeholder prefix (a unique counter value is appended to each parameter name).
 432       * @param bool $equal True means we want to equate to the constructed expression, false means we don't want to equate to it.
 433       * @param mixed $onemptyitems This defines the behavior when the array of items provided is empty. Defaults to false,
 434       *              meaning throw exceptions. Other values will become part of the returned SQL fragment.
 435       * @throws coding_exception | dml_exception
 436       * @return array A list containing the constructed sql fragment and an array of parameters.
 437       */
 438      public function get_in_or_equal($items, $type=SQL_PARAMS_QM, $prefix='param', $equal=true, $onemptyitems=false): array {
 439          // We only interfere if number of items in expression exceeds 16 bit value.
 440          if (!is_array($items) || count($items) < 65535) {
 441              return parent::get_in_or_equal($items, $type, $prefix,  $equal, $onemptyitems);
 442          }
 443  
 444          // Determine the type from the first value. We don't need to be very smart here,
 445          // it is developer's responsibility to make sure that variable type is matching
 446          // field type, if not the case, DB engine will hint. Also mixing types won't work
 447          // here anyway, so we ignore NULL or boolean (unlikely you need 56k values of
 448          // these types only).
 449          $cast = is_string(current($items)) ? '::text' : '::bigint';
 450  
 451          if ($type == SQL_PARAMS_QM) {
 452              if ($equal) {
 453                  $sql = 'IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
 454              } else {
 455                  $sql = 'NOT IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
 456              }
 457              $params = array_values($items);
 458          } else if ($type == SQL_PARAMS_NAMED) {
 459              if (empty($prefix)) {
 460                  $prefix = 'param';
 461              }
 462              $params = [];
 463              $sql = [];
 464              foreach ($items as $item) {
 465                  $param = $prefix.$this->inorequaluniqueindex++;
 466                  $params[$param] = $item;
 467                  $sql[] = ':'.$param.$cast;
 468              }
 469              if ($equal) {
 470                  $sql = 'IN (VALUES ('.implode('),(', $sql).'))';
 471              } else {
 472                  $sql = 'NOT IN (VALUES ('.implode('),(', $sql).'))';
 473              }
 474          } else {
 475              throw new dml_exception('typenotimplement');
 476          }
 477          return [$sql, $params];
 478      }
 479  
 480      /**
 481       * Return table indexes - everything lowercased.
 482       * @param string $table The table we want to get indexes from.
 483       * @return array of arrays
 484       */
 485      public function get_indexes($table) {
 486          $indexes = array();
 487          $tablename = $this->prefix.$table;
 488  
 489          $sql = "SELECT i.*
 490                    FROM pg_catalog.pg_indexes i
 491                    JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
 492                   WHERE i.tablename = '$tablename'
 493                         AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
 494  
 495          $this->query_start($sql, null, SQL_QUERY_AUX);
 496          $result = pg_query($this->pgsql, $sql);
 497          $this->query_end($result);
 498  
 499          if ($result) {
 500              while ($row = pg_fetch_assoc($result)) {
 501                  // The index definition could be generated schema-qualifying the target table name
 502                  // for safety, depending on the pgsql version (CVE-2018-1058).
 503                  if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
 504                      continue;
 505                  }
 506                  if ($matches[5] === 'id') {
 507                      continue;
 508                  }
 509                  $columns = explode(',', $matches[5]);
 510                  foreach ($columns as $k=>$column) {
 511                      $column = trim($column);
 512                      if ($pos = strpos($column, ' ')) {
 513                          // index type is separated by space
 514                          $column = substr($column, 0, $pos);
 515                      }
 516                      $columns[$k] = $this->trim_quotes($column);
 517                  }
 518                  $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
 519                                                'columns'=>$columns);
 520              }
 521              pg_free_result($result);
 522          }
 523          return $indexes;
 524      }
 525  
 526      /**
 527       * Returns detailed information about columns in table.
 528       *
 529       * @param string $table name
 530       * @return database_column_info[] array of database_column_info objects indexed with column names
 531       */
 532      protected function fetch_columns(string $table): array {
 533          $structure = array();
 534  
 535          $tablename = $this->prefix.$table;
 536  
 537          $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
 538                         CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) END AS adsrc
 539                    FROM pg_catalog.pg_class c
 540                    JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
 541                    JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
 542                    JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
 543               LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
 544                   WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
 545                         AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
 546                ORDER BY a.attnum";
 547  
 548          $this->query_start($sql, null, SQL_QUERY_AUX);
 549          $result = pg_query($this->pgsql, $sql);
 550          $this->query_end($result);
 551  
 552          if (!$result) {
 553              return array();
 554          }
 555          while ($rawcolumn = pg_fetch_object($result)) {
 556  
 557              $info = new stdClass();
 558              $info->name = $rawcolumn->field;
 559              $matches = null;
 560  
 561              if ($rawcolumn->type === 'varchar') {
 562                  $info->type          = 'varchar';
 563                  $info->meta_type     = 'C';
 564                  $info->max_length    = $rawcolumn->atttypmod - 4;
 565                  $info->scale         = null;
 566                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 567                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 568                  if ($info->has_default) {
 569                      $parts = explode('::', $rawcolumn->adsrc);
 570                      if (count($parts) > 1) {
 571                          $info->default_value = reset($parts);
 572                          $info->default_value = trim($info->default_value, "'");
 573                      } else {
 574                          $info->default_value = $rawcolumn->adsrc;
 575                      }
 576                  } else {
 577                      $info->default_value = null;
 578                  }
 579                  $info->primary_key   = false;
 580                  $info->binary        = false;
 581                  $info->unsigned      = null;
 582                  $info->auto_increment= false;
 583                  $info->unique        = null;
 584  
 585              } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
 586                  $info->type = 'int';
 587                  if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
 588                      $info->primary_key   = true;
 589                      $info->meta_type     = 'R';
 590                      $info->unique        = true;
 591                      $info->auto_increment= true;
 592                      $info->has_default   = false;
 593                  } else {
 594                      $info->primary_key   = false;
 595                      $info->meta_type     = 'I';
 596                      $info->unique        = null;
 597                      $info->auto_increment= false;
 598                      $info->has_default   = ($rawcolumn->atthasdef === 't');
 599                  }
 600                  // Return number of decimals, not bytes here.
 601                  if ($matches[1] >= 8) {
 602                      $info->max_length = 18;
 603                  } else if ($matches[1] >= 4) {
 604                      $info->max_length = 9;
 605                  } else if ($matches[1] >= 2) {
 606                      $info->max_length = 4;
 607                  } else if ($matches[1] >= 1) {
 608                      $info->max_length = 2;
 609                  } else {
 610                      $info->max_length = 0;
 611                  }
 612                  $info->scale         = null;
 613                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 614                  if ($info->has_default) {
 615                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 616                      $parts = explode('::', $rawcolumn->adsrc);
 617                      if (count($parts) > 1) {
 618                          $info->default_value = reset($parts);
 619                      } else {
 620                          $info->default_value = $rawcolumn->adsrc;
 621                      }
 622                      $info->default_value = trim($info->default_value, "()'");
 623                  } else {
 624                      $info->default_value = null;
 625                  }
 626                  $info->binary        = false;
 627                  $info->unsigned      = false;
 628  
 629              } else if ($rawcolumn->type === 'numeric') {
 630                  $info->type = $rawcolumn->type;
 631                  $info->meta_type     = 'N';
 632                  $info->primary_key   = false;
 633                  $info->binary        = false;
 634                  $info->unsigned      = null;
 635                  $info->auto_increment= false;
 636                  $info->unique        = null;
 637                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 638                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 639                  if ($info->has_default) {
 640                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 641                      $parts = explode('::', $rawcolumn->adsrc);
 642                      if (count($parts) > 1) {
 643                          $info->default_value = reset($parts);
 644                      } else {
 645                          $info->default_value = $rawcolumn->adsrc;
 646                      }
 647                      $info->default_value = trim($info->default_value, "()'");
 648                  } else {
 649                      $info->default_value = null;
 650                  }
 651                  $info->max_length    = $rawcolumn->atttypmod >> 16;
 652                  $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
 653  
 654              } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
 655                  $info->type = 'float';
 656                  $info->meta_type     = 'N';
 657                  $info->primary_key   = false;
 658                  $info->binary        = false;
 659                  $info->unsigned      = null;
 660                  $info->auto_increment= false;
 661                  $info->unique        = null;
 662                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 663                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 664                  if ($info->has_default) {
 665                      // PG 9.5+ uses ::<TYPE> syntax for some defaults.
 666                      $parts = explode('::', $rawcolumn->adsrc);
 667                      if (count($parts) > 1) {
 668                          $info->default_value = reset($parts);
 669                      } else {
 670                          $info->default_value = $rawcolumn->adsrc;
 671                      }
 672                      $info->default_value = trim($info->default_value, "()'");
 673                  } else {
 674                      $info->default_value = null;
 675                  }
 676                  // just guess expected number of decimal places :-(
 677                  if ($matches[1] == 8) {
 678                      // total 15 digits
 679                      $info->max_length = 8;
 680                      $info->scale      = 7;
 681                  } else {
 682                      // total 6 digits
 683                      $info->max_length = 4;
 684                      $info->scale      = 2;
 685                  }
 686  
 687              } else if ($rawcolumn->type === 'text') {
 688                  $info->type          = $rawcolumn->type;
 689                  $info->meta_type     = 'X';
 690                  $info->max_length    = -1;
 691                  $info->scale         = null;
 692                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 693                  $info->has_default   = ($rawcolumn->atthasdef === 't');
 694                  if ($info->has_default) {
 695                      $parts = explode('::', $rawcolumn->adsrc);
 696                      if (count($parts) > 1) {
 697                          $info->default_value = reset($parts);
 698                          $info->default_value = trim($info->default_value, "'");
 699                      } else {
 700                          $info->default_value = $rawcolumn->adsrc;
 701                      }
 702                  } else {
 703                      $info->default_value = null;
 704                  }
 705                  $info->primary_key   = false;
 706                  $info->binary        = false;
 707                  $info->unsigned      = null;
 708                  $info->auto_increment= false;
 709                  $info->unique        = null;
 710  
 711              } else if ($rawcolumn->type === 'bytea') {
 712                  $info->type          = $rawcolumn->type;
 713                  $info->meta_type     = 'B';
 714                  $info->max_length    = -1;
 715                  $info->scale         = null;
 716                  $info->not_null      = ($rawcolumn->attnotnull === 't');
 717                  $info->has_default   = false;
 718                  $info->default_value = null;
 719                  $info->primary_key   = false;
 720                  $info->binary        = true;
 721                  $info->unsigned      = null;
 722                  $info->auto_increment= false;
 723                  $info->unique        = null;
 724  
 725              }
 726  
 727              $structure[$info->name] = new database_column_info($info);
 728          }
 729  
 730          pg_free_result($result);
 731  
 732          return $structure;
 733      }
 734  
 735      /**
 736       * Normalise values based in RDBMS dependencies (booleans, LOBs...)
 737       *
 738       * @param database_column_info $column column metadata corresponding with the value we are going to normalise
 739       * @param mixed $value value we are going to normalise
 740       * @return mixed the normalised value
 741       */
 742      protected function normalise_value($column, $value) {
 743          $this->detect_objects($value);
 744  
 745          if (is_bool($value)) { // Always, convert boolean to int
 746              $value = (int)$value;
 747  
 748          } else if ($column->meta_type === 'B') {
 749              if (!is_null($value)) {
 750                  // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
 751                  // \ and produce data errors.  This is set on the connection.
 752                  $value = pg_escape_bytea($this->pgsql, $value);
 753              }
 754  
 755          } else if ($value === '') {
 756              if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
 757                  $value = 0; // prevent '' problems in numeric fields
 758              }
 759          }
 760          return $value;
 761      }
 762  
 763      /**
 764       * Is db in unicode mode?
 765       * @return bool
 766       */
 767      public function setup_is_unicodedb() {
 768          // Get PostgreSQL server_encoding value
 769          $sql = "SHOW server_encoding";
 770          $this->query_start($sql, null, SQL_QUERY_AUX);
 771          $result = pg_query($this->pgsql, $sql);
 772          $this->query_end($result);
 773  
 774          if (!$result) {
 775              return false;
 776          }
 777          $rawcolumn = pg_fetch_object($result);
 778          $encoding = $rawcolumn->server_encoding;
 779          pg_free_result($result);
 780  
 781          return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
 782      }
 783  
 784      /**
 785       * Do NOT use in code, to be used by database_manager only!
 786       * @param string|array $sql query
 787       * @param array|null $tablenames an array of xmldb table names affected by this request.
 788       * @return bool true
 789       * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
 790       */
 791      public function change_database_structure($sql, $tablenames = null) {
 792          $this->get_manager(); // Includes DDL exceptions classes ;-)
 793          if (is_array($sql)) {
 794              $sql = implode("\n;\n", $sql);
 795          }
 796          if (!$this->is_transaction_started()) {
 797              // It is better to do all or nothing, this helps with recovery...
 798              $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
 799          }
 800  
 801          try {
 802              $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
 803              $result = pg_query($this->pgsql, $sql);
 804              $this->query_end($result);
 805              pg_free_result($result);
 806          } catch (ddl_change_structure_exception $e) {
 807              if (!$this->is_transaction_started()) {
 808                  $result = @pg_query($this->pgsql, "ROLLBACK");
 809                  @pg_free_result($result);
 810              }
 811              $this->reset_caches($tablenames);
 812              throw $e;
 813          }
 814  
 815          $this->reset_caches($tablenames);
 816          return true;
 817      }
 818  
 819      /**
 820       * Execute general sql query. Should be used only when no other method suitable.
 821       * Do NOT use this to make changes in db structure, use database_manager methods instead!
 822       * @param string $sql query
 823       * @param array $params query parameters
 824       * @return bool true
 825       * @throws dml_exception A DML specific exception is thrown for any errors.
 826       */
 827      public function execute($sql, array $params=null) {
 828          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
 829  
 830          if (strpos($sql, ';') !== false) {
 831              throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
 832          }
 833  
 834          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
 835          $result = pg_query_params($this->pgsql, $sql, $params);
 836          $this->query_end($result);
 837  
 838          pg_free_result($result);
 839          return true;
 840      }
 841  
 842      /**
 843       * Get a number of records as a moodle_recordset using a SQL statement.
 844       *
 845       * Since this method is a little less readable, use of it should be restricted to
 846       * code where it's possible there might be large datasets being returned.  For known
 847       * small datasets use get_records_sql - it leads to simpler code.
 848       *
 849       * The return type is like:
 850       * @see function get_recordset.
 851       *
 852       * @param string $sql the SQL select query to execute.
 853       * @param array $params array of sql parameters
 854       * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 855       * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 856       * @return moodle_recordset instance
 857       * @throws dml_exception A DML specific exception is thrown for any errors.
 858       */
 859      public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
 860  
 861          list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
 862  
 863          if ($limitnum) {
 864              $sql .= " LIMIT $limitnum";
 865          }
 866          if ($limitfrom) {
 867              $sql .= " OFFSET $limitfrom";
 868          }
 869  
 870          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
 871  
 872          // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
 873          // loading the entire thing (unless the config setting is turned off).
 874          $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
 875          if ($usecursors) {
 876              // Work out the cursor unique identifer. This is based on a simple count used which
 877              // should be OK because the identifiers only need to be unique within the current
 878              // transaction.
 879              $this->cursorcount++;
 880              $cursorname = 'crs' . $this->cursorcount;
 881  
 882              // Do the query to a cursor.
 883              $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
 884          } else {
 885              $cursorname = '';
 886          }
 887  
 888          $this->query_start($sql, $params, SQL_QUERY_SELECT);
 889  
 890          $result = pg_query_params($this->pgsql, $sql, $params);
 891  
 892          $this->query_end($result);
 893          if ($usecursors) {
 894              pg_free_result($result);
 895              $result = null;
 896          }
 897  
 898          return new pgsql_native_moodle_recordset($result, $this, $cursorname);
 899      }
 900  
 901      /**
 902       * Gets size of fetch buffer used for recordset queries.
 903       *
 904       * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
 905       * memory as needed for the Postgres library to hold the entire query results in memory.
 906       *
 907       * @return int Fetch buffer size or 0 indicating not to use cursors
 908       */
 909      protected function get_fetch_buffer_size() {
 910          if (array_key_exists('fetchbuffersize', $this->dboptions)) {
 911              return (int)$this->dboptions['fetchbuffersize'];
 912          } else {
 913              return self::DEFAULT_FETCH_BUFFER_SIZE;
 914          }
 915      }
 916  
 917      /**
 918       * Retrieves data from cursor. For use by recordset only; do not call directly.
 919       *
 920       * Return value contains the next batch of Postgres data, and a boolean indicating if this is
 921       * definitely the last batch (if false, there may be more)
 922       *
 923       * @param string $cursorname Name of cursor to read from
 924       * @return array Array with 2 elements (next data batch and boolean indicating last batch)
 925       */
 926      public function fetch_from_cursor($cursorname) {
 927          $count = $this->get_fetch_buffer_size();
 928  
 929          $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
 930  
 931          $this->query_start($sql, [], SQL_QUERY_AUX);
 932          $result = pg_query($this->pgsql, $sql);
 933          $last = pg_num_rows($result) !== $count;
 934  
 935          $this->query_end($result);
 936  
 937          return [$result, $last];
 938      }
 939  
 940      /**
 941       * Closes a cursor. For use by recordset only; do not call directly.
 942       *
 943       * @param string $cursorname Name of cursor to close
 944       * @return bool True if we actually closed one, false if the transaction was cancelled
 945       */
 946      public function close_cursor($cursorname) {
 947          // If the transaction got cancelled, then ignore this request.
 948          $sql = 'CLOSE ' . $cursorname;
 949          $this->query_start($sql, [], SQL_QUERY_AUX);
 950          $result = pg_query($this->pgsql, $sql);
 951          $this->query_end($result);
 952          if ($result) {
 953              pg_free_result($result);
 954          }
 955          return true;
 956      }
 957  
 958      /**
 959       * Get a number of records as an array of objects using a SQL statement.
 960       *
 961       * Return value is like:
 962       * @see function get_records.
 963       *
 964       * @param string $sql the SQL select query to execute. The first column of this SELECT statement
 965       *   must be a unique value (usually the 'id' field), as it will be used as the key of the
 966       *   returned array.
 967       * @param array $params array of sql parameters
 968       * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 969       * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 970       * @return array of objects, or empty array if no records were found
 971       * @throws dml_exception A DML specific exception is thrown for any errors.
 972       */
 973      public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) {
 974          list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
 975  
 976          if ($limitnum) {
 977              $sql .= " LIMIT $limitnum";
 978          }
 979          if ($limitfrom) {
 980              $sql .= " OFFSET $limitfrom";
 981          }
 982  
 983          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
 984          $this->query_start($sql, $params, SQL_QUERY_SELECT);
 985          $result = pg_query_params($this->pgsql, $sql, $params);
 986          $this->query_end($result);
 987  
 988          // find out if there are any blobs
 989          $numfields = pg_num_fields($result);
 990          $blobs = array();
 991          for ($i = 0; $i < $numfields; $i++) {
 992              $type = pg_field_type($result, $i);
 993              if ($type == 'bytea') {
 994                  $blobs[] = pg_field_name($result, $i);
 995              }
 996          }
 997  
 998          $return = [];
 999          while ($row = pg_fetch_assoc($result)) {
1000              $id = reset($row);
1001              if ($blobs) {
1002                  foreach ($blobs as $blob) {
1003                      $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
1004                  }
1005              }
1006              if (isset($return[$id])) {
1007                  $colname = key($row);
1008                  debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
1009              }
1010              $return[$id] = (object) $row;
1011          }
1012  
1013          return $return;
1014      }
1015  
1016      /**
1017       * Selects records and return values (first field) as an array using a SQL statement.
1018       *
1019       * @param string $sql The SQL query
1020       * @param array $params array of sql parameters
1021       * @return array of values
1022       * @throws dml_exception A DML specific exception is thrown for any errors.
1023       */
1024      public function get_fieldset_sql($sql, array $params=null) {
1025          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1026  
1027          $this->query_start($sql, $params, SQL_QUERY_SELECT);
1028          $result = pg_query_params($this->pgsql, $sql, $params);
1029          $this->query_end($result);
1030  
1031          $return = pg_fetch_all_columns($result, 0);
1032  
1033          if (pg_field_type($result, 0) == 'bytea') {
1034              foreach ($return as $key => $value) {
1035                  $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
1036              }
1037          }
1038  
1039          pg_free_result($result);
1040  
1041          return $return;
1042      }
1043  
1044      /**
1045       * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
1046       * @param string $table name
1047       * @param mixed $params data record as object or array
1048       * @param bool $returnit return it of inserted record
1049       * @param bool $bulk true means repeated inserts expected
1050       * @param bool $customsequence true if 'id' included in $params, disables $returnid
1051       * @return bool|int true or new id
1052       * @throws dml_exception A DML specific exception is thrown for any errors.
1053       */
1054      public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
1055          if (!is_array($params)) {
1056              $params = (array)$params;
1057          }
1058  
1059          $returning = "";
1060  
1061          if ($customsequence) {
1062              if (!isset($params['id'])) {
1063                  throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
1064              }
1065              $returnid = false;
1066          } else {
1067              if ($returnid) {
1068                  $returning = "RETURNING id";
1069                  unset($params['id']);
1070              } else {
1071                  unset($params['id']);
1072              }
1073          }
1074  
1075          if (empty($params)) {
1076              throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
1077          }
1078  
1079          $fields = implode(',', array_keys($params));
1080          $values = array();
1081          $i = 1;
1082          foreach ($params as $value) {
1083              $this->detect_objects($value);
1084              $values[] = "\$".$i++;
1085          }
1086          $values = implode(',', $values);
1087  
1088          $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
1089          $this->query_start($sql, $params, SQL_QUERY_INSERT);
1090          $result = pg_query_params($this->pgsql, $sql, $params);
1091          $this->query_end($result);
1092  
1093          if ($returning !== "") {
1094              $row = pg_fetch_assoc($result);
1095              $params['id'] = reset($row);
1096          }
1097          pg_free_result($result);
1098  
1099          if (!$returnid) {
1100              return true;
1101          }
1102  
1103          return (int)$params['id'];
1104      }
1105  
1106      /**
1107       * Insert a record into a table and return the "id" field if required.
1108       *
1109       * Some conversions and safety checks are carried out. Lobs are supported.
1110       * If the return ID isn't required, then this just reports success as true/false.
1111       * $data is an object containing needed data
1112       * @param string $table The database table to be inserted into
1113       * @param object|array $dataobject A data object with values for one or more fields in the record
1114       * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
1115       * @return bool|int true or new id
1116       * @throws dml_exception A DML specific exception is thrown for any errors.
1117       */
1118      public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
1119          $dataobject = (array)$dataobject;
1120  
1121          $columns = $this->get_columns($table);
1122          if (empty($columns)) {
1123              throw new dml_exception('ddltablenotexist', $table);
1124          }
1125  
1126          $cleaned = array();
1127  
1128          foreach ($dataobject as $field=>$value) {
1129              if ($field === 'id') {
1130                  continue;
1131              }
1132              if (!isset($columns[$field])) {
1133                  continue;
1134              }
1135              $column = $columns[$field];
1136              $cleaned[$field] = $this->normalise_value($column, $value);
1137          }
1138  
1139          return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1140  
1141      }
1142  
1143      /**
1144       * Insert multiple records into database as fast as possible.
1145       *
1146       * Order of inserts is maintained, but the operation is not atomic,
1147       * use transactions if necessary.
1148       *
1149       * This method is intended for inserting of large number of small objects,
1150       * do not use for huge objects with text or binary fields.
1151       *
1152       * @since Moodle 2.7
1153       *
1154       * @param string $table  The database table to be inserted into
1155       * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1156       * @return void does not return new record ids
1157       *
1158       * @throws coding_exception if data objects have different structure
1159       * @throws dml_exception A DML specific exception is thrown for any errors.
1160       */
1161      public function insert_records($table, $dataobjects) {
1162          if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1163              throw new coding_exception('insert_records() passed non-traversable object');
1164          }
1165  
1166          // PostgreSQL does not seem to have problems with huge queries.
1167          $chunksize = 500;
1168          if (!empty($this->dboptions['bulkinsertsize'])) {
1169              $chunksize = (int)$this->dboptions['bulkinsertsize'];
1170          }
1171  
1172          $columns = $this->get_columns($table, true);
1173  
1174          $fields = null;
1175          $count = 0;
1176          $chunk = array();
1177          foreach ($dataobjects as $dataobject) {
1178              if (!is_array($dataobject) and !is_object($dataobject)) {
1179                  throw new coding_exception('insert_records() passed invalid record object');
1180              }
1181              $dataobject = (array)$dataobject;
1182              if ($fields === null) {
1183                  $fields = array_keys($dataobject);
1184                  $columns = array_intersect_key($columns, $dataobject);
1185                  unset($columns['id']);
1186              } else if ($fields !== array_keys($dataobject)) {
1187                  throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1188              }
1189  
1190              $count++;
1191              $chunk[] = $dataobject;
1192  
1193              if ($count === $chunksize) {
1194                  $this->insert_chunk($table, $chunk, $columns);
1195                  $chunk = array();
1196                  $count = 0;
1197              }
1198          }
1199  
1200          if ($count) {
1201              $this->insert_chunk($table, $chunk, $columns);
1202          }
1203      }
1204  
1205      /**
1206       * Insert records in chunks, strict param types...
1207       *
1208       * Note: can be used only from insert_records().
1209       *
1210       * @param string $table
1211       * @param array $chunk
1212       * @param database_column_info[] $columns
1213       */
1214      protected function insert_chunk($table, array $chunk, array $columns) {
1215          $i = 1;
1216          $params = array();
1217          $values = array();
1218          foreach ($chunk as $dataobject) {
1219              $vals = array();
1220              foreach ($columns as $field => $column) {
1221                  $params[] = $this->normalise_value($column, $dataobject[$field]);
1222                  $vals[] = "\$".$i++;
1223              }
1224              $values[] = '('.implode(',', $vals).')';
1225          }
1226  
1227          $fieldssql = '('.implode(',', array_keys($columns)).')';
1228          $valuessql = implode(',', $values);
1229  
1230          $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1231          $this->query_start($sql, $params, SQL_QUERY_INSERT);
1232          $result = pg_query_params($this->pgsql, $sql, $params);
1233          $this->query_end($result);
1234          pg_free_result($result);
1235      }
1236  
1237      /**
1238       * Import a record into a table, id field is required.
1239       * Safety checks are NOT carried out. Lobs are supported.
1240       *
1241       * @param string $table name of database table to be inserted into
1242       * @param object $dataobject A data object with values for one or more fields in the record
1243       * @return bool true
1244       * @throws dml_exception A DML specific exception is thrown for any errors.
1245       */
1246      public function import_record($table, $dataobject) {
1247          $dataobject = (array)$dataobject;
1248  
1249          $columns = $this->get_columns($table);
1250          $cleaned = array();
1251  
1252          foreach ($dataobject as $field=>$value) {
1253              $this->detect_objects($value);
1254              if (!isset($columns[$field])) {
1255                  continue;
1256              }
1257              $column = $columns[$field];
1258              $cleaned[$field] = $this->normalise_value($column, $value);
1259          }
1260  
1261          return $this->insert_record_raw($table, $cleaned, false, true, true);
1262      }
1263  
1264      /**
1265       * Update record in database, as fast as possible, no safety checks, lobs not supported.
1266       * @param string $table name
1267       * @param mixed $params data record as object or array
1268       * @param bool true means repeated updates expected
1269       * @return bool true
1270       * @throws dml_exception A DML specific exception is thrown for any errors.
1271       */
1272      public function update_record_raw($table, $params, $bulk=false) {
1273          $params = (array)$params;
1274  
1275          if (!isset($params['id'])) {
1276              throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1277          }
1278          $id = $params['id'];
1279          unset($params['id']);
1280  
1281          if (empty($params)) {
1282              throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1283          }
1284  
1285          $i = 1;
1286  
1287          $sets = array();
1288          foreach ($params as $field=>$value) {
1289              $this->detect_objects($value);
1290              $sets[] = "$field = \$".$i++;
1291          }
1292  
1293          $params[] = $id; // last ? in WHERE condition
1294  
1295          $sets = implode(',', $sets);
1296          $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1297  
1298          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1299          $result = pg_query_params($this->pgsql, $sql, $params);
1300          $this->query_end($result);
1301  
1302          pg_free_result($result);
1303          return true;
1304      }
1305  
1306      /**
1307       * Update a record in a table
1308       *
1309       * $dataobject is an object containing needed data
1310       * Relies on $dataobject having a variable "id" to
1311       * specify the record to update
1312       *
1313       * @param string $table The database table to be checked against.
1314       * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
1315       * @param bool true means repeated updates expected
1316       * @return bool true
1317       * @throws dml_exception A DML specific exception is thrown for any errors.
1318       */
1319      public function update_record($table, $dataobject, $bulk=false) {
1320          $dataobject = (array)$dataobject;
1321  
1322          $columns = $this->get_columns($table);
1323          $cleaned = array();
1324  
1325          foreach ($dataobject as $field=>$value) {
1326              if (!isset($columns[$field])) {
1327                  continue;
1328              }
1329              $column = $columns[$field];
1330              $cleaned[$field] = $this->normalise_value($column, $value);
1331          }
1332  
1333          $this->update_record_raw($table, $cleaned, $bulk);
1334  
1335          return true;
1336      }
1337  
1338      /**
1339       * Set a single field in every table record which match a particular WHERE clause.
1340       *
1341       * @param string $table The database table to be checked against.
1342       * @param string $newfield the field to set.
1343       * @param string $newvalue the value to set the field to.
1344       * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1345       * @param array $params array of sql parameters
1346       * @return bool true
1347       * @throws dml_exception A DML specific exception is thrown for any errors.
1348       */
1349      public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1350  
1351          if ($select) {
1352              $select = "WHERE $select";
1353          }
1354          if (is_null($params)) {
1355              $params = array();
1356          }
1357          list($select, $params, $type) = $this->fix_sql_params($select, $params);
1358          $i = count($params)+1;
1359  
1360          // Get column metadata
1361          $columns = $this->get_columns($table);
1362          $column = $columns[$newfield];
1363  
1364          $normalisedvalue = $this->normalise_value($column, $newvalue);
1365  
1366          $newfield = "$newfield = \$" . $i;
1367          $params[] = $normalisedvalue;
1368          $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1369  
1370          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1371          $result = pg_query_params($this->pgsql, $sql, $params);
1372          $this->query_end($result);
1373  
1374          pg_free_result($result);
1375  
1376          return true;
1377      }
1378  
1379      /**
1380       * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1381       *
1382       * @param string $table The database table to be checked against.
1383       * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1384       * @param array $params array of sql parameters
1385       * @return bool true
1386       * @throws dml_exception A DML specific exception is thrown for any errors.
1387       */
1388      public function delete_records_select($table, $select, array $params=null) {
1389          if ($select) {
1390              $select = "WHERE $select";
1391          }
1392          $sql = "DELETE FROM {$this->prefix}$table $select";
1393  
1394          list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1395  
1396          $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1397          $result = pg_query_params($this->pgsql, $sql, $params);
1398          $this->query_end($result);
1399  
1400          pg_free_result($result);
1401  
1402          return true;
1403      }
1404  
1405      /**
1406       * Returns 'LIKE' part of a query.
1407       *
1408       * @param string $fieldname usually name of the table column
1409       * @param string $param usually bound query parameter (?, :named)
1410       * @param bool $casesensitive use case sensitive search
1411       * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1412       * @param bool $notlike true means "NOT LIKE"
1413       * @param string $escapechar escape char for '%' and '_'
1414       * @return string SQL code fragment
1415       */
1416      public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1417          if (strpos($param, '%') !== false) {
1418              debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1419          }
1420  
1421          // postgresql does not support accent insensitive text comparisons, sorry
1422          if ($casesensitive) {
1423              $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1424          } else {
1425              $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1426          }
1427          return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1428      }
1429  
1430      public function sql_bitxor($int1, $int2) {
1431          return '((' . $int1 . ') # (' . $int2 . '))';
1432      }
1433  
1434      public function sql_cast_char2int($fieldname, $text=false) {
1435          return ' CAST(' . $fieldname . ' AS INT) ';
1436      }
1437  
1438      public function sql_cast_char2real($fieldname, $text=false) {
1439          return " $fieldname::real ";
1440      }
1441  
1442      public function sql_concat() {
1443          $arr = func_get_args();
1444          $s = implode(' || ', $arr);
1445          if ($s === '') {
1446              return " '' ";
1447          }
1448          // Add always empty string element so integer-exclusive concats
1449          // will work without needing to cast each element explicitly
1450          return " '' || $s ";
1451      }
1452  
1453      public function sql_concat_join($separator="' '", $elements=array()) {
1454          for ($n=count($elements)-1; $n > 0 ; $n--) {
1455              array_splice($elements, $n, 0, $separator);
1456          }
1457          $s = implode(' || ', $elements);
1458          if ($s === '') {
1459              return " '' ";
1460          }
1461          return " $s ";
1462      }
1463  
1464      /**
1465       * Return SQL for performing group concatenation on given field/expression
1466       *
1467       * @param string $field
1468       * @param string $separator
1469       * @param string $sort
1470       * @return string
1471       */
1472      public function sql_group_concat(string $field, string $separator = ', ', string $sort = ''): string {
1473          $fieldsort = $sort ? "ORDER BY {$sort}" : '';
1474          return "STRING_AGG(CAST({$field} AS VARCHAR), '{$separator}' {$fieldsort})";
1475      }
1476  
1477      public function sql_regex_supported() {
1478          return true;
1479      }
1480  
1481      public function sql_regex($positivematch = true, $casesensitive = false) {
1482          if ($casesensitive) {
1483              return $positivematch ? '~' : '!~';
1484          } else {
1485              return $positivematch ? '~*' : '!~*';
1486          }
1487      }
1488  
1489      /**
1490       * Does this driver support tool_replace?
1491       *
1492       * @since Moodle 2.6.1
1493       * @return bool
1494       */
1495      public function replace_all_text_supported() {
1496          return true;
1497      }
1498  
1499      public function session_lock_supported() {
1500          return true;
1501      }
1502  
1503      /**
1504       * Obtain session lock
1505       * @param int $rowid id of the row with session record
1506       * @param int $timeout max allowed time to wait for the lock in seconds
1507       * @return bool success
1508       */
1509      public function get_session_lock($rowid, $timeout) {
1510          // NOTE: there is a potential locking problem for database running
1511          //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1512          //       luckily there is not a big chance that they would collide
1513          if (!$this->session_lock_supported()) {
1514              return;
1515          }
1516  
1517          parent::get_session_lock($rowid, $timeout);
1518  
1519          $timeoutmilli = $timeout * 1000;
1520  
1521          $sql = "SET statement_timeout TO $timeoutmilli";
1522          $this->query_start($sql, null, SQL_QUERY_AUX);
1523          $result = pg_query($this->pgsql, $sql);
1524          $this->query_end($result);
1525  
1526          if ($result) {
1527              pg_free_result($result);
1528          }
1529  
1530          $sql = "SELECT pg_advisory_lock($rowid)";
1531          $this->query_start($sql, null, SQL_QUERY_AUX);
1532          $start = time();
1533          $result = pg_query($this->pgsql, $sql);
1534          $end = time();
1535          try {
1536              $this->query_end($result);
1537          } catch (dml_exception $ex) {
1538              if ($end - $start >= $timeout) {
1539                  throw new dml_sessionwait_exception();
1540              } else {
1541                  throw $ex;
1542              }
1543          }
1544  
1545          if ($result) {
1546              pg_free_result($result);
1547          }
1548  
1549          $sql = "SET statement_timeout TO DEFAULT";
1550          $this->query_start($sql, null, SQL_QUERY_AUX);
1551          $result = pg_query($this->pgsql, $sql);
1552          $this->query_end($result);
1553  
1554          if ($result) {
1555              pg_free_result($result);
1556          }
1557      }
1558  
1559      public function release_session_lock($rowid) {
1560          if (!$this->session_lock_supported()) {
1561              return;
1562          }
1563          if (!$this->used_for_db_sessions) {
1564              return;
1565          }
1566  
1567          parent::release_session_lock($rowid);
1568  
1569          $sql = "SELECT pg_advisory_unlock($rowid)";
1570          $this->query_start($sql, null, SQL_QUERY_AUX);
1571          $result = pg_query($this->pgsql, $sql);
1572          $this->query_end($result);
1573  
1574          if ($result) {
1575              pg_free_result($result);
1576          }
1577      }
1578  
1579      /**
1580       * Driver specific start of real database transaction,
1581       * this can not be used directly in code.
1582       * @return void
1583       */
1584      protected function begin_transaction() {
1585          $this->savepointpresent = true;
1586          $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1587          $this->query_start($sql, null, SQL_QUERY_AUX);
1588          $result = pg_query($this->pgsql, $sql);
1589          $this->query_end($result);
1590  
1591          pg_free_result($result);
1592      }
1593  
1594      /**
1595       * Driver specific commit of real database transaction,
1596       * this can not be used directly in code.
1597       * @return void
1598       */
1599      protected function commit_transaction() {
1600          $this->savepointpresent = false;
1601          $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1602          $this->query_start($sql, null, SQL_QUERY_AUX);
1603          $result = pg_query($this->pgsql, $sql);
1604          $this->query_end($result);
1605  
1606          pg_free_result($result);
1607      }
1608  
1609      /**
1610       * Driver specific abort of real database transaction,
1611       * this can not be used directly in code.
1612       * @return void
1613       */
1614      protected function rollback_transaction() {
1615          $this->savepointpresent = false;
1616          $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1617          $this->query_start($sql, null, SQL_QUERY_AUX);
1618          $result = pg_query($this->pgsql, $sql);
1619          $this->query_end($result);
1620  
1621          pg_free_result($result);
1622      }
1623  
1624      /**
1625       * Helper function trimming (whitespace + quotes) any string
1626       * needed because PG uses to enclose with double quotes some
1627       * fields in indexes definition and others
1628       *
1629       * @param string $str string to apply whitespace + quotes trim
1630       * @return string trimmed string
1631       */
1632      private function trim_quotes($str) {
1633          return trim(trim($str), "'\"");
1634      }
1635  
1636      /**
1637       * Postgresql supports full-text search indexes.
1638       *
1639       * @return bool
1640       */
1641      public function is_fulltext_search_supported() {
1642          return true;
1643      }
1644  }