1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.io.IOException;
17 import java.io.Reader;
18 import java.util.ArrayList;
19 import java.util.List;
20
21 import javax.annotation.Nullable;
22
23 import org.openrdf.model.BNode;
24 import org.openrdf.model.Statement;
25 import org.openrdf.model.URI;
26 import org.openrdf.model.Value;
27 import org.openrdf.model.vocabulary.RDF;
28 import org.openrdf.model.vocabulary.SESAME;
29 import org.openrdf.query.impl.MapBindingSet;
30 import org.openrdf.rio.RDFHandler;
31 import org.openrdf.rio.RDFHandlerException;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import eu.fbk.rdfpro.util.Environment;
36 import eu.fbk.rdfpro.util.IO;
37 import eu.fbk.rdfpro.util.Namespaces;
38 import eu.fbk.rdfpro.util.Options;
39 import eu.fbk.rdfpro.util.QuadModel;
40 import eu.fbk.rdfpro.util.Statements;
41 import eu.fbk.rdfpro.util.Tracker;
42
43 final class ProcessorRules implements RDFProcessor {
44
45 private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorRules.class);
46
47 private final RuleEngine engine;
48
49 @Nullable
50 private final Mapper mapper;
51
52 @Nullable
53 private final QuadModel tboxClosure;
54
55 private final boolean dropBNodeTypes;
56
57 private final boolean deduplicate;
58
59 static RDFProcessor create(final String name, final String... args) throws IOException,
60 RDFHandlerException {
61
62
63 final Options options = Options.parse("r!|B!|p!|g!|t|C|c!|b!|w|u|*", args);
64
65
66 final boolean preserveBNodes = !options.hasOption("w");
67 String base = options.getOptionArg("b", String.class);
68 base = base == null ? null : Statements.parseValue(base.contains(":") ? base : base + ":",
69 Namespaces.DEFAULT).stringValue();
70
71
72 final String parameters = options.getOptionArg("B", String.class, "");
73 final MapBindingSet bindings = new MapBindingSet();
74 for (final String token : parameters.split("\\s+")) {
75 final int index = token.indexOf('=');
76 if (index >= 0) {
77 bindings.addBinding(token.substring(0, index).trim(), Statements.parseValue(token
78 .substring(index + 1).trim(), Namespaces.DEFAULT));
79 }
80 }
81
82
83 final List<String> rdfRulesetURLs = new ArrayList<>();
84 final List<String> dlogRulesetURLs = new ArrayList<>();
85 final String rulesetNames = options.getOptionArg("r", String.class);
86 for (final String rulesetName : rulesetNames.split(",")) {
87 String location = Environment.getProperty("rdfpro.rules." + rulesetName);
88 location = location != null ? location : rulesetName;
89 final String url = IO.extractURL(location).toString();
90 (url.endsWith(".dlog") ? dlogRulesetURLs : rdfRulesetURLs).add(url);
91 }
92 Ruleset ruleset = null;
93 if (!rdfRulesetURLs.isEmpty()) {
94 final RDFSource rulesetSource = RDFSources.read(true, preserveBNodes, base, null,
95 rdfRulesetURLs.toArray(new String[rdfRulesetURLs.size()]));
96 try {
97 ruleset = Ruleset.fromRDF(rulesetSource);
98 } catch (final Throwable ex) {
99 LOGGER.error("Invalid ruleset", ex);
100 throw ex;
101 }
102 }
103 if (!dlogRulesetURLs.isEmpty()) {
104 final List<Ruleset> rulesets = new ArrayList<>();
105 if (ruleset != null) {
106 rulesets.add(ruleset);
107 }
108 for (final String dlogRulesetURL : dlogRulesetURLs) {
109 try (Reader dlogReader = IO.utf8Reader(IO.read(dlogRulesetURL))) {
110 final List<Rule> rules = Rule.fromDLOG(dlogReader);
111 rulesets.add(new Ruleset(rules, null));
112 }
113 }
114 ruleset = Ruleset.merge(rulesets.toArray(new Ruleset[rulesets.size()]));
115 }
116
117
118 ruleset = ruleset.rewriteVariables(bindings);
119 URI globalURI = null;
120 if (options.hasOption("G")) {
121 final String u = options.getOptionArg("G", String.class);
122 globalURI = (URI) Statements.parseValue(u.contains(":") ? u
123 : u + ":", Namespaces.DEFAULT);
124 }
125 final String mode = options.getOptionArg("g", String.class, "none").trim();
126 if ("global".equalsIgnoreCase(mode)) {
127 ruleset = ruleset.rewriteGlobalGM(globalURI);
128 } else if ("separate".equalsIgnoreCase(mode)) {
129 ruleset = ruleset.rewriteSeparateGM();
130 } else if ("star".equalsIgnoreCase(mode)) {
131 ruleset = ruleset.rewriteStarGM(globalURI);
132 } else if (!"none".equalsIgnoreCase(mode)) {
133 throw new IllegalArgumentException("Unknown graph inference mode: " + mode);
134 }
135
136
137 boolean emitTBox = false;
138 URI tboxContext = null;
139 if (options.hasOption("C")) {
140 emitTBox = true;
141 } else if (options.hasOption("c")) {
142 emitTBox = true;
143 final String ctx = options.getOptionArg("c", String.class);
144 tboxContext = (URI) Statements.parseValue(ctx.contains(":") ? ctx
145 : ctx + ":", Namespaces.DEFAULT);
146 }
147
148
149 final boolean dropBNodeTypes = options.hasOption("t");
150
151
152 Mapper mapper = null;
153 final String partitioning = options.getOptionArg("p", String.class, "none").trim();
154 if ("entity".equalsIgnoreCase(partitioning)) {
155 mapper = Mapper.concat(Mapper.select("s"), Mapper.select("o"));
156 } else if ("graph".equalsIgnoreCase(partitioning)) {
157 mapper = Mapper.select("c");
158 } else if ("rules".equalsIgnoreCase(partitioning)) {
159 throw new UnsupportedOperationException("Rule-based partitioning not yet implemented");
160 } else if (!"none".equals(partitioning)) {
161 throw new IllegalArgumentException("Unknown partitioning scheme: " + partitioning);
162 }
163
164
165 final String[] tboxSpecs = options.getPositionalArgs(String.class).toArray(new String[0]);
166 final RDFSource tboxData = tboxSpecs.length == 0 ? null : RDFProcessors.track(
167 new Tracker(LOGGER, null, "%d TBox triples read (%d tr/s avg)",
168 "%d TBox triples read (%d tr/s, %d tr/s avg)")).wrap(
169 RDFSources.read(true, preserveBNodes, base, null, tboxSpecs));
170
171
172 final boolean deduplicate = options.hasOption("u");
173
174
175 return new ProcessorRules(ruleset, mapper, dropBNodeTypes, deduplicate, tboxData,
176 emitTBox, tboxContext);
177 }
178
179 public ProcessorRules(final Ruleset ruleset, @Nullable final Mapper mapper,
180 final boolean dropBNodeTypes, final boolean deduplicate) {
181 this(ruleset, mapper, dropBNodeTypes, deduplicate, null, false, null);
182 }
183
184 public ProcessorRules(final Ruleset ruleset, @Nullable final Mapper mapper,
185 final boolean dropBNodeTypes, final boolean deduplicate,
186 @Nullable final RDFSource tboxData, final boolean emitTBox,
187 @Nullable final URI tboxContext) {
188
189
190 LOGGER.debug("Processing {} rules {} TBox data", ruleset.getRules().size(),
191 tboxData == null ? "without" : "with");
192 final long ts = System.currentTimeMillis();
193 Ruleset processedRuleset = ruleset.mergeSameWhereExpr();
194 RuleEngine engine = RuleEngine.create(processedRuleset);
195 QuadModel tboxClosure = null;
196 if (tboxData != null) {
197 tboxClosure = QuadModel.create();
198 try {
199 tboxData.emit(RDFHandlers.synchronize(RDFHandlers.wrap(tboxClosure)), 1);
200 } catch (final RDFHandlerException ex) {
201 throw new RuntimeException(ex);
202 }
203 engine.eval(tboxClosure);
204 processedRuleset = processedRuleset.getABoxRuleset(tboxClosure).mergeSameWhereExpr();
205 engine = RuleEngine.create(processedRuleset);
206 if (!emitTBox) {
207 tboxClosure = null;
208 } else if (tboxContext != null) {
209 final URI ctx = tboxContext.equals(SESAME.NIL) ? null : tboxContext;
210 final List<Statement> stmts = new ArrayList<>(tboxClosure);
211 tboxClosure.clear();
212 for (final Statement stmt : stmts) {
213 tboxClosure.add(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), ctx);
214 }
215 }
216 }
217 LOGGER.info("{} initialized with {} ABox rules (from {} rules) in {} ms", engine,
218 processedRuleset.getRules().size(), ruleset.getRules().size(),
219 System.currentTimeMillis() - ts);
220
221
222 this.engine = engine;
223 this.mapper = mapper;
224 this.tboxClosure = tboxClosure;
225 this.dropBNodeTypes = dropBNodeTypes;
226 this.deduplicate = deduplicate;
227 }
228
229 @Override
230 public RDFHandler wrap(final RDFHandler handler) {
231
232
233 RDFHandler result = handler;
234
235
236 if (this.dropBNodeTypes) {
237 result = new AbstractRDFHandlerWrapper(result) {
238
239 @Override
240 public void handleStatement(final Statement stmt) throws RDFHandlerException {
241 if (!RDF.TYPE.equals(stmt.getPredicate())
242 || !(stmt.getObject() instanceof BNode)) {
243 super.handleStatement(stmt);
244 }
245 }
246
247 };
248 }
249
250
251 if (this.tboxClosure != null) {
252 result = RDFProcessors.inject(RDFSources.wrap(this.tboxClosure)).wrap(result);
253 }
254
255
256 result = RDFHandlers.decouple(result);
257
258
259 if (this.mapper == null) {
260
261
262 result = this.engine.eval(result, this.deduplicate);
263
264 } else {
265
266
267 result = RDFProcessors.mapReduce(this.mapper, new Reducer() {
268
269 @Override
270 public void reduce(final Value key, final Statement[] stmts,
271 final RDFHandler handler) throws RDFHandlerException {
272 final RDFHandler session = ProcessorRules.this.engine.eval(
273 RDFHandlers.ignoreMethods(handler, RDFHandlers.METHOD_START_RDF
274 | RDFHandlers.METHOD_END_RDF | RDFHandlers.METHOD_CLOSE),
275 ProcessorRules.this.deduplicate);
276 try {
277 session.startRDF();
278 for (final Statement stmt : stmts) {
279 session.handleStatement(stmt);
280 }
281 session.endRDF();
282 } finally {
283 IO.closeQuietly(session);
284 }
285 }
286
287 }, true).wrap(result);
288
289 }
290
291
292 return result;
293 }
294
295 }