diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index f01b384d441..795cc68f1ea 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -54,14 +54,15 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); String part_name = params.get("part"); + const auto data_settings = data.getCOWSettings(); /// Validation of the input that may come from malicious replica. MergeTreePartInfo::fromPartName(part_name, data.format_version); static std::atomic_uint total_sends {0}; - if ((data.settings.replicated_max_parallel_sends && total_sends >= data.settings.replicated_max_parallel_sends) - || (data.settings.replicated_max_parallel_sends_for_table && data.current_table_sends >= data.settings.replicated_max_parallel_sends_for_table)) + if ((data_settings->replicated_max_parallel_sends && total_sends >= data_settings->replicated_max_parallel_sends) + || (data_settings->replicated_max_parallel_sends_for_table && data.current_table_sends >= data_settings->replicated_max_parallel_sends_for_table)) { response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS)); response.setReason("Too many concurrent fetches, try again later"); @@ -174,6 +175,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( { /// Validation of the input that may come from malicious replica. MergeTreePartInfo::fromPartName(part_name, data.format_version); + const auto data_settings = data.getCOWSettings(); Poco::URI uri; uri.setScheme(interserver_scheme); @@ -200,7 +202,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( timeouts, creds, DBMS_DEFAULT_BUFFER_SIZE, - data.settings.replicated_max_parallel_fetches_for_host + data_settings->replicated_max_parallel_fetches_for_host }; static const String TMP_PREFIX = "tmp_fetch_"; diff --git a/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp index 2bd0ebb61ea..a20b03cc7b0 100644 --- a/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp @@ -32,7 +32,7 @@ IMergedBlockOutputStream::IMergedBlockOutputStream( , index_granularity(index_granularity_) , compute_granularity(index_granularity.empty()) , codec(std::move(codec_)) - , with_final_mark(storage.settings.write_final_mark && storage.canUseAdaptiveGranularity()) + , with_final_mark(storage.getCOWSettings()->write_final_mark && storage.canUseAdaptiveGranularity()) { if (blocks_are_granules_size && !index_granularity.empty()) throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR); @@ -133,10 +133,11 @@ void fillIndexGranularityImpl( void IMergedBlockOutputStream::fillIndexGranularity(const Block & block) { + const auto storage_settings = storage.getCOWSettings(); fillIndexGranularityImpl( block, - storage.settings.index_granularity_bytes, - storage.settings.index_granularity, + storage_settings->index_granularity_bytes, + storage_settings->index_granularity, blocks_are_granules_size, index_offset, index_granularity, diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index f8ff2ab87b1..4d47f60a2fe 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ 
-110,7 +110,6 @@ MergeTreeData::MergeTreeData( BrokenPartCallback broken_part_callback_) : global_context(context_), merging_params(merging_params_), - settings(settings_), partition_by_ast(partition_by_ast_), sample_by_ast(sample_by_ast_), ttl_table_ast(ttl_table_ast_), @@ -119,9 +118,11 @@ MergeTreeData::MergeTreeData( full_path(full_path_), broken_part_callback(broken_part_callback_), log_name(database_name + "." + table_name), log(&Logger::get(log_name)), + guarded_settings(std::move(settings_)), data_parts_by_info(data_parts_indexes.get()), data_parts_by_state_and_info(data_parts_indexes.get()) { + const auto settings = getCOWSettings(); setPrimaryKeyIndicesAndColumns(order_by_ast_, primary_key_ast_, columns_, indices_); /// NOTE: using the same columns list as is read when performing actual merges. @@ -725,7 +726,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { LOG_DEBUG(log, "Loading data parts"); - auto settings_ptr = getImmutableSettings(); + const auto settings = getCOWSettings(); Strings part_file_names; Poco::DirectoryIterator end; for (Poco::DirectoryIterator it(full_path); it != end; ++it) @@ -747,7 +748,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) } /// Parallel loading of data parts. - size_t num_threads = std::min(size_t(settings.max_part_loading_threads), part_file_names.size()); + size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_file_names.size()); std::mutex mutex; @@ -866,12 +867,12 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) pool.wait(); - if (has_non_adaptive_parts && has_adaptive_parts && !settings_ptr->enable_mixed_granularity_parts) + if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts) throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR); has_non_adaptive_index_granularity_parts = has_non_adaptive_parts; - if (suspicious_broken_parts > settings_ptr->max_suspicious_broken_parts && !skip_sanity_checks) + if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks) throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.", ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS); @@ -958,10 +959,11 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life if (!lock.try_lock()) return; + const auto settings = getCOWSettings(); time_t current_time = time(nullptr); ssize_t deadline = (custom_directories_lifetime_seconds >= 0) ? current_time - custom_directories_lifetime_seconds - : current_time - settings_ptr->temporary_directories_lifetime.totalSeconds(); + : current_time - settings->temporary_directories_lifetime.totalSeconds(); /// Delete temporary directories older than a day. Poco::DirectoryIterator end; @@ -1012,7 +1014,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example). 
part_remove_time < now && - now - part_remove_time > getSettings()->old_parts_lifetime.totalSeconds()) + now - part_remove_time > getCOWSettings()->old_parts_lifetime.totalSeconds()) { parts_to_delete.emplace_back(it); } @@ -1096,11 +1098,12 @@ void MergeTreeData::clearOldPartsFromFilesystem() void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_remove) { - if (parts_to_remove.size() > 1 && settings.max_part_removal_threads > 1 && parts_to_remove.size() > settings.concurrent_part_removal_threshold) + const auto settings = getCOWSettings(); + if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold) { /// Parallel parts removal. - size_t num_threads = std::min(size_t(settings.max_part_removal_threads), parts_to_remove.size()); + size_t num_threads = std::min(size_t(settings->max_part_removal_threads), parts_to_remove.size()); ThreadPool pool(num_threads); /// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool. @@ -1332,6 +1335,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const { + const auto settings = getCOWSettings(); out_expression = nullptr; out_rename_map = {}; out_force_update_metadata = false; @@ -1339,7 +1343,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name if (part) part_mrk_file_extension = part->index_granularity_info.marks_file_extension; else - part_mrk_file_extension = mutable_settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension(); + part_mrk_file_extension = settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension(); using NameToType = std::map<String, const IDataType *>; NameToType new_types; @@ -1497,7 +1501,7 @@ void MergeTreeData::alterDataPart( bool skip_sanity_checks, AlterDataPartTransactionPtr & transaction) { - auto settings = getImmutableSettings(); + const auto settings = getCOWSettings(); ExpressionActionsPtr expression; const auto & part = transaction->getDataPart(); bool force_update_metadata; @@ -1641,10 +1645,10 @@ void MergeTreeData::alterSettings( TableStructureWriteLockHolder & table_lock_holder) { { - MutableMergeTreeSettingsPtr settings = std::move(*mutable_settings).mutate(); + MutableMergeTreeSettingsPtr settings = std::move(*guarded_settings.getPtr()).mutate(); settings->updateFromChanges(new_changes); IStorage::alterSettings(new_changes, current_database_name, current_table_name, context, table_lock_holder); - mutable_settings = std::move(settings); + guarded_settings.setPtr(std::move(settings)); } } @@ -2285,27 +2289,27 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const { - auto settings_ptr = getImmutableSettings(); + const auto settings = getCOWSettings(); const size_t parts_count_in_total = getPartsCount(); - if (parts_count_in_total >= settings_ptr->max_parts_in_total) + if (parts_count_in_total >= settings->max_parts_in_total) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key.
The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS); } const size_t parts_count_in_partition = getMaxPartsCountForPartition(); - if (parts_count_in_partition < settings_ptr->parts_to_delay_insert) + if (parts_count_in_partition < settings->parts_to_delay_insert) return; - if (parts_count_in_partition >= settings_ptr->parts_to_throw_insert) + if (parts_count_in_partition >= settings->parts_to_throw_insert) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS); } - const size_t max_k = settings_ptr->parts_to_throw_insert - settings_ptr->parts_to_delay_insert; /// always > 0 - const size_t k = 1 + parts_count_in_partition - settings_ptr->parts_to_delay_insert; /// from 1 to max_k - const double delay_milliseconds = ::pow(settings_ptr->max_delay_to_insert * 1000, static_cast<double>(k) / max_k); + const size_t max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; /// always > 0 + const size_t k = 1 + parts_count_in_partition - settings->parts_to_delay_insert; /// from 1 to max_k + const double delay_milliseconds = ::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k); ProfileEvents::increment(ProfileEvents::DelayedInserts); ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds); @@ -2323,9 +2327,9 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const void MergeTreeData::throwInsertIfNeeded() const { - auto settings_ptr = getImmutableSettings(); + const auto settings = getCOWSettings(); const size_t parts_count_in_total = getPartsCount(); - if (parts_count_in_total >= settings_ptr->max_parts_in_total) + if (parts_count_in_total >= settings->max_parts_in_total) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS); @@ -2333,7 +2337,7 @@ void MergeTreeData::throwInsertIfNeeded() const const size_t parts_count_in_partition = getMaxPartsCountForPartition(); - if (parts_count_in_partition >= settings_ptr->parts_to_throw_insert) + if (parts_count_in_partition >= settings->parts_to_throw_insert) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception("Too many parts (" + toString(parts_count_in_partition) + ").
Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS); @@ -2916,9 +2920,9 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const { - auto settings_ptr = getImmutableSettings(); + const auto settings = getCOWSettings(); - if (!settings_ptr->enable_mixed_granularity_parts || settings_ptr->index_granularity_bytes == 0) + if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0) { if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive) return false; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index c07e2fc34f4..b94555477f9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -594,9 +594,9 @@ public: /// Has additional constraint in replicated version virtual bool canUseAdaptiveGranularity() const { - auto settings_ptr = getImmutableSettings(); - return settings_ptr->index_granularity_bytes != 0 && - (settings_ptr->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts); + const auto settings = getCOWSettings(); + return settings->index_granularity_bytes != 0 && + (settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts); } @@ -657,9 +657,12 @@ public: bool has_non_adaptive_index_granularity_parts = false; - MergeTreeSettingsPtr getImmutableSettings() const + /// Get copy-on-write pointer to storage settings. + /// Copy this pointer into your scope and you will + /// get consistent settings. + const MergeTreeSettingsPtr getCOWSettings() const { - return mutable_settings; + return guarded_settings.copyPtr(); } protected: @@ -691,9 +694,26 @@ protected: String log_name; Logger * log; - /// Settings COW pointer. Data maybe changed at any point of time. - /// If you need consistent settings, just copy pointer to your scope. - MergeTreeSettingsPtr mutable_settings; + /// Hides the settings pointer from direct use + class MergeTreeSettingsGuard + { + private: + /// Settings COW pointer. Data may be changed at any point in time. + /// If you need consistent settings, just copy the pointer into your scope. + MergeTreeSettingsPtr settings_ptr; + public: + MergeTreeSettingsGuard(MergeTreeSettingsPtr settings_ptr_) + : settings_ptr(settings_ptr_) + {} + + const MergeTreeSettingsPtr copyPtr() const { return settings_ptr; } + MergeTreeSettingsPtr & getPtr() { return settings_ptr; } + void setPtr(MergeTreeSettingsPtr ptr) { settings_ptr = ptr; } + }; + + /// Storage settings. Don't use this field directly if you + /// want read-only settings; prefer the getCOWSettings() method.
+ MergeTreeSettingsGuard guarded_settings; /// Work with data parts diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 3a4d4038317..fad63f35aec 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -141,15 +141,16 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); size_t free_entries = pool_size - pool_used; + const auto data_settings = data.getCOWSettings(); UInt64 max_size = 0; - if (free_entries >= data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge) - max_size = data.settings.max_bytes_to_merge_at_max_space_in_pool; + if (free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge) + max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool; else max_size = interpolateExponential( - data.settings.max_bytes_to_merge_at_min_space_in_pool, - data.settings.max_bytes_to_merge_at_max_space_in_pool, - static_cast<double>(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge); + data_settings->max_bytes_to_merge_at_min_space_in_pool, + data_settings->max_bytes_to_merge_at_max_space_in_pool, + static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge); return std::min(max_size, static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT)); } @@ -169,6 +170,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( String * out_disable_reason) { MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector(); + const auto data_settings = data.getCOWSettings(); if (data_parts.empty()) { @@ -223,7 +225,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( merge_settings.base = 1; bool can_merge_with_ttl = - (current_time - last_merge_with_ttl > data.settings.merge_with_ttl_timeout); + (current_time - last_merge_with_ttl > data_settings->merge_with_ttl_timeout); /// NOTE Could allow selection of different merge strategy.
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled()) @@ -545,6 +547,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor Names all_column_names = data.getColumns().getNamesOfPhysical(); NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); + const auto data_settings = data.getCOWSettings(); NamesAndTypesList gathering_columns, merging_columns; Names gathering_column_names, merging_column_names; @@ -617,13 +620,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor /// We count total amount of bytes in parts /// and use direct_io + aio if there is more than min_merge_bytes_to_use_direct_io bool read_with_direct_io = false; - if (data.settings.min_merge_bytes_to_use_direct_io != 0) + if (data_settings->min_merge_bytes_to_use_direct_io != 0) { size_t total_size = 0; for (const auto & part : parts) { total_size += part->bytes_on_disk; - if (total_size >= data.settings.min_merge_bytes_to_use_direct_io) + if (total_size >= data_settings->min_merge_bytes_to_use_direct_io) { LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT"); read_with_direct_io = true; @@ -720,7 +723,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor merging_columns, compression_codec, merged_column_to_size, - data.settings.min_merge_bytes_to_use_direct_io, + data_settings->min_merge_bytes_to_use_direct_io, blocks_are_granules_size}; merged_stream->readPrefix(); @@ -938,6 +941,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor auto in = mutations_interpreter.execute(); NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); + const auto data_settings = data.getCOWSettings(); Block in_header = in->getHeader(); @@ -995,7 +999,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor } NameSet files_to_skip = {"checksums.txt", "columns.txt"}; - auto mrk_extension = data.settings.index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension(); + auto mrk_extension = data_settings->index_granularity_bytes ? 
getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension(); for (const auto & entry : in_header) { IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) { @@ -1092,9 +1096,11 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound, const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const { + const auto data_settings = data.getCOWSettings(); + if (deduplicate) return MergeAlgorithm::Horizontal; - if (data.settings.enable_vertical_merge_algorithm == 0) + if (data_settings->enable_vertical_merge_algorithm == 0) return MergeAlgorithm::Horizontal; if (need_remove_expired_values) return MergeAlgorithm::Horizontal; @@ -1105,9 +1111,9 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer data.merging_params.mode == MergeTreeData::MergingParams::Replacing || data.merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; - bool enough_ordinary_cols = gathering_columns.size() >= data.settings.vertical_merge_algorithm_min_columns_to_activate; + bool enough_ordinary_cols = gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate; - bool enough_total_rows = sum_rows_upper_bound >= data.settings.vertical_merge_algorithm_min_rows_to_activate; + bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate; bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 513a713651b..314967b485d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -672,6 +672,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( size_t sum_marks = 0; size_t total_rows = 0; + const auto data_settings = data.getCOWSettings(); size_t adaptive_parts = 0; for (size_t i = 0; i < parts.size(); ++i) { @@ -688,18 +689,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( size_t index_granularity_bytes = 0; if (adaptive_parts > parts.size() / 2) - index_granularity_bytes = data.settings.index_granularity_bytes; + index_granularity_bytes = data_settings->index_granularity_bytes; const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( settings.merge_tree_max_rows_to_use_cache, settings.merge_tree_max_bytes_to_use_cache, - data.settings.index_granularity, + data_settings->index_granularity, index_granularity_bytes); const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks( settings.merge_tree_min_rows_for_concurrent_read, settings.merge_tree_min_bytes_for_concurrent_read, - data.settings.index_granularity, + data_settings->index_granularity, index_granularity_bytes); if (sum_marks > max_marks_to_use_cache) @@ -830,6 +831,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO SortingInfoPtr sorting_info = query_info.sorting_info; size_t adaptive_parts = 0; std::vector<size_t> sum_marks_in_parts(parts.size()); + const auto data_settings = data.getCOWSettings(); /// In case of reverse order let's split ranges to avoid reading much data.
auto split_ranges = [max_block_size](const auto & ranges, size_t rows_granularity, size_t num_marks_in_part) @@ -862,7 +864,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO sum_marks += sum_marks_in_parts[i]; if (sorting_info->direction == -1) - parts[i].ranges = split_ranges(parts[i].ranges, data.settings.index_granularity, sum_marks_in_parts[i]); + parts[i].ranges = split_ranges(parts[i].ranges, data_settings->index_granularity, sum_marks_in_parts[i]); /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`. std::reverse(parts[i].ranges.begin(), parts[i].ranges.end()); @@ -873,18 +875,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO size_t index_granularity_bytes = 0; if (adaptive_parts > parts.size() / 2) - index_granularity_bytes = data.settings.index_granularity_bytes; + index_granularity_bytes = data_settings->index_granularity_bytes; const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( settings.merge_tree_max_rows_to_use_cache, settings.merge_tree_max_bytes_to_use_cache, - data.settings.index_granularity, + data_settings->index_granularity, index_granularity_bytes); const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks( settings.merge_tree_min_rows_for_concurrent_read, settings.merge_tree_min_bytes_for_concurrent_read, - data.settings.index_granularity, + data_settings->index_granularity, index_granularity_bytes); if (sum_marks > max_marks_to_use_cache) @@ -1012,6 +1014,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal const Names & virt_columns, const Settings & settings) const { + const auto data_settings = data.getCOWSettings(); size_t sum_marks = 0; size_t adaptive_parts = 0; for (size_t i = 0; i < parts.size(); ++i) { @@ -1025,12 +1028,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal size_t index_granularity_bytes = 0; if (adaptive_parts >= parts.size() / 2) - index_granularity_bytes = data.settings.index_granularity_bytes; + index_granularity_bytes = data_settings->index_granularity_bytes; const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( settings.merge_tree_max_rows_to_use_cache, settings.merge_tree_max_bytes_to_use_cache, - data.settings.index_granularity, + data_settings->index_granularity, index_granularity_bytes); if (sum_marks > max_marks_to_use_cache) diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/dbms/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp index 63b19da9e64..143af37c10d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp @@ -25,12 +25,13 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS( MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo( const MergeTreeData & storage) { - fixed_index_granularity = storage.settings.index_granularity; + const auto storage_settings = storage.getCOWSettings(); + fixed_index_granularity = storage_settings->index_granularity; /// Granularity is fixed if (!storage.canUseAdaptiveGranularity()) setNonAdaptive(); else - setAdaptive(storage.settings.index_granularity_bytes); + setAdaptive(storage_settings->index_granularity_bytes); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.cpp b/dbms/src/Storages/MergeTree/MergeTreeSettings.cpp index 3b90bf293ba..7c3c7ef02cc 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.cpp +++ 
b/dbms/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -76,8 +76,8 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def) } -MergeTreeSettings * MergeTreeSettings::clone() const +MergeTreeSettings::MutablePtr MergeTreeSettings::clone() const { - return new MergeTreeSettings(*this); + return COW::create(*this); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 480209c740a..dff7cc2e523 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -100,7 +100,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>, public /// NOTE: will rewrite the AST to add immutable settings. void loadFromQuery(ASTStorage & storage_def); - MergeTreeSettings * clone() const; + MutablePtr clone() const; ~MergeTreeSettings() {} }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp index 9c34782dec8..7a09bde0998 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp @@ -31,7 +31,7 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream( /// Maybe it will make sence to add settings `max_block_size_bytes` if (max_block_size_rows && !storage.canUseAdaptiveGranularity()) { - size_t fixed_index_granularity = storage.settings.index_granularity; + size_t fixed_index_granularity = storage.getCOWSettings()->index_granularity; min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1) / max_block_size_rows * max_block_size_rows / fixed_index_granularity; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 6108704b45a..2b03ed86895 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -27,8 +27,9 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic void ReplicatedMergeTreeCleanupThread::run() { - const auto CLEANUP_SLEEP_MS = storage.settings.cleanup_delay_period * 1000 + std::uniform_int_distribution<UInt64>(0, storage.settings.cleanup_delay_period_random_add * 1000)(rng); + auto storage_settings = storage.getCOWSettings(); + const auto CLEANUP_SLEEP_MS = storage_settings->cleanup_delay_period * 1000 + + std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng); try { @@ -74,6 +75,7 @@ void ReplicatedMergeTreeCleanupThread::iterate() void ReplicatedMergeTreeCleanupThread::clearOldLogs() { auto zookeeper = storage.getZooKeeper(); + auto storage_settings = storage.getCOWSettings(); Coordination::Stat stat; if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat)) @@ -82,7 +84,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() int children_count = stat.numChildren; /// We will wait for 1.1 times more records to accumulate than necessary.
- if (static_cast<double>(children_count) < storage.settings.min_replicated_logs_to_keep * 1.1) + if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1) return; Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat); @@ -100,8 +102,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() std::sort(entries.begin(), entries.end()); String min_saved_record_log_str = entries[ - entries.size() > storage.settings.max_replicated_logs_to_keep - ? entries.size() - storage.settings.max_replicated_logs_to_keep + entries.size() > storage_settings->max_replicated_logs_to_keep + ? entries.size() - storage_settings->max_replicated_logs_to_keep : 0]; /// Replicas that were marked is_lost but are active. @@ -203,7 +205,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() min_saved_log_pointer = std::min(min_saved_log_pointer, min_log_pointer_lost_candidate); /// We will not touch the last `min_replicated_logs_to_keep` records. - entries.erase(entries.end() - std::min<size_t>(entries.size(), storage.settings.min_replicated_logs_to_keep), entries.end()); + entries.erase(entries.end() - std::min<size_t>(entries.size(), storage_settings->min_replicated_logs_to_keep), entries.end()); /// We will not touch records that are no less than `min_saved_log_pointer`. entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end()); @@ -285,6 +287,7 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat void ReplicatedMergeTreeCleanupThread::clearOldBlocks() { auto zookeeper = storage.getZooKeeper(); + auto storage_settings = storage.getCOWSettings(); std::vector<NodeWithStat> timed_blocks; getBlocksSortedByTime(*zookeeper, timed_blocks); @@ -294,12 +297,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() /// Use ZooKeeper's first node (last according to time) timestamp as "current" time. Int64 current_time = timed_blocks.front().ctime; - Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage.settings.replicated_deduplication_window_seconds)); + Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds)); /// Virtual node, all nodes that are "greater" than this one will be deleted NodeWithStat block_threshold{{}, time_threshold}; - size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage.settings.replicated_deduplication_window); + size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window); auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window; auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime); auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold); @@ -401,10 +404,11 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & void ReplicatedMergeTreeCleanupThread::clearOldMutations() { - if (!storage.settings.finished_mutations_to_keep) + auto storage_settings = storage.getCOWSettings(); + if (!storage_settings->finished_mutations_to_keep) return; - if (storage.queue.countFinishedMutations() <= storage.settings.finished_mutations_to_keep) + if (storage.queue.countFinishedMutations() <= storage_settings->finished_mutations_to_keep) { /// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything. @@ -431,10 +435,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations() /// Do not remove entries that are greater than `min_pointer` (they are not done yet). entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end()); - /// Do not remove last `storage.settings.finished_mutations_to_keep` entries. - if (entries.size() <= storage.settings.finished_mutations_to_keep) + /// Do not remove last `storage_settings->finished_mutations_to_keep` entries. + if (entries.size() <= storage_settings->finished_mutations_to_keep) return; - entries.erase(entries.end() - storage.settings.finished_mutations_to_keep, entries.end()); + entries.erase(entries.end() - storage_settings->finished_mutations_to_keep, entries.end()); if (entries.empty()) return; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 665e8c9bd5c..5d044fbf839 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -961,7 +961,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( * (because it may be ordered by OPTIMIZE or early with differrent settings). */ UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); - if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool + const auto data_settings = data.getCOWSettings(); + if (max_source_parts_size != data_settings->max_bytes_to_merge_at_max_space_in_pool && sum_parts_size_in_bytes > max_source_parts_size) { String reason = "Not executing log entry for part " + entry.new_part_name diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index a98625336c5..6145713492f 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -44,11 +44,12 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage , log(&Logger::get(log_name)) , active_node_identifier(generateActiveNodeIdentifier()) { - check_period_ms = storage.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000; + const auto storage_settings = storage.getCOWSettings(); + check_period_ms = storage_settings->zookeeper_session_expiration_check_period.totalSeconds() * 1000; /// Periodicity of checking lag of replica. - if (check_period_ms > static_cast<Int64>(storage.settings.check_delay_period) * 1000) - check_period_ms = storage.settings.check_delay_period * 1000; + if (check_period_ms > static_cast<Int64>(storage_settings->check_delay_period) * 1000) + check_period_ms = storage_settings->check_delay_period * 1000; task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); }); } @@ -121,7 +122,8 @@ void ReplicatedMergeTreeRestartingThread::run() } time_t current_time = time(nullptr); - if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.settings.check_delay_period)) + const auto storage_settings = storage.getCOWSettings(); + if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage_settings->check_delay_period)) { /// Find out lag of replicas. time_t absolute_delay = 0; @@ -136,10 +138,10 @@ void ReplicatedMergeTreeRestartingThread::run() /// We give up leadership if the relative lag is greater than threshold.
if (storage.is_leader - && relative_delay > static_cast<time_t>(storage.settings.min_relative_delay_to_yield_leadership)) + && relative_delay > static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership)) { LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold (" - << storage.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership."); + << storage_settings->min_relative_delay_to_yield_leadership << "). Will yield leadership."); ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership); @@ -169,6 +171,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() activateReplica(); const auto & zookeeper = storage.getZooKeeper(); + const auto storage_settings = storage.getCOWSettings(); storage.cloneReplicaIfNeeded(zookeeper); @@ -181,7 +184,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() updateQuorumIfWeHavePart(); - if (storage.settings.replicated_can_become_leader) + if (storage_settings->replicated_can_become_leader) storage.enterLeaderElection(); else LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0"); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp index 10f77358b7d..381d1f47412 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp @@ -27,8 +27,9 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) date_column = data.minmax_idx_columns[data.minmax_idx_date_column_pos]; + const auto data_settings = data.getCOWSettings(); sampling_expression = formattedAST(data.sample_by_ast); - index_granularity = data.settings.index_granularity; + index_granularity = data_settings->index_granularity; merging_params_mode = static_cast<int>(data.merging_params.mode); sign_column = data.merging_params.sign_column; @@ -48,7 +49,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr ttl_table = formattedAST(data.ttl_table_ast); skip_indices = data.getIndices().toString(); if (data.canUseAdaptiveGranularity()) - index_granularity_bytes = data.settings.index_granularity_bytes; + index_granularity_bytes = data_settings->index_granularity_bytes; else index_granularity_bytes = 0; } diff --git a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp index 81ead0fd20d..1e9f1a252f4 100644 --- a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -641,13 +641,13 @@ static StoragePtr create(const StorageFactory::Arguments & args) zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name, args.columns, indices_description, args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast, - sample_by_ast, ttl_table_ast, merging_params, storage_settings, + sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings), args.has_force_restore_data_flag); else return StorageMergeTree::create( args.data_path, args.database_name, args.table_name, args.columns, indices_description, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast, - primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, storage_settings, + primary_key_ast, sample_by_ast, ttl_table_ast, 
merging_params, std::move(storage_settings), args.has_force_restore_data_flag); } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 9ed413157a2..f6975e34dfa 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -62,7 +62,7 @@ StorageMergeTree::StorageMergeTree( const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported. const ASTPtr & ttl_table_ast_, const MergingParams & merging_params_, - const MergeTreeSettings & settings_, + MergeTreeSettingsPtr settings_, bool has_force_restore_data_flag) : MergeTreeData(database_name_, table_name_, path_ + escapeForFileName(table_name_) + '/', @@ -801,14 +801,15 @@ Int64 StorageMergeTree::getCurrentMutationVersion( void StorageMergeTree::clearOldMutations() { - if (!settings.finished_mutations_to_keep) + const auto settings = getCOWSettings(); + if (!settings->finished_mutations_to_keep) return; std::vector<MergeTreeMutationEntry> mutations_to_delete; { std::lock_guard lock(currently_merging_mutex); - if (current_mutations_by_version.size() <= settings.finished_mutations_to_keep) + if (current_mutations_by_version.size() <= settings->finished_mutations_to_keep) return; auto begin_it = current_mutations_by_version.begin(); @@ -819,10 +820,10 @@ void StorageMergeTree::clearOldMutations() end_it = current_mutations_by_version.upper_bound(*min_version); size_t done_count = std::distance(begin_it, end_it); - if (done_count <= settings.finished_mutations_to_keep) + if (done_count <= settings->finished_mutations_to_keep) return; - size_t to_delete_count = done_count - settings.finished_mutations_to_keep; + size_t to_delete_count = done_count - settings->finished_mutations_to_keep; auto it = begin_it; for (size_t i = 0; i < to_delete_count; ++i) diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 0df90604f67..556cf7999b8 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -151,7 +151,7 @@ protected: const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported. const ASTPtr & ttl_table_ast_, const MergingParams & merging_params_, - const MergeTreeSettings & settings_, + MergeTreeSettingsPtr settings_, bool has_force_restore_data_flag); }; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 0b66e98d0dc..c788c940efc 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -202,7 +202,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( const ASTPtr & sample_by_ast_, const ASTPtr & ttl_table_ast_, const MergingParams & merging_params_, - const MergeTreeSettings & settings_, + MergeTreeSettingsPtr settings_, bool has_force_restore_data_flag) : MergeTreeData(database_name_, table_name_, path_ + escapeForFileName(table_name_) + '/',
@@ -633,7 +633,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) for (const auto & part : parts) total_rows_on_filesystem += part->rows_count; - bool insane = unexpected_parts_rows > total_rows_on_filesystem * settings.replicated_max_ratio_of_wrong_parts; + const auto storage_settings = getCOWSettings(); + bool insane = unexpected_parts_rows > total_rows_on_filesystem * storage_settings->replicated_max_ratio_of_wrong_parts; if (insane && !skip_sanity_checks) { @@ -776,12 +777,13 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil: if (!has_been_already_added) { + const auto storage_settings = getCOWSettings(); String part_path = replica_path + "/parts/" + part_name; ops.emplace_back(zkutil::makeCheckRequest( zookeeper_path + "/columns", expected_columns_version)); - if (settings.use_minimalistic_part_header_in_zookeeper) + if (storage_settings->use_minimalistic_part_header_in_zookeeper) { ops.emplace_back(zkutil::makeCreateRequest( part_path, local_part_header.toString(), zkutil::CreateMode::Persistent)); @@ -858,7 +860,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const { return MinimalisticDataPartChecksums::getSerializedString(checksums, - static_cast<bool>(settings.use_minimalistic_checksums_in_zookeeper)); + getCOWSettings()->use_minimalistic_checksums_in_zookeeper); } @@ -1029,13 +1031,14 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) parts.push_back(part); } + const auto storage_settings = getCOWSettings(); if (!have_all_parts) { /// If you do not have all the necessary parts, try to take some already merged part from someone. LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead"); return false; } - else if (entry.create_time + settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)) + else if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)) { /// If entry is old enough, and have enough size, and part are exists in any replica, /// then prefer fetching of merged part from replica. @@ -1044,7 +1047,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) for (const auto & part : parts) sum_parts_bytes_on_disk += part->bytes_on_disk; - if (sum_parts_bytes_on_disk >= settings.prefer_fetch_merged_part_size_threshold) + if (sum_parts_bytes_on_disk >= storage_settings->prefer_fetch_merged_part_size_threshold) { String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove. if (!replica.empty()) @@ -1154,6 +1157,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry) { const String & source_part_name = entry.source_parts.at(0); + const auto storage_settings = getCOWSettings(); LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name); DataPartPtr source_part = getActiveContainingPart(source_part_name); @@ -1173,8 +1177,8 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM /// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}); - if (entry.create_time + settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr) - && estimated_space_for_result >= settings.prefer_fetch_merged_part_size_threshold) + if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr) + && estimated_space_for_result >= storage_settings->prefer_fetch_merged_part_size_threshold) { /// If entry is old enough, and have enough size, and some replica has the desired part, /// then prefer fetching from replica. @@ -1268,20 +1272,21 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry) { String replica = findReplicaHavingCoveringPart(entry, true); + const auto storage_settings = getCOWSettings(); static std::atomic_uint total_fetches {0}; - if (settings.replicated_max_parallel_fetches && total_fetches >= settings.replicated_max_parallel_fetches) + if (storage_settings->replicated_max_parallel_fetches && total_fetches >= storage_settings->replicated_max_parallel_fetches) { - throw Exception("Too many total fetches from replicas, maximum: " + settings.replicated_max_parallel_fetches.toString(), + throw Exception("Too many total fetches from replicas, maximum: " + storage_settings->replicated_max_parallel_fetches.toString(), ErrorCodes::TOO_MANY_FETCHES); } ++total_fetches; SCOPE_EXIT({--total_fetches;}); - if (settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= settings.replicated_max_parallel_fetches_for_table) + if (storage_settings->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings->replicated_max_parallel_fetches_for_table) { - throw Exception("Too many fetches from replicas for table, maximum: " + settings.replicated_max_parallel_fetches_for_table.toString(), + throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings->replicated_max_parallel_fetches_for_table.toString(), ErrorCodes::TOO_MANY_FETCHES); } @@ -2162,6 +2167,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() if (!is_leader) return; + const auto storage_settings = getCOWSettings(); const bool deduplicate = false; /// TODO: read deduplicate option from table config const bool force_ttl = false; @@ -2181,16 +2187,16 @@ void StorageReplicatedMergeTree::mergeSelectingTask() /// Otherwise merge queue could be filled with only large merges, /// and in the same time, many small parts could be created and won't be merged. 
size_t merges_and_mutations_queued = queue.countMergesAndPartMutations(); - if (merges_and_mutations_queued >= settings.max_replicated_merges_in_queue) + if (merges_and_mutations_queued >= storage_settings->max_replicated_merges_in_queue) { LOG_TRACE(log, "Number of queued merges and part mutations (" << merges_and_mutations_queued << ") is greater than max_replicated_merges_in_queue (" - << settings.max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate."); + << storage_settings->max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate."); } else { UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge( - settings.max_replicated_merges_in_queue, merges_and_mutations_queued); + storage_settings->max_replicated_merges_in_queue, merges_and_mutations_queued); UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation(); FutureMergedMutatedPart future_merged_part; @@ -2973,10 +2979,11 @@ void StorageReplicatedMergeTree::assertNotReadonly() const BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const Context & context) { + const auto storage_settings = getCOWSettings(); assertNotReadonly(); const Settings & query_settings = context.getSettingsRef(); - bool deduplicate = settings.replicated_deduplication_window != 0 && query_settings.insert_deduplicate; + bool deduplicate = storage_settings->replicated_deduplication_window != 0 && query_settings.insert_deduplicate; return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this, query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate); @@ -3010,6 +3017,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p }; bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL())); + const auto storage_settings = getCOWSettings(); if (!partition && final) { @@ -3042,7 +3050,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p if (!partition) { selected = merger_mutator.selectPartsToMerge( - future_merged_part, true, settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason); + future_merged_part, true, storage_settings->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason); } else {
@@ -3920,9 +3928,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) { auto zookeeper = tryGetZooKeeper(); + const auto storage_settings = getCOWSettings(); res.is_leader = is_leader; - res.can_become_leader = settings.replicated_can_become_leader; + res.can_become_leader = storage_settings->replicated_can_become_leader; res.is_readonly = is_readonly; res.is_session_expired = !zookeeper || zookeeper->expired(); @@ -4112,13 +4121,14 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t out_absolute_delay = getAbsoluteDelay(); out_relative_delay = 0; + const auto storage_settings = getCOWSettings(); /** Relative delay is the maximum difference of absolute delay from any other replica, * (if this replica lags behind any other live replica, or zero, otherwise). * Calculated only if the absolute delay is large enough. */ - if (out_absolute_delay < static_cast<time_t>(settings.min_relative_delay_to_yield_leadership)) + if (out_absolute_delay < static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership)) return; auto zookeeper = getZooKeeper();
@@ -4939,6 +4949,7 @@ void StorageReplicatedMergeTree::getCommitPartOps( const String & block_id_path) const { const String & part_name = part->name; + const auto storage_settings = getCOWSettings(); if (!block_id_path.empty()) { @@ -4956,7 +4967,7 @@ void StorageReplicatedMergeTree::getCommitPartOps( zookeeper_path + "/columns", columns_version)); - if (settings.use_minimalistic_part_header_in_zookeeper) + if (storage_settings->use_minimalistic_part_header_in_zookeeper) { ops.emplace_back(zkutil::makeCreateRequest( replica_path + "/parts/" + part->name, @@ -4985,11 +4996,12 @@ void StorageReplicatedMergeTree::updatePartHeaderInZooKeeperAndCommit( AlterDataPartTransaction & transaction) { String part_path = replica_path + "/parts/" + transaction.getPartName(); + const auto storage_settings = getCOWSettings(); bool need_delete_columns_and_checksums_nodes = false; try { - if (settings.use_minimalistic_part_header_in_zookeeper) + if (storage_settings->use_minimalistic_part_header_in_zookeeper) { auto part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums( transaction.getNewColumns(), transaction.getNewChecksums()); @@ -5169,8 +5181,9 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, const C bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const { - return settings.index_granularity_bytes != 0 && - (settings.enable_mixed_granularity_parts || + const auto storage_settings = getCOWSettings(); + return storage_settings->index_granularity_bytes != 0 && + (storage_settings->enable_mixed_granularity_parts || (!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity)); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 7f632ab4cb4..a32a9f19473 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -534,7 +534,7 @@ protected: const ASTPtr & sample_by_ast_, const ASTPtr & table_ttl_ast_, const MergingParams & merging_params_, - const MergeTreeSettings & settings_, + MergeTreeSettingsPtr settings_, bool has_force_restore_data_flag); };
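Editorial sketch of the access pattern this patch establishes (not part of the diff): readers call getCOWSettings() once per scope and keep the returned pointer, so every check in that scope sees one immutable snapshot, while ALTER clones the settings, mutates the copy, and swaps the pointer. The code below models that contract with std::shared_ptr instead of ClickHouse's COW<> base and SettingsCollection; Storage, Settings, and the two example fields are hypothetical stand-ins, and the atomic load/store merely stands in for the table lock the real alterSettings() holds.

// A minimal, self-contained model of the COW-settings pattern from this diff.
// `Storage` and `Settings` are stand-ins for MergeTreeData, MergeTreeSettings
// and the MergeTreeSettingsGuard wrapper; they are not the real classes.
#include <cstddef>
#include <iostream>
#include <memory>

struct Settings
{
    size_t parts_to_delay_insert = 150;
    size_t parts_to_throw_insert = 300;
};

class Storage
{
public:
    /// Analogue of getCOWSettings(): copy the pointer once per scope and all
    /// reads through it see one consistent snapshot, even across ALTERs.
    std::shared_ptr<const Settings> getSettings() const
    {
        return std::atomic_load(&settings_ptr);
    }

    /// Analogue of alterSettings(): clone, mutate the private copy, publish.
    /// (The real patch swaps the pointer under a table lock; the atomic
    /// store here just keeps the sketch self-contained.)
    void alterSettings(size_t new_throw_threshold)
    {
        auto copy = std::make_shared<Settings>(*std::atomic_load(&settings_ptr));
        copy->parts_to_throw_insert = new_throw_threshold;
        std::atomic_store(&settings_ptr,
                          std::shared_ptr<const Settings>(std::move(copy)));
    }

private:
    std::shared_ptr<const Settings> settings_ptr = std::make_shared<Settings>();
};

int main()
{
    Storage storage;

    const auto settings = storage.getSettings(); // snapshot taken once per scope
    storage.alterSettings(600);                  // a concurrent ALTER elsewhere

    // Prints "150 300": both reads come from the same snapshot, so a check
    // such as delayInsertOrThrowIfNeeded() can never observe a torn mix of
    // old and new thresholds.
    std::cout << settings->parts_to_delay_insert << ' '
              << settings->parts_to_throw_insert << '\n';
}

This is also why the patch replaces every repeated `data.settings.x` access with a single `data_settings` copy per function: two direct member reads could straddle a settings swap, whereas two reads through one captured pointer cannot.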