1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro.util;
15
16 import java.io.File;
17 import java.io.FileInputStream;
18 import java.io.FileOutputStream;
19 import java.io.FilterOutputStream;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.io.Reader;
24 import java.io.Writer;
25 import java.lang.ProcessBuilder.Redirect;
26 import java.net.MalformedURLException;
27 import java.net.URISyntaxException;
28 import java.net.URL;
29 import java.nio.ByteBuffer;
30 import java.nio.CharBuffer;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Objects;
35 import java.util.WeakHashMap;
36 import java.util.concurrent.ArrayBlockingQueue;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41 import javax.annotation.Nullable;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46
47
48
49
50
51 public final class IO {
52
53 private static final Logger LOGGER = LoggerFactory.getLogger(IO.class);
54
55
56
57
58
59
60
61
62
63
64
65
66 private static final int BUFFER_SIZE = Integer.parseInt(Environment.getProperty(
67 "rdfpro.buffer.size", "" + 64 * 1024));
68
69
70
71
72
73
74
75
76 private static final int BUFFER_NUM_READ = Integer.parseInt(Environment.getProperty(
77 "rdfpro.buffer.numr", "256"));
78
79 private static final int BUFFER_NUM_WRITE = Integer.parseInt(Environment.getProperty(
80 "rdfpro.buffer.numw", "16"));
81
82 @Nullable
83 public static <T> T closeQuietly(@Nullable final T object) {
84 if (object instanceof AutoCloseable) {
85 try {
86 ((AutoCloseable) object).close();
87 } catch (final Throwable ex) {
88 LOGGER.error("Error closing " + object.getClass().getSimpleName(), ex);
89 }
90 }
91 return object;
92 }
93
94 public static URL extractURL(final String location) {
95 Objects.requireNonNull(location);
96 try {
97 final int index = location.indexOf(':');
98 if (index < 0) {
99 return new File(location).toURI().toURL();
100 }
101 String s = location.charAt(0) != '.' ? location : location.substring(index + 1);
102 if (s.startsWith("classpath:")) {
103 s = s.substring("classpath:".length());
104 return Objects.requireNonNull(IO.class.getResource(s));
105 } else {
106 try {
107 return new URL(s);
108 } catch (final MalformedURLException ex) {
109 return new File(s).toURI().toURL();
110 }
111 }
112 } catch (final Throwable ex) {
113 throw new IllegalArgumentException("Cannot extract URL from '" + location + "'", ex);
114 }
115 }
116
117 public static String extractExtension(final String location) {
118 Objects.requireNonNull(location);
119 final int index = location.indexOf(':');
120 int extEnd = location.length();
121 if (index >= 0) {
122 if (location.charAt(0) == '.') {
123 return location.substring(0, index);
124 }
125 int index2 = location.lastIndexOf('#');
126 index2 = index2 >= 0 ? index2 : location.length();
127 extEnd = location.lastIndexOf('?', index2);
128 extEnd = extEnd >= 0 ? extEnd : index2;
129 }
130 final int nameStart = Math.max(-1, location.lastIndexOf('/', extEnd)) + 1;
131 final int extStart = location.indexOf('.', nameStart);
132 return extStart < 0 ? "" : location.substring(extStart, extEnd);
133 }
134
135 public static InputStream read(final String location) throws IOException {
136
137 final String ext = extractExtension(location);
138 final URL url = extractURL(location);
139
140 String cmd = null;
141 if (ext.endsWith(".bz2")) {
142 cmd = Environment.getProperty("rdfpro.cmd.bzip2", "bzip2") + " -dck";
143 } else if (ext.endsWith(".gz")) {
144 cmd = Environment.getProperty("rdfpro.cmd.gzip", "gzip") + " -dc";
145 } else if (ext.endsWith(".xz")) {
146 cmd = Environment.getProperty("rdfpro.cmd.xz", "xz") + " -dc";
147 } else if (ext.endsWith(".7z")) {
148 cmd = Environment.getProperty("rdfpro.cmd.7za", "7za") + " -so e";
149 } else if (ext.endsWith(".lz4")) {
150 cmd = Environment.getProperty("rdfpro.cmd.lz4", "lz4") + " -dc";
151 }
152
153 if ("file".equals(url.getProtocol())) {
154 final File file;
155 try {
156 file = new File(url.toURI());
157 } catch (final URISyntaxException ex) {
158 throw new IllegalArgumentException("Invalid file:// URL: " + location);
159 }
160
161 if (cmd == null) {
162 LOGGER.debug("Reading file {}", file);
163 return new FileInputStream(file);
164
165 } else {
166 LOGGER.debug("Reading file {} using {}", file, cmd);
167 cmd += " " + file.getAbsolutePath();
168 final Process process = new ProcessBuilder(cmd.split("\\s+"))
169 .redirectError(Redirect.INHERIT).start();
170 return process.getInputStream();
171 }
172
173 } else {
174 final InputStream stream = url.openStream();
175 if (cmd == null) {
176 LOGGER.debug("Downloading file {}", url);
177 return stream;
178
179 } else {
180 LOGGER.debug("Downloading file {} using {}", url, cmd);
181 final Process process = new ProcessBuilder(cmd.split("\\s+"))
182 .redirectError(Redirect.INHERIT).start();
183 Environment.getPool().execute(new Runnable() {
184
185 @Override
186 public void run() {
187 try {
188 final byte[] buffer = new byte[8 * 1024];
189 while (true) {
190 final int count = stream.read(buffer);
191 if (count < 0) {
192 break;
193 }
194 process.getOutputStream().write(buffer, 0, count);
195 }
196 process.getOutputStream().close();
197
198 } catch (final Throwable ex) {
199 LOGGER.error("Error reading from " + url, ex);
200 process.destroy();
201 } finally {
202 closeQuietly(stream);
203 }
204 }
205 });
206 return process.getInputStream();
207 }
208 }
209 }
210
211 public static OutputStream write(final String location) throws IOException {
212
213 final String ext = extractExtension(location);
214 final URL url = extractURL(location);
215
216 if (!"file".equals(url.getProtocol())) {
217 throw new IllegalArgumentException("Cannot write to non-file URL " + location);
218 }
219
220 final File file;
221 try {
222 file = new File(url.toURI());
223 } catch (final URISyntaxException ex) {
224 throw new IllegalArgumentException("Invalid file:// URL: " + location);
225 }
226
227 final String cmd;
228 if (ext.endsWith(".bz2")) {
229 cmd = Environment.getProperty("rdfpro.cmd.bzip2", "bzip2") + " -c -9";
230 } else if (ext.endsWith(".gz")) {
231 cmd = Environment.getProperty("rdfpro.cmd.gzip", "gzip") + " -c -9";
232 } else if (ext.endsWith(".xz")) {
233 cmd = Environment.getProperty("rdfpro.cmd.xz", "xz") + " -c -9";
234 } else if (ext.endsWith(".lz4")) {
235 cmd = Environment.getProperty("rdfpro.cmd.lz4", "lz4") + " -c -9";
236 } else {
237 cmd = null;
238 }
239
240 if (cmd == null) {
241 LOGGER.debug("Writing file {}", file);
242 return new FileOutputStream(file);
243
244 } else {
245 LOGGER.debug("Writing file {} using {}", file, cmd);
246 final Process process = new ProcessBuilder(cmd.split("\\s+"))
247 .redirectOutput(file).redirectError(Redirect.INHERIT).start();
248 return new FilterOutputStream(process.getOutputStream()) {
249
250 private final AtomicBoolean closed = new AtomicBoolean(false);
251
252 @Override
253 public void write(final int b) throws IOException {
254 this.out.write(b);
255 }
256
257 @Override
258 public void write(final byte[] b) throws IOException {
259 this.out.write(b);
260 }
261
262 @Override
263 public void write(final byte[] b, final int off, final int len) throws IOException {
264 this.out.write(b, off, len);
265 }
266
267 @Override
268 public void close() throws IOException {
269 if (this.closed.compareAndSet(false, true)) {
270 LOGGER.debug("Completing '{}'", cmd);
271 this.out.flush();
272 this.out.close();
273 try {
274 final int code = process.waitFor();
275 LOGGER.debug("Process completed with exit code {}", code);
276 } catch (final InterruptedException ex) {
277 throw new IOException("Didn't wait till IO completion", ex);
278 } finally {
279 process.destroy();
280 }
281 }
282 }
283
284 };
285 }
286 }
287
288 public static InputStream buffer(final InputStream stream) {
289 return new SimpleBufferedInputStream(stream);
290 }
291
292 public static OutputStream buffer(final OutputStream stream) {
293 return new SimpleBufferedOutputStream(stream);
294 }
295
296 public static Reader buffer(final Reader reader) {
297 return new SimpleBufferedReader(reader);
298 }
299
300 public static Writer buffer(final Writer writer) {
301 return new SimpleBufferedWriter(writer);
302 }
303
304 public static InputStream parallelBuffer(final InputStream stream, final byte delimiter) {
305 return new ParallelBufferedInputStream(stream, delimiter);
306 }
307
308 public static OutputStream parallelBuffer(final OutputStream stream, final byte delimiter) {
309 return new ParallelBufferedOutputStream(stream, delimiter);
310 }
311
312 public static Reader parallelBuffer(final Reader reader, final char delimiter) {
313 return new ParallelBufferedReader(reader, delimiter);
314 }
315
316 public static Writer parallelBuffer(final Writer writer, final char delimiter) {
317 return new ParallelBufferedWriter(writer, delimiter);
318 }
319
320 public static Reader utf8Reader(final InputStream stream) {
321 return new UTF8Reader(stream);
322 }
323
324 public static Writer utf8Writer(final OutputStream stream) {
325 return new UTF8Writer(stream);
326 }
327
328 private static void propagate(final Throwable ex) throws IOException {
329 if (ex instanceof RuntimeException) {
330 throw (RuntimeException) ex;
331 } else if (ex instanceof Error) {
332 throw (Error) ex;
333 } else if (ex instanceof IOException) {
334 throw (IOException) ex;
335 }
336 throw new IOException(ex);
337 }
338
/** Private constructor: all the members of this class are static. */
private IO() {
    super();
}
341
342 private static final class SimpleBufferedInputStream extends InputStream {
343
344 private final InputStream stream;
345
346 private final byte buffer[];
347
348 private int count;
349
350 private int pos;
351
352 private boolean closed;
353
354 public SimpleBufferedInputStream(final InputStream stream) {
355 this.stream = Objects.requireNonNull(stream);
356 this.buffer = new byte[BUFFER_SIZE];
357 this.count = 0;
358 this.pos = 0;
359 this.closed = false;
360 }
361
362 @Override
363 public int read() throws IOException {
364 if (this.pos >= this.count) {
365 fill();
366 if (this.pos >= this.count) {
367 return -1;
368 }
369 }
370 return this.buffer[this.pos++] & 0xFF;
371 }
372
373 @Override
374 public int read(final byte buf[], int off, int len) throws IOException {
375 if ((off | len | off + len | buf.length - (off + len)) < 0) {
376 throw new IndexOutOfBoundsException();
377 }
378 if (len == 0) {
379 checkNotClosed();
380 return 0;
381 }
382 int result = 0;
383 while (true) {
384 final int available = this.count - this.pos;
385 if (available > 0) {
386 final int n = available > len ? len : available;
387 System.arraycopy(this.buffer, this.pos, buf, off, n);
388 this.pos += n;
389 off += n;
390 len -= n;
391 result += n;
392 if (len == 0 || this.stream.available() == 0) {
393 return result;
394 }
395 }
396 if (len >= BUFFER_SIZE) {
397 final int n = this.stream.read(buf, off, len);
398 result += n < 0 ? 0 : n;
399 return result == 0 ? -1 : result;
400 } else if (len > 0) {
401 fill();
402 if (this.count == 0) {
403 return result == 0 ? -1 : result;
404 }
405 }
406 }
407 }
408
409 @Override
410 public long skip(final long n) throws IOException {
411 if (n <= 0) {
412 checkNotClosed();
413 return 0;
414 }
415 final int available = this.count - this.pos;
416 if (available <= 0) {
417 return this.stream.skip(n);
418 }
419 final long skipped = available < n ? available : n;
420 this.pos += skipped;
421 return skipped;
422 }
423
424 @Override
425 public int available() throws IOException {
426 final int n = this.count - this.pos;
427 final int available = this.stream.available();
428 return n > Integer.MAX_VALUE - available ? Integer.MAX_VALUE : n + available;
429 }
430
431 @Override
432 public void reset() throws IOException {
433 throw new IOException("Mark not supported");
434 }
435
436 @Override
437 public void mark(final int readlimit) {
438 }
439
440 @Override
441 public boolean markSupported() {
442 return false;
443 }
444
445 @Override
446 public void close() throws IOException {
447 synchronized (this.buffer) {
448 if (this.closed) {
449 return;
450 }
451 this.closed = true;
452 }
453 this.count = this.pos;
454 this.stream.close();
455 }
456
457 private void fill() throws IOException {
458 checkNotClosed();
459 final int n = this.stream.read(this.buffer);
460 this.count = n < 0 ? 0 : n;
461 this.pos = 0;
462 }
463
464 private void checkNotClosed() throws IOException {
465 if (this.closed) {
466 throw new IOException("Stream has been closed");
467 }
468 }
469
470 }
471
472 private static final class SimpleBufferedOutputStream extends OutputStream {
473
474 private final OutputStream stream;
475
476 private final byte[] buffer;
477
478 private int count;
479
480 private boolean closed;
481
482 SimpleBufferedOutputStream(final OutputStream stream) {
483 this.stream = Objects.requireNonNull(stream);
484 this.buffer = new byte[BUFFER_SIZE];
485 this.count = 0;
486 this.closed = false;
487 }
488
489 @Override
490 public void write(final int b) throws IOException {
491 if (this.count >= BUFFER_SIZE) {
492 flushBuffer();
493 }
494 this.buffer[this.count++] = (byte) b;
495 }
496
497 @Override
498 public void write(final byte buf[], int off, int len) throws IOException {
499 if (len >= BUFFER_SIZE) {
500 flushBuffer();
501 this.stream.write(buf, off, len);
502 return;
503 }
504 final int available = BUFFER_SIZE - this.count;
505 if (available < len) {
506 System.arraycopy(buf, off, this.buffer, this.count, available);
507 this.count += available;
508 off += available;
509 len -= available;
510 flushBuffer();
511 }
512 System.arraycopy(buf, off, this.buffer, this.count, len);
513 this.count += len;
514 }
515
516 @Override
517 public void flush() throws IOException {
518 flushBuffer();
519 this.stream.flush();
520 }
521
522 @Override
523 public void close() throws IOException {
524 synchronized (this.buffer) {
525 if (this.closed) {
526 return;
527 }
528 this.closed = true;
529 }
530 flushBuffer();
531 this.stream.close();
532
533 }
534
535 private void flushBuffer() throws IOException {
536 if (this.count > 0) {
537 this.stream.write(this.buffer, 0, this.count);
538 this.count = 0;
539 }
540 }
541
542 }
543
544 private static final class SimpleBufferedReader extends Reader {
545
546 private final Reader reader;
547
548 private final char[] buffer;
549
550 private int count;
551
552 private int pos;
553
554 private boolean closed;
555
556 public SimpleBufferedReader(final Reader reader) {
557 this.reader = reader;
558 this.buffer = new char[BUFFER_SIZE];
559 this.count = 0;
560 this.pos = 0;
561 this.closed = false;
562 }
563
564 @Override
565 public int read() throws IOException {
566 if (this.pos >= this.count) {
567 fill();
568 if (this.pos >= this.count) {
569 return -1;
570 }
571 }
572 return this.buffer[this.pos++] & 0xFFFF;
573 }
574
575 @Override
576 public int read(final char[] cbuf, final int off, final int len) throws IOException {
577 if ((off | len | off + len | cbuf.length - (off + len)) < 0) {
578 throw new IndexOutOfBoundsException();
579 }
580 if (len == 0) {
581 checkNotClosed();
582 return 0;
583 }
584 int available = this.count - this.pos;
585 if (available == 0) {
586 if (len >= BUFFER_SIZE) {
587 return this.reader.read(cbuf, off, len);
588 } else {
589 fill();
590 available = this.count - this.pos;
591 if (available == 0) {
592 return -1;
593 }
594 }
595 }
596 final int n = available > len ? len : available;
597 System.arraycopy(this.buffer, this.pos, cbuf, off, n);
598 this.pos += n;
599 return n;
600 }
601
602 @Override
603 public long skip(final long n) throws IOException {
604 if (n <= 0) {
605 checkNotClosed();
606 return 0;
607 }
608 final int available = this.count - this.pos;
609 if (available == 0) {
610 return this.reader.skip(n);
611 }
612 final long skipped = available < n ? available : n;
613 this.pos += skipped;
614 return skipped;
615 }
616
617 @Override
618 public void reset() throws IOException {
619 throw new IOException("Mark not supported");
620 }
621
622 @Override
623 public void mark(final int readlimit) {
624 }
625
626 @Override
627 public boolean markSupported() {
628 return false;
629 }
630
631 @Override
632 public void close() throws IOException {
633 synchronized (this.buffer) {
634 if (this.closed) {
635 return;
636 }
637 this.closed = true;
638 }
639 this.closed = true;
640 this.count = this.pos;
641 this.reader.close();
642 }
643
644 private void fill() throws IOException {
645 checkNotClosed();
646 final int n = this.reader.read(this.buffer);
647 this.count = n < 0 ? 0 : n;
648 this.pos = 0;
649 }
650
651 private void checkNotClosed() throws IOException {
652 if (this.closed) {
653 throw new IOException("Reader has been closed");
654 }
655 }
656
657 }
658
659 private static final class SimpleBufferedWriter extends Writer {
660
661 private final Writer writer;
662
663 private final char[] buffer;
664
665 private int count;
666
667 private boolean closed;
668
669 SimpleBufferedWriter(final Writer writer) {
670 this.writer = Objects.requireNonNull(writer);
671 this.buffer = new char[BUFFER_SIZE];
672 this.count = 0;
673 this.closed = false;
674 }
675
676 @Override
677 public void write(final int c) throws IOException {
678 if (this.count >= BUFFER_SIZE) {
679 flushBuffer();
680 }
681 this.buffer[this.count++] = (char) c;
682 }
683
684 @Override
685 public void write(final char[] cbuf, int off, int len) throws IOException {
686 if (len >= BUFFER_SIZE) {
687 flushBuffer();
688 this.writer.write(cbuf, off, len);
689 return;
690 }
691 final int available = BUFFER_SIZE - this.count;
692 if (available < len) {
693 System.arraycopy(cbuf, off, this.buffer, this.count, available);
694 this.count += available;
695 off += available;
696 len -= available;
697 flushBuffer();
698 }
699 System.arraycopy(cbuf, off, this.buffer, this.count, len);
700 this.count += len;
701 }
702
703 @Override
704 public void write(final String str, int off, int len) throws IOException {
705 if (len >= BUFFER_SIZE) {
706 flushBuffer();
707 this.writer.write(str, off, len);
708 return;
709 }
710 final int available = BUFFER_SIZE - this.count;
711 if (available < len) {
712 str.getChars(off, off + available, this.buffer, this.count);
713 this.count += available;
714 off += available;
715 len -= available;
716 flushBuffer();
717 }
718 str.getChars(off, off + len, this.buffer, this.count);
719 this.count += len;
720 }
721
722 @Override
723 public void flush() throws IOException {
724 flushBuffer();
725 this.writer.flush();
726 }
727
728 @Override
729 public void close() throws IOException {
730 synchronized (this.buffer) {
731 if (this.closed) {
732 return;
733 }
734 this.closed = true;
735 }
736 flushBuffer();
737 this.writer.close();
738
739 }
740
741 private void flushBuffer() throws IOException {
742 if (this.count > 0) {
743 this.writer.write(this.buffer, 0, this.count);
744 this.count = 0;
745 }
746 }
747
748 }
749
750 private static final class ParallelBufferedReader extends Reader {
751
752 private Fetcher fetcher;
753
754 private final List<CharBuffer> buffers;
755
756 private int index;
757
758 private char[] buffer;
759
760 private int count;
761
762 private int pos;
763
764 private boolean closed;
765
766 ParallelBufferedReader(final Reader reader, final char delimiter) {
767 this.fetcher = Fetcher.forReader(reader, delimiter);
768 this.buffers = new ArrayList<CharBuffer>();
769 this.index = 0;
770 this.buffer = null;
771 this.count = 0;
772 this.pos = 0;
773 this.closed = false;
774 this.fetcher.open();
775 }
776
777 @Override
778 public int read() throws IOException {
779 if (this.pos >= this.count) {
780 fill();
781 if (this.count == 0) {
782 return -1;
783 }
784 }
785 return this.buffer[this.pos++] & 0xFFFF;
786 }
787
788 @Override
789 public int read(final char[] cbuf, final int off, final int len) throws IOException {
790 if ((off | len | off + len | cbuf.length - (off + len)) < 0) {
791 throw new IndexOutOfBoundsException();
792 }
793 if (len == 0) {
794 checkNotClosed();
795 return 0;
796 }
797 final int available = this.count - this.pos;
798 if (available == 0) {
799 fill();
800 if (this.count == 0) {
801 return -1;
802 }
803 }
804 final int n = available > len ? len : available;
805 System.arraycopy(this.buffer, this.pos, cbuf, off, n);
806 this.pos += n;
807 return n;
808 }
809
810 @Override
811 public long skip(final long n) throws IOException {
812 if (n <= 0) {
813 checkNotClosed();
814 return 0;
815 }
816 int available = this.count - this.pos;
817 if (available == 0) {
818 fill();
819 available = this.count;
820 }
821 final long skipped = available < n ? available : n;
822 this.pos += skipped;
823 return skipped;
824 }
825
826 @Override
827 public void reset() throws IOException {
828 throw new IOException("Mark not supported");
829 }
830
831 @Override
832 public void mark(final int readlimit) {
833 }
834
835 @Override
836 public boolean markSupported() {
837 return false;
838 }
839
840 @Override
841 public void close() throws IOException {
842 synchronized (this.buffers) {
843 if (this.closed) {
844 return;
845 }
846 this.closed = true;
847 }
848 this.count = this.pos;
849 this.buffers.clear();
850 this.fetcher.close();
851 this.fetcher = null;
852 }
853
854 private void fill() throws IOException {
855 checkNotClosed();
856 if (this.buffer != null) {
857 this.buffer = null;
858 this.pos = 0;
859 this.count = 0;
860 }
861 if (this.index == this.buffers.size()) {
862 this.fetcher.fetch(this.buffers);
863 this.index = 0;
864 }
865 if (this.index < this.buffers.size()) {
866 final CharBuffer cb = this.buffers.get(this.index++);
867 this.buffer = cb.array();
868 this.count = cb.limit();
869 }
870 }
871
872 private void checkNotClosed() throws IOException {
873 if (this.closed) {
874 throw new IOException("Reader has been closed");
875 }
876 }
877
878 private static final class Fetcher implements Runnable {
879
880 private static final Map<Reader, Fetcher> FETCHERS = new WeakHashMap<Reader, Fetcher>();
881
882 private static final Object EOF = new Object();
883
884 private final BlockingQueue<Object> queue;
885
886 private Reader reader;
887
888 private final char delimiter;
889
890 private final List<CharBuffer> buffers;
891
892 private int references;
893
894 private Throwable exception;
895
896 private final CountDownLatch latch;
897
898 private Fetcher(final Reader reader, final char delimiter) {
899 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_READ, false);
900 this.reader = reader;
901 this.delimiter = delimiter;
902 this.buffers = new ArrayList<CharBuffer>();
903 this.references = 0;
904 this.exception = null;
905 this.latch = new CountDownLatch(1);
906
907 Environment.getPool().submit(this);
908 }
909
910 private void release(final CharBuffer buffer) {
911 synchronized (this.buffers) {
912 if (this.buffers.size() < BUFFER_NUM_READ + Environment.getCores() + 1) {
913 buffer.clear();
914 this.buffers.add(buffer);
915 }
916 }
917 }
918
919 private CharBuffer allocate() {
920 synchronized (this.buffers) {
921 if (!this.buffers.isEmpty()) {
922 return this.buffers.remove(this.buffers.size() - 1);
923 }
924 }
925 return CharBuffer.allocate(BUFFER_SIZE);
926 }
927
928 public void open() {
929 synchronized (this) {
930 if (this.references < 0) {
931 throw new IllegalStateException("Reader has been closed");
932 }
933 ++this.references;
934 }
935 }
936
937 public void close() throws IOException {
938 synchronized (this) {
939 --this.references;
940 if (this.references != 0) {
941 return;
942 }
943 this.references = -1;
944 }
945 this.queue.clear();
946 while (true) {
947 try {
948 this.latch.await();
949 break;
950 } catch (final InterruptedException ex) {
951
952 }
953 }
954 synchronized (FETCHERS) {
955 FETCHERS.remove(this.reader);
956 }
957 this.queue.clear();
958 this.buffers.clear();
959 this.reader = null;
960 synchronized (this) {
961 if (this.exception != null) {
962 propagate(this.exception);
963 }
964 }
965 }
966
967 @SuppressWarnings("unchecked")
968 public void fetch(final List<CharBuffer> buffers) throws IOException {
969 try {
970 synchronized (this) {
971 if (this.exception != null) {
972 throw this.exception;
973 }
974 }
975 for (final CharBuffer buffer : buffers) {
976 release(buffer);
977 }
978 buffers.clear();
979 final Object object = this.queue.take();
980 if (object == EOF) {
981 this.queue.add(EOF);
982 return;
983 }
984 buffers.addAll((List<CharBuffer>) object);
985 } catch (IOException | RuntimeException | Error ex) {
986 throw ex;
987 } catch (final Throwable ex) {
988 throw new IOException(ex);
989 }
990 }
991
992 @Override
993 public void run() {
994
995 try {
996 CharBuffer restBuffer = allocate();
997 List<CharBuffer> buffers = new ArrayList<CharBuffer>();
998
999 boolean eof = false;
1000 while (!eof) {
1001
1002 synchronized (this) {
1003 if (this.references < 0) {
1004 break;
1005 }
1006 }
1007
1008 final CharBuffer curBuffer = restBuffer;
1009 while (!eof && curBuffer.hasRemaining()) {
1010 final int n = this.reader.read(curBuffer);
1011 eof = n < 0;
1012 }
1013 curBuffer.flip();
1014 buffers.add(curBuffer);
1015
1016 restBuffer = allocate();
1017 if (!eof) {
1018 final char[] curChars = curBuffer.array();
1019 final int curLastIndex = curBuffer.limit() - 1;
1020 for (int i = curLastIndex; i >= 0; --i) {
1021 if (curChars[i] == this.delimiter) {
1022 restBuffer.position(curLastIndex - i);
1023 System.arraycopy(curChars, i + 1, restBuffer.array(), 0,
1024 restBuffer.position());
1025 curBuffer.limit(i + 1);
1026 this.queue.put(buffers);
1027 buffers = new ArrayList<CharBuffer>();
1028 break;
1029 }
1030 }
1031 }
1032 }
1033
1034 this.queue.put(buffers);
1035
1036 } catch (final Throwable ex) {
1037 synchronized (this) {
1038 this.exception = ex;
1039 }
1040 }
1041
1042 try {
1043 closeQuietly(this.reader);
1044
1045 while (true) {
1046 try {
1047 this.queue.put(EOF);
1048 break;
1049 } catch (final InterruptedException ex) {
1050
1051 }
1052 }
1053 } finally {
1054 this.latch.countDown();
1055 }
1056 }
1057
1058 public static Fetcher forReader(final Reader reader, final char delimiter) {
1059 synchronized (FETCHERS) {
1060 Fetcher fetcher = FETCHERS.get(reader);
1061 if (fetcher == null) {
1062 fetcher = new Fetcher(reader, delimiter);
1063 FETCHERS.put(reader, fetcher);
1064 } else if (fetcher.delimiter != delimiter) {
1065 throw new IllegalStateException("Already reading from reader " + reader
1066 + " using delimiter " + delimiter);
1067 }
1068 return fetcher;
1069 }
1070 }
1071
1072 }
1073
1074 }
1075
1076 private static final class ParallelBufferedWriter extends Writer {
1077
1078 private Emitter emitter;
1079
1080 private final char delimiter;
1081
1082 private final List<CharBuffer> buffers;
1083
1084 private char[] buffer;
1085
1086 private int count;
1087
1088 private int threshold;
1089
1090 private boolean closed;
1091
1092 ParallelBufferedWriter(final Writer writer, final char delimiter) {
1093 this.emitter = Emitter.forWriter(writer);
1094 this.delimiter = delimiter;
1095 this.buffers = new ArrayList<CharBuffer>();
1096 this.buffer = new char[2 * BUFFER_SIZE];
1097 this.count = 0;
1098 this.threshold = BUFFER_SIZE;
1099 this.closed = false;
1100 this.emitter.open();
1101 }
1102
1103 @Override
1104 public void write(final int c) throws IOException {
1105 if (this.count < this.threshold) {
1106 this.buffer[this.count++] = (char) c;
1107 } else {
1108 writeAndTryFlush((char) c);
1109 }
1110 }
1111
1112 @Override
1113 public void write(final char[] cbuf, int off, int len) throws IOException {
1114 final int available = this.threshold - this.count;
1115 if (available >= len) {
1116 System.arraycopy(cbuf, off, this.buffer, this.count, len);
1117 this.count += len;
1118 return;
1119 }
1120 if (available > 0) {
1121 System.arraycopy(cbuf, off, this.buffer, this.count, available);
1122 this.count += available;
1123 off += available;
1124 len -= available;
1125 }
1126 final int end = off + len;
1127 while (off < end) {
1128 writeAndTryFlush(cbuf[off++]);
1129 }
1130 }
1131
1132 @Override
1133 public void write(final String str, int off, int len) throws IOException {
1134 final int available = this.threshold - this.count;
1135 final int end = off + len;
1136 if (available >= len) {
1137 str.getChars(off, end, this.buffer, this.count);
1138 this.count += len;
1139 return;
1140 }
1141 if (available > 0) {
1142 str.getChars(off, off + available, this.buffer, this.count);
1143 this.count += available;
1144 off += available;
1145 len -= available;
1146 }
1147 while (off < end) {
1148 writeAndTryFlush(str.charAt(off++));
1149 }
1150 }
1151
1152 @Override
1153 public void flush() throws IOException {
1154 flushBuffers();
1155 }
1156
1157 @Override
1158 public void close() throws IOException {
1159 synchronized (this.buffers) {
1160 if (this.closed) {
1161 return;
1162 }
1163 flushBuffers();
1164 this.closed = true;
1165 }
1166 this.buffers.clear();
1167 this.buffer = null;
1168 this.emitter.close();
1169 this.emitter = null;
1170 }
1171
1172 private void writeAndTryFlush(final char c) throws IOException {
1173 this.buffer[this.count++] = c;
1174 if (c == this.delimiter) {
1175 flushBuffers();
1176 } else if (this.count == this.buffer.length) {
1177 checkNotClosed();
1178 this.buffers.add(CharBuffer.wrap(this.buffer));
1179 this.buffer = new char[BUFFER_SIZE];
1180 this.count = 0;
1181 this.threshold = 0;
1182 }
1183 }
1184
1185 private void flushBuffers() throws IOException {
1186 checkNotClosed();
1187 if (this.count > 0) {
1188 final CharBuffer cb = CharBuffer.wrap(this.buffer);
1189 cb.limit(this.count);
1190 this.buffers.add(cb);
1191 }
1192 this.emitter.emit(this.buffers);
1193 if (!this.buffers.isEmpty()) {
1194 this.buffer = this.buffers.get(0).array();
1195 this.buffers.clear();
1196 }
1197 this.count = 0;
1198 this.threshold = BUFFER_SIZE;
1199 }
1200
1201 private void checkNotClosed() throws IOException {
1202 if (this.closed) {
1203 throw new IOException("Writer has been closed");
1204 }
1205 }
1206
1207 private static class Emitter implements Runnable {
1208
1209 private static final Map<Writer, Emitter> EMITTERS = new WeakHashMap<Writer, Emitter>();
1210
1211 private static final Object EOF = new Object();
1212
1213 private final BlockingQueue<Object> queue;
1214
1215 private final List<CharBuffer> buffers;
1216
1217 private Writer writer;
1218
1219 private int references;
1220
1221 private Throwable exception;
1222
1223 private final CountDownLatch latch;
1224
1225 private Emitter(final Writer writer) {
1226 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_WRITE, false);
1227 this.writer = writer;
1228 this.buffers = new ArrayList<CharBuffer>();
1229 this.references = 0;
1230 this.exception = null;
1231 this.latch = new CountDownLatch(1);
1232 Environment.getPool().submit(this);
1233 }
1234
1235 private void release(final CharBuffer buffer) {
1236 synchronized (this.buffers) {
1237 if (this.buffers.size() < BUFFER_NUM_WRITE + Environment.getCores() + 1) {
1238 buffer.clear();
1239 this.buffers.add(buffer);
1240 }
1241 }
1242 }
1243
1244 private CharBuffer allocate() {
1245 synchronized (this.buffers) {
1246 if (!this.buffers.isEmpty()) {
1247 return this.buffers.remove(this.buffers.size() - 1);
1248 }
1249 }
1250 return CharBuffer.allocate(2 * BUFFER_SIZE);
1251 }
1252
1253 public void open() {
1254 synchronized (this) {
1255 if (this.references < 0) {
1256 throw new IllegalStateException("Stream has been closed");
1257 }
1258 ++this.references;
1259 }
1260 }
1261
1262 public void close() throws IOException {
1263 synchronized (this) {
1264 --this.references;
1265 if (this.references != 0) {
1266 return;
1267 }
1268 this.references = -1;
1269 }
1270 while (true) {
1271 try {
1272 this.queue.put(EOF);
1273 break;
1274 } catch (final InterruptedException ex) {
1275
1276 }
1277 }
1278 while (true) {
1279 try {
1280 this.latch.await();
1281 break;
1282 } catch (final InterruptedException ex) {
1283
1284 }
1285 }
1286 synchronized (EMITTERS) {
1287 EMITTERS.remove(this.writer);
1288 }
1289 this.queue.clear();
1290 this.buffers.clear();
1291 this.writer = null;
1292 synchronized (this) {
1293 if (this.exception != null) {
1294 propagate(this.exception);
1295 }
1296 }
1297 }
1298
1299 public void emit(final List<CharBuffer> buffers) throws IOException {
1300 try {
1301 synchronized (this) {
1302 if (this.exception != null) {
1303 throw this.exception;
1304 }
1305 }
1306 this.queue.put(new ArrayList<CharBuffer>(buffers));
1307 buffers.clear();
1308 buffers.add(allocate());
1309 } catch (IOException | RuntimeException | Error ex) {
1310 throw ex;
1311 } catch (final Throwable ex) {
1312 throw new IOException(ex);
1313 }
1314 }
1315
1316 @SuppressWarnings("unchecked")
1317 @Override
1318 public void run() {
1319 try {
1320 while (true) {
1321 final Object object = this.queue.take();
1322 if (object == EOF) {
1323 break;
1324 }
1325 final List<CharBuffer> buffers = (List<CharBuffer>) object;
1326 for (final CharBuffer buffer : buffers) {
1327 this.writer.write(buffer.array(), buffer.position(), buffer.limit());
1328 }
1329 if (!buffers.isEmpty()) {
1330 release(buffers.get(0));
1331 }
1332 }
1333 } catch (final Throwable ex) {
1334 synchronized (this) {
1335 this.exception = ex;
1336 }
1337 this.queue.clear();
1338 } finally {
1339 closeQuietly(this.writer);
1340 this.latch.countDown();
1341 }
1342 }
1343
1344 public static Emitter forWriter(final Writer writer) {
1345 synchronized (EMITTERS) {
1346 Emitter manager = EMITTERS.get(writer);
1347 if (manager == null) {
1348 manager = new Emitter(writer);
1349 EMITTERS.put(writer, manager);
1350 }
1351 return manager;
1352 }
1353 }
1354
1355 }
1356
1357 }
1358
1359 private static final class ParallelBufferedInputStream extends InputStream {
1360
1361 private Fetcher fetcher;
1362
1363 private final List<ByteBuffer> buffers;
1364
1365 private int index;
1366
1367 private byte[] buffer;
1368
1369 private int count;
1370
1371 private int pos;
1372
1373 private boolean closed;
1374
1375 ParallelBufferedInputStream(final InputStream stream, final byte delimiter) {
1376 this.fetcher = Fetcher.forStream(stream, delimiter);
1377 this.buffers = new ArrayList<ByteBuffer>();
1378 this.index = 0;
1379 this.buffer = null;
1380 this.count = 0;
1381 this.pos = 0;
1382 this.closed = false;
1383 this.fetcher.open();
1384 }
1385
1386 @Override
1387 public int read() throws IOException {
1388 if (this.pos >= this.count) {
1389 fill();
1390 if (this.count == 0) {
1391 return -1;
1392 }
1393 }
1394 return this.buffer[this.pos++] & 0xFF;
1395 }
1396
1397 @Override
1398 public int read(final byte[] buf, final int off, final int len) throws IOException {
1399 if ((off | len | off + len | buf.length - (off + len)) < 0) {
1400 throw new IndexOutOfBoundsException();
1401 }
1402 if (len == 0) {
1403 checkNotClosed();
1404 return 0;
1405 }
1406 final int available = this.count - this.pos;
1407 if (available == 0) {
1408 fill();
1409 if (this.count == 0) {
1410 return -1;
1411 }
1412 }
1413 final int n = available > len ? len : available;
1414 System.arraycopy(this.buffer, this.pos, buf, off, n);
1415 this.pos += n;
1416 return n;
1417 }
1418
1419 @Override
1420 public long skip(final long n) throws IOException {
1421 if (n <= 0) {
1422 checkNotClosed();
1423 return 0;
1424 }
1425 int available = this.count - this.pos;
1426 if (available == 0) {
1427 fill();
1428 available = this.count;
1429 }
1430 final long skipped = available < n ? available : n;
1431 this.pos += skipped;
1432 return skipped;
1433 }
1434
1435 @Override
1436 public void reset() throws IOException {
1437 throw new IOException("Mark not supported");
1438 }
1439
1440 @Override
1441 public void mark(final int readlimit) {
1442 }
1443
1444 @Override
1445 public boolean markSupported() {
1446 return false;
1447 }
1448
1449 @Override
1450 public void close() throws IOException {
1451 synchronized (this.buffers) {
1452 if (this.closed) {
1453 return;
1454 }
1455 this.closed = true;
1456 }
1457 this.count = this.pos;
1458 this.buffers.clear();
1459 this.fetcher.close();
1460 this.fetcher = null;
1461 }
1462
1463 private void fill() throws IOException {
1464 checkNotClosed();
1465 if (this.buffer != null) {
1466 this.buffer = null;
1467 this.pos = 0;
1468 this.count = 0;
1469 }
1470 if (this.index == this.buffers.size()) {
1471 this.fetcher.fetch(this.buffers);
1472 this.index = 0;
1473 }
1474 if (this.index < this.buffers.size()) {
1475 final ByteBuffer buffer = this.buffers.get(this.index++);
1476 this.buffer = buffer.array();
1477 this.count = buffer.limit();
1478 }
1479 }
1480
1481 private void checkNotClosed() throws IOException {
1482 if (this.closed) {
1483 throw new IOException("Stream has been closed");
1484 }
1485 }
1486
1487 private static final class Fetcher implements Runnable {
1488
1489 private static final Map<InputStream, Fetcher> FETCHERS = new WeakHashMap<InputStream, Fetcher>();
1490
1491 private static final Object EOF = new Object();
1492
1493 private final BlockingQueue<Object> queue;
1494
1495 private InputStream stream;
1496
1497 private final byte delimiter;
1498
1499 private final List<ByteBuffer> buffers;
1500
1501 private int references;
1502
1503 private Throwable exception;
1504
1505 private final CountDownLatch latch;
1506
1507 private Fetcher(final InputStream stream, final byte delimiter) {
1508 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_READ, false);
1509 this.stream = stream;
1510 this.delimiter = delimiter;
1511 this.buffers = new ArrayList<ByteBuffer>();
1512 this.references = 0;
1513 this.exception = null;
1514 this.latch = new CountDownLatch(1);
1515
1516 Environment.getPool().submit(this);
1517 }
1518
1519 private void release(final ByteBuffer buffer) {
1520 synchronized (this.buffers) {
1521 if (this.buffers.size() < BUFFER_NUM_READ + Environment.getCores() + 1) {
1522 buffer.clear();
1523 this.buffers.add(buffer);
1524 }
1525 }
1526 }
1527
1528 private ByteBuffer allocate() {
1529 synchronized (this.buffers) {
1530 if (!this.buffers.isEmpty()) {
1531 return this.buffers.remove(this.buffers.size() - 1);
1532 }
1533 }
1534 return ByteBuffer.allocate(2 * BUFFER_SIZE);
1535 }
1536
1537 public void open() {
1538 synchronized (this) {
1539 if (this.references < 0) {
1540 throw new IllegalStateException("Reader has been closed");
1541 }
1542 ++this.references;
1543 }
1544 }
1545
1546 public void close() throws IOException {
1547 synchronized (this) {
1548 --this.references;
1549 if (this.references != 0) {
1550 return;
1551 }
1552 this.references = -1;
1553 }
1554 this.queue.clear();
1555 while (true) {
1556 try {
1557 this.latch.await();
1558 break;
1559 } catch (final InterruptedException ex) {
1560
1561 }
1562 }
1563 synchronized (FETCHERS) {
1564 FETCHERS.remove(this.stream);
1565 }
1566 this.queue.clear();
1567 this.buffers.clear();
1568 this.stream = null;
1569 synchronized (this) {
1570 if (this.exception != null) {
1571 propagate(this.exception);
1572 }
1573 }
1574 }
1575
1576 @SuppressWarnings("unchecked")
1577 public void fetch(final List<ByteBuffer> buffers) throws IOException {
1578 try {
1579 synchronized (this) {
1580 if (this.exception != null) {
1581 throw this.exception;
1582 }
1583 }
1584 for (final ByteBuffer buffer : buffers) {
1585 release(buffer);
1586 }
1587 buffers.clear();
1588 final Object object = this.queue.take();
1589 if (object == EOF) {
1590 this.queue.add(EOF);
1591 return;
1592 }
1593 buffers.addAll((List<ByteBuffer>) object);
1594 } catch (IOException | RuntimeException | Error ex) {
1595 throw ex;
1596 } catch (final Throwable ex) {
1597 throw new IOException(ex);
1598 }
1599 }
1600
1601 @Override
1602 public void run() {
1603
1604 try {
1605 ByteBuffer restBuffer = allocate();
1606 List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
1607
1608 boolean eof = false;
1609 while (!eof) {
1610
1611 synchronized (this) {
1612 if (this.references < 0) {
1613 break;
1614 }
1615 }
1616
1617 final ByteBuffer curBuffer = restBuffer;
1618 final byte[] array = curBuffer.array();
1619
1620 while (!eof && curBuffer.hasRemaining()) {
1621 final int offset = curBuffer.position();
1622 final int len = curBuffer.remaining();
1623 final int n = this.stream.read(array, offset, len);
1624 eof = n < 0;
1625 if (!eof) {
1626 curBuffer.position(offset + n);
1627 }
1628 }
1629
1630 curBuffer.flip();
1631 buffers.add(curBuffer);
1632
1633 restBuffer = allocate();
1634 if (!eof) {
1635 final int curLastIndex = curBuffer.limit() - 1;
1636 for (int i = curLastIndex; i >= 0; --i) {
1637 if (array[i] == this.delimiter) {
1638 restBuffer.position(curLastIndex - i);
1639 System.arraycopy(array, i + 1, restBuffer.array(), 0,
1640 restBuffer.position());
1641 curBuffer.limit(i + 1);
1642 this.queue.put(buffers);
1643 buffers = new ArrayList<ByteBuffer>();
1644 break;
1645 }
1646 }
1647 }
1648 }
1649
1650 this.queue.put(buffers);
1651
1652 } catch (final Throwable ex) {
1653 synchronized (this) {
1654 this.exception = ex;
1655 }
1656 }
1657
1658 try {
1659 closeQuietly(this.stream);
1660
1661 while (true) {
1662 try {
1663 this.queue.put(EOF);
1664 break;
1665 } catch (final InterruptedException ex) {
1666
1667 }
1668 }
1669 } finally {
1670 this.latch.countDown();
1671 }
1672 }
1673
1674 public static Fetcher forStream(final InputStream stream, final byte delimiter) {
1675 synchronized (FETCHERS) {
1676 Fetcher fetcher = FETCHERS.get(stream);
1677 if (fetcher == null) {
1678 fetcher = new Fetcher(stream, delimiter);
1679 FETCHERS.put(stream, fetcher);
1680 } else if (fetcher.delimiter != delimiter) {
1681 throw new IllegalStateException("Already reading from stream " + stream
1682 + " using delimiter " + delimiter);
1683 }
1684 return fetcher;
1685 }
1686 }
1687
1688 }
1689
1690 }
1691
1692 private static final class ParallelBufferedOutputStream extends OutputStream {
1693
1694 private Emitter emitter;
1695
1696 private final byte delimiter;
1697
1698 private final List<ByteBuffer> buffers;
1699
1700 private byte[] buffer;
1701
1702 private int count;
1703
1704 private int threshold;
1705
1706 private boolean closed;
1707
1708 ParallelBufferedOutputStream(final OutputStream stream, final byte delimiter) {
1709 this.emitter = Emitter.forStream(stream);
1710 this.delimiter = delimiter;
1711 this.buffers = new ArrayList<ByteBuffer>();
1712 this.buffer = new byte[2 * BUFFER_SIZE];
1713 this.count = 0;
1714 this.threshold = BUFFER_SIZE;
1715 this.closed = false;
1716 this.emitter.open();
1717 }
1718
1719 @Override
1720 public void write(final int c) throws IOException {
1721 if (this.count < this.threshold) {
1722 this.buffer[this.count++] = (byte) c;
1723 } else {
1724 writeAndTryFlush((byte) c);
1725 }
1726 }
1727
1728 @Override
1729 public void write(final byte[] buf, int off, int len) throws IOException {
1730 final int available = this.threshold - this.count;
1731 if (available >= len) {
1732 System.arraycopy(buf, off, this.buffer, this.count, len);
1733 this.count += len;
1734 return;
1735 }
1736 if (available > 0) {
1737 System.arraycopy(buf, off, this.buffer, this.count, available);
1738 this.count += available;
1739 off += available;
1740 len -= available;
1741 }
1742 final int end = off + len;
1743 while (off < end) {
1744 writeAndTryFlush(buf[off++]);
1745 }
1746 }
1747
1748 @Override
1749 public void flush() throws IOException {
1750 flushBuffers();
1751 }
1752
1753 @Override
1754 public void close() throws IOException {
1755 synchronized (this.buffers) {
1756 if (this.closed) {
1757 return;
1758 }
1759 flushBuffers();
1760 this.closed = true;
1761 }
1762 this.buffers.clear();
1763 this.buffer = null;
1764 this.emitter.close();
1765 this.emitter = null;
1766 }
1767
1768 private void writeAndTryFlush(final byte c) throws IOException {
1769 this.buffer[this.count++] = c;
1770 if (c == this.delimiter) {
1771 flushBuffers();
1772 } else if (this.count == this.buffer.length) {
1773 checkNotClosed();
1774 this.buffers.add(ByteBuffer.wrap(this.buffer));
1775 this.buffer = new byte[BUFFER_SIZE];
1776 this.count = 0;
1777 this.threshold = 0;
1778 }
1779 }
1780
1781 private void flushBuffers() throws IOException {
1782 checkNotClosed();
1783 if (this.count > 0) {
1784 final ByteBuffer buffer = ByteBuffer.wrap(this.buffer);
1785 buffer.limit(this.count);
1786 this.buffers.add(buffer);
1787 }
1788 this.emitter.emit(this.buffers);
1789 if (!this.buffers.isEmpty()) {
1790 this.buffer = this.buffers.get(0).array();
1791 this.buffers.clear();
1792 }
1793 this.count = 0;
1794 this.threshold = BUFFER_SIZE;
1795 }
1796
1797 private void checkNotClosed() throws IOException {
1798 if (this.closed) {
1799 throw new IOException("Writer has been closed");
1800 }
1801 }
1802
1803 private static class Emitter implements Runnable {
1804
1805 private static final Map<OutputStream, Emitter> EMITTERS = new WeakHashMap<OutputStream, Emitter>();
1806
1807 private static final Object EOF = new Object();
1808
1809 private final BlockingQueue<Object> queue;
1810
1811 private final List<ByteBuffer> buffers;
1812
1813 private OutputStream stream;
1814
1815 private int references;
1816
1817 private Throwable exception;
1818
1819 private final CountDownLatch latch;
1820
1821 private Emitter(final OutputStream stream) {
1822 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_WRITE, false);
1823 this.stream = stream;
1824 this.buffers = new ArrayList<ByteBuffer>();
1825 this.references = 0;
1826 this.exception = null;
1827 this.latch = new CountDownLatch(1);
1828 Environment.getPool().submit(this);
1829 }
1830
1831 private void release(final ByteBuffer buffer) {
1832 synchronized (this.buffers) {
1833 if (this.buffers.size() < BUFFER_NUM_WRITE + Environment.getCores() + 1) {
1834 buffer.clear();
1835 this.buffers.add(buffer);
1836 }
1837 }
1838 }
1839
1840 private ByteBuffer allocate() {
1841 synchronized (this.buffers) {
1842 if (!this.buffers.isEmpty()) {
1843 return this.buffers.remove(this.buffers.size() - 1);
1844 }
1845 }
1846 return ByteBuffer.allocate(2 * BUFFER_SIZE);
1847 }
1848
1849 public void open() {
1850 synchronized (this) {
1851 if (this.references < 0) {
1852 throw new IllegalStateException("Stream has been closed");
1853 }
1854 ++this.references;
1855 }
1856 }
1857
1858 public void close() throws IOException {
1859 synchronized (this) {
1860 --this.references;
1861 if (this.references != 0) {
1862 return;
1863 }
1864 this.references = -1;
1865 }
1866 while (true) {
1867 try {
1868 this.queue.put(EOF);
1869 break;
1870 } catch (final InterruptedException ex) {
1871
1872 }
1873 }
1874 while (true) {
1875 try {
1876 this.latch.await();
1877 break;
1878 } catch (final InterruptedException ex) {
1879
1880 }
1881 }
1882 synchronized (EMITTERS) {
1883 EMITTERS.remove(this.stream);
1884 }
1885 this.queue.clear();
1886 this.buffers.clear();
1887 this.stream = null;
1888 synchronized (this) {
1889 if (this.exception != null) {
1890 propagate(this.exception);
1891 }
1892 }
1893 }
1894
1895 public void emit(final List<ByteBuffer> buffers) throws IOException {
1896 try {
1897 synchronized (this) {
1898 if (this.exception != null) {
1899 throw this.exception;
1900 }
1901 }
1902 this.queue.put(new ArrayList<ByteBuffer>(buffers));
1903 buffers.clear();
1904 buffers.add(allocate());
1905 } catch (final Throwable ex) {
1906 propagate(ex);
1907 }
1908 }
1909
1910 @SuppressWarnings("unchecked")
1911 @Override
1912 public void run() {
1913 try {
1914 while (true) {
1915 final Object object = this.queue.take();
1916 if (object == EOF) {
1917 break;
1918 }
1919 final List<ByteBuffer> buffers = (List<ByteBuffer>) object;
1920 for (final ByteBuffer buffer : buffers) {
1921 this.stream.write(buffer.array(), buffer.position(), buffer.limit());
1922 }
1923 if (!buffers.isEmpty()) {
1924 release(buffers.get(0));
1925 }
1926 }
1927 } catch (final Throwable ex) {
1928 synchronized (this) {
1929 this.exception = ex;
1930 }
1931 this.queue.clear();
1932 } finally {
1933 closeQuietly(this.stream);
1934 this.latch.countDown();
1935 }
1936 }
1937
1938 public static Emitter forStream(final OutputStream stream) {
1939 synchronized (EMITTERS) {
1940 Emitter emitter = EMITTERS.get(stream);
1941 if (emitter == null) {
1942 emitter = new Emitter(stream);
1943 EMITTERS.put(stream, emitter);
1944 }
1945 return emitter;
1946 }
1947 }
1948
1949 }
1950
1951 }
1952
1953 private static final class UTF8Reader extends Reader {
1954
1955 private final InputStream stream;
1956
1957 private boolean closed;
1958
1959 public UTF8Reader(final InputStream stream) {
1960 this.stream = stream;
1961 this.closed = false;
1962 }
1963
1964 @Override
1965 public int read() throws IOException {
1966 final int b0 = this.stream.read();
1967 return (b0 & 0xFFFFFF80) == 0 ? b0 : readHelper(b0);
1968 }
1969
1970 private int readHelper(final int b0) throws IOException {
1971
1972 if (b0 < 0) {
1973 return -1;
1974
1975 } else if (b0 <= 0b11011111) {
1976 final int b1 = this.stream.read();
1977 if ((b1 & 0b11000000) == 0b10000000) {
1978 return (b0 & 0b00011111) << 6 | b1 & 0b00111111;
1979 }
1980
1981 } else if (b0 <= 0b11101111) {
1982 final int b1 = this.stream.read();
1983 final int b2 = this.stream.read();
1984 if ((b1 & 0b11000000) == 0b10000000 && (b2 & 0b11000000) == 0b10000000) {
1985 return (b0 & 0b00001111) << 12 | (b1 & 0b00111111) << 6 | b2 & 0b00111111;
1986 }
1987
1988 } else if (b0 <= 0b11110111) {
1989 final int b1 = this.stream.read();
1990 final int b2 = this.stream.read();
1991 final int b3 = this.stream.read();
1992 if ((b1 & 0b11000000) == 0b10000000 && (b2 & 0b11000000) == 0b10000000
1993 && (b3 & 0b11000000) == 0b10000000) {
1994 return (b0 & 0b00000111) << 18 | (b1 & 0b00111111) << 12
1995 | (b2 & 0b00111111) << 6 | b3 & 0b00111111;
1996 }
1997 }
1998
1999 throw new IOException("Invalid/truncated UTF8 code");
2000 }
2001
2002 @Override
2003 public int read(final char[] buf, final int off, final int len) throws IOException {
2004 if ((off | len | off + len | buf.length - (off + len)) < 0) {
2005 throw new IndexOutOfBoundsException();
2006 }
2007 if (len == 0) {
2008 checkNotClosed();
2009 return 0;
2010 }
2011 int index = off;
2012 int c = read();
2013 if (c < 0) {
2014 return -1;
2015 }
2016 buf[index++] = (char) c;
2017 final int end = off + Math.min(len, this.stream.available() / 2);
2018 while (index < end) {
2019 c = read();
2020 if (c < 0) {
2021 break;
2022 }
2023 buf[index++] = (char) c;
2024 }
2025 return index - off;
2026 }
2027
2028 @Override
2029 public long skip(final long n) throws IOException {
2030 if (n == 0L) {
2031 checkNotClosed();
2032 return 0L;
2033 }
2034 final int skippable = this.stream.available() / 2;
2035 int toSkip = skippable;
2036 do {
2037 final int c = read();
2038 if (c < 0) {
2039 break;
2040 }
2041 --toSkip;
2042 } while (toSkip > 0);
2043 return skippable - toSkip;
2044 }
2045
2046 @Override
2047 public boolean ready() throws IOException {
2048 return this.stream.available() >= 4;
2049 }
2050
2051 @Override
2052 public void reset() throws IOException {
2053 throw new IOException("Mark not supported");
2054 }
2055
2056 @Override
2057 public void mark(final int readlimit) {
2058 }
2059
2060 @Override
2061 public boolean markSupported() {
2062 return false;
2063 }
2064
2065 @Override
2066 public void close() throws IOException {
2067 synchronized (this) {
2068 if (this.closed) {
2069 return;
2070 }
2071 this.closed = true;
2072 }
2073 this.stream.close();
2074 }
2075
2076 private void checkNotClosed() throws IOException {
2077 if (this.closed) {
2078 throw new IOException("Reader has been closed");
2079 }
2080 }
2081
2082 }
2083
2084 private static final class UTF8Writer extends Writer {
2085
2086 private final OutputStream stream;
2087
2088 private boolean closed;
2089
2090 UTF8Writer(final OutputStream stream) {
2091 this.stream = stream;
2092 this.closed = false;
2093 }
2094
2095 @Override
2096 public void write(final int c) throws IOException {
2097 if (c <= 0b1111111) {
2098 this.stream.write(c);
2099 } else {
2100 writeHelper(c);
2101 }
2102 }
2103
2104 private void writeHelper(final int c) throws IOException {
2105
2106 if (c <= 0b11111_111111) {
2107 this.stream.write(0b11000000 | c >>> 6);
2108 this.stream.write(0b10000000 | c & 0b00111111);
2109
2110 } else if (c <= 0b1111_111111_111111) {
2111 this.stream.write(0b11100000 | c >>> 12);
2112 this.stream.write(0b10000000 | c >>> 6 & 0b00111111);
2113 this.stream.write(0b10000000 | c & 0b00111111);
2114
2115 } else if (c <= 0b111_111111_111111_111111) {
2116 this.stream.write(0b11110000 | c >>> 18);
2117 this.stream.write(0b10000000 | c >>> 12 & 0b00111111);
2118 this.stream.write(0b10000000 | c >>> 6 & 0b00111111);
2119 this.stream.write(0b10000000 | c & 0b00111111);
2120
2121 } else {
2122 throw new IOException("Invalid code point " + c);
2123 }
2124 }
2125
2126 @Override
2127 public void write(final char[] cbuf, final int off, final int len) throws IOException {
2128 final int end = off + len;
2129 for (int index = off; index < end; ++index) {
2130 write(cbuf[index]);
2131 }
2132 }
2133
2134 @Override
2135 public void write(final String str, final int off, final int len) throws IOException {
2136 final int end = off + len;
2137 for (int index = off; index < end; ++index) {
2138 write(str.charAt(index));
2139 }
2140 }
2141
2142 @Override
2143 public void flush() throws IOException {
2144 this.stream.flush();
2145 }
2146
2147 @Override
2148 public void close() throws IOException {
2149 synchronized (this) {
2150 if (this.closed) {
2151 return;
2152 }
2153 this.closed = true;
2154 }
2155 this.stream.close();
2156 }
2157
2158 }
2159
2160 }