Merge remote-tracking branch 'origin/master' into pr-right-joins

Igor Nikonov 2024-11-06 11:29:27 +00:00
commit 9c1be76c14
11 changed files with 56 additions and 10 deletions

View File

@@ -86,10 +86,12 @@ void ExecutionSpeedLimits::throttle(
if (timeout_overflow_mode == OverflowMode::THROW && estimated_execution_time_seconds > max_estimated_execution_time.totalSeconds())
throw Exception(
ErrorCodes::TOO_SLOW,
"Estimated query execution time ({} seconds) is too long. Maximum: {}. Estimated rows to process: {}",
"Estimated query execution time ({:.5f} seconds) is too long. Maximum: {}. Estimated rows to process: {} ({} read in {:.5f} seconds).",
estimated_execution_time_seconds,
max_estimated_execution_time.totalSeconds(),
total_rows_to_read);
total_rows_to_read,
read_rows,
elapsed_seconds);
}
if (max_execution_rps && rows_per_second >= max_execution_rps)
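
The widened message now also reports how many rows have already been read and the elapsed time, printing both durations with five decimal places via the {:.5f} spec. A self-contained sketch of the resulting formatting, using plain fmt::format rather than the Exception machinery (the function name and parameter types here are illustrative):

#include <fmt/format.h>
#include <cstddef>
#include <string>

std::string tooSlowMessage(double estimated_seconds, double max_seconds,
                           std::size_t total_rows, std::size_t read_rows, double elapsed_seconds)
{
    return fmt::format(
        "Estimated query execution time ({:.5f} seconds) is too long. Maximum: {}. "
        "Estimated rows to process: {} ({} read in {:.5f} seconds).",
        estimated_seconds, max_seconds, total_rows, read_rows, elapsed_seconds);
}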

View File

@@ -46,6 +46,8 @@ public:
virtual void finish(bool sync) = 0;
virtual size_t getNumberOfOpenStreams() const = 0;
Columns releaseIndexColumns();
PlainMarksByName releaseCachedMarks();

View File

@@ -39,6 +39,11 @@ public:
return writer->releaseCachedMarks();
}
size_t getNumberOfOpenStreams() const
{
return writer->getNumberOfOpenStreams();
}
protected:
/// Remove all columns marked expired in data_part. Also, clears checksums

View File

@@ -71,6 +71,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsUInt64 parts_to_throw_insert;
extern const MergeTreeSettingsMergeSelectorAlgorithm merge_selector_algorithm;
extern const MergeTreeSettingsBool merge_selector_enable_heuristic_to_remove_small_parts_at_right;
extern const MergeTreeSettingsFloat merge_selector_base;
}
namespace ErrorCodes
@@ -542,6 +543,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
simple_merge_settings.window_size = (*data_settings)[MergeTreeSetting::merge_selector_window_size];
simple_merge_settings.max_parts_to_merge_at_once = (*data_settings)[MergeTreeSetting::max_parts_to_merge_at_once];
simple_merge_settings.enable_heuristic_to_remove_small_parts_at_right = (*data_settings)[MergeTreeSetting::merge_selector_enable_heuristic_to_remove_small_parts_at_right];
simple_merge_settings.base = (*data_settings)[MergeTreeSetting::merge_selector_base];
if (!(*data_settings)[MergeTreeSetting::min_age_to_force_merge_on_partition_only])
simple_merge_settings.min_age_to_force_merge = (*data_settings)[MergeTreeSetting::min_age_to_force_merge_seconds];
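simple_merge_settings.base is filled from the new merge_selector_base setting. Roughly speaking, the simple selector only considers a range of parts worth merging when the merged result would be sufficiently larger than its biggest input, and base is that threshold: raising it means fewer, larger merges, so each byte is rewritten fewer times (lower write amplification). A toy illustration of that intuition, not the actual SimpleMergeSelector scoring:

#include <algorithm>
#include <cstddef>
#include <vector>

/// Illustrative only: accept a merge when the combined size is at least
/// `base` times the largest input part, so every merge makes real progress
/// and data is not rewritten too often (write amplification stays low).
bool worthMerging(const std::vector<std::size_t> & part_sizes, double base)
{
    if (part_sizes.empty())
        return false;

    std::size_t sum = 0;
    std::size_t max_size = 0;
    for (std::size_t s : part_sizes)
    {
        sum += s;
        max_size = std::max(max_size, s);
    }
    return static_cast<double>(sum) >= base * static_cast<double>(max_size);
}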

View File

@@ -32,6 +32,8 @@ public:
void fillChecksums(MergeTreeDataPartChecksums & checksums, NameSet & checksums_to_remove) override;
void finish(bool sync) override;
size_t getNumberOfOpenStreams() const override { return 1; }
private:
/// Finish serialization of the data. Flush rows in buffer to disk, compute checksums.
void fillDataChecksums(MergeTreeDataPartChecksums & checksums);

View File

@@ -43,6 +43,8 @@ public:
void finish(bool sync) final;
size_t getNumberOfOpenStreams() const override { return column_streams.size(); }
private:
/// Finish serialization of data: write final mark if required and compute checksums
/// Also validate written data in debug mode
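
Together with the pure virtual declaration and the delegating wrapper above, these overrides let the sink ask each part writer how many file streams it actually keeps open: the compact writer reports a single stream, since all columns share one data file, while the wide writer reports one stream per column. A toy sketch of the same hierarchy (illustrative names, not the ClickHouse classes):

#include <cstddef>
#include <vector>

/// Toy model: the sink only ever sees the base interface.
struct IPartWriter
{
    virtual ~IPartWriter() = default;
    virtual std::size_t getNumberOfOpenStreams() const = 0;
};

/// Compact part: every column goes into the same data file -> one stream.
struct CompactPartWriter : IPartWriter
{
    std::size_t getNumberOfOpenStreams() const override { return 1; }
};

/// Wide part: a separate stream per column.
struct WidePartWriter : IPartWriter
{
    std::vector<int> column_streams;  /// stand-in for per-column streams
    std::size_t getNumberOfOpenStreams() const override { return column_streams.size(); }
};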

View File

@@ -101,6 +101,7 @@ namespace ErrorCodes
DECLARE(Milliseconds, background_task_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge or mutation. Can be exceeded if one step takes longer time", 0) \
DECLARE(MergeSelectorAlgorithm, merge_selector_algorithm, MergeSelectorAlgorithm::SIMPLE, "The algorithm to select parts for merges assignment", EXPERIMENTAL) \
DECLARE(Bool, merge_selector_enable_heuristic_to_remove_small_parts_at_right, true, "Enable heuristic for selecting parts for merge which removes parts from right side of range, if their size is less than specified ratio (0.01) of sum_size. Works for Simple and StochasticSimple merge selectors", 0) \
DECLARE(Float, merge_selector_base, 5.0, "Affects write amplification of assigned merges (expert level setting, don't change if you don't understand what it is doing). Works for Simple and StochasticSimple merge selectors", 0) \
\
/** Inserts settings. */ \
DECLARE(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
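
The DECLARE entries belong to an X-macro style settings list: each line carries the type, name, default, description and flags, and the whole list is re-expanded in several places to generate the setting fields, their defaults and their documentation. A toy sketch of that pattern (not the actual ClickHouse macros):

/// Toy X-macro settings list: type, name, default value.
#define LIST_OF_TOY_SETTINGS(M) \
    M(double, merge_selector_base, 5.0) \
    M(unsigned long, parts_to_delay_insert, 1000)

/// One expansion declares the fields with their defaults.
#define DECLARE_TOY_FIELD(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;

struct ToyMergeTreeSettings
{
    LIST_OF_TOY_SETTINGS(DECLARE_TOY_FIELD)
};

#undef DECLARE_TOY_FIELD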

View File

@@ -94,7 +94,7 @@ void MergeTreeSink::consume(Chunk & chunk)
DelayedPartitions partitions;
const Settings & settings = context->getSettingsRef();
size_t streams = 0;
size_t total_streams = 0;
bool support_parallel_write = false;
auto token_info = chunk.getChunkInfos().get<DeduplicationToken::TokenInfo>();
@@ -153,16 +153,18 @@ void MergeTreeSink::consume(Chunk & chunk)
max_insert_delayed_streams_for_parallel_write = 0;
/// In case of too much columns/parts in block, flush explicitly.
streams += temp_part.streams.size();
size_t current_streams = 0;
for (const auto & stream : temp_part.streams)
current_streams += stream.stream->getNumberOfOpenStreams();
if (streams > max_insert_delayed_streams_for_parallel_write)
if (total_streams + current_streams > max_insert_delayed_streams_for_parallel_write)
{
finishDelayedChunk();
delayed_chunk = std::make_unique<MergeTreeSink::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);
finishDelayedChunk();
streams = 0;
total_streams = 0;
support_parallel_write = false;
partitions = DelayedPartitions{};
}
@@ -174,6 +176,8 @@ void MergeTreeSink::consume(Chunk & chunk)
.block_dedup_token = block_dedup_token,
.part_counters = std::move(part_counters),
});
total_streams += current_streams;
}
if (need_to_define_dedup_token)
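
Instead of counting temp_part.streams.size(), the sink now sums the real number of open file streams behind each written stream and flushes the accumulated delayed partitions once that running total would exceed max_insert_delayed_streams_for_parallel_write; the replicated sink below gets the same treatment. A minimal sketch of the accumulate-and-flush pattern (toy types, not the ClickHouse sink):

#include <cstddef>
#include <vector>

struct ToyPart
{
    std::vector<std::size_t> open_streams_per_writer;  /// what getNumberOfOpenStreams() would return
};

/// Delay parts until the total number of open streams would exceed the limit,
/// then flush everything accumulated so far and start a new batch.
void consumeParts(const std::vector<ToyPart> & parts, std::size_t max_delayed_streams)
{
    std::vector<const ToyPart *> delayed;
    std::size_t total_streams = 0;

    for (const auto & part : parts)
    {
        std::size_t current_streams = 0;
        for (std::size_t open : part.open_streams_per_writer)
            current_streams += open;

        if (total_streams + current_streams > max_delayed_streams)
        {
            delayed.clear();    /// stands in for finishDelayedChunk()
            total_streams = 0;
        }

        delayed.push_back(&part);
        total_streams += current_streams;
    }

    delayed.clear();            /// final flush
}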

View File

@@ -341,7 +341,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
using DelayedPartitions = std::vector<DelayedPartition>;
DelayedPartitions partitions;
size_t streams = 0;
size_t total_streams = 0;
bool support_parallel_write = false;
for (auto & current_block : part_blocks)
@@ -418,15 +418,18 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
max_insert_delayed_streams_for_parallel_write = 0;
/// In case of too much columns/parts in block, flush explicitly.
streams += temp_part.streams.size();
if (streams > max_insert_delayed_streams_for_parallel_write)
size_t current_streams = 0;
for (const auto & stream : temp_part.streams)
current_streams += stream.stream->getNumberOfOpenStreams();
if (total_streams + current_streams > max_insert_delayed_streams_for_parallel_write)
{
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk>(replicas_num);
delayed_chunk->partitions = std::move(partitions);
finishDelayedChunk(zookeeper);
streams = 0;
total_streams = 0;
support_parallel_write = false;
partitions = DelayedPartitions{};
}
@@ -447,6 +450,8 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
std::move(unmerged_block),
std::move(part_counters) /// profile_events_scope must be reset here.
));
total_streams += current_streams;
}
if (need_to_define_dedup_token)

View File

@@ -0,0 +1 @@
Ok

View File

@@ -0,0 +1,20 @@
-- Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS t_100_columns;
CREATE TABLE t_100_columns (id UInt64, c0 String, c1 String, c2 String, c3 String, c4 String, c5 String, c6 String, c7 String, c8 String, c9 String, c10 String, c11 String, c12 String, c13 String, c14 String, c15 String, c16 String, c17 String, c18 String, c19 String, c20 String, c21 String, c22 String, c23 String, c24 String, c25 String, c26 String, c27 String, c28 String, c29 String, c30 String, c31 String, c32 String, c33 String, c34 String, c35 String, c36 String, c37 String, c38 String, c39 String, c40 String, c41 String, c42 String, c43 String, c44 String, c45 String, c46 String, c47 String, c48 String, c49 String, c50 String)
ENGINE = MergeTree
ORDER BY id PARTITION BY id % 50
SETTINGS min_bytes_for_wide_part = 0, ratio_of_defaults_for_sparse_serialization = 1.0, max_compress_block_size = '1M', storage_policy = 's3_cache';
SET max_insert_delayed_streams_for_parallel_write = 55;
INSERT INTO t_100_columns (id) SELECT number FROM numbers(100);
SYSTEM FLUSH LOGS;
SELECT if (memory_usage < 300000000, 'Ok', format('Fail: memory usage {}', formatReadableSize(memory_usage)))
FROM system.query_log
WHERE current_database = currentDatabase() AND query LIKE 'INSERT INTO t_100_columns%' AND type = 'QueryFinish';
DROP TABLE t_100_columns;