Merge pull request #34780 from azat/mt-delayed-part-flush
Do not delay final part writing by default (fixes possible Memory limit exceeded during INSERT)
Commit: ee9c2ec735
@@ -3290,6 +3290,19 @@ Possible values:

Default value: `16`.

## max_insert_delayed_streams_for_parallel_write {#max-insert-delayed-streams-for-parallel-write}

The maximum number of streams (columns) for which to delay the final part flush.

It makes a difference only if the underlying storage supports parallel write (for example, S3); otherwise it will not give any benefit.

Possible values:

- Positive integer.
- 0 or 1 — Disabled.

Default value: `1000` for S3 and `0` otherwise.

## opentelemetry_start_trace_probability {#opentelemetry-start-trace-probability}

Sets the probability that ClickHouse can start a trace for executed queries (if no parent [trace context](https://www.w3.org/TR/trace-context/) is supplied).
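As an aside (not part of the diff), a minimal usage sketch of the setting documented above; the table and column names are hypothetical, and the per-query override only matters on storage that supports parallel write, such as S3-backed disks:

```sql
-- Hypothetical wide, heavily partitioned table: raising the limit keeps more
-- write streams open before the final part flush (more memory, better S3 write
-- parallelism); lowering it flushes parts sooner.
INSERT INTO wide_table
SELECT number % 128 AS key, number AS value
FROM numbers(1000000)
SETTINGS max_insert_delayed_streams_for_parallel_write = 1000;
```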
@@ -44,6 +44,7 @@ class IColumn;
    M(UInt64, min_insert_block_size_bytes_for_materialized_views, 0, "Like min_insert_block_size_bytes, but applied only during pushing to MATERIALIZED VIEW (default: min_insert_block_size_bytes)", 0) \
    M(UInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
    M(UInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \
    M(UInt64, max_insert_delayed_streams_for_parallel_write, 0, "The maximum number of streams (columns) to delay final part flush. Default - auto (1000 if the underlying storage supports parallel write, for example S3; disabled otherwise)", 0) \
    M(UInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \
    M(MaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
    M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
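Not part of the diff: the declared default and description registered by the macro above can be inspected at runtime. The effective per-query value on parallel-write storage is resolved later, in the sink code shown below.

```sql
-- Show the built-in default (0) and the setting's description.
SELECT name, value, changed, description
FROM system.settings
WHERE name = 'max_insert_delayed_streams_for_parallel_write';
```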
@@ -248,6 +248,10 @@ public:
    /// Overridden in remote fs disks.
    virtual bool supportZeroCopyReplication() const = 0;

    /// Whether this disk supports parallel write.
    /// Overridden in remote fs disks.
    virtual bool supportParallelWrite() const { return false; }

    virtual bool isReadOnly() const { return false; }

    /// Check if disk is broken. Broken disks will have 0 space and not be used.
@@ -4,7 +4,6 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
@@ -105,6 +105,8 @@ public:

    bool supportZeroCopyReplication() const override { return true; }

    bool supportParallelWrite() const override { return true; }

    void shutdown() override;

    void startup() override;
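A side note rather than part of the diff: which disks are S3-backed, and therefore report parallel-write support and get the `1000` auto default, can be checked from SQL (this assumes the `type` column is available in the server version at hand):

```sql
-- Disks of type 's3' are the ones for which supportParallelWrite() returns true,
-- so INSERTs into tables stored on them delay the final part flush by default.
SELECT name, path, type
FROM system.disks;
```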
@@ -39,6 +39,9 @@
namespace DB
{

/// The number of streams is not the number of parts, but the number of parts * files, hence 1000.
const size_t DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE = 1000;

class AlterCommands;
class MergeTreePartsMover;
class MergeTreeDataMergerMutator;
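To make the 1000 default concrete, a purely hypothetical back-of-the-envelope check (assuming wide parts, where each column contributes roughly one data file plus one marks file):

```sql
-- 50 delayed parts from one partitioned INSERT into a 100-column table keep
-- about 50 * 100 * 2 streams open, an order of magnitude above the default.
SELECT
    50 AS parts_per_insert,
    100 * 2 AS files_per_part,
    50 * 100 * 2 AS approx_delayed_streams,
    (50 * 100 * 2) > 1000 AS exceeds_default;
```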
@@ -52,7 +52,14 @@ void MergeTreeSink::consume(Chunk chunk)
    auto block = getHeader().cloneWithColumns(chunk.detachColumns());

    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
    std::vector<MergeTreeSink::DelayedChunk::Partition> partitions;

    using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;
    DelayedPartitions partitions;

    const Settings & settings = context->getSettingsRef();
    size_t streams = 0;
    bool support_parallel_write = false;

    for (auto & current_block : part_blocks)
    {
        Stopwatch watch;
@@ -67,9 +74,12 @@ void MergeTreeSink::consume(Chunk chunk)
        if (!temp_part.part)
            continue;

        if (!support_parallel_write && temp_part.part->volume->getDisk()->supportParallelWrite())
            support_parallel_write = true;

        if (storage.getDeduplicationLog())
        {
            const String & dedup_token = context->getSettingsRef().insert_deduplication_token;
            const String & dedup_token = settings.insert_deduplication_token;
            if (!dedup_token.empty())
            {
                /// multiple blocks can be inserted within the same insert query
@@ -79,6 +89,24 @@ void MergeTreeSink::consume(Chunk chunk)
            }
        }

        size_t max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE;
        if (!support_parallel_write || settings.max_insert_delayed_streams_for_parallel_write.changed)
            max_insert_delayed_streams_for_parallel_write = settings.max_insert_delayed_streams_for_parallel_write;

        /// In case of too many columns/parts in the block, flush explicitly.
        streams += temp_part.streams.size();
        if (streams > max_insert_delayed_streams_for_parallel_write)
        {
            finishDelayedChunk();
            delayed_chunk = std::make_unique<MergeTreeSink::DelayedChunk>();
            delayed_chunk->partitions = std::move(partitions);
            finishDelayedChunk();

            streams = 0;
            support_parallel_write = false;
            partitions = DelayedPartitions{};
        }

        partitions.emplace_back(MergeTreeSink::DelayedChunk::Partition
        {
            .temp_part = std::move(temp_part),
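The threshold check above can also be driven from SQL; a hedged sketch with a hypothetical table: explicitly setting the limit to 0 marks the setting as changed, so the delay is skipped even on parallel-write storage and each part is flushed as soon as it is written.

```sql
-- Flush every part immediately, trading S3 write parallelism for lower memory
-- usage during a wide, heavily partitioned INSERT.
INSERT INTO wide_table
SELECT number % 128 AS key, number AS value
FROM numbers(1000000)
SETTINGS max_insert_delayed_streams_for_parallel_write = 0;
```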
@@ -150,9 +150,14 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
    if (quorum)
        checkQuorumPrecondition(zookeeper);

    const Settings & settings = context->getSettingsRef();
    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
    std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition> partitions;
    String block_dedup_token;

    using DelayedPartitions = std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition>;
    DelayedPartitions partitions;

    size_t streams = 0;
    bool support_parallel_write = false;

    for (auto & current_block : part_blocks)
    {
@@ -171,10 +176,12 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)

        if (deduplicate)
        {
            String block_dedup_token;

            /// We add the hash from the data and partition identifier to deduplication ID.
            /// That is, do not insert the same data to the same partition twice.

            const String & dedup_token = context->getSettingsRef().insert_deduplication_token;
            const String & dedup_token = settings.insert_deduplication_token;
            if (!dedup_token.empty())
            {
                /// multiple blocks can be inserted within the same insert query
@@ -182,6 +189,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
                block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
                ++chunk_dedup_seqnum;
            }

            block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
            LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
        }
@@ -192,6 +200,24 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)

        UInt64 elapsed_ns = watch.elapsed();

        size_t max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE;
        if (!support_parallel_write || settings.max_insert_delayed_streams_for_parallel_write.changed)
            max_insert_delayed_streams_for_parallel_write = settings.max_insert_delayed_streams_for_parallel_write;

        /// In case of too many columns/parts in the block, flush explicitly.
        streams += temp_part.streams.size();
        if (streams > max_insert_delayed_streams_for_parallel_write)
        {
            finishDelayedChunk(zookeeper);
            delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>();
            delayed_chunk->partitions = std::move(partitions);
            finishDelayedChunk(zookeeper);

            streams = 0;
            support_parallel_write = false;
            partitions = DelayedPartitions{};
        }

        partitions.emplace_back(ReplicatedMergeTreeSink::DelayedChunk::Partition{
            .temp_part = std::move(temp_part),
            .elapsed_ns = elapsed_ns,
@@ -207,7 +233,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
    /// value for `last_block_is_duplicate`, which is possible only after the part is committed.
    /// Otherwise we can delay commit.
    /// TODO: we can also delay commit if there are no MVs.
    if (!context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
    if (!settings.deduplicate_blocks_in_dependent_materialized_views)
        finishDelayedChunk(zookeeper);
}
@@ -1,4 +1,7 @@
-- Tags: long
-- Tags: long, no-tsan, no-parallel, no-random-settings
-- Tag: no-tsan -- too slow under TSan (~5-6min)
-- Tag: no-random-settings -- to avoid settings overlaps
-- Tag: no-parallel -- to reduce test time
--
-- Test for testing various read settings.
@@ -0,0 +1,15 @@
-- Tags: long, no-parallel

-- Regression test for the MEMORY_LIMIT_EXCEEDED error caused by the deferred final part flush

drop table if exists data_02228;
create table data_02228 (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024;
insert into data_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024;
insert into data_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_02228;

drop table if exists data_rep_02228;
create table data_rep_02228 (key1 UInt32, sign Int8, s UInt64) engine = ReplicatedCollapsingMergeTree('/clickhouse/{database}', 'r1', sign) order by (key1) partition by key1 % 1024;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_rep_02228;
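Not part of the commit: one way to eyeball the regression the test above guards against is to compare the peak memory of the two INSERT variants in the query log (assuming query_log is enabled):

```sql
-- Peak memory per INSERT into data_02228; the variant that raises
-- max_insert_delayed_streams_for_parallel_write should report far more.
SYSTEM FLUSH LOGS;
SELECT formatReadableSize(memory_usage) AS peak_memory, query
FROM system.query_log
WHERE type = 'QueryFinish' AND query LIKE 'insert into data_02228%'
ORDER BY event_time DESC;
```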