package eu.fbk.rdfpro;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.openrdf.model.BNode;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.vocabulary.SESAME;
import org.openrdf.rio.ParserConfig;
import org.openrdf.rio.RDFHandler;
import org.openrdf.rio.RDFHandlerException;
import org.openrdf.rio.WriterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.fbk.rdfpro.util.Environment;
import eu.fbk.rdfpro.util.IO;
import eu.fbk.rdfpro.util.Namespaces;
import eu.fbk.rdfpro.util.Options;
import eu.fbk.rdfpro.util.Scripting;
import eu.fbk.rdfpro.util.Statements;
import eu.fbk.rdfpro.util.Tracker;
import eu.fbk.rdfpro.vocab.VOIDX;

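/**
 * Static factory methods for obtaining and composing {@link RDFProcessor}s: reading and writing
 * files, SPARQL download/upload, deduplication, smushing, RDFS inference, rule evaluation,
 * statistics extraction and more. The class also exposes the {@link #NIL} processor, which drops
 * namespaces, comments and statements, and the {@link #IDENTITY} processor, which forwards its
 * input unchanged.
 * <p>
 * The class is not instantiable. The usage snippets in the method javadocs below are minimal,
 * illustrative sketches only; file names, endpoint URLs and specification strings appearing in
 * them are hypothetical.
 */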
public final class RDFProcessors {

    private static final Logger LOGGER = LoggerFactory.getLogger(RDFProcessors.class);

    public static final RDFProcessor NIL = new RDFProcessor() {

        @Override
        public RDFHandler wrap(final RDFHandler handler) {
            return RDFHandlers.ignoreMethods(Objects.requireNonNull(handler),
                    RDFHandlers.METHOD_HANDLE_COMMENT | RDFHandlers.METHOD_HANDLE_NAMESPACE
                            | RDFHandlers.METHOD_HANDLE_STATEMENT);
        }

    };

    public static final RDFProcessor IDENTITY = new RDFProcessor() {

        @Override
        public RDFHandler wrap(final RDFHandler handler) {
            return Objects.requireNonNull(handler);
        }

    };

    private RDFProcessors() {
    }

    private static String[] tokenize(final String spec) {

        final List<String> tokens = new ArrayList<String>();

        final StringBuilder builder = new StringBuilder();
        boolean quoted = false;
        boolean escaped = false;
        int start = -1;

        for (int i = 0; i < spec.length(); ++i) {
            final char ch = spec.charAt(i);
            final boolean ws = Character.isWhitespace(ch);
            if (ch == '\\' && !escaped) {
                escaped = true;
            } else {
                if (start < 0) {
                    if (!ws) {
                        start = i;
                        quoted = ch == '\'' || ch == '\"';
                        builder.setLength(0);
                        if (!quoted) {
                            builder.append(ch);
                        }
                    }
                } else {
                    final boolean tokenChar = escaped || quoted && ch != spec.charAt(start)
                            || !quoted && !ws;
                    if (tokenChar) {
                        builder.append(ch);
                    }
                    if (!tokenChar || i == spec.length() - 1) {
                        tokens.add(builder.toString());
                        start = -1;
                        quoted = false;
                    }
                }
                escaped = false;
            }
        }

        return tokens.toArray(new String[tokens.size()]);
    }

    @Nullable
    private static URI parseURI(@Nullable final String string) {
        if (string == null) {
            return null;
        } else if (!string.contains(":")) {
            return (URI) Statements.parseValue(string + ":", Namespaces.DEFAULT);
        } else {
            return (URI) Statements.parseValue(string, Namespaces.DEFAULT);
        }
    }

    static RDFProcessor create(final String name, final String... args) {

        switch (name) {
        case "r":
        case "read": {
            final Options options = Options.parse("b!|w|+", args);
            final String[] fileSpecs = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            final boolean preserveBNodes = !options.hasOption("w");
            final URI base = parseURI(options.getOptionArg("b", String.class));
            return read(true, preserveBNodes, base == null ? null : base.stringValue(), null,
                    fileSpecs);
        }

        case "w":
        case "write": {
            final Options options = Options.parse("c!|+", args);
            final int chunkSize = options.getOptionArg("c", Integer.class, 1);
            final String[] locations = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            return write(null, chunkSize, locations);
        }

        case "t":
        case "transform": {
            final Options options = Options.parse("+", args);
            final String spec = String.join(" ", options.getPositionalArgs(String.class));
            final Transformer transformer = Scripting.isScript(spec) ? Scripting.compile(
                    Transformer.class, spec, "q", "h") : Transformer.rules(spec);
            return transform(transformer);
        }

        case "u":
        case "unique": {
            final Options options = Options.parse("m", args);
            return unique(options.hasOption("m"));
        }

        case "p":
        case "prefix": {
            final Options options = Options.parse("f!", args);
            final String source = options.getOptionArg("f", String.class);
            Namespaces namespaces = Namespaces.DEFAULT;
            if (source != null) {
                try {
                    URL url;
                    final File file = new File(source);
                    if (file.exists()) {
                        url = file.toURI().toURL();
                    } else {
                        url = RDFProcessors.class.getClassLoader().getResource(source);
                    }
                    namespaces = Namespaces.load(Collections.singleton(url), false);
                } catch (final Throwable ex) {
                    throw new IllegalArgumentException(
                            "Cannot load prefix/namespace bindings from " + source + ": "
                                    + ex.getMessage(), ex);
                }
            }
            return prefix(namespaces.prefixMap());
        }

        case "smush": {
            final Options options = Options.parse("x|*", args);
            final String[] namespaces = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            final boolean hasSmushEasterEgg = options.hasOption("x");
            if (hasSmushEasterEgg) {

                final PrintStream p = System.out;
                p.println();
                p.println(".==================================================================.");
                p.println("|| ( ) ( ) ( ) ( ) ||");
                p.println("|'================================================================'|");
                p.println("|| ||");
                p.println("|| ||");
                p.println("|| .::::. ||");
                p.println("|| .::::::::. ||");
                p.println("|| ::::::::::: ||");
                p.println("|| ':::::::::::.. ||");
                p.println("|| :::::::::::::::' ||");
                p.println("|| ':::::::::::. ||");
                p.println("|| .::::::::::::::' ||");
                p.println("|| .:::::::::::... ||");
                p.println("|| ::::::::::::::'' ||");
                p.println("|| .:::. '::::::::'':::: ||");
                p.println("|| .::::::::. ':::::' ':::: ||");
                p.println("|| .::::':::::::. ::::: '::::. ||");
                p.println("|| .:::::' ':::::::::. ::::: ':::. ||");
                p.println("|| .:::::' ':::::::::.::::: '::. ||");
                p.println("|| .::::'' ':::::::::::::: '::. ||");
                p.println("|| .::'' ':::::::::::: :::... ||");
                p.println("|| ..:::: ':::::::::' .:' '''' ||");
                p.println("|| ..''''':' ':::::.' ||");
                p.println("|| ||");
                p.println("|| ||");
                p.println("|'================================================================'|");
                p.println("|| __________________ ||");
                p.println("|| | ___ \\ _ \\ ___| ||");
                p.println("|| | |_/ / | | | |_ ||");
                p.println("|| | /| | | | _| ||");
                p.println("|| | |\\ \\| |/ /| | ___ ___ ____ ||");
                p.println("|| \\_| \\_|___/ \\_| / _ \\/ _ \\/ __ \\ ||");
                p.println("|| / ___/ , _/ /_/ / ||");
                p.println("|| /_/ /_/|_|\\____/ ||");
                p.println("|| ||");
                p.println("'=============================================================LGB=='");
                p.println();
            }

            for (int i = 0; i < namespaces.length; ++i) {
                namespaces[i] = parseURI(namespaces[i]).stringValue();
            }
            return smush(namespaces);
        }

        case "tbox": {
            Options.parse("", args);
            return tbox();
        }

        case "rdfs": {
            final Options options = Options.parse("d|e!|C|c!|b!|t|w|+", args);
            final URI base = parseURI(options.getOptionArg("b", String.class));
            final boolean preserveBNodes = !options.hasOption("w");
            final String[] fileSpecs = options.getPositionalArgs(String.class).toArray(
                    new String[0]);
            final RDFSource tbox = RDFProcessors.track(
                    new Tracker(LOGGER, null, "%d TBox triples read (%d tr/s avg)",
                            "%d TBox triples read (%d tr/s, %d tr/s avg)")).wrap(
                    RDFSources.read(true, preserveBNodes,
                            base == null ? null : base.stringValue(), null, fileSpecs));
            final boolean decomposeOWLAxioms = options.hasOption("d");
            final boolean dropBNodeTypes = options.hasOption("t");
            String[] excludedRules = new String[0];
            if (options.hasOption("e")) {
                excludedRules = options.getOptionArg("e", String.class).split(",");
            }
            URI context = null;
            if (options.hasOption("C")) {
                context = SESAME.NIL;
            } else if (options.hasOption("c")) {
                context = parseURI(options.getOptionArg("c", String.class));
            }
            return rdfs(tbox, context, decomposeOWLAxioms, dropBNodeTypes, excludedRules);
        }

        case "stats": {
            final Options options = Options.parse("n!|p!|c!|t!|o", args);
            final URI namespace = parseURI(options.getOptionArg("n", String.class));
            final URI property = parseURI(options.getOptionArg("p", String.class));
            final URI context = parseURI(options.getOptionArg("c", String.class));
            final Long threshold = options.getOptionArg("t", Long.class);
            final boolean processCooccurrences = options.hasOption("o");
            return stats(namespace == null ? null : namespace.stringValue(), property, context,
                    threshold, processCooccurrences);
        }

        case "download": {
            final Options options = Options.parse("w|q!|f!|!", args);
            final boolean preserveBNodes = !options.hasOption("w");
            final String endpointURL = parseURI(options.getPositionalArg(0, String.class))
                    .stringValue();
            String query = options.getOptionArg("q", String.class);
            if (query == null) {
                final String source = options.getOptionArg("f", String.class);
                try {
                    final File file = new File(source);
                    URL url;
                    if (file.exists()) {
                        url = file.toURI().toURL();
                    } else {
                        url = RDFProcessors.class.getClassLoader().getResource(source);
                    }
                    final BufferedReader reader = new BufferedReader(new InputStreamReader(
                            url.openStream()));
                    try {
                        final StringBuilder builder = new StringBuilder();
                        String line;
                        while ((line = reader.readLine()) != null) {
                            // preserve line breaks: otherwise a '#' comment line in the query
                            // file would swallow the rest of the query
                            builder.append(line).append('\n');
                        }
                        query = builder.toString();
                    } finally {
                        IO.closeQuietly(reader);
                    }
                } catch (final Throwable ex) {
                    throw new IllegalArgumentException("Cannot load SPARQL query from " + source
                            + ": " + ex.getMessage(), ex);
                }
            }
            return download(true, preserveBNodes, endpointURL, query);
        }

        case "upload": {
            final Options options = Options.parse("!", args);
            final String endpointURL = parseURI(options.getPositionalArg(0, String.class))
                    .stringValue();
            return upload(endpointURL);
        }

        case "mapreduce": {
            final Options options = Options.parse("b!|r!|e!|a!|u|+", args);
            final boolean deduplicate = options.hasOption("u");
            final String bypassExp = options.getOptionArg("b", String.class);
            final String existsExp = options.getOptionArg("e", String.class);
            final String forallExp = options.getOptionArg("a", String.class);
            final String reducerExp = options.getOptionArg("r", String.class);
            final Predicate<Statement> bypassPred = Statements.statementMatcher(bypassExp);
            final Predicate<Statement> existsPred = Statements.statementMatcher(existsExp);
            final Predicate<Statement> forallPred = Statements.statementMatcher(forallExp);
            Reducer reducer = reducerExp == null ? Reducer.IDENTITY
                    : Scripting.compile(Reducer.class, reducerExp, "k", "p", "h");
            reducer = Reducer.filter(reducer, existsPred, forallPred);
            final List<Mapper> mappers = new ArrayList<>();
            for (final String mapperExp : options.getPositionalArgs(String.class)) {
                mappers.add(Mapper.parse(mapperExp));
            }
            Mapper mapper = Mapper.concat(mappers.toArray(new Mapper[mappers.size()]));
            if (bypassPred != null) {
                mapper = Mapper.bypass(mapper, bypassPred);
            }
            return mapReduce(mapper, reducer, deduplicate);
        }

        default:
            throw new Error("Unsupported " + name + " processor, check properties file");
        }
    }

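    /**
     * Parses an {@code RDFProcessor} out of the supplied specification string(s). If
     * {@code tokenize} is true, each argument is first split into whitespace-separated tokens,
     * honouring quotes and backslash escapes. Minimal sketch, with a hypothetical pipeline
     * specification and file names:
     *
     * <pre>
     * RDFProcessor pipeline = RDFProcessors.parse(true,
     *         "@read data.ttl.gz @unique -m @write output.nt.gz");
     * </pre>
     */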
    public static RDFProcessor parse(final boolean tokenize, final String... args) {
        List<String> list;
        if (tokenize) {
            list = new ArrayList<>();
            for (final String arg : args) {
                list.addAll(Arrays.asList(tokenize(arg)));
            }
        } else {
            list = Arrays.asList(args);
        }
        return new Parser(list).parse();
    }

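    /**
     * Returns an {@code RDFProcessor} that feeds the input stream to all the supplied processors
     * in parallel and merges their outputs with the given {@link SetOperator}. Minimal sketch
     * (the {@code "a"} operator code is the default used by the specification parser in this
     * class; {@code procA} and {@code procB} are hypothetical processors):
     *
     * <pre>
     * RDFProcessor merged = RDFProcessors.parallel(SetOperator.valueOf("a"), procA, procB);
     * </pre>
     */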
    public static RDFProcessor parallel(final SetOperator operator,
            final RDFProcessor... processors) {

        Objects.requireNonNull(operator);

        if (processors.length == 0) {
            throw new IllegalArgumentException("At least one processor should be supplied "
                    + "in a parallel composition");
        }

        int count = 0;
        for (final RDFProcessor processor : processors) {
            count = Math.max(count, processor.getExtraPasses());
        }
        final int extraPasses = count;

        return new RDFProcessor() {

            @Override
            public int getExtraPasses() {
                return extraPasses;
            }

            @Override
            public RDFHandler wrap(final RDFHandler handler) {

                Objects.requireNonNull(handler);

                final int numProcessors = processors.length;

                final int[] extraPasses = new int[numProcessors];
                final RDFHandler[] handlers = RDFHandlers
                        .collect(handler, numProcessors, operator);

                for (int i = 0; i < numProcessors; ++i) {
                    final RDFProcessor processor = processors[i];
                    extraPasses[i] = processor.getExtraPasses();
                    handlers[i] = processor.wrap(handlers[i]);
                }

                return RDFHandlers.dispatchAll(handlers, extraPasses);
            }

        };
    }

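    /**
     * Returns an {@code RDFProcessor} that applies the supplied processors one after the other,
     * the output of each becoming the input of the next. Minimal sketch, with hypothetical file
     * names:
     *
     * <pre>
     * RDFProcessor pipeline = RDFProcessors.sequence(
     *         RDFProcessors.read(true, true, null, null, "input.ttl.gz"),
     *         RDFProcessors.unique(false),
     *         RDFProcessors.write(null, 1, "output.tql.gz"));
     * </pre>
     */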
    public static RDFProcessor sequence(final RDFProcessor... processors) {

        if (processors.length == 0) {
            throw new IllegalArgumentException("At least one processor should be supplied "
                    + "in a sequence composition");
        }

        if (processors.length == 1) {
            return Objects.requireNonNull(processors[0]);
        }

        int count = 0;
        for (final RDFProcessor processor : processors) {
            count += processor.getExtraPasses();
        }
        final int extraPasses = count;

        return new RDFProcessor() {

            @Override
            public int getExtraPasses() {
                return extraPasses;
            }

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                RDFHandler result = Objects.requireNonNull(handler);
                for (int i = processors.length - 1; i >= 0; --i) {
                    result = processors[i].wrap(result);
                }
                return result;
            }
        };
    }

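    /**
     * Returns an {@code RDFProcessor} that groups statements by the key extracted by the
     * {@link Mapper}, applies the {@link Reducer} to each group and emits the reduced
     * statements, optionally deduplicating the output. Minimal sketch (the {@code "s"} mapper
     * expression, grouping by subject, is assumed here to be accepted by {@code Mapper.parse}):
     *
     * <pre>
     * RDFProcessor grouped = RDFProcessors.mapReduce(Mapper.parse("s"), Reducer.IDENTITY, true);
     * </pre>
     */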
    public static RDFProcessor mapReduce(final Mapper mapper, final Reducer reducer,
            final boolean deduplicate) {
        return new ProcessorMapReduce(mapper, reducer, deduplicate);
    }

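    /**
     * Returns an {@code RDFProcessor} intended to add prefix/namespace bindings to the stream,
     * based on the supplied namespace-to-prefix map (nullable). Minimal sketch, mirroring what
     * the {@code prefix} command in {@link #create} does:
     *
     * <pre>
     * RDFProcessor prefixer = RDFProcessors.prefix(Namespaces.DEFAULT.prefixMap());
     * </pre>
     */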
    public static RDFProcessor prefix(@Nullable final Map<String, String> nsToPrefixMap) {
        return new ProcessorPrefix(nsToPrefixMap);
    }

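    /**
     * Returns an {@code RDFProcessor} that computes the RDFS closure of the stream, using the
     * TBox statements of the supplied {@link RDFSource} (emitted in the given context, if any).
     * OWL axioms may be decomposed beforehand, {@code rdf:type} statements on blank nodes may be
     * dropped, and individual inference rules may be excluded by name. Minimal sketch, with a
     * hypothetical TBox file:
     *
     * <pre>
     * RDFSource tbox = RDFSources.read(true, true, null, null, "tbox.ttl");
     * RDFProcessor closure = RDFProcessors.rdfs(tbox, SESAME.NIL, false, false);
     * </pre>
     */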
    public static RDFProcessor rdfs(final RDFSource tbox, @Nullable final Resource tboxContext,
            final boolean decomposeOWLAxioms, final boolean dropBNodeTypes,
            final String... excludedRules) {
        return new ProcessorRDFS(tbox, tboxContext, decomposeOWLAxioms, dropBNodeTypes,
                excludedRules);
    }

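    /**
     * Returns an {@code RDFProcessor} that performs {@code owl:sameAs} smushing, replacing the
     * URIs of each {@code owl:sameAs} equivalence class with a single canonical URI chosen
     * according to the supplied ranked namespace list. Minimal sketch, with illustrative
     * namespaces:
     *
     * <pre>
     * RDFProcessor smusher = RDFProcessors.smush("http://dbpedia.org/", "http://example.org/");
     * </pre>
     */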
    public static RDFProcessor smush(final String... rankedNamespaces) {
        return new ProcessorSmush(rankedNamespaces);
    }

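    /**
     * Returns an {@code RDFProcessor} that emits VOID-like statistics about the processed data
     * (classes, properties and their usage) in the given output namespace, optionally restricted
     * by a source property, a source context and an instance threshold; co-occurrence statistics
     * can also be computed. A minimal sketch with all optional parameters left unset:
     *
     * <pre>
     * RDFProcessor statistics = RDFProcessors.stats(null, null, null, null, false);
     * </pre>
     */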
    public static RDFProcessor stats(@Nullable final String outputNamespace,
            @Nullable final URI sourceProperty, @Nullable final URI sourceContext,
            @Nullable final Long threshold, final boolean processCooccurrences) {
        return new ProcessorStats(outputNamespace, sourceProperty, sourceContext, threshold,
                processCooccurrences);
    }

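    /**
     * Returns an {@code RDFProcessor} that extracts the TBox (schema) portion of the input
     * stream, dropping the remaining ABox data.
     */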
    public static RDFProcessor tbox() {
        return ProcessorTBox.INSTANCE;
    }

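    /**
     * Returns an {@code RDFProcessor} that delegates each input statement to the supplied
     * {@link Transformer}, which may emit zero, one or more (possibly rewritten) statements to
     * the downstream handler it receives.
     */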
    public static RDFProcessor transform(final Transformer transformer) {
        Objects.requireNonNull(transformer);
        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                return new AbstractRDFHandlerWrapper(Objects.requireNonNull(handler)) {

                    @Override
                    public void handleStatement(final Statement statement)
                            throws RDFHandlerException {
                        transformer.transform(statement, this.handler);
                    }

                };
            }

        };
    }

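    /**
     * Returns an {@code RDFProcessor} that discards duplicate statements. If
     * {@code mergeContexts} is true, statements differing only in their context are presumably
     * merged into a single statement as well, as suggested by the {@code -m} option of the
     * {@code unique} command in {@link #create}.
     */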
    public static RDFProcessor unique(final boolean mergeContexts) {
        return new ProcessorUnique(mergeContexts);
    }

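    /**
     * Returns an {@code RDFProcessor} that injects the statements of the supplied
     * {@link RDFSource} into the stream, emitting them together with the statements received
     * from upstream. Minimal sketch, with a hypothetical file:
     *
     * <pre>
     * RDFProcessor injector = RDFProcessors.inject(
     *         RDFSources.read(true, true, null, null, "extra-data.ttl"));
     * </pre>
     */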
    public static RDFProcessor inject(final RDFSource source) {
        Objects.requireNonNull(source);
        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                return new InjectSourceHandler(Objects.requireNonNull(handler), source, null);
            }

        };
    }

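    /**
     * Returns an {@code RDFProcessor} that reads RDF from the files matching the supplied
     * location specs and injects it into the stream, tracking progress in the log. Reading may
     * be parallelized, blank nodes may be preserved or rewritten, and a base URI and a custom
     * {@link ParserConfig} may be supplied. Minimal sketch, with hypothetical files:
     *
     * <pre>
     * RDFProcessor reader = RDFProcessors.read(true, true, null, null, "a.ttl.gz", "b.nt");
     * </pre>
     */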
    public static RDFProcessor read(final boolean parallelize, final boolean preserveBNodes,
            @Nullable final String baseURI, @Nullable final ParserConfig config,
            final String... locations) {
        final RDFProcessor tracker = track(new Tracker(LOGGER, null,
                "%d triples read (%d tr/s avg)",
                "%d triples read (%d tr/s, %d tr/s avg)"));
        final RDFSource source = RDFSources.read(parallelize, preserveBNodes, baseURI, config,
                locations);
        return inject(tracker.wrap(source));
    }

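    /**
     * Returns an {@code RDFProcessor} that downloads RDF data from a SPARQL endpoint using the
     * supplied query and injects it into the stream, tracking progress in the log. Minimal
     * sketch, with a hypothetical endpoint URL and assuming a CONSTRUCT query is accepted:
     *
     * <pre>
     * RDFProcessor downloader = RDFProcessors.download(true, true,
     *         "http://localhost:8080/sparql", "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }");
     * </pre>
     */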
    public static RDFProcessor download(final boolean parallelize, final boolean preserveBNodes,
            final String endpointURL, final String query) {
        final RDFProcessor tracker = track(new Tracker(LOGGER, null,
                "%d triples queried (%d tr/s avg)",
                "%d triples queried (%d tr/s, %d tr/s avg)"));
        final RDFSource source = RDFSources.query(parallelize, preserveBNodes, endpointURL, query);
        return inject(tracker.wrap(source));
    }

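    /**
     * Returns an {@code RDFProcessor} that duplicates the stream to the supplied
     * {@link RDFHandler}s, in addition to forwarding it unchanged downstream. With no handlers,
     * {@link #IDENTITY} is returned. Minimal sketch, with a hypothetical output file:
     *
     * <pre>
     * RDFProcessor copier = RDFProcessors.tee(RDFHandlers.write(null, 1, "copy.nt.gz"));
     * </pre>
     */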
    public static RDFProcessor tee(final RDFHandler... handlers) {
        if (handlers.length == 0) {
            return IDENTITY;
        }
        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                final RDFHandler[] allHandlers = new RDFHandler[handlers.length + 1];
                allHandlers[0] = Objects.requireNonNull(handler);
                System.arraycopy(handlers, 0, allHandlers, 1, handlers.length);
                return RDFHandlers.dispatchAll(allHandlers);
            }

        };
    }

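    /**
     * Returns an {@code RDFProcessor} that writes the stream to the files identified by the
     * supplied locations, while also forwarding it unchanged downstream; the {@code chunkSize}
     * parameter controls how consecutive statements are distributed when output is split across
     * multiple locations. With no locations, {@link #IDENTITY} is returned. Minimal sketch, with
     * a hypothetical output file:
     *
     * <pre>
     * RDFProcessor writer = RDFProcessors.write(null, 1, "output.tql.gz");
     * </pre>
     */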
    public static RDFProcessor write(@Nullable final WriterConfig config, final int chunkSize,
            final String... locations) {
        if (locations.length == 0) {
            return IDENTITY;
        }
        final RDFHandler handler = RDFHandlers.write(config, chunkSize, locations);
        final RDFProcessor tracker = track(new Tracker(LOGGER, null,
                "%d triples written (%d tr/s avg)",
                "%d triples written (%d tr/s, %d tr/s avg)"));
        return tee(RDFHandlers.ignorePasses(tracker.wrap(handler), 1));
    }

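    /**
     * Returns an {@code RDFProcessor} that uploads the stream to the SPARQL Update endpoint at
     * the supplied URL, while also forwarding it unchanged downstream. Minimal sketch, with a
     * hypothetical endpoint URL:
     *
     * <pre>
     * RDFProcessor uploader = RDFProcessors.upload("http://localhost:8080/update");
     * </pre>
     */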
    public static RDFProcessor upload(final String endpointURL) {
        final RDFProcessor tracker = track(new Tracker(LOGGER, null,
                "%d triples uploaded (%d tr/s avg)",
                "%d triples uploaded (%d tr/s, %d tr/s avg)"));
        final RDFHandler handler = tracker.wrap(RDFHandlers.update(endpointURL));
        return tee(handler);
    }

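    /**
     * Returns an {@code RDFProcessor} that reports the number of statements flowing through it
     * to the supplied {@link Tracker}, leaving the stream untouched. Minimal sketch, mirroring
     * the trackers created elsewhere in this class (the logger name is hypothetical):
     *
     * <pre>
     * RDFProcessor progress = RDFProcessors.track(new Tracker(
     *         LoggerFactory.getLogger("eu.fbk.rdfpro.example"), null,
     *         "%d triples processed (%d tr/s avg)",
     *         "%d triples processed (%d tr/s, %d tr/s avg)"));
     * </pre>
     */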
    public static RDFProcessor track(final Tracker tracker) {

        Objects.requireNonNull(tracker);

        return new RDFProcessor() {

            @Override
            public RDFHandler wrap(final RDFHandler handler) {
                return new AbstractRDFHandlerWrapper(Objects.requireNonNull(handler)) {

                    @Override
                    public void startRDF() throws RDFHandlerException {
                        tracker.start();
                        super.startRDF();
                    }

                    @Override
                    public void handleStatement(final Statement statement)
                            throws RDFHandlerException {
                        super.handleStatement(statement);
                        tracker.increment();
                    }

                    @Override
                    public void endRDF() throws RDFHandlerException {
                        try {
                            super.endRDF();
                        } finally {
                            tracker.end();
                        }
                    }
                };
            }

        };
    }

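    /**
     * Returns an {@code RDFProcessor} that evaluates the supplied {@link Ruleset} over the
     * stream; the optional {@link Mapper} and the {@code dropBNodeTypes} and {@code deduplicate}
     * flags control how evaluation is performed and how its output is cleaned up. This overload
     * delegates to the extended variant below without separately supplied TBox data.
     */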
    public static RDFProcessor rules(final Ruleset ruleset, @Nullable final Mapper mapper,
            final boolean dropBNodeTypes, final boolean deduplicate) {
        return new ProcessorRules(ruleset, mapper, dropBNodeTypes, deduplicate, null, false, null);
    }

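    /**
     * Extended variant of {@link #rules(Ruleset, Mapper, boolean, boolean)} that additionally
     * accepts TBox data to be used during rule evaluation, a flag controlling whether that TBox
     * is also emitted in the output, and the context to emit it in.
     */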
    public static RDFProcessor rules(final Ruleset ruleset, @Nullable final Mapper mapper,
            final boolean dropBNodeTypes, final boolean deduplicate,
            @Nullable final RDFSource tboxData, final boolean emitTBox,
            @Nullable final URI tboxContext) {
        return new ProcessorRules(ruleset, mapper, dropBNodeTypes, deduplicate, tboxData,
                emitTBox, tboxContext);
    }

    private static class InjectSourceHandler extends AbstractRDFHandler {

        @Nullable
        private final RDFHandler handler;

        private final RDFSource source;

        @Nullable
        private final Tracker tracker;

        private RDFHandler sourceHandler;

        @Nullable
        private CountDownLatch latch;

        @Nullable
        private volatile Throwable exception;

        InjectSourceHandler(final RDFHandler handler, final RDFSource source,
                @Nullable final Tracker tracker) {
            this.handler = Objects.requireNonNull(handler);
            this.source = Objects.requireNonNull(source);
            this.tracker = tracker;
            this.sourceHandler = handler;
            this.latch = null;
            this.exception = null;
        }

        @Override
        public void startRDF() throws RDFHandlerException {
            this.handler.startRDF();
            this.latch = new CountDownLatch(1);
            Environment.getPool().execute(new Runnable() {

                @Override
                public void run() {
                    inject();
                }

            });
        }

        @Override
        public void handleComment(final String comment) throws RDFHandlerException {
            checkNotFailed();
            this.handler.handleComment(comment);
        }

        @Override
        public void handleNamespace(final String prefix, final String uri)
                throws RDFHandlerException {
            checkNotFailed();
            this.handler.handleNamespace(prefix, uri);
        }

        @Override
        public void handleStatement(final Statement statement) throws RDFHandlerException {
            checkNotFailed();
            this.handler.handleStatement(statement);
        }

        @Override
        public void endRDF() throws RDFHandlerException {
            try {
                if (this.latch != null) {
                    this.latch.await();
                }
            } catch (final InterruptedException ex) {
                this.exception = ex;
            }
            checkNotFailed();
            this.handler.endRDF();
        }

        @Override
        public void close() {
            IO.closeQuietly(this.handler);
            this.sourceHandler = null;
        }

        private void inject() {
            try {
                InjectSourceHandler.this.source.emit(new AbstractRDFHandler() {

                    @Override
                    public void handleComment(final String comment) throws RDFHandlerException {
                        InjectSourceHandler.this.sourceHandler.handleComment(comment);
                    }

                    @Override
                    public void handleNamespace(final String prefix, final String uri)
                            throws RDFHandlerException {
                        InjectSourceHandler.this.sourceHandler.handleNamespace(prefix, uri);
                    }

                    @Override
                    public void handleStatement(final Statement statement)
                            throws RDFHandlerException {
                        InjectSourceHandler.this.sourceHandler.handleStatement(statement);
                        if (InjectSourceHandler.this.tracker != null) {
                            InjectSourceHandler.this.tracker.increment();
                        }
                    }

                }, 1);
            } catch (final Throwable ex) {
                if (this.sourceHandler != null) {
                    this.exception = ex;
                }
            } finally {
                this.latch.countDown();
            }
        }

        private void checkNotFailed() throws RDFHandlerException {
            if (this.exception != null) {
                if (this.exception instanceof RDFHandlerException) {
                    throw (RDFHandlerException) this.exception;
                } else if (this.exception instanceof RuntimeException) {
                    throw (RuntimeException) this.exception;
                } else if (this.exception instanceof Error) {
                    throw (Error) this.exception;
                }
                throw new RDFHandlerException(this.exception);
            }
        }

    }

    private static final class Parser {

        private static final int EOF = 0;

        private static final int COMMAND = 1;

        private static final int OPTION = 2;

        private static final int OPEN_BRACE = 3;

        private static final int COMMA = 4;

        private static final int CLOSE_BRACE = 5;

        private final List<String> tokens;

        private String token;

        private int type;

        private int pos;

        Parser(final List<String> tokens) {

            this.tokens = tokens;
            this.token = null;
            this.type = 0;
            this.pos = 0;

            next();
        }

        RDFProcessor parse() {
            final RDFProcessor processor = parseSequence();
            if (this.type != EOF) {
                syntaxError("<EOF>");
            }
            return processor;
        }

        private RDFProcessor parseSequence() {
            final List<RDFProcessor> processors = new ArrayList<RDFProcessor>();
            do {
                if (this.type == COMMAND) {
                    processors.add(parseCommand());
                } else if (this.type == OPEN_BRACE) {
                    processors.add(parseParallel());
                } else {
                    syntaxError("'@command' or '{'");
                }
            } while (this.type == COMMAND || this.type == OPEN_BRACE);
            return sequence(processors.toArray(new RDFProcessor[processors.size()]));
        }

        private RDFProcessor parseParallel() {
            final List<RDFProcessor> processors = new ArrayList<RDFProcessor>();
            do {
                next();
                processors.add(parseSequence());
            } while (this.type == COMMA);
            if (this.type != CLOSE_BRACE) {
                syntaxError("'}x'");
            }
            final String mod = this.token.length() == 1 ? "a" : this.token.substring(1);
            final SetOperator merging = SetOperator.valueOf(mod);
            next();
            return parallel(merging, processors.toArray(new RDFProcessor[processors.size()]));
        }

        private RDFProcessor parseCommand() {
            final String command = this.token.substring(1).toLowerCase();
            final List<String> args = new ArrayList<String>();
            while (next() == OPTION) {
                args.add(this.token);
            }
            return Environment.newPlugin(RDFProcessor.class, command,
                    args.toArray(new String[args.size()]));
        }

        private void syntaxError(final String expected) {
            throw new IllegalArgumentException("Invalid specification. Expected " + expected
                    + ", found '" + this.token + "'");
        }

        private int next() {
            if (this.pos == this.tokens.size()) {
                this.token = "<EOF>";
                this.type = EOF;
            } else {
                this.token = this.tokens.get(this.pos++);
                final char ch = this.token.charAt(0);
                if (ch == '@') {
                    this.type = COMMAND;
                } else if (ch == '}') {
                    this.type = CLOSE_BRACE;
                } else if ("{".equals(this.token)) {
                    this.type = OPEN_BRACE;
                } else if (",".equals(this.token)) {
                    this.type = COMMA;
                } else {
                    this.type = OPTION;
                }
            }
            return this.type;
        }

    }

}