1 /*
2 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3 *
4 * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5 * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6 *
7 * To the extent possible under law, the authors have dedicated all copyright and related and
8 * neighboring rights to this software to the public domain worldwide. This software is
9 * distributed without any warranty.
10 *
11 * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package eu.fbk.rdfpro;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.function.Consumer;
28
29 import javax.annotation.Nullable;
30
31 import org.openrdf.model.Resource;
32 import org.openrdf.model.Statement;
33 import org.openrdf.model.URI;
34 import org.openrdf.model.Value;
35 import org.openrdf.rio.RDFHandler;
36 import org.openrdf.rio.RDFHandlerException;
37
38 import eu.fbk.rdfpro.util.Hash;
39 import eu.fbk.rdfpro.util.IO;
40 import eu.fbk.rdfpro.util.Sorter;
41 import eu.fbk.rdfpro.util.Statements;
42
43 final class ProcessorUnique implements RDFProcessor {
44
45 private final boolean mergeContexts;
46
/**
 * Creates a new processor.
 *
 * @param mergeContexts
 *            if true, {@link #wrap(RDFHandler)} returns a handler that merges the contexts
 *            of statements sharing subject, predicate and object; if false, a plain
 *            sorting handler is returned
 */
ProcessorUnique(final boolean mergeContexts) {
    this.mergeContexts = mergeContexts;
}
50
51 @SuppressWarnings("resource")
52 @Override
53 public RDFHandler wrap(final RDFHandler handler) {
54 Objects.requireNonNull(handler);
55 return this.mergeContexts ? new MergeHandler(RDFHandlers.decouple(handler)) //
56 : new Handler(handler, true);
57 }
58
59 // private static final class KeepContextsHandler extends AbstractRDFHandlerWrapper {
60 //
61 // private final int threshold;
62 //
63 // private StatementDeduplicator deduplicator;
64 //
65 // private AtomicInteger count;
66 //
67 // private Sorter<Statement> sorter;
68 //
69 // KeepContextsHandler(final RDFHandler handler) {
70 // super(handler);
71 // this.threshold = (int) (Runtime.getRuntime().freeMemory() / 2 / 24);
72 // this.deduplicator = null;
73 // this.sorter = null;
74 // this.count = null;
75 // }
76 //
77 // @Override
78 // public void startRDF() throws RDFHandlerException {
79 // super.startRDF();
80 // this.deduplicator = StatementDeduplicator.newHashDeduplicator();
81 // this.count = new AtomicInteger(0);
82 // this.sorter = Sorter.newStatementSorter(true);
83 // try {
84 // this.sorter.start(true);
85 // } catch (final IOException ex) {
86 // throw new RDFHandlerException(ex);
87 // }
88 // }
89 //
90 // @Override
91 // public void handleStatement(final Statement stmt) throws RDFHandlerException {
92 // try {
93 // if (this.deduplicator.isNew(stmt)) {
94 // final int count = this.count.incrementAndGet();
95 // }
96 //
97 // this.sorter.emit(statement);
98 // } catch (final Throwable ex) {
99 // throw new RDFHandlerException(ex);
100 // }
101 // }
102 //
103 // @Override
104 // public void endRDF() throws RDFHandlerException {
105 // try {
106 // this.sorter.end(true, new Consumer<Statement>() {
107 //
108 // @Override
109 // public void accept(final Statement statement) {
110 // try {
111 // KeepContextsHandler.this.handler.handleStatement(statement);
112 // } catch (final RDFHandlerException ex) {
113 // throw new RuntimeException(ex);
114 // }
115 // }
116 //
117 // });
118 // this.sorter.close();
119 // this.sorter = null;
120 // } catch (final IOException ex) {
121 // throw new RDFHandlerException(ex);
122 // }
123 // super.endRDF();
124 // }
125 //
126 // @Override
127 // public final void close() {
128 // IO.closeQuietly(this.sorter);
129 // super.close();
130 // }
131 //
132 // }
133
134 // private static final class MergeContextsHandler extends AbstractRDFHandlerWrapper {
135 //
136 // private final Map<Resource, List<Statement>> contextsStatements;
137 //
138 // private final Map<ContextSet, Resource> mergedContexts;
139 //
140 // @Nullable
141 // private Sorter<Statement> sorter;
142 //
143 // @Nullable
144 // private Statement statement;
145 //
146 // @Nullable
147 // private Resource statementSubj;
148 //
149 // @Nullable
150 // private URI statementPred;
151 //
152 // @Nullable
153 // private Value statementObj;
154 //
155 // @Nullable
156 // private Resource statementCtx; // if there is only a context;
157 //
158 // private final Set<Resource> statementContexts; // if there are multiple contexts
159 //
160 // public MergeContextsHandler(final RDFHandler handler) {
161 // super(handler);
162 // this.sorter = null;
163 // this.contextsStatements = new ConcurrentHashMap<>();
164 // this.mergedContexts = new HashMap<>();
165 // this.statementSubj = null;
166 // this.statementPred = null;
167 // this.statementObj = null;
168 // this.statementCtx = null;
169 // this.statementContexts = new HashSet<>();
170 // }
171 //
172 // @Override
173 // public void startRDF() throws RDFHandlerException {
174 // super.startRDF();
175 // this.sorter = Sorter.newStatementSorter(true);
176 // try {
177 // this.sorter.start(true);
178 // } catch (final IOException ex) {
179 // throw new RDFHandlerException(ex);
180 // }
181 // }
182 //
183 // @Override
184 // public void handleStatement(final Statement statement) throws RDFHandlerException {
185 // try {
186 // this.sorter.emit(statement);
187 // } catch (final Throwable ex) {
188 // throw new RDFHandlerException(ex);
189 // }
190 // final Resource context = statement.getContext();
191 // if (context != null) {
192 // this.contextsStatements.putIfAbsent(context, Collections.<Statement>emptyList());
193 // }
194 // }
195 //
196 // @Override
197 // public void endRDF() throws RDFHandlerException {
198 // try {
199 // this.sorter.end(false, new Consumer<Statement>() {
200 //
201 // @Override
202 // public void accept(final Statement statement) {
203 // try {
204 // handleStatementSorted(statement);
205 // } catch (final RDFHandlerException ex) {
206 // throw new RuntimeException(ex);
207 // }
208 // }
209 //
210 // });
211 // this.sorter.close();
212 // this.sorter = null;
213 // handleEndRDF();
214 // } catch (final IOException ex) {
215 // throw new RDFHandlerException(ex);
216 // }
217 // super.endRDF();
218 // }
219 //
220 // void handleStatementSorted(final Statement statement) throws RDFHandlerException {
221 //
222 // final Resource subj = statement.getSubject();
223 // final URI pred = statement.getPredicate();
224 // final Value obj = statement.getObject();
225 // final Resource ctx = statement.getContext();
226 //
227 // List<Statement> contextStatements = this.contextsStatements.get(subj);
228 // if (contextStatements != null) {
229 // if (contextStatements.isEmpty()) {
230 // contextStatements = new ArrayList<>();
231 // this.contextsStatements.put(subj, contextStatements);
232 // }
233 // contextStatements.add(statement); // context data buffered and emitted later
234 //
235 // } else if (subj.equals(this.statementSubj) && pred.equals(this.statementPred)
236 // && obj.equals(this.statementObj)) {
237 // if (this.statementCtx != null) {
238 // if (ctx == null) {
239 // this.statementCtx = null;
240 // this.statement = statement;
241 // } else {
242 // if (this.statementContexts.isEmpty()) {
243 // // we add the context firstly seen only now, so to avoid useless work
244 // // in the frequent case the input contains almost unique statements
245 // this.statementContexts.add(this.statementCtx);
246 // }
247 // this.statementContexts.add(ctx);
248 // }
249 // }
250 //
251 // } else {
252 // flush();
253 // this.statement = statement;
254 // this.statementSubj = subj;
255 // this.statementPred = pred;
256 // this.statementObj = obj;
257 // this.statementCtx = ctx;
258 // this.statementContexts.clear();
259 // }
260 // }
261 //
262 // void handleEndRDF() throws RDFHandlerException {
263 // flush();
264 // for (final List<Statement> statements : this.contextsStatements.values()) {
265 // for (final Statement statement : statements) {
266 // this.handler.handleStatement(statement);
267 // }
268 // }
269 // for (final Map.Entry<ContextSet, Resource> entry : this.mergedContexts.entrySet()) {
270 // final ContextSet set = entry.getKey();
271 // final Resource context = entry.getValue();
272 // final Set<Statement> statements = new HashSet<Statement>();
273 // for (final Resource source : set.contexts) {
274 // for (final Statement statement : this.contextsStatements.get(source)) {
275 // statements.add(Statements.VALUE_FACTORY.createStatement(context,
276 // statement.getPredicate(), statement.getObject(),
277 // statement.getContext()));
278 // }
279 // }
280 // for (final Statement statement : statements) {
281 // this.handler.handleStatement(statement);
282 // }
283 // }
284 // }
285 //
286 // private void flush() throws RDFHandlerException {
287 // if (this.statement != null) {
288 // Statement statement;
289 // if (this.statementCtx == null || this.statementContexts.size() <= 1) {
290 // statement = this.statement;
291 // } else {
292 // final Resource mergedContext = mergeContexts(this.statementContexts);
293 // statement = mergedContext.equals(this.statement.getContext()) ? this.statement
294 // : Statements.VALUE_FACTORY.createStatement(this.statementSubj,
295 // this.statementPred, this.statementObj, mergedContext);
296 // }
297 // this.handler.handleStatement(statement);
298 // }
299 // }
300 //
301 // @Nullable
302 // private Resource mergeContexts(final Set<Resource> contexts) {
303 //
304 // final ContextSet set = new ContextSet(contexts.toArray(new Resource[contexts.size()]));
305 //
306 // Resource context = this.mergedContexts.get(set);
307 // if (context == null) {
308 // final String[] args = new String[contexts.size()];
309 // String namespace = null;
310 // int index = 0;
311 // for (final Resource source : contexts) {
312 // args[index++] = source.stringValue();
313 // if (source instanceof URI) {
314 // final String ns = ((URI) source).getNamespace();
315 // if (namespace == null) {
316 // namespace = ns;
317 // } else {
318 // final int length = Math.min(ns.length(), namespace.length());
319 // for (int i = 0; i < length; ++i) {
320 // if (ns.charAt(i) != namespace.charAt(i)) {
321 // namespace = ns.substring(0, i);
322 // break;
323 // }
324 // }
325 // }
326 // }
327 // }
328 // Arrays.sort(args);
329 // if (namespace == null || "".equals(namespace)) {
330 // namespace = "urn:graph:";
331 // } else if (!namespace.endsWith("/") && !namespace.endsWith("#")
332 // && !namespace.endsWith(":")) {
333 // namespace = namespace + "/";
334 // }
335 // final String localName = Hash.murmur3(args).toString();
336 // context = Statements.VALUE_FACTORY.createURI(namespace, localName);
337 // this.mergedContexts.put(set, context);
338 // }
339 // return context;
340 // }
341 //
342 // private static final class ContextSet {
343 //
344 // Resource[] contexts;
345 //
346 // int hash;
347 //
348 // ContextSet(final Resource[] contexts) {
349 //
350 // int hash = 0;
351 // for (final Resource context : contexts) {
352 // hash += context.hashCode();
353 // }
354 //
355 // this.contexts = contexts;
356 // this.hash = hash;
357 // }
358 //
359 // @Override
360 // public boolean equals(final Object object) {
361 // if (object == this) {
362 // return true;
363 // }
364 // if (!(object instanceof ContextSet)) {
365 // return false;
366 // }
367 // final ContextSet other = (ContextSet) object;
368 // if (this.hash != other.hash || this.contexts.length != other.contexts.length) {
369 // return false;
370 // }
371 // final boolean[] matched = new boolean[this.contexts.length];
372 // outer: for (int i = 0; i < this.contexts.length; ++i) {
373 // final Resource thisContext = this.contexts[i];
374 // for (int j = 0; j < this.contexts.length; ++j) {
375 // if (!matched[j]) {
376 // final Resource otherContext = other.contexts[j];
377 // if (thisContext.equals(otherContext)) {
378 // matched[j] = true;
379 // continue outer;
380 // }
381 // }
382 // }
383 // return false;
384 // }
385 // return true;
386 // }
387 //
388 // @Override
389 // public int hashCode() {
390 // return this.hash;
391 // }
392 //
393 // }
394 //
395 // }
396
397 private static class Handler extends AbstractRDFHandlerWrapper {
398
399 private final boolean parallelize;
400
401 private Sorter<Statement> sorter;
402
403 Handler(final RDFHandler handler, final boolean parallelize) {
404 super(handler);
405 this.parallelize = parallelize;
406 this.sorter = null;
407 }
408
409 @Override
410 public void startRDF() throws RDFHandlerException {
411 super.startRDF();
412 this.sorter = Sorter.newStatementSorter(true);
413 try {
414 this.sorter.start(true);
415 } catch (final IOException ex) {
416 throw new RDFHandlerException(ex);
417 }
418 }
419
420 @Override
421 public void handleStatement(final Statement statement) throws RDFHandlerException {
422 try {
423 this.sorter.emit(statement);
424 } catch (final Throwable ex) {
425 throw new RDFHandlerException(ex);
426 }
427 }
428
429 @Override
430 public void endRDF() throws RDFHandlerException {
431 try {
432 this.sorter.end(this.parallelize, new Consumer<Statement>() {
433
434 @Override
435 public void accept(final Statement statement) {
436 try {
437 handleStatementSorted(statement);
438 } catch (final RDFHandlerException ex) {
439 throw new RuntimeException(ex);
440 }
441 }
442
443 });
444 this.sorter.close();
445 this.sorter = null;
446 handleEndRDF();
447 } catch (final IOException ex) {
448 throw new RDFHandlerException(ex);
449 }
450 super.endRDF();
451 }
452
453 @Override
454 public final void close() {
455 IO.closeQuietly(this.sorter);
456 super.close();
457 }
458
459 void handleStatementSorted(final Statement statement) throws RDFHandlerException {
460 this.handler.handleStatement(statement);
461 }
462
463 void handleEndRDF() throws RDFHandlerException {
464 }
465
466 }
467
468 private static final class MergeHandler extends Handler {
469
470 private final Map<Resource, List<Statement>> contextsStatements;
471
472 private final Map<ContextSet, Resource> mergedContexts;
473
474 @Nullable
475 private Statement statement;
476
477 @Nullable
478 private Resource statementSubj;
479
480 @Nullable
481 private URI statementPred;
482
483 @Nullable
484 private Value statementObj;
485
486 @Nullable
487 private Resource statementCtx; // if there is only a context;
488
489 private final Set<Resource> statementContexts; // if there are multiple contexts
490
491 public MergeHandler(final RDFHandler handler) {
492 super(handler, false);
493 this.contextsStatements = new ConcurrentHashMap<>();
494 this.mergedContexts = new HashMap<>();
495 this.statementSubj = null;
496 this.statementPred = null;
497 this.statementObj = null;
498 this.statementCtx = null;
499 this.statementContexts = new HashSet<>();
500 }
501
502 @Override
503 public void handleStatement(final Statement statement) throws RDFHandlerException {
504 super.handleStatement(statement);
505 final Resource context = statement.getContext();
506 if (context != null) {
507 this.contextsStatements.putIfAbsent(context, Collections.<Statement>emptyList());
508 }
509 }
510
511 @Override
512 void handleStatementSorted(final Statement statement) throws RDFHandlerException {
513
514 final Resource subj = statement.getSubject();
515 final URI pred = statement.getPredicate();
516 final Value obj = statement.getObject();
517 final Resource ctx = statement.getContext();
518
519 List<Statement> contextStatements = this.contextsStatements.get(subj);
520 if (contextStatements != null) {
521 if (contextStatements.isEmpty()) {
522 contextStatements = new ArrayList<>();
523 this.contextsStatements.put(subj, contextStatements);
524 }
525 contextStatements.add(statement); // context data buffered and emitted later
526
527 } else if (subj.equals(this.statementSubj) && pred.equals(this.statementPred)
528 && obj.equals(this.statementObj)) {
529 if (this.statementCtx != null) {
530 if (ctx == null) {
531 this.statementCtx = null;
532 this.statement = statement;
533 } else {
534 if (this.statementContexts.isEmpty()) {
535 // we add the context firstly seen only now, so to avoid useless work
536 // in the frequent case the input contains almost unique statements
537 this.statementContexts.add(this.statementCtx);
538 }
539 this.statementContexts.add(ctx);
540 }
541 }
542
543 } else {
544 flush();
545 this.statement = statement;
546 this.statementSubj = subj;
547 this.statementPred = pred;
548 this.statementObj = obj;
549 this.statementCtx = ctx;
550 this.statementContexts.clear();
551 }
552 }
553
554 @Override
555 void handleEndRDF() throws RDFHandlerException {
556 flush();
557 for (final List<Statement> statements : this.contextsStatements.values()) {
558 for (final Statement statement : statements) {
559 this.handler.handleStatement(statement);
560 }
561 }
562 for (final Map.Entry<ContextSet, Resource> entry : this.mergedContexts.entrySet()) {
563 final ContextSet set = entry.getKey();
564 final Resource context = entry.getValue();
565 final Set<Statement> statements = new HashSet<Statement>();
566 for (final Resource source : set.contexts) {
567 for (final Statement statement : this.contextsStatements.get(source)) {
568 statements.add(Statements.VALUE_FACTORY.createStatement(context,
569 statement.getPredicate(), statement.getObject(),
570 statement.getContext()));
571 }
572 }
573 for (final Statement statement : statements) {
574 this.handler.handleStatement(statement);
575 }
576 }
577 }
578
579 private void flush() throws RDFHandlerException {
580 if (this.statement != null) {
581 Statement statement;
582 if (this.statementCtx == null || this.statementContexts.size() <= 1) {
583 statement = this.statement;
584 } else {
585 final Resource mergedContext = mergeContexts(this.statementContexts);
586 statement = mergedContext.equals(this.statement.getContext()) ? this.statement
587 : Statements.VALUE_FACTORY.createStatement(this.statementSubj,
588 this.statementPred, this.statementObj, mergedContext);
589 }
590 this.handler.handleStatement(statement);
591 }
592 }
593
594 @Nullable
595 private Resource mergeContexts(final Set<Resource> contexts) {
596
597 final ContextSet set = new ContextSet(contexts.toArray(new Resource[contexts.size()]));
598
599 Resource context = this.mergedContexts.get(set);
600 if (context == null) {
601 final String[] args = new String[contexts.size()];
602 String namespace = null;
603 int index = 0;
604 for (final Resource source : contexts) {
605 args[index++] = source.stringValue();
606 if (source instanceof URI) {
607 final String ns = ((URI) source).getNamespace();
608 if (namespace == null) {
609 namespace = ns;
610 } else {
611 final int length = Math.min(ns.length(), namespace.length());
612 for (int i = 0; i < length; ++i) {
613 if (ns.charAt(i) != namespace.charAt(i)) {
614 namespace = ns.substring(0, i);
615 break;
616 }
617 }
618 }
619 }
620 }
621 Arrays.sort(args);
622 if (namespace == null || "".equals(namespace)) {
623 namespace = "urn:graph:";
624 } else if (!namespace.endsWith("/") && !namespace.endsWith("#")
625 && !namespace.endsWith(":")) {
626 namespace = namespace + "/";
627 }
628 final String localName = Hash.murmur3(args).toString();
629 context = Statements.VALUE_FACTORY.createURI(namespace, localName);
630 this.mergedContexts.put(set, context);
631 }
632 return context;
633 }
634
635 private static final class ContextSet {
636
637 Resource[] contexts;
638
639 int hash;
640
641 ContextSet(final Resource[] contexts) {
642
643 int hash = 0;
644 for (final Resource context : contexts) {
645 hash += context.hashCode();
646 }
647
648 this.contexts = contexts;
649 this.hash = hash;
650 }
651
652 @Override
653 public boolean equals(final Object object) {
654 if (object == this) {
655 return true;
656 }
657 if (!(object instanceof ContextSet)) {
658 return false;
659 }
660 final ContextSet other = (ContextSet) object;
661 if (this.hash != other.hash || this.contexts.length != other.contexts.length) {
662 return false;
663 }
664 final boolean[] matched = new boolean[this.contexts.length];
665 outer: for (int i = 0; i < this.contexts.length; ++i) {
666 final Resource thisContext = this.contexts[i];
667 for (int j = 0; j < this.contexts.length; ++j) {
668 if (!matched[j]) {
669 final Resource otherContext = other.contexts[j];
670 if (thisContext.equals(otherContext)) {
671 matched[j] = true;
672 continue outer;
673 }
674 }
675 }
676 return false;
677 }
678 return true;
679 }
680
681 @Override
682 public int hashCode() {
683 return this.hash;
684 }
685
686 }
687
688 }
689
690 }