Passed
Push — main ( 2cd17f...238ec8 )
by Lorenzo
01:26 queued 16s
created

ExecutorImpl.getExecutionPhase   A

Complexity

Conditions 3

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
c 0
b 0
f 0
ccs 2
cts 2
cp 1
rs 10
cc 3
crap 3
1 11
import EventEmitter from 'events';
2 11
import { logger } from '@/core';
3 11
import {
4
  isError, Result, wrap,
5
} from '@/core/errors';
6 11
import { Task } from './Task';
7
8 11
/**
 * Lifecycle phases of the executor, keyed by their numeric index.
 *
 * The index defines the execution order: `ExecutorImpl.execute` sorts the
 * entries by key and runs every phase except `exit`, which is only run by
 * `ExecutorImpl.stopLifecycle`.
 */
const phases = {
  0: 'start',
  1: 'register',
  2: 'routing',
  3: 'init',
  4: 'run',
  5: 'exit',
} as const;

/** Union of all phase names declared in `phases`. */
export type ExecutorPhase = typeof phases[keyof typeof phases];
17
18
/**
 * Event map of the executor.
 *
 * - One event per phase name, whose single argument is the list of results
 *   produced by that phase's tasks.
 * - `error`, whose single argument is the error of a failed task.
 */
type ExecutorEventMap = {
  [phase in ExecutorPhase]: [Result<void>[]];
} & {
  error: [Error];
};
23
24
/**
 * Phase-based task runner built on Node's `EventEmitter`.
 *
 * Tasks are registered per phase through `setExecution`. `execute` runs every
 * phase except `exit` in ascending phase-index order, one phase at a time;
 * `stopLifecycle` runs the `exit` phase and resets the executor. When a phase
 * finishes, an event named after the phase is emitted with its results.
 */
class ExecutorImpl extends EventEmitter<ExecutorEventMap> {
  public readonly PHASES = phases;

  /** Tasks registered for each phase, in registration order. */
  private tasks: Map<ExecutorPhase, Array<Task>> = new Map();

  /** Settled results of the phases that have already been executed. */
  private phasePromises: Map<ExecutorPhase, Promise<Result<void>[]>> = new Map();

  /** Set by `execute`, reset by `stopLifecycle`. */
  private started = false;

  /** Promise of the run started by `startLifecycle`, or null when idle. */
  execution: Promise<Result<void>[]> | null = null;

  /** Lookup of tasks by the optional context key given at registration. */
  private taskMap: Map<any, Task> = new Map();

  /** `beforeExit` listener installed on `process` by this executor, if any. */
  private beforeExitHandler: (() => Promise<void>) | null = null;

  /** SIGTERM/SIGINT listener installed on `process` by this executor, if any. */
  private shutdownHandler: (() => Promise<void>) | null = null;

  constructor() {
    super();
  }

  /**
   * Registers a task to be executed in a given phase.
   * @param phase {ExecutorPhase} phase in which the task should be executed
   * @param taskFn {() => Promise<void> | void} function to be executed, can return a Promise or void
   * @param contextKey optional key under which the task can later be retrieved with `getTask`
   * @param order execution order inside the phase (default 0); lower values run first,
   * tasks sharing the same order run concurrently
   */
  setExecution(phase: ExecutorPhase, taskFn: () => Promise<void> | void, contextKey?: any, order = 0) {
    const task = new Task({ task: taskFn, phase, order });
    // Check for presence, not truthiness, so that falsy keys such as 0, '' or
    // false are still registered and can be found by getTask.
    if (contextKey !== undefined && contextKey !== null) {
      this.taskMap.set(contextKey, task);
    }
    const phaseTasks = this.tasks.get(phase) ?? [];
    phaseTasks.push(task);
    this.tasks.set(phase, phaseTasks);
  }

  /**
   * Returns a promise for the results of a phase.
   * If the phase has already been executed its stored results are returned,
   * otherwise the promise resolves the next time the phase event is emitted.
   * @param phase {ExecutorPhase}
   * @returns {Promise<Result<void>[]>}
   */
  getExecutionPhase(phase: ExecutorPhase): Promise<Result<void>[]> {
    return this.phasePromises.get(phase) ?? new Promise<Result<void>[]>((resolve) => {
      this.once(phase, resolve);
    });
  }

  /**
   * Returns the task registered with the given context key, if any.
   * @param contextKey key passed to `setExecution`
   */
  getTask(contextKey: any) {
    return this.taskMap.get(contextKey);
  }

  /**
   * Executes all registered tasks in a defined sequence based on phases.
   * Every phase except `exit` is run, in ascending phase-index order, and each
   * phase is awaited before the next one starts.
   * @returns the results of all executed phases, concatenated in phase order
   */
  async execute(): Promise<Result<void, Error>[]> {
    this.started = true;
    const phasesToExecute = Object.entries(phases)
      .filter(([, phase]) => phase !== 'exit')
      .map(([idx, phase]) => [Number(idx), phase] as const)
      .sort(([a], [b]) => a - b)
      .map(([, phase]) => phase);

    const results: Result<void, Error>[] = [];
    for (const phase of phasesToExecute) {
      const phaseResults = await this.executePhase(phase);
      results.push(...phaseResults);
    }

    return results;
  }

  /**
   * Groups task functions by their execution order
   * @param tasks
   * @returns {{ [order: number]: (() => Promise<void> | void)[] }}
   */
  private groupByOrder(tasks: Array<Task>): { [order: number]: (() => Promise<void> | void)[] } {
    const groups: { [order: number]: (() => Promise<void> | void)[] } = {};
    for (const { order, task } of tasks) {
      (groups[order] ??= []).push(task);
    }
    return groups;
  }

  /**
   * Executes all tasks in a given phase.
   * Order groups run one after another in ascending order; the tasks inside a
   * group are started together and awaited with `Promise.all`. Once the phase
   * is done its results are stored and the phase event is emitted.
   * @param phase {ExecutorPhase}
   * @returns {Promise<Result<void>[]>}
   */
  private async executePhase(phase: ExecutorPhase) {
    const tasksToExecute = this.tasks.get(phase) ?? [];
    logger.debug(`Executing phase ${phase} with ${tasksToExecute.length} tasks`);

    const groupedTasks = this.groupByOrder(tasksToExecute);

    const allResults: Result<void>[] = [];

    // Object keys are strings, so compare them numerically.
    const sortedGroups = Object.entries(groupedTasks)
      .sort(([a], [b]) => Number(a) - Number(b));

    for (const [, tasks] of sortedGroups) {
      const wrappedTasks = tasks.map((task) => wrap(task));
      const groupResults = await Promise.all(wrappedTasks);

      allResults.push(...groupResults);
    }
    const processedResults = this.processPhaseResults(allResults, phase);

    // Store the results before emitting so that getExecutionPhase calls made
    // from a listener already see the phase as completed.
    const phasePromise = Promise.resolve(processedResults);
    this.phasePromises.set(phase, phasePromise);

    const result = await phasePromise;
    this.emit(phase, result);

    return result;
  }

  /**
   * Logs every failed result of a phase and re-emits its error as an `error` event.
   * NOTE: Node's EventEmitter throws when `error` is emitted while no `error`
   * listener is attached, in which case the failure propagates to the caller.
   * @param results results of the phase's tasks
   * @param phase {ExecutorPhase} phase the results belong to
   * @returns the results, unchanged
   */
  private processPhaseResults(results: Result<void>[], phase: ExecutorPhase): Result<void>[] {
    return results.map((result, index) => {
      if (isError(result)) {
        logger.error(new Error(`Task ${index} in phase ${phase} failed`, { cause: result.error }));
        this.emit('error', result.error);
      }
      return result;
    });
  }

  /** `beforeExit` handler: runs the exit phase and resets the executor. */
  private async beforeExit() {
    await this.stopLifecycle();
  }

  /** SIGTERM/SIGINT handler: stops the lifecycle, then exits with code 0. */
  private async gracefulShutdown() {
    logger.debug('Graceful shutdown');
    await this.stopLifecycle();
    logger.info('Graceful shutdown completed');
    process.exit(0);
  }

  /**
   * Installs this executor's `beforeExit`, SIGTERM and SIGINT listeners on
   * `process`, keeping references so they can be removed individually.
   */
  private attachProcessHandlers(): void {
    // Never stack a second set of listeners on top of a previous one.
    this.detachProcessHandlers();
    const onBeforeExit = this.beforeExit.bind(this);
    const onShutdown = this.gracefulShutdown.bind(this);
    process.on('beforeExit', onBeforeExit);
    process.on('SIGTERM', onShutdown);
    process.on('SIGINT', onShutdown);
    this.beforeExitHandler = onBeforeExit;
    this.shutdownHandler = onShutdown;
  }

  /**
   * Removes only the `process` listeners installed by this executor, leaving
   * listeners registered by the application or other libraries untouched.
   */
  private detachProcessHandlers(): void {
    if (this.beforeExitHandler) {
      process.removeListener('beforeExit', this.beforeExitHandler);
      this.beforeExitHandler = null;
    }
    if (this.shutdownHandler) {
      process.removeListener('SIGTERM', this.shutdownHandler);
      process.removeListener('SIGINT', this.shutdownHandler);
      this.shutdownHandler = null;
    }
  }

  /**
   * Starts the lifecycle of the ExpressBeans application.
   * The start is deferred with `setImmediate`; process listeners for
   * `beforeExit`, SIGTERM and SIGINT are installed and `execute` is called.
   * If execution has already been started, the function does nothing.
   * @returns {void}
   */
  startLifecycle(): void {
    setImmediate(() => {
      if (this.started) {
        return;
      }
      logger.debug('Starting lifecycle');
      this.attachProcessHandlers();
      this.execution = this.execute();
    });
  }

  /**
   * Stops the lifecycle of the ExpressBeans application.
   * Runs the `exit` phase, then clears all tasks, stored phase results and
   * executor listeners, and removes the process listeners installed by
   * `startLifecycle`.
   * USE ONLY IF YOU KNOW WHAT YOU ARE DOING
   * @returns {Promise<void>}
   */
  async stopLifecycle(): Promise<void> {
    logger.debug('Stopping lifecycle');
    await this.executePhase('exit');
    this.phasePromises.clear();
    this.tasks.clear();
    // Drop keyed lookups too, so getTask cannot return tasks of a stopped lifecycle.
    this.taskMap.clear();
    this.started = false;
    this.execution = null;
    this.removeAllListeners();
    logger.debug('Lifecycle stopped');
    this.detachProcessHandlers();
  }
}
194
195
/**
 * Public type of the exported executor.
 * NOTE(review): this intersects the constructor type with the instance type,
 * but the exported value is a plain `ExecutorImpl` instance, so constructor-side
 * members promised by the type (e.g. `new`, `prototype`) do not exist at
 * runtime. Kept as-is to avoid changing the exported type.
 */
type ExecutorType = typeof ExecutorImpl & ExecutorImpl;

/** Lazily created shared instance. */
let instance: ExecutorImpl | null = null;

/** Returns the shared executor, creating it on first call. */
function getInstance(): ExecutorType {
  instance ??= new ExecutorImpl();
  return instance as ExecutorType;
}

/** Application-wide executor singleton. */
export const Executor: ExecutorType = getInstance();
204