Fix fake race condition on system.merges merge_algorithm

commit 60f2d822d7
parent 9ec78855cd
Author: alesapin
Date:   2020-10-27 18:27:12 +03:00

4 changed files with 13 additions and 12 deletions
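For context on what the commit addresses: `getInfo()` can be called from a thread serving `SELECT * FROM system.merges` while the merge thread assigns `merge_algorithm` after the merge has already started (the algorithm is only chosen once the source parts are known). With a plain enum member, that concurrent read/write pair is a data race under the C++ memory model, presumably flagged by ThreadSanitizer, even though the value is a single small integer and every observed value is sane in practice; hence the "fake" in the title. Below is a minimal sketch of the pattern and the fix; everything except the `MergeAlgorithm` name and the memory ordering is a hypothetical stand-in, not ClickHouse code:

    #include <atomic>
    #include <iostream>
    #include <thread>

    enum class MergeAlgorithm { Undecided, Horizontal, Vertical };

    /// Hypothetical stand-in for MergeListElement.
    struct MergeEntry
    {
        /// Written once by the merge thread after the merge starts,
        /// read at any time by threads serving system.merges.
        std::atomic<MergeAlgorithm> merge_algorithm{MergeAlgorithm::Undecided};
    };

    int main()
    {
        MergeEntry entry;

        /// Merge thread: publish the decision. Relaxed suffices because the
        /// field is standalone monitoring data; no other memory is meant to
        /// become visible "through" it.
        std::thread merge_thread([&]
        {
            entry.merge_algorithm.store(MergeAlgorithm::Vertical, std::memory_order_relaxed);
        });

        /// Monitoring thread: take an atomic snapshot. Without std::atomic,
        /// this load racing with the store above is undefined behaviour.
        MergeAlgorithm seen = entry.merge_algorithm.load(std::memory_order_relaxed);
        std::cout << static_cast<int>(seen) << '\n';

        merge_thread.join();
    }

The fix keeps the member lock-free rather than reaching for a mutex, which is the cheaper choice given that the field is touched on every scan of system.merges.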

@@ -68,7 +68,7 @@ MergeInfo MergeListElement::getInfo() const
     res.memory_usage = memory_tracker.get();
     res.thread_id = thread_id;
     res.merge_type = toString(merge_type);
-    res.merge_algorithm = toString(merge_algorithm);
+    res.merge_algorithm = toString(merge_algorithm.load(std::memory_order_relaxed));
     for (const auto & source_part_name : source_part_names)
         res.source_part_names.emplace_back(source_part_name);

@@ -92,7 +92,8 @@ struct MergeListElement : boost::noncopyable
     UInt64 thread_id;
     MergeType merge_type;
-    MergeAlgorithm merge_algorithm;
+    /// Detected after merge already started
+    std::atomic<MergeAlgorithm> merge_algorithm;

     MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);
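A note on why `std::atomic<MergeAlgorithm>` is a cheap change for this declaration: `std::atomic` accepts any trivially copyable type, and for an enum with an int-sized underlying type it is lock-free on mainstream platforms. A compile-time check one could write (the enumerator list here is illustrative, not the real one):

    #include <atomic>
    #include <type_traits>

    enum class MergeAlgorithm { Undecided, Horizontal, Vertical };

    /// std::atomic requires a trivially copyable type; enums qualify.
    static_assert(std::is_trivially_copyable_v<MergeAlgorithm>);

    /// On x86-64 and AArch64 an int-sized atomic is always lock-free,
    /// so reads from system.merges never contend on a hidden lock.
    static_assert(std::atomic<MergeAlgorithm>::is_always_lock_free);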

@@ -710,10 +710,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart
     size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
     size_t sum_compressed_bytes_upper_bound = merge_entry->total_size_bytes_compressed;
-    MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
-    merge_entry->merge_algorithm = merge_alg;
+    MergeAlgorithm chosen_merge_algorithm = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
+    merge_entry->merge_algorithm.store(chosen_merge_algorithm, std::memory_order_relaxed);
-    LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(merge_alg));
+    LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(chosen_merge_algorithm));

     /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
     /// (which is locked in data.getTotalActiveSizeInBytes())
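On the choice of `std::memory_order_relaxed` in the store above: relaxed is enough because readers only display the enum itself; they do not use it to decide when other, non-atomic fields become safe to read. If the algorithm flag did gate access to other data, the publish would need release/acquire instead. A self-contained sketch of that distinction, with all names hypothetical:

    #include <atomic>
    #include <cassert>

    enum class MergeAlgorithm { Undecided, Horizontal, Vertical };

    /// Hypothetical entry where a plain field travels with the flag.
    struct Entry
    {
        long rows_total = 0;                            /// non-atomic payload
        std::atomic<MergeAlgorithm> algo{MergeAlgorithm::Undecided};
    };

    /// Writer: if readers must also observe rows_total, the store needs
    /// release semantics...
    void publish(Entry & e)
    {
        e.rows_total = 1000;
        e.algo.store(MergeAlgorithm::Vertical, std::memory_order_release);
    }

    /// ...and the load needs acquire. With relaxed ordering only the enum
    /// itself would be safe to read; that is exactly the situation in this
    /// commit, where merge_algorithm carries no other state with it.
    long read_if_ready(const Entry & e)
    {
        if (e.algo.load(std::memory_order_acquire) == MergeAlgorithm::Vertical)
            return e.rows_total;
        return -1;
    }

    int main()
    {
        Entry e;
        publish(e);
        assert(read_if_ready(e) == 1000);
    }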
@@ -728,7 +728,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart
     std::unique_ptr<WriteBuffer> rows_sources_write_buf;
     std::optional<ColumnSizeEstimator> column_sizes;
-    if (merge_alg == MergeAlgorithm::Vertical)
+    if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
     {
         tmp_disk->createDirectories(new_part_tmp_path);
         rows_sources_file_path = new_part_tmp_path + "rows_sources";
@@ -818,7 +818,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart
     ProcessorPtr merged_transform;

     /// If merge is vertical we cannot calculate it
-    bool blocks_are_granules_size = (merge_alg == MergeAlgorithm::Vertical);
+    bool blocks_are_granules_size = (chosen_merge_algorithm == MergeAlgorithm::Vertical);

     UInt64 merge_block_size = data_settings->merge_max_block_size;
     switch (data.merging_params.mode)
@@ -917,7 +917,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart
     {
         /// The same progress from merge_entry could be used for both algorithms (it should be more accurate)
         /// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility
-        Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
+        Float64 progress = (chosen_merge_algorithm == MergeAlgorithm::Horizontal)
             ? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
             : std::min(1., merge_entry->progress.load(std::memory_order_relaxed));
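As a small worked example of the Horizontal branch in this hunk: progress is rows written divided by an upper bound on input rows, clamped to 1 because the bound may overestimate (for instance, when rows collapse during the merge). The figures below are invented:

    #include <algorithm>
    #include <cstdio>

    int main()
    {
        /// Invented figures: 750000 rows written out of at most 1000000 input rows.
        double rows_written = 750000;
        double sum_input_rows_upper_bound = 1000000;

        /// Same clamped, row-based estimate as the Horizontal branch above.
        double progress = std::min(1., rows_written / sum_input_rows_upper_bound);
        std::printf("progress = %.2f\n", progress);   /// prints 0.75
    }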
@@ -938,7 +938,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart
     MergeTreeData::DataPart::Checksums checksums_gathered_columns;

     /// Gather ordinary columns
-    if (merge_alg == MergeAlgorithm::Vertical)
+    if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
     {
         size_t sum_input_rows_exact = merge_entry->rows_read;
         merge_entry->columns_written = merging_column_names.size();
@@ -1054,7 +1054,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart
         ReadableSize(merge_entry->bytes_read_uncompressed / elapsed_seconds));
     }
-    if (merge_alg != MergeAlgorithm::Vertical)
+    if (chosen_merge_algorithm != MergeAlgorithm::Vertical)
         to.writeSuffixAndFinalizePart(new_data_part, need_sync);
     else
         to.writeSuffixAndFinalizePart(new_data_part, need_sync, &storage_columns, &checksums_gathered_columns);

@@ -5,8 +5,8 @@ from helpers.cluster import ClickHouseCluster
 from helpers.test_tools import assert_eq_with_retry

 cluster = ClickHouseCluster(__file__)
-node1 = cluster.add_instance('node1', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True)
-node2 = cluster.add_instance('node2', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True)
+node1 = cluster.add_instance('node1', main_configs=['configs/fast_background_pool.xml', 'configs/log_conf.xml'], with_zookeeper=True)
+node2 = cluster.add_instance('node2', main_configs=['configs/fast_background_pool.xml', 'configs/log_conf.xml'], with_zookeeper=True)

 @pytest.fixture(scope="module")