1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2015 by Francesco Corcoglioniti with support by Alessio Palmero Aprosio and Marco
5    * Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Throwables;

import org.openrdf.model.Statement;
import org.openrdf.rio.RDFHandler;
import org.openrdf.rio.RDFHandlerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.fbk.rdfpro.util.Environment;
import eu.fbk.rdfpro.util.IO;
import eu.fbk.rdfpro.util.QuadModel;
37  
38  /**
39   * Rule engine abstraction.
40   * <p>
41   * Implementation note: concrete rule engine implementations should extend this abstract class and
42   * implement one or both methods {@link #doEval(QuadModel)} and {@link #doEval(RDFHandler)}.
43   * </p>
44   */
45  public abstract class RuleEngine {
46  
47      private static final Logger LOGGER = LoggerFactory.getLogger(RuleEngine.class);
48  
49      private static final String IMPLEMENTATION = Environment.getProperty(
50              "rdfpro.rules.implementation", "eu.fbk.rdfpro.RuleEngineImpl");
51  
52      // private static final String IMPLEMENTATION = Environment.getProperty(
53      // "rdfpro.rules.implementation", "eu.fbk.rdfpro.RuleEngineDrools");
54  
55      private final Ruleset ruleset;
56  
57      /**
58       * Creates a new {@code RuleEngine} using the {@code Ruleset} specified. The ruleset must not
59       * contain unsafe rules.
60       *
61       * @param ruleset
62       *            the ruleset, not null and without unsafe rules
63       */
64      protected RuleEngine(final Ruleset ruleset) {
65  
66          // Check the input ruleset
67          Objects.requireNonNull(ruleset);
68          for (final Rule rule : ruleset.getRules()) {
69              if (!rule.isSafe()) {
70                  throw new IllegalArgumentException("Ruleset contains unsafe rule " + rule);
71              }
72          }
73  
74          // Store the ruleset
75          this.ruleset = ruleset;
76      }
77  
78      /**
79       * Factory method for creating a new {@code RuleEngine} using the {@code Ruleset} specified.
80       * The ruleset must not contain unsafe rules. The engine implementation instantiated is based
81       * on the value of configuration property {@code rdfpro.rules.implementation}, which contains
82       * the qualified name of a concrete class extending abstract class {@code RuleEngine}.
83       *
84       * @param ruleset
85       *            the ruleset, not null and without unsafe rules
86       * @return the created rule engine
87       */
88      public static RuleEngine create(final Ruleset ruleset) {
89  
90          // Check parameters
91          Objects.requireNonNull(ruleset);
92  
93          try {
94              // Log the operation
95              if (LOGGER.isTraceEnabled()) {
96                  LOGGER.trace("Creating '{}' engine with ruleset:\n{}\n", IMPLEMENTATION, ruleset);
97              }
98  
99              // Locate the RuleEngine constructor to be used
100             final Class<?> clazz = Class.forName(IMPLEMENTATION);
101             final Constructor<?> constructor = clazz.getConstructor(Ruleset.class);
102 
103             // Instantiate the engine via reflection
104             return (RuleEngine) constructor.newInstance(ruleset);
105 
106         } catch (final IllegalAccessException | ClassNotFoundException | NoSuchMethodException
107                 | InstantiationException ex) {
108             // Configuration is wrong
109             throw new Error("Illegal rule engine implementation: " + IMPLEMENTATION, ex);
110 
111         } catch (final InvocationTargetException ex) {
112             // Configuration is ok, but the RuleEngine cannot be created
113             throw Throwables.propagate(ex.getCause());
114         }
115     }
116 
117     /**
118      * Returns the ruleset applied by this engine
119      *
120      * @return the ruleset
121      */
122     public final Ruleset getRuleset() {
123         return this.ruleset;
124     }
125 
126     /**
127      * Evaluates rules on the {@code QuadModel} specified.
128      *
129      * @param model
130      *            the model the engine will operate on
131      */
132     public final void eval(final Collection<Statement> model) {
133 
134         // Check parameters
135         Objects.requireNonNull(model);
136 
137         // Handle two cases, respectively with/without logging information emitted
138         if (!LOGGER.isDebugEnabled()) {
139 
140             // Logging disabled: directly forward to doEval()
141             doEval(model);
142 
143         } else {
144 
145             // Logging enabled: log relevant info before and after forwarding to doEval()
146             final long ts = System.currentTimeMillis();
147             final int inputSize = model.size();
148             LOGGER.debug("Rule evaluation started: {} input statements, {} rule(s), model input",
149                     inputSize, this.ruleset.getRules().size());
150             doEval(model);
151             LOGGER.debug(
152                     "Rule evaluation completed: {} input statements, {} output statements, {} ms",
153                     inputSize, model.size(), System.currentTimeMillis() - ts);
154         }
155     }
156 
157     /**
158      * Evaluates rules in streaming mode, emitting resulting statements to the {@code RDFHandler}
159      * supplied.
160      *
161      * @param handler
162      *            the handler where to emit resulting statements
163      * @param deduplicate
164      *            true if the output should not contain duplicate statements
165      * @return an {@code RDFHandler} where input statements can be streamed into
166      */
167     public final RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
168 
169         // Check parameters
170         Objects.requireNonNull(handler);
171 
172         // Handle two cases, respectively with/without logging information emitted
173         if (!LOGGER.isDebugEnabled()) {
174 
175             // Logging disabled: delegate to doEval(), filtering out non-matchable quads
176             return doEval(handler, deduplicate);
177 
178         } else {
179 
180             // Logging enabled: allocate counters to track quads in (processed/propagated) and out
181             final AtomicInteger numProcessed = new AtomicInteger(0);
182             final AtomicInteger numOut = new AtomicInteger(0);
183 
184             // Wrap sink handler to count out quads
185             final RDFHandler sink = new AbstractRDFHandlerWrapper(handler) {
186 
187                 @Override
188                 public void handleStatement(final Statement statement) throws RDFHandlerException {
189                     super.handleStatement(statement);
190                     numOut.incrementAndGet();
191                 }
192 
193             };
194 
195             // Delegate to doEval(), wrapping the returned handler to perform logging and filter
196             // out non-matchable quads
197             return new AbstractRDFHandlerWrapper(doEval(sink, deduplicate)) {
198 
199                 private long ts;
200 
201                 @Override
202                 public void startRDF() throws RDFHandlerException {
203                     this.ts = System.currentTimeMillis();
204                     numProcessed.set(0);
205                     numOut.set(0);
206                     LOGGER.debug("Rule evaluation started: {} rule(s), stream input",
207                             RuleEngine.this.ruleset.getRules().size());
208                     super.startRDF();
209                 }
210 
211                 @Override
212                 public void handleStatement(final Statement stmt) throws RDFHandlerException {
213                     super.handleStatement(stmt);
214                     numProcessed.incrementAndGet();
215                 }
216 
217                 @Override
218                 public void endRDF() throws RDFHandlerException {
219                     super.endRDF();
220                     LOGGER.debug("Rule evaluation completed: {} input statements, "
221                             + "{} output statements , {} ms", numProcessed.get(), numOut.get(),
222                             System.currentTimeMillis() - this.ts);
223                 }
224 
225             };
226         }
227     }
228 
229     /**
230      * Internal method called by {@link #eval(QuadModel)}. Its base implementation delegates to
231      * {@link #doEval(RDFHandler)}.
232      *
233      * @param model
234      *            the model to operate on
235      */
236     protected void doEval(final Collection<Statement> model) {
237 
238         // Delegate to doEval(RDFHandler), handling two cases for performance reasons
239         if (!this.ruleset.isDeletePossible()
240                 && (model instanceof QuadModel || model instanceof Set<?>)) {
241 
242             // Optimized version that adds inferred statement back to the supplied model, relying
243             // on the fact that no statement can be possibly deleted
244             final List<Statement> inputStmts = new ArrayList<>(model);
245             final RDFHandler handler = doEval(RDFHandlers.decouple(RDFHandlers.wrap(Collections
246                     .synchronizedCollection(model))), false);
247             try {
248                 handler.startRDF();
249                 for (final Statement stmt : inputStmts) {
250                     handler.handleStatement(stmt);
251                 }
252                 handler.endRDF();
253             } catch (final RDFHandlerException ex) {
254                 throw new RuntimeException(ex);
255             } finally {
256                 IO.closeQuietly(handler);
257             }
258 
259         } else {
260 
261             // General implementation that stores resulting statement in a list, and then clears
262             // the input model and loads those statement (this will also take into consideration
263             // possible deletions)
264             final List<Statement> outputStmts = new ArrayList<>();
265             final RDFHandler handler = doEval(RDFHandlers.decouple(RDFHandlers.wrap(Collections
266                     .synchronizedCollection(outputStmts))), true);
267             try {
268                 handler.startRDF();
269                 for (final Statement stmt : model) {
270                     handler.handleStatement(stmt);
271                 }
272                 handler.endRDF();
273             } catch (final RDFHandlerException ex) {
274                 throw new RuntimeException(ex);
275             } finally {
276                 IO.closeQuietly(handler);
277             }
278             model.clear();
279             for (final Statement stmt : outputStmts) {
280                 model.add(stmt);
281             }
282         }
283     }
284 
285     /**
286      * Internal method called by {@link #eval(RDFHandler)}. Its base implementation delegates to
287      * {@link #doEval(QuadModel)}.
288      *
289      * @param handler
290      *            the handler where to emit resulting statements
291      * @param deduplicate
292      *            true if output should not contain duplicate statements
293      * @return an handler accepting input statements
294      */
295     protected RDFHandler doEval(final RDFHandler handler, final boolean deduplicate) {
296 
297         // Return an RDFHandler that delegates to doEval(QuadModel)
298         return new AbstractRDFHandlerWrapper(handler) {
299 
300             private QuadModel model;
301 
302             @Override
303             public void startRDF() throws RDFHandlerException {
304                 super.startRDF();
305                 this.model = QuadModel.create();
306             }
307 
308             @Override
309             public synchronized void handleStatement(final Statement stmt)
310                     throws RDFHandlerException {
311                 this.model.add(stmt);
312             }
313 
314             @Override
315             public void endRDF() throws RDFHandlerException {
316                 doEval(this.model);
317                 for (final Statement stmt : this.model) {
318                     super.handleStatement(stmt);
319                 }
320                 this.model = null; // free memory
321                 super.endRDF();
322             }
323 
324         };
325     }
326 
327 }