Passed
Push — main ( 25bc96...3a375b )
by Adriano
02:00
created

csv.*csvHandler.Import   B

Complexity

Conditions 6

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 10
nop 0
dl 0
loc 18
rs 8.6666
c 0
b 0
f 0
1
package csv
2
3
import (
4
	"adrianolaselva.github.io/csvql/pkg/filehandler"
5
	"adrianolaselva.github.io/csvql/pkg/storage"
6
	"bytes"
7
	"database/sql"
8
	"encoding/csv"
9
	"errors"
10
	"fmt"
11
	"github.com/schollz/progressbar/v3"
12
	"io"
13
	"os"
14
)
15
16
const (
17
	bufferMaxLength = 32 * 1024
18
)
19
20
type csvHandler struct {
21
	bar         *progressbar.ProgressBar
22
	storage     storage.Storage
23
	file        *os.File
24
	fileInput   string
25
	totalLines  int
26
	limitLines  int
27
	currentLine int
28
	delimiter   rune
29
}
30
31
func NewCsvHandler(fileInput string, delimiter rune, bar *progressbar.ProgressBar, storage storage.Storage, limitLines int) filehandler.FileHandler {
32
	return &csvHandler{fileInput: fileInput, delimiter: delimiter, storage: storage, bar: bar, limitLines: limitLines}
33
}
34
35
// Import import data
36
func (c *csvHandler) Import() error {
37
	if err := c.openFile(); err != nil {
38
		return err
39
	}
40
41
	if err := c.loadTotalRows(); err != nil {
42
		return err
43
	}
44
45
	if c.limitLines > 0 && c.totalLines > c.limitLines {
46
		c.totalLines = c.limitLines
47
	}
48
49
	if err := c.loadDataFromFile(); err != nil {
50
		return err
51
	}
52
53
	return nil
54
}
55
56
// Query execute statements
57
func (c *csvHandler) Query(cmd string) (*sql.Rows, error) {
58
	rows, err := c.storage.Query(cmd)
59
	if err != nil {
60
		return nil, fmt.Errorf("failed to execute query: %w", err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
61
	}
62
63
	return rows, nil
64
}
65
66
// Lines return total lines
67
func (c *csvHandler) Lines() int {
68
	return c.totalLines
69
}
70
71
// Close execute in defer
72
func (c *csvHandler) Close() error {
73
	defer func(storage storage.Storage) {
74
		_ = storage.Close()
75
	}(c.storage)
76
77
	defer func(file *os.File) {
78
		_ = file.Close()
79
	}(c.file)
80
81
	return nil
82
}
83
84
// loadDataFromFile load data from file
85
func (c *csvHandler) loadDataFromFile() error {
86
	c.bar.ChangeMax(c.totalLines)
87
88
	r := csv.NewReader(c.file)
89
	r.Comma = c.delimiter
90
91
	if err := c.readHeader(r); err != nil {
92
		return fmt.Errorf("failed to load headers and build structure: %w", err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
93
	}
94
95
	c.currentLine = 0
96
	for {
97
		err := c.readline(r)
98
		if errors.Is(err, io.EOF) {
99
			break
100
		}
101
102
		if err != nil {
103
			return err
104
		}
105
	}
106
107
	return nil
108
}
109
110
// readHeader read header
111
func (c *csvHandler) readHeader(r *csv.Reader) error {
112
	headers, err := r.Read()
113
	if err != nil {
114
		return fmt.Errorf("failed to load headers: %w", err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
115
	}
116
117
	if err := c.storage.SetColumns(headers).BuildStructure(); err != nil {
118
		return fmt.Errorf("failed to load headers and build structure: %w", err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
119
	}
120
121
	return nil
122
}
123
124
// readline read line
125
func (c *csvHandler) readline(r *csv.Reader) error {
126
	records, err := r.Read()
127
	if err != nil {
128
		return fmt.Errorf("failed to read line: %w", err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
129
	}
130
131
	if c.totalLines == c.currentLine {
132
		return io.EOF
133
	}
134
135
	_ = c.bar.Add(1)
136
	c.currentLine++
137
138
	if err := c.storage.InsertRow(c.convertToAnyArray(records)); err != nil {
139
		return fmt.Errorf("failed to process row number %d: %w", c.currentLine, err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
140
	}
141
142
	return nil
143
}
144
145
// convertToAnyArray convert string array to any array
146
func (c *csvHandler) convertToAnyArray(records []string) []any {
147
	values := make([]any, 0, len(records))
148
	for _, r := range records {
149
		values = append(values, r)
150
	}
151
152
	return values
153
}
154
155
// openFile open file
156
func (c *csvHandler) openFile() error {
157
	f, err := os.Open(c.fileInput)
158
	if err != nil {
159
		return fmt.Errorf("failed to open file: %w", err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
160
	}
161
162
	c.file = f
163
164
	return nil
165
}
166
167
// loadTotalRows load total rows in file
168
func (c *csvHandler) loadTotalRows() error {
169
	r, err := os.Open(c.fileInput)
170
	if err != nil {
171
		return fmt.Errorf("failed to open file %s: %w", c.fileInput, err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
172
	}
173
	defer r.Close()
174
175
	buf := make([]byte, bufferMaxLength)
176
	c.totalLines = 0
177
	lineSep := []byte{'\n'}
178
179
	for {
180
		r, err := r.Read(buf)
181
		c.totalLines += bytes.Count(buf[:r], lineSep)
182
183
		switch {
184
		case err == io.EOF:
185
			return nil
186
187
		case err != nil:
188
			return fmt.Errorf("failed to totalize rows: %w", err)
1 ignored issue
show
introduced by
unrecognized printf verb 'w'
Loading history...
189
		}
190
	}
191
}
192