Passed
Push — main ( 10dbfc...61a674 )
by LCS
05:39 queued 03:09
created

node_modules/minipass/index.js   F

Complexity

Total Complexity 136
Complexity/F 2.43

Size

Lines of Code 537
Function Count 56

Duplication

Duplicated Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 354
dl 0
loc 537
rs 2
c 0
b 0
f 0
wmc 136
mnd 80
bc 80
fnc 56
bpm 1.4285
cpm 2.4285
noi 192

How to fix   Complexity   

Complexity

Complex classes like node_modules/minipass/index.js often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
'use strict'
2
const EE = require('events')
3
const Yallist = require('yallist')
4
const SD = require('string_decoder').StringDecoder
5
6
const EOF = Symbol('EOF')
7
const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
8
const EMITTED_END = Symbol('emittedEnd')
9
const EMITTING_END = Symbol('emittingEnd')
10
const CLOSED = Symbol('closed')
11
const READ = Symbol('read')
12
const FLUSH = Symbol('flush')
13
const FLUSHCHUNK = Symbol('flushChunk')
14
const ENCODING = Symbol('encoding')
15
const DECODER = Symbol('decoder')
16
const FLOWING = Symbol('flowing')
17
const PAUSED = Symbol('paused')
18
const RESUME = Symbol('resume')
19
const BUFFERLENGTH = Symbol('bufferLength')
20
const BUFFERPUSH = Symbol('bufferPush')
21
const BUFFERSHIFT = Symbol('bufferShift')
22
const OBJECTMODE = Symbol('objectMode')
23
const DESTROYED = Symbol('destroyed')
24
25
// TODO remove when Node v8 support drops
26
const doIter = global._MP_NO_ITERATOR_SYMBOLS_  !== '1'
27
const ASYNCITERATOR = doIter && Symbol.asyncIterator
28
  || Symbol('asyncIterator not implemented')
29
const ITERATOR = doIter && Symbol.iterator
30
  || Symbol('iterator not implemented')
31
32
// Buffer in node 4.x < 4.5.0 doesn't have working Buffer.from
33
// or Buffer.alloc, and Buffer in node 10 deprecated the ctor.
34
// .M, this is fine .\^/M..
35
const B = Buffer.alloc ? Buffer
36
  : /* istanbul ignore next */ require('safe-buffer').Buffer
37
38
// events that mean 'the stream is over'
39
// these are treated specially, and re-emitted
40
// if they are listened for after emitting.
41
const isEndish = ev =>
42
  ev === 'end' ||
43
  ev === 'finish' ||
44
  ev === 'prefinish'
45
46
const isArrayBuffer = b => b instanceof ArrayBuffer ||
47
  typeof b === 'object' &&
48
  b.constructor &&
49
  b.constructor.name === 'ArrayBuffer' &&
50
  b.byteLength >= 0
51
52
const isArrayBufferView = b => !B.isBuffer(b) && ArrayBuffer.isView(b)
53
54
module.exports = class Minipass extends EE {
55
  constructor (options) {
56
    super()
57
    this[FLOWING] = false
58
    // whether we're explicitly paused
59
    this[PAUSED] = false
60
    this.pipes = new Yallist()
61
    this.buffer = new Yallist()
62
    this[OBJECTMODE] = options && options.objectMode || false
63
    if (this[OBJECTMODE])
64
      this[ENCODING] = null
65
    else
66
      this[ENCODING] = options && options.encoding || null
67
    if (this[ENCODING] === 'buffer')
68
      this[ENCODING] = null
69
    this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
70
    this[EOF] = false
71
    this[EMITTED_END] = false
72
    this[EMITTING_END] = false
73
    this[CLOSED] = false
74
    this.writable = true
75
    this.readable = true
76
    this[BUFFERLENGTH] = 0
77
    this[DESTROYED] = false
78
  }
79
80
  get bufferLength () { return this[BUFFERLENGTH] }
81
82
  get encoding () { return this[ENCODING] }
83
  set encoding (enc) {
84
    if (this[OBJECTMODE])
85
      throw new Error('cannot set encoding in objectMode')
86
87
    if (this[ENCODING] && enc !== this[ENCODING] &&
88
        (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
89
      throw new Error('cannot change encoding')
90
91
    if (this[ENCODING] !== enc) {
92
      this[DECODER] = enc ? new SD(enc) : null
93
      if (this.buffer.length)
94
        this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
95
    }
96
97
    this[ENCODING] = enc
98
  }
99
100
  setEncoding (enc) {
101
    this.encoding = enc
102
  }
103
104
  get objectMode () { return this[OBJECTMODE] }
105
  set objectMode (ॐ ) { this[OBJECTMODE] = this[OBJECTMODE] || !!ॐ  }
106
107
  write (chunk, encoding, cb) {
108
    if (this[EOF])
109
      throw new Error('write after end')
110
111
    if (this[DESTROYED]) {
112
      this.emit('error', Object.assign(
113
        new Error('Cannot call write after a stream was destroyed'),
114
        { code: 'ERR_STREAM_DESTROYED' }
115
      ))
116
      return true
117
    }
118
119
    if (typeof encoding === 'function')
120
      cb = encoding, encoding = 'utf8'
121
122
    if (!encoding)
123
      encoding = 'utf8'
124
125
    // convert array buffers and typed array views into buffers
126
    // at some point in the future, we may want to do the opposite!
127
    // leave strings and buffers as-is
128
    // anything else switches us into object mode
129
    if (!this[OBJECTMODE] && !B.isBuffer(chunk)) {
130
      if (isArrayBufferView(chunk))
131
        chunk = B.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
132
      else if (isArrayBuffer(chunk))
133
        chunk = B.from(chunk)
134
      else if (typeof chunk !== 'string')
135
        // use the setter so we throw if we have encoding set
136
        this.objectMode = true
137
    }
138
139
    // this ensures at this point that the chunk is a buffer or string
140
    // don't buffer it up or send it to the decoder
141
    if (!this.objectMode && !chunk.length) {
142
      const ret = this.flowing
143
      if (this[BUFFERLENGTH] !== 0)
144
        this.emit('readable')
145
      if (cb)
146
        cb()
147
      return ret
148
    }
149
150
    // fast-path writing strings of same encoding to a stream with
151
    // an empty buffer, skipping the buffer/decoder dance
152
    if (typeof chunk === 'string' && !this[OBJECTMODE] &&
153
        // unless it is a string already ready for us to use
154
        !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
155
      chunk = B.from(chunk, encoding)
156
    }
157
158
    if (B.isBuffer(chunk) && this[ENCODING])
159
      chunk = this[DECODER].write(chunk)
160
161
    try {
162
      return this.flowing
163
        ? (this.emit('data', chunk), this.flowing)
164
        : (this[BUFFERPUSH](chunk), false)
165
    } finally {
166
      if (this[BUFFERLENGTH] !== 0)
167
        this.emit('readable')
168
      if (cb)
169
        cb()
170
    }
171
  }
172
173
  read (n) {
174
    if (this[DESTROYED])
175
      return null
176
177
    try {
178
      if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
179
        return null
180
181
      if (this[OBJECTMODE])
182
        n = null
183
184
      if (this.buffer.length > 1 && !this[OBJECTMODE]) {
185
        if (this.encoding)
186
          this.buffer = new Yallist([
187
            Array.from(this.buffer).join('')
188
          ])
189
        else
190
          this.buffer = new Yallist([
191
            B.concat(Array.from(this.buffer), this[BUFFERLENGTH])
192
          ])
193
      }
194
195
      return this[READ](n || null, this.buffer.head.value)
196
    } finally {
197
      this[MAYBE_EMIT_END]()
198
    }
199
  }
200
201
  [READ] (n, chunk) {
202
    if (n === chunk.length || n === null)
203
      this[BUFFERSHIFT]()
204
    else {
205
      this.buffer.head.value = chunk.slice(n)
206
      chunk = chunk.slice(0, n)
207
      this[BUFFERLENGTH] -= n
208
    }
209
210
    this.emit('data', chunk)
211
212
    if (!this.buffer.length && !this[EOF])
213
      this.emit('drain')
214
215
    return chunk
216
  }
217
218
  end (chunk, encoding, cb) {
219
    if (typeof chunk === 'function')
220
      cb = chunk, chunk = null
221
    if (typeof encoding === 'function')
222
      cb = encoding, encoding = 'utf8'
223
    if (chunk)
224
      this.write(chunk, encoding)
225
    if (cb)
226
      this.once('end', cb)
227
    this[EOF] = true
228
    this.writable = false
229
230
    // if we haven't written anything, then go ahead and emit,
231
    // even if we're not reading.
232
    // we'll re-emit if a new 'end' listener is added anyway.
233
    // This makes MP more suitable to write-only use cases.
234
    if (this.flowing || !this[PAUSED])
235
      this[MAYBE_EMIT_END]()
236
    return this
237
  }
238
239
  // don't let the internal resume be overwritten
240
  [RESUME] () {
241
    if (this[DESTROYED])
242
      return
243
244
    this[PAUSED] = false
245
    this[FLOWING] = true
246
    this.emit('resume')
247
    if (this.buffer.length)
248
      this[FLUSH]()
249
    else if (this[EOF])
250
      this[MAYBE_EMIT_END]()
251
    else
252
      this.emit('drain')
253
  }
254
255
  resume () {
256
    return this[RESUME]()
257
  }
258
259
  pause () {
260
    this[FLOWING] = false
261
    this[PAUSED] = true
262
  }
263
264
  get destroyed () {
265
    return this[DESTROYED]
266
  }
267
268
  get flowing () {
269
    return this[FLOWING]
270
  }
271
272
  get paused () {
273
    return this[PAUSED]
274
  }
275
276
  [BUFFERPUSH] (chunk) {
277
    if (this[OBJECTMODE])
278
      this[BUFFERLENGTH] += 1
279
    else
280
      this[BUFFERLENGTH] += chunk.length
281
    return this.buffer.push(chunk)
282
  }
283
284
  [BUFFERSHIFT] () {
285
    if (this.buffer.length) {
286
      if (this[OBJECTMODE])
287
        this[BUFFERLENGTH] -= 1
288
      else
289
        this[BUFFERLENGTH] -= this.buffer.head.value.length
290
    }
291
    return this.buffer.shift()
292
  }
293
294
  [FLUSH] () {
295
    do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
296
297
    if (!this.buffer.length && !this[EOF])
298
      this.emit('drain')
299
  }
300
301
  [FLUSHCHUNK] (chunk) {
302
    return chunk ? (this.emit('data', chunk), this.flowing) : false
303
  }
304
305
  pipe (dest, opts) {
306
    if (this[DESTROYED])
307
      return
308
309
    const ended = this[EMITTED_END]
310
    opts = opts || {}
311
    if (dest === process.stdout || dest === process.stderr)
312
      opts.end = false
313
    else
314
      opts.end = opts.end !== false
315
316
    const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
317
    this.pipes.push(p)
318
319
    dest.on('drain', p.ondrain)
320
    this[RESUME]()
321
    // piping an ended stream ends immediately
322
    if (ended && p.opts.end)
323
      p.dest.end()
324
    return dest
325
  }
326
327
  addListener (ev, fn) {
328
    return this.on(ev, fn)
329
  }
330
331
  on (ev, fn) {
332
    try {
333
      return super.on(ev, fn)
334
    } finally {
335
      if (ev === 'data' && !this.pipes.length && !this.flowing)
336
        this[RESUME]()
337
      else if (isEndish(ev) && this[EMITTED_END]) {
338
        super.emit(ev)
339
        this.removeAllListeners(ev)
340
      }
341
    }
342
  }
343
344
  get emittedEnd () {
345
    return this[EMITTED_END]
346
  }
347
348
  [MAYBE_EMIT_END] () {
349
    if (!this[EMITTING_END] &&
350
        !this[EMITTED_END] &&
351
        !this[DESTROYED] &&
352
        this.buffer.length === 0 &&
353
        this[EOF]) {
354
      this[EMITTING_END] = true
355
      this.emit('end')
356
      this.emit('prefinish')
357
      this.emit('finish')
358
      if (this[CLOSED])
359
        this.emit('close')
360
      this[EMITTING_END] = false
361
    }
362
  }
363
364
  emit (ev, data) {
365
    // error and close are only events allowed after calling destroy()
366
    if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
367
      return
368
    else if (ev === 'data') {
369
      if (!data)
370
        return
371
372
      if (this.pipes.length)
373
        this.pipes.forEach(p =>
374
          p.dest.write(data) === false && this.pause())
375
    } else if (ev === 'end') {
376
      // only actual end gets this treatment
377
      if (this[EMITTED_END] === true)
378
        return
379
380
      this[EMITTED_END] = true
381
      this.readable = false
382
383
      if (this[DECODER]) {
384
        data = this[DECODER].end()
385
        if (data) {
386
          this.pipes.forEach(p => p.dest.write(data))
387
          super.emit('data', data)
388
        }
389
      }
390
391
      this.pipes.forEach(p => {
392
        p.dest.removeListener('drain', p.ondrain)
393
        if (p.opts.end)
394
          p.dest.end()
395
      })
396
    } else if (ev === 'close') {
397
      this[CLOSED] = true
398
      // don't emit close before 'end' and 'finish'
399
      if (!this[EMITTED_END] && !this[DESTROYED])
400
        return
401
    }
402
403
    // TODO: replace with a spread operator when Node v4 support drops
404
    const args = new Array(arguments.length)
405
    args[0] = ev
406
    args[1] = data
407
    if (arguments.length > 2) {
408
      for (let i = 2; i < arguments.length; i++) {
409
        args[i] = arguments[i]
410
      }
411
    }
412
413
    try {
414
      return super.emit.apply(this, args)
415
    } finally {
416
      if (!isEndish(ev))
417
        this[MAYBE_EMIT_END]()
418
      else
419
        this.removeAllListeners(ev)
420
    }
421
  }
422
423
  // const all = await stream.collect()
424
  collect () {
425
    const buf = []
426
    buf.dataLength = 0
427
    this.on('data', c => {
428
      buf.push(c)
429
      buf.dataLength += c.length
430
    })
431
    return this.promise().then(() => buf)
432
  }
433
434
  // const data = await stream.concat()
435
  concat () {
436
    return this[OBJECTMODE]
437
      ? Promise.reject(new Error('cannot concat in objectMode'))
438
      : this.collect().then(buf =>
439
          this[OBJECTMODE]
440
            ? Promise.reject(new Error('cannot concat in objectMode'))
441
            : this[ENCODING] ? buf.join('') : B.concat(buf, buf.dataLength))
442
  }
443
444
  // stream.promise().then(() => done, er => emitted error)
445
  promise () {
446
    return new Promise((resolve, reject) => {
447
      this.on(DESTROYED, () => reject(new Error('stream destroyed')))
448
      this.on('end', () => resolve())
449
      this.on('error', er => reject(er))
450
    })
451
  }
452
453
  // for await (let chunk of stream)
454
  [ASYNCITERATOR] () {
455
    const next = () => {
456
      const res = this.read()
457
      if (res !== null)
458
        return Promise.resolve({ done: false, value: res })
459
460
      if (this[EOF])
461
        return Promise.resolve({ done: true })
462
463
      let resolve = null
464
      let reject = null
465
      const onerr = er => {
466
        this.removeListener('data', ondata)
467
        this.removeListener('end', onend)
468
        reject(er)
469
      }
470
      const ondata = value => {
471
        this.removeListener('error', onerr)
472
        this.removeListener('end', onend)
473
        this.pause()
474
        resolve({ value: value, done: !!this[EOF] })
475
      }
476
      const onend = () => {
477
        this.removeListener('error', onerr)
478
        this.removeListener('data', ondata)
479
        resolve({ done: true })
480
      }
481
      const ondestroy = () => onerr(new Error('stream destroyed'))
482
      return new Promise((res, rej) => {
483
        reject = rej
484
        resolve = res
485
        this.once(DESTROYED, ondestroy)
486
        this.once('error', onerr)
487
        this.once('end', onend)
488
        this.once('data', ondata)
489
      })
490
    }
491
492
    return { next }
493
  }
494
495
  // for (let chunk of stream)
496
  [ITERATOR] () {
497
    const next = () => {
498
      const value = this.read()
499
      const done = value === null
500
      return { value, done }
501
    }
502
    return { next }
503
  }
504
505
  destroy (er) {
506
    if (this[DESTROYED]) {
507
      if (er)
508
        this.emit('error', er)
509
      else
510
        this.emit(DESTROYED)
511
      return this
512
    }
513
514
    this[DESTROYED] = true
515
516
    // throw away all buffered data, it's never coming out
517
    this.buffer = new Yallist()
518
    this[BUFFERLENGTH] = 0
519
520
    if (typeof this.close === 'function' && !this[CLOSED])
521
      this.close()
522
523
    if (er)
524
      this.emit('error', er)
525
    else // if no error to emit, still reject pending promises
526
      this.emit(DESTROYED)
527
528
    return this
529
  }
530
531
  static isStream (s) {
532
    return !!s && (s instanceof Minipass || s instanceof EE && (
533
      typeof s.pipe === 'function' || // readable
534
      (typeof s.write === 'function' && typeof s.end === 'function') // writable
535
    ))
536
  }
537
}
538