// // $Id: BinaryIndexStream.cpp 2051 2010-06-15 18:39:13Z chambm $ // // // Original author: Matt Chambers <matt.chambers .@. vanderbilt.edu> // // Copyright 2009 Vanderbilt University - Nashville, TN 37232 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #define PWIZ_SOURCE #include "BinaryIndexStream.hpp" #include "pwiz/utility/misc/Std.hpp" #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> using boost::shared_ptr; namespace pwiz { namespace data { namespace { template<typename _Ty> struct stream_vector_reader { istream& operator() (istream& is, _Ty& value) const {return is >> value;} }; template<typename _Ty, typename reader_type = stream_vector_reader<_Ty> > class stream_vector_const_iterator { public: typedef stream_vector_const_iterator<_Ty, reader_type> _Myt; typedef std::random_access_iterator_tag iterator_category; typedef _Ty value_type; typedef std::ptrdiff_t difference_type; typedef value_type* pointer; typedef value_type& reference; typedef const value_type* const_pointer; typedef const value_type& const_reference; typedef boost::iostreams::stream_offset stream_offset; stream_vector_const_iterator() : begin_(0), end_(0), next_(0) { } stream_vector_const_iterator(boost::shared_ptr<std::istream> streamPtr, size_t value_size, reader_type reader, stream_offset begin = 0, stream_offset end = std::numeric_limits<stream_offset>::max()) : streamPtr_(streamPtr), value_size_(value_size), begin_(begin), end_(end), next_(begin_), cur_(end), reader_(reader) { } reference operator*() const { if (cur_ != next_) { cur_ = next_; streamPtr_->seekg(next_); reader_(*streamPtr_, curItem_); } return curItem_; } pointer operator->() const { return &curItem_; } _Myt& operator++() { next_ += value_size_; return (*this); } _Myt operator++(int) { _Myt _Tmp = *this; ++*this; return (_Tmp); } _Myt& operator--() { next_ -= value_size_; return (*this); } _Myt operator--(int) { _Myt _Tmp = *this; --*this; return (_Tmp); } _Myt& operator+=(difference_type _Off) { next_ += _Off * value_size_; return (*this); } _Myt operator+(difference_type _Off) const { _Myt _Tmp = *this; return (_Tmp += _Off); } _Myt& operator-=(difference_type _Off) { return (*this += -_Off); } _Myt operator-(difference_type _Off) const { _Myt _Tmp = *this; return (_Tmp -= _Off); } difference_type operator-(const _Myt& that) const { bool gotThis = this->streamPtr_.get() != NULL; bool gotThat = that.streamPtr_.get() != NULL; if (gotThis && gotThat) return (next_ - that.next_) / value_size_; else if (!gotThis && !gotThat) // end() - end() return 0; else if (gotThis) return -(end_ - begin_) / value_size_; else // gotThat return (that.end_ - that.begin_) / that.value_size_; } reference operator[](difference_type _Off) { next_ = _Off * value_size_; return (**this); } bool operator==(const _Myt& that) const { bool gotThis = this->streamPtr_.get() != NULL; bool gotThat = that.streamPtr_.get() != NULL; if (gotThis && gotThat) return next_ == that.next_; else if (!gotThis && !gotThat) // end() == end() return true; else if (gotThis) return next_ >= end_; else // gotThat return that.next_ >= that.end_; } bool operator!=(const _Myt& _Right) const { return (!(*this == _Right)); } bool operator<(const _Myt& _Right) const { return (next_ < _Right.next_); } bool operator>(const _Myt& _Right) const { return (_Right < *this); } bool operator<=(const _Myt& _Right) const { return (!(_Right < *this)); } bool operator>=(const _Myt& _Right) const { return (!(*this < _Right)); } private: boost::shared_ptr<std::istream> streamPtr_; size_t value_size_; boost::iostreams::stream_offset begin_, end_, next_; mutable boost::iostreams::stream_offset cur_; mutable _Ty curItem_; reader_type reader_; }; } // namespace class BinaryIndexStream::Impl { shared_ptr<iostream> isPtr_; stream_offset streamLength_; boost::uint64_t maxIdLength_; size_t size_; size_t entrySize_; // space-padded so entries are constant-length struct EntryIdLessThan { bool operator() (const Entry& lhs, const Entry& rhs) const {return lhs.id < rhs.id;} }; struct EntryIndexLessThan { bool operator() (const Entry& lhs, const Entry& rhs) const {return lhs.index < rhs.index;} }; // the index structure: // // stream length (if overwriting an existing index in the same stream, the existing index may be longer // than the new index, so the length marks the correct end point) // // size_t indicating maximum id length + 1 (all ids are padded with spaces to this length) // // a vector of index entries sorted by index (O(1) access) // // a vector of index entries sorted by id (O(logN) access) struct EntryReader { EntryReader(boost::uint64_t maxIdLength = 0) : maxIdLength_(maxIdLength) {} istream& operator() (istream& is, Entry& entry) const { is >> entry.id; is.seekg(maxIdLength_ - entry.id.length(), std::ios::cur); is.read(reinterpret_cast<char*>(&entry.index), sizeof(entry.index)); is.read(reinterpret_cast<char*>(&entry.offset), sizeof(entry.offset)); return is; } private: boost::uint64_t maxIdLength_; }; typedef stream_vector_const_iterator<Entry, EntryReader> IndexStreamIterator; EntryReader entryReader_; mutable boost::mutex io_mutex; public: Impl(shared_ptr<iostream> isPtr) : isPtr_(isPtr) { if (!isPtr_.get()) throw runtime_error("[BinaryIndexStream::ctor] Stream is null"); isPtr_->clear(); isPtr_->seekg(0); isPtr_->read(reinterpret_cast<char*>(&streamLength_), sizeof(streamLength_)); isPtr_->read(reinterpret_cast<char*>(&maxIdLength_), sizeof(maxIdLength_)); if (*isPtr_) { // calculate entry size (depends on max. id length) entrySize_ = maxIdLength_ + sizeof(streamLength_) + sizeof(maxIdLength_); size_t headerSize = sizeof(streamLength_) + sizeof(maxIdLength_); // calculate number of entries (depends on entry size and stream length) size_ = (streamLength_ - headerSize) / (entrySize_ * 2); entryReader_ = EntryReader(maxIdLength_); } else size_ = maxIdLength_ = streamLength_ = 0; } virtual void create(std::vector<Entry>& entries) { isPtr_->clear(); isPtr_->seekp(0); isPtr_->clear(); size_ = entries.size(); // determine max. id length maxIdLength_ = 0; BOOST_FOREACH(const Entry& entry, entries) maxIdLength_ = std::max(maxIdLength_, (boost::uint64_t) entry.id.length()); ++maxIdLength_; // space-terminated // entry size depends on max. id length entrySize_ = maxIdLength_ + sizeof(streamLength_) + sizeof(maxIdLength_); size_t headerSize = sizeof(streamLength_) + sizeof(maxIdLength_); // write stream length (depends on entry size and number of entries) streamLength_ = headerSize + entrySize_ * size_ * 2; isPtr_->write(reinterpret_cast<const char*>(&streamLength_), sizeof(streamLength_)); // write max. id length isPtr_->write(reinterpret_cast<const char*>(&maxIdLength_), sizeof(maxIdLength_)); // padding buffer string padding(maxIdLength_, ' '); // sort entries by index sort(entries.begin(), entries.end(), EntryIndexLessThan()); // write entries sorted by index BOOST_FOREACH(const Entry& entry, entries) { isPtr_->write(entry.id.c_str(), entry.id.length()); isPtr_->write(padding.c_str(), maxIdLength_ - entry.id.length()); isPtr_->write(reinterpret_cast<const char*>(&entry.index), sizeof(entry.index)); isPtr_->write(reinterpret_cast<const char*>(&entry.offset), sizeof(entry.offset)); } // sort entries by id sort(entries.begin(), entries.end(), EntryIdLessThan()); // write entries sorted by id BOOST_FOREACH(const Entry& entry, entries) { isPtr_->write(entry.id.c_str(), entry.id.length()); isPtr_->write(padding.c_str(), maxIdLength_ - entry.id.length()); isPtr_->write(reinterpret_cast<const char*>(&entry.index), sizeof(entry.index)); isPtr_->write(reinterpret_cast<const char*>(&entry.offset), sizeof(entry.offset)); } // flush stream isPtr_->sync(); entryReader_ = EntryReader(maxIdLength_); } virtual size_t size() const { return size_; } virtual EntryPtr find(const std::string& id) const { if (size_ == 0) return EntryPtr(); EntryPtr entryPtr(new Entry); entryPtr->id = id; const stream_offset indexBegin = sizeof(streamLength_) + sizeof(maxIdLength_) + entrySize_ * size_; const stream_offset indexEnd = streamLength_; { boost::mutex::scoped_lock io_lock(io_mutex); isPtr_->clear(); // binary search is done directly on the index stream IndexStreamIterator itr(isPtr_, entrySize_, entryReader_, indexBegin, indexEnd); itr = lower_bound(itr, IndexStreamIterator(), *entryPtr, EntryIdLessThan()); // return null if search went to indexEnd if (itr == IndexStreamIterator()) return EntryPtr(); // update entry from iterator *entryPtr = *itr; } // return null if the returned iterator isn't exactly equal to the queried one if (entryPtr->id != id) return EntryPtr(); return entryPtr; } virtual EntryPtr find(size_t index) const { if (index >= size_) return EntryPtr(); EntryPtr entryPtr(new Entry); const stream_offset indexBegin = sizeof(streamLength_) + sizeof(maxIdLength_); const stream_offset entryOffset = indexBegin + index * entrySize_; { boost::mutex::scoped_lock io_lock(io_mutex); isPtr_->clear(); isPtr_->seekg(entryOffset); entryReader_(*isPtr_, *entryPtr); } return entryPtr; } }; PWIZ_API_DECL BinaryIndexStream::BinaryIndexStream(shared_ptr<iostream> isPtr) : impl_(new Impl(isPtr)) {} PWIZ_API_DECL void BinaryIndexStream::create(vector<Entry>& entries) {impl_->create(entries);} PWIZ_API_DECL size_t BinaryIndexStream::size() const {return impl_->size();} PWIZ_API_DECL Index::EntryPtr BinaryIndexStream::find(const string& id) const {return impl_->find(id);} PWIZ_API_DECL Index::EntryPtr BinaryIndexStream::find(size_t index) const {return impl_->find(index);} } // namespace data } // namespace pwiz