/*** * Copyright (C) Microsoft. All rights reserved. * Licensed under the MIT license. See LICENSE.txt file in the project root for full license information. * * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * This file defines a basic memory-based stream buffer, which allows consumer / producer pairs to communicate * data via a buffer. * * For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #pragma once #ifndef CASA_PRODUCER_CONSUMER_STREAMS_H #define CASA_PRODUCER_CONSUMER_STREAMS_H #include "cpprest/astreambuf.h" #include "pplx/pplxtasks.h" #include #include #include #include namespace Concurrency { namespace streams { namespace details { /// /// The basic_producer_consumer_buffer class serves as a memory-based steam buffer that supports both writing and /// reading sequences of characters. It can be used as a consumer/producer buffer. /// template class basic_producer_consumer_buffer : public streams::details::streambuf_state_manager<_CharType> { public: typedef typename ::concurrency::streams::char_traits<_CharType> traits; typedef typename basic_streambuf<_CharType>::int_type int_type; typedef typename basic_streambuf<_CharType>::pos_type pos_type; typedef typename basic_streambuf<_CharType>::off_type off_type; /// /// Constructor /// basic_producer_consumer_buffer(size_t alloc_size) : streambuf_state_manager<_CharType>(std::ios_base::out | std::ios_base::in) , m_alloc_size(alloc_size) , m_allocBlock(nullptr) , m_total(0) , m_total_read(0) , m_total_written(0) , m_synced(0) { } /// /// Destructor /// virtual ~basic_producer_consumer_buffer() { // Note: there is no need to call 'wait()' on the result of close(), // since we happen to know that close() will return without actually // doing anything asynchronously. Should the implementation of _close_write() // change in that regard, this logic may also have to change. this->_close_read(); this->_close_write(); _ASSERTE(m_requests.empty()); m_blocks.clear(); } /// /// can_seek is used to determine whether a stream buffer supports seeking. /// virtual bool can_seek() const { return false; } /// /// has_size is used to determine whether a stream buffer supports size(). /// virtual bool has_size() const { return false; } /// /// Get the stream buffer size, if one has been set. /// /// The direction of buffering (in or out) /// An implementation that does not support buffering will always return '0'. virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const { return 0; } /// /// Sets the stream buffer implementation to buffer or not buffer. /// /// The size to use for internal buffering, 0 if no buffering should be done. /// The direction of buffering (in or out) /// An implementation that does not support buffering will silently ignore calls to this function and it /// will not have any effect on what is returned by subsequent calls to . virtual void set_buffer_size(size_t, std::ios_base::openmode = std::ios_base::in) { return; } /// /// For any input stream, in_avail returns the number of characters that are immediately available /// to be consumed without blocking. May be used in conjunction with to read data without /// incurring the overhead of using tasks. /// virtual size_t in_avail() const { return m_total; } /// /// Gets the current read or write position in the stream. /// /// The I/O direction to seek (see remarks) /// The current position. EOF if the operation fails. /// Some streams may have separate write and read cursors. /// For such streams, the direction parameter defines whether to move the read or the write /// cursor. virtual pos_type getpos(std::ios_base::openmode mode) const { if (((mode & std::ios_base::in) && !this->can_read()) || ((mode & std::ios_base::out) && !this->can_write())) return static_cast(traits::eof()); if (mode == std::ios_base::in) return (pos_type)m_total_read; else if (mode == std::ios_base::out) return (pos_type)m_total_written; else return (pos_type)traits::eof(); } // Seeking is not supported virtual pos_type seekpos(pos_type, std::ios_base::openmode) { return (pos_type)traits::eof(); } virtual pos_type seekoff(off_type, std::ios_base::seekdir, std::ios_base::openmode) { return (pos_type)traits::eof(); } /// /// Allocates a contiguous memory block and returns it. /// /// The number of characters to allocate. /// A pointer to a block to write to, null if the stream buffer implementation does not support /// alloc/commit. virtual _CharType* _alloc(size_t count) { if (!this->can_write()) { return nullptr; } // We always allocate a new block even if the count could be satisfied by // the current write block. While this does lead to wasted space it allows for // easier book keeping _ASSERTE(!m_allocBlock); m_allocBlock = std::make_shared<_block>(count); return m_allocBlock->wbegin(); } /// /// Submits a block already allocated by the stream buffer. /// /// The number of characters to be committed. virtual void _commit(size_t count) { pplx::extensibility::scoped_critical_section_t l(m_lock); // The count does not reflect the actual size of the block. // Since we do not allow any more writes to this block it would suffice. // If we ever change the algorithm to reuse blocks then this needs to be revisited. _ASSERTE((bool)m_allocBlock); m_allocBlock->update_write_head(count); m_blocks.push_back(m_allocBlock); m_allocBlock = nullptr; update_write_head(count); } /// /// Gets a pointer to the next already allocated contiguous block of data. /// /// A reference to a pointer variable that will hold the address of the block on success. /// The number of contiguous characters available at the address in 'ptr'. /// true if the operation succeeded, false otherwise. /// /// A return of false does not necessarily indicate that a subsequent read operation would fail, only that /// there is no block to return immediately or that the stream buffer does not support the operation. /// The stream buffer may not de-allocate the block until is called. /// If the end of the stream is reached, the function will return true, a null pointer, and a count of zero; /// a subsequent read will not succeed. /// virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count) { count = 0; ptr = nullptr; if (!this->can_read()) return false; pplx::extensibility::scoped_critical_section_t l(m_lock); if (m_blocks.empty()) { // If the write head has been closed then have reached the end of the // stream (return true), otherwise more data could be written later (return false). return !this->can_write(); } else { auto block = m_blocks.front(); count = block->rd_chars_left(); ptr = block->rbegin(); _ASSERTE(ptr != nullptr); return true; } } /// /// Releases a block of data acquired using . This frees the stream buffer to /// de-allocate the memory, if it so desires. Move the read position ahead by the count. /// /// A pointer to the block of data to be released. /// The number of characters that were read. virtual void release(_Out_writes_opt_(count) _CharType* ptr, _In_ size_t count) { if (ptr == nullptr) return; pplx::extensibility::scoped_critical_section_t l(m_lock); auto block = m_blocks.front(); _ASSERTE(block->rd_chars_left() >= count); block->m_read += count; update_read_head(count); } protected: virtual pplx::task _sync() { pplx::extensibility::scoped_critical_section_t l(m_lock); m_synced = in_avail(); fulfill_outstanding(); return pplx::task_from_result(true); } virtual pplx::task _putc(_CharType ch) { return pplx::task_from_result((this->write(&ch, 1) == 1) ? static_cast(ch) : traits::eof()); } virtual pplx::task _putn(const _CharType* ptr, size_t count) { return pplx::task_from_result(this->write(ptr, count)); } virtual pplx::task _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { pplx::task_completion_event tce; enqueue_request(_request(count, [this, ptr, count, tce]() { // VS 2010 resolves read to a global function. Explicit // invocation through the "this" pointer fixes the issue. tce.set(this->read(ptr, count)); })); return pplx::create_task(tce); } virtual size_t _sgetn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { pplx::extensibility::scoped_critical_section_t l(m_lock); return can_satisfy(count) ? this->read(ptr, count) : (size_t)traits::requires_async(); } virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { pplx::extensibility::scoped_critical_section_t l(m_lock); return can_satisfy(count) ? this->read(ptr, count, false) : (size_t)traits::requires_async(); } virtual pplx::task _bumpc() { pplx::task_completion_event tce; enqueue_request(_request(1, [this, tce]() { tce.set(this->read_byte(true)); })); return pplx::create_task(tce); } virtual int_type _sbumpc() { pplx::extensibility::scoped_critical_section_t l(m_lock); return can_satisfy(1) ? this->read_byte(true) : traits::requires_async(); } virtual pplx::task _getc() { pplx::task_completion_event tce; enqueue_request(_request(1, [this, tce]() { tce.set(this->read_byte(false)); })); return pplx::create_task(tce); } int_type _sgetc() { pplx::extensibility::scoped_critical_section_t l(m_lock); return can_satisfy(1) ? this->read_byte(false) : traits::requires_async(); } virtual pplx::task _nextc() { pplx::task_completion_event tce; enqueue_request(_request(1, [this, tce]() { this->read_byte(true); tce.set(this->read_byte(false)); })); return pplx::create_task(tce); } virtual pplx::task _ungetc() { return pplx::task_from_result(traits::eof()); } private: /// /// Close the stream buffer for writing /// pplx::task _close_write() { // First indicate that there could be no more writes. // Fulfill outstanding relies on that to flush all the // read requests. this->m_stream_can_write = false; { pplx::extensibility::scoped_critical_section_t l(this->m_lock); // This runs on the thread that called close. this->fulfill_outstanding(); } return pplx::task_from_result(); } /// /// Updates the write head by an offset specified by count /// /// This should be called with the lock held void update_write_head(size_t count) { m_total += count; m_total_written += count; fulfill_outstanding(); } /// /// Writes count characters from ptr into the stream buffer /// size_t write(const _CharType* ptr, size_t count) { if (!this->can_write() || (count == 0)) return 0; // If no one is going to read, why bother? // Just pretend to be writing! if (!this->can_read()) return count; pplx::extensibility::scoped_critical_section_t l(m_lock); // Allocate a new block if necessary if (m_blocks.empty() || m_blocks.back()->wr_chars_left() < count) { msl::safeint3::SafeInt alloc = m_alloc_size.Max(count); m_blocks.push_back(std::make_shared<_block>(alloc)); } // The block at the back is always the write head auto last = m_blocks.back(); auto countWritten = last->write(ptr, count); _ASSERTE(countWritten == count); update_write_head(countWritten); return countWritten; } /// /// Fulfill pending requests /// /// This should be called with the lock held void fulfill_outstanding() { while (!m_requests.empty()) { auto req = m_requests.front(); // If we cannot satisfy the request then we need // to wait for the producer to write data if (!can_satisfy(req.size())) return; // We have enough data to satisfy this request req.complete(); // Remove it from the request queue m_requests.pop(); } } /// /// Represents a memory block /// class _block { public: _block(size_t size) : m_read(0), m_pos(0), m_size(size), m_data(new _CharType[size]) {} ~_block() { delete[] m_data; } // Read head size_t m_read; // Write head size_t m_pos; // Allocation size (of m_data) size_t m_size; // The data store _CharType* m_data; // Pointer to the read head _CharType* rbegin() { return m_data + m_read; } // Pointer to the write head _CharType* wbegin() { return m_data + m_pos; } // Read up to count characters from the block size_t read(_Out_writes_(count) _CharType* dest, _In_ size_t count, bool advance = true) { msl::safeint3::SafeInt avail(rd_chars_left()); auto countRead = static_cast(avail.Min(count)); _CharType* beg = rbegin(); _CharType* end = rbegin() + countRead; #if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0 // Avoid warning C4996: Use checked iterators under SECURE_SCL std::copy(beg, end, stdext::checked_array_iterator<_CharType*>(dest, count)); #else std::copy(beg, end, dest); #endif // _WIN32 if (advance) { m_read += countRead; } return countRead; } // Write count characters into the block size_t write(const _CharType* src, size_t count) { msl::safeint3::SafeInt avail(wr_chars_left()); auto countWritten = static_cast(avail.Min(count)); const _CharType* srcEnd = src + countWritten; #if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0 // Avoid warning C4996: Use checked iterators under SECURE_SCL std::copy(src, srcEnd, stdext::checked_array_iterator<_CharType*>(wbegin(), static_cast(avail))); #else std::copy(src, srcEnd, wbegin()); #endif // _WIN32 update_write_head(countWritten); return countWritten; } void update_write_head(size_t count) { m_pos += count; } size_t rd_chars_left() const { return m_pos - m_read; } size_t wr_chars_left() const { return m_size - m_pos; } private: // Copy is not supported _block(const _block&); _block& operator=(const _block&); }; /// /// Represents a request on the stream buffer - typically reads /// class _request { public: typedef std::function func_type; _request(size_t count, const func_type& func) : m_func(func), m_count(count) {} void complete() { m_func(); } size_t size() const { return m_count; } private: func_type m_func; size_t m_count; }; void enqueue_request(_request req) { pplx::extensibility::scoped_critical_section_t l(m_lock); if (can_satisfy(req.size())) { // We can immediately fulfill the request. req.complete(); } else { // We must wait for data to arrive. m_requests.push(req); } } /// /// Determine if the request can be satisfied. /// bool can_satisfy(size_t count) { return (m_synced > 0) || (this->in_avail() >= count) || !this->can_write(); } /// /// Reads a byte from the stream and returns it as int_type. /// Note: This routine shall only be called if can_satisfy() returned true. /// /// This should be called with the lock held int_type read_byte(bool advance = true) { _CharType value; auto read_size = this->read(&value, 1, advance); return read_size == 1 ? static_cast(value) : traits::eof(); } /// /// Reads up to count characters into ptr and returns the count of characters copied. /// The return value (actual characters copied) could be <= count. /// Note: This routine shall only be called if can_satisfy() returned true. /// /// This should be called with the lock held size_t read(_Out_writes_(count) _CharType* ptr, _In_ size_t count, bool advance = true) { _ASSERTE(can_satisfy(count)); size_t read = 0; for (auto iter = begin(m_blocks); iter != std::end(m_blocks); ++iter) { auto block = *iter; auto read_from_block = block->read(ptr + read, count - read, advance); read += read_from_block; _ASSERTE(count >= read); if (read == count) break; } if (advance) { update_read_head(read); } return read; } /// /// Updates the read head by the specified offset /// /// This should be called with the lock held void update_read_head(size_t count) { m_total -= count; m_total_read += count; if (m_synced > 0) m_synced = (m_synced > count) ? (m_synced - count) : 0; // The block at the front is always the read head. // Purge empty blocks so that the block at the front reflects the read head while (!m_blocks.empty()) { // If front block is not empty - we are done if (m_blocks.front()->rd_chars_left() > 0) break; // The block has no more data to be read. Relase the block m_blocks.pop_front(); } } // The in/out mode for the buffer std::ios_base::openmode m_mode; // Default block size msl::safeint3::SafeInt m_alloc_size; // Block used for alloc/commit std::shared_ptr<_block> m_allocBlock; // Total available data size_t m_total; size_t m_total_read; size_t m_total_written; // Keeps track of the number of chars that have been flushed but still // remain to be consumed by a read operation. size_t m_synced; // The producer-consumer buffer is intended to be used concurrently by a reader // and a writer, who are not coordinating their accesses to the buffer (coordination // being what the buffer is for in the first place). Thus, we have to protect // against some of the internal data elements against concurrent accesses // and the possibility of inconsistent states. A simple non-recursive lock // should be sufficient for those purposes. pplx::extensibility::critical_section_t m_lock; // Memory blocks std::deque> m_blocks; // Queue of requests std::queue<_request> m_requests; }; } // namespace details /// /// The producer_consumer_buffer class serves as a memory-based steam buffer that supports both writing and reading /// sequences of bytes. It can be used as a consumer/producer buffer. /// /// /// The data type of the basic element of the producer_consumer_buffer. /// /// /// This is a reference-counted version of basic_producer_consumer_buffer. template class producer_consumer_buffer : public streambuf<_CharType> { public: typedef _CharType char_type; /// /// Create a producer_consumer_buffer. /// /// The internal default block size. producer_consumer_buffer(size_t alloc_size = 512) : streambuf<_CharType>(std::make_shared>(alloc_size)) { } }; } // namespace streams } // namespace Concurrency #endif