mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
find {base,src,programs} -name '*.h' -or -name '*.cpp' | xargs grep -l -P 'LOG_\w+\([^,]+, "[^"]+" << [^<]+ << "[^"]+"\);' | xargs sed -i -r -e 's/(LOG_\w+)\(([^,]+), "([^"]+)" << ([^<]+) << "([^"]+)"\);/\1_FORMATTED(\2, "\3{}\5", \4);/'
This commit is contained in:
parent
979e5d77c1
commit
e391b77d81
@ -850,7 +850,7 @@ void BaseDaemon::handleSignal(int signal_id)
|
|||||||
void BaseDaemon::onInterruptSignals(int signal_id)
|
void BaseDaemon::onInterruptSignals(int signal_id)
|
||||||
{
|
{
|
||||||
is_cancelled = true;
|
is_cancelled = true;
|
||||||
LOG_INFO(&logger(), "Received termination signal (" << strsignal(signal_id) << ")");
|
LOG_INFO_FORMATTED(&logger(), "Received termination signal ({})", strsignal(signal_id));
|
||||||
|
|
||||||
if (sigint_signals_counter >= 2)
|
if (sigint_signals_counter >= 2)
|
||||||
{
|
{
|
||||||
|
@ -47,7 +47,7 @@ void ClusterCopier::init()
|
|||||||
task_table.initShards(task_cluster->random_engine);
|
task_table.initShards(task_cluster->random_engine);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Will process " << task_cluster->table_tasks.size() << " table tasks");
|
LOG_DEBUG_FORMATTED(log, "Will process {} table tasks", task_cluster->table_tasks.size());
|
||||||
|
|
||||||
/// Do not initialize tables, will make deferred initialization in process()
|
/// Do not initialize tables, will make deferred initialization in process()
|
||||||
|
|
||||||
@ -181,7 +181,7 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts,
|
|||||||
for (const TaskShardPtr & task_shard : task_table.all_shards)
|
for (const TaskShardPtr & task_shard : task_table.all_shards)
|
||||||
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
|
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
|
||||||
|
|
||||||
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
|
LOG_DEBUG_FORMATTED(log, "Waiting for {} setup jobs", thread_pool.active());
|
||||||
thread_pool.wait();
|
thread_pool.wait();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -484,7 +484,7 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
|
|||||||
|
|
||||||
if (!is_clean)
|
if (!is_clean)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Partition " << partition_name << " become dirty");
|
LOG_INFO_FORMATTED(log, "Partition {} become dirty", partition_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -511,7 +511,7 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
|
|||||||
{
|
{
|
||||||
if (zxid1[shard_num] != zxid2[shard_num])
|
if (zxid1[shard_num] != zxid2[shard_num])
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "The task " << piece_status_paths[shard_num] << " is being modified now. Partition piece will be rechecked");
|
LOG_INFO_FORMATTED(log, "The task {} is being modified now. Partition piece will be rechecked", piece_status_paths[shard_num]);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -530,7 +530,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
|
|||||||
inject_fault = value < move_fault_probability;
|
inject_fault = value < move_fault_probability;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Try to move " << partition_name << " to destionation table");
|
LOG_DEBUG_FORMATTED(log, "Try to move {} to destionation table", partition_name);
|
||||||
|
|
||||||
auto zookeeper = context.getZooKeeper();
|
auto zookeeper = context.getZooKeeper();
|
||||||
|
|
||||||
@ -794,7 +794,7 @@ bool ClusterCopier::tryDropPartitionPiece(
|
|||||||
{
|
{
|
||||||
if (e.code == Coordination::ZNODEEXISTS)
|
if (e.code == Coordination::ZNODEEXISTS)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Partition " << task_partition.name << " is being filled now by somebody, sleep");
|
LOG_DEBUG_FORMATTED(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -841,7 +841,7 @@ bool ClusterCopier::tryDropPartitionPiece(
|
|||||||
PoolMode::GET_MANY,
|
PoolMode::GET_MANY,
|
||||||
ClusterExecutionMode::ON_EACH_NODE);
|
ClusterExecutionMode::ON_EACH_NODE);
|
||||||
|
|
||||||
LOG_INFO(log, "DROP PARTITION was successfully executed on " << num_shards << " nodes of a cluster.");
|
LOG_INFO_FORMATTED(log, "DROP PARTITION was successfully executed on {} nodes of a cluster.", num_shards);
|
||||||
|
|
||||||
/// Update the locking node
|
/// Update the locking node
|
||||||
if (!my_clock.is_stale())
|
if (!my_clock.is_stale())
|
||||||
@ -889,7 +889,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
|
|||||||
|
|
||||||
++cluster_partition.total_tries;
|
++cluster_partition.total_tries;
|
||||||
|
|
||||||
LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster");
|
LOG_DEBUG_FORMATTED(log, "Processing partition {} for the whole cluster", partition_name);
|
||||||
|
|
||||||
/// Process each source shard having current partition and copy current partition
|
/// Process each source shard having current partition and copy current partition
|
||||||
/// NOTE: shards are sorted by "distance" to current host
|
/// NOTE: shards are sorted by "distance" to current host
|
||||||
@ -1591,7 +1591,7 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
|
|||||||
PoolMode::GET_MANY,
|
PoolMode::GET_MANY,
|
||||||
ClusterExecutionMode::ON_EACH_NODE);
|
ClusterExecutionMode::ON_EACH_NODE);
|
||||||
|
|
||||||
LOG_DEBUG(log, "DROP TABLE query was successfully executed on " << toString(num_nodes) << " nodes.");
|
LOG_DEBUG_FORMATTED(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1618,7 +1618,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
|
|||||||
PoolMode::GET_MANY,
|
PoolMode::GET_MANY,
|
||||||
ClusterExecutionMode::ON_EACH_NODE);
|
ClusterExecutionMode::ON_EACH_NODE);
|
||||||
|
|
||||||
LOG_DEBUG(log, "DROP PARTITION query was successfully executed on " << toString(num_nodes) << " nodes.");
|
LOG_DEBUG_FORMATTED(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
|
||||||
}
|
}
|
||||||
LOG_DEBUG_FORMATTED(log, "All helping tables dropped partition {}", partition_name);
|
LOG_DEBUG_FORMATTED(log, "All helping tables dropped partition {}", partition_name);
|
||||||
}
|
}
|
||||||
|
@ -89,11 +89,11 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
|
|||||||
if (params.has("schema"))
|
if (params.has("schema"))
|
||||||
{
|
{
|
||||||
schema_name = params.get("schema");
|
schema_name = params.get("schema");
|
||||||
LOG_TRACE(log, "Will fetch info for table '" << schema_name + "." + table_name << "'");
|
LOG_TRACE_FORMATTED(log, "Will fetch info for table '{}'", schema_name + "." + table_name);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
LOG_TRACE(log, "Will fetch info for table '" << table_name << "'");
|
LOG_TRACE_FORMATTED(log, "Will fetch info for table '{}'", table_name);
|
||||||
LOG_TRACE(log, "Got connection str '" << connection_string << "'");
|
LOG_TRACE_FORMATTED(log, "Got connection str '{}'", connection_string);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -124,7 +124,7 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
|
|||||||
select->format(settings);
|
select->format(settings);
|
||||||
std::string query = ss.str();
|
std::string query = ss.str();
|
||||||
|
|
||||||
LOG_TRACE(log, "Inferring structure with query '" << query << "'");
|
LOG_TRACE_FORMATTED(log, "Inferring structure with query '{}'", query);
|
||||||
|
|
||||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(POCO_SQL_ODBC_CLASS::SQLPrepare(hstmt, reinterpret_cast<SQLCHAR *>(query.data()), query.size())))
|
if (POCO_SQL_ODBC_CLASS::Utility::isError(POCO_SQL_ODBC_CLASS::SQLPrepare(hstmt, reinterpret_cast<SQLCHAR *>(query.data()), query.size())))
|
||||||
throw POCO_SQL_ODBC_CLASS::DescriptorException(session.dbc());
|
throw POCO_SQL_ODBC_CLASS::DescriptorException(session.dbc());
|
||||||
|
@ -132,7 +132,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
|
|||||||
std::string format = params.get("format", "RowBinary");
|
std::string format = params.get("format", "RowBinary");
|
||||||
|
|
||||||
std::string connection_string = params.get("connection_string");
|
std::string connection_string = params.get("connection_string");
|
||||||
LOG_TRACE(log, "Connection string: '" << connection_string << "'");
|
LOG_TRACE_FORMATTED(log, "Connection string: '{}'", connection_string);
|
||||||
|
|
||||||
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
|
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
|
||||||
|
|
||||||
|
@ -103,7 +103,7 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
|
|||||||
response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
|
response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
|
||||||
if (!response.sent())
|
if (!response.sent())
|
||||||
writeString(message, *used_output.out);
|
writeString(message, *used_output.out);
|
||||||
LOG_WARNING(log, "Query processing failed request: '" << request.getURI() << "' authentication failed");
|
LOG_WARNING_FORMATTED(log, "Query processing failed request: '{}' authentication failed", request.getURI());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception & e)
|
catch (Exception & e)
|
||||||
|
@ -234,11 +234,11 @@ void MySQLHandler::authenticate(const String & user_name, const String & auth_pl
|
|||||||
}
|
}
|
||||||
catch (const Exception & exc)
|
catch (const Exception & exc)
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Authentication for user " << user_name << " failed.");
|
LOG_ERROR_FORMATTED(log, "Authentication for user {} failed.", user_name);
|
||||||
packet_sender->sendPacket(ERR_Packet(exc.code(), "00000", exc.message()), true);
|
packet_sender->sendPacket(ERR_Packet(exc.code(), "00000", exc.message()), true);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
LOG_INFO(log, "Authentication for user " << user_name << " succeeded.");
|
LOG_INFO_FORMATTED(log, "Authentication for user {} succeeded.", user_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MySQLHandler::comInitDB(ReadBuffer & payload)
|
void MySQLHandler::comInitDB(ReadBuffer & payload)
|
||||||
|
@ -91,7 +91,7 @@ namespace
|
|||||||
|
|
||||||
void setupTmpPath(Logger * log, const std::string & path)
|
void setupTmpPath(Logger * log, const std::string & path)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Setting up " << path << " to store temporary data in it");
|
LOG_DEBUG_FORMATTED(log, "Setting up {} to store temporary data in it", path);
|
||||||
|
|
||||||
Poco::File(path).createDirectories();
|
Poco::File(path).createDirectories();
|
||||||
|
|
||||||
@ -372,7 +372,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
|||||||
/// Initialize DateLUT early, to not interfere with running time of first query.
|
/// Initialize DateLUT early, to not interfere with running time of first query.
|
||||||
LOG_DEBUG_FORMATTED(log, "Initializing DateLUT.");
|
LOG_DEBUG_FORMATTED(log, "Initializing DateLUT.");
|
||||||
DateLUT::instance();
|
DateLUT::instance();
|
||||||
LOG_TRACE(log, "Initialized DateLUT with time zone '" << DateLUT::instance().getTimeZone() << "'.");
|
LOG_TRACE_FORMATTED(log, "Initialized DateLUT with time zone '{}'.", DateLUT::instance().getTimeZone());
|
||||||
|
|
||||||
|
|
||||||
/// Storage with temporary data for processing of heavy queries.
|
/// Storage with temporary data for processing of heavy queries.
|
||||||
|
@ -619,7 +619,7 @@ void ConfigProcessor::savePreprocessedConfig(const LoadedConfig & loaded_config,
|
|||||||
Poco::File(preprocessed_path_parent).createDirectories();
|
Poco::File(preprocessed_path_parent).createDirectories();
|
||||||
}
|
}
|
||||||
DOMWriter().writeNode(preprocessed_path, loaded_config.preprocessed_xml);
|
DOMWriter().writeNode(preprocessed_path, loaded_config.preprocessed_xml);
|
||||||
LOG_DEBUG(log, "Saved preprocessed configuration to '" << preprocessed_path << "'.");
|
LOG_DEBUG_FORMATTED(log, "Saved preprocessed configuration to '{}'.", preprocessed_path);
|
||||||
}
|
}
|
||||||
catch (Poco::Exception & e)
|
catch (Poco::Exception & e)
|
||||||
{
|
{
|
||||||
|
@ -87,7 +87,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
|||||||
ConfigProcessor::LoadedConfig loaded_config;
|
ConfigProcessor::LoadedConfig loaded_config;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Loading config '" << path << "'");
|
LOG_DEBUG_FORMATTED(log, "Loading config '{}'", path);
|
||||||
|
|
||||||
loaded_config = config_processor.loadConfig(/* allow_zk_includes = */ true);
|
loaded_config = config_processor.loadConfig(/* allow_zk_includes = */ true);
|
||||||
if (loaded_config.has_zk_includes)
|
if (loaded_config.has_zk_includes)
|
||||||
|
@ -90,7 +90,7 @@ void LazyPipeFDs::tryIncreaseSize(int desired_size)
|
|||||||
{
|
{
|
||||||
if (errno == EINVAL)
|
if (errno == EINVAL)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
|
LOG_INFO_FORMATTED(log, "Cannot get pipe capacity, {}. Very old Linux kernels have no support for this fcntl.", errnoToString(ErrorCodes::CANNOT_FCNTL));
|
||||||
/// It will work nevertheless.
|
/// It will work nevertheless.
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -50,7 +50,7 @@ ShellCommand::~ShellCommand()
|
|||||||
{
|
{
|
||||||
if (terminate_in_destructor)
|
if (terminate_in_destructor)
|
||||||
{
|
{
|
||||||
LOG_TRACE(getLogger(), "Will kill shell command pid " << pid << " with SIGTERM");
|
LOG_TRACE_FORMATTED(getLogger(), "Will kill shell command pid {} with SIGTERM", pid);
|
||||||
int retcode = kill(pid, SIGTERM);
|
int retcode = kill(pid, SIGTERM);
|
||||||
if (retcode != 0)
|
if (retcode != 0)
|
||||||
LOG_WARNING(getLogger(), "Cannot kill shell command pid " << pid << " errno '" << errnoToString(retcode) << "'");
|
LOG_WARNING(getLogger(), "Cannot kill shell command pid " << pid << " errno '" << errnoToString(retcode) << "'");
|
||||||
|
@ -45,7 +45,7 @@ StatusFile::StatusFile(const std::string & path_)
|
|||||||
if (!contents.empty())
|
if (!contents.empty())
|
||||||
LOG_INFO(&Logger::get("StatusFile"), "Status file " << path << " already exists - unclean restart. Contents:\n" << contents);
|
LOG_INFO(&Logger::get("StatusFile"), "Status file " << path << " already exists - unclean restart. Contents:\n" << contents);
|
||||||
else
|
else
|
||||||
LOG_INFO(&Logger::get("StatusFile"), "Status file " << path << " already exists and is empty - probably unclean hardware restart.");
|
LOG_INFO_FORMATTED(&Logger::get("StatusFile"), "Status file {} already exists and is empty - probably unclean hardware restart.", path);
|
||||||
}
|
}
|
||||||
|
|
||||||
fd = ::open(path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
|
fd = ::open(path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
|
||||||
|
@ -111,7 +111,7 @@ void BackgroundSchedulePoolTaskInfo::execute()
|
|||||||
static const int32_t slow_execution_threshold_ms = 200;
|
static const int32_t slow_execution_threshold_ms = 200;
|
||||||
|
|
||||||
if (milliseconds >= slow_execution_threshold_ms)
|
if (milliseconds >= slow_execution_threshold_ms)
|
||||||
LOG_TRACE(&Logger::get(log_name), "Execution took " << milliseconds << " ms.");
|
LOG_TRACE_FORMATTED(&Logger::get(log_name), "Execution took {} ms.", milliseconds);
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock_schedule(schedule_mutex);
|
std::lock_guard lock_schedule(schedule_mutex);
|
||||||
@ -156,7 +156,7 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
|
|||||||
, memory_metric(memory_metric_)
|
, memory_metric(memory_metric_)
|
||||||
, thread_name(thread_name_)
|
, thread_name(thread_name_)
|
||||||
{
|
{
|
||||||
LOG_INFO(&Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with " << size << " threads");
|
LOG_INFO_FORMATTED(&Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size);
|
||||||
|
|
||||||
threads.resize(size);
|
threads.resize(size);
|
||||||
for (auto & thread : threads)
|
for (auto & thread : threads)
|
||||||
|
@ -599,7 +599,7 @@ namespace details
|
|||||||
void SettingsCollectionUtils::warningNameNotFound(const StringRef & name)
|
void SettingsCollectionUtils::warningNameNotFound(const StringRef & name)
|
||||||
{
|
{
|
||||||
static auto * log = &Logger::get("Settings");
|
static auto * log = &Logger::get("Settings");
|
||||||
LOG_WARNING(log, "Unknown setting " << name << ", skipping");
|
LOG_WARNING_FORMATTED(log, "Unknown setting {}, skipping", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SettingsCollectionUtils::throwNameNotFound(const StringRef & name)
|
void SettingsCollectionUtils::throwNameNotFound(const StringRef & name)
|
||||||
|
@ -118,7 +118,7 @@ Block MergeSortingBlockInputStream::readImpl()
|
|||||||
/// If there was temporary files.
|
/// If there was temporary files.
|
||||||
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
|
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
|
||||||
|
|
||||||
LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
|
LOG_INFO_FORMATTED(log, "There are {} temporary sorted parts to merge.", temporary_files.size());
|
||||||
|
|
||||||
/// Create sorted streams to merge.
|
/// Create sorted streams to merge.
|
||||||
for (const auto & file : temporary_files)
|
for (const auto & file : temporary_files)
|
||||||
|
@ -152,7 +152,7 @@ bool DatabaseLazy::empty() const
|
|||||||
|
|
||||||
void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & table, const String &)
|
void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & table, const String &)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Attach table " << backQuote(table_name) << ".");
|
LOG_DEBUG_FORMATTED(log, "Attach table {}.", backQuote(table_name));
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||||
|
|
||||||
@ -169,7 +169,7 @@ StoragePtr DatabaseLazy::detachTable(const String & table_name)
|
|||||||
{
|
{
|
||||||
StoragePtr res;
|
StoragePtr res;
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Detach table " << backQuote(table_name) << ".");
|
LOG_DEBUG_FORMATTED(log, "Detach table {}.", backQuote(table_name));
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
auto it = tables_cache.find(table_name);
|
auto it = tables_cache.find(table_name);
|
||||||
if (it == tables_cache.end())
|
if (it == tables_cache.end())
|
||||||
@ -216,7 +216,7 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT({ clearExpiredTables(); });
|
SCOPE_EXIT({ clearExpiredTables(); });
|
||||||
|
|
||||||
LOG_DEBUG(log, "Load table " << backQuote(table_name) << " to cache.");
|
LOG_DEBUG_FORMATTED(log, "Load table {} to cache.", backQuote(table_name));
|
||||||
|
|
||||||
const String table_metadata_path = getMetadataPath() + "/" + escapeForFileName(table_name) + ".sql";
|
const String table_metadata_path = getMetadataPath() + "/" + escapeForFileName(table_name) + ".sql";
|
||||||
|
|
||||||
@ -277,14 +277,14 @@ void DatabaseLazy::clearExpiredTables() const
|
|||||||
|
|
||||||
if (!it->second.table || it->second.table.unique())
|
if (!it->second.table || it->second.table.unique())
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Drop table " << backQuote(it->first) << " from cache.");
|
LOG_DEBUG_FORMATTED(log, "Drop table {} from cache.", backQuote(it->first));
|
||||||
it->second.table.reset();
|
it->second.table.reset();
|
||||||
expired_tables.erase(it->second.expiration_iterator);
|
expired_tables.erase(it->second.expiration_iterator);
|
||||||
it->second.expiration_iterator = cache_expiration_queue.end();
|
it->second.expiration_iterator = cache_expiration_queue.end();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Table " << backQuote(it->first) << " is busy.");
|
LOG_DEBUG_FORMATTED(log, "Table {} is busy.", backQuote(it->first));
|
||||||
busy_tables.splice(busy_tables.end(), expired_tables, it->second.expiration_iterator);
|
busy_tables.splice(busy_tables.end(), expired_tables, it->second.expiration_iterator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -376,7 +376,7 @@ void DatabaseOnDisk::iterateMetadataFiles(const Context & context, const Iterati
|
|||||||
if (Poco::File(context.getPath() + getDataPath() + '/' + object_name).exists())
|
if (Poco::File(context.getPath() + getDataPath() + '/' + object_name).exists())
|
||||||
{
|
{
|
||||||
Poco::File(getMetadataPath() + file_name).renameTo(getMetadataPath() + object_name + ".sql");
|
Poco::File(getMetadataPath() + file_name).renameTo(getMetadataPath() + object_name + ".sql");
|
||||||
LOG_WARNING(log, "Object " << backQuote(object_name) << " was not dropped previously and will be restored");
|
LOG_WARNING_FORMATTED(log, "Object {} was not dropped previously and will be restored", backQuote(object_name));
|
||||||
process_metadata_file(object_name + ".sql");
|
process_metadata_file(object_name + ".sql");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -442,7 +442,7 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(Poco::Logger * loger, const Contex
|
|||||||
*/
|
*/
|
||||||
if (remove_empty && query.empty())
|
if (remove_empty && query.empty())
|
||||||
{
|
{
|
||||||
LOG_ERROR(loger, "File " << metadata_file_path << " is empty. Removing.");
|
LOG_ERROR_FORMATTED(loger, "File {} is empty. Removing.", metadata_file_path);
|
||||||
Poco::File(metadata_file_path).remove();
|
Poco::File(metadata_file_path).remove();
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
@ -245,7 +245,7 @@ StoragePolicySelector::StoragePolicySelector(
|
|||||||
"StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
|
"StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
|
||||||
|
|
||||||
policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
|
policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
|
||||||
LOG_INFO(&Logger::get("StoragePolicySelector"), "Storage policy " << backQuote(name) << " loaded");
|
LOG_INFO_FORMATTED(&Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name));
|
||||||
}
|
}
|
||||||
|
|
||||||
constexpr auto default_storage_policy_name = "default";
|
constexpr auto default_storage_policy_name = "default";
|
||||||
|
@ -722,7 +722,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
|
|||||||
CompressedWriteBuffer compressed_buf(file_buf);
|
CompressedWriteBuffer compressed_buf(file_buf);
|
||||||
NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get(), getHeader(false));
|
NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get(), getHeader(false));
|
||||||
|
|
||||||
LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << ".");
|
LOG_DEBUG_FORMATTED(log, "Writing part of aggregation data into temporary file {}.", path);
|
||||||
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
|
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
|
||||||
|
|
||||||
/// Flush only two-level data and possibly overflow data.
|
/// Flush only two-level data and possibly overflow data.
|
||||||
@ -2099,7 +2099,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
|
|||||||
auto bucket_num = blocks.front().info.bucket_num;
|
auto bucket_num = blocks.front().info.bucket_num;
|
||||||
bool is_overflows = blocks.front().info.is_overflows;
|
bool is_overflows = blocks.front().info.is_overflows;
|
||||||
|
|
||||||
LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
|
LOG_TRACE_FORMATTED(log, "Merging partially aggregated blocks (bucket = {}).", bucket_num);
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
|
|
||||||
/** If possible, change 'method' to some_hash64. Otherwise, leave as is.
|
/** If possible, change 'method' to some_hash64. Otherwise, leave as is.
|
||||||
|
@ -386,11 +386,11 @@ void DDLWorker::processTasks()
|
|||||||
{
|
{
|
||||||
if (current_task->entry_name == entry_name)
|
if (current_task->entry_name == entry_name)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Trying to process task " << entry_name << " again");
|
LOG_INFO_FORMATTED(log, "Trying to process task {} again", entry_name);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Task " << current_task->entry_name << " was deleted from ZooKeeper before current host committed it");
|
LOG_INFO_FORMATTED(log, "Task {} was deleted from ZooKeeper before current host committed it", current_task->entry_name);
|
||||||
current_task = nullptr;
|
current_task = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -863,7 +863,7 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
|
|||||||
/// Skip if there are active nodes (it is weak guard)
|
/// Skip if there are active nodes (it is weak guard)
|
||||||
if (zookeeper->exists(node_path + "/active", &stat) && stat.numChildren > 0)
|
if (zookeeper->exists(node_path + "/active", &stat) && stat.numChildren > 0)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Task " << node_name << " should be deleted, but there are active workers. Skipping it.");
|
LOG_INFO_FORMATTED(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -872,14 +872,14 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
|
|||||||
auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
|
auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
|
||||||
if (!lock->tryLock())
|
if (!lock->tryLock())
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Task " << node_name << " should be deleted, but it is locked. Skipping it.");
|
LOG_INFO_FORMATTED(log, "Task {} should be deleted, but it is locked. Skipping it.", node_name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (node_lifetime_is_expired)
|
if (node_lifetime_is_expired)
|
||||||
LOG_INFO(log, "Lifetime of task " << node_name << " is expired, deleting it");
|
LOG_INFO_FORMATTED(log, "Lifetime of task {} is expired, deleting it", node_name);
|
||||||
else if (node_is_outside_max_window)
|
else if (node_is_outside_max_window)
|
||||||
LOG_INFO(log, "Task " << node_name << " is outdated, deleting it");
|
LOG_INFO_FORMATTED(log, "Task {} is outdated, deleting it", node_name);
|
||||||
|
|
||||||
/// Deleting
|
/// Deleting
|
||||||
{
|
{
|
||||||
@ -1027,7 +1027,7 @@ void DDLWorker::runMainThread()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Unexpected ZooKeeper error: " << getCurrentExceptionMessage(true) << ". Terminating.");
|
LOG_ERROR_FORMATTED(log, "Unexpected ZooKeeper error: {}. Terminating.", getCurrentExceptionMessage(true));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -271,7 +271,7 @@ private:
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_TRACE(log, "Loading config file '" << path << "'.");
|
LOG_TRACE_FORMATTED(log, "Loading config file '{}'.", path);
|
||||||
file_info.file_contents = repository.load(path);
|
file_info.file_contents = repository.load(path);
|
||||||
auto & file_contents = *file_info.file_contents;
|
auto & file_contents = *file_info.file_contents;
|
||||||
|
|
||||||
@ -475,7 +475,7 @@ public:
|
|||||||
{
|
{
|
||||||
const auto & info = it->second;
|
const auto & info = it->second;
|
||||||
if (info.loaded() || info.isLoading())
|
if (info.loaded() || info.isLoading())
|
||||||
LOG_TRACE(log, "Unloading '" << name << "' because its configuration has been removed or detached");
|
LOG_TRACE_FORMATTED(log, "Unloading '{}' because its configuration has been removed or detached", name);
|
||||||
infos.erase(it);
|
infos.erase(it);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -931,7 +931,7 @@ private:
|
|||||||
/// Does the loading, possibly in the separate thread.
|
/// Does the loading, possibly in the separate thread.
|
||||||
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async)
|
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Start loading object '" << name << "'");
|
LOG_TRACE_FORMATTED(log, "Start loading object '{}'", name);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
/// Prepare for loading.
|
/// Prepare for loading.
|
||||||
@ -1045,12 +1045,12 @@ private:
|
|||||||
/// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked.
|
/// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked.
|
||||||
if (!info)
|
if (!info)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Next update time for '" << name << "' will not be set because this object was not found.");
|
LOG_TRACE_FORMATTED(log, "Next update time for '{}' will not be set because this object was not found.", name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!info->isLoading())
|
if (!info->isLoading())
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Next update time for '" << name << "' will not be set because this object is not currently loading.");
|
LOG_TRACE_FORMATTED(log, "Next update time for '{}' will not be set because this object is not currently loading.", name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (info->loading_id != loading_id)
|
if (info->loading_id != loading_id)
|
||||||
|
@ -84,7 +84,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
|
|||||||
if (!is_unlimited_query && max_size && processes.size() >= max_size)
|
if (!is_unlimited_query && max_size && processes.size() >= max_size)
|
||||||
{
|
{
|
||||||
if (queue_max_wait_ms)
|
if (queue_max_wait_ms)
|
||||||
LOG_WARNING(&Logger::get("ProcessList"), "Too many simultaneous queries, will wait " << queue_max_wait_ms << " ms.");
|
LOG_WARNING_FORMATTED(&Logger::get("ProcessList"), "Too many simultaneous queries, will wait {} ms.", queue_max_wait_ms);
|
||||||
if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; }))
|
if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; }))
|
||||||
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
|
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
|
||||||
}
|
}
|
||||||
|
@ -150,7 +150,7 @@ Chunk IRowInputFormat::generate()
|
|||||||
if (num_errors && (params.allow_errors_num > 0 || params.allow_errors_ratio > 0))
|
if (num_errors && (params.allow_errors_num > 0 || params.allow_errors_ratio > 0))
|
||||||
{
|
{
|
||||||
Logger * log = &Logger::get("IRowInputFormat");
|
Logger * log = &Logger::get("IRowInputFormat");
|
||||||
LOG_TRACE(log, "Skipped " << num_errors << " rows with errors while reading the input stream");
|
LOG_TRACE_FORMATTED(log, "Skipped {} rows with errors while reading the input stream", num_errors);
|
||||||
}
|
}
|
||||||
|
|
||||||
readSuffix();
|
readSuffix();
|
||||||
|
@ -229,7 +229,7 @@ void MergeSortingTransform::generate()
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
|
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
|
||||||
LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
|
LOG_INFO_FORMATTED(log, "There are {} temporary sorted parts to merge.", temporary_files.size());
|
||||||
|
|
||||||
if (!chunks.empty())
|
if (!chunks.empty())
|
||||||
processors.emplace_back(std::make_shared<MergeSorterSource>(
|
processors.emplace_back(std::make_shared<MergeSorterSource>(
|
||||||
|
@ -428,7 +428,7 @@ struct StorageDistributedDirectoryMonitor::Batch
|
|||||||
String tmp_file{parent.current_batch_file_path + ".tmp"};
|
String tmp_file{parent.current_batch_file_path + ".tmp"};
|
||||||
|
|
||||||
if (Poco::File{tmp_file}.exists())
|
if (Poco::File{tmp_file}.exists())
|
||||||
LOG_ERROR(parent.log, "Temporary file " << backQuote(tmp_file) << " exists. Unclean shutdown?");
|
LOG_ERROR_FORMATTED(parent.log, "Temporary file {} exists. Unclean shutdown?", backQuote(tmp_file));
|
||||||
|
|
||||||
{
|
{
|
||||||
WriteBufferFromFile out{tmp_file, O_WRONLY | O_TRUNC | O_CREAT};
|
WriteBufferFromFile out{tmp_file, O_WRONLY | O_TRUNC | O_CREAT};
|
||||||
@ -454,7 +454,7 @@ struct StorageDistributedDirectoryMonitor::Batch
|
|||||||
auto file_path = file_index_to_path.find(file_idx);
|
auto file_path = file_index_to_path.find(file_idx);
|
||||||
if (file_path == file_index_to_path.end())
|
if (file_path == file_index_to_path.end())
|
||||||
{
|
{
|
||||||
LOG_ERROR(parent.log, "Failed to send batch: file with index " << file_idx << " is absent");
|
LOG_ERROR_FORMATTED(parent.log, "Failed to send batch: file with index {} is absent", file_idx);
|
||||||
batch_broken = true;
|
batch_broken = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -488,14 +488,14 @@ struct StorageDistributedDirectoryMonitor::Batch
|
|||||||
|
|
||||||
if (!batch_broken)
|
if (!batch_broken)
|
||||||
{
|
{
|
||||||
LOG_TRACE(parent.log, "Sent a batch of " << file_indices.size() << " files.");
|
LOG_TRACE_FORMATTED(parent.log, "Sent a batch of {} files.", file_indices.size());
|
||||||
|
|
||||||
for (UInt64 file_index : file_indices)
|
for (UInt64 file_index : file_indices)
|
||||||
Poco::File{file_index_to_path.at(file_index)}.remove();
|
Poco::File{file_index_to_path.at(file_index)}.remove();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_ERROR(parent.log, "Marking a batch of " << file_indices.size() << " files as broken.");
|
LOG_ERROR_FORMATTED(parent.log, "Marking a batch of {} files as broken.", file_indices.size());
|
||||||
|
|
||||||
for (UInt64 file_idx : file_indices)
|
for (UInt64 file_idx : file_indices)
|
||||||
{
|
{
|
||||||
|
@ -174,7 +174,7 @@ Pipes StorageKafka::read(
|
|||||||
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1)));
|
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Starting reading " << pipes.size() << " streams");
|
LOG_DEBUG_FORMATTED(log, "Starting reading {} streams", pipes.size());
|
||||||
return pipes;
|
return pipes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -391,7 +391,7 @@ void StorageKafka::threadFunc()
|
|||||||
if (!checkDependencies(table_id))
|
if (!checkDependencies(table_id))
|
||||||
break;
|
break;
|
||||||
|
|
||||||
LOG_DEBUG(log, "Started streaming to " << dependencies_count << " attached views");
|
LOG_DEBUG_FORMATTED(log, "Started streaming to {} attached views", dependencies_count);
|
||||||
|
|
||||||
// Reschedule if not limited
|
// Reschedule if not limited
|
||||||
if (!streamToViews())
|
if (!streamToViews())
|
||||||
|
@ -1100,7 +1100,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
|||||||
|
|
||||||
calculateColumnSizesImpl();
|
calculateColumnSizesImpl();
|
||||||
|
|
||||||
LOG_DEBUG(log, "Loaded data parts (" << data_parts_indexes.size() << " items)");
|
LOG_DEBUG_FORMATTED(log, "Loaded data parts ({} items)", data_parts_indexes.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1197,7 +1197,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!res.empty())
|
if (!res.empty())
|
||||||
LOG_TRACE(log, "Found " << res.size() << " old parts to remove.");
|
LOG_TRACE_FORMATTED(log, "Found {} old parts to remove.", res.size());
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
@ -2771,7 +2771,7 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Cont
|
|||||||
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name);
|
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Will drop " << renamed_parts.old_and_new_names.size() << " detached parts.");
|
LOG_DEBUG_FORMATTED(log, "Will drop {} detached parts.", renamed_parts.old_and_new_names.size());
|
||||||
|
|
||||||
renamed_parts.tryRenameAll();
|
renamed_parts.tryRenameAll();
|
||||||
|
|
||||||
@ -3300,7 +3300,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
|
|||||||
++parts_processed;
|
++parts_processed;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
|
LOG_DEBUG_FORMATTED(log, "Freezed {} parts", parts_processed);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
|
bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
|
||||||
@ -3486,7 +3486,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const
|
|||||||
|
|
||||||
bool MergeTreeData::moveParts(CurrentlyMovingPartsTagger && moving_tagger)
|
bool MergeTreeData::moveParts(CurrentlyMovingPartsTagger && moving_tagger)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Got " << moving_tagger.parts_to_move.size() << " parts to move.");
|
LOG_INFO_FORMATTED(log, "Got {} parts to move.", moving_tagger.parts_to_move.size());
|
||||||
|
|
||||||
for (const auto & moving_part : moving_tagger.parts_to_move)
|
for (const auto & moving_part : moving_tagger.parts_to_move)
|
||||||
{
|
{
|
||||||
|
@ -633,7 +633,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
|||||||
|
|
||||||
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
|
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Part " << new_data_part->name << " has values with expired TTL, but merges with TTL are cancelled.");
|
LOG_INFO_FORMATTED(log, "Part {} has values with expired TTL, but merges with TTL are cancelled.", new_data_part->name);
|
||||||
need_remove_expired_values = false;
|
need_remove_expired_values = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1383,7 +1383,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
|
|||||||
{
|
{
|
||||||
if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index->getFileName() + ".idx"))
|
if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index->getFileName() + ".idx"))
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "File for index " << backQuote(index->name) << " does not exist. Skipping it.");
|
LOG_DEBUG_FORMATTED(log, "File for index {} does not exist. Skipping it.", backQuote(index->name));
|
||||||
return ranges;
|
return ranges;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,7 +151,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Wrote block with " << current_block.block.rows() << " rows");
|
LOG_DEBUG_FORMATTED(log, "Wrote block with {} rows", current_block.block.rows());
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -214,7 +214,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
|
|||||||
|
|
||||||
if (!block_number_lock)
|
if (!block_number_lock)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it.");
|
LOG_INFO_FORMATTED(log, "Block with ID {} already exists; ignoring it.", block_id);
|
||||||
part->is_duplicate = true;
|
part->is_duplicate = true;
|
||||||
last_block_is_duplicate = true;
|
last_block_is_duplicate = true;
|
||||||
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
|
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
|
||||||
|
@ -339,7 +339,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
|||||||
|
|
||||||
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
|
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
|
||||||
if (num_nodes_to_delete)
|
if (num_nodes_to_delete)
|
||||||
LOG_TRACE(log, "Cleared " << num_nodes_to_delete << " old blocks from ZooKeeper");
|
LOG_TRACE_FORMATTED(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
|
|||||||
* and don't delete the queue entry when in doubt.
|
* and don't delete the queue entry when in doubt.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
LOG_WARNING(log, "Checking if anyone has a part covering " << part_name << ".");
|
LOG_WARNING_FORMATTED(log, "Checking if anyone has a part covering {}.", part_name);
|
||||||
|
|
||||||
bool found_part_with_the_same_min_block = false;
|
bool found_part_with_the_same_min_block = false;
|
||||||
bool found_part_with_the_same_max_block = false;
|
bool found_part_with_the_same_max_block = false;
|
||||||
@ -170,14 +170,14 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
|
|||||||
if (!storage.queue.remove(zookeeper, part_name))
|
if (!storage.queue.remove(zookeeper, part_name))
|
||||||
{
|
{
|
||||||
/// The part was not in our queue. Why did it happen?
|
/// The part was not in our queue. Why did it happen?
|
||||||
LOG_ERROR(log, "Missing part " << part_name << " is not in our queue.");
|
LOG_ERROR_FORMATTED(log, "Missing part {} is not in our queue.", part_name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** This situation is possible if on all the replicas where the part was, it deteriorated.
|
/** This situation is possible if on all the replicas where the part was, it deteriorated.
|
||||||
* For example, a replica that has just written it has power turned off and the data has not been written from cache to disk.
|
* For example, a replica that has just written it has power turned off and the data has not been written from cache to disk.
|
||||||
*/
|
*/
|
||||||
LOG_ERROR(log, "Part " << part_name << " is lost forever.");
|
LOG_ERROR_FORMATTED(log, "Part {} is lost forever.", part_name);
|
||||||
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
|
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -216,7 +216,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
|
|||||||
/// If the part is in ZooKeeper, check its data with its checksums, and them with ZooKeeper.
|
/// If the part is in ZooKeeper, check its data with its checksums, and them with ZooKeeper.
|
||||||
if (zookeeper->tryGet(part_path, part_znode))
|
if (zookeeper->tryGet(part_path, part_znode))
|
||||||
{
|
{
|
||||||
LOG_WARNING(log, "Checking data of part " << part_name << ".");
|
LOG_WARNING_FORMATTED(log, "Checking data of part {}.", part_name);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -247,7 +247,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
|
|||||||
return {part_name, false, "Checking part was cancelled"};
|
return {part_name, false, "Checking part was cancelled"};
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_INFO(log, "Part " << part_name << " looks good.");
|
LOG_INFO_FORMATTED(log, "Part {} looks good.", part_name);
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
|
@ -152,7 +152,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
|
|||||||
}
|
}
|
||||||
if (entry->type == LogEntry::ALTER_METADATA)
|
if (entry->type == LogEntry::ALTER_METADATA)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue");
|
LOG_TRACE_FORMATTED(log, "Adding alter metadata version {} to the queue", entry->alter_version);
|
||||||
alter_sequence.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
|
alter_sequence.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -560,7 +560,7 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, C
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!copied_entries.empty())
|
if (!copied_entries.empty())
|
||||||
LOG_DEBUG(log, "Pulled " << copied_entries.size() << " entries to queue.");
|
LOG_DEBUG_FORMATTED(log, "Pulled {} entries to queue.", copied_entries.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (storage.queue_task_handle)
|
if (storage.queue_task_handle)
|
||||||
@ -1397,7 +1397,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
|||||||
}
|
}
|
||||||
else if (mutation.parts_to_do.size() == 0)
|
else if (mutation.parts_to_do.size() == 0)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Will check if mutation " << mutation.entry->znode_name << " is done");
|
LOG_TRACE_FORMATTED(log, "Will check if mutation {} is done", mutation.entry->znode_name);
|
||||||
candidates.push_back(mutation.entry);
|
candidates.push_back(mutation.entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1406,7 +1406,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
|||||||
if (candidates.empty())
|
if (candidates.empty())
|
||||||
return false;
|
return false;
|
||||||
else
|
else
|
||||||
LOG_DEBUG(log, "Trying to finalize " << candidates.size() << " mutations");
|
LOG_DEBUG_FORMATTED(log, "Trying to finalize {} mutations", candidates.size());
|
||||||
|
|
||||||
auto merge_pred = getMergePredicate(zookeeper);
|
auto merge_pred = getMergePredicate(zookeeper);
|
||||||
|
|
||||||
@ -1430,7 +1430,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
|||||||
auto it = mutations_by_znode.find(entry->znode_name);
|
auto it = mutations_by_znode.find(entry->znode_name);
|
||||||
if (it != mutations_by_znode.end())
|
if (it != mutations_by_znode.end())
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Mutation " << entry->znode_name << " is done");
|
LOG_TRACE_FORMATTED(log, "Mutation {} is done", entry->znode_name);
|
||||||
it->second.is_done = true;
|
it->second.is_done = true;
|
||||||
if (entry->isAlterMutation())
|
if (entry->isAlterMutation())
|
||||||
{
|
{
|
||||||
|
@ -248,7 +248,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
|
|||||||
|
|
||||||
if (part)
|
if (part)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
|
LOG_DEBUG_FORMATTED(log, "Found part {} with failed quorum. Moving to detached. This shouldn't happen often.", part_name);
|
||||||
storage.forgetPartAndMoveToDetached(part, "noquorum");
|
storage.forgetPartAndMoveToDetached(part, "noquorum");
|
||||||
storage.queue.removeFromVirtualParts(part->info);
|
storage.queue.removeFromVirtualParts(part->info);
|
||||||
}
|
}
|
||||||
|
@ -636,7 +636,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
|
|||||||
|
|
||||||
if (!table)
|
if (!table)
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs() << " doesn't exist. Block of data is discarded.");
|
LOG_ERROR_FORMATTED(log, "Destination table {} doesn't exist. Block of data is discarded.", destination_id.getNameForLogs());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -439,7 +439,7 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
|
|||||||
reason = "GROUP BY " + backQuote(serializeAST(*group_by, true));
|
reason = "GROUP BY " + backQuote(serializeAST(*group_by, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Force distributed_group_by_no_merge for " << reason << " (injective)");
|
LOG_DEBUG_FORMATTED(log, "Force distributed_group_by_no_merge for {} (injective)", reason);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,7 +214,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &, TableStructureW
|
|||||||
auto parts_to_remove = getDataPartsVector();
|
auto parts_to_remove = getDataPartsVector();
|
||||||
removePartsFromWorkingSet(parts_to_remove, true);
|
removePartsFromWorkingSet(parts_to_remove, true);
|
||||||
|
|
||||||
LOG_INFO(log, "Removed " << parts_to_remove.size() << " parts.");
|
LOG_INFO_FORMATTED(log, "Removed {} parts.", parts_to_remove.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
clearOldMutations(true);
|
clearOldMutations(true);
|
||||||
@ -395,7 +395,7 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & file_name)
|
|||||||
auto check = [version, this]() { return shutdown_called || isMutationDone(version); };
|
auto check = [version, this]() { return shutdown_called || isMutationDone(version); };
|
||||||
std::unique_lock lock(mutation_wait_mutex);
|
std::unique_lock lock(mutation_wait_mutex);
|
||||||
mutation_wait_event.wait(lock, check);
|
mutation_wait_event.wait(lock, check);
|
||||||
LOG_INFO(log, "Mutation " << file_name << " done");
|
LOG_INFO_FORMATTED(log, "Mutation {} done", file_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
|
void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
|
||||||
|
@ -335,7 +335,7 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
|
|||||||
Coordination::Stat exists_stat;
|
Coordination::Stat exists_stat;
|
||||||
if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event))
|
if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event))
|
||||||
{
|
{
|
||||||
LOG_WARNING(log, "Mutation " << mutation_id << " was killed or manually removed. Nothing to wait.");
|
LOG_WARNING_FORMATTED(log, "Mutation {} was killed or manually removed. Nothing to wait.", mutation_id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -883,7 +883,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
|
|||||||
|
|
||||||
if (failed_op_index < num_check_ops && e.code == Coordination::ZNODEEXISTS)
|
if (failed_op_index < num_check_ops && e.code == Coordination::ZNODEEXISTS)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "The part " << e.getPathForFirstFailedOp() << " on a replica suddenly appeared, will recheck checksums");
|
LOG_INFO_FORMATTED(log, "The part {} on a replica suddenly appeared, will recheck checksums", e.getPathForFirstFailedOp());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
throw;
|
throw;
|
||||||
@ -935,12 +935,12 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
|
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
|
||||||
LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
|
LOG_WARNING_FORMATTED(log, "Part {} from own log doesn't exist.", entry.new_part_name);
|
||||||
|
|
||||||
/// Perhaps we don't need this part, because during write with quorum, the quorum has failed (see below about `/quorum/failed_parts`).
|
/// Perhaps we don't need this part, because during write with quorum, the quorum has failed (see below about `/quorum/failed_parts`).
|
||||||
if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
|
if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed.");
|
LOG_DEBUG_FORMATTED(log, "Skipping action for part {} because quorum for that part was failed.", entry.new_part_name);
|
||||||
return true; /// NOTE Deletion from `virtual_parts` is not done, but it is only necessary for merge.
|
return true; /// NOTE Deletion from `virtual_parts` is not done, but it is only necessary for merge.
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -992,7 +992,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
|||||||
|
|
||||||
if (storage_settings_ptr->always_fetch_merged_part)
|
if (storage_settings_ptr->always_fetch_merged_part)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Will fetch part " << entry.new_part_name << " because setting 'always_fetch_merged_part' is true");
|
LOG_INFO_FORMATTED(log, "Will fetch part {} because setting 'always_fetch_merged_part' is true", entry.new_part_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1019,7 +1019,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
|||||||
if (!have_all_parts)
|
if (!have_all_parts)
|
||||||
{
|
{
|
||||||
/// If you do not have all the necessary parts, try to take some already merged part from someone.
|
/// If you do not have all the necessary parts, try to take some already merged part from someone.
|
||||||
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
|
LOG_DEBUG_FORMATTED(log, "Don't have all parts for merge {}; will try to fetch it instead", entry.new_part_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
else if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
|
else if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
|
||||||
@ -1378,7 +1378,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
|||||||
|
|
||||||
if (code == Coordination::ZOK)
|
if (code == Coordination::ZOK)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Marked quorum for part " << entry.new_part_name << " as failed.");
|
LOG_DEBUG_FORMATTED(log, "Marked quorum for part {} as failed.", entry.new_part_name);
|
||||||
queue.removeFromVirtualParts(part_info);
|
queue.removeFromVirtualParts(part_info);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1437,7 +1437,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
|||||||
|
|
||||||
if (!parts_for_merge.empty() && replica.empty())
|
if (!parts_for_merge.empty() && replica.empty())
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
|
LOG_INFO_FORMATTED(log, "No active replica has part {}. Will fetch merged part instead.", entry.new_part_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1595,7 +1595,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
source_table = DatabaseCatalog::instance().tryGetTable(source_table_id);
|
source_table = DatabaseCatalog::instance().tryGetTable(source_table_id);
|
||||||
if (!source_table)
|
if (!source_table)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Can't use " << source_table_id.getNameForLogs() << " as source table for REPLACE PARTITION command. It does not exist.");
|
LOG_DEBUG_FORMATTED(log, "Can't use {} as source table for REPLACE PARTITION command. It does not exist.", source_table_id.getNameForLogs());
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1669,7 +1669,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
|
|
||||||
if (replica.empty())
|
if (replica.empty())
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Part " << part_desc->new_part_name << " is not found on remote replicas");
|
LOG_DEBUG_FORMATTED(log, "Part {} is not found on remote replicas", part_desc->new_part_name);
|
||||||
|
|
||||||
/// Fallback to covering part
|
/// Fallback to covering part
|
||||||
replica = findReplicaHavingCoveringPart(part_desc->new_part_name, true, found_part_name);
|
replica = findReplicaHavingCoveringPart(part_desc->new_part_name, true, found_part_name);
|
||||||
@ -1677,7 +1677,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
if (replica.empty())
|
if (replica.empty())
|
||||||
{
|
{
|
||||||
/// It is not fail, since adjacent parts could cover current part
|
/// It is not fail, since adjacent parts could cover current part
|
||||||
LOG_DEBUG(log, "Parts covering " << part_desc->new_part_name << " are not found on remote replicas");
|
LOG_DEBUG_FORMATTED(log, "Parts covering {} are not found on remote replicas", part_desc->new_part_name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1834,7 +1834,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
|||||||
*/
|
*/
|
||||||
while (!zookeeper->exists(source_path + "/columns"))
|
while (!zookeeper->exists(source_path + "/columns"))
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");
|
LOG_INFO_FORMATTED(log, "Waiting for replica {} to be fully created", source_path);
|
||||||
|
|
||||||
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
||||||
if (zookeeper->exists(source_path + "/columns", nullptr, event))
|
if (zookeeper->exists(source_path + "/columns", nullptr, event))
|
||||||
@ -1901,7 +1901,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
|||||||
{
|
{
|
||||||
/// If source replica's log_pointer changed than we probably read
|
/// If source replica's log_pointer changed than we probably read
|
||||||
/// stale state of /queue and have to try one more time.
|
/// stale state of /queue and have to try one more time.
|
||||||
LOG_WARNING(log, "Log pointer of source replica " << source_replica << " changed while we loading queue nodes. Will retry.");
|
LOG_WARNING_FORMATTED(log, "Log pointer of source replica {} changed while we loading queue nodes. Will retry.", source_replica);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -1936,7 +1936,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
|||||||
{
|
{
|
||||||
queue.remove(zookeeper, part);
|
queue.remove(zookeeper, part);
|
||||||
parts_to_remove_from_zk.emplace_back(part);
|
parts_to_remove_from_zk.emplace_back(part);
|
||||||
LOG_WARNING(log, "Source replica does not have part " << part << ". Removing it from ZooKeeper.");
|
LOG_WARNING_FORMATTED(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk);
|
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk);
|
||||||
@ -1948,7 +1948,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
|||||||
if (active_parts_set.getContainingPart(part->name).empty())
|
if (active_parts_set.getContainingPart(part->name).empty())
|
||||||
{
|
{
|
||||||
parts_to_remove_from_working_set.emplace_back(part);
|
parts_to_remove_from_working_set.emplace_back(part);
|
||||||
LOG_WARNING(log, "Source replica does not have part " << part->name << ". Removing it from working set.");
|
LOG_WARNING_FORMATTED(log, "Source replica does not have part {}. Removing it from working set.", part->name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
removePartsFromWorkingSet(parts_to_remove_from_working_set, true);
|
removePartsFromWorkingSet(parts_to_remove_from_working_set, true);
|
||||||
@ -1964,7 +1964,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
|||||||
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
|
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
|
LOG_DEBUG_FORMATTED(log, "Queued {} parts to be fetched", active_parts.size());
|
||||||
|
|
||||||
/// Add content of the reference/master replica queue to the queue.
|
/// Add content of the reference/master replica queue to the queue.
|
||||||
for (const String & entry : source_queue)
|
for (const String & entry : source_queue)
|
||||||
@ -1972,7 +1972,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
|||||||
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
|
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
|
LOG_DEBUG_FORMATTED(log, "Copied {} queue entries", source_queue.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -2772,7 +2772,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
|
|||||||
|
|
||||||
if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
|
if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Part " << part->name << " should be deleted after previous attempt before fetch");
|
LOG_DEBUG_FORMATTED(log, "Part {} should be deleted after previous attempt before fetch", part->name);
|
||||||
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
|
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
|
||||||
cleanup_thread.wakeup();
|
cleanup_thread.wakeup();
|
||||||
return false;
|
return false;
|
||||||
@ -2782,7 +2782,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
|
|||||||
std::lock_guard lock(currently_fetching_parts_mutex);
|
std::lock_guard lock(currently_fetching_parts_mutex);
|
||||||
if (!currently_fetching_parts.insert(part_name).second)
|
if (!currently_fetching_parts.insert(part_name).second)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Part " << part_name << " is already fetching right now");
|
LOG_DEBUG_FORMATTED(log, "Part {} is already fetching right now", part_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3734,7 +3734,7 @@ void StorageReplicatedMergeTree::drop()
|
|||||||
Strings replicas;
|
Strings replicas;
|
||||||
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
|
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
|
LOG_INFO_FORMATTED(log, "Removing table {} (this might take several minutes)", zookeeper_path);
|
||||||
zookeeper->tryRemoveRecursive(zookeeper_path);
|
zookeeper->tryRemoveRecursive(zookeeper_path);
|
||||||
if (zookeeper->exists(zookeeper_path))
|
if (zookeeper->exists(zookeeper_path))
|
||||||
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, "
|
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, "
|
||||||
@ -3963,7 +3963,7 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
|||||||
if (!log_node_name.empty())
|
if (!log_node_name.empty())
|
||||||
LOG_DEBUG(log, "Looking for node corresponding to " << log_node_name << " in " << replica << " queue");
|
LOG_DEBUG(log, "Looking for node corresponding to " << log_node_name << " in " << replica << " queue");
|
||||||
else
|
else
|
||||||
LOG_DEBUG(log, "Looking for corresponding node in " << replica << " queue");
|
LOG_DEBUG_FORMATTED(log, "Looking for corresponding node in {} queue", replica);
|
||||||
|
|
||||||
/** Second - find the corresponding entry in the queue of the specified replica.
|
/** Second - find the corresponding entry in the queue of the specified replica.
|
||||||
* Its number may match neither the `log` node nor the `queue` node of the current replica (for us).
|
* Its number may match neither the `log` node nor the `queue` node of the current replica (for us).
|
||||||
@ -4359,7 +4359,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
|
|||||||
do
|
do
|
||||||
{
|
{
|
||||||
if (try_no)
|
if (try_no)
|
||||||
LOG_INFO(log, "Some of parts (" << missing_parts.size() << ") are missing. Will try to fetch covering parts.");
|
LOG_INFO_FORMATTED(log, "Some of parts ({}) are missing. Will try to fetch covering parts.", missing_parts.size());
|
||||||
|
|
||||||
if (try_no >= query_context.getSettings().max_fetch_partition_retries_count)
|
if (try_no >= query_context.getSettings().max_fetch_partition_retries_count)
|
||||||
throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS);
|
throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS);
|
||||||
@ -4623,7 +4623,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
|||||||
remove_parts_from_filesystem(parts_to_delete_only_from_filesystem);
|
remove_parts_from_filesystem(parts_to_delete_only_from_filesystem);
|
||||||
removePartsFinally(parts_to_delete_only_from_filesystem);
|
removePartsFinally(parts_to_delete_only_from_filesystem);
|
||||||
|
|
||||||
LOG_DEBUG(log, "Removed " << parts_to_delete_only_from_filesystem.size() << " old duplicate parts");
|
LOG_DEBUG_FORMATTED(log, "Removed {} old duplicate parts", parts_to_delete_only_from_filesystem.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete normal parts from ZooKeeper
|
/// Delete normal parts from ZooKeeper
|
||||||
@ -4634,7 +4634,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
|||||||
for (const auto & part : parts_to_delete_completely)
|
for (const auto & part : parts_to_delete_completely)
|
||||||
part_names_to_delete_completely.emplace_back(part->name);
|
part_names_to_delete_completely.emplace_back(part->name);
|
||||||
|
|
||||||
LOG_DEBUG(log, "Removing " << parts_to_delete_completely.size() << " old parts from ZooKeeper");
|
LOG_DEBUG_FORMATTED(log, "Removing {} old parts from ZooKeeper", parts_to_delete_completely.size());
|
||||||
removePartsFromZooKeeper(zookeeper, part_names_to_delete_completely, &part_names_to_retry_deletion);
|
removePartsFromZooKeeper(zookeeper, part_names_to_delete_completely, &part_names_to_retry_deletion);
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
@ -4644,7 +4644,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
|||||||
|
|
||||||
/// Part names that were reliably deleted from ZooKeeper should be deleted from filesystem
|
/// Part names that were reliably deleted from ZooKeeper should be deleted from filesystem
|
||||||
auto num_reliably_deleted_parts = parts_to_delete_completely.size() - part_names_to_retry_deletion.size();
|
auto num_reliably_deleted_parts = parts_to_delete_completely.size() - part_names_to_retry_deletion.size();
|
||||||
LOG_DEBUG(log, "Removed " << num_reliably_deleted_parts << " old parts from ZooKeeper. Removing them from filesystem.");
|
LOG_DEBUG_FORMATTED(log, "Removed {} old parts from ZooKeeper. Removing them from filesystem.", num_reliably_deleted_parts);
|
||||||
|
|
||||||
/// Delete normal parts on two sets
|
/// Delete normal parts on two sets
|
||||||
for (auto & part : parts_to_delete_completely)
|
for (auto & part : parts_to_delete_completely)
|
||||||
@ -4659,7 +4659,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
|||||||
if (!parts_to_retry_deletion.empty())
|
if (!parts_to_retry_deletion.empty())
|
||||||
{
|
{
|
||||||
rollbackDeletingParts(parts_to_retry_deletion);
|
rollbackDeletingParts(parts_to_retry_deletion);
|
||||||
LOG_DEBUG(log, "Will retry deletion of " << parts_to_retry_deletion.size() << " parts in the next time");
|
LOG_DEBUG_FORMATTED(log, "Will retry deletion of {} parts in the next time", parts_to_retry_deletion.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove parts from filesystem and finally from data_parts
|
/// Remove parts from filesystem and finally from data_parts
|
||||||
@ -4668,7 +4668,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
|||||||
remove_parts_from_filesystem(parts_to_remove_from_filesystem);
|
remove_parts_from_filesystem(parts_to_remove_from_filesystem);
|
||||||
removePartsFinally(parts_to_remove_from_filesystem);
|
removePartsFinally(parts_to_remove_from_filesystem);
|
||||||
|
|
||||||
LOG_DEBUG(log, "Removed " << parts_to_remove_from_filesystem.size() << " old parts");
|
LOG_DEBUG_FORMATTED(log, "Removed {} old parts", parts_to_remove_from_filesystem.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4890,7 +4890,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
|
|||||||
Strings part_checksums;
|
Strings part_checksums;
|
||||||
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
|
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
|
||||||
|
|
||||||
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
|
LOG_DEBUG_FORMATTED(log, "Cloning {} parts", src_all_parts.size());
|
||||||
|
|
||||||
static const String TMP_PREFIX = "tmp_replace_from_";
|
static const String TMP_PREFIX = "tmp_replace_from_";
|
||||||
auto zookeeper = getZooKeeper();
|
auto zookeeper = getZooKeeper();
|
||||||
@ -5080,7 +5080,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
|||||||
Strings part_checksums;
|
Strings part_checksums;
|
||||||
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
|
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
|
||||||
|
|
||||||
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
|
LOG_DEBUG_FORMATTED(log, "Cloning {} parts", src_all_parts.size());
|
||||||
|
|
||||||
static const String TMP_PREFIX = "tmp_move_from_";
|
static const String TMP_PREFIX = "tmp_move_from_";
|
||||||
auto zookeeper = getZooKeeper();
|
auto zookeeper = getZooKeeper();
|
||||||
@ -5371,7 +5371,7 @@ bool StorageReplicatedMergeTree::dropPartsInPartition(
|
|||||||
MergeTreePartInfo drop_range_info;
|
MergeTreePartInfo drop_range_info;
|
||||||
if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info))
|
if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info))
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Will not drop partition " << partition_id << ", it is empty.");
|
LOG_INFO_FORMATTED(log, "Will not drop partition {}, it is empty.", partition_id);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user