dbms: optimization of reading from files in merge tree [METR-10931]

This commit is contained in:
Sergey Fedorov 2014-04-23 19:34:47 +04:00
parent 6e55c822c0
commit 0fbd8a32be
2 changed files with 40 additions and 11 deletions

View File

@@ -85,9 +85,9 @@ protected:
if (!reader)
{
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
reader.reset(new MergeTreeReader(path, column_names, uncompressed_cache, storage));
reader.reset(new MergeTreeReader(path, column_names, uncompressed_cache, storage, all_mark_ranges));
if (prewhere_actions)
pre_reader.reset(new MergeTreeReader(path, pre_column_names, uncompressed_cache, storage));
pre_reader.reset(new MergeTreeReader(path, pre_column_names, uncompressed_cache, storage, all_mark_ranges));
}
if (prewhere_actions)

View File

@@ -38,11 +38,11 @@ class MergeTreeReader
public:
/// Constructs a reader for the data part at path_ and calls addStream for every name in columns_names_,
/// using the data type that storage_ reports for that name.
/// NOTE(review): this listing looks like a rendered diff without +/- markers; the two adjacent parameter
/// lines and the two adjacent addStream calls are presumably the pre-commit and post-commit versions of
/// the same line (the later one adds all_mark_ranges) - confirm against the real header.
MergeTreeReader(const String & path_, /// Path to the data part
const Names & columns_names_, bool use_uncompressed_cache_, MergeTreeData & storage_)
const Names & columns_names_, bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges)
: path(path_), column_names(columns_names_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_)
{
/// One addStream call per requested column; all_mark_ranges is forwarded unchanged to each of them.
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
addStream(*it, *storage.getDataTypeByName(*it));
addStream(*it, *storage.getDataTypeByName(*it), all_mark_ranges);
}
/** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец.
@@ -193,19 +193,46 @@ private:
Poco::SharedPtr<CachedCompressedReadBuffer> cached_buffer;
Poco::SharedPtr<CompressedReadBufferFromFile> non_cached_buffer;
std::string path_prefix;
size_t max_mark_range;
/** Loads the marks for the column file at path_prefix and opens its ".bin" data file, either through the
  * uncompressed cache or through a plain compressed file reader, with a read buffer sized from
  * all_mark_ranges.
  *
  * path_prefix        - file path without extension; ".bin" is appended for the data file.
  * uncompressed_cache - when non-null, reading goes through CachedCompressedReadBuffer;
  *                      otherwise through CompressedReadBufferFromFile.
  * mark_cache         - forwarded to loadMarks().
  * all_mark_ranges    - the mark ranges considered for sizing; the largest compressed-file span among
  *                      them, capped at DBMS_DEFAULT_BUFFER_SIZE, becomes the buffer size.
  *
  * NOTE(review): this listing looks like a rendered diff without +/- markers. Wherever two near-identical
  * lines are adjacent (the signature and the two buffer constructions), the first is presumably the
  * pre-commit line and is kept here untouched; only the post-commit lines were changed.
  */
Stream(const String & path_prefix, UncompressedCache * uncompressed_cache, MarkCache * mark_cache)
Stream(const String & path_prefix, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, const MarkRanges & all_mark_ranges)
    : path_prefix(path_prefix)
{
    loadMarks(mark_cache);

    const size_t marks_count = (*marks).size();

    /// NOTE(review): a member named max_mark_range is declared just above this constructor; this local
    /// shadows it and the member is never assigned here - confirm whether the member is needed at all.
    size_t max_mark_range = 0;

    for (size_t i = 0; i < all_mark_ranges.size(); ++i)
    {
        const size_t range_end = all_mark_ranges[i].end;
        size_t right = range_end;

        /// If the right boundary lies inside a compressed block, that block has to be read as well:
        /// move on to the first mark that points into a different compressed block.
        /// range_end may be equal to marks_count (presumably `end` is exclusive), so it is checked
        /// before being used as an index.
        if (right < marks_count && (*marks)[right].offset_in_decompressed_block > 0)
        {
            while (right < marks_count
                && (*marks)[right].offset_in_compressed_file == (*marks)[range_end].offset_in_compressed_file)
                ++right;
        }

        /// If there is no mark further to the right, the span cannot be measured:
        /// just use DBMS_DEFAULT_BUFFER_SIZE. The bounds test comes first so that neither index below
        /// can run past the last mark.
        if (right >= marks_count
            || (right + 1 == marks_count
                && (*marks)[right].offset_in_compressed_file == (*marks)[range_end].offset_in_compressed_file))
        {
            max_mark_range = DBMS_DEFAULT_BUFFER_SIZE;
            break;
        }

        max_mark_range = std::max(max_mark_range,
            (*marks)[right].offset_in_compressed_file - (*marks)[all_mark_ranges[i].begin].offset_in_compressed_file);
    }

    /// No ranges, or only empty ones: fall back to the default instead of requesting a zero-sized buffer.
    if (max_mark_range == 0)
        max_mark_range = DBMS_DEFAULT_BUFFER_SIZE;

    /// Never ask for more than the default buffer size.
    size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE < max_mark_range ? DBMS_DEFAULT_BUFFER_SIZE : max_mark_range;

    if (uncompressed_cache)
    {
        cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache);
        cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache, buffer_size);
        data_buffer = &*cached_buffer;
    }
    else
    {
        non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin");
        non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", buffer_size);
        data_buffer = &*non_cached_buffer;
    }
}
@@ -245,7 +272,9 @@ private:
try
{
if (cached_buffer)
{
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
}
if (non_cached_buffer)
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
}
@@ -271,7 +300,7 @@ private:
bool use_uncompressed_cache;
MergeTreeData & storage;
void addStream(const String & name, const IDataType & type, size_t level = 0)
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
{
String escaped_column_name = escapeForFileName(name);
@@ -294,12 +323,12 @@ private:
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
path + escaped_size_name, uncompressed_cache, mark_cache)));
path + escaped_size_name, uncompressed_cache, mark_cache, all_mark_ranges)));
addStream(name, *type_arr->getNestedType(), level + 1);
addStream(name, *type_arr->getNestedType(), all_mark_ranges, level + 1);
}
else
streams[name].reset(new Stream(path + escaped_column_name, uncompressed_cache, mark_cache));
streams[name].reset(new Stream(path + escaped_column_name, uncompressed_cache, mark_cache, all_mark_ranges));
}
void readData(const String & name, const IDataType & type, IColumn & column, size_t from_mark, size_t max_rows_to_read,