Code Duplication    Length = 105-114 lines in 2 locations

plugins/GraphSparqlEndpoint/src/main/java/br/ufrj/ppgi/greco/kettle/GraphSparqlStep.java 1 location

@@ 62-175 (lines=114) @@
59
	 */
60
	// Rogers(Nov/2012): Correcao de bug na ordenacao dos campos da consulta
61
	// SPARQL
62
	public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
63
		GraphSparqlStepMeta meta = (GraphSparqlStepMeta) smi;
64
		GraphSparqlStepData data = (GraphSparqlStepData) sdi;
65
66
		// Obtem linha do fluxo de entrada
67
		final Object[] row = getRow();
68
69
		if (first) {
70
			// Executa apenas uma vez. Variavel first definida na superclasse
71
			first = false;
72
73
			// Obtem todas as colunas ate o step anterior.
74
			// Chamar apenas apos chamar getRow()
75
			RowMetaInterface rowMeta = getInputRowMeta(row != null);
76
			data.outputRowMeta = rowMeta.clone();
77
78
			// Adiciona os metadados do step atual
79
			meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
80
81
			data.inputRowSize = rowMeta.size();
82
83
			// Obtem string de consulta e constroi o objeto consulta
84
			String queryStr = GraphSparqlStepUtils.toFullQueryString(meta.getPrefixes(), meta.getQueryString());
85
			try {
86
				data.originalQuery = QueryFactory.create(queryStr);
87
			} catch (QueryException e) {
88
				// Se consulta for invalida nao pode continuar
89
				throw new KettleException(e);
90
			}
91
92
			// Se nao usar SAX o execSelect() nao funciona
93
			ARQ.set(ARQ.useSAX, true);
94
95
			// Offset e Limit para Construct/select/describe quando limit nao
96
			// especificado
97
			if (!data.originalQuery.hasLimit() && (data.originalQuery.getQueryType() != Query.QueryTypeAsk)
98
					&& (data.originalQuery.getQueryType() != Query.QueryTypeDescribe)) {
99
				// Consulta eh quebrada em varias usando OFFSET e LIMIT
100
				data.offset = data.originalQuery.hasOffset() ? data.originalQuery.getOffset() : 0;
101
				data.limit = 1000;
102
				data.runAtOnce = false;
103
			} else {
104
				data.runAtOnce = true;
105
			}
106
107
			data.remainingTries = MAX_ATTEMPTS;
108
109
			return true;
110
		}
111
112
		Query query = null;
113
		if (data.runAtOnce) {
114
			// Roda consulta num unico HTTP Request
115
			query = data.originalQuery;
116
117
			while (data.remainingTries > 0) {
118
				// Tenta executar consulta ate MAX_ATTEMPTS vezes
119
				try {
120
					runQueryAndPutResults(query, meta, data, row);
121
122
					setOutputDone();
123
					return false; // Nao ha mais resultados, ie, processRow()
124
									// nao sera' chamado novamente
125
				} catch (Throwable e) {
126
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
127
				}
128
129
				data.remainingTries--;
130
			}
131
		} else {
132
			// Cria consulta que representa o bloco atual
133
			query = data.originalQuery.cloneQuery();
134
			query.setOffset(data.offset);
135
			query.setLimit(data.limit);
136
137
			while (data.remainingTries > 0) { // Tenta executar este bloco ate'
138
												// MAX_ATTEMPTS vezes
139
				try {
140
					int numRows = runQueryAndPutResults(query, meta, data, row);
141
142
					if (numRows > 0) { // Este bloco de consulta rodou
143
						data.offset += data.limit;
144
						data.remainingTries = MAX_ATTEMPTS;
145
146
						return true;
147
					} else { // Nao ha mais resultados, ie, processRow() nao
148
								// sera'
149
								// chamado novamente
150
						setOutputDone();
151
						return false;
152
					}
153
				} catch (Throwable e) {
154
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
155
				}
156
157
				data.remainingTries--;
158
			}
159
		}
160
161
		// Nao funfou!
162
		StringBuilder sb = new StringBuilder();
163
		sb.append("Todas as tentativas de executar a consulta falharam. ");
164
		sb.append("Verifique conexão de rede e o SPARQL Endpoint.\n");
165
		sb.append("Endpoint: ");
166
		sb.append(meta.getEndpointUri());
167
		sb.append('\n');
168
		sb.append("Grafo padrão: ");
169
		sb.append(meta.getDefaultGraph());
