1 /*
2 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3 *
4 * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5 * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6 *
7 * To the extent possible under law, the authors have dedicated all copyright and related and
8 * neighboring rights to this software to the public domain worldwide. This software is
9 * distributed without any warranty.
10 *
11 * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package eu.fbk.rdfpro;
15
16 import java.util.Objects;
17 import java.util.concurrent.CompletableFuture;
18
19 import org.openrdf.model.Statement;
20 import org.openrdf.rio.RDFHandler;
21 import org.openrdf.rio.RDFHandlerException;
22
23 import eu.fbk.rdfpro.util.Environment;
24
// assumptions
// - lifecycle: start, followed by handleXXX calls in parallel, followed by end
// - on error, further invocations of API methods will result in exceptions
// - no specific way to interrupt computation (CTRL-C from outside)
29
30 /**
31 * A generic RDF stream transformer.
32 * <p>
33 * An {@code RDFProcessor} is a reusable Java component that consumes an input stream of RDF
34 * {@link Statement}s in one or more passes, produces an output stream of statements and may have
35 * side effect like writing RDF data.
36 * </p>
37 * <p>
38 * An {@code RDFProcessor} can be used by means of a number of methods:
39 * </p>
40 * <ul>
41 * <li>{@link #apply(RDFSource, RDFHandler, int)} and
42 * {@link #applyAsync(RDFSource, RDFHandler, int)} apply the {@code RDFProcessor} to data from a
43 * given {@code RDFSource}, emitting the results to a supplied {@code RDFHandler} in one or more
44 * passes;</li>
45 * <li>{@link #wrap(RDFSource)} wraps an {@code RDFSource}, returning a new {@code RDFSource} that
46 * post-process returned data with the {@code RDFProcessor};</li>
47 * <li>{@link #wrap(RDFHandler)} wraps an {@code RDFHandler}, returning a new {@code RDFHandler}
48 * that pre-process input data with the {@code RDFProcessor}.</li>
49 * </ul>
50 * <p>
51 * The transformation encapsulated by an {@code RDFProcessor} may require multiple passes on input
52 * data before results can be emitted. To this end, method {@link #getExtraPasses()} declares how
53 * many extra passes a {@code RDFProcessor} needs on its input with method before the result is
54 * produced in successive passes.
55 * </p>
56 * <p>
57 * Implementers of this interface should provide an implementation of {@link #wrap(RDFHandler)}
58 * and, optionally, of {@link #getExtraPasses()} in case their {@code RDFProcessor} requires extra
59 * passes. Other methods have reasonable default implementations that do not need (in principle)
60 * to be overridden.
61 * </p>
62 * <p>
63 * Implementations of this interface should be thread-safe, as it is allowed for its methods to be
64 * called concurrently by multiple threads (still, no particular contention is expected so basic
65 * synchronization is enough).
66 * </p>
67 */
68 public interface RDFProcessor {
69
70 /**
71 * Returns the number of extra passes required by the processor before starting emitting
72 * output data. This default implementation returns 0 (the common case).
73 *
74 * @return then number of extra passes, not negative
75 */
76 default int getExtraPasses() {
77 return 0; // may be overridden
78 }
79
80 /**
81 * Wraps the supplied {@code RDFSource} so to post-process data of the source with this
82 * {@code RDFProcessor} before returning it. Note that this method does not perform any real
83 * work, apart creating a wrapper. This default implementation delegates to
84 * {@link #wrap(RDFHandler)}.
85 *
86 * @param source
87 * the {@code RDFSource} to wrap, not null
88 * @return the wrapped {@code RDFSource}
89 */
90 default RDFSource wrap(final RDFSource source) {
91
92 Objects.requireNonNull(source);
93
94 return new RDFSource() {
95
96 @Override
97 public void emit(final RDFHandler handler, final int passes)
98 throws RDFSourceException, RDFHandlerException {
99 Objects.requireNonNull(handler);
100 final int totalPasses = passes + getExtraPasses();
101 if (passes > 0 && totalPasses > 0) {
102 final RDFHandler wrappedHandler = wrap(handler);
103 source.emit(wrappedHandler, totalPasses);
104 }
105 }
106
107 };
108 }
109
110 /**
111 * Wraps the supplied {@code RDFHandler} so to pre-process data fed to it with this
112 * {@code RDFProcessor}. Note that this method does not perform any real work, apart creating
113 * a wrapper.
114 *
115 * @param handler
116 * the {@code RDFHandler} to wrap, not null
117 * @return the wrapped {@code RDFHandler}
118 */
119 RDFHandler wrap(RDFHandler handler);
120
121 /**
122 * Applies the processor to the supplied {@code RDFSource}, emitting output data to the
123 * specified {@code RDFHandler} in one or more passes. This default implementation is based on
124 * {@link #wrap(RDFHandler)}.
125 *
126 * @param input
127 * the input {@code RDFSource}, not null
128 * @param output
129 * the output {@code RDFHandler}, not null
130 * @param passes
131 * the requested number of passes to be fed to the supplied {@code RDFHandler} (if
132 * zero nothing will be done)
133 * @throws RDFSourceException
134 * on error in retrieving data from the source
135 * @throws RDFHandlerException
136 * on error in the supplied {@code RDFHandler} or inside the {@code RDFProcessor}
137 */
138 default void apply(final RDFSource input, final RDFHandler output, final int passes)
139 throws RDFSourceException, RDFHandlerException {
140 if (passes > 0) {
141 input.emit(wrap(output), passes + getExtraPasses());
142 } else if (passes < 0) {
143 throw new IllegalArgumentException("Invalid number of passes " + passes);
144 } else {
145 Objects.requireNonNull(input);
146 Objects.requireNonNull(passes);
147 }
148 }
149
150 /**
151 * Asynchronously applies the processor to the supplied {@code RDFSource}, emitting output
152 * data to the specified {@code RDFHandler} in one or more passes. This method operates
153 * similarly to {@link #apply(RDFSource, RDFHandler, int)}, but immediately returns providing
154 * a {@link CompletableFuture} that can be used to track the result of the computation. This
155 * default implementation is based on {@link #apply(RDFSource, RDFHandler, int)} (and thus
156 * indirectly on {@link #wrap(RDFHandler)}) and makes use of the common thread pool provided
157 * by {@link Environment#getPool()}.
158 *
159 * @param input
160 * the input {@code RDFSource}, not null
161 * @param output
162 * the output {@code RDFHandler}, not null
163 * @param passes
164 * the requested number of passes to be fed to the supplied {@code RDFHandler} (if
165 * zero nothing will be done)
166 * @return a {@code CompletableFuture} that can be used for querying the state of the
167 * asynchronous computation and possibly execute a callback when it ends (note that
168 * cancellation is not supported).
169 */
170 default CompletableFuture<Void> applyAsync(final RDFSource input, final RDFHandler output,
171 final int passes) {
172 final CompletableFuture<Void> future = new CompletableFuture<Void>();
173 Environment.getPool().execute(new Runnable() {
174
175 @Override
176 public void run() {
177 if (!future.isDone()) {
178 try {
179 apply(input, output, passes);
180 future.complete(null);
181 } catch (final Throwable ex) {
182 future.completeExceptionally(ex);
183 }
184 }
185 }
186
187 });
188 return future;
189 }
190
191 }