Fix Zero-Copy replication with several S3 volumes (issue 22679)

Anton Ivashkin 2021-04-08 21:27:56 +03:00
parent ad85467128
commit c3212f1c46
6 changed files with 167 additions and 23 deletions
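
In short: with zero-copy replication, a part that lives on an S3 disk can share its S3 objects with another replica, or — when several volumes point at the same S3 storage — with its own copy on another volume. When such a part is swapped out after a move, its blobs must not be deleted. The rule added to MergeTreeData::swapActivePart (see the hunk below) can be condensed into the following sketch; it is for orientation only and assumes that getUniqueId() identifies a part's shared S3 data and that unlockSharedData() returns true when no other replica still references it:

    /// Sketch only, condensed from the swapActivePart hunk below; shouldKeepS3Data is not a function in the code base.
    static bool shouldKeepS3Data(const MergeTreeData::DataPartPtr & old_part, const MergeTreeData::DataPartPtr & new_part, MergeTreeData & data)
    {
        if (old_part->volume->getDisk()->getType() != DiskType::Type::S3)
            return false;                           /// the old part is not on S3, nothing is shared

        if (new_part->volume->getDisk()->getType() == DiskType::Type::S3
            && old_part->getUniqueId() == new_part->getUniqueId())
            return true;                            /// several volumes use the same S3 storage, both parts share the blobs

        return !data.unlockSharedData(*old_part);   /// keep the blobs while another replica still references them
    }

A part whose blobs must be kept is put into the new DeleteOnDestroyKeepS3 state instead of DeleteOnDestroy, so its destructor removes the part without dropping the shared S3 objects.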


@@ -410,9 +410,34 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
        size_t sum_files_size = 0;
        readBinary(sum_files_size, in);
        IMergeTreeDataPart::TTLInfos ttl_infos;
        /// Skip ttl infos, not required for S3 metadata
        String ttl_infos_string;
        readBinary(ttl_infos_string, in);
        ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
        assertString("ttl format version: 1\n", ttl_infos_buffer);
        ttl_infos.read(ttl_infos_buffer);

        ReservationPtr reservation
            = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
        if (!reservation)
            reservation
                = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);

        if (reservation)
        {
            /// Restrict the candidate S3 disks to the disk chosen by the TTL-aware reservation.
            DiskPtr disk = reservation->getDisk();
            if (disk && disk->getType() == DiskType::Type::S3)
            {
                for (const auto & d : disks_s3)
                {
                    if (d->getPath() == disk->getPath())
                    {
                        Disks disks_tmp = { disk };
                        disks_s3.swap(disks_tmp);
                        break;
                    }
                }
            }
        }

        String part_type = "Wide";
        readStringBinary(part_type, in);
        if (part_type == "InMemory")
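
The block above can be summarised as: parse the TTL infos sent by the other replica, make a TTL-aware reservation, and if that reservation lands on one of the S3 disks eligible for zero-copy fetch, fetch onto exactly that disk. A standalone illustration of the narrowing step (narrowToReservedDisk is a hypothetical name, not a function in the code base):

    /// Illustration only; mirrors the loop over disks_s3 above.
    Disks narrowToReservedDisk(const Disks & s3_candidates, const DiskPtr & reserved_disk)
    {
        for (const auto & d : s3_candidates)
            if (d->getPath() == reserved_disk->getPath())
                return { reserved_disk };   /// fetch S3 metadata onto the reserved disk only
        return s3_candidates;               /// the reserved disk is not among the candidates, keep the original list
    }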


@@ -189,6 +189,9 @@ static void incrementStateMetric(IMergeTreeDataPart::State state)
        case IMergeTreeDataPart::State::DeleteOnDestroy:
            CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy);
            return;
        case IMergeTreeDataPart::State::DeleteOnDestroyKeepS3:
            CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy);
            return;
    }
}

@@ -214,6 +217,9 @@ static void decrementStateMetric(IMergeTreeDataPart::State state)
        case IMergeTreeDataPart::State::DeleteOnDestroy:
            CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy);
            return;
        case IMergeTreeDataPart::State::DeleteOnDestroyKeepS3:
            CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy);
            return;
    }
}

@@ -393,7 +399,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns)
void IMergeTreeDataPart::removeIfNeeded()
{
    if (state == State::DeleteOnDestroy || is_temp)
    if (state == State::DeleteOnDestroy || state == State::DeleteOnDestroyKeepS3 || is_temp)
    {
        try
        {

@@ -416,9 +422,9 @@ void IMergeTreeDataPart::removeIfNeeded()
                }
            }

            remove(false);
            remove(state == State::DeleteOnDestroyKeepS3);

            if (state == State::DeleteOnDestroy)
            if (state == State::DeleteOnDestroy || state == State::DeleteOnDestroyKeepS3)
            {
                LOG_TRACE(storage.log, "Removed part from old location {}", path);
            }

@@ -463,6 +469,8 @@ String IMergeTreeDataPart::stateToString(IMergeTreeDataPart::State state)
            return "Deleting";
        case State::DeleteOnDestroy:
            return "DeleteOnDestroy";
        case State::DeleteOnDestroyKeepS3:
            return "DeleteOnDestroyKeepS3";
    }

    __builtin_unreachable();


@@ -202,22 +202,24 @@ public:
     * Part state should be modified under data_parts mutex.
     *
     * Possible state transitions:
     * Temporary -> Precommitted:            we are trying to commit a fetched, inserted or merged part to the active set
     * Precommitted -> Outdated:             we could not add a part to the active set and are doing a rollback (for example it is a duplicated part)
     * Precommitted -> Committed:            we successfully committed a part to the active dataset
     * Precommitted -> Outdated:             a part was replaced by a covering part or DROP PARTITION
     * Outdated -> Deleting:                 a cleaner selected this part for deletion
     * Deleting -> Outdated:                 if a ZooKeeper error occurred during the deletion, we will retry the deletion
     * Committed -> DeleteOnDestroy:         if the part was moved to another disk
     * Committed -> DeleteOnDestroyKeepS3:   if the part was moved to another disk but shares its data on S3
     */
    enum class State
    {
        Temporary,              /// the part is being generated now, it is not in the data_parts list
        PreCommitted,           /// the part is in data_parts, but not used for SELECTs
        Committed,              /// active data part, used by current and upcoming SELECTs
        Outdated,               /// not an active data part, but it could still be used by current SELECTs and deleted after they finish
        Deleting,               /// not an active data part with identity refcounter, it is being deleted right now by a cleaner
        DeleteOnDestroy,        /// the part was moved to another disk and should be deleted in its own destructor
        DeleteOnDestroyKeepS3,  /// same as DeleteOnDestroy, but the shared S3 data should be kept
    };

    using TTLInfo = MergeTreeDataPartTTLInfo;
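
For orientation, the way the new state is consumed when a part object is destroyed can be condensed as follows (a sketch of the removeIfNeeded() change shown earlier, not additional code from this commit; keep_shared_data is a name used only here):

    /// Sketch only, condensed from IMergeTreeDataPart::removeIfNeeded() above.
    if (state == State::DeleteOnDestroy || state == State::DeleteOnDestroyKeepS3 || is_temp)
    {
        const bool keep_shared_data = (state == State::DeleteOnDestroyKeepS3);
        remove(keep_shared_data);   /// delete the part's files, but keep the shared S3 objects when required
    }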


@@ -2611,7 +2611,22 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
        if (active_part_it == data_parts_by_info.end())
            throw Exception("Cannot swap part '" + part_copy->name + "', no such active part.", ErrorCodes::NO_SUCH_DATA_PART);

        modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);

        /// We do not check allow_s3_zero_copy_replication here, because data may still be shared
        /// after allow_s3_zero_copy_replication has been turned on and then off again.
        bool keep_s3 = false;

        if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3)
        {
            if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3
                && original_active_part->getUniqueId() == part_copy->getUniqueId())
            {
                /// May happen when several volumes use the same S3 storage.
                keep_s3 = true;
            }
            else
                keep_s3 = !unlockSharedData(*original_active_part);
        }

        modifyPartState(original_active_part, keep_s3 ? DataPartState::DeleteOnDestroyKeepS3 : DataPartState::DeleteOnDestroy);

        data_parts_indexes.erase(active_part_it);

        auto part_it = data_parts_indexes.insert(part_copy).first;

@@ -8,6 +8,18 @@
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s31>
            <s31_again>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s31_again>
            <s32>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data2/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s32>
        </disks>
        <policies>
            <s3>

@@ -27,6 +39,26 @@
                    </external>
                </volumes>
            </hybrid>
            <tiered>
                <volumes>
                    <main>
                        <disk>s31</disk>
                    </main>
                    <external>
                        <disk>s32</disk>
                    </external>
                </volumes>
            </tiered>
            <tiered_copy>
                <volumes>
                    <main>
                        <disk>s31</disk>
                    </main>
                    <external>
                        <disk>s31_again</disk>
                    </external>
                </volumes>
            </tiered_copy>
        </policies>
    </storage_configuration>


@@ -27,10 +27,10 @@ def cluster():
        cluster.shutdown()


def get_large_objects_count(cluster, size=100):
def get_large_objects_count(cluster, folder='data', size=100):
    minio = cluster.minio_client
    counter = 0
    for obj in minio.list_objects(cluster.minio_bucket, 'data/'):
    for obj in minio.list_objects(cluster.minio_bucket, '{}/'.format(folder)):
        if obj.size >= size:
            counter = counter + 1
    return counter

@@ -69,7 +69,7 @@ def test_s3_zero_copy_replication(cluster, policy):
    # Based on version 20.x - two parts
    assert get_large_objects_count(cluster) == 2

    node1.query("OPTIMIZE TABLE s3_test")
    node1.query("OPTIMIZE TABLE s3_test FINAL")
    time.sleep(1)

@@ -119,7 +119,7 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
    assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"

    # Total objects in S3
    s3_objects = get_large_objects_count(cluster, 0)
    s3_objects = get_large_objects_count(cluster, size=0)

    node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
@@ -127,10 +127,72 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
    assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"

    # Check that after moving the partition on node2 no new objects appear on S3
    assert get_large_objects_count(cluster, 0) == s3_objects
    assert get_large_objects_count(cluster, size=0) == s3_objects

    assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
    assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"

    node1.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
    node2.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")


@pytest.mark.parametrize(
    "storage_policy", ["tiered", "tiered_copy"]
)
def test_s3_zero_copy_with_ttl_move(cluster, storage_policy):
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    for i in range(10):
        node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
        node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")

        node1.query(
            """
            CREATE TABLE ttl_move_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_move_test', '{}')
            ORDER BY d
            TTL d1 + INTERVAL 2 DAY TO VOLUME 'external'
            SETTINGS storage_policy='{}'
            """
            .format('{replica}', storage_policy)
        )

        node1.query("INSERT INTO ttl_move_test VALUES (10, now() - INTERVAL 3 DAY)")
        node1.query("INSERT INTO ttl_move_test VALUES (11, now() - INTERVAL 1 DAY)")

        assert node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
        assert node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
        assert node1.query("SELECT d FROM ttl_move_test ORDER BY d1 FORMAT Values") == "(10),(11)"
        assert node2.query("SELECT d FROM ttl_move_test ORDER BY d1 FORMAT Values") == "(10),(11)"

    node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
    node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")


def test_s3_zero_copy_with_ttl_delete(cluster):
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    for i in range(10):
        node1.query(
            """
            CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_delete_test', '{}')
            ORDER BY d
            TTL d1 + INTERVAL 2 DAY
            SETTINGS storage_policy='tiered'
            """
            .format('{replica}')
        )

        node1.query("INSERT INTO ttl_delete_test VALUES (10, now() - INTERVAL 3 DAY)")
        node1.query("INSERT INTO ttl_delete_test VALUES (11, now() - INTERVAL 1 DAY)")

        assert node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
        assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
        assert node1.query("SELECT d FROM ttl_delete_test ORDER BY d1 FORMAT Values") == "(11)"
        assert node2.query("SELECT d FROM ttl_delete_test ORDER BY d1 FORMAT Values") == "(11)"

        node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
        node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")