170
		sb.append('\n');
171
		sb.append("Consulta:\n");
172
		sb.append(query.toString());
173
		sb.append('\n');
174
175
		throw new KettleException(sb.toString());
176
	}
177
178
	private RowMetaInterface getInputRowMeta(boolean hasInputRow) {

plugins/SparqlEndpoint/src/main/java/br/ufrj/ppgi/greco/kettle/SparqlStep.java 1 location

@@ 42-146 (lines=105) @@
39
		super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
40
	}
41
42
	public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
43
		SparqlStepMeta meta = (SparqlStepMeta) smi;
44
		SparqlStepData data = (SparqlStepData) sdi;
45
46
		final Object[] row = getRow();
47
48
		if (first) {
49
			first = false;
50
51
			RowMetaInterface rowMeta = getInputRowMeta(row != null);
52
			data.outputRowMeta = rowMeta.clone();
53
54
			// Adiciona os metadados do step atual
55
			meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
56
57
			data.inputRowSize = rowMeta.size();
58
59
			// Obtem string de consulta e constroi o objeto consulta
60
			String queryStr = SparqlStepUtils.toFullQueryString(meta.getPrefixes(), meta.getQueryString());
61
			try {
62
				data.originalQuery = QueryFactory.create(queryStr);
63
			} catch (QueryException e) {
64
				// Se consulta for invalida nao pode continuar
65
				throw new KettleException(e);
66
			}
67
68
			// Se nao usar SAX o execSelect() nao funciona
69
			ARQ.set(ARQ.useSAX, true);
70
71
			// Offset e Limit para Construct/select/describe quando limit nao
72
			// especificado
73
			if (!data.originalQuery.hasLimit() && (data.originalQuery.getQueryType() != Query.QueryTypeAsk)
74
					&& (data.originalQuery.getQueryType() != Query.QueryTypeDescribe)) {
75
				// Consulta eh quebrada em varias usando OFFSET e LIMIT
76
				data.offset = data.originalQuery.hasOffset() ? data.originalQuery.getOffset() : 0;
77
				data.limit = 1000;
78
				data.runAtOnce = false;
79
			} else {
80
				data.runAtOnce = true;
81
			}
82
83
			data.remainingTries = MAX_ATTEMPTS;
84
85
			return true;
86
		}
87
88
		Query query = null;
89
		if (data.runAtOnce) {
90
			// Roda consulta num unico HTTP Request
91
			query = data.originalQuery;
92
93
			while (data.remainingTries > 0) {
94
				// Tenta executar consulta ate MAX_ATTEMPTS vezes
95
				try {
96
					runQueryAndPutResults(query, meta, data, row);
97
98
					setOutputDone();
99
					return false; // Nao ha mais resultados, ie, processRow()
100
									// nao sera' chamado novamente
101
				} catch (Throwable e) {
102
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
103
				}
104
105
				data.remainingTries--;
106
			}
107
		} else {
108
			// Cria consulta que representa o bloco atual
109
			query = data.originalQuery.cloneQuery();
110
			query.setOffset(data.offset);
111
			query.setLimit(data.limit);
112
113
			while (data.remainingTries > 0) {
114
				try {
115
					int numRows = runQueryAndPutResults(query, meta, data, row);
116
117
					if (numRows > 0) {
118
						data.offset += data.limit;
119
						data.remainingTries = MAX_ATTEMPTS;
120
						return true;
121
					} else { 
122
						setOutputDone();
123
						return false;
124
					}
125
				} catch (Throwable e) {
126
					handleError(e, MAX_ATTEMPTS - data.remainingTries + 1);
127
				}
128
129
				data.remainingTries--;
130
			}
131
		}
132
133
		StringBuilder sb = new StringBuilder();
134
		sb.append("Todas as tentativas de executar a consulta falharam. ");
135
		sb.append("Verifique conexão de rede e o SPARQL Endpoint.\n");
136
		sb.append("Endpoint: ");
137
		sb.append(meta.getEndpointUri());
138
		sb.append('\n');
139
		sb.append("Grafo padrão: ");
140
		sb.append(meta.getDefaultGraph());
141
		sb.append('\n');
142
		sb.append("Consulta:\n");
143
		sb.append(query.toString());
144
		sb.append('\n');
145
146
		throw new KettleException(sb.toString());
147
	}
148
149
	private RowMetaInterface getInputRowMeta(boolean hasInputRow) {