Completed
Branch master (099915)
by Fabio
08:02
created

TPgsqlMetaData::getPrimaryKeys()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 23
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 13
c 1
b 0
f 0
nc 2
nop 3
dl 0
loc 23
ccs 10
cts 10
cp 1
crap 2
rs 9.8333
1
<?php
2
/**
3
 * TPgsqlMetaData class file.
4
 *
5
 * @author Wei Zhuo <weizhuo[at]gmail[dot]com>
6
 * @link https://github.com/pradosoft/prado
7
 * @license https://github.com/pradosoft/prado/blob/master/LICENSE
8
 * @package Prado\Data\Common\Pgsql
9
 */
10
11
namespace Prado\Data\Common\Pgsql;
12
13
/**
14
 * Load the base TDbMetaData class.
15
 */
16
use Prado\Data\Common\TDbMetaData;
17
use Prado\Exceptions\TDbException;
18
use Prado\Prado;
19
20
/**
21
 * TPgsqlMetaData loads PostgreSQL database table and column information.
22
 *
23
 * @author Wei Zhuo <weizhuo[at]gmail[dot]com>
24
 * @package Prado\Data\Common\Pgsql
25
 * @since 3.1
26
 */
27
class TPgsqlMetaData extends TDbMetaData
28
{
29
	private $_defaultSchema = 'public';
30
31
	/**
	 * Names the class used to hold table information for PostgreSQL tables.
	 *
	 * @return string fully qualified TDbTableInfo class name (with leading backslash).
	 */
	protected function getTableInfoClass()
	{
		$tableInfoClass = '\Prado\Data\Common\Pgsql\TPgsqlTableInfo';

		return $tableInfoClass;
	}
38
39
	/**
	 * Quotes a table name for use in a query, passing a double quote to the
	 * parent implementation as both the opening and the closing delimiter.
	 *
	 * @param string $name table name
	 * @return string the quoted table name
	 */
	public function quoteTableName($name)
	{
		$delimiter = '"';

		return parent::quoteTableName($name, $delimiter, $delimiter);
	}
48
49
	/**
	 * Quotes a column name for use in a query, passing a double quote to the
	 * parent implementation as both the opening and the closing delimiter.
	 *
	 * @param string $name column name
	 * @return string the quoted column name
	 */
	public function quoteColumnName($name)
	{
		$delimiter = '"';

		return parent::quoteColumnName($name, $delimiter, $delimiter);
	}
58
59
	/**
	 * Quotes a column alias for use in a query, passing a double quote to the
	 * parent implementation as both the opening and the closing delimiter.
	 *
	 * @param string $name column alias
	 * @return string the quoted column alias
	 */
	public function quoteColumnAlias($name)
	{
		$delimiter = '"';

		return parent::quoteColumnAlias($name, $delimiter, $delimiter);
	}
68
69
	/**
	 * Sets the schema assumed for table names given without a schema prefix.
	 *
	 * @param string $schema default schema name.
	 */
	public function setDefaultSchema($schema)
	{
		$this->_defaultSchema = $schema;
	}
76
77
	/**
	 * Returns the schema assumed for table names given without a schema prefix.
	 *
	 * @return string default schema name; 'public' unless changed via setDefaultSchema().
	 */
	public function getDefaultSchema()
	{
		return $this->_defaultSchema;
	}
84
85
	/**
	 * Splits a possibly schema-qualified table name into schema and table parts.
	 *
	 * Double quotes are stripped before splitting on '.'. When no schema
	 * prefix is present the default schema is used.
	 *
	 * @param string $table table name with optional schema name prefix.
	 * @return array tuple as ($schemaName, $tableName)
	 */
	protected function getSchemaTableName($table)
	{
		$segments = explode('.', str_replace('"', '', $table));

		// No dot in the name: fall back to the configured default schema.
		if (count($segments) <= 1) {
			return [$this->getDefaultSchema(), $segments[0]];
		}

		return [$segments[0], $segments[1]];
	}
97
98
	/**
	 * Get the column definitions for given table.
	 *
	 * Queries the PostgreSQL system catalogs for every live (non-dropped,
	 * attnum > 0) column of the table and feeds each row to processColumn().
	 *
	 * @param string $table table name, optionally prefixed with a schema name.
	 * @throws TDbException when the query yields no columns for the given name.
	 * @return TPgsqlTableInfo table information.
	 */
	protected function createTableInfo($table)
	{
		[$schemaName, $tableName] = $this->getSchemaTableName($table);

		// This query is made much more complex by the addition of the 'attisserial' field.
		// The subquery to get that field checks to see if there is an internally dependent
		// sequence on the field.
		//
		// The default expression is read through pg_get_expr(adbin, adrelid) and aliased
		// to "adsrc": the pg_attrdef.adsrc column itself was removed in PostgreSQL 12, so
		// selecting it directly fails on current servers.
		$sql =
<<<EOD
		SELECT
			a.attname,
			pg_catalog.format_type(a.atttypid, a.atttypmod) as type,
			a.atttypmod,
			a.attnotnull, a.atthasdef,
			pg_catalog.pg_get_expr(adef.adbin, adef.adrelid) AS adsrc,
			(
				SELECT 1 FROM pg_catalog.pg_depend pd, pg_catalog.pg_class pc
				WHERE pd.objid=pc.oid
				AND pd.classid=pc.tableoid
				AND pd.refclassid=pc.tableoid
				AND pd.refobjid=a.attrelid
				AND pd.refobjsubid=a.attnum
				AND pd.deptype='i'
				AND pc.relkind='S'
			) IS NOT NULL AS attisserial

		FROM
			pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_attrdef adef
			ON a.attrelid=adef.adrelid
			AND a.attnum=adef.adnum
			LEFT JOIN pg_catalog.pg_type t ON a.atttypid=t.oid
		WHERE
			a.attrelid = (SELECT oid FROM pg_catalog.pg_class WHERE relname=:table
				AND relnamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE
				nspname = :schema))
			AND a.attnum > 0 AND NOT a.attisdropped
		ORDER BY a.attnum
EOD;
		$this->getDbConnection()->setActive(true);
		$command = $this->getDbConnection()->createCommand($sql);
		$command->bindValue(':table', $tableName);
		$command->bindValue(':schema', $schemaName);
		$tableInfo = $this->createNewTableInfo($schemaName, $tableName);
		$index = 0;
		foreach ($command->query() as $col) {
			// Zero-based position of the column in attnum order.
			$col['index'] = $index++;
			$this->processColumn($tableInfo, $col);
		}
		// No rows means the name matched neither a table nor a view.
		if ($index === 0) {
			throw new TDbException('dbmetadata_invalid_table_view', $table);
		}
		return $tableInfo;
	}
