Return logic for cache

This commit is contained in:
alesapin 2019-04-01 14:09:30 +03:00
parent 37427077b8
commit 69b623c47c
4 changed files with 52 additions and 47 deletions

View File

@ -98,6 +98,7 @@ struct Settings
M(SettingUInt64, merge_tree_min_bytes_for_seek, 0, "You can skip reading more than that number of bytes at the price of one seek per file.") \
M(SettingUInt64, merge_tree_coarse_index_granularity, 8, "If the index segment can contain the required keys, divide it into as many parts and recursively check them.") \
M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)") \
M(SettingUInt64, merge_tree_max_bytes_to_use_cache, (600 * 1024 * 1024), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)") \
\
M(SettingBool, merge_tree_uniform_read_distribution, true, "Distribute read from MergeTree over threads evenly, ensuring stable average execution time of each thread within one read operation.") \
\

View File

@ -617,6 +617,27 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
return res;
}
namespace {
size_t roundRowsOrBytesToMarks(
size_t rows_setting,
size_t bytes_setting,
const MergeTreeData::IndexGranularityInfo & granularity_info)
{
if (!granularity_info.is_adaptive)
{
size_t fixed_index_granularity = granularity_info.fixed_index_granularity;
return (rows_setting + fixed_index_granularity - 1) / fixed_index_granularity;
}
else
{
size_t index_granularity_bytes = granularity_info.index_granularity_bytes;
return (bytes_setting + index_granularity_bytes - 1) / index_granularity_bytes;
}
}
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
@ -628,7 +649,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const Names & virt_columns,
const Settings & settings) const
{
const size_t min_marks_for_concurrent_read = getMinMarksForConcurrentRead(settings, data.index_granularity_info);
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
data.index_granularity_info);
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
data.index_granularity_info);
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
@ -646,7 +675,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
sum_marks += sum_marks_in_parts[i];
}
if (total_rows > settings.merge_tree_max_rows_to_use_cache)
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
BlockInputStreams res;
@ -768,11 +797,17 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
const Names & virt_columns,
const Settings & settings) const
{
size_t sum_rows = 0;
for (size_t i = 0; i < parts.size(); ++i)
sum_rows += parts[i].getRowsCount();
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
data.index_granularity_info);
if (sum_rows > settings.merge_tree_max_rows_to_use_cache)
size_t sum_marks = 0;
for (size_t i = 0; i < parts.size(); ++i)
for (size_t j = 0; j < parts[i].ranges.size(); ++j)
sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
BlockInputStreams to_merge;
@ -882,7 +917,10 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
else
{
size_t used_key_size = key_condition.getMaxKeyColumn() + 1;
size_t min_marks_for_seek = getMinMarksForSeek(settings, data.index_granularity_info);
size_t min_marks_for_seek = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_seek,
settings.merge_tree_min_bytes_for_seek,
data.index_granularity_info);
/** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back).
* At each step, take the left segment and check if it fits.
@ -964,7 +1002,10 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
return ranges;
}
const size_t min_marks_for_seek = getMinMarksForSeek(settings, data.index_granularity_info);
const size_t min_marks_for_seek = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_seek,
settings.merge_tree_min_bytes_for_seek,
data.index_granularity_info);
size_t granules_dropped = 0;
@ -1016,35 +1057,6 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
return res;
}
size_t MergeTreeDataSelectExecutor::getMinMarksForConcurrentRead(
const Settings & settings,
const MergeTreeData::IndexGranularityInfo & granularity_info) const
{
if (!granularity_info.is_adaptive)
{
size_t fixed_index_granularity = granularity_info.fixed_index_granularity;
return (settings.merge_tree_min_rows_for_concurrent_read + fixed_index_granularity - 1) / fixed_index_granularity;
}
else
{
size_t index_granularity_bytes = granularity_info.index_granularity_bytes;
return (settings.merge_tree_min_bytes_for_concurrent_read + index_granularity_bytes - 1) / index_granularity_bytes;
}
}
size_t MergeTreeDataSelectExecutor::getMinMarksForSeek(
const Settings & settings,
const MergeTreeData::IndexGranularityInfo & granularity_info) const
{
if (!granularity_info.is_adaptive)
{
size_t fixed_index_granularity = granularity_info.fixed_index_granularity;
return (settings.merge_tree_min_rows_for_seek + fixed_index_granularity - 1) / fixed_index_granularity;
}
else
{
size_t index_granularity_bytes = granularity_info.index_granularity_bytes;
return (settings.merge_tree_min_bytes_for_seek + index_granularity_bytes - 1) / index_granularity_bytes;
}
}
}

View File

@ -88,14 +88,6 @@ private:
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings) const;
size_t getMinMarksForConcurrentRead(
const Settings & settings,
const MergeTreeData::IndexGranularityInfo & granularity_info) const;
size_t getMinMarksForSeek(
const Settings & settings,
const MergeTreeData::IndexGranularityInfo & granularity_info) const;
};
}

View File

@ -89,7 +89,7 @@ size_t MergeTreeRangeReader::DelayedStream::finalize(Block & block)
}
/// Skip some rows from begging of granule
/// Skip some rows from beging of granule
/// We don't know size of rows in compressed granule,
/// so have to read them and throw out
if (current_offset)