1 /*
2 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3 *
4 * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5 * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6 *
7 * To the extent possible under law, the authors have dedicated all copyright and related and
8 * neighboring rights to this software to the public domain worldwide. This software is
9 * distributed without any warranty.
10 *
11 * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package eu.fbk.rdfpro;
15
16 import java.util.ArrayList;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Spliterator;
21 import java.util.Spliterators;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.function.Consumer;
25 import java.util.stream.Stream;
26 import java.util.stream.StreamSupport;
27
28 import org.openrdf.model.Statement;
29 import org.openrdf.rio.RDFHandler;
30 import org.openrdf.rio.RDFHandlerException;
31
32 import eu.fbk.rdfpro.util.Environment;
33
34 /**
35 * A source of RDF data.
36 * <p>
37 * An {@code RDFSource} provides repeatable access to RDF data consisting of RDF statements but
38 * also namespace declarations and comments. Source data is assumed immutable and can be accessed
39 * multiple times in different ways:
40 * </p>
41 * <ul>
42 * <li>calling {@link #emit(RDFHandler, int)} that allows full access to all the kinds of data
43 * items (statements, namespaces, comments) and allow for optimizing multiple passes over source
44 * data;</li>
45 * <li>calling methods of the {@link Iterable} interface, i.e., {@link #iterator()} and
46 * {@link #forEach(Consumer)}; they allow access only to statements and may not be as efficient as
47 * {@code emit()}.</li>
48 * <li>obtaining a {@link Stream} using methods {@link #stream()} and {@link #parallelStream()},
49 * as can be done on Java {@code Collection}s.</li>
50 * </ul>
51 * <p>
52 * In order to implement this interface it is enough to implement method
53 * {@link #emit(RDFHandler, int)}, as default implementations are provided for the remaining
54 * methods. However, consider overriding also methods {@link #spliterator()} (and possibly
55 * {@link #iterator()}) in case you have access to a better {@code Splititerator} implementation,
56 * as the supplied one requires to launch a background thread performing
57 * {@link #emit(RDFHandler, int)} (this is not necessarily bad, as it decouples data extraction
58 * from data consumption); in general it is not necessary to override other methods.
59 * </p>
60 */
61 public interface RDFSource extends Iterable<Statement> {
62
63 /**
64 * Emits the RDF data of the source to the {@code RDFHandler} supplied, performing the
65 * requested number of passes. For each pass, the {@code handler} method {@code startRDF()} is
66 * called, followed by possibly concurrent invocations to {@code handleXXX()} methods and
67 * finally an invocation to {@code endRDF()}. Any exception causes this process to be
68 * interrupted. In any case, both after the successful completion of the requested passes or
69 * after a failure, method {@link AutoCloseable#close()} is called on the {@code handler} in
70 * case it implements interface {@link AutoCloseable} (or a sub-interface), thus giving it the
71 * possibility to release allocated resources.
72 *
73 * @param handler
74 * the handler where to emit RDF data (statements, comments, namespaces)
75 * @param passes
76 * the number of passes to perform on source data
77 * @throws RDFSourceException
78 * in case of specific errors in retrieving data from this source
79 * @throws RDFHandlerException
80 * in case of specific errors in the invoked {@link RDFHandler}
81 */
82 void emit(RDFHandler handler, int passes) throws RDFSourceException, RDFHandlerException;
83
84 /**
85 * {@inheritDoc} This default implementation delegates to {@link #spliterator()}. This
86 * baseline implementation is as good as the underlying {@code Spliterator} (wrapping is not
87 * expensive); it should be overridden if a more efficient can be produced.
88 */
89 @Override
90 default Iterator<Statement> iterator() {
91 return Spliterators.iterator(spliterator());
92 }
93
94 /**
95 * {@inheritDoc} This default implementation either delegates to {@link #forEach(Consumer)},
96 * in case {@link Spliterator#forEachRemaining(Consumer)} is immediately called (more
97 * efficient), or run {@link #emit(RDFHandler, int)} in a separate thread, populating a queue
98 * from where the returned {@code Spliterator} retrieves its elements. This baseline
99 * implementation should be overridden whenever possible.
100 */
101 @Override
102 default Spliterator<Statement> spliterator() {
103 return new Spliterator<Statement>() {
104
105 private BlockingQueue<Object> queue;
106
107 private boolean done;
108
109 @Override
110 public long estimateSize() {
111 return Long.MAX_VALUE; // size unknown
112 }
113
114 @Override
115 public int characteristics() {
116 return Spliterator.NONNULL | Spliterator.IMMUTABLE; // no other guarantees
117 }
118
119 @Override
120 public void forEachRemaining(final Consumer<? super Statement> action) {
121 Objects.requireNonNull(action);
122 if (this.queue != null) {
123 while (tryAdvance(action)) {
124 }
125 } else if (!this.done) {
126 forEach(action);
127 this.done = true;
128 }
129 }
130
131 @Override
132 public boolean tryAdvance(final Consumer<? super Statement> action) {
133 if (this.queue != null || triggerEmit()) {
134 try {
135 final Object object = this.queue.take();
136 if (object instanceof Statement) {
137 action.accept((Statement) object);
138 return true;
139 } else if (object instanceof Throwable) {
140 throw (Throwable) object;
141 } else {
142 this.queue = null;
143 this.done = true;
144 }
145 } catch (final Throwable ex) {
146 this.queue = null; // will ultimately kill the emit thread
147 this.done = true;
148 if (ex instanceof RuntimeException) {
149 throw (RuntimeException) ex;
150 } else if (ex instanceof Error) {
151 throw (Error) ex;
152 }
153 throw new RuntimeException(ex);
154 }
155 }
156 return false;
157 }
158
159 @SuppressWarnings({ "rawtypes", "unchecked" })
160 @Override
161 public Spliterator<Statement> trySplit() {
162 if (this.queue != null || triggerEmit()) {
163 final List<Object> list = new ArrayList<Object>(1024);
164 this.queue.drainTo(list);
165 final int last = list.size() - 1;
166 if (last >= 0) {
167 if (!(list.get(last) instanceof Statement)) {
168 this.queue.offer(list.remove(last));
169 }
170 return (Spliterator) list.spliterator();
171 }
172 }
173 return null;
174 }
175
176 private boolean triggerEmit() {
177 if (this.done) {
178 return false;
179 }
180 this.queue = new ArrayBlockingQueue<Object>(1024);
181 Environment.getPool().execute(new Runnable() {
182
183 @Override
184 public void run() {
185 doEmit();
186 }
187
188 });
189 return true;
190 }
191
192 private void doEmit() {
193 try {
194 emit(new AbstractRDFHandler() {
195
196 @Override
197 public void handleStatement(final Statement statement)
198 throws RDFHandlerException {
199 try {
200 queue.put(statement);
201 } catch (final InterruptedException ex) {
202 throw new RDFHandlerException("Interrupted", ex);
203 }
204 }
205
206 }, 1);
207 this.queue.put(this.queue); // queue object used as EOF marker
208
209 } catch (final Throwable ex) {
210 try {
211 this.queue.put(ex);
212 } catch (final Throwable ex2) {
213 this.queue = null; // last resort to break iteration
214 }
215 }
216 }
217
218 };
219 }
220
221 /**
222 * {@inheritDoc} This default implementation delegates to {@link #emit(RDFHandler, int)},
223 * enforcing a sequential invocation (but possibly by different threads) of the supplied
224 * {@code action}. Iteration order is unspecified and may differ across successive invocations
225 * of this method. There is usually no need to override this method.
226 *
227 * @throws RDFSourceException
228 * in case of errors in retrieving data from this source
229 */
230 @Override
231 default void forEach(final Consumer<? super Statement> action) throws RDFSourceException {
232 try {
233 emit(new AbstractRDFHandler() {
234
235 @Override
236 public synchronized void handleStatement(final Statement statement)
237 throws RDFHandlerException {
238 action.accept(statement);
239 }
240
241 }, 1);
242 } catch (final RDFHandlerException ex) {
243 // Not thrown by Consumer -> RDFHandler adapter
244 throw new Error("Unexpected exception (!)", ex);
245 }
246 }
247
248 /**
249 * Returns a sequential Java 8 {@code Stream} over the statements of this {@code RDFSource}.
250 * This method is meant for integration with the Java 8 Stream API. Its default implementation
251 * builds on top of the {@link Spliterator} returned by {@link #spliterator()}. There is
252 * usually no need to override this method.
253 *
254 * @return a sequential {@code Stream} over the statements of this {@code RDFSource}
255 */
256 default Stream<Statement> stream() {
257 return StreamSupport.stream(spliterator(), false);
258 }
259
260 /**
261 * Returns a parallel Java 8 {@code Stream} over the statements of this {@code RDFSource}.
262 * This method is meant for integration with the Java 8 Stream API. Its default implementation
263 * builds on top of the {@link Spliterator} returned by {@link #spliterator()}. There is
264 * usually no need to override this method.
265 *
266 * @return a parallel {@code Stream} over the statements of this {@code RDFSource}
267 */
268 default Stream<Statement> parallelStream() {
269 return StreamSupport.stream(spliterator(), true);
270 }
271
272 }