mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge branch 'master' into add-test-1000-exceptions
This commit is contained in:
commit
d5b652d627
@ -209,8 +209,8 @@ std::vector<String> Client::loadWarningMessages()
|
|||||||
{} /* query_parameters */,
|
{} /* query_parameters */,
|
||||||
"" /* query_id */,
|
"" /* query_id */,
|
||||||
QueryProcessingStage::Complete,
|
QueryProcessingStage::Complete,
|
||||||
&global_context->getSettingsRef(),
|
&client_context->getSettingsRef(),
|
||||||
&global_context->getClientInfo(), false, {});
|
&client_context->getClientInfo(), false, {});
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
Packet packet = connection->receivePacket();
|
Packet packet = connection->receivePacket();
|
||||||
@ -306,9 +306,6 @@ void Client::initialize(Poco::Util::Application & self)
|
|||||||
if (env_password && !config().has("password"))
|
if (env_password && !config().has("password"))
|
||||||
config().setString("password", env_password);
|
config().setString("password", env_password);
|
||||||
|
|
||||||
// global_context->setApplicationType(Context::ApplicationType::CLIENT);
|
|
||||||
global_context->setQueryParameters(query_parameters);
|
|
||||||
|
|
||||||
/// settings and limits could be specified in config file, but passed settings has higher priority
|
/// settings and limits could be specified in config file, but passed settings has higher priority
|
||||||
for (const auto & setting : global_context->getSettingsRef().allUnchanged())
|
for (const auto & setting : global_context->getSettingsRef().allUnchanged())
|
||||||
{
|
{
|
||||||
@ -382,7 +379,7 @@ try
|
|||||||
showWarnings();
|
showWarnings();
|
||||||
|
|
||||||
/// Set user password complexity rules
|
/// Set user password complexity rules
|
||||||
auto & access_control = global_context->getAccessControl();
|
auto & access_control = client_context->getAccessControl();
|
||||||
access_control.setPasswordComplexityRules(connection->getPasswordComplexityRules());
|
access_control.setPasswordComplexityRules(connection->getPasswordComplexityRules());
|
||||||
|
|
||||||
if (is_interactive && !delayed_interactive)
|
if (is_interactive && !delayed_interactive)
|
||||||
@ -459,7 +456,7 @@ void Client::connect()
|
|||||||
<< connection_parameters.host << ":" << connection_parameters.port
|
<< connection_parameters.host << ":" << connection_parameters.port
|
||||||
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
|
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
|
||||||
|
|
||||||
connection = Connection::createConnection(connection_parameters, global_context);
|
connection = Connection::createConnection(connection_parameters, client_context);
|
||||||
|
|
||||||
if (max_client_network_bandwidth)
|
if (max_client_network_bandwidth)
|
||||||
{
|
{
|
||||||
@ -528,7 +525,7 @@ void Client::connect()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!global_context->getSettingsRef().use_client_time_zone)
|
if (!client_context->getSettingsRef().use_client_time_zone)
|
||||||
{
|
{
|
||||||
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
|
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
|
||||||
if (!time_zone.empty())
|
if (!time_zone.empty())
|
||||||
@ -611,7 +608,7 @@ void Client::printChangedSettings() const
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
print_changes(global_context->getSettingsRef().changes(), "settings");
|
print_changes(client_context->getSettingsRef().changes(), "settings");
|
||||||
print_changes(cmd_merge_tree_settings.changes(), "MergeTree settings");
|
print_changes(cmd_merge_tree_settings.changes(), "MergeTree settings");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -709,7 +706,7 @@ bool Client::processWithFuzzing(const String & full_query)
|
|||||||
{
|
{
|
||||||
const char * begin = full_query.data();
|
const char * begin = full_query.data();
|
||||||
orig_ast = parseQuery(begin, begin + full_query.size(),
|
orig_ast = parseQuery(begin, begin + full_query.size(),
|
||||||
global_context->getSettingsRef(),
|
client_context->getSettingsRef(),
|
||||||
/*allow_multi_statements=*/ true);
|
/*allow_multi_statements=*/ true);
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
@ -733,7 +730,7 @@ bool Client::processWithFuzzing(const String & full_query)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Kusto is not a subject for fuzzing (yet)
|
// Kusto is not a subject for fuzzing (yet)
|
||||||
if (global_context->getSettingsRef().dialect == DB::Dialect::kusto)
|
if (client_context->getSettingsRef().dialect == DB::Dialect::kusto)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1166,6 +1163,11 @@ void Client::processOptions(const OptionsDescription & options_description,
|
|||||||
|
|
||||||
if (options.count("opentelemetry-tracestate"))
|
if (options.count("opentelemetry-tracestate"))
|
||||||
global_context->getClientTraceContext().tracestate = options["opentelemetry-tracestate"].as<std::string>();
|
global_context->getClientTraceContext().tracestate = options["opentelemetry-tracestate"].as<std::string>();
|
||||||
|
|
||||||
|
/// In case of clickhouse-client the `client_context` can be just an alias for the `global_context`.
|
||||||
|
/// (There is no need to copy the context because clickhouse-client has no background tasks so it won't use that context in parallel.)
|
||||||
|
client_context = global_context;
|
||||||
|
initClientContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1205,11 +1207,6 @@ void Client::processConfig()
|
|||||||
pager = config().getString("pager", "");
|
pager = config().getString("pager", "");
|
||||||
|
|
||||||
setDefaultFormatsAndCompressionFromConfiguration();
|
setDefaultFormatsAndCompressionFromConfiguration();
|
||||||
|
|
||||||
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
|
|
||||||
global_context->setQueryKindInitial();
|
|
||||||
global_context->setQuotaClientKey(config().getString("quota_key", ""));
|
|
||||||
global_context->setQueryKind(query_kind);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ public:
|
|||||||
int main(const std::vector<String> & /*args*/) override;
|
int main(const std::vector<String> & /*args*/) override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
||||||
|
|
||||||
bool processWithFuzzing(const String & full_query) override;
|
bool processWithFuzzing(const String & full_query) override;
|
||||||
|
@ -295,6 +295,8 @@ void LocalServer::cleanup()
|
|||||||
if (suggest)
|
if (suggest)
|
||||||
suggest.reset();
|
suggest.reset();
|
||||||
|
|
||||||
|
client_context.reset();
|
||||||
|
|
||||||
if (global_context)
|
if (global_context)
|
||||||
{
|
{
|
||||||
global_context->shutdown();
|
global_context->shutdown();
|
||||||
@ -436,7 +438,7 @@ void LocalServer::connect()
|
|||||||
in = input.get();
|
in = input.get();
|
||||||
}
|
}
|
||||||
connection = LocalConnection::createConnection(
|
connection = LocalConnection::createConnection(
|
||||||
connection_parameters, global_context, in, need_render_progress, need_render_profile_events, server_display_name);
|
connection_parameters, client_context, in, need_render_progress, need_render_profile_events, server_display_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -497,8 +499,6 @@ try
|
|||||||
initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")));
|
initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")));
|
||||||
ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);
|
ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);
|
||||||
|
|
||||||
applyCmdSettings(global_context);
|
|
||||||
|
|
||||||
/// try to load user defined executable functions, throw on error and die
|
/// try to load user defined executable functions, throw on error and die
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -510,6 +510,11 @@ try
|
|||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Must be called after we stopped initializing the global context and changing its settings.
|
||||||
|
/// After this point the global context must be stayed almost unchanged till shutdown,
|
||||||
|
/// and all necessary changes must be made to the client context instead.
|
||||||
|
createClientContext();
|
||||||
|
|
||||||
if (is_interactive)
|
if (is_interactive)
|
||||||
{
|
{
|
||||||
clearTerminal();
|
clearTerminal();
|
||||||
@ -735,6 +740,9 @@ void LocalServer::processConfig()
|
|||||||
/// Load global settings from default_profile and system_profile.
|
/// Load global settings from default_profile and system_profile.
|
||||||
global_context->setDefaultProfiles(getClientConfiguration());
|
global_context->setDefaultProfiles(getClientConfiguration());
|
||||||
|
|
||||||
|
/// Command-line parameters can override settings from the default profile.
|
||||||
|
applyCmdSettings(global_context);
|
||||||
|
|
||||||
/// We load temporary database first, because projections need it.
|
/// We load temporary database first, because projections need it.
|
||||||
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
|
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
|
||||||
|
|
||||||
@ -778,10 +786,6 @@ void LocalServer::processConfig()
|
|||||||
|
|
||||||
server_display_name = getClientConfiguration().getString("display_name", "");
|
server_display_name = getClientConfiguration().getString("display_name", "");
|
||||||
prompt_by_server_display_name = getClientConfiguration().getRawString("prompt_by_server_display_name.default", ":) ");
|
prompt_by_server_display_name = getClientConfiguration().getRawString("prompt_by_server_display_name.default", ":) ");
|
||||||
|
|
||||||
global_context->setQueryKindInitial();
|
|
||||||
global_context->setQueryKind(query_kind);
|
|
||||||
global_context->setQueryParameters(query_parameters);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -860,6 +864,16 @@ void LocalServer::applyCmdOptions(ContextMutablePtr context)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void LocalServer::createClientContext()
|
||||||
|
{
|
||||||
|
/// In case of clickhouse-local it's necessary to use a separate context for client-related purposes.
|
||||||
|
/// We can't just change the global context because it is used in background tasks (for example, in merges)
|
||||||
|
/// which don't expect that the global context can suddenly change.
|
||||||
|
client_context = Context::createCopy(global_context);
|
||||||
|
initClientContext();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &, const std::vector<Arguments> &)
|
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &, const std::vector<Arguments> &)
|
||||||
{
|
{
|
||||||
if (options.count("table"))
|
if (options.count("table"))
|
||||||
|
@ -31,7 +31,6 @@ public:
|
|||||||
int main(const std::vector<String> & /*args*/) override;
|
int main(const std::vector<String> & /*args*/) override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
||||||
|
|
||||||
void connect() override;
|
void connect() override;
|
||||||
@ -50,7 +49,6 @@ protected:
|
|||||||
void processConfig() override;
|
void processConfig() override;
|
||||||
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &) override;
|
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &) override;
|
||||||
|
|
||||||
|
|
||||||
void updateLoggerLevel(const String & logs_level) override;
|
void updateLoggerLevel(const String & logs_level) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -67,6 +65,8 @@ private:
|
|||||||
void applyCmdOptions(ContextMutablePtr context);
|
void applyCmdOptions(ContextMutablePtr context);
|
||||||
void applyCmdSettings(ContextMutablePtr context);
|
void applyCmdSettings(ContextMutablePtr context);
|
||||||
|
|
||||||
|
void createClientContext();
|
||||||
|
|
||||||
ServerSettings server_settings;
|
ServerSettings server_settings;
|
||||||
|
|
||||||
std::optional<StatusFile> status;
|
std::optional<StatusFile> status;
|
||||||
|
@ -1740,7 +1740,7 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveQualifiedMatcher(Qu
|
|||||||
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
|
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
|
||||||
if (!tuple_data_type)
|
if (!tuple_data_type)
|
||||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||||
"Qualified matcher {} find non compound expression {} with type {}. Expected tuple or array of tuples. In scope {}",
|
"Qualified matcher {} found a non-compound expression {} with type {}. Expected a tuple or an array of tuples. In scope {}",
|
||||||
matcher_node->formatASTForErrorMessage(),
|
matcher_node->formatASTForErrorMessage(),
|
||||||
expression_query_tree_node->formatASTForErrorMessage(),
|
expression_query_tree_node->formatASTForErrorMessage(),
|
||||||
expression_query_tree_node->getResultType()->getName(),
|
expression_query_tree_node->getResultType()->getName(),
|
||||||
|
@ -477,7 +477,7 @@ void ClientBase::sendExternalTables(ASTPtr parsed_query)
|
|||||||
|
|
||||||
std::vector<ExternalTableDataPtr> data;
|
std::vector<ExternalTableDataPtr> data;
|
||||||
for (auto & table : external_tables)
|
for (auto & table : external_tables)
|
||||||
data.emplace_back(table.getData(global_context));
|
data.emplace_back(table.getData(client_context));
|
||||||
|
|
||||||
connection->sendExternalTablesData(data);
|
connection->sendExternalTablesData(data);
|
||||||
}
|
}
|
||||||
@ -690,10 +690,10 @@ try
|
|||||||
/// intermixed with data with parallel formatting.
|
/// intermixed with data with parallel formatting.
|
||||||
/// It may increase code complexity significantly.
|
/// It may increase code complexity significantly.
|
||||||
if (!extras_into_stdout || select_only_into_file)
|
if (!extras_into_stdout || select_only_into_file)
|
||||||
output_format = global_context->getOutputFormatParallelIfPossible(
|
output_format = client_context->getOutputFormatParallelIfPossible(
|
||||||
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
||||||
else
|
else
|
||||||
output_format = global_context->getOutputFormat(
|
output_format = client_context->getOutputFormat(
|
||||||
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
||||||
|
|
||||||
output_format->setAutoFlush();
|
output_format->setAutoFlush();
|
||||||
@ -772,6 +772,15 @@ void ClientBase::adjustSettings()
|
|||||||
global_context->setSettings(settings);
|
global_context->setSettings(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ClientBase::initClientContext()
|
||||||
|
{
|
||||||
|
client_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
|
||||||
|
client_context->setQuotaClientKey(getClientConfiguration().getString("quota_key", ""));
|
||||||
|
client_context->setQueryKindInitial();
|
||||||
|
client_context->setQueryKind(query_kind);
|
||||||
|
client_context->setQueryParameters(query_parameters);
|
||||||
|
}
|
||||||
|
|
||||||
bool ClientBase::isRegularFile(int fd)
|
bool ClientBase::isRegularFile(int fd)
|
||||||
{
|
{
|
||||||
struct stat file_stat;
|
struct stat file_stat;
|
||||||
@ -962,7 +971,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
|
|||||||
/// client-side. Thus we need to parse the query.
|
/// client-side. Thus we need to parse the query.
|
||||||
const char * begin = full_query.data();
|
const char * begin = full_query.data();
|
||||||
auto parsed_query = parseQuery(begin, begin + full_query.size(),
|
auto parsed_query = parseQuery(begin, begin + full_query.size(),
|
||||||
global_context->getSettingsRef(),
|
client_context->getSettingsRef(),
|
||||||
/*allow_multi_statements=*/ false);
|
/*allow_multi_statements=*/ false);
|
||||||
|
|
||||||
if (!parsed_query)
|
if (!parsed_query)
|
||||||
@ -985,7 +994,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
|
|||||||
/// But for asynchronous inserts we don't extract data, because it's needed
|
/// But for asynchronous inserts we don't extract data, because it's needed
|
||||||
/// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
|
/// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
|
||||||
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
||||||
if (insert && isSyncInsertWithData(*insert, global_context))
|
if (insert && isSyncInsertWithData(*insert, client_context))
|
||||||
query_to_execute = full_query.substr(0, insert->data - full_query.data());
|
query_to_execute = full_query.substr(0, insert->data - full_query.data());
|
||||||
else
|
else
|
||||||
query_to_execute = full_query;
|
query_to_execute = full_query;
|
||||||
@ -1103,7 +1112,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto & settings = global_context->getSettingsRef();
|
const auto & settings = client_context->getSettingsRef();
|
||||||
const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;
|
const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;
|
||||||
|
|
||||||
int retries_left = 10;
|
int retries_left = 10;
|
||||||
@ -1118,10 +1127,10 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
|
|||||||
connection_parameters.timeouts,
|
connection_parameters.timeouts,
|
||||||
query,
|
query,
|
||||||
query_parameters,
|
query_parameters,
|
||||||
global_context->getCurrentQueryId(),
|
client_context->getCurrentQueryId(),
|
||||||
query_processing_stage,
|
query_processing_stage,
|
||||||
&global_context->getSettingsRef(),
|
&client_context->getSettingsRef(),
|
||||||
&global_context->getClientInfo(),
|
&client_context->getClientInfo(),
|
||||||
true,
|
true,
|
||||||
[&](const Progress & progress) { onProgress(progress); });
|
[&](const Progress & progress) { onProgress(progress); });
|
||||||
|
|
||||||
@ -1308,7 +1317,7 @@ void ClientBase::onProgress(const Progress & value)
|
|||||||
|
|
||||||
void ClientBase::onTimezoneUpdate(const String & tz)
|
void ClientBase::onTimezoneUpdate(const String & tz)
|
||||||
{
|
{
|
||||||
global_context->setSetting("session_timezone", tz);
|
client_context->setSetting("session_timezone", tz);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1504,13 +1513,13 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de
|
|||||||
|
|
||||||
void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
|
void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
|
||||||
{
|
{
|
||||||
if (!global_context->hasInsertionTable() && insert_query.table)
|
if (!client_context->hasInsertionTable() && insert_query.table)
|
||||||
{
|
{
|
||||||
String table = insert_query.table->as<ASTIdentifier &>().shortName();
|
String table = insert_query.table->as<ASTIdentifier &>().shortName();
|
||||||
if (!table.empty())
|
if (!table.empty())
|
||||||
{
|
{
|
||||||
String database = insert_query.database ? insert_query.database->as<ASTIdentifier &>().shortName() : "";
|
String database = insert_query.database ? insert_query.database->as<ASTIdentifier &>().shortName() : "";
|
||||||
global_context->setInsertionTable(StorageID(database, table));
|
client_context->setInsertionTable(StorageID(database, table));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1561,7 +1570,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
|
|||||||
const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
|
const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
|
||||||
if ((!parsed_insert_query.data && !parsed_insert_query.infile) && (is_interactive || (!stdin_is_a_tty && !isStdinNotEmptyAndValid(std_in))))
|
if ((!parsed_insert_query.data && !parsed_insert_query.infile) && (is_interactive || (!stdin_is_a_tty && !isStdinNotEmptyAndValid(std_in))))
|
||||||
{
|
{
|
||||||
const auto & settings = global_context->getSettingsRef();
|
const auto & settings = client_context->getSettingsRef();
|
||||||
if (settings.throw_if_no_data_to_insert)
|
if (settings.throw_if_no_data_to_insert)
|
||||||
throw Exception(ErrorCodes::NO_DATA_TO_INSERT, "No data to insert");
|
throw Exception(ErrorCodes::NO_DATA_TO_INSERT, "No data to insert");
|
||||||
else
|
else
|
||||||
@ -1575,10 +1584,10 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
|
|||||||
connection_parameters.timeouts,
|
connection_parameters.timeouts,
|
||||||
query,
|
query,
|
||||||
query_parameters,
|
query_parameters,
|
||||||
global_context->getCurrentQueryId(),
|
client_context->getCurrentQueryId(),
|
||||||
query_processing_stage,
|
query_processing_stage,
|
||||||
&global_context->getSettingsRef(),
|
&client_context->getSettingsRef(),
|
||||||
&global_context->getClientInfo(),
|
&client_context->getClientInfo(),
|
||||||
true,
|
true,
|
||||||
[&](const Progress & progress) { onProgress(progress); });
|
[&](const Progress & progress) { onProgress(progress); });
|
||||||
|
|
||||||
@ -1626,7 +1635,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
|||||||
|
|
||||||
/// Set callback to be called on file progress.
|
/// Set callback to be called on file progress.
|
||||||
if (tty_buf)
|
if (tty_buf)
|
||||||
progress_indication.setFileProgressCallback(global_context, *tty_buf);
|
progress_indication.setFileProgressCallback(client_context, *tty_buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If data fetched from file (maybe compressed file)
|
/// If data fetched from file (maybe compressed file)
|
||||||
@ -1660,10 +1669,10 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
|||||||
}
|
}
|
||||||
|
|
||||||
StorageFile::CommonArguments args{
|
StorageFile::CommonArguments args{
|
||||||
WithContext(global_context),
|
WithContext(client_context),
|
||||||
parsed_insert_query->table_id,
|
parsed_insert_query->table_id,
|
||||||
current_format,
|
current_format,
|
||||||
getFormatSettings(global_context),
|
getFormatSettings(client_context),
|
||||||
compression_method,
|
compression_method,
|
||||||
columns_for_storage_file,
|
columns_for_storage_file,
|
||||||
ConstraintsDescription{},
|
ConstraintsDescription{},
|
||||||
@ -1671,7 +1680,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
|||||||
{},
|
{},
|
||||||
String{},
|
String{},
|
||||||
};
|
};
|
||||||
StoragePtr storage = std::make_shared<StorageFile>(in_file, global_context->getUserFilesPath(), args);
|
StoragePtr storage = std::make_shared<StorageFile>(in_file, client_context->getUserFilesPath(), args);
|
||||||
storage->startup();
|
storage->startup();
|
||||||
SelectQueryInfo query_info;
|
SelectQueryInfo query_info;
|
||||||
|
|
||||||
@ -1682,16 +1691,16 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
|||||||
storage->read(
|
storage->read(
|
||||||
plan,
|
plan,
|
||||||
sample.getNames(),
|
sample.getNames(),
|
||||||
storage->getStorageSnapshot(metadata, global_context),
|
storage->getStorageSnapshot(metadata, client_context),
|
||||||
query_info,
|
query_info,
|
||||||
global_context,
|
client_context,
|
||||||
{},
|
{},
|
||||||
global_context->getSettingsRef().max_block_size,
|
client_context->getSettingsRef().max_block_size,
|
||||||
getNumberOfPhysicalCPUCores());
|
getNumberOfPhysicalCPUCores());
|
||||||
|
|
||||||
auto builder = plan.buildQueryPipeline(
|
auto builder = plan.buildQueryPipeline(
|
||||||
QueryPlanOptimizationSettings::fromContext(global_context),
|
QueryPlanOptimizationSettings::fromContext(client_context),
|
||||||
BuildQueryPipelineSettings::fromContext(global_context));
|
BuildQueryPipelineSettings::fromContext(client_context));
|
||||||
|
|
||||||
QueryPlanResourceHolder resources;
|
QueryPlanResourceHolder resources;
|
||||||
auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
|
auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
|
||||||
@ -1752,14 +1761,14 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
|
|||||||
current_format = insert->format;
|
current_format = insert->format;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto source = global_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
|
auto source = client_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
|
||||||
Pipe pipe(source);
|
Pipe pipe(source);
|
||||||
|
|
||||||
if (columns_description.hasDefaults())
|
if (columns_description.hasDefaults())
|
||||||
{
|
{
|
||||||
pipe.addSimpleTransform([&](const Block & header)
|
pipe.addSimpleTransform([&](const Block & header)
|
||||||
{
|
{
|
||||||
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, global_context);
|
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, client_context);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1921,12 +1930,12 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
|||||||
|
|
||||||
if (is_interactive)
|
if (is_interactive)
|
||||||
{
|
{
|
||||||
global_context->setCurrentQueryId("");
|
client_context->setCurrentQueryId("");
|
||||||
// Generate a new query_id
|
// Generate a new query_id
|
||||||
for (const auto & query_id_format : query_id_formats)
|
for (const auto & query_id_format : query_id_formats)
|
||||||
{
|
{
|
||||||
writeString(query_id_format.first, std_out);
|
writeString(query_id_format.first, std_out);
|
||||||
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", global_context->getCurrentQueryId())), std_out);
|
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", client_context->getCurrentQueryId())), std_out);
|
||||||
writeChar('\n', std_out);
|
writeChar('\n', std_out);
|
||||||
std_out.next();
|
std_out.next();
|
||||||
}
|
}
|
||||||
@ -1953,7 +1962,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
|||||||
auto password = auth_data->getPassword();
|
auto password = auth_data->getPassword();
|
||||||
|
|
||||||
if (password)
|
if (password)
|
||||||
global_context->getAccessControl().checkPasswordComplexityRules(*password);
|
client_context->getAccessControl().checkPasswordComplexityRules(*password);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1968,15 +1977,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
|||||||
std::optional<Settings> old_settings;
|
std::optional<Settings> old_settings;
|
||||||
SCOPE_EXIT_SAFE({
|
SCOPE_EXIT_SAFE({
|
||||||
if (old_settings)
|
if (old_settings)
|
||||||
global_context->setSettings(*old_settings);
|
client_context->setSettings(*old_settings);
|
||||||
});
|
});
|
||||||
|
|
||||||
auto apply_query_settings = [&](const IAST & settings_ast)
|
auto apply_query_settings = [&](const IAST & settings_ast)
|
||||||
{
|
{
|
||||||
if (!old_settings)
|
if (!old_settings)
|
||||||
old_settings.emplace(global_context->getSettingsRef());
|
old_settings.emplace(client_context->getSettingsRef());
|
||||||
global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
client_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
||||||
global_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
|
client_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
|
||||||
};
|
};
|
||||||
|
|
||||||
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
||||||
@ -2009,7 +2018,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
|||||||
if (insert && insert->select)
|
if (insert && insert->select)
|
||||||
insert->tryFindInputFunction(input_function);
|
insert->tryFindInputFunction(input_function);
|
||||||
|
|
||||||
bool is_async_insert_with_inlined_data = global_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
|
bool is_async_insert_with_inlined_data = client_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
|
||||||
|
|
||||||
if (is_async_insert_with_inlined_data)
|
if (is_async_insert_with_inlined_data)
|
||||||
{
|
{
|
||||||
@ -2044,9 +2053,9 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
|||||||
if (change.name == "profile")
|
if (change.name == "profile")
|
||||||
current_profile = change.value.safeGet<String>();
|
current_profile = change.value.safeGet<String>();
|
||||||
else
|
else
|
||||||
global_context->applySettingChange(change);
|
client_context->applySettingChange(change);
|
||||||
}
|
}
|
||||||
global_context->resetSettingsToDefaultValue(set_query->default_settings);
|
client_context->resetSettingsToDefaultValue(set_query->default_settings);
|
||||||
|
|
||||||
/// Query parameters inside SET queries should be also saved on the client side
|
/// Query parameters inside SET queries should be also saved on the client side
|
||||||
/// to override their previous definitions set with --param_* arguments
|
/// to override their previous definitions set with --param_* arguments
|
||||||
@ -2054,7 +2063,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
|||||||
for (const auto & [name, value] : set_query->query_parameters)
|
for (const auto & [name, value] : set_query->query_parameters)
|
||||||
query_parameters.insert_or_assign(name, value);
|
query_parameters.insert_or_assign(name, value);
|
||||||
|
|
||||||
global_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
|
client_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
|
||||||
}
|
}
|
||||||
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
|
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
|
||||||
{
|
{
|
||||||
@ -2131,8 +2140,8 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
|||||||
if (this_query_begin >= all_queries_end)
|
if (this_query_begin >= all_queries_end)
|
||||||
return MultiQueryProcessingStage::QUERIES_END;
|
return MultiQueryProcessingStage::QUERIES_END;
|
||||||
|
|
||||||
unsigned max_parser_depth = static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth);
|
unsigned max_parser_depth = static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth);
|
||||||
unsigned max_parser_backtracks = static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks);
|
unsigned max_parser_backtracks = static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks);
|
||||||
|
|
||||||
// If there are only comments left until the end of file, we just
|
// If there are only comments left until the end of file, we just
|
||||||
// stop. The parser can't handle this situation because it always
|
// stop. The parser can't handle this situation because it always
|
||||||
@ -2152,7 +2161,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
parsed_query = parseQuery(this_query_end, all_queries_end,
|
parsed_query = parseQuery(this_query_end, all_queries_end,
|
||||||
global_context->getSettingsRef(),
|
client_context->getSettingsRef(),
|
||||||
/*allow_multi_statements=*/ true);
|
/*allow_multi_statements=*/ true);
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
@ -2195,7 +2204,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
|||||||
{
|
{
|
||||||
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
|
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
|
||||||
insert_ast->end = this_query_end;
|
insert_ast->end = this_query_end;
|
||||||
query_to_execute_end = isSyncInsertWithData(*insert_ast, global_context) ? insert_ast->data : this_query_end;
|
query_to_execute_end = isSyncInsertWithData(*insert_ast, client_context) ? insert_ast->data : this_query_end;
|
||||||
}
|
}
|
||||||
|
|
||||||
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), query_to_execute_end - this_query_begin);
|
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), query_to_execute_end - this_query_begin);
|
||||||
@ -2404,13 +2413,13 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
|
|||||||
// , where the inline data is delimited by semicolon and not by a
|
// , where the inline data is delimited by semicolon and not by a
|
||||||
// newline.
|
// newline.
|
||||||
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
|
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
|
||||||
if (insert_ast && isSyncInsertWithData(*insert_ast, global_context))
|
if (insert_ast && isSyncInsertWithData(*insert_ast, client_context))
|
||||||
{
|
{
|
||||||
this_query_end = insert_ast->end;
|
this_query_end = insert_ast->end;
|
||||||
adjustQueryEnd(
|
adjustQueryEnd(
|
||||||
this_query_end, all_queries_end,
|
this_query_end, all_queries_end,
|
||||||
static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth),
|
static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth),
|
||||||
static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks));
|
static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Report error.
|
// Report error.
|
||||||
@ -2541,10 +2550,10 @@ void ClientBase::runInteractive()
|
|||||||
if (load_suggestions)
|
if (load_suggestions)
|
||||||
{
|
{
|
||||||
/// Load suggestion data from the server.
|
/// Load suggestion data from the server.
|
||||||
if (global_context->getApplicationType() == Context::ApplicationType::CLIENT)
|
if (client_context->getApplicationType() == Context::ApplicationType::CLIENT)
|
||||||
suggest->load<Connection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
suggest->load<Connection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||||
else if (global_context->getApplicationType() == Context::ApplicationType::LOCAL)
|
else if (client_context->getApplicationType() == Context::ApplicationType::LOCAL)
|
||||||
suggest->load<LocalConnection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
suggest->load<LocalConnection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (home_path.empty())
|
if (home_path.empty())
|
||||||
@ -2682,7 +2691,7 @@ void ClientBase::runInteractive()
|
|||||||
{
|
{
|
||||||
// If a separate connection loading suggestions failed to open a new session,
|
// If a separate connection loading suggestions failed to open a new session,
|
||||||
// use the main session to receive them.
|
// use the main session to receive them.
|
||||||
suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), global_context->getClientInfo());
|
suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), client_context->getClientInfo());
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -2731,10 +2740,10 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)
|
|||||||
|
|
||||||
if (!getClientConfiguration().has("log_comment"))
|
if (!getClientConfiguration().has("log_comment"))
|
||||||
{
|
{
|
||||||
Settings settings = global_context->getSettings();
|
Settings settings = client_context->getSettings();
|
||||||
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
|
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
|
||||||
settings.log_comment = fs::absolute(fs::path(file_name));
|
settings.log_comment = fs::absolute(fs::path(file_name));
|
||||||
global_context->setSettings(settings);
|
client_context->setSettings(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
return executeMultiQuery(queries_from_file);
|
return executeMultiQuery(queries_from_file);
|
||||||
|
@ -206,6 +206,9 @@ protected:
|
|||||||
/// Adjust some settings after command line options and config had been processed.
|
/// Adjust some settings after command line options and config had been processed.
|
||||||
void adjustSettings();
|
void adjustSettings();
|
||||||
|
|
||||||
|
/// Initializes the client context.
|
||||||
|
void initClientContext();
|
||||||
|
|
||||||
void setDefaultFormatsAndCompressionFromConfiguration();
|
void setDefaultFormatsAndCompressionFromConfiguration();
|
||||||
|
|
||||||
void initTTYBuffer(ProgressOption progress);
|
void initTTYBuffer(ProgressOption progress);
|
||||||
@ -215,6 +218,9 @@ protected:
|
|||||||
SharedContextHolder shared_context;
|
SharedContextHolder shared_context;
|
||||||
ContextMutablePtr global_context;
|
ContextMutablePtr global_context;
|
||||||
|
|
||||||
|
/// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server.
|
||||||
|
ContextMutablePtr client_context;
|
||||||
|
|
||||||
LoggerPtr fatal_log;
|
LoggerPtr fatal_log;
|
||||||
Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
|
Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
|
||||||
Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;
|
Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;
|
||||||
|
@ -548,7 +548,7 @@ public:
|
|||||||
virtual bool isExpired() const = 0;
|
virtual bool isExpired() const = 0;
|
||||||
|
|
||||||
/// Get the current connected node idx.
|
/// Get the current connected node idx.
|
||||||
virtual Int8 getConnectedNodeIdx() const = 0;
|
virtual std::optional<int8_t> getConnectedNodeIdx() const = 0;
|
||||||
|
|
||||||
/// Get the current connected host and port.
|
/// Get the current connected host and port.
|
||||||
virtual String getConnectedHostPort() const = 0;
|
virtual String getConnectedHostPort() const = 0;
|
||||||
|
@ -39,7 +39,7 @@ public:
|
|||||||
~TestKeeper() override;
|
~TestKeeper() override;
|
||||||
|
|
||||||
bool isExpired() const override { return expired; }
|
bool isExpired() const override { return expired; }
|
||||||
Int8 getConnectedNodeIdx() const override { return 0; }
|
std::optional<int8_t> getConnectedNodeIdx() const override { return 0; }
|
||||||
String getConnectedHostPort() const override { return "TestKeeper:0000"; }
|
String getConnectedHostPort() const override { return "TestKeeper:0000"; }
|
||||||
int32_t getConnectionXid() const override { return 0; }
|
int32_t getConnectionXid() const override { return 0; }
|
||||||
int64_t getSessionID() const override { return 0; }
|
int64_t getSessionID() const override { return 0; }
|
||||||
|
@ -128,16 +128,15 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
|
|||||||
ShuffleHosts shuffled_hosts = shuffleHosts();
|
ShuffleHosts shuffled_hosts = shuffleHosts();
|
||||||
|
|
||||||
impl = std::make_unique<Coordination::ZooKeeper>(shuffled_hosts, args, zk_log);
|
impl = std::make_unique<Coordination::ZooKeeper>(shuffled_hosts, args, zk_log);
|
||||||
Int8 node_idx = impl->getConnectedNodeIdx();
|
auto node_idx = impl->getConnectedNodeIdx();
|
||||||
|
|
||||||
if (args.chroot.empty())
|
if (args.chroot.empty())
|
||||||
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
|
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
|
||||||
else
|
else
|
||||||
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
|
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
|
||||||
|
|
||||||
|
|
||||||
/// If the balancing strategy has an optimal node then it will be the first in the list
|
/// If the balancing strategy has an optimal node then it will be the first in the list
|
||||||
bool connected_to_suboptimal_node = node_idx != shuffled_hosts[0].original_index;
|
bool connected_to_suboptimal_node = node_idx && static_cast<UInt8>(*node_idx) != shuffled_hosts[0].original_index;
|
||||||
bool respect_az = args.prefer_local_availability_zone && !args.client_availability_zone.empty();
|
bool respect_az = args.prefer_local_availability_zone && !args.client_availability_zone.empty();
|
||||||
bool may_benefit_from_reconnecting = respect_az || args.get_priority_load_balancing.hasOptimalNode();
|
bool may_benefit_from_reconnecting = respect_az || args.get_priority_load_balancing.hasOptimalNode();
|
||||||
if (connected_to_suboptimal_node && may_benefit_from_reconnecting)
|
if (connected_to_suboptimal_node && may_benefit_from_reconnecting)
|
||||||
@ -145,7 +144,7 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
|
|||||||
auto reconnect_timeout_sec = getSecondsUntilReconnect(args);
|
auto reconnect_timeout_sec = getSecondsUntilReconnect(args);
|
||||||
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
|
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
|
||||||
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
|
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
|
||||||
impl->getConnectedHostPort(), node_idx, reconnect_timeout_sec);
|
impl->getConnectedHostPort(), *node_idx, reconnect_timeout_sec);
|
||||||
|
|
||||||
auto reconnect_task_holder = DB::Context::getGlobalContextInstance()->getSchedulePool().createTask("ZKReconnect", [this, optimal_host = shuffled_hosts[0]]()
|
auto reconnect_task_holder = DB::Context::getGlobalContextInstance()->getSchedulePool().createTask("ZKReconnect", [this, optimal_host = shuffled_hosts[0]]()
|
||||||
{
|
{
|
||||||
@ -154,13 +153,15 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
|
|||||||
LOG_DEBUG(log, "Trying to connect to a more optimal node {}", optimal_host.host);
|
LOG_DEBUG(log, "Trying to connect to a more optimal node {}", optimal_host.host);
|
||||||
ShuffleHosts node{optimal_host};
|
ShuffleHosts node{optimal_host};
|
||||||
std::unique_ptr<Coordination::IKeeper> new_impl = std::make_unique<Coordination::ZooKeeper>(node, args, zk_log);
|
std::unique_ptr<Coordination::IKeeper> new_impl = std::make_unique<Coordination::ZooKeeper>(node, args, zk_log);
|
||||||
Int8 new_node_idx = new_impl->getConnectedNodeIdx();
|
|
||||||
|
auto new_node_idx = new_impl->getConnectedNodeIdx();
|
||||||
|
chassert(new_node_idx.has_value());
|
||||||
|
|
||||||
/// Maybe the node was unavailable when getting AZs first time, update just in case
|
/// Maybe the node was unavailable when getting AZs first time, update just in case
|
||||||
if (args.availability_zone_autodetect && availability_zones[new_node_idx].empty())
|
if (args.availability_zone_autodetect && availability_zones[*new_node_idx].empty())
|
||||||
{
|
{
|
||||||
availability_zones[new_node_idx] = new_impl->tryGetAvailabilityZone();
|
availability_zones[*new_node_idx] = new_impl->tryGetAvailabilityZone();
|
||||||
LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[new_node_idx]);
|
LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[*new_node_idx]);
|
||||||
}
|
}
|
||||||
|
|
||||||
optimal_impl = std::move(new_impl);
|
optimal_impl = std::move(new_impl);
|
||||||
@ -1525,7 +1526,7 @@ void ZooKeeper::setServerCompletelyStarted()
|
|||||||
zk->setServerCompletelyStarted();
|
zk->setServerCompletelyStarted();
|
||||||
}
|
}
|
||||||
|
|
||||||
Int8 ZooKeeper::getConnectedHostIdx() const
|
std::optional<int8_t> ZooKeeper::getConnectedHostIdx() const
|
||||||
{
|
{
|
||||||
return impl->getConnectedNodeIdx();
|
return impl->getConnectedNodeIdx();
|
||||||
}
|
}
|
||||||
@ -1544,10 +1545,10 @@ String ZooKeeper::getConnectedHostAvailabilityZone() const
|
|||||||
{
|
{
|
||||||
if (args.implementation != "zookeeper" || !impl)
|
if (args.implementation != "zookeeper" || !impl)
|
||||||
return "";
|
return "";
|
||||||
Int8 idx = impl->getConnectedNodeIdx();
|
std::optional<int8_t> idx = impl->getConnectedNodeIdx();
|
||||||
if (idx < 0)
|
if (!idx)
|
||||||
return ""; /// session expired
|
return ""; /// session expired
|
||||||
return availability_zones.at(idx);
|
return availability_zones.at(*idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
|
size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
|
||||||
|
@ -620,7 +620,7 @@ public:
|
|||||||
|
|
||||||
void setServerCompletelyStarted();
|
void setServerCompletelyStarted();
|
||||||
|
|
||||||
Int8 getConnectedHostIdx() const;
|
std::optional<int8_t> getConnectedHostIdx() const;
|
||||||
String getConnectedHostPort() const;
|
String getConnectedHostPort() const;
|
||||||
int32_t getConnectionXid() const;
|
int32_t getConnectionXid() const;
|
||||||
|
|
||||||
|
@ -536,7 +536,7 @@ void ZooKeeper::connect(
|
|||||||
compressed_out.emplace(*out, CompressionCodecFactory::instance().get("LZ4", {}));
|
compressed_out.emplace(*out, CompressionCodecFactory::instance().get("LZ4", {}));
|
||||||
}
|
}
|
||||||
|
|
||||||
original_index = static_cast<Int8>(node.original_index);
|
original_index.store(node.original_index);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
@ -1531,6 +1531,30 @@ void ZooKeeper::close()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
std::optional<int8_t> ZooKeeper::getConnectedNodeIdx() const
|
||||||
|
{
|
||||||
|
int8_t res = original_index.load();
|
||||||
|
if (res == -1)
|
||||||
|
return std::nullopt;
|
||||||
|
else
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
String ZooKeeper::getConnectedHostPort() const
|
||||||
|
{
|
||||||
|
auto idx = getConnectedNodeIdx();
|
||||||
|
if (idx)
|
||||||
|
return args.hosts[*idx];
|
||||||
|
else
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ZooKeeper::getConnectionXid() const
|
||||||
|
{
|
||||||
|
return next_xid.load();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||||
{
|
{
|
||||||
/// logOperationIfNeeded(...) uses zk_log and can be called from different threads, so we have to use atomic shared_ptr
|
/// logOperationIfNeeded(...) uses zk_log and can be called from different threads, so we have to use atomic shared_ptr
|
||||||
|
@ -114,13 +114,12 @@ public:
|
|||||||
|
|
||||||
~ZooKeeper() override;
|
~ZooKeeper() override;
|
||||||
|
|
||||||
|
|
||||||
/// If expired, you can only destroy the object. All other methods will throw exception.
|
/// If expired, you can only destroy the object. All other methods will throw exception.
|
||||||
bool isExpired() const override { return requests_queue.isFinished(); }
|
bool isExpired() const override { return requests_queue.isFinished(); }
|
||||||
|
|
||||||
Int8 getConnectedNodeIdx() const override { return original_index; }
|
std::optional<int8_t> getConnectedNodeIdx() const override;
|
||||||
String getConnectedHostPort() const override { return (original_index == -1) ? "" : args.hosts[original_index]; }
|
String getConnectedHostPort() const override;
|
||||||
int32_t getConnectionXid() const override { return next_xid.load(); }
|
int32_t getConnectionXid() const override;
|
||||||
|
|
||||||
String tryGetAvailabilityZone() override;
|
String tryGetAvailabilityZone() override;
|
||||||
|
|
||||||
@ -219,7 +218,7 @@ private:
|
|||||||
ACLs default_acls;
|
ACLs default_acls;
|
||||||
|
|
||||||
zkutil::ZooKeeperArgs args;
|
zkutil::ZooKeeperArgs args;
|
||||||
Int8 original_index = -1;
|
std::atomic<int8_t> original_index{-1};
|
||||||
|
|
||||||
/// Fault injection
|
/// Fault injection
|
||||||
void maybeInjectSendFault();
|
void maybeInjectSendFault();
|
||||||
|
@ -186,7 +186,7 @@ class IColumn;
|
|||||||
M(Bool, allow_suspicious_ttl_expressions, false, "Reject TTL expressions that don't depend on any of table's columns. It indicates a user error most of the time.", 0) \
|
M(Bool, allow_suspicious_ttl_expressions, false, "Reject TTL expressions that don't depend on any of table's columns. It indicates a user error most of the time.", 0) \
|
||||||
M(Bool, allow_suspicious_variant_types, false, "In CREATE TABLE statement allows specifying Variant type with similar variant types (for example, with different numeric or date types). Enabling this setting may introduce some ambiguity when working with values with similar types.", 0) \
|
M(Bool, allow_suspicious_variant_types, false, "In CREATE TABLE statement allows specifying Variant type with similar variant types (for example, with different numeric or date types). Enabling this setting may introduce some ambiguity when working with values with similar types.", 0) \
|
||||||
M(Bool, allow_suspicious_primary_key, false, "Forbid suspicious PRIMARY KEY/ORDER BY for MergeTree (i.e. SimpleAggregateFunction)", 0) \
|
M(Bool, allow_suspicious_primary_key, false, "Forbid suspicious PRIMARY KEY/ORDER BY for MergeTree (i.e. SimpleAggregateFunction)", 0) \
|
||||||
M(Bool, compile_expressions, false, "Compile some scalar functions and operators to native code.", 0) \
|
M(Bool, compile_expressions, true, "Compile some scalar functions and operators to native code.", 0) \
|
||||||
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
|
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
|
||||||
M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \
|
M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \
|
||||||
M(UInt64, min_count_to_compile_aggregate_expression, 3, "The number of identical aggregate expressions before they are JIT-compiled", 0) \
|
M(UInt64, min_count_to_compile_aggregate_expression, 3, "The number of identical aggregate expressions before they are JIT-compiled", 0) \
|
||||||
|
@ -57,6 +57,7 @@ String ClickHouseVersion::toString() const
|
|||||||
/// Note: please check if the key already exists to prevent duplicate entries.
|
/// Note: please check if the key already exists to prevent duplicate entries.
|
||||||
static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
|
static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
|
||||||
{
|
{
|
||||||
|
{"24.8", {{"compile_expressions", false, true, "We believe that the LLVM infrastructure behind the JIT compiler is stable enough to enable this setting by default."}}},
|
||||||
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
|
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
|
||||||
{"output_format_binary_encode_types_in_binary_format", false, false, "Added new setting to allow to write type names in binary format in RowBinaryWithNamesAndTypes output format"},
|
{"output_format_binary_encode_types_in_binary_format", false, false, "Added new setting to allow to write type names in binary format in RowBinaryWithNamesAndTypes output format"},
|
||||||
{"input_format_binary_decode_types_in_binary_format", false, false, "Added new setting to allow to read type names in binary format in RowBinaryWithNamesAndTypes input format"},
|
{"input_format_binary_decode_types_in_binary_format", false, false, "Added new setting to allow to read type names in binary format in RowBinaryWithNamesAndTypes input format"},
|
||||||
@ -80,7 +81,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
|||||||
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
|
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
|
||||||
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
|
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
|
||||||
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
|
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
|
||||||
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
|
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
|
||||||
}},
|
}},
|
||||||
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
|
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
|
||||||
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
|
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
|
||||||
|
@ -34,7 +34,7 @@ public:
|
|||||||
|
|
||||||
String getFileName() const override { return impl->getFileName(); }
|
String getFileName() const override { return impl->getFileName(); }
|
||||||
|
|
||||||
size_t getFileSize() override { return impl->getFileSize(); }
|
std::optional<size_t> tryGetFileSize() override { return impl->tryGetFileSize(); }
|
||||||
|
|
||||||
String getInfoForLog() override { return impl->getInfoForLog(); }
|
String getInfoForLog() override { return impl->getInfoForLog(); }
|
||||||
|
|
||||||
|
@ -253,16 +253,15 @@ void ReadBufferFromAzureBlobStorage::initialize()
|
|||||||
initialized = true;
|
initialized = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ReadBufferFromAzureBlobStorage::getFileSize()
|
std::optional<size_t> ReadBufferFromAzureBlobStorage::tryGetFileSize()
|
||||||
{
|
{
|
||||||
if (!blob_client)
|
if (!blob_client)
|
||||||
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
|
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
|
||||||
|
|
||||||
if (file_size.has_value())
|
if (!file_size)
|
||||||
return *file_size;
|
file_size = blob_client->GetProperties().Value.BlobSize;
|
||||||
|
|
||||||
file_size = blob_client->GetProperties().Value.BlobSize;
|
return file_size;
|
||||||
return *file_size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & /*progress_callback*/) const
|
size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & /*progress_callback*/) const
|
||||||
|
@ -42,7 +42,7 @@ public:
|
|||||||
|
|
||||||
bool supportsRightBoundedReads() const override { return true; }
|
bool supportsRightBoundedReads() const override { return true; }
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
size_t readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback) const override;
|
size_t readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback) const override;
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ public:
|
|||||||
|
|
||||||
void setReadUntilEnd() override { setReadUntilPosition(getFileSize()); }
|
void setReadUntilEnd() override { setReadUntilPosition(getFileSize()); }
|
||||||
|
|
||||||
size_t getFileSize() override { return getTotalSize(blobs_to_read); }
|
std::optional<size_t> tryGetFileSize() override { return getTotalSize(blobs_to_read); }
|
||||||
|
|
||||||
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
|
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
|
||||||
|
|
||||||
|
@ -321,7 +321,7 @@ public:
|
|||||||
off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }
|
off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }
|
||||||
String getFileName() const override { return handle.getFileName(); }
|
String getFileName() const override { return handle.getFileName(); }
|
||||||
|
|
||||||
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
||||||
|
|
||||||
Handle releaseHandle() && { return std::move(handle); }
|
Handle releaseHandle() && { return std::move(handle); }
|
||||||
|
|
||||||
|
@ -317,7 +317,7 @@ public:
|
|||||||
|
|
||||||
String getFileName() const override { return handle.getFileName(); }
|
String getFileName() const override { return handle.getFileName(); }
|
||||||
|
|
||||||
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
||||||
|
|
||||||
/// Releases owned handle to pass it to an enumerator.
|
/// Releases owned handle to pass it to an enumerator.
|
||||||
HandleHolder releaseHandle() &&
|
HandleHolder releaseHandle() &&
|
||||||
|
@ -244,7 +244,7 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
|
|||||||
file_offset_of_buffer_end = 0;
|
file_offset_of_buffer_end = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t AsynchronousReadBufferFromFileDescriptor::getFileSize()
|
std::optional<size_t> AsynchronousReadBufferFromFileDescriptor::tryGetFileSize()
|
||||||
{
|
{
|
||||||
return getSizeFromFileDescriptor(fd, getFileName());
|
return getSizeFromFileDescriptor(fd, getFileName());
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,7 @@ public:
|
|||||||
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
|
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
|
||||||
void rewind();
|
void rewind();
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
|
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ public:
|
|||||||
off_t seek(off_t off, int whence) override;
|
off_t seek(off_t off, int whence) override;
|
||||||
off_t getPosition() override;
|
off_t getPosition() override;
|
||||||
|
|
||||||
size_t getFileSize() override { return total_size; }
|
std::optional<size_t> tryGetFileSize() override { return total_size; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool nextImpl() override;
|
bool nextImpl() override;
|
||||||
|
@ -87,7 +87,7 @@ off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence)
|
|||||||
return new_pos;
|
return new_pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t MMapReadBufferFromFileDescriptor::getFileSize()
|
std::optional<size_t> MMapReadBufferFromFileDescriptor::tryGetFileSize()
|
||||||
{
|
{
|
||||||
return getSizeFromFileDescriptor(getFD(), getFileName());
|
return getSizeFromFileDescriptor(getFD(), getFileName());
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ public:
|
|||||||
|
|
||||||
int getFD() const;
|
int getFD() const;
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) const override;
|
size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) const override;
|
||||||
bool supportsReadAt() override { return true; }
|
bool supportsReadAt() override { return true; }
|
||||||
|
@ -152,7 +152,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
|
|||||||
return offset;
|
return offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ParallelReadBuffer::getFileSize()
|
std::optional<size_t> ParallelReadBuffer::tryGetFileSize()
|
||||||
{
|
{
|
||||||
return file_size;
|
return file_size;
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ public:
|
|||||||
~ParallelReadBuffer() override { finishAndWait(); }
|
~ParallelReadBuffer() override { finishAndWait(); }
|
||||||
|
|
||||||
off_t seek(off_t off, int whence) override;
|
off_t seek(off_t off, int whence) override;
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
off_t getPosition() override;
|
off_t getPosition() override;
|
||||||
|
|
||||||
const SeekableReadBuffer & getReadBuffer() const { return input; }
|
const SeekableReadBuffer & getReadBuffer() const { return input; }
|
||||||
|
@ -19,7 +19,8 @@ private:
|
|||||||
std::string getFileName() const override { return "<empty>"; }
|
std::string getFileName() const override { return "<empty>"; }
|
||||||
off_t seek(off_t /*off*/, int /*whence*/) override { return 0; }
|
off_t seek(off_t /*off*/, int /*whence*/) override { return 0; }
|
||||||
off_t getPosition() override { return 0; }
|
off_t getPosition() override { return 0; }
|
||||||
size_t getFileSize() override { return 0; }
|
std::optional<size_t> tryGetFileSize() override { return 0; }
|
||||||
|
size_t getFileOffsetOfBufferEnd() const override { return 0; }
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ public:
|
|||||||
|
|
||||||
void setReadUntilEnd() override { in->setReadUntilEnd(); }
|
void setReadUntilEnd() override { in->setReadUntilEnd(); }
|
||||||
|
|
||||||
size_t getFileSize() override { return in->getFileSize(); }
|
std::optional<size_t> tryGetFileSize() override { return in->tryGetFileSize(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool nextImpl() override;
|
bool nextImpl() override;
|
||||||
|
@ -5,11 +5,6 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int UNKNOWN_FILE_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
|
ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -26,11 +21,9 @@ ReadBufferFromFileBase::ReadBufferFromFileBase(
|
|||||||
|
|
||||||
ReadBufferFromFileBase::~ReadBufferFromFileBase() = default;
|
ReadBufferFromFileBase::~ReadBufferFromFileBase() = default;
|
||||||
|
|
||||||
size_t ReadBufferFromFileBase::getFileSize()
|
std::optional<size_t> ReadBufferFromFileBase::tryGetFileSize()
|
||||||
{
|
{
|
||||||
if (file_size)
|
return file_size;
|
||||||
return *file_size;
|
|
||||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for read buffer");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReadBufferFromFileBase::setProgressCallback(ContextPtr context)
|
void ReadBufferFromFileBase::setProgressCallback(ContextPtr context)
|
||||||
|
@ -50,7 +50,7 @@ public:
|
|||||||
clock_type = clock_type_;
|
clock_type = clock_type_;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
void setProgressCallback(ContextPtr context);
|
void setProgressCallback(ContextPtr context);
|
||||||
|
|
||||||
|
@ -52,9 +52,9 @@ bool ReadBufferFromFileDecorator::nextImpl()
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ReadBufferFromFileDecorator::getFileSize()
|
std::optional<size_t> ReadBufferFromFileDecorator::tryGetFileSize()
|
||||||
{
|
{
|
||||||
return getFileSizeFromReadBuffer(*impl);
|
return tryGetFileSizeFromReadBuffer(*impl);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ public:
|
|||||||
|
|
||||||
ReadBuffer & getWrappedReadBuffer() { return *impl; }
|
ReadBuffer & getWrappedReadBuffer() { return *impl; }
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
std::unique_ptr<SeekableReadBuffer> impl;
|
std::unique_ptr<SeekableReadBuffer> impl;
|
||||||
|
@ -253,7 +253,7 @@ void ReadBufferFromFileDescriptor::rewind()
|
|||||||
file_offset_of_buffer_end = 0;
|
file_offset_of_buffer_end = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ReadBufferFromFileDescriptor::getFileSize()
|
std::optional<size_t> ReadBufferFromFileDescriptor::tryGetFileSize()
|
||||||
{
|
{
|
||||||
return getSizeFromFileDescriptor(fd, getFileName());
|
return getSizeFromFileDescriptor(fd, getFileName());
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,7 @@ public:
|
|||||||
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
|
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
|
||||||
void rewind();
|
void rewind();
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
bool checkIfActuallySeekable() override;
|
bool checkIfActuallySeekable() override;
|
||||||
|
|
||||||
|
@ -311,15 +311,15 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
|
|||||||
return offset;
|
return offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ReadBufferFromS3::getFileSize()
|
std::optional<size_t> ReadBufferFromS3::tryGetFileSize()
|
||||||
{
|
{
|
||||||
if (file_size)
|
if (file_size)
|
||||||
return *file_size;
|
return file_size;
|
||||||
|
|
||||||
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id);
|
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id);
|
||||||
|
|
||||||
file_size = object_size;
|
file_size = object_size;
|
||||||
return *file_size;
|
return file_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
off_t ReadBufferFromS3::getPosition()
|
off_t ReadBufferFromS3::getPosition()
|
||||||
|
@ -63,7 +63,7 @@ public:
|
|||||||
|
|
||||||
off_t getPosition() override;
|
off_t getPosition() override;
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
void setReadUntilPosition(size_t position) override;
|
void setReadUntilPosition(size_t position) override;
|
||||||
void setReadUntilEnd() override;
|
void setReadUntilEnd() override;
|
||||||
|
@ -72,7 +72,6 @@ namespace ErrorCodes
|
|||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||||
extern const int SEEK_POSITION_OUT_OF_BOUND;
|
extern const int SEEK_POSITION_OUT_OF_BOUND;
|
||||||
extern const int UNKNOWN_FILE_SIZE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::CallResult::transformToReadBuffer(size_t buf_size) &&
|
std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::CallResult::transformToReadBuffer(size_t buf_size) &&
|
||||||
@ -121,15 +120,33 @@ void ReadWriteBufferFromHTTP::prepareRequest(Poco::Net::HTTPRequest & request, s
|
|||||||
credentials.authenticate(request);
|
credentials.authenticate(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ReadWriteBufferFromHTTP::getFileSize()
|
std::optional<size_t> ReadWriteBufferFromHTTP::tryGetFileSize()
|
||||||
{
|
{
|
||||||
if (!file_info)
|
if (!file_info)
|
||||||
file_info = getFileInfo();
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
file_info = getFileInfo();
|
||||||
|
}
|
||||||
|
catch (const HTTPException &)
|
||||||
|
{
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
catch (const NetException &)
|
||||||
|
{
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
catch (const Poco::Net::NetException &)
|
||||||
|
{
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
catch (const Poco::IOException &)
|
||||||
|
{
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (file_info->file_size)
|
return file_info->file_size;
|
||||||
return *file_info->file_size;
|
|
||||||
|
|
||||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", initial_uri.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ReadWriteBufferFromHTTP::supportsReadAt()
|
bool ReadWriteBufferFromHTTP::supportsReadAt()
|
||||||
@ -311,12 +328,12 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
|
|||||||
error_message = e.displayText();
|
error_message = e.displayText();
|
||||||
exception = std::current_exception();
|
exception = std::current_exception();
|
||||||
}
|
}
|
||||||
catch (DB::NetException & e)
|
catch (NetException & e)
|
||||||
{
|
{
|
||||||
error_message = e.displayText();
|
error_message = e.displayText();
|
||||||
exception = std::current_exception();
|
exception = std::current_exception();
|
||||||
}
|
}
|
||||||
catch (DB::HTTPException & e)
|
catch (HTTPException & e)
|
||||||
{
|
{
|
||||||
if (!isRetriableError(e.getHTTPStatus()))
|
if (!isRetriableError(e.getHTTPStatus()))
|
||||||
is_retriable = false;
|
is_retriable = false;
|
||||||
@ -324,7 +341,7 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
|
|||||||
error_message = e.displayText();
|
error_message = e.displayText();
|
||||||
exception = std::current_exception();
|
exception = std::current_exception();
|
||||||
}
|
}
|
||||||
catch (DB::Exception & e)
|
catch (Exception & e)
|
||||||
{
|
{
|
||||||
is_retriable = false;
|
is_retriable = false;
|
||||||
|
|
||||||
@ -683,7 +700,19 @@ std::optional<time_t> ReadWriteBufferFromHTTP::tryGetLastModificationTime()
|
|||||||
{
|
{
|
||||||
file_info = getFileInfo();
|
file_info = getFileInfo();
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (const HTTPException &)
|
||||||
|
{
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
catch (const NetException &)
|
||||||
|
{
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
catch (const Poco::Net::NetException &)
|
||||||
|
{
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
catch (const Poco::IOException &)
|
||||||
{
|
{
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
@ -704,7 +733,7 @@ ReadWriteBufferFromHTTP::HTTPFileInfo ReadWriteBufferFromHTTP::getFileInfo()
|
|||||||
{
|
{
|
||||||
getHeadResponse(response);
|
getHeadResponse(response);
|
||||||
}
|
}
|
||||||
catch (HTTPException & e)
|
catch (const HTTPException & e)
|
||||||
{
|
{
|
||||||
/// Maybe the web server doesn't support HEAD requests.
|
/// Maybe the web server doesn't support HEAD requests.
|
||||||
/// E.g. webhdfs reports status 400.
|
/// E.g. webhdfs reports status 400.
|
||||||
|
@ -118,7 +118,7 @@ private:
|
|||||||
|
|
||||||
std::unique_ptr<ReadBuffer> initialize();
|
std::unique_ptr<ReadBuffer> initialize();
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
bool supportsReadAt() override;
|
bool supportsReadAt() override;
|
||||||
|
|
||||||
|
@ -13,41 +13,47 @@ namespace ErrorCodes
|
|||||||
extern const int UNKNOWN_FILE_SIZE;
|
extern const int UNKNOWN_FILE_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
size_t WithFileSize::getFileSize()
|
||||||
static size_t getFileSize(T & in)
|
|
||||||
{
|
{
|
||||||
if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
|
if (auto maybe_size = tryGetFileSize())
|
||||||
{
|
return *maybe_size;
|
||||||
return with_file_size->getFileSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
|
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t getFileSizeFromReadBuffer(ReadBuffer & in)
|
template <typename T>
|
||||||
|
static std::optional<size_t> tryGetFileSize(T & in)
|
||||||
{
|
{
|
||||||
if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
|
if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
|
||||||
{
|
return with_file_size->tryGetFileSize();
|
||||||
return getFileSize(delegate->getWrappedReadBuffer());
|
|
||||||
}
|
|
||||||
else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
|
|
||||||
{
|
|
||||||
return getFileSize(compressed->getWrappedReadBuffer());
|
|
||||||
}
|
|
||||||
|
|
||||||
return getFileSize(in);
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
static size_t getFileSize(T & in)
|
||||||
|
{
|
||||||
|
if (auto maybe_size = tryGetFileSize(in))
|
||||||
|
return *maybe_size;
|
||||||
|
|
||||||
|
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in)
|
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in)
|
||||||
{
|
{
|
||||||
try
|
if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
|
||||||
{
|
return tryGetFileSize(delegate->getWrappedReadBuffer());
|
||||||
return getFileSizeFromReadBuffer(in);
|
else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
|
||||||
}
|
return tryGetFileSize(compressed->getWrappedReadBuffer());
|
||||||
catch (...)
|
return tryGetFileSize(in);
|
||||||
{
|
}
|
||||||
return std::nullopt;
|
|
||||||
}
|
size_t getFileSizeFromReadBuffer(ReadBuffer & in)
|
||||||
|
{
|
||||||
|
if (auto maybe_size = tryGetFileSizeFromReadBuffer(in))
|
||||||
|
return *maybe_size;
|
||||||
|
|
||||||
|
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isBufferWithFileSize(const ReadBuffer & in)
|
bool isBufferWithFileSize(const ReadBuffer & in)
|
||||||
|
@ -10,15 +10,16 @@ class ReadBuffer;
|
|||||||
class WithFileSize
|
class WithFileSize
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
virtual size_t getFileSize() = 0;
|
/// Returns nullopt if couldn't find out file size;
|
||||||
|
virtual std::optional<size_t> tryGetFileSize() = 0;
|
||||||
virtual ~WithFileSize() = default;
|
virtual ~WithFileSize() = default;
|
||||||
|
|
||||||
|
size_t getFileSize();
|
||||||
};
|
};
|
||||||
|
|
||||||
bool isBufferWithFileSize(const ReadBuffer & in);
|
bool isBufferWithFileSize(const ReadBuffer & in);
|
||||||
|
|
||||||
size_t getFileSizeFromReadBuffer(ReadBuffer & in);
|
size_t getFileSizeFromReadBuffer(ReadBuffer & in);
|
||||||
|
|
||||||
/// Return nullopt if couldn't find out file size;
|
|
||||||
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in);
|
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in);
|
||||||
|
|
||||||
size_t getDataOffsetMaybeCompressed(const ReadBuffer & in);
|
size_t getDataOffsetMaybeCompressed(const ReadBuffer & in);
|
||||||
|
@ -13,10 +13,6 @@
|
|||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int LOGICAL_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
@ -237,16 +233,8 @@ void SubstituteColumnOptimizer::perform()
|
|||||||
|
|
||||||
const auto & compare_graph = metadata_snapshot->getConstraints().getGraph();
|
const auto & compare_graph = metadata_snapshot->getConstraints().getGraph();
|
||||||
|
|
||||||
// Fill aliases
|
if (compare_graph.getNumOfComponents() == 0)
|
||||||
if (select_query->select())
|
return;
|
||||||
{
|
|
||||||
auto * list = select_query->refSelect()->as<ASTExpressionList>();
|
|
||||||
if (!list)
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "List of selected columns must be ASTExpressionList");
|
|
||||||
|
|
||||||
for (ASTPtr & ast : list->children)
|
|
||||||
ast->setAlias(ast->getAliasOrColumnName());
|
|
||||||
}
|
|
||||||
|
|
||||||
auto run_for_all = [&](const auto func)
|
auto run_for_all = [&](const auto func)
|
||||||
{
|
{
|
||||||
|
@ -15,7 +15,7 @@ struct StorageInMemoryMetadata;
|
|||||||
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
|
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
|
||||||
|
|
||||||
/// Optimizer that tries to replace columns to equal columns (according to constraints)
|
/// Optimizer that tries to replace columns to equal columns (according to constraints)
|
||||||
/// with lower size (according to compressed and uncomressed size).
|
/// with lower size (according to compressed and uncompressed sizes).
|
||||||
class SubstituteColumnOptimizer
|
class SubstituteColumnOptimizer
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
@ -66,7 +66,7 @@ public:
|
|||||||
/** Set the alias. */
|
/** Set the alias. */
|
||||||
virtual void setAlias(const String & /*to*/)
|
virtual void setAlias(const String & /*to*/)
|
||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {}", getColumnName());
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {} of {}", getColumnName(), getID());
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get the text that identifies this element. */
|
/** Get the text that identifies this element. */
|
||||||
|
@ -53,7 +53,7 @@ public:
|
|||||||
bool nextImpl() override;
|
bool nextImpl() override;
|
||||||
off_t seek(off_t off, int whence) override;
|
off_t seek(off_t off, int whence) override;
|
||||||
off_t getPosition() override;
|
off_t getPosition() override;
|
||||||
size_t getFileSize() override { return remote_file_size; }
|
std::optional<size_t> tryGetFileSize() override { return remote_file_size; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<LocalFileHolder> local_file_holder;
|
std::unique_ptr<LocalFileHolder> local_file_holder;
|
||||||
|
@ -91,9 +91,9 @@ void AsynchronousReadBufferFromHDFS::prefetch(Priority priority)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
size_t AsynchronousReadBufferFromHDFS::getFileSize()
|
std::optional<size_t> AsynchronousReadBufferFromHDFS::tryGetFileSize()
|
||||||
{
|
{
|
||||||
return impl->getFileSize();
|
return impl->tryGetFileSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
String AsynchronousReadBufferFromHDFS::getFileName() const
|
String AsynchronousReadBufferFromHDFS::getFileName() const
|
||||||
|
@ -35,7 +35,7 @@ public:
|
|||||||
|
|
||||||
void prefetch(Priority priority) override;
|
void prefetch(Priority priority) override;
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
String getFileName() const override;
|
String getFileName() const override;
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ namespace ErrorCodes
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>
|
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileSize
|
||||||
{
|
{
|
||||||
String hdfs_uri;
|
String hdfs_uri;
|
||||||
String hdfs_file_path;
|
String hdfs_file_path;
|
||||||
@ -90,7 +90,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
|||||||
hdfsCloseFile(fs.get(), fin);
|
hdfsCloseFile(fs.get(), fin);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t getFileSize() const
|
std::optional<size_t> tryGetFileSize() override
|
||||||
{
|
{
|
||||||
return file_size;
|
return file_size;
|
||||||
}
|
}
|
||||||
@ -191,9 +191,9 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
|
|||||||
|
|
||||||
ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
|
ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
|
||||||
|
|
||||||
size_t ReadBufferFromHDFS::getFileSize()
|
std::optional<size_t> ReadBufferFromHDFS::tryGetFileSize()
|
||||||
{
|
{
|
||||||
return impl->getFileSize();
|
return impl->tryGetFileSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ReadBufferFromHDFS::nextImpl()
|
bool ReadBufferFromHDFS::nextImpl()
|
||||||
|
@ -40,7 +40,7 @@ public:
|
|||||||
|
|
||||||
off_t getPosition() override;
|
off_t getPosition() override;
|
||||||
|
|
||||||
size_t getFileSize() override;
|
std::optional<size_t> tryGetFileSize() override;
|
||||||
|
|
||||||
size_t getFileOffsetOfBufferEnd() const override;
|
size_t getFileOffsetOfBufferEnd() const override;
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <DataTypes/DataTypeString.h>
|
#include <DataTypes/DataTypeString.h>
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
#include <DataTypes/DataTypeDateTime.h>
|
#include <DataTypes/DataTypeDateTime.h>
|
||||||
|
#include <DataTypes/DataTypeNullable.h>
|
||||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||||
#include <Coordination/KeeperFeatureFlags.h>
|
#include <Coordination/KeeperFeatureFlags.h>
|
||||||
#include <Storages/System/StorageSystemZooKeeperConnection.h>
|
#include <Storages/System/StorageSystemZooKeeperConnection.h>
|
||||||
@ -27,7 +28,7 @@ ColumnsDescription StorageSystemZooKeeperConnection::getColumnsDescription()
|
|||||||
/* 0 */ {"name", std::make_shared<DataTypeString>(), "ZooKeeper cluster's name."},
|
/* 0 */ {"name", std::make_shared<DataTypeString>(), "ZooKeeper cluster's name."},
|
||||||
/* 1 */ {"host", std::make_shared<DataTypeString>(), "The hostname/IP of the ZooKeeper node that ClickHouse connected to."},
|
/* 1 */ {"host", std::make_shared<DataTypeString>(), "The hostname/IP of the ZooKeeper node that ClickHouse connected to."},
|
||||||
/* 2 */ {"port", std::make_shared<DataTypeUInt16>(), "The port of the ZooKeeper node that ClickHouse connected to."},
|
/* 2 */ {"port", std::make_shared<DataTypeUInt16>(), "The port of the ZooKeeper node that ClickHouse connected to."},
|
||||||
/* 3 */ {"index", std::make_shared<DataTypeUInt8>(), "The index of the ZooKeeper node that ClickHouse connected to. The index is from ZooKeeper config."},
|
/* 3 */ {"index", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The index of the ZooKeeper node that ClickHouse connected to. The index is from ZooKeeper config. If not connected, this column is NULL."},
|
||||||
/* 4 */ {"connected_time", std::make_shared<DataTypeDateTime>(), "When the connection was established."},
|
/* 4 */ {"connected_time", std::make_shared<DataTypeDateTime>(), "When the connection was established."},
|
||||||
/* 5 */ {"session_uptime_elapsed_seconds", std::make_shared<DataTypeUInt64>(), "Seconds elapsed since the connection was established."},
|
/* 5 */ {"session_uptime_elapsed_seconds", std::make_shared<DataTypeUInt64>(), "Seconds elapsed since the connection was established."},
|
||||||
/* 6 */ {"is_expired", std::make_shared<DataTypeUInt8>(), "Is the current connection expired."},
|
/* 6 */ {"is_expired", std::make_shared<DataTypeUInt8>(), "Is the current connection expired."},
|
||||||
@ -64,7 +65,7 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
|
|||||||
/// For read-only snapshot type functionality, it's acceptable even though 'getZooKeeper' may cause data inconsistency.
|
/// For read-only snapshot type functionality, it's acceptable even though 'getZooKeeper' may cause data inconsistency.
|
||||||
auto fill_data = [&](const String & name, const zkutil::ZooKeeperPtr zookeeper, MutableColumns & columns)
|
auto fill_data = [&](const String & name, const zkutil::ZooKeeperPtr zookeeper, MutableColumns & columns)
|
||||||
{
|
{
|
||||||
Int8 index = zookeeper->getConnectedHostIdx();
|
auto index = zookeeper->getConnectedHostIdx();
|
||||||
String host_port = zookeeper->getConnectedHostPort();
|
String host_port = zookeeper->getConnectedHostPort();
|
||||||
if (index != -1 && !host_port.empty())
|
if (index != -1 && !host_port.empty())
|
||||||
{
|
{
|
||||||
@ -78,7 +79,10 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
|
|||||||
columns[0]->insert(name);
|
columns[0]->insert(name);
|
||||||
columns[1]->insert(host);
|
columns[1]->insert(host);
|
||||||
columns[2]->insert(port);
|
columns[2]->insert(port);
|
||||||
columns[3]->insert(index);
|
if (index)
|
||||||
|
columns[3]->insert(*index);
|
||||||
|
else
|
||||||
|
columns[3]->insertDefault();
|
||||||
columns[4]->insert(connected_time);
|
columns[4]->insert(connected_time);
|
||||||
columns[5]->insert(uptime);
|
columns[5]->insert(uptime);
|
||||||
columns[6]->insert(zookeeper->expired());
|
columns[6]->insert(zookeeper->expired());
|
||||||
|
@ -779,7 +779,7 @@ class SettingsRandomizer:
|
|||||||
"filesystem_prefetch_step_bytes": lambda: random.choice(
|
"filesystem_prefetch_step_bytes": lambda: random.choice(
|
||||||
[0, "100Mi"]
|
[0, "100Mi"]
|
||||||
), # 0 means 'auto'
|
), # 0 means 'auto'
|
||||||
# "compile_expressions": lambda: random.randint(0, 1), - this setting has a bug: https://github.com/ClickHouse/ClickHouse/issues/51264
|
"compile_expressions": lambda: random.randint(0, 1),
|
||||||
"compile_aggregate_expressions": lambda: random.randint(0, 1),
|
"compile_aggregate_expressions": lambda: random.randint(0, 1),
|
||||||
"compile_sort_description": lambda: random.randint(0, 1),
|
"compile_sort_description": lambda: random.randint(0, 1),
|
||||||
"merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
|
"merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
|
||||||
|
@ -32,10 +32,10 @@
|
|||||||
1
|
1
|
||||||
1
|
1
|
||||||
0
|
0
|
||||||
SELECT count() AS `count()`
|
SELECT count()
|
||||||
FROM constraint_test_constants
|
FROM constraint_test_constants
|
||||||
WHERE (b > 100) OR (c > 100)
|
WHERE (b > 100) OR (c > 100)
|
||||||
SELECT count() AS `count()`
|
SELECT count()
|
||||||
FROM constraint_test_constants
|
FROM constraint_test_constants
|
||||||
WHERE c > 100
|
WHERE c > 100
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -53,7 +53,7 @@ QUERY id: 0
|
|||||||
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
|
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
|
||||||
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
|
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT count() AS `count()`
|
SELECT count()
|
||||||
FROM constraint_test_constants
|
FROM constraint_test_constants
|
||||||
WHERE c > 100
|
WHERE c > 100
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -71,7 +71,7 @@ QUERY id: 0
|
|||||||
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
|
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
|
||||||
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
|
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT count() AS `count()`
|
SELECT count()
|
||||||
FROM constraint_test_constants
|
FROM constraint_test_constants
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
PROJECTION COLUMNS
|
PROJECTION COLUMNS
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
SELECT
|
SELECT
|
||||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
(b AS `cityHash64(a)`) + 10,
|
||||||
(b AS b) + 3 AS `plus(b, 3)`
|
(b AS b) + 3
|
||||||
FROM column_swap_test_test
|
FROM column_swap_test_test
|
||||||
WHERE b = 1
|
WHERE b = 1
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -59,8 +59,8 @@ QUERY id: 0
|
|||||||
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
|
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT
|
SELECT
|
||||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
(b AS `cityHash64(a)`) + 10,
|
||||||
(b AS b) + 3 AS `plus(b, 3)`
|
(b AS b) + 3
|
||||||
FROM column_swap_test_test
|
FROM column_swap_test_test
|
||||||
WHERE b = 0
|
WHERE b = 0
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -89,8 +89,8 @@ QUERY id: 0
|
|||||||
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
|
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT
|
SELECT
|
||||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
(b AS `cityHash64(a)`) + 10,
|
||||||
(b AS b) + 3 AS `plus(b, 3)`
|
(b AS b) + 3
|
||||||
FROM column_swap_test_test
|
FROM column_swap_test_test
|
||||||
WHERE b = 0
|
WHERE b = 0
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -119,8 +119,8 @@ QUERY id: 0
|
|||||||
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
|
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT
|
SELECT
|
||||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
(b AS `cityHash64(a)`) + 10,
|
||||||
(b AS b) + 3 AS `plus(b, 3)`
|
(b AS b) + 3
|
||||||
FROM column_swap_test_test
|
FROM column_swap_test_test
|
||||||
WHERE b = 1
|
WHERE b = 1
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -148,7 +148,7 @@ QUERY id: 0
|
|||||||
COLUMN id: 13, column_name: b, result_type: UInt64, source_id: 5
|
COLUMN id: 13, column_name: b, result_type: UInt64, source_id: 5
|
||||||
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
|
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT (b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`
|
SELECT (b AS `cityHash64(a)`) + 10
|
||||||
FROM column_swap_test_test
|
FROM column_swap_test_test
|
||||||
WHERE b = 0
|
WHERE b = 0
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -171,8 +171,8 @@ QUERY id: 0
|
|||||||
CONSTANT id: 10, constant_value: UInt64_0, constant_value_type: UInt8
|
CONSTANT id: 10, constant_value: UInt64_0, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT
|
SELECT
|
||||||
(cityHash64(a) AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
(cityHash64(a) AS `cityHash64(a)`) + 10,
|
||||||
a AS a
|
a
|
||||||
FROM column_swap_test_test
|
FROM column_swap_test_test
|
||||||
WHERE cityHash64(a) = 0
|
WHERE cityHash64(a) = 0
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
@ -203,8 +203,8 @@ QUERY id: 0
|
|||||||
CONSTANT id: 15, constant_value: UInt64_0, constant_value_type: UInt8
|
CONSTANT id: 15, constant_value: UInt64_0, constant_value_type: UInt8
|
||||||
SETTINGS allow_experimental_analyzer=1
|
SETTINGS allow_experimental_analyzer=1
|
||||||
SELECT
|
SELECT
|
||||||
(cityHash64(a) AS b) + 10 AS `plus(b, 10)`,
|
(cityHash64(a) AS b) + 10,
|
||||||
a AS a
|
a
|
||||||
FROM column_swap_test_test
|
FROM column_swap_test_test
|
||||||
WHERE cityHash64(a) = 0
|
WHERE cityHash64(a) = 0
|
||||||
QUERY id: 0
|
QUERY id: 0
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
# Tags: no-parallel
|
||||||
|
# no-parallel: This test is not parallel because when we execute system-wide SYSTEM DROP REPLICA,
|
||||||
|
# other tests might shut down the storage in parallel and the test will fail.
|
||||||
|
|
||||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
# shellcheck source=../shell_config.sh
|
# shellcheck source=../shell_config.sh
|
||||||
. "$CURDIR"/../shell_config.sh
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
@ -6,8 +6,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
|||||||
|
|
||||||
function test
|
function test
|
||||||
{
|
{
|
||||||
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_binary_encode_types_in_binary_format=1 -q "select $1 as value format RowBinaryWithNamesAndTypes" | $CLICKHOUSE_LOCAL --input-format RowBinaryWithNamesAndTypes --input_format_binary_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_binary_encode_types_in_binary_format=1 -q "select $1 as value format RowBinaryWithNamesAndTypes" | $CLICKHOUSE_LOCAL --input-format RowBinaryWithNamesAndTypes --input_format_binary_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
||||||
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_native_encode_types_in_binary_format=1 -q "select $1 as value format Native" | $CLICKHOUSE_LOCAL --input-format Native --input_format_native_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_native_encode_types_in_binary_format=1 -q "select $1 as value format Native" | $CLICKHOUSE_LOCAL --input-format Native --input_format_native_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
||||||
}
|
}
|
||||||
|
|
||||||
test "materialize(42)::UInt8"
|
test "materialize(42)::UInt8"
|
||||||
|
@ -17,8 +17,7 @@ function test()
|
|||||||
$CH_CLIENT -q "select v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id"
|
$CH_CLIENT -q "select v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id"
|
||||||
$CH_CLIENT -q "select v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null, v.\`Array(Variant(String, UInt64))\`.String.null from test order by id"
|
$CH_CLIENT -q "select v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null, v.\`Array(Variant(String, UInt64))\`.String.null from test order by id"
|
||||||
$CH_CLIENT -q "select id from test where v.UInt64 is null order by id"
|
$CH_CLIENT -q "select id from test where v.UInt64 is null order by id"
|
||||||
|
$CH_CLIENT -q "insert into test select number, multiIf(number % 3 == 2, NULL, number % 3 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10))) from numbers(250000) settings min_insert_block_size_rows=100000, min_insert_block_size_bytes=0"
|
||||||
$CH_CLIENT -q "insert into test select number, multiIf(number % 3 == 2, NULL, number % 3 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10))) from numbers(1000000) settings min_insert_block_size_rows=100000"
|
|
||||||
$CH_CLIENT -q "select v, v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id format Null"
|
$CH_CLIENT -q "select v, v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id format Null"
|
||||||
$CH_CLIENT -q "select v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id format Null"
|
$CH_CLIENT -q "select v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id format Null"
|
||||||
$CH_CLIENT -q "select v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null, v.\`Array(Variant(String, UInt64))\`.String.null from test order by id format Null"
|
$CH_CLIENT -q "select v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null, v.\`Array(Variant(String, UInt64))\`.String.null from test order by id format Null"
|
||||||
@ -41,4 +40,3 @@ echo "MergeTree wide"
|
|||||||
$CH_CLIENT -q "create table test (id UInt64, v Variant(UInt64, Array(Variant(String, UInt64)))) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
|
$CH_CLIENT -q "create table test (id UInt64, v Variant(UInt64, Array(Variant(String, UInt64)))) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
|
||||||
test
|
test
|
||||||
$CH_CLIENT -q "drop table test;"
|
$CH_CLIENT -q "drop table test;"
|
||||||
|
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
Hello world
|
7
tests/queries/0_stateless/03206_no_exceptions_clickhouse_local.sh
Executable file
7
tests/queries/0_stateless/03206_no_exceptions_clickhouse_local.sh
Executable file
@ -0,0 +1,7 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
CLICKHOUSE_TERMINATE_ON_ANY_EXCEPTION=1 ${CLICKHOUSE_LOCAL} --query "SELECT * FROM table" --input-format CSV <<<"Hello, world"
|
@ -0,0 +1,13 @@
|
|||||||
|
DROP TABLE IF EXISTS test_table;
|
||||||
|
CREATE TABLE test_table
|
||||||
|
(
|
||||||
|
id UInt64,
|
||||||
|
value String
|
||||||
|
) ENGINE=TinyLog;
|
||||||
|
|
||||||
|
EXPLAIN SYNTAX
|
||||||
|
WITH 1 AS compound_value SELECT * APPLY (x -> compound_value.*)
|
||||||
|
FROM test_table WHERE x > 0
|
||||||
|
SETTINGS convert_query_to_cnf = true, optimize_using_constraints = true, optimize_substitute_columns = true; -- { serverError UNKNOWN_IDENTIFIER }
|
||||||
|
|
||||||
|
DROP TABLE test_table;
|
Loading…
Reference in New Issue
Block a user