add support DEDUPLICATE to MergeTreeDataMerger

This commit is contained in:
Yuri Dyachenko 2017-04-11 19:08:02 +03:00 committed by alexey-milovidov
parent f9496e590f
commit 5e61448f25
2 changed files with 13 additions and 7 deletions

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/SimpleMergeSelector.h> #include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h> #include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/MergeList.h> #include <Storages/MergeTree/MergeList.h>
#include <DataStreams/DistinctSortedBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h> #include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h> #include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/CollapsingSortedBlockInputStream.h> #include <DataStreams/CollapsingSortedBlockInputStream.h>
@ -469,7 +470,7 @@ public:
/// parts should be sorted. /// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart( MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry, MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation) size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation, bool deduplicate)
{ {
if (isCancelled()) if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
@ -496,7 +497,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
Names all_column_names = data.getColumnNamesList(); Names all_column_names = data.getColumnNamesList();
NamesAndTypesList all_columns = data.getColumnsList(); NamesAndTypesList all_columns = data.getColumnsList();
SortDescription sort_desc = data.getSortDescription(); const SortDescription sort_desc = data.getSortDescription();
NamesAndTypesList gathering_columns, merging_columns; NamesAndTypesList gathering_columns, merging_columns;
Names gathering_column_names, merging_column_names; Names gathering_column_names, merging_column_names;
@ -512,7 +513,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
MergedRowSources merged_rows_sources; MergedRowSources merged_rows_sources;
MergedRowSources * merged_rows_sources_ptr = &merged_rows_sources; MergedRowSources * merged_rows_sources_ptr = &merged_rows_sources;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, gathering_columns, merged_rows_sources); MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, gathering_columns, merged_rows_sources, deduplicate);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal")); LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
@ -552,7 +553,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
/// The order of the threads is important: when the key is matched, the elements go in the order of the source stream number. /// The order of the threads is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part, /// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
/// that is (approximately) increasing insertion time. /// that is (approximately) increasing insertion time.
std::unique_ptr<IProfilingBlockInputStream> merged_stream; std::shared_ptr<IProfilingBlockInputStream> merged_stream;
switch (data.merging_params.mode) switch (data.merging_params.mode)
{ {
@ -595,6 +596,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
throw Exception("Unknown mode of operation for MergeTreeData: " + toString<int>(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown mode of operation for MergeTreeData: " + toString<int>(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR);
} }
if (deduplicate && merged_stream->isGroupedOutput())
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, Limits(), 0 /*limit_hint*/, Names());
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/"; String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
auto compression_method = data.context.chooseCompressionMethod( auto compression_method = data.context.chooseCompressionMethod(
@ -739,8 +743,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm( MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound, const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc) const const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc, bool deduplicate) const
{ {
if (deduplicate)
return MergeAlgorithm::Horizontal;
if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0) if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal; return MergeAlgorithm::Horizontal;

View File

@ -73,7 +73,7 @@ public:
*/ */
MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart( MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeListEntry & merge_entry, MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeListEntry & merge_entry,
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation = nullptr); size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation = nullptr, bool deduplication = true); ///yurial!!!! deduplication = false
MergeTreeData::DataPartPtr renameMergedTemporaryPart( MergeTreeData::DataPartPtr renameMergedTemporaryPart(
MergeTreeData::DataPartsVector & parts, MergeTreeData::DataPartsVector & parts,
@ -137,7 +137,7 @@ public:
private: private:
MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc) const; size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc, bool deduplicate) const;
private: private:
MergeTreeData & data; MergeTreeData & data;