Merge branch 'master' into retries-in-submodules

This commit is contained in:
Alexey Milovidov 2024-05-20 17:32:00 +02:00
commit d5c52e08df
51 changed files with 628 additions and 132 deletions

View File

@ -7,7 +7,7 @@ endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libbcrypt")
set(SRCS
set(SRCS
"${LIBRARY_DIR}/bcrypt.c"
"${LIBRARY_DIR}/crypt_blowfish/crypt_blowfish.c"
"${LIBRARY_DIR}/crypt_blowfish/crypt_gensalt.c"
@ -16,4 +16,13 @@ set(SRCS
add_library(_bcrypt ${SRCS})
target_include_directories(_bcrypt SYSTEM PUBLIC "${LIBRARY_DIR}")
# Avoid conflicts for crypt_r on FreeBSD [1]:
#
# - char *crypt_r(__const char *key, __const char *setting, void *data);
# - char *crypt_r(const char *, const char *, struct crypt_data *);
#
# [1]: https://github.com/freebsd/freebsd-src/commit/5f521d7ba72145092ea23ff6081d8791ad6c1f9d
#
# NOTE: ow-crypt.h is unsed only internally, so PRIVATE is enough
target_compile_definitions(_bcrypt PRIVATE -D__SKIP_GNU)
add_library(ch_contrib::bcrypt ALIAS _bcrypt)

View File

@ -58,8 +58,14 @@ echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/sys
# Install previous release packages
install_packages previous_release_package_folder
# Save old settings from system table for settings changes check
clickhouse-local -q "select * from system.settings format Native" > old_settings.native
# NOTE: we need to run clickhouse-local under script to get settings without any adjustments, like clickhouse-local does in case of stdout is not a tty
function save_settings_clean()
{
local out=$1 && shift
script -q -c "clickhouse-local -q \"select * from system.settings into outfile '$out'\"" --log-out /dev/null
}
save_settings_clean 'old_settings.native'
# Initial run without S3 to create system.*_log on local file system to make it
# available for dump via clickhouse-local
@ -183,7 +189,7 @@ configure
IS_SANITIZED=$(clickhouse-local --query "SELECT value LIKE '%-fsanitize=%' FROM system.build_options WHERE name = 'CXX_FLAGS'")
if [ "${IS_SANITIZED}" -eq "0" ]
then
clickhouse-local -q "select * from system.settings format Native" > new_settings.native
save_settings_clean 'new_settings.native'
clickhouse-local -nmq "
CREATE TABLE old_settings AS file('old_settings.native');
CREATE TABLE new_settings AS file('new_settings.native');

View File

@ -108,7 +108,7 @@ Columns:
- `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions combinators`, which were used during query execution.
- `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `database engines`, which were used during query execution.
- `used_data_type_families` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `data type families`, which were used during query execution.
- `used_dictionaries` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `dictionaries`, which were used during query execution.
- `used_dictionaries` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `dictionaries`, which were used during query execution. For dictionaries configured using an XML file this is the name of the dictionary, and for dictionaries created by an SQL statement, the canonical name is the fully qualified object name.
- `used_formats` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `formats`, which were used during query execution.
- `used_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `functions`, which were used during query execution.
- `used_storages` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `storages`, which were used during query execution.

View File

@ -148,6 +148,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/createReadBufferFromFileBase.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/IOUringReader.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/getIOUringReader.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/WriteBufferFromTemporaryFile.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/WriteBufferWithFinalizeCallback.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/AsynchronousBoundedReadBuffer.cpp

View File

@ -39,6 +39,7 @@ public:
std::optional<UUID> backup_uuid;
bool deduplicate_files = true;
bool allow_s3_native_copy = true;
bool allow_azure_native_copy = true;
bool use_same_s3_credentials_for_base_backup = false;
bool azure_attempt_to_create_container = true;
ReadSettings read_settings;

View File

@ -31,22 +31,28 @@ namespace ErrorCodes
BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});
object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration_.container);
object_storage = std::make_unique<AzureObjectStorage>(
"BackupReaderAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration.container,
configuration.getConnectionURLWithContainer());
client = object_storage->getAzureBlobStorageClient();
settings = object_storage->getSettings();
auto settings_copy = *object_storage->getSettings();
settings_copy.use_native_copy = allow_azure_native_copy;
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
}
BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;
@ -76,9 +82,9 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode)
{
auto destination_data_source_description = destination_disk->getDataSourceDescription();
if ((destination_data_source_description.type == DataSourceType::ObjectStorage)
&& (destination_data_source_description.object_storage_type == ObjectStorageType::Azure)
&& (destination_data_source_description.is_encrypted == encrypted_in_backup))
LOG_TRACE(log, "Source description {}, desctionation description {}", data_source_description.description, destination_data_source_description.description);
if (destination_data_source_description.sameKind(data_source_description)
&& destination_data_source_description.is_encrypted == encrypted_in_backup)
{
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
@ -116,12 +122,13 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_,
bool attempt_to_create_container)
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false, attempt_to_create_container);
@ -130,9 +137,12 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration_.container);
configuration_.container,
configuration.getConnectionURLWithContainer());
client = object_storage->getAzureBlobStorageClient();
settings = object_storage->getSettings();
auto settings_copy = *object_storage->getSettings();
settings_copy.use_native_copy = allow_azure_native_copy;
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
}
void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
@ -140,7 +150,9 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
{
/// Use the native copy as a more optimal way to copy a file from AzureBlobStorage to AzureBlobStorage if it's possible.
auto source_data_source_description = src_disk->getDataSourceDescription();
if (source_data_source_description.sameKind(data_source_description) && (source_data_source_description.is_encrypted == copy_encrypted))
LOG_TRACE(log, "Source description {}, desctionation description {}", source_data_source_description.description, data_source_description.description);
if (source_data_source_description.sameKind(data_source_description)
&& source_data_source_description.is_encrypted == copy_encrypted)
{
/// getBlobPath() can return more than 3 elements if the file is stored as multiple objects in AzureBlobStorage container.
/// In this case we can't use the native copy.

View File

@ -16,7 +16,12 @@ namespace DB
class BackupReaderAzureBlobStorage : public BackupReaderDefault
{
public:
BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
BackupReaderAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_);
~BackupReaderAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
@ -37,7 +42,13 @@ private:
class BackupWriterAzureBlobStorage : public BackupWriterDefault
{
public:
BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_, bool attempt_to_create_container);
BackupWriterAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_,
bool attempt_to_create_container);
~BackupWriterAzureBlobStorage() override;
bool fileExists(const String & file_name) override;

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
M(Bool, decrypt_files_from_encrypted_disks) \
M(Bool, deduplicate_files) \
M(Bool, allow_s3_native_copy) \
M(Bool, allow_azure_native_copy) \
M(Bool, use_same_s3_credentials_for_base_backup) \
M(Bool, azure_attempt_to_create_container) \
M(Bool, read_from_filesystem_cache) \

View File

@ -44,6 +44,9 @@ struct BackupSettings
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
bool allow_s3_native_copy = true;
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
bool allow_azure_native_copy = true;
/// Whether base backup to S3 should inherit credentials from the BACKUP query.
bool use_same_s3_credentials_for_base_backup = false;

View File

@ -598,6 +598,7 @@ void BackupsWorker::doBackup(
backup_create_params.backup_uuid = backup_settings.backup_uuid;
backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
backup_create_params.allow_azure_native_copy = backup_settings.allow_azure_native_copy;
backup_create_params.use_same_s3_credentials_for_base_backup = backup_settings.use_same_s3_credentials_for_base_backup;
backup_create_params.azure_attempt_to_create_container = backup_settings.azure_attempt_to_create_container;
backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);

View File

@ -135,10 +135,12 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
if (params.open_mode == IBackup::OpenMode::READ)
{
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context);
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(
configuration,
params.allow_azure_native_copy,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
params.backup_info,
@ -150,11 +152,13 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
}
else
{
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context,
params.azure_attempt_to_create_container);
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(
configuration,
params.allow_azure_native_copy,
params.read_settings,
params.write_settings,
params.context,
params.azure_attempt_to_create_container);
return std::make_unique<BackupImpl>(
params.backup_info,

View File

@ -710,8 +710,8 @@ void ClientBase::adjustSettings()
settings.input_format_values_allow_data_after_semicolon.changed = false;
}
/// Do not limit pretty format output in case of --pager specified.
if (!pager.empty())
/// Do not limit pretty format output in case of --pager specified or in case of stdout is not a tty.
if (!pager.empty() || !stdout_is_a_tty)
{
if (!global_context->getSettingsRef().output_format_pretty_max_rows.changed)
{

View File

@ -0,0 +1,39 @@
#pragma once
#include <atomic>
#include <utility>
namespace DB
{
template <typename T>
struct CopyableAtomic
{
CopyableAtomic(const CopyableAtomic & other)
: value(other.value.load())
{}
explicit CopyableAtomic(T && value_)
: value(std::forward<T>(value_))
{}
CopyableAtomic & operator=(const CopyableAtomic & other)
{
value = other.value.load();
return *this;
}
CopyableAtomic & operator=(bool value_)
{
value = value_;
return *this;
}
explicit operator T() const { return value; }
const T & getValue() const { return value; }
std::atomic<T> value;
};
}

View File

@ -288,8 +288,10 @@
M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \
\
M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \
M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \
M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \
\
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -5,6 +5,7 @@
#include <Common/ThreadPool.h>
#include <Common/callOnce.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/getIOUringReader.h>
#include <Core/ServerSettings.h>
@ -303,10 +304,10 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
}
#if USE_LIBURING
IOUringReader & Context::getIOURingReader() const
IOUringReader & Context::getIOUringReader() const
{
callOnce(shared->io_uring_reader_initialized, [&] {
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
shared->io_uring_reader = createIOUringReader();
});
return *shared->io_uring_reader;

View File

@ -137,7 +137,7 @@ public:
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
#if USE_LIBURING
IOUringReader & getIOURingReader() const;
IOUringReader & getIOUringReader() const;
#endif
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;

View File

@ -26,6 +26,8 @@
#include <Common/escapeForFileName.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
namespace fs = std::filesystem;
@ -665,6 +667,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
pool.scheduleOrThrowOnError(
[batch, &process_metadata_file, &process_tmp_drop_metadata_file]() mutable
{
setThreadName("DatabaseOnDisk");
for (const auto & file : batch)
if (file.second)
process_metadata_file(file.first);

View File

@ -69,6 +69,15 @@ public:
return dictionary_id.getNameForLogs();
}
/// Returns fully qualified unquoted dictionary name
std::string getQualifiedName() const
{
std::lock_guard lock{mutex};
if (dictionary_id.database_name.empty())
return dictionary_id.table_name;
return dictionary_id.database_name + "." + dictionary_id.table_name;
}
StorageID getDictionaryID() const
{
std::lock_guard lock{mutex};

View File

@ -1,5 +1,4 @@
#include "IOUringReader.h"
#include <memory>
#if USE_LIBURING
@ -13,6 +12,7 @@
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
#include <future>
#include <memory>
namespace ProfileEvents
{

View File

@ -4,9 +4,9 @@
#include <IO/MMapReadBufferFromFileWithCache.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/getIOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <IO/AsynchronousReader.h>
#include <Common/ProfileEvents.h>
#include "config.h"
@ -100,14 +100,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
{
#if USE_LIBURING
auto global_context = Context::getGlobalContextInstance();
if (!global_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot obtain io_uring reader (global context not initialized)");
auto & reader = global_context->getIOURingReader();
if (!reader.isSupported())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
auto & reader = getIOUringReaderOrThrow();
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,

View File

@ -0,0 +1,41 @@
#include <Disks/IO/getIOUringReader.h>
#if USE_LIBURING
#include <Interpreters/Context.h>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_METHOD;
}
std::unique_ptr<IOUringReader> createIOUringReader()
{
return std::make_unique<IOUringReader>(512);
}
IOUringReader & getIOUringReaderOrThrow(ContextPtr context)
{
auto & reader = context->getIOUringReader();
if (!reader.isSupported())
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
}
return reader;
}
IOUringReader & getIOUringReaderOrThrow()
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return getIOUringReaderOrThrow(context);
}
}
#endif

View File

@ -0,0 +1,21 @@
#pragma once
#include "config.h"
#if USE_LIBURING
#include <Interpreters/Context_fwd.h>
#include <Disks/IO/IOUringReader.h>
#include <memory>
namespace DB
{
std::unique_ptr<IOUringReader> createIOUringReader();
IOUringReader & getIOUringReaderOrThrow(ContextPtr);
IOUringReader & getIOUringReaderOrThrow();
}
#endif

View File

@ -107,11 +107,13 @@ AzureObjectStorage::AzureObjectStorage(
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_,
const String & object_namespace_)
const String & object_namespace_,
const String & description_)
: name(name_)
, client(std::move(client_))
, settings(std::move(settings_))
, object_namespace(object_namespace_)
, description(description_)
, log(getLogger("AzureObjectStorage"))
{
}
@ -409,7 +411,8 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(const std
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context),
object_namespace
object_namespace,
description
);
}

View File

@ -81,7 +81,8 @@ public:
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_,
const String & object_namespace_);
const String & object_namespace_,
const String & description_);
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
@ -93,7 +94,7 @@ public:
std::string getCommonKeyPrefix() const override { return ""; }
std::string getDescription() const override { return client.get()->GetUrl(); }
std::string getDescription() const override { return description; }
bool exists(const StoredObject & object) const override;
@ -172,6 +173,7 @@ private:
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
MultiVersion<AzureObjectStorageSettings> settings;
const String object_namespace; /// container + prefix
const String description; /// url + container
LoggerPtr log;
};

