dbms: Aggregator: more logging [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-02-04 03:31:53 +00:00
parent 04d9e21301
commit b9bf7544dd

View File

@ -131,10 +131,18 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
*/
bool no_more_keys = false;
LOG_TRACE(log, "Aggregating");
Stopwatch watch;
/// Читаем все данные
size_t src_rows = 0;
size_t src_bytes = 0;
while (Block block = stream->read())
{
initialize(block);
src_rows += block.rows();
src_bytes += block.bytes();
for (size_t i = 0; i < aggregates_size; ++i)
{
@ -417,6 +425,13 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
throw Exception("Logical error: unkown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.size();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
}
@ -528,9 +543,10 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
res.getByPosition(i).column->cut(0, rows);
double elapsed_seconds = watch.elapsedSeconds();
LOG_TRACE(log, "Converted aggregated data to block. "
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Converted aggregated data to block. "
<< rows << " rows, " << res.bytes() / 1048576.0 << " MiB"
<< " in " << std::fixed << std::setprecision(3) << elapsed_seconds << " sec. "
<< " in " << elapsed_seconds << " sec."
<< " (" << rows / elapsed_seconds << " rows/sec., " << res.bytes() / elapsed_seconds / 1048576.0 << " MiB/sec.)");
return res;
@ -544,11 +560,15 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
LOG_TRACE(log, "Merging aggregated data");
Stopwatch watch;
AggregatedDataVariantsPtr res = data_variants[0];
/// Все результаты агрегации соединяем с первым.
size_t rows = res->size();
for (size_t i = 1, size = data_variants.size(); i < size; ++i)
{
rows += data_variants[i]->size();
AggregatedDataVariants & current = *data_variants[i];
if (current.empty())
@ -673,7 +693,14 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
LOG_TRACE(log, "Merged aggregated data");
double elapsed_seconds = watch.elapsedSeconds();
size_t res_rows = res->size();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Merged aggregated data. "
<< "From " << rows << " to " << res_rows << " rows (efficiency: " << static_cast<double>(rows) / res_rows << ")"
<< " in " << elapsed_seconds << " sec."
<< " (" << rows / elapsed_seconds << " rows/sec.)");
return res;
}