1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro.util;
15
16 import java.io.BufferedReader;
17 import java.io.EOFException;
18 import java.io.IOException;
19 import java.io.InputStream;
20 import java.io.InputStreamReader;
21 import java.io.OutputStream;
22 import java.nio.charset.Charset;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.function.Consumer;
29
30 import javax.annotation.Nullable;
31
32 import org.openrdf.model.BNode;
33 import org.openrdf.model.Literal;
34 import org.openrdf.model.Resource;
35 import org.openrdf.model.Statement;
36 import org.openrdf.model.URI;
37 import org.openrdf.model.Value;
38 import org.openrdf.model.ValueFactory;
39 import org.openrdf.model.vocabulary.XMLSchema;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 public abstract class Sorter<T> implements AutoCloseable {
44
/** Logger shared by the sorter and its nested codec classes. */
private static final Logger LOGGER = LoggerFactory.getLogger(Sorter.class);

/** Dictionary used to compress URIs, datatypes and languages; created by start(). */
@Nullable
private Dictionary dictionary;

/** The external sort process launched by start(). */
@Nullable
private Process sortProcess;

/** Standard input of the sort process: encoded records are written here. */
@Nullable
private OutputStream sortOut;

/** Standard output of the sort process: sorted records are read from here. */
@Nullable
private InputStream sortIn;

/** Progress tracker for records emitted towards the sort process. */
@Nullable
private Tracker writeTracker;

/** Progress tracker for records decoded from the sort process. */
@Nullable
private Tracker readTracker;

/** Every per-thread Output created so far; flushed and closed by end(). */
@Nullable
private List<Output> outputs;

/** Per-thread Output; null before start() and after end(), which emit() relies on. */
@Nullable
private ThreadLocal<Output> threadOutput;

/** One Input per decoder, created by end(). */
@Nullable
private List<Input> inputs;

/** Counted down once by each decoder when it terminates; awaited by end(). */
@Nullable
private CountDownLatch decodersLatch;

/**
 * First failure observed while ending the sort. Volatile because it is read and written by
 * concurrently running decoder threads (see tryDecode()).
 */
@Nullable
private volatile Throwable exception;

/** Whether start() may still be called; cleared by start() itself and by close(). */
private boolean startable;

/**
 * Creates a sorter for RDF statements.
 *
 * @param compress whether URIs, datatypes and languages are compressed via a dictionary
 * @return a new sorter, not yet started
 */
public static Sorter<Statement> newStatementSorter(final boolean compress) {
    return new StatementSorter(compress);
}

/**
 * Creates a sorter for fixed-schema tuples.
 *
 * @param compress whether RDF values are compressed via a dictionary
 * @param schema the class of each tuple field; each must be Statement, Value, String or Long
 * @return a new sorter, not yet started
 * @throws IllegalArgumentException if a schema class is not supported
 */
public static Sorter<Object[]> newTupleSorter(final boolean compress, final Class<?>... schema) {
    return new TupleSorter(compress, schema);
}

/** Creates a sorter that can be started once. */
protected Sorter() {
    this.startable = true;
}
94
95 public void start(final boolean deduplicate) throws IOException {
96
97
98 synchronized (this) {
99 if (!this.startable) {
100 throw new IllegalArgumentException();
101 }
102 this.startable = false;
103 }
104
105
106 this.dictionary = new Dictionary();
107
108
109 this.outputs = new ArrayList<Output>();
110 this.threadOutput = new ThreadLocal<Output>() {
111
112 @Override
113 protected Output initialValue() {
114 final OutputStream out = IO.parallelBuffer(Sorter.this.sortOut, (byte) 0);
115 final Output output = new Output(out, Sorter.this.dictionary);
116 synchronized (Sorter.this.outputs) {
117 Sorter.this.outputs.add(output);
118 }
119 return output;
120 }
121
122 };
123
124
125 final List<String> command = new ArrayList<String>(Arrays.asList(Environment.getProperty(
126 "rdfpro.cmd.sort", "sort").split("\\s+")));
127 command.add("-z");
128 if (deduplicate) {
129 command.add("-u");
130 }
131 final ProcessBuilder builder = new ProcessBuilder(command);
132 builder.environment().put("LC_ALL", "C");
133 this.sortProcess = builder.start();
134
135
136 this.sortOut = this.sortProcess.getOutputStream();
137 this.sortIn = this.sortProcess.getInputStream();
138
139
140 Environment.getPool().submit(new Runnable() {
141
142 @Override
143 public void run() {
144 final BufferedReader in = new BufferedReader(new InputStreamReader(
145 Sorter.this.sortProcess.getErrorStream(), Charset.forName("UTF-8")));
146 try {
147 String line;
148 while ((line = in.readLine()) != null) {
149 LOGGER.error("[sort] {}", line);
150 }
151 } catch (final Throwable ex) {
152 LOGGER.error("[sort] failed to read from stream", ex);
153 } finally {
154 IO.closeQuietly(in);
155 }
156 }
157
158 });
159
160
161 this.writeTracker = new Tracker(LOGGER, null,
162 "%d records to sort (%d rec/s avg)",
163 "%d records to sort (%d rec/s, %d rec/s avg)");
164 this.readTracker = new Tracker(LOGGER, null,
165 "%d records from sort (%d rec/s avg)",
166 "%d records from sort (%d rec/s, %d rec/s avg)");
167
168
169 this.writeTracker.start();
170 }
171
172 public void emit(final T element) throws IOException {
173 final Output output;
174 try {
175 output = this.threadOutput.get();
176 } catch (final NullPointerException ex) {
177 throw new IllegalStateException();
178 }
179 encode(output, element);
180 output.endRecord();
181 this.writeTracker.increment();
182 }
183
184 public void end(final boolean parallelize, final Consumer<T> consumer) throws IOException {
185
186
187 synchronized (this) {
188 if (this.threadOutput == null) {
189 throw new IllegalStateException();
190 }
191 this.threadOutput = null;
192 this.writeTracker.end();
193 }
194
195
196 LOGGER.debug("Dictionary status:\n{}", this.dictionary);
197
198 try {
199
200 try {
201 for (final Output output : this.outputs) {
202 output.close();
203 }
204 this.outputs.clear();
205 } finally {
206 this.sortOut.close();
207 }
208
209
210 final int decoders = parallelize ? Environment.getCores() : 1;
211 this.decodersLatch = new CountDownLatch(decoders);
212 this.inputs = new ArrayList<Input>();
213 this.readTracker.start();
214
215
216
217
218 for (int i = 0; i < decoders; ++i) {
219 final InputStream in = IO.parallelBuffer(this.sortIn, (byte) 0);
220 this.inputs.add(new Input(in, this.dictionary));
221 }
222 for (int i = 1; i < decoders; ++i) {
223 final Input input = this.inputs.get(i);
224 Environment.getPool().execute(new Runnable() {
225
226 @Override
227 public void run() {
228 tryDecode(input, consumer);
229 }
230
231 });
232 }
233 tryDecode(this.inputs.get(0), consumer);
234
235 this.decodersLatch.await();
236 this.readTracker.end();
237
238 } catch (final Throwable ex) {
239 this.exception = ex;
240
241 } finally {
242
243 IO.closeQuietly(this.sortIn);
244 if (this.inputs != null) {
245 for (final Input input : this.inputs) {
246 input.close();
247 }
248 }
249 if (this.exception != null) {
250 if (this.exception instanceof IOException) {
251 throw (IOException) this.exception;
252 } else if (this.exception instanceof RuntimeException) {
253 throw (RuntimeException) this.exception;
254 } else if (this.exception instanceof Error) {
255 throw (Error) this.exception;
256 }
257 throw new RuntimeException(this.exception);
258 }
259 }
260 }
261
262 private void tryDecode(final Input input, final Consumer<T> consumer) {
263 try {
264 while (input.nextRecord()) {
265 final T element = decode(input);
266 consumer.accept(element);
267 this.readTracker.increment();
268 }
269 input.close();
270 } catch (final Throwable ex) {
271 if (this.exception == null) {
272 this.exception = ex;
273 for (final Input r : this.inputs) {
274 r.close();
275 }
276 }
277 } finally {
278 this.decodersLatch.countDown();
279 }
280 }
281
282 @Override
283 public void close() {
284 try {
285
286 if (this.sortProcess != null) {
287 this.sortProcess.destroy();
288 }
289
290 } catch (final Throwable ex) {
291 LOGGER.error("Exception caught while killing sort process", ex);
292
293 } finally {
294
295 this.startable = false;
296 this.dictionary = null;
297 this.sortProcess = null;
298 this.sortOut = null;
299 this.sortIn = null;
300 this.outputs = null;
301 this.threadOutput = null;
302 this.inputs = null;
303 this.decodersLatch = null;
304 this.exception = null;
305 }
306 }
307
308 protected abstract void encode(Output output, final T element) throws IOException;
309
310 protected abstract T decode(Input input) throws IOException;
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328 public static final class Output {
329
330 private final OutputStream out;
331
332 private final Dictionary dictionary;
333
334 private final int[] remaining;
335
336 Output(final OutputStream out, final Dictionary dictionary) {
337 this.out = out;
338 this.dictionary = dictionary;
339 this.remaining = new int[] { -1 };
340 }
341
342 void endRecord() throws IOException {
343 this.out.write(0);
344 }
345
346 void close() throws IOException {
347 this.out.close();
348 }
349
350 public final void writeStatement(@Nullable final Statement statement,
351 final boolean compress) throws IOException {
352 if (statement == null) {
353 writeValue(null, compress);
354 } else {
355 writeValue(statement.getSubject(), compress);
356 writeValue(statement.getPredicate(), compress);
357 writeValue(statement.getObject(), compress);
358 writeValue(statement.getContext(), compress);
359 }
360 }
361
362 public final void writeValue(@Nullable final Value value, final boolean compress)
363 throws IOException {
364 if (value == null) {
365 write(1);
366 } else if (value instanceof BNode) {
367 writeStringHelper(((BNode) value).getID(), 0, 1);
368 } else if (value instanceof Literal) {
369 final Literal lit = (Literal) value;
370 final String lang = lit.getLanguage();
371 if (lang == null) {
372 final URI dt = lit.getDatatype();
373 if (dt == null || XMLSchema.STRING.equals(dt)) {
374 writeStringHelper(lit.getLabel(), 0, 2);
375 } else {
376 final int key = !compress ? -1 : this.dictionary.encodeDatatype(dt);
377 if (key < 0) {
378 writeStringHelper(lit.getLabel(), 0, 3);
379 writeStringHelper(dt.stringValue(), 0, 1);
380 } else {
381 writeStringHelper(lit.getLabel(), 0, 4);
382 writeNumber(key);
383 }
384 }
385 } else {
386 final int key = !compress ? -1 : this.dictionary.encodeLanguage(lang);
387 if (key < 0) {
388 writeStringHelper(lit.getLabel(), 0, 3);
389 writeStringHelper(lang, 0, 2);
390 } else {
391 writeStringHelper(lit.getLabel(), 0, 5);
392 writeNumber(key);
393 }
394 }
395 } else if (value instanceof URI) {
396 final URI uri = (URI) value;
397 boolean done = false;
398 if (compress) {
399 final int key = this.dictionary.encodeURI(uri, this.remaining);
400 if (key >= 0) {
401 final int r = this.remaining[0];
402 if (r < 0) {
403 write(7);
404 writeNumber(key);
405 done = true;
406 } else {
407 writeStringHelper(uri.stringValue(), r, 7);
408 writeNumber(key);
409 done = true;
410 }
411 }
412 }
413 if (!done) {
414 writeStringHelper(uri.stringValue(), 0, 6);
415 }
416 }
417 }
418
419 public final void writeString(@Nullable final String s) throws IOException {
420 if (s == null) {
421 write(0x02);
422 } else {
423 writeStringHelper(s, 0, 0x01);
424 }
425 }
426
427 public final void writeNumber(final long num) throws IOException {
428 if (num < 0L || num > 0x1FFFFFFFFFFFFFFL ) {
429 writeNumberHelper(10, 0xFE, num);
430 } else if (num <= 0x3FL ) {
431 writeNumberHelper(1, 0x40, num);
432 } else if (num <= 0x1FFFL ) {
433 writeNumberHelper(2, 0x80, num);
434 } else if (num <= 0x3FFFFL ) {
435 writeNumberHelper(3, 0xC0, num);
436 } else if (num <= 0x1FFFFFFL ) {
437 writeNumberHelper(4, 0xD0, num);
438 } else if (num <= 0xFFFFFFFFL ) {
439 writeNumberHelper(5, 0xE0, num);
440 } else if (num <= 0x1FFFFFFFFFL ) {
441 writeNumberHelper(6, 0xF0, num);
442 } else if (num <= 0xFFFFFFFFFFFL ) {
443 writeNumberHelper(7, 0xF4, num);
444 } else if (num <= 0x7FFFFFFFFFFFFL ) {
445 writeNumberHelper(8, 0xF8, num);
446 } else {
447 writeNumberHelper(9, 0xFC, num);
448 }
449 }
450
451
452
453
454
455 private void writeStringHelper(final String s, final int offset, final int delimiter)
456 throws IOException {
457 final int len = s.length();
458 for (int i = offset; i < len; ++i) {
459 int c = s.charAt(i);
460 if (c <= 0x07) {
461 c += 0xFFFF;
462 }
463 if (c <= 0x7F) {
464 write(c);
465 } else if (c <= 0x1FFF) {
466 write(0x80 | c >> 7);
467 write(0x80 | c & 0x7F);
468 } else {
469 write(0xC0 | c >> 14);
470 write(0x80 | c >> 7 & 0x7F);
471 write(0x80 | c & 0x7F);
472 }
473 }
474 write(delimiter);
475 }
476
477 private void writeNumberHelper(final int len, final int mask, final long num)
478 throws IOException {
479 write(mask | (int) (num >>> (len - 1) * 7));
480 for (int i = len - 2; i >= 0; --i) {
481 write(0x80 | (int) (num >>> i * 7 & 0x7F));
482 }
483 }
484
485 private void write(final int b) throws IOException {
486 assert (b & 0xFF) != 0;
487 this.out.write(b);
488 }
489
490 }
491
492 public static final class Input {
493
494 private final InputStream in;
495
496 private final Dictionary dictionary;
497
498 private final StringBuilder builder;
499
500 private int c;
501
502 Input(final InputStream in, final Dictionary dictionary) {
503 this.in = in;
504 this.dictionary = dictionary;
505 this.builder = new StringBuilder();
506 this.c = 0;
507 }
508
509 boolean nextRecord() throws IOException {
510 while (this.c != 0) {
511 LOGGER.warn("Skipping " + this.c);
512 this.c = this.in.read();
513 if (this.c < 0) {
514 throw new EOFException("EOF found before completing read of record");
515 }
516 }
517 this.c = this.in.read();
518 if (this.c < 0) {
519 return false;
520 }
521 if (this.c == 0) {
522 throw new Error("Empty record!");
523 }
524 return true;
525 }
526
527 void close() {
528 IO.closeQuietly(this.in);
529 }
530
531 public final boolean isEOF() {
532 return this.c <= 0;
533 }
534
535 @Nullable
536 public final Statement readStatement() throws IOException {
537
538 final Resource s = (Resource) readValue();
539 if (s == null) {
540 return null;
541 }
542
543 final URI p = (URI) readValue();
544 final Value o = readValue();
545 final Resource c = (Resource) readValue();
546
547 final ValueFactory vf = Statements.VALUE_FACTORY;
548 return c == null ? vf.createStatement(s, p, o) : vf.createStatement(s, p, o, c);
549 }
550
551 @Nullable
552 public final Value readValue() throws IOException {
553 final int delim = readStringHelper();
554 if (delim == 1 && this.builder.length() == 0) {
555 return null;
556 }
557 final String s = this.builder.toString();
558 final ValueFactory vf = Statements.VALUE_FACTORY;
559 if (delim == 1) {
560 return vf.createBNode(this.builder.toString());
561 } else if (delim == 2) {
562 return vf.createLiteral(s);
563 } else if (delim == 3) {
564 final int delim2 = readStringHelper();
565 final String s2 = this.builder.toString();
566 if (delim2 == 1) {
567 return vf.createLiteral(s, vf.createURI(s2));
568 } else {
569 return vf.createLiteral(s, s2);
570 }
571 } else if (delim == 4) {
572 final int key = (int) readNumber();
573 final URI dt = this.dictionary.decodeDatatype(key);
574 return vf.createLiteral(s, dt);
575 } else if (delim == 5) {
576 final int key = (int) readNumber();
577 final String lang = this.dictionary.decodeLanguage(key);
578 return vf.createLiteral(s, lang);
579 } else if (delim == 6) {
580 return vf.createURI(s);
581 } else if (delim == 7) {
582 final int key = (int) readNumber();
583 return this.dictionary.decodeURI(key, s.isEmpty() ? null : s);
584 }
585 throw new IllegalArgumentException("Invalid value delimiter: " + delim);
586 }
587
588 @Nullable
589 public final String readString() throws IOException {
590 final int delimiter = readStringHelper();
591 if (delimiter == 0x02) {
592 return null;
593 } else if (delimiter != 0x01) {
594 throw new IOException("Found invalid string delimiter: " + delimiter);
595 }
596 return this.builder.toString();
597 }
598
599 public final long readNumber() throws IOException {
600 final int b = read();
601 if (b <= 0x40 + 0x3F) {
602 return readNumberHelper(1, b & 0x3F);
603 } else if (b <= 0x80 + 0x3F) {
604 return readNumberHelper(2, b & 0x3F);
605 } else if (b <= 0xC0 + 0x0F) {
606 return readNumberHelper(3, b & 0x0F);
607 } else if (b <= 0xD0 + 0x0F) {
608 return readNumberHelper(4, b & 0x0F);
609 } else if (b <= 0xE0 + 0x0F) {
610 return readNumberHelper(5, b & 0x0F);
611 } else if (b <= 0xF0 + 0x03) {
612 return readNumberHelper(6, b & 0x03);
613 } else if (b <= 0xF4 + 0x03) {
614 return readNumberHelper(7, b & 0x03);
615 } else if (b <= 0xF8 + 0x03) {
616 return readNumberHelper(8, b & 0x03);
617 } else if (b <= 0xFC + 0x01) {
618 return readNumberHelper(9, b & 0x01);
619 } else {
620 return readNumberHelper(10, b & 0x01);
621 }
622 }
623
624 private int readStringHelper() throws IOException {
625 this.builder.setLength(0);
626 while (true) {
627 final int c = read();
628 if (c <= 0x07) {
629 return c;
630 } else if (c <= 0x7F) {
631 this.builder.append((char) c);
632 } else if (c <= 0xBF) {
633 final int c1 = read();
634 final int n = (c & 0x3F) << 7 | c1 & 0x7F;
635 this.builder.append((char) n);
636 } else {
637 final int c1 = read();
638 final int c2 = read();
639 int n = (c & 0x3F) << 14 | (c1 & 0x7F) << 7 | c2 & 0x7F;
640 if (n > 0xFFFF) {
641 n = n - 0xFFFF;
642 }
643 this.builder.append((char) n);
644 }
645 }
646 }
647
648 private long readNumberHelper(final int len, final int start) throws IOException {
649 long num = start;
650 for (int i = 1; i < len; ++i) {
651 final int c = read();
652 num = num << 7 | c & 0x7F;
653 }
654 return num;
655 }
656
657 private int read() throws IOException {
658 final int result = this.c;
659 if (result <= 0) {
660 throw new EOFException("Byte is " + result);
661 }
662 this.c = this.in.read();
663 return result;
664 }
665
666 }
667
668 private static final class StatementSorter extends Sorter<Statement> {
669
670 private final boolean compress;
671
672 StatementSorter(final boolean compress) {
673 this.compress = compress;
674 }
675
676 @Override
677 protected void encode(final Output output, final Statement record) throws IOException {
678 output.writeStatement(record, this.compress);
679 }
680
681 @Override
682 protected Statement decode(final Input input) throws IOException {
683 return input.readStatement();
684 }
685
686 }
687
688 private static final class TupleSorter extends Sorter<Object[]> {
689
690 private static final int TYPE_STATEMENT = 0;
691
692 private static final int TYPE_VALUE = 1;
693
694 private static final int TYPE_STRING = 2;
695
696 private static final int TYPE_NUMBER = 3;
697
698 private final boolean compress;
699
700 private final int[] schema;
701
702 TupleSorter(final boolean compress, final Class<?>... schema) {
703 this.compress = compress;
704 this.schema = new int[schema.length];
705 for (int i = 0; i < schema.length; ++i) {
706 final Class<?> clazz = schema[i];
707 if (Statement.class.equals(clazz)) {
708 this.schema[i] = TYPE_STATEMENT;
709 } else if (Value.class.equals(clazz)) {
710 this.schema[i] = TYPE_VALUE;
711 } else if (String.class.equals(clazz)) {
712 this.schema[i] = TYPE_STRING;
713 } else if (Long.class.equals(clazz)) {
714 this.schema[i] = TYPE_NUMBER;
715 } else {
716 throw new IllegalArgumentException("Unsupported tuple field: " + clazz);
717 }
718 }
719 }
720
721 @Override
722 protected void encode(final Output output, final Object[] record) throws IOException {
723 for (int i = 0; i < this.schema.length; ++i) {
724 final Object field = record[i];
725 final int type = this.schema[i];
726 switch (type) {
727 case TYPE_STATEMENT:
728 output.writeStatement((Statement) field, this.compress);
729 break;
730 case TYPE_VALUE:
731 output.writeValue((Value) field, this.compress);
732 break;
733 case TYPE_STRING:
734 output.writeString((String) field);
735 break;
736 case TYPE_NUMBER:
737 output.writeNumber(((Number) field).longValue());
738 break;
739 default:
740 throw new Error("Unexpected type " + type);
741 }
742 }
743 }
744
745 @Override
746 protected Object[] decode(final Input input) throws IOException {
747 final Object[] record = new Object[this.schema.length];
748 for (int i = 0; i < this.schema.length; ++i) {
749 final int type = this.schema[i];
750 switch (type) {
751 case TYPE_STATEMENT:
752 record[i] = input.readStatement();
753 break;
754 case TYPE_VALUE:
755 record[i] = input.readValue();
756 break;
757 case TYPE_STRING:
758 record[i] = input.readString();
759 break;
760 case TYPE_NUMBER:
761 record[i] = input.readNumber();
762 break;
763 default:
764 throw new Error("Unexpected type " + type);
765 }
766 }
767 return record;
768 }
769
770 }
771
772 private static final class Dictionary {
773
774 private static final int LANGUAGE_INDEX_SIZE = 1024;
775
776 private static final int DATATYPE_INDEX_SIZE = 1024;
777
778 private static final int NAMESPACE_INDEX_SIZE = 256 * 1024;
779
780 private static final int VOCAB_INDEX_SIZE = 64 * 1024;
781
782 private static final int OTHER_INDEX_SIZE = 4 * 1024;
783
784 private static final int URI_CACHE_SIZE = 8191;
785
786 private final GenericIndex<String> languageIndex;
787
788 private final GenericIndex<URI> datatypeIndex;
789
790 private final StringIndex namespaceIndex;
791
792 private final StringIndex vocabNameIndex;
793
794 private final StringIndex otherNameIndex;
795
796 private final int vocabNamespaces;
797
798 private final int[] uriCacheCodes;
799
800 private final URI[] uriCacheURIs;
801
802 private final Object[] uriCacheLocks;
803
804 public Dictionary() {
805 this.languageIndex = new GenericIndex<String>(LANGUAGE_INDEX_SIZE);
806 this.datatypeIndex = new GenericIndex<URI>(DATATYPE_INDEX_SIZE);
807 this.namespaceIndex = new StringIndex(NAMESPACE_INDEX_SIZE);
808 this.vocabNameIndex = new StringIndex(VOCAB_INDEX_SIZE);
809 this.otherNameIndex = new StringIndex(OTHER_INDEX_SIZE);
810
811 for (final String ns : Namespaces.DEFAULT.uris()) {
812 int nsHash = 0;
813 for (int i = ns.length() - 1; i >= 0; --i) {
814 final int c = ns.charAt(i);
815 nsHash = nsHash * 31 + c;
816 }
817 this.namespaceIndex.put(ns, 0, ns.length(), nsHash, false);
818 }
819 this.vocabNamespaces = Namespaces.DEFAULT.uris().size();
820
821 this.vocabNameIndex.put("", 0, 0, 0, false);
822 this.otherNameIndex.put("", 0, 0, 0, false);
823
824 this.uriCacheCodes = new int[URI_CACHE_SIZE];
825 this.uriCacheURIs = new URI[URI_CACHE_SIZE];
826 this.uriCacheLocks = new Object[32];
827 for (int i = 0; i < 32; ++i) {
828 this.uriCacheLocks[i] = new Object();
829 }
830 }
831
832 public int encodeLanguage(final String language) {
833 return this.languageIndex.put(language);
834 }
835
836 public String decodeLanguage(final int code) {
837 return this.languageIndex.get(code);
838 }
839
840 public int encodeDatatype(final URI datatype) {
841 return this.datatypeIndex.put(datatype);
842 }
843
844 public URI decodeDatatype(final int code) {
845 return this.datatypeIndex.get(code);
846 }
847
848 public int encodeURI(final URI uri, final int[] remaining) {
849
850 final String s = uri.stringValue();
851 final int len = s.length();
852
853 int nameHash = 0;
854 int nsLen = len;
855 while (--nsLen >= 0) {
856 final int c = s.charAt(nsLen);
857 if (c == '#' || c == '/' || c == ':') {
858 break;
859 }
860 nameHash = nameHash * 31 + c;
861 }
862
863 int nsHash = 0;
864 for (int i = nsLen; i >= 0; --i) {
865 final int c = s.charAt(i);
866 nsHash = nsHash * 31 + c;
867 }
868 ++nsLen;
869
870 final int nsKey = this.namespaceIndex.put(s, 0, nsLen, nsHash, false);
871 if (nsKey < 0) {
872 remaining[0] = -1;
873 return -1;
874 }
875
876 if (nsKey < this.vocabNamespaces) {
877 final int nameKey = this.vocabNameIndex.put(s, nsLen, len, nameHash, true);
878 if (nameKey < 0) {
879 remaining[0] = nsLen;
880 return nsKey;
881 } else {
882 final int code = nameKey * this.vocabNamespaces + nsKey << 1;
883 remaining[0] = -1;
884 return code;
885 }
886 }
887
888 final int nameKey = this.otherNameIndex.put(s, nsLen, len, nameHash, false);
889 if (nameKey < 0) {
890 remaining[0] = nsLen;
891 return nsKey;
892 } else {
893 final int code = nameKey * NAMESPACE_INDEX_SIZE + nsKey << 1 | 0x01;
894 remaining[0] = -1;
895 return code;
896 }
897 }
898
899 public URI decodeURI(final int code, final String remaining) {
900
901 if (remaining != null && !remaining.isEmpty()) {
902 final String ns = this.namespaceIndex.get(code);
903 return Statements.VALUE_FACTORY.createURI(ns, remaining);
904 }
905
906 final int offset = code % this.uriCacheURIs.length;
907 final Object lock = this.uriCacheLocks[offset % this.uriCacheLocks.length];
908 synchronized (lock) {
909 final int cachedCode = this.uriCacheCodes[offset];
910 if (cachedCode == code) {
911 return this.uriCacheURIs[offset];
912 }
913 }
914
915 final boolean isVocab = (code & 0x01) == 0;
916 final int c = code >>> 1;
917
918 URI uri;
919 if (isVocab) {
920 final int nsKey = c % this.vocabNamespaces;
921 final int nameKey = c / this.vocabNamespaces;
922 final String ns = this.namespaceIndex.get(nsKey);
923 final String name = this.vocabNameIndex.get(nameKey);
924 uri = Statements.VALUE_FACTORY.createURI(ns, name);
925 } else {
926 final int nsKey = c % NAMESPACE_INDEX_SIZE;
927 final int nameKey = c / NAMESPACE_INDEX_SIZE;
928 final String ns = this.namespaceIndex.get(nsKey);
929 final String name = this.otherNameIndex.get(nameKey);
930 uri = Statements.VALUE_FACTORY.createURI(ns, name);
931 }
932
933 synchronized (lock) {
934 this.uriCacheURIs[offset] = uri;
935 this.uriCacheCodes[offset] = code;
936 }
937
938 return uri;
939 }
940
941 @Override
942 public String toString() {
943 final StringBuilder builder = new StringBuilder();
944 builder.append("language index: ").append(this.languageIndex).append("\n");
945 builder.append("datatype index: ").append(this.datatypeIndex).append("\n");
946 builder.append("namespace index: ").append(this.namespaceIndex).append("\n");
947 builder.append("vocab index: ").append(this.vocabNameIndex).append("\n");
948 builder.append("other index: ").append(this.otherNameIndex);
949 return builder.toString();
950 }
951
952 private static final class StringIndex {
953
954 private static final int NUM_LOCKS = 32;
955
956 private final int capacity;
957
958 private final AtomicInteger size;
959
960 private final List<String> list;
961
962 private final int[] table;
963
964 private final Object[] locks;
965
966 StringIndex(final int capacity) {
967 this.capacity = capacity;
968 this.size = new AtomicInteger(0);
969 this.list = new ArrayList<String>();
970 this.table = new int[2 * (this.capacity * 4 - 1)];
971 this.locks = new Object[NUM_LOCKS];
972 for (int i = 0; i < NUM_LOCKS; ++i) {
973 this.locks[i] = new Object();
974 }
975 }
976
977 @Nullable
978 public int put(final String string, final int begin, final int end, final int hash,
979 final boolean likelyExist) {
980
981 final int tableSize = this.table.length / 2;
982 final int segmentSize = (tableSize + NUM_LOCKS - 1) / NUM_LOCKS;
983
984 int index = (hash & 0x7FFFFFFF) % tableSize;
985
986 final int segment = index / segmentSize;
987 final int segmentStart = segment * segmentSize;
988 final int segmentEnd = Math.min(tableSize, segmentStart + segmentSize);
989
990 final boolean full = this.size.get() >= this.capacity;
991
992
993 if (full || likelyExist) {
994 for (int i = 0; i < segmentSize; ++i) {
995 final int offset = index * 2;
996 final int key = this.table[offset] - 1;
997 if (key < 0) {
998 if (full) {
999 return -1;
1000 }
1001 break;
1002 } else if (hash == this.table[offset + 1]
1003 && equals(this.list.get(key), string, begin, end)) {
1004 return key;
1005 }
1006 ++index;
1007 if (index >= segmentEnd) {
1008 index = segmentStart;
1009 }
1010 }
1011 }
1012
1013
1014
1015 synchronized (this.locks[segment]) {
1016 for (int i = 0; i < segmentSize; ++i) {
1017 final int offset = index * 2;
1018 final int key = this.table[offset] - 1;
1019 if (key < 0) {
1020 synchronized (this.list) {
1021 final int newKey = this.size.get();
1022 if (newKey >= this.capacity) {
1023 return -1;
1024 }
1025 this.list.add(string.substring(begin, end));
1026 this.table[offset] = newKey + 1;
1027 this.table[offset + 1] = hash;
1028 this.size.incrementAndGet();
1029 return newKey;
1030 }
1031 } else if (hash == this.table[offset + 1]
1032 && equals(this.list.get(key), string, begin, end)) {
1033 return key;
1034 }
1035 ++index;
1036 if (index >= segmentEnd) {
1037 index = segmentStart;
1038 }
1039 }
1040 }
1041
1042 return -1;
1043 }
1044
1045 @Nullable
1046 public String get(final int key) {
1047 final String result = this.list.get(key);
1048 if (result == null) {
1049 throw new IllegalArgumentException("No element for key " + key);
1050 }
1051 return result;
1052 }
1053
1054 @Override
1055 public String toString() {
1056 return this.size.get() + "/" + this.capacity;
1057 }
1058
1059 private boolean equals(final String reference, final String string, final int begin,
1060 final int end) {
1061 final int len = reference.length();
1062 if (len == end - begin) {
1063 int i = len;
1064 int j = end;
1065 while (--i >= 0) {
1066 --j;
1067 if (reference.charAt(i) != string.charAt(j)) {
1068 return false;
1069 }
1070 }
1071 return true;
1072 }
1073 return false;
1074 }
1075
1076 }
1077
1078 private static final class GenericIndex<T> {
1079
1080 private final int capacity;
1081
1082 private final List<T> list;
1083
1084 private final int[] table;
1085
1086 GenericIndex(final int capacity) {
1087 this.capacity = capacity;
1088 this.list = new ArrayList<T>();
1089 this.table = new int[2 * (this.capacity * 4 - 1)];
1090 }
1091
1092 @Nullable
1093 public int put(final T element) {
1094
1095 final int tableSize = this.table.length / 2;
1096 final int hash = element.hashCode();
1097 int index = (hash & 0x7FFFFFFF) % tableSize;
1098
1099
1100
1101 while (true) {
1102 final int offset = index * 2;
1103 final int key = this.table[offset] - 1;
1104 if (key < 0) {
1105 break;
1106 } else if (hash == this.table[offset + 1]
1107 && element.equals(this.list.get(key))) {
1108 return key;
1109 }
1110 index = (index + 1) % tableSize;
1111 }
1112
1113
1114 synchronized (this.list) {
1115 while (true) {
1116 final int offset = index * 2;
1117 final int key = this.table[offset] - 1;
1118 if (key < 0) {
1119 final int newKey = this.list.size();
1120 if (newKey >= this.capacity) {
1121 return -1;
1122 }
1123 this.list.add(element);
1124 this.table[offset] = newKey + 1;
1125 this.table[offset + 1] = hash;
1126 return newKey;
1127 } else if (hash == this.table[offset + 1]
1128 && element.equals(this.list.get(key))) {
1129 return key;
1130 }
1131 index = (index + 1) % tableSize;
1132 }
1133 }
1134 }
1135
1136 @Nullable
1137 public T get(final int key) {
1138 final T result = this.list.get(key);
1139 if (result == null) {
1140 throw new IllegalArgumentException("No element for key " + key);
1141 }
1142 return result;
1143 }
1144
1145 @Override
1146 public String toString() {
1147 return this.list.size() + "/" + this.capacity;
1148 }
1149
1150 }
1151
1152 }
1153
1154 }