1 | 1 |
new file mode 100644 |
... | ... |
@@ -0,0 +1,931 @@ |
1 |
+// zipstream Library License: |
|
2 |
+// -------------------------- |
|
3 |
+// |
|
4 |
+// The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux. |
|
5 |
+// |
|
6 |
+// This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software. |
|
7 |
+// |
|
8 |
+// Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions: |
|
9 |
+// |
|
10 |
+// 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. |
|
11 |
+// |
|
12 |
+// 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. |
|
13 |
+// |
|
14 |
+// 3. This notice may not be removed or altered from any source distribution |
|
15 |
+// |
|
16 |
+// |
|
17 |
+// Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream) |
|
18 |
+// Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format) |
|
19 |
+ |
|
20 |
+#ifndef INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ |
|
21 |
+#define INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ |
|
22 |
+ |
|
23 |
+#ifndef SEQAN_BGZF_NUM_THREADS |
|
24 |
+#define SEQAN_BGZF_NUM_THREADS 16 |
|
25 |
+#endif |
|
26 |
+ |
|
27 |
+namespace seqan { |
|
28 |
+ |
|
29 |
// Upper bound for the size of one complete BGZF block (header + data + footer).
const unsigned BGZF_MAX_BLOCK_SIZE = 1u << 16;
// Number of bytes occupied by the header of a BGZF block.
const unsigned BGZF_BLOCK_HEADER_LENGTH = 18u;
// Number of bytes occupied by the footer of a BGZF block.
const unsigned BGZF_BLOCK_FOOTER_LENGTH = 8u;
// 5 bytes block overhead (see 3.2.4. at http://www.gzip.org/zlib/rfc-deflate.html)
const unsigned ZLIB_BLOCK_OVERHEAD = 5u;

// Largest amount of input placed into one block.  It is the maximal block size
// reduced by header, footer and deflate overhead, such that the compressed data
// always fits in one block even for level Z_NO_COMPRESSION.
const unsigned BGZF_BLOCK_SIZE =
    BGZF_MAX_BLOCK_SIZE - (BGZF_BLOCK_HEADER_LENGTH + BGZF_BLOCK_FOOTER_LENGTH + ZLIB_BLOCK_OVERHEAD);
|
37 |
+ |
|
38 |
+// =========================================================================== |
|
39 |
+// Classes |
|
40 |
+// =========================================================================== |
|
41 |
+ |
|
42 |
+// -------------------------------------------------------------------------- |
|
43 |
+// Class basic_bgzf_streambuf |
|
44 |
+// -------------------------------------------------------------------------- |
|
45 |
+ |
|
46 |
+template< |
|
47 |
+ typename Elem, |
|
48 |
+ typename Tr = std::char_traits<Elem>, |
|
49 |
+ typename ElemA = std::allocator<Elem>, |
|
50 |
+ typename ByteT = char, |
|
51 |
+ typename ByteAT = std::allocator<ByteT> |
|
52 |
+> |
|
53 |
+class basic_bgzf_streambuf : public std::basic_streambuf<Elem, Tr> |
|
54 |
+{ |
|
55 |
+public: |
|
56 |
+ typedef std::basic_ostream<Elem, Tr>& ostream_reference; |
|
57 |
+ typedef ElemA char_allocator_type; |
|
58 |
+ typedef ByteT byte_type; |
|
59 |
+ typedef ByteAT byte_allocator_type; |
|
60 |
+ typedef byte_type* byte_buffer_type; |
|
61 |
+ typedef typename Tr::char_type char_type; |
|
62 |
+ typedef typename Tr::int_type int_type; |
|
63 |
+ |
|
64 |
+ typedef ConcurrentQueue<size_t, Suspendable<Limit> > TJobQueue; |
|
65 |
+ |
|
66 |
+ struct OutputBuffer |
|
67 |
+ { |
|
68 |
+ char buffer[BGZF_MAX_BLOCK_SIZE]; |
|
69 |
+ size_t size; |
|
70 |
+ }; |
|
71 |
+ |
|
72 |
+ struct BufferWriter |
|
73 |
+ { |
|
74 |
+ ostream_reference ostream; |
|
75 |
+ |
|
76 |
+ BufferWriter(ostream_reference ostream) : |
|
77 |
+ ostream(ostream) |
|
78 |
+ {} |
|
79 |
+ |
|
80 |
+ bool operator() (OutputBuffer const & outputBuffer) |
|
81 |
+ { |
|
82 |
+ ostream.write(outputBuffer.buffer, outputBuffer.size); |
|
83 |
+ return ostream.good(); |
|
84 |
+ } |
|
85 |
+ }; |
|
86 |
+ |
|
87 |
+ struct CompressionJob |
|
88 |
+ { |
|
89 |
+ typedef std::vector<char_type, char_allocator_type> TBuffer; |
|
90 |
+ |
|
91 |
+ TBuffer buffer; |
|
92 |
+ size_t size; |
|
93 |
+ OutputBuffer *outputBuffer; |
|
94 |
+ |
|
95 |
+ CompressionJob() : |
|
96 |
+ buffer(BGZF_BLOCK_SIZE / sizeof(char_type), 0), |
|
97 |
+ size(0), |
|
98 |
+ outputBuffer(NULL) |
|
99 |
+ {} |
|
100 |
+ }; |
|
101 |
+ |
|
102 |
+ // string of recycable jobs |
|
103 |
+ size_t numThreads; |
|
104 |
+ size_t numJobs; |
|
105 |
+ String<CompressionJob> jobs; |
|
106 |
+ TJobQueue jobQueue; |
|
107 |
+ TJobQueue idleQueue; |
|
108 |
+ Serializer< |
|
109 |
+ OutputBuffer, |
|
110 |
+ BufferWriter> serializer; |
|
111 |
+ |
|
112 |
+ size_t currentJobId; |
|
113 |
+ bool currentJobAvail; |
|
114 |
+ |
|
115 |
+ |
|
116 |
+ struct CompressionThread |
|
117 |
+ { |
|
118 |
+ basic_bgzf_streambuf *streamBuf; |
|
119 |
+ CompressionContext<BgzfFile> compressionCtx; |
|
120 |
+ size_t threadNum; |
|
121 |
+ |
|
122 |
+ void operator()() |
|
123 |
+ { |
|
124 |
+ ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue); |
|
125 |
+ ScopedWriteLock<TJobQueue> writeLock(streamBuf->idleQueue); |
|
126 |
+ |
|
127 |
+ // wait for a new job to become available |
|
128 |
+ bool success = true; |
|
129 |
+ while (success) |
|
130 |
+ { |
|
131 |
+ size_t jobId = -1; |
|
132 |
+ if (!popFront(jobId, streamBuf->jobQueue)) |
|
133 |
+ return; |
|
134 |
+ |
|
135 |
+ CompressionJob &job = streamBuf->jobs[jobId]; |
|
136 |
+ |
|
137 |
+ // compress block with zlib |
|
138 |
+ job.outputBuffer->size = _compressBlock( |
|
139 |
+ job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer), |
|
140 |
+ &job.buffer[0], job.size, compressionCtx); |
|
141 |
+ |
|
142 |
+ success = releaseValue(streamBuf->serializer, job.outputBuffer); |
|
143 |
+ appendValue(streamBuf->idleQueue, jobId); |
|
144 |
+ } |
|
145 |
+ } |
|
146 |
+ }; |
|
147 |
+ |
|
148 |
+ // array of worker threads |
|
149 |
+ using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)})); |
|
150 |
+ std::vector<TFuture> threads; |
|
151 |
+ |
|
152 |
+ basic_bgzf_streambuf(ostream_reference ostream_, |
|
153 |
+ size_t numThreads = SEQAN_BGZF_NUM_THREADS, |
|
154 |
+ size_t jobsPerThread = 8) : |
|
155 |
+ numThreads(numThreads), |
|
156 |
+ numJobs(numThreads * jobsPerThread), |
|
157 |
+ jobQueue(numJobs), |
|
158 |
+ idleQueue(numJobs), |
|
159 |
+ serializer(ostream_, numThreads * jobsPerThread) |
|
160 |
+ { |
|
161 |
+ resize(jobs, numJobs, Exact()); |
|
162 |
+ currentJobId = 0; |
|
163 |
+ |
|
164 |
+ lockWriting(jobQueue); |
|
165 |
+ lockReading(idleQueue); |
|
166 |
+ setReaderWriterCount(jobQueue, numThreads, 1); |
|
167 |
+ setReaderWriterCount(idleQueue, 1, numThreads); |
|
168 |
+ |
|
169 |
+ for (unsigned i = 0; i < numJobs; ++i) |
|
170 |
+ { |
|
171 |
+ bool success = appendValue(idleQueue, i); |
|
172 |
+ ignoreUnusedVariableWarning(success); |
|
173 |
+ SEQAN_ASSERT(success); |
|
174 |
+ } |
|
175 |
+ |
|
176 |
+ for (size_t i = 0; i < numThreads; ++i) |
|
177 |
+ { |
|
178 |
+ threads.push_back(std::async(std::launch::async, CompressionThread{this, CompressionContext<BgzfFile>{}, i})); |
|
179 |
+ } |
|
180 |
+ |
|
181 |
+ currentJobAvail = popFront(currentJobId, idleQueue); |
|
182 |
+ SEQAN_ASSERT(currentJobAvail); |
|
183 |
+ |
|
184 |
+ CompressionJob &job = jobs[currentJobId]; |
|
185 |
+ job.outputBuffer = aquireValue(serializer); |
|
186 |
+ this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); |
|
187 |
+ } |
|
188 |
+ |
|
189 |
+ ~basic_bgzf_streambuf() |
|
190 |
+ { |
|
191 |
+ // the buffer is now (after addFooter()) and flush will append the empty EOF marker |
|
192 |
+ flush(true); |
|
193 |
+ |
|
194 |
+ unlockWriting(jobQueue); |
|
195 |
+ unlockReading(idleQueue); |
|
196 |
+ } |
|
197 |
+ |
|
198 |
+ bool compressBuffer(size_t size) |
|
199 |
+ { |
|
200 |
+ // submit current job |
|
201 |
+ if (currentJobAvail) |
|
202 |
+ { |
|
203 |
+ jobs[currentJobId].size = size; |
|
204 |
+ appendValue(jobQueue, currentJobId); |
|
205 |
+ } |
|
206 |
+ |
|
207 |
+ // recycle existing idle job |
|
208 |
+ if (!(currentJobAvail = popFront(currentJobId, idleQueue))) |
|
209 |
+ return false; |
|
210 |
+ |
|
211 |
+ jobs[currentJobId].outputBuffer = aquireValue(serializer); |
|
212 |
+ |
|
213 |
+ return serializer; |
|
214 |
+ } |
|
215 |
+ |
|
216 |
+ int_type overflow(int_type c) |
|
217 |
+ { |
|
218 |
+ int w = static_cast<int>(this->pptr() - this->pbase()); |
|
219 |
+ if (c != EOF) |
|
220 |
+ { |
|
221 |
+ *this->pptr() = c; |
|
222 |
+ ++w; |
|
223 |
+ } |
|
224 |
+ if (compressBuffer(w)) |
|
225 |
+ { |
|
226 |
+ CompressionJob &job = jobs[currentJobId]; |
|
227 |
+ this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); |
|
228 |
+ return c; |
|
229 |
+ } |
|
230 |
+ else |
|
231 |
+ { |
|
232 |
+ return EOF; |
|
233 |
+ } |
|
234 |
+ } |
|
235 |
+ |
|
236 |
+ std::streamsize flush(bool flushEmptyBuffer = false) |
|
237 |
+ { |
|
238 |
+ int w = static_cast<int>(this->pptr() - this->pbase()); |
|
239 |
+ if ((w != 0 || flushEmptyBuffer) && compressBuffer(w)) |
|
240 |
+ { |
|
241 |
+ CompressionJob &job = jobs[currentJobId]; |
|
242 |
+ this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); |
|
243 |
+ } |
|
244 |
+ else |
|
245 |
+ { |
|
246 |
+ w = 0; |
|
247 |
+ } |
|
248 |
+ |
|
249 |
+ // wait for running compressor threads |
|
250 |
+ waitForMinSize(idleQueue, numJobs - 1); |
|
251 |
+ |
|
252 |
+ serializer.worker.ostream.flush(); |
|
253 |
+ return w; |
|
254 |
+ } |
|
255 |
+ |
|
256 |
+ int sync() |
|
257 |
+ { |
|
258 |
+ if (this->pptr() != this->pbase()) |
|
259 |
+ { |
|
260 |
+ int c = overflow(EOF); |
|
261 |
+ if (c == EOF) |
|
262 |
+ return -1; |
|
263 |
+ } |
|
264 |
+ return 0; |
|
265 |
+ } |
|
266 |
+ |
|
267 |
+ void addFooter() |
|
268 |
+ { |
|
269 |
+ // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor |
|
270 |
+ if (this->pptr() != this->pbase()) |
|
271 |
+ overflow(EOF); |
|
272 |
+ } |
|
273 |
+ |
|
274 |
+ // returns a reference to the output stream |
|
275 |
+ ostream_reference get_ostream() const { return serializer.worker.ostream; }; |
|
276 |
+}; |
|
277 |
+ |
|
278 |
+// -------------------------------------------------------------------------- |
|
279 |
+// Class basic_unbgzf_streambuf |
|
280 |
+// -------------------------------------------------------------------------- |
|
281 |
+ |
|
282 |
+template< |
|
283 |
+ typename Elem, |
|
284 |
+ typename Tr = std::char_traits<Elem>, |
|
285 |
+ typename ElemA = std::allocator<Elem>, |
|
286 |
+ typename ByteT = char, |
|
287 |
+ typename ByteAT = std::allocator<ByteT> |
|
288 |
+> |
|
289 |
+class basic_unbgzf_streambuf : |
|
290 |
+ public std::basic_streambuf<Elem, Tr> |
|
291 |
+{ |
|
292 |
+public: |
|
293 |
+ typedef std::basic_istream<Elem, Tr>& istream_reference; |
|
294 |
+ typedef ElemA char_allocator_type; |
|
295 |
+ typedef ByteT byte_type; |
|
296 |
+ typedef ByteAT byte_allocator_type; |
|
297 |
+ typedef byte_type* byte_buffer_type; |
|
298 |
+ typedef typename Tr::char_type char_type; |
|
299 |
+ typedef typename Tr::int_type int_type; |
|
300 |
+ typedef typename Tr::off_type off_type; |
|
301 |
+ typedef typename Tr::pos_type pos_type; |
|
302 |
+ |
|
303 |
+ typedef std::vector<char_type, char_allocator_type> TBuffer; |
|
304 |
+ typedef ConcurrentQueue<int, Suspendable<Limit> > TJobQueue; |
|
305 |
+ |
|
306 |
+ static const size_t MAX_PUTBACK = 4; |
|
307 |
+ |
|
308 |
+ struct Serializer |
|
309 |
+ { |
|
310 |
+ istream_reference istream; |
|
311 |
+ std::mutex lock; |
|
312 |
+ IOError *error; |
|
313 |
+ off_type fileOfs; |
|
314 |
+ |
|
315 |
+ Serializer(istream_reference istream) : |
|
316 |
+ istream(istream), |
|
317 |
+ error(NULL), |
|
318 |
+ fileOfs(0u) |
|
319 |
+ {} |
|
320 |
+ |
|
321 |
+ ~Serializer() |
|
322 |
+ { |
|
323 |
+ delete error; |
|
324 |
+ } |
|
325 |
+ }; |
|
326 |
+ |
|
327 |
+ Serializer serializer; |
|
328 |
+ |
|
329 |
+ struct DecompressionJob |
|
330 |
+ { |
|
331 |
+ typedef std::vector<byte_type, byte_allocator_type> TInputBuffer; |
|
332 |
+ |
|
333 |
+ TInputBuffer inputBuffer; |
|
334 |
+ TBuffer buffer; |
|
335 |
+ off_type fileOfs; |
|
336 |
+ int size; |
|
337 |
+ unsigned compressedSize; |
|
338 |
+ |
|
339 |
+ std::mutex cs; |
|
340 |
+ std::condition_variable readyEvent; |
|
341 |
+ bool ready; |
|
342 |
+ bool bgzfEofMarker; |
|
343 |
+ |
|
344 |
+ DecompressionJob() : |
|
345 |
+ inputBuffer(BGZF_MAX_BLOCK_SIZE, 0), |
|
346 |
+ buffer(MAX_PUTBACK + BGZF_MAX_BLOCK_SIZE / sizeof(char_type), 0), |
|
347 |
+ fileOfs(), |
|
348 |
+ size(0), |
|
349 |
+ cs(), |
|
350 |
+ readyEvent(), |
|
351 |
+ ready(true), |
|
352 |
+ bgzfEofMarker(false) |
|
353 |
+ {} |
|
354 |
+ |
|
355 |
+ // TODO(rrahn): Do we need a copy constructor for the decompression job. |
|
356 |
+ DecompressionJob(DecompressionJob const &other) : |
|
357 |
+ inputBuffer(other.inputBuffer), |
|
358 |
+ buffer(other.buffer), |
|
359 |
+ fileOfs(other.fileOfs), |
|
360 |
+ size(other.size), |
|
361 |
+ cs(), |
|
362 |
+ readyEvent(), |
|
363 |
+ ready(other.ready), |
|
364 |
+ bgzfEofMarker(other.bgzfEofMarker) |
|
365 |
+ {} |
|
366 |
+ }; |
|
367 |
+ |
|
368 |
+ // string of recycable jobs |
|
369 |
+ size_t numThreads; |
|
370 |
+ size_t numJobs; |
|
371 |
+ String<DecompressionJob> jobs; |
|
372 |
+ TJobQueue runningQueue; |
|
373 |
+ TJobQueue todoQueue; |
|
374 |
+ int currentJobId; |
|
375 |
+ |
|
376 |
+ struct DecompressionThread |
|
377 |
+ { |
|
378 |
+ basic_unbgzf_streambuf *streamBuf; |
|
379 |
+ CompressionContext<BgzfFile> compressionCtx; |
|
380 |
+ |
|
381 |
+ void operator()() |
|
382 |
+ { |
|
383 |
+ ScopedReadLock<TJobQueue> readLock(streamBuf->todoQueue); |
|
384 |
+ ScopedWriteLock<TJobQueue> writeLock(streamBuf->runningQueue); |
|
385 |
+ |
|
386 |
+ // wait for a new job to become available |
|
387 |
+ while (true) |
|
388 |
+ { |
|
389 |
+ int jobId = -1; |
|
390 |
+ if (!popFront(jobId, streamBuf->todoQueue)) |
|
391 |
+ return; |
|
392 |
+ |
|
393 |
+ DecompressionJob &job = streamBuf->jobs[jobId]; |
|
394 |
+ size_t tailLen = 0; |
|
395 |
+ |
|
396 |
+ // typically the idle queue contains only ready jobs |
|
397 |
+ // however, if seek() fast forwards running jobs into the todoQueue |
|
398 |
+ // the caller defers the task of waiting to the decompression threads |
|
399 |
+ if (!job.ready) |
|
400 |
+ { |
|
401 |
+ std::unique_lock<std::mutex> lock(job.cs); |
|
402 |
+ job.readyEvent.wait(lock, [&job]{return job.ready;}); |
|
403 |
+ SEQAN_ASSERT_EQ(job.ready, true); |
|
404 |
+ } |
|
405 |
+ |
|
406 |
+ { |
|
407 |
+ std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock); |
|
408 |
+ |
|
409 |
+ job.bgzfEofMarker = false; |
|
410 |
+ if (streamBuf->serializer.error != NULL) |
|
411 |
+ return; |
|
412 |
+ |
|
413 |
+ // remember start offset (for tellg later) |
|
414 |
+ job.fileOfs = streamBuf->serializer.fileOfs; |
|
415 |
+ job.size = -1; |
|
416 |
+ job.compressedSize = 0; |
|
417 |
+ |
|
418 |
+ // only load if not at EOF |
|
419 |
+ if (job.fileOfs != -1) |
|
420 |
+ { |
|
421 |
+ // read header |
|
422 |
+ streamBuf->serializer.istream.read( |
|
423 |
+ (char*)&job.inputBuffer[0], |
|
424 |
+ BGZF_BLOCK_HEADER_LENGTH); |
|
425 |
+ |
|
426 |
+ if (!streamBuf->serializer.istream.good()) |
|
427 |
+ { |
|
428 |
+ streamBuf->serializer.fileOfs = -1; |
|
429 |
+ if (streamBuf->serializer.istream.eof()) |
|
430 |
+ goto eofSkip; |
|
431 |
+ streamBuf->serializer.error = new IOError("Stream read error."); |
|
432 |
+ return; |
|
433 |
+ } |
|
434 |
+ |
|
435 |
+ // check header |
|
436 |
+ if (!_bgzfCheckHeader(&job.inputBuffer[0])) |
|
437 |
+ { |
|
438 |
+ streamBuf->serializer.fileOfs = -1; |
|
439 |
+ streamBuf->serializer.error = new IOError("Invalid BGZF block header."); |
|
440 |
+ return; |
|
441 |
+ } |
|
442 |
+ |
|
443 |
+ // extract length of compressed data |
|
444 |
+ tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) + 1u - BGZF_BLOCK_HEADER_LENGTH; |
|
445 |
+ |
|
446 |
+ // read compressed data and tail |
|
447 |
+ streamBuf->serializer.istream.read( |
|
448 |
+ (char*)&job.inputBuffer[0] + BGZF_BLOCK_HEADER_LENGTH, |
|
449 |
+ tailLen); |
|
450 |
+ |
|
451 |
+ // Check if end-of-file marker is set |
|
452 |
+ if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]), |
|
453 |
+ reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]), |
|
454 |
+ 28) == 0) |
|
455 |
+ { |
|
456 |
+ job.bgzfEofMarker = true; |
|
457 |
+ } |
|
458 |
+ |
|
459 |
+ if (!streamBuf->serializer.istream.good()) |
|
460 |
+ { |
|
461 |
+ streamBuf->serializer.fileOfs = -1; |
|
462 |
+ if (streamBuf->serializer.istream.eof()) |
|
463 |
+ goto eofSkip; |
|
464 |
+ streamBuf->serializer.error = new IOError("Stream read error."); |
|
465 |
+ return; |
|
466 |
+ } |
|
467 |
+ |
|
468 |
+ job.compressedSize = BGZF_BLOCK_HEADER_LENGTH + tailLen; |
|
469 |
+ streamBuf->serializer.fileOfs += job.compressedSize; |
|
470 |
+ job.ready = false; |
|
471 |
+ |
|
472 |
+ eofSkip: |
|
473 |
+ streamBuf->serializer.istream.clear( |
|
474 |
+ streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit); |
|
475 |
+ } |
|
476 |
+ |
|
477 |
+ if (!appendValue(streamBuf->runningQueue, jobId)) |
|
478 |
+ { |
|
479 |
+ // signal that job is ready |
|
480 |
+ { |
|
481 |
+ std::unique_lock<std::mutex> lock(job.cs); |
|
482 |
+ job.ready = true; |
|
483 |
+ } |
|
484 |
+ job.readyEvent.notify_all(); |
|
485 |
+ return; // Terminate this thread. |
|
486 |
+ } |
|
487 |
+ } |
|
488 |
+ |
|
489 |
+ if (!job.ready) |
|
490 |
+ { |
|
491 |
+ // decompress block |
|
492 |
+ job.size = _decompressBlock( |
|
493 |
+ &job.buffer[0] + MAX_PUTBACK, capacity(job.buffer), |
|
494 |
+ &job.inputBuffer[0], job.compressedSize, compressionCtx); |
|
495 |
+ |
|
496 |
+ // signal that job is ready |
|
497 |
+ { |
|
498 |
+ std::unique_lock<std::mutex> lock(job.cs); |
|
499 |
+ job.ready = true; |
|
500 |
+ } |
|
501 |
+ job.readyEvent.notify_all(); |
|
502 |
+ } |
|
503 |
+ } |
|
504 |
+ } |
|
505 |
+ }; |
|
506 |
+ |
|
507 |
+ // array of worker threads |
|
508 |
+ using TFuture = decltype(std::async(DecompressionThread{nullptr, CompressionContext<BgzfFile>{}})); |
|
509 |
+ std::vector<TFuture> threads; |
|
510 |
+ TBuffer putbackBuffer; |
|
511 |
+ |
|
512 |
+ basic_unbgzf_streambuf(istream_reference istream_, |
|
513 |
+ size_t numThreads = SEQAN_BGZF_NUM_THREADS, |
|
514 |
+ size_t jobsPerThread = 8) : |
|
515 |
+ serializer(istream_), |
|
516 |
+ numThreads(numThreads), |
|
517 |
+ numJobs(numThreads * jobsPerThread), |
|
518 |
+ runningQueue(numJobs), |
|
519 |
+ todoQueue(numJobs), |
|
520 |
+ putbackBuffer(MAX_PUTBACK) |
|
521 |
+ { |
|
522 |
+ resize(jobs, numJobs, Exact()); |
|
523 |
+ currentJobId = -1; |
|
524 |
+ |
|
525 |
+ lockReading(runningQueue); |
|
526 |
+ lockWriting(todoQueue); |
|
527 |
+ setReaderWriterCount(runningQueue, 1, numThreads); |
|
528 |
+ setReaderWriterCount(todoQueue, numThreads, 1); |
|
529 |
+ |
|
530 |
+ for (unsigned i = 0; i < numJobs; ++i) |
|
531 |
+ { |
|
532 |
+ bool success = appendValue(todoQueue, i); |
|
533 |
+ ignoreUnusedVariableWarning(success); |
|
534 |
+ SEQAN_ASSERT(success); |
|
535 |
+ } |
|
536 |
+ |
|
537 |
+ for (unsigned i = 0; i < numThreads; ++i) |
|
538 |
+ { |
|
539 |
+ threads.push_back(std::async(std::launch::async, DecompressionThread{this, CompressionContext<BgzfFile>{}})); |
|
540 |
+ } |
|
541 |
+ } |
|
542 |
+ |
|
543 |
+ ~basic_unbgzf_streambuf() |
|
544 |
+ { |
|
545 |
+ unlockWriting(todoQueue); |
|
546 |
+ unlockReading(runningQueue); |
|
547 |
+ } |
|
548 |
+ |
|
549 |
+ int_type underflow() |
|
550 |
+ { |
|
551 |
+ // no need to use the next buffer? |
|
552 |
+ if (this->gptr() && this->gptr() < this->egptr()) |
|
553 |
+ return Tr::to_int_type(*this->gptr()); |
|
554 |
+ |
|
555 |
+ size_t putback = this->gptr() - this->eback(); |
|
556 |
+ if (putback > MAX_PUTBACK) |
|
557 |
+ putback = MAX_PUTBACK; |
|
558 |
+ |
|
559 |
+ // save at most MAX_PUTBACK characters from previous page to putback buffer |
|
560 |
+ if (putback != 0) |
|
561 |
+ std::copy( |
|
562 |
+ this->gptr() - putback, |
|
563 |
+ this->gptr(), |
|
564 |
+ &putbackBuffer[0]); |
|
565 |
+ |
|
566 |
+ if (currentJobId >= 0) |
|
567 |
+ appendValue(todoQueue, currentJobId); |
|
568 |
+ |
|
569 |
+ while (true) |
|
570 |
+ { |
|
571 |
+ if (!popFront(currentJobId, runningQueue)) |
|
572 |
+ { |
|
573 |
+ currentJobId = -1; |
|
574 |
+ SEQAN_ASSERT(serializer.error != NULL); |
|
575 |
+ if (serializer.error != NULL) |
|
576 |
+ throw *serializer.error; |
|
577 |
+ return EOF; |
|
578 |
+ } |
|
579 |
+ |
|
580 |
+ DecompressionJob &job = jobs[currentJobId]; |
|
581 |
+ |
|
582 |
+ // restore putback buffer |
|
583 |
+ this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); |
|
584 |
+ if (putback != 0) |
|
585 |
+ std::copy( |
|
586 |
+ &putbackBuffer[0], |
|
587 |
+ &putbackBuffer[0] + putback, |
|
588 |
+ &job.buffer[0] + (MAX_PUTBACK - putback)); |
|
589 |
+ |
|
590 |
+ // wait for the end of decompression |
|
591 |
+ { |
|
592 |
+ std::unique_lock<std::mutex> lock(job.cs); |
|
593 |
+ job.readyEvent.wait(lock, [&job]{return job.ready;}); |
|
594 |
+ } |
|
595 |
+ |
|
596 |
+ size_t size = (job.size != -1)? job.size : 0; |
|
597 |
+ |
|
598 |
+ // reset buffer pointers |
|
599 |
+ this->setg( |
|
600 |
+ &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area |
|
601 |
+ &job.buffer[0] + MAX_PUTBACK, // read position |
|
602 |
+ &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer |
|
603 |
+ |
|
604 |
+ // The end of the bgzf file is reached, either if there was an error, or if the |
|
605 |
+ // end-of-file marker was reached, while the uncompressed block had zero size. |
|
606 |
+ if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker)) |
|
607 |
+ return EOF; |
|
608 |
+ else if (job.size > 0) |
|
609 |
+ return Tr::to_int_type(*this->gptr()); // return next character |
|
610 |
+ |
|
611 |
+ throw IOError("BGZF: Invalid end condition in decompression. " |
|
612 |
+ "Most likely due to an empty bgzf block without end-of-file marker."); |
|
613 |
+ } |
|
614 |
+ } |
|
615 |
+ |
|
616 |
+ pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode) |
|
617 |
+ { |
|
618 |
+ if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in) |
|
619 |
+ { |
|
620 |
+ if (dir == std::ios_base::cur && ofs >= 0) |
|
621 |
+ { |
|
622 |
+ // forward delta seek |
|
623 |
+ while (currentJobId < 0 || this->egptr() - this->gptr() < ofs) |
|
624 |
+ { |
|
625 |
+ ofs -= this->egptr() - this->gptr(); |
|
626 |
+ if (this->underflow() == EOF) |
|
627 |
+ break; |
|
628 |
+ } |
|
629 |
+ |
|
630 |
+ if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr()) |
|
631 |
+ { |
|
632 |
+ DecompressionJob &job = jobs[currentJobId]; |
|
633 |
+ |
|
634 |
+ // reset buffer pointers |
|
635 |
+ this->setg( |
|
636 |
+ this->eback(), // beginning of putback area |
|
637 |
+ this->gptr() + ofs, // read position |
|
638 |
+ this->egptr()); // end of buffer |
|
639 |
+ |
|
640 |
+ if (this->gptr() != this->egptr()) |
|
641 |
+ return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK]))); |
|
642 |
+ else |
|
643 |
+ return pos_type((job.fileOfs + job.compressedSize) << 16); |
|
644 |
+ } |
|
645 |
+ |
|
646 |
+ } |
|
647 |
+ else if (dir == std::ios_base::beg) |
|
648 |
+ { |
|
649 |
+ // random seek |
|
650 |
+ std::streampos destFileOfs = ofs >> 16; |
|
651 |
+ |
|
652 |
+ // are we in the same block? |
|
653 |
+ if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs) |
|
654 |
+ { |
|
655 |
+ DecompressionJob &job = jobs[currentJobId]; |
|
656 |
+ |
|
657 |
+ // reset buffer pointers |
|
658 |
+ this->setg( |
|
659 |
+ this->eback(), // beginning of putback area |
|
660 |
+ &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position |
|
661 |
+ this->egptr()); // end of buffer |
|
662 |
+ return ofs; |
|
663 |
+ } |
|
664 |
+ |
|
665 |
+ // ok, different block |
|
666 |
+ { |
|
667 |
+ std::lock_guard<std::mutex> scopedLock(serializer.lock); |
|
668 |
+ |
|
669 |
+ // remove all running jobs and put them in the idle queue unless we |
|
670 |
+ // find our seek target |
|
671 |
+ |
|
672 |
+ if (currentJobId >= 0) |
|
673 |
+ appendValue(todoQueue, currentJobId); |
|
674 |
+ |
|
675 |
+ // Note that if we are here the current job does not represent the sought block. |
|
676 |
+ // Hence if the running queue is empty we need to explicitly unset the jobId, |
|
677 |
+ // otherwise we would not update the serializers istream pointer to the correct position. |
|
678 |
+ if (empty(runningQueue)) |
|
679 |
+ currentJobId = -1; |
|
680 |
+ |
|
681 |
+ // empty is thread-safe in serializer.lock |
|
682 |
+ while (!empty(runningQueue)) |
|
683 |
+ { |
|
684 |
+ popFront(currentJobId, runningQueue); |
|
685 |
+ |
|
686 |
+ if (jobs[currentJobId].fileOfs == (off_type)destFileOfs) |
|
687 |
+ break; |
|
688 |
+ |
|
689 |
+ // push back useless job |
|
690 |
+ appendValue(todoQueue, currentJobId); |
|
691 |
+ currentJobId = -1; |
|
692 |
+ } |
|
693 |
+ |
|
694 |
+ if (currentJobId == -1) |
|
695 |
+ { |
|
696 |
+ SEQAN_ASSERT(empty(runningQueue)); |
|
697 |
+ serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit); |
|
698 |
+ if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs) |
|
699 |
+ serializer.fileOfs = destFileOfs; |
|
700 |
+ else |
|
701 |
+ currentJobId = -2; // temporarily signals a seek error |
|
702 |
+ } |
|
703 |
+ } |
|
704 |
+ |
|
705 |
+ // if our block wasn't in the running queue yet, it should now |
|
706 |
+ // be the first that falls out after modifying serializer.fileOfs |
|
707 |
+ if (currentJobId == -1) |
|
708 |
+ popFront(currentJobId, runningQueue); |
|
709 |
+ else if (currentJobId == -2) |
|
710 |
+ currentJobId = -1; |
|
711 |
+ |
|
712 |
+ if (currentJobId >= 0) |
|
713 |
+ { |
|
714 |
+ // wait for the end of decompression |
|
715 |
+ DecompressionJob &job = jobs[currentJobId]; |
|
716 |
+ |
|
717 |
+ { |
|
718 |
+ std::unique_lock<std::mutex> lock(job.cs); |
|
719 |
+ job.readyEvent.wait(lock, [&job]{return job.ready;}); |
|
720 |
+ } |
|
721 |
+ |
|
722 |
+ SEQAN_ASSERT_EQ(job.fileOfs, (off_type)destFileOfs); |
|
723 |
+ |
|
724 |
+ // reset buffer pointers |
|
725 |
+ this->setg( |
|
726 |
+ &job.buffer[0] + MAX_PUTBACK, // no putback area |
|
727 |
+ &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position |
|
728 |
+ &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer |
|
729 |
+ return ofs; |
|
730 |
+ } |
|
731 |
+ } |
|
732 |
+ } |
|
733 |
+ return pos_type(off_type(-1)); |
|
734 |
+ } |
|
735 |
+ |
|
736 |
+ pos_type seekpos(pos_type pos, std::ios_base::openmode openMode) |
|
737 |
+ { |
|
738 |
+ return seekoff(off_type(pos), std::ios_base::beg, openMode); |
|
739 |
+ } |
|
740 |
+ |
|
741 |
+ // returns the compressed input istream |
|
742 |
+ istream_reference get_istream() { return serializer.istream; }; |
|
743 |
+}; |
|
744 |
+ |
|
745 |
+// -------------------------------------------------------------------------- |
|
746 |
+// Class basic_bgzf_ostreambase |
|
747 |
+// -------------------------------------------------------------------------- |
|
748 |
+ |
|
749 |
+template< |
|
750 |
+ typename Elem, |
|
751 |
+ typename Tr = std::char_traits<Elem>, |
|
752 |
+ typename ElemA = std::allocator<Elem>, |
|
753 |
+ typename ByteT = char, |
|
754 |
+ typename ByteAT = std::allocator<ByteT> |
|
755 |
+> |
|
756 |
+class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr> |
|
757 |
+{ |
|
758 |
+public: |
|
759 |
+ typedef std::basic_ostream<Elem, Tr>& ostream_reference; |
|
760 |
+ typedef basic_bgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type; |
|
761 |
+ |
|
762 |
+ basic_bgzf_ostreambase(ostream_reference ostream_) |
|
763 |
+ : m_buf(ostream_) |
|
764 |
+ { |
|
765 |
+ this->init(&m_buf ); |
|
766 |
+ }; |
|
767 |
+ |
|
768 |
+ // returns the underlying zip ostream object |
|
769 |
+ bgzf_streambuf_type* rdbuf() { return &m_buf; }; |
|
770 |
+ // returns the bgzf error state |
|
771 |
+ int get_zerr() const { return m_buf.get_err(); }; |
|
772 |
+ // returns the uncompressed data crc |
|
773 |
+ long get_crc() const { return m_buf.get_crc(); }; |
|
774 |
+ // returns the compressed data size |
|
775 |
+ long get_out_size() const { return m_buf.get_out_size(); }; |
|
776 |
+ // returns the uncompressed data size |
|
777 |
+ long get_in_size() const { return m_buf.get_in_size(); }; |
|
778 |
+ |
|
779 |
+private: |
|
780 |
+ bgzf_streambuf_type m_buf; |
|
781 |
+}; |
|
782 |
+ |
|
783 |
+// -------------------------------------------------------------------------- |
|
784 |
+// Class basic_bgzf_istreambase |
|
785 |
+// -------------------------------------------------------------------------- |
|
786 |
+ |
|
787 |
+template< |
|
788 |
+ typename Elem, |
|
789 |
+ typename Tr = std::char_traits<Elem>, |
|
790 |
+ typename ElemA = std::allocator<Elem>, |
|
791 |
+ typename ByteT = char, |
|
792 |
+ typename ByteAT = std::allocator<ByteT> |
|
793 |
+> |
|
794 |
+class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr> |
|
795 |
+{ |
|
796 |
+public: |
|
797 |
+ typedef std::basic_istream<Elem, Tr>& istream_reference; |
|
798 |
+ typedef basic_unbgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT> unbgzf_streambuf_type; |
|
799 |
+ |
|
800 |
+ basic_bgzf_istreambase(istream_reference ostream_) |
|
801 |
+ : m_buf(ostream_) |
|
802 |
+ { |
|
803 |
+ this->init(&m_buf ); |
|
804 |
+ }; |
|
805 |
+ |
|
806 |
+ // returns the underlying unzip istream object |
|
807 |
+ unbgzf_streambuf_type* rdbuf() { return &m_buf; }; |
|
808 |
+ |
|
809 |
+ // returns the bgzf error state |
|
810 |
+ int get_zerr() const { return m_buf.get_zerr(); }; |
|
811 |
+ // returns the uncompressed data crc |
|
812 |
+ long get_crc() const { return m_buf.get_crc(); }; |
|
813 |
+ // returns the uncompressed data size |
|
814 |
+ long get_out_size() const { return m_buf.get_out_size(); }; |
|
815 |
+ // returns the compressed data size |
|
816 |
+ long get_in_size() const { return m_buf.get_in_size(); }; |
|
817 |
+ |
|
818 |
+private: |
|
819 |
+ unbgzf_streambuf_type m_buf; |
|
820 |
+}; |
|
821 |
+ |
|
822 |
+// -------------------------------------------------------------------------- |
|
823 |
+// Class basic_bgzf_ostream |
|
824 |
+// -------------------------------------------------------------------------- |
|
825 |
+ |
|
826 |
+template< |
|
827 |
+ typename Elem, |
|
828 |
+ typename Tr = std::char_traits<Elem>, |
|
829 |
+ typename ElemA = std::allocator<Elem>, |
|
830 |
+ typename ByteT = char, |
|
831 |
+ typename ByteAT = std::allocator<ByteT> |
|
832 |
+> |
|
833 |
+class basic_bgzf_ostream : |
|
834 |
+ public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>, |
|
835 |
+ public std::basic_ostream<Elem,Tr> |
|
836 |
+{ |
|
837 |
+public: |
|
838 |
+ typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type; |
|
839 |
+ typedef std::basic_ostream<Elem,Tr> ostream_type; |
|
840 |
+ typedef ostream_type& ostream_reference; |
|
841 |
+ |
|
842 |
+ basic_bgzf_ostream(ostream_reference ostream_) : |
|
843 |
+ bgzf_ostreambase_type(ostream_), |
|
844 |
+ ostream_type(bgzf_ostreambase_type::rdbuf()) |
|
845 |
+ {} |
|
846 |
+ |
|
847 |
+ // flush inner buffer and zipper buffer |
|
848 |
+ basic_bgzf_ostream<Elem,Tr>& zflush() |
|
849 |
+ { |
|
850 |
+ this->flush(); this->rdbuf()->flush(); return *this; |
|
851 |
+ }; |
|
852 |
+ |
|
853 |
+ ~basic_bgzf_ostream() |
|
854 |
+ { |
|
855 |
+ this->rdbuf()->addFooter(); |
|
856 |
+ } |
|
857 |
+ |
|
858 |
+private: |
|
859 |
+ static void put_long(ostream_reference out_, unsigned long x_); |
|
860 |
+#ifdef _WIN32 |
|
861 |
+ void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250 |
|
862 |
+ void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250 |
|
863 |
+#endif |
|
864 |
+}; |
|
865 |
+ |
|
866 |
+// -------------------------------------------------------------------------- |
|
867 |
+// Class basic_bgzf_istream |
|
868 |
+// -------------------------------------------------------------------------- |
|
869 |
+ |
|
870 |
+template< |
|
871 |
+ typename Elem, |
|
872 |
+ typename Tr = std::char_traits<Elem>, |
|
873 |
+ typename ElemA = std::allocator<Elem>, |
|
874 |
+ typename ByteT = char, |
|
875 |
+ typename ByteAT = std::allocator<ByteT> |
|
876 |
+> |
|
877 |
+class basic_bgzf_istream : |
|
878 |
+ public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>, |
|
879 |
+ public std::basic_istream<Elem,Tr> |
|
880 |
+{ |
|
881 |
+public: |
|
882 |
+ typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type; |
|
883 |
+ typedef std::basic_istream<Elem,Tr> istream_type; |
|
884 |
+ typedef istream_type & istream_reference; |
|
885 |
+ typedef char byte_type; |
|
886 |
+ |
|
887 |
+ basic_bgzf_istream(istream_reference istream_) : |
|
888 |
+ bgzf_istreambase_type(istream_), |
|
889 |
+ istream_type(bgzf_istreambase_type::rdbuf()), |
|
890 |
+ m_is_gzip(false), |
|
891 |
+ m_gbgzf_data_size(0) |
|
892 |
+ {}; |
|
893 |
+ |
|
894 |
+ // returns true if it is a gzip file |
|
895 |
+ bool is_gzip() const { return m_is_gzip; }; |
|
896 |
+ // return data size check |
|
897 |
+ bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size; }; |
|
898 |
+ |
|
899 |
+ // return the data size in the file |
|
900 |
+ long get_gbgzf_data_size() const { return m_gbgzf_data_size; }; |
|
901 |
+ |
|
902 |
+protected: |
|
903 |
+ static void read_long(istream_reference in_, unsigned long& x_); |
|
904 |
+ |
|
905 |
+ int check_header(); |
|
906 |
+ bool m_is_gzip; |
|
907 |
+ unsigned long m_gbgzf_data_size; |
|
908 |
+ |
|
909 |
+#ifdef _WIN32 |
|
910 |
+private: |
|
911 |
+ void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250 |
|
912 |
+ void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250 |
|
913 |
+#endif |
|
914 |
+}; |
|
915 |
+ |
|
916 |
+// =========================================================================== |
|
917 |
+// Typedefs |
|
918 |
+// =========================================================================== |
|
919 |
+ |
|
920 |
+// A typedef for basic_bgzf_ostream<char> |
|
921 |
+typedef basic_bgzf_ostream<char> bgzf_ostream; |
|
922 |
+// A typedef for basic_bgzf_ostream<wchar_t> |
|
923 |
+typedef basic_bgzf_ostream<wchar_t> bgzf_wostream; |
|
924 |
+// A typedef for basic_bgzf_istream<char> |
|
925 |
+typedef basic_bgzf_istream<char> bgzf_istream; |
|
926 |
+// A typedef for basic_bgzf_istream<wchart> |
|
927 |
+typedef basic_bgzf_istream<wchar_t> bgzf_wistream; |
|
928 |
+ |
|
929 |
+} // namespace seqan |
|
930 |
+ |
|
931 |
+#endif // INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ |