pkg/filehandler/csv/csvhandler.go   B
last analyzed

Size/Duplication

Total Lines 251
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 44
eloc 155
dl 0
loc 251
rs 8.8798
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A csv.*csvHandler.readline 0 18 4
A csv.*csvHandler.convertToAnyArray 0 7 2
A csv.*csvHandler.formatTableName 0 4 1
A csv.*csvHandler.Lines 0 2 1
A csv.*csvHandler.loadDataFromFile 0 27 5
A csv.NewCsvHandler 0 2 1
C csv.*csvHandler.Import 0 42 10
A csv.*csvHandler.openFiles 0 26 5
B csv.*csvHandler.loadTotalRows 0 23 6
A csv.*csvHandler.Close 0 12 4
A csv.*csvHandler.Query 0 7 2
A csv.*csvHandler.readHeader 0 11 3
1
package csv
2
3
import (
4
	"adrianolaselva.github.io/csvql/pkg/filehandler"
5
	"adrianolaselva.github.io/csvql/pkg/storage"
6
	"bytes"
7
	"database/sql"
8
	"encoding/csv"
9
	"errors"
10
	"fmt"
11
	"github.com/schollz/progressbar/v3"
12
	"io"
13
	"os"
14
	"path/filepath"
15
	"regexp"
16
	"strings"
17
	"sync"
18
)
19
20
// bufferMaxLength is the chunk size, in bytes, used by loadTotalRows when
// scanning a file for newline characters.
const bufferMaxLength = 32 << 10

// nonAlphanumericRegex matches runs of characters that are not ASCII letters,
// ASCII digits or spaces; formatTableName deletes such runs from table names.
var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9 ]+`)
25
26
type csvHandler struct {
27
	mx          sync.Mutex
28
	bar         *progressbar.ProgressBar
29
	storage     storage.Storage
30
	files       []*os.File
31
	fileInputs  []string
32
	totalLines  int
33
	limitLines  int
34
	currentLine int
35
	delimiter   rune
36
}
37
38
// NewCsvHandler returns a FileHandler that imports fileInputs, split on
// delimiter, into storage and reports progress on bar. A positive limitLines
// caps the line budget computed during Import; zero or a negative value
// leaves it uncapped.
func NewCsvHandler(fileInputs []string, delimiter rune, bar *progressbar.ProgressBar, storage storage.Storage, limitLines int) filehandler.FileHandler {
	handler := &csvHandler{
		fileInputs: fileInputs,
		delimiter:  delimiter,
		limitLines: limitLines,
		storage:    storage,
		bar:        bar,
	}
	return handler
}
41
42
// Import import data
43
func (c *csvHandler) Import() error {
44
	if err := c.openFiles(); err != nil {
45
		return err
46
	}
47
48
	wg := new(sync.WaitGroup)
49
	wg.Add(len(c.fileInputs))
50
	errChannels := make(chan error, len(c.fileInputs))
51
52
	for _, file := range c.fileInputs {
53
		go func(wg *sync.WaitGroup, file string, errChan chan error) {
54
			defer wg.Done()
55
			err := c.loadTotalRows(file)
56
			errChan <- err
57
		}(wg, file, errChannels)
58
	}
59
60
	wg.Wait()
61
	if err := <-errChannels; err != nil {
62
		return err
63
	}
64
65
	if c.limitLines > 0 && c.totalLines > c.limitLines {
66
		c.totalLines = c.limitLines
67
	}
68
69
	wg.Add(len(c.files))
70
	errChannels = make(chan error, len(c.files))
71
	for _, file := range c.files {
72
		tableName := c.formatTableName(file)
73
		go func(wg *sync.WaitGroup, file *os.File, tableName string, errChan chan error) {
74
			defer wg.Done()
75
			errChan <- c.loadDataFromFile(tableName, file)
76
		}(wg, file, tableName, errChannels)
77
	}
78
79
	wg.Wait()
80
	if err := <-errChannels; err != nil {
81
		return err
82
	}
83
84
	return nil
85
}
86
87
// formatTableName format table name by removing invalid characters
88
func (c *csvHandler) formatTableName(file *os.File) string {
89
	tableName := strings.ReplaceAll(strings.ToLower(filepath.Base(file.Name())), filepath.Ext(file.Name()), "")
90
	tableName = strings.ReplaceAll(tableName, " ", "_")
91
	return nonAlphanumericRegex.ReplaceAllString(tableName, "")
92
}
93
94
// Query execute statements
95
func (c *csvHandler) Query(cmd string) (*sql.Rows, error) {
96
	rows, err := c.storage.Query(cmd)
97
	if err != nil {
98
		return nil, fmt.Errorf("failed to execute query: %w", err)
99
	}
100
101
	return rows, nil
102
}
103
104
// Lines return total lines
105
func (c *csvHandler) Lines() int {
106
	return c.totalLines
107
}
108
109
// Close execute in defer
110
func (c *csvHandler) Close() error {
111
	defer func(storage storage.Storage) {
112
		_ = storage.Close()
113
	}(c.storage)
114
115
	defer func(files []*os.File) {
116
		for _, file := range files {
117
			_ = file.Close()
118
		}
119
	}(c.files)
120
121
	return nil
122
}
123
124
// loadDataFromFile load data from file
125
func (c *csvHandler) loadDataFromFile(tableName string, file *os.File) error {
126
	c.mx.Lock()
127
	defer c.mx.Unlock()
128
129
	c.bar.ChangeMax(c.totalLines)
130
131
	r := csv.NewReader(file)
132
	r.Comma = c.delimiter
133
134
	columns, err := c.readHeader(tableName, r)
135
	if err != nil {
136
		return fmt.Errorf("failed to load headers and build structure: %w", err)
137
	}
138
139
	c.currentLine = 0
140
	for {
141
		err := c.readline(tableName, columns, r)
142
		if errors.Is(err, io.EOF) {
143
			break
144
		}
145
146
		if err != nil {
147
			return err
148
		}
149
	}
150
151
	return nil
152
}
153
154
// readHeader read header
155
func (c *csvHandler) readHeader(tableName string, r *csv.Reader) ([]string, error) {
156
	columns, err := r.Read()
157
	if err != nil {
158
		return nil, fmt.Errorf("failed to load headers: %w", err)
159
	}
160
161
	if err := c.storage.BuildStructure(tableName, columns); err != nil {
162
		return nil, fmt.Errorf("failed to load headers and build structure: %w", err)
163
	}
164
165
	return columns, nil
166
}
167
168
// readline read line
169
func (c *csvHandler) readline(tableName string, columns []string, r *csv.Reader) error {
170
	records, err := r.Read()
171
	if err != nil {
172
		return fmt.Errorf("failed to read line: %w", err)
173
	}
174
175
	if c.totalLines == c.currentLine {
176
		return io.EOF
177
	}
178
179
	_ = c.bar.Add(1)
180
	c.currentLine++
181
182
	if err := c.storage.InsertRow(tableName, columns, c.convertToAnyArray(records)); err != nil {
183
		return fmt.Errorf("failed to process row number %d: %w", c.currentLine, err)
184
	}
185
186
	return nil
187
}
188
189
// convertToAnyArray convert string array to any array
190
func (c *csvHandler) convertToAnyArray(records []string) []any {
191
	values := make([]any, 0, len(records))
192
	for _, r := range records {
193
		values = append(values, r)
194
	}
195
196
	return values
197
}
198
199
// openFile open file
200
func (c *csvHandler) openFiles() error {
201
	wg := new(sync.WaitGroup)
202
	wg.Add(len(c.fileInputs))
203
	errChannels := make(chan error, len(c.fileInputs))
204
205
	for _, file := range c.fileInputs {
206
		go func(wg *sync.WaitGroup, file string, errChan chan error) {
207
			defer wg.Done()
208
209
			f, err := os.Open(file)
210
			if err != nil {
211
				errChan <- fmt.Errorf("failed to open file: %w", err)
212
				return
213
			}
214
215
			c.files = append(c.files, f)
216
			errChan <- nil
217
		}(wg, file, errChannels)
218
	}
219
220
	wg.Wait()
221
	if err := <-errChannels; err != nil {
222
		return fmt.Errorf("failed to open file: %w", err)
223
	}
224
225
	return nil
226
}
227
228
// loadTotalRows load total rows in file
229
func (c *csvHandler) loadTotalRows(file string) error {
230
	r, err := os.Open(file)
231
	if err != nil {
232
		return fmt.Errorf("failed to open file %s: %w", file, err)
233
	}
234
	defer func(r *os.File) {
235
		_ = r.Close()
236
	}(r)
237
238
	buf := make([]byte, bufferMaxLength)
239
	c.totalLines = 0
240
	lineSep := []byte{'\n'}
241
242
	for {
243
		r, err := r.Read(buf)
244
		c.totalLines += bytes.Count(buf[:r], lineSep)
245
246
		switch {
247
		case err == io.EOF:
248
			return nil
249
250
		case err != nil:
251
			return fmt.Errorf("failed to totalize rows: %w", err)
252
		}
253
	}
254
}
255