1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.util.ArrayList;
17  import java.util.List;
18  import java.util.function.Predicate;
19  
20  import javax.annotation.Nullable;
21  
22  import org.openrdf.model.BNode;
23  import org.openrdf.model.Literal;
24  import org.openrdf.model.Resource;
25  import org.openrdf.model.Statement;
26  import org.openrdf.model.URI;
27  import org.openrdf.model.Value;
28  import org.openrdf.model.impl.URIImpl;
29  import org.openrdf.rio.RDFHandlerException;
30  
31  import eu.fbk.rdfpro.util.Hash;
32  import eu.fbk.rdfpro.util.Scripting;
33  import eu.fbk.rdfpro.util.Statements;
34  
35  /**
36   * Mapper function in a MapReduce job.
37   * <p>
38   * A {@code Mapper} object is used in a MapReduce job (see
39   * {@link RDFProcessors#mapReduce(Mapper, Reducer, boolean)}) for mapping a statement to zero or
40   * more {@code Value keys}; statements are then grouped by key and each partition is processed in
41   * a reduce job (see {@link Reducer}) to produce the final output statements. Mapping is done by
42   * method {@link #map(Statement)}. It can return zero keys to drop the statement, or a key array
43   * including {@link #BYPASS_KEY} to force the statement to be directly emitted in output,
44   * bypassing the reduce stage.
45   * </p>
46   * <p>
47   * A common type of {@code Mapper} is the one extracting a component or hashing a subset of
48   * components of the input statement. This mapper is already implemented and can be instantiated
49   * using the factory method {@link #select(String)}.
50   * </p>
51   * <p>
52   * Implementations of this interface should be thread-safe, as method {@code map()} is meant to be
53   * invoked concurrently by different threads on different statements.
54   * </p>
55   */
56  @FunctionalInterface
57  public interface Mapper {
58  
59      /** Special key used to bypass the reduce stage and directly emit the statement in output. */
60      URI BYPASS_KEY = new URIImpl("rdfpro:bypass");
61  
62      /**
63       * Maps a statement to zero or more {@code Value} keys. When used in a MapReduce job,
64       * returning zero keys has the effect of dropping the statements, otherwise the statement will
65       * be put in each key partition, each one being later processed in a reduce job (see
66       * {@link Reducer}); if one of the keys is {@link #BYPASS_KEY}, then the statement will be
67       * (also) directly emitted in output, bypassing the reduce stage (this can be used to apply
68       * the MapReduce paradigm to only a subset of statements). Null keys are admissible in output
69       * (e.g., to denote the default context).
70       *
71       * @param statement
72       *            the statement to map, not null
73       * @return an array with zero or more {@code Value} keys associated to the statement, not null
74       * @throws RDFHandlerException
75       *             on error
76       */
77      Value[] map(Statement statement) throws RDFHandlerException;
78  
79      /**
80       * Returns a bypassed version of the input mapper that skips and marks with
81       * {@link #BYPASS_KEY} quads matching the specified predicate.
82       *
83       * @param mapper
84       *            the mapper to filter
85       * @param predicate
86       *            the predicate; if null, no bypassing is performed
87       * @return the resulting mapper
88       */
89      public static Mapper bypass(final Mapper mapper, @Nullable final Predicate<Statement> predicate) {
90          if (predicate != null) {
91              final Value[] bypass = new Value[] { BYPASS_KEY };
92              return new Mapper() {
93  
94                  @Override
95                  public Value[] map(final Statement statement) throws RDFHandlerException {
96                      if (predicate.test(statement)) {
97                          return bypass;
98                      } else {
99                          return mapper.map(statement);
100                     }
101                 }
102             };
103 
104         } else {
105             return mapper;
106         }
107     }
108 
109     /**
110      * Returns a filtered version of the input mapper that only maps quads matching the supplied
111      * predicate.
112      *
113      * @param mapper
114      *            the mapper to filter
115      * @param predicate
116      *            the predicate; if null, no filtering is performed
117      * @return the resulting mapper
118      */
119     public static Mapper filter(final Mapper mapper, @Nullable final Predicate<Statement> predicate) {
120         if (predicate != null) {
121             final Value[] empty = new Value[0];
122             return new Mapper() {
123 
124                 @Override
125                 public Value[] map(final Statement statement) throws RDFHandlerException {
126                     if (predicate.test(statement)) {
127                         return mapper.map(statement);
128                     } else {
129                         return empty;
130                     }
131                 }
132             };
133         } else {
134             return mapper;
135         }
136     }
137 
138     /**
139      * Returns a {@code Mapper} returning a concatenation of all the keys produced by the
140      * {@code Mapper}s supplied for the input statement. Duplicate keys for the same statement are
141      * merged.
142      *
143      * @param mappers
144      *            the mappers whose output has to be concatenated
145      * @return the created {@code Mapper}
146      */
147     public static Mapper concat(final Mapper... mappers) {
148         return new Mapper() {
149 
150             @Override
151             public Value[] map(final Statement statement) throws RDFHandlerException {
152                 final List<Value> keys = new ArrayList<>(mappers.length);
153                 for (int i = 0; i < mappers.length; ++i) {
154                     for (final Value key : mappers[i].map(statement)) {
155                         if (!keys.contains(key)) {
156                             keys.add(key);
157                         }
158                     }
159                 }
160                 return keys.toArray(new Value[keys.size()]);
161             }
162 
163         };
164     }
165 
166     /**
167      * Returns a {@code Mapper} returning a single key based on one or more selected components of
168      * the input statement. Parameter {@code components} is a {@code s} , {@code p}, {@code o},
169      * {@code c} string specifying which components should be selected. If a single component is
170      * selected it is returned as the key unchanged. If multiple components are selected, they are
171      * merged and hashed to produce the returned key. In any case, exactly one key is returned for
172      * each input statement.
173      *
174      * @param components
175      *            a string of symbols {@code s} , {@code p}, {@code o}, {@code c} specifying which
176      *            components to select, not null nor empty
177      * @return the created {@code Mapper}
178      */
179     public static Mapper select(final String components) {
180 
181         final String comp = components.trim().toLowerCase();
182 
183         if (comp.equals("e")) {
184             return new Mapper() {
185 
186                 @Override
187                 public Value[] map(final Statement statement) throws RDFHandlerException {
188                     if (statement.getObject() instanceof Resource) {
189                         return new Value[] { statement.getSubject(), statement.getObject() };
190                     } else {
191                         return new Value[] { statement.getSubject() };
192                     }
193                 }
194 
195             };
196         }
197 
198         int num = 0;
199         for (int i = 0; i < comp.length(); ++i) {
200             final char c = comp.charAt(i);
201             final int b = c == 's' ? 0x8 : c == 'p' ? 0x4 : c == 'o' ? 0x2 : c == 'c' ? 0x1 : -1;
202             if (b < 0 || (num & b) != 0) {
203                 throw new IllegalArgumentException("Invalid components '" + components + "'");
204             }
205             num = num | b;
206         }
207         final int mask = num;
208 
209         if (mask == 0x08 || mask == 0x04 || mask == 0x02 || mask == 0x01) {
210             return new Mapper() {
211 
212                 @Override
213                 public Value[] map(final Statement statement) throws RDFHandlerException {
214                     switch (mask) {
215                     case 0x08:
216                         return new Value[] { statement.getSubject() };
217                     case 0x04:
218                         return new Value[] { statement.getPredicate() };
219                     case 0x02:
220                         return new Value[] { statement.getObject() };
221                     case 0x01:
222                         return new Value[] { statement.getContext() };
223                     default:
224                         throw new Error();
225                     }
226                 }
227 
228             };
229         }
230 
231         return new Mapper() {
232 
233             private final boolean hasSubj = (mask & 0x80) != 0;
234 
235             private final boolean hasPred = (mask & 0x40) != 0;
236 
237             private final boolean hasObj = (mask & 0x20) != 0;
238 
239             private final boolean hasCtx = (mask & 0x10) != 0;
240 
241             @Override
242             public Value[] map(final Statement statement) throws RDFHandlerException {
243 
244                 int header = 0;
245                 int count = 0;
246 
247                 if (hasSubj) {
248                     final int bits = classify(statement.getSubject());
249                     header |= bits << 24;
250                     count += bits & 0xF;
251                 }
252                 if (hasPred) {
253                     final int bits = classify(statement.getPredicate());
254                     header |= bits << 16;
255                     count += bits & 0xF;
256                 }
257                 if (hasObj) {
258                     final int bits = classify(statement.getObject());
259                     header |= bits << 8;
260                     count += bits & 0xF;
261                 }
262                 if (hasCtx) {
263                     final int bits = classify(statement.getContext());
264                     header |= bits;
265                     count += bits & 0xF;
266                 }
267 
268                 final String[] strings = new String[count];
269                 int index = 0;
270                 strings[index++] = Integer.toString(header);
271                 if (hasSubj) {
272                     index = add(strings, index, statement.getSubject());
273                 }
274                 if (hasPred) {
275                     index = add(strings, index, statement.getPredicate());
276                 }
277                 if (hasObj) {
278                     index = add(strings, index, statement.getObject());
279                 }
280                 if (hasCtx) {
281                     index = add(strings, index, statement.getContext());
282                 }
283 
284                 final String hash = Hash.murmur3(strings).toString();
285                 return new Value[] { Statements.VALUE_FACTORY.createBNode(hash) };
286             }
287 
288             private int classify(final Value value) {
289                 if (value == null) {
290                     return 0;
291                 } else if (value instanceof BNode) {
292                     return 0x11;
293                 } else if (value instanceof URI) {
294                     return 0x21;
295                 }
296                 final Literal l = (Literal) value;
297                 if (l.getLanguage() != null) {
298                     return 0x52;
299                 } else if (l.getDatatype() != null) {
300                     return 0x42;
301                 } else {
302                     return 0x31;
303                 }
304             }
305 
306             private int add(final String[] strings, int index, final Value value) {
307                 if (value instanceof URI || value instanceof BNode) {
308                     strings[index++] = value.stringValue();
309                 } else if (value instanceof Literal) {
310                     final Literal l = (Literal) value;
311                     strings[index++] = l.getLabel();
312                     if (l.getLanguage() != null) {
313                         strings[index++] = l.getLanguage();
314                     } else if (l.getDatatype() != null) {
315                         strings[index++] = l.getDatatype().stringValue();
316                     }
317                 }
318                 return index;
319             }
320 
321         };
322     }
323 
324     /**
325      * Parses a {@code Mapper} out of the supplied expression string. The expression can be a
326      * {@code language: expression} script or a component expression supported by
327      * {@link #select(String)}.
328      *
329      * @param expression
330      *            the expression to parse
331      * @return the parsed mapper, or null if a null expression was supplied
332      */
333     @Nullable
334     static Mapper parse(@Nullable final String expression) {
335         if (expression == null) {
336             return null;
337         } else if (Scripting.isScript(expression)) {
338             return Scripting.compile(Mapper.class, expression, "q");
339         } else {
340             return select(expression);
341         }
342     }
343 
344 }