Fixed merges progress bar. [#CLICKHOUSE-2]

This commit is contained in:
Vitaliy Lyudvichenko 2017-09-14 16:56:54 +03:00 committed by alexey-milovidov
parent a43b9ec398
commit 826c354ff5
4 changed files with 19 additions and 9 deletions

View File

@ -118,8 +118,13 @@ void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
void ColumnGathererStream::readSuffixImpl()
{
const BlockStreamProfileInfo & profile_info = getProfileInfo();
/// Don't print info for small parts (< 10M rows)
if (profile_info.rows < 10000000)
return;
double seconds = profile_info.total_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
LOG_TRACE(log, std::fixed << std::setprecision(2)
<< "Gathered column " << name
<< " (" << static_cast<double>(profile_info.bytes) / profile_info.rows << " bytes/elem.)"
<< " in " << seconds << " sec., "

View File

@ -386,7 +386,7 @@ public:
* - amount of merged rows and their size (PK columns subset is used in case of Vertical merge)
* - time elapsed for current merge.
*/
class MergeProgressCallback : public ProgressCallback
class MergeProgressCallback
{
public:
MergeProgressCallback(MergeList::Entry & merge_entry_, UInt64 & watch_prev_elapsed_)
@ -540,14 +540,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
BlockInputStreams src_streams;
UInt64 watch_prev_elapsed = 0;
for (size_t i = 0; i < parts.size(); ++i)
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, parts[i], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, parts[i]->size)),
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->size)),
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(
MergeProgressCallback{merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg});
input->setProgressCallback(MergeProgressCallback(
merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg));
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
@ -680,8 +680,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->size)},
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(
MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed});
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed));
column_part_streams[part_num] = std::move(column_part_stream);
}

View File

@ -54,7 +54,7 @@ BlockInputStreams StorageSystemMerges::read(
block.getByPosition(i++).column->insert(merge.database);
block.getByPosition(i++).column->insert(merge.table);
block.getByPosition(i++).column->insert(merge.elapsed);
block.getByPosition(i++).column->insert(std::min(1., merge.progress)); /// little cheat
block.getByPosition(i++).column->insert(merge.progress);
block.getByPosition(i++).column->insert(merge.num_parts);
block.getByPosition(i++).column->insert(merge.source_part_names);
block.getByPosition(i++).column->insert(merge.result_part_name);

View File

@ -63,6 +63,8 @@ function test {
echo
}
merged_rows_0=`clickhouse-client -q "select value from system.events where event = 'MergedRows'"`
test 8191 8191
test 8191 8192
test 8192 8191
@ -76,4 +78,7 @@ test 8193 8194
test 8194 8193
test 8194 8194
merged_rows_1=`clickhouse-client -q "select value from system.events where event = 'MergedRows'"`
[[ $merged_rows_1 -le $merged_rows_0 ]]
cleanup