View File

@ -129,6 +129,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecu
{
pool.scheduleOrThrowOnError([this, path]
{
setThreadName("BackupWorker");
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});

View File

@ -306,11 +306,14 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
bool /* skip_access_check */) -> ObjectStoragePtr
{
AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
std::string endpoint_string = endpoint.getEndpoint();
return createObjectStorage<AzureObjectStorage>(
ObjectStorageType::Azure, config, config_prefix, name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context),
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix);
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix,
endpoint.prefix.empty() ? endpoint_string : endpoint_string.substr(0, endpoint_string.length() - endpoint.prefix.length()));
};
factory.registerObjectStorageType("azure_blob_storage", creator);
factory.registerObjectStorageType("azure", creator);

View File

@ -289,6 +289,7 @@ void copyAzureBlobStorageFile(
if (settings->use_native_copy)
{
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (dest_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);

View File

@ -695,7 +695,6 @@ String serializeQuery(const IAST & query, size_t max_length)
}
// static
void AsynchronousInsertQueue::processData(
InsertQuery key, InsertDataPtr data, ContextPtr global_context, QueueShardFlushTimeHistory & queue_shard_flush_time_history)
try
@ -705,6 +704,8 @@ try
SCOPE_EXIT(CurrentMetrics::sub(CurrentMetrics::PendingAsyncInsert, data->entries.size()));
setThreadName("AsyncInsertQ");
const auto log = getLogger("AsynchronousInsertQueue");
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*key.query);

