1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro.util;
15  
16  import java.io.File;
17  import java.io.FileInputStream;
18  import java.io.FileOutputStream;
19  import java.io.FilterOutputStream;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  import java.io.Reader;
24  import java.io.Writer;
25  import java.lang.ProcessBuilder.Redirect;
26  import java.net.MalformedURLException;
27  import java.net.URISyntaxException;
28  import java.net.URL;
29  import java.nio.ByteBuffer;
30  import java.nio.CharBuffer;
31  import java.util.ArrayList;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Objects;
35  import java.util.WeakHashMap;
36  import java.util.concurrent.ArrayBlockingQueue;
37  import java.util.concurrent.BlockingQueue;
38  import java.util.concurrent.CountDownLatch;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  import javax.annotation.Nullable;
42  
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  // note on buffered stream thread safety: they are not thread safe and are expected to be used by
47  // a single thread, with the exception of method close() which can be called concurrently by other
48  // threads (so to guarantee e.g. asynchronous write/read termination) and it is synchronized so
49  // that it is run exactly once
50  
51  public final class IO {
52  
53      private static final Logger LOGGER = LoggerFactory.getLogger(IO.class);
54  
55      // sequential read performances varying buffer size on test TQL file
56      // 8k - 380781 tr/s
57      // 16k - 383115 tr/s
58      // 32k - 386237 tr/s
59      // 64k - 394210 tr/s
60      // 128k - 398547 tr/s
61      // 256k - 396883 tr/s
62      // 512k - 389184 tr/s
63      // 1M - 383701 tr/s
64      // note: pipe buffer on linux is 64k
65  
66      private static final int BUFFER_SIZE = Integer.parseInt(Environment.getProperty(
67              "rdfpro.buffer.size", "" + 64 * 1024));
68  
69      // parallel read performances varying queue size on test TQL file, buffer = 64K
70      // 16 * 64k (1M) - 603k tr/s
71      // 64 * 64k (4M) - 605k-616k tr/s
72      // 128 * 64k (8M) - 601k-618k tr/s
73      // 256 * 64k (16M) - 624k-631k tr/s
74      // 1024 * 64k (64M) - 625k tr/s
75  
76      private static final int BUFFER_NUM_READ = Integer.parseInt(Environment.getProperty(
77              "rdfpro.buffer.numr", "256"));
78  
79      private static final int BUFFER_NUM_WRITE = Integer.parseInt(Environment.getProperty(
80              "rdfpro.buffer.numw", "16"));
81  
82      @Nullable
83      public static <T> T closeQuietly(@Nullable final T object) {
84          if (object instanceof AutoCloseable) {
85              try {
86                  ((AutoCloseable) object).close();
87              } catch (final Throwable ex) {
88                  LOGGER.error("Error closing " + object.getClass().getSimpleName(), ex);
89              }
90          }
91          return object;
92      }
93  
94      public static URL extractURL(final String location) {
95          Objects.requireNonNull(location);
96          try {
97              final int index = location.indexOf(':');
98              if (index < 0) {
99                  return new File(location).toURI().toURL();
100             }
101             String s = location.charAt(0) != '.' ? location : location.substring(index + 1);
102             if (s.startsWith("classpath:")) {
103                 s = s.substring("classpath:".length());
104                 return Objects.requireNonNull(IO.class.getResource(s));
105             } else {
106                 try {
107                     return new URL(s);
108                 } catch (final MalformedURLException ex) {
109                     return new File(s).toURI().toURL();
110                 }
111             }
112         } catch (final Throwable ex) {
113             throw new IllegalArgumentException("Cannot extract URL from '" + location + "'", ex);
114         }
115     }
116 
117     public static String extractExtension(final String location) {
118         Objects.requireNonNull(location);
119         final int index = location.indexOf(':');
120         int extEnd = location.length();
121         if (index >= 0) {
122             if (location.charAt(0) == '.') {
123                 return location.substring(0, index);
124             }
125             int index2 = location.lastIndexOf('#');
126             index2 = index2 >= 0 ? index2 : location.length();
127             extEnd = location.lastIndexOf('?', index2);
128             extEnd = extEnd >= 0 ? extEnd : index2;
129         }
130         final int nameStart = Math.max(-1, location.lastIndexOf('/', extEnd)) + 1;
131         final int extStart = location.indexOf('.', nameStart);
132         return extStart < 0 ? "" : location.substring(extStart, extEnd);
133     }
134 
135     public static InputStream read(final String location) throws IOException {
136 
137         final String ext = extractExtension(location);
138         final URL url = extractURL(location);
139 
140         String cmd = null;
141         if (ext.endsWith(".bz2")) {
142             cmd = Environment.getProperty("rdfpro.cmd.bzip2", "bzip2") + " -dck";
143         } else if (ext.endsWith(".gz")) {
144             cmd = Environment.getProperty("rdfpro.cmd.gzip", "gzip") + " -dc";
145         } else if (ext.endsWith(".xz")) {
146             cmd = Environment.getProperty("rdfpro.cmd.xz", "xz") + " -dc";
147         } else if (ext.endsWith(".7z")) {
148             cmd = Environment.getProperty("rdfpro.cmd.7za", "7za") + " -so e";
149         } else if (ext.endsWith(".lz4")) {
150             cmd = Environment.getProperty("rdfpro.cmd.lz4", "lz4") + " -dc";
151         }
152 
153         if ("file".equals(url.getProtocol())) {
154             final File file;
155             try {
156                 file = new File(url.toURI());
157             } catch (final URISyntaxException ex) {
158                 throw new IllegalArgumentException("Invalid file:// URL: " + location);
159             }
160 
161             if (cmd == null) {
162                 LOGGER.debug("Reading file {}", file);
163                 return new FileInputStream(file);
164 
165             } else {
166                 LOGGER.debug("Reading file {} using {}", file, cmd);
167                 cmd += " " + file.getAbsolutePath();
168                 final Process process = new ProcessBuilder(cmd.split("\\s+")) //
169                         .redirectError(Redirect.INHERIT).start();
170                 return process.getInputStream();
171             }
172 
173         } else {
174             final InputStream stream = url.openStream();
175             if (cmd == null) {
176                 LOGGER.debug("Downloading file {}", url);
177                 return stream;
178 
179             } else {
180                 LOGGER.debug("Downloading file {} using {}", url, cmd);
181                 final Process process = new ProcessBuilder(cmd.split("\\s+")) //
182                         .redirectError(Redirect.INHERIT).start();
183                 Environment.getPool().execute(new Runnable() {
184 
185                     @Override
186                     public void run() {
187                         try {
188                             final byte[] buffer = new byte[8 * 1024];
189                             while (true) {
190                                 final int count = stream.read(buffer);
191                                 if (count < 0) {
192                                     break;
193                                 }
194                                 process.getOutputStream().write(buffer, 0, count);
195                             }
196                             process.getOutputStream().close();
197 
198                         } catch (final Throwable ex) {
199                             LOGGER.error("Error reading from " + url, ex);
200                             process.destroy();
201                         } finally {
202                             closeQuietly(stream);
203                         }
204                     }
205                 });
206                 return process.getInputStream();
207             }
208         }
209     }
210 
211     public static OutputStream write(final String location) throws IOException {
212 
213         final String ext = extractExtension(location);
214         final URL url = extractURL(location);
215 
216         if (!"file".equals(url.getProtocol())) {
217             throw new IllegalArgumentException("Cannot write to non-file URL " + location);
218         }
219 
220         final File file;
221         try {
222             file = new File(url.toURI());
223         } catch (final URISyntaxException ex) {
224             throw new IllegalArgumentException("Invalid file:// URL: " + location);
225         }
226 
227         final String cmd;
228         if (ext.endsWith(".bz2")) {
229             cmd = Environment.getProperty("rdfpro.cmd.bzip2", "bzip2") + " -c -9";
230         } else if (ext.endsWith(".gz")) {
231             cmd = Environment.getProperty("rdfpro.cmd.gzip", "gzip") + " -c -9";
232         } else if (ext.endsWith(".xz")) {
233             cmd = Environment.getProperty("rdfpro.cmd.xz", "xz") + " -c -9";
234         } else if (ext.endsWith(".lz4")) {
235             cmd = Environment.getProperty("rdfpro.cmd.lz4", "lz4") + " -c -9";
236         } else {
237             cmd = null;
238         }
239 
240         if (cmd == null) {
241             LOGGER.debug("Writing file {}", file);
242             return new FileOutputStream(file);
243 
244         } else {
245             LOGGER.debug("Writing file {} using {}", file, cmd);
246             final Process process = new ProcessBuilder(cmd.split("\\s+")) //
247                     .redirectOutput(file).redirectError(Redirect.INHERIT).start();
248             return new FilterOutputStream(process.getOutputStream()) {
249 
250                 private final AtomicBoolean closed = new AtomicBoolean(false);
251 
252                 @Override
253                 public void write(final int b) throws IOException {
254                     this.out.write(b);
255                 }
256 
257                 @Override
258                 public void write(final byte[] b) throws IOException {
259                     this.out.write(b);
260                 }
261 
262                 @Override
263                 public void write(final byte[] b, final int off, final int len) throws IOException {
264                     this.out.write(b, off, len);
265                 }
266 
267                 @Override
268                 public void close() throws IOException {
269                     if (this.closed.compareAndSet(false, true)) {
270                         LOGGER.debug("Completing '{}'", cmd);
271                         this.out.flush();
272                         this.out.close();
273                         try {
274                             final int code = process.waitFor();
275                             LOGGER.debug("Process completed with exit code {}", code);
276                         } catch (final InterruptedException ex) {
277                             throw new IOException("Didn't wait till IO completion", ex);
278                         } finally {
279                             process.destroy(); // not strictly necessary
280                         }
281                     }
282                 }
283 
284             };
285         }
286     }
287 
288     public static InputStream buffer(final InputStream stream) {
289         return new SimpleBufferedInputStream(stream);
290     }
291 
292     public static OutputStream buffer(final OutputStream stream) {
293         return new SimpleBufferedOutputStream(stream);
294     }
295 
296     public static Reader buffer(final Reader reader) {
297         return new SimpleBufferedReader(reader);
298     }
299 
300     public static Writer buffer(final Writer writer) {
301         return new SimpleBufferedWriter(writer);
302     }
303 
304     public static InputStream parallelBuffer(final InputStream stream, final byte delimiter) {
305         return new ParallelBufferedInputStream(stream, delimiter);
306     }
307 
308     public static OutputStream parallelBuffer(final OutputStream stream, final byte delimiter) {
309         return new ParallelBufferedOutputStream(stream, delimiter);
310     }
311 
312     public static Reader parallelBuffer(final Reader reader, final char delimiter) {
313         return new ParallelBufferedReader(reader, delimiter);
314     }
315 
316     public static Writer parallelBuffer(final Writer writer, final char delimiter) {
317         return new ParallelBufferedWriter(writer, delimiter);
318     }
319 
320     public static Reader utf8Reader(final InputStream stream) {
321         return new UTF8Reader(stream);
322     }
323 
324     public static Writer utf8Writer(final OutputStream stream) {
325         return new UTF8Writer(stream);
326     }
327 
328     private static void propagate(final Throwable ex) throws IOException {
329         if (ex instanceof RuntimeException) {
330             throw (RuntimeException) ex;
331         } else if (ex instanceof Error) {
332             throw (Error) ex;
333         } else if (ex instanceof IOException) {
334             throw (IOException) ex;
335         }
336         throw new IOException(ex);
337     }
338 
339     private IO() {
340     }
341 
342     private static final class SimpleBufferedInputStream extends InputStream {
343 
344         private final InputStream stream;
345 
346         private final byte buffer[];
347 
348         private int count;
349 
350         private int pos;
351 
352         private boolean closed;
353 
354         public SimpleBufferedInputStream(final InputStream stream) {
355             this.stream = Objects.requireNonNull(stream);
356             this.buffer = new byte[BUFFER_SIZE];
357             this.count = 0;
358             this.pos = 0;
359             this.closed = false;
360         }
361 
362         @Override
363         public int read() throws IOException {
364             if (this.pos >= this.count) {
365                 fill();
366                 if (this.pos >= this.count) {
367                     return -1;
368                 }
369             }
370             return this.buffer[this.pos++] & 0xFF;
371         }
372 
373         @Override
374         public int read(final byte buf[], int off, int len) throws IOException {
375             if ((off | len | off + len | buf.length - (off + len)) < 0) {
376                 throw new IndexOutOfBoundsException();
377             }
378             if (len == 0) {
379                 checkNotClosed();
380                 return 0;
381             }
382             int result = 0;
383             while (true) {
384                 final int available = this.count - this.pos;
385                 if (available > 0) {
386                     final int n = available > len ? len : available;
387                     System.arraycopy(this.buffer, this.pos, buf, off, n);
388                     this.pos += n;
389                     off += n;
390                     len -= n;
391                     result += n;
392                     if (len == 0 || this.stream.available() == 0) {
393                         return result;
394                     }
395                 }
396                 if (len >= BUFFER_SIZE) {
397                     final int n = this.stream.read(buf, off, len);
398                     result += n < 0 ? 0 : n;
399                     return result == 0 ? -1 : result;
400                 } else if (len > 0) {
401                     fill();
402                     if (this.count == 0) {
403                         return result == 0 ? -1 : result;
404                     }
405                 }
406             }
407         }
408 
409         @Override
410         public long skip(final long n) throws IOException {
411             if (n <= 0) {
412                 checkNotClosed();
413                 return 0;
414             }
415             final int available = this.count - this.pos;
416             if (available <= 0) {
417                 return this.stream.skip(n);
418             }
419             final long skipped = available < n ? available : n;
420             this.pos += skipped;
421             return skipped;
422         }
423 
424         @Override
425         public int available() throws IOException {
426             final int n = this.count - this.pos;
427             final int available = this.stream.available();
428             return n > Integer.MAX_VALUE - available ? Integer.MAX_VALUE : n + available;
429         }
430 
431         @Override
432         public void reset() throws IOException {
433             throw new IOException("Mark not supported");
434         }
435 
436         @Override
437         public void mark(final int readlimit) {
438         }
439 
440         @Override
441         public boolean markSupported() {
442             return false;
443         }
444 
445         @Override
446         public void close() throws IOException {
447             synchronized (this.buffer) {
448                 if (this.closed) {
449                     return;
450                 }
451                 this.closed = true;
452             }
453             this.count = this.pos; // fail soon in case a new write request is received
454             this.stream.close();
455         }
456 
457         private void fill() throws IOException {
458             checkNotClosed();
459             final int n = this.stream.read(this.buffer);
460             this.count = n < 0 ? 0 : n;
461             this.pos = 0;
462         }
463 
464         private void checkNotClosed() throws IOException {
465             if (this.closed) {
466                 throw new IOException("Stream has been closed");
467             }
468         }
469 
470     }
471 
472     private static final class SimpleBufferedOutputStream extends OutputStream {
473 
474         private final OutputStream stream;
475 
476         private final byte[] buffer;
477 
478         private int count; // num of bytes in the buffer
479 
480         private boolean closed;
481 
482         SimpleBufferedOutputStream(final OutputStream stream) {
483             this.stream = Objects.requireNonNull(stream);
484             this.buffer = new byte[BUFFER_SIZE];
485             this.count = 0;
486             this.closed = false;
487         }
488 
489         @Override
490         public void write(final int b) throws IOException {
491             if (this.count >= BUFFER_SIZE) {
492                 flushBuffer();
493             }
494             this.buffer[this.count++] = (byte) b;
495         }
496 
497         @Override
498         public void write(final byte buf[], int off, int len) throws IOException {
499             if (len >= BUFFER_SIZE) {
500                 flushBuffer();
501                 this.stream.write(buf, off, len);
502                 return;
503             }
504             final int available = BUFFER_SIZE - this.count;
505             if (available < len) {
506                 System.arraycopy(buf, off, this.buffer, this.count, available);
507                 this.count += available;
508                 off += available;
509                 len -= available;
510                 flushBuffer();
511             }
512             System.arraycopy(buf, off, this.buffer, this.count, len);
513             this.count += len;
514         }
515 
516         @Override
517         public void flush() throws IOException {
518             flushBuffer();
519             this.stream.flush();
520         }
521 
522         @Override
523         public void close() throws IOException {
524             synchronized (this.buffer) {
525                 if (this.closed) {
526                     return;
527                 }
528                 this.closed = true;
529             }
530             flushBuffer();
531             this.stream.close();
532             // this.count = BUFFER_SIZE; // fail soon in case a new write request is received
533         }
534 
535         private void flushBuffer() throws IOException {
536             if (this.count > 0) {
537                 this.stream.write(this.buffer, 0, this.count);
538                 this.count = 0;
539             }
540         }
541 
542     }
543 
544     private static final class SimpleBufferedReader extends Reader {
545 
546         private final Reader reader;
547 
548         private final char[] buffer;
549 
550         private int count;
551 
552         private int pos;
553 
554         private boolean closed;
555 
556         public SimpleBufferedReader(final Reader reader) {
557             this.reader = reader;
558             this.buffer = new char[BUFFER_SIZE];
559             this.count = 0;
560             this.pos = 0;
561             this.closed = false;
562         }
563 
564         @Override
565         public int read() throws IOException {
566             if (this.pos >= this.count) {
567                 fill();
568                 if (this.pos >= this.count) {
569                     return -1;
570                 }
571             }
572             return this.buffer[this.pos++] & 0xFFFF;
573         }
574 
575         @Override
576         public int read(final char[] cbuf, final int off, final int len) throws IOException {
577             if ((off | len | off + len | cbuf.length - (off + len)) < 0) {
578                 throw new IndexOutOfBoundsException();
579             }
580             if (len == 0) {
581                 checkNotClosed();
582                 return 0;
583             }
584             int available = this.count - this.pos;
585             if (available == 0) {
586                 if (len >= BUFFER_SIZE) {
587                     return this.reader.read(cbuf, off, len);
588                 } else {
589                     fill();
590                     available = this.count - this.pos;
591                     if (available == 0) {
592                         return -1;
593                     }
594                 }
595             }
596             final int n = available > len ? len : available;
597             System.arraycopy(this.buffer, this.pos, cbuf, off, n);
598             this.pos += n;
599             return n;
600         }
601 
602         @Override
603         public long skip(final long n) throws IOException {
604             if (n <= 0) {
605                 checkNotClosed();
606                 return 0;
607             }
608             final int available = this.count - this.pos;
609             if (available == 0) {
610                 return this.reader.skip(n);
611             }
612             final long skipped = available < n ? available : n;
613             this.pos += skipped;
614             return skipped;
615         }
616 
617         @Override
618         public void reset() throws IOException {
619             throw new IOException("Mark not supported");
620         }
621 
622         @Override
623         public void mark(final int readlimit) {
624         }
625 
626         @Override
627         public boolean markSupported() {
628             return false;
629         }
630 
631         @Override
632         public void close() throws IOException {
633             synchronized (this.buffer) {
634                 if (this.closed) {
635                     return;
636                 }
637                 this.closed = true;
638             }
639             this.closed = true;
640             this.count = this.pos; // fail soon in case a new write request is received
641             this.reader.close();
642         }
643 
644         private void fill() throws IOException {
645             checkNotClosed();
646             final int n = this.reader.read(this.buffer);
647             this.count = n < 0 ? 0 : n;
648             this.pos = 0;
649         }
650 
651         private void checkNotClosed() throws IOException {
652             if (this.closed) {
653                 throw new IOException("Reader has been closed");
654             }
655         }
656 
657     }
658 
659     private static final class SimpleBufferedWriter extends Writer {
660 
661         private final Writer writer;
662 
663         private final char[] buffer;
664 
665         private int count; // num of chars in the buffer
666 
667         private boolean closed;
668 
669         SimpleBufferedWriter(final Writer writer) {
670             this.writer = Objects.requireNonNull(writer);
671             this.buffer = new char[BUFFER_SIZE];
672             this.count = 0;
673             this.closed = false;
674         }
675 
676         @Override
677         public void write(final int c) throws IOException {
678             if (this.count >= BUFFER_SIZE) {
679                 flushBuffer();
680             }
681             this.buffer[this.count++] = (char) c;
682         }
683 
684         @Override
685         public void write(final char[] cbuf, int off, int len) throws IOException {
686             if (len >= BUFFER_SIZE) {
687                 flushBuffer();
688                 this.writer.write(cbuf, off, len);
689                 return;
690             }
691             final int available = BUFFER_SIZE - this.count;
692             if (available < len) {
693                 System.arraycopy(cbuf, off, this.buffer, this.count, available);
694                 this.count += available;
695                 off += available;
696                 len -= available;
697                 flushBuffer();
698             }
699             System.arraycopy(cbuf, off, this.buffer, this.count, len);
700             this.count += len;
701         }
702 
703         @Override
704         public void write(final String str, int off, int len) throws IOException {
705             if (len >= BUFFER_SIZE) {
706                 flushBuffer();
707                 this.writer.write(str, off, len);
708                 return;
709             }
710             final int available = BUFFER_SIZE - this.count;
711             if (available < len) {
712                 str.getChars(off, off + available, this.buffer, this.count);
713                 this.count += available;
714                 off += available;
715                 len -= available;
716                 flushBuffer();
717             }
718             str.getChars(off, off + len, this.buffer, this.count);
719             this.count += len;
720         }
721 
722         @Override
723         public void flush() throws IOException {
724             flushBuffer();
725             this.writer.flush();
726         }
727 
728         @Override
729         public void close() throws IOException {
730             synchronized (this.buffer) {
731                 if (this.closed) {
732                     return;
733                 }
734                 this.closed = true;
735             }
736             flushBuffer();
737             this.writer.close();
738             // this.count = BUFFER_SIZE; // fail soon in case a new write request is received
739         }
740 
741         private void flushBuffer() throws IOException {
742             if (this.count > 0) {
743                 this.writer.write(this.buffer, 0, this.count);
744                 this.count = 0;
745             }
746         }
747 
748     }
749 
750     private static final class ParallelBufferedReader extends Reader {
751 
752         private Fetcher fetcher;
753 
754         private final List<CharBuffer> buffers;
755 
756         private int index;
757 
758         private char[] buffer;
759 
760         private int count;
761 
762         private int pos;
763 
764         private boolean closed;
765 
766         ParallelBufferedReader(final Reader reader, final char delimiter) {
767             this.fetcher = Fetcher.forReader(reader, delimiter);
768             this.buffers = new ArrayList<CharBuffer>();
769             this.index = 0;
770             this.buffer = null;
771             this.count = 0;
772             this.pos = 0;
773             this.closed = false;
774             this.fetcher.open();
775         }
776 
777         @Override
778         public int read() throws IOException {
779             if (this.pos >= this.count) {
780                 fill();
781                 if (this.count == 0) {
782                     return -1;
783                 }
784             }
785             return this.buffer[this.pos++] & 0xFFFF;
786         }
787 
788         @Override
789         public int read(final char[] cbuf, final int off, final int len) throws IOException {
790             if ((off | len | off + len | cbuf.length - (off + len)) < 0) {
791                 throw new IndexOutOfBoundsException();
792             }
793             if (len == 0) {
794                 checkNotClosed();
795                 return 0;
796             }
797             final int available = this.count - this.pos;
798             if (available == 0) {
799                 fill();
800                 if (this.count == 0) {
801                     return -1;
802                 }
803             }
804             final int n = available > len ? len : available;
805             System.arraycopy(this.buffer, this.pos, cbuf, off, n);
806             this.pos += n;
807             return n;
808         }
809 
810         @Override
811         public long skip(final long n) throws IOException {
812             if (n <= 0) {
813                 checkNotClosed();
814                 return 0;
815             }
816             int available = this.count - this.pos;
817             if (available == 0) {
818                 fill();
819                 available = this.count;
820             }
821             final long skipped = available < n ? available : n;
822             this.pos += skipped;
823             return skipped;
824         }
825 
826         @Override
827         public void reset() throws IOException {
828             throw new IOException("Mark not supported");
829         }
830 
831         @Override
832         public void mark(final int readlimit) {
833         }
834 
835         @Override
836         public boolean markSupported() {
837             return false;
838         }
839 
840         @Override
841         public void close() throws IOException {
842             synchronized (this.buffers) {
843                 if (this.closed) {
844                     return;
845                 }
846                 this.closed = true;
847             }
848             this.count = this.pos;
849             this.buffers.clear();
850             this.fetcher.close();
851             this.fetcher = null;
852         }
853 
854         private void fill() throws IOException {
855             checkNotClosed();
856             if (this.buffer != null) {
857                 this.buffer = null;
858                 this.pos = 0;
859                 this.count = 0;
860             }
861             if (this.index == this.buffers.size()) {
862                 this.fetcher.fetch(this.buffers);
863                 this.index = 0;
864             }
865             if (this.index < this.buffers.size()) {
866                 final CharBuffer cb = this.buffers.get(this.index++);
867                 this.buffer = cb.array();
868                 this.count = cb.limit();
869             }
870         }
871 
872         private void checkNotClosed() throws IOException {
873             if (this.closed) {
874                 throw new IOException("Reader has been closed");
875             }
876         }
877 
878         private static final class Fetcher implements Runnable {
879 
880             private static final Map<Reader, Fetcher> FETCHERS = new WeakHashMap<Reader, Fetcher>();
881 
882             private static final Object EOF = new Object();
883 
884             private final BlockingQueue<Object> queue;
885 
886             private Reader reader;
887 
888             private final char delimiter;
889 
890             private final List<CharBuffer> buffers;
891 
892             private int references;
893 
894             private Throwable exception;
895 
896             private final CountDownLatch latch;
897 
898             private Fetcher(final Reader reader, final char delimiter) {
899                 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_READ, false);
900                 this.reader = reader;
901                 this.delimiter = delimiter;
902                 this.buffers = new ArrayList<CharBuffer>();
903                 this.references = 0;
904                 this.exception = null;
905                 this.latch = new CountDownLatch(1);
906 
907                 Environment.getPool().submit(this);
908             }
909 
910             private void release(final CharBuffer buffer) {
911                 synchronized (this.buffers) {
912                     if (this.buffers.size() < BUFFER_NUM_READ + Environment.getCores() + 1) {
913                         buffer.clear();
914                         this.buffers.add(buffer);
915                     }
916                 }
917             }
918 
919             private CharBuffer allocate() {
920                 synchronized (this.buffers) {
921                     if (!this.buffers.isEmpty()) {
922                         return this.buffers.remove(this.buffers.size() - 1);
923                     }
924                 }
925                 return CharBuffer.allocate(BUFFER_SIZE);
926             }
927 
928             public void open() {
929                 synchronized (this) {
930                     if (this.references < 0) {
931                         throw new IllegalStateException("Reader has been closed");
932                     }
933                     ++this.references;
934                 }
935             }
936 
937             public void close() throws IOException {
938                 synchronized (this) {
939                     --this.references;
940                     if (this.references != 0) {
941                         return;
942                     }
943                     this.references = -1; // prevent further open() to occur
944                 }
945                 this.queue.clear(); // nobody will use queued buffers
946                 while (true) {
947                     try {
948                         this.latch.await();
949                         break;
950                     } catch (final InterruptedException ex) {
951                         // ignore
952                     }
953                 }
954                 synchronized (FETCHERS) {
955                     FETCHERS.remove(this.reader);
956                 }
957                 this.queue.clear();
958                 this.buffers.clear();
959                 this.reader = null; // may be heavyweight, better to release immediately
960                 synchronized (this) {
961                     if (this.exception != null) {
962                         propagate(this.exception);
963                     }
964                 }
965             }
966 
967             @SuppressWarnings("unchecked")
968             public void fetch(final List<CharBuffer> buffers) throws IOException {
969                 try {
970                     synchronized (this) {
971                         if (this.exception != null) {
972                             throw this.exception;
973                         }
974                     }
975                     for (final CharBuffer buffer : buffers) {
976                         release(buffer);
977                     }
978                     buffers.clear();
979                     final Object object = this.queue.take();
980                     if (object == EOF) {
981                         this.queue.add(EOF);
982                         return;
983                     }
984                     buffers.addAll((List<CharBuffer>) object);
985                 } catch (IOException | RuntimeException | Error ex) {
986                     throw ex;
987                 } catch (final Throwable ex) {
988                     throw new IOException(ex);
989                 }
990             }
991 
992             @Override
993             public void run() {
994 
995                 try {
996                     CharBuffer restBuffer = allocate();
997                     List<CharBuffer> buffers = new ArrayList<CharBuffer>();
998 
999                     boolean eof = false;
1000                     while (!eof) {
1001 
1002                         synchronized (this) {
1003                             if (this.references < 0) {
1004                                 break;
1005                             }
1006                         }
1007 
1008                         final CharBuffer curBuffer = restBuffer;
1009                         while (!eof && curBuffer.hasRemaining()) {
1010                             final int n = this.reader.read(curBuffer);
1011                             eof = n < 0;
1012                         }
1013                         curBuffer.flip();
1014                         buffers.add(curBuffer);
1015 
1016                         restBuffer = allocate();
1017                         if (!eof) {
1018                             final char[] curChars = curBuffer.array();
1019                             final int curLastIndex = curBuffer.limit() - 1;
1020                             for (int i = curLastIndex; i >= 0; --i) {
1021                                 if (curChars[i] == this.delimiter) {
1022                                     restBuffer.position(curLastIndex - i);
1023                                     System.arraycopy(curChars, i + 1, restBuffer.array(), 0,
1024                                             restBuffer.position());
1025                                     curBuffer.limit(i + 1);
1026                                     this.queue.put(buffers);
1027                                     buffers = new ArrayList<CharBuffer>();
1028                                     break;
1029                                 }
1030                             }
1031                         }
1032                     }
1033 
1034                     this.queue.put(buffers);
1035 
1036                 } catch (final Throwable ex) {
1037                     synchronized (this) {
1038                         this.exception = ex;
1039                     }
1040                 }
1041 
1042                 try {
1043                     closeQuietly(this.reader);
1044 
1045                     while (true) {
1046                         try {
1047                             this.queue.put(EOF);
1048                             break;
1049                         } catch (final InterruptedException ex) {
1050                             // ignore
1051                         }
1052                     }
1053                 } finally {
1054                     this.latch.countDown();
1055                 }
1056             }
1057 
1058             public static Fetcher forReader(final Reader reader, final char delimiter) {
1059                 synchronized (FETCHERS) {
1060                     Fetcher fetcher = FETCHERS.get(reader);
1061                     if (fetcher == null) {
1062                         fetcher = new Fetcher(reader, delimiter);
1063                         FETCHERS.put(reader, fetcher);
1064                     } else if (fetcher.delimiter != delimiter) {
1065                         throw new IllegalStateException("Already reading from reader " + reader
1066                                 + " using delimiter " + delimiter);
1067                     }
1068                     return fetcher;
1069                 }
1070             }
1071 
1072         }
1073 
1074     }
1075 
1076     private static final class ParallelBufferedWriter extends Writer {
1077 
1078         private Emitter emitter;
1079 
1080         private final char delimiter;
1081 
1082         private final List<CharBuffer> buffers;
1083 
1084         private char[] buffer;
1085 
1086         private int count; // from 0 to BUFFER_SIZE
1087 
1088         private int threshold;
1089 
1090         private boolean closed;
1091 
1092         ParallelBufferedWriter(final Writer writer, final char delimiter) {
1093             this.emitter = Emitter.forWriter(writer);
1094             this.delimiter = delimiter;
1095             this.buffers = new ArrayList<CharBuffer>();
1096             this.buffer = new char[2 * BUFFER_SIZE];
1097             this.count = 0;
1098             this.threshold = BUFFER_SIZE;
1099             this.closed = false;
1100             this.emitter.open();
1101         }
1102 
1103         @Override
1104         public void write(final int c) throws IOException {
1105             if (this.count < this.threshold) {
1106                 this.buffer[this.count++] = (char) c;
1107             } else {
1108                 writeAndTryFlush((char) c);
1109             }
1110         }
1111 
1112         @Override
1113         public void write(final char[] cbuf, int off, int len) throws IOException {
1114             final int available = this.threshold - this.count;
1115             if (available >= len) {
1116                 System.arraycopy(cbuf, off, this.buffer, this.count, len);
1117                 this.count += len;
1118                 return;
1119             }
1120             if (available > 0) {
1121                 System.arraycopy(cbuf, off, this.buffer, this.count, available);
1122                 this.count += available;
1123                 off += available;
1124                 len -= available;
1125             }
1126             final int end = off + len;
1127             while (off < end) {
1128                 writeAndTryFlush(cbuf[off++]);
1129             }
1130         }
1131 
1132         @Override
1133         public void write(final String str, int off, int len) throws IOException {
1134             final int available = this.threshold - this.count;
1135             final int end = off + len;
1136             if (available >= len) {
1137                 str.getChars(off, end, this.buffer, this.count);
1138                 this.count += len;
1139                 return;
1140             }
1141             if (available > 0) {
1142                 str.getChars(off, off + available, this.buffer, this.count);
1143                 this.count += available;
1144                 off += available;
1145                 len -= available;
1146             }
1147             while (off < end) {
1148                 writeAndTryFlush(str.charAt(off++));
1149             }
1150         }
1151 
1152         @Override
1153         public void flush() throws IOException {
1154             flushBuffers();
1155         }
1156 
1157         @Override
1158         public void close() throws IOException {
1159             synchronized (this.buffers) {
1160                 if (this.closed) {
1161                     return;
1162                 }
1163                 flushBuffers();
1164                 this.closed = true;
1165             }
1166             this.buffers.clear();
1167             this.buffer = null;
1168             this.emitter.close();
1169             this.emitter = null;
1170         }
1171 
1172         private void writeAndTryFlush(final char c) throws IOException {
1173             this.buffer[this.count++] = c;
1174             if (c == this.delimiter) {
1175                 flushBuffers();
1176             } else if (this.count == this.buffer.length) {
1177                 checkNotClosed();
1178                 this.buffers.add(CharBuffer.wrap(this.buffer));
1179                 this.buffer = new char[BUFFER_SIZE];
1180                 this.count = 0;
1181                 this.threshold = 0;
1182             }
1183         }
1184 
1185         private void flushBuffers() throws IOException {
1186             checkNotClosed();
1187             if (this.count > 0) {
1188                 final CharBuffer cb = CharBuffer.wrap(this.buffer);
1189                 cb.limit(this.count);
1190                 this.buffers.add(cb);
1191             }
1192             this.emitter.emit(this.buffers);
1193             if (!this.buffers.isEmpty()) {
1194                 this.buffer = this.buffers.get(0).array();
1195                 this.buffers.clear();
1196             }
1197             this.count = 0;
1198             this.threshold = BUFFER_SIZE;
1199         }
1200 
1201         private void checkNotClosed() throws IOException {
1202             if (this.closed) {
1203                 throw new IOException("Writer has been closed");
1204             }
1205         }
1206 
1207         private static class Emitter implements Runnable {
1208 
1209             private static final Map<Writer, Emitter> EMITTERS = new WeakHashMap<Writer, Emitter>();
1210 
1211             private static final Object EOF = new Object();
1212 
1213             private final BlockingQueue<Object> queue;
1214 
1215             private final List<CharBuffer> buffers;
1216 
1217             private Writer writer;
1218 
1219             private int references;
1220 
1221             private Throwable exception;
1222 
1223             private final CountDownLatch latch;
1224 
1225             private Emitter(final Writer writer) {
1226                 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_WRITE, false);
1227                 this.writer = writer;
1228                 this.buffers = new ArrayList<CharBuffer>();
1229                 this.references = 0;
1230                 this.exception = null;
1231                 this.latch = new CountDownLatch(1);
1232                 Environment.getPool().submit(this);
1233             }
1234 
1235             private void release(final CharBuffer buffer) {
1236                 synchronized (this.buffers) {
1237                     if (this.buffers.size() < BUFFER_NUM_WRITE + Environment.getCores() + 1) {
1238                         buffer.clear();
1239                         this.buffers.add(buffer);
1240                     }
1241                 }
1242             }
1243 
1244             private CharBuffer allocate() {
1245                 synchronized (this.buffers) {
1246                     if (!this.buffers.isEmpty()) {
1247                         return this.buffers.remove(this.buffers.size() - 1);
1248                     }
1249                 }
1250                 return CharBuffer.allocate(2 * BUFFER_SIZE);
1251             }
1252 
1253             public void open() {
1254                 synchronized (this) {
1255                     if (this.references < 0) {
1256                         throw new IllegalStateException("Stream has been closed");
1257                     }
1258                     ++this.references;
1259                 }
1260             }
1261 
1262             public void close() throws IOException {
1263                 synchronized (this) {
1264                     --this.references;
1265                     if (this.references != 0) {
1266                         return;
1267                     }
1268                     this.references = -1; // prevent further open() to occur
1269                 }
1270                 while (true) {
1271                     try {
1272                         this.queue.put(EOF);
1273                         break;
1274                     } catch (final InterruptedException ex) {
1275                         // ignore
1276                     }
1277                 }
1278                 while (true) {
1279                     try {
1280                         this.latch.await();
1281                         break;
1282                     } catch (final InterruptedException ex) {
1283                         // ignore
1284                     }
1285                 }
1286                 synchronized (EMITTERS) {
1287                     EMITTERS.remove(this.writer);
1288                 }
1289                 this.queue.clear();
1290                 this.buffers.clear();
1291                 this.writer = null; // may be heavyweight, better to release immediately
1292                 synchronized (this) {
1293                     if (this.exception != null) {
1294                         propagate(this.exception);
1295                     }
1296                 }
1297             }
1298 
1299             public void emit(final List<CharBuffer> buffers) throws IOException {
1300                 try {
1301                     synchronized (this) {
1302                         if (this.exception != null) {
1303                             throw this.exception;
1304                         }
1305                     }
1306                     this.queue.put(new ArrayList<CharBuffer>(buffers));
1307                     buffers.clear();
1308                     buffers.add(allocate());
1309                 } catch (IOException | RuntimeException | Error ex) {
1310                     throw ex;
1311                 } catch (final Throwable ex) {
1312                     throw new IOException(ex);
1313                 }
1314             }
1315 
1316             @SuppressWarnings("unchecked")
1317             @Override
1318             public void run() {
1319                 try {
1320                     while (true) {
1321                         final Object object = this.queue.take();
1322                         if (object == EOF) {
1323                             break;
1324                         }
1325                         final List<CharBuffer> buffers = (List<CharBuffer>) object;
1326                         for (final CharBuffer buffer : buffers) {
1327                             this.writer.write(buffer.array(), buffer.position(), buffer.limit());
1328                         }
1329                         if (!buffers.isEmpty()) {
1330                             release(buffers.get(0));
1331                         }
1332                     }
1333                 } catch (final Throwable ex) {
1334                     synchronized (this) {
1335                         this.exception = ex;
1336                     }
1337                     this.queue.clear();
1338                 } finally {
1339                     closeQuietly(this.writer);
1340                     this.latch.countDown();
1341                 }
1342             }
1343 
1344             public static Emitter forWriter(final Writer writer) {
1345                 synchronized (EMITTERS) {
1346                     Emitter manager = EMITTERS.get(writer);
1347                     if (manager == null) {
1348                         manager = new Emitter(writer);
1349                         EMITTERS.put(writer, manager);
1350                     }
1351                     return manager;
1352                 }
1353             }
1354 
1355         }
1356 
1357     }
1358 
1359     private static final class ParallelBufferedInputStream extends InputStream {
1360 
1361         private Fetcher fetcher;
1362 
1363         private final List<ByteBuffer> buffers;
1364 
1365         private int index;
1366 
1367         private byte[] buffer;
1368 
1369         private int count;
1370 
1371         private int pos;
1372 
1373         private boolean closed;
1374 
1375         ParallelBufferedInputStream(final InputStream stream, final byte delimiter) {
1376             this.fetcher = Fetcher.forStream(stream, delimiter);
1377             this.buffers = new ArrayList<ByteBuffer>();
1378             this.index = 0;
1379             this.buffer = null;
1380             this.count = 0;
1381             this.pos = 0;
1382             this.closed = false;
1383             this.fetcher.open();
1384         }
1385 
1386         @Override
1387         public int read() throws IOException {
1388             if (this.pos >= this.count) {
1389                 fill();
1390                 if (this.count == 0) {
1391                     return -1;
1392                 }
1393             }
1394             return this.buffer[this.pos++] & 0xFF;
1395         }
1396 
1397         @Override
1398         public int read(final byte[] buf, final int off, final int len) throws IOException {
1399             if ((off | len | off + len | buf.length - (off + len)) < 0) {
1400                 throw new IndexOutOfBoundsException();
1401             }
1402             if (len == 0) {
1403                 checkNotClosed();
1404                 return 0;
1405             }
1406             final int available = this.count - this.pos;
1407             if (available == 0) {
1408                 fill();
1409                 if (this.count == 0) {
1410                     return -1;
1411                 }
1412             }
1413             final int n = available > len ? len : available;
1414             System.arraycopy(this.buffer, this.pos, buf, off, n);
1415             this.pos += n;
1416             return n;
1417         }
1418 
1419         @Override
1420         public long skip(final long n) throws IOException {
1421             if (n <= 0) {
1422                 checkNotClosed();
1423                 return 0;
1424             }
1425             int available = this.count - this.pos;
1426             if (available == 0) {
1427                 fill();
1428                 available = this.count;
1429             }
1430             final long skipped = available < n ? available : n;
1431             this.pos += skipped;
1432             return skipped;
1433         }
1434 
1435         @Override
1436         public void reset() throws IOException {
1437             throw new IOException("Mark not supported");
1438         }
1439 
1440         @Override
1441         public void mark(final int readlimit) {
1442         }
1443 
1444         @Override
1445         public boolean markSupported() {
1446             return false;
1447         }
1448 
1449         @Override
1450         public void close() throws IOException {
1451             synchronized (this.buffers) {
1452                 if (this.closed) {
1453                     return;
1454                 }
1455                 this.closed = true;
1456             }
1457             this.count = this.pos;
1458             this.buffers.clear();
1459             this.fetcher.close();
1460             this.fetcher = null;
1461         }
1462 
1463         private void fill() throws IOException {
1464             checkNotClosed();
1465             if (this.buffer != null) {
1466                 this.buffer = null;
1467                 this.pos = 0;
1468                 this.count = 0;
1469             }
1470             if (this.index == this.buffers.size()) {
1471                 this.fetcher.fetch(this.buffers);
1472                 this.index = 0;
1473             }
1474             if (this.index < this.buffers.size()) {
1475                 final ByteBuffer buffer = this.buffers.get(this.index++);
1476                 this.buffer = buffer.array();
1477                 this.count = buffer.limit();
1478             }
1479         }
1480 
1481         private void checkNotClosed() throws IOException {
1482             if (this.closed) {
1483                 throw new IOException("Stream has been closed");
1484             }
1485         }
1486 
1487         private static final class Fetcher implements Runnable {
1488 
1489             private static final Map<InputStream, Fetcher> FETCHERS = new WeakHashMap<InputStream, Fetcher>();
1490 
1491             private static final Object EOF = new Object();
1492 
1493             private final BlockingQueue<Object> queue;
1494 
1495             private InputStream stream;
1496 
1497             private final byte delimiter;
1498 
1499             private final List<ByteBuffer> buffers;
1500 
1501             private int references;
1502 
1503             private Throwable exception;
1504 
1505             private final CountDownLatch latch;
1506 
1507             private Fetcher(final InputStream stream, final byte delimiter) {
1508                 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_READ, false);
1509                 this.stream = stream;
1510                 this.delimiter = delimiter;
1511                 this.buffers = new ArrayList<ByteBuffer>();
1512                 this.references = 0;
1513                 this.exception = null;
1514                 this.latch = new CountDownLatch(1);
1515 
1516                 Environment.getPool().submit(this);
1517             }
1518 
1519             private void release(final ByteBuffer buffer) {
1520                 synchronized (this.buffers) {
1521                     if (this.buffers.size() < BUFFER_NUM_READ + Environment.getCores() + 1) {
1522                         buffer.clear();
1523                         this.buffers.add(buffer);
1524                     }
1525                 }
1526             }
1527 
1528             private ByteBuffer allocate() {
1529                 synchronized (this.buffers) {
1530                     if (!this.buffers.isEmpty()) {
1531                         return this.buffers.remove(this.buffers.size() - 1);
1532                     }
1533                 }
1534                 return ByteBuffer.allocate(2 * BUFFER_SIZE);
1535             }
1536 
1537             public void open() {
1538                 synchronized (this) {
1539                     if (this.references < 0) {
1540                         throw new IllegalStateException("Reader has been closed");
1541                     }
1542                     ++this.references;
1543                 }
1544             }
1545 
1546             public void close() throws IOException {
1547                 synchronized (this) {
1548                     --this.references;
1549                     if (this.references != 0) {
1550                         return;
1551                     }
1552                     this.references = -1; // prevent further open() to occur
1553                 }
1554                 this.queue.clear(); // nobody will use queued buffers
1555                 while (true) {
1556                     try {
1557                         this.latch.await();
1558                         break;
1559                     } catch (final InterruptedException ex) {
1560                         // ignore
1561                     }
1562                 }
1563                 synchronized (FETCHERS) {
1564                     FETCHERS.remove(this.stream);
1565                 }
1566                 this.queue.clear();
1567                 this.buffers.clear();
1568                 this.stream = null; // may be heavyweight, better to release immediately
1569                 synchronized (this) {
1570                     if (this.exception != null) {
1571                         propagate(this.exception);
1572                     }
1573                 }
1574             }
1575 
1576             @SuppressWarnings("unchecked")
1577             public void fetch(final List<ByteBuffer> buffers) throws IOException {
1578                 try {
1579                     synchronized (this) {
1580                         if (this.exception != null) {
1581                             throw this.exception;
1582                         }
1583                     }
1584                     for (final ByteBuffer buffer : buffers) {
1585                         release(buffer);
1586                     }
1587                     buffers.clear();
1588                     final Object object = this.queue.take();
1589                     if (object == EOF) {
1590                         this.queue.add(EOF);
1591                         return;
1592                     }
1593                     buffers.addAll((List<ByteBuffer>) object);
1594                 } catch (IOException | RuntimeException | Error ex) {
1595                     throw ex;
1596                 } catch (final Throwable ex) {
1597                     throw new IOException(ex);
1598                 }
1599             }
1600 
1601             @Override
1602             public void run() {
1603 
1604                 try {
1605                     ByteBuffer restBuffer = allocate();
1606                     List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
1607 
1608                     boolean eof = false;
1609                     while (!eof) {
1610 
1611                         synchronized (this) {
1612                             if (this.references < 0) {
1613                                 break;
1614                             }
1615                         }
1616 
1617                         final ByteBuffer curBuffer = restBuffer;
1618                         final byte[] array = curBuffer.array();
1619 
1620                         while (!eof && curBuffer.hasRemaining()) {
1621                             final int offset = curBuffer.position();
1622                             final int len = curBuffer.remaining();
1623                             final int n = this.stream.read(array, offset, len);
1624                             eof = n < 0;
1625                             if (!eof) {
1626                                 curBuffer.position(offset + n);
1627                             }
1628                         }
1629 
1630                         curBuffer.flip();
1631                         buffers.add(curBuffer);
1632 
1633                         restBuffer = allocate();
1634                         if (!eof) {
1635                             final int curLastIndex = curBuffer.limit() - 1;
1636                             for (int i = curLastIndex; i >= 0; --i) {
1637                                 if (array[i] == this.delimiter) {
1638                                     restBuffer.position(curLastIndex - i);
1639                                     System.arraycopy(array, i + 1, restBuffer.array(), 0,
1640                                             restBuffer.position());
1641                                     curBuffer.limit(i + 1);
1642                                     this.queue.put(buffers);
1643                                     buffers = new ArrayList<ByteBuffer>();
1644                                     break;
1645                                 }
1646                             }
1647                         }
1648                     }
1649 
1650                     this.queue.put(buffers);
1651 
1652                 } catch (final Throwable ex) {
1653                     synchronized (this) {
1654                         this.exception = ex;
1655                     }
1656                 }
1657 
1658                 try {
1659                     closeQuietly(this.stream);
1660 
1661                     while (true) {
1662                         try {
1663                             this.queue.put(EOF);
1664                             break;
1665                         } catch (final InterruptedException ex) {
1666                             // ignore
1667                         }
1668                     }
1669                 } finally {
1670                     this.latch.countDown();
1671                 }
1672             }
1673 
1674             public static Fetcher forStream(final InputStream stream, final byte delimiter) {
1675                 synchronized (FETCHERS) {
1676                     Fetcher fetcher = FETCHERS.get(stream);
1677                     if (fetcher == null) {
1678                         fetcher = new Fetcher(stream, delimiter);
1679                         FETCHERS.put(stream, fetcher);
1680                     } else if (fetcher.delimiter != delimiter) {
1681                         throw new IllegalStateException("Already reading from stream " + stream
1682                                 + " using delimiter " + delimiter);
1683                     }
1684                     return fetcher;
1685                 }
1686             }
1687 
1688         }
1689 
1690     }
1691 
1692     private static final class ParallelBufferedOutputStream extends OutputStream {
1693 
1694         private Emitter emitter;
1695 
1696         private final byte delimiter;
1697 
1698         private final List<ByteBuffer> buffers;
1699 
1700         private byte[] buffer;
1701 
1702         private int count; // from 0 to BUFFER_SIZE
1703 
1704         private int threshold;
1705 
1706         private boolean closed;
1707 
1708         ParallelBufferedOutputStream(final OutputStream stream, final byte delimiter) {
1709             this.emitter = Emitter.forStream(stream);
1710             this.delimiter = delimiter;
1711             this.buffers = new ArrayList<ByteBuffer>();
1712             this.buffer = new byte[2 * BUFFER_SIZE];
1713             this.count = 0;
1714             this.threshold = BUFFER_SIZE;
1715             this.closed = false;
1716             this.emitter.open();
1717         }
1718 
1719         @Override
1720         public void write(final int c) throws IOException {
1721             if (this.count < this.threshold) {
1722                 this.buffer[this.count++] = (byte) c;
1723             } else {
1724                 writeAndTryFlush((byte) c);
1725             }
1726         }
1727 
1728         @Override
1729         public void write(final byte[] buf, int off, int len) throws IOException {
1730             final int available = this.threshold - this.count;
1731             if (available >= len) {
1732                 System.arraycopy(buf, off, this.buffer, this.count, len);
1733                 this.count += len;
1734                 return;
1735             }
1736             if (available > 0) {
1737                 System.arraycopy(buf, off, this.buffer, this.count, available);
1738                 this.count += available;
1739                 off += available;
1740                 len -= available;
1741             }
1742             final int end = off + len;
1743             while (off < end) {
1744                 writeAndTryFlush(buf[off++]);
1745             }
1746         }
1747 
1748         @Override
1749         public void flush() throws IOException {
1750             flushBuffers();
1751         }
1752 
1753         @Override
1754         public void close() throws IOException {
1755             synchronized (this.buffers) {
1756                 if (this.closed) {
1757                     return;
1758                 }
1759                 flushBuffers();
1760                 this.closed = true;
1761             }
1762             this.buffers.clear();
1763             this.buffer = null;
1764             this.emitter.close();
1765             this.emitter = null;
1766         }
1767 
1768         private void writeAndTryFlush(final byte c) throws IOException {
1769             this.buffer[this.count++] = c;
1770             if (c == this.delimiter) {
1771                 flushBuffers();
1772             } else if (this.count == this.buffer.length) {
1773                 checkNotClosed();
1774                 this.buffers.add(ByteBuffer.wrap(this.buffer));
1775                 this.buffer = new byte[BUFFER_SIZE];
1776                 this.count = 0;
1777                 this.threshold = 0;
1778             }
1779         }
1780 
1781         private void flushBuffers() throws IOException {
1782             checkNotClosed();
1783             if (this.count > 0) {
1784                 final ByteBuffer buffer = ByteBuffer.wrap(this.buffer);
1785                 buffer.limit(this.count);
1786                 this.buffers.add(buffer);
1787             }
1788             this.emitter.emit(this.buffers);
1789             if (!this.buffers.isEmpty()) {
1790                 this.buffer = this.buffers.get(0).array();
1791                 this.buffers.clear();
1792             }
1793             this.count = 0;
1794             this.threshold = BUFFER_SIZE;
1795         }
1796 
1797         private void checkNotClosed() throws IOException {
1798             if (this.closed) {
1799                 throw new IOException("Writer has been closed");
1800             }
1801         }
1802 
1803         private static class Emitter implements Runnable {
1804 
1805             private static final Map<OutputStream, Emitter> EMITTERS = new WeakHashMap<OutputStream, Emitter>();
1806 
1807             private static final Object EOF = new Object();
1808 
1809             private final BlockingQueue<Object> queue;
1810 
1811             private final List<ByteBuffer> buffers;
1812 
1813             private OutputStream stream;
1814 
1815             private int references;
1816 
1817             private Throwable exception;
1818 
1819             private final CountDownLatch latch;
1820 
1821             private Emitter(final OutputStream stream) {
1822                 this.queue = new ArrayBlockingQueue<Object>(BUFFER_NUM_WRITE, false);
1823                 this.stream = stream;
1824                 this.buffers = new ArrayList<ByteBuffer>();
1825                 this.references = 0;
1826                 this.exception = null;
1827                 this.latch = new CountDownLatch(1);
1828                 Environment.getPool().submit(this);
1829             }
1830 
1831             private void release(final ByteBuffer buffer) {
1832                 synchronized (this.buffers) {
1833                     if (this.buffers.size() < BUFFER_NUM_WRITE + Environment.getCores() + 1) {
1834                         buffer.clear();
1835                         this.buffers.add(buffer);
1836                     }
1837                 }
1838             }
1839 
1840             private ByteBuffer allocate() {
1841                 synchronized (this.buffers) {
1842                     if (!this.buffers.isEmpty()) {
1843                         return this.buffers.remove(this.buffers.size() - 1);
1844                     }
1845                 }
1846                 return ByteBuffer.allocate(2 * BUFFER_SIZE);
1847             }
1848 
1849             public void open() {
1850                 synchronized (this) {
1851                     if (this.references < 0) {
1852                         throw new IllegalStateException("Stream has been closed");
1853                     }
1854                     ++this.references;
1855                 }
1856             }
1857 
1858             public void close() throws IOException {
1859                 synchronized (this) {
1860                     --this.references;
1861                     if (this.references != 0) {
1862                         return;
1863                     }
1864                     this.references = -1; // prevent further open() to occur
1865                 }
1866                 while (true) {
1867                     try {
1868                         this.queue.put(EOF);
1869                         break;
1870                     } catch (final InterruptedException ex) {
1871                         // ignore
1872                     }
1873                 }
1874                 while (true) {
1875                     try {
1876                         this.latch.await();
1877                         break;
1878                     } catch (final InterruptedException ex) {
1879                         // ignore
1880                     }
1881                 }
1882                 synchronized (EMITTERS) {
1883                     EMITTERS.remove(this.stream);
1884                 }
1885                 this.queue.clear();
1886                 this.buffers.clear();
1887                 this.stream = null; // may be heavyweight, better to release immediately
1888                 synchronized (this) {
1889                     if (this.exception != null) {
1890                         propagate(this.exception);
1891                     }
1892                 }
1893             }
1894 
1895             public void emit(final List<ByteBuffer> buffers) throws IOException {
1896                 try {
1897                     synchronized (this) {
1898                         if (this.exception != null) {
1899                             throw this.exception;
1900                         }
1901                     }
1902                     this.queue.put(new ArrayList<ByteBuffer>(buffers));
1903                     buffers.clear();
1904                     buffers.add(allocate());
1905                 } catch (final Throwable ex) {
1906                     propagate(ex);
1907                 }
1908             }
1909 
1910             @SuppressWarnings("unchecked")
1911             @Override
1912             public void run() {
1913                 try {
1914                     while (true) {
1915                         final Object object = this.queue.take();
1916                         if (object == EOF) {
1917                             break;
1918                         }
1919                         final List<ByteBuffer> buffers = (List<ByteBuffer>) object;
1920                         for (final ByteBuffer buffer : buffers) {
1921                             this.stream.write(buffer.array(), buffer.position(), buffer.limit());
1922                         }
1923                         if (!buffers.isEmpty()) {
1924                             release(buffers.get(0));
1925                         }
1926                     }
1927                 } catch (final Throwable ex) {
1928                     synchronized (this) {
1929                         this.exception = ex;
1930                     }
1931                     this.queue.clear();
1932                 } finally {
1933                     closeQuietly(this.stream);
1934                     this.latch.countDown();
1935                 }
1936             }
1937 
1938             public static Emitter forStream(final OutputStream stream) {
1939                 synchronized (EMITTERS) {
1940                     Emitter emitter = EMITTERS.get(stream);
1941                     if (emitter == null) {
1942                         emitter = new Emitter(stream);
1943                         EMITTERS.put(stream, emitter);
1944                     }
1945                     return emitter;
1946                 }
1947             }
1948 
1949         }
1950 
1951     }
1952 
1953     private static final class UTF8Reader extends Reader {
1954 
1955         private final InputStream stream;
1956 
1957         private boolean closed;
1958 
1959         public UTF8Reader(final InputStream stream) {
1960             this.stream = stream;
1961             this.closed = false;
1962         }
1963 
1964         @Override
1965         public int read() throws IOException {
1966             final int b0 = this.stream.read();
1967             return (b0 & 0xFFFFFF80) == 0 ? b0 : readHelper(b0);
1968         }
1969 
1970         private int readHelper(final int b0) throws IOException {
1971 
1972             if (b0 < 0) { // EOF
1973                 return -1;
1974 
1975             } else if (b0 <= 0b11011111) { // 110xxxxx 10xxxxxx
1976                 final int b1 = this.stream.read();
1977                 if ((b1 & 0b11000000) == 0b10000000) {
1978                     return (b0 & 0b00011111) << 6 | b1 & 0b00111111;
1979                 }
1980 
1981             } else if (b0 <= 0b11101111) { // 1110xxxx 10xxxxxx 10xxxxxx
1982                 final int b1 = this.stream.read();
1983                 final int b2 = this.stream.read();
1984                 if ((b1 & 0b11000000) == 0b10000000 && (b2 & 0b11000000) == 0b10000000) {
1985                     return (b0 & 0b00001111) << 12 | (b1 & 0b00111111) << 6 | b2 & 0b00111111;
1986                 }
1987 
1988             } else if (b0 <= 0b11110111) { // 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
1989                 final int b1 = this.stream.read();
1990                 final int b2 = this.stream.read();
1991                 final int b3 = this.stream.read();
1992                 if ((b1 & 0b11000000) == 0b10000000 && (b2 & 0b11000000) == 0b10000000
1993                         && (b3 & 0b11000000) == 0b10000000) {
1994                     return (b0 & 0b00000111) << 18 | (b1 & 0b00111111) << 12
1995                             | (b2 & 0b00111111) << 6 | b3 & 0b00111111;
1996                 }
1997             }
1998 
1999             throw new IOException("Invalid/truncated UTF8 code");
2000         }
2001 
2002         @Override
2003         public int read(final char[] buf, final int off, final int len) throws IOException {
2004             if ((off | len | off + len | buf.length - (off + len)) < 0) {
2005                 throw new IndexOutOfBoundsException();
2006             }
2007             if (len == 0) {
2008                 checkNotClosed();
2009                 return 0;
2010             }
2011             int index = off;
2012             int c = read();
2013             if (c < 0) {
2014                 return -1;
2015             }
2016             buf[index++] = (char) c;
2017             final int end = off + Math.min(len, this.stream.available() / 2);
2018             while (index < end) {
2019                 c = read();
2020                 if (c < 0) {
2021                     break;
2022                 }
2023                 buf[index++] = (char) c;
2024             }
2025             return index - off;
2026         }
2027 
2028         @Override
2029         public long skip(final long n) throws IOException {
2030             if (n == 0L) {
2031                 checkNotClosed();
2032                 return 0L;
2033             }
2034             final int skippable = this.stream.available() / 2;
2035             int toSkip = skippable;
2036             do {
2037                 final int c = read();
2038                 if (c < 0) {
2039                     break;
2040                 }
2041                 --toSkip;
2042             } while (toSkip > 0);
2043             return skippable - toSkip;
2044         }
2045 
2046         @Override
2047         public boolean ready() throws IOException {
2048             return this.stream.available() >= 4;
2049         }
2050 
2051         @Override
2052         public void reset() throws IOException {
2053             throw new IOException("Mark not supported");
2054         }
2055 
2056         @Override
2057         public void mark(final int readlimit) {
2058         }
2059 
2060         @Override
2061         public boolean markSupported() {
2062             return false;
2063         }
2064 
2065         @Override
2066         public void close() throws IOException {
2067             synchronized (this) {
2068                 if (this.closed) {
2069                     return;
2070                 }
2071                 this.closed = true;
2072             }
2073             this.stream.close();
2074         }
2075 
2076         private void checkNotClosed() throws IOException {
2077             if (this.closed) {
2078                 throw new IOException("Reader has been closed");
2079             }
2080         }
2081 
2082     }
2083 
2084     private static final class UTF8Writer extends Writer {
2085 
2086         private final OutputStream stream;
2087 
2088         private boolean closed;
2089 
2090         UTF8Writer(final OutputStream stream) {
2091             this.stream = stream;
2092             this.closed = false;
2093         }
2094 
2095         @Override
2096         public void write(final int c) throws IOException {
2097             if (c <= 0b1111111) { // 0xxxxxxx
2098                 this.stream.write(c);
2099             } else {
2100                 writeHelper(c);
2101             }
2102         }
2103 
2104         private void writeHelper(final int c) throws IOException {
2105 
2106             if (c <= 0b11111_111111) { // 110xxxxx 10xxxxxx
2107                 this.stream.write(0b11000000 | c >>> 6);
2108                 this.stream.write(0b10000000 | c & 0b00111111);
2109 
2110             } else if (c <= 0b1111_111111_111111) { // 1110xxxx 10xxxxxx 10xxxxxx
2111                 this.stream.write(0b11100000 | c >>> 12);
2112                 this.stream.write(0b10000000 | c >>> 6 & 0b00111111);
2113                 this.stream.write(0b10000000 | c & 0b00111111);
2114 
2115             } else if (c <= 0b111_111111_111111_111111) { // 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
2116                 this.stream.write(0b11110000 | c >>> 18);
2117                 this.stream.write(0b10000000 | c >>> 12 & 0b00111111);
2118                 this.stream.write(0b10000000 | c >>> 6 & 0b00111111);
2119                 this.stream.write(0b10000000 | c & 0b00111111);
2120 
2121             } else {
2122                 throw new IOException("Invalid code point " + c);
2123             }
2124         }
2125 
2126         @Override
2127         public void write(final char[] cbuf, final int off, final int len) throws IOException {
2128             final int end = off + len;
2129             for (int index = off; index < end; ++index) {
2130                 write(cbuf[index]);
2131             }
2132         }
2133 
2134         @Override
2135         public void write(final String str, final int off, final int len) throws IOException {
2136             final int end = off + len;
2137             for (int index = off; index < end; ++index) {
2138                 write(str.charAt(index));
2139             }
2140         }
2141 
2142         @Override
2143         public void flush() throws IOException {
2144             this.stream.flush();
2145         }
2146 
2147         @Override
2148         public void close() throws IOException {
2149             synchronized (this) {
2150                 if (this.closed) {
2151                     return;
2152                 }
2153                 this.closed = true;
2154             }
2155             this.stream.close();
2156         }
2157 
2158     }
2159 
2160 }