Fix build after merge

Anton Ivashkin 2021-01-20 12:48:22 +03:00
parent 357d98eb36
commit df6c882aab
12 changed files with 61 additions and 30 deletions

View File

@@ -37,7 +37,7 @@
 In hybrid storage, when a part is moved to S3, the node checks via ZooKeeper whether the part has already been moved by another node; if it has, the node does a fetch (modified compared to the regular fetch).
-A flag that enables the new replication protocol has been added to the config: merge_tree->allow_s3_zero_copy_replication. It is currently set to true; this is temporary, so that for now all tests pass with the flag enabled, and it must be switched back to false before the final merge.
+A flag that enables the new replication protocol has been added to the config: merge_tree->allow_s3_zero_copy_replication. It is currently set to false.
 
 ## Hacks and rough edges, of which there are many
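
For illustration, a minimal sketch of how this flag could be enabled server-wide. Only the setting name `allow_s3_zero_copy_replication` comes from this commit; placing it in the `<merge_tree>` section of config.xml is the usual spot for MergeTree settings, but the exact layout here is an assumption:

```xml
<!-- config.xml (hypothetical snippet): server-wide MergeTree settings -->
<yandex>
    <merge_tree>
        <!-- This commit defaults the setting to false (0); set to 1 to opt in -->
        <allow_s3_zero_copy_replication>0</allow_s3_zero_copy_replication>
    </merge_tree>
</yandex>
```

As a MergeTree-level setting it should also be adjustable per table, e.g. via `SETTINGS allow_s3_zero_copy_replication = 1` in CREATE TABLE.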

View File

@@ -278,11 +278,11 @@ void DiskCacheWrapper::removeRecursive(const String & path)
     DiskDecorator::removeRecursive(path);
 }
 
-void DiskCacheWrapper::removeShared(const String & path, bool keep_s3)
+void DiskCacheWrapper::removeSharedFile(const String & path, bool keep_s3)
 {
     if (cache_disk->exists(path))
-        cache_disk->removeShared(path, keep_s3);
-    DiskDecorator::removeShared(path, keep_s3);
+        cache_disk->removeSharedFile(path, keep_s3);
+    DiskDecorator::removeSharedFile(path, keep_s3);
 }
 
 void DiskCacheWrapper::removeSharedRecursive(const String & path, bool keep_s3)

View File

@@ -41,7 +41,7 @@ public:
     void removeFileIfExists(const String & path) override;
     void removeDirectory(const String & path) override;
     void removeRecursive(const String & path) override;
-    void removeShared(const String & path, bool keep_s3) override;
+    void removeSharedFile(const String & path, bool keep_s3) override;
     void removeSharedRecursive(const String & path, bool keep_s3) override;
     void createHardLink(const String & src_path, const String & dst_path) override;
     ReservationPtr reserve(UInt64 bytes) override;

View File

@@ -150,9 +150,9 @@ void DiskDecorator::removeRecursive(const String & path)
     delegate->removeRecursive(path);
 }
 
-void DiskDecorator::removeShared(const String & path, bool keep_s3)
+void DiskDecorator::removeSharedFile(const String & path, bool keep_s3)
 {
-    delegate->removeShared(path, keep_s3);
+    delegate->removeSharedFile(path, keep_s3);
 }
 
 void DiskDecorator::removeSharedRecursive(const String & path, bool keep_s3)

View File

@@ -43,7 +43,7 @@ public:
     void removeFileIfExists(const String & path) override;
     void removeDirectory(const String & path) override;
     void removeRecursive(const String & path) override;
-    void removeShared(const String & path, bool keep_s3) override;
+    void removeSharedFile(const String & path, bool keep_s3) override;
     void removeSharedRecursive(const String & path, bool keep_s3) override;
     void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
     Poco::Timestamp getLastModified(const String & path) override;

View File

@@ -201,10 +201,10 @@ public:
     /// Invoked when Global Context is shutdown.
     virtual void shutdown() { }
 
-    /// Return some uniq string for file, overrided for S3
+    /// Return some unique string for file, overridden for S3
     virtual String getUniqueId(const String & path) const { return path; }
 
-    /// Check file, overrided for S3 only
+    /// Check file, overridden for S3 only
    virtual bool checkUniqueId(const String & id) const { return exists(id); }
 
     /// Returns executor to perform asynchronous operations.

View File

@@ -935,8 +935,6 @@ bool DiskS3::checkUniqueId(const String & id) const
     throwIfError(resp);
 
     Aws::Vector<Aws::S3::Model::Object> object_list = resp.GetResult().GetContents();
-    if (object_list.size() < 1)
-        return false;
     for (const auto & object : object_list)
         if (object.GetKey() == id)
             return true;

View File

@@ -118,7 +118,7 @@ public:
     String getUniqueId(const String & path) const override;
 
-    bool checkUniqueId(const String & path) const override;
+    bool checkUniqueId(const String & id) const override;
 
 private:
     bool tryReserve(UInt64 bytes);

View File

@@ -9,7 +9,6 @@
 #include <DataStreams/NativeBlockOutputStream.h>
 #include <IO/HTTPCommon.h>
 #include <IO/createReadBufferFromFileBase.h>
-#include <IO/createWriteBufferFromFileBase.h>
 #include <ext/scope_guard.h>
 #include <Poco/File.h>
 #include <Poco/Net/HTTPServerResponse.h>
@@ -619,11 +618,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
     DiskPtr disk = disks_s3[0];
 
-    for (const auto & disk_ : disks_s3)
+    for (const auto & disk_s3 : disks_s3)
     {
-        if (disk_->checkUniqueId(part_id))
+        if (disk_s3->checkUniqueId(part_id))
         {
-            disk = disk_;
+            disk = disk_s3;
             break;
         }
     }
@@ -662,7 +661,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
     String metadata_file = fullPath(disk, data_path);
 
     {
-        auto file_out = createWriteBufferFromFileBase(metadata_file, 0, 0, DBMS_DEFAULT_BUFFER_SIZE, -1);
+        auto file_out = std::make_unique<WriteBufferFromFile>(metadata_file, DBMS_DEFAULT_BUFFER_SIZE, -1, 0666, nullptr, 0);
         HashingWriteBuffer hashing_out(*file_out);

View File

@@ -1132,7 +1132,9 @@ void IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & di
     if (disk->getType() == "s3")
     {
-        is_fetched = tryToFetchIfShared(disk, path_to_clone + "/" + name);
+        auto data_settings = storage.getSettings();
+        if (data_settings->allow_s3_zero_copy_replication)
+            is_fetched = tryToFetchIfShared(disk, path_to_clone + "/" + name);
     }
 
     if (!is_fetched)
@@ -1301,8 +1303,23 @@ void IMergeTreeDataPart::lockSharedData() const
     LOG_TRACE(storage.log, "Set zookeeper lock {}", zookeeper_node);
 
-    zk.zookeeper->createAncestors(zookeeper_node);
-    zk.zookeeper->createIfNotExists(zookeeper_node, "lock");
+    /// In a rare case another replica can remove the path between createAncestors and createIfNotExists,
+    /// so we make up to 5 attempts.
+    for (int attempts = 5; attempts > 0; --attempts)
+    {
+        try
+        {
+            zk.zookeeper->createAncestors(zookeeper_node);
+            zk.zookeeper->createIfNotExists(zookeeper_node, "lock");
+            break;
+        }
+        catch (const zkutil::KeeperException & e)
+        {
+            if (e.code == Coordination::Error::ZNONODE)
+                continue;
+            throw;
+        }
+    }
 }
 
 bool IMergeTreeDataPart::unlockSharedData() const
@@ -1476,7 +1493,7 @@ bool IMergeTreeDataPart::tryToFetchIfShared(const DiskPtr & disk, const String &
     log_entry.disk = disk;
     log_entry.path = path;
 
-    /// TODO: !!! Fix const usage !!!
+    /// TODO: Fix const usage
     StorageReplicatedMergeTree *replicated_storage_nc = const_cast<StorageReplicatedMergeTree *>(replicated_storage);
 
     return replicated_storage_nc->executeFetchShared(log_entry);

View File

@@ -108,7 +108,7 @@ struct Settings;
     M(UInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
     M(String, storage_policy, "default", "Name of storage disk policy", 0) \
     M(Bool, allow_nullable_key, false, "Allow Nullable types as primary keys.", 0) \
-    M(Bool, allow_s3_zero_copy_replication, true, "Allow Zero-copy replication over S3", 0) \
+    M(Bool, allow_s3_zero_copy_replication, false, "Allow Zero-copy replication over S3", 0) \
     M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm", 0) \
     M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \
     M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \

View File

@@ -1498,11 +1498,28 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
     {
         auto zookeeper = getZooKeeper();
         String zookeeper_node = zookeeper_path + "/zero_copy_s3/merged/" + entry.new_part_name;
-        zookeeper->createAncestors(zookeeper_node);
-        auto code = zookeeper->tryCreate(zookeeper_node, "lock", zkutil::CreateMode::Ephemeral);
-        /// Someone else created or started create this merge
-        if (code == Coordination::Error::ZNODEEXISTS)
-            return false;
+        /// In a rare case another replica can remove the path between createAncestors and tryCreate,
+        /// so we make up to 5 attempts to take the lock.
+        for (int attempts = 5; attempts > 0; --attempts)
+        {
+            try
+            {
+                zookeeper->createAncestors(zookeeper_node);
+                auto code = zookeeper->tryCreate(zookeeper_node, "lock", zkutil::CreateMode::Ephemeral);
+                /// Someone else created or started creating this merge
+                if (code == Coordination::Error::ZNODEEXISTS)
+                    return false;
+
+                if (code != Coordination::Error::ZNONODE)
+                    break;
+            }
+            catch (const zkutil::KeeperException & e)
+            {
+                if (e.code == Coordination::Error::ZNONODE)
+                    continue;
+                throw;
+            }
+        }
     }
 }
@@ -1930,7 +1947,7 @@ bool StorageReplicatedMergeTree::executeFetchShared(ReplicatedMergeTreeLogEntry
     try
     {
-        if (!fetchPart(entry.new_part_name, metadata_snapshot, zookeeper_path + "/replicas/" + entry.source_replica, false, entry.quorum,
+        if (!fetchPart(entry.new_part_name, metadata_snapshot, zookeeper_path + "/replicas/" + entry.source_replica, false, entry.quorum,
             nullptr, true, entry.disk, entry.path))
             return false;
     }
@@ -3624,7 +3641,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
     {
         if (part->volume->getDisk()->getName() != replaced_disk->getName())
             throw Exception("Part " + part->name + " fetched on wrong disk " + part->volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
-        replaced_disk->removeIfExists(replaced_part_path);
+        replaced_disk->removeFileIfExists(replaced_part_path);
         replaced_disk->moveDirectory(part->getFullRelativePath(), replaced_part_path);
     }
     else