1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.io.Closeable;
17  import java.io.DataOutputStream;
18  import java.io.IOException;
19  import java.io.OutputStream;
20  import java.io.OutputStreamWriter;
21  import java.io.Writer;
22  import java.net.HttpURLConnection;
23  import java.net.URL;
24  import java.net.URLEncoder;
25  import java.nio.charset.Charset;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Objects;
36  import java.util.Set;
37  import java.util.concurrent.ArrayBlockingQueue;
38  import java.util.concurrent.BlockingQueue;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.concurrent.atomic.AtomicLong;
42  import java.util.concurrent.atomic.AtomicReference;
43  import java.util.function.Consumer;
44  
45  import javax.annotation.Nullable;
46  
47  import com.google.common.base.Throwables;
48  import com.google.common.collect.Lists;
49  
50  import org.openrdf.model.Model;
51  import org.openrdf.model.Namespace;
52  import org.openrdf.model.Resource;
53  import org.openrdf.model.Statement;
54  import org.openrdf.model.URI;
55  import org.openrdf.model.Value;
56  import org.openrdf.model.impl.NamespaceImpl;
57  import org.openrdf.rio.RDFFormat;
58  import org.openrdf.rio.RDFHandler;
59  import org.openrdf.rio.RDFHandlerException;
60  import org.openrdf.rio.RDFWriter;
61  import org.openrdf.rio.Rio;
62  import org.openrdf.rio.WriterConfig;
63  import org.openrdf.rio.helpers.BasicWriterSettings;
64  import org.slf4j.Logger;
65  import org.slf4j.LoggerFactory;
66  
67  import eu.fbk.rdfpro.util.Environment;
68  import eu.fbk.rdfpro.util.IO;
69  import eu.fbk.rdfpro.util.QuadModel;
70  import eu.fbk.rdfpro.util.Sorter;
71  import eu.fbk.rdfpro.util.Statements;
72  
73  /**
74   * Utility methods dealing with {@code RDFHandler}s.
75   */
76  public final class RDFHandlers {
77  
78      /** The {@code int} value for method {@link RDFHandler#startRDF()}. */
79      public static final int METHOD_START_RDF = 0x01;
80  
81      /** The {@code int} value for method {@link RDFHandler#handleComment(String)}. */
82      public static final int METHOD_HANDLE_COMMENT = 0x02;
83  
84      /** The {@code int} value for method {@link RDFHandler#handleNamespace(String, String)}. */
85      public static final int METHOD_HANDLE_NAMESPACE = 0x04;
86  
87      /** The {@code int} value for method {@link RDFHandler#handleStatement(Statement)}. */
88      public static final int METHOD_HANDLE_STATEMENT = 0x08;
89  
90      /** The {@code int} value for method {@link RDFHandler#endRDF()}. */
91      public static final int METHOD_END_RDF = 0x10;
92  
93      /** The {@code int} value for method {@link AutoCloseable#close()}. */
94      public static final int METHOD_CLOSE = 0x20;
95  
96      /** The null {@code RDFHandler} that does nothing. */
97      public static final AbstractRDFHandler NIL = new AbstractRDFHandler() {};
98  
99      private static final Logger LOGGER = LoggerFactory.getLogger(RDFHandlers.class);
100 
101     private static final WriterConfig DEFAULT_WRITER_CONFIG;
102 
103     static {
104         final WriterConfig config = new WriterConfig();
105         config.set(BasicWriterSettings.PRETTY_PRINT, true);
106         config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
107         config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
108         DEFAULT_WRITER_CONFIG = config;
109     }
110 
111     private RDFHandlers() {
112     }
113 
114     /**
115      * Returns an {@code RDFHandler} that populates the supplied statement collection. If the
116      * collection is a {@link Model} or a {@link QuadModel}, it is also populated with namespaces.
117      * If you don't want to populate namespaces, use {@link #wrap(Collection, Collection)} passing
118      * null as second argument. Note that access to the collection is not synchronized, so it must
119      * be thread-safe.
120      *
121      * @param statements
122      *            the statement collection to populate, not null
123      * @return the created {@code RDFHandler}
124      */
125     public static RDFHandler wrap(final Collection<? super Statement> statements) {
126         Objects.requireNonNull(statements);
127         return new WrapHandler(statements, statements instanceof Model
128                 || statements instanceof QuadModel ? statements : null);
129     }
130 
131     /**
132      * Returns an {@code RDFHandler} that populates the supplied statement and namespace
133      * collections. Access to the two collections is not synchronized, so they MUST be thread safe
134      * (may use {@link Collections#synchronizedCollection(Collection)}).
135      *
136      * @param statements
137      *            the statement collection to populate, not null
138      * @param namespaces
139      *            the namespace collection to populate, or null to discard namespaces
140      * @return the created {@code RDFHandler}
141      */
142     public static RDFHandler wrap(final Collection<? super Statement> statements,
143             @Nullable final Collection<? super Namespace> namespaces) {
144         Objects.requireNonNull(statements);
145         return new WrapHandler(statements, namespaces);
146     }
147 
148     /**
149      * Returns an {@code RDFHandler} that populates the supplied statement collection and
150      * prefix-to-namespace-uri map. Access to the collection and map is not synchronized, so they
151      * MUST be thread-safe (may use {@link Collections#synchronizedCollection(Collection)} and
152      * {@link Collections#synchronizedMap(Map)}).
153      *
154      * @param statements
155      *            the statement collection to populate, not null
156      * @param namespaces
157      *            the prefix-to-namespace-uri map to populate, or null to discard namespaces
158      * @return the created {@code RDFHandler}
159      */
160     public static RDFHandler wrap(final Collection<? super Statement> statements,
161             @Nullable final Map<? super String, ? super String> namespaces) {
162         Objects.requireNonNull(statements);
163         return new WrapHandler(statements, namespaces);
164     }
165 
166     /**
167      * Returns an {@code RDFHandler} that writes data to the files at the locations specified.
168      * Each location is either a file path or a full URL, possibly prefixed with an {@code .ext:}
169      * fragment that overrides the file extension used to detect RDF format and compression.
170      * Currently, URL different from {@code file://} are not supported, i.e., only local files can
171      * be written. If more locations are specified, statements are divided among them evenly (in
172      * chunks of configurable size). If no locations are given, the {@link #NIL} handler is
173      * returned. Note that data is written at each pass, so you may consider filtering out
174      * multiple passes to avoid writing the same data again.
175      *
176      * @param config
177      *            the optional {@code WriterConfig} for fine tuning the writing process; if null,
178      *            a default configuration enabling pretty printing will be used
179      * @param chunkSize
180      *            the number of consecutive statements to write as a block to a single location
181      *            (at least 1)
182      * @param locations
183      *            the locations of the files to write
184      * @return the created {@code RDFHandler}
185      */
186     public static RDFHandler write(@Nullable final WriterConfig config, final int chunkSize,
187             final String... locations) {
188         final WriterConfig actualConfig = config != null ? config : DEFAULT_WRITER_CONFIG;
189         final RDFHandler[] handlers = new RDFHandler[locations.length];
190         for (int i = 0; i < locations.length; ++i) {
191             final String location = locations[i];
192             final RDFFormat format = Statements.toRDFFormat(location);
193             final boolean parallel = Statements.isRDFFormatLineBased(format);
194             handlers[i] = parallel ? new ParallelWriteHandler(actualConfig, location)
195                     : new SequentialWriteHandler(actualConfig, location);
196         }
197         return handlers.length == 0 ? NIL : handlers.length == 1 ? handlers[0]
198                 : dispatchRoundRobin(chunkSize, handlers);
199     }
200 
201     /**
202      * Returns an {@code RDFHandler} that uploads data to a SPARQL endpoint via SPARQL Update
203      * INSERT DATA calls. Note that data is written at each pass, so you may consider filtering
204      * out multiple passes to avoid writing the same data again.
205      *
206      * @param endpointURL
207      *            the URL of the SPARQL Update endpoint, not null
208      * @return the created {@code RDFHandler}
209      */
210     public static RDFHandler update(final String endpointURL) {
211         return new UpdateHandler(endpointURL, null);
212     }
213 
214     /**
215      * Wraps the supplied {@code RDFHandler} ignoring calls to the methods specified by the given
216      * bitmap. Calling an ignored method on the returned {@code RDFHandler} produces no effect,
217      * while calls to other method are forwarded to the wrapped {@code handler}. Use constants
218      * {@code METHOD_XXX} in this class (OR-ed together) to define which methods to ignore.
219      *
220      * @param handler
221      *            the handler to wrap, not null
222      * @param ignoredMethods
223      *            a bitmap specifying which methods to ignore
224      * @return the created {@code RDFHandler} wrapper
225      */
226     public static RDFHandler ignoreMethods(final RDFHandler handler, final int ignoredMethods) {
227 
228         Objects.requireNonNull(handler);
229 
230         if (ignoredMethods == 0 || handler == NIL) {
231             return handler;
232 
233         } else if ((ignoredMethods & METHOD_HANDLE_STATEMENT) == 0) {
234             return new IgnoreMethodHandler(handler, ignoredMethods) {
235 
236                 @Override
237                 public void handleStatement(final Statement statement) throws RDFHandlerException {
238                     super.handleStatement(statement);
239                 }
240 
241             };
242 
243         } else {
244             return new IgnoreMethodHandler(handler, ignoredMethods) {
245 
246                 @Override
247                 public void handleStatement(final Statement statement) throws RDFHandlerException {
248                     // Discard
249                 }
250 
251             };
252         }
253     }
254 
255     /**
256      * Wraps the supplied {@code RDFHandler} discarding any method call after the specified number
257      * of passes has been performed. Calls to {@code close()} are always propagated, however.
258      *
259      * @param handler
260      *            the handler to wrap, not null
261      * @param maxPasses
262      *            the maximum number of passes to perform; additional passes will be ignored
263      * @return the created {@code RDFHandler} wrapper
264      */
265     public static RDFHandler ignorePasses(final RDFHandler handler, final int maxPasses) {
266         if (handler == NIL) {
267             return handler;
268         }
269         return new AbstractRDFHandlerWrapper(handler) {
270 
271             private RDFHandler passHandler = null;
272 
273             private int pass = 0;
274 
275             @Override
276             public void startRDF() throws RDFHandlerException {
277                 this.passHandler = this.pass < maxPasses ? this.handler : NIL;
278                 this.passHandler.startRDF();
279             }
280 
281             @Override
282             public void handleComment(final String comment) throws RDFHandlerException {
283                 this.passHandler.handleComment(comment);
284             }
285 
286             @Override
287             public void handleNamespace(final String prefix, final String uri)
288                     throws RDFHandlerException {
289                 this.passHandler.handleNamespace(prefix, uri);
290             }
291 
292             @Override
293             public void handleStatement(final Statement statement) throws RDFHandlerException {
294                 this.passHandler.handleStatement(statement);
295             }
296 
297             @Override
298             public void endRDF() throws RDFHandlerException {
299                 try {
300                     this.passHandler.endRDF();
301                 } finally {
302                     ++this.pass;
303                 }
304             }
305 
306         };
307     }
308 
309     /**
310      * Returns an {@code RDFHandler} that dispatches calls to all the {@code RDFHandler}s
311      * supplied. If no {@code RDFHandler} is supplied, {@link #NIL} is returned.
312      *
313      * @param handlers
314      *            the {@code RDFHandler}s to forward calls to
315      * @return the created {@code RDFHandler} dispatcher
316      */
317     public static RDFHandler dispatchAll(final RDFHandler... handlers) {
318         return dispatchAll(handlers, new int[handlers.length]);
319     }
320 
321     /**
322      * Returns an {@code RDFHandler} that dispatches calls to all the {@code RDFHandler}s
323      * supplied, optionally performing more passes on selected handlers. Argument
324      * {@code extraPasses} controls how many passes a supplied handler should perform w.r.t. other
325      * handlers. Operatively, if N is the maximum number in {@code extraPasses}, at pass I &lt; N
326      * the dispatcher will forward calls only to handles whose extra passes value is greater than
327      * N - I; in passes I &gt;= N all handlers will be called. This mechanism allows for selected
328      * handlers to receive the additional passes they need for their initialization, without
329      * performing these passes also on other handlers that do not need them. Note that if no
330      * {@code RDFHandler} is supplied, {@link #NIL} is returned.
331      *
332      * @param handlers
333      *            the {@code RDFHandler}s to forward calls to
334      * @param extraPasses
335      *            the number of extra passes for each supplied handler
336      * @return the created {@code RDFHandler} dispatcher
337      */
338     public static RDFHandler dispatchAll(final RDFHandler[] handlers, final int[] extraPasses) {
339         Objects.requireNonNull(extraPasses);
340         if (Arrays.asList(handlers).contains(null)) {
341             throw new NullPointerException();
342         }
343         if (handlers.length == 0) {
344             return NIL;
345         } else if (handlers.length == 1) {
346             return handlers[0];
347         } else if (handlers.length == 2 && extraPasses[0] == extraPasses[1]) {
348             return new DispatchTwoHandler(handlers[0], handlers[1]);
349         } else {
350             return new DispatchAllHandler(handlers, extraPasses);
351         }
352     }
353 
354     /**
355      * Returns an {@code RDFHandler} that dispatches calls to one of {@code RDFHandler}s supplied
356      * chosen in a round robin fashion. More precisely, calls to methods
357      * {@link RDFHandler#handleStatement(Statement)} and {@link RDFHandler#handleComment(String)}
358      * are forwarded in a round robin fashion, with each chunk of {@code chunkSize >= 1}
359      * consecutive calls dispatched to a certain handler. Other methods are always forwarded to
360      * all the handlers. If no {@code RDFHandler} is supplied, {@link #NIL} is returned.
361      *
362      * @param chunkSize
363      *            the chunk size, greater than or equal to 1; you may use this parameter to keep
364      *            triples that are received consecutively together (as far as possible) when
365      *            propagated to wrapped handlers, e.g., because in this way they compress better
366      *            when written to a file (assuming input triples are somehow sorted)
367      * @param handlers
368      *            the {@code RDFHandler}s to forward calls to, in a round robin fashion
369      * @return the created {@code RDFHandler} dispatcher
370      */
371     public static RDFHandler dispatchRoundRobin(final int chunkSize, final RDFHandler... handlers) {
372         if (Arrays.asList(handlers).contains(null)) {
373             throw new NullPointerException();
374         }
375         if (handlers.length == 0) {
376             return NIL;
377         } else if (handlers.length == 1) {
378             return handlers[0];
379         } else {
380             return new DispatchRoundRobinHandler(chunkSize, handlers);
381         }
382     }
383 
384     /**
385      * Returns an {@code RDFHandler} that collects multiple streams of RDF data, merging them by
386      * means in a unique stream using a {@code SetOperator}. This method accepts as arguments the
387      * sink where to forward the merged RDF stream, the {@code SetOperator} to apply and the
388      * number of input streams to collect. The method returns an array of {@code RDFHandler}, one
389      * for each input stream to be collected, that can be used by the method caller to provide the
390      * data to be merged.
391      *
392      * @param handler
393      *            the handler to decompose, not null
394      * @param count
395      *            the number of streams to collect and consequently the number of
396      *            {@code RDFHandler}s to return (at least 1)
397      * @param operation
398      *            the set operation to apply to merge input RDF streams
399      * @return the created {@code RDFHandler}s
400      */
401     public static RDFHandler[] collect(final RDFHandler handler, final int count,
402             final SetOperator operation) {
403 
404         Objects.requireNonNull(handler);
405         Objects.requireNonNull(operation);
406 
407         if (count < 1) {
408             throw new IllegalArgumentException();
409         }
410 
411         final RDFHandler[] result = new RDFHandler[count];
412 
413         if (count == 1 && (operation == SetOperator.SUM_MULTISET //
414                 || operation == SetOperator.UNION_MULTISET //
415                 || operation == SetOperator.INTERSECTION_MULTISET //
416         /*    */|| operation == SetOperator.DIFFERENCE_MULTISET)) {
417             result[0] = handler;
418         } else if (count == 1 && operation == SetOperator.SYMMETRIC_DIFFERENCE
419                 || operation == SetOperator.SYMMETRIC_DIFFERENCE_MULTISET) {
420             result[0] = NIL;
421         } else if (operation == SetOperator.SUM_MULTISET) {
422             Arrays.fill(result, new CollectMergerHandler(handler, count));
423         } else if (operation == SetOperator.UNION) {
424             Arrays.fill(result, new CollectSorterHandler(handler, count, true, true));
425         } else {
426             final CollectSetOperatorHandler sink;
427             sink = new CollectSetOperatorHandler(handler, count, operation);
428             for (int i = 0; i < count; ++i) {
429                 result[i] = new CollectLabellerHandler(sink, i);
430             }
431         }
432 
433         return result;
434     }
435 
436     /**
437      * Wraps the supplied {@code RDFHandler} (if necessary) so to buffer incoming statements and
438      * use additional threads for their processing. An {@code RDFHandler} that has already been
439      * decoupled is not wrapped again.
440      *
441      * @param handler
442      *            the handler to wrap
443      * @return the (possibly) wrapped handler
444      */
445     public static RDFHandler decouple(final RDFHandler handler) {
446         if (handler == NIL || handler instanceof DecoupleHandler) {
447             return handler;
448         }
449         return new DecoupleHandler(handler);
450     }
451 
452     /**
453      * Wraps the supplied {@code RDFHandelr} (if necessary) using a queue to decouple threads
454      * submitting statements from threads consuming them. A positive number of consumer threads
455      * should be specified. Wrapping does not occur if the handler is already decoupled using a
456      * queue.
457      *
458      * @param handler
459      *            the handler to wrap
460      * @param numConsumerThreads
461      *            the number of consumer threads, positive
462      * @return the (possibly) wrapped handler
463      */
464     public static RDFHandler decouple(final RDFHandler handler, final int numConsumerThreads) {
465         if (numConsumerThreads <= 0) {
466             throw new IllegalArgumentException("Invalid number of consumer threads: "
467                     + numConsumerThreads);
468         }
469         if (handler == NIL || handler instanceof DecoupleQueueHandler) {
470             return handler;
471         }
472         return new DecoupleQueueHandler(handler, numConsumerThreads);
473     }
474 
475     /**
476      * Wraps the supplied {@code RDFHandler} (if necessary) so that {@code handleXXX()} calls are
477      * invoked in a mutually exclusive way. This method can be used with {@code RDFHandler}s that
478      * are not thread-safe.
479      *
480      * @param handler
481      *            the handler to wrap
482      * @return the (possibly) wrapped handler
483      */
484     public static RDFHandler synchronize(final RDFHandler handler) {
485         if (handler == NIL || handler instanceof SynchronizeHandler) {
486             return handler;
487         }
488         return new SynchronizeHandler(handler);
489     }
490 
491     private static final class WrapHandler extends AbstractRDFHandler {
492 
493         private final Collection<? super Statement> statementSink;
494 
495         private final Object namespaceSink;
496 
497         public WrapHandler(final Collection<? super Statement> statementSink,
498                 @Nullable final Object namespaceSink) {
499             this.statementSink = statementSink;
500             this.namespaceSink = namespaceSink;
501         }
502 
503         @SuppressWarnings("unchecked")
504         @Override
505         public synchronized void handleNamespace(final String prefix, final String uri)
506                 throws RDFHandlerException {
507             if (this.namespaceSink instanceof Model) {
508                 ((Model) this.namespaceSink).setNamespace(prefix, uri);
509             } else if (this.namespaceSink instanceof QuadModel) {
510                 ((QuadModel) this.namespaceSink).setNamespace(prefix, uri);
511             } else if (this.namespaceSink instanceof Collection<?>) {
512                 ((Collection<Namespace>) this.namespaceSink).add(new NamespaceImpl(prefix, uri));
513             } else if (this.namespaceSink instanceof Map<?, ?>) {
514                 ((Map<String, String>) this.namespaceSink).put(prefix, uri);
515             }
516         }
517 
518         @Override
519         public synchronized void handleStatement(final Statement statement)
520                 throws RDFHandlerException {
521             this.statementSink.add(statement);
522         }
523 
524     }
525 
526     private static final class SequentialWriteHandler extends AbstractRDFHandler {
527 
528         private final WriterConfig config;
529 
530         private final String location;
531 
532         @Nullable
533         private Closeable out;
534 
535         @Nullable
536         private RDFWriter writer;
537 
538         SequentialWriteHandler(final WriterConfig config, final String location) {
539             this.config = config;
540             this.location = location;
541         }
542 
543         @Override
544         public void startRDF() throws RDFHandlerException {
545             try {
546                 final RDFFormat format = Statements.toRDFFormat(this.location);
547                 LOGGER.debug("Starting sequential {} writing of {}", format, this.location);
548                 final OutputStream stream = IO.write(this.location);
549                 if (Statements.isRDFFormatTextBased(format)) {
550                     this.out = IO.buffer(new OutputStreamWriter(stream, Charset.forName("UTF-8")));
551                     this.writer = Rio.createWriter(format, (Writer) this.out);
552                 } else {
553                     this.out = IO.buffer(stream);
554                     this.writer = Rio.createWriter(format, (OutputStream) this.out);
555                 }
556                 this.writer.setWriterConfig(this.config);
557                 this.writer.startRDF();
558             } catch (final IOException ex) {
559                 throw new RDFHandlerException("Could not write to " + this.location);
560             }
561             super.startRDF();
562         }
563 
564         @Override
565         public synchronized void handleComment(final String comment) throws RDFHandlerException {
566             this.writer.handleComment(comment);
567         }
568 
569         @Override
570         public synchronized void handleNamespace(final String prefix, final String uri)
571                 throws RDFHandlerException {
572             this.writer.handleNamespace(prefix, uri);
573         }
574 
575         @Override
576         public synchronized void handleStatement(final Statement statement)
577                 throws RDFHandlerException {
578             this.writer.handleStatement(statement);
579         }
580 
581         @Override
582         public void endRDF() throws RDFHandlerException {
583             this.writer.endRDF();
584             try {
585                 this.out.close();
586             } catch (final IOException ex) {
587                 throw new RDFHandlerException("Unable to properly close " + this.location, ex);
588             }
589         }
590 
591         @Override
592         public void close() {
593             IO.closeQuietly(this.out); // should be already closed
594             this.out = null;
595             this.writer = null;
596         }
597 
598     }
599 
600     private static final class ParallelWriteHandler extends AbstractRDFHandler {
601 
602         private final WriterConfig config;
603 
604         private final String location;
605 
606         @Nullable
607         private OutputStream out;
608 
609         @Nullable
610         private List<Writer> partialOuts;
611 
612         @Nullable
613         private List<RDFWriter> partialWriters;
614 
615         @Nullable
616         private ThreadLocal<RDFWriter> threadWriter;
617 
618         ParallelWriteHandler(final WriterConfig config, final String location) {
619             this.config = config;
620             this.location = location;
621         }
622 
623         @Override
624         public void startRDF() throws RDFHandlerException {
625             try {
626                 LOGGER.debug("Starting parallel {} writing of {}",
627                         Statements.toRDFFormat(this.location).getName(), this.location);
628                 this.out = IO.write(this.location);
629                 this.partialOuts = new ArrayList<Writer>();
630                 this.partialWriters = new ArrayList<RDFWriter>();
631                 this.threadWriter = new ThreadLocal<RDFWriter>() {
632 
633                     @Override
634                     protected RDFWriter initialValue() {
635                         return newWriter();
636                     }
637 
638                 };
639 
640             } catch (final IOException ex) {
641                 throw new RDFHandlerException("Could not write to " + this.location);
642             }
643             super.startRDF();
644         }
645 
646         @Override
647         public void handleStatement(final Statement statement) throws RDFHandlerException {
648             this.threadWriter.get().handleStatement(statement);
649         }
650 
651         @Override
652         public void endRDF() throws RDFHandlerException {
653             for (final RDFHandler partialWriter : this.partialWriters) {
654                 partialWriter.endRDF();
655             }
656             try {
657                 for (final Writer partialOut : this.partialOuts) {
658                     partialOut.close();
659                 }
660                 this.out.close();
661             } catch (final IOException ex) {
662                 throw new RDFHandlerException("Unable to properly close " + this.location, ex);
663             }
664         }
665 
666         @Override
667         public void close() {
668             IO.closeQuietly(this.out);
669             this.out = null;
670             this.partialOuts = null;
671             this.partialWriters = null;
672             this.threadWriter = null;
673         }
674 
675         private RDFWriter newWriter() {
676             final Writer partialOut = IO.utf8Writer(IO.parallelBuffer(this.out, (byte) '\n'));
677             final RDFFormat format = Statements.toRDFFormat(this.location);
678             final RDFWriter partialWriter = Rio.createWriter(format, partialOut);
679             partialWriter.setWriterConfig(this.config);
680             synchronized (this.partialOuts) {
681                 this.partialOuts.add(partialOut);
682                 this.partialWriters.add(partialWriter);
683             }
684             try {
685                 partialWriter.startRDF();
686             } catch (final RDFHandlerException ex) {
687                 throw new RuntimeException(ex);
688             }
689             return partialWriter;
690         }
691 
692     }
693 
694     private final static class UpdateHandler extends AbstractRDFHandler {
695 
696         private static final int DEFAULT_CHUNK_SIZE = 1024;
697 
698         private static final String HEAD = "INSERT DATA {\n";
699 
700         private final String endpointURL;
701 
702         private final int chunkSize;
703 
704         private final StringBuilder builder;
705 
706         private Resource lastCtx;
707 
708         private Resource lastSubj;
709 
710         private URI lastPred;
711 
712         private int count;
713 
714         UpdateHandler(final String endpointURL, @Nullable final Integer chunkSize) {
715             this.endpointURL = endpointURL;
716             this.chunkSize = chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
717             this.builder = new StringBuilder();
718             this.lastCtx = null;
719             this.lastSubj = null;
720             this.lastPred = null;
721             this.count = 0;
722             this.builder.append(HEAD);
723         }
724 
725         @Override
726         public synchronized void handleStatement(final Statement statement)
727                 throws RDFHandlerException {
728 
729             final boolean sameCtx = Objects.equals(this.lastCtx, statement.getContext());
730             final boolean sameSubj = sameCtx && statement.getSubject().equals(this.lastSubj);
731             final boolean samePred = sameSubj && statement.getPredicate().equals(this.lastPred);
732 
733             if (this.lastSubj != null) {
734                 if (!sameSubj) {
735                     this.builder.append(" .\n");
736                 }
737                 if (!sameCtx && this.lastCtx != null) {
738                     this.builder.append("}\n");
739                 }
740             }
741 
742             if (!sameCtx && statement.getContext() != null) {
743                 this.builder.append("GRAPH ");
744                 emit(statement.getContext());
745                 this.builder.append(" {\n");
746             }
747 
748             if (!samePred) {
749                 if (!sameSubj) {
750                     emit(statement.getSubject());
751                     this.builder.append(" ");
752                 } else {
753                     this.builder.append(" ; ");
754                 }
755                 emit(statement.getPredicate());
756                 this.builder.append(" ");
757             } else {
758                 this.builder.append(" , ");
759             }
760 
761             emit(statement.getObject());
762 
763             this.lastCtx = statement.getContext();
764             this.lastSubj = statement.getSubject();
765             this.lastPred = statement.getPredicate();
766 
767             ++this.count;
768             if (this.count == this.chunkSize) {
769                 flush();
770             }
771         }
772 
773         @Override
774         public void endRDF() throws RDFHandlerException {
775             flush();
776         }
777 
778         private void emit(final Value value) {
779             try {
780                 Statements.formatValue(value, null, this.builder);
781             } catch (final IOException ex) {
782                 throw new Error("Unexpected exception (!)", ex);
783             }
784         }
785 
786         private void flush() throws RDFHandlerException {
787             if (this.count > 0) {
788                 if (this.lastSubj != null && this.lastCtx != null) {
789                     this.builder.append("}");
790                 }
791                 this.builder.append("}");
792                 final String update = this.builder.toString();
793                 this.builder.setLength(HEAD.length());
794                 this.count = 0;
795                 try {
796                     sendUpdate(update);
797                 } catch (final Throwable ex) {
798                     throw new RDFHandlerException(ex);
799                 }
800             }
801         }
802 
803         private void sendUpdate(final String update) throws IOException {
804 
805             final byte[] requestBody = ("update=" + URLEncoder.encode(update, "UTF-8"))
806                     .getBytes(Charset.forName("UTF-8"));
807 
808             final URL url = new URL(this.endpointURL);
809             final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
810             connection.setDoOutput(true);
811             connection.setDoInput(true);
812             connection.setRequestMethod("POST");
813             connection.setRequestProperty("Content-Type",
814                     "application/x-www-form-urlencoded; charset=utf-8");
815             connection.setRequestProperty("Content-Length", Integer.toString(requestBody.length));
816 
817             connection.connect();
818 
819             try {
820                 final DataOutputStream out = new DataOutputStream(connection.getOutputStream());
821                 out.write(requestBody);
822                 out.close();
823 
824                 final int httpCode = connection.getResponseCode();
825                 if (httpCode != HttpURLConnection.HTTP_OK) {
826                     throw new IOException("Upload to '" + this.endpointURL + "' failed (HTTP "
827                             + httpCode + ")");
828                 }
829 
830             } finally {
831                 connection.disconnect();
832             }
833         }
834 
835     }
836 
837     private static abstract class IgnoreMethodHandler extends AbstractRDFHandlerWrapper {
838 
839         private final boolean forwardStartRDF;
840 
841         private final boolean forwardHandleComment;
842 
843         private final boolean forwardHandleNamespace;
844 
845         private final boolean forwardEndRDF;
846 
847         private final boolean forwardClose;
848 
849         IgnoreMethodHandler(final RDFHandler handler, final int ignoredMethods) {
850             super(handler);
851             this.forwardStartRDF = (ignoredMethods & METHOD_START_RDF) == 0;
852             this.forwardHandleComment = (ignoredMethods & METHOD_HANDLE_COMMENT) == 0;
853             this.forwardHandleNamespace = (ignoredMethods & METHOD_HANDLE_NAMESPACE) == 0;
854             this.forwardEndRDF = (ignoredMethods & METHOD_END_RDF) == 0;
855             this.forwardClose = (ignoredMethods & METHOD_CLOSE) == 0;
856         }
857 
858         @Override
859         public final void startRDF() throws RDFHandlerException {
860             if (this.forwardStartRDF) {
861                 super.startRDF();
862             }
863         }
864 
865         @Override
866         public final void handleComment(final String comment) throws RDFHandlerException {
867             if (this.forwardHandleComment) {
868                 super.handleComment(comment);
869             }
870         }
871 
872         @Override
873         public final void handleNamespace(final String prefix, final String uri)
874                 throws RDFHandlerException {
875             if (this.forwardHandleNamespace) {
876                 super.handleNamespace(prefix, uri);
877             }
878         }
879 
880         @Override
881         public final void endRDF() throws RDFHandlerException {
882             if (this.forwardEndRDF) {
883                 super.endRDF();
884             }
885         }
886 
887         @Override
888         public final void close() {
889             if (this.forwardClose) {
890                 super.close();
891             }
892         }
893 
894     }
895 
896     private static final class DispatchTwoHandler extends AbstractRDFHandler {
897 
898         private final RDFHandler first;
899 
900         private final RDFHandler second;
901 
902         DispatchTwoHandler(final RDFHandler first, final RDFHandler second) {
903             this.first = first;
904             this.second = second;
905         }
906 
907         @Override
908         public void startRDF() throws RDFHandlerException {
909             this.first.startRDF();
910             this.second.startRDF();
911         }
912 
913         @Override
914         public void handleComment(final String comment) throws RDFHandlerException {
915             this.first.handleComment(comment);
916             this.second.handleComment(comment);
917         }
918 
919         @Override
920         public void handleNamespace(final String prefix, final String uri)
921                 throws RDFHandlerException {
922             this.first.handleNamespace(prefix, uri);
923             this.second.handleNamespace(prefix, uri);
924         }
925 
926         @Override
927         public void handleStatement(final Statement statement) throws RDFHandlerException {
928             this.first.handleStatement(statement);
929             this.second.handleStatement(statement);
930         }
931 
932         @Override
933         public void endRDF() throws RDFHandlerException {
934             this.first.endRDF();
935             this.second.endRDF();
936         }
937 
938         @Override
939         public void close() {
940             IO.closeQuietly(this.first);
941             IO.closeQuietly(this.second);
942         }
943 
944     }
945 
946     private static final class DispatchAllHandler extends AbstractRDFHandler {
947 
948         private final RDFHandler[] handlers;
949 
950         private final int[] extraPasses;
951 
952         private RDFHandler[] passHandlers;
953 
954         private int passIndex; // counting downward to zero
955 
956         DispatchAllHandler(final RDFHandler[] handlers, final int[] extraPasses) {
957 
958             int maxExtraPasses = 0;
959             for (int i = 0; i < extraPasses.length; ++i) {
960                 maxExtraPasses = Math.max(maxExtraPasses, extraPasses[i]);
961             }
962 
963             this.handlers = handlers;
964             this.extraPasses = extraPasses;
965             this.passHandlers = null;
966             this.passIndex = maxExtraPasses;
967         }
968 
969         @Override
970         public void startRDF() throws RDFHandlerException {
971 
972             if (this.passIndex == 0) {
973                 this.passHandlers = this.handlers;
974             } else {
975                 final List<RDFHandler> list = new ArrayList<RDFHandler>();
976                 for (int i = 0; i < this.handlers.length; ++i) {
977                     if (this.extraPasses[i] >= this.passIndex) {
978                         list.add(this.handlers[i]);
979                     }
980                 }
981                 this.passHandlers = list.toArray(new RDFHandler[list.size()]);
982                 --this.passIndex;
983             }
984 
985             for (final RDFHandler handler : this.passHandlers) {
986                 handler.startRDF();
987             }
988         }
989 
990         @Override
991         public void handleComment(final String comment) throws RDFHandlerException {
992             for (final RDFHandler handler : this.passHandlers) {
993                 handler.handleComment(comment);
994             }
995         }
996 
997         @Override
998         public void handleNamespace(final String prefix, final String uri)
999                 throws RDFHandlerException {
1000             for (final RDFHandler handler : this.passHandlers) {
1001                 handler.handleNamespace(prefix, uri);
1002             }
1003         }
1004 
1005         @Override
1006         public void handleStatement(final Statement statement) throws RDFHandlerException {
1007             for (final RDFHandler handler : this.passHandlers) {
1008                 handler.handleStatement(statement);
1009             }
1010         }
1011 
1012         @Override
1013         public void endRDF() throws RDFHandlerException {
1014             for (final RDFHandler handler : this.passHandlers) {
1015                 handler.endRDF();
1016             }
1017         }
1018 
1019         @Override
1020         public void close() {
1021             for (final RDFHandler handler : this.handlers) {
1022                 IO.closeQuietly(handler);
1023             }
1024         }
1025 
1026     }
1027 
1028     private static final class DispatchRoundRobinHandler extends AbstractRDFHandler {
1029 
1030         private final AtomicLong counter = new AtomicLong(0);
1031 
1032         private final int chunkSize;
1033 
1034         private final RDFHandler[] handlers;
1035 
1036         DispatchRoundRobinHandler(final int chunkSize, final RDFHandler[] handlers) {
1037             this.chunkSize = chunkSize;
1038             this.handlers = handlers;
1039         }
1040 
1041         @Override
1042         public void startRDF() throws RDFHandlerException {
1043             for (final RDFHandler handler : this.handlers) {
1044                 handler.startRDF();
1045             }
1046         }
1047 
1048         @Override
1049         public void handleComment(final String comment) throws RDFHandlerException {
1050             pickHandler().handleComment(comment);
1051         }
1052 
1053         @Override
1054         public void handleNamespace(final String prefix, final String uri)
1055                 throws RDFHandlerException {
1056             for (final RDFHandler handler : this.handlers) {
1057                 handler.handleNamespace(prefix, uri);
1058             }
1059         }
1060 
1061         @Override
1062         public void handleStatement(final Statement statement) throws RDFHandlerException {
1063             pickHandler().handleStatement(statement);
1064         }
1065 
1066         @Override
1067         public void endRDF() throws RDFHandlerException {
1068             Throwable exception = null;
1069             for (final RDFHandler handler : this.handlers) {
1070                 try {
1071                     handler.endRDF();
1072                 } catch (final Throwable ex) {
1073                     if (exception == null) {
1074                         exception = ex;
1075                     } else {
1076                         exception.addSuppressed(ex);
1077                     }
1078                 }
1079             }
1080             if (exception != null) {
1081                 if (exception instanceof RuntimeException) {
1082                     throw (RDFHandlerException) exception;
1083                 } else if (exception instanceof Error) {
1084                     throw (Error) exception;
1085                 }
1086                 throw (RDFHandlerException) exception;
1087             }
1088         }
1089 
1090         @Override
1091         public void close() {
1092             for (final RDFHandler handler : this.handlers) {
1093                 IO.closeQuietly(handler);
1094             }
1095         }
1096 
1097         private RDFHandler pickHandler() {
1098             return this.handlers[(int) (this.counter.getAndIncrement() //
1099                     / this.chunkSize % this.handlers.length)];
1100         }
1101 
1102     }
1103 
1104     private static final class CollectLabellerHandler extends AbstractRDFHandlerWrapper {
1105 
1106         private final CollectMergerHandler collector;
1107 
1108         private final int label;
1109 
1110         CollectLabellerHandler(final CollectMergerHandler handler, final int label) {
1111             super(handler);
1112             this.collector = handler;
1113             this.label = label;
1114         }
1115 
1116         @Override
1117         public void handleStatement(final Statement statement) throws RDFHandlerException {
1118             this.collector.handleStatement(statement, this.label);
1119         }
1120 
1121     }
1122 
1123     private static class CollectMergerHandler extends AbstractRDFHandlerWrapper {
1124 
1125         private final int size;
1126 
1127         private int pending;
1128 
1129         CollectMergerHandler(final RDFHandler handler, final int size) {
1130             super(handler);
1131             this.size = size;
1132             this.pending = 0;
1133         }
1134 
1135         @Override
1136         public final void startRDF() throws RDFHandlerException {
1137             if (this.pending <= 0) {
1138                 this.pending = this.size;
1139                 super.startRDF();
1140                 doStartRDF();
1141             }
1142         }
1143 
1144         @Override
1145         public final void handleStatement(final Statement statement) throws RDFHandlerException {
1146             doHandleStatement(statement, 0);
1147         }
1148 
1149         public final void handleStatement(final Statement statement, final int label)
1150                 throws RDFHandlerException {
1151             doHandleStatement(statement, label);
1152         }
1153 
1154         @Override
1155         public final void endRDF() throws RDFHandlerException {
1156             --this.pending;
1157             if (this.pending == 0) {
1158                 doEndRDF();
1159                 super.endRDF();
1160             }
1161         }
1162 
1163         @Override
1164         public void close() {
1165             super.close();
1166             doClose();
1167         }
1168 
1169         void doStartRDF() throws RDFHandlerException {
1170         }
1171 
1172         void doHandleStatement(final Statement statement, final int label)
1173                 throws RDFHandlerException {
1174             super.handleStatement(statement);
1175         }
1176 
1177         void doEndRDF() throws RDFHandlerException {
1178         }
1179 
1180         void doClose() {
1181         }
1182 
1183     }
1184 
1185     private static class CollectSorterHandler extends CollectMergerHandler {
1186 
1187         private final boolean deduplicate;
1188 
1189         private final boolean parallelize;
1190 
1191         private Sorter<Object[]> sorter;
1192 
1193         CollectSorterHandler(final RDFHandler handler, final int size, final boolean deduplicate,
1194                 final boolean parallelize) {
1195             super(handler, size);
1196             this.deduplicate = deduplicate;
1197             this.parallelize = parallelize;
1198             this.sorter = null;
1199         }
1200 
1201         @Override
1202         void doStartRDF() throws RDFHandlerException {
1203             this.sorter = Sorter.newTupleSorter(true, Statement.class, Long.class);
1204             try {
1205                 this.sorter.start(this.deduplicate);
1206             } catch (final IOException ex) {
1207                 throw new RDFHandlerException(ex);
1208             }
1209         }
1210 
1211         @Override
1212         void doHandleStatement(final Statement statement, final int label)
1213                 throws RDFHandlerException {
1214             try {
1215                 this.sorter.emit(new Object[] { statement, label });
1216             } catch (final Throwable ex) {
1217                 throw new RDFHandlerException(ex);
1218             }
1219         }
1220 
1221         @Override
1222         void doEndRDF() throws RDFHandlerException {
1223             try {
1224                 this.sorter.end(this.parallelize, new Consumer<Object[]>() {
1225 
1226                     @Override
1227                     public void accept(final Object[] record) {
1228                         try {
1229                             final Statement statement = (Statement) record[0];
1230                             final int label = ((Long) record[1]).intValue();
1231                             doHandleStatementSorted(statement, label);
1232                         } catch (final RDFHandlerException ex) {
1233                             throw new RuntimeException(ex);
1234                         }
1235                     }
1236 
1237                 });
1238                 this.sorter.close();
1239                 this.sorter = null;
1240             } catch (final IOException ex) {
1241                 throw new RDFHandlerException(ex);
1242             }
1243         }
1244 
1245         @Override
1246         void doClose() {
1247             IO.closeQuietly(this.sorter);
1248         }
1249 
1250         void doHandleStatementSorted(final Statement statement, final int label)
1251                 throws RDFHandlerException {
1252             this.handler.handleStatement(statement);
1253         }
1254 
1255     }
1256 
1257     private static class CollectSetOperatorHandler extends CollectSorterHandler {
1258 
1259         private final SetOperator operator;
1260 
1261         private final int[] multiplicities;
1262 
1263         private Statement statement;
1264 
1265         CollectSetOperatorHandler(final RDFHandler handler, final int size,
1266                 final SetOperator operator) {
1267             super(handler, size, false, false);
1268             this.operator = operator;
1269             this.multiplicities = new int[size];
1270             this.statement = null;
1271         }
1272 
1273         @Override
1274         void doHandleStatementSorted(final Statement statement, final int label)
1275                 throws RDFHandlerException {
1276 
1277             if (!statement.equals(this.statement)
1278                     || !Objects.equals(statement.getContext(), this.statement.getContext())) {
1279                 flush();
1280                 this.statement = statement;
1281                 Arrays.fill(this.multiplicities, 0);
1282             }
1283             ++this.multiplicities[label];
1284         }
1285 
1286         @Override
1287         void doEndRDF() throws RDFHandlerException {
1288             super.doEndRDF();
1289             flush();
1290         }
1291 
1292         private void flush() throws RDFHandlerException {
1293             if (this.statement != null) {
1294                 final int multiplicity = this.operator.apply(this.multiplicities);
1295                 for (int i = 0; i < multiplicity; ++i) {
1296                     this.handler.handleStatement(this.statement);
1297                 }
1298             }
1299         }
1300 
1301     }
1302 
1303     private static final class DecoupleHandler extends AbstractRDFHandlerWrapper {
1304 
1305         private static final int BUFFER_SIZE = 4 * 1024;
1306 
1307         private final int numCores;
1308 
1309         private final Set<Thread> incomingThreads;
1310 
1311         private final List<Future<?>> futures;
1312 
1313         private final AtomicInteger counter;
1314 
1315         private Throwable exception;
1316 
1317         private Statement[] buffer;
1318 
1319         private int size;
1320 
1321         private int mask;
1322 
1323         private boolean disabled;
1324 
1325         DecoupleHandler(final RDFHandler handler) {
1326             super(handler);
1327             this.numCores = Environment.getCores();
1328             this.incomingThreads = new HashSet<Thread>(); // equals/hashCode based on identity
1329             this.futures = new LinkedList<Future<?>>();
1330             this.counter = new AtomicInteger(0);
1331         }
1332 
1333         @Override
1334         public void startRDF() throws RDFHandlerException {
1335             super.startRDF();
1336             this.incomingThreads.clear();
1337             this.futures.clear();
1338             this.exception = null;
1339             this.buffer = new Statement[BUFFER_SIZE];
1340             this.size = 0;
1341             this.mask = 0;
1342             this.disabled = false;
1343         }
1344 
1345         @Override
1346         public void handleStatement(final Statement statement) throws RDFHandlerException {
1347             // Most compact implementation tackling frequent case decoupler is disabled
1348             if (this.disabled) {
1349                 super.handleStatement(statement);
1350             } else {
1351                 handleStatementHelper(statement);
1352             }
1353         }
1354 
1355         private void handleStatementHelper(final Statement statement) throws RDFHandlerException {
1356             if ((this.counter.getAndIncrement() & this.mask) != 0) {
1357                 super.handleStatement(statement);
1358             } else {
1359                 handleStatementInBackground(statement);
1360             }
1361         }
1362 
1363         private void handleStatementInBackground(final Statement statement)
1364                 throws RDFHandlerException {
1365 
1366             Statement[] fullBuffer = null;
1367             synchronized (this) {
1368                 this.buffer[this.size++] = statement;
1369                 if (this.size == BUFFER_SIZE) {
1370                     fullBuffer = this.buffer;
1371                     this.buffer = new Statement[BUFFER_SIZE];
1372                     this.size = 0;
1373                     this.incomingThreads.add(Thread.currentThread());
1374                     checkNotFailed();
1375                     calibrateMask();
1376                     fullBuffer = handleStatementsInBackground(fullBuffer);
1377                 }
1378             }
1379 
1380             if (fullBuffer != null) {
1381                 for (final Statement stmt : fullBuffer) {
1382                     super.handleStatement(stmt);
1383                 }
1384             }
1385         }
1386 
1387         private Statement[] handleStatementsInBackground(final Statement[] buffer) {
1388 
1389             // Update the list of pending futures. Abort if too many tasks pending.
1390             for (final Iterator<Future<?>> i = this.futures.iterator(); i.hasNext();) {
1391                 final Future<?> future = i.next();
1392                 if (future.isDone()) {
1393                     i.remove();
1394                 }
1395             }
1396 
1397             // If there are too many tasks (futures) pending, do not use background processing
1398             if (this.futures.size() >= this.numCores) {
1399                 return buffer;
1400             }
1401 
1402             // Schedule a background task, properly managing exceptions it may throw
1403             this.futures.add(Environment.getPool().submit(new Runnable() {
1404 
1405                 @Override
1406                 public void run() {
1407                     try {
1408                         for (final Statement statement : buffer) {
1409                             DecoupleHandler.super.handleStatement(statement);
1410                         }
1411                     } catch (final Throwable ex) {
1412                         @SuppressWarnings("resource")
1413                         final DecoupleHandler h = DecoupleHandler.this;
1414                         synchronized (h) {
1415                             if (h.exception == null) {
1416                                 h.exception = ex;
1417                             } else {
1418                                 h.exception.addSuppressed(ex);
1419                             }
1420                             for (final Future<?> future : h.futures) {
1421                                 future.cancel(false);
1422                             }
1423                         }
1424                     }
1425                 }
1426 
1427             }));
1428 
1429             // Nothing left to be processed
1430             return null;
1431         }
1432 
1433         private void calibrateMask() {
1434 
1435             // Here we adapt the fraction of incoming statements that are buffered, based on the
1436             // numbers of threads we detected entered the decoupler. The fraction is chosen so to
1437             // divide the work evenly between N incoming threads and #CORES - N background
1438             // threads. In practice, the fraction moves quickly to 1000 (multiple threads in)
1439             // or to 1 (single thread in).
1440             final int numCores = Environment.getCores();
1441             final int numThreads = this.incomingThreads.size();
1442             if (numCores > numThreads) {
1443                 this.mask = Integer.highestOneBit(numCores / (numCores - numThreads)) - 1;
1444             } else {
1445                 this.mask = 0xFFFFFFFF;
1446                 this.disabled = true;
1447                 LOGGER.debug("Decoupler disabled");
1448             }
1449 
1450             // note: we do not declare mask as volatile, so the change will be picked up by
1451             // threads only at a later time, but this is not a problem
1452         }
1453 
1454         private void checkNotFailed() throws RDFHandlerException {
1455             if (this.exception != null) {
1456                 if (this.exception instanceof RDFHandlerException) {
1457                     throw (RDFHandlerException) this.exception;
1458                 } else if (this.exception instanceof RuntimeException) {
1459                     throw (RuntimeException) this.exception;
1460                 } else if (this.exception instanceof Error) {
1461                     throw (Error) this.exception;
1462                 }
1463                 throw new RDFHandlerException(this.exception);
1464             }
1465         }
1466 
1467         @Override
1468         public void endRDF() throws RDFHandlerException {
1469 
1470             // Handle remaining buffered statements
1471             for (int i = 0; i < this.size; ++i) {
1472                 super.handleStatement(this.buffer[i]);
1473             }
1474 
1475             // Wait for completion of pending tasks
1476             List<Future<?>> futuresToWaitFor;
1477             synchronized (this) {
1478                 futuresToWaitFor = new ArrayList<Future<?>>(this.futures);
1479             }
1480             for (final Future<?> future : futuresToWaitFor) {
1481                 while (!future.isDone()) {
1482                     try {
1483                         future.get();
1484                     } catch (final Throwable ex) {
1485                         // Ignore
1486                     }
1487                 }
1488             }
1489 
1490             // Check there were no errors in background processing
1491             checkNotFailed();
1492 
1493             // Propagate
1494             super.endRDF();
1495         }
1496 
1497         @Override
1498         public void close() {
1499             super.close();
1500             synchronized (this) {
1501                 for (final Future<?> future : this.futures) {
1502                     future.cancel(false);
1503                 }
1504             }
1505         }
1506 
1507     }
1508 
1509     private static final class SynchronizeHandler extends AbstractRDFHandlerWrapper {
1510 
1511         SynchronizeHandler(final RDFHandler delegate) {
1512             super(delegate);
1513         }
1514 
1515         @Override
1516         public synchronized void handleComment(final String comment) throws RDFHandlerException {
1517             super.handleComment(comment);
1518         }
1519 
1520         @Override
1521         public synchronized void handleNamespace(final String prefix, final String uri)
1522                 throws RDFHandlerException {
1523             super.handleNamespace(prefix, uri);
1524         }
1525 
1526         @Override
1527         public synchronized void handleStatement(final Statement statement)
1528                 throws RDFHandlerException {
1529             super.handleStatement(statement);
1530         }
1531 
1532     }
1533 
1534     private static final class DecoupleQueueHandler extends AbstractRDFHandlerWrapper {
1535 
1536         private static final int CAPACITY = 4 * 1024;
1537 
1538         private static final Object EOF = new Object();
1539 
1540         private final int numConsumers;
1541 
1542         private AtomicReference<Throwable> exception;
1543 
1544         private BlockingQueue<Object> queue;
1545 
1546         private List<Future<?>> futures;
1547 
1548         DecoupleQueueHandler(final RDFHandler delegate, final int numConsumers) {
1549             super(delegate);
1550             this.numConsumers = numConsumers;
1551         }
1552 
1553         @Override
1554         public void startRDF() throws RDFHandlerException {
1555             super.startRDF();
1556             this.exception = new AtomicReference<>(null);
1557             this.queue = new ArrayBlockingQueue<>(CAPACITY);
1558             this.futures = Lists.newArrayList();
1559             for (int i = 0; i < this.numConsumers; ++i) {
1560                 this.futures.add(Environment.getPool().submit(new Runnable() {
1561 
1562                     @Override
1563                     public void run() {
1564                         try {
1565                             final RDFHandler handler = DecoupleQueueHandler.this.handler;
1566                             while (true) {
1567                                 if (DecoupleQueueHandler.this.exception.get() != null) {
1568                                     break;
1569                                 }
1570                                 final Object element = DecoupleQueueHandler.this.queue.take();
1571                                 if (element instanceof Statement) {
1572                                     handler.handleStatement((Statement) element);
1573                                 } else if (element instanceof NamespaceImpl) {
1574                                     final NamespaceImpl ns = (NamespaceImpl) element;
1575                                     handler.handleNamespace(ns.getPrefix(), ns.getName());
1576                                 } else if (element instanceof String) {
1577                                     handler.handleComment((String) element);
1578                                 } else if (element == EOF) {
1579                                     break;
1580                                 }
1581                             }
1582                         } catch (final Throwable ex) {
1583                             DecoupleQueueHandler.this.exception.set(ex);
1584                             DecoupleQueueHandler.this.queue.clear();
1585                         }
1586                     }
1587 
1588                 }));
1589             }
1590         }
1591 
1592         @Override
1593         public void handleComment(final String comment) throws RDFHandlerException {
1594             check();
1595             put(comment);
1596         }
1597 
1598         @Override
1599         public void handleNamespace(final String prefix, final String uri)
1600                 throws RDFHandlerException {
1601             check();
1602             put(new NamespaceImpl(prefix, uri));
1603         }
1604 
1605         @Override
1606         public void handleStatement(final Statement statement) throws RDFHandlerException {
1607             check();
1608             put(statement);
1609         }
1610 
1611         @Override
1612         public void endRDF() throws RDFHandlerException {
1613             try {
1614                 check();
1615                 put(EOF);
1616                 for (final Future<?> future : this.futures) {
1617                     try {
1618                         future.get();
1619                     } catch (final Throwable ex) {
1620                         this.exception.compareAndSet(null, ex);
1621                     }
1622                 }
1623                 check();
1624                 super.endRDF();
1625             } finally {
1626                 this.exception = null;
1627                 this.queue = null;
1628                 this.futures = null;
1629             }
1630         }
1631 
1632         private void put(final Object object) throws RDFHandlerException {
1633             try {
1634                 this.queue.put(object);
1635             } catch (final InterruptedException ex) {
1636                 this.exception.set(ex);
1637                 throw new RDFHandlerException(ex);
1638             }
1639         }
1640 
1641         private void check() throws RDFHandlerException {
1642             final Throwable ex = this.exception.get();
1643             if (ex != null) {
1644                 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
1645                 throw new RDFHandlerException(ex);
1646             }
1647         }
1648 
1649     }
1650 
1651 }