Completed
Push — master ( ab5ee9...632855 )
by John
01:33
created

Subscription.uri_join()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 4
rs 10
1
require 'base64'
2
require 'logger'
3
require 'multi_json'
4
require 'observer'
5
require 'openssl'
6
require 'pubnub'
7
8
module RingCentralSdk::REST
9
  class Subscription
10
    include Observable
11
12
    RENEW_HANDICAP = 60
13
14
    attr_reader :event_filters
15
16
    # Builds a subscription that is not yet registered with the server.
    #
    # @param client [Object] SDK client; stored as-is. Other methods of this
    #   class call `http` and `create_urls` on it.
    def initialize(client)
      @_client        = client
      @_pubnub        = nil
      @_timeout       = nil
      @event_filters  = []
      @_logger_prefix = " -- #{self.class.name}: "
      # Start from the blank template so the hash keys always exist.
      @_subscription  = nil_subscription
    end
24
25
    # Returns a blank subscription record used as the "not subscribed" state.
    #
    # A new Hash (including a new nested 'deliveryMode' Hash) is built on every
    # call, so callers may mutate the result freely.
    #
    # @return [Hash] subscription template with empty/zero values
    def nil_subscription()
      {
        'eventFilters'    => [],
        'expirationTime'  => '', # 2014-03-12T19:54:35.613Z
        'expiresIn'       => 0,
        'deliveryMode'    => {
          'transportType' => 'PubNub',
          'encryption'    => false,
          'address'       => '',
          'subscriberKey' => '',
          'secretKey'     => ''
        },
        'id'              => '',
        'creationTime'    => '', # 2014-03-12T19:54:35.613Z
        'status'          => '', # Active
        'uri'             => ''
      }
    end
43
44
    # Reader for the PubNub client stored in @_pubnub.
    #
    # @return [Object, nil] whatever is currently held in @_pubnub
    def pubnub
      @_pubnub
    end
47
48
    # Renews the subscription when it is alive, otherwise creates a new one.
    #
    # @param events [Array, nil] passed through unchanged to renew/subscribe
    # @return [Object] the result of `renew` or `subscribe`
    def register(events = nil)
      if alive?
        renew(events)
      else
        subscribe(events)
      end
    end
51
52
    # Appends event filters to the current list.
    #
    # Each element of `events` is added individually, so @event_filters stays
    # a flat list (pushing the array itself would nest it as one element).
    #
    # @param events [Array] event filters to append
    # @return [Array, nil] the updated filter list, or nil when `events` is empty
    # @raise [RuntimeError] when `events` is not an Array
    def add_events(events)
      raise 'Events is not an array.' unless events.is_a?(Array)

      @event_filters.concat(events) unless events.empty?
    end
58
59
    # Replaces the whole event filter list.
    #
    # @param events [Array] new event filters; stored without copying
    # @raise [RuntimeError] when `events` is not an Array
    def set_events(events)
      raise 'Events is not an array.' unless events.is_a?(Array)

      @event_filters = events
    end
65
66
    # Creates a subscription on the server and attaches to its PubNub channel.
    #
    # POSTs the current event filters to the 'subscription' endpoint, stores
    # the response body as the subscription, subscribes at PubNub and notifies
    # observers with the response. On any StandardError the local state is
    # reset, observers are notified with the error, and a RuntimeError is
    # raised.
    #
    # @param events [Array, nil] when an Array, replaces the event filters first
    # @return [Object] the HTTP response
    # @raise [RuntimeError] 'Events are undefined' when there are no filters,
    #   or 'Subscribe HTTP Request Error: ...' when the request/setup fails
    def subscribe(events=nil)
      set_events(events) if events.is_a?(Array)

      if !@event_filters.is_a?(Array) || @event_filters.empty?
        raise 'Events are undefined'
      end

      begin
        response = @_client.http.post do |req|
          req.url 'subscription'
          req.headers['Content-Type'] = 'application/json'
          req.body = {
            eventFilters: @_client.create_urls(@event_filters),
            deliveryMode: {
              transportType: 'PubNub'
            }
          }
        end
        set_subscription(response.body)
        _subscribe_at_pubnub
        changed
        notify_observers(response)
        response
      rescue StandardError => e
        # Drop any half-built state before telling observers about the failure.
        reset
        changed
        notify_observers(e)
        raise "Subscribe HTTP Request Error: #{e}"
      end
    end
