| 
001 /**002  * Copyright (C) 2013 - Francesc Auli-Llinas
 003  *
 004  * This program is distributed under the BOI License.
 005  * This program is distributed in the hope that it will be useful, but
 006  * without any warranty; without even the implied warranty of merchantability or
 007  * fitness for a particular purpose.
 008  * You should have received a copy of the BOI License along with this program.
 009  * If not, see <http://www.deic.uab.cat/~francesc/software/license/>.
 010  */
 011 package streams;
 012
 013 import java.io.File;
 014 import java.io.FileOutputStream;
 015 import java.io.IOException;
 016 import java.nio.ByteBuffer;
 017 import java.nio.channels.FileChannel;
 018 import java.nio.file.FileSystems;
 019 import java.nio.file.StandardOpenOption;
 020
 021
 022 /**
 023  * This class is a buffered byte stream with no limits on its capacity. It implements
 024  * fewer functions than the Java ByteBuffer class, though it is slightly faster. It also
 025  * incorporates functions to temporarily save the stream in a file, so the buffer memory can
 026  * be freed. When the ByteStream is only needed for reading purposes, it can use the readFile
 027  * mode, in which the stream is linked to (segments of) a file from which bytes are read.<br>
 028  *
 029  * Usage: the object can be in three different modes: normal, readFile, and temporalFile. The
 030  * normal mode indicates that the stream data are in a buffer in memory. All functions to put/remove
 031  * bytes can be used in this mode. Also, the stream has a pointer which serves to
 032  * sequentially read all bytes of the stream. This pointer is only used for getting bytes, and
 033  * can be reset through the <code>reset</code> function.<br>
 034  * At any point when in the normal mode, the functions to <code>save/load from/to temporal
 035  * file</code> can be used to temporarily save the data in a file. When the data is in the temporal
 036  * file the stream is in the temporalFile mode. In this mode, the functions to get/put bytes
 037  * are not operative. Be aware that all objects of this class employ the same temporal file to avoid
 038  * the creation of many files. It is important to delete this file at the execution end by
 039  * calling the <code>destroyTemporalFile</code> function.<br>
 040  * The readFile mode can be used to link the stream to (segments of) a file from which data are read.
 041  * To use this mode, the object must be created with the constructor that has a FileChannel as a parameter.
 042  * Then, segments of the file can be added via the corresponding <code>putFileSegment</code> function.
 043  * At any moment, the function <code>endReadFileMode</code> can be called, loading all bytes from the
 044  * file to put them in memory and therefore ending the readFile mode and entering in the normal mode.
 045  * A stream that was a created in readFile mode can always return to that state calling the
 046  * function <code>returnReadFileMode</code>. Regardless of whether the data is in memory or
 047  * still in the file, the function <code>getByte</code>
 048  * reads data of the stream. When in readFile mode, the <code>get</code> functions do not advance the
 049  * position of the file.<br>
 050  * Each object of this class is intended to allocate one stream, without options to reuse the object.<br>
 051  *
 052  * Multithreading support: the object must be created and manipulated by a single thread. There
 053  * can be many objects of this class running simultaneously as long as a single thread manipulates each object.
 054  *
 055  * @author Francesc Auli-Llinas
 056  * @version 1.3
 057  */
 058 public final class ByteStream{
 059
 060   /**
 061    * Initial length of the byte buffer. The allocation of the buffer is doubled each time it
 062    * reaches its maximum capacity.
 063    * <p>
 064    * Negative values are not allowed.
 065    */
 066   private static final int INITIAL_ALLOCATION = 1024;
 067
 068   /**
 069    * Initial length of the array that keeps the segments of the file that contain the data of
 070    * the stream (only when the stream is in readFile mode). The length is doubled each time the
 071    * array reaches its maximum capacity.
 072    * <p>
 073    * Negative values are not allowed.
 074    */
 075   private static final int INITIAL_NUM_SEGMENTS = 32;
 076
 077   /**
 078    * Mode in which the stream is.
 079    * <p>
 080    * Valid modes are:
 081    * <ul>
 082    * <li>0: normal mode. The stream is in the byte buffer.</li>
 083    * <li>1: readFile mode. The stream is in a file from which data is read.</li>
 084    * <li>2: temporalFile mode. The stream is in a temporal file.</li>
 085    * </ul>
 086    */
 087   private int streamMode = -1;
 088
 089   /**
 090    * Array in which the bytes are stored.
 091    * <p>
 092    * The length of the array is increased as more data are put into it.
 093    */
 094   private byte[] buffer = null;
 095
 096   /**
 097    * Current limit of the buffer (position in which new bytes will be put). It is also the
 098    * number of bytes put in the stream.
 099    * <p>
 100    * Negative values are not allowed.
 101    */
 102   private long limit = 0;
 103
 104   /**
 105    * Current position for the <code>get</code> functions.
 106    * <p>
 107    * Only smaller or equal values than <code>limit</code> are allowed.
 108    */
 109   private long position = 0;
 110
 111   /**
 112    * File channel from which bytes are read when the stream is in readFile mode.
 113    * <p>
 114    * This is only employed when the object is initialized with a file channel.
 115    */
 116   private FileChannel readFileChannel = null;
 117
 118   /**
 119    * Number of segments in the file that contain the data of the stream (only employed when
 120    * the stream is in readFile mode).
 121    * <p>
 122    * Negative values not allowed.
 123    */
 124   private int readFileNumSegments = -1;
 125
 126   /**
 127    * Segments of the stream within the file, employed when the stream is in readFile mode.
 128    * <p>
 129    * The indices of the array are [segment][0- first byte, 1- length].
 130    */
 131   private long[][] readFileSegments = null;
 132
 133   /**
 134    * Position of this stream in the temporal file in which the data are saved. This is only
 135    * employed when the stream is in the temporal file, otherwise is null.
 136    * <p>
 137    * First byte of the stream in the temporal file.
 138    */
 139   private long temporalFilePosition = -1;
 140
 141   /**
 142    * File name in which the stream is saved temporarily.
 143    * <p>
 144    * It is set in the <code>saveToTemporalFile</code> function.
 145    */
 146   private static String temporalFileName = null;
 147
 148   /**
 149    * Temporal ByteBuffer employed to read from the file channel when necessary (only used when
 150    * the stream is in readFile mode).
 151    * <p>
 152    * This object is reused every time a new byte is read from the file.
 153    */
 154   private ByteBuffer temporalBuffer = ByteBuffer.allocate(1);
 155
 156   /**
 157    * This object is useful to lock the access to the variables shared by multiple threads
 158    * (i.e., the <code>temporalFileName</code>).
 159    * <p>
 160    * The object is never modified, its only purpose is to allow a lock.
 161    */
 162   private static final Object lock = new Object();
 163
 164
 165   /**
 166    * Allocates memory for the buffer. The stream enters in the normal mode.
 167    */
 168   public ByteStream(){
 169     buffer = new byte[INITIAL_ALLOCATION];
 170     streamMode = 0;
 171   }
 172
 173   /**
 174    * Allocates memory for the buffer, with a specified limit. The stream enters in the normal mode.
 175    *
 176    * @param initialAllocation number of bytes reserved in the buffer
 177    */
 178   public ByteStream(int initialAllocation){
 179     buffer = new byte[initialAllocation];
 180     streamMode = 0;
 181   }
 182
 183   /**
 184    * Initializes this stream with a file from which bytes are read. The stream enters in
 185    * the readFile mode.
 186    *
 187    * @param fc the file from which bytes are read
 188    */
 189   public ByteStream(FileChannel fc){
 190     this.readFileChannel = fc;
 191     readFileNumSegments = 0;
 192     readFileSegments = new long[INITIAL_NUM_SEGMENTS][2];
 193     streamMode = 1;
 194   }
 195
 196   /**
 197    * Puts a byte into the stream. This function can only be called when the stream is in
 198    * normal mode.
 199    *
 200    * @param b the added byte
 201    */
 202   public void putByte(byte b){
 203     assert(streamMode == 0);
 204     if(limit == buffer.length){
 205       byte[] bufferTMP = new byte[buffer.length * 2];
 206       System.arraycopy(buffer, 0, bufferTMP, 0, (int) limit);
 207       buffer = bufferTMP;
 208     }
 209     buffer[(int) limit] = b;
 210     limit++;
 211   }
 212
 213   /**
 214    * Puts one or more bytes into the stream. This function can only be called when the stream
 215    * is in normal mode.
 216    *
 217    * @param array the array of bytes that will be added
 218    * @param offset first position of the array that is added
 219    * @param length number of bytes added
 220    */
 221   public void putBytes(byte[] array, int offset, int length){
 222     assert(streamMode == 0);
 223     assert((offset >= 0) && (length > 0) && (offset + length <= array.length));
 224     if(limit + (long) length > (long) buffer.length){
 225       byte[] bufferTMP = new byte[(buffer.length + length) * 2];
 226       System.arraycopy(buffer, 0, bufferTMP, 0, (int) limit);
 227       buffer = bufferTMP;
 228     }
 229     System.arraycopy(array, offset, buffer, (int) limit, length);
 230     limit += (long) length;
 231   }
 232
 233   /**
 234    * Puts one or more bytes into the stream. This function can only be called when the
 235    * stream is in normal mode.
 236    *
 237    * @param num integer containing the bytes that will be added
 238    * @param numBytes number of bytes added
 239    */
 240   public void putBytes(int num, int numBytes){
 241     assert(streamMode == 0);
 242     assert((numBytes > 0) && (numBytes <= 4));
 243     for(int b = numBytes - 1; b >= 0; b--){
 244       byte addedByte = (byte) ((num >> (8 * b)) & 0x000000FF);
 245       putByte(addedByte);
 246     }
 247   }
 248
 249   /**
 250    * Puts one file segment containing some bytes into the stream. This function can only be
 251    * called when the stream is in readFile mode.
 252    *
 253    * @param begin position of the first byte of the file that is added to the stream
 254    * @param length length of the segment
 255    * @throws Exception when some problem introducing the segment occurs
 256    */
 257   public void putFileSegment(long begin, long length) throws Exception{
 258     assert(streamMode == 1);
 259     assert(readFileChannel != null);
 260     assert((begin >= 0) && (length > 0));
 261     assert(begin + length <= readFileChannel.size());
 262     assert(readFileNumSegments <= readFileSegments.length);
 263
 264     if(readFileNumSegments == readFileSegments.length){
 265       long[][] fcSegmentsTMP = new long[readFileSegments.length * 2][2];
 266       for(int segment = 0; segment < readFileNumSegments; segment++){
 267         fcSegmentsTMP[segment][0] = readFileSegments[segment][0];
 268         fcSegmentsTMP[segment][1] = readFileSegments[segment][1];
 269       }
 270     }
 271
 272     readFileSegments[readFileNumSegments][0] = begin;
 273     readFileSegments[readFileNumSegments][1] = length;
 274     readFileNumSegments++;
 275     limit += length;
 276   }
 277
 278   /**
 279    * Removes one byte from the end of the stream. This function can only be called when the
 280    * stream is in normal mode.
 281    */
 282   public void removeByte(){
 283     assert(streamMode == 0);
 284     limit = limit - 1 <= 0 ? 0: limit - 1;
 285     if(position > limit){
 286       position = limit;
 287     }
 288   }
 289
 290   /**
 291    * Removes one or more bytes from the end of the stream. This function can only be called
 292    * when the stream is in normal mode.
 293    *
 294    * @param num number of removed bytes
 295    */
 296   public void removeBytes(int num){
 297     assert(streamMode == 0);
 298     limit = limit - num <= 0 ? 0: limit - num;
 299     if(position > limit){
 300       position = limit;
 301     }
 302   }
 303
 304   /**
 305    * Gets the byte in the current position. This function can only be called when the stream
 306    * is in normal or readFile mode.
 307    *
 308    * @return the corresponding byte
 309    * @throws Exception when the end of the stream is reached
 310    */
 311   public byte getByte() throws Exception{
 312     assert((streamMode == 0) || (streamMode == 1));
 313     assert(position >= 0);
 314
 315     byte getByte = getByte(position);
 316     position++;
 317     return(getByte);
 318   }
 319
 320   /**
 321    * Gets the bytes in the current position, and places them to an integer. This function
 322    * can only be called when the stream is in normal or readFile mode.
 323    *
 324    * @param numBytes number of bytes read
 325    * @return the integer having the corresponding bytes
 326    * @throws Exception when the end of the stream is reached
 327    */
 328   public int getBytes(int numBytes) throws Exception{
 329     assert((streamMode == 0) || (streamMode == 1));
 330     int num = 0;
 331     for(int b = numBytes - 1; b >= 0; b--){
 332       byte getByte = getByte();
 333       for(int i = 7; i >= 0; i--){
 334         int bitMask = 1 << i;
 335         num += ((int) getByte & bitMask) << (b * 8);
 336       }
 337     }
 338     return(num);
 339   }
 340
 341   /**
 342    * Checks whether <code>get</code> functions can get more bytes. This function can only
 343    * be called when the stream is in normal or readFile mode.
 344    *
 345    * @return true when there are one or more bytes
 346    * @throws IOException when some problem with the file occurs
 347    */
 348   public boolean hasMoreBytes() throws IOException{
 349     assert((streamMode == 0) || (streamMode == 1));
 350     return(position < limit);
 351   }
 352
 353   /**
 354    * Gets the indicated byte. This function can only be called when the stream is in
 355    * normal or readFile mode.
 356    *
 357    * @param index the index of the byte (starting from 0)
 358    * @return the corresponding byte
 359    * @throws Exception when the indicated index is not valid
 360    */
 361   public byte getByte(long index) throws Exception{
 362     assert((streamMode == 0) || (streamMode == 1));
 363
 364     if((index < 0) || (index >= limit)){
 365       throw new Exception("Invalid position.");
 366     }
 367     byte getByte = 0;
 368     if(streamMode == 0){
 369       getByte = buffer[(int) index];
 370     }else if(streamMode == 1){
 371       //Determines the segment in which this index lies
 372       int segment = 0;
 373       long accBytes = 0;
 374       while(index >= accBytes + readFileSegments[segment][1]){
 375         accBytes += readFileSegments[segment][1];
 376         segment++;
 377       }
 378       assert(segment < readFileNumSegments);
 379
 380       //Determines the position in the file
 381       long fcPosition = index - accBytes + readFileSegments[segment][0];
 382       assert(fcPosition < readFileChannel.size());
 383
 384       //Gets the byte
 385       temporalBuffer.clear();
 386       readFileChannel.read(temporalBuffer, fcPosition);
 387       getByte = temporalBuffer.array()[0];
 388     }else{
 389       assert(false);
 390     }
 391     return(getByte);
 392   }
 393
 394   /**
 395    * Returns the array of bytes. The length of the array is equal or greater than
 396    * the bytes put in the stream. This function can only be called when the stream is in normal mode.
 397    *
 398    * @return the byte array
 399    */
 400   public byte[] getByteStream(){
 401     assert(streamMode == 0);
 402     return(buffer);
 403   }
 404
 405   /**
 406    * Returns the number of bytes in the stream.
 407    *
 408    * @return number of bytes
 409    */
 410   public long getLength(){
 411     return(limit);
 412   }
 413
 414   /**
 415    * Returns the current position in the stream.
 416    *
 417    * @return {@link #position}
 418    */
 419   public long getPosition(){
 420     return(position);
 421   }
 422
 423   /**
 424    * Clears the stream removing all bytes in it.
 425    */
 426   public void clear(){
 427     limit = 0;
 428     position = 0;
 429   }
 430
 431   /**
 432    * Sets the position to the first byte.
 433    */
 434   public void reset(){
 435     position = 0;
 436   }
 437
 438   /**
 439    * Advances the position of the stream in the specified number of bytes.
 440    *
 441    * @param numBytes number of skipped bytes (can be positive or negative)
 442    */
 443   public void skip(long numBytes){
 444     if(numBytes >= 0){
 445       if(position + numBytes > limit){
 446         position = limit;
 447       }else{
 448         position += numBytes;
 449       }
 450     }else{
 451       if(position + numBytes < 0){
 452         position = 0;
 453       }else{
 454         position += numBytes;
 455       }
 456     }
 457   }
 458
 459   /**
 460    * Ends the readFile mode putting all segments of the file in the real byte buffer,
 461    * entering in the normal mode. This stream can return to the readFile mode
 462    * calling <code>returnReadFileMode</code>.
 463    *
 464    * @throws IOException when some problem reading the file occurs
 465    */
 466   public void endReadFileMode() throws IOException{
 467     assert(streamMode == 1);
 468
 469     buffer = new byte[(int) limit];
 470     int readBytes = 0;
 471     for(int segment = 0; segment < readFileNumSegments; segment++){
 472       long begin = readFileSegments[segment][0];
 473       long length = readFileSegments[segment][1];
 474
 475       temporalBuffer = ByteBuffer.allocate((int) length);
 476       readFileChannel.read(temporalBuffer, begin);
 477       System.arraycopy(temporalBuffer.array(), 0, buffer, readBytes, (int) length);
 478       readBytes += length;
 479     }
 480     streamMode = 0;
 481   }
 482
 483   /**
 484    * Returns a stream that was created in readFile mode and that was converted to the normal
 485    * mode afterwards to the readFile mode again. Attention! Be aware that once this function
 486    * is called, the stream returns to the state left when the function <code>endReadFileMode</code> was
 487    * called. This means that ALL changes (such as deleting or adding bytes) that were carried out while
 488    * the stream was in normal mode are lost.
 489    */
 490   public void returnReadFileMode(){
 491     assert(streamMode == 0);
 492     assert(readFileChannel != null);
 493     assert(readFileNumSegments != -1);
 494     assert(readFileSegments != null);
 495
 496     buffer = null;
 497     limit = 0;
 498     for(int segment = 0; segment < readFileNumSegments; segment++){
 499       long length = (int) readFileSegments[segment][1];
 500       limit += length;
 501     }
 502     if(position >= limit){
 503       position = limit -1;
 504     }
 505     streamMode = 1;
 506   }
 507
 508   /**
 509    * Removes unnecessary bytes allocated for this buffer. This function can only be called
 510    * when the stream is in normal mode.
 511    */
 512   public void packetize(){
 513     assert(streamMode == 0);
 514
 515     byte[] bufferTMP = new byte[(int) limit];
 516     System.arraycopy(buffer, 0, bufferTMP, 0, (int) limit);
 517     buffer = bufferTMP;
 518   }
 519
 520   /**
 521    * Returns whether this stream is in normal mode or not.
 522    *
 523    * @return true when it is
 524    */
 525   public boolean isInReadNormalMode(){
 526     return(streamMode == 0);
 527   }
 528
 529   /**
 530    * Returns whether this stream is in readFile mode or not.
 531    *
 532    * @return true when it is
 533    */
 534   public boolean isInReadFileMode(){
 535     return(streamMode == 1);
 536   }
 537
 538   /**
 539    * Returns whether this stream is in a temporal file or not.
 540    *
 541    * @return true when it is
 542    */
 543   public boolean isInTemporalFile(){
 544     return(streamMode == 2);
 545   }
 546
 547   /**
 548    * Writes the stream to a file. This function can only be called when the stream is in
 549    * normal or readFile mode.
 550    *
 551    * @param fos file in which is written
 552    * @throws IOException when some error writing in the file occurs
 553    */
 554   public void write(FileOutputStream fos) throws IOException{
 555     assert((streamMode == 0) || (streamMode == 2));
 556     write(fos, 0, limit);
 557   }
 558
 559   /**
 560    * Writes a segment of the stream to a file. This function can only be called when the
 561    * stream is in normal or readFile mode.
 562    *
 563    * @param fos file in which is written
 564    * @param begin first byte of the stream written
 565    * @param length length written
 566    * @throws IOException when some error writing in the file occurs
 567    */
 568   public void write(FileOutputStream fos, long begin, long length) throws IOException{
 569     assert((streamMode == 0) || (streamMode == 2));
 570     assert(begin + length <= limit);
 571     if(!isInTemporalFile()){
 572       fos.write(buffer, (int) begin, (int) length);
 573     }else{
 574       FileChannel outputFC = fos.getChannel();
 575       FileChannel streamFC = FileChannel.open(FileSystems.getDefault().getPath(temporalFileName),
 576         StandardOpenOption.READ);
 577       long bytesWritten = 0;
 578       long positionOutputFC = outputFC.position();
 579       streamFC.position(begin + temporalFilePosition);
 580       while(bytesWritten < length){
 581         bytesWritten += outputFC.transferFrom(streamFC, positionOutputFC, length - bytesWritten);
 582         positionOutputFC += bytesWritten;
 583         streamFC.position(begin + temporalFilePosition + bytesWritten);
 584       }
 585       outputFC.position(positionOutputFC);
 586       streamFC.close();
 587     }
 588   }
 589
 590   /**
 591    * Saves the stream to a file and frees allocated memory. Before using the stream again,
 592    * the function <code>loadFromTemporalFile</code> must be called. This function can only be
 593    * called when the stream is in normal mode.
 594    *
 595    * @param temporalDirectory temporal directory in which the file is saved
 596    * @throws Exception when some problem writing the file occurs
 597    */
 598   public void saveToTemporalFile(String temporalDirectory) throws Exception{
 599     assert(streamMode == 0);
 600
 601     synchronized(lock){
 602       //Checks whether is the first time an object of this class is saved to the temporal file
 603       if(temporalFileName == null){
 604         do{
 605           temporalFileName = temporalDirectory;
 606           if(!temporalDirectory.endsWith("/")){
 607             temporalFileName += "/";
 608           }
 609           temporalFileName += "TMP-"; //prefix
 610           temporalFileName += Long.toString((long) (Math.random() * 999999999999d)) + ".tmp";
 611         }while(new File(temporalFileName).exists());
 612       }
 613
 614       FileOutputStream fos = new FileOutputStream(temporalFileName, true);
 615       temporalFilePosition = fos.getChannel().position();
 616       FileChannel fc = fos.getChannel();
 617       ByteBuffer bf = ByteBuffer.wrap(buffer, 0, (int) limit);
 618       fc.write(bf);
 619       fos.close();
 620     }
 621     buffer = null;
 622     streamMode = 2;
 623   }
 624
 625   /**
 626    * Loads the stream from the file in which it was saved. Data of the stream is not deleted
 627    * from the file. This function can only be called when the stream is in temporalFile mode.
 628    *
 629    * @throws Exception when some problem reading the file occurs
 630    */
 631   public void loadFromTemporalFile() throws Exception{
 632     assert(streamMode == 2);
 633
 634     buffer = new byte[(int) limit];
 635     FileChannel fc = FileChannel.open(FileSystems.getDefault().getPath(temporalFileName),
 636       StandardOpenOption.READ);
 637     fc.position(temporalFilePosition);
 638     ByteBuffer bf = ByteBuffer.wrap(buffer, 0, (int) limit);
 639     fc.read(bf);
 640     fc.close();
 641     temporalFilePosition = -1;
 642     streamMode = 0;
 643   }
 644
 645   /**
 646    * Frees the memory occupied by this stream. Call this function when this object is no
 647    * longer needed.
 648    */
 649   public void destroy(){
 650     buffer = null;
 651     readFileChannel = null;
 652     readFileSegments = null;
 653   }
 654
 655   /**
 656    * Destroys the temporal file shared by all objects of this class. Call this function only
 657    * when objects of this class are no longer needed.
 658    *
 659    * @throws Exception when some problem removing the temporary file occurs
 660    */
 661   public static void destroyTemporalFile() throws Exception{
 662     if(temporalFileName != null){
 663       File file = new File(temporalFileName);
 664       boolean delete = file.delete();
 665       assert(delete);
 666       temporalFileName = null;
 667     }
 668   }
 669
 670   /**
 671    * Computes the memory employed to keep the segments of the stream in the file, when the
 672    * stream is in file read mode. This function is for debugging purposes.
 673    *
 674    * @return the number of bytes employed
 675    */
 676   public int getMemorySegments(){
 677     int memory = 0;
 678     if(streamMode == 0){
 679       memory = readFileSegments.length * 2 * (Long.SIZE / 8);
 680     }
 681     return(memory);
 682   }
 683 }
 |