1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.nio.ByteBuffer;
17 import java.util.ArrayList;
18 import java.util.Comparator;
19 import java.util.List;
20 import java.util.Objects;
21
22 import javax.annotation.Nullable;
23
24 import org.openrdf.model.BNode;
25 import org.openrdf.model.Literal;
26 import org.openrdf.model.Resource;
27 import org.openrdf.model.Statement;
28 import org.openrdf.model.URI;
29 import org.openrdf.model.Value;
30 import org.openrdf.model.vocabulary.OWL;
31 import org.openrdf.rio.RDFHandler;
32 import org.openrdf.rio.RDFHandlerException;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import eu.fbk.rdfpro.util.Statements;
37
38 final class ProcessorSmush implements RDFProcessor {
39
/** Logger used to report statistics once the owl:sameAs clusters have been normalized. */
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorSmush.class);

/** Size, in bytes, of every buffer allocated to hold serialized resources (256 KiB). */
private static final int BUFFER_SIZE = 1 << 18;

/** Namespaces passed to the value comparator that picks the resource kept for each cluster. */
private final String[] rankedNamespaces;

/**
 * Creates a new processor.
 *
 * @param rankedNamespaces
 *            the namespaces handed to the value comparator; the array is cloned, so later
 *            changes made by the caller are not seen by this processor
 */
