ClickHouse/dbms/src/IO/ReadWriteBufferFromHTTP.cpp

100 lines
2.8 KiB
C++
Raw Normal View History

2016-11-19 00:07:58 +00:00
#include <DB/IO/ReadWriteBufferFromHTTP.h>
2017-02-07 06:18:16 +00:00
#include <Poco/Version.h>
2016-11-19 00:07:58 +00:00
#include <Poco/URI.h>
#include <Poco/Net/DNS.h>
2016-11-24 19:57:24 +00:00
#include <Poco/Net/HTTPRequest.h>
2016-11-25 00:16:20 +00:00
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPSClientSession.h>
2016-11-19 00:07:58 +00:00
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/Common/SimpleCache.h>
#include <common/logger_useful.h>
namespace DB
{
// copypaste from ReadBufferFromHTTP.cpp
/// Error codes referenced in this translation unit. Only declared here (`extern`);
/// the definitions live outside this file.
namespace ErrorCodes
{
/// Code attached to the exception thrown by the ReadWriteBufferFromHTTP constructor
/// when the HTTP response status is anything other than 200 OK.
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
}
/// Uncached name resolution: asks Poco's DNS helper for a single address of `host`.
/// Errors raised by Poco::Net::DNS::resolveOne propagate to the caller unchanged.
static Poco::Net::IPAddress resolveHostImpl(const String & host)
{
    Poco::Net::IPAddress address = Poco::Net::DNS::resolveOne(host);
    return address;
}
/// Resolves `host` through a function-local static SimpleCache wrapped around resolveHostImpl.
/// The cache object is created on first call and lives until the process exits.
/// NOTE(review): presumably SimpleCache memoizes results per argument and never evicts - confirm.
static Poco::Net::IPAddress resolveHost(const String & host)
{
    using ResolveCache = SimpleCache<decltype(resolveHostImpl), &resolveHostImpl>;
    static ResolveCache resolve_cache;
    return resolve_cache(host);
}
// ==========
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
2016-11-24 01:01:11 +00:00
const Poco::URI & uri,
const std::string & method,
2016-11-25 00:16:20 +00:00
OutStreamCallback out_stream_callback,
2016-11-19 00:07:58 +00:00
size_t buffer_size_,
2016-11-24 01:01:11 +00:00
const HTTPTimeouts & timeouts
2016-11-19 00:07:58 +00:00
) :
ReadBuffer(nullptr, 0),
2016-11-24 01:01:11 +00:00
uri{uri},
2016-11-25 00:16:20 +00:00
method{!method.empty() ? method : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET},
timeouts{timeouts},
is_ssl{uri.getScheme() == "https"},
session{ std::unique_ptr<Poco::Net::HTTPClientSession>(is_ssl ? new Poco::Net::HTTPSClientSession : new Poco::Net::HTTPClientSession) }
2016-11-19 00:07:58 +00:00
{
session->setHost(resolveHost(uri.getHost()).toString()); /// Cache DNS forever (until server restart)
session->setPort(uri.getPort());
2016-11-19 00:07:58 +00:00
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
session->setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
session->setTimeout(timeouts.connection_timeout);
#endif
2016-11-19 00:07:58 +00:00
2016-11-25 00:16:20 +00:00
Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri.getHost()); // use original, not resolved host name in header
2016-11-25 00:16:20 +00:00
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
2016-11-19 00:07:58 +00:00
2016-11-25 00:16:20 +00:00
Poco::Net::HTTPResponse response;
2016-11-19 00:07:58 +00:00
2016-11-25 00:16:20 +00:00
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
2016-11-24 19:57:24 +00:00
auto & stream_out = session->sendRequest(request);
2016-11-22 15:03:54 +00:00
2016-11-25 00:16:20 +00:00
if (out_stream_callback)
out_stream_callback(stream_out);
2016-11-22 15:03:54 +00:00
istr = &session->receiveResponse(response);
2016-11-19 00:07:58 +00:00
auto status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
2016-11-24 01:01:11 +00:00
error_message << "Received error from remote server " << uri.toString() << ". HTTP status code: "
2016-11-19 00:07:58 +00:00
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}
/// Advances the wrapped buffer `impl` and, if it produced data, exposes its buffer
/// as both the internal and the working buffer of this object.
/// Returns false (leaving this object's buffers untouched) when `impl` has nothing more to give.
bool ReadWriteBufferFromHTTP::nextImpl()
{
    const bool has_more = impl->next();

    if (has_more)
    {
        internal_buffer = impl->buffer();
        working_buffer = internal_buffer;
    }

    return has_more;
}
}