Merge pull request #34215 from ClickHouse/revert-34211-revert-34153-add_func_tests_over_s3

Add func tests run with s3 and fix several bugs
alesapin 2022-02-15 19:07:11 +03:00 committed by GitHub
commit bc2d0ee7c7
94 changed files with 937 additions and 470 deletions

View File

@ -1065,6 +1065,41 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestReleaseS3:
needs: [BuilderDebRelease]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (release, s3 storage, actions)
REPO_COPY=${{runner.temp}}/stateless_s3_storage/ClickHouse
KILL_TIMEOUT=10800
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestAarch64:
needs: [BuilderDebAarch64]
runs-on: [self-hosted, func-tester-aarch64]
@ -2844,6 +2879,7 @@ jobs:
- FunctionalStatefulTestDebug
- FunctionalStatefulTestRelease
- FunctionalStatefulTestReleaseDatabaseOrdinary
- FunctionalStatelessTestReleaseS3
- FunctionalStatefulTestAarch64
- FunctionalStatefulTestAsan
- FunctionalStatefulTestTsan

View File

@ -1215,6 +1215,41 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestReleaseS3:
needs: [BuilderDebRelease]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (release, s3 storage, actions)
REPO_COPY=${{runner.temp}}/stateless_s3_storage/ClickHouse
KILL_TIMEOUT=10800
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestAarch64:
needs: [BuilderDebAarch64]
runs-on: [self-hosted, func-tester-aarch64]
@ -3037,6 +3072,7 @@ jobs:
- FunctionalStatefulTestTsan
- FunctionalStatefulTestMsan
- FunctionalStatefulTestUBsan
- FunctionalStatelessTestReleaseS3
- StressTestDebug
- StressTestAsan
- StressTestTsan

View File

@ -89,6 +89,10 @@ function run_tests()
# everything in parallel except DatabaseReplicated. See below.
fi
if [[ -n "$USE_S3_STORAGE_FOR_MERGE_TREE" ]] && [[ "$USE_S3_STORAGE_FOR_MERGE_TREE" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--s3-storage')
fi
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--replicated-database')
ADDITIONAL_OPTIONS+=('--jobs')

View File

@ -30,6 +30,7 @@ void CachedCompressedReadBuffer::initInput()
void CachedCompressedReadBuffer::prefetch()
{
initInput();
file_in->prefetch();
}

View File

@ -61,14 +61,14 @@ public:
void setReadUntilPosition(size_t position) override
{
if (file_in)
file_in->setReadUntilPosition(position);
initInput();
file_in->setReadUntilPosition(position);
}
void setReadUntilEnd() override
{
if (file_in)
file_in->setReadUntilEnd();
initInput();
file_in->setReadUntilEnd();
}
};
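
Editorial note: the fix above changes setReadUntilPosition()/setReadUntilEnd() (and prefetch(), in the previous file) from "forward only if the inner buffer already exists" to "create the inner buffer first, then forward". With the old guard, a call made before the first read was silently dropped. A minimal standalone sketch of the pattern — the class names here are illustrative stand-ins, not the real ClickHouse buffers:

#include <cstddef>
#include <cstdio>
#include <memory>

// Toy stand-in for the lazily created inner file buffer.
struct InnerBuffer
{
    void setReadUntilPosition(std::size_t position) { std::printf("read until %zu\n", position); }
};

struct LazyReader
{
    std::unique_ptr<InnerBuffer> inner;

    void initInput()
    {
        if (!inner)
            inner = std::make_unique<InnerBuffer>();
    }

    // Old (buggy) version: a call before the first read is silently lost.
    void setReadUntilPositionOld(std::size_t position)
    {
        if (inner)
            inner->setReadUntilPosition(position);
    }

    // Fixed version: create the inner buffer on demand, then forward.
    void setReadUntilPosition(std::size_t position)
    {
        initInput();
        inner->setReadUntilPosition(position);
    }
};

int main()
{
    LazyReader reader;
    reader.setReadUntilPositionOld(100); // no-op: nothing was created yet
    reader.setReadUntilPosition(200);    // creates the inner buffer and forwards
}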

View File

@ -69,7 +69,7 @@ public:
static ASTPtr parseQueryFromMetadata(Poco::Logger * log, ContextPtr context, const String & metadata_file_path, bool throw_on_error = true, bool remove_empty = false);
/// Will throw if the table we want to attach already exists (in active / detached / detached permanently form)
void checkMetadataFilenameAvailability(const String & to_table_name) const;
void checkMetadataFilenameAvailability(const String & to_table_name) const override;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const;
void modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context);

View File

@ -158,9 +158,14 @@ public:
virtual void startupTables(ThreadPool & /*thread_pool*/, bool /*force_restore*/, bool /*force_attach*/) {}
/// Check the existence of the table.
/// Check the existence of the table in memory (attached).
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;
/// Check the existence of the table in any state (in active / detached / detached permanently state).
/// Throws an exception if the table exists.
virtual void checkMetadataFilenameAvailability(const String & /*table_name*/) const {}
/// Get the table for work. Return nullptr if there is no table.
virtual StoragePtr tryGetTable(const String & name, ContextPtr context) const = 0;
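
Editorial note: the new virtual is a no-op by default and is overridden by DatabaseOnDisk (previous file); the contract is to throw if a table with that name exists in any state. A standalone sketch of the contract — the file names below are assumptions for illustration (DatabaseOnDisk checks its own metadata paths):

#include <filesystem>
#include <stdexcept>
#include <string>

namespace fs = std::filesystem;

// Sketch of the contract only; directory layout and suffixes are hypothetical.
void checkMetadataFilenameAvailability(const fs::path & metadata_dir, const std::string & table_name)
{
    // A '<name>.sql' file exists while the table is active or detached.
    fs::path sql_path = metadata_dir / (table_name + ".sql");
    // Assumed naming: a '.detached' suffix marks a permanently detached table.
    fs::path detached_path = metadata_dir / (table_name + ".sql.detached");

    if (fs::exists(sql_path) || fs::exists(detached_path))
        throw std::runtime_error("Table " + table_name + " already exists (active or detached)");
}

int main()
{
    try
    {
        checkMetadataFilenameAvailability("/var/lib/clickhouse/metadata/db", "events");
    }
    catch (const std::exception &)
    {
        // Thrown only if a metadata file for 'events' already exists.
    }
}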

View File

@ -66,7 +66,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
std::optional<size_t>) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
auto metadata = readMetadata(path);
LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
@ -94,7 +94,6 @@ std::unique_ptr<WriteBufferFromFileBase> DiskAzureBlobStorage::writeFile(
size_t buf_size,
WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
auto blob_path = path + "_" + getRandomASCIIString(8); /// NOTE: path contains the tmp_* prefix in the blob name
LOG_TRACE(log, "{} to file by path: {}. AzureBlob Storage path: {}",
@ -106,7 +105,12 @@ std::unique_ptr<WriteBufferFromFileBase> DiskAzureBlobStorage::writeFile(
current_settings.get()->max_single_part_upload_size,
buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromAzureBlobStorage>>(std::move(buffer), std::move(metadata), blob_path);
auto create_metadata_callback = [this, path, mode, blob_path] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_path, count] (Metadata & metadata) { metadata.addObject(blob_path, count); return true; });
};
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromAzureBlobStorage>>(std::move(buffer), std::move(create_metadata_callback), path);
}
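
Editorial note: before this change, writeFile() read-or-created the metadata up front and handed the Metadata object to the write buffer; now the buffer only receives a callback, and the metadata is created or updated once the write is finalized and the byte count is known (see WriteIndirectBufferFromRemoteFS<T>::finalizeImpl() later in this diff). A standalone sketch of the shape — toy types, not the real WriteBufferFromAzureBlobStorage:

#include <cstddef>
#include <cstdio>
#include <functional>
#include <string>
#include <utility>

using CreateMetadataCallback = std::function<void(std::size_t bytes_count)>;

// Toy write buffer that counts bytes and fires the callback on finalize,
// mirroring the deferred-metadata pattern introduced here.
class IndirectWriteBuffer
{
public:
    explicit IndirectWriteBuffer(CreateMetadataCallback && callback_) : callback(std::move(callback_)) {}

    void write(const std::string & data) { written += data.size(); }

    void finalize()
    {
        // Only now do we know the object size, so only now is metadata stored.
        callback(written);
    }

private:
    CreateMetadataCallback callback;
    std::size_t written = 0;
};

int main()
{
    std::string blob_path = "abc_12345678"; // hypothetical random blob name
    IndirectWriteBuffer buf([blob_path](std::size_t count)
    {
        // The real callback calls readOrCreateUpdateAndStoreMetadata() and
        // appends (blob_path, count) to the metadata's object list.
        std::printf("store metadata: %s -> %zu bytes\n", blob_path.c_str(), count);
    });

    buf.write("hello");
    buf.finalize();
}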

View File

@ -23,7 +23,7 @@ public:
{
}
virtual ~WritingToCacheWriteBuffer() override
~WritingToCacheWriteBuffer() override
{
try
{
@ -274,6 +274,7 @@ void DiskCacheWrapper::removeDirectory(const String & path)
{
if (cache_disk->exists(path))
cache_disk->removeDirectory(path);
DiskDecorator::removeDirectory(path);
}
@ -298,6 +299,18 @@ void DiskCacheWrapper::removeSharedRecursive(const String & path, bool keep_s3)
DiskDecorator::removeSharedRecursive(path, keep_s3);
}
void DiskCacheWrapper::removeSharedFiles(const RemoveBatchRequest & files, bool keep_s3)
{
for (const auto & file : files)
{
if (cache_disk->exists(file.path))
cache_disk->removeSharedFile(file.path, keep_s3);
}
DiskDecorator::removeSharedFiles(files, keep_s3);
}
void DiskCacheWrapper::createHardLink(const String & src_path, const String & dst_path)
{
/// Don't create hardlinks for cache files to the shadow directory, as it just wastes cache disk space.

View File

@ -48,6 +48,7 @@ public:
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_s3) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_s3) override;
void createHardLink(const String & src_path, const String & dst_path) override;
ReservationPtr reserve(UInt64 bytes) override;

View File

@ -72,17 +72,9 @@ public:
void startup() override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const override { return delegate->readMetaFile(path, settings, size); }
DiskPtr getMetadataDiskIfExistsOrSelf() override { return delegate->getMetadataDiskIfExistsOrSelf(); }
std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode) override { return delegate->writeMetaFile(path, buf_size, mode); }
void removeMetaFileIfExists(const String & path) override { delegate->removeMetaFileIfExists(path); }
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override { return delegate->getSerializedMetadata(file_paths); }
UInt32 getRefCount(const String & path) const override { return delegate->getRefCount(path); }

View File

@ -76,7 +76,7 @@ DiskHDFS::DiskHDFS(
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>, std::optional<size_t>) const
{
auto metadata = readMeta(path);
auto metadata = readMetadata(path);
LOG_TEST(log,
"Read from file by path: {}. Existing HDFS objects: {}",
@ -90,8 +90,6 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new HDFS object.
auto file_name = getRandomName();
auto hdfs_path = remote_fs_root_path + file_name;
@ -103,10 +101,13 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(hdfs_path,
config, settings->replication, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
auto create_metadata_callback = [this, path, mode, file_name] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false, [file_name, count] (Metadata & metadata) { metadata.addObject(file_name, count); return true; });
};
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(std::move(hdfs_buffer),
std::move(metadata),
file_name);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(
std::move(hdfs_buffer), std::move(create_metadata_callback), path);
}

View File

@ -86,28 +86,4 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
std::unique_ptr<ReadBufferFromFileBase> IDisk::readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Read local metafile: {}", path);
return readFile(path, settings, size);
}
std::unique_ptr<WriteBufferFromFileBase> IDisk::writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode)
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Write local metafile: {}", path);
return writeFile(path, buf_size, mode);
}
void IDisk::removeMetaFileIfExists(const String & path)
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Remove local metafile: {}", path);
removeFileIfExists(path);
}
}

View File

@ -277,28 +277,34 @@ public:
/// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}
/// Open the local file for read and return ReadBufferFromFileBase object.
/// Overridden in IDiskRemote.
/// Used for work with custom metadata.
virtual std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const;
/// Quite a leaky abstraction. Some disks can use an additional disk to store
/// parts of their metadata. In the general case we have only the disk itself and
/// return a pointer to it.
///
/// This is actually part of the IDiskRemote implementation, but the hierarchy
/// of disks (with decorators) is so complex that we cannot even dynamic_cast
/// a pointer to IDisk into a pointer to IDiskRemote.
virtual std::shared_ptr<IDisk> getMetadataDiskIfExistsOrSelf() { return std::static_pointer_cast<IDisk>(shared_from_this()); }
/// Open the local file for write and return WriteBufferFromFileBase object.
/// Overridden in IDiskRemote.
/// Used for work with custom metadata.
virtual std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode);
virtual void removeMetaFileIfExists(const String & path);
/// A very similar case to getMetadataDiskIfExistsOrSelf(). If the disk has "metadata",
/// this returns a mapping for each requested path: path -> metadata as a string.
/// Implemented only for IDiskRemote.
virtual std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & /* paths */) const { return {}; }
/// Return the reference count for remote FS.
/// Overridden in IDiskRemote.
/// You may ask -- why zero, and what does it mean? For some unknown reason
/// the decision was made to take 0 as "no references exist, only the file itself is left".
/// With a normal file system we would get 1 in this case:
/// $ stat clickhouse
/// File: clickhouse
/// Size: 3014014920 Blocks: 5886760 IO Block: 4096 regular file
/// Device: 10301h/66305d Inode: 3109907 Links: 1
/// Why is zero the default? Because a normal filesystem manages
/// hardlinks by itself, so you can always remove a hardlink and all
/// other live hardlinks will remain intact.
virtual UInt32 getRefCount(const String &) const { return 0; }
protected:
friend class DiskDecorator;
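
Editorial note: the comment above describes the (admittedly confusing) convention that getRefCount() returns 0 when only the file itself exists, and each extra hardlink increments it, unlike POSIX st_nlink which starts at 1. A standalone simulation of that bookkeeping — in the real code the counter lives in the remote disk's metadata file:

#include <cassert>
#include <cstdint>

// Simulates the ref_count convention used by IDiskRemote metadata:
// 0 means "only the file itself, no extra hardlinks" (POSIX would say 1).
struct MetadataRefCount
{
    uint32_t ref_count = 0;

    void createHardLink() { ++ref_count; }

    // Returns true if the remote data should actually be deleted.
    bool removeOneLink()
    {
        if (ref_count == 0)
            return true;  // last reference: drop remote objects too
        --ref_count;      // a hardlink remains: keep remote objects
        return false;
    }
};

int main()
{
    MetadataRefCount meta;
    assert(meta.ref_count == 0);   // freshly created file

    meta.createHardLink();         // like `ln file link`
    assert(meta.ref_count == 1);

    assert(!meta.removeOneLink()); // removing one name keeps the data
    assert(meta.removeOneLink());  // removing the last name deletes it
}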

View File

@ -24,23 +24,64 @@ namespace ErrorCodes
extern const int UNKNOWN_FORMAT;
extern const int FILE_ALREADY_EXISTS;
extern const int PATH_ACCESS_DENIED;
extern const int CANNOT_DELETE_DIRECTORY;
extern const int FILE_DOESNT_EXIST;
extern const int BAD_FILE_TYPE;
}
/// Load metadata by path or create empty if `create` flag is set.
IDiskRemote::Metadata::Metadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_,
bool create)
: RemoteMetadata(remote_fs_root_path_, metadata_file_path_)
, metadata_disk(metadata_disk_)
, total_size(0), ref_count(0)
IDiskRemote::Metadata IDiskRemote::Metadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
{
if (create)
return;
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.save(sync);
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
if (updater(result))
result.save(sync);
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
updater(result);
result.save(sync);
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite)
{
if (overwrite || !metadata_disk_->exists(metadata_file_path_))
{
return createAndStoreMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_, sync);
}
else
{
auto result = readMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
if (result.read_only)
throw Exception("File is read-only: " + metadata_file_path_, ErrorCodes::PATH_ACCESS_DENIED);
return result;
}
}
void IDiskRemote::Metadata::load()
{
try
{
const ReadSettings read_settings;
@ -102,103 +143,158 @@ IDiskRemote::Metadata::Metadata(
}
}
/// Load metadata by path or create empty if `create` flag is set.
IDiskRemote::Metadata::Metadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_)
: RemoteMetadata(remote_fs_root_path_, metadata_file_path_)
, metadata_disk(metadata_disk_)
, total_size(0), ref_count(0)
{
}
void IDiskRemote::Metadata::addObject(const String & path, size_t size)
{
total_size += size;
remote_fs_objects.emplace_back(path, size);
}
void IDiskRemote::Metadata::saveToBuffer(WriteBuffer & buf, bool sync)
{
writeIntText(VERSION_RELATIVE_PATHS, buf);
writeChar('\n', buf);
writeIntText(remote_fs_objects.size(), buf);
writeChar('\t', buf);
writeIntText(total_size, buf);
writeChar('\n', buf);
for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects)
{
writeIntText(remote_fs_object_size, buf);
writeChar('\t', buf);
writeEscapedString(remote_fs_object_path, buf);
writeChar('\n', buf);
}
writeIntText(ref_count, buf);
writeChar('\n', buf);
writeBoolText(read_only, buf);
writeChar('\n', buf);
buf.finalize();
if (sync)
buf.sync();
}
/// Fsync metadata file if 'sync' flag is set.
void IDiskRemote::Metadata::save(bool sync)
{
auto buf = metadata_disk->writeFile(metadata_file_path, 1024);
saveToBuffer(*buf, sync);
}
writeIntText(VERSION_RELATIVE_PATHS, *buf);
writeChar('\n', *buf);
std::string IDiskRemote::Metadata::serializeToString()
{
WriteBufferFromOwnString write_buf;
saveToBuffer(write_buf, false);
return write_buf.str();
}
writeIntText(remote_fs_objects.size(), *buf);
writeChar('\t', *buf);
writeIntText(total_size, *buf);
writeChar('\n', *buf);
IDiskRemote::Metadata IDiskRemote::readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const
{
return Metadata::readMetadata(remote_fs_root_path, metadata_disk, path);
}
for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects)
IDiskRemote::Metadata IDiskRemote::readMetadata(const String & path) const
{
std::shared_lock lock(metadata_mutex);
return readMetadataUnlocked(path, lock);
}
IDiskRemote::Metadata IDiskRemote::readUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
IDiskRemote::Metadata IDiskRemote::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, IDiskRemote::MetadataUpdater updater)
{
if (mode == WriteMode::Rewrite || !metadata_disk->exists(path))
{
writeIntText(remote_fs_object_size, *buf);
writeChar('\t', *buf);
writeEscapedString(remote_fs_object_path, *buf);
writeChar('\n', *buf);
std::unique_lock lock(metadata_mutex);
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
else
{
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
}
IDiskRemote::Metadata IDiskRemote::createAndStoreMetadata(const String & path, bool sync)
{
return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync);
}
IDiskRemote::Metadata IDiskRemote::createUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
{
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
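
Editorial note: the wrappers above encode the locking rule documented in IDiskRemote.h (shown later in this diff): plain reads take metadata_mutex in shared mode, read-modify-write takes it in unique mode because the metadata file is rewritten on disk, and creating brand-new metadata needs no read lock. A standalone sketch of that discipline with std::shared_mutex and simplified types:

#include <cstdint>
#include <functional>
#include <mutex>
#include <shared_mutex>
#include <string>

// Toy metadata store illustrating the shared/unique lock split.
struct Metadata { uint32_t ref_count = 0; };
using MetadataUpdater = std::function<bool(Metadata &)>;

class Store
{
public:
    Metadata read(const std::string & /*path*/) const
    {
        std::shared_lock lock(mutex);  // many concurrent readers allowed
        return current;
    }

    Metadata readUpdateAndStore(const std::string & /*path*/, MetadataUpdater updater)
    {
        std::unique_lock lock(mutex);  // exclusive: the file gets rewritten
        if (updater(current))
        {
            // here the real code would persist the metadata (save(sync))
        }
        return current;
    }

private:
    mutable std::shared_mutex mutex;
    Metadata current;
};

int main()
{
    Store store;
    store.readUpdateAndStore("p", [](Metadata & m) { ++m.ref_count; return true; });
    return store.read("p").ref_count == 1 ? 0 : 1;
}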
std::unordered_map<String, String> IDiskRemote::getSerializedMetadata(const std::vector<std::string> & file_paths) const
{
std::unordered_map<String, String> metadatas;
std::shared_lock lock(metadata_mutex);
for (const auto & path : file_paths)
{
IDiskRemote::Metadata metadata = readMetadataUnlocked(path, lock);
metadata.ref_count = 0;
metadatas[path] = metadata.serializeToString();
}
writeIntText(ref_count, *buf);
writeChar('\n', *buf);
writeBoolText(read_only, *buf);
writeChar('\n', *buf);
buf->finalize();
if (sync)
buf->sync();
return metadatas;
}
IDiskRemote::Metadata IDiskRemote::readOrCreateMetaForWriting(const String & path, WriteMode mode)
{
bool exist = exists(path);
if (exist)
{
auto metadata = readMeta(path);
if (metadata.read_only)
throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED);
if (mode == WriteMode::Rewrite)
removeFile(path); /// Remove for re-write.
else
return metadata;
}
auto metadata = createMeta(path);
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save();
return metadata;
}
IDiskRemote::Metadata IDiskRemote::readMeta(const String & path) const
{
return Metadata(remote_fs_root_path, metadata_disk, path);
}
IDiskRemote::Metadata IDiskRemote::createMeta(const String & path) const
{
return Metadata(remote_fs_root_path, metadata_disk, path, true);
}
void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
{
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
if (!metadata_disk->exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path);
if (!metadata_disk->isFile(path))
throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path '{}' is not a regular file", path);
try
{
auto metadata = readMeta(path);
auto metadata_updater = [fs_paths_keeper, this] (Metadata & metadata)
{
if (metadata.ref_count == 0)
{
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
return false;
}
else /// Otherwise decrement the number of references, save metadata and delete the hardlink.
{
--metadata.ref_count;
}
return true;
};
readUpdateAndStoreMetadata(path, false, metadata_updater);
metadata_disk->removeFile(path);
/// If there are no references, delete content from remote FS.
if (metadata.ref_count == 0)
{
metadata_disk->removeFile(path);
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
}
else /// Otherwise decrement the number of references, save metadata and delete the file.
{
--metadata.ref_count;
metadata.save();
metadata_disk->removeFile(path);
}
}
catch (const Exception & e)
{
@ -216,18 +312,19 @@ void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths
}
void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
void IDiskRemote::removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
if (metadata_disk->isFile(path))
{
removeMeta(path, fs_paths_keeper);
removeMetadata(path, fs_paths_keeper);
}
else
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
removeMetaRecursive(it->path(), fs_paths_keeper);
for (auto it = iterateDirectory(path); it->isValid(); it->next())
removeMetadataRecursive(it->path(), fs_paths_keeper);
metadata_disk->removeDirectory(path);
}
}
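
Editorial note: removeMetadata() now makes the whole decision inside a single read-modify-write. If ref_count is zero, the updater collects the remote object paths for deletion and returns false (no point saving a file that is about to be unlinked); otherwise it decrements the counter and returns true so the new count is persisted. A standalone simulation of that updater:

#include <cstdio>
#include <string>
#include <vector>

// Toy metadata for one local file pointing at remote objects.
struct Metadata
{
    unsigned ref_count = 0;
    std::vector<std::string> remote_fs_objects;
};

// Mirrors the updater in IDiskRemote::removeMetadata(): the return value
// says whether the (modified) metadata should be written back to disk.
bool removeUpdater(Metadata & metadata, std::vector<std::string> & paths_to_delete)
{
    if (metadata.ref_count == 0)
    {
        // Last reference: schedule remote objects for deletion, don't save.
        for (const auto & object : metadata.remote_fs_objects)
            paths_to_delete.push_back(object);
        return false;
    }
    --metadata.ref_count;  // other hardlinks remain
    return true;           // persist the decremented counter
}

int main()
{
    Metadata meta;
    meta.remote_fs_objects = {"r0000000001-file-abc"}; // hypothetical blob name
    meta.ref_count = 1;

    std::vector<std::string> to_delete;
    removeUpdater(meta, to_delete); // first unlink: keeps remote data
    removeUpdater(meta, to_delete); // second unlink: collects it for deletion
    std::printf("%zu object(s) to delete\n", to_delete.size()); // prints 1
}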
@ -305,16 +402,13 @@ bool IDiskRemote::isFile(const String & path) const
void IDiskRemote::createFile(const String & path)
{
/// Create empty metadata file.
auto metadata = createMeta(path);
metadata.save();
createAndStoreMetadata(path, false);
}
size_t IDiskRemote::getFileSize(const String & path) const
{
auto metadata = readMeta(path);
return metadata.total_size;
return readMetadata(path).total_size;
}
@ -341,45 +435,45 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
}
void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMeta(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeMetadata(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
void IDiskRemote::removeSharedFileIfExists(const String & path, bool keep_in_remote_fs)
void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
if (metadata_disk->exists(path))
{
removeMeta(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeMetadata(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
}
void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs)
void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
for (const auto & file : files)
{
bool skip = file.if_exists && !metadata_disk->exists(file.path);
if (!skip)
removeMeta(file.path, fs_paths_keeper);
removeMetadata(file.path, fs_paths_keeper);
}
if (!keep_in_remote_fs)
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMetaRecursive(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeMetadataRecursive(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
@ -388,9 +482,7 @@ void IDiskRemote::setReadOnly(const String & path)
{
/// We should store the read-only flag inside the metadata file (instead of using an FS flag),
/// because we modify the metadata file when creating hard-links to it.
auto metadata = readMeta(path);
metadata.read_only = true;
metadata.save();
readUpdateAndStoreMetadata(path, false, [] (Metadata & metadata) { metadata.read_only = true; return true; });
}
@ -414,7 +506,7 @@ void IDiskRemote::createDirectories(const String & path)
void IDiskRemote::clearDirectory(const String & path)
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
for (auto it = iterateDirectory(path); it->isValid(); it->next())
if (isFile(it->path()))
removeFile(it->path());
}
@ -453,10 +545,7 @@ Poco::Timestamp IDiskRemote::getLastModified(const String & path)
void IDiskRemote::createHardLink(const String & src_path, const String & dst_path)
{
/// Increment number of references.
auto src = readMeta(src_path);
++src.ref_count;
src.save();
readUpdateAndStoreMetadata(src_path, false, [] (Metadata & metadata) { metadata.ref_count++; return true; });
/// Create FS hardlink to metadata file.
metadata_disk->createHardLink(src_path, dst_path);
@ -498,7 +587,7 @@ bool IDiskRemote::tryReserve(UInt64 bytes)
String IDiskRemote::getUniqueId(const String & path) const
{
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
Metadata metadata(remote_fs_root_path, metadata_disk, path);
auto metadata = readMetadata(path);
String id;
if (!metadata.remote_fs_objects.empty())
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first;
@ -514,34 +603,9 @@ AsynchronousReaderPtr IDiskRemote::getThreadPoolReader()
return reader;
}
std::unique_ptr<ReadBufferFromFileBase> IDiskRemote::readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const
{
LOG_TRACE(log, "Read metafile: {}", path);
return metadata_disk->readFile(path, settings, size);
}
std::unique_ptr<WriteBufferFromFileBase> IDiskRemote::writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode)
{
LOG_TRACE(log, "Write metafile: {}", path);
return metadata_disk->writeFile(path, buf_size, mode);
}
void IDiskRemote::removeMetaFileIfExists(const String & path)
{
LOG_TRACE(log, "Remove metafile: {}", path);
return metadata_disk->removeFileIfExists(path);
}
UInt32 IDiskRemote::getRefCount(const String & path) const
{
auto meta = readMeta(path);
return meta.ref_count;
return readMetadata(path).ref_count;
}
ThreadPool & IDiskRemote::getThreadPoolWriter()

View File

@ -6,6 +6,8 @@
#include <Disks/DiskFactory.h>
#include <Disks/Executor.h>
#include <utility>
#include <mutex>
#include <shared_mutex>
#include <Common/MultiVersion.h>
#include <Common/ThreadPool.h>
#include <filesystem>
@ -57,16 +59,23 @@ public:
size_t thread_pool_size);
struct Metadata;
using MetadataUpdater = std::function<bool(Metadata & metadata)>;
const String & getName() const final override { return name; }
const String & getPath() const final override { return metadata_disk->getPath(); }
Metadata readMeta(const String & path) const;
/// Methods for working with metadata. For some operations (like hardlink
/// creation) metadata can be updated concurrently from multiple threads
/// (the file is actually rewritten on disk). So an additional RW lock is required
/// for metadata reads and writes, but not for creating new metadata.
Metadata readMetadata(const String & path) const;
Metadata readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const;
Metadata readUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
Metadata readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, MetadataUpdater updater);
Metadata createMeta(const String & path) const;
Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode);
Metadata createAndStoreMetadata(const String & path, bool sync);
Metadata createUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
@ -94,13 +103,13 @@ public:
void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
void removeSharedFile(const String & path, bool delete_metadata_only) override;
void removeSharedFileIfExists(const String & path, bool keep_in_remote_fs) override;
void removeSharedFileIfExists(const String & path, bool delete_metadata_only) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;
void removeSharedRecursive(const String & path, bool delete_metadata_only) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
@ -139,21 +148,14 @@ public:
static AsynchronousReaderPtr getThreadPoolReader();
static ThreadPool & getThreadPoolWriter();
virtual std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const override;
virtual std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode) override;
virtual void removeMetaFileIfExists(
const String & path) override;
DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; }
UInt32 getRefCount(const String & path) const override;
/// Return metadata for each file path. Before serialization the ref_count of
/// each metadata object is reset to zero. This function is used only for remote
/// fetches/sends in replicated engines; that's why we reset ref_count to zero.
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
protected:
Poco::Logger * log;
const String name;
@ -162,15 +164,16 @@ protected:
DiskPtr metadata_disk;
private:
void removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
void removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
void removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
void removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
bool tryReserve(UInt64 bytes);
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
mutable std::shared_mutex metadata_mutex;
};
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
@ -200,6 +203,7 @@ struct RemoteMetadata
struct IDiskRemote::Metadata : RemoteMetadata
{
using Updater = std::function<bool(IDiskRemote::Metadata & metadata)>;
/// Metadata file version.
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
@ -211,22 +215,36 @@ struct IDiskRemote::Metadata : RemoteMetadata
size_t total_size = 0;
/// Number of references (hardlinks) to this metadata file.
///
/// FIXME: Why are we tracking it explicitly, without
/// info from the filesystem?
UInt32 ref_count = 0;
/// Flag indicates that file is read only.
bool read_only = false;
/// Load metadata by path or create empty if `create` flag is set.
Metadata(const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_,
bool create = false);
Metadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_);
void addObject(const String & path, size_t size);
static Metadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_);
static Metadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static Metadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync);
static Metadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static Metadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite);
/// Serialize metadata to a string (same as saveToBuffer).
std::string serializeToString();
private:
/// Fsync metadata file if 'sync' flag is set.
void save(bool sync = false);
void saveToBuffer(WriteBuffer & buffer, bool sync);
void load();
};
class DiskRemoteReservation final : public IReservation

View File

@ -12,15 +12,14 @@ namespace DB
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::WriteIndirectBufferFromRemoteFS(
std::unique_ptr<T> impl_,
IDiskRemote::Metadata metadata_,
const String & remote_fs_path_)
CreateMetadataCallback && create_callback_,
const String & metadata_file_path_)
: WriteBufferFromFileDecorator(std::move(impl_))
, metadata(std::move(metadata_))
, remote_fs_path(remote_fs_path_)
, create_metadata_callback(std::move(create_callback_))
, metadata_file_path(metadata_file_path_)
{
}
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
{
@ -34,25 +33,13 @@ WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
}
}
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::finalizeImpl()
{
WriteBufferFromFileDecorator::finalizeImpl();
metadata.addObject(remote_fs_path, count());
metadata.save();
create_metadata_callback(count());
}
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::sync()
{
if (finalized)
metadata.save(true);
}
#if USE_AWS_S3
template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>;

View File

@ -9,6 +9,8 @@
namespace DB
{
using CreateMetadataCallback = std::function<void(size_t bytes_count)>;
/// Stores data in S3/HDFS and adds the object path and object size to the metadata file on the local FS.
template <typename T>
class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator
@ -16,21 +18,18 @@ class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorato
public:
WriteIndirectBufferFromRemoteFS(
std::unique_ptr<T> impl_,
IDiskRemote::Metadata metadata_,
const String & remote_fs_path_);
CreateMetadataCallback && create_callback_,
const String & metadata_file_path_);
virtual ~WriteIndirectBufferFromRemoteFS() override;
~WriteIndirectBufferFromRemoteFS() override;
void sync() override;
String getFileName() const override { return metadata.metadata_file_path; }
String getFileName() const override { return metadata_file_path; }
private:
void finalizeImpl() override;
IDiskRemote::Metadata metadata;
String remote_fs_path;
CreateMetadataCallback create_metadata_callback;
String metadata_file_path;
};
}

View File

@ -218,7 +218,7 @@ void DiskS3::moveFile(const String & from_path, const String & to_path, bool sen
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>, std::optional<size_t>) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
auto metadata = readMetadata(path);
LOG_TEST(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_disk->getPath() + path), metadata.remote_fs_objects.size());
@ -245,10 +245,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
auto settings = current_settings.get();
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new S3 object.
auto s3_path = getRandomASCIIString();
auto blob_name = getRandomASCIIString();
std::optional<ObjectMetadata> object_metadata;
if (settings->send_metadata)
@ -257,40 +256,45 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
object_metadata = {
{"path", path}
};
s3_path = "r" + revisionToString(revision) + "-file-" + s3_path;
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
}
LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + s3_path);
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + blob_name);
ScheduleFunc schedule = [pool = &getThreadPoolWriter()](auto callback)
{
pool->scheduleOrThrow([callback = std::move(callback), thread_group = CurrentThread::getGroup()]()
{
if (thread_group)
CurrentThread::attachTo(thread_group);
/// FIXME -- the thread pool leads to obscure segfaults
/// ScheduleFunc schedule = [pool = &getThreadPoolWriter(), thread_group = CurrentThread::getGroup()](auto callback)
/// {
/// pool->scheduleOrThrow([callback = std::move(callback), thread_group]()
/// {
/// if (thread_group)
/// CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
callback();
});
};
/// SCOPE_EXIT_SAFE(
/// if (thread_group)
/// CurrentThread::detachQueryIfNotDetached();
/// );
/// callback();
/// });
/// };
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings->client,
bucket,
metadata.remote_fs_root_path + s3_path,
remote_fs_root_path + blob_name,
settings->s3_min_upload_part_size,
settings->s3_upload_part_size_multiply_factor,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size,
std::move(schedule));
buf_size /*, std::move(schedule) */);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path);
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_name, count] (Metadata & metadata) { metadata.addObject(blob_name, count); return true; });
};
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(create_metadata_callback), path);
}
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
@ -312,13 +316,7 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path, bo
createFileOperationObject("hardlink", revision, object_metadata);
}
/// Increment number of references.
auto src = readMeta(src_path);
++src.ref_count;
src.save();
/// Create FS hardlink to metadata file.
metadata_disk->createHardLink(src_path, dst_path);
IDiskRemote::createHardLink(src_path, dst_path);
}
void DiskS3::shutdown()
@ -438,7 +436,7 @@ void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_TRACE(log, "Migrate file {} to restorable schema", metadata_disk->getPath() + path);
auto meta = readMeta(path);
auto meta = readMetadata(path);
for (const auto & [key, _] : meta.remote_fs_objects)
{
@ -894,15 +892,19 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
const auto & path = path_entry->second;
createDirectories(directoryPath(path));
auto metadata = createMeta(path);
auto relative_key = shrinkKey(source_path, key);
/// Copy object if we restore to different bucket / path.
if (bucket != source_bucket || remote_fs_root_path != source_path)
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key, head_result);
metadata.addObject(relative_key, head_result.GetContentLength());
metadata.save();
auto updater = [relative_key, head_result] (Metadata & metadata)
{
metadata.addObject(relative_key, head_result.GetContentLength());
return true;
};
createUpdateAndStoreMetadata(path, false, updater);
LOG_TRACE(log, "Restored file {}", path);
}

View File

@ -176,6 +176,10 @@ void registerDiskS3(DiskFactory & factory)
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr {
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
if (uri.key.empty())
throw Exception("Empty S3 path specified in disk configuration", ErrorCodes::BAD_ARGUMENTS);
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
@ -200,7 +204,16 @@ void registerDiskS3(DiskFactory & factory)
s3disk->startup();
if (config.getBool(config_prefix + ".cache_enabled", true))
#ifdef NDEBUG
bool use_cache = true;
#else
/// The current S3 cache implementation leads to allocations in the
/// destructor of the read buffer.
bool use_cache = false;
#endif
if (config.getBool(config_prefix + ".cache_enabled", use_cache))
{
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
s3disk = wrapWithCache(s3disk, "s3-cache", cache_path, metadata_path);
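
Editorial note: the cache default now depends on the build type. NDEBUG is defined by release builds (the same macro that disables assert()), so the cache stays on there but is off by default in debug/sanitizer builds, which are the ones affected by the destructor allocations mentioned in the comment. A tiny standalone illustration of the same #ifdef:

#include <cstdio>

int main()
{
#ifdef NDEBUG
    bool use_cache = true;   // release build: asserts disabled, cache enabled
#else
    bool use_cache = false;  // debug build: avoid allocations in destructors
#endif
    std::printf("cache default: %s\n", use_cache ? "on" : "off");
}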

View File

@ -79,6 +79,10 @@ void WriteBufferFromS3::nextImpl()
if (!offset())
return;
/// Buffer in a bad state after exception
if (temporary_buffer->tellp() == -1)
allocateBuffer();
temporary_buffer->write(working_buffer.begin(), offset());
ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());
@ -91,6 +95,7 @@ void WriteBufferFromS3::nextImpl()
if (!multipart_upload_id.empty() && last_part_size > upload_part_size)
{
writePart();
allocateBuffer();
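
Editorial note: tellp() on a std::ostream returns pos_type(-1) whenever the stream's failure flags are set, so the check above detects a temporary buffer left in a failed state by a previous upload attempt and reallocates it instead of writing into it. A standalone demonstration of that behavior:

#include <cassert>
#include <sstream>

int main()
{
    std::stringstream buffer;
    buffer << "part data";
    assert(buffer.tellp() == 9);        // healthy stream reports its position

    buffer.setstate(std::ios::failbit); // simulate a failed upload attempt
    assert(buffer.tellp() == -1);       // failed stream reports -1

    buffer = std::stringstream();       // "allocateBuffer()": start fresh
    assert(buffer.tellp() == 0);
}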
@ -168,7 +173,10 @@ void WriteBufferFromS3::writePart()
LOG_DEBUG(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Size: {}", bucket, key, multipart_upload_id, size);
if (size < 0)
throw Exception("Failed to write part. Buffer in invalid state.", ErrorCodes::S3_ERROR);
{
LOG_WARNING(log, "Skipping part upload. Buffer is in bad state, it means that we have tried to upload something, but got an exception.");
return;
}
if (size == 0)
{
@ -292,7 +300,10 @@ void WriteBufferFromS3::makeSinglepartUpload()
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}, WithPool: {}", bucket, key, size, with_pool);
if (size < 0)
throw Exception("Failed to make single part upload. Buffer in invalid state", ErrorCodes::S3_ERROR);
{
LOG_WARNING(log, "Skipping single part upload. Buffer is in bad state, it mean that we have tried to upload something, but got an exception.");
return;
}
if (size == 0)
{

View File

@ -1106,6 +1106,20 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
throw Exception(storage_already_exists_error_code,
"{} {}.{} already exists", storage_name, backQuoteIfNeed(create.getDatabase()), backQuoteIfNeed(create.getTable()));
}
else if (!create.attach)
{
/// Check that the table doesn't already exist in detached/detached permanently state
try
{
database->checkMetadataFilenameAvailability(create.getTable());
}
catch (const Exception &)
{
if (create.if_not_exists)
return false;
throw;
}
}
data_path = database->getTableDataPath(create);

View File

@ -25,7 +25,7 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit WriteBufferFromHDFSImpl(
WriteBufferFromHDFSImpl(
const std::string & hdfs_uri_,
const Poco::Util::AbstractConfiguration & config_,
int replication_,

View File

@ -314,6 +314,10 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part, WriteBuffer & out)
{
auto disk = part->volume->getDisk();
if (!disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", disk->getName());
/// We'll take a list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Add files that are not in the checksum list.
@ -321,11 +325,13 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
for (const auto & file_name : file_names_without_checksums)
checksums.files[file_name] = {};
auto disk = part->volume->getDisk();
if (!disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "disk {} doesn't support zero-copy replication", disk->getName());
std::vector<std::string> paths;
paths.reserve(checksums.files.size());
for (const auto & it : checksums.files)
paths.push_back(fs::path(part->getFullRelativePath()) / it.first);
part->storage.lockSharedData(*part);
/// Serialized metadata files with zero ref counts.
auto metadatas = disk->getSerializedMetadata(paths);
String part_id = part->getUniqueId();
writeStringBinary(part_id, out);
@ -333,29 +339,32 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
String metadata_file = fs::path(disk->getPath()) / part->getFullRelativePath() / file_name;
fs::path metadata(metadata_file);
const String & file_name = it.first;
String file_path_prefix = fs::path(part->getFullRelativePath()) / file_name;
/// Just some additional checks
String metadata_file_path = fs::path(disk->getPath()) / file_path_prefix;
fs::path metadata(metadata_file_path);
if (!fs::exists(metadata))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Remote metadata '{}' is not exists", file_name);
if (!fs::is_regular_file(metadata))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Remote metadata '{}' is not a file", file_name);
UInt64 file_size = fs::file_size(metadata);
/// Actually send the metadata
auto metadata_str = metadatas[file_path_prefix];
UInt64 file_size = metadata_str.size();
ReadBufferFromString buf(metadata_str);
writeStringBinary(it.first, out);
writeBinary(file_size, out);
auto file_in = createReadBufferFromFileBase(metadata_file, /* settings= */ {});
HashingWriteBuffer hashing_out(out);
copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
copyDataWithThrottler(buf, hashing_out, blocker.getCounter(), data.getSendsThrottler());
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
if (hashing_out.count() != file_size)
throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}", metadata_file);
throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}", metadata_file_path);
writePODBinary(hashing_out.getHash(), out);
}
@ -767,9 +776,12 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
{
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName());
}
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
part_name, part_id, disk->getName());
data.lockSharedDataTemporary(part_name, part_id, disk);
static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
@ -834,7 +846,10 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
new_data_part->storage.lockSharedData(*new_data_part);
data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true);
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.",
part_name, part_id, disk->getName());
return new_data_part;
}

View File

@ -63,7 +63,7 @@ private:
class Fetcher final : private boost::noncopyable
{
public:
explicit Fetcher(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchPart(
@ -129,7 +129,7 @@ private:
PooledReadWriteBufferFromHTTP & in,
ThrottlerPtr throttler);
MergeTreeData & data;
StorageReplicatedMergeTree & data;
Poco::Logger * log;
};

View File

@ -1187,16 +1187,7 @@ std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
if (force_keep_shared_data)
return true;
/// TODO Unlocking in try-catch and ignoring exception look ugly
try
{
return !storage.unlockSharedData(*this);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, "There is a problem with deleting part " + name + " from filesystem");
}
return {};
return !storage.unlockSharedData(*this);
}
void IMergeTreeDataPart::remove() const
@ -1642,18 +1633,10 @@ String IMergeTreeDataPart::getUniqueId() const
if (!disk->supportZeroCopyReplication())
throw Exception(fmt::format("Disk {} doesn't support zero-copy replication", disk->getName()), ErrorCodes::LOGICAL_ERROR);
String id = disk->getUniqueId(fs::path(getFullRelativePath()) / "checksums.txt");
return id;
return disk->getUniqueId(fs::path(getFullRelativePath()) / FILE_FOR_REFERENCES_CHECK);
}
UInt32 IMergeTreeDataPart::getNumberOfRefereneces() const
{
return volume->getDisk()->getRefCount(fs::path(getFullRelativePath()) / "checksums.txt");
}
String IMergeTreeDataPart::getZeroLevelPartBlockID(const std::string_view token) const
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
{
if (info.level != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);

View File

@ -408,6 +408,18 @@ public:
/// (number of rows, number of rows with default values, etc).
static inline constexpr auto SERIALIZATION_FILE_NAME = "serialization.json";
/// One of the part's files, used to check how many references (I'd like
/// to say hardlinks, but that would confuse even more) we have to the part
/// for zero-copy replication. Sadly, it's very complex.
///
/// NOTE: it's not a random "metadata" file of the part like 'columns.txt'. If
/// two related parts (for example all_1_1_0 and all_1_1_0_100) have equal
/// checksums.txt, it means that one part was obtained by a FREEZE operation or
/// by a mutation without any change to the source part. In this case we
/// really don't need to remove data from the remote FS and only need to
/// decrement the reference counter locally.
static inline constexpr auto FILE_FOR_REFERENCES_CHECK = "checksums.txt";
/// Checks that all TTLs (table min/max, column ttls, so on) for part
/// calculated. Part without calculated TTL may exist if TTL was added after
/// part creation (using alter query with materialize_ttl setting).
@ -417,10 +429,6 @@ public:
/// Required for distinguish different copies of the same part on remote FS.
String getUniqueId() const;
/// Return hardlink count for part.
/// Required for keep data on remote FS when part has shadow copies.
UInt32 getNumberOfRefereneces() const;
protected:
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk

View File

@ -79,10 +79,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
for (const String & removed_file : remove_files)
{
if (checksums.files.count(removed_file))
{
data_part->volume->getDisk()->removeFile(data_part->getFullRelativePath() + removed_file);
checksums.files.erase(removed_file);
}
}
/// Remove columns from columns array

View File

@ -46,14 +46,10 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
/// In some use cases merging can be more expensive than fetching
/// and it may be better to spread merges tasks across the replicas
/// instead of doing exactly the same merge cluster-wise
std::optional<String> replica_to_execute_merge;
bool replica_to_execute_merge_picked = false;
if (storage.merge_strategy_picker.shouldMergeOnSingleReplica(entry))
{
replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry);
replica_to_execute_merge_picked = true;
std::optional<String> replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge)
{
LOG_DEBUG(log,
@ -158,22 +154,24 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
future_merged_part->updatePath(storage, reserved_space.get());
future_merged_part->merge_type = entry.merge_type;
if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
{
if (auto disk = reserved_space->getDisk(); disk->getType() == DB::DiskType::S3)
{
if (storage.merge_strategy_picker.shouldMergeOnSingleReplicaShared(entry))
String dummy;
if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
{
if (!replica_to_execute_merge_picked)
replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry);
LOG_DEBUG(log, "Merge of part {} finished by some other replica, will fetch merged part", entry.new_part_name);
return {false, {}};
}
if (replica_to_execute_merge)
{
LOG_DEBUG(log,
"Prefer fetching part {} from replica {} due s3_execute_merges_on_single_replica_time_threshold",
entry.new_part_name, replica_to_execute_merge.value());
return {false, {}};
}
zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
if (!zero_copy_lock)
{
LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
return {false, {}};
}
}
}
@ -271,6 +269,9 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
throw;
}
if (zero_copy_lock)
zero_copy_lock->lock->unlock();
/** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
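
Editorial note: with zero-copy replication, two replicas merging the same part would both write to shared storage, so the task now takes an exclusive per-part lock (ZooKeeper-backed in the real code): if tryCreateZeroCopyExclusiveLock() returns an empty optional, another replica holds the lock and this replica falls back to fetching; on success the lock is released in finalize(). A standalone sketch of the optional-lock control flow, using a local mutex as a toy stand-in for the ZooKeeper lock:

#include <cstdio>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Toy stand-in for ZeroCopyLock; the real one is backed by ZooKeeper.
struct ZeroCopyLock
{
    std::unique_lock<std::mutex> lock;
};

std::mutex part_mutex; // pretend this guards "all_1_1_0" cluster-wide

std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const std::string & /*part_name*/)
{
    std::unique_lock lock(part_mutex, std::try_to_lock);
    if (!lock.owns_lock())
        return std::nullopt; // someone else is already merging this part
    return ZeroCopyLock{std::move(lock)};
}

int main()
{
    auto zero_copy_lock = tryCreateZeroCopyExclusiveLock("all_1_1_0");
    if (!zero_copy_lock)
    {
        std::printf("merge started elsewhere, will fetch the result\n");
        return 0;
    }
    std::printf("merging...\n");
    zero_copy_lock->lock.unlock(); // done in finalize() in the real task
}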

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
namespace DB
@ -37,6 +38,7 @@ private:
MergeTreeData::DataPartsVector parts;
MergeTreeData::TransactionUniquePtr transaction_ptr{nullptr};
std::optional<ZeroCopyLock> zero_copy_lock;
StopwatchUniquePtr stopwatch_ptr{nullptr};
MergeTreeData::MutableDataPartPtr part;

View File

@ -265,6 +265,14 @@ MergeTreeData::MergeTreeData(
/// Creating directories, if not exist.
for (const auto & disk : getDisks())
{
/// TODO: implement it; the main issue is in DataPartsExchange (not able to send directory metadata)
if (supportsReplication() && settings->allow_remote_fs_zero_copy_replication
&& disk->supportZeroCopyReplication() && metadata_.hasProjections())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Projections are not supported when zero-copy replication is enabled for table. "
"Currently disk '{}' supports zero copy replication", disk->getName());
}
if (disk->isBroken())
continue;
@ -2033,11 +2041,26 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
"ALTER ADD INDEX is not supported for tables with the old syntax",
ErrorCodes::BAD_ARGUMENTS);
}
if (command.type == AlterCommand::ADD_PROJECTION && !is_custom_partitioned)
if (command.type == AlterCommand::ADD_PROJECTION)
{
throw Exception(
"ALTER ADD PROJECTION is not supported for tables with the old syntax",
ErrorCodes::BAD_ARGUMENTS);
if (!is_custom_partitioned)
throw Exception(
"ALTER ADD PROJECTION is not supported for tables with the old syntax",
ErrorCodes::BAD_ARGUMENTS);
/// TODO: implement it; the main issue is in DataPartsExchange (not able to send directories metadata)
if (supportsReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
{
auto storage_policy = getStoragePolicy();
auto disks = storage_policy->getDisks();
for (const auto & disk : disks)
{
if (disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ALTER ADD PROJECTION is not supported when zero-copy replication is enabled for table. "
"Currently disk '{}' supports zero copy replication", disk->getName());
}
}
}
if (command.type == AlterCommand::RENAME_COLUMN)
{
@ -5696,7 +5719,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
/// replica will actually move the part from disk to some
/// zero-copy storage other replicas will just fetch
/// metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part, disk); lock)
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);

View File

@ -876,7 +876,7 @@ public:
/// Lock part in zookeeper for shared data in several nodes
/// Overridden in StorageReplicatedMergeTree
virtual void lockSharedData(const IMergeTreeDataPart &) const {}
virtual void lockSharedData(const IMergeTreeDataPart &, bool = false) const {}
/// Unlock shared data part in zookeeper
/// Overridden in StorageReplicatedMergeTree
@ -1199,7 +1199,7 @@ private:
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const DataPartPtr &, const DiskPtr &) { return std::nullopt; }
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
};
/// RAII struct to record big parts that are submerging or emerging.

View File

@ -303,7 +303,6 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
FutureMergedMutatedPartPtr future_part,
UInt64 & available_disk_space,
const AllowedMergingPredicate & can_merge,
const String & partition_id,
bool final,
@ -355,6 +354,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
++it;
}
auto available_disk_space = data.getStoragePolicy()->getMaxUnreservedFreeSpace();
/// Enough disk space to cover the new merge with a margin.
auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT;
if (available_disk_space <= required_disk_space)
@ -382,7 +382,6 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
future_part->assign(std::move(parts));
available_disk_space -= required_disk_space;
return SelectPartsDecision::SELECTED;
}
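The rewritten selection no longer threads available_disk_space through the caller; it reads the policy's unreserved free space itself and compares it against the summed source size times a safety coefficient. A self-contained sketch of that check (enoughSpaceForMerge and the 2.0 margin are illustrative; the real value lives in DISK_USAGE_COEFFICIENT_TO_SELECT):

#include <cstdint>
#include <iostream>

/// Merge is allowed only when free space exceeds the summed part size
/// multiplied by a safety margin.
bool enoughSpaceForMerge(uint64_t sum_part_bytes, uint64_t unreserved_free_bytes, double margin)
{
    return static_cast<double>(unreserved_free_bytes) > static_cast<double>(sum_part_bytes) * margin;
}

int main()
{
    /// 10 GiB of source parts, 30 GiB free, 2x margin -> allowed.
    std::cout << enoughSpaceForMerge(10ULL << 30, 30ULL << 30, 2.0) << "\n";
}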

View File

@ -81,7 +81,6 @@ public:
*/
SelectPartsDecision selectAllPartsToMergeWithinPartition(
FutureMergedMutatedPartPtr future_part,
UInt64 & available_disk_space,
const AllowedMergingPredicate & can_merge,
const String & partition_id,
bool final,

View File

@ -109,6 +109,8 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
+ toString(min_block_number) + "_"
+ toString(max_block_number) + WAL_FILE_EXTENSION;
/// Finalize stream before file rename
out->finalize();
disk->replaceFile(path, storage.getRelativeDataPath() + new_name);
init();
}

View File

@ -55,11 +55,15 @@ struct MergedBlockOutputStream::Finalizer::Impl
{
IMergeTreeDataPartWriter & writer;
MergeTreeData::MutableDataPartPtr part;
NameSet files_to_remove_after_finish;
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
bool sync;
Impl(IMergeTreeDataPartWriter & writer_, MergeTreeData::MutableDataPartPtr part_, bool sync_)
: writer(writer_), part(std::move(part_)), sync(sync_) {}
Impl(IMergeTreeDataPartWriter & writer_, MergeTreeData::MutableDataPartPtr part_, const NameSet & files_to_remove_after_finish_, bool sync_)
: writer(writer_)
, part(std::move(part_))
, files_to_remove_after_finish(files_to_remove_after_finish_)
, sync(sync_) {}
void finish();
};
@ -75,6 +79,10 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
{
writer.finish(sync);
auto disk = part->volume->getDisk();
for (const auto & file_name: files_to_remove_after_finish)
disk->removeFile(part->getFullRelativePath() + file_name);
for (auto & file : written_files)
{
file->finalize();
@ -133,19 +141,20 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
projection_part->checksums.getTotalSizeOnDisk(),
projection_part->checksums.getTotalChecksumUInt128());
NameSet files_to_remove_after_sync;
if (reset_columns)
{
auto part_columns = total_columns_list ? *total_columns_list : columns_list;
auto serialization_infos = new_part->getSerializationInfos();
serialization_infos.replaceData(new_serialization_infos);
removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
files_to_remove_after_sync = removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
new_part->setColumns(part_columns);
new_part->setSerializationInfos(serialization_infos);
}
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, sync);
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, files_to_remove_after_sync, sync);
if (new_part->isStoredOnDisk())
finalizer->written_files = finalizePartOnDisk(new_part, checksums);

View File

@ -74,9 +74,18 @@ MergedColumnOnlyOutputStream::fillChecksums(
serialization_infos.replaceData(new_serialization_infos);
auto removed_files = removeEmptyColumnsFromPart(new_part, columns, serialization_infos, checksums);
auto disk = new_part->volume->getDisk();
for (const String & removed_file : removed_files)
{
auto file_path = new_part->getFullRelativePath() + removed_file;
/// This can be called multiple times; there is no need to remove the file twice.
if (disk->exists(file_path))
disk->removeFile(file_path);
if (all_checksums.files.count(removed_file))
all_checksums.files.erase(removed_file);
}
new_part->setColumns(columns);
new_part->setSerializationInfos(serialization_infos);

View File

@ -52,6 +52,23 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
}
}
/// In some use cases merging can be more expensive than fetching
/// and it may be better to spread merge tasks across the replicas
/// instead of doing exactly the same merge cluster-wise
if (storage.merge_strategy_picker.shouldMergeOnSingleReplica(entry))
{
std::optional<String> replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge)
{
LOG_DEBUG(log,
"Prefer fetching part {} from replica {} due to execute_merges_on_single_replica_time_threshold",
entry.new_part_name, replica_to_execute_merge.value());
return {false, {}};
}
}
new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version);
commands = MutationCommands::create(storage.queue.getMutationCommands(source_part, new_part_info.mutation));
@ -73,6 +90,28 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
future_mutated_part->updatePath(storage, reserved_space.get());
future_mutated_part->type = source_part->getType();
if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
{
if (auto disk = reserved_space->getDisk(); disk->getType() == DB::DiskType::S3)
{
String dummy;
if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
{
LOG_DEBUG(log, "Mutation of part {} finished by some other replica, will download merged part", entry.new_part_name);
return {false, {}};
}
zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
if (!zero_copy_lock)
{
LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
return {false, {}};
}
}
}
const Settings & settings = storage.getContext()->getSettingsRef();
merge_mutate_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
@ -140,6 +179,12 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
throw;
}
if (zero_copy_lock)
{
LOG_DEBUG(log, "Removing zero-copy lock");
zero_copy_lock->lock->unlock();
}
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the entry will remain in the queue, and we will try again.
*/

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
namespace DB
{
@ -41,6 +42,7 @@ private:
MutationCommandsConstPtr commands;
MergeTreeData::TransactionUniquePtr transaction_ptr{nullptr};
std::optional<ZeroCopyLock> zero_copy_lock;
StopwatchUniquePtr stopwatch_ptr{nullptr};
MergeTreeData::MutableDataPartPtr new_part{nullptr};

View File

@ -31,6 +31,7 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
{
std::exception_ptr saved_exception;
bool retryable_error = false;
try
{
/// We don't have any backoff for failed entries
@ -46,16 +47,19 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
{
/// If no one has the right part, probably not all replicas are working; we will not log this at Error level.
LOG_INFO(log, fmt::runtime(e.displayText()));
retryable_error = true;
}
else if (e.code() == ErrorCodes::ABORTED)
{
/// Interrupted merge or downloading a part is not an error.
LOG_INFO(log, fmt::runtime(e.message()));
retryable_error = true;
}
else if (e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
{
/// The part is temporarily locked and cannot be added right now
LOG_INFO(log, fmt::runtime(e.displayText()));
retryable_error = true;
storage.cleanup_thread.wakeup();
}
else
@ -80,7 +84,7 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
}
if (saved_exception)
if (!retryable_error && saved_exception)
{
std::lock_guard lock(storage.queue.state_mutex);

View File

@ -57,17 +57,6 @@ bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplica(const Re
}
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const
{
time_t threshold = remote_fs_execute_merges_on_single_replica_time_threshold;
return (
threshold > 0 /// feature turned on
&& entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS /// it is a merge log entry
&& entry.create_time + threshold > time(nullptr) /// not too much time waited
);
}
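For reference, the removed predicate was just a freshness window on the log entry: the single-replica preference applies only while the entry is younger than the threshold. A standalone sketch of the same check (withinSingleReplicaWindow is a hypothetical name):

#include <ctime>
#include <iostream>

/// threshold <= 0 means the feature is off; otherwise the entry must be
/// younger than the threshold for the single-replica preference to apply.
bool withinSingleReplicaWindow(std::time_t entry_create_time, std::time_t threshold_seconds)
{
    return threshold_seconds > 0
        && entry_create_time + threshold_seconds > std::time(nullptr);
}

int main()
{
    std::cout << withinSingleReplicaWindow(std::time(nullptr) - 10, 60) << "\n";   /// 1
}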
/// This will return the same replica name for a ReplicatedMergeTreeLogEntry on all the replicas (if the replica set is the same),
/// so that each replica knows who is responsible for doing a certain merge.

View File

@ -52,10 +52,6 @@ public:
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const;
/// return true if remote_fs_execute_merges_on_single_replica_time_threshold feature is active
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const;
/// Returns the replica name when some replica other than the current one should do the merge.
std::optional<String> pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry);

View File

@ -1205,31 +1205,32 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
bool should_execute_on_single_replica = merge_strategy_picker.shouldMergeOnSingleReplica(entry);
if (!should_execute_on_single_replica)
const auto data_settings = data.getSettings();
if (data_settings->allow_remote_fs_zero_copy_replication)
{
/// Separate check. If we use only s3, check remote_fs_execute_merges_on_single_replica_time_threshold as well.
auto disks = storage.getDisks();
bool only_s3_storage = true;
for (const auto & disk : disks)
if (disk->getType() != DB::DiskType::S3)
only_s3_storage = false;
if (!disks.empty() && only_s3_storage)
should_execute_on_single_replica = merge_strategy_picker.shouldMergeOnSingleReplicaShared(entry);
if (!disks.empty() && only_s3_storage && storage.checkZeroCopyLockExists(entry.new_part_name, disks[0]))
{
out_postpone_reason = "Not executing merge/mutation for the part " + entry.new_part_name
+ ", waiting other replica to execute it and will fetch after.";
return false;
}
}
if (should_execute_on_single_replica)
if (merge_strategy_picker.shouldMergeOnSingleReplica(entry))
{
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
out_postpone_reason = fmt::format(
"Not executing merge for the part {}, waiting for {} to execute merge.",
entry.new_part_name, replica_to_execute_merge.value());
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
String reason = "Not executing merge for the part " + entry.new_part_name
+ ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
out_postpone_reason = reason;
return false;
}
}
@ -1242,7 +1243,6 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
* Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges,
* because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
*/
const auto data_settings = data.getSettings();
bool ignore_max_size = false;
if (entry.type == LogEntry::MERGE_PARTS)
{
@ -1674,6 +1674,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{
LOG_TRACE(log, "Marking mutation {} done because it is <= mutation_pointer ({})", znode, mutation_pointer);
mutation.is_done = true;
mutation.latest_fail_reason.clear();
alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
if (mutation.parts_to_do.size() != 0)
{
@ -1718,6 +1719,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{
LOG_TRACE(log, "Mutation {} is done", entry->znode_name);
it->second.is_done = true;
it->second.latest_fail_reason.clear();
if (entry->isAlterMutation())
{
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);

View File

@ -42,15 +42,31 @@ static void localBackupImpl(const DiskPtr & disk, const String & source_path, co
}
}
namespace
{
class CleanupOnFail
{
public:
explicit CleanupOnFail(std::function<void()> && cleaner_) : cleaner(cleaner_), is_success(false) {}
explicit CleanupOnFail(std::function<void()> && cleaner_)
: cleaner(cleaner_)
{}
~CleanupOnFail()
{
if (!is_success)
cleaner();
{
/// We are trying to handle a race condition here: if we were not
/// able to back up the directory, try to remove the garbage, but it's
/// ok if it doesn't exist.
try
{
cleaner();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
void success()
@ -60,8 +76,9 @@ public:
private:
std::function<void()> cleaner;
bool is_success;
bool is_success{false};
};
}
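The fixed guard is a standard RAII cleanup-on-failure pattern: the cleaner runs in the destructor unless success() was called, and it must never let an exception escape the destructor. A compilable sketch of the same pattern outside ClickHouse (the std::cout call stands in for tryLogCurrentException):

#include <functional>
#include <iostream>

class CleanupOnFail
{
public:
    explicit CleanupOnFail(std::function<void()> cleaner_) : cleaner(std::move(cleaner_)) {}

    ~CleanupOnFail()
    {
        if (!is_success)
        {
            try { cleaner(); }
            catch (...) { std::cout << "cleanup failed, ignoring\n"; }   /// never throw from a destructor
        }
    }

    void success() { is_success = true; }

private:
    std::function<void()> cleaner;
    bool is_success{false};
};

int main()
{
    CleanupOnFail cleanup([] { std::cout << "removing partial backup\n"; });
    /// ... copy files here; call cleanup.success() only when everything worked.
}   /// the cleaner runs because success() was never called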
void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, std::optional<size_t> max_level)
{
@ -73,11 +90,11 @@ void localBackup(const DiskPtr & disk, const String & source_path, const String
size_t try_no = 0;
const size_t max_tries = 10;
CleanupOnFail cleanup([&](){disk->removeRecursive(destination_path);});
CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); });
/** Files in the directory can be constantly added and deleted.
* If some file is deleted during an attempt to make a backup, then try again,
* because it's important to take into account any new files that might appear.
*/
while (true)
{

View File

@ -745,9 +745,8 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
{
while (true)
{
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, disk_space, can_merge, partition_id, final, metadata_snapshot, out_disable_reason, optimize_skip_merged_partitions);
future_part, can_merge, partition_id, final, metadata_snapshot, out_disable_reason, optimize_skip_merged_partitions);
auto timeout_ms = getSettings()->lock_acquire_timeout_for_background_operations.totalMilliseconds();
auto timeout = std::chrono::milliseconds(timeout_ms);

View File

@ -1290,6 +1290,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
{
String columns_str;
String checksums_str;
if (zookeeper->tryGet(fs::path(current_part_path) / "columns", columns_str) &&
zookeeper->tryGet(fs::path(current_part_path) / "checksums", checksums_str))
{
@ -3786,24 +3787,41 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
if (source_part)
{
MinimalisticDataPartChecksums source_part_checksums;
source_part_checksums.computeTotalChecksums(source_part->checksums);
auto source_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
source_part->getColumns(), source_part->checksums);
MinimalisticDataPartChecksums desired_checksums;
String part_path = fs::path(source_replica_path) / "parts" / part_name;
String part_znode = zookeeper->get(part_path);
std::optional<ReplicatedMergeTreePartHeader> desired_part_header;
if (!part_znode.empty())
desired_checksums = ReplicatedMergeTreePartHeader::fromString(part_znode).getChecksums();
{
desired_part_header = ReplicatedMergeTreePartHeader::fromString(part_znode);
}
else
{
String desired_checksums_str = zookeeper->get(fs::path(part_path) / "checksums");
desired_checksums = MinimalisticDataPartChecksums::deserializeFrom(desired_checksums_str);
String columns_str;
String checksums_str;
if (zookeeper->tryGet(fs::path(part_path) / "columns", columns_str) &&
zookeeper->tryGet(fs::path(part_path) / "checksums", checksums_str))
{
desired_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(columns_str, checksums_str);
}
else
{
LOG_INFO(log, "Not checking checksums of part {} with replica {} because part was removed from ZooKeeper", part_name, source_replica_path);
}
}
if (source_part_checksums == desired_checksums)
/// Check both the checksums and the columns hash. For example, we can have an empty part
/// with the same checksums but different columns; attaching it would throw an exception.
if (desired_part_header
&& source_part_header.getColumnsHash() == desired_part_header->getColumnsHash()
&& source_part_header.getChecksums() == desired_part_header->getChecksums())
{
LOG_TRACE(log, "Found local part {} with the same checksums as {}", source_part->name, part_name);
LOG_TRACE(log, "Found local part {} with the same checksums and columns hash as {}", source_part->name, part_name);
part_to_clone = source_part;
}
}
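The stricter clone check compares two independent things: the serialized checksums and a hash of the columns, because an empty local part can share checksums with a remote part that has different columns. A simplified sketch (PartHeader and canCloneLocalPart are stand-ins for ReplicatedMergeTreePartHeader):

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

struct PartHeader
{
    uint64_t columns_hash;   /// hash of column names and types
    std::string checksums;   /// serialized checksums, simplified to a string
};

/// Clone the local part only when both the columns hash and the checksums match.
bool canCloneLocalPart(const PartHeader & local, const std::optional<PartHeader> & desired)
{
    return desired
        && local.columns_hash == desired->columns_hash
        && local.checksums == desired->checksums;
}

int main()
{
    PartHeader local{42, "abc"};
    std::cout << canCloneLocalPart(local, PartHeader{42, "abc"}) << "\n";   /// 1
}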
@ -4349,7 +4367,6 @@ bool StorageReplicatedMergeTree::optimize(
};
auto zookeeper = getZooKeeperAndAssertNotReadonly();
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
const auto storage_settings_ptr = getSettings();
auto metadata_snapshot = getInMemoryMetadataPtr();
std::vector<ReplicatedMergeTreeLogEntryData> merge_entries;
@ -4382,7 +4399,7 @@ bool StorageReplicatedMergeTree::optimize(
else
{
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, final, metadata_snapshot,
future_merged_part, can_merge, partition_id, final, metadata_snapshot,
&disable_reason, query_context->getSettingsRef().optimize_skip_merged_partitions);
}
@ -7154,10 +7171,35 @@ void StorageReplicatedMergeTree::createTableSharedID()
}
void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part) const
void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
{
if (!part.volume)
if (!disk || !disk->supportZeroCopyReplication())
return;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return;
String id = part_id;
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
part_name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
createZeroCopyLockNode(zookeeper, zookeeper_node, zkutil::CreateMode::Ephemeral, false);
}
}
void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock) const
{
if (!part.volume || !part.isStoredOnDisk())
return;
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
return;
@ -7175,8 +7217,9 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper lock {}", zookeeper_node);
createZeroCopyLockNode(zookeeper, zookeeper_node);
LOG_TRACE(log, "Set zookeeper persistent lock {}", zookeeper_node);
createZeroCopyLockNode(zookeeper, zookeeper_node, zkutil::CreateMode::Persistent, replace_existing_lock);
}
}
@ -7189,21 +7232,28 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, const String & name) const
{
if (!part.volume)
if (!part.volume || !part.isStoredOnDisk())
return true;
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
return true;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
/// If the part is temporary, the refcount file may be absent
auto ref_count_path = fs::path(part.getFullRelativePath()) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
if (disk->exists(ref_count_path))
{
auto ref_count = disk->getRefCount(ref_count_path);
if (ref_count > 0) /// Keep part shard info for frozen backups
return false;
}
else
{
/// A temporary part with an absent file cannot be locked in shared mode
return true;
}
auto ref_count = part.getNumberOfRefereneces();
if (ref_count > 0) /// Keep part shard info for frozen backups
return false;
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, disk, zookeeper, *getSettings(), log,
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, disk, getZooKeeper(), *getSettings(), log,
zookeeper_path);
}
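Unlocking now branches on the refcount file in three ways: no file means a temporary part with nothing to unlock, a positive refcount keeps the shared data for frozen backups, and only a zero refcount actually releases the ZooKeeper lock. A small sketch of that gating (std::nullopt models the absent refcount file):

#include <iostream>
#include <optional>

enum class UnlockResult { NothingToUnlock, KeptForBackup, Unlocked };

UnlockResult unlockShared(std::optional<int> ref_count)
{
    if (!ref_count)
        return UnlockResult::NothingToUnlock;   /// temporary part: refcount file absent
    if (*ref_count > 0)
        return UnlockResult::KeptForBackup;     /// a frozen backup still references the data
    return UnlockResult::Unlocked;              /// safe to drop the zookeeper lock
}

int main()
{
    std::cout << static_cast<int>(unlockShared(std::nullopt)) << "\n";   /// 0
}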
@ -7216,7 +7266,7 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old);
bool res = true;
bool part_has_no_more_locks = true;
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
@ -7236,7 +7286,7 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
if (!children.empty())
{
LOG_TRACE(logger, "Found zookeper locks for {}", zookeeper_part_uniq_node);
res = false;
part_has_no_more_locks = false;
continue;
}
@ -7265,7 +7315,7 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
}
}
return res;
return part_has_no_more_locks;
}
@ -7387,8 +7437,31 @@ Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings
return res;
}
bool StorageReplicatedMergeTree::checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk)
{
auto path = getZeroCopyPartPath(part_name, disk);
if (path)
{
/// FIXME
auto lock_path = fs::path(*path) / "part_exclusive_lock";
if (getZooKeeper()->exists(lock_path))
{
return true;
}
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const DataPartPtr & part, const DiskPtr & disk)
return false;
}
std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const String & part_name, const DiskPtr & disk)
{
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;
return getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(), part_name, zookeeper_path)[0];
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
{
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;
@ -7397,8 +7470,7 @@ std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusi
if (!zookeeper)
return std::nullopt;
String zc_zookeeper_path = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
part->name, zookeeper_path)[0];
String zc_zookeeper_path = *getZeroCopyPartPath(part_name, disk);
/// Just recursively create ancestors for lock
zookeeper->createAncestors(zc_zookeeper_path);
@ -7633,7 +7705,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node)
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode, bool replace_existing_lock)
{
/// In a rare case another replica can remove the path between createAncestors and createIfNotExists,
/// so we make up to 5 attempts
@ -7643,8 +7715,22 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperP
try
{
zookeeper->createAncestors(zookeeper_node);
zookeeper->createIfNotExists(zookeeper_node, "lock");
break;
if (replace_existing_lock && zookeeper->exists(zookeeper_node))
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
break;
}
else
{
auto error = zookeeper->tryCreate(zookeeper_node, "", mode);
if (error == Coordination::Error::ZOK || error == Coordination::Error::ZNODEEXISTS)
break;
}
}
catch (const zkutil::KeeperException & e)
{
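The retry loop distinguishes two cases: when replacing an existing lock, the remove and create must happen atomically (a multi-op), otherwise a plain create that tolerates ZNODEEXISTS is enough. A toy model of that flow with an in-memory map standing in for ZooKeeper (FakeKeeper and its methods are illustrative, not the zkutil API):

#include <iostream>
#include <map>
#include <string>

/// Toy stand-in for ZooKeeper: a flat path -> value map.
struct FakeKeeper
{
    std::map<std::string, std::string> nodes;

    bool exists(const std::string & path) const { return nodes.count(path) > 0; }

    /// Models tryMulti({remove, create}): both succeed or nothing changes.
    bool removeAndCreate(const std::string & path)
    {
        if (!nodes.count(path))
            return false;
        nodes[path] = "";
        return true;
    }

    /// Models tryCreate: fails if the node already exists (ZNODEEXISTS).
    bool tryCreate(const std::string & path) { return nodes.emplace(path, "").second; }
};

bool createLockNode(FakeKeeper & zk, const std::string & node, bool replace_existing_lock)
{
    for (int attempt = 0; attempt < 5; ++attempt)   /// bounded retries, as above
    {
        if (replace_existing_lock && zk.exists(node))
        {
            if (zk.removeAndCreate(node))   /// atomically hand the lock over
                return true;
        }
        else if (zk.tryCreate(node))
            return true;
        /// (the real code also treats ZNODEEXISTS as success in this branch)
    }
    return false;
}

int main()
{
    FakeKeeper zk;
    std::cout << createLockNode(zk, "/lock", false) << "\n";   /// 1
}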
@ -7673,10 +7759,12 @@ public:
table_shared_id = storage.getTableSharedID();
}
void save(DiskPtr disk, const String & path) const
void save(DiskPtr data_disk, const String & path) const
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto file_path = getFileName(path);
auto buffer = disk->writeMetaFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
auto buffer = metadata_disk->writeFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
writeIntText(version, *buffer);
buffer->write("\n", 1);
writeBoolText(is_replicated, *buffer);
@ -7691,12 +7779,14 @@ public:
buffer->write("\n", 1);
}
bool load(DiskPtr disk, const String & path)
bool load(DiskPtr data_disk, const String & path)
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto file_path = getFileName(path);
if (!disk->exists(file_path))
if (!metadata_disk->exists(file_path))
return false;
auto buffer = disk->readMetaFile(file_path, ReadSettings(), {});
auto buffer = metadata_disk->readFile(file_path, ReadSettings(), {});
readIntText(version, *buffer);
if (version != 1)
{
@ -7717,9 +7807,10 @@ public:
return true;
}
static void clean(DiskPtr disk, const String & path)
static void clean(DiskPtr data_disk, const String & path)
{
disk->removeMetaFileIfExists(getFileName(path));
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
metadata_disk->removeFileIfExists(getFileName(path));
}
private:
@ -7773,22 +7864,18 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
if (zookeeper)
fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
if (disk->exists(checksums))
{
fs::path checksums = fs::path(path) / "checksums.txt";
if (disk->exists(checksums))
if (disk->getRefCount(checksums) == 0)
{
auto ref_count = disk->getRefCount(checksums);
if (ref_count == 0)
{
String id = disk->getUniqueId(checksums);
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_zookeeper_path);
}
else
keep_shared = true;
String id = disk->getUniqueId(checksums);
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_zookeeper_path);
}
else
keep_shared = true;
}
disk->removeSharedRecursive(path, keep_shared);

View File

@ -231,7 +231,9 @@ public:
bool executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
/// Lock the part in zookeeper so that its shared data can be used by several nodes
void lockSharedData(const IMergeTreeDataPart & part) const override;
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock) const override;
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
/// Unlock shared data part in zookeeper
/// Return true if data unlocked
@ -758,7 +760,7 @@ private:
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false);
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;
@ -771,9 +773,14 @@ private:
// Create table id if needed
void createTableSharedID();
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk);
std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);
/// Create an ephemeral lock in zookeeper for a part on a disk which supports zero-copy replication.
/// If somebody is already holding the lock, return std::nullopt.
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const DataPartPtr & part, const DiskPtr & disk) override;
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.

View File

@ -270,6 +270,9 @@ CI_CONFIG = {
"Stateless tests (release, DatabaseReplicated, actions)": {
"required_build": "package_release",
},
"Stateless tests (release, s3 storage, actions)": {
"required_build": "package_release",
},
"Stress test (address, actions)": {
"required_build": "package_asan",
},

View File

@ -30,6 +30,10 @@ def get_additional_envs(check_name, run_by_hash_num, run_by_hash_total):
if 'wide parts enabled' in check_name:
result.append("USE_POLYMORPHIC_PARTS=1")
# temporary
if 's3 storage' in check_name:
result.append("USE_S3_STORAGE_FOR_MERGE_TREE=1")
if run_by_hash_total != 0:
result.append(f"RUN_BY_HASH_NUM={run_by_hash_num}")
result.append(f"RUN_BY_HASH_TOTAL={run_by_hash_total}")

View File

@ -329,6 +329,7 @@ class FailureReason(enum.Enum):
FAST_ONLY = "running fast tests only"
NO_LONG = "not running long tests"
REPLICATED_DB = "replicated-database"
S3_STORAGE = "s3-storage"
BUILD = "not running for current build"
# UNKNOWN reasons
@ -463,6 +464,10 @@ class TestCase:
elif tags and ('no-replicated-database' in tags) and args.replicated_database:
return FailureReason.REPLICATED_DB
elif tags and ('no-s3-storage' in tags) and args.s3_storage:
return FailureReason.S3_STORAGE
elif tags:
for build_flag in args.build_flags:
if 'no-' + build_flag in tags:
@ -1369,6 +1374,7 @@ if __name__ == '__main__':
parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
parser.add_argument('--check-zookeeper-session', action='store_true', help='Check ZooKeeper session uptime to determine if failed test should be retried')
parser.add_argument('--s3-storage', action='store_true', default=False, help='Run tests over s3 storage')
parser.add_argument('--run-by-hash-num', type=int, help='Run tests matching crc32(test_name) % run_by_hash_total == run_by_hash_num')
parser.add_argument('--run-by-hash-total', type=int, help='Total test groups for crc32(test_name) % run_by_hash_total == run_by_hash_num')
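On the runner side the new flag and tag pair up symmetrically: --s3-storage sets args.s3_storage, and any test tagged no-s3-storage is then reported as skipped with the S3_STORAGE reason. A sketch of that rule in C++ for illustration (the Python above is the authoritative logic; skipReason is a hypothetical name):

#include <iostream>
#include <optional>
#include <set>
#include <string>

/// Mirrors the elif chain above: a test tagged no-s3-storage is skipped
/// whenever the runner was started with --s3-storage.
std::optional<std::string> skipReason(const std::set<std::string> & tags, bool s3_storage)
{
    if (s3_storage && tags.count("no-s3-storage"))
        return "s3-storage";
    return std::nullopt;
}

int main()
{
    auto reason = skipReason({"long", "no-s3-storage"}, true);
    std::cout << (reason ? *reason : "run") << "\n";   /// prints "s3-storage"
}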

View File

@ -0,0 +1,24 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://localhost:11111/test/test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<storage_policy>s3</storage_policy>
</merge_tree>
</clickhouse>

View File

@ -107,4 +107,8 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
sudo chgrp clickhouse /var/lib/clickhouse2
fi
if [[ -n "$USE_S3_STORAGE_FOR_MERGE_TREE" ]] && [[ "$USE_S3_STORAGE_FOR_MERGE_TREE" -eq 1 ]]; then
ln -sf $SRC_PATH/config.d/s3_storage_policy_by_default.xml $DEST_SERVER_PATH/config.d/
fi
ln -sf $SRC_PATH/client_config.xml $DEST_CLIENT_PATH/config.xml

View File

@ -329,6 +329,7 @@ def test_s3_zero_copy_unfreeze(cluster):
check_objects_exisis(cluster, objects01)
node1.query("TRUNCATE TABLE unfreeze_test")
node2.query("SYSTEM SYNC REPLICA unfreeze_test")
objects11 = node1.get_backuped_s3_objects("s31", "freeze_backup1")
objects12 = node2.get_backuped_s3_objects("s31", "freeze_backup2")
@ -373,6 +374,7 @@ def test_s3_zero_copy_drop_detached(cluster):
node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup1'")
node1.query("INSERT INTO drop_detached_test VALUES (1)")
node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup2'")
node2.query("SYSTEM SYNC REPLICA drop_detached_test")
objects1 = node1.get_backuped_s3_objects("s31", "detach_backup1")
objects2 = node1.get_backuped_s3_objects("s31", "detach_backup2")
@ -384,6 +386,8 @@ def test_s3_zero_copy_drop_detached(cluster):
node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '0'")
node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '1'")
node2.query("SYSTEM SYNC REPLICA drop_detached_test")
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
@ -391,6 +395,7 @@ def test_s3_zero_copy_drop_detached(cluster):
check_objects_exisis(cluster, objects2)
node2.query("ALTER TABLE drop_detached_test DROP DETACHED PARTITION '1'", settings={"allow_drop_detached": 1})
node1.query("SYSTEM SYNC REPLICA drop_detached_test")
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
@ -398,6 +403,7 @@ def test_s3_zero_copy_drop_detached(cluster):
check_objects_exisis(cluster, objects2)
node1.query("ALTER TABLE drop_detached_test DROP DETACHED PARTITION '1'", settings={"allow_drop_detached": 1})
node2.query("SYSTEM SYNC REPLICA drop_detached_test")
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
@ -405,12 +411,14 @@ def test_s3_zero_copy_drop_detached(cluster):
check_objects_not_exisis(cluster, objects_diff)
node1.query("ALTER TABLE drop_detached_test DROP DETACHED PARTITION '0'", settings={"allow_drop_detached": 1})
node2.query("SYSTEM SYNC REPLICA drop_detached_test")
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
check_objects_exisis(cluster, objects1)
node2.query("ALTER TABLE drop_detached_test DROP DETACHED PARTITION '0'", settings={"allow_drop_detached": 1})
node1.query("SYSTEM SYNC REPLICA drop_detached_test")
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
SELECT '*** Not partitioned ***';
DROP TABLE IF EXISTS not_partitioned;

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: zookeeper, no-parallel
# Tags: zookeeper, no-parallel, no-s3-storage
# Because REPLACE PARTITION does not force immediate removal of replaced data parts from the local filesystem
# (it tries to do it as quickly as possible, but it is still performed asynchronously in a separate thread)

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long
# Tags: long, no-s3-storage
set -e

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-s3-storage
DROP TABLE IF EXISTS check_system_tables;

View File

@ -1,4 +1,4 @@
-- Tags: no-parallel
-- Tags: no-parallel, no-s3-storage
drop table if exists ttl;
set mutations_sync = 2;

View File

@ -1,3 +1,5 @@
-- Tags: no-s3-storage
-- The output is a slightly different plan
drop table if exists t;
create table t (a Int, b Int) engine = MergeTree order by (a, b) settings index_granularity = 400;

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-parallel, no-fasttest
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-parallel, no-fasttest, no-s3-storage
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,4 +1,4 @@
-- Tags: no-debug, no-parallel, long
-- Tags: no-debug, no-parallel, long, no-s3-storage
DROP TABLE IF EXISTS table_with_single_pk;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
select * from system.settings where name = 'send_timeout';
select * from system.merge_tree_settings order by length(description) limit 1;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
DROP TABLE IF EXISTS test_01343;
CREATE TABLE test_01343 (x String) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO test_01343 VALUES ('Hello, world');

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
DROP TABLE IF EXISTS test_01344;
CREATE TABLE test_01344 (x String, INDEX idx (x) TYPE set(10) GRANULARITY 1) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO test_01344 VALUES ('Hello, world');

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
SELECT '====array====';
DROP TABLE IF EXISTS t_arr;
CREATE TABLE t_arr (a Array(UInt32)) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0;

View File

@ -1,6 +1,8 @@
#!/usr/bin/env bash
# Tags: long, no-replicated-database
# Tags: long, no-replicated-database, no-s3-storage
# Tag no-replicated-database: Fails due to additional replicas or shards
# Tag no-s3-storage: Merge assigned to replica 2, but replication queues are stopped for it
set -e

View File

@ -1,4 +1,4 @@
-- Tags: long, replica, no-replicated-database, no-parallel
-- Tags: long, replica, no-replicated-database, no-parallel, no-s3-storage
-- Tag no-replicated-database: Fails due to additional replicas or shards
-- Tag no-parallel: static zk path

View File

@ -1,3 +1,5 @@
-- Tags: no-s3-storage
-- Temporarily suppressed
DROP TABLE IF EXISTS nested;
SET flatten_nested = 0;

View File

@ -1,3 +1,5 @@
-- Tags: no-s3-storage
DROP TABLE IF EXISTS data_01551;
CREATE TABLE data_01551

View File

@ -1,4 +1,4 @@
-- Tags: zookeeper, no-replicated-database, no-parallel
-- Tags: zookeeper, no-replicated-database, no-parallel, no-s3-storage
drop table if exists x;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
set allow_experimental_projection_optimization = 1, force_optimize_projection = 1;
drop table if exists tp;

View File

@ -1,4 +1,5 @@
#!/usr/bin/env bash
# Tags: no-s3-storage
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists tp;
create table tp (d1 Int32, d2 Int32, eventcnt Int64, projection p (select sum(eventcnt) group by d1)) engine = MergeTree order by (d1, d2);

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists d;
create table d (i int, j int) engine MergeTree partition by i % 2 order by tuple() settings index_granularity = 1;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists t;
create table t (i int, j int) engine MergeTree order by i;

View File

@ -1,4 +1,6 @@
#!/usr/bin/env bash
# Tags: no-s3-storage
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
set allow_experimental_projection_optimization = 1;
drop table if exists x;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
set allow_experimental_projection_optimization = 1;
drop table if exists t;

View File

@ -1,3 +1,5 @@
-- Tags: no-s3-storage
drop table if exists tp;
create table tp (x Int32, y Int32, projection p (select x, y order by x)) engine = MergeTree order by y;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-s3-storage
drop table if exists tp_1;
drop table if exists tp_2;

View File

@ -1,3 +1,6 @@
-- Tags: no-s3-storage
DROP TABLE IF EXISTS t;
drop table if exists tp;
create table tp (type Int32, eventcnt UInt64, projection p (select sum(eventcnt), type group by type order by sum(eventcnt))) engine = MergeTree order by type; -- { serverError 583 }

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists t;
create table t (i int, j int, k int, projection p (select * order by j)) engine MergeTree order by i settings index_granularity = 1;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists x;
create table x (i UInt64, j UInt64, k UInt64, projection agg (select sum(j), avg(k) group by i), projection norm (select j, k order by i)) engine MergeTree order by tuple();

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists x;
create table x (i int) engine MergeTree order by tuple();

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
DROP TABLE IF EXISTS t;
CREATE TABLE t (`key` UInt32, `created_at` Date, `value` UInt32, PROJECTION xxx (SELECT key, created_at, sum(value) GROUP BY key, created_at)) ENGINE = MergeTree PARTITION BY toYYYYMM(created_at) ORDER BY key;
@ -5,3 +6,5 @@ CREATE TABLE t (`key` UInt32, `created_at` Date, `value` UInt32, PROJECTION xxx
INSERT INTO t SELECT 1 AS key, today() + (number % 30), number FROM numbers(1000);
ALTER TABLE t UPDATE value = 0 WHERE (value > 0) AND (created_at >= '2021-12-21') SETTINGS allow_experimental_projection_optimization = 1;
DROP TABLE IF EXISTS t;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists z;
create table z (pk Int64, d Date, id UInt64, c UInt64) Engine MergeTree partition by d order by pk ;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists tp;
create table tp (x Int32, y Int32, projection p (select x, y order by x)) engine = MergeTree order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32;

View File

@ -1,4 +1,4 @@
-- Tags: long, no-parallel
-- Tags: long, no-parallel, no-s3-storage
drop table if exists t;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists t;
create table t (s UInt16, l UInt16, projection p (select s, l order by l)) engine MergeTree order by s;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists t;
create table t (x UInt32) engine = MergeTree order by tuple() settings index_granularity = 8;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists projection_test;
create table projection_test (`sum(block_count)` UInt64, domain_alias UInt64 alias length(domain), datetime DateTime, domain LowCardinality(String), x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64), projection p (select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration), count(), sum(block_count) / sum(duration), avg(block_count / duration), sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes), sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count(), uniqHLL12(x_id), uniqHLL12(y_id) group by dt_m, domain)) engine MergeTree partition by toDate(datetime) order by (toStartOfTenMinutes(datetime), domain);

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
drop table if exists projection_without_key;
create table projection_without_key (key UInt32, PROJECTION x (SELECT sum(key) group by key % 3)) engine MergeTree order by key;

View File

@ -1,4 +1,4 @@
-- Tags: distributed
-- Tags: distributed, no-s3-storage
drop table if exists projection_test;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
DROP TABLE IF EXISTS sparse_tuple;
CREATE TABLE sparse_tuple (id UInt64, t Tuple(a UInt64, s String))