CLICKHOUSE-2153: Move new_data_part shared pointer to allow Outdated part to be deleted immediately on mutation failure

Nicolae Vartolomei 2022-05-03 14:16:03 +00:00
parent 5eeec8d9cb
commit 90666a43a8
4 changed files with 52 additions and 1 deletion

src/Storages/MergeTree/MergeTreeSettings.h

@@ -137,6 +137,9 @@ struct Settings;
     M(UInt64, min_bytes_to_rebalance_partition_over_jbod, 0, "Minimal amount of bytes to enable part rebalance over JBOD array (0 - disabled).", 0) \
     M(Bool, check_sample_column_is_correct, true, "Check columns or columns by hash for sampling are unsigned integer.", 0) \
     \
+    /** Flags that are relevant only in testing. */ \
+    M(Bool, testing_mutate_corrupt_checksums, false, "Testing only! Corrupt part checksums during mutation to test fetch fallback.", 0) \
+    \
     /** Experimental/work in progress feature. Unsafe for production. */ \
     M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
     M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \

src/Storages/MergeTree/MutateFromLogEntryTask.cpp

@@ -13,6 +13,11 @@ namespace ProfileEvents
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int CHECKSUM_DOESNT_MATCH;
+}
+
 ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
 {
     const String & source_part_name = entry.source_parts.at(0);
@@ -202,6 +207,9 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
     try
     {
+        if (unlikely(storage.getSettings()->testing_mutate_corrupt_checksums))
+            throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "Throwing exception because `testing_mutate_corrupt_checksums` is enabled.");
+
         storage.checkPartChecksumsAndCommit(*transaction_ptr, new_part, mutate_task->getHardlinkedFiles());
     }
     catch (const Exception & e)
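
The hook above makes the checksum commit fail with CHECKSUM_DOESNT_MATCH whenever the testing setting is enabled, which pushes the replica onto the same path it takes for a genuinely corrupted mutation result: give up on the locally mutated part and fetch the committed one from another replica (the "fetch fallback" named in the setting description). A minimal, self-contained sketch of that try/fallback shape; all names below are purely illustrative, the real fallback lives elsewhere in the replicated merge/mutate code:

#include <iostream>
#include <stdexcept>
#include <string>

// Illustrative stand-in for the checksum-mismatch exception.
struct ChecksumMismatch : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Hypothetical commit step; with the testing flag set it always throws,
// mimicking testing_mutate_corrupt_checksums.
void commitMutatedPartLocally(const std::string & part, bool corrupt_checksums)
{
    if (corrupt_checksums)
        throw ChecksumMismatch("checksums do not match for part " + part);
    std::cout << "committed " << part << " locally\n";
}

// Hypothetical fallback: download the byte-identical part another replica committed.
void fetchPartFromAnotherReplica(const std::string & part)
{
    std::cout << "downloading " << part << " from another replica\n";
}

int main()
{
    const std::string part = "all_0_0_0_1";
    try
    {
        commitMutatedPartLocally(part, /*corrupt_checksums=*/true);
    }
    catch (const ChecksumMismatch &)
    {
        // This is the branch the integration test below exercises.
        fetchPartFromAnotherReplica(part);
    }
    return 0;
}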

src/Storages/MergeTree/MutateTask.cpp

@@ -1470,7 +1470,7 @@ bool MutateTask::execute()
         if (task->executeStep())
             return true;
 
-        promise.set_value(ctx->new_data_part);
+        promise.set_value(std::move(ctx->new_data_part));
 
         return false;
     }
 }
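
This one-line change in MutateTask::execute is the core of the commit: the result is moved into the promise rather than copied, so the mutation context stops holding a second reference to new_data_part. Per the commit message, that lets the now-Outdated part be destroyed immediately when the mutation fails (for example when the checksum commit above throws), instead of being pinned until the task object itself is torn down. A minimal sketch of the reference-count effect, using a placeholder DataPart type rather than the real class:

#include <cassert>
#include <memory>

// Placeholder for the real data part type.
struct DataPart { /* payload omitted */ };

int main()
{
    auto ctx_part = std::make_shared<DataPart>();   // held by the mutation context
    std::shared_ptr<DataPart> promised;             // stands in for promise.set_value()

    // Copying leaves two owners: even after the consumer drops its copy,
    // the context still pins the (possibly Outdated) part.
    promised = ctx_part;
    assert(ctx_part.use_count() == 2);

    // Moving transfers ownership: the context's pointer becomes empty, so the
    // part can be freed as soon as the single remaining owner releases it.
    promised.reset();
    promised = std::move(ctx_part);
    assert(!ctx_part);
    assert(promised.use_count() == 1);

    return 0;
}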

tests/integration/test_mutation_fetch_fallback/test.py (new file)

@@ -0,0 +1,40 @@
import pytest

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True)


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()

        for i, node in enumerate([node1, node2]):
            # node2 will corrupt checksums, so it must be down while node1 commits
            # the mutation with correct checksums to ZooKeeper.
            node.query_with_retry(
                """CREATE TABLE fetch_fallback (k int, v int, z int)
                ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t0', '{}')
                ORDER BY tuple()
                SETTINGS testing_mutate_corrupt_checksums = {}""".format(i, i == 1)
            )

        yield cluster
    finally:
        cluster.shutdown()


def test_mutation_fetch_fallback(start_cluster):
    node1.query("INSERT INTO fetch_fallback(k, v) VALUES (1, 3), (2, 7), (3, 4)")

    # Keep node2 down while node1 runs the mutation, so only node1 commits
    # correct checksums to ZooKeeper.
    node2.stop_clickhouse()

    node1.query(
        "ALTER TABLE fetch_fallback DROP COLUMN z", settings={"mutations_sync": 1}
    )

    node2.start_clickhouse()

    node1.query("SYSTEM SYNC REPLICA fetch_fallback", timeout=10)
    node2.query("SYSTEM SYNC REPLICA fetch_fallback", timeout=10)

    assert node2.contains_in_log(
        "We will download merged part from replica to force byte-identical result."
    )