View File

@ -37,6 +37,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/getIOUringReader.h>
#include <IO/SynchronousReader.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
@ -5188,10 +5189,10 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
}
#if USE_LIBURING
IOUringReader & Context::getIOURingReader() const
IOUringReader & Context::getIOUringReader() const
{
callOnce(shared->io_uring_reader_initialized, [&] {
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
shared->io_uring_reader = createIOUringReader();
});
return *shared->io_uring_reader;

View File

@ -1246,7 +1246,7 @@ public:
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
#if USE_LIBURING
IOUringReader & getIOURingReader() const;
IOUringReader & getIOUringReader() const;
#endif
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;

View File

@ -77,21 +77,23 @@ void ExternalDictionariesLoader::updateObjectFromConfigWithoutReloading(IExterna
ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::getDictionary(const std::string & dictionary_name, ContextPtr local_context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, local_context->getCurrentDatabase());
auto dictionary = std::static_pointer_cast<const IDictionary>(load(resolved_dictionary_name));
if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries)
local_context->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, resolved_dictionary_name);
local_context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, dictionary->getQualifiedName());
return std::static_pointer_cast<const IDictionary>(load(resolved_dictionary_name));
return dictionary;
}
ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::tryGetDictionary(const std::string & dictionary_name, ContextPtr local_context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, local_context->getCurrentDatabase());
auto dictionary = std::static_pointer_cast<const IDictionary>(tryLoad(resolved_dictionary_name));
if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries)
local_context->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, resolved_dictionary_name);
if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries && dictionary)
local_context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, dictionary->getQualifiedName());
return std::static_pointer_cast<const IDictionary>(tryLoad(resolved_dictionary_name));
return dictionary;
}

