mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge branch 'master' into fix-bad-test-gerasimchuk
This commit is contained in:
commit
d6c0f4d4a5
@ -209,8 +209,8 @@ std::vector<String> Client::loadWarningMessages()
|
||||
{} /* query_parameters */,
|
||||
"" /* query_id */,
|
||||
QueryProcessingStage::Complete,
|
||||
&global_context->getSettingsRef(),
|
||||
&global_context->getClientInfo(), false, {});
|
||||
&client_context->getSettingsRef(),
|
||||
&client_context->getClientInfo(), false, {});
|
||||
while (true)
|
||||
{
|
||||
Packet packet = connection->receivePacket();
|
||||
@ -306,9 +306,6 @@ void Client::initialize(Poco::Util::Application & self)
|
||||
if (env_password && !config().has("password"))
|
||||
config().setString("password", env_password);
|
||||
|
||||
// global_context->setApplicationType(Context::ApplicationType::CLIENT);
|
||||
global_context->setQueryParameters(query_parameters);
|
||||
|
||||
/// settings and limits could be specified in config file, but passed settings has higher priority
|
||||
for (const auto & setting : global_context->getSettingsRef().allUnchanged())
|
||||
{
|
||||
@ -382,7 +379,7 @@ try
|
||||
showWarnings();
|
||||
|
||||
/// Set user password complexity rules
|
||||
auto & access_control = global_context->getAccessControl();
|
||||
auto & access_control = client_context->getAccessControl();
|
||||
access_control.setPasswordComplexityRules(connection->getPasswordComplexityRules());
|
||||
|
||||
if (is_interactive && !delayed_interactive)
|
||||
@ -459,7 +456,7 @@ void Client::connect()
|
||||
<< connection_parameters.host << ":" << connection_parameters.port
|
||||
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
|
||||
|
||||
connection = Connection::createConnection(connection_parameters, global_context);
|
||||
connection = Connection::createConnection(connection_parameters, client_context);
|
||||
|
||||
if (max_client_network_bandwidth)
|
||||
{
|
||||
@ -528,7 +525,7 @@ void Client::connect()
|
||||
}
|
||||
}
|
||||
|
||||
if (!global_context->getSettingsRef().use_client_time_zone)
|
||||
if (!client_context->getSettingsRef().use_client_time_zone)
|
||||
{
|
||||
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
|
||||
if (!time_zone.empty())
|
||||
@ -611,7 +608,7 @@ void Client::printChangedSettings() const
|
||||
}
|
||||
};
|
||||
|
||||
print_changes(global_context->getSettingsRef().changes(), "settings");
|
||||
print_changes(client_context->getSettingsRef().changes(), "settings");
|
||||
print_changes(cmd_merge_tree_settings.changes(), "MergeTree settings");
|
||||
}
|
||||
|
||||
@ -709,7 +706,7 @@ bool Client::processWithFuzzing(const String & full_query)
|
||||
{
|
||||
const char * begin = full_query.data();
|
||||
orig_ast = parseQuery(begin, begin + full_query.size(),
|
||||
global_context->getSettingsRef(),
|
||||
client_context->getSettingsRef(),
|
||||
/*allow_multi_statements=*/ true);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
@ -733,7 +730,7 @@ bool Client::processWithFuzzing(const String & full_query)
|
||||
}
|
||||
|
||||
// Kusto is not a subject for fuzzing (yet)
|
||||
if (global_context->getSettingsRef().dialect == DB::Dialect::kusto)
|
||||
if (client_context->getSettingsRef().dialect == DB::Dialect::kusto)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
@ -1166,6 +1163,11 @@ void Client::processOptions(const OptionsDescription & options_description,
|
||||
|
||||
if (options.count("opentelemetry-tracestate"))
|
||||
global_context->getClientTraceContext().tracestate = options["opentelemetry-tracestate"].as<std::string>();
|
||||
|
||||
/// In case of clickhouse-client the `client_context` can be just an alias for the `global_context`.
|
||||
/// (There is no need to copy the context because clickhouse-client has no background tasks so it won't use that context in parallel.)
|
||||
client_context = global_context;
|
||||
initClientContext();
|
||||
}
|
||||
|
||||
|
||||
@ -1205,11 +1207,6 @@ void Client::processConfig()
|
||||
pager = config().getString("pager", "");
|
||||
|
||||
setDefaultFormatsAndCompressionFromConfiguration();
|
||||
|
||||
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
|
||||
global_context->setQueryKindInitial();
|
||||
global_context->setQuotaClientKey(config().getString("quota_key", ""));
|
||||
global_context->setQueryKind(query_kind);
|
||||
}
|
||||
|
||||
|
||||
|
@ -16,7 +16,6 @@ public:
|
||||
int main(const std::vector<String> & /*args*/) override;
|
||||
|
||||
protected:
|
||||
|
||||
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
||||
|
||||
bool processWithFuzzing(const String & full_query) override;
|
||||
|
@ -295,6 +295,8 @@ void LocalServer::cleanup()
|
||||
if (suggest)
|
||||
suggest.reset();
|
||||
|
||||
client_context.reset();
|
||||
|
||||
if (global_context)
|
||||
{
|
||||
global_context->shutdown();
|
||||
@ -436,7 +438,7 @@ void LocalServer::connect()
|
||||
in = input.get();
|
||||
}
|
||||
connection = LocalConnection::createConnection(
|
||||
connection_parameters, global_context, in, need_render_progress, need_render_profile_events, server_display_name);
|
||||
connection_parameters, client_context, in, need_render_progress, need_render_profile_events, server_display_name);
|
||||
}
|
||||
|
||||
|
||||
@ -497,8 +499,6 @@ try
|
||||
initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")));
|
||||
ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);
|
||||
|
||||
applyCmdSettings(global_context);
|
||||
|
||||
/// try to load user defined executable functions, throw on error and die
|
||||
try
|
||||
{
|
||||
@ -510,6 +510,11 @@ try
|
||||
throw;
|
||||
}
|
||||
|
||||
/// Must be called after we stopped initializing the global context and changing its settings.
|
||||
/// After this point the global context must be stayed almost unchanged till shutdown,
|
||||
/// and all necessary changes must be made to the client context instead.
|
||||
createClientContext();
|
||||
|
||||
if (is_interactive)
|
||||
{
|
||||
clearTerminal();
|
||||
@ -735,6 +740,9 @@ void LocalServer::processConfig()
|
||||
/// Load global settings from default_profile and system_profile.
|
||||
global_context->setDefaultProfiles(getClientConfiguration());
|
||||
|
||||
/// Command-line parameters can override settings from the default profile.
|
||||
applyCmdSettings(global_context);
|
||||
|
||||
/// We load temporary database first, because projections need it.
|
||||
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
|
||||
|
||||
@ -778,10 +786,6 @@ void LocalServer::processConfig()
|
||||
|
||||
server_display_name = getClientConfiguration().getString("display_name", "");
|
||||
prompt_by_server_display_name = getClientConfiguration().getRawString("prompt_by_server_display_name.default", ":) ");
|
||||
|
||||
global_context->setQueryKindInitial();
|
||||
global_context->setQueryKind(query_kind);
|
||||
global_context->setQueryParameters(query_parameters);
|
||||
}
|
||||
|
||||
|
||||
@ -860,6 +864,16 @@ void LocalServer::applyCmdOptions(ContextMutablePtr context)
|
||||
}
|
||||
|
||||
|
||||
void LocalServer::createClientContext()
|
||||
{
|
||||
/// In case of clickhouse-local it's necessary to use a separate context for client-related purposes.
|
||||
/// We can't just change the global context because it is used in background tasks (for example, in merges)
|
||||
/// which don't expect that the global context can suddenly change.
|
||||
client_context = Context::createCopy(global_context);
|
||||
initClientContext();
|
||||
}
|
||||
|
||||
|
||||
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &, const std::vector<Arguments> &)
|
||||
{
|
||||
if (options.count("table"))
|
||||
|
@ -31,7 +31,6 @@ public:
|
||||
int main(const std::vector<String> & /*args*/) override;
|
||||
|
||||
protected:
|
||||
|
||||
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
||||
|
||||
void connect() override;
|
||||
@ -50,7 +49,6 @@ protected:
|
||||
void processConfig() override;
|
||||
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &) override;
|
||||
|
||||
|
||||
void updateLoggerLevel(const String & logs_level) override;
|
||||
|
||||
private:
|
||||
@ -67,6 +65,8 @@ private:
|
||||
void applyCmdOptions(ContextMutablePtr context);
|
||||
void applyCmdSettings(ContextMutablePtr context);
|
||||
|
||||
void createClientContext();
|
||||
|
||||
ServerSettings server_settings;
|
||||
|
||||
std::optional<StatusFile> status;
|
||||
|
@ -1740,7 +1740,7 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveQualifiedMatcher(Qu
|
||||
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
|
||||
if (!tuple_data_type)
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"Qualified matcher {} find non compound expression {} with type {}. Expected tuple or array of tuples. In scope {}",
|
||||
"Qualified matcher {} found a non-compound expression {} with type {}. Expected a tuple or an array of tuples. In scope {}",
|
||||
matcher_node->formatASTForErrorMessage(),
|
||||
expression_query_tree_node->formatASTForErrorMessage(),
|
||||
expression_query_tree_node->getResultType()->getName(),
|
||||
|
@ -477,7 +477,7 @@ void ClientBase::sendExternalTables(ASTPtr parsed_query)
|
||||
|
||||
std::vector<ExternalTableDataPtr> data;
|
||||
for (auto & table : external_tables)
|
||||
data.emplace_back(table.getData(global_context));
|
||||
data.emplace_back(table.getData(client_context));
|
||||
|
||||
connection->sendExternalTablesData(data);
|
||||
}
|
||||
@ -690,10 +690,10 @@ try
|
||||
/// intermixed with data with parallel formatting.
|
||||
/// It may increase code complexity significantly.
|
||||
if (!extras_into_stdout || select_only_into_file)
|
||||
output_format = global_context->getOutputFormatParallelIfPossible(
|
||||
output_format = client_context->getOutputFormatParallelIfPossible(
|
||||
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
||||
else
|
||||
output_format = global_context->getOutputFormat(
|
||||
output_format = client_context->getOutputFormat(
|
||||
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
||||
|
||||
output_format->setAutoFlush();
|
||||
@ -772,6 +772,15 @@ void ClientBase::adjustSettings()
|
||||
global_context->setSettings(settings);
|
||||
}
|
||||
|
||||
void ClientBase::initClientContext()
|
||||
{
|
||||
client_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
|
||||
client_context->setQuotaClientKey(getClientConfiguration().getString("quota_key", ""));
|
||||
client_context->setQueryKindInitial();
|
||||
client_context->setQueryKind(query_kind);
|
||||
client_context->setQueryParameters(query_parameters);
|
||||
}
|
||||
|
||||
bool ClientBase::isRegularFile(int fd)
|
||||
{
|
||||
struct stat file_stat;
|
||||
@ -962,7 +971,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
|
||||
/// client-side. Thus we need to parse the query.
|
||||
const char * begin = full_query.data();
|
||||
auto parsed_query = parseQuery(begin, begin + full_query.size(),
|
||||
global_context->getSettingsRef(),
|
||||
client_context->getSettingsRef(),
|
||||
/*allow_multi_statements=*/ false);
|
||||
|
||||
if (!parsed_query)
|
||||
@ -985,7 +994,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
|
||||
/// But for asynchronous inserts we don't extract data, because it's needed
|
||||
/// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
|
||||
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
||||
if (insert && isSyncInsertWithData(*insert, global_context))
|
||||
if (insert && isSyncInsertWithData(*insert, client_context))
|
||||
query_to_execute = full_query.substr(0, insert->data - full_query.data());
|
||||
else
|
||||
query_to_execute = full_query;
|
||||
@ -1103,7 +1112,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
|
||||
}
|
||||
}
|
||||
|
||||
const auto & settings = global_context->getSettingsRef();
|
||||
const auto & settings = client_context->getSettingsRef();
|
||||
const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;
|
||||
|
||||
int retries_left = 10;
|
||||
@ -1118,10 +1127,10 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
|
||||
connection_parameters.timeouts,
|
||||
query,
|
||||
query_parameters,
|
||||
global_context->getCurrentQueryId(),
|
||||
client_context->getCurrentQueryId(),
|
||||
query_processing_stage,
|
||||
&global_context->getSettingsRef(),
|
||||
&global_context->getClientInfo(),
|
||||
&client_context->getSettingsRef(),
|
||||
&client_context->getClientInfo(),
|
||||
true,
|
||||
[&](const Progress & progress) { onProgress(progress); });
|
||||
|
||||
@ -1308,7 +1317,7 @@ void ClientBase::onProgress(const Progress & value)
|
||||
|
||||
void ClientBase::onTimezoneUpdate(const String & tz)
|
||||
{
|
||||
global_context->setSetting("session_timezone", tz);
|
||||
client_context->setSetting("session_timezone", tz);
|
||||
}
|
||||
|
||||
|
||||
@ -1504,13 +1513,13 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de
|
||||
|
||||
void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
|
||||
{
|
||||
if (!global_context->hasInsertionTable() && insert_query.table)
|
||||
if (!client_context->hasInsertionTable() && insert_query.table)
|
||||
{
|
||||
String table = insert_query.table->as<ASTIdentifier &>().shortName();
|
||||
if (!table.empty())
|
||||
{
|
||||
String database = insert_query.database ? insert_query.database->as<ASTIdentifier &>().shortName() : "";
|
||||
global_context->setInsertionTable(StorageID(database, table));
|
||||
client_context->setInsertionTable(StorageID(database, table));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1561,7 +1570,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
|
||||
const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
|
||||
if ((!parsed_insert_query.data && !parsed_insert_query.infile) && (is_interactive || (!stdin_is_a_tty && !isStdinNotEmptyAndValid(std_in))))
|
||||
{
|
||||
const auto & settings = global_context->getSettingsRef();
|
||||
const auto & settings = client_context->getSettingsRef();
|
||||
if (settings.throw_if_no_data_to_insert)
|
||||
throw Exception(ErrorCodes::NO_DATA_TO_INSERT, "No data to insert");
|
||||
else
|
||||
@ -1575,10 +1584,10 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
|
||||
connection_parameters.timeouts,
|
||||
query,
|
||||
query_parameters,
|
||||
global_context->getCurrentQueryId(),
|
||||
client_context->getCurrentQueryId(),
|
||||
query_processing_stage,
|
||||
&global_context->getSettingsRef(),
|
||||
&global_context->getClientInfo(),
|
||||
&client_context->getSettingsRef(),
|
||||
&client_context->getClientInfo(),
|
||||
true,
|
||||
[&](const Progress & progress) { onProgress(progress); });
|
||||
|
||||
@ -1626,7 +1635,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
||||
|
||||
/// Set callback to be called on file progress.
|
||||
if (tty_buf)
|
||||
progress_indication.setFileProgressCallback(global_context, *tty_buf);
|
||||
progress_indication.setFileProgressCallback(client_context, *tty_buf);
|
||||
}
|
||||
|
||||
/// If data fetched from file (maybe compressed file)
|
||||
@ -1660,10 +1669,10 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
||||
}
|
||||
|
||||
StorageFile::CommonArguments args{
|
||||
WithContext(global_context),
|
||||
WithContext(client_context),
|
||||
parsed_insert_query->table_id,
|
||||
current_format,
|
||||
getFormatSettings(global_context),
|
||||
getFormatSettings(client_context),
|
||||
compression_method,
|
||||
columns_for_storage_file,
|
||||
ConstraintsDescription{},
|
||||
@ -1671,7 +1680,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
||||
{},
|
||||
String{},
|
||||
};
|
||||
StoragePtr storage = std::make_shared<StorageFile>(in_file, global_context->getUserFilesPath(), args);
|
||||
StoragePtr storage = std::make_shared<StorageFile>(in_file, client_context->getUserFilesPath(), args);
|
||||
storage->startup();
|
||||
SelectQueryInfo query_info;
|
||||
|
||||
@ -1682,16 +1691,16 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
|
||||
storage->read(
|
||||
plan,
|
||||
sample.getNames(),
|
||||
storage->getStorageSnapshot(metadata, global_context),
|
||||
storage->getStorageSnapshot(metadata, client_context),
|
||||
query_info,
|
||||
global_context,
|
||||
client_context,
|
||||
{},
|
||||
global_context->getSettingsRef().max_block_size,
|
||||
client_context->getSettingsRef().max_block_size,
|
||||
getNumberOfPhysicalCPUCores());
|
||||
|
||||
auto builder = plan.buildQueryPipeline(
|
||||
QueryPlanOptimizationSettings::fromContext(global_context),
|
||||
BuildQueryPipelineSettings::fromContext(global_context));
|
||||
QueryPlanOptimizationSettings::fromContext(client_context),
|
||||
BuildQueryPipelineSettings::fromContext(client_context));
|
||||
|
||||
QueryPlanResourceHolder resources;
|
||||
auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
|
||||
@ -1752,14 +1761,14 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
|
||||
current_format = insert->format;
|
||||
}
|
||||
|
||||
auto source = global_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
|
||||
auto source = client_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
|
||||
Pipe pipe(source);
|
||||
|
||||
if (columns_description.hasDefaults())
|
||||
{
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, global_context);
|
||||
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, client_context);
|
||||
});
|
||||
}
|
||||
|
||||
@ -1921,12 +1930,12 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
|
||||
if (is_interactive)
|
||||
{
|
||||
global_context->setCurrentQueryId("");
|
||||
client_context->setCurrentQueryId("");
|
||||
// Generate a new query_id
|
||||
for (const auto & query_id_format : query_id_formats)
|
||||
{
|
||||
writeString(query_id_format.first, std_out);
|
||||
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", global_context->getCurrentQueryId())), std_out);
|
||||
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", client_context->getCurrentQueryId())), std_out);
|
||||
writeChar('\n', std_out);
|
||||
std_out.next();
|
||||
}
|
||||
@ -1953,7 +1962,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
auto password = auth_data->getPassword();
|
||||
|
||||
if (password)
|
||||
global_context->getAccessControl().checkPasswordComplexityRules(*password);
|
||||
client_context->getAccessControl().checkPasswordComplexityRules(*password);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1968,15 +1977,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
std::optional<Settings> old_settings;
|
||||
SCOPE_EXIT_SAFE({
|
||||
if (old_settings)
|
||||
global_context->setSettings(*old_settings);
|
||||
client_context->setSettings(*old_settings);
|
||||
});
|
||||
|
||||
auto apply_query_settings = [&](const IAST & settings_ast)
|
||||
{
|
||||
if (!old_settings)
|
||||
old_settings.emplace(global_context->getSettingsRef());
|
||||
global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
||||
global_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
|
||||
old_settings.emplace(client_context->getSettingsRef());
|
||||
client_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
||||
client_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
|
||||
};
|
||||
|
||||
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
||||
@ -2009,7 +2018,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
if (insert && insert->select)
|
||||
insert->tryFindInputFunction(input_function);
|
||||
|
||||
bool is_async_insert_with_inlined_data = global_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
|
||||
bool is_async_insert_with_inlined_data = client_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
|
||||
|
||||
if (is_async_insert_with_inlined_data)
|
||||
{
|
||||
@ -2044,9 +2053,9 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
if (change.name == "profile")
|
||||
current_profile = change.value.safeGet<String>();
|
||||
else
|
||||
global_context->applySettingChange(change);
|
||||
client_context->applySettingChange(change);
|
||||
}
|
||||
global_context->resetSettingsToDefaultValue(set_query->default_settings);
|
||||
client_context->resetSettingsToDefaultValue(set_query->default_settings);
|
||||
|
||||
/// Query parameters inside SET queries should be also saved on the client side
|
||||
/// to override their previous definitions set with --param_* arguments
|
||||
@ -2054,7 +2063,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
for (const auto & [name, value] : set_query->query_parameters)
|
||||
query_parameters.insert_or_assign(name, value);
|
||||
|
||||
global_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
|
||||
client_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
|
||||
}
|
||||
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
|
||||
{
|
||||
@ -2131,8 +2140,8 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
if (this_query_begin >= all_queries_end)
|
||||
return MultiQueryProcessingStage::QUERIES_END;
|
||||
|
||||
unsigned max_parser_depth = static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth);
|
||||
unsigned max_parser_backtracks = static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks);
|
||||
unsigned max_parser_depth = static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth);
|
||||
unsigned max_parser_backtracks = static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks);
|
||||
|
||||
// If there are only comments left until the end of file, we just
|
||||
// stop. The parser can't handle this situation because it always
|
||||
@ -2152,7 +2161,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
try
|
||||
{
|
||||
parsed_query = parseQuery(this_query_end, all_queries_end,
|
||||
global_context->getSettingsRef(),
|
||||
client_context->getSettingsRef(),
|
||||
/*allow_multi_statements=*/ true);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
@ -2195,7 +2204,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
{
|
||||
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
|
||||
insert_ast->end = this_query_end;
|
||||
query_to_execute_end = isSyncInsertWithData(*insert_ast, global_context) ? insert_ast->data : this_query_end;
|
||||
query_to_execute_end = isSyncInsertWithData(*insert_ast, client_context) ? insert_ast->data : this_query_end;
|
||||
}
|
||||
|
||||
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), query_to_execute_end - this_query_begin);
|
||||
@ -2404,13 +2413,13 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
|
||||
// , where the inline data is delimited by semicolon and not by a
|
||||
// newline.
|
||||
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
|
||||
if (insert_ast && isSyncInsertWithData(*insert_ast, global_context))
|
||||
if (insert_ast && isSyncInsertWithData(*insert_ast, client_context))
|
||||
{
|
||||
this_query_end = insert_ast->end;
|
||||
adjustQueryEnd(
|
||||
this_query_end, all_queries_end,
|
||||
static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth),
|
||||
static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks));
|
||||
static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth),
|
||||
static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks));
|
||||
}
|
||||
|
||||
// Report error.
|
||||
@ -2541,10 +2550,10 @@ void ClientBase::runInteractive()
|
||||
if (load_suggestions)
|
||||
{
|
||||
/// Load suggestion data from the server.
|
||||
if (global_context->getApplicationType() == Context::ApplicationType::CLIENT)
|
||||
suggest->load<Connection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
else if (global_context->getApplicationType() == Context::ApplicationType::LOCAL)
|
||||
suggest->load<LocalConnection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
if (client_context->getApplicationType() == Context::ApplicationType::CLIENT)
|
||||
suggest->load<Connection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
else if (client_context->getApplicationType() == Context::ApplicationType::LOCAL)
|
||||
suggest->load<LocalConnection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
}
|
||||
|
||||
if (home_path.empty())
|
||||
@ -2682,7 +2691,7 @@ void ClientBase::runInteractive()
|
||||
{
|
||||
// If a separate connection loading suggestions failed to open a new session,
|
||||
// use the main session to receive them.
|
||||
suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), global_context->getClientInfo());
|
||||
suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), client_context->getClientInfo());
|
||||
}
|
||||
|
||||
try
|
||||
@ -2731,10 +2740,10 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)
|
||||
|
||||
if (!getClientConfiguration().has("log_comment"))
|
||||
{
|
||||
Settings settings = global_context->getSettings();
|
||||
Settings settings = client_context->getSettings();
|
||||
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
|
||||
settings.log_comment = fs::absolute(fs::path(file_name));
|
||||
global_context->setSettings(settings);
|
||||
client_context->setSettings(settings);
|
||||
}
|
||||
|
||||
return executeMultiQuery(queries_from_file);
|
||||
|
@ -206,6 +206,9 @@ protected:
|
||||
/// Adjust some settings after command line options and config had been processed.
|
||||
void adjustSettings();
|
||||
|
||||
/// Initializes the client context.
|
||||
void initClientContext();
|
||||
|
||||
void setDefaultFormatsAndCompressionFromConfiguration();
|
||||
|
||||
void initTTYBuffer(ProgressOption progress);
|
||||
@ -215,6 +218,9 @@ protected:
|
||||
SharedContextHolder shared_context;
|
||||
ContextMutablePtr global_context;
|
||||
|
||||
/// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server.
|
||||
ContextMutablePtr client_context;
|
||||
|
||||
LoggerPtr fatal_log;
|
||||
Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
|
||||
Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;
|
||||
|
@ -186,7 +186,7 @@ class IColumn;
|
||||
M(Bool, allow_suspicious_ttl_expressions, false, "Reject TTL expressions that don't depend on any of table's columns. It indicates a user error most of the time.", 0) \
|
||||
M(Bool, allow_suspicious_variant_types, false, "In CREATE TABLE statement allows specifying Variant type with similar variant types (for example, with different numeric or date types). Enabling this setting may introduce some ambiguity when working with values with similar types.", 0) \
|
||||
M(Bool, allow_suspicious_primary_key, false, "Forbid suspicious PRIMARY KEY/ORDER BY for MergeTree (i.e. SimpleAggregateFunction)", 0) \
|
||||
M(Bool, compile_expressions, false, "Compile some scalar functions and operators to native code.", 0) \
|
||||
M(Bool, compile_expressions, true, "Compile some scalar functions and operators to native code.", 0) \
|
||||
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
|
||||
M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \
|
||||
M(UInt64, min_count_to_compile_aggregate_expression, 3, "The number of identical aggregate expressions before they are JIT-compiled", 0) \
|
||||
|
@ -57,6 +57,7 @@ String ClickHouseVersion::toString() const
|
||||
/// Note: please check if the key already exists to prevent duplicate entries.
|
||||
static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
|
||||
{
|
||||
{"24.8", {{"compile_expressions", false, true, "We believe that the LLVM infrastructure behind the JIT compiler is stable enough to enable this setting by default."}}},
|
||||
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
|
||||
{"output_format_binary_encode_types_in_binary_format", false, false, "Added new setting to allow to write type names in binary format in RowBinaryWithNamesAndTypes output format"},
|
||||
{"input_format_binary_decode_types_in_binary_format", false, false, "Added new setting to allow to read type names in binary format in RowBinaryWithNamesAndTypes input format"},
|
||||
@ -80,7 +81,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
|
||||
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
|
||||
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
|
||||
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
|
||||
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
|
||||
}},
|
||||
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
|
||||
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
|
||||
|
@ -34,7 +34,7 @@ public:
|
||||
|
||||
String getFileName() const override { return impl->getFileName(); }
|
||||
|
||||
size_t getFileSize() override { return impl->getFileSize(); }
|
||||
std::optional<size_t> tryGetFileSize() override { return impl->tryGetFileSize(); }
|
||||
|
||||
String getInfoForLog() override { return impl->getInfoForLog(); }
|
||||
|
||||
|
@ -253,16 +253,15 @@ void ReadBufferFromAzureBlobStorage::initialize()
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
size_t ReadBufferFromAzureBlobStorage::getFileSize()
|
||||
std::optional<size_t> ReadBufferFromAzureBlobStorage::tryGetFileSize()
|
||||
{
|
||||
if (!blob_client)
|
||||
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
|
||||
|
||||
if (file_size.has_value())
|
||||
return *file_size;
|
||||
if (!file_size)
|
||||
file_size = blob_client->GetProperties().Value.BlobSize;
|
||||
|
||||
file_size = blob_client->GetProperties().Value.BlobSize;
|
||||
return *file_size;
|
||||
return file_size;
|
||||
}
|
||||
|
||||
size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & /*progress_callback*/) const
|
||||
|
@ -42,7 +42,7 @@ public:
|
||||
|
||||
bool supportsRightBoundedReads() const override { return true; }
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
size_t readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback) const override;
|
||||
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
|
||||
void setReadUntilEnd() override { setReadUntilPosition(getFileSize()); }
|
||||
|
||||
size_t getFileSize() override { return getTotalSize(blobs_to_read); }
|
||||
std::optional<size_t> tryGetFileSize() override { return getTotalSize(blobs_to_read); }
|
||||
|
||||
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
|
||||
|
||||
|
@ -321,7 +321,7 @@ public:
|
||||
off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }
|
||||
String getFileName() const override { return handle.getFileName(); }
|
||||
|
||||
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
||||
std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
||||
|
||||
Handle releaseHandle() && { return std::move(handle); }
|
||||
|
||||
|
@ -317,7 +317,7 @@ public:
|
||||
|
||||
String getFileName() const override { return handle.getFileName(); }
|
||||
|
||||
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
||||
std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }
|
||||
|
||||
/// Releases owned handle to pass it to an enumerator.
|
||||
HandleHolder releaseHandle() &&
|
||||
|
@ -244,7 +244,7 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
|
||||
file_offset_of_buffer_end = 0;
|
||||
}
|
||||
|
||||
size_t AsynchronousReadBufferFromFileDescriptor::getFileSize()
|
||||
std::optional<size_t> AsynchronousReadBufferFromFileDescriptor::tryGetFileSize()
|
||||
{
|
||||
return getSizeFromFileDescriptor(fd, getFileName());
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ public:
|
||||
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
|
||||
void rewind();
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
|
||||
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
off_t seek(off_t off, int whence) override;
|
||||
off_t getPosition() override;
|
||||
|
||||
size_t getFileSize() override { return total_size; }
|
||||
std::optional<size_t> tryGetFileSize() override { return total_size; }
|
||||
|
||||
private:
|
||||
bool nextImpl() override;
|
||||
|
@ -87,7 +87,7 @@ off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence)
|
||||
return new_pos;
|
||||
}
|
||||
|
||||
size_t MMapReadBufferFromFileDescriptor::getFileSize()
|
||||
std::optional<size_t> MMapReadBufferFromFileDescriptor::tryGetFileSize()
|
||||
{
|
||||
return getSizeFromFileDescriptor(getFD(), getFileName());
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ public:
|
||||
|
||||
int getFD() const;
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) const override;
|
||||
bool supportsReadAt() override { return true; }
|
||||
|
@ -152,7 +152,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
|
||||
return offset;
|
||||
}
|
||||
|
||||
size_t ParallelReadBuffer::getFileSize()
|
||||
std::optional<size_t> ParallelReadBuffer::tryGetFileSize()
|
||||
{
|
||||
return file_size;
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ public:
|
||||
~ParallelReadBuffer() override { finishAndWait(); }
|
||||
|
||||
off_t seek(off_t off, int whence) override;
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
off_t getPosition() override;
|
||||
|
||||
const SeekableReadBuffer & getReadBuffer() const { return input; }
|
||||
|
@ -19,7 +19,8 @@ private:
|
||||
std::string getFileName() const override { return "<empty>"; }
|
||||
off_t seek(off_t /*off*/, int /*whence*/) override { return 0; }
|
||||
off_t getPosition() override { return 0; }
|
||||
size_t getFileSize() override { return 0; }
|
||||
std::optional<size_t> tryGetFileSize() override { return 0; }
|
||||
size_t getFileOffsetOfBufferEnd() const override { return 0; }
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ public:
|
||||
|
||||
void setReadUntilEnd() override { in->setReadUntilEnd(); }
|
||||
|
||||
size_t getFileSize() override { return in->getFileSize(); }
|
||||
std::optional<size_t> tryGetFileSize() override { return in->tryGetFileSize(); }
|
||||
|
||||
private:
|
||||
bool nextImpl() override;
|
||||
|
@ -5,11 +5,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_FILE_SIZE;
|
||||
}
|
||||
|
||||
ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
|
||||
{
|
||||
}
|
||||
@ -26,11 +21,9 @@ ReadBufferFromFileBase::ReadBufferFromFileBase(
|
||||
|
||||
ReadBufferFromFileBase::~ReadBufferFromFileBase() = default;
|
||||
|
||||
size_t ReadBufferFromFileBase::getFileSize()
|
||||
std::optional<size_t> ReadBufferFromFileBase::tryGetFileSize()
|
||||
{
|
||||
if (file_size)
|
||||
return *file_size;
|
||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for read buffer");
|
||||
return file_size;
|
||||
}
|
||||
|
||||
void ReadBufferFromFileBase::setProgressCallback(ContextPtr context)
|
||||
|
@ -50,7 +50,7 @@ public:
|
||||
clock_type = clock_type_;
|
||||
}
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
void setProgressCallback(ContextPtr context);
|
||||
|
||||
|
@ -52,9 +52,9 @@ bool ReadBufferFromFileDecorator::nextImpl()
|
||||
return result;
|
||||
}
|
||||
|
||||
size_t ReadBufferFromFileDecorator::getFileSize()
|
||||
std::optional<size_t> ReadBufferFromFileDecorator::tryGetFileSize()
|
||||
{
|
||||
return getFileSizeFromReadBuffer(*impl);
|
||||
return tryGetFileSizeFromReadBuffer(*impl);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ public:
|
||||
|
||||
ReadBuffer & getWrappedReadBuffer() { return *impl; }
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
protected:
|
||||
std::unique_ptr<SeekableReadBuffer> impl;
|
||||
|
@ -253,7 +253,7 @@ void ReadBufferFromFileDescriptor::rewind()
|
||||
file_offset_of_buffer_end = 0;
|
||||
}
|
||||
|
||||
size_t ReadBufferFromFileDescriptor::getFileSize()
|
||||
std::optional<size_t> ReadBufferFromFileDescriptor::tryGetFileSize()
|
||||
{
|
||||
return getSizeFromFileDescriptor(fd, getFileName());
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ public:
|
||||
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
|
||||
void rewind();
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
bool checkIfActuallySeekable() override;
|
||||
|
||||
|
@ -311,15 +311,15 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
|
||||
return offset;
|
||||
}
|
||||
|
||||
size_t ReadBufferFromS3::getFileSize()
|
||||
std::optional<size_t> ReadBufferFromS3::tryGetFileSize()
|
||||
{
|
||||
if (file_size)
|
||||
return *file_size;
|
||||
return file_size;
|
||||
|
||||
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id);
|
||||
|
||||
file_size = object_size;
|
||||
return *file_size;
|
||||
return file_size;
|
||||
}
|
||||
|
||||
off_t ReadBufferFromS3::getPosition()
|
||||
|
@ -63,7 +63,7 @@ public:
|
||||
|
||||
off_t getPosition() override;
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
void setReadUntilPosition(size_t position) override;
|
||||
void setReadUntilEnd() override;
|
||||
|
@ -72,7 +72,6 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||
extern const int SEEK_POSITION_OUT_OF_BOUND;
|
||||
extern const int UNKNOWN_FILE_SIZE;
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::CallResult::transformToReadBuffer(size_t buf_size) &&
|
||||
@ -121,15 +120,33 @@ void ReadWriteBufferFromHTTP::prepareRequest(Poco::Net::HTTPRequest & request, s
|
||||
credentials.authenticate(request);
|
||||
}
|
||||
|
||||
size_t ReadWriteBufferFromHTTP::getFileSize()
|
||||
std::optional<size_t> ReadWriteBufferFromHTTP::tryGetFileSize()
|
||||
{
|
||||
if (!file_info)
|
||||
file_info = getFileInfo();
|
||||
{
|
||||
try
|
||||
{
|
||||
file_info = getFileInfo();
|
||||
}
|
||||
catch (const HTTPException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
catch (const NetException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
catch (const Poco::Net::NetException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
catch (const Poco::IOException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
if (file_info->file_size)
|
||||
return *file_info->file_size;
|
||||
|
||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", initial_uri.toString());
|
||||
return file_info->file_size;
|
||||
}
|
||||
|
||||
bool ReadWriteBufferFromHTTP::supportsReadAt()
|
||||
@ -311,12 +328,12 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
|
||||
error_message = e.displayText();
|
||||
exception = std::current_exception();
|
||||
}
|
||||
catch (DB::NetException & e)
|
||||
catch (NetException & e)
|
||||
{
|
||||
error_message = e.displayText();
|
||||
exception = std::current_exception();
|
||||
}
|
||||
catch (DB::HTTPException & e)
|
||||
catch (HTTPException & e)
|
||||
{
|
||||
if (!isRetriableError(e.getHTTPStatus()))
|
||||
is_retriable = false;
|
||||
@ -324,7 +341,7 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
|
||||
error_message = e.displayText();
|
||||
exception = std::current_exception();
|
||||
}
|
||||
catch (DB::Exception & e)
|
||||
catch (Exception & e)
|
||||
{
|
||||
is_retriable = false;
|
||||
|
||||
@ -683,7 +700,19 @@ std::optional<time_t> ReadWriteBufferFromHTTP::tryGetLastModificationTime()
|
||||
{
|
||||
file_info = getFileInfo();
|
||||
}
|
||||
catch (...)
|
||||
catch (const HTTPException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
catch (const NetException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
catch (const Poco::Net::NetException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
catch (const Poco::IOException &)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
@ -704,7 +733,7 @@ ReadWriteBufferFromHTTP::HTTPFileInfo ReadWriteBufferFromHTTP::getFileInfo()
|
||||
{
|
||||
getHeadResponse(response);
|
||||
}
|
||||
catch (HTTPException & e)
|
||||
catch (const HTTPException & e)
|
||||
{
|
||||
/// Maybe the web server doesn't support HEAD requests.
|
||||
/// E.g. webhdfs reports status 400.
|
||||
|
@ -118,7 +118,7 @@ private:
|
||||
|
||||
std::unique_ptr<ReadBuffer> initialize();
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
bool supportsReadAt() override;
|
||||
|
||||
|
@ -13,41 +13,47 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_FILE_SIZE;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static size_t getFileSize(T & in)
|
||||
size_t WithFileSize::getFileSize()
|
||||
{
|
||||
if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
|
||||
{
|
||||
return with_file_size->getFileSize();
|
||||
}
|
||||
if (auto maybe_size = tryGetFileSize())
|
||||
return *maybe_size;
|
||||
|
||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
|
||||
}
|
||||
|
||||
size_t getFileSizeFromReadBuffer(ReadBuffer & in)
|
||||
template <typename T>
|
||||
static std::optional<size_t> tryGetFileSize(T & in)
|
||||
{
|
||||
if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
|
||||
{
|
||||
return getFileSize(delegate->getWrappedReadBuffer());
|
||||
}
|
||||
else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
|
||||
{
|
||||
return getFileSize(compressed->getWrappedReadBuffer());
|
||||
}
|
||||
if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
|
||||
return with_file_size->tryGetFileSize();
|
||||
|
||||
return getFileSize(in);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static size_t getFileSize(T & in)
|
||||
{
|
||||
if (auto maybe_size = tryGetFileSize(in))
|
||||
return *maybe_size;
|
||||
|
||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
|
||||
}
|
||||
|
||||
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in)
|
||||
{
|
||||
try
|
||||
{
|
||||
return getFileSizeFromReadBuffer(in);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
|
||||
return tryGetFileSize(delegate->getWrappedReadBuffer());
|
||||
else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
|
||||
return tryGetFileSize(compressed->getWrappedReadBuffer());
|
||||
return tryGetFileSize(in);
|
||||
}
|
||||
|
||||
size_t getFileSizeFromReadBuffer(ReadBuffer & in)
|
||||
{
|
||||
if (auto maybe_size = tryGetFileSizeFromReadBuffer(in))
|
||||
return *maybe_size;
|
||||
|
||||
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
|
||||
}
|
||||
|
||||
bool isBufferWithFileSize(const ReadBuffer & in)
|
||||
|
@ -10,15 +10,16 @@ class ReadBuffer;
|
||||
class WithFileSize
|
||||
{
|
||||
public:
|
||||
virtual size_t getFileSize() = 0;
|
||||
/// Returns nullopt if couldn't find out file size;
|
||||
virtual std::optional<size_t> tryGetFileSize() = 0;
|
||||
virtual ~WithFileSize() = default;
|
||||
|
||||
size_t getFileSize();
|
||||
};
|
||||
|
||||
bool isBufferWithFileSize(const ReadBuffer & in);
|
||||
|
||||
size_t getFileSizeFromReadBuffer(ReadBuffer & in);
|
||||
|
||||
/// Return nullopt if couldn't find out file size;
|
||||
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in);
|
||||
|
||||
size_t getDataOffsetMaybeCompressed(const ReadBuffer & in);
|
||||
|
@ -13,10 +13,6 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -237,16 +233,8 @@ void SubstituteColumnOptimizer::perform()
|
||||
|
||||
const auto & compare_graph = metadata_snapshot->getConstraints().getGraph();
|
||||
|
||||
// Fill aliases
|
||||
if (select_query->select())
|
||||
{
|
||||
auto * list = select_query->refSelect()->as<ASTExpressionList>();
|
||||
if (!list)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "List of selected columns must be ASTExpressionList");
|
||||
|
||||
for (ASTPtr & ast : list->children)
|
||||
ast->setAlias(ast->getAliasOrColumnName());
|
||||
}
|
||||
if (compare_graph.getNumOfComponents() == 0)
|
||||
return;
|
||||
|
||||
auto run_for_all = [&](const auto func)
|
||||
{
|
||||
|
@ -15,7 +15,7 @@ struct StorageInMemoryMetadata;
|
||||
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
|
||||
|
||||
/// Optimizer that tries to replace columns to equal columns (according to constraints)
|
||||
/// with lower size (according to compressed and uncomressed size).
|
||||
/// with lower size (according to compressed and uncompressed sizes).
|
||||
class SubstituteColumnOptimizer
|
||||
{
|
||||
public:
|
||||
|
@ -66,7 +66,7 @@ public:
|
||||
/** Set the alias. */
|
||||
virtual void setAlias(const String & /*to*/)
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {}", getColumnName());
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {} of {}", getColumnName(), getID());
|
||||
}
|
||||
|
||||
/** Get the text that identifies this element. */
|
||||
|
@ -53,7 +53,7 @@ public:
|
||||
bool nextImpl() override;
|
||||
off_t seek(off_t off, int whence) override;
|
||||
off_t getPosition() override;
|
||||
size_t getFileSize() override { return remote_file_size; }
|
||||
std::optional<size_t> tryGetFileSize() override { return remote_file_size; }
|
||||
|
||||
private:
|
||||
std::unique_ptr<LocalFileHolder> local_file_holder;
|
||||
|
@ -91,9 +91,9 @@ void AsynchronousReadBufferFromHDFS::prefetch(Priority priority)
|
||||
}
|
||||
|
||||
|
||||
size_t AsynchronousReadBufferFromHDFS::getFileSize()
|
||||
std::optional<size_t> AsynchronousReadBufferFromHDFS::tryGetFileSize()
|
||||
{
|
||||
return impl->getFileSize();
|
||||
return impl->tryGetFileSize();
|
||||
}
|
||||
|
||||
String AsynchronousReadBufferFromHDFS::getFileName() const
|
||||
|
@ -35,7 +35,7 @@ public:
|
||||
|
||||
void prefetch(Priority priority) override;
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
String getFileName() const override;
|
||||
|
||||
|
@ -31,7 +31,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>
|
||||
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileSize
|
||||
{
|
||||
String hdfs_uri;
|
||||
String hdfs_file_path;
|
||||
@ -90,7 +90,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
||||
hdfsCloseFile(fs.get(), fin);
|
||||
}
|
||||
|
||||
size_t getFileSize() const
|
||||
std::optional<size_t> tryGetFileSize() override
|
||||
{
|
||||
return file_size;
|
||||
}
|
||||
@ -191,9 +191,9 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
|
||||
|
||||
ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
|
||||
|
||||
size_t ReadBufferFromHDFS::getFileSize()
|
||||
std::optional<size_t> ReadBufferFromHDFS::tryGetFileSize()
|
||||
{
|
||||
return impl->getFileSize();
|
||||
return impl->tryGetFileSize();
|
||||
}
|
||||
|
||||
bool ReadBufferFromHDFS::nextImpl()
|
||||
|
@ -40,7 +40,7 @@ public:
|
||||
|
||||
off_t getPosition() override;
|
||||
|
||||
size_t getFileSize() override;
|
||||
std::optional<size_t> tryGetFileSize() override;
|
||||
|
||||
size_t getFileOffsetOfBufferEnd() const override;
|
||||
|
||||
|
@ -779,7 +779,7 @@ class SettingsRandomizer:
|
||||
"filesystem_prefetch_step_bytes": lambda: random.choice(
|
||||
[0, "100Mi"]
|
||||
), # 0 means 'auto'
|
||||
# "compile_expressions": lambda: random.randint(0, 1), - this setting has a bug: https://github.com/ClickHouse/ClickHouse/issues/51264
|
||||
"compile_expressions": lambda: random.randint(0, 1),
|
||||
"compile_aggregate_expressions": lambda: random.randint(0, 1),
|
||||
"compile_sort_description": lambda: random.randint(0, 1),
|
||||
"merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
|
||||
|
@ -32,10 +32,10 @@
|
||||
1
|
||||
1
|
||||
0
|
||||
SELECT count() AS `count()`
|
||||
SELECT count()
|
||||
FROM constraint_test_constants
|
||||
WHERE (b > 100) OR (c > 100)
|
||||
SELECT count() AS `count()`
|
||||
SELECT count()
|
||||
FROM constraint_test_constants
|
||||
WHERE c > 100
|
||||
QUERY id: 0
|
||||
@ -53,7 +53,7 @@ QUERY id: 0
|
||||
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
|
||||
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT count() AS `count()`
|
||||
SELECT count()
|
||||
FROM constraint_test_constants
|
||||
WHERE c > 100
|
||||
QUERY id: 0
|
||||
@ -71,7 +71,7 @@ QUERY id: 0
|
||||
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
|
||||
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT count() AS `count()`
|
||||
SELECT count()
|
||||
FROM constraint_test_constants
|
||||
QUERY id: 0
|
||||
PROJECTION COLUMNS
|
||||
|
@ -1,6 +1,6 @@
|
||||
SELECT
|
||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
||||
(b AS b) + 3 AS `plus(b, 3)`
|
||||
(b AS `cityHash64(a)`) + 10,
|
||||
(b AS b) + 3
|
||||
FROM column_swap_test_test
|
||||
WHERE b = 1
|
||||
QUERY id: 0
|
||||
@ -59,8 +59,8 @@ QUERY id: 0
|
||||
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT
|
||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
||||
(b AS b) + 3 AS `plus(b, 3)`
|
||||
(b AS `cityHash64(a)`) + 10,
|
||||
(b AS b) + 3
|
||||
FROM column_swap_test_test
|
||||
WHERE b = 0
|
||||
QUERY id: 0
|
||||
@ -89,8 +89,8 @@ QUERY id: 0
|
||||
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT
|
||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
||||
(b AS b) + 3 AS `plus(b, 3)`
|
||||
(b AS `cityHash64(a)`) + 10,
|
||||
(b AS b) + 3
|
||||
FROM column_swap_test_test
|
||||
WHERE b = 0
|
||||
QUERY id: 0
|
||||
@ -119,8 +119,8 @@ QUERY id: 0
|
||||
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT
|
||||
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
||||
(b AS b) + 3 AS `plus(b, 3)`
|
||||
(b AS `cityHash64(a)`) + 10,
|
||||
(b AS b) + 3
|
||||
FROM column_swap_test_test
|
||||
WHERE b = 1
|
||||
QUERY id: 0
|
||||
@ -148,7 +148,7 @@ QUERY id: 0
|
||||
COLUMN id: 13, column_name: b, result_type: UInt64, source_id: 5
|
||||
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT (b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`
|
||||
SELECT (b AS `cityHash64(a)`) + 10
|
||||
FROM column_swap_test_test
|
||||
WHERE b = 0
|
||||
QUERY id: 0
|
||||
@ -171,8 +171,8 @@ QUERY id: 0
|
||||
CONSTANT id: 10, constant_value: UInt64_0, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT
|
||||
(cityHash64(a) AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
|
||||
a AS a
|
||||
(cityHash64(a) AS `cityHash64(a)`) + 10,
|
||||
a
|
||||
FROM column_swap_test_test
|
||||
WHERE cityHash64(a) = 0
|
||||
QUERY id: 0
|
||||
@ -203,8 +203,8 @@ QUERY id: 0
|
||||
CONSTANT id: 15, constant_value: UInt64_0, constant_value_type: UInt8
|
||||
SETTINGS allow_experimental_analyzer=1
|
||||
SELECT
|
||||
(cityHash64(a) AS b) + 10 AS `plus(b, 10)`,
|
||||
a AS a
|
||||
(cityHash64(a) AS b) + 10,
|
||||
a
|
||||
FROM column_swap_test_test
|
||||
WHERE cityHash64(a) = 0
|
||||
QUERY id: 0
|
||||
|
@ -6,8 +6,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
|
||||
function test
|
||||
{
|
||||
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_binary_encode_types_in_binary_format=1 -q "select $1 as value format RowBinaryWithNamesAndTypes" | $CLICKHOUSE_LOCAL --input-format RowBinaryWithNamesAndTypes --input_format_binary_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
||||
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_native_encode_types_in_binary_format=1 -q "select $1 as value format Native" | $CLICKHOUSE_LOCAL --input-format Native --input_format_native_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
||||
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_binary_encode_types_in_binary_format=1 -q "select $1 as value format RowBinaryWithNamesAndTypes" | $CLICKHOUSE_LOCAL --input-format RowBinaryWithNamesAndTypes --input_format_binary_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
||||
$CLICKHOUSE_LOCAL --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --output_format_native_encode_types_in_binary_format=1 -q "select $1 as value format Native" | $CLICKHOUSE_LOCAL --input-format Native --input_format_native_decode_types_in_binary_format=1 -q "select value, toTypeName(value) from table"
|
||||
}
|
||||
|
||||
test "materialize(42)::UInt8"
|
||||
|
@ -17,8 +17,7 @@ function test()
|
||||
$CH_CLIENT -q "select v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id"
|
||||
$CH_CLIENT -q "select v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null, v.\`Array(Variant(String, UInt64))\`.String.null from test order by id"
|
||||
$CH_CLIENT -q "select id from test where v.UInt64 is null order by id"
|
||||
|
||||
$CH_CLIENT -q "insert into test select number, multiIf(number % 3 == 2, NULL, number % 3 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10))) from numbers(1000000) settings min_insert_block_size_rows=100000"
|
||||
$CH_CLIENT -q "insert into test select number, multiIf(number % 3 == 2, NULL, number % 3 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10))) from numbers(250000) settings min_insert_block_size_rows=100000, min_insert_block_size_bytes=0"
|
||||
$CH_CLIENT -q "select v, v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id format Null"
|
||||
$CH_CLIENT -q "select v.UInt64.null, v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null from test order by id format Null"
|
||||
$CH_CLIENT -q "select v.\`Array(Variant(String, UInt64))\`.null, v.\`Array(Variant(String, UInt64))\`.size0, v.\`Array(Variant(String, UInt64))\`.UInt64.null, v.\`Array(Variant(String, UInt64))\`.String.null from test order by id format Null"
|
||||
@ -41,4 +40,3 @@ echo "MergeTree wide"
|
||||
$CH_CLIENT -q "create table test (id UInt64, v Variant(UInt64, Array(Variant(String, UInt64)))) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
|
||||
test
|
||||
$CH_CLIENT -q "drop table test;"
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
Hello world
|
7
tests/queries/0_stateless/03206_no_exceptions_clickhouse_local.sh
Executable file
7
tests/queries/0_stateless/03206_no_exceptions_clickhouse_local.sh
Executable file
@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
CLICKHOUSE_TERMINATE_ON_ANY_EXCEPTION=1 ${CLICKHOUSE_LOCAL} --query "SELECT * FROM table" --input-format CSV <<<"Hello, world"
|
@ -0,0 +1,13 @@
|
||||
DROP TABLE IF EXISTS test_table;
|
||||
CREATE TABLE test_table
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
) ENGINE=TinyLog;
|
||||
|
||||
EXPLAIN SYNTAX
|
||||
WITH 1 AS compound_value SELECT * APPLY (x -> compound_value.*)
|
||||
FROM test_table WHERE x > 0
|
||||
SETTINGS convert_query_to_cnf = true, optimize_using_constraints = true, optimize_substitute_columns = true; -- { serverError UNKNOWN_IDENTIFIER }
|
||||
|
||||
DROP TABLE test_table;
|
Loading…
Reference in New Issue
Block a user