ClickHouse/dbms/src/IO/ReadWriteBufferFromHTTP.h

144 lines
4.7 KiB
C++
Raw Normal View History

2016-11-19 00:07:58 +00:00
#pragma once
#include <functional>
#include <Core/Types.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <Poco/Any.h>
#include <Poco/Net/HTTPBasicCredentials.h>
2016-11-19 00:07:58 +00:00
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
2016-11-24 01:01:11 +00:00
#include <Poco/URI.h>
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/config.h>
#include <common/logger_useful.h>
2016-11-19 00:07:58 +00:00
/// Default timeout constants for HTTP read buffers; they are not referenced in the part of the header shown here.
/// NOTE(review): the unit is not stated here - presumably seconds; confirm at the use sites.
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
2017-02-07 06:18:16 +00:00
2016-11-19 00:07:58 +00:00
namespace DB
{
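/** Perform an HTTP request (GET by default, POST when an output stream callback is given,
  * or an explicitly specified method) and provide the response for reading.
  */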
namespace detail
2016-11-19 00:07:58 +00:00
{
template <typename SessionPtr>
class ReadWriteBufferFromHTTPBase : public ReadBuffer
{
protected:
Poco::URI uri;
std::string method;
2016-11-19 00:07:58 +00:00
SessionPtr session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;
2016-11-19 00:07:58 +00:00
public:
using OutStreamCallback = std::function<void(std::ostream &)>;
explicit ReadWriteBufferFromHTTPBase(SessionPtr session_,
2019-08-03 11:02:40 +00:00
Poco::URI uri_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0)
2019-08-03 11:02:40 +00:00
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
, session {session_}
{
// With empty path poco will send "POST HTTP/1.1" its bug.
if (uri.getPath().empty())
uri.setPath("/");
Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri.getHost()); // use original, not resolved host name in header
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
if (!credentials.getUsername().empty())
credentials.authenticate(request);
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
try
{
auto & stream_out = session->sendRequest(request);
if (out_stream_callback)
out_stream_callback(stream_out);
2016-11-25 00:16:20 +00:00
istr = receiveResponse(*session, request, response);
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}
catch (const Poco::Exception & e)
{
/// We use session data storage as storage for exception text
/// Depend on it we can deduce to reconnect session or reresolve session host
session->attachSessionData(e.message());
throw;
}
}
bool nextImpl() override
{
if (!impl->next())
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
return true;
}
};
}
/// HTTP read buffer that obtains its session from makeHTTPSession() for the given URI and timeouts.
class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<HTTPSessionPtr>
{
    using Base = detail::ReadWriteBufferFromHTTPBase<HTTPSessionPtr>;

public:
    /** Creates a session with makeHTTPSession(uri_, timeouts) and hands it, together with
      * the remaining arguments, to the base class constructor unchanged.
      */
    explicit ReadWriteBufferFromHTTP(
        Poco::URI uri_,
        const std::string & method_ = {},
        OutStreamCallback out_stream_callback = {},
        const ConnectionTimeouts & timeouts = {},
        const Poco::Net::HTTPBasicCredentials & credentials = {},
        size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
        : Base(
            makeHTTPSession(uri_, timeouts),
            uri_,
            method_,
            out_stream_callback,
            credentials,
            buffer_size_)
    {
    }
};
/// HTTP read buffer that obtains its session from makePooledHTTPSession() for the given URI and timeouts.
class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<PooledHTTPSessionPtr>
{
    using Parent = detail::ReadWriteBufferFromHTTPBase<PooledHTTPSessionPtr>;

public:
    /** Obtains a session with makePooledHTTPSession(uri_, timeouts, max_connections_per_endpoint)
      * and passes it, together with the remaining arguments, to the base class constructor.
      *
      * @param max_connections_per_endpoint  forwarded to makePooledHTTPSession();
      *                                      defaults to DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT.
      */
    explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_,
        const std::string & method_ = {},
        OutStreamCallback out_stream_callback = {},
        const ConnectionTimeouts & timeouts = {},
        const Poco::Net::HTTPBasicCredentials & credentials = {},
        size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
        size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
        : Parent(makePooledHTTPSession(uri_, timeouts, max_connections_per_endpoint),
              uri_,
              method_,
              out_stream_callback,
              credentials,
              buffer_size_)
    {
    }
};
2016-11-19 00:07:58 +00:00
}