Merge branch 'master' into autodetect-config-format

sakulali 2024-07-29 21:27:25 +08:00
commit 7d36cfeeaf
163 changed files with 2790 additions and 873 deletions

View File

@@ -93,21 +93,21 @@ jobs:
     with:
       stage: Builds_2
       data: ${{ needs.RunConfig.outputs.data }}
-  Tests_2:
+  Tests_2_ww:
     needs: [RunConfig, Builds_2]
+    if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_2_ww') }}
+    uses: ./.github/workflows/reusable_test_stage.yml
+    with:
+      stage: Tests_2_ww
+      data: ${{ needs.RunConfig.outputs.data }}
+  Tests_2:
+    # Test_3 should not wait for Test_1/Test_2 and should not be blocked by them on master branch since all jobs need to run there.
+    needs: [RunConfig, Builds_1]
     if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_2') }}
     uses: ./.github/workflows/reusable_test_stage.yml
     with:
       stage: Tests_2
       data: ${{ needs.RunConfig.outputs.data }}
-  Tests_3:
-    # Test_3 should not wait for Test_1/Test_2 and should not be blocked by them on master branch since all jobs need to run there.
-    needs: [RunConfig, Builds_1]
-    if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_3') }}
-    uses: ./.github/workflows/reusable_test_stage.yml
-    with:
-      stage: Tests_3
-      data: ${{ needs.RunConfig.outputs.data }}
 ################################# Reports #################################
 # Reports should run even if Builds_1/2 fail - run them separately, not in Tests_1/2/3
@@ -123,7 +123,7 @@ jobs:
   FinishCheck:
     if: ${{ !cancelled() }}
-    needs: [RunConfig, Builds_1, Builds_2, Builds_Report, Tests_1, Tests_2, Tests_3]
+    needs: [RunConfig, Builds_1, Builds_2, Builds_Report, Tests_1, Tests_2_ww, Tests_2]
     runs-on: [self-hosted, style-checker-aarch64]
     steps:
       - name: Check out repository code
@@ -133,6 +133,7 @@ jobs:
           cd "$GITHUB_WORKSPACE/tests/ci"
           python3 finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
       - name: Check Workflow results
+        if: ${{ !cancelled() }}
        run: |
          export WORKFLOW_RESULT_FILE="/tmp/workflow_results.json"
          cat > "$WORKFLOW_RESULT_FILE" << 'EOF'

View File

@@ -123,20 +123,20 @@ jobs:
       stage: Builds_2
       data: ${{ needs.RunConfig.outputs.data }}
   # stage for running non-required checks without being blocked by required checks (Test_1) if corresponding settings is selected
-  Tests_2:
+  Tests_2_ww:
     needs: [RunConfig, Builds_1]
+    if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_2_ww') }}
+    uses: ./.github/workflows/reusable_test_stage.yml
+    with:
+      stage: Tests_2_ww
+      data: ${{ needs.RunConfig.outputs.data }}
+  Tests_2:
+    needs: [RunConfig, Builds_1, Tests_1]
     if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_2') }}
     uses: ./.github/workflows/reusable_test_stage.yml
     with:
       stage: Tests_2
       data: ${{ needs.RunConfig.outputs.data }}
-  Tests_3:
-    needs: [RunConfig, Builds_1, Tests_1]
-    if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_3') }}
-    uses: ./.github/workflows/reusable_test_stage.yml
-    with:
-      stage: Tests_3
-      data: ${{ needs.RunConfig.outputs.data }}
 ################################# Reports #################################
 # Reports should run even if Builds_1/2 fail - run them separately (not in Tests_1/2/3)
@@ -154,7 +154,7 @@ jobs:
     if: ${{ !cancelled() }}
     # Test_2 or Test_3 do not have the jobs required for Mergeable check,
     # however, set them as "needs" to get all checks results before the automatic merge occurs.
-    needs: [RunConfig, BuildDockers, StyleCheck, FastTest, Builds_1, Builds_2, Builds_Report, Tests_1, Tests_2, Tests_3]
+    needs: [RunConfig, BuildDockers, StyleCheck, FastTest, Builds_1, Builds_2, Builds_Report, Tests_1, Tests_2_ww, Tests_2]
     runs-on: [self-hosted, style-checker-aarch64]
     steps:
       - name: Check out repository code
@@ -178,7 +178,7 @@ jobs:
   #
   FinishCheck:
     if: ${{ !failure() && !cancelled() }}
-    needs: [RunConfig, BuildDockers, StyleCheck, FastTest, Builds_1, Builds_2, Builds_Report, Tests_1, Tests_2, Tests_3]
+    needs: [RunConfig, BuildDockers, StyleCheck, FastTest, Builds_1, Builds_2, Builds_Report, Tests_1, Tests_2_ww, Tests_2]
     runs-on: [self-hosted, style-checker-aarch64]
     steps:
       - name: Check out repository code

View File

@@ -54,7 +54,7 @@ CREATE TABLE keeper_map_table
     `v2` String,
     `v3` Float32
 )
-ENGINE = KeeperMap(/keeper_map_table, 4)
+ENGINE = KeeperMap('/keeper_map_table', 4)
 PRIMARY KEY key
 ```

File diff suppressed because it is too large

View File

@@ -36,7 +36,7 @@ These actions are described in detail below.
 ADD COLUMN [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | FIRST]
 ```
-Adds a new column to the table with the specified `name`, `type`, [`codec`](../create/table.md/#codecs) and `default_expr` (see the section [Default expressions](/docs/en/sql-reference/statements/create/table.md/#create-default-values)).
+Adds a new column to the table with the specified `name`, `type`, [`codec`](../create/table.md/#column_compression_codec) and `default_expr` (see the section [Default expressions](/docs/en/sql-reference/statements/create/table.md/#create-default-values)).
 If the `IF NOT EXISTS` clause is included, the query won't return an error if the column already exists. If you specify `AFTER name_after` (the name of another column), the column is added after the specified one in the list of table columns. If you want to add a column to the beginning of the table use the `FIRST` clause. Otherwise, the column is added to the end of the table. For a chain of actions, `name_after` can be the name of a column that is added in one of the previous actions.
@@ -155,7 +155,7 @@ This query changes the `name` column properties:
 - Column-level Settings
-For examples of columns compression CODECS modifying, see [Column Compression Codecs](../create/table.md/#codecs).
+For examples of columns compression CODECS modifying, see [Column Compression Codecs](../create/table.md/#column_compression_codec).
 For examples of columns TTL modifying, see [Column TTL](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#mergetree-column-ttl).

View File

@@ -209,8 +209,8 @@ std::vector<String> Client::loadWarningMessages()
                               {} /* query_parameters */,
                               "" /* query_id */,
                               QueryProcessingStage::Complete,
-                              &global_context->getSettingsRef(),
-                              &global_context->getClientInfo(), false, {});
+                              &client_context->getSettingsRef(),
+                              &client_context->getClientInfo(), false, {});
     while (true)
     {
         Packet packet = connection->receivePacket();
@@ -306,9 +306,6 @@ void Client::initialize(Poco::Util::Application & self)
     if (env_password && !config().has("password"))
         config().setString("password", env_password);

-    // global_context->setApplicationType(Context::ApplicationType::CLIENT);
-    global_context->setQueryParameters(query_parameters);
     /// settings and limits could be specified in config file, but passed settings has higher priority
     for (const auto & setting : global_context->getSettingsRef().allUnchanged())
     {
@@ -382,7 +379,7 @@ try
     showWarnings();

     /// Set user password complexity rules
-    auto & access_control = global_context->getAccessControl();
+    auto & access_control = client_context->getAccessControl();
     access_control.setPasswordComplexityRules(connection->getPasswordComplexityRules());

     if (is_interactive && !delayed_interactive)
@@ -459,7 +456,7 @@ void Client::connect()
                   << connection_parameters.host << ":" << connection_parameters.port
                   << (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;

-    connection = Connection::createConnection(connection_parameters, global_context);
+    connection = Connection::createConnection(connection_parameters, client_context);

     if (max_client_network_bandwidth)
     {
@@ -528,7 +525,7 @@ void Client::connect()
         }
     }

-    if (!global_context->getSettingsRef().use_client_time_zone)
+    if (!client_context->getSettingsRef().use_client_time_zone)
     {
         const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
         if (!time_zone.empty())
@@ -611,7 +608,7 @@ void Client::printChangedSettings() const
         }
     };

-    print_changes(global_context->getSettingsRef().changes(), "settings");
+    print_changes(client_context->getSettingsRef().changes(), "settings");
     print_changes(cmd_merge_tree_settings.changes(), "MergeTree settings");
 }
@@ -709,7 +706,7 @@ bool Client::processWithFuzzing(const String & full_query)
     {
         const char * begin = full_query.data();
         orig_ast = parseQuery(begin, begin + full_query.size(),
-            global_context->getSettingsRef(),
+            client_context->getSettingsRef(),
             /*allow_multi_statements=*/ true);
     }
     catch (const Exception & e)
@@ -733,7 +730,7 @@ bool Client::processWithFuzzing(const String & full_query)
     }

     // Kusto is not a subject for fuzzing (yet)
-    if (global_context->getSettingsRef().dialect == DB::Dialect::kusto)
+    if (client_context->getSettingsRef().dialect == DB::Dialect::kusto)
     {
         return true;
     }
@@ -1166,6 +1163,11 @@ void Client::processOptions(const OptionsDescription & options_description,
     if (options.count("opentelemetry-tracestate"))
         global_context->getClientTraceContext().tracestate = options["opentelemetry-tracestate"].as<std::string>();

+    /// In case of clickhouse-client the `client_context` can be just an alias for the `global_context`.
+    /// (There is no need to copy the context because clickhouse-client has no background tasks so it won't use that context in parallel.)
+    client_context = global_context;
+    initClientContext();
 }
@@ -1205,11 +1207,6 @@ void Client::processConfig()
     pager = config().getString("pager", "");

     setDefaultFormatsAndCompressionFromConfiguration();
-
-    global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
-    global_context->setQueryKindInitial();
-    global_context->setQuotaClientKey(config().getString("quota_key", ""));
-    global_context->setQueryKind(query_kind);
 }

View File

@@ -16,7 +16,6 @@ public:
     int main(const std::vector<String> & /*args*/) override;

 protected:
     Poco::Util::LayeredConfiguration & getClientConfiguration() override;

     bool processWithFuzzing(const String & full_query) override;

View File

@@ -295,6 +295,8 @@ void LocalServer::cleanup()
         if (suggest)
             suggest.reset();

+        client_context.reset();
+
         if (global_context)
         {
             global_context->shutdown();
@@ -436,7 +438,7 @@ void LocalServer::connect()
         in = input.get();
     }
     connection = LocalConnection::createConnection(
-        connection_parameters, global_context, in, need_render_progress, need_render_profile_events, server_display_name);
+        connection_parameters, client_context, in, need_render_progress, need_render_profile_events, server_display_name);
 }
@@ -497,8 +499,6 @@ try
     initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")));
     ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);

-    applyCmdSettings(global_context);
-
     /// try to load user defined executable functions, throw on error and die
     try
     {
@@ -510,6 +510,11 @@ try
         throw;
     }

+    /// Must be called after we stopped initializing the global context and changing its settings.
+    /// After this point the global context must be stayed almost unchanged till shutdown,
+    /// and all necessary changes must be made to the client context instead.
+    createClientContext();
+
     if (is_interactive)
     {
         clearTerminal();
@@ -735,6 +740,9 @@ void LocalServer::processConfig()
     /// Load global settings from default_profile and system_profile.
     global_context->setDefaultProfiles(getClientConfiguration());

+    /// Command-line parameters can override settings from the default profile.
+    applyCmdSettings(global_context);
+
     /// We load temporary database first, because projections need it.
     DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
@@ -778,10 +786,6 @@ void LocalServer::processConfig()
     server_display_name = getClientConfiguration().getString("display_name", "");
     prompt_by_server_display_name = getClientConfiguration().getRawString("prompt_by_server_display_name.default", ":) ");
-
-    global_context->setQueryKindInitial();
-    global_context->setQueryKind(query_kind);
-    global_context->setQueryParameters(query_parameters);
 }
@@ -860,6 +864,16 @@ void LocalServer::applyCmdOptions(ContextMutablePtr context)
 }

+void LocalServer::createClientContext()
+{
+    /// In case of clickhouse-local it's necessary to use a separate context for client-related purposes.
+    /// We can't just change the global context because it is used in background tasks (for example, in merges)
+    /// which don't expect that the global context can suddenly change.
+    client_context = Context::createCopy(global_context);
+    initClientContext();
+}
+
 void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &, const std::vector<Arguments> &)
 {
     if (options.count("table"))

View File

@@ -31,7 +31,6 @@ public:
     int main(const std::vector<String> & /*args*/) override;

 protected:
     Poco::Util::LayeredConfiguration & getClientConfiguration() override;

     void connect() override;
@@ -50,7 +49,6 @@ protected:
     void processConfig() override;
     void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &) override;

     void updateLoggerLevel(const String & logs_level) override;

 private:
@@ -67,6 +65,8 @@ private:
     void applyCmdOptions(ContextMutablePtr context);
     void applyCmdSettings(ContextMutablePtr context);

+    void createClientContext();
+
     ServerSettings server_settings;
     std::optional<StatusFile> status;

View File

@@ -0,0 +1,13 @@
+<clickhouse>
+    <storage_configuration>
+        <disks>
+            <backups>
+                <type>local</type>
+                <path>/tmp/backups/</path>
+            </backups>
+        </disks>
+    </storage_configuration>
+    <backups>
+        <allowed_disk>backups</allowed_disk>
+    </backups>
+</clickhouse>

View File

@@ -0,0 +1 @@
+../../../tests/config/config.d/enable_keeper_map.xml

View File

@@ -0,0 +1 @@
+../../../tests/config/config.d/session_log.xml

View File

@@ -268,6 +268,8 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
         }
     }

+    const auto enable_order_by_all = updated_context->getSettingsRef().enable_order_by_all;
+
     auto current_query_tree = std::make_shared<QueryNode>(std::move(updated_context), std::move(settings_changes));

     current_query_tree->setIsSubquery(is_subquery);
@@ -281,7 +283,10 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
     current_query_tree->setIsGroupByWithRollup(select_query_typed.group_by_with_rollup);
     current_query_tree->setIsGroupByWithGroupingSets(select_query_typed.group_by_with_grouping_sets);
     current_query_tree->setIsGroupByAll(select_query_typed.group_by_all);
-    current_query_tree->setIsOrderByAll(select_query_typed.order_by_all);
+    /// order_by_all flag in AST is set w/o consideration of `enable_order_by_all` setting
+    /// since SETTINGS section has not been parsed yet, - so, check the setting here
+    if (enable_order_by_all)
+        current_query_tree->setIsOrderByAll(select_query_typed.order_by_all);
     current_query_tree->setOriginalAST(select_query);

     auto current_context = current_query_tree->getContext();

View File

@@ -1740,7 +1740,7 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveQualifiedMatcher(Qu
     const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
     if (!tuple_data_type)
         throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
-            "Qualified matcher {} find non compound expression {} with type {}. Expected tuple or array of tuples. In scope {}",
+            "Qualified matcher {} found a non-compound expression {} with type {}. Expected a tuple or an array of tuples. In scope {}",
             matcher_node->formatASTForErrorMessage(),
             expression_query_tree_node->formatASTForErrorMessage(),
             expression_query_tree_node->getResultType()->getName(),

View File

@@ -477,7 +477,7 @@ void ClientBase::sendExternalTables(ASTPtr parsed_query)
     std::vector<ExternalTableDataPtr> data;
     for (auto & table : external_tables)
-        data.emplace_back(table.getData(global_context));
+        data.emplace_back(table.getData(client_context));

     connection->sendExternalTablesData(data);
 }
@@ -690,10 +690,10 @@ try
         /// intermixed with data with parallel formatting.
         /// It may increase code complexity significantly.
         if (!extras_into_stdout || select_only_into_file)
-            output_format = global_context->getOutputFormatParallelIfPossible(
+            output_format = client_context->getOutputFormatParallelIfPossible(
                 current_format, out_file_buf ? *out_file_buf : *out_buf, block);
         else
-            output_format = global_context->getOutputFormat(
+            output_format = client_context->getOutputFormat(
                 current_format, out_file_buf ? *out_file_buf : *out_buf, block);

         output_format->setAutoFlush();
@@ -772,6 +772,15 @@ void ClientBase::adjustSettings()
     global_context->setSettings(settings);
 }

+void ClientBase::initClientContext()
+{
+    client_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
+    client_context->setQuotaClientKey(getClientConfiguration().getString("quota_key", ""));
+    client_context->setQueryKindInitial();
+    client_context->setQueryKind(query_kind);
+    client_context->setQueryParameters(query_parameters);
+}
+
 bool ClientBase::isRegularFile(int fd)
 {
     struct stat file_stat;
@@ -962,7 +971,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
     /// client-side. Thus we need to parse the query.
     const char * begin = full_query.data();
     auto parsed_query = parseQuery(begin, begin + full_query.size(),
-        global_context->getSettingsRef(),
+        client_context->getSettingsRef(),
         /*allow_multi_statements=*/ false);

     if (!parsed_query)
@@ -985,7 +994,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
     /// But for asynchronous inserts we don't extract data, because it's needed
     /// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
     const auto * insert = parsed_query->as<ASTInsertQuery>();
-    if (insert && isSyncInsertWithData(*insert, global_context))
+    if (insert && isSyncInsertWithData(*insert, client_context))
         query_to_execute = full_query.substr(0, insert->data - full_query.data());
     else
         query_to_execute = full_query;
@@ -1103,7 +1112,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
         }
     }

-    const auto & settings = global_context->getSettingsRef();
+    const auto & settings = client_context->getSettingsRef();
     const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;

     int retries_left = 10;
@@ -1118,10 +1127,10 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
                 connection_parameters.timeouts,
                 query,
                 query_parameters,
-                global_context->getCurrentQueryId(),
+                client_context->getCurrentQueryId(),
                 query_processing_stage,
-                &global_context->getSettingsRef(),
-                &global_context->getClientInfo(),
+                &client_context->getSettingsRef(),
+                &client_context->getClientInfo(),
                 true,
                 [&](const Progress & progress) { onProgress(progress); });
@@ -1308,7 +1317,7 @@ void ClientBase::onProgress(const Progress & value)

 void ClientBase::onTimezoneUpdate(const String & tz)
 {
-    global_context->setSetting("session_timezone", tz);
+    client_context->setSetting("session_timezone", tz);
 }
@@ -1504,13 +1513,13 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de

 void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
 {
-    if (!global_context->hasInsertionTable() && insert_query.table)
+    if (!client_context->hasInsertionTable() && insert_query.table)
     {
         String table = insert_query.table->as<ASTIdentifier &>().shortName();
         if (!table.empty())
         {
             String database = insert_query.database ? insert_query.database->as<ASTIdentifier &>().shortName() : "";
-            global_context->setInsertionTable(StorageID(database, table));
+            client_context->setInsertionTable(StorageID(database, table));
         }
     }
 }
@@ -1561,7 +1570,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
     const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
     if ((!parsed_insert_query.data && !parsed_insert_query.infile) && (is_interactive || (!stdin_is_a_tty && !isStdinNotEmptyAndValid(std_in))))
     {
-        const auto & settings = global_context->getSettingsRef();
+        const auto & settings = client_context->getSettingsRef();
         if (settings.throw_if_no_data_to_insert)
             throw Exception(ErrorCodes::NO_DATA_TO_INSERT, "No data to insert");
         else
@@ -1575,10 +1584,10 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
         connection_parameters.timeouts,
         query,
         query_parameters,
-        global_context->getCurrentQueryId(),
+        client_context->getCurrentQueryId(),
         query_processing_stage,
-        &global_context->getSettingsRef(),
-        &global_context->getClientInfo(),
+        &client_context->getSettingsRef(),
+        &client_context->getClientInfo(),
         true,
         [&](const Progress & progress) { onProgress(progress); });
@@ -1626,7 +1635,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
     /// Set callback to be called on file progress.
     if (tty_buf)
-        progress_indication.setFileProgressCallback(global_context, *tty_buf);
+        progress_indication.setFileProgressCallback(client_context, *tty_buf);
 }

 /// If data fetched from file (maybe compressed file)
@@ -1660,10 +1669,10 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
             }

             StorageFile::CommonArguments args{
-                WithContext(global_context),
+                WithContext(client_context),
                 parsed_insert_query->table_id,
                 current_format,
-                getFormatSettings(global_context),
+                getFormatSettings(client_context),
                 compression_method,
                 columns_for_storage_file,
                 ConstraintsDescription{},
@@ -1671,7 +1680,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
                 {},
                 String{},
             };
-            StoragePtr storage = std::make_shared<StorageFile>(in_file, global_context->getUserFilesPath(), args);
+            StoragePtr storage = std::make_shared<StorageFile>(in_file, client_context->getUserFilesPath(), args);
             storage->startup();
             SelectQueryInfo query_info;
@@ -1682,16 +1691,16 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
             storage->read(
                 plan,
                 sample.getNames(),
-                storage->getStorageSnapshot(metadata, global_context),
+                storage->getStorageSnapshot(metadata, client_context),
                 query_info,
-                global_context,
+                client_context,
                 {},
-                global_context->getSettingsRef().max_block_size,
+                client_context->getSettingsRef().max_block_size,
                 getNumberOfPhysicalCPUCores());

             auto builder = plan.buildQueryPipeline(
-                QueryPlanOptimizationSettings::fromContext(global_context),
-                BuildQueryPipelineSettings::fromContext(global_context));
+                QueryPlanOptimizationSettings::fromContext(client_context),
+                BuildQueryPipelineSettings::fromContext(client_context));

             QueryPlanResourceHolder resources;
             auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
@@ -1752,14 +1761,14 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
             current_format = insert->format;
     }

-    auto source = global_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
+    auto source = client_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
     Pipe pipe(source);

     if (columns_description.hasDefaults())
     {
         pipe.addSimpleTransform([&](const Block & header)
         {
-            return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, global_context);
+            return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, client_context);
         });
     }
@@ -1921,12 +1930,12 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
     if (is_interactive)
     {
-        global_context->setCurrentQueryId("");
+        client_context->setCurrentQueryId("");
         // Generate a new query_id
         for (const auto & query_id_format : query_id_formats)
         {
             writeString(query_id_format.first, std_out);
-            writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", global_context->getCurrentQueryId())), std_out);
+            writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", client_context->getCurrentQueryId())), std_out);
             writeChar('\n', std_out);
             std_out.next();
         }
@@ -1953,7 +1962,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
                 auto password = auth_data->getPassword();

                 if (password)
-                    global_context->getAccessControl().checkPasswordComplexityRules(*password);
+                    client_context->getAccessControl().checkPasswordComplexityRules(*password);
             }
         }
     }
@@ -1968,15 +1977,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
     std::optional<Settings> old_settings;
     SCOPE_EXIT_SAFE({
         if (old_settings)
-            global_context->setSettings(*old_settings);
+            client_context->setSettings(*old_settings);
     });

     auto apply_query_settings = [&](const IAST & settings_ast)
     {
         if (!old_settings)
-            old_settings.emplace(global_context->getSettingsRef());
-        global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
-        global_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
+            old_settings.emplace(client_context->getSettingsRef());
+        client_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
+        client_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
     };

     const auto * insert = parsed_query->as<ASTInsertQuery>();
@@ -2009,7 +2018,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
     if (insert && insert->select)
         insert->tryFindInputFunction(input_function);

-    bool is_async_insert_with_inlined_data = global_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
+    bool is_async_insert_with_inlined_data = client_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();

     if (is_async_insert_with_inlined_data)
     {
@@ -2044,9 +2053,9 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
                 if (change.name == "profile")
                     current_profile = change.value.safeGet<String>();
                 else
-                    global_context->applySettingChange(change);
+                    client_context->applySettingChange(change);
             }
-            global_context->resetSettingsToDefaultValue(set_query->default_settings);
+            client_context->resetSettingsToDefaultValue(set_query->default_settings);

             /// Query parameters inside SET queries should be also saved on the client side
             /// to override their previous definitions set with --param_* arguments
@@ -2054,7 +2063,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
             for (const auto & [name, value] : set_query->query_parameters)
                 query_parameters.insert_or_assign(name, value);

-            global_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
+            client_context->addQueryParameters(NameToNameMap{set_query->query_parameters.begin(), set_query->query_parameters.end()});
         }
         if (const auto * use_query = parsed_query->as<ASTUseQuery>())
         {
@@ -2131,8 +2140,8 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
     if (this_query_begin >= all_queries_end)
         return MultiQueryProcessingStage::QUERIES_END;

-    unsigned max_parser_depth = static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth);
-    unsigned max_parser_backtracks = static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks);
+    unsigned max_parser_depth = static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth);
+    unsigned max_parser_backtracks = static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks);

     // If there are only comments left until the end of file, we just
     // stop. The parser can't handle this situation because it always
@@ -2152,7 +2161,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
     try
     {
         parsed_query = parseQuery(this_query_end, all_queries_end,
-            global_context->getSettingsRef(),
+            client_context->getSettingsRef(),
             /*allow_multi_statements=*/ true);
     }
     catch (const Exception & e)
@@ -2195,7 +2204,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
     {
         this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
         insert_ast->end = this_query_end;
-        query_to_execute_end = isSyncInsertWithData(*insert_ast, global_context) ? insert_ast->data : this_query_end;
+        query_to_execute_end = isSyncInsertWithData(*insert_ast, client_context) ? insert_ast->data : this_query_end;
     }

     query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), query_to_execute_end - this_query_begin);
@@ -2404,13 +2413,13 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
                 // , where the inline data is delimited by semicolon and not by a
                 // newline.
                 auto * insert_ast = parsed_query->as<ASTInsertQuery>();
-                if (insert_ast && isSyncInsertWithData(*insert_ast, global_context))
+                if (insert_ast && isSyncInsertWithData(*insert_ast, client_context))
                 {
                     this_query_end = insert_ast->end;
                     adjustQueryEnd(
                         this_query_end, all_queries_end,
-                        static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth),
-                        static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks));
+                        static_cast<unsigned>(client_context->getSettingsRef().max_parser_depth),
+                        static_cast<unsigned>(client_context->getSettingsRef().max_parser_backtracks));
                 }

                 // Report error.
@@ -2541,10 +2550,10 @@ void ClientBase::runInteractive()
     if (load_suggestions)
     {
         /// Load suggestion data from the server.
-        if (global_context->getApplicationType() == Context::ApplicationType::CLIENT)
-            suggest->load<Connection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
-        else if (global_context->getApplicationType() == Context::ApplicationType::LOCAL)
-            suggest->load<LocalConnection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
+        if (client_context->getApplicationType() == Context::ApplicationType::CLIENT)
+            suggest->load<Connection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
+        else if (client_context->getApplicationType() == Context::ApplicationType::LOCAL)
+            suggest->load<LocalConnection>(client_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
     }

     if (home_path.empty())
@@ -2682,7 +2691,7 @@ void ClientBase::runInteractive()
             {
                 // If a separate connection loading suggestions failed to open a new session,
                 // use the main session to receive them.
-                suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), global_context->getClientInfo());
+                suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), client_context->getClientInfo());
             }

             try
@@ -2731,10 +2740,10 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)

     if (!getClientConfiguration().has("log_comment"))
     {
-        Settings settings = global_context->getSettings();
+        Settings settings = client_context->getSettings();
         /// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
         settings.log_comment = fs::absolute(fs::path(file_name));
-        global_context->setSettings(settings);
+        client_context->setSettings(settings);
     }

     return executeMultiQuery(queries_from_file);

View File

@@ -206,6 +206,9 @@ protected:
     /// Adjust some settings after command line options and config had been processed.
     void adjustSettings();

+    /// Initializes the client context.
+    void initClientContext();
+
     void setDefaultFormatsAndCompressionFromConfiguration();

     void initTTYBuffer(ProgressOption progress);
@@ -215,6 +218,9 @@ protected:
     SharedContextHolder shared_context;
     ContextMutablePtr global_context;

+    /// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server.
+    ContextMutablePtr client_context;
+
     LoggerPtr fatal_log;
     Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
     Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;

View File

@@ -8,6 +8,7 @@
 #include <Common/ErrorCodes.h>
 #include <Common/Exception.h>
 #include <Common/LockMemoryExceptionInThread.h>
+#include <Common/Logger.h>
 #include <Common/MemorySanitizer.h>
 #include <Common/SensitiveDataMasker.h>
 #include <Common/config_version.h>
@@ -100,7 +101,7 @@ Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
 {
     if (terminate_on_any_exception)
         std::_Exit(terminate_status_code);
-    capture_thread_frame_pointers = thread_frame_pointers;
+    capture_thread_frame_pointers = getThreadFramePointers();
     handle_error_code(msg_masked.msg, code, remote, getStackFramePointers());
 }
@@ -110,7 +111,7 @@ Exception::Exception(MessageMasked && msg_masked, int code, bool remote_)
 {
     if (terminate_on_any_exception)
         std::_Exit(terminate_status_code);
-    capture_thread_frame_pointers = thread_frame_pointers;
+    capture_thread_frame_pointers = getThreadFramePointers();
     handle_error_code(message(), code, remote, getStackFramePointers());
 }
@@ -119,7 +120,7 @@ Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
 {
     if (terminate_on_any_exception)
         std::_Exit(terminate_status_code);
-    capture_thread_frame_pointers = thread_frame_pointers;
+    capture_thread_frame_pointers = getThreadFramePointers();
 #ifdef STD_EXCEPTION_HAS_STACK_TRACE
     auto * stack_trace_frames = exc.get_stack_trace_frames();
     auto stack_trace_size = exc.get_stack_trace_size();
@@ -133,7 +134,7 @@ Exception::Exception(CreateFromSTDTag, const std::exception & exc)
 {
     if (terminate_on_any_exception)
         std::_Exit(terminate_status_code);
-    capture_thread_frame_pointers = thread_frame_pointers;
+    capture_thread_frame_pointers = getThreadFramePointers();
 #ifdef STD_EXCEPTION_HAS_STACK_TRACE
     auto * stack_trace_frames = exc.get_stack_trace_frames();
     auto stack_trace_size = exc.get_stack_trace_size();
@@ -223,10 +224,38 @@ Exception::FramePointers Exception::getStackFramePointers() const
 }

 thread_local bool Exception::enable_job_stack_trace = false;
-thread_local std::vector<StackTrace::FramePointers> Exception::thread_frame_pointers = {};
+thread_local bool Exception::can_use_thread_frame_pointers = false;
+thread_local Exception::ThreadFramePointers Exception::thread_frame_pointers;
+
+Exception::ThreadFramePointers::ThreadFramePointers()
+{
+    can_use_thread_frame_pointers = true;
+}
+
+Exception::ThreadFramePointers::~ThreadFramePointers()
+{
+    can_use_thread_frame_pointers = false;
+}
+
+Exception::ThreadFramePointersBase Exception::getThreadFramePointers()
+{
+    if (can_use_thread_frame_pointers)
+        return thread_frame_pointers.frame_pointers;
+
+    return {};
+}
+
+void Exception::setThreadFramePointers(ThreadFramePointersBase frame_pointers)
+{
+    if (can_use_thread_frame_pointers)
+        thread_frame_pointers.frame_pointers = std::move(frame_pointers);
+}

 static void tryLogCurrentExceptionImpl(Poco::Logger * logger, const std::string & start_of_message)
 {
+    if (!isLoggingEnabled())
+        return;
+
     try
     {
         PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
@@ -242,6 +271,9 @@ static void tryLogCurrentExceptionImpl(Poco::Logger * logger, const std::string
 void tryLogCurrentException(const char * log_name, const std::string & start_of_message)
 {
+    if (!isLoggingEnabled())
+        return;
+
     /// Under high memory pressure, new allocations throw a
     /// MEMORY_LIMIT_EXCEEDED exception.
     ///

View File

@@ -10,7 +10,6 @@
 #include <cerrno>
 #include <exception>
-#include <memory>
 #include <vector>

 #include <fmt/core.h>
@@ -49,14 +48,14 @@ public:
     {
         if (terminate_on_any_exception)
             std::terminate();
-        capture_thread_frame_pointers = thread_frame_pointers;
+        capture_thread_frame_pointers = getThreadFramePointers();
     }

     Exception(const PreformattedMessage & msg, int code): Exception(msg.text, code)
     {
         if (terminate_on_any_exception)
             std::terminate();
-        capture_thread_frame_pointers = thread_frame_pointers;
+        capture_thread_frame_pointers = getThreadFramePointers();
         message_format_string = msg.format_string;
         message_format_string_args = msg.format_string_args;
     }
@@ -65,18 +64,36 @@ public:
     {
         if (terminate_on_any_exception)
             std::terminate();
-        capture_thread_frame_pointers = thread_frame_pointers;
+        capture_thread_frame_pointers = getThreadFramePointers();
         message_format_string = msg.format_string;
         message_format_string_args = msg.format_string_args;
     }

     /// Collect call stacks of all previous jobs' schedulings leading to this thread job's execution
     static thread_local bool enable_job_stack_trace;
-    static thread_local std::vector<StackTrace::FramePointers> thread_frame_pointers;
+    static thread_local bool can_use_thread_frame_pointers;
+    /// Because of unknown order of static destructor calls,
+    /// thread_frame_pointers can already be uninitialized when a different destructor generates an exception.
+    /// To prevent such scenarios, a wrapper class is created and a function that will return empty vector
+    /// if its destructor is already called
+    using ThreadFramePointersBase = std::vector<StackTrace::FramePointers>;
+    struct ThreadFramePointers
+    {
+        ThreadFramePointers();
+        ~ThreadFramePointers();
+
+        ThreadFramePointersBase frame_pointers;
+    };
+
+    static ThreadFramePointersBase getThreadFramePointers();
+    static void setThreadFramePointers(ThreadFramePointersBase frame_pointers);

     /// Callback for any exception
     static std::function<void(const std::string & msg, int code, bool remote, const Exception::FramePointers & trace)> callback;

 protected:
+    static thread_local ThreadFramePointers thread_frame_pointers;
+
     // used to remove the sensitive information from exceptions if query_masking_rules is configured
     struct MessageMasked
     {
@@ -178,7 +195,7 @@ class ErrnoException : public Exception
 public:
     ErrnoException(std::string && msg, int code, int with_errno) : Exception(msg, code), saved_errno(with_errno)
     {
-        capture_thread_frame_pointers = thread_frame_pointers;
+        capture_thread_frame_pointers = getThreadFramePointers();
         addMessage(", {}", errnoToString(saved_errno));
     }
@@ -187,7 +204,7 @@ public:
     requires std::is_convertible_v<T, String>
     ErrnoException(int code, T && message) : Exception(message, code), saved_errno(errno)
     {
-        capture_thread_frame_pointers = thread_frame_pointers;
+        capture_thread_frame_pointers = getThreadFramePointers();
         addMessage(", {}", errnoToString(saved_errno));
     }
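The wrapper above guards against static destruction order: once the thread_local's destructor has run, a later exception constructed on the same thread must not touch the destroyed vector. A self-contained sketch of that guard pattern (simplified names, not the real DB::Exception code):

```cpp
#include <iostream>
#include <vector>

// A boolean brackets the thread_local object's lifetime so readers that run
// after its destructor (e.g. during thread/program shutdown) get an empty
// value instead of touching a destroyed vector.
struct Guarded
{
    static thread_local bool alive;
    std::vector<int> frame_pointers;

    Guarded()  { alive = true; }
    ~Guarded() { alive = false; }
};

thread_local bool Guarded::alive = false;
static thread_local Guarded guarded;

std::vector<int> getFramePointers()
{
    if (Guarded::alive)
        return guarded.frame_pointers;
    return {}; // destructor already ran: fail safe
}

int main()
{
    guarded.frame_pointers = {1, 2, 3};             // first touch constructs `guarded`, setting alive = true
    std::cout << getFramePointers().size() << '\n'; // 3 while alive; empty once the destructor has run
}
```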

View File

@ -25,3 +25,15 @@ bool hasLogger(const std::string & name)
{ {
return Poco::Logger::has(name); return Poco::Logger::has(name);
} }
static constinit std::atomic<bool> allow_logging{true};
bool isLoggingEnabled()
{
return allow_logging;
}
void disableLogging()
{
allow_logging = false;
}
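The new flag is a shutdown gate: once the logging infrastructure starts tearing down, any late tryLogCurrentException-style call should become a no-op. A minimal sketch of the same gate (illustrative, not the Poco-backed implementation):

```cpp
#include <atomic>
#include <cstdio>

// constinit guarantees the flag is usable even during early startup and
// late shutdown, when dynamic initialization order is unreliable.
static constinit std::atomic<bool> allow_logging{true};

void disableLogging() { allow_logging.store(false, std::memory_order_relaxed); }

void tryLog(const char * msg)
{
    if (!allow_logging.load(std::memory_order_relaxed))
        return; // the logger may already be destroyed: drop the message
    std::puts(msg);
}

int main()
{
    tryLog("logged");
    disableLogging();
    tryLog("silently dropped");
}
```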

View File

@ -64,3 +64,7 @@ LoggerRawPtr createRawLogger(const std::string & name, Poco::Channel * channel,
* Otherwise, returns false. * Otherwise, returns false.
*/ */
bool hasLogger(const std::string & name); bool hasLogger(const std::string & name);
void disableLogging();
bool isLoggingEnabled();

View File

@@ -89,7 +89,7 @@ void signalHandler(int sig, siginfo_t * info, void * context)
     writePODBinary(*info, out);
     writePODBinary(signal_context, out);
     writePODBinary(stack_trace, out);
-    writeVectorBinary(Exception::enable_job_stack_trace ? Exception::thread_frame_pointers : std::vector<StackTrace::FramePointers>{}, out);
+    writeVectorBinary(Exception::enable_job_stack_trace ? Exception::getThreadFramePointers() : std::vector<StackTrace::FramePointers>{}, out);
     writeBinary(static_cast<UInt32>(getThreadId()), out);
     writePODBinary(current_thread, out);

View File

@@ -489,24 +489,25 @@ struct CacheEntry

 using CacheEntryPtr = std::shared_ptr<CacheEntry>;

-static constinit std::atomic<bool> can_use_cache = false;
+static constinit bool can_use_cache = false;

 using StackTraceCacheBase = std::map<StackTraceTriple, CacheEntryPtr, std::less<>>;

 struct StackTraceCache : public StackTraceCacheBase
 {
+    StackTraceCache()
+        : StackTraceCacheBase()
+    {
+        can_use_cache = true;
+    }
+
     ~StackTraceCache()
     {
         can_use_cache = false;
     }
 };

-static StackTraceCache & cacheInstance()
-{
-    static StackTraceCache cache;
-    can_use_cache = true;
-    return cache;
-}
+static StackTraceCache cache;

 static DB::SharedMutex stacktrace_cache_mutex;
@@ -524,7 +525,6 @@ String toStringCached(const StackTrace::FramePointers & pointers, size_t offset,
     /// Calculation of stack trace text is extremely slow.
     /// We use cache because otherwise the server could be overloaded by trash queries.
     /// Note that this cache can grow unconditionally, but practically it should be small.
-    StackTraceCache & cache = cacheInstance();
     CacheEntryPtr cache_entry;

     // Optimistic try for cache hit to avoid any contention whatsoever, should be the main hot code route
@@ -576,7 +576,7 @@ std::string StackTrace::toString(void * const * frame_pointers_raw, size_t offse
 void StackTrace::dropCache()
 {
     std::lock_guard lock{stacktrace_cache_mutex};
-    cacheInstance().clear();
+    cache.clear();
 }

View File

@@ -51,7 +51,7 @@ public:
         if (!capture_frame_pointers)
             return;
         /// Save all previous jobs call stacks and append with current
-        frame_pointers = DB::Exception::thread_frame_pointers;
+        frame_pointers = DB::Exception::getThreadFramePointers();
         frame_pointers.push_back(StackTrace().getFramePointers());
     }
@@ -455,7 +455,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
         try
         {
             if (DB::Exception::enable_job_stack_trace)
-                DB::Exception::thread_frame_pointers = std::move(job_data->frame_pointers);
+                DB::Exception::setThreadFramePointers(std::move(job_data->frame_pointers));

             CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads);

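Both hunks replace direct reads and writes of the public Exception::thread_frame_pointers member with getThreadFramePointers()/setThreadFramePointers() accessors. A reduced sketch of what such accessors around a thread-local typically look like (the function names follow the diff, the storage details are assumed):

    #include <utility>
    #include <vector>

    using FramePointers = std::vector<void *>;

    // One stack-of-stacks per thread: each enqueued job appends the trace
    // of the thread that scheduled it.
    static thread_local std::vector<FramePointers> thread_frame_pointers;

    const std::vector<FramePointers> & getThreadFramePointers()
    {
        return thread_frame_pointers;
    }

    void setThreadFramePointers(std::vector<FramePointers> && frame_pointers)
    {
        thread_frame_pointers = std::move(frame_pointers);
    }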
View File

@@ -1,11 +1,12 @@
 #if defined(OS_LINUX)
 #include <Common/TimerDescriptor.h>
 #include <Common/Exception.h>

 #include <sys/timerfd.h>
-#include <fcntl.h>
 #include <unistd.h>

 namespace DB
 {

@@ -13,21 +14,18 @@ namespace ErrorCodes
 {
     extern const int CANNOT_CREATE_TIMER;
     extern const int CANNOT_SET_TIMER_PERIOD;
-    extern const int CANNOT_FCNTL;
     extern const int CANNOT_READ_FROM_SOCKET;
 }

-TimerDescriptor::TimerDescriptor(int clockid, int flags)
+TimerDescriptor::TimerDescriptor()
 {
-    timer_fd = timerfd_create(clockid, flags);
+    timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
     if (timer_fd == -1)
-        throw Exception(ErrorCodes::CANNOT_CREATE_TIMER, "Cannot create timer_fd descriptor");
-
-    if (-1 == fcntl(timer_fd, F_SETFL, O_NONBLOCK))
-        throw ErrnoException(ErrorCodes::CANNOT_FCNTL, "Cannot set O_NONBLOCK for timer_fd");
+        throw ErrnoException(ErrorCodes::CANNOT_CREATE_TIMER, "Cannot create timer_fd descriptor");
 }

-TimerDescriptor::TimerDescriptor(TimerDescriptor && other) noexcept : timer_fd(other.timer_fd)
+TimerDescriptor::TimerDescriptor(TimerDescriptor && other) noexcept
+    : timer_fd(other.timer_fd)
 {
     other.timer_fd = -1;
 }

@@ -40,21 +38,19 @@ TimerDescriptor & TimerDescriptor::operator=(DB::TimerDescriptor && other) noexc
 TimerDescriptor::~TimerDescriptor()
 {
-    /// Do not check for result cause cannot throw exception.
     if (timer_fd != -1)
     {
-        int err = close(timer_fd);
-        chassert(!err || errno == EINTR);
+        if (0 != ::close(timer_fd))
+            std::terminate();
     }
 }

 void TimerDescriptor::reset() const
 {
-    itimerspec spec;
-    spec.it_interval.tv_nsec = 0;
-    spec.it_interval.tv_sec = 0;
-    spec.it_value.tv_sec = 0;
-    spec.it_value.tv_nsec = 0;
+    if (timer_fd == -1)
+        return;
+
+    itimerspec spec{};

     if (-1 == timerfd_settime(timer_fd, 0 /*relative timer */, &spec, nullptr))
         throw ErrnoException(ErrorCodes::CANNOT_SET_TIMER_PERIOD, "Cannot reset timer_fd");

@@ -66,25 +62,46 @@ void TimerDescriptor::reset() const
 void TimerDescriptor::drain() const
 {
+    if (timer_fd == -1)
+        return;
+
     /// It is expected that socket returns 8 bytes when readable.
     /// Read in loop anyway cause signal may interrupt read call.
+
+    /// man timerfd_create:
+    ///   If the timer has already expired one or more times since its settings were last modified using timerfd_settime(),
+    ///   or since the last successful read(2), then the buffer given to read(2) returns an unsigned 8-byte integer (uint64_t)
+    ///   containing the number of expirations that have occurred.
+    ///   (The returned value is in host byte order—that is, the native byte order for integers on the host machine.)
     uint64_t buf;
     while (true)
     {
         ssize_t res = ::read(timer_fd, &buf, sizeof(buf));
         if (res < 0)
         {
+            /// man timerfd_create:
+            ///   If no timer expirations have occurred at the time of the read(2),
+            ///   then the call either blocks until the next timer expiration, or fails with the error EAGAIN
+            ///   if the file descriptor has been made nonblocking
+            ///   (via the use of the fcntl(2) F_SETFL operation to set the O_NONBLOCK flag).
             if (errno == EAGAIN)
                 break;

-            if (errno != EINTR)
-                throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot drain timer_fd");
+            /// A signal happened, need to retry.
+            if (errno == EINTR)
+                continue;
+
+            throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot drain timer_fd");
         }
+
+        chassert(res == sizeof(buf));
     }
 }

 void TimerDescriptor::setRelative(uint64_t usec) const
 {
+    chassert(timer_fd >= 0);
+
     static constexpr uint32_t TIMER_PRECISION = 1e6;

     itimerspec spec;

@@ -103,4 +120,5 @@ void TimerDescriptor::setRelative(Poco::Timespan timespan) const
 }

 }
 #endif

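For readers unfamiliar with timerfd, here is a self-contained sketch of the pattern the new code follows: the descriptor is created non-blocking up front (instead of a separate fcntl call), and draining reads until EAGAIN while retrying on EINTR. This is an illustration, not the ClickHouse class:

    #include <sys/timerfd.h>
    #include <unistd.h>
    #include <cerrno>
    #include <cstdint>
    #include <stdexcept>

    int make_timer_fd()
    {
        // TFD_NONBLOCK from the start, so draining never blocks;
        // TFD_CLOEXEC so the descriptor does not leak into child processes.
        int fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
        if (fd == -1)
            throw std::runtime_error("timerfd_create failed");
        return fd;
    }

    void drain_timer_fd(int fd)
    {
        uint64_t expirations;
        while (true)
        {
            ssize_t res = ::read(fd, &expirations, sizeof(expirations));
            if (res < 0)
            {
                if (errno == EAGAIN)  // nothing pending, done
                    break;
                if (errno == EINTR)   // interrupted by a signal, retry
                    continue;
                throw std::runtime_error("cannot drain timerfd");
            }
        }
    }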
View File

@@ -12,7 +12,7 @@ private:
     int timer_fd;

 public:
-    explicit TimerDescriptor(int clockid = CLOCK_MONOTONIC, int flags = 0);
+    TimerDescriptor();
     ~TimerDescriptor();

     TimerDescriptor(const TimerDescriptor &) = delete;

View File

@@ -548,7 +548,7 @@ public:
     virtual bool isExpired() const = 0;

     /// Get the current connected node idx.
-    virtual Int8 getConnectedNodeIdx() const = 0;
+    virtual std::optional<int8_t> getConnectedNodeIdx() const = 0;

     /// Get the current connected host and port.
     virtual String getConnectedHostPort() const = 0;

View File

@@ -39,7 +39,7 @@ public:
     ~TestKeeper() override;

     bool isExpired() const override { return expired; }
-    Int8 getConnectedNodeIdx() const override { return 0; }
+    std::optional<int8_t> getConnectedNodeIdx() const override { return 0; }
     String getConnectedHostPort() const override { return "TestKeeper:0000"; }
     int32_t getConnectionXid() const override { return 0; }
     int64_t getSessionID() const override { return 0; }

View File

@@ -128,16 +128,15 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
         ShuffleHosts shuffled_hosts = shuffleHosts();

         impl = std::make_unique<Coordination::ZooKeeper>(shuffled_hosts, args, zk_log);
-        Int8 node_idx = impl->getConnectedNodeIdx();
+        auto node_idx = impl->getConnectedNodeIdx();

         if (args.chroot.empty())
             LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
         else
             LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);

         /// If the balancing strategy has an optimal node then it will be the first in the list
-        bool connected_to_suboptimal_node = node_idx != shuffled_hosts[0].original_index;
+        bool connected_to_suboptimal_node = node_idx && static_cast<UInt8>(*node_idx) != shuffled_hosts[0].original_index;
         bool respect_az = args.prefer_local_availability_zone && !args.client_availability_zone.empty();
         bool may_benefit_from_reconnecting = respect_az || args.get_priority_load_balancing.hasOptimalNode();
         if (connected_to_suboptimal_node && may_benefit_from_reconnecting)

@@ -145,7 +144,7 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
             auto reconnect_timeout_sec = getSecondsUntilReconnect(args);
             LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
                 " To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
-                impl->getConnectedHostPort(), node_idx, reconnect_timeout_sec);
+                impl->getConnectedHostPort(), *node_idx, reconnect_timeout_sec);

             auto reconnect_task_holder = DB::Context::getGlobalContextInstance()->getSchedulePool().createTask("ZKReconnect", [this, optimal_host = shuffled_hosts[0]]()
             {

@@ -154,13 +153,15 @@ void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper>
                     LOG_DEBUG(log, "Trying to connect to a more optimal node {}", optimal_host.host);
                     ShuffleHosts node{optimal_host};
                     std::unique_ptr<Coordination::IKeeper> new_impl = std::make_unique<Coordination::ZooKeeper>(node, args, zk_log);
-                    Int8 new_node_idx = new_impl->getConnectedNodeIdx();

+                    auto new_node_idx = new_impl->getConnectedNodeIdx();
+                    chassert(new_node_idx.has_value());
+
                     /// Maybe the node was unavailable when getting AZs first time, update just in case
-                    if (args.availability_zone_autodetect && availability_zones[new_node_idx].empty())
+                    if (args.availability_zone_autodetect && availability_zones[*new_node_idx].empty())
                     {
-                        availability_zones[new_node_idx] = new_impl->tryGetAvailabilityZone();
-                        LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[new_node_idx]);
+                        availability_zones[*new_node_idx] = new_impl->tryGetAvailabilityZone();
+                        LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[*new_node_idx]);
                     }

                     optimal_impl = std::move(new_impl);

@@ -1525,7 +1526,7 @@ void ZooKeeper::setServerCompletelyStarted()
         zk->setServerCompletelyStarted();
 }

-Int8 ZooKeeper::getConnectedHostIdx() const
+std::optional<int8_t> ZooKeeper::getConnectedHostIdx() const
 {
     return impl->getConnectedNodeIdx();
 }

@@ -1544,10 +1545,10 @@ String ZooKeeper::getConnectedHostAvailabilityZone() const
 {
     if (args.implementation != "zookeeper" || !impl)
         return "";
-    Int8 idx = impl->getConnectedNodeIdx();
-    if (idx < 0)
+    std::optional<int8_t> idx = impl->getConnectedNodeIdx();
+    if (!idx)
         return ""; /// session expired
-    return availability_zones.at(idx);
+    return availability_zones.at(*idx);
 }

 size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)

View File

@@ -620,7 +620,7 @@ public:
     void setServerCompletelyStarted();

-    Int8 getConnectedHostIdx() const;
+    std::optional<int8_t> getConnectedHostIdx() const;
     String getConnectedHostPort() const;
     int32_t getConnectionXid() const;

View File

@@ -536,7 +536,7 @@ void ZooKeeper::connect(
                     compressed_out.emplace(*out, CompressionCodecFactory::instance().get("LZ4", {}));
                 }

-                original_index = static_cast<Int8>(node.original_index);
+                original_index.store(node.original_index);
                 break;
             }
             catch (...)

@@ -1531,6 +1531,30 @@ void ZooKeeper::close()
 }

+std::optional<int8_t> ZooKeeper::getConnectedNodeIdx() const
+{
+    int8_t res = original_index.load();
+    if (res == -1)
+        return std::nullopt;
+    else
+        return res;
+}
+
+String ZooKeeper::getConnectedHostPort() const
+{
+    auto idx = getConnectedNodeIdx();
+    if (idx)
+        return args.hosts[*idx];
+    else
+        return "";
+}
+
+int32_t ZooKeeper::getConnectionXid() const
+{
+    return next_xid.load();
+}
+
 void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
 {
     /// logOperationIfNeeded(...) uses zk_log and can be called from different threads, so we have to use atomic shared_ptr

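The connected-node index is now stored as std::atomic<int8_t> with -1 as the internal "not connected" sentinel, and the accessor converts the sentinel into std::nullopt so callers cannot forget to handle the disconnected case. A minimal sketch of that encoding (the class around it is illustrative):

    #include <atomic>
    #include <cstdint>
    #include <optional>

    class Connection
    {
    public:
        void onConnected(int8_t index) { original_index.store(index); }
        void onDisconnected() { original_index.store(-1); }

        std::optional<int8_t> getConnectedNodeIdx() const
        {
            int8_t res = original_index.load();
            if (res == -1)
                return std::nullopt;  // the sentinel never escapes to callers
            return res;
        }

    private:
        std::atomic<int8_t> original_index{-1};
    };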
View File

@@ -114,13 +114,12 @@ public:
     ~ZooKeeper() override;

     /// If expired, you can only destroy the object. All other methods will throw exception.
     bool isExpired() const override { return requests_queue.isFinished(); }

-    Int8 getConnectedNodeIdx() const override { return original_index; }
-    String getConnectedHostPort() const override { return (original_index == -1) ? "" : args.hosts[original_index]; }
-    int32_t getConnectionXid() const override { return next_xid.load(); }
+    std::optional<int8_t> getConnectedNodeIdx() const override;
+    String getConnectedHostPort() const override;
+    int32_t getConnectionXid() const override;

     String tryGetAvailabilityZone() override;

@@ -219,7 +218,7 @@ private:
     ACLs default_acls;

     zkutil::ZooKeeperArgs args;

-    Int8 original_index = -1;
+    std::atomic<int8_t> original_index{-1};

     /// Fault injection
     void maybeInjectSendFault();

View File

@@ -1,2 +1,2 @@
 clickhouse_add_executable (mysqlxx_pool_test mysqlxx_pool_test.cpp)
-target_link_libraries (mysqlxx_pool_test PRIVATE mysqlxx clickhouse_common_config)
+target_link_libraries (mysqlxx_pool_test PRIVATE mysqlxx clickhouse_common_config loggers_no_text_log)

View File

@@ -186,7 +186,7 @@ class IColumn;
     M(Bool, allow_suspicious_ttl_expressions, false, "Reject TTL expressions that don't depend on any of table's columns. It indicates a user error most of the time.", 0) \
     M(Bool, allow_suspicious_variant_types, false, "In CREATE TABLE statement allows specifying Variant type with similar variant types (for example, with different numeric or date types). Enabling this setting may introduce some ambiguity when working with values with similar types.", 0) \
     M(Bool, allow_suspicious_primary_key, false, "Forbid suspicious PRIMARY KEY/ORDER BY for MergeTree (i.e. SimpleAggregateFunction)", 0) \
-    M(Bool, compile_expressions, false, "Compile some scalar functions and operators to native code.", 0) \
+    M(Bool, compile_expressions, false, "Compile some scalar functions and operators to native code. Due to a bug in the LLVM compiler infrastructure, on AArch64 machines, it is known to lead to a nullptr dereference and, consequently, server crash. Do not enable this setting.", 0) \
     M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
     M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \
     M(UInt64, min_count_to_compile_aggregate_expression, 3, "The number of identical aggregate expressions before they are JIT-compiled", 0) \

View File

@@ -158,7 +158,7 @@ BaseDaemon::~BaseDaemon()
         tryLogCurrentException(&logger());
     }

-    OwnSplitChannel::disableLogging();
+    disableLogging();
 }

View File

@@ -647,12 +647,13 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
             {
                 std::lock_guard lock{ddl_worker_mutex};
                 ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
-                ddl_worker->startup();
-                ddl_worker_initialized = true;
             }
+            ddl_worker->startup();
+            ddl_worker_initialized = true;
         });

     std::scoped_lock lock(mutex);
-    return startup_replicated_database_task = makeLoadTask(async_loader, {job});
+    startup_replicated_database_task = makeLoadTask(async_loader, {job});
+    return startup_replicated_database_task;
 }

 void DatabaseReplicated::waitDatabaseStarted() const

@@ -1530,8 +1531,11 @@ void DatabaseReplicated::stopReplication()
 void DatabaseReplicated::shutdown()
 {
     stopReplication();
-    ddl_worker_initialized = false;
-    ddl_worker = nullptr;
+    {
+        std::lock_guard lock{ddl_worker_mutex};
+        ddl_worker_initialized = false;
+        ddl_worker = nullptr;
+    }
     DatabaseAtomic::shutdown();
 }

@@ -1679,6 +1683,7 @@ bool DatabaseReplicated::canExecuteReplicatedMetadataAlter() const
     /// It may update the metadata digest (both locally and in ZooKeeper)
     /// before DatabaseReplicatedDDLWorker::initializeReplication() has finished.
     /// We should not update metadata until the database is initialized.
+    std::lock_guard lock{ddl_worker_mutex};
     return ddl_worker_initialized && ddl_worker->isCurrentlyActive();
 }

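The shape of this fix: every read or write of the worker pointer and its initialized flag happens under ddl_worker_mutex, while the potentially slow startup() call stays outside the lock so it cannot deadlock with shutdown(). A stripped-down sketch with hypothetical names (the real code relies on the async loader to order startup against shutdown):

    #include <atomic>
    #include <memory>
    #include <mutex>

    struct Worker
    {
        void startup() { /* may block, must not run under the mutex */ }
        bool isCurrentlyActive() const { return true; }
    };

    class Database
    {
    public:
        void start()
        {
            {
                std::lock_guard lock{worker_mutex};
                worker = std::make_unique<Worker>();
            }
            worker->startup();  // outside the lock on purpose
            worker_initialized = true;
        }

        void shutdown()
        {
            std::lock_guard lock{worker_mutex};
            worker_initialized = false;
            worker = nullptr;
        }

        bool canExecuteAlter() const
        {
            std::lock_guard lock{worker_mutex};  // mutable: locked in const methods
            return worker_initialized && worker->isCurrentlyActive();
        }

    private:
        mutable std::mutex worker_mutex;
        std::unique_ptr<Worker> worker;
        std::atomic_bool worker_initialized = false;
    };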
View File

@@ -155,7 +155,7 @@ private:
     std::atomic_bool is_recovering = false;
     std::atomic_bool ddl_worker_initialized = false;
     std::unique_ptr<DatabaseReplicatedDDLWorker> ddl_worker;
-    std::mutex ddl_worker_mutex;
+    mutable std::mutex ddl_worker_mutex;
     UInt32 max_log_ptr_at_creation = 0;

     /// Usually operation with metadata are single-threaded because of the way replication works,

View File

@@ -34,7 +34,7 @@ public:
     String getFileName() const override { return impl->getFileName(); }

-    size_t getFileSize() override { return impl->getFileSize(); }
+    std::optional<size_t> tryGetFileSize() override { return impl->tryGetFileSize(); }

     String getInfoForLog() override { return impl->getInfoForLog(); }

View File

@@ -810,6 +810,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
 {
     last_caller_id = FileSegment::getCallerId();

+    chassert(file_offset_of_buffer_end <= read_until_position);
     if (file_offset_of_buffer_end == read_until_position)
         return false;

@@ -1051,7 +1052,11 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
         if (download_current_segment && download_current_segment_succeeded)
             chassert(file_segment.getCurrentWriteOffset() >= file_offset_of_buffer_end);

-        chassert(file_offset_of_buffer_end <= read_until_position);
+        chassert(
+            file_offset_of_buffer_end <= read_until_position,
+            fmt::format("Expected {} <= {} (size: {}, read range: {})",
+                file_offset_of_buffer_end, read_until_position, size, current_read_range.toString()));
     }

     swap(*implementation_buffer);

View File

@@ -253,16 +253,15 @@ void ReadBufferFromAzureBlobStorage::initialize()
     initialized = true;
 }

-size_t ReadBufferFromAzureBlobStorage::getFileSize()
+std::optional<size_t> ReadBufferFromAzureBlobStorage::tryGetFileSize()
 {
     if (!blob_client)
         blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));

-    if (file_size.has_value())
-        return *file_size;
-
-    file_size = blob_client->GetProperties().Value.BlobSize;
-    return *file_size;
+    if (!file_size)
+        file_size = blob_client->GetProperties().Value.BlobSize;
+    return file_size;
 }

 size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & /*progress_callback*/) const

View File

@@ -42,7 +42,7 @@ public:
     bool supportsRightBoundedReads() const override { return true; }

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     size_t readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback) const override;

View File

@@ -41,7 +41,7 @@ public:
     void setReadUntilEnd() override { setReadUntilPosition(getFileSize()); }

-    size_t getFileSize() override { return getTotalSize(blobs_to_read); }
+    std::optional<size_t> tryGetFileSize() override { return getTotalSize(blobs_to_read); }

     size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }

View File

@@ -321,7 +321,7 @@ public:
     off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }

     String getFileName() const override { return handle.getFileName(); }
-    size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
+    std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }

     Handle releaseHandle() && { return std::move(handle); }

View File

@@ -317,7 +317,7 @@ public:
     String getFileName() const override { return handle.getFileName(); }

-    size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
+    std::optional<size_t> tryGetFileSize() override { return handle.getFileInfo().uncompressed_size; }

     /// Releases owned handle to pass it to an enumerator.
     HandleHolder releaseHandle() &&

View File

@@ -93,7 +93,10 @@ void AsynchronousReadBufferFromFile::close()
         return;

     if (0 != ::close(fd))
+    {
+        fd = -1;
         throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
+    }

     fd = -1;
 }

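Several close() methods in this commit get the same treatment: invalidate the descriptor before throwing, so a subsequent destructor or retry cannot close the same fd twice and hit an unrelated descriptor that reused the number. Condensed into a standalone sketch:

    #include <unistd.h>
    #include <stdexcept>

    struct FileHolder
    {
        int fd = -1;

        void close()
        {
            if (fd == -1)
                return;
            if (0 != ::close(fd))
            {
                // Invalidate first: the fd number may be reused by another
                // thread immediately, so retrying ::close is never safe.
                fd = -1;
                throw std::runtime_error("Cannot close file");
            }
            fd = -1;
        }

        ~FileHolder()
        {
            if (fd != -1)
                ::close(fd);  // best effort only; destructors must not throw
        }
    };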
View File

@@ -244,7 +244,7 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
     file_offset_of_buffer_end = 0;
 }

-size_t AsynchronousReadBufferFromFileDescriptor::getFileSize()
+std::optional<size_t> AsynchronousReadBufferFromFileDescriptor::tryGetFileSize()
 {
     return getSizeFromFileDescriptor(fd, getFileName());
 }

View File

@@ -68,7 +68,7 @@ public:
     /// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
     void rewind();

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }

View File

@@ -21,7 +21,7 @@ public:
     off_t seek(off_t off, int whence) override;
     off_t getPosition() override;

-    size_t getFileSize() override { return total_size; }
+    std::optional<size_t> tryGetFileSize() override { return total_size; }

 private:
     bool nextImpl() override;

View File

@@ -77,7 +77,10 @@ void MMapReadBufferFromFile::close()
     finish();

     if (0 != ::close(fd))
+    {
+        fd = -1;
         throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
+    }

     fd = -1;
     metric_increment.destroy();

View File

@@ -87,7 +87,7 @@ off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence)
     return new_pos;
 }

-size_t MMapReadBufferFromFileDescriptor::getFileSize()
+std::optional<size_t> MMapReadBufferFromFileDescriptor::tryGetFileSize()
 {
     return getSizeFromFileDescriptor(getFD(), getFileName());
 }

View File

@@ -38,7 +38,7 @@ public:
     int getFD() const;

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) const override;
     bool supportsReadAt() override { return true; }

View File

@@ -69,7 +69,10 @@ void MMappedFile::close()
     finish();

     if (0 != ::close(fd))
+    {
+        fd = -1;
         throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
+    }

     fd = -1;
     metric_increment.destroy();

View File

@@ -67,11 +67,13 @@ void OpenedFile::close()
         return;

     if (0 != ::close(fd))
+    {
+        fd = -1;
         throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
+    }

     fd = -1;
     metric_increment.destroy();
 }

 }

View File

@@ -152,7 +152,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
     return offset;
 }

-size_t ParallelReadBuffer::getFileSize()
+std::optional<size_t> ParallelReadBuffer::tryGetFileSize()
 {
     return file_size;
 }

View File

@@ -33,7 +33,7 @@ public:
     ~ParallelReadBuffer() override { finishAndWait(); }

     off_t seek(off_t off, int whence) override;
-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;
     off_t getPosition() override;

     const SeekableReadBuffer & getReadBuffer() const { return input; }

View File

@@ -19,7 +19,8 @@ private:
     std::string getFileName() const override { return "<empty>"; }
     off_t seek(off_t /*off*/, int /*whence*/) override { return 0; }
     off_t getPosition() override { return 0; }
-    size_t getFileSize() override { return 0; }
+    std::optional<size_t> tryGetFileSize() override { return 0; }
+    size_t getFileOffsetOfBufferEnd() const override { return 0; }
 };

 }

View File

@@ -30,7 +30,7 @@ public:
     void setReadUntilEnd() override { in->setReadUntilEnd(); }

-    size_t getFileSize() override { return in->getFileSize(); }
+    std::optional<size_t> tryGetFileSize() override { return in->tryGetFileSize(); }

 private:
     bool nextImpl() override;

View File

@@ -88,7 +88,10 @@ void ReadBufferFromFile::close()
         return;

     if (0 != ::close(fd))
+    {
+        fd = -1;
         throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
+    }

     fd = -1;
     metric_increment.destroy();

View File

@@ -5,11 +5,6 @@
 namespace DB
 {

-namespace ErrorCodes
-{
-    extern const int UNKNOWN_FILE_SIZE;
-}
-
 ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
 {
 }

@@ -26,11 +21,9 @@ ReadBufferFromFileBase::ReadBufferFromFileBase(
 ReadBufferFromFileBase::~ReadBufferFromFileBase() = default;

-size_t ReadBufferFromFileBase::getFileSize()
+std::optional<size_t> ReadBufferFromFileBase::tryGetFileSize()
 {
-    if (file_size)
-        return *file_size;
-    throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for read buffer");
+    return file_size;
 }

 void ReadBufferFromFileBase::setProgressCallback(ContextPtr context)

View File

@@ -50,7 +50,7 @@ public:
         clock_type = clock_type_;
     }

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     void setProgressCallback(ContextPtr context);

View File

@@ -52,9 +52,9 @@ bool ReadBufferFromFileDecorator::nextImpl()
     return result;
 }

-size_t ReadBufferFromFileDecorator::getFileSize()
+std::optional<size_t> ReadBufferFromFileDecorator::tryGetFileSize()
 {
-    return getFileSizeFromReadBuffer(*impl);
+    return tryGetFileSizeFromReadBuffer(*impl);
 }

 }

View File

@@ -27,7 +27,7 @@ public:
     ReadBuffer & getWrappedReadBuffer() { return *impl; }

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

 protected:
     std::unique_ptr<SeekableReadBuffer> impl;

View File

@@ -253,7 +253,7 @@ void ReadBufferFromFileDescriptor::rewind()
     file_offset_of_buffer_end = 0;
 }

-size_t ReadBufferFromFileDescriptor::getFileSize()
+std::optional<size_t> ReadBufferFromFileDescriptor::tryGetFileSize()
 {
     return getSizeFromFileDescriptor(fd, getFileName());
 }

View File

@@ -69,7 +69,7 @@ public:
     /// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
     void rewind();

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     bool checkIfActuallySeekable() override;

View File

@@ -311,15 +311,15 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
     return offset;
 }

-size_t ReadBufferFromS3::getFileSize()
+std::optional<size_t> ReadBufferFromS3::tryGetFileSize()
 {
     if (file_size)
-        return *file_size;
+        return file_size;

     auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id);

     file_size = object_size;
-    return *file_size;
+    return file_size;
 }

 off_t ReadBufferFromS3::getPosition()
off_t ReadBufferFromS3::getPosition() off_t ReadBufferFromS3::getPosition()

View File

@@ -63,7 +63,7 @@ public:
     off_t getPosition() override;

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     void setReadUntilPosition(size_t position) override;
     void setReadUntilEnd() override;

View File

@@ -72,7 +72,6 @@ namespace ErrorCodes
     extern const int BAD_ARGUMENTS;
     extern const int CANNOT_SEEK_THROUGH_FILE;
     extern const int SEEK_POSITION_OUT_OF_BOUND;
-    extern const int UNKNOWN_FILE_SIZE;
 }

 std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::CallResult::transformToReadBuffer(size_t buf_size) &&

@@ -121,15 +120,33 @@ void ReadWriteBufferFromHTTP::prepareRequest(Poco::Net::HTTPRequest & request, s
     credentials.authenticate(request);
 }

-size_t ReadWriteBufferFromHTTP::getFileSize()
+std::optional<size_t> ReadWriteBufferFromHTTP::tryGetFileSize()
 {
     if (!file_info)
-        file_info = getFileInfo();
+    {
+        try
+        {
+            file_info = getFileInfo();
+        }
+        catch (const HTTPException &)
+        {
+            return std::nullopt;
+        }
+        catch (const NetException &)
+        {
+            return std::nullopt;
+        }
+        catch (const Poco::Net::NetException &)
+        {
+            return std::nullopt;
+        }
+        catch (const Poco::IOException &)
+        {
+            return std::nullopt;
+        }
+    }

-    if (file_info->file_size)
-        return *file_info->file_size;
-
-    throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", initial_uri.toString());
+    return file_info->file_size;
 }

 bool ReadWriteBufferFromHTTP::supportsReadAt()

@@ -311,12 +328,12 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
             error_message = e.displayText();
             exception = std::current_exception();
         }
-        catch (DB::NetException & e)
+        catch (NetException & e)
         {
             error_message = e.displayText();
             exception = std::current_exception();
         }
-        catch (DB::HTTPException & e)
+        catch (HTTPException & e)
         {
             if (!isRetriableError(e.getHTTPStatus()))
                 is_retriable = false;

@@ -324,7 +341,7 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
             error_message = e.displayText();
             exception = std::current_exception();
         }
-        catch (DB::Exception & e)
+        catch (Exception & e)
         {
             is_retriable = false;

@@ -683,7 +700,19 @@ std::optional<time_t> ReadWriteBufferFromHTTP::tryGetLastModificationTime()
         {
             file_info = getFileInfo();
         }
-        catch (...)
+        catch (const HTTPException &)
+        {
+            return std::nullopt;
+        }
+        catch (const NetException &)
+        {
+            return std::nullopt;
+        }
+        catch (const Poco::Net::NetException &)
+        {
+            return std::nullopt;
+        }
+        catch (const Poco::IOException &)
         {
             return std::nullopt;
         }

@@ -704,7 +733,7 @@ ReadWriteBufferFromHTTP::HTTPFileInfo ReadWriteBufferFromHTTP::getFileInfo()
     {
         getHeadResponse(response);
     }
-    catch (HTTPException & e)
+    catch (const HTTPException & e)
     {
         /// Maybe the web server doesn't support HEAD requests.
         /// E.g. webhdfs reports status 400.

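Note the direction of the change: instead of catch (...), the code now catches only the transport-level exception types (HTTPException, NetException, Poco::Net::NetException, Poco::IOException) and lets logical errors propagate. A reduced sketch of the "try" accessor, with stand-in exception types defined here for the example:

    #include <cstddef>
    #include <optional>
    #include <stdexcept>

    // Stand-ins for the sketch; the real code catches the four types above.
    struct HttpError : std::runtime_error { using std::runtime_error::runtime_error; };
    struct NetError : std::runtime_error { using std::runtime_error::runtime_error; };

    struct FileInfo { std::optional<std::size_t> file_size; };

    FileInfo fetchFileInfo() { throw NetError("host unreachable"); }  // demo stub

    std::optional<std::size_t> tryGetFileSize(std::optional<FileInfo> & cached)
    {
        if (!cached)
        {
            try
            {
                cached = fetchFileInfo();
            }
            catch (const HttpError &) { return std::nullopt; }
            catch (const NetError &) { return std::nullopt; }
            // Anything else is a programming error and must propagate.
        }
        return cached->file_size;
    }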
View File

@@ -118,7 +118,7 @@ private:
     std::unique_ptr<ReadBuffer> initialize();

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     bool supportsReadAt() override;

View File

@@ -13,41 +13,47 @@ namespace ErrorCodes
     extern const int UNKNOWN_FILE_SIZE;
 }

-template <typename T>
-static size_t getFileSize(T & in)
+size_t WithFileSize::getFileSize()
 {
-    if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
-    {
-        return with_file_size->getFileSize();
-    }
+    if (auto maybe_size = tryGetFileSize())
+        return *maybe_size;

     throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
 }

-size_t getFileSizeFromReadBuffer(ReadBuffer & in)
+template <typename T>
+static std::optional<size_t> tryGetFileSize(T & in)
 {
-    if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
-    {
-        return getFileSize(delegate->getWrappedReadBuffer());
-    }
-    else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
-    {
-        return getFileSize(compressed->getWrappedReadBuffer());
-    }
+    if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
+        return with_file_size->tryGetFileSize();

-    return getFileSize(in);
+    return std::nullopt;
+}
+
+template <typename T>
+static size_t getFileSize(T & in)
+{
+    if (auto maybe_size = tryGetFileSize(in))
+        return *maybe_size;
+
+    throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
 }

 std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in)
 {
-    try
-    {
-        return getFileSizeFromReadBuffer(in);
-    }
-    catch (...)
-    {
-        return std::nullopt;
-    }
+    if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
+        return tryGetFileSize(delegate->getWrappedReadBuffer());
+    else if (auto * compressed = dynamic_cast<CompressedReadBufferWrapper *>(&in))
+        return tryGetFileSize(compressed->getWrappedReadBuffer());
+    return tryGetFileSize(in);
+}
+
+size_t getFileSizeFromReadBuffer(ReadBuffer & in)
+{
+    if (auto maybe_size = tryGetFileSizeFromReadBuffer(in))
+        return *maybe_size;
+
+    throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
 }

 bool isBufferWithFileSize(const ReadBuffer & in)

View File

@@ -10,15 +10,16 @@ class ReadBuffer;
 class WithFileSize
 {
 public:
-    virtual size_t getFileSize() = 0;
+    /// Returns nullopt if couldn't find out file size;
+    virtual std::optional<size_t> tryGetFileSize() = 0;
     virtual ~WithFileSize() = default;
+
+    size_t getFileSize();
 };

 bool isBufferWithFileSize(const ReadBuffer & in);

 size_t getFileSizeFromReadBuffer(ReadBuffer & in);

+/// Return nullopt if couldn't find out file size;
 std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in);

 size_t getDataOffsetMaybeCompressed(const ReadBuffer & in);

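The refactoring that runs through all of these read-buffer files follows one rule: the virtual hook is now the non-throwing tryGetFileSize(), and the throwing getFileSize() becomes a thin non-virtual wrapper implemented once in the base class. A compact sketch of the interface split (the exception type here is a placeholder for ClickHouse's Exception):

    #include <optional>
    #include <stdexcept>

    class WithFileSize
    {
    public:
        /// The only virtual: implementations return nullopt instead of throwing.
        virtual std::optional<size_t> tryGetFileSize() = 0;
        virtual ~WithFileSize() = default;

        /// Throwing convenience wrapper, implemented once in the base.
        size_t getFileSize()
        {
            if (auto maybe_size = tryGetFileSize())
                return *maybe_size;
            throw std::runtime_error("Cannot find out file size");
        }
    };

    // Example implementation: a buffer that may or may not know its size.
    class ExampleBuffer : public WithFileSize
    {
    public:
        std::optional<size_t> tryGetFileSize() override { return cached_size; }
    private:
        std::optional<size_t> cached_size;  // filled lazily elsewhere
    };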
View File

@@ -116,7 +116,10 @@ void WriteBufferFromFile::close()
     finalize();

     if (0 != ::close(fd))
+    {
+        fd = -1;
         throw Exception(ErrorCodes::CANNOT_CLOSE_FILE, "Cannot close file");
+    }

     fd = -1;
     metric_increment.destroy();

View File

@@ -13,10 +13,6 @@
 namespace DB
 {

-namespace ErrorCodes
-{
-    extern const int LOGICAL_ERROR;
-}
-
 namespace
 {

@@ -237,16 +233,8 @@ void SubstituteColumnOptimizer::perform()
     const auto & compare_graph = metadata_snapshot->getConstraints().getGraph();

-    // Fill aliases
-    if (select_query->select())
-    {
-        auto * list = select_query->refSelect()->as<ASTExpressionList>();
-        if (!list)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "List of selected columns must be ASTExpressionList");
-
-        for (ASTPtr & ast : list->children)
-            ast->setAlias(ast->getAliasOrColumnName());
-    }
+    if (compare_graph.getNumOfComponents() == 0)
+        return;

     auto run_for_all = [&](const auto func)
     {

View File

@@ -15,7 +15,7 @@ struct StorageInMemoryMetadata;
 using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;

 /// Optimizer that tries to replace columns to equal columns (according to constraints)
-/// with lower size (according to compressed and uncomressed size).
+/// with lower size (according to compressed and uncompressed sizes).
 class SubstituteColumnOptimizer
 {
 public:

View File

@@ -16,16 +16,9 @@
 namespace DB
 {

-static constinit std::atomic<bool> allow_logging{true};
-
-void OwnSplitChannel::disableLogging()
-{
-    allow_logging = false;
-}
-
 void OwnSplitChannel::log(const Poco::Message & msg)
 {
-    if (!allow_logging)
+    if (!isLoggingEnabled())
         return;

 #ifndef WITHOUT_TEXT_LOG
View File
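Taken together with the BaseDaemon and logger-header hunks, this moves the process-wide logging kill switch out of OwnSplitChannel into a free function. A sketch of the flag-plus-accessors shape (the names match the diff; the surrounding log function is illustrative):

    #include <atomic>

    // A constinit atomic flag that shutdown code flips once,
    // checked at the top of every log call.
    static constinit std::atomic<bool> allow_logging{true};

    bool isLoggingEnabled()
    {
        return allow_logging;
    }

    void disableLogging()
    {
        allow_logging = false;
    }

    void logMessage(const char * msg)
    {
        if (!isLoggingEnabled())
            return;  // daemon is shutting down, sinks may already be gone
        // ... format and dispatch msg to the configured channels ...
        (void)msg;
    }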

@@ -39,8 +39,6 @@ public:
     void setLevel(const std::string & name, int level);

-    static void disableLogging();
-
 private:
     void logSplit(const Poco::Message & msg);
     void tryLogSplit(const Poco::Message & msg);

View File

@@ -66,7 +66,7 @@ public:
     /** Set the alias. */
     virtual void setAlias(const String & /*to*/)
     {
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {}", getColumnName());
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't set alias of {} of {}", getColumnName(), getID());
     }

     /** Get the text that identifies this element. */

View File

@@ -8,7 +8,6 @@
 #include <Storages/IStorage_fwd.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/StorageID.h>
-#include <Common/TimerDescriptor.h>

 #include <sys/types.h>

View File

@@ -69,7 +69,7 @@ private:
     /// * timer is a timerfd descriptor to manually check socket timeout
     /// * pipe_fd is a pipe we use to cancel query and socket polling by executor.
     /// We put those descriptors into our own epoll which is used by external executor.
-    TimerDescriptor timer{CLOCK_MONOTONIC, 0};
+    TimerDescriptor timer;
     Poco::Timespan timeout;
     AsyncEventTimeoutType timeout_type;
     std::atomic_bool is_timer_alarmed = false;

View File

@@ -53,7 +53,7 @@ public:
     bool nextImpl() override;
     off_t seek(off_t off, int whence) override;
     off_t getPosition() override;
-    size_t getFileSize() override { return remote_file_size; }
+    std::optional<size_t> tryGetFileSize() override { return remote_file_size; }

 private:
     std::unique_ptr<LocalFileHolder> local_file_holder;

View File

@@ -4405,25 +4405,25 @@ bool MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
 size_t MergeTreeData::getTotalActiveSizeInBytes() const
 {
-    return total_active_size_bytes.load(std::memory_order_acquire);
+    return total_active_size_bytes.load();
 }

 size_t MergeTreeData::getTotalActiveSizeInRows() const
 {
-    return total_active_size_rows.load(std::memory_order_acquire);
+    return total_active_size_rows.load();
 }

 size_t MergeTreeData::getActivePartsCount() const
 {
-    return total_active_size_parts.load(std::memory_order_acquire);
+    return total_active_size_parts.load();
 }

 size_t MergeTreeData::getOutdatedPartsCount() const
 {
-    return total_outdated_parts_count.load(std::memory_order_relaxed);
+    return total_outdated_parts_count.load();
 }

 size_t MergeTreeData::getNumberOfOutdatedPartsWithExpiredRemovalTime() const

@@ -8184,16 +8184,16 @@ void MergeTreeData::removePartContributionToDataVolume(const DataPartPtr & part)
 void MergeTreeData::increaseDataVolume(ssize_t bytes, ssize_t rows, ssize_t parts)
 {
-    total_active_size_bytes.fetch_add(bytes, std::memory_order_acq_rel);
-    total_active_size_rows.fetch_add(rows, std::memory_order_acq_rel);
-    total_active_size_parts.fetch_add(parts, std::memory_order_acq_rel);
+    total_active_size_bytes.fetch_add(bytes);
+    total_active_size_rows.fetch_add(rows);
+    total_active_size_parts.fetch_add(parts);
 }

 void MergeTreeData::setDataVolume(size_t bytes, size_t rows, size_t parts)
 {
-    total_active_size_bytes.store(bytes, std::memory_order_release);
-    total_active_size_rows.store(rows, std::memory_order_release);
-    total_active_size_parts.store(parts, std::memory_order_release);
+    total_active_size_bytes.store(bytes);
+    total_active_size_rows.store(rows);
+    total_active_size_parts.store(parts);
 }

 bool MergeTreeData::insertQueryIdOrThrow(const String & query_id, size_t max_queries) const

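These counters drop their hand-picked acquire/release/relaxed orderings in favour of the defaults: load(), store() and fetch_add() without an ordering argument all use std::memory_order_seq_cst, which is never weaker than what was there before. A short illustration of the equivalence:

    #include <atomic>
    #include <cstddef>

    std::atomic<std::size_t> total_bytes{0};

    void add_bytes(std::size_t n)
    {
        // Equivalent to total_bytes.fetch_add(n, std::memory_order_seq_cst).
        total_bytes.fetch_add(n);
    }

    std::size_t read_bytes()
    {
        // Equivalent to total_bytes.load(std::memory_order_seq_cst).
        return total_bytes.load();
    }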
View File

@@ -91,9 +91,9 @@ void AsynchronousReadBufferFromHDFS::prefetch(Priority priority)
 }

-size_t AsynchronousReadBufferFromHDFS::getFileSize()
+std::optional<size_t> AsynchronousReadBufferFromHDFS::tryGetFileSize()
 {
-    return impl->getFileSize();
+    return impl->tryGetFileSize();
 }

 String AsynchronousReadBufferFromHDFS::getFileName() const

View File

@@ -35,7 +35,7 @@ public:
     void prefetch(Priority priority) override;

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     String getFileName() const override;

View File

@@ -31,7 +31,7 @@ namespace ErrorCodes
 }

-struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>
+struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileSize
 {
     String hdfs_uri;
     String hdfs_file_path;

@@ -90,7 +90,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
         hdfsCloseFile(fs.get(), fin);
     }

-    size_t getFileSize() const
+    std::optional<size_t> tryGetFileSize() override
     {
         return file_size;
     }

@@ -191,9 +191,9 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
 ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;

-size_t ReadBufferFromHDFS::getFileSize()
+std::optional<size_t> ReadBufferFromHDFS::tryGetFileSize()
 {
-    return impl->getFileSize();
+    return impl->tryGetFileSize();
 }

 bool ReadBufferFromHDFS::nextImpl()

View File

@@ -40,7 +40,7 @@ public:
     off_t getPosition() override;

-    size_t getFileSize() override;
+    std::optional<size_t> tryGetFileSize() override;

     size_t getFileOffsetOfBufferEnd() const override;

View File

@@ -36,6 +36,7 @@ namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
     extern const int BAD_ARGUMENTS;
+    extern const int BAD_QUERY_PARAMETER;
     extern const int QUERY_NOT_ALLOWED;
 }

@@ -150,7 +151,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
     }
     else if (!configuration->isPathWithGlobs())
     {
-        throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "ObjectStorageQueue url must either end with '/' or contain globs");
+        throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs");
     }

     checkAndAdjustSettings(*queue_settings, engine_args, mode > LoadingStrictnessLevel::CREATE, log);

View File

@@ -369,6 +369,14 @@ void StorageMerge::read(
     /// What will be result structure depending on query processed stage in source tables?
     Block common_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);

+    if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage == QueryProcessingStage::Complete)
+    {
+        /// Remove constants.
+        /// For StorageDistributed some functions like `hostName` that are constants only for local queries.
+        for (auto & column : common_header)
+            column.column = column.column->convertToFullColumnIfConst();
+    }
+
     auto step = std::make_unique<ReadFromMerge>(
         column_names,
         query_info,

View File

@@ -4,6 +4,7 @@
 #include <DataTypes/DataTypeString.h>
 #include <DataTypes/DataTypesNumber.h>
 #include <DataTypes/DataTypeDateTime.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Common/ZooKeeper/ZooKeeper.h>
 #include <Coordination/KeeperFeatureFlags.h>
 #include <Storages/System/StorageSystemZooKeeperConnection.h>

@@ -27,7 +28,7 @@ ColumnsDescription StorageSystemZooKeeperConnection::getColumnsDescription()
     /* 0 */ {"name", std::make_shared<DataTypeString>(), "ZooKeeper cluster's name."},
     /* 1 */ {"host", std::make_shared<DataTypeString>(), "The hostname/IP of the ZooKeeper node that ClickHouse connected to."},
     /* 2 */ {"port", std::make_shared<DataTypeUInt16>(), "The port of the ZooKeeper node that ClickHouse connected to."},
-    /* 3 */ {"index", std::make_shared<DataTypeUInt8>(), "The index of the ZooKeeper node that ClickHouse connected to. The index is from ZooKeeper config."},
+    /* 3 */ {"index", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The index of the ZooKeeper node that ClickHouse connected to. The index is from ZooKeeper config. If not connected, this column is NULL."},
     /* 4 */ {"connected_time", std::make_shared<DataTypeDateTime>(), "When the connection was established."},
     /* 5 */ {"session_uptime_elapsed_seconds", std::make_shared<DataTypeUInt64>(), "Seconds elapsed since the connection was established."},
     /* 6 */ {"is_expired", std::make_shared<DataTypeUInt8>(), "Is the current connection expired."},

@@ -64,7 +65,7 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
     /// For read-only snapshot type functionality, it's acceptable even though 'getZooKeeper' may cause data inconsistency.
     auto fill_data = [&](const String & name, const zkutil::ZooKeeperPtr zookeeper, MutableColumns & columns)
     {
-        Int8 index = zookeeper->getConnectedHostIdx();
+        auto index = zookeeper->getConnectedHostIdx();
         String host_port = zookeeper->getConnectedHostPort();
         if (index != -1 && !host_port.empty())
         {

@@ -78,7 +79,10 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
             columns[0]->insert(name);
             columns[1]->insert(host);
             columns[2]->insert(port);
-            columns[3]->insert(index);
+            if (index)
+                columns[3]->insert(*index);
+            else
+                columns[3]->insertDefault();
             columns[4]->insert(connected_time);
             columns[5]->insert(uptime);
             columns[6]->insert(zookeeper->expired());

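Exposing the now-optional index through the system table means the column type becomes Nullable, and absent values are written with insertDefault(), which for a Nullable column produces NULL. A toy model of that logic, using a simplified column type defined here rather than ClickHouse's IColumn:

    #include <cstdint>
    #include <optional>
    #include <vector>

    // Toy stand-in for a Nullable(UInt8) column: parallel null mask + data.
    struct NullableInt8Column
    {
        std::vector<int8_t> data;
        std::vector<bool> null_mask;

        void insert(int8_t value) { data.push_back(value); null_mask.push_back(false); }
        void insertDefault()      { data.push_back(0);     null_mask.push_back(true);  }
    };

    void fillIndexColumn(NullableInt8Column & col, std::optional<int8_t> index)
    {
        if (index)
            col.insert(*index);
        else
            col.insertDefault();  // renders as NULL in the system table
    }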
View File

@@ -420,7 +420,12 @@ class Backport:
         fetch_release_prs = self.gh.get_release_pulls(self._fetch_from)
         fetch_release_branches = [pr.head.ref for pr in fetch_release_prs]
         self.labels_to_backport = [
-            f"v{branch}-must-backport" for branch in fetch_release_branches
+            (
+                f"v{branch}-must-backport"
+                if self._repo_name == "ClickHouse/ClickHouse"
+                else f"v{branch.replace('release/','')}-must-backport"
+            )
+            for branch in fetch_release_branches
         ]

         logging.info("Fetching from %s", self._fetch_from)

@@ -490,17 +495,23 @@ class Backport:
     def process_pr(self, pr: PullRequest) -> None:
         pr_labels = [label.name for label in pr.labels]

-        if (
-            any(label in pr_labels for label in self.must_create_backport_labels)
-            or self._repo_name != self._fetch_from
-        ):
+        if any(label in pr_labels for label in self.must_create_backport_labels):
             branches = [
                 ReleaseBranch(br, pr, self.repo, self.backport_created_label)
                 for br in self.release_branches
             ]  # type: List[ReleaseBranch]
         else:
             branches = [
-                ReleaseBranch(br, pr, self.repo, self.backport_created_label)
+                ReleaseBranch(
+                    (
+                        br
+                        if self._repo_name == "ClickHouse/Clickhouse"
+                        else f"release/{br}"
+                    ),
+                    pr,
+                    self.repo,
+                    self.backport_created_label,
+                )
                 for br in [
                     label.split("-", 1)[0][1:]  # v21.8-must-backport
                     for label in pr_labels
View File
@ -587,10 +587,10 @@ class CI:
if job_name in REQUIRED_CHECKS: if job_name in REQUIRED_CHECKS:
stage_type = WorkflowStages.TESTS_1 stage_type = WorkflowStages.TESTS_1
else: else:
stage_type = WorkflowStages.TESTS_3 stage_type = WorkflowStages.TESTS_2
assert stage_type, f"BUG [{job_name}]" assert stage_type, f"BUG [{job_name}]"
if non_blocking_ci and stage_type == WorkflowStages.TESTS_3: if non_blocking_ci and stage_type == WorkflowStages.TESTS_2:
stage_type = WorkflowStages.TESTS_2 stage_type = WorkflowStages.TESTS_2_WW
return stage_type return stage_type
@classmethod @classmethod
View File
@ -67,10 +67,10 @@ class WorkflowStages(metaclass=WithIter):
BUILDS_2 = "Builds_2" BUILDS_2 = "Builds_2"
# all tests required for merge # all tests required for merge
TESTS_1 = "Tests_1" TESTS_1 = "Tests_1"
# not used atm # used in woolenwolfdog mode
TESTS_2 = "Tests_2" TESTS_2_WW = "Tests_2_ww"
# all tests not required for merge # all tests not required for merge
TESTS_3 = "Tests_3" TESTS_2 = "Tests_2"
class Runners(metaclass=WithIter): class Runners(metaclass=WithIter):
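Together with the ci.py hunk above, stage routing for test jobs reduces to three outcomes; a rough Python sketch of the decision (the helper name is illustrative, the constant strings mirror the renamed stages):

    # Rough sketch of the renamed stage routing shown in the two hunks above.
    def pick_test_stage(job_name, required_checks, non_blocking_ci=False):
        if job_name in required_checks:
            return "Tests_1"      # required for merge
        if non_blocking_ci:
            return "Tests_2_ww"   # woolenwolfdog mode: runs without blocking required checks
        return "Tests_2"          # everything not required for merge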
View File
@ -212,7 +212,7 @@ class Shell:
return res.stdout.strip() return res.stdout.strip()
@classmethod @classmethod
def run(cls, command, check=False, dry_run=False): def run(cls, command, check=False, dry_run=False, **kwargs):
if dry_run: if dry_run:
print(f"Dry-ryn. Would run command [{command}]") print(f"Dry-ryn. Would run command [{command}]")
return "" return ""
@ -225,6 +225,7 @@ class Shell:
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, text=True,
check=False, check=False,
**kwargs,
) )
if result.returncode == 0: if result.returncode == 0:
print(f"stdout: {result.stdout.strip()}") print(f"stdout: {result.stdout.strip()}")
View File
@ -3,12 +3,12 @@
import json import json
import logging import logging
import os import os
import subprocess
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from env_helper import ROOT_DIR, DOCKER_TAG from env_helper import ROOT_DIR, DOCKER_TAG
from get_robot_token import get_parameter_from_ssm from get_robot_token import get_parameter_from_ssm
from ci_utils import Shell
IMAGES_FILE_PATH = Path("docker/images.json") IMAGES_FILE_PATH = Path("docker/images.json")
@ -16,20 +16,14 @@ ImagesDict = Dict[str, dict]
def docker_login(relogin: bool = True) -> None: def docker_login(relogin: bool = True) -> None:
if ( if relogin or not Shell.check(
relogin "docker system info | grep --quiet -E 'Username|Registry'"
or subprocess.run( # pylint: disable=unexpected-keyword-arg
"docker system info | grep --quiet -E 'Username|Registry'",
shell=True,
check=False,
).returncode
== 1
): ):
subprocess.check_output( # pylint: disable=unexpected-keyword-arg Shell.run( # pylint: disable=unexpected-keyword-arg
"docker login --username 'robotclickhouse' --password-stdin", "docker login --username 'robotclickhouse' --password-stdin",
input=get_parameter_from_ssm("dockerhub_robot_password"), input=get_parameter_from_ssm("dockerhub_robot_password"),
encoding="utf-8", encoding="utf-8",
shell=True, check=True,
) )
@ -48,14 +42,10 @@ class DockerImage:
def pull_image(image: DockerImage) -> DockerImage: def pull_image(image: DockerImage) -> DockerImage:
try: try:
logging.info("Pulling image %s - start", image) logging.info("Pulling image %s - start", image)
subprocess.check_output( Shell.run(f"docker pull {image}", check=True)
f"docker pull {image}",
stderr=subprocess.STDOUT,
shell=True,
)
logging.info("Pulling image %s - done", image) logging.info("Pulling image %s - done", image)
except Exception as ex: except Exception as ex:
logging.info("Got execption pulling docker %s", ex) logging.info("Got exception pulling docker %s", ex)
raise ex raise ex
return image return image
View File
@ -2,6 +2,7 @@
import json import json
import logging import logging
import os import os
import re
from typing import Dict, List, Set, Union from typing import Dict, List, Set, Union
from urllib.parse import quote from urllib.parse import quote
@ -328,7 +329,13 @@ class PRInfo:
@property @property
def is_release(self) -> bool: def is_release(self) -> bool:
return self.number == 0 and not self.is_merge_queue return self.is_master or (
self.is_push_event
and (
bool(re.match(r"^2[1-9]\.[1-9][0-9]*$", self.head_ref))
or bool(re.match(r"^release/2[1-9]\.[1-9][0-9]*$", self.head_ref))
)
)
@property @property
def is_pr(self): def is_pr(self):
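The two alternations accept version-style branch names with or without the release/ prefix; a few illustrative probes of the same patterns:

    import re

    plain = re.compile(r"^2[1-9]\.[1-9][0-9]*$")
    prefixed = re.compile(r"^release/2[1-9]\.[1-9][0-9]*$")

    assert plain.match("24.3") and prefixed.match("release/24.3")
    assert not plain.match("20.3")   # majors below 21 are rejected
    assert not plain.match("24.0")   # the minor part must not start with 0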
View File
@ -125,7 +125,7 @@ html {{ min-height: 100%; font-family: "DejaVu Sans", "Noto Sans", Arial, sans-s
h1 {{ margin-left: 10px; }} h1 {{ margin-left: 10px; }}
th, td {{ padding: 5px 10px 5px 10px; text-align: left; vertical-align: top; line-height: 1.5; border: 1px solid var(--table-border-color); }} th, td {{ padding: 5px 10px 5px 10px; text-align: left; vertical-align: top; line-height: 1.5; border: 1px solid var(--table-border-color); }}
td {{ background: var(--td-background); }} td {{ background: var(--td-background); }}
th {{ background: var(--th-background); }} th {{ background: var(--th-background); white-space: nowrap; }}
a {{ color: var(--link-color); text-decoration: none; }} a {{ color: var(--link-color); text-decoration: none; }}
a:hover, a:active {{ color: var(--link-hover-color); text-decoration: none; }} a:hover, a:active {{ color: var(--link-hover-color); text-decoration: none; }}
table {{ box-shadow: 0 8px 25px -5px rgba(0, 0, 0, var(--shadow-intensity)); border-collapse: collapse; border-spacing: 0; }} table {{ box-shadow: 0 8px 25px -5px rgba(0, 0, 0, var(--shadow-intensity)); border-collapse: collapse; border-spacing: 0; }}
@ -135,6 +135,7 @@ th {{ cursor: pointer; }}
tr:hover {{ filter: var(--tr-hover-filter); }} tr:hover {{ filter: var(--tr-hover-filter); }}
.expandable {{ cursor: pointer; }} .expandable {{ cursor: pointer; }}
.expandable-content {{ display: none; }} .expandable-content {{ display: none; }}
pre {{ white-space: pre-wrap; }}
#fish {{ display: none; float: right; position: relative; top: -20em; right: 2vw; margin-bottom: -20em; width: 30vw; filter: brightness(7%); z-index: -1; }} #fish {{ display: none; float: right; position: relative; top: -20em; right: 2vw; margin-bottom: -20em; width: 30vw; filter: brightness(7%); z-index: -1; }}
.themes {{ .themes {{
View File
@ -211,7 +211,7 @@ class TestCIConfig(unittest.TestCase):
else: else:
self.assertTrue( self.assertTrue(
CI.get_job_ci_stage(job) CI.get_job_ci_stage(job)
in (CI.WorkflowStages.TESTS_1, CI.WorkflowStages.TESTS_3), in (CI.WorkflowStages.TESTS_1, CI.WorkflowStages.TESTS_2),
msg=f"Stage for [{job}] is not correct", msg=f"Stage for [{job}] is not correct",
) )
@ -242,7 +242,7 @@ class TestCIConfig(unittest.TestCase):
else: else:
self.assertTrue( self.assertTrue(
CI.get_job_ci_stage(job, non_blocking_ci=True) CI.get_job_ci_stage(job, non_blocking_ci=True)
in (CI.WorkflowStages.TESTS_1, CI.WorkflowStages.TESTS_2), in (CI.WorkflowStages.TESTS_1, CI.WorkflowStages.TESTS_2_WW),
msg=f"Stage for [{job}] is not correct", msg=f"Stage for [{job}] is not correct",
) )
@ -478,6 +478,7 @@ class TestCIConfig(unittest.TestCase):
pr_info = PRInfo(github_event=_TEST_EVENT_JSON) pr_info = PRInfo(github_event=_TEST_EVENT_JSON)
pr_info.event_type = EventType.PUSH pr_info.event_type = EventType.PUSH
pr_info.number = 0 pr_info.number = 0
pr_info.head_ref = "24.12345"
assert pr_info.is_release and not pr_info.is_merge_queue assert pr_info.is_release and not pr_info.is_merge_queue
ci_cache = CIPY._configure_jobs( ci_cache = CIPY._configure_jobs(
S3Helper(), pr_info, settings, skip_jobs=False, dry_run=True S3Helper(), pr_info, settings, skip_jobs=False, dry_run=True
View File
@ -43,7 +43,6 @@
"test_replicated_database/test.py::test_startup_without_zk", "test_replicated_database/test.py::test_startup_without_zk",
"test_replicated_database/test.py::test_sync_replica", "test_replicated_database/test.py::test_sync_replica",
"test_replicated_fetches_timeouts/test.py::test_no_stall", "test_replicated_fetches_timeouts/test.py::test_no_stall",
"test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed",
"test_storage_s3/test.py::test_url_reconnect_in_the_middle", "test_storage_s3/test.py::test_url_reconnect_in_the_middle",
"test_system_metrics/test.py::test_readonly_metrics", "test_system_metrics/test.py::test_readonly_metrics",
"test_system_replicated_fetches/test.py::test_system_replicated_fetches", "test_system_replicated_fetches/test.py::test_system_replicated_fetches",
@ -97,5 +96,70 @@
"test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_stop_moves_query", "test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_stop_moves_query",
"test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_table_detach", "test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_table_detach",
"test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_zookeeper_disconnect" "test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_zookeeper_disconnect",
"test_storage_kafka/test.py::test_kafka_column_types",
"test_storage_kafka/test.py::test_kafka_settings_old_syntax",
"test_storage_kafka/test.py::test_kafka_settings_new_syntax",
"test_storage_kafka/test.py::test_kafka_settings_predefined_macros",
"test_storage_kafka/test.py::test_kafka_json_as_string",
"test_storage_kafka/test.py::test_kafka_formats",
"test_storage_kafka/test.py::test_kafka_issue11308",
"test_storage_kafka/test.py::test_kafka_issue4116",
"test_storage_kafka/test.py::test_kafka_consumer_hang",
"test_storage_kafka/test.py::test_kafka_consumer_hang2",
"test_storage_kafka/test.py::test_kafka_read_consumers_in_parallel",
"test_storage_kafka/test.py::test_kafka_csv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_tsv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_select_empty",
"test_storage_kafka/test.py::test_kafka_json_without_delimiter",
"test_storage_kafka/test.py::test_kafka_protobuf",
"test_storage_kafka/test.py::test_kafka_string_field_on_first_position_in_protobuf",
"test_storage_kafka/test.py::test_kafka_protobuf_no_delimiter",
"test_storage_kafka/test.py::test_kafka_materialized_view",
"test_storage_kafka/test.py::test_kafka_recreate_kafka_table",
"test_storage_kafka/test.py::test_librdkafka_compression",
"test_storage_kafka/test.py::test_kafka_materialized_view_with_subquery",
"test_storage_kafka/test.py::test_kafka_many_materialized_views",
"test_storage_kafka/test.py::test_kafka_flush_on_big_message",
"test_storage_kafka/test.py::test_kafka_virtual_columns",
"test_storage_kafka/test.py::test_kafka_virtual_columns_with_materialized_view",
"test_storage_kafka/test.py::test_kafka_insert",
"test_storage_kafka/test.py::test_kafka_produce_consume",
"test_storage_kafka/test.py::test_kafka_commit_on_block_write",
"test_storage_kafka/test.py::test_kafka_virtual_columns2",
"test_storage_kafka/test.py::test_kafka_producer_consumer_separate_settings",
"test_storage_kafka/test.py::test_kafka_produce_key_timestamp",
"test_storage_kafka/test.py::test_kafka_insert_avro",
"test_storage_kafka/test.py::test_kafka_produce_consume_avro",
"test_storage_kafka/test.py::test_kafka_flush_by_time",
"test_storage_kafka/test.py::test_kafka_flush_by_block_size",
"test_storage_kafka/test.py::test_kafka_lot_of_partitions_partial_commit_of_bulk",
"test_storage_kafka/test.py::test_kafka_rebalance",
"test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed",
"test_storage_kafka/test.py::test_exception_from_destructor",
"test_storage_kafka/test.py::test_commits_of_unprocessed_messages_on_drop",
"test_storage_kafka/test.py::test_bad_reschedule",
"test_storage_kafka/test.py::test_kafka_duplicates_when_commit_failed",
"test_storage_kafka/test.py::test_premature_flush_on_eof",
"test_storage_kafka/test.py::test_kafka_unavailable",
"test_storage_kafka/test.py::test_kafka_issue14202",
"test_storage_kafka/test.py::test_kafka_csv_with_thread_per_consumer",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream_with_random_malformed_json",
"test_storage_kafka/test.py::test_kafka_formats_with_broken_message",
"test_storage_kafka/test.py::test_kafka_consumer_failover",
"test_storage_kafka/test.py::test_kafka_predefined_configuration",
"test_storage_kafka/test.py::test_issue26643",
"test_storage_kafka/test.py::test_num_consumers_limit",
"test_storage_kafka/test.py::test_format_with_prefix_and_suffix",
"test_storage_kafka/test.py::test_max_rows_per_message",
"test_storage_kafka/test.py::test_row_based_formats",
"test_storage_kafka/test.py::test_block_based_formats_1",
"test_storage_kafka/test.py::test_block_based_formats_2",
"test_storage_kafka/test.py::test_system_kafka_consumers",
"test_storage_kafka/test.py::test_system_kafka_consumers_rebalance",
"test_storage_kafka/test.py::test_system_kafka_consumers_rebalance_mv",
"test_storage_kafka/test.py::test_formats_errors",
"test_storage_kafka/test.py::test_multiple_read_in_materialized_views"
] ]
View File
@ -1,6 +1,7 @@
drop table if exists max_parts_in_total; drop table if exists max_parts_in_total;
create table max_parts_in_total (x UInt64) ENGINE = MergeTree PARTITION BY x ORDER BY x SETTINGS max_parts_in_total = 10; create table max_parts_in_total (x UInt64) ENGINE = MergeTree PARTITION BY x ORDER BY x SETTINGS max_parts_in_total = 10;
SET max_insert_threads = 1;
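-- With a single insert thread, the INSERT below should create exactly one part per
-- partition (10 parts), so only the following INSERT trips max_parts_in_total.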
INSERT INTO max_parts_in_total SELECT number FROM numbers(10); INSERT INTO max_parts_in_total SELECT number FROM numbers(10);
SELECT 1; SELECT 1;
INSERT INTO max_parts_in_total SELECT 123; -- { serverError TOO_MANY_PARTS } INSERT INTO max_parts_in_total SELECT 123; -- { serverError TOO_MANY_PARTS }
View File
@ -1,4 +0,0 @@
275 0 138 136 0
275 0
275 0 138 136 0
275 0
View File
@ -1,5 +1,5 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Tags: long, no-parallel, no-ordinary-database, no-debug # Tags: long, no-parallel, no-ordinary-database
# Test is too heavy, avoid parallel run in Flaky Check # Test is too heavy, avoid parallel run in Flaky Check
# shellcheck disable=SC2119 # shellcheck disable=SC2119
@ -7,82 +7,126 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
set -e set -ue
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS src"; $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS src";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS dst"; $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS dst";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mv"; $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mv";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS tmp"; $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS tmp";
$CLICKHOUSE_CLIENT --query "CREATE TABLE src (n Int8, m Int8, CONSTRAINT c CHECK xxHash32(n+m) % 8 != 0) ENGINE=MergeTree ORDER BY n PARTITION BY 0 < n SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (nm Int16, CONSTRAINT c CHECK xxHash32(nm) % 8 != 0) ENGINE=MergeTree ORDER BY nm SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE MATERIALIZED VIEW mv TO dst (nm Int16) AS SELECT n*m AS nm FROM src";
$CLICKHOUSE_CLIENT --query "CREATE TABLE tmp (x UInt8, nm Int16) ENGINE=MergeTree ORDER BY (x, nm) SETTINGS old_parts_lifetime=0" $CLICKHOUSE_CLIENT --query "CREATE TABLE src (n Int32, m Int32, CONSTRAINT c CHECK xxHash32(n+m) % 8 != 0) ENGINE=MergeTree ORDER BY n PARTITION BY 0 < n SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (nm Int32, CONSTRAINT c CHECK xxHash32(nm) % 8 != 0) ENGINE=MergeTree ORDER BY nm SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE MATERIALIZED VIEW mv TO dst (nm Int32) AS SELECT n*m AS nm FROM src";
$CLICKHOUSE_CLIENT --query "CREATE TABLE tmp (x UInt32, nm Int32) ENGINE=MergeTree ORDER BY (x, nm) SETTINGS old_parts_lifetime=0"
$CLICKHOUSE_CLIENT --query "INSERT INTO src VALUES (0, 0)" $CLICKHOUSE_CLIENT --query "INSERT INTO src VALUES (0, 0)"
# some transactions will fail due to constraint function get_now()
function thread_insert_commit()
{ {
set -e date +%s
for i in {1..100}; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* ($i, $1) */ ($i, $1);
SELECT throwIf((SELECT sum(nm) FROM mv) != $(($i * $1))) FORMAT Null;
INSERT INTO src VALUES /* (-$i, $1) */ (-$i, $1);
COMMIT;" 2>&1| grep -Fv "is violated at row" | grep -Fv "Transaction is not in RUNNING state" | grep -F "Received from " ||:
done
} }
function thread_insert_rollback() is_pid_exist()
{
local pid=$1
ps -p $pid > /dev/null
}
function run_until_deadline_and_at_least_times()
{ {
set -e set -e
for _ in {1..100}; do
$CLICKHOUSE_CLIENT --multiquery --query " local deadline=$1; shift
BEGIN TRANSACTION; local min_iterations=$1; shift
INSERT INTO src VALUES /* (42, $1) */ (42, $1); local function_to_run=$1; shift
SELECT throwIf((SELECT count() FROM src WHERE n=42 AND m=$1) != 1) FORMAT Null;
ROLLBACK;" local started_time
started_time=$(get_now)
local i=0
while true
do
$function_to_run $i "$@"
[[ $(get_now) -lt $deadline ]] || break
i=$(($i + 1))
done done
[[ $i -gt $min_iterations ]] || echo "$i/$min_iterations : not enough iterations of $function_to_run have been made from $started_time until $deadline" >&2
}
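# Usage sketch (illustrative): run `some_action <iteration> <tag>` in a loop until
# the deadline, then warn if fewer than the minimum number of iterations completed:
#   run_until_deadline_and_at_least_times "$(($(date +%s) + 30))" 5 some_action 1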
function insert_commit_action()
{
set -e
local i=$1; shift
local tag=$1; shift
# some transactions will fail due to constraint
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* ($i, $tag) */ ($i, $tag);
SELECT throwIf((SELECT sum(nm) FROM mv) != $(($i * $tag))) /* ($i, $tag) */ FORMAT Null;
INSERT INTO src VALUES /* (-$i, $tag) */ (-$i, $tag);
COMMIT;
" 2>&1 \
| grep -Fv "is violated at row" | grep -Fv "Transaction is not in RUNNING state" | grep -F "Received from " ||:
}
function insert_rollback_action()
{
set -e
local i=$1; shift
local tag=$1; shift
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* (42, $tag) */ (42, $tag);
SELECT throwIf((SELECT count() FROM src WHERE n=42 AND m=$tag) != 1) FORMAT Null;
ROLLBACK;"
} }
# make merges more aggressive # make merges more aggressive
function thread_optimize() function optimize_action()
{ {
set -e set -e
while true; do
optimize_query="OPTIMIZE TABLE src"
partition_id=$(( RANDOM % 2 ))
if (( RANDOM % 2 )); then
optimize_query="OPTIMIZE TABLE dst"
partition_id="all"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query PARTITION ID '$partition_id'"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query FINAL"
fi
action="COMMIT"
if (( RANDOM % 4 )); then
action="ROLLBACK"
fi
$CLICKHOUSE_CLIENT --multiquery --query " optimize_query="OPTIMIZE TABLE src"
partition_id=$(( RANDOM % 2 ))
if (( RANDOM % 2 )); then
optimize_query="OPTIMIZE TABLE dst"
partition_id="all"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query PARTITION ID '$partition_id'"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query FINAL"
fi
action="COMMIT"
if (( RANDOM % 4 )); then
action="ROLLBACK"
fi
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION; BEGIN TRANSACTION;
$optimize_query; $optimize_query;
$action; $action;
" 2>&1| grep -Fv "already exists, but it will be deleted soon" | grep -F "Received from " ||: " 2>&1 \
sleep 0.$RANDOM; | grep -Fv "already exists, but it will be deleted soon" | grep -F "Received from " ||:
done
sleep 0.$RANDOM;
} }
function thread_select() function select_action()
{ {
set -e set -e
while true; do
$CLICKHOUSE_CLIENT --multiquery --query " $CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION; BEGIN TRANSACTION;
SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null; SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null; SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null;
@ -90,14 +134,13 @@ function thread_select()
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null; SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null; SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
COMMIT;" COMMIT;"
done
} }
function thread_select_insert() function select_insert_action()
{ {
set -e set -e
while true; do
$CLICKHOUSE_CLIENT --multiquery --query " $CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION; BEGIN TRANSACTION;
SELECT throwIf((SELECT count() FROM tmp) != 0) FORMAT Null; SELECT throwIf((SELECT count() FROM tmp) != 0) FORMAT Null;
INSERT INTO tmp SELECT 1, n*m FROM src; INSERT INTO tmp SELECT 1, n*m FROM src;
@ -110,36 +153,69 @@ function thread_select_insert()
SELECT throwIf(1 != (SELECT countDistinct(arr) FROM (SELECT x, arraySort(groupArray(nm)) AS arr FROM tmp WHERE x!=4 GROUP BY x))) FORMAT Null; SELECT throwIf(1 != (SELECT countDistinct(arr) FROM (SELECT x, arraySort(groupArray(nm)) AS arr FROM tmp WHERE x!=4 GROUP BY x))) FORMAT Null;
SELECT throwIf((SELECT count(), sum(nm) FROM tmp WHERE x=4) != (SELECT count(), sum(nm) FROM tmp WHERE x!=4)) FORMAT Null; SELECT throwIf((SELECT count(), sum(nm) FROM tmp WHERE x=4) != (SELECT count(), sum(nm) FROM tmp WHERE x!=4)) FORMAT Null;
ROLLBACK;" ROLLBACK;"
done
} }
thread_insert_commit 1 & PID_1=$! MAIN_TIME_PART=400
thread_insert_commit 2 & PID_2=$! SECOND_TIME_PART=30
thread_insert_rollback 3 & PID_3=$! WAIT_FINISH=60
LAST_TIME_GAP=10
thread_optimize & PID_4=$! if [[ $((MAIN_TIME_PART + SECOND_TIME_PART + WAIT_FINISH + LAST_TIME_GAP)) -ge 600 ]]; then
thread_select & PID_5=$! echo "time sttings are wrong" 2>&1
thread_select_insert & PID_6=$! exit 1
sleep 0.$RANDOM; fi
thread_select & PID_7=$!
thread_select_insert & PID_8=$!
wait $PID_1 && wait $PID_2 && wait $PID_3 START_TIME=$(get_now)
kill -TERM $PID_4 STOP_TIME=$((START_TIME + MAIN_TIME_PART))
kill -TERM $PID_5 SECOND_STOP_TIME=$((STOP_TIME + SECOND_TIME_PART))
kill -TERM $PID_6 MIN_ITERATIONS=20
kill -TERM $PID_7
kill -TERM $PID_8 run_until_deadline_and_at_least_times $STOP_TIME $MIN_ITERATIONS insert_commit_action 1 & PID_1=$!
wait run_until_deadline_and_at_least_times $STOP_TIME $MIN_ITERATIONS insert_commit_action 2 & PID_2=$!
wait_for_queries_to_finish 40 run_until_deadline_and_at_least_times $STOP_TIME $MIN_ITERATIONS insert_rollback_action 3 & PID_3=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS optimize_action & PID_4=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_action & PID_5=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_insert_action & PID_6=$!
sleep 0.$RANDOM
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_action & PID_7=$!
run_until_deadline_and_at_least_times $SECOND_STOP_TIME $MIN_ITERATIONS select_insert_action & PID_8=$!
wait $PID_1 || echo "insert_commit_action has failed with status $?" 2>&1
wait $PID_2 || echo "second insert_commit_action has failed with status $?" 2>&1
wait $PID_3 || echo "insert_rollback_action has failed with status $?" 2>&1
is_pid_exist $PID_4 || echo "optimize_action is not running" 2>&1
is_pid_exist $PID_5 || echo "select_action is not running" 2>&1
is_pid_exist $PID_6 || echo "select_insert_action is not running" 2>&1
is_pid_exist $PID_7 || echo "second select_action is not running" 2>&1
is_pid_exist $PID_8 || echo "second select_insert_action is not running" 2>&1
wait $PID_4 || echo "optimize_action has failed with status $?" 2>&1
wait $PID_5 || echo "select_action has failed with status $?" 2>&1
wait $PID_6 || echo "select_insert_action has failed with status $?" 2>&1
wait $PID_7 || echo "second select_action has failed with status $?" 2>&1
wait $PID_8 || echo "second select_insert_action has failed with status $?" 2>&1
wait_for_queries_to_finish $WAIT_FINISH
$CLICKHOUSE_CLIENT --multiquery --query " $CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION; BEGIN TRANSACTION;
SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM src; SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null;
SELECT count(), sum(nm) FROM mv"; SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM dst) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
COMMIT;
"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM src" $CLICKHOUSE_CLIENT --multiquery --query "
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(nm) FROM mv" SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM dst) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
"
$CLICKHOUSE_CLIENT --query "DROP TABLE src"; $CLICKHOUSE_CLIENT --query "DROP TABLE src";
$CLICKHOUSE_CLIENT --query "DROP TABLE dst"; $CLICKHOUSE_CLIENT --query "DROP TABLE dst";
View File
@ -1,9 +1,9 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Tags: no-random-settings # Tags: no-random-settings
unset CLICKHOUSE_LOG_COMMENT
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment, because the test has to use the readonly mode
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
View File
@ -32,10 +32,10 @@
1 1
1 1
0 0
SELECT count() AS `count()` SELECT count()
FROM constraint_test_constants FROM constraint_test_constants
WHERE (b > 100) OR (c > 100) WHERE (b > 100) OR (c > 100)
SELECT count() AS `count()` SELECT count()
FROM constraint_test_constants FROM constraint_test_constants
WHERE c > 100 WHERE c > 100
QUERY id: 0 QUERY id: 0
@ -53,7 +53,7 @@ QUERY id: 0
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3 COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8 CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1 SETTINGS allow_experimental_analyzer=1
SELECT count() AS `count()` SELECT count()
FROM constraint_test_constants FROM constraint_test_constants
WHERE c > 100 WHERE c > 100
QUERY id: 0 QUERY id: 0
@ -71,7 +71,7 @@ QUERY id: 0
COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3 COLUMN id: 6, column_name: c, result_type: Int64, source_id: 3
CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8 CONSTANT id: 7, constant_value: UInt64_100, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1 SETTINGS allow_experimental_analyzer=1
SELECT count() AS `count()` SELECT count()
FROM constraint_test_constants FROM constraint_test_constants
QUERY id: 0 QUERY id: 0
PROJECTION COLUMNS PROJECTION COLUMNS
Some files were not shown because too many files have changed in this diff