001 /**
002  * Copyright (C) 2013 - Francesc Auli-Llinas
003  *
004  * This program is distributed under the BOI License.
005  * This program is distributed in the hope that it will be useful, but
006  * without any warranty; without even the implied warranty of merchantability or
007  * fitness for a particular purpose.
008  * You should have received a copy of the BOI License along with this program.
009  * If not, see <http://www.deic.uab.cat/~francesc/software/license/>.
010  */
011 package streams;
012 
013 import java.io.File;
014 import java.io.FileOutputStream;
015 import java.io.IOException;
016 import java.nio.ByteBuffer;
017 import java.nio.channels.FileChannel;
018 import java.nio.file.FileSystems;
019 import java.nio.file.StandardOpenOption;
020 
021 
022 /**
023  * This class is a buffered byte stream with no limits on its capacity. It implements
024  * fewer functions than the Java ByteBuffer class, though it is slightly faster. It also
025  * incorporates functions to temporarily save the stream in a file, so the buffer memory can
026  * be freed. When the ByteStream is only needed for reading purposes, it can use the readFile
027  * mode, in which the stream is linked to (segments of) a file from which bytes are read.<br>
028  *
029  * Usage: the object can be in three different modes: normal, readFile, and temporalFile. The
030  * normal mode indicates that the stream data are in a buffer in memory. All functions to put/remove
031  * bytes can be used in this mode. Also, the stream has a pointer which serves to
032  * sequentially read all bytes of the stream. This pointer is only used for getting bytes, and
033  * can be reset through the <code>reset</code> function.<br>
034  * At any point when in the normal mode, the functions to <code>save/load from/to temporal
035  * file</code> can be used to temporarily save the data in a file. When the data is in the temporal
036  * file the stream is in the temporalFile mode. In this mode, the functions to get/put bytes
037  * are not operative. Be aware that all objects of this class employ the same temporal file to avoid
038  * the creation of many files. It is important to delete this file at the execution end by
039  * calling the <code>destroyTemporalFile</code> function.<br>
040  * The readFile mode can be used to link the stream to (segments of) a file from which data are read.
041  * To use this mode, the object must be created with the constructor that has a FileChannel as a parameter.
042  * Then, segments of the file can be added via the corresponding <code>putFileSegment</code> function.
043  * At any moment, the function <code>endReadFileMode</code> can be called, loading all bytes from the
044  * file to put them in memory and therefore ending the readFile mode and entering in the normal mode.
045  * A stream that was a created in readFile mode can always return to that state calling the
046  * function <code>returnReadFileMode</code>. Regardless of whether the data is in memory or
047  * still in the file, the function <code>getByte</code>
048  * reads data of the stream. When in readFile mode, the <code>get</code> functions do not advance the
049  * position of the file.<br>
050  * Each object of this class is intended to allocate one stream, without options to reuse the object.<br>
051  *
052  * Multithreading support: the object must be created and manipulated by a single thread. There
053  * can be many objects of this class running simultaneously as long as a single thread manipulates each object.
054  *
055  @author Francesc Auli-Llinas
056  @version 1.3
057  */
058 public final class ByteStream{
059 
060   /**
061    * Initial length of the byte buffer. The allocation of the buffer is doubled each time it
062    * reaches its maximum capacity.
063    <p>
064    * Negative values are not allowed.
065    */
066   private static final int INITIAL_ALLOCATION = 1024;
067 
068   /**
069    * Initial length of the array that keeps the segments of the file that contain the data of
070    * the stream (only when the stream is in readFile mode). The length is doubled each time the
071    * array reaches its maximum capacity.
072    <p>
073    * Negative values are not allowed.
074    */
075   private static final int INITIAL_NUM_SEGMENTS = 32;
076 
077   /**
078    * Mode in which the stream is.
079    <p>
080    * Valid modes are:
081    <ul>
082    <li>0: normal mode. The stream is in the byte buffer.</li>
083    <li>1: readFile mode. The stream is in a file from which data is read.</li>
084    <li>2: temporalFile mode. The stream is in a temporal file.</li>
085    </ul>
086    */
087   private int streamMode = -1;
088 
089   /**
090    * Array in which the bytes are stored.
091    <p>
092    * The length of the array is increased as more data are put into it.
093    */
094   private byte[] buffer = null;
095 
096   /**
097    * Current limit of the buffer (position in which new bytes will be put). It is also the
098    * number of bytes put in the stream.
099    <p>
100    * Negative values are not allowed.
101    */
102   private long limit = 0;
103 
104   /**
105    * Current position for the <code>get</code> functions.
106    <p>
107    * Only smaller or equal values than <code>limit</code> are allowed.
108    */
109   private long position = 0;
110 
111   /**
112    * File channel from which bytes are read when the stream is in readFile mode.
113    <p>
114    * This is only employed when the object is initialized with a file channel.
115    */
116   private FileChannel readFileChannel = null;
117 
118   /**
119    * Number of segments in the file that contain the data of the stream (only employed when
120    * the stream is in readFile mode).
121    <p>
122    * Negative values not allowed.
123    */
124   private int readFileNumSegments = -1;
125 
126   /**
127    * Segments of the stream within the file, employed when the stream is in readFile mode.
128    <p>
129    * The indices of the array are [segment][0- first byte, 1- length].
130    */
131   private long[][] readFileSegments = null;
132 
133   /**
134    * Position of this stream in the temporal file in which the data are saved. This is only
135    * employed when the stream is in the temporal file, otherwise is null.
136    <p>
137    * First byte of the stream in the temporal file.
138    */
139   private long temporalFilePosition = -1;
140 
141   /**
142    * File name in which the stream is saved temporarily.
143    <p>
144    * It is set in the <code>saveToTemporalFile</code> function.
145    */
146   private static String temporalFileName = null;
147 
148   /**
149    * Temporal ByteBuffer employed to read from the file channel when necessary (only used when
150    * the stream is in readFile mode).
151    <p>
152    * This object is reused every time a new byte is read from the file.
153    */
154   private ByteBuffer temporalBuffer = ByteBuffer.allocate(1);
155 
156   /**
157    * This object is useful to lock the access to the variables shared by multiple threads
158    * (i.e., the <code>temporalFileName</code>).
159    <p>
160    * The object is never modified, its only purpose is to allow a lock.
161    */
162   private static final Object lock = new Object();
163 
164 
165   /**
166    * Allocates memory for the buffer. The stream enters in the normal mode.
167    */
168   public ByteStream(){
169     buffer = new byte[INITIAL_ALLOCATION];
170     streamMode = 0;
171   }
172 
173   /**
174    * Allocates memory for the buffer, with a specified limit. The stream enters in the normal mode.
175    *
176    @param initialAllocation number of bytes reserved in the buffer
177    */
178   public ByteStream(int initialAllocation){
179     buffer = new byte[initialAllocation];
180     streamMode = 0;
181   }
182 
183   /**
184    * Initializes this stream with a file from which bytes are read. The stream enters in
185    * the readFile mode.
186    *
187    @param fc the file from which bytes are read
188    */
189   public ByteStream(FileChannel fc){
190     this.readFileChannel = fc;
191     readFileNumSegments = 0;
192     readFileSegments = new long[INITIAL_NUM_SEGMENTS][2];
193     streamMode = 1;
194   }
195 
196   /**
197    * Puts a byte into the stream. This function can only be called when the stream is in
198    * normal mode.
199    *
200    @param b the added byte
201    */
202   public void putByte(byte b){
203     assert(streamMode == 0);
204     if(limit == buffer.length){
205       byte[] bufferTMP = new byte[buffer.length * 2];
206       System.arraycopy(buffer, 0, bufferTMP, 0(intlimit);
207       buffer = bufferTMP;
208     }
209     buffer[(intlimit= b;
210     limit++;
211   }
212 
213   /**
214    * Puts one or more bytes into the stream. This function can only be called when the stream
215    * is in normal mode.
216    *
217    @param array the array of bytes that will be added
218    @param offset first position of the array that is added
219    @param length number of bytes added
220    */
221   public void putBytes(byte[] array, int offset, int length){
222     assert(streamMode == 0);
223     assert((offset >= 0&& (length > 0&& (offset + length <= array.length));
224     if(limit + (longlength > (longbuffer.length){
225       byte[] bufferTMP = new byte[(buffer.length + length2];
226       System.arraycopy(buffer, 0, bufferTMP, 0(intlimit);
227       buffer = bufferTMP;
228     }
229     System.arraycopy(array, offset, buffer, (intlimit, length);
230     limit += (longlength;
231   }
232 
233   /**
234    * Puts one or more bytes into the stream. This function can only be called when the
235    * stream is in normal mode.
236    *
237    @param num integer containing the bytes that will be added
238    @param numBytes number of bytes added
239    */
240   public void putBytes(int num, int numBytes){
241     assert(streamMode == 0);
242     assert((numBytes > 0&& (numBytes <= 4));
243     for(int b = numBytes - 1; b >= 0; b--){
244       byte addedByte = (byte) ((num >> (* b)) 0x000000FF);
245       putByte(addedByte);
246     }
247   }
248 
249   /**
250    * Puts one file segment containing some bytes into the stream. This function can only be
251    * called when the stream is in readFile mode.
252    *
253    @param begin position of the first byte of the file that is added to the stream
254    @param length length of the segment
255    @throws Exception when some problem introducing the segment occurs
256    */
257   public void putFileSegment(long begin, long lengththrows Exception{
258     assert(streamMode == 1);
259     assert(readFileChannel != null);
260     assert((begin >= 0&& (length > 0));
261     assert(begin + length <= readFileChannel.size());
262     assert(readFileNumSegments <= readFileSegments.length);
263 
264     if(readFileNumSegments == readFileSegments.length){
265       long[][] fcSegmentsTMP = new long[readFileSegments.length * 2][2];
266       for(int segment = 0; segment < readFileNumSegments; segment++){
267         fcSegmentsTMP[segment][0= readFileSegments[segment][0];
268         fcSegmentsTMP[segment][1= readFileSegments[segment][1];
269       }
270     }
271 
272     readFileSegments[readFileNumSegments][0= begin;
273     readFileSegments[readFileNumSegments][1= length;
274     readFileNumSegments++;
275     limit += length;
276   }
277 
278   /**
279    * Removes one byte from the end of the stream. This function can only be called when the
280    * stream is in normal mode.
281    */
282   public void removeByte(){
283     assert(streamMode == 0);
284     limit = limit - <= 0: limit - 1;
285     if(position > limit){
286       position = limit;
287     }
288   }
289 
290   /**
291    * Removes one or more bytes from the end of the stream. This function can only be called
292    * when the stream is in normal mode.
293    *
294    @param num number of removed bytes
295    */
296   public void removeBytes(int num){
297     assert(streamMode == 0);
298     limit = limit - num <= 0: limit - num;
299     if(position > limit){
300       position = limit;
301     }
302   }
303 
304   /**
305    * Gets the byte in the current position. This function can only be called when the stream
306    * is in normal or readFile mode.
307    *
308    @return the corresponding byte
309    @throws Exception when the end of the stream is reached
310    */
311   public byte getByte() throws Exception{
312     assert((streamMode == 0|| (streamMode == 1));
313     assert(position >= 0);
314 
315     byte getByte = getByte(position);
316     position++;
317     return(getByte);
318   }
319 
320   /**
321    * Gets the bytes in the current position, and places them to an integer. This function
322    * can only be called when the stream is in normal or readFile mode.
323    *
324    @param numBytes number of bytes read
325    @return the integer having the corresponding bytes
326    @throws Exception when the end of the stream is reached
327    */
328   public int getBytes(int numBytesthrows Exception{
329     assert((streamMode == 0|| (streamMode == 1));
330     int num = 0;
331     for(int b = numBytes - 1; b >= 0; b--){
332       byte getByte = getByte();
333       for(int i = 7; i >= 0; i--){
334         int bitMask = << i;
335         num += ((intgetByte & bitMask<< (b * 8);
336       }
337     }
338     return(num);
339   }
340 
341   /**
342    * Checks whether <code>get</code> functions can get more bytes. This function can only
343    * be called when the stream is in normal or readFile mode.
344    *
345    @return true when there are one or more bytes
346    @throws IOException when some problem with the file occurs
347    */
348   public boolean hasMoreBytes() throws IOException{
349     assert((streamMode == 0|| (streamMode == 1));
350     return(position < limit);
351   }
352 
353   /**
354    * Gets the indicated byte. This function can only be called when the stream is in
355    * normal or readFile mode.
356    *
357    @param index the index of the byte (starting from 0)
358    @return the corresponding byte
359    @throws Exception when the indicated index is not valid
360    */
361   public byte getByte(long indexthrows Exception{
362     assert((streamMode == 0|| (streamMode == 1));
363 
364     if((index < 0|| (index >= limit)){
365       throw new Exception("Invalid position.");
366     }
367     byte getByte = 0;
368     if(streamMode == 0){
369       getByte = buffer[(intindex];
370     }else if(streamMode == 1){
371       //Determines the segment in which this index lies
372       int segment = 0;
373       long accBytes = 0;
374       while(index >= accBytes + readFileSegments[segment][1]){
375         accBytes += readFileSegments[segment][1];
376         segment++;
377       }
378       assert(segment < readFileNumSegments);
379 
380       //Determines the position in the file
381       long fcPosition = index - accBytes + readFileSegments[segment][0];
382       assert(fcPosition < readFileChannel.size());
383 
384       //Gets the byte
385       temporalBuffer.clear();
386       readFileChannel.read(temporalBuffer, fcPosition);
387       getByte = temporalBuffer.array()[0];
388     }else{
389       assert(false);
390     }
391     return(getByte);
392   }
393 
394   /**
395    * Returns the array of bytes. The length of the array is equal or greater than
396    * the bytes put in the stream. This function can only be called when the stream is in normal mode.
397    *
398    @return the byte array
399    */
400   public byte[] getByteStream(){
401     assert(streamMode == 0);
402     return(buffer);
403   }
404 
405   /**
406    * Returns the number of bytes in the stream.
407    *
408    @return number of bytes
409    */
410   public long getLength(){
411     return(limit);
412   }
413 
414   /**
415    * Returns the current position in the stream.
416    *
417    @return {@link #position}
418    */
419   public long getPosition(){
420     return(position);
421   }
422 
423   /**
424    * Clears the stream removing all bytes in it.
425    */
426   public void clear(){
427     limit = 0;
428     position = 0;
429   }
430 
431   /**
432    * Sets the position to the first byte.
433    */
434   public void reset(){
435     position = 0;
436   }
437 
438   /**
439    * Advances the position of the stream in the specified number of bytes.
440    *
441    @param numBytes number of skipped bytes (can be positive or negative)
442    */
443   public void skip(long numBytes){
444     if(numBytes >= 0){
445       if(position + numBytes > limit){
446         position = limit;
447       }else{
448         position += numBytes;
449       }
450     }else{
451       if(position + numBytes < 0){
452         position = 0;
453       }else{
454         position += numBytes;
455       }
456     }
457   }
458 
459   /**
460    * Ends the readFile mode putting all segments of the file in the real byte buffer,
461    * entering in the normal mode. This stream can return to the readFile mode
462    * calling <code>returnReadFileMode</code>.
463    *
464    @throws IOException when some problem reading the file occurs
465    */
466   public void endReadFileMode() throws IOException{
467     assert(streamMode == 1);
468 
469     buffer = new byte[(intlimit];
470     int readBytes = 0;
471     for(int segment = 0; segment < readFileNumSegments; segment++){
472       long begin = readFileSegments[segment][0];
473       long length = readFileSegments[segment][1];
474 
475       temporalBuffer = ByteBuffer.allocate((intlength);
476       readFileChannel.read(temporalBuffer, begin);
477       System.arraycopy(temporalBuffer.array()0, buffer, readBytes, (intlength);
478       readBytes += length;
479     }
480     streamMode = 0;
481   }
482 
483   /**
484    * Returns a stream that was created in readFile mode and that was converted to the normal
485    * mode afterwards to the readFile mode again. Attention! Be aware that once this function
486    * is called, the stream returns to the state left when the function <code>endReadFileMode</code> was
487    * called. This means that ALL changes (such as deleting or adding bytes) that were carried out while
488    * the stream was in normal mode are lost.
489    */
490   public void returnReadFileMode(){
491     assert(streamMode == 0);
492     assert(readFileChannel != null);
493     assert(readFileNumSegments != -1);
494     assert(readFileSegments != null);
495 
496     buffer = null;
497     limit = 0;
498     for(int segment = 0; segment < readFileNumSegments; segment++){
499       long length = (intreadFileSegments[segment][1];
500       limit += length;
501     }
502     if(position >= limit){
503       position = limit -1;
504     }
505     streamMode = 1;
506   }
507 
508   /**
509    * Removes unnecessary bytes allocated for this buffer. This function can only be called
510    * when the stream is in normal mode.
511    */
512   public void packetize(){
513     assert(streamMode == 0);
514 
515     byte[] bufferTMP = new byte[(intlimit];
516     System.arraycopy(buffer, 0, bufferTMP, 0(intlimit);
517     buffer = bufferTMP;
518   }
519 
520   /**
521    * Returns whether this stream is in normal mode or not.
522    *
523    @return true when it is
524    */
525   public boolean isInReadNormalMode(){
526     return(streamMode == 0);
527   }
528 
529   /**
530    * Returns whether this stream is in readFile mode or not.
531    *
532    @return true when it is
533    */
534   public boolean isInReadFileMode(){
535     return(streamMode == 1);
536   }
537 
538   /**
539    * Returns whether this stream is in a temporal file or not.
540    *
541    @return true when it is
542    */
543   public boolean isInTemporalFile(){
544     return(streamMode == 2);
545   }
546 
547   /**
548    * Writes the stream to a file. This function can only be called when the stream is in
549    * normal or readFile mode.
550    *
551    @param fos file in which is written
552    @throws IOException when some error writing in the file occurs
553    */
554   public void write(FileOutputStream fosthrows IOException{
555     assert((streamMode == 0|| (streamMode == 2));
556     write(fos, 0, limit);
557   }
558 
559   /**
560    * Writes a segment of the stream to a file. This function can only be called when the
561    * stream is in normal or readFile mode.
562    *
563    @param fos file in which is written
564    @param begin first byte of the stream written
565    @param length length written
566    @throws IOException when some error writing in the file occurs
567    */
568   public void write(FileOutputStream fos, long begin, long lengththrows IOException{
569     assert((streamMode == 0|| (streamMode == 2));
570     assert(begin + length <= limit);
571     if(!isInTemporalFile()){
572       fos.write(buffer, (intbegin, (intlength);
573     }else{
574       FileChannel outputFC = fos.getChannel();
575       FileChannel streamFC = FileChannel.open(FileSystems.getDefault().getPath(temporalFileName),
576         StandardOpenOption.READ);
577       long bytesWritten = 0;
578       long positionOutputFC = outputFC.position();
579       streamFC.position(begin + temporalFilePosition);
580       while(bytesWritten < length){
581         bytesWritten += outputFC.transferFrom(streamFC, positionOutputFC, length - bytesWritten);
582         positionOutputFC += bytesWritten;
583         streamFC.position(begin + temporalFilePosition + bytesWritten);
584       }
585       outputFC.position(positionOutputFC);
586       streamFC.close();
587     }
588   }
589 
590   /**
591    * Saves the stream to a file and frees allocated memory. Before using the stream again,
592    * the function <code>loadFromTemporalFile</code> must be called. This function can only be
593    * called when the stream is in normal mode.
594    *
595    @param temporalDirectory temporal directory in which the file is saved
596    @throws Exception when some problem writing the file occurs
597    */
598   public void saveToTemporalFile(String temporalDirectorythrows Exception{
599     assert(streamMode == 0);
600 
601     synchronized(lock){
602       //Checks whether is the first time an object of this class is saved to the temporal file
603       if(temporalFileName == null){
604         do{
605           temporalFileName = temporalDirectory;
606           if(!temporalDirectory.endsWith("/")){
607             temporalFileName += "/";
608           }
609           temporalFileName += "TMP-"//prefix
610           temporalFileName += Long.toString((long) (Math.random() 999999999999d)) ".tmp";
611         }while(new File(temporalFileName).exists());
612       }
613 
614       FileOutputStream fos = new FileOutputStream(temporalFileName, true);
615       temporalFilePosition = fos.getChannel().position();
616       FileChannel fc = fos.getChannel();
617       ByteBuffer bf = ByteBuffer.wrap(buffer, 0(intlimit);
618       fc.write(bf);
619       fos.close();
620     }
621     buffer = null;
622     streamMode = 2;
623   }
624 
625   /**
626    * Loads the stream from the file in which it was saved. Data of the stream is not deleted
627    * from the file. This function can only be called when the stream is in temporalFile mode.
628    *
629    @throws Exception when some problem reading the file occurs
630    */
631   public void loadFromTemporalFile() throws Exception{
632     assert(streamMode == 2);
633 
634     buffer = new byte[(intlimit];
635     FileChannel fc = FileChannel.open(FileSystems.getDefault().getPath(temporalFileName),
636       StandardOpenOption.READ);
637     fc.position(temporalFilePosition);
638     ByteBuffer bf = ByteBuffer.wrap(buffer, 0(intlimit);
639     fc.read(bf);
640     fc.close();
641     temporalFilePosition = -1;
642     streamMode = 0;
643   }
644 
645   /**
646    * Frees the memory occupied by this stream. Call this function when this object is no
647    * longer needed.
648    */
649   public void destroy(){
650     buffer = null;
651     readFileChannel = null;
652     readFileSegments = null;
653   }
654 
655   /**
656    * Destroys the temporal file shared by all objects of this class. Call this function only
657    * when objects of this class are no longer needed.
658    *
659    @throws Exception when some problem removing the temporary file occurs
660    */
661   public static void destroyTemporalFile() throws Exception{
662     if(temporalFileName != null){
663       File file = new File(temporalFileName);
664       boolean delete = file.delete();
665       assert(delete);
666       temporalFileName = null;
667     }
668   }
669 
670   /**
671    * Computes the memory employed to keep the segments of the stream in the file, when the
672    * stream is in file read mode. This function is for debugging purposes.
673    *
674    @return the number of bytes employed
675    */
676   public int getMemorySegments(){
677     int memory = 0;
678     if(streamMode == 0){
679       memory = readFileSegments.length * (Long.SIZE / 8);
680     }
681     return(memory);
682   }
683 }
Java2html