processRow(StepMetaInterface,StepDataInterface)   D
last analyzed

Complexity

Conditions 13

Size

Total Lines 114
Code Lines 66

Duplication

Lines 114
Ratio 100 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 66
c 1
b 0
f 1
dl 114
loc 114
rs 4.0527
cc 13

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex methods like br.ufrj.ppgi.greco.kettle.GraphSparqlStep.processRow(StepMetaInterface,StepDataInterface) often do a lot of different things. To break such a method down, we need to identify cohesive pieces of work within it. A common approach to find such a piece is to look for statements that operate on the same fields, or that are introduced by the same explanatory comment.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package br.ufrj.ppgi.greco.kettle;
2
3
import java.io.ByteArrayOutputStream;
4
import java.io.IOException;
5
import java.io.PrintWriter;
6
7
import org.pentaho.di.core.exception.KettleException;
8
import org.pentaho.di.core.exception.KettleStepException;
9
import org.pentaho.di.core.row.RowDataUtil;
10
import org.pentaho.di.core.row.RowMeta;
11
import org.pentaho.di.core.row.RowMetaInterface;
12
import org.pentaho.di.trans.Trans;
13
import org.pentaho.di.trans.TransMeta;
14
import org.pentaho.di.trans.step.BaseStep;
15
import org.pentaho.di.trans.step.StepDataInterface;
16
import org.pentaho.di.trans.step.StepInterface;
17
import org.pentaho.di.trans.step.StepMeta;
18
import org.pentaho.di.trans.step.StepMetaInterface;
19
20
import org.apache.jena.query.ARQ;
21
import org.apache.jena.query.Query;
22
import org.apache.jena.query.QueryException;
23
import org.apache.jena.query.QueryExecution;
24
import org.apache.jena.query.QueryFactory;
25
import org.apache.jena.query.ResultSet;
26
import org.apache.jena.rdf.model.Model;
27
import org.apache.jena.rdf.model.Property;
28
import org.apache.jena.rdf.model.RDFNode;
29
import org.apache.jena.rdf.model.ResIterator;
30
import org.apache.jena.rdf.model.Resource;
31
import org.apache.jena.rdf.model.Selector;
32
import org.apache.jena.rdf.model.SimpleSelector;
33
34
/**
35
 * Adaptacoes: <br />
36
 * No output, em vez de passar campos separados, passar: <br />
37
 * (i) um objeto Graph (SELECT, DESCRIBE, CONSTRUCT) ou <br />
38
 * (ii) um objeto Boolean (ASK). <br />
39
 * 
40
 * @author rogers
41
 * 
42
 *         Change: Step grain changed to subjectItem (resource and its
43
 *         properties) instead of the resultRDF Graph result from CONSTRUCT
44
 *         sparql command
45
 * 
46
 * @author Kelli
47
 */
48
public class GraphSparqlStep extends BaseStep implements StepInterface {
49
50
	private static int MAX_ATTEMPTS = 4;
51
52
	public GraphSparqlStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
53
			Trans trans) {
54
		super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
55
	}
56
57
	/**
58
	 * Metodo chamado para cada linha que entra no step.
59
	 */
60
	// Rogers(Nov/2012): Correcao de bug na ordenacao dos campos da consulta
61
	// SPARQL
62 View Code Duplication
	public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
63
		GraphSparqlStepMeta meta = (GraphSparqlStepMeta) smi;
64
		GraphSparqlStepData data = (GraphSparqlStepData) sdi;
65
66
		// Obtem linha do fluxo de entrada
67
		final Object[] row = getRow();
68
69
		if (first) {
70
			// Executa apenas uma vez. Variavel first definida na superclasse
71
			first = false;
72
73
			// Obtem todas as colunas ate o step anterior.
74
			// Chamar apenas apos chamar getRow()
75
			RowMetaInterface rowMeta = getInputRowMeta(row != null);
76
			data.outputRowMeta = rowMeta.clone();
77
78
			// Adiciona os metadados do step atual
79
			meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
80
81
			data.inputRowSize = rowMeta.size();
82
83
			// Obtem string de consulta e constroi o objeto consulta
84
			String queryStr = GraphSparqlStepUtils.toFullQueryString(meta.getPrefixes(), meta.getQueryString());
85
			try {
86
				data.originalQuery = QueryFactory.create(queryStr);
87
			} catch (QueryException e) {
88
				// Se consulta for invalida nao pode continuar
89
				throw new KettleException(e);
90
			}
91
92
			// Se nao usar SAX o execSelect() nao funciona
93
			ARQ.set(ARQ.useSAX, true);
94
95
			// Offset e Limit para Construct/select/describe quando limit nao
96
			// especificado
97
			if (!data.originalQuery.hasLimit() && (data.originalQuery.getQueryType() != Query.QueryTypeAsk)
98
					&& (data.originalQuery.getQueryType() != Query.QueryTypeDescribe)) {
99
				// Consulta eh quebrada em varias usando OFFSET e LIMIT
100
				data.offset = data.originalQuery.hasOffset() ? data.originalQuery.getOffset() : 0;
101
				data.limit = 1000;
102
				data.runAtOnce = false;
103
			} else {
104
				data.runAtOnce = true;
105
			}
106
107
			data.remainingTries = MAX_ATTEMPTS;
108
109
			return true;
110
		}
111
112
		Query query = null;
113
		if (data.runAtOnce) {
114
			// Roda consulta num unico HTTP Request
115
			query = data.originalQuery;
116
117
			while (data.remainingTries > 0) {
118
				// Tenta executar consulta ate MAX_ATTEMPTS vezes
119
				try {
120
					runQueryAndPutResults(query, meta, data, row);
121
122
					setOutputDone();
123
					return false; // Nao ha mais resultados, ie, processRow()
124
									// nao sera' chamado novamente
125
				} catch (Throwable e) {
126
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
127
				}
128
129
				data.remainingTries--;
130
			}
131
		} else {
132
			// Cria consulta que representa o bloco atual
133
			query = data.originalQuery.cloneQuery();
134
			query.setOffset(data.offset);
135
			query.setLimit(data.limit);
136
137
			while (data.remainingTries > 0) { // Tenta executar este bloco ate'
138
												// MAX_ATTEMPTS vezes
139
				try {
140
					int numRows = runQueryAndPutResults(query, meta, data, row);
141
142
					if (numRows > 0) { // Este bloco de consulta rodou
143
						data.offset += data.limit;
144
						data.remainingTries = MAX_ATTEMPTS;
145
146
						return true;
147
					} else { // Nao ha mais resultados, ie, processRow() nao
148
								// sera'
149
								// chamado novamente
150
						setOutputDone();
151
						return false;
152
					}
153
				} catch (Throwable e) {
154
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
155
				}
156
157
				data.remainingTries--;
158
			}
159
		}
160
161
		// Nao funfou!
162
		StringBuilder sb = new StringBuilder();
163
		sb.append("Todas as tentativas de executar a consulta falharam. ");
164
		sb.append("Verifique conexão de rede e o SPARQL Endpoint.\n");
165
		sb.append("Endpoint: ");
166
		sb.append(meta.getEndpointUri());
167
		sb.append('\n');
168
		sb.append("Grafo padrão: ");
169
		sb.append(meta.getDefaultGraph());
170
		sb.append('\n');
171
		sb.append("Consulta:\n");
172
		sb.append(query.toString());
173
		sb.append('\n');
174
175
		throw new KettleException(sb.toString());
176
	}
177
178
	private RowMetaInterface getInputRowMeta(boolean hasInputRow) {
179
180
		RowMetaInterface rowMeta = null;
181
		if (hasInputRow)
182
			rowMeta = getInputRowMeta();
183
		else
184
			rowMeta = new RowMeta();
185
186
		return rowMeta;
187
	}
188
189 View Code Duplication
	private void handleError(Throwable e, int attempts) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
