1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.function.Function;
22 import java.util.function.Supplier;
23
24 import javax.annotation.Nullable;
25
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableList;
28 import com.google.common.collect.Iterables;
29 import com.google.common.collect.Lists;
30
31 import org.openrdf.model.Resource;
32 import org.openrdf.model.Statement;
33 import org.openrdf.model.URI;
34 import org.openrdf.model.Value;
35 import org.openrdf.query.algebra.StatementPattern;
36 import org.openrdf.rio.RDFHandler;
37 import org.openrdf.rio.RDFHandlerException;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import eu.fbk.rdfpro.util.Environment;
42 import eu.fbk.rdfpro.util.QuadModel;
43 import eu.fbk.rdfpro.util.StatementDeduplicator;
44 import eu.fbk.rdfpro.util.StatementMatcher;
45 import eu.fbk.rdfpro.util.StatementTemplate;
46 import eu.fbk.rdfpro.util.StatementDeduplicator.ComparisonMethod;
47 import eu.fbk.rdfpro.util.Statements;
48
49 final class RuleEngineImpl extends RuleEngine {
50
/** Logger used to report phase configuration and evaluation statistics. */
private static final Logger LOGGER = LoggerFactory.getLogger(RuleEngineImpl.class);

/** Number of recently seen statements remembered by partial (cache-based) deduplicators. */
private static final int DEDUPLICATION_CACHE_SIZE = 16 * 1024;

/**
 * Whether total deduplication is always used instead of partial deduplication; read from
 * property {@code rdfpro.rules.deduplication} (default {@code false}).
 */
private static final boolean FORCE_DEDUPLICATION = Boolean.parseBoolean(
        Environment.getProperty("rdfpro.rules.deduplication", "false"));

/**
 * Whether groups of streamable rules are evaluated by a {@code StreamPhase}; read from
 * property {@code rdfpro.rules.streaming} (default {@code true}).
 */
private static final boolean ENABLE_STREAMING = Boolean.parseBoolean(
        Environment.getProperty("rdfpro.rules.streaming", "true"));

/**
 * Whether groups of simple insert-only rules are evaluated by a {@code SemiNaivePhase}; read
 * from property {@code rdfpro.rules.seminaive} (default {@code true}).
 */
private static final boolean ENABLE_SEMINAIVE = Boolean.parseBoolean(
        Environment.getProperty("rdfpro.rules.seminaive", "true"));

/** The evaluation phases built from the ruleset, in evaluation order. */
private final List<Phase> phases;

/**
 * Whether handler-based evaluation of all phases is reported (by
 * {@code Phase.isHandlerOutputUnique}) to produce duplicate-free output.
 */
private final boolean unique;

/**
 * Creates a rule engine for the supplied ruleset, splitting its rules into phases.
 *
 * @param ruleset the ruleset to evaluate
 */
public RuleEngineImpl(final Ruleset ruleset) {
    super(ruleset);
    this.phases = buildPhases(ruleset);
    this.unique = computeOutputUniqueness(this.phases);
}

/**
 * Propagates the output-uniqueness property through the given phases, starting from an input
 * that is assumed not to be unique.
 *
 * @param phaseList the phases, in evaluation order
 * @return whether the output of the last phase is unique
 */
private static boolean computeOutputUniqueness(final List<Phase> phaseList) {
    boolean outputUnique = false;
    for (int index = 0; index < phaseList.size(); ++index) {
        outputUnique = phaseList.get(index).isHandlerOutputUnique(outputUnique);
    }
    return outputUnique;
}
78
79 @Override
80 public String toString() {
81 final StringBuilder builder = new StringBuilder();
82 builder.append("SN+ rule engine (");
83 for (final Phase phase : this.phases) {
84 if (phase instanceof StreamPhase) {
85 builder.append('X');
86 } else if (phase instanceof NaivePhase) {
87 builder.append('N');
88 } else if (phase instanceof SemiNaivePhase) {
89 builder.append('S');
90 } else {
91 builder.append('?');
92 }
93 }
94 builder.append(this.unique ? "*" : "");
95 builder.append(')');
96 return builder.toString();
97 }
98
99 @Override
100 protected void doEval(final Collection<Statement> model) {
101
102 final QuadModel quadModel = model instanceof QuadModel ? (QuadModel) model
103 : QuadModel.create(model);
104 for (final Phase phase : this.phases) {
105 phase.normalize(quadModel.getValueNormalizer()).eval(quadModel);
106 }
107 if (model != quadModel) {
108 if (getRuleset().isDeletePossible() || !(model instanceof Set<?>)) {
109 model.clear();
110 }
111 model.addAll(quadModel);
112 }
113 }
114
115 @Override
116 protected RDFHandler doEval(final RDFHandler handler, final boolean deduplicate) {
117
118
119
120 int i = 0;
121 int j = this.phases.size() - 1;
122 while (i < this.phases.size() && this.phases.get(i).isHandlerSupported()) {
123 ++i;
124 }
125 while (j > i && this.phases.get(j).isHandlerSupported()
126 && !this.phases.get(j).isModelSupported()) {
127 --j;
128 }
129
130
131 RDFHandler result = handler;
132 for (int k = this.phases.size() - 1; k > j; --k) {
133 result = this.phases.get(k).eval(result,
134 deduplicate && !this.unique && k == this.phases.size() - 1);
135 }
136
137
138 if (i <= j) {
139 final List<Phase> modelPhases = this.phases.subList(i, j + 1);
140 result = RDFHandlers.decouple(new AbstractRDFHandlerWrapper(result) {
141
142 private long ts;
143
144 private QuadModel model;
145
146 @Override
147 public void startRDF() throws RDFHandlerException {
148 super.startRDF();
149 this.ts = System.currentTimeMillis();
150 this.model = QuadModel.create();
151 }
152
153 @Override
154 public void handleStatement(final Statement stmt) throws RDFHandlerException {
155 this.model.add(stmt);
156 }
157
158 @Override
159 public void endRDF() throws RDFHandlerException {
160 if (LOGGER.isDebugEnabled()) {
161 LOGGER.debug("Model populated in {} ms, {} statements",
162 System.currentTimeMillis() - this.ts, this.model.size());
163 }
164 for (final Phase phase : modelPhases) {
165 phase.normalize(this.model.getValueNormalizer()).eval(this.model);
166 }
167 for (final Statement stmt : this.model) {
168 super.handleStatement(stmt);
169 }
170 super.endRDF();
171 }
172
173 }, 1);
174 }
175
176
177 for (int k = i - 1; k >= 0; --k) {
178 result = this.phases.get(k).eval(result,
179 deduplicate && !this.unique && k == this.phases.size() - 1);
180 }
181
182
183 return result;
184 }
185
/**
 * Processes a single statement: deduplicates it, possibly emits it, and applies the insertion
 * templates associated to it by {@code insertMatcher}.
 * <p>
 * If {@code stmt} is rejected by {@code deduplicator.add()}, nothing happens. Otherwise it is
 * emitted to {@code sink} if {@code emitStmt} is set and it does not match
 * {@code deleteMatcher}. Then, if an insert matcher is given:
 * <ul>
 * <li>in non-fixpoint mode, each template is applied once to {@code stmt} and every derived
 * statement accepted by {@code deduplicator.add()} is emitted. Derived statements are NOT
 * checked against {@code deleteMatcher} in this mode;</li>
 * <li>in fixpoint mode, derived statements are in turn expanded, depth-first, using an explicit
 * stack. Each derived statement is emitted unless it matches {@code deleteMatcher}.</li>
 * </ul>
 *
 * @param stmt the statement to process
 * @param sink the handler receiving emitted statements
 * @param deduplicator the deduplicator used to skip already seen statements
 * @param deleteMatcher matcher of statements that must not be emitted, or null
 * @param insertMatcher matcher mapping statements to insertion templates, or null
 * @param fixpoint whether derived statements are recursively expanded
 * @param emitStmt whether {@code stmt} itself may be emitted
 * @throws RDFHandlerException if the sink fails
 */
private static void expand(final Statement stmt, final RDFHandler sink,
        final StatementDeduplicator deduplicator,
        @Nullable final StatementMatcher deleteMatcher,
        @Nullable final StatementMatcher insertMatcher, final boolean fixpoint,
        final boolean emitStmt) throws RDFHandlerException {

    // Abort if the statement was already processed (as far as the deduplicator knows)
    if (!deduplicator.add(stmt)) {
        return;
    }

    // Emit the input statement, unless disabled or matched by a delete rule
    if (emitStmt
            && (deleteMatcher == null || !deleteMatcher.match(stmt.getSubject(),
                    stmt.getPredicate(), stmt.getObject(), stmt.getContext()))) {
        sink.handleStatement(stmt);
    }

    // Nothing else to do if there are no insertion rules
    if (insertMatcher == null) {
        return;
    }

    if (!fixpoint) {

        // Non-fixpoint: apply matching templates once and emit new derived statements
        for (final StatementTemplate template : insertMatcher.map(stmt.getSubject(),
                stmt.getPredicate(), stmt.getObject(), stmt.getContext(),
                StatementTemplate.class)) {
            final Statement stmt2 = template.apply(stmt);
            if (stmt2 != null && deduplicator.add(stmt2)) {
                sink.handleStatement(stmt2);
            }
        }

    } else {

        // Fixpoint: depth-first expansion using an explicit stack of pending statements.
        // All helpers are allocated lazily, as most statements derive nothing.
        StatementDeduplicator totalDeduplicator = null;
        List<StatementTemplate> templates = null;
        Statement[] stack = null;
        int index = 0;
        int capacity = 0;
        Statement s = stmt;
        while (true) {

            // Emit derived statements (stack != null means s was popped from the stack, as
            // the input statement was already handled above), unless matched by delete rules
            if (stack != null
                    && (deleteMatcher == null || !deleteMatcher.match(s.getSubject(),
                            s.getPredicate(), s.getObject(), s.getContext()))) {
                sink.handleStatement(s);
            }

            // Apply matching templates to s, pushing derived statements on the stack. The
            // templates list is passed back to map() so that it can be reused
            templates = insertMatcher.map(s.getSubject(), s.getPredicate(), s.getObject(),
                    s.getContext(), StatementTemplate.class, templates);
            for (final StatementTemplate template : templates) {
                // NOTE(review): the two-arg apply presumably filters results already known to
                // the deduplicator - confirm against StatementTemplate
                final Statement stmt2 = template.apply(s, deduplicator);
                if (stmt2 != null) {
                    // A partial deduplicator may forget statements: track statements derived in
                    // this call with a local total deduplicator (seeded with the input
                    // statement) so that each of them is pushed at most once
                    if (!deduplicator.isTotal()) {
                        if (totalDeduplicator == null) {
                            totalDeduplicator = StatementDeduplicator
                                    .newTotalDeduplicator(ComparisonMethod.EQUALS);
                            totalDeduplicator.add(stmt);
                        }
                        if (!totalDeduplicator.add(stmt2)) {
                            continue;
                        }
                    }
                    // Grow the stack (initially 16 slots, doubling when full)
                    if (index == capacity) {
                        if (capacity == 0) {
                            capacity = 16;
                            stack = new Statement[capacity];
                        } else {
                            capacity *= 2;
                            stack = Arrays.copyOf(stack, capacity);
                        }
                    }
                    stack[index++] = stmt2;
                }
            }

            // Stop when there are no more pending statements
            if (index == 0) {
                break;
            }

            // Pop the next statement to process
            s = stack[--index];

            // Reset the templates list for reuse in the next map() call
            templates.clear();
        }
    }
}
283
284 private static Statement[] normalize(Statement[] statements,
285 final Function<Value, Value> normalizer) {
286
287 if (normalizer != null) {
288 statements = statements.clone();
289 for (int i = 0; i < statements.length; ++i) {
290 final Statement s = statements[i];
291 statements[i] = Statements.VALUE_FACTORY.createStatement(
292 (Resource) normalizer.apply(s.getSubject()),
293 (URI) normalizer.apply(s.getPredicate()),
294 normalizer.apply(s.getObject()),
295 (Resource) normalizer.apply(s.getContext()));
296 }
297 }
298 return statements;
299 }
300
301 private static List<Phase> buildPhases(final Ruleset ruleset) {
302
303
304
305 final List<Phase> phases = new ArrayList<>();
306 final List<Rule> rules = new ArrayList<>();
307 for (final Rule rule : ruleset.getRules()) {
308 if (!rules.isEmpty() && (rule.isFixpoint() != rules.get(0).isFixpoint()
309 || rule.getPhase() != rules.get(0).getPhase())) {
310 phases.add(buildPhase(rules));
311 rules.clear();
312 }
313 rules.add(rule);
314 }
315 if (!rules.isEmpty()) {
316 phases.add(buildPhase(rules));
317 }
318 return phases;
319 }
320
321 private static Phase buildPhase(final List<Rule> rules) {
322
323
324 boolean simple = true;
325 boolean insertOnly = true;
326 boolean streamable = true;
327 for (final Rule rule : rules) {
328 simple &= rule.isSimple();
329 insertOnly &= rule.getDeleteExpr() == null;
330 streamable &= rule.isStreamable();
331 }
332
333
334 Phase phase;
335 if (streamable && ENABLE_STREAMING) {
336 phase = StreamPhase.create(rules);
337 } else if (simple && insertOnly && ENABLE_SEMINAIVE) {
338 phase = SemiNaivePhase.create(rules);
339 } else {
340 phase = NaivePhase.create(rules);
341 }
342
343
344 return phase;
345 }
346
347 private static abstract class Phase {
348
349 private final boolean handlerSupported;
350
351 private final boolean modelSupported;
352
353 Phase(final boolean handlerSupported, final boolean modelSupported) {
354 this.handlerSupported = handlerSupported;
355 this.modelSupported = modelSupported;
356 }
357
358 public Phase normalize(final Function<Value, Value> normalizer) {
359 return this;
360 }
361
362 public RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
363 throw new Error();
364 }
365
366 public void eval(final QuadModel model) {
367 throw new Error();
368 }
369
370 public boolean isHandlerOutputUnique(final boolean inputUnique) {
371 return true;
372 }
373
374 public final boolean isHandlerSupported() {
375 return this.handlerSupported;
376 }
377
378 public final boolean isModelSupported() {
379 return this.modelSupported;
380 }
381
382 }
383
384 private static final class StreamPhase extends Phase {
385
386 @Nullable
387 private final StatementMatcher deleteMatcher;
388
389 @Nullable
390 private final StatementMatcher insertMatcher;
391
392 private final Statement[] axioms;
393
394 private final boolean fixpoint;
395
396 private StreamPhase(@Nullable final StatementMatcher deleteMatcher,
397 @Nullable final StatementMatcher insertMatcher, final Statement[] axioms,
398 final boolean fixpoint) {
399
400
401 super(true, false);
402
403
404 this.deleteMatcher = deleteMatcher;
405 this.insertMatcher = insertMatcher;
406 this.axioms = axioms;
407 this.fixpoint = fixpoint;
408 }
409
410 static StreamPhase create(final Iterable<Rule> rules) {
411
412
413 StatementMatcher.Builder db = null;
414 StatementMatcher.Builder ib = null;
415 final List<Statement> axioms = new ArrayList<>();
416 boolean containsFixpointRule = false;
417 boolean containsNonFixpointRule = false;
418
419
420 for (final Rule rule : rules) {
421 assert rule.isSafe() && rule.isStreamable();
422 containsFixpointRule |= rule.isFixpoint();
423 containsNonFixpointRule |= !rule.isFixpoint();
424 if (!rule.getWherePatterns().isEmpty()) {
425 final StatementPattern wp = rule.getWherePatterns().iterator().next();
426 for (final StatementPattern ip : rule.getInsertPatterns()) {
427 ib = ib != null ? ib : StatementMatcher.builder();
428 ib.addExpr(rule.getWhereExpr(), new StatementTemplate(ip, wp));
429 }
430 if (!rule.getDeletePatterns().isEmpty()) {
431 db = db != null ? db : StatementMatcher.builder();
432 db.addExpr(rule.getWhereExpr());
433 }
434 } else {
435 for (final StatementPattern ip : rule.getInsertPatterns()) {
436 final Value subj = ip.getSubjectVar().getValue();
437 final Value pred = ip.getPredicateVar().getValue();
438 final Value obj = ip.getObjectVar().getValue();
439 final Value ctx = ip.getContextVar() == null ? null : ip.getContextVar()
440 .getValue();
441 if (subj instanceof Resource && pred instanceof URI
442 && (ctx == null || ctx instanceof Resource)) {
443 axioms.add(Statements.VALUE_FACTORY.createStatement((Resource) subj,
444 (URI) pred, obj, (Resource) ctx));
445 }
446 }
447 }
448 }
449 assert containsFixpointRule ^ containsNonFixpointRule;
450
451
452 final StatementMatcher dm = db == null ? null : db.build(null);
453 final StatementMatcher im = ib == null ? null : ib.build(null);
454
455
456 if (LOGGER.isDebugEnabled()) {
457 LOGGER.debug(
458 "Configured StreamPhase: {} rules; {}; {} axioms; {} delete matcher; "
459 + "{} insert matcher", Iterables.size(rules),
460 containsFixpointRule ? "fixpoint" : "non fixpoint", axioms.size(), dm, im);
461 }
462
463
464
465 return new StreamPhase(dm, im, axioms.toArray(new Statement[axioms.size()]),
466 containsFixpointRule);
467 }
468
469 @Override
470 public boolean isHandlerOutputUnique(final boolean inputUnique) {
471 return inputUnique && this.insertMatcher == null;
472 }
473
474 @Override
475 public Phase normalize(final Function<Value, Value> normalizer) {
476
477
478 final StatementMatcher normalizedDeleteMatcher = this.deleteMatcher == null ? null
479 : this.deleteMatcher.normalize(normalizer);
480 final StatementMatcher normalizedInsertMatcher = this.insertMatcher == null ? null
481 : this.insertMatcher.normalize(normalizer);
482
483
484 final Statement[] normalizedAxioms = RuleEngineImpl.normalize(this.axioms, normalizer);
485
486
487 return new StreamPhase(normalizedDeleteMatcher, normalizedInsertMatcher,
488 normalizedAxioms, this.fixpoint);
489 }
490
491 @Override
492 public RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
493
494 return new AbstractRDFHandlerWrapper(handler) {
495
496 private StatementDeduplicator deduplicator;
497
498 @Override
499 public void startRDF() throws RDFHandlerException {
500
501
502 super.startRDF();
503
504
505 if (deduplicate || FORCE_DEDUPLICATION) {
506 this.deduplicator = StatementDeduplicator
507 .newTotalDeduplicator(ComparisonMethod.HASH);
508 } else {
509 this.deduplicator = StatementDeduplicator.newPartialDeduplicator(
510 ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
511 }
512
513
514 for (final Statement axiom : StreamPhase.this.axioms) {
515 expand(axiom, this.handler, this.deduplicator,
516 StreamPhase.this.fixpoint ? StreamPhase.this.deleteMatcher : null,
517 StreamPhase.this.insertMatcher, StreamPhase.this.fixpoint, true);
518 }
519 }
520
521 @Override
522 public void handleStatement(final Statement stmt) throws RDFHandlerException {
523
524
525 expand(stmt, this.handler, this.deduplicator, StreamPhase.this.deleteMatcher,
526 StreamPhase.this.insertMatcher, StreamPhase.this.fixpoint, true);
527 }
528
529 };
530 }
531 }
532
533 private static final class NaivePhase extends Phase {
534
535 private final List<Rule> rules;
536
537 private final boolean fixpoint;
538
539 private final boolean canDelete;
540
541 private final boolean canInsert;
542
543 private NaivePhase(final List<Rule> rules, final boolean fixpoint,
544 final boolean canDelete, final boolean canInsert) {
545 super(false, true);
546 this.rules = rules;
547 this.fixpoint = fixpoint;
548 this.canDelete = canDelete;
549 this.canInsert = canInsert;
550 }
551
552 static NaivePhase create(final Iterable<Rule> rules) {
553
554
555 final List<Rule> ruleList = ImmutableList.copyOf(rules);
556 final boolean fixpoint = ruleList.get(0).isFixpoint();
557
558
559 if (LOGGER.isDebugEnabled()) {
560 LOGGER.debug("Configured NaivePhase: {} rules; {}", ruleList.size(),
561 fixpoint ? "fixpoint" : "non fixpoint");
562 }
563
564
565 boolean canDelete = false;
566 boolean canInsert = false;
567 for (final Rule rule : rules) {
568 canDelete |= rule.getDeleteExpr() != null;
569 canInsert |= rule.getInsertExpr() != null;
570 }
571
572
573 return new NaivePhase(ruleList, fixpoint && canInsert, canDelete, canInsert);
574 }
575
576 private StatementDeduplicator newDeduplicator() {
577 return FORCE_DEDUPLICATION ? StatementDeduplicator
578 .newTotalDeduplicator(ComparisonMethod.HASH) : StatementDeduplicator
579 .newPartialDeduplicator(ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
580 }
581
582 @Override
583 public void eval(final QuadModel model) {
584
585 StatementDeduplicator deleteDeduplicator = this.canDelete ? newDeduplicator() : null;
586 StatementDeduplicator insertDeduplicator = this.canInsert ? newDeduplicator() : null;
587
588 if (!this.fixpoint) {
589
590 evalRules(deleteDeduplicator, insertDeduplicator, model);
591
592 } else {
593
594 while (true) {
595 final boolean modified = evalRules(deleteDeduplicator, insertDeduplicator,
596 model);
597 if (!modified) {
598 break;
599 }
600 if (this.canInsert && this.canDelete) {
601 deleteDeduplicator = newDeduplicator();
602 insertDeduplicator = newDeduplicator();
603 }
604 }
605 }
606 }
607
608 private boolean evalRules(final StatementDeduplicator deleteDeduplicator,
609 final StatementDeduplicator insertDeduplicator, final QuadModel model) {
610
611
612 final long ts1 = System.currentTimeMillis();
613
614
615 final StatementBuffer deleteBuffer = new StatementBuffer();
616 final StatementBuffer insertBuffer = new StatementBuffer();
617
618
619 final int numRules = Rule.evaluate(this.rules, model, null, () -> {
620 return deleteDeduplicator.deduplicate(deleteBuffer.get(), true);
621 }, () -> {
622 return insertDeduplicator.deduplicate(insertBuffer.get(), true);
623 });
624
625
626 final long ts2 = System.currentTimeMillis();
627 final int deleteBufferSize = deleteBuffer.size();
628 final int insertBufferSize = insertBuffer.size();
629
630
631
632 final StatementBuffer deleteDelta = deleteBuffer.isEmpty() ||
633 insertBuffer.isEmpty() ? null : new StatementBuffer();
634 final RDFHandler deleteCallback = deleteDelta == null ? null : deleteDelta.get();
635
636
637
638
639 final int size0 = model.size();
640 deleteBuffer.toModel(model, false, deleteCallback);
641 final int size1 = model.size();
642 insertBuffer.toModel(model, true, null);
643 final int size2 = model.size();
644
645
646
647
648 boolean result;
649 if (size0 != size2) {
650 result = true;
651 } else if (size0 == size1) {
652 result = false;
653 } else {
654 result = insertBuffer.contains(deleteBuffer);
655 }
656
657
658 final long ts3 = System.currentTimeMillis();
659 if (LOGGER.isDebugEnabled()) {
660 LOGGER.debug("{} rules (out of {}) rules evaluated in {} ms ({} ms query, "
661 + "{} ms modify), {} deletions ({} buffered), {} insertions "
662 + "({} buffered), {} quads in, {} quads out", numRules, this.rules.size(),
663 ts3 - ts1, ts2 - ts1, ts3 - ts2, size1 - size0, deleteBufferSize, size2
664 - size1, insertBufferSize, size0, size2);
665 }
666
667
668 return result;
669 }
670
671 }
672
673 private static final class SemiNaivePhase extends Phase {
674
675 private final List<Rule> allRules;
676
677 private final List<Rule> joinRules;
678
679 private final StatementMatcher streamMatcher;
680
681 private final StatementMatcher joinMatcher;
682
683 private final Statement[] axioms;
684
685 private final boolean fixpoint;
686
687 private SemiNaivePhase(final List<Rule> rules, final List<Rule> joinRules,
688 final StatementMatcher streamMatcher, final StatementMatcher joinMatcher,
689 final Statement[] axioms, final boolean fixpoint) {
690
691 super(!joinMatcher.matchAll(), true);
692 this.allRules = rules;
693 this.joinRules = joinRules;
694 this.streamMatcher = streamMatcher;
695 this.joinMatcher = joinMatcher;
696 this.axioms = axioms;
697 this.fixpoint = fixpoint;
698 }
699
700 public static SemiNaivePhase create(final Iterable<Rule> rules) {
701
702
703 final List<Rule> allRules = ImmutableList.copyOf(rules);
704 final boolean fixpoint = allRules.get(0).isFixpoint();
705
706
707 final List<Rule> joinRules = Lists.newArrayList();
708 final List<Statement> axioms = new ArrayList<>();
709 final StatementMatcher.Builder joinBuilder = StatementMatcher.builder();
710 final StatementMatcher.Builder streamBuilder = StatementMatcher.builder();
711
712
713
714 for (final Rule rule : allRules) {
715 if (!rule.isStreamable()) {
716 joinRules.add(rule);
717 for (final StatementPattern wp : rule.getWherePatterns()) {
718 joinBuilder.addPattern(wp, null);
719 }
720 } else if (!rule.getWherePatterns().isEmpty()) {
721 final StatementPattern wp = rule.getWherePatterns().iterator().next();
722 for (final StatementPattern ip : rule.getInsertPatterns()) {
723 streamBuilder.addExpr(rule.getWhereExpr(), new StatementTemplate(ip, wp));
724 }
725 } else {
726 for (final StatementPattern ip : rule.getInsertPatterns()) {
727 final Value subj = ip.getSubjectVar().getValue();
728 final Value pred = ip.getPredicateVar().getValue();
729 final Value obj = ip.getObjectVar().getValue();
730 final Value ctx = ip.getContextVar() == null ? null : ip.getContextVar()
731 .getValue();
732 if (subj instanceof Resource && pred instanceof URI
733 && (ctx == null || ctx instanceof Resource)) {
734 axioms.add(Statements.VALUE_FACTORY.createStatement((Resource) subj,
735 (URI) pred, obj, (Resource) ctx));
736 }
737 }
738 }
739 }
740 final StatementMatcher joinMatcher = joinBuilder.build(null);
741 final StatementMatcher streamMatcher = streamBuilder.build(null);
742
743
744 if (LOGGER.isDebugEnabled()) {
745 LOGGER.debug("Configured SemiNaivePhase: {} rules ({} join); {}; {} axioms; "
746 + "{} join matcher ({}); {} stream matcher", allRules.size(),
747 joinRules.size(), fixpoint ? "fixpoint" : "non fixpoint", axioms.size(),
748 joinMatcher, joinMatcher.matchAll() ? "match all" : "no match all",
749 streamMatcher);
750 }
751
752
753 return new SemiNaivePhase(allRules, ImmutableList.copyOf(joinRules), streamMatcher,
754 joinMatcher, axioms.toArray(new Statement[axioms.size()]), fixpoint);
755 }
756
757 @Override
758 public boolean isHandlerOutputUnique(final boolean inputUnique) {
759 return this.fixpoint && this.joinMatcher.matchAll();
760 }
761
762 @Override
763 public Phase normalize(final Function<Value, Value> normalizer) {
764
765 final StatementMatcher normStreamMatcher = this.streamMatcher.normalize(normalizer);
766 final StatementMatcher normModelMatcher = this.joinMatcher.normalize(normalizer);
767 final Statement[] normAxioms = RuleEngineImpl.normalize(this.axioms, normalizer);
768
769 Phase result = this;
770 if (normStreamMatcher != this.streamMatcher || normModelMatcher != this.joinMatcher
771 || normAxioms != this.axioms) {
772 result = new SemiNaivePhase(this.allRules, this.joinRules, normStreamMatcher,
773 normModelMatcher, normAxioms, this.fixpoint);
774 }
775 return result;
776 }
777
778 @SuppressWarnings("resource")
779 @Override
780 public RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
781
782
783 return this.fixpoint ? new FixpointHandler(handler, deduplicate)
784 : new NonFixpointHandler(handler, deduplicate);
785 }
786
787 @Override
788 public void eval(final QuadModel model) {
789
790
791 final StatementDeduplicator deduplicator;
792 if (FORCE_DEDUPLICATION) {
793 deduplicator = StatementDeduplicator.newTotalDeduplicator(ComparisonMethod.HASH);
794 } else {
795 deduplicator = StatementDeduplicator.newPartialDeduplicator(
796 ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
797 }
798
799
800 if (!this.fixpoint) {
801
802 evalJoinStreamIteration(deduplicator, model);
803
804 } else {
805
806
807 if (this.joinRules.size() < this.allRules.size()) {
808 evalStreamFixpoint(deduplicator, model);
809 }
810 QuadModel delta = null;
811 while (true) {
812 delta = evalJoinIterationStreamFixpoint(deduplicator, model, delta, null);
813 if (delta.isEmpty()) {
814 break;
815 }
816 }
817 }
818 }
819
820 private void evalJoinStreamIteration(final StatementDeduplicator deduplicator,
821 final QuadModel model) {
822
823
824 final long ts0 = System.currentTimeMillis();
825
826
827 final StatementBuffer buffer = new StatementBuffer();
828
829
830 buffer.addAll(Arrays.asList(this.axioms));
831 applyStreamRules(deduplicator, model, buffer, false);
832
833
834 final int numVariants = Rule.evaluate(SemiNaivePhase.this.joinRules, model, null,
835 null, () -> {
836 return deduplicator.deduplicate(buffer.get(), true);
837 });
838
839
840 final long ts1 = System.currentTimeMillis();
841
842
843 final int size0 = model.size();
844 buffer.toModel(model, true, null);
845 final int size1 = model.size();
846
847
848 final long ts2 = System.currentTimeMillis();
849
850
851 if (LOGGER.isDebugEnabled()) {
852 LOGGER.debug("Iteration of {} join rules ({} variants) and {} stream rules "
853 + "performed in {} ms ({} ms evaluation, {} ms model update), "
854 + "{} insertions ({} buffered), {} quads in, {} quads out",
855 this.joinRules.size(), numVariants,
856 this.allRules.size() - this.joinRules.size(), ts2 - ts0, ts1 - ts0, ts2
857 - ts1, size1 - size0, buffer.size(), size0, size1);
858 }
859 }
860
861 private QuadModel evalJoinIterationStreamFixpoint(
862 final StatementDeduplicator deduplicator, final QuadModel model,
863 @Nullable final QuadModel delta, @Nullable final RDFHandler unmatcheableSink) {
864
865
866 final long ts0 = System.currentTimeMillis();
867
868
869 final StatementBuffer buffer = new StatementBuffer();
870
871
872
873
874
875 final Supplier<RDFHandler> supplier = () -> {
876 RDFHandler handler = buffer.get();
877 if (unmatcheableSink != null) {
878 handler = new AbstractRDFHandlerWrapper(handler) {
879
880 @Override
881 public void handleStatement(final Statement stmt)
882 throws RDFHandlerException {
883 if (SemiNaivePhase.this.joinMatcher.match(stmt)) {
884 super.handleStatement(stmt);
885 } else {
886 unmatcheableSink.handleStatement(stmt);
887 }
888 }
889
890 };
891 }
892 handler = new AbstractRDFHandlerWrapper(handler) {
893
894 @Override
895 public void handleStatement(final Statement stmt) throws RDFHandlerException {
896 expand(stmt, this.handler, deduplicator, null,
897 SemiNaivePhase.this.streamMatcher, true, true);
898 }
899
900 };
901 return handler;
902 };
903
904
905 final int numVariants = Rule.evaluate(this.joinRules, model, delta, null, supplier);
906
907
908 final long ts1 = System.currentTimeMillis();
909 final int joinBufferSize = buffer.size();
910
911
912 final StatementBuffer deltaBuffer = new StatementBuffer();
913 final int size0 = model.size();
914 buffer.toModel(model, true, deltaBuffer.get());
915 final int size1 = model.size();
916 final long ts2 = System.currentTimeMillis();
917
918
919 final QuadModel newDelta = model.filter(deltaBuffer);
920
921
922 final long ts3 = System.currentTimeMillis();
923 if (LOGGER.isDebugEnabled()) {
924 final int numJoinRules = this.joinRules.size();
925 final int numStreamRules = this.allRules.size() - this.joinRules.size();
926 LOGGER.debug("Iteration of {} join rules ({} variants) and fixpoint of {} stream "
927 + "rules evaluated in {} ms ({} ms evaluation, {} ms model update, {} ms "
928 + "delta), {} insertions ({} buffered), {} quads in, {} quads out",
929 numJoinRules, numVariants, numStreamRules, ts3 - ts0, ts1 - ts0,
930 ts2 - ts1, ts3 - ts2, size1 - size0, joinBufferSize, size0, size1);
931 }
932
933
934 return newDelta;
935 }
936
937 private void evalStreamFixpoint(final StatementDeduplicator deduplicator,
938 final QuadModel model) {
939
940
941 final long ts0 = System.currentTimeMillis();
942
943
944 final StatementBuffer buffer = new StatementBuffer();
945 buffer.addAll(Arrays.asList(this.axioms));
946 applyStreamRules(deduplicator, Iterables.concat(Arrays.asList(this.axioms), model),
947 buffer, true);
948
949
950 final long ts1 = System.currentTimeMillis();
951
952
953 final int size0 = model.size();
954 buffer.toModel(model, true, null);
955 final int size1 = model.size();
956
957
958 final long ts2 = System.currentTimeMillis();
959
960
961 if (LOGGER.isDebugEnabled()) {
962 final int numStreamRules = this.allRules.size() - this.joinRules.size();
963 LOGGER.debug("Fixpoint of {} stream rules evaluated in {} ms ({} ms evaluation, "
964 + "{} ms model update), {} insertions ({} buffered), {} quads in, "
965 + "{} quads out", numStreamRules, ts2 - ts0, ts1 - ts0, ts2 - ts1, size1
966 - size0, buffer.size(), size0, size1);
967 }
968 }
969
970 private void applyStreamRules(final StatementDeduplicator deduplicator,
971 final Iterable<Statement> statements, final Supplier<RDFHandler> sink,
972 final boolean fixpoint) {
973
974
975
976
977 final RDFHandler[] sinks = new RDFHandler[Environment.getCores()];
978 for (int i = 0; i < sinks.length; ++i) {
979 sinks[i] = RDFHandlers.decouple(new AbstractRDFHandlerWrapper(sink.get()) {
980
981 @Override
982 public void handleStatement(final Statement stmt) throws RDFHandlerException {
983 expand(stmt, this.handler, deduplicator, null,
984 SemiNaivePhase.this.streamMatcher, fixpoint, false);
985 }
986
987 }, 1);
988 }
989 final RDFHandler handler = RDFHandlers.dispatchRoundRobin(64, sinks);
990
991
992 try {
993 RDFSources.wrap(statements).emit(handler, 1);
994 } catch (final RDFHandlerException ex) {
995 Throwables.propagate(ex);
996 }
997 }
998
999 private final class FixpointHandler extends AbstractRDFHandlerWrapper {
1000
1001 private final boolean deduplicate;
1002
1003 private QuadModel joinModel;
1004
1005 private StatementDeduplicator deduplicator;
1006
1007 private RDFHandler sink;
1008
1009 FixpointHandler(final RDFHandler handler, final boolean deduplicate) {
1010 super(handler);
1011 this.deduplicate = deduplicate && SemiNaivePhase.this.fixpoint
1012 && SemiNaivePhase.this.joinMatcher.matchAll();
1013 }
1014
1015 @Override
1016 public void startRDF() throws RDFHandlerException {
1017
1018
1019 super.startRDF();
1020
1021
1022 this.joinModel = QuadModel.create();
1023
1024
1025 this.deduplicator = this.deduplicate || FORCE_DEDUPLICATION ? StatementDeduplicator
1026 .newTotalDeduplicator(ComparisonMethod.HASH) : StatementDeduplicator
1027 .newPartialDeduplicator(ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
1028
1029
1030 this.sink = new AbstractRDFHandlerWrapper(this.handler) {
1031
1032 @Override
1033 public void handleStatement(final Statement stmt) throws RDFHandlerException {
1034 if (SemiNaivePhase.this.joinMatcher.match(stmt)) {
1035 synchronized (FixpointHandler.this.joinModel) {
1036 FixpointHandler.this.joinModel.add(stmt);
1037 }
1038 } else {
1039 this.handler.handleStatement(stmt);
1040 }
1041 }
1042
1043 };
1044
1045
1046 for (final Statement axiom : SemiNaivePhase.this.axioms) {
1047 handleStatement(axiom);
1048 }
1049 }
1050
1051 @Override
1052 public void handleStatement(final Statement stmt) throws RDFHandlerException {
1053
1054
1055
1056 expand(stmt, this.sink, this.deduplicator, null,
1057 SemiNaivePhase.this.streamMatcher, true, true);
1058 }
1059
1060 @Override
1061 public void endRDF() throws RDFHandlerException {
1062
1063
1064 QuadModel delta = null;
1065 while (true) {
1066 delta = evalJoinIterationStreamFixpoint(this.deduplicator, this.joinModel,
1067 delta, this.handler);
1068 if (delta.isEmpty()) {
1069 break;
1070 }
1071 }
1072
1073
1074 final RDFHandler sink = RDFHandlers.decouple(RDFHandlers.ignoreMethods(
1075 this.handler, RDFHandlers.METHOD_START_RDF | RDFHandlers.METHOD_END_RDF
1076 | RDFHandlers.METHOD_CLOSE));
1077 RDFSources.wrap(this.joinModel).emit(sink, 1);
1078
1079
1080 this.joinModel = null;
1081 this.deduplicator = null;
1082 this.sink = null;
1083
1084
1085 super.endRDF();
1086 }
1087
1088 }
1089
1090 private final class NonFixpointHandler extends AbstractRDFHandlerWrapper {
1091
1092 private final boolean deduplicate;
1093
1094 private QuadModel joinModel;
1095
1096 private StatementDeduplicator deduplicator;
1097
1098 NonFixpointHandler(final RDFHandler handler, final boolean deduplicate) {
1099 super(handler);
1100 this.deduplicate = deduplicate;
1101 }
1102
1103 @Override
1104 public void startRDF() throws RDFHandlerException {
1105
1106
1107 super.startRDF();
1108
1109
1110 this.joinModel = QuadModel.create();
1111 this.deduplicator = this.deduplicate || FORCE_DEDUPLICATION ? StatementDeduplicator
1112 .newTotalDeduplicator(ComparisonMethod.HASH) : StatementDeduplicator
1113 .newPartialDeduplicator(ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
1114
1115
1116 for (final Statement axiom : SemiNaivePhase.this.axioms) {
1117 this.handler.handleStatement(axiom);
1118 }
1119 }
1120
1121 @Override
1122 public void handleStatement(final Statement stmt) throws RDFHandlerException {
1123
1124
1125 expand(stmt, this.handler, this.deduplicator, null,
1126 SemiNaivePhase.this.streamMatcher, false, true);
1127
1128
1129 if (SemiNaivePhase.this.joinMatcher.match(stmt)) {
1130 synchronized (this.joinModel) {
1131 this.joinModel.add(stmt);
1132 }
1133 }
1134 }
1135
1136 @Override
1137 public void endRDF() throws RDFHandlerException {
1138
1139
1140 Rule.evaluate(SemiNaivePhase.this.joinRules, this.joinModel, null, null, () -> {
1141 return this.deduplicator.deduplicate(this.handler, true);
1142 });
1143
1144
1145 super.endRDF();
1146 }
1147
1148 }
1149
1150 }
1151
1152 }