@@ 62-175 (lines=114) @@ | ||
59 | */ |
|
60 | // Rogers(Nov/2012): Correcao de bug na ordenacao dos campos da consulta |
|
61 | // SPARQL |
|
62 | public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { |
|
63 | GraphSparqlStepMeta meta = (GraphSparqlStepMeta) smi; |
|
64 | GraphSparqlStepData data = (GraphSparqlStepData) sdi; |
|
65 | ||
66 | // Obtem linha do fluxo de entrada |
|
67 | final Object[] row = getRow(); |
|
68 | ||
69 | if (first) { |
|
70 | // Executa apenas uma vez. Variavel first definida na superclasse |
|
71 | first = false; |
|
72 | ||
73 | // Obtem todas as colunas ate o step anterior. |
|
74 | // Chamar apenas apos chamar getRow() |
|
75 | RowMetaInterface rowMeta = getInputRowMeta(row != null); |
|
76 | data.outputRowMeta = rowMeta.clone(); |
|
77 | ||
78 | // Adiciona os metadados do step atual |
|
79 | meta.getFields(data.outputRowMeta, getStepname(), null, null, this); |
|
80 | ||
81 | data.inputRowSize = rowMeta.size(); |
|
82 | ||
83 | // Obtem string de consulta e constroi o objeto consulta |
|
84 | String queryStr = GraphSparqlStepUtils.toFullQueryString(meta.getPrefixes(), meta.getQueryString()); |
|
85 | try { |
|
86 | data.originalQuery = QueryFactory.create(queryStr); |
|
87 | } catch (QueryException e) { |
|
88 | // Se consulta for invalida nao pode continuar |
|
89 | throw new KettleException(e); |
|
90 | } |
|
91 | ||
92 | // Se nao usar SAX o execSelect() nao funciona |
|
93 | ARQ.set(ARQ.useSAX, true); |
|
94 | ||
95 | // Offset e Limit para Construct/select/describe quando limit nao |
|
96 | // especificado |
|
97 | if (!data.originalQuery.hasLimit() && (data.originalQuery.getQueryType() != Query.QueryTypeAsk) |
|
98 | && (data.originalQuery.getQueryType() != Query.QueryTypeDescribe)) { |
|
99 | // Consulta eh quebrada em varias usando OFFSET e LIMIT |
|
100 | data.offset = data.originalQuery.hasOffset() ? data.originalQuery.getOffset() : 0; |
|
101 | data.limit = 1000; |
|
102 | data.runAtOnce = false; |
|
103 | } else { |
|
104 | data.runAtOnce = true; |
|
105 | } |
|
106 | ||
107 | data.remainingTries = MAX_ATTEMPTS; |
|
108 | ||
109 | return true; |
|
110 | } |
|
111 | ||
112 | Query query = null; |
|
113 | if (data.runAtOnce) { |
|
114 | // Roda consulta num unico HTTP Request |
|
115 | query = data.originalQuery; |
|
116 | ||
117 | while (data.remainingTries > 0) { |
|
118 | // Tenta executar consulta ate MAX_ATTEMPTS vezes |
|
119 | try { |
|
120 | runQueryAndPutResults(query, meta, data, row); |
|
121 | ||
122 | setOutputDone(); |
|
123 | return false; // Nao ha mais resultados, ie, processRow() |
|
124 | // nao sera' chamado novamente |
|
125 | } catch (Throwable e) { |
|
126 | handleError(e, MAX_ATTEMPTS - data.remainingTries + 1); |
|
127 | } |
|
128 | ||
129 | data.remainingTries--; |
|
130 | } |
|
131 | } else { |
|
132 | // Cria consulta que representa o bloco atual |
|
133 | query = data.originalQuery.cloneQuery(); |
|
134 | query.setOffset(data.offset); |
|
135 | query.setLimit(data.limit); |
|
136 | ||
137 | while (data.remainingTries > 0) { // Tenta executar este bloco ate' |
|
138 | // MAX_ATTEMPTS vezes |
|
139 | try { |
|
140 | int numRows = runQueryAndPutResults(query, meta, data, row); |
|
141 | ||
142 | if (numRows > 0) { // Este bloco de consulta rodou |
|
143 | data.offset += data.limit; |
|
144 | data.remainingTries = MAX_ATTEMPTS; |
|
145 | ||
146 | return true; |
|
147 | } else { // Nao ha mais resultados, ie, processRow() nao |
|
148 | // sera' |
|
149 | // chamado novamente |
|
150 | setOutputDone(); |
|
151 | return false; |
|
152 | } |
|
153 | } catch (Throwable e) { |
|
154 | handleError(e, MAX_ATTEMPTS - data.remainingTries + 1); |
|
155 | } |
|
156 | ||
157 | data.remainingTries--; |
|
158 | } |
|
159 | } |
|
160 | ||
161 | // Nao funfou! |
|
162 | StringBuilder sb = new StringBuilder(); |
|
163 | sb.append("Todas as tentativas de executar a consulta falharam. "); |
|
164 | sb.append("Verifique conexão de rede e o SPARQL Endpoint.\n"); |
|
165 | sb.append("Endpoint: "); |
|
166 | sb.append(meta.getEndpointUri()); |
|
167 | sb.append('\n'); |
|
168 | sb.append("Grafo padrão: "); |
|
169 | sb.append(meta.getDefaultGraph()); |
|
170 | sb.append('\n'); |
|
171 | sb.append("Consulta:\n"); |
|
172 | sb.append(query.toString()); |
|
173 | sb.append('\n'); |
|
174 | ||
175 | throw new KettleException(sb.toString()); |
|
176 | } |
|
177 | ||
178 | private RowMetaInterface getInputRowMeta(boolean hasInputRow) { |
@@ 42-146 (lines=105) @@ | ||
39 | super(stepMeta, stepDataInterface, copyNr, transMeta, trans); |
|
40 | } |
|
41 | ||
42 | public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { |
|
43 | SparqlStepMeta meta = (SparqlStepMeta) smi; |
|
44 | SparqlStepData data = (SparqlStepData) sdi; |
|
45 | ||
46 | final Object[] row = getRow(); |
|
47 | ||
48 | if (first) { |
|
49 | first = false; |
|
50 | ||
51 | RowMetaInterface rowMeta = getInputRowMeta(row != null); |
|
52 | data.outputRowMeta = rowMeta.clone(); |
|
53 | ||
54 | // Adiciona os metadados do step atual |
|
55 | meta.getFields(data.outputRowMeta, getStepname(), null, null, this); |
|
56 | ||
57 | data.inputRowSize = rowMeta.size(); |
|
58 | ||
59 | // Obtem string de consulta e constroi o objeto consulta |
|
60 | String queryStr = SparqlStepUtils.toFullQueryString(meta.getPrefixes(), meta.getQueryString()); |
|
61 | try { |
|
62 | data.originalQuery = QueryFactory.create(queryStr); |
|
63 | } catch (QueryException e) { |
|
64 | // Se consulta for invalida nao pode continuar |
|
65 | throw new KettleException(e); |
|
66 | } |
|
67 | ||
68 | // Se nao usar SAX o execSelect() nao funciona |
|
69 | ARQ.set(ARQ.useSAX, true); |
|
70 | ||
71 | // Offset e Limit para Construct/select/describe quando limit nao |
|
72 | // especificado |
|
73 | if (!data.originalQuery.hasLimit() && (data.originalQuery.getQueryType() != Query.QueryTypeAsk) |
|
74 | && (data.originalQuery.getQueryType() != Query.QueryTypeDescribe)) { |
|
75 | // Consulta eh quebrada em varias usando OFFSET e LIMIT |
|
76 | data.offset = data.originalQuery.hasOffset() ? data.originalQuery.getOffset() : 0; |
|
77 | data.limit = 1000; |
|
78 | data.runAtOnce = false; |
|
79 | } else { |
|
80 | data.runAtOnce = true; |
|
81 | } |
|
82 | ||
83 | data.remainingTries = MAX_ATTEMPTS; |
|
84 | ||
85 | return true; |
|
86 | } |
|
87 | ||
88 | Query query = null; |
|
89 | if (data.runAtOnce) { |
|
90 | // Roda consulta num unico HTTP Request |
|
91 | query = data.originalQuery; |
|
92 | ||
93 | while (data.remainingTries > 0) { |
|
94 | // Tenta executar consulta ate MAX_ATTEMPTS vezes |
|
95 | try { |
|
96 | runQueryAndPutResults(query, meta, data, row); |
|
97 | ||
98 | setOutputDone(); |
|
99 | return false; // Nao ha mais resultados, ie, processRow() |
|
100 | // nao sera' chamado novamente |
|
101 | } catch (Throwable e) { |
|
102 | handleError(e, MAX_ATTEMPTS - data.remainingTries + 1); |
|
103 | } |
|
104 | ||
105 | data.remainingTries--; |
|
106 | } |
|
107 | } else { |
|
108 | // Cria consulta que representa o bloco atual |
|
109 | query = data.originalQuery.cloneQuery(); |
|
110 | query.setOffset(data.offset); |
|
111 | query.setLimit(data.limit); |
|
112 | ||
113 | while (data.remainingTries > 0) { |
|
114 | try { |
|
115 | int numRows = runQueryAndPutResults(query, meta, data, row); |
|
116 | ||
117 | if (numRows > 0) { |
|
118 | data.offset += data.limit; |
|
119 | data.remainingTries = MAX_ATTEMPTS; |
|
120 | return true; |
|
121 | } else { |
|
122 | setOutputDone(); |
|
123 | return false; |
|
124 | } |
|
125 | } catch (Throwable e) { |
|
126 | handleError(e, MAX_ATTEMPTS - data.remainingTries + 1); |
|
127 | } |
|
128 | ||
129 | data.remainingTries--; |
|
130 | } |
|
131 | } |
|
132 | ||
133 | StringBuilder sb = new StringBuilder(); |
|
134 | sb.append("Todas as tentativas de executar a consulta falharam. "); |
|
135 | sb.append("Verifique conexão de rede e o SPARQL Endpoint.\n"); |
|
136 | sb.append("Endpoint: "); |
|
137 | sb.append(meta.getEndpointUri()); |
|
138 | sb.append('\n'); |
|
139 | sb.append("Grafo padrão: "); |
|
140 | sb.append(meta.getDefaultGraph()); |
|
141 | sb.append('\n'); |
|
142 | sb.append("Consulta:\n"); |
|
143 | sb.append(query.toString()); |
|
144 | sb.append('\n'); |
|
145 | ||
146 | throw new KettleException(sb.toString()); |
|
147 | } |
|
148 | ||
149 | private RowMetaInterface getInputRowMeta(boolean hasInputRow) { |