1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.nio.ByteBuffer;
17  import java.util.ArrayList;
18  import java.util.Comparator;
19  import java.util.List;
20  import java.util.Objects;
21  
22  import javax.annotation.Nullable;
23  
24  import org.openrdf.model.BNode;
25  import org.openrdf.model.Literal;
26  import org.openrdf.model.Resource;
27  import org.openrdf.model.Statement;
28  import org.openrdf.model.URI;
29  import org.openrdf.model.Value;
30  import org.openrdf.model.vocabulary.OWL;
31  import org.openrdf.rio.RDFHandler;
32  import org.openrdf.rio.RDFHandlerException;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import eu.fbk.rdfpro.util.Statements;
37  
38  final class ProcessorSmush implements RDFProcessor {
39  
40      private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorSmush.class);
41  
42      private static final int BUFFER_SIZE = 262144;
43  
44      private final String[] rankedNamespaces;
45  
46      ProcessorSmush(final String... rankedNamespaces) {
47          this.rankedNamespaces = rankedNamespaces.clone();
48      }
49  
50      @Override
51      public int getExtraPasses() {
52          return 1;
53      }
54  
55      @Override
56      public RDFHandler wrap(final RDFHandler handler) {
57          return new Handler(Objects.requireNonNull(handler));
58      }
59  
60      private final class Handler extends AbstractRDFHandlerWrapper {
61  
62          private int[] table;
63  
64          private int entries;
65  
66          private ByteBuffer[] buffers;
67  
68          private int endPointer;
69  
70          private boolean firstPass;
71  
72          private long numLookups;
73  
74          private long numTests;
75  
76          Handler(final RDFHandler handler) {
77  
78              super(handler);
79  
80              this.table = new int[1022]; // 511 entries, ~4K memory page
81              this.entries = 0;
82              this.buffers = new ByteBuffer[65536];
83              this.endPointer = pointerFor(0, 4); // skip first word to avoid 0 pointers
84              this.firstPass = true;
85              this.numLookups = 0;
86              this.numTests = 0;
87  
88              this.buffers[0] = ByteBuffer.allocate(BUFFER_SIZE);
89          }
90  
91          @Override
92          public void startRDF() throws RDFHandlerException {
93              if (!this.firstPass) {
94                  super.startRDF();
95              }
96          }
97  
98          @Override
99          public void handleComment(final String comment) throws RDFHandlerException {
100             if (!this.firstPass) {
101                 super.handleComment(comment);
102             }
103         }
104 
105         @Override
106         public void handleNamespace(final String prefix, final String uri)
107                 throws RDFHandlerException {
108             if (!this.firstPass) {
109                 super.handleNamespace(prefix, uri);
110             }
111         }
112 
113         @Override
114         public void handleStatement(final Statement statement) throws RDFHandlerException {
115 
116             final Resource s = statement.getSubject();
117             final URI p = statement.getPredicate();
118             final Value o = statement.getObject();
119             final Resource c = statement.getContext();
120 
121             final boolean isSameAs = p.equals(OWL.SAMEAS) && o instanceof Resource;
122 
123             if (!this.firstPass) {
124                 final Resource sn = rewrite(s);
125                 final Value on = o instanceof Literal ? o : rewrite((Resource) o);
126                 final Resource cn = c == null ? null : rewrite(c);
127                 if (isSameAs) {
128                     if (sn != s) {
129                         super.handleStatement(createStatement(sn, OWL.SAMEAS, s, cn));
130                     }
131                     if (on != o) {
132                         super.handleStatement(createStatement((Resource) on, OWL.SAMEAS, o, cn));
133                     }
134                 } else {
135                     final URI pn = (URI) rewrite(p);
136                     if (sn == s && pn == p && on == o && cn == c) {
137                         super.handleStatement(statement);
138                     } else {
139                         super.handleStatement(createStatement(sn, pn, on, cn));
140                     }
141                 }
142             } else if (isSameAs && !s.equals(o)) {
143                 synchronized (this) {
144                     link(s, (Resource) o);
145                 }
146             }
147         }
148 
149         @Override
150         public void endRDF() throws RDFHandlerException {
151             if (!this.firstPass) {
152                 super.endRDF();
153             } else {
154                 normalize();
155                 this.firstPass = false;
156             }
157         }
158 
159         @Override
160         public void close() {
161             super.close();
162             this.table = null; // eagerly release memory
163             this.buffers = null; // eagerly release memory
164         }
165 
166         // LINKING, NORMALIZATION AND REWRITING METHODS
167 
168         private void link(final Resource resource1, final Resource resource2) {
169             final int pointer1 = lookup(resource1, true);
170             final int pointer2 = lookup(resource2, true);
171             final int nextPointer1 = readNext(pointer1);
172             for (int pointer = nextPointer1; pointer != pointer1; pointer = readNext(pointer)) {
173                 if (pointer == pointer2) {
174                     return; // already linked
175                 }
176             }
177             final int nextPointer2 = readNext(pointer2);
178             writeNext(pointer1, nextPointer2);
179             writeNext(pointer2, nextPointer1);
180         }
181 
182         private void normalize() throws RDFHandlerException {
183             final Comparator<Value> comparator = Statements
184                     .valueComparator(ProcessorSmush.this.rankedNamespaces);
185             int numClusters = 0;
186             int numResources = 0;
187             for (int j = 0; j < this.table.length; j += 2) {
188                 final int pointer = this.table[j];
189                 if (pointer == 0 || readNormalized(pointer)) {
190                     continue;
191                 }
192                 final List<Resource> resources = new ArrayList<Resource>();
193                 Resource chosenResource = null;
194                 int chosenPointer = 0;
195                 // int chosenRank = Integer.MAX_VALUE;
196                 for (int p = pointer; p != pointer || chosenPointer == 0; p = readNext(p)) {
197                     final Resource resource = readResource(p);
198                     resources.add(resource);
199                     if (chosenResource == null || comparator.compare(resource, chosenResource) < 0) {
200                         chosenResource = resource;
201                         chosenPointer = p;
202                     }
203                     // if (resource instanceof BNode) {
204                     // if (chosenResource == null
205                     // || chosenResource instanceof BNode
206                     // && resource.stringValue().length() < chosenResource.stringValue()
207                     // .length()) {
208                     // chosenResource = resource;
209                     // chosenPointer = p;
210                     // }
211                     // } else if (resource instanceof URI) {
212                     // final String string = resource.stringValue();
213                     // int rank = Integer.MAX_VALUE;
214                     // for (int i = 0; i < ProcessorSmush.this.rankedNamespaces.length; ++i) {
215                     // if (string.startsWith(ProcessorSmush.this.rankedNamespaces[i])) {
216                     // rank = i;
217                     // break;
218                     // }
219                     // }
220                     // if (!(chosenResource instanceof URI) || rank < chosenRank
221                     // || rank == chosenRank
222                     // && string.length() < chosenResource.stringValue().length()) {
223                     // chosenResource = resource;
224                     // chosenPointer = p;
225                     // chosenRank = rank;
226                     // }
227                     // }
228                 }
229                 ++numClusters;
230                 numResources += resources.size();
231                 int p = pointer;
232                 do {
233                     final int pn = readNext(p);
234                     writeNext(p, chosenPointer);
235                     writeNormalized(p, true);
236                     p = pn;
237                 } while (p != pointer);
238             }
239             if (LOGGER.isInfoEnabled()) {
240                 LOGGER.info(String.format(
241                         "owl:sameAs normalization: %d resource(s), %d cluster(s), "
242                                 + "%.3f collisions/lookup, %dMB buffered ", numResources,
243                         numClusters, (double) (this.numTests - this.numLookups) / this.numLookups,
244                         bufferFor(this.endPointer) * (BUFFER_SIZE / 1024) / 1024));
245             }
246         }
247 
248         private Resource rewrite(final Resource resource) {
249             final int pointer = lookup(resource, false);
250             if (pointer == 0) {
251                 return resource;
252             }
253             final int nextPointer = readNext(pointer);
254             return nextPointer == pointer ? resource : readResource(nextPointer);
255         }
256 
257         private int lookup(final Resource resource, final boolean canAppend) {
258             ++this.numLookups;
259             final int hash = resource.hashCode();
260             int offset = (hash & 0x7FFFFFFF) % (this.table.length / 2) * 2;
261             while (true) {
262                 ++this.numTests;
263                 int pointer = this.table[offset];
264                 if (pointer == 0) {
265                     if (!canAppend) {
266                         return 0;
267                     } else if (this.entries > this.table.length / 4) { // enforce load factor < .5
268                         rehash();
269                         return lookup(resource, canAppend); // repeat after rehashing
270                     }
271                     pointer = append(resource);
272                     this.table[offset] = pointer;
273                     this.table[offset + 1] = hash;
274                     ++this.entries;
275                     return pointer;
276                 }
277                 if (this.table[offset + 1] == hash && matchResource(pointer, resource)) {
278                     return pointer;
279                 }
280                 offset += 2;
281                 if (offset >= this.table.length) {
282                     offset = 0;
283                 }
284             }
285         }
286 
287         private void rehash() {
288             final int newSize = this.table.length + 1;
289             final int newTable[] = new int[2 * newSize];
290             for (int oldOffset = 0; oldOffset < this.table.length; oldOffset += 2) {
291                 final int pointer = this.table[oldOffset];
292                 if (pointer != 0) {
293                     final int hash = this.table[oldOffset + 1];
294                     int newOffset = (hash & 0x7FFFFFFF) % newSize * 2;
295                     while (newTable[newOffset] != 0) {
296                         newOffset += 2;
297                         if (newOffset >= newTable.length) {
298                             newOffset = 0;
299                         }
300                     }
301                     newTable[newOffset] = pointer;
302                     newTable[newOffset + 1] = hash;
303                 }
304             }
305             this.table = newTable;
306         }
307 
308         // POINTER MANAGEMENT METHODS
309 
310         private int pointerFor(final int buffer, final int offset) {
311             return (buffer & 0xFFFF) << 16 | offset + 3 >> 2 & 0xFFFF;
312         }
313 
314         private int bufferFor(final int pointer) {
315             return pointer >> 16 & 0xFFFF;
316         }
317 
318         private int offsetFor(final int pointer) {
319             return (pointer & 0xFFFF) << 2;
320         }
321 
322         // BUFFER MANIPULATION METHODS
323 
324         private int append(final Resource resource) {
325             final String string = resource.stringValue();
326             final int length = string.length();
327             int bufferIndex = bufferFor(this.endPointer);
328             int offset = offsetFor(this.endPointer);
329             final ByteBuffer buffer;
330             if (offset + 6 + length * 3 > BUFFER_SIZE) {
331                 ++bufferIndex;
332                 buffer = ByteBuffer.allocate(BUFFER_SIZE);
333                 this.buffers[bufferIndex] = buffer;
334                 this.endPointer = pointerFor(bufferIndex, 0);
335                 offset = 0;
336             } else {
337                 buffer = this.buffers[bufferIndex];
338             }
339             buffer.putInt(offset, this.endPointer);
340             offset += 4;
341             buffer.putShort(offset,
342                     (short) (length | (resource instanceof BNode ? 0x4000 : 0x0000)));
343             offset += 2;
344             for (int i = 0; i < length; ++i) {
345                 final char ch = string.charAt(i);
346                 if (ch > 0 && ch < 128) {
347                     buffer.put(offset++, (byte) ch);
348                 } else {
349                     buffer.put(offset++, (byte) 0);
350                     buffer.putChar(offset, ch);
351                     offset += 2;
352                 }
353             }
354             final int pointer = this.endPointer;
355             this.endPointer = pointerFor(bufferIndex, offset);
356             return pointer;
357         }
358 
359         private Resource readResource(final int pointer) {
360             final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
361             int offset = offsetFor(pointer) + 4;
362             final int lengthAndFlags = buffer.getShort(offset);
363             final int length = lengthAndFlags & 0x3FFF;
364             final boolean bnode = (lengthAndFlags & 0x4000) != 0;
365             final StringBuilder builder = new StringBuilder();
366             offset += 2;
367             while (builder.length() < length) {
368                 final byte b = buffer.get(offset++);
369                 if (b != 0) {
370                     builder.append((char) b);
371                 } else {
372                     builder.append(buffer.getChar(offset));
373                     offset += 2;
374                 }
375             }
376             final String string = builder.toString();
377             return bnode ? Statements.VALUE_FACTORY.createBNode(string) //
378                     : Statements.VALUE_FACTORY.createURI(string);
379         }
380 
381         private boolean matchResource(final int pointer, final Resource resource) {
382             final String string = resource.stringValue();
383             final int length = string.length();
384             final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
385             int offset = offsetFor(pointer) + 4;
386             final int lengthAndFlags = buffer.getShort(offset);
387             final boolean bnode = (lengthAndFlags & 0x4000) != 0;
388             if (bnode && resource instanceof URI || !bnode && resource instanceof BNode) {
389                 return false;
390             }
391             final int l = lengthAndFlags & 0x3FFF;
392             if (l != length) {
393                 return false;
394             }
395             offset += 2;
396             for (int i = 0; i < length; ++i) {
397                 final char c = string.charAt(i);
398                 if (c > 0 && c < 128) {
399                     if (c != (char) buffer.get(offset++)) {
400                         return false;
401                     }
402                 } else {
403                     if (buffer.get(offset++) != 0) {
404                         return false;
405                     }
406                     if (c != buffer.getChar(offset)) {
407                         return false;
408                     }
409                     offset += 2;
410                 }
411             }
412             return true;
413         }
414 
415         private int readNext(final int pointer) {
416             final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
417             final int offset = offsetFor(pointer);
418             return buffer.getInt(offset);
419         }
420 
421         private void writeNext(final int pointer, final int next) {
422             final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
423             final int offset = offsetFor(pointer);
424             buffer.putInt(offset, next);
425         }
426 
427         private boolean readNormalized(final int pointer) {
428             final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
429             final int offset = offsetFor(pointer) + 4;
430             return (buffer.getShort(offset) & 0x8000) != 0;
431         }
432 
433         private void writeNormalized(final int pointer, final boolean normalized) {
434             final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
435             final int offset = offsetFor(pointer) + 4;
436             buffer.putShort(offset, (short) (buffer.getShort(offset) | (normalized ? 0x8000
437                     : 0x0000)));
438         }
439 
440         private Statement createStatement(final Resource subj, final URI pred, final Value obj,
441                 @Nullable final Resource ctx) {
442             return ctx == null ? Statements.VALUE_FACTORY.createStatement(subj, pred, obj) //
443                     : Statements.VALUE_FACTORY.createStatement(subj, pred, obj, ctx);
444         }
445 
446     }
447 }