1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.util.Objects;
17  import java.util.concurrent.CompletableFuture;
18  
19  import org.openrdf.model.Statement;
20  import org.openrdf.rio.RDFHandler;
21  import org.openrdf.rio.RDFHandlerException;
22  
23  import eu.fbk.rdfpro.util.Environment;
24  
// Assumptions:
// - lifecycle: start, followed by handleXXX calls (possibly in parallel), followed by end
// - on error, further invocations of API methods will result in exceptions
// - there is no dedicated way to interrupt a computation (only from outside, e.g., CTRL-C)
29  
30  /**
31   * A generic RDF stream transformer.
32   * <p>
33   * An {@code RDFProcessor} is a reusable Java component that consumes an input stream of RDF
34   * {@link Statement}s in one or more passes, produces an output stream of statements and may have
35   * side effect like writing RDF data.
36   * </p>
37   * <p>
38   * An {@code RDFProcessor} can be used by means of a number of methods:
39   * </p>
40   * <ul>
41   * <li>{@link #apply(RDFSource, RDFHandler, int)} and
42   * {@link #applyAsync(RDFSource, RDFHandler, int)} apply the {@code RDFProcessor} to data from a
43   * given {@code RDFSource}, emitting the results to a supplied {@code RDFHandler} in one or more
44   * passes;</li>
45   * <li>{@link #wrap(RDFSource)} wraps an {@code RDFSource}, returning a new {@code RDFSource} that
46   * post-process returned data with the {@code RDFProcessor};</li>
47   * <li>{@link #wrap(RDFHandler)} wraps an {@code RDFHandler}, returning a new {@code RDFHandler}
48   * that pre-process input data with the {@code RDFProcessor}.</li>
49   * </ul>
50   * <p>
51   * The transformation encapsulated by an {@code RDFProcessor} may require multiple passes on input
52   * data before results can be emitted. To this end, method {@link #getExtraPasses()} declares how
53   * many extra passes a {@code RDFProcessor} needs on its input with method before the result is
54   * produced in successive passes.
55   * </p>
56   * <p>
57   * Implementers of this interface should provide an implementation of {@link #wrap(RDFHandler)}
58   * and, optionally, of {@link #getExtraPasses()} in case their {@code RDFProcessor} requires extra
59   * passes. Other methods have reasonable default implementations that do not need (in principle)
60   * to be overridden.
61   * </p>
62   * <p>
63   * Implementations of this interface should be thread-safe, as it is allowed for its methods to be
64   * called concurrently by multiple threads (still, no particular contention is expected so basic
65   * synchronization is enough).
66   * </p>
67   */
68  public interface RDFProcessor {
69  
70      /**
71       * Returns the number of extra passes required by the processor before starting emitting
72       * output data. This default implementation returns 0 (the common case).
73       *
74       * @return then number of extra passes, not negative
75       */
76      default int getExtraPasses() {
77          return 0; // may be overridden
78      }
79  
80      /**
81       * Wraps the supplied {@code RDFSource} so to post-process data of the source with this
82       * {@code RDFProcessor} before returning it. Note that this method does not perform any real
83       * work, apart creating a wrapper. This default implementation delegates to
84       * {@link #wrap(RDFHandler)}.
85       *
86       * @param source
87       *            the {@code RDFSource} to wrap, not null
88       * @return the wrapped {@code RDFSource}
89       */
90      default RDFSource wrap(final RDFSource source) {
91  
92          Objects.requireNonNull(source);
93  
94          return new RDFSource() {
95  
96              @Override
97              public void emit(final RDFHandler handler, final int passes)
98                      throws RDFSourceException, RDFHandlerException {
99                  Objects.requireNonNull(handler);
100                 final int totalPasses = passes + getExtraPasses();
101                 if (passes > 0 && totalPasses > 0) {
102                     final RDFHandler wrappedHandler = wrap(handler);
103                     source.emit(wrappedHandler, totalPasses);
104                 }
105             }
106 
107         };
108     }
109 
110     /**
111      * Wraps the supplied {@code RDFHandler} so to pre-process data fed to it with this
112      * {@code RDFProcessor}. Note that this method does not perform any real work, apart creating
113      * a wrapper.
114      *
115      * @param handler
116      *            the {@code RDFHandler} to wrap, not null
117      * @return the wrapped {@code RDFHandler}
118      */
119     RDFHandler wrap(RDFHandler handler);
120 
121     /**
122      * Applies the processor to the supplied {@code RDFSource}, emitting output data to the
123      * specified {@code RDFHandler} in one or more passes. This default implementation is based on
124      * {@link #wrap(RDFHandler)}.
125      *
126      * @param input
127      *            the input {@code RDFSource}, not null
128      * @param output
129      *            the output {@code RDFHandler}, not null
130      * @param passes
131      *            the requested number of passes to be fed to the supplied {@code RDFHandler} (if
132      *            zero nothing will be done)
133      * @throws RDFSourceException
134      *             on error in retrieving data from the source
135      * @throws RDFHandlerException
136      *             on error in the supplied {@code RDFHandler} or inside the {@code RDFProcessor}
137      */
138     default void apply(final RDFSource input, final RDFHandler output, final int passes)
139             throws RDFSourceException, RDFHandlerException {
140         if (passes > 0) {
141             input.emit(wrap(output), passes + getExtraPasses());
142         } else if (passes < 0) {
143             throw new IllegalArgumentException("Invalid number of passes " + passes);
144         } else {
145             Objects.requireNonNull(input);
146             Objects.requireNonNull(passes);
147         }
148     }
149 
150     /**
151      * Asynchronously applies the processor to the supplied {@code RDFSource}, emitting output
152      * data to the specified {@code RDFHandler} in one or more passes. This method operates
153      * similarly to {@link #apply(RDFSource, RDFHandler, int)}, but immediately returns providing
154      * a {@link CompletableFuture} that can be used to track the result of the computation. This
155      * default implementation is based on {@link #apply(RDFSource, RDFHandler, int)} (and thus
156      * indirectly on {@link #wrap(RDFHandler)}) and makes use of the common thread pool provided
157      * by {@link Environment#getPool()}.
158      *
159      * @param input
160      *            the input {@code RDFSource}, not null
161      * @param output
162      *            the output {@code RDFHandler}, not null
163      * @param passes
164      *            the requested number of passes to be fed to the supplied {@code RDFHandler} (if
165      *            zero nothing will be done)
166      * @return a {@code CompletableFuture} that can be used for querying the state of the
167      *         asynchronous computation and possibly execute a callback when it ends (note that
168      *         cancellation is not supported).
169      */
170     default CompletableFuture<Void> applyAsync(final RDFSource input, final RDFHandler output,
171             final int passes) {
172         final CompletableFuture<Void> future = new CompletableFuture<Void>();
173         Environment.getPool().execute(new Runnable() {
174 
175             @Override
176             public void run() {
177                 if (!future.isDone()) {
178                     try {
179                         apply(input, output, passes);
180                         future.complete(null);
181                     } catch (final Throwable ex) {
182                         future.completeExceptionally(ex);
183                     }
184                 }
185             }
186 
187         });
188         return future;
189     }
190 
191 }