From 45b6abdfee344851d627694de8a69d730a52bc24 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 23 May 2020 21:22:40 +0300 Subject: [PATCH] Progress on task --- src/Client/Connection.cpp | 9 +++++---- src/DataStreams/ColumnGathererStream.cpp | 19 ++++++++----------- .../ParallelAggregatingBlockInputStream.cpp | 4 ++-- src/Interpreters/Aggregator.cpp | 16 ++++++++-------- .../Transforms/AggregatingTransform.cpp | 2 +- 5 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 0173291e76c..ce060197f4a 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -507,11 +507,12 @@ void Connection::sendScalarsData(Scalars & data) msg << std::fixed << std::setprecision(3); msg << "Sent data for " << data.size() << " scalars, total " << rows << " rows in " << elapsed << " sec., " << static_cast(rows / watch.elapsedSeconds()) << " rows/sec., " - << maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)"; + << formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes) << " (" + << formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()) << "/sec.)"; if (compression == Protocol::Compression::Enable) msg << ", compressed " << static_cast(maybe_compressed_out_bytes) / out_bytes << " times to " - << out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)"; + << formatReadableSizeWithBinarySuffix(out_bytes) << " (" << formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()) << "/sec.)"; else msg << ", no compression."; @@ -607,11 +608,11 @@ void Connection::sendExternalTablesData(ExternalTablesData & data) msg << std::fixed << std::setprecision(3); msg << "Sent data for " << data.size() << " external tables, total " << rows << " rows in " << elapsed << " sec., " << static_cast(rows / watch.elapsedSeconds()) << " rows/sec., " - << maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)"; + << formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes) << " (" << formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()) << "/sec.)"; if (compression == Protocol::Compression::Enable) msg << ", compressed " << static_cast(maybe_compressed_out_bytes) / out_bytes << " times to " - << out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)"; + << formatReadableSizeWithBinarySuffix(out_bytes) << " (" << formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()) << "/sec.)"; else msg << ", no compression."; diff --git a/src/DataStreams/ColumnGathererStream.cpp b/src/DataStreams/ColumnGathererStream.cpp index 6115de8c1ab..f828ab92a47 100644 --- a/src/DataStreams/ColumnGathererStream.cpp +++ b/src/DataStreams/ColumnGathererStream.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -98,17 +99,13 @@ void ColumnGathererStream::readSuffixImpl() double seconds = profile_info.total_stopwatch.elapsedSeconds(); - std::stringstream message; - message << std::fixed << std::setprecision(2) - << "Gathered column " << column_name - << " (" << static_cast(profile_info.bytes) / profile_info.rows << " bytes/elem.)" - << " in " << seconds << " sec."; - - if (seconds) - message << ", " << profile_info.rows / seconds << " rows/sec., " - << profile_info.bytes / 1048576.0 / seconds << " MiB/sec."; - - LOG_TRACE(log, message.str()); + if (!seconds) + LOG_DEBUG_FORMATTED(log, "Gathered column {} ({} bytes/elem.) in 0 sec.", + column_name, static_cast(profile_info.bytes) / profile_info.rows); + else + LOG_DEBUG_FORMATTED(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.", + column_name, static_cast(profile_info.bytes) / profile_info.rows, seconds, + profile_info.rows / seconds, formatReadableSizeWithBinarySuffix(profile_info.bytes / seconds)); } } diff --git a/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 979999ab745..8fcaecafd2a 100644 --- a/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -184,7 +184,7 @@ void ParallelAggregatingBlockInputStream::execute() << " (from " << threads_data[i].src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." << " (" << threads_data[i].src_rows / elapsed_seconds << " rows/sec., " - << threads_data[i].src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); + << formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes / elapsed_seconds) << "/sec.)"); total_src_rows += threads_data[i].src_rows; total_src_bytes += threads_data[i].src_bytes; @@ -192,7 +192,7 @@ void ParallelAggregatingBlockInputStream::execute() LOG_TRACE(log, "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." - << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); + << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(total_src_bytes / elapsed_seconds) << "/sec.)"); /// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation. /// To do this, we pass a block with zero rows to aggregate. diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 6dc53befc9d..73f8840e2c9 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -764,14 +764,14 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co LOG_TRACE(log, "Written part in " << elapsed_seconds << " sec., " << rows << " rows, " - << (uncompressed_bytes / 1048576.0) << " MiB uncompressed, " - << (compressed_bytes / 1048576.0) << " MiB compressed, " + << formatReadableSizeWithBinarySuffix(uncompressed_bytes) << " uncompressed, " + << formatReadableSizeWithBinarySuffix(compressed_bytes) << " compressed, " << (uncompressed_bytes / rows) << " uncompressed bytes per row, " << (compressed_bytes / rows) << " compressed bytes per row, " << "compression rate: " << (uncompressed_bytes / compressed_bytes) << " (" << (rows / elapsed_seconds) << " rows/sec., " - << (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, " - << (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)"); + << formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds) << "/sec. uncompressed, " + << formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds) << "/sec. compressed)"); } void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) { @@ -940,9 +940,9 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria double elapsed_seconds = watch.elapsedSeconds(); size_t rows = result.sizeWithoutOverflowRow(); LOG_TRACE(log, - "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)" + "Aggregated. " << src_rows << " to " << rows << " rows (from " << formatReadableSizeWithBinarySuffix(src_bytes) << ")" << " in " << elapsed_seconds << " sec." - << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); + << " (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)"); } @@ -1312,7 +1312,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b "Converted aggregated data to blocks. " << rows << " rows, " << bytes / 1048576.0 << " MiB" << " in " << elapsed_seconds << " sec." - << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); + << " (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)"); return blocks; } @@ -2179,7 +2179,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) "Merged partially aggregated blocks. " << rows << " rows, " << bytes / 1048576.0 << " MiB." << " in " << elapsed_seconds << " sec." - << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); + << " (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)"); if (isCancelled()) return {}; diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index a14e649b4d1..3672e23f302 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -543,7 +543,7 @@ void AggregatingTransform::initGenerate() LOG_TRACE(log, "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." - << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); + << " (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)"); if (params->aggregator.hasTemporaryFiles()) {