Remove zero-copy version converter

This commit is contained in:
Anton Ivashkin 2021-12-20 20:23:25 +03:00
parent f0b9a4327a
commit c724b074ae
6 changed files with 5 additions and 456 deletions

View File

@ -126,7 +126,7 @@ struct Settings;
M(Bool, allow_nullable_key, false, "Allow Nullable types as primary keys.", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, true, "Allow Zero-copy replication over remote fs.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for Zero-copy table-independet info.", 0) \
M(UInt64, need_revert_zero_copy_version, 0, "Revert Zero-copy to old version", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during convertion process.", 0) \
M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm.", 0) \
M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \

View File

@ -156,8 +156,6 @@ static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
static const int CURRENT_ZERO_COPY_VERSION = 2;
void StorageReplicatedMergeTree::setZooKeeper()
{
/// Every ReplicatedMergeTree table is using only one ZooKeeper session.
@ -4111,10 +4109,6 @@ void StorageReplicatedMergeTree::startup()
assert(prev_ptr == nullptr);
getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
convertZeroCopySchema();
is_zero_copy_in_compatible_mode = isZeroCopySchemaInCompatibleMode();
cleanupOldZeroCopySchema();
/// In this thread replica will be activated.
restarting_thread.start();
@ -7176,7 +7170,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part)
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getDefaultSettings(), disk->getType(), getTableUniqID(),
part.name, is_zero_copy_in_compatible_mode ? zookeeper_path : "");
part.name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
@ -7210,7 +7204,7 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
return false;
return unlockSharedDataById(part.getUniqueId(), getTableUniqID(), name, replica_name, disk, zookeeper, *getDefaultSettings(), log,
is_zero_copy_in_compatible_mode ? zookeeper_path : String(""));
zookeeper_path);
}
@ -7302,7 +7296,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica;
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getDefaultSettings(), disk_type, getTableUniqID(), part.name,
is_zero_copy_in_compatible_mode ? zookeeper_path : "");
zookeeper_path);
std::set<String> replicas;
@ -7381,7 +7375,7 @@ Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings
String new_path = fs::path(settings.remote_fs_zero_copy_zookeeper_path.toString()) / zero_copy / table_uuid / part_name;
res.push_back(new_path);
if (!zookeeper_path_old.empty())
if (settings.remote_fs_zero_copy_path_compatible_mode && !zookeeper_path_old.empty())
{ /// Compatibility mode for cluster with old and new versions
String old_path = fs::path(zookeeper_path_old) / zero_copy / "shared" / part_name;
res.push_back(old_path);
@ -7611,148 +7605,6 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
/// Copies this replica's zero-copy lock nodes in ZooKeeper from one schema layout to the other
/// and records the resulting schema version.
///   layout v1: <zookeeper_path>/zero_copy_<disk type>/shared/<part>/<id>/<replica>
///   layout v2: <remote_fs_zero_copy_zookeeper_path>/zero_copy_<disk type>/<table uniq id>/<part>/<id>/<replica>
/// The stored version lives in <remote_fs_zero_copy_zookeeper_path>/version/<replica_name>.
/// Nodes of the source layout are not removed here; a "cleanup_required" child holding the
/// previous version is written under the version node instead.
/// Returns without doing anything when `current_zookeeper` is unset, when no conversion is needed,
/// when there is no storage policy, or when no disk reports zero-copy replication support.
void StorageReplicatedMergeTree::convertZeroCopySchema()
{
if (!current_zookeeper)
return;
/// A replica without a version node is treated as being on version 1.
int zero_copy_version = 1;
auto version_path = fs::path(getDefaultSettings()->remote_fs_zero_copy_zookeeper_path.toString()) / "version" / replica_name;
if (current_zookeeper->exists(version_path))
zero_copy_version = parse<int>(current_zookeeper->get(version_path));
/// Emergency parameter to restore zero-copy marks on old paths
int revert_to_version = getDefaultSettings()->need_revert_zero_copy_version;
/// Normal upgrade path: nothing to do if the stored version is already current (or newer).
if (!revert_to_version && zero_copy_version >= CURRENT_ZERO_COPY_VERSION)
return;
/// Revert path: nothing to do if the stored version is already at or below the requested one.
if (revert_to_version && zero_copy_version <= revert_to_version)
return;
int required_zero_copy_version = revert_to_version ? revert_to_version : CURRENT_ZERO_COPY_VERSION;
auto storage_policy = getStoragePolicy();
if (!storage_policy)
return;
auto disks = storage_policy->getDisks();
/// Collect the distinct type names of disks that support zero-copy replication;
/// each type has its own "zero_copy_<type>" subtree.
std::set<String> disk_types;
for (const auto & disk : disks)
if (disk->supportZeroCopyReplication())
disk_types.insert(toString(disk->getType()));
if (disk_types.empty())
return;
LOG_INFO(log, "Convert zero_copy version from {} to {} for {}", zero_copy_version, required_zero_copy_version,
version_path.string());
/// Counts lock nodes created in the target layout (one per part/id pair held by this replica).
unsigned long converted_part_counter = 0;
for (auto const & disk_type : disk_types)
{
String zero_copy = fmt::format("zero_copy_{}", disk_type);
auto shard_root_v1 = fs::path(zookeeper_path) / zero_copy / "shared";
auto shard_root_v2 = fs::path(getDefaultSettings()->remote_fs_zero_copy_zookeeper_path.toString())
/ zero_copy / getTableUniqID();
/// When reverting to version 1 the direction flips: v2 is the source and v1 the destination.
auto old_shard_root = revert_to_version == 1 ? shard_root_v2 : shard_root_v1;
auto new_shard_root = revert_to_version == 1 ? shard_root_v1 : shard_root_v2;
Strings parts;
/// The return value is ignored; presumably `parts` stays empty when the source root does not exist - TODO confirm.
current_zookeeper->tryGetChildren(old_shard_root, parts);
for (const auto & part_name : parts)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
/// Only parts this replica currently has in the Committed state are considered.
auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed});
if (part)
{ /// Do not move lost locks
Strings ids;
current_zookeeper->tryGetChildren(old_shard_root / part_name, ids);
for (const auto & id : ids)
{
/// Recreate only the locks that belong to this replica.
if (current_zookeeper->exists(old_shard_root / part_name / id / replica_name))
{
auto zookeeper_node = new_shard_root / part_name / id / replica_name;
createZeroCopyLockNode(current_zookeeper, zookeeper_node.string());
++converted_part_counter;
}
}
}
}
}
/// Record the new version, then leave a marker with the previous version under
/// "cleanup_required" (the same path cleanupOldZeroCopySchema() reads).
current_zookeeper->createAncestors(version_path);
current_zookeeper->createOrUpdate(version_path, std::to_string(required_zero_copy_version),
zkutil::CreateMode::Persistent);
current_zookeeper->createOrUpdate(version_path / "cleanup_required", std::to_string(zero_copy_version),
zkutil::CreateMode::Persistent);
LOG_INFO(log, "Convert zero_copy version from {} to {} for {} complete, converted {} locks", zero_copy_version, required_zero_copy_version,
version_path.string(), converted_part_counter);
}
void StorageReplicatedMergeTree::cleanupOldZeroCopySchema()
{
if (is_zero_copy_in_compatible_mode)
return; /// Some replicas have old version
if (!current_zookeeper)
return;
auto old_version_path = fs::path(getDefaultSettings()->remote_fs_zero_copy_zookeeper_path.toString()) / "version" / replica_name / "cleanup_required";
if (!current_zookeeper->exists(old_version_path))
return;
auto zero_copy_version = parse<int>(current_zookeeper->get(old_version_path));
if (zero_copy_version == 1)
{
auto storage_policy = getStoragePolicy();
if (!storage_policy)
return;
auto disks = storage_policy->getDisks();
std::set<String> disk_types;
for (const auto & disk : disks)
if (disk->supportZeroCopyReplication())
disk_types.insert(toString(disk->getType()));
if (disk_types.empty())
return;
LOG_INFO(log, "Cleanup zero_copy version {}", zero_copy_version);
for (auto const & disk_type : disk_types)
{
String zero_copy = fmt::format("zero_copy_{}", disk_type);
auto old_shard_root = fs::path(zookeeper_path) / zero_copy / "shared";
current_zookeeper->tryRemoveRecursive(old_shard_root);
}
current_zookeeper->remove(old_version_path);
LOG_INFO(log, "Cleanup zero_copy version {} complete", zero_copy_version);
}
}
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node)
{
/// In rare case other replica can remove path between createAncestors and createIfNotExists
@ -7776,27 +7628,4 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperP
}
bool StorageReplicatedMergeTree::isZeroCopySchemaInCompatibleMode() const
{
if (!current_zookeeper)
return false;
auto version_root_path = fs::path(getDefaultSettings()->remote_fs_zero_copy_zookeeper_path.toString()) / "version";
Strings replicas = current_zookeeper->getChildren(fs::path(zookeeper_path) / "replicas");
for (const auto & replica : replicas)
{
if (!current_zookeeper->exists(version_root_path / replica))
return true;
int zero_copy_version = parse<int>(current_zookeeper->get(version_root_path / replica));
if (zero_copy_version < CURRENT_ZERO_COPY_VERSION)
return true;
/// If version is greater that current then other replica has new version.
/// In that case other replica with new version should be in compatible mode.
}
return false;
}
}

