1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.util.AbstractCollection;
17  import java.util.Iterator;
18  import java.util.List;
19  import java.util.NoSuchElementException;
20  import java.util.Objects;
21  import java.util.function.Supplier;
22  
23  import javax.annotation.Nullable;
24  
25  import com.google.common.base.Throwables;
26  import com.google.common.collect.Lists;
27  
28  import org.openrdf.model.Resource;
29  import org.openrdf.model.Statement;
30  import org.openrdf.model.URI;
31  import org.openrdf.model.Value;
32  import org.openrdf.model.impl.ContextStatementImpl;
33  import org.openrdf.rio.RDFHandler;
34  import org.openrdf.rio.RDFHandlerException;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  import eu.fbk.rdfpro.util.QuadModel;
39  import eu.fbk.rdfpro.util.Tracker;
40  
41  /**
42   * A statement buffer behaving as a list with duplicates where statements can only be added and
43   * retrieved, but never removed.
44   */
45  final class StatementBuffer extends AbstractCollection<Statement> implements Supplier<RDFHandler> {
46  
47      private static final Logger LOGGER = LoggerFactory.getLogger(StatementBuffer.class);
48  
49      private static final int BLOCK_SIZE = 4 * 1024; // 1K quads, 4K values, 16K bytes
50  
51      private final List<Value[]> blocks;
52  
53      private int offset;
54  
55      @Nullable
56      private transient int[] buckets;
57  
58      @Nullable
59      private Tracker addTracker;
60  
61      private int addTrackCount;
62  
63      public StatementBuffer() {
64          this.blocks = Lists.newArrayList();
65          this.offset = BLOCK_SIZE;
66          this.addTracker = null;
67          this.addTrackCount = 0;
68      }
69  
70      @Override
71      public boolean isEmpty() {
72          return this.blocks.isEmpty();
73      }
74  
75      @Override
76      public int size() {
77          return this.blocks.isEmpty() ? 0
78                  : ((this.blocks.size() - 1) * BLOCK_SIZE + this.offset) / 4;
79      }
80  
81      public boolean contains(final Resource subj, final URI pred, final Value obj,
82              @Nullable final Resource ctx) {
83  
84          // Retrieve (build if necessary) the hash index
85          final int[] buckets = getBuckets();
86  
87          // Lookup the statement using the index
88          final int hash = hash(subj, pred, obj, ctx);
89          int slot = (hash & 0x7FFFFFFF) % buckets.length;
90          while (true) {
91              if (buckets[slot] == 0) {
92                  return false;
93              } else {
94                  final int pointer = buckets[slot] - 4;
95                  final int thisIndex = pointer / BLOCK_SIZE;
96                  final int offset = pointer % BLOCK_SIZE;
97                  final Value[] block = this.blocks.get(thisIndex);
98                  if (block[offset].equals(subj) && block[offset + 1].equals(pred)
99                          && block[offset + 2].equals(obj) && Objects.equals(block[offset + 3], ctx)) {
100                     return true;
101                 }
102             }
103             slot = (slot + 1) % buckets.length;
104         }
105     }
106 
107     @Override
108     public boolean contains(final Object object) {
109 
110         if (!(object instanceof Statement)) {
111             return false;
112         }
113 
114         final Statement stmt = (Statement) object;
115         return contains(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(),
116                 stmt.getContext());
117     }
118 
119     @Override
120     public Iterator<Statement> iterator() {
121         return new Iterator<Statement>() {
122 
123             private Value[] block = StatementBuffer.this.blocks.isEmpty() ? null
124                     : StatementBuffer.this.blocks.get(0);
125 
126             private int index = 0;
127 
128             private int offset = 0;
129 
130             private int maxOffset = StatementBuffer.this.blocks.size() > 1 ? BLOCK_SIZE
131                     : StatementBuffer.this.offset;
132 
133             @Override
134             public boolean hasNext() {
135                 return this.block != null;
136             }
137 
138             @Override
139             public Statement next() {
140 
141                 // Fail if there are no more elements to retrieve
142                 if (this.block == null) {
143                     throw new NoSuchElementException();
144                 }
145 
146                 // Otherwise, retrieve the SPOC components of the next statement to return
147                 final Resource subj = (Resource) this.block[this.offset++];
148                 final URI pred = (URI) this.block[this.offset++];
149                 final Value obj = this.block[this.offset++];
150                 final Resource ctx = (Resource) this.block[this.offset++];
151 
152                 // Update index / offset / block variables; block set to null if iterator exhaust
153                 if (this.offset >= this.maxOffset) {
154                     ++this.index;
155                     if (this.index < StatementBuffer.this.blocks.size()) {
156                         this.block = StatementBuffer.this.blocks.get(this.index);
157                         this.offset = 0;
158                         this.maxOffset = this.index < StatementBuffer.this.blocks.size() - 1 ? BLOCK_SIZE
159                                 : StatementBuffer.this.offset;
160                     } else {
161                         this.block = null;
162                     }
163                 }
164 
165                 // Return the statement
166                 return new ContextStatementImpl(subj, pred, obj, ctx);
167             }
168 
169         };
170     }
171 
172     public int toModel(final QuadModel model, final boolean add,
173             @Nullable final RDFHandler callback) {
174 
175         // Create a tracker
176         final Tracker tracker = new Tracker(LOGGER, null, null, "%d triples "
177                 + (add ? "inserted" : "deleted") + " (%d tr/s, %d tr/s avg)");
178         tracker.start();
179 
180         try {
181             // Notify the callback handler, if any
182             if (callback != null) {
183                 callback.startRDF();
184             }
185 
186             // Iterate over the statements in the buffer
187             int numChanges = 0;
188             for (int index = 0; index < this.blocks.size(); ++index) {
189                 final Value[] block = this.blocks.get(index);
190                 final int maxOffset = index < this.blocks.size() - 1 ? BLOCK_SIZE : this.offset;
191                 for (int offset = 0; offset < maxOffset; offset += 4) {
192 
193                     // Retrieve SPOC components of current statement
194                     final Resource subj = (Resource) block[offset];
195                     final URI pred = (URI) block[offset + 1];
196                     final Value obj = block[offset + 2];
197                     final Resource ctx = (Resource) block[offset + 3];
198 
199                     // Either add or remove the statement to/from the model
200                     boolean modified;
201                     if (add) {
202                         // if (callback != null) {
203                         // subj = model.normalize(subj);
204                         // pred = model.normalize(pred);
205                         // obj = model.normalize(obj);
206                         // ctx = model.normalize(ctx);
207                         // }
208                         modified = model.add(subj, pred, obj, ctx);
209                     } else {
210                         modified = model.remove(subj, pred, obj, ctx);
211                     }
212 
213                     // If the model was modified as a result of the operation, increment changes
214                     // counter and notify the callback, if any
215                     if (modified) {
216                         ++numChanges;
217                         tracker.increment();
218                         if (callback != null) {
219                             callback.handleStatement(new ContextStatementImpl(subj, pred, obj, ctx));
220                         }
221                     }
222                 }
223             }
224 
225             // Notify the callback handler, if any
226             if (callback != null) {
227                 callback.endRDF();
228             }
229 
230             // Return the number of statements actually added to or deleted from the model
231             return numChanges;
232 
233         } catch (final RDFHandlerException ex) {
234             // Wrap and propagate
235             throw Throwables.propagate(ex);
236 
237         } finally {
238             // Stop tracking
239             tracker.end();
240         }
241     }
242 
243     public void toHandler(final RDFHandler handler) throws RDFHandlerException {
244 
245         // Forward statements to supplied handler, calling also startRDF and endRDF
246         handler.startRDF();
247         for (int index = 0; index < this.blocks.size(); ++index) {
248             final Value[] block = this.blocks.get(index);
249             final int maxOffset = index < this.blocks.size() - 1 ? BLOCK_SIZE : this.offset;
250             for (int offset = 0; offset < maxOffset; offset += 4) {
251                 handler.handleStatement(new ContextStatementImpl((Resource) block[offset],
252                         (URI) block[offset + 1], block[offset + 2], (Resource) block[offset + 3]));
253             }
254         }
255         handler.endRDF();
256     }
257 
258     public synchronized boolean add(final Resource subj, final URI pred, final Value obj,
259             @Nullable final Resource ctx) {
260 
261         // Invalidate hash index
262         this.buckets = null;
263 
264         // Retrieve the block where to add the statement; add a new block if necessary
265         Value[] block;
266         if (this.offset < BLOCK_SIZE) {
267             block = this.blocks.get(this.blocks.size() - 1);
268         } else {
269             block = new Value[BLOCK_SIZE];
270             this.blocks.add(block);
271             this.offset = 0;
272         }
273 
274         // Store the statement in the block and increment the offset in the block
275         block[this.offset] = subj;
276         block[this.offset + 1] = pred;
277         block[this.offset + 2] = obj;
278         block[this.offset + 3] = ctx;
279         this.offset += 4;
280 
281         // Update tracker if available
282         if (this.addTracker != null) {
283             this.addTracker.increment();
284         }
285 
286         // Always return true (buffer always modified)
287         return true;
288     }
289 
290     @Override
291     public boolean add(final Statement stmt) {
292         return add(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext());
293     }
294 
295     @Override
296     public RDFHandler get() {
297         return new Appender();
298     }
299 
300     private int[] getBuckets() {
301 
302         // Create hash index at first access. Elements of the hash table are pointers to values in
303         // the combined block arrays (pointer incremeted by 4 to avoid pointer = 0)
304         if (this.buckets == null) {
305             final int size = size();
306             final int[] buckets = new int[Math.max(4, Integer.highestOneBit(size) * 4) - 1];
307             int pointer = 4; // never use 0
308             for (int index = 0; index < this.blocks.size(); ++index) {
309                 final Value[] block = this.blocks.get(index);
310                 final int maxOffset = index < this.blocks.size() - 1 ? BLOCK_SIZE : this.offset;
311                 for (int offset = 0; offset < maxOffset; offset += 4) {
312                     final int hash = hash(block[offset], block[offset + 1], block[offset + 2],
313                             block[offset + 3]);
314                     int slot = (hash & 0x7FFFFFFF) % buckets.length;
315                     while (buckets[slot] != 0) {
316                         slot = (slot + 1) % buckets.length;
317                     }
318                     buckets[slot] = pointer;
319                     pointer += 4;
320                 }
321             }
322             this.buckets = buckets;
323         }
324         return this.buckets;
325     }
326 
327     private synchronized void startAddTracker() {
328         if (this.addTracker == null) {
329             this.addTracker = new Tracker(LOGGER, null, null,
330                     "%d triples buffered (%d tr/s, %d tr/s avg)");
331             this.addTracker.start();
332         }
333         ++this.addTrackCount;
334     }
335 
336     private synchronized void stopAddTracker() {
337         --this.addTrackCount;
338         if (this.addTrackCount <= 0 && this.addTracker != null) {
339             this.addTracker.end();
340             this.addTracker = null;
341         }
342     }
343 
344     private synchronized void append(final Value[] block, final int blockLength) {
345 
346         // Invalidate hash index
347         this.buckets = null;
348 
349         // Handle two cases
350         if (blockLength == block.length) {
351 
352             // (1) A full block is being added. Don't copy, just insert the block in the list
353             if (this.offset >= BLOCK_SIZE) {
354                 this.blocks.add(block);
355             } else {
356                 final Value[] last = this.blocks.remove(this.blocks.size() - 1);
357                 this.blocks.add(block);
358                 this.blocks.add(last);
359             }
360 
361         } else {
362 
363             // (2) A partial block is being added. Copy the content of the block specified
364             // into buffer blocks, possibly allocating new blocks if necessary.
365             int offset = 0;
366             while (offset < blockLength) {
367                 Value[] thisBlock;
368                 if (this.offset < BLOCK_SIZE) {
369                     thisBlock = this.blocks.get(this.blocks.size() - 1);
370                 } else {
371                     thisBlock = new Value[BLOCK_SIZE];
372                     this.blocks.add(thisBlock);
373                     this.offset = 0;
374                 }
375                 final int length = Math.min(blockLength - offset, BLOCK_SIZE - this.offset);
376                 System.arraycopy(block, offset, thisBlock, this.offset, length);
377                 offset += length;
378                 this.offset += length;
379             }
380         }
381 
382         // Update tracker if available
383         if (this.addTracker != null) {
384             this.addTracker.add(blockLength >> 2);
385         }
386     }
387 
388     private static int hash(final Value subj, final Value pred, final Value obj, final Value ctx) {
389         // Return an hash code depending on all four SPOC components
390         return 6661 * subj.hashCode() + 961 * pred.hashCode() + 31 * obj.hashCode()
391                 + (ctx == null ? 0 : ctx.hashCode());
392     }
393 
394     private final class Appender extends AbstractRDFHandler {
395 
396         private Value[] block;
397 
398         private int offset;
399 
400         private Appender() {
401             this.block = null;
402             this.offset = 0;
403         }
404 
405         @Override
406         public void startRDF() {
407 
408             // Allocate a local block
409             this.block = new Value[BLOCK_SIZE];
410             this.offset = 0;
411 
412             // Start tracking if necessary
413             startAddTracker();
414         }
415 
416         @Override
417         public void handleStatement(final Statement stmt) {
418 
419             // Extract components
420             final Resource subj = stmt.getSubject();
421             final URI pred = stmt.getPredicate();
422             final Value obj = stmt.getObject();
423             final Resource ctx = stmt.getContext();
424 
425             // Append the SPOC components to the local block
426             this.block[this.offset++] = subj;
427             this.block[this.offset++] = pred;
428             this.block[this.offset++] = obj;
429             this.block[this.offset++] = ctx;
430 
431             // If the local block is full, copy its content to the buffer (this requires
432             // synchronization)
433             if (this.offset == this.block.length) {
434                 append(this.block, this.block.length);
435                 this.block = new Value[BLOCK_SIZE];
436                 this.offset = 0;
437             }
438 
439         }
440 
441         @Override
442         public void endRDF() {
443 
444             // Flush the content of the local block to the buffer, if necessary, and release
445             // the block to free memory
446             if (this.offset > 0) {
447                 append(this.block, this.offset);
448                 this.offset = 0;
449             }
450             this.block = null;
451 
452             // Stop tracking
453             stopAddTracker();
454         }
455 
456     }
457 
458 }