/*** * Copyright (C) Microsoft. All rights reserved. * Licensed under the MIT license. See LICENSE.txt file in the project root for full license information. * * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * This file defines a stream buffer that is based on a raw pointer and block size. Unlike a vector-based * stream buffer, the buffer cannot be expanded or contracted, it has a fixed capacity. * * For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #pragma once #ifndef CASA_RAWPTR_STREAMS_H #define CASA_RAWPTR_STREAMS_H #include "cpprest/astreambuf.h" #include "cpprest/streams.h" #include "pplx/pplxtasks.h" #include #include #include #include namespace Concurrency { namespace streams { // Forward declarations template class rawptr_buffer; namespace details { /// /// The basic_rawptr_buffer class serves as a memory-based steam buffer that supports both writing and reading /// sequences of characters to and from a fixed-size block. /// template class basic_rawptr_buffer : public streams::details::streambuf_state_manager<_CharType> { public: typedef _CharType char_type; typedef typename basic_streambuf<_CharType>::traits traits; typedef typename basic_streambuf<_CharType>::int_type int_type; typedef typename basic_streambuf<_CharType>::pos_type pos_type; typedef typename basic_streambuf<_CharType>::off_type off_type; /// /// Constructor /// basic_rawptr_buffer() : streambuf_state_manager<_CharType>(std::ios_base::in | std::ios_base::out) , m_data(nullptr) , m_current_position(0) , m_size(0) { } /// /// Destructor /// virtual ~basic_rawptr_buffer() { this->_close_read(); this->_close_write(); } protected: /// /// can_seek is used to determine whether a stream buffer supports seeking. /// virtual bool can_seek() const { return this->is_open(); } /// /// has_size is used to determine whether a stream buffer supports size(). /// virtual bool has_size() const { return this->is_open(); } /// /// Gets the size of the stream, if known. Calls to has_size will determine whether /// the result of size can be relied on. /// virtual utility::size64_t size() const { return utility::size64_t(m_size); } /// /// Get the stream buffer size, if one has been set. /// /// The direction of buffering (in or out) /// An implementation that does not support buffering will always return '0'. virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const { return 0; } /// /// Set the stream buffer implementation to buffer or not buffer. /// /// The size to use for internal buffering, 0 if no buffering should be done. /// The direction of buffering (in or out) /// An implementation that does not support buffering will silently ignore calls to this function and it /// will not have /// any effect on what is returned by subsequent calls to buffer_size(). virtual void set_buffer_size(size_t, std::ios_base::openmode = std::ios_base::in) { return; } /// /// For any input stream, in_avail returns the number of characters that are immediately available /// to be consumed without blocking. May be used in conjunction with and sgetn() to /// read data without incurring the overhead of using tasks. /// virtual size_t in_avail() const { // See the comment in seek around the restiction that we do not allow read head to // seek beyond the current size. _ASSERTE(m_current_position <= m_size); msl::safeint3::SafeInt readhead(m_current_position); msl::safeint3::SafeInt writeend(m_size); return (size_t)(writeend - readhead); } /// /// Closes the stream buffer, preventing further read or write operations. /// /// The I/O mode (in or out) to close for. virtual pplx::task close(std::ios_base::openmode mode) { if (mode & std::ios_base::in) { this->_close_read().get(); // Safe to call get() here. } if (mode & std::ios_base::out) { this->_close_write().get(); // Safe to call get() here. } if (!this->can_read() && !this->can_write()) { m_data = nullptr; } // Exceptions will be propagated out of _close_read or _close_write return pplx::task_from_result(); } virtual pplx::task _sync() { return pplx::task_from_result(true); } virtual pplx::task _putc(_CharType ch) { if (m_current_position >= m_size) return pplx::task_from_result(traits::eof()); int_type retVal = (this->write(&ch, 1) == 1) ? static_cast(ch) : traits::eof(); return pplx::task_from_result(retVal); } virtual pplx::task _putn(const _CharType* ptr, size_t count) { msl::safeint3::SafeInt newSize = msl::safeint3::SafeInt(count) + m_current_position; if (newSize > m_size) return pplx::task_from_exception( std::make_exception_ptr(std::runtime_error("Writing past the end of the buffer"))); return pplx::task_from_result(this->write(ptr, count)); } /// /// Allocates a contiguous memory block and returns it. /// /// The number of characters to allocate. /// A pointer to a block to write to, null if the stream buffer implementation does not support /// alloc/commit. _CharType* _alloc(size_t count) { if (!this->can_write()) return nullptr; msl::safeint3::SafeInt readhead(m_current_position); msl::safeint3::SafeInt writeend(m_size); size_t space_left = (size_t)(writeend - readhead); if (space_left < count) return nullptr; // Let the caller copy the data return (_CharType*)(m_data + m_current_position); } /// /// Submits a block already allocated by the stream buffer. /// /// The number of characters to be committed. void _commit(size_t actual) { // Update the write position and satisfy any pending reads update_current_position(m_current_position + actual); } /// /// Gets a pointer to the next already allocated contiguous block of data. /// /// A reference to a pointer variable that will hold the address of the block on success. /// The number of contiguous characters available at the address in 'ptr'. /// true if the operation succeeded, false otherwise. /// /// A return of false does not necessarily indicate that a subsequent read operation would fail, only that /// there is no block to return immediately or that the stream buffer does not support the operation. /// The stream buffer may not de-allocate the block until is called. /// If the end of the stream is reached, the function will return true, a null pointer, and a count of zero; /// a subsequent read will not succeed. /// virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count) { count = 0; ptr = nullptr; if (!this->can_read()) return false; count = in_avail(); if (count > 0) { ptr = (_CharType*)(m_data + m_current_position); return true; } else { ptr = nullptr; // Can only be open for read OR write, not both. If there is no data then // we have reached the end of the stream so indicate such with true. return true; } } /// /// Releases a block of data acquired using . This frees the stream buffer to /// de-allocate the memory, if it so desires. Move the read position ahead by the count. /// /// A pointer to the block of data to be released. /// The number of characters that were read. virtual void release(_Out_writes_opt_(count) _CharType* ptr, _In_ size_t count) { if (ptr != nullptr) update_current_position(m_current_position + count); } virtual pplx::task _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return pplx::task_from_result(this->read(ptr, count)); } size_t _sgetn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return this->read(ptr, count); } virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return this->read(ptr, count, false); } virtual pplx::task _bumpc() { return pplx::task_from_result(this->read_byte(true)); } virtual int_type _sbumpc() { return this->read_byte(true); } virtual pplx::task _getc() { return pplx::task_from_result(this->read_byte(false)); } int_type _sgetc() { return this->read_byte(false); } virtual pplx::task _nextc() { if (m_current_position >= m_size - 1) return pplx::task_from_result(basic_streambuf<_CharType>::traits::eof()); this->read_byte(true); return pplx::task_from_result(this->read_byte(false)); } virtual pplx::task _ungetc() { auto pos = seekoff(-1, std::ios_base::cur, std::ios_base::in); if (pos == (pos_type)traits::eof()) return pplx::task_from_result(traits::eof()); return this->getc(); } /// /// Gets the current read or write position in the stream. /// /// The I/O direction to seek (see remarks) /// The current position. EOF if the operation fails. /// Some streams may have separate write and read cursors. /// For such streams, the direction parameter defines whether to move the read or the write /// cursor. virtual pos_type getpos(std::ios_base::openmode mode) const { if (((mode & std::ios_base::in) && !this->can_read()) || ((mode & std::ios_base::out) && !this->can_write())) return static_cast(traits::eof()); if (mode == std::ios_base::in) return (pos_type)m_current_position; else if (mode == std::ios_base::out) return (pos_type)m_current_position; else return (pos_type)traits::eof(); } /// /// Seeks to the given position. /// /// The offset from the beginning of the stream. /// The I/O direction to seek (see remarks). /// The position. EOF if the operation fails. /// Some streams may have separate write and read cursors. For such streams, the direction parameter /// defines whether to move the read or the write cursor. virtual pos_type seekpos(pos_type position, std::ios_base::openmode mode) { pos_type beg(0); pos_type end(m_size); if (position >= beg) { auto pos = static_cast(position); // Read head if ((mode & std::ios_base::in) && this->can_read()) { if (position <= end) { // We do not allow reads to seek beyond the end or before the start position. update_current_position(pos); return static_cast(m_current_position); } } // Write head if ((mode & std::ios_base::out) && this->can_write()) { // Update write head and satisfy read requests if any update_current_position(pos); return static_cast(m_current_position); } } return static_cast(traits::eof()); } /// /// Seeks to a position given by a relative offset. /// /// The relative position to seek to /// The starting point (beginning, end, current) for the seek. /// The I/O direction to seek (see remarks) /// The position. EOF if the operation fails. /// Some streams may have separate write and read cursors. /// For such streams, the mode parameter defines whether to move the read or the write cursor. virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode) { pos_type beg = 0; pos_type cur = static_cast(m_current_position); pos_type end = static_cast(m_size); switch (way) { case std::ios_base::beg: return seekpos(beg + offset, mode); case std::ios_base::cur: return seekpos(cur + offset, mode); case std::ios_base::end: return seekpos(end + offset, mode); default: return static_cast(traits::eof()); } } private: template friend class ::concurrency::streams::rawptr_buffer; /// /// Constructor /// /// The address (pointer to) the memory block. /// The memory block size, measured in number of characters. basic_rawptr_buffer(const _CharType* data, size_t size) : streambuf_state_manager<_CharType>(std::ios_base::in) , m_data(const_cast<_CharType*>(data)) , m_size(size) , m_current_position(0) { validate_mode(std::ios_base::in); } /// /// Constructor /// /// The address (pointer to) the memory block. /// The memory block size, measured in number of characters. /// The stream mode (in, out, etc.). basic_rawptr_buffer(_CharType* data, size_t size, std::ios_base::openmode mode) : streambuf_state_manager<_CharType>(mode), m_data(data), m_size(size), m_current_position(0) { validate_mode(mode); } static void validate_mode(std::ios_base::openmode mode) { // Disallow simultaneous use of the stream buffer for writing and reading. if ((mode & std::ios_base::in) && (mode & std::ios_base::out)) throw std::invalid_argument("this combination of modes on raw pointer stream not supported"); } /// /// Determines if the request can be satisfied. /// bool can_satisfy(size_t) const { // We can always satisfy a read, at least partially, unless the // read position is at the very end of the buffer. return (in_avail() > 0); } /// /// Reads a byte from the stream and returns it as int_type. /// Note: This routine must only be called if can_satisfy() returns true. /// int_type read_byte(bool advance = true) { _CharType value; auto read_size = this->read(&value, 1, advance); return read_size == 1 ? static_cast(value) : traits::eof(); } /// /// Reads up to count characters into ptr and returns the count of characters copied. /// The return value (actual characters copied) could be <= count. /// Note: This routine must only be called if can_satisfy() returns true. /// size_t read(_Out_writes_(count) _CharType* ptr, _In_ size_t count, bool advance = true) { if (!can_satisfy(count)) return 0; msl::safeint3::SafeInt request_size(count); msl::safeint3::SafeInt read_size = request_size.Min(in_avail()); size_t newPos = m_current_position + read_size; auto readBegin = m_data + m_current_position; auto readEnd = m_data + newPos; #if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0 // Avoid warning C4996: Use checked iterators under SECURE_SCL std::copy(readBegin, readEnd, stdext::checked_array_iterator<_CharType*>(ptr, count)); #else std::copy(readBegin, readEnd, ptr); #endif // _WIN32 if (advance) { update_current_position(newPos); } return (size_t)read_size; } /// /// Write count characters from the ptr into the stream buffer /// size_t write(const _CharType* ptr, size_t count) { if (!this->can_write() || (count == 0)) return 0; msl::safeint3::SafeInt newSize = msl::safeint3::SafeInt(count) + m_current_position; if (newSize > m_size) throw std::runtime_error("Writing past the end of the buffer"); // Copy the data #if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0 // Avoid warning C4996: Use checked iterators under SECURE_SCL std::copy(ptr, ptr + count, stdext::checked_array_iterator<_CharType*>(m_data, m_size, m_current_position)); #else std::copy(ptr, ptr + count, m_data + m_current_position); #endif // _WIN32 // Update write head and satisfy pending reads if any update_current_position(newSize); return count; } /// /// Updates the current read or write position /// void update_current_position(size_t newPos) { // The new write head m_current_position = newPos; _ASSERTE(m_current_position <= m_size); } // The actual memory block _CharType* m_data; // The size of the memory block size_t m_size; // Read/write head size_t m_current_position; }; } // namespace details /// /// The rawptr_buffer class serves as a memory-based stream buffer that supports reading /// sequences of characters to or from a fixed-size block. Note that it cannot be used simultaneously for reading as /// well as writing. /// /// /// The data type of the basic element of the rawptr_buffer. /// template class rawptr_buffer : public streambuf<_CharType> { public: typedef _CharType char_type; /// /// Create a rawptr_buffer given a pointer to a memory block and the size of the block. /// /// The address (pointer to) the memory block. /// The memory block size, measured in number of characters. rawptr_buffer(const char_type* data, size_t size) : streambuf(std::shared_ptr>( new details::basic_rawptr_buffer(data, size))) { } /// /// Create a rawptr_buffer given a pointer to a memory block and the size of the block. /// /// The address (pointer to) the memory block. /// The memory block size, measured in number of characters. rawptr_buffer(char_type* data, size_t size, std::ios_base::openmode mode = std::ios::out) : streambuf(std::shared_ptr>( new details::basic_rawptr_buffer(data, size, mode))) { } /// /// Default constructor. /// rawptr_buffer() {} }; /// /// The rawptr_stream class is used to create memory-backed streams that support writing or reading /// sequences of characters to / from a fixed-size block. /// /// /// The data type of the basic element of the rawptr_stream. /// template class rawptr_stream { public: typedef _CharType char_type; typedef rawptr_buffer<_CharType> buffer_type; /// /// Create a rawptr-stream given a pointer to a read-only memory block and the size of the block. /// /// The address (pointer to) the memory block. /// The memory block size, measured in number of characters. /// An opened input stream. static concurrency::streams::basic_istream open_istream(const char_type* data, size_t size) { return concurrency::streams::basic_istream(buffer_type(data, size)); } /// /// Create a rawptr-stream given a pointer to a writable memory block and the size of the block. /// /// The address (pointer to) the memory block. /// The memory block size, measured in number of characters. /// An opened input stream. static concurrency::streams::basic_istream open_istream(char_type* data, size_t size) { return concurrency::streams::basic_istream(buffer_type(data, size, std::ios::in)); } /// /// Create a rawptr-stream given a pointer to a writable memory block and the size of the block. /// /// The address (pointer to) the memory block. /// The memory block size, measured in number of characters. /// An opened output stream. static concurrency::streams::basic_ostream open_ostream(char_type* data, size_t size) { return concurrency::streams::basic_ostream(buffer_type(data, size, std::ios::out)); } }; } // namespace streams } // namespace Concurrency #endif