Workflow::arrayToMigration()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
c 0
b 0
f 0
nc 1
nop 1
dl 0
loc 10
rs 10
1
<?php
2
3
namespace Kaliop\eZWorkflowEngineBundle\Core\StorageHandler\Database;
4
5
use Kaliop\eZMigrationBundle\Core\StorageHandler\Database\Migration as StorageMigration;
6
use Kaliop\eZMigrationBundle\API\Value\Migration;
7
use Kaliop\eZMigrationBundle\API\Value\MigrationDefinition;
8
use Kaliop\eZMigrationBundle\API\ConfigResolverInterface;
9
use Kaliop\eZWorkflowEngineBundle\API\Value\Workflow as APIWorkflow;
10
use eZ\Publish\Core\Persistence\Database\DatabaseHandler;
11
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
12
use Doctrine\DBAL\Schema\Schema;
13
14
/**
 * Database storage handler for workflows, implemented as a specialization of the migrations storage handler.
 *
 * Each stored row holds the columns listed in $fieldList: the same data tracked for a migration, plus the name of
 * the signal associated with the workflow definition.
 *
 * @todo add methods aliases using 'Workflow' in place of 'Migration'
 */
class Workflow extends StorageMigration
{
    /** @var string comma-separated list of the columns of the workflows table */
    protected $fieldList = 'migration, md5, path, execution_date, status, execution_error, signal_name';

    /**
     * Only differs from the parent constructor in the default value used for $tableNameParameter.
     *
     * @param DatabaseHandler $dbHandler
     * @param string $tableNameParameter
     * @param ConfigResolverInterface $configResolver
     * @throws \Exception
     */
    public function __construct(DatabaseHandler $dbHandler, $tableNameParameter = 'kaliop_workflows', ConfigResolverInterface $configResolver = null)
    {
        parent::__construct($dbHandler, $tableNameParameter, $configResolver);
    }

    /**
     * Not supported for workflows: this method always throws.
     *
     * @param MigrationDefinition $migrationDefinition
     * @return APIWorkflow never returned in practice
     * @throws \Exception always
     */
    public function addMigration(MigrationDefinition $migrationDefinition)
    {
        throw new \Exception("Can not add workflows to the database if not when starting them");
    }

    /**
     * Stores in the database a workflow, in the given status, under a freshly generated unique name.
     *
     * If a row with the generated name already exists and is not started/done/skipped, its execution date, status
     * and error are reset; otherwise a new row is inserted.
     *
     * @param MigrationDefinition $migrationDefinition
     * @param int $status the status to store; when it equals Migration::STATUS_SKIPPED no execution date is stored
     * @param string $action only used to build the message of the exceptions thrown
     * @param bool $force currently ignored
     * @return APIWorkflow
     * @throws \Exception if a workflow with the same generated name is already started, done or skipped, or on
     *                    database errors
     *
     * @todo should we handle $force ? Or maybe throw if it is not false...
     */
    protected function createMigration(MigrationDefinition $migrationDefinition, $status, $action, $force = false)
    {
        $this->createTableIfNeeded();

        $workflowName = $this->getWorkflowName($migrationDefinition->name);

        // select for update

        // annoyingly enough, neither Doctrine nor EZP provide built in support for 'FOR UPDATE' in their query builders...
        // at least the doctrine one allows us to still use parameter binding when we add our sql particle
        $conn = $this->dbHandler->getConnection();

        $qb = $conn->createQueryBuilder();
        $qb->select('*')
            ->from($this->tableName, 'm')
            ->where('migration = ?');
        $sql = $qb->getSQL() . ' FOR UPDATE';

        // statuses of an existing row which prevent the workflow from being (re)started, with the matching message tail
        $blockingStatuses = array(
            Migration::STATUS_STARTED => 'is already executing',
            Migration::STATUS_DONE => 'was already executed',
            Migration::STATUS_SKIPPED => 'was already skipped',
        );

        $migration = null;
        $conflict = null;

        $conn->beginTransaction();
        try {
            $stmt = $conn->executeQuery($sql, array($workflowName));
            $existingMigrationData = $stmt->fetch(\PDO::FETCH_ASSOC);
            $exists = is_array($existingMigrationData);

            if ($exists) {
                // fail if it was already executing, already done or already skipped.
                // NB: loose comparison on purpose, as the value fetched from the db might be a string
                foreach ($blockingStatuses as $blockingStatus => $reason) {
                    if ($existingMigrationData['status'] == $blockingStatus) {
                        $conflict = $reason;
                        break;
                    }
                }

                if ($conflict === null) {
                    // workflow exists - start it.
                    // The column holding the workflow name is 'migration' (see createTable): there is no 'name' column
                    $workflowName = $existingMigrationData['migration'];

                    $migration = $this->newWorkflowValue($workflowName, $migrationDefinition, $status);
                    $conn->update(
                        $this->tableName,
                        array(
                            'execution_date' => $migration->executionDate,
                            'status' => $status,
                            'execution_error' => null
                        ),
                        array('migration' => $workflowName)
                    );
                }
            }

            // commit in all cases (also when nothing was written), to release the lock and avoid deadlocks
            $conn->commit();
        } catch (\Exception $e) {
            // do not leave the transaction (and the lock) dangling when the db layer fails
            $conn->rollBack();
            throw $e;
        }

        if ($conflict !== null) {
            throw new \Exception("Workflow '{$migrationDefinition->name}' can not be $action as it $conflict");
        }

        if (!$exists) {
            // migration did not exist. Create it! NB: this happens after the transaction has been committed
            $migration = $this->newWorkflowValue($workflowName, $migrationDefinition, $status);
            $conn->insert($this->tableName, $this->migrationToArray($migration));
        }

        return $migration;
    }

    /**
     * Builds the value object for a workflow which is being stored with the given status.
     *
     * @param string $workflowName
     * @param MigrationDefinition $migrationDefinition
     * @param int $status
     * @return APIWorkflow
     */
    private function newWorkflowValue($workflowName, MigrationDefinition $migrationDefinition, $status)
    {
        return new APIWorkflow(
            $workflowName,
            md5($migrationDefinition->rawDefinition),
            $migrationDefinition->path,
            // do not set migration start date if we are skipping it
            ($status == Migration::STATUS_SKIPPED ? null : time()),
            $status,
            null,
            $migrationDefinition->signalName
        );
    }

    /**
     * Not supported for workflows: this method always throws.
     *
     * @param MigrationDefinition $migrationDefinition
     * @throws \Exception always
     */
    public function skipMigration(MigrationDefinition $migrationDefinition)
    {
        throw new \Exception("Can not tag workflows in the database as to be skipped");
    }

    /**
     * Creates the workflows table, executing the sql which Doctrine generates for the current database platform.
     */
    public function createTable()
    {
        /** @var \Doctrine\DBAL\Schema\AbstractSchemaManager $sm */
        $sm = $this->dbHandler->getConnection()->getSchemaManager();
        $dbPlatform = $sm->getDatabasePlatform();

        $schema = new Schema();

        $t = $schema->createTable($this->tableName);
        $t->addColumn('migration', 'string', array('length' => 255));
        $t->addColumn('path', 'string', array('length' => 4000));
        $t->addColumn('md5', 'string', array('length' => 32));
        $t->addColumn('execution_date', 'integer', array('notnull' => false));
        // NB: the option name has to be exactly 'default', with no trailing whitespace
        $t->addColumn('status', 'integer', array('default' => Migration::STATUS_TODO));
        $t->addColumn('execution_error', 'string', array('length' => 4000, 'notnull' => false));
        $t->addColumn('signal_name', 'string', array('length' => 4000, 'notnull' => false));
        $t->setPrimaryKey(array('migration'));
        // in case users want to look up migrations by their full path
        // NB: disabled for the moment, as it causes problems on some versions of mysql which limit index length to 767 bytes,
        // and 767 bytes can be either 255 chars or 191 chars depending on charset utf8 or utf8mb4...
        //$t->addIndex(array('path'));

        /// @todo add support for utf8mb4 charset

        foreach ($schema->toSql($dbPlatform) as $sql) {
            $this->dbHandler->exec($sql);
        }
    }

    /**
     * Converts a workflow value object into an array keyed by the names of the table columns.
     *
     * @param Migration $migration expected to also expose a signalName member
     * @return array
     */
    protected function migrationToArray(Migration $migration)
    {
        return array(
            'migration' => $migration->name,
            'md5' => $migration->md5,
            'path' => $migration->path,
            'execution_date' => $migration->executionDate,
            'status' => $migration->status,
            'execution_error' => $migration->executionError,
            'signal_name' => $migration->signalName
        );
    }

    /**
     * Converts a table row into a workflow value object.
     *
     * @param array $data must have all of the keys: migration, md5, path, execution_date, status, execution_error,
     *                    signal_name
     * @return APIWorkflow
     */
    protected function arrayToMigration(array $data)
    {
        return new APIWorkflow(
            $data['migration'],
            $data['md5'],
            $data['path'],
            $data['execution_date'],
            $data['status'],
            $data['execution_error'],
            $data['signal_name']
        );
    }

    /**
     * Generates the name under which a workflow which is being started gets stored.
     *
     * @param string $workflowDefinitionName
     * @return string currently YYYYMMDDHHmmSSuuu/PID/$workflowDefinitionName, where uuu are the milliseconds
     * @todo for strict sorting across leap hours we should use unix timestamp
     * @bug if we have a cluster of servers, we might hit a pid collision...
     */
    protected function getWorkflowName($workflowDefinitionName)
    {
        // microtime() returns the string "0.uuuuuuuu sssssssss": fractional part first, then the unix timestamp
        $mtime = explode(' ', microtime());
        // date() expects an integer timestamp; from the fractional part we keep the 3 digits after '0.'
        $time = date('YmdHis', (int)$mtime[1]) . substr($mtime[0], 2, 3);

        return $time . '/' . getmypid() . '/' . $workflowDefinitionName;
    }
}
203