Completed
Push — master ( b7118a...c3970f )
by John
01:10
created

Subscription.set_events()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 6
rs 9.4285
1
require 'base64'
2
require 'logger'
3
require 'multi_json'
4
require 'observer'
5
require 'openssl'
6
require 'pubnub'
7
require 'timers'
8
9
module RingCentralSdk::REST
10
  class Subscription
11
    include Observable
12
13
    RENEW_HANDICAP = 60
14
15
    attr_reader :event_filters
16
17
    def initialize(client)
18
      @_client = client
19
      @event_filters = []
20
      @_timeout = nil
21
      @_subscription = nil_subscription()
22
      @_pubnub = nil
23
    end
24
25
    def nil_subscription()
26
      subscription       =  {
27
        'eventFilters'    => [],
28
        'expirationTime'  => '', # 2014-03-12T19:54:35.613Z
29
        'expiresIn'       => 0,
30
        'deliveryMode'    => {
31
          'transportType' => 'PubNub',
32
          'encryption'    => false,
33
          'address'       => '',
34
          'subscriberKey' => '',
35
          'secretKey'     => ''
36
        },
37
        'id'              => '',
38
        'creationTime'    => '', # 2014-03-12T19:54:35.613Z
39
        'status'          => '', # Active
40
        'uri'             => ''
41
      }
42
      return subscription
43
    end
44
45
    def pubnub()
46
      return @_pubnub
47
    end
48
49
    def register(events=nil)
50
      return alive?() ? renew(events) : subscribe(events)
51
    end
52
53
    def add_events(events)
54
      unless events.is_a?(Array)
55
        raise 'Events is not an array.'
56
      end
57
      @event_filters.push(events) if events.length>0
58
    end
59
60
    def set_events(events)
61
      unless events.is_a?(Array)
62
        raise 'Events is not an array.'
63
      end
64
      @event_filters = events
65
    end
66
67
    def subscribe(events=nil)
68
      set_events(events) if events.is_a?(Array)
69
70
      if !@event_filters.is_a?(Array) || @event_filters.length ==0
71
        raise 'Events are undefined'
72
      end
73
74
      begin
75
        response = @_client.http.post do |req|
76
          req.url 'subscription'
77
          req.body = {
78
            :eventFilters    => @_client.create_urls(@event_filters),
79
            :deliveryMode    => {
80
              :transportType => 'PubNub'
81
            }
82
          }
83
        end
84
        set_subscription(response.body)
85
        _subscribe_at_pubnub()
86
        changed
87
        notify_observers(response)
88
        return response
89
      rescue StandardError => e
90
        reset()
91
        changed
92
        notify_observers(e)
93
        raise 'Subscribe HTTP Request Error'
94
      end
95
96
    end
97
98
    def renew(events=nil)
99
      set_events(events) if events.is_a?(Array)
100
101
      unless alive?()
102
        raise 'Subscription is not alive'
103
      end
104
105
      if !@event_filters.is_a?(Array) || @event_filters.length ==0
106
        raise 'Events are undefined'
107
      end
108
 
109
      _clear_timeout()
110
111
      begin
112
        response = @_client.http.put do |req|
113
          req.url 'subscription/' + @_subscription['id'].to_s
114
          req.body = {
115
            :eventFilters => @_client.create_urls(@event_filters),
116
          }
117
        end
118
119
        set_subscription(response.body)
120
        changed
121
        notify_observers(response)
122
        return response
123
      rescue StandardError => e
124
        reset()
125
        changed
126
        notify_observers(e)
127
        raise 'Renew HTTP Request Error'
128
      end
129
    end
130
131
    def remove()
132
      unless alive?()
133
        raise 'Subscription is not alive'
134
      end
135
136
      begin
137
        url = 'subscription/' + @_subscription['id'].to_s
138
        response = @_client.http.delete do |req|
139
          req.url = 'subscription' + @_subscription['id']
140
        end
141
        reset()
142
        changed
143
        notify_observers(response.body)
144
        return response
145
      rescue StandardError => e
146
        reset()
147
        changed
148
        notify_observers(e)
149
      end
150
    end
151
152
    def alive?()
153
      s = @_subscription
154
      return (s.has_key?('deliveryMode') && s['deliveryMode']) && \
