Merge pull request #6914 from maqroll/master

+ Redirect URL Storage
This commit is contained in:
alexey-milovidov 2019-09-24 02:07:06 +03:00 committed by GitHub
commit e2cc551b22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 233 additions and 48 deletions

View File

@ -457,6 +457,7 @@ namespace ErrorCodes
extern const int UNKNOWN_PROTOCOL = 480;
extern const int PATH_ACCESS_DENIED = 481;
extern const int DICTIONARY_ACCESS_DENIED = 482;
extern const int TOO_MANY_REDIRECTS = 483;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -170,6 +170,8 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, add_http_cors_header, false, "Write add http CORS header.") \
\
M(SettingUInt64, max_http_get_redirects, 0, "Max number of http GET redirects hops allowed. Make sure additional security measures are in place to prevent a malicious server to redirect your requests to unexpected services.") \
\
M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow, CSVWithNames, TSVWithNames and TSKV formats).") \
M(SettingBool, input_format_with_names_use_header, false, "For TSVWithNames and CSVWithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.") \
M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
extern const int UNSUPPORTED_URI_SCHEME;
extern const int TOO_MANY_REDIRECTS;
}
@ -216,20 +217,21 @@ PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Connecti
return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size);
}
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; }
std::istream * receiveResponse(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response)
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, const bool allow_redirects)
{
auto & istr = session.receiveResponse(response);
assertResponseIsOk(request, response, istr);
assertResponseIsOk(request, response, istr, allow_redirects);
return &istr;
}
void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr)
void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, const bool allow_redirects)
{
auto status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
if (!(status == Poco::Net::HTTPResponse::HTTP_OK || (isRedirect(status) && allow_redirects)))
{
std::stringstream error_message;
error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " "

View File

@ -9,7 +9,7 @@
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
#include <Common/PoolBase.h>
#include <Poco/URIStreamFactory.h>
#include <IO/ConnectionTimeouts.h>
@ -50,13 +50,14 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
/// As previous method creates session, but tooks it from pool
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size);
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status);
/** Used to receive response (response headers and possibly body)
* after sending data (request headers and possibly body).
* Throws exception in case of non HTTP_OK (200) response code.
* Returned istream lives in 'session' object.
*/
std::istream * receiveResponse(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response);
void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr);
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, bool allow_redirects);
void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, const bool allow_redirects = false);
}

View File

