Alexander Tokmakov 2023-06-21 20:29:32 +02:00
parent 318ac807c7
commit 9157314b2a
7 changed files with 32 additions and 14 deletions

View File

@@ -544,8 +544,10 @@ void DiskObjectStorage::writeFileUsingBlobWritingFunction(const String & path, W
 }
 
 void DiskObjectStorage::applyNewSettings(
-    const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String & config_prefix, const DisksMap & disk_map)
+    const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String & /*config_prefix*/, const DisksMap & disk_map)
 {
+    /// FIXME we cannot use config_prefix that was passed through arguments because the disk may be wrapped with cache and we need another name
+    const auto config_prefix = "storage_configuration.disks." + name;
     object_storage->applyNewSettings(config, config_prefix, context_);
     IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
 }
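The prefix is rebuilt from the disk's own name because a disk may be wrapped by a cache, in which case the `config_prefix` argument received here points at the wrapper's configuration section rather than at this disk's own `storage_configuration.disks.<name>` section. A minimal standalone sketch of the idea, assuming a flat key/value config; the `Config` map and helper names are illustrative, not ClickHouse APIs:

#include <iostream>
#include <map>
#include <string>

/// Hypothetical flat config: "storage_configuration.disks.<name>.<key>" -> value.
using Config = std::map<std::string, std::string>;

/// Rebuild the prefix from the disk's own name (as the commit does) instead of
/// trusting the prefix passed down by a possibly cache-wrapping caller.
std::string diskConfigPrefix(const std::string & disk_name)
{
    return "storage_configuration.disks." + disk_name;
}

int main()
{
    Config config = {
        {"storage_configuration.disks.s3.endpoint", "https://bucket.example.com/data/"},
        {"storage_configuration.disks.s3_cache.disk", "s3"},
    };

    /// A cache wrapper named "s3_cache" would pass its own prefix down;
    /// the wrapped disk "s3" must still read settings from its own section.
    std::cout << config[diskConfigPrefix("s3") + ".endpoint"] << '\n';
}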

View File

@@ -455,23 +455,34 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
 MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
     const std::string & to,
     const std::string & dir_path,
-    const DiskPtr & disk,
-    Poco::Logger *) const
+    const DiskPtr & dst_disk,
+    Poco::Logger * log) const
 {
     String path_to_clone = fs::path(to) / dir_path / "";
+    auto src_disk = volume->getDisk();
 
-    if (disk->exists(path_to_clone))
+    if (dst_disk->exists(path_to_clone))
     {
         throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS,
                         "Cannot clone part {} from '{}' to '{}': path '{}' already exists",
-                        dir_path, getRelativePath(), path_to_clone, fullPath(disk, path_to_clone));
+                        dir_path, getRelativePath(), path_to_clone, fullPath(dst_disk, path_to_clone));
     }
 
-    disk->createDirectories(to);
-    volume->getDisk()->copyDirectoryContent(getRelativePath(), disk, path_to_clone);
-    volume->getDisk()->removeFileIfExists(fs::path(path_to_clone) / "delete-on-destroy.txt");
+    try
+    {
+        dst_disk->createDirectories(to);
+        src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone);
+    }
+    catch (...)
+    {
+        /// It's safe to remove it recursively (even with zero-copy replication),
+        /// because we've just done a full copy through copyDirectoryContent.
+        LOG_WARNING(log, "Removing directory {} after failed attempt to move a data part", path_to_clone);
+        dst_disk->removeRecursive(path_to_clone);
+        throw;
+    }
 
-    auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
+    auto single_disk_volume = std::make_shared<SingleDiskVolume>(dst_disk->getName(), dst_disk, 0);
     return create(single_disk_volume, to, dir_path, /*initialize=*/ true);
 }
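The new try/catch makes the clone atomic from the destination disk's point of view: if `copyDirectoryContent` fails partway, the half-copied directory is removed before the exception propagates, instead of being left behind as a stale part. The same pattern in a standalone sketch using std::filesystem; the paths and function name are illustrative, not the ClickHouse disk API:

#include <filesystem>
#include <iostream>
#include <stdexcept>

namespace fs = std::filesystem;

/// Copy src into dst; if anything throws midway, remove the partially
/// written destination recursively before propagating the error.
void copyOrCleanUp(const fs::path & src, const fs::path & dst)
{
    if (fs::exists(dst))
        throw std::runtime_error("destination already exists: " + dst.string());

    try
    {
        fs::copy(src, dst, fs::copy_options::recursive);
    }
    catch (...)
    {
        std::cerr << "Removing " << dst << " after failed copy\n";
        fs::remove_all(dst);  /// safe: everything under dst was created by us
        throw;
    }
}

int main()
{
    fs::create_directories("src_dir/sub");
    copyOrCleanUp("src_dir", "dst_dir");
}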

View File

@@ -71,7 +71,7 @@ public:
     MutableDataPartStoragePtr clonePart(
         const std::string & to,
         const std::string & dir_path,
-        const DiskPtr & disk,
+        const DiskPtr & dst_disk,
         Poco::Logger * log) const override;
 
     void rename(

View File

@@ -502,8 +502,10 @@ void IMergeTreeDataPart::removeIfNeeded()
         throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
             getDataPartStorage().getPartDirectory(), name);
 
-    const auto part_parent_directory = directoryPath(part_directory);
-    bool is_moving_part = part_parent_directory.ends_with("moving/");
+    fs::path part_directory_path = getDataPartStorage().getRelativePath();
+    if (part_directory_path.filename().empty())
+        part_directory_path = part_directory_path.parent_path();
+    bool is_moving_part = part_directory_path.parent_path().filename() == "moving";
     if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
     {
         LOG_ERROR(
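The old check compared a raw string suffix, which depends on the exact spelling of the stored path; the new one normalizes through `fs::path` first, so a trailing slash (which makes `filename()` empty) no longer defeats the detection of parts living under a `moving/` directory. A standalone sketch of the same logic with a few sanity checks:

#include <cassert>
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

/// Does the part directory live directly under a "moving" directory?
/// Mirrors the commit: drop a trailing slash (empty filename), then
/// compare the name of the parent directory.
bool isMovingPart(const std::string & relative_path)
{
    fs::path part_directory_path(relative_path);
    if (part_directory_path.filename().empty())
        part_directory_path = part_directory_path.parent_path();
    return part_directory_path.parent_path().filename() == "moving";
}

int main()
{
    assert(isMovingPart("store/moving/all_1_1_0"));
    assert(isMovingPart("store/moving/all_1_1_0/"));    /// trailing slash handled
    assert(!isMovingPart("store/detached/all_1_1_0"));
}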

View File

@@ -36,6 +36,7 @@ try:
     from confluent_kafka.avro.cached_schema_registry_client import (
         CachedSchemaRegistryClient,
     )
+    from .hdfs_api import HDFSApi  # imports requests_kerberos
 except Exception as e:
     logging.warning(f"Cannot import some modules, some tests may not work: {e}")
@@ -51,7 +52,6 @@ from helpers.client import QueryRuntimeException
 
 import docker
 
 from .client import Client
-from .hdfs_api import HDFSApi
 from .config_cluster import *

View File

@@ -72,4 +72,6 @@
             </s3_no_retries>
         </policies>
     </storage_configuration>
+
+    <allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
 </clickhouse>

View File

@@ -183,7 +183,8 @@ def test_move_failover(cluster):
         ) ENGINE=MergeTree()
         ORDER BY id
         TTL dt + INTERVAL 4 SECOND TO VOLUME 'external'
-        SETTINGS storage_policy='s3_cold'
+        SETTINGS storage_policy='s3_cold', temporary_directories_lifetime=1,
+        merge_tree_clear_old_temporary_directories_interval_seconds=1
         """
     )