mv fixReplicaMetadataVersionIfNeeded from attach thread to restarting thread

This commit is contained in:
Michael Stetsyuk 2024-10-03 08:30:52 +00:00 committed by Michael Stetsyuk
parent 3e6b62f420
commit 01cb0eb32f
10 changed files with 206 additions and 101 deletions

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ZooKeeper/IKeeper.h>
@ -20,7 +21,6 @@ namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int REPLICA_STATUS_CHANGED;
extern const int LOGICAL_ERROR;
}
ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
@ -123,67 +123,6 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z
}
}
Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper)
{
const String & zookeeper_path = storage.zookeeper_path;
const String & replica_path = storage.replica_path;
const bool replica_readonly = storage.is_readonly;
for (size_t i = 0; i != 2; ++i)
{
String replica_metadata_version_str;
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str);
if (!replica_metadata_version_exists)
return -1;
const Int32 metadata_version = parse<Int32>(replica_metadata_version_str);
if (metadata_version != 0 || replica_readonly)
{
/// No need to fix anything
return metadata_version;
}
Coordination::Stat stat;
zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
if (stat.version == 0)
{
/// No need to fix anything
return metadata_version;
}
ReplicatedMergeTreeQueue & queue = storage.queue;
queue.pullLogsToQueue(zookeeper);
if (queue.getStatus().metadata_alters_in_queue != 0)
{
LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue");
return metadata_version;
}
const Coordination::Requests ops = {
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0),
zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version),
};
Coordination::Responses ops_responses;
const auto code = zookeeper->tryMulti(ops, ops_responses);
if (code == Coordination::Error::ZOK)
{
LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version);
return stat.version;
}
if (code != Coordination::Error::ZBADVERSION)
{
throw zkutil::KeeperException(code);
}
}
/// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt.
/// If metadata_version != 0, on second attempt we will return the new metadata_version.
/// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0.
/// Either way, on second attempt this method should return.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts");
}
void ReplicatedMergeTreeAttachThread::runImpl()
{
storage.setZooKeeper();
@ -227,33 +166,6 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// Just in case it was not removed earlier due to connection loss
zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper);
const bool replica_metadata_version_exists = replica_metadata_version != -1;
if (replica_metadata_version_exists)
{
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version));
}
else
{
/// Table was created before 20.4 and was never altered,
/// let's initialize replica metadata version from global metadata version.
Coordination::Stat table_metadata_version_stat;
zookeeper->get(zookeeper_path + "/metadata", &table_metadata_version_stat);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/metadata", table_metadata_version_stat.version));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(table_metadata_version_stat.version), zkutil::CreateMode::Persistent));
Coordination::Responses res;
auto code = zookeeper->tryMulti(ops, res);
if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::REPLICA_STATUS_CHANGED, "Failed to initialize metadata_version "
"because table was concurrently altered, will retry");
zkutil::KeeperMultiException::check(code, ops, res);
}
storage.checkTableStructure(replica_path, metadata_snapshot);
storage.checkParts(skip_sanity_checks);

View File

@ -48,8 +48,6 @@ private:
void runImpl();
void finalizeInitialization();
Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper);
};
}

View File

