1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.io.Closeable;
17 import java.io.DataOutputStream;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.io.OutputStreamWriter;
21 import java.io.Writer;
22 import java.net.HttpURLConnection;
23 import java.net.URL;
24 import java.net.URLEncoder;
25 import java.nio.charset.Charset;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Objects;
36 import java.util.Set;
37 import java.util.concurrent.ArrayBlockingQueue;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicLong;
42 import java.util.concurrent.atomic.AtomicReference;
43 import java.util.function.Consumer;
44
45 import javax.annotation.Nullable;
46
47 import com.google.common.base.Throwables;
48 import com.google.common.collect.Lists;
49
50 import org.openrdf.model.Model;
51 import org.openrdf.model.Namespace;
52 import org.openrdf.model.Resource;
53 import org.openrdf.model.Statement;
54 import org.openrdf.model.URI;
55 import org.openrdf.model.Value;
56 import org.openrdf.model.impl.NamespaceImpl;
57 import org.openrdf.rio.RDFFormat;
58 import org.openrdf.rio.RDFHandler;
59 import org.openrdf.rio.RDFHandlerException;
60 import org.openrdf.rio.RDFWriter;
61 import org.openrdf.rio.Rio;
62 import org.openrdf.rio.WriterConfig;
63 import org.openrdf.rio.helpers.BasicWriterSettings;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 import eu.fbk.rdfpro.util.Environment;
68 import eu.fbk.rdfpro.util.IO;
69 import eu.fbk.rdfpro.util.QuadModel;
70 import eu.fbk.rdfpro.util.Sorter;
71 import eu.fbk.rdfpro.util.Statements;
72
73
74
75
76 public final class RDFHandlers {
77
78
/** Bit flag selecting {@code startRDF()} in an {@code ignoreMethods} mask. */
public static final int METHOD_START_RDF = 1 << 0;

/** Bit flag selecting {@code handleComment(String)} in an {@code ignoreMethods} mask. */
public static final int METHOD_HANDLE_COMMENT = 1 << 1;

/** Bit flag selecting {@code handleNamespace(String, String)} in an {@code ignoreMethods} mask. */
public static final int METHOD_HANDLE_NAMESPACE = 1 << 2;

/** Bit flag selecting {@code handleStatement(Statement)} in an {@code ignoreMethods} mask. */
public static final int METHOD_HANDLE_STATEMENT = 1 << 3;

/** Bit flag selecting {@code endRDF()} in an {@code ignoreMethods} mask. */
public static final int METHOD_END_RDF = 1 << 4;

/** Bit flag selecting {@code close()} in an {@code ignoreMethods} mask. */
public static final int METHOD_CLOSE = 1 << 5;

/**
 * Handler with the default {@link AbstractRDFHandler} behaviour (presumably a no-op); returned
 * by the factory methods of this class when there is nothing to dispatch to.
 */
public static final AbstractRDFHandler NIL = new AbstractRDFHandler() {};

private static final Logger LOGGER = LoggerFactory.getLogger(RDFHandlers.class);

/** Writer configuration used by {@code write(...)} when the caller passes {@code null}. */
private static final WriterConfig DEFAULT_WRITER_CONFIG = createDefaultWriterConfig();

/**
 * Builds the default writer configuration: pretty printing enabled, and language-tagged and
 * {@code xsd:string} literals written in their plain/short form.
 */
private static WriterConfig createDefaultWriterConfig() {
    final WriterConfig defaults = new WriterConfig();
    defaults.set(BasicWriterSettings.PRETTY_PRINT, true);
    defaults.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
    defaults.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
    return defaults;
}

/** Static utility class: instantiation is not allowed. */
private RDFHandlers() {
}
113
114
115
116
117
118
119
120
121
122
123
124
125 public static RDFHandler wrap(final Collection<? super Statement> statements) {
126 Objects.requireNonNull(statements);
127 return new WrapHandler(statements, statements instanceof Model
128 || statements instanceof QuadModel ? statements : null);
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142 public static RDFHandler wrap(final Collection<? super Statement> statements,
143 @Nullable final Collection<? super Namespace> namespaces) {
144 Objects.requireNonNull(statements);
145 return new WrapHandler(statements, namespaces);
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160 public static RDFHandler wrap(final Collection<? super Statement> statements,
161 @Nullable final Map<? super String, ? super String> namespaces) {
162 Objects.requireNonNull(statements);
163 return new WrapHandler(statements, namespaces);
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186 public static RDFHandler write(@Nullable final WriterConfig config, final int chunkSize,
187 final String... locations) {
188 final WriterConfig actualConfig = config != null ? config : DEFAULT_WRITER_CONFIG;
189 final RDFHandler[] handlers = new RDFHandler[locations.length];
190 for (int i = 0; i < locations.length; ++i) {
191 final String location = locations[i];
192 final RDFFormat format = Statements.toRDFFormat(location);
193 final boolean parallel = Statements.isRDFFormatLineBased(format);
194 handlers[i] = parallel ? new ParallelWriteHandler(actualConfig, location)
195 : new SequentialWriteHandler(actualConfig, location);
196 }
197 return handlers.length == 0 ? NIL : handlers.length == 1 ? handlers[0]
198 : dispatchRoundRobin(chunkSize, handlers);
199 }
200
201
202
203
204
205
206
207
208
209
210 public static RDFHandler update(final String endpointURL) {
211 return new UpdateHandler(endpointURL, null);
212 }
213
214
215
216
217
218
219
220
221
222
223
224
225
226 public static RDFHandler ignoreMethods(final RDFHandler handler, final int ignoredMethods) {
227
228 Objects.requireNonNull(handler);
229
230 if (ignoredMethods == 0 || handler == NIL) {
231 return handler;
232
233 } else if ((ignoredMethods & METHOD_HANDLE_STATEMENT) == 0) {
234 return new IgnoreMethodHandler(handler, ignoredMethods) {
235
236 @Override
237 public void handleStatement(final Statement statement) throws RDFHandlerException {
238 super.handleStatement(statement);
239 }
240
241 };
242
243 } else {
244 return new IgnoreMethodHandler(handler, ignoredMethods) {
245
246 @Override
247 public void handleStatement(final Statement statement) throws RDFHandlerException {
248
249 }
250
251 };
252 }
253 }
254
255
256
257
258
259
260
261
262
263
264
265 public static RDFHandler ignorePasses(final RDFHandler handler, final int maxPasses) {
266 if (handler == NIL) {
267 return handler;
268 }
269 return new AbstractRDFHandlerWrapper(handler) {
270
271 private RDFHandler passHandler = null;
272
273 private int pass = 0;
274
275 @Override
276 public void startRDF() throws RDFHandlerException {
277 this.passHandler = this.pass < maxPasses ? this.handler : NIL;
278 this.passHandler.startRDF();
279 }
280
281 @Override
282 public void handleComment(final String comment) throws RDFHandlerException {
283 this.passHandler.handleComment(comment);
284 }
285
286 @Override
287 public void handleNamespace(final String prefix, final String uri)
288 throws RDFHandlerException {
289 this.passHandler.handleNamespace(prefix, uri);
290 }
291
292 @Override
293 public void handleStatement(final Statement statement) throws RDFHandlerException {
294 this.passHandler.handleStatement(statement);
295 }
296
297 @Override
298 public void endRDF() throws RDFHandlerException {
299 try {
300 this.passHandler.endRDF();
301 } finally {
302 ++this.pass;
303 }
304 }
305
306 };
307 }
308
309
310
311
312
313
314
315
316
317 public static RDFHandler dispatchAll(final RDFHandler... handlers) {
318 return dispatchAll(handlers, new int[handlers.length]);
319 }
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338 public static RDFHandler dispatchAll(final RDFHandler[] handlers, final int[] extraPasses) {
339 Objects.requireNonNull(extraPasses);
340 if (Arrays.asList(handlers).contains(null)) {
341 throw new NullPointerException();
342 }
343 if (handlers.length == 0) {
344 return NIL;
345 } else if (handlers.length == 1) {
346 return handlers[0];
347 } else if (handlers.length == 2 && extraPasses[0] == extraPasses[1]) {
348 return new DispatchTwoHandler(handlers[0], handlers[1]);
349 } else {
350 return new DispatchAllHandler(handlers, extraPasses);
351 }
352 }
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371 public static RDFHandler dispatchRoundRobin(final int chunkSize, final RDFHandler... handlers) {
372 if (Arrays.asList(handlers).contains(null)) {
373 throw new NullPointerException();
374 }
375 if (handlers.length == 0) {
376 return NIL;
377 } else if (handlers.length == 1) {
378 return handlers[0];
379 } else {
380 return new DispatchRoundRobinHandler(chunkSize, handlers);
381 }
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401 public static RDFHandler[] collect(final RDFHandler handler, final int count,
402 final SetOperator operation) {
403
404 Objects.requireNonNull(handler);
405 Objects.requireNonNull(operation);
406
407 if (count < 1) {
408 throw new IllegalArgumentException();
409 }
410
411 final RDFHandler[] result = new RDFHandler[count];
412
413 if (count == 1 && (operation == SetOperator.SUM_MULTISET
414 || operation == SetOperator.UNION_MULTISET
415 || operation == SetOperator.INTERSECTION_MULTISET
416 || operation == SetOperator.DIFFERENCE_MULTISET)) {
417 result[0] = handler;
418 } else if (count == 1 && operation == SetOperator.SYMMETRIC_DIFFERENCE
419 || operation == SetOperator.SYMMETRIC_DIFFERENCE_MULTISET) {
420 result[0] = NIL;
421 } else if (operation == SetOperator.SUM_MULTISET) {
422 Arrays.fill(result, new CollectMergerHandler(handler, count));
423 } else if (operation == SetOperator.UNION) {
424 Arrays.fill(result, new CollectSorterHandler(handler, count, true, true));
425 } else {
426 final CollectSetOperatorHandler sink;
427 sink = new CollectSetOperatorHandler(handler, count, operation);
428 for (int i = 0; i < count; ++i) {
429 result[i] = new CollectLabellerHandler(sink, i);
430 }
431 }
432
433 return result;
434 }
435
436
437
438
439
440
441
442
443
444
445 public static RDFHandler decouple(final RDFHandler handler) {
446 if (handler == NIL || handler instanceof DecoupleHandler) {
447 return handler;
448 }
449 return new DecoupleHandler(handler);
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464 public static RDFHandler decouple(final RDFHandler handler, final int numConsumerThreads) {
465 if (numConsumerThreads <= 0) {
466 throw new IllegalArgumentException("Invalid number of consumer threads: "
467 + numConsumerThreads);
468 }
469 if (handler == NIL || handler instanceof DecoupleQueueHandler) {
470 return handler;
471 }
472 return new DecoupleQueueHandler(handler, numConsumerThreads);
473 }
474
475
476
477
478
479
480
481
482
483
484 public static RDFHandler synchronize(final RDFHandler handler) {
485 if (handler == NIL || handler instanceof SynchronizeHandler) {
486 return handler;
487 }
488 return new SynchronizeHandler(handler);
489 }
490
491 private static final class WrapHandler extends AbstractRDFHandler {
492
493 private final Collection<? super Statement> statementSink;
494
495 private final Object namespaceSink;
496
497 public WrapHandler(final Collection<? super Statement> statementSink,
498 @Nullable final Object namespaceSink) {
499 this.statementSink = statementSink;
500 this.namespaceSink = namespaceSink;
501 }
502
503 @SuppressWarnings("unchecked")
504 @Override
505 public synchronized void handleNamespace(final String prefix, final String uri)
506 throws RDFHandlerException {
507 if (this.namespaceSink instanceof Model) {
508 ((Model) this.namespaceSink).setNamespace(prefix, uri);
509 } else if (this.namespaceSink instanceof QuadModel) {
510 ((QuadModel) this.namespaceSink).setNamespace(prefix, uri);
511 } else if (this.namespaceSink instanceof Collection<?>) {
512 ((Collection<Namespace>) this.namespaceSink).add(new NamespaceImpl(prefix, uri));
513 } else if (this.namespaceSink instanceof Map<?, ?>) {
514 ((Map<String, String>) this.namespaceSink).put(prefix, uri);
515 }
516 }
517
518 @Override
519 public synchronized void handleStatement(final Statement statement)
520 throws RDFHandlerException {
521 this.statementSink.add(statement);
522 }
523
524 }
525
526 private static final class SequentialWriteHandler extends AbstractRDFHandler {
527
528 private final WriterConfig config;
529
530 private final String location;
531
532 @Nullable
533 private Closeable out;
534
535 @Nullable
536 private RDFWriter writer;
537
538 SequentialWriteHandler(final WriterConfig config, final String location) {
539 this.config = config;
540 this.location = location;
541 }
542
543 @Override
544 public void startRDF() throws RDFHandlerException {
545 try {
546 final RDFFormat format = Statements.toRDFFormat(this.location);
547 LOGGER.debug("Starting sequential {} writing of {}", format, this.location);
548 final OutputStream stream = IO.write(this.location);
549 if (Statements.isRDFFormatTextBased(format)) {
550 this.out = IO.buffer(new OutputStreamWriter(stream, Charset.forName("UTF-8")));
551 this.writer = Rio.createWriter(format, (Writer) this.out);
552 } else {
553 this.out = IO.buffer(stream);
554 this.writer = Rio.createWriter(format, (OutputStream) this.out);
555 }
556 this.writer.setWriterConfig(this.config);
557 this.writer.startRDF();
558 } catch (final IOException ex) {
559 throw new RDFHandlerException("Could not write to " + this.location);
560 }
561 super.startRDF();
562 }
563
564 @Override
565 public synchronized void handleComment(final String comment) throws RDFHandlerException {
566 this.writer.handleComment(comment);
567 }
568
569 @Override
570 public synchronized void handleNamespace(final String prefix, final String uri)
571 throws RDFHandlerException {
572 this.writer.handleNamespace(prefix, uri);
573 }
574
575 @Override
576 public synchronized void handleStatement(final Statement statement)
577 throws RDFHandlerException {
578 this.writer.handleStatement(statement);
579 }
580
581 @Override
582 public void endRDF() throws RDFHandlerException {
583 this.writer.endRDF();
584 try {
585 this.out.close();
586 } catch (final IOException ex) {
587 throw new RDFHandlerException("Unable to properly close " + this.location, ex);
588 }
589 }
590
591 @Override
592 public void close() {
593 IO.closeQuietly(this.out);
594 this.out = null;
595 this.writer = null;
596 }
597
598 }
599
600 private static final class ParallelWriteHandler extends AbstractRDFHandler {
601
602 private final WriterConfig config;
603
604 private final String location;
605
606 @Nullable
607 private OutputStream out;
608
609 @Nullable
610 private List<Writer> partialOuts;
611
612 @Nullable
613 private List<RDFWriter> partialWriters;
614
615 @Nullable
616 private ThreadLocal<RDFWriter> threadWriter;
617
618 ParallelWriteHandler(final WriterConfig config, final String location) {
619 this.config = config;
620 this.location = location;
621 }
622
623 @Override
624 public void startRDF() throws RDFHandlerException {
625 try {
626 LOGGER.debug("Starting parallel {} writing of {}",
627 Statements.toRDFFormat(this.location).getName(), this.location);
628 this.out = IO.write(this.location);
629 this.partialOuts = new ArrayList<Writer>();
630 this.partialWriters = new ArrayList<RDFWriter>();
631 this.threadWriter = new ThreadLocal<RDFWriter>() {
632
633 @Override
634 protected RDFWriter initialValue() {
635 return newWriter();
636 }
637
638 };
639
640 } catch (final IOException ex) {
641 throw new RDFHandlerException("Could not write to " + this.location);
642 }
643 super.startRDF();
644 }
645
646 @Override
647 public void handleStatement(final Statement statement) throws RDFHandlerException {
648 this.threadWriter.get().handleStatement(statement);
649 }
650
651 @Override
652 public void endRDF() throws RDFHandlerException {
653 for (final RDFHandler partialWriter : this.partialWriters) {
654 partialWriter.endRDF();
655 }
656 try {
657 for (final Writer partialOut : this.partialOuts) {
658 partialOut.close();
659 }
660 this.out.close();
661 } catch (final IOException ex) {
662 throw new RDFHandlerException("Unable to properly close " + this.location, ex);
663 }
664 }
665
666 @Override
667 public void close() {
668 IO.closeQuietly(this.out);
669 this.out = null;
670 this.partialOuts = null;
671 this.partialWriters = null;
672 this.threadWriter = null;
673 }
674
675 private RDFWriter newWriter() {
676 final Writer partialOut = IO.utf8Writer(IO.parallelBuffer(this.out, (byte) '\n'));
677 final RDFFormat format = Statements.toRDFFormat(this.location);
678 final RDFWriter partialWriter = Rio.createWriter(format, partialOut);
679 partialWriter.setWriterConfig(this.config);
680 synchronized (this.partialOuts) {
681 this.partialOuts.add(partialOut);
682 this.partialWriters.add(partialWriter);
683 }
684 try {
685 partialWriter.startRDF();
686 } catch (final RDFHandlerException ex) {
687 throw new RuntimeException(ex);
688 }
689 return partialWriter;
690 }
691
692 }
693
694 private final static class UpdateHandler extends AbstractRDFHandler {
695
696 private static final int DEFAULT_CHUNK_SIZE = 1024;
697
698 private static final String HEAD = "INSERT DATA {\n";
699
700 private final String endpointURL;
701
702 private final int chunkSize;
703
704 private final StringBuilder builder;
705
706 private Resource lastCtx;
707
708 private Resource lastSubj;
709
710 private URI lastPred;
711
712 private int count;
713
714 UpdateHandler(final String endpointURL, @Nullable final Integer chunkSize) {
715 this.endpointURL = endpointURL;
716 this.chunkSize = chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
717 this.builder = new StringBuilder();
718 this.lastCtx = null;
719 this.lastSubj = null;
720 this.lastPred = null;
721 this.count = 0;
722 this.builder.append(HEAD);
723 }
724
725 @Override
726 public synchronized void handleStatement(final Statement statement)
727 throws RDFHandlerException {
728
729 final boolean sameCtx = Objects.equals(this.lastCtx, statement.getContext());
730 final boolean sameSubj = sameCtx && statement.getSubject().equals(this.lastSubj);
731 final boolean samePred = sameSubj && statement.getPredicate().equals(this.lastPred);
732
733 if (this.lastSubj != null) {
734 if (!sameSubj) {
735 this.builder.append(" .\n");
736 }
737 if (!sameCtx && this.lastCtx != null) {
738 this.builder.append("}\n");
739 }
740 }
741
742 if (!sameCtx && statement.getContext() != null) {
743 this.builder.append("GRAPH ");
744 emit(statement.getContext());
745 this.builder.append(" {\n");
746 }
747
748 if (!samePred) {
749 if (!sameSubj) {
750 emit(statement.getSubject());
751 this.builder.append(" ");
752 } else {
753 this.builder.append(" ; ");
754 }
755 emit(statement.getPredicate());
756 this.builder.append(" ");
757 } else {
758 this.builder.append(" , ");
759 }
760
761 emit(statement.getObject());
762
763 this.lastCtx = statement.getContext();
764 this.lastSubj = statement.getSubject();
765 this.lastPred = statement.getPredicate();
766
767 ++this.count;
768 if (this.count == this.chunkSize) {
769 flush();
770 }
771 }
772
773 @Override
774 public void endRDF() throws RDFHandlerException {
775 flush();
776 }
777
778 private void emit(final Value value) {
779 try {
780 Statements.formatValue(value, null, this.builder);
781 } catch (final IOException ex) {
782 throw new Error("Unexpected exception (!)", ex);
783 }
784 }
785
786 private void flush() throws RDFHandlerException {
787 if (this.count > 0) {
788 if (this.lastSubj != null && this.lastCtx != null) {
789 this.builder.append("}");
790 }
791 this.builder.append("}");
792 final String update = this.builder.toString();
793 this.builder.setLength(HEAD.length());
794 this.count = 0;
795 try {
796 sendUpdate(update);
797 } catch (final Throwable ex) {
798 throw new RDFHandlerException(ex);
799 }
800 }
801 }
802
803 private void sendUpdate(final String update) throws IOException {
804
805 final byte[] requestBody = ("update=" + URLEncoder.encode(update, "UTF-8"))
806 .getBytes(Charset.forName("UTF-8"));
807
808 final URL url = new URL(this.endpointURL);
809 final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
810 connection.setDoOutput(true);
811 connection.setDoInput(true);
812 connection.setRequestMethod("POST");
813 connection.setRequestProperty("Content-Type",
814 "application/x-www-form-urlencoded; charset=utf-8");
815 connection.setRequestProperty("Content-Length", Integer.toString(requestBody.length));
816
817 connection.connect();
818
819 try {
820 final DataOutputStream out = new DataOutputStream(connection.getOutputStream());
821 out.write(requestBody);
822 out.close();
823
824 final int httpCode = connection.getResponseCode();
825 if (httpCode != HttpURLConnection.HTTP_OK) {
826 throw new IOException("Upload to '" + this.endpointURL + "' failed (HTTP "
827 + httpCode + ")");
828 }
829
830 } finally {
831 connection.disconnect();
832 }
833 }
834
835 }
836
837 private static abstract class IgnoreMethodHandler extends AbstractRDFHandlerWrapper {
838
839 private final boolean forwardStartRDF;
840
841 private final boolean forwardHandleComment;
842
843 private final boolean forwardHandleNamespace;
844
845 private final boolean forwardEndRDF;
846
847 private final boolean forwardClose;
848
849 IgnoreMethodHandler(final RDFHandler handler, final int ignoredMethods) {
850 super(handler);
851 this.forwardStartRDF = (ignoredMethods & METHOD_START_RDF) == 0;
852 this.forwardHandleComment = (ignoredMethods & METHOD_HANDLE_COMMENT) == 0;
853 this.forwardHandleNamespace = (ignoredMethods & METHOD_HANDLE_NAMESPACE) == 0;
854 this.forwardEndRDF = (ignoredMethods & METHOD_END_RDF) == 0;
855 this.forwardClose = (ignoredMethods & METHOD_CLOSE) == 0;
856 }
857
858 @Override
859 public final void startRDF() throws RDFHandlerException {
860 if (this.forwardStartRDF) {
861 super.startRDF();
862 }
863 }
864
865 @Override
866 public final void handleComment(final String comment) throws RDFHandlerException {
867 if (this.forwardHandleComment) {
868 super.handleComment(comment);
869 }
870 }
871
872 @Override
873 public final void handleNamespace(final String prefix, final String uri)
874 throws RDFHandlerException {
875 if (this.forwardHandleNamespace) {
876 super.handleNamespace(prefix, uri);
877 }
878 }
879
880 @Override
881 public final void endRDF() throws RDFHandlerException {
882 if (this.forwardEndRDF) {
883 super.endRDF();
884 }
885 }
886
887 @Override
888 public final void close() {
889 if (this.forwardClose) {
890 super.close();
891 }
892 }
893
894 }
895
896 private static final class DispatchTwoHandler extends AbstractRDFHandler {
897
898 private final RDFHandler first;
899
900 private final RDFHandler second;
901
902 DispatchTwoHandler(final RDFHandler first, final RDFHandler second) {
903 this.first = first;
904 this.second = second;
905 }
906
907 @Override
908 public void startRDF() throws RDFHandlerException {
909 this.first.startRDF();
910 this.second.startRDF();
911 }
912
913 @Override
914 public void handleComment(final String comment) throws RDFHandlerException {
915 this.first.handleComment(comment);
916 this.second.handleComment(comment);
917 }
918
919 @Override
920 public void handleNamespace(final String prefix, final String uri)
921 throws RDFHandlerException {
922 this.first.handleNamespace(prefix, uri);
923 this.second.handleNamespace(prefix, uri);
924 }
925
926 @Override
927 public void handleStatement(final Statement statement) throws RDFHandlerException {
928 this.first.handleStatement(statement);
929 this.second.handleStatement(statement);
930 }
931
932 @Override
933 public void endRDF() throws RDFHandlerException {
934 this.first.endRDF();
935 this.second.endRDF();
936 }
937
938 @Override
939 public void close() {
940 IO.closeQuietly(this.first);
941 IO.closeQuietly(this.second);
942 }
943
944 }
945
946 private static final class DispatchAllHandler extends AbstractRDFHandler {
947
948 private final RDFHandler[] handlers;
949
950 private final int[] extraPasses;
951
952 private RDFHandler[] passHandlers;
953
954 private int passIndex;
955
956 DispatchAllHandler(final RDFHandler[] handlers, final int[] extraPasses) {
957
958 int maxExtraPasses = 0;
959 for (int i = 0; i < extraPasses.length; ++i) {
960 maxExtraPasses = Math.max(maxExtraPasses, extraPasses[i]);
961 }
962
963 this.handlers = handlers;
964 this.extraPasses = extraPasses;
965 this.passHandlers = null;
966 this.passIndex = maxExtraPasses;
967 }
968
969 @Override
970 public void startRDF() throws RDFHandlerException {
971
972 if (this.passIndex == 0) {
973 this.passHandlers = this.handlers;
974 } else {
975 final List<RDFHandler> list = new ArrayList<RDFHandler>();
976 for (int i = 0; i < this.handlers.length; ++i) {
977 if (this.extraPasses[i] >= this.passIndex) {
978 list.add(this.handlers[i]);
979 }
980 }
981 this.passHandlers = list.toArray(new RDFHandler[list.size()]);
982 --this.passIndex;
983 }
984
985 for (final RDFHandler handler : this.passHandlers) {
986 handler.startRDF();
987 }
988 }
989
990 @Override
991 public void handleComment(final String comment) throws RDFHandlerException {
992 for (final RDFHandler handler : this.passHandlers) {
993 handler.handleComment(comment);
994 }
995 }
996
997 @Override
998 public void handleNamespace(final String prefix, final String uri)
999 throws RDFHandlerException {
1000 for (final RDFHandler handler : this.passHandlers) {
1001 handler.handleNamespace(prefix, uri);
1002 }
1003 }
1004
1005 @Override
1006 public void handleStatement(final Statement statement) throws RDFHandlerException {
1007 for (final RDFHandler handler : this.passHandlers) {
1008 handler.handleStatement(statement);
1009 }
1010 }
1011
1012 @Override
1013 public void endRDF() throws RDFHandlerException {
1014 for (final RDFHandler handler : this.passHandlers) {
1015 handler.endRDF();
1016 }
1017 }
1018
1019 @Override
1020 public void close() {
1021 for (final RDFHandler handler : this.handlers) {
1022 IO.closeQuietly(handler);
1023 }
1024 }
1025
1026 }
1027
1028 private static final class DispatchRoundRobinHandler extends AbstractRDFHandler {
1029
1030 private final AtomicLong counter = new AtomicLong(0);
1031
1032 private final int chunkSize;
1033
1034 private final RDFHandler[] handlers;
1035
1036 DispatchRoundRobinHandler(final int chunkSize, final RDFHandler[] handlers) {
1037 this.chunkSize = chunkSize;
1038 this.handlers = handlers;
1039 }
1040
1041 @Override
1042 public void startRDF() throws RDFHandlerException {
1043 for (final RDFHandler handler : this.handlers) {
1044 handler.startRDF();
1045 }
1046 }
1047
1048 @Override
1049 public void handleComment(final String comment) throws RDFHandlerException {
1050 pickHandler().handleComment(comment);
1051 }
1052
1053 @Override
1054 public void handleNamespace(final String prefix, final String uri)
1055 throws RDFHandlerException {
1056 for (final RDFHandler handler : this.handlers) {
1057 handler.handleNamespace(prefix, uri);
1058 }
1059 }
1060
1061 @Override
1062 public void handleStatement(final Statement statement) throws RDFHandlerException {
1063 pickHandler().handleStatement(statement);
1064 }
1065
1066 @Override
1067 public void endRDF() throws RDFHandlerException {
1068 Throwable exception = null;
1069 for (final RDFHandler handler : this.handlers) {
1070 try {
1071 handler.endRDF();
1072 } catch (final Throwable ex) {
1073 if (exception == null) {
1074 exception = ex;
1075 } else {
1076 exception.addSuppressed(ex);
1077 }
1078 }
1079 }
1080 if (exception != null) {
1081 if (exception instanceof RuntimeException) {
1082 throw (RDFHandlerException) exception;
1083 } else if (exception instanceof Error) {
1084 throw (Error) exception;
1085 }
1086 throw (RDFHandlerException) exception;
1087 }
1088 }
1089
1090 @Override
1091 public void close() {
1092 for (final RDFHandler handler : this.handlers) {
1093 IO.closeQuietly(handler);
1094 }
1095 }
1096
1097 private RDFHandler pickHandler() {
1098 return this.handlers[(int) (this.counter.getAndIncrement()
1099 / this.chunkSize % this.handlers.length)];
1100 }
1101
1102 }
1103
1104 private static final class CollectLabellerHandler extends AbstractRDFHandlerWrapper {
1105
1106 private final CollectMergerHandler collector;
1107
1108 private final int label;
1109
1110 CollectLabellerHandler(final CollectMergerHandler handler, final int label) {
1111 super(handler);
1112 this.collector = handler;
1113 this.label = label;
1114 }
1115
1116 @Override
1117 public void handleStatement(final Statement statement) throws RDFHandlerException {
1118 this.collector.handleStatement(statement, this.label);
1119 }
1120
1121 }
1122
1123 private static class CollectMergerHandler extends AbstractRDFHandlerWrapper {
1124
1125 private final int size;
1126
1127 private int pending;
1128
1129 CollectMergerHandler(final RDFHandler handler, final int size) {
1130 super(handler);
1131 this.size = size;
1132 this.pending = 0;
1133 }
1134
1135 @Override
1136 public final void startRDF() throws RDFHandlerException {
1137 if (this.pending <= 0) {
1138 this.pending = this.size;
1139 super.startRDF();
1140 doStartRDF();
1141 }
1142 }
1143
1144 @Override
1145 public final void handleStatement(final Statement statement) throws RDFHandlerException {
1146 doHandleStatement(statement, 0);
1147 }
1148
1149 public final void handleStatement(final Statement statement, final int label)
1150 throws RDFHandlerException {
1151 doHandleStatement(statement, label);
1152 }
1153
1154 @Override
1155 public final void endRDF() throws RDFHandlerException {
1156 --this.pending;
1157 if (this.pending == 0) {
1158 doEndRDF();
1159 super.endRDF();
1160 }
1161 }
1162
1163 @Override
1164 public void close() {
1165 super.close();
1166 doClose();
1167 }
1168
1169 void doStartRDF() throws RDFHandlerException {
1170 }
1171
1172 void doHandleStatement(final Statement statement, final int label)
1173 throws RDFHandlerException {
1174 super.handleStatement(statement);
1175 }
1176
1177 void doEndRDF() throws RDFHandlerException {
1178 }
1179
1180 void doClose() {
1181 }
1182
1183 }
1184
1185 private static class CollectSorterHandler extends CollectMergerHandler {
1186
1187 private final boolean deduplicate;
1188
1189 private final boolean parallelize;
1190
1191 private Sorter<Object[]> sorter;
1192
1193 CollectSorterHandler(final RDFHandler handler, final int size, final boolean deduplicate,
1194 final boolean parallelize) {
1195 super(handler, size);
1196 this.deduplicate = deduplicate;
1197 this.parallelize = parallelize;
1198 this.sorter = null;
1199 }
1200
1201 @Override
1202 void doStartRDF() throws RDFHandlerException {
1203 this.sorter = Sorter.newTupleSorter(true, Statement.class, Long.class);
1204 try {
1205 this.sorter.start(this.deduplicate);
1206 } catch (final IOException ex) {
1207 throw new RDFHandlerException(ex);
1208 }
1209 }
1210
1211 @Override
1212 void doHandleStatement(final Statement statement, final int label)
1213 throws RDFHandlerException {
1214 try {
1215 this.sorter.emit(new Object[] { statement, label });
1216 } catch (final Throwable ex) {
1217 throw new RDFHandlerException(ex);
1218 }
1219 }
1220
1221 @Override
1222 void doEndRDF() throws RDFHandlerException {
1223 try {
1224 this.sorter.end(this.parallelize, new Consumer<Object[]>() {
1225
1226 @Override
1227 public void accept(final Object[] record) {
1228 try {
1229 final Statement statement = (Statement) record[0];
1230 final int label = ((Long) record[1]).intValue();
1231 doHandleStatementSorted(statement, label);
1232 } catch (final RDFHandlerException ex) {
1233 throw new RuntimeException(ex);
1234 }
1235 }
1236
1237 });
1238 this.sorter.close();
1239 this.sorter = null;
1240 } catch (final IOException ex) {
1241 throw new RDFHandlerException(ex);
1242 }
1243 }
1244
1245 @Override
1246 void doClose() {
1247 IO.closeQuietly(this.sorter);
1248 }
1249
1250 void doHandleStatementSorted(final Statement statement, final int label)
1251 throws RDFHandlerException {
1252 this.handler.handleStatement(statement);
1253 }
1254
1255 }
1256
1257 private static class CollectSetOperatorHandler extends CollectSorterHandler {
1258
1259 private final SetOperator operator;
1260
1261 private final int[] multiplicities;
1262
1263 private Statement statement;
1264
1265 CollectSetOperatorHandler(final RDFHandler handler, final int size,
1266 final SetOperator operator) {
1267 super(handler, size, false, false);
1268 this.operator = operator;
1269 this.multiplicities = new int[size];
1270 this.statement = null;
1271 }
1272
1273 @Override
1274 void doHandleStatementSorted(final Statement statement, final int label)
1275 throws RDFHandlerException {
1276
1277 if (!statement.equals(this.statement)
1278 || !Objects.equals(statement.getContext(), this.statement.getContext())) {
1279 flush();
1280 this.statement = statement;
1281 Arrays.fill(this.multiplicities, 0);
1282 }
1283 ++this.multiplicities[label];
1284 }
1285
1286 @Override
1287 void doEndRDF() throws RDFHandlerException {
1288 super.doEndRDF();
1289 flush();
1290 }
1291
1292 private void flush() throws RDFHandlerException {
1293 if (this.statement != null) {
1294 final int multiplicity = this.operator.apply(this.multiplicities);
1295 for (int i = 0; i < multiplicity; ++i) {
1296 this.handler.handleStatement(this.statement);
1297 }
1298 }
1299 }
1300
1301 }
1302
1303 private static final class DecoupleHandler extends AbstractRDFHandlerWrapper {
1304
1305 private static final int BUFFER_SIZE = 4 * 1024;
1306
1307 private final int numCores;
1308
1309 private final Set<Thread> incomingThreads;
1310
1311 private final List<Future<?>> futures;
1312
1313 private final AtomicInteger counter;
1314
1315 private Throwable exception;
1316
1317 private Statement[] buffer;
1318
1319 private int size;
1320
1321 private int mask;
1322
1323 private boolean disabled;
1324
1325 DecoupleHandler(final RDFHandler handler) {
1326 super(handler);
1327 this.numCores = Environment.getCores();
1328 this.incomingThreads = new HashSet<Thread>();
1329 this.futures = new LinkedList<Future<?>>();
1330 this.counter = new AtomicInteger(0);
1331 }
1332
1333 @Override
1334 public void startRDF() throws RDFHandlerException {
1335 super.startRDF();
1336 this.incomingThreads.clear();
1337 this.futures.clear();
1338 this.exception = null;
1339 this.buffer = new Statement[BUFFER_SIZE];
1340 this.size = 0;
1341 this.mask = 0;
1342 this.disabled = false;
1343 }
1344
1345 @Override
1346 public void handleStatement(final Statement statement) throws RDFHandlerException {
1347
1348 if (this.disabled) {
1349 super.handleStatement(statement);
1350 } else {
1351 handleStatementHelper(statement);
1352 }
1353 }
1354
1355 private void handleStatementHelper(final Statement statement) throws RDFHandlerException {
1356 if ((this.counter.getAndIncrement() & this.mask) != 0) {
1357 super.handleStatement(statement);
1358 } else {
1359 handleStatementInBackground(statement);
1360 }
1361 }
1362
1363 private void handleStatementInBackground(final Statement statement)
1364 throws RDFHandlerException {
1365
1366 Statement[] fullBuffer = null;
1367 synchronized (this) {
1368 this.buffer[this.size++] = statement;
1369 if (this.size == BUFFER_SIZE) {
1370 fullBuffer = this.buffer;
1371 this.buffer = new Statement[BUFFER_SIZE];
1372 this.size = 0;
1373 this.incomingThreads.add(Thread.currentThread());
1374 checkNotFailed();
1375 calibrateMask();
1376 fullBuffer = handleStatementsInBackground(fullBuffer);
1377 }
1378 }
1379
1380 if (fullBuffer != null) {
1381 for (final Statement stmt : fullBuffer) {
1382 super.handleStatement(stmt);
1383 }
1384 }
1385 }
1386
1387 private Statement[] handleStatementsInBackground(final Statement[] buffer) {
1388
1389
1390 for (final Iterator<Future<?>> i = this.futures.iterator(); i.hasNext();) {
1391 final Future<?> future = i.next();
1392 if (future.isDone()) {
1393 i.remove();
1394 }
1395 }
1396
1397
1398 if (this.futures.size() >= this.numCores) {
1399 return buffer;
1400 }
1401
1402
1403 this.futures.add(Environment.getPool().submit(new Runnable() {
1404
1405 @Override
1406 public void run() {
1407 try {
1408 for (final Statement statement : buffer) {
1409 DecoupleHandler.super.handleStatement(statement);
1410 }
1411 } catch (final Throwable ex) {
1412 @SuppressWarnings("resource")
1413 final DecoupleHandler h = DecoupleHandler.this;
1414 synchronized (h) {
1415 if (h.exception == null) {
1416 h.exception = ex;
1417 } else {
1418 h.exception.addSuppressed(ex);
1419 }
1420 for (final Future<?> future : h.futures) {
1421 future.cancel(false);
1422 }
1423 }
1424 }
1425 }
1426
1427 }));
1428
1429
1430 return null;
1431 }
1432
1433 private void calibrateMask() {
1434
1435
1436
1437
1438
1439
1440 final int numCores = Environment.getCores();
1441 final int numThreads = this.incomingThreads.size();
1442 if (numCores > numThreads) {
1443 this.mask = Integer.highestOneBit(numCores / (numCores - numThreads)) - 1;
1444 } else {
1445 this.mask = 0xFFFFFFFF;
1446 this.disabled = true;
1447 LOGGER.debug("Decoupler disabled");
1448 }
1449
1450
1451
1452 }
1453
1454 private void checkNotFailed() throws RDFHandlerException {
1455 if (this.exception != null) {
1456 if (this.exception instanceof RDFHandlerException) {
1457 throw (RDFHandlerException) this.exception;
1458 } else if (this.exception instanceof RuntimeException) {
1459 throw (RuntimeException) this.exception;
1460 } else if (this.exception instanceof Error) {
1461 throw (Error) this.exception;
1462 }
1463 throw new RDFHandlerException(this.exception);
1464 }
1465 }
1466
1467 @Override
1468 public void endRDF() throws RDFHandlerException {
1469
1470
1471 for (int i = 0; i < this.size; ++i) {
1472 super.handleStatement(this.buffer[i]);
1473 }
1474
1475
1476 List<Future<?>> futuresToWaitFor;
1477 synchronized (this) {
1478 futuresToWaitFor = new ArrayList<Future<?>>(this.futures);
1479 }
1480 for (final Future<?> future : futuresToWaitFor) {
1481 while (!future.isDone()) {
1482 try {
1483 future.get();
1484 } catch (final Throwable ex) {
1485
1486 }
1487 }
1488 }
1489
1490
1491 checkNotFailed();
1492
1493
1494 super.endRDF();
1495 }
1496
1497 @Override
1498 public void close() {
1499 super.close();
1500 synchronized (this) {
1501 for (final Future<?> future : this.futures) {
1502 future.cancel(false);
1503 }
1504 }
1505 }
1506
1507 }
1508
1509 private static final class SynchronizeHandler extends AbstractRDFHandlerWrapper {
1510
1511 SynchronizeHandler(final RDFHandler delegate) {
1512 super(delegate);
1513 }
1514
1515 @Override
1516 public synchronized void handleComment(final String comment) throws RDFHandlerException {
1517 super.handleComment(comment);
1518 }
1519
1520 @Override
1521 public synchronized void handleNamespace(final String prefix, final String uri)
1522 throws RDFHandlerException {
1523 super.handleNamespace(prefix, uri);
1524 }
1525
1526 @Override
1527 public synchronized void handleStatement(final Statement statement)
1528 throws RDFHandlerException {
1529 super.handleStatement(statement);
1530 }
1531
1532 }
1533
1534 private static final class DecoupleQueueHandler extends AbstractRDFHandlerWrapper {
1535
1536 private static final int CAPACITY = 4 * 1024;
1537
1538 private static final Object EOF = new Object();
1539
1540 private final int numConsumers;
1541
1542 private AtomicReference<Throwable> exception;
1543
1544 private BlockingQueue<Object> queue;
1545
1546 private List<Future<?>> futures;
1547
1548 DecoupleQueueHandler(final RDFHandler delegate, final int numConsumers) {
1549 super(delegate);
1550 this.numConsumers = numConsumers;
1551 }
1552
1553 @Override
1554 public void startRDF() throws RDFHandlerException {
1555 super.startRDF();
1556 this.exception = new AtomicReference<>(null);
1557 this.queue = new ArrayBlockingQueue<>(CAPACITY);
1558 this.futures = Lists.newArrayList();
1559 for (int i = 0; i < this.numConsumers; ++i) {
1560 this.futures.add(Environment.getPool().submit(new Runnable() {
1561
1562 @Override
1563 public void run() {
1564 try {
1565 final RDFHandler handler = DecoupleQueueHandler.this.handler;
1566 while (true) {
1567 if (DecoupleQueueHandler.this.exception.get() != null) {
1568 break;
1569 }
1570 final Object element = DecoupleQueueHandler.this.queue.take();
1571 if (element instanceof Statement) {
1572 handler.handleStatement((Statement) element);
1573 } else if (element instanceof NamespaceImpl) {
1574 final NamespaceImpl ns = (NamespaceImpl) element;
1575 handler.handleNamespace(ns.getPrefix(), ns.getName());
1576 } else if (element instanceof String) {
1577 handler.handleComment((String) element);
1578 } else if (element == EOF) {
1579 break;
1580 }
1581 }
1582 } catch (final Throwable ex) {
1583 DecoupleQueueHandler.this.exception.set(ex);
1584 DecoupleQueueHandler.this.queue.clear();
1585 }
1586 }
1587
1588 }));
1589 }
1590 }
1591
1592 @Override
1593 public void handleComment(final String comment) throws RDFHandlerException {
1594 check();
1595 put(comment);
1596 }
1597
1598 @Override
1599 public void handleNamespace(final String prefix, final String uri)
1600 throws RDFHandlerException {
1601 check();
1602 put(new NamespaceImpl(prefix, uri));
1603 }
1604
1605 @Override
1606 public void handleStatement(final Statement statement) throws RDFHandlerException {
1607 check();
1608 put(statement);
1609 }
1610
1611 @Override
1612 public void endRDF() throws RDFHandlerException {
1613 try {
1614 check();
1615 put(EOF);
1616 for (final Future<?> future : this.futures) {
1617 try {
1618 future.get();
1619 } catch (final Throwable ex) {
1620 this.exception.compareAndSet(null, ex);
1621 }
1622 }
1623 check();
1624 super.endRDF();
1625 } finally {
1626 this.exception = null;
1627 this.queue = null;
1628 this.futures = null;
1629 }
1630 }
1631
1632 private void put(final Object object) throws RDFHandlerException {
1633 try {
1634 this.queue.put(object);
1635 } catch (final InterruptedException ex) {
1636 this.exception.set(ex);
1637 throw new RDFHandlerException(ex);
1638 }
1639 }
1640
1641 private void check() throws RDFHandlerException {
1642 final Throwable ex = this.exception.get();
1643 if (ex != null) {
1644 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
1645 throw new RDFHandlerException(ex);
1646 }
1647 }
1648
1649 }
1650
1651 }