1 /* 2 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries. 3 * 4 * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda, 5 * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/ 6 * 7 * To the extent possible under law, the authors have dedicated all copyright and related and 8 * neighboring rights to this software to the public domain worldwide. This software is 9 * distributed without any warranty. 10 * 11 * You should have received a copy of the CC0 Public Domain Dedication along with this software. 12 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>. 13 */ 14 package eu.fbk.rdfpro; 15 16 import java.util.ArrayList; 17 import java.util.Iterator; 18 import java.util.List; 19 import java.util.Objects; 20 import java.util.Spliterator; 21 import java.util.Spliterators; 22 import java.util.concurrent.ArrayBlockingQueue; 23 import java.util.concurrent.BlockingQueue; 24 import java.util.function.Consumer; 25 import java.util.stream.Stream; 26 import java.util.stream.StreamSupport; 27 28 import org.openrdf.model.Statement; 29 import org.openrdf.rio.RDFHandler; 30 import org.openrdf.rio.RDFHandlerException; 31 32 import eu.fbk.rdfpro.util.Environment; 33 34 /** 35 * A source of RDF data. 36 * <p> 37 * An {@code RDFSource} provides repeatable access to RDF data consisting of RDF statements but 38 * also namespace declarations and comments. Source data is assumed immutable and can be accessed 39 * multiple times in different ways: 40 * </p> 41 * <ul> 42 * <li>calling {@link #emit(RDFHandler, int)} that allows full access to all the kinds of data 43 * items (statements, namespaces, comments) and allow for optimizing multiple passes over source 44 * data;</li> 45 * <li>calling methods of the {@link Iterable} interface, i.e., {@link #iterator()} and 46 * {@link #forEach(Consumer)}; they allow access only to statements and may not be as efficient as 47 * {@code emit()}.</li> 48 * <li>obtaining a {@link Stream} using methods {@link #stream()} and {@link #parallelStream()}, 49 * as can be done on Java {@code Collection}s.</li> 50 * </ul> 51 * <p> 52 * In order to implement this interface it is enough to implement method 53 * {@link #emit(RDFHandler, int)}, as default implementations are provided for the remaining 54 * methods. However, consider overriding also methods {@link #spliterator()} (and possibly 55 * {@link #iterator()}) in case you have access to a better {@code Splititerator} implementation, 56 * as the supplied one requires to launch a background thread performing 57 * {@link #emit(RDFHandler, int)} (this is not necessarily bad, as it decouples data extraction 58 * from data consumption); in general it is not necessary to override other methods. 59 * </p> 60 */ 61 public interface RDFSource extends Iterable<Statement> { 62 63 /** 64 * Emits the RDF data of the source to the {@code RDFHandler} supplied, performing the 65 * requested number of passes. For each pass, the {@code handler} method {@code startRDF()} is 66 * called, followed by possibly concurrent invocations to {@code handleXXX()} methods and 67 * finally an invocation to {@code endRDF()}. Any exception causes this process to be 68 * interrupted. In any case, both after the successful completion of the requested passes or 69 * after a failure, method {@link AutoCloseable#close()} is called on the {@code handler} in 70 * case it implements interface {@link AutoCloseable} (or a sub-interface), thus giving it the 71 * possibility to release allocated resources. 72 * 73 * @param handler 74 * the handler where to emit RDF data (statements, comments, namespaces) 75 * @param passes 76 * the number of passes to perform on source data 77 * @throws RDFSourceException 78 * in case of specific errors in retrieving data from this source 79 * @throws RDFHandlerException 80 * in case of specific errors in the invoked {@link RDFHandler} 81 */ 82 void emit(RDFHandler handler, int passes) throws RDFSourceException, RDFHandlerException; 83 84 /** 85 * {@inheritDoc} This default implementation delegates to {@link #spliterator()}. This 86 * baseline implementation is as good as the underlying {@code Spliterator} (wrapping is not 87 * expensive); it should be overridden if a more efficient can be produced. 88 */ 89 @Override 90 default Iterator<Statement> iterator() { 91 return Spliterators.iterator(spliterator()); 92 } 93 94 /** 95 * {@inheritDoc} This default implementation either delegates to {@link #forEach(Consumer)}, 96 * in case {@link Spliterator#forEachRemaining(Consumer)} is immediately called (more 97 * efficient), or run {@link #emit(RDFHandler, int)} in a separate thread, populating a queue 98 * from where the returned {@code Spliterator} retrieves its elements. This baseline 99 * implementation should be overridden whenever possible. 100 */ 101 @Override 102 default Spliterator<Statement> spliterator() { 103 return new Spliterator<Statement>() { 104 105 private BlockingQueue<Object> queue; 106 107 private boolean done; 108 109 @Override 110 public long estimateSize() { 111 return Long.MAX_VALUE; // size unknown 112 } 113 114 @Override 115 public int characteristics() { 116 return Spliterator.NONNULL | Spliterator.IMMUTABLE; // no other guarantees 117 } 118 119 @Override 120 public void forEachRemaining(final Consumer<? super Statement> action) { 121 Objects.requireNonNull(action); 122 if (this.queue != null) { 123 while (tryAdvance(action)) { 124 } 125 } else if (!this.done) { 126 forEach(action); 127 this.done = true; 128 } 129 } 130 131 @Override 132 public boolean tryAdvance(final Consumer<? super Statement> action) { 133 if (this.queue != null || triggerEmit()) { 134 try { 135 final Object object = this.queue.take(); 136 if (object instanceof Statement) { 137 action.accept((Statement) object); 138 return true; 139 } else if (object instanceof Throwable) { 140 throw (Throwable) object; 141 } else { 142 this.queue = null; 143 this.done = true; 144 } 145 } catch (final Throwable ex) { 146 this.queue = null; // will ultimately kill the emit thread 147 this.done = true; 148 if (ex instanceof RuntimeException) { 149 throw (RuntimeException) ex; 150 } else if (ex instanceof Error) { 151 throw (Error) ex; 152 } 153 throw new RuntimeException(ex); 154 } 155 } 156 return false; 157 } 158 159 @SuppressWarnings({ "rawtypes", "unchecked" }) 160 @Override 161 public Spliterator<Statement> trySplit() { 162 if (this.queue != null || triggerEmit()) { 163 final List<Object> list = new ArrayList<Object>(1024); 164 this.queue.drainTo(list); 165 final int last = list.size() - 1; 166 if (last >= 0) { 167 if (!(list.get(last) instanceof Statement)) { 168 this.queue.offer(list.remove(last)); 169 } 170 return (Spliterator) list.spliterator(); 171 } 172 } 173 return null; 174 } 175 176 private boolean triggerEmit() { 177 if (this.done) { 178 return false; 179 } 180 this.queue = new ArrayBlockingQueue<Object>(1024); 181 Environment.getPool().execute(new Runnable() { 182 183 @Override 184 public void run() { 185 doEmit(); 186 } 187 188 }); 189 return true; 190 } 191 192 private void doEmit() { 193 try { 194 emit(new AbstractRDFHandler() { 195 196 @Override 197 public void handleStatement(final Statement statement) 198 throws RDFHandlerException { 199 try { 200 queue.put(statement); 201 } catch (final InterruptedException ex) { 202 throw new RDFHandlerException("Interrupted", ex); 203 } 204 } 205 206 }, 1); 207 this.queue.put(this.queue); // queue object used as EOF marker 208 209 } catch (final Throwable ex) { 210 try { 211 this.queue.put(ex); 212 } catch (final Throwable ex2) { 213 this.queue = null; // last resort to break iteration 214 } 215 } 216 } 217 218 }; 219 } 220 221 /** 222 * {@inheritDoc} This default implementation delegates to {@link #emit(RDFHandler, int)}, 223 * enforcing a sequential invocation (but possibly by different threads) of the supplied 224 * {@code action}. Iteration order is unspecified and may differ across successive invocations 225 * of this method. There is usually no need to override this method. 226 * 227 * @throws RDFSourceException 228 * in case of errors in retrieving data from this source 229 */ 230 @Override 231 default void forEach(final Consumer<? super Statement> action) throws RDFSourceException { 232 try { 233 emit(new AbstractRDFHandler() { 234 235 @Override 236 public synchronized void handleStatement(final Statement statement) 237 throws RDFHandlerException { 238 action.accept(statement); 239 } 240 241 }, 1); 242 } catch (final RDFHandlerException ex) { 243 // Not thrown by Consumer -> RDFHandler adapter 244 throw new Error("Unexpected exception (!)", ex); 245 } 246 } 247 248 /** 249 * Returns a sequential Java 8 {@code Stream} over the statements of this {@code RDFSource}. 250 * This method is meant for integration with the Java 8 Stream API. Its default implementation 251 * builds on top of the {@link Spliterator} returned by {@link #spliterator()}. There is 252 * usually no need to override this method. 253 * 254 * @return a sequential {@code Stream} over the statements of this {@code RDFSource} 255 */ 256 default Stream<Statement> stream() { 257 return StreamSupport.stream(spliterator(), false); 258 } 259 260 /** 261 * Returns a parallel Java 8 {@code Stream} over the statements of this {@code RDFSource}. 262 * This method is meant for integration with the Java 8 Stream API. Its default implementation 263 * builds on top of the {@link Spliterator} returned by {@link #spliterator()}. There is 264 * usually no need to override this method. 265 * 266 * @return a parallel {@code Stream} over the statements of this {@code RDFSource} 267 */ 268 default Stream<Statement> parallelStream() { 269 return StreamSupport.stream(spliterator(), true); 270 } 271 272 }