1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2015 by Francesco Corcoglioniti with support by Alessio Palmero Aprosio and Marco
5    * Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro.util;
15  
16  import javax.annotation.Nullable;
17  
18  import com.google.common.base.Preconditions;
19  
20  import org.openrdf.model.Resource;
21  import org.openrdf.model.Statement;
22  import org.openrdf.model.URI;
23  import org.openrdf.model.Value;
24  import org.openrdf.rio.RDFHandler;
25  import org.openrdf.rio.RDFHandlerException;
26  
27  import eu.fbk.rdfpro.AbstractRDFHandlerWrapper;
28  
29  public abstract class StatementDeduplicator {
30  
31      // initial capacity of hash tables (num statements): 64-128 seems a good range when doing RDFS
32      // inference - lower values cause a lot of rehashing; higher values impact negatively on cache
33      // locality
34      private static final int INITIAL_TABLE_SIZE = 64;
35  
36      private static final int LOCK_NUM = 64;
37  
38      private static final int LOCK_MASK = 0x3F;
39  
40      public static StatementDeduplicator newTotalDeduplicator(final ComparisonMethod method) {
41  
42          Preconditions.checkNotNull(method);
43  
44          if (method == ComparisonMethod.EQUALS) {
45              return new TotalEqualsDeduplicator();
46          } else if (method == ComparisonMethod.HASH) {
47              return new TotalHashDeduplicator();
48          } else if (method == ComparisonMethod.IDENTITY) {
49              return new TotalIdentityDeduplicator();
50          } else {
51              throw new Error("Unexpected method " + method);
52          }
53      }
54  
55      public static StatementDeduplicator newPartialDeduplicator(final ComparisonMethod method,
56              final int numCachedStatements) {
57  
58          Preconditions.checkNotNull(method);
59          Preconditions.checkArgument(numCachedStatements > 0);
60  
61          if (method == ComparisonMethod.EQUALS) {
62              return new PartialEqualsDeduplicator(numCachedStatements);
63          } else if (method == ComparisonMethod.HASH) {
64              return new PartialHashDeduplicator(numCachedStatements);
65          } else if (method == ComparisonMethod.IDENTITY) {
66              return new PartialIdentityDeduplicator(numCachedStatements);
67          } else {
68              throw new Error("Unexpected method " + method);
69          }
70      }
71  
72      public static StatementDeduplicator newChainedDeduplicator(
73              final StatementDeduplicator... deduplicators) {
74  
75          for (final StatementDeduplicator deduplicator : deduplicators) {
76              Preconditions.checkNotNull(deduplicator);
77          }
78  
79          return deduplicators.length == 1 ? deduplicators[0] : new ChainedDeduplicator(
80                  deduplicators.clone());
81      }
82  
83      public final boolean isTotal() {
84          return total();
85      }
86  
87      public final boolean test(final Statement stmt) {
88          return process(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(),
89                  stmt.getContext(), false);
90      }
91  
92      public final boolean test(final Resource subj, final URI pred, final Value obj,
93              @Nullable final Resource ctx) {
94          return process(subj, pred, obj, ctx, false);
95      }
96  
97      public final boolean add(final Statement stmt) {
98          return process(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(),
99                  stmt.getContext(), true);
100     }
101 
102     public final boolean add(final Resource subj, final URI pred, final Value obj,
103             @Nullable final Resource ctx) {
104         return process(subj, pred, obj, ctx, true);
105     }
106 
107     public final RDFHandler deduplicate(final RDFHandler handler, final boolean add) {
108         return new AbstractRDFHandlerWrapper(handler) {
109 
110             @Override
111             public void handleStatement(final Statement stmt) throws RDFHandlerException {
112                 if (process(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(),
113                         stmt.getContext(), add)) {
114                     super.handleStatement(stmt);
115                 }
116             }
117 
118         };
119     }
120 
    /**
     * Implementation hook backing {@link #isTotal()}.
     *
     * @return the value that {@code isTotal()} has to report for this deduplicator
     */
    abstract boolean total();
122 
    /**
     * Implementation hook backing the {@code test()} and {@code add()} methods and the handler
     * returned by {@code deduplicate()}. The {@code test()} methods call it with {@code mark}
     * set to {@code false}, the {@code add()} methods with {@code mark} set to {@code true}.
     *
     * @param subj
     *            the statement subject
     * @param pred
     *            the statement predicate
     * @param obj
     *            the statement object
     * @param ctx
     *            the statement context, possibly null
     * @param mark
     *            the flag distinguishing {@code add()} ({@code true}) from {@code test()}
     *            ({@code false}) invocations
     * @return the outcome to return to the caller; the handler created by {@code deduplicate()}
     *         forwards a statement only when this value is {@code true}
     */
    abstract boolean process(Resource subj, URI pred, Value obj, @Nullable Resource ctx,
            boolean mark);
125 
126     static Hash hash(final Resource subj, final URI pred, final Value obj, final Resource ctx) {
127         Hash hash = Hash.combine(Statements.getHash(subj), Statements.getHash(pred),
128                 Statements.getHash(obj), Statements.getHash(ctx));
129         if (hash.getLow() == 0) {
130             hash = Hash.fromLongs(hash.getHigh(), 1L);
131         }
132         return hash;
133     }
134 
135     private static Object[] rehash(final int[] hashes, final Value[] values) {
136         final int[] newHashes = new int[hashes.length * 2];
137         final Value[] newValues = new Value[values.length * 2];
138         for (int hashIndex = 0; hashIndex < hashes.length; ++hashIndex) {
139             final int valueIndex = hashIndex << 2;
140             final int hash = hashes[hashIndex];
141             int newHashIndex = (hash & 0x7FFFFFFF) % newHashes.length;
142             while (newHashes[newHashIndex] != 0) {
143                 newHashIndex++;
144                 if (newHashIndex >= newHashes.length) {
145                     newHashIndex = 0;
146                 }
147             }
148             newHashes[newHashIndex] = hash;
149             final int newValueIndex = newHashIndex << 2;
150             System.arraycopy(values, valueIndex, newValues, newValueIndex, 4);
151         }
152         return new Object[] { newHashes, newValues };
153     }
154 
    /**
     * The way a deduplicator decides whether two statements are the same statement.
     */
    public static enum ComparisonMethod {

        /** Statement components are compared by reference ({@code ==}). */
        IDENTITY,

        /** Statement components are compared using {@code equals()}. */
        EQUALS,

        /** Statements are compared through the hash computed from their components. */
        HASH

    }
164 
165     private static final class TotalHashDeduplicator extends StatementDeduplicator {
166 
167         private final Table[] tables;
168 
169         TotalHashDeduplicator() {
170             this.tables = new Table[64];
171             for (int i = 0; i < this.tables.length; ++i) {
172                 this.tables[i] = new Table();
173             }
174         }
175 
176         @Override
177         boolean total() {
178             return true;
179         }
180 
181         @Override
182         boolean process(final Resource subj, final URI pred, final Value obj, final Resource ctx,
183                 final boolean add) {
184 
185             final Hash hash = hash(subj, pred, obj, ctx);
186 
187             final long lo = hash.getLow();
188             final long hi = hash.getHigh();
189 
190             return this.tables[(int) hi & 0x3F].process(lo, hi, add);
191         }
192 
193         private static class Table {
194 
195             private long[] hashes;
196 
197             private int size;
198 
199             Table() {
200                 this.hashes = new long[2 * INITIAL_TABLE_SIZE];
201                 this.size = 0;
202             }
203 
204             synchronized boolean process(final long lo, final long hi, final boolean add) {
205 
206                 int slot = ((int) lo & 0x7FFFFFFF) % (this.hashes.length >>> 1) << 1;
207                 while (true) {
208                     final long storedLo = this.hashes[slot];
209                     final long storedHi = this.hashes[slot + 1];
210                     if (storedLo == 0L) {
211                         if (add) {
212                             this.hashes[slot] = lo;
213                             this.hashes[slot + 1] = hi;
214                             ++this.size;
215                             if (this.size >= this.hashes.length / 3) { // fill factor 0.66
216                                 rehash();
217                             }
218                         }
219                         return true;
220 
221                     } else if (storedLo == lo && storedHi == hi) {
222                         return false;
223 
224                     } else {
225                         slot += 2;
226                         if (slot >= this.hashes.length) {
227                             slot = 0;
228                         }
229                     }
230                 }
231             }
232 
233             private void rehash() {
234                 final long[] newTable = new long[this.hashes.length * 2];
235                 for (int slot = 0; slot < this.hashes.length; slot += 2) {
236                     final long lo = this.hashes[slot];
237                     final long hi = this.hashes[slot + 1];
238                     int newSlot = ((int) lo & 0x7FFFFFFF) % (newTable.length >>> 1) << 1;
239                     while (newTable[newSlot] != 0L) {
240                         newSlot += 2;
241                         if (newSlot >= newTable.length) {
242                             newSlot = 0;
243                         }
244                     }
245                     newTable[newSlot] = lo;
246                     newTable[newSlot + 1] = hi;
247                 }
248                 this.hashes = newTable;
249             }
250 
251         }
252 
253     }
254 
255     private static final class TotalEqualsDeduplicator extends StatementDeduplicator {
256 
257         private int[] hashes; // SPOC hashes
258 
259         private Value[] values; // 4 values for each statement
260 
261         private int size; // # statements
262 
263         TotalEqualsDeduplicator() {
264             this.hashes = new int[INITIAL_TABLE_SIZE];
265             this.values = new Value[INITIAL_TABLE_SIZE * 4];
266         }
267 
268         @Override
269         boolean total() {
270             return true;
271         }
272 
273         @Override
274         boolean process(final Resource subj, final URI pred, final Value obj, final Resource ctx,
275                 final boolean add) {
276 
277             int hash = 6661 * subj.hashCode() + 961 * pred.hashCode() + 31 * obj.hashCode()
278                     + (ctx == null ? 0 : ctx.hashCode());
279             hash = hash != 0 ? hash : 1;
280 
281             synchronized (this) {
282                 int hashIndex = (hash & 0x7FFFFFFF) % this.hashes.length;
283                 while (true) {
284                     if (this.hashes[hashIndex] == 0) {
285                         if (add) {
286                             final int valueIndex = hashIndex << 2;
287                             this.hashes[hashIndex] = hash;
288                             this.values[valueIndex] = subj;
289                             this.values[valueIndex + 1] = pred;
290                             this.values[valueIndex + 2] = obj;
291                             this.values[valueIndex + 3] = ctx;
292                             ++this.size;
293                             if (this.size >= (this.hashes.length << 1) / 3) { // fill factor 0.66
294                                 final Object[] pair = rehash(this.hashes, this.values);
295                                 this.hashes = (int[]) pair[0];
296                                 this.values = (Value[]) pair[1];
297                             }
298                         }
299                         return true;
300 
301                     } else if (this.hashes[hashIndex] == hash) {
302                         final int valueIndex = hashIndex << 2;
303                         if (subj.equals(this.values[valueIndex])
304                                 && pred.equals(this.values[valueIndex + 1])
305                                 && obj.equals(this.values[valueIndex + 2])
306                                 && (ctx == null && this.values[valueIndex + 3] == null || ctx != null
307                                         && ctx.equals(this.values[valueIndex + 3]))) {
308                             return false;
309                         }
310                     }
311 
312                     ++hashIndex;
313                     if (hashIndex == this.hashes.length) {
314                         hashIndex = 0;
315                     }
316                 }
317             }
318         }
319 
320     }
321 
322     private static final class TotalIdentityDeduplicator extends StatementDeduplicator {
323 
324         private int[] hashes; // SPOC hashes
325 
326         private Value[] values; // 4 values for each statement
327 
328         private int size; // # statements
329 
330         TotalIdentityDeduplicator() {
331             this.hashes = new int[INITIAL_TABLE_SIZE];
332             this.values = new Value[INITIAL_TABLE_SIZE * 4];
333         }
334 
335         @Override
336         boolean total() {
337             return true;
338         }
339 
340         @Override
341         boolean process(final Resource subj, final URI pred, final Value obj, final Resource ctx,
342                 final boolean add) {
343 
344             int hash = 6661 * System.identityHashCode(subj) + 961 * System.identityHashCode(pred)
345                     + 31 * System.identityHashCode(obj)
346                     + (ctx == null ? 0 : System.identityHashCode(ctx));
347             hash = hash != 0 ? hash : 1;
348 
349             synchronized (this) {
350                 int hashIndex = (hash & 0x7FFFFFFF) % this.hashes.length;
351                 while (true) {
352                     if (this.hashes[hashIndex] == 0) {
353                         if (add) {
354                             final int valueIndex = hashIndex << 2;
355                             this.hashes[hashIndex] = hash;
356                             this.values[valueIndex] = subj;
357                             this.values[valueIndex + 1] = pred;
358                             this.values[valueIndex + 2] = obj;
359                             this.values[valueIndex + 3] = ctx;
360                             ++this.size;
361                             if (this.size >= (this.hashes.length << 1) / 3) { // fill factor 0.66
362                                 final Object[] pair = rehash(this.hashes, this.values);
363                                 this.hashes = (int[]) pair[0];
364                                 this.values = (Value[]) pair[1];
365                             }
366                         }
367                         return true;
368 
369                     } else if (this.hashes[hashIndex] == hash) {
370                         final int valueIndex = hashIndex << 2;
371                         if (subj == this.values[valueIndex] //
372                                 && pred == this.values[valueIndex + 1]
373                                 && obj == this.values[valueIndex + 2]
374                                 && ctx == this.values[valueIndex + 3]) {
375                             return false;
376                         }
377 
378                     }
379 
380                     ++hashIndex;
381                     if (hashIndex == this.hashes.length) {
382                         hashIndex = 0;
383                     }
384                 }
385             }
386         }
387 
388     }
389 
390     private static final class PartialHashDeduplicator extends StatementDeduplicator {
391 
392         private final long[] hashes;
393 
394         private final Object[] locks;
395 
396         PartialHashDeduplicator(final int numCachedStatements) {
397             this.hashes = new long[numCachedStatements * 2];
398             this.locks = new Object[LOCK_NUM];
399             for (int i = 0; i < LOCK_NUM; ++i) {
400                 this.locks[i] = new Object();
401             }
402         }
403 
404         @Override
405         boolean total() {
406             return false;
407         }
408 
409         @Override
410         boolean process(final Resource subj, final URI pred, final Value obj,
411                 @Nullable final Resource ctx, final boolean add) {
412 
413             final Hash hash = hash(subj, pred, obj, ctx);
414 
415             final long hi = hash.getHigh();
416             final long lo = hash.getLow();
417 
418             final int index = ((int) lo & 0x7FFFFFFF) % (this.hashes.length >>> 1) << 1;
419 
420             synchronized (this.locks[index & LOCK_MASK]) {
421                 if (this.hashes[index] == hi && this.hashes[index + 1] == lo) {
422                     return false;
423                 }
424                 if (add) {
425                     this.hashes[index] = hi;
426                     this.hashes[index + 1] = lo;
427                 }
428             }
429 
430             return true;
431         }
432 
433     }
434 
435     private static final class PartialIdentityDeduplicator extends StatementDeduplicator {
436 
437         private final int[] hashes;
438 
439         private final Value[] values;
440 
441         @Nullable
442         private final Object[] locks;
443 
444         PartialIdentityDeduplicator(final int numCachedStatements) {
445             this.hashes = new int[numCachedStatements];
446             this.values = new Value[numCachedStatements * 4];
447             this.locks = new Object[LOCK_NUM];
448             for (int i = 0; i < LOCK_NUM; ++i) {
449                 this.locks[i] = new Object();
450             }
451         }
452 
453         @Override
454         boolean total() {
455             return false;
456         }
457 
458         @Override
459         boolean process(final Resource subj, final URI pred, final Value obj,
460                 @Nullable final Resource ctx, final boolean add) {
461 
462             int hash = 6661 * System.identityHashCode(subj) + 961 * System.identityHashCode(pred)
463                     + 31 * System.identityHashCode(obj)
464                     + (ctx == null ? 0 : System.identityHashCode(ctx));
465             hash = hash != 0 ? hash : 1;
466 
467             final int hashIndex = (hash & 0x7FFFFFFF) % this.hashes.length;
468             final int valueIndex = hashIndex << 2;
469 
470             synchronized (this.locks[hash & LOCK_MASK]) {
471                 if (this.hashes[hashIndex] == hash //
472                         && subj == this.values[valueIndex]
473                         && pred == this.values[valueIndex + 1]
474                         && obj == this.values[valueIndex + 2]
475                         && ctx == this.values[valueIndex + 3]) {
476                     return false;
477                 }
478                 if (add) {
479                     this.hashes[hashIndex] = hash;
480                     this.values[valueIndex] = subj;
481                     this.values[valueIndex + 1] = pred;
482                     this.values[valueIndex + 2] = obj;
483                     this.values[valueIndex + 3] = ctx;
484                 }
485             }
486 
487             return true;
488         }
489 
490     }
491 
492     private static final class PartialEqualsDeduplicator extends StatementDeduplicator {
493 
494         private final int[] hashes;
495 
496         private final Value[] values;
497 
498         @Nullable
499         private final Object[] locks;
500 
501         PartialEqualsDeduplicator(final int numCachedStatements) {
502             this.hashes = new int[numCachedStatements];
503             this.values = new Value[numCachedStatements * 4];
504             this.locks = new Object[LOCK_NUM];
505             for (int i = 0; i < LOCK_NUM; ++i) {
506                 this.locks[i] = new Object();
507             }
508         }
509 
510         @Override
511         boolean total() {
512             return false;
513         }
514 
515         @Override
516         boolean process(final Resource subj, final URI pred, final Value obj,
517                 @Nullable final Resource ctx, final boolean add) {
518 
519             int hash = 6661 * subj.hashCode() + 961 * pred.hashCode() + 31 * obj.hashCode()
520                     + (ctx == null ? 0 : ctx.hashCode());
521             hash = hash != 0 ? hash : 1;
522 
523             final int hashIndex = (hash & 0x7FFFFFFF) % this.hashes.length;
524             final int valueIndex = hashIndex << 2;
525 
526             synchronized (this.locks[hash & LOCK_MASK]) {
527                 if (this.hashes[hashIndex] == hash
528                         && subj.equals(this.values[valueIndex])
529                         && pred.equals(this.values[valueIndex + 1])
530                         && obj.equals(this.values[valueIndex + 2])
531                         && (ctx == null && this.values[valueIndex + 3] == null || ctx != null
532                                 && ctx.equals(this.values[valueIndex + 3]))) {
533                     return false;
534                 }
535                 if (add) {
536                     this.hashes[hashIndex] = hash;
537                     this.values[valueIndex] = subj;
538                     this.values[valueIndex + 1] = pred;
539                     this.values[valueIndex + 2] = obj;
540                     this.values[valueIndex + 3] = ctx;
541                 }
542             }
543 
544             return true;
545         }
546 
547     }
548 
549     private static class ChainedDeduplicator extends StatementDeduplicator {
550 
551         private final StatementDeduplicator[] deduplicators;
552 
553         private final boolean total;
554 
555         ChainedDeduplicator(final StatementDeduplicator[] deduplicators) {
556 
557             boolean total = deduplicators.length > 0;
558             for (final StatementDeduplicator deduplicator : deduplicators) {
559                 if (!deduplicator.isTotal()) {
560                     total = false;
561                     break;
562                 }
563             }
564 
565             this.deduplicators = deduplicators;
566             this.total = total;
567         }
568 
569         @Override
570         boolean total() {
571             return this.total;
572         }
573 
574         @Override
575         boolean process(final Resource subj, final URI pred, final Value obj, final Resource ctx,
576                 final boolean add) {
577 
578             for (final StatementDeduplicator deduplicator : this.deduplicators) {
579                 if (!deduplicator.process(subj, pred, obj, ctx, true)) {
580                     return false;
581                 }
582             }
583             return true;
584         }
585 
586     }
587 
588 }