mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-02 12:32:04 +00:00
Merge 9a9f0161bb
into 5e7a90f85f
This commit is contained in:
commit
4c08c7c1da
@ -295,8 +295,13 @@ private:
|
|||||||
String getTarget() const
|
String getTarget() const
|
||||||
{
|
{
|
||||||
if (!Session::getProxyConfig().host.empty())
|
if (!Session::getProxyConfig().host.empty())
|
||||||
return fmt::format("{} over proxy {}", Session::getHost(), Session::getProxyConfig().host);
|
return fmt::format("{}:{} over proxy {}",
|
||||||
return Session::getHost();
|
Session::getHost(),
|
||||||
|
Session::getPort(),
|
||||||
|
Session::getProxyConfig().host);
|
||||||
|
return fmt::format("{}:{}",
|
||||||
|
Session::getHost(),
|
||||||
|
Session::getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
void flushRequest() override
|
void flushRequest() override
|
||||||
@ -472,7 +477,8 @@ public:
|
|||||||
String getTarget() const
|
String getTarget() const
|
||||||
{
|
{
|
||||||
if (!proxy_configuration.isEmpty())
|
if (!proxy_configuration.isEmpty())
|
||||||
return fmt::format("{} over proxy {}", host, proxy_configuration.host);
|
return fmt::format("{} over proxy {}",
|
||||||
|
host, proxy_configuration.host);
|
||||||
return host;
|
return host;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ size_t ReadWriteBufferFromHTTP::getOffset() const
|
|||||||
|
|
||||||
void ReadWriteBufferFromHTTP::prepareRequest(Poco::Net::HTTPRequest & request, std::optional<HTTPRange> range) const
|
void ReadWriteBufferFromHTTP::prepareRequest(Poco::Net::HTTPRequest & request, std::optional<HTTPRange> range) const
|
||||||
{
|
{
|
||||||
request.setHost(initial_uri.getHost()); // use original, not resolved host name in header
|
request.setHost(current_uri.getHost());
|
||||||
|
|
||||||
if (out_stream_callback)
|
if (out_stream_callback)
|
||||||
request.setChunkedTransferEncoding(true);
|
request.setChunkedTransferEncoding(true);
|
||||||
@ -237,15 +237,15 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
|
|||||||
}
|
}
|
||||||
|
|
||||||
ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callImpl(
|
ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callImpl(
|
||||||
Poco::Net::HTTPResponse & response, const Poco::URI & uri_, const std::string & method_, const std::optional<HTTPRange> & range, bool allow_redirects) const
|
Poco::Net::HTTPResponse & response, const std::string & method_, const std::optional<HTTPRange> & range, bool allow_redirects) const
|
||||||
{
|
{
|
||||||
if (remote_host_filter)
|
if (remote_host_filter)
|
||||||
remote_host_filter->checkURL(uri_);
|
remote_host_filter->checkURL(current_uri);
|
||||||
|
|
||||||
Poco::Net::HTTPRequest request(method_, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
|
Poco::Net::HTTPRequest request(method_, current_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
|
||||||
prepareRequest(request, range);
|
prepareRequest(request, range);
|
||||||
|
|
||||||
auto session = makeHTTPSession(connection_group, uri_, timeouts, proxy_config);
|
auto session = makeHTTPSession(connection_group, current_uri, timeouts, proxy_config);
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPRequestsSent);
|
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPRequestsSent);
|
||||||
|
|
||||||
@ -263,7 +263,7 @@ ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callImpl(
|
|||||||
ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callWithRedirects(
|
ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callWithRedirects(
|
||||||
Poco::Net::HTTPResponse & response, const String & method_, const std::optional<HTTPRange> & range)
|
Poco::Net::HTTPResponse & response, const String & method_, const std::optional<HTTPRange> & range)
|
||||||
{
|
{
|
||||||
auto result = callImpl(response, current_uri, method_, range, true);
|
auto result = callImpl(response, method_, range, true);
|
||||||
|
|
||||||
while (isRedirect(response.getStatus()))
|
while (isRedirect(response.getStatus()))
|
||||||
{
|
{
|
||||||
@ -279,8 +279,7 @@ ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callWithRedirects(
|
|||||||
initial_uri.toString(), max_redirects ? "increase the allowed maximum number of" : "allow");
|
initial_uri.toString(), max_redirects ? "increase the allowed maximum number of" : "allow");
|
||||||
|
|
||||||
current_uri = uri_redirect;
|
current_uri = uri_redirect;
|
||||||
|
result = callImpl(response, method_, range, true);
|
||||||
result = callImpl(response, uri_redirect, method_, range, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
@ -347,9 +346,11 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
|
|||||||
{
|
{
|
||||||
if (!mute_logging)
|
if (!mute_logging)
|
||||||
LOG_ERROR(log,
|
LOG_ERROR(log,
|
||||||
"Failed to make request to '{}'. Error: '{}'. "
|
"Failed to make request to '{}'{}. "
|
||||||
|
"Error: '{}'. "
|
||||||
"Failed at try {}/{}.",
|
"Failed at try {}/{}.",
|
||||||
initial_uri.toString(), error_message,
|
initial_uri.toString(), current_uri == initial_uri ? String() : fmt::format(" redirect to '{}'", current_uri.toString()),
|
||||||
|
error_message,
|
||||||
attempt, read_settings.http_max_tries);
|
attempt, read_settings.http_max_tries);
|
||||||
|
|
||||||
std::rethrow_exception(exception);
|
std::rethrow_exception(exception);
|
||||||
@ -361,10 +362,12 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
|
|||||||
|
|
||||||
if (!mute_logging)
|
if (!mute_logging)
|
||||||
LOG_INFO(log,
|
LOG_INFO(log,
|
||||||
"Failed to make request to `{}`. Error: {}. "
|
"Failed to make request to '{}'{}. "
|
||||||
|
"Error: {}. "
|
||||||
"Failed at try {}/{}. "
|
"Failed at try {}/{}. "
|
||||||
"Will retry with current backoff wait is {}/{} ms.",
|
"Will retry with current backoff wait is {}/{} ms.",
|
||||||
initial_uri.toString(), error_message,
|
initial_uri.toString(), current_uri == initial_uri ? String() : fmt::format(" redirect to '{}'", current_uri.toString()),
|
||||||
|
error_message,
|
||||||
attempt + 1, read_settings.http_max_tries,
|
attempt + 1, read_settings.http_max_tries,
|
||||||
milliseconds_to_wait, read_settings.http_retry_max_backoff_ms);
|
milliseconds_to_wait, read_settings.http_retry_max_backoff_ms);
|
||||||
|
|
||||||
@ -512,7 +515,7 @@ size_t ReadWriteBufferFromHTTP::readBigAt(char * to, size_t n, size_t offset, co
|
|||||||
auto range = HTTPRange{offset, offset + n - 1};
|
auto range = HTTPRange{offset, offset + n - 1};
|
||||||
|
|
||||||
Poco::Net::HTTPResponse response;
|
Poco::Net::HTTPResponse response;
|
||||||
auto result = callImpl(response, current_uri, method, range, false);
|
auto result = callImpl(response, method, range, false);
|
||||||
|
|
||||||
if (response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT &&
|
if (response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT &&
|
||||||
(offset != 0 || offset + n < *file_info->file_size))
|
(offset != 0 || offset + n < *file_info->file_size))
|
||||||
|
@ -107,7 +107,6 @@ private:
|
|||||||
|
|
||||||
CallResult callImpl(
|
CallResult callImpl(
|
||||||
Poco::Net::HTTPResponse & response,
|
Poco::Net::HTTPResponse & response,
|
||||||
const Poco::URI & uri_,
|
|
||||||
const std::string & method_,
|
const std::string & method_,
|
||||||
const std::optional<HTTPRange> & range,
|
const std::optional<HTTPRange> & range,
|
||||||
bool allow_redirects) const;
|
bool allow_redirects) const;
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
import http.server
|
import http.server
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
|
||||||
RESULT_PATH = "/headers.txt"
|
RESULT_PATH = "/echo_server_headers.txt"
|
||||||
|
|
||||||
|
|
||||||
class RequestHandler(http.server.BaseHTTPRequestHandler):
|
class RequestHandler(http.server.BaseHTTPRequestHandler):
|
||||||
@ -8,6 +10,28 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
|
|||||||
with open(RESULT_PATH, "w") as f:
|
with open(RESULT_PATH, "w") as f:
|
||||||
f.write(self.headers.as_string())
|
f.write(self.headers.as_string())
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path == "/":
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "text/plain")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(b'{"status":"ok"}')
|
||||||
|
if self.path == "/sample-data":
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "text/plain")
|
||||||
|
self.end_headers()
|
||||||
|
sample_data = [
|
||||||
|
{
|
||||||
|
"title": "ClickHouse Newsletter June 2022: Materialized, but still real-time",
|
||||||
|
"theme": "Newsletter",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"title": "ClickHouse Over the Years with Benchmarks",
|
||||||
|
"theme": "ClickHouse Journey",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
self.wfile.write(bytes(json.dumps(sample_data), "UTF-8"))
|
||||||
|
|
||||||
def do_POST(self):
|
def do_POST(self):
|
||||||
self.rfile.read1()
|
self.rfile.read1()
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
@ -16,15 +40,16 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
with open(RESULT_PATH, "w") as f:
|
host = sys.argv[1]
|
||||||
f.write("")
|
port = int(sys.argv[2])
|
||||||
httpd = http.server.HTTPServer(
|
httpd = http.server.ThreadingHTTPServer(
|
||||||
(
|
(
|
||||||
"localhost",
|
host,
|
||||||
8000,
|
port,
|
||||||
),
|
),
|
||||||
RequestHandler,
|
RequestHandler,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
httpd.serve_forever()
|
httpd.serve_forever()
|
||||||
finally:
|
finally:
|
||||||
|
@ -0,0 +1,46 @@
|
|||||||
|
import http.server
|
||||||
|
import sys
|
||||||
|
|
||||||
|
REDIRECT_HOST = ""
|
||||||
|
REDIRECT_PORT = 0
|
||||||
|
|
||||||
|
RESULT_PATH = "/redirect_server_headers.txt"
|
||||||
|
|
||||||
|
|
||||||
|
class RequestHandler(http.server.BaseHTTPRequestHandler):
|
||||||
|
def log_message(self, *args):
|
||||||
|
with open(RESULT_PATH, "w") as f:
|
||||||
|
f.write(self.headers.as_string())
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path == "/":
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "text/plain")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(b'{"status":"ok"}')
|
||||||
|
else:
|
||||||
|
global REDIRECT_HOST, REDIRECT_PORT
|
||||||
|
self.send_response(302)
|
||||||
|
target_location = f"http://{REDIRECT_HOST}:{REDIRECT_PORT}{self.path}"
|
||||||
|
self.send_header("Location", target_location)
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(b'{"status":"redirected"}')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
host = sys.argv[1]
|
||||||
|
port = int(sys.argv[2])
|
||||||
|
REDIRECT_HOST = sys.argv[3]
|
||||||
|
REDIRECT_PORT = int(sys.argv[4])
|
||||||
|
httpd = http.server.ThreadingHTTPServer(
|
||||||
|
(
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
),
|
||||||
|
RequestHandler,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
httpd.serve_forever()
|
||||||
|
finally:
|
||||||
|
httpd.server_close()
|
@ -1,8 +1,7 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
|
|
||||||
from . import http_headers_echo_server
|
from . import http_headers_echo_server
|
||||||
|
from . import redirect_server
|
||||||
|
|
||||||
from helpers.cluster import ClickHouseCluster
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
@ -10,31 +9,37 @@ cluster = ClickHouseCluster(__file__)
|
|||||||
server = cluster.add_instance("node")
|
server = cluster.add_instance("node")
|
||||||
|
|
||||||
|
|
||||||
def run_echo_server():
|
def run_server(container_id, file_name, hostname, port, *args):
|
||||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
server.copy_file_to_container(
|
cluster.copy_file_to_container(
|
||||||
os.path.join(script_dir, "http_headers_echo_server.py"),
|
container_id,
|
||||||
"/http_headers_echo_server.py",
|
os.path.join(script_dir, file_name),
|
||||||
|
f"/{file_name}",
|
||||||
)
|
)
|
||||||
|
|
||||||
server.exec_in_container(
|
cmd_args = [hostname, port] + list(args)
|
||||||
|
cmd_args_val = " ".join([str(x) for x in cmd_args])
|
||||||
|
|
||||||
|
cluster.exec_in_container(
|
||||||
|
container_id,
|
||||||
[
|
[
|
||||||
"bash",
|
"bash",
|
||||||
"-c",
|
"-c",
|
||||||
"python3 /http_headers_echo_server.py > /http_headers_echo.server.log 2>&1",
|
f"python3 /{file_name} {cmd_args_val} > {file_name}.log 2>&1",
|
||||||
],
|
],
|
||||||
detach=True,
|
detach=True,
|
||||||
user="root",
|
user="root",
|
||||||
)
|
)
|
||||||
|
|
||||||
for _ in range(0, 10):
|
for _ in range(0, 10):
|
||||||
ping_response = server.exec_in_container(
|
ping_response = cluster.exec_in_container(
|
||||||
["curl", "-s", f"http://localhost:8000/"],
|
container_id,
|
||||||
|
["curl", "-s", f"http://{hostname}:{port}/"],
|
||||||
nothrow=True,
|
nothrow=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if "html" in ping_response:
|
if '{"status":"ok"}' in ping_response:
|
||||||
return
|
return
|
||||||
|
|
||||||
print(ping_response)
|
print(ping_response)
|
||||||
@ -42,11 +47,23 @@ def run_echo_server():
|
|||||||
raise Exception("Echo server is not responding")
|
raise Exception("Echo server is not responding")
|
||||||
|
|
||||||
|
|
||||||
|
def run_echo_server():
|
||||||
|
container_id = cluster.get_container_id("node")
|
||||||
|
run_server(container_id, "http_headers_echo_server.py", "localhost", 8000)
|
||||||
|
|
||||||
|
|
||||||
|
def run_redirect_server():
|
||||||
|
container_id = cluster.get_container_id("node")
|
||||||
|
run_server(container_id, "redirect_server.py", "localhost", 8080, "localhost", 8000)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
def started_cluster():
|
def started_cluster():
|
||||||
try:
|
try:
|
||||||
cluster.start()
|
cluster.start()
|
||||||
|
run_redirect_server()
|
||||||
run_echo_server()
|
run_echo_server()
|
||||||
|
|
||||||
yield cluster
|
yield cluster
|
||||||
finally:
|
finally:
|
||||||
cluster.shutdown()
|
cluster.shutdown()
|
||||||
@ -64,3 +81,35 @@ def test_storage_url_http_headers(started_cluster):
|
|||||||
print(result)
|
print(result)
|
||||||
|
|
||||||
assert "X-My-Custom-Header: test-header" in result
|
assert "X-My-Custom-Header: test-header" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_storage_url_redirected_headers(started_cluster):
|
||||||
|
query = """
|
||||||
|
SELECT
|
||||||
|
title::String as title,
|
||||||
|
theme::String as theme
|
||||||
|
FROM
|
||||||
|
url('http://127.0.0.1:8080/sample-data', 'JSONEachRow', 'title String, theme String')
|
||||||
|
SETTINGS http_max_tries=2, max_http_get_redirects=2
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = server.query(query)
|
||||||
|
assert 2 == len(result.strip().split("\n"))
|
||||||
|
|
||||||
|
result_redirect = server.exec_in_container(
|
||||||
|
["cat", redirect_server.RESULT_PATH], user="root"
|
||||||
|
)
|
||||||
|
|
||||||
|
print(result_redirect)
|
||||||
|
|
||||||
|
assert "Host: 127.0.0.1" in result_redirect
|
||||||
|
assert "Host: localhost" not in result_redirect
|
||||||
|
|
||||||
|
result = server.exec_in_container(
|
||||||
|
["cat", http_headers_echo_server.RESULT_PATH], user="root"
|
||||||
|
)
|
||||||
|
|
||||||
|
print(result)
|
||||||
|
|
||||||
|
assert "Host: 127.0.0.1" not in result
|
||||||
|
assert "Host: localhost" in result
|
||||||
|
Loading…
Reference in New Issue
Block a user