processRow(StepMetaInterface,StepDataInterface)   D
last analyzed

Complexity

Conditions 13

Size

Total Lines 105
Code Lines 66

Duplication

Lines 105
Ratio 100 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 66
c 1
b 0
f 1
dl 105
loc 105
rs 4.0527
cc 13

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex methods like br.ufrj.ppgi.greco.kettle.SparqlStep.processRow(StepMetaInterface,StepDataInterface) often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package br.ufrj.ppgi.greco.kettle;
2
3
import java.io.ByteArrayOutputStream;
4
import java.io.IOException;
5
import java.io.PrintWriter;
6
7
import org.pentaho.di.core.exception.KettleException;
8
import org.pentaho.di.core.exception.KettleStepException;
9
import org.pentaho.di.core.row.RowDataUtil;
10
import org.pentaho.di.core.row.RowMeta;
11
import org.pentaho.di.core.row.RowMetaInterface;
12
import org.pentaho.di.trans.Trans;
13
import org.pentaho.di.trans.TransMeta;
14
import org.pentaho.di.trans.step.BaseStep;
15
import org.pentaho.di.trans.step.StepDataInterface;
16
import org.pentaho.di.trans.step.StepInterface;
17
import org.pentaho.di.trans.step.StepMeta;
18
import org.pentaho.di.trans.step.StepMetaInterface;
19
20
import org.apache.jena.query.ARQ;
21
import org.apache.jena.query.Query;
22
import org.apache.jena.query.QueryException;
23
import org.apache.jena.query.QueryExecution;
24
import org.apache.jena.query.QueryFactory;
25
import org.apache.jena.query.QuerySolution;
26
import org.apache.jena.query.ResultSet;
27
import org.apache.jena.rdf.model.Literal;
28
import org.apache.jena.rdf.model.Model;
29
import org.apache.jena.rdf.model.RDFNode;
30
import org.apache.jena.rdf.model.Statement;
31
import org.apache.jena.rdf.model.StmtIterator;
32
33
public class SparqlStep extends BaseStep implements StepInterface {
34
35
	private static int MAX_ATTEMPTS = 4;
36
37
	public SparqlStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
38
			Trans trans) {
39
		super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
40
	}
41
42 View Code Duplication
	public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
43
		SparqlStepMeta meta = (SparqlStepMeta) smi;
44
		SparqlStepData data = (SparqlStepData) sdi;
45
46
		final Object[] row = getRow();
47
48
		if (first) {
49
			first = false;
50
51
			RowMetaInterface rowMeta = getInputRowMeta(row != null);
52
			data.outputRowMeta = rowMeta.clone();
53
54
			// Adiciona os metadados do step atual
55
			meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
56
57
			data.inputRowSize = rowMeta.size();
58
59
			// Obtem string de consulta e constroi o objeto consulta
60
			String queryStr = SparqlStepUtils.toFullQueryString(meta.getPrefixes(), meta.getQueryString());
61
			try {
62
				data.originalQuery = QueryFactory.create(queryStr);
63
			} catch (QueryException e) {
64
				// Se consulta for invalida nao pode continuar
65
				throw new KettleException(e);
66
			}
67
68
			// Se nao usar SAX o execSelect() nao funciona
69
			ARQ.set(ARQ.useSAX, true);
70
71
			// Offset e Limit para Construct/select/describe quando limit nao
72
			// especificado
73
			if (!data.originalQuery.hasLimit() && (data.originalQuery.getQueryType() != Query.QueryTypeAsk)
74
					&& (data.originalQuery.getQueryType() != Query.QueryTypeDescribe)) {
75
				// Consulta eh quebrada em varias usando OFFSET e LIMIT
76
				data.offset = data.originalQuery.hasOffset() ? data.originalQuery.getOffset() : 0;
77
				data.limit = 1000;
78
				data.runAtOnce = false;
79
			} else {
80
				data.runAtOnce = true;
81
			}
82
83
			data.remainingTries = MAX_ATTEMPTS;
84
85
			return true;
86
		}
87
88
		Query query = null;
89
		if (data.runAtOnce) {
90
			// Roda consulta num unico HTTP Request
91
			query = data.originalQuery;
92
93
			while (data.remainingTries > 0) {
94
				// Tenta executar consulta ate MAX_ATTEMPTS vezes
95
				try {
96
					runQueryAndPutResults(query, meta, data, row);
97
98
					setOutputDone();
99
					return false; // Nao ha mais resultados, ie, processRow()
100
									// nao sera' chamado novamente
101
				} catch (Throwable e) {
102
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
103
				}
104
105
				data.remainingTries--;
106
			}
107
		} else {
108
			// Cria consulta que representa o bloco atual
109
			query = data.originalQuery.cloneQuery();
110
			query.setOffset(data.offset);
111
			query.setLimit(data.limit);
112
113
			while (data.remainingTries > 0) {
114
				try {
115
					int numRows = runQueryAndPutResults(query, meta, data, row);
116
117
					if (numRows > 0) {
118
						data.offset += data.limit;
119
						data.remainingTries = MAX_ATTEMPTS;
120
						return true;
121
					} else { 
122
						setOutputDone();
123
						return false;
124
					}
125
				} catch (Throwable e) {
126
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
127
				}
128
129
				data.remainingTries--;
130
			}
131
		}
132
133
		StringBuilder sb = new StringBuilder();
134
		sb.append("Todas as tentativas de executar a consulta falharam. ");
135
		sb.append("Verifique conexão de rede e o SPARQL Endpoint.\n");
136
		sb.append("Endpoint: ");
137
		sb.append(meta.getEndpointUri());
138
		sb.append('\n');
139
		sb.append("Grafo padrão: ");
140
		sb.append(meta.getDefaultGraph());
141
		sb.append('\n');
142
		sb.append("Consulta:\n");
143
		sb.append(query.toString());
144
		sb.append('\n');
145
146
		throw new KettleException(sb.toString());
147
	}
148
149
	private RowMetaInterface getInputRowMeta(boolean hasInputRow) {
150
151
		RowMetaInterface rowMeta = null;
152
		if (hasInputRow)
153
			rowMeta = getInputRowMeta();
154
		else
155
			rowMeta = new RowMeta();
156
157
		return rowMeta;
158
	}
159
160 View Code Duplication
	private void handleError(Throwable e, int attempts) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
161
162
		try {
163
			String msg = String.format("Falha ao executar consulta (tentativa %d de %d): ", attempts, MAX_ATTEMPTS);
164
165
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
166
			baos.write(msg.getBytes());
167
168
			e.printStackTrace(new PrintWriter(baos, true));
169
170
			long sleepTime = (long) (500 * Math.pow(2, attempts));
171
			msg = String.format("Tentando novamente em %d milissegundos...", sleepTime);
172
			baos.write(msg.getBytes());
173
174
			log.logBasic(baos.toString());
175
176
			Thread.sleep(sleepTime);
177
178
		} catch (IOException e1) {
179
			e1.printStackTrace();
180
		} catch (InterruptedException e2) {
181
			e2.printStackTrace();
182
		}
183
	}
184
185
	private int tripleWriter(Model model, Object[] row, SparqlStepData data) throws KettleStepException {
186
		int numPutRows = 0;
187
		StmtIterator it = model.listStatements();
188
189
		while (it.hasNext()) {
190
			Statement stmt = it.next();
191
192
			incrementLinesInput();
193
194
			String subject = stmt.getSubject().toString();
195
			String predicate = stmt.getPredicate().toString();
196
			String object = stmt.getObject().toString();
197
198
			if (subject != null && !subject.isEmpty() && predicate != null && !predicate.isEmpty() && object != null) {
199
				// Monta linha com a tripla
200
				Object[] outputRow = row;
201
				outputRow = RowDataUtil.addValueData(outputRow, data.inputRowSize + 0, subject);
202
				outputRow = RowDataUtil.addValueData(outputRow, data.inputRowSize + 1, predicate);
203
				outputRow = RowDataUtil.addValueData(outputRow, data.inputRowSize + 2, object);
204
205
				// Joga tripla no fluxo
206
				putRow(data.outputRowMeta, outputRow);
207
208
				numPutRows++;
209
			} else
210
				logBasic("Tripla ignorada: " + stmt.getString());
211
		}
212
213
		return numPutRows;
214
	}
215
216
	private int runQueryAndPutResults(Query query, SparqlStepMeta meta, SparqlStepData data, Object[] row)
217
			throws KettleStepException {
218
		int numPutRows = 0;
219
		Model model = null;
220
		QueryExecution qexec = SparqlStepUtils.createQueryExecution(query, meta.getEndpointUri(),
221
				meta.getDefaultGraph());
222
223
		try {
224
			switch (query.getQueryType()) {
225
			case Query.QueryTypeAsk:
226
				boolean result = qexec.execAsk();
227
				incrementLinesInput();
228
				putRow(data.outputRowMeta, RowDataUtil.addValueData(row, data.inputRowSize, result));
229
				break;
230
231
			case Query.QueryTypeConstruct:
232
				model = qexec.execConstruct();
233
				numPutRows = tripleWriter(model, row, data);
234
				break;
235
236
			case Query.QueryTypeDescribe:
237
				model = qexec.execDescribe();
238
				numPutRows = tripleWriter(model, row, data);
239
				break;
240
241
			case Query.QueryTypeSelect:
242
				ResultSet resultSet = qexec.execSelect();
243
				// Gera linhas
244
				while (resultSet.hasNext()) {
245
					QuerySolution qs = resultSet.next();
246
247
					// Diz ao Kettle que leu mais uma linha da entrada
248
					incrementLinesInput();
249
250
					// Gera uma linha de saida do fluxo
251
					Object[] outputRow = row;
252
					String[] fieldNames = data.outputRowMeta.getFieldNames();
253
					int posAddValueData = (outputRow != null) ? outputRow.length : 0;
254
					for (int i = 0; i < fieldNames.length; i++) {
255
						// Retira o prefixo para obter o nome do campo da
256
						// consulta SPARQL
257
						String var = fieldNames[i].replaceAll(meta.getVarPrefix(), "");
258
259
						// Obtem o node RDF
260
						RDFNode node = qs.get(var);
261
262
						// Obtem o valor do node RDF
263
						String value = null;
264
						if (node instanceof Literal) {
265
							value = qs.getLiteral(var).getString();
266
						} else {
267
							value = qs.getResource(var).getURI();
268
						}
269
270
						// Set output row
271
						outputRow = RowDataUtil.addValueData(outputRow, posAddValueData++, value);
272
					}
273
274
					putRow(data.outputRowMeta, outputRow);
275
					numPutRows++;
276
				}
277
				break;
278
			}
279
		} finally {
280
			qexec.close();
281
		}
282
283
		return numPutRows;
284
	}
285
}