ProcessorSmush(final String... rankedNamespaces) {
    this.rankedNamespaces = rankedNamespaces.clone();
}
49
50 @Override
51 public int getExtraPasses() {
52 return 1;
53 }
54
55 @Override
56 public RDFHandler wrap(final RDFHandler handler) {
57 return new Handler(Objects.requireNonNull(handler));
58 }
59
60 private final class Handler extends AbstractRDFHandlerWrapper {
61
62 private int[] table;
63
64 private int entries;
65
66 private ByteBuffer[] buffers;
67
68 private int endPointer;
69
70 private boolean firstPass;
71
72 private long numLookups;
73
74 private long numTests;
75
76 Handler(final RDFHandler handler) {
77
78 super(handler);
79
80 this.table = new int[1022];
81 this.entries = 0;
82 this.buffers = new ByteBuffer[65536];
83 this.endPointer = pointerFor(0, 4);
84 this.firstPass = true;
85 this.numLookups = 0;
86 this.numTests = 0;
87
88 this.buffers[0] = ByteBuffer.allocate(BUFFER_SIZE);
89 }
90
91 @Override
92 public void startRDF() throws RDFHandlerException {
93 if (!this.firstPass) {
94 super.startRDF();
95 }
96 }
97
98 @Override
99 public void handleComment(final String comment) throws RDFHandlerException {
100 if (!this.firstPass) {
101 super.handleComment(comment);
102 }
103 }
104
105 @Override
106 public void handleNamespace(final String prefix, final String uri)
107 throws RDFHandlerException {
108 if (!this.firstPass) {
109 super.handleNamespace(prefix, uri);
110 }
111 }
112
113 @Override
114 public void handleStatement(final Statement statement) throws RDFHandlerException {
115
116 final Resource s = statement.getSubject();
117 final URI p = statement.getPredicate();
118 final Value o = statement.getObject();
119 final Resource c = statement.getContext();
120
121 final boolean isSameAs = p.equals(OWL.SAMEAS) && o instanceof Resource;
122
123 if (!this.firstPass) {
124 final Resource sn = rewrite(s);
125 final Value on = o instanceof Literal ? o : rewrite((Resource) o);
126 final Resource cn = c == null ? null : rewrite(c);
127 if (isSameAs) {
128 if (sn != s) {
129 super.handleStatement(createStatement(sn, OWL.SAMEAS, s, cn));
130 }
131 if (on != o) {
132 super.handleStatement(createStatement((Resource) on, OWL.SAMEAS, o, cn));
133 }
134 } else {
135 final URI pn = (URI) rewrite(p);
136 if (sn == s && pn == p && on == o && cn == c) {
137 super.handleStatement(statement);
138 } else {
139 super.handleStatement(createStatement(sn, pn, on, cn));
140 }
141 }
142 } else if (isSameAs && !s.equals(o)) {
143 synchronized (this) {
144 link(s, (Resource) o);
145 }
146 }
147 }
148
149 @Override
150 public void endRDF() throws RDFHandlerException {
151 if (!this.firstPass) {
152 super.endRDF();
153 } else {
154 normalize();
155 this.firstPass = false;
156 }
157 }
158
159 @Override
160 public void close() {
161 super.close();
162 this.table = null;
163 this.buffers = null;
164 }
165
166
167
168 private void link(final Resource resource1, final Resource resource2) {
169 final int pointer1 = lookup(resource1, true);
170 final int pointer2 = lookup(resource2, true);
171 final int nextPointer1 = readNext(pointer1);
172 for (int pointer = nextPointer1; pointer != pointer1; pointer = readNext(pointer)) {
173 if (pointer == pointer2) {
174 return;
175 }
176 }
177 final int nextPointer2 = readNext(pointer2);
178 writeNext(pointer1, nextPointer2);
179 writeNext(pointer2, nextPointer1);
180 }
181
182 private void normalize() throws RDFHandlerException {
183 final Comparator<Value> comparator = Statements
184 .valueComparator(ProcessorSmush.this.rankedNamespaces);
185 int numClusters = 0;
186 int numResources = 0;
187 for (int j = 0; j < this.table.length; j += 2) {
188 final int pointer = this.table[j];
189 if (pointer == 0 || readNormalized(pointer)) {
190 continue;
191 }
192 final List<Resource> resources = new ArrayList<Resource>();
193 Resource chosenResource = null;
194 int chosenPointer = 0;
195
196 for (int p = pointer; p != pointer || chosenPointer == 0; p = readNext(p)) {
197 final Resource resource = readResource(p);
198 resources.add(resource);
199 if (chosenResource == null || comparator.compare(resource, chosenResource) < 0) {
200 chosenResource = resource;
201 chosenPointer = p;
202 }
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228 }
229 ++numClusters;
230 numResources += resources.size();
231 int p = pointer;
232 do {
233 final int pn = readNext(p);
234 writeNext(p, chosenPointer);
235 writeNormalized(p, true);
236 p = pn;
237 } while (p != pointer);
238 }
239 if (LOGGER.isInfoEnabled()) {
240 LOGGER.info(String.format(
241 "owl:sameAs normalization: %d resource(s), %d cluster(s), "
242 + "%.3f collisions/lookup, %dMB buffered ", numResources,
243 numClusters, (double) (this.numTests - this.numLookups) / this.numLookups,
244 bufferFor(this.endPointer) * (BUFFER_SIZE / 1024) / 1024));
245 }
246 }
247
248 private Resource rewrite(final Resource resource) {
249 final int pointer = lookup(resource, false);
250 if (pointer == 0) {
251 return resource;
252 }
253 final int nextPointer = readNext(pointer);
254 return nextPointer == pointer ? resource : readResource(nextPointer);
255 }
256
257 private int lookup(final Resource resource, final boolean canAppend) {
258 ++this.numLookups;
259 final int hash = resource.hashCode();
260 int offset = (hash & 0x7FFFFFFF) % (this.table.length / 2) * 2;
261 while (true) {
262 ++this.numTests;
263 int pointer = this.table[offset];
264 if (pointer == 0) {
265 if (!canAppend) {
266 return 0;
267 } else if (this.entries > this.table.length / 4) {
268 rehash();
269 return lookup(resource, canAppend);
270 }
271 pointer = append(resource);
272 this.table[offset] = pointer;
273 this.table[offset + 1] = hash;
274 ++this.entries;
275 return pointer;
276 }
277 if (this.table[offset + 1] == hash && matchResource(pointer, resource)) {
278 return pointer;
279 }
280 offset += 2;
281 if (offset >= this.table.length) {
282 offset = 0;
283 }
284 }
285 }
286
287 private void rehash() {
288 final int newSize = this.table.length + 1;
289 final int newTable[] = new int[2 * newSize];
290 for (int oldOffset = 0; oldOffset < this.table.length; oldOffset += 2) {
291 final int pointer = this.table[oldOffset];
292 if (pointer != 0) {
293 final int hash = this.table[oldOffset + 1];
294 int newOffset = (hash & 0x7FFFFFFF) % newSize * 2;
295 while (newTable[newOffset] != 0) {
296 newOffset += 2;
297 if (newOffset >= newTable.length) {
298 newOffset = 0;
299 }
300 }
301 newTable[newOffset] = pointer;
302 newTable[newOffset + 1] = hash;
303 }
304 }
305 this.table = newTable;
306 }
307
308
309
310 private int pointerFor(final int buffer, final int offset) {
311 return (buffer & 0xFFFF) << 16 | offset + 3 >> 2 & 0xFFFF;
312 }
313
314 private int bufferFor(final int pointer) {
315 return pointer >> 16 & 0xFFFF;
316 }
317
318 private int offsetFor(final int pointer) {
319 return (pointer & 0xFFFF) << 2;
320 }
321
322
323
324 private int append(final Resource resource) {
325 final String string = resource.stringValue();
326 final int length = string.length();
327 int bufferIndex = bufferFor(this.endPointer);
328 int offset = offsetFor(this.endPointer);
329 final ByteBuffer buffer;
330 if (offset + 6 + length * 3 > BUFFER_SIZE) {
331 ++bufferIndex;
332 buffer = ByteBuffer.allocate(BUFFER_SIZE);
333 this.buffers[bufferIndex] = buffer;
334 this.endPointer = pointerFor(bufferIndex, 0);
335 offset = 0;
336 } else {
337 buffer = this.buffers[bufferIndex];
338 }
339 buffer.putInt(offset, this.endPointer);
340 offset += 4;
341 buffer.putShort(offset,
342 (short) (length | (resource instanceof BNode ? 0x4000 : 0x0000)));
343 offset += 2;
344 for (int i = 0; i < length; ++i) {
345 final char ch = string.charAt(i);
346 if (ch > 0 && ch < 128) {
347 buffer.put(offset++, (byte) ch);
348 } else {
349 buffer.put(offset++, (byte) 0);
350 buffer.putChar(offset, ch);
351 offset += 2;
352 }
353 }
354 final int pointer = this.endPointer;
355 this.endPointer = pointerFor(bufferIndex, offset);
356 return pointer;
357 }
358
359 private Resource readResource(final int pointer) {
360 final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
361 int offset = offsetFor(pointer) + 4;
362 final int lengthAndFlags = buffer.getShort(offset);
363 final int length = lengthAndFlags & 0x3FFF;
364 final boolean bnode = (lengthAndFlags & 0x4000) != 0;
365 final StringBuilder builder = new StringBuilder();
366 offset += 2;
367 while (builder.length() < length) {
368 final byte b = buffer.get(offset++);
369 if (b != 0) {
370 builder.append((char) b);
371 } else {
372 builder.append(buffer.getChar(offset));
373 offset += 2;
374 }
375 }
376 final String string = builder.toString();
377 return bnode ? Statements.VALUE_FACTORY.createBNode(string)
378 : Statements.VALUE_FACTORY.createURI(string);
379 }
380
381 private boolean matchResource(final int pointer, final Resource resource) {
382 final String string = resource.stringValue();
383 final int length = string.length();
384 final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
385 int offset = offsetFor(pointer) + 4;
386 final int lengthAndFlags = buffer.getShort(offset);
387 final boolean bnode = (lengthAndFlags & 0x4000) != 0;
388 if (bnode && resource instanceof URI || !bnode && resource instanceof BNode) {
389 return false;
390 }
391 final int l = lengthAndFlags & 0x3FFF;
392 if (l != length) {
393 return false;
394 }
395 offset += 2;
396 for (int i = 0; i < length; ++i) {
397 final char c = string.charAt(i);
398 if (c > 0 && c < 128) {
399 if (c != (char) buffer.get(offset++)) {
400 return false;
401 }
402 } else {
403 if (buffer.get(offset++) != 0) {
404 return false;
405 }
406 if (c != buffer.getChar(offset)) {
407 return false;
408 }
409 offset += 2;
410 }
411 }
412 return true;
413 }
414
415 private int readNext(final int pointer) {
416 final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
417 final int offset = offsetFor(pointer);
418 return buffer.getInt(offset);
419 }
420
421 private void writeNext(final int pointer, final int next) {
422 final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
423 final int offset = offsetFor(pointer);
424 buffer.putInt(offset, next);
425 }
426
427 private boolean readNormalized(final int pointer) {
428 final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
429 final int offset = offsetFor(pointer) + 4;
430 return (buffer.getShort(offset) & 0x8000) != 0;
431 }
432
433 private void writeNormalized(final int pointer, final boolean normalized) {
434 final ByteBuffer buffer = this.buffers[bufferFor(pointer)];
435 final int offset = offsetFor(pointer) + 4;
436 buffer.putShort(offset, (short) (buffer.getShort(offset) | (normalized ? 0x8000
437 : 0x0000)));
438 }
439
440 private Statement createStatement(final Resource subj, final URI pred, final Value obj,
441 @Nullable final Resource ctx) {
442 return ctx == null ? Statements.VALUE_FACTORY.createStatement(subj, pred, obj)
443 : Statements.VALUE_FACTORY.createStatement(subj, pred, obj, ctx);
444 }
445
446 }
447 }