Passed
Branch master (5df6c0)
by Michael
02:01
created

WebSocketClient.send   C

Complexity

Conditions 9

Size

Total Lines 52
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 40
dl 0
loc 52
rs 6.5866
c 0
b 0
f 0
cc 9

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

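Commonly applied refactorings include:

- Extract Method
- If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, Preserve Whole Object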

1
import './types'
2
import { Zipnum } from 'zipnum'
3
import { add_event, sett } from './utils'
4
import { processConfig } from './config'
5
import { AnyFunc, once, T } from 'pepka'
6
7
// 2^31 - 1: upper bound used by send() when drawing a random numeric message id.
const MAX_32 = 0x7fffffff
// Shared Zipnum instance; send() passes the random number through zipnum.zip() to get the message id.
const zipnum = new Zipnum()
9
10
export class WebSocketClient {
11
  // True between a successful initSocket() and the next 'close' event / close() call.
  private open = false
  // Socket currently in use, or null when there is none.
  private ws: wsc.Socket|null = null
  // Raised by close() so that the 'close' handler skips auto-reconnect.
  private forcibly_closed = false
  // Handle of the scheduled reconnect attempt, if one is pending.
  private reconnect_timeout: NodeJS.Timeout|null = null
  // Requests awaiting a response, keyed by message id.
  // Explicit index signature: a bare `{}` initializer infers type `{}`,
  // which cannot be indexed by message id under noImplicitAny.
  private queue: {[message_id: string]: any} = {}
  // Sends deferred until the socket opens; each entry exposes a send() thunk.
  private messages: any[] = []
  // Resolvers of ready() promises waiting for the socket to open.
  private onReadyQueue: AnyFunc[] = []
  // Callbacks run once on the next 'close' event.
  private onCloseQueue: AnyFunc[] = []
  // Handlers registered through on(), grouped by event name.
  private handlers = <{[event in wsc.WSEvent]: ((e: any) => void)[]}>{
    open: [], close: [], message: [], error: []
  }
  // Effective configuration; replaced with processConfig() output in the constructor.
  private config = <wsc.Config>{}
23
24
  /** Resets both internal queues to empty containers. */
  private init_flush(): void {
    // Messages deferred by send() until the socket opens.
    this.messages = []
    // Requests awaiting a response, keyed by message id.
    this.queue = {}
  }
28
29
  /**
   * Forwards an event to `config.log`, choosing the argument layout:
   * - `time` given: `(event, time, message)`;
   * - no `time` but `config.timer` enabled: `(event, null, message)`;
   * - otherwise: `(event, message)`.
   *
   * @param event Event name to log.
   * @param message Optional payload to log along with the event.
   * @param time Optional elapsed time to log.
   */
  private log(event: string, message: any = null, time: number|null = null): void {
    const {config} = this
    if(time !== null) {
      config.log(event, time, message)
      return
    }
    if(config.timer) {
      // Keep the time slot in place even though there is no measurement.
      config.log(event, null, message)
      return
    }
    config.log(event, message)
  }
41
42
  /**
   * Attaches the client to a socket that is already open.
   *
   * Marks the client as open, releases ready() waiters, notifies 'open'
   * handlers, flushes the sends deferred by send(), optionally starts the ping
   * timer and subscribes to the socket's 'close' and 'message' events.
   *
   * @param ws Socket to serve; also passed to every registered 'open' handler.
   */
  private initSocket(ws: wsc.Socket) {
    const config = this.config
    this.open = true
    this.onReadyQueue.forEach((fn: Function) => fn())
    this.onReadyQueue.splice(0)
    const {id_key, data_key} = config.server
    // Works also on previously opened sockets that do not fire 'open' event.
    this.handlers.open.forEach((h) => h(ws))
    // Send all pending messages. The queue is emptied first so that a later
    // reconnect does not transmit the same messages again.
    this.messages.splice(0).forEach((message: any) => message.send())
    // It's reconnecting: the handle was created with setTimeout, so clear it as one.
    if(this.reconnect_timeout !== null) {
      clearTimeout(this.reconnect_timeout)
      this.reconnect_timeout = null
    }
    // Ping timer bound to this particular socket; stopped in its 'close' handler.
    let ping_interval: ReturnType<typeof setInterval>|null = null
    if(config.ping) {
      ping_interval = setInterval(() => {
        if(this.open) this.send(config.ping.content)
      }, config.ping.interval*1e3)
    }
    add_event(ws, 'close', async (...e) => {
      this.log('close')
      this.open = false
      // Stop pinging through a dead socket. Relying on `forcibly_closed` does
      // not work: the flag is raised and reset within this very handler, so
      // the timer would keep running and pile up on every reconnect.
      if(ping_interval !== null) {
        clearInterval(ping_interval)
        ping_interval = null
      }
      this.onCloseQueue.forEach((fn: Function) => fn())
      this.onCloseQueue.splice(0)
      this.handlers.close.forEach((h: any) => h(...e))
      // Auto reconnect.
      const reconnect = config.reconnect
      if(
        typeof reconnect === 'number' &&
        !isNaN(reconnect) &&
        !this.forcibly_closed
      ) {
        const reconnectFunc = async () => {
          this.log('reconnect')
          if(this.ws !== null) {
            this.ws.close()
            this.ws = null
          }
          // If some error occurred, try again after `reconnect` seconds.
          const status = await this.connect()
          if(status !== null) {
            this.reconnect_timeout = setTimeout(reconnectFunc, reconnect * 1000)
          }
        }
        // No need for await.
        reconnectFunc()
      } else {
        this.ws = null
        this.open = false
      }
      // Reset the flag so that close() can be used again.
      this.forcibly_closed = false
    })
    add_event(ws, 'message', (e) => {
      try {
        const data = config.decode(e.data)
        this.handlers.message.forEach((h: any) => h({...e, data}))
        if(data[id_key]) {
          const q = this.queue[data[id_key]]
          if(q) {
            // Debug, Log: round-trip time is known only when send() stored sent_time.
            const time = q.sent_time ? (Date.now() - q.sent_time) : null
            this.log('message', data[data_key], time)
            // Resolve the matching send() promise and drop its timeout and queue entry.
            q.ff(data[data_key])
            clearTimeout(q.timeout)
            delete this.queue[data[id_key]]
          }
        }
      } catch (err) {
        console.error(err, `WSP: Decode error. Got: ${e.data}`)
      }
    })
  }
118
119
  /**
   * Obtains a socket and reports whether it became usable.
   *
   * Resolves with `null` when the client is already open or the socket is/gets
   * open, with `2` when there is no socket or its readyState is above 1, and
   * with `3` when the socket emits 'error'.
   */
  private async connect() {
    return new Promise((resolve) => {
      // Already connected: nothing to do.
      if(this.open === true) {
        resolve(null)
        return
      }
      const {config} = this
      // Use config.socket when set; otherwise ask config.adapter for one.
      const socket = config.socket || config.adapter(config.url, config.protocols)
      this.ws = socket

      if(!socket || socket.readyState > 1) {
        this.ws = null
        this.log('error', 'ready() on closing or closed state! status 2.')
        resolve(2)
        return
      }

      add_event(socket, 'error', once((e) => {
        this.log('error', 'status 3.')
        this.handlers.error.forEach((h) => h(e))
        this.ws = null
        // Some network error: Connection refused or so.
        return resolve(3)
      }))

      // 'open' won't be invoked on a socket that is already open.
      if(socket.readyState) {
        this.initSocket(socket)
        resolve(null)
        return
      }
      add_event(socket, 'open', once(() => {
        this.log('open')
        this.initSocket(socket)
        return resolve(null)
      }))
    })
  }
154
155
  /** The socket currently held by the client, or `null` when there is none. */
  public get socket(): wsc.Socket|null {
    const {ws} = this
    return ws
  }
158
159
  /**
   * Resolves as soon as the client is open: immediately when it already is,
   * otherwise when initSocket() drains the ready queue.
   */
  public async ready() {
    return new Promise<void>((resolve) => {
      if(!this.open) {
        // Parked until the socket opens.
        this.onReadyQueue.push(resolve)
        return
      }
      resolve()
    })
  }
168
169
  /**
   * Subscribes `handler` to an event.
   *
   * @param event_name Event to listen for.
   * @param handler Called with the event when `predicate` accepts it.
   * @param predicate Filter applied to each event before `handler`; defaults to `T`.
   * @param raw When true the listener is attached directly to the current
   *   socket through `add_event`; otherwise it is stored in the client's own
   *   handler list for that event.
   * @returns The `add_event` result in raw mode, otherwise the value returned by `Array.prototype.push`.
   */
  public on(
    event_name: wsc.WSEvent,
    handler: (data: any) => any,
    predicate: (data: any) => boolean = T,
    raw = false
  ) {
    const guarded: wsc.EventHandler = (event) =>
      predicate(event) && handler(event)
    if(raw) {
      return add_event(this.ws as wsc.Socket, event_name, guarded)
    }
    return this.handlers[event_name].push(guarded)
  }
181
182
  /**
   * Closes the current socket on purpose.
   *
   * Rejects right away when there is no socket. Otherwise marks the client as
   * not open, asks the socket to close and resolves with `null` once the
   * 'close' handler runs the queued callback, which flushes the queues, drops
   * the socket and raises `forcibly_closed`.
   */
  public async close(): wsc.AsyncErrCode {
    return new Promise((resolve, reject) => {
      const socket = this.ws
      if(socket === null) {
        reject('WSP: closing a non-inited socket!')
        return
      }
      this.open = false
      // Executed by the 'close' event handler installed in initSocket().
      this.onCloseQueue.push(() => {
        this.init_flush()
        this.ws = null
        this.forcibly_closed = true
        resolve(null)
      })
      socket.close()
    })
  }
198
199
  /**
   * .send(your_data) wraps request to server with {id: `hash`, data: `actually your data`},
   * returns a Promise that will be rejected after a timeout or
   * resolved if server returns the same signature: {id: `same_hash`, data: `response data`}.
   *
   * The payload is first run through every function in `config.pipes`, then
   * encoded with `config.encode` together with a freshly generated message id.
   * When the socket is not open the transmission is deferred until it opens;
   * with `config.lazy` the first call also starts the connection.
   *
   * @param message_data Payload to send.
   * @param opts Per-call options; `opts.top` must not carry the configured data key.
   * @returns Promise resolved with the response data for this message id.
   * @throws Error (as a rejection) when `opts.top` contains the data key.
   */
  public async send<RequestDataType = any, ResponseDataType = any>(
    message_data: RequestDataType,
    opts = <wsc.SendOptions>{}
  ): Promise<ResponseDataType> {
    this.log('send', message_data)
    const config   = this.config
    const data_key = config.server.data_key
    const first_time_lazy = config.lazy && !this.open

    const message_id = zipnum.zip((Math.random()*(MAX_32-10))|0)
    // `typeof null` is 'object' as well, so rule it out before indexing.
    if(typeof opts.top === 'object' && opts.top !== null && opts.top[data_key]) {
      throw new Error('Attempting to set data key/token via send() options!')
    }
    // NOTE(review): opts.top is only validated here; it was copied into a local
    // object that never reached config.encode, so that dead copy was removed.
    // Confirm whether top-level fields are meant to be forwarded to the encoder.

    config.pipes.forEach(
      (pipe) => message_data = pipe(message_data)
    )

    // Encodes at the moment of the actual transmission, using whichever socket is current then.
    const transmit = () =>
      (this.ws as wsc.Socket).send(config.encode(message_id, message_data, config))

    if(this.open) {
      transmit()
    } else {
      // Flushed by initSocket() once the socket opens.
      this.messages.push({ send: transmit })
      if(first_time_lazy) {
        this.connect()
      }
    }

    return new Promise((ff, rj) => {
      this.queue[message_id] = {
        ff,
        data_type: config.data_type,
        sent_time: config.timer ? Date.now() : null,
        timeout: sett(config.timeout, () => {
          // Still queued means no response has been matched to this id yet.
          if(this.queue[message_id]) {
            rj({
              'Websocket timeout expired: ': config.timeout,
              'for the message ': message_data
            })
            delete this.queue[message_id]
          }
        })
      }
    })
  }
255
256
  /**
   * Builds the effective config from `user_config`, resets queues and flags,
   * and connects immediately unless `config.lazy` is set.
   *
   * @param user_config User-provided options handed to `processConfig`.
   */
  constructor(user_config: wsc.UserConfig = {}) {
    this.config = processConfig(user_config)
    // Start with empty queues.
    this.init_flush()
    // Initial flag values.
    this.forcibly_closed = false
    this.reconnect_timeout = null
    this.open = false
    // Lazy clients connect on the first send() instead.
    if(this.config.lazy) return
    this.connect()
  }
268
}
269
270
/** @deprecated. Use named import { WebSocketClient } instead. */
271
export default WebSocketClient