Merge branch 'master' into more-aggresive-batching-keeper

This commit is contained in:
Antonio Andelic 2022-11-27 16:42:06 +01:00 committed by GitHub
commit 9dd5bf35b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1142,7 +1142,7 @@ TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & t
InterpreterCreateQuery::prepareOnClusterQuery(create, getContext(), task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
LOG_INFO(log, "Create destination tables. Query: \n {}", query);
LOG_INFO(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(
log,
@ -1413,7 +1413,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
auto create_query_push_ast = rewriteCreateQueryStorage(create_query_ast, database_and_table_for_current_piece, new_engine_push_ast);
String query = queryToString(create_query_push_ast);
LOG_INFO(log, "Create destination tables. Query: \n {}", query);
LOG_INFO(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(
log,
@ -1517,7 +1517,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
// Select all fields
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ true, inject_fault ? "1" : "");
LOG_INFO(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
LOG_INFO(log, "Executing SELECT query and pull from {}: {}", task_shard.getDescription(), queryToString(query_select_ast));
ASTPtr query_insert_ast;
{
@ -1871,7 +1871,7 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
LOG_INFO(log, "Computing destination partition set, executing query: \n {}", query);
LOG_INFO(log, "Computing destination partition set, executing query: {}", query);
auto local_context = Context::createCopy(context);
local_context->setSettings(task_cluster->settings_pull);
@ -1922,7 +1922,7 @@ bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
LOG_INFO(log, "Checking shard {} for partition {} existence, executing query: \n {}",
LOG_INFO(log, "Checking shard {} for partition {} existence, executing query: {}",
task_shard.getDescription(), partition_quoted_name, query_ast->formatForErrorMessage());
auto local_context = Context::createCopy(context);
@ -1964,7 +1964,7 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
query += " LIMIT 1";
LOG_INFO(log, "Checking shard {} for partition {} piece {} existence, executing query: \n \u001b[36m {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
LOG_INFO(log, "Checking shard {} for partition {} piece {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
ParserQuery parser_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
@ -2046,7 +2046,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
}
catch (...)
{
LOG_WARNING(log, "An error occurred while processing query : \n {}", query);
LOG_WARNING(log, "An error occurred while processing query: {}", query);
tryLogCurrentException(log);
continue;
}