96
97
    # Renews the current subscription on the server.
    #
    # POSTs to "<subscription uri>/renew", stores the response body as the
    # subscription and notifies observers with the response. On any
    # StandardError the error is printed, local state is reset, observers are
    # notified with the error, and a RuntimeError carrying the underlying
    # error text is raised (same message shape as `subscribe`).
    #
    # @param events [Array, nil] when an Array, replaces the event filters first
    # @return [Object] the HTTP response
    # @raise [RuntimeError] 'Subscription is not alive', 'Events are undefined',
    #   or 'Renew HTTP Request Error: ...' when the request fails
    def renew(events = nil)
      set_events(events) if events.is_a?(Array)

      raise 'Subscription is not alive' unless alive?

      if !@event_filters.is_a?(Array) || @event_filters.empty?
        raise 'Events are undefined'
      end
      _clear_timeout

      begin
        response = @_client.http.post do |req|
          req.url uri_join(@_subscription['uri'], 'renew')
          req.headers['Content-Type'] = 'application/json'
        end

        set_subscription(response.body)
        changed
        notify_observers(response)
        response
      rescue StandardError => e
        puts "RingCentralSdk::REST::Subscription: RENEW_ERROR #{e}"
        reset
        changed
        notify_observers(e)
        # Keep the cause in the message so callers can see why renewal failed.
        raise "Renew HTTP Request Error: #{e}"
      end
    end
128
129
    # Deletes the subscription on the server and clears local state.
    #
    # Sends DELETE to "subscription/<id>", then resets and notifies observers
    # with the response body. If the request (or anything after it) raises a
    # StandardError, the state is still reset and observers receive the error
    # instead; the error is not re-raised.
    #
    # @return [Object] the HTTP response on success
    # @raise [RuntimeError] 'Subscription is not alive' when not subscribed
    def remove
      raise 'Subscription is not alive' unless alive?

      # Explicit begin: the liveness error above must not be rescued here.
      begin
        response = @_client.http.delete do |req|
          req.url "subscription/#{@_subscription['id']}"
        end
        reset
        changed
        notify_observers(response.body)
        return response
      rescue StandardError => error
        reset
        changed
        notify_observers(error)
      end
    end
148
149
    # Tells whether the stored subscription has usable delivery details.
    #
    # The subscription counts as alive when its 'deliveryMode' has a non-nil,
    # non-false 'subscriberKey' and an 'address' with length greater than zero.
    # A missing or non-Hash subscription/deliveryMode is treated as not alive
    # rather than raising.
    #
    # @return [Boolean]
    def alive?
      delivery_mode = @_subscription.is_a?(Hash) ? @_subscription['deliveryMode'] : nil
      return false unless delivery_mode.is_a?(Hash)
      return false unless delivery_mode['subscriberKey']

      address = delivery_mode['address']
      address ? address.length > 0 : false
    end
158
159
    # Reader for the current subscription record.
    #
    # @return [Object] the value held in @_subscription (not a copy)
    def subscription()
      @_subscription
    end
162
163
    # Stores a new subscription record and restarts the renewal timer.
    #
    # The old timer is cleared before the record is swapped, and a new timer
    # is armed afterwards so that it is based on the new record.
    #
    # @param data [Object] subscription record; stored without copying
    def set_subscription(data)
      _clear_timeout()
      @_subscription = data
      _set_timeout()
    end
168
169
    # Returns the object to its unsubscribed state.
    #
    # Stops the renewal timer, unsubscribes at PubNub while the old record is
    # still in place, then replaces the record with a blank template.
    def reset()
      _clear_timeout
      _unsubscribe_at_pubnub
      @_subscription = nil_subscription
    end
174
175
    # Tears the subscription down locally; simply delegates to `reset`.
    def destroy()
      reset
    end
178
179
    # Creates a PubNub client for the current subscription and subscribes to
    # its channel.
    #
    # The client is built from deliveryMode['subscriberKey'] and stored in
    # @_pubnub; the channel is deliveryMode['address']. Incoming envelopes are
    # forwarded to `_notify` with their `msg`; the status callbacks only print
    # to stdout.
    #
    # @raise [RuntimeError] 'Subscription is not alive' when not subscribed
    def _subscribe_at_pubnub
      raise 'Subscription is not alive' unless alive?

      delivery_mode = @_subscription['deliveryMode']

      @_pubnub = new_pubnub(delivery_mode['subscriberKey'], false, '')

      on_message = lambda { |envelope|
        _notify(envelope.msg)
        # NOTE(review): _notify already calls changed + notify_observers; this
        # extra `changed` is kept from the original, purpose unclear.
        changed
      }

      @_pubnub.subscribe(
        channel:    delivery_mode['address'],
        callback:   on_message,
        error:      lambda { |envelope| puts('ERROR: ' + envelope.msg.to_s) },
        connect:    lambda { |_envelope| puts('CONNECTED') },
        reconnect:  lambda { |_envelope| puts('RECONNECTED') },
        disconnect: lambda { |_envelope| puts('DISCONNECTED') }
      )
    end