View File

@ -747,19 +747,8 @@ private:
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
/// Upgrade zero-copy version
/// version 1 - lock for shared part inside table node in ZooKeeper
/// version 2 - lock for shared part in separate node
void convertZeroCopySchema();
void cleanupOldZeroCopySchema();
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node);
bool isZeroCopySchemaInCompatibleMode() const;
bool is_zero_copy_in_compatible_mode = false;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/

View File

@ -1,50 +0,0 @@
<!-- Server configuration used by the zero-copy version-upgrade integration test. -->
<clickhouse>
<!-- One disk named "s3" (type s3) backed by the MinIO endpoint below, and one storage
     policy named "s3" whose single volume "main" uses that disk. -->
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<!-- Credentials for the MinIO instance referenced by the endpoint above. -->
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<!-- MergeTree settings overridden for this test. -->
<merge_tree>
<min_bytes_for_wide_part>1024</min_bytes_for_wide_part>
<old_parts_lifetime>1</old_parts_lifetime>
</merge_tree>
<!-- Cluster "test_cluster": two shards with one replica each (node1 and node2, port 9000). -->
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<!-- Defines the "cluster" macro as test_cluster. -->
<macros>
<cluster>test_cluster</cluster>
</macros>
</clickhouse>

View File

@ -1,219 +0,0 @@
import logging
import time
import kazoo
import pytest
from helpers.cluster import ClickHouseCluster
# Let the root logger pass INFO-and-above records and attach an additional stream handler
# (StreamHandler() without arguments writes to sys.stderr).
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@pytest.fixture(scope="module")
def cluster():
    """Module-scoped fixture yielding a started two-node ClickHouse cluster.

    Both instances ("node1", "node2") use the ``configs/config.d/s3.xml`` config, the
    ``yandex/clickhouse-server`` image at tag ``21.11.4.14``, and are created with MinIO and
    ZooKeeper enabled. They differ only in the ``replica`` macro ("1" and "2").
    The cluster is shut down when the module's tests finish or when setup fails.
    """
    # Construct the cluster object before entering ``try``: if the constructor raised inside
    # the ``try`` block, ``finally`` would hit an unbound local and the resulting
    # UnboundLocalError would mask the real error.
    cluster = ClickHouseCluster(__file__)
    try:
        for instance_name, replica_id in (("node1", "1"), ("node2", "2")):
            cluster.add_instance(instance_name, main_configs=["configs/config.d/s3.xml"],
                                 macros={'replica': replica_id},
                                 image='yandex/clickhouse-server', tag='21.11.4.14',
                                 stay_alive=True, with_installed_binary=True,
                                 with_minio=True,
                                 with_zookeeper=True)
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        cluster.shutdown()
def get_large_objects_count(cluster, size=100, folder='data'):
    """Count objects listed under ``<folder>/`` in the cluster's MinIO bucket whose reported
    size is not None and is at least ``size``.
    """
    listing = cluster.minio_client.list_objects(cluster.minio_bucket, '{}/'.format(folder))
    return sum(1 for entry in listing if entry.size is not None and entry.size >= size)
