Browse code

seqan header files

aguang authored on 15/02/2018 18:04:57
Showing1 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,931 @@
1
+// zipstream Library License:
2
+// --------------------------
3
+//
4
+// The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5
+//
6
+// This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7
+//
8
+// Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9
+//
10
+// 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11
+//
12
+// 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13
+//
14
+// 3. This notice may not be removed or altered from any source distribution
15
+//
16
+//
17
+// Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003   (original zlib stream)
18
+// Author: David Weese, dave.weese@gmail.com, 2014             (extension to parallel block-wise compression in bgzf format)
19
+
20
+#ifndef INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
21
+#define INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
22
+
23
+#ifndef SEQAN_BGZF_NUM_THREADS
24
+#define SEQAN_BGZF_NUM_THREADS 16
25
+#endif
26
+
27
+namespace seqan {
28
+
29
// Maximal size of one complete BGZF block (64 KiB).
constexpr unsigned BGZF_MAX_BLOCK_SIZE = 64u * 1024u;
// Number of bytes reserved for the block header.
constexpr unsigned BGZF_BLOCK_HEADER_LENGTH = 18u;
// Number of bytes reserved for the block footer.
constexpr unsigned BGZF_BLOCK_FOOTER_LENGTH = 8u;
// 5 bytes block overhead (see 3.2.4. at http://www.gzip.org/zlib/rfc-deflate.html)
constexpr unsigned ZLIB_BLOCK_OVERHEAD = 5u;

// Maximal number of input bytes put into one block. It is the maximal block size
// minus header, footer and zlib overhead, so that the compressed data always fits
// into one block, even for level Z_NO_COMPRESSION.
constexpr unsigned BGZF_BLOCK_SIZE =
    BGZF_MAX_BLOCK_SIZE
    - BGZF_BLOCK_HEADER_LENGTH
    - BGZF_BLOCK_FOOTER_LENGTH
    - ZLIB_BLOCK_OVERHEAD;
37
+
38
+// ===========================================================================
39
+// Classes
40
+// ===========================================================================
41
+
42
+// --------------------------------------------------------------------------
43
+// Class basic_bgzf_streambuf
44
+// --------------------------------------------------------------------------
45
+
46
+template<
47
+    typename Elem,
48
+    typename Tr = std::char_traits<Elem>,
49
+    typename ElemA = std::allocator<Elem>,
50
+    typename ByteT = char,
51
+    typename ByteAT = std::allocator<ByteT>
52
+>
53
+class basic_bgzf_streambuf : public std::basic_streambuf<Elem, Tr>
54
+{
55
+public:
56
+    typedef std::basic_ostream<Elem, Tr>& ostream_reference;
57
+    typedef ElemA char_allocator_type;
58
+    typedef ByteT byte_type;
59
+    typedef ByteAT byte_allocator_type;
60
+    typedef byte_type* byte_buffer_type;
61
+    typedef typename Tr::char_type char_type;
62
+    typedef typename Tr::int_type int_type;
63
+
64
+    typedef ConcurrentQueue<size_t, Suspendable<Limit> > TJobQueue;
65
+
66
+    struct OutputBuffer
67
+    {
68
+        char    buffer[BGZF_MAX_BLOCK_SIZE];
69
+        size_t  size;
70
+    };
71
+
72
+    struct BufferWriter
73
+    {
74
+        ostream_reference ostream;
75
+
76
+        BufferWriter(ostream_reference ostream) :
77
+            ostream(ostream)
78
+        {}
79
+
80
+        bool operator() (OutputBuffer const & outputBuffer)
81
+        {
82
+            ostream.write(outputBuffer.buffer, outputBuffer.size);
83
+            return ostream.good();
84
+        }
85
+    };
86
+
87
+    struct CompressionJob
88
+    {
89
+        typedef std::vector<char_type, char_allocator_type> TBuffer;
90
+
91
+        TBuffer         buffer;
92
+        size_t          size;
93
+        OutputBuffer    *outputBuffer;
94
+
95
+        CompressionJob() :
96
+            buffer(BGZF_BLOCK_SIZE / sizeof(char_type), 0),
97
+            size(0),
98
+            outputBuffer(NULL)
99
+        {}
100
+    };
101
+
102
+    // string of recycable jobs
103
+    size_t                  numThreads;
104
+    size_t                  numJobs;
105
+    String<CompressionJob>  jobs;
106
+    TJobQueue               jobQueue;
107
+    TJobQueue               idleQueue;
108
+    Serializer<
109
+        OutputBuffer,
110
+        BufferWriter>       serializer;
111
+
112
+    size_t                  currentJobId;
113
+    bool                    currentJobAvail;
114
+
115
+
116
+    struct CompressionThread
117
+    {
118
+        basic_bgzf_streambuf            *streamBuf;
119
+        CompressionContext<BgzfFile>    compressionCtx;
120
+        size_t                          threadNum;
121
+
122
+        void operator()()
123
+        {
124
+            ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
125
+            ScopedWriteLock<TJobQueue> writeLock(streamBuf->idleQueue);
126
+
127
+            // wait for a new job to become available
128
+            bool success = true;
129
+            while (success)
130
+            {
131
+                size_t jobId = -1;
132
+                if (!popFront(jobId, streamBuf->jobQueue))
133
+                    return;
134
+
135
+                CompressionJob &job = streamBuf->jobs[jobId];
136
+
137
+                // compress block with zlib
138
+                job.outputBuffer->size = _compressBlock(
139
+                    job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
140
+                    &job.buffer[0], job.size, compressionCtx);
141
+
142
+                success = releaseValue(streamBuf->serializer, job.outputBuffer);
143
+                appendValue(streamBuf->idleQueue, jobId);
144
+            }
145
+        }
146
+    };
147
+
148
+    // array of worker threads
149
+    using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
150
+    std::vector<TFuture>         threads;
151
+
152
+    basic_bgzf_streambuf(ostream_reference ostream_,
153
+                         size_t numThreads = SEQAN_BGZF_NUM_THREADS,
154
+                         size_t jobsPerThread = 8) :
155
+        numThreads(numThreads),
156
+        numJobs(numThreads * jobsPerThread),
157
+        jobQueue(numJobs),
158
+        idleQueue(numJobs),
159
+        serializer(ostream_, numThreads * jobsPerThread)
160
+    {
161
+        resize(jobs, numJobs, Exact());
162
+        currentJobId = 0;
163
+
164
+        lockWriting(jobQueue);
165
+        lockReading(idleQueue);
166
+        setReaderWriterCount(jobQueue, numThreads, 1);
167
+        setReaderWriterCount(idleQueue, 1, numThreads);
168
+
169
+        for (unsigned i = 0; i < numJobs; ++i)
170
+        {
171
+            bool success = appendValue(idleQueue, i);
172
+            ignoreUnusedVariableWarning(success);
173
+            SEQAN_ASSERT(success);
174
+        }
175
+
176
+        for (size_t i = 0; i < numThreads; ++i)
177
+        {
178
+            threads.push_back(std::async(std::launch::async, CompressionThread{this, CompressionContext<BgzfFile>{}, i}));
179
+        }
180
+
181
+        currentJobAvail = popFront(currentJobId, idleQueue);
182
+        SEQAN_ASSERT(currentJobAvail);
183
+
184
+        CompressionJob &job = jobs[currentJobId];
185
+        job.outputBuffer = aquireValue(serializer);
186
+        this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
187
+    }
188
+
189
+    ~basic_bgzf_streambuf()
190
+    {
191
+        // the buffer is now (after addFooter()) and flush will append the empty EOF marker
192
+        flush(true);
193
+
194
+        unlockWriting(jobQueue);
195
+        unlockReading(idleQueue);
196
+    }
197
+
198
+    bool compressBuffer(size_t size)
199
+    {
200
+        // submit current job
201
+        if (currentJobAvail)
202
+        {
203
+            jobs[currentJobId].size = size;
204
+            appendValue(jobQueue, currentJobId);
205
+        }
206
+
207
+        // recycle existing idle job
208
+        if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
209
+            return false;
210
+
211
+        jobs[currentJobId].outputBuffer = aquireValue(serializer);
212
+
213
+        return serializer;
214
+    }
215
+
216
+    int_type overflow(int_type c)
217
+    {
218
+        int w = static_cast<int>(this->pptr() - this->pbase());
219
+        if (c != EOF)
220
+        {
221
+            *this->pptr() = c;
222
+            ++w;
223
+        }
224
+        if (compressBuffer(w))
225
+        {
226
+            CompressionJob &job = jobs[currentJobId];
227
+            this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
228
+            return c;
229
+        }
230
+        else
231
+        {
232
+            return EOF;
233
+        }
234
+    }
235
+
236
+    std::streamsize flush(bool flushEmptyBuffer = false)
237
+    {
238
+        int w = static_cast<int>(this->pptr() - this->pbase());
239
+        if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
240
+        {
241
+            CompressionJob &job = jobs[currentJobId];
242
+            this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
243
+        }
244
+        else
245
+        {
246
+            w = 0;
247
+        }
248
+
249
+        // wait for running compressor threads
250
+        waitForMinSize(idleQueue, numJobs - 1);
251
+
252
+        serializer.worker.ostream.flush();
253
+        return w;
254
+    }
255
+
256
+    int sync()
257
+    {
258
+        if (this->pptr() != this->pbase())
259
+        {
260
+            int c = overflow(EOF);
261
+            if (c == EOF)
262
+                return -1;
263
+        }
264
+        return 0;
265
+    }
266
+
267
+    void addFooter()
268
+    {
269
+        // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
270
+        if (this->pptr() != this->pbase())
271
+            overflow(EOF);
272
+    }
273
+
274
+    // returns a reference to the output stream
275
+    ostream_reference get_ostream() const    { return serializer.worker.ostream; };
276
+};
277
+
278
+// --------------------------------------------------------------------------
279
+// Class basic_unbgzf_streambuf
280
+// --------------------------------------------------------------------------
281
+
282
+template<
283
+    typename Elem,
284
+    typename Tr = std::char_traits<Elem>,
285
+    typename ElemA = std::allocator<Elem>,
286
+    typename ByteT = char,
287
+    typename ByteAT = std::allocator<ByteT>
288
+>
289
+class basic_unbgzf_streambuf :
290
+    public std::basic_streambuf<Elem, Tr>
291
+{
292
+public:
293
+    typedef std::basic_istream<Elem, Tr>& istream_reference;
294
+    typedef ElemA char_allocator_type;
295
+    typedef ByteT byte_type;
296
+    typedef ByteAT byte_allocator_type;
297
+    typedef byte_type* byte_buffer_type;
298
+    typedef typename Tr::char_type char_type;
299
+    typedef typename Tr::int_type int_type;
300
+    typedef typename Tr::off_type off_type;
301
+    typedef typename Tr::pos_type pos_type;
302
+
303
+    typedef std::vector<char_type, char_allocator_type>     TBuffer;
304
+    typedef ConcurrentQueue<int, Suspendable<Limit> >       TJobQueue;
305
+
306
+    static const size_t MAX_PUTBACK = 4;
307
+
308
+    struct Serializer
309
+    {
310
+        istream_reference   istream;
311
+        std::mutex          lock;
312
+        IOError             *error;
313
+        off_type            fileOfs;
314
+
315
+        Serializer(istream_reference istream) :
316
+            istream(istream),
317
+            error(NULL),
318
+            fileOfs(0u)
319
+        {}
320
+
321
+        ~Serializer()
322
+        {
323
+            delete error;
324
+        }
325
+    };
326
+
327
+    Serializer serializer;
328
+
329
+    struct DecompressionJob
330
+    {
331
+        typedef std::vector<byte_type, byte_allocator_type> TInputBuffer;
332
+
333
+        TInputBuffer            inputBuffer;
334
+        TBuffer                 buffer;
335
+        off_type                fileOfs;
336
+        int                     size;
337
+        unsigned                compressedSize;
338
+
339
+        std::mutex              cs;
340
+        std::condition_variable readyEvent;
341
+        bool                    ready;
342
+        bool                    bgzfEofMarker;
343
+
344
+        DecompressionJob() :
345
+            inputBuffer(BGZF_MAX_BLOCK_SIZE, 0),
346
+            buffer(MAX_PUTBACK + BGZF_MAX_BLOCK_SIZE / sizeof(char_type), 0),
347
+            fileOfs(),
348
+            size(0),
349
+            cs(),
350
+            readyEvent(),
351
+            ready(true),
352
+            bgzfEofMarker(false)
353
+        {}
354
+
355
+        // TODO(rrahn): Do we need a copy constructor for the decompression job.
356
+        DecompressionJob(DecompressionJob const &other) :
357
+            inputBuffer(other.inputBuffer),
358
+            buffer(other.buffer),
359
+            fileOfs(other.fileOfs),
360
+            size(other.size),
361
+            cs(),
362
+            readyEvent(),
363
+            ready(other.ready),
364
+            bgzfEofMarker(other.bgzfEofMarker)
365
+        {}
366
+    };
367
+
368
+    // string of recycable jobs
369
+    size_t                      numThreads;
370
+    size_t                      numJobs;
371
+    String<DecompressionJob>    jobs;
372
+    TJobQueue                   runningQueue;
373
+    TJobQueue                   todoQueue;
374
+    int                         currentJobId;
375
+
376
+    struct DecompressionThread
377
+    {
378
+        basic_unbgzf_streambuf          *streamBuf;
379
+        CompressionContext<BgzfFile>    compressionCtx;
380
+
381
+        void operator()()
382
+        {
383
+            ScopedReadLock<TJobQueue> readLock(streamBuf->todoQueue);
384
+            ScopedWriteLock<TJobQueue> writeLock(streamBuf->runningQueue);
385
+
386
+            // wait for a new job to become available
387
+            while (true)
388
+            {
389
+                int jobId = -1;
390
+                if (!popFront(jobId, streamBuf->todoQueue))
391
+                    return;
392
+
393
+                DecompressionJob &job = streamBuf->jobs[jobId];
394
+                size_t tailLen = 0;
395
+
396
+                // typically the idle queue contains only ready jobs
397
+                // however, if seek() fast forwards running jobs into the todoQueue
398
+                // the caller defers the task of waiting to the decompression threads
399
+                if (!job.ready)
400
+                {
401
+                    std::unique_lock<std::mutex> lock(job.cs);
402
+                    job.readyEvent.wait(lock, [&job]{return job.ready;});
403
+                    SEQAN_ASSERT_EQ(job.ready, true);
404
+                }
405
+
406
+                {
407
+                    std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock);
408
+
409
+                    job.bgzfEofMarker = false;
410
+                    if (streamBuf->serializer.error != NULL)
411
+                        return;
412
+
413
+                    // remember start offset (for tellg later)
414
+                    job.fileOfs = streamBuf->serializer.fileOfs;
415
+                    job.size = -1;
416
+                    job.compressedSize = 0;
417
+
418
+                    // only load if not at EOF
419
+                    if (job.fileOfs != -1)
420
+                    {
421
+                        // read header
422
+                        streamBuf->serializer.istream.read(
423
+                            (char*)&job.inputBuffer[0],
424
+                            BGZF_BLOCK_HEADER_LENGTH);
425
+
426
+                        if (!streamBuf->serializer.istream.good())
427
+                        {
428
+                            streamBuf->serializer.fileOfs = -1;
429
+                            if (streamBuf->serializer.istream.eof())
430
+                                goto eofSkip;
431
+                            streamBuf->serializer.error = new IOError("Stream read error.");
432
+                            return;
433
+                        }
434
+
435
+                        // check header
436
+                        if (!_bgzfCheckHeader(&job.inputBuffer[0]))
437
+                        {
438
+                            streamBuf->serializer.fileOfs = -1;
439
+                            streamBuf->serializer.error = new IOError("Invalid BGZF block header.");
440
+                            return;
441
+                        }
442
+
443
+                        // extract length of compressed data
444
+                        tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) + 1u - BGZF_BLOCK_HEADER_LENGTH;
445
+
446
+                        // read compressed data and tail
447
+                        streamBuf->serializer.istream.read(
448
+                            (char*)&job.inputBuffer[0] + BGZF_BLOCK_HEADER_LENGTH,
449
+                            tailLen);
450
+
451
+                        // Check if end-of-file marker is set
452
+                        if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
453
+                                   reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
454
+                                   28) == 0)
455
+                        {
456
+                            job.bgzfEofMarker = true;
457
+                        }
458
+
459
+                        if (!streamBuf->serializer.istream.good())
460
+                        {
461
+                            streamBuf->serializer.fileOfs = -1;
462
+                            if (streamBuf->serializer.istream.eof())
463
+                                goto eofSkip;
464
+                            streamBuf->serializer.error = new IOError("Stream read error.");
465
+                            return;
466
+                        }
467
+
468
+                        job.compressedSize = BGZF_BLOCK_HEADER_LENGTH + tailLen;
469
+                        streamBuf->serializer.fileOfs += job.compressedSize;
470
+                        job.ready = false;
471
+
472
+                    eofSkip:
473
+                        streamBuf->serializer.istream.clear(
474
+                            streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
475
+                    }
476
+
477
+                    if (!appendValue(streamBuf->runningQueue, jobId))
478
+                    {
479
+                        // signal that job is ready
480
+                        {
481
+                            std::unique_lock<std::mutex> lock(job.cs);
482
+                            job.ready = true;
483
+                        }
484
+                        job.readyEvent.notify_all();
485
+                        return;  // Terminate this thread.
486
+                    }
487
+                }
488
+
489
+                if (!job.ready)
490
+                {
491
+                    // decompress block
492
+                    job.size = _decompressBlock(
493
+                        &job.buffer[0] + MAX_PUTBACK, capacity(job.buffer),
494
+                        &job.inputBuffer[0], job.compressedSize, compressionCtx);
495
+
496
+                    // signal that job is ready
497
+                    {
498
+                        std::unique_lock<std::mutex> lock(job.cs);
499
+                        job.ready = true;
500
+                    }
501
+                    job.readyEvent.notify_all();
502
+                }
503
+            }
504
+        }
505
+    };
506
+
507
+    // array of worker threads
508
+    using TFuture = decltype(std::async(DecompressionThread{nullptr, CompressionContext<BgzfFile>{}}));
509
+    std::vector<TFuture> threads;
510
+    TBuffer              putbackBuffer;
511
+
512
+    basic_unbgzf_streambuf(istream_reference istream_,
513
+                           size_t numThreads = SEQAN_BGZF_NUM_THREADS,
514
+                           size_t jobsPerThread = 8) :
515
+        serializer(istream_),
516
+        numThreads(numThreads),
517
+        numJobs(numThreads * jobsPerThread),
518
+        runningQueue(numJobs),
519
+        todoQueue(numJobs),
520
+        putbackBuffer(MAX_PUTBACK)
521
+    {
522
+        resize(jobs, numJobs, Exact());
523
+        currentJobId = -1;
524
+
525
+        lockReading(runningQueue);
526
+        lockWriting(todoQueue);
527
+        setReaderWriterCount(runningQueue, 1, numThreads);
528
+        setReaderWriterCount(todoQueue, numThreads, 1);
529
+
530
+        for (unsigned i = 0; i < numJobs; ++i)
531
+        {
532
+            bool success = appendValue(todoQueue, i);
533
+            ignoreUnusedVariableWarning(success);
534
+            SEQAN_ASSERT(success);
535
+        }
536
+
537
+        for (unsigned i = 0; i < numThreads; ++i)
538
+        {
539
+            threads.push_back(std::async(std::launch::async, DecompressionThread{this, CompressionContext<BgzfFile>{}}));
540
+        }
541
+    }
542
+
543
+    ~basic_unbgzf_streambuf()
544
+    {
545
+        unlockWriting(todoQueue);
546
+        unlockReading(runningQueue);
547
+    }
548
+
549
+    int_type underflow()
550
+    {
551
+        // no need to use the next buffer?
552
+        if (this->gptr() && this->gptr() < this->egptr())
553
+            return Tr::to_int_type(*this->gptr());
554
+
555
+        size_t putback = this->gptr() - this->eback();
556
+        if (putback > MAX_PUTBACK)
557
+            putback = MAX_PUTBACK;
558
+
559
+        // save at most MAX_PUTBACK characters from previous page to putback buffer
560
+        if (putback != 0)
561
+            std::copy(
562
+                this->gptr() - putback,
563
+                this->gptr(),
564
+                &putbackBuffer[0]);
565
+
566
+        if (currentJobId >= 0)
567
+            appendValue(todoQueue, currentJobId);
568
+
569
+        while (true)
570
+        {
571
+            if (!popFront(currentJobId, runningQueue))
572
+            {
573
+                currentJobId = -1;
574
+                SEQAN_ASSERT(serializer.error != NULL);
575
+                if (serializer.error != NULL)
576
+                    throw *serializer.error;
577
+                return EOF;
578
+            }
579
+
580
+            DecompressionJob &job = jobs[currentJobId];
581
+
582
+            // restore putback buffer
583
+            this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
584
+            if (putback != 0)
585
+                std::copy(
586
+                    &putbackBuffer[0],
587
+                    &putbackBuffer[0] + putback,
588
+                    &job.buffer[0] + (MAX_PUTBACK - putback));
589
+
590
+            // wait for the end of decompression
591
+            {
592
+                std::unique_lock<std::mutex> lock(job.cs);
593
+                job.readyEvent.wait(lock, [&job]{return job.ready;});
594
+            }
595
+
596
+            size_t size = (job.size != -1)? job.size : 0;
597
+
598
+            // reset buffer pointers
599
+            this->setg(
600
+                  &job.buffer[0] + (MAX_PUTBACK - putback),     // beginning of putback area
601
+                  &job.buffer[0] + MAX_PUTBACK,                 // read position
602
+                  &job.buffer[0] + (MAX_PUTBACK + size));       // end of buffer
603
+
604
+            // The end of the bgzf file is reached, either if there was an error, or if the
605
+            // end-of-file marker was reached, while the uncompressed block had zero size.
606
+            if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
607
+                return EOF;
608
+            else if (job.size > 0)
609
+                return Tr::to_int_type(*this->gptr());      // return next character
610
+
611
+            throw IOError("BGZF: Invalid end condition in decompression. "
612
+                          "Most likely due to an empty bgzf block without end-of-file marker.");
613
+        }
614
+    }
615
+
616
+    pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
617
+    {
618
+        if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
619
+        {
620
+            if (dir == std::ios_base::cur && ofs >= 0)
621
+            {
622
+                // forward delta seek
623
+                while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
624
+                {
625
+                    ofs -= this->egptr() - this->gptr();
626
+                    if (this->underflow() == EOF)
627
+                        break;
628
+                }
629
+
630
+                if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
631
+                {
632
+                    DecompressionJob &job = jobs[currentJobId];
633
+
634
+                    // reset buffer pointers
635
+                    this->setg(
636
+                          this->eback(),            // beginning of putback area
637
+                          this->gptr() + ofs,       // read position
638
+                          this->egptr());           // end of buffer
639
+
640
+                    if (this->gptr() != this->egptr())
641
+                        return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
642
+                    else
643
+                        return pos_type((job.fileOfs + job.compressedSize) << 16);
644
+                }
645
+
646
+            }
647
+            else if (dir == std::ios_base::beg)
648
+            {
649
+                // random seek
650
+                std::streampos destFileOfs = ofs >> 16;
651
+
652
+                // are we in the same block?
653
+                if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
654
+                {
655
+                    DecompressionJob &job = jobs[currentJobId];
656
+
657
+                    // reset buffer pointers
658
+                    this->setg(
659
+                          this->eback(),                                        // beginning of putback area
660
+                          &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),      // read position
661
+                          this->egptr());                                       // end of buffer
662
+                    return ofs;
663
+                }
664
+
665
+                // ok, different block
666
+                {
667
+                    std::lock_guard<std::mutex> scopedLock(serializer.lock);
668
+
669
+                    // remove all running jobs and put them in the idle queue unless we
670
+                    // find our seek target
671
+
672
+                    if (currentJobId >= 0)
673
+                        appendValue(todoQueue, currentJobId);
674
+
675
+                    // Note that if we are here the current job does not represent the sought block.
676
+                    // Hence if the running queue is empty we need to explicitly unset the jobId,
677
+                    // otherwise we would not update the serializers istream pointer to the correct position.
678
+                    if (empty(runningQueue))
679
+                        currentJobId = -1;
680
+
681
+                    // empty is thread-safe in serializer.lock
682
+                    while (!empty(runningQueue))
683
+                    {
684
+                        popFront(currentJobId, runningQueue);
685
+
686
+                        if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
687
+                            break;
688
+
689
+                        // push back useless job
690
+                        appendValue(todoQueue, currentJobId);
691
+                        currentJobId = -1;
692
+                    }
693
+
694
+                    if (currentJobId == -1)
695
+                    {
696
+                        SEQAN_ASSERT(empty(runningQueue));
697
+                        serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
698
+                        if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
699
+                            serializer.fileOfs = destFileOfs;
700
+                        else
701
+                            currentJobId = -2;      // temporarily signals a seek error
702
+                    }
703
+                }
704
+
705
+                // if our block wasn't in the running queue yet, it should now
706
+                // be the first that falls out after modifying serializer.fileOfs
707
+                if (currentJobId == -1)
708
+                    popFront(currentJobId, runningQueue);
709
+                else if (currentJobId == -2)
710
+                    currentJobId = -1;
711
+
712
+                if (currentJobId >= 0)
713
+                {
714
+                    // wait for the end of decompression
715
+                    DecompressionJob &job = jobs[currentJobId];
716
+
717
+                    {
718
+                        std::unique_lock<std::mutex> lock(job.cs);
719
+                        job.readyEvent.wait(lock, [&job]{return job.ready;});
720
+                    }
721
+
722
+                    SEQAN_ASSERT_EQ(job.fileOfs, (off_type)destFileOfs);
723
+
724
+                    // reset buffer pointers
725
+                    this->setg(
726
+                          &job.buffer[0] + MAX_PUTBACK,                     // no putback area
727
+                          &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),  // read position
728
+                          &job.buffer[0] + (MAX_PUTBACK + job.size));       // end of buffer
729
+                    return ofs;
730
+                }
731
+            }
732
+        }
733
+        return pos_type(off_type(-1));
734
+    }
735
+
736
+    pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
737
+    {
738
+        return seekoff(off_type(pos), std::ios_base::beg, openMode);
739
+    }
740
+
741
+    // returns the compressed input istream
742
+    istream_reference get_istream()    { return serializer.istream; };
743
+};
744
+
745
+// --------------------------------------------------------------------------
746
+// Class basic_bgzf_ostreambase
747
+// --------------------------------------------------------------------------
748
+
749
+template<
750
+    typename Elem,
751
+    typename Tr = std::char_traits<Elem>,
752
+    typename ElemA = std::allocator<Elem>,
753
+    typename ByteT = char,
754
+    typename ByteAT = std::allocator<ByteT>
755
+>
756
+class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
757
+{
758
+public:
759
+    typedef std::basic_ostream<Elem, Tr>&                        ostream_reference;
760
+    typedef basic_bgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
761
+
762
+    basic_bgzf_ostreambase(ostream_reference ostream_)
763
+        : m_buf(ostream_)
764
+    {
765
+        this->init(&m_buf );
766
+    };
767
+
768
+    // returns the underlying zip ostream object
769
+    bgzf_streambuf_type* rdbuf()            { return &m_buf; };
770
+    // returns the bgzf error state
771
+    int get_zerr() const                    { return m_buf.get_err(); };
772
+    // returns the uncompressed data crc
773
+    long get_crc() const                    { return m_buf.get_crc(); };
774
+    // returns the compressed data size
775
+    long get_out_size() const               { return m_buf.get_out_size(); };
776
+    // returns the uncompressed data size
777
+    long get_in_size() const                { return m_buf.get_in_size(); };
778
+
779
+private:
780
+    bgzf_streambuf_type m_buf;
781
+};
782
+
783
+// --------------------------------------------------------------------------
784
+// Class basic_bgzf_istreambase
785
+// --------------------------------------------------------------------------
786
+
787
+template<
788
+    typename Elem,
789
+    typename Tr = std::char_traits<Elem>,
790
+    typename ElemA = std::allocator<Elem>,
791
+    typename ByteT = char,
792
+    typename ByteAT = std::allocator<ByteT>
793
+>
794
+class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
795
+{
796
+public:
797
+    typedef std::basic_istream<Elem, Tr>&                           istream_reference;
798
+    typedef basic_unbgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT>  unbgzf_streambuf_type;
799
+
800
+    basic_bgzf_istreambase(istream_reference ostream_)
801
+        : m_buf(ostream_)
802
+    {
803
+        this->init(&m_buf );
804
+    };
805
+
806
+    // returns the underlying unzip istream object
807
+    unbgzf_streambuf_type* rdbuf() { return &m_buf; };
808
+
809
+    // returns the bgzf error state
810
+    int get_zerr() const                    { return m_buf.get_zerr(); };
811
+    // returns the uncompressed data crc
812
+    long get_crc() const                    { return m_buf.get_crc(); };
813
+    // returns the uncompressed data size
814
+    long get_out_size() const               { return m_buf.get_out_size(); };
815
+    // returns the compressed data size
816
+    long get_in_size() const                { return m_buf.get_in_size(); };
817
+
818
+private:
819
+    unbgzf_streambuf_type m_buf;
820
+};
821
+
822
+// --------------------------------------------------------------------------
823
+// Class basic_bgzf_ostream
824
+// --------------------------------------------------------------------------
825
+
826
+template<
827
+    typename Elem,
828
+    typename Tr = std::char_traits<Elem>,
829
+    typename ElemA = std::allocator<Elem>,
830
+    typename ByteT = char,
831
+    typename ByteAT = std::allocator<ByteT>
832
+>
833
+class basic_bgzf_ostream :
834
+    public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
835
+    public std::basic_ostream<Elem,Tr>
836
+{
837
+public:
838
+    typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
839
+    typedef std::basic_ostream<Elem,Tr>                        ostream_type;
840
+    typedef ostream_type&                                      ostream_reference;
841
+
842
+    basic_bgzf_ostream(ostream_reference ostream_) :
843
+        bgzf_ostreambase_type(ostream_),
844
+        ostream_type(bgzf_ostreambase_type::rdbuf())
845
+    {}
846
+
847
+    // flush inner buffer and zipper buffer
848
+    basic_bgzf_ostream<Elem,Tr>& zflush()
849
+    {
850
+        this->flush(); this->rdbuf()->flush(); return *this;
851
+    };
852
+
853
+    ~basic_bgzf_ostream()
854
+    {
855
+        this->rdbuf()->addFooter();
856
+    }
857
+
858
+private:
859
+    static void put_long(ostream_reference out_, unsigned long x_);
860
+#ifdef _WIN32
861
+    void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
862
+    void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
863
+#endif
864
+};
865
+
866
+// --------------------------------------------------------------------------
867
+// Class basic_bgzf_istream
868
+// --------------------------------------------------------------------------
869
+
870
+template<
871
+    typename Elem,
872
+    typename Tr = std::char_traits<Elem>,
873
+    typename ElemA = std::allocator<Elem>,
874
+    typename ByteT = char,
875
+    typename ByteAT = std::allocator<ByteT>
876
+>
877
+class basic_bgzf_istream :
878
+    public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
879
+    public std::basic_istream<Elem,Tr>
880
+{
881
+public:
882
+    typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
883
+    typedef std::basic_istream<Elem,Tr>                        istream_type;
884
+    typedef istream_type &                                     istream_reference;
885
+    typedef char                                               byte_type;
886
+
887
+    basic_bgzf_istream(istream_reference istream_) :
888
+        bgzf_istreambase_type(istream_),
889
+        istream_type(bgzf_istreambase_type::rdbuf()),
890
+        m_is_gzip(false),
891
+        m_gbgzf_data_size(0)
892
+    {};
893
+
894
+    // returns true if it is a gzip file
895
+    bool is_gzip() const                { return m_is_gzip; };
896
+    // return data size check
897
+    bool check_data_size() const        { return this->get_out_size() == m_gbgzf_data_size; };
898
+
899
+    // return the data size in the file
900
+    long get_gbgzf_data_size() const    { return m_gbgzf_data_size; };
901
+
902
+protected:
903
+    static void read_long(istream_reference in_, unsigned long& x_);
904
+
905
+    int check_header();
906
+    bool m_is_gzip;
907
+    unsigned long m_gbgzf_data_size;
908
+
909
+#ifdef _WIN32
910
+private:
911
+    void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
912
+    void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
913
+#endif
914
+};
915
+
916
+// ===========================================================================
917
+// Typedefs
918
+// ===========================================================================
919
+
920
+// Narrow-character BGZF output stream: basic_bgzf_ostream<char> with all other template arguments defaulted.
921
+typedef basic_bgzf_ostream<char> bgzf_ostream;
922
+// Wide-character BGZF output stream: basic_bgzf_ostream<wchar_t> with all other template arguments defaulted.
923
+typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
924
+// Narrow-character BGZF input stream: basic_bgzf_istream<char> with all other template arguments defaulted.
925
+typedef basic_bgzf_istream<char> bgzf_istream;
926
+// Wide-character BGZF input stream: basic_bgzf_istream<wchar_t> with all other template arguments defaulted.
927
+typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
928
+
929
+}  // namespace seqan
930
+
931
+#endif // INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_