001 /**
002 * Copyright (C) 2013 - Francesc Auli-Llinas
003 *
004 * This program is distributed under the BOI License.
005 * This program is distributed in the hope that it will be useful, but
006 * without any warranty; without even the implied warranty of merchantability or
007 * fitness for a particular purpose.
008 * You should have received a copy of the BOI License along with this program.
009 * If not, see <http://www.deic.uab.cat/~francesc/software/license/>.
010 */
011 package streams;
012
013 import java.io.File;
014 import java.io.FileOutputStream;
015 import java.io.IOException;
016 import java.nio.ByteBuffer;
017 import java.nio.channels.FileChannel;
018 import java.nio.file.FileSystems;
019 import java.nio.file.StandardOpenOption;
020
021
022 /**
023 * This class is a buffered byte stream with no limits on its capacity. It implements
024 * fewer functions than the Java ByteBuffer class, though it is slightly faster. It also
025 * incorporates functions to temporarily save the stream in a file, so the buffer memory can
026 * be freed. When the ByteStream is only needed for reading purposes, it can use the readFile
027 * mode, in which the stream is linked to (segments of) a file from which bytes are read.<br>
028 *
029 * Usage: the object can be in three different modes: normal, readFile, and temporalFile. The
030 * normal mode indicates that the stream data are in a buffer in memory. All functions to put/remove
031 * bytes can be used in this mode. Also, the stream has a pointer which serves to
032 * sequentially read all bytes of the stream. This pointer is only used for getting bytes, and
033 * can be reset through the <code>reset</code> function.<br>
034 * At any point when in the normal mode, the functions to <code>save/load from/to temporal
035 * file</code> can be used to temporarily save the data in a file. When the data is in the temporal
036 * file the stream is in the temporalFile mode. In this mode, the functions to get/put bytes
037 * are not operative. Be aware that all objects of this class employ the same temporal file to avoid
038 * the creation of many files. It is important to delete this file at the execution end by
039 * calling the <code>destroyTemporalFile</code> function.<br>
040 * The readFile mode can be used to link the stream to (segments of) a file from which data are read.
041 * To use this mode, the object must be created with the constructor that has a FileChannel as a parameter.
042 * Then, segments of the file can be added via the corresponding <code>putFileSegment</code> function.
043 * At any moment, the function <code>endReadFileMode</code> can be called, loading all bytes from the
044 * file to put them in memory and therefore ending the readFile mode and entering in the normal mode.
045 * A stream that was a created in readFile mode can always return to that state calling the
046 * function <code>returnReadFileMode</code>. Regardless of whether the data is in memory or
047 * still in the file, the function <code>getByte</code>
048 * reads data of the stream. When in readFile mode, the <code>get</code> functions do not advance the
049 * position of the file.<br>
050 * Each object of this class is intended to allocate one stream, without options to reuse the object.<br>
051 *
052 * Multithreading support: the object must be created and manipulated by a single thread. There
053 * can be many objects of this class running simultaneously as long as a single thread manipulates each object.
054 *
055 * @author Francesc Auli-Llinas
056 * @version 1.3
057 */
058 public final class ByteStream{
059
060 /**
061 * Initial length of the byte buffer. The allocation of the buffer is doubled each time it
062 * reaches its maximum capacity.
063 * <p>
064 * Negative values are not allowed.
065 */
066 private static final int INITIAL_ALLOCATION = 1024;
067
068 /**
069 * Initial length of the array that keeps the segments of the file that contain the data of
070 * the stream (only when the stream is in readFile mode). The length is doubled each time the
071 * array reaches its maximum capacity.
072 * <p>
073 * Negative values are not allowed.
074 */
075 private static final int INITIAL_NUM_SEGMENTS = 32;
076
077 /**
078 * Mode in which the stream is.
079 * <p>
080 * Valid modes are:
081 * <ul>
082 * <li>0: normal mode. The stream is in the byte buffer.</li>
083 * <li>1: readFile mode. The stream is in a file from which data is read.</li>
084 * <li>2: temporalFile mode. The stream is in a temporal file.</li>
085 * </ul>
086 */
087 private int streamMode = -1;
088
089 /**
090 * Array in which the bytes are stored.
091 * <p>
092 * The length of the array is increased as more data are put into it.
093 */
094 private byte[] buffer = null;
095
096 /**
097 * Current limit of the buffer (position in which new bytes will be put). It is also the
098 * number of bytes put in the stream.
099 * <p>
100 * Negative values are not allowed.
101 */
102 private long limit = 0;
103
104 /**
105 * Current position for the <code>get</code> functions.
106 * <p>
107 * Only smaller or equal values than <code>limit</code> are allowed.
108 */
109 private long position = 0;
110
111 /**
112 * File channel from which bytes are read when the stream is in readFile mode.
113 * <p>
114 * This is only employed when the object is initialized with a file channel.
115 */
116 private FileChannel readFileChannel = null;
117
118 /**
119 * Number of segments in the file that contain the data of the stream (only employed when
120 * the stream is in readFile mode).
121 * <p>
122 * Negative values not allowed.
123 */
124 private int readFileNumSegments = -1;
125
126 /**
127 * Segments of the stream within the file, employed when the stream is in readFile mode.
128 * <p>
129 * The indices of the array are [segment][0- first byte, 1- length].
130 */
131 private long[][] readFileSegments = null;
132
133 /**
134 * Position of this stream in the temporal file in which the data are saved. This is only
135 * employed when the stream is in the temporal file, otherwise is null.
136 * <p>
137 * First byte of the stream in the temporal file.
138 */
139 private long temporalFilePosition = -1;
140
141 /**
142 * File name in which the stream is saved temporarily.
143 * <p>
144 * It is set in the <code>saveToTemporalFile</code> function.
145 */
146 private static String temporalFileName = null;
147
148 /**
149 * Temporal ByteBuffer employed to read from the file channel when necessary (only used when
150 * the stream is in readFile mode).
151 * <p>
152 * This object is reused every time a new byte is read from the file.
153 */
154 private ByteBuffer temporalBuffer = ByteBuffer.allocate(1);
155
156 /**
157 * This object is useful to lock the access to the variables shared by multiple threads
158 * (i.e., the <code>temporalFileName</code>).
159 * <p>
160 * The object is never modified, its only purpose is to allow a lock.
161 */
162 private static final Object lock = new Object();
163
164
165 /**
166 * Allocates memory for the buffer. The stream enters in the normal mode.
167 */
168 public ByteStream(){
169 buffer = new byte[INITIAL_ALLOCATION];
170 streamMode = 0;
171 }
172
173 /**
174 * Allocates memory for the buffer, with a specified limit. The stream enters in the normal mode.
175 *
176 * @param initialAllocation number of bytes reserved in the buffer
177 */
178 public ByteStream(int initialAllocation){
179 buffer = new byte[initialAllocation];
180 streamMode = 0;
181 }
182
183 /**
184 * Initializes this stream with a file from which bytes are read. The stream enters in
185 * the readFile mode.
186 *
187 * @param fc the file from which bytes are read
188 */
189 public ByteStream(FileChannel fc){
190 this.readFileChannel = fc;
191 readFileNumSegments = 0;
192 readFileSegments = new long[INITIAL_NUM_SEGMENTS][2];
193 streamMode = 1;
194 }
195
196 /**
197 * Puts a byte into the stream. This function can only be called when the stream is in
198 * normal mode.
199 *
200 * @param b the added byte
201 */
202 public void putByte(byte b){
203 assert(streamMode == 0);
204 if(limit == buffer.length){
205 byte[] bufferTMP = new byte[buffer.length * 2];
206 System.arraycopy(buffer, 0, bufferTMP, 0, (int) limit);
207 buffer = bufferTMP;
208 }
209 buffer[(int) limit] = b;
210 limit++;
211 }
212
213 /**
214 * Puts one or more bytes into the stream. This function can only be called when the stream
215 * is in normal mode.
216 *
217 * @param array the array of bytes that will be added
218 * @param offset first position of the array that is added
219 * @param length number of bytes added
220 */
221 public void putBytes(byte[] array, int offset, int length){
222 assert(streamMode == 0);
223 assert((offset >= 0) && (length > 0) && (offset + length <= array.length));
224 if(limit + (long) length > (long) buffer.length){
225 byte[] bufferTMP = new byte[(buffer.length + length) * 2];
226 System.arraycopy(buffer, 0, bufferTMP, 0, (int) limit);
227 buffer = bufferTMP;
228 }
229 System.arraycopy(array, offset, buffer, (int) limit, length);
230 limit += (long) length;
231 }
232
233 /**
234 * Puts one or more bytes into the stream. This function can only be called when the
235 * stream is in normal mode.
236 *
237 * @param num integer containing the bytes that will be added
238 * @param numBytes number of bytes added
239 */
240 public void putBytes(int num, int numBytes){
241 assert(streamMode == 0);
242 assert((numBytes > 0) && (numBytes <= 4));
243 for(int b = numBytes - 1; b >= 0; b--){
244 byte addedByte = (byte) ((num >> (8 * b)) & 0x000000FF);
245 putByte(addedByte);
246 }
247 }
248
249 /**
250 * Puts one file segment containing some bytes into the stream. This function can only be
251 * called when the stream is in readFile mode.
252 *
253 * @param begin position of the first byte of the file that is added to the stream
254 * @param length length of the segment
255 * @throws Exception when some problem introducing the segment occurs
256 */
257 public void putFileSegment(long begin, long length) throws Exception{
258 assert(streamMode == 1);
259 assert(readFileChannel != null);
260 assert((begin >= 0) && (length > 0));
261 assert(begin + length <= readFileChannel.size());
262 assert(readFileNumSegments <= readFileSegments.length);
263
264 if(readFileNumSegments == readFileSegments.length){
265 long[][] fcSegmentsTMP = new long[readFileSegments.length * 2][2];
266 for(int segment = 0; segment < readFileNumSegments; segment++){
267 fcSegmentsTMP[segment][0] = readFileSegments[segment][0];
268 fcSegmentsTMP[segment][1] = readFileSegments[segment][1];
269 }
270 }
271
272 readFileSegments[readFileNumSegments][0] = begin;
273 readFileSegments[readFileNumSegments][1] = length;
274 readFileNumSegments++;
275 limit += length;
276 }
277
278 /**
279 * Removes one byte from the end of the stream. This function can only be called when the
280 * stream is in normal mode.
281 */
282 public void removeByte(){
283 assert(streamMode == 0);
284 limit = limit - 1 <= 0 ? 0: limit - 1;
285 if(position > limit){
286 position = limit;
287 }
288 }
289
290 /**
291 * Removes one or more bytes from the end of the stream. This function can only be called
292 * when the stream is in normal mode.
293 *
294 * @param num number of removed bytes
295 */
296 public void removeBytes(int num){
297 assert(streamMode == 0);
298 limit = limit - num <= 0 ? 0: limit - num;
299 if(position > limit){
300 position = limit;
301 }
302 }
303
304 /**
305 * Gets the byte in the current position. This function can only be called when the stream
306 * is in normal or readFile mode.
307 *
308 * @return the corresponding byte
309 * @throws Exception when the end of the stream is reached
310 */
311 public byte getByte() throws Exception{
312 assert((streamMode == 0) || (streamMode == 1));
313 assert(position >= 0);
314
315 byte getByte = getByte(position);
316 position++;
317 return(getByte);
318 }
319
320 /**
321 * Gets the bytes in the current position, and places them to an integer. This function
322 * can only be called when the stream is in normal or readFile mode.
323 *
324 * @param numBytes number of bytes read
325 * @return the integer having the corresponding bytes
326 * @throws Exception when the end of the stream is reached
327 */
328 public int getBytes(int numBytes) throws Exception{
329 assert((streamMode == 0) || (streamMode == 1));
330 int num = 0;
331 for(int b = numBytes - 1; b >= 0; b--){
332 byte getByte = getByte();
333 for(int i = 7; i >= 0; i--){
334 int bitMask = 1 << i;
335 num += ((int) getByte & bitMask) << (b * 8);
336 }
337 }
338 return(num);
339 }
340
341 /**
342 * Checks whether <code>get</code> functions can get more bytes. This function can only
343 * be called when the stream is in normal or readFile mode.
344 *
345 * @return true when there are one or more bytes
346 * @throws IOException when some problem with the file occurs
347 */
348 public boolean hasMoreBytes() throws IOException{
349 assert((streamMode == 0) || (streamMode == 1));
350 return(position < limit);
351 }
352
353 /**
354 * Gets the indicated byte. This function can only be called when the stream is in
355 * normal or readFile mode.
356 *
357 * @param index the index of the byte (starting from 0)
358 * @return the corresponding byte
359 * @throws Exception when the indicated index is not valid
360 */
361 public byte getByte(long index) throws Exception{
362 assert((streamMode == 0) || (streamMode == 1));
363
364 if((index < 0) || (index >= limit)){
365 throw new Exception("Invalid position.");
366 }
367 byte getByte = 0;
368 if(streamMode == 0){
369 getByte = buffer[(int) index];
370 }else if(streamMode == 1){
371 //Determines the segment in which this index lies
372 int segment = 0;
373 long accBytes = 0;
374 while(index >= accBytes + readFileSegments[segment][1]){
375 accBytes += readFileSegments[segment][1];
376 segment++;
377 }
378 assert(segment < readFileNumSegments);
379
380 //Determines the position in the file
381 long fcPosition = index - accBytes + readFileSegments[segment][0];
382 assert(fcPosition < readFileChannel.size());
383
384 //Gets the byte
385 temporalBuffer.clear();
386 readFileChannel.read(temporalBuffer, fcPosition);
387 getByte = temporalBuffer.array()[0];
388 }else{
389 assert(false);
390 }
391 return(getByte);
392 }
393
394 /**
395 * Returns the array of bytes. The length of the array is equal or greater than
396 * the bytes put in the stream. This function can only be called when the stream is in normal mode.
397 *
398 * @return the byte array
399 */
400 public byte[] getByteStream(){
401 assert(streamMode == 0);
402 return(buffer);
403 }
404
405 /**
406 * Returns the number of bytes in the stream.
407 *
408 * @return number of bytes
409 */
410 public long getLength(){
411 return(limit);
412 }
413
414 /**
415 * Returns the current position in the stream.
416 *
417 * @return {@link #position}
418 */
419 public long getPosition(){
420 return(position);
421 }
422
423 /**
424 * Clears the stream removing all bytes in it.
425 */
426 public void clear(){
427 limit = 0;
428 position = 0;
429 }
430
431 /**
432 * Sets the position to the first byte.
433 */
434 public void reset(){
435 position = 0;
436 }
437
438 /**
439 * Advances the position of the stream in the specified number of bytes.
440 *
441 * @param numBytes number of skipped bytes (can be positive or negative)
442 */
443 public void skip(long numBytes){
444 if(numBytes >= 0){
445 if(position + numBytes > limit){
446 position = limit;
447 }else{
448 position += numBytes;
449 }
450 }else{
451 if(position + numBytes < 0){
452 position = 0;
453 }else{
454 position += numBytes;
455 }
456 }
457 }
458
459 /**
460 * Ends the readFile mode putting all segments of the file in the real byte buffer,
461 * entering in the normal mode. This stream can return to the readFile mode
462 * calling <code>returnReadFileMode</code>.
463 *
464 * @throws IOException when some problem reading the file occurs
465 */
466 public void endReadFileMode() throws IOException{
467 assert(streamMode == 1);
468
469 buffer = new byte[(int) limit];
470 int readBytes = 0;
471 for(int segment = 0; segment < readFileNumSegments; segment++){
472 long begin = readFileSegments[segment][0];
473 long length = readFileSegments[segment][1];
474
475 temporalBuffer = ByteBuffer.allocate((int) length);
476 readFileChannel.read(temporalBuffer, begin);
477 System.arraycopy(temporalBuffer.array(), 0, buffer, readBytes, (int) length);
478 readBytes += length;
479 }
480 streamMode = 0;
481 }
482
483 /**
484 * Returns a stream that was created in readFile mode and that was converted to the normal
485 * mode afterwards to the readFile mode again. Attention! Be aware that once this function
486 * is called, the stream returns to the state left when the function <code>endReadFileMode</code> was
487 * called. This means that ALL changes (such as deleting or adding bytes) that were carried out while
488 * the stream was in normal mode are lost.
489 */
490 public void returnReadFileMode(){
491 assert(streamMode == 0);
492 assert(readFileChannel != null);
493 assert(readFileNumSegments != -1);
494 assert(readFileSegments != null);
495
496 buffer = null;
497 limit = 0;
498 for(int segment = 0; segment < readFileNumSegments; segment++){
499 long length = (int) readFileSegments[segment][1];
500 limit += length;
501 }
502 if(position >= limit){
503 position = limit -1;
504 }
505 streamMode = 1;
506 }
507
508 /**
509 * Removes unnecessary bytes allocated for this buffer. This function can only be called
510 * when the stream is in normal mode.
511 */
512 public void packetize(){
513 assert(streamMode == 0);
514
515 byte[] bufferTMP = new byte[(int) limit];
516 System.arraycopy(buffer, 0, bufferTMP, 0, (int) limit);
517 buffer = bufferTMP;
518 }
519
520 /**
521 * Returns whether this stream is in normal mode or not.
522 *
523 * @return true when it is
524 */
525 public boolean isInReadNormalMode(){
526 return(streamMode == 0);
527 }
528
529 /**
530 * Returns whether this stream is in readFile mode or not.
531 *
532 * @return true when it is
533 */
534 public boolean isInReadFileMode(){
535 return(streamMode == 1);
536 }
537
538 /**
539 * Returns whether this stream is in a temporal file or not.
540 *
541 * @return true when it is
542 */
543 public boolean isInTemporalFile(){
544 return(streamMode == 2);
545 }
546
547 /**
548 * Writes the stream to a file. This function can only be called when the stream is in
549 * normal or readFile mode.
550 *
551 * @param fos file in which is written
552 * @throws IOException when some error writing in the file occurs
553 */
554 public void write(FileOutputStream fos) throws IOException{
555 assert((streamMode == 0) || (streamMode == 2));
556 write(fos, 0, limit);
557 }
558
559 /**
560 * Writes a segment of the stream to a file. This function can only be called when the
561 * stream is in normal or readFile mode.
562 *
563 * @param fos file in which is written
564 * @param begin first byte of the stream written
565 * @param length length written
566 * @throws IOException when some error writing in the file occurs
567 */
568 public void write(FileOutputStream fos, long begin, long length) throws IOException{
569 assert((streamMode == 0) || (streamMode == 2));
570 assert(begin + length <= limit);
571 if(!isInTemporalFile()){
572 fos.write(buffer, (int) begin, (int) length);
573 }else{
574 FileChannel outputFC = fos.getChannel();
575 FileChannel streamFC = FileChannel.open(FileSystems.getDefault().getPath(temporalFileName),
576 StandardOpenOption.READ);
577 long bytesWritten = 0;
578 long positionOutputFC = outputFC.position();
579 streamFC.position(begin + temporalFilePosition);
580 while(bytesWritten < length){
581 bytesWritten += outputFC.transferFrom(streamFC, positionOutputFC, length - bytesWritten);
582 positionOutputFC += bytesWritten;
583 streamFC.position(begin + temporalFilePosition + bytesWritten);
584 }
585 outputFC.position(positionOutputFC);
586 streamFC.close();
587 }
588 }
589
590 /**
591 * Saves the stream to a file and frees allocated memory. Before using the stream again,
592 * the function <code>loadFromTemporalFile</code> must be called. This function can only be
593 * called when the stream is in normal mode.
594 *
595 * @param temporalDirectory temporal directory in which the file is saved
596 * @throws Exception when some problem writing the file occurs
597 */
598 public void saveToTemporalFile(String temporalDirectory) throws Exception{
599 assert(streamMode == 0);
600
601 synchronized(lock){
602 //Checks whether is the first time an object of this class is saved to the temporal file
603 if(temporalFileName == null){
604 do{
605 temporalFileName = temporalDirectory;
606 if(!temporalDirectory.endsWith("/")){
607 temporalFileName += "/";
608 }
609 temporalFileName += "TMP-"; //prefix
610 temporalFileName += Long.toString((long) (Math.random() * 999999999999d)) + ".tmp";
611 }while(new File(temporalFileName).exists());
612 }
613
614 FileOutputStream fos = new FileOutputStream(temporalFileName, true);
615 temporalFilePosition = fos.getChannel().position();
616 FileChannel fc = fos.getChannel();
617 ByteBuffer bf = ByteBuffer.wrap(buffer, 0, (int) limit);
618 fc.write(bf);
619 fos.close();
620 }
621 buffer = null;
622 streamMode = 2;
623 }
624
625 /**
626 * Loads the stream from the file in which it was saved. Data of the stream is not deleted
627 * from the file. This function can only be called when the stream is in temporalFile mode.
628 *
629 * @throws Exception when some problem reading the file occurs
630 */
631 public void loadFromTemporalFile() throws Exception{
632 assert(streamMode == 2);
633
634 buffer = new byte[(int) limit];
635 FileChannel fc = FileChannel.open(FileSystems.getDefault().getPath(temporalFileName),
636 StandardOpenOption.READ);
637 fc.position(temporalFilePosition);
638 ByteBuffer bf = ByteBuffer.wrap(buffer, 0, (int) limit);
639 fc.read(bf);
640 fc.close();
641 temporalFilePosition = -1;
642 streamMode = 0;
643 }
644
645 /**
646 * Frees the memory occupied by this stream. Call this function when this object is no
647 * longer needed.
648 */
649 public void destroy(){
650 buffer = null;
651 readFileChannel = null;
652 readFileSegments = null;
653 }
654
655 /**
656 * Destroys the temporal file shared by all objects of this class. Call this function only
657 * when objects of this class are no longer needed.
658 *
659 * @throws Exception when some problem removing the temporary file occurs
660 */
661 public static void destroyTemporalFile() throws Exception{
662 if(temporalFileName != null){
663 File file = new File(temporalFileName);
664 boolean delete = file.delete();
665 assert(delete);
666 temporalFileName = null;
667 }
668 }
669
670 /**
671 * Computes the memory employed to keep the segments of the stream in the file, when the
672 * stream is in file read mode. This function is for debugging purposes.
673 *
674 * @return the number of bytes employed
675 */
676 public int getMemorySegments(){
677 int memory = 0;
678 if(streamMode == 0){
679 memory = readFileSegments.length * 2 * (Long.SIZE / 8);
680 }
681 return(memory);
682 }
683 }
|