1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.util.ArrayList;
17  import java.util.Iterator;
18  import java.util.List;
19  import java.util.Objects;
20  import java.util.Spliterator;
21  import java.util.Spliterators;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.function.Consumer;
25  import java.util.stream.Stream;
26  import java.util.stream.StreamSupport;
27  
28  import org.openrdf.model.Statement;
29  import org.openrdf.rio.RDFHandler;
30  import org.openrdf.rio.RDFHandlerException;
31  
32  import eu.fbk.rdfpro.util.Environment;
33  
34  /**
35   * A source of RDF data.
36   * <p>
37   * An {@code RDFSource} provides repeatable access to RDF data consisting of RDF statements but
38   * also namespace declarations and comments. Source data is assumed immutable and can be accessed
39   * multiple times in different ways:
40   * </p>
41   * <ul>
42   * <li>calling {@link #emit(RDFHandler, int)} that allows full access to all the kinds of data
43   * items (statements, namespaces, comments) and allow for optimizing multiple passes over source
44   * data;</li>
45   * <li>calling methods of the {@link Iterable} interface, i.e., {@link #iterator()} and
46   * {@link #forEach(Consumer)}; they allow access only to statements and may not be as efficient as
47   * {@code emit()}.</li>
48   * <li>obtaining a {@link Stream} using methods {@link #stream()} and {@link #parallelStream()},
49   * as can be done on Java {@code Collection}s.</li>
50   * </ul>
51   * <p>
52   * In order to implement this interface it is enough to implement method
53   * {@link #emit(RDFHandler, int)}, as default implementations are provided for the remaining
54   * methods. However, consider overriding also methods {@link #spliterator()} (and possibly
55   * {@link #iterator()}) in case you have access to a better {@code Splititerator} implementation,
56   * as the supplied one requires to launch a background thread performing
57   * {@link #emit(RDFHandler, int)} (this is not necessarily bad, as it decouples data extraction
58   * from data consumption); in general it is not necessary to override other methods.
59   * </p>
60   */
61  public interface RDFSource extends Iterable<Statement> {
62  
63      /**
64       * Emits the RDF data of the source to the {@code RDFHandler} supplied, performing the
65       * requested number of passes. For each pass, the {@code handler} method {@code startRDF()} is
66       * called, followed by possibly concurrent invocations to {@code handleXXX()} methods and
67       * finally an invocation to {@code endRDF()}. Any exception causes this process to be
68       * interrupted. In any case, both after the successful completion of the requested passes or
69       * after a failure, method {@link AutoCloseable#close()} is called on the {@code handler} in
70       * case it implements interface {@link AutoCloseable} (or a sub-interface), thus giving it the
71       * possibility to release allocated resources.
72       *
73       * @param handler
74       *            the handler where to emit RDF data (statements, comments, namespaces)
75       * @param passes
76       *            the number of passes to perform on source data
77       * @throws RDFSourceException
78       *             in case of specific errors in retrieving data from this source
79       * @throws RDFHandlerException
80       *             in case of specific errors in the invoked {@link RDFHandler}
81       */
82      void emit(RDFHandler handler, int passes) throws RDFSourceException, RDFHandlerException;
83  
84      /**
85       * {@inheritDoc} This default implementation delegates to {@link #spliterator()}. This
86       * baseline implementation is as good as the underlying {@code Spliterator} (wrapping is not
87       * expensive); it should be overridden if a more efficient can be produced.
88       */
89      @Override
90      default Iterator<Statement> iterator() {
91          return Spliterators.iterator(spliterator());
92      }
93  
94      /**
95       * {@inheritDoc} This default implementation either delegates to {@link #forEach(Consumer)},
96       * in case {@link Spliterator#forEachRemaining(Consumer)} is immediately called (more
97       * efficient), or run {@link #emit(RDFHandler, int)} in a separate thread, populating a queue
98       * from where the returned {@code Spliterator} retrieves its elements. This baseline
99       * implementation should be overridden whenever possible.
100      */
101     @Override
102     default Spliterator<Statement> spliterator() {
103         return new Spliterator<Statement>() {
104 
105             private BlockingQueue<Object> queue;
106 
107             private boolean done;
108 
109             @Override
110             public long estimateSize() {
111                 return Long.MAX_VALUE; // size unknown
112             }
113 
114             @Override
115             public int characteristics() {
116                 return Spliterator.NONNULL | Spliterator.IMMUTABLE; // no other guarantees
117             }
118 
119             @Override
120             public void forEachRemaining(final Consumer<? super Statement> action) {
121                 Objects.requireNonNull(action);
122                 if (this.queue != null) {
123                     while (tryAdvance(action)) {
124                     }
125                 } else if (!this.done) {
126                     forEach(action);
127                     this.done = true;
128                 }
129             }
130 
131             @Override
132             public boolean tryAdvance(final Consumer<? super Statement> action) {
133                 if (this.queue != null || triggerEmit()) {
134                     try {
135                         final Object object = this.queue.take();
136                         if (object instanceof Statement) {
137                             action.accept((Statement) object);
138                             return true;
139                         } else if (object instanceof Throwable) {
140                             throw (Throwable) object;
141                         } else {
142                             this.queue = null;
143                             this.done = true;
144                         }
145                     } catch (final Throwable ex) {
146                         this.queue = null; // will ultimately kill the emit thread
147                         this.done = true;
148                         if (ex instanceof RuntimeException) {
149                             throw (RuntimeException) ex;
150                         } else if (ex instanceof Error) {
151                             throw (Error) ex;
152                         }
153                         throw new RuntimeException(ex);
154                     }
155                 }
156                 return false;
157             }
158 
159             @SuppressWarnings({ "rawtypes", "unchecked" })
160             @Override
161             public Spliterator<Statement> trySplit() {
162                 if (this.queue != null || triggerEmit()) {
163                     final List<Object> list = new ArrayList<Object>(1024);
164                     this.queue.drainTo(list);
165                     final int last = list.size() - 1;
166                     if (last >= 0) {
167                         if (!(list.get(last) instanceof Statement)) {
168                             this.queue.offer(list.remove(last));
169                         }
170                         return (Spliterator) list.spliterator();
171                     }
172                 }
173                 return null;
174             }
175 
176             private boolean triggerEmit() {
177                 if (this.done) {
178                     return false;
179                 }
180                 this.queue = new ArrayBlockingQueue<Object>(1024);
181                 Environment.getPool().execute(new Runnable() {
182 
183                     @Override
184                     public void run() {
185                         doEmit();
186                     }
187 
188                 });
189                 return true;
190             }
191 
192             private void doEmit() {
193                 try {
194                     emit(new AbstractRDFHandler() {
195 
196                         @Override
197                         public void handleStatement(final Statement statement)
198                                 throws RDFHandlerException {
199                             try {
200                                 queue.put(statement);
201                             } catch (final InterruptedException ex) {
202                                 throw new RDFHandlerException("Interrupted", ex);
203                             }
204                         }
205 
206                     }, 1);
207                     this.queue.put(this.queue); // queue object used as EOF marker
208 
209                 } catch (final Throwable ex) {
210                     try {
211                         this.queue.put(ex);
212                     } catch (final Throwable ex2) {
213                         this.queue = null; // last resort to break iteration
214                     }
215                 }
216             }
217 
218         };
219     }
220 
221     /**
222      * {@inheritDoc} This default implementation delegates to {@link #emit(RDFHandler, int)},
223      * enforcing a sequential invocation (but possibly by different threads) of the supplied
224      * {@code action}. Iteration order is unspecified and may differ across successive invocations
225      * of this method. There is usually no need to override this method.
226      *
227      * @throws RDFSourceException
228      *             in case of errors in retrieving data from this source
229      */
230     @Override
231     default void forEach(final Consumer<? super Statement> action) throws RDFSourceException {
232         try {
233             emit(new AbstractRDFHandler() {
234 
235                 @Override
236                 public synchronized void handleStatement(final Statement statement)
237                         throws RDFHandlerException {
238                     action.accept(statement);
239                 }
240 
241             }, 1);
242         } catch (final RDFHandlerException ex) {
243             // Not thrown by Consumer -> RDFHandler adapter
244             throw new Error("Unexpected exception (!)", ex);
245         }
246     }
247 
248     /**
249      * Returns a sequential Java 8 {@code Stream} over the statements of this {@code RDFSource}.
250      * This method is meant for integration with the Java 8 Stream API. Its default implementation
251      * builds on top of the {@link Spliterator} returned by {@link #spliterator()}. There is
252      * usually no need to override this method.
253      *
254      * @return a sequential {@code Stream} over the statements of this {@code RDFSource}
255      */
256     default Stream<Statement> stream() {
257         return StreamSupport.stream(spliterator(), false);
258     }
259 
260     /**
261      * Returns a parallel Java 8 {@code Stream} over the statements of this {@code RDFSource}.
262      * This method is meant for integration with the Java 8 Stream API. Its default implementation
263      * builds on top of the {@link Spliterator} returned by {@link #spliterator()}. There is
264      * usually no need to override this method.
265      *
266      * @return a parallel {@code Stream} over the statements of this {@code RDFSource}
267      */
268     default Stream<Statement> parallelStream() {
269         return StreamSupport.stream(spliterator(), true);
270     }
271 
272 }