Make it configurable

This commit is contained in:
Alexey Milovidov 2024-10-21 00:35:01 +02:00
parent ab10830317
commit bb3bfa536a
7 changed files with 28 additions and 11 deletions

View File

@ -4,6 +4,7 @@
#include <Common/Scheduler/ResourceLink.h>
#include <IO/DistributedCacheSettings.h>
namespace DB
{

View File

@ -178,9 +178,24 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
const MergeTreeIndexGranularity & computed_index_granularity)
{
if (part_type == MergeTreeDataPartType::Compact)
return createMergeTreeDataPartCompactWriter(data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, columns_list, column_positions, metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_,
marks_file_extension_, default_codec_, writer_settings, computed_index_granularity);
return createMergeTreeDataPartCompactWriter(
data_part_name_,
logger_name_,
serializations_,
data_part_storage_,
index_granularity_info_,
storage_settings_,
columns_list,
column_positions,
metadata_snapshot,
virtual_columns,
indices_to_recalc,
stats_to_recalc_,
marks_file_extension_,
default_codec_,
writer_settings,
computed_index_granularity);
if (part_type == MergeTreeDataPartType::Wide)
return createMergeTreeDataPartWideWriter(
data_part_name_,
@ -198,6 +213,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
default_codec_,
writer_settings,
computed_index_granularity);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown part type: {}", part_type.toString());
}

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Common/logger_useful.h>
namespace DB
{

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Common/Logger.h>
namespace DB
{

View File

@ -6,11 +6,8 @@
#include <fmt/format.h>
#include <Common/logger_useful.h>
#include <Common/ActionBlocker.h>
#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Compression/CompressedWriteBuffer.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/Serializations/SerializationInfo.h>
@ -20,10 +17,8 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
@ -34,9 +29,6 @@
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Transforms/TTLTransform.h>
#include <Processors/Transforms/TTLCalcTransform.h>
#include <Processors/Transforms/DistinctSortedTransform.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
@ -48,6 +40,7 @@
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace ProfileEvents
{
extern const Event Merge;

View File

@ -26,6 +26,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsString primary_key_compression_codec;
extern const MergeTreeSettingsBool use_adaptive_write_buffer_for_dynamic_subcolumns;
extern const MergeTreeSettingsBool use_compact_variant_discriminators_serialization;
extern const MergeTreeSettingsUInt64 max_compression_threads;
}
MergeTreeWriterSettings::MergeTreeWriterSettings(
@ -54,6 +55,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, use_adaptive_write_buffer_for_dynamic_subcolumns((*storage_settings)[MergeTreeSetting::use_adaptive_write_buffer_for_dynamic_subcolumns])
, adaptive_write_buffer_initial_size((*storage_settings)[MergeTreeSetting::adaptive_write_buffer_initial_size])
{
query_write_settings.max_compression_threads = (*storage_settings)[MergeTreeSetting::max_compression_threads];
}
}

View File

@ -53,6 +53,9 @@ namespace ErrorCodes
M(Bool, load_existing_rows_count_for_old_parts, false, "Whether to load existing_rows_count for existing parts. If false, existing_rows_count will be equal to rows_count for existing parts.", 0) \
M(Bool, use_compact_variant_discriminators_serialization, true, "Use compact version of Variant discriminators serialization.", 0) \
\
/** Merge and insert settings */ \
M(UInt64, max_compression_threads, 1, "Maximum number of threads for writing compressed data. This is an expert-level setting, do not change it.", 0) \
\
/** Merge selector settings. */ \
M(UInt64, merge_selector_blurry_base_scale_factor, 0, "Controls when the logic kicks in relatively to the number of parts in partition. The bigger the factor the more belated reaction will be.", 0) \
M(UInt64, merge_selector_window_size, 1000, "How many parts to look at once.", 0) \