inst/include/seqan/stream/stream_compressor.h
d92ab6f0
 // ==========================================================================
 //                 SeqAn - The Library for Sequence Analysis
 // ==========================================================================
 // Copyright (c) 2006-2018, Knut Reinert, FU Berlin
 // All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
 // modification, are permitted provided that the following conditions are met:
 //
 //     * Redistributions of source code must retain the above copyright
 //       notice, this list of conditions and the following disclaimer.
 //     * Redistributions in binary form must reproduce the above copyright
 //       notice, this list of conditions and the following disclaimer in the
 //       documentation and/or other materials provided with the distribution.
 //     * Neither the name of Knut Reinert or the FU Berlin nor the names of
 //       its contributors may be used to endorse or promote products derived
 //       from this software without specific prior written permission.
 //
 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 // ARE DISCLAIMED. IN NO EVENT SHALL KNUT REINERT OR THE FU BERLIN BE LIABLE
 // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 // LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 // OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
 // DAMAGE.
 //
 // ==========================================================================
 // Author: David Weese <david.weese@fu-berlin.de>
 // ==========================================================================
 
 #ifndef SEQAN_STREAM_STREAM_COMPRESSOR_H_
 #define SEQAN_STREAM_STREAM_COMPRESSOR_H_
 
 
 #if SEQAN_HAS_ZLIB
 // Zlib headers
 #include <zlib.h>
 #include "iostream_zutil.h"
 #endif
 
 #include <algorithm>    // copy
 
 namespace seqan {
 
 // ============================================================================
 // Forwards
 // ============================================================================
 
 template <typename TOutPager, typename TSpec>
 struct Pager;
 
 // ============================================================================
 // Tags, Enums
 // ============================================================================
 
 // ============================================================================
 // Classes
 // ============================================================================
 
 // Special end-of-file marker defined by the BGZF compression format.
 // See: https://samtools.github.io/hts-specs/SAMv1.pdf
 static constexpr std::array<uint8_t, 28> BGZF_END_OF_FILE_MARKER {{0x1f, 0x8b, 0x08, 0x04,
                                                                   0x00, 0x00, 0x00, 0x00,
                                                                   0x00, 0xff, 0x06, 0x00,
                                                                   0x42, 0x43, 0x02, 0x00,
                                                                   0x1b, 0x00, 0x03, 0x00,
                                                                   0x00, 0x00, 0x00, 0x00,
                                                                   0x00, 0x00, 0x00, 0x00}};
 
 template <typename TAlgTag>
 struct Compress;
 
 template <typename TAlgTag>
 struct CompressionContext {};
 
 template <typename TAlgTag>
 struct DefaultPageSize;
 
 #if SEQAN_HAS_ZLIB
 
 template <>
 struct CompressionContext<GZFile>
 {
     z_stream strm;
 
     CompressionContext()
     {
         memset(&strm, 0, sizeof(z_stream));
     }
 };
 
 template <>
 struct CompressionContext<BgzfFile>:
     CompressionContext<GZFile>
 {
     enum { BLOCK_HEADER_LENGTH = 18 };
     unsigned char headerPos;
 };
 
 template <typename T>
 struct MagicHeader<BgzfFile, T>
 {
     static char const VALUE[18];
 };
 
 template <typename T>
 char const MagicHeader<BgzfFile, T>::VALUE[18] =
 {
     MagicHeader<GZFile>::VALUE[0], MagicHeader<GZFile>::VALUE[1], MagicHeader<GZFile>::VALUE[2],
     4, 0, 0, 0, 0, 0, '\xff', 6, 0, 'B', 'C', 2, 0, 0, 0
 };
 
 template <>
 struct DefaultPageSize<BgzfFile>
 {
     static const unsigned MAX_BLOCK_SIZE = 64 * 1024;
     static const unsigned BLOCK_FOOTER_LENGTH = 8;
     // 5 bytes block overhead (see 3.2.4. at http://www.gzip.org/zlib/rfc-deflate.html)
     static const unsigned ZLIB_BLOCK_OVERHEAD = 5;
 
     // Reduce the maximal input size, such that the compressed data
     // always fits in one block even for level Z_NO_COMPRESSION.
     enum { BLOCK_HEADER_LENGTH = CompressionContext<BgzfFile>::BLOCK_HEADER_LENGTH };
     static const unsigned VALUE = MAX_BLOCK_SIZE - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH - ZLIB_BLOCK_OVERHEAD;
 };
 
 
 /*
 template <typename TOutPager, typename TAlgTag>
 Pager<TOutPager, Compress<TAlgTag> >
 {
     TOutPager outPager;             // outbound pager
     PageTable<FixedSize> table;     // our page table
 
     Pager():
         table(DefaultPageSize<TAlgTag>::VALUE)
     {}
 
     Page & getPage (int64_t position)
     {
         Page *page;
         {
             ScopedReadLock(table.lock);
 
             page = table[position];
             if (posInPage(position, page))                      // does the page exist yet?
                 return page;
         }
         {
             ScopedWriteLock(table.lock);
 
             page = table[position];
             if (posInPage(position, page))                      // does the page exist yet?
                 return page;
 
             page = new Page(table.rangeForPos(position));       // create new page
             reserve(page.data, table.pageSize);                 // allocate required memory
             table.insertPage(page);                             // insert page
             prevPage = prevPage(position);
         }
         return page;
     }
 
     void putPage (Page &page)
     {
         int64_t outPosition = 0;                                // compute start position in outbound pager
         if (page.range.begin != 0)
         {
             PageRange range = getPageRange(beginPosition(page.range) - 1);
             outPosition = endPosition(range);                   // wait for end position of the previous page
         }
 
         TCompressionContext ctx;
         initCompressionContext(ctx);
 
         Size<Page>::Type leftToCompress = length(page);
         while (leftToCompress != 0)
         {
             Page &outPage = outPager.getPage(outPosition);
 
             auto handle = std::async(std::launch::async,
                                   parallel_sum<RAIter>, mid, end);
             compress
         }
     }
 };
 */
 // ============================================================================
 // Functions
 // ============================================================================
 
 inline void
 compressInit(CompressionContext<GZFile> & ctx)
 {
     const int GZIP_WINDOW_BITS = -15;   // no zlib header
     const int Z_DEFAULT_MEM_LEVEL = 8;
 
     ctx.strm.zalloc = NULL;
     ctx.strm.zfree = NULL;
 
     // (weese:) We use Z_BEST_SPEED instead of Z_DEFAULT_COMPRESSION as it turned out
     //          to be 2x faster and produces only 7% bigger output
 //    int status = deflateInit2(&ctx.strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
 //                              GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY);
     int status = deflateInit2(&ctx.strm, Z_BEST_SPEED, Z_DEFLATED,
                               GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY);
     if (status != Z_OK)
         throw IOError("GZFile deflateInit2() failed.");
 }
 
 inline void
 compressInit(CompressionContext<BgzfFile> & ctx)
 {
     compressInit(static_cast<CompressionContext<GZFile> &>(ctx));
     ctx.headerPos = 0;
 }
 
 template <typename TTarget, typename TSourceIterator>
 inline typename Size<TTarget>::Type
 compress(TTarget & target, TSourceIterator & source, CompressionContext<BgzfFile> & ctx)
 {
     typedef typename Chunk<TTarget>::Type           TTargetChunk;
     typedef typename Chunk<TSourceIterator>::Type   TSourceChunk;
     typedef typename Value<TSourceChunk>::Type      TSourceValue;
 
     TTargetChunk tChunk;
     TSourceChunk sChunk;
 
     if (ctx.headerPos < sizeof(MagicHeader<BgzfFile>::VALUE))
     {
         size_t headerLeft = sizeof(MagicHeader<BgzfFile>::VALUE) - ctx.headerPos;
         reserveChunk(target, headerLeft, Output());
 
         tChunk = getChunk(target, Output());
         size_t size = std::min(headerLeft, length(tChunk));
         SEQAN_ASSERT_GT(size, 0u);
 
         std::copy(tChunk.begin, sChunk.begin, size);
 
         advanceChunk(target, size);
         ctx.headerPos += size;
         return size;
     }
     else
     {
         sChunk = getChunk(source, Input());
         tChunk = getChunk(target, Output());
 
         ctx.strm.next_in = static_cast<Bytef *>(sChunk.begin);
         ctx.strm.next_out = static_cast<Bytef *>(tChunk.begin);
         ctx.strm.avail_in = length(sChunk) * sizeof(TSourceValue);
         ctx.strm.avail_out = length(tChunk);
 
         SEQAN_ASSERT_GT(ctx.strm.avail_out, 0u);
 
         int status = deflate(&ctx.strm, Z_NO_FLUSH);
         if (status != Z_OK)
             throw IOError("BgzfFile deflateInit2() failed.");
 
         source += length(sChunk) - ctx.strm.avail_in;
         size_t size = length(tChunk) - ctx.strm.avail_out;
         advanceChunk(target, size);
         return size;
     }
 
 
 //    status = deflate(&zs, Z_FINISH);
 //    bool rawDataTooBig = (status != Z_STREAM_END);
 //
 //    status = deflateEnd(&zs);
 //    if (status != Z_OK)
 //        throw IOError("BgzfFile deflateEnd() failed.");
 //
 //    if (!rawDataTooBig)
 //    {
 //        resize(page.raw, zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH);
 //        break;
 //    }
 }
 
 // ----------------------------------------------------------------------------
 // Helper Function _bgzfUnpackXX()
 // ----------------------------------------------------------------------------
 
 inline uint16_t
 _bgzfUnpack16(char const * buffer)
 {
     uint16_t tmp = *reinterpret_cast<uint16_t const *>(buffer);
     enforceLittleEndian(tmp);
     return tmp;
 }
 
 inline uint32_t
 _bgzfUnpack32(char const * buffer)
 {
     uint32_t tmp = *reinterpret_cast<uint32_t const *>(buffer);
     enforceLittleEndian(tmp);
     return tmp;
 }
 
 // ----------------------------------------------------------------------------
 // Helper Function _bgzfPackXX()
 // ----------------------------------------------------------------------------
 
 inline void
 _bgzfPack16(char * buffer, uint16_t value)
 {
     enforceLittleEndian(value);
     *reinterpret_cast<uint16_t *>(buffer) = value;
 }
 
 inline void
 _bgzfPack32(char * buffer, uint32_t value)
 {
     enforceLittleEndian(value);
     *reinterpret_cast<uint32_t *>(buffer) = value;
 }
 
 
 template <typename TDestValue, typename TDestCapacity, typename TSourceValue, typename TSourceLength>
 inline TDestCapacity
 _compressBlock(TDestValue *dstBegin,   TDestCapacity dstCapacity,
                TSourceValue *srcBegin, TSourceLength srcLength, CompressionContext<BgzfFile> & ctx)
 {
     const size_t BLOCK_HEADER_LENGTH = DefaultPageSize<BgzfFile>::BLOCK_HEADER_LENGTH;
     const size_t BLOCK_FOOTER_LENGTH = DefaultPageSize<BgzfFile>::BLOCK_FOOTER_LENGTH;
 
     SEQAN_ASSERT_GT(dstCapacity, BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH);
     SEQAN_ASSERT_EQ(sizeof(TDestValue), 1u);
     SEQAN_ASSERT_EQ(sizeof(unsigned), 4u);
 
     // 1. COPY HEADER
 
     std::copy(&MagicHeader<BgzfFile>::VALUE[0], &MagicHeader<BgzfFile>::VALUE[BLOCK_HEADER_LENGTH], dstBegin);
 
 
     // 2. COMPRESS
 
     compressInit(ctx);
     ctx.strm.next_in = (Bytef *)(srcBegin);
     ctx.strm.next_out = (Bytef *)(dstBegin + BLOCK_HEADER_LENGTH);
     ctx.strm.avail_in = srcLength * sizeof(TSourceValue);
     ctx.strm.avail_out = dstCapacity - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
 
     int status = deflate(&ctx.strm, Z_FINISH);
     if (status != Z_STREAM_END)
     {
         deflateEnd(&ctx.strm);
         throw IOError("Deflation failed. Compressed BGZF data is too big.");
     }
 
     status = deflateEnd(&ctx.strm);
     if (status != Z_OK)
         throw IOError("BGZF deflateEnd() failed.");
 
 
     // 3. APPEND FOOTER
 
     // Set compressed length into buffer, compute CRC and write CRC into buffer.
 
     size_t len = dstCapacity - ctx.strm.avail_out;
     _bgzfPack16(dstBegin + 16, len - 1);
 
     dstBegin += len - BLOCK_FOOTER_LENGTH;
     _bgzfPack32(dstBegin, crc32(crc32(0u, NULL, 0u), (Bytef *)(srcBegin), srcLength * sizeof(TSourceValue)));
     _bgzfPack32(dstBegin + 4, srcLength * sizeof(TSourceValue));
 
     return dstCapacity - ctx.strm.avail_out;
 }
 
 inline void
 decompressInit(CompressionContext<GZFile> & ctx)
 {
     const int GZIP_WINDOW_BITS = -15;   // no zlib header
 
     ctx.strm.zalloc = NULL;
     ctx.strm.zfree = NULL;
     int status = inflateInit2(&ctx.strm, GZIP_WINDOW_BITS);
     if (status != Z_OK)
         throw IOError("GZip inflateInit2() failed.");
 }
 
 inline void
 decompressInit(CompressionContext<BgzfFile> & ctx)
 {
     decompressInit(static_cast<CompressionContext<GZFile> &>(ctx));
     ctx.headerPos = 0;
 }
 
 inline bool
 _bgzfCheckHeader(char const * header)
 {
     const char FLG_FEXTRA = 4;
     const char BGZF_ID1 = 'B';
     const char BGZF_ID2 = 'C';
     const char BGZF_LEN = 2;
     const char BGZF_XLEN = 6;  // BGZF_LEN+4
 
     return (header[0] == (char)MagicHeader<GZFile>::VALUE[0] &&
             header[1] == (char)MagicHeader<GZFile>::VALUE[1] &&
             header[2] == (char)MagicHeader<GZFile>::VALUE[2] &&
             (header[3] & FLG_FEXTRA) != 0 &&
             _bgzfUnpack16(header + 10) == BGZF_XLEN &&
             header[12] == BGZF_ID1 &&
             header[13] == BGZF_ID2 &&
             _bgzfUnpack16(header + 14) == BGZF_LEN);
 }
 
 // read first bytes of a file/stream and compare with file format's magic header
 template <typename TStream>
 inline bool
 guessFormatFromStream(TStream &istream, BgzfFile)
 {
     char putbackBuf[18];
     bool match = false;
 
     SEQAN_ASSERT(istream.good());
 
     // try to read and check header
     size_t numRead = istream.readsome(&putbackBuf[0], sizeof(putbackBuf));
     if (numRead == sizeof(putbackBuf) && _bgzfCheckHeader(putbackBuf))
         match = true;
 
     // unget all read characters
     for (; numRead > 0; --numRead)
         istream.unget();
 
     SEQAN_ASSERT(istream.good());
 
     return match;
 }
 
 // ----------------------------------------------------------------------------
 // Function _preprocessFilePage()
 // ----------------------------------------------------------------------------
 
 template <typename TDestValue, typename TDestCapacity, typename TSourceValue, typename TSourceLength>
 inline TDestCapacity
 _decompressBlock(TDestValue *dstBegin,   TDestCapacity dstCapacity,
                  TSourceValue *srcBegin, TSourceLength srcLength, CompressionContext<BgzfFile> & ctx)
 {
     const size_t BLOCK_HEADER_LENGTH = DefaultPageSize<BgzfFile>::BLOCK_HEADER_LENGTH;
     const size_t BLOCK_FOOTER_LENGTH = DefaultPageSize<BgzfFile>::BLOCK_FOOTER_LENGTH;
 
     SEQAN_ASSERT_EQ(sizeof(TSourceValue), 1u);
     SEQAN_ASSERT_EQ(sizeof(unsigned), 4u);
 
     // 1. CHECK HEADER
 
     if (srcLength <= BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH)
         throw IOError("BGZF block too short.");
 
     if (!_bgzfCheckHeader(srcBegin))
         throw IOError("Invalid BGZF block header.");
 
     size_t compressedLen = _bgzfUnpack16(srcBegin + 16) + 1u;
     if (compressedLen != srcLength)
         throw IOError("BGZF compressed size mismatch.");
 
 
     // 2. DECOMPRESS
 
     decompressInit(ctx);
     ctx.strm.next_in = (Bytef *)(srcBegin + BLOCK_HEADER_LENGTH);
     ctx.strm.next_out = (Bytef *)(dstBegin);
     ctx.strm.avail_in = srcLength - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
     ctx.strm.avail_out = dstCapacity * sizeof(TDestValue);
 
     int status = inflate(&ctx.strm, Z_FINISH);
     if (status != Z_STREAM_END)
     {
         inflateEnd(&ctx.strm);
         throw IOError("Inflation failed. Decompressed BGZF data is too big.");
     }
 
     status = inflateEnd(&ctx.strm);
     if (status != Z_OK)
         throw IOError("BGZF inflateEnd() failed.");
 
 
     // 3. CHECK FOOTER
 
     // Check compressed length in buffer, compute CRC and compare with CRC in buffer.
 
     unsigned crc = crc32(crc32(0u, NULL, 0u), (Bytef *)(dstBegin), dstCapacity - ctx.strm.avail_out);
 
     srcBegin += compressedLen - BLOCK_FOOTER_LENGTH;
     if (_bgzfUnpack32(srcBegin) != crc)
         throw IOError("BGZF wrong checksum.");
 
     if (_bgzfUnpack32(srcBegin + 4) != dstCapacity - ctx.strm.avail_out)
         throw IOError("BGZF size mismatch.");
 
     return (dstCapacity - ctx.strm.avail_out) / sizeof(TDestValue);
 }
 
 #endif  // #if SEQAN_HAS_ZLIB
 
 }  // namespace seqan
 
 #endif  // SEQAN_STREAM_STREAM_COMPRESSOR_H_