Passed
Push — master ( cd1d60...fb47b8 )
by Michael
02:11 queued 10s
created

WebSocketClient.initSocket   C

Complexity

Conditions 11

Size

Total Lines 60
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 51
dl 0
loc 60
rs 5.3836
c 0
b 0
f 0
cc 11

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like WebSocketClient.initSocket often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import './types'
2
import { Zipnum } from 'zipnum'
3
import { add_event, rm_event, sett } from './utils'
4
import { processConfig } from './config'
5
import { AnyFunc, both, callWith, F, isNil, notf, once, qfilter, T, typeIs } from 'pepka'
6
7
const MAX_32 = 2**31 - 1
8
const zipnum = new Zipnum()
9
const callit = callWith([])
10
const isNumber = both(typeIs('Number'), notf(isNaN))
11
12
type EventHandler<T extends keyof WebSocketEventMap> = AnyFunc<any, [WebSocketEventMap[T]]>
13
type EventHandlers = {
14
  open: EventHandler<'open'>[]
15
  close: EventHandler<'close'>[]
16
  error: EventHandler<'error'>[]
17
  message: AnyFunc<any, [WebSocketEventMap['message'] & {data: any}]>[]
18
  timeout: AnyFunc<any, [data: any]>[]
19
}
20
21
class WebSocketClient {
22
  private ws: wsc.Socket|null = null
23
  private intentionally_closed = false
24
  private reconnect_timeout: NodeJS.Timeout|null = null
25
  private queue: Record<string, wsc.Message> = {}
26
  private onReadyQueue: AnyFunc[] = []
27
  private onCloseQueue: AnyFunc[] = []
28
  private handlers: EventHandlers = { open: [], close: [], message: [], error: [], timeout: [] }
29
  private config = <wsc.Config>{}
30
  private ping_timer: NodeJS.Timeout|null = null
31
  private idle_timer: NodeJS.Timeout|null = null
32
  private get opened() { return this.ws?.readyState===1 }  // The only opened state.
33
34
  private init_flush(): void {
35
    // TODO: reject them or save somehow ?..
36
    qfilter(F, this.queue)
37
  }
38
  private call(event_name: wsc.WSEvent, ...args: any[]) {
39
    for(const h of this.handlers[event_name]) h(...args)
40
  }
41
42
  private log(event: string, message: any = null, time: number|null = null): void {
43
    const {config} = this
44
    if(time === null)
45
      if(config.timer) config.log(event, null, message)
46
      else config.log(event, message)
47
    else
48
      config.log(event, time, message)
49
  }
50
51
  private resetPing() {
52
    const {config: {ping}, ping_timer} = this
53
    if(ping) {
54
      if(!isNil(ping_timer))
55
        clearTimeout(ping_timer as NodeJS.Timeout)
56
      this.ping_timer = sett(ping.interval*1e3, async () => {
57
        const {ping_timer, opened} = this
58
        if(opened) {
59
          await this.send(ping.content)
60
          this.resetPing()
61
        } else clearTimeout(ping_timer!)
62
      })
63
    }
64
  }
65
66
  private resetIdle() {
67
    const {config: {max_idle_time: time}, idle_timer} = this
68
    if(time!==Infinity) {
69
      if(!isNil(idle_timer)) clearTimeout(idle_timer!)
70
      this.idle_timer = sett(time*1e3, () => this.opened && this.close())
71
    }
72
  }
73
74
  private initSocket(ws: wsc.Socket) {
75
    const {queue, config} = this
76
    this.ws = ws
77
    this.onReadyQueue.forEach((fn: Function) => fn())
78
    this.onReadyQueue.splice(0)
79
    const {id_key, data_key} = config.server
80
    // Works also on previously opened sockets that do not fire 'open' event.
81
    this.call('open', ws)
82
    for(const msg_id in queue) ws.send(queue[msg_id].msg)
83
    if(this.reconnect_timeout !== null) {
84
      clearInterval(this.reconnect_timeout)
85
      this.reconnect_timeout = null
86
    }
87
    this.resetPing(); this.resetIdle()
88
    add_event(ws, 'close', async (...e) => {
89
      this.log('close')
90
      this.ws = null
91
      this.onCloseQueue.forEach(callit)
92
      this.onCloseQueue.splice(0)
93
      this.call('close', ...e)
94
      // Auto reconnect.
95
      let {reconnect, reconnection_attempts} = config
96
      if(isNumber(reconnect)) {
97
        const reconnectFunc = async () => {
98
          if(this.intentionally_closed || !reconnection_attempts) return;
99
          reconnection_attempts--
100
          this.log('reconnect')
101
          if(!isNil(this.ws)) {
102
            this.ws!.close()
103
            this.ws = null
104
          }
105
          // If some error occured, try again.
106
          const status = await this.connect()
107
          if(!isNil(status))
108
            this.reconnect_timeout = setTimeout(reconnectFunc, reconnect*1e3)
109
        }
110
        // TODO: test normal close by server. Would it be infinite ?
111
        reconnectFunc()
112
      }
113
    })
114
    add_event(ws, 'message', (e) => {
115
      try {
116
        const data = config.decode(e.data)
117
        this.call('message', {...e, data})
118
        if(data[id_key]) {
119
          const q = this.queue[data[id_key]]
120
          if(q) {
121
            // Debug, Log.
122
            const time = q.sent_time ? (Date.now() - q.sent_time) : null
123
            this.log('message', data[data_key], time)
124
            // Play.
125
            q.ff(data[data_key])
126
          }
127
        }
128
      } catch (err) {
129
        console.error(err, `WSP: Decode error. Got: ${e.data}`)
130
      }
131
      this.resetPing()
132
      this.resetIdle()
133
    })
134
  }
135
136
  private opening = false
137
  private connect() { // returns status if won't open or null if ok.
138
    return new Promise<null|number>((ff) => {
139
      if(this.opened||this.opening) return ff(null)
140
      this.opening = true
141
      const config = this.config
142
      const ws = config.socket || config.adapter(config.url, config.protocols)
143
      if(!ws || ws.readyState > 1) {
144
        this.opening = false
145
        this.ws = null
146
        this.log('error', 'ready() on closing or closed state! status 2.')
147
        return ff(2)
148
      }
149
      const ffo = once((s: null|number) => {this.opening=false; ff(s)})
150
      add_event(ws, 'error', once((e) => {
151
        this.ws = null
152
        this.log('error', 'status 3. Err: '+e.message)
153
        this.call('error', e)
154
        // Some network error: Connection refused or so.
155
        ffo(3)
156
      }))
157
      // Because 'open' won't be envoked on opened socket.
158
      if(ws.readyState) {
159
        this.initSocket(ws)
160
        ffo(null)
161
      } else {
162
        add_event(ws, 'open', once(() => {
163
          this.log('open')
164
          this.initSocket(ws)
165
          ffo(null)
166
        }))
167
      }
168
    })
169
  }
170
  public get socket() { return this.ws }
171
  public async ready() {
172
    return new Promise<void>((ff) => {
173
      if(this.opened) ff()
174
      else this.onReadyQueue.push(ff)
175
    })
176
  }
177
  public on(
178
    event_name: wsc.WSEvent,
179
    handler: (data: any) => any,
180
    predicate: (data: any) => boolean = T,
181
    raw = false
182
  ) {
183
    const _handler: wsc.EventHandler = (event) =>
184
      predicate(event) && handler(event)
185
    if(raw) add_event(this.ws as wsc.Socket, event_name, _handler)
186
    else this.handlers[event_name].push(_handler)
187
    return _handler
188
  }
189
  public off(
190
    event_name: wsc.WSEvent,
191
    handler: (data: any) => any,
192
    raw = false
193
  ) {
194
    if(raw) return rm_event(this.ws as wsc.Socket, event_name, handler)
195
    const handlers = this.handlers[event_name]
196
    const i = handlers.indexOf(handler)
197
    if(~i) handlers.splice(i, 1)
198
  }
199
200
  public async close(): wsc.AsyncErrCode {
201
    return new Promise((ff, rj) => {
202
      if(this.ws === null) {
203
        rj('WSP: closing a non-inited socket!')
204
      } else {
205
        this.onCloseQueue.push(() => {
206
          this.init_flush()
207
          ff(null)
208
        })
209
        this.ws.close()
210
        this.ws = null
211
        this.intentionally_closed = true
212
      }
213
    })
214
  }
215
216
  public open() {
217
    if(!this.opened) {
218
      this.intentionally_closed = false
219
      return this.connect()
220
    }
221
  }
222
223
  /**  .send(your_data) wraps request to server with {id: `hash`, data: `actually your data`},
224
    returns a Promise that will be rejected after a timeout or
225
    resolved if server returns the same signature: {id: `same_hash`, data: `response data`}.
226
  */
227
  public async send<RequestDataType = any, ResponseDataType = any>(
228
    message_data: RequestDataType,
229
    opts = <wsc.SendOptions>{}
230
  ): Promise<ResponseDataType> {
231
    this.log('send', message_data)
232
    const {config, queue} = this
233
    const message = {}
234
    const {pipes, server: {data_key}} = config
235
236
    const message_id = zipnum.zip((Math.random()*(MAX_32-10))|0)
237
    if(typeof opts.top === 'object') {
238
      if(opts.top[data_key]) {
239
        throw new Error('Attempting to set data key/token via send() options!')
240
      }
241
      Object.assign(message, opts.top)
242
    }
243
    for(const pipe of pipes) message_data = pipe(message_data)
244
    const [msg, err] = await Promise.all([
245
      config.encode(message_id, message_data, config),
246
      this.connect()
247
    ])
248
    if(err) throw new Error('ERR while opening connection #'+err)
249
    if(this.opened) {
250
      this.ws!.send(msg)
251
      this.resetPing()
252
      this.resetIdle()
253
    }
254
255
    return new Promise((ff, rj) => {
256
      this.queue[message_id] = {
257
        msg, ff(x: any) {
258
          clearTimeout(this.timeout) // from this object!
259
          delete queue[message_id]
260
          ff(x)
261
        },
262
        data_type: config.data_type,
263
        sent_time: config.timer ? Date.now() : null,
264
        timeout: sett(config.timeout, () => {
265
          if(message_id in this.queue) {
266
            this.call('timeout', message_data)
267
            rj({'Websocket timeout expired': config.timeout, 'for the message': message_data})
268
            delete queue[message_id]
269
          }
270
        })
271
      }
272
    })
273
  }
274
275
  // TODO: Add .on handlers to config!
276
  constructor(user_config: wsc.UserConfig = {}) {
277
    this.config = processConfig(user_config)
278
    if(!this.config.lazy) this.connect()
279
  }
280
}
281
282
/* TODO: v3: @.deprecated. Use named import { WebSocketClient } instead. */
283
export default WebSocketClient