def check_objects_exisis(cluster, object_list, folder='data'):
    """Call ``stat_object`` for ``<folder>/<obj>`` on every truthy entry of ``object_list``.

    Falsy entries (e.g. empty strings) are skipped. Any exception raised by the MinIO client
    propagates to the caller.
    """
    client = cluster.minio_client
    for key in filter(None, object_list):
        client.stat_object(cluster.minio_bucket, '{}/{}'.format(folder, key))
def check_objects_not_exisis(cluster, object_list, folder='data'):
    """Assert that ``stat_object`` fails with a "NoSuchKey" error for every truthy entry.

    Falsy entries are skipped. An AssertionError is raised when the stat succeeds, or when it
    fails with an error whose text does not contain "NoSuchKey".
    """
    client = cluster.minio_client
    for key in filter(None, object_list):
        try:
            client.stat_object(cluster.minio_bucket, '{}/{}'.format(folder, key))
        except Exception as error:
            assert "NoSuchKey" in str(error)
            continue
        # Reaching this point means the stat succeeded, i.e. the object is still there.
        assert False, "Object {} should not be exists".format(key)
def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
    """Poll ``get_large_objects_count`` once per second until it equals ``expected``.

    Up to ``timeout`` polls are made, each followed by a one-second sleep on mismatch.
    If none matched, one final check is made and asserted.
    """
    def reached():
        return get_large_objects_count(cluster, size=size) == expected

    remaining = timeout
    while remaining > 0:
        if reached():
            return
        remaining -= 1
        time.sleep(1)

    assert reached()
