Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-27 01:51:59 +00:00)
Merge remote-tracking branch 'upstream/master' into group_by_all
commit c9576e3761

.snyk (new file, 4 lines)
@@ -0,0 +1,4 @@
+# Snyk (https://snyk.io) policy file
+exclude:
+  global:
+    - tests/**

@@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
 # lts / testing / prestable / etc
 ARG REPO_CHANNEL="stable"
 ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
-ARG VERSION="22.10.1.1877"
+ARG VERSION="22.10.2.11"
 ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"

 # user/group precreated explicitly with fixed uid/gid on purpose.

@@ -21,7 +21,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list

 ARG REPO_CHANNEL="stable"
 ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
-ARG VERSION="22.10.1.1877"
+ARG VERSION="22.10.2.11"
 ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"

 # set non-empty deb_location_url url to create a docker image

@@ -212,4 +212,4 @@ Templates:

 ## How to Build Documentation

-You can build your documentation manually by following the instructions in [docs/tools/README.md](../docs/tools/README.md). Also, our CI runs the documentation build after the `documentation` label is added to PR. You can see the results of a build in the GitHub interface. If you have no permissions to add labels, a reviewer of your PR will add it.
+You can build your documentation manually by following the instructions in the docs repo [contrib-writing-guide](https://github.com/ClickHouse/clickhouse-docs/blob/main/contrib-writing-guide.md). Also, our CI runs the documentation build after the `documentation` label is added to PR. You can see the results of a build in the GitHub interface. If you have no permissions to add labels, a reviewer of your PR will add it.

docs/changelogs/v22.10.2.11-stable.md (new file, 18 lines)
@@ -0,0 +1,18 @@
+---
+sidebar_position: 1
+sidebar_label: 2022
+---
+
+# 2022 Changelog
+
+### ClickHouse release v22.10.2.11-stable (d2bfcaba002) FIXME as compared to v22.10.1.1877-stable (98ab5a3c189)
+
+#### Bug Fix (user-visible misbehavior in official stable or prestable release)
+
+* Backported in [#42750](https://github.com/ClickHouse/ClickHouse/issues/42750): A segmentation fault related to DNS & c-ares has been reported. The below error occurred in multiple threads: ``` 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008088 [ 356 ] {} <Fatal> BaseDaemon: ######################################## 2022-09-28 15:41:19.008,"2022.09.28 15:41:19.008147 [ 356 ] {} <Fatal> BaseDaemon: (version 22.8.5.29 (official build), build id: 92504ACA0B8E2267) (from thread 353) (no query) Received signal Segmentation fault (11)" 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008196 [ 356 ] {} <Fatal> BaseDaemon: Address: 0xf Access: write. Address not mapped to object. 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008216 [ 356 ] {} <Fatal> BaseDaemon: Stack trace: 0x188f8212 0x1626851b 0x1626a69e 0x16269b3f 0x16267eab 0x13cf8284 0x13d24afc 0x13c5217e 0x14ec2495 0x15ba440f 0x15b9d13b 0x15bb2699 0x1891ccb3 0x1891e00d 0x18ae0769 0x18ade022 0x7f76aa985609 0x7f76aa8aa133 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008274 [ 356 ] {} <Fatal> BaseDaemon: 2. Poco::Net::IPAddress::family() const @ 0x188f8212 in /usr/bin/clickhouse 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008297 [ 356 ] {} <Fatal> BaseDaemon: 3. ? @ 0x1626851b in /usr/bin/clickhouse 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008309 [ 356 ] {} <Fatal> BaseDaemon: 4. ? @ 0x1626a69e in /usr/bin/clickhouse ```. [#42234](https://github.com/ClickHouse/ClickHouse/pull/42234) ([Arthur Passos](https://github.com/arthurpassos)).
+* Backported in [#42793](https://github.com/ClickHouse/ClickHouse/issues/42793): Fix a bug in ParserFunction that could have led to a segmentation fault. [#42724](https://github.com/ClickHouse/ClickHouse/pull/42724) ([Nikolay Degterinsky](https://github.com/evillique)).
+
+#### NOT FOR CHANGELOG / INSIGNIFICANT
+
+* Always run `BuilderReport` and `BuilderSpecialReport` in all CI types [#42684](https://github.com/ClickHouse/ClickHouse/pull/42684) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

@@ -1,10 +1,7 @@
 ---
 slug: /en/operations/update
-sidebar_position: 47
-sidebar_label: ClickHouse Upgrade
 ---

-# ClickHouse Upgrade
+[//]: # (This file is included in Manage > Updates)

 ## Self-managed ClickHouse Upgrade

 If ClickHouse was installed from `deb` packages, execute the following commands on the server:

@@ -178,7 +178,7 @@ Columns:
 - `view_definition` ([String](../../sql-reference/data-types/string.md)) — `SELECT` query for view.
 - `check_option` ([String](../../sql-reference/data-types/string.md)) — `NONE`, no checking.
 - `is_updatable` ([Enum8](../../sql-reference/data-types/enum.md)) — `NO`, the view is not updated.
-- `is_insertable_into` ([Enum8](../../sql-reference/data-types/enum.md)) — Shows whether the created view is [materialized](../../sql-reference/statements/create/view/#materialized). Possible values:
+- `is_insertable_into` ([Enum8](../../sql-reference/data-types/enum.md)) — Shows whether the created view is [materialized](../../sql-reference/statements/create/view.md/#materialized-view). Possible values:
   - `NO` — The created view is not materialized.
   - `YES` — The created view is materialized.
 - `is_trigger_updatable` ([Enum8](../../sql-reference/data-types/enum.md)) — `NO`, the trigger is not updated.

@@ -68,6 +68,5 @@ thread_id: 54

 **See Also**

-- [Managing ReplicatedMergeTree Tables](../../sql-reference/statements/system/#query-language-system-replicated)
+- [Managing ReplicatedMergeTree Tables](../../sql-reference/statements/system.md/#managing-replicatedmergetree-tables)

 [Original article](https://clickhouse.com/docs/en/operations/system_tables/replicated_fetches) <!--hide-->

@@ -4,7 +4,7 @@ sidebar_position: 38
 sidebar_label: FUNCTION
 ---

-# CREATE FUNCTION
+# CREATE FUNCTION — user defined function (UDF)

 Creates a user defined function from a lambda expression. The expression must consist of function parameters, constants, operators, or other function calls.

@@ -5,6 +5,7 @@
 #include <algorithm>
 #include <utility>
 #include <base/range.h>
 #include <base/unaligned.h>
 #include <Common/hex.h>
 #include <Common/StringUtils/StringUtils.h>

@@ -55,8 +56,11 @@ inline bool parseIPv4(const char * src, unsigned char * dst)
     }
     if (*(src - 1) != '\0')
         return false;

+#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
+    reverseMemcpy(dst, &result, sizeof(result));
+#else
     memcpy(dst, &result, sizeof(result));
+#endif
     return true;
 }
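
An aside on the hunk above: `parseIPv4` accumulates the address in a host-order integer and then writes it to the output buffer, so big-endian hosts must reverse the bytes to keep the output layout identical across platforms. A minimal standalone sketch of that pattern (not ClickHouse code; `reverseMemcpyPortable` is a stand-in for the `reverseMemcpy` helper the diff relies on):

```cpp
#include <cstdint>
#include <cstring>

// Stand-in for the reverseMemcpy helper referenced in the diff above.
static void reverseMemcpyPortable(void * dst, const void * src, size_t n)
{
    const auto * s = static_cast<const unsigned char *>(src);
    auto * d = static_cast<unsigned char *>(dst);
    for (size_t i = 0; i < n; ++i)
        d[i] = s[n - 1 - i];  // copy bytes in reverse order
}

// Write a host-order uint32 so dst always ends up with the same byte layout,
// regardless of the endianness of the machine doing the writing.
void storeIPv4Result(uint32_t result, unsigned char * dst)
{
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
    reverseMemcpyPortable(dst, &result, sizeof(result));  // flip on big-endian hosts
#else
    std::memcpy(dst, &result, sizeof(result));            // little-endian: bytes already in order
#endif
}
```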

@@ -141,7 +141,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
     return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), object.absolute_path);
 }

-void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
+void AzureObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children) const
 {
     auto client_ptr = client.get();

@@ -84,7 +84,7 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
+    void findAllFiles(const std::string & path, RelativePathsWithSize & children) const override;

     /// Remove file. Throws exception if file doesn't exists or it's a directory.
     void removeObject(const StoredObject & object) override;

@@ -282,9 +282,9 @@ std::unique_ptr<IObjectStorage> CachedObjectStorage::cloneObjectStorage(
     return object_storage->cloneObjectStorage(new_namespace, config, config_prefix, context);
 }

-void CachedObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
+void CachedObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children) const
 {
-    object_storage->listPrefix(path, children);
+    object_storage->findAllFiles(path, children);
 }

 ObjectMetadata CachedObjectStorage::getObjectMetadata(const std::string & path) const

@@ -72,7 +72,7 @@ public:
         const std::string & config_prefix,
         ContextPtr context) override;

-    void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
+    void findAllFiles(const std::string & path, RelativePathsWithSize & children) const override;

     ObjectMetadata getObjectMetadata(const std::string & path) const override;

@@ -390,7 +390,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
     };

     RelativePathsWithSize children;
-    source_object_storage->listPrefix(restore_information.source_path, children);
+    source_object_storage->findAllFiles(restore_information.source_path, children);

     restore_files(children);

@@ -540,7 +540,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
     };

     RelativePathsWithSize children;
-    source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
+    source_object_storage->findAllFiles(restore_information.source_path + "operations/", children);
     restore_file_operations(children);

     if (restore_information.detached)

@@ -101,18 +101,6 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
 }

-void HDFSObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
-{
-    const size_t begin_of_path = path.find('/', path.find("//") + 2);
-    int32_t num_entries;
-    auto * files_list = hdfsListDirectory(hdfs_fs.get(), path.substr(begin_of_path).c_str(), &num_entries);
-    if (num_entries == -1)
-        throw Exception(ErrorCodes::HDFS_ERROR, "HDFSDelete failed with path: " + path);
-
-    for (int32_t i = 0; i < num_entries; ++i)
-        children.emplace_back(files_list[i].mName, files_list[i].mSize);
-}
-
 /// Remove file. Throws exception if file doesn't exists or it's a directory.
 void HDFSObjectStorage::removeObject(const StoredObject & object)
 {

@@ -85,8 +85,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
-
     /// Remove file. Throws exception if file doesn't exists or it's a directory.
     void removeObject(const StoredObject & object) override;

@@ -11,10 +11,16 @@
 #include <Disks/DirectoryIterator.h>
 #include <Disks/WriteMode.h>
 #include <Disks/ObjectStorages/IObjectStorage.h>
+#include <Common/ErrorCodes.h>

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int NOT_IMPLEMENTED;
+}
+
 class IMetadataStorage;

 /// Tries to provide some "transactions" interface, which allow
@@ -33,32 +39,71 @@ public:
     /// General purpose methods

     /// Write metadata string to file
-    virtual void writeStringToFile(const std::string & path, const std::string & data) = 0;
+    virtual void writeStringToFile(const std::string & /* path */, const std::string & /* data */)
+    {
+        throwNotImplemented();
+    }

-    virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) = 0;
+    virtual void setLastModified(const std::string & /* path */, const Poco::Timestamp & /* timestamp */)
+    {
+        throwNotImplemented();
+    }

     virtual bool supportsChmod() const = 0;
-    virtual void chmod(const String & path, mode_t mode) = 0;
+    virtual void chmod(const String & /* path */, mode_t /* mode */)
+    {
+        throwNotImplemented();
+    }

-    virtual void setReadOnly(const std::string & path) = 0;
+    virtual void setReadOnly(const std::string & /* path */)
+    {
+        throwNotImplemented();
+    }

-    virtual void unlinkFile(const std::string & path) = 0;
+    virtual void unlinkFile(const std::string & /* path */)
+    {
+        throwNotImplemented();
+    }

-    virtual void createDirectory(const std::string & path) = 0;
+    virtual void createDirectory(const std::string & /* path */)
+    {
+        throwNotImplemented();
+    }

-    virtual void createDirectoryRecursive(const std::string & path) = 0;
+    virtual void createDirectoryRecursive(const std::string & /* path */)
+    {
+        throwNotImplemented();
+    }

-    virtual void removeDirectory(const std::string & path) = 0;
+    virtual void removeDirectory(const std::string & /* path */)
+    {
+        throwNotImplemented();
+    }

-    virtual void removeRecursive(const std::string & path) = 0;
+    virtual void removeRecursive(const std::string & /* path */)
+    {
+        throwNotImplemented();
+    }

-    virtual void createHardLink(const std::string & path_from, const std::string & path_to) = 0;
+    virtual void createHardLink(const std::string & /* path_from */, const std::string & /* path_to */)
+    {
+        throwNotImplemented();
+    }

-    virtual void moveFile(const std::string & path_from, const std::string & path_to) = 0;
+    virtual void moveFile(const std::string & /* path_from */, const std::string & /* path_to */)
+    {
+        throwNotImplemented();
+    }

-    virtual void moveDirectory(const std::string & path_from, const std::string & path_to) = 0;
+    virtual void moveDirectory(const std::string & /* path_from */, const std::string & /* path_to */)
+    {
+        throwNotImplemented();
+    }

-    virtual void replaceFile(const std::string & path_from, const std::string & path_to) = 0;
+    virtual void replaceFile(const std::string & /* path_from */, const std::string & /* path_to */)
+    {
+        throwNotImplemented();
+    }

     /// Metadata related methods

@@ -69,7 +114,10 @@ public:
     virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;

     /// Add to new blob to metadata file (way to implement appends)
-    virtual void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
+    virtual void addBlobToMetadata(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
+    {
+        throwNotImplemented();
+    }

     /// Unlink metadata file and do something special if required
     /// By default just remove file (unlink file).
@@ -79,6 +127,12 @@ public:
     }

     virtual ~IMetadataTransaction() = default;

+private:
+    [[noreturn]] static void throwNotImplemented()
+    {
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented");
+    }
 };

 using MetadataTransactionPtr = std::shared_ptr<IMetadataTransaction>;
@@ -106,12 +160,18 @@ public:

     virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;

-    virtual time_t getLastChanged(const std::string & path) const = 0;
+    virtual time_t getLastChanged(const std::string & /* path */) const
+    {
+        throwNotImplemented();
+    }

     virtual bool supportsChmod() const = 0;

     virtual bool supportsStat() const = 0;
-    virtual struct stat stat(const String & path) const = 0;
+    virtual struct stat stat(const String & /* path */) const
+    {
+        throwNotImplemented();
+    }

     virtual std::vector<std::string> listDirectory(const std::string & path) const = 0;

@@ -120,20 +180,32 @@ public:
     virtual uint32_t getHardlinkCount(const std::string & path) const = 0;

     /// Read metadata file to string from path
-    virtual std::string readFileToString(const std::string & path) const = 0;
+    virtual std::string readFileToString(const std::string & /* path */) const
+    {
+        throwNotImplemented();
+    }

     virtual ~IMetadataStorage() = default;

     /// ==== More specific methods. Previous were almost general purpose. ====

     /// Read multiple metadata files into strings and return mapping from file_path -> metadata
-    virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
+    virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & /* file_paths */) const
+    {
+        throwNotImplemented();
+    }

     /// Return object information (absolute_path, bytes_size, ...) for metadata path.
     /// object_storage_path is absolute.
     virtual StoredObjects getStorageObjects(const std::string & path) const = 0;

     virtual std::string getObjectStorageRootPath() const = 0;

+private:
+    [[noreturn]] static void throwNotImplemented()
+    {
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented");
+    }
 };

 using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
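
The refactoring in this header follows one pattern throughout: operations that used to be pure virtual become virtuals whose default body throws NOT_IMPLEMENTED, so storages that cannot support an operation (read-only web disks, plain object storages) no longer have to stub out every method themselves. A minimal standalone sketch of the idea (simplified names, std::runtime_error instead of DB::Exception):

```cpp
#include <stdexcept>
#include <string>

class MetadataTransactionSketch
{
public:
    virtual ~MetadataTransactionSketch() = default;

    // Optional operation: instead of `= 0`, the default implementation
    // refuses loudly. Subclasses override only what they actually support.
    virtual void unlinkFile(const std::string & /* path */)
    {
        throwNotImplemented();
    }

private:
    [[noreturn]] static void throwNotImplemented()
    {
        throw std::runtime_error("Operation is not implemented");
    }
};
```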

@@ -14,6 +14,17 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }

+void IObjectStorage::findAllFiles(const std::string &, RelativePathsWithSize &) const
+{
+    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "findAllFiles() is not supported");
+}
+
+void IObjectStorage::getDirectoryContents(const std::string &,
+    RelativePathsWithSize &,
+    std::vector<std::string> &) const
+{
+    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getDirectoryContents() is not supported");
+}
+
 IAsynchronousReader & IObjectStorage::getThreadPoolReader()
 {
     auto context = Context::getGlobalContextInstance();

@@ -65,8 +65,32 @@ public:
     /// Object exists or not
     virtual bool exists(const StoredObject & object) const = 0;

-    /// List on prefix, return children (relative paths) with their sizes.
-    virtual void listPrefix(const std::string & path, RelativePathsWithSize & children) const = 0;
+    /// List all objects with specific prefix.
+    ///
+    /// For example if you do this over filesystem, you should skip folders and
+    /// return files only, so something like on local filesystem:
+    ///
+    ///     find . -type f
+    ///
+    /// @param children - out files (relative paths) with their sizes.
+    ///
+    /// NOTE: It makes sense only for real object storages (S3, Azure), since
+    /// it is used only for one of the following:
+    /// - send_metadata (to restore metadata)
+    ///   - see DiskObjectStorage::restoreMetadataIfNeeded()
+    /// - MetadataStorageFromPlainObjectStorage - only for s3_plain disk
+    virtual void findAllFiles(const std::string & path, RelativePathsWithSize & children) const;
+
+    /// Analog of directory content for object storage (object storage does not
+    /// have "directory" definition, but it can be emulated with usage of
+    /// "delimiter"), so this is analog of:
+    ///
+    ///     find . -maxdepth 1 $path
+    ///
+    /// Return files in @files and directories in @directories
+    virtual void getDirectoryContents(const std::string & path,
+        RelativePathsWithSize & files,
+        std::vector<std::string> & directories) const;

     /// Get object metadata if supported. It should be possible to receive
     /// at least size of object
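
To make the two `find` analogies in the comments above concrete, here is a hypothetical local-filesystem rendering of the same pair of operations (illustration only; it uses std::filesystem rather than ClickHouse types, and the function names are invented):

```cpp
#include <cstdint>
#include <filesystem>
#include <string>
#include <utility>
#include <vector>

namespace fs = std::filesystem;

// ~ `find path -type f`: every file under the prefix, recursively; directories skipped.
std::vector<std::pair<std::string, std::uintmax_t>> findAllFilesLocal(const std::string & path)
{
    std::vector<std::pair<std::string, std::uintmax_t>> children;
    for (const auto & entry : fs::recursive_directory_iterator(path))
        if (entry.is_regular_file())
            children.emplace_back(entry.path().string(), entry.file_size());
    return children;
}

// ~ `find path -maxdepth 1`: immediate files and sub-directories only.
void getDirectoryContentsLocal(
    const std::string & path,
    std::vector<std::pair<std::string, std::uintmax_t>> & files,
    std::vector<std::string> & directories)
{
    for (const auto & entry : fs::directory_iterator(path))
    {
        if (entry.is_directory())
            directories.push_back(entry.path().string());
        else if (entry.is_regular_file())
            files.emplace_back(entry.path().string(), entry.file_size());
    }
}
```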

@@ -104,13 +104,6 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
     return std::make_unique<WriteBufferFromFile>(path, buf_size, flags);
 }

-void LocalObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
-{
-    fs::directory_iterator end_it;
-    for (auto it = fs::directory_iterator(path); it != end_it; ++it)
-        children.emplace_back(it->path().filename(), it->file_size());
-}
-
 void LocalObjectStorage::removeObject(const StoredObject & object)
 {
     /// For local object storage files are actually removed when "metadata" is removed.

@@ -45,8 +45,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
-
     void removeObject(const StoredObject & object) override;

     void removeObjects(const StoredObjects & objects) override;

@@ -12,7 +12,6 @@ namespace DB

 namespace ErrorCodes
 {
-    extern const int NOT_IMPLEMENTED;
     extern const int LOGICAL_ERROR;
 }

@@ -33,194 +32,102 @@ const std::string & MetadataStorageFromPlainObjectStorage::getPath() const
 {
     return object_storage_root_path;
 }
+
+std::filesystem::path MetadataStorageFromPlainObjectStorage::getAbsolutePath(const std::string & path) const
+{
+    return fs::path(object_storage_root_path) / path;
+}

 bool MetadataStorageFromPlainObjectStorage::exists(const std::string & path) const
 {
-    auto object = StoredObject::create(*object_storage, fs::path(object_storage_root_path) / path);
+    auto object = StoredObject::create(*object_storage, getAbsolutePath(path));
     return object_storage->exists(object);
 }

 bool MetadataStorageFromPlainObjectStorage::isFile(const std::string & path) const
 {
     /// NOTE: This check is inaccurate and has excessive API calls
-    return !isDirectory(path) && exists(path);
+    return exists(path) && !isDirectory(path);
 }

 bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path) const
 {
-    std::string directory = path;
+    std::string directory = getAbsolutePath(path);
     trimRight(directory);
     directory += "/";

     /// NOTE: This check is far from ideal, since it work only if the directory
     /// really has files, and has excessive API calls
-    RelativePathsWithSize children;
-    object_storage->listPrefix(directory, children);
-    return !children.empty();
-}
-
-Poco::Timestamp MetadataStorageFromPlainObjectStorage::getLastModified(const std::string &) const
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getLastModified is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-struct stat MetadataStorageFromPlainObjectStorage::stat(const std::string &) const
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "stat is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-time_t MetadataStorageFromPlainObjectStorage::getLastChanged(const std::string &) const
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getLastChanged is not implemented for MetadataStorageFromPlainObjectStorage");
+    RelativePathsWithSize files;
+    std::vector<std::string> directories;
+    object_storage->getDirectoryContents(directory, files, directories);
+    return !files.empty() || !directories.empty();
 }

 uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path) const
 {
     RelativePathsWithSize children;
-    object_storage->listPrefix(path, children);
+    object_storage->findAllFiles(getAbsolutePath(path), children);
     if (children.empty())
         return 0;
     if (children.size() != 1)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "listPrefix() return multiple paths ({}) for {}", children.size(), path);
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "findAllFiles() return multiple paths ({}) for {}", children.size(), path);
     return children.front().bytes_size;
 }

 std::vector<std::string> MetadataStorageFromPlainObjectStorage::listDirectory(const std::string & path) const
 {
-    RelativePathsWithSize children;
-    object_storage->listPrefix(path, children);
+    RelativePathsWithSize files;
+    std::vector<std::string> directories;
+    object_storage->getDirectoryContents(getAbsolutePath(path), files, directories);

     std::vector<std::string> result;
-    for (const auto & path_size : children)
-    {
+    for (const auto & path_size : files)
         result.push_back(path_size.relative_path);
-    }
+    for (const auto & directory : directories)
+        result.push_back(directory);
     return result;
 }

 DirectoryIteratorPtr MetadataStorageFromPlainObjectStorage::iterateDirectory(const std::string & path) const
 {
-    /// NOTE: this is not required for BACKUP/RESTORE, but this is a first step
-    /// towards MergeTree on plain S3.
+    /// Required for MergeTree
     auto paths = listDirectory(path);
     std::vector<std::filesystem::path> fs_paths(paths.begin(), paths.end());
     return std::make_unique<StaticDirectoryIterator>(std::move(fs_paths));
 }

-std::string MetadataStorageFromPlainObjectStorage::readFileToString(const std::string &) const
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "readFileToString is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-std::unordered_map<String, String> MetadataStorageFromPlainObjectStorage::getSerializedMetadata(const std::vector<String> &) const
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getSerializedMetadata is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
 StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std::string & path) const
 {
     std::string blob_name = object_storage->generateBlobNameForPath(path);
-
-    std::string object_path = fs::path(object_storage_root_path) / blob_name;
-    size_t object_size = getFileSize(object_path);
-
-    auto object = StoredObject::create(*object_storage, object_path, object_size, /* exists */true);
+    size_t object_size = getFileSize(blob_name);
+    auto object = StoredObject::create(*object_storage, getAbsolutePath(blob_name), object_size, /* exists */true);
     return {std::move(object)};
 }

-uint32_t MetadataStorageFromPlainObjectStorage::getHardlinkCount(const std::string &) const
-{
-    return 1;
-}
-
 const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const
 {
     return metadata_storage;
 }

-void MetadataStorageFromPlainObjectStorageTransaction::writeStringToFile(const std::string &, const std::string & /* data */)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "writeStringToFile is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::setLastModified(const std::string &, const Poco::Timestamp &)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "setLastModified is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
 void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::string & path)
 {
-    auto object = StoredObject::create(*metadata_storage.object_storage, fs::path(metadata_storage.object_storage_root_path) / path);
+    auto object = StoredObject::create(*metadata_storage.object_storage, metadata_storage.getAbsolutePath(path));
     metadata_storage.object_storage->removeObject(object);
 }

-void MetadataStorageFromPlainObjectStorageTransaction::removeRecursive(const std::string &)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "removeRecursive is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
 void MetadataStorageFromPlainObjectStorageTransaction::createDirectory(const std::string &)
 {
     /// Noop. It is an Object Storage not a filesystem.
 }

 void MetadataStorageFromPlainObjectStorageTransaction::createDirectoryRecursive(const std::string &)
 {
     /// Noop. It is an Object Storage not a filesystem.
 }

-void MetadataStorageFromPlainObjectStorageTransaction::removeDirectory(const std::string &)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "removeDirectory is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::moveFile(const std::string & /* path_from */, const std::string & /* path_to */)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "moveFile is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::moveDirectory(const std::string & /* path_from */, const std::string & /* path_to */)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "moveDirectory is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::replaceFile(const std::string & /* path_from */, const std::string & /* path_to */)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "replaceFile is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::chmod(const String &, mode_t)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "chmod is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::setReadOnly(const std::string &)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "setReadOnly is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::createHardLink(const std::string & /* path_from */, const std::string & /* path_to */)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "createHardLink is not implemented for MetadataStorageFromPlainObjectStorage");
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::createEmptyMetadataFile(const std::string &)
-{
-    /// Noop, no separate metadata.
-}
-
-void MetadataStorageFromPlainObjectStorageTransaction::createMetadataFile(
-    const std::string &, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
-{
-    /// Noop, no separate metadata.
-}
-
 void MetadataStorageFromPlainObjectStorageTransaction::addBlobToMetadata(
     const std::string &, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
 {
     /// Noop, local metadata files is only one file, it is the metadata file itself.
 }

 void MetadataStorageFromPlainObjectStorageTransaction::unlinkMetadata(const std::string &)
 {
     /// Noop, no separate metadata.
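
The NOTE comments in this hunk are worth spelling out: a plain object storage has no real directories, so `isDirectory` treats a path as a directory exactly when something exists under it as a key prefix (and therefore cannot detect empty directories). A toy standalone illustration of that rule, over an assumed flat list of keys (not ClickHouse code):

```cpp
#include <string>
#include <vector>

// isDirectoryLike("data",   {"data/a", "data/b"}) == true   (keys exist under "data/")
// isDirectoryLike("data/a", {"data/a", "data/b"}) == false  (nothing under "data/a/")
bool isDirectoryLike(std::string path, const std::vector<std::string> & keys)
{
    while (!path.empty() && path.back() == '/')  // mimic trimRight(directory)
        path.pop_back();
    path += "/";                                 // a "directory" is just a key prefix
    for (const auto & key : keys)
        if (key.compare(0, path.size(), path) == 0)
            return true;                         // at least one object lives under the prefix
    return false;
}
```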

@@ -45,31 +45,32 @@ public:

     uint64_t getFileSize(const String & path) const override;

-    Poco::Timestamp getLastModified(const std::string & path) const override;
-
-    time_t getLastChanged(const std::string & path) const override;
-
-    bool supportsChmod() const override { return false; }
-
-    bool supportsStat() const override { return false; }
-
-    struct stat stat(const String & path) const override;
-
     std::vector<std::string> listDirectory(const std::string & path) const override;

     DirectoryIteratorPtr iterateDirectory(const std::string & path) const override;

-    std::string readFileToString(const std::string & path) const override;
-
-    std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
-
-    uint32_t getHardlinkCount(const std::string & path) const override;
-
     DiskPtr getDisk() const { return {}; }

     StoredObjects getStorageObjects(const std::string & path) const override;

     std::string getObjectStorageRootPath() const override { return object_storage_root_path; }

+    Poco::Timestamp getLastModified(const std::string & /* path */) const override
+    {
+        /// Required by MergeTree
+        return {};
+    }
+
+    uint32_t getHardlinkCount(const std::string & /* path */) const override
+    {
+        return 1;
+    }
+
+    bool supportsChmod() const override { return false; }
+    bool supportsStat() const override { return false; }
+
+private:
+    std::filesystem::path getAbsolutePath(const std::string & path) const;
 };

 class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction
@@ -83,47 +84,34 @@ public:
         : metadata_storage(metadata_storage_)
     {}

-    ~MetadataStorageFromPlainObjectStorageTransaction() override = default;
-
-    const IMetadataStorage & getStorageForNonTransactionalReads() const final;
-
-    void commit() final {}
-
-    void writeStringToFile(const std::string & path, const std::string & data) override;
-
-    void createEmptyMetadataFile(const std::string & path) override;
-
-    void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
+    const IMetadataStorage & getStorageForNonTransactionalReads() const override;

     void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;

-    void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
+    void createEmptyMetadataFile(const std::string & /* path */) override
+    {
+        /// No metadata, no need to create anything.
+    }

-    bool supportsChmod() const override { return false; }
-
-    void chmod(const String & path, mode_t mode) override;
-
-    void setReadOnly(const std::string & path) override;
-
-    void unlinkFile(const std::string & path) override;
+    void createMetadataFile(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */) override
+    {
+        /// Noop
+    }

     void createDirectory(const std::string & path) override;

     void createDirectoryRecursive(const std::string & path) override;

-    void removeDirectory(const std::string & path) override;
-
-    void removeRecursive(const std::string & path) override;
-
-    void createHardLink(const std::string & path_from, const std::string & path_to) override;
-
-    void moveFile(const std::string & path_from, const std::string & path_to) override;
-
-    void moveDirectory(const std::string & path_from, const std::string & path_to) override;
-
-    void replaceFile(const std::string & path_from, const std::string & path_to) override;
+    void unlinkFile(const std::string & path) override;

     void unlinkMetadata(const std::string & path) override;

+    void commit() override
+    {
+        /// Nothing to commit.
+    }
+
+    bool supportsChmod() const override { return false; }
 };

 }

@@ -28,7 +28,7 @@
 #include <aws/s3/model/AbortMultipartUploadRequest.h>

 #include <Common/getRandomASCIIString.h>

 #include <Common/StringUtils/StringUtils.h>
 #include <Common/logger_useful.h>
 #include <Common/MultiVersion.h>

@@ -248,7 +248,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
         std::move(s3_buffer), std::move(finalize_callback), object.absolute_path);
 }

-void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
+void S3ObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children) const
 {
     auto settings_ptr = s3_settings.get();
     auto client_ptr = client.get();

@@ -279,6 +279,49 @@ void S3ObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize
     } while (outcome.GetResult().GetIsTruncated());
 }

+void S3ObjectStorage::getDirectoryContents(const std::string & path,
+    RelativePathsWithSize & files,
+    std::vector<std::string> & directories) const
+{
+    auto settings_ptr = s3_settings.get();
+    auto client_ptr = client.get();
+
+    Aws::S3::Model::ListObjectsV2Request request;
+    request.SetBucket(bucket);
+    request.SetPrefix(path);
+    request.SetMaxKeys(settings_ptr->list_object_keys_size);
+    request.SetDelimiter("/");
+
+    Aws::S3::Model::ListObjectsV2Outcome outcome;
+    do
+    {
+        ProfileEvents::increment(ProfileEvents::S3ListObjects);
+        ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
+        outcome = client_ptr->ListObjectsV2(request);
+        throwIfError(outcome);
+
+        auto result = outcome.GetResult();
+        auto result_objects = result.GetContents();
+        auto result_common_prefixes = result.GetCommonPrefixes();
+
+        if (result_objects.empty() && result_common_prefixes.empty())
+            break;
+
+        for (const auto & object : result_objects)
+            files.emplace_back(object.GetKey(), object.GetSize());
+
+        for (const auto & common_prefix : result_common_prefixes)
+        {
+            std::string directory = common_prefix.GetPrefix();
+            /// Make it compatible with std::filesystem::path::filename()
+            trimRight(directory, '/');
+            directories.emplace_back(directory);
+        }
+
+        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
+    } while (outcome.GetResult().GetIsTruncated());
+}
+
 void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
 {
     auto client_ptr = client.get();
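
Because the listing above sets a `/` delimiter, S3 groups everything below the first slash after the prefix into CommonPrefixes, which is what lets an object store emulate one level of directories. A standalone simulation of that grouping (illustrative only, plain std types instead of the AWS SDK):

```cpp
#include <cstddef>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Given flat (key, size) pairs and a prefix, produce "files" directly under the
// prefix and "directories" (common prefixes, trailing '/' trimmed) — mirroring
// what ListObjectsV2 with a "/" delimiter returns to getDirectoryContents() above.
void listWithDelimiter(
    const std::vector<std::pair<std::string, std::size_t>> & keys,
    const std::string & prefix,
    std::vector<std::pair<std::string, std::size_t>> & files,
    std::vector<std::string> & directories)
{
    std::set<std::string> seen_dirs;
    for (const auto & [key, size] : keys)
    {
        if (key.compare(0, prefix.size(), prefix) != 0)
            continue;  // not under the prefix
        auto slash = key.find('/', prefix.size());
        if (slash == std::string::npos)
            files.emplace_back(key, size);  // no further '/': a direct "file"
        else if (seen_dirs.insert(key.substr(0, slash)).second)
            directories.push_back(key.substr(0, slash));  // one "directory" per common prefix
    }
}
```

For keys "data/a.bin" and "data/sub/b.bin" with prefix "data/", this yields "data/a.bin" in `files` and "data/sub" in `directories`.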

@@ -105,7 +105,10 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
+    void findAllFiles(const std::string & path, RelativePathsWithSize & children) const override;
+    void getDirectoryContents(const std::string & path,
+        RelativePathsWithSize & files,
+        std::vector<std::string> & directories) const override;

     /// Uses `DeleteObjectRequest`.
     void removeObject(const StoredObject & object) override;

@@ -12,7 +12,6 @@ namespace DB

 namespace ErrorCodes
 {
     extern const int NOT_IMPLEMENTED;
     extern const int FILE_DOESNT_EXIST;
     extern const int NETWORK_ERROR;
 }

@@ -168,91 +167,11 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
     return std::make_unique<StaticDirectoryIterator>(std::move(dir_file_paths));
 }

-std::string MetadataStorageFromStaticFilesWebServer::readFileToString(const std::string &) const
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-Poco::Timestamp MetadataStorageFromStaticFilesWebServer::getLastModified(const std::string &) const
-{
-    return {};
-}
-
-time_t MetadataStorageFromStaticFilesWebServer::getLastChanged(const std::string &) const
-{
-    return {};
-}
-
-uint32_t MetadataStorageFromStaticFilesWebServer::getHardlinkCount(const std::string &) const
-{
-    return 1;
-}
-
 const IMetadataStorage & MetadataStorageFromStaticFilesWebServerTransaction::getStorageForNonTransactionalReads() const
 {
     return metadata_storage;
 }

-void MetadataStorageFromStaticFilesWebServerTransaction::writeStringToFile(const std::string &, const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::setLastModified(const std::string &, const Poco::Timestamp &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::unlinkFile(const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::removeRecursive(const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::removeDirectory(const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::moveFile(const std::string &, const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::moveDirectory(const std::string &, const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::replaceFile(const std::string &, const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::setReadOnly(const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::createHardLink(const std::string &, const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::addBlobToMetadata(const std::string &, const std::string &, uint64_t)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::unlinkMetadata(const std::string &)
-{
-    WebObjectStorage::throwNotAllowed();
-}
-
 void MetadataStorageFromStaticFilesWebServerTransaction::createDirectory(const std::string &)
 {
     /// Noop.
@@ -263,30 +182,4 @@ void MetadataStorageFromStaticFilesWebServerTransaction::createDirectoryRecursiv
     /// Noop.
 }

-void MetadataStorageFromStaticFilesWebServerTransaction::createEmptyMetadataFile(const std::string & /* path */)
-{
-    /// Noop.
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::createMetadataFile(
-    const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
-{
-    /// Noop.
-}
-
-void MetadataStorageFromStaticFilesWebServerTransaction::commit()
-{
-    /// Noop.
-}
-
 std::unordered_map<String, String> MetadataStorageFromStaticFilesWebServer::getSerializedMetadata(const std::vector<String> &) const
 {
     throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getSerializedMetadata is not implemented for MetadataStorageFromStaticFilesWebServer");
 }

 void MetadataStorageFromStaticFilesWebServerTransaction::chmod(const String &, mode_t)
 {
     throw Exception(ErrorCodes::NOT_IMPLEMENTED, "chmod is not implemented for MetadataStorageFromStaticFilesWebServer");
 }

 }

@@ -36,29 +36,28 @@ public:

     uint64_t getFileSize(const String & path) const override;

-    Poco::Timestamp getLastModified(const std::string & path) const override;
-
-    time_t getLastChanged(const std::string & path) const override;
-
     std::vector<std::string> listDirectory(const std::string & path) const override;

     DirectoryIteratorPtr iterateDirectory(const std::string & path) const override;

-    std::string readFileToString(const std::string & path) const override;
-
-    std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
-
-    uint32_t getHardlinkCount(const std::string & path) const override;
-
     StoredObjects getStorageObjects(const std::string & path) const override;

     std::string getObjectStorageRootPath() const override { return ""; }

-    struct stat stat(const String & /* path */) const override { return {}; }
-
+    Poco::Timestamp getLastModified(const std::string & /* path */) const override
+    {
+        /// Required by MergeTree
+        return {};
+    }
+    uint32_t getHardlinkCount(const std::string & /* path */) const override
+    {
+        return 1;
+    }

     bool supportsChmod() const override { return false; }

     bool supportsStat() const override { return false; }

+    struct stat stat(const String &) const override { return {}; }
 };

 class MetadataStorageFromStaticFilesWebServerTransaction final : public IMetadataTransaction
@@ -73,47 +72,28 @@ public:
         : metadata_storage(metadata_storage_)
     {}

-    ~MetadataStorageFromStaticFilesWebServerTransaction() override = default;
-
     const IMetadataStorage & getStorageForNonTransactionalReads() const override;

-    void commit() override;
+    void createEmptyMetadataFile(const std::string & /* path */) override
+    {
+        /// No metadata, no need to create anything.
+    }

-    void writeStringToFile(const std::string & path, const std::string & data) override;
-
-    void createEmptyMetadataFile(const std::string & path) override;
-
-    void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
-
-    void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
-
-    void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
-
-    void setReadOnly(const std::string & path) override;
-
-    void unlinkFile(const std::string & path) override;
+    void createMetadataFile(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */) override
+    {
+        /// Noop
+    }

     void createDirectory(const std::string & path) override;

     void createDirectoryRecursive(const std::string & path) override;

-    void removeDirectory(const std::string & path) override;
-
-    void removeRecursive(const std::string & path) override;
-
-    void createHardLink(const std::string & path_from, const std::string & path_to) override;
-
-    void moveFile(const std::string & path_from, const std::string & path_to) override;
-
-    void moveDirectory(const std::string & path_from, const std::string & path_to) override;
-
-    void replaceFile(const std::string & path_from, const std::string & path_to) override;
-
-    void unlinkMetadata(const std::string & path) override;
+    void commit() override
+    {
+        /// Nothing to commit.
+    }

     bool supportsChmod() const override { return false; }

     void chmod(const String &, mode_t) override;
 };

 }

@@ -178,17 +178,6 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
     }
 }

-void WebObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
-{
-    for (const auto & [file_path, file_info] : files)
-    {
-        if (file_info.type == FileType::File && file_path.starts_with(path))
-        {
-            children.emplace_back(file_path, file_info.size);
-        }
-    }
-}
-
 void WebObjectStorage::throwNotAllowed()
 {
     throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only read-only operations are supported");

@@ -55,8 +55,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
-
     void removeObject(const StoredObject & object) override;

     void removeObjects(const StoredObjects & objects) override;

@@ -180,6 +180,13 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
         std::sregex_token_iterator(remote_fs_metadata.begin(), remote_fs_metadata.end(), re, -1),
         std::sregex_token_iterator());

+    bool send_projections = client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION;
+    if (send_projections)
+    {
+        const auto & projections = part->getProjectionParts();
+        writeBinary(projections.size(), out);
+    }
+
     if (data_settings->allow_remote_fs_zero_copy_replication &&
         /// In memory data part does not have metadata yet.
         !isInMemoryPart(part) &&
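
The hoisted `send_projections` flag above fixes the framing in one place: when the receiver speaks the projection-aware protocol, the sender first writes how many projection sub-parts will follow, then each one by name. A standalone sketch of that framing (plain std::ostream instead of WriteBuffer; the layout here is an illustration, not the exact wire format):

```cpp
#include <cstdint>
#include <map>
#include <ostream>
#include <string>

// Write the projection count, then one length-prefixed name per projection,
// echoing writeBinary(projections.size(), out) / writeStringBinary(name, out)
// in the diff above; `payload` stands in for the projection's checksums + data.
void sendProjectionsSketch(std::ostream & out, const std::map<std::string, std::string> & projections)
{
    std::uint64_t count = projections.size();
    out.write(reinterpret_cast<const char *>(&count), sizeof(count));  // receiver loops exactly this many times
    for (const auto & [name, payload] : projections)
    {
        std::uint64_t len = name.size();
        out.write(reinterpret_cast<const char *>(&len), sizeof(len));  // which projection follows
        out.write(name.data(), static_cast<std::streamsize>(len));
        out.write(payload.data(), static_cast<std::streamsize>(payload.size()));
    }
}
```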

@@ -190,33 +197,15 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
         {
             /// Send metadata if the receiver's capability covers the source disk type.
             response.addCookie({"remote_fs_metadata", disk_type});
-            if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
-            {
-                const auto & projections = part->getProjectionParts();
-                writeBinary(projections.size(), out);
-            }
-
-            sendPartFromDiskRemoteMeta(part, out, true, part->getProjectionParts());
+            sendPartFromDiskRemoteMeta(part, out, true, send_projections);
             return;
         }
     }

-    if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
-    {
-        const auto & projections = part->getProjectionParts();
-        writeBinary(projections.size(), out);
-        if (isInMemoryPart(part))
-            sendPartFromMemory(part, out, projections);
-        else
-            sendPartFromDisk(part, out, client_protocol_version, projections);
-    }
+    if (isInMemoryPart(part))
+        sendPartFromMemory(part, out, send_projections);
     else
-    {
-        if (isInMemoryPart(part))
-            sendPartFromMemory(part, out);
-        else
-            sendPartFromDisk(part, out, client_protocol_version);
-    }
+        sendPartFromDisk(part, out, client_protocol_version, send_projections);
 }
 catch (const NetException &)
 {

@@ -238,20 +227,23 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
 }

 void Service::sendPartFromMemory(
-    const MergeTreeData::DataPartPtr & part, WriteBuffer & out, const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
+    const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_projections)
 {
     auto metadata_snapshot = data.getInMemoryMetadataPtr();
-    for (const auto & [name, projection] : projections)
+    if (send_projections)
     {
-        auto projection_sample_block = metadata_snapshot->projections.get(name).sample_block;
-        auto part_in_memory = asInMemoryPart(projection);
-        if (!part_in_memory)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection {} of part {} is not stored in memory", name, part->name);
+        for (const auto & [name, projection] : part->getProjectionParts())
+        {
+            auto projection_sample_block = metadata_snapshot->projections.get(name).sample_block;
+            auto part_in_memory = asInMemoryPart(projection);
+            if (!part_in_memory)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection {} of part {} is not stored in memory", name, part->name);

-        writeStringBinary(name, out);
-        projection->checksums.write(out);
-        NativeWriter block_out(out, 0, projection_sample_block);
-        block_out.write(part_in_memory->block);
+            writeStringBinary(name, out);
+            projection->checksums.write(out);
+            NativeWriter block_out(out, 0, projection_sample_block);
+            block_out.write(part_in_memory->block);
+        }
     }

     auto part_in_memory = asInMemoryPart(part);

@@ -269,7 +261,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
     const MergeTreeData::DataPartPtr & part,
     WriteBuffer & out,
     int client_protocol_version,
-    const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
+    bool send_projections)
 {
     /// We'll take a list of files from the list of checksums.
     MergeTreeData::DataPart::Checksums checksums = part->checksums;

@@ -277,7 +269,8 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
     auto file_names_without_checksums = part->getFileNamesWithoutChecksums();
     for (const auto & file_name : file_names_without_checksums)
     {
-        if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
+        if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION
+            && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
             continue;

         checksums.files[file_name] = {};

@@ -288,11 +281,10 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
     {
         // Get rid of projection files
         checksums.files.erase(name + ".proj");
-        auto it = projections.find(name);
-        if (it != projections.end())
+        if (send_projections)
         {
             writeStringBinary(name, out);
-            MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDisk(it->second, out, client_protocol_version);
+            MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDisk(projection, out, client_protocol_version, false);
             data_checksums.addFile(name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
         }
         else if (part->checksums.has(name + ".proj"))
@@ -337,18 +329,15 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
     return data_checksums;
 }

-MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
+void Service::sendPartFromDiskRemoteMeta(
     const MergeTreeData::DataPartPtr & part,
     WriteBuffer & out,
     bool send_part_id,
-    const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
+    bool send_projections)
 {
-    const auto * data_part_storage_on_disk = dynamic_cast<const DataPartStorageOnDisk *>(&part->getDataPartStorage());
-    if (!data_part_storage_on_disk)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage '{}' doesn't support zero-copy replication", part->getDataPartStorage().getDiskName());
-
-    if (!data_part_storage_on_disk->supportZeroCopyReplication())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", data_part_storage_on_disk->getDiskName());
+    auto data_part_storage = part->getDataPartStoragePtr();
+    if (!data_part_storage->supportZeroCopyReplication())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", data_part_storage->getDiskName());

     /// We'll take a list of files from the list of checksums.
     MergeTreeData::DataPart::Checksums checksums = part->checksums;
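sendPartFromDiskRemoteMeta now asks the storage interface whether it supports zero-copy replication instead of downcasting to the concrete DataPartStorageOnDisk. A self-contained sketch of this capability-check pattern, with hypothetical types:

#include <memory>
#include <stdexcept>
#include <string>

struct IDataPartStorage
{
    virtual ~IDataPartStorage() = default;
    virtual bool supportZeroCopyReplication() const { return false; }
    virtual std::string getDiskName() const = 0;
};

void requireZeroCopy(const std::shared_ptr<IDataPartStorage> & storage)
{
    // One virtual call replaces the old "is this exactly DataPartStorageOnDisk?" test,
    // so any future storage implementation can opt in without touching this code.
    if (!storage->supportZeroCopyReplication())
        throw std::logic_error("Disk '" + storage->getDiskName() + "' doesn't support zero-copy replication");
}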
@@ -369,30 +358,20 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
         paths.push_back(fs::path(part->getDataPartStorage().getRelativePath()) / it.first);

     /// Serialized metadata files with zero ref counts.
-    auto metadatas = data_part_storage_on_disk->getSerializedMetadata(paths);
+    auto metadatas = data_part_storage->getSerializedMetadata(paths);

     if (send_part_id)
     {
-        String part_id = data_part_storage_on_disk->getUniqueId();
+        String part_id = data_part_storage->getUniqueId();
         writeStringBinary(part_id, out);
     }

-    MergeTreeData::DataPart::Checksums data_checksums;
-    for (const auto & [name, projection] : part->getProjectionParts())
+    if (send_projections)
     {
-        auto it = projections.find(name);
-        if (it != projections.end())
+        for (const auto & [name, projection] : part->getProjectionParts())
         {
             writeStringBinary(name, out);
-            MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDiskRemoteMeta(it->second, out, false);
-            data_checksums.addFile(name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
-        }
-        else if (part->checksums.has(name + ".proj"))
-        {
-            // We don't send this projection, just add our checksum to bypass the following check
-            const auto & our_checksum = part->checksums.files.find(name + ".proj")->second;
-            data_checksums.addFile(name + ".proj", our_checksum.file_size, our_checksum.file_hash);
+            sendPartFromDiskRemoteMeta(projection, out, false, false);
         }
     }
@@ -403,7 +382,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
         String file_path_prefix = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;

         /// Just some additional checks
-        String metadata_file_path = fs::path(data_part_storage_on_disk->getDiskPath()) / file_path_prefix;
+        String metadata_file_path = fs::path(data_part_storage->getDiskPath()) / file_path_prefix;
         fs::path metadata(metadata_file_path);
         if (!fs::exists(metadata))
             throw Exception(ErrorCodes::CORRUPTED_DATA, "Remote metadata '{}' does not exist", file_name);
@@ -427,12 +406,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
             throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}", metadata_file_path);

         writePODBinary(hashing_out.getHash(), out);
-
-        if (!file_names_without_checksums.contains(file_name))
-            data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
     }
-
-    return data_checksums;
 }

 MergeTreeData::DataPartPtr Service::findPart(const String & name)
@@ -707,68 +681,54 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(

     in->setNextCallback(ReplicatedFetchReadCallback(*entry));

-    return part_type == "InMemory"
-        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, *in, projections, throttler)
-        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix, sync, disk, *in, projections, checksums, throttler);
+    if (part_type == "InMemory")
+    {
+        auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
+
+        auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
+            volume,
+            data.getRelativeDataPath(),
+            part_name);
+
+        return downloadPartToMemory(
+            data_part_storage, part_name,
+            MergeTreePartInfo::fromPartName(part_name, data.format_version),
+            part_uuid, metadata_snapshot, context, *in,
+            projections, false, throttler);
+    }
+
+    return downloadPartToDisk(
+        part_name, replica_path, to_detached, tmp_prefix,
+        sync, disk, *in, projections, checksums, throttler);
 }

 MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
+    MutableDataPartStoragePtr data_part_storage,
     const String & part_name,
+    const MergeTreePartInfo & part_info,
     const UUID & part_uuid,
     const StorageMetadataPtr & metadata_snapshot,
     ContextPtr context,
-    DiskPtr disk,
     PooledReadWriteBufferFromHTTP & in,
     size_t projections,
     bool is_projection,
     ThrottlerPtr throttler)
 {
-    auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
-
-    auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
-        volume,
-        data.getRelativeDataPath(),
-        part_name);
-
-    MergeTreeData::MutableDataPartPtr new_data_part =
-        std::make_shared<MergeTreeDataPartInMemory>(data, part_name, data_part_storage);
-
-    new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
+    auto new_data_part = std::make_shared<MergeTreeDataPartInMemory>(data, part_name, part_info, data_part_storage);

-    for (auto i = 0ul; i < projections; ++i)
+    for (size_t i = 0; i < projections; ++i)
     {
         String projection_name;
         readStringBinary(projection_name, in);
-        MergeTreeData::DataPart::Checksums checksums;
-        if (!checksums.read(in))
-            throw Exception("Cannot deserialize checksums", ErrorCodes::CORRUPTED_DATA);
-
-        NativeReader block_in(in, 0);
-        auto block = block_in.read();
-        throttler->add(block.bytes());
+        auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");

         MergeTreePartInfo new_part_info("all", 0, 0, 0);
-        auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
-        MergeTreeData::MutableDataPartPtr new_projection_part =
-            std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, projection_part_storage, new_data_part.get());
-
-        new_projection_part->is_temp = false;
-        new_projection_part->setColumns(block.getNamesAndTypesList(), {});
-        MergeTreePartition partition{};
-        new_projection_part->partition = std::move(partition);
-        new_projection_part->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
-
-        MergedBlockOutputStream part_out(
-            new_projection_part,
-            metadata_snapshot->projections.get(projection_name).metadata,
-            block.getNamesAndTypesList(),
-            {},
-            CompressionCodecFactory::instance().get("NONE", {}),
-            NO_TRANSACTION_PTR);
-
-        part_out.write(block);
-        part_out.finalizePart(new_projection_part, false);
-        new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
+        auto new_projection_part = downloadPartToMemory(
+            projection_part_storage, projection_name,
+            new_part_info, part_uuid, metadata_snapshot,
+            context, in, 0, true, throttler);
+
         new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
     }
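With this hunk, a projection arriving over the wire is materialized by the same downloadPartToMemory that handles its parent, called with projections = 0 and is_projection = true. A hedged sketch of the recursion shape (the stream and part types below are stand-ins, not the real classes):

#include <cstddef>
#include <istream>
#include <memory>
#include <string>
#include <vector>

struct InMemoryPart
{
    std::string payload;
    std::vector<std::unique_ptr<InMemoryPart>> projections;
};

// `projections` is the number of projection sub-parts that precede the part's
// own payload on the wire; a projection itself is always fetched with 0.
std::unique_ptr<InMemoryPart> downloadPart(std::istream & in, std::size_t projections, bool is_projection)
{
    auto part = std::make_unique<InMemoryPart>();

    for (std::size_t i = 0; i < projections; ++i)
        part->projections.push_back(downloadPart(in, 0, /*is_projection=*/ true));

    std::getline(in, part->payload);  // stands in for reading checksums plus the data block
    (void)is_projection;              // the real code skips uuid/partition/minmax setup for projections
    return part;
}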
@@ -780,11 +740,16 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     auto block = block_in.read();
     throttler->add(block.bytes());

-    new_data_part->uuid = part_uuid;
-    new_data_part->is_temp = true;
     new_data_part->setColumns(block.getNamesAndTypesList(), {});
-    new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
-    new_data_part->partition.create(metadata_snapshot, block, 0, context);
+
+    if (!is_projection)
+    {
+        new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
+        new_data_part->uuid = part_uuid;
+        new_data_part->is_temp = true;
+        new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
+        new_data_part->partition.create(metadata_snapshot, block, 0, context);
+    }

     MergedBlockOutputStream part_out(
         new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {},
@@ -850,7 +815,6 @@ void Fetcher::downloadBasePartOrProjectionPartToDiskRemoteMeta(
             checksums.addFile(file_name, file_size, expected_hash);
         }
     }
-
 }
@@ -966,11 +930,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(

     SyncGuardPtr sync_guard;
     if (data.getSettings()->fsync_part_directory)
-        sync_guard = disk->getDirectorySyncGuard(data_part_storage->getRelativePath());
+        sync_guard = data_part_storage->getDirectorySyncGuard();

     CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

-    for (auto i = 0ul; i < projections; ++i)
+    for (size_t i = 0; i < projections; ++i)
     {
         String projection_name;
         readStringBinary(projection_name, in);
@@ -1043,7 +1007,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(

     data_part_storage->createDirectories();

-    for (auto i = 0ul; i < projections; ++i)
+    for (size_t i = 0; i < projections; ++i)
     {
         String projection_name;
         readStringBinary(projection_name, in);
@@ -1071,7 +1035,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
     new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
     new_data_part->is_temp = true;
     new_data_part->modification_time = time(nullptr);
-
     new_data_part->loadColumnsChecksumsIndexes(true, false);
 }
 #if USE_AWS_S3
@@ -1,5 +1,6 @@
 #pragma once

+#include "Storages/MergeTree/MergeTreePartInfo.h"
 #include <Interpreters/InterserverIOHandler.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/IStorage_fwd.h>
@@ -42,19 +43,19 @@ private:
     void sendPartFromMemory(
         const MergeTreeData::DataPartPtr & part,
         WriteBuffer & out,
-        const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections = {});
+        bool send_projections);

     MergeTreeData::DataPart::Checksums sendPartFromDisk(
         const MergeTreeData::DataPartPtr & part,
         WriteBuffer & out,
         int client_protocol_version,
-        const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections = {});
+        bool send_projections);

-    MergeTreeData::DataPart::Checksums sendPartFromDiskRemoteMeta(
+    void sendPartFromDiskRemoteMeta(
         const MergeTreeData::DataPartPtr & part,
         WriteBuffer & out,
         bool send_part_id,
-        const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections = {});
+        bool send_projections);

     /// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
     /// so Service will never access dangling reference to storage
@@ -120,13 +121,15 @@ private:
         ThrottlerPtr throttler);

     MergeTreeData::MutableDataPartPtr downloadPartToMemory(
+        MutableDataPartStoragePtr data_part_storage,
         const String & part_name,
+        const MergeTreePartInfo & part_info,
         const UUID & part_uuid,
         const StorageMetadataPtr & metadata_snapshot,
         ContextPtr context,
-        DiskPtr disk,
         PooledReadWriteBufferFromHTTP & in,
         size_t projections,
         bool is_projection,
         ThrottlerPtr throttler);

     MergeTreeData::MutableDataPartPtr downloadPartToDiskRemoteMeta(
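For orientation, a hypothetical dispatcher showing how the three Service entry points declared above could be driven; the stub types, the protocol constant, and deriving send_projections from the client version are illustrative assumptions, not the actual handler code:

#include <ostream>

struct Part { bool in_memory = false; bool on_remote_fs = false; };

constexpr int PROTOCOL_VERSION_WITH_PROJECTION = 8;  // made-up constant

void sendPartFromMemory(const Part &, std::ostream &, bool send_projections) { (void)send_projections; }
void sendPartFromDisk(const Part &, std::ostream &, int, bool send_projections) { (void)send_projections; }
void sendPartFromDiskRemoteMeta(const Part &, std::ostream &, bool send_part_id, bool send_projections) { (void)send_part_id; (void)send_projections; }

void processQuery(const Part & part, std::ostream & out, int client_protocol_version)
{
    // Only clients that understand projections get them streamed inline.
    const bool send_projections = client_protocol_version >= PROTOCOL_VERSION_WITH_PROJECTION;

    if (part.on_remote_fs)
        sendPartFromDiskRemoteMeta(part, out, /*send_part_id=*/ true, send_projections);
    else if (part.in_memory)
        sendPartFromMemory(part, out, send_projections);
    else
        sendPartFromDisk(part, out, client_protocol_version, send_projections);
}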
@@ -480,8 +480,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(

 MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
     const String & part_name,
-    MergeTreeDataPartType part_type,
-    const String & relative_path,
     bool is_temp,
     IMergeTreeDataPart * parent_part,
     const MergeTreeData & data,
@@ -493,6 +491,21 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
     const StorageMetadataPtr & metadata_snapshot = projection.metadata;
     MergeTreePartInfo new_part_info("all", 0, 0, 0);

+    MergeTreeDataPartType part_type;
+    if (parent_part->getType() == MergeTreeDataPartType::InMemory)
+    {
+        part_type = MergeTreeDataPartType::InMemory;
+    }
+    else
+    {
+        /// Size of part would not be greater than block.bytes() + epsilon
+        size_t expected_size = block.bytes();
+        // just check if there is enough space on parent volume
+        data.reserveSpace(expected_size, parent_part->getDataPartStorage());
+        part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
+    }
+
+    auto relative_path = part_name + (is_temp ? ".tmp_proj" : ".proj");
     auto projection_part_storage = parent_part->getDataPartStorage().getProjection(relative_path);
     auto new_data_part = data.createPart(
         part_name,
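The fifteen added lines centralize the projection part-type choice inside writeProjectionPartImpl, where the old writeProjectionPart and writeTempProjectionPart each duplicated it. A condensed, hypothetical restatement of that decision (the threshold is a stand-in for the real MergeTree settings):

#include <cstddef>

enum class PartType { InMemory, Compact, Wide };

constexpr std::size_t min_bytes_for_wide_part = 10 * 1024 * 1024;  // assumed value; configurable in reality

PartType choosePartType(PartType parent_type, std::size_t block_bytes)
{
    // An in-memory parent keeps its projections in memory as well.
    if (parent_type == PartType::InMemory)
        return PartType::InMemory;

    // The projection can't be larger than the source block (plus epsilon), so
    // block_bytes doubles as the size estimate for the space reservation.
    return block_bytes < min_bytes_for_wide_part ? PartType::Compact : PartType::Wide;
}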
@@ -583,77 +596,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
 }

-MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPart(
-    MergeTreeData & data,
-    Poco::Logger * log,
-    Block block,
-    const ProjectionDescription & projection,
-    IMergeTreeDataPart * parent_part)
-{
-    String part_name = projection.name;
-    MergeTreeDataPartType part_type;
-    if (parent_part->getType() == MergeTreeDataPartType::InMemory)
-    {
-        part_type = MergeTreeDataPartType::InMemory;
-    }
-    else
-    {
-        /// Size of part would not be greater than block.bytes() + epsilon
-        size_t expected_size = block.bytes();
-        // just check if there is enough space on parent volume
-        data.reserveSpace(expected_size, parent_part->getDataPartStorage());
-        part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
-    }
-
-    return writeProjectionPartImpl(
-        part_name,
-        part_type,
-        part_name + ".proj" /* relative_path */,
-        false /* is_temp */,
-        parent_part,
-        data,
-        log,
-        block,
-        projection);
-}
-
-/// This is used for projection materialization process which may contain multiple stages of
-/// projection part merges.
-MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
-    MergeTreeData & data,
-    Poco::Logger * log,
-    Block block,
-    const ProjectionDescription & projection,
-    IMergeTreeDataPart * parent_part,
-    size_t block_num)
-{
-    String part_name = fmt::format("{}_{}", projection.name, block_num);
-    MergeTreeDataPartType part_type;
-    if (parent_part->getType() == MergeTreeDataPartType::InMemory)
-    {
-        part_type = MergeTreeDataPartType::InMemory;
-    }
-    else
-    {
-        /// Size of part would not be greater than block.bytes() + epsilon
-        size_t expected_size = block.bytes();
-        // just check if there is enough space on parent volume
-        data.reserveSpace(expected_size, parent_part->getDataPartStorage());
-        part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
-    }
-
-    return writeProjectionPartImpl(
-        part_name,
-        part_type,
-        part_name + ".tmp_proj" /* relative_path */,
-        true /* is_temp */,
-        parent_part,
-        data,
-        log,
-        block,
-        projection);
-}
-
-MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeInMemoryProjectionPart(
+MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPart(
     const MergeTreeData & data,
     Poco::Logger * log,
     Block block,
@@ -662,13 +604,32 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeInMemoryProjectionP
 {
     return writeProjectionPartImpl(
         projection.name,
-        MergeTreeDataPartType::InMemory,
-        projection.name + ".proj" /* relative_path */,
         false /* is_temp */,
         parent_part,
         data,
         log,
-        block,
+        std::move(block),
         projection);
 }
+
+/// This is used for projection materialization process which may contain multiple stages of
+/// projection part merges.
+MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
+    const MergeTreeData & data,
+    Poco::Logger * log,
+    Block block,
+    const ProjectionDescription & projection,
+    IMergeTreeDataPart * parent_part,
+    size_t block_num)
+{
+    String part_name = fmt::format("{}_{}", projection.name, block_num);
+    return writeProjectionPartImpl(
+        part_name,
+        true /* is_temp */,
+        parent_part,
+        data,
+        log,
+        std::move(block),
+        projection);
+}
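After the consolidation, the two remaining wrappers differ only in the part name and the is_temp flag. A small illustrative helper (assumed, matching the literals in the diff) spelling out the directory naming they encode:

#include <cstddef>
#include <string>

std::string projectionDirName(const std::string & projection_name, bool is_temp, std::size_t block_num = 0)
{
    // MATERIALIZE PROJECTION stages write "<name>_<N>.tmp_proj"; ordinary inserts
    // write the final "<name>.proj" directory inside the parent part.
    std::string part_name = is_temp ? projection_name + "_" + std::to_string(block_num) : projection_name;
    return part_name + (is_temp ? ".tmp_proj" : ".proj");
}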
@@ -73,7 +73,7 @@ public:

     /// For insertion.
     static TemporaryPart writeProjectionPart(
-        MergeTreeData & data,
+        const MergeTreeData & data,
         Poco::Logger * log,
         Block block,
         const ProjectionDescription & projection,
@@ -81,21 +81,13 @@ public:

     /// For mutation: MATERIALIZE PROJECTION.
     static TemporaryPart writeTempProjectionPart(
-        MergeTreeData & data,
+        const MergeTreeData & data,
         Poco::Logger * log,
         Block block,
         const ProjectionDescription & projection,
         IMergeTreeDataPart * parent_part,
         size_t block_num);

-    /// For WriteAheadLog AddPart.
-    static TemporaryPart writeInMemoryProjectionPart(
-        const MergeTreeData & data,
-        Poco::Logger * log,
-        Block block,
-        const ProjectionDescription & projection,
-        IMergeTreeDataPart * parent_part);
-
     static Block mergeBlock(
         const Block & block,
         SortDescription sort_description,
@@ -106,8 +98,6 @@ public:
 private:
     static TemporaryPart writeProjectionPartImpl(
         const String & part_name,
-        MergeTreeDataPartType part_type,
-        const String & relative_path,
         bool is_temp,
         IMergeTreeDataPart * parent_part,
         const MergeTreeData & data,
@@ -116,7 +106,6 @@ private:
         const ProjectionDescription & projection);

     MergeTreeData & data;
-
     Poco::Logger * log;
 };
@@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
         for (const auto & projection : metadata_snapshot->getProjections())
         {
             auto projection_block = projection.calculate(block, context);
-            auto temp_part = MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get());
+            auto temp_part = MergeTreeDataWriter::writeProjectionPart(storage, log, projection_block, projection, part.get());
             temp_part.finalize();
             if (projection_block.rows())
                 part->addProjectionPart(projection.name, std::move(temp_part.part));
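The write-ahead-log restore path now goes through the same writeProjectionPart entry point as regular inserts: projections are recomputed from the restored block rather than replayed from the log. A schematic with stand-in types:

#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Block = std::vector<int>;                            // stand-in for DB::Block
using ProjectionFn = std::function<Block(const Block &)>;  // stand-in for ProjectionDescription::calculate

struct Part
{
    Block block;
    std::map<std::string, Block> projections;
};

void restoreProjections(Part & part, const std::map<std::string, ProjectionFn> & projections)
{
    for (const auto & [name, calculate] : projections)
    {
        Block projection_block = calculate(part.block);  // recompute rather than deserialize
        if (!projection_block.empty())                   // mirrors the rows() check in the diff
            part.projections.emplace(name, std::move(projection_block));
    }
}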
@@ -112,98 +112,17 @@ IMergeTreeDataPart::Checksums checkDataPart(
     };

     /// This function calculates only checksum of file content (compressed or uncompressed).
-    /// It also calculates checksum of projections.
     auto checksum_file = [&](const String & file_name)
     {
-        if (data_part_storage.isDirectory(file_name) && endsWith(file_name, ".proj"))
-        {
-            auto projection_name = file_name.substr(0, file_name.size() - sizeof(".proj") + 1);
-            auto pit = data_part->getProjectionParts().find(projection_name);
-            if (pit == data_part->getProjectionParts().end())
-            {
-                if (require_checksums)
-                    throw Exception("Unexpected file " + file_name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
-                else
-                    return;
-            }
-
-            const auto & projection = pit->second;
-            IMergeTreeDataPart::Checksums projection_checksums_data;
-
-            auto projection_part_storage = data_part_storage.getProjection(file_name);
-
-            if (projection->getType() == MergeTreeDataPartType::Compact)
-            {
-                auto file_buf = projection_part_storage->readFile(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION, {}, std::nullopt, std::nullopt);
-                HashingReadBuffer hashing_buf(*file_buf);
-                hashing_buf.ignoreAll();
-                projection_checksums_data.files[MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION]
-                    = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
-            }
-            else
-            {
-                const NamesAndTypesList & projection_columns_list = projection->getColumns();
-                for (const auto & projection_column : projection_columns_list)
-                {
-                    get_serialization(projection_column)->enumerateStreams(
-                        [&](const ISerialization::SubstreamPath & substream_path)
-                        {
-                            String projection_file_name = ISerialization::getFileNameForStream(projection_column, substream_path) + ".bin";
-                            projection_checksums_data.files[projection_file_name] = checksum_compressed_file(*projection_part_storage, projection_file_name);
-                        });
-                }
-            }
-
-            IMergeTreeDataPart::Checksums projection_checksums_txt;
-
-            if (require_checksums || projection_part_storage->exists("checksums.txt"))
-            {
-                auto buf = projection_part_storage->readFile("checksums.txt", {}, std::nullopt, std::nullopt);
-                projection_checksums_txt.read(*buf);
-                assertEOF(*buf);
-            }
-
-            const auto & projection_checksum_files_txt = projection_checksums_txt.files;
-            for (auto projection_it = projection_part_storage->iterate(); projection_it->isValid(); projection_it->next())
-            {
-                const String & projection_file_name = projection_it->name();
-                auto projection_checksum_it = projection_checksums_data.files.find(projection_file_name);
-
-                /// Skip files that we already calculated. Also skip metadata files that are not checksummed.
-                if (projection_checksum_it == projection_checksums_data.files.end() && !files_without_checksums.contains(projection_file_name))
-                {
-                    auto projection_txt_checksum_it = projection_checksum_files_txt.find(file_name);
-                    if (projection_txt_checksum_it == projection_checksum_files_txt.end()
-                        || projection_txt_checksum_it->second.uncompressed_size == 0)
-                    {
-                        auto projection_file_buf = projection_part_storage->readFile(projection_file_name, {}, std::nullopt, std::nullopt);
-                        HashingReadBuffer projection_hashing_buf(*projection_file_buf);
-                        projection_hashing_buf.ignoreAll();
-                        projection_checksums_data.files[projection_file_name] = IMergeTreeDataPart::Checksums::Checksum(
-                            projection_hashing_buf.count(), projection_hashing_buf.getHash());
-                    }
-                    else
-                    {
-                        projection_checksums_data.files[projection_file_name] = checksum_compressed_file(*projection_part_storage, projection_file_name);
-                    }
-                }
-            }
-            checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(
-                projection_checksums_data.getTotalSizeOnDisk(), projection_checksums_data.getTotalChecksumUInt128());
-
-            if (require_checksums || !projection_checksums_txt.files.empty())
-                projection_checksums_txt.checkEqual(projection_checksums_data, false);
-        }
-        else
-        {
-            auto file_buf = data_part_storage.readFile(file_name, {}, std::nullopt, std::nullopt);
-            HashingReadBuffer hashing_buf(*file_buf);
-            hashing_buf.ignoreAll();
-            checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
-        }
+        auto file_buf = data_part_storage.readFile(file_name, {}, std::nullopt, std::nullopt);
+        HashingReadBuffer hashing_buf(*file_buf);
+        hashing_buf.ignoreAll();
+        checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
     };

-    bool check_uncompressed = true;
+    /// Do not check uncompressed for projections. But why?
+    bool check_uncompressed = !data_part->isProjectionPart();

     /// First calculate checksums for columns data
     if (part_type == MergeTreeDataPartType::Compact)
     {
@@ -238,10 +157,19 @@ IMergeTreeDataPart::Checksums checkDataPart(
         assertEOF(*buf);
     }

+    NameSet projections_on_disk;
     const auto & checksum_files_txt = checksums_txt.files;
     for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
     {
-        const String & file_name = it->name();
+        auto file_name = it->name();
+
+        /// We will check projections later.
+        if (data_part_storage.isDirectory(file_name) && endsWith(file_name, ".proj"))
+        {
+            projections_on_disk.insert(file_name);
+            continue;
+        }

         auto checksum_it = checksums_data.files.find(file_name);

         /// Skip files that we already calculated. Also skip metadata files that are not checksummed.
@@ -260,11 +188,38 @@ IMergeTreeDataPart::Checksums checkDataPart(
         }
     }

+    for (const auto & [name, projection] : data_part->getProjectionParts())
+    {
+        if (is_cancelled())
+            return {};
+
+        auto projection_file = name + ".proj";
+        auto projection_checksums = checkDataPart(
+            projection, *data_part_storage.getProjection(projection_file),
+            projection->getColumns(), projection->getType(),
+            projection->getFileNamesWithoutChecksums(),
+            require_checksums, is_cancelled);
+
+        checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum(
+            projection_checksums.getTotalSizeOnDisk(),
+            projection_checksums.getTotalChecksumUInt128());
+
+        projections_on_disk.erase(projection_file);
+    }
+
+    if (require_checksums && !projections_on_disk.empty())
+    {
+        throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART,
+            "Found unexpected projection directories: {}",
+            fmt::join(projections_on_disk, ","));
+    }
+
     if (is_cancelled())
         return {};

     if (require_checksums || !checksums_txt.files.empty())
         checksums_txt.checkEqual(checksums_data, check_uncompressed);

     return checksums_data;
 }
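The rewritten checkDataPart hashes the part's own files flat, verifies each projection by calling itself on the *.proj directory, folds the result into the parent's checksums, and finally rejects any *.proj directory that no known projection accounts for. A hedged, self-contained sketch of that structure:

#include <set>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct Part
{
    std::map<std::string, std::string> file_hashes;          // plain files -> hash (assumed precomputed here)
    std::vector<std::pair<std::string, Part>> projections;   // projections known from the part's metadata
    std::set<std::string> dirs_on_disk;                      // "*.proj" directories actually present
};

std::map<std::string, std::string> checkPart(const Part & part, bool require_checksums)
{
    std::map<std::string, std::string> checksums = part.file_hashes;  // flat files first
    std::set<std::string> unexpected = part.dirs_on_disk;             // assume every directory is a stray...

    for (const auto & [name, projection] : part.projections)
    {
        auto dir = name + ".proj";
        auto projection_checksums = checkPart(projection, require_checksums);  // recurse into the projection
        checksums[dir] = std::to_string(projection_checksums.size());          // folded summary, as a stand-in
        unexpected.erase(dir);                                                 // ...then clear the known ones
    }

    if (require_checksums && !unexpected.empty())
        throw std::runtime_error("Found unexpected projection directories");

    return checksums;
}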
34
tests/integration/test_attach_backup_from_s3_plain/configs/disk_s3.xml
Normal file
@@ -0,0 +1,34 @@
<?xml version="1.0"?>
<clickhouse>
    <storage_configuration>
        <disks>
            <backup_disk_s3_plain>
                <type>s3_plain</type>
                <endpoint>http://minio1:9001/root/data/disks/disk_s3_plain/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
                <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
            </backup_disk_s3_plain>
            <attach_disk_s3_plain>
                <type>s3_plain</type>
                <!-- NOTE: /backup/ is the name of the BACKUP -->
                <endpoint>http://minio1:9001/root/data/disks/disk_s3_plain/backup/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
                <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
            </attach_disk_s3_plain>
        </disks>
        <policies>
            <attach_policy_s3_plain>
                <volumes>
                    <main>
                        <disk>attach_disk_s3_plain</disk>
                    </main>
                </volumes>
            </attach_policy_s3_plain>
        </policies>
    </storage_configuration>
    <backups>
        <allowed_disk>backup_disk_s3_plain</allowed_disk>
    </backups>
</clickhouse>
40
tests/integration/test_attach_backup_from_s3_plain/test.py
Normal file
@@ -0,0 +1,40 @@
# pylint: disable=global-statement
# pylint: disable=line-too-long

import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
    "node",
    main_configs=["configs/disk_s3.xml"],
    with_minio=True,
)


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield
    finally:
        cluster.shutdown()


def test_attach_backup():
    node.query(
        f"""
        -- BACKUP writes an Ordinary-like structure
        set allow_deprecated_database_ordinary=1;
        create database ordinary engine=Ordinary;

        create table ordinary.test_backup_attach engine=MergeTree() order by tuple() as select * from numbers(100);
        -- NOTE: the name of the backup ("backup") is significant.
        backup table ordinary.test_backup_attach TO Disk('backup_disk_s3_plain', 'backup');

        drop table ordinary.test_backup_attach;
        attach table ordinary.test_backup_attach (number UInt64) engine=MergeTree() order by tuple() settings storage_policy='attach_policy_s3_plain';
        """
    )

    assert int(node.query("select count() from ordinary.test_backup_attach")) == 100
@@ -1,3 +1,4 @@
+v22.10.2.11-stable 2022-11-01
 v22.10.1.1877-stable 2022-10-26
 v22.9.4.32-stable 2022-10-26
 v22.9.3.18-stable 2022-09-30