Better test

This commit is contained in:
alesapin 2023-10-16 13:04:45 +02:00
parent bdafe17e9a
commit 31dc46a889
7 changed files with 82 additions and 119 deletions

View File

@ -20,6 +20,7 @@
#include <Common/ThreadFuzzer.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/Config/ConfigHelper.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
@ -94,6 +95,7 @@
#include <iomanip>
#include <limits>
#include <optional>
#include <ranges>
#include <set>
#include <thread>
#include <typeinfo>
@ -3943,10 +3945,26 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
/// ActiveDataPartSet allows to restore most top-level parts instead of unexpected.
/// It can be important in case of assigned merges. If unexpected part is result of some
/// finished, but not committed merge we should restore closest ancestors for the
/// unexpected part to be able to execute it.
/// finished, but not committed merge then we should restore (at least try to restore)
/// closest ancestors for the unexpected part to be able to execute it.
/// However it's not guaranteed because outdated parts can intersect
ActiveDataPartSet parts_for_replacement(format_version);
for (const auto & part_candidate_in_partition : getDataPartsPartitionRange(part->info.partition_id))
auto range = getDataPartsPartitionRange(part->info.partition_id);
DataPartsVector parts_candidates(range.begin(), range.end());
/// In case of intersecting outdated parts we want to add bigger parts (with higher level) first
auto comparator = [] (const DataPartPtr left, const DataPartPtr right) -> bool
{
if (left->info.level < right->info.level)
return true;
else if (left->info.level > right->info.level)
return false;
else
return left->info.mutation < right->info.mutation;
};
std::sort(parts_candidates.begin(), parts_candidates.end(), comparator);
/// From larger to smaller parts
for (const auto & part_candidate_in_partition : parts_candidates | std::views::reverse)
{
if (part->info.contains(part_candidate_in_partition->info)
&& is_appropriate_state(part_candidate_in_partition->getState()))
@ -3966,6 +3984,7 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
if (parts_for_replacement.size() > 0)
{
std::vector<std::pair<uint64_t, uint64_t>> holes_list;
/// Most part of the code bellow is just to write pretty message
auto part_infos = parts_for_replacement.getPartInfos();
int64_t current_right_block = part_infos[0].min_block;
for (const auto & top_level_part_to_replace : part_infos)

View File

@ -1,21 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -1,5 +0,0 @@
<clickhouse>
<zookeeper>
<session_timeout_ms>3000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -1,90 +0,0 @@
import logging
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/keeper_config.xml", "configs/disks.xml"],
stay_alive=True,
with_zookeeper=True,
with_minio=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_merge_session_expired(started_cluster):
node1.query("drop table if exists tab")
node1.query(
"create table tab (x UInt64) engine = ReplicatedMergeTree('/clickhouse/tables/tab/', '0') order by tuple() settings old_parts_lifetime=3"
)
node1.query("insert into tab select number from numbers(10)")
node1.query("insert into tab select number + 10 from numbers(10)")
node1.query("alter table tab delete where x = 12 settings mutations_sync=2")
node1.query("alter table tab delete where x = 14 settings mutations_sync=2")
node1.query("alter table tab delete where x = 16 settings mutations_sync=2")
node1.query("system stop merges")
node1.query("optimize table tab final settings alter_sync=0")
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node1)
node1.query("system start merges")
node1.query("select sleep(1)")
node1.restart_clickhouse()
pm.restore_instance_zk_connections(node1)
node1.query("system restart replica tab")
assert node1.query("select count() from tab") == "17\n"
def test_merge_session_expired_zero_copy(started_cluster):
node1.query("drop table if exists tab")
node1.query(
"""
create table tab (x UInt64, y UInt64) engine = ReplicatedMergeTree('/clickhouse/tables/tab2/', '0') order by tuple()
settings old_parts_lifetime=1, storage_policy='s3', allow_remote_fs_zero_copy_replication=1, replicated_max_ratio_of_wrong_parts=1, min_bytes_for_wide_part=1
"""
)
node1.query("insert into tab select number, number from numbers(10)")
node1.query("insert into tab select number + 10, number + 10 from numbers(10)")
node1.query("alter table tab update x = x + 1 where 1 settings mutations_sync=2")
node1.query("select * from tab")
node1.query(
"alter table tab update x = x + 1, y = y + 1 where 1 settings mutations_sync=2"
)
node1.query("select * from tab")
node1.query("alter table tab update x = x + 1 where 1 settings mutations_sync=2")
node1.query("select * from tab")
node1.query(
"alter table tab add column z UInt64 materialized x + sleepEachRow(0.05) settings mutations_sync=2"
)
node1.query("optimize table tab final settings alter_sync=0")
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node1)
# Wait some time for merge to start
# Part should be merged and stayed on disk, but not commited into zk
node1.query("select sleep(2)")
node1.restart_clickhouse()
pm.restore_instance_zk_connections(node1)
node1.query("system restart replica tab")
# Wait for outdated parts to be removed
node1.query("select sleep(3)")
node1.query("select * from tab")
node1.query("system sync replica tab")
assert node1.query("select count() from tab") == "20\n"

View File

@ -0,0 +1,6 @@
all_0_1_11_5
all_4_4_0_5
all_0_1_11_5
all_4_4_0_5
all_0_1_11_5
all_4_4_0_5

View File

@ -0,0 +1,54 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS table_with_unsuccessful_commits"
$CLICKHOUSE_CLIENT --query "CREATE TABLE table_with_unsuccessful_commits (key UInt64, value String) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/unsuccessful', '1') ORDER BY tuple() SETTINGS cleanup_delay_period=1000, max_cleanup_delay_period=1000, old_parts_lifetime = 1697460529, remove_rolled_back_parts_immediately=0, replicated_max_ratio_of_wrong_parts=1, max_suspicious_broken_parts=1000000, max_suspicious_broken_parts_bytes=10000000000"
$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)"
$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)"
for i in {0..10}; do
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE table_with_unsuccessful_commits FINAL"
done
$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2"
$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2"
$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)"
$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2"
$CLICKHOUSE_CLIENT --query "SELECT name FROM system.parts where table = 'table_with_unsuccessful_commits' and database = currentDatabase() and active order by name"
$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits MODIFY SETTING fault_probability_before_part_commit=1"
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE table_with_unsuccessful_commits FINAL SETTINGS alter_sync=0"
i=0 retries=300
while [[ $i -lt $retries ]]; do
result=$($CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.replication_queue WHERE table = 'table_with_unsuccessful_commits' and database=currentDatabase()")
if [[ $result ]]; then
break
fi
((++i))
done
$CLICKHOUSE_CLIENT --query "SELECT name FROM system.parts where table = 'table_with_unsuccessful_commits' and database = currentDatabase() and active order by name"
$CLICKHOUSE_CLIENT --query "DETACH TABLE table_with_unsuccessful_commits"
$CLICKHOUSE_CLIENT --query "ATTACH TABLE table_with_unsuccessful_commits"
$CLICKHOUSE_CLIENT --query "SELECT name FROM system.parts where table = 'table_with_unsuccessful_commits' and database = currentDatabase() and active order by name"
$CLICKHOUSE_CLIENT --query "DROP TABLE table_with_unsuccessful_commits"