1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.function.Consumer;
23
24 import org.openrdf.model.Resource;
25 import org.openrdf.model.Statement;
26 import org.openrdf.model.URI;
27 import org.openrdf.model.Value;
28 import org.openrdf.model.ValueFactory;
29 import org.openrdf.rio.RDFHandler;
30 import org.openrdf.rio.RDFHandlerException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import eu.fbk.rdfpro.util.Environment;
35 import eu.fbk.rdfpro.util.Sorter;
36 import eu.fbk.rdfpro.util.Statements;
37 import eu.fbk.rdfpro.util.Tracker;
38
39 final class ProcessorMapReduce implements RDFProcessor {
40
41 private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorMapReduce.class);
42
43 private static final int MIN_RUNNABLE_STATEMENTS = 256;
44
45 private static final int MAX_RUNNABLE_MULTIPLIER = 4;
46
47 private final Mapper mapper;
48
49 private final Reducer reducer;
50
51 private final boolean deduplicate;
52
53 ProcessorMapReduce(final Mapper mapper, final Reducer reducer, final boolean deduplicate) {
54 this.mapper = Objects.requireNonNull(mapper);
55 this.reducer = Objects.requireNonNull(reducer);
56 this.deduplicate = deduplicate;
57 }
58
59 @Override
60 public RDFHandler wrap(final RDFHandler handler) {
61 return new Handler(Objects.requireNonNull(handler));
62 }
63
64 private final class Handler extends AbstractRDFHandlerWrapper implements Consumer<Object[]> {
65
66 private final List<Value> jobKeys;
67
68 private final List<Statement[]> jobStatements;
69
70 private int jobSize;
71
72 private Value currentKey;
73
74 private final List<Statement> currentStatements;
75
76 private final AtomicReference<Throwable> exceptionHolder;
77
78 private final int semaphoreSize;
79
80 private final Semaphore semaphore;
81
82 private Sorter<Object[]> sorter;
83
84 private final Tracker tracker;
85
86 Handler(final RDFHandler handler) {
87 super(handler);
88 this.jobKeys = new ArrayList<Value>();
89 this.jobStatements = new ArrayList<Statement[]>();
90 this.jobSize = 0;
91 this.currentKey = null;
92 this.currentStatements = new ArrayList<Statement>();
93 this.exceptionHolder = new AtomicReference<Throwable>();
94 this.semaphoreSize = MAX_RUNNABLE_MULTIPLIER * Environment.getCores();
95 this.semaphore = new Semaphore(this.semaphoreSize);
96 this.sorter = Sorter.newTupleSorter(true, Value.class, Value.class, Value.class,
97 Value.class, Value.class, Long.class);
98 this.tracker = new Tracker(LOGGER, null,
99 "%d reductions (%d red/s avg)",
100 "%d reductions (%d red/s, %d red/s avg)");
101 }
102
103 @Override
104 public void startRDF() throws RDFHandlerException {
105 super.startRDF();
106 try {
107 this.sorter.start(ProcessorMapReduce.this.deduplicate);
108 } catch (final IOException ex) {
109 throw new RDFHandlerException(ex);
110 }
111 }
112
113 @Override
114 public void handleComment(final String comment) throws RDFHandlerException {
115
116 }
117
118 @Override
119 public void handleStatement(final Statement statement) throws RDFHandlerException {
120 final Value[] keys = ProcessorMapReduce.this.mapper.map(statement);
121 for (final Value key : keys) {
122 if (Mapper.BYPASS_KEY.equals(key)) {
123 super.handleStatement(statement);
124 } else {
125 final Value s = statement.getSubject();
126 final Value p = statement.getPredicate();
127 final Value o = statement.getObject();
128 final Value c = statement.getContext();
129 final boolean skey = Objects.equals(s, key);
130 final boolean pkey = Objects.equals(p, key);
131 final boolean okey = Objects.equals(o, key);
132 final boolean ckey = Objects.equals(c, key);
133 final Object[] record = new Object[6];
134 record[0] = key;
135 record[1] = skey ? null : s;
136 record[2] = pkey ? null : p;
137 record[3] = okey ? null : o;
138 record[4] = ckey ? null : c;
139 record[5] = new Long((skey ? 0x08 : 0) | (pkey ? 0x04 : 0) | (okey ? 0x02 : 0)
140 | (ckey ? 0x01 : 0));
141 try {
142 this.sorter.emit(record);
143 } catch (final IOException ex) {
144 throw new RDFHandlerException(ex);
145 }
146 }
147 }
148 }
149
150 @Override
151 public void endRDF() throws RDFHandlerException {
152 try {
153 this.tracker.start();
154 this.sorter.end(false, this);
155 flush(true);
156 this.semaphore.acquire(this.semaphoreSize);
157 this.tracker.end();
158 super.endRDF();
159 } catch (final InterruptedException | IOException ex) {
160 throw new RDFHandlerException(ex);
161 } finally {
162 this.sorter.close();
163 this.sorter = null;
164 }
165 }
166
167 @Override
168 public void accept(final Object[] record) {
169
170 final Value key = (Value) record[0];
171 final int mask = ((Number) record[5]).intValue();
172
173 final Resource s = (Resource) ((mask & 0x08) != 0 ? key : record[1]);
174 final URI p = (URI) ((mask & 0x04) != 0 ? key : record[2]);
175 final Value o = (Value) ((mask & 0x02) != 0 ? key : record[3]);
176 final Resource c = (Resource) ((mask & 0x01) != 0 ? key : record[4]);
177
178 final ValueFactory vf = Statements.VALUE_FACTORY;
179 final Statement statement = c == null ? vf.createStatement(s, p, o)
180 : vf.createStatement(s, p, o, c);
181
182 if (!key.equals(this.currentKey)) {
183 try {
184 flush(false);
185 } catch (final Throwable ex) {
186 throw new RuntimeException(ex);
187 }
188 this.currentKey = key;
189 this.currentStatements.clear();
190 }
191
192 this.currentStatements.add(statement);
193 }
194
195 private void flush(final boolean done) throws RDFHandlerException, InterruptedException {
196
197 final int numStmt = this.currentStatements.size();
198 if (numStmt > 0) {
199 this.jobKeys.add(this.currentKey);
200 this.jobStatements.add(this.currentStatements.toArray(new Statement[numStmt]));
201 this.jobSize += numStmt;
202 }
203
204 final int len = this.jobKeys.size();
205 if (len == 0 || !done && this.jobSize < MIN_RUNNABLE_STATEMENTS) {
206 return;
207 }
208
209 final Throwable exception = this.exceptionHolder.get();
210 if (exception != null) {
211 if (exception instanceof RDFHandlerException) {
212 throw (RDFHandlerException) exception;
213 } else if (exception instanceof RuntimeException) {
214 throw (RuntimeException) exception;
215 } else if (exception instanceof Error) {
216 throw (Error) exception;
217 }
218 throw new RDFHandlerException(exception);
219 }
220
221 final Value[] jobKeys = this.jobKeys.toArray(new Value[len]);
222 final Statement[][] jobStatements = this.jobStatements.toArray(new Statement[len][]);
223 this.jobKeys.clear();
224 this.jobStatements.clear();
225 this.jobSize = 0;
226 this.semaphore.acquire();
227 try {
228 Environment.getPool().execute(new Runnable() {
229
230 @Override
231 public void run() {
232 try {
233 for (int i = 0; i < len; ++i) {
234 ProcessorMapReduce.this.reducer.reduce(jobKeys[i],
235 jobStatements[i], Handler.this.handler);
236 Handler.this.tracker.increment();
237 }
238 } catch (final Throwable ex) {
239 synchronized (Handler.this.exceptionHolder) {
240 final Throwable exception = Handler.this.exceptionHolder.get();
241 if (exception != null) {
242 exception.addSuppressed(ex);
243 } else {
244 Handler.this.exceptionHolder.set(ex);
245 }
246 }
247 } finally {
248 Handler.this.semaphore.release();
249 }
250 }
251
252 });
253 } catch (final Throwable ex) {
254 this.semaphore.release();
255 throw ex;
256 }
257 }
258
259 }
260
261 }