202
203
    # Decrypts an incoming message and hands it to all observers.
    #
    # Prints the current observer count to stdout before doing so.
    #
    # @param message [Object] raw message as delivered by PubNub
    def _notify(message)
      puts " -- RingCentralSdk::REST::Subscription: Notify #{count_observers} observers"

      payload = _decrypt(message)
      changed
      notify_observers(payload)
    end
212
213
    # Decrypts a message when the subscription uses encryption.
    #
    # For an encrypted subscription the message is Base64-decoded, decrypted
    # with AES-128 in ECB mode using the Base64-decoded
    # deliveryMode['encryptionKey'], and parsed as JSON with string keys.
    # Otherwise the message is returned untouched.
    #
    # @param message [Object] raw message (Base64 text when encrypted)
    # @return [Object] decoded JSON structure, or the original message
    # @raise [RuntimeError] 'Subscription is not alive' when not subscribed
    def _decrypt(message)
      raise 'Subscription is not alive' unless alive?
      return message unless _encrypted?

      encoded_key = @_subscription['deliveryMode']['encryptionKey'].to_s

      aes = OpenSSL::Cipher::AES.new(128, :ECB)
      aes.decrypt
      aes.key = Base64.decode64(encoded_key)

      encrypted = Base64.decode64(message)
      json_text = aes.update(encrypted) + aes.final

      MultiJson.decode(json_text, symbolize_keys: false)
    end
233
234
    # Tells whether messages for the current subscription are encrypted.
    #
    # True only when deliveryMode has both a truthy 'encryption' flag and a
    # truthy 'encryptionKey'. A missing or non-Hash deliveryMode counts as
    # not encrypted.
    #
    # @return [Boolean]
    def _encrypted?
      delivery_mode = @_subscription['deliveryMode']
      return false unless delivery_mode.is_a?(Hash)

      delivery_mode['encryption'] && delivery_mode['encryptionKey'] ? true : false
    end
242
243
    # Unsubscribes from the subscription's PubNub channel.
    #
    # Does nothing unless a PubNub client exists and the subscription is
    # alive. The channel is deliveryMode['address'].
    def _unsubscribe_at_pubnub
      return unless @_pubnub && alive?

      channel = @_subscription['deliveryMode']['address']
      @_pubnub.unsubscribe(channel: channel) do |_envelope|
        # puts envelope.message
      end
    end
250
251
    # Arms the renewal timer for the current subscription.
    #
    # Starts a thread that sleeps until RENEW_HANDICAP seconds before the
    # subscription's 'expiresIn' elapses and then calls `renew`. When
    # 'expiresIn' is smaller than RENEW_HANDICAP the delay is clamped to zero:
    # `sleep` raises ArgumentError for a negative interval, which would kill
    # the thread before it ever renewed.
    def _set_timeout
      _clear_timeout

      time_to_expiration = [@_subscription['expiresIn'] - RENEW_HANDICAP, 0].max

      @_timeout = Thread.new do
        sleep time_to_expiration
        renew
      end
    end
261
262
    # Cancels the renewal timer, if one is waiting, and forgets it.
    #
    # Only a thread whose status is 'sleep' is terminated; @_timeout is set
    # to nil in every case.
    def _clear_timeout
      timer = @_timeout
      if timer.is_a?(Thread) && timer.status == 'sleep'
        timer.exit
      end
      @_timeout = nil
    end
266
267
    # Joins URL parts with single slashes.
    #
    # All runs of '/' are collapsed to one, after which the double slash of a
    # leading "http://" or "https://" scheme (any letter case) is restored.
    #
    # @param args [Array<#to_s>] URL parts to join
    # @return [String] the joined URL
    def uri_join(*args)
      collapsed = args.join('/').squeeze('/')
      collapsed.gsub(/^(https?:\/)/i, '\1/')
    end
271
272
    # Builds a PubNub client.
    #
    # @param subscribe_key [#to_s] PubNub subscribe key
    # @param ssl_on [Boolean] forwarded as the client's `ssl` option
    #   (previously accepted but ignored)
    # @param publish_key [#to_s] PubNub publish key
    # @param my_logger [Logger, nil] logger for the client; a logger writing
    #   to standard output is created when omitted
    # @return [Pubnub] the new client
    def new_pubnub(subscribe_key='', ssl_on=false, publish_key='', my_logger=nil)
      my_logger ||= Logger.new($stdout)

      Pubnub.new(
        subscribe_key: subscribe_key.to_s,
        publish_key: publish_key.to_s,
        ssl: ssl_on,
        error_callback: lambda { |msg|
          puts "Error callback says: #{msg.inspect}"
        },
        connect_callback: lambda { |msg|
          puts "CONNECTED: #{msg.inspect}"
        },
        logger: my_logger
      )
    end
287
  end
288
end
289