1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro.util;
15  
16  import java.io.BufferedReader;
17  import java.io.EOFException;
18  import java.io.IOException;
19  import java.io.InputStream;
20  import java.io.InputStreamReader;
21  import java.io.OutputStream;
22  import java.nio.charset.Charset;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.concurrent.CountDownLatch;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.function.Consumer;
29  
30  import javax.annotation.Nullable;
31  
32  import org.openrdf.model.BNode;
33  import org.openrdf.model.Literal;
34  import org.openrdf.model.Resource;
35  import org.openrdf.model.Statement;
36  import org.openrdf.model.URI;
37  import org.openrdf.model.Value;
38  import org.openrdf.model.ValueFactory;
39  import org.openrdf.model.vocabulary.XMLSchema;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  public abstract class Sorter<T> implements AutoCloseable {
44  
    private static final Logger LOGGER = LoggerFactory.getLogger(Sorter.class);

    /** Dictionary shared by all encoders and decoders; created by start(), dropped by close(). */
    @Nullable
    private Dictionary dictionary;

    /** The external sort process launched by start(). */
    @Nullable
    private Process sortProcess;

    /** STDIN of the sort process, receiving the encoded records; closed by end(). */
    @Nullable
    private OutputStream sortOut;

    /** STDOUT of the sort process, providing the sorted records consumed by end(). */
    @Nullable
    private InputStream sortIn;

    /** Tracker counting the records passed to emit(). */
    @Nullable
    private Tracker writeTracker;

    /** Tracker counting the records decoded from the sort output. */
    @Nullable
    private Tracker readTracker;

    /** Encoders created so far (one per emitting thread); flushed and closed by end(). */
    @Nullable
    private List<Output> outputs;

    /** Per-thread encoder; reset to null by end() so that further emit() calls are rejected. */
    @Nullable
    private ThreadLocal<Output> threadOutput;

    /** Decoders of the sort output, created by end(). */
    @Nullable
    private List<Input> inputs;

    /** Latch counted down by each decoder when it terminates; awaited by end(). */
    @Nullable
    private CountDownLatch decodersLatch;

    /** Failure recorded while running end() (possibly by a decoder) and rethrown by end(). */
    @Nullable
    private Throwable exception;

    /** Whether start() may still be called; cleared by start() and close(). */
    private boolean startable;
81  
82      public static Sorter<Statement> newStatementSorter(final boolean compress) {
83          return new StatementSorter(compress); // TODO: add configurable component order
84      }
85  
86      public static Sorter<Object[]> newTupleSorter(final boolean compress, final Class<?>... schema) {
87          return new TupleSorter(compress, schema);
88      }
89  
90      protected Sorter() {
91          // No initialization here: done in start()
92          this.startable = true;
93      }
94  
95      public void start(final boolean deduplicate) throws IOException {
96  
97          // Check state
98          synchronized (this) {
99              if (!this.startable) {
100                 throw new IllegalArgumentException();
101             }
102             this.startable = false;
103         }
104 
105         // Allocate dictionary indexes
106         this.dictionary = new Dictionary();
107 
108         // Setup streams for sending data to sort
109         this.outputs = new ArrayList<Output>();
110         this.threadOutput = new ThreadLocal<Output>() {
111 
112             @Override
113             protected Output initialValue() {
114                 final OutputStream out = IO.parallelBuffer(Sorter.this.sortOut, (byte) 0);
115                 final Output output = new Output(out, Sorter.this.dictionary);
116                 synchronized (Sorter.this.outputs) {
117                     Sorter.this.outputs.add(output);
118                 }
119                 return output;
120             }
121 
122         };
123 
124         // Invoke sort
125         final List<String> command = new ArrayList<String>(Arrays.asList(Environment.getProperty(
126                 "rdfpro.cmd.sort", "sort").split("\\s+")));
127         command.add("-z"); // zero-terminated lines
128         if (deduplicate) {
129             command.add("-u"); // remove duplicates
130         }
131         final ProcessBuilder builder = new ProcessBuilder(command);
132         builder.environment().put("LC_ALL", "C"); // case sensitive sort
133         this.sortProcess = builder.start();
134 
135         // Retrieve input and output streams
136         this.sortOut = this.sortProcess.getOutputStream();
137         this.sortIn = this.sortProcess.getInputStream();
138 
139         // Launch a task to log STDERR at ERROR level
140         Environment.getPool().submit(new Runnable() {
141 
142             @Override
143             public void run() {
144                 final BufferedReader in = new BufferedReader(new InputStreamReader(
145                         Sorter.this.sortProcess.getErrorStream(), Charset.forName("UTF-8")));
146                 try {
147                     String line;
148                     while ((line = in.readLine()) != null) {
149                         LOGGER.error("[sort] {}", line);
150                     }
151                 } catch (final Throwable ex) {
152                     LOGGER.error("[sort] failed to read from stream", ex);
153                 } finally {
154                     IO.closeQuietly(in);
155                 }
156             }
157 
158         });
159 
160         // Initialize trackers
161         this.writeTracker = new Tracker(LOGGER, null, //
162                 "%d records to sort (%d rec/s avg)", //
163                 "%d records to sort (%d rec/s, %d rec/s avg)");
164         this.readTracker = new Tracker(LOGGER, null, //
165                 "%d records from sort (%d rec/s avg)", //
166                 "%d records from sort (%d rec/s, %d rec/s avg)");
167 
168         // Start write tracker
169         this.writeTracker.start();
170     }
171 
172     public void emit(final T element) throws IOException {
173         final Output output;
174         try {
175             output = this.threadOutput.get();
176         } catch (final NullPointerException ex) {
177             throw new IllegalStateException();
178         }
179         encode(output, element);
180         output.endRecord();
181         this.writeTracker.increment();
182     }
183 
184     public void end(final boolean parallelize, final Consumer<T> consumer) throws IOException {
185 
186         // Check state and invalidate thread local to reject further elements
187         synchronized (this) {
188             if (this.threadOutput == null) {
189                 throw new IllegalStateException();
190             }
191             this.threadOutput = null;
192             this.writeTracker.end();
193         }
194 
195         // Log dictionary status
196         LOGGER.debug("Dictionary status:\n{}", this.dictionary);
197 
198         try {
199             // Complete data sending to sort
200             try {
201                 for (final Output output : this.outputs) {
202                     output.close();
203                 }
204                 this.outputs.clear();
205             } finally {
206                 this.sortOut.close();
207             }
208 
209             // Consume sort output, possibly using multiple decode threads
210             final int decoders = parallelize ? Environment.getCores() : 1;
211             this.decodersLatch = new CountDownLatch(decoders);
212             this.inputs = new ArrayList<Input>();
213             this.readTracker.start();
214             // if (!parallelize) {
215             // this.inputs.add(new Input(IO.buffer(this.sortIn), this.dictionary));
216             // tryDecode(this.inputs.get(0), consumer);
217             // } else {
218             for (int i = 0; i < decoders; ++i) {
219                 final InputStream in = IO.parallelBuffer(this.sortIn, (byte) 0);
220                 this.inputs.add(new Input(in, this.dictionary));
221             }
222             for (int i = 1; i < decoders; ++i) {
223                 final Input input = this.inputs.get(i);
224                 Environment.getPool().execute(new Runnable() {
225 
226                     @Override
227                     public void run() {
228                         tryDecode(input, consumer);
229                     }
230 
231                 });
232             }
233             tryDecode(this.inputs.get(0), consumer);
234             // }
235             this.decodersLatch.await();
236             this.readTracker.end();
237 
238         } catch (final Throwable ex) {
239             this.exception = ex;
240 
241         } finally {
242             // Close streams and propagate exception, if any
243             IO.closeQuietly(this.sortIn);
244             if (this.inputs != null) {
245                 for (final Input input : this.inputs) {
246                     input.close();
247                 }
248             }
249             if (this.exception != null) {
250                 if (this.exception instanceof IOException) {
251                     throw (IOException) this.exception;
252                 } else if (this.exception instanceof RuntimeException) {
253                     throw (RuntimeException) this.exception;
254                 } else if (this.exception instanceof Error) {
255                     throw (Error) this.exception;
256                 }
257                 throw new RuntimeException(this.exception);
258             }
259         }
260     }
261 
262     private void tryDecode(final Input input, final Consumer<T> consumer) {
263         try {
264             while (input.nextRecord()) {
265                 final T element = decode(input);
266                 consumer.accept(element);
267                 this.readTracker.increment();
268             }
269             input.close();
270         } catch (final Throwable ex) {
271             if (this.exception == null) {
272                 this.exception = ex;
273                 for (final Input r : this.inputs) {
274                     r.close(); // forces other decoders threads to abort
275                 }
276             }
277         } finally {
278             this.decodersLatch.countDown();
279         }
280     }
281 
282     @Override
283     public void close() {
284         try {
285             // Kill the sort process, if still active. This will ultimately break ongoing threads
286             if (this.sortProcess != null) {
287                 this.sortProcess.destroy();
288             }
289 
290         } catch (final Throwable ex) {
291             LOGGER.error("Exception caught while killing sort process", ex);
292 
293         } finally {
294             // Mark as non startable and release everything
295             this.startable = false;
296             this.dictionary = null;
297             this.sortProcess = null;
298             this.sortOut = null;
299             this.sortIn = null;
300             this.outputs = null;
301             this.threadOutput = null;
302             this.inputs = null;
303             this.decodersLatch = null;
304             this.exception = null;
305         }
306     }
307 
    /**
     * Encodes one element by writing its fields to the supplied encoder. Called by emit(), which
     * then terminates the record.
     *
     * @param output the encoder of the calling thread
     * @param element the element to encode
     * @throws IOException on failure writing the encoded data
     */
    protected abstract void encode(Output output, final T element) throws IOException;

    /**
     * Decodes one element from the current record of the supplied decoder. Called once
     * {@code input.nextRecord()} has returned true.
     *
     * @param input the decoder positioned at the start of a record
     * @return the decoded element, passed to the consumer given to end()
     * @throws IOException on failure reading the record
     */
    protected abstract T decode(Input input) throws IOException;
311 
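    // Variable-length encoding of numbers (Output.writeNumber / Input.readNumber). Columns:
    // max payload bits, total bytes, lead-byte marker, mask of the payload bits in the lead
    // byte, lead-byte prefix bits, payload bits of each byte. Every byte after the lead one is
    // 0x80 | (7 payload bits). Negative numbers always use the 10-byte form.
    // bits len mask hi-mask prefix   layout
    //    6   1 0x40  0x3F   01        6
    //   13   2 0x80  0x3F   10        6 7
    //   18   3 0xC0  0x0F   1100      4 7 7
    //   25   4 0xD0  0x0F   1101      4 7 7 7
    //   32   5 0xE0  0x0F   1110      4 7 7 7 7
    //   37   6 0xF0  0x03   111100    2 7 7 7 7 7
    //   44   7 0xF4  0x03   111101    2 7 7 7 7 7 7
    //   51   8 0xF8  0x03   111110    2 7 7 7 7 7 7 7
    //   57   9 0xFC  0x01   1111110   1 7 7 7 7 7 7 7 7
    //   64  10 0xFE  0x01   1111111   1 7 7 7 7 7 7 7 7 7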
323 
324     // 0 7 -- 7
325     // 10 6 7 -- 13 = 0x1FFF
326     // 11 6 7 7 -- 20
327 
328     public static final class Output {
329 
330         private final OutputStream out;
331 
332         private final Dictionary dictionary;
333 
334         private final int[] remaining;
335 
336         Output(final OutputStream out, final Dictionary dictionary) {
337             this.out = out;
338             this.dictionary = dictionary;
339             this.remaining = new int[] { -1 };
340         }
341 
342         void endRecord() throws IOException {
343             this.out.write(0);
344         }
345 
346         void close() throws IOException {
347             this.out.close();
348         }
349 
350         public final void writeStatement(@Nullable final Statement statement,
351                 final boolean compress) throws IOException {
352             if (statement == null) {
353                 writeValue(null, compress);
354             } else {
355                 writeValue(statement.getSubject(), compress);
356                 writeValue(statement.getPredicate(), compress);
357                 writeValue(statement.getObject(), compress);
358                 writeValue(statement.getContext(), compress);
359             }
360         }
361 
362         public final void writeValue(@Nullable final Value value, final boolean compress)
363                 throws IOException {
364             if (value == null) {
365                 write(1);
366             } else if (value instanceof BNode) {
367                 writeStringHelper(((BNode) value).getID(), 0, 1);
368             } else if (value instanceof Literal) {
369                 final Literal lit = (Literal) value;
370                 final String lang = lit.getLanguage();
371                 if (lang == null) {
372                     final URI dt = lit.getDatatype();
373                     if (dt == null || XMLSchema.STRING.equals(dt)) {
374                         writeStringHelper(lit.getLabel(), 0, 2);
375                     } else {
376                         final int key = !compress ? -1 : this.dictionary.encodeDatatype(dt);
377                         if (key < 0) {
378                             writeStringHelper(lit.getLabel(), 0, 3);
379                             writeStringHelper(dt.stringValue(), 0, 1);
380                         } else {
381                             writeStringHelper(lit.getLabel(), 0, 4);
382                             writeNumber(key);
383                         }
384                     }
385                 } else {
386                     final int key = !compress ? -1 : this.dictionary.encodeLanguage(lang);
387                     if (key < 0) {
388                         writeStringHelper(lit.getLabel(), 0, 3);
389                         writeStringHelper(lang, 0, 2);
390                     } else {
391                         writeStringHelper(lit.getLabel(), 0, 5);
392                         writeNumber(key);
393                     }
394                 }
395             } else if (value instanceof URI) {
396                 final URI uri = (URI) value;
397                 boolean done = false;
398                 if (compress) {
399                     final int key = this.dictionary.encodeURI(uri, this.remaining);
400                     if (key >= 0) {
401                         final int r = this.remaining[0];
402                         if (r < 0) {
403                             write(7);
404                             writeNumber(key);
405                             done = true;
406                         } else {
407                             writeStringHelper(uri.stringValue(), r, 7);
408                             writeNumber(key);
409                             done = true;
410                         }
411                     }
412                 }
413                 if (!done) {
414                     writeStringHelper(uri.stringValue(), 0, 6);
415                 }
416             }
417         }
418 
419         public final void writeString(@Nullable final String s) throws IOException {
420             if (s == null) {
421                 write(0x02);
422             } else {
423                 writeStringHelper(s, 0, 0x01);
424             }
425         }
426 
427         public final void writeNumber(final long num) throws IOException {
428             if (num < 0L || num > 0x1FFFFFFFFFFFFFFL /* 57 bit */) {
429                 writeNumberHelper(10, 0xFE, num);
430             } else if (num <= 0x3FL /* 6 bit */) {
431                 writeNumberHelper(1, 0x40, num);
432             } else if (num <= 0x1FFFL /* 13 bit */) {
433                 writeNumberHelper(2, 0x80, num);
434             } else if (num <= 0x3FFFFL /* 18 bit */) {
435                 writeNumberHelper(3, 0xC0, num);
436             } else if (num <= 0x1FFFFFFL /* 25 bit */) {
437                 writeNumberHelper(4, 0xD0, num);
438             } else if (num <= 0xFFFFFFFFL /* 32 bit */) {
439                 writeNumberHelper(5, 0xE0, num);
440             } else if (num <= 0x1FFFFFFFFFL /* 37 bit */) {
441                 writeNumberHelper(6, 0xF0, num);
442             } else if (num <= 0xFFFFFFFFFFFL /* 44 bit */) {
443                 writeNumberHelper(7, 0xF4, num);
444             } else if (num <= 0x7FFFFFFFFFFFFL /* 51 bit */) {
445                 writeNumberHelper(8, 0xF8, num);
446             } else {
447                 writeNumberHelper(9, 0xFC, num);
448             }
449         }
450 
451         // B = 1011 1111
452 
453         // 0x1FFF xx11 1111 x111 1111
454 
455         private void writeStringHelper(final String s, final int offset, final int delimiter)
456                 throws IOException {
457             final int len = s.length();
458             for (int i = offset; i < len; ++i) {
459                 int c = s.charAt(i);
460                 if (c <= 0x07) {
461                     c += 0xFFFF;
462                 }
463                 if (c <= 0x7F) {
464                     write(c);
465                 } else if (c <= 0x1FFF) {
466                     write(0x80 | c >> 7);
467                     write(0x80 | c & 0x7F);
468                 } else {
469                     write(0xC0 | c >> 14);
470                     write(0x80 | c >> 7 & 0x7F);
471                     write(0x80 | c & 0x7F);
472                 }
473             }
474             write(delimiter);
475         }
476 
477         private void writeNumberHelper(final int len, final int mask, final long num)
478                 throws IOException {
479             write(mask | (int) (num >>> (len - 1) * 7));
480             for (int i = len - 2; i >= 0; --i) {
481                 write(0x80 | (int) (num >>> i * 7 & 0x7F));
482             }
483         }
484 
485         private void write(final int b) throws IOException {
486             assert (b & 0xFF) != 0;
487             this.out.write(b);
488         }
489 
490     }
491 
492     public static final class Input {
493 
494         private final InputStream in;
495 
496         private final Dictionary dictionary;
497 
498         private final StringBuilder builder;
499 
500         private int c;
501 
502         Input(final InputStream in, final Dictionary dictionary) {
503             this.in = in;
504             this.dictionary = dictionary;
505             this.builder = new StringBuilder();
506             this.c = 0;
507         }
508 
509         boolean nextRecord() throws IOException {
510             while (this.c != 0) {
511                 LOGGER.warn("Skipping " + this.c);
512                 this.c = this.in.read();
513                 if (this.c < 0) {
514                     throw new EOFException("EOF found before completing read of record");
515                 }
516             }
517             this.c = this.in.read();
518             if (this.c < 0) {
519                 return false; // EOF reached, no more records
520             }
521             if (this.c == 0) {
522                 throw new Error("Empty record!");
523             }
524             return true;
525         }
526 
527         void close() {
528             IO.closeQuietly(this.in);
529         }
530 
531         public final boolean isEOF() {
532             return this.c <= 0;
533         }
534 
535         @Nullable
536         public final Statement readStatement() throws IOException {
537 
538             final Resource s = (Resource) readValue();
539             if (s == null) {
540                 return null;
541             }
542 
543             final URI p = (URI) readValue();
544             final Value o = readValue();
545             final Resource c = (Resource) readValue();
546 
547             final ValueFactory vf = Statements.VALUE_FACTORY;
548             return c == null ? vf.createStatement(s, p, o) : vf.createStatement(s, p, o, c);
549         }
550 
551         @Nullable
552         public final Value readValue() throws IOException {
553             final int delim = readStringHelper();
554             if (delim == 1 && this.builder.length() == 0) {
555                 return null;
556             }
557             final String s = this.builder.toString();
558             final ValueFactory vf = Statements.VALUE_FACTORY;
559             if (delim == 1) {
560                 return vf.createBNode(this.builder.toString());
561             } else if (delim == 2) {
562                 return vf.createLiteral(s);
563             } else if (delim == 3) {
564                 final int delim2 = readStringHelper();
565                 final String s2 = this.builder.toString();
566                 if (delim2 == 1) {
567                     return vf.createLiteral(s, vf.createURI(s2));
568                 } else {
569                     return vf.createLiteral(s, s2);
570                 }
571             } else if (delim == 4) {
572                 final int key = (int) readNumber();
573                 final URI dt = this.dictionary.decodeDatatype(key);
574                 return vf.createLiteral(s, dt);
575             } else if (delim == 5) {
576                 final int key = (int) readNumber();
577                 final String lang = this.dictionary.decodeLanguage(key);
578                 return vf.createLiteral(s, lang);
579             } else if (delim == 6) {
580                 return vf.createURI(s);
581             } else if (delim == 7) {
582                 final int key = (int) readNumber();
583                 return this.dictionary.decodeURI(key, s.isEmpty() ? null : s);
584             }
585             throw new IllegalArgumentException("Invalid value delimiter: " + delim);
586         }
587 
588         @Nullable
589         public final String readString() throws IOException {
590             final int delimiter = readStringHelper();
591             if (delimiter == 0x02) {
592                 return null;
593             } else if (delimiter != 0x01) {
594                 throw new IOException("Found invalid string delimiter: " + delimiter);
595             }
596             return this.builder.toString();
597         }
598 
599         public final long readNumber() throws IOException {
600             final int b = read();
601             if (b <= 0x40 + 0x3F) {
602                 return readNumberHelper(1, b & 0x3F);
603             } else if (b <= 0x80 + 0x3F) {
604                 return readNumberHelper(2, b & 0x3F);
605             } else if (b <= 0xC0 + 0x0F) {
606                 return readNumberHelper(3, b & 0x0F);
607             } else if (b <= 0xD0 + 0x0F) {
608                 return readNumberHelper(4, b & 0x0F);
609             } else if (b <= 0xE0 + 0x0F) {
610                 return readNumberHelper(5, b & 0x0F);
611             } else if (b <= 0xF0 + 0x03) {
612                 return readNumberHelper(6, b & 0x03);
613             } else if (b <= 0xF4 + 0x03) {
614                 return readNumberHelper(7, b & 0x03);
615             } else if (b <= 0xF8 + 0x03) {
616                 return readNumberHelper(8, b & 0x03);
617             } else if (b <= 0xFC + 0x01) {
618                 return readNumberHelper(9, b & 0x01);
619             } else {
620                 return readNumberHelper(10, b & 0x01);
621             }
622         }
623 
624         private int readStringHelper() throws IOException {
625             this.builder.setLength(0);
626             while (true) {
627                 final int c = read();
628                 if (c <= 0x07) {
629                     return c;
630                 } else if (c <= 0x7F) {
631                     this.builder.append((char) c);
632                 } else if (c <= 0xBF) {
633                     final int c1 = read();
634                     final int n = (c & 0x3F) << 7 | c1 & 0x7F;
635                     this.builder.append((char) n);
636                 } else {
637                     final int c1 = read();
638                     final int c2 = read();
639                     int n = (c & 0x3F) << 14 | (c1 & 0x7F) << 7 | c2 & 0x7F;
640                     if (n > 0xFFFF) {
641                         n = n - 0xFFFF;
642                     }
643                     this.builder.append((char) n);
644                 }
645             }
646         }
647 
648         private long readNumberHelper(final int len, final int start) throws IOException {
649             long num = start;
650             for (int i = 1; i < len; ++i) {
651                 final int c = read();
652                 num = num << 7 | c & 0x7F;
653             }
654             return num;
655         }
656 
657         private int read() throws IOException {
658             final int result = this.c;
659             if (result <= 0) {
660                 throw new EOFException("Byte is " + result);
661             }
662             this.c = this.in.read();
663             return result;
664         }
665 
666     }
667 
668     private static final class StatementSorter extends Sorter<Statement> {
669 
670         private final boolean compress;
671 
672         StatementSorter(final boolean compress) {
673             this.compress = compress;
674         }
675 
676         @Override
677         protected void encode(final Output output, final Statement record) throws IOException {
678             output.writeStatement(record, this.compress);
679         }
680 
681         @Override
682         protected Statement decode(final Input input) throws IOException {
683             return input.readStatement();
684         }
685 
686     }
687 
688     private static final class TupleSorter extends Sorter<Object[]> {
689 
690         private static final int TYPE_STATEMENT = 0;
691 
692         private static final int TYPE_VALUE = 1;
693 
694         private static final int TYPE_STRING = 2;
695 
696         private static final int TYPE_NUMBER = 3;
697 
698         private final boolean compress;
699 
700         private final int[] schema;
701 
702         TupleSorter(final boolean compress, final Class<?>... schema) {
703             this.compress = compress;
704             this.schema = new int[schema.length];
705             for (int i = 0; i < schema.length; ++i) {
706                 final Class<?> clazz = schema[i];
707                 if (Statement.class.equals(clazz)) {
708                     this.schema[i] = TYPE_STATEMENT;
709                 } else if (Value.class.equals(clazz)) {
710                     this.schema[i] = TYPE_VALUE;
711                 } else if (String.class.equals(clazz)) {
712                     this.schema[i] = TYPE_STRING;
713                 } else if (Long.class.equals(clazz)) {
714                     this.schema[i] = TYPE_NUMBER;
715                 } else {
716                     throw new IllegalArgumentException("Unsupported tuple field: " + clazz);
717                 }
718             }
719         }
720 
721         @Override
722         protected void encode(final Output output, final Object[] record) throws IOException {
723             for (int i = 0; i < this.schema.length; ++i) {
724                 final Object field = record[i];
725                 final int type = this.schema[i];
726                 switch (type) {
727                 case TYPE_STATEMENT:
728                     output.writeStatement((Statement) field, this.compress);
729                     break;
730                 case TYPE_VALUE:
731                     output.writeValue((Value) field, this.compress);
732                     break;
733                 case TYPE_STRING:
734                     output.writeString((String) field);
735                     break;
736                 case TYPE_NUMBER:
737                     output.writeNumber(((Number) field).longValue());
738                     break;
739                 default:
740                     throw new Error("Unexpected type " + type);
741                 }
742             }
743         }
744 
745         @Override
746         protected Object[] decode(final Input input) throws IOException {
747             final Object[] record = new Object[this.schema.length];
748             for (int i = 0; i < this.schema.length; ++i) {
749                 final int type = this.schema[i];
750                 switch (type) {
751                 case TYPE_STATEMENT:
752                     record[i] = input.readStatement();
753                     break;
754                 case TYPE_VALUE:
755                     record[i] = input.readValue();
756                     break;
757                 case TYPE_STRING:
758                     record[i] = input.readString();
759                     break;
760                 case TYPE_NUMBER:
761                     record[i] = input.readNumber();
762                     break;
763                 default:
764                     throw new Error("Unexpected type " + type);
765                 }
766             }
767             return record;
768         }
769 
770     }
771 
772     private static final class Dictionary {
773 
        // Sizes passed to the constructors of the indexes and of the URI cache arrays
        private static final int LANGUAGE_INDEX_SIZE = 1024;

        private static final int DATATYPE_INDEX_SIZE = 1024;

        private static final int NAMESPACE_INDEX_SIZE = 256 * 1024;

        private static final int VOCAB_INDEX_SIZE = 64 * 1024;

        private static final int OTHER_INDEX_SIZE = 4 * 1024;

        private static final int URI_CACHE_SIZE = 8191;

        // Index of language tags (encodeLanguage / decodeLanguage)
        private final GenericIndex<String> languageIndex;

        // Index of datatype URIs (encodeDatatype / decodeDatatype)
        private final GenericIndex<URI> datatypeIndex;

        // Index of URI namespaces, pre-loaded with the default namespaces in the constructor
        private final StringIndex namespaceIndex;

        // Index of local names used by encodeURI for URIs in the pre-loaded namespaces
        private final StringIndex vocabNameIndex;

        // Index of local names, presumably for URIs in the other namespaces (TODO confirm in
        // encodeURI, not fully visible here)
        private final StringIndex otherNameIndex;

        // Number of pre-loaded default namespaces: encodeURI treats namespace keys below this
        // value as vocabulary namespaces
        private final int vocabNamespaces;

        // URI cache arrays of URI_CACHE_SIZE slots and their 32 lock objects; their use is not
        // visible in this part of the class (NOTE(review): presumably a decode cache, confirm)
        private final int[] uriCacheCodes;

        private final URI[] uriCacheURIs;

        private final Object[] uriCacheLocks;
803 
804         public Dictionary() {
805             this.languageIndex = new GenericIndex<String>(LANGUAGE_INDEX_SIZE);
806             this.datatypeIndex = new GenericIndex<URI>(DATATYPE_INDEX_SIZE);
807             this.namespaceIndex = new StringIndex(NAMESPACE_INDEX_SIZE);
808             this.vocabNameIndex = new StringIndex(VOCAB_INDEX_SIZE);
809             this.otherNameIndex = new StringIndex(OTHER_INDEX_SIZE);
810 
811             for (final String ns : Namespaces.DEFAULT.uris()) {
812                 int nsHash = 0;
813                 for (int i = ns.length() - 1; i >= 0; --i) {
814                     final int c = ns.charAt(i);
815                     nsHash = nsHash * 31 + c;
816                 }
817                 this.namespaceIndex.put(ns, 0, ns.length(), nsHash, false);
818             }
819             this.vocabNamespaces = Namespaces.DEFAULT.uris().size();
820 
821             this.vocabNameIndex.put("", 0, 0, 0, false);
822             this.otherNameIndex.put("", 0, 0, 0, false);
823 
824             this.uriCacheCodes = new int[URI_CACHE_SIZE];
825             this.uriCacheURIs = new URI[URI_CACHE_SIZE];
826             this.uriCacheLocks = new Object[32];
827             for (int i = 0; i < 32; ++i) {
828                 this.uriCacheLocks[i] = new Object();
829             }
830         }
831 
832         public int encodeLanguage(final String language) {
833             return this.languageIndex.put(language);
834         }
835 
836         public String decodeLanguage(final int code) {
837             return this.languageIndex.get(code);
838         }
839 
840         public int encodeDatatype(final URI datatype) {
841             return this.datatypeIndex.put(datatype);
842         }
843 
844         public URI decodeDatatype(final int code) {
845             return this.datatypeIndex.get(code);
846         }
847 
848         public int encodeURI(final URI uri, final int[] remaining) {
849 
850             final String s = uri.stringValue();
851             final int len = s.length();
852 
853             int nameHash = 0;
854             int nsLen = len;
855             while (--nsLen >= 0) {
856                 final int c = s.charAt(nsLen);
857                 if (c == '#' || c == '/' || c == ':') {
858                     break;
859                 }
860                 nameHash = nameHash * 31 + c;
861             }
862 
863             int nsHash = 0;
864             for (int i = nsLen; i >= 0; --i) {
865                 final int c = s.charAt(i);
866                 nsHash = nsHash * 31 + c;
867             }
868             ++nsLen;
869 
870             final int nsKey = this.namespaceIndex.put(s, 0, nsLen, nsHash, false);
871             if (nsKey < 0) {
872                 remaining[0] = -1;
873                 return -1;
874             }
875 
876             if (nsKey < this.vocabNamespaces) {
877                 final int nameKey = this.vocabNameIndex.put(s, nsLen, len, nameHash, true);
878                 if (nameKey < 0) {
879                     remaining[0] = nsLen;
880                     return nsKey;
881                 } else {
882                     final int code = nameKey * this.vocabNamespaces + nsKey << 1;
883                     remaining[0] = -1;
884                     return code;
885                 }
886             }
887 
888             final int nameKey = this.otherNameIndex.put(s, nsLen, len, nameHash, false);
889             if (nameKey < 0) {
890                 remaining[0] = nsLen;
891                 return nsKey;
892             } else {
893                 final int code = nameKey * NAMESPACE_INDEX_SIZE + nsKey << 1 | 0x01;
894                 remaining[0] = -1;
895                 return code;
896             }
897         }
898 
899         public URI decodeURI(final int code, final String remaining) {
900 
901             if (remaining != null && !remaining.isEmpty()) {
902                 final String ns = this.namespaceIndex.get(code);
903                 return Statements.VALUE_FACTORY.createURI(ns, remaining);
904             }
905 
906             final int offset = code % this.uriCacheURIs.length;
907             final Object lock = this.uriCacheLocks[offset % this.uriCacheLocks.length];
908             synchronized (lock) {
909                 final int cachedCode = this.uriCacheCodes[offset];
910                 if (cachedCode == code) {
911                     return this.uriCacheURIs[offset];
912                 }
913             }
914 
915             final boolean isVocab = (code & 0x01) == 0;
916             final int c = code >>> 1;
917 
918             URI uri;
919             if (isVocab) {
920                 final int nsKey = c % this.vocabNamespaces;
921                 final int nameKey = c / this.vocabNamespaces;
922                 final String ns = this.namespaceIndex.get(nsKey);
923                 final String name = this.vocabNameIndex.get(nameKey);
924                 uri = Statements.VALUE_FACTORY.createURI(ns, name);
925             } else {
926                 final int nsKey = c % NAMESPACE_INDEX_SIZE;
927                 final int nameKey = c / NAMESPACE_INDEX_SIZE;
928                 final String ns = this.namespaceIndex.get(nsKey);
929                 final String name = this.otherNameIndex.get(nameKey);
930                 uri = Statements.VALUE_FACTORY.createURI(ns, name);
931             }
932 
933             synchronized (lock) {
934                 this.uriCacheURIs[offset] = uri;
935                 this.uriCacheCodes[offset] = code;
936             }
937 
938             return uri;
939         }
940 
941         @Override
942         public String toString() {
943             final StringBuilder builder = new StringBuilder();
944             builder.append("language index:  ").append(this.languageIndex).append("\n");
945             builder.append("datatype index:  ").append(this.datatypeIndex).append("\n");
946             builder.append("namespace index: ").append(this.namespaceIndex).append("\n");
947             builder.append("vocab index:     ").append(this.vocabNameIndex).append("\n");
948             builder.append("other index:     ").append(this.otherNameIndex);
949             return builder.toString();
950         }
951 
952         private static final class StringIndex {
953 
954             private static final int NUM_LOCKS = 32;
955 
956             private final int capacity;
957 
958             private final AtomicInteger size;
959 
960             private final List<String> list;
961 
962             private final int[] table;
963 
964             private final Object[] locks;
965 
966             StringIndex(final int capacity) {
967                 this.capacity = capacity;
968                 this.size = new AtomicInteger(0);
969                 this.list = new ArrayList<String>();
970                 this.table = new int[2 * (this.capacity * 4 - 1)];
971                 this.locks = new Object[NUM_LOCKS];
972                 for (int i = 0; i < NUM_LOCKS; ++i) {
973                     this.locks[i] = new Object();
974                 }
975             }
976 
977             @Nullable
978             public int put(final String string, final int begin, final int end, final int hash,
979                     final boolean likelyExist) {
980 
981                 final int tableSize = this.table.length / 2;
982                 final int segmentSize = (tableSize + NUM_LOCKS - 1) / NUM_LOCKS;
983 
984                 int index = (hash & 0x7FFFFFFF) % tableSize;
985 
986                 final int segment = index / segmentSize;
987                 final int segmentStart = segment * segmentSize;
988                 final int segmentEnd = Math.min(tableSize, segmentStart + segmentSize);
989 
990                 final boolean full = this.size.get() >= this.capacity;
991 
992                 // first we operate read-only with no synchronization if possible
993                 if (full || likelyExist) {
994                     for (int i = 0; i < segmentSize; ++i) {
995                         final int offset = index * 2;
996                         final int key = this.table[offset] - 1;
997                         if (key < 0) {
998                             if (full) {
999                                 return -1;
1000                             }
1001                             break;
1002                         } else if (hash == this.table[offset + 1]
1003                                 && equals(this.list.get(key), string, begin, end)) {
1004                             return key;
1005                         }
1006                         ++index;
1007                         if (index >= segmentEnd) {
1008                             index = segmentStart;
1009                         }
1010                     }
1011                 }
1012 
1013                 // then operate read-write with synchronization using lock striping
1014                 // NOTE: index may have changed
1015                 synchronized (this.locks[segment]) {
1016                     for (int i = 0; i < segmentSize; ++i) {
1017                         final int offset = index * 2;
1018                         final int key = this.table[offset] - 1;
1019                         if (key < 0) {
1020                             synchronized (this.list) {
1021                                 final int newKey = this.size.get();
1022                                 if (newKey >= this.capacity) {
1023                                     return -1;
1024                                 }
1025                                 this.list.add(string.substring(begin, end));
1026                                 this.table[offset] = newKey + 1;
1027                                 this.table[offset + 1] = hash;
1028                                 this.size.incrementAndGet(); // should flush memory changes
1029                                 return newKey;
1030                             }
1031                         } else if (hash == this.table[offset + 1]
1032                                 && equals(this.list.get(key), string, begin, end)) {
1033                             return key;
1034                         }
1035                         ++index;
1036                         if (index >= segmentEnd) {
1037                             index = segmentStart;
1038                         }
1039                     }
1040                 }
1041 
1042                 return -1; // segment full (unlikely, thus we handle but don't optimize this case)
1043             }
1044 
1045             @Nullable
1046             public String get(final int key) {
1047                 final String result = this.list.get(key);
1048                 if (result == null) {
1049                     throw new IllegalArgumentException("No element for key " + key);
1050                 }
1051                 return result;
1052             }
1053 
1054             @Override
1055             public String toString() {
1056                 return this.size.get() + "/" + this.capacity;
1057             }
1058 
1059             private boolean equals(final String reference, final String string, final int begin,
1060                     final int end) {
1061                 final int len = reference.length();
1062                 if (len == end - begin) {
1063                     int i = len;
1064                     int j = end;
1065                     while (--i >= 0) {
1066                         --j;
1067                         if (reference.charAt(i) != string.charAt(j)) {
1068                             return false;
1069                         }
1070                     }
1071                     return true;
1072                 }
1073                 return false;
1074             }
1075 
1076         }
1077 
1078         private static final class GenericIndex<T> {
1079 
1080             private final int capacity;
1081 
1082             private final List<T> list;
1083 
1084             private final int[] table;
1085 
1086             GenericIndex(final int capacity) {
1087                 this.capacity = capacity;
1088                 this.list = new ArrayList<T>();
1089                 this.table = new int[2 * (this.capacity * 4 - 1)];
1090             }
1091 
1092             @Nullable
1093             public int put(final T element) {
1094 
1095                 final int tableSize = this.table.length / 2;
1096                 final int hash = element.hashCode();
1097                 int index = (hash & 0x7FFFFFFF) % tableSize;
1098 
1099                 // first we operate read-only with no synchronization, as it is likely the entry
1100                 // is already there (for datatypes and langagues, at least)
1101                 while (true) {
1102                     final int offset = index * 2;
1103                     final int key = this.table[offset] - 1;
1104                     if (key < 0) {
1105                         break;
1106                     } else if (hash == this.table[offset + 1]
1107                             && element.equals(this.list.get(key))) {
1108                         return key;
1109                     }
1110                     index = (index + 1) % tableSize;
1111                 }
1112 
1113                 // then we operate read-write with a simple global lock (few insertions expected)
1114                 synchronized (this.list) {
1115                     while (true) {
1116                         final int offset = index * 2;
1117                         final int key = this.table[offset] - 1;
1118                         if (key < 0) {
1119                             final int newKey = this.list.size();
1120                             if (newKey >= this.capacity) {
1121                                 return -1;
1122                             }
1123                             this.list.add(element);
1124                             this.table[offset] = newKey + 1;
1125                             this.table[offset + 1] = hash;
1126                             return newKey;
1127                         } else if (hash == this.table[offset + 1]
1128                                 && element.equals(this.list.get(key))) {
1129                             return key;
1130                         }
1131                         index = (index + 1) % tableSize;
1132                     }
1133                 }
1134             }
1135 
1136             @Nullable
1137             public T get(final int key) {
1138                 final T result = this.list.get(key);
1139                 if (result == null) {
1140                     throw new IllegalArgumentException("No element for key " + key);
1141                 }
1142                 return result;
1143             }
1144 
1145             @Override
1146             public String toString() {
1147                 return this.list.size() + "/" + this.capacity;
1148             }
1149 
1150         }
1151 
1152     }
1153 
1154 }