fix per review

This commit is contained in:
Zhichang Yu 2021-07-05 11:32:56 +08:00
parent fbd5eee8a1
commit 5047c758f4
30 changed files with 309 additions and 446 deletions

View File

@ -559,6 +559,7 @@
M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \
M(590, CANNOT_SYSCONF) \
M(591, SQLITE_ENGINE_ERROR) \
M(592, ZERO_COPY_REPLICATION_ERROR) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -65,6 +65,7 @@ public:
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType::Type getType() const override { return delegate->getType(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;

View File

@ -100,6 +100,8 @@ public:
DiskType::Type getType() const override { return DiskType::Type::Local; }
bool supportZeroCopyReplication() const override { return false; }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
private:

View File

@ -92,6 +92,8 @@ public:
DiskType::Type getType() const override { return DiskType::Type::RAM; }
bool supportZeroCopyReplication() const override { return false; }
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);

View File

@ -115,7 +115,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
auto hdfs_path = remote_fs_root_path + file_name;
LOG_DEBUG(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
backQuote(metadata_path + path), remote_fs_root_path + hdfs_path);
backQuote(metadata_path + path), hdfs_path);
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(hdfs_path,
@ -153,20 +153,13 @@ void DiskHDFS::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
});
}
String DiskHDFS::getUniqueId(const String & path) const
{
Metadata metadata(remote_fs_root_path, metadata_path, path);
String id;
if (!metadata.remote_fs_objects.empty())
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first;
return id;
}
bool DiskHDFS::checkUniqueId(const String & hdfs_uri) const
{
if (!boost::algorithm::starts_with(hdfs_uri, remote_fs_root_path))
return false;
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const String path = hdfs_uri.substr(begin_of_path);
return (0 == hdfsExists(hdfs_fs.get(), path.c_str()));
const String remote_fs_object_path = hdfs_uri.substr(begin_of_path);
return (0 == hdfsExists(hdfs_fs.get(), remote_fs_object_path.c_str()));
}
namespace
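The path extraction in `checkUniqueId` above is terse, so here is a self-contained worked example (the URI value is invented) exercising the same expressions:

```cpp
#include <cassert>
#include <string>

// Standalone illustration of the substring logic in DiskHDFS::checkUniqueId.
// The URI is hypothetical; only the expressions mirror the code above.
int main()
{
    std::string hdfs_uri = "hdfs://namenode:9000/clickhouse/data/abc123";
    // Skip the "hdfs://" scheme, then take everything from the first '/'
    // after the authority ("namenode:9000").
    size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
    std::string remote_fs_object_path = hdfs_uri.substr(begin_of_path);
    assert(remote_fs_object_path == "/clickhouse/data/abc123");
}
```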

View File

@ -44,6 +44,8 @@ public:
DiskType::Type getType() const override { return DiskType::Type::HDFS; }
bool supportZeroCopyReplication() const override { return true; }
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
@ -58,10 +60,6 @@ public:
RemoteFSPathKeeperPtr createFSPathKeeper() const override;
/// Return some uniq string for file, overrode for remote disk
/// Required for distinguish different copies of the same part on remote disk
String getUniqueId(const String & path) const override;
/// Check file exists and ClickHouse has an access to it
/// Overrode in remote disk
/// Required for remote disk to ensure that replica has access to data written by other node

View File

@ -211,18 +211,22 @@ public:
/// Return disk type - "local", "s3", etc.
virtual DiskType::Type getType() const = 0;
/// Whether this disk supports zero-copy replication.
/// Overridden in remote FS disks.
virtual bool supportZeroCopyReplication() const = 0;
/// Invoked when Global Context is shutdown.
virtual void shutdown() {}
/// Performs action on disk startup.
virtual void startup() {}
/// Return some uniq string for file, overrode for remote disk
/// Return some unique string for a file; overridden in IDiskRemote.
/// Required to distinguish different copies of the same part on a remote disk.
virtual String getUniqueId(const String & path) const { return path; }
/// Check file exists and ClickHouse has an access to it
/// Overrode in remote disk
/// Overridden in remote FS disks (S3/HDFS)
/// Required for remote disk to ensure that replica has access to data written by other node
virtual bool checkUniqueId(const String & id) const { return exists(id); }
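Taken together, `supportZeroCopyReplication()`, `getUniqueId()` and `checkUniqueId()` form the contract zero-copy replication relies on. A minimal sketch of the intended call sequence (the helper function is hypothetical, not part of the patch; the method names come from the interface above):

```cpp
#include <Disks/IDisk.h>

using namespace DB;

// Hypothetical helper, not part of the patch: how a sender/receiver pair of disks
// is expected to use the IDisk contract during a zero-copy fetch.
bool canReuseRemoteData(const DiskPtr & sender_disk, const DiskPtr & receiver_disk, const String & path_on_sender)
{
    if (!sender_disk->supportZeroCopyReplication() || !receiver_disk->supportZeroCopyReplication())
        return false;
    /// The sender derives an id from the remote object(s) backing the file ...
    String id = sender_disk->getUniqueId(path_on_sender);
    /// ... and the receiver checks that it can see the same remote object.
    return !id.empty() && receiver_disk->checkUniqueId(id);
}
```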

View File

@ -488,4 +488,13 @@ bool IDiskRemote::tryReserve(UInt64 bytes)
return false;
}
String IDiskRemote::getUniqueId(const String & path) const
{
Metadata metadata(remote_fs_root_path, metadata_path, path);
String id;
if (!metadata.remote_fs_objects.empty())
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first;
return id;
}
}
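A tiny mock (all values invented) of what the method above computes: the metadata for a local path lists the remote objects backing it, and the id is the remote root path plus the first object's name.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Mock of the id construction in IDiskRemote::getUniqueId; values are invented.
int main()
{
    std::string remote_fs_root_path = "clickhouse1/";
    // (object name relative to the root, object size) pairs as read from the metadata file.
    std::vector<std::pair<std::string, size_t>> remote_fs_objects = {{"nbdxrlpwqzjmyeka", 4096}};

    std::string id;
    if (!remote_fs_objects.empty())
        id = remote_fs_root_path + remote_fs_objects[0].first;

    assert(id == "clickhouse1/nbdxrlpwqzjmyeka");
}
```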

View File

@ -117,6 +117,10 @@ public:
ReservationPtr reserve(UInt64 bytes) override;
String getUniqueId(const String & path) const override;
bool checkUniqueId(const String & id) const override = 0;
virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) = 0;
virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;

View File

@ -158,15 +158,6 @@ DiskS3::DiskS3(
{
}
String DiskS3::getUniqueId(const String & path) const
{
Metadata metadata(remote_fs_root_path, metadata_path, path);
String id;
if (!metadata.remote_fs_objects.empty())
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first;
return id;
}
RemoteFSPathKeeperPtr DiskS3::createFSPathKeeper() const
{
auto settings = current_settings.get();

View File

@ -98,14 +98,12 @@ public:
DiskType::Type getType() const override { return DiskType::Type::S3; }
bool supportZeroCopyReplication() const override { return true; }
void shutdown() override;
void startup() override;
/// Return some uniq string for file, overrode for remote disk
/// Required for distinguish different copies of the same part on remote disk
String getUniqueId(const String & path) const override;
/// Check file exists and ClickHouse has an access to it
/// Overrode in remote disk
/// Required for remote disk to ensure that replica has access to data written by other node

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Disks/IDiskRemote.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/createVolume.h>
#include <IO/HTTPCommon.h>
@ -15,7 +16,8 @@
#include <IO/createReadBufferFromFileBase.h>
#include <common/scope_guard.h>
#include <Poco/Net/HTTPRequest.h>
#include <iterator>
#include <regex>
namespace fs = std::filesystem;
@ -40,6 +42,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int S3_ERROR;
extern const int INCORRECT_PART_TYPE;
extern const int ZERO_COPY_REPLICATION_ERROR;
extern const int NO_RESERVATIONS_PROVIDED;
}
namespace DataPartsExchange
@ -52,9 +56,8 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE = 3;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_S3_COPY = 6;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_HDFS_COPY = 8;
std::string getEndpointId(const std::string & node_id)
@ -123,7 +126,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_HDFS_COPY))});
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION))});
++total_sends;
SCOPE_EXIT({--total_sends;});
@ -169,37 +172,27 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
writeUUIDText(part->uuid, out);
bool try_use_s3_copy = false;
bool try_use_hdfs_copy = false;
int send_s3_metadata = parse<int>(params.get("send_s3_metadata", "0"));
int send_hdfs_metadata = parse<int>(params.get("send_hdfs_metadata", "0"));
auto disk_type = part->volume->getDisk()->getType();
String remote_fs_metadata = parse<String>(params.get("remote_fs_metadata", ""));
std::regex re("\\s*,\\s*");
Strings capability(
std::sregex_token_iterator(remote_fs_metadata.begin(), remote_fs_metadata.end(), re, -1),
std::sregex_token_iterator());
if (disk_type == DiskType::Type::S3
&& send_s3_metadata
&& data_settings->allow_s3_zero_copy_replication
&& client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_S3_COPY)
{ /// if source and destination are in the same S3 storage we try to use S3 CopyObject request first
try_use_s3_copy = true;
}
else if (disk_type == DiskType::Type::HDFS
&& send_hdfs_metadata
&& data_settings->allow_hdfs_zero_copy_replication
&& client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_HDFS_COPY)
{ /// if source and destination are in the same HDFS storage
try_use_hdfs_copy = true;
}
if (try_use_s3_copy)
if (data_settings->allow_remote_fs_zero_copy_replication &&
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
{
response.addCookie({"send_s3_metadata", "1"});
sendPartFromDiskRemoteMeta(part, out);
auto disk = part->volume->getDisk();
auto disk_type = DiskType::toString(disk->getType());
if (disk->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
{
/// Send metadata if the receiver's capability covers the source disk type.
response.addCookie({"remote_fs_metadata", disk_type});
sendPartFromDiskRemoteMeta(part, out);
return;
}
}
else if (try_use_hdfs_copy)
{
response.addCookie({"send_hdfs_metadata", "1"});
sendPartFromDiskRemoteMeta(part, out);
}
else if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
{
const auto & projections = part->getProjectionParts();
writeBinary(projections.size(), out);
@ -344,8 +337,8 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
checksums.files[file_name] = {};
auto disk = part->volume->getDisk();
if (disk->getType() != DiskType::Type::S3 && disk->getType() != DiskType::Type::HDFS)
throw Exception("remote disk is not remote anymore", ErrorCodes::LOGICAL_ERROR);
if (!disk->supportZeroCopyReplication())
throw Exception(fmt::format("disk {} doesn't support zero-copy replication", disk->getName()), ErrorCodes::LOGICAL_ERROR);
part->storage.lockSharedData(*part);
@ -362,9 +355,9 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
fs::path metadata(metadata_file);
if (!fs::exists(metadata))
throw Exception("remote metadata '" + file_name + "' is not exists", ErrorCodes::CORRUPTED_DATA);
throw Exception("Remote metadata '" + file_name + "' is not exists", ErrorCodes::CORRUPTED_DATA);
if (!fs::is_regular_file(metadata))
throw Exception("remote metadata '" + file_name + "' is not a file", ErrorCodes::CORRUPTED_DATA);
throw Exception("Remote metadata '" + file_name + "' is not a file", ErrorCodes::CORRUPTED_DATA);
UInt64 file_size = fs::file_size(metadata);
writeStringBinary(it.first, out);
@ -411,7 +404,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & tmp_prefix_,
std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
bool try_zero_copy,
const DiskPtr disk_remote)
DiskPtr disk)
{
if (blocker.isCancelled())
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
@ -428,60 +421,38 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_HDFS_COPY)},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
{"compress", "false"}
});
bool try_use_s3_copy = false;
bool try_use_hdfs_copy = false;
if (try_zero_copy && disk_remote && disk_remote->getType() != DiskType::Type::S3 && disk_remote->getType() != DiskType::Type::HDFS)
throw Exception("Try to fetch shared part on non-shared disk", ErrorCodes::LOGICAL_ERROR);
Disks disks_s3;
Disks disks_hdfs;
if (try_zero_copy)
Strings capability;
if (try_zero_copy && data_settings->allow_remote_fs_zero_copy_replication)
{
try_use_s3_copy = true;
try_use_hdfs_copy = true;
if (!data_settings->allow_s3_zero_copy_replication)
if (!disk)
{
try_use_s3_copy = false;
}
else
{
if (disk_remote && disk_remote->getType() == DiskType::Type::S3)
disks_s3.push_back(disk_remote);
else
DiskType::Type zero_copy_disk_types[] = {DiskType::Type::S3, DiskType::Type::HDFS};
for (auto disk_type: zero_copy_disk_types)
{
disks_s3 = data.getDisksByType(DiskType::Type::S3);
if (disks_s3.empty())
try_use_s3_copy = false;
Disks disks = data.getDisksByType(disk_type);
if (!disks.empty())
{
capability.push_back(DiskType::toString(disk_type));
}
}
}
if (!data_settings->allow_hdfs_zero_copy_replication)
else if (disk->supportZeroCopyReplication())
{
try_use_hdfs_copy = false;
}
else
{
if (disk_remote && disk_remote->getType() == DiskType::Type::HDFS)
disks_hdfs.push_back(disk_remote);
else
{
disks_hdfs = data.getDisksByType(DiskType::Type::HDFS);
if (disks_hdfs.empty())
try_use_hdfs_copy = false;
}
capability.push_back(DiskType::toString(disk->getType()));
}
}
if (try_use_s3_copy)
if (!capability.empty())
{
uri.addQueryParameter("send_s3_metadata", "1");
const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
}
if (try_use_hdfs_copy)
else
{
uri.addQueryParameter("send_hdfs_metadata", "1");
try_zero_copy = false;
}
Poco::Net::HTTPBasicCredentials creds{};
@ -504,99 +475,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));
int send_s3 = parse<int>(in.getResponseCookie("send_s3_metadata", "0"));
int send_hdfs = parse<int>(in.getResponseCookie("send_hdfs_metadata", "0"));
if (send_s3 == 1 || send_hdfs == 1)
{
if (send_s3 == 1)
{
if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_S3_COPY)
throw Exception("Got 'send_s3_metadata' cookie with old protocol version", ErrorCodes::LOGICAL_ERROR);
if (!try_use_s3_copy)
throw Exception("Got 'send_s3_metadata' cookie when was not requested", ErrorCodes::LOGICAL_ERROR);
}
if (send_hdfs == 1)
{
if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_HDFS_COPY)
throw Exception("Got 'send_hdfs_metadata' cookie with old protocol version", ErrorCodes::LOGICAL_ERROR);
if (!try_use_hdfs_copy)
throw Exception("Got 'send_hdfs_metadata' cookie when was not requested", ErrorCodes::LOGICAL_ERROR);
}
size_t sum_files_size = 0;
readBinary(sum_files_size, in);
IMergeTreeDataPart::TTLInfos ttl_infos;
String ttl_infos_string;
readBinary(ttl_infos_string, in);
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
ReservationPtr reservation
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
if (!reservation)
reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
if (reservation)
{
/// When we have multi-volume storage, one of them was chosen, depends on TTL, free space, etc.
/// Chosen one may be S3 or not.
DiskPtr disk = reservation->getDisk();
if (send_s3 && disk && disk->getType() == DiskType::Type::S3)
{
for (const auto & d : disks_s3)
{
if (d->getPath() == disk->getPath())
{
Disks disks_tmp = { disk };
disks_s3.swap(disks_tmp);
break;
}
}
}
if (send_hdfs && disk && disk->getType() == DiskType::Type::HDFS)
{
for (const auto & d : disks_hdfs)
{
if (d->getPath() == disk->getPath())
{
Disks disks_tmp = { disk };
disks_hdfs.swap(disks_tmp);
break;
}
}
}
}
String part_type = "Wide";
readStringBinary(part_type, in);
if (part_type == "InMemory")
throw Exception("Got 'send_s3_metadata' or 'send_hdfs_metadata' cookie for in-memory part", ErrorCodes::INCORRECT_PART_TYPE);
UUID part_uuid = UUIDHelpers::Nil;
/// Always true due to values of constants. But we keep this condition just in case.
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID) //-V547
readUUIDText(part_uuid, in);
try
{
if (send_s3)
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, std::move(disks_s3), in, throttler);
else
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, std::move(disks_hdfs), in, throttler);
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::S3_ERROR)
throw;
/// Try again but without S3/HDFS copy
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false);
}
}
ReservationPtr reservation;
size_t sum_files_size = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
@ -610,24 +488,29 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
reservation
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
if (!reservation)
if (!disk)
{
reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
if (!reservation)
reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
}
}
else
else if (!disk)
{
reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr);
if (!reservation)
reservation = data.reserveSpace(sum_files_size);
}
}
else
else if (!disk)
{
/// We don't know real size of part because sender server version is too old
reservation = data.makeEmptyReservationOnLargestDisk();
}
if (!disk)
disk = reservation->getDisk();
bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch
&& sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch);
@ -640,8 +523,35 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
readUUIDText(part_uuid, in);
String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
if (!remote_fs_metadata.empty())
{
if (!try_zero_copy)
throw Exception("Got unexpected 'remote_fs_metadata' cookie", ErrorCodes::LOGICAL_ERROR);
if (std::find(capability.begin(), capability.end(), remote_fs_metadata) == capability.end())
throw Exception(fmt::format("Got 'remote_fs_metadata' cookie {}, expect one from {}", remote_fs_metadata, fmt::join(capability, ", ")), ErrorCodes::LOGICAL_ERROR);
if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
throw Exception(fmt::format("Got 'remote_fs_metadata' cookie with old protocol version {}", server_protocol_version), ErrorCodes::LOGICAL_ERROR);
if (part_type == "InMemory")
throw Exception("Got 'remote_fs_metadata' cookie for in-memory part", ErrorCodes::INCORRECT_PART_TYPE);
try
{
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::S3_ERROR && e.code() != ErrorCodes::ZERO_COPY_REPLICATION_ERROR)
throw;
LOG_WARNING(log, e.message() + " Will retry fetching part without zero-copy.");
/// Try again but without zero-copy
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk);
}
}
auto storage_id = data.getStorageID();
String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(reservation->getDisk())) / part_name / "";
String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
auto entry = data.getContext()->getReplicatedFetchList().insert(
storage_id.getDatabaseName(), storage_id.getTableName(),
part_info.partition_id, part_name, new_part_path,
@ -649,15 +559,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
in.setNextCallback(ReplicatedFetchReadCallback(*entry));
size_t projections = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
readBinary(projections, in);
MergeTreeData::DataPart::Checksums checksums;
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums, throttler);
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
@ -665,12 +574,12 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
ThrottlerPtr throttler)
{
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, reservation->getDisk(), 0);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, volume);
@ -862,31 +771,19 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
const Disks & disks_remote,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
ThrottlerPtr throttler)
{
if (disks_remote.empty())
throw Exception("No remote disks anymore", ErrorCodes::LOGICAL_ERROR);
String part_id;
readStringBinary(part_id, in);
DiskPtr disk = nullptr;
for (const auto & disk_remote : disks_remote)
if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
{
if (disk_remote->checkUniqueId(part_id))
{
disk = disk_remote;
break;
}
}
if (disk == nullptr)
{
LOG_WARNING(log, "Part {} unique id {} doesn't exist on this replica's remote disks. Will retry fetching part disabling zero-copy.",
part_name, part_id);
throw Exception("Part " + part_name + " unique id " + part_id + " doesn't exist on this replica's remote disks.", ErrorCodes::S3_ERROR);
throw Exception(fmt::format("Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName()), ErrorCodes::ZERO_COPY_REPLICATION_ERROR);
}
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
part_name, part_id, disk->getName());
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
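To summarize the negotiation introduced in this file: the fetcher advertises the remote disk types it could reuse via the `remote_fs_metadata` query parameter, the sender answers with a cookie of the same name carrying a single disk type when it will stream only metadata, and any `ZERO_COPY_REPLICATION_ERROR`/`S3_ERROR` during the metadata download makes the fetcher retry without zero-copy. A minimal sketch of the string handling (the free functions are illustrative only; the regex and the parameter/cookie semantics follow the diff):

```cpp
#include <algorithm>
#include <regex>
#include <string>
#include <vector>

// Illustrative helpers, not part of the patch.

// Fetcher side: join the reusable disk types, e.g. {"s3", "hdfs"} -> "s3, hdfs".
std::string buildCapability(const std::vector<std::string> & disk_types)
{
    std::string result;
    for (const auto & type : disk_types)
        result += (result.empty() ? "" : ", ") + type;
    return result;
}

// Sender side: split the advertised capability and check whether the part's disk
// type is covered; only then is the "remote_fs_metadata" cookie sent back.
bool capabilityCoversDiskType(const std::string & remote_fs_metadata, const std::string & disk_type)
{
    std::regex re("\\s*,\\s*");
    std::vector<std::string> capability(
        std::sregex_token_iterator(remote_fs_metadata.begin(), remote_fs_metadata.end(), re, -1),
        std::sregex_token_iterator());
    return std::find(capability.begin(), capability.end(), disk_type) != capability.end();
}
```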

View File

@ -82,7 +82,7 @@ public:
const String & tmp_prefix_ = "",
std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr = nullptr,
bool try_zero_copy = true,
const DiskPtr disk_remote = nullptr);
DiskPtr dest_disk = nullptr);
/// You need to stop the data transfer.
ActionBlocker blocker;
@ -115,7 +115,7 @@ private:
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
ThrottlerPtr throttler);
@ -125,7 +125,7 @@ private:
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
const Disks & disks_remote,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
ThrottlerPtr throttler);

View File

@ -1501,16 +1501,11 @@ SerializationPtr IMergeTreeDataPart::getSerializationForColumn(const NameAndType
String IMergeTreeDataPart::getUniqueId() const
{
String id;
auto disk = volume->getDisk();
if (!disk->supportZeroCopyReplication())
throw Exception(fmt::format("Disk {} doesn't support zero-copy replication", disk->getName()), ErrorCodes::LOGICAL_ERROR);
if (disk->getType() == DiskType::Type::S3 || disk->getType() == DiskType::Type::HDFS)
id = disk->getUniqueId(fs::path(getFullRelativePath()) / "checksums.txt");
if (id.empty())
throw Exception("Can't get unique S3/HDFS object", ErrorCodes::LOGICAL_ERROR);
String id = disk->getUniqueId(fs::path(getFullRelativePath()) / "checksums.txt");
return id;
}

View File

@ -2746,15 +2746,15 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
if (active_part_it == data_parts_by_info.end())
throw Exception("Cannot swap part '" + part_copy->name + "', no such active part.", ErrorCodes::NO_SUCH_DATA_PART);
/// We do not check allow_s3_zero_copy_replication/allow_hdfs_zero_copy_replication here because data may be shared
/// when allow_s3_zero_copy_replication/allow_hdfs_zero_copy_replication turned on and off again
/// We do not check allow_remote_fs_zero_copy_replication here because data may be shared
/// when allow_remote_fs_zero_copy_replication is turned on and off again
original_active_part->force_keep_shared_data = false;
auto orig_disk_type = original_active_part->volume->getDisk()->getType();
if (orig_disk_type == DiskType::Type::S3 || orig_disk_type == DiskType::Type::HDFS)
auto original_disk_type = original_active_part->volume->getDisk()->getType();
if (original_disk_type == DiskType::Type::S3 || original_disk_type == DiskType::Type::HDFS)
{
if (part_copy->volume->getDisk()->getType() == orig_disk_type
if (part_copy->volume->getDisk()->getType() == original_disk_type
&& original_active_part->getUniqueId() == part_copy->getUniqueId())
{
/// May be when several volumes use the same S3 storage

View File

@ -813,11 +813,11 @@ public:
bool scheduleDataMovingJob(IBackgroundJobExecutor & executor);
bool areBackgroundMovesNeeded() const;
/// Lock part in zookeeper for use common S3/HDFS data in several nodes
/// Lock part in zookeeper for shared data in several nodes
/// Overridden in StorageReplicatedMergeTree
virtual void lockSharedData(const IMergeTreeDataPart &) const {}
/// Unlock common S3/HDFS data part in zookeeper
/// Unlock shared data part in zookeeper
/// Overridden in StorageReplicatedMergeTree
virtual bool unlockSharedData(const IMergeTreeDataPart &) const { return true; }

View File

@ -195,17 +195,14 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
auto settings = data->getSettings();
auto part = moving_part.part;
LOG_TRACE(log, "Cloning part {}", part->name);
auto disk = moving_part.reserved_space->getDisk();
auto disk_type = disk->getType();
LOG_DEBUG(log, "Cloning part {} from {} to {}", part->name, part->volume->getDisk()->getName(), disk->getName());
const String directory_to_move = "moving";
if ((disk_type==DiskType::Type::S3 && settings->allow_s3_zero_copy_replication) || (disk_type==DiskType::Type::HDFS && settings->allow_hdfs_zero_copy_replication))
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// Try to fetch part from S3/HDFS without copy and fallback to default copy
/// if it's not possible
/// Try zero-copy replication and fall back to the default copy if it's not possible
moving_part.part->assertOnDisk();
String path_to_clone = fs::path(data->getRelativeDataPath()) / directory_to_move / "";
String relative_path = part->relative_path;

View File

@ -74,8 +74,7 @@ struct Settings;
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If sum size of parts exceeds this threshold and time passed after replication log entry creation is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
M(Seconds, execute_merges_on_single_replica_time_threshold, 0, "When greater than zero only a single replica starts the merge immediately, others wait up to that amount of time to download the result instead of doing merges locally. If the chosen replica doesn't finish the merge during that amount of time, fallback to standard behavior happens.", 0) \
M(Seconds, s3_execute_merges_on_single_replica_time_threshold, 3 * 60 * 60, "When greater than zero only a single replica starts the merge immediatelys when merged part on S3 storage and 'allow_s3_zero_copy_replication' is enabled.", 0) \
M(Seconds, hdfs_execute_merges_on_single_replica_time_threshold, 3 * 60 * 60, "When greater than zero only a single replica starts the merge immediatelys when merged part on HDFS storage and 'allow_hdfs_zero_copy_replication' is enabled.", 0) \
M(Seconds, remote_fs_execute_merges_on_single_replica_time_threshold, 3 * 60 * 60, "When greater than zero only a single replica starts the merge immediately when the merged part is on shared storage and 'allow_remote_fs_zero_copy_replication' is enabled.", 0) \
M(Seconds, try_fetch_recompressed_part_timeout, 7200, "Recompression works slow in most cases, so we don't start merge with recompression until this timeout and trying to fetch recompressed part from replica which assigned this merge with recompression.", 0) \
M(Bool, always_fetch_merged_part, 0, "If true, replica never merge parts and always download merged parts from other replicas.", 0) \
M(UInt64, max_suspicious_broken_parts, 10, "Max broken parts, if more - deny automatic deletion.", 0) \
@ -124,8 +123,7 @@ struct Settings;
M(UInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
M(String, storage_policy, "default", "Name of storage disk policy", 0) \
M(Bool, allow_nullable_key, false, "Allow Nullable types as primary keys.", 0) \
M(Bool, allow_s3_zero_copy_replication, false, "Allow Zero-copy replication over S3", 0) \
M(Bool, allow_hdfs_zero_copy_replication, false, "Allow Zero-copy replication over HDFS", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, false, "Allow zero-copy replication over remote fs", 0) \
M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm", 0) \
M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \

View File

@ -56,20 +56,9 @@ bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplica(const Re
}
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplicaS3Shared(const ReplicatedMergeTreeLogEntryData & entry) const
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const
{
time_t threshold = s3_execute_merges_on_single_replica_time_threshold;
return (
threshold > 0 /// feature turned on
&& entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS /// it is a merge log entry
&& entry.create_time + threshold > time(nullptr) /// not too much time waited
);
}
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplicaHdfsShared(const ReplicatedMergeTreeLogEntryData & entry) const
{
time_t threshold = hdfs_execute_merges_on_single_replica_time_threshold;
time_t threshold = remote_fs_execute_merges_on_single_replica_time_threshold;
return (
threshold > 0 /// feature turned on
&& entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS /// it is a merge log entry
@ -113,29 +102,23 @@ void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
{
const auto settings = storage.getSettings();
auto threshold = settings->execute_merges_on_single_replica_time_threshold.totalSeconds();
auto threshold_s3 = 0;
if (settings->allow_s3_zero_copy_replication)
threshold_s3 = settings->s3_execute_merges_on_single_replica_time_threshold.totalSeconds();
auto threshold_hdfs = 0;
if (settings->allow_hdfs_zero_copy_replication)
threshold_hdfs = settings->hdfs_execute_merges_on_single_replica_time_threshold.totalSeconds();
auto threshold_init = 0;
if (settings->allow_remote_fs_zero_copy_replication)
threshold_init = settings->remote_fs_execute_merges_on_single_replica_time_threshold.totalSeconds();
if (threshold == 0)
/// we can reset the settings w/o lock (it's atomic)
execute_merges_on_single_replica_time_threshold = threshold;
if (threshold_s3 == 0)
s3_execute_merges_on_single_replica_time_threshold = threshold_s3;
if (threshold_hdfs == 0)
hdfs_execute_merges_on_single_replica_time_threshold = threshold_hdfs;
if (threshold == 0 && threshold_s3 == 0 && threshold_hdfs == 0)
if (threshold_init == 0)
remote_fs_execute_merges_on_single_replica_time_threshold = threshold_init;
if (threshold == 0 && threshold_init == 0)
return;
auto now = time(nullptr);
/// the setting was already enabled, and last state refresh was done recently
if (((threshold != 0 && execute_merges_on_single_replica_time_threshold != 0)
|| (threshold_s3 != 0 && s3_execute_merges_on_single_replica_time_threshold != 0)
|| (threshold_hdfs != 0 && hdfs_execute_merges_on_single_replica_time_threshold != 0))
|| (threshold_init != 0 && remote_fs_execute_merges_on_single_replica_time_threshold != 0))
&& now - last_refresh_time < REFRESH_STATE_MINIMUM_INTERVAL_SECONDS)
return;
@ -164,17 +147,15 @@ void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
LOG_WARNING(storage.log, "Can't find current replica in the active replicas list, or too few active replicas to use execute_merges_on_single_replica_time_threshold!");
/// we can reset the settings w/o lock (it's atomic)
execute_merges_on_single_replica_time_threshold = 0;
s3_execute_merges_on_single_replica_time_threshold = 0;
remote_fs_execute_merges_on_single_replica_time_threshold = 0;
return;
}
std::lock_guard lock(mutex);
if (threshold != 0) /// Zeros already reset
execute_merges_on_single_replica_time_threshold = threshold;
if (threshold_s3 != 0)
s3_execute_merges_on_single_replica_time_threshold = threshold_s3;
if (threshold_hdfs != 0)
hdfs_execute_merges_on_single_replica_time_threshold = threshold_hdfs;
if (threshold_init != 0)
remote_fs_execute_merges_on_single_replica_time_threshold = threshold_init;
last_refresh_time = now;
current_replica_index = current_replica_index_tmp;
active_replicas = active_replicas_tmp;

View File

@ -52,13 +52,9 @@ public:
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const;
/// return true if s3_execute_merges_on_single_replica_time_threshold feature is active
/// return true if remote_fs_execute_merges_on_single_replica_time_threshold feature is active
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplicaS3Shared(const ReplicatedMergeTreeLogEntryData & entry) const;
/// return true if hdfs_execute_merges_on_single_replica_time_threshold feature is active
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplicaHdfsShared(const ReplicatedMergeTreeLogEntryData & entry) const;
bool shouldMergeOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const;
/// returns the replica name
/// and it's not current replica should do the merge
@ -76,8 +72,7 @@ private:
uint64_t getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const;
std::atomic<time_t> execute_merges_on_single_replica_time_threshold = 0;
std::atomic<time_t> s3_execute_merges_on_single_replica_time_threshold = 0;
std::atomic<time_t> hdfs_execute_merges_on_single_replica_time_threshold = 0;
std::atomic<time_t> remote_fs_execute_merges_on_single_replica_time_threshold = 0;
std::atomic<time_t> last_refresh_time = 0;
std::mutex mutex;

View File

@ -607,17 +607,12 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
zookeeper->createIfNotExists(zookeeper_path + "/mutations", String());
zookeeper->createIfNotExists(replica_path + "/mutation_pointer", String());
/// Nodes for zero-copy S3 replication
/// Nodes for remote fs zero-copy replication
const auto settings = getSettings();
if (settings->allow_s3_zero_copy_replication)
if (settings->allow_remote_fs_zero_copy_replication)
{
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/shared", String());
}
/// Nodes for zero-copy HDFS replication
if (settings->allow_hdfs_zero_copy_replication)
{
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs/shared", String());
}
@ -1736,23 +1731,18 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
future_merged_part.updatePath(*this, reserved_space);
future_merged_part.merge_type = entry.merge_type;
auto disk_type = reserved_space->getDisk()->getType();
if ((disk_type == DiskType::Type::S3
&& storage_settings_ptr->allow_s3_zero_copy_replication
&& merge_strategy_picker.shouldMergeOnSingleReplicaS3Shared(entry))
|| (disk_type == DiskType::Type::HDFS
&& storage_settings_ptr->allow_hdfs_zero_copy_replication
&& merge_strategy_picker.shouldMergeOnSingleReplicaHdfsShared(entry)))
if (reserved_space->getDisk()->supportZeroCopyReplication()
&& storage_settings_ptr->allow_remote_fs_zero_copy_replication
&& merge_strategy_picker.shouldMergeOnSingleReplicaShared(entry))
{
if (!replica_to_execute_merge_picked)
replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge)
{
const auto * param = (disk_type == DiskType::Type::S3) ? "s3_execute_merges_on_single_replica_time_threshold" : "hdfs_execute_merges_on_single_replica_time_threshold";
LOG_DEBUG(log,
"Prefer fetching part {} from replica {} due to {}",
entry.new_part_name, replica_to_execute_merge.value(), param);
"Prefer fetching part {} from replica {} due to remote_fs_execute_merges_on_single_replica_time_threshold",
entry.new_part_name, replica_to_execute_merge.value());
return false;
}
}
@ -2177,7 +2167,7 @@ bool StorageReplicatedMergeTree::executeFetchShared(
{
if (source_replica.empty())
{
LOG_INFO(log, "No active replica has part {} on S3/HDFS.", new_part_name);
LOG_INFO(log, "No active replica has part {} on shared storage.", new_part_name);
return false;
}
@ -7210,19 +7200,9 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part)
if (!part.volume)
return;
DiskPtr disk = part.volume->getDisk();
if (!disk)
return;
String zero_copy;
if (disk->getType() == DiskType::Type::S3)
{
zero_copy = "zero_copy_s3";
}
else if (disk->getType() == DiskType::Type::HDFS)
{
zero_copy = "zero_copy_hdfs";
}
else
if (!disk || !disk->supportZeroCopyReplication())
return;
String zero_copy = fmt::format("zero_copy_{}", DiskType::toString(disk->getType()));
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
@ -7260,19 +7240,9 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
if (!part.volume)
return true;
DiskPtr disk = part.volume->getDisk();
if (!disk)
return true;
String zero_copy;
if (disk->getType() == DiskType::Type::S3)
{
zero_copy = "zero_copy_s3";
}
else if (disk->getType() == DiskType::Type::HDFS)
{
zero_copy = "zero_copy_hdfs";
}
else
if (!disk || !disk->supportZeroCopyReplication())
return true;
String zero_copy = fmt::format("zero_copy_{}", DiskType::toString(disk->getType()));
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
@ -7318,7 +7288,7 @@ bool StorageReplicatedMergeTree::tryToFetchIfShared(
{
const auto settings = getSettings();
auto disk_type = disk->getType();
if (!(disk_type==DiskType::Type::S3 && settings->allow_s3_zero_copy_replication) && !(disk_type==DiskType::Type::HDFS && settings->allow_hdfs_zero_copy_replication))
if (!(disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication))
return false;
String replica = getSharedDataReplica(part, disk_type);
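The lock-node naming above stays compatible with the per-type znodes created in `createNewZooKeeperNodes()`: assuming `DiskType::toString` yields the lowercase type name used for the capability strings in this patch, the generic format call reproduces the old `zero_copy_s3` / `zero_copy_hdfs` paths.

```cpp
#include <cassert>
#include <string>
#include <fmt/format.h>

// Sketch only: "hdfs" stands in for DiskType::toString(disk->getType()),
// assumed to be the lowercase type name, as used elsewhere in this patch.
int main()
{
    std::string disk_type = "hdfs";
    std::string zero_copy = fmt::format("zero_copy_{}", disk_type);
    assert(zero_copy == "zero_copy_hdfs");
}
```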

View File

@ -225,10 +225,10 @@ public:
/// Fetch part only when it stored on shared storage like S3
bool executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
/// Lock part in zookeeper for use common S3/HDFS data in several nodes
/// Lock part in zookeeper to use shared data in several nodes
void lockSharedData(const IMergeTreeDataPart & part) const override;
/// Unlock common S3/HDFS data part in zookeeper
/// Unlock shared data part in zookeeper
/// Return true if data unlocked
/// Return false if data is still used by another node
bool unlockSharedData(const IMergeTreeDataPart & part) const override;

View File

@ -5,14 +5,14 @@ This directory contains tests that involve several ClickHouse instances, custom
### Running natively
Prerequisites:
* Ubuntu 14.04 (Trusty) or higher.
* Ubuntu 20.04 (Focal) or higher.
* [docker](https://www.docker.com/community-edition#/download). Minimum required API version: 1.25, check with `docker version`.
You must install latest Docker from
https://docs.docker.com/engine/installation/linux/docker-ce/ubuntu/#set-up-the-repository
Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip) and `libpq-dev`. To install: `sudo apt-get install python3-pip libpq-dev zlib1g-dev libcrypto++-dev libssl-dev libkrb5-dev`
* [pip](https://pypi.python.org/pypi/pip) and `libpq-dev`. To install: `sudo apt-get install python3-pip libpq-dev zlib1g-dev libcrypto++-dev libssl-dev libkrb5-dev python3-dev`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install:
@ -25,14 +25,14 @@ sudo -H pip install \
confluent-kafka \
dicttoxml \
docker \
docker-compose==1.22.0 \
docker-compose \
grpcio \
grpcio-tools \
kafka-python \
kazoo \
minio \
protobuf \
psycopg2-binary==2.7.5 \
psycopg2-binary \
pymongo \
pytest \
pytest-timeout \
@ -40,10 +40,13 @@ sudo -H pip install \
tzlocal \
urllib3 \
requests-kerberos \
dict2xml
dict2xml \
hypothesis \
pyhdfs \
pika
```
(highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python3-pytest python3-dicttoxml python3-docker python3-pymysql python3-pymongo python3-tzlocal python3-kazoo python3-psycopg2 kafka-python python3-pytest-timeout python3-minio`
(strongly discouraged) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python3-pytest python3-dicttoxml python3-docker python3-pymysql python3-protobuf python3-pymongo python3-tzlocal python3-kazoo python3-psycopg2 kafka-python python3-pytest-timeout python3-minio`
If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER` and re-login.
(You must close all your sessions (for example, restart your computer))

View File

@ -102,6 +102,7 @@ class HDFSApi(object):
return response_data
else:
logging.error(f"unexpected response_data.status_code {response_data.status_code} != {expected_code}")
time.sleep(1)
response_data.raise_for_status()

View File

@ -78,7 +78,7 @@ def wait_for_delete_hdfs_objects(cluster, expected, num_tries=30):
while num_tries > 0:
num_hdfs_objects = len(fs.listdir('/clickhouse'))
if num_hdfs_objects == expected:
break;
break
num_tries -= 1
time.sleep(1)
assert(len(fs.listdir('/clickhouse')) == expected)

View File

@ -36,20 +36,20 @@
<tiered>
<volumes>
<main>
<disk>hdfs1</disk>
<disk>hdfs2</disk>
</main>
<external>
<disk>hdfs2</disk>
<disk>hdfs1</disk>
</external>
</volumes>
</tiered>
<tiered_copy>
<volumes>
<main>
<disk>hdfs1</disk>
<disk>hdfs1_again</disk>
</main>
<external>
<disk>hdfs1_again</disk>
<disk>hdfs1</disk>
</external>
</volumes>
</tiered_copy>
@ -59,7 +59,7 @@
<merge_tree>
<min_bytes_for_wide_part>1024000</min_bytes_for_wide_part>
<old_parts_lifetime>1</old_parts_lifetime>
<allow_hdfs_zero_copy_replication>1</allow_hdfs_zero_copy_replication>
<allow_remote_fs_zero_copy_replication>1</allow_remote_fs_zero_copy_replication>
</merge_tree>
<remote_servers>

View File

@ -28,11 +28,11 @@ def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node1", main_configs=["configs/config.d/storage_conf.xml"],
macros={'replica': '1'},
macros={'replica': 'node1'},
with_zookeeper=True,
with_hdfs=True)
cluster.add_instance("node2", main_configs=["configs/config.d/storage_conf.xml"],
macros={'replica': '2'},
macros={'replica': 'node2'},
with_zookeeper=True,
with_hdfs=True)
logging.info("Starting cluster...")
@ -49,7 +49,7 @@ def cluster():
cluster.shutdown()
def test_hdfs_zero_copy_replication(cluster):
def test_hdfs_zero_copy_replication_insert(cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
@ -70,6 +70,8 @@ def test_hdfs_zero_copy_replication(cluster):
assert node2.query("SELECT count() FROM hdfs_test FORMAT Values") == "(1)"
assert node1.query("SELECT id FROM hdfs_test ORDER BY dt FORMAT Values") == "(10)"
assert node2.query("SELECT id FROM hdfs_test ORDER BY dt FORMAT Values") == "(10)"
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hdfs_test' FORMAT Values") == "('all','hdfs1')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hdfs_test' FORMAT Values") == "('all','hdfs1')"
wait_for_hdfs_objects(cluster, "/clickhouse1", SHARDS * FILES_OVERHEAD_PER_TABLE + FILES_OVERHEAD_PER_PART_COMPACT)
@ -77,122 +79,143 @@ def test_hdfs_zero_copy_replication(cluster):
node2.query("DROP TABLE IF EXISTS hdfs_test NO DELAY")
def test_hdfs_zero_copy_on_hybrid_storage(cluster):
@pytest.mark.parametrize(
("storage_policy", "init_objects"),
[("hybrid", 0),
("tiered", 0),
("tiered_copy", FILES_OVERHEAD_PER_TABLE)]
)
def test_hdfs_zero_copy_replication_single_move(cluster, storage_policy, init_objects):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
node1.query(
"""
CREATE TABLE hybrid_test ON CLUSTER test_cluster (dt DateTime, id Int64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hybrid_test', '{replica}')
Template("""
CREATE TABLE single_node_move_test (dt DateTime, id Int64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/single_node_move_test', '{replica}')
ORDER BY (dt, id)
SETTINGS storage_policy='hybrid'
"""
SETTINGS storage_policy='$policy'
""").substitute(policy=storage_policy)
)
wait_for_hdfs_objects(cluster, "/clickhouse1", init_objects)
wait_for_hdfs_objects(cluster, "/clickhouse1", 0)
node1.query("INSERT INTO single_node_move_test VALUES (now() - INTERVAL 3 DAY, 10), (now() - INTERVAL 1 DAY, 11)")
assert node1.query("SELECT id FROM single_node_move_test ORDER BY dt FORMAT Values") == "(10),(11)"
node1.query("INSERT INTO hybrid_test VALUES (now() - INTERVAL 3 DAY, 10), (now() - INTERVAL 1 DAY, 11)")
node2.query("SYSTEM SYNC REPLICA hybrid_test")
node1.query("ALTER TABLE single_node_move_test MOVE PARTITION ID 'all' TO VOLUME 'external'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='single_node_move_test' FORMAT Values") == "('all','hdfs1')"
assert node1.query("SELECT id FROM single_node_move_test ORDER BY dt FORMAT Values") == "(10),(11)"
wait_for_hdfs_objects(cluster, "/clickhouse1", init_objects + FILES_OVERHEAD_PER_PART_COMPACT)
assert node1.query("SELECT id FROM hybrid_test ORDER BY dt FORMAT Values") == "(10),(11)"
assert node2.query("SELECT id FROM hybrid_test ORDER BY dt FORMAT Values") == "(10),(11)"
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
node1.query("ALTER TABLE single_node_move_test MOVE PARTITION ID 'all' TO VOLUME 'main'")
assert node1.query("SELECT id FROM single_node_move_test ORDER BY dt FORMAT Values") == "(10),(11)"
node1.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 'hdfs1'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','hdfs1')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
wait_for_hdfs_objects(cluster, "/clickhouse1", FILES_OVERHEAD_PER_PART_COMPACT)
node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 'hdfs1'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','hdfs1')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','hdfs1')"
wait_for_hdfs_objects(cluster, "/clickhouse1", FILES_OVERHEAD_PER_PART_COMPACT)
assert node1.query("SELECT id FROM hybrid_test ORDER BY dt FORMAT Values") == "(10),(11)"
assert node2.query("SELECT id FROM hybrid_test ORDER BY dt FORMAT Values") == "(10),(11)"
node1.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
node2.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
node1.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
@pytest.mark.parametrize(
("storage_policy", "iterations"),
[
("hybrid", 1),
("tiered", 1),
("tiered_copy", 1),
]
("storage_policy", "init_objects"),
[("hybrid", 0),
("tiered", 0),
("tiered_copy", SHARDS * FILES_OVERHEAD_PER_TABLE)]
)
def test_hdfs_zero_copy_with_ttl_move(cluster, storage_policy, iterations):
def test_hdfs_zero_copy_replication_move(cluster, storage_policy, init_objects):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS move_test NO DELAY")
node1.query(
Template("""
CREATE TABLE move_test ON CLUSTER test_cluster (dt DateTime, id Int64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/move_test', '{replica}')
ORDER BY (dt, id)
SETTINGS storage_policy='$policy'
""").substitute(policy=storage_policy)
)
wait_for_hdfs_objects(cluster, "/clickhouse1", init_objects)
node1.query("INSERT INTO move_test VALUES (now() - INTERVAL 3 DAY, 10), (now() - INTERVAL 1 DAY, 11)")
node2.query("SYSTEM SYNC REPLICA move_test")
assert node1.query("SELECT id FROM move_test ORDER BY dt FORMAT Values") == "(10),(11)"
assert node2.query("SELECT id FROM move_test ORDER BY dt FORMAT Values") == "(10),(11)"
node1.query("ALTER TABLE move_test MOVE PARTITION ID 'all' TO VOLUME 'external'")
wait_for_hdfs_objects(cluster, "/clickhouse1", init_objects + FILES_OVERHEAD_PER_PART_COMPACT)
node2.query("ALTER TABLE move_test MOVE PARTITION ID 'all' TO VOLUME 'external'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='move_test' FORMAT Values") == "('all','hdfs1')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='move_test' FORMAT Values") == "('all','hdfs1')"
assert node1.query("SELECT id FROM move_test ORDER BY dt FORMAT Values") == "(10),(11)"
assert node2.query("SELECT id FROM move_test ORDER BY dt FORMAT Values") == "(10),(11)"
wait_for_hdfs_objects(cluster, "/clickhouse1", init_objects + FILES_OVERHEAD_PER_PART_COMPACT)
node1.query("DROP TABLE IF EXISTS move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS move_test NO DELAY")
@pytest.mark.parametrize(
("storage_policy"), ["hybrid", "tiered", "tiered_copy"]
)
def test_hdfs_zero_copy_with_ttl_move(cluster, storage_policy):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node1.query(
Template("""
CREATE TABLE ttl_move_test ON CLUSTER test_cluster (dt DateTime, id Int64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/ttl_move_test', '{replica}')
ORDER BY (dt, id)
TTL dt + INTERVAL 2 DAY TO VOLUME 'external'
SETTINGS storage_policy='$policy'
""").substitute(policy=storage_policy)
)
for i in range(iterations):
node1.query(
Template("""
CREATE TABLE ttl_move_test ON CLUSTER test_cluster (dt DateTime, id Int64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/ttl_move_test', '{replica}')
ORDER BY (dt, id)
TTL dt + INTERVAL 2 DAY TO VOLUME 'external'
SETTINGS storage_policy='$policy'
""").substitute(policy=storage_policy)
)
node1.query("INSERT INTO ttl_move_test VALUES (now() - INTERVAL 3 DAY, 10)")
node1.query("INSERT INTO ttl_move_test VALUES (now() - INTERVAL 1 DAY, 11)")
node1.query("INSERT INTO ttl_move_test VALUES (now() - INTERVAL 3 DAY, 10)")
node1.query("INSERT INTO ttl_move_test VALUES (now() - INTERVAL 1 DAY, 11)")
node1.query("OPTIMIZE TABLE ttl_move_test FINAL")
node2.query("SYSTEM SYNC REPLICA ttl_move_test")
node1.query("OPTIMIZE TABLE ttl_move_test FINAL")
node2.query("SYSTEM SYNC REPLICA ttl_move_test")
assert node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
assert node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
assert node1.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"
assert node2.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"
assert node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
assert node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
assert node1.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"
assert node2.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"
node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
@pytest.mark.parametrize(
("iterations"), [1]
)
def test_hdfs_zero_copy_with_ttl_delete(cluster, iterations):
def test_hdfs_zero_copy_with_ttl_delete(cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
for i in range(iterations):
node1.query(
"""
CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (dt DateTime, id Int64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/ttl_delete_test', '{replica}')
ORDER BY (dt, id)
TTL dt + INTERVAL 2 DAY
SETTINGS storage_policy='tiered'
"""
)
node1.query(
"""
CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (dt DateTime, id Int64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/ttl_delete_test', '{replica}')
ORDER BY (dt, id)
TTL dt + INTERVAL 2 DAY
SETTINGS storage_policy='tiered'
"""
)
node1.query("INSERT INTO ttl_delete_test VALUES (now() - INTERVAL 3 DAY, 10)")
node1.query("INSERT INTO ttl_delete_test VALUES (now() - INTERVAL 1 DAY, 11)")
node1.query("INSERT INTO ttl_delete_test VALUES (now() - INTERVAL 3 DAY, 10)")
node1.query("INSERT INTO ttl_delete_test VALUES (now() - INTERVAL 1 DAY, 11)")
node1.query("OPTIMIZE TABLE ttl_delete_test FINAL")
node2.query("SYSTEM SYNC REPLICA ttl_delete_test")
node1.query("OPTIMIZE TABLE ttl_delete_test FINAL")
node2.query("SYSTEM SYNC REPLICA ttl_delete_test")
assert node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
assert node1.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
assert node2.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
assert node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
assert node1.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
assert node2.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")

View File

@ -21,7 +21,7 @@
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<allow_s3_zero_copy_replication>0</allow_s3_zero_copy_replication>
<allow_remote_fs_zero_copy_replication>0</allow_remote_fs_zero_copy_replication>
</merge_tree>
<remote_servers>

View File

@ -21,7 +21,7 @@
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<allow_s3_zero_copy_replication>1</allow_s3_zero_copy_replication>
<allow_remote_fs_zero_copy_replication>1</allow_remote_fs_zero_copy_replication>
</merge_tree>
<remote_servers>

View File

@ -66,7 +66,7 @@
<merge_tree>
<min_bytes_for_wide_part>1024</min_bytes_for_wide_part>
<old_parts_lifetime>1</old_parts_lifetime>
<allow_s3_zero_copy_replication>1</allow_s3_zero_copy_replication>
<allow_remote_fs_zero_copy_replication>1</allow_remote_fs_zero_copy_replication>
</merge_tree>
<remote_servers>