1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.io.IOException;
17  import java.util.ArrayList;
18  import java.util.Arrays;
19  import java.util.Collections;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Objects;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.function.Consumer;
28  
29  import javax.annotation.Nullable;
30  
31  import org.openrdf.model.Resource;
32  import org.openrdf.model.Statement;
33  import org.openrdf.model.URI;
34  import org.openrdf.model.Value;
35  import org.openrdf.rio.RDFHandler;
36  import org.openrdf.rio.RDFHandlerException;
37  
38  import eu.fbk.rdfpro.util.Hash;
39  import eu.fbk.rdfpro.util.IO;
40  import eu.fbk.rdfpro.util.Sorter;
41  import eu.fbk.rdfpro.util.Statements;
42  
43  final class ProcessorUnique implements RDFProcessor {
44  
45      private final boolean mergeContexts;
46  
47      ProcessorUnique(final boolean mergeContexts) {
48          this.mergeContexts = mergeContexts;
49      }
50  
51      @SuppressWarnings("resource")
52      @Override
53      public RDFHandler wrap(final RDFHandler handler) {
54          Objects.requireNonNull(handler);
55          return this.mergeContexts ? new MergeHandler(RDFHandlers.decouple(handler)) //
56                  : new Handler(handler, true);
57      }
58  
59      // private static final class KeepContextsHandler extends AbstractRDFHandlerWrapper {
60      //
61      // private final int threshold;
62      //
63      // private StatementDeduplicator deduplicator;
64      //
65      // private AtomicInteger count;
66      //
67      // private Sorter<Statement> sorter;
68      //
69      // KeepContextsHandler(final RDFHandler handler) {
70      // super(handler);
71      // this.threshold = (int) (Runtime.getRuntime().freeMemory() / 2 / 24);
72      // this.deduplicator = null;
73      // this.sorter = null;
74      // this.count = null;
75      // }
76      //
77      // @Override
78      // public void startRDF() throws RDFHandlerException {
79      // super.startRDF();
80      // this.deduplicator = StatementDeduplicator.newHashDeduplicator();
81      // this.count = new AtomicInteger(0);
82      // this.sorter = Sorter.newStatementSorter(true);
83      // try {
84      // this.sorter.start(true);
85      // } catch (final IOException ex) {
86      // throw new RDFHandlerException(ex);
87      // }
88      // }
89      //
90      // @Override
91      // public void handleStatement(final Statement stmt) throws RDFHandlerException {
92      // try {
93      // if (this.deduplicator.isNew(stmt)) {
94      // final int count = this.count.incrementAndGet();
95      // }
96      //
97      // this.sorter.emit(statement);
98      // } catch (final Throwable ex) {
99      // throw new RDFHandlerException(ex);
100     // }
101     // }
102     //
103     // @Override
104     // public void endRDF() throws RDFHandlerException {
105     // try {
106     // this.sorter.end(true, new Consumer<Statement>() {
107     //
108     // @Override
109     // public void accept(final Statement statement) {
110     // try {
111     // KeepContextsHandler.this.handler.handleStatement(statement);
112     // } catch (final RDFHandlerException ex) {
113     // throw new RuntimeException(ex);
114     // }
115     // }
116     //
117     // });
118     // this.sorter.close();
119     // this.sorter = null;
120     // } catch (final IOException ex) {
121     // throw new RDFHandlerException(ex);
122     // }
123     // super.endRDF();
124     // }
125     //
126     // @Override
127     // public final void close() {
128     // IO.closeQuietly(this.sorter);
129     // super.close();
130     // }
131     //
132     // }
133 
134     // private static final class MergeContextsHandler extends AbstractRDFHandlerWrapper {
135     //
136     // private final Map<Resource, List<Statement>> contextsStatements;
137     //
138     // private final Map<ContextSet, Resource> mergedContexts;
139     //
140     // @Nullable
141     // private Sorter<Statement> sorter;
142     //
143     // @Nullable
144     // private Statement statement;
145     //
146     // @Nullable
147     // private Resource statementSubj;
148     //
149     // @Nullable
150     // private URI statementPred;
151     //
152     // @Nullable
153     // private Value statementObj;
154     //
155     // @Nullable
156     // private Resource statementCtx; // if there is only a context;
157     //
158     // private final Set<Resource> statementContexts; // if there are multiple contexts
159     //
160     // public MergeContextsHandler(final RDFHandler handler) {
161     // super(handler);
162     // this.sorter = null;
163     // this.contextsStatements = new ConcurrentHashMap<>();
164     // this.mergedContexts = new HashMap<>();
165     // this.statementSubj = null;
166     // this.statementPred = null;
167     // this.statementObj = null;
168     // this.statementCtx = null;
169     // this.statementContexts = new HashSet<>();
170     // }
171     //
172     // @Override
173     // public void startRDF() throws RDFHandlerException {
174     // super.startRDF();
175     // this.sorter = Sorter.newStatementSorter(true);
176     // try {
177     // this.sorter.start(true);
178     // } catch (final IOException ex) {
179     // throw new RDFHandlerException(ex);
180     // }
181     // }
182     //
183     // @Override
184     // public void handleStatement(final Statement statement) throws RDFHandlerException {
185     // try {
186     // this.sorter.emit(statement);
187     // } catch (final Throwable ex) {
188     // throw new RDFHandlerException(ex);
189     // }
190     // final Resource context = statement.getContext();
191     // if (context != null) {
192     // this.contextsStatements.putIfAbsent(context, Collections.<Statement>emptyList());
193     // }
194     // }
195     //
196     // @Override
197     // public void endRDF() throws RDFHandlerException {
198     // try {
199     // this.sorter.end(false, new Consumer<Statement>() {
200     //
201     // @Override
202     // public void accept(final Statement statement) {
203     // try {
204     // handleStatementSorted(statement);
205     // } catch (final RDFHandlerException ex) {
206     // throw new RuntimeException(ex);
207     // }
208     // }
209     //
210     // });
211     // this.sorter.close();
212     // this.sorter = null;
213     // handleEndRDF();
214     // } catch (final IOException ex) {
215     // throw new RDFHandlerException(ex);
216     // }
217     // super.endRDF();
218     // }
219     //
220     // void handleStatementSorted(final Statement statement) throws RDFHandlerException {
221     //
222     // final Resource subj = statement.getSubject();
223     // final URI pred = statement.getPredicate();
224     // final Value obj = statement.getObject();
225     // final Resource ctx = statement.getContext();
226     //
227     // List<Statement> contextStatements = this.contextsStatements.get(subj);
228     // if (contextStatements != null) {
229     // if (contextStatements.isEmpty()) {
230     // contextStatements = new ArrayList<>();
231     // this.contextsStatements.put(subj, contextStatements);
232     // }
233     // contextStatements.add(statement); // context data buffered and emitted later
234     //
235     // } else if (subj.equals(this.statementSubj) && pred.equals(this.statementPred)
236     // && obj.equals(this.statementObj)) {
237     // if (this.statementCtx != null) {
238     // if (ctx == null) {
239     // this.statementCtx = null;
240     // this.statement = statement;
241     // } else {
242     // if (this.statementContexts.isEmpty()) {
243     // // we add the context firstly seen only now, so to avoid useless work
244     // // in the frequent case the input contains almost unique statements
245     // this.statementContexts.add(this.statementCtx);
246     // }
247     // this.statementContexts.add(ctx);
248     // }
249     // }
250     //
251     // } else {
252     // flush();
253     // this.statement = statement;
254     // this.statementSubj = subj;
255     // this.statementPred = pred;
256     // this.statementObj = obj;
257     // this.statementCtx = ctx;
258     // this.statementContexts.clear();
259     // }
260     // }
261     //
262     // void handleEndRDF() throws RDFHandlerException {
263     // flush();
264     // for (final List<Statement> statements : this.contextsStatements.values()) {
265     // for (final Statement statement : statements) {
266     // this.handler.handleStatement(statement);
267     // }
268     // }
269     // for (final Map.Entry<ContextSet, Resource> entry : this.mergedContexts.entrySet()) {
270     // final ContextSet set = entry.getKey();
271     // final Resource context = entry.getValue();
272     // final Set<Statement> statements = new HashSet<Statement>();
273     // for (final Resource source : set.contexts) {
274     // for (final Statement statement : this.contextsStatements.get(source)) {
275     // statements.add(Statements.VALUE_FACTORY.createStatement(context,
276     // statement.getPredicate(), statement.getObject(),
277     // statement.getContext()));
278     // }
279     // }
280     // for (final Statement statement : statements) {
281     // this.handler.handleStatement(statement);
282     // }
283     // }
284     // }
285     //
286     // private void flush() throws RDFHandlerException {
287     // if (this.statement != null) {
288     // Statement statement;
289     // if (this.statementCtx == null || this.statementContexts.size() <= 1) {
290     // statement = this.statement;
291     // } else {
292     // final Resource mergedContext = mergeContexts(this.statementContexts);
293     // statement = mergedContext.equals(this.statement.getContext()) ? this.statement
294     // : Statements.VALUE_FACTORY.createStatement(this.statementSubj,
295     // this.statementPred, this.statementObj, mergedContext);
296     // }
297     // this.handler.handleStatement(statement);
298     // }
299     // }
300     //
301     // @Nullable
302     // private Resource mergeContexts(final Set<Resource> contexts) {
303     //
304     // final ContextSet set = new ContextSet(contexts.toArray(new Resource[contexts.size()]));
305     //
306     // Resource context = this.mergedContexts.get(set);
307     // if (context == null) {
308     // final String[] args = new String[contexts.size()];
309     // String namespace = null;
310     // int index = 0;
311     // for (final Resource source : contexts) {
312     // args[index++] = source.stringValue();
313     // if (source instanceof URI) {
314     // final String ns = ((URI) source).getNamespace();
315     // if (namespace == null) {
316     // namespace = ns;
317     // } else {
318     // final int length = Math.min(ns.length(), namespace.length());
319     // for (int i = 0; i < length; ++i) {
320     // if (ns.charAt(i) != namespace.charAt(i)) {
321     // namespace = ns.substring(0, i);
322     // break;
323     // }
324     // }
325     // }
326     // }
327     // }
328     // Arrays.sort(args);
329     // if (namespace == null || "".equals(namespace)) {
330     // namespace = "urn:graph:";
331     // } else if (!namespace.endsWith("/") && !namespace.endsWith("#")
332     // && !namespace.endsWith(":")) {
333     // namespace = namespace + "/";
334     // }
335     // final String localName = Hash.murmur3(args).toString();
336     // context = Statements.VALUE_FACTORY.createURI(namespace, localName);
337     // this.mergedContexts.put(set, context);
338     // }
339     // return context;
340     // }
341     //
342     // private static final class ContextSet {
343     //
344     // Resource[] contexts;
345     //
346     // int hash;
347     //
348     // ContextSet(final Resource[] contexts) {
349     //
350     // int hash = 0;
351     // for (final Resource context : contexts) {
352     // hash += context.hashCode();
353     // }
354     //
355     // this.contexts = contexts;
356     // this.hash = hash;
357     // }
358     //
359     // @Override
360     // public boolean equals(final Object object) {
361     // if (object == this) {
362     // return true;
363     // }
364     // if (!(object instanceof ContextSet)) {
365     // return false;
366     // }
367     // final ContextSet other = (ContextSet) object;
368     // if (this.hash != other.hash || this.contexts.length != other.contexts.length) {
369     // return false;
370     // }
371     // final boolean[] matched = new boolean[this.contexts.length];
372     // outer: for (int i = 0; i < this.contexts.length; ++i) {
373     // final Resource thisContext = this.contexts[i];
374     // for (int j = 0; j < this.contexts.length; ++j) {
375     // if (!matched[j]) {
376     // final Resource otherContext = other.contexts[j];
377     // if (thisContext.equals(otherContext)) {
378     // matched[j] = true;
379     // continue outer;
380     // }
381     // }
382     // }
383     // return false;
384     // }
385     // return true;
386     // }
387     //
388     // @Override
389     // public int hashCode() {
390     // return this.hash;
391     // }
392     //
393     // }
394     //
395     // }
396 
397     private static class Handler extends AbstractRDFHandlerWrapper {
398 
399         private final boolean parallelize;
400 
401         private Sorter<Statement> sorter;
402 
403         Handler(final RDFHandler handler, final boolean parallelize) {
404             super(handler);
405             this.parallelize = parallelize;
406             this.sorter = null;
407         }
408 
409         @Override
410         public void startRDF() throws RDFHandlerException {
411             super.startRDF();
412             this.sorter = Sorter.newStatementSorter(true);
413             try {
414                 this.sorter.start(true);
415             } catch (final IOException ex) {
416                 throw new RDFHandlerException(ex);
417             }
418         }
419 
420         @Override
421         public void handleStatement(final Statement statement) throws RDFHandlerException {
422             try {
423                 this.sorter.emit(statement);
424             } catch (final Throwable ex) {
425                 throw new RDFHandlerException(ex);
426             }
427         }
428 
429         @Override
430         public void endRDF() throws RDFHandlerException {
431             try {
432                 this.sorter.end(this.parallelize, new Consumer<Statement>() {
433 
434                     @Override
435                     public void accept(final Statement statement) {
436                         try {
437                             handleStatementSorted(statement);
438                         } catch (final RDFHandlerException ex) {
439                             throw new RuntimeException(ex);
440                         }
441                     }
442 
443                 });
444                 this.sorter.close();
445                 this.sorter = null;
446                 handleEndRDF();
447             } catch (final IOException ex) {
448                 throw new RDFHandlerException(ex);
449             }
450             super.endRDF();
451         }
452 
453         @Override
454         public final void close() {
455             IO.closeQuietly(this.sorter);
456             super.close();
457         }
458 
459         void handleStatementSorted(final Statement statement) throws RDFHandlerException {
460             this.handler.handleStatement(statement);
461         }
462 
463         void handleEndRDF() throws RDFHandlerException {
464         }
465 
466     }
467 
468     private static final class MergeHandler extends Handler {
469 
470         private final Map<Resource, List<Statement>> contextsStatements;
471 
472         private final Map<ContextSet, Resource> mergedContexts;
473 
474         @Nullable
475         private Statement statement;
476 
477         @Nullable
478         private Resource statementSubj;
479 
480         @Nullable
481         private URI statementPred;
482 
483         @Nullable
484         private Value statementObj;
485 
486         @Nullable
487         private Resource statementCtx; // if there is only a context;
488 
489         private final Set<Resource> statementContexts; // if there are multiple contexts
490 
491         public MergeHandler(final RDFHandler handler) {
492             super(handler, false);
493             this.contextsStatements = new ConcurrentHashMap<>();
494             this.mergedContexts = new HashMap<>();
495             this.statementSubj = null;
496             this.statementPred = null;
497             this.statementObj = null;
498             this.statementCtx = null;
499             this.statementContexts = new HashSet<>();
500         }
501 
502         @Override
503         public void handleStatement(final Statement statement) throws RDFHandlerException {
504             super.handleStatement(statement);
505             final Resource context = statement.getContext();
506             if (context != null) {
507                 this.contextsStatements.putIfAbsent(context, Collections.<Statement>emptyList());
508             }
509         }
510 
511         @Override
512         void handleStatementSorted(final Statement statement) throws RDFHandlerException {
513 
514             final Resource subj = statement.getSubject();
515             final URI pred = statement.getPredicate();
516             final Value obj = statement.getObject();
517             final Resource ctx = statement.getContext();
518 
519             List<Statement> contextStatements = this.contextsStatements.get(subj);
520             if (contextStatements != null) {
521                 if (contextStatements.isEmpty()) {
522                     contextStatements = new ArrayList<>();
523                     this.contextsStatements.put(subj, contextStatements);
524                 }
525                 contextStatements.add(statement); // context data buffered and emitted later
526 
527             } else if (subj.equals(this.statementSubj) && pred.equals(this.statementPred)
528                     && obj.equals(this.statementObj)) {
529                 if (this.statementCtx != null) {
530                     if (ctx == null) {
531                         this.statementCtx = null;
532                         this.statement = statement;
533                     } else {
534                         if (this.statementContexts.isEmpty()) {
535                             // we add the context firstly seen only now, so to avoid useless work
536                             // in the frequent case the input contains almost unique statements
537                             this.statementContexts.add(this.statementCtx);
538                         }
539                         this.statementContexts.add(ctx);
540                     }
541                 }
542 
543             } else {
544                 flush();
545                 this.statement = statement;
546                 this.statementSubj = subj;
547                 this.statementPred = pred;
548                 this.statementObj = obj;
549                 this.statementCtx = ctx;
550                 this.statementContexts.clear();
551             }
552         }
553 
554         @Override
555         void handleEndRDF() throws RDFHandlerException {
556             flush();
557             for (final List<Statement> statements : this.contextsStatements.values()) {
558                 for (final Statement statement : statements) {
559                     this.handler.handleStatement(statement);
560                 }
561             }
562             for (final Map.Entry<ContextSet, Resource> entry : this.mergedContexts.entrySet()) {
563                 final ContextSet set = entry.getKey();
564                 final Resource context = entry.getValue();
565                 final Set<Statement> statements = new HashSet<Statement>();
566                 for (final Resource source : set.contexts) {
567                     for (final Statement statement : this.contextsStatements.get(source)) {
568                         statements.add(Statements.VALUE_FACTORY.createStatement(context,
569                                 statement.getPredicate(), statement.getObject(),
570                                 statement.getContext()));
571                     }
572                 }
573                 for (final Statement statement : statements) {
574                     this.handler.handleStatement(statement);
575                 }
576             }
577         }
578 
579         private void flush() throws RDFHandlerException {
580             if (this.statement != null) {
581                 Statement statement;
582                 if (this.statementCtx == null || this.statementContexts.size() <= 1) {
583                     statement = this.statement;
584                 } else {
585                     final Resource mergedContext = mergeContexts(this.statementContexts);
586                     statement = mergedContext.equals(this.statement.getContext()) ? this.statement
587                             : Statements.VALUE_FACTORY.createStatement(this.statementSubj,
588                                     this.statementPred, this.statementObj, mergedContext);
589                 }
590                 this.handler.handleStatement(statement);
591             }
592         }
593 
594         @Nullable
595         private Resource mergeContexts(final Set<Resource> contexts) {
596 
597             final ContextSet set = new ContextSet(contexts.toArray(new Resource[contexts.size()]));
598 
599             Resource context = this.mergedContexts.get(set);
600             if (context == null) {
601                 final String[] args = new String[contexts.size()];
602                 String namespace = null;
603                 int index = 0;
604                 for (final Resource source : contexts) {
605                     args[index++] = source.stringValue();
606                     if (source instanceof URI) {
607                         final String ns = ((URI) source).getNamespace();
608                         if (namespace == null) {
609                             namespace = ns;
610                         } else {
611                             final int length = Math.min(ns.length(), namespace.length());
612                             for (int i = 0; i < length; ++i) {
613                                 if (ns.charAt(i) != namespace.charAt(i)) {
614                                     namespace = ns.substring(0, i);
615                                     break;
616                                 }
617                             }
618                         }
619                     }
620                 }
621                 Arrays.sort(args);
622                 if (namespace == null || "".equals(namespace)) {
623                     namespace = "urn:graph:";
624                 } else if (!namespace.endsWith("/") && !namespace.endsWith("#")
625                         && !namespace.endsWith(":")) {
626                     namespace = namespace + "/";
627                 }
628                 final String localName = Hash.murmur3(args).toString();
629                 context = Statements.VALUE_FACTORY.createURI(namespace, localName);
630                 this.mergedContexts.put(set, context);
631             }
632             return context;
633         }
634 
635         private static final class ContextSet {
636 
637             Resource[] contexts;
638 
639             int hash;
640 
641             ContextSet(final Resource[] contexts) {
642 
643                 int hash = 0;
644                 for (final Resource context : contexts) {
645                     hash += context.hashCode();
646                 }
647 
648                 this.contexts = contexts;
649                 this.hash = hash;
650             }
651 
652             @Override
653             public boolean equals(final Object object) {
654                 if (object == this) {
655                     return true;
656                 }
657                 if (!(object instanceof ContextSet)) {
658                     return false;
659                 }
660                 final ContextSet other = (ContextSet) object;
661                 if (this.hash != other.hash || this.contexts.length != other.contexts.length) {
662                     return false;
663                 }
664                 final boolean[] matched = new boolean[this.contexts.length];
665                 outer: for (int i = 0; i < this.contexts.length; ++i) {
666                     final Resource thisContext = this.contexts[i];
667                     for (int j = 0; j < this.contexts.length; ++j) {
668                         if (!matched[j]) {
669                             final Resource otherContext = other.contexts[j];
670                             if (thisContext.equals(otherContext)) {
671                                 matched[j] = true;
672                                 continue outer;
673                             }
674                         }
675                     }
676                     return false;
677                 }
678                 return true;
679             }
680 
681             @Override
682             public int hashCode() {
683                 return this.hash;
684             }
685 
686         }
687 
688     }
689 
690 }