/*
 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
 *
 * Written in 2014 by Francesco Corcoglioniti and Alessio Palmero Aprosio with support by Marco
 * Amadori, Michele Mostarda and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
 *
 * To the extent possible under law, the authors have dedicated all copyright and related and
 * neighboring rights to this software to the public domain worldwide. This software is
 * distributed without any warranty.
 *
 * You should have received a copy of the CC0 Public Domain Dedication along with this software.
 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
 */
package eu.fbk.rdfpro;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.openrdf.model.BNode;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.vocabulary.SESAME;
import org.openrdf.rio.ParserConfig;
import org.openrdf.rio.RDFHandler;
import org.openrdf.rio.RDFHandlerException;
import org.openrdf.rio.WriterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.fbk.rdfpro.util.Environment;
import eu.fbk.rdfpro.util.IO;
import eu.fbk.rdfpro.util.Namespaces;
import eu.fbk.rdfpro.util.Options;
import eu.fbk.rdfpro.util.Scripting;
import eu.fbk.rdfpro.util.Statements;
import eu.fbk.rdfpro.util.Tracker;
import eu.fbk.rdfpro.vocab.VOIDX;

/**
 * Utility methods dealing with {@code RDFProcessor}s.
 */
public final class RDFProcessors {

    private static final Logger LOGGER = LoggerFactory.getLogger(RDFProcessors.class);

    /** The null {@code RDFProcessor} that always produces an empty RDF stream. */
    public static final RDFProcessor NIL = new RDFProcessor() {

        @Override
        public RDFHandler wrap(final RDFHandler handler) {
            return RDFHandlers.ignoreMethods(Objects.requireNonNull(handler),
                    RDFHandlers.METHOD_HANDLE_COMMENT | RDFHandlers.METHOD_HANDLE_NAMESPACE
                            | RDFHandlers.METHOD_HANDLE_STATEMENT);
        }

    };

    /** The identity {@code RDFProcessor} that returns the input RDF stream unchanged. */
    public static final RDFProcessor IDENTITY = new RDFProcessor() {

        @Override
        public RDFHandler wrap(final RDFHandler handler) {
            return Objects.requireNonNull(handler);
        }

    };

    private RDFProcessors() {
    }
    // Splits a specification string into tokens, honouring single/double quotes and backslash
    // escapes; whitespace separates tokens outside quoted sections.
    private static String[] tokenize(final String spec) {

        final List<String> tokens = new ArrayList<String>();

        final StringBuilder builder = new StringBuilder();
        boolean quoted = false;
        boolean escaped = false;
        int start = -1;

        for (int i = 0; i < spec.length(); ++i) {
            final char ch = spec.charAt(i);
            final boolean ws = Character.isWhitespace(ch);
            if (ch == '\\' && !escaped) {
                escaped = true;
            } else {
                if (start < 0) {
                    if (!ws) {
                        start = i;
                        quoted = ch == '\'' || ch == '\"';
                        builder.setLength(0);
                        if (!quoted) {
                            builder.append(ch);
                        }
                    }
                } else {
                    final boolean tokenChar = escaped || quoted && ch != spec.charAt(start)
                            || !quoted && !ws;
                    if (tokenChar) {
                        builder.append(ch);
                    }
                    if (!tokenChar || i == spec.length() - 1) {
                        tokens.add(builder.toString());
                        start = -1;
                        quoted = false;
                    }
                }
                escaped = false;
            }
        }

        return tokens.toArray(new String[tokens.size()]);
    }

    // Parses a URI, resolving prefixed names against the default namespaces; a bare name
    // without ':' is interpreted as a prefix (i.e., "foo" is treated as "foo:").
    @Nullable
    private static URI parseURI(@Nullable final String string) {
        if (string == null) {
            return null;
        } else if (!string.contains(":")) {
            return (URI) Statements.parseValue(string + ":", Namespaces.DEFAULT);
        } else {
            return (URI) Statements.parseValue(string, Namespaces.DEFAULT);
        }
    }

    static RDFProcessor create(final String name, final String... args) {

        switch (name) {
        case "r":
        case "read": {
            final Options options = Options.parse("b!|w|+", args);
            final String[] fileSpecs = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            final boolean preserveBNodes = !options.hasOption("w");
            final URI base = parseURI(options.getOptionArg("b", String.class));
            return read(true, preserveBNodes, base == null ? null : base.stringValue(), null,
                    fileSpecs);
        }

        case "w":
        case "write": {
            final Options options = Options.parse("c!|+", args);
            final int chunkSize = options.getOptionArg("c", Integer.class, 1);
            final String[] locations = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            return write(null, chunkSize, locations);
        }

        case "t":
        case "transform": {
            final Options options = Options.parse("+", args);
            final String spec = String.join(" ", options.getPositionalArgs(String.class));
            final Transformer transformer = Scripting.isScript(spec) ? Scripting.compile(
                    Transformer.class, spec, "q", "h") : Transformer.rules(spec);
            return transform(transformer);
        }

        case "u":
        case "unique": {
            final Options options = Options.parse("m", args);
            return unique(options.hasOption("m"));
        }

        case "p":
        case "prefix": {
            final Options options = Options.parse("f!", args);
            final String source = options.getOptionArg("f", String.class);
            Namespaces namespaces = Namespaces.DEFAULT;
            if (source != null) {
                try {
                    URL url;
                    final File file = new File(source);
                    if (file.exists()) {
                        url = file.toURI().toURL();
                    } else {
                        url = RDFProcessors.class.getClassLoader().getResource(source);
                    }
                    namespaces = Namespaces.load(Collections.singleton(url), false);
                } catch (final Throwable ex) {
                    throw new IllegalArgumentException(
                            "Cannot load prefix/namespace bindings from " + source + ": "
                                    + ex.getMessage(), ex);
                }
            }
            return prefix(namespaces.prefixMap());
        }

        case "smush": {
            final Options options = Options.parse("x|*", args);
            final String[] namespaces = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            final boolean hasSmushEasterEgg = options.hasOption("x");
            if (hasSmushEasterEgg) {
                // Below you can find one of the most important contributions by Alessio :-)
                // (google for 'smush' if you wonder why we added this easter egg)
                final PrintStream p = System.out;
                p.println();
                p.println(".==================================================================.");
                p.println("||    ( )              ( )                ( )              ( )    ||");
                p.println("|'================================================================'|");
                p.println("||                                                                ||");
                p.println("||                                                                ||");
                p.println("||                                  .::::.                        ||");
                p.println("||                                .::::::::.                      ||");
                p.println("||                                :::::::::::                     ||");
                p.println("||                                ':::::::::::..                  ||");
                p.println("||                                 :::::::::::::::'               ||");
                p.println("||                                  ':::::::::::.                 ||");
                p.println("||                                    .::::::::::::::'            ||");
                p.println("||                                  .:::::::::::...               ||");
                p.println("||                                 ::::::::::::::''               ||");
                p.println("||                     .:::.       '::::::::''::::                ||");
                p.println("||                   .::::::::.      ':::::'  '::::               ||");
                p.println("||                  .::::':::::::.    :::::    '::::.             ||");
                p.println("||                .:::::' ':::::::::. :::::      ':::.            ||");
                p.println("||              .:::::'     ':::::::::.:::::       '::.           ||");
                p.println("||            .::::''         '::::::::::::::       '::.          ||");
                p.println("||           .::''              '::::::::::::         :::...      ||");
                p.println("||        ..::::                  ':::::::::'        .:' ''''     ||");
                p.println("||     ..''''':'                    ':::::.'                      ||");
                p.println("||                                                                ||");
                p.println("||                                                                ||");
                p.println("|'================================================================'|");
                p.println("||              __________________                                ||");
                p.println("||              | ___ \\  _  \\  ___|                               ||");
                p.println("||              | |_/ / | | | |_                                  ||");
                p.println("||              |    /| | | |  _|                                 ||");
                p.println("||              | |\\ \\| |/ /| |  ___  ___  ____                   ||");
                p.println("||              \\_| \\_|___/ \\_| / _ \\/ _ \\/ __ \\                  ||");
                p.println("||                             / ___/ , _/ /_/ /                  ||");
                p.println("||                            /_/  /_/|_|\\____/                   ||");
                p.println("||                                                                ||");
                p.println("'=============================================================LGB=='");
                p.println();
            }

            for (int i = 0; i < namespaces.length; ++i) {
                namespaces[i] = parseURI(namespaces[i]).stringValue();
            }
            return smush(namespaces);
        }

        case "tbox": {
            Options.parse("", args);
            return tbox();
        }

        case "rdfs": {
            final Options options = Options.parse("d|e!|C|c!|b!|t|w|+", args);
            final URI base = parseURI(options.getOptionArg("b", String.class));
            final boolean preserveBNodes = !options.hasOption("w");
            final String[] fileSpecs = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            final RDFSource tbox = RDFProcessors.track(
                    new Tracker(LOGGER, null, "%d TBox triples read (%d tr/s avg)", //
                            "%d TBox triples read (%d tr/s, %d tr/s avg)")).wrap(
                    RDFSources.read(true, preserveBNodes,
                            base == null ? null : base.stringValue(), null, fileSpecs));
            final boolean decomposeOWLAxioms = options.hasOption("d");
            final boolean dropBNodeTypes = options.hasOption("t");
            String[] excludedRules = new String[0];
            if (options.hasOption("e")) {
                excludedRules = options.getOptionArg("e", String.class).split(",");
            }
            URI context = null;
            if (options.hasOption("C")) {
                context = SESAME.NIL;
            } else if (options.hasOption("c")) {
                context = parseURI(options.getOptionArg("c", String.class));
            }
            return rdfs(tbox, context, decomposeOWLAxioms, dropBNodeTypes, excludedRules);
        }

        case "stats": {
            final Options options = Options.parse("n!|p!|c!|t!|o", args);
            final URI namespace = parseURI(options.getOptionArg("n", String.class));
            final URI property = parseURI(options.getOptionArg("p", String.class));
            final URI context = parseURI(options.getOptionArg("c", String.class));
            final Long threshold = options.getOptionArg("t", Long.class);
            final boolean processCooccurrences = options.hasOption("o");
            return stats(namespace == null ? null : namespace.stringValue(), property, context,
                    threshold, processCooccurrences);
        }

        case "download": {
            final Options options = Options.parse("w|q!|f!|!", args);
            final boolean preserveBNodes = !options.hasOption("w");
            final String endpointURL = parseURI(options.getPositionalArg(0, String.class))
                    .stringValue();
            String query = options.getOptionArg("q", String.class);
            if (query == null) {
                final String source = options.getOptionArg("f", String.class);
                try {
                    final File file = new File(source);
                    URL url;
                    if (file.exists()) {
                        url = file.toURI().toURL();
                    } else {
                        url = RDFProcessors.class.getClassLoader().getResource(source);
                    }
                    final BufferedReader reader = new BufferedReader(new InputStreamReader(
                            url.openStream()));
                    try {
                        final StringBuilder builder = new StringBuilder();
                        String line;
                        while ((line = reader.readLine()) != null) {
                            // keep line separators: a SPARQL '#' comment would otherwise
                            // swallow the rest of the query
                            builder.append(line).append('\n');
                        }
                        query = builder.toString();
                    } finally {
                        IO.closeQuietly(reader);
                    }
                } catch (final Throwable ex) {
                    throw new IllegalArgumentException("Cannot load SPARQL query from " + source
                            + ": " + ex.getMessage(), ex);
                }
            }
            return download(true, preserveBNodes, endpointURL, query);
        }

        case "upload": {
            final Options options = Options.parse("!", args);
            final String endpointURL = parseURI(options.getPositionalArg(0, String.class))
                    .stringValue();
            return upload(endpointURL);
        }

        case "mapreduce": {
            final Options options = Options.parse("b!|r!|e!|a!|u|+", args);
            final boolean deduplicate = options.hasOption("u");
            final String bypassExp = options.getOptionArg("b", String.class);
            final String existsExp = options.getOptionArg("e", String.class);
            final String forallExp = options.getOptionArg("a", String.class);
            final String reducerExp = options.getOptionArg("r", String.class);
            final Predicate<Statement> bypassPred = Statements.statementMatcher(bypassExp);
            final Predicate<Statement> existsPred = Statements.statementMatcher(existsExp);
            final Predicate<Statement> forallPred = Statements.statementMatcher(forallExp);
            Reducer reducer = reducerExp == null ? Reducer.IDENTITY //
                    : Scripting.compile(Reducer.class, reducerExp, "k", "p", "h");
            reducer = Reducer.filter(reducer, existsPred, forallPred);
            final List<Mapper> mappers = new ArrayList<>();
            for (final String mapperExp : options.getPositionalArgs(String.class)) {
                mappers.add(Mapper.parse(mapperExp));
            }
            Mapper mapper = Mapper.concat(mappers.toArray(new Mapper[mappers.size()]));
            if (bypassPred != null) {
                mapper = Mapper.bypass(mapper, bypassPred);
            }
            return mapReduce(mapper, reducer, deduplicate);
        }

        default:
            throw new Error("Unsupported " + name + " processor, check properties file");
        }
    }

    /**
     * Creates an {@code RDFProcessor} by parsing the supplied specification string(s). The
     * specification may be supplied already tokenized, or the method can be asked to tokenize
     * it itself (set {@code tokenize = true}).
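     * <p>
     * For example, the following sketch builds a three-command pipeline from a single
     * untokenized specification string (the file names are purely illustrative):
     *
     * <pre>{@code
     * RDFProcessor pipeline = RDFProcessors.parse(true,
     *         "@read data.tql.gz @unique -m @write output.tql.gz");
     * }</pre>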
     *
     * @param tokenize
     *            true if input string(s) should be tokenized (again)
     * @param args
     *            the input string(s)
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor parse(final boolean tokenize, final String... args) {
        List<String> list;
        if (tokenize) {
            list = new ArrayList<>();
            for (final String arg : args) {
                list.addAll(Arrays.asList(tokenize(arg)));
            }
        } else {
            list = Arrays.asList(args);
        }
        return new Parser(list).parse();
    }

    /**
     * Returns an {@code RDFProcessor} performing the parallel composition of the processors
     * specified, using the given {@code SetOperator} to merge their results.
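     * <p>
     * A minimal sketch, merging the extracted TBox with the deduplicated stream; the
     * single-letter operator name {@code "a"} mirrors the default applied by the
     * specification parser (see {@code SetOperator} for the operators actually available):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.parallel(SetOperator.valueOf("a"),
     *         RDFProcessors.tbox(), RDFProcessors.unique(false));
     * }</pre>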
     *
     * @param operator
     *            the {@code SetOperator} to use for merging the results of composed processors,
     *            not null
     * @param processors
     *            the processors to compose in parallel
     * @return the resulting {@code RDFProcessor}
     */
    public static RDFProcessor parallel(final SetOperator operator,
            final RDFProcessor... processors) {

        Objects.requireNonNull(operator);

        if (processors.length == 0) {
            throw new IllegalArgumentException("At least one processor should be supplied "
                    + "in a parallel composition");
        }

        int count = 0;
        for (final RDFProcessor processor : processors) {
            count = Math.max(count, processor.getExtraPasses());
        }
        final int extraPasses = count;

        return new RDFProcessor() {

            @Override
            public int getExtraPasses() {
                return extraPasses;
            }

            @Override
            public RDFHandler wrap(final RDFHandler handler) {

                Objects.requireNonNull(handler);

                final int numProcessors = processors.length;

                final int[] extraPasses = new int[numProcessors];
                final RDFHandler[] handlers = RDFHandlers
                        .collect(handler, numProcessors, operator);

                for (int i = 0; i < numProcessors; ++i) {
                    final RDFProcessor processor = processors[i];
                    extraPasses[i] = processor.getExtraPasses();
                    handlers[i] = processor.wrap(handlers[i]);
                }

                return RDFHandlers.dispatchAll(handlers, extraPasses);
            }

        };
    }

    /**
     * Returns an {@code RDFProcessor} performing the sequence composition of the supplied
     * {@code RDFProcessor}s. In a sequence composition, the first processor is applied first to
     * the stream, with its output fed to the next processor and so on.
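     * <p>
     * For example, a read-deduplicate-write pipeline can be assembled as follows (the file
     * names are illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.sequence(
     *         RDFProcessors.read(true, true, null, null, "input.tql.gz"),
     *         RDFProcessors.unique(false),
     *         RDFProcessors.write(null, 1, "output.tql.gz"));
     * }</pre>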
     *
     * @param processors
     *            the processors to compose in a sequence
     * @return the resulting {@code RDFProcessor}
     */
    public static RDFProcessor sequence(final RDFProcessor... processors) {

        if (processors.length == 0) {
            throw new IllegalArgumentException("At least one processor should be supplied "
                    + "in a sequence composition");
        }

        if (processors.length == 1) {
            return Objects.requireNonNull(processors[0]);
        }

        int count = 0;
        for (final RDFProcessor processor : processors) {
            count += processor.getExtraPasses();
        }
        final int extraPasses = count;

        return new RDFProcessor() {

            @Override
            public int getExtraPasses() {
                return extraPasses;
            }

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                RDFHandler result = Objects.requireNonNull(handler);
                for (int i = processors.length - 1; i >= 0; --i) {
                    result = processors[i].wrap(result);
                }
                return result;
            }
        };
    }

    /**
     * Creates an {@code RDFProcessor} that processes the RDF stream in a MapReduce fashion. The
     * method is parameterized by a {@link Mapper} and a {@link Reducer} object, which perform
     * the actual computation, and a {@code deduplicate} flag that controls whether duplicate
     * statements mapped to the same key by the mapper should be merged. MapReduce is performed
     * relying on external sorting: input statements are mapped to a {@code Value} key, based on
     * which they are sorted (externally); each key partition is then fed to the reducer and the
     * reducer output emitted. Hadoop is not involved :-) - on the one hand, this scheme is
     * limited to a single-machine environment; on the other hand, it exploits this limitation
     * by using available memory to encode sorted data, thus limiting its volume and speeding up
     * the operation.
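     * <p>
     * As a sketch, the following groups statements by subject and re-emits each partition,
     * dropping duplicates; it assumes that {@code Mapper.parse} accepts the single-component
     * expression {@code "s"}, as used by the {@code mapreduce} command:
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.mapReduce(Mapper.parse("s"), Reducer.IDENTITY, true);
     * }</pre>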
     *
     * @param mapper
     *            the mapper, not null
     * @param reducer
     *            the reducer, not null
     * @param deduplicate
     *            true if duplicate statements mapped to the same key should be merged
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor mapReduce(final Mapper mapper, final Reducer reducer,
            final boolean deduplicate) {
        return new ProcessorMapReduce(mapper, reducer, deduplicate);
    }

    /**
     * Creates an {@code RDFProcessor} that augments the RDF stream with prefix-to-namespace
     * bindings from the supplied map or from {@code prefix.cc}. NOTE: if a map is supplied, it
     * is important that it is not changed externally while the produced {@code RDFProcessor}
     * is in use, as this will alter the RDF stream produced at each pass and may cause race
     * conditions.
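     * <p>
     * A minimal sketch with a single custom binding (note that the map goes from namespace to
     * prefix):
     *
     * <pre>{@code
     * Map<String, String> map = new HashMap<>();
     * map.put("http://xmlns.com/foaf/0.1/", "foaf");
     * RDFProcessor p = RDFProcessors.prefix(map);
     * }</pre>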
     *
     * @param nsToPrefixMap
     *            the namespace-to-prefix map to use; if null, a builtin map derived from the
     *            data of {@code prefix.cc} will be used
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor prefix(@Nullable final Map<String, String> nsToPrefixMap) {
        return new ProcessorPrefix(nsToPrefixMap);
    }

    /**
     * Creates an {@code RDFProcessor} that computes the RDFS closure of the RDF stream based on
     * the TBox separately supplied.
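     * <p>
     * For example, the following sketch closes the stream against a TBox file, emitting the
     * TBox in the default context and dropping uninformative {@code rdf:type} statements (the
     * file name is illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.rdfs(
     *         RDFSources.read(true, true, null, null, "tbox.ttl"),
     *         SESAME.NIL, false, true);
     * }</pre>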
     *
     * @param tbox
     *            an {@code RDFSource} providing access to TBox data, not null
     * @param tboxContext
     *            the context where to emit TBox data; if null the TBox is not emitted (use
     *            {@link SESAME#NIL} for emitting data in the default context)
     * @param decomposeOWLAxioms
     *            true if simple OWL axioms mappable to RDFS (e.g. {@code owl:equivalentClass})
     *            should be decomposed into the corresponding RDFS axioms (OWL axioms are
     *            otherwise ignored when computing the closure)
     * @param dropBNodeTypes
     *            true if {@code <x rdf:type _:b>} statements should not be emitted (as
     *            uninformative); note that this option does not prevent these statements from
     *            being used for inference (even if dropped), possibly leading to inferred
     *            statements that are not dropped
     * @param excludedRules
     *            a vararg array with the names of the RDFS rules to exclude; if empty, all the
     *            RDFS rules will be used
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor rdfs(final RDFSource tbox, @Nullable final Resource tboxContext,
            final boolean decomposeOWLAxioms, final boolean dropBNodeTypes,
            final String... excludedRules) {
        return new ProcessorRDFS(tbox, tboxContext, decomposeOWLAxioms, dropBNodeTypes,
                excludedRules);
    }

    /**
     * Creates an {@code RDFProcessor} performing {@code owl:sameAs} smushing. A ranked list of
     * namespaces controls the selection of the canonical URI for each cluster of coreferring
     * URIs. {@code owl:sameAs} statements are emitted in output, linking the selected canonical
     * URI to the other aliases of the entity.
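     * <p>
     * A minimal sketch, preferring canonical URIs from the first namespace over the second
     * (both namespaces are illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.smush("http://example.org/", "http://other.example.org/");
     * }</pre>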
     *
     * @param rankedNamespaces
     *            the ranked list of namespaces used to select canonical URIs
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor smush(final String... rankedNamespaces) {
        return new ProcessorSmush(rankedNamespaces);
    }

    /**
     * Creates an {@code RDFProcessor} extracting VOID structural statistics from the RDF stream.
     * A VOID dataset is associated with the whole input and with each set of graphs linked to
     * the same 'source' URI by the configurable property {@code sourceProperty}; if parameter
     * {@code sourceContext} is not null, these association statements are searched for only in
     * the graph with the URI specified. Class and property partitions are then generated for
     * each of these datasets, assigning them URIs in the namespace given by
     * {@code outputNamespace} (if null, a default namespace is used). In addition to standard
     * VOID terms, the processor emits additional statements based on the {@link VOIDX} extension
     * vocabulary to express the number of TBox, ABox, {@code rdf:type} and {@code owl:sameAs}
     * statements, the average number of properties per entity, and informative labels and
     * examples for each TBox term, which are then viewable in tools such as Protégé.
     * Internally, the processor makes use of external sorting to (conceptually) sort the RDF
     * stream twice: first based on the subject, to group statements about the same entity and
     * compute entity-based and distinct-subjects statistics; then based on the object, to
     * compute distinct-objects statistics. Therefore, computing VOID statistics is quite a slow
     * operation.
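     * <p>
     * A minimal sketch, using the default namespace, ignoring sources and emitting only
     * partitions with at least 100 statements or entities:
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.stats(null, null, null, 100L, false);
     * }</pre>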
     *
     * @param outputNamespace
     *            the namespace for generated URIs (if null, a default is used)
     * @param sourceProperty
     *            the URI of the property linking graphs to sources (if null, sources will not
     *            be considered)
     * @param sourceContext
     *            the graph where to look for graph-to-source links (if null, they will be
     *            searched for in the whole RDF stream)
     * @param threshold
     *            the minimum number of statements or entities that a VOID partition must have
     *            in order to be emitted; this parameter allows dropping VOID partitions for
     *            infrequent concepts, considerably reducing the output size
     * @param processCooccurrences
     *            true to enable analysis of co-occurrences for computing {@code void:classes}
     *            and {@code void:properties} statements
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor stats(@Nullable final String outputNamespace,
            @Nullable final URI sourceProperty, @Nullable final URI sourceContext,
            @Nullable final Long threshold, final boolean processCooccurrences) {
        return new ProcessorStats(outputNamespace, sourceProperty, sourceContext, threshold,
                processCooccurrences);
    }

    /**
     * Returns an {@code RDFProcessor} that extracts the TBox of data in the RDF stream.
     *
     * @return the TBox-extracting {@code RDFProcessor}
     */
    public static RDFProcessor tbox() {
        return ProcessorTBox.INSTANCE;
    }

    /**
     * Returns an {@code RDFProcessor} that applies the supplied {@code Transformer} to each
     * input triple, producing the transformed triples in output.
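     * <p>
     * For instance, assuming {@code Transformer} can be given as a lambda, a transformer that
     * drops statements with a {@link BNode} subject may be sketched as:
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.transform((statement, handler) -> {
     *     if (!(statement.getSubject() instanceof BNode)) {
     *         handler.handleStatement(statement);
     *     }
     * });
     * }</pre>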
     *
     * @param transformer
     *            the transformer, not null
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor transform(final Transformer transformer) {
        Objects.requireNonNull(transformer);
        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                return new AbstractRDFHandlerWrapper(Objects.requireNonNull(handler)) {

                    @Override
                    public void handleStatement(final Statement statement)
                            throws RDFHandlerException {
                        transformer.transform(statement, this.handler);
                    }

                };
            }

        };
    }

    /**
     * Creates an {@code RDFProcessor} that removes duplicates from the RDF stream, optionally
     * merging similar statements with different contexts into a single statement.
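     * <p>
     * For example:
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.unique(true); // also merge the contexts
     * }</pre>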
     *
     * @param mergeContexts
     *            true if statements with the same subject, predicate and object but different
     *            contexts should be merged into a single statement, whose context is a
     *            combination of the source contexts
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor unique(final boolean mergeContexts) {
        return new ProcessorUnique(mergeContexts);
    }

    /**
     * Creates an {@code RDFProcessor} that injects into the RDF stream the data loaded from the
     * specified {@code RDFSource}. Data is read and injected at every pass over the RDF stream.
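     * <p>
     * A minimal sketch, injecting the statements parsed from a file (name illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.inject(
     *         RDFSources.read(true, true, null, null, "extra.ttl"));
     * }</pre>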
     *
     * @param source
     *            the {@code RDFSource}, not null
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor inject(final RDFSource source) {
        Objects.requireNonNull(source);
        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                return new InjectSourceHandler(Objects.requireNonNull(handler), source, null);
            }

        };
    }

    /**
     * Creates an {@code RDFProcessor} that reads data from the files specified and injects it
     * into the RDF stream at each pass. This is a utility method that relies on
     * {@link #inject(RDFSource)}, on
     * {@link RDFSources#read(boolean, boolean, String, ParserConfig, String...)} and on
     * {@link #track(Tracker)} for providing progress information on loaded statements.
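     * <p>
     * For example (the file names are illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.read(true, false, null, null,
     *         "data1.ttl.gz", "data2.nt");
     * }</pre>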
     *
     * @param parallelize
     *            false if files should be parsed sequentially using only one thread
     * @param preserveBNodes
     *            true if BNodes in parsed files should be preserved, false if they should be
     *            rewritten on a per-file basis to avoid possible clashes
     * @param baseURI
     *            the base URI to be used for resolving relative URIs, possibly null
     * @param config
     *            the optional {@code ParserConfig} for fine-tuning the RDF parser used; if
     *            null, a default, maximally permissive configuration will be used
     * @param locations
     *            the locations of the RDF files to be read
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor read(final boolean parallelize, final boolean preserveBNodes,
            @Nullable final String baseURI, @Nullable final ParserConfig config,
            final String... locations) {
        final RDFProcessor tracker = track(new Tracker(LOGGER, null,
                "%d triples read (%d tr/s avg)", //
                "%d triples read (%d tr/s, %d tr/s avg)"));
        final RDFSource source = RDFSources.read(parallelize, preserveBNodes, baseURI, config,
                locations);
        return inject(tracker.wrap(source));
    }

    /**
     * Creates an {@code RDFProcessor} that retrieves data from a SPARQL endpoint and injects it
     * into the RDF stream at each pass. This is a utility method that relies on
     * {@link #inject(RDFSource)}, on {@link RDFSources#query(boolean, boolean, String, String)}
     * and on {@link #track(Tracker)} for providing progress information on fetched statements.
     * NOTE: as SPARQL does not provide any guarantee on the identifiers of returned BNodes, it
     * may happen that different BNodes are returned in different passes, causing the RDF stream
     * produced by this {@code RDFProcessor} to change from one pass to another.
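     * <p>
     * A minimal sketch (the endpoint URL is illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.download(true, true,
     *         "http://localhost:8890/sparql",
     *         "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }");
     * }</pre>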
     *
     * @param parallelize
     *            true if query results should be handled by multiple threads in parallel
     * @param endpointURL
     *            the URL of the SPARQL endpoint, not null
     * @param query
     *            the SPARQL query (CONSTRUCT or SELECT form) to submit to the endpoint
     * @param preserveBNodes
     *            true if BNodes in the query result should be preserved, false if they should
     *            be rewritten on a per-endpoint basis to avoid possible clashes
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor download(final boolean parallelize, final boolean preserveBNodes,
            final String endpointURL, final String query) {
        final RDFProcessor tracker = track(new Tracker(LOGGER, null,
                "%d triples queried (%d tr/s avg)", //
                "%d triples queried (%d tr/s, %d tr/s avg)"));
        final RDFSource source = RDFSources.query(parallelize, preserveBNodes, endpointURL, query);
        return inject(tracker.wrap(source));
    }

    /**
     * Creates an {@code RDFProcessor} that duplicates data of the RDF stream to the
     * {@code RDFHandler}s specified. The produced processor can be used to 'peek' into the RDF
     * stream, making it possible to fork the stream. Note that RDF data is emitted to the
     * supplied handlers at each pass; if this is not the desired behavior, please wrap the
     * handlers using {@link RDFHandlers#ignorePasses(RDFHandler, int)}.
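     * <p>
     * For example, the following peeks into the stream, writing a copy of its statements only
     * at the first pass (the file name is illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.tee(RDFHandlers.ignorePasses(
     *         RDFHandlers.write(null, 1, "copy.tql.gz"), 1));
     * }</pre>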
     *
     * @param handlers
     *            the handlers to duplicate RDF data to
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor tee(final RDFHandler... handlers) {
        if (handlers.length == 0) {
            return IDENTITY;
        }
        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                final RDFHandler[] allHandlers = new RDFHandler[handlers.length + 1];
                allHandlers[0] = Objects.requireNonNull(handler);
                System.arraycopy(handlers, 0, allHandlers, 1, handlers.length);
                return RDFHandlers.dispatchAll(allHandlers);
            }

        };
    }

    /**
     * Creates an {@code RDFProcessor} that writes data of the RDF stream to the files specified.
     * This is a utility method that relies on {@link #tee(RDFHandler...)}, on
     * {@link RDFHandlers#write(WriterConfig, int, String...)} and on {@link #track(Tracker)} for
     * reporting progress information about written statements. Note that data is written only at
     * the first pass.
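     * <p>
     * For example, the following spreads statements in 1000-statement chunks over two files
     * (names illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.write(null, 1000, "part1.tql.gz", "part2.tql.gz");
     * }</pre>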
     *
     * @param config
     *            the optional {@code WriterConfig} for fine tuning the writing process; if null,
     *            a default configuration enabling pretty printing will be used
     * @param chunkSize
     *            the number of consecutive statements to be written as a single chunk to a single
     *            location (increase it to preserve locality)
     * @param locations
     *            the locations of the files to write
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor write(@Nullable final WriterConfig config, final int chunkSize,
            final String... locations) {
        if (locations.length == 0) {
            return IDENTITY;
        }
        final RDFHandler handler = RDFHandlers.write(config, chunkSize, locations);
        final RDFProcessor tracker = track(new Tracker(LOGGER, null, //
                "%d triples written (%d tr/s avg)", //
                "%d triples written (%d tr/s, %d tr/s avg)"));
        return tee(RDFHandlers.ignorePasses(tracker.wrap(handler), 1));
    }

    /**
     * Creates an {@code RDFProcessor} that uploads data of the RDF stream to the SPARQL endpoint
     * specified, using SPARQL Update INSERT DATA calls. This is a utility method that relies on
     * {@link #tee(RDFHandler...)}, on {@link RDFHandlers#update(String)} and on
     * {@link #track(Tracker)} for reporting progress information about uploaded statements. Note
     * that data is uploaded only at the first pass.
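     * <p>
     * For example (the endpoint URL is illustrative):
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.upload("http://localhost:8890/sparql");
     * }</pre>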
     *
     * @param endpointURL
     *            the URL of the SPARQL Update endpoint, not null
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor upload(final String endpointURL) {
        final RDFProcessor tracker = track(new Tracker(LOGGER, null, //
                "%d triples uploaded (%d tr/s avg)", //
                "%d triples uploaded (%d tr/s, %d tr/s avg)"));
        final RDFHandler handler = tracker.wrap(RDFHandlers.update(endpointURL));
        return tee(handler);
    }

    /**
     * Returns an {@code RDFProcessor} that tracks the number of statements flowing through it
     * using the supplied {@code Tracker} object.
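     * <p>
     * A minimal sketch, reporting counts and throughput to an SLF4J logger:
     *
     * <pre>{@code
     * RDFProcessor p = RDFProcessors.track(new Tracker(
     *         LoggerFactory.getLogger("progress"), null,
     *         "%d triples processed (%d tr/s avg)",
     *         "%d triples processed (%d tr/s, %d tr/s avg)"));
     * }</pre>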
     *
     * @param tracker
     *            the tracker object
     * @return an {@code RDFProcessor} that tracks the number of RDF statements passing through it
     */
    public static RDFProcessor track(final Tracker tracker) {

        Objects.requireNonNull(tracker);

        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                return new AbstractRDFHandlerWrapper(Objects.requireNonNull(handler)) {

                    @Override
                    public void startRDF() throws RDFHandlerException {
                        tracker.start();
                        super.startRDF();
                    }

                    @Override
                    public void handleStatement(final Statement statement)
                            throws RDFHandlerException {
                        super.handleStatement(statement);
                        tracker.increment();
                    }

                    @Override
                    public void endRDF() throws RDFHandlerException {
                        try {
                            super.endRDF();
                        } finally {
                            tracker.end();
                        }
                    }
                };
            }

        };
    }

    /**
     * Returns an {@code RDFProcessor} that applies the specified ruleset to input statements,
     * either as a whole or partitioned based on an optional {@code Mapper}.
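     * <p>
     * A minimal sketch, applying a previously obtained {@code Ruleset} to the whole stream
     * (how the ruleset is built is out of scope here):
     *
     * <pre>{@code
     * Ruleset ruleset = ...; // obtained elsewhere
     * RDFProcessor p = RDFProcessors.rules(ruleset, null, true, true);
     * }</pre>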
     *
     * @param ruleset
     *            the ruleset to apply
     * @param mapper
     *            the optional mapper for partitioning input statements, possibly null
     * @param dropBNodeTypes
     *            true to drop output {@code rdf:type} statements with a {@link BNode} object
     * @param deduplicate
     *            true to enforce that output statements do not contain duplicates (if false,
     *            duplicates might be returned if this enables the rule engine to operate faster)
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor rules(final Ruleset ruleset, @Nullable final Mapper mapper,
            final boolean dropBNodeTypes, final boolean deduplicate) {
        return new ProcessorRules(ruleset, mapper, dropBNodeTypes, deduplicate, null, false, null);
    }

    /**
     * Returns an {@code RDFProcessor} that expands the ruleset based on the supplied TBox and
     * applies the resulting ruleset to input statements, either as a whole or partitioned based
     * on an optional {@code Mapper}.
     *
     * @param ruleset
     *            the ruleset to apply
     * @param mapper
     *            the optional mapper for partitioning input statements, possibly null
     * @param dropBNodeTypes
     *            true to drop output {@code rdf:type} statements with a {@link BNode} object
     * @param deduplicate
     *            true to enforce that output statements do not contain duplicates (if false,
     *            duplicates might be returned if this enables the rule engine to operate faster)
     * @param tboxData
     *            the {@code RDFSource} of TBox data; null to disable TBox expansion
     * @param emitTBox
     *            true to emit TBox data (closed based on rules in the supplied {@code Ruleset})
     * @param tboxContext
     *            the context where to emit closed TBox data; null to emit TBox statements with
     *            their original contexts (use {@link SESAME#NIL} for emitting TBox data in the
     *            default context)
     * @return the created {@code RDFProcessor}
     */
    public static RDFProcessor rules(final Ruleset ruleset, @Nullable final Mapper mapper,
            final boolean dropBNodeTypes, final boolean deduplicate,
            @Nullable final RDFSource tboxData, final boolean emitTBox,
            @Nullable final URI tboxContext) {
        return new ProcessorRules(ruleset, mapper, dropBNodeTypes, deduplicate, tboxData,
                emitTBox, tboxContext);
    }

    private static class InjectSourceHandler extends AbstractRDFHandler {

        @Nullable
        private final RDFHandler handler;

        private final RDFSource source;

        @Nullable
        private final Tracker tracker;

        private RDFHandler sourceHandler;

        @Nullable
        private CountDownLatch latch;

        @Nullable
        private volatile Throwable exception;

        InjectSourceHandler(final RDFHandler handler, final RDFSource source,
                @Nullable final Tracker tracker) {
            this.handler = Objects.requireNonNull(handler);
            this.source = Objects.requireNonNull(source);
            this.tracker = tracker;
            this.sourceHandler = handler;
            this.latch = null;
            this.exception = null;
        }

        @Override
        public void startRDF() throws RDFHandlerException {
            this.handler.startRDF();
            this.latch = new CountDownLatch(1);
            Environment.getPool().execute(new Runnable() {

                @Override
                public void run() {
                    inject();
                }

            });
        }

        @Override
        public void handleComment(final String comment) throws RDFHandlerException {
            checkNotFailed();
            this.handler.handleComment(comment);
        }

        @Override
        public void handleNamespace(final String prefix, final String uri)
                throws RDFHandlerException {
            checkNotFailed();
            this.handler.handleNamespace(prefix, uri);
        }

        @Override
        public void handleStatement(final Statement statement) throws RDFHandlerException {
            checkNotFailed();
            this.handler.handleStatement(statement);
        }

        @Override
        public void endRDF() throws RDFHandlerException {
            try {
                if (this.latch != null) {
                    this.latch.await();
                }
            } catch (final InterruptedException ex) {
                this.exception = ex;
            }
            checkNotFailed();
            this.handler.endRDF();
        }

        @Override
        public void close() {
            IO.closeQuietly(this.handler);
            this.sourceHandler = null; // will ultimately stop the download process
        }

        private void inject() {
            try {
                InjectSourceHandler.this.source.emit(new AbstractRDFHandler() {

                    @Override
                    public void handleComment(final String comment) throws RDFHandlerException {
                        InjectSourceHandler.this.sourceHandler.handleComment(comment);
                    }

                    @Override
                    public void handleNamespace(final String prefix, final String uri)
                            throws RDFHandlerException {
                        InjectSourceHandler.this.sourceHandler.handleNamespace(prefix, uri);
                    }

                    @Override
                    public void handleStatement(final Statement statement)
                            throws RDFHandlerException {
                        InjectSourceHandler.this.sourceHandler.handleStatement(statement);
                        if (InjectSourceHandler.this.tracker != null) {
                            InjectSourceHandler.this.tracker.increment();
                        }
                    }

                }, 1);
            } catch (final Throwable ex) {
                if (this.sourceHandler != null) {
                    this.exception = ex;
                }
            } finally {
                this.latch.countDown();
            }
        }

        private void checkNotFailed() throws RDFHandlerException {
            if (this.exception != null) {
                if (this.exception instanceof RDFHandlerException) {
                    throw (RDFHandlerException) this.exception;
                } else if (this.exception instanceof RuntimeException) {
                    throw (RuntimeException) this.exception;
                } else if (this.exception instanceof Error) {
                    throw (Error) this.exception;
                }
                throw new RDFHandlerException(this.exception);
            }
        }

    }

    // Recursive-descent parser for the specification language accepted by parse(): a pipeline
    // is a sequence of '@command arg...' elements, while '{ seq , seq }op' composes sequences
    // in parallel, merging their outputs with the SetOperator denoted by 'op'.
    private static final class Parser {

        private static final int EOF = 0;

        private static final int COMMAND = 1;

        private static final int OPTION = 2;

        private static final int OPEN_BRACE = 3;

        private static final int COMMA = 4;

        private static final int CLOSE_BRACE = 5;

        private final List<String> tokens;

        private String token;

        private int type;

        private int pos;

        Parser(final List<String> tokens) {

            this.tokens = tokens;
            this.token = null;
            this.type = 0;
            this.pos = 0;

            next();
        }

        RDFProcessor parse() {
            final RDFProcessor processor = parseSequence();
            if (this.type != EOF) {
                syntaxError("<EOF>");
            }
            return processor;
        }

        private RDFProcessor parseSequence() {
            final List<RDFProcessor> processors = new ArrayList<RDFProcessor>();
            do {
                if (this.type == COMMAND) {
                    processors.add(parseCommand());
                } else if (this.type == OPEN_BRACE) {
                    processors.add(parseParallel());
                } else {
                    syntaxError("'@command' or '{'");
                }
            } while (this.type == COMMAND || this.type == OPEN_BRACE);
            return sequence(processors.toArray(new RDFProcessor[processors.size()]));
        }

        private RDFProcessor parseParallel() {
            final List<RDFProcessor> processors = new ArrayList<RDFProcessor>();
            do {
                next();
                processors.add(parseSequence());
            } while (this.type == COMMA);
            if (this.type != CLOSE_BRACE) {
                syntaxError("'}x'");
            }
            final String mod = this.token.length() == 1 ? "a" : this.token.substring(1);
            final SetOperator merging = SetOperator.valueOf(mod);
            next();
            return parallel(merging, processors.toArray(new RDFProcessor[processors.size()]));
        }

        private RDFProcessor parseCommand() {
            final String command = this.token.substring(1).toLowerCase();
            final List<String> args = new ArrayList<String>();
            while (next() == OPTION) {
                args.add(this.token);
            }
            return Environment.newPlugin(RDFProcessor.class, command,
                    args.toArray(new String[args.size()]));
        }

        private void syntaxError(final String expected) {
            throw new IllegalArgumentException("Invalid specification. Expected " + expected
                    + ", found '" + this.token + "'");
        }

        private int next() {
            if (this.pos == this.tokens.size()) {
                this.token = "<EOF>";
                this.type = EOF;
            } else {
                this.token = this.tokens.get(this.pos++);
                final char ch = this.token.charAt(0);
                if (ch == '@') {
                    this.type = COMMAND;
                } else if (ch == '}') {
                    this.type = CLOSE_BRACE;
                } else if ("{".equals(this.token)) {
                    this.type = OPEN_BRACE;
                } else if (",".equals(this.token)) {
                    this.type = COMMA;
                } else {
                    this.type = OPTION;
                }
            }
            return this.type;
        }

    }

}