Progress on task

This commit is contained in:
Alexey Milovidov 2020-05-23 21:22:40 +03:00
parent 3384f3b77e
commit 45b6abdfee
5 changed files with 24 additions and 26 deletions

View File

@ -507,11 +507,12 @@ void Connection::sendScalarsData(Scalars & data)
msg << std::fixed << std::setprecision(3);
msg << "Sent data for " << data.size() << " scalars, total " << rows << " rows in " << elapsed << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "
<< maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
<< formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes) << " ("
<< formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()) << "/sec.)";
if (compression == Protocol::Compression::Enable)
msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
<< out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
<< formatReadableSizeWithBinarySuffix(out_bytes) << " (" << formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()) << "/sec.)";
else
msg << ", no compression.";
@ -607,11 +608,11 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
msg << std::fixed << std::setprecision(3);
msg << "Sent data for " << data.size() << " external tables, total " << rows << " rows in " << elapsed << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "
<< maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
<< formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes) << " (" << formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()) << "/sec.)";
if (compression == Protocol::Compression::Enable)
msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
<< out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
<< formatReadableSizeWithBinarySuffix(out_bytes) << " (" << formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()) << "/sec.)";
else
msg << ", no compression.";

View File

@ -1,6 +1,7 @@
#include <DataStreams/ColumnGathererStream.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/formatReadable.h>
#include <IO/WriteHelpers.h>
#include <iomanip>
@ -98,17 +99,13 @@ void ColumnGathererStream::readSuffixImpl()
double seconds = profile_info.total_stopwatch.elapsedSeconds();
std::stringstream message;
message << std::fixed << std::setprecision(2)
<< "Gathered column " << column_name
<< " (" << static_cast<double>(profile_info.bytes) / profile_info.rows << " bytes/elem.)"
<< " in " << seconds << " sec.";
if (seconds)
message << ", " << profile_info.rows / seconds << " rows/sec., "
<< profile_info.bytes / 1048576.0 / seconds << " MiB/sec.";
LOG_TRACE(log, message.str());
if (!seconds)
LOG_DEBUG_FORMATTED(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows);
else
LOG_DEBUG_FORMATTED(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
profile_info.rows / seconds, formatReadableSizeWithBinarySuffix(profile_info.bytes / seconds));
}
}

View File

@ -184,7 +184,7 @@ void ParallelAggregatingBlockInputStream::execute()
<< " (from " << threads_data[i].src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << threads_data[i].src_rows / elapsed_seconds << " rows/sec., "
<< threads_data[i].src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
<< formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes / elapsed_seconds) << "/sec.)");
total_src_rows += threads_data[i].src_rows;
total_src_bytes += threads_data[i].src_bytes;
@ -192,7 +192,7 @@ void ParallelAggregatingBlockInputStream::execute()
LOG_TRACE(log,
"Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
<< " (" << total_src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(total_src_bytes / elapsed_seconds) << "/sec.)");
/// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.

View File

@ -764,14 +764,14 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
LOG_TRACE(log,
"Written part in " << elapsed_seconds << " sec., "
<< rows << " rows, "
<< (uncompressed_bytes / 1048576.0) << " MiB uncompressed, "
<< (compressed_bytes / 1048576.0) << " MiB compressed, "
<< formatReadableSizeWithBinarySuffix(uncompressed_bytes) << " uncompressed, "
<< formatReadableSizeWithBinarySuffix(compressed_bytes) << " compressed, "
<< (uncompressed_bytes / rows) << " uncompressed bytes per row, "
<< (compressed_bytes / rows) << " compressed bytes per row, "
<< "compression rate: " << (uncompressed_bytes / compressed_bytes)
<< " (" << (rows / elapsed_seconds) << " rows/sec., "
<< (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, "
<< (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)");
<< formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds) << "/sec. uncompressed, "
<< formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds) << "/sec. compressed)");
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
@ -940,9 +940,9 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
LOG_TRACE(log,
"Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
"Aggregated. " << src_rows << " to " << rows << " rows (from " << formatReadableSizeWithBinarySuffix(src_bytes) << ")"
<< " in " << elapsed_seconds << " sec."
<< " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
<< " (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");
}
@ -1312,7 +1312,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
"Converted aggregated data to blocks. "
<< rows << " rows, " << bytes / 1048576.0 << " MiB"
<< " in " << elapsed_seconds << " sec."
<< " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
<< " (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)");
return blocks;
}
@ -2179,7 +2179,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
"Merged partially aggregated blocks. "
<< rows << " rows, " << bytes / 1048576.0 << " MiB."
<< " in " << elapsed_seconds << " sec."
<< " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
<< " (" << rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds) << "/sec.)");
if (isCancelled())
return {};

View File

@ -543,7 +543,7 @@ void AggregatingTransform::initGenerate()
LOG_TRACE(log,
"Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
<< " (" << src_rows / elapsed_seconds << " rows/sec., " << formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds) << "/sec.)");
if (params->aggregator.hasTemporaryFiles())
{