Merge pull request #31505 from ContentSquare/azure_blob_storage_dependency

Azure Blob Storage Disks
Kseniia Sumarokova 2021-12-13 18:28:48 +03:00 committed by GitHub
commit 78ed383f15
45 changed files with 1836 additions and 96 deletions

.gitmodules

@ -247,3 +247,6 @@
[submodule "contrib/sysroot"]
path = contrib/sysroot
url = https://github.com/ClickHouse-Extras/sysroot.git
[submodule "contrib/azure"]
path = contrib/azure
url = https://github.com/ClickHouse-Extras/azure-sdk-for-cpp.git


@ -508,6 +508,7 @@ include (cmake/find/hdfs3.cmake) # uses protobuf
include (cmake/find/poco.cmake)
include (cmake/find/curl.cmake)
include (cmake/find/s3.cmake)
include (cmake/find/blob_storage.cmake)
include (cmake/find/base64.cmake)
include (cmake/find/parquet.cmake)
include (cmake/find/simdjson.cmake)


@ -0,0 +1,28 @@
option(USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY
"Set to FALSE to use system Azure SDK instead of bundled (OFF currently not implemented)"
ON)
if (USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY)
set(USE_AZURE_BLOB_STORAGE 1)
set(AZURE_BLOB_STORAGE_LIBRARY azure_sdk)
endif()
if ((NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/azure/sdk"
OR NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/azure/cmake-modules")
AND USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY)
message (WARNING "Submodule contrib/azure is missing. To fix, try running:\n git submodule update --init")
set(USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY OFF)
set(USE_AZURE_BLOB_STORAGE 0)
endif ()
if (NOT USE_INTERNAL_SSL_LIBRARY AND USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY)
message (FATAL_ERROR "Currently Blob Storage support can be built only with internal SSL library")
endif()
if (NOT USE_INTERNAL_CURL AND USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY)
message (FATAL_ERROR "Currently Blob Storage support can be built only with internal curl library")
endif()
if (USE_AZURE_BLOB_STORAGE)
message (STATUS "Using Azure Blob Storage - ${USE_AZURE_BLOB_STORAGE}")
endif()


@ -249,6 +249,10 @@ endif()
# - sentry-native
add_subdirectory (curl-cmake)
if (USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY)
add_subdirectory(azure-cmake)
endif()
if (USE_SENTRY)
add_subdirectory (sentry-native-cmake)
endif()

contrib/azure (new submodule)

@ -0,0 +1 @@
Subproject commit ac4b763d4ca40122275f1497cbdc5451337461d9


@ -0,0 +1,71 @@
set(AZURE_DIR "${ClickHouse_SOURCE_DIR}/contrib/azure")
set(AZURE_SDK_LIBRARY_DIR "${AZURE_DIR}/sdk")
file(GLOB AZURE_SDK_CORE_SRC
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/cryptography/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/curl/*.hpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/curl/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/winhttp/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/io/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/private/*.hpp"
)
file(GLOB AZURE_SDK_IDENTITY_SRC
"${AZURE_SDK_LIBRARY_DIR}/identity/azure-identity/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/identity/azure-identity/src/private/*.hpp"
)
file(GLOB AZURE_SDK_STORAGE_COMMON_SRC
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-common/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-common/src/private/*.cpp"
)
file(GLOB AZURE_SDK_STORAGE_BLOBS_SRC
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/src/private/*.hpp"
)
file(GLOB AZURE_SDK_UNIFIED_SRC
${AZURE_SDK_CORE_SRC}
${AZURE_SDK_IDENTITY_SRC}
${AZURE_SDK_STORAGE_COMMON_SRC}
${AZURE_SDK_STORAGE_BLOBS_SRC}
)
set(AZURE_SDK_INCLUDES
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/inc/"
"${AZURE_SDK_LIBRARY_DIR}/identity/azure-identity/inc/"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-common/inc/"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/inc/"
)
include("${AZURE_DIR}/cmake-modules/AzureTransportAdapters.cmake")
add_library(azure_sdk ${AZURE_SDK_UNIFIED_SRC})
if (COMPILER_CLANG)
target_compile_options(azure_sdk PUBLIC
-Wno-deprecated-copy-dtor
-Wno-extra-semi
-Wno-suggest-destructor-override
-Wno-inconsistent-missing-destructor-override
-Wno-error=unknown-warning-option
-Wno-reserved-identifier
)
endif()
# Originally, on Windows azure-core is built with bcrypt and crypt32 by default
if (OPENSSL_FOUND)
target_link_libraries(azure_sdk PRIVATE ${OPENSSL_LIBRARIES})
endif()
# Originally, on Windows azure-core is built with winhttp by default
if (CURL_FOUND)
target_link_libraries(azure_sdk PRIVATE ${CURL_LIBRARY})
endif()
target_link_libraries(azure_sdk PRIVATE ${LIBXML2_LIBRARIES})
target_include_directories(azure_sdk PUBLIC ${AZURE_SDK_INCLUDES})


@ -639,6 +639,7 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/decrepit/ssl/ssl_decrepit.c"
"${BORINGSSL_SOURCE_DIR}/decrepit/cfb/cfb.c"
"${BORINGSSL_SOURCE_DIR}/decrepit/bio/base64_bio.c"
)
add_executable(


@ -85,7 +85,8 @@ RUN python3 -m pip install \
tzlocal==2.1 \
urllib3 \
requests-kerberos \
pyhdfs
pyhdfs \
azure-storage-blob
COPY modprobe.sh /usr/local/bin/modprobe
COPY dockerd-entrypoint.sh /usr/local/bin/


@ -0,0 +1,13 @@
version: '2.3'
services:
azurite1:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- "10000:10000"
volumes:
- data1-1:/data1
command: azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log
volumes:
data1-1:
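
The compose file above exposes Azurite on port 10000 with its well-known devstoreaccount1 development credentials, the same connection string the integration-test helpers further down use. A minimal C++ sketch of reaching that emulator through the bundled SDK, assuming a freshly started Azurite on 127.0.0.1:10000; the container name "cont" mirrors the test configs and is only an example:

#include <azure/storage/blobs.hpp>
#include <iostream>
#include <string>

int main()
{
    /// Azurite's default development connection string (assumption: emulator on 127.0.0.1:10000).
    const std::string connection_string =
        "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
        "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
        "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;";

    /// Same factory call the disk code uses when a connection string is configured.
    auto service_client = Azure::Storage::Blobs::BlobServiceClient::CreateFromConnectionString(connection_string);

    /// Create an example container (throws if it already exists) and list what the account holds.
    auto container_client = service_client.CreateBlobContainer("cont").Value;
    for (const auto & container : service_client.ListBlobContainers().BlobContainers)
        std::cout << container.Name << '\n';
    return 0;
}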


@ -106,6 +106,10 @@ if (USE_AWS_S3)
add_headers_and_sources(dbms Disks/S3)
endif()
if (USE_AZURE_BLOB_STORAGE)
add_headers_and_sources(dbms Disks/BlobStorage)
endif()
if (USE_HDFS)
add_headers_and_sources(dbms Storages/HDFS)
add_headers_and_sources(dbms Disks/HDFS)
@ -450,6 +454,11 @@ if (USE_AWS_S3)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${AWS_S3_INCLUDE_DIR})
endif()
if (USE_AZURE_BLOB_STORAGE)
target_link_libraries (clickhouse_common_io PUBLIC ${AZURE_BLOB_STORAGE_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${AZURE_SDK_INCLUDES})
endif()
if (USE_S2_GEOMETRY)
dbms_target_link_libraries (PUBLIC ${S2_GEOMETRY_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${S2_GEOMETRY_INCLUDE_DIR})


@ -470,6 +470,7 @@
M(497, ACCESS_DENIED) \
M(498, LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED) \
M(499, S3_ERROR) \
M(500, BLOB_STORAGE_ERROR) \
M(501, CANNOT_CREATE_DATABASE) \
M(502, CANNOT_SIGQUEUE) \
M(503, AGGREGATE_FUNCTION_THROW) \


@ -9,6 +9,7 @@
#cmakedefine01 USE_HDFS
#cmakedefine01 USE_INTERNAL_HDFS3_LIBRARY
#cmakedefine01 USE_AWS_S3
#cmakedefine01 USE_AZURE_BLOB_STORAGE
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND
#cmakedefine01 USE_OPENCL


@ -0,0 +1,145 @@
#include <Disks/BlobStorage/BlobStorageAuth.h>
#if USE_AZURE_BLOB_STORAGE
#include <optional>
#include <re2/re2.h>
#include <azure/identity/managed_identity_credential.hpp>
using namespace Azure::Storage::Blobs;
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
struct BlobStorageEndpoint
{
const String storage_account_url;
const String container_name;
const std::optional<bool> container_already_exists;
};
void validateStorageAccountUrl(const String & storage_account_url)
{
const auto * storage_account_url_pattern_str = R"(http(()|s)://[a-z0-9-.:]+(()|/)[a-z0-9]*(()|/))";
static const RE2 storage_account_url_pattern(storage_account_url_pattern_str);
if (!re2::RE2::FullMatch(storage_account_url, storage_account_url_pattern))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Blob Storage URL is not valid, should follow the format: {}, got: {}", storage_account_url_pattern_str, storage_account_url);
}
void validateContainerName(const String & container_name)
{
auto len = container_name.length();
if (len < 3 || len > 64)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Blob Storage container name is not valid, should have length between 3 and 64, but has length: {}", len);
const auto * container_name_pattern_str = R"([a-z][a-z0-9-]+)";
static const RE2 container_name_pattern(container_name_pattern_str);
if (!re2::RE2::FullMatch(container_name, container_name_pattern))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Blob Storage container name is not valid, should follow the format: {}, got: {}", container_name_pattern_str, container_name);
}
BlobStorageEndpoint processBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
String storage_account_url = config.getString(config_prefix + ".storage_account_url");
validateStorageAccountUrl(storage_account_url);
String container_name = config.getString(config_prefix + ".container_name", "default-container");
validateContainerName(container_name);
std::optional<bool> container_already_exists {};
if (config.has(config_prefix + ".container_already_exists"))
container_already_exists = {config.getBool(config_prefix + ".container_already_exists")};
return {storage_account_url, container_name, container_already_exists};
}
template <class T>
std::shared_ptr<T> getClientWithConnectionString(const String & connection_str, const String & container_name) = delete;
template<>
std::shared_ptr<BlobServiceClient> getClientWithConnectionString(
const String & connection_str, const String & /*container_name*/)
{
return std::make_shared<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(connection_str));
}
template<>
std::shared_ptr<BlobContainerClient> getClientWithConnectionString(
const String & connection_str, const String & container_name)
{
return std::make_shared<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(connection_str, container_name));
}
template <class T>
std::shared_ptr<T> getBlobStorageClientWithAuth(
const String & url, const String & container_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
if (config.has(config_prefix + ".connection_string"))
{
String connection_str = config.getString(config_prefix + ".connection_string");
return getClientWithConnectionString<T>(connection_str, container_name);
}
if (config.has(config_prefix + ".account_key") && config.has(config_prefix + ".account_name"))
{
auto storage_shared_key_credential = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(
config.getString(config_prefix + ".account_name"),
config.getString(config_prefix + ".account_key")
);
return std::make_shared<T>(url, storage_shared_key_credential);
}
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
return std::make_shared<T>(url, managed_identity_credential);
}
std::shared_ptr<BlobContainerClient> getBlobContainerClient(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
auto endpoint = processBlobStorageEndpoint(config, config_prefix);
auto container_name = endpoint.container_name;
auto final_url = endpoint.storage_account_url
+ (endpoint.storage_account_url.back() == '/' ? "" : "/")
+ container_name;
if (endpoint.container_already_exists.value_or(false))
return getBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
auto blob_service_client = getBlobStorageClientWithAuth<BlobServiceClient>(endpoint.storage_account_url, container_name, config, config_prefix);
if (!endpoint.container_already_exists.has_value())
{
ListBlobContainersOptions blob_containers_list_options;
blob_containers_list_options.Prefix = container_name;
blob_containers_list_options.PageSizeHint = 1;
auto blob_containers = blob_service_client->ListBlobContainers().BlobContainers;
for (const auto & blob_container : blob_containers)
{
if (blob_container.Name == endpoint.container_name)
return getBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
}
}
return std::make_shared<BlobContainerClient>(
blob_service_client->CreateBlobContainer(container_name).Value);
}
}
#endif
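
The factory above picks one of three authentication paths depending on which keys the disk configuration provides. A condensed sketch of those paths, using only SDK calls that already appear in this file; the helper name makeContainerClient and its parameters are illustrative, not something this commit defines:

#include <azure/identity/managed_identity_credential.hpp>
#include <azure/storage/blobs.hpp>
#include <memory>
#include <string>

using Azure::Storage::Blobs::BlobContainerClient;

/// Illustrative only: build a container client for each of the three auth paths above.
std::shared_ptr<BlobContainerClient> makeContainerClient(
    const std::string & account_url,        /// e.g. "http://azurite1:10000/devstoreaccount1"
    const std::string & container,          /// e.g. "cont"
    const std::string & connection_string,  /// empty unless <connection_string> is configured
    const std::string & account_name,       /// empty unless <account_name>/<account_key> are configured
    const std::string & account_key)
{
    /// 1. Connection string: the SDK parses endpoint and credentials itself.
    if (!connection_string.empty())
        return std::make_shared<BlobContainerClient>(
            BlobContainerClient::CreateFromConnectionString(connection_string, container));

    const std::string container_url = account_url + "/" + container;

    /// 2. Shared key: explicit account name + account key.
    if (!account_name.empty() && !account_key.empty())
    {
        auto credential = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(account_name, account_key);
        return std::make_shared<BlobContainerClient>(container_url, credential);
    }

    /// 3. Managed identity: no secrets in the config, credentials come from the Azure environment.
    auto credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
    return std::make_shared<BlobContainerClient>(container_url, credential);
}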


@ -0,0 +1,20 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <Disks/IDiskRemote.h>
#include <azure/storage/blobs.hpp>
namespace DB
{
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> getBlobContainerClient(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
}
#endif


@ -0,0 +1,190 @@
#include <Disks/BlobStorage/DiskBlobStorage.h>
#if USE_AZURE_BLOB_STORAGE
#include <Disks/RemoteDisksCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BLOB_STORAGE_ERROR;
}
DiskBlobStorageSettings::DiskBlobStorageSettings(
UInt64 max_single_part_upload_size_,
UInt64 min_bytes_for_seek_,
int max_single_read_retries_,
int max_single_download_retries_,
int thread_pool_size_) :
max_single_part_upload_size(max_single_part_upload_size_),
min_bytes_for_seek(min_bytes_for_seek_),
max_single_read_retries(max_single_read_retries_),
max_single_download_retries(max_single_download_retries_),
thread_pool_size(thread_pool_size_) {}
class BlobStoragePathKeeper : public RemoteFSPathKeeper
{
public:
/// RemoteFSPathKeeper is constructed with a placeholder argument for chunk_limit; it is unused in this class
BlobStoragePathKeeper() : RemoteFSPathKeeper(1000) {}
void addPath(const String & path) override
{
paths.push_back(path);
}
std::vector<String> paths;
};
DiskBlobStorage::DiskBlobStorage(
const String & name_,
DiskPtr metadata_disk_,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
SettingsPtr settings_,
GetDiskSettings settings_getter_) :
IDiskRemote(name_, "", metadata_disk_, "DiskBlobStorage", settings_->thread_pool_size),
blob_container_client(blob_container_client_),
current_settings(std::move(settings_)),
settings_getter(settings_getter_) {}
std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
const String & path,
const ReadSettings & read_settings,
std::optional<size_t> /*estimated_size*/) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
LOG_TRACE(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto reader_impl = std::make_unique<ReadBufferFromBlobStorageGather>(
path, blob_container_client, metadata, settings->max_single_read_retries,
settings->max_single_download_retries, read_settings, threadpool_read);
if (threadpool_read)
{
auto reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(reader_impl));
}
else
{
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(reader_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), current_settings.get()->min_bytes_for_seek);
}
}
std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
const String & path,
size_t buf_size,
WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
auto blob_path = path + "_" + getRandomName(8); /// NOTE: path contains the tmp_* prefix in the blob name
LOG_TRACE(log, "{} to file by path: {}. Blob Storage path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), blob_path);
auto buffer = std::make_unique<WriteBufferFromBlobStorage>(
blob_container_client,
blob_path,
current_settings.get()->max_single_part_upload_size,
buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>>(std::move(buffer), std::move(metadata), blob_path);
}
DiskType DiskBlobStorage::getType() const
{
return DiskType::BlobStorage;
}
bool DiskBlobStorage::isRemote() const
{
return true;
}
bool DiskBlobStorage::supportZeroCopyReplication() const
{
return true;
}
bool DiskBlobStorage::checkUniqueId(const String & id) const
{
Azure::Storage::Blobs::ListBlobsOptions blobs_list_options;
blobs_list_options.Prefix = id;
blobs_list_options.PageSizeHint = 1;
auto blobs_list_response = blob_container_client->ListBlobs(blobs_list_options);
auto blobs_list = blobs_list_response.Blobs;
for (const auto & blob : blobs_list)
{
if (id == blob.Name)
return true;
}
return false;
}
void DiskBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
{
auto * paths_keeper = dynamic_cast<BlobStoragePathKeeper *>(fs_paths_keeper.get());
if (paths_keeper)
{
for (const auto & path : paths_keeper->paths)
{
try
{
auto delete_info = blob_container_client->DeleteBlob(path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::BLOB_STORAGE_ERROR, "Failed to delete file in Blob Storage: {}", path);
}
catch (const Azure::Storage::StorageException& e)
{
LOG_INFO(log, "Caught an error while deleting file {} : {}", path, e.Message);
throw e;
}
}
}
}
RemoteFSPathKeeperPtr DiskBlobStorage::createFSPathKeeper() const
{
return std::make_shared<BlobStoragePathKeeper>();
}
void DiskBlobStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
{
auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);
current_settings.set(std::move(new_settings));
if (AsyncExecutor * exec = dynamic_cast<AsyncExecutor*>(&getExecutor()))
exec->setMaxThreads(current_settings.get()->thread_pool_size);
}
}
#endif


@ -0,0 +1,88 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <Disks/IDiskRemote.h>
#include <IO/ReadBufferFromBlobStorage.h>
#include <IO/WriteBufferFromBlobStorage.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <azure/identity/managed_identity_credential.hpp>
#include <azure/storage/blobs.hpp>
namespace DB
{
struct DiskBlobStorageSettings final
{
DiskBlobStorageSettings(
UInt64 max_single_part_upload_size_,
UInt64 min_bytes_for_seek_,
int max_single_read_retries,
int max_single_download_retries,
int thread_pool_size_);
size_t max_single_part_upload_size; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
UInt64 min_bytes_for_seek;
size_t max_single_read_retries;
size_t max_single_download_retries;
size_t thread_pool_size;
};
class DiskBlobStorage final : public IDiskRemote
{
public:
using SettingsPtr = std::unique_ptr<DiskBlobStorageSettings>;
using GetDiskSettings = std::function<SettingsPtr(const Poco::Util::AbstractConfiguration &, const String, ContextPtr)>;
DiskBlobStorage(
const String & name_,
DiskPtr metadata_disk_,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
SettingsPtr settings_,
GetDiskSettings settings_getter_);
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size,
WriteMode mode) override;
DiskType getType() const override;
bool isRemote() const override;
bool supportZeroCopyReplication() const override;
bool checkUniqueId(const String & id) const override;
void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override;
RemoteFSPathKeeperPtr createFSPathKeeper() const override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &) override;
private:
/// client used to access the files in the Blob Storage cloud
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
MultiVersion<DiskBlobStorageSettings> current_settings;
/// Gets disk settings from context.
GetDiskSettings settings_getter;
};
}
#endif


@ -0,0 +1,128 @@
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#include <Disks/DiskFactory.h>
#if USE_AZURE_BLOB_STORAGE
#include <Disks/DiskRestartProxy.h>
#include <Disks/DiskCacheWrapper.h>
#include <Disks/RemoteDisksCommon.h>
#include <Disks/BlobStorage/DiskBlobStorage.h>
#include <Disks/BlobStorage/BlobStorageAuth.h>
namespace DB
{
namespace ErrorCodes
{
extern const int PATH_ACCESS_DENIED;
}
constexpr char test_file[] = "test.txt";
constexpr char test_str[] = "test";
constexpr size_t test_str_size = 4;
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile(test_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write(test_str, test_str_size);
}
void checkReadAccess(IDisk & disk)
{
auto file = disk.readFile(test_file);
String buf(test_str_size, '0');
file->readStrict(buf.data(), test_str_size);
if (buf != test_str)
throw Exception("No read access to disk", ErrorCodes::PATH_ACCESS_DENIED);
}
void checkReadWithOffset(IDisk & disk)
{
auto file = disk.readFile(test_file);
auto offset = 2;
auto test_size = test_str_size - offset;
String buf(test_size, '0');
file->seek(offset, 0);
file->readStrict(buf.data(), test_size);
if (buf != test_str + offset)
throw Exception("Failed to read file with offset", ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk)
{
disk.removeFile(test_file);
}
std::unique_ptr<DiskBlobStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
{
return std::make_unique<DiskBlobStorageSettings>(
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3),
config.getInt(config_prefix + ".thread_pool_size", 16)
);
}
void registerDiskBlobStorage(DiskFactory & factory)
{
auto creator = [](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/)
{
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
std::shared_ptr<IDisk> blob_storage_disk = std::make_shared<DiskBlobStorage>(
name,
metadata_disk,
getBlobContainerClient(config, config_prefix),
getSettings(config, config_prefix, context),
getSettings
);
if (!config.getBool(config_prefix + ".skip_access_check", false))
{
checkWriteAccess(*blob_storage_disk);
checkReadAccess(*blob_storage_disk);
checkReadWithOffset(*blob_storage_disk);
checkRemoveAccess(*blob_storage_disk);
}
blob_storage_disk->startup();
if (config.getBool(config_prefix + ".cache_enabled", true))
{
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
blob_storage_disk = wrapWithCache(blob_storage_disk, "blob-storage-cache", cache_path, metadata_path);
}
return std::make_shared<DiskRestartProxy>(blob_storage_disk);
};
factory.registerDiskType("blob_storage", creator);
}
}
#else
namespace DB
{
void registerDiskBlobStorage(DiskFactory &) {}
}
#endif
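
For reference, getSettings() above reads five optional keys under the disk's configuration prefix. A hedged sketch of the settings object it produces when every key is left at its default; the values are copied from the code above, and the helper name is illustrative (requires a build with USE_AZURE_BLOB_STORAGE):

#include <Disks/BlobStorage/DiskBlobStorage.h>
#include <memory>

/// Defaults applied by getSettings() when the corresponding config keys are absent.
std::unique_ptr<DB::DiskBlobStorageSettings> makeDefaultBlobStorageSettings()
{
    return std::make_unique<DB::DiskBlobStorageSettings>(
        /* max_single_part_upload_size_ = */ 100 * 1024 * 1024,  /// <max_single_part_upload_size>
        /* min_bytes_for_seek_ = */ 1024 * 1024,                 /// <min_bytes_for_seek>
        /* max_single_read_retries_ = */ 3,                      /// <max_single_read_retries>
        /* max_single_download_retries_ = */ 3,                  /// <max_single_download_retries>
        /* thread_pool_size_ = */ 16);                           /// <thread_pool_size>
}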


@ -10,6 +10,7 @@ enum class DiskType
Local,
RAM,
S3,
BlobStorage,
HDFS,
Encrypted,
WebServer,
@ -25,6 +26,8 @@ inline String toString(DiskType disk_type)
return "memory";
case DiskType::S3:
return "s3";
case DiskType::BlobStorage:
return "blob_storage";
case DiskType::HDFS:
return "hdfs";
case DiskType::Encrypted:


@ -1,5 +1,6 @@
#include <Disks/HDFS/DiskHDFS.h>
#include <Disks/DiskLocal.h>
#include <Disks/RemoteDisksCommon.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
@ -160,17 +161,13 @@ void registerDiskHDFS(DiskFactory & factory)
ContextPtr context_,
const DisksMap & /*map*/) -> DiskPtr
{
fs::path disk = fs::path(context_->getPath()) / "disks" / name;
fs::create_directories(disk);
String uri{config.getString(config_prefix + ".endpoint")};
checkHDFSURL(uri);
if (uri.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must end with '/', but '{}' doesn't.", uri);
String metadata_path = context_->getPath() + "disks/" + name + "/";
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0);
auto metadata_disk = prepareForLocalMetadata(name, config, config_prefix, context_).second;
return std::make_shared<DiskHDFS>(
name, uri,


@ -42,7 +42,7 @@ class IAsynchronousReader;
using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;
/// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS)
/// Base Disk class for remote FS's, which are not posix-compatible (e.g. DiskS3, DiskHDFS, DiskBlobStorage)
class IDiskRemote : public IDisk
{


@ -8,6 +8,10 @@
#include <IO/ReadBufferFromS3.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <IO/ReadBufferFromBlobStorage.h>
#endif
#if USE_HDFS
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#endif
@ -30,6 +34,15 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
#endif
#if USE_AZURE_BLOB_STORAGE
SeekableReadBufferPtr ReadBufferFromBlobStorageGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
{
return std::make_unique<ReadBufferFromBlobStorage>(blob_container_client, path, max_single_read_retries,
max_single_download_retries, settings.remote_fs_buffer_size, threadpool_read, read_until_position_);
}
#endif
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
{
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, threadpool_read, read_until_position_);


@ -5,6 +5,10 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#if USE_AZURE_BLOB_STORAGE
#include <azure/storage/blobs.hpp>
#endif
namespace Aws
{
namespace S3
@ -97,6 +101,40 @@ private:
#endif
#if USE_AZURE_BLOB_STORAGE
/// Reads data from Blob Storage using paths stored in metadata.
class ReadBufferFromBlobStorageGather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromBlobStorageGather(
const String & path_,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
IDiskRemote::Metadata metadata_,
size_t max_single_read_retries_,
size_t max_single_download_retries_,
const ReadSettings & settings_,
bool threadpool_read_ = false)
: ReadBufferFromRemoteFSGather(metadata_, path_)
, blob_container_client(blob_container_client_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
, settings(settings_)
, threadpool_read(threadpool_read_)
{
}
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
private:
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
size_t max_single_read_retries;
size_t max_single_download_retries;
ReadSettings settings;
bool threadpool_read;
};
#endif
class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
{
public:


@ -1,6 +1,7 @@
#include "WriteIndirectBufferFromRemoteFS.h"
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteBufferFromBlobStorage.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/WriteBufferFromHTTP.h>
@ -57,6 +58,11 @@ template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>;
#endif
#if USE_AZURE_BLOB_STORAGE
template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>;
#endif
#if USE_HDFS
template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>;


@ -0,0 +1,53 @@
#include <Disks/RemoteDisksCommon.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
String getRandomName(size_t len, char first, char last)
{
std::uniform_int_distribution<int> distribution(first, last);
String res(len, ' ');
for (auto & c : res)
c = distribution(thread_local_rng);
return res;
}
std::shared_ptr<DiskCacheWrapper> wrapWithCache(
std::shared_ptr<IDisk> disk, String cache_name, String cache_path, String metadata_path)
{
if (metadata_path == cache_path)
throw Exception("Metadata and cache paths should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
auto cache_disk = std::make_shared<DiskLocal>(cache_name, cache_path, 0);
auto cache_file_predicate = [] (const String & path)
{
return path.ends_with("idx") // index files.
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") /// mark files.
|| path.ends_with("txt") || path.ends_with("dat");
};
return std::make_shared<DiskCacheWrapper>(disk, cache_disk, cache_file_predicate);
}
std::pair<String, DiskPtr> prepareForLocalMetadata(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context)
{
/// where the metadata files are stored locally
auto metadata_path = config.getString(config_prefix + ".metadata_path", context->getPath() + "disks/" + name + "/");
fs::create_directories(metadata_path);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0);
return std::make_pair(metadata_path, metadata_disk);
}
}


@ -0,0 +1,25 @@
#pragma once
#include <random>
#include <utility>
#include <Core/Types.h>
#include <Common/thread_local_rng.h>
#include <Disks/IDisk.h>
#include <Disks/DiskCacheWrapper.h>
namespace DB
{
String getRandomName(size_t len = 32, char first = 'a', char last = 'z');
std::shared_ptr<DiskCacheWrapper> wrapWithCache(
std::shared_ptr<IDisk> disk, String cache_name, String cache_path, String metadata_path);
std::pair<String, DiskPtr> prepareForLocalMetadata(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context);
}


@ -25,6 +25,7 @@
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Disks/RemoteDisksCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
@ -100,15 +101,6 @@ private:
Chunks chunks;
};
String getRandomName()
{
std::uniform_int_distribution<int> distribution('a', 'z');
String res(32, ' '); /// The number of bits of entropy should be not less than 128.
for (auto & c : res)
c = distribution(thread_local_rng);
return res;
}
template <typename Result, typename Error>
void throwIfError(Aws::Utils::Outcome<Result, Error> & response)
{


@ -18,6 +18,7 @@
#include "ProxyResolverConfiguration.h"
#include "Disks/DiskRestartProxy.h"
#include "Disks/DiskLocal.h"
#include "Disks/RemoteDisksCommon.h"
namespace DB
{
@ -176,9 +177,7 @@ void registerDiskS3(DiskFactory & factory)
if (uri.key.back() != '/')
throw Exception("S3 path must end with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
String metadata_path = config.getString(config_prefix + ".metadata_path", context->getPath() + "disks/" + name + "/");
fs::create_directories(metadata_path);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0);
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
std::shared_ptr<IDisk> s3disk = std::make_shared<DiskS3>(
name,
@ -199,24 +198,10 @@ void registerDiskS3(DiskFactory & factory)
s3disk->startup();
bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
if (cache_enabled)
if (config.getBool(config_prefix + ".cache_enabled", true))
{
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
if (metadata_path == cache_path)
throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
auto cache_disk = std::make_shared<DiskLocal>("s3-cache", cache_path, 0);
auto cache_file_predicate = [] (const String & path)
{
return path.ends_with("idx") // index files.
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files.
|| path.ends_with("txt") || path.ends_with("dat");
};
s3disk = std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
s3disk = wrapWithCache(s3disk, "s3-cache", cache_path, metadata_path);
}
return std::make_shared<DiskRestartProxy>(s3disk);


@ -14,6 +14,10 @@ void registerDiskMemory(DiskFactory & factory);
void registerDiskS3(DiskFactory & factory);
#endif
#if USE_AZURE_BLOB_STORAGE
void registerDiskBlobStorage(DiskFactory & factory);
#endif
#if USE_SSL
void registerDiskEncrypted(DiskFactory & factory);
#endif
@ -36,6 +40,10 @@ void registerDisks()
registerDiskS3(factory);
#endif
#if USE_AZURE_BLOB_STORAGE
registerDiskBlobStorage(factory);
#endif
#if USE_SSL
registerDiskEncrypted(factory);
#endif


@ -0,0 +1,173 @@
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <IO/ReadBufferFromBlobStorage.h>
#include <IO/ReadBufferFromString.h>
#include <base/logger_useful.h>
#include <base/sleep.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int RECEIVED_EMPTY_DATA;
extern const int LOGICAL_ERROR;
}
ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & path_,
size_t max_single_read_retries_,
size_t max_single_download_retries_,
size_t tmp_buffer_size_,
bool use_external_buffer_,
size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
, blob_container_client(blob_container_client_)
, path(path_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
, tmp_buffer_size(tmp_buffer_size_)
, use_external_buffer(use_external_buffer_)
, read_until_position(read_until_position_)
{
if (!use_external_buffer)
{
tmp_buffer.resize(tmp_buffer_size);
data_ptr = tmp_buffer.data();
data_capacity = tmp_buffer_size;
}
}
bool ReadBufferFromBlobStorage::nextImpl()
{
if (read_until_position)
{
if (read_until_position == offset)
return false;
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
}
if (!initialized)
initialize();
if (use_external_buffer)
{
data_ptr = internal_buffer.begin();
data_capacity = internal_buffer.size();
}
size_t to_read_bytes = std::min(total_size - offset, data_capacity);
size_t bytes_read = 0;
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t i = 0; i < max_single_read_retries; ++i)
{
try
{
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
break;
}
catch (const Azure::Storage::StorageException & e)
{
LOG_INFO(log, "Exception caught during Azure Read for file {} at attempt {} : {}", path, i, e.Message);
if (i + 1 == max_single_read_retries)
throw e;
sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
initialized = false;
initialize();
}
}
if (bytes_read == 0)
return false;
BufferBase::set(data_ptr, bytes_read, 0);
offset += bytes_read;
return true;
}
off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
{
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
offset = offset_;
return offset;
}
off_t ReadBufferFromBlobStorage::getPosition()
{
return offset - available();
}
void ReadBufferFromBlobStorage::initialize()
{
if (initialized)
return;
Azure::Storage::Blobs::DownloadBlobOptions download_options;
Azure::Nullable<int64_t> length {};
if (read_until_position != 0)
length = {static_cast<int64_t>(read_until_position - offset)};
download_options.Range = {static_cast<int64_t>(offset), length};
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t i = 0; i < max_single_download_retries; ++i)
{
try
{
auto download_response = blob_client->Download(download_options);
data_stream = std::move(download_response.Value.BodyStream);
break;
}
catch (const Azure::Storage::StorageException & e)
{
LOG_INFO(log, "Exception caught during Azure Download for file {} at offset {} at attempt {} : {}", path, offset, i, e.Message);
if (i + 1 == max_single_download_retries)
throw e;
sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
}
}
if (data_stream == nullptr)
throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Null data stream obtained while downloading file {} from Blob Storage", path);
total_size = data_stream->Length() + offset;
initialized = true;
}
}
#endif


@ -0,0 +1,63 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <IO/HTTPCommon.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadSettings.h>
#include <azure/storage/blobs.hpp>
namespace DB
{
class ReadBufferFromBlobStorage : public SeekableReadBuffer
{
public:
explicit ReadBufferFromBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & path_,
size_t max_single_read_retries_,
size_t max_single_download_retries_,
size_t tmp_buffer_size_,
bool use_external_buffer_ = false,
size_t read_until_position_ = 0
);
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
bool nextImpl() override;
private:
void initialize();
std::unique_ptr<Azure::Core::IO::BodyStream> data_stream;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
std::unique_ptr<Azure::Storage::Blobs::BlobClient> blob_client;
const String path;
size_t max_single_read_retries;
size_t max_single_download_retries;
std::vector<char> tmp_buffer;
size_t tmp_buffer_size;
bool use_external_buffer;
off_t read_until_position = 0;
off_t offset = 0;
size_t total_size;
bool initialized = false;
char * data_ptr;
size_t data_capacity;
Poco::Logger * log = &Poco::Logger::get("ReadBufferFromBlobStorage");
};
}
#endif
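
A short usage sketch of this buffer, assuming a build with USE_AZURE_BLOB_STORAGE: download a whole blob into a String through the generic ReadBuffer interface. The helper name readWholeBlob and the retry/buffer values are illustrative, not part of this commit:

#include <Core/Defines.h>
#include <Core/Types.h>
#include <IO/ReadBufferFromBlobStorage.h>
#include <IO/ReadHelpers.h>

/// Illustrative only: read the entire blob at `path` from `client` into a String.
DB::String readWholeBlob(
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client,
    const DB::String & path)
{
    DB::ReadBufferFromBlobStorage buf(
        client, path,
        /* max_single_read_retries_ = */ 3,
        /* max_single_download_retries_ = */ 3,
        /* tmp_buffer_size_ = */ DBMS_DEFAULT_BUFFER_SIZE);

    DB::String result;
    DB::readStringUntilEOF(result, buf);  /// pulls data via nextImpl() until the download is exhausted
    return result;
}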


@ -0,0 +1,67 @@
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <IO/WriteBufferFromBlobStorage.h>
#include <Disks/RemoteDisksCommon.h>
namespace DB
{
WriteBufferFromBlobStorage::WriteBufferFromBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t buf_size_) :
BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0),
blob_container_client(blob_container_client_),
max_single_part_upload_size(max_single_part_upload_size_),
blob_path(blob_path_) {}
WriteBufferFromBlobStorage::~WriteBufferFromBlobStorage()
{
finalize();
}
void WriteBufferFromBlobStorage::nextImpl()
{
if (!offset())
return;
auto * buffer_begin = working_buffer.begin();
auto len = offset();
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
size_t read = 0;
while (read < len)
{
auto part_len = std::min(len - read, max_single_part_upload_size);
auto block_id = getRandomName(64);
block_ids.push_back(block_id);
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(buffer_begin + read), part_len);
block_blob_client.StageBlock(block_id, tmp_buffer);
read += part_len;
}
}
void WriteBufferFromBlobStorage::finalizeImpl()
{
next();
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
block_blob_client.CommitBlockList(block_ids);
finalized = true;
}
}
#endif


@ -0,0 +1,46 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <memory>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <azure/storage/blobs.hpp>
#include <azure/core/io/body_stream.hpp>
namespace DB
{
class WriteBufferFromBlobStorage : public BufferWithOwnMemory<WriteBuffer>
{
public:
explicit WriteBufferFromBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t buf_size_);
~WriteBufferFromBlobStorage() override;
void nextImpl() override;
private:
void finalizeImpl() override;
std::vector<std::string> block_ids;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
size_t max_single_part_upload_size;
const String blob_path;
};
}
#endif
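
And the matching write-side sketch, again assuming USE_AZURE_BLOB_STORAGE: upload a String as a block blob through this buffer. The helper name writeWholeBlob and the size constants are illustrative, not part of this commit:

#include <Core/Defines.h>
#include <Core/Types.h>
#include <IO/WriteBufferFromBlobStorage.h>

/// Illustrative only: upload `data` as blob `blob_path` via staged blocks.
void writeWholeBlob(
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client,
    const DB::String & blob_path,
    const DB::String & data)
{
    DB::WriteBufferFromBlobStorage out(
        client, blob_path,
        /* max_single_part_upload_size_ = */ 100 * 1024 * 1024,
        /* buf_size_ = */ DBMS_DEFAULT_BUFFER_SIZE);

    out.write(data.data(), data.size());  /// buffered; staged block-by-block in nextImpl()
    out.finalize();                       /// commits the staged block list (finalizeImpl above)
}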


@ -285,6 +285,8 @@ class ClickHouseCluster:
self.minio_redirect_ip = None
self.minio_redirect_port = 8080
self.with_azurite = False
# available when with_hdfs == True
self.hdfs_host = "hdfs1"
self.hdfs_ip = None
@ -744,6 +746,13 @@ class ClickHouseCluster:
'--file', p.join(docker_compose_yml_dir, 'docker_compose_minio.yml')]
return self.base_minio_cmd
def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_azurite = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_azurite.yml')])
self.base_azurite_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_azurite.yml')]
return self.base_azurite_cmd
def setup_cassandra_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_cassandra = True
env_variables['CASSANDRA_PORT'] = str(self.cassandra_port)
@ -775,7 +784,7 @@ class ClickHouseCluster:
with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_postgres_cluster=False, with_hdfs=False,
with_kerberized_hdfs=False, with_mongo=False, with_mongo_secure=False, with_nginx=False,
with_redis=False, with_minio=False, with_cassandra=False, with_jdbc_bridge=False,
with_redis=False, with_minio=False, with_azurite=False, with_cassandra=False, with_jdbc_bridge=False,
hostname=None, env_variables=None, image="clickhouse/integration-test", tag=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, external_dirs=None, tmpfs=None,
zookeeper_docker_compose_path=None, minio_certs_dir=None, use_keeper=True,
@ -831,6 +840,7 @@ class ClickHouseCluster:
with_mongo=with_mongo or with_mongo_secure,
with_redis=with_redis,
with_minio=with_minio,
with_azurite=with_azurite,
with_cassandra=with_cassandra,
with_jdbc_bridge=with_jdbc_bridge,
server_bin_path=self.server_bin_path,
@ -934,6 +944,9 @@ class ClickHouseCluster:
if with_minio and not self.with_minio:
cmds.append(self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir))
if with_azurite and not self.with_azurite:
cmds.append(self.setup_azurite_cmd(instance, env_variables, docker_compose_yml_dir))
if minio_certs_dir is not None:
if self.minio_certs_dir is None:
self.minio_certs_dir = minio_certs_dir
@ -1385,6 +1398,23 @@ class ClickHouseCluster:
raise Exception("Can't wait Minio to start")
def wait_azurite_to_start(self, timeout=180):
from azure.storage.blob import BlobServiceClient
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
time.sleep(1)
start = time.time()
while time.time() - start < timeout:
try:
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
logging.debug(blob_service_client.get_account_information())
self.blob_service_client = blob_service_client
return
except Exception as ex:
logging.debug("Can't connect to Azurite: %s", str(ex))
time.sleep(1)
raise Exception("Can't wait Azurite to start")
def wait_schema_registry_to_start(self, timeout=180):
sr_client = CachedSchemaRegistryClient({"url":'http://localhost:{}'.format(self.schema_registry_port)})
start = time.time()
@ -1626,6 +1656,14 @@ class ClickHouseCluster:
logging.info("Trying to connect to Minio...")
self.wait_minio_to_start(secure=self.minio_certs_dir is not None)
if self.with_azurite and self.base_azurite_cmd:
azurite_start_cmd = self.base_azurite_cmd + common_opts
logging.info("Trying to create Azurite instance by command %s", ' '.join(map(str, azurite_start_cmd)))
run_and_check(azurite_start_cmd)
self.up_called = True
logging.info("Trying to connect to Azurite")
self.wait_azurite_to_start()
if self.with_cassandra and self.base_cassandra_cmd:
subprocess_check_call(self.base_cassandra_cmd + ['up', '-d'])
self.up_called = True
@ -1845,7 +1883,7 @@ class ClickHouseInstance:
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql_client, with_mysql, with_mysql8, with_mysql_cluster, with_kafka, with_kerberized_kafka,
with_rabbitmq, with_nginx, with_kerberized_hdfs, with_mongo, with_redis, with_minio, with_jdbc_bridge,
with_rabbitmq, with_nginx, with_kerberized_hdfs, with_mongo, with_redis, with_minio, with_azurite, with_jdbc_bridge,
with_cassandra, server_bin_path, odbc_bridge_bin_path, library_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, with_postgres, with_postgres_cluster,
clickhouse_start_command=CLICKHOUSE_START_COMMAND,
main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True,
@ -1889,6 +1927,7 @@ class ClickHouseInstance:
self.with_mongo = with_mongo
self.with_redis = with_redis
self.with_minio = with_minio
self.with_azurite = with_azurite
self.with_cassandra = with_cassandra
self.with_jdbc_bridge = with_jdbc_bridge
@ -2552,6 +2591,9 @@ class ClickHouseInstance:
if self.with_minio:
depends_on.append("minio1")
if self.with_azurite:
depends_on.append("azurite1")
self.cluster.env_variables.update(self.env_variables)
odbc_ini_path = ""


@ -0,0 +1,43 @@
import string
import random
import threading
# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed; they are only printed to stderr).
# Wrap threading.Thread and re-throw the exception on join()
class SafeThread(threading.Thread):
def __init__(self, target):
super().__init__()
self.target = target
self.exception = None
def run(self):
try:
self.target()
except Exception as e: # pylint: disable=broad-except
self.exception = e
def join(self, timeout=None):
super().join(timeout)
if self.exception:
raise self.exception
def random_string(length):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(length))
def generate_values(date_str, count, sign=1):
data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
data.sort(key=lambda tup: tup[1])
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
def replace_config(config_path, old, new):
config = open(config_path, 'r')
config_lines = config.readlines()
config.close()
config_lines = [line.replace(old, new) for line in config_lines]
config = open(config_path, 'w')
config.writelines(config_lines)
config.close()


@ -15,3 +15,7 @@ log_file = pytest.log
log_file_level = DEBUG
log_file_format = %(asctime)s [ %(process)d ] %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s)
log_file_date_format = %Y-%m-%d %H:%M:%S
markers =
long_run: marks tests which run for a long time
addopts =
-m 'not long_run'


@ -0,0 +1,46 @@
<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>blob_storage</type>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</blob_storage_disk>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<macros>
<cluster>test_cluster</cluster>
</macros>
</clickhouse>


@ -0,0 +1,83 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
NODE1 = "node1"
NODE2 = "node2"
TABLE_NAME = "blob_storage_table"
CONTAINER_NAME = "cont"
CLUSTER_NAME = "test_cluster"
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(NODE1, main_configs=["configs/config.d/storage_conf.xml"], macros={'replica': '1'},
with_azurite=True,
with_zookeeper=True)
cluster.add_instance(NODE2, main_configs=["configs/config.d/storage_conf.xml"], macros={'replica': '2'},
with_azurite=True,
with_zookeeper=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def create_table(node, table_name, replica, **additional_settings):
settings = {
"storage_policy": "blob_storage_policy",
"old_parts_lifetime": 1,
}
settings.update(additional_settings)
create_table_statement = f"""
CREATE TABLE {table_name} ON CLUSTER {CLUSTER_NAME} (
id Int64,
data String
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{table_name}', '{{replica}}')
ORDER BY id
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
node.query(f"DROP TABLE IF EXISTS {table_name}")
node.query(create_table_statement)
assert node.query(f"SELECT COUNT(*) FROM {table_name} FORMAT Values") == "(0)"
def get_large_objects_count(blob_container_client, large_size_threshold=100):
return sum(blob['size'] > large_size_threshold for blob in blob_container_client.list_blobs())
def test_zero_copy_replication(cluster):
node1 = cluster.instances[NODE1]
node2 = cluster.instances[NODE2]
create_table(node1, TABLE_NAME, 1)
blob_container_client = cluster.blob_service_client.get_container_client(CONTAINER_NAME)
values1 = "(0,'data'),(1,'data')"
values2 = "(2,'data'),(3,'data')"
node1.query(f"INSERT INTO {TABLE_NAME} VALUES {values1}")
node2.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
assert node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
assert node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
# Based on version 21.x: there should be only one file larger than 100 bytes (checksums.txt), shared by both nodes
assert get_large_objects_count(blob_container_client) == 1
node2.query(f"INSERT INTO {TABLE_NAME} VALUES {values2}")
node1.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
assert node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1 + "," + values2
assert node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1 + "," + values2
assert get_large_objects_count(blob_container_client) == 2


@ -0,0 +1,5 @@
<clickhouse>
<background_processing_pool_thread_sleep_seconds>0.5</background_processing_pool_thread_sleep_seconds>
<background_processing_pool_task_sleep_seconds_when_no_work_min>0.5</background_processing_pool_task_sleep_seconds_when_no_work_min>
<background_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_processing_pool_task_sleep_seconds_when_no_work_max>
</clickhouse>


@ -0,0 +1,33 @@
<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>blob_storage</type>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<container_already_exists>false</container_already_exists>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>33554432</max_single_part_upload_size>
</blob_storage_disk>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
</clickhouse>


@ -0,0 +1,20 @@
<?xml version="1.0"?>
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</clickhouse>


@ -0,0 +1,344 @@
import logging
import time
import os
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
from helpers.utility import generate_values, replace_config, SafeThread
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node/configs/config.d/storage_conf.xml'.format(get_instances_dir()))
NODE_NAME = "node"
TABLE_NAME = "blob_storage_table"
BLOB_STORAGE_DISK = "blob_storage_disk"
LOCAL_DISK = "hdd"
CONTAINER_NAME = "cont"
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(NODE_NAME,
main_configs=["configs/config.d/storage_conf.xml", "configs/config.d/bg_processing_pool_conf.xml"],
with_azurite=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def create_table(node, table_name, **additional_settings):
settings = {
"storage_policy": "blob_storage_policy",
"old_parts_lifetime": 1,
"index_granularity": 512
}
settings.update(additional_settings)
create_table_statement = f"""
CREATE TABLE {table_name} (
dt Date,
id Int64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=MergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
node.query(f"DROP TABLE IF EXISTS {table_name}")
node.query(create_table_statement)
assert node.query(f"SELECT COUNT(*) FROM {table_name} FORMAT Values") == "(0)"
def test_create_table(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
def test_read_after_cache_is_wiped(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
values = "('2021-11-13',3,'hello'),('2021-11-14',4,'heyo')"
node.query(f"INSERT INTO {TABLE_NAME} VALUES {values}")
# Wipe cache
cluster.exec_in_container(cluster.get_container_id(NODE_NAME), ["rm", "-rf", "/var/lib/clickhouse/disks/blob_storage_disk/cache/"])
# After cache is populated again, only .bin files should be accessed from Blob Storage.
assert node.query(f"SELECT * FROM {TABLE_NAME} order by dt, id FORMAT Values") == values
def test_simple_insert_select(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
values = "('2021-11-13',3,'hello')"
node.query(f"INSERT INTO {TABLE_NAME} VALUES {values}")
assert node.query(f"SELECT dt, id, data FROM {TABLE_NAME} FORMAT Values") == values
blob_container_client = cluster.blob_service_client.get_container_client(CONTAINER_NAME)
assert len(list(blob_container_client.list_blobs())) >= 12 # 1 format file + 2 skip index files + 9 regular MergeTree files + leftovers from other tests
def test_inserts_selects(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
values1 = generate_values('2020-01-03', 4096)
node.query(f"INSERT INTO {TABLE_NAME} VALUES {values1}")
assert node.query(f"SELECT * FROM {TABLE_NAME} order by dt, id FORMAT Values") == values1
values2 = generate_values('2020-01-04', 4096)
node.query(f"INSERT INTO {TABLE_NAME} VALUES {values2}")
assert node.query(f"SELECT * FROM {TABLE_NAME} ORDER BY dt, id FORMAT Values") == values1 + "," + values2
assert node.query(f"SELECT count(*) FROM {TABLE_NAME} where id = 1 FORMAT Values") == "(2)"
@pytest.mark.parametrize(
    "merge_vertical", [
        (True),
        (False),
    ])
def test_insert_same_partition_and_merge(cluster, merge_vertical):
    settings = {}
    if merge_vertical:
        settings['vertical_merge_algorithm_min_rows_to_activate'] = 0
        settings['vertical_merge_algorithm_min_columns_to_activate'] = 0

    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME, **settings)

    node.query(f"SYSTEM STOP MERGES {TABLE_NAME}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 1024)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 2048)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 1024, -1)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 2048, -1)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096, -1)}")
    assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(distinct(id)) FROM {TABLE_NAME} FORMAT Values") == "(8192)"

    node.query(f"SYSTEM START MERGES {TABLE_NAME}")

    # Wait for merges and old parts deletion
    for attempt in range(0, 10):
        parts_count = node.query(f"SELECT COUNT(*) FROM system.parts WHERE table = '{TABLE_NAME}' FORMAT Values")
        if parts_count == "(1)":
            break

        if attempt == 9:
            assert parts_count == "(1)"

        time.sleep(1)

    assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(distinct(id)) FROM {TABLE_NAME} FORMAT Values") == "(8192)"
def test_alter_table_columns(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096, -1)}")

    node.query(f"ALTER TABLE {TABLE_NAME} ADD COLUMN col1 UInt64 DEFAULT 1")
    # To ensure parts have been merged
    node.query(f"OPTIMIZE TABLE {TABLE_NAME}")

    assert node.query(f"SELECT sum(col1) FROM {TABLE_NAME} FORMAT Values") == "(8192)"
    assert node.query(f"SELECT sum(col1) FROM {TABLE_NAME} WHERE id > 0 FORMAT Values") == "(4096)"

    node.query(f"ALTER TABLE {TABLE_NAME} MODIFY COLUMN col1 String", settings={"mutations_sync": 2})
    assert node.query(f"SELECT distinct(col1) FROM {TABLE_NAME} FORMAT Values") == "('1')"
def test_attach_detach_partition(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"

    node.query(f"ALTER TABLE {TABLE_NAME} DETACH PARTITION '2020-01-03'")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(4096)"

    node.query(f"ALTER TABLE {TABLE_NAME} ATTACH PARTITION '2020-01-03'")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"

    node.query(f"ALTER TABLE {TABLE_NAME} DROP PARTITION '2020-01-03'")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(4096)"

    node.query(f"ALTER TABLE {TABLE_NAME} DETACH PARTITION '2020-01-04'")
    node.query(f"ALTER TABLE {TABLE_NAME} DROP DETACHED PARTITION '2020-01-04'", settings={"allow_drop_detached": 1})
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(0)"
def test_move_partition_to_another_disk(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"

    node.query(f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{LOCAL_DISK}'")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"

    node.query(f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{BLOB_STORAGE_DISK}'")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"
def test_table_manipulations(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    renamed_table = TABLE_NAME + "_renamed"

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}")

    node.query(f"RENAME TABLE {TABLE_NAME} TO {renamed_table}")
    assert node.query(f"SELECT count(*) FROM {renamed_table} FORMAT Values") == "(8192)"

    node.query(f"RENAME TABLE {renamed_table} TO {TABLE_NAME}")
    assert node.query(f"CHECK TABLE {TABLE_NAME} FORMAT Values") == "(1)"

    node.query(f"DETACH TABLE {TABLE_NAME}")
    node.query(f"ATTACH TABLE {TABLE_NAME}")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"

    node.query(f"TRUNCATE TABLE {TABLE_NAME}")
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(0)"
@pytest.mark.long_run
def test_move_replace_partition_to_another_table(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    table_clone_name = TABLE_NAME + "_clone"

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 256)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 256)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-05', 256, -1)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-06', 256, -1)}")
    assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(1024)"

    create_table(node, table_clone_name)

    node.query(f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-03' TO TABLE {table_clone_name}")
    node.query(f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-05' TO TABLE {table_clone_name}")
    assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(512)"
    assert node.query(f"SELECT sum(id) FROM {table_clone_name} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(*) FROM {table_clone_name} FORMAT Values") == "(512)"

    # Add new partitions to source table, but with different values and replace them from copied table.
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 256, -1)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-05', 256)}")
    assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(1024)"

    node.query(f"ALTER TABLE {TABLE_NAME} REPLACE PARTITION '2020-01-03' FROM {table_clone_name}")
    node.query(f"ALTER TABLE {TABLE_NAME} REPLACE PARTITION '2020-01-05' FROM {table_clone_name}")
    assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(1024)"
    assert node.query(f"SELECT sum(id) FROM {table_clone_name} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(*) FROM {table_clone_name} FORMAT Values") == "(512)"

    node.query(f"DROP TABLE {table_clone_name} NO DELAY")
    assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
    assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(1024)"

    node.query(f"ALTER TABLE {TABLE_NAME} FREEZE")

    node.query(f"DROP TABLE {TABLE_NAME} NO DELAY")
def test_freeze_unfreeze(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    backup1 = 'backup1'
    backup2 = 'backup2'

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}")
    node.query(f"ALTER TABLE {TABLE_NAME} FREEZE WITH NAME '{backup1}'")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}")
    node.query(f"ALTER TABLE {TABLE_NAME} FREEZE WITH NAME '{backup2}'")

    node.query(f"TRUNCATE TABLE {TABLE_NAME}")

    # Unfreeze single partition from backup1.
    node.query(f"ALTER TABLE {TABLE_NAME} UNFREEZE PARTITION '2020-01-03' WITH NAME '{backup1}'")
    # Unfreeze all partitions from backup2.
    node.query(f"ALTER TABLE {TABLE_NAME} UNFREEZE WITH NAME '{backup2}'")
def test_apply_new_settings(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}")

    # Force multi-part upload mode.
    replace_config(
        CONFIG_PATH,
        "<max_single_part_upload_size>33554432</max_single_part_upload_size>",
        "<max_single_part_upload_size>4096</max_single_part_upload_size>")

    node.query("SYSTEM RELOAD CONFIG")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096, -1)}")
# NOTE: this test takes a couple of minutes when run together with other tests
@pytest.mark.long_run
def test_restart_during_load(cluster):
    node = cluster.instances[NODE_NAME]
    create_table(node, TABLE_NAME)

    # Remove the container_already_exists override from the config so that the disk
    # can be restarted below against the container that already exists.
    replace_config(CONFIG_PATH, "<container_already_exists>false</container_already_exists>", "")

    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}")
    node.query(f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-05', 4096, -1)}")

    def read():
        for ii in range(0, 5):
            logging.info(f"Executing {ii} query")
            assert node.query(f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
            logging.info(f"Query {ii} executed")
            time.sleep(0.2)

    def restart_disk():
        for iii in range(0, 2):
            logging.info(f"Restarting disk, attempt {iii}")
            node.query(f"SYSTEM RESTART DISK {BLOB_STORAGE_DISK}")
            logging.info(f"Disk restarted, attempt {iii}")
            time.sleep(0.5)

    threads = []
    for _ in range(0, 4):
        threads.append(SafeThread(target=read))
        threads.append(SafeThread(target=restart_disk))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

View File

@ -1,12 +1,10 @@
import logging
import random
import string
import time
import threading
import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values
from pyhdfs import HdfsClient
@ -43,17 +41,6 @@ FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
def random_string(length):
    letters = string.ascii_letters
    return ''.join(random.choice(letters) for i in range(length))

def generate_values(date_str, count, sign=1):
    data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
    data.sort(key=lambda tup: tup[1])
    return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])

@pytest.fixture(scope="module")
def cluster():
    try:

View File

@ -1,47 +1,16 @@
import logging
import random
import string
import time
import threading
import os
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
from helpers.utility import generate_values, replace_config, SafeThread
# By default the exceptions that were thrown in threads will be ignored
# (they will not mark the test as failed, only printed to stderr).
#
# Wrap threading.Thread and re-throw exception on join()
class SafeThread(threading.Thread):
    def __init__(self, target):
        super().__init__()
        self.target = target
        self.exception = None

    def run(self):
        try:
            self.target()
        except Exception as e: # pylint: disable=broad-except
            self.exception = e

    def join(self, timeout=None):
        super().join(timeout)
        if self.exception:
            raise self.exception

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node/configs/config.d/storage_conf.xml'.format(get_instances_dir()))

def replace_config(old, new):
    config = open(CONFIG_PATH, 'r')
    config_lines = config.readlines()
    config.close()
    config_lines = [line.replace(old, new) for line in config_lines]
    config = open(CONFIG_PATH, 'w')
    config.writelines(config_lines)
    config.close()

@pytest.fixture(scope="module")
def cluster():
    try:
@ -66,17 +35,6 @@ FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
def random_string(length):
    letters = string.ascii_letters
    return ''.join(random.choice(letters) for i in range(length))

def generate_values(date_str, count, sign=1):
    data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
    data.sort(key=lambda tup: tup[1])
    return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])

def create_table(node, table_name, **additional_settings):
    settings = {
        "storage_policy": "s3",
@ -442,8 +400,9 @@ def test_s3_disk_apply_new_settings(cluster, node_name):
    s3_requests_to_write_partition = get_s3_requests() - s3_requests_before

    # Force multi-part upload mode.
    replace_config("<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>",
                   "<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>")
    replace_config(CONFIG_PATH,
                   "<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>",
                   "<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>")

    node.query("SYSTEM RELOAD CONFIG")