def wait_for_count_in_table(node, table, count, seconds):
    """Poll ``SELECT count() FROM <table>`` on ``node`` until it returns ``"<count>\\n"``.

    Up to ``seconds`` polls are made, sleeping one second after each mismatch.
    If none matched, the query is run one last time and the result is asserted.
    """
    count_query = f"SELECT count() FROM {table}"
    wanted = f"{count}\n"

    attempts_left = seconds
    while attempts_left > 0:
        attempts_left -= 1
        if node.query(count_query) == wanted:
            return
        time.sleep(1)

    assert node.query(count_query) == wanted
def get_ids(zookeeper, zk_path):
    """Return the sorted, de-duplicated ids found two levels below ``zk_path``.

    Layout walked: ``<zk_path>/<node>/<id>``. Every ``<node>`` must have exactly one child
    (asserted). If any lookup raises ``kazoo.exceptions.NoNodeError`` the result is an empty list.
    """
    unique_ids = set()
    try:
        for zk_node in zookeeper.get_children(zk_path):
            part_ids = zookeeper.get_children(zk_path + "/" + zk_node)
            assert len(part_ids) == 1
            unique_ids.update(part_ids)
    except kazoo.exceptions.NoNodeError:
        # A missing node anywhere on the way discards everything collected so far.
        unique_ids = set()
    return sorted(unique_ids)
def get_ids_new(zookeeper, zk_path):
    """Return the sorted, de-duplicated ids found three levels below ``zk_path``.

    Layout walked: ``<zk_path>/<table>/<node>/<id>``. Every ``<node>`` must have exactly one
    child (asserted). If any lookup raises ``kazoo.exceptions.NoNodeError`` the result is an
    empty list.
    """
    unique_ids = set()
    try:
        for zk_table in zookeeper.get_children(zk_path):
            table_path = zk_path + "/" + zk_table
            for zk_node in zookeeper.get_children(table_path):
                part_ids = zookeeper.get_children(table_path + "/" + zk_node)
                assert len(part_ids) == 1
                unique_ids.update(part_ids)
    except kazoo.exceptions.NoNodeError:
        # A missing node anywhere on the way discards everything collected so far.
        unique_ids = set()
    return sorted(unique_ids)
def wait_mutations(node, table, seconds):
    """Wait until ``system.mutations`` reports no rows with ``is_done=0`` for ``table``.

    Sleeps one second first, then polls up to ``seconds`` times, sleeping one second after
    each non-zero result. If none returned zero, one final query is made and asserted.
    """
    pending_query = f"SELECT count() FROM system.mutations WHERE table='{table}' AND is_done=0"

    time.sleep(1)
    attempts_left = seconds
    while attempts_left > 0:
        attempts_left -= 1
        if node.query(pending_query) == '0\n':
            return
        time.sleep(1)

    assert node.query(pending_query) == '0\n'
def test_s3_zero_copy_version_upgrade(cluster):
    """Check zero-copy lock migration while the two replicas are upgraded one after another.

    Phases:
      1. Both nodes on the fixture's image: locks exist only under the old (per-table) path.
      2. node1 restarted with the latest version: the same ids must be present under both
         the old and the new path, also after another plain restart and after further
         inserts/drops from either node.
      3. node2 restarted with the latest version: ids stay under the new path and the old
         path must be empty.

    The ZooKeeper client is stopped in ``finally`` so that a failed assertion or query does
    not leak the connection.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    zookeeper = cluster.get_kazoo_client("zoo1")
    try:
        node1.query("DROP TABLE IF EXISTS convert_test NO DELAY")
        node2.query("DROP TABLE IF EXISTS convert_test NO DELAY")

        # ``.format('{replica}')`` substitutes the literal text ``{replica}`` into the
        # replica-name argument of the engine.
        node1.query(
            """
