1
|
|
|
from datetime import timedelta |
2
|
|
|
from socket import gethostname |
3
|
|
|
import sys |
4
|
|
|
from uuid import uuid4 |
5
|
|
|
|
6
|
|
|
from kazoo.client import KazooClient |
7
|
|
|
|
8
|
|
|
from st2actions.runners.pythonrunner import Action |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
class NonBlockingLease(Action):
    """
    Exclusive lease that does not block.

    An exclusive lease ensures that only one client at a time owns the lease.
    The action exits successfully if the lease was obtained and with code 1 otherwise.

    A common use case is a situation where a task should only run on a single host. In this case,
    the clients that did not obtain the lease should exit without performing the protected task.

    The lease stores time stamps using client clocks, and will therefore only work if client clocks
    are roughly synchronised. It uses UTC, and works across time zones and daylight savings.
    """

    # NOTE: the drift between the clocks of the hosts running cronjobs controlled by
    # this lease must be less than (job period - lease_time). For example, a cronjob
    # running every minute with a lease_time of 30s means that your clocks must all
    # be within 30s of each other or nobody will ever get the lease.
    #
    # Changing the threshold allows you to decide whether it's more important that
    # the job always runs (and sometimes twice) or if it's better to run only once
    # or not at all. For a job running every minute with a lease_time of 50, if
    # clocks drift by more than 10s, it will appear as though the lease is always
    # taken and your job will never run. (Conversely, with a lease_time of 10 and a
    # drift of >10s, two hosts will both get a lease, running your job twice.)

    def __init__(self, config):
        """
        Read the ZooKeeper connection settings from the action configuration.

        :param config: action configuration; the keys ``zookeeper_hosts`` and
                       ``zookeeper_root`` are required (a missing key raises KeyError).
        """
        super(NonBlockingLease, self).__init__(config)
        self.hosts = self.config['zookeeper_hosts']
        self.root = self.config['zookeeper_root']

    def run(self, lease_name, lease_time):
        """
        Try to obtain the lease ``lease_name`` for ``lease_time`` seconds.

        Returns normally (with None) when the lease was obtained; otherwise the
        process exits with status 1 via ``sys.exit``.

        :param lease_name: name of the lease; appended to the configured root path.
        :param lease_time: lease duration in seconds.
        """
        zk = KazooClient(hosts=self.hosts)
        zk.start()
        try:
            path = "%s/%s" % (self.root, lease_name)
            # unique id ensures only one action execution if multiple executions on the same host
            identifier = '%s: %s' % (gethostname(), uuid4())
            duration = timedelta(seconds=lease_time)
            lease = zk.NonBlockingLease(path, duration, identifier=identifier)
            if not lease:
                sys.exit(1)
        finally:
            # Always release the ZooKeeper session, including when SystemExit is
            # propagating. The lease is recorded as time stamps (see class
            # docstring), so it is expected to outlive this connection.
            zk.stop()
            zk.close()
52
|
|
|
|