Buildable code

This commit is contained in:
alesapin 2019-08-13 13:29:31 +03:00
parent 428c753ed7
commit ad81c743c1
19 changed files with 186 additions and 126 deletions

View File

@ -54,14 +54,15 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
String part_name = params.get("part");
const auto data_settings = data.getCOWSettings();
/// Validation of the input that may come from malicious replica.
MergeTreePartInfo::fromPartName(part_name, data.format_version);
static std::atomic_uint total_sends {0};
if ((data.settings.replicated_max_parallel_sends && total_sends >= data.settings.replicated_max_parallel_sends)
|| (data.settings.replicated_max_parallel_sends_for_table && data.current_table_sends >= data.settings.replicated_max_parallel_sends_for_table))
if ((data_settings->replicated_max_parallel_sends && total_sends >= data_settings->replicated_max_parallel_sends)
|| (data_settings->replicated_max_parallel_sends_for_table && data.current_table_sends >= data_settings->replicated_max_parallel_sends_for_table))
{
response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS));
response.setReason("Too many concurrent fetches, try again later");
@ -174,6 +175,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
/// Validation of the input that may come from malicious replica.
MergeTreePartInfo::fromPartName(part_name, data.format_version);
const auto data_settings = data.getCOWSettings();
Poco::URI uri;
uri.setScheme(interserver_scheme);
@ -200,7 +202,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data.settings.replicated_max_parallel_fetches_for_host
data_settings->replicated_max_parallel_fetches_for_host
};
static const String TMP_PREFIX = "tmp_fetch_";

View File

