1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.util.AbstractCollection;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.NoSuchElementException;
20 import java.util.Objects;
21 import java.util.function.Supplier;
22
23 import javax.annotation.Nullable;
24
25 import com.google.common.base.Throwables;
26 import com.google.common.collect.Lists;
27
28 import org.openrdf.model.Resource;
29 import org.openrdf.model.Statement;
30 import org.openrdf.model.URI;
31 import org.openrdf.model.Value;
32 import org.openrdf.model.impl.ContextStatementImpl;
33 import org.openrdf.rio.RDFHandler;
34 import org.openrdf.rio.RDFHandlerException;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import eu.fbk.rdfpro.util.QuadModel;
39 import eu.fbk.rdfpro.util.Tracker;
40
41
42
43
44
45 final class StatementBuffer extends AbstractCollection<Statement> implements Supplier<RDFHandler> {
46
47 private static final Logger LOGGER = LoggerFactory.getLogger(StatementBuffer.class);
48
49 private static final int BLOCK_SIZE = 4 * 1024;
50
51 private final List<Value[]> blocks;
52
53 private int offset;
54
55 @Nullable
56 private transient int[] buckets;
57
58 @Nullable
59 private Tracker addTracker;
60
61 private int addTrackCount;
62
63 public StatementBuffer() {
64 this.blocks = Lists.newArrayList();
65 this.offset = BLOCK_SIZE;
66 this.addTracker = null;
67 this.addTrackCount = 0;
68 }
69
70 @Override
71 public boolean isEmpty() {
72 return this.blocks.isEmpty();
73 }
74
75 @Override
76 public int size() {
77 return this.blocks.isEmpty() ? 0
78 : ((this.blocks.size() - 1) * BLOCK_SIZE + this.offset) / 4;
79 }
80
81 public boolean contains(final Resource subj, final URI pred, final Value obj,
82 @Nullable final Resource ctx) {
83
84
85 final int[] buckets = getBuckets();
86
87
88 final int hash = hash(subj, pred, obj, ctx);
89 int slot = (hash & 0x7FFFFFFF) % buckets.length;
90 while (true) {
91 if (buckets[slot] == 0) {
92 return false;
93 } else {
94 final int pointer = buckets[slot] - 4;
95 final int thisIndex = pointer / BLOCK_SIZE;
96 final int offset = pointer % BLOCK_SIZE;
97 final Value[] block = this.blocks.get(thisIndex);
98 if (block[offset].equals(subj) && block[offset + 1].equals(pred)
99 && block[offset + 2].equals(obj) && Objects.equals(block[offset + 3], ctx)) {
100 return true;
101 }
102 }
103 slot = (slot + 1) % buckets.length;
104 }
105 }
106
107 @Override
108 public boolean contains(final Object object) {
109
110 if (!(object instanceof Statement)) {
111 return false;
112 }
113
114 final Statement stmt = (Statement) object;
115 return contains(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(),
116 stmt.getContext());
117 }
118
119 @Override
120 public Iterator<Statement> iterator() {
121 return new Iterator<Statement>() {
122
123 private Value[] block = StatementBuffer.this.blocks.isEmpty() ? null
124 : StatementBuffer.this.blocks.get(0);
125
126 private int index = 0;
127
128 private int offset = 0;
129
130 private int maxOffset = StatementBuffer.this.blocks.size() > 1 ? BLOCK_SIZE
131 : StatementBuffer.this.offset;
132
133 @Override
134 public boolean hasNext() {
135 return this.block != null;
136 }
137
138 @Override
139 public Statement next() {
140
141
142 if (this.block == null) {
143 throw new NoSuchElementException();
144 }
145
146
147 final Resource subj = (Resource) this.block[this.offset++];
148 final URI pred = (URI) this.block[this.offset++];
149 final Value obj = this.block[this.offset++];
150 final Resource ctx = (Resource) this.block[this.offset++];
151
152
153 if (this.offset >= this.maxOffset) {
154 ++this.index;
155 if (this.index < StatementBuffer.this.blocks.size()) {
156 this.block = StatementBuffer.this.blocks.get(this.index);
157 this.offset = 0;
158 this.maxOffset = this.index < StatementBuffer.this.blocks.size() - 1 ? BLOCK_SIZE
159 : StatementBuffer.this.offset;
160 } else {
161 this.block = null;
162 }
163 }
164
165
166 return new ContextStatementImpl(subj, pred, obj, ctx);
167 }
168
169 };
170 }
171
172 public int toModel(final QuadModel model, final boolean add,
173 @Nullable final RDFHandler callback) {
174
175
176 final Tracker tracker = new Tracker(LOGGER, null, null, "%d triples "
177 + (add ? "inserted" : "deleted") + " (%d tr/s, %d tr/s avg)");
178 tracker.start();
179
180 try {
181
182 if (callback != null) {
183 callback.startRDF();
184 }
185
186
187 int numChanges = 0;
188 for (int index = 0; index < this.blocks.size(); ++index) {
189 final Value[] block = this.blocks.get(index);
190 final int maxOffset = index < this.blocks.size() - 1 ? BLOCK_SIZE : this.offset;
191 for (int offset = 0; offset < maxOffset; offset += 4) {
192
193
194 final Resource subj = (Resource) block[offset];
195 final URI pred = (URI) block[offset + 1];
196 final Value obj = block[offset + 2];
197 final Resource ctx = (Resource) block[offset + 3];
198
199
200 boolean modified;
201 if (add) {
202
203
204
205
206
207
208 modified = model.add(subj, pred, obj, ctx);
209 } else {
210 modified = model.remove(subj, pred, obj, ctx);
211 }
212
213
214
215 if (modified) {
216 ++numChanges;
217 tracker.increment();
218 if (callback != null) {
219 callback.handleStatement(new ContextStatementImpl(subj, pred, obj, ctx));
220 }
221 }
222 }
223 }
224
225
226 if (callback != null) {
227 callback.endRDF();
228 }
229
230
231 return numChanges;
232
233 } catch (final RDFHandlerException ex) {
234
235 throw Throwables.propagate(ex);
236
237 } finally {
238
239 tracker.end();
240 }
241 }
242
243 public void toHandler(final RDFHandler handler) throws RDFHandlerException {
244
245
246 handler.startRDF();
247 for (int index = 0; index < this.blocks.size(); ++index) {
248 final Value[] block = this.blocks.get(index);
249 final int maxOffset = index < this.blocks.size() - 1 ? BLOCK_SIZE : this.offset;
250 for (int offset = 0; offset < maxOffset; offset += 4) {
251 handler.handleStatement(new ContextStatementImpl((Resource) block[offset],
252 (URI) block[offset + 1], block[offset + 2], (Resource) block[offset + 3]));
253 }
254 }
255 handler.endRDF();
256 }
257
258 public synchronized boolean add(final Resource subj, final URI pred, final Value obj,
259 @Nullable final Resource ctx) {
260
261
262 this.buckets = null;
263
264
265 Value[] block;
266 if (this.offset < BLOCK_SIZE) {
267 block = this.blocks.get(this.blocks.size() - 1);
268 } else {
269 block = new Value[BLOCK_SIZE];
270 this.blocks.add(block);
271 this.offset = 0;
272 }
273
274
275 block[this.offset] = subj;
276 block[this.offset + 1] = pred;
277 block[this.offset + 2] = obj;
278 block[this.offset + 3] = ctx;
279 this.offset += 4;
280
281
282 if (this.addTracker != null) {
283 this.addTracker.increment();
284 }
285
286
287 return true;
288 }
289
290 @Override
291 public boolean add(final Statement stmt) {
292 return add(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext());
293 }
294
295 @Override
296 public RDFHandler get() {
297 return new Appender();
298 }
299
300 private int[] getBuckets() {
301
302
303
304 if (this.buckets == null) {
305 final int size = size();
306 final int[] buckets = new int[Math.max(4, Integer.highestOneBit(size) * 4) - 1];
307 int pointer = 4;
308 for (int index = 0; index < this.blocks.size(); ++index) {
309 final Value[] block = this.blocks.get(index);
310 final int maxOffset = index < this.blocks.size() - 1 ? BLOCK_SIZE : this.offset;
311 for (int offset = 0; offset < maxOffset; offset += 4) {
312 final int hash = hash(block[offset], block[offset + 1], block[offset + 2],
313 block[offset + 3]);
314 int slot = (hash & 0x7FFFFFFF) % buckets.length;
315 while (buckets[slot] != 0) {
316 slot = (slot + 1) % buckets.length;
317 }
318 buckets[slot] = pointer;
319 pointer += 4;
320 }
321 }
322 this.buckets = buckets;
323 }
324 return this.buckets;
325 }
326
327 private synchronized void startAddTracker() {
328 if (this.addTracker == null) {
329 this.addTracker = new Tracker(LOGGER, null, null,
330 "%d triples buffered (%d tr/s, %d tr/s avg)");
331 this.addTracker.start();
332 }
333 ++this.addTrackCount;
334 }
335
336 private synchronized void stopAddTracker() {
337 --this.addTrackCount;
338 if (this.addTrackCount <= 0 && this.addTracker != null) {
339 this.addTracker.end();
340 this.addTracker = null;
341 }
342 }
343
344 private synchronized void append(final Value[] block, final int blockLength) {
345
346
347 this.buckets = null;
348
349
350 if (blockLength == block.length) {
351
352
353 if (this.offset >= BLOCK_SIZE) {
354 this.blocks.add(block);
355 } else {
356 final Value[] last = this.blocks.remove(this.blocks.size() - 1);
357 this.blocks.add(block);
358 this.blocks.add(last);
359 }
360
361 } else {
362
363
364
365 int offset = 0;
366 while (offset < blockLength) {
367 Value[] thisBlock;
368 if (this.offset < BLOCK_SIZE) {
369 thisBlock = this.blocks.get(this.blocks.size() - 1);
370 } else {
371 thisBlock = new Value[BLOCK_SIZE];
372 this.blocks.add(thisBlock);
373 this.offset = 0;
374 }
375 final int length = Math.min(blockLength - offset, BLOCK_SIZE - this.offset);
376 System.arraycopy(block, offset, thisBlock, this.offset, length);
377 offset += length;
378 this.offset += length;
379 }
380 }
381
382
383 if (this.addTracker != null) {
384 this.addTracker.add(blockLength >> 2);
385 }
386 }
387
388 private static int hash(final Value subj, final Value pred, final Value obj, final Value ctx) {
389
390 return 6661 * subj.hashCode() + 961 * pred.hashCode() + 31 * obj.hashCode()
391 + (ctx == null ? 0 : ctx.hashCode());
392 }
393
394 private final class Appender extends AbstractRDFHandler {
395
396 private Value[] block;
397
398 private int offset;
399
400 private Appender() {
401 this.block = null;
402 this.offset = 0;
403 }
404
405 @Override
406 public void startRDF() {
407
408
409 this.block = new Value[BLOCK_SIZE];
410 this.offset = 0;
411
412
413 startAddTracker();
414 }
415
416 @Override
417 public void handleStatement(final Statement stmt) {
418
419
420 final Resource subj = stmt.getSubject();
421 final URI pred = stmt.getPredicate();
422 final Value obj = stmt.getObject();
423 final Resource ctx = stmt.getContext();
424
425
426 this.block[this.offset++] = subj;
427 this.block[this.offset++] = pred;
428 this.block[this.offset++] = obj;
429 this.block[this.offset++] = ctx;
430
431
432
433 if (this.offset == this.block.length) {
434 append(this.block, this.block.length);
435 this.block = new Value[BLOCK_SIZE];
436 this.offset = 0;
437 }
438
439 }
440
441 @Override
442 public void endRDF() {
443
444
445
446 if (this.offset > 0) {
447 append(this.block, this.offset);
448 this.offset = 0;
449 }
450 this.block = null;
451
452
453 stopAddTracker();
454 }
455
456 }
457
458 }