Completed
Push — master ( 3bbaa7...673749 )
by John
01:30
created

Subscription._subscribe_at_pubnub()   A

Complexity

Conditions 2

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 2
c 2
b 0
f 0
dl 0
loc 23
rs 9.0856
1
require 'base64'
2
require 'logger'
3
require 'multi_json'
4
require 'observer'
5
require 'openssl'
6
require 'pubnub'
7
8
module RingCentralSdk::REST
9
  class Subscription
10
    include Observable
11
12
    RENEW_HANDICAP = 60
13
14
    attr_reader :event_filters
15
16
    # Creates a subscription bound to +client+. It starts with no event
    # filters, no renewal timer, no PubNub connection and a placeholder
    # subscription record.
    #
    # @param client the object whose +http+ and +create_urls+ methods are used
    #   by the request methods of this class
    def initialize(client)
      @_client       = client
      @_pubnub       = nil
      @_timeout      = nil
      @event_filters = []
      @_subscription = nil_subscription
    end
23
24
    # Builds the placeholder record that stands in for "no subscription".
    #
    # A new Hash is built on every call, so callers can mutate the result
    # without affecting later calls. The empty 'address' is what makes
    # +alive?+ report false for this record.
    #
    # @return [Hash] subscription-shaped record with blank/zero values
    def nil_subscription
      {
        'eventFilters'   => [],
        'expirationTime' => '', # e.g. 2014-03-12T19:54:35.613Z
        'expiresIn'      => 0,
        'deliveryMode'   => {
          'transportType' => 'PubNub',
          'encryption'    => false,
          'address'       => '',
          'subscriberKey' => '',
          'secretKey'     => ''
        },
        'id'             => '',
        'creationTime'   => '', # e.g. 2014-03-12T19:54:35.613Z
        'status'         => '', # e.g. Active
        'uri'            => ''
      }
    end
42
43
    # @return the PubNub client created by the last PubNub subscribe, or nil
    #   when none has been created yet
    def pubnub
      @_pubnub
    end
46
47
    # Renews the subscription when it is already alive, otherwise creates it.
    #
    # @param events [Array, nil] forwarded unchanged to +renew+ / +subscribe+
    # @return whatever the chosen method returns
    def register(events = nil)
      if alive?
        renew(events)
      else
        subscribe(events)
      end
    end
50
51
    # Appends +events+ to the current event filter list.
    #
    # The members are appended individually so the list stays flat, matching
    # the flat array that +set_events+ stores. (Pushing the array itself would
    # nest it as a single element.)
    #
    # @param events [Array] event filters to add
    # @return [Array, nil] the updated filter list, or nil when +events+ is empty
    # @raise [RuntimeError] when +events+ is not an Array
    def add_events(events)
      raise 'Events is not an array.' unless events.is_a?(Array)

      @event_filters.concat(events) unless events.empty?
    end
57
58
    # Replaces the event filter list with +events+ (the same object is stored,
    # not a copy).
    #
    # @param events [Array] the new event filters
    # @raise [RuntimeError] when +events+ is not an Array
    def set_events(events)
      raise 'Events is not an array.' unless events.is_a?(Array)

      @event_filters = events
    end
64
65 View Code Duplication
    # Creates a new subscription on the server and attaches to its PubNub
    # channel.
    #
    # On success the response body becomes the current subscription, the
    # PubNub subscription is started and observers are notified with the
    # response. On any StandardError raised during that sequence the
    # subscription is reset, observers are notified with the error and a
    # RuntimeError is raised (the original error remains available through
    # Exception#cause).
    #
    # @param events [Array, nil] when an Array, replaces the event filters first
    # @return the HTTP response object
    # @raise [RuntimeError] 'Events are undefined' when there are no event
    #   filters (raised before any request; no reset, no notification)
    # @raise [RuntimeError] 'Subscribe HTTP Request Error: ...' on failure
    def subscribe(events = nil)
      set_events(events) if events.is_a?(Array)

      if !@event_filters.is_a?(Array) || @event_filters.empty?
        raise 'Events are undefined'
      end

      begin
        response = @_client.http.post do |req|
          req.url 'subscription'
          req.headers['Content-Type'] = 'application/json'
          req.body = {
            eventFilters: @_client.create_urls(@event_filters),
            deliveryMode: { transportType: 'PubNub' }
          }
        end

        set_subscription response.body
        _subscribe_at_pubnub
        changed
        notify_observers response
        response
      rescue StandardError => e
        reset
        changed
        notify_observers(e)
        raise "Subscribe HTTP Request Error: #{e}"
      end
    end
