Merge branch 'master' into randomize-trace_profile_events

This commit is contained in:
Alexey Milovidov 2024-07-28 21:28:25 +02:00
commit 337cbe65dd
129 changed files with 1076 additions and 707 deletions

View File

@ -36,7 +36,7 @@ These actions are described in detail below.
ADD COLUMN [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | FIRST]
```
Adds a new column to the table with the specified `name`, `type`, [`codec`](../create/table.md/#codecs) and `default_expr` (see the section [Default expressions](/docs/en/sql-reference/statements/create/table.md/#create-default-values)).
Adds a new column to the table with the specified `name`, `type`, [`codec`](../create/table.md/#column_compression_codec) and `default_expr` (see the section [Default expressions](/docs/en/sql-reference/statements/create/table.md/#create-default-values)).
If the `IF NOT EXISTS` clause is included, the query wont return an error if the column already exists. If you specify `AFTER name_after` (the name of another column), the column is added after the specified one in the list of table columns. If you want to add a column to the beginning of the table use the `FIRST` clause. Otherwise, the column is added to the end of the table. For a chain of actions, `name_after` can be the name of a column that is added in one of the previous actions.
@ -155,7 +155,7 @@ This query changes the `name` column properties:
- Column-level Settings
For examples of columns compression CODECS modifying, see [Column Compression Codecs](../create/table.md/#codecs).
For examples of columns compression CODECS modifying, see [Column Compression Codecs](../create/table.md/#column_compression_codec).
For examples of columns TTL modifying, see [Column TTL](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#mergetree-column-ttl).

View File

@ -209,8 +209,8 @@ std::vector<String> Client::loadWarningMessages()
{} /* query_parameters */,
"" /* query_id */,
QueryProcessingStage::Complete,
&global_context->getSettingsRef(),
&global_context->getClientInfo(), false, {});
&client_context->getSettingsRef(),
&client_context->getClientInfo(), false, {});
while (true)
{
Packet packet = connection->receivePacket();
@ -306,9 +306,6 @@ void Client::initialize(Poco::Util::Application & self)
if (env_password && !config().has("password"))
config().setString("password", env_password);
// global_context->setApplicationType(Context::ApplicationType::CLIENT);
global_context->setQueryParameters(query_parameters);
/// settings and limits could be specified in config file, but passed settings has higher priority
for (const auto & setting : global_context->getSettingsRef().allUnchanged())
{
@ -382,7 +379,7 @@ try
showWarnings();
/// Set user password complexity rules
auto & access_control = global_context->getAccessControl();
auto & access_control = client_context->getAccessControl();
access_control.setPasswordComplexityRules(connection->getPasswordComplexityRules());
if (is_interactive && !delayed_interactive)
@ -459,7 +456,7 @@ void Client::connect()
<< connection_parameters.host << ":" << connection_parameters.port
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
connection = Connection::createConnection(connection_parameters, global_context);
connection = Connection::createConnection(connection_parameters, client_context);
if (max_client_network_bandwidth)
{
@ -528,7 +525,7 @@ void Client::connect()
}
}
if (!global_context->getSettingsRef().use_client_time_zone)
if (!client_context->getSettingsRef().use_client_time_zone)
{
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
if (!time_zone.empty())
@ -611,7 +608,7 @@ void Client::printChangedSettings() const
}
};
print_changes(global_context->getSettingsRef().changes(), "settings");
print_changes(client_context->getSettingsRef().changes(), "settings");
print_changes(cmd_merge_tree_settings.changes(), "MergeTree settings");
}
@ -709,7 +706,7 @@ bool Client::processWithFuzzing(const String & full_query)
{
const char * begin = full_query.data();
orig_ast = parseQuery(begin, begin + full_query.size(),
global_context->getSettingsRef(),
client_context->getSettingsRef(),
/*allow_multi_statements=*/ true);
}
catch (const Exception & e)
@ -733,7 +730,7 @@ bool Client::processWithFuzzing(const String & full_query)
}
// Kusto is not a subject for fuzzing (yet)
if (global_context->getSettingsRef().dialect == DB::Dialect::kusto)
if (client_context->getSettingsRef().dialect == DB::Dialect::kusto)
{
return true;
}
@ -1166,6 +1163,11 @@ void Client::processOptions(const OptionsDescription & options_description,
if (options.count("opentelemetry-tracestate"))
global_context->getClientTraceContext().tracestate = options["opentelemetry-tracestate"].as<std::string>();
/// In case of clickhouse-client the `client_context` can be just an alias for the `global_context`.
/// (There is no need to copy the context because clickhouse-client has no background tasks so it won't use that context in parallel.)
client_context = global_context;
initClientContext();
}
@ -1205,11 +1207,6 @@ void Client::processConfig()
pager = config().getString("pager", "");
setDefaultFormatsAndCompressionFromConfiguration();
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
global_context->setQueryKindInitial();
global_context->setQuotaClientKey(config().getString("quota_key", ""));
global_context->setQueryKind(query_kind);
}

View File

@ -16,7 +16,6 @@ public:
int main(const std::vector<String> & /*args*/) override;
protected:
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
bool processWithFuzzing(const String & full_query) override;

View File

@ -295,6 +295,8 @@ void LocalServer::cleanup()
if (suggest)
suggest.reset();
client_context.reset();
if (global_context)
{
global_context->shutdown();
@ -436,7 +438,7 @@ void LocalServer::connect()
in = input.get();
}
connection = LocalConnection::createConnection(
connection_parameters, global_context, in, need_render_progress, need_render_profile_events, server_display_name);
connection_parameters, client_context, in, need_render_progress, need_render_profile_events, server_display_name);
}
@ -497,8 +499,6 @@ try
initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")));
ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);
applyCmdSettings(global_context);
/// try to load user defined executable functions, throw on error and die
try
{
@ -510,6 +510,11 @@ try
throw;
}
/// Must be called after we stopped initializing the global context and changing its settings.
/// After this point the global context must be stayed almost unchanged till shutdown,
/// and all necessary changes must be made to the client context instead.
createClientContext();
if (is_interactive)
{
clearTerminal();
@ -735,6 +740,9 @@ void LocalServer::processConfig()
/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(getClientConfiguration());
/// Command-line parameters can override settings from the default profile.
applyCmdSettings(global_context);
/// We load temporary database first, because projections need it.
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
@ -778,10 +786,6 @@ void LocalServer::processConfig()
server_display_name = getClientConfiguration().getString("display_name", "");
prompt_by_server_display_name = getClientConfiguration().getRawString("prompt_by_server_display_name.default", ":) ");
global_context->setQueryKindInitial();
global_context->setQueryKind(query_kind);
global_context->setQueryParameters(query_parameters);
}
@ -860,6 +864,16 @@ void LocalServer::applyCmdOptions(ContextMutablePtr context)
}
void LocalServer::createClientContext()
{
/// In case of clickhouse-local it's necessary to use a separate context for client-related purposes.
/// We can't just change the global context because it is used in background tasks (for example, in merges)
/// which don't expect that the global context can suddenly change.
client_context = Context::createCopy(global_context);
initClientContext();
}
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &, const std::vector<Arguments> &)
{
if (options.count("table"))

View File

@ -31,7 +31,6 @@ public:
int main(const std::vector<String> & /*args*/) override;
protected:
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
void connect() override;
@ -50,7 +49,6 @@ protected:
void processConfig() override;
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &) override;
void updateLoggerLevel(const String & logs_level) override;
private:
@ -67,6 +65,8 @@ private:
void applyCmdOptions(ContextMutablePtr context);
void applyCmdSettings(ContextMutablePtr context);
void createClientContext();
ServerSettings server_settings;
std::optional<StatusFile> status;

View File

@ -0,0 +1,13 @@
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/tmp/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
</backups>
</clickhouse>

View File

@ -0,0 +1 @@
../../../tests/config/config.d/enable_keeper_map.xml

View File

@ -0,0 +1 @@
../../../tests/config/config.d/session_log.xml

View File

@ -268,6 +268,8 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
}
}
const auto enable_order_by_all = updated_context->getSettingsRef().enable_order_by_all;
auto current_query_tree = std::make_shared<QueryNode>(std::move(updated_context), std::move(settings_changes));
current_query_tree->setIsSubquery(is_subquery);
@ -281,7 +283,10 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
current_query_tree->setIsGroupByWithRollup(select_query_typed.group_by_with_rollup);
current_query_tree->setIsGroupByWithGroupingSets(select_query_typed.group_by_with_grouping_sets);
current_query_tree->setIsGroupByAll(select_query_typed.group_by_all);
current_query_tree->setIsOrderByAll(select_query_typed.order_by_all);
/// order_by_all flag in AST is set w/o consideration of `enable_order_by_all` setting
/// since SETTINGS section has not been parsed yet, - so, check the setting here
if (enable_order_by_all)
current_query_tree->setIsOrderByAll(select_query_typed.order_by_all);
current_query_tree->setOriginalAST(select_query);
auto current_context = current_query_tree->getContext();

View File

@ -1740,7 +1740,7 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveQualifiedMatcher(Qu
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
if (!tuple_data_type)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Qualified matcher {} find non compound expression {} with type {}. Expected tuple or array of tuples. In scope {}",
"Qualified matcher {} found a non-compound expression {} with type {}. Expected a tuple or an array of tuples. In scope {}",
matcher_node->formatASTForErrorMessage(),
expression_query_tree_node->formatASTForErrorMessage(),
expression_query_tree_node->getResultType()->getName(),

View File

@ -477,7 +477,7 @@ void ClientBase::sendExternalTables(ASTPtr parsed_query)
std::vector<ExternalTableDataPtr> data;
for (auto & table : external_tables)
data.emplace_back(table.getData(global_context));
data.emplace_back(table.getData(client_context));
connection->sendExternalTablesData(data);
}
@ -690,10 +690,10 @@ try
/// intermixed with data with parallel formatting.
/// It may increase code complexity significantly.
if (!extras_into_stdout || select_only_into_file)
output_format = global_context->getOutputFormatParallelIfPossible(
output_format = client_context->getOutputFormatParallelIfPossible(
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
else
output_format = global_context->getOutputFormat(
output_format = client_context->getOutputFormat(
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
output_format->setAutoFlush();
@ -772,6 +772,15 @@ void ClientBase::adjustSettings()
global_context->setSettings(settings);
}
void ClientBase::initClientContext()
{
client_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
client_context->setQuotaClientKey(getClientConfiguration().getString("quota_key", ""));
client_context->setQueryKindInitial();
client_context->setQueryKind(query_kind);
client_context->setQueryParameters(query_parameters);
}
bool ClientBase::isRegularFile(int fd)
{
struct stat file_stat;
@ -962,7 +971,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
/// client-side. Thus we need to parse the query.
const char * begin = full_query.data();
auto parsed_query = parseQuery(begin, begin + full_query.size(),
global_context->getSettingsRef(),
client_context->getSettingsRef(),
/*allow_multi_statements=*/ false);
if (!parsed_query)
@ -985,7 +994,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
/// But for asynchronous inserts we don't extract data, because it's needed
/// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && isSyncInsertWithData(*insert, global_context))
if (insert && isSyncInsertWithData(*insert, client_context))
query_to_execute = full_query.substr(0, insert->data - full_query.data());
else
query_to_execute = full_query;
@ -1103,7 +1112,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
}
}
const auto & settings = global_context->getSettingsRef();
const auto & settings = client_context->getSettingsRef();
const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;
int retries_left = 10;
@ -1118,10 +1127,10 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
connection_parameters.timeouts,
query,
query_parameters,
global_context->getCurrentQueryId(),
client_context->getCurrentQueryId(),
query_processing_stage,
&global_context->getSettingsRef(),
&global_context->getClientInfo(),
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,
[&](const Progress & progress) { onProgress(progress); });
@ -1308,7 +1317,7 @@ void ClientBase::onProgress(const Progress & value)
void ClientBase::onTimezoneUpdate(const String & tz)
{
global_context->setSetting("session_timezone", tz);
client_context->setSetting("session_timezone", tz);
}
@ -1504,13 +1513,13 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de
void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
{
if (!global_context->hasInsertionTable() && insert_query.table)
if (!client_context->hasInsertionTable() && insert_query.table)
{
String table = insert_query.table->as<ASTIdentifier &>().shortName();
if (!table.empty())
{
String database = insert_query.database ? insert_query.database->as<ASTIdentifier &>().shortName() : "";
global_context->setInsertionTable(StorageID(database, table));
client_context->setInsertionTable(StorageID(database, table));
}
}
}
@ -1561,7 +1570,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
if ((!parsed_insert_query.data && !parsed_insert_query.infile) && (is_interactive || (!stdin_is_a_tty && !isStdinNotEmptyAndValid(std_in))))
{
const auto & settings = global_context->getSettingsRef();
const auto & settings = client_context->getSettingsRef();
if (settings.throw_if_no_data_to_insert)
throw Exception(ErrorCodes::NO_DATA_TO_INSERT, "No data to insert");
else
@ -1575,10 +1584,10 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
connection_parameters.timeouts,
query,
query_parameters,
global_context->getCurrentQueryId(),
client_context->getCurrentQueryId(),
query_processing_stage,
&global_context->getSettingsRef(),
&global_context->getClientInfo(),
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,
[&](const Progress & progress) { onProgress(progress); });
@ -1626,7 +1635,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
/// Set callback to be called on file progress.
if (tty_buf)
progress_indication.setFileProgressCallback(global_context, *tty_buf);
progress_indication.setFileProgressCallback(client_context, *tty_buf);
}
/// If data fetched from file (maybe compressed file)
@ -1660,10 +1669,10 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
}
StorageFile::CommonArguments args{
WithContext(global_context),
WithContext(client_context),
parsed_insert_query->table_id,
current_format,
getFormatSettings(global_context),
getFormatSettings(client_context),
compression_method,
columns_for_storage_file,
ConstraintsDescription{},
@ -1671,7 +1680,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
{},
String{},
};
StoragePtr storage = std::make_shared<StorageFile>(in_file, global_context->getUserFilesPath(), args);
StoragePtr storage = std::make_shared<StorageFile>(in_file, client_context->getUserFilesPath(), args);
storage->startup();
SelectQueryInfo query_info;
@ -1682,16 +1691,16 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
storage->read(
plan,
sample.getNames(),
storage->getStorageSnapshot(metadata, global_context),
storage->getStorageSnapshot(metadata, client_context),
query_info,
global_context,
client_context,
{},
global_context->getSettingsRef().max_block_size,
client_context->getSettingsRef().max_block_size,
getNumberOfPhysicalCPUCores());
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(global_context),
BuildQueryPipelineSettings::fromContext(global_context));
QueryPlanOptimizationSettings::fromContext(client_context),
BuildQueryPipelineSettings::fromContext(client_context));
QueryPlanResourceHolder resources;
auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
@ -1752,14 +1761,14 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
current_format = insert->format;
}
auto source = global_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
auto source = client_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
Pipe pipe(source);
if (columns_description.hasDefaults())
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, global_context);
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, client_context);
});
}
@ -1921,12 +1930,12 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
if (is_interactive)
{
global_context->setCurrentQueryId("");
client_context->setCurrentQueryId("");
// Generate a new query_id
for (const auto & query_id_format : query_id_formats)
{
writeString(query_id_format.first, std_out);
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", global_context->getCurrentQueryId())), std_out);
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", client_context->getCurrentQueryId())), std_out);
writeChar('\n', std_out);
std_out.next();
}
@ -1953,7 +1962,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
auto password = auth_data->getPassword();
if (password)
global_context->getAccessControl().checkPasswordComplexityRules(*password);
client_context->getAccessControl().checkPasswordComplexityRules(*password);
}
}
}
@ -1968,15 +1977,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
std::optional<Settings> old_settings;
SCOPE_EXIT_SAFE({
if (old_settings)
global_context->setSettings(*old_settings);
client_context->setSettings(*old_settings);
});
auto apply_query_settings = [&](const IAST & settings_ast)
{
if (!old_settings)
old_settings.emplace(global_context->getSettingsRef());
global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
global_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
old_settings.emplace(client_context->getSettingsRef());
client_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
client_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
};
const auto * insert = parsed_query->as<ASTInsertQuery>();
@ -2009,7 +2018,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
if (insert && insert->select)
insert->tryFindInputFunction(input_function);
bool is_async_insert_with_inlined_data = global_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
bool is_async_insert_with_inlined_data = client_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
if (is_async_insert_with_inlined_data)
{
@ -2044,9 +2053,9 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
if (change.name == "profile")
current_profile = change.value.safeGet<String>();
else
global_context->applySettingChange(change);
client_context->applySettingChange(change);
}
global_context->resetSettingsToDefaultValue(set_query->default_settings);
client_context->resetSettingsToDefaultValue(set_query->default_settings);
/// Query parameters inside SET queries should be also saved on the client side
/// to override their previous definitions set with --param_* arguments
@ -2054,7 +2063,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
for (const auto & [name, value] : set_query->query_parameters)
query_parameters.insert_or_assign(name, value);
global_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
client_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
}
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
{
@ -2131,8 +2140,8 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
if (this_query_begin >= all_queries_end)
return MultiQueryProcessingStage::QUERIES_END;
unsigned max_parser_depth = static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth);
unsigned max_parser_backtracks = static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks);
unsigned max_parser_depth = static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth);
unsigned max_parser_backtracks = static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks);
// If there are only comments left until the end of file, we just
// stop. The parser can't handle this situation because it always
@ -2152,7 +2161,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
try
{
parsed_query = parseQuery(this_query_end, all_queries_end,
global_context->getSettingsRef(),
client_context->getSettingsRef(),
/*allow_multi_statements=*/ true);
}
catch (const Exception & e)
@ -2195,7 +2204,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
{
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
insert_ast->end = this_query_end;
query_to_execute_end = isSyncInsertWithData(*insert_ast, global_context) ? insert_ast->data : this_query_end;
query_to_execute_end = isSyncInsertWithData(*insert_ast, client_context) ? insert_ast->data : this_query_end;
}
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), query_to_execute_end - this_query_begin);
@ -2404,13 +2413,13 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
// , where the inline data is delimited by semicolon and not by a
// newline.
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
if (insert_ast && isSyncInsertWithData(*insert_ast, global_context))
if (insert_ast && isSyncInsertWithData(*insert_ast, client_context))
{
this_query_end = insert_ast->end;
adjustQueryEnd(
this_query_end, all_queries_end,
static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth),
static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks));
static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth),
static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks));
}
// Report error.
@ -2541,10 +2550,10 @@ void ClientBase::runInteractive()
if (load_suggestions)
{
/// Load suggestion data from the server.
if (global_context->getApplicationType() == Context::ApplicationType::CLIENT)
suggest->load<Connection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
else if (global_context->getApplicationType() == Context::ApplicationType::LOCAL)
suggest->load<LocalConnection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
if (client_context->getApplicationType() == Context::ApplicationType::CLIENT)
suggest->load<Connection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
else if (client_context->getApplicationType() == Context::ApplicationType::LOCAL)
suggest->load<LocalConnection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
}
if (home_path.empty())
@ -2682,7 +2691,7 @@ void ClientBase::runInteractive()
{
// If a separate connection loading suggestions failed to open a new session,
// use the main session to receive them.
suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), global_context->getClientInfo());
suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), client_context->getClientInfo());
}
try
@ -2731,10 +2740,10 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)
if (!getClientConfiguration().has("log_comment"))
{
Settings settings = global_context->getSettings();
Settings settings = client_context->getSettings();
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
settings.log_comment = fs::absolute(fs::path(file_name));
global_context->setSettings(settings);
client_context->setSettings(settings);
}
return executeMultiQuery(queries_from_file);

View File

@ -206,6 +206,9 @@ protected:
/// Adjust some settings after command line options and config had been processed.
void adjustSettings();
/// Initializes the client context.
void initClientContext();
void setDefaultFormatsAndCompressionFromConfiguration();
void initTTYBuffer(ProgressOption progress);
@ -215,6 +218,9 @@ protected:
SharedContextHolder shared_context;
ContextMutablePtr global_context;
/// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server.
ContextMutablePtr client_context;
LoggerPtr fatal_log;
Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;

View File

@ -8,6 +8,7 @@
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/Logger.h>
#include <Common/MemorySanitizer.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/config_version.h>
@ -100,7 +101,7 @@ Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
{
if (terminate_on_any_exception)
std::_Exit(terminate_status_code);
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
handle_error_code(msg_masked.msg, code, remote, getStackFramePointers());
}
@ -110,7 +111,7 @@ Exception::Exception(MessageMasked && msg_masked, int code, bool remote_)
{
if (terminate_on_any_exception)
std::_Exit(terminate_status_code);
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
handle_error_code(message(), code, remote, getStackFramePointers());
}
@ -119,7 +120,7 @@ Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
{
if (terminate_on_any_exception)
std::_Exit(terminate_status_code);
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
#ifdef STD_EXCEPTION_HAS_STACK_TRACE
auto * stack_trace_frames = exc.get_stack_trace_frames();
auto stack_trace_size = exc.get_stack_trace_size();
@ -133,7 +134,7 @@ Exception::Exception(CreateFromSTDTag, const std::exception & exc)
{
if (terminate_on_any_exception)
std::_Exit(terminate_status_code);
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
#ifdef STD_EXCEPTION_HAS_STACK_TRACE
auto * stack_trace_frames = exc.get_stack_trace_frames();
auto stack_trace_size = exc.get_stack_trace_size();
@ -223,10 +224,38 @@ Exception::FramePointers Exception::getStackFramePointers() const
}
thread_local bool Exception::enable_job_stack_trace = false;
thread_local std::vector<StackTrace::FramePointers> Exception::thread_frame_pointers = {};
thread_local bool Exception::can_use_thread_frame_pointers = false;
thread_local Exception::ThreadFramePointers Exception::thread_frame_pointers;
Exception::ThreadFramePointers::ThreadFramePointers()
{
can_use_thread_frame_pointers = true;
}
Exception::ThreadFramePointers::~ThreadFramePointers()
{
can_use_thread_frame_pointers = false;
}
Exception::ThreadFramePointersBase Exception::getThreadFramePointers()
{
if (can_use_thread_frame_pointers)
return thread_frame_pointers.frame_pointers;
return {};
}
void Exception::setThreadFramePointers(ThreadFramePointersBase frame_pointers)
{
if (can_use_thread_frame_pointers)
thread_frame_pointers.frame_pointers = std::move(frame_pointers);
}
static void tryLogCurrentExceptionImpl(Poco::Logger * logger, const std::string & start_of_message)
{
if (!isLoggingEnabled())
return;
try
{
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
@ -242,6 +271,9 @@ static void tryLogCurrentExceptionImpl(Poco::Logger * logger, const std::string
void tryLogCurrentException(const char * log_name, const std::string & start_of_message)
{
if (!isLoggingEnabled())
return;
/// Under high memory pressure, new allocations throw a
/// MEMORY_LIMIT_EXCEEDED exception.
///

View File

@ -10,7 +10,6 @@
#include <cerrno>
#include <exception>
#include <memory>
#include <vector>
#include <fmt/core.h>
@ -49,14 +48,14 @@ public:
{
if (terminate_on_any_exception)
std::terminate();
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
}
Exception(const PreformattedMessage & msg, int code): Exception(msg.text, code)
{
if (terminate_on_any_exception)
std::terminate();
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
message_format_string = msg.format_string;
message_format_string_args = msg.format_string_args;
}
@ -65,18 +64,36 @@ public:
{
if (terminate_on_any_exception)
std::terminate();
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
message_format_string = msg.format_string;
message_format_string_args = msg.format_string_args;
}
/// Collect call stacks of all previous jobs' schedulings leading to this thread job's execution
static thread_local bool enable_job_stack_trace;
static thread_local std::vector<StackTrace::FramePointers> thread_frame_pointers;
static thread_local bool can_use_thread_frame_pointers;
/// Because of unknown order of static destructor calls,
/// thread_frame_pointers can already be uninitialized when a different destructor generates an exception.
/// To prevent such scenarios, a wrapper class is created and a function that will return empty vector
/// if its destructor is already called
using ThreadFramePointersBase = std::vector<StackTrace::FramePointers>;
struct ThreadFramePointers
{
ThreadFramePointers();
~ThreadFramePointers();
ThreadFramePointersBase frame_pointers;
};
static ThreadFramePointersBase getThreadFramePointers();
static void setThreadFramePointers(ThreadFramePointersBase frame_pointers);
/// Callback for any exception
static std::function<void(const std::string & msg, int code, bool remote, const Exception::FramePointers & trace)> callback;
protected:
static thread_local ThreadFramePointers thread_frame_pointers;
// used to remove the sensitive information from exceptions if query_masking_rules is configured
struct MessageMasked
{
@ -178,7 +195,7 @@ class ErrnoException : public Exception
public:
ErrnoException(std::string && msg, int code, int with_errno) : Exception(msg, code), saved_errno(with_errno)
{
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
addMessage(", {}", errnoToString(saved_errno));
}
@ -187,7 +204,7 @@ public:
requires std::is_convertible_v<T, String>
ErrnoException(int code, T && message) : Exception(message, code), saved_errno(errno)
{
capture_thread_frame_pointers = thread_frame_pointers;
capture_thread_frame_pointers = getThreadFramePointers();
addMessage(", {}", errnoToString(saved_errno));
}

View File

@ -25,3 +25,15 @@ bool hasLogger(const std::string & name)
{
return Poco::Logger::has(name);
}
static constinit std::atomic<bool> allow_logging{true};
bool isLoggingEnabled()
{
return allow_logging;
}
void disableLogging()
{
allow_logging = false;
}

View File

@ -64,3 +64,7 @@ LoggerRawPtr createRawLogger(const std::string & name, Poco::Channel * channel,
* Otherwise, returns false.
*/
bool hasLogger(const std::string & name);
void disableLogging();
bool isLoggingEnabled();

View File

@ -89,7 +89,7 @@ void signalHandler(int sig, siginfo_t * info, void * context)
writePODBinary(*info, out);
writePODBinary(signal_context, out);
writePODBinary(stack_trace, out);
writeVectorBinary(Exception::enable_job_stack_trace ? Exception::thread_frame_pointers : std::vector<StackTrace::FramePointers>{}, out);
writeVectorBinary(Exception::enable_job_stack_trace ? Exception::getThreadFramePointers() : std::vector<StackTrace::FramePointers>{}, out);
writeBinary(static_cast<UInt32>(getThreadId()), out);
writePODBinary(current_thread, out);

View File

@ -489,24 +489,25 @@ struct CacheEntry
using CacheEntryPtr = std::shared_ptr<CacheEntry>;
static constinit std::atomic<bool> can_use_cache = false;
static constinit bool can_use_cache = false;
using StackTraceCacheBase = std::map<StackTraceTriple, CacheEntryPtr, std::less<>>;
struct StackTraceCache : public StackTraceCacheBase
{
StackTraceCache()
: StackTraceCacheBase()
{
can_use_cache = true;
}
~StackTraceCache()
{
can_use_cache = false;
}
};
static StackTraceCache & cacheInstance()
{
static StackTraceCache cache;
can_use_cache = true;
return cache;
}
static StackTraceCache cache;
static DB::SharedMutex stacktrace_cache_mutex;
@ -524,7 +525,6 @@ String toStringCached(const StackTrace::FramePointers & pointers, size_t offset,
/// Calculation of stack trace text is extremely slow.
/// We use cache because otherwise the server could be overloaded by trash queries.
/// Note that this cache can grow unconditionally, but practically it should be small.
StackTraceCache & cache = cacheInstance();
CacheEntryPtr cache_entry;
// Optimistic try for cache hit to avoid any contention whatsoever, should be the main hot code route
@ -576,7 +576,7 @@ std::string StackTrace::toString(void * const * frame_pointers_raw, size_t offse
void StackTrace::dropCache()
{
std::lock_guard lock{stacktrace_cache_mutex};
cacheInstance().clear();
cache.clear();
}

View File

@ -51,7 +51,7 @@ public:
if (!capture_frame_pointers)
return;
/// Save all previous jobs call stacks and append with current
frame_pointers = DB::Exception::thread_frame_pointers;
frame_pointers = DB::Exception::getThreadFramePointers();
frame_pointers.push_back(StackTrace().getFramePointers());
}
@ -455,7 +455,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
try
{
if (DB::Exception::enable_job_stack_trace)
DB::Exception::thread_frame_pointers = std::move(job_data->frame_pointers);
DB::Exception::setThreadFramePointers(std::move(job_data->frame_pointers));
CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads);

View File

@ -1,11 +1,12 @@
#if defined(OS_LINUX)
#include <Common/TimerDescriptor.h>
#include <Common/Exception.h>
#include <sys/timerfd.h>
#include <fcntl.h>
#include <unistd.h>
namespace DB
{
@ -13,21 +14,18 @@ namespace ErrorCodes
{
extern const int CANNOT_CREATE_TIMER;
extern const int CANNOT_SET_TIMER_PERIOD;
extern const int CANNOT_FCNTL;
extern const int CANNOT_READ_FROM_SOCKET;
}
TimerDescriptor::TimerDescriptor(int clockid, int flags)
TimerDescriptor::TimerDescriptor()
{
timer_fd = timerfd_create(clockid, flags);
timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (timer_fd == -1)
throw Exception(ErrorCodes::CANNOT_CREATE_TIMER, "Cannot create timer_fd descriptor");
if (-1 == fcntl(timer_fd, F_SETFL, O_NONBLOCK))
throw ErrnoException(ErrorCodes::CANNOT_FCNTL, "Cannot set O_NONBLOCK for timer_fd");
throw ErrnoException(ErrorCodes::CANNOT_CREATE_TIMER, "Cannot create timer_fd descriptor");
}
TimerDescriptor::TimerDescriptor(TimerDescriptor && other) noexcept : timer_fd(other.timer_fd)
TimerDescriptor::TimerDescriptor(TimerDescriptor && other) noexcept
: timer_fd(other.timer_fd)
{
other.timer_fd = -1;
}
@ -40,21 +38,19 @@ TimerDescriptor & TimerDescriptor::operator=(DB::TimerDescriptor && other) noexc
TimerDescriptor::~TimerDescriptor()
{
/// Do not check for result cause cannot throw exception.
if (timer_fd != -1)
{
int err = close(timer_fd);
chassert(!err || errno == EINTR);
if (0 != ::close(timer_fd))
std::terminate();
}
}
void TimerDescriptor::reset() const
{
itimerspec spec;
spec.it_interval.tv_nsec = 0;
spec.it_interval.tv_sec = 0;
spec.it_value.tv_sec = 0;
spec.it_value.tv_nsec = 0;
if (timer_fd == -1)
return;
itimerspec spec{};
if (-1 == timerfd_settime(timer_fd, 0 /*relative timer */, &spec, nullptr))
throw ErrnoException(ErrorCodes::CANNOT_SET_TIMER_PERIOD, "Cannot reset timer_fd");
@ -66,25 +62,46 @@ void TimerDescriptor::reset() const
void TimerDescriptor::drain() const
{
if (timer_fd == -1)
return;
/// It is expected that socket returns 8 bytes when readable.
/// Read in loop anyway cause signal may interrupt read call.
/// man timerfd_create:
/// If the timer has already expired one or more times since its settings were last modified using timerfd_settime(),
/// or since the last successful read(2), then the buffer given to read(2) returns an unsigned 8-byte integer (uint64_t)
/// containing the number of expirations that have occurred.
/// (The returned value is in host byte order—that is, the native byte order for integers on the host machine.)
uint64_t buf;
while (true)
{
ssize_t res = ::read(timer_fd, &buf, sizeof(buf));
if (res < 0)
{
/// man timerfd_create:
/// If no timer expirations have occurred at the time of the read(2),
/// then the call either blocks until the next timer expiration, or fails with the error EAGAIN
/// if the file descriptor has been made nonblocking
/// (via the use of the fcntl(2) F_SETFL operation to set the O_NONBLOCK flag).
if (errno == EAGAIN)
break;
if (errno != EINTR)
throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot drain timer_fd");
/// A signal happened, need to retry.
if (errno == EINTR)
continue;
throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot drain timer_fd");
}
chassert(res == sizeof(buf));
}
}
void TimerDescriptor::setRelative(uint64_t usec) const
{
chassert(timer_fd >= 0);
static constexpr uint32_t TIMER_PRECISION = 1e6;
itimerspec spec;
@ -103,4 +120,5 @@ void TimerDescriptor::setRelative(Poco::Timespan timespan) const
}
}
#endif

View File

@ -12,7 +12,7 @@ private:
int timer_fd;
public:
explicit TimerDescriptor(int clockid = CLOCK_MONOTONIC, int flags = 0);
TimerDescriptor();
~TimerDescriptor();
TimerDescriptor(const TimerDescriptor &) = delete;

View File

@ -548,7 +548,7 @@ public:
virtual bool isExpired() const = 0;
/// Get the current connected node idx.
virtual Int8 getConnectedNodeIdx() const = 0;
virtual std::optional<int8_t> getConnectedNodeIdx() const = 0;
/// Get the current connected host and port.
virtual String getConnectedHostPort() const = 0;

View File

@ -39,7 +39,7 @@ public:
~TestKeeper() override;
bool isExpired() const override { return expired; }
Int8 getConnectedNodeIdx() const override { return 0; }
std::optional<int8_t> getConnectedNodeIdx() const override { return 0; }
String getConnectedHostPort() const override { return "TestKeeper:0000"; }
int32_t getConnectionXid() const override { return 0; }
int64_t getSessionID() const override { return 0; }

View File

@ -128,16 +128,15 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
ShuffleHosts shuffled_hosts = shuffleHosts();
impl = std::make_unique<Coordination::ZooKeeper>(shuffled_hosts, args, zk_log);
Int8 node_idx = impl->getConnectedNodeIdx();
auto node_idx = impl->getConnectedNodeIdx();
if (args.chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
/// If the balancing strategy has an optimal node then it will be the first in the list
bool connected_to_suboptimal_node = node_idx != shuffled_hosts[0].original_index;
bool connected_to_suboptimal_node = node_idx && static_cast<UInt8>(*node_idx) != shuffled_hosts[0].original_index;
bool respect_az = args.prefer_local_availability_zone && !args.client_availability_zone.empty();
bool may_benefit_from_reconnecting = respect_az || args.get_priority_load_balancing.hasOptimalNode();
if (connected_to_suboptimal_node && may_benefit_from_reconnecting)
@ -145,7 +144,7 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
auto reconnect_timeout_sec = getSecondsUntilReconnect(args);
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
impl->getConnectedHostPort(), node_idx, reconnect_timeout_sec);
impl->getConnectedHostPort(), *node_idx, reconnect_timeout_sec);
auto reconnect_task_holder = DB::Context::getGlobalContextInstance()->getSchedulePool().createTask("ZKReconnect", [this, optimal_host = shuffled_hosts[0]]()
{
@ -154,13 +153,15 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
LOG_DEBUG(log, "Trying to connect to a more optimal node {}", optimal_host.host);
ShuffleHosts node{optimal_host};
std::unique_ptr<Coordination::IKeeper> new_impl = std::make_unique<Coordination::ZooKeeper>(node, args, zk_log);
Int8 new_node_idx = new_impl->getConnectedNodeIdx();
auto new_node_idx = new_impl->getConnectedNodeIdx();
chassert(new_node_idx.has_value());
/// Maybe the node was unavailable when getting AZs first time, update just in case
if (args.availability_zone_autodetect && availability_zones[new_node_idx].empty())
if (args.availability_zone_autodetect && availability_zones[*new_node_idx].empty())
{
availability_zones[new_node_idx] = new_impl->tryGetAvailabilityZone();
LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[new_node_idx]);
availability_zones[*new_node_idx] = new_impl->tryGetAvailabilityZone();
LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[*new_node_idx]);
}
optimal_impl = std::move(new_impl);
@ -1525,7 +1526,7 @@ void ZooKeeper::setServerCompletelyStarted()
zk->setServerCompletelyStarted();
}
Int8 ZooKeeper::getConnectedHostIdx() const
std::optional<int8_t> ZooKeeper::getConnectedHostIdx() const
{
return impl->getConnectedNodeIdx();
}
@ -1544,10 +1545,10 @@ String ZooKeeper::getConnectedHostAvailabilityZone() const
{
if (args.implementation != "zookeeper" || !impl)
return "";
Int8 idx = impl->getConnectedNodeIdx();
if (idx < 0)
std::optional<int8_t> idx = impl->getConnectedNodeIdx();
if (!idx)
return ""; /// session expired
return availability_zones.at(idx);
return availability_zones.at(*idx);
}
size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)

View File

@ -620,7 +620,7 @@ public:
void setServerCompletelyStarted();
Int8 getConnectedHostIdx() const;
std::optional<int8_t> getConnectedHostIdx() const;
String getConnectedHostPort() const;
int32_t getConnectionXid() const;

View File

@ -536,7 +536,7 @@ void ZooKeeper::connect(
compressed_out.emplace(*out, CompressionCodecFactory::instance().get("LZ4", {}));
}
original_index = static_cast<Int8>(node.original_index);
original_index.store(node.original_index);
break;
}
catch (...)
@ -1531,6 +1531,30 @@ void ZooKeeper::close()
}
std::optional<int8_t> ZooKeeper::getConnectedNodeIdx() const
{
int8_t res = original_index.load();
if (res == -1)
return std::nullopt;
else
return res;
}
String ZooKeeper::getConnectedHostPort() const
{
auto idx = getConnectedNodeIdx();
if (idx)
return args.hosts[*idx];
else
return "";
}
int32_t ZooKeeper::getConnectionXid() const
{
return next_xid.load();
}
void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
/// logOperationIfNeeded(...) uses zk_log and can be called from different threads, so we have to use atomic shared_ptr

View File

@ -114,13 +114,12 @@ public:
~ZooKeeper() override;
/// If expired, you can only destroy the object. All other methods will throw exception.
bool isExpired() const override { return requests_queue.isFinished(); }
Int8 getConnectedNodeIdx() const override { return original_index; }
String getConnectedHostPort() const override { return (original_index == -1) ? "" : args.hosts[original_index]; }
int32_t getConnectionXid() const override { return next_xid.load(); }
std::optional<int8_t> getConnectedNodeIdx() const override;
String getConnectedHostPort() const override;
int32_t getConnectionXid() const override;
String tryGetAvailabilityZone() override;
@ -219,7 +218,7 @@ private:
ACLs default_acls;
zkutil::ZooKeeperArgs args;
Int8 original_index = -1;
std::atomic<int8_t> original_index{-1};
/// Fault injection
void maybeInjectSendFault();

View File

@ -1,2 +1,2 @@
clickhouse_add_executable (mysqlxx_pool_test mysqlxx_pool_test.cpp)
target_link_libraries (mysqlxx_pool_test PRIVATE mysqlxx clickhouse_common_config)
target_link_libraries (mysqlxx_pool_test PRIVATE mysqlxx clickhouse_common_config loggers_no_text_log)

View File

@ -158,7 +158,7 @@ BaseDaemon::~BaseDaemon()
tryLogCurrentException(&logger());
}
OwnSplitChannel::disableLogging();
disableLogging();
}

View File

@ -647,12 +647,13 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
{
std::lock_guard lock{ddl_worker_mutex};
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
ddl_worker->startup();
ddl_worker_initialized = true;
}
ddl_worker->startup();
ddl_worker_initialized = true;
});
std::scoped_lock lock(mutex);
return startup_replicated_database_task = makeLoadTask(async_loader, {job});
startup_replicated_database_task = makeLoadTask(async_loader, {job});
return startup_replicated_database_task;
}
void DatabaseReplicated::waitDatabaseStarted() const
@ -1530,8 +1531,11 @@ void DatabaseReplicated::stopReplication()
void DatabaseReplicated::shutdown()
{
stopReplication();
ddl_worker_initialized = false;
ddl_worker = nullptr;
{
std::lock_guard lock{ddl_worker_mutex};
ddl_worker_initialized = false;
ddl_worker = nullptr;
}
DatabaseAtomic::shutdown();
}
@ -1679,6 +1683,7 @@ bool DatabaseReplicated::canExecuteReplicatedMetadataAlter() const
/// It may update the metadata digest (both locally and in ZooKeeper)
/// before DatabaseReplicatedDDLWorker::initializeReplication() has finished.
/// We should not update metadata until the database is initialized.
std::lock_guard lock{ddl_worker_mutex};
return ddl_worker_initialized && ddl_worker->isCurrentlyActive();
}

View File

@ -155,7 +155,7 @@ private:
std::atomic_bool is_recovering = false;
std::atomic_bool ddl_worker_initialized = false;
std::unique_ptr<DatabaseReplicatedDDLWorker> ddl_worker;
std::mutex ddl_worker_mutex;
mutable std::mutex ddl_worker_mutex;
UInt32 max_log_ptr_at_creation = 0;
/// Usually operation with metadata are single-threaded because of the way replication works,

View File

@ -34,7 +34,7 @@ public:
String getFileName() const override { return impl->getFileName(); }
size_t getFileSize() override { return impl->getFileSize(); }
std::optional<size_t> tryGetFileSize() override { return impl->tryGetFileSize(); }
String getInfoForLog() override { return impl->getInfoForLog(); }

View File

@ -253,16 +253,15 @@ void ReadBufferFromAzureBlobStorage::initialize()
initialized = true;
}
size_t ReadBufferFromAzureBlobStorage::getFileSize()
std::optional<size_t> ReadBufferFromAzureBlobStorage::tryGetFileSize()
{
if (!blob_client)
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
if (file_size.has_value())
return *file_size;
if (!file_size)
file_size = blob_client->GetProperties().Value.BlobSize;
file_size = blob_client->GetProperties().Value.BlobSize;
return *file_size;
return file_size;
}
size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & /*progress_callback*/) const

View File

@ -42,7 +42,7 @@ public:
bool supportsRightBoundedReads() const override { return true; }
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
size_t readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback) const override;

View File

@ -41,7 +41,7 @@ public:
void setReadUntilEnd() override { setReadUntilPosition(getFileSize()); }
size_t getFileSize() override { return getTotalSize(blobs_to_read); }
std::optional<size_t> tryGetFileSize() override { return getTotalSize(blobs_to_read); }
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }

View File

@ -321,7 +321,7 @@ public:
off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }
String getFileName() const override { return handle.getFileName(); }
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }
Handle releaseHandle() && { return std::move(handle); }

View File

@ -317,7 +317,7 @@ public:
String getFileName() const override { return handle.getFileName(); }
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }
/// Releases owned handle to pass it to an enumerator.
HandleHolder releaseHandle() &&

View File

@ -93,7 +93,10 @@ void AsynchronousReadBufferFromFile::close()
return;
if (0 != ::close(fd))
{
fd = -1;
throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
}
fd = -1;
}

View File

@ -244,7 +244,7 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
file_offset_of_buffer_end = 0;
}
size_t AsynchronousReadBufferFromFileDescriptor::getFileSize()
std::optional<size_t> AsynchronousReadBufferFromFileDescriptor::tryGetFileSize()
{
return getSizeFromFileDescriptor(fd, getFileName());
}

View File

@ -68,7 +68,7 @@ public:
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
void rewind();
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }

View File

@ -21,7 +21,7 @@ public:
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
size_t getFileSize() override { return total_size; }
std::optional<size_t> tryGetFileSize() override { return total_size; }
private:
bool nextImpl() override;

View File

@ -77,7 +77,10 @@ void MMapReadBufferFromFile::close()
finish();
if (0 != ::close(fd))
{
fd = -1;
throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
}
fd = -1;
metric_increment.destroy();

View File

@ -87,7 +87,7 @@ off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence)
return new_pos;
}
size_t MMapReadBufferFromFileDescriptor::getFileSize()
std::optional<size_t> MMapReadBufferFromFileDescriptor::tryGetFileSize()
{
return getSizeFromFileDescriptor(getFD(), getFileName());
}

View File

@ -38,7 +38,7 @@ public:
int getFD() const;
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) const override;
bool supportsReadAt() override { return true; }

View File

@ -69,7 +69,10 @@ void MMappedFile::close()
finish();
if (0 != ::close(fd))
{
fd = -1;
throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
}
fd = -1;
metric_increment.destroy();

View File

@ -67,11 +67,13 @@ void OpenedFile::close()
return;
if (0 != ::close(fd))
{
fd = -1;
throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
}
fd = -1;
metric_increment.destroy();
}
}

View File

@ -152,7 +152,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
return offset;
}
size_t ParallelReadBuffer::getFileSize()
std::optional<size_t> ParallelReadBuffer::tryGetFileSize()
{
return file_size;
}

View File

@ -33,7 +33,7 @@ public:
~ParallelReadBuffer() override { finishAndWait(); }
off_t seek(off_t off, int whence) override;
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
off_t getPosition() override;
const SeekableReadBuffer & getReadBuffer() const { return input; }

View File

@ -19,7 +19,8 @@ private:
std::string getFileName() const override { return "<empty>"; }
off_t seek(off_t /*off*/, int /*whence*/) override { return 0; }
off_t getPosition() override { return 0; }
size_t getFileSize() override { return 0; }
std::optional<size_t> tryGetFileSize() override { return 0; }
size_t getFileOffsetOfBufferEnd() const override { return 0; }
};
}

View File

@ -30,7 +30,7 @@ public:
void setReadUntilEnd() override { in->setReadUntilEnd(); }
size_t getFileSize() override { return in->getFileSize(); }
std::optional<size_t> tryGetFileSize() override { return in->tryGetFileSize(); }
private:
bool nextImpl() override;

View File

@ -88,7 +88,10 @@ void ReadBufferFromFile::close()
return;
if (0 != ::close(fd))
{
fd = -1;
throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
}
fd = -1;
metric_increment.destroy();

View File

@ -5,11 +5,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FILE_SIZE;
}
ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
{
}
@ -26,11 +21,9 @@ ReadBufferFromFileBase::ReadBufferFromFileBase(
ReadBufferFromFileBase::~ReadBufferFromFileBase() = default;
size_t ReadBufferFromFileBase::getFileSize()
std::optional<size_t> ReadBufferFromFileBase::tryGetFileSize()
{
if (file_size)
return *file_size;
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for read buffer");
return file_size;
}
void ReadBufferFromFileBase::setProgressCallback(ContextPtr context)

View File

@ -50,7 +50,7 @@ public:
clock_type = clock_type_;
}
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
void setProgressCallback(ContextPtr context);

View File

@ -52,9 +52,9 @@ bool ReadBufferFromFileDecorator::nextImpl()
return result;
}
size_t ReadBufferFromFileDecorator::getFileSize()
std::optional<size_t> ReadBufferFromFileDecorator::tryGetFileSize()
{
return getFileSizeFromReadBuffer(*impl);
return tryGetFileSizeFromReadBuffer(*impl);
}
}

View File

@ -27,7 +27,7 @@ public:
ReadBuffer & getWrappedReadBuffer() { return *impl; }
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
protected:
std::unique_ptr<SeekableReadBuffer> impl;

View File

@ -253,7 +253,7 @@ void ReadBufferFromFileDescriptor::rewind()
file_offset_of_buffer_end = 0;
}
size_t ReadBufferFromFileDescriptor::getFileSize()
std::optional<size_t> ReadBufferFromFileDescriptor::tryGetFileSize()
{
return getSizeFromFileDescriptor(fd, getFileName());
}

View File

@ -69,7 +69,7 @@ public:
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
void rewind();
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
bool checkIfActuallySeekable() override;

View File

@ -311,15 +311,15 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
return offset;
}
size_t ReadBufferFromS3::getFileSize()
std::optional<size_t> ReadBufferFromS3::tryGetFileSize()
{
if (file_size)
return *file_size;
return file_size;
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id);
file_size = object_size;
return *file_size;
return file_size;
}
off_t ReadBufferFromS3::getPosition()

View File

@ -63,7 +63,7 @@ public:
off_t getPosition() override;
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
void setReadUntilPosition(size_t position) override;
void setReadUntilEnd() override;

View File

@ -72,7 +72,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int UNKNOWN_FILE_SIZE;
}
std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::CallResult::transformToReadBuffer(size_t buf_size) &&
@ -121,15 +120,33 @@ void ReadWriteBufferFromHTTP::prepareRequest(Poco::Net::HTTPRequest & request, s
credentials.authenticate(request);
}
size_t ReadWriteBufferFromHTTP::getFileSize()
std::optional<size_t> ReadWriteBufferFromHTTP::tryGetFileSize()
{
if (!file_info)
file_info = getFileInfo();
{
try
{
file_info = getFileInfo();
}
catch (const HTTPException &)
{
return std::nullopt;
}
catch (const NetException &)
{
return std::nullopt;
}
catch (const Poco::Net::NetException &)
{
return std::nullopt;
}
catch (const Poco::IOException &)
{
return std::nullopt;
}
}
if (file_info->file_size)
return *file_info->file_size;
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", initial_uri.toString());
return file_info->file_size;
}
bool ReadWriteBufferFromHTTP::supportsReadAt()
@ -311,12 +328,12 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
error_message = e.displayText();
exception = std::current_exception();
}
catch (DB::NetException & e)
catch (NetException & e)
{
error_message = e.displayText();
exception = std::current_exception();
}
catch (DB::HTTPException & e)
catch (HTTPException & e)
{
if (!isRetriableError(e.getHTTPStatus()))
is_retriable = false;
@ -324,7 +341,7 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
error_message = e.displayText();
exception = std::current_exception();
}
catch (DB::Exception & e)
catch (Exception & e)
{
is_retriable = false;
@ -683,7 +700,19 @@ std::optional<time_t> ReadWriteBufferFromHTTP::tryGetLastModificationTime()
{
file_info = getFileInfo();
}
catch (...)
catch (const HTTPException &)
{
return std::nullopt;
}
catch (const NetException &)
{
return std::nullopt;
}
catch (const Poco::Net::NetException &)
{
return std::nullopt;
}
catch (const Poco::IOException &)
{
return std::nullopt;
}
@ -704,7 +733,7 @@ ReadWriteBufferFromHTTP::HTTPFileInfo ReadWriteBufferFromHTTP::getFileInfo()
{
getHeadResponse(response);
}
catch (HTTPException & e)
catch (const HTTPException & e)
{
/// Maybe the web server doesn't support HEAD requests.
/// E.g. webhdfs reports status 400.

View File

@ -118,7 +118,7 @@ private:
std::unique_ptr<ReadBuffer> initialize();
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
bool supportsReadAt() override;

View File

@ -13,41 +13,47 @@ namespace ErrorCodes
extern const int UNKNOWN_FILE_SIZE;
}
template <typename T>
static size_t getFileSize(T & in)
size_t WithFileSize::getFileSize()
{
if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
{
return with_file_size->getFileSize();
}
if (auto maybe_size = tryGetFileSize())
return *maybe_size;
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
}
size_t getFileSizeFromReadBuffer(ReadBuffer & in)
template <typename T>
static std::optional<size_t> tryGetFileSize(T & in)
{
if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
{
return getFileSize(delegate->getWrappedReadBuffer());
}
else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
{
return getFileSize(compressed->getWrappedReadBuffer());
}
if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
return with_file_size->tryGetFileSize();
return getFileSize(in);
return std::nullopt;
}
template <typename T>
static size_t getFileSize(T & in)
{
if (auto maybe_size = tryGetFileSize(in))
return *maybe_size;
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
}
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in)
{
try
{
return getFileSizeFromReadBuffer(in);
}
catch (...)
{
return std::nullopt;
}
if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
return tryGetFileSize(delegate->getWrappedReadBuffer());
else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
return tryGetFileSize(compressed->getWrappedReadBuffer());
return tryGetFileSize(in);
}
size_t getFileSizeFromReadBuffer(ReadBuffer & in)
{
if (auto maybe_size = tryGetFileSizeFromReadBuffer(in))
return *maybe_size;
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
}
bool isBufferWithFileSize(const ReadBuffer & in)

View File

@ -10,15 +10,16 @@ class ReadBuffer;
class WithFileSize
{
public:
virtual size_t getFileSize() = 0;
/// Returns nullopt if couldn't find out file size;
virtual std::optional<size_t> tryGetFileSize() = 0;
virtual ~WithFileSize() = default;
size_t getFileSize();
};
bool isBufferWithFileSize(const ReadBuffer & in);
size_t getFileSizeFromReadBuffer(ReadBuffer & in);
/// Return nullopt if couldn't find out file size;
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in);
size_t getDataOffsetMaybeCompressed(const ReadBuffer & in);

View File

@ -116,7 +116,10 @@ void WriteBufferFromFile::close()
finalize();
if (0 != ::close(fd))
{
fd = -1;
throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
}
fd = -1;
metric_increment.destroy();

View File

@ -13,10 +13,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
@ -237,16 +233,8 @@ void SubstituteColumnOptimizer::perform()
const auto & compare_graph = metadata_snapshot->getConstraints().getGraph();
// Fill aliases
if (select_query->select())
{
auto * list = select_query->refSelect()->as<ASTExpressionList>();
if (!list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "List of selected columns must be ASTExpressionList");
for (ASTPtr & ast : list->children)
ast->setAlias(ast->getAliasOrColumnName());
}
if (compare_graph.getNumOfComponents() == 0)
return;
auto run_for_all = [&](const auto func)
{

View File

@ -15,7 +15,7 @@ struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
/// Optimizer that tries to replace columns to equal columns (according to constraints)
/// with lower size (according to compressed and uncomressed size).
/// with lower size (according to compressed and uncompressed sizes).
class SubstituteColumnOptimizer
{
public:

View File

@ -16,16 +16,9 @@
namespace DB
{
static constinit std::atomic<bool> allow_logging{true};
void OwnSplitChannel::disableLogging()
{
allow_logging = false;
}
void OwnSplitChannel::log(const Poco::Message & msg)
{
if (!allow_logging)
if (!isLoggingEnabled())
return;
#ifndef WITHOUT_TEXT_LOG

View File

@ -39,8 +39,6 @@ public:
void setLevel(const std::string & name, int level);
static void disableLogging();
private:
void logSplit(const Poco::Message & msg);
void tryLogSplit(const Poco::Message & msg);

View File

@ -66,7 +66,7 @@ public:
/** Set the alias. */
virtual void setAlias(const String & /*to*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {}", getColumnName());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {} of {}", getColumnName(), getID());
}
/** Get the text that identifies this element. */

View File

@ -8,7 +8,6 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Common/TimerDescriptor.h>
#include <sys/types.h>

View File

@ -69,7 +69,7 @@ private:
/// * timer is a timerfd descriptor to manually check socket timeout
/// * pipe_fd is a pipe we use to cancel query and socket polling by executor.
/// We put those descriptors into our own epoll which is used by external executor.
TimerDescriptor timer{CLOCK_MONOTONIC, 0};
TimerDescriptor timer;
Poco::Timespan timeout;
AsyncEventTimeoutType timeout_type;
std::atomic_bool is_timer_alarmed = false;

View File

@ -53,7 +53,7 @@ public:
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
size_t getFileSize() override { return remote_file_size; }
std::optional<size_t> tryGetFileSize() override { return remote_file_size; }
private:
std::unique_ptr<LocalFileHolder> local_file_holder;

View File

@ -91,9 +91,9 @@ void AsynchronousReadBufferFromHDFS::prefetch(Priority priority)
}
size_t AsynchronousReadBufferFromHDFS::getFileSize()
std::optional<size_t> AsynchronousReadBufferFromHDFS::tryGetFileSize()
{
return impl->getFileSize();
return impl->tryGetFileSize();
}
String AsynchronousReadBufferFromHDFS::getFileName() const

View File

@ -35,7 +35,7 @@ public:
void prefetch(Priority priority) override;
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
String getFileName() const override;

View File

@ -31,7 +31,7 @@ namespace ErrorCodes
}
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileSize
{
String hdfs_uri;
String hdfs_file_path;
@ -90,7 +90,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}
size_t getFileSize() const
std::optional<size_t> tryGetFileSize() override
{
return file_size;
}
@ -191,9 +191,9 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
size_t ReadBufferFromHDFS::getFileSize()
std::optional<size_t> ReadBufferFromHDFS::tryGetFileSize()
{
return impl->getFileSize();
return impl->tryGetFileSize();
}
bool ReadBufferFromHDFS::nextImpl()

View File

@ -40,7 +40,7 @@ public:
off_t getPosition() override;
size_t getFileSize() override;
std::optional<size_t> tryGetFileSize() override;
size_t getFileOffsetOfBufferEnd() const override;

View File

@ -36,6 +36,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int BAD_QUERY_PARAMETER;
extern const int QUERY_NOT_ALLOWED;
}
@ -150,7 +151,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
}
else if (!configuration->isPathWithGlobs())
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "ObjectStorageQueue url must either end with '/' or contain globs");
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*queue_settings, engine_args, mode > LoadingStrictnessLevel::CREATE, log);

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Coordination/KeeperFeatureFlags.h>
#include <Storages/System/StorageSystemZooKeeperConnection.h>
@ -27,7 +28,7 @@ ColumnsDescription StorageSystemZooKeeperConnection::getColumnsDescription()
/* 0 */ {"name", std::make_shared<DataTypeString>(), "ZooKeeper cluster's name."},
/* 1 */ {"host", std::make_shared<DataTypeString>(), "The hostname/IP of the ZooKeeper node that ClickHouse connected to."},
/* 2 */ {"port", std::make_shared<DataTypeUInt16>(), "The port of the ZooKeeper node that ClickHouse connected to."},
/* 3 */ {"index", std::make_shared<DataTypeUInt8>(), "The index of the ZooKeeper node that ClickHouse connected to. The index is from ZooKeeper config."},
/* 3 */ {"index", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The index of the ZooKeeper node that ClickHouse connected to. The index is from ZooKeeper config. If not connected, this column is NULL."},
/* 4 */ {"connected_time", std::make_shared<DataTypeDateTime>(), "When the connection was established."},
/* 5 */ {"session_uptime_elapsed_seconds", std::make_shared<DataTypeUInt64>(), "Seconds elapsed since the connection was established."},
/* 6 */ {"is_expired", std::make_shared<DataTypeUInt8>(), "Is the current connection expired."},
@ -64,7 +65,7 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
/// For read-only snapshot type functionality, it's acceptable even though 'getZooKeeper' may cause data inconsistency.
auto fill_data = [&](const String & name, const zkutil::ZooKeeperPtr zookeeper, MutableColumns & columns)
{
Int8 index = zookeeper->getConnectedHostIdx();
auto index = zookeeper->getConnectedHostIdx();
String host_port = zookeeper->getConnectedHostPort();
if (index != -1 && !host_port.empty())
{
@ -78,7 +79,10 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
columns[0]->insert(name);
columns[1]->insert(host);
columns[2]->insert(port);
columns[3]->insert(index);
if (index)
columns[3]->insert(*index);
else
columns[3]->insertDefault();
columns[4]->insert(connected_time);
columns[5]->insert(uptime);
columns[6]->insert(zookeeper->expired());

View File

@ -420,7 +420,12 @@ class Backport:
fetch_release_prs = self.gh.get_release_pulls(self._fetch_from)
fetch_release_branches = [pr.head.ref for pr in fetch_release_prs]
self.labels_to_backport = [
f"v{branch}-must-backport" for branch in fetch_release_branches
(
f"v{branch}-must-backport"
if self._repo_name == "ClickHouse/ClickHouse"
else f"v{branch.replace('release/','')}-must-backport"
)
for branch in fetch_release_branches
]
logging.info("Fetching from %s", self._fetch_from)
@ -490,17 +495,23 @@ class Backport:
def process_pr(self, pr: PullRequest) -> None:
pr_labels = [label.name for label in pr.labels]
if (
any(label in pr_labels for label in self.must_create_backport_labels)
or self._repo_name != self._fetch_from
):
if any(label in pr_labels for label in self.must_create_backport_labels):
branches = [
ReleaseBranch(br, pr, self.repo, self.backport_created_label)
for br in self.release_branches
] # type: List[ReleaseBranch]
else:
branches = [
ReleaseBranch(br, pr, self.repo, self.backport_created_label)
ReleaseBranch(
(
br
if self._repo_name == "ClickHouse/Clickhouse"
else f"release/{br}"
),
pr,
self.repo,
self.backport_created_label,
)
for br in [
label.split("-", 1)[0][1:] # v21.8-must-backport
for label in pr_labels

View File

@ -135,6 +135,7 @@ th {{ cursor: pointer; }}
tr:hover {{ filter: var(--tr-hover-filter); }}
.expandable {{ cursor: pointer; }}
.expandable-content {{ display: none; }}
pre {{ white-space: pre-wrap; }}
#fish {{ display: none; float: right; position: relative; top: -20em; right: 2vw; margin-bottom: -20em; width: 30vw; filter: brightness(7%); z-index: -1; }}
.themes {{

View File

@ -1,4 +0,0 @@
275 0 138 136 0
275 0
275 0 138 136 0
275 0

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-parallel, no-ordinary-database, no-debug
# Tags: long, no-parallel, no-ordinary-database
# Test is too heavy, avoid parallel run in Flaky Check
# shellcheck disable=SC2119
@ -7,82 +7,126 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
set -ue
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS src";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS dst";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mv";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS tmp";
$CLICKHOUSE_CLIENT --query "CREATE TABLE src (n Int8, m Int8, CONSTRAINT c CHECK xxHash32(n+m) % 8 != 0) ENGINE=MergeTree ORDER BY n PARTITION BY 0 < n SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (nm Int16, CONSTRAINT c CHECK xxHash32(nm) % 8 != 0) ENGINE=MergeTree ORDER BY nm SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE MATERIALIZED VIEW mv TO dst (nm Int16) AS SELECT n*m AS nm FROM src";
$CLICKHOUSE_CLIENT --query "CREATE TABLE tmp (x UInt8, nm Int16) ENGINE=MergeTree ORDER BY (x, nm) SETTINGS old_parts_lifetime=0"
$CLICKHOUSE_CLIENT --query "CREATE TABLE src (n Int32, m Int32, CONSTRAINT c CHECK xxHash32(n+m) % 8 != 0) ENGINE=MergeTree ORDER BY n PARTITION BY 0 < n SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (nm Int32, CONSTRAINT c CHECK xxHash32(nm) % 8 != 0) ENGINE=MergeTree ORDER BY nm SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE MATERIALIZED VIEW mv TO dst (nm Int32) AS SELECT n*m AS nm FROM src";
$CLICKHOUSE_CLIENT --query "CREATE TABLE tmp (x UInt32, nm Int32) ENGINE=MergeTree ORDER BY (x, nm) SETTINGS old_parts_lifetime=0"
$CLICKHOUSE_CLIENT --query "INSERT INTO src VALUES (0, 0)"
# some transactions will fail due to constraint
function thread_insert_commit()
function get_now()
{
set -e
for i in {1..100}; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* ($i, $1) */ ($i, $1);
SELECT throwIf((SELECT sum(nm) FROM mv) != $(($i * $1))) FORMAT Null;
INSERT INTO src VALUES /* (-$i, $1) */ (-$i, $1);
COMMIT;" 2>&1| grep -Fv "is violated at row" | grep -Fv "Transaction is not in RUNNING state" | grep -F "Received from " ||:
done
date +%s
}
function thread_insert_rollback()
is_pid_exist()
{
local pid=$1
ps -p $pid > /dev/null
}
function run_until_deadline_and_at_least_times()
{
set -e
for _ in {1..100}; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* (42, $1) */ (42, $1);
SELECT throwIf((SELECT count() FROM src WHERE n=42 AND m=$1) != 1) FORMAT Null;
ROLLBACK;"
local deadline=$1; shift
local min_iterations=$1; shift
local function_to_run=$1; shift
local started_time
started_time=$(get_now)
local i=0
while true
do
$function_to_run $i "$@"
[[ $(get_now) -lt $deadline ]] || break
i=$(($i + 1))
done
[[ $i -gt $min_iterations ]] || echo "$i/$min_iterations : not enough iterations of $function_to_run has been made from $started_time until $deadline" >&2
}
function insert_commit_action()
{
set -e
local i=$1; shift
local tag=$1; shift
# some transactions will fail due to constraint
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* ($i, $tag) */ ($i, $tag);
SELECT throwIf((SELECT sum(nm) FROM mv) != $(($i * $tag))) /* ($i, $tag) */ FORMAT Null;
INSERT INTO src VALUES /* (-$i, $tag) */ (-$i, $tag);
COMMIT;
" 2>&1 \
| grep -Fv "is violated at row" | grep -Fv "Transaction is not in RUNNING state" | grep -F "Received from " ||:
}
function insert_rollback_action()
{
set -e
local i=$1; shift
local tag=$1; shift
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* (42, $tag) */ (42, $tag);
SELECT throwIf((SELECT count() FROM src WHERE n=42 AND m=$tag) != 1) FORMAT Null;
ROLLBACK;"
}
# make merges more aggressive
function thread_optimize()
function optimize_action()
{
set -e
while true; do
optimize_query="OPTIMIZE TABLE src"
partition_id=$(( RANDOM % 2 ))
if (( RANDOM % 2 )); then
optimize_query="OPTIMIZE TABLE dst"
partition_id="all"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query PARTITION ID '$partition_id'"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query FINAL"
fi
action="COMMIT"
if (( RANDOM % 4 )); then
action="ROLLBACK"
fi
$CLICKHOUSE_CLIENT --multiquery --query "
optimize_query="OPTIMIZE TABLE src"
partition_id=$(( RANDOM % 2 ))
if (( RANDOM % 2 )); then
optimize_query="OPTIMIZE TABLE dst"
partition_id="all"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query PARTITION ID '$partition_id'"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query FINAL"
fi
action="COMMIT"
if (( RANDOM % 4 )); then
action="ROLLBACK"
fi
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
$optimize_query;
$optimize_query;
$action;
" 2>&1| grep -Fv "already exists, but it will be deleted soon" | grep -F "Received from " ||:
sleep 0.$RANDOM;
done
" 2>&1 \
| grep -Fv "already exists, but it will be deleted soon" | grep -F "Received from " ||:
sleep 0.$RANDOM;
}
function thread_select()
function select_action()
{
set -e
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null;
@ -90,14 +134,13 @@ function thread_select()
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
COMMIT;"
done
}
function thread_select_insert()
function select_insert_action()
{
set -e
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT throwIf((SELECT count() FROM tmp) != 0) FORMAT Null;
INSERT INTO tmp SELECT 1, n*m FROM src;
@ -110,36 +153,69 @@ function thread_select_insert()
SELECT throwIf(1 != (SELECT countDistinct(arr) FROM (SELECT x, arraySort(groupArray(nm)) AS arr FROM tmp WHERE x!=4 GROUP BY x))) FORMAT Null;
SELECT throwIf((SELECT count(), sum(nm) FROM tmp WHERE x=4) != (SELECT count(), sum(nm) FROM tmp WHERE x!=4)) FORMAT Null;
ROLLBACK;"
done
}
thread_insert_commit 1 & PID_1=$!
thread_insert_commit 2 & PID_2=$!
thread_insert_rollback 3 & PID_3=$!
MAIN_TIME_PART=400
SECOND_TIME_PART=30
WAIT_FINISH=60
LAST_TIME_GAP=10
thread_optimize & PID_4=$!
thread_select & PID_5=$!
thread_select_insert & PID_6=$!
sleep 0.$RANDOM;
thread_select & PID_7=$!
thread_select_insert & PID_8=$!
if [[ $((MAIN_TIME_PART + SECOND_TIME_PART + WAIT_FINISH + LAST_TIME_GAP)) -ge 600 ]]; then
echo "time sttings are wrong" 2>&1
exit 1
fi
wait $PID_1 && wait $PID_2 && wait $PID_3
kill -TERM $PID_4
kill -TERM $PID_5
kill -TERM $PID_6
kill -TERM $PID_7
kill -TERM $PID_8
wait
wait_for_queries_to_finish 40
START_TIME=$(get_now)
STOP_TIME=$((START_TIME + MAIN_TIME_PART))
SECOND_STOP_TIME=$((STOP_TIME + SECOND_TIME_PART))
MIN_ITERATIONS=20
run_until_deadline_and_at_least_times $STOP_TIME $MIN_ITERATIONS insert_commit_action 1 & PID_1=$!
run_until_deadline_and_at_least_times $STOP_TIME $MIN_ITERATIONS insert_commit_action 2 & PID_2=$!
run_until_deadline_and_at_least_times $STOP_TIME $MIN_ITERATIONS insert_rollback_action 3 & PID_3=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS optimize_action & PID_4=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_action & PID_5=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_insert_action & PID_6=$!
sleep 0.$RANDOM
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_action & PID_7=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_insert_action & PID_8=$!
wait $PID_1 || echo "insert_commit_action has failed with status $?" 2>&1
wait $PID_2 || echo "second insert_commit_action has failed with status $?" 2>&1
wait $PID_3 || echo "insert_rollback_action has failed with status $?" 2>&1
is_pid_exist $PID_4 || echo "optimize_action is not running" 2>&1
is_pid_exist $PID_5 || echo "select_action is not running" 2>&1
is_pid_exist $PID_6 || echo "select_insert_action is not running" 2>&1
is_pid_exist $PID_7 || echo "second select_action is not running" 2>&1
is_pid_exist $PID_8 || echo "second select_insert_action is not running" 2>&1
wait $PID_4 || echo "optimize_action has failed with status $?" 2>&1
wait $PID_5 || echo "select_action has failed with status $?" 2>&1
wait $PID_6 || echo "select_insert_action has failed with status $?" 2>&1
wait $PID_7 || echo "second select_action has failed with status $?" 2>&1
wait $PID_8 || echo "second select_insert_action has failed with status $?" 2>&1
wait_for_queries_to_finish $WAIT_FINISH
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM src;
SELECT count(), sum(nm) FROM mv";
BEGIN TRANSACTION;
SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM dst) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
COMMIT;
"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM src"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(nm) FROM mv"
$CLICKHOUSE_CLIENT --multiquery --query "
SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM dst) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
"
$CLICKHOUSE_CLIENT --query "DROP TABLE src";
$CLICKHOUSE_CLIENT --query "DROP TABLE dst";

View File

@ -1,9 +1,9 @@
#!/usr/bin/env bash
# Tags: no-random-settings
unset CLICKHOUSE_LOG_COMMENT
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment, because the test has to use the readonly mode
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

View File

@ -32,10 +32,10 @@
1
1
0
SELECT count() AS `count()`
SELECT count()
FROM constraint_test_constants
WHERE (b > 100) OR (c > 100)
SELECT count() AS `count()`
SELECT count()
FROM constraint_test_constants
WHERE c > 100
QUERY id: 0
@ -53,7 +53,7 @@ QUERY id: 0
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT count() AS `count()`
SELECT count()
FROM constraint_test_constants
WHERE c > 100
QUERY id: 0
@ -71,7 +71,7 @@ QUERY id: 0
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT count() AS `count()`
SELECT count()
FROM constraint_test_constants
QUERY id: 0
PROJECTION COLUMNS

View File

@ -1,6 +1,6 @@
SELECT
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
(b AS b) + 3 AS `plus(b, 3)`
(b AS `cityHash64(a)`) + 10,
(b AS b) + 3
FROM column_swap_test_test
WHERE b = 1
QUERY id: 0
@ -59,8 +59,8 @@ QUERY id: 0
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
(b AS b) + 3 AS `plus(b, 3)`
(b AS `cityHash64(a)`) + 10,
(b AS b) + 3
FROM column_swap_test_test
WHERE b = 0
QUERY id: 0
@ -89,8 +89,8 @@ QUERY id: 0
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
(b AS b) + 3 AS `plus(b, 3)`
(b AS `cityHash64(a)`) + 10,
(b AS b) + 3
FROM column_swap_test_test
WHERE b = 0
QUERY id: 0
@ -119,8 +119,8 @@ QUERY id: 0
CONSTANT id: 14, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT
(b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
(b AS b) + 3 AS `plus(b, 3)`
(b AS `cityHash64(a)`) + 10,
(b AS b) + 3
FROM column_swap_test_test
WHERE b = 1
QUERY id: 0
@ -148,7 +148,7 @@ QUERY id: 0
COLUMN id: 13, column_name: b, result_type: UInt64, source_id: 5
CONSTANT id: 14, constant_value: UInt64_1, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT (b AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`
SELECT (b AS `cityHash64(a)`) + 10
FROM column_swap_test_test
WHERE b = 0
QUERY id: 0
@ -171,8 +171,8 @@ QUERY id: 0
CONSTANT id: 10, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT
(cityHash64(a) AS `cityHash64(a)`) + 10 AS `plus(cityHash64(a), 10)`,
a AS a
(cityHash64(a) AS `cityHash64(a)`) + 10,
a
FROM column_swap_test_test
WHERE cityHash64(a) = 0
QUERY id: 0
@ -203,8 +203,8 @@ QUERY id: 0
CONSTANT id: 15, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT
(cityHash64(a) AS b) + 10 AS `plus(b, 10)`,
a AS a
(cityHash64(a) AS b) + 10,
a
FROM column_swap_test_test
WHERE cityHash64(a) = 0
QUERY id: 0

View File

@ -10,18 +10,38 @@ DEBUG_LOG = os.path.join(
os.path.basename(os.path.abspath(__file__)).strip(".python") + ".debuglog",
)
STATE_MAP = {
-1: "process did not start",
0: "completion was found",
1: "process started and said ':)'",
2: "completion search was started",
3: "completion is missing",
}
def run_with_timeout(func, args, timeout):
process = multiprocessing.Process(target=func, args=args)
process.start()
process.join(timeout)
for _ in range(5):
state = multiprocessing.Value("i", -1)
process = multiprocessing.Process(
target=func, args=args, kwargs={"state": state}
)
process.start()
process.join(timeout)
if process.is_alive():
process.terminate()
print("Timeout")
if state.value in (0, 3):
return
if process.is_alive():
process.terminate()
if state.value == -1:
continue
print(f"Timeout, state: {STATE_MAP[state.value]}")
return
def test_completion(program, argv, comp_word):
def test_completion(program, argv, comp_word, state=None):
comp_begin = comp_word[:-3]
shell_pid, master = pty.fork()
@ -41,6 +61,8 @@ def test_completion(program, argv, comp_word):
debug_log_fd.write(repr(output_b) + "\n")
debug_log_fd.flush()
state.value = 1
os.write(master, b"SET " + bytes(comp_begin.encode()))
output_b = os.read(master, 4096)
output = output_b.decode()
@ -55,23 +77,28 @@ def test_completion(program, argv, comp_word):
time.sleep(0.01)
os.write(master, b"\t")
state.value = 2
output_b = os.read(master, 4096)
output = output_b.decode()
debug_log_fd.write(repr(output_b) + "\n")
debug_log_fd.flush()
# fail fast if there is a bell character in the output,
# meaning no concise completion is found
if "\x07" in output:
print(f"{comp_word}: FAIL")
return
while not comp_word in output:
# fail fast if there is a bell character in the output,
# meaning no concise completion is found
if "\x07" in output:
print(f"{comp_word}: FAIL")
state.value = 3
return
output_b = os.read(master, 4096)
output += output_b.decode()
debug_log_fd.write(repr(output_b) + "\n")
debug_log_fd.flush()
print(f"{comp_word}: OK")
state.value = 0
finally:
os.close(master)
debug_log_fd.close()

View File

@ -1,4 +1,6 @@
#!/usr/bin/expect -f
# Tags: no-debug, no-tsan, no-msan, no-asan, no-ubsan, no-s3-storage
# ^ it can be slower than 60 seconds
# This is the regression for the concurrent access in ProgressIndication,
# so it is important to read enough rows here (10e6).

View File

@ -1,5 +1,9 @@
#!/usr/bin/env bash
# Tags: no-parallel
# no-parallel: This test is not parallel because when we execute system-wide SYSTEM DROP REPLICA,
# other tests might shut down the storage in parallel and the test will fail.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

View File

@ -1,6 +1,7 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest, no-tsan
# Tags: no-ubsan, no-fasttest, no-tsan, no-msan, no-asan
# It is too slow under TSan
# It eats too much memory under ASan or MSan
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -5,4 +5,11 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_BENCHMARK -q 'select throwIf(1)' |& grep '^An error occurred while processing the query.*Exception:' -c
$CLICKHOUSE_BENCHMARK --max-consecutive-errors 10 -q 'select throwIf(1)' |& grep '^An error occurred while processing the query.*Exception:' -c
RES=$($CLICKHOUSE_BENCHMARK --max-consecutive-errors 10 -q 'select throwIf(1)' |& tee "${CLICKHOUSE_TMP}/${CLICKHOUSE_DATABASE}.log" | grep '^An error occurred while processing the query.*Exception:' -c)
if [ "$RES" -eq 10 ]
then
echo "$RES"
else
cat "${CLICKHOUSE_TMP}/${CLICKHOUSE_DATABASE}.log"
fi

