1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2015 by Francesco Corcoglioniti with support by Alessio Palmero Aprosio and Marco
5    * Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.io.IOException;
17  import java.io.Reader;
18  import java.util.ArrayList;
19  import java.util.List;
20  
21  import javax.annotation.Nullable;
22  
23  import org.openrdf.model.BNode;
24  import org.openrdf.model.Statement;
25  import org.openrdf.model.URI;
26  import org.openrdf.model.Value;
27  import org.openrdf.model.vocabulary.RDF;
28  import org.openrdf.model.vocabulary.SESAME;
29  import org.openrdf.query.impl.MapBindingSet;
30  import org.openrdf.rio.RDFHandler;
31  import org.openrdf.rio.RDFHandlerException;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import eu.fbk.rdfpro.util.Environment;
36  import eu.fbk.rdfpro.util.IO;
37  import eu.fbk.rdfpro.util.Namespaces;
38  import eu.fbk.rdfpro.util.Options;
39  import eu.fbk.rdfpro.util.QuadModel;
40  import eu.fbk.rdfpro.util.Statements;
41  import eu.fbk.rdfpro.util.Tracker;
42  
43  final class ProcessorRules implements RDFProcessor {
44  
45      private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorRules.class);
46  
47      private final RuleEngine engine;
48  
49      @Nullable
50      private final Mapper mapper;
51  
52      @Nullable
53      private final QuadModel tboxClosure;
54  
55      private final boolean dropBNodeTypes;
56  
57      private final boolean deduplicate;
58  
59      static RDFProcessor create(final String name, final String... args) throws IOException,
60              RDFHandlerException {
61  
62          // Validate and parse options
63          final Options options = Options.parse("r!|B!|p!|g!|t|C|c!|b!|w|u|*", args);
64  
65          // Read base and preserve BNodes settings
66          final boolean preserveBNodes = !options.hasOption("w");
67          String base = options.getOptionArg("b", String.class);
68          base = base == null ? null : Statements.parseValue(base.contains(":") ? base : base + ":",
69                  Namespaces.DEFAULT).stringValue();
70  
71          // Read bindings
72          final String parameters = options.getOptionArg("B", String.class, "");
73          final MapBindingSet bindings = new MapBindingSet();
74          for (final String token : parameters.split("\\s+")) {
75              final int index = token.indexOf('=');
76              if (index >= 0) {
77                  bindings.addBinding(token.substring(0, index).trim(), Statements.parseValue(token
78                          .substring(index + 1).trim(), Namespaces.DEFAULT));
79              }
80          }
81  
82          // Read rulesets
83          final List<String> rdfRulesetURLs = new ArrayList<>();
84          final List<String> dlogRulesetURLs = new ArrayList<>();
85          final String rulesetNames = options.getOptionArg("r", String.class);
86          for (final String rulesetName : rulesetNames.split(",")) {
87              String location = Environment.getProperty("rdfpro.rules." + rulesetName);
88              location = location != null ? location : rulesetName;
89              final String url = IO.extractURL(location).toString();
90              (url.endsWith(".dlog") ? dlogRulesetURLs : rdfRulesetURLs).add(url);
91          }
92          Ruleset ruleset = null;
93          if (!rdfRulesetURLs.isEmpty()) {
94              final RDFSource rulesetSource = RDFSources.read(true, preserveBNodes, base, null,
95                      rdfRulesetURLs.toArray(new String[rdfRulesetURLs.size()]));
96              try {
97                  ruleset = Ruleset.fromRDF(rulesetSource);
98              } catch (final Throwable ex) {
99                  LOGGER.error("Invalid ruleset", ex);
100                 throw ex;
101             }
102         }
103         if (!dlogRulesetURLs.isEmpty()) {
104             final List<Ruleset> rulesets = new ArrayList<>();
105             if (ruleset != null) {
106                 rulesets.add(ruleset);
107             }
108             for (final String dlogRulesetURL : dlogRulesetURLs) {
109                 try (Reader dlogReader = IO.utf8Reader(IO.read(dlogRulesetURL))) {
110                     final List<Rule> rules = Rule.fromDLOG(dlogReader);
111                     rulesets.add(new Ruleset(rules, null));
112                 }
113             }
114             ruleset = Ruleset.merge(rulesets.toArray(new Ruleset[rulesets.size()]));
115         }
116 
117         // Transform ruleset
118         ruleset = ruleset.rewriteVariables(bindings);
119         URI globalURI = null;
120         if (options.hasOption("G")) {
121             final String u = options.getOptionArg("G", String.class);
122             globalURI = (URI) Statements.parseValue(u.contains(":") ? u //
123                     : u + ":", Namespaces.DEFAULT);
124         }
125         final String mode = options.getOptionArg("g", String.class, "none").trim();
126         if ("global".equalsIgnoreCase(mode)) {
127             ruleset = ruleset.rewriteGlobalGM(globalURI);
128         } else if ("separate".equalsIgnoreCase(mode)) {
129             ruleset = ruleset.rewriteSeparateGM();
130         } else if ("star".equalsIgnoreCase(mode)) {
131             ruleset = ruleset.rewriteStarGM(globalURI);
132         } else if (!"none".equalsIgnoreCase(mode)) {
133             throw new IllegalArgumentException("Unknown graph inference mode: " + mode);
134         }
135 
136         // Read TBox closure settings
137         boolean emitTBox = false;
138         URI tboxContext = null;
139         if (options.hasOption("C")) {
140             emitTBox = true;
141         } else if (options.hasOption("c")) {
142             emitTBox = true;
143             final String ctx = options.getOptionArg("c", String.class);
144             tboxContext = (URI) Statements.parseValue(ctx.contains(":") ? ctx //
145                     : ctx + ":", Namespaces.DEFAULT);
146         }
147 
148         // Read bnode types settings
149         final boolean dropBNodeTypes = options.hasOption("t");
150 
151         // Read Mapper for optional partitioning
152         Mapper mapper = null;
153         final String partitioning = options.getOptionArg("p", String.class, "none").trim();
154         if ("entity".equalsIgnoreCase(partitioning)) {
155             mapper = Mapper.concat(Mapper.select("s"), Mapper.select("o"));
156         } else if ("graph".equalsIgnoreCase(partitioning)) {
157             mapper = Mapper.select("c");
158         } else if ("rules".equalsIgnoreCase(partitioning)) {
159             throw new UnsupportedOperationException("Rule-based partitioning not yet implemented");
160         } else if (!"none".equals(partitioning)) {
161             throw new IllegalArgumentException("Unknown partitioning scheme: " + partitioning);
162         }
163 
164         // Read TBox data, if any
165         final String[] tboxSpecs = options.getPositionalArgs(String.class).toArray(new String[0]);
166         final RDFSource tboxData = tboxSpecs.length == 0 ? null : RDFProcessors.track(
167                 new Tracker(LOGGER, null, "%d TBox triples read (%d tr/s avg)", //
168                         "%d TBox triples read (%d tr/s, %d tr/s avg)")).wrap(
169                 RDFSources.read(true, preserveBNodes, base, null, tboxSpecs));
170 
171         // Read deduplicate flag
172         final boolean deduplicate = options.hasOption("u");
173 
174         // Build processor
175         return new ProcessorRules(ruleset, mapper, dropBNodeTypes, deduplicate, tboxData,
176                 emitTBox, tboxContext);
177     }
178 
179     public ProcessorRules(final Ruleset ruleset, @Nullable final Mapper mapper,
180             final boolean dropBNodeTypes, final boolean deduplicate) {
181         this(ruleset, mapper, dropBNodeTypes, deduplicate, null, false, null);
182     }
183 
184     public ProcessorRules(final Ruleset ruleset, @Nullable final Mapper mapper,
185             final boolean dropBNodeTypes, final boolean deduplicate,
186             @Nullable final RDFSource tboxData, final boolean emitTBox,
187             @Nullable final URI tboxContext) {
188 
189         // Process ruleset and static data
190         LOGGER.debug("Processing {} rules {} TBox data", ruleset.getRules().size(),
191                 tboxData == null ? "without" : "with");
192         final long ts = System.currentTimeMillis();
193         Ruleset processedRuleset = ruleset.mergeSameWhereExpr();
194         RuleEngine engine = RuleEngine.create(processedRuleset);
195         QuadModel tboxClosure = null;
196         if (tboxData != null) {
197             tboxClosure = QuadModel.create();
198             try {
199                 tboxData.emit(RDFHandlers.synchronize(RDFHandlers.wrap(tboxClosure)), 1);
200             } catch (final RDFHandlerException ex) {
201                 throw new RuntimeException(ex);
202             }
203             engine.eval(tboxClosure);
204             processedRuleset = processedRuleset.getABoxRuleset(tboxClosure).mergeSameWhereExpr();
205             engine = RuleEngine.create(processedRuleset);
206             if (!emitTBox) {
207                 tboxClosure = null;
208             } else if (tboxContext != null) {
209                 final URI ctx = tboxContext.equals(SESAME.NIL) ? null : tboxContext;
210                 final List<Statement> stmts = new ArrayList<>(tboxClosure);
211                 tboxClosure.clear();
212                 for (final Statement stmt : stmts) {
213                     tboxClosure.add(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), ctx);
214                 }
215             }
216         }
217         LOGGER.info("{} initialized with {} ABox rules (from {} rules) in {} ms", engine,
218                 processedRuleset.getRules().size(), ruleset.getRules().size(),
219                 System.currentTimeMillis() - ts);
220 
221         // Setup object
222         this.engine = engine;
223         this.mapper = mapper;
224         this.tboxClosure = tboxClosure;
225         this.dropBNodeTypes = dropBNodeTypes;
226         this.deduplicate = deduplicate;
227     }
228 
229     @Override
230     public RDFHandler wrap(final RDFHandler handler) {
231 
232         // Start from the supplied handler
233         RDFHandler result = handler;
234 
235         // If necessary, filter the handler so to drop <s rdf:type _:bnode> statements
236         if (this.dropBNodeTypes) {
237             result = new AbstractRDFHandlerWrapper(result) {
238 
239                 @Override
240                 public void handleStatement(final Statement stmt) throws RDFHandlerException {
241                     if (!RDF.TYPE.equals(stmt.getPredicate())
242                             || !(stmt.getObject() instanceof BNode)) {
243                         super.handleStatement(stmt);
244                     }
245                 }
246 
247             };
248         }
249 
250         // If necessary, filter the handler so to inject the TBox closure (in parallel)
251         if (this.tboxClosure != null) {
252             result = RDFProcessors.inject(RDFSources.wrap(this.tboxClosure)).wrap(result);
253         }
254 
255         // Add decoupler so to ensure that output is dispatched to parallel threads
256         result = RDFHandlers.decouple(result);
257 
258         // Filter the handler to perform inference. Handle two cases.
259         if (this.mapper == null) {
260 
261             // (1) No mapper: just invoke the rule engine
262             result = this.engine.eval(result, this.deduplicate);
263 
264         } else {
265 
266             // (2) Mapper configured: perform map/reduce and do inference on reduce phase
267             result = RDFProcessors.mapReduce(this.mapper, new Reducer() {
268 
269                 @Override
270                 public void reduce(final Value key, final Statement[] stmts,
271                         final RDFHandler handler) throws RDFHandlerException {
272                     final RDFHandler session = ProcessorRules.this.engine.eval(
273                             RDFHandlers.ignoreMethods(handler, RDFHandlers.METHOD_START_RDF
274                                     | RDFHandlers.METHOD_END_RDF | RDFHandlers.METHOD_CLOSE),
275                             ProcessorRules.this.deduplicate);
276                     try {
277                         session.startRDF();
278                         for (final Statement stmt : stmts) {
279                             session.handleStatement(stmt);
280                         }
281                         session.endRDF();
282                     } finally {
283                         IO.closeQuietly(session);
284                     }
285                 }
286 
287             }, true).wrap(result);
288 
289         }
290 
291         // Return the resulting handler after all the necessary wrappings
292         return result;
293     }
294 
295 }