Possibility to move a part to another disk/volume if the first attempt failed.

Pavel Kovalenko 2020-10-07 14:35:28 +03:00
parent 403a5320f5
commit 69c126f1f1
8 changed files with 96 additions and 4 deletions
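The change makes background part moves retryable: if the first attempt to move a part to another disk or volume fails (for example, because of a transient S3 error), a later attempt can still succeed, and the leftover directory from the failed attempt is cleaned up before the part is cloned again. A minimal sketch of the scenario, distilled from the integration test added below (table, policy, and volume names are the ones used in that test):

-- Parts of this table are moved to the 'external' (S3-backed) volume by TTL,
-- using the 's3_cold' storage policy defined in storage_conf.xml below.
CREATE TABLE s3_failover_test (
    dt DateTime,
    id Int64,
    data String
) ENGINE = MergeTree()
ORDER BY id
TTL dt + INTERVAL 3 SECOND TO VOLUME 'external'
SETTINGS storage_policy = 's3_cold';

INSERT INTO s3_failover_test VALUES (now() - 2, 0, 'data'), (now() - 2, 1, 'data');

-- If the first background move fails, a second attempt should succeed;
-- both attempts end up in system.part_log (queried by the test below).
SYSTEM FLUSH LOGS;
SELECT event_time, exception
FROM system.part_log
WHERE event_type = 'MovePart' AND table = 's3_failover_test'
ORDER BY event_time;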

View File

@@ -180,4 +180,9 @@ void DiskDecorator::sync(int fd) const
delegate->sync(fd);
}
Executor & DiskDecorator::getExecutor()
{
return delegate->getExecutor();
}
}

View File

@@ -46,6 +46,7 @@ public:
void close(int fd) const override;
void sync(int fd) const override;
const String getType() const override { return delegate->getType(); }
Executor & getExecutor() override;
protected:
DiskPtr delegate;

View File

@@ -195,10 +195,10 @@ public:
/// Invoked when the Global Context is shut down.
virtual void shutdown() { }
private:
/// Returns executor to perform asynchronous operations.
- Executor & getExecutor() { return *executor; }
+ virtual Executor & getExecutor() { return *executor; }
private:
std::unique_ptr<Executor> executor;
};

View File

@@ -953,7 +953,10 @@ void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservat
String path_to_clone = storage.relative_data_path + "detached/";
if (reserved_disk->exists(path_to_clone + relative_path))
throw Exception("Path " + fullPath(reserved_disk, path_to_clone + relative_path) + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
{
LOG_WARNING(storage.log, "Path " + fullPath(reserved_disk, path_to_clone + relative_path) + " already exists. Will remove it and clone again.");
reserved_disk->removeRecursive(path_to_clone + relative_path + '/');
}
reserved_disk->createDirectory(path_to_clone);
volume->getDisk()->copy(getFullRelativePath(), reserved_disk, path_to_clone);

View File

@@ -0,0 +1,4 @@
<yandex>
<background_move_processing_pool_thread_sleep_seconds>0.5</background_move_processing_pool_thread_sleep_seconds>
<background_move_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_move_processing_pool_task_sleep_seconds_when_no_work_max>
</yandex>

View File

@@ -0,0 +1,8 @@
<?xml version="1.0"?>
<yandex>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</part_log>
</yandex>
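part_log.xml enables the system.part_log table; the new test uses it to verify that the move was attempted twice (one failed attempt plus one successful retry) and that the first attempt failed with the injected error. The checks, written out as plain queries mirroring the ones the test issues:

SYSTEM FLUSH LOGS;

-- Expected to return 2: the failed first attempt and the successful retry.
SELECT count()
FROM system.part_log
WHERE event_type = 'MovePart' AND table = 's3_failover_test';

-- The earliest attempt should carry the injected error in its exception column.
SELECT exception
FROM system.part_log
WHERE event_type = 'MovePart' AND table = 's3_failover_test'
ORDER BY event_time
LIMIT 1;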

View File

@@ -12,6 +12,7 @@
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>0</retry_attempts>
</s3>
<default/>
</disks>
<policies>
<s3>
@@ -21,6 +22,16 @@
</main>
</volumes>
</s3>
<s3_cold>
<volumes>
<main>
<disk>default</disk>
</main>
<external>
<disk>s3</disk>
</external>
</volumes>
</s3_cold>
</policies>
</storage_configuration>
</yandex>
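The new s3_cold policy keeps the local default disk as the main volume and the S3 disk as the external volume, so parts start on local storage and are pushed to S3 by the table's TTL expression. For reference, moves under such a policy can also be triggered by hand; a hedged example (the part name 'all_1_1_0' is hypothetical, not taken from this commit):

-- Move a single part to the S3-backed 'external' volume manually.
ALTER TABLE s3_failover_test MOVE PART 'all_1_1_0' TO VOLUME 'external';

-- Check which disk each active part currently lives on.
SELECT name, disk_name
FROM system.parts
WHERE table = 's3_failover_test' AND active;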

View File

@@ -45,7 +45,10 @@ def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node",
main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml"],
main_configs=["configs/config.d/log_conf.xml",
"configs/config.d/storage_conf.xml",
"configs/config.d/instant_moves.xml",
"configs/config.d/part_log.xml"],
with_minio=True)
logging.info("Starting cluster...")
cluster.start()
@@ -115,3 +118,60 @@ def test_write_failover(cluster, min_bytes_for_wide_part, request_count):
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
assert node.query("SELECT * FROM s3_failover_test FORMAT Values") == data
# Check that a second attempt to move a data part completes successfully if the first attempt failed.
def test_move_failover(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_failover_test (
dt DateTime,
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
TTL dt + INTERVAL 3 SECOND TO VOLUME 'external'
SETTINGS storage_policy='s3_cold'
"""
)
# Fail a request to S3 to break the first TTL move.
fail_request(cluster, 1)
node.query("INSERT INTO s3_failover_test VALUES (now() - 2, 0, 'data'), (now() - 2, 1, 'data')")
# Wait for part move to S3.
max_attempts = 10
for attempt in range(max_attempts + 1):
disk = node.query("SELECT disk_name FROM system.parts WHERE table='s3_failover_test' LIMIT 1")
if disk != "s3\n":
if attempt == max_attempts:
assert disk == "s3\n", "Expected move to S3 while part still on disk " + disk
else:
time.sleep(1)
else:
break
# Flush logs so that system.part_log is created and populated.
node.query("SYSTEM FLUSH LOGS")
# There should be 2 attempts to move the part.
assert node.query("""
SELECT count(*) FROM system.part_log
WHERE event_type='MovePart' AND table='s3_failover_test'
""") == '2\n'
# The first attempt should have failed with the expected error.
exception = node.query("""
SELECT exception FROM system.part_log
WHERE event_type='MovePart' AND table='s3_failover_test'
ORDER BY event_time
LIMIT 1
""")
assert exception.find("Expected Error") != -1, exception
# Ensure data is not corrupted.
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
assert node.query("SELECT id,data FROM s3_failover_test FORMAT Values") == "(0,'data'),(1,'data')"