1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2015 by Francesco Corcoglioniti with support by Alessio Palmero Aprosio and Marco
5    * Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.util.ArrayList;
17  import java.util.Arrays;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.Set;
21  import java.util.function.Function;
22  import java.util.function.Supplier;
23  
24  import javax.annotation.Nullable;
25  
26  import com.google.common.base.Throwables;
27  import com.google.common.collect.ImmutableList;
28  import com.google.common.collect.Iterables;
29  import com.google.common.collect.Lists;
30  
31  import org.openrdf.model.Resource;
32  import org.openrdf.model.Statement;
33  import org.openrdf.model.URI;
34  import org.openrdf.model.Value;
35  import org.openrdf.query.algebra.StatementPattern;
36  import org.openrdf.rio.RDFHandler;
37  import org.openrdf.rio.RDFHandlerException;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import eu.fbk.rdfpro.util.Environment;
42  import eu.fbk.rdfpro.util.QuadModel;
43  import eu.fbk.rdfpro.util.StatementDeduplicator;
44  import eu.fbk.rdfpro.util.StatementMatcher;
45  import eu.fbk.rdfpro.util.StatementTemplate;
46  import eu.fbk.rdfpro.util.StatementDeduplicator.ComparisonMethod;
47  import eu.fbk.rdfpro.util.Statements;
48  
49  final class RuleEngineImpl extends RuleEngine {
50  
    private static final Logger LOGGER = LoggerFactory.getLogger(RuleEngineImpl.class);

    // Number of entries kept by the partial (bounded) deduplicators, used whenever total
    // deduplication is neither requested nor forced
    private static final int DEDUPLICATION_CACHE_SIZE = 16 * 1024;

    // When true (property "rdfpro.rules.deduplication", default false) a total hash-based
    // deduplicator is always used in place of a partial one
    private static final boolean FORCE_DEDUPLICATION = Boolean.parseBoolean(Environment
            .getProperty("rdfpro.rules.deduplication", "false"));

    // Whether groups of streamable rules are evaluated by a StreamPhase
    // (property "rdfpro.rules.streaming", default true)
    private static final boolean ENABLE_STREAMING = Boolean.parseBoolean(Environment.getProperty(
            "rdfpro.rules.streaming", "true"));

    // Whether groups of simple, insert-only rules not handled by a StreamPhase are evaluated by a
    // SemiNaivePhase instead of a NaivePhase (property "rdfpro.rules.seminaive", default true)
    private static final boolean ENABLE_SEMINAIVE = Boolean.parseBoolean(Environment.getProperty(
            "rdfpro.rules.seminaive", "true"));

    // Evaluation phases built from the ruleset, in execution order
    private final List<Phase> phases;

    // Whether the phases report the output of the streaming handler chain as duplicate-free; if
    // so, no extra deduplication is requested from the last phase in doEval(RDFHandler, boolean)
    private final boolean unique;
67  
68      public RuleEngineImpl(final Ruleset ruleset) {
69          super(ruleset);
70          this.phases = buildPhases(ruleset);
71  
72          boolean unique = false;
73          for (final Phase phase : this.phases) {
74              unique = phase.isHandlerOutputUnique(unique);
75          }
76          this.unique = unique;
77      }
78  
79      @Override
80      public String toString() {
81          final StringBuilder builder = new StringBuilder();
82          builder.append("SN+ rule engine (");
83          for (final Phase phase : this.phases) {
84              if (phase instanceof StreamPhase) {
85                  builder.append('X');
86              } else if (phase instanceof NaivePhase) {
87                  builder.append('N');
88              } else if (phase instanceof SemiNaivePhase) {
89                  builder.append('S');
90              } else {
91                  builder.append('?');
92              }
93          }
94          builder.append(this.unique ? "*" : "");
95          builder.append(')');
96          return builder.toString();
97      }
98  
99      @Override
100     protected void doEval(final Collection<Statement> model) {
101 
102         final QuadModel quadModel = model instanceof QuadModel ? (QuadModel) model //
103                 : QuadModel.create(model);
104         for (final Phase phase : this.phases) {
105             phase.normalize(quadModel.getValueNormalizer()).eval(quadModel);
106         }
107         if (model != quadModel) {
108             if (getRuleset().isDeletePossible() || !(model instanceof Set<?>)) {
109                 model.clear();
110             }
111             model.addAll(quadModel);
112         }
113     }
114 
115     @Override
116     protected RDFHandler doEval(final RDFHandler handler, final boolean deduplicate) {
117 
118         // Determine the phase index range [i, j] (i,j included) where evaluation should be done
119         // on a fully indexed model (i for necessity, up to j for necessity or convenience)
120         int i = 0;
121         int j = this.phases.size() - 1;
122         while (i < this.phases.size() && this.phases.get(i).isHandlerSupported()) {
123             ++i;
124         }
125         while (j > i && this.phases.get(j).isHandlerSupported()
126                 && !this.phases.get(j).isModelSupported()) {
127             --j;
128         }
129 
130         // Build the handler chain for [j+1, #phases-1]
131         RDFHandler result = handler;
132         for (int k = this.phases.size() - 1; k > j; --k) {
133             result = this.phases.get(k).eval(result,
134                     deduplicate && !this.unique && k == this.phases.size() - 1);
135         }
136 
137         // Build the handler for interval [i, j], if non empty
138         if (i <= j) {
139             final List<Phase> modelPhases = this.phases.subList(i, j + 1);
140             result = RDFHandlers.decouple(new AbstractRDFHandlerWrapper(result) {
141 
142                 private long ts;
143 
144                 private QuadModel model;
145 
146                 @Override
147                 public void startRDF() throws RDFHandlerException {
148                     super.startRDF();
149                     this.ts = System.currentTimeMillis();
150                     this.model = QuadModel.create();
151                 }
152 
153                 @Override
154                 public void handleStatement(final Statement stmt) throws RDFHandlerException {
155                     this.model.add(stmt);
156                 }
157 
158                 @Override
159                 public void endRDF() throws RDFHandlerException {
160                     if (LOGGER.isDebugEnabled()) {
161                         LOGGER.debug("Model populated in {} ms, {} statements",
162                                 System.currentTimeMillis() - this.ts, this.model.size());
163                     }
164                     for (final Phase phase : modelPhases) {
165                         phase.normalize(this.model.getValueNormalizer()).eval(this.model);
166                     }
167                     for (final Statement stmt : this.model) {
168                         super.handleStatement(stmt);
169                     }
170                     super.endRDF();
171                 }
172 
173             }, 1);
174         }
175 
176         // Build the handler chain for [0, i-1]
177         for (int k = i - 1; k >= 0; --k) {
178             result = this.phases.get(k).eval(result,
179                     deduplicate && !this.unique && k == this.phases.size() - 1);
180         }
181 
182         // Return the constructed handler chain
183         return result;
184     }
185 
    /**
     * Processes a single statement for a stream phase. Unless it matches the delete matcher, the
     * statement is emitted, together with the statements inferred from it through the insert
     * templates.
     *
     * @param stmt
     *            the statement to process; nothing is done if {@code deduplicator} reports it as
     *            already seen
     * @param sink
     *            the handler receiving emitted statements
     * @param deduplicator
     *            the deduplicator used to skip already processed statements. If it is not total
     *            (see {@code isTotal()}), fixpoint expansion uses an additional local total
     *            deduplicator so that no statement is pushed twice within this call
     * @param deleteMatcher
     *            matcher identifying statements that must not be emitted, null if none
     * @param insertMatcher
     *            matcher mapping a statement to the {@link StatementTemplate}s producing inferred
     *            statements, null if nothing can be inferred
     * @param fixpoint
     *            true to expand inferred statements recursively until nothing new is produced;
     *            false to apply the templates only once on {@code stmt}
     * @param emitStmt
     *            whether {@code stmt} itself may be emitted; inferred statements are emitted
     *            regardless of this flag
     * @throws RDFHandlerException
     *             if thrown by the sink
     */
    private static void expand(final Statement stmt, final RDFHandler sink,
            final StatementDeduplicator deduplicator,
            @Nullable final StatementMatcher deleteMatcher,
            @Nullable final StatementMatcher insertMatcher, final boolean fixpoint,
            final boolean emitStmt) throws RDFHandlerException {

        // NOTE: this method has a HUGE impact on overall performances. In particular,
        // deduplication speed is critical for overall performances.

        // If statement was already processed, abort
        if (!deduplicator.add(stmt)) {
            return;
        }

        // Emit the supplied statements if authorized and if it does not match delete patterns
        if (emitStmt
                && (deleteMatcher == null || !deleteMatcher.match(stmt.getSubject(),
                        stmt.getPredicate(), stmt.getObject(), stmt.getContext()))) {
            sink.handleStatement(stmt);
        }

        // Abort if there is no way to infer new statements
        if (insertMatcher == null) {
            return;
        }

        // Otherwise handle two cases based on whether evaluation should be done till fixpoint
        if (!fixpoint) {
            // No fixpoint. Inferred statements are directly emitted if they might be new. Unlike
            // the fixpoint case, they are neither checked against the delete matcher nor expanded
            // further
            for (final StatementTemplate template : insertMatcher.map(stmt.getSubject(),
                    stmt.getPredicate(), stmt.getObject(), stmt.getContext(),
                    StatementTemplate.class)) {
                final Statement stmt2 = template.apply(stmt);
                if (stmt2 != null && deduplicator.add(stmt2)) {
                    sink.handleStatement(stmt2);
                }
            }

        } else {
            // Fixpoint. Inferred statements still to be expanded are kept in an array used as a
            // LIFO stack (allocated on demand, grown by doubling), so expansion is depth-first
            StatementDeduplicator totalDeduplicator = null;
            List<StatementTemplate> templates = null; // created on demand
            Statement[] stack = null;
            int index = 0;
            int capacity = 0;
            Statement s = stmt;
            while (true) {

                // Emit statement if it does not match delete patterns. The stack is still null
                // only in the first iteration, where s is the supplied statement (already
                // handled above)
                if (stack != null
                        && (deleteMatcher == null || !deleteMatcher.match(s.getSubject(),
                                s.getPredicate(), s.getObject(), s.getContext()))) {
                    sink.handleStatement(s);
                }

                // Apply insert part by looking up and applying insert templates. The templates
                // list is passed back to map() so that it can presumably be reused
                templates = insertMatcher.map(s.getSubject(), s.getPredicate(), s.getObject(),
                        s.getContext(), StatementTemplate.class, templates);
                for (final StatementTemplate template : templates) {
                    // NOTE(review): apply(s, deduplicator) presumably returns null for statements
                    // already known to the deduplicator — confirm in StatementTemplate
                    final Statement stmt2 = template.apply(s, deduplicator);
                    if (stmt2 != null) {
                        // A partial deduplicator may forget statements: track the statements
                        // produced in this call with a total one, to never push a statement twice
                        if (!deduplicator.isTotal()) {
                            if (totalDeduplicator == null) {
                                totalDeduplicator = StatementDeduplicator
                                        .newTotalDeduplicator(ComparisonMethod.EQUALS);
                                totalDeduplicator.add(stmt);
                            }
                            if (!totalDeduplicator.add(stmt2)) {
                                continue;
                            }
                        }
                        if (index == capacity) {
                            if (capacity == 0) {
                                capacity = 16;
                                stack = new Statement[capacity];
                            } else {
                                capacity *= 2;
                                stack = Arrays.copyOf(stack, capacity);
                            }
                        }
                        stack[index++] = stmt2;
                    }
                }

                // Terminate if there are no more statements to process
                if (index == 0) {
                    break;
                }

                // Otherwise, pick up the next statement to process from the stack
                s = stack[--index];

                // Restore cached templates list
                templates.clear();
            }
        }
    }
283 
284     private static Statement[] normalize(Statement[] statements,
285             final Function<Value, Value> normalizer) {
286 
287         if (normalizer != null) {
288             statements = statements.clone();
289             for (int i = 0; i < statements.length; ++i) {
290                 final Statement s = statements[i];
291                 statements[i] = Statements.VALUE_FACTORY.createStatement(
292                         (Resource) normalizer.apply(s.getSubject()),
293                         (URI) normalizer.apply(s.getPredicate()), //
294                         normalizer.apply(s.getObject()),
295                         (Resource) normalizer.apply(s.getContext()));
296             }
297         }
298         return statements;
299     }
300 
301     private static List<Phase> buildPhases(final Ruleset ruleset) {
302 
303         // Scan rules (which are ordered by phase, fixpoint, id) and identify the rules for
304         // each phase/fixpoint combination, instantiating the corresponding phase object
305         final List<Phase> phases = new ArrayList<>();
306         final List<Rule> rules = new ArrayList<>();
307         for (final Rule rule : ruleset.getRules()) {
308             if (!rules.isEmpty() && (rule.isFixpoint() != rules.get(0).isFixpoint() //
309                     || rule.getPhase() != rules.get(0).getPhase())) {
310                 phases.add(buildPhase(rules));
311                 rules.clear();
312             }
313             rules.add(rule);
314         }
315         if (!rules.isEmpty()) {
316             phases.add(buildPhase(rules));
317         }
318         return phases;
319     }
320 
321     private static Phase buildPhase(final List<Rule> rules) {
322 
323         // Determine whether all rules are (i) simple, (ii) streamable, (iii) insert-only
324         boolean simple = true;
325         boolean insertOnly = true;
326         boolean streamable = true;
327         for (final Rule rule : rules) {
328             simple &= rule.isSimple();
329             insertOnly &= rule.getDeleteExpr() == null;
330             streamable &= rule.isStreamable();
331         }
332 
333         // Select the type of phase based on rule properties
334         Phase phase;
335         if (streamable && ENABLE_STREAMING) {
336             phase = StreamPhase.create(rules);
337         } else if (simple && insertOnly && ENABLE_SEMINAIVE) {
338             phase = SemiNaivePhase.create(rules);
339         } else {
340             phase = NaivePhase.create(rules);
341         }
342 
343         // Return the Phase object built
344         return phase;
345     }
346 
347     private static abstract class Phase {
348 
349         private final boolean handlerSupported;
350 
351         private final boolean modelSupported;
352 
353         Phase(final boolean handlerSupported, final boolean modelSupported) {
354             this.handlerSupported = handlerSupported;
355             this.modelSupported = modelSupported;
356         }
357 
358         public Phase normalize(final Function<Value, Value> normalizer) {
359             return this;
360         }
361 
362         public RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
363             throw new Error();
364         }
365 
366         public void eval(final QuadModel model) {
367             throw new Error();
368         }
369 
370         public boolean isHandlerOutputUnique(final boolean inputUnique) {
371             return true;
372         }
373 
374         public final boolean isHandlerSupported() {
375             return this.handlerSupported;
376         }
377 
378         public final boolean isModelSupported() {
379             return this.modelSupported;
380         }
381 
382     }
383 
384     private static final class StreamPhase extends Phase {
385 
386         @Nullable
387         private final StatementMatcher deleteMatcher;
388 
389         @Nullable
390         private final StatementMatcher insertMatcher;
391 
392         private final Statement[] axioms;
393 
394         private final boolean fixpoint;
395 
396         private StreamPhase(@Nullable final StatementMatcher deleteMatcher,
397                 @Nullable final StatementMatcher insertMatcher, final Statement[] axioms,
398                 final boolean fixpoint) {
399 
400             // We only work in streaming
401             super(true, false);
402 
403             // Store supplied structures
404             this.deleteMatcher = deleteMatcher;
405             this.insertMatcher = insertMatcher;
406             this.axioms = axioms;
407             this.fixpoint = fixpoint;
408         }
409 
410         static StreamPhase create(final Iterable<Rule> rules) {
411 
412             // Allocate builders for the two matchers and an empty axiom list
413             StatementMatcher.Builder db = null;
414             StatementMatcher.Builder ib = null;
415             final List<Statement> axioms = new ArrayList<>();
416             boolean containsFixpointRule = false;
417             boolean containsNonFixpointRule = false;
418 
419             // Populate matchers by iterating over supplied rules
420             for (final Rule rule : rules) {
421                 assert rule.isSafe() && rule.isStreamable();
422                 containsFixpointRule |= rule.isFixpoint();
423                 containsNonFixpointRule |= !rule.isFixpoint();
424                 if (!rule.getWherePatterns().isEmpty()) {
425                     final StatementPattern wp = rule.getWherePatterns().iterator().next();
426                     for (final StatementPattern ip : rule.getInsertPatterns()) {
427                         ib = ib != null ? ib : StatementMatcher.builder();
428                         ib.addExpr(rule.getWhereExpr(), new StatementTemplate(ip, wp));
429                     }
430                     if (!rule.getDeletePatterns().isEmpty()) {
431                         db = db != null ? db : StatementMatcher.builder();
432                         db.addExpr(rule.getWhereExpr());
433                     }
434                 } else {
435                     for (final StatementPattern ip : rule.getInsertPatterns()) {
436                         final Value subj = ip.getSubjectVar().getValue();
437                         final Value pred = ip.getPredicateVar().getValue();
438                         final Value obj = ip.getObjectVar().getValue();
439                         final Value ctx = ip.getContextVar() == null ? null : ip.getContextVar()
440                                 .getValue();
441                         if (subj instanceof Resource && pred instanceof URI
442                                 && (ctx == null || ctx instanceof Resource)) {
443                             axioms.add(Statements.VALUE_FACTORY.createStatement((Resource) subj,
444                                     (URI) pred, obj, (Resource) ctx));
445                         }
446                     }
447                 }
448             }
449             assert containsFixpointRule ^ containsNonFixpointRule;
450 
451             // Build matchers
452             final StatementMatcher dm = db == null ? null : db.build(null);
453             final StatementMatcher im = ib == null ? null : ib.build(null);
454 
455             // Log result
456             if (LOGGER.isDebugEnabled()) {
457                 LOGGER.debug(
458                         "Configured StreamPhase: {} rules; {}; {} axioms; {} delete matcher; "
459                                 + "{} insert matcher", Iterables.size(rules),
460                         containsFixpointRule ? "fixpoint" : "non fixpoint", axioms.size(), dm, im);
461             }
462 
463             // Create a new StreamPhase object, using null when there are no deletions or
464             // insertions possible, and determining whether fixpoint evaluation should occur
465             return new StreamPhase(dm, im, axioms.toArray(new Statement[axioms.size()]),
466                     containsFixpointRule);
467         }
468 
469         @Override
470         public boolean isHandlerOutputUnique(final boolean inputUnique) {
471             return inputUnique && this.insertMatcher == null;
472         }
473 
474         @Override
475         public Phase normalize(final Function<Value, Value> normalizer) {
476 
477             // Normalize delete matchers and insert matchers with associated templates
478             final StatementMatcher normalizedDeleteMatcher = this.deleteMatcher == null ? null
479                     : this.deleteMatcher.normalize(normalizer);
480             final StatementMatcher normalizedInsertMatcher = this.insertMatcher == null ? null
481                     : this.insertMatcher.normalize(normalizer);
482 
483             // Normalize axioms
484             final Statement[] normalizedAxioms = RuleEngineImpl.normalize(this.axioms, normalizer);
485 
486             // Return a normalized copy of this phase object
487             return new StreamPhase(normalizedDeleteMatcher, normalizedInsertMatcher,
488                     normalizedAxioms, this.fixpoint);
489         }
490 
491         @Override
492         public RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
493 
494             return new AbstractRDFHandlerWrapper(handler) {
495 
496                 private StatementDeduplicator deduplicator;
497 
498                 @Override
499                 public void startRDF() throws RDFHandlerException {
500 
501                     // Delegate
502                     super.startRDF();
503 
504                     // Initialize deduplicator
505                     if (deduplicate || FORCE_DEDUPLICATION) {
506                         this.deduplicator = StatementDeduplicator
507                                 .newTotalDeduplicator(ComparisonMethod.HASH);
508                     } else {
509                         this.deduplicator = StatementDeduplicator.newPartialDeduplicator(
510                                 ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
511                     }
512 
513                     // Emit axioms
514                     for (final Statement axiom : StreamPhase.this.axioms) {
515                         expand(axiom, this.handler, this.deduplicator,
516                                 StreamPhase.this.fixpoint ? StreamPhase.this.deleteMatcher : null,
517                                 StreamPhase.this.insertMatcher, StreamPhase.this.fixpoint, true);
518                     }
519                 }
520 
521                 @Override
522                 public void handleStatement(final Statement stmt) throws RDFHandlerException {
523 
524                     // Delegate to recursive method expand(), marking the statement as explicit
525                     expand(stmt, this.handler, this.deduplicator, StreamPhase.this.deleteMatcher,
526                             StreamPhase.this.insertMatcher, StreamPhase.this.fixpoint, true);
527                 }
528 
529             };
530         }
531     }
532 
533     private static final class NaivePhase extends Phase {
534 
535         private final List<Rule> rules;
536 
537         private final boolean fixpoint;
538 
539         private final boolean canDelete;
540 
541         private final boolean canInsert;
542 
543         private NaivePhase(final List<Rule> rules, final boolean fixpoint,
544                 final boolean canDelete, final boolean canInsert) {
545             super(false, true);
546             this.rules = rules;
547             this.fixpoint = fixpoint;
548             this.canDelete = canDelete;
549             this.canInsert = canInsert;
550         }
551 
552         static NaivePhase create(final Iterable<Rule> rules) {
553 
554             // Extract list of rules and fixpoint mode
555             final List<Rule> ruleList = ImmutableList.copyOf(rules);
556             final boolean fixpoint = ruleList.get(0).isFixpoint();
557 
558             // Log result
559             if (LOGGER.isDebugEnabled()) {
560                 LOGGER.debug("Configured NaivePhase: {} rules; {}", ruleList.size(),
561                         fixpoint ? "fixpoint" : "non fixpoint");
562             }
563 
564             // Determine whether deletions and/or insertions are possible
565             boolean canDelete = false;
566             boolean canInsert = false;
567             for (final Rule rule : rules) {
568                 canDelete |= rule.getDeleteExpr() != null;
569                 canInsert |= rule.getInsertExpr() != null;
570             }
571 
572             // Build the naive phase
573             return new NaivePhase(ruleList, fixpoint && canInsert, canDelete, canInsert);
574         }
575 
576         private StatementDeduplicator newDeduplicator() {
577             return FORCE_DEDUPLICATION ? StatementDeduplicator
578                     .newTotalDeduplicator(ComparisonMethod.HASH) : StatementDeduplicator
579                     .newPartialDeduplicator(ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
580         }
581 
582         @Override
583         public void eval(final QuadModel model) {
584 
585             StatementDeduplicator deleteDeduplicator = this.canDelete ? newDeduplicator() : null;
586             StatementDeduplicator insertDeduplicator = this.canInsert ? newDeduplicator() : null;
587 
588             if (!this.fixpoint) {
589                 // (1) One-shot evaluation
590                 evalRules(deleteDeduplicator, insertDeduplicator, model);
591 
592             } else {
593                 // (2) Naive fixpoint evaluation
594                 while (true) {
595                     final boolean modified = evalRules(deleteDeduplicator, insertDeduplicator,
596                             model);
597                     if (!modified) {
598                         break; // fixpoint reached
599                     }
600                     if (this.canInsert && this.canDelete) {
601                         deleteDeduplicator = newDeduplicator();
602                         insertDeduplicator = newDeduplicator();
603                     }
604                 }
605             }
606         }
607 
608         private boolean evalRules(final StatementDeduplicator deleteDeduplicator,
609                 final StatementDeduplicator insertDeduplicator, final QuadModel model) {
610 
611             // Take a timestamp
612             final long ts1 = System.currentTimeMillis();
613 
614             // Allocate delete and insert buffers (initially empty)
615             final StatementBuffer deleteBuffer = new StatementBuffer();
616             final StatementBuffer insertBuffer = new StatementBuffer();
617 
618             // Evaluate all rules in parallel, collecting produced quads in the two buffers
619             final int numRules = Rule.evaluate(this.rules, model, null, () -> {
620                 return deleteDeduplicator.deduplicate(deleteBuffer.get(), true);
621             }, () -> {
622                 return insertDeduplicator.deduplicate(insertBuffer.get(), true);
623             });
624 
625             // Take another timestamp and measure buffer sizes after rule evaluation
626             final long ts2 = System.currentTimeMillis();
627             final int deleteBufferSize = deleteBuffer.size();
628             final int insertBufferSize = insertBuffer.size();
629 
630             // Allocate a buffer where to accumulate statements actually deleted. This is
631             // necessary only in case there are both deletions and insertions
632             final StatementBuffer deleteDelta = deleteBuffer.isEmpty() || //
633                     insertBuffer.isEmpty() ? null : new StatementBuffer();
634             final RDFHandler deleteCallback = deleteDelta == null ? null : deleteDelta.get();
635 
636             // Apply the modifications resulting from rule evaluation, tracking the model sizes. A
637             // side result of this operation is that the two buffers are deduplicated (this is
638             // essential for determining if the model changed or not)
639             final int size0 = model.size();
640             deleteBuffer.toModel(model, false, deleteCallback);
641             final int size1 = model.size();
642             insertBuffer.toModel(model, true, null);
643             final int size2 = model.size();
644 
645             // Determine whether the model has changed w.r.t. its original state. This is done by
646             // first comparing the different model sizes and as a last resort by comparing the two
647             // delete and insert buffers to see if they are equal (=> model unchanged)
648             boolean result;
649             if (size0 != size2) {
650                 result = true;
651             } else if (size0 == size1) {
652                 result = false;
653             } else {
654                 result = insertBuffer.contains(deleteBuffer);
655             }
656 
657             // Take a final timestamp and log relevant statistics if enabled
658             final long ts3 = System.currentTimeMillis();
659             if (LOGGER.isDebugEnabled()) {
660                 LOGGER.debug("{} rules (out of {}) rules evaluated in {} ms ({} ms query, "
661                         + "{} ms modify), {} deletions ({} buffered), {} insertions "
662                         + "({} buffered), {} quads in, {} quads out", numRules, this.rules.size(),
663                         ts3 - ts1, ts2 - ts1, ts3 - ts2, size1 - size0, deleteBufferSize, size2
664                                 - size1, insertBufferSize, size0, size2);
665             }
666 
667             // Return true if the model has changed
668             return result;
669         }
670 
671     }
672 
673     private static final class SemiNaivePhase extends Phase {
674 
675         private final List<Rule> allRules;
676 
677         private final List<Rule> joinRules;
678 
679         private final StatementMatcher streamMatcher;
680 
681         private final StatementMatcher joinMatcher;
682 
683         private final Statement[] axioms;
684 
685         private final boolean fixpoint;
686 
687         private SemiNaivePhase(final List<Rule> rules, final List<Rule> joinRules,
688                 final StatementMatcher streamMatcher, final StatementMatcher joinMatcher,
689                 final Statement[] axioms, final boolean fixpoint) {
690 
691             super(!joinMatcher.matchAll(), true);
692             this.allRules = rules;
693             this.joinRules = joinRules;
694             this.streamMatcher = streamMatcher;
695             this.joinMatcher = joinMatcher;
696             this.axioms = axioms;
697             this.fixpoint = fixpoint;
698         }
699 
700         public static SemiNaivePhase create(final Iterable<Rule> rules) {
701 
702             // Extract list of rules and fixpoint mode
703             final List<Rule> allRules = ImmutableList.copyOf(rules);
704             final boolean fixpoint = allRules.get(0).isFixpoint();
705 
706             // Allocate builders for the two matchers and an empty axiom list
707             final List<Rule> joinRules = Lists.newArrayList();
708             final List<Statement> axioms = new ArrayList<>();
709             final StatementMatcher.Builder joinBuilder = StatementMatcher.builder();
710             final StatementMatcher.Builder streamBuilder = StatementMatcher.builder();
711 
712             // Scan rules to (i) identify join rules; (ii) collect axioms from stream rules
713             // with empty WHERE part; and (iii) build the join and stream matchers
714             for (final Rule rule : allRules) {
715                 if (!rule.isStreamable()) {
716                     joinRules.add(rule);
717                     for (final StatementPattern wp : rule.getWherePatterns()) {
718                         joinBuilder.addPattern(wp, null);
719                     }
720                 } else if (!rule.getWherePatterns().isEmpty()) {
721                     final StatementPattern wp = rule.getWherePatterns().iterator().next();
722                     for (final StatementPattern ip : rule.getInsertPatterns()) {
723                         streamBuilder.addExpr(rule.getWhereExpr(), new StatementTemplate(ip, wp));
724                     }
725                 } else {
726                     for (final StatementPattern ip : rule.getInsertPatterns()) {
727                         final Value subj = ip.getSubjectVar().getValue();
728                         final Value pred = ip.getPredicateVar().getValue();
729                         final Value obj = ip.getObjectVar().getValue();
730                         final Value ctx = ip.getContextVar() == null ? null : ip.getContextVar()
731                                 .getValue();
732                         if (subj instanceof Resource && pred instanceof URI
733                                 && (ctx == null || ctx instanceof Resource)) {
734                             axioms.add(Statements.VALUE_FACTORY.createStatement((Resource) subj,
735                                     (URI) pred, obj, (Resource) ctx));
736                         }
737                     }
738                 }
739             }
740             final StatementMatcher joinMatcher = joinBuilder.build(null);
741             final StatementMatcher streamMatcher = streamBuilder.build(null);
742 
743             // Log result
744             if (LOGGER.isDebugEnabled()) {
745                 LOGGER.debug("Configured SemiNaivePhase: {} rules ({} join); {}; {} axioms; "
746                         + "{} join matcher ({}); {} stream matcher", allRules.size(),
747                         joinRules.size(), fixpoint ? "fixpoint" : "non fixpoint", axioms.size(),
748                         joinMatcher, joinMatcher.matchAll() ? "match all" : "no match all",
749                         streamMatcher);
750             }
751 
752             // Create and return the SemiNaivePhase object for the rules specified
753             return new SemiNaivePhase(allRules, ImmutableList.copyOf(joinRules), streamMatcher,
754                     joinMatcher, axioms.toArray(new Statement[axioms.size()]), fixpoint);
755         }
756 
757         @Override
758         public boolean isHandlerOutputUnique(final boolean inputUnique) {
759             return this.fixpoint && this.joinMatcher.matchAll();
760         }
761 
762         @Override
763         public Phase normalize(final Function<Value, Value> normalizer) {
764 
765             final StatementMatcher normStreamMatcher = this.streamMatcher.normalize(normalizer);
766             final StatementMatcher normModelMatcher = this.joinMatcher.normalize(normalizer);
767             final Statement[] normAxioms = RuleEngineImpl.normalize(this.axioms, normalizer);
768 
769             Phase result = this;
770             if (normStreamMatcher != this.streamMatcher || normModelMatcher != this.joinMatcher
771                     || normAxioms != this.axioms) {
772                 result = new SemiNaivePhase(this.allRules, this.joinRules, normStreamMatcher,
773                         normModelMatcher, normAxioms, this.fixpoint);
774             }
775             return result;
776         }
777 
778         @SuppressWarnings("resource")
779         @Override
780         public RDFHandler eval(final RDFHandler handler, final boolean deduplicate) {
781 
782             // Instantiate and return a wrapper, depending on whether fixpoint evaluation is used
783             return this.fixpoint ? new FixpointHandler(handler, deduplicate)
784                     : new NonFixpointHandler(handler, deduplicate);
785         }
786 
787         @Override
788         public void eval(final QuadModel model) {
789 
790             // Allocate a partial deduplicator to use during all the phase evaluation
791             final StatementDeduplicator deduplicator;
792             if (FORCE_DEDUPLICATION) {
793                 deduplicator = StatementDeduplicator.newTotalDeduplicator(ComparisonMethod.HASH);
794             } else {
795                 deduplicator = StatementDeduplicator.newPartialDeduplicator(
796                         ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
797             }
798 
799             // Handle three case
800             if (!this.fixpoint) {
801                 // (1) Single iteration of both join and stream rules
802                 evalJoinStreamIteration(deduplicator, model);
803 
804             } else {
805                 // (2) Semi-naive fixpoint evaluation. Expand the model evaluating stream
806                 // rules first, then evaluate join rules + stream rules in fixpoint
807                 if (this.joinRules.size() < this.allRules.size()) {
808                     evalStreamFixpoint(deduplicator, model);
809                 }
810                 QuadModel delta = null;
811                 while (true) {
812                     delta = evalJoinIterationStreamFixpoint(deduplicator, model, delta, null);
813                     if (delta.isEmpty()) {
814                         break; // fixpoint reached
815                     }
816                 }
817             }
818         }
819 
820         private void evalJoinStreamIteration(final StatementDeduplicator deduplicator,
821                 final QuadModel model) {
822 
823             // Take a timestamp before evaluating rules
824             final long ts0 = System.currentTimeMillis();
825 
826             // Allocate insert buffer (initially empty)
827             final StatementBuffer buffer = new StatementBuffer();
828 
829             // Evaluate stream rules, including axioms, single iteration (no fixpoint)
830             buffer.addAll(Arrays.asList(this.axioms));
831             applyStreamRules(deduplicator, model, buffer, false);
832 
833             // Evaluate join rules, single iteration (no fixpoint)
834             final int numVariants = Rule.evaluate(SemiNaivePhase.this.joinRules, model, null,
835                     null, () -> {
836                         return deduplicator.deduplicate(buffer.get(), true);
837                     });
838 
839             // Take a timestamp after evaluating rules
840             final long ts1 = System.currentTimeMillis();
841 
842             // Apply changes to the model
843             final int size0 = model.size();
844             buffer.toModel(model, true, null);
845             final int size1 = model.size();
846 
847             // Take a timestamp after modifying the model
848             final long ts2 = System.currentTimeMillis();
849 
850             // Log evaluation statistics
851             if (LOGGER.isDebugEnabled()) {
852                 LOGGER.debug("Iteration of {} join rules ({} variants) and {} stream rules "
853                         + "performed in {} ms ({} ms evaluation, {} ms model update), "
854                         + "{} insertions ({} buffered), {} quads in, {} quads out",
855                         this.joinRules.size(), numVariants,
856                         this.allRules.size() - this.joinRules.size(), ts2 - ts0, ts1 - ts0, ts2
857                                 - ts1, size1 - size0, buffer.size(), size0, size1);
858             }
859         }
860 
861         private QuadModel evalJoinIterationStreamFixpoint(
862                 final StatementDeduplicator deduplicator, final QuadModel model,
863                 @Nullable final QuadModel delta, @Nullable final RDFHandler unmatcheableSink) {
864 
865             // Take a timestamp
866             final long ts0 = System.currentTimeMillis();
867 
868             // Allocate insert buffer (initially empty)
869             final StatementBuffer buffer = new StatementBuffer();
870 
871             // Build a supplier of RDFHandlers to be used for handling inferred statements.
872             // Produced handlers will expand statements applying streamable rules with partial
873             // result deduplication. Expanded statements are either emitted (if not further
874             // processable) or accumulated in a buffer
875             final Supplier<RDFHandler> supplier = () -> {
876                 RDFHandler handler = buffer.get();
877                 if (unmatcheableSink != null) {
878                     handler = new AbstractRDFHandlerWrapper(handler) {
879 
880                         @Override
881                         public void handleStatement(final Statement stmt)
882                                 throws RDFHandlerException {
883                             if (SemiNaivePhase.this.joinMatcher.match(stmt)) {
884                                 super.handleStatement(stmt);
885                             } else {
886                                 unmatcheableSink.handleStatement(stmt);
887                             }
888                         }
889 
890                     };
891                 }
892                 handler = new AbstractRDFHandlerWrapper(handler) {
893 
894                     @Override
895                     public void handleStatement(final Statement stmt) throws RDFHandlerException {
896                         expand(stmt, this.handler, deduplicator, null,
897                                 SemiNaivePhase.this.streamMatcher, true, true);
898                     }
899 
900                 };
901                 return handler;
902             };
903 
904             // Evaluate join rules in parallel using the supplier created before
905             final int numVariants = Rule.evaluate(this.joinRules, model, delta, null, supplier);
906 
907             // Take another timestamp and measure size of join buffer after evaluation
908             final long ts1 = System.currentTimeMillis();
909             final int joinBufferSize = buffer.size();
910 
911             // Insert the quads resulting from rule evaluation.
912             final StatementBuffer deltaBuffer = new StatementBuffer();
913             final int size0 = model.size();
914             buffer.toModel(model, true, deltaBuffer.get());
915             final int size1 = model.size();
916             final long ts2 = System.currentTimeMillis();
917 
918             // Compute new delta model
919             final QuadModel newDelta = model.filter(deltaBuffer);
920 
921             // Take a final timestamp and log relevant statistics if enabled
922             final long ts3 = System.currentTimeMillis();
923             if (LOGGER.isDebugEnabled()) {
924                 final int numJoinRules = this.joinRules.size();
925                 final int numStreamRules = this.allRules.size() - this.joinRules.size();
926                 LOGGER.debug("Iteration of {} join rules ({} variants) and fixpoint of {} stream "
927                         + "rules evaluated in {} ms ({} ms evaluation, {} ms model update, {} ms "
928                         + "delta), {} insertions ({} buffered), {} quads in, {} quads out",
929                         numJoinRules, numVariants, numStreamRules, ts3 - ts0, ts1 - ts0,
930                         ts2 - ts1, ts3 - ts2, size1 - size0, joinBufferSize, size0, size1);
931             }
932 
933             // Return the new delta model
934             return newDelta;
935         }
936 
937         private void evalStreamFixpoint(final StatementDeduplicator deduplicator,
938                 final QuadModel model) {
939 
940             // Take a timestamp for tracking execution time
941             final long ts0 = System.currentTimeMillis();
942 
943             // Allocate a buffer where to accumulate the result of rule evaluation
944             final StatementBuffer buffer = new StatementBuffer();
945             buffer.addAll(Arrays.asList(this.axioms));
946             applyStreamRules(deduplicator, Iterables.concat(Arrays.asList(this.axioms), model),
947                     buffer, true);
948 
949             // Take a timestamp before modifying the model
950             final long ts1 = System.currentTimeMillis();
951 
952             // Apply changes to the model
953             final int size0 = model.size();
954             buffer.toModel(model, true, null);
955             final int size1 = model.size();
956 
957             // Take a final timestamp
958             final long ts2 = System.currentTimeMillis();
959 
960             // Log statistics
961             if (LOGGER.isDebugEnabled()) {
962                 final int numStreamRules = this.allRules.size() - this.joinRules.size();
963                 LOGGER.debug("Fixpoint of {} stream rules evaluated in {} ms ({} ms evaluation, "
964                         + "{} ms model update), {} insertions ({} buffered), {} quads in, "
965                         + "{} quads out", numStreamRules, ts2 - ts0, ts1 - ts0, ts2 - ts1, size1
966                         - size0, buffer.size(), size0, size1);
967             }
968         }
969 
970         private void applyStreamRules(final StatementDeduplicator deduplicator,
971                 final Iterable<Statement> statements, final Supplier<RDFHandler> sink,
972                 final boolean fixpoint) {
973 
974             // Prepare a tree of RDFHandlers to process model statements. The root handler
975             // dispatches statements in a round robin way to child handler sequences, each one
976             // delegating work to another thread that perform expansion and populates the buffer
977             final RDFHandler[] sinks = new RDFHandler[Environment.getCores()];
978             for (int i = 0; i < sinks.length; ++i) {
979                 sinks[i] = RDFHandlers.decouple(new AbstractRDFHandlerWrapper(sink.get()) {
980 
981                     @Override
982                     public void handleStatement(final Statement stmt) throws RDFHandlerException {
983                         expand(stmt, this.handler, deduplicator, null,
984                                 SemiNaivePhase.this.streamMatcher, fixpoint, false);
985                     }
986 
987                 }, 1);
988             }
989             final RDFHandler handler = RDFHandlers.dispatchRoundRobin(64, sinks);
990 
991             // Evaluate rules
992             try {
993                 RDFSources.wrap(statements).emit(handler, 1);
994             } catch (final RDFHandlerException ex) {
995                 Throwables.propagate(ex);
996             }
997         }
998 
        /**
         * Handler-based fixpoint evaluation of the phase.
         * <p>
         * Incoming statements, and the axioms emitted at startRDF, are expanded with stream
         * rules until fixpoint. Expanded statements matching the join matcher are accumulated
         * in a join model; the others are forwarded downstream. At endRDF, join rules are
         * evaluated semi-naively on the join model until fixpoint, and the resulting join model
         * is emitted downstream.
         * </p>
         */
        private final class FixpointHandler extends AbstractRDFHandlerWrapper {

            // Whether a total deduplicator is used (see the constructor)
            private final boolean deduplicate;

            // Statements matching the WHERE patterns of join rules, accumulated until endRDF
            private QuadModel joinModel;

            // Deduplicator used while expanding statements; allocated in startRDF
            private StatementDeduplicator deduplicator;

            // Output of stream rules: routes statements to joinModel or downstream
            private RDFHandler sink;

            FixpointHandler(final RDFHandler handler, final boolean deduplicate) {
                super(handler);
                // The fixpoint conjunct is always true here, since eval() creates this handler
                // only in fixpoint mode.
                // NOTE(review): total deduplication is enabled only when the join matcher
                // matches everything; statements not matched by it are emitted directly, and
                // with a partial deduplicator they may presumably be emitted more than once.
                // Confirm this condition is intended.
                this.deduplicate = deduplicate && SemiNaivePhase.this.fixpoint
                        && SemiNaivePhase.this.joinMatcher.matchAll();
            }

            @Override
            public void startRDF() throws RDFHandlerException {

                // Delegate
                super.startRDF();

                // Allocate a model for statements matching WHERE patterns of join rule
                this.joinModel = QuadModel.create();

                // Allocate a deduplicator (total if requested or forced, partial otherwise)
                this.deduplicator = this.deduplicate || FORCE_DEDUPLICATION ? StatementDeduplicator
                        .newTotalDeduplicator(ComparisonMethod.HASH) : StatementDeduplicator
                        .newPartialDeduplicator(ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);

                // Setup the sink where to emit output of stream rules. The join model is
                // guarded by a lock because handleStatement may be called concurrently
                this.sink = new AbstractRDFHandlerWrapper(this.handler) {

                    @Override
                    public void handleStatement(final Statement stmt) throws RDFHandlerException {
                        if (SemiNaivePhase.this.joinMatcher.match(stmt)) {
                            synchronized (FixpointHandler.this.joinModel) {
                                FixpointHandler.this.joinModel.add(stmt);
                            }
                        } else {
                            this.handler.handleStatement(stmt);
                        }
                    }

                };

                // Emit axioms through this handler's handleStatement, so that they are
                // expanded with stream rules as any other input statement
                for (final Statement axiom : SemiNaivePhase.this.axioms) {
                    handleStatement(axiom);
                }
            }

            @Override
            public void handleStatement(final Statement stmt) throws RDFHandlerException {

                // Apply stream rules with output deduplication. Inferred statements are either
                // emitted (if not further processable) or accumulated in a model
                expand(stmt, this.sink, this.deduplicator, null,
                        SemiNaivePhase.this.streamMatcher, true, true);
            }

            @Override
            public void endRDF() throws RDFHandlerException {

                // Semi-naive fixpoint evaluation. Statements not matching join rules are sent
                // straight downstream (this.handler) instead of being added to the join model
                QuadModel delta = null;
                while (true) {
                    delta = evalJoinIterationStreamFixpoint(this.deduplicator, this.joinModel,
                            delta, this.handler);
                    if (delta.isEmpty()) {
                        break; // fixpoint reached
                    }
                }

                // Emit closed statements in the join model. The downstream handler is wrapped
                // so that startRDF/endRDF/close are not propagated by this emission; this local
                // variable shadows the sink field
                final RDFHandler sink = RDFHandlers.decouple(RDFHandlers.ignoreMethods(
                        this.handler, RDFHandlers.METHOD_START_RDF | RDFHandlers.METHOD_END_RDF
                                | RDFHandlers.METHOD_CLOSE));
                RDFSources.wrap(this.joinModel).emit(sink, 1);

                // Release memory
                this.joinModel = null;
                this.deduplicator = null;
                this.sink = null;

                // Signal completion
                super.endRDF();
            }

        }
1089 
1090         private final class NonFixpointHandler extends AbstractRDFHandlerWrapper {
1091 
1092             private final boolean deduplicate;
1093 
1094             private QuadModel joinModel;
1095 
1096             private StatementDeduplicator deduplicator;
1097 
1098             NonFixpointHandler(final RDFHandler handler, final boolean deduplicate) {
1099                 super(handler);
1100                 this.deduplicate = deduplicate;
1101             }
1102 
1103             @Override
1104             public void startRDF() throws RDFHandlerException {
1105 
1106                 // Delegate
1107                 super.startRDF();
1108 
1109                 // Allocate model and deduplicator
1110                 this.joinModel = QuadModel.create();
1111                 this.deduplicator = this.deduplicate || FORCE_DEDUPLICATION ? StatementDeduplicator
1112                         .newTotalDeduplicator(ComparisonMethod.HASH) : StatementDeduplicator
1113                         .newPartialDeduplicator(ComparisonMethod.EQUALS, DEDUPLICATION_CACHE_SIZE);
1114 
1115                 // Emit axioms
1116                 for (final Statement axiom : SemiNaivePhase.this.axioms) {
1117                     this.handler.handleStatement(axiom);
1118                 }
1119             }
1120 
1121             @Override
1122             public void handleStatement(final Statement stmt) throws RDFHandlerException {
1123 
1124                 // Apply stream rules to all incoming statements
1125                 expand(stmt, this.handler, this.deduplicator, null,
1126                         SemiNaivePhase.this.streamMatcher, false, true);
1127 
1128                 // Statements matching WHERE part of join rules are accumulated in a model
1129                 if (SemiNaivePhase.this.joinMatcher.match(stmt)) {
1130                     synchronized (this.joinModel) {
1131                         this.joinModel.add(stmt);
1132                     }
1133                 }
1134             }
1135 
1136             @Override
1137             public void endRDF() throws RDFHandlerException {
1138 
1139                 // Apply join rules on accumulated statements, emitting inferred statements
1140                 Rule.evaluate(SemiNaivePhase.this.joinRules, this.joinModel, null, null, () -> {
1141                     return this.deduplicator.deduplicate(this.handler, true);
1142                 });
1143 
1144                 // Notify completion
1145                 super.endRDF();
1146             }
1147 
1148         }
1149 
1150     }
1151 
1152 }