155
        (s['deliveryMode'].has_key?('subscriberKey') && s['deliveryMode']['subscriberKey']) && \
156
        (
157
          s['deliveryMode'].has_key?('address') && s['deliveryMode']['address'] && \
158
          s['deliveryMode']['address'].length>0) \
159
        ? true : false
160
    end
161
162
    def subscription()
163
      return @_subscription
164
    end
165
166
    def set_subscription(data)
167
      _clear_timeout()
168
      @_subscription = data
169
      _set_timeout()
170
    end
171
172
    def reset()
173
      _clear_timeout()
174
      _unsubscribe_at_pubnub()
175
      @_subscription = nil_subscription()
176
    end
177
178
    def destroy()
179
      reset()
180
    end
181
182
    def _subscribe_at_pubnub()
183
      if ! alive?()
184
        raise 'Subscription is not alive'
185
      end
186
187
      s_key = @_subscription['deliveryMode']['subscriberKey']
188
189
      @_pubnub = new_pubnub(s_key, false, '')
190
191
      callback = lambda { |envelope|
192
      	_notify(envelope.msg)
193
      	changed
194
      	notify_observers('GOT_PUBNUB_MESSAGE_NOTIFY')
195
      }
196
197
      @_pubnub.subscribe(
198
        :channel    => @_subscription['deliveryMode']['address'],
199
        :callback   => callback,
200
        :error      => lambda { |envelope| puts('ERROR: ' + envelope.msg.to_s) },
201
        :connect    => lambda { |envelope| puts('CONNECTED') },
202
        :reconnect  => lambda { |envelope| puts('RECONNECTED') },
203
        :disconnect => lambda { |envelope| puts('DISCONNECTED') }
204
      )
205
    end
206
207
    def _notify(message)
208
      message = _decrypt(message)
209
      changed
210
      notify_observers(message)
211
    end
212
213
    def _decrypt(message)
214
      unless alive?()
215
        raise 'Subscription is not alive'
216
      end
217
218
      if _encrypted?()
219
        delivery_mode = @_subscription['deliveryMode']
220
221
        cipher = OpenSSL::Cipher::AES.new(128, :ECB)
222
        cipher.decrypt
223
        cipher.key = Base64.decode64(delivery_mode['encryptionKey'].to_s)
224
225
        ciphertext = Base64.decode64(message)
226
        plaintext = cipher.update(ciphertext) + cipher.final
227
228
        message = MultiJson.decode(plaintext)
229
      end
230
231
      return message
232
    end
233
234
    def _encrypted?()
235
      delivery_mode = @_subscription['deliveryMode']
236
      is_encrypted  = delivery_mode.has_key?('encryption') && \
237
        delivery_mode['encryption']                        && \
238
        delivery_mode.has_key?('encryptionKey')            && \
239
        delivery_mode['encryptionKey']
240
      return is_encrypted
241
    end
242
243
    def _unsubscribe_at_pubnub()
244
      if @_pubnub && alive?()
245
        @_pubnub.unsubscribe(channel: @_subscription['deliveryMode']['address']) do |envelope|
246
          # puts envelope.message
247
        end
248
      end
249
    end
250
251
    def _set_timeout()
252
      time_to_expiration = @_subscription['expiresIn'] - RENEW_HANDICAP
253
      @_timeout = Timers::Group.new
254
      @_timeout.after(time_to_expiration) do
255
        renew()
256
      end
257
    end
258
259
    def _clear_timeout()
260
      if @_timeout.is_a?(Timers::Group)
261
        @_timeout.cancel()
262
      end
263
    end
264
265
    def new_pubnub(subscribe_key='', ssl_on=false, publish_key='', my_logger=nil)
266
      my_logger = Logger.new(STDOUT) if my_logger.nil?
267
268
      return Pubnub.new(
269
        :subscribe_key    => subscribe_key.to_s,
270
        :publish_key      => publish_key.to_s,
271
        :error_callback   => lambda { |msg|
272
          puts "Error callback says: #{msg.inspect}"
273
        },
274
        :connect_callback => lambda { |msg|
275
          puts "CONNECTED: #{msg.inspect}"
276
        },
277
        :logger => my_logger
278
      )
279
    end
280
281
  end
282
end