155
156
	/**
	 * Creates an empty table-info object (no columns yet) for the given table.
	 *
	 * Both names are checked with assertIdentifier(), the view flag is set
	 * when getIsView() reports a view, and the primary/foreign key details
	 * come from getConstraintKeys().
	 *
	 * @param string $schemaName table schema name
	 * @param string $tableName table name.
	 * @return TPgsqlTableInfo
	 */
	protected function createNewTableInfo($schemaName, $tableName)
	{
		$info = [
			'SchemaName' => $this->assertIdentifier($schemaName),
			'TableName' => $this->assertIdentifier($tableName),
		];
		if ($this->getIsView($schemaName, $tableName)) {
			$info['IsView'] = true;
		}

		[$primary, $foreign] = $this->getConstraintKeys($schemaName, $tableName);
		$tableInfoClass = $this->getTableInfoClass();

		return new $tableInfoClass($info, $primary, $foreign);
	}
172
173
	/**
	 * Rejects identifiers that contain a double quote.
	 *
	 * @param string $name table name, schema name or column name.
	 * @throws TDbException when the name contains a double quote (").
	 * @return string the unchanged name when it is acceptable.
	 */
	protected function assertIdentifier($name)
	{
		if (strpos($name, '"') === false) {
			return $name;
		}

		$ref = 'http://www.postgresql.org/docs/7.4/static/sql-syntax.html#SQL-SYNTAX-IDENTIFIERS';
		throw new TDbException('dbcommon_invalid_identifier_name', $name, $ref);
	}