95
96 View Code Duplication
    # Renews the current subscription on the server, sending the current
    # event filters.
    #
    # On success the response body becomes the current subscription (which
    # also re-arms the renewal timer via +set_subscription+) and observers are
    # notified with the response. On any StandardError during that sequence
    # the subscription is reset, observers are notified with the error and a
    # RuntimeError is raised (the original error remains available through
    # Exception#cause).
    #
    # @param events [Array, nil] when an Array, replaces the event filters first
    # @return the HTTP response object
    # @raise [RuntimeError] 'Subscription is not alive' when there is nothing
    #   to renew
    # @raise [RuntimeError] 'Events are undefined' when there are no event filters
    # @raise [RuntimeError] 'Renew HTTP Request Error' on failure
    def renew(events = nil)
      set_events(events) if events.is_a?(Array)

      raise 'Subscription is not alive' unless alive?

      if !@event_filters.is_a?(Array) || @event_filters.empty?
        raise 'Events are undefined'
      end

      _clear_timeout

      begin
        response = @_client.http.put do |req|
          req.url "subscription/#{@_subscription['id']}"
          req.headers['Content-Type'] = 'application/json'
          req.body = { eventFilters: @_client.create_urls(@event_filters) }
        end

        set_subscription response.body
        changed
        notify_observers response
        response
      rescue StandardError => e
        reset
        changed
        notify_observers e
        raise 'Renew HTTP Request Error'
      end
    end
129
130
    # Deletes the current subscription on the server and resets local state.
    #
    # On success observers are notified with the response body and the
    # response is returned. If anything in that sequence raises a
    # StandardError, the subscription is still reset and observers are
    # notified with the error; the error is not re-raised.
    #
    # @return the HTTP response object on success
    # @raise [RuntimeError] 'Subscription is not alive' when there is nothing
    #   to remove
    def remove
      raise 'Subscription is not alive' unless alive?

      begin
        response = @_client.http.delete do |req|
          req.url "subscription/#{@_subscription['id']}"
        end

        reset
        changed
        notify_observers response.body
        return response
      rescue StandardError => e
        reset
        changed
        notify_observers e
      end
    end
149
150
    # Reports whether the stored subscription looks usable: its
    # 'deliveryMode' must carry a non-nil/non-false 'subscriberKey' and an
    # 'address' of non-zero length.
    #
    # Returns false instead of raising when the stored subscription or its
    # 'deliveryMode' is not a Hash, so the method is safe to call from the
    # error-handling paths that go through +reset+.
    #
    # @return [Boolean]
    def alive?
      delivery_mode = @_subscription.is_a?(Hash) ? @_subscription['deliveryMode'] : nil
      return false unless delivery_mode.is_a?(Hash)

      # Only presence is required of the subscriber key; an empty string passes.
      return false unless delivery_mode['subscriberKey']

      address = delivery_mode['address']
      address.respond_to?(:length) && address.length > 0
    end
159
160
    # @return the current subscription record (the placeholder from
    #   +nil_subscription+ until a server response has been stored)
    def subscription
      @_subscription
    end
163
164
    # Stores +data+ as the current subscription and re-arms the renewal timer.
    #
    # The timer is cleared before the record is swapped and armed afterwards,
    # so +_set_timeout+ reads the new record.
    #
    # @param data the subscription record, stored as-is
    def set_subscription(data)
      _clear_timeout
      @_subscription = data
      _set_timeout
    end
169
170
    # Returns the object to its "no subscription" state: clears the renewal
    # timer, unsubscribes from PubNub and restores the placeholder record.
    #
    # The PubNub unsubscribe runs before the record is replaced because it
    # reads the channel address from the current record.
    def reset
      _clear_timeout
      _unsubscribe_at_pubnub
      @_subscription = nil_subscription
    end
175
176
    # Tears the subscription down locally; simply delegates to +reset+.
    def destroy
      reset
    end
179
180
    # Creates a PubNub client from the current subscription's subscriber key
    # and subscribes it to the subscription's channel address.
    #
    # Each incoming envelope's +msg+ is handed to +_notify+, which marks the
    # object changed and notifies observers itself; the callback therefore
    # does not call +changed+ again (doing so would leave the changed flag
    # set with nothing pending). Connection life-cycle events are printed
    # with +puts+.
    #
    # @raise [RuntimeError] 'Subscription is not alive' when there is no
    #   usable subscription
    def _subscribe_at_pubnub
      raise 'Subscription is not alive' unless alive?

      delivery_mode = @_subscription['deliveryMode']
      @_pubnub = new_pubnub(delivery_mode['subscriberKey'], false, '')

      @_pubnub.subscribe(
        channel:    delivery_mode['address'],
        callback:   lambda { |envelope| _notify(envelope.msg) },
        error:      lambda { |envelope| puts('ERROR: ' + envelope.msg.to_s) },
        connect:    lambda { |_envelope| puts('CONNECTED') },
        reconnect:  lambda { |_envelope| puts('RECONNECTED') },
        disconnect: lambda { |_envelope| puts('DISCONNECTED') }
      )
    end
