ClickHouse/dbms/src/IO/InterserverWriteBuffer.cpp

110 lines
2.6 KiB
C++
Raw Normal View History

#include <IO/InterserverWriteBuffer.h>
#include <IO/WriteBufferFromOStream.h>
2016-03-01 17:47:53 +00:00
2017-02-07 06:18:16 +00:00
#include <Poco/Version.h>
2016-03-01 17:47:53 +00:00
#include <Poco/URI.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <common/logger_useful.h>
2017-02-07 06:18:16 +00:00
2016-03-01 17:47:53 +00:00
namespace DB
{
/// Declarations of error code constants. Only the declarations live here;
/// the definitions are provided outside this file.
namespace ErrorCodes
{
    extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
    extern const int CANNOT_WRITE_TO_OSTREAM;
}
/** Starts an HTTP POST request with chunked transfer encoding to host_:port_
  * and prepares this buffer to write into the request body.
  *
  * host_, port_          - address used for the HTTP session.
  * endpoint_, path_      - passed as the `endpoint` and `path` query parameters
  *                         (the characters '&' and '#' are percent-encoded in addition
  *                         to what Poco::URI::encode always escapes).
  * compress_             - passed as the `compress` query parameter ("true" / "false").
  * buffer_size_          - size of the internal WriteBufferFromOStream.
  * connection_timeout, send_timeout, receive_timeout
  *                       - session timeouts; send/receive timeouts are applied only
  *                         when the Poco build provides the three-argument setTimeout.
  *
  * The request is sent from the constructor, so exceptions thrown by the session
  * (for example, on connection failure) propagate from here.
  */
InterserverWriteBuffer::InterserverWriteBuffer(const std::string & host_, int port_,
    const std::string & endpoint_,
    const std::string & path_,
    bool compress_,
    size_t buffer_size_,
    const Poco::Timespan & connection_timeout,
    const Poco::Timespan & send_timeout,
    const Poco::Timespan & receive_timeout)
    : WriteBuffer(nullptr, 0), host(host_), port(port_), path(path_)
{
    std::string encoded_path;
    Poco::URI::encode(path, "&#", encoded_path);

    std::string encoded_endpoint;
    Poco::URI::encode(endpoint_, "&#", encoded_endpoint);

    /// Only the path and query are sent in the request line, so build them directly.
    /// Going through a full "http://host:port/..." URI would make construction fail for
    /// hosts that are not valid as a URI authority (e.g. an unbracketed IPv6 literal),
    /// even though the session below accepts host and port separately.
    const std::string request_target = "/?endpoint=" + encoded_endpoint
        + "&compress=" + (compress_ ? "true" : "false")
        + "&path=" + encoded_path;

    session.setHost(host);
    session.setPort(port);
    session.setKeepAlive(true);

    /// set the timeout
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
    session.setTimeout(connection_timeout, send_timeout, receive_timeout);
#else
    session.setTimeout(connection_timeout);
#endif

    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, request_target, Poco::Net::HTTPRequest::HTTP_1_1);
    request.setChunkedTransferEncoding(true);

    /// The stream returned by sendRequest receives the request body; wrap it
    /// and expose the wrapper's memory as this buffer's working memory.
    ostr = &session.sendRequest(request);
    impl = std::make_unique<WriteBufferFromOStream>(*ostr, buffer_size_);
    set(impl->buffer().begin(), impl->buffer().size());
}
/// Finalizes the buffer on destruction. Any exception raised by finalize()
/// is handed to tryLogCurrentException and never leaves the destructor.
InterserverWriteBuffer::~InterserverWriteBuffer()
{
    try
    {
        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
void InterserverWriteBuffer::nextImpl()
{
if (!offset() || finalized)
return;
2016-03-01 17:47:53 +00:00
/// For correct work with AsynchronousWriteBuffer, which replaces buffers.
impl->set(buffer().begin(), buffer().size());
2016-03-01 17:47:53 +00:00
impl->position() = pos;
2016-03-01 17:47:53 +00:00
impl->next();
2016-03-01 17:47:53 +00:00
}
void InterserverWriteBuffer::finalize()
{
if (finalized)
return;
2016-03-01 17:47:53 +00:00
next();
2016-03-01 17:47:53 +00:00
finalized = true;
2016-03-01 17:47:53 +00:00
}
/// Marks the buffer as finalized without flushing anything:
/// afterwards nextImpl() and finalize() return immediately.
void InterserverWriteBuffer::cancel()
{
    finalized = true;
}
}