1
|
|
|
package io.mcarle.strix; |
2
|
|
|
|
3
|
|
|
import io.mcarle.strix.annotation.Transactional;
import org.aspectj.lang.ProceedingJoinPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.touk.throwing.ThrowingFunction;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.Persistence;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
18
|
|
|
|
19
|
|
|
/** |
20
|
|
|
* Strix's main logic. |
21
|
|
|
*/ |
22
|
1 |
|
final class StrixManager { |
23
|
|
|
|
24
|
1 |
|
private static final Logger LOG = LoggerFactory.getLogger(TransactionalAspect.class); |
25
|
1 |
|
private static final Map<String, EntityManagerFactory> SESSION_FACTORY_STORE = new ConcurrentHashMap<>(); |
26
|
1 |
|
private static final Map<String, Map<String, String>> PERSISTENCE_PROPERTIES = new ConcurrentHashMap<>(); |
27
|
|
|
private static final String STRIX_DEFAULT_PERSISTENCE_UNIT = "DUMMY_VALUE"; |
28
|
1 |
|
static boolean STARTED = false; |
29
|
1 |
|
private static String DEFAULT_PERSISTENCE_UNIT = STRIX_DEFAULT_PERSISTENCE_UNIT; |
30
|
|
|
|
31
|
|
|
/** |
32
|
|
|
* Start strix with additional persistence properties and a default persistence unit. |
33
|
|
|
* |
34
|
|
|
* @param persistenceProperties Map of persistence unit to persistence properties map |
35
|
|
|
* @param defaultPersistenceUnit The default persistence unit |
36
|
|
|
*/ |
37
|
|
|
static void startup(Map<String, Map<String, String>> persistenceProperties, String defaultPersistenceUnit) { |
38
|
1 |
|
LOG.trace("Startup strix"); |
39
|
1 |
|
if (STARTED) { |
40
|
1 |
|
LOG.trace("Strix already running, shutdown"); |
41
|
1 |
|
shutdown(); |
42
|
|
|
} |
43
|
1 |
|
LOG.trace("Set default persistence unit to '{}'", defaultPersistenceUnit); |
44
|
1 |
|
DEFAULT_PERSISTENCE_UNIT = defaultPersistenceUnit; |
45
|
|
|
|
46
|
1 |
|
if (persistenceProperties != null) { |
47
|
1 |
|
LOG.trace("Save persistence properties"); |
48
|
1 |
|
persistenceProperties.keySet().forEach(key -> |
49
|
1 |
|
PERSISTENCE_PROPERTIES.put(key, Collections.unmodifiableMap(persistenceProperties.get(key))) |
50
|
|
|
); |
51
|
|
|
} |
52
|
1 |
|
STARTED = true; |
53
|
1 |
|
LOG.info("Strix started"); |
54
|
1 |
|
} |
55
|
|
|
|
56
|
|
|
/** |
57
|
|
|
* Shutdown strix, i.e. close all {@link EntityManagerFactory} and clear persistence properties. |
58
|
|
|
*/ |
59
|
|
|
static void shutdown() { |
60
|
1 |
|
LOG.trace("Shutdown strix"); |
61
|
1 |
|
STARTED = false; |
62
|
1 |
|
LOG.info("Close all open EntityManagerFactories."); |
63
|
1 |
|
SESSION_FACTORY_STORE.values().forEach(EntityManagerFactory::close); |
64
|
1 |
|
SESSION_FACTORY_STORE.clear(); |
65
|
1 |
|
LOG.debug("Restore initial default values"); |
66
|
1 |
|
DEFAULT_PERSISTENCE_UNIT = STRIX_DEFAULT_PERSISTENCE_UNIT; |
67
|
1 |
|
PERSISTENCE_PROPERTIES.clear(); |
68
|
1 |
|
} |
69
|
|
|
|
70
|
|
|
/** |
71
|
|
|
* Called whenever from the {@link TransactionalAspect} and ensures that the method runs in a transactional context, |
72
|
|
|
* i.e. ensures there is an open {@link EntityManager} when invoking {@link Strix#em()}. |
73
|
|
|
* |
74
|
|
|
* @param joinPoint The aspectj reference to the aspected method |
75
|
|
|
* @param transactional The {@link Transactional} annotation of the aspected method |
76
|
|
|
* @return The result of the aspected method |
77
|
|
|
* @throws Throwable If the aspected method throws an exception |
78
|
|
|
*/ |
79
|
|
|
static Object handleTransactionalMethodExecution( |
80
|
|
|
ProceedingJoinPoint joinPoint, |
81
|
|
|
Transactional transactional |
82
|
|
|
) throws Throwable { |
83
|
1 |
|
LOG.trace("Handle @Transactional method execution"); |
84
|
1 |
|
String persistenceUnit = transactional.persistenceUnit(); |
85
|
1 |
|
if (!PersistenceManager.isEntityManagerPresent()) { |
86
|
1 |
|
LOG.debug("No transaction active in current thread"); |
87
|
|
|
|
88
|
1 |
|
Class<? extends Throwable>[] noRollbackFor = transactional.noRollbackFor(); |
89
|
1 |
|
boolean readOnly = transactional.readOnly(); |
90
|
1 |
|
final int timeoutTime = transactional.timeout(); |
91
|
|
|
|
92
|
1 |
|
return executeWithTransaction( |
93
|
1 |
|
(em) -> joinPoint.proceed(), |
94
|
1 |
|
persistenceUnit, |
95
|
1 |
|
timeoutTime, |
96
|
1 |
|
noRollbackFor, |
97
|
1 |
|
readOnly |
98
|
|
|
); |
99
|
1 |
|
} else if (!PersistenceManager.isEntityManagerFromPU(persistenceUnit) || transactional.requiresNew()) { |
100
|
1 |
|
LOG.debug( |
101
|
1 |
|
"New EntityManager needed, as requiresNew ({}) or different peristence unit ({}) defined.", |
102
|
1 |
|
transactional.requiresNew(), |
103
|
1 |
|
persistenceUnit |
104
|
|
|
); |
105
|
1 |
|
FutureTask<Object> future = new FutureTask<>(() -> { |
106
|
|
|
try { |
107
|
1 |
|
return handleTransactionalMethodExecution(joinPoint, transactional); |
108
|
1 |
|
} catch (Exception ex) { |
109
|
1 |
|
throw ex; |
110
|
1 |
|
} catch (Throwable ex) { |
111
|
1 |
|
throw new TransactionalExecutionException(ex); |
112
|
|
|
} |
113
|
|
|
}); |
114
|
|
|
try { |
115
|
1 |
|
LOG.trace("Start execution in own thread"); |
116
|
1 |
|
Thread thread = new Thread(future); |
117
|
1 |
|
thread.start(); |
118
|
1 |
|
return future.get(); // Waits, till the thread finishes |
119
|
1 |
|
} catch (ExecutionException ee) { |
120
|
1 |
|
if (ee.getCause() instanceof TransactionalExecutionException) { |
121
|
1 |
|
throw ee.getCause().getCause(); |
122
|
|
|
} else { |
123
|
1 |
|
throw ee.getCause(); |
124
|
|
|
} |
125
|
|
|
} |
126
|
|
|
} else { |
127
|
1 |
|
LOG.trace("Already inside a transactional context, proceed method execution"); |
128
|
1 |
|
return joinPoint.proceed(); |
129
|
|
|
} |
130
|
|
|
} |
131
|
|
|
|
132
|
|
|
/** |
133
|
|
|
* Starts a thread which will close the {@code em} after the specified {@code timeoutTime}. |
134
|
|
|
* |
135
|
|
|
* @param timeoutTime Time in milliseconds |
136
|
|
|
* @param em The {@link EntityManager}, which may be used of the aspected method |
137
|
|
|
* @param transaction The {@link EntityTransaction}, which will be marked as rollback-only if the timeout is reached |
138
|
|
|
* @return The started timeout thread |
139
|
|
|
*/ |
140
|
|
|
static Thread startTimeoutChecker(final int timeoutTime, EntityManager em, EntityTransaction transaction) { |
141
|
1 |
|
LOG.trace("Starts the timeout thread with {}ms", timeoutTime); |
142
|
1 |
|
Thread thread = new Thread(() -> { |
143
|
|
|
try { |
144
|
1 |
|
Thread.currentThread().setName("STRIX-TT"); |
145
|
1 |
|
Thread.sleep(timeoutTime); |
146
|
1 |
|
LOG.trace("Timeout thread reached timeout time ({}ms)", timeoutTime); |
147
|
1 |
|
if (em.isOpen()) { |
148
|
1 |
|
if (transaction.isActive()) { |
149
|
1 |
|
LOG.trace("Mark the transaction to rollbackOnly"); |
150
|
1 |
|
transaction.setRollbackOnly(); |
151
|
|
|
} |
152
|
1 |
|
LOG.trace("Close EntityManager"); |
153
|
1 |
|
em.close(); |
154
|
|
|
} |
155
|
1 |
|
} catch (InterruptedException ex) { |
156
|
|
|
// Ignore InterruptedException |
157
|
|
|
} |
158
|
1 |
|
}); |
159
|
1 |
|
thread.start(); |
160
|
1 |
|
return thread; |
161
|
|
|
} |
162
|
|
|
|
163
|
|
|
/** |
164
|
|
|
* Checks if {@code t} is marked as an exception, for which no rollback should be done. |
165
|
|
|
* |
166
|
|
|
* @param noRollbackFor List of no-rollback-exceptions |
167
|
|
|
* @param t The actual exception of the aspected method |
168
|
|
|
* @return {@code false}, if {@code t} or a superclass of {@code t} is an exception specified in |
169
|
|
|
* {@code noRollbackFor}. Otherwise {@code true}. |
170
|
|
|
*/ |
171
|
|
|
private static boolean checkNeedForRollback(Class<? extends Throwable>[] noRollbackFor, Throwable t) { |
172
|
1 |
|
for (Class<? extends Throwable> exceptionClass : noRollbackFor) { |
173
|
1 |
|
if (exceptionClass.isAssignableFrom(t.getClass())) { |
174
|
1 |
|
LOG.trace("Exception {} is expected, i.e. no rollback is needed", t.getClass()); |
175
|
1 |
|
return false; |
176
|
|
|
} |
177
|
|
|
} |
178
|
1 |
|
return true; |
179
|
|
|
} |
180
|
|
|
|
181
|
|
|
/** |
182
|
|
|
* Executes the aspected method within a session, i.e. opens and closes an {@link EntityManager} before and after |
183
|
|
|
* execution. |
184
|
|
|
* |
185
|
|
|
* @param function The function, in which the aspected method will be executed |
186
|
|
|
* @param persistenceUnit The persistence unit to identify the {@link EntityManagerFactory} from which the |
187
|
|
|
* {@link EntityManager} will be created. |
188
|
|
|
* @return The result of the aspected method |
189
|
|
|
* @throws Throwable If the aspected method throws an exception |
190
|
|
|
*/ |
191
|
|
|
private static Object executeWithSession( |
192
|
|
|
ThrowingFunction<EntityManager, Object, Throwable> function, |
193
|
|
|
String persistenceUnit |
194
|
|
|
) throws Throwable { |
195
|
1 |
|
LOG.trace("Create new EntityManager from persistence unit {}", persistenceUnit); |
196
|
1 |
|
EntityManager em = getEntityManagerFactory(persistenceUnit).createEntityManager(); |
197
|
|
|
try { |
198
|
1 |
|
PersistenceManager.setEntityManager(persistenceUnit, em); |
199
|
1 |
|
return function.apply(em); |
200
|
1 |
|
} finally { |
201
|
1 |
|
PersistenceManager.clearEntityManager(); |
202
|
1 |
|
if (em.isOpen()) { |
203
|
1 |
|
LOG.trace("Close EntityManager"); |
204
|
1 |
|
em.close(); |
205
|
|
|
} |
206
|
1 |
|
} |
207
|
|
|
} |
208
|
|
|
|
209
|
|
|
/** |
210
|
|
|
* Executes the aspected method within a transaction, i.e. opens and commits or rollbacks an |
211
|
|
|
* {@link EntityTransaction} before and after execution. |
212
|
|
|
* |
213
|
|
|
* @param function The function, which should be executed |
214
|
|
|
* @param persistenceUnit The persistence unit to identify the {@link EntityManagerFactory} from which the |
215
|
|
|
* {@link EntityManager} will be created. |
216
|
|
|
* @param timeoutTime The specified timeout time |
217
|
|
|
* @param noRollbackFor The specified list of exceptions |
218
|
|
|
* @param readOnly The specified value for read-only |
219
|
|
|
* @return The result of the aspected method |
220
|
|
|
* @throws Throwable If the aspected method throws an exception |
221
|
|
|
*/ |
222
|
|
|
private static Object executeWithTransaction( |
223
|
|
|
ThrowingFunction<EntityManager, Object, Throwable> function, |
224
|
|
|
String persistenceUnit, |
225
|
|
|
int timeoutTime, |
226
|
|
|
Class<? extends Throwable>[] noRollbackFor, |
227
|
|
|
boolean readOnly |
228
|
|
|
) throws Throwable { |
229
|
1 |
|
return executeWithSession((em) -> { |
230
|
1 |
|
EntityTransaction transaction = em.getTransaction(); // Will never be invoked on JTA EM |
231
|
1 |
|
boolean rollback = false; |
232
|
1 |
|
Thread timeoutThread = null; |
233
|
|
|
try { |
234
|
1 |
|
LOG.trace("Start a new transaction"); |
235
|
1 |
|
transaction.begin(); |
236
|
1 |
|
if (readOnly) { |
237
|
1 |
|
LOG.trace("Set transaction to be read-only"); |
238
|
1 |
|
transaction.setRollbackOnly(); |
239
|
|
|
} |
240
|
1 |
|
if (timeoutTime > 0) { |
241
|
1 |
|
timeoutThread = startTimeoutChecker(timeoutTime, em, transaction); |
242
|
|
|
} |
243
|
1 |
|
return function.apply(em); |
244
|
1 |
|
} catch (Throwable t) { |
245
|
1 |
|
rollback = checkNeedForRollback(noRollbackFor, t); |
246
|
1 |
|
throw t; |
247
|
1 |
|
} finally { |
248
|
1 |
|
if (timeoutThread != null && timeoutThread.isAlive()) { |
249
|
1 |
|
LOG.trace("Interrupt timeout thread"); |
250
|
1 |
|
timeoutThread.interrupt(); |
251
|
|
|
} |
252
|
1 |
|
if (em.isOpen() && transaction.isActive()) { |
253
|
1 |
|
if (rollback || transaction.getRollbackOnly()) { |
254
|
1 |
|
LOG.trace( |
255
|
1 |
|
"Rollback transaction because of unexpected exception ({}) or marked as read-only ({})", |
256
|
1 |
|
rollback, |
257
|
1 |
|
transaction.getRollbackOnly() |
258
|
|
|
); |
259
|
1 |
|
transaction.rollback(); |
260
|
1 |
|
} else { |
261
|
1 |
|
LOG.trace("Commit transaction"); |
262
|
1 |
|
transaction.commit(); |
263
|
|
|
} |
264
|
|
|
} |
265
|
1 |
|
} |
266
|
1 |
|
}, persistenceUnit); |
267
|
|
|
} |
268
|
|
|
|
269
|
|
|
/** |
270
|
|
|
* Opens an {@link EntityManagerFactory} for the provided {@code persistenceUnit} if not already opened/cached. |
271
|
|
|
* |
272
|
|
|
* @param persistenceUnit The name of the persistence unit |
273
|
|
|
* @return The {@link EntityManagerFactory} for {@code persistenceUnit} |
274
|
|
|
*/ |
275
|
|
|
private static synchronized EntityManagerFactory getEntityManagerFactory(String persistenceUnit) { |
276
|
1 |
|
if (persistenceUnit.isEmpty() && DEFAULT_PERSISTENCE_UNIT != null) { |
277
|
1 |
|
persistenceUnit = DEFAULT_PERSISTENCE_UNIT; |
278
|
|
|
} |
279
|
1 |
|
if (!SESSION_FACTORY_STORE.containsKey(persistenceUnit)) { |
280
|
1 |
|
LOG.debug("Create new EntityManagerFactory for persistence unit {}", persistenceUnit); |
281
|
1 |
|
SESSION_FACTORY_STORE.put( |
282
|
1 |
|
persistenceUnit, |
283
|
1 |
|
Persistence.createEntityManagerFactory( |
284
|
1 |
|
persistenceUnit.isEmpty() ? null : persistenceUnit, |
285
|
1 |
|
PERSISTENCE_PROPERTIES.get(persistenceUnit) |
286
|
|
|
) |
287
|
|
|
); |
288
|
|
|
} |
289
|
1 |
|
return SESSION_FACTORY_STORE.get(persistenceUnit); |
290
|
|
|
} |
291
|
|
|
|
292
|
|
|
/** |
293
|
|
|
* An only internal used exception |
294
|
|
|
*/ |
295
|
|
|
private static class TransactionalExecutionException extends RuntimeException { |
296
|
|
|
|
297
|
|
|
private TransactionalExecutionException(Throwable cause) { |
298
|
1 |
|
super(cause); |
299
|
1 |
|
} |
300
|
|
|
|
301
|
|
|
} |
302
|
|
|
} |
303
|
|
|
|