Merge pull request #62549 from ClickHouse/revert-60112-feat-59377

Revert "[feature]: allow to attach parts from a different disk"
Alexander Tokmakov, 2024-04-11 17:56:41 +00:00 (committed by GitHub)
commit 4e486f4176
10 changed files with 55 additions and 264 deletions


@@ -133,8 +133,6 @@ For the query to run successfully, the following conditions must be met:
- Both tables must have the same indices and projections.
- Both tables must have the same storage policy.
If both tables have the same storage policy, hardlinks are used to attach the partition; otherwise, the data is copied to attach it (see the example below).
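
For illustration, here is a minimal sketch of the statement these conditions apply to. The table names and the partition ID `202404` are hypothetical; `visits_dst` is created with `AS` so that it inherits the structure, engine, and storage policy of the assumed existing table `visits_src`:

``` sql
-- visits_dst copies the structure, engine settings, and storage policy of visits_src,
-- so the conditions above hold and the partition can be attached via hardlinks.
-- The partition ID 202404 assumes a toYYYYMM(date) partition key.
CREATE TABLE visits_dst AS visits_src;

ALTER TABLE visits_dst ATTACH PARTITION 202404 FROM visits_src;
```
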
## REPLACE PARTITION
``` sql


@@ -7074,7 +7074,7 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(
return checkStructureAndGetMergeTreeData(*source_table, src_snapshot, my_snapshot);
}
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPart(
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPartOnSameDisk(
const MergeTreeData::DataPartPtr & src_part,
const String & tmp_part_prefix,
const MergeTreePartInfo & dst_part_info,
@@ -7084,23 +7084,28 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
const WriteSettings & write_settings)
{
chassert(!isStaticStorage());
bool on_same_disk = false;
for (const DiskPtr & disk : this->getStoragePolicy()->getDisks())
/// Check that the storage policy contains the disk where the src_part is located.
bool does_storage_policy_allow_same_disk = false;
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
if (disk->getName() == src_part->getDataPartStorage().getDiskName())
{
on_same_disk = true;
does_storage_policy_allow_same_disk = true;
break;
}
}
if (!does_storage_policy_allow_same_disk)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Could not clone and load part {} because disk does not belong to storage policy",
quoteString(src_part->getDataPartStorage().getFullPath()));
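
With this check restored, attaching a partition from a table whose parts live on a disk outside the destination's storage policy is rejected again, which is what the integration test further below expects. A minimal sketch of the user-visible behaviour, assuming hypothetical tables `src` and `dst` with identical structure but different storage policies:

``` sql
-- Assumed setup: src and dst have the same structure but different storage policies,
-- so the disk holding src's parts is not in dst's policy.
-- The query fails with BAD_ARGUMENTS: "... because disk does not belong to storage policy".
ALTER TABLE dst ATTACH PARTITION tuple() FROM src;
```
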
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto temporary_directory_lock = getTemporaryPartDirectoryHolder(tmp_dst_part_name);
/// Why is this needed if we only hardlink files?
/// Answer: issue #59377 added copying when attaching a part from a different disk.
auto reservation = src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
auto src_part_storage = src_part->getDataPartStoragePtr();
@@ -7108,30 +7113,16 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
String with_copy;
if (params.copy_instead_of_hardlink || !on_same_disk)
if (params.copy_instead_of_hardlink)
with_copy = " (copying data)";
std::shared_ptr<IDataPartStorage> dst_part_storage{};
if (on_same_disk && !params.copy_instead_of_hardlink)
{
dst_part_storage = src_part_storage->freeze(
relative_data_path,
tmp_dst_part_name,
read_settings,
write_settings,
/* save_metadata_callback= */ {},
params);
}
else
{
auto reservation_on_dst = getStoragePolicy()->reserve(src_part->getBytesOnDisk());
if (!reservation_on_dst)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space on disk.");
dst_part_storage = src_part_storage->clonePart(
this->getRelativeDataPath(), tmp_dst_part_name, reservation_on_dst->getDisk(), read_settings, write_settings, {}, {});
}
auto dst_part_storage = src_part_storage->freeze(
relative_data_path,
tmp_dst_part_name,
read_settings,
write_settings,
/* save_metadata_callback= */ {},
params);
if (params.metadata_version_to_write.has_value())
{
@@ -7153,7 +7144,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
.withPartFormatFromDisk()
.build();
if (on_same_disk && !params.copy_instead_of_hardlink && params.hardlinked_files)
if (!params.copy_instead_of_hardlink && params.hardlinked_files)
{
params.hardlinked_files->source_part_name = src_part->name;
params.hardlinked_files->source_table_shared_id = src_part->storage.getTableSharedID();
@@ -7197,7 +7188,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
}
String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
{
return disk->getPath() + relative_data_path;


@@ -839,7 +839,7 @@ public:
MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPart(
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPartOnSameDisk(
const MergeTreeData::DataPartPtr & src_part,
const String & tmp_part_prefix,
const MergeTreePartInfo & dst_part_info,


@@ -2146,7 +2146,7 @@ bool MutateTask::prepare()
scope_guard lock;
{
std::tie(part, lock) = ctx->data->cloneAndLoadDataPart(
std::tie(part, lock) = ctx->data->cloneAndLoadDataPartOnSameDisk(
ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getReadSettings(), ctx->context->getWriteSettings());
part->getDataPartStorage().beginTransaction();
ctx->temporary_directory_lock = std::move(lock);


@@ -2096,7 +2096,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
auto [dst_part, part_lock] = cloneAndLoadDataPart(
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
@@ -2207,7 +2207,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
.copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
};
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPart(
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,


@@ -2788,7 +2788,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
auto obtain_part = [&] (PartDescriptionPtr & part_desc)
{
/// Fetches with zero-copy-replication are cheap, but cloneAndLoadDataPart(OnSameDisk) will do full copy.
/// Fetches with zero-copy-replication are cheap, but cloneAndLoadDataPartOnSameDisk will do full copy.
/// It's okay to check the setting for current table and disk for the source table, because src and dst part are on the same disk.
bool prefer_fetch_from_other_replica = !part_desc->replica.empty() && storage_settings_ptr->allow_remote_fs_zero_copy_replication
&& part_desc->src_table_part && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport();
@@ -2807,7 +2807,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || ((our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
auto [res_part, temporary_part_lock] = cloneAndLoadDataPart(
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
part_desc->src_table_part,
TMP_PREFIX + "clone_",
part_desc->new_part_info,
@@ -4888,7 +4888,7 @@ bool StorageReplicatedMergeTree::fetchPart(
.keep_metadata_version = true,
};
auto [cloned_part, lock] = cloneAndLoadDataPart(
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(
part_to_clone,
"tmp_clone_",
part_info,
@@ -8078,14 +8078,12 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
auto [dst_part, part_lock] = cloneAndLoadDataPart(
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
@@ -8093,10 +8091,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
clone_params,
query_context->getReadSettings(),
query_context->getWriteSettings());
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
dst_parts_locks.emplace_back(std::move(part_lock));
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
@@ -8349,7 +8346,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
};
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPart(
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,


@@ -1,17 +0,0 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>


@@ -1,187 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
replica1 = cluster.add_instance(
"replica1", with_zookeeper=True, main_configs=["configs/remote_servers.xml"]
)
replica2 = cluster.add_instance(
"replica2", with_zookeeper=True, main_configs=["configs/remote_servers.xml"]
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def cleanup(nodes):
for node in nodes:
node.query("DROP TABLE IF EXISTS source SYNC")
node.query("DROP TABLE IF EXISTS destination SYNC")
def create_source_table(node, table_name, replicated):
replica = node.name
engine = (
f"ReplicatedMergeTree('/clickhouse/tables/1/{table_name}', '{replica}')"
if replicated
else "MergeTree()"
)
node.query_with_retry(
"""
ATTACH TABLE {table_name} UUID 'cf712b4f-2ca8-435c-ac23-c4393efe52f7'
(
price UInt32,
date Date,
postcode1 LowCardinality(String),
postcode2 LowCardinality(String),
type Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
is_new UInt8,
duration Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
addr1 String,
addr2 String,
street LowCardinality(String),
locality LowCardinality(String),
town LowCardinality(String),
district LowCardinality(String),
county LowCardinality(String)
)
ENGINE = {engine}
ORDER BY (postcode1, postcode2, addr1, addr2)
SETTINGS disk = disk(type = web, endpoint = 'https://raw.githubusercontent.com/ClickHouse/web-tables-demo/main/web/')
""".format(
table_name=table_name, engine=engine
)
)
def create_destination_table(node, table_name, replicated):
replica = node.name
engine = (
f"ReplicatedMergeTree('/clickhouse/tables/1/{table_name}', '{replica}')"
if replicated
else "MergeTree()"
)
node.query_with_retry(
"""
CREATE TABLE {table_name}
(
price UInt32,
date Date,
postcode1 LowCardinality(String),
postcode2 LowCardinality(String),
type Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
is_new UInt8,
duration Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
addr1 String,
addr2 String,
street LowCardinality(String),
locality LowCardinality(String),
town LowCardinality(String),
district LowCardinality(String),
county LowCardinality(String)
)
ENGINE = {engine}
ORDER BY (postcode1, postcode2, addr1, addr2)
""".format(
table_name=table_name, engine=engine
)
)
def test_both_mergetree(start_cluster):
create_source_table(replica1, "source", False)
create_destination_table(replica1, "destination", False)
replica1.query(f"ALTER TABLE destination ATTACH PARTITION tuple() FROM source")
assert_eq_with_retry(
replica1,
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM destination GROUP BY year ORDER BY year ASC",
replica1.query(
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM source GROUP BY year ORDER BY year ASC"
),
)
assert_eq_with_retry(
replica1, f"SELECT town from destination LIMIT 1", "SCARBOROUGH"
)
cleanup([replica1])
def test_all_replicated(start_cluster):
create_source_table(replica1, "source", True)
create_destination_table(replica1, "destination", True)
create_destination_table(replica2, "destination", True)
replica1.query("SYSTEM SYNC REPLICA destination")
replica1.query(f"ALTER TABLE destination ATTACH PARTITION tuple() FROM source")
assert_eq_with_retry(
replica1,
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM destination GROUP BY year ORDER BY year ASC",
replica1.query(
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM source GROUP BY year ORDER BY year ASC"
),
)
assert_eq_with_retry(
replica1,
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM source GROUP BY year ORDER BY year ASC",
replica2.query(
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM destination GROUP BY year ORDER BY year ASC"
),
)
assert_eq_with_retry(
replica1, f"SELECT town from destination LIMIT 1", "SCARBOROUGH"
)
assert_eq_with_retry(
replica2, f"SELECT town from destination LIMIT 1", "SCARBOROUGH"
)
cleanup([replica1, replica2])
def test_only_destination_replicated(start_cluster):
create_source_table(replica1, "source", False)
create_destination_table(replica1, "destination", True)
create_destination_table(replica2, "destination", True)
replica1.query("SYSTEM SYNC REPLICA destination")
replica1.query(f"ALTER TABLE destination ATTACH PARTITION tuple() FROM source")
assert_eq_with_retry(
replica1,
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM destination GROUP BY year ORDER BY year ASC",
replica1.query(
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM source GROUP BY year ORDER BY year ASC"
),
)
assert_eq_with_retry(
replica1,
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM source GROUP BY year ORDER BY year ASC",
replica2.query(
f"SELECT toYear(date) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80) FROM destination GROUP BY year ORDER BY year ASC"
),
)
assert_eq_with_retry(
replica1, f"SELECT town from destination LIMIT 1", "SCARBOROUGH"
)
assert_eq_with_retry(
replica2, f"SELECT town from destination LIMIT 1", "SCARBOROUGH"
)
cleanup([replica1, replica2])


@@ -5,7 +5,6 @@ import string
import threading
import time
from multiprocessing.dummy import Pool
from helpers.test_tools import assert_eq_with_retry
import pytest
from helpers.client import QueryRuntimeException
@@ -1746,9 +1745,9 @@ def test_move_while_merge(start_cluster):
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
def test_move_across_policies_work_for_attach_not_work_for_move(start_cluster):
def test_move_across_policies_does_not_work(start_cluster):
try:
name = "test_move_across_policies_work_for_attach_not_work_for_move"
name = "test_move_across_policies_does_not_work"
node1.query(
"""
@@ -1784,18 +1783,25 @@ def test_move_across_policies_work_for_attach_not_work_for_move(start_cluster):
except QueryRuntimeException:
"""All parts of partition 'all' are already on disk 'jbod2'."""
node1.query(
"""ALTER TABLE {name}2 ATTACH PARTITION tuple() FROM {name}""".format(
name=name
)
)
assert_eq_with_retry(
node1,
"""SELECT * FROM {name}2""".format(name=name),
with pytest.raises(
QueryRuntimeException,
match=".*because disk does not belong to storage policy.*",
):
node1.query(
"""SELECT * FROM {name}""".format(name=name),
),
)
"""ALTER TABLE {name}2 ATTACH PARTITION tuple() FROM {name}""".format(
name=name
)
)
with pytest.raises(
QueryRuntimeException,
match=".*because disk does not belong to storage policy.*",
):
node1.query(
"""ALTER TABLE {name}2 REPLACE PARTITION tuple() FROM {name}""".format(
name=name
)
)
with pytest.raises(
QueryRuntimeException,
@@ -1807,6 +1813,10 @@ def test_move_across_policies_work_for_attach_not_work_for_move(start_cluster):
)
)
assert node1.query(
"""SELECT * FROM {name}""".format(name=name)
).splitlines() == ["1"]
finally:
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
node1.query(f"DROP TABLE IF EXISTS {name}2 SYNC")