@ -16,6 +16,7 @@
#include <Common/DNSResolver.h>
#include <Common/config.h>
#include <common/logger_useful.h>
#include <Poco/URIStreamFactory.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
@ -26,41 +27,84 @@ namespace DB
/** Perform HTTP POST request and provide response to read.
*/
namespace ErrorCodes
{
extern const int TOO_MANY_REDIRECTS;
}
template <typename SessionPtr>
class UpdatableSessionBase
{
protected:
SessionPtr session;
UInt64 redirects { 0 };
Poco::URI initial_uri;
const ConnectionTimeouts & timeouts;
DB::SettingUInt64 max_redirects;
public:
virtual void buildNewSession(const Poco::URI & uri) = 0;
explicit UpdatableSessionBase(const Poco::URI uri,
const ConnectionTimeouts & timeouts_,
SettingUInt64 max_redirects_)
: initial_uri { uri }
, timeouts { timeouts_ }
, max_redirects { max_redirects_ }
{
}
SessionPtr getSession()
{
return session;
}
void updateSession(const Poco::URI & uri)
{
++redirects;
if (redirects <= max_redirects)
{
buildNewSession(uri);
}
else
{
std::stringstream error_message;
error_message << "Too many redirects while trying to access " << initial_uri.toString();
throw Exception(error_message.str(), ErrorCodes::TOO_MANY_REDIRECTS);
}
}
virtual ~UpdatableSessionBase()
{
}
};
namespace detail
{
template <typename SessionPtr>
template <typename UpdatableSessionPtr>
class ReadWriteBufferFromHTTPBase : public ReadBuffer
{
protected:
Poco::URI uri;
std::string method;
SessionPtr session;
UpdatableSessionPtr session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;
std::function<void(std::ostream &)> out_stream_callback;
const Poco::Net::HTTPBasicCredentials & credentials;
std::vector<Poco::Net::HTTPCookie> cookies;
public:
using OutStreamCallback = std::function<void(std::ostream &)>;
explicit ReadWriteBufferFromHTTPBase(
SessionPtr session_,
Poco::URI uri_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0)
, uri{uri_}
, method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
, session{session_}
std::istream * call(const Poco::URI uri_, Poco::Net::HTTPResponse & response)
{
// With empty path poco will send "POST HTTP/1.1" its bug.
if (uri.getPath().empty())
uri.setPath("/");
Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri.getHost()); // use original, not resolved host name in header
Poco::Net::HTTPRequest request(method, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri_.getHost()); // use original, not resolved host name in header
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
@ -68,27 +112,71 @@ namespace detail
if (!credentials.getUsername().empty())
credentials.authenticate(request);
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
auto sess = session->getSession();
try
{
auto & stream_out = session->sendRequest(request);
auto & stream_out = sess->sendRequest(request);
if (out_stream_callback)
out_stream_callback(stream_out);
istr = receiveResponse(*session, request, response);
istr = receiveResponse(*sess, request, response, true);
response.getCookies(cookies);
return istr;
}
catch (const Poco::Exception & e)
{
/// We use session data storage as storage for exception text
/// Depend on it we can deduce to reconnect session or reresolve session host
sess->attachSessionData(e.message());
throw;
}
}
public:
using OutStreamCallback = std::function<void(std::ostream &)>;
explicit ReadWriteBufferFromHTTPBase(UpdatableSessionPtr session_,
Poco::URI uri_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback_ = {},
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
, session {session_}
, out_stream_callback {out_stream_callback_}
, credentials {credentials_}
{
Poco::Net::HTTPResponse response;
istr = call(uri, response);
while (isRedirect(response.getStatus()))
{
Poco::URI uri_redirect(response.get("Location"));
session->updateSession(uri_redirect);
istr = call(uri_redirect,response);
}
try
{
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}
catch (const Poco::Exception & e)
{
/// We use session data storage as storage for exception text
/// Depend on it we can deduce to reconnect session or reresolve session host
session->attachSessionData(e.message());
auto sess = session->getSession();
sess->attachSessionData(e.message());
throw;
}
}
@ -112,42 +200,88 @@ namespace detail
};
}
class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<HTTPSessionPtr>
class UpdatableSession : public UpdatableSessionBase<HTTPSessionPtr>
{
using Parent = detail::ReadWriteBufferFromHTTPBase<HTTPSessionPtr>;
using Parent = UpdatableSessionBase<HTTPSessionPtr>;
public:
explicit UpdatableSession(const Poco::URI uri,
const ConnectionTimeouts & timeouts_,
const SettingUInt64 max_redirects_)
: Parent(uri, timeouts_, max_redirects_)
{
session = makeHTTPSession(initial_uri, timeouts);
}
void buildNewSession(const Poco::URI & uri) override
{
session = makeHTTPSession(uri, timeouts);
}
};
class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>
{
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;
public:
explicit ReadWriteBufferFromHTTP(Poco::URI uri_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback = {},
OutStreamCallback out_stream_callback_ = {},
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
const DB::SettingUInt64 max_redirects = 0,
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: Parent(makeHTTPSession(uri_, timeouts), uri_, method_, out_stream_callback, credentials, buffer_size_)
: Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects), uri_, method_, out_stream_callback_, credentials_, buffer_size_)
{
}
};
class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<PooledHTTPSessionPtr>
class UpdatablePooledSession : public UpdatableSessionBase<PooledHTTPSessionPtr>
{
using Parent = detail::ReadWriteBufferFromHTTPBase<PooledHTTPSessionPtr>;
using Parent = UpdatableSessionBase<PooledHTTPSessionPtr>;
private:
size_t per_endpoint_pool_size;
public:
explicit UpdatablePooledSession(const Poco::URI uri,
const ConnectionTimeouts & timeouts_,
const SettingUInt64 max_redirects_,
size_t per_endpoint_pool_size_)
: Parent(uri, timeouts_, max_redirects_)
, per_endpoint_pool_size { per_endpoint_pool_size_ }
{
session = makePooledHTTPSession(initial_uri, timeouts, per_endpoint_pool_size);
}
void buildNewSession(const Poco::URI & uri) override
{
session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size);
}
};
class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>
{
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>;
public:
explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback = {},
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
OutStreamCallback out_stream_callback_ = {},
const ConnectionTimeouts & timeouts_ = {},
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const DB::SettingUInt64 max_redirects = 0,
size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
: Parent(makePooledHTTPSession(uri_, timeouts, max_connections_per_endpoint),
: Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
uri_,
method_,
out_stream_callback,
credentials,
out_stream_callback_,
credentials_,
buffer_size_)
{
}
};
}

View File

@ -22,7 +22,7 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
void WriteBufferFromHTTP::finalize()
{
receiveResponse(*session, request, response);
receiveResponse(*session, request, response, false);
/// TODO: Response body is ignored.
}

View File

@ -219,6 +219,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
0, /* no redirects */
data_settings->replicated_max_parallel_fetches_for_host
};

View File

@ -54,8 +54,7 @@ namespace
const ConnectionTimeouts & timeouts)
: name(name_)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, method, callback, timeouts);
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, method, callback, timeouts, context.getSettingsRef().max_http_get_redirects);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}

View File

@ -0,0 +1,45 @@
import pytest
from helpers.hdfs_api import HDFSApi
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=False, with_hdfs=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_url_without_redirect(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access datanode port directly
node1.query("create table WebHDFSStorage (id UInt32, name String, weight Float64) ENGINE = URL('http://hdfs1:50075/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV')")
assert node1.query("select * from WebHDFSStorage") == "1\tMark\t72.53\n"
def test_url_with_redirect_not_allowed(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port without allowing redirects
node1.query("create table WebHDFSStorageWithoutRedirect (id UInt32, name String, weight Float64) ENGINE = URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV')")
with pytest.raises(Exception):
assert node1.query("select * from WebHDFSStorageWithoutRedirect") == "1\tMark\t72.53\n"
def test_url_with_redirect_allowed(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port with allowing redirects
# http://localhost:50070/webhdfs/v1/b?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0
node1.query("create table WebHDFSStorageWithRedirect (id UInt32, name String, weight Float64) ENGINE = URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV')")
assert node1.query("SET max_http_get_redirects=1; select * from WebHDFSStorageWithRedirect") == "1\tMark\t72.53\n"