@ -32,7 +32,7 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
, index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty())
, codec(std::move(codec_))
, with_final_mark(storage.settings.write_final_mark && storage.canUseAdaptiveGranularity())
, with_final_mark(storage.getCOWSettings()->write_final_mark && storage.canUseAdaptiveGranularity())
{
if (blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
@ -133,10 +133,11 @@ void fillIndexGranularityImpl(
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
const auto storage_settings = storage.getCOWSettings();
fillIndexGranularityImpl(
block,
storage.settings.index_granularity_bytes,
storage.settings.index_granularity,
storage_settings->index_granularity_bytes,
storage_settings->index_granularity,
blocks_are_granules_size,
index_offset,
index_granularity,

View File

@ -110,7 +110,6 @@ MergeTreeData::MergeTreeData(
BrokenPartCallback broken_part_callback_)
: global_context(context_),
merging_params(merging_params_),
settings(settings_),
partition_by_ast(partition_by_ast_),
sample_by_ast(sample_by_ast_),
ttl_table_ast(ttl_table_ast_),
@ -119,9 +118,11 @@ MergeTreeData::MergeTreeData(
full_path(full_path_),
broken_part_callback(broken_part_callback_),
log_name(database_name + "." + table_name), log(&Logger::get(log_name)),
guarded_settings(std::move(settings_)),
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
{
const auto settings = getCOWSettings();
setPrimaryKeyIndicesAndColumns(order_by_ast_, primary_key_ast_, columns_, indices_);
/// NOTE: using the same columns list as is read when performing actual merges.
@ -725,7 +726,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, "Loading data parts");
auto settings_ptr = getImmutableSettings();
const auto settings = getCOWSettings();
Strings part_file_names;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
@ -747,7 +748,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
/// Parallel loading of data parts.
size_t num_threads = std::min(size_t(settings.max_part_loading_threads), part_file_names.size());
size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_file_names.size());
std::mutex mutex;
@ -866,12 +867,12 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
pool.wait();
if (has_non_adaptive_parts && has_adaptive_parts && !settings_ptr->enable_mixed_granularity_parts)
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
if (suspicious_broken_parts > settings_ptr->max_suspicious_broken_parts && !skip_sanity_checks)
if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
@ -958,10 +959,11 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
if (!lock.try_lock())
return;
const auto settings = getCOWSettings();
time_t current_time = time(nullptr);
ssize_t deadline = (custom_directories_lifetime_seconds >= 0)
? current_time - custom_directories_lifetime_seconds
: current_time - settings_ptr->temporary_directories_lifetime.totalSeconds();
: current_time - settings->temporary_directories_lifetime.totalSeconds();
/// Delete temporary directories older than a day.
Poco::DirectoryIterator end;
@ -1012,7 +1014,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example).
part_remove_time < now &&
now - part_remove_time > getSettings()->old_parts_lifetime.totalSeconds())
now - part_remove_time > getCOWSettings()->old_parts_lifetime.totalSeconds())
{
parts_to_delete.emplace_back(it);
}
@ -1096,11 +1098,12 @@ void MergeTreeData::clearOldPartsFromFilesystem()
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_remove)
{
if (parts_to_remove.size() > 1 && settings.max_part_removal_threads > 1 && parts_to_remove.size() > settings.concurrent_part_removal_threshold)
const auto settings = getCOWSettings();
if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
{
/// Parallel parts removal.
size_t num_threads = std::min(size_t(settings.max_part_removal_threads), parts_to_remove.size());
size_t num_threads = std::min(size_t(settings->max_part_removal_threads), parts_to_remove.size());
ThreadPool pool(num_threads);
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
@ -1332,6 +1335,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression,
NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
{
const auto settings = getCOWSettings();
out_expression = nullptr;
out_rename_map = {};
out_force_update_metadata = false;
@ -1339,7 +1343,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
if (part)
part_mrk_file_extension = part->index_granularity_info.marks_file_extension;
else
part_mrk_file_extension = mutable_settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension();
part_mrk_file_extension = settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension();
using NameToType = std::map<String, const IDataType *>;
NameToType new_types;
@ -1497,7 +1501,7 @@ void MergeTreeData::alterDataPart(
bool skip_sanity_checks,
AlterDataPartTransactionPtr & transaction)
{
auto settings = getImmutableSettings();
const auto settings = getCOWSettings();
ExpressionActionsPtr expression;
const auto & part = transaction->getDataPart();
bool force_update_metadata;
@ -1641,10 +1645,10 @@ void MergeTreeData::alterSettings(
TableStructureWriteLockHolder & table_lock_holder)
{
{
MutableMergeTreeSettingsPtr settings = std::move(*mutable_settings).mutate();
MutableMergeTreeSettingsPtr settings = std::move(*guarded_settings.getPtr()).mutate();
settings->updateFromChanges(new_changes);
IStorage::alterSettings(new_changes, current_database_name, current_table_name, context, table_lock_holder);
mutable_settings = std::move(settings);
guarded_settings.setPtr(std::move(settings));
}
}
@ -2285,27 +2289,27 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
{
auto settings_ptr = getImmutableSettings();
const auto settings = getCOWSettings();
const size_t parts_count_in_total = getPartsCount();
if (parts_count_in_total >= settings_ptr->max_parts_in_total)
if (parts_count_in_total >= settings->max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
}
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
if (parts_count_in_partition < settings_ptr->parts_to_delay_insert)
if (parts_count_in_partition < settings->parts_to_delay_insert)
return;
if (parts_count_in_partition >= settings_ptr->parts_to_throw_insert)
if (parts_count_in_partition >= settings->parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
}
const size_t max_k = settings_ptr->parts_to_throw_insert - settings_ptr->parts_to_delay_insert; /// always > 0
const size_t k = 1 + parts_count_in_partition - settings_ptr->parts_to_delay_insert; /// from 1 to max_k
const double delay_milliseconds = ::pow(settings_ptr->max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
const size_t max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; /// always > 0
const size_t k = 1 + parts_count_in_partition - settings->parts_to_delay_insert; /// from 1 to max_k
const double delay_milliseconds = ::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
ProfileEvents::increment(ProfileEvents::DelayedInserts);
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);
@ -2323,9 +2327,9 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
void MergeTreeData::throwInsertIfNeeded() const
{
auto settings_ptr = getImmutableSettings();
const auto settings = getCOWSettings();
const size_t parts_count_in_total = getPartsCount();
if (parts_count_in_total >= settings_ptr->max_parts_in_total)
if (parts_count_in_total >= settings->max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
@ -2333,7 +2337,7 @@ void MergeTreeData::throwInsertIfNeeded() const
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
if (parts_count_in_partition >= settings_ptr->parts_to_throw_insert)
if (parts_count_in_partition >= settings->parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
@ -2916,9 +2920,9 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
{
auto settings_ptr = getImmutableSettings();
const auto settings = getCOWSettings();
if (!settings_ptr->enable_mixed_granularity_parts || settings_ptr->index_granularity_bytes == 0)
if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0)
{
if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive)
return false;

View File

@ -594,9 +594,9 @@ public:
/// Has additional constraint in replicated version
virtual bool canUseAdaptiveGranularity() const
{
auto settings_ptr = getImmutableSettings();
return settings_ptr->index_granularity_bytes != 0 &&
(settings_ptr->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
const auto settings = getCOWSettings();
return settings->index_granularity_bytes != 0 &&
(settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
}
@ -657,9 +657,12 @@ public:
bool has_non_adaptive_index_granularity_parts = false;
MergeTreeSettingsPtr getImmutableSettings() const
/// Get copy-on-write pointer to storage settings.
/// Copy this pointer into your scope and you will
/// get consistent settings.
const MergeTreeSettingsPtr getCOWSettings() const
{
return mutable_settings;
return guarded_settings.copyPtr();
}
protected:
@ -691,9 +694,26 @@ protected:
String log_name;
Logger * log;
/// Just hides settings pointer from direct usage
class MergeTreeSettingsGuard
{
private:
/// Settings COW pointer. Data maybe changed at any point of time.
/// If you need consistent settings, just copy pointer to your scope.
MergeTreeSettingsPtr mutable_settings;
MergeTreeSettingsPtr settings_ptr;
public:
MergeTreeSettingsGuard(MergeTreeSettingsPtr settings_ptr_)
: settings_ptr(settings_ptr_)
{}
const MergeTreeSettingsPtr copyPtr() const { return settings_ptr; }
MergeTreeSettingsPtr & getPtr() { return settings_ptr; }
void setPtr(MergeTreeSettingsPtr ptr) { settings_ptr = ptr; }
};
/// Storage settings. Don't use this field directly, if you
/// want readonly settings. Prefer getCOWSettings() method.
MergeTreeSettingsGuard guarded_settings;
/// Work with data parts

View File

@ -141,15 +141,16 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
size_t free_entries = pool_size - pool_used;
const auto data_settings = data.getCOWSettings();
UInt64 max_size = 0;
if (free_entries >= data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge)
max_size = data.settings.max_bytes_to_merge_at_max_space_in_pool;
if (free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge)
max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool;
else
max_size = interpolateExponential(
data.settings.max_bytes_to_merge_at_min_space_in_pool,
data.settings.max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge);
data_settings->max_bytes_to_merge_at_min_space_in_pool,
data_settings->max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge);
return std::min(max_size, static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
@ -169,6 +170,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
String * out_disable_reason)
{
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
const auto data_settings = data.getCOWSettings();
if (data_parts.empty())
{
@ -223,7 +225,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
merge_settings.base = 1;
bool can_merge_with_ttl =
(current_time - last_merge_with_ttl > data.settings.merge_with_ttl_timeout);
(current_time - last_merge_with_ttl > data_settings->merge_with_ttl_timeout);
/// NOTE Could allow selection of different merge strategy.
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
@ -545,6 +547,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
Names all_column_names = data.getColumns().getNamesOfPhysical();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getCOWSettings();
NamesAndTypesList gathering_columns, merging_columns;
Names gathering_column_names, merging_column_names;
@ -617,13 +620,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
/// We count total amount of bytes in parts
/// and use direct_io + aio if there is more than min_merge_bytes_to_use_direct_io
bool read_with_direct_io = false;
if (data.settings.min_merge_bytes_to_use_direct_io != 0)
if (data_settings->min_merge_bytes_to_use_direct_io != 0)
{
size_t total_size = 0;
for (const auto & part : parts)
{
total_size += part->bytes_on_disk;
if (total_size >= data.settings.min_merge_bytes_to_use_direct_io)
if (total_size >= data_settings->min_merge_bytes_to_use_direct_io)
{
LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT");
read_with_direct_io = true;
@ -720,7 +723,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merging_columns,
compression_codec,
merged_column_to_size,
data.settings.min_merge_bytes_to_use_direct_io,
data_settings->min_merge_bytes_to_use_direct_io,
blocks_are_granules_size};
merged_stream->readPrefix();
@ -938,6 +941,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto in = mutations_interpreter.execute();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getCOWSettings();
Block in_header = in->getHeader();
@ -995,7 +999,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
}
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
auto mrk_extension = data.settings.index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
auto mrk_extension = data_settings->index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
for (const auto & entry : in_header)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
@ -1092,9 +1096,11 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer
const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const
{
const auto data_settings = data.getCOWSettings();
if (deduplicate)
return MergeAlgorithm::Horizontal;
if (data.settings.enable_vertical_merge_algorithm == 0)
if (data_settings->enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
if (need_remove_expired_values)
return MergeAlgorithm::Horizontal;
@ -1105,9 +1111,9 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer
data.merging_params.mode == MergeTreeData::MergingParams::Replacing ||
data.merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
bool enough_ordinary_cols = gathering_columns.size() >= data.settings.vertical_merge_algorithm_min_columns_to_activate;
bool enough_ordinary_cols = gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data.settings.vertical_merge_algorithm_min_rows_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate;
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;

View File

@ -672,6 +672,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
size_t sum_marks = 0;
size_t total_rows = 0;
const auto data_settings = data.getCOWSettings();
size_t adaptive_parts = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
@ -688,18 +689,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
size_t index_granularity_bytes = 0;
if (adaptive_parts > parts.size() / 2)
index_granularity_bytes = data.settings.index_granularity_bytes;
index_granularity_bytes = data_settings->index_granularity_bytes;
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
data.settings.index_granularity,
data_settings->index_granularity,
index_granularity_bytes);
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
data.settings.index_granularity,
data_settings->index_granularity,
index_granularity_bytes);
if (sum_marks > max_marks_to_use_cache)
@ -830,6 +831,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
SortingInfoPtr sorting_info = query_info.sorting_info;
size_t adaptive_parts = 0;
std::vector<size_t> sum_marks_in_parts(parts.size());
const auto data_settings = data.getCOWSettings();
/// In case of reverse order let's split ranges to avoid reading much data.
auto split_ranges = [max_block_size](const auto & ranges, size_t rows_granularity, size_t num_marks_in_part)
@ -862,7 +864,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
sum_marks += sum_marks_in_parts[i];
if (sorting_info->direction == -1)
parts[i].ranges = split_ranges(parts[i].ranges, data.settings.index_granularity, sum_marks_in_parts[i]);
parts[i].ranges = split_ranges(parts[i].ranges, data_settings->index_granularity, sum_marks_in_parts[i]);
/// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
@ -873,18 +875,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
size_t index_granularity_bytes = 0;
if (adaptive_parts > parts.size() / 2)
index_granularity_bytes = data.settings.index_granularity_bytes;
index_granularity_bytes = data_settings->index_granularity_bytes;
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
data.settings.index_granularity,
data_settings->index_granularity,
index_granularity_bytes);
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
data.settings.index_granularity,
data_settings->index_granularity,
index_granularity_bytes);
if (sum_marks > max_marks_to_use_cache)
@ -1012,6 +1014,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
const Names & virt_columns,
const Settings & settings) const
{
const auto data_settings = data.getCOWSettings();
size_t sum_marks = 0;
size_t adaptive_parts = 0;
for (size_t i = 0; i < parts.size(); ++i)
@ -1025,12 +1028,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
size_t index_granularity_bytes = 0;
if (adaptive_parts >= parts.size() / 2)
index_granularity_bytes = data.settings.index_granularity_bytes;
index_granularity_bytes = data_settings->index_granularity_bytes;
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
data.settings.index_granularity,
data_settings->index_granularity,
index_granularity_bytes);
if (sum_marks > max_marks_to_use_cache)

View File

@ -25,12 +25,13 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
const MergeTreeData & storage)
{
fixed_index_granularity = storage.settings.index_granularity;
const auto storage_settings = storage.getCOWSettings();
fixed_index_granularity = storage_settings->index_granularity;
/// Granularity is fixed
if (!storage.canUseAdaptiveGranularity())
setNonAdaptive();
else
setAdaptive(storage.settings.index_granularity_bytes);
setAdaptive(storage_settings->index_granularity_bytes);
}

View File

@ -76,8 +76,8 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
}
MergeTreeSettings * MergeTreeSettings::clone() const
MergeTreeSettings::MutablePtr MergeTreeSettings::clone() const
{
return new MergeTreeSettings(*this);
return COW::create(*this);
}
}

View File

@ -100,7 +100,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>, public
/// NOTE: will rewrite the AST to add immutable settings.
void loadFromQuery(ASTStorage & storage_def);
MergeTreeSettings * clone() const;
MutablePtr clone() const;
~MergeTreeSettings() {}
};

View File

@ -31,7 +31,7 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
/// Maybe it will make sence to add settings `max_block_size_bytes`
if (max_block_size_rows && !storage.canUseAdaptiveGranularity())
{
size_t fixed_index_granularity = storage.settings.index_granularity;
size_t fixed_index_granularity = storage.getCOWSettings()->index_granularity;
min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)
/ max_block_size_rows * max_block_size_rows / fixed_index_granularity;
}

View File

@ -27,8 +27,9 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
void ReplicatedMergeTreeCleanupThread::run()
{
const auto CLEANUP_SLEEP_MS = storage.settings.cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage.settings.cleanup_delay_period_random_add * 1000)(rng);
auto storage_settings = storage.getCOWSettings();
const auto CLEANUP_SLEEP_MS = storage_settings->cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);
try
{
@ -74,6 +75,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
auto zookeeper = storage.getZooKeeper();
auto storage_settings = storage.getCOWSettings();
Coordination::Stat stat;
if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat))
@ -82,7 +84,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
int children_count = stat.numChildren;
/// We will wait for 1.1 times more records to accumulate than necessary.
if (static_cast<double>(children_count) < storage.settings.min_replicated_logs_to_keep * 1.1)
if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1)
return;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
@ -100,8 +102,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
std::sort(entries.begin(), entries.end());
String min_saved_record_log_str = entries[
entries.size() > storage.settings.max_replicated_logs_to_keep
? entries.size() - storage.settings.max_replicated_logs_to_keep
entries.size() > storage_settings->max_replicated_logs_to_keep
? entries.size() - storage_settings->max_replicated_logs_to_keep
: 0];
/// Replicas that were marked is_lost but are active.
@ -203,7 +205,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
min_saved_log_pointer = std::min(min_saved_log_pointer, min_log_pointer_lost_candidate);
/// We will not touch the last `min_replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage.settings.min_replicated_logs_to_keep), entries.end());
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage_settings->min_replicated_logs_to_keep), entries.end());
/// We will not touch records that are no less than `min_saved_log_pointer`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
@ -285,6 +287,7 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
auto zookeeper = storage.getZooKeeper();
auto storage_settings = storage.getCOWSettings();
std::vector<NodeWithStat> timed_blocks;
getBlocksSortedByTime(*zookeeper, timed_blocks);
@ -294,12 +297,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().ctime;
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage.settings.replicated_deduplication_window_seconds));
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold{{}, time_threshold};
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage.settings.replicated_deduplication_window);
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
@ -401,10 +404,11 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
if (!storage.settings.finished_mutations_to_keep)
auto storage_settings = storage.getCOWSettings();
if (!storage_settings->finished_mutations_to_keep)
return;
if (storage.queue.countFinishedMutations() <= storage.settings.finished_mutations_to_keep)
if (storage.queue.countFinishedMutations() <= storage_settings->finished_mutations_to_keep)
{
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
@ -431,10 +435,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
/// Do not remove entries that are greater than `min_pointer` (they are not done yet).
entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
/// Do not remove last `storage.settings.finished_mutations_to_keep` entries.
if (entries.size() <= storage.settings.finished_mutations_to_keep)
/// Do not remove last `storage_settings->finished_mutations_to_keep` entries.
if (entries.size() <= storage_settings->finished_mutations_to_keep)
return;
entries.erase(entries.end() - storage.settings.finished_mutations_to_keep, entries.end());
entries.erase(entries.end() - storage_settings->finished_mutations_to_keep, entries.end());
if (entries.empty())
return;

View File

@ -961,7 +961,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
* (because it may be ordered by OPTIMIZE or early with differrent settings).
*/
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool
const auto data_settings = data.getCOWSettings();
if (max_source_parts_size != data_settings->max_bytes_to_merge_at_max_space_in_pool
&& sum_parts_size_in_bytes > max_source_parts_size)
{
String reason = "Not executing log entry for part " + entry.new_part_name

View File

@ -44,11 +44,12 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
, log(&Logger::get(log_name))
, active_node_identifier(generateActiveNodeIdentifier())
{
check_period_ms = storage.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
const auto storage_settings = storage.getCOWSettings();
check_period_ms = storage_settings->zookeeper_session_expiration_check_period.totalSeconds() * 1000;
/// Periodicity of checking lag of replica.
if (check_period_ms > static_cast<Int64>(storage.settings.check_delay_period) * 1000)
check_period_ms = storage.settings.check_delay_period * 1000;
if (check_period_ms > static_cast<Int64>(storage_settings->check_delay_period) * 1000)
check_period_ms = storage_settings->check_delay_period * 1000;
task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
}
@ -121,7 +122,8 @@ void ReplicatedMergeTreeRestartingThread::run()
}
time_t current_time = time(nullptr);
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.settings.check_delay_period))
const auto storage_settings = storage.getCOWSettings();
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage_settings->check_delay_period))
{
/// Find out lag of replicas.
time_t absolute_delay = 0;
@ -136,10 +138,10 @@ void ReplicatedMergeTreeRestartingThread::run()
/// We give up leadership if the relative lag is greater than threshold.
if (storage.is_leader
&& relative_delay > static_cast<time_t>(storage.settings.min_relative_delay_to_yield_leadership))
&& relative_delay > static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
<< storage.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
<< storage_settings->min_relative_delay_to_yield_leadership << "). Will yield leadership.");
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
@ -169,6 +171,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
activateReplica();
const auto & zookeeper = storage.getZooKeeper();
const auto storage_settings = storage.getCOWSettings();
storage.cloneReplicaIfNeeded(zookeeper);
@ -181,7 +184,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
updateQuorumIfWeHavePart();
if (storage.settings.replicated_can_become_leader)
if (storage_settings->replicated_can_become_leader)
storage.enterLeaderElection();
else
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");

