1 /*
2 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3 *
4 * Written in 2015 by Francesco Corcoglioniti with support by Alessio Palmero Aprosio and Marco
5 * Rospocher. Contact info on http://rdfpro.fbk.eu/
6 *
7 * To the extent possible under law, the authors have dedicated all copyright and related and
8 * neighboring rights to this software to the public domain worldwide. This software is
9 * distributed without any warranty.
10 *
11 * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package eu.fbk.rdfpro;
15
16 import java.lang.reflect.Constructor;
17 import java.lang.reflect.InvocationTargetException;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Set;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import com.google.common.base.Throwables;
27
28 import org.openrdf.model.Statement;
29 import org.openrdf.rio.RDFHandler;
30 import org.openrdf.rio.RDFHandlerException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import eu.fbk.rdfpro.util.Environment;
35 import eu.fbk.rdfpro.util.IO;
36 import eu.fbk.rdfpro.util.QuadModel;
37
38 /**
39 * Rule engine abstraction.
40 * <p>
41 * Implementation note: concrete rule engine implementations should extend this abstract class and
42 * implement one or both methods {@link #doEval(QuadModel)} and {@link #doEval(RDFHandler)}.
43 * </p>
44 */
45 public abstract class RuleEngine {
46
47 private static final Logger LOGGER = LoggerFactory.getLogger(RuleEngine.class);
48
49 private static final String IMPLEMENTATION = Environment.getProperty(
50 "rdfpro.rules.implementation", "eu.fbk.rdfpro.RuleEngineImpl");
51
52 // private static final String IMPLEMENTATION = Environment.getProperty(
53 // "rdfpro.rules.implementation", "eu.fbk.rdfpro.RuleEngineDrools");
54
55 private final Ruleset ruleset;
56
57 /**
58 * Creates a new {@code RuleEngine} using the {@code Ruleset} specified. The ruleset must not
59 * contain unsafe rules.
60 *
61 * @param ruleset
62 * the ruleset, not null and without unsafe rules
63 */
64 protected RuleEngine(final Ruleset ruleset) {
65
66 // Check the input ruleset
67 Objects.requireNonNull(ruleset);
68 for (final Rule rule : ruleset.getRules()) {
69 if (!rule.isSafe()) {
70 throw new IllegalArgumentException("Ruleset contains unsafe rule " + rule);
71 }
72 }
73
74 // Store the ruleset
75 this.ruleset = ruleset;
76 }
77
78 /**
79 * Factory method for creating a new {@code RuleEngine} using the {@code Ruleset} specified.
80 * The ruleset must not contain unsafe rules. The engine implementation instantiated is based
81 * on the value of configuration property {@code rdfpro.rules.implementation}, which contains
82 * the qualified name of a concrete class extending abstract class {@code RuleEngine}.
83 *
84 * @param ruleset
85 * the ruleset, not null and without unsafe rules
86 * @return the created rule engine
87 */
88 public static RuleEngine create(final Ruleset ruleset) {
89
90 // Check parameters
91 Objects.requireNonNull(ruleset);
92
93 try {
94 // Log the operation
95 if (LOGGER.isTraceEnabled()) {
96 LOGGER.trace("Creating '{}' engine with ruleset:\n{}\n", IMPLEMENTATION, ruleset);
97 }
98
99 // Locate the RuleEngine constructor to be used
100 final Class<?> clazz = Class.forName(IMPLEMENTATION);
101 final Constructor<?> constructor = clazz.getConstructor(Ruleset.class);
102
103 // Instantiate the engine via reflection
104 return (RuleEngine) constructor.newInstance(ruleset);
105
106 } catch (final IllegalAccessException | ClassNotFoundException | NoSuchMethodException
107 | InstantiationException ex) {
108 // Configuration is wrong
109 throw new Error("Illegal rule engine implementation: " + IMPLEMENTATION, ex);
110
111 } catch (final InvocationTargetException ex) {
112 // Configuration is ok, but the RuleEngine cannot be created
113 throw Throwables.propagate(ex.getCause());
114 }
115 }
116
117 /**
118 * Returns the ruleset applied by this engine
119 *
120 * @return the ruleset
121 */
122 public final Ruleset getRuleset() {
123 return this.ruleset;
124 }
125
126 /**
127 * Evaluates rules on the {@code QuadModel} specified.
128 *
129 * @param model
130 * the model the engine will operate on
131 */
132 public final void eval(final Collection<Statement> model) {
133
134 // Check parameters
135 Objects.requireNonNull(model);
136
137 // Handle two cases, respectively with/without logging information emitted
138 if (!LOGGER.isDebugEnabled()) {
139
140 // Logging disabled: directly forward to doEval()
141 doEval(model);
142
143 } else {
144
145 // Logging enabled: log relevant info before and after forwarding to doEval()
146 final long ts = System.currentTimeMillis();
147 final int inputSize = model.size();
148 LOGGER.debug("Rule evaluation started: {} input statements, {} rule(s), model input",
149 inputSize, this.ruleset.getRules().size());
150 doEval(model);
151 LOGGER.debug(
152 "Rule evaluation completed: {} input statements, {} output statements, {} ms",
153 inputSize, model.size(), System.currentTimeMillis() - ts);
154 }
155 }
156
157 /**
158 * Evaluates rules in streaming mode, emitting resulting statements to the {@code RDFHandler}
159 * supplied.
160 *
161 * @param handler
162 * the handler where to emit resulting statements
163 * @param deduplicate
164 * true if the output should not contain duplicate statements
165 * @return an {@code RDFHandler} where input statements can be streamed into
166 */
167 public final RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
168
169 // Check parameters
170 Objects.requireNonNull(handler);
171
172 // Handle two cases, respectively with/without logging information emitted
173 if (!LOGGER.isDebugEnabled()) {
174
175 // Logging disabled: delegate to doEval(), filtering out non-matchable quads
176 return doEval(handler, deduplicate);
177
178 } else {
179
180 // Logging enabled: allocate counters to track quads in (processed/propagated) and out
181 final AtomicInteger numProcessed = new AtomicInteger(0);
182 final AtomicInteger numOut = new AtomicInteger(0);
183
184 // Wrap sink handler to count out quads
185 final RDFHandler sink = new AbstractRDFHandlerWrapper(handler) {
186
187 @Override
188 public void handleStatement(final Statement statement) throws RDFHandlerException {
189 super.handleStatement(statement);
190 numOut.incrementAndGet();
191 }
192
193 };
194
195 // Delegate to doEval(), wrapping the returned handler to perform logging and filter
196 // out non-matchable quads
197 return new AbstractRDFHandlerWrapper(doEval(sink, deduplicate)) {
198
199 private long ts;
200
201 @Override
202 public void startRDF() throws RDFHandlerException {
203 this.ts = System.currentTimeMillis();
204 numProcessed.set(0);
205 numOut.set(0);
206 LOGGER.debug("Rule evaluation started: {} rule(s), stream input",
207 RuleEngine.this.ruleset.getRules().size());
208 super.startRDF();
209 }
210
211 @Override
212 public void handleStatement(final Statement stmt) throws RDFHandlerException {
213 super.handleStatement(stmt);
214 numProcessed.incrementAndGet();
215 }
216
217 @Override
218 public void endRDF() throws RDFHandlerException {
219 super.endRDF();
220 LOGGER.debug("Rule evaluation completed: {} input statements, "
221 + "{} output statements , {} ms", numProcessed.get(), numOut.get(),
222 System.currentTimeMillis() - this.ts);
223 }
224
225 };
226 }
227 }
228
229 /**
230 * Internal method called by {@link #eval(QuadModel)}. Its base implementation delegates to
231 * {@link #doEval(RDFHandler)}.
232 *
233 * @param model
234 * the model to operate on
235 */
236 protected void doEval(final Collection<Statement> model) {
237
238 // Delegate to doEval(RDFHandler), handling two cases for performance reasons
239 if (!this.ruleset.isDeletePossible()
240 && (model instanceof QuadModel || model instanceof Set<?>)) {
241
242 // Optimized version that adds inferred statement back to the supplied model, relying
243 // on the fact that no statement can be possibly deleted
244 final List<Statement> inputStmts = new ArrayList<>(model);
245 final RDFHandler handler = doEval(RDFHandlers.decouple(RDFHandlers.wrap(Collections
246 .synchronizedCollection(model))), false);
247 try {
248 handler.startRDF();
249 for (final Statement stmt : inputStmts) {
250 handler.handleStatement(stmt);
251 }
252 handler.endRDF();
253 } catch (final RDFHandlerException ex) {
254 throw new RuntimeException(ex);
255 } finally {
256 IO.closeQuietly(handler);
257 }
258
259 } else {
260
261 // General implementation that stores resulting statement in a list, and then clears
262 // the input model and loads those statement (this will also take into consideration
263 // possible deletions)
264 final List<Statement> outputStmts = new ArrayList<>();
265 final RDFHandler handler = doEval(RDFHandlers.decouple(RDFHandlers.wrap(Collections
266 .synchronizedCollection(outputStmts))), true);
267 try {
268 handler.startRDF();
269 for (final Statement stmt : model) {
270 handler.handleStatement(stmt);
271 }
272 handler.endRDF();
273 } catch (final RDFHandlerException ex) {
274 throw new RuntimeException(ex);
275 } finally {
276 IO.closeQuietly(handler);
277 }
278 model.clear();
279 for (final Statement stmt : outputStmts) {
280 model.add(stmt);
281 }
282 }
283 }
284
285 /**
286 * Internal method called by {@link #eval(RDFHandler)}. Its base implementation delegates to
287 * {@link #doEval(QuadModel)}.
288 *
289 * @param handler
290 * the handler where to emit resulting statements
291 * @param deduplicate
292 * true if output should not contain duplicate statements
293 * @return an handler accepting input statements
294 */
295 protected RDFHandler doEval(final RDFHandler handler, final boolean deduplicate) {
296
297 // Return an RDFHandler that delegates to doEval(QuadModel)
298 return new AbstractRDFHandlerWrapper(handler) {
299
300 private QuadModel model;
301
302 @Override
303 public void startRDF() throws RDFHandlerException {
304 super.startRDF();
305 this.model = QuadModel.create();
306 }
307
308 @Override
309 public synchronized void handleStatement(final Statement stmt)
310 throws RDFHandlerException {
311 this.model.add(stmt);
312 }
313
314 @Override
315 public void endRDF() throws RDFHandlerException {
316 doEval(this.model);
317 for (final Statement stmt : this.model) {
318 super.handleStatement(stmt);
319 }
320 this.model = null; // free memory
321 super.endRDF();
322 }
323
324 };
325 }
326
327 }