CREATE TABLE convert_test ON CLUSTER test_cluster (d String)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/convert_test', '{}')
ORDER BY d
PARTITION BY d
SETTINGS storage_policy='s3'
"""
            .format('{replica}')
        )

        node1.query("INSERT INTO convert_test VALUES ('convert_part_1'),('convert_part_2'),('convert_part_3')")
        wait_for_count_in_table(node2, "convert_test", 3, 10)

        zk_old_path = "/clickhouse/tables/convert_test/zero_copy_s3/shared"
        zk_path = "/clickhouse/zero_copy/zero_copy_s3"

        # Phase 1: before any upgrade, locks live only under the old path.
        part_ids = get_ids(zookeeper, zk_old_path)
        assert len(part_ids) == 3

        ids = get_ids_new(zookeeper, zk_path)
        assert len(ids) == 0

        # Phase 2: upgrade node1 only; ids must now be visible under both paths.
        node1.restart_with_latest_version()
        ids = get_ids_new(zookeeper, zk_path)
        assert ids == part_ids
        old_ids = get_ids(zookeeper, zk_old_path)
        assert old_ids == part_ids

        # A second restart of the already-upgraded node must not change anything.
        node1.restart_clickhouse()
        ids = get_ids_new(zookeeper, zk_path)
        assert ids == part_ids
        old_ids = get_ids(zookeeper, zk_old_path)
        assert old_ids == part_ids

        # Inserts through either node must keep both paths in sync.
        node1.query("INSERT INTO convert_test VALUES ('convert_part_4')")
        wait_for_count_in_table(node1, "convert_test", 4, 10)
        wait_for_count_in_table(node2, "convert_test", 4, 10)
        node2.query("INSERT INTO convert_test VALUES ('convert_part_5')")
        wait_for_count_in_table(node1, "convert_test", 5, 10)
        wait_for_count_in_table(node2, "convert_test", 5, 10)

        part_ids = get_ids_new(zookeeper, zk_path)
        assert len(part_ids) == 5
        old_ids = get_ids(zookeeper, zk_old_path)
        assert old_ids == part_ids

        # Detach and drop one partition from each node.
        node1.query("ALTER TABLE convert_test DETACH PARTITION 'convert_part_1'")
        node1.query("ALTER TABLE convert_test DROP DETACHED PARTITION 'convert_part_1'", settings={"allow_drop_detached": 1})
        wait_for_count_in_table(node1, "convert_test", 4, 10)
        wait_for_count_in_table(node2, "convert_test", 4, 10)
        node2.query("ALTER TABLE convert_test DETACH PARTITION 'convert_part_2'")
        node2.query("ALTER TABLE convert_test DROP DETACHED PARTITION 'convert_part_2'", settings={"allow_drop_detached": 1})
        wait_for_count_in_table(node1, "convert_test", 3, 10)
        wait_for_count_in_table(node2, "convert_test", 3, 10)
        wait_mutations(node1, "convert_test", 10)
        wait_mutations(node2, "convert_test", 10)

        part_ids = get_ids_new(zookeeper, zk_path)
        assert len(part_ids) == 4

        # Dropping the detached partition on node1 as well reduces the id count to 3.
        node1.query("ALTER TABLE convert_test DROP DETACHED PARTITION 'convert_part_2'", settings={"allow_drop_detached": 1})
        wait_mutations(node1, "convert_test", 10)

        part_ids = get_ids_new(zookeeper, zk_path)
        assert len(part_ids) == 3

        # Phase 3: upgrade node2 as well; the old path must end up empty.
        node2.restart_with_latest_version()
        ids = get_ids_new(zookeeper, zk_path)
        assert ids == part_ids
        old_ids = get_ids(zookeeper, zk_old_path)
        assert len(old_ids) == 0

        node1.query("DROP TABLE IF EXISTS convert_test NO DELAY")
        node2.query("DROP TABLE IF EXISTS convert_test NO DELAY")
    finally:
        zookeeper.stop()