186
187
	/**
	 * Checks whether the named relation is a view (relkind 'v') in the given schema.
	 *
	 * @param string $schemaName table schema name
	 * @param string $tableName table name.
	 * @return bool true if exactly one matching view exists.
	 */
	protected function getIsView($schemaName, $tableName)
	{
		$sql = <<<EOD
		SELECT count(c.relname) FROM pg_catalog.pg_class c
		LEFT JOIN pg_catalog.pg_namespace n ON (n.oid = c.relnamespace)
		WHERE (n.nspname=:schema) AND (c.relkind = 'v'::"char") AND c.relname = :table
EOD;
		$this->getDbConnection()->setActive(true);
		$command = $this->getDbConnection()->createCommand($sql);
		$command->bindValue(':schema', $schemaName);
		$command->bindValue(':table', $tableName);

		$viewCount = (int) ($command->queryScalar());

		return $viewCount === 1;
	}
206
207
	/**
	 * Converts one catalog row into a TPgsqlTableColumn and registers it on the table info.
	 *
	 * Reads the keys 'attname', 'index', 'attnotnull', 'atttypmod', 'atthasdef',
	 * 'adsrc', 'attisserial' and 'type' from $col.
	 *
	 * @param TPgsqlTableInfo $tableInfo table information.
	 * @param array $col column information.
	 */
	protected function processColumn($tableInfo, $col)
	{
		$columnId = $col['attname']; //use column name as column Id

		// adsrc is NULL for columns without a default expression; normalise it to a
		// string so the string functions below never receive null.
		$defaultSource = (string) ($col['adsrc'] ?? '');

		$info = [
			'ColumnName' => '"' . $columnId . '"', //quote the column names!
			'ColumnId' => $columnId,
			'ColumnIndex' => $col['index'],
		];
		if (!$col['attnotnull']) {
			$info['AllowNull'] = true;
		}
		if (in_array($columnId, $tableInfo->getPrimaryKeys(), true)) {
			$info['IsPrimaryKey'] = true;
		}
		if ($this->isForeignKeyColumn($columnId, $tableInfo)) {
			$info['IsForeignKey'] = true;
		}

		// Provisional size from the type modifier; replaced below when the
		// formatted type carries an explicit "(n)" and is not a precision type.
		if ($col['atttypmod'] > 0) {
			$info['ColumnSize'] = $col['atttypmod'] - 4;
		}
		if ($col['atthasdef']) {
			$info['DefaultValue'] = $col['adsrc'];
		}
		if ($col['attisserial'] || strncmp($defaultSource, 'nextval(', 8) === 0) {
			$sequence = $this->getSequenceName($tableInfo, $defaultSource);
			if ($sequence !== null) {
				// A sequence-backed column exposes the sequence instead of the nextval() default.
				$info['SequenceName'] = $sequence;
				unset($info['DefaultValue']);
			}
		}

		$matches = [];
		if (preg_match('/\((\d+)(?:,(\d+))?+\)/', $col['type'], $matches)) {
			// Strip the "(n)" / "(p,s)" suffix to get the bare type name.
			$info['DbType'] = preg_replace('/\(\d+(?:,\d+)?\)/', '', $col['type']);
			if ($this->isPrecisionType($info['DbType'])) {
				$info['NumericPrecision'] = (int) ($matches[1]);
				if (count($matches) > 2) {
					$info['NumericScale'] = (int) ($matches[2]);
				}
			} else {
				$info['ColumnSize'] = (int) ($matches[1]);
			}
		} else {
			$info['DbType'] = $col['type'];
		}

		$tableInfo->Columns[$columnId] = new TPgsqlTableColumn($info);
	}
257
258
	/**
	 * Extracts the sequence name from a "nextval('...')" default expression.
	 *
	 * A name that already contains a dot is returned as is; otherwise it is
	 * prefixed with the table's schema name.
	 *
	 * @param TPgsqlTableInfo $tableInfo table information, used for its schema name.
	 * @param mixed $src default expression source; null is treated as an empty string.
	 * @return string|null sequence name if found, null otherwise.
	 */
	protected function getSequenceName($tableInfo, $src)
	{
		$matches = [];
		// Cast guards against a NULL default source, which preg_match() must not receive.
		if (!preg_match('/nextval\([^\']*\'([^\']+)\'[^\)]*\)/i', (string) $src, $matches)) {
			return null;
		}

		$sequence = $matches[1];
		if (strpos($sequence, '.') !== false) {
			return $sequence;
		}

		return $tableInfo->getSchemaName() . '.' . $sequence;
	}