203
204
    # Decrypts an incoming message (when required) and delivers the result to
    # all observers.
    #
    # @param message the raw message taken from the PubNub envelope
    def _notify(message)
      decrypted = _decrypt(message)
      changed
      notify_observers(decrypted)
    end
209
210
    # Returns the usable form of an incoming message.
    #
    # When the subscription is not encrypted the message is returned
    # untouched. Otherwise the message is Base64-decoded, decrypted with
    # AES-128-ECB using the Base64-decoded 'encryptionKey' from the
    # subscription's delivery mode, and the plaintext is parsed as JSON with
    # string keys.
    #
    # @param message the raw message (a Base64 string when encrypted)
    # @return the original message, or the decoded JSON structure
    # @raise [RuntimeError] 'Subscription is not alive' when there is no
    #   usable subscription
    def _decrypt(message)
      raise 'Subscription is not alive' unless alive?
      return message unless _encrypted?

      encoded_key = @_subscription['deliveryMode']['encryptionKey'].to_s

      decipher = OpenSSL::Cipher::AES.new(128, :ECB)
      decipher.decrypt
      decipher.key = Base64.decode64(encoded_key)

      plaintext = decipher.update(Base64.decode64(message)) + decipher.final
      MultiJson.decode(plaintext, symbolize_keys: false)
    end
230
231
    # Tells whether incoming messages must be decrypted: the delivery mode
    # has a truthy 'encryption' flag and a truthy 'encryptionKey'.
    #
    # The result is meant to be used as a condition only. It is truthy (the
    # key itself) when encryption is active and false/nil otherwise.
    #
    # @return [Object] truthy when encrypted, false or nil when not
    def _encrypted?
      delivery_mode = @_subscription['deliveryMode']

      delivery_mode.key?('encryption') &&
        delivery_mode['encryption'] &&
        delivery_mode.key?('encryptionKey') &&
        delivery_mode['encryptionKey']
    end
239
240
    # Unsubscribes the PubNub client from the current subscription's channel
    # address. Does nothing when no client exists or the subscription is not
    # alive. The unsubscribe result envelope is ignored.
    def _unsubscribe_at_pubnub
      return unless @_pubnub && alive?

      channel = @_subscription['deliveryMode']['address']
      @_pubnub.unsubscribe(channel: channel) { |_envelope| }
    end
247
248
    # Arms the renewal timer: starts a thread that sleeps until
    # RENEW_HANDICAP seconds before the subscription's 'expiresIn' elapses and
    # then calls +renew+. The thread is stored in @_timeout.
    #
    # NOTE(review): the delay is negative when 'expiresIn' is smaller than
    # RENEW_HANDICAP, and the subtraction needs 'expiresIn' to be numeric —
    # confirm the server always returns a large enough value.
    def _set_timeout
      delay = @_subscription['expiresIn'] - RENEW_HANDICAP

      @_timeout = Thread.new do
        sleep delay
        renew
      end
    end
256
257
    # Cancels the pending renewal timer, if any, and forgets it.
    #
    # Dropping the reference alone would leave the timer thread sleeping; it
    # would later wake up and call +renew+ on a subscription that may have
    # been replaced or reset. The thread is therefore killed — except when
    # this method is running on the timer thread itself (the timer's own
    # +renew+ call), which must be allowed to finish.
    #
    # @return [nil]
    def _clear_timeout
      timer = @_timeout
      @_timeout = nil
      timer.kill if timer.is_a?(Thread) && timer != Thread.current
      nil
    end
260
261
    # Builds a PubNub client.
    #
    # @param subscribe_key [#to_s] PubNub subscribe key
    # @param ssl_on [Boolean] when truthy, requests an SSL connection by
    #   passing +ssl: true+; when falsy the option is omitted so the PubNub
    #   library's own default applies, exactly as before
    # @param publish_key [#to_s] PubNub publish key
    # @param my_logger [Logger, nil] logger handed to PubNub; a Logger writing
    #   to STDOUT is created when nil
    # @return the new Pubnub client
    def new_pubnub(subscribe_key='', ssl_on=false, publish_key='', my_logger=nil)
      my_logger = Logger.new(STDOUT) if my_logger.nil?

      options = {
        subscribe_key:    subscribe_key.to_s,
        publish_key:      publish_key.to_s,
        error_callback:   lambda { |msg| puts "Error callback says: #{msg.inspect}" },
        connect_callback: lambda { |msg| puts "CONNECTED: #{msg.inspect}" },
        logger:           my_logger
      }
      # Previously ssl_on was accepted but silently ignored.
      options[:ssl] = true if ssl_on

      Pubnub.new(**options)
    end
276
  end
277
end
278