Merge pull request #44472 from CurtizJ/fix-mutations

Fix mutations with setting `max_streams_for_merge_tree_reading`
Author: Nikolai Kochetov, 2022-12-22 14:05:20 +01:00 (committed by GitHub)
commit 773de7a8a6
4 changed files with 44 additions and 0 deletions

@@ -220,8 +220,13 @@ bool isStorageTouchedByMutations(
    if (all_commands_can_be_skipped)
        return false;

    /// We must read with one thread because it guarantees that
    /// output stream will be sorted after reading from MergeTree parts.
    /// Disable all settings that can enable reading with several streams.
    context_copy->setSetting("max_streams_to_max_threads_ratio", 1);
    context_copy->setSetting("max_threads", 1);
    context_copy->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
    context_copy->setSetting("max_streams_for_merge_tree_reading", Field(0));

    ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);
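Note: the four settings pinned above are ordinary per-query settings; the fix simply forces them on the copied context used for the affected-rows query. Below is a rough client-side sketch of the same override, assuming a local server, the third-party clickhouse_driver package, and the t_mutations table from the test further down (none of which are part of this PR):

# Sketch only: apply the same single-stream settings to an ordinary client query.
# Assumes a local ClickHouse server and the clickhouse_driver package.
from clickhouse_driver import Client

client = Client("localhost")

single_stream_settings = {
    "max_streams_to_max_threads_ratio": 1,
    "max_threads": 1,
    "allow_asynchronous_read_from_io_pool_for_merge_tree": 0,
    "max_streams_for_merge_tree_reading": 0,
}

# With these overrides the server reads MergeTree parts with a single stream,
# which is the guarantee the mutation code relies on for sorted output.
rows = client.execute(
    "SELECT count() FROM t_mutations WHERE a = 300000",
    settings=single_stream_settings,
)
print(rows)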

@@ -1514,8 +1514,14 @@ bool MutateTask::prepare()
    ctx->num_mutations = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::PartMutation);

    auto context_for_reading = Context::createCopy(ctx->context);

    /// We must read with one thread because it guarantees that output stream will be sorted.
    /// Disable all settings that can enable reading with several streams.
    context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1);
    context_for_reading->setSetting("max_threads", 1);
    context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
    context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));

    /// Allow mutations to work when force_index_by_date or force_primary_key is on.
    context_for_reading->setSetting("force_index_by_date", false);
    context_for_reading->setSetting("force_primary_key", false);
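Besides the single-stream settings, the reading context also clears force_index_by_date and force_primary_key, because a mutation reads whole parts without a key condition and the same kind of read issued as a plain query is rejected when those settings are on. A hedged illustration of that rejection, again assuming clickhouse_driver, a local server, and the t_mutations table from the test below:

# Sketch only: a full scan is rejected under force_primary_key, which is why
# MutateTask disables it on its private reading context.
# Assumes clickhouse_driver and a local server with the t_mutations table.
from clickhouse_driver import Client
from clickhouse_driver.errors import ServerException

client = Client("localhost")

try:
    client.execute(
        "SELECT count() FROM t_mutations",
        settings={"force_primary_key": 1},
    )
except ServerException as e:
    print("rejected as expected, error code:", e.code)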

@@ -0,0 +1,8 @@
<clickhouse>
    <profiles>
        <default>
            <allow_asynchronous_read_from_io_pool_for_merge_tree>1</allow_asynchronous_read_from_io_pool_for_merge_tree>
            <max_streams_for_merge_tree_reading>64</max_streams_for_merge_tree_reading>
        </default>
    </profiles>
</clickhouse>
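This test profile deliberately turns on the settings that used to break mutations, so the node3 instance below exercises the fixed code path. A small sketch for checking that such a profile is in effect, assuming clickhouse_driver and a server started with this config:

# Sketch only: confirm the profile values are visible to a session.
# Assumes clickhouse_driver and a server using the profile above.
from clickhouse_driver import Client

client = Client("localhost")

rows = client.execute(
    "SELECT name, value FROM system.settings "
    "WHERE name IN ('allow_asynchronous_read_from_io_pool_for_merge_tree', "
    "'max_streams_for_merge_tree_reading')"
)
for name, value in rows:
    print(name, "=", value)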

@@ -19,6 +19,14 @@ node2 = cluster.add_instance(
    stay_alive=True,
)

node3 = cluster.add_instance(
    "node3",
    main_configs=["configs/logs_config.xml", "configs/cluster.xml"],
    user_configs=["configs/max_threads.xml"],
    with_zookeeper=True,
    stay_alive=True,
)


@pytest.fixture(scope="module")
def started_cluster():
@@ -180,3 +188,20 @@ def test_trivial_alter_in_partition_replicated_merge_tree(started_cluster):
    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name))
        node2.query("DROP TABLE IF EXISTS {}".format(name))


def test_mutation_max_streams(started_cluster):
    try:
        node3.query("DROP TABLE IF EXISTS t_mutations")

        node3.query("CREATE TABLE t_mutations (a UInt32) ENGINE = MergeTree ORDER BY a")
        node3.query("INSERT INTO t_mutations SELECT number FROM numbers(10000000)")

        node3.query(
            "ALTER TABLE t_mutations DELETE WHERE a = 300000",
            settings={"mutations_sync": "2"},
        )

        assert node3.query("SELECT count() FROM t_mutations") == "9999999\n"
    finally:
        node3.query("DROP TABLE IF EXISTS t_mutations")
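For manual verification outside the integration-test framework, the same regression check can be reproduced with a bare client. This sketch assumes clickhouse_driver, a local server, and a profile that enables the two settings from the XML config above:

# Sketch only: standalone version of test_mutation_max_streams.
# Assumes clickhouse_driver and a server whose profile enables
# allow_asynchronous_read_from_io_pool_for_merge_tree and
# max_streams_for_merge_tree_reading.
from clickhouse_driver import Client

client = Client("localhost")

client.execute("DROP TABLE IF EXISTS t_mutations")
client.execute("CREATE TABLE t_mutations (a UInt32) ENGINE = MergeTree ORDER BY a")
client.execute("INSERT INTO t_mutations SELECT number FROM numbers(10000000)")

# Before this fix, the mutation could produce wrong results when the profile
# enabled multi-stream reading; with the fix exactly one row is deleted.
client.execute(
    "ALTER TABLE t_mutations DELETE WHERE a = 300000",
    settings={"mutations_sync": 2},
)

count = client.execute("SELECT count() FROM t_mutations")[0][0]
assert count == 9999999, count

client.execute("DROP TABLE IF EXISTS t_mutations")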