1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro;
15  
16  import java.io.IOException;
17  import java.util.ArrayList;
18  import java.util.List;
19  import java.util.Objects;
20  import java.util.concurrent.Semaphore;
21  import java.util.concurrent.atomic.AtomicReference;
22  import java.util.function.Consumer;
23  
24  import org.openrdf.model.Resource;
25  import org.openrdf.model.Statement;
26  import org.openrdf.model.URI;
27  import org.openrdf.model.Value;
28  import org.openrdf.model.ValueFactory;
29  import org.openrdf.rio.RDFHandler;
30  import org.openrdf.rio.RDFHandlerException;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import eu.fbk.rdfpro.util.Environment;
35  import eu.fbk.rdfpro.util.Sorter;
36  import eu.fbk.rdfpro.util.Statements;
37  import eu.fbk.rdfpro.util.Tracker;
38  
39  final class ProcessorMapReduce implements RDFProcessor {
40  
41      private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorMapReduce.class);
42  
43      private static final int MIN_RUNNABLE_STATEMENTS = 256; // min 256 statements per runnable
44  
45      private static final int MAX_RUNNABLE_MULTIPLIER = 4; // 4 runnables enqueued per core
46  
47      private final Mapper mapper;
48  
49      private final Reducer reducer;
50  
51      private final boolean deduplicate;
52  
53      ProcessorMapReduce(final Mapper mapper, final Reducer reducer, final boolean deduplicate) {
54          this.mapper = Objects.requireNonNull(mapper);
55          this.reducer = Objects.requireNonNull(reducer);
56          this.deduplicate = deduplicate;
57      }
58  
59      @Override
60      public RDFHandler wrap(final RDFHandler handler) {
61          return new Handler(Objects.requireNonNull(handler));
62      }
63  
64      private final class Handler extends AbstractRDFHandlerWrapper implements Consumer<Object[]> {
65  
66          private final List<Value> jobKeys;
67  
68          private final List<Statement[]> jobStatements;
69  
70          private int jobSize;
71  
72          private Value currentKey;
73  
74          private final List<Statement> currentStatements;
75  
76          private final AtomicReference<Throwable> exceptionHolder;
77  
78          private final int semaphoreSize;
79  
80          private final Semaphore semaphore;
81  
82          private Sorter<Object[]> sorter;
83  
84          private final Tracker tracker;
85  
86          Handler(final RDFHandler handler) {
87              super(handler);
88              this.jobKeys = new ArrayList<Value>();
89              this.jobStatements = new ArrayList<Statement[]>();
90              this.jobSize = 0;
91              this.currentKey = null;
92              this.currentStatements = new ArrayList<Statement>();
93              this.exceptionHolder = new AtomicReference<Throwable>();
94              this.semaphoreSize = MAX_RUNNABLE_MULTIPLIER * Environment.getCores();
95              this.semaphore = new Semaphore(this.semaphoreSize);
96              this.sorter = Sorter.newTupleSorter(true, Value.class, Value.class, Value.class,
97                      Value.class, Value.class, Long.class);
98              this.tracker = new Tracker(LOGGER, null, //
99                      "%d reductions (%d red/s avg)", //
100                     "%d reductions (%d red/s, %d red/s avg)");
101         }
102 
103         @Override
104         public void startRDF() throws RDFHandlerException {
105             super.startRDF();
106             try {
107                 this.sorter.start(ProcessorMapReduce.this.deduplicate);
108             } catch (final IOException ex) {
109                 throw new RDFHandlerException(ex);
110             }
111         }
112 
113         @Override
114         public void handleComment(final String comment) throws RDFHandlerException {
115             // dropped
116         }
117 
118         @Override
119         public void handleStatement(final Statement statement) throws RDFHandlerException {
120             final Value[] keys = ProcessorMapReduce.this.mapper.map(statement);
121             for (final Value key : keys) {
122                 if (Mapper.BYPASS_KEY.equals(key)) {
123                     super.handleStatement(statement); // bypass
124                 } else {
125                     final Value s = statement.getSubject();
126                     final Value p = statement.getPredicate();
127                     final Value o = statement.getObject();
128                     final Value c = statement.getContext();
129                     final boolean skey = Objects.equals(s, key);
130                     final boolean pkey = Objects.equals(p, key);
131                     final boolean okey = Objects.equals(o, key);
132                     final boolean ckey = Objects.equals(c, key);
133                     final Object[] record = new Object[6];
134                     record[0] = key;
135                     record[1] = skey ? null : s;
136                     record[2] = pkey ? null : p;
137                     record[3] = okey ? null : o;
138                     record[4] = ckey ? null : c;
139                     record[5] = new Long((skey ? 0x08 : 0) | (pkey ? 0x04 : 0) | (okey ? 0x02 : 0)
140                             | (ckey ? 0x01 : 0));
141                     try {
142                         this.sorter.emit(record);
143                     } catch (final IOException ex) {
144                         throw new RDFHandlerException(ex);
145                     }
146                 }
147             }
148         }
149 
150         @Override
151         public void endRDF() throws RDFHandlerException {
152             try {
153                 this.tracker.start();
154                 this.sorter.end(false, this);
155                 flush(true);
156                 this.semaphore.acquire(this.semaphoreSize);
157                 this.tracker.end();
158                 super.endRDF();
159             } catch (final InterruptedException | IOException ex) {
160                 throw new RDFHandlerException(ex);
161             } finally {
162                 this.sorter.close();
163                 this.sorter = null;
164             }
165         }
166 
167         @Override
168         public void accept(final Object[] record) {
169 
170             final Value key = (Value) record[0];
171             final int mask = ((Number) record[5]).intValue();
172 
173             final Resource s = (Resource) ((mask & 0x08) != 0 ? key : record[1]);
174             final URI p = (URI) ((mask & 0x04) != 0 ? key : record[2]);
175             final Value o = (Value) ((mask & 0x02) != 0 ? key : record[3]);
176             final Resource c = (Resource) ((mask & 0x01) != 0 ? key : record[4]);
177 
178             final ValueFactory vf = Statements.VALUE_FACTORY;
179             final Statement statement = c == null ? vf.createStatement(s, p, o) //
180                     : vf.createStatement(s, p, o, c);
181 
182             if (!key.equals(this.currentKey)) {
183                 try {
184                     flush(false);
185                 } catch (final Throwable ex) {
186                     throw new RuntimeException(ex);
187                 }
188                 this.currentKey = key;
189                 this.currentStatements.clear();
190             }
191 
192             this.currentStatements.add(statement);
193         }
194 
195         private void flush(final boolean done) throws RDFHandlerException, InterruptedException {
196 
197             final int numStmt = this.currentStatements.size();
198             if (numStmt > 0) {
199                 this.jobKeys.add(this.currentKey);
200                 this.jobStatements.add(this.currentStatements.toArray(new Statement[numStmt]));
201                 this.jobSize += numStmt;
202             }
203 
204             final int len = this.jobKeys.size();
205             if (len == 0 || !done && this.jobSize < MIN_RUNNABLE_STATEMENTS) {
206                 return;
207             }
208 
209             final Throwable exception = this.exceptionHolder.get();
210             if (exception != null) {
211                 if (exception instanceof RDFHandlerException) {
212                     throw (RDFHandlerException) exception;
213                 } else if (exception instanceof RuntimeException) {
214                     throw (RuntimeException) exception;
215                 } else if (exception instanceof Error) {
216                     throw (Error) exception;
217                 }
218                 throw new RDFHandlerException(exception);
219             }
220 
221             final Value[] jobKeys = this.jobKeys.toArray(new Value[len]);
222             final Statement[][] jobStatements = this.jobStatements.toArray(new Statement[len][]);
223             this.jobKeys.clear();
224             this.jobStatements.clear();
225             this.jobSize = 0;
226             this.semaphore.acquire(); // will block if too many runnables were submitted
227             try {
228                 Environment.getPool().execute(new Runnable() {
229 
230                     @Override
231                     public void run() {
232                         try {
233                             for (int i = 0; i < len; ++i) {
234                                 ProcessorMapReduce.this.reducer.reduce(jobKeys[i],
235                                         jobStatements[i], Handler.this.handler);
236                                 Handler.this.tracker.increment();
237                             }
238                         } catch (final Throwable ex) {
239                             synchronized (Handler.this.exceptionHolder) {
240                                 final Throwable exception = Handler.this.exceptionHolder.get();
241                                 if (exception != null) {
242                                     exception.addSuppressed(ex);
243                                 } else {
244                                     Handler.this.exceptionHolder.set(ex);
245                                 }
246                             }
247                         } finally {
248                             Handler.this.semaphore.release();
249                         }
250                     }
251 
252                 });
253             } catch (final Throwable ex) {
254                 this.semaphore.release();
255                 throw ex;
256             }
257         }
258 
259     }
260 
261 }