Merge branch 'master' into move-regions

Commit b77eaa184a by Alexey Milovidov, 2023-07-31 12:28:59 +02:00
36 changed files with 929 additions and 216 deletions


@ -3643,7 +3643,7 @@ jobs:
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/unit_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Unit tests (release-clang)
CHECK_NAME=Unit tests (release)
REPO_COPY=${{runner.temp}}/unit_tests_asan/ClickHouse
EOF
- name: Download json reports


@ -4541,7 +4541,7 @@ jobs:
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/unit_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Unit tests (release-clang)
CHECK_NAME=Unit tests (release)
REPO_COPY=${{runner.temp}}/unit_tests_asan/ClickHouse
EOF
- name: Download json reports


@ -23,7 +23,6 @@
* Added `Overlay` database engine to combine multiple databases into one. Added `Filesystem` database engine to represent a directory in the filesystem as a set of implicitly available tables with auto-detected formats and structures. A new `S3` database engine allows read-only interaction with S3 storage by representing a prefix as a set of tables. A new `HDFS` database engine allows interacting with HDFS storage in the same way. [#48821](https://github.com/ClickHouse/ClickHouse/pull/48821) ([alekseygolub](https://github.com/alekseygolub)).
* Add support for external disks in Keeper for storing snapshots and logs. [#50098](https://github.com/ClickHouse/ClickHouse/pull/50098) ([Antonio Andelic](https://github.com/antonio2368)).
* Add support for multi-directory selection (`{}`) globs. [#50559](https://github.com/ClickHouse/ClickHouse/pull/50559) ([Andrey Zvonov](https://github.com/zvonand)).
* Support ZooKeeper `reconfig` command for ClickHouse Keeper with incremental reconfiguration which can be enabled via `keeper_server.enable_reconfiguration` setting. Support adding servers, removing servers, and changing server priorities. [#49450](https://github.com/ClickHouse/ClickHouse/pull/49450) ([Mike Kot](https://github.com/myrrc)).
* Kafka connector can fetch Avro schema from schema registry with basic authentication using url-encoded credentials. [#49664](https://github.com/ClickHouse/ClickHouse/pull/49664) ([Ilya Golshtein](https://github.com/ilejn)).
* Add function `arrayJaccardIndex` which computes the Jaccard similarity between two arrays (see the example after this list). [#50076](https://github.com/ClickHouse/ClickHouse/pull/50076) ([FFFFFFFHHHHHHH](https://github.com/FFFFFFFHHHHHHH)).
* Add a column `is_obsolete` to `system.settings` and similar tables. Closes [#50819](https://github.com/ClickHouse/ClickHouse/issues/50819). [#50826](https://github.com/ClickHouse/ClickHouse/pull/50826) ([flynn](https://github.com/ucasfl)).
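
A minimal illustration of `arrayJaccardIndex` mentioned above (a sketch, assuming only the definition given in that entry: the size of the intersection of the two arrays divided by the size of their union):

SELECT arrayJaccardIndex([1, 2, 3], [2, 3, 4]);
-- 2 common elements out of 4 distinct elements overall, so the expected result is 0.5
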
@ -124,6 +123,7 @@
* (experimental MaterializedMySQL) Double-quoted comments are now supported in MaterializedMySQL. [#52355](https://github.com/ClickHouse/ClickHouse/pull/52355) ([Val Doroshchuk](https://github.com/valbok)).
* Upgrade Intel QPL from v1.1.0 to v1.2.0 and Intel accel-config from v3.5 to v4.0. Fixed an issue where Device IOTLB misses had a large performance impact for IAA accelerators. [#52180](https://github.com/ClickHouse/ClickHouse/pull/52180) ([jasperzhu](https://github.com/jinjunzh)).
* The `session_timezone` setting (new in version 23.6) is demoted to experimental. [#52445](https://github.com/ClickHouse/ClickHouse/pull/52445) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Support ZooKeeper `reconfig` command for ClickHouse Keeper with incremental reconfiguration which can be enabled via `keeper_server.enable_reconfiguration` setting. Support adding servers, removing servers, and changing server priorities. [#49450](https://github.com/ClickHouse/ClickHouse/pull/49450) ([Mike Kot](https://github.com/myrrc)). It is suspected that this feature is incomplete.
#### Build/Testing/Packaging Improvement
* Add experimental ClickHouse builds for Linux RISC-V 64 to CI. [#31398](https://github.com/ClickHouse/ClickHouse/pull/31398) ([Alexey Milovidov](https://github.com/alexey-milovidov)).


@ -2045,27 +2045,26 @@ void Server::createServers(
for (const auto & protocol : protocols)
{
if (!server_type.shouldStart(ServerType::Type::CUSTOM, protocol))
std::string prefix = "protocols." + protocol + ".";
std::string port_name = prefix + "port";
std::string description {"<undefined> protocol"};
if (config.has(prefix + "description"))
description = config.getString(prefix + "description");
if (!config.has(prefix + "port"))
continue;
if (!server_type.shouldStart(ServerType::Type::CUSTOM, port_name))
continue;
std::vector<std::string> hosts;
if (config.has("protocols." + protocol + ".host"))
hosts.push_back(config.getString("protocols." + protocol + ".host"));
if (config.has(prefix + "host"))
hosts.push_back(config.getString(prefix + "host"));
else
hosts = listen_hosts;
for (const auto & host : hosts)
{
std::string conf_name = "protocols." + protocol;
std::string prefix = conf_name + ".";
if (!config.has(prefix + "port"))
continue;
std::string description {"<undefined> protocol"};
if (config.has(prefix + "description"))
description = config.getString(prefix + "description");
std::string port_name = prefix + "port";
bool is_secure = false;
auto stack = buildProtocolStackFromConfig(config, protocol, http_params, async_metrics, is_secure);
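
The custom protocol servers created in this loop are the ones addressed by `SYSTEM START/STOP LISTEN CUSTOM '<name>'`. A short illustration using the `tcp` custom protocol defined in the test configuration later in this commit (a sketch, not part of the change itself):

SYSTEM STOP LISTEN CUSTOM 'tcp';
SYSTEM START LISTEN CUSTOM 'tcp';
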


@ -136,6 +136,8 @@ using ResponseCallback = std::function<void(const Response &)>;
struct Response
{
Error error = Error::ZOK;
int64_t zxid = 0;
Response() = default;
Response(const Response &) = default;
Response & operator=(const Response &) = default;


@ -195,6 +195,7 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
std::pair<ResponsePtr, Undo> TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
CreateResponse response;
response.zxid = zxid;
Undo undo;
if (container.contains(path))
@ -257,9 +258,10 @@ std::pair<ResponsePtr, Undo> TestKeeperCreateRequest::process(TestKeeper::Contai
return { std::make_shared<CreateResponse>(response), undo };
}
std::pair<ResponsePtr, Undo> TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
RemoveResponse response;
response.zxid = zxid;
Undo undo;
auto it = container.find(path);
@ -296,9 +298,10 @@ std::pair<ResponsePtr, Undo> TestKeeperRemoveRequest::process(TestKeeper::Contai
return { std::make_shared<RemoveResponse>(response), undo };
}
std::pair<ResponsePtr, Undo> TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
ExistsResponse response;
response.zxid = zxid;
auto it = container.find(path);
if (it != container.end())
@ -314,9 +317,10 @@ std::pair<ResponsePtr, Undo> TestKeeperExistsRequest::process(TestKeeper::Contai
return { std::make_shared<ExistsResponse>(response), {} };
}
std::pair<ResponsePtr, Undo> TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
GetResponse response;
response.zxid = zxid;
auto it = container.find(path);
if (it == container.end())
@ -336,6 +340,7 @@ std::pair<ResponsePtr, Undo> TestKeeperGetRequest::process(TestKeeper::Container
std::pair<ResponsePtr, Undo> TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
SetResponse response;
response.zxid = zxid;
Undo undo;
auto it = container.find(path);
@ -370,9 +375,10 @@ std::pair<ResponsePtr, Undo> TestKeeperSetRequest::process(TestKeeper::Container
return { std::make_shared<SetResponse>(response), undo };
}
std::pair<ResponsePtr, Undo> TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperListRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
ListResponse response;
response.zxid = zxid;
auto it = container.find(path);
if (it == container.end())
@ -414,9 +420,10 @@ std::pair<ResponsePtr, Undo> TestKeeperListRequest::process(TestKeeper::Containe
return { std::make_shared<ListResponse>(response), {} };
}
std::pair<ResponsePtr, Undo> TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
CheckResponse response;
response.zxid = zxid;
auto it = container.find(path);
if (it == container.end())
{
@ -434,10 +441,11 @@ std::pair<ResponsePtr, Undo> TestKeeperCheckRequest::process(TestKeeper::Contain
return { std::make_shared<CheckResponse>(response), {} };
}
std::pair<ResponsePtr, Undo> TestKeeperSyncRequest::process(TestKeeper::Container & /*container*/, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperSyncRequest::process(TestKeeper::Container & /*container*/, int64_t zxid) const
{
SyncResponse response;
response.path = path;
response.zxid = zxid;
return { std::make_shared<SyncResponse>(std::move(response)), {} };
}
@ -456,6 +464,7 @@ std::pair<ResponsePtr, Undo> TestKeeperReconfigRequest::process(TestKeeper::Cont
std::pair<ResponsePtr, Undo> TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
MultiResponse response;
response.zxid = zxid;
response.responses.reserve(requests.size());
std::vector<Undo> undo_actions;


@ -642,6 +642,8 @@ void ZooKeeperMultiResponse::readImpl(ReadBuffer & in)
if (op_error == Error::ZOK || op_num == OpNum::Error)
dynamic_cast<ZooKeeperResponse &>(*response).readImpl(in);
response->zxid = zxid;
}
/// Footer.


@ -28,7 +28,6 @@ using LogElements = std::vector<ZooKeeperLogElement>;
struct ZooKeeperResponse : virtual Response
{
XID xid = 0;
int64_t zxid = 0;
UInt64 response_created_time_ns = 0;


@ -17,13 +17,13 @@ namespace ErrorCodes
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
void DictionaryFactory::registerLayout(const std::string & layout_type, LayoutCreateFunction create_layout, bool is_layout_complex)
void DictionaryFactory::registerLayout(const std::string & layout_type, LayoutCreateFunction create_layout, bool is_layout_complex, bool has_layout_complex)
{
auto it = registered_layouts.find(layout_type);
if (it != registered_layouts.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "DictionaryFactory: the layout name '{}' is not unique", layout_type);
RegisteredLayout layout { .layout_create_function = create_layout, .is_layout_complex = is_layout_complex };
RegisteredLayout layout { .layout_create_function = create_layout, .is_layout_complex = is_layout_complex, .has_layout_complex = has_layout_complex };
registered_layouts.emplace(layout_type, std::move(layout));
}
@ -89,6 +89,25 @@ bool DictionaryFactory::isComplex(const std::string & layout_type) const
return it->second.is_layout_complex;
}
bool DictionaryFactory::convertToComplex(std::string & layout_type) const
{
auto it = registered_layouts.find(layout_type);
if (it == registered_layouts.end())
{
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"Unknown dictionary layout type: {}",
layout_type);
}
if (!it->second.is_layout_complex && it->second.has_layout_complex)
{
layout_type = "complex_key_" + layout_type;
return true;
}
return false;
}
DictionaryFactory & DictionaryFactory::instance()
{


@ -55,13 +55,18 @@ public:
bool isComplex(const std::string & layout_type) const;
void registerLayout(const std::string & layout_type, LayoutCreateFunction create_layout, bool is_layout_complex);
/// If the argument `layout_type` is not a complex layout but has a corresponding complex layout,
/// change `layout_type` to that complex layout and return true; otherwise do nothing and return false.
bool convertToComplex(std::string & layout_type) const;
void registerLayout(const std::string & layout_type, LayoutCreateFunction create_layout, bool is_layout_complex, bool has_layout_complex = true);
private:
struct RegisteredLayout
{
LayoutCreateFunction layout_create_function;
bool is_layout_complex;
bool has_layout_complex;
};
using LayoutRegistry = std::unordered_map<std::string, RegisteredLayout>;


@ -683,7 +683,7 @@ void registerDictionaryFlat(DictionaryFactory & factory)
return std::make_unique<FlatDictionary>(dict_id, dict_struct, std::move(source_ptr), configuration);
};
factory.registerLayout("flat", create_layout, false);
factory.registerLayout("flat", create_layout, false, false);
}


@ -19,6 +19,7 @@
#include <Functions/FunctionFactory.h>
#include <Common/isLocalAddress.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypeFactory.h>
namespace DB
@ -614,6 +615,16 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte
checkPrimaryKey(all_attr_names_and_types, pk_attrs);
/// If the PK size is 1 and the PK's DataType is not a number, we should convert to complex.
/// NOTE: the data type of a numeric key (simple layout) is UInt64, so if the key type does not fit into UInt64, casting will lead to precision loss.
DataTypePtr first_key_type = DataTypeFactory::instance().get(all_attr_names_and_types.find(pk_attrs[0])->second.type);
if ((pk_attrs.size() > 1 || (pk_attrs.size() == 1 && !isNumber(first_key_type)))
&& !complex
&& DictionaryFactory::instance().convertToComplex(dictionary_layout->layout_type))
{
complex = true;
}
buildPrimaryKeyConfiguration(xml_document, structure_element, complex, pk_attrs, query.dictionary_attributes_list);
buildLayoutConfiguration(xml_document, current_dictionary, query.dictionary->dict_settings, dictionary_layout);
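
A hedged SQL sketch of the resulting behavior, mirroring the new test added later in this commit (table and dictionary names here are illustrative): a dictionary declared with a simple layout and a single non-numeric key is converted to the complex-key variant.

CREATE TABLE dict_src (k String, v UInt64) ENGINE = Memory;
CREATE DICTIONARY dict_auto_complex (k String, v UInt64)
PRIMARY KEY k
SOURCE(CLICKHOUSE(TABLE 'dict_src'))
LIFETIME(0)
LAYOUT(HASHED());
-- The key is a String, so the layout is converted to COMPLEX_KEY_HASHED automatically;
-- system.dictionaries will report the dictionary type as 'ComplexKeyHashed'.
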


@ -3,6 +3,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromEmptyFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
@ -485,8 +486,15 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
auto storage_objects = metadata_storage->getStorageObjects(path);
const bool file_can_be_empty = !file_size.has_value() || *file_size == 0;
if (storage_objects.empty() && file_can_be_empty)
return std::make_unique<ReadBufferFromEmptyFile>();
return object_storage->readObjects(
metadata_storage->getStorageObjects(path),
storage_objects,
object_storage->getAdjustedSettingsFromMetadataFile(settings, path),
read_hint,
file_size);


@ -13,6 +13,8 @@
#include <aws/core/utils/HashingUtils.h>
#include <aws/core/utils/logging/ErrorMacros.h>
#include <Poco/Net/NetException.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/S3/PocoHTTPClientFactory.h>
@ -23,6 +25,15 @@
#include <Common/logger_useful.h>
namespace ProfileEvents
{
extern const Event S3WriteRequestsErrors;
extern const Event S3ReadRequestsErrors;
extern const Event DiskS3WriteRequestsErrors;
extern const Event DiskS3ReadRequestsErrors;
}
namespace DB
{
@ -346,12 +357,14 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
Model::ListObjectsV2Outcome Client::ListObjectsV2(const ListObjectsV2Request & request) const
{
return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ true>(
request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
}
Model::ListObjectsOutcome Client::ListObjects(const ListObjectsRequest & request) const
{
return doRequest(request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ true>(
request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
}
Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) const
@ -361,19 +374,19 @@ Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) cons
Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(const AbortMultipartUploadRequest & request) const
{
return doRequest(
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
}
Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(const CreateMultipartUploadRequest & request) const
{
return doRequest(
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
}
Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const
{
auto outcome = doRequest(
auto outcome = doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });
if (!outcome.IsSuccess() || provider_type != ProviderType::GCS)
@ -403,32 +416,38 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const Comp
Model::CopyObjectOutcome Client::CopyObject(const CopyObjectRequest & request) const
{
return doRequest(request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
}
Model::PutObjectOutcome Client::PutObject(const PutObjectRequest & request) const
{
return doRequest(request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
}
Model::UploadPartOutcome Client::UploadPart(const UploadPartRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
}
Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
}
Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); });
}
Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); });
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); });
}
Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & request) const
@ -457,7 +476,8 @@ Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest &
return ComposeObjectOutcome(MakeRequest(req, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_PUT));
};
return doRequest(request, request_fn);
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, request_fn);
}
template <typename RequestType, typename RequestFn>
@ -538,6 +558,65 @@ Client::doRequest(const RequestType & request, RequestFn request_fn) const
throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects");
}
template <bool IsReadMethod, typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
Client::doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn request_fn) const
{
auto with_retries = [this, request_fn_ = std::move(request_fn)] (const RequestType & request_)
{
chassert(client_configuration.retryStrategy);
const Int64 max_attempts = client_configuration.retryStrategy->GetMaxAttempts();
std::exception_ptr last_exception = nullptr;
for (Int64 attempt_no = 0; attempt_no < max_attempts; ++attempt_no)
{
try
{
/// The S3 SDK does retry network errors, but only at certain points.
/// This code retries a specific case: a network error that happens while the XML document
/// is being read from the response body. Because the response body is a stream, network
/// errors are possible during reading, and the SDK does not retry them.
/// Not every request can be retried this way. Requests that read the whole response body
/// to build the result can be retried; requests that expose the response stream as the
/// answer (e.g. GetObject) are not retried by this code.
return request_fn_(request_);
}
catch (Poco::Net::ConnectionResetException &)
{
if constexpr (IsReadMethod)
{
if (client_configuration.for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3ReadRequestsErrors);
else
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors);
}
else
{
if (client_configuration.for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3WriteRequestsErrors);
else
ProfileEvents::increment(ProfileEvents::S3WriteRequestsErrors);
}
tryLogCurrentException(log, "Will retry");
last_exception = std::current_exception();
auto error = Aws::Client::AWSError<Aws::Client::CoreErrors>(Aws::Client::CoreErrors::NETWORK_CONNECTION, /*retry*/ true);
client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no);
continue;
}
}
chassert(last_exception);
std::rethrow_exception(last_exception);
};
return doRequest(request, with_retries);
}
bool Client::supportsMultiPartCopy() const
{
return provider_type != ProviderType::GCS;


@ -250,6 +250,10 @@ private:
std::invoke_result_t<RequestFn, RequestType>
doRequest(const RequestType & request, RequestFn request_fn) const;
template <bool IsReadMethod, typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn request_fn) const;
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;


@ -40,7 +40,7 @@ const char * ServerType::serverTypeToString(ServerType::Type type)
return type_name.data();
}
bool ServerType::shouldStart(Type server_type, const std::string & custom_name_) const
bool ServerType::shouldStart(Type server_type, const std::string & server_custom_name) const
{
if (type == Type::QUERIES_ALL)
return true;
@ -77,13 +77,15 @@ bool ServerType::shouldStart(Type server_type, const std::string & custom_name_)
}
}
return type == server_type && custom_name == custom_name_;
if (type == Type::CUSTOM)
return server_type == type && server_custom_name == "protocols." + custom_name + ".port";
return server_type == type;
}
bool ServerType::shouldStop(const std::string & port_name) const
{
Type port_type;
std::string port_custom_name;
if (port_name == "http_port")
port_type = Type::HTTP;
@ -119,20 +121,12 @@ bool ServerType::shouldStop(const std::string & port_name) const
port_type = Type::INTERSERVER_HTTPS;
else if (port_name.starts_with("protocols.") && port_name.ends_with(".port"))
{
constexpr size_t protocols_size = std::string_view("protocols.").size();
constexpr size_t port_size = std::string_view("protocols.").size();
port_type = Type::CUSTOM;
port_custom_name = port_name.substr(protocols_size, port_name.size() - port_size);
}
else
port_type = Type::UNKNOWN;
if (port_type == Type::UNKNOWN)
else
return false;
return shouldStart(type, port_custom_name);
return shouldStart(port_type, port_name);
}
}


@ -10,7 +10,6 @@ public:
enum Type
{
UNKNOWN,
TCP,
TCP_WITH_PROXY,
TCP_SECURE,
@ -34,7 +33,8 @@ public:
static const char * serverTypeToString(Type type);
bool shouldStart(Type server_type, const std::string & custom_name_ = "") const;
/// Checks whether provided in the arguments type should be started or stopped based on current server type.
bool shouldStart(Type server_type, const std::string & server_custom_name = "") const;
bool shouldStop(const std::string & port_name) const;
Type type;


@ -457,8 +457,11 @@ const ActionsDAG::Node * MergeTreeIndexConditionSet::operatorFromDAG(const Actio
if (arguments_size != 1)
return nullptr;
auto bit_wrapper_function = FunctionFactory::instance().get("__bitWrapperFunc", context);
const auto & bit_wrapper_func_node = result_dag->addFunction(bit_wrapper_function, {arguments[0]}, {});
auto bit_swap_last_two_function = FunctionFactory::instance().get("__bitSwapLastTwo", context);
return &result_dag->addFunction(bit_swap_last_two_function, {arguments[0]}, {});
return &result_dag->addFunction(bit_swap_last_two_function, {&bit_wrapper_func_node}, {});
}
else if (function_name == "and" || function_name == "indexHint" || function_name == "or")
{
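
The case this fixes is exercised by the updated 00979_set_index_not test further down in this commit; the newly added query is:

select * from set_index_not where NOT (NOT (status ='rip'));
-- expected to return the 'Ramsey rip' row, the same result as the plain status = 'rip' predicate
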


@ -11,7 +11,6 @@
00927_asof_joins
00940_order_by_read_in_order_query_plan
00945_bloom_filter_index
00979_set_index_not
00981_in_subquery_with_tuple
01049_join_low_card_bug_long
01062_pm_all_join_with_block_continuation


@ -346,7 +346,7 @@ CI_CONFIG = {
"Compatibility check (aarch64)": {
"required_build": "package_aarch64",
},
"Unit tests (release-clang)": {
"Unit tests (release)": {
"required_build": "binary_release",
},
"Unit tests (asan)": {
@ -509,7 +509,7 @@ REQUIRED_CHECKS = [
"Style Check",
"Unit tests (asan)",
"Unit tests (msan)",
"Unit tests (release-clang)",
"Unit tests (release)",
"Unit tests (tsan)",
"Unit tests (ubsan)",
]


@ -7,11 +7,18 @@ import urllib.parse
import http.server
import socketserver
import string
import socket
import struct
INF_COUNT = 100000000
def _and_then(value, func):
assert callable(func)
return None if value is None else func(value)
class MockControl:
def __init__(self, cluster, container, port):
self._cluster = cluster
@ -30,8 +37,8 @@ class MockControl:
)
assert response == "OK", response
def setup_error_at_object_upload(self, count=None, after=None):
url = f"http://localhost:{self._port}/mock_settings/error_at_object_upload?nothing=1"
def setup_action(self, when, count=None, after=None, action=None, action_args=None):
url = f"http://localhost:{self._port}/mock_settings/{when}?nothing=1"
if count is not None:
url += f"&count={count}"
@ -39,25 +46,12 @@ class MockControl:
if after is not None:
url += f"&after={after}"
response = self._cluster.exec_in_container(
self._cluster.get_container_id(self._container),
[
"curl",
"-s",
url,
],
nothrow=True,
)
assert response == "OK", response
if action is not None:
url += f"&action={action}"
def setup_error_at_part_upload(self, count=None, after=None):
url = f"http://localhost:{self._port}/mock_settings/error_at_part_upload?nothing=1"
if count is not None:
url += f"&count={count}"
if after is not None:
url += f"&after={after}"
if action_args is not None:
for x in action_args:
url += f"&action_args={x}"
response = self._cluster.exec_in_container(
self._cluster.get_container_id(self._container),
@ -70,22 +64,14 @@ class MockControl:
)
assert response == "OK", response
def setup_error_at_create_multi_part_upload(self, count=None):
url = f"http://localhost:{self._port}/mock_settings/error_at_create_multi_part_upload"
def setup_at_object_upload(self, **kwargs):
self.setup_action("at_object_upload", **kwargs)
if count is not None:
url += f"?count={count}"
def setup_at_part_upload(self, **kwargs):
self.setup_action("at_part_upload", **kwargs)
response = self._cluster.exec_in_container(
self._cluster.get_container_id(self._container),
[
"curl",
"-s",
url,
],
nothrow=True,
)
assert response == "OK", response
def setup_at_create_multi_part_upload(self, **kwargs):
self.setup_action("at_create_multi_part_upload", **kwargs)
def setup_fake_puts(self, part_length):
response = self._cluster.exec_in_container(
@ -140,8 +126,14 @@ class MockControl:
class _ServerRuntime:
class SlowPut:
def __init__(
self, probability_=None, timeout_=None, minimal_length_=None, count_=None
self,
lock,
probability_=None,
timeout_=None,
minimal_length_=None,
count_=None,
):
self.lock = lock
self.probability = probability_ if probability_ is not None else 1
self.timeout = timeout_ if timeout_ is not None else 0.1
self.minimal_length = minimal_length_ if minimal_length_ is not None else 0
@ -156,42 +148,135 @@ class _ServerRuntime:
)
def get_timeout(self, content_length):
if content_length > self.minimal_length:
if self.count > 0:
if (
_runtime.slow_put.probability == 1
or random.random() <= _runtime.slow_put.probability
):
self.count -= 1
return _runtime.slow_put.timeout
with self.lock:
if content_length > self.minimal_length:
if self.count > 0:
if (
_runtime.slow_put.probability == 1
or random.random() <= _runtime.slow_put.probability
):
self.count -= 1
return _runtime.slow_put.timeout
return None
class Expected500ErrorAction:
def inject_error(self, request_handler):
data = (
'<?xml version="1.0" encoding="UTF-8"?>'
"<Error>"
"<Code>ExpectedError</Code>"
"<Message>mock s3 injected error</Message>"
"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
"</Error>"
)
request_handler.write_error(data)
class RedirectAction:
def __init__(self, host="localhost", port=1):
self.dst_host = _and_then(host, str)
self.dst_port = _and_then(port, int)
def inject_error(self, request_handler):
request_handler.redirect(host=self.dst_host, port=self.dst_port)
class ConnectionResetByPeerAction:
def __init__(self, with_partial_data=None):
self.partial_data = ""
if with_partial_data is not None and with_partial_data == "1":
self.partial_data = (
'<?xml version="1.0" encoding="UTF-8"?>\n'
"<InitiateMultipartUploadResult>\n"
)
def inject_error(self, request_handler):
request_handler.read_all_input()
if self.partial_data:
request_handler.send_response(200)
request_handler.send_header("Content-Type", "text/xml")
request_handler.send_header("Content-Length", 10000)
request_handler.end_headers()
request_handler.wfile.write(bytes(self.partial_data, "UTF-8"))
time.sleep(1)
request_handler.connection.setsockopt(
socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)
)
request_handler.connection.close()
class BrokenPipeAction:
def inject_error(self, request_handler):
# partial read
request_handler.rfile.read(50)
time.sleep(1)
request_handler.connection.setsockopt(
socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)
)
request_handler.connection.close()
class ConnectionRefusedAction(RedirectAction):
pass
class CountAfter:
def __init__(self, count_=None, after_=None):
def __init__(
self, lock, count_=None, after_=None, action_=None, action_args_=[]
):
self.lock = lock
self.count = count_ if count_ is not None else INF_COUNT
self.after = after_ if after_ is not None else 0
self.action = action_
self.action_args = action_args_
if self.action == "connection_refused":
self.error_handler = _ServerRuntime.ConnectionRefusedAction()
elif self.action == "connection_reset_by_peer":
self.error_handler = _ServerRuntime.ConnectionResetByPeerAction(
*self.action_args
)
elif self.action == "broken_pipe":
self.error_handler = _ServerRuntime.BrokenPipeAction()
elif self.action == "redirect_to":
self.error_handler = _ServerRuntime.RedirectAction(*self.action_args)
else:
self.error_handler = _ServerRuntime.Expected500ErrorAction()
@staticmethod
def from_cgi_params(lock, params):
return _ServerRuntime.CountAfter(
lock=lock,
count_=_and_then(params.get("count", [None])[0], int),
after_=_and_then(params.get("after", [None])[0], int),
action_=params.get("action", [None])[0],
action_args_=params.get("action_args", []),
)
def __str__(self):
return f"count:{self.count} after:{self.after}"
return f"count:{self.count} after:{self.after} action:{self.action} action_args:{self.action_args}"
def has_effect(self):
if self.after:
self.after -= 1
if self.after == 0:
if self.count:
self.count -= 1
return True
return False
with self.lock:
if self.after:
self.after -= 1
if self.after == 0:
if self.count:
self.count -= 1
return True
return False
def inject_error(self, request_handler):
self.error_handler.inject_error(request_handler)
def __init__(self):
self.lock = threading.Lock()
self.error_at_part_upload = None
self.error_at_object_upload = None
self.at_part_upload = None
self.at_object_upload = None
self.fake_put_when_length_bigger = None
self.fake_uploads = dict()
self.slow_put = None
self.fake_multipart_upload = None
self.error_at_create_multi_part_upload = None
self.at_create_multi_part_upload = None
def register_fake_upload(self, upload_id, key):
with self.lock:
@ -205,23 +290,18 @@ class _ServerRuntime:
def reset(self):
with self.lock:
self.error_at_part_upload = None
self.error_at_object_upload = None
self.at_part_upload = None
self.at_object_upload = None
self.fake_put_when_length_bigger = None
self.fake_uploads = dict()
self.slow_put = None
self.fake_multipart_upload = None
self.error_at_create_multi_part_upload = None
self.at_create_multi_part_upload = None
_runtime = _ServerRuntime()
def _and_then(value, func):
assert callable(func)
return None if value is None else func(value)
def get_random_string(length):
# choose from all lowercase letter
letters = string.ascii_lowercase
@ -239,7 +319,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
def _ping(self):
self._ok()
def _read_out(self):
def read_all_input(self):
content_length = int(self.headers.get("Content-Length", 0))
to_read = content_length
while to_read > 0:
@ -250,36 +330,36 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
str(self.rfile.read(size))
to_read -= size
def _redirect(self):
self._read_out()
def redirect(self, host=None, port=None):
if host is None and port is None:
host = self.server.upstream_host
port = self.server.upstream_port
self.read_all_input()
self.send_response(307)
url = (
f"http://{self.server.upstream_host}:{self.server.upstream_port}{self.path}"
)
url = f"http://{host}:{port}{self.path}"
self.log_message("redirect to %s", url)
self.send_header("Location", url)
self.end_headers()
self.wfile.write(b"Redirected")
def _error(self, data):
self._read_out()
def write_error(self, data, content_length=None):
if content_length is None:
content_length = len(data)
self.log_message("write_error %s", data)
self.read_all_input()
self.send_response(500)
self.send_header("Content-Type", "text/xml")
self.send_header("Content-Length", str(content_length))
self.end_headers()
self.wfile.write(bytes(data, "UTF-8"))
def _error_expected_500(self):
self._error(
'<?xml version="1.0" encoding="UTF-8"?>'
"<Error>"
"<Code>ExpectedError</Code>"
"<Message>mock s3 injected error</Message>"
"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
"</Error>"
)
if data:
self.wfile.write(bytes(data, "UTF-8"))
def _fake_put_ok(self):
self._read_out()
self.log_message("fake put")
self.read_all_input()
self.send_response(200)
self.send_header("Content-Type", "text/xml")
@ -288,7 +368,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.end_headers()
def _fake_uploads(self, path, upload_id):
self._read_out()
self.read_all_input()
parts = [x for x in path.split("/") if x]
bucket = parts[0]
@ -310,7 +390,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.wfile.write(bytes(data, "UTF-8"))
def _fake_post_ok(self, path):
self._read_out()
self.read_all_input()
parts = [x for x in path.split("/") if x]
bucket = parts[0]
@ -338,22 +418,22 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
path = [x for x in parts.path.split("/") if x]
assert path[0] == "mock_settings", path
if len(path) < 2:
return self._error("_mock_settings: wrong command")
return self.write_error("_mock_settings: wrong command")
if path[1] == "error_at_part_upload":
if path[1] == "at_part_upload":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
_runtime.error_at_part_upload = _ServerRuntime.CountAfter(
count_=_and_then(params.get("count", [None])[0], int),
after_=_and_then(params.get("after", [None])[0], int),
_runtime.at_part_upload = _ServerRuntime.CountAfter.from_cgi_params(
_runtime.lock, params
)
self.log_message("set at_part_upload %s", _runtime.at_part_upload)
return self._ok()
if path[1] == "error_at_object_upload":
if path[1] == "at_object_upload":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
_runtime.error_at_object_upload = _ServerRuntime.CountAfter(
count_=_and_then(params.get("count", [None])[0], int),
after_=_and_then(params.get("after", [None])[0], int),
_runtime.at_object_upload = _ServerRuntime.CountAfter.from_cgi_params(
_runtime.lock, params
)
self.log_message("set at_object_upload %s", _runtime.at_object_upload)
return self._ok()
if path[1] == "fake_puts":
@ -361,11 +441,13 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
_runtime.fake_put_when_length_bigger = int(
params.get("when_length_bigger", [1024 * 1024])[0]
)
self.log_message("set fake_puts %s", _runtime.fake_put_when_length_bigger)
return self._ok()
if path[1] == "slow_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
_runtime.slow_put = _ServerRuntime.SlowPut(
lock=_runtime.lock,
minimal_length_=_and_then(params.get("minimal_length", [None])[0], int),
probability_=_and_then(params.get("probability", [None])[0], float),
timeout_=_and_then(params.get("timeout", [None])[0], float),
@ -376,20 +458,26 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
if path[1] == "setup_fake_multpartuploads":
_runtime.fake_multipart_upload = True
self.log_message("set setup_fake_multpartuploads")
return self._ok()
if path[1] == "error_at_create_multi_part_upload":
if path[1] == "at_create_multi_part_upload":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
_runtime.error_at_create_multi_part_upload = int(
params.get("count", [INF_COUNT])[0]
_runtime.at_create_multi_part_upload = (
_ServerRuntime.CountAfter.from_cgi_params(_runtime.lock, params)
)
self.log_message(
"set at_create_multi_part_upload %s",
_runtime.at_create_multi_part_upload,
)
return self._ok()
if path[1] == "reset":
_runtime.reset()
self.log_message("reset")
return self._ok()
return self._error("_mock_settings: wrong command")
return self.write_error("_mock_settings: wrong command")
def do_GET(self):
if self.path == "/":
@ -398,7 +486,8 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
if self.path.startswith("/mock_settings"):
return self._mock_settings()
return self._redirect()
self.log_message("get redirect")
return self.redirect()
def do_PUT(self):
content_length = int(self.headers.get("Content-Length", 0))
@ -414,30 +503,52 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
upload_id = params.get("uploadId", [None])[0]
if upload_id is not None:
if _runtime.error_at_part_upload is not None:
if _runtime.error_at_part_upload.has_effect():
return self._error_expected_500()
if _runtime.at_part_upload is not None:
self.log_message(
"put at_part_upload %s, %s, %s",
_runtime.at_part_upload,
upload_id,
parts,
)
if _runtime.at_part_upload.has_effect():
return _runtime.at_part_upload.inject_error(self)
if _runtime.fake_multipart_upload:
if _runtime.is_fake_upload(upload_id, parts.path):
return self._fake_put_ok()
else:
if _runtime.error_at_object_upload is not None:
if _runtime.error_at_object_upload.has_effect():
return self._error_expected_500()
if _runtime.at_object_upload is not None:
if _runtime.at_object_upload.has_effect():
self.log_message(
"put error_at_object_upload %s, %s",
_runtime.at_object_upload,
parts,
)
return _runtime.at_object_upload.inject_error(self)
if _runtime.fake_put_when_length_bigger is not None:
if content_length > _runtime.fake_put_when_length_bigger:
self.log_message(
"put fake_put_when_length_bigger %s, %s, %s",
_runtime.fake_put_when_length_bigger,
content_length,
parts,
)
return self._fake_put_ok()
return self._redirect()
self.log_message(
"put redirect %s",
parts,
)
return self.redirect()
def do_POST(self):
parts = urllib.parse.urlsplit(self.path)
params = urllib.parse.parse_qs(parts.query, keep_blank_values=True)
uploads = params.get("uploads", [None])[0]
if uploads is not None:
if _runtime.error_at_create_multi_part_upload:
_runtime.error_at_create_multi_part_upload -= 1
return self._error_expected_500()
if _runtime.at_create_multi_part_upload is not None:
if _runtime.at_create_multi_part_upload.has_effect():
return _runtime.at_create_multi_part_upload.inject_error(self)
if _runtime.fake_multipart_upload:
upload_id = get_random_string(5)
@ -448,13 +559,13 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
if _runtime.is_fake_upload(upload_id, parts.path):
return self._fake_post_ok(parts.path)
return self._redirect()
return self.redirect()
def do_HEAD(self):
self._redirect()
self.redirect()
def do_DELETE(self):
self._redirect()
self.redirect()
class _ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):


@ -91,7 +91,7 @@ def get_counters(node, query_id, log_type="ExceptionWhileProcessing"):
def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression):
node = cluster.instances["node"]
broken_s3.setup_error_at_create_multi_part_upload()
broken_s3.setup_at_create_multi_part_upload()
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_CREATE_MPU_{compression}"
error = node.query_and_get_error(
@ -134,7 +134,7 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_error_at_part_upload(count=1, after=2)
broken_s3.setup_at_part_upload(count=1, after=2)
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_UPLOAD_PART_{compression}"
error = node.query_and_get_error(
@ -165,3 +165,302 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
assert count_create_multi_part_uploads == 1
assert count_upload_parts >= 2
assert count_s3_errors >= 2
def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(count=3, after=2, action="connection_refused")
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED"
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=100,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
broken_s3.setup_at_part_upload(count=1000, after=2, action="connection_refused")
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED_1"
error = node.query_and_get_error(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=100,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
assert "Code: 499" in error, error
assert (
"Poco::Exception. Code: 1000, e.code() = 111, Connection refused" in error
), error
@pytest.mark.parametrize("send_something", [True, False])
def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
cluster, broken_s3, send_something
):
node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(
count=3,
after=2,
action="connection_reset_by_peer",
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}"
)
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=100,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
broken_s3.setup_at_part_upload(
count=1000,
after=2,
action="connection_reset_by_peer",
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}_1"
)
error = node.query_and_get_error(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=100,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
assert "Code: 1000" in error, error
assert (
"DB::Exception: Connection reset by peer." in error
or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer"
in error
), error
@pytest.mark.parametrize("send_something", [True, False])
def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
cluster, broken_s3, send_something
):
node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_create_multi_part_upload(
count=3,
after=0,
action="connection_reset_by_peer",
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}"
)
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_create_mpu_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=100,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
broken_s3.setup_at_create_multi_part_upload(
count=1000,
after=0,
action="connection_reset_by_peer",
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}_1"
)
error = node.query_and_get_error(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_create_mpu_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=100,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
assert "Code: 1000" in error, error
assert (
"DB::Exception: Connection reset by peer." in error
or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer"
in error
), error
def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(
count=3,
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=1000000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 7
assert count_s3_errors == 3
broken_s3.setup_at_part_upload(
count=1000,
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
error = node.query_and_get_error(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=1000000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
assert "Code: 1000" in error, error
assert (
"DB::Exception: Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe"
in error
), error


@ -83,6 +83,8 @@ def test_reconfig_replace_leader(started_cluster):
assert "node3" in config
assert "node4" not in config
ku.wait_configs_equal(config, zk2)
with pytest.raises(Exception):
zk1.stop()
zk1.close()


@ -783,9 +783,9 @@ def test_merge_canceled_by_s3_errors(cluster, broken_s3, node_name, storage_poli
min_key = node.query("SELECT min(key) FROM test_merge_canceled_by_s3_errors")
assert int(min_key) == 0, min_key
broken_s3.setup_error_at_object_upload()
broken_s3.setup_at_object_upload()
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_error_at_part_upload()
broken_s3.setup_at_part_upload()
node.query("SYSTEM START MERGES test_merge_canceled_by_s3_errors")
@ -828,7 +828,7 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, broken_s3, node_name):
settings={"materialize_ttl_after_modify": 0},
)
broken_s3.setup_error_at_object_upload(count=1, after=1)
broken_s3.setup_at_object_upload(count=1, after=1)
node.query("SYSTEM START MERGES merge_canceled_by_s3_errors_when_move")


@ -208,7 +208,9 @@ def test_https_wrong_cert():
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john", cert_name="wrong")
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
if count < MAX_RETRY and (
("Broken pipe" in err_str) or ("EOF occurred" in err_str)
):
count = count + 1
logging.warning(f"Failed attempt with wrong cert, err: {err_str}")
continue
@ -314,7 +316,9 @@ def test_https_non_ssl_auth():
cert_name="wrong",
)
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
if count < MAX_RETRY and (
("Broken pipe" in err_str) or ("EOF occurred" in err_str)
):
count = count + 1
logging.warning(
f"Failed attempt with wrong cert, user: peter, err: {err_str}"
@ -334,7 +338,9 @@ def test_https_non_ssl_auth():
cert_name="wrong",
)
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
if count < MAX_RETRY and (
("Broken pipe" in err_str) or ("EOF occurred" in err_str)
):
count = count + 1
logging.warning(
f"Failed attempt with wrong cert, user: jane, err: {err_str}"


@ -3,11 +3,11 @@
<default>
<shard>
<replica>
<host>node1</host>
<host>main_node</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<host>backup_node</host>
<port>9000</port>
</replica>
</shard>


@ -0,0 +1,23 @@
<clickhouse>
<listen_host>0.0.0.0</listen_host>
<!-- Default protocols -->
<tcp_port>9000</tcp_port>
<http_port>8123</http_port>
<mysql_port>9004</mysql_port>
<!-- Custom protocols -->
<protocols>
<tcp>
<type>tcp</type>
<host>0.0.0.0</host>
<port>9001</port>
<description>native protocol (tcp)</description>
</tcp>
<http>
<type>http</type>
<port>8124</port>
<description>http protocol</description>
</http>
</protocols>
</clickhouse>


@ -2,20 +2,18 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
import random
import string
import json
from helpers.client import Client
import requests
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/cluster.xml"], with_zookeeper=True
main_node = cluster.add_instance(
"main_node",
main_configs=["configs/cluster.xml", "configs/protocols.xml"],
with_zookeeper=True,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/cluster.xml"], with_zookeeper=True
backup_node = cluster.add_instance(
"backup_node", main_configs=["configs/cluster.xml"], with_zookeeper=True
)
@ -30,11 +28,118 @@ def started_cluster():
cluster.shutdown()
def test_system_start_stop_listen_queries(started_cluster):
node1.query("SYSTEM STOP LISTEN QUERIES ALL")
def http_works(port=8123):
try:
response = requests.post(f"http://{main_node.ip_address}:{port}/ping")
if response.status_code == 400:
return True
except:
pass
assert "Connection refused" in node1.query_and_get_error("SELECT 1", timeout=3)
return False
node2.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL")
node1.query("SELECT 1")
def assert_everything_works():
custom_client = Client(main_node.ip_address, 9001, command=cluster.client_bin_path)
main_node.query(QUERY)
main_node.query(MYSQL_QUERY)
custom_client.query(QUERY)
assert http_works()
assert http_works(8124)
QUERY = "SELECT 1"
MYSQL_QUERY = "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', 'default', '', SETTINGS connect_timeout = 100, connection_wait_timeout = 100)"
def test_default_protocols(started_cluster):
# TCP
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN TCP")
assert "Connection refused" in main_node.query_and_get_error(QUERY)
backup_node.query("SYSTEM START LISTEN ON CLUSTER default TCP")
# HTTP
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN HTTP")
assert http_works() == False
main_node.query("SYSTEM START LISTEN HTTP")
# MySQL
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN MYSQL")
assert "Connections to mysql failed" in main_node.query_and_get_error(MYSQL_QUERY)
main_node.query("SYSTEM START LISTEN MYSQL")
assert_everything_works()
def test_custom_protocols(started_cluster):
# TCP
custom_client = Client(main_node.ip_address, 9001, command=cluster.client_bin_path)
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN CUSTOM 'tcp'")
assert "Connection refused" in custom_client.query_and_get_error(QUERY)
main_node.query("SYSTEM START LISTEN CUSTOM 'tcp'")
# HTTP
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN CUSTOM 'http'")
assert http_works(8124) == False
main_node.query("SYSTEM START LISTEN CUSTOM 'http'")
assert_everything_works()
def test_all_protocols(started_cluster):
custom_client = Client(main_node.ip_address, 9001, command=cluster.client_bin_path)
assert_everything_works()
# STOP LISTEN QUERIES ALL
main_node.query("SYSTEM STOP LISTEN QUERIES ALL")
assert "Connection refused" in main_node.query_and_get_error(QUERY)
assert "Connection refused" in custom_client.query_and_get_error(QUERY)
assert http_works() == False
assert http_works(8124) == False
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL")
# STOP LISTEN QUERIES DEFAULT
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN QUERIES DEFAULT")
assert "Connection refused" in main_node.query_and_get_error(QUERY)
custom_client.query(QUERY)
assert http_works() == False
assert http_works(8124)
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES DEFAULT")
# STOP LISTEN QUERIES CUSTOM
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN QUERIES CUSTOM")
main_node.query(QUERY)
assert "Connection refused" in custom_client.query_and_get_error(QUERY)
assert http_works()
assert http_works(8124) == False
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES CUSTOM")
# Disable all protocols, check first START LISTEN QUERIES DEFAULT then START LISTEN QUERIES CUSTOM
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN QUERIES ALL")
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES DEFAULT")
main_node.query(QUERY)
assert "Connection refused" in custom_client.query_and_get_error(QUERY)
assert http_works()
assert http_works(8124) == False
main_node.query("SYSTEM STOP LISTEN QUERIES ALL")
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES CUSTOM")
assert "Connection refused" in main_node.query_and_get_error(QUERY)
custom_client.query(QUERY)
assert http_works() == False
assert http_works(8124)
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL")
assert_everything_works()


@ -96,7 +96,9 @@ def test_https_wrong_cert():
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john", cert_name="wrong")
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
if count < MAX_RETRY and (
("Broken pipe" in err_str) or ("EOF occurred" in err_str)
):
count = count + 1
logging.warning(f"Failed attempt with wrong cert, err: {err_str}")
continue
@ -202,7 +204,9 @@ def test_https_non_ssl_auth():
cert_name="wrong",
)
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
if count < MAX_RETRY and (
("Broken pipe" in err_str) or ("EOF occurred" in err_str)
):
count = count + 1
logging.warning(
f"Failed attempt with wrong cert, user: peter, err: {err_str}"
@ -222,7 +226,9 @@ def test_https_non_ssl_auth():
cert_name="wrong",
)
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
if count < MAX_RETRY and (
("Broken pipe" in err_str) or ("EOF occurred" in err_str)
):
count = count + 1
logging.warning(
f"Failed attempt with wrong cert, user: jane, err: {err_str}"


@ -1,2 +1,4 @@
Jon alive
Jon alive
Ramsey rip
Ramsey rip


@ -11,5 +11,7 @@ insert into set_index_not values ('Jon','alive'),('Ramsey','rip');
select * from set_index_not where status!='rip';
select * from set_index_not where NOT (status ='rip');
select * from set_index_not where NOT (status!='rip');
select * from set_index_not where NOT (NOT (status ='rip'));
DROP TABLE set_index_not;


@ -9,21 +9,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
$CLICKHOUSE_CLIENT -q "DROP DICTIONARY IF EXISTS dict1"
# Simple layout, but with two keys
$CLICKHOUSE_CLIENT -q "
CREATE DICTIONARY dict1
(
key1 UInt64,
key2 UInt64,
value String
)
PRIMARY KEY key1, key2
LAYOUT(HASHED())
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict1' DB '$CLICKHOUSE_DATABASE'))
LIFETIME(MIN 1 MAX 10)
" 2>&1 | grep -c 'Primary key for simple dictionary must contain exactly one element'
# Simple layout, but with non existing key
$CLICKHOUSE_CLIENT -q "
CREATE DICTIONARY dict1


@ -89,7 +89,7 @@ SOURCE(CLICKHOUSE(TABLE test_table_string))
LAYOUT(SPARSE_HASHED(SHARDS 10))
LIFETIME(0);
SYSTEM RELOAD DICTIONARY test_dictionary_10_shards_string; -- { serverError CANNOT_PARSE_TEXT }
SYSTEM RELOAD DICTIONARY test_dictionary_10_shards_string;
DROP DICTIONARY test_dictionary_10_shards_string;


@ -0,0 +1,5 @@
dict_flat_simple Flat
dict_hashed_simple_Decimal128 Hashed
dict_hashed_simple_Float32 Hashed
dict_hashed_simple_String ComplexKeyHashed
dict_hashed_simple_auto_convert ComplexKeyHashed


@ -0,0 +1,35 @@
DROP DICTIONARY IF EXISTS dict_flat_simple;
DROP DICTIONARY IF EXISTS dict_hashed_simple_Decimal128;
DROP DICTIONARY IF EXISTS dict_hashed_simple_Float32;
DROP DICTIONARY IF EXISTS dict_hashed_simple_String;
DROP DICTIONARY IF EXISTS dict_hashed_simple_auto_convert;
DROP TABLE IF EXISTS dict_data;
CREATE TABLE dict_data (v0 UInt16, v1 Int16, v2 Float32, v3 Decimal128(10), v4 String) engine=Memory() AS SELECT number, number%65535, number*1.1, number*1.1, 'foo' FROM numbers(10);
CREATE DICTIONARY dict_flat_simple (v0 UInt16, v1 UInt16, v2 UInt16) PRIMARY KEY v0 SOURCE(CLICKHOUSE(TABLE 'dict_data')) LIFETIME(0) LAYOUT(flat());
SYSTEM RELOAD DICTIONARY dict_flat_simple;
SELECT name, type FROM system.dictionaries WHERE database = currentDatabase() AND name = 'dict_flat_simple';
DROP DICTIONARY dict_flat_simple;
CREATE DICTIONARY dict_hashed_simple_Decimal128 (v3 Decimal128(10), v1 UInt16, v2 Float32) PRIMARY KEY v3 SOURCE(CLICKHOUSE(TABLE 'dict_data')) LIFETIME(0) LAYOUT(hashed());
SYSTEM RELOAD DICTIONARY dict_hashed_simple_Decimal128;
SELECT name, type FROM system.dictionaries WHERE database = currentDatabase() AND name = 'dict_hashed_simple_Decimal128';
DROP DICTIONARY dict_hashed_simple_Decimal128;
CREATE DICTIONARY dict_hashed_simple_Float32 (v2 Float32, v3 Decimal128(10), v4 String) PRIMARY KEY v2 SOURCE(CLICKHOUSE(TABLE 'dict_data')) LIFETIME(0) LAYOUT(hashed());
SYSTEM RELOAD DICTIONARY dict_hashed_simple_Float32;
SELECT name, type FROM system.dictionaries WHERE database = currentDatabase() AND name = 'dict_hashed_simple_Float32';
DROP DICTIONARY dict_hashed_simple_Float32;
CREATE DICTIONARY dict_hashed_simple_String (v4 String, v3 Decimal128(10), v2 Float32) PRIMARY KEY v4 SOURCE(CLICKHOUSE(TABLE 'dict_data')) LIFETIME(0) LAYOUT(hashed());
SYSTEM RELOAD DICTIONARY dict_hashed_simple_String;
SELECT name, type FROM system.dictionaries WHERE database = currentDatabase() AND name = 'dict_hashed_simple_String';
DROP DICTIONARY dict_hashed_simple_String;
CREATE DICTIONARY dict_hashed_simple_auto_convert (v0 UInt16, v1 Int16, v2 UInt16) PRIMARY KEY v0,v1 SOURCE(CLICKHOUSE(TABLE 'dict_data')) LIFETIME(0) LAYOUT(hashed());
SYSTEM RELOAD DICTIONARY dict_hashed_simple_auto_convert;
SELECT name, type FROM system.dictionaries WHERE database = currentDatabase() AND name = 'dict_hashed_simple_auto_convert';
DROP DICTIONARY dict_hashed_simple_auto_convert;
DROP TABLE dict_data;