274
275
	/**
	 * Tells whether a type's parenthesised number is a precision rather than a size.
	 *
	 * The comparison ignores case and surrounding whitespace.
	 *
	 * @param mixed $type database type name
	 * @return bool true if column type is "numeric", "interval" or begins with "time".
	 */
	protected function isPrecisionType($type)
	{
		$normalized = strtolower(trim($type));
		if ($normalized === 'numeric' || $normalized === 'interval') {
			return true;
		}

		return strpos($normalized, 'time') === 0;
	}
284
285
	/**
	 * Gets the primary and foreign key column details for the given table.
	 *
	 * The query returns foreign-key and check constraints plus the indexes
	 * backing unique/primary constraints; only rows of type 'p' (primary key)
	 * and 'f' (foreign key) are consumed here.
	 *
	 * @param string $schemaName schema name
	 * @param string $tableName table name.
	 * @return array tuple ($primary, $foreign)
	 */
	protected function getConstraintKeys($schemaName, $tableName)
	{
		// The constraint text comes from pg_get_constraintdef(oid) for every row:
		// the pg_constraint.consrc column was removed in PostgreSQL 12, so it can
		// no longer be selected directly.
		$sql =
<<<EOD
	SELECT conname, consrc, contype, indkey, indisclustered FROM (
			SELECT
					conname,
					pg_catalog.pg_get_constraintdef(oid) AS consrc,
					contype,
					conrelid AS relid,
					NULL AS indkey,
					FALSE AS indisclustered
			FROM
					pg_catalog.pg_constraint
			WHERE
					contype IN ('f', 'c')
			UNION ALL
			SELECT
					pc.relname,
					NULL,
					CASE WHEN indisprimary THEN
							'p'
					ELSE
							'u'
					END,
					pi.indrelid,
					indkey,
					pi.indisclustered
			FROM
					pg_catalog.pg_class pc,
					pg_catalog.pg_index pi
			WHERE
					pc.oid=pi.indexrelid
					AND EXISTS (
							SELECT 1 FROM pg_catalog.pg_depend d JOIN pg_catalog.pg_constraint c
							ON (d.refclassid = c.tableoid AND d.refobjid = c.oid)
							WHERE d.classid = pc.tableoid AND d.objid = pc.oid AND d.deptype = 'i' AND c.contype IN ('u', 'p')
			)
	) AS sub
	WHERE relid = (SELECT oid FROM pg_catalog.pg_class WHERE relname=:table
					AND relnamespace = (SELECT oid FROM pg_catalog.pg_namespace
					WHERE nspname=:schema))
	ORDER BY
			1
EOD;
		$this->getDbConnection()->setActive(true);
		$command = $this->getDbConnection()->createCommand($sql);
		$command->bindValue(':table', $tableName);
		$command->bindValue(':schema', $schemaName);
		$primary = [];
		$foreign = [];
		foreach ($command->query() as $row) {
			switch ($row['contype']) {
				case 'p':
					// indkey lists the attribute numbers of the key columns.
					$primary = $this->getPrimaryKeys($tableName, $schemaName, $row['indkey']);
					break;
				case 'f':
					// Definitions that cannot be parsed yield null and are skipped.
					if (($fkey = $this->getForeignKeys($row['consrc'])) !== null) {
						$foreign[] = $fkey;
					}
					break;
			}
		}
		return [$primary, $foreign];
	}
360
361
	/**
	 * Gets the primary key field names.
	 *
	 * @param string $tableName table name
	 * @param string $schemaName schema name
	 * @param string $columnIndex space-separated attribute numbers of the key columns.
	 * @return array primary key field names; empty when no attribute numbers are given.
	 */
	protected function getPrimaryKeys($tableName, $schemaName, $columnIndex)
	{
		// The attribute numbers are embedded in the SQL text because a list cannot
		// be bound to a single placeholder, so force every entry to an integer first.
		$attnums = array_map(
			'intval',
			preg_split('/\s+/', trim((string) $columnIndex), -1, PREG_SPLIT_NO_EMPTY)
		);
		// An empty list would produce the invalid SQL "IN ()".
		if ($attnums === []) {
			return [];
		}
		$index = implode(', ', $attnums);
		$sql =
<<<EOD
		SELECT attnum, attname FROM pg_catalog.pg_attribute WHERE
		attrelid=(
			SELECT oid FROM pg_catalog.pg_class WHERE relname=:table AND relnamespace=(
				SELECT oid FROM pg_catalog.pg_namespace WHERE nspname=:schema
			)
		)
				AND attnum IN ({$index})
EOD;
		$command = $this->getDbConnection()->createCommand($sql);
		$command->bindValue(':table', $tableName);
		$command->bindValue(':schema', $schemaName);
		$primary = [];
		foreach ($command->query() as $row) {
			$primary[] = $row['attname'];
		}

		return $primary;
	}
392
393
	/**
	 * Parses a "FOREIGN KEY (...) REFERENCES table(...)" definition.
	 *
	 * Double quotes are removed from the referenced table name; the column
	 * names are kept exactly as they appear in the definition.
	 *
	 * @param string $src pgsql foreign key definition
	 * @return array|null ['table' => referenced table, 'keys' => [local column => referenced column]],
	 * or null when $src is not a foreign key definition.
	 */
	protected function getForeignKeys($src)
	{
		$brackets = '\(([^\)]+)\)';
		$find = "/FOREIGN\s+KEY\s+{$brackets}\s+REFERENCES\s+([^\(]+){$brackets}/i";

		$matches = [];
		if (!preg_match($find, $src, $matches)) {
			return null;
		}

		// Pair the local and referenced columns up by position.
		$localColumns = preg_split('/,\s+/', $matches[1]);
		$referencedColumns = preg_split('/,\s+/', $matches[3]);
		$fkeys = [];
		foreach ($referencedColumns as $position => $referenced) {
			$fkeys[$localColumns[$position]] = $referenced;
		}

		return ['table' => str_replace('"', '', $matches[2]), 'keys' => $fkeys];
	}
412
413
	/**
	 * Checks whether a column is the local side of any foreign key of the table.
	 *
	 * @param string $columnId column name.
	 * @param TPgsqlTableInfo $tableInfo table information.
	 * @return bool true if column is a foreign key.
	 */
	protected function isForeignKeyColumn($columnId, $tableInfo)
	{
		foreach ($tableInfo->getForeignKeys() as $foreignKey) {
			// The keys of the 'keys' map are the local column names.
			$localColumns = array_keys($foreignKey['keys']);
			if (in_array($columnId, $localColumns)) {
				return true;
			}
		}

		return false;
	}
427
428
	/**
	 * Returns all base table names in the given schema.
	 *
	 * @param string $schema the schema of the tables. An empty string means the
	 * default schema (see getDefaultSchema()). For any schema other than the
	 * default one, the returned table names are prefixed with the schema name.
	 * @return array all table names in the schema.
	 */
	public function findTableNames($schema = 'public')
	{
		// The class declares no DEFAULT_SCHEMA constant; the configurable
		// default schema is the value every other method here relies on.
		$defaultSchema = $this->getDefaultSchema();
		if ($schema === '') {
			$schema = $defaultSchema;
		}
		$sql = <<<EOD
SELECT table_name, table_schema FROM information_schema.tables
WHERE table_schema=:schema AND table_type='BASE TABLE'
EOD;
		$this->getDbConnection()->setActive(true);
		$command = $this->getDbConnection()->createCommand($sql);
		$command->bindValue(':schema', $schema);
		$names = [];
		foreach ($command->queryAll() as $row) {
			if ($schema === $defaultSchema) {
				$names[] = $row['table_name'];
			} else {
				$names[] = $row['table_schema'] . '.' . $row['table_name'];
			}
		}
		return $names;
	}
456
}
457