1 /*
2 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3 *
4 * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5 * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6 *
7 * To the extent possible under law, the authors have dedicated all copyright and related and
8 * neighboring rights to this software to the public domain worldwide. This software is
9 * distributed without any warranty.
10 *
11 * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package eu.fbk.rdfpro;
15
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.openrdf.model.BNode;
import org.openrdf.model.Literal;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.rio.RDFHandlerException;

import eu.fbk.rdfpro.util.Hash;
import eu.fbk.rdfpro.util.Scripting;
import eu.fbk.rdfpro.util.Statements;
34
35 /**
36 * Mapper function in a MapReduce job.
37 * <p>
38 * A {@code Mapper} object is used in a MapReduce job (see
39 * {@link RDFProcessors#mapReduce(Mapper, Reducer, boolean)}) for mapping a statement to zero or
40 * more {@code Value keys}; statements are then grouped by key and each partition is processed in
41 * a reduce job (see {@link Reducer}) to produce the final output statements. Mapping is done by
42 * method {@link #map(Statement)}. It can return zero keys to drop the statement, or a key array
43 * including {@link #BYPASS_KEY} to force the statement to be directly emitted in output,
44 * bypassing the reduce stage.
45 * </p>
46 * <p>
47 * A common type of {@code Mapper} is the one extracting a component or hashing a subset of
48 * components of the input statement. This mapper is already implemented and can be instantiated
49 * using the factory method {@link #select(String)}.
50 * </p>
51 * <p>
52 * Implementations of this interface should be thread-safe, as method {@code map()} is meant to be
53 * invoked concurrently by different threads on different statements.
54 * </p>
55 */
56 @FunctionalInterface
57 public interface Mapper {
58
59 /** Special key used to bypass the reduce stage and directly emit the statement in output. */
60 URI BYPASS_KEY = new URIImpl("rdfpro:bypass");
61
62 /**
63 * Maps a statement to zero or more {@code Value} keys. When used in a MapReduce job,
64 * returning zero keys has the effect of dropping the statements, otherwise the statement will
65 * be put in each key partition, each one being later processed in a reduce job (see
66 * {@link Reducer}); if one of the keys is {@link #BYPASS_KEY}, then the statement will be
67 * (also) directly emitted in output, bypassing the reduce stage (this can be used to apply
68 * the MapReduce paradigm to only a subset of statements). Null keys are admissible in output
69 * (e.g., to denote the default context).
70 *
71 * @param statement
72 * the statement to map, not null
73 * @return an array with zero or more {@code Value} keys associated to the statement, not null
74 * @throws RDFHandlerException
75 * on error
76 */
77 Value[] map(Statement statement) throws RDFHandlerException;
78
79 /**
80 * Returns a bypassed version of the input mapper that skips and marks with
81 * {@link #BYPASS_KEY} quads matching the specified predicate.
82 *
83 * @param mapper
84 * the mapper to filter
85 * @param predicate
86 * the predicate; if null, no bypassing is performed
87 * @return the resulting mapper
88 */
89 public static Mapper bypass(final Mapper mapper, @Nullable final Predicate<Statement> predicate) {
90 if (predicate != null) {
91 final Value[] bypass = new Value[] { BYPASS_KEY };
92 return new Mapper() {
93
94 @Override
95 public Value[] map(final Statement statement) throws RDFHandlerException {
96 if (predicate.test(statement)) {
97 return bypass;
98 } else {
99 return mapper.map(statement);
100 }
101 }
102 };
103
104 } else {
105 return mapper;
106 }
107 }
108
109 /**
110 * Returns a filtered version of the input mapper that only maps quads matching the supplied
111 * predicate.
112 *
113 * @param mapper
114 * the mapper to filter
115 * @param predicate
116 * the predicate; if null, no filtering is performed
117 * @return the resulting mapper
118 */
119 public static Mapper filter(final Mapper mapper, @Nullable final Predicate<Statement> predicate) {
120 if (predicate != null) {
121 final Value[] empty = new Value[0];
122 return new Mapper() {
123
124 @Override
125 public Value[] map(final Statement statement) throws RDFHandlerException {
126 if (predicate.test(statement)) {
127 return mapper.map(statement);
128 } else {
129 return empty;
130 }
131 }
132 };
133 } else {
134 return mapper;
135 }
136 }
137
138 /**
139 * Returns a {@code Mapper} returning a concatenation of all the keys produced by the
140 * {@code Mapper}s supplied for the input statement. Duplicate keys for the same statement are
141 * merged.
142 *
143 * @param mappers
144 * the mappers whose output has to be concatenated
145 * @return the created {@code Mapper}
146 */
147 public static Mapper concat(final Mapper... mappers) {
148 return new Mapper() {
149
150 @Override
151 public Value[] map(final Statement statement) throws RDFHandlerException {
152 final List<Value> keys = new ArrayList<>(mappers.length);
153 for (int i = 0; i < mappers.length; ++i) {
154 for (final Value key : mappers[i].map(statement)) {
155 if (!keys.contains(key)) {
156 keys.add(key);
157 }
158 }
159 }
160 return keys.toArray(new Value[keys.size()]);
161 }
162
163 };
164 }
165
166 /**
167 * Returns a {@code Mapper} returning a single key based on one or more selected components of
168 * the input statement. Parameter {@code components} is a {@code s} , {@code p}, {@code o},
169 * {@code c} string specifying which components should be selected. If a single component is
170 * selected it is returned as the key unchanged. If multiple components are selected, they are
171 * merged and hashed to produce the returned key. In any case, exactly one key is returned for
172 * each input statement.
173 *
174 * @param components
175 * a string of symbols {@code s} , {@code p}, {@code o}, {@code c} specifying which
176 * components to select, not null nor empty
177 * @return the created {@code Mapper}
178 */
179 public static Mapper select(final String components) {
180
181 final String comp = components.trim().toLowerCase();
182
183 if (comp.equals("e")) {
184 return new Mapper() {
185
186 @Override
187 public Value[] map(final Statement statement) throws RDFHandlerException {
188 if (statement.getObject() instanceof Resource) {
189 return new Value[] { statement.getSubject(), statement.getObject() };
190 } else {
191 return new Value[] { statement.getSubject() };
192 }
193 }
194
195 };
196 }
197
198 int num = 0;
199 for (int i = 0; i < comp.length(); ++i) {
200 final char c = comp.charAt(i);
201 final int b = c == 's' ? 0x8 : c == 'p' ? 0x4 : c == 'o' ? 0x2 : c == 'c' ? 0x1 : -1;
202 if (b < 0 || (num & b) != 0) {
203 throw new IllegalArgumentException("Invalid components '" + components + "'");
204 }
205 num = num | b;
206 }
207 final int mask = num;
208
209 if (mask == 0x08 || mask == 0x04 || mask == 0x02 || mask == 0x01) {
210 return new Mapper() {
211
212 @Override
213 public Value[] map(final Statement statement) throws RDFHandlerException {
214 switch (mask) {
215 case 0x08:
216 return new Value[] { statement.getSubject() };
217 case 0x04:
218 return new Value[] { statement.getPredicate() };
219 case 0x02:
220 return new Value[] { statement.getObject() };
221 case 0x01:
222 return new Value[] { statement.getContext() };
223 default:
224 throw new Error();
225 }
226 }
227
228 };
229 }
230
231 return new Mapper() {
232
233 private final boolean hasSubj = (mask & 0x80) != 0;
234
235 private final boolean hasPred = (mask & 0x40) != 0;
236
237 private final boolean hasObj = (mask & 0x20) != 0;
238
239 private final boolean hasCtx = (mask & 0x10) != 0;
240
241 @Override
242 public Value[] map(final Statement statement) throws RDFHandlerException {
243
244 int header = 0;
245 int count = 0;
246
247 if (hasSubj) {
248 final int bits = classify(statement.getSubject());
249 header |= bits << 24;
250 count += bits & 0xF;
251 }
252 if (hasPred) {
253 final int bits = classify(statement.getPredicate());
254 header |= bits << 16;
255 count += bits & 0xF;
256 }
257 if (hasObj) {
258 final int bits = classify(statement.getObject());
259 header |= bits << 8;
260 count += bits & 0xF;
261 }
262 if (hasCtx) {
263 final int bits = classify(statement.getContext());
264 header |= bits;
265 count += bits & 0xF;
266 }
267
268 final String[] strings = new String[count];
269 int index = 0;
270 strings[index++] = Integer.toString(header);
271 if (hasSubj) {
272 index = add(strings, index, statement.getSubject());
273 }
274 if (hasPred) {
275 index = add(strings, index, statement.getPredicate());
276 }
277 if (hasObj) {
278 index = add(strings, index, statement.getObject());
279 }
280 if (hasCtx) {
281 index = add(strings, index, statement.getContext());
282 }
283
284 final String hash = Hash.murmur3(strings).toString();
285 return new Value[] { Statements.VALUE_FACTORY.createBNode(hash) };
286 }
287
288 private int classify(final Value value) {
289 if (value == null) {
290 return 0;
291 } else if (value instanceof BNode) {
292 return 0x11;
293 } else if (value instanceof URI) {
294 return 0x21;
295 }
296 final Literal l = (Literal) value;
297 if (l.getLanguage() != null) {
298 return 0x52;
299 } else if (l.getDatatype() != null) {
300 return 0x42;
301 } else {
302 return 0x31;
303 }
304 }
305
306 private int add(final String[] strings, int index, final Value value) {
307 if (value instanceof URI || value instanceof BNode) {
308 strings[index++] = value.stringValue();
309 } else if (value instanceof Literal) {
310 final Literal l = (Literal) value;
311 strings[index++] = l.getLabel();
312 if (l.getLanguage() != null) {
313 strings[index++] = l.getLanguage();
314 } else if (l.getDatatype() != null) {
315 strings[index++] = l.getDatatype().stringValue();
316 }
317 }
318 return index;
319 }
320
321 };
322 }
323
324 /**
325 * Parses a {@code Mapper} out of the supplied expression string. The expression can be a
326 * {@code language: expression} script or a component expression supported by
327 * {@link #select(String)}.
328 *
329 * @param expression
330 * the expression to parse
331 * @return the parsed mapper, or null if a null expression was supplied
332 */
333 @Nullable
334 static Mapper parse(@Nullable final String expression) {
335 if (expression == null) {
336 return null;
337 } else if (Scripting.isScript(expression)) {
338 return Scripting.compile(Mapper.class, expression, "q");
339 } else {
340 return select(expression);
341 }
342 }
343
344 }