1 /*
2 * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3 *
4 * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5 * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6 *
7 * To the extent possible under law, the authors have dedicated all copyright and related and
8 * neighboring rights to this software to the public domain worldwide. This software is
9 * distributed without any warranty.
10 *
11 * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12 * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package eu.fbk.rdfpro;
15
16 import java.util.function.Predicate;
17
18 import javax.annotation.Nullable;
19
20 import org.openrdf.model.Statement;
21 import org.openrdf.model.Value;
22 import org.openrdf.rio.RDFHandler;
23 import org.openrdf.rio.RDFHandlerException;
24
25 import eu.fbk.rdfpro.util.Scripting;
26
27 /**
28 * Reduce function in a MapReduce job.
29 * <p>
30 * A {@code Reducer} object is used in a MapReduce job (see
31 * {@link RDFProcessors#mapReduce(Mapper, Reducer, boolean)}) to process a partition of statements
32 * associated to a certain {@code Value} key produced by a {@link Mapper} in a previous map phase
33 * (e.g., all the statements having a specific subject).
34 * </p>
35 * <p>
36 * Implementations of this interface should be thread-safe, as multiple reduce jobs can be fired
37 * in parallel with method {@code reduce()} being invoked concurrently by different threads on
38 * different statement partitions.
39 * </p>
40 */
41 @FunctionalInterface
42 public interface Reducer {
43
44 /**
45 * The identity reducer that emits all the quads of a partition unchanged.
46 */
47 Reducer IDENTITY = new Reducer() {
48
49 @Override
50 public void reduce(final Value key, final Statement[] statements, final RDFHandler handler)
51 throws RDFHandlerException {
52 for (final Statement statement : statements) {
53 handler.handleStatement(statement);
54 }
55 }
56
57 };
58
59 /**
60 * Returns a filtered version of the input reducer that operates only on partitions satisfying
61 * the existential and forall predicates supplied.
62 *
63 * @param reducer
64 * the reducer to filter
65 * @param existsPredicate
66 * the exists predicate, that must be satisfied by at least a partition quad in
67 * order for the partition to be processed; if null no existential filtering is
68 * done
69 * @param forallPredicate
70 * the forall predicate, that must be satisfied by all the quads of a partition in
71 * order for it to be processed; if null, no forall filtering is applied
72 * @return the resulting filtered reducer
73 */
74 static Reducer filter(final Reducer reducer,
75 @Nullable final Predicate<Statement> existsPredicate,
76 @Nullable final Predicate<Statement> forallPredicate) {
77
78 if (existsPredicate != null) {
79 if (forallPredicate != null) {
80 return new Reducer() {
81
82 @Override
83 public void reduce(final Value key, final Statement[] statements,
84 final RDFHandler handler) throws RDFHandlerException {
85 boolean exists = false;
86 for (final Statement statement : statements) {
87 if (!forallPredicate.test(statement)) {
88 return;
89 }
90 exists = exists || existsPredicate.test(statement);
91 }
92 if (exists) {
93 reducer.reduce(key, statements, handler);
94 }
95 }
96
97 };
98 } else {
99 return new Reducer() {
100
101 @Override
102 public void reduce(final Value key, final Statement[] statements,
103 final RDFHandler handler) throws RDFHandlerException {
104 for (final Statement statement : statements) {
105 if (existsPredicate.test(statement)) {
106 reducer.reduce(key, statements, handler);
107 return;
108 }
109 }
110 }
111
112 };
113 }
114 } else {
115 if (forallPredicate != null) {
116 return new Reducer() {
117
118 @Override
119 public void reduce(final Value key, final Statement[] statements,
120 final RDFHandler handler) throws RDFHandlerException {
121 for (final Statement statement : statements) {
122 if (!forallPredicate.test(statement)) {
123 return;
124 }
125 }
126 reducer.reduce(key, statements, handler);
127 }
128
129 };
130 } else {
131 return reducer;
132 }
133 }
134 }
135
136 /**
137 * Returns a {@code Reducer} that emits the output of multiple reductions on the same quad
138 * partition.
139 *
140 * @param reducers
141 * the reducers whose output has to be concatenated
142 * @return the created {@code Reducer}
143 */
144 static Reducer concat(final Reducer... reducers) {
145 return new Reducer() {
146
147 @Override
148 public void reduce(final Value key, final Statement[] statements,
149 final RDFHandler handler) throws RDFHandlerException {
150 for (final Reducer reducer : reducers) {
151 reducer.reduce(key, statements, handler);
152 }
153 }
154
155 };
156 }
157
158 /**
159 * Parses a {@code Reducer} out of the supplied expression string. The expression must be a
160 * {@code language: expression} script.
161 *
162 * @param expression
163 * the expression to parse
164 * @return the parsed reducer, or null if a null expression was supplied
165 */
166 @Nullable
167 static Mapper parse(@Nullable final String expression) {
168 return expression == null ? null //
169 : Scripting.compile(Mapper.class, expression, "k", "p", "h");
170 }
171
172 /**
173 * Processes the statement partition associated to a certain key, emitting output statements
174 * to the supplied {@code RDFHandler}.
175 *
176 * @param key
177 * the partition key, possibly null
178 * @param statements
179 * a modifiable array with the statements belonging to the partition, not null
180 * @param handler
181 * the {@code RDFHandler} where to emit output statements, not null
182 * @throws RDFHandlerException
183 * on error
184 */
185 void reduce(@Nullable Value key, Statement[] statements, RDFHandler handler)
186 throws RDFHandlerException;
187
188 }