From 2bb5a4749b6c6ed053f6ee62461d9368da910640 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 15 Apr 2016 22:09:42 +0300 Subject: [PATCH] Merge --- dbms/CMakeLists.txt | 2 + .../CollapsingSortedBlockInputStream.h | 2 +- .../ReplacingSortedBlockInputStream.h | 71 +++++++++++ .../CollapsingSortedBlockInputStream.cpp | 4 +- .../ReplacingSortedBlockInputStream.cpp | 115 ++++++++++++++++++ .../MergeTree/MergeTreeDataMerger.cpp | 13 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 6 +- dbms/src/Storages/StorageFactory.cpp | 39 ++++-- 8 files changed, 234 insertions(+), 18 deletions(-) create mode 100644 dbms/include/DB/DataStreams/ReplacingSortedBlockInputStream.h create mode 100644 dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index ff842ece7e8..421cb9672df 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -193,6 +193,7 @@ add_library (dbms include/DB/DataStreams/MergingAggregatedBlockInputStream.h include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h include/DB/DataStreams/SummingSortedBlockInputStream.h + include/DB/DataStreams/ReplacingSortedBlockInputStream.h include/DB/DataStreams/AddingConstColumnBlockInputStream.h include/DB/DataStreams/DistinctBlockInputStream.h include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h @@ -705,6 +706,7 @@ add_library (dbms src/DataStreams/CSVRowOutputStream.cpp src/DataStreams/CSVRowInputStream.cpp src/DataStreams/SummingSortedBlockInputStream.cpp + src/DataStreams/ReplacingSortedBlockInputStream.cpp src/DataStreams/TotalsHavingBlockInputStream.cpp src/DataStreams/CreatingSetsBlockInputStream.cpp src/DataStreams/DistinctBlockInputStream.cpp diff --git a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h index cee1e4da93e..3dba4cb00c3 100644 --- a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h @@ -83,7 +83,7 @@ private: template void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); - /// Вставить в результат строки для текущего идентификатора "визита". + /// Вставить в результат строки для текущего первичного ключа. void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream = false); void reportIncorrectData(); diff --git a/dbms/include/DB/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/ReplacingSortedBlockInputStream.h new file mode 100644 index 00000000000..9f1fdbf7513 --- /dev/null +++ b/dbms/include/DB/DataStreams/ReplacingSortedBlockInputStream.h @@ -0,0 +1,71 @@ +#pragma once + +#include + +#include + + +namespace DB +{ + +/** Соединяет несколько сортированных потоков в один. + * При этом, для каждой группы идущих подряд одинаковых значений первичного ключа (столбцов, по которым сортируются данные), + * оставляет + */ +class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream +{ +public: + ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, + const String & version_column_, size_t max_block_size_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), + version_column(version_column_) + { + } + + String getName() const override { return "ReplacingSorted"; } + + String getID() const override + { + std::stringstream res; + res << "ReplacingSorted(inputs"; + + for (size_t i = 0; i < children.size(); ++i) + res << ", " << children[i]->getID(); + + res << ", description"; + + for (size_t i = 0; i < description.size(); ++i) + res << ", " << description[i].getID(); + + res << ", version_column, " << version_column << ")"; + return res.str(); + } + +protected: + /// Может возвращаться на 1 больше записей, чем max_block_size. + Block readImpl() override; + +private: + String version_column; + ssize_t version_column_number = -1; + + Logger * log = &Logger::get("ReplacingSortedBlockInputStream"); + + /// Прочитали до конца. + bool finished = false; + + RowRef current_key; /// Текущий первичный ключ. + RowRef next_key; /// Первичный ключ следующей строки. + + RowRef selected_row; /// Последняя строка с максимальной версией для текущего первичного ключа. + + UInt64 max_version = 0; /// Максимальная версия для текущего первичного ключа. + + template + void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); + + /// Вставить в результат строки для текущего первичного ключа. + void insertRow(ColumnPlainPtrs & merged_columns, size_t & merged_rows); +}; + +} diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 392e1e995e1..b3d80677fe3 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -151,7 +151,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s if (key_differs) { - /// Запишем данные для предыдущего визита. + /// Запишем данные для предыдущего первичного ключа. insertRows(merged_columns, merged_rows); current_key.swap(next_key); @@ -193,7 +193,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s } } - /// Запишем данные для последнего визита. + /// Запишем данные для последнего первичного ключа. insertRows(merged_columns, merged_rows, true); finished = true; diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp new file mode 100644 index 00000000000..f0657d42f5c --- /dev/null +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -0,0 +1,115 @@ +#include +#include + + +namespace DB +{ + + +void ReplacingSortedBlockInputStream::insertRow(ColumnPlainPtrs & merged_columns, size_t & merged_rows) +{ + ++merged_rows; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*selected_row.columns[i], selected_row.row_num); +} + + +Block ReplacingSortedBlockInputStream::readImpl() +{ + if (finished) + return Block(); + + if (children.size() == 1) + return children[0]->read(); + + Block merged_block; + ColumnPlainPtrs merged_columns; + + init(merged_block, merged_columns); + if (merged_columns.empty()) + return Block(); + + /// Дополнительная инициализация. + if (selected_row.empty()) + { + selected_row.columns.resize(num_columns); + + if (!version_column.empty()) + version_column_number = merged_block.getPositionByName(version_column); + } + + if (has_collation) + merge(merged_columns, queue_with_collation); + else + merge(merged_columns, queue); + + return merged_block; +} + + +template +void ReplacingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue) +{ + size_t merged_rows = 0; + + /// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size + while (!queue.empty()) + { + TSortCursor current = queue.top(); + + if (current_key.empty()) + { + current_key.columns.resize(description.size()); + next_key.columns.resize(description.size()); + + setPrimaryKeyRef(current_key, current); + } + + UInt64 version = version_column_number != -1 + ? current->all_columns[version_column_number]->get64(current->pos) + : 0; + + setPrimaryKeyRef(next_key, current); + + bool key_differs = next_key != current_key; + + /// если накопилось достаточно строк и последняя посчитана полностью + if (key_differs && merged_rows >= max_block_size) + return; + + queue.pop(); + + if (key_differs) + { + max_version = 0; + /// Запишем данные для предыдущего первичного ключа. + insertRow(merged_columns, merged_rows); + current_key.swap(next_key); + } + + /// Нестрогое сравнение, так как мы выбираем последнюю строку для одинаковых значений версий. + if (version >= max_version) + { + max_version = version; + setRowRef(selected_row, current); + } + + if (!current->isLast()) + { + current->next(); + queue.push(current); + } + else + { + /// Достаём из соответствующего источника следующий блок, если есть. + fetchNextBlock(current, queue); + } + } + + /// Запишем данные для последнего первичного ключа. + insertRow(merged_columns, merged_rows); + + finished = true; +} + +} diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 115dd7486a3..3205edd8e1a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -438,9 +439,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::MergingParams::Replacing: /// TODO - merged_stream = std::make_unique( - src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + case MergeTreeData::MergingParams::Replacing: + merged_stream = std::make_unique( + src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::MergingParams::Unsorted: @@ -698,9 +699,9 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::MergingParams::Replacing: /// TODO - merged_stream = std::make_unique( - src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + case MergeTreeData::MergingParams::Replacing: + merged_stream = std::make_unique( + src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::MergingParams::Unsorted: diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index f330efd12ba..ec123b2c600 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -834,8 +835,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal merged = new AggregatingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); break; - case MergeTreeData::MergingParams::Replacing: /// TODO - merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); + case MergeTreeData::MergingParams::Replacing: /// TODO Сделать ReplacingFinalBlockInputStream + merged = new ReplacingSortedBlockInputStream(to_merge, + data.getSortDescription(), data.merging_params.version_column, max_block_size); break; case MergeTreeData::MergingParams::Unsorted: diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 61d8a216e4e..64b2cb1b712 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -361,15 +361,17 @@ StoragePtr StorageFactory::get( * (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x * UINT32_MAX); * - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких); * - index_granularity; - * - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1). + * - (для Collapsing) имя столбца типа Int8, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1). * Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign). * - (для Summing, не обязательно) кортеж столбцов, которых следует суммировать. Если не задано - используются все числовые столбцы, не входящие в первичный ключ. + * - (для Replacing, не обязательно) имя столбца одного из UInt типов, обозначающего "версию" * Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign). * * MergeTree(date, [sample_key], primary_key, index_granularity) * CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign) * SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum]) * AggregatingMergeTree(date, [sample_key], primary_key, index_granularity) + * ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column]) * UnsortedMergeTree(date, index_granularity) TODO Добавить описание ниже. */ @@ -381,9 +383,9 @@ MergeTrees is different in two ways: - it may be replicated and non-replicated; - it may do different actions on merge: nothing; sign collapse; sum; apply aggregete functions. -So we have 8 combinations: - MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, - ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree. +So we have 12 combinations: + MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree + ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree In most of cases, you need MergeTree or ReplicatedMergeTree. @@ -416,6 +418,8 @@ For Collapsing mode, last parameter is name of sign column - special column that For Summing mode, last parameter is optional list of columns to sum while merge. List is passed in round brackets, like (PageViews, Cost). If this parameter is omitted, storage will sum all numeric columns except columns participated in primary key. +For Replacing mode, last parameter is optional name of 'version' column. While merging, for all rows with same primary key, only one row is selected: last row, if version column was not specified, or last row with maximum version value, if specified. + Examples: @@ -452,6 +456,8 @@ For further info please read the documentation: https://clickhouse.yandex-team.r merging_params.mode = MergeTreeData::MergingParams::Aggregating; else if (name_part == "Unsorted") merging_params.mode = MergeTreeData::MergingParams::Unsorted; + else if (name_part == "Replacing") + merging_params.mode = MergeTreeData::MergingParams::Replacing; else if (!name_part.empty()) throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE); @@ -485,6 +491,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r } if (merging_params.mode != MergeTreeData::MergingParams::Summing + && merging_params.mode != MergeTreeData::MergingParams::Replacing && merging_params.mode != MergeTreeData::MergingParams::Unsorted && args.size() != num_additional_params + 3 && args.size() != num_additional_params + 4) @@ -511,7 +518,8 @@ For further info please read the documentation: https://clickhouse.yandex-team.r ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } - if (merging_params.mode == MergeTreeData::MergingParams::Summing + if ((merging_params.mode == MergeTreeData::MergingParams::Summing + || merging_params.mode == MergeTreeData::MergingParams::Replacing) && args.size() != num_additional_params + 3 && args.size() != num_additional_params + 4 && args.size() != num_additional_params + 5) @@ -527,8 +535,12 @@ For further info please read the documentation: https://clickhouse.yandex-team.r "\nname of column with date," "\n[sampling element of primary key]," "\nprimary key expression," - "\nindex granularity," - "\n[list of columns to sum]\n"; + "\nindex granularity,"; + + if (merging_params.mode == MergeTreeData::MergingParams::Summing) + params += "\n[list of columns to sum]\n"; + else + params += "\n[version]\n"; throw Exception("Storage " + name + " requires " + toString(num_additional_params + 3) + " or " @@ -576,6 +588,19 @@ For further info please read the documentation: https://clickhouse.yandex-team.r args.pop_back(); } + else if (merging_params.mode == MergeTreeData::MergingParams::Replacing) + { + /// Если последний элемент - не index granularity (литерал), то это - имя столбца-версии. + if (!typeid_cast(&*args.back())) + { + if (auto ast = typeid_cast(&*args.back())) + merging_params.version_column = ast->name; + else + throw Exception(String("Version column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + + args.pop_back(); + } + } else if (merging_params.mode == MergeTreeData::MergingParams::Summing) { /// Если последний элемент - не index granularity (литерал), то это - список суммируемых столбцов.