Apply all transformations again

Alexey Milovidov 2020-05-23 21:59:49 +03:00
parent a2ad11897f
commit 9d2a0d2dd7
24 changed files with 72 additions and 72 deletions
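The change is mechanical: every stream-style LOG_* call, where the message is assembled with operator<<, becomes a LOG_*_FORMATTED call that takes an fmt-style format string with {} placeholders followed by the arguments. Below is a minimal, self-contained sketch of the two call shapes; the stand-in Logger struct and the macro bodies are illustrative assumptions, not the actual definitions from the ClickHouse logging headers, and it assumes the fmt library is available.

// Sketch only: simplified stand-ins, not the real ClickHouse logging macros.
#include <iostream>
#include <sstream>
#include <string>
#include <fmt/format.h>

struct Logger
{
    void information(const std::string & msg) { std::cout << msg << '\n'; }
};

// Old style: the message is assembled with operator<< into a temporary stream.
#define LOG_INFO(logger, message)              \
    do                                         \
    {                                          \
        std::ostringstream oss_;               \
        oss_ << message;                       \
        (logger)->information(oss_.str());     \
    } while (false)

// New style: a single format string with {} placeholders, rendered by fmt::format.
#define LOG_INFO_FORMATTED(logger, ...)        \
    (logger)->information(fmt::format(__VA_ARGS__))

int main()
{
    Logger log_storage;
    Logger * log = &log_storage;
    std::string partition_name = "202005";
    int piece = 3;

    // Before: stream-style concatenation, as on the removed lines of this diff.
    LOG_INFO(log, "Partition " << partition_name << " piece " << piece << " is safe for work now.");

    // After: the same message via a format string, as on the added lines.
    LOG_INFO_FORMATTED(log, "Partition {} piece {} is safe for work now.", partition_name, piece);
}

The format-string form keeps the whole message template in one literal, and this is the pattern applied uniformly to every logging call in the files below.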

View File

@ -459,7 +459,7 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
TaskStateWithOwner status = TaskStateWithOwner::fromString(res.data);
if (status.state != TaskState::Finished)
{
LOG_INFO(log, "The task " << res.data << " is being rewritten by " << status.owner << ". Partition piece will be rechecked");
LOG_INFO_FORMATTED(log, "The task {} is being rewritten by {}. Partition piece will be rechecked", res.data, status.owner);
return false;
}
@ -494,7 +494,7 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
}
catch (const Coordination::Exception & e)
{
LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name << " piece number " << toString(piece_number) << ". Will recheck the partition. Error: " << e.displayText());
LOG_INFO_FORMATTED(log, "A ZooKeeper error occurred while checking partition {} piece number {}. Will recheck the partition. Error: {}", partition_name, toString(piece_number), e.displayText());
return false;
}
@ -557,13 +557,13 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
LOG_DEBUG(log, "All pieces for partition from this task " << current_partition_attach_is_active << " has been successfully moved to destination table by " << status.owner);
LOG_DEBUG_FORMATTED(log, "All pieces for partition from this task {} has been successfully moved to destination table by {}", current_partition_attach_is_active, status.owner);
return TaskStatus::Finished;
}
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
LOG_DEBUG(log, "Moving piece for partition " << current_partition_attach_is_active << " has not been successfully finished by " << status.owner << ". Will try to move by myself.");
LOG_DEBUG_FORMATTED(log, "Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself.", current_partition_attach_is_active, status.owner);
/// Remove is_done marker.
zookeeper->remove(current_partition_attach_is_done);
@ -580,7 +580,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
/// Move partition to original destination table.
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
LOG_DEBUG(log, "Trying to move partition " << partition_name << " piece " << toString(current_piece_number) << " to original table");
LOG_DEBUG_FORMATTED(log, "Trying to move partition {} piece {} to original table", partition_name, toString(current_piece_number));
ASTPtr query_alter_ast;
String query_alter_ast_string;
@ -617,7 +617,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
}
catch (...)
{
LOG_DEBUG(log, "Error while moving partition " << partition_name << " piece " << toString(current_piece_number) << "to original table");
LOG_DEBUG_FORMATTED(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
throw;
}
@ -641,12 +641,12 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
&task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : " << toString(num_nodes));
LOG_INFO_FORMATTED(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
}
}
catch (...)
{
LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition " << partition_name << "in the original table");
LOG_DEBUG_FORMATTED(log, "Error while executing OPTIMIZE DEDUPLICATE partition {}in the original table", partition_name);
throw;
}
}
@ -742,7 +742,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (e.code == Coordination::ZNODEEXISTS)
{
LOG_DEBUG(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " is cleaning now by somebody, sleep");
LOG_DEBUG_FORMATTED(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
std::this_thread::sleep_for(default_sleep_time);
return false;
}
@ -755,7 +755,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (stat.numChildren != 0)
{
LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active workers while trying to drop it. Going to sleep.");
LOG_DEBUG_FORMATTED(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
std::this_thread::sleep_for(default_sleep_time);
return false;
}
@ -840,12 +840,12 @@ bool ClusterCopier::tryDropPartitionPiece(
return false;
}
LOG_INFO(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " was dropped on cluster " << task_table.cluster_push_name);
LOG_INFO_FORMATTED(log, "Partition {} piece {} was dropped on cluster {}", task_partition.name, toString(current_piece_number), task_table.cluster_push_name);
if (zookeeper->tryCreate(current_shards_path, host_id, zkutil::CreateMode::Persistent) == Coordination::ZNODEEXISTS)
zookeeper->set(current_shards_path, host_id);
}
LOG_INFO(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " is safe for work now.");
LOG_INFO_FORMATTED(log, "Partition {} piece {} is safe for work now.", task_partition.name, toString(current_piece_number));
return true;
}
@ -1014,12 +1014,12 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
if (cluster_partition.rows_copied)
{
LOG_INFO(log, "Average partition speed: " << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed) << " per second.");
LOG_INFO_FORMATTED(log, "Average partition speed: {} per second.", formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed));
}
if (task_table.rows_copied)
{
LOG_INFO(log, "Average table " << task_table.table_id << " speed: " << formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed) << " per second.");
LOG_INFO_FORMATTED(log, "Average table {} speed: {} per second.", task_table.table_id, formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed));
}
}
}
@ -1239,13 +1239,13 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has been successfully executed by " << status.owner);
LOG_DEBUG_FORMATTED(log, "Task {} has been successfully executed by {}", current_task_piece_status_path, status.owner);
return TaskStatus::Finished;
}
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has not been successfully finished by " << status.owner << ". Partition will be dropped and refilled.");
LOG_DEBUG_FORMATTED(log, "Task {} has not been successfully finished by {}. Partition will be dropped and refilled.", current_task_piece_status_path, status.owner);
create_is_dirty_node(clean_state_clock);
return TaskStatus::Error;
@ -1314,12 +1314,12 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
if (clean_state_clock != new_clean_state_clock)
{
LOG_INFO(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " clean state changed, cowardly bailing");
LOG_INFO_FORMATTED(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
return TaskStatus::Error;
}
else if (!new_clean_state_clock.is_clean())
{
LOG_INFO(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " is dirty and will be dropped and refilled");
LOG_INFO_FORMATTED(log, "Partition {} piece {} is dirty and will be dropped and refilled", task_partition.name, toString(current_piece_number));
create_is_dirty_node(new_clean_state_clock);
return TaskStatus::Error;
}
@ -1350,7 +1350,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query,
create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) << " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount());
LOG_DEBUG_FORMATTED(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
}
/// Do the copying
@ -1365,7 +1365,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
// Select all fields
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ true, inject_fault ? "1" : "");
LOG_DEBUG(log, "Executing SELECT query and pull from " << task_shard.getDescription() << " : " << queryToString(query_select_ast));
LOG_DEBUG_FORMATTED(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
ASTPtr query_insert_ast;
{
@ -1458,7 +1458,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
}
}
LOG_INFO(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " copied. But not moved to original destination table.");
LOG_INFO_FORMATTED(log, "Partition {} piece {} copied. But not moved to original destination table.", task_partition.name, toString(current_piece_number));
/// Try create original table (if not exists) on each shard
@ -1473,7 +1473,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query,
create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) << " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount());
LOG_DEBUG_FORMATTED(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
}
catch (...)
{
@ -1486,12 +1486,12 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
if (clean_state_clock != new_clean_state_clock)
{
LOG_INFO(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " clean state changed, cowardly bailing");
LOG_INFO_FORMATTED(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
return TaskStatus::Error;
}
else if (!new_clean_state_clock.is_clean())
{
LOG_INFO(log, "Partition " << task_partition.name << " piece " << toString(current_piece_number) << " became dirty and will be dropped and refilled");
LOG_INFO_FORMATTED(log, "Partition {} piece {} became dirty and will be dropped and refilled", task_partition.name, toString(current_piece_number));
create_is_dirty_node(new_clean_state_clock);
return TaskStatus::Error;
}
@ -1718,7 +1718,7 @@ bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
query += " LIMIT 1";
LOG_DEBUG(log, "Checking shard " << task_shard.getDescription() << " for partition " << partition_quoted_name << " existence, executing query: " << query);
LOG_DEBUG_FORMATTED(log, "Checking shard {} for partition {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, query);
ParserQuery parser_query(query.data() + query.size());
const auto & settings = context.getSettingsRef();
@ -1767,9 +1767,9 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
local_context.setSettings(task_cluster->settings_pull);
auto result = InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows();
if (result != 0)
LOG_DEBUG(log, "Partition " << partition_quoted_name << " piece number " << std::to_string(current_piece_number) << " is PRESENT on shard " << task_shard.getDescription());
LOG_DEBUG_FORMATTED(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
else
LOG_DEBUG(log, "Partition " << partition_quoted_name << " piece number " << std::to_string(current_piece_number) << " is ABSENT on shard " << task_shard.getDescription());
LOG_DEBUG_FORMATTED(log, "Partition {} piece number {} is ABSENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
return result != 0;
}
@ -1886,7 +1886,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE && successful_nodes != origin_replicas_number)
{
LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on " << toString(successful_nodes) << " nodes. But had to be executed on " << toString(origin_replicas_number.load()));
LOG_INFO_FORMATTED(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load()));
}

View File

@ -530,7 +530,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Uncompressed cache size was lowered to " << formatReadableSizeWithBinarySuffix(uncompressed_cache_size) << " because the system has low amount of memory");
LOG_INFO_FORMATTED(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setUncompressedCache(uncompressed_cache_size);
@ -545,7 +545,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Mark cache size was lowered to " << formatReadableSizeWithBinarySuffix(uncompressed_cache_size) << " because the system has low amount of memory");
LOG_INFO_FORMATTED(log, "Mark cache size was lowered to {} because the system has low amount of memory", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setMarkCache(mark_cache_size);
@ -574,7 +574,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
else if (max_server_memory_usage > default_max_server_memory_usage)
{
max_server_memory_usage = default_max_server_memory_usage;
LOG_INFO(log, "Setting max_server_memory_usage was lowered to " << formatReadableSizeWithBinarySuffix(max_server_memory_usage) << " because the system has low amount of memory");
LOG_INFO_FORMATTED(log, "Setting max_server_memory_usage was lowered to {} because the system has low amount of memory", formatReadableSizeWithBinarySuffix(max_server_memory_usage));
}
total_memory_tracker.setOrRaiseHardLimit(max_server_memory_usage);
@ -959,7 +959,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
current_connections += server->currentConnections();
}
LOG_INFO(log, "Closed all listening sockets." << (current_connections ? " Waiting for " + toString(current_connections) + " outstanding connections." : ""));
LOG_INFO_FORMATTED(log, "Closed all listening sockets.{}", (current_connections ? " Waiting for " + toString(current_connections) + " outstanding connections." : ""));
/// Killing remaining queries.
global_context->getProcessList().killAllQueries();

View File

@ -115,7 +115,7 @@ void TCPHandler::runImpl()
if (!DatabaseCatalog::instance().isDatabaseExist(default_database))
{
Exception e("Database " + backQuote(default_database) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
LOG_ERROR(log, "Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", Stack trace:\n\n" << e.getStackTraceString());
LOG_ERROR_FORMATTED(log, "Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
return;
}

View File

@ -49,12 +49,12 @@ MemoryTracker::~MemoryTracker()
void MemoryTracker::logPeakMemoryUsage() const
{
LOG_DEBUG(&Logger::get("MemoryTracker"), "Peak memory usage" << (description ? " " + std::string(description) : "") << ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
LOG_DEBUG_FORMATTED(&Logger::get("MemoryTracker"), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak));
}
void MemoryTracker::logMemoryUsage(Int64 current) const
{
LOG_DEBUG(&Logger::get("MemoryTracker"), "Current memory usage" << (description ? " " + std::string(description) : "") << ": " << formatReadableSizeWithBinarySuffix(current) << ".");
LOG_DEBUG_FORMATTED(&Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(current));
}

View File

@ -264,7 +264,7 @@ void MergeSortingBlockInputStream::remerge()
}
merger.readSuffix();
LOG_DEBUG(log, "Memory usage is lowered from " << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to " << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
LOG_DEBUG_FORMATTED(log, "Memory usage is lowered from {} to {}", formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks), formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)

View File

@ -857,7 +857,7 @@ void Aggregator::writeToTemporaryFileImpl(
/// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
data_variants.aggregator = nullptr;
LOG_TRACE(log, "Max size of temporary block: " << max_temporary_block_size_rows << " rows, " << (max_temporary_block_size_bytes / 1048576.0) << " MiB.");
LOG_TRACE_FORMATTED(log, "Max size of temporary block: {} rows, {} MiB.", max_temporary_block_size_rows, (max_temporary_block_size_bytes / 1048576.0));
}
@ -1924,7 +1924,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
}
LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows << " rows.");
LOG_TRACE_FORMATTED(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
mergeBlocks(bucket_to_blocks, result, max_threads);
}

View File

@ -755,7 +755,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
String executed_by;
if (zookeeper->tryGet(is_executed_path, executed_by))
{
LOG_DEBUG(log, "Task " << task.entry_name << " has already been executed by leader replica (" << executed_by << ") of the same shard.");
LOG_DEBUG_FORMATTED(log, "Task {} has already been executed by leader replica ({}) of the same shard.", task.entry_name, executed_by);
return true;
}

View File

@ -21,7 +21,7 @@ void DNSCacheUpdater::run()
/// Reload cluster config if IP of any host has been changed since last update.
if (resolver.updateCache())
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "IPs of some hosts have been changed. Will reload cluster config.");
LOG_INFO_FORMATTED(&Poco::Logger::get("DNSCacheUpdater"), "IPs of some hosts have been changed. Will reload cluster config.");
try
{
context.reloadClusterConfig();

View File

@ -869,7 +869,7 @@ private:
{
if (info.isLoading())
{
LOG_TRACE(log, "The object '" << info.name << "' is already being loaded, force = " << forced_to_reload << ".");
LOG_TRACE_FORMATTED(log, "The object '{}' is already being loaded, force = {}.", info.name, forced_to_reload);
if (!forced_to_reload)
{
@ -924,7 +924,7 @@ private:
info = prepareToLoadSingleObject(name, loading_id, min_id_to_finish_loading_dependencies_, lock);
if (!info)
{
LOG_TRACE(log, "Could not lock object '" << name << "' for loading");
LOG_TRACE_FORMATTED(log, "Could not lock object '{}' for loading", name);
return;
}
}
@ -1067,7 +1067,7 @@ private:
info->last_successful_update_time = current_time;
info->state_id = info->loading_id;
info->next_update_time = next_update_time;
LOG_TRACE(log, "Next update time for '" << info->name << "' was set to " << ext::to_string(next_update_time));
LOG_TRACE_FORMATTED(log, "Next update time for '{}' was set to {}", info->name, ext::to_string(next_update_time));
}
/// Removes the references to the loading thread from the maps.

View File

@ -378,7 +378,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
{
try
{
LOG_TRACE(log, "Flushing system log, " << to_flush.size() << " entries to flush");
LOG_TRACE_FORMATTED(log, "Flushing system log, {} entries to flush", to_flush.size());
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment

View File

@ -629,7 +629,7 @@ private:
Poco::JSON::Parser parser;
auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
auto schema = json_body->getValue<std::string>("schema");
LOG_TRACE((&Logger::get("AvroConfluentRowInputFormat")), "Succesfully fetched schema id = " << id << "\n" << schema);
LOG_TRACE_FORMATTED((&Logger::get("AvroConfluentRowInputFormat")), "Succesfully fetched schema id = {}\n{}", id, schema);
return avro::compileJsonSchemaFromString(schema);
}
catch (const Exception &)

View File

@ -83,7 +83,7 @@ public:
{
if (free_chunks.size() != chunks.size())
{
LOG_ERROR(&Logger::get("SharedChunkAllocator"), "SharedChunkAllocator was destroyed before RowRef was released. StackTrace: " << StackTrace().toString());
LOG_ERROR_FORMATTED(&Logger::get("SharedChunkAllocator"), "SharedChunkAllocator was destroyed before RowRef was released. StackTrace: {}", StackTrace().toString());
return;
}
@ -100,7 +100,7 @@ private:
/// This may happen if allocator was removed before chunks.
/// Log message and exit, because we don't want to throw exception in destructor.
LOG_ERROR(&Logger::get("SharedChunkAllocator"), "SharedChunkAllocator was destroyed before RowRef was released. StackTrace: " << StackTrace().toString());
LOG_ERROR_FORMATTED(&Logger::get("SharedChunkAllocator"), "SharedChunkAllocator was destroyed before RowRef was released. StackTrace: {}", StackTrace().toString());
return;
}

View File

@ -251,7 +251,7 @@ void MergeSortingTransform::generate()
void MergeSortingTransform::remerge()
{
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
LOG_DEBUG_FORMATTED(log, "Re-merging intermediate ORDER BY data ({} blocks with {} rows) to save memory consumption", chunks.size(), sum_rows_in_blocks);
/// NOTE Maybe concat all blocks and partial sort will be faster than merge?
MergeSorter remerge_sorter(std::move(chunks), description, max_merged_block_size, limit);
@ -267,7 +267,7 @@ void MergeSortingTransform::remerge()
new_chunks.emplace_back(std::move(chunk));
}
LOG_DEBUG(log, "Memory usage is lowered from " << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to " << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
LOG_DEBUG_FORMATTED(log, "Memory usage is lowered from {} to {}", formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks), formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)

View File

@ -46,7 +46,7 @@ Chunk MergingAggregatedTransform::generate()
if (!generate_started)
{
generate_started = true;
LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows << " rows.");
LOG_TRACE_FORMATTED(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
/// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin();

View File

@ -107,7 +107,7 @@ EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
int rc = zookeeper.tryMulti(lock_ops, lock_responses);
if (rc == Coordination::ZBADVERSION)
{
LOG_TRACE(&Logger::get("EphemeralLocksInAllPartitions"), "Someone has inserted a block in a new partition while we were creating locks. Retry.");
LOG_TRACE_FORMATTED(&Logger::get("EphemeralLocksInAllPartitions"), "Someone has inserted a block in a new partition while we were creating locks. Retry.");
continue;
}
else if (rc != Coordination::ZOK)

View File

@ -257,7 +257,7 @@ void IMergeTreeDataPart::removeIfNeeded()
if (!startsWith(file_name, "tmp"))
{
LOG_ERROR(storage.log, "~DataPart() should remove part " << path << " but its name doesn't start with tmp. Too suspicious, keeping the part.");
LOG_ERROR_FORMATTED(storage.log, "~DataPart() should remove part {} but its name doesn't start with tmp. Too suspicious, keeping the part.", path);
return;
}
}
@ -765,7 +765,7 @@ void IMergeTreeDataPart::remove() const
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(volume->getDisk(), to) << " by removing files; fallback to recursive removal. Reason: " << getCurrentExceptionMessage(false));
LOG_ERROR_FORMATTED(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(volume->getDisk(), to), getCurrentExceptionMessage(false));
volume->getDisk()->removeRecursive(to + "/");
}

View File

@ -204,7 +204,7 @@ MergeTreeData::MergeTreeData(
{
if (!version_file.first.empty())
{
LOG_ERROR(log, "Duplication of version file " << fullPath(version_file.second, version_file.first) << " and " << current_version_file_path);
LOG_ERROR_FORMATTED(log, "Duplication of version file {} and {}", fullPath(version_file.second, version_file.first), current_version_file_path);
throw Exception("Multiple format_version.txt file", ErrorCodes::CORRUPTED_DATA);
}
version_file = {current_version_file_path, disk};

View File

@ -368,7 +368,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
auto not_cached_blocks = stat.numChildren - cached_block_stats.size();
if (not_cached_blocks)
{
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)" << " to clear old ones from ZooKeeper.");
LOG_TRACE_FORMATTED(log, "Checking {} blocks ({} are not cached){}", stat.numChildren, not_cached_blocks, " to clear old ones from ZooKeeper.");
}
zkutil::AsyncResponses<Coordination::ExistsResponse> exists_futures;

View File

@ -158,7 +158,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
not_found_msg = "a smaller part with the same min block.";
else
not_found_msg = "smaller parts with either the same min block or the same max block.";
LOG_ERROR(log, "No replica has part covering " << part_name << " and a merge is impossible: we didn't find " << not_found_msg);
LOG_ERROR_FORMATTED(log, "No replica has part covering {} and a merge is impossible: we didn't find {}", part_name, not_found_msg);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
@ -285,7 +285,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
/// but remained in the filesystem and in a number of active parts.
/// And then for a long time (before restarting), the data on the replicas will be different.
LOG_TRACE(log, "Young part " << part_name << " with age " << (time(nullptr) - part->modification_time) << " seconds hasn't been added to ZooKeeper yet. It's ok.");
LOG_TRACE_FORMATTED(log, "Young part {} with age {} seconds hasn't been added to ZooKeeper yet. It's ok.", part_name, (time(nullptr) - part->modification_time));
}
}
else

View File

@ -75,7 +75,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
return already_loaded_paths.count(path);
});
LOG_DEBUG(log, "Having " << (to_remove_it - children.begin()) << " queue entries to load, " << (children.end() - to_remove_it) << " entries already loaded.");
LOG_DEBUG_FORMATTED(log, "Having {} queue entries to load, {} entries already loaded.", (to_remove_it - children.begin()), (children.end() - to_remove_it));
children.erase(to_remove_it, children.end());
std::sort(children.begin(), children.end());
@ -645,7 +645,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
if (!entries_to_load.empty())
{
LOG_INFO(log, "Loading " + toString(entries_to_load.size()) + " mutation entries: " + entries_to_load.front() + " - " + entries_to_load.back());
LOG_INFO_FORMATTED(log, "Loading {} mutation entries: {} - {}", toString(entries_to_load.size()), entries_to_load.front(), entries_to_load.back());
std::vector<std::future<Coordination::GetResponse>> futures;
for (const String & entry : entries_to_load)
@ -845,7 +845,7 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
to_wait.push_back(*it);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
if (code)
LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": " << zkutil::ZooKeeper::error2string(code));
LOG_INFO_FORMATTED(log, "Couldn't remove {}: {}", replica_path + "/queue/" + (*it)->znode_name, zkutil::ZooKeeper::error2string(code));
updateStateOnQueueEntryRemoval(
*it, /* is_successful = */ false,
@ -1378,7 +1378,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
if (mutation.parts_to_do.size() != 0)
{
LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number." << " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
LOG_INFO_FORMATTED(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number.{}", znode, " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
mutation.parts_to_do.clear();
}
}
@ -1897,7 +1897,7 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMerge
size_t suddenly_appeared_parts = getPartNamesToMutate(mutation, queue.virtual_parts).size();
if (suddenly_appeared_parts)
{
LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because " << suddenly_appeared_parts << " parts to mutate suddenly appeared.");
LOG_TRACE_FORMATTED(queue.log, "Mutation {} is not done yet because {} parts to mutate suddenly appeared.", mutation.znode_name, suddenly_appeared_parts);
return false;
}
}

View File

@ -141,7 +141,7 @@ void ReplicatedMergeTreeRestartingThread::run()
if (storage.is_leader
&& relative_delay > static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold (" << storage_settings->min_relative_delay_to_yield_leadership << "). Will yield leadership.");
LOG_INFO_FORMATTED(log, "Relative replica delay ({} seconds) is bigger than threshold ({}). Will yield leadership.", relative_delay, storage_settings->min_relative_delay_to_yield_leadership);
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);

View File

@ -664,7 +664,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
if (block_to_write.columns() == 0)
{
LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs() << " have no common columns with block in buffer. Block of data is discarded.");
LOG_ERROR_FORMATTED(log, "Destination table {} have no common columns with block in buffer. Block of data is discarded.", destination_id.getNameForLogs());
return;
}

View File

@ -477,7 +477,7 @@ Pipes StorageDistributed::read(
ClusterPtr optimized_cluster = getOptimizedCluster(context, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): " << makeFormattedListOfShards(optimized_cluster));
LOG_DEBUG_FORMATTED(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}", makeFormattedListOfShards(optimized_cluster));
cluster = optimized_cluster;
}
else

View File

@ -777,7 +777,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
!zookeeper->exists(current_part_path + "/columns", &columns_stat_after) ||
columns_stat_before.version != columns_stat_after.version)
{
LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica << " because part changed while we were reading its checksums");
LOG_INFO_FORMATTED(log, "Not checking checksums of part {} with replica {} because part changed while we were reading its checksums", part_name, replica);
continue;
}
@ -787,7 +787,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
if (replica_part_header.getColumnsHash() != local_part_header.getColumnsHash())
{
LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica << " because columns are different");
LOG_INFO_FORMATTED(log, "Not checking checksums of part {} with replica {} because columns are different", part_name, replica);
continue;
}
@ -1363,7 +1363,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
}
else if (code == Coordination::ZBADVERSION || code == Coordination::ZNONODE || code == Coordination::ZNODEEXISTS)
{
LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part " << entry.new_part_name << " as failed. Code: " << zkutil::ZooKeeper::error2string(code));
LOG_DEBUG_FORMATTED(log, "State was changed or isn't expected when trying to mark quorum for part {} as failed. Code: {}", entry.new_part_name, zkutil::ZooKeeper::error2string(code));
}
else
throw Coordination::Exception(code);
@ -3692,7 +3692,7 @@ void StorageReplicatedMergeTree::drop()
/// It may leave some garbage if the replica_path subtree is concurrently modified
zookeeper->tryRemoveRecursive(replica_path);
if (zookeeper->exists(replica_path))
LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, " << replica_path << " still exists and may contain some garbage.");
LOG_ERROR_FORMATTED(log, "Replica was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", replica_path);
/// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
Strings replicas;
@ -3701,7 +3701,7 @@ void StorageReplicatedMergeTree::drop()
LOG_INFO_FORMATTED(log, "Removing table {} (this might take several minutes)", zookeeper_path);
zookeeper->tryRemoveRecursive(zookeeper_path);
if (zookeeper->exists(zookeeper_path))
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, " << zookeeper_path << " still exists and may contain some garbage.");
LOG_ERROR_FORMATTED(log, "Table was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", zookeeper_path);
}
}
@ -4740,7 +4740,7 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
}
else
{
LOG_DEBUG(log, "There is no part " << part_names[i] << " in ZooKeeper, it was only in filesystem");
LOG_DEBUG_FORMATTED(log, "There is no part {} in ZooKeeper, it was only in filesystem", part_names[i]);
// emplace invalid future so that the total number of futures is the same as part_names.size();
remove_futures.emplace_back();
}
@ -4765,7 +4765,7 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
continue;
else if (response.error == Coordination::ZNONODE)
{
LOG_DEBUG(log, "There is no part " << part_names[i] << " in ZooKeeper, it was only in filesystem");
LOG_DEBUG_FORMATTED(log, "There is no part {} in ZooKeeper, it was only in filesystem", part_names[i]);
continue;
}
else if (Coordination::isHardwareError(response.error))