1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.lang.reflect.Constructor;
17 import java.lang.reflect.InvocationTargetException;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Set;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import com.google.common.base.Throwables;
27
28 import org.openrdf.model.Statement;
29 import org.openrdf.rio.RDFHandler;
30 import org.openrdf.rio.RDFHandlerException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import eu.fbk.rdfpro.util.Environment;
35 import eu.fbk.rdfpro.util.IO;
36 import eu.fbk.rdfpro.util.QuadModel;
37
38
39
40
41
42
43
44
45 public abstract class RuleEngine {
46
47 private static final Logger LOGGER = LoggerFactory.getLogger(RuleEngine.class);
48
49 private static final String IMPLEMENTATION = Environment.getProperty(
50 "rdfpro.rules.implementation", "eu.fbk.rdfpro.RuleEngineImpl");
51
52
53
54
55 private final Ruleset ruleset;
56
57
58
59
60
61
62
63
64 protected RuleEngine(final Ruleset ruleset) {
65
66
67 Objects.requireNonNull(ruleset);
68 for (final Rule rule : ruleset.getRules()) {
69 if (!rule.isSafe()) {
70 throw new IllegalArgumentException("Ruleset contains unsafe rule " + rule);
71 }
72 }
73
74
75 this.ruleset = ruleset;
76 }
77
78
79
80
81
82
83
84
85
86
87
88 public static RuleEngine create(final Ruleset ruleset) {
89
90
91 Objects.requireNonNull(ruleset);
92
93 try {
94
95 if (LOGGER.isTraceEnabled()) {
96 LOGGER.trace("Creating '{}' engine with ruleset:\n{}\n", IMPLEMENTATION, ruleset);
97 }
98
99
100 final Class<?> clazz = Class.forName(IMPLEMENTATION);
101 final Constructor<?> constructor = clazz.getConstructor(Ruleset.class);
102
103
104 return (RuleEngine) constructor.newInstance(ruleset);
105
106 } catch (final IllegalAccessException | ClassNotFoundException | NoSuchMethodException
107 | InstantiationException ex) {
108
109 throw new Error("Illegal rule engine implementation: " + IMPLEMENTATION, ex);
110
111 } catch (final InvocationTargetException ex) {
112
113 throw Throwables.propagate(ex.getCause());
114 }
115 }
116
117
118
119
120
121
122 public final Ruleset getRuleset() {
123 return this.ruleset;
124 }
125
126
127
128
129
130
131
132 public final void eval(final Collection<Statement> model) {
133
134
135 Objects.requireNonNull(model);
136
137
138 if (!LOGGER.isDebugEnabled()) {
139
140
141 doEval(model);
142
143 } else {
144
145
146 final long ts = System.currentTimeMillis();
147 final int inputSize = model.size();
148 LOGGER.debug("Rule evaluation started: {} input statements, {} rule(s), model input",
149 inputSize, this.ruleset.getRules().size());
150 doEval(model);
151 LOGGER.debug(
152 "Rule evaluation completed: {} input statements, {} output statements, {} ms",
153 inputSize, model.size(), System.currentTimeMillis() - ts);
154 }
155 }
156
157
158
159
160
161
162
163
164
165
166
167 public final RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
168
169
170 Objects.requireNonNull(handler);
171
172
173 if (!LOGGER.isDebugEnabled()) {
174
175
176 return doEval(handler, deduplicate);
177
178 } else {
179
180
181 final AtomicInteger numProcessed = new AtomicInteger(0);
182 final AtomicInteger numOut = new AtomicInteger(0);
183
184
185 final RDFHandler sink = new AbstractRDFHandlerWrapper(handler) {
186
187 @Override
188 public void handleStatement(final Statement statement) throws RDFHandlerException {
189 super.handleStatement(statement);
190 numOut.incrementAndGet();
191 }
192
193 };
194
195
196
197 return new AbstractRDFHandlerWrapper(doEval(sink, deduplicate)) {
198
199 private long ts;
200
201 @Override
202 public void startRDF() throws RDFHandlerException {
203 this.ts = System.currentTimeMillis();
204 numProcessed.set(0);
205 numOut.set(0);
206 LOGGER.debug("Rule evaluation started: {} rule(s), stream input",
207 RuleEngine.this.ruleset.getRules().size());
208 super.startRDF();
209 }
210
211 @Override
212 public void handleStatement(final Statement stmt) throws RDFHandlerException {
213 super.handleStatement(stmt);
214 numProcessed.incrementAndGet();
215 }
216
217 @Override
218 public void endRDF() throws RDFHandlerException {
219 super.endRDF();
220 LOGGER.debug("Rule evaluation completed: {} input statements, "
221 + "{} output statements , {} ms", numProcessed.get(), numOut.get(),
222 System.currentTimeMillis() - this.ts);
223 }
224
225 };
226 }
227 }
228
229
230
231
232
233
234
235
236 protected void doEval(final Collection<Statement> model) {
237
238
239 if (!this.ruleset.isDeletePossible()
240 && (model instanceof QuadModel || model instanceof Set<?>)) {
241
242
243
244 final List<Statement> inputStmts = new ArrayList<>(model);
245 final RDFHandler handler = doEval(RDFHandlers.decouple(RDFHandlers.wrap(Collections
246 .synchronizedCollection(model))), false);
247 try {
248 handler.startRDF();
249 for (final Statement stmt : inputStmts) {
250 handler.handleStatement(stmt);
251 }
252 handler.endRDF();
253 } catch (final RDFHandlerException ex) {
254 throw new RuntimeException(ex);
255 } finally {
256 IO.closeQuietly(handler);
257 }
258
259 } else {
260
261
262
263
264 final List<Statement> outputStmts = new ArrayList<>();
265 final RDFHandler handler = doEval(RDFHandlers.decouple(RDFHandlers.wrap(Collections
266 .synchronizedCollection(outputStmts))), true);
267 try {
268 handler.startRDF();
269 for (final Statement stmt : model) {
270 handler.handleStatement(stmt);
271 }
272 handler.endRDF();
273 } catch (final RDFHandlerException ex) {
274 throw new RuntimeException(ex);
275 } finally {
276 IO.closeQuietly(handler);
277 }
278 model.clear();
279 for (final Statement stmt : outputStmts) {
280 model.add(stmt);
281 }
282 }
283 }
284
285
286
287
288
289
290
291
292
293
294
295 protected RDFHandler doEval(final RDFHandler handler, final boolean deduplicate) {
296
297
298 return new AbstractRDFHandlerWrapper(handler) {
299
300 private QuadModel model;
301
302 @Override
303 public void startRDF() throws RDFHandlerException {
304 super.startRDF();
305 this.model = QuadModel.create();
306 }
307
308 @Override
309 public synchronized void handleStatement(final Statement stmt)
310 throws RDFHandlerException {
311 this.model.add(stmt);
312 }
313
314 @Override
315 public void endRDF() throws RDFHandlerException {
316 doEval(this.model);
317 for (final Statement stmt : this.model) {
318 super.handleStatement(stmt);
319 }
320 this.model = null;
321 super.endRDF();
322 }
323
324 };
325 }
326
327 }