View File

@ -44,7 +44,7 @@ bool replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * sel
pos = value;
else
{
if (static_cast<size_t>(std::abs(value)) > columns.size())
if (value < -static_cast<Int64>(columns.size()))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Negative positional argument number {} is out of bounds. Expected in range [-{}, -1]",

View File

@ -53,6 +53,8 @@ namespace CurrentMetrics
extern const Metric MergeTreeDataSelectExecutorThreads;
extern const Metric MergeTreeDataSelectExecutorThreadsActive;
extern const Metric MergeTreeDataSelectExecutorThreadsScheduled;
extern const Metric FilteringMarksWithPrimaryKey;
extern const Metric FilteringMarksWithSecondaryKeys;
}
namespace DB
@ -664,15 +666,22 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal();
if (metadata_snapshot->hasPrimaryKey() || part_offset_condition)
{
CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithPrimaryKey);
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, part_offset_condition, settings, log);
}
else if (total_marks_count)
{
ranges.ranges = MarkRanges{{MarkRange{0, total_marks_count}}};
}
sum_marks_pk.fetch_add(ranges.getMarksCount(), std::memory_order_relaxed);
if (!ranges.ranges.empty())
sum_parts_pk.fetch_add(1, std::memory_order_relaxed);
CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithSecondaryKeys);
for (size_t idx = 0; idx < skip_indexes.useful_indices.size(); ++idx)
{
if (ranges.ranges.empty())
@ -733,6 +742,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
num_threads = std::min<size_t>(num_streams, settings.max_threads_for_indexes);
}
LOG_TRACE(log, "Filtering marks by primary and secondary keys");
if (num_threads <= 1)
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
@ -740,7 +751,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
}
else
{
/// Parallel loading of data parts.
/// Parallel loading and filtering of data parts.
ThreadPool pool(
CurrentMetrics::MergeTreeDataSelectExecutorThreads,
CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive,
@ -748,8 +759,11 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
num_threads);
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
{
setThreadName("MergeTreeIndex");
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
@ -759,6 +773,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
process_part(part_index);
});
}
pool.wait();
}
@ -1296,8 +1311,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
size_t last_index_mark = 0;
PostingsCacheForStore cache_in_store;
if (dynamic_cast<const MergeTreeIndexFullText *>(&*index_helper) != nullptr)
if (dynamic_cast<const MergeTreeIndexFullText *>(index_helper.get()))
cache_in_store.store = GinIndexStoreFactory::instance().get(index_helper->getFileName(), part->getDataPartStoragePtr());
for (size_t i = 0; i < ranges.size(); ++i)
@ -1315,12 +1329,12 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
auto ann_condition = std::dynamic_pointer_cast<IMergeTreeIndexConditionApproximateNearestNeighbor>(condition);
if (ann_condition != nullptr)
{
// vector of indexes of useful ranges
/// An array of indices of useful ranges.
auto result = ann_condition->getUsefulRanges(granule);
for (auto range : result)
{
// range for corresponding index
/// The range for the corresponding index.
MarkRange data_range(
std::max(ranges[i].begin, index_mark * index_granularity + range),
std::min(ranges[i].end, index_mark * index_granularity + range + 1));
@ -1344,8 +1358,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
continue;
MarkRange data_range(
std::max(ranges[i].begin, index_mark * index_granularity),
std::min(ranges[i].end, (index_mark + 1) * index_granularity));
std::max(ranges[i].begin, index_mark * index_granularity),
std::min(ranges[i].end, (index_mark + 1) * index_granularity));
if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
res.push_back(data_range);

View File

@ -35,8 +35,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
size_t max_rows_)
: index_name(index_name_)
, max_rows(max_rows_)
, index_sample_block(index_sample_block_)
, block(index_sample_block)
, block(index_sample_block_.cloneEmpty())
{
}
@ -47,8 +46,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
MutableColumns && mutable_columns_)
: index_name(index_name_)
, max_rows(max_rows_)
, index_sample_block(index_sample_block_)
, block(index_sample_block.cloneWithColumns(std::move(mutable_columns_)))
, block(index_sample_block_.cloneWithColumns(std::move(mutable_columns_)))
{
}
@ -67,10 +65,11 @@ void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
}
size_serialization->serializeBinary(size(), ostr, {});
size_t num_columns = block.columns();
for (size_t i = 0; i < index_sample_block.columns(); ++i)
for (size_t i = 0; i < num_columns; ++i)
{
const auto & type = index_sample_block.getByPosition(i).type;
const auto & type = block.getByPosition(i).type;
ISerialization::SerializeBinaryBulkSettings settings;
settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; };
@ -92,8 +91,6 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
if (version != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);
block.clear();
Field field_rows;
const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
size_type->getDefaultSerialization()->deserializeBinary(field_rows, istr, {});
@ -102,24 +99,22 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
if (rows_to_read == 0)
return;
for (size_t i = 0; i < index_sample_block.columns(); ++i)
size_t num_columns = block.columns();
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
settings.position_independent_encoding = false;
for (size_t i = 0; i < num_columns; ++i)
{
const auto & column = index_sample_block.getByPosition(i);
const auto & type = column.type;
ColumnPtr new_column = type->createColumn();
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
settings.position_independent_encoding = false;
auto & elem = block.getByPosition(i);
elem.column = elem.column->cloneEmpty();
ISerialization::DeserializeBinaryBulkStatePtr state;
auto serialization = type->getDefaultSerialization();
auto serialization = elem.type->getDefaultSerialization();
serialization->deserializeBinaryBulkStatePrefix(settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(new_column, rows_to_read, settings, state, nullptr);
block.insert(ColumnWithTypeAndName(new_column, type, column.name));
serialization->deserializeBinaryBulkWithMultipleStreams(elem.column, rows_to_read, settings, state, nullptr);
}
}
@ -272,6 +267,8 @@ MergeTreeIndexConditionSet::MergeTreeIndexConditionSet(
filter_actions_dag->removeUnusedActions();
actions = std::make_shared<ExpressionActions>(filter_actions_dag);
actions_output_column_name = filter_actions_dag->getOutputs().at(0)->result_name;
}
bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const
@ -284,42 +281,19 @@ bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx
if (isUseless())
return true;
auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule);
if (!granule)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Set index condition got a granule with the wrong type");
const MergeTreeIndexGranuleSet & granule = assert_cast<const MergeTreeIndexGranuleSet &>(*idx_granule);
if (isUseless() || granule->empty() || (max_rows != 0 && granule->size() > max_rows))
size_t size = granule.size();
if (size == 0 || (max_rows != 0 && size > max_rows))
return true;
Block result = granule->block;
Block result = granule.block;
actions->execute(result);
const auto & filter_node_name = actions->getActionsDAG().getOutputs().at(0)->result_name;
auto column = result.getByName(filter_node_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality();
const auto & column = result.getByName(actions_output_column_name).column;
if (column->onlyNull())
return false;
const auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get());
const NullMap * null_map = nullptr;
if (const auto * col_nullable = checkAndGetColumn<ColumnNullable>(&*column))
{
col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn());
null_map = &col_nullable->getNullMapData();
}
if (!col_uint8)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"ColumnUInt8 expected as Set index condition result");
const auto & condition = col_uint8->getData();
size_t column_size = column->size();
for (size_t i = 0; i < column_size; ++i)
if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
for (size_t i = 0; i < size; ++i)
if (!column->isNullAt(i) && (column->get64(i) & 1))
return true;
return false;

