Progress on task

Alexey Milovidov 2020-05-24 00:16:05 +03:00
parent 7c0c328a35
commit bab24879e9
11 changed files with 61 additions and 36 deletions

@@ -607,19 +607,23 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
     maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
     double elapsed = watch.elapsedSeconds();
-    std::stringstream msg;
-    msg << std::fixed << std::setprecision(3);
-    msg << "Sent data for " << data.size() << " external tables, total " << rows << " rows in " << elapsed << " sec., "
-        << static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "
-        << formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes) << " (" << formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()) << "/sec.)";
     if (compression == Protocol::Compression::Enable)
-        msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
-            << formatReadableSizeWithBinarySuffix(out_bytes) << " (" << formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()) << "/sec.)";
+        LOG_DEBUG_FORMATTED(log_wrapper.get(),
+            "Sent data for {} external tables, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), compressed {} times to {} ({}/sec.)",
+            data.size(), rows, elapsed,
+            static_cast<size_t>(rows / watch.elapsedSeconds()),
+            formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
+            formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()),
+            static_cast<double>(maybe_compressed_out_bytes) / out_bytes,
+            formatReadableSizeWithBinarySuffix(out_bytes),
+            formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()));
     else
-        msg << ", no compression.";
-    LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
+        LOG_DEBUG_FORMATTED(log_wrapper.get(),
+            "Sent data for {} external tables, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), no compression.",
+            data.size(), rows, elapsed,
+            static_cast<size_t>(rows / watch.elapsedSeconds()),
+            formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
+            formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()));
 }
 std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const

@@ -762,7 +762,20 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
     ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
     ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);
-    LOG_TRACE(log, "Written part in " << elapsed_seconds << " sec., " << rows << " rows, " << formatReadableSizeWithBinarySuffix(uncompressed_bytes) << " uncompressed, " << formatReadableSizeWithBinarySuffix(compressed_bytes) << " compressed, " << (uncompressed_bytes / rows) << " uncompressed bytes per row, " << (compressed_bytes / rows) << " compressed bytes per row, compression rate: " << (uncompressed_bytes / compressed_bytes) << " (" << (rows / elapsed_seconds) << " rows/sec., " << formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds) << "/sec. uncompressed, " << formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds) << "/sec. compressed)");
+    LOG_TRACE_FORMATTED(log,
+        "Written part in {} sec., {} rows, {} uncompressed, {} compressed,"
+        " {} uncompressed bytes per row, {} compressed bytes per row, compression rate: {}"
+        " ({} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
+        elapsed_seconds,
+        rows,
+        formatReadableSizeWithBinarySuffix(uncompressed_bytes),
+        formatReadableSizeWithBinarySuffix(compressed_bytes),
+        uncompressed_bytes / rows,
+        compressed_bytes / rows,
+        uncompressed_bytes / compressed_bytes,
+        rows / elapsed_seconds,
+        formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds),
+        formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds));
 }
 void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
 {
@@ -928,7 +941,11 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
     double elapsed_seconds = watch.elapsedSeconds();
     size_t rows = result.sizeWithoutOverflowRow();
-    LOG_TRACE(log, "Aggregated. " << src_rows << " to " << rows << " rows (from " << formatReadableSizeWithBinarySuffix(src_bytes) << ") in " << elapsed_seconds << " sec. (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");
+    LOG_TRACE_FORMATTED(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
+        src_rows, rows, formatReadableSizeWithBinarySuffix(src_bytes),
+        elapsed_seconds, src_rows / elapsed_seconds,
+        formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds));
 }

@@ -540,7 +540,11 @@ void AggregatingTransform::initGenerate()
     double elapsed_seconds = watch.elapsedSeconds();
     size_t rows = variants.sizeWithoutOverflowRow();
-    LOG_TRACE(log, "Aggregated. " << src_rows << " to " << rows << " rows (from " << formatReadableSizeWithBinarySuffix(src_bytes) << ") in " << elapsed_seconds << " sec. (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");
+    LOG_TRACE_FORMATTED(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
+        src_rows, rows, formatReadableSizeWithBinarySuffix(src_bytes),
+        elapsed_seconds, src_rows / elapsed_seconds,
+        formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds));
     if (params->aggregator.hasTemporaryFiles())
     {

@@ -56,7 +56,12 @@ IProcessor::Status CreatingSetsTransform::prepare()
 void CreatingSetsTransform::startSubquery(SubqueryForSet & subquery)
 {
-    LOG_TRACE(log, (subquery.set ? "Creating set. " : "") << (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : ""));
+    if (subquery.set)
+        LOG_TRACE_FORMATTED(log, "Creating set.");
+    if (subquery.join)
+        LOG_TRACE_FORMATTED(log, "Creating join.");
+    if (subquery.table)
+        LOG_TRACE_FORMATTED(log, "Filling temporary table.");
     elapsed_nanoseconds = 0;
@@ -85,19 +90,14 @@ void CreatingSetsTransform::finishSubquery(SubqueryForSet & subquery)
     if (head_rows != 0)
     {
-        std::stringstream msg;
-        msg << std::fixed << std::setprecision(3);
-        msg << "Created. ";
+        auto seconds = elapsed_nanoseconds / 1e9;
         if (subquery.set)
-            msg << "Set with " << subquery.set->getTotalRowCount() << " entries from " << head_rows << " rows. ";
+            LOG_DEBUG_FORMATTED(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), head_rows, seconds);
         if (subquery.join)
-            msg << "Join with " << subquery.join->getTotalRowCount() << " entries from " << head_rows << " rows. ";
+            LOG_DEBUG_FORMATTED(log, "Created Join with {} entries from {} rows in {} sec.", subquery.join->getTotalRowCount(), head_rows, seconds);
         if (subquery.table)
-            msg << "Table with " << head_rows << " rows. ";
-        msg << "In " << (static_cast<double>(elapsed_nanoseconds) / 1000000000ULL) << " sec.";
-        LOG_DEBUG(log, msg.rdbuf());
+            LOG_DEBUG_FORMATTED(log, "Created Table with {} rows in {} sec.", head_rows, seconds);
     }
     else
     {

@@ -411,7 +411,7 @@ void DistributedBlockOutputStream::writeSuffix()
     auto log_performance = [this] ()
     {
         double elapsed = watch.elapsedSeconds();
-        LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks, " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second. " << getCurrentStateDescription());
+        LOG_DEBUG(log, "It took " << elapsed << " sec. to insert " << inserted_blocks << " blocks, " << inserted_rows / elapsed << " rows per second. " << getCurrentStateDescription());
     };
     if (insert_sync && pool)

@@ -3275,7 +3275,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
             : toString(increment))
         + "/";
-    LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " + backup_path);
+    LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " << backup_path);
     String backup_part_path = backup_path + relative_data_path + part->relative_path;
     localBackup(part->volume->getDisk(), part->getFullRelativePath(), backup_part_path);

@@ -327,7 +327,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
             cached_block_stats.erase(first_outdated_block->node);
         }
         else if (rc)
-            LOG_WARNING(log, "Error while deleting ZooKeeper path `" << path << "`: " + zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
+            LOG_WARNING(log, "Error while deleting ZooKeeper path `" << path << "`: " << zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
         else
         {
             /// Successfully removed blocks have to be removed from cache

@@ -82,7 +82,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
     /// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue.
     if (zookeeper->exists(part_path))
     {
-        LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. " "Removing from ZooKeeper and queueing a fetch.");
+        LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. Removing from ZooKeeper and queueing a fetch.");
         ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
         storage.removePartAndEnqueueFetch(part_name);

@@ -745,7 +745,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
     if (entry->isAlterMutation())
     {
-        LOG_DEBUG(log, "Removed alter " << entry->alter_version << " because mutation " + entry->znode_name + " were killed.");
+        LOG_DEBUG(log, "Removed alter " << entry->alter_version << " because mutation " << entry->znode_name << " were killed.");
         alter_sequence.finishDataAlter(entry->alter_version, state_lock);
     }

@@ -837,7 +837,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
     {
         if (e.code() == ErrorCodes::ABORTED)
         {
-            LOG_INFO(log, e.message());
+            LOG_INFO_FORMATTED(log, e.message());
             return BackgroundProcessingPoolTaskResult::ERROR;
         }

@@ -2105,17 +2105,17 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
         if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
         {
             /// If no one has the right part, probably not all replicas work; We will not write to log with Error level.
-            LOG_INFO(log, e.displayText());
+            LOG_INFO_FORMATTED(log, e.displayText());
         }
         else if (e.code() == ErrorCodes::ABORTED)
         {
             /// Interrupted merge or downloading a part is not an error.
-            LOG_INFO(log, e.message());
+            LOG_INFO_FORMATTED(log, e.message());
         }
         else if (e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
         {
             /// Part cannot be added temporarily
-            LOG_INFO(log, e.displayText());
+            LOG_INFO_FORMATTED(log, e.displayText());
             cleanup_thread.wakeup();
         }
         else
@@ -3205,7 +3205,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
         message << "Cannot select parts for optimization";
         if (!disable_reason.empty())
             message << ": " << disable_reason;
-        LOG_INFO(log, message.rdbuf());
+        LOG_INFO_FORMATTED(log, message.str());
         return handle_noop(message.str());
     }
@@ -4380,7 +4380,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
                 && e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
                 throw;
-            LOG_INFO(log, e.displayText());
+            LOG_INFO_FORMATTED(log, e.displayText());
             missing_parts.push_back(part);
         }
     }
@@ -4830,7 +4830,7 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
             zookeeper.removeRecursive(path);
         }
         else if (rc)
-            LOG_WARNING(log, "Error while deleting ZooKeeper path `" << path << "`: " + zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
+            LOG_WARNING_FORMATTED(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, zkutil::ZooKeeper::error2string(rc));
     }
     LOG_TRACE_FORMATTED(log, "Deleted {} deduplication block IDs in partition ID {}", to_delete_futures.size(), partition_id);
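
Every hunk above replaces a stream-style LOG_* call with a placeholder-based LOG_*_FORMATTED call. As a rough sketch of what such a macro could look like, the following fmt-based wrapper around Poco::Logger is an illustration under assumptions: the name SKETCH_LOG_DEBUG_FORMATTED, the level check, and the console-channel setup in main() are invented here and are not ClickHouse's actual implementation.

#include <Poco/AutoPtr.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/Message.h>
#include <fmt/format.h>

/// Hypothetical stand-in for LOG_DEBUG_FORMATTED (illustration only).
/// The message is rendered with fmt::format and handed to Poco as a finished
/// string. Because the arguments are expanded inside the if, expensive helpers
/// such as formatReadableSizeWithBinarySuffix would only be evaluated when
/// DEBUG output is actually enabled.
#define SKETCH_LOG_DEBUG_FORMATTED(logger, ...) \
    do \
    { \
        Poco::Logger * sketch_log = (logger); \
        if (sketch_log->is(Poco::Message::PRIO_DEBUG)) \
            sketch_log->debug(fmt::format(__VA_ARGS__)); \
    } while (false)

int main()
{
    /// Send log output to the console so the example prints something visible.
    Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel);
    Poco::Logger & log = Poco::Logger::get("Example");
    log.setChannel(channel);
    log.setLevel(Poco::Message::PRIO_TRACE);

    SKETCH_LOG_DEBUG_FORMATTED(&log, "Aggregated. {} to {} rows in {} sec.", 1000, 10, 0.005);
    return 0;
}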