From 7b149be2c6924ae7d787a40ffb0e62cfb0040853 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Wed, 16 Sep 2015 20:49:08 +0300 Subject: [PATCH] Merge --- .../DB/Storages/MergeTree/MergeTreeReader.h | 45 +++++++------------ .../MergeTreeThreadBlockInputStream.h | 14 ++++-- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index 3187e742092..bfd35850edd 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -24,33 +24,19 @@ namespace DB */ class MergeTreeReader { - typedef std::map OffsetColumns; + using OffsetColumns = std::map; + using ValueSizeMap = std::map; public: - MergeTreeReader(const String & path_, /// Путь к куску - const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns_, - UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_, - MergeTreeData & storage_, const MarkRanges & all_mark_ranges, - size_t aio_threshold_, size_t max_read_buffer_size_) - : uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_), storage(storage_), - aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_) + MergeTreeReader(const String & path, /// Путь к куску + const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns, + UncompressedCache * uncompressed_cache, MarkCache * mark_cache, + MergeTreeData & storage, const MarkRanges & all_mark_ranges, + size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints = ValueSizeMap{}) + : avg_value_size_hints(avg_value_size_hints), path(path), data_part(data_part), columns(columns), + uncompressed_cache(uncompressed_cache), mark_cache(mark_cache), storage(storage), + all_mark_ranges(all_mark_ranges), aio_threshold(aio_threshold), max_read_buffer_size(max_read_buffer_size) { - reconfigure(path_, data_part, columns_, all_mark_ranges); - } - - /** Allows to use the same MergeTreeReader across multiple data parts and/or columns and/or ranges, - * all while preserving avg_value_size_hints (may and does significantly improve read times). */ - void reconfigure( - const String & path, const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns, - const MarkRanges & all_mark_ranges) - { - this->path = path; - this->data_part = data_part; - this->part_name = data_part->name; - this->columns = columns; - this->all_mark_ranges = all_mark_ranges; - this->streams.clear(); - try { if (!Poco::File(path).exists()) @@ -61,11 +47,13 @@ public: } catch (...) { - storage.reportBrokenPart(part_name); + storage.reportBrokenPart(data_part->name); throw; } } + const ValueSizeMap & getAvgValueSizeHints() const { return avg_value_size_hints; } + /** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец. * Не добавляет столбцы, для которых нет файлов. Чтобы их добавить, нужно вызвать fillMissingColumns. * В блоке должно быть либо ни одного столбца из columns, либо все, для которых есть файлы. @@ -121,7 +109,7 @@ public: catch (const Exception & e) { if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) - storage.reportBrokenPart(part_name); + storage.reportBrokenPart(data_part->name); /// Более хорошая диагностика. throw Exception(e.message() + "\n(while reading from part " + path + " from mark " + toString(from_mark) + " to " @@ -129,7 +117,7 @@ public: } catch (...) { - storage.reportBrokenPart(part_name); + storage.reportBrokenPart(data_part->name); throw; } @@ -286,10 +274,9 @@ private: typedef std::map > FileStreams; /// Используется в качестве подсказки, чтобы уменьшить количество реаллокаций при создании столбца переменной длины. - std::map avg_value_size_hints; + ValueSizeMap avg_value_size_hints; String path; MergeTreeData::DataPartPtr data_part; - String part_name; FileStreams streams; /// Запрашиваемые столбцы. diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h index dea27b4bf46..2c5e01275d6 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h @@ -121,11 +121,17 @@ private: } else { - /** reader and possible pre_reader were already created, just configure them to a new data part, ranges and - * columns to preserve internal state. */ - reader->reconfigure(path, task->data_part, task->columns, task->mark_ranges); + /// retain avg_value_size_hints + reader = std::make_unique( + path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), + storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, + reader->getAvgValueSizeHints()); + if (prewhere_actions) - pre_reader->reconfigure(path, task->data_part, task->pre_columns, task->mark_ranges); + pre_reader = std::make_unique( + path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), + owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io, + max_read_buffer_size, pre_reader->getAvgValueSizeHints()); } return true;