From 5c9f92ac1e02a804ba07507518864f4c7df8fea7 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Mon, 28 Jul 2014 14:36:11 +0400 Subject: [PATCH] Merge --- .../DB/Storages/MergeTree/MergeTreeData.h | 13 +++ .../MergeTree/MergeTreeDataSelectExecutor.h | 6 +- dbms/include/DB/Storages/StorageMergeTree.h | 10 +++ .../MergeTree/MergeTreeDataSelectExecutor.cpp | 87 +++++++++++++++---- 4 files changed, 99 insertions(+), 17 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index baaa20c7bb8..6673d28324f 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -572,6 +573,18 @@ public: const NamesAndTypesList & getColumnsList() const { return *columns; } + NameAndTypePair getColumn(const String &column_name) const + { + if (column_name == "_part") return NameAndTypePair("_part", new DataTypeString); + return getRealColumn(column_name); + } + + bool hasColumn(const String &column_name) const + { + if (column_name == "_part") return true; + return hasRealColumn(column_name); + } + String getFullPath() const { return full_path; } String getLogName() const { return log_name; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 7ea10726200..6fb8f03ed75 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -55,7 +55,8 @@ private: size_t max_block_size, bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, - const String & prewhere_column); + const String & prewhere_column, + bool add_virtual_column); BlockInputStreams spreadMarkRangesAmongThreadsFinal( RangesInDataParts parts, @@ -64,7 +65,8 @@ private: size_t max_block_size, bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, - const String & prewhere_column); + const String & prewhere_column, + bool add_virtual_column); /// Создать выражение "Sign == 1". void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column); diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 80269000d35..2d4e452190f 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -50,6 +50,16 @@ public: const NamesAndTypesList & getColumnsList() const { return data.getColumnsList(); } + NameAndTypePair getColumn(const String &column_name) const + { + return data.getColumn(column_name); + } + + bool hasColumn(const String &column_name) const + { + return data.hasColumn(column_name); + } + BlockInputStreams read( const Names & column_names, ASTPtr query, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 08588e3289b..7c0f47d9d69 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { @@ -19,6 +20,19 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_) } +/// Построить блок состоящий только из возможных значений виртуальных столбцов +static Block getBlockWithVirtualColumns(const MergeTreeData::DataPartsVector & parts) +{ + Block res; + ColumnWithNameAndType _part(new ColumnString, new DataTypeString, "_part"); + + for (const auto & part : parts) + _part.column->insert(part->name); + + res.insert(_part); + return res; +} + BlockInputStreams MergeTreeDataSelectExecutor::read( const Names & column_names_to_return, ASTPtr query, @@ -27,30 +41,60 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( size_t max_block_size, unsigned threads) { - data.check(column_names_to_return); + MergeTreeData::DataPartsVector parts; + + { + auto parts_set = data.getDataParts(); + parts.assign(parts_set.begin(), parts_set.end()); + } + + /// Если в запросе есть ограничения на виртуальный столбец _part, выберем только подходящие под него куски. + Names virt_column_names, real_column_names; + for (const auto & it : column_names_to_return) + if (it != "_part") + real_column_names.push_back(it); + else + virt_column_names.push_back(it); + + Block virtual_columns_block = getBlockWithVirtualColumns(parts); + BlockInputStreamPtr virtual_columns; + + /// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать + if (!virt_column_names.empty()) + virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, data.context); + else /// Иначе, считаем допустимыми все возможные значения + virtual_columns = new OneBlockInputStream(virtual_columns_block); + + std::multiset values = VirtualColumnUtils::extractSingleValueFromBlocks(virtual_columns, "_part"); + + data.check(real_column_names); processed_stage = QueryProcessingStage::FetchColumns; PKCondition key_condition(query, data.context, data.getColumnsList(), data.getSortDescription()); PKCondition date_condition(query, data.context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1))); - MergeTreeData::DataPartsVector parts; - - /// Выберем куски, в которых могут быть данные, удовлетворяющие date_condition. + /// Выберем куски, в которых могут быть данные, удовлетворяющие date_condition, и которые подходят под условие на _part. { - MergeTreeData::DataParts data_parts = data.getDataParts(); + auto prev_parts = parts; + parts.clear(); - for (MergeTreeData::DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it) + for (const auto & part : prev_parts) { - Field left = static_cast((*it)->left_date); - Field right = static_cast((*it)->right_date); + if (values.find(part->name) == values.end()) + continue; - if (date_condition.mayBeTrueInRange(&left, &right)) - parts.push_back(*it); + Field left = static_cast(part->left_date); + Field right = static_cast(part->right_date); + + if (!date_condition.mayBeTrueInRange(&left, &right)) + continue; + + parts.push_back(part); } } /// Семплирование. - Names column_names_to_read = column_names_to_return; + Names column_names_to_read = real_column_names; UInt64 sampling_column_value_limit = 0; typedef Poco::SharedPtr ASTFunctionPtr; ASTFunctionPtr filter_function; @@ -185,7 +229,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( max_block_size, settings.use_uncompressed_cache, prewhere_actions, - prewhere_column); + prewhere_column, + !virt_column_names.empty()); } else { @@ -196,7 +241,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( max_block_size, settings.use_uncompressed_cache, prewhere_actions, - prewhere_column); + prewhere_column, + !virt_column_names.empty()); } if (select.sample_size) @@ -220,7 +266,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( size_t max_block_size, bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, - const String & prewhere_column) + const String & prewhere_column, + bool add_virtual_column) { /// На всякий случай перемешаем куски. std::random_shuffle(parts.begin(), parts.end()); @@ -282,6 +329,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data, part.data_part, part.ranges, use_uncompressed_cache, prewhere_actions, prewhere_column)); + if (add_virtual_column) + streams.back() = new AddingConstColumnBlockInputStream( + streams.back(), new DataTypeString, part.data_part->name, "_part"); need_marks -= marks_in_part; parts.pop_back(); sum_marks_in_parts.pop_back(); @@ -312,6 +362,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data, part.data_part, ranges_to_get_from_part, use_uncompressed_cache, prewhere_actions, prewhere_column)); + if (add_virtual_column) + streams.back() = new AddingConstColumnBlockInputStream( + streams.back(), new DataTypeString, part.data_part->name, "_part"); } if (streams.size() == 1) @@ -334,7 +387,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal size_t max_block_size, bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, - const String & prewhere_column) + const String & prewhere_column, + bool add_virtual_column) { size_t sum_marks = 0; for (size_t i = 0; i < parts.size(); ++i) @@ -358,6 +412,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data, part.data_part, part.ranges, use_uncompressed_cache, prewhere_actions, prewhere_column); + if (add_virtual_column) + source_stream = new AddingConstColumnBlockInputStream( + source_stream, new DataTypeString, part.data_part->name, "_part"); to_collapse.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression())); }