Merge pull request #30111 from ClickHouse/remove_trash

Remove trash from MergeTreeReadPool
alesapin 2021-10-16 15:41:41 +03:00 committed by GitHub
commit 9418eda122
5 changed files with 13 additions and 34 deletions

View File

@@ -142,30 +142,6 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
         prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
 }
 
-MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, const MarkRange & from) const
-{
-    MarkRanges all_part_ranges;
-
-    /// Inefficient in presence of large number of data parts.
-    for (const auto & part_ranges : parts_ranges)
-    {
-        if (part_ranges.data_part.get() == &part)
-        {
-            all_part_ranges = part_ranges.ranges;
-            break;
-        }
-    }
-
-    if (all_part_ranges.empty())
-        throw Exception("Trying to read marks range [" + std::to_string(from.begin) + ", " + std::to_string(from.end) + "] from part '"
-            + part.getFullPath() + "' which has no ranges in this query", ErrorCodes::LOGICAL_ERROR);
-    auto begin = std::lower_bound(all_part_ranges.begin(), all_part_ranges.end(), from, [] (const auto & f, const auto & s) { return f.begin < s.begin; });
-    if (begin == all_part_ranges.end())
-        begin = std::prev(all_part_ranges.end());
-    begin->begin = from.begin;
-    return MarkRanges(begin, all_part_ranges.end());
-}
-
 Block MergeTreeReadPool::getHeader() const
 {
     return metadata_snapshot->getSampleBlockForColumns(column_names, data.getVirtuals(), data.getStorageID());
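
Note on the removed helper: getRestMarks located the part by a linear scan over parts_ranges (the "Inefficient in presence of large number of data parts" comment above) and then returned every mark range from the requested range to the end of the part, a superset of what the calling task was actually assigned. A rough, self-contained sketch of that behavior, using a simplified MarkRange type and made-up ranges rather than the real ClickHouse types:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

/// Simplified stand-in for ClickHouse's MarkRange: a half-open interval of marks.
struct MarkRange { size_t begin; size_t end; };

/// Mimics the removed MergeTreeReadPool::getRestMarks: starting from `from`,
/// return everything up to the end of the part's range list.
std::vector<MarkRange> restMarks(const std::vector<MarkRange> & part_ranges, const MarkRange & from)
{
    auto begin = std::lower_bound(part_ranges.begin(), part_ranges.end(), from,
        [](const auto & f, const auto & s) { return f.begin < s.begin; });
    if (begin == part_ranges.end())
        begin = std::prev(part_ranges.end());

    std::vector<MarkRange> rest(begin, part_ranges.end());
    rest.front().begin = from.begin;
    return rest;
}

int main()
{
    /// Made-up ranges of one part; the task below was assigned only marks [10, 20).
    const std::vector<MarkRange> part_ranges{{0, 10}, {10, 20}, {20, 100}};
    const MarkRange task_range{10, 20};

    size_t marks = 0;
    for (const auto & r : restMarks(part_ranges, task_range))
        marks += r.end - r.begin;

    /// Prints 90: a reader built over these "rest" ranges spans marks [10, 100)
    /// even though the task owns only 10 of them.
    std::cout << marks << '\n';
}

With the helper gone, the readers in MergeTreeThreadSelectProcessor::getNewTask below are constructed over task->mark_ranges only, so they no longer cover marks beyond what the task was assigned.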

View File

@@ -85,9 +85,6 @@ public:
       */
     void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info);
 
-    /// This method tells which mark ranges we have to read if we start from @from mark range
-    MarkRanges getRestMarks(const IMergeTreeDataPart & part, const MarkRange & from) const;
-
     Block getHeader() const;
 
 private:

View File

@@ -68,18 +68,16 @@ bool MergeTreeThreadSelectProcessor::getNewTask()
 
     if (!reader)
     {
-        auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
-
         if (use_uncompressed_cache)
            owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
         owned_mark_cache = storage.getContext()->getMarkCache();
 
-        reader = task->data_part->getReader(task->columns, metadata_snapshot, rest_mark_ranges,
+        reader = task->data_part->getReader(task->columns, metadata_snapshot, task->mark_ranges,
             owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
             IMergeTreeReader::ValueSizeMap{}, profile_callback);
 
         if (prewhere_info)
-            pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
+            pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, task->mark_ranges,
                 owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
                 IMergeTreeReader::ValueSizeMap{}, profile_callback);
     }
@@ -88,14 +86,13 @@ bool MergeTreeThreadSelectProcessor::getNewTask()
         /// in other case we can reuse readers, anyway they will be "seeked" to required mark
         if (part_name != last_readed_part_name)
         {
-            auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
             /// retain avg_value_size_hints
-            reader = task->data_part->getReader(task->columns, metadata_snapshot, rest_mark_ranges,
+            reader = task->data_part->getReader(task->columns, metadata_snapshot, task->mark_ranges,
                 owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
                 reader->getAvgValueSizeHints(), profile_callback);
 
             if (prewhere_info)
-                pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
+                pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, task->mark_ranges,
                     owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
                     reader->getAvgValueSizeHints(), profile_callback);
         }
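
The reuse policy in this hunk is easy to miss when reading it as a diff: a reader is created lazily for the first task, rebuilt only when the next task comes from a different part (carrying over the average value size hints), and otherwise reused, since it will simply be seeked to the required mark. A toy, self-contained sketch of that control flow; ToyReader, ToyTask and onNewTask are illustrative stand-ins, not the ClickHouse API:

#include <iostream>
#include <map>
#include <memory>
#include <string>

/// Toy stand-in for IMergeTreeReader and its per-column average value size hints.
using ValueSizeHints = std::map<std::string, double>;

struct ToyReader
{
    std::string part_name;
    ValueSizeHints hints;

    ValueSizeHints getAvgValueSizeHints() const { return hints; }
    void seek(size_t mark) { std::cout << "seek " << part_name << " to mark " << mark << '\n'; }
};

struct ToyTask
{
    std::string part_name;
    size_t first_mark;
};

/// Mirrors the shape of getNewTask() after this change: build the reader over the
/// task's own mark ranges, rebuild it only when the part changes (keeping the
/// value size hints), and otherwise just reposition the existing reader.
void onNewTask(std::unique_ptr<ToyReader> & reader, std::string & last_read_part, const ToyTask & task)
{
    if (!reader)
        reader = std::make_unique<ToyReader>(ToyReader{task.part_name, {}});
    else if (task.part_name != last_read_part)
        reader = std::make_unique<ToyReader>(ToyReader{task.part_name, reader->getAvgValueSizeHints()});

    reader->seek(task.first_mark);
    last_read_part = task.part_name;
}

int main()
{
    std::unique_ptr<ToyReader> reader;
    std::string last_read_part;

    const ToyTask tasks[] = {ToyTask{"part_1", 0}, ToyTask{"part_1", 40}, ToyTask{"part_2", 0}};
    for (const auto & task : tasks)
        onNewTask(reader, last_read_part, task);
}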

View File

@@ -0,0 +1,2 @@
+468426149779992039
+1

View File

@@ -0,0 +1,7 @@
+SELECT sum(cityHash64(*)) FROM test.hits SETTINGS max_threads=40;
+
+-- We had a bug which led to additional compressed data reads. test.hits is about 1.2 GB compressed, but we read more than 3 GB.
+-- Small additional reads are still possible, so we compare against roughly 1.5 GB.
+
+SYSTEM FLUSH LOGS;
+SELECT ProfileEvents['ReadBufferFromFileDescriptorReadBytes'] < 1500000000 from system.query_log where query = 'SELECT sum(cityHash64(*)) FROM test.hits SETTINGS max_threads=40;' and current_database = currentDatabase() and type = 'QueryFinish';
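
A quick check of the numbers in the comments above, taking the quoted figures at face value: test.hits is roughly 1.2 GB compressed, the pre-fix run read more than 3 GB, and the 1.5 GB limit leaves about 25% headroom over the compressed size for small unavoidable extra reads while still failing clearly on the old behavior:

#include <cassert>

int main()
{
    /// Approximate figures from the test comment, in bytes.
    constexpr long long compressed_size = 1'200'000'000;  // compressed size of test.hits
    constexpr long long read_limit      = 1'500'000'000;  // threshold checked by the test
    constexpr long long buggy_read      = 3'000'000'000;  // bytes read before the fix

    assert(buggy_read > read_limit);                                // the old behavior trips the check
    assert((read_limit - compressed_size) * 4 == compressed_size);  // headroom is 25% of the compressed size
    return 0;
}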