@ -615,7 +615,7 @@ std::pair<int32_t, int32_t> ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::Zo
{
std::lock_guard lock(pull_logs_to_queue_mutex);
if (reason != LOAD)
if (reason != LOAD && reason != FIX_METADATA_VERSION)
{
/// It's totally ok to load queue on readonly replica (that's what RestartingThread does on initialization).
/// It's ok if replica became readonly due to connection loss after we got current zookeeper (in this case zookeeper must be expired).

View File

@ -334,6 +334,7 @@ public:
UPDATE,
MERGE_PREDICATE,
SYNC,
FIX_METADATA_VERSION,
OTHER,
};

View File

@ -29,6 +29,8 @@ namespace MergeTreeSetting
namespace ErrorCodes
{
extern const int REPLICA_IS_ALREADY_ACTIVE;
extern const int REPLICA_STATUS_CHANGED;
extern const int LOGICAL_ERROR;
}
namespace FailPoints
@ -207,6 +209,36 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
throw;
}
const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper);
const bool replica_metadata_version_exists = replica_metadata_version != -1;
if (replica_metadata_version_exists)
{
storage.setInMemoryMetadata(storage.getInMemoryMetadataPtr()->withMetadataVersion(replica_metadata_version));
}
else
{
/// Table was created before 20.4 and was never altered,
/// let's initialize replica metadata version from global metadata version.
const String & zookeeper_path = storage.zookeeper_path, & replica_path = storage.replica_path;
Coordination::Stat table_metadata_version_stat;
zookeeper->get(zookeeper_path + "/metadata", &table_metadata_version_stat);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/metadata", table_metadata_version_stat.version));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(table_metadata_version_stat.version), zkutil::CreateMode::Persistent));
Coordination::Responses res;
auto code = zookeeper->tryMulti(ops, res);
if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::REPLICA_STATUS_CHANGED, "Failed to initialize metadata_version "
"because table was concurrently altered, will retry");
zkutil::KeeperMultiException::check(code, ops, res);
}
storage.queue.removeCurrentPartsFromMutations();
storage.last_queue_update_finish_time.store(time(nullptr));
@ -424,4 +456,64 @@ void ReplicatedMergeTreeRestartingThread::setNotReadonly()
storage.readonly_start_time.store(0, std::memory_order_relaxed);
}
Int32 ReplicatedMergeTreeRestartingThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper)
{
const String & zookeeper_path = storage.zookeeper_path;
const String & replica_path = storage.replica_path;
const size_t num_attempts = 2;
for (size_t attempt = 0; attempt != num_attempts; ++attempt)
{
String replica_metadata_version_str;
Coordination::Stat replica_stat;
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str, &replica_stat);
if (!replica_metadata_version_exists)
return -1;
const Int32 metadata_version = parse<Int32>(replica_metadata_version_str);
if (metadata_version != 0)
return metadata_version;
Coordination::Stat table_stat;
zookeeper->get(fs::path(zookeeper_path) / "metadata", &table_stat);
if (table_stat.version == 0)
return metadata_version;
ReplicatedMergeTreeQueue & queue = storage.queue;
queue.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::FIX_METADATA_VERSION);
if (queue.getStatus().metadata_alters_in_queue != 0)
{
LOG_INFO(log, "Skipping updating metadata_version as there are ALTER_METADATA entries in the queue");
return metadata_version;
}
const Coordination::Requests ops = {
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(table_stat.version), replica_stat.version),
zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", table_stat.version),
};
Coordination::Responses ops_responses;
const Coordination::Error code = zookeeper->tryMulti(ops, ops_responses);
if (code == Coordination::Error::ZOK)
{
LOG_DEBUG(log, "Successfully set metadata_version to {}", table_stat.version);
return table_stat.version;
}
if (code == Coordination::Error::ZBADVERSION)
{
LOG_WARNING(log, "Cannot fix metadata_version because either metadata.version or metadata_version.version changed, attempts left = {}", num_attempts - attempt - 1);
continue;
}
throw zkutil::KeeperException(code);
}
/// Second attempt is only possible if either metadata_version.version or metadata.version changed during the first attempt.
/// If metadata_version changed to non-zero value during the first attempt, on second attempt we will return the new metadata_version.
/// If metadata.version changed during first attempt, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0.
/// So either first or second attempt should return unless metadata_version was rewritten from 0 to 0 during the first attempt which is highly unlikely.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts");
}
}

View File

@ -6,6 +6,7 @@
#include <thread>
#include <atomic>
#include <Common/logger_useful.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
@ -68,6 +69,9 @@ private:
/// Disable readonly mode for table
void setNotReadonly();
/// Fix replica metadata_version if needed
Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper);
};

View File

@ -83,6 +83,8 @@ CLICKHOUSE_ERROR_LOG_FILE = "/var/log/clickhouse-server/clickhouse-server.err.lo
# This means that this minimum need to be, at least, 1 year older than the current release
CLICKHOUSE_CI_MIN_TESTED_VERSION = "23.3"
ZOOKEEPER_CONTAINERS = ("zoo1", "zoo2", "zoo3")
# to create docker-compose env file
def _create_env_file(path, variables):
@ -2061,6 +2063,11 @@ class ClickHouseCluster:
container_id = self.get_container_id(instance_name)
return self.docker_client.api.logs(container_id).decode()
def query_zookeeper(self, query, node=ZOOKEEPER_CONTAINERS[0], nothrow=False):
cmd = f'clickhouse keeper-client -p {self.zookeeper_port} -q "{query}"'
container_id = self.get_container_id(node)
return self.exec_in_container(container_id, cmd, nothrow=nothrow, use_cli=False)
def exec_in_container(
self,
container_id: str,
@ -2391,16 +2398,16 @@ class ClickHouseCluster:
def wait_zookeeper_secure_to_start(self, timeout=20):
logging.debug("Wait ZooKeeper Secure to start")
nodes = ["zoo1", "zoo2", "zoo3"]
self.wait_zookeeper_nodes_to_start(nodes, timeout)
self.wait_zookeeper_nodes_to_start(ZOOKEEPER_CONTAINERS, timeout)
def wait_zookeeper_to_start(self, timeout: float = 180) -> None:
logging.debug("Wait ZooKeeper to start")
nodes = ["zoo1", "zoo2", "zoo3"]
self.wait_zookeeper_nodes_to_start(nodes, timeout)
self.wait_zookeeper_nodes_to_start(ZOOKEEPER_CONTAINERS, timeout)
def wait_zookeeper_nodes_to_start(
self, nodes: List[str], timeout: float = 60
self,
nodes: List[str],
timeout: float = 60,
) -> None:
start = time.time()
err = Exception("")
@ -3226,7 +3233,11 @@ class ClickHouseCluster:
return zk
def run_kazoo_commands_with_retries(
self, kazoo_callback, zoo_instance_name="zoo1", repeats=1, sleep_for=1
self,
kazoo_callback,
zoo_instance_name=ZOOKEEPER_CONTAINERS[0],
repeats=1,
sleep_for=1,
):
zk = self.get_kazoo_client(zoo_instance_name)
logging.debug(
@ -4648,9 +4659,7 @@ class ClickHouseInstance:
depends_on.append("nats1")
if self.with_zookeeper:
depends_on.append("zoo1")
depends_on.append("zoo2")
depends_on.append("zoo3")
depends_on += list(ZOOKEEPER_CONTAINERS)
if self.with_minio:
depends_on.append("minio1")

View File

@ -0,0 +1,16 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<profile>default</profile>
<no_password></no_password>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,73 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/config.xml"],
stay_alive=True,
with_zookeeper=True,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_fix_metadata_version(start_cluster):
zookeeper_path = "/clickhouse/test_fix_metadata_version"
replica = "replica1"
replica_path = f"{zookeeper_path}/replicas/{replica}"
def get_metadata_versions():
table_metadata_version = int(
node.query(
f"""
SELECT version
FROM system.zookeeper
WHERE path = '{zookeeper_path}' AND name = 'metadata'
"""
).strip()
)
replica_metadata_version = int(
node.query(
f"""
SELECT value
FROM system.zookeeper
WHERE path = '{replica_path}' AND name = 'metadata_version'
"""
).strip()
)
return table_metadata_version, replica_metadata_version
node.query(
f"""
DROP TABLE IF EXISTS t SYNC;
CREATE TABLE t
(
`x` UInt32
)
ENGINE = ReplicatedMergeTree('{zookeeper_path}', '{replica}')
ORDER BY x
"""
)
node.query("ALTER TABLE t (ADD COLUMN `y` UInt32)")
assert get_metadata_versions() == (1, 1)
cluster.query_zookeeper(f"set '{replica_path}/metadata_version' '0'")
assert get_metadata_versions() == (1, 0)
node.restart_clickhouse()
assert get_metadata_versions() == (1, 1)