View File

@ -34,7 +34,6 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule
const String index_name;
const size_t max_rows;
const Block index_sample_block;
Block block;
};
@ -127,6 +126,7 @@ private:
std::unordered_set<String> key_columns;
ExpressionActionsPtr actions;
String actions_output_column_name;
};

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/CopyableAtomic.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/Types.h>
#include <base/types.h>
@ -9,7 +10,6 @@
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Disks/IDisk.h>
#include <mutex>
#include <condition_variable>
@ -174,7 +174,7 @@ struct ReplicatedMergeTreeLogEntryData
size_t quorum = 0;
/// Used only in tests for permanent fault injection for particular queue entry.
bool fault_injected = false;
CopyableAtomic<bool> fault_injected{false};
/// If this MUTATE_PART entry caused by alter(modify/drop) query.
bool isAlterMutation() const

View File

@ -302,8 +302,8 @@ void registerStorageAzureBlob(StorageFactory & factory)
auto settings = StorageAzureBlob::createSettings(args.getContext());
return std::make_shared<StorageAzureBlob>(
std::move(configuration),
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings),configuration.container),
configuration,
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
args.getContext(),
args.table_id,
args.columns,
@ -491,6 +491,12 @@ Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const
return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl());
}
std::string StorageAzureBlob::Configuration::getConnectionURLWithContainer() const
{
auto url = getConnectionURL();
return fs::path(url.toString()) / container;
}
bool StorageAzureBlob::Configuration::withGlobsIgnorePartitionWildcard() const
{
if (!withPartitionWildcard())

View File

@ -45,6 +45,8 @@ public:
Poco::URI getConnectionURL() const;
std::string getConnectionURLWithContainer() const;
std::string connection_url;
bool is_connection_string;

View File

@ -28,6 +28,7 @@
#include <IO/PeekableReadBuffer.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/getIOUringReader.h>
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
@ -282,10 +283,7 @@ std::unique_ptr<ReadBuffer> selectReadBuffer(
else if (read_method == LocalFSReadMethod::io_uring && !use_table_fd)
{
#if USE_LIBURING
auto & reader = context->getIOURingReader();
if (!reader.isSupported())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
auto & reader = getIOUringReaderOrThrow(context);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
Priority{},

View File

@ -141,6 +141,8 @@ public:
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("SystemReplicas");
try
{
ReplicatedTableStatus status;

View File

@ -333,7 +333,7 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer());
if (configuration.format == "auto")
return StorageAzureBlob::getTableStructureAndFormatFromData(object_storage.get(), configuration, std::nullopt, context).first;
return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context);
@ -365,7 +365,7 @@ StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_funct
StoragePtr storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
context,
StorageID(getDatabaseName(), table_name),
columns,

View File

@ -39,7 +39,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
/// On worker node this filename won't contains globs
storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
context,
StorageID(getDatabaseName(), table_name),
columns,
@ -54,7 +54,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
storage = std::make_shared<StorageAzureBlobCluster>(
cluster_name,
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,215 @@
#!/usr/bin/env python3
import gzip
import json
import logging
import os
import io
import random
import threading
import time
from azure.storage.blob import BlobServiceClient
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
def generate_config(port):
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/storage_conf.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
TEMPLATE = """
<clickhouse>
<storage_configuration>
<disks>
<disk_azure>
<metadata_type>local</metadata_type>
<type>object_storage</type>
<object_storage_type>azure_blob_storage</object_storage_type>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<use_native_copy>true</use_native_copy>
</disk_azure>
<disk_azure_other_bucket>
<metadata_type>local</metadata_type>
<type>object_storage</type>
<object_storage_type>azure_blob_storage</object_storage_type>
<use_native_copy>true</use_native_copy>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>othercontainer</container_name>
<skip_access_check>false</skip_access_check>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</disk_azure_other_bucket>
<disk_azure_cache>
<type>cache</type>
<disk>disk_azure</disk>
<path>/tmp/azure_cache/</path>
<max_size>1000000000</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
</disk_azure_cache>
</disks>
<policies>
<policy_azure>
<volumes>
<main>
<disk>disk_azure</disk>
</main>
</volumes>
</policy_azure>
<policy_azure_other_bucket>
<volumes>
<main>
<disk>disk_azure_other_bucket</disk>
</main>
</volumes>
</policy_azure_other_bucket>
<policy_azure_cache>
<volumes>
<main>
<disk>disk_azure_cache</disk>
</main>
</volumes>
</policy_azure_cache>
</policies>
</storage_configuration>
<backups>
<allowed_disk>disk_azure</allowed_disk>
<allowed_disk>disk_azure_cache</allowed_disk>
<allowed_disk>disk_azure_other_bucket</allowed_disk>
</backups>
</clickhouse>
"""
f.write(TEMPLATE.format(port=port))
return path
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
port = cluster.azurite_port
path = generate_config(port)
cluster.add_instance(
"node1",
main_configs=[path],
with_azurite=True,
)
cluster.add_instance(
"node2",
main_configs=[path],
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def azure_query(
node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
):
for i in range(try_num):
try:
if expect_error:
return node.query_and_get_error(query, settings=settings)
else:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
"Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
"Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
if query_on_retry is not None:
node.query(query_on_retry)
continue
def test_backup_restore_on_merge_tree_same_container(cluster):
node1 = cluster.instances["node1"]
azure_query(
node1,
f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_cache'",
)
azure_query(node1, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_backup')"
print("BACKUP DEST", backup_destination)
azure_query(
node1,
f"BACKUP TABLE test_simple_merge_tree TO {backup_destination}",
)
assert node1.contains_in_log("using native copy")
azure_query(
node1,
f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored FROM {backup_destination};",
)
assert (
azure_query(node1, f"SELECT * from test_simple_merge_tree_restored") == "1\ta\n"
)
azure_query(node1, f"DROP TABLE test_simple_merge_tree")
azure_query(node1, f"DROP TABLE test_simple_merge_tree_restored")
def test_backup_restore_on_merge_tree_different_container(cluster):
node2 = cluster.instances["node2"]
azure_query(
node2,
f"CREATE TABLE test_simple_merge_tree_different_bucket(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_other_bucket'",
)
azure_query(
node2, f"INSERT INTO test_simple_merge_tree_different_bucket VALUES (1, 'a')"
)
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_different_bucket_backup_different_bucket')"
print("BACKUP DEST", backup_destination)
azure_query(
node2,
f"BACKUP TABLE test_simple_merge_tree_different_bucket TO {backup_destination}",
)
assert not node2.contains_in_log("using native copy")
azure_query(
node2,
f"RESTORE TABLE test_simple_merge_tree_different_bucket AS test_simple_merge_tree_different_bucket_restored FROM {backup_destination};",
)
assert (
azure_query(
node2, f"SELECT * from test_simple_merge_tree_different_bucket_restored"
)
== "1\ta\n"
)
assert not node2.contains_in_log("using native copy")
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket")
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket_restored")

View File

@ -0,0 +1,14 @@
<test>
<create_query>
CREATE TABLE test_set (k UInt32, x UInt32, INDEX idx (x) TYPE set(10) GRANULARITY 1) ENGINE = MergeTree ORDER BY k SETTINGS index_granularity = 111;
</create_query>
<fill_query>SYSTEM STOP MERGES</fill_query>
<fill_query>INSERT INTO test_set SELECT number, number DIV 100 + rand() % 7 FROM numbers(3000000) SETTINGS max_insert_threads = 4;</fill_query>
<query>
SELECT count() FROM test_set WHERE x = 1234 SETTINGS max_threads = 8;
</query>
<drop_query>SYSTEM START MERGES</drop_query>
<drop_query>DROP TABLE IF EXISTS test_set</drop_query>
</test>

View File

@ -0,0 +1,4 @@
simple_with_analyzer ['default.03148_dictionary']
nested_with_analyzer ['default.03148_dictionary']
simple_without_analyzer ['default.03148_dictionary']
nested_without_analyzer ['default.03148_dictionary']

View File

@ -0,0 +1,84 @@
DROP DICTIONARY IF EXISTS 03148_dictionary;
CREATE DICTIONARY 03148_dictionary (
id UInt64,
name String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(
QUERY 'select 0 as id, ''name0'' as name'
))
LIFETIME(MIN 1 MAX 10)
LAYOUT(HASHED);
SELECT
dictGet('03148_dictionary', 'name', number) as dict_value
FROM numbers(1)
SETTINGS
allow_experimental_analyzer = 1,
log_comment = 'simple_with_analyzer'
FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
AND type = 'QueryFinish'
AND log_comment = 'simple_with_analyzer';
SELECT *
FROM (
SELECT
dictGet('03148_dictionary', 'name', number) as dict_value
FROM numbers(1)
) t
SETTINGS
allow_experimental_analyzer = 1,
log_comment = 'nested_with_analyzer'
FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
AND type = 'QueryFinish'
AND log_comment = 'nested_with_analyzer';
SELECT
dictGet('03148_dictionary', 'name', number) as dict_value
FROM numbers(1)
SETTINGS
allow_experimental_analyzer = 0,
log_comment = 'simple_without_analyzer'
FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
AND type = 'QueryFinish'
AND log_comment = 'simple_without_analyzer';
SELECT *
FROM (
SELECT
dictGet('03148_dictionary', 'name', number) as dict_value
FROM numbers(1)
) t
SETTINGS
allow_experimental_analyzer = 0,
log_comment = 'nested_without_analyzer'
FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
AND type = 'QueryFinish'
AND log_comment = 'nested_without_analyzer';
DROP DICTIONARY IF EXISTS 03148_dictionary;

View File

@ -0,0 +1 @@
SELECT 1 GROUP BY -9223372036854775808; -- { serverError BAD_ARGUMENTS }

View File

@ -0,0 +1 @@
100004

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# default output_format_pretty_max_rows is 10K
$CLICKHOUSE_LOCAL -q "select * from numbers(100e3) format PrettySpace settings max_threads=1" | wc -l