View File

@ -27,8 +27,9 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
date_column = data.minmax_idx_columns[data.minmax_idx_date_column_pos];
const auto data_settings = data.getCOWSettings();
sampling_expression = formattedAST(data.sample_by_ast);
index_granularity = data.settings.index_granularity;
index_granularity = data_settings->index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
@ -48,7 +49,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
ttl_table = formattedAST(data.ttl_table_ast);
skip_indices = data.getIndices().toString();
if (data.canUseAdaptiveGranularity())
index_granularity_bytes = data.settings.index_granularity_bytes;
index_granularity_bytes = data_settings->index_granularity_bytes;
else
index_granularity_bytes = 0;
}

View File

@ -641,13 +641,13 @@ static StoragePtr create(const StorageFactory::Arguments & args)
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
args.columns, indices_description,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sample_by_ast, ttl_table_ast, merging_params, storage_settings,
sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.data_path, args.database_name, args.table_name, args.columns, indices_description,
args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, storage_settings,
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
}

View File

@ -62,7 +62,7 @@ StorageMergeTree::StorageMergeTree(
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
MergeTreeSettingsPtr settings_,
bool has_force_restore_data_flag)
: MergeTreeData(database_name_, table_name_,
path_ + escapeForFileName(table_name_) + '/',
@ -801,14 +801,15 @@ Int64 StorageMergeTree::getCurrentMutationVersion(
void StorageMergeTree::clearOldMutations()
{
if (!settings.finished_mutations_to_keep)
const auto settings = getCOWSettings();
if (!settings->finished_mutations_to_keep)
return;
std::vector<MergeTreeMutationEntry> mutations_to_delete;
{
std::lock_guard lock(currently_merging_mutex);
if (current_mutations_by_version.size() <= settings.finished_mutations_to_keep)
if (current_mutations_by_version.size() <= settings->finished_mutations_to_keep)
return;
auto begin_it = current_mutations_by_version.begin();
@ -819,10 +820,10 @@ void StorageMergeTree::clearOldMutations()
end_it = current_mutations_by_version.upper_bound(*min_version);
size_t done_count = std::distance(begin_it, end_it);
if (done_count <= settings.finished_mutations_to_keep)
if (done_count <= settings->finished_mutations_to_keep)
return;
size_t to_delete_count = done_count - settings.finished_mutations_to_keep;
size_t to_delete_count = done_count - settings->finished_mutations_to_keep;
auto it = begin_it;
for (size_t i = 0; i < to_delete_count; ++i)

View File

@ -151,7 +151,7 @@ protected:
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
MergeTreeSettingsPtr settings_,
bool has_force_restore_data_flag);
};

View File

@ -202,7 +202,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const ASTPtr & sample_by_ast_,
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
MergeTreeSettingsPtr settings_,
bool has_force_restore_data_flag)
: MergeTreeData(database_name_, table_name_,
path_ + escapeForFileName(table_name_) + '/',
@ -376,7 +376,7 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
}
/** Verify that list of columns and table settings match those specified in ZK (/ metadata).
/** Verify that list of columns and table storage_settings match those specified in ZK (/ metadata).
* If not, throw an exception.
*/
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bool allow_alter)
@ -633,7 +633,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const auto & part : parts)
total_rows_on_filesystem += part->rows_count;
bool insane = unexpected_parts_rows > total_rows_on_filesystem * settings.replicated_max_ratio_of_wrong_parts;
const auto storage_settings = getCOWSettings();
bool insane = unexpected_parts_rows > total_rows_on_filesystem * storage_settings->replicated_max_ratio_of_wrong_parts;
if (insane && !skip_sanity_checks)
{
@ -776,12 +777,13 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
if (!has_been_already_added)
{
const auto storage_settings = getCOWSettings();
String part_path = replica_path + "/parts/" + part_name;
ops.emplace_back(zkutil::makeCheckRequest(
zookeeper_path + "/columns", expected_columns_version));
if (settings.use_minimalistic_part_header_in_zookeeper)
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
{
ops.emplace_back(zkutil::makeCreateRequest(
part_path, local_part_header.toString(), zkutil::CreateMode::Persistent));
@ -858,7 +860,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const
{
return MinimalisticDataPartChecksums::getSerializedString(checksums,
static_cast<bool>(settings.use_minimalistic_checksums_in_zookeeper));
getCOWSettings()->use_minimalistic_checksums_in_zookeeper);
}
@ -1029,13 +1031,14 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
parts.push_back(part);
}
const auto storage_settings = getCOWSettings();
if (!have_all_parts)
{
/// If you do not have all the necessary parts, try to take some already merged part from someone.
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
return false;
}
else if (entry.create_time + settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
else if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{
/// If entry is old enough, and have enough size, and part are exists in any replica,
/// then prefer fetching of merged part from replica.
@ -1044,7 +1047,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
for (const auto & part : parts)
sum_parts_bytes_on_disk += part->bytes_on_disk;
if (sum_parts_bytes_on_disk >= settings.prefer_fetch_merged_part_size_threshold)
if (sum_parts_bytes_on_disk >= storage_settings->prefer_fetch_merged_part_size_threshold)
{
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
@ -1154,6 +1157,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
{
const String & source_part_name = entry.source_parts.at(0);
const auto storage_settings = getCOWSettings();
LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name);
DataPartPtr source_part = getActiveContainingPart(source_part_name);
@ -1173,8 +1177,8 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
if (entry.create_time + settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= settings.prefer_fetch_merged_part_size_threshold)
if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings->prefer_fetch_merged_part_size_threshold)
{
/// If entry is old enough, and have enough size, and some replica has the desired part,
/// then prefer fetching from replica.
@ -1268,20 +1272,21 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
String replica = findReplicaHavingCoveringPart(entry, true);
const auto storage_settings = getCOWSettings();
static std::atomic_uint total_fetches {0};
if (settings.replicated_max_parallel_fetches && total_fetches >= settings.replicated_max_parallel_fetches)
if (storage_settings->replicated_max_parallel_fetches && total_fetches >= storage_settings->replicated_max_parallel_fetches)
{
throw Exception("Too many total fetches from replicas, maximum: " + settings.replicated_max_parallel_fetches.toString(),
throw Exception("Too many total fetches from replicas, maximum: " + storage_settings->replicated_max_parallel_fetches.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
++total_fetches;
SCOPE_EXIT({--total_fetches;});
if (settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= settings.replicated_max_parallel_fetches_for_table)
if (storage_settings->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings->replicated_max_parallel_fetches_for_table)
{
throw Exception("Too many fetches from replicas for table, maximum: " + settings.replicated_max_parallel_fetches_for_table.toString(),
throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings->replicated_max_parallel_fetches_for_table.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
@ -2162,6 +2167,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (!is_leader)
return;
const auto storage_settings = getCOWSettings();
const bool deduplicate = false; /// TODO: read deduplicate option from table config
const bool force_ttl = false;
@ -2181,16 +2187,16 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
/// Otherwise merge queue could be filled with only large merges,
/// and in the same time, many small parts could be created and won't be merged.
size_t merges_and_mutations_queued = queue.countMergesAndPartMutations();
if (merges_and_mutations_queued >= settings.max_replicated_merges_in_queue)
if (merges_and_mutations_queued >= storage_settings->max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges and part mutations (" << merges_and_mutations_queued
<< ") is greater than max_replicated_merges_in_queue ("
<< settings.max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
<< storage_settings->max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
}
else
{
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
settings.max_replicated_merges_in_queue, merges_and_mutations_queued);
storage_settings->max_replicated_merges_in_queue, merges_and_mutations_queued);
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
FutureMergedMutatedPart future_merged_part;
@ -2973,10 +2979,11 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
const auto storage_settings = getCOWSettings();
assertNotReadonly();
const Settings & query_settings = context.getSettingsRef();
bool deduplicate = settings.replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
bool deduplicate = storage_settings->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate);
@ -3010,6 +3017,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
};
bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
const auto storage_settings = getCOWSettings();
if (!partition && final)
{
@ -3042,7 +3050,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
if (!partition)
{
selected = merger_mutator.selectPartsToMerge(
future_merged_part, true, settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
future_merged_part, true, storage_settings->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
}
else
{
@ -3092,9 +3100,9 @@ void StorageReplicatedMergeTree::alter(
if (params.isSettingsAlter())
{
/// We don't replicate settings ALTER. It's local operation.
/// We don't replicate storage_settings ALTER. It's local operation.
/// Also we don't upgrade alter lock to table structure lock.
LOG_DEBUG(log, "ALTER settings only");
LOG_DEBUG(log, "ALTER storage_settings only");
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
alterSettings(new_changes, current_database_name, current_table_name, query_context, table_lock_holder);
@ -3920,9 +3928,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
auto zookeeper = tryGetZooKeeper();
const auto storage_settings = getCOWSettings();
res.is_leader = is_leader;
res.can_become_leader = settings.replicated_can_become_leader;
res.can_become_leader = storage_settings->replicated_can_become_leader;
res.is_readonly = is_readonly;
res.is_session_expired = !zookeeper || zookeeper->expired();
@ -4112,13 +4121,14 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
out_absolute_delay = getAbsoluteDelay();
out_relative_delay = 0;
const auto storage_settings = getCOWSettings();
/** Relative delay is the maximum difference of absolute delay from any other replica,
* (if this replica lags behind any other live replica, or zero, otherwise).
* Calculated only if the absolute delay is large enough.
*/
if (out_absolute_delay < static_cast<time_t>(settings.min_relative_delay_to_yield_leadership))
if (out_absolute_delay < static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
return;
auto zookeeper = getZooKeeper();
@ -4376,7 +4386,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
/// instead.
///
/// Mutations of individual parts are in fact pretty similar to merges, e.g. their assignment and execution
/// is governed by the same settings. TODO: support a single "merge-mutation" operation when the data
/// is governed by the same storage_settings. TODO: support a single "merge-mutation" operation when the data
/// read from the the source parts is first mutated on the fly to some uniform mutation version and then
/// merged to a resulting part.
///
@ -4939,6 +4949,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
const String & block_id_path) const
{
const String & part_name = part->name;
const auto storage_settings = getCOWSettings();
if (!block_id_path.empty())
{
@ -4956,7 +4967,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
zookeeper_path + "/columns",
columns_version));
if (settings.use_minimalistic_part_header_in_zookeeper)
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
{
ops.emplace_back(zkutil::makeCreateRequest(
replica_path + "/parts/" + part->name,
@ -4985,11 +4996,12 @@ void StorageReplicatedMergeTree::updatePartHeaderInZooKeeperAndCommit(
AlterDataPartTransaction & transaction)
{
String part_path = replica_path + "/parts/" + transaction.getPartName();
const auto storage_settings = getCOWSettings();
bool need_delete_columns_and_checksums_nodes = false;
try
{
if (settings.use_minimalistic_part_header_in_zookeeper)
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
{
auto part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
transaction.getNewColumns(), transaction.getNewChecksums());
@ -5169,8 +5181,9 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, const C
bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
{
return settings.index_granularity_bytes != 0 &&
(settings.enable_mixed_granularity_parts ||
const auto storage_settings = getCOWSettings();
return storage_settings->index_granularity_bytes != 0 &&
(storage_settings->enable_mixed_granularity_parts ||
(!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity));
}

View File

@ -534,7 +534,7 @@ protected:
const ASTPtr & sample_by_ast_,
const ASTPtr & table_ttl_ast_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
MergeTreeSettingsPtr settings_,
bool has_force_restore_data_flag);
};