1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.util.function.Predicate;
17  
18  import javax.annotation.Nullable;
19  
20  import org.openrdf.model.Statement;
21  import org.openrdf.model.Value;
22  import org.openrdf.rio.RDFHandler;
23  import org.openrdf.rio.RDFHandlerException;
24  
25  import eu.fbk.rdfpro.util.Scripting;
26  
27  /**
28   * Reduce function in a MapReduce job.
29   * <p>
30   * A {@code Reducer} object is used in a MapReduce job (see
31   * {@link RDFProcessors#mapReduce(Mapper, Reducer, boolean)}) to process a partition of statements
32   * associated to a certain {@code Value} key produced by a {@link Mapper} in a previous map phase
33   * (e.g., all the statements having a specific subject).
34   * </p>
35   * <p>
36   * Implementations of this interface should be thread-safe, as multiple reduce jobs can be fired
37   * in parallel with method {@code reduce()} being invoked concurrently by different threads on
38   * different statement partitions.
39   * </p>
40   */
41  @FunctionalInterface
42  public interface Reducer {
43  
44      /**
45       * The identity reducer that emits all the quads of a partition unchanged.
46       */
47      Reducer IDENTITY = new Reducer() {
48  
49          @Override
50          public void reduce(final Value key, final Statement[] statements, final RDFHandler handler)
51                  throws RDFHandlerException {
52              for (final Statement statement : statements) {
53                  handler.handleStatement(statement);
54              }
55          }
56  
57      };
58  
59      /**
60       * Returns a filtered version of the input reducer that operates only on partitions satisfying
61       * the existential and forall predicates supplied.
62       *
63       * @param reducer
64       *            the reducer to filter
65       * @param existsPredicate
66       *            the exists predicate, that must be satisfied by at least a partition quad in
67       *            order for the partition to be processed; if null no existential filtering is
68       *            done
69       * @param forallPredicate
70       *            the forall predicate, that must be satisfied by all the quads of a partition in
71       *            order for it to be processed; if null, no forall filtering is applied
72       * @return the resulting filtered reducer
73       */
74      static Reducer filter(final Reducer reducer,
75              @Nullable final Predicate<Statement> existsPredicate,
76              @Nullable final Predicate<Statement> forallPredicate) {
77  
78          if (existsPredicate != null) {
79              if (forallPredicate != null) {
80                  return new Reducer() {
81  
82                      @Override
83                      public void reduce(final Value key, final Statement[] statements,
84                              final RDFHandler handler) throws RDFHandlerException {
85                          boolean exists = false;
86                          for (final Statement statement : statements) {
87                              if (!forallPredicate.test(statement)) {
88                                  return;
89                              }
90                              exists = exists || existsPredicate.test(statement);
91                          }
92                          if (exists) {
93                              reducer.reduce(key, statements, handler);
94                          }
95                      }
96  
97                  };
98              } else {
99                  return new Reducer() {
100 
101                     @Override
102                     public void reduce(final Value key, final Statement[] statements,
103                             final RDFHandler handler) throws RDFHandlerException {
104                         for (final Statement statement : statements) {
105                             if (existsPredicate.test(statement)) {
106                                 reducer.reduce(key, statements, handler);
107                                 return;
108                             }
109                         }
110                     }
111 
112                 };
113             }
114         } else {
115             if (forallPredicate != null) {
116                 return new Reducer() {
117 
118                     @Override
119                     public void reduce(final Value key, final Statement[] statements,
120                             final RDFHandler handler) throws RDFHandlerException {
121                         for (final Statement statement : statements) {
122                             if (!forallPredicate.test(statement)) {
123                                 return;
124                             }
125                         }
126                         reducer.reduce(key, statements, handler);
127                     }
128 
129                 };
130             } else {
131                 return reducer;
132             }
133         }
134     }
135 
136     /**
137      * Returns a {@code Reducer} that emits the output of multiple reductions on the same quad
138      * partition.
139      *
140      * @param reducers
141      *            the reducers whose output has to be concatenated
142      * @return the created {@code Reducer}
143      */
144     static Reducer concat(final Reducer... reducers) {
145         return new Reducer() {
146 
147             @Override
148             public void reduce(final Value key, final Statement[] statements,
149                     final RDFHandler handler) throws RDFHandlerException {
150                 for (final Reducer reducer : reducers) {
151                     reducer.reduce(key, statements, handler);
152                 }
153             }
154 
155         };
156     }
157 
158     /**
159      * Parses a {@code Reducer} out of the supplied expression string. The expression must be a
160      * {@code language: expression} script.
161      *
162      * @param expression
163      *            the expression to parse
164      * @return the parsed reducer, or null if a null expression was supplied
165      */
166     @Nullable
167     static Mapper parse(@Nullable final String expression) {
168         return expression == null ? null //
169                 : Scripting.compile(Mapper.class, expression, "k", "p", "h");
170     }
171 
172     /**
173      * Processes the statement partition associated to a certain key, emitting output statements
174      * to the supplied {@code RDFHandler}.
175      *
176      * @param key
177      *            the partition key, possibly null
178      * @param statements
179      *            a modifiable array with the statements belonging to the partition, not null
180      * @param handler
181      *            the {@code RDFHandler} where to emit output statements, not null
182      * @throws RDFHandlerException
183      *             on error
184      */
185     void reduce(@Nullable Value key, Statement[] statements, RDFHandler handler)
186             throws RDFHandlerException;
187 
188 }