190
191
		try {
192
			String msg = String.format("Falha ao executar consulta (tentativa %d de %d): ", attempts, MAX_ATTEMPTS);
193
194
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
195
			baos.write(msg.getBytes());
196
197
			e.printStackTrace(new PrintWriter(baos, true));
198
199
			long sleepTime = (long) (500 * Math.pow(2, attempts));
200
			msg = String.format("Tentando novamente em %d milissegundos...", sleepTime);
201
			baos.write(msg.getBytes());
202
203
			log.logBasic(baos.toString());
204
205
			Thread.sleep(sleepTime);
206
207
		} catch (IOException e1) {
208
			e1.printStackTrace();
209
		} catch (InterruptedException e2) {
210
			e2.printStackTrace();
211
		}
212
	}
213
214
	// Rogers(Nov/2012): Correcao de bug na ordenacao dos campos da consulta
215
	// SPARQL
216
	private int runQueryAndPutResults(Query query, GraphSparqlStepMeta meta, GraphSparqlStepData data, Object[] row)
217
			throws KettleStepException {
218
		int numPutRows = 0;
219
		QueryExecution qexec = GraphSparqlStepUtils.createQueryExecution(query, meta.getEndpointUri(),
220
				meta.getDefaultGraph());
221
222
		try {
223
			Model model = null;
224
			switch (query.getQueryType()) {
225
			case Query.QueryTypeAsk:
226
				Boolean result = qexec.execAsk();
227
				incrementLinesInput();
228
				putRow(data.outputRowMeta, RowDataUtil.addValueData(row, data.inputRowSize, result));
229
				break;
230
231
			case Query.QueryTypeConstruct:
232
				model = qexec.execConstruct();
233
				ResIterator resourceSet = model.listSubjects();
234
				int count = 0;
235
				while (resourceSet.hasNext()) {
236
					Resource resource = resourceSet.nextResource();
237
					// gets a subgraph
238
					Model subjectItemGraph = createSubjectItemGraph(resource, model);
239
240
					// send a subGraph to the next step
241
					incrementLinesInput();
242
					putRow(data.outputRowMeta, RowDataUtil.addValueData(row, data.inputRowSize, subjectItemGraph));
243
					count++;
244
				}
245
				if (count == 0) {
246
					incrementLinesInput();
247
					putRow(data.outputRowMeta, RowDataUtil.addValueData(row, data.inputRowSize, model));
248
				}
249
				break;
250
251
			case Query.QueryTypeDescribe:
252
				model = qexec.execDescribe();
253
				ResIterator resourceSetD = model.listSubjects();
254
				int countD = 0;
255
				while (resourceSetD.hasNext()) {
256
					Resource resource = resourceSetD.nextResource();
257
					// gets a subgraph
258
					Model subjectItemGraph = createSubjectItemGraph(resource, model);
259
260
					// send a subGraph to the next step
261
					incrementLinesInput();
262
					putRow(data.outputRowMeta, RowDataUtil.addValueData(row, data.inputRowSize, subjectItemGraph));
263
					countD++;
264
				}
265
				if (countD == 0) {
266
					incrementLinesInput();
267
					putRow(data.outputRowMeta, RowDataUtil.addValueData(row, data.inputRowSize, model));
268
				}
269
270
				// incrementLinesInput();
271
				// putRow(data.outputRowMeta, RowDataUtil.addValueData(row,
272
				// data.inputRowSize, model));
273
				break;
274
275
			case Query.QueryTypeSelect:
276
				ResultSet resultSet = qexec.execSelect();
277
				model = resultSet.getResourceModel();
278
279
				Object extra = (model != null) ? model : resultSet;
280
				incrementLinesInput();
281
				putRow(data.outputRowMeta, RowDataUtil.addValueData(row, data.inputRowSize, extra));
282
				break;
283
			}
284
		} finally {
285
			qexec.close();
286
		}
287
288
		return numPutRows;
289
	}
290
291
	// Creates a subGraph with a Resource and its Properties
292
	private Model createSubjectItemGraph(Resource resource, Model model) {
293
		Model subjectItemGraph = null;
294
		Selector s = new SimpleSelector(resource, (Property) null, (RDFNode) null);
295
		subjectItemGraph = model.query(s);
296
		// StmtIterator i = model.listStatements(s);
297
298
		return subjectItemGraph;
299
	}
300
}