View File

@ -1,34 +1,34 @@
sessions:
150
45
port_0_sessions:
0
address_0_sessions:
0
tcp_sessions
60
18
http_sessions
30
9
http_with_session_id_sessions
30
my_sql_sessions
30
9
mysql_sessions
9
Corresponding LoginSuccess/Logout
10
3
LoginFailure
10
3
Corresponding LoginSuccess/Logout
10
3
LoginFailure
10
3
Corresponding LoginSuccess/Logout
10
3
LoginFailure
10
3
Corresponding LoginSuccess/Logout
10
3
LoginFailure
10
3
Corresponding LoginSuccess/Logout
10
3
LoginFailure
10
3

View File

@ -37,11 +37,11 @@ done
# These functions try to create a session with successful login and logout.
# Sleep a small, random amount of time to make concurrency more intense.
# and try to login with an invalid password.
function tcp_session()
function tcp_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
while (( (i++) < 3 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM})" --user="${user}" --password="pass"
# login failure
@ -49,11 +49,11 @@ function tcp_session()
done
}
function http_session()
function http_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
while (( (i++) < 3 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT 3, sleep(0.01${RANDOM})"
@ -62,11 +62,11 @@ function http_session()
done
}
function http_with_session_id_session()
function http_with_session_id_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
while (( (i++) < 3 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT 5, sleep 0.01${RANDOM}"
@ -75,11 +75,11 @@ function http_with_session_id_session()
done
}
function mysql_session()
function mysql_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
while (( (i++) < 3 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM}) FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'pass')"
@ -97,29 +97,29 @@ export -f http_with_session_id_session;
export -f mysql_session;
for user in "${TCP_USERS[@]}"; do
timeout 60s bash -c "tcp_session ${user}" >/dev/null 2>&1 &
tcp_session ${user} >/dev/null 2>&1 &
done
for user in "${HTTP_USERS[@]}"; do
timeout 60s bash -c "http_session ${user}" >/dev/null 2>&1 &
http_session ${user} >/dev/null 2>&1 &
done
for user in "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}"; do
timeout 60s bash -c "http_with_session_id_session ${user}" >/dev/null 2>&1 &
http_with_session_id_session ${user} >/dev/null 2>&1 &
done
for user in "${MYSQL_USERS[@]}"; do
timeout 60s bash -c "mysql_session ${user}" >/dev/null 2>&1 &
mysql_session ${user} >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "sessions:"
echo "sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
echo "port_0_sessions:"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_port = 0"
echo "address_0_sessions:"
@ -131,13 +131,13 @@ echo "http_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "http_with_session_id_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "my_sql_sessions"
echo "mysql_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${MYSQL_USERS_SQL_COLLECTION_STRING}) AND interface = 'MySQL'"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "DROP USER ${user}"
echo "Corresponding LoginSuccess/Logout"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'"
done

View File

@ -11,11 +11,23 @@ $CLICKHOUSE_CLIENT -nm -q "
CREATE TABLE $database_name.02911_backup_restore_keeper_map1 (key UInt64, value String) Engine=KeeperMap('/' || currentDatabase() || '/test02911') PRIMARY KEY key;
CREATE TABLE $database_name.02911_backup_restore_keeper_map2 (key UInt64, value String) Engine=KeeperMap('/' || currentDatabase() || '/test02911') PRIMARY KEY key; -- table using same Keeper path as 02911_backup_restore_keeper_map1
CREATE TABLE $database_name.02911_backup_restore_keeper_map3 (key UInt64, value String) Engine=KeeperMap('/' || currentDatabase() || '/test02911_different') PRIMARY KEY key;
INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000;
INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000;
"
# KeeperMap table engine doesn't have internal retries for interaction with Keeper. Do it on our own, otherwise tests with overloaded server can be flaky.
while true
do
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000;
" | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue
break
done
while true
do
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000;
" | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue
break
done
backup_path="$database_name"
for i in $(seq 1 3); do
$CLICKHOUSE_CLIENT -q "SELECT count() FROM $database_name.02911_backup_restore_keeper_map$i;"
@ -45,4 +57,4 @@ for i in $(seq 1 3); do
$CLICKHOUSE_CLIENT -q "SELECT count() FROM $database_name.02911_backup_restore_keeper_map$i;"
done
$CLICKHOUSE_CLIENT -q "DROP DATABASE $database_name SYNC;"
$CLICKHOUSE_CLIENT -q "DROP DATABASE $database_name SYNC;"

View File

@ -1,7 +1,7 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
# reset --log_comment, because the test has to set its own
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

View File

@ -2,8 +2,6 @@
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

View File

@ -2,8 +2,6 @@
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

View File

@ -2,8 +2,6 @@
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

View File

@ -2,8 +2,6 @@
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

View File

@ -2,8 +2,6 @@
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

View File

@ -2,8 +2,6 @@
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
@ -34,4 +32,3 @@ echo "MergeTree wide"
$CH_CLIENT -q "create table test (id UInt64, v Variant(UInt64, Array(Variant(String, UInt64)))) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
test
$CH_CLIENT -q "drop table test;"

Some files were not shown because too many files have changed in this diff Show More