Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-24 16:42:05 +00:00)

Commit eacff92d0e ("Progress on task"), parent 0ab3ca8534.
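Every hunk below applies the same cleanup: adjacent string literals that were streamed piece by piece into the stream-style LOG_* macros (LOG_TRACE, LOG_WARNING, ...) are merged into a single literal, and a few call sites are switched to the {}-placeholder LOG_*_FORMATTED variants. A minimal sketch of the stream-style pattern being cleaned up, using a hypothetical SKETCH_LOG stand-in rather than ClickHouse's real logging macros:

#include <iostream>
#include <sstream>

// Hypothetical stand-in for the stream-style macros; the real ones take a
// Poco logger and honor log levels, which this sketch omits.
#define SKETCH_LOG(expr) \
    do { std::ostringstream oss_; oss_ << expr; std::cout << oss_.str() << '\n'; } while (false)

int main()
{
    size_t rows = 42;

    // Before: the message is assembled from many adjacent literals.
    SKETCH_LOG("Aggregated. " << rows << " rows" << " (from " << 1.5 << " MiB)");

    // After: adjacent literals are merged into one; the output is identical.
    SKETCH_LOG("Aggregated. " << rows << " rows (from " << 1.5 << " MiB)");
}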
@@ -248,7 +248,7 @@ ConnectionPoolWithFailover::tryGetEntry(
 result.is_up_to_date = false;
 result.staleness = delay;

-LOG_TRACE(log, "Server " << result.entry->getDescription() << " has unacceptable replica delay " << "for table " << table_to_check->database << "." << table_to_check->table << ": " << delay);
+LOG_TRACE(log, "Server " << result.entry->getDescription() << " has unacceptable replica delay for table " << table_to_check->database << "." << table_to_check->table << ": " << delay);
 ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
 }
 }
@@ -95,7 +95,7 @@ int main(int, char **)
 mut->set(3);

 std::cerr << "refcounts: " << x->use_count() << ", " << mut->use_count() << "\n";
-std::cerr << "addresses: " << x.get() << ", " << ", " << mut.get() << "\n";
+std::cerr << "addresses: " << x.get() << ", " << mut.get() << "\n";
 y = std::move(mut);
 }

@@ -62,6 +62,6 @@ TEST(ShellCommand, AutoWait)
 //command->wait(); // now automatic
 }

-// std::cerr << "inspect me: ps auxwwf" << "\n";
+// std::cerr << "inspect me: ps auxwwf\n";
 // std::this_thread::sleep_for(std::chrono::seconds(100));
 }
@@ -220,7 +220,7 @@ template <typename T, typename ContainerLeft, typename ContainerRight>

 if (l_size != r_size)
 {
-result = ::testing::AssertionFailure() << "size mismatch" << " expected: " << l_size << " got:" << r_size;
+result = ::testing::AssertionFailure() << "size mismatch expected: " << l_size << " got:" << r_size;
 }
 if (l_size == 0 || r_size == 0)
 {
@@ -177,12 +177,12 @@ void ParallelAggregatingBlockInputStream::execute()
 for (size_t i = 0; i < max_threads; ++i)
 {
 size_t rows = many_data[i]->size();
-LOG_TRACE(log, "Aggregated. " << threads_data[i].src_rows << " to " << rows << " rows" << " (from " << threads_data[i].src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." << " (" << threads_data[i].src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes / elapsed_seconds) << "/sec.)");
+LOG_TRACE(log, "Aggregated. " << threads_data[i].src_rows << " to " << rows << " rows (from " << threads_data[i].src_bytes / 1048576.0 << " MiB) in " << elapsed_seconds << " sec. (" << threads_data[i].src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes / elapsed_seconds) << "/sec.)");

 total_src_rows += threads_data[i].src_rows;
 total_src_bytes += threads_data[i].src_bytes;
 }
-LOG_TRACE(log, "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(total_src_bytes / elapsed_seconds) << "/sec.)");
+LOG_TRACE(log, "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB) in " << elapsed_seconds << " sec. (" << total_src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(total_src_bytes / elapsed_seconds) << "/sec.)");

 /// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
 /// To do this, we pass a block with zero rows to aggregate.
@@ -135,7 +135,7 @@ void WriteBufferFromS3::writePart(const String & data)
 {
 auto etag = outcome.GetResult().GetETag();
 part_tags.push_back(etag);
-LOG_DEBUG(log, "Writing part finished. " << "Total parts: " << part_tags.size() << ", Upload_id: " << upload_id << ", Etag: " << etag);
+LOG_DEBUG_FORMATTED(log, "Writing part finished. Total parts: {}, Upload_id: {}, Etag: {}", part_tags.size(), upload_id, etag);
 }
 else
 throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
@@ -761,7 +761,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
 ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
 ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);

-LOG_TRACE(log, "Written part in " << elapsed_seconds << " sec., " << rows << " rows, " << formatReadableSizeWithBinarySuffix(uncompressed_bytes) << " uncompressed, " << formatReadableSizeWithBinarySuffix(compressed_bytes) << " compressed, " << (uncompressed_bytes / rows) << " uncompressed bytes per row, " << (compressed_bytes / rows) << " compressed bytes per row, " << "compression rate: " << (uncompressed_bytes / compressed_bytes) << " (" << (rows / elapsed_seconds) << " rows/sec., " << formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds) << "/sec. uncompressed, " << formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds) << "/sec. compressed)");
+LOG_TRACE(log, "Written part in " << elapsed_seconds << " sec., " << rows << " rows, " << formatReadableSizeWithBinarySuffix(uncompressed_bytes) << " uncompressed, " << formatReadableSizeWithBinarySuffix(compressed_bytes) << " compressed, " << (uncompressed_bytes / rows) << " uncompressed bytes per row, " << (compressed_bytes / rows) << " compressed bytes per row, compression rate: " << (uncompressed_bytes / compressed_bytes) << " (" << (rows / elapsed_seconds) << " rows/sec., " << formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds) << "/sec. uncompressed, " << formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds) << "/sec. compressed)");
 }
 void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
 {
@@ -927,7 +927,7 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria

 double elapsed_seconds = watch.elapsedSeconds();
 size_t rows = result.sizeWithoutOverflowRow();
-LOG_TRACE(log, "Aggregated. " << src_rows << " to " << rows << " rows (from " << formatReadableSizeWithBinarySuffix(src_bytes) << ")" << " in " << elapsed_seconds << " sec." << " (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");
+LOG_TRACE(log, "Aggregated. " << src_rows << " to " << rows << " rows (from " << formatReadableSizeWithBinarySuffix(src_bytes) << ") in " << elapsed_seconds << " sec. (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");
 }

@@ -1293,7 +1293,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
 }

 double elapsed_seconds = watch.elapsedSeconds();
-LOG_TRACE(log, "Converted aggregated data to blocks. " << rows << " rows, " << bytes / 1048576.0 << " MiB" << " in " << elapsed_seconds << " sec." << " (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)");
+LOG_TRACE(log, "Converted aggregated data to blocks. " << rows << " rows, " << bytes / 1048576.0 << " MiB in " << elapsed_seconds << " sec. (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)");

 return blocks;
 }
@@ -2155,7 +2155,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
 size_t rows = block.rows();
 size_t bytes = block.bytes();
 double elapsed_seconds = watch.elapsedSeconds();
-LOG_TRACE(log, "Merged partially aggregated blocks. " << rows << " rows, " << bytes / 1048576.0 << " MiB." << " in " << elapsed_seconds << " sec." << " (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)");
+LOG_TRACE(log, "Merged partially aggregated blocks. " << rows << " rows, " << bytes / 1048576.0 << " MiB. in " << elapsed_seconds << " sec. (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)");

 if (isCancelled())
 return {};
@@ -238,7 +238,7 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const

 if (context.getSettingsRef().readonly)
 {
-LOG_WARNING(log, "Distributed DDL worker is run with readonly settings, it will not be able to execute DDL queries" << " Set appropriate system_profile or distributed_ddl.profile to fix this.");
+LOG_WARNING(log, "Distributed DDL worker is run with readonly settings, it will not be able to execute DDL queries Set appropriate system_profile or distributed_ddl.profile to fix this.");
 }

 host_fqdn = getFQDNOrHostName();
@@ -940,7 +940,7 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
 }
 catch (...)
 {
-LOG_INFO(log, "An error occurred while creating auxiliary ZooKeeper directories in " << node_path << " . They will be created later" << ". Error : " << getCurrentExceptionMessage(true));
+LOG_INFO_FORMATTED(log, "An error occurred while creating auxiliary ZooKeeper directories in {} . They will be created later. Error : {}", node_path, getCurrentExceptionMessage(true));
 }

 return node_path;
@@ -1217,7 +1217,7 @@ private:
 if (!ignoring_hosts.count(host))
 {
 ignoring_hosts.emplace(host);
-LOG_INFO(log, "Unexpected host " << host << " appeared " << " in task " << node_path);
+LOG_INFO_FORMATTED(log, "Unexpected host {} appeared in task {}", host, node_path);
 }
 continue;
 }
@@ -256,7 +256,7 @@ void SystemLog<LogElement>::add(const LogElement & element)

 // TextLog sets its logger level to 0, so this log is a noop and
 // there is no recursive logging.
-LOG_ERROR(log, "Queue is full for system log '" << demangle(typeid(*this).name()) << "'" << " at " << queue_front_index);
+LOG_ERROR_FORMATTED(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
 }

 return;
@@ -140,7 +140,7 @@ static void setExceptionStackTrace(QueryLogElement & elem)
 /// Log exception (with query info) into text log (not into system table).
 static void logException(Context & context, QueryLogElement & elem)
 {
-LOG_ERROR(&Logger::get("executeQuery"), elem.exception << " (from " << context.getClientInfo().current_address.toString() << ")" << " (in query: " << joinLines(elem.query) << ")" << (!elem.stack_trace.empty() ? ", Stack trace (when copying this message, always include the lines below):\n\n" + elem.stack_trace : ""));
+LOG_ERROR(&Logger::get("executeQuery"), elem.exception << " (from " << context.getClientInfo().current_address.toString() << ") (in query: " << joinLines(elem.query) << ")" << (!elem.stack_trace.empty() ? ", Stack trace (when copying this message, always include the lines below):\n\n" + elem.stack_trace : ""));
 }

@@ -41,7 +41,7 @@ protected:
 {
 std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' ');

-s.ostr << (s.hilite ? hilite_keyword : "") << "WATCH" << " " << (s.hilite ? hilite_none : "")
+s.ostr << (s.hilite ? hilite_keyword : "") << "WATCH " << (s.hilite ? hilite_none : "")
 << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);

 if (is_watch_events)
@@ -547,7 +547,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads

 #ifndef NDEBUG
 auto & context = executor_contexts[thread_num];
-LOG_TRACE(log, "Thread finished." << " Total time: " << (context->total_time_ns / 1e9) << " sec." << " Execution time: " << (context->execution_time_ns / 1e9) << " sec." << " Processing time: " << (context->processing_time_ns / 1e9) << " sec." << " Wait time: " << (context->wait_time_ns / 1e9) << " sec.");
+LOG_TRACE(log, "Thread finished. Total time: " << (context->total_time_ns / 1e9) << " sec. Execution time: " << (context->execution_time_ns / 1e9) << " sec. Processing time: " << (context->processing_time_ns / 1e9) << " sec. Wait time: " << (context->wait_time_ns / 1e9) << " sec.");
 #endif
 }

@@ -540,7 +540,7 @@ void AggregatingTransform::initGenerate()

 double elapsed_seconds = watch.elapsedSeconds();
 size_t rows = variants.sizeWithoutOverflowRow();
-LOG_TRACE(log, "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." << " (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");
+LOG_TRACE(log, "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB) in " << elapsed_seconds << " sec. (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");

 if (params->aggregator.hasTemporaryFiles())
 {
@@ -48,7 +48,7 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri
 const IProcessor & curr = *processor;
 const IProcessor & next = port.getInputPort().getProcessor();

-out << "n" << get_proc_id(curr) << " -> " << "n" << get_proc_id(next) << ";\n";
+out << "n" << get_proc_id(curr) << " -> n" << get_proc_id(next) << ";\n";
 }
 }
 out << "}\n";
@@ -317,7 +317,7 @@ void StorageDistributedDirectoryMonitor::readHeader(
 readVarUInt(initiator_revision, header_buf);
 if (ClickHouseRevision::get() < initiator_revision)
 {
-LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. " << "It may lack support for new features.");
+LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
 }

 readStringBinary(insert_query, header_buf);
@@ -114,7 +114,7 @@ void DistributedBlockOutputStream::write(const Block & block)
 if (ordinary_block.has(col.name))
 {
 ordinary_block.erase(col.name);
-LOG_DEBUG(log, storage.getStorageID().getNameForLogs() << ": column " + col.name + " will be removed, " << "because it is MATERIALIZED");
+LOG_DEBUG(log, storage.getStorageID().getNameForLogs() << ": column " + col.name + " will be removed, because it is MATERIALIZED");
 }
 }

@@ -411,7 +411,7 @@ void DistributedBlockOutputStream::writeSuffix()
 auto log_performance = [this] ()
 {
 double elapsed = watch.elapsedSeconds();
-LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks" << ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second" << ". " << getCurrentStateDescription());
+LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks, " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second. " << getCurrentStateDescription());
 };

 if (insert_sync && pool)
@@ -668,7 +668,7 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
 Names files;
 volume->getDisk()->listFiles(to, files);

-LOG_WARNING(storage.log, "Part directory " << fullPath(volume->getDisk(), to) << " already exists" << " and contains " << files.size() << " files. Removing it.");
+LOG_WARNING(storage.log, "Part directory " << fullPath(volume->getDisk(), to) << " already exists and contains " << files.size() << " files. Removing it.");

 volume->getDisk()->removeRecursive(to);
 }
@@ -2088,7 +2088,7 @@ restore_covered)

 if (error)
 {
-LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete." << " There might or might not be a data loss." << (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
+LOG_ERROR_FORMATTED(log, "The set of parts restored in place of {} looks incomplete. There might or might not be a data loss.{}", part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
 }
 }
 }
@@ -215,7 +215,7 @@ void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & clon
 /// It's ok, because we don't block moving parts for merges or mutations
 if (!active_part || active_part->name != cloned_part->name)
 {
-LOG_INFO(log, "Failed to swap " << cloned_part->name << ". Active part doesn't exist." << " Possible it was merged or mutated. Will remove copy on path '" << cloned_part->getFullPath() << "'.");
+LOG_INFO_FORMATTED(log, "Failed to swap {}. Active part doesn't exist. Possible it was merged or mutated. Will remove copy on path '{}'.", cloned_part->name, cloned_part->getFullPath());
 return;
 }

@@ -320,7 +320,7 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
 auto code = zookeeper->tryMulti(ops, responses);

 if (code)
-LOG_ERROR(log, "Couldn't set value of nodes for insert times (" << replica_path << "/min_unprocessed_insert_time, max_processed_insert_time)" << ": " << zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
+LOG_ERROR_FORMATTED(log, "Couldn't set value of nodes for insert times ({}/min_unprocessed_insert_time, max_processed_insert_time): {}", replica_path, zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
 }
 }

@@ -1885,7 +1885,7 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMerge
 partition_it->second.begin(), partition_it->second.lower_bound(block_num));
 if (blocks_count)
 {
-LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because " << "in partition ID " << partition_id << " there are still " << blocks_count << " uncommitted blocks.");
+LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because in partition ID " << partition_id << " there are still " << blocks_count << " uncommitted blocks.");
 return false;
 }
 }
@@ -66,7 +66,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr

 void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
 {
-out << "metadata format version: 1" << "\n"
+out << "metadata format version: 1\n"
 << "date column: " << date_column << "\n"
 << "sampling expression: " << sampling_expression << "\n"
 << "index granularity: " << index_granularity << "\n"
@@ -447,7 +447,7 @@ void StorageBuffer::startup()
 {
 if (global_context.getSettingsRef().readonly)
 {
-LOG_WARNING(log, "Storage " << getName() << " is run with readonly settings, it will not be able to insert data." << " Set appropriate system_profile to fix this.");
+LOG_WARNING(log, "Storage " << getName() << " is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.");
 }

@@ -830,7 +830,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
 }
 else
 {
-LOG_WARNING(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists." << " Will not commit any nodes.");
+LOG_WARNING(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists. Will not commit any nodes.");
 }
 }

@@ -1370,7 +1370,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
 }
 else
 {
-LOG_WARNING(log, "No active replica has part " << entry.new_part_name << ", but that part needs quorum and /quorum/status contains entry about another part " << quorum_entry.part_name << ". It means that part was successfully written to " << entry.quorum << " replicas, but then all of them goes offline." << " Or it is a bug.");
+LOG_WARNING(log, "No active replica has part " << entry.new_part_name << ", but that part needs quorum and /quorum/status contains entry about another part " << quorum_entry.part_name << ". It means that part was successfully written to " << entry.quorum << " replicas, but then all of them goes offline. Or it is a bug.");
 }
 }
 }
@@ -1580,7 +1580,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
 }
 catch (Exception &)
 {
-LOG_INFO(log, "Can't use " << source_table_id.getNameForLogs() << " as source table for REPLACE PARTITION command. Will fetch all parts." << " Reason: " << getCurrentExceptionMessage(false));
+LOG_INFO_FORMATTED(log, "Can't use {} as source table for REPLACE PARTITION command. Will fetch all parts. Reason: {}", source_table_id.getNameForLogs(), getCurrentExceptionMessage(false));
 return 0;
 }

@@ -2297,7 +2297,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
 const auto & part = parts[i];
 if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
 {
-LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)" << " with age " << (time(nullptr) - part->modification_time) << " seconds exists locally but not in ZooKeeper." << " Won't do merge with that part and will check it.");
+LOG_WARNING(log, "Part " << part->name << " (that was selected for merge) with age " << (time(nullptr) - part->modification_time) << " seconds exists locally but not in ZooKeeper. Won't do merge with that part and will check it.");
 enqueuePartForCheck(part->name);
 }
 }
@@ -2339,7 +2339,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const IMergeTreeData
 {
 if (part.modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
 {
-LOG_WARNING(log, "Part " << part.name << " (that was selected for mutation)" << " with age " << (time(nullptr) - part.modification_time) << " seconds exists locally but not in ZooKeeper." << " Won't mutate that part and will check it.");
+LOG_WARNING(log, "Part " << part.name << " (that was selected for mutation) with age " << (time(nullptr) - part.modification_time) << " seconds exists locally but not in ZooKeeper. Won't mutate that part and will check it.");
 enqueuePartForCheck(part.name);
 }

@@ -4305,7 +4305,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
 if (best_replica.empty())
 throw Exception("Logical error: cannot choose best replica.", ErrorCodes::LOGICAL_ERROR);

-LOG_INFO(log, "Found " << replicas.size() << " replicas, " << active_replicas.size() << " of them are active." << " Selected " << best_replica << " to fetch from.");
+LOG_INFO(log, "Found " << replicas.size() << " replicas, " << active_replicas.size() << " of them are active. Selected " << best_replica << " to fetch from.");

 String best_replica_path = from + "/replicas/" + best_replica;
