Commit 14571fd2 authored by 藤森雅人's avatar 藤森雅人
Browse files

Initial commit

parents
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* This file defines a basic memory-based stream buffer, which allows consumer / producer pairs to communicate
* data via a buffer.
*
* For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#ifndef CASA_PRODUCER_CONSUMER_STREAMS_H
#define CASA_PRODUCER_CONSUMER_STREAMS_H
#include "cpprest/astreambuf.h"
#include "pplx/pplxtasks.h"
#include <algorithm>
#include <iterator>
#include <queue>
#include <vector>
namespace Concurrency
{
namespace streams
{
namespace details
{
/// <summary>
/// The basic_producer_consumer_buffer class serves as a memory-based steam buffer that supports both writing and
/// reading sequences of characters. It can be used as a consumer/producer buffer.
/// </summary>
template<typename _CharType>
class basic_producer_consumer_buffer : public streams::details::streambuf_state_manager<_CharType>
{
public:
typedef typename ::concurrency::streams::char_traits<_CharType> traits;
typedef typename basic_streambuf<_CharType>::int_type int_type;
typedef typename basic_streambuf<_CharType>::pos_type pos_type;
typedef typename basic_streambuf<_CharType>::off_type off_type;
/// <summary>
/// Constructor
/// </summary>
basic_producer_consumer_buffer(size_t alloc_size)
: streambuf_state_manager<_CharType>(std::ios_base::out | std::ios_base::in)
, m_alloc_size(alloc_size)
, m_allocBlock(nullptr)
, m_total(0)
, m_total_read(0)
, m_total_written(0)
, m_synced(0)
{
}
/// <summary>
/// Destructor
/// </summary>
virtual ~basic_producer_consumer_buffer()
{
// Note: there is no need to call 'wait()' on the result of close(),
// since we happen to know that close() will return without actually
// doing anything asynchronously. Should the implementation of _close_write()
// change in that regard, this logic may also have to change.
this->_close_read();
this->_close_write();
_ASSERTE(m_requests.empty());
m_blocks.clear();
}
/// <summary>
/// <c>can_seek<c/> is used to determine whether a stream buffer supports seeking.
/// </summary>
virtual bool can_seek() const { return false; }
/// <summary>
/// <c>has_size<c/> is used to determine whether a stream buffer supports size().
/// </summary>
virtual bool has_size() const { return false; }
/// <summary>
/// Get the stream buffer size, if one has been set.
/// </summary>
/// <param name="direction">The direction of buffering (in or out)</param>
/// <remarks>An implementation that does not support buffering will always return '0'.</remarks>
virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const { return 0; }
/// <summary>
/// Sets the stream buffer implementation to buffer or not buffer.
/// </summary>
/// <param name="size">The size to use for internal buffering, 0 if no buffering should be done.</param>
/// <param name="direction">The direction of buffering (in or out)</param>
/// <remarks>An implementation that does not support buffering will silently ignore calls to this function and it
/// will not have any effect on what is returned by subsequent calls to <see cref="::buffer_size method"
/// />.</remarks>
virtual void set_buffer_size(size_t, std::ios_base::openmode = std::ios_base::in) { return; }
/// <summary>
/// For any input stream, <c>in_avail</c> returns the number of characters that are immediately available
/// to be consumed without blocking. May be used in conjunction with <cref="::sbumpc method"/> to read data without
/// incurring the overhead of using tasks.
/// </summary>
virtual size_t in_avail() const { return m_total; }
/// <summary>
/// Gets the current read or write position in the stream.
/// </summary>
/// <param name="direction">The I/O direction to seek (see remarks)</param>
/// <returns>The current position. EOF if the operation fails.</returns>
/// <remarks>Some streams may have separate write and read cursors.
/// For such streams, the direction parameter defines whether to move the read or the write
/// cursor.</remarks>
virtual pos_type getpos(std::ios_base::openmode mode) const
{
if (((mode & std::ios_base::in) && !this->can_read()) || ((mode & std::ios_base::out) && !this->can_write()))
return static_cast<pos_type>(traits::eof());
if (mode == std::ios_base::in)
return (pos_type)m_total_read;
else if (mode == std::ios_base::out)
return (pos_type)m_total_written;
else
return (pos_type)traits::eof();
}
// Seeking is not supported
virtual pos_type seekpos(pos_type, std::ios_base::openmode) { return (pos_type)traits::eof(); }
virtual pos_type seekoff(off_type, std::ios_base::seekdir, std::ios_base::openmode)
{
return (pos_type)traits::eof();
}
/// <summary>
/// Allocates a contiguous memory block and returns it.
/// </summary>
/// <param name="count">The number of characters to allocate.</param>
/// <returns>A pointer to a block to write to, null if the stream buffer implementation does not support
/// alloc/commit.</returns>
virtual _CharType* _alloc(size_t count)
{
if (!this->can_write())
{
return nullptr;
}
// We always allocate a new block even if the count could be satisfied by
// the current write block. While this does lead to wasted space it allows for
// easier book keeping
_ASSERTE(!m_allocBlock);
m_allocBlock = std::make_shared<_block>(count);
return m_allocBlock->wbegin();
}
/// <summary>
/// Submits a block already allocated by the stream buffer.
/// </summary>
/// <param name="count">The number of characters to be committed.</param>
virtual void _commit(size_t count)
{
pplx::extensibility::scoped_critical_section_t l(m_lock);
// The count does not reflect the actual size of the block.
// Since we do not allow any more writes to this block it would suffice.
// If we ever change the algorithm to reuse blocks then this needs to be revisited.
_ASSERTE((bool)m_allocBlock);
m_allocBlock->update_write_head(count);
m_blocks.push_back(m_allocBlock);
m_allocBlock = nullptr;
update_write_head(count);
}
/// <summary>
/// Gets a pointer to the next already allocated contiguous block of data.
/// </summary>
/// <param name="ptr">A reference to a pointer variable that will hold the address of the block on success.</param>
/// <param name="count">The number of contiguous characters available at the address in 'ptr'.</param>
/// <returns><c>true</c> if the operation succeeded, <c>false</c> otherwise.</returns>
/// <remarks>
/// A return of false does not necessarily indicate that a subsequent read operation would fail, only that
/// there is no block to return immediately or that the stream buffer does not support the operation.
/// The stream buffer may not de-allocate the block until <see cref="::release method" /> is called.
/// If the end of the stream is reached, the function will return <c>true</c>, a null pointer, and a count of zero;
/// a subsequent read will not succeed.
/// </remarks>
virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count)
{
count = 0;
ptr = nullptr;
if (!this->can_read()) return false;
pplx::extensibility::scoped_critical_section_t l(m_lock);
if (m_blocks.empty())
{
// If the write head has been closed then have reached the end of the
// stream (return true), otherwise more data could be written later (return false).
return !this->can_write();
}
else
{
auto block = m_blocks.front();
count = block->rd_chars_left();
ptr = block->rbegin();
_ASSERTE(ptr != nullptr);
return true;
}
}
/// <summary>
/// Releases a block of data acquired using <see cref="::acquire method"/>. This frees the stream buffer to
/// de-allocate the memory, if it so desires. Move the read position ahead by the count.
/// </summary>
/// <param name="ptr">A pointer to the block of data to be released.</param>
/// <param name="count">The number of characters that were read.</param>
virtual void release(_Out_writes_opt_(count) _CharType* ptr, _In_ size_t count)
{
if (ptr == nullptr) return;
pplx::extensibility::scoped_critical_section_t l(m_lock);
auto block = m_blocks.front();
_ASSERTE(block->rd_chars_left() >= count);
block->m_read += count;
update_read_head(count);
}
protected:
virtual pplx::task<bool> _sync()
{
pplx::extensibility::scoped_critical_section_t l(m_lock);
m_synced = in_avail();
fulfill_outstanding();
return pplx::task_from_result(true);
}
virtual pplx::task<int_type> _putc(_CharType ch)
{
return pplx::task_from_result((this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof());
}
virtual pplx::task<size_t> _putn(const _CharType* ptr, size_t count)
{
return pplx::task_from_result<size_t>(this->write(ptr, count));
}
virtual pplx::task<size_t> _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
{
pplx::task_completion_event<size_t> tce;
enqueue_request(_request(count, [this, ptr, count, tce]() {
// VS 2010 resolves read to a global function. Explicit
// invocation through the "this" pointer fixes the issue.
tce.set(this->read(ptr, count));
}));
return pplx::create_task(tce);
}
virtual size_t _sgetn(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
{
pplx::extensibility::scoped_critical_section_t l(m_lock);
return can_satisfy(count) ? this->read(ptr, count) : (size_t)traits::requires_async();
}
virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
{
pplx::extensibility::scoped_critical_section_t l(m_lock);
return can_satisfy(count) ? this->read(ptr, count, false) : (size_t)traits::requires_async();
}
virtual pplx::task<int_type> _bumpc()
{
pplx::task_completion_event<int_type> tce;
enqueue_request(_request(1, [this, tce]() { tce.set(this->read_byte(true)); }));
return pplx::create_task(tce);
}
virtual int_type _sbumpc()
{
pplx::extensibility::scoped_critical_section_t l(m_lock);
return can_satisfy(1) ? this->read_byte(true) : traits::requires_async();
}
virtual pplx::task<int_type> _getc()
{
pplx::task_completion_event<int_type> tce;
enqueue_request(_request(1, [this, tce]() { tce.set(this->read_byte(false)); }));
return pplx::create_task(tce);
}
int_type _sgetc()
{
pplx::extensibility::scoped_critical_section_t l(m_lock);
return can_satisfy(1) ? this->read_byte(false) : traits::requires_async();
}
virtual pplx::task<int_type> _nextc()
{
pplx::task_completion_event<int_type> tce;
enqueue_request(_request(1, [this, tce]() {
this->read_byte(true);
tce.set(this->read_byte(false));
}));
return pplx::create_task(tce);
}
virtual pplx::task<int_type> _ungetc() { return pplx::task_from_result<int_type>(traits::eof()); }
private:
/// <summary>
/// Close the stream buffer for writing
/// </summary>
pplx::task<void> _close_write()
{
// First indicate that there could be no more writes.
// Fulfill outstanding relies on that to flush all the
// read requests.
this->m_stream_can_write = false;
{
pplx::extensibility::scoped_critical_section_t l(this->m_lock);
// This runs on the thread that called close.
this->fulfill_outstanding();
}
return pplx::task_from_result();
}
/// <summary>
/// Updates the write head by an offset specified by count
/// </summary>
/// <remarks>This should be called with the lock held</remarks>
void update_write_head(size_t count)
{
m_total += count;
m_total_written += count;
fulfill_outstanding();
}
/// <summary>
/// Writes count characters from ptr into the stream buffer
/// </summary>
size_t write(const _CharType* ptr, size_t count)
{
if (!this->can_write() || (count == 0)) return 0;
// If no one is going to read, why bother?
// Just pretend to be writing!
if (!this->can_read()) return count;
pplx::extensibility::scoped_critical_section_t l(m_lock);
// Allocate a new block if necessary
if (m_blocks.empty() || m_blocks.back()->wr_chars_left() < count)
{
msl::safeint3::SafeInt<size_t> alloc = m_alloc_size.Max(count);
m_blocks.push_back(std::make_shared<_block>(alloc));
}
// The block at the back is always the write head
auto last = m_blocks.back();
auto countWritten = last->write(ptr, count);
_ASSERTE(countWritten == count);
update_write_head(countWritten);
return countWritten;
}
/// <summary>
/// Fulfill pending requests
/// </summary>
/// <remarks>This should be called with the lock held</remarks>
void fulfill_outstanding()
{
while (!m_requests.empty())
{
auto req = m_requests.front();
// If we cannot satisfy the request then we need
// to wait for the producer to write data
if (!can_satisfy(req.size())) return;
// We have enough data to satisfy this request
req.complete();
// Remove it from the request queue
m_requests.pop();
}
}
/// <summary>
/// Represents a memory block
/// </summary>
class _block
{
public:
_block(size_t size) : m_read(0), m_pos(0), m_size(size), m_data(new _CharType[size]) {}
~_block() { delete[] m_data; }
// Read head
size_t m_read;
// Write head
size_t m_pos;
// Allocation size (of m_data)
size_t m_size;
// The data store
_CharType* m_data;
// Pointer to the read head
_CharType* rbegin() { return m_data + m_read; }
// Pointer to the write head
_CharType* wbegin() { return m_data + m_pos; }
// Read up to count characters from the block
size_t read(_Out_writes_(count) _CharType* dest, _In_ size_t count, bool advance = true)
{
msl::safeint3::SafeInt<size_t> avail(rd_chars_left());
auto countRead = static_cast<size_t>(avail.Min(count));
_CharType* beg = rbegin();
_CharType* end = rbegin() + countRead;
#if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0
// Avoid warning C4996: Use checked iterators under SECURE_SCL
std::copy(beg, end, stdext::checked_array_iterator<_CharType*>(dest, count));
#else
std::copy(beg, end, dest);
#endif // _WIN32
if (advance)
{
m_read += countRead;
}
return countRead;
}
// Write count characters into the block
size_t write(const _CharType* src, size_t count)
{
msl::safeint3::SafeInt<size_t> avail(wr_chars_left());
auto countWritten = static_cast<size_t>(avail.Min(count));
const _CharType* srcEnd = src + countWritten;
#if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0
// Avoid warning C4996: Use checked iterators under SECURE_SCL
std::copy(src, srcEnd, stdext::checked_array_iterator<_CharType*>(wbegin(), static_cast<size_t>(avail)));
#else
std::copy(src, srcEnd, wbegin());
#endif // _WIN32
update_write_head(countWritten);
return countWritten;
}
void update_write_head(size_t count) { m_pos += count; }
size_t rd_chars_left() const { return m_pos - m_read; }
size_t wr_chars_left() const { return m_size - m_pos; }
private:
// Copy is not supported
_block(const _block&);
_block& operator=(const _block&);
};
/// <summary>
/// Represents a request on the stream buffer - typically reads
/// </summary>
class _request
{
public:
typedef std::function<void()> func_type;
_request(size_t count, const func_type& func) : m_func(func), m_count(count) {}
void complete() { m_func(); }
size_t size() const { return m_count; }
private:
func_type m_func;
size_t m_count;
};
void enqueue_request(_request req)
{
pplx::extensibility::scoped_critical_section_t l(m_lock);
if (can_satisfy(req.size()))
{
// We can immediately fulfill the request.
req.complete();
}
else
{
// We must wait for data to arrive.
m_requests.push(req);
}
}
/// <summary>
/// Determine if the request can be satisfied.
/// </summary>
bool can_satisfy(size_t count) { return (m_synced > 0) || (this->in_avail() >= count) || !this->can_write(); }
/// <summary>
/// Reads a byte from the stream and returns it as int_type.
/// Note: This routine shall only be called if can_satisfy() returned true.
/// </summary>
/// <remarks>This should be called with the lock held</remarks>
int_type read_byte(bool advance = true)
{
_CharType value;
auto read_size = this->read(&value, 1, advance);
return read_size == 1 ? static_cast<int_type>(value) : traits::eof();
}
/// <summary>
/// Reads up to count characters into ptr and returns the count of characters copied.
/// The return value (actual characters copied) could be <= count.
/// Note: This routine shall only be called if can_satisfy() returned true.
/// </summary>
/// <remarks>This should be called with the lock held</remarks>
size_t read(_Out_writes_(count) _CharType* ptr, _In_ size_t count, bool advance = true)
{
_ASSERTE(can_satisfy(count));
size_t read = 0;
for (auto iter = begin(m_blocks); iter != std::end(m_blocks); ++iter)
{
auto block = *iter;
auto read_from_block = block->read(ptr + read, count - read, advance);
read += read_from_block;
_ASSERTE(count >= read);
if (read == count) break;
}
if (advance)
{
update_read_head(read);
}
return read;
}
/// <summary>
/// Updates the read head by the specified offset
/// </summary>
/// <remarks>This should be called with the lock held</remarks>
void update_read_head(size_t count)
{
m_total -= count;
m_total_read += count;
if (m_synced > 0) m_synced = (m_synced > count) ? (m_synced - count) : 0;
// The block at the front is always the read head.
// Purge empty blocks so that the block at the front reflects the read head
while (!m_blocks.empty())
{
// If front block is not empty - we are done
if (m_blocks.front()->rd_chars_left() > 0) break;
// The block has no more data to be read. Relase the block
m_blocks.pop_front();
}
}
// The in/out mode for the buffer
std::ios_base::openmode m_mode;
// Default block size
msl::safeint3::SafeInt<size_t> m_alloc_size;
// Block used for alloc/commit
std::shared_ptr<_block> m_allocBlock;
// Total available data
size_t m_total;
size_t m_total_read;
size_t m_total_written;
// Keeps track of the number of chars that have been flushed but still
// remain to be consumed by a read operation.
size_t m_synced;
// The producer-consumer buffer is intended to be used concurrently by a reader
// and a writer, who are not coordinating their accesses to the buffer (coordination
// being what the buffer is for in the first place). Thus, we have to protect
// against some of the internal data elements against concurrent accesses
// and the possibility of inconsistent states. A simple non-recursive lock
// should be sufficient for those purposes.
pplx::extensibility::critical_section_t m_lock;
// Memory blocks
std::deque<std::shared_ptr<_block>> m_blocks;
// Queue of requests
std::queue<_request> m_requests;
};
} // namespace details
/// <summary>
/// The producer_consumer_buffer class serves as a memory-based steam buffer that supports both writing and reading
/// sequences of bytes. It can be used as a consumer/producer buffer.
/// </summary>
/// <typeparam name="_CharType">
/// The data type of the basic element of the <c>producer_consumer_buffer</c>.
/// </typeparam>
/// <remarks>
/// This is a reference-counted version of basic_producer_consumer_buffer.</remarks>
template<typename _CharType>
class producer_consumer_buffer : public streambuf<_CharType>
{
public:
typedef _CharType char_type;
/// <summary>
/// Create a producer_consumer_buffer.
/// </summary>
/// <param name="alloc_size">The internal default block size.</param>
producer_consumer_buffer(size_t alloc_size = 512)
: streambuf<_CharType>(std::make_shared<details::basic_producer_consumer_buffer<_CharType>>(alloc_size))
{
}
};
} // namespace streams
} // namespace Concurrency
#endif
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* This file defines a stream buffer that is based on a raw pointer and block size. Unlike a vector-based
* stream buffer, the buffer cannot be expanded or contracted, it has a fixed capacity.
*
* For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#ifndef CASA_RAWPTR_STREAMS_H
#define CASA_RAWPTR_STREAMS_H
#include "cpprest/astreambuf.h"
#include "cpprest/streams.h"
#include "pplx/pplxtasks.h"
#include <algorithm>
#include <iterator>
#include <queue>
#include <vector>
namespace Concurrency
{
namespace streams
{
// Forward declarations
template<typename _CharType>
class rawptr_buffer;
namespace details
{
/// <summary>
/// The basic_rawptr_buffer class serves as a memory-based steam buffer that supports both writing and reading
/// sequences of characters to and from a fixed-size block.
/// </summary>
template<typename _CharType>
class basic_rawptr_buffer : public streams::details::streambuf_state_manager<_CharType>
{
public:
typedef _CharType char_type;
typedef typename basic_streambuf<_CharType>::traits traits;
typedef typename basic_streambuf<_CharType>::int_type int_type;
typedef typename basic_streambuf<_CharType>::pos_type pos_type;
typedef typename basic_streambuf<_CharType>::off_type off_type;
/// <summary>
/// Constructor
/// </summary>
basic_rawptr_buffer()
: streambuf_state_manager<_CharType>(std::ios_base::in | std::ios_base::out)
, m_data(nullptr)
, m_current_position(0)
, m_size(0)
{
}
/// <summary>
/// Destructor
/// </summary>
virtual ~basic_rawptr_buffer()
{
this->_close_read();
this->_close_write();
}
protected:
/// <summary>
/// can_seek is used to determine whether a stream buffer supports seeking.
/// </summary>
virtual bool can_seek() const { return this->is_open(); }
/// <summary>
/// <c>has_size<c/> is used to determine whether a stream buffer supports size().
/// </summary>
virtual bool has_size() const { return this->is_open(); }
/// <summary>
/// Gets the size of the stream, if known. Calls to <c>has_size</c> will determine whether
/// the result of <c>size</c> can be relied on.
/// </summary>
virtual utility::size64_t size() const { return utility::size64_t(m_size); }
/// <summary>
/// Get the stream buffer size, if one has been set.
/// </summary>
/// <param name="direction">The direction of buffering (in or out)</param>
/// <remarks>An implementation that does not support buffering will always return '0'.</remarks>
virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const { return 0; }
/// <summary>
/// Set the stream buffer implementation to buffer or not buffer.
/// </summary>
/// <param name="size">The size to use for internal buffering, 0 if no buffering should be done.</param>
/// <param name="direction">The direction of buffering (in or out)</param>
/// <remarks>An implementation that does not support buffering will silently ignore calls to this function and it
/// will not have
/// any effect on what is returned by subsequent calls to buffer_size().</remarks>
virtual void set_buffer_size(size_t, std::ios_base::openmode = std::ios_base::in) { return; }
/// <summary>
/// For any input stream, in_avail returns the number of characters that are immediately available
/// to be consumed without blocking. May be used in conjunction with <cref="::sbumpc method"/> and sgetn() to
/// read data without incurring the overhead of using tasks.
/// </summary>
virtual size_t in_avail() const
{
// See the comment in seek around the restiction that we do not allow read head to
// seek beyond the current size.
_ASSERTE(m_current_position <= m_size);
msl::safeint3::SafeInt<size_t> readhead(m_current_position);
msl::safeint3::SafeInt<size_t> writeend(m_size);
return (size_t)(writeend - readhead);
}
/// <summary>
/// Closes the stream buffer, preventing further read or write operations.
/// </summary>
/// <param name="mode">The I/O mode (in or out) to close for.</param>
virtual pplx::task<void> close(std::ios_base::openmode mode)
{
if (mode & std::ios_base::in)
{
this->_close_read().get(); // Safe to call get() here.
}
if (mode & std::ios_base::out)
{
this->_close_write().get(); // Safe to call get() here.
}
if (!this->can_read() && !this->can_write())
{
m_data = nullptr;
}
// Exceptions will be propagated out of _close_read or _close_write
return pplx::task_from_result();
}
virtual pplx::task<bool> _sync() { return pplx::task_from_result(true); }
virtual pplx::task<int_type> _putc(_CharType ch)
{
if (m_current_position >= m_size) return pplx::task_from_result<int_type>(traits::eof());
int_type retVal = (this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof();
return pplx::task_from_result<int_type>(retVal);
}
virtual pplx::task<size_t> _putn(const _CharType* ptr, size_t count)
{
msl::safeint3::SafeInt<size_t> newSize = msl::safeint3::SafeInt<size_t>(count) + m_current_position;
if (newSize > m_size)
return pplx::task_from_exception<size_t>(
std::make_exception_ptr(std::runtime_error("Writing past the end of the buffer")));
return pplx::task_from_result<size_t>(this->write(ptr, count));
}
/// <summary>
/// Allocates a contiguous memory block and returns it.
/// </summary>
/// <param name="count">The number of characters to allocate.</param>
/// <returns>A pointer to a block to write to, null if the stream buffer implementation does not support
/// alloc/commit.</returns>
_CharType* _alloc(size_t count)
{
if (!this->can_write()) return nullptr;
msl::safeint3::SafeInt<size_t> readhead(m_current_position);
msl::safeint3::SafeInt<size_t> writeend(m_size);
size_t space_left = (size_t)(writeend - readhead);
if (space_left < count) return nullptr;
// Let the caller copy the data
return (_CharType*)(m_data + m_current_position);
}
/// <summary>
/// Submits a block already allocated by the stream buffer.
/// </summary>
/// <param name="count">The number of characters to be committed.</param>
void _commit(size_t actual)
{
// Update the write position and satisfy any pending reads
update_current_position(m_current_position + actual);
}
/// <summary>
/// Gets a pointer to the next already allocated contiguous block of data.
/// </summary>
/// <param name="ptr">A reference to a pointer variable that will hold the address of the block on success.</param>
/// <param name="count">The number of contiguous characters available at the address in 'ptr'.</param>
/// <returns><c>true</c> if the operation succeeded, <c>false</c> otherwise.</returns>
/// <remarks>
/// A return of false does not necessarily indicate that a subsequent read operation would fail, only that
/// there is no block to return immediately or that the stream buffer does not support the operation.
/// The stream buffer may not de-allocate the block until <see cref="::release method" /> is called.
/// If the end of the stream is reached, the function will return <c>true</c>, a null pointer, and a count of zero;
/// a subsequent read will not succeed.
/// </remarks>
virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count)
{
count = 0;
ptr = nullptr;
if (!this->can_read()) return false;
count = in_avail();
if (count > 0)
{
ptr = (_CharType*)(m_data + m_current_position);
return true;
}
else
{
ptr = nullptr;
// Can only be open for read OR write, not both. If there is no data then
// we have reached the end of the stream so indicate such with true.
return true;
}
}
/// <summary>
/// Releases a block of data acquired using <see cref="::acquire method"/>. This frees the stream buffer to
/// de-allocate the memory, if it so desires. Move the read position ahead by the count.
/// </summary>
/// <param name="ptr">A pointer to the block of data to be released.</param>
/// <param name="count">The number of characters that were read.</param>
virtual void release(_Out_writes_opt_(count) _CharType* ptr, _In_ size_t count)
{
if (ptr != nullptr) update_current_position(m_current_position + count);
}
virtual pplx::task<size_t> _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
{
return pplx::task_from_result(this->read(ptr, count));
}
size_t _sgetn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return this->read(ptr, count); }
virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
{
return this->read(ptr, count, false);
}
virtual pplx::task<int_type> _bumpc() { return pplx::task_from_result(this->read_byte(true)); }
virtual int_type _sbumpc() { return this->read_byte(true); }
virtual pplx::task<int_type> _getc() { return pplx::task_from_result(this->read_byte(false)); }
int_type _sgetc() { return this->read_byte(false); }
virtual pplx::task<int_type> _nextc()
{
if (m_current_position >= m_size - 1) return pplx::task_from_result(basic_streambuf<_CharType>::traits::eof());
this->read_byte(true);
return pplx::task_from_result(this->read_byte(false));
}
virtual pplx::task<int_type> _ungetc()
{
auto pos = seekoff(-1, std::ios_base::cur, std::ios_base::in);
if (pos == (pos_type)traits::eof()) return pplx::task_from_result(traits::eof());
return this->getc();
}
/// <summary>
/// Gets the current read or write position in the stream.
/// </summary>
/// <param name="direction">The I/O direction to seek (see remarks)</param>
/// <returns>The current position. EOF if the operation fails.</returns>
/// <remarks>Some streams may have separate write and read cursors.
/// For such streams, the direction parameter defines whether to move the read or the write
/// cursor.</remarks>
virtual pos_type getpos(std::ios_base::openmode mode) const
{
if (((mode & std::ios_base::in) && !this->can_read()) || ((mode & std::ios_base::out) && !this->can_write()))
return static_cast<pos_type>(traits::eof());
if (mode == std::ios_base::in)
return (pos_type)m_current_position;
else if (mode == std::ios_base::out)
return (pos_type)m_current_position;
else
return (pos_type)traits::eof();
}
/// <summary>
/// Seeks to the given position.
/// </summary>
/// <param name="pos">The offset from the beginning of the stream.</param>
/// <param name="direction">The I/O direction to seek (see remarks).</param>
/// <returns>The position. EOF if the operation fails.</returns>
/// <remarks>Some streams may have separate write and read cursors. For such streams, the direction parameter
/// defines whether to move the read or the write cursor.</remarks>
virtual pos_type seekpos(pos_type position, std::ios_base::openmode mode)
{
pos_type beg(0);
pos_type end(m_size);
if (position >= beg)
{
auto pos = static_cast<size_t>(position);
// Read head
if ((mode & std::ios_base::in) && this->can_read())
{
if (position <= end)
{
// We do not allow reads to seek beyond the end or before the start position.
update_current_position(pos);
return static_cast<pos_type>(m_current_position);
}
}
// Write head
if ((mode & std::ios_base::out) && this->can_write())
{
// Update write head and satisfy read requests if any
update_current_position(pos);
return static_cast<pos_type>(m_current_position);
}
}
return static_cast<pos_type>(traits::eof());
}
/// <summary>
/// Seeks to a position given by a relative offset.
/// </summary>
/// <param name="offset">The relative position to seek to</param>
/// <param name="way">The starting point (beginning, end, current) for the seek.</param>
/// <param name="mode">The I/O direction to seek (see remarks)</param>
/// <returns>The position. EOF if the operation fails.</returns>
/// <remarks>Some streams may have separate write and read cursors.
/// For such streams, the mode parameter defines whether to move the read or the write cursor.</remarks>
virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
{
pos_type beg = 0;
pos_type cur = static_cast<pos_type>(m_current_position);
pos_type end = static_cast<pos_type>(m_size);
switch (way)
{
case std::ios_base::beg: return seekpos(beg + offset, mode);
case std::ios_base::cur: return seekpos(cur + offset, mode);
case std::ios_base::end: return seekpos(end + offset, mode);
default: return static_cast<pos_type>(traits::eof());
}
}
private:
template<typename _CharType1>
friend class ::concurrency::streams::rawptr_buffer;
/// <summary>
/// Constructor
/// </summary>
/// <param name="data">The address (pointer to) the memory block.</param>
/// <param name="size">The memory block size, measured in number of characters.</param>
basic_rawptr_buffer(const _CharType* data, size_t size)
: streambuf_state_manager<_CharType>(std::ios_base::in)
, m_data(const_cast<_CharType*>(data))
, m_size(size)
, m_current_position(0)
{
validate_mode(std::ios_base::in);
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="data">The address (pointer to) the memory block.</param>
/// <param name="size">The memory block size, measured in number of characters.</param>
/// <param name="mode">The stream mode (in, out, etc.).</param>
basic_rawptr_buffer(_CharType* data, size_t size, std::ios_base::openmode mode)
: streambuf_state_manager<_CharType>(mode), m_data(data), m_size(size), m_current_position(0)
{
validate_mode(mode);
}
static void validate_mode(std::ios_base::openmode mode)
{
// Disallow simultaneous use of the stream buffer for writing and reading.
if ((mode & std::ios_base::in) && (mode & std::ios_base::out))
throw std::invalid_argument("this combination of modes on raw pointer stream not supported");
}
/// <summary>
/// Determines if the request can be satisfied.
/// </summary>
bool can_satisfy(size_t) const
{
// We can always satisfy a read, at least partially, unless the
// read position is at the very end of the buffer.
return (in_avail() > 0);
}
/// <summary>
/// Reads a byte from the stream and returns it as int_type.
/// Note: This routine must only be called if can_satisfy() returns true.
/// </summary>
int_type read_byte(bool advance = true)
{
_CharType value;
auto read_size = this->read(&value, 1, advance);
return read_size == 1 ? static_cast<int_type>(value) : traits::eof();
}
/// <summary>
/// Reads up to count characters into ptr and returns the count of characters copied.
/// The return value (actual characters copied) could be <= count.
/// Note: This routine must only be called if can_satisfy() returns true.
/// </summary>
size_t read(_Out_writes_(count) _CharType* ptr, _In_ size_t count, bool advance = true)
{
if (!can_satisfy(count)) return 0;
msl::safeint3::SafeInt<size_t> request_size(count);
msl::safeint3::SafeInt<size_t> read_size = request_size.Min(in_avail());
size_t newPos = m_current_position + read_size;
auto readBegin = m_data + m_current_position;
auto readEnd = m_data + newPos;
#if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0
// Avoid warning C4996: Use checked iterators under SECURE_SCL
std::copy(readBegin, readEnd, stdext::checked_array_iterator<_CharType*>(ptr, count));
#else
std::copy(readBegin, readEnd, ptr);
#endif // _WIN32
if (advance)
{
update_current_position(newPos);
}
return (size_t)read_size;
}
/// <summary>
/// Write count characters from the ptr into the stream buffer
/// </summary>
size_t write(const _CharType* ptr, size_t count)
{
if (!this->can_write() || (count == 0)) return 0;
msl::safeint3::SafeInt<size_t> newSize = msl::safeint3::SafeInt<size_t>(count) + m_current_position;
if (newSize > m_size) throw std::runtime_error("Writing past the end of the buffer");
// Copy the data
#if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0
// Avoid warning C4996: Use checked iterators under SECURE_SCL
std::copy(ptr, ptr + count, stdext::checked_array_iterator<_CharType*>(m_data, m_size, m_current_position));
#else
std::copy(ptr, ptr + count, m_data + m_current_position);
#endif // _WIN32
// Update write head and satisfy pending reads if any
update_current_position(newSize);
return count;
}
/// <summary>
/// Updates the current read or write position
/// </summary>
void update_current_position(size_t newPos)
{
// The new write head
m_current_position = newPos;
_ASSERTE(m_current_position <= m_size);
}
// The actual memory block
_CharType* m_data;
// The size of the memory block
size_t m_size;
// Read/write head
size_t m_current_position;
};
} // namespace details
/// <summary>
/// The <c>rawptr_buffer</c> class serves as a memory-based stream buffer that supports reading
/// sequences of characters to or from a fixed-size block. Note that it cannot be used simultaneously for reading as
/// well as writing.
/// </summary>
/// <typeparam name="_CharType">
/// The data type of the basic element of the <c>rawptr_buffer</c>.
/// </typeparam>
template<typename _CharType>
class rawptr_buffer : public streambuf<_CharType>
{
public:
typedef _CharType char_type;
/// <summary>
/// Create a rawptr_buffer given a pointer to a memory block and the size of the block.
/// </summary>
/// <param name="data">The address (pointer to) the memory block.</param>
/// <param name="size">The memory block size, measured in number of characters.</param>
rawptr_buffer(const char_type* data, size_t size)
: streambuf<char_type>(std::shared_ptr<details::basic_rawptr_buffer<char_type>>(
new details::basic_rawptr_buffer<char_type>(data, size)))
{
}
/// <summary>
/// Create a rawptr_buffer given a pointer to a memory block and the size of the block.
/// </summary>
/// <param name="data">The address (pointer to) the memory block.</param>
/// <param name="size">The memory block size, measured in number of characters.</param>
rawptr_buffer(char_type* data, size_t size, std::ios_base::openmode mode = std::ios::out)
: streambuf<char_type>(std::shared_ptr<details::basic_rawptr_buffer<char_type>>(
new details::basic_rawptr_buffer<char_type>(data, size, mode)))
{
}
/// <summary>
/// Default constructor.
/// </summary>
rawptr_buffer() {}
};
/// <summary>
/// The rawptr_stream class is used to create memory-backed streams that support writing or reading
/// sequences of characters to / from a fixed-size block.
/// </summary>
/// <typeparam name="_CharType">
/// The data type of the basic element of the <c>rawptr_stream</c>.
/// </typeparam>
template<typename _CharType>
class rawptr_stream
{
public:
typedef _CharType char_type;
typedef rawptr_buffer<_CharType> buffer_type;
/// <summary>
/// Create a rawptr-stream given a pointer to a read-only memory block and the size of the block.
/// </summary>
/// <param name="data">The address (pointer to) the memory block.</param>
/// <param name="size">The memory block size, measured in number of characters.</param>
/// <returns>An opened input stream.</returns>
static concurrency::streams::basic_istream<char_type> open_istream(const char_type* data, size_t size)
{
return concurrency::streams::basic_istream<char_type>(buffer_type(data, size));
}
/// <summary>
/// Create a rawptr-stream given a pointer to a writable memory block and the size of the block.
/// </summary>
/// <param name="data">The address (pointer to) the memory block.</param>
/// <param name="size">The memory block size, measured in number of characters.</param>
/// <returns>An opened input stream.</returns>
static concurrency::streams::basic_istream<char_type> open_istream(char_type* data, size_t size)
{
return concurrency::streams::basic_istream<char_type>(buffer_type(data, size, std::ios::in));
}
/// <summary>
/// Create a rawptr-stream given a pointer to a writable memory block and the size of the block.
/// </summary>
/// <param name="data">The address (pointer to) the memory block.</param>
/// <param name="size">The memory block size, measured in number of characters.</param>
/// <returns>An opened output stream.</returns>
static concurrency::streams::basic_ostream<char_type> open_ostream(char_type* data, size_t size)
{
return concurrency::streams::basic_ostream<char_type>(buffer_type(data, size, std::ios::out));
}
};
} // namespace streams
} // namespace Concurrency
#endif
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* Asynchronous I/O: streams API, used for formatted input and output, based on unformatted I/O using stream buffers
*
* For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#ifndef CASA_STREAMS_H
#define CASA_STREAMS_H
#include "cpprest/astreambuf.h"
#include <iosfwd>
namespace Concurrency
{
namespace streams
{
template<typename CharType>
class basic_ostream;
template<typename CharType>
class basic_istream;
namespace details
{
template<typename CharType>
class basic_ostream_helper
{
public:
basic_ostream_helper(streams::streambuf<CharType> buffer) : m_buffer(buffer) {}
~basic_ostream_helper() {}
private:
template<typename CharType1>
friend class streams::basic_ostream;
concurrency::streams::streambuf<CharType> m_buffer;
};
template<typename CharType>
class basic_istream_helper
{
public:
basic_istream_helper(streams::streambuf<CharType> buffer) : m_buffer(buffer) {}
~basic_istream_helper() {}
private:
template<typename CharType1>
friend class streams::basic_istream;
concurrency::streams::streambuf<CharType> m_buffer;
};
template<typename CharType>
struct Value2StringFormatter
{
template<typename T>
static std::basic_string<CharType> format(const T& val)
{
std::basic_ostringstream<CharType> ss;
ss << val;
return ss.str();
}
};
template<>
struct Value2StringFormatter<uint8_t>
{
template<typename T>
static std::basic_string<uint8_t> format(const T& val)
{
std::basic_ostringstream<char> ss;
ss << val;
return reinterpret_cast<const uint8_t*>(ss.str().c_str());
}
static std::basic_string<uint8_t> format(const utf16string& val)
{
return format(utility::conversions::utf16_to_utf8(val));
}
};
static const char* _in_stream_msg = "stream not set up for input of data";
static const char* _in_streambuf_msg = "stream buffer not set up for input of data";
static const char* _out_stream_msg = "stream not set up for output of data";
static const char* _out_streambuf_msg = "stream buffer not set up for output of data";
} // namespace details
/// <summary>
/// Base interface for all asynchronous output streams.
/// </summary>
template<typename CharType>
class basic_ostream
{
public:
typedef char_traits<CharType> traits;
typedef typename traits::int_type int_type;
typedef typename traits::pos_type pos_type;
typedef typename traits::off_type off_type;
/// <summary>
/// Default constructor
/// </summary>
basic_ostream() {}
/// <summary>
/// Copy constructor
/// </summary>
/// <param name="other">The source object</param>
basic_ostream(const basic_ostream& other) : m_helper(other.m_helper) {}
/// <summary>
/// Assignment operator
/// </summary>
/// <param name="other">The source object</param>
/// <returns>A reference to the stream object that contains the result of the assignment.</returns>
basic_ostream& operator=(const basic_ostream& other)
{
m_helper = other.m_helper;
return *this;
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="buffer">A stream buffer.</param>
basic_ostream(streams::streambuf<CharType> buffer)
: m_helper(std::make_shared<details::basic_ostream_helper<CharType>>(buffer))
{
_verify_and_throw(details::_out_streambuf_msg);
}
/// <summary>
/// Close the stream, preventing further write operations.
/// </summary>
pplx::task<void> close() const
{
return is_valid() ? helper()->m_buffer.close(std::ios_base::out) : pplx::task_from_result();
}
/// <summary>
/// Close the stream with exception, preventing further write operations.
/// </summary>
/// <param name="eptr">Pointer to the exception.</param>
pplx::task<void> close(std::exception_ptr eptr) const
{
return is_valid() ? helper()->m_buffer.close(std::ios_base::out, eptr) : pplx::task_from_result();
}
/// <summary>
/// Put a single character into the stream.
/// </summary>
/// <param name="ch">A character</param>
pplx::task<int_type> write(CharType ch) const
{
pplx::task<int_type> result;
if (!_verify_and_return_task(details::_out_stream_msg, result)) return result;
return helper()->m_buffer.putc(ch);
}
/// <summary>
/// Write a single value of "blittable" type T into the stream.
/// </summary>
/// <param name="value">A value of type T.</param>
/// <remarks>
/// This is not a replacement for a proper binary serialization solution, but it may
/// form the foundation for one. Writing data bit-wise to a stream is a primitive
/// operation of binary serialization.
/// Currently, no attention is paid to byte order. All data is written in the platform's
/// native byte order, which means little-endian on all platforms that have been tested.
/// This function is only available for streams using a single-byte character size.
/// </remarks>
template<typename T>
CASABLANCA_DEPRECATED(
"Unsafe API that will be removed in future releases, use one of the other write overloads instead.")
pplx::task<size_t> write(T value) const
{
static_assert(sizeof(CharType) == 1, "binary write is only supported for single-byte streams");
static_assert(std::is_trivial<T>::value, "unsafe to use with non-trivial types");
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_out_stream_msg, result)) return result;
auto copy = std::make_shared<T>(std::move(value));
return helper()
->m_buffer.putn_nocopy((CharType*)copy.get(), sizeof(T))
.then([copy](pplx::task<size_t> op) -> size_t { return op.get(); });
}
/// <summary>
/// Write a number of characters from a given stream buffer into the stream.
/// </summary>
/// <param name="source">A source stream buffer.</param>
/// <param name="count">The number of characters to write.</param>
pplx::task<size_t> write(streams::streambuf<CharType> source, size_t count) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_out_stream_msg, result)) return result;
if (!source.can_read())
return pplx::task_from_exception<size_t>(
std::make_exception_ptr(std::runtime_error("source buffer not set up for input of data")));
if (count == 0) return pplx::task_from_result((size_t)0);
auto buffer = helper()->m_buffer;
auto data = buffer.alloc(count);
if (data != nullptr)
{
auto post_read = [buffer](pplx::task<size_t> op) -> pplx::task<size_t> {
auto b = buffer;
b.commit(op.get());
return op;
};
return source.getn(data, count).then(post_read);
}
else
{
size_t available = 0;
const bool acquired = source.acquire(data, available);
if (available >= count)
{
auto post_write = [source, data](pplx::task<size_t> op) -> pplx::task<size_t> {
auto s = source;
s.release(data, op.get());
return op;
};
return buffer.putn_nocopy(data, count).then(post_write);
}
else
{
// Always have to release if acquire returned true.
if (acquired)
{
source.release(data, 0);
}
std::shared_ptr<CharType> buf(new CharType[count], [](CharType* buf) { delete[] buf; });
auto post_write = [buf](pplx::task<size_t> op) -> pplx::task<size_t> { return op; };
auto post_read = [buf, post_write, buffer](pplx::task<size_t> op) -> pplx::task<size_t> {
auto b = buffer;
return b.putn_nocopy(buf.get(), op.get()).then(post_write);
};
return source.getn(buf.get(), count).then(post_read);
}
}
}
/// <summary>
/// Write the specified string to the output stream.
/// </summary>
/// <param name="str">Input string.</param>
pplx::task<size_t> print(const std::basic_string<CharType>& str) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_out_stream_msg, result)) return result;
if (str.empty())
{
return pplx::task_from_result<size_t>(0);
}
else
{
auto sharedStr = std::make_shared<std::basic_string<CharType>>(str);
return helper()->m_buffer.putn_nocopy(sharedStr->c_str(), sharedStr->size()).then([sharedStr](size_t size) {
return size;
});
}
}
/// <summary>
/// Write a value of type <c>T</c> to the output stream.
/// </summary>
/// <typeparam name="T">
/// The data type of the object to be written to the stream
/// </typeparam>
/// <param name="val">Input object.</param>
template<typename T>
pplx::task<size_t> print(const T& val) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_out_stream_msg, result)) return result;
// TODO in the future this could be improved to have Value2StringFormatter avoid another unnecessary copy
// by putting the string on the heap before calling the print string overload.
return print(details::Value2StringFormatter<CharType>::format(val));
}
/// <summary>
/// Write a value of type <c>T</c> to the output stream and append a newline character.
/// </summary>
/// <typeparam name="T">
/// The data type of the object to be written to the stream
/// </typeparam>
/// <param name="val">Input object.</param>
template<typename T>
pplx::task<size_t> print_line(const T& val) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_out_stream_msg, result)) return result;
auto str = details::Value2StringFormatter<CharType>::format(val);
str.push_back(CharType('\n'));
return print(str);
}
/// <summary>
/// Flush any buffered output data.
/// </summary>
pplx::task<void> flush() const
{
pplx::task<void> result;
if (!_verify_and_return_task(details::_out_stream_msg, result)) return result;
return helper()->m_buffer.sync();
}
/// <summary>
/// Seeks to the specified write position.
/// </summary>
/// <param name="pos">An offset relative to the beginning of the stream.</param>
/// <returns>The new position in the stream.</returns>
pos_type seek(pos_type pos) const
{
_verify_and_throw(details::_out_stream_msg);
return helper()->m_buffer.seekpos(pos, std::ios_base::out);
}
/// <summary>
/// Seeks to the specified write position.
/// </summary>
/// <param name="off">An offset relative to the beginning, current write position, or the end of the stream.</param>
/// <param name="way">The starting point (beginning, current, end) for the seek.</param>
/// <returns>The new position in the stream.</returns>
pos_type seek(off_type off, std::ios_base::seekdir way) const
{
_verify_and_throw(details::_out_stream_msg);
return helper()->m_buffer.seekoff(off, way, std::ios_base::out);
}
/// <summary>
/// Get the current write position, i.e. the offset from the beginning of the stream.
/// </summary>
/// <returns>The current write position.</returns>
pos_type tell() const
{
_verify_and_throw(details::_out_stream_msg);
return helper()->m_buffer.getpos(std::ios_base::out);
}
/// <summary>
/// <c>can_seek<c/> is used to determine whether the stream supports seeking.
/// </summary>
/// <returns><c>true</c> if the stream supports seeking, <c>false</c> otherwise.</returns>
bool can_seek() const { return is_valid() && m_helper->m_buffer.can_seek(); }
/// <summary>
/// Test whether the stream has been initialized with a valid stream buffer.
/// </summary>
/// <returns><c>true</c> if the stream has been initialized with a valid stream buffer, <c>false</c>
/// otherwise.</returns>
bool is_valid() const { return (m_helper != nullptr) && ((bool)m_helper->m_buffer); }
/// <summary>
/// Test whether the stream has been initialized or not.
/// </summary>
operator bool() const { return is_valid(); }
/// <summary>
/// Test whether the stream is open for writing.
/// </summary>
/// <returns><c>true</c> if the stream is open for writing, <c>false</c> otherwise.</returns>
bool is_open() const { return is_valid() && m_helper->m_buffer.can_write(); }
/// <summary>
/// Get the underlying stream buffer.
/// </summary>
/// <returns>The underlying stream buffer.</returns>
concurrency::streams::streambuf<CharType> streambuf() const { return helper()->m_buffer; }
protected:
void set_helper(std::shared_ptr<details::basic_ostream_helper<CharType>> helper) { m_helper = helper; }
private:
template<typename T>
bool _verify_and_return_task(const char* msg, pplx::task<T>& tsk) const
{
auto buffer = helper()->m_buffer;
if (!(buffer.exception() == nullptr))
{
tsk = pplx::task_from_exception<T>(buffer.exception());
return false;
}
if (!buffer.can_write())
{
tsk = pplx::task_from_exception<T>(std::make_exception_ptr(std::runtime_error(msg)));
return false;
}
return true;
}
void _verify_and_throw(const char* msg) const
{
auto buffer = helper()->m_buffer;
if (!(buffer.exception() == nullptr)) std::rethrow_exception(buffer.exception());
if (!buffer.can_write()) throw std::runtime_error(msg);
}
std::shared_ptr<details::basic_ostream_helper<CharType>> helper() const
{
if (!m_helper) throw std::logic_error("uninitialized stream object");
return m_helper;
}
std::shared_ptr<details::basic_ostream_helper<CharType>> m_helper;
};
template<typename int_type>
struct _type_parser_integral_traits
{
typedef std::false_type _is_integral;
typedef std::false_type _is_unsigned;
};
#ifdef _WIN32
#define _INT_TRAIT(_t, _low, _high) \
template<> \
struct _type_parser_integral_traits<_t> \
{ \
typedef std::true_type _is_integral; \
typedef std::false_type _is_unsigned; \
static const int64_t _min = _low; \
static const int64_t _max = _high; \
};
#define _UINT_TRAIT(_t, _low, _high) \
template<> \
struct _type_parser_integral_traits<_t> \
{ \
typedef std::true_type _is_integral; \
typedef std::true_type _is_unsigned; \
static const uint64_t _max = _high; \
};
_INT_TRAIT(char, INT8_MIN, INT8_MAX)
_INT_TRAIT(signed char, INT8_MIN, INT8_MAX)
_INT_TRAIT(short, INT16_MIN, INT16_MAX)
#if defined(_NATIVE_WCHAR_T_DEFINED)
_INT_TRAIT(wchar_t, WCHAR_MIN, WCHAR_MAX)
#endif
_INT_TRAIT(int, INT32_MIN, INT32_MAX)
_INT_TRAIT(long, LONG_MIN, LONG_MAX)
_INT_TRAIT(long long, LLONG_MIN, LLONG_MAX)
_UINT_TRAIT(unsigned char, UINT8_MIN, UINT8_MAX)
_UINT_TRAIT(unsigned short, UINT16_MIN, UINT16_MAX)
_UINT_TRAIT(unsigned int, UINT32_MIN, UINT32_MAX)
_UINT_TRAIT(unsigned long, ULONG_MIN, ULONG_MAX)
_UINT_TRAIT(unsigned long long, ULLONG_MIN, ULLONG_MAX)
#else
#define _INT_TRAIT(_t) \
template<> \
struct _type_parser_integral_traits<_t> \
{ \
typedef std::true_type _is_integral; \
typedef std::false_type _is_unsigned; \
static const int64_t _min = std::numeric_limits<_t>::min(); \
static const int64_t _max = (std::numeric_limits<_t>::max)(); \
};
#define _UINT_TRAIT(_t) \
template<> \
struct _type_parser_integral_traits<_t> \
{ \
typedef std::true_type _is_integral; \
typedef std::true_type _is_unsigned; \
static const uint64_t _max = (std::numeric_limits<_t>::max)(); \
};
_INT_TRAIT(char)
_INT_TRAIT(signed char)
_INT_TRAIT(short)
_INT_TRAIT(utf16char)
_INT_TRAIT(int)
_INT_TRAIT(long)
_INT_TRAIT(long long)
_UINT_TRAIT(unsigned char)
_UINT_TRAIT(unsigned short)
_UINT_TRAIT(unsigned int)
_UINT_TRAIT(unsigned long)
_UINT_TRAIT(unsigned long long)
#endif
template<typename CharType>
class _type_parser_base
{
public:
typedef char_traits<CharType> traits;
typedef typename traits::int_type int_type;
_type_parser_base() {}
protected:
// Aid in parsing input: skipping whitespace characters.
static pplx::task<void> _skip_whitespace(streams::streambuf<CharType> buffer);
// Aid in parsing input: peek at a character at a time, call type-specific code to examine, extract value when done.
// <remark>AcceptFunctor should model std::function<bool(std::shared_ptr<X>, int_type)></remark>
// <remark>ExtractFunctor should model std::function<pplx::task<ReturnType>(std::shared_ptr<X>)></remark>
template<typename StateType, typename ReturnType, typename AcceptFunctor, typename ExtractFunctor>
static pplx::task<ReturnType> _parse_input(streams::streambuf<CharType> buffer,
AcceptFunctor accept_character,
ExtractFunctor extract);
};
/// <summary>
/// Class used to handle asynchronous parsing for basic_istream::extract. To support new
/// types create a new template specialization and implement the parse function.
/// </summary>
template<typename CharType, typename T>
class type_parser
{
public:
static pplx::task<T> parse(streams::streambuf<CharType> buffer)
{
typename _type_parser_integral_traits<T>::_is_integral ii;
typename _type_parser_integral_traits<T>::_is_unsigned ui;
return _parse(buffer, ii, ui);
}
private:
static pplx::task<T> _parse(streams::streambuf<CharType> buffer, std::false_type, std::false_type)
{
_parse_floating_point(buffer);
}
static pplx::task<T> _parse(streams::streambuf<CharType>, std::false_type, std::true_type)
{
#ifdef _WIN32
static_assert(false, "type is not supported for extraction from a stream");
#else
throw std::runtime_error("type is not supported for extraction from a stream");
#endif
}
static pplx::task<T> _parse(streams::streambuf<CharType> buffer, std::true_type, std::false_type)
{
return type_parser<CharType, int64_t>::parse(buffer).then([](pplx::task<int64_t> op) -> T {
int64_t val = op.get();
if (val <= _type_parser_integral_traits<T>::_max && val >= _type_parser_integral_traits<T>::_min)
return (T)val;
else
throw std::range_error("input out of range for target type");
});
}
static pplx::task<T> _parse(streams::streambuf<CharType> buffer, std::true_type, std::true_type)
{
return type_parser<CharType, uint64_t>::parse(buffer).then([](pplx::task<uint64_t> op) -> T {
uint64_t val = op.get();
if (val <= _type_parser_integral_traits<T>::_max)
return (T)val;
else
throw std::range_error("input out of range for target type");
});
}
};
/// <summary>
/// Base interface for all asynchronous input streams.
/// </summary>
template<typename CharType>
class basic_istream
{
public:
typedef char_traits<CharType> traits;
typedef typename traits::int_type int_type;
typedef typename traits::pos_type pos_type;
typedef typename traits::off_type off_type;
/// <summary>
/// Default constructor
/// </summary>
basic_istream() {}
/// <summary>
/// Constructor
/// </summary>
/// <typeparam name="CharType">
/// The data type of the basic element of the stream.
/// </typeparam>
/// <param name="buffer">A stream buffer.</param>
template<class AlterCharType>
basic_istream(streams::streambuf<AlterCharType> buffer)
: m_helper(std::make_shared<details::basic_istream_helper<CharType>>(std::move(buffer)))
{
_verify_and_throw(details::_in_streambuf_msg);
}
/// <summary>
/// Copy constructor
/// </summary>
/// <param name="other">The source object</param>
basic_istream(const basic_istream& other) : m_helper(other.m_helper) {}
/// <summary>
/// Assignment operator
/// </summary>
/// <param name="other">The source object</param>
/// <returns>A reference to the stream object that contains the result of the assignment.</returns>
basic_istream& operator=(const basic_istream& other)
{
m_helper = other.m_helper;
return *this;
}
/// <summary>
/// Close the stream, preventing further read operations.
/// </summary>
pplx::task<void> close() const
{
return is_valid() ? helper()->m_buffer.close(std::ios_base::in) : pplx::task_from_result();
}
/// <summary>
/// Close the stream with exception, preventing further read operations.
/// </summary>
/// <param name="eptr">Pointer to the exception.</param>
pplx::task<void> close(std::exception_ptr eptr) const
{
return is_valid() ? m_helper->m_buffer.close(std::ios_base::in, eptr) : pplx::task_from_result();
}
/// <summary>
/// Tests whether last read cause the stream reach EOF.
/// </summary>
/// <returns>True if the read head has reached the end of the stream, false otherwise.</returns>
bool is_eof() const { return is_valid() ? m_helper->m_buffer.is_eof() : false; }
/// <summary>
/// Get the next character and return it as an int_type. Advance the read position.
/// </summary>
/// <returns>A <c>task</c> that holds the next character as an <c>int_type</c> on successful completion.</returns>
pplx::task<int_type> read() const
{
pplx::task<int_type> result;
if (!_verify_and_return_task(details::_in_stream_msg, result)) return result;
return helper()->m_buffer.bumpc();
}
/// <summary>
/// Read a single value of "blittable" type T from the stream.
/// </summary>
/// <returns>A value of type T.</returns>
/// <remarks>
/// This is not a replacement for a proper binary serialization solution, but it may
/// form the foundation for one. Reading data bit-wise to a stream is a primitive
/// operation of binary serialization.
/// Currently, no attention is paid to byte order. All data is read in the platform's
/// native byte order, which means little-endian on all platforms that have been tested.
/// This function is only available for streams using a single-byte character size.
/// </remarks>
template<typename T>
CASABLANCA_DEPRECATED(
"Unsafe API that will be removed in future releases, use one of the other read overloads instead.")
pplx::task<T> read() const
{
static_assert(sizeof(CharType) == 1, "binary read is only supported for single-byte streams");
static_assert(std::is_trivial<T>::value, "unsafe to use with non-trivial types");
pplx::task<T> result;
if (!_verify_and_return_task(details::_in_stream_msg, result)) return result;
auto copy = std::make_shared<T>();
return helper()->m_buffer.getn((CharType*)copy.get(), sizeof(T)).then([copy](pplx::task<size_t>) -> T {
return std::move(*copy);
});
}
/// <summary>
/// Reads up to <c>count</c> characters and place into the provided buffer.
/// </summary>
/// <param name="target">An async stream buffer supporting write operations.</param>
/// <param name="count">The maximum number of characters to read</param>
/// <returns>A <c>task</c> that holds the number of characters read. This number is 0 if the end of the stream is
/// reached.</returns>
pplx::task<size_t> read(streams::streambuf<CharType> target, size_t count) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_in_stream_msg, result)) return result;
if (!target.can_write())
return pplx::task_from_exception<size_t>(
std::make_exception_ptr(std::runtime_error("target not set up for output of data")));
// Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations.
auto buffer = helper()->m_buffer;
auto data = target.alloc(count);
if (data != nullptr)
{
auto post_read = [target](pplx::task<size_t> op) -> pplx::task<size_t> {
auto t = target;
t.commit(op.get());
return op;
};
return buffer.getn(data, count).then(post_read);
}
else
{
size_t available = 0;
const bool acquired = buffer.acquire(data, available);
if (available >= count)
{
auto post_write = [buffer, data](pplx::task<size_t> op) -> pplx::task<size_t> {
auto b = buffer;
b.release(data, op.get());
return op;
};
return target.putn_nocopy(data, count).then(post_write);
}
else
{
// Always have to release if acquire returned true.
if (acquired)
{
buffer.release(data, 0);
}
std::shared_ptr<CharType> buf(new CharType[count], [](CharType* buf) { delete[] buf; });
auto post_write = [buf](pplx::task<size_t> op) -> pplx::task<size_t> { return op; };
auto post_read = [buf, target, post_write](pplx::task<size_t> op) -> pplx::task<size_t> {
auto trg = target;
return trg.putn_nocopy(buf.get(), op.get()).then(post_write);
};
return helper()->m_buffer.getn(buf.get(), count).then(post_read);
}
}
}
/// <summary>
/// Get the next character and return it as an int_type. Do not advance the read position.
/// </summary>
/// <returns>A <c>task</c> that holds the character, widened to an integer. This character is EOF when the peek
/// operation fails.</returns>
pplx::task<int_type> peek() const
{
pplx::task<int_type> result;
if (!_verify_and_return_task(details::_in_stream_msg, result)) return result;
return helper()->m_buffer.getc();
}
/// <summary>
/// Read characters until a delimiter or EOF is found, and place them into the target.
/// Proceed past the delimiter, but don't include it in the target buffer.
/// </summary>
/// <param name="target">An async stream buffer supporting write operations.</param>
/// <param name="delim">The delimiting character to stop the read at.</param>
/// <returns>A <c>task</c> that holds the number of characters read.</returns>
pplx::task<size_t> read_to_delim(streams::streambuf<CharType> target, int_type delim) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_in_stream_msg, result)) return result;
if (!target.can_write())
return pplx::task_from_exception<size_t>(
std::make_exception_ptr(std::runtime_error("target not set up for output of data")));
// Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations.
auto buffer = helper()->m_buffer;
int_type req_async = traits::requires_async();
std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>();
auto flush = [=]() mutable {
return target.putn_nocopy(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable {
_locals->total += wrote;
_locals->write_pos = 0;
return target.sync();
});
};
auto update = [=](int_type ch) mutable {
if (ch == traits::eof()) return false;
if (ch == delim) return false;
_locals->outbuf[_locals->write_pos] = static_cast<CharType>(ch);
_locals->write_pos += 1;
if (_locals->is_full())
{
// Flushing synchronously because performance is terrible if we
// schedule an empty task. This isn't on a user's thread.
flush().get();
}
return true;
};
auto loop = pplx::details::_do_while([=]() mutable -> pplx::task<bool> {
while (buffer.in_avail() > 0)
{
int_type ch = buffer.sbumpc();
if (ch == req_async)
{
break;
}
if (!update(ch))
{
return pplx::task_from_result(false);
}
}
return buffer.bumpc().then(update);
});
return loop.then([=](bool) mutable { return flush().then([=] { return _locals->total; }); });
}
/// <summary>
/// Read until reaching a newline character. The newline is not included in the target.
/// </summary>
/// <param name="target">An asynchronous stream buffer supporting write operations.</param>
/// <returns>A <c>task</c> that holds the number of characters read. This number is 0 if the end of the stream is
/// reached.</returns>
pplx::task<size_t> read_line(streams::streambuf<CharType> target) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task(details::_in_stream_msg, result)) return result;
if (!target.can_write())
return pplx::task_from_exception<size_t>(
std::make_exception_ptr(std::runtime_error("target not set up for receiving data")));
// Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations.
concurrency::streams::streambuf<CharType> buffer = helper()->m_buffer;
int_type req_async = traits::requires_async();
std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>();
auto flush = [=]() mutable {
return target.putn_nocopy(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable {
_locals->total += wrote;
_locals->write_pos = 0;
return target.sync();
});
};
auto update = [=](int_type ch) mutable {
if (ch == traits::eof()) return false;
if (ch == '\n') return false;
if (ch == '\r')
{
_locals->saw_CR = true;
return true;
}
_locals->outbuf[_locals->write_pos] = static_cast<CharType>(ch);
_locals->write_pos += 1;
if (_locals->is_full())
{
// Flushing synchronously because performance is terrible if we
// schedule an empty task. This isn't on a user's thread.
flush().wait();
}
return true;
};
auto update_after_cr = [=](int_type ch) mutable -> pplx::task<bool> {
if (ch == traits::eof()) return pplx::task_from_result(false);
if (ch == '\n')
{
return buffer.bumpc().then([](int_type) { return false; });
}
return pplx::task_from_result(false);
};
auto loop = pplx::details::_do_while([=]() mutable -> pplx::task<bool> {
while (buffer.in_avail() > 0)
{
int_type ch;
if (_locals->saw_CR)
{
ch = buffer.sgetc();
if (ch == '\n') buffer.sbumpc();
return pplx::task_from_result(false);
}
ch = buffer.sbumpc();
if (ch == req_async) break;
if (!update(ch))
{
return pplx::task_from_result(false);
}
}
if (_locals->saw_CR)
{
return buffer.getc().then(update_after_cr);
}
return buffer.bumpc().then(update);
});
return loop.then([=](bool) mutable { return flush().then([=] { return _locals->total; }); });
}
/// <summary>
/// Read until reaching the end of the stream.
/// </summary>
/// <param name="target">An asynchronous stream buffer supporting write operations.</param>
/// <returns>The number of characters read.</returns>
pplx::task<size_t> read_to_end(streams::streambuf<CharType> target) const
{
pplx::task<size_t> result;
if (!_verify_and_return_task("stream not set up for output of data", result)) return result;
if (!target.can_write())
return pplx::task_from_exception<size_t>(
std::make_exception_ptr(std::runtime_error("source buffer not set up for input of data")));
auto l_buffer = helper()->m_buffer;
auto l_buf_size = this->buf_size;
std::shared_ptr<_read_helper> l_locals = std::make_shared<_read_helper>();
auto copy_to_target = [l_locals, target, l_buffer, l_buf_size]() mutable -> pplx::task<bool> {
// We need to capture these, because the object itself may go away
// before we're done processing the data.
// auto locs = _locals;
// auto trg = target;
return l_buffer.getn(l_locals->outbuf, l_buf_size).then([=](size_t rd) mutable -> pplx::task<bool> {
if (rd == 0) return pplx::task_from_result(false);
// Must be nested to capture rd
return target.putn_nocopy(l_locals->outbuf, rd)
.then([target, l_locals, rd](size_t wr) mutable -> pplx::task<bool> {
l_locals->total += wr;
if (rd != wr)
// Number of bytes written is less than number of bytes received.
throw std::runtime_error("failed to write all bytes");
return target.sync().then([]() { return true; });
});
});
};
auto loop = pplx::details::_do_while(copy_to_target);
return loop.then([=](bool) mutable -> size_t { return l_locals->total; });
}
/// <summary>
/// Seeks to the specified write position.
/// </summary>
/// <param name="pos">An offset relative to the beginning of the stream.</param>
/// <returns>The new position in the stream.</returns>
pos_type seek(pos_type pos) const
{
_verify_and_throw(details::_in_stream_msg);
return helper()->m_buffer.seekpos(pos, std::ios_base::in);
}
/// <summary>
/// Seeks to the specified write position.
/// </summary>
/// <param name="off">An offset relative to the beginning, current write position, or the end of the stream.</param>
/// <param name="way">The starting point (beginning, current, end) for the seek.</param>
/// <returns>The new position in the stream.</returns>
pos_type seek(off_type off, std::ios_base::seekdir way) const
{
_verify_and_throw(details::_in_stream_msg);
return helper()->m_buffer.seekoff(off, way, std::ios_base::in);
}
/// <summary>
/// Get the current write position, i.e. the offset from the beginning of the stream.
/// </summary>
/// <returns>The current write position.</returns>
pos_type tell() const
{
_verify_and_throw(details::_in_stream_msg);
return helper()->m_buffer.getpos(std::ios_base::in);
}
/// <summary>
/// <c>can_seek<c/> is used to determine whether the stream supports seeking.
/// </summary>
/// <returns><c>true</c> if the stream supports seeking, <c>false</c> otherwise.</returns>
bool can_seek() const { return is_valid() && m_helper->m_buffer.can_seek(); }
/// <summary>
/// Test whether the stream has been initialized with a valid stream buffer.
/// </summary>
bool is_valid() const { return (m_helper != nullptr) && ((bool)m_helper->m_buffer); }
/// <summary>
/// Test whether the stream has been initialized or not.
/// </summary>
operator bool() const { return is_valid(); }
/// <summary>
/// Test whether the stream is open for writing.
/// </summary>
/// <returns><c>true</c> if the stream is open for writing, <c>false</c> otherwise.</returns>
bool is_open() const { return is_valid() && m_helper->m_buffer.can_read(); }
/// <summary>
/// Get the underlying stream buffer.
/// </summary>
concurrency::streams::streambuf<CharType> streambuf() const { return helper()->m_buffer; }
/// <summary>
/// Read a value of type <c>T</c> from the stream.
/// </summary>
/// <remarks>
/// Supports the C++ primitive types. Can be expanded to additional types
/// by adding template specializations for <c>type_parser</c>.
/// </remarks>
/// <typeparam name="T">
/// The data type of the element to be read from the stream.
/// </typeparam>
/// <returns>A <c>task</c> that holds the element read from the stream.</returns>
template<typename T>
pplx::task<T> extract() const
{
pplx::task<T> result;
if (!_verify_and_return_task(details::_in_stream_msg, result)) return result;
return type_parser<CharType, T>::parse(helper()->m_buffer);
}
private:
template<typename T>
bool _verify_and_return_task(const char* msg, pplx::task<T>& tsk) const
{
auto buffer = helper()->m_buffer;
if (!(buffer.exception() == nullptr))
{
tsk = pplx::task_from_exception<T>(buffer.exception());
return false;
}
if (!buffer.can_read())
{
tsk = pplx::task_from_exception<T>(std::make_exception_ptr(std::runtime_error(msg)));
return false;
}
return true;
}
void _verify_and_throw(const char* msg) const
{
auto buffer = helper()->m_buffer;
if (!(buffer.exception() == nullptr)) std::rethrow_exception(buffer.exception());
if (!buffer.can_read()) throw std::runtime_error(msg);
}
std::shared_ptr<details::basic_istream_helper<CharType>> helper() const
{
if (!m_helper) throw std::logic_error("uninitialized stream object");
return m_helper;
}
static const size_t buf_size = 16 * 1024;
struct _read_helper
{
size_t total;
CharType outbuf[buf_size];
size_t write_pos;
bool saw_CR;
bool is_full() const { return write_pos == buf_size; }
_read_helper() : total(0), write_pos(0), saw_CR(false) {}
};
std::shared_ptr<details::basic_istream_helper<CharType>> m_helper;
};
typedef basic_ostream<uint8_t> ostream;
typedef basic_istream<uint8_t> istream;
typedef basic_ostream<utf16char> wostream;
typedef basic_istream<utf16char> wistream;
template<typename CharType>
pplx::task<void> _type_parser_base<CharType>::_skip_whitespace(streams::streambuf<CharType> buffer)
{
int_type req_async = traits::requires_async();
auto update = [=](int_type ch) mutable {
if (isspace(ch))
{
if (buffer.sbumpc() == req_async)
{
// Synchronously because performance is terrible if we
// schedule an empty task. This isn't on a user's thread.
buffer.nextc().wait();
}
return true;
}
return false;
};
auto loop = pplx::details::_do_while([=]() mutable -> pplx::task<bool> {
while (buffer.in_avail() > 0)
{
int_type ch = buffer.sgetc();
if (ch == req_async) break;
if (!update(ch))
{
return pplx::task_from_result(false);
}
}
return buffer.getc().then(update);
});
return loop.then([=](pplx::task<bool> op) { op.wait(); });
}
template<typename CharType>
template<typename StateType, typename ReturnType, typename AcceptFunctor, typename ExtractFunctor>
pplx::task<ReturnType> _type_parser_base<CharType>::_parse_input(concurrency::streams::streambuf<CharType> buffer,
AcceptFunctor accept_character,
ExtractFunctor extract)
{
std::shared_ptr<StateType> state = std::make_shared<StateType>();
auto update = [=](pplx::task<int_type> op) -> pplx::task<bool> {
int_type ch = op.get();
if (ch == traits::eof()) return pplx::task_from_result(false);
bool accptd = accept_character(state, ch);
if (!accptd) return pplx::task_from_result(false);
// We peeked earlier, so now we must advance the position.
concurrency::streams::streambuf<CharType> buf = buffer;
return buf.bumpc().then([](int_type) { return true; });
};
auto peek_char = [=]() -> pplx::task<bool> {
concurrency::streams::streambuf<CharType> buf = buffer;
// If task results are immediately available, there's little need to use ".then(),"
// so optimize for prompt values.
auto get_op = buf.getc();
while (get_op.is_done())
{
auto condition = update(get_op);
if (!condition.is_done() || !condition.get()) return condition;
get_op = buf.getc();
}
return get_op.then(update);
};
auto finish = [=](pplx::task<bool> op) -> pplx::task<ReturnType> {
op.wait();
pplx::task<ReturnType> result = extract(state);
return result;
};
return _skip_whitespace(buffer).then([=](pplx::task<void> op) -> pplx::task<ReturnType> {
op.wait();
return pplx::details::_do_while(peek_char).then(finish);
});
}
template<typename CharType>
class type_parser<CharType, std::basic_string<CharType>> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<std::string> parse(streams::streambuf<CharType> buffer)
{
return base::template _parse_input<std::basic_string<CharType>, std::string>(
buffer, _accept_char, _extract_result);
}
private:
static bool _accept_char(std::shared_ptr<std::basic_string<CharType>> state, int_type ch)
{
if (ch == traits::eof() || isspace(ch)) return false;
state->push_back(CharType(ch));
return true;
}
static pplx::task<std::basic_string<CharType>> _extract_result(std::shared_ptr<std::basic_string<CharType>> state)
{
return pplx::task_from_result(*state);
}
};
template<typename CharType>
class type_parser<CharType, int64_t> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<int64_t> parse(streams::streambuf<CharType> buffer)
{
return base::template _parse_input<_int64_state, int64_t>(buffer, _accept_char, _extract_result);
}
private:
struct _int64_state
{
_int64_state() : result(0), correct(false), minus(0) {}
int64_t result;
bool correct;
char minus; // 0 -- no sign, 1 -- plus, 2 -- minus
};
static bool _accept_char(std::shared_ptr<_int64_state> state, int_type ch)
{
if (ch == traits::eof()) return false;
if (state->minus == 0)
{
// OK to find a sign.
if (!::isdigit(ch) && ch != int_type('+') && ch != int_type('-')) return false;
}
else
{
if (!::isdigit(ch)) return false;
}
// At least one digit was found.
state->correct = true;
if (ch == int_type('+'))
{
state->minus = 1;
}
else if (ch == int_type('-'))
{
state->minus = 2;
}
else
{
if (state->minus == 0) state->minus = 1;
// Shift the existing value by 10, then add the new value.
bool positive = state->result >= 0;
state->result *= 10;
state->result += int64_t(ch - int_type('0'));
if ((state->result >= 0) != positive)
{
state->correct = false;
return false;
}
}
return true;
}
static pplx::task<int64_t> _extract_result(std::shared_ptr<_int64_state> state)
{
if (!state->correct) throw std::range_error("integer value is too large to fit in 64 bits");
int64_t result = (state->minus == 2) ? -state->result : state->result;
return pplx::task_from_result<int64_t>(result);
}
};
template<typename FloatingPoint>
struct _double_state
{
_double_state()
: result(0)
, minus(0)
, after_comma(0)
, exponent(false)
, exponent_number(0)
, exponent_minus(0)
, complete(false)
, p_exception_string()
{
}
FloatingPoint result;
char minus; // 0 -- no sign, 1 -- plus, 2 -- minus
int after_comma;
bool exponent;
int exponent_number;
char exponent_minus; // 0 -- no sign, 1 -- plus, 2 -- minus
bool complete;
std::string p_exception_string;
};
template<typename FloatingPoint, typename int_type>
static std::string create_exception_message(int_type ch, bool exponent)
{
std::ostringstream os;
os << "Invalid character '" << char(ch) << "'" << (exponent ? " in exponent" : "");
return os.str();
}
template<typename FloatingPoint, typename int_type>
static bool _accept_char(std::shared_ptr<_double_state<FloatingPoint>> state, int_type ch)
{
if (state->minus == 0)
{
if (!::isdigit(ch) && ch != int_type('.') && ch != int_type('+') && ch != int_type('-'))
{
if (!state->complete)
state->p_exception_string = create_exception_message<FloatingPoint, int_type>(ch, false);
return false;
}
}
else
{
if (!state->exponent && !::isdigit(ch) && ch != int_type('.') && ch != int_type('E') && ch != int_type('e'))
{
if (!state->complete)
state->p_exception_string = create_exception_message<FloatingPoint, int_type>(ch, false);
return false;
}
if (state->exponent && !::isdigit(ch) && ch != int_type('+') && ch != int_type('-'))
{
if (!state->complete)
state->p_exception_string = create_exception_message<FloatingPoint, int_type>(ch, true);
return false;
}
}
switch (ch)
{
case int_type('+'):
state->complete = false;
if (state->exponent)
{
if (state->exponent_minus != 0)
{
state->p_exception_string = "The exponent sign already set";
return false;
}
state->exponent_minus = 1;
}
else
{
state->minus = 1;
}
break;
case int_type('-'):
state->complete = false;
if (state->exponent)
{
if (state->exponent_minus != 0)
{
state->p_exception_string = "The exponent sign already set";
return false;
}
state->exponent_minus = 2;
}
else
{
state->minus = 2;
}
break;
case int_type('.'):
state->complete = false;
if (state->after_comma > 0) return false;
state->after_comma = 1;
break;
case int_type('E'):
case int_type('e'):
state->complete = false;
if (state->exponent) return false;
state->exponent_number = 0;
state->exponent = true;
break;
default:
state->complete = true;
if (!state->exponent)
{
if (state->minus == 0) state->minus = 1;
state->result *= 10;
state->result += int64_t(ch - int_type('0'));
if (state->after_comma > 0) state->after_comma++;
}
else
{
if (state->exponent_minus == 0) state->exponent_minus = 1;
state->exponent_number *= 10;
state->exponent_number += int64_t(ch - int_type('0'));
}
}
return true;
}
template<typename FloatingPoint>
static pplx::task<FloatingPoint> _extract_result(std::shared_ptr<_double_state<FloatingPoint>> state)
{
if (state->p_exception_string.length() > 0) throw std::runtime_error(state->p_exception_string.c_str());
if (!state->complete && state->exponent) throw std::runtime_error("Incomplete exponent");
FloatingPoint result = static_cast<FloatingPoint>((state->minus == 2) ? -state->result : state->result);
if (state->exponent_minus == 2) state->exponent_number = 0 - state->exponent_number;
if (state->after_comma > 0) state->exponent_number -= state->after_comma - 1;
if (state->exponent_number >= 0)
{
result *= pow(FloatingPoint(10.0), state->exponent_number);
#pragma push_macro("max")
#undef max
if (result > std::numeric_limits<FloatingPoint>::max() || result < -std::numeric_limits<FloatingPoint>::max())
throw std::overflow_error("The value is too big");
#pragma pop_macro("max")
}
else
{
bool is_zero = (result == 0);
result /= pow(FloatingPoint(10.0), -state->exponent_number);
if (!is_zero && result > -std::numeric_limits<FloatingPoint>::denorm_min() &&
result < std::numeric_limits<FloatingPoint>::denorm_min())
throw std::underflow_error("The value is too small");
}
return pplx::task_from_result<FloatingPoint>(result);
}
template<typename CharType>
class type_parser<CharType, double> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<double> parse(streams::streambuf<CharType> buffer)
{
return base::template _parse_input<_double_state<double>, double>(
buffer, _accept_char<double, int_type>, _extract_result<double>);
}
protected:
};
template<typename CharType>
class type_parser<CharType, float> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<float> parse(streams::streambuf<CharType> buffer)
{
return base::template _parse_input<_double_state<float>, float>(
buffer, _accept_char<float, int_type>, _extract_result<float>);
}
protected:
};
template<typename CharType>
class type_parser<CharType, uint64_t> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<uint64_t> parse(streams::streambuf<CharType> buffer)
{
return base::template _parse_input<_uint64_state, uint64_t>(buffer, _accept_char, _extract_result);
}
private:
struct _uint64_state
{
_uint64_state() : result(0), correct(false) {}
uint64_t result;
bool correct;
};
static bool _accept_char(std::shared_ptr<_uint64_state> state, int_type ch)
{
if (!::isdigit(ch)) return false;
// At least one digit was found.
state->correct = true;
// Shift the existing value by 10, then add the new value.
state->result *= 10;
state->result += uint64_t(ch - int_type('0'));
return true;
}
static pplx::task<uint64_t> _extract_result(std::shared_ptr<_uint64_state> state)
{
if (!state->correct) throw std::range_error("integer value is too large to fit in 64 bits");
return pplx::task_from_result(state->result);
}
};
template<typename CharType>
class type_parser<CharType, bool> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<bool> parse(streams::streambuf<CharType> buffer)
{
return base::template _parse_input<_bool_state, bool>(buffer, _accept_char, _extract_result);
}
private:
struct _bool_state
{
_bool_state() : state(0) {}
// { 0 -- not started, 1 -- 't', 2 -- 'tr', 3 -- 'tru', 4 -- 'f', 5 -- 'fa', 6 -- 'fal', 7 -- 'fals', 8 --
// 'true', 9 -- 'false' }
short state;
};
static bool _accept_char(std::shared_ptr<_bool_state> state, int_type ch)
{
switch (state->state)
{
case 0:
if (ch == int_type('t'))
state->state = 1;
else if (ch == int_type('f'))
state->state = 4;
else if (ch == int_type('1'))
state->state = 8;
else if (ch == int_type('0'))
state->state = 9;
else
return false;
break;
case 1:
if (ch == int_type('r'))
state->state = 2;
else
return false;
break;
case 2:
if (ch == int_type('u'))
state->state = 3;
else
return false;
break;
case 3:
if (ch == int_type('e'))
state->state = 8;
else
return false;
break;
case 4:
if (ch == int_type('a'))
state->state = 5;
else
return false;
break;
case 5:
if (ch == int_type('l'))
state->state = 6;
else
return false;
break;
case 6:
if (ch == int_type('s'))
state->state = 7;
else
return false;
break;
case 7:
if (ch == int_type('e'))
state->state = 9;
else
return false;
break;
case 8:
case 9: return false;
}
return true;
}
static pplx::task<bool> _extract_result(std::shared_ptr<_bool_state> state)
{
bool correct = (state->state == 8 || state->state == 9);
if (!correct)
{
std::runtime_error exc("cannot parse as Boolean value");
throw exc;
}
return pplx::task_from_result(state->state == 8);
}
};
template<typename CharType>
class type_parser<CharType, signed char> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<signed char> parse(streams::streambuf<CharType> buffer)
{
return base::_skip_whitespace(buffer).then([=](pplx::task<void> op) -> pplx::task<signed char> {
op.wait();
return type_parser<CharType, signed char>::_get_char(buffer);
});
}
private:
static pplx::task<signed char> _get_char(streams::streambuf<CharType> buffer)
{
concurrency::streams::streambuf<CharType> buf = buffer;
return buf.bumpc().then([=](pplx::task<int_type> op) -> signed char {
int_type val = op.get();
if (val == traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value");
return static_cast<signed char>(val);
});
}
};
template<typename CharType>
class type_parser<CharType, unsigned char> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<unsigned char> parse(streams::streambuf<CharType> buffer)
{
return base::_skip_whitespace(buffer).then([=](pplx::task<void> op) -> pplx::task<unsigned char> {
op.wait();
return type_parser<CharType, unsigned char>::_get_char(buffer);
});
}
private:
static pplx::task<unsigned char> _get_char(streams::streambuf<CharType> buffer)
{
concurrency::streams::streambuf<CharType> buf = buffer;
return buf.bumpc().then([=](pplx::task<int_type> op) -> unsigned char {
int_type val = op.get();
if (val == traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value");
return static_cast<unsigned char>(val);
});
}
};
template<typename CharType>
class type_parser<CharType, char> : public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<char> parse(streams::streambuf<CharType> buffer)
{
return base::_skip_whitespace(buffer).then([=](pplx::task<void> op) -> pplx::task<char> {
op.wait();
return _get_char(buffer);
});
}
private:
static pplx::task<char> _get_char(streams::streambuf<CharType> buffer)
{
concurrency::streams::streambuf<CharType> buf = buffer;
return buf.bumpc().then([=](pplx::task<int_type> op) -> char {
int_type val = op.get();
if (val == traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value");
return char(val);
});
}
};
#ifdef _WIN32
template<class CharType>
class type_parser<CharType, std::enable_if_t<sizeof(CharType) == 1, std::basic_string<wchar_t>>>
: public _type_parser_base<CharType>
{
typedef _type_parser_base<CharType> base;
public:
typedef typename base::traits traits;
typedef typename base::int_type int_type;
static pplx::task<std::wstring> parse(streams::streambuf<CharType> buffer)
{
return base::template _parse_input<std::basic_string<char>, std::basic_string<wchar_t>>(
buffer, _accept_char, _extract_result);
}
private:
static bool _accept_char(const std::shared_ptr<std::basic_string<char>>& state, int_type ch)
{
if (ch == concurrency::streams::char_traits<char>::eof() || isspace(ch)) return false;
state->push_back(char(ch));
return true;
}
static pplx::task<std::basic_string<wchar_t>> _extract_result(std::shared_ptr<std::basic_string<char>> state)
{
return pplx::task_from_result(utility::conversions::utf8_to_utf16(*state));
}
};
#endif //_WIN32
} // namespace streams
} // namespace Concurrency
#endif
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* Protocol independent support for URIs.
*
* For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#ifndef CASA_URI_H
#define CASA_URI_H
#include "cpprest/base_uri.h"
#include "cpprest/uri_builder.h"
#endif
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* Builder style class for creating URIs.
*
* For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#include "cpprest/base_uri.h"
#include <string>
namespace web
{
/// <summary>
/// Builder for constructing URIs incrementally.
/// </summary>
class uri_builder
{
public:
/// <summary>
/// Creates a builder with an initially empty URI.
/// </summary>
uri_builder() = default;
/// <summary>
/// Creates a builder with a existing URI object.
/// </summary>
/// <param name="uri_str">Encoded string containing the URI.</param>
uri_builder(const uri& uri_str) : m_uri(uri_str.m_components) {}
/// <summary>
/// Get the scheme component of the URI as an encoded string.
/// </summary>
/// <returns>The URI scheme as a string.</returns>
const utility::string_t& scheme() const { return m_uri.m_scheme; }
/// <summary>
/// Get the user information component of the URI as an encoded string.
/// </summary>
/// <returns>The URI user information as a string.</returns>
const utility::string_t& user_info() const { return m_uri.m_user_info; }
/// <summary>
/// Get the host component of the URI as an encoded string.
/// </summary>
/// <returns>The URI host as a string.</returns>
const utility::string_t& host() const { return m_uri.m_host; }
/// <summary>
/// Get the port component of the URI. Returns -1 if no port is specified.
/// </summary>
/// <returns>The URI port as an integer.</returns>
int port() const { return m_uri.m_port; }
/// <summary>
/// Get the path component of the URI as an encoded string.
/// </summary>
/// <returns>The URI path as a string.</returns>
const utility::string_t& path() const { return m_uri.m_path; }
/// <summary>
/// Get the query component of the URI as an encoded string.
/// </summary>
/// <returns>The URI query as a string.</returns>
const utility::string_t& query() const { return m_uri.m_query; }
/// <summary>
/// Get the fragment component of the URI as an encoded string.
/// </summary>
/// <returns>The URI fragment as a string.</returns>
const utility::string_t& fragment() const { return m_uri.m_fragment; }
/// <summary>
/// Set the scheme of the URI.
/// </summary>
/// <param name="scheme">Uri scheme.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
uri_builder& set_scheme(const utility::string_t& scheme)
{
m_uri.m_scheme = scheme;
return *this;
}
/// <summary>
/// Set the user info component of the URI.
/// </summary>
/// <param name="user_info">User info as a decoded string.</param>
/// <param name="do_encoding">Specify whether to apply URI encoding to the given string.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
uri_builder& set_user_info(const utility::string_t& user_info, bool do_encoding = false)
{
if (do_encoding)
{
m_uri.m_user_info = uri::encode_uri(user_info, uri::components::user_info);
}
else
{
m_uri.m_user_info = user_info;
}
return *this;
}
/// <summary>
/// Set the host component of the URI.
/// </summary>
/// <param name="host">Host as a decoded string.</param>
/// <param name="do_encoding">Specify whether to apply URI encoding to the given string.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
uri_builder& set_host(const utility::string_t& host, bool do_encoding = false)
{
if (do_encoding)
{
m_uri.m_host = uri::encode_uri(host, uri::components::host);
}
else
{
m_uri.m_host = host;
}
return *this;
}
/// <summary>
/// Set the port component of the URI.
/// </summary>
/// <param name="port">Port as an integer.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
uri_builder& set_port(int port)
{
m_uri.m_port = port;
return *this;
}
/// <summary>
/// Set the port component of the URI.
/// </summary>
/// <param name="port">Port as a string.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
/// <remarks>When string can't be converted to an integer the port is left unchanged.</remarks>
_ASYNCRTIMP uri_builder& set_port(const utility::string_t& port);
/// <summary>
/// Set the path component of the URI.
/// </summary>
/// <param name="path">Path as a decoded string.</param>
/// <param name="do_encoding">Specify whether to apply URI encoding to the given string.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
uri_builder& set_path(const utility::string_t& path, bool do_encoding = false)
{
if (do_encoding)
{
m_uri.m_path = uri::encode_uri(path, uri::components::path);
}
else
{
m_uri.m_path = path;
}
return *this;
}
/// <summary>
/// Set the query component of the URI.
/// </summary>
/// <param name="query">Query as a decoded string.</param>
/// <param name="do_encoding">Specify whether apply URI encoding to the given string.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
uri_builder& set_query(const utility::string_t& query, bool do_encoding = false)
{
if (do_encoding)
{
m_uri.m_query = uri::encode_uri(query, uri::components::query);
}
else
{
m_uri.m_query = query;
}
return *this;
}
/// <summary>
/// Set the fragment component of the URI.
/// </summary>
/// <param name="fragment">Fragment as a decoded string.</param>
/// <param name="do_encoding">Specify whether to apply URI encoding to the given string.</param>
/// <returns>A reference to this <c>uri_builder</c> to support chaining.</returns>
uri_builder& set_fragment(const utility::string_t& fragment, bool do_encoding = false)
{
if (do_encoding)
{
m_uri.m_fragment = uri::encode_uri(fragment, uri::components::fragment);
}
else
{
m_uri.m_fragment = fragment;
}
return *this;
}
/// <summary>
/// Clears all components of the underlying URI in this uri_builder.
/// </summary>
void clear() { m_uri = details::uri_components(); }
/// <summary>
/// Appends another path to the path of this uri_builder.
/// </summary>
/// <param name="path">Path to append as a already encoded string.</param>
/// <param name="do_encoding">Specify whether to apply URI encoding to the given string.</param>
/// <returns>A reference to this uri_builder to support chaining.</returns>
_ASYNCRTIMP uri_builder& append_path(const utility::string_t& path, bool do_encoding = false);
/// <summary>
/// Appends the raw contents of the path argument to the path of this uri_builder with no separator de-duplication.
/// </summary>
/// <remarks>
/// The path argument is appended after adding a '/' separator without regards to the contents of path. If an empty
/// string is provided, this function will immediately return without changes to the stored path value. For example:
/// if the current contents are "/abc" and path="/xyz", the result will be "/abc//xyz".
/// </remarks>
/// <param name="path">Path to append as a already encoded string.</param>
/// <param name="do_encoding">Specify whether to apply URI encoding to the given string.</param>
/// <returns>A reference to this uri_builder to support chaining.</returns>
_ASYNCRTIMP uri_builder& append_path_raw(const utility::string_t& path, bool do_encoding = false);
/// <summary>
/// Appends another query to the query of this uri_builder.
/// </summary>
/// <param name="query">Query to append as a decoded string.</param>
/// <param name="do_encoding">Specify whether to apply URI encoding to the given string.</param>
/// <returns>A reference to this uri_builder to support chaining.</returns>
_ASYNCRTIMP uri_builder& append_query(const utility::string_t& query, bool do_encoding = false);
/// <summary>
/// Appends an relative uri (Path, Query and fragment) at the end of the current uri.
/// </summary>
/// <param name="relative_uri">The relative uri to append.</param>
/// <returns>A reference to this uri_builder to support chaining.</returns>
_ASYNCRTIMP uri_builder& append(const uri& relative_uri);
/// <summary>
/// Appends another query to the query of this uri_builder, encoding it first. This overload is useful when building
/// a query segment of the form "element=10", where the right hand side of the query is stored as a type other than
/// a string, for instance, an integral type.
/// </summary>
/// <param name="name">The name portion of the query string</param>
/// <param name="value">The value portion of the query string</param>
/// <returns>A reference to this uri_builder to support chaining.</returns>
template<typename T>
uri_builder& append_query(const utility::string_t& name, const T& value, bool do_encoding = true)
{
if (do_encoding)
append_query_encode_impl(name, utility::conversions::details::print_utf8string(value));
else
append_query_no_encode_impl(name, utility::conversions::details::print_string(value));
return *this;
}
/// <summary>
/// Combine and validate the URI components into a encoded string. An exception will be thrown if the URI is
/// invalid.
/// </summary>
/// <returns>The created URI as a string.</returns>
_ASYNCRTIMP utility::string_t to_string() const;
/// <summary>
/// Combine and validate the URI components into a URI class instance. An exception will be thrown if the URI is
/// invalid.
/// </summary>
/// <returns>The create URI as a URI class instance.</returns>
_ASYNCRTIMP uri to_uri() const;
/// <summary>
/// Validate the generated URI from all existing components of this uri_builder.
/// </summary>
/// <returns>Whether the URI is valid.</returns>
_ASYNCRTIMP bool is_valid();
private:
_ASYNCRTIMP void append_query_encode_impl(const utility::string_t& name, const utf8string& value);
_ASYNCRTIMP void append_query_no_encode_impl(const utility::string_t& name, const utility::string_t& value);
details::uri_components m_uri;
};
} // namespace web
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
*/
#define CPPREST_VERSION_MINOR 10
#define CPPREST_VERSION_MAJOR 2
#define CPPREST_VERSION_REVISION 12
#define CPPREST_VERSION (CPPREST_VERSION_MAJOR * 100000 + CPPREST_VERSION_MINOR * 100 + CPPREST_VERSION_REVISION)
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* Websocket client side implementation
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#ifndef CASA_WS_CLIENT_H
#define CASA_WS_CLIENT_H
#include "cpprest/details/basic_types.h"
#if !defined(CPPREST_EXCLUDE_WEBSOCKETS)
#include "cpprest/asyncrt_utils.h"
#include "cpprest/details/web_utilities.h"
#include "cpprest/http_headers.h"
#include "cpprest/uri.h"
#include "cpprest/ws_msg.h"
#include "pplx/pplxtasks.h"
#include <condition_variable>
#include <limits>
#include <memory>
#include <mutex>
#if !defined(_WIN32) || !defined(__cplusplus_winrt)
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wconversion"
#endif
#include "boost/asio/ssl.hpp"
#if defined(__clang__)
#pragma clang diagnostic pop
#endif
#endif
namespace web
{
// For backwards compatibility for when in the experimental namespace.
// At next major release this should be deleted.
namespace experimental = web;
// In the past namespace was accidentally called 'web_sockets'. To avoid breaking code
// alias it. At our next major release this should be deleted.
namespace web_sockets = websockets;
namespace websockets
{
/// WebSocket client side library.
namespace client
{
/// Websocket close status values.
enum class websocket_close_status
{
normal = 1000,
going_away = 1001,
protocol_error = 1002,
unsupported = 1003, // or data_mismatch
abnormal_close = 1006,
inconsistent_datatype = 1007,
policy_violation = 1008,
too_large = 1009,
negotiate_error = 1010,
server_terminate = 1011,
};
/// <summary>
/// Websocket client configuration class, used to set the possible configuration options
/// used to create an websocket_client instance.
/// </summary>
class websocket_client_config
{
public:
/// <summary>
/// Creates a websocket client configuration with default settings.
/// </summary>
websocket_client_config() : m_sni_enabled(true), m_validate_certificates(true) {}
/// <summary>
/// Get the web proxy object
/// </summary>
/// <returns>A reference to the web proxy object.</returns>
const web_proxy& proxy() const { return m_proxy; }
/// <summary>
/// Set the web proxy object
/// </summary>
/// <param name="proxy">The web proxy object.</param>
void set_proxy(const web_proxy& proxy) { m_proxy = proxy; }
/// <summary>
/// Get the client credentials
/// </summary>
/// <returns>A reference to the client credentials.</returns>
const web::credentials& credentials() const { return m_credentials; }
/// <summary>
/// Set the client credentials
/// </summary>
/// <param name="cred">The client credentials.</param>
void set_credentials(const web::credentials& cred) { m_credentials = cred; }
/// <summary>
/// Disables Server Name Indication (SNI). Default is on.
/// </summary>
void disable_sni() { m_sni_enabled = false; }
/// <summary>
/// Determines if Server Name Indication (SNI) is enabled.
/// </summary>
/// <returns>True if enabled, false otherwise.</returns>
bool is_sni_enabled() const { return m_sni_enabled; }
/// <summary>
/// Sets the server host name to use for TLS Server Name Indication (SNI).
/// </summary>
/// <remarks>By default the host name is set to the websocket URI host.</remarks>
/// <param name="name">The host name to use, as a string.</param>
void set_server_name(const utf8string& name) { m_sni_hostname = name; }
/// <summary>
/// Gets the server host name to use for TLS Server Name Indication (SNI).
/// </summary>
/// <returns>Host name as a string.</returns>
const utf8string& server_name() const { return m_sni_hostname; }
/// <summary>
/// Sets the User Agent to be used for the connection
/// </summary>
/// <param name="name">The User Agent to use, as a string.</param>
_ASYNCRTIMP void set_user_agent(const utf8string& user_agent);
/// <summary>
/// Gets the headers of the HTTP request message used in the WebSocket protocol handshake.
/// </summary>
/// <returns>HTTP headers for the WebSocket protocol handshake.</returns>
/// <remarks>
/// Use the <seealso cref="http_headers::add Method"/> to fill in desired headers.
/// </remarks>
web::http::http_headers& headers() { return m_headers; }
/// <summary>
/// Gets a const reference to the headers of the WebSocket protocol handshake HTTP message.
/// </summary>
/// <returns>HTTP headers.</returns>
const web::http::http_headers& headers() const { return m_headers; }
/// <summary>
/// Adds a subprotocol to the request headers.
/// </summary>
/// <param name="name">The name of the subprotocol.</param>
/// <remarks>If additional subprotocols have already been specified, the new one will just be added.</remarks>
_ASYNCRTIMP void add_subprotocol(const ::utility::string_t& name);
/// <summary>
/// Gets list of the specified subprotocols.
/// </summary>
/// <returns>Vector of all the subprotocols </returns>
/// <remarks>If you want all the subprotocols in a comma separated string
/// they can be directly looked up in the headers using 'Sec-WebSocket-Protocol'.</remarks>
_ASYNCRTIMP std::vector<::utility::string_t> subprotocols() const;
/// <summary>
/// Gets the server certificate validation property.
/// </summary>
/// <returns>True if certificates are to be verified, false otherwise.</returns>
bool validate_certificates() const { return m_validate_certificates; }
/// <summary>
/// Sets the server certificate validation property.
/// </summary>
/// <param name="validate_certs">False to turn ignore all server certificate validation errors, true
/// otherwise.</param> <remarks>Note ignoring certificate errors can be dangerous and should be done with
/// caution.</remarks>
void set_validate_certificates(bool validate_certs) { m_validate_certificates = validate_certs; }
#if !defined(_WIN32) || !defined(__cplusplus_winrt)
/// <summary>
/// Sets a callback to enable custom setting of the ssl context, at construction time.
/// </summary>
/// <param name="callback">A user callback allowing for customization of the ssl context at construction
/// time.</param>
void set_ssl_context_callback(const std::function<void(boost::asio::ssl::context&)>& callback)
{
m_ssl_context_callback = callback;
}
/// <summary>
/// Gets the user's callback to allow for customization of the ssl context.
/// </summary>
const std::function<void(boost::asio::ssl::context&)>& get_ssl_context_callback() const
{
return m_ssl_context_callback;
}
#endif
private:
web::web_proxy m_proxy;
web::credentials m_credentials;
web::http::http_headers m_headers;
bool m_sni_enabled;
utf8string m_sni_hostname;
bool m_validate_certificates;
#if !defined(_WIN32) || !defined(__cplusplus_winrt)
std::function<void(boost::asio::ssl::context&)> m_ssl_context_callback;
#endif
};
/// <summary>
/// Represents a websocket error. This class holds an error message and an optional error code.
/// </summary>
class websocket_exception : public std::exception
{
public:
/// <summary>
/// Creates an <c>websocket_exception</c> with just a string message and no error code.
/// </summary>
/// <param name="whatArg">Error message string.</param>
websocket_exception(const utility::string_t& whatArg) : m_msg(utility::conversions::to_utf8string(whatArg)) {}
#ifdef _WIN32
/// <summary>
/// Creates an <c>websocket_exception</c> with just a string message and no error code.
/// </summary>
/// <param name="whatArg">Error message string.</param>
websocket_exception(std::string whatArg) : m_msg(std::move(whatArg)) {}
#endif
/// <summary>
/// Creates a <c>websocket_exception</c> from a error code using the current platform error category.
/// The message of the error code will be used as the what() string message.
/// </summary>
/// <param name="errorCode">Error code value.</param>
websocket_exception(int errorCode) : m_errorCode(utility::details::create_error_code(errorCode))
{
m_msg = m_errorCode.message();
}
/// <summary>
/// Creates a <c>websocket_exception</c> from a error code using the current platform error category.
/// </summary>
/// <param name="errorCode">Error code value.</param>
/// <param name="whatArg">Message to use in what() string.</param>
websocket_exception(int errorCode, const utility::string_t& whatArg)
: m_errorCode(utility::details::create_error_code(errorCode))
, m_msg(utility::conversions::to_utf8string(whatArg))
{
}
#ifdef _WIN32
/// <summary>
/// Creates a <c>websocket_exception</c> from a error code and string message.
/// </summary>
/// <param name="errorCode">Error code value.</param>
/// <param name="whatArg">Message to use in what() string.</param>
websocket_exception(int errorCode, std::string whatArg)
: m_errorCode(utility::details::create_error_code(errorCode)), m_msg(std::move(whatArg))
{
}
/// <summary>
/// Creates a <c>websocket_exception</c> from a error code and string message to use as the what() argument.
/// <param name="code">Error code.</param>
/// <param name="whatArg">Message to use in what() string.</param>
/// </summary>
websocket_exception(std::error_code code, std::string whatArg)
: m_errorCode(std::move(code)), m_msg(std::move(whatArg))
{
}
#endif
/// <summary>
/// Creates a <c>websocket_exception</c> from a error code and category. The message of the error code will be used
/// as the <c>what</c> string message.
/// </summary>
/// <param name="errorCode">Error code value.</param>
/// <param name="cat">Error category for the code.</param>
websocket_exception(int errorCode, const std::error_category& cat) : m_errorCode(std::error_code(errorCode, cat))
{
m_msg = m_errorCode.message();
}
/// <summary>
/// Creates a <c>websocket_exception</c> from a error code and string message to use as the what() argument.
/// <param name="code">Error code.</param>
/// <param name="whatArg">Message to use in what() string.</param>
/// </summary>
websocket_exception(std::error_code code, const utility::string_t& whatArg)
: m_errorCode(std::move(code)), m_msg(utility::conversions::to_utf8string(whatArg))
{
}
/// <summary>
/// Gets a string identifying the cause of the exception.
/// </summary>
/// <returns>A null terminated character string.</returns>
const char* what() const CPPREST_NOEXCEPT { return m_msg.c_str(); }
/// <summary>
/// Gets the underlying error code for the cause of the exception.
/// </summary>
/// <returns>The <c>error_code</c> object associated with the exception.</returns>
const std::error_code& error_code() const CPPREST_NOEXCEPT { return m_errorCode; }
private:
std::error_code m_errorCode;
std::string m_msg;
};
namespace details
{
// Interface to be implemented by the websocket client callback implementations.
class websocket_client_callback_impl
{
public:
websocket_client_callback_impl(websocket_client_config config) : m_config(std::move(config)) {}
virtual ~websocket_client_callback_impl() CPPREST_NOEXCEPT {}
virtual pplx::task<void> connect() = 0;
virtual pplx::task<void> send(websocket_outgoing_message& msg) = 0;
virtual void set_message_handler(const std::function<void(const websocket_incoming_message&)>& handler) = 0;
virtual pplx::task<void> close() = 0;
virtual pplx::task<void> close(websocket_close_status close_status,
const utility::string_t& close_reason = _XPLATSTR("")) = 0;
virtual void set_close_handler(
const std::function<void(websocket_close_status, const utility::string_t&, const std::error_code&)>&
handler) = 0;
const web::uri& uri() const { return m_uri; }
void set_uri(const web::uri& uri) { m_uri = uri; }
const websocket_client_config& config() const { return m_config; }
static void verify_uri(const web::uri& uri)
{
// Most of the URI schema validation is taken care by URI class.
// We only need to check certain things specific to websockets.
if (uri.scheme() != _XPLATSTR("ws") && uri.scheme() != _XPLATSTR("wss"))
{
throw std::invalid_argument("URI scheme must be 'ws' or 'wss'");
}
if (uri.host().empty())
{
throw std::invalid_argument("URI must contain a hostname.");
}
// Fragment identifiers are meaningless in the context of WebSocket URIs
// and MUST NOT be used on these URIs.
if (!uri.fragment().empty())
{
throw std::invalid_argument("WebSocket URI must not contain fragment identifiers");
}
}
protected:
web::uri m_uri;
websocket_client_config m_config;
};
// Interface to be implemented by the websocket client task implementations.
class websocket_client_task_impl
{
public:
_ASYNCRTIMP websocket_client_task_impl(websocket_client_config config);
_ASYNCRTIMP virtual ~websocket_client_task_impl() CPPREST_NOEXCEPT;
_ASYNCRTIMP pplx::task<websocket_incoming_message> receive();
_ASYNCRTIMP void close_pending_tasks_with_error(const websocket_exception& exc);
const std::shared_ptr<websocket_client_callback_impl>& callback_client() const { return m_callback_client; };
private:
void set_handler();
// When a message arrives, if there are tasks waiting for a message, signal the topmost one.
// Else enqueue the message in a queue.
// m_receive_queue_lock : to guard access to the queue & m_client_closed
std::mutex m_receive_queue_lock;
// Queue to store incoming messages when there are no tasks waiting for a message
std::queue<websocket_incoming_message> m_receive_msg_queue;
// Queue to maintain the receive tasks when there are no messages(yet).
std::queue<pplx::task_completion_event<websocket_incoming_message>> m_receive_task_queue;
// Initially set to false, becomes true if a close frame is received from the server or
// if the underlying connection is aborted or terminated.
bool m_client_closed;
std::shared_ptr<websocket_client_callback_impl> m_callback_client;
};
} // namespace details
/// <summary>
/// Websocket client class, used to maintain a connection to a remote host for an extended session.
/// </summary>
class websocket_client
{
public:
/// <summary>
/// Creates a new websocket_client.
/// </summary>
websocket_client() : m_client(std::make_shared<details::websocket_client_task_impl>(websocket_client_config())) {}
/// <summary>
/// Creates a new websocket_client.
/// </summary>
/// <param name="config">The client configuration object containing the possible configuration options to initialize
/// the <c>websocket_client</c>. </param>
websocket_client(websocket_client_config config)
: m_client(std::make_shared<details::websocket_client_task_impl>(std::move(config)))
{
}
/// <summary>
/// Connects to the remote network destination. The connect method initiates the websocket handshake with the
/// remote network destination, takes care of the protocol upgrade request.
/// </summary>
/// <param name="uri">The uri address to connect. </param>
/// <returns>An asynchronous operation that is completed once the client has successfully connected to the websocket
/// server.</returns>
pplx::task<void> connect(const web::uri& uri)
{
m_client->callback_client()->verify_uri(uri);
m_client->callback_client()->set_uri(uri);
auto client = m_client;
return m_client->callback_client()->connect().then([client](pplx::task<void> result) {
try
{
result.get();
}
catch (const websocket_exception& ex)
{
client->close_pending_tasks_with_error(ex);
throw;
}
});
}
/// <summary>
/// Sends a websocket message to the server .
/// </summary>
/// <returns>An asynchronous operation that is completed once the message is sent.</returns>
pplx::task<void> send(websocket_outgoing_message msg) { return m_client->callback_client()->send(msg); }
/// <summary>
/// Receive a websocket message.
/// </summary>
/// <returns>An asynchronous operation that is completed when a message has been received by the client
/// endpoint.</returns>
pplx::task<websocket_incoming_message> receive() { return m_client->receive(); }
/// <summary>
/// Closes a websocket client connection, sends a close frame to the server and waits for a close message from the
/// server.
/// </summary>
/// <returns>An asynchronous operation that is completed the connection has been successfully closed.</returns>
pplx::task<void> close() { return m_client->callback_client()->close(); }
/// <summary>
/// Closes a websocket client connection, sends a close frame to the server and waits for a close message from the
/// server.
/// </summary>
/// <param name="close_status">Endpoint MAY use the following pre-defined status codes when sending a Close
/// frame.</param> <param name="close_reason">While closing an established connection, an endpoint may indicate the
/// reason for closure.</param> <returns>An asynchronous operation that is completed the connection has been
/// successfully closed.</returns>
pplx::task<void> close(websocket_close_status close_status, const utility::string_t& close_reason = _XPLATSTR(""))
{
return m_client->callback_client()->close(close_status, close_reason);
}
/// <summary>
/// Gets the websocket client URI.
/// </summary>
/// <returns>URI connected to.</returns>
const web::uri& uri() const { return m_client->callback_client()->uri(); }
/// <summary>
/// Gets the websocket client config object.
/// </summary>
/// <returns>A reference to the client configuration object.</returns>
const websocket_client_config& config() const { return m_client->callback_client()->config(); }
private:
std::shared_ptr<details::websocket_client_task_impl> m_client;
};
/// <summary>
/// Websocket client class, used to maintain a connection to a remote host for an extended session, uses callback APIs
/// for handling receive and close event instead of async task. For some scenarios would be a alternative for the
/// websocket_client like if you want to special handling on close event.
/// </summary>
class websocket_callback_client
{
public:
/// <summary>
/// Creates a new websocket_callback_client.
/// </summary>
_ASYNCRTIMP websocket_callback_client();
/// <summary>
/// Creates a new websocket_callback_client.
/// </summary>
/// <param name="client_config">The client configuration object containing the possible configuration options to
/// initialize the <c>websocket_client</c>. </param>
_ASYNCRTIMP websocket_callback_client(websocket_client_config client_config);
/// <summary>
/// Connects to the remote network destination. The connect method initiates the websocket handshake with the
/// remote network destination, takes care of the protocol upgrade request.
/// </summary>
/// <param name="uri">The uri address to connect. </param>
/// <returns>An asynchronous operation that is completed once the client has successfully connected to the websocket
/// server.</returns>
pplx::task<void> connect(const web::uri& uri)
{
m_client->verify_uri(uri);
m_client->set_uri(uri);
return m_client->connect();
}
/// <summary>
/// Sends a websocket message to the server .
/// </summary>
/// <returns>An asynchronous operation that is completed once the message is sent.</returns>
pplx::task<void> send(websocket_outgoing_message msg) { return m_client->send(msg); }
/// <summary>
/// Set the received handler for notification of client websocket messages.
/// </summary>
/// <param name="handler">A function representing the incoming websocket messages handler. It's parameters are:
/// msg: a <c>websocket_incoming_message</c> value indicating the message received
/// </param>
/// <remarks>If this handler is not set before connecting incoming messages will be missed.</remarks>
void set_message_handler(const std::function<void(const websocket_incoming_message& msg)>& handler)
{
m_client->set_message_handler(handler);
}
/// <summary>
/// Closes a websocket client connection, sends a close frame to the server and waits for a close message from the
/// server.
/// </summary>
/// <returns>An asynchronous operation that is completed the connection has been successfully closed.</returns>
pplx::task<void> close() { return m_client->close(); }
/// <summary>
/// Closes a websocket client connection, sends a close frame to the server and waits for a close message from the
/// server.
/// </summary>
/// <param name="close_status">Endpoint MAY use the following pre-defined status codes when sending a Close
/// frame.</param> <param name="close_reason">While closing an established connection, an endpoint may indicate the
/// reason for closure.</param> <returns>An asynchronous operation that is completed the connection has been
/// successfully closed.</returns>
pplx::task<void> close(websocket_close_status close_status, const utility::string_t& close_reason = _XPLATSTR(""))
{
return m_client->close(close_status, close_reason);
}
/// <summary>
/// Set the closed handler for notification of client websocket closing event.
/// </summary>
/// <param name="handler">The handler for websocket closing event, It's parameters are:
/// close_status: The pre-defined status codes used by the endpoint when sending a Close frame.
/// reason: The reason string used by the endpoint when sending a Close frame.
/// error: The error code if the websocket is closed with abnormal error.
/// </param>
void set_close_handler(const std::function<void(websocket_close_status close_status,
const utility::string_t& reason,
const std::error_code& error)>& handler)
{
m_client->set_close_handler(handler);
}
/// <summary>
/// Gets the websocket client URI.
/// </summary>
/// <returns>URI connected to.</returns>
const web::uri& uri() const { return m_client->uri(); }
/// <summary>
/// Gets the websocket client config object.
/// </summary>
/// <returns>A reference to the client configuration object.</returns>
const websocket_client_config& config() const { return m_client->config(); }
private:
std::shared_ptr<details::websocket_client_callback_impl> m_client;
};
} // namespace client
} // namespace websockets
} // namespace web
#endif
#endif
/***
* Copyright (C) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
*
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* Websocket incoming and outgoing message definitions.
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#include "cpprest/details/basic_types.h"
#if !defined(CPPREST_EXCLUDE_WEBSOCKETS)
#include "cpprest/asyncrt_utils.h"
#include "cpprest/containerstream.h"
#include "cpprest/streams.h"
#include "cpprest/uri.h"
#include "pplx/pplxtasks.h"
#include <limits>
#include <memory>
namespace web
{
namespace websockets
{
namespace client
{
namespace details
{
class winrt_callback_client;
class wspp_callback_client;
#if defined(__cplusplus_winrt)
ref class ReceiveContext;
#endif
} // namespace details
/// <summary>
/// The different types of websocket message.
/// Text type contains UTF-8 encoded data.
/// Interpretation of Binary type is left to the application.
/// Note: The fragment types and control frames like close, ping, pong are not supported on WinRT.
/// </summary>
enum class websocket_message_type
{
text_message,
binary_message,
close,
ping,
pong
};
/// <summary>
/// Represents an outgoing websocket message
/// </summary>
class websocket_outgoing_message
{
public:
#if !defined(__cplusplus_winrt)
/// <summary>
/// Sets a the outgoing message to be an unsolicited pong message.
/// This is useful when the client side wants to check whether the server is alive.
/// </summary>
void set_pong_message() { this->set_message_pong(); }
#endif
/// <summary>
/// Sets a UTF-8 message as the message body.
/// </summary>
/// <param name="data">UTF-8 String containing body of the message.</param>
void set_utf8_message(std::string&& data)
{
this->set_message(concurrency::streams::container_buffer<std::string>(std::move(data)));
}
/// <summary>
/// Sets a UTF-8 message as the message body.
/// </summary>
/// <param name="data">UTF-8 String containing body of the message.</param>
void set_utf8_message(const std::string& data)
{
this->set_message(concurrency::streams::container_buffer<std::string>(data));
}
/// <summary>
/// Sets a UTF-8 message as the message body.
/// </summary>
/// <param name="istream">casablanca input stream representing the body of the message.</param>
/// <remarks>Upon sending, the entire stream may be buffered to determine the length.</remarks>
void set_utf8_message(const concurrency::streams::istream& istream)
{
this->set_message(istream, SIZE_MAX, websocket_message_type::text_message);
}
/// <summary>
/// Sets a UTF-8 message as the message body.
/// </summary>
/// <param name="istream">casablanca input stream representing the body of the message.</param>
/// <param name="len">number of bytes to send.</param>
void set_utf8_message(const concurrency::streams::istream& istream, size_t len)
{
this->set_message(istream, len, websocket_message_type::text_message);
}
/// <summary>
/// Sets binary data as the message body.
/// </summary>
/// <param name="istream">casablanca input stream representing the body of the message.</param>
/// <param name="len">number of bytes to send.</param>
void set_binary_message(const concurrency::streams::istream& istream, size_t len)
{
this->set_message(istream, len, websocket_message_type::binary_message);
}
/// <summary>
/// Sets binary data as the message body.
/// </summary>
/// <param name="istream">Input stream representing the body of the message.</param>
/// <remarks>Upon sending, the entire stream may be buffered to determine the length.</remarks>
void set_binary_message(const concurrency::streams::istream& istream)
{
this->set_message(istream, SIZE_MAX, websocket_message_type::binary_message);
}
private:
friend class details::winrt_callback_client;
friend class details::wspp_callback_client;
pplx::task_completion_event<void> m_body_sent;
concurrency::streams::streambuf<uint8_t> m_body;
websocket_message_type m_msg_type;
size_t m_length;
void signal_body_sent() const { m_body_sent.set(); }
void signal_body_sent(const std::exception_ptr& e) const { m_body_sent.set_exception(e); }
const pplx::task_completion_event<void>& body_sent() const { return m_body_sent; }
#if !defined(__cplusplus_winrt)
void set_message_pong()
{
concurrency::streams::container_buffer<std::string> buffer("");
m_msg_type = websocket_message_type::pong;
m_length = static_cast<size_t>(buffer.size());
m_body = buffer;
}
#endif
void set_message(const concurrency::streams::container_buffer<std::string>& buffer)
{
m_msg_type = websocket_message_type::text_message;
m_length = static_cast<size_t>(buffer.size());
m_body = buffer;
}
void set_message(const concurrency::streams::istream& istream, size_t len, websocket_message_type msg_type)
{
m_msg_type = msg_type;
m_length = len;
m_body = istream.streambuf();
}
};
/// <summary>
/// Represents an incoming websocket message
/// </summary>
class websocket_incoming_message
{
public:
/// <summary>
/// Extracts the body of the incoming message as a string value, only if the message type is UTF-8.
/// A body can only be extracted once because in some cases an optimization is made where the data is 'moved' out.
/// </summary>
/// <returns>String containing body of the message.</returns>
_ASYNCRTIMP pplx::task<std::string> extract_string() const;
/// <summary>
/// Produces a stream which the caller may use to retrieve body from an incoming message.
/// Can be used for both UTF-8 (text) and binary message types.
/// </summary>
/// <returns>A readable, open asynchronous stream.</returns>
/// <remarks>
/// This cannot be used in conjunction with any other means of getting the body of the message.
/// </remarks>
concurrency::streams::istream body() const
{
auto to_uint8_t_stream =
[](const concurrency::streams::streambuf<uint8_t>& buf) -> concurrency::streams::istream {
return buf.create_istream();
};
return to_uint8_t_stream(m_body);
}
/// <summary>
/// Returns the length of the received message.
/// </summary>
size_t length() const { return static_cast<size_t>(m_body.size()); }
/// <summary>
/// Returns the type of the received message.
/// </summary>
CASABLANCA_DEPRECATED("Incorrectly spelled API, use message_type() instead.")
websocket_message_type messge_type() const { return m_msg_type; }
/// <summary>
/// Returns the type of the received message, either string or binary.
/// </summary>
/// <returns>websocket_message_type</returns>
websocket_message_type message_type() const { return m_msg_type; }
private:
friend class details::winrt_callback_client;
friend class details::wspp_callback_client;
#if defined(__cplusplus_winrt)
friend ref class details::ReceiveContext;
#endif
// Store message body in a container buffer backed by a string.
// Allows for optimization in the string message cases.
concurrency::streams::container_buffer<std::string> m_body;
websocket_message_type m_msg_type;
};
} // namespace client
} // namespace websockets
} // namespace web
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment