Merge branch 'master' of github.com:ClickHouse/ClickHouse into remove-cpp-templates-2

This commit is contained in:
Alexey Milovidov 2023-11-11 03:45:54 +01:00
commit fcd45d47e4
40 changed files with 225 additions and 389 deletions

View File

@ -12,7 +12,6 @@ A client application to interact with clickhouse-keeper by its native protocol.
- `-q QUERY`, `--query=QUERY` — Query to execute. If this parameter is not passed, `clickhouse-keeper-client` will start in interactive mode.
- `-h HOST`, `--host=HOST` — Server host. Default value: `localhost`.
- `-p N`, `--port=N` — Server port. Default value: `9181`.
- `-c FILE_PATH`, `--config-file=FILE_PATH` — Set path of config file to get the connection string. Default value: `config.xml`.
- `--connection-timeout=TIMEOUT` — Set connection timeout in seconds. Default value: 10s.
- `--session-timeout=TIMEOUT` — Set session timeout in seconds. Default value: 10s.
- `--operation-timeout=TIMEOUT` — Set operation timeout in seconds. Default value: 10s.

View File

@ -586,7 +586,6 @@
M(704, CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS) \
M(705, TABLE_NOT_EMPTY) \
M(706, LIBSSH_ERROR) \
M(707, GCP_ERROR) \
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \

View File

@ -36,7 +36,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl,ydld";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -172,9 +172,6 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr feature_flags_command = std::make_shared<FeatureFlagsCommand>(keeper_dispatcher);
factory.registerCommand(feature_flags_command);
FourLetterCommandPtr yield_leadership_command = std::make_shared<YieldLeadershipCommand>(keeper_dispatcher);
factory.registerCommand(yield_leadership_command);
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
@ -582,10 +579,4 @@ String FeatureFlagsCommand::run()
return ret.str();
}
String YieldLeadershipCommand::run()
{
keeper_dispatcher.yieldLeadership();
return "Sent yield leadership request to leader.";
}
}

View File

@ -415,17 +415,4 @@ struct FeatureFlagsCommand : public IFourLetterCommand
~FeatureFlagsCommand() override = default;
};
/// Yield leadership and become follower.
struct YieldLeadershipCommand : public IFourLetterCommand
{
explicit YieldLeadershipCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "ydld"; }
String run() override;
~YieldLeadershipCommand() override = default;
};
}

View File

@ -17,6 +17,5 @@ const String keeper_system_path = "/keeper";
const String keeper_api_version_path = keeper_system_path + "/api_version";
const String keeper_api_feature_flags_path = keeper_system_path + "/feature_flags";
const String keeper_config_path = keeper_system_path + "/config";
const String keeper_availability_zone_path = keeper_system_path + "/availability_zone";
}

View File

@ -32,17 +32,9 @@ KeeperContext::KeeperContext(bool standalone_keeper_)
system_nodes_with_data[keeper_api_version_path] = toString(static_cast<uint8_t>(KeeperApiVersion::WITH_MULTI_READ));
}
void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az)
void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_)
{
dispatcher = dispatcher_;
/// We only use the environment availability zone when configuration option is missing.
auto keeper_az = config.getString("keeper_server.availability_zone", environment_az);
if (!keeper_az.empty())
system_nodes_with_data[keeper_availability_zone_path] = keeper_az;
LOG_INFO(&Poco::Logger::get("KeeperContext"),
"Initialize the KeeperContext with availability zone: '{}', environment availability zone '{}'. ", keeper_az, environment_az);
digest_enabled = config.getBool("keeper_server.digest_enabled", false);
ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false);

View File

@ -3,6 +3,7 @@
#include <Disks/DiskSelector.h>
#include <IO/WriteBufferFromString.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <cstdint>
#include <memory>
@ -23,7 +24,7 @@ public:
SHUTDOWN
};
void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az);
void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_);
Phase getServerState() const;
void setServerState(Phase server_state_);

View File

@ -11,7 +11,6 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <IO/S3/Credentials.h>
#include <atomic>
#include <future>
@ -371,16 +370,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
String availability_zone;
try
{
availability_zone = DB::S3::getRunningAvailabilityZone();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
keeper_context->initialize(config, this, availability_zone);
keeper_context->initialize(config, this);
requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_request_queue_size);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });

View File

@ -237,12 +237,6 @@ public:
return server->requestLeader();
}
/// Yield leadership and become follower.
void yieldLeadership()
{
return server->yieldLeadership();
}
void recalculateStorageStats()
{
return server->recalculateStorageStats();

View File

@ -1101,12 +1101,6 @@ bool KeeperServer::requestLeader()
return isLeader() || raft_instance->request_leadership();
}
void KeeperServer::yieldLeadership()
{
if (isLeader())
raft_instance->yield_leadership();
}
void KeeperServer::recalculateStorageStats()
{
state_machine->recalculateStorageStats();

View File

@ -144,8 +144,6 @@ public:
bool requestLeader();
void yieldLeadership();
void recalculateStorageStats();
};

View File

@ -1081,8 +1081,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (request.path == Coordination::keeper_api_feature_flags_path
|| request.path == Coordination::keeper_config_path
|| request.path == Coordination::keeper_availability_zone_path)
|| request.path == Coordination::keeper_config_path)
return {};
if (!storage.uncommitted_state.getNode(request.path))

View File

@ -1,9 +1,4 @@
#include <exception>
#include <variant>
#include <IO/S3/Credentials.h>
#include <boost/algorithm/string/classification.hpp>
#include <Poco/Exception.h>
#include "Common/Exception.h"
#if USE_AWS_S3
@ -16,7 +11,6 @@
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <IO/S3/PocoHTTPClientFactory.h>
# include <aws/core/utils/HashingUtils.h>
# include <aws/core/platform/FileSystem.h>
@ -28,16 +22,6 @@
# include <fstream>
# include <base/EnumReflection.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/split.hpp>
#include <Poco/URI.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/StreamCopier.h>
namespace DB
{
@ -45,8 +29,6 @@ namespace DB
namespace ErrorCodes
{
extern const int AWS_ERROR;
extern const int GCP_ERROR;
extern const int UNSUPPORTED_METHOD;
}
namespace S3
@ -169,6 +151,30 @@ Aws::String AWSEC2MetadataClient::getDefaultCredentialsSecurely() const
return GetResourceWithAWSWebServiceResult(credentials_request).GetPayload();
}
Aws::String AWSEC2MetadataClient::getCurrentAvailabilityZone() const
{
String user_agent_string = awsComputeUserAgentString();
auto [new_token, response_code] = getEC2MetadataToken(user_agent_string);
if (response_code != Aws::Http::HttpResponseCode::OK || new_token.empty())
throw DB::Exception(ErrorCodes::AWS_ERROR,
"Failed to make token request. HTTP response code: {}", response_code);
token = std::move(new_token);
const String url = endpoint + EC2_AVAILABILITY_ZONE_RESOURCE;
std::shared_ptr<Aws::Http::HttpRequest> profile_request(
Aws::Http::CreateHttpRequest(url, Aws::Http::HttpMethod::HTTP_GET, Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
profile_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, token);
profile_request->SetUserAgent(user_agent_string);
const auto result = GetResourceWithAWSWebServiceResult(profile_request);
if (result.GetResponseCode() != Aws::Http::HttpResponseCode::OK)
throw DB::Exception(ErrorCodes::AWS_ERROR,
"Failed to get availability zone. HTTP response code: {}", result.GetResponseCode());
return Aws::Utils::StringUtils::Trim(result.GetPayload().c_str());
}
std::pair<Aws::String, Aws::Http::HttpResponseCode> AWSEC2MetadataClient::getEC2MetadataToken(const std::string & user_agent_string) const
{
std::lock_guard locker(token_mutex);
@ -193,10 +199,10 @@ Aws::String AWSEC2MetadataClient::getCurrentRegion() const
return Aws::Region::AWS_GLOBAL;
}
static Aws::String getAWSMetadataEndpoint()
std::shared_ptr<AWSEC2MetadataClient> InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration)
{
auto * logger = &Poco::Logger::get("AWSEC2InstanceProfileConfigLoader");
Aws::String ec2_metadata_service_endpoint = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT");
auto * logger = &Poco::Logger::get("AWSEC2InstanceProfileConfigLoader");
if (ec2_metadata_service_endpoint.empty())
{
Aws::String ec2_metadata_service_endpoint_mode = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT_MODE");
@ -227,95 +233,8 @@ static Aws::String getAWSMetadataEndpoint()
}
}
}
return ec2_metadata_service_endpoint;
}
std::shared_ptr<AWSEC2MetadataClient> InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration)
{
auto endpoint = getAWSMetadataEndpoint();
return std::make_shared<AWSEC2MetadataClient>(client_configuration, endpoint.c_str());
}
String AWSEC2MetadataClient::getAvailabilityZoneOrException()
{
Poco::URI uri(getAWSMetadataEndpoint() + EC2_AVAILABILITY_ZONE_RESOURCE);
Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort());
Poco::Net::HTTPResponse response;
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath());
session.sendRequest(request);
std::istream & rs = session.receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
throw DB::Exception(ErrorCodes::AWS_ERROR, "Failed to get AWS availability zone. HTTP response code: {}", response.getStatus());
String response_data;
Poco::StreamCopier::copyToString(rs, response_data);
return response_data;
}
String getGCPAvailabilityZoneOrException()
{
Poco::URI uri(String(GCP_METADATA_SERVICE_ENDPOINT) + "/computeMetadata/v1/instance/zone");
Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort());
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath());
Poco::Net::HTTPResponse response;
request.set("Metadata-Flavor", "Google");
session.sendRequest(request);
std::istream & rs = session.receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
throw DB::Exception(ErrorCodes::GCP_ERROR, "Failed to get GCP availability zone. HTTP response code: {}", response.getStatus());
String response_data;
Poco::StreamCopier::copyToString(rs, response_data);
Strings zone_info;
boost::split(zone_info, response_data, boost::is_any_of("/"));
/// We expect GCP to return a string like "projects/123456789/zones/us-central1a".
if (zone_info.size() != 4)
throw DB::Exception(ErrorCodes::GCP_ERROR, "Invalid format of GCP zone information, expect projects/<project-number>/zones/<zone-value>, got {}", response_data);
return zone_info[3];
}
String getRunningAvailabilityZoneImpl()
{
LOG_INFO(&Poco::Logger::get("Application"), "Trying to detect the availability zone.");
try
{
auto aws_az = AWSEC2MetadataClient::getAvailabilityZoneOrException();
return aws_az;
}
catch (const DB::Exception & aws_ex)
{
try
{
auto gcp_zone = getGCPAvailabilityZoneOrException();
return gcp_zone;
}
catch (const DB::Exception & gcp_ex)
{
throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex.displayText(), gcp_ex.displayText());
}
}
}
std::variant<String, std::exception_ptr> getRunningAvailabilityZoneImplOrException()
{
try
{
return getRunningAvailabilityZoneImpl();
}
catch (...)
{
return std::current_exception();
}
}
String getRunningAvailabilityZone()
{
static auto az_or_exception = getRunningAvailabilityZoneImplOrException();
if (const auto * az = std::get_if<String>(&az_or_exception))
return *az;
else
std::rethrow_exception(std::get<std::exception_ptr>(az_or_exception));
LOG_INFO(logger, "Using IMDS endpoint: {}", ec2_metadata_service_endpoint);
return std::make_shared<AWSEC2MetadataClient>(client_configuration, ec2_metadata_service_endpoint.c_str());
}
AWSEC2InstanceProfileConfigLoader::AWSEC2InstanceProfileConfigLoader(const std::shared_ptr<AWSEC2MetadataClient> & client_, bool use_secure_pull_)
@ -784,6 +703,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
aws_client_configuration.requestTimeoutMs = 1000;
aws_client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(1, 1000);
auto ec2_metadata_client = InitEC2MetadataClient(aws_client_configuration);
auto config_loader = std::make_shared<AWSEC2InstanceProfileConfigLoader>(ec2_metadata_client, !credentials_configuration.use_insecure_imds_request);
@ -801,21 +721,4 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
}
#else
namespace DB
{
namespace S3
{
String getRunningAvailabilityZone()
{
throw Poco::Exception("Does not support availability zone detection for non-cloud environment");
}
}
}
#endif
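
The getCurrentAvailabilityZone method added in this file follows the IMDSv2 flow through the AWS SDK's HTTP client: fetch a session token, then query the metadata resource with that token in a header. For orientation only, here is a rough standalone sketch of the same flow over plain Poco HTTP (the removed availability-zone helpers used Poco directly, though without the token step). The endpoint, paths, and header names below are the standard public IMDSv2 ones, not values taken from this diff, and error handling is reduced to bare exceptions.

#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/StreamCopier.h>
#include <stdexcept>
#include <string>

/// Hypothetical helper mirroring the token-then-GET flow of
/// AWSEC2MetadataClient::getCurrentAvailabilityZone, but over plain Poco HTTP.
std::string fetchAvailabilityZoneViaIMDSv2(const std::string & host = "169.254.169.254", unsigned short port = 80)
{
    /// Step 1: obtain an IMDSv2 session token.
    Poco::Net::HTTPClientSession token_session(host, port);
    Poco::Net::HTTPRequest token_request(Poco::Net::HTTPRequest::HTTP_PUT, "/latest/api/token");
    token_request.set("X-aws-ec2-metadata-token-ttl-seconds", "21600");
    token_request.setContentLength(0);
    token_session.sendRequest(token_request);

    Poco::Net::HTTPResponse token_response;
    std::string token;
    Poco::StreamCopier::copyToString(token_session.receiveResponse(token_response), token);
    if (token_response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK || token.empty())
        throw std::runtime_error("Failed to make token request");

    /// Step 2: query the availability-zone resource with the token header.
    Poco::Net::HTTPClientSession az_session(host, port);
    Poco::Net::HTTPRequest az_request(Poco::Net::HTTPRequest::HTTP_GET, "/latest/meta-data/placement/availability-zone");
    az_request.set("X-aws-ec2-metadata-token", token);
    az_session.sendRequest(az_request);

    Poco::Net::HTTPResponse az_response;
    std::string zone;
    Poco::StreamCopier::copyToString(az_session.receiveResponse(az_response), zone);
    if (az_response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
        throw std::runtime_error("Failed to get availability zone");
    return zone;
}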

View File

@ -1,8 +1,5 @@
#pragma once
#include <exception>
#include <base/types.h>
#include <variant>
#include "config.h"
#if USE_AWS_S3
@ -21,12 +18,6 @@ namespace DB::S3
inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6.
static constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";
/// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs.
String getRunningAvailabilityZone();
class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
{
static constexpr char EC2_SECURITY_CREDENTIALS_RESOURCE[] = "/latest/meta-data/iam/security-credentials";
@ -59,11 +50,10 @@ public:
virtual Aws::String getCurrentRegion() const;
friend String getRunningAvailabilityZoneImpl();
virtual Aws::String getCurrentAvailabilityZone() const;
private:
std::pair<Aws::String, Aws::Http::HttpResponseCode> getEC2MetadataToken(const std::string & user_agent_string) const;
static String getAvailabilityZoneOrException();
const Aws::String endpoint;
mutable std::recursive_mutex token_mutex;
@ -187,15 +177,4 @@ public:
}
#else
namespace DB
{
namespace S3
{
String getRunningAvailabilityZone();
}
}
#endif

View File

@ -411,7 +411,6 @@ struct MinMaxProjectionCandidate
{
AggregateProjectionCandidate candidate;
Block block;
MergeTreeData::DataPartsVector normal_parts;
};
struct AggregateProjectionCandidates
@ -477,7 +476,6 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
{
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG());
AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)};
MergeTreeData::DataPartsVector minmax_projection_normal_parts;
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection sample block {}", sample_block.dumpStructure());
auto block = reading.getMergeTreeData().getMinMaxCountProjectionBlock(
@ -486,13 +484,12 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
dag.filter_node != nullptr,
query_info,
parts,
minmax_projection_normal_parts,
max_added_blocks.get(),
context);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection sample block 2 {}", block.dumpStructure());
// minmax_count_projection cannot be used used when there is no data to process, because
// minmax_count_projection cannot be used when there is no data to process, because
// it will produce incorrect result during constant aggregation.
// See https://github.com/ClickHouse/ClickHouse/issues/36728
if (block)
@ -500,7 +497,6 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
MinMaxProjectionCandidate minmax;
minmax.candidate = std::move(candidate);
minmax.block = std::move(block);
minmax.normal_parts = std::move(minmax_projection_normal_parts);
minmax.candidate.projection = projection;
candidates.minmax_projection.emplace(std::move(minmax));
}
@ -509,6 +505,18 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
if (!candidates.minmax_projection)
{
auto it = std::find_if(agg_projections.begin(), agg_projections.end(), [&](const auto * projection)
{
return projection->name == context->getSettings().preferred_optimize_projection_name.value;
});
if (it != agg_projections.end())
{
const ProjectionDescription * preferred_projection = *it;
agg_projections.clear();
agg_projections.push_back(preferred_projection);
}
candidates.real.reserve(agg_projections.size());
for (const auto * projection : agg_projections)
{
@ -570,57 +578,74 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks, allow_implicit_projections);
AggregateProjectionCandidate * best_candidate = nullptr;
if (candidates.minmax_projection)
best_candidate = &candidates.minmax_projection->candidate;
else if (candidates.real.empty())
return false;
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
const auto metadata = reading->getStorageMetadata();
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, /* alter_conversions = */ {});
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
const auto & proj_name_from_settings = context->getSettings().preferred_optimize_projection_name.value;
bool found_best_candidate = false;
/// Selecting best candidate.
for (auto & candidate : candidates.real)
AggregateProjectionCandidate * best_candidate = nullptr;
if (candidates.minmax_projection)
{
auto required_column_names = candidate.dag->getRequiredColumnsNames();
ActionDAGNodes added_filter_nodes;
if (candidates.has_filter)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());
best_candidate = &candidates.minmax_projection->candidate;
}
else if (!candidates.real.empty())
{
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_column_names, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
if (!analyzed)
continue;
if (candidate.sum_marks > ordinary_reading_marks)
continue;
if ((best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks) && !found_best_candidate)
best_candidate = &candidate;
if (!proj_name_from_settings.empty() && candidate.projection->name == proj_name_from_settings)
/// Nothing to read. Ignore projections.
if (ordinary_reading_marks == 0)
{
best_candidate = &candidate;
found_best_candidate = true;
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}
const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();
/// Selecting best candidate.
for (auto & candidate : candidates.real)
{
auto required_column_names = candidate.dag->getRequiredColumnsNames();
ActionDAGNodes added_filter_nodes;
if (candidates.has_filter)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());
bool analyzed = analyzeProjectionCandidate(
candidate,
*reading,
reader,
required_column_names,
parts_with_ranges,
metadata,
query_info,
context,
max_added_blocks,
added_filter_nodes);
if (!analyzed)
continue;
if (candidate.sum_marks > ordinary_reading_marks)
continue;
if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
best_candidate = &candidate;
}
if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}
}
if (!best_candidate)
else
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}
chassert(best_candidate != nullptr);
QueryPlanStepPtr projection_reading;
bool has_ordinary_parts;
@ -641,9 +666,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = candidates.minmax_projection->candidate.projection->name,
});
has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty();
if (has_ordinary_parts)
reading->resetParts(std::move(candidates.minmax_projection->normal_parts));
has_ordinary_parts = false;
}
else
{

View File

@ -10,7 +10,7 @@
#include <Storages/ProjectionsDescription.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <stack>
#include <algorithm>
namespace DB::QueryPlanOptimizations
{
@ -109,6 +109,19 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (normal_projections.empty())
return false;
ContextPtr context = reading->getContext();
auto it = std::find_if(normal_projections.begin(), normal_projections.end(), [&](const auto * projection)
{
return projection->name == context->getSettings().preferred_optimize_projection_name.value;
});
if (it != normal_projections.end())
{
const ProjectionDescription * preferred_projection = *it;
normal_projections.clear();
normal_projections.push_back(preferred_projection);
}
QueryDAG query;
{
auto & child = iter->node->children[iter->next_child - 1];
@ -124,30 +137,24 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
const Names & required_columns = reading->getRealColumnNames();
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, /* alter_conversions = */ {});
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
// Here we iterate over the projections and check if we have the same projections as we specified in preferred_projection_name
bool is_projection_found = false;
const auto & proj_name_from_settings = context->getSettings().preferred_optimize_projection_name.value;
if (!proj_name_from_settings.empty())
/// Nothing to read. Ignore projections.
if (ordinary_reading_marks == 0)
{
for (const auto * projection : normal_projections)
{
if (projection->name == proj_name_from_settings)
{
is_projection_found = true;
break;
}
}
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}
const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
for (const auto * projection : normal_projections)
{
if (!hasAllRequiredColumns(projection, required_columns))
@ -161,8 +168,16 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
added_filter_nodes.nodes.push_back(query.filter_node);
bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_columns, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
candidate,
*reading,
reader,
required_columns,
parts_with_ranges,
metadata,
query_info,
context,
max_added_blocks,
added_filter_nodes);
if (!analyzed)
continue;
@ -170,9 +185,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (candidate.sum_marks >= ordinary_reading_marks)
continue;
if (!is_projection_found && (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks))
best_candidate = &candidate;
else if (is_projection_found && projection->name == proj_name_from_settings)
if (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks)
best_candidate = &candidate;
}
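
Both projection optimizations in this commit now honor the preferred_optimize_projection_name setting by narrowing the candidate list to the named projection when it is present. A minimal standalone sketch of that narrowing step, using a hypothetical ProjectionDescription stand-in and an explicit empty-name check added for clarity:

#include <algorithm>
#include <string>
#include <vector>

/// Hypothetical stand-in for DB::ProjectionDescription.
struct ProjectionDescription { std::string name; };

/// If the preferred projection is among the candidates, keep only it;
/// otherwise leave the candidate list untouched.
void keepOnlyPreferred(std::vector<const ProjectionDescription *> & projections, const std::string & preferred_name)
{
    if (preferred_name.empty())
        return;

    auto it = std::find_if(projections.begin(), projections.end(),
        [&](const auto * projection) { return projection->name == preferred_name; });

    if (it != projections.end())
    {
        const ProjectionDescription * preferred = *it;
        projections.clear();
        projections.push_back(preferred);
    }
}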

View File

@ -210,7 +210,7 @@ bool analyzeProjectionCandidate(
const ReadFromMergeTree & reading,
const MergeTreeDataSelectExecutor & reader,
const Names & required_column_names,
const MergeTreeData::DataPartsVector & parts,
const RangesInDataParts & parts_with_ranges,
const StorageMetadataPtr & metadata,
const SelectQueryInfo & query_info,
const ContextPtr & context,
@ -219,14 +219,20 @@ bool analyzeProjectionCandidate(
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
std::vector<AlterConversionsPtr> alter_conversions;
for (const auto & part_with_ranges : parts_with_ranges)
{
const auto & created_projections = part->getProjectionParts();
const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end())
{
projection_parts.push_back(it->second);
}
else
normal_parts.push_back(part);
{
normal_parts.push_back(part_with_ranges.data_part);
alter_conversions.push_back(part_with_ranges.alter_conversions);
}
}
if (projection_parts.empty())
@ -252,7 +258,8 @@ bool analyzeProjectionCandidate(
if (!normal_parts.empty())
{
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), /* alter_conversions = */ {});
/// TODO: We can reuse existing analysis_result by filtering out projection parts
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions));
if (normal_result_ptr->error())
return false;

View File

@ -19,6 +19,7 @@ using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelect
class IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
struct RangesInDataParts;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
@ -71,7 +72,7 @@ bool analyzeProjectionCandidate(
const ReadFromMergeTree & reading,
const MergeTreeDataSelectExecutor & reader,
const Names & required_column_names,
const DataPartsVector & parts,
const RangesInDataParts & parts_with_ranges,
const StorageMetadataPtr & metadata,
const SelectQueryInfo & query_info,
const ContextPtr & context,

View File

@ -2258,10 +2258,7 @@ size_t MergeTreeDataSelectAnalysisResult::marks() const
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
const auto & index_stats = std::get<ReadFromMergeTree::AnalysisResult>(result).index_stats;
if (index_stats.empty())
return 0;
return index_stats.back().num_granules_after;
return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_marks;
}
UInt64 MergeTreeDataSelectAnalysisResult::rows() const
@ -2269,9 +2266,15 @@ UInt64 MergeTreeDataSelectAnalysisResult::rows() const
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
const auto & index_stats = std::get<ReadFromMergeTree::AnalysisResult>(result).index_stats;
if (index_stats.empty())
return 0;
return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_rows;
}
const RangesInDataParts & MergeTreeDataSelectAnalysisResult::partsWithRanges() const
{
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
return std::get<ReadFromMergeTree::AnalysisResult>(result).parts_with_ranges;
}
}
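
The accessors above share one pattern: the analysis result is stored as a std::variant holding either the computed result or a captured std::exception_ptr, and every getter rethrows the stored exception before reading the result. A reduced sketch of that pattern, where the field names selected_marks and selected_rows follow the diff but the struct itself is illustrative rather than the real MergeTreeDataSelectAnalysisResult:

#include <cstddef>
#include <exception>
#include <variant>

struct AnalysisOutcome
{
    struct Result
    {
        std::size_t selected_marks = 0;
        std::size_t selected_rows = 0;
    };

    /// Either the analysis result or the exception the analysis threw.
    std::variant<Result, std::exception_ptr> result;

    std::size_t marks() const
    {
        if (std::holds_alternative<std::exception_ptr>(result))
            std::rethrow_exception(std::get<std::exception_ptr>(result));
        return std::get<Result>(result).selected_marks;
    }

    std::size_t rows() const
    {
        if (std::holds_alternative<std::exception_ptr>(result))
            std::rethrow_exception(std::get<std::exception_ptr>(result));
        return std::get<Result>(result).selected_rows;
    }
};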

View File

@ -197,13 +197,9 @@ public:
bool hasAnalyzedResult() const { return analyzed_result_ptr != nullptr; }
void setAnalyzedResult(MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
void resetParts(MergeTreeData::DataPartsVector parts)
{
prepared_parts = std::move(parts);
alter_conversions_for_parts = {};
}
const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; }
const std::vector<AlterConversionsPtr> & getAlterConvertionsForParts() const { return alter_conversions_for_parts; }
const MergeTreeData & getMergeTreeData() const { return data; }
size_t getMaxBlockSize() const { return block_size.max_block_size_rows; }
size_t getNumStreams() const { return requested_num_streams; }
@ -310,6 +306,7 @@ struct MergeTreeDataSelectAnalysisResult
bool error() const;
size_t marks() const;
UInt64 rows() const;
const RangesInDataParts & partsWithRanges() const;
};
}

View File

@ -6498,7 +6498,6 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
bool has_filter,
const SelectQueryInfo & query_info,
const DataPartsVector & parts,
DataPartsVector & normal_parts,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const
{
@ -6623,11 +6622,11 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
continue;
}
/// It's extremely rare that some parts have final marks while others don't. To make it
/// straightforward, disable minmax_count projection when `max(pk)' encounters any part with
/// no final mark.
if (need_primary_key_max_column && !part->index_granularity.hasFinalMark())
{
normal_parts.push_back(part);
continue;
}
return {};
real_parts.push_back(part);
filter_column_data.back() = 1;

View File

@ -401,17 +401,12 @@ public:
/// query_info - used to filter unneeded parts
///
/// parts - part set to filter
///
/// normal_parts - collects parts that don't have all the needed values to form the block.
/// Specifically, this is when a part doesn't contain a final mark and the related max value is
/// required.
Block getMinMaxCountProjectionBlock(
const StorageMetadataPtr & metadata_snapshot,
const Names & required_columns,
bool has_filter,
const SelectQueryInfo & query_info,
const DataPartsVector & parts,
DataPartsVector & normal_parts,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const;

View File

@ -828,8 +828,8 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
}
void MergeTreeDataSelectExecutor::filterPartsByPartition(
std::optional<PartitionPruner> & partition_pruner,
std::optional<KeyCondition> & minmax_idx_condition,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
@ -1288,6 +1288,8 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
std::optional<ReadFromMergeTree::Indexes> indexes;
/// NOTE: We don't need alter_conversions because the returned analysis_result is only used for:
/// 1. estimating the number of rows to read; 2. projection reading, which doesn't have alter_conversions.

return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
/*alter_conversions=*/ {},
@ -1824,7 +1826,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters)
{
@ -1886,7 +1888,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context,
PartFilterCounters & counters,

View File

@ -126,7 +126,7 @@ private:
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters);
@ -138,7 +138,7 @@ private:
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context,
PartFilterCounters & counters,
@ -178,8 +178,8 @@ public:
/// Filter parts using minmax index and partition key.
static void filterPartsByPartition(
std::optional<PartitionPruner> & partition_pruner,
std::optional<KeyCondition> & minmax_idx_condition,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,

View File

@ -31,7 +31,7 @@ PartitionPruner::PartitionPruner(const StorageMetadataPtr & metadata, ActionsDAG
{
}
bool PartitionPruner::canBePruned(const IMergeTreeDataPart & part)
bool PartitionPruner::canBePruned(const IMergeTreeDataPart & part) const
{
if (part.isEmpty())
return true;

View File

@ -16,14 +16,15 @@ public:
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict);
PartitionPruner(const StorageMetadataPtr & metadata, ActionsDAGPtr filter_actions_dag, ContextPtr context, bool strict);
bool canBePruned(const IMergeTreeDataPart & part);
bool canBePruned(const IMergeTreeDataPart & part) const;
bool isUseless() const { return useless; }
const KeyCondition & getKeyCondition() const { return partition_condition; }
private:
std::unordered_map<String, bool> partition_filter_map;
/// Cache already analyzed partitions.
mutable std::unordered_map<String, bool> partition_filter_map;
/// partition_key is adjusted here (with substitution from modulo to moduloLegacy).
KeyDescription partition_key;
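
Making canBePruned const while keeping the per-partition cache works because the cache member is declared mutable: a const member function may still update it, the usual "logically const" memoization idiom. A small self-contained sketch of that idiom, with a hypothetical placeholder for the real key-condition analysis:

#include <string>
#include <unordered_map>

class PartitionFilter
{
public:
    /// Logically const: callers only observe an answer, but the first call
    /// for a given partition_id computes and caches it.
    bool canBePruned(const std::string & partition_id) const
    {
        auto [it, inserted] = cache.emplace(partition_id, false);
        if (inserted)
            it->second = expensiveCheck(partition_id);  /// placeholder for the real analysis
        return it->second;
    }

private:
    static bool expensiveCheck(const std::string & partition_id) { return partition_id.empty(); }

    /// Cache already analyzed partitions (mirrors partition_filter_map in the diff).
    mutable std::unordered_map<std::string, bool> cache;
};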

View File

@ -341,6 +341,8 @@ void StorageMergeTree::alter(
prev_mutation = it->first;
}
/// Always wait for previous mutations synchronously, because alters
/// should be executed in sequential order.
if (prev_mutation != 0)
{
LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation);
@ -368,9 +370,7 @@ void StorageMergeTree::alter(
resetObjectColumnsFromActiveParts(parts_lock);
}
/// Always execute required mutations synchronously, because alters
/// should be executed in sequential order.
if (!maybe_mutation_commands.empty())
if (!maybe_mutation_commands.empty() && local_context->getSettingsRef().alter_sync > 0)
waitForMutation(mutation_version, false);
}

View File

@ -11,7 +11,6 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<availability_zone>az-zoo1</availability_zone>
<server_id>1</server_id>
<coordination_settings>

View File

@ -12,7 +12,6 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>2</server_id>
<availability_zone>az-zoo2</availability_zone>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@ -248,11 +248,6 @@ def is_leader(cluster, node, port=9181):
return "Mode: leader" in stat
def is_follower(cluster, node, port=9181):
stat = send_4lw_cmd(cluster, node, "stat", port)
return "Mode: follower" in stat
def get_leader(cluster, nodes):
for node in nodes:
if is_leader(cluster, node):

View File

@ -1,2 +0,0 @@
<clickhouse>
</clickhouse>

View File

@ -1,31 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.keeper_utils import KeeperClient
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/keeper_config.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_get_availability_zone():
with KeeperClient.from_cluster(cluster, "zoo1") as client1:
assert client1.get("/keeper/availability_zone") == "az-zoo1"
with KeeperClient.from_cluster(cluster, "zoo2") as client2:
assert client2.get("/keeper/availability_zone") == "az-zoo2"

View File

@ -183,8 +183,8 @@ def test_cmd_mntr(started_cluster):
# contains:
# 10 nodes created by test
# 3 nodes created by clickhouse "/clickhouse/task_queue/ddl"
# 1 root node, 4 keeper system nodes
assert int(result["zk_znode_count"]) == 15
# 1 root node, 3 keeper system nodes
assert int(result["zk_znode_count"]) == 14
assert int(result["zk_watch_count"]) == 2
assert int(result["zk_ephemerals_count"]) == 2
assert int(result["zk_approximate_data_size"]) > 0
@ -333,7 +333,7 @@ def test_cmd_srvr(started_cluster):
assert int(result["Connections"]) == 1
assert int(result["Zxid"], 16) > 10
assert result["Mode"] == "leader"
assert result["Node count"] == "15"
assert result["Node count"] == "14"
finally:
destroy_zk_client(zk)
@ -373,7 +373,7 @@ def test_cmd_stat(started_cluster):
assert int(result["Connections"]) == 1
assert int(result["Zxid"], 16) >= 10
assert result["Mode"] == "leader"
assert result["Node count"] == "15"
assert result["Node count"] == "14"
# filter connection statistics
cons = [n for n in data.split("\n") if "=" in n]
@ -725,26 +725,3 @@ def test_cmd_clrs(started_cluster):
finally:
destroy_zk_client(zk)
def test_cmd_ydld(started_cluster):
wait_nodes()
for node in [node1, node3]:
data = keeper_utils.send_4lw_cmd(cluster, node, cmd="ydld")
assert data == "Sent yield leadership request to leader."
print("ydld output -------------------------------------")
print(data)
if keeper_utils.is_leader(cluster, node):
# wait for it to yield leadership
retry = 0
while keeper_utils.is_leader(cluster, node) and retry < 30:
time.sleep(1)
retry += 1
if retry == 30:
print(
node.name
+ " did not yield leadership after 30s, maybe there is something wrong."
)
assert keeper_utils.is_follower(cluster, node)

View File

@ -0,0 +1 @@
Selected 2/2 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "drop table if exists t"
${CLICKHOUSE_CLIENT} -q "create table t(s LowCardinality(String), e DateTime64(3), projection p1 (select * order by s, e)) engine MergeTree partition by toYYYYMM(e) order by tuple() settings index_granularity = 8192, index_granularity_bytes = '100M'"
${CLICKHOUSE_CLIENT} -q "insert into t select 'AAP', toDateTime('2023-07-01') + 360 * number from numbers(50000)"
${CLICKHOUSE_CLIENT} -q "insert into t select 'AAPL', toDateTime('2023-07-01') + 360 * number from numbers(50000)"
CLICKHOUSE_CLIENT_DEBUG_LOG=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=debug/g')
${CLICKHOUSE_CLIENT_DEBUG_LOG} -q "select count() from t where e >= '2023-11-08 00:00:00.000' and e < '2023-11-09 00:00:00.000' and s in ('AAPL') format Null" 2>&1 | grep -oh "Selected .* parts by partition key, *. parts by primary key, .* marks by primary key, .* marks to read from .* ranges.*$"
${CLICKHOUSE_CLIENT} -q "drop table t"

View File

@ -0,0 +1,15 @@
drop table if exists t;
create table t (i int, j int, projection p (select i order by i)) engine MergeTree order by tuple();
insert into t values (1, 2);
system stop merges t;
set alter_sync = 0;
alter table t rename column j to k;
select * from t;
drop table t;

View File

@ -1,6 +1,6 @@
drop table if exists projection_test;
create table projection_test (`sum(block_count)` UInt64, domain_alias UInt64 alias length(domain), datetime DateTime, domain LowCardinality(String), x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64), projection p (select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration), count(), sum(block_count) / sum(duration), avg(block_count / duration), sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes), sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count(), uniqHLL12(x_id), uniqHLL12(y_id) group by dt_m, domain)) engine MergeTree partition by toDate(datetime) order by (toStartOfTenMinutes(datetime), domain) settings index_granularity_bytes = 10000000;
create table projection_test (`sum(block_count)` UInt64, domain_alias UInt64 alias length(domain), datetime DateTime, domain LowCardinality(String), x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64), projection p (select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration), count(), sum(block_count) / sum(duration), avg(block_count / duration), sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes), sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count(), uniqHLL12(x_id), uniqHLL12(y_id) group by dt_m, domain)) engine MergeTree partition by toDate(datetime) order by toStartOfTenMinutes(datetime) settings index_granularity_bytes = 10000000;
insert into projection_test with rowNumberInAllBlocks() as id select 1, toDateTime('2020-10-24 00:00:00') + (id / 20), toString(id % 100), * from generateRandom('x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64)', 10, 10, 1) limit 1000 settings max_threads = 1;