1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.io.Closeable;
17  import java.io.DataOutputStream;
18  import java.io.File;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.InputStreamReader;
22  import java.io.Reader;
23  import java.net.HttpURLConnection;
24  import java.net.URL;
25  import java.net.URLEncoder;
26  import java.nio.charset.Charset;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Collections;
30  import java.util.Comparator;
31  import java.util.HashMap;
32  import java.util.Iterator;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Objects;
36  import java.util.Spliterator;
37  import java.util.Spliterators;
38  import java.util.concurrent.CountDownLatch;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import javax.annotation.Nullable;
43  import javax.xml.stream.XMLInputFactory;
44  import javax.xml.stream.XMLStreamConstants;
45  import javax.xml.stream.XMLStreamException;
46  import javax.xml.stream.XMLStreamReader;
47  
48  import org.openrdf.model.BNode;
49  import org.openrdf.model.Model;
50  import org.openrdf.model.Namespace;
51  import org.openrdf.model.Resource;
52  import org.openrdf.model.Statement;
53  import org.openrdf.model.URI;
54  import org.openrdf.model.Value;
55  import org.openrdf.model.ValueFactory;
56  import org.openrdf.model.impl.NamespaceImpl;
57  import org.openrdf.rio.ParserConfig;
58  import org.openrdf.rio.RDFFormat;
59  import org.openrdf.rio.RDFHandler;
60  import org.openrdf.rio.RDFHandlerException;
61  import org.openrdf.rio.RDFParseException;
62  import org.openrdf.rio.RDFParser;
63  import org.openrdf.rio.Rio;
64  import org.openrdf.rio.helpers.BasicParserSettings;
65  import org.openrdf.rio.helpers.NTriplesParserSettings;
66  import org.openrdf.rio.helpers.ParseErrorLogger;
67  import org.openrdf.rio.helpers.RDFJSONParserSettings;
68  import org.openrdf.rio.helpers.TriXParserSettings;
69  import org.openrdf.rio.helpers.XMLParserSettings;
70  import org.slf4j.Logger;
71  import org.slf4j.LoggerFactory;
72  
73  import eu.fbk.rdfpro.util.Environment;
74  import eu.fbk.rdfpro.util.Hash;
75  import eu.fbk.rdfpro.util.IO;
76  import eu.fbk.rdfpro.util.QuadModel;
77  import eu.fbk.rdfpro.util.Statements;
78  
79  /**
80   * Utility methods dealing with {@code RDFSource}s.
81   */
82  public final class RDFSources {
83  
84      private static final ParserConfig DEFAULT_PARSER_CONFIG;
85  
86      static {
87          final ParserConfig config = new ParserConfig();
88          config.set(BasicParserSettings.FAIL_ON_UNKNOWN_DATATYPES, false);
89          config.set(BasicParserSettings.FAIL_ON_UNKNOWN_LANGUAGES, false);
90          config.set(BasicParserSettings.VERIFY_DATATYPE_VALUES, false);
91          config.set(BasicParserSettings.VERIFY_LANGUAGE_TAGS, false);
92          config.set(BasicParserSettings.VERIFY_RELATIVE_URIS, false);
93          config.set(BasicParserSettings.NORMALIZE_DATATYPE_VALUES, false);
94          config.set(BasicParserSettings.NORMALIZE_LANGUAGE_TAGS, true);
95          config.set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
96          config.set(NTriplesParserSettings.FAIL_ON_NTRIPLES_INVALID_LINES, false);
97          config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_DATATYPES, false);
98          config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_LANGUAGES, false);
99          config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_TYPES, false);
100         config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_VALUES, false);
101         config.set(RDFJSONParserSettings.FAIL_ON_UNKNOWN_PROPERTY, false);
102         config.set(RDFJSONParserSettings.SUPPORT_GRAPHS_EXTENSION, true);
103         config.set(TriXParserSettings.FAIL_ON_TRIX_INVALID_STATEMENT, false);
104         config.set(TriXParserSettings.FAIL_ON_TRIX_MISSING_DATATYPE, false);
105         config.set(XMLParserSettings.FAIL_ON_DUPLICATE_RDF_ID, false);
106         config.set(XMLParserSettings.FAIL_ON_INVALID_NCNAME, false);
107         config.set(XMLParserSettings.FAIL_ON_INVALID_QNAME, false);
108         config.set(XMLParserSettings.FAIL_ON_MISMATCHED_TAGS, false);
109         config.set(XMLParserSettings.FAIL_ON_NON_STANDARD_ATTRIBUTES, false);
110         config.set(XMLParserSettings.FAIL_ON_SAX_NON_FATAL_ERRORS, false);
111         DEFAULT_PARSER_CONFIG = config;
112     }
113 
114     /** The null {@code RDFSource} that returns no statements, namespaces and comments. */
115     public static final RDFSource NIL = new RDFSource() {
116 
117         @Override
118         public void emit(final RDFHandler handler, final int passes) throws RDFSourceException,
119                 RDFHandlerException {
120             Objects.requireNonNull(handler);
121             try {
122                 for (int i = 0; i < passes; ++i) {
123                     handler.startRDF();
124                     handler.endRDF();
125                 }
126             } finally {
127                 IO.closeQuietly(handler);
128             }
129         }
130 
131         @Override
132         public Spliterator<Statement> spliterator() {
133             return Spliterators.emptySpliterator();
134         }
135 
136     };
137 
138     /**
139      * Returns an {@code RDFSource} providing access to the statements in the supplied collection.
140      * If the collection is a {@link Model} or {@link QuadModel}, also its namespaces are wrapped
141      * and returned when reading from the source. If you don't want namespaces to be read, use
142      * {@link #wrap(Iterable, Iterable)} passing null as second argument. The collection MUST NOT
143      * be changed while the returned source is being used, as this will cause the source to return
144      * different statements in different passes. Access to the collection is sequential, so it
145      * does not need to be thread-safe.
146      *
147      * @param statements
148      *            the statements collection, not null (can also be another {@code RDFSource})
149      * @return the created {@code RDFSource}.
150      */
151     public static RDFSource wrap(final Iterable<? extends Statement> statements) {
152         if (statements instanceof Model) {
153             return wrap(statements, ((Model) statements).getNamespaces());
154         } else if (statements instanceof QuadModel) {
155             return wrap(statements, ((QuadModel) statements).getNamespaces());
156         } else {
157             return wrap(statements, Collections.emptyList());
158         }
159     }
160 
161     /**
162      * Returns an {@code RDFSource} providing access to the statements and namespaces in the
163      * supplied collections. The two collections MUST NOT be changed while the returned source is
164      * being used, as this will cause the source to return different data in different passes.
165      * Access to the collections is sequential, so they do not need to be thread-safe.
166      *
167      * @param statements
168      *            the statements collection, not null (can also be another {@code RDFSource})
169      * @param namespaces
170      *            the namespaces collection, possibly null
171      * @return the created {@code RDFSource}.
172      */
173     public static RDFSource wrap(final Iterable<? extends Statement> statements,
174             @Nullable final Iterable<? extends Namespace> namespaces) {
175 
176         Objects.requireNonNull(statements);
177 
178         return new RDFSource() {
179 
180             @Override
181             public void emit(final RDFHandler handler, final int passes)
182                     throws RDFSourceException, RDFHandlerException {
183 
184                 Objects.requireNonNull(handler);
185 
186                 if (statements instanceof RDFSource) {
187                     ((RDFSource) statements).emit(new AbstractRDFHandlerWrapper(handler) {
188 
189                         @Override
190                         public void startRDF() throws RDFHandlerException {
191                             super.startRDF();
192                             if (namespaces != null) {
193                                 for (final Namespace ns : namespaces) {
194                                     this.handler.handleNamespace(ns.getPrefix(), ns.getName());
195                                 }
196                             }
197                         }
198 
199                         @Override
200                         public void handleComment(final String comment) throws RDFHandlerException {
201                             // discard
202                         }
203 
204                         @Override
205                         public void handleNamespace(final String prefix, final String uri)
206                                 throws RDFHandlerException {
207                             // discard
208                         }
209 
210                     }, passes);
211 
212                 } else {
213                     try {
214                         for (int i = 0; i < passes; ++i) {
215                             handler.startRDF();
216                             if (namespaces != null) {
217                                 for (final Namespace ns : namespaces) {
218                                     handler.handleNamespace(ns.getPrefix(), ns.getName());
219                                 }
220                             }
221                             for (final Statement statement : statements) {
222                                 handler.handleStatement(statement);
223                             }
224                             handler.endRDF();
225                         }
226                     } finally {
227                         IO.closeQuietly(handler);
228                     }
229                 }
230             }
231 
232             @SuppressWarnings({ "rawtypes", "unchecked" })
233             @Override
234             public Iterator<Statement> iterator() {
235                 return (Iterator) statements.iterator();
236             }
237 
238             @SuppressWarnings({ "rawtypes", "unchecked" })
239             @Override
240             public Spliterator<Statement> spliterator() {
241                 return (Spliterator) statements.spliterator();
242             }
243 
244         };
245     }
246 
247     /**
248      * Returns an {@code RDFSource} providing access to the statements and namespaces in the
249      * supplied collection and map. The collection and map MUST NOT be changed while the returned
250      * source is being used, as this will cause the source to return different data in different
251      * passes. Access to the collection and map is sequential, so they do not need to be
252      * thread-safe.
253      *
254      * @param statements
255      *            the statements collection, not null (can also be another {@code RDFSource})
256      * @param namespaces
257      *            the namespaces map, possibly null
258      * @return the created {@code RDFSource}
259      */
260     public static RDFSource wrap(final Iterable<? extends Statement> statements,
261             @Nullable final Map<String, String> namespaces) {
262 
263         List<Namespace> list;
264         if (namespaces == null || namespaces.isEmpty()) {
265             list = Collections.emptyList();
266         } else {
267             list = new ArrayList<Namespace>(namespaces.size());
268             for (final Map.Entry<String, String> entry : namespaces.entrySet()) {
269                 list.add(new NamespaceImpl(entry.getKey(), entry.getValue()));
270             }
271         }
272 
273         return wrap(statements, list);
274     }
275 
276     /**
277      * Returns an {@code RDFSource} that reads files from the locations specified. Each location
278      * is either a file path or a full URL, possibly prefixed with an {@code .ext:} fragment that
279      * overrides the file extension used to detect RDF format and compression. Files are read
280      * sequentially if {@code parallelize == false}, otherwise multiple files can be parsed in
281      * parallel and files in a line-oriented RDF format can be parsed using multiple threads, in
282      * both cases the effect being a greater throughput but no guarantee on the statements order,
283      * however. Arguments {@code preserveBNodes}, {@code baseURI} and {@code config} control the
284      * parsing process.
285      *
286      * @param parallelize
287      *            false if files should be parsed sequentially using only one thread
288      * @param preserveBNodes
289      *            true if BNodes in parsed files should be preserved, false if they should be
290      *            rewritten on a per-file basis to avoid possible clashes
291      * @param baseURI
292      *            the base URI to be used for resolving relative URIs, possibly null
293      * @param config
294      *            the optional {@code ParserConfig} for the fine tuning of the used RDF parser; if
295      *            null a default, maximally permissive configuration will be used
296      * @param locations
297      *            the locations of the RDF files to be read
298      * @return the created {@code RDFSource}
299      */
300     public static RDFSource read(final boolean parallelize, final boolean preserveBNodes,
301             @Nullable final String baseURI, @Nullable final ParserConfig config,
302             final String... locations) {
303         return new FileSource(parallelize, preserveBNodes, baseURI, config, locations);
304     }
305 
306     /**
307      * Returns an {@code RDFSource} that retrieves data from a SPARQL endpoint using SPARQL
308      * CONSTRUCT or SELECT queries. CONSTRUCT queries are limited (due to SPARQL) to return only
309      * triples in a default graph. SELECT query should return bindings for variables {@code s},
310      * {@code p}, {@code o} and {@code c}, which are used as subject, predicate, object and
311      * context of returned statements; incomplete or invalid bindings (e.g., bindings where the
312      * subject is bound to a literal) are silently ignored.
313      *
314      * @param parallelize
315      *            true to use multiple threads should for handling parsed triples
316      * @param preserveBNodes
317      *            true if BNodes in the query result should be preserved, false if they should be
318      *            rewritten on a per-endpoint basis to avoid possible clashes
319      * @param endpointURL
320      *            the URL of the SPARQL endpoint, not null
321      * @param query
322      *            the SPARQL query (CONSTRUCT or SELECT form) to submit to the endpoint
323      * @return the created {@code RDFSource}
324      */
325     public static RDFSource query(final boolean parallelize, final boolean preserveBNodes,
326             final String endpointURL, final String query) {
327         return new SparqlSource(parallelize, preserveBNodes, endpointURL, query);
328     }
329 
330     private RDFSources() {
331     }
332 
333     private static RDFHandler rewriteBNodes(final RDFHandler handler, final String suffix) {
334         Objects.requireNonNull(suffix);
335         return handler == RDFHandlers.NIL ? handler : new AbstractRDFHandlerWrapper(handler) {
336 
337             @Override
338             public void handleStatement(final Statement statement) throws RDFHandlerException {
339 
340                 if (statement.getSubject() instanceof BNode
341                         || statement.getObject() instanceof BNode
342                         || statement.getContext() instanceof BNode) {
343 
344                     final Resource s = (Resource) rewrite(statement.getSubject());
345                     final URI p = statement.getPredicate();
346                     final Value o = rewrite(statement.getObject());
347                     final Resource c = (Resource) rewrite(statement.getContext());
348 
349                     final ValueFactory vf = Statements.VALUE_FACTORY;
350                     if (c == null) {
351                         super.handleStatement(vf.createStatement(s, p, o));
352                     } else {
353                         super.handleStatement(vf.createStatement(s, p, o, c));
354                     }
355 
356                 } else {
357                     super.handleStatement(statement);
358                 }
359             }
360 
361             @Nullable
362             private Value rewrite(@Nullable final Value value) {
363                 if (!(value instanceof BNode)) {
364                     return value;
365                 }
366                 final String oldID = ((BNode) value).getID();
367                 final String newID = Hash.murmur3(oldID, suffix).toString();
368                 return Statements.VALUE_FACTORY.createBNode(newID);
369             }
370 
371         };
372     }
373 
374     private static class FileSource implements RDFSource {
375 
376         private static final Logger LOGGER = LoggerFactory.getLogger(FileSource.class);
377 
378         private final boolean parallelize;
379 
380         private final boolean preserveBNodes;
381 
382         private final String base;
383 
384         private final ParserConfig parserConfig;
385 
386         private final String[] locations;
387 
388         public FileSource(final boolean parallelize, final boolean preserveBNodes,
389                 @Nullable final String baseURI, @Nullable final ParserConfig parserConfig,
390                 final String... locations) {
391 
392             this.parallelize = parallelize;
393             this.preserveBNodes = preserveBNodes;
394             this.base = baseURI != null ? baseURI : "";
395             this.parserConfig = parserConfig != null ? parserConfig : DEFAULT_PARSER_CONFIG;
396             this.locations = locations;
397         }
398 
399         @Override
400         public void emit(final RDFHandler handler, final int passes) throws RDFSourceException,
401                 RDFHandlerException {
402 
403             Objects.requireNonNull(handler);
404 
405             RDFHandler sink = handler;
406             if (this.parallelize) {
407                 sink = RDFHandlers.decouple(sink);
408             }
409 
410             final RDFHandler wrappedSink = RDFHandlers.ignoreMethods(sink,
411                     RDFHandlers.METHOD_START_RDF | RDFHandlers.METHOD_END_RDF);
412 
413             try {
414                 for (int i = 0; i < passes; ++i) {
415                     sink.startRDF();
416                     parse(wrappedSink);
417                     sink.endRDF();
418                 }
419             } catch (RDFHandlerException | RuntimeException | Error ex) {
420                 throw ex;
421             } catch (final Throwable ex) {
422                 throw new RDFSourceException(ex);
423             } finally {
424                 IO.closeQuietly(handler);
425             }
426         }
427 
428         private void parse(final RDFHandler handler) throws Throwable {
429 
430             // Sort the locations based on decreasing size to improve throughput
431             final String[] sortedLocations = this.locations.clone();
432             Arrays.sort(sortedLocations, new Comparator<String>() {
433 
434                 @Override
435                 public int compare(final String first, final String second) {
436                     final URL firstURL = IO.extractURL(first);
437                     final URL secondURL = IO.extractURL(second);
438                     final boolean firstIsFile = "file".equals(firstURL.getProtocol());
439                     final boolean secondIsFile = "file".equals(secondURL.getProtocol());
440                     if (firstIsFile && secondIsFile) {
441                         try {
442                             final File firstFile = new File(firstURL.toURI());
443                             final File secondFile = new File(secondURL.toURI());
444                             return secondFile.length() > firstFile.length() ? 1 : -1;
445                         } catch (final Throwable ex) {
446                             // ignore
447                         }
448                     } else if (firstIsFile) {
449                         return 1;
450                     } else if (secondIsFile) {
451                         return -1;
452                     }
453                     return firstURL.toString().compareTo(secondURL.toString());
454                 }
455 
456             });
457 
458             final Map<String, InputStream> streams = new HashMap<String, InputStream>();
459 
460             final List<ParseJob> jobs = new ArrayList<ParseJob>();
461             for (final String location : this.locations) {
462                 final RDFFormat format = Rio.getParserFormatForFileName("test"
463                         + IO.extractExtension(location));
464                 final int parallelism = !this.parallelize
465                         || !Statements.isRDFFormatLineBased(format) ? 1 : Environment.getCores();
466                 for (int i = 0; i < parallelism; ++i) {
467                     jobs.add(new ParseJob(streams, location.toString(), handler));
468                 }
469             }
470 
471             final int parallelism = !this.parallelize ? 1 : Math.min(Environment.getCores(),
472                     jobs.size());
473 
474             final CountDownLatch latch = new CountDownLatch(parallelism);
475             final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
476             final AtomicInteger index = new AtomicInteger(0);
477 
478             final List<Runnable> runnables = new ArrayList<Runnable>();
479             for (int i = 0; i < parallelism; ++i) {
480                 runnables.add(new Runnable() {
481 
482                     @Override
483                     public void run() {
484                         try {
485                             while (true) {
486                                 ParseJob currentJob;
487                                 synchronized (jobs) {
488                                     final int i = index.getAndIncrement();
489                                     if (i >= jobs.size()) {
490                                         break;
491                                     }
492                                     currentJob = jobs.get(i);
493                                 }
494                                 currentJob.run();
495                             }
496                         } catch (final Throwable ex) {
497                             synchronized (jobs) {
498                                 for (final ParseJob job : jobs) {
499                                     job.cancel();
500                                 }
501                             }
502                             exception.set(ex);
503                         } finally {
504                             latch.countDown();
505                         }
506                     }
507 
508                 });
509             }
510 
511             try {
512                 for (int i = 1; i < parallelism; ++i) {
513                     Environment.getPool().execute(runnables.get(i));
514                 }
515                 if (!runnables.isEmpty()) {
516                     runnables.get(0).run();
517                 }
518                 latch.await();
519                 if (exception.get() != null) {
520                     throw exception.get();
521                 }
522             } finally {
523                 for (final InputStream stream : streams.values()) {
524                     IO.closeQuietly(stream);
525                 }
526             }
527         }
528 
529         private class ParseJob {
530 
531             private final Map<String, InputStream> streams;
532 
533             private final String location;
534 
535             private final RDFHandler handler;
536 
537             private volatile boolean closed;
538 
539             private Closeable in;
540 
541             ParseJob(final Map<String, InputStream> streams, final String location,
542                     final RDFHandler handler) {
543                 this.streams = streams;
544                 this.location = location;
545                 this.handler = handler;
546                 this.closed = false;
547                 this.in = null;
548             }
549 
550             void cancel() {
551                 this.closed = true;
552                 IO.closeQuietly(this.in);
553             }
554 
555             void run() throws Throwable {
556 
557                 if (this.closed) {
558                     return;
559                 }
560 
561                 final RDFFormat format = Rio.getParserFormatForFileName("test"
562                         + IO.extractExtension(this.location));
563 
564                 final String logMsg = "Starting {} {} {} parsing for {}";
565                 if (!Statements.isRDFFormatTextBased(format)) {
566                     LOGGER.debug(logMsg, "sequential", "binary", format.getName(), this.location);
567                     this.in = IO.buffer(IO.read(this.location));
568 
569                 } else if (!FileSource.this.parallelize
570                         || !Statements.isRDFFormatLineBased(format)) {
571                     LOGGER.debug(logMsg, "sequential", "text", format.getName(), this.location);
572                     this.in = IO.buffer(new InputStreamReader(IO.read(this.location), Charset
573                             .forName("UTF-8")));
574 
575                 } else {
576                     LOGGER.debug(logMsg, "parallel", "text", format.getName(), this.location);
577                     synchronized (this.streams) {
578                         InputStream stream = this.streams.get(this.location);
579                         if (stream == null) {
580                             if (this.streams.containsKey(this.location)) {
581                                 return; // read already completed for file at location
582                             }
583                             stream = IO.read(this.location);
584                             this.streams.put(this.location, stream);
585                         }
586                         this.in = IO.utf8Reader(IO.parallelBuffer(stream, (byte) '\n'));
587                     }
588                 }
589 
590                 try {
591                     final RDFHandler handler = FileSource.this.preserveBNodes ? this.handler //
592                             : rewriteBNodes(this.handler, Hash.murmur3(this.location).toString());
593                     final RDFParser parser = Rio.createParser(format);
594                     parser.setParserConfig(FileSource.this.parserConfig);
595                     parser.setValueFactory(Statements.VALUE_FACTORY);
596                     parser.setRDFHandler(handler);
597                     if (this.in instanceof InputStream) {
598                         parser.parse((InputStream) this.in, FileSource.this.base);
599                     } else {
600                         parser.parse((Reader) this.in, FileSource.this.base);
601                     }
602                 } catch (final Throwable ex) {
603                     if (!this.closed) {
604                         final String exMsg = "Parsing of " + this.location + " failed";
605                         if (ex instanceof RDFHandlerException) {
606                             throw new RDFHandlerException(exMsg, ex);
607                         } else {
608                             throw new RDFSourceException(exMsg, ex);
609                         }
610                     }
611                 } finally {
612                     synchronized (this.streams) {
613                         IO.closeQuietly(this.in);
614                         this.in = null;
615                         this.streams.put(this.location, null); // ensure stream is not read again
616                     }
617                 }
618             }
619 
620         }
621 
622     }
623 
624     private static class SparqlSource implements RDFSource {
625 
626         private final boolean parallelize;
627 
628         private final boolean preserveBNodes;
629 
630         private final String endpointURL;
631 
632         private final String query;
633 
634         private final boolean isSelect;
635 
636         SparqlSource(final boolean parallelize, final boolean preserveBNodes,
637                 final String endpointURL, final String query) {
638 
639             this.parallelize = parallelize;
640             this.preserveBNodes = preserveBNodes;
641             this.endpointURL = Objects.requireNonNull(endpointURL);
642             this.query = Objects.requireNonNull(query);
643             this.isSelect = isSelectQuery(query);
644         }
645 
646         @Override
647         public void emit(final RDFHandler handler, final int passes) throws RDFSourceException,
648                 RDFHandlerException {
649 
650             // different BNodes may be returned each time the query is evaluated;
651             // to allow preserving their identities, we should store the query result and
652             // read from it from disk the next times
653             Objects.requireNonNull(handler);
654 
655             RDFHandler actualHandler = handler;
656             if (this.parallelize) {
657                 actualHandler = RDFHandlers.decouple(actualHandler);
658             }
659             if (!this.preserveBNodes) {
660                 actualHandler = rewriteBNodes(actualHandler, //
661                         Hash.murmur3(this.endpointURL).toString());
662             }
663 
664             try {
665                 for (int i = 0; i < passes; ++i) {
666                     actualHandler.startRDF();
667                     sendQuery(actualHandler);
668                     actualHandler.endRDF();
669                 }
670             } catch (RDFHandlerException | RuntimeException | Error ex) {
671                 throw ex;
672             } catch (final Throwable ex) {
673                 throw new RDFSourceException("Sparql query to " + this.endpointURL + " failed", ex);
674             } finally {
675                 IO.closeQuietly(actualHandler);
676             }
677         }
678 
679         private void sendQuery(final RDFHandler handler) throws Throwable {
680 
681             final List<String> acceptTypes;
682             acceptTypes = this.isSelect ? Arrays.asList("application/sparql-results+xml",
683                     "application/xml") : RDFFormat.RDFXML.getMIMETypes();
684 
685             final byte[] requestBody = ("query=" + URLEncoder.encode(this.query, "UTF-8") + "&infer=true")
686                     .getBytes(Charset.forName("UTF-8"));
687 
688             final URL url = new URL(this.endpointURL);
689             final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
690             connection.setDoOutput(true);
691             connection.setDoInput(true);
692             connection.setRequestMethod("POST");
693             connection.setRequestProperty("Accept", String.join(",", acceptTypes));
694             connection.setRequestProperty("Content-Length", Integer.toString(requestBody.length));
695             connection.setRequestProperty("Content-Type",
696                     "application/x-www-form-urlencoded; charset=utf-8");
697 
698             connection.connect();
699 
700             try {
701                 final DataOutputStream out = new DataOutputStream(connection.getOutputStream());
702                 out.write(requestBody);
703                 out.close();
704 
705                 final int httpCode = connection.getResponseCode();
706                 if (httpCode != HttpURLConnection.HTTP_OK) {
707                     throw new IOException("Download from '" + this.endpointURL + "' failed (HTTP "
708                             + httpCode + ")");
709                 }
710 
711                 try (InputStream in = connection.getInputStream()) {
712                     if (this.isSelect) {
713                         parseTupleResult(in, handler);
714                     } else {
715                         parseTripleResult(in, handler);
716                     }
717                 }
718 
719             } finally {
720                 connection.disconnect();
721             }
722         }
723 
724         private void parseTripleResult(final InputStream stream, final RDFHandler handler)
725                 throws RDFHandlerException, RDFParseException, IOException {
726 
727             final ParserConfig parserConfig = new ParserConfig();
728             parserConfig.addNonFatalError(BasicParserSettings.VERIFY_DATATYPE_VALUES);
729             parserConfig.addNonFatalError(BasicParserSettings.VERIFY_LANGUAGE_TAGS);
730 
731             final RDFParser parser = Rio.createParser(RDFFormat.RDFXML, Statements.VALUE_FACTORY);
732             parser.setParserConfig(parserConfig);
733             parser.setParseErrorListener(new ParseErrorLogger());
734             parser.setRDFHandler(new AbstractRDFHandler() {
735 
736                 @Override
737                 public void handleStatement(final Statement statement) throws RDFHandlerException {
738                     handler.handleStatement(statement);
739                 }
740 
741             });
742             parser.parse(stream, this.endpointURL);
743         }
744 
745         private void parseTupleResult(final InputStream stream, final RDFHandler handler)
746                 throws RDFHandlerException, XMLStreamException {
747 
748             final ValueFactory vf = Statements.VALUE_FACTORY;
749             final Value[] values = new Value[4];
750 
751             final XMLStreamReader in = XMLInputFactory.newInstance().createXMLStreamReader(stream);
752 
753             while (in.nextTag() != XMLStreamConstants.START_ELEMENT
754                     || !in.getLocalName().equals("results")) {
755             }
756 
757             while (enterChild(in, "result")) {
758                 while (enterChild(in, "binding")) {
759                     final String varName = in.getAttributeValue(null, "name");
760                     final char var = Character.toLowerCase(varName.charAt(0));
761                     final int index = var == 's' ? 0 : var == 'p' ? 1 : var == 'o' ? 2 : 3;
762                     if (!enterChild(in, null)) {
763                         throw new XMLStreamException("Empty <binding> element found");
764                     }
765                     final String tag = in.getLocalName();
766                     Value value;
767                     if ("bnode".equals(tag)) {
768                         value = vf.createBNode(in.getElementText());
769                     } else if ("uri".equals(tag)) {
770                         value = vf.createURI(in.getElementText());
771                     } else if ("literal".equals(tag)) {
772                         final String lang = in.getAttributeValue(null, "lang");
773                         final String dt = in.getAttributeValue(null, "datatype");
774                         final String label = in.getElementText();
775                         value = lang != null ? vf.createLiteral(label, lang) : dt != null ? vf
776                                 .createLiteral(label, vf.createURI(dt)) : vf.createLiteral(label);
777                     } else {
778                         throw new XMLStreamException(
779                                 "Expected <bnode>, <uri> or <literal>, found <" + tag + ">");
780                     }
781                     values[index] = value;
782                     leaveChild(in); // leave binding
783                 }
784                 leaveChild(in); // leave result
785 
786                 if (values[0] instanceof Resource && values[1] instanceof URI && values[2] != null) {
787                     final Resource s = (Resource) values[0];
788                     final URI p = (URI) values[1];
789                     final Value o = values[2];
790                     if (values[3] instanceof Resource) {
791                         final Resource c = (Resource) values[3];
792                         handler.handleStatement(vf.createStatement(s, p, o, c));
793                     } else {
794                         handler.handleStatement(vf.createStatement(s, p, o));
795                     }
796                 }
797                 Arrays.fill(values, null);
798             }
799 
800             while (in.nextTag() != XMLStreamConstants.END_DOCUMENT) {
801             }
802         }
803 
804         private static boolean enterChild(final XMLStreamReader in, @Nullable final String name)
805                 throws XMLStreamException {
806             if (in.nextTag() == XMLStreamConstants.END_ELEMENT) {
807                 return false;
808             }
809             if (name == null || name.equals(in.getLocalName())) {
810                 return true;
811             }
812             final String childName = in.getLocalName();
813             throw new XMLStreamException("Expected <" + name + ">, found <" + childName + ">");
814         }
815 
816         private static void leaveChild(final XMLStreamReader in) throws XMLStreamException {
817             if (in.nextTag() != XMLStreamConstants.END_ELEMENT) {
818                 throw new XMLStreamException("Unexpected element <" + in.getLocalName() + ">");
819             }
820         }
821 
822         private static boolean isSelectQuery(final String string) {
823             final int length = string.length();
824             int index = 0;
825             while (index < length) {
826                 final char ch = string.charAt(index);
827                 if (ch == '#') { // comment
828                     while (index < length && string.charAt(index) != '\n') {
829                         ++index;
830                     }
831                 } else if (ch == 'p' || ch == 'b' || ch == 'P' || ch == 'B') { // prefix or base
832                     while (index < length && string.charAt(index) != '>') {
833                         ++index;
834                     }
835                 } else if (!Character.isWhitespace(ch)) { // found begin of query
836                     final int start = index;
837                     while (!Character.isWhitespace(string.charAt(index))) {
838                         ++index;
839                     }
840                     final String form = string.substring(start, index).toLowerCase();
841                     if (form.equals("select")) {
842                         return true;
843                     } else if (form.equals("construct") || form.equals("describe")) {
844                         return false;
845                     } else {
846                         throw new IllegalArgumentException("Invalid query form: " + form);
847                     }
848                 }
849                 ++index;
850             }
851             throw new IllegalArgumentException("Cannot detect SPARQL query form");
852         }
853 
854     }
855 
856 }