/*** * Copyright (C) Microsoft. All rights reserved. * Licensed under the MIT license. See LICENSE.txt file in the project root for full license information. * * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * Asynchronous I/O: stream buffer. This is an extension to the PPL concurrency features and therefore * lives in the Concurrency namespace. * * For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #pragma once #include "cpprest/asyncrt_utils.h" #include "cpprest/details/basic_types.h" #include "pplx/pplxtasks.h" #include #include #include #include #include #if (defined(_MSC_VER) && (_MSC_VER >= 1800)) && !CPPREST_FORCE_PPLX namespace Concurrency // since namespace pplx = Concurrency #else namespace pplx #endif { namespace details { template pplx::task _do_while(F func) { pplx::task first = func(); return first.then([=](bool guard) -> pplx::task { if (guard) return pplx::details::_do_while(func); else return first; }); } } // namespace details } namespace Concurrency { /// Library for asynchronous streams. namespace streams { /// /// Extending the standard char_traits type with one that adds values and types /// that are unique to "C++ REST SDK" streams. /// /// /// The data type of the basic element of the stream. /// template struct char_traits : std::char_traits<_CharType> { /// /// Some synchronous functions will return this value if the operation /// requires an asynchronous call in a given situation. /// /// An int_type value which implies that an asynchronous call is required. static typename std::char_traits<_CharType>::int_type requires_async() { return std::char_traits<_CharType>::eof() - 1; } }; #if !defined(_WIN32) template<> struct char_traits : private std::char_traits { public: typedef unsigned char char_type; using std::char_traits::eof; using std::char_traits::int_type; using std::char_traits::off_type; using std::char_traits::pos_type; static size_t length(const unsigned char* str) { return std::char_traits::length(reinterpret_cast(str)); } static void assign(unsigned char& left, const unsigned char& right) { left = right; } static unsigned char* assign(unsigned char* left, size_t n, unsigned char value) { return reinterpret_cast( std::char_traits::assign(reinterpret_cast(left), n, static_cast(value))); } static unsigned char* copy(unsigned char* left, const unsigned char* right, size_t n) { return reinterpret_cast( std::char_traits::copy(reinterpret_cast(left), reinterpret_cast(right), n)); } static unsigned char* move(unsigned char* left, const unsigned char* right, size_t n) { return reinterpret_cast( std::char_traits::move(reinterpret_cast(left), reinterpret_cast(right), n)); } static int_type requires_async() { return eof() - 1; } }; #endif namespace details { /// /// Stream buffer base class. /// template class basic_streambuf { public: typedef _CharType char_type; typedef ::concurrency::streams::char_traits<_CharType> traits; typedef typename traits::int_type int_type; typedef typename traits::pos_type pos_type; typedef typename traits::off_type off_type; /// /// Virtual constructor for stream buffers. /// virtual ~basic_streambuf() {} /// /// can_read is used to determine whether a stream buffer will support read operations (get). /// virtual bool can_read() const = 0; /// /// can_write is used to determine whether a stream buffer will support write operations (put). /// virtual bool can_write() const = 0; /// /// can_seek is used to determine whether a stream buffer supports seeking. /// virtual bool can_seek() const = 0; /// /// has_size is used to determine whether a stream buffer supports size(). /// virtual bool has_size() const = 0; /// /// is_eof is used to determine whether a read head has reached the end of the buffer. /// virtual bool is_eof() const = 0; /// /// Gets the stream buffer size, if one has been set. /// /// The direction of buffering (in or out) /// The size of the internal buffer (for the given direction). /// An implementation that does not support buffering will always return 0. virtual size_t buffer_size(std::ios_base::openmode direction = std::ios_base::in) const = 0; /// /// Sets the stream buffer implementation to buffer or not buffer. /// /// The size to use for internal buffering, 0 if no buffering should be done. /// The direction of buffering (in or out) /// An implementation that does not support buffering will silently ignore calls to this function and it /// will not have any effect on what is returned by subsequent calls to . virtual void set_buffer_size(size_t size, std::ios_base::openmode direction = std::ios_base::in) = 0; /// /// For any input stream, in_avail returns the number of characters that are immediately available /// to be consumed without blocking. May be used in conjunction with to read data without /// incurring the overhead of using tasks. /// virtual size_t in_avail() const = 0; /// /// Checks if the stream buffer is open. /// /// No separation is made between open for reading and open for writing. virtual bool is_open() const = 0; /// /// Closes the stream buffer, preventing further read or write operations. /// /// The I/O mode (in or out) to close for. virtual pplx::task close(std::ios_base::openmode mode = (std::ios_base::in | std::ios_base::out)) = 0; /// /// Closes the stream buffer with an exception. /// /// The I/O mode (in or out) to close for. /// Pointer to the exception. virtual pplx::task close(std::ios_base::openmode mode, std::exception_ptr eptr) = 0; /// /// Writes a single character to the stream. /// /// The character to write /// A task that holds the value of the character. This value is EOF if the write operation /// fails. virtual pplx::task putc(_CharType ch) = 0; /// /// Writes a number of characters to the stream. /// /// A pointer to the block of data to be written. /// The number of characters to write. /// A task that holds the number of characters actually written, either 'count' or 0. virtual pplx::task putn(const _CharType* ptr, size_t count) = 0; /// /// Writes a number of characters to the stream. Note: callers must make sure the data to be written is valid until /// the returned task completes. /// /// A pointer to the block of data to be written. /// The number of characters to write. /// A task that holds the number of characters actually written, either 'count' or 0. virtual pplx::task putn_nocopy(const _CharType* ptr, size_t count) = 0; /// /// Reads a single character from the stream and advances the read position. /// /// A task that holds the value of the character. This value is EOF if the read fails. virtual pplx::task bumpc() = 0; /// /// Reads a single character from the stream and advances the read position. /// /// The value of the character. -1 if the read fails. -2 if an asynchronous read is /// required This is a synchronous operation, but is guaranteed to never block. virtual int_type sbumpc() = 0; /// /// Reads a single character from the stream without advancing the read position. /// /// A task that holds the value of the byte. This value is EOF if the read fails. virtual pplx::task getc() = 0; /// /// Reads a single character from the stream without advancing the read position. /// /// The value of the character. EOF if the read fails. if an /// asynchronous read is required This is a synchronous operation, but is guaranteed to never /// block. virtual int_type sgetc() = 0; /// /// Advances the read position, then returns the next character without advancing again. /// /// A task that holds the value of the character. This value is EOF if the read fails. virtual pplx::task nextc() = 0; /// /// Retreats the read position, then returns the current character without advancing. /// /// A task that holds the value of the character. This value is EOF if the read fails, /// requires_async if an asynchronous read is required virtual pplx::task ungetc() = 0; /// /// Reads up to a given number of characters from the stream. /// /// The address of the target memory area. /// The maximum number of characters to read. /// A task that holds the number of characters read. This value is O if the end of the stream is /// reached. virtual pplx::task getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0; /// /// Copies up to a given number of characters from the stream, synchronously. /// /// The address of the target memory area. /// The maximum number of characters to read. /// The number of characters copied. O if the end of the stream is reached or an asynchronous read is /// required. This is a synchronous operation, but is guaranteed to never block. virtual size_t scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0; /// /// Gets the current read or write position in the stream. /// /// The I/O direction to seek (see remarks) /// The current position. EOF if the operation fails. /// Some streams may have separate write and read cursors. /// For such streams, the direction parameter defines whether to move the read or the write /// cursor. virtual pos_type getpos(std::ios_base::openmode direction) const = 0; /// /// Gets the size of the stream, if known. Calls to has_size will determine whether /// the result of size can be relied on. /// virtual utility::size64_t size() const = 0; /// /// Seeks to the given position. /// /// The offset from the beginning of the stream. /// The I/O direction to seek (see remarks). /// The position. EOF if the operation fails. /// Some streams may have separate write and read cursors. For such streams, the direction parameter /// defines whether to move the read or the write cursor. virtual pos_type seekpos(pos_type pos, std::ios_base::openmode direction) = 0; /// /// Seeks to a position given by a relative offset. /// /// The relative position to seek to /// The starting point (beginning, end, current) for the seek. /// The I/O direction to seek (see remarks) /// The position. EOF if the operation fails. /// Some streams may have separate write and read cursors. /// For such streams, the mode parameter defines whether to move the read or the write cursor. virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode) = 0; /// /// For output streams, flush any internally buffered data to the underlying medium. /// /// A task that returns true if the sync succeeds, false if not. virtual pplx::task sync() = 0; // // Efficient read and write. // // The following routines are intended to be used for more efficient, copy-free, reading and // writing of data from/to the stream. Rather than having the caller provide a buffer into which // data is written or from which it is read, the stream buffer provides a pointer directly to the // internal data blocks that it is using. Since not all stream buffers use internal data structures // to copy data, the functions may not be supported by all. An application that wishes to use this // functionality should therefore first try them and check for failure to support. If there is // such failure, the application should fall back on the copying interfaces (putn / getn) // /// /// Allocates a contiguous memory block and returns it. /// /// The number of characters to allocate. /// A pointer to a block to write to, null if the stream buffer implementation does not support /// alloc/commit. virtual _CharType* alloc(_In_ size_t count) = 0; /// /// Submits a block already allocated by the stream buffer. /// /// The number of characters to be committed. virtual void commit(_In_ size_t count) = 0; /// /// Gets a pointer to the next already allocated contiguous block of data. /// /// A reference to a pointer variable that will hold the address of the block on success. /// The number of contiguous characters available at the address in 'ptr'. /// true if the operation succeeded, false otherwise. /// /// A return of false does not necessarily indicate that a subsequent read operation would fail, only that /// there is no block to return immediately or that the stream buffer does not support the operation. /// The stream buffer may not de-allocate the block until is called. /// If the end of the stream is reached, the function will return true, a null pointer, and a count of zero; /// a subsequent read will not succeed. /// virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count) = 0; /// /// Releases a block of data acquired using . This frees the stream buffer to /// de-allocate the memory, if it so desires. Move the read position ahead by the count. /// /// A pointer to the block of data to be released. /// The number of characters that were read. virtual void release(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0; /// /// Retrieves the stream buffer exception_ptr if it has been set. /// /// Pointer to the exception, if it has been set; otherwise, nullptr will be returned virtual std::exception_ptr exception() const = 0; }; template class streambuf_state_manager : public basic_streambuf<_CharType>, public std::enable_shared_from_this> { public: typedef typename details::basic_streambuf<_CharType>::traits traits; typedef typename details::basic_streambuf<_CharType>::int_type int_type; typedef typename details::basic_streambuf<_CharType>::pos_type pos_type; typedef typename details::basic_streambuf<_CharType>::off_type off_type; /// /// can_read is used to determine whether a stream buffer will support read operations (get). /// virtual bool can_read() const { return m_stream_can_read; } /// /// can_write is used to determine whether a stream buffer will support write operations (put). /// virtual bool can_write() const { return m_stream_can_write; } /// /// Checks if the stream buffer is open. /// /// No separation is made between open for reading and open for writing. virtual bool is_open() const { return can_read() || can_write(); } /// /// Closes the stream buffer, preventing further read or write operations. /// /// The I/O mode (in or out) to close for. virtual pplx::task close(std::ios_base::openmode mode = std::ios_base::in | std::ios_base::out) { pplx::task closeOp = pplx::task_from_result(); if (mode & std::ios_base::in && can_read()) { closeOp = _close_read(); } // After the flush_internal task completed, "this" object may have been destroyed, // accessing the members is invalid, use shared_from_this to avoid access violation exception. auto this_ptr = std::static_pointer_cast(this->shared_from_this()); if (mode & std::ios_base::out && can_write()) { if (closeOp.is_done()) closeOp = closeOp && _close_write().then([this_ptr] {}); // passing down exceptions from closeOp else closeOp = closeOp.then([this_ptr] { return this_ptr->_close_write().then([this_ptr] {}); }); } return closeOp; } /// /// Closes the stream buffer with an exception. /// /// The I/O mode (in or out) to close for. /// Pointer to the exception. virtual pplx::task close(std::ios_base::openmode mode, std::exception_ptr eptr) { if (m_currentException == nullptr) m_currentException = eptr; return close(mode); } /// /// is_eof is used to determine whether a read head has reached the end of the buffer. /// virtual bool is_eof() const { return m_stream_read_eof; } /// /// Writes a single character to the stream. /// /// The character to write /// The value of the character. EOF if the write operation fails virtual pplx::task putc(_CharType ch) { if (!can_write()) return create_exception_checked_value_task(traits::eof()); return create_exception_checked_task(_putc(ch), [](int_type) { return false; // no EOF for write }); } /// /// Writes a number of characters to the stream. /// /// A pointer to the block of data to be written. /// The number of characters to write. /// The number of characters actually written, either 'count' or 0. CASABLANCA_DEPRECATED("This API in some cases performs a copy. It is deprecated and will be removed in a future " "release. Use putn_nocopy instead.") virtual pplx::task putn(const _CharType* ptr, size_t count) { if (!can_write()) return create_exception_checked_value_task(0); if (count == 0) return pplx::task_from_result(0); return create_exception_checked_task(_putn(ptr, count, true), [](size_t) { return false; // no EOF for write }); } /// /// Writes a number of characters to the stream. Note: callers must make sure the data to be written is valid until /// the returned task completes. /// /// A pointer to the block of data to be written. /// The number of characters to write. /// A task that holds the number of characters actually written, either 'count' or 0. virtual pplx::task putn_nocopy(const _CharType* ptr, size_t count) { if (!can_write()) return create_exception_checked_value_task(0); if (count == 0) return pplx::task_from_result(0); return create_exception_checked_task(_putn(ptr, count), [](size_t) { return false; // no EOF for write }); } /// /// Reads a single character from the stream and advances the read position. /// /// The value of the character. EOF if the read fails. virtual pplx::task bumpc() { if (!can_read()) return create_exception_checked_value_task(streambuf_state_manager<_CharType>::traits::eof()); return create_exception_checked_task( _bumpc(), [](int_type val) { return val == streambuf_state_manager<_CharType>::traits::eof(); }); } /// /// Reads a single character from the stream and advances the read position. /// /// The value of the character. -1 if the read fails. -2 if an asynchronous read is /// required This is a synchronous operation, but is guaranteed to never block. virtual int_type sbumpc() { if (!(m_currentException == nullptr)) std::rethrow_exception(m_currentException); if (!can_read()) return traits::eof(); return check_sync_read_eof(_sbumpc()); } /// /// Reads a single character from the stream without advancing the read position. /// /// The value of the byte. EOF if the read fails. virtual pplx::task getc() { if (!can_read()) return create_exception_checked_value_task(traits::eof()); return create_exception_checked_task( _getc(), [](int_type val) { return val == streambuf_state_manager<_CharType>::traits::eof(); }); } /// /// Reads a single character from the stream without advancing the read position. /// /// The value of the character. EOF if the read fails. if an /// asynchronous read is required This is a synchronous operation, but is guaranteed to never /// block. virtual int_type sgetc() { if (!(m_currentException == nullptr)) std::rethrow_exception(m_currentException); if (!can_read()) return traits::eof(); return check_sync_read_eof(_sgetc()); } /// /// Advances the read position, then returns the next character without advancing again. /// /// The value of the character. EOF if the read fails. virtual pplx::task nextc() { if (!can_read()) return create_exception_checked_value_task(traits::eof()); return create_exception_checked_task( _nextc(), [](int_type val) { return val == streambuf_state_manager<_CharType>::traits::eof(); }); } /// /// Retreats the read position, then returns the current character without advancing. /// /// The value of the character. EOF if the read fails. if an /// asynchronous read is required virtual pplx::task ungetc() { if (!can_read()) return create_exception_checked_value_task(traits::eof()); return create_exception_checked_task(_ungetc(), [](int_type) { return false; }); } /// /// Reads up to a given number of characters from the stream. /// /// The address of the target memory area. /// The maximum number of characters to read. /// The number of characters read. O if the end of the stream is reached. virtual pplx::task getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { if (!can_read()) return create_exception_checked_value_task(0); if (count == 0) return pplx::task_from_result(0); return create_exception_checked_task(_getn(ptr, count), [](size_t val) { return val == 0; }); } /// /// Copies up to a given number of characters from the stream, synchronously. /// /// The address of the target memory area. /// The maximum number of characters to read. /// The number of characters copied. O if the end of the stream is reached or an asynchronous read is /// required. This is a synchronous operation, but is guaranteed to never block. virtual size_t scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { if (!(m_currentException == nullptr)) std::rethrow_exception(m_currentException); if (!can_read()) return 0; return _scopy(ptr, count); } /// /// For output streams, flush any internally buffered data to the underlying medium. /// /// true if the flush succeeds, false if not virtual pplx::task sync() { if (!can_write()) { if (m_currentException == nullptr) return pplx::task_from_result(); else return pplx::task_from_exception(m_currentException); } return create_exception_checked_task(_sync(), [](bool) { return false; }).then([](bool) {}); } /// /// Retrieves the stream buffer exception_ptr if it has been set. /// /// Pointer to the exception, if it has been set; otherwise, nullptr will be returned. virtual std::exception_ptr exception() const { return m_currentException; } /// /// Allocates a contiguous memory block and returns it. /// /// The number of characters to allocate. /// A pointer to a block to write to, null if the stream buffer implementation does not support /// alloc/commit. This is intended as an advanced API to be used only when it is important to /// avoid extra copies. _CharType* alloc(size_t count) { if (m_alloced) throw std::logic_error( "The buffer is already allocated, this maybe caused by overlap of stream read or write"); _CharType* alloc_result = _alloc(count); if (alloc_result) m_alloced = true; return alloc_result; } /// /// Submits a block already allocated by the stream buffer. /// /// The number of characters to be committed. /// This is intended as an advanced API to be used only when it is important to avoid extra /// copies. void commit(size_t count) { if (!m_alloced) throw std::logic_error("The buffer needs to allocate first"); _commit(count); m_alloced = false; } public: virtual bool can_seek() const = 0; virtual bool has_size() const = 0; virtual utility::size64_t size() const { return 0; } virtual size_t buffer_size(std::ios_base::openmode direction = std::ios_base::in) const = 0; virtual void set_buffer_size(size_t size, std::ios_base::openmode direction = std::ios_base::in) = 0; virtual size_t in_avail() const = 0; virtual pos_type getpos(std::ios_base::openmode direction) const = 0; virtual pos_type seekpos(pos_type pos, std::ios_base::openmode direction) = 0; virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode) = 0; virtual bool acquire(_Out_writes_(count) _CharType*& ptr, _In_ size_t& count) = 0; virtual void release(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0; protected: virtual pplx::task _putc(_CharType ch) = 0; // This API is only needed for file streams and until we remove the deprecated stream buffer putn overload. virtual pplx::task _putn(const _CharType* ptr, size_t count, bool) { // Default to no copy, only the file streams API overloads and performs a copy. return _putn(ptr, count); } virtual pplx::task _putn(const _CharType* ptr, size_t count) = 0; virtual pplx::task _bumpc() = 0; virtual int_type _sbumpc() = 0; virtual pplx::task _getc() = 0; virtual int_type _sgetc() = 0; virtual pplx::task _nextc() = 0; virtual pplx::task _ungetc() = 0; virtual pplx::task _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0; virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0; virtual pplx::task _sync() = 0; virtual _CharType* _alloc(size_t count) = 0; virtual void _commit(size_t count) = 0; /// /// The real read head close operation, implementation should override it if there is any resource to be released. /// virtual pplx::task _close_read() { m_stream_can_read = false; return pplx::task_from_result(); } /// /// The real write head close operation, implementation should override it if there is any resource to be released. /// virtual pplx::task _close_write() { m_stream_can_write = false; return pplx::task_from_result(); } protected: streambuf_state_manager(std::ios_base::openmode mode) { m_stream_can_read = (mode & std::ios_base::in) != 0; m_stream_can_write = (mode & std::ios_base::out) != 0; m_stream_read_eof = false; m_alloced = false; } std::exception_ptr m_currentException; // The in/out mode for the buffer std::atomic m_stream_can_read; std::atomic m_stream_can_write; std::atomic m_stream_read_eof; std::atomic m_alloced; private: template pplx::task<_CharType1> create_exception_checked_value_task(const _CharType1& val) const { if (this->exception() == nullptr) return pplx::task_from_result<_CharType1>(static_cast<_CharType1>(val)); else return pplx::task_from_exception<_CharType1>(this->exception()); } // Set exception and eof states for async read template pplx::task<_CharType1> create_exception_checked_task(pplx::task<_CharType1> result, std::function eof_test, std::ios_base::openmode mode = std::ios_base::in | std::ios_base::out) { auto thisPointer = this->shared_from_this(); auto func1 = [=](pplx::task<_CharType1> t1) -> pplx::task<_CharType1> { try { thisPointer->m_stream_read_eof = eof_test(t1.get()); } catch (...) { thisPointer->close(mode, std::current_exception()).get(); return pplx::task_from_exception<_CharType1>(thisPointer->exception(), pplx::task_options()); } if (thisPointer->m_stream_read_eof && !(thisPointer->exception() == nullptr)) return pplx::task_from_exception<_CharType1>(thisPointer->exception(), pplx::task_options()); return t1; }; if (result.is_done()) { // If the data is already available, we should avoid scheduling a continuation, so we do it inline. return func1(result); } else { return result.then(func1); } } // Set eof states for sync read int_type check_sync_read_eof(int_type ch) { m_stream_read_eof = ch == traits::eof(); return ch; } }; } // namespace details // Forward declarations template class basic_istream; template class basic_ostream; /// /// Reference-counted stream buffer. /// /// /// The data type of the basic element of the streambuf. /// /// /// The data type of the basic element of the streambuf. /// template class streambuf : public details::basic_streambuf<_CharType> { public: typedef typename details::basic_streambuf<_CharType>::traits traits; typedef typename details::basic_streambuf<_CharType>::int_type int_type; typedef typename details::basic_streambuf<_CharType>::pos_type pos_type; typedef typename details::basic_streambuf<_CharType>::off_type off_type; typedef typename details::basic_streambuf<_CharType>::char_type char_type; template friend class streambuf; /// /// Constructor. /// /// A pointer to the concrete stream buffer implementation. streambuf(_In_ const std::shared_ptr>& ptr) : m_buffer(ptr) {} /// /// Default constructor. /// streambuf() {} /// /// Converter Constructor. /// /// /// The data type of the basic element of the source streambuf. /// /// The source buffer to be converted. template streambuf(const streambuf& other) : m_buffer(std::static_pointer_cast>( std::static_pointer_cast(other.m_buffer))) { static_assert(std::is_same::pos_type>::value && std::is_same::off_type>::value && std::is_integral<_CharType>::value && std::is_integral::value && std::is_integral::value && std::is_integral::int_type>::value && sizeof(_CharType) == sizeof(AlterCharType) && sizeof(int_type) == sizeof(typename details::basic_streambuf::int_type), "incompatible stream character types"); } /// /// Constructs an input stream head for this stream buffer. /// /// basic_istream. concurrency::streams::basic_istream<_CharType> create_istream() const { if (!can_read()) throw std::runtime_error("stream buffer not set up for input of data"); return concurrency::streams::basic_istream<_CharType>(*this); } /// /// Constructs an output stream for this stream buffer. /// /// basic_ostream concurrency::streams::basic_ostream<_CharType> create_ostream() const { if (!can_write()) throw std::runtime_error("stream buffer not set up for output of data"); return concurrency::streams::basic_ostream<_CharType>(*this); } /// /// Checks if the stream buffer has been initialized or not. /// operator bool() const { return (bool)m_buffer; } /// /// Destructor /// virtual ~streambuf() {} const std::shared_ptr>& get_base() const { if (!m_buffer) { throw std::invalid_argument("Invalid streambuf object"); } return m_buffer; } /// /// can_read is used to determine whether a stream buffer will support read operations (get). /// virtual bool can_read() const { return get_base()->can_read(); } /// /// can_write is used to determine whether a stream buffer will support write operations (put). /// virtual bool can_write() const { return get_base()->can_write(); } /// /// can_seek is used to determine whether a stream buffer supports seeking. /// /// True if seeking is supported, false otherwise. virtual bool can_seek() const { return get_base()->can_seek(); } /// /// has_size is used to determine whether a stream buffer supports size(). /// /// True if the size API is supported, false otherwise. virtual bool has_size() const { return get_base()->has_size(); } /// /// Gets the total number of characters in the stream buffer, if known. Calls to has_size will determine /// whether the result of size can be relied on. /// /// The total number of characters in the stream buffer. virtual utility::size64_t size() const { return get_base()->size(); } /// /// Gets the stream buffer size, if one has been set. /// /// The direction of buffering (in or out) /// The size of the internal buffer (for the given direction). /// An implementation that does not support buffering will always return 0. virtual size_t buffer_size(std::ios_base::openmode direction = std::ios_base::in) const { return get_base()->buffer_size(direction); } /// /// Sets the stream buffer implementation to buffer or not buffer. /// /// The size to use for internal buffering, 0 if no buffering should be done. /// The direction of buffering (in or out) /// An implementation that does not support buffering will silently ignore calls to this function and it /// will not have any effect on what is returned by subsequent calls to . virtual void set_buffer_size(size_t size, std::ios_base::openmode direction = std::ios_base::in) { get_base()->set_buffer_size(size, direction); } /// /// For any input stream, in_avail returns the number of characters that are immediately available /// to be consumed without blocking. May be used in conjunction with to read data without /// incurring the overhead of using tasks. /// /// Number of characters that are ready to read. virtual size_t in_avail() const { return get_base()->in_avail(); } /// /// Checks if the stream buffer is open. /// /// No separation is made between open for reading and open for writing. /// True if the stream buffer is open for reading or writing, false otherwise. virtual bool is_open() const { return get_base()->is_open(); } /// /// is_eof is used to determine whether a read head has reached the end of the buffer. /// /// True if at the end of the buffer, false otherwise. virtual bool is_eof() const { return get_base()->is_eof(); } /// /// Closes the stream buffer, preventing further read or write operations. /// /// The I/O mode (in or out) to close for. virtual pplx::task close(std::ios_base::openmode mode = (std::ios_base::in | std::ios_base::out)) { // We preserve the check here to workaround a Dev10 compiler crash auto buffer = get_base(); return buffer ? buffer->close(mode) : pplx::task_from_result(); } /// /// Closes the stream buffer with an exception. /// /// The I/O mode (in or out) to close for. /// Pointer to the exception. virtual pplx::task close(std::ios_base::openmode mode, std::exception_ptr eptr) { // We preserve the check here to workaround a Dev10 compiler crash auto buffer = get_base(); return buffer ? buffer->close(mode, eptr) : pplx::task_from_result(); } /// /// Writes a single character to the stream. /// /// The character to write /// The value of the character. EOF if the write operation fails virtual pplx::task putc(_CharType ch) { return get_base()->putc(ch); } /// /// Allocates a contiguous memory block and returns it. /// /// The number of characters to allocate. /// A pointer to a block to write to, null if the stream buffer implementation does not support /// alloc/commit. virtual _CharType* alloc(size_t count) { return get_base()->alloc(count); } /// /// Submits a block already allocated by the stream buffer. /// /// The number of characters to be committed. virtual void commit(size_t count) { get_base()->commit(count); } /// /// Gets a pointer to the next already allocated contiguous block of data. /// /// A reference to a pointer variable that will hold the address of the block on success. /// The number of contiguous characters available at the address in 'ptr'. /// true if the operation succeeded, false otherwise. /// /// A return of false does not necessarily indicate that a subsequent read operation would fail, only that /// there is no block to return immediately or that the stream buffer does not support the operation. /// The stream buffer may not de-allocate the block until is called. /// If the end of the stream is reached, the function will return true, a null pointer, and a count of zero; /// a subsequent read will not succeed. /// virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count) { ptr = nullptr; count = 0; return get_base()->acquire(ptr, count); } /// /// Releases a block of data acquired using . This frees the stream buffer to /// de-allocate the memory, if it so desires. Move the read position ahead by the count. /// /// A pointer to the block of data to be released. /// The number of characters that were read. virtual void release(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { get_base()->release(ptr, count); } /// /// Writes a number of characters to the stream. /// /// A pointer to the block of data to be written. /// The number of characters to write. /// The number of characters actually written, either 'count' or 0. CASABLANCA_DEPRECATED("This API in some cases performs a copy. It is deprecated and will be removed in a future " "release. Use putn_nocopy instead.") virtual pplx::task putn(const _CharType* ptr, size_t count) { return get_base()->putn(ptr, count); } /// /// Writes a number of characters to the stream. Note: callers must make sure the data to be written is valid until /// the returned task completes. /// /// A pointer to the block of data to be written. /// The number of characters to write. /// The number of characters actually written, either 'count' or 0. virtual pplx::task putn_nocopy(const _CharType* ptr, size_t count) { return get_base()->putn_nocopy(ptr, count); } /// /// Reads a single character from the stream and advances the read position. /// /// The value of the character. EOF if the read fails. virtual pplx::task bumpc() { return get_base()->bumpc(); } /// /// Reads a single character from the stream and advances the read position. /// /// The value of the character. -1 if the read fails. -2 if an asynchronous read is /// required This is a synchronous operation, but is guaranteed to never block. virtual typename details::basic_streambuf<_CharType>::int_type sbumpc() { return get_base()->sbumpc(); } /// /// Reads a single character from the stream without advancing the read position. /// /// The value of the byte. EOF if the read fails. virtual pplx::task getc() { return get_base()->getc(); } /// /// Reads a single character from the stream without advancing the read position. /// /// The value of the character. EOF if the read fails. if an /// asynchronous read is required This is a synchronous operation, but is guaranteed to never /// block. virtual typename details::basic_streambuf<_CharType>::int_type sgetc() { return get_base()->sgetc(); } /// /// Advances the read position, then returns the next character without advancing again. /// /// The value of the character. EOF if the read fails. pplx::task nextc() { return get_base()->nextc(); } /// /// Retreats the read position, then returns the current character without advancing. /// /// The value of the character. EOF if the read fails. if an /// asynchronous read is required pplx::task ungetc() { return get_base()->ungetc(); } /// /// Reads up to a given number of characters from the stream. /// /// The address of the target memory area. /// The maximum number of characters to read. /// The number of characters read. O if the end of the stream is reached. virtual pplx::task getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return get_base()->getn(ptr, count); } /// /// Copies up to a given number of characters from the stream, synchronously. /// /// The address of the target memory area. /// The maximum number of characters to read. /// The number of characters copied. O if the end of the stream is reached or an asynchronous read is /// required. This is a synchronous operation, but is guaranteed to never block. virtual size_t scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return get_base()->scopy(ptr, count); } /// /// Gets the current read or write position in the stream. /// /// The I/O direction to seek (see remarks) /// The current position. EOF if the operation fails. /// Some streams may have separate write and read cursors. /// For such streams, the direction parameter defines whether to move the read or the write /// cursor. virtual typename details::basic_streambuf<_CharType>::pos_type getpos(std::ios_base::openmode direction) const { return get_base()->getpos(direction); } /// /// Seeks to the given position. /// /// The offset from the beginning of the stream. /// The I/O direction to seek (see remarks). /// The position. EOF if the operation fails. /// Some streams may have separate write and read cursors. For such streams, the direction parameter /// defines whether to move the read or the write cursor. virtual typename details::basic_streambuf<_CharType>::pos_type seekpos( typename details::basic_streambuf<_CharType>::pos_type pos, std::ios_base::openmode direction) { return get_base()->seekpos(pos, direction); } /// /// Seeks to a position given by a relative offset. /// /// The relative position to seek to /// The starting point (beginning, end, current) for the seek. /// The I/O direction to seek (see remarks) /// The position. EOF if the operation fails. /// Some streams may have separate write and read cursors. /// For such streams, the mode parameter defines whether to move the read or the write cursor. virtual typename details::basic_streambuf<_CharType>::pos_type seekoff( typename details::basic_streambuf<_CharType>::off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode) { return get_base()->seekoff(offset, way, mode); } /// /// For output streams, flush any internally buffered data to the underlying medium. /// /// true if the flush succeeds, false if not virtual pplx::task sync() { return get_base()->sync(); } /// /// Retrieves the stream buffer exception_ptr if it has been set. /// /// Pointer to the exception, if it has been set; otherwise, nullptr will be returned virtual std::exception_ptr exception() const { return get_base()->exception(); } private: std::shared_ptr> m_buffer; }; } // namespace streams } // namespace Concurrency