/*** * Copyright (C) Microsoft. All rights reserved. * Licensed under the MIT license. See LICENSE.txt file in the project root for full license information. * * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * Asynchronous I/O: streams API, used for formatted input and output, based on unformatted I/O using stream buffers * * For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #pragma once #ifndef CASA_STREAMS_H #define CASA_STREAMS_H #include "cpprest/astreambuf.h" #include namespace Concurrency { namespace streams { template class basic_ostream; template class basic_istream; namespace details { template class basic_ostream_helper { public: basic_ostream_helper(streams::streambuf buffer) : m_buffer(buffer) {} ~basic_ostream_helper() {} private: template friend class streams::basic_ostream; concurrency::streams::streambuf m_buffer; }; template class basic_istream_helper { public: basic_istream_helper(streams::streambuf buffer) : m_buffer(buffer) {} ~basic_istream_helper() {} private: template friend class streams::basic_istream; concurrency::streams::streambuf m_buffer; }; template struct Value2StringFormatter { template static std::basic_string format(const T& val) { std::basic_ostringstream ss; ss << val; return ss.str(); } }; template<> struct Value2StringFormatter { template static std::basic_string format(const T& val) { std::basic_ostringstream ss; ss << val; return reinterpret_cast(ss.str().c_str()); } static std::basic_string format(const utf16string& val) { return format(utility::conversions::utf16_to_utf8(val)); } }; static const char* _in_stream_msg = "stream not set up for input of data"; static const char* _in_streambuf_msg = "stream buffer not set up for input of data"; static const char* _out_stream_msg = "stream not set up for output of data"; static const char* _out_streambuf_msg = "stream buffer not set up for output of data"; } // namespace details /// /// Base interface for all asynchronous output streams. /// template class basic_ostream { public: typedef char_traits traits; typedef typename traits::int_type int_type; typedef typename traits::pos_type pos_type; typedef typename traits::off_type off_type; /// /// Default constructor /// basic_ostream() {} /// /// Copy constructor /// /// The source object basic_ostream(const basic_ostream& other) : m_helper(other.m_helper) {} /// /// Assignment operator /// /// The source object /// A reference to the stream object that contains the result of the assignment. basic_ostream& operator=(const basic_ostream& other) { m_helper = other.m_helper; return *this; } /// /// Constructor /// /// A stream buffer. basic_ostream(streams::streambuf buffer) : m_helper(std::make_shared>(buffer)) { _verify_and_throw(details::_out_streambuf_msg); } /// /// Close the stream, preventing further write operations. /// pplx::task close() const { return is_valid() ? helper()->m_buffer.close(std::ios_base::out) : pplx::task_from_result(); } /// /// Close the stream with exception, preventing further write operations. /// /// Pointer to the exception. pplx::task close(std::exception_ptr eptr) const { return is_valid() ? helper()->m_buffer.close(std::ios_base::out, eptr) : pplx::task_from_result(); } /// /// Put a single character into the stream. /// /// A character pplx::task write(CharType ch) const { pplx::task result; if (!_verify_and_return_task(details::_out_stream_msg, result)) return result; return helper()->m_buffer.putc(ch); } /// /// Write a single value of "blittable" type T into the stream. /// /// A value of type T. /// /// This is not a replacement for a proper binary serialization solution, but it may /// form the foundation for one. Writing data bit-wise to a stream is a primitive /// operation of binary serialization. /// Currently, no attention is paid to byte order. All data is written in the platform's /// native byte order, which means little-endian on all platforms that have been tested. /// This function is only available for streams using a single-byte character size. /// template CASABLANCA_DEPRECATED( "Unsafe API that will be removed in future releases, use one of the other write overloads instead.") pplx::task write(T value) const { static_assert(sizeof(CharType) == 1, "binary write is only supported for single-byte streams"); static_assert(std::is_trivial::value, "unsafe to use with non-trivial types"); pplx::task result; if (!_verify_and_return_task(details::_out_stream_msg, result)) return result; auto copy = std::make_shared(std::move(value)); return helper() ->m_buffer.putn_nocopy((CharType*)copy.get(), sizeof(T)) .then([copy](pplx::task op) -> size_t { return op.get(); }); } /// /// Write a number of characters from a given stream buffer into the stream. /// /// A source stream buffer. /// The number of characters to write. pplx::task write(streams::streambuf source, size_t count) const { pplx::task result; if (!_verify_and_return_task(details::_out_stream_msg, result)) return result; if (!source.can_read()) return pplx::task_from_exception( std::make_exception_ptr(std::runtime_error("source buffer not set up for input of data"))); if (count == 0) return pplx::task_from_result((size_t)0); auto buffer = helper()->m_buffer; auto data = buffer.alloc(count); if (data != nullptr) { auto post_read = [buffer](pplx::task op) -> pplx::task { auto b = buffer; b.commit(op.get()); return op; }; return source.getn(data, count).then(post_read); } else { size_t available = 0; const bool acquired = source.acquire(data, available); if (available >= count) { auto post_write = [source, data](pplx::task op) -> pplx::task { auto s = source; s.release(data, op.get()); return op; }; return buffer.putn_nocopy(data, count).then(post_write); } else { // Always have to release if acquire returned true. if (acquired) { source.release(data, 0); } std::shared_ptr buf(new CharType[count], [](CharType* buf) { delete[] buf; }); auto post_write = [buf](pplx::task op) -> pplx::task { return op; }; auto post_read = [buf, post_write, buffer](pplx::task op) -> pplx::task { auto b = buffer; return b.putn_nocopy(buf.get(), op.get()).then(post_write); }; return source.getn(buf.get(), count).then(post_read); } } } /// /// Write the specified string to the output stream. /// /// Input string. pplx::task print(const std::basic_string& str) const { pplx::task result; if (!_verify_and_return_task(details::_out_stream_msg, result)) return result; if (str.empty()) { return pplx::task_from_result(0); } else { auto sharedStr = std::make_shared>(str); return helper()->m_buffer.putn_nocopy(sharedStr->c_str(), sharedStr->size()).then([sharedStr](size_t size) { return size; }); } } /// /// Write a value of type T to the output stream. /// /// /// The data type of the object to be written to the stream /// /// Input object. template pplx::task print(const T& val) const { pplx::task result; if (!_verify_and_return_task(details::_out_stream_msg, result)) return result; // TODO in the future this could be improved to have Value2StringFormatter avoid another unnecessary copy // by putting the string on the heap before calling the print string overload. return print(details::Value2StringFormatter::format(val)); } /// /// Write a value of type T to the output stream and append a newline character. /// /// /// The data type of the object to be written to the stream /// /// Input object. template pplx::task print_line(const T& val) const { pplx::task result; if (!_verify_and_return_task(details::_out_stream_msg, result)) return result; auto str = details::Value2StringFormatter::format(val); str.push_back(CharType('\n')); return print(str); } /// /// Flush any buffered output data. /// pplx::task flush() const { pplx::task result; if (!_verify_and_return_task(details::_out_stream_msg, result)) return result; return helper()->m_buffer.sync(); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning of the stream. /// The new position in the stream. pos_type seek(pos_type pos) const { _verify_and_throw(details::_out_stream_msg); return helper()->m_buffer.seekpos(pos, std::ios_base::out); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning, current write position, or the end of the stream. /// The starting point (beginning, current, end) for the seek. /// The new position in the stream. pos_type seek(off_type off, std::ios_base::seekdir way) const { _verify_and_throw(details::_out_stream_msg); return helper()->m_buffer.seekoff(off, way, std::ios_base::out); } /// /// Get the current write position, i.e. the offset from the beginning of the stream. /// /// The current write position. pos_type tell() const { _verify_and_throw(details::_out_stream_msg); return helper()->m_buffer.getpos(std::ios_base::out); } /// /// can_seek is used to determine whether the stream supports seeking. /// /// true if the stream supports seeking, false otherwise. bool can_seek() const { return is_valid() && m_helper->m_buffer.can_seek(); } /// /// Test whether the stream has been initialized with a valid stream buffer. /// /// true if the stream has been initialized with a valid stream buffer, false /// otherwise. bool is_valid() const { return (m_helper != nullptr) && ((bool)m_helper->m_buffer); } /// /// Test whether the stream has been initialized or not. /// operator bool() const { return is_valid(); } /// /// Test whether the stream is open for writing. /// /// true if the stream is open for writing, false otherwise. bool is_open() const { return is_valid() && m_helper->m_buffer.can_write(); } /// /// Get the underlying stream buffer. /// /// The underlying stream buffer. concurrency::streams::streambuf streambuf() const { return helper()->m_buffer; } protected: void set_helper(std::shared_ptr> helper) { m_helper = helper; } private: template bool _verify_and_return_task(const char* msg, pplx::task& tsk) const { auto buffer = helper()->m_buffer; if (!(buffer.exception() == nullptr)) { tsk = pplx::task_from_exception(buffer.exception()); return false; } if (!buffer.can_write()) { tsk = pplx::task_from_exception(std::make_exception_ptr(std::runtime_error(msg))); return false; } return true; } void _verify_and_throw(const char* msg) const { auto buffer = helper()->m_buffer; if (!(buffer.exception() == nullptr)) std::rethrow_exception(buffer.exception()); if (!buffer.can_write()) throw std::runtime_error(msg); } std::shared_ptr> helper() const { if (!m_helper) throw std::logic_error("uninitialized stream object"); return m_helper; } std::shared_ptr> m_helper; }; template struct _type_parser_integral_traits { typedef std::false_type _is_integral; typedef std::false_type _is_unsigned; }; #ifdef _WIN32 #define _INT_TRAIT(_t, _low, _high) \ template<> \ struct _type_parser_integral_traits<_t> \ { \ typedef std::true_type _is_integral; \ typedef std::false_type _is_unsigned; \ static const int64_t _min = _low; \ static const int64_t _max = _high; \ }; #define _UINT_TRAIT(_t, _low, _high) \ template<> \ struct _type_parser_integral_traits<_t> \ { \ typedef std::true_type _is_integral; \ typedef std::true_type _is_unsigned; \ static const uint64_t _max = _high; \ }; _INT_TRAIT(char, INT8_MIN, INT8_MAX) _INT_TRAIT(signed char, INT8_MIN, INT8_MAX) _INT_TRAIT(short, INT16_MIN, INT16_MAX) #if defined(_NATIVE_WCHAR_T_DEFINED) _INT_TRAIT(wchar_t, WCHAR_MIN, WCHAR_MAX) #endif _INT_TRAIT(int, INT32_MIN, INT32_MAX) _INT_TRAIT(long, LONG_MIN, LONG_MAX) _INT_TRAIT(long long, LLONG_MIN, LLONG_MAX) _UINT_TRAIT(unsigned char, UINT8_MIN, UINT8_MAX) _UINT_TRAIT(unsigned short, UINT16_MIN, UINT16_MAX) _UINT_TRAIT(unsigned int, UINT32_MIN, UINT32_MAX) _UINT_TRAIT(unsigned long, ULONG_MIN, ULONG_MAX) _UINT_TRAIT(unsigned long long, ULLONG_MIN, ULLONG_MAX) #else #define _INT_TRAIT(_t) \ template<> \ struct _type_parser_integral_traits<_t> \ { \ typedef std::true_type _is_integral; \ typedef std::false_type _is_unsigned; \ static const int64_t _min = std::numeric_limits<_t>::min(); \ static const int64_t _max = (std::numeric_limits<_t>::max)(); \ }; #define _UINT_TRAIT(_t) \ template<> \ struct _type_parser_integral_traits<_t> \ { \ typedef std::true_type _is_integral; \ typedef std::true_type _is_unsigned; \ static const uint64_t _max = (std::numeric_limits<_t>::max)(); \ }; _INT_TRAIT(char) _INT_TRAIT(signed char) _INT_TRAIT(short) _INT_TRAIT(utf16char) _INT_TRAIT(int) _INT_TRAIT(long) _INT_TRAIT(long long) _UINT_TRAIT(unsigned char) _UINT_TRAIT(unsigned short) _UINT_TRAIT(unsigned int) _UINT_TRAIT(unsigned long) _UINT_TRAIT(unsigned long long) #endif template class _type_parser_base { public: typedef char_traits traits; typedef typename traits::int_type int_type; _type_parser_base() {} protected: // Aid in parsing input: skipping whitespace characters. static pplx::task _skip_whitespace(streams::streambuf buffer); // Aid in parsing input: peek at a character at a time, call type-specific code to examine, extract value when done. // AcceptFunctor should model std::function, int_type)> // ExtractFunctor should model std::function(std::shared_ptr)> template static pplx::task _parse_input(streams::streambuf buffer, AcceptFunctor accept_character, ExtractFunctor extract); }; /// /// Class used to handle asynchronous parsing for basic_istream::extract. To support new /// types create a new template specialization and implement the parse function. /// template class type_parser { public: static pplx::task parse(streams::streambuf buffer) { typename _type_parser_integral_traits::_is_integral ii; typename _type_parser_integral_traits::_is_unsigned ui; return _parse(buffer, ii, ui); } private: static pplx::task _parse(streams::streambuf buffer, std::false_type, std::false_type) { _parse_floating_point(buffer); } static pplx::task _parse(streams::streambuf, std::false_type, std::true_type) { #ifdef _WIN32 static_assert(false, "type is not supported for extraction from a stream"); #else throw std::runtime_error("type is not supported for extraction from a stream"); #endif } static pplx::task _parse(streams::streambuf buffer, std::true_type, std::false_type) { return type_parser::parse(buffer).then([](pplx::task op) -> T { int64_t val = op.get(); if (val <= _type_parser_integral_traits::_max && val >= _type_parser_integral_traits::_min) return (T)val; else throw std::range_error("input out of range for target type"); }); } static pplx::task _parse(streams::streambuf buffer, std::true_type, std::true_type) { return type_parser::parse(buffer).then([](pplx::task op) -> T { uint64_t val = op.get(); if (val <= _type_parser_integral_traits::_max) return (T)val; else throw std::range_error("input out of range for target type"); }); } }; /// /// Base interface for all asynchronous input streams. /// template class basic_istream { public: typedef char_traits traits; typedef typename traits::int_type int_type; typedef typename traits::pos_type pos_type; typedef typename traits::off_type off_type; /// /// Default constructor /// basic_istream() {} /// /// Constructor /// /// /// The data type of the basic element of the stream. /// /// A stream buffer. template basic_istream(streams::streambuf buffer) : m_helper(std::make_shared>(std::move(buffer))) { _verify_and_throw(details::_in_streambuf_msg); } /// /// Copy constructor /// /// The source object basic_istream(const basic_istream& other) : m_helper(other.m_helper) {} /// /// Assignment operator /// /// The source object /// A reference to the stream object that contains the result of the assignment. basic_istream& operator=(const basic_istream& other) { m_helper = other.m_helper; return *this; } /// /// Close the stream, preventing further read operations. /// pplx::task close() const { return is_valid() ? helper()->m_buffer.close(std::ios_base::in) : pplx::task_from_result(); } /// /// Close the stream with exception, preventing further read operations. /// /// Pointer to the exception. pplx::task close(std::exception_ptr eptr) const { return is_valid() ? m_helper->m_buffer.close(std::ios_base::in, eptr) : pplx::task_from_result(); } /// /// Tests whether last read cause the stream reach EOF. /// /// True if the read head has reached the end of the stream, false otherwise. bool is_eof() const { return is_valid() ? m_helper->m_buffer.is_eof() : false; } /// /// Get the next character and return it as an int_type. Advance the read position. /// /// A task that holds the next character as an int_type on successful completion. pplx::task read() const { pplx::task result; if (!_verify_and_return_task(details::_in_stream_msg, result)) return result; return helper()->m_buffer.bumpc(); } /// /// Read a single value of "blittable" type T from the stream. /// /// A value of type T. /// /// This is not a replacement for a proper binary serialization solution, but it may /// form the foundation for one. Reading data bit-wise to a stream is a primitive /// operation of binary serialization. /// Currently, no attention is paid to byte order. All data is read in the platform's /// native byte order, which means little-endian on all platforms that have been tested. /// This function is only available for streams using a single-byte character size. /// template CASABLANCA_DEPRECATED( "Unsafe API that will be removed in future releases, use one of the other read overloads instead.") pplx::task read() const { static_assert(sizeof(CharType) == 1, "binary read is only supported for single-byte streams"); static_assert(std::is_trivial::value, "unsafe to use with non-trivial types"); pplx::task result; if (!_verify_and_return_task(details::_in_stream_msg, result)) return result; auto copy = std::make_shared(); return helper()->m_buffer.getn((CharType*)copy.get(), sizeof(T)).then([copy](pplx::task) -> T { return std::move(*copy); }); } /// /// Reads up to count characters and place into the provided buffer. /// /// An async stream buffer supporting write operations. /// The maximum number of characters to read /// A task that holds the number of characters read. This number is 0 if the end of the stream is /// reached. pplx::task read(streams::streambuf target, size_t count) const { pplx::task result; if (!_verify_and_return_task(details::_in_stream_msg, result)) return result; if (!target.can_write()) return pplx::task_from_exception( std::make_exception_ptr(std::runtime_error("target not set up for output of data"))); // Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations. auto buffer = helper()->m_buffer; auto data = target.alloc(count); if (data != nullptr) { auto post_read = [target](pplx::task op) -> pplx::task { auto t = target; t.commit(op.get()); return op; }; return buffer.getn(data, count).then(post_read); } else { size_t available = 0; const bool acquired = buffer.acquire(data, available); if (available >= count) { auto post_write = [buffer, data](pplx::task op) -> pplx::task { auto b = buffer; b.release(data, op.get()); return op; }; return target.putn_nocopy(data, count).then(post_write); } else { // Always have to release if acquire returned true. if (acquired) { buffer.release(data, 0); } std::shared_ptr buf(new CharType[count], [](CharType* buf) { delete[] buf; }); auto post_write = [buf](pplx::task op) -> pplx::task { return op; }; auto post_read = [buf, target, post_write](pplx::task op) -> pplx::task { auto trg = target; return trg.putn_nocopy(buf.get(), op.get()).then(post_write); }; return helper()->m_buffer.getn(buf.get(), count).then(post_read); } } } /// /// Get the next character and return it as an int_type. Do not advance the read position. /// /// A task that holds the character, widened to an integer. This character is EOF when the peek /// operation fails. pplx::task peek() const { pplx::task result; if (!_verify_and_return_task(details::_in_stream_msg, result)) return result; return helper()->m_buffer.getc(); } /// /// Read characters until a delimiter or EOF is found, and place them into the target. /// Proceed past the delimiter, but don't include it in the target buffer. /// /// An async stream buffer supporting write operations. /// The delimiting character to stop the read at. /// A task that holds the number of characters read. pplx::task read_to_delim(streams::streambuf target, int_type delim) const { pplx::task result; if (!_verify_and_return_task(details::_in_stream_msg, result)) return result; if (!target.can_write()) return pplx::task_from_exception( std::make_exception_ptr(std::runtime_error("target not set up for output of data"))); // Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations. auto buffer = helper()->m_buffer; int_type req_async = traits::requires_async(); std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>(); auto flush = [=]() mutable { return target.putn_nocopy(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable { _locals->total += wrote; _locals->write_pos = 0; return target.sync(); }); }; auto update = [=](int_type ch) mutable { if (ch == traits::eof()) return false; if (ch == delim) return false; _locals->outbuf[_locals->write_pos] = static_cast(ch); _locals->write_pos += 1; if (_locals->is_full()) { // Flushing synchronously because performance is terrible if we // schedule an empty task. This isn't on a user's thread. flush().get(); } return true; }; auto loop = pplx::details::_do_while([=]() mutable -> pplx::task { while (buffer.in_avail() > 0) { int_type ch = buffer.sbumpc(); if (ch == req_async) { break; } if (!update(ch)) { return pplx::task_from_result(false); } } return buffer.bumpc().then(update); }); return loop.then([=](bool) mutable { return flush().then([=] { return _locals->total; }); }); } /// /// Read until reaching a newline character. The newline is not included in the target. /// /// An asynchronous stream buffer supporting write operations. /// A task that holds the number of characters read. This number is 0 if the end of the stream is /// reached. pplx::task read_line(streams::streambuf target) const { pplx::task result; if (!_verify_and_return_task(details::_in_stream_msg, result)) return result; if (!target.can_write()) return pplx::task_from_exception( std::make_exception_ptr(std::runtime_error("target not set up for receiving data"))); // Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations. concurrency::streams::streambuf buffer = helper()->m_buffer; int_type req_async = traits::requires_async(); std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>(); auto flush = [=]() mutable { return target.putn_nocopy(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable { _locals->total += wrote; _locals->write_pos = 0; return target.sync(); }); }; auto update = [=](int_type ch) mutable { if (ch == traits::eof()) return false; if (ch == '\n') return false; if (ch == '\r') { _locals->saw_CR = true; return true; } _locals->outbuf[_locals->write_pos] = static_cast(ch); _locals->write_pos += 1; if (_locals->is_full()) { // Flushing synchronously because performance is terrible if we // schedule an empty task. This isn't on a user's thread. flush().wait(); } return true; }; auto update_after_cr = [=](int_type ch) mutable -> pplx::task { if (ch == traits::eof()) return pplx::task_from_result(false); if (ch == '\n') { return buffer.bumpc().then([](int_type) { return false; }); } return pplx::task_from_result(false); }; auto loop = pplx::details::_do_while([=]() mutable -> pplx::task { while (buffer.in_avail() > 0) { int_type ch; if (_locals->saw_CR) { ch = buffer.sgetc(); if (ch == '\n') buffer.sbumpc(); return pplx::task_from_result(false); } ch = buffer.sbumpc(); if (ch == req_async) break; if (!update(ch)) { return pplx::task_from_result(false); } } if (_locals->saw_CR) { return buffer.getc().then(update_after_cr); } return buffer.bumpc().then(update); }); return loop.then([=](bool) mutable { return flush().then([=] { return _locals->total; }); }); } /// /// Read until reaching the end of the stream. /// /// An asynchronous stream buffer supporting write operations. /// The number of characters read. pplx::task read_to_end(streams::streambuf target) const { pplx::task result; if (!_verify_and_return_task("stream not set up for output of data", result)) return result; if (!target.can_write()) return pplx::task_from_exception( std::make_exception_ptr(std::runtime_error("source buffer not set up for input of data"))); auto l_buffer = helper()->m_buffer; auto l_buf_size = this->buf_size; std::shared_ptr<_read_helper> l_locals = std::make_shared<_read_helper>(); auto copy_to_target = [l_locals, target, l_buffer, l_buf_size]() mutable -> pplx::task { // We need to capture these, because the object itself may go away // before we're done processing the data. // auto locs = _locals; // auto trg = target; return l_buffer.getn(l_locals->outbuf, l_buf_size).then([=](size_t rd) mutable -> pplx::task { if (rd == 0) return pplx::task_from_result(false); // Must be nested to capture rd return target.putn_nocopy(l_locals->outbuf, rd) .then([target, l_locals, rd](size_t wr) mutable -> pplx::task { l_locals->total += wr; if (rd != wr) // Number of bytes written is less than number of bytes received. throw std::runtime_error("failed to write all bytes"); return target.sync().then([]() { return true; }); }); }); }; auto loop = pplx::details::_do_while(copy_to_target); return loop.then([=](bool) mutable -> size_t { return l_locals->total; }); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning of the stream. /// The new position in the stream. pos_type seek(pos_type pos) const { _verify_and_throw(details::_in_stream_msg); return helper()->m_buffer.seekpos(pos, std::ios_base::in); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning, current write position, or the end of the stream. /// The starting point (beginning, current, end) for the seek. /// The new position in the stream. pos_type seek(off_type off, std::ios_base::seekdir way) const { _verify_and_throw(details::_in_stream_msg); return helper()->m_buffer.seekoff(off, way, std::ios_base::in); } /// /// Get the current write position, i.e. the offset from the beginning of the stream. /// /// The current write position. pos_type tell() const { _verify_and_throw(details::_in_stream_msg); return helper()->m_buffer.getpos(std::ios_base::in); } /// /// can_seek is used to determine whether the stream supports seeking. /// /// true if the stream supports seeking, false otherwise. bool can_seek() const { return is_valid() && m_helper->m_buffer.can_seek(); } /// /// Test whether the stream has been initialized with a valid stream buffer. /// bool is_valid() const { return (m_helper != nullptr) && ((bool)m_helper->m_buffer); } /// /// Test whether the stream has been initialized or not. /// operator bool() const { return is_valid(); } /// /// Test whether the stream is open for writing. /// /// true if the stream is open for writing, false otherwise. bool is_open() const { return is_valid() && m_helper->m_buffer.can_read(); } /// /// Get the underlying stream buffer. /// concurrency::streams::streambuf streambuf() const { return helper()->m_buffer; } /// /// Read a value of type T from the stream. /// /// /// Supports the C++ primitive types. Can be expanded to additional types /// by adding template specializations for type_parser. /// /// /// The data type of the element to be read from the stream. /// /// A task that holds the element read from the stream. template pplx::task extract() const { pplx::task result; if (!_verify_and_return_task(details::_in_stream_msg, result)) return result; return type_parser::parse(helper()->m_buffer); } private: template bool _verify_and_return_task(const char* msg, pplx::task& tsk) const { auto buffer = helper()->m_buffer; if (!(buffer.exception() == nullptr)) { tsk = pplx::task_from_exception(buffer.exception()); return false; } if (!buffer.can_read()) { tsk = pplx::task_from_exception(std::make_exception_ptr(std::runtime_error(msg))); return false; } return true; } void _verify_and_throw(const char* msg) const { auto buffer = helper()->m_buffer; if (!(buffer.exception() == nullptr)) std::rethrow_exception(buffer.exception()); if (!buffer.can_read()) throw std::runtime_error(msg); } std::shared_ptr> helper() const { if (!m_helper) throw std::logic_error("uninitialized stream object"); return m_helper; } static const size_t buf_size = 16 * 1024; struct _read_helper { size_t total; CharType outbuf[buf_size]; size_t write_pos; bool saw_CR; bool is_full() const { return write_pos == buf_size; } _read_helper() : total(0), write_pos(0), saw_CR(false) {} }; std::shared_ptr> m_helper; }; typedef basic_ostream ostream; typedef basic_istream istream; typedef basic_ostream wostream; typedef basic_istream wistream; template pplx::task _type_parser_base::_skip_whitespace(streams::streambuf buffer) { int_type req_async = traits::requires_async(); auto update = [=](int_type ch) mutable { if (isspace(ch)) { if (buffer.sbumpc() == req_async) { // Synchronously because performance is terrible if we // schedule an empty task. This isn't on a user's thread. buffer.nextc().wait(); } return true; } return false; }; auto loop = pplx::details::_do_while([=]() mutable -> pplx::task { while (buffer.in_avail() > 0) { int_type ch = buffer.sgetc(); if (ch == req_async) break; if (!update(ch)) { return pplx::task_from_result(false); } } return buffer.getc().then(update); }); return loop.then([=](pplx::task op) { op.wait(); }); } template template pplx::task _type_parser_base::_parse_input(concurrency::streams::streambuf buffer, AcceptFunctor accept_character, ExtractFunctor extract) { std::shared_ptr state = std::make_shared(); auto update = [=](pplx::task op) -> pplx::task { int_type ch = op.get(); if (ch == traits::eof()) return pplx::task_from_result(false); bool accptd = accept_character(state, ch); if (!accptd) return pplx::task_from_result(false); // We peeked earlier, so now we must advance the position. concurrency::streams::streambuf buf = buffer; return buf.bumpc().then([](int_type) { return true; }); }; auto peek_char = [=]() -> pplx::task { concurrency::streams::streambuf buf = buffer; // If task results are immediately available, there's little need to use ".then()," // so optimize for prompt values. auto get_op = buf.getc(); while (get_op.is_done()) { auto condition = update(get_op); if (!condition.is_done() || !condition.get()) return condition; get_op = buf.getc(); } return get_op.then(update); }; auto finish = [=](pplx::task op) -> pplx::task { op.wait(); pplx::task result = extract(state); return result; }; return _skip_whitespace(buffer).then([=](pplx::task op) -> pplx::task { op.wait(); return pplx::details::_do_while(peek_char).then(finish); }); } template class type_parser> : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::template _parse_input, std::string>( buffer, _accept_char, _extract_result); } private: static bool _accept_char(std::shared_ptr> state, int_type ch) { if (ch == traits::eof() || isspace(ch)) return false; state->push_back(CharType(ch)); return true; } static pplx::task> _extract_result(std::shared_ptr> state) { return pplx::task_from_result(*state); } }; template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::template _parse_input<_int64_state, int64_t>(buffer, _accept_char, _extract_result); } private: struct _int64_state { _int64_state() : result(0), correct(false), minus(0) {} int64_t result; bool correct; char minus; // 0 -- no sign, 1 -- plus, 2 -- minus }; static bool _accept_char(std::shared_ptr<_int64_state> state, int_type ch) { if (ch == traits::eof()) return false; if (state->minus == 0) { // OK to find a sign. if (!::isdigit(ch) && ch != int_type('+') && ch != int_type('-')) return false; } else { if (!::isdigit(ch)) return false; } // At least one digit was found. state->correct = true; if (ch == int_type('+')) { state->minus = 1; } else if (ch == int_type('-')) { state->minus = 2; } else { if (state->minus == 0) state->minus = 1; // Shift the existing value by 10, then add the new value. bool positive = state->result >= 0; state->result *= 10; state->result += int64_t(ch - int_type('0')); if ((state->result >= 0) != positive) { state->correct = false; return false; } } return true; } static pplx::task _extract_result(std::shared_ptr<_int64_state> state) { if (!state->correct) throw std::range_error("integer value is too large to fit in 64 bits"); int64_t result = (state->minus == 2) ? -state->result : state->result; return pplx::task_from_result(result); } }; template struct _double_state { _double_state() : result(0) , minus(0) , after_comma(0) , exponent(false) , exponent_number(0) , exponent_minus(0) , complete(false) , p_exception_string() { } FloatingPoint result; char minus; // 0 -- no sign, 1 -- plus, 2 -- minus int after_comma; bool exponent; int exponent_number; char exponent_minus; // 0 -- no sign, 1 -- plus, 2 -- minus bool complete; std::string p_exception_string; }; template static std::string create_exception_message(int_type ch, bool exponent) { std::ostringstream os; os << "Invalid character '" << char(ch) << "'" << (exponent ? " in exponent" : ""); return os.str(); } template static bool _accept_char(std::shared_ptr<_double_state> state, int_type ch) { if (state->minus == 0) { if (!::isdigit(ch) && ch != int_type('.') && ch != int_type('+') && ch != int_type('-')) { if (!state->complete) state->p_exception_string = create_exception_message(ch, false); return false; } } else { if (!state->exponent && !::isdigit(ch) && ch != int_type('.') && ch != int_type('E') && ch != int_type('e')) { if (!state->complete) state->p_exception_string = create_exception_message(ch, false); return false; } if (state->exponent && !::isdigit(ch) && ch != int_type('+') && ch != int_type('-')) { if (!state->complete) state->p_exception_string = create_exception_message(ch, true); return false; } } switch (ch) { case int_type('+'): state->complete = false; if (state->exponent) { if (state->exponent_minus != 0) { state->p_exception_string = "The exponent sign already set"; return false; } state->exponent_minus = 1; } else { state->minus = 1; } break; case int_type('-'): state->complete = false; if (state->exponent) { if (state->exponent_minus != 0) { state->p_exception_string = "The exponent sign already set"; return false; } state->exponent_minus = 2; } else { state->minus = 2; } break; case int_type('.'): state->complete = false; if (state->after_comma > 0) return false; state->after_comma = 1; break; case int_type('E'): case int_type('e'): state->complete = false; if (state->exponent) return false; state->exponent_number = 0; state->exponent = true; break; default: state->complete = true; if (!state->exponent) { if (state->minus == 0) state->minus = 1; state->result *= 10; state->result += int64_t(ch - int_type('0')); if (state->after_comma > 0) state->after_comma++; } else { if (state->exponent_minus == 0) state->exponent_minus = 1; state->exponent_number *= 10; state->exponent_number += int64_t(ch - int_type('0')); } } return true; } template static pplx::task _extract_result(std::shared_ptr<_double_state> state) { if (state->p_exception_string.length() > 0) throw std::runtime_error(state->p_exception_string.c_str()); if (!state->complete && state->exponent) throw std::runtime_error("Incomplete exponent"); FloatingPoint result = static_cast((state->minus == 2) ? -state->result : state->result); if (state->exponent_minus == 2) state->exponent_number = 0 - state->exponent_number; if (state->after_comma > 0) state->exponent_number -= state->after_comma - 1; if (state->exponent_number >= 0) { result *= pow(FloatingPoint(10.0), state->exponent_number); #pragma push_macro("max") #undef max if (result > std::numeric_limits::max() || result < -std::numeric_limits::max()) throw std::overflow_error("The value is too big"); #pragma pop_macro("max") } else { bool is_zero = (result == 0); result /= pow(FloatingPoint(10.0), -state->exponent_number); if (!is_zero && result > -std::numeric_limits::denorm_min() && result < std::numeric_limits::denorm_min()) throw std::underflow_error("The value is too small"); } return pplx::task_from_result(result); } template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::template _parse_input<_double_state, double>( buffer, _accept_char, _extract_result); } protected: }; template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::template _parse_input<_double_state, float>( buffer, _accept_char, _extract_result); } protected: }; template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::template _parse_input<_uint64_state, uint64_t>(buffer, _accept_char, _extract_result); } private: struct _uint64_state { _uint64_state() : result(0), correct(false) {} uint64_t result; bool correct; }; static bool _accept_char(std::shared_ptr<_uint64_state> state, int_type ch) { if (!::isdigit(ch)) return false; // At least one digit was found. state->correct = true; // Shift the existing value by 10, then add the new value. state->result *= 10; state->result += uint64_t(ch - int_type('0')); return true; } static pplx::task _extract_result(std::shared_ptr<_uint64_state> state) { if (!state->correct) throw std::range_error("integer value is too large to fit in 64 bits"); return pplx::task_from_result(state->result); } }; template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::template _parse_input<_bool_state, bool>(buffer, _accept_char, _extract_result); } private: struct _bool_state { _bool_state() : state(0) {} // { 0 -- not started, 1 -- 't', 2 -- 'tr', 3 -- 'tru', 4 -- 'f', 5 -- 'fa', 6 -- 'fal', 7 -- 'fals', 8 -- // 'true', 9 -- 'false' } short state; }; static bool _accept_char(std::shared_ptr<_bool_state> state, int_type ch) { switch (state->state) { case 0: if (ch == int_type('t')) state->state = 1; else if (ch == int_type('f')) state->state = 4; else if (ch == int_type('1')) state->state = 8; else if (ch == int_type('0')) state->state = 9; else return false; break; case 1: if (ch == int_type('r')) state->state = 2; else return false; break; case 2: if (ch == int_type('u')) state->state = 3; else return false; break; case 3: if (ch == int_type('e')) state->state = 8; else return false; break; case 4: if (ch == int_type('a')) state->state = 5; else return false; break; case 5: if (ch == int_type('l')) state->state = 6; else return false; break; case 6: if (ch == int_type('s')) state->state = 7; else return false; break; case 7: if (ch == int_type('e')) state->state = 9; else return false; break; case 8: case 9: return false; } return true; } static pplx::task _extract_result(std::shared_ptr<_bool_state> state) { bool correct = (state->state == 8 || state->state == 9); if (!correct) { std::runtime_error exc("cannot parse as Boolean value"); throw exc; } return pplx::task_from_result(state->state == 8); } }; template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::_skip_whitespace(buffer).then([=](pplx::task op) -> pplx::task { op.wait(); return type_parser::_get_char(buffer); }); } private: static pplx::task _get_char(streams::streambuf buffer) { concurrency::streams::streambuf buf = buffer; return buf.bumpc().then([=](pplx::task op) -> signed char { int_type val = op.get(); if (val == traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value"); return static_cast(val); }); } }; template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::_skip_whitespace(buffer).then([=](pplx::task op) -> pplx::task { op.wait(); return type_parser::_get_char(buffer); }); } private: static pplx::task _get_char(streams::streambuf buffer) { concurrency::streams::streambuf buf = buffer; return buf.bumpc().then([=](pplx::task op) -> unsigned char { int_type val = op.get(); if (val == traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value"); return static_cast(val); }); } }; template class type_parser : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::_skip_whitespace(buffer).then([=](pplx::task op) -> pplx::task { op.wait(); return _get_char(buffer); }); } private: static pplx::task _get_char(streams::streambuf buffer) { concurrency::streams::streambuf buf = buffer; return buf.bumpc().then([=](pplx::task op) -> char { int_type val = op.get(); if (val == traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value"); return char(val); }); } }; #ifdef _WIN32 template class type_parser>> : public _type_parser_base { typedef _type_parser_base base; public: typedef typename base::traits traits; typedef typename base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return base::template _parse_input, std::basic_string>( buffer, _accept_char, _extract_result); } private: static bool _accept_char(const std::shared_ptr>& state, int_type ch) { if (ch == concurrency::streams::char_traits::eof() || isspace(ch)) return false; state->push_back(char(ch)); return true; } static pplx::task> _extract_result(std::shared_ptr> state) { return pplx::task_from_result(utility::conversions::utf8_to_utf16(*state)); } }; #endif //_WIN32 } // namespace streams } // namespace Concurrency #endif