Added merge_tree.enable_vertical_merge_algorithm setting.

This commit is contained in:
Vitaliy Lyudvichenko 2016-11-23 14:57:56 +03:00
parent 588add5a49
commit ef593d1b01
5 changed files with 16 additions and 6 deletions

View File

@ -52,7 +52,7 @@ struct SortCursorImpl
* Order is determined by number of cursor.
*
* Cursor number (always?) equals to number of merging part.
* Therefore this filed can be used to determine part number of current row (see ColumnGathererStream).
* Therefore this field can be used to determine part number of current row (see ColumnGathererStream).
*/
size_t order;

View File

@ -11,8 +11,11 @@ namespace DB
struct __attribute__((__packed__)) RowSourcePart
{
unsigned int flag: 1;
unsigned int source_id: 7;
/// Sequence of members is important to use RowSourcePart * as UInt8 * if flag = false
UInt8 source_id: 7;
UInt8 flag: 1;
RowSourcePart() = default;
RowSourcePart(unsigned source_id_, bool flag_ = false)
{

View File

@ -90,6 +90,9 @@ struct MergeTreeSettings
/// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0;
/// Enable usage of Vertical merge algorithm.
size_t enable_vertical_merge_algorithm = 0;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
@ -124,6 +127,7 @@ struct MergeTreeSettings
SET_SIZE_T(min_relative_delay_to_yield_leadership);
SET_SIZE_T(min_relative_delay_to_close);
SET_SIZE_T(min_absolute_delay_to_close);
SET_SIZE_T(enable_vertical_merge_algorithm);
#undef SET_SIZE_T
#undef SET_DOUBLE

View File

@ -482,6 +482,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
MergedRowSources merged_rows_sources;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlgorithm::Vertical)
? &merged_rows_sources : nullptr;
Names & main_column_names = (merge_alg == MergeAlgorithm::Vertical)
@ -679,6 +681,9 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const
{
if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
bool is_supported_storage =
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing;
@ -705,8 +710,6 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
}
}
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Basic"));
return merge_alg;
}

View File

@ -70,7 +70,7 @@ BlockInputStreams StorageSystemMerges::read(
col_database.column->insert(merge.database);
col_table.column->insert(merge.table);
col_elapsed.column->insert(merge.watch.elapsedSeconds());
col_progress.column->insert(merge.progress);
col_progress.column->insert(std::min(1., merge.progress)); /// little cheat
col_num_parts.column->insert(merge.num_parts);
col_result_part_name.column->insert(merge.result_part_name);
col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed);