mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge branch 'master' into stress_s3
This commit is contained in:
commit
3ff6489fae
@ -334,7 +334,6 @@ else
|
||||
rm -rf /var/lib/clickhouse/*
|
||||
|
||||
# Make BC check more funny by forcing Ordinary engine for system database
|
||||
# New version will try to convert it to Atomic on startup
|
||||
mkdir /var/lib/clickhouse/metadata
|
||||
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
|
||||
|
||||
@ -344,16 +343,13 @@ else
|
||||
# Start server from previous release
|
||||
configure
|
||||
|
||||
# Avoid "Setting allow_deprecated_database_ordinary is neither a builtin setting..."
|
||||
rm -f /etc/clickhouse-server/users.d/database_ordinary.xml ||:
|
||||
# Avoid "Setting s3_check_objects_after_upload is neither a builtin setting..."
|
||||
rm -f /etc/clickhouse-server/users.d/enable_blobs_check.xml ||:
|
||||
|
||||
# Remove s3 related configs to avoid "there is no disk type `cache`"
|
||||
rm -f /etc/clickhouse-server/config.d/storage_conf.xml ||:
|
||||
rm -f /etc/clickhouse-server/config.d/azure_storage_conf.xml ||:
|
||||
|
||||
# Disable aggressive cleanup of tmp dirs (it worked incorrectly before 22.8)
|
||||
rm -f /etc/clickhouse-server/config.d/merge_tree_old_dirs_cleanup.xml ||:
|
||||
|
||||
start
|
||||
|
||||
clickhouse-client --query="SELECT 'Server version: ', version()"
|
||||
|
@ -5,16 +5,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
|
||||
const String & file_path_,
|
||||
const std::optional<UInt64> & file_size_,
|
||||
const std::optional<UInt128> & checksum_,
|
||||
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_)
|
||||
: BackupEntryFromImmutableFile(file_path_, file_size_, checksum_, temporary_file_)
|
||||
, limit(BackupEntryFromImmutableFile::getSize())
|
||||
{
|
||||
}
|
||||
|
||||
BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
|
||||
const DiskPtr & disk_,
|
||||
const String & file_path_,
|
||||
|
@ -11,13 +11,6 @@ namespace DB
|
||||
class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile
|
||||
{
|
||||
public:
|
||||
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
|
||||
explicit BackupEntryFromAppendOnlyFile(
|
||||
const String & file_path_,
|
||||
const std::optional<UInt64> & file_size_ = {},
|
||||
const std::optional<UInt128> & checksum_ = {},
|
||||
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_ = {});
|
||||
|
||||
BackupEntryFromAppendOnlyFile(
|
||||
const DiskPtr & disk_,
|
||||
const String & file_path_,
|
||||
|
@ -2,20 +2,12 @@
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/IO/createReadBufferFromFileBase.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
|
||||
const String & file_path_,
|
||||
const std::optional<UInt64> & file_size_,
|
||||
const std::optional<UInt128> & checksum_,
|
||||
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_)
|
||||
: file_path(file_path_), file_size(file_size_), checksum(checksum_), temporary_file(temporary_file_)
|
||||
{
|
||||
}
|
||||
|
||||
BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
|
||||
const DiskPtr & disk_,
|
||||
const String & file_path_,
|
||||
@ -32,16 +24,24 @@ UInt64 BackupEntryFromImmutableFile::getSize() const
|
||||
{
|
||||
std::lock_guard lock{get_file_size_mutex};
|
||||
if (!file_size)
|
||||
file_size = disk ? disk->getFileSize(file_path) : Poco::File(file_path).getSize();
|
||||
file_size = disk->getFileSize(file_path);
|
||||
return *file_size;
|
||||
}
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
|
||||
{
|
||||
if (disk)
|
||||
return disk->readFile(file_path);
|
||||
else
|
||||
return createReadBufferFromFileBase(file_path, /* settings= */ {});
|
||||
return disk->readFile(file_path);
|
||||
}
|
||||
|
||||
|
||||
DataSourceDescription BackupEntryFromImmutableFile::getDataSourceDescription() const
|
||||
{
|
||||
return disk->getDataSourceDescription();
|
||||
}
|
||||
|
||||
String BackupEntryFromImmutableFile::getFilePath() const
|
||||
{
|
||||
return file_path;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -16,13 +16,6 @@ using DiskPtr = std::shared_ptr<IDisk>;
|
||||
class BackupEntryFromImmutableFile : public IBackupEntry
|
||||
{
|
||||
public:
|
||||
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
|
||||
explicit BackupEntryFromImmutableFile(
|
||||
const String & file_path_,
|
||||
const std::optional<UInt64> & file_size_ = {},
|
||||
const std::optional<UInt128> & checksum_ = {},
|
||||
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_ = {});
|
||||
|
||||
BackupEntryFromImmutableFile(
|
||||
const DiskPtr & disk_,
|
||||
const String & file_path_,
|
||||
@ -36,8 +29,10 @@ public:
|
||||
std::optional<UInt128> getChecksum() const override { return checksum; }
|
||||
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
|
||||
|
||||
String getFilePath() const { return file_path; }
|
||||
DiskPtr getDisk() const { return disk; }
|
||||
String getFilePath() const override;
|
||||
DataSourceDescription getDataSourceDescription() const override;
|
||||
|
||||
DiskPtr tryGetDiskIfExists() const override { return disk; }
|
||||
|
||||
private:
|
||||
const DiskPtr disk;
|
||||
|
@ -19,6 +19,18 @@ public:
|
||||
std::optional<UInt128> getChecksum() const override { return checksum; }
|
||||
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
|
||||
|
||||
String getFilePath() const override
|
||||
{
|
||||
return "";
|
||||
}
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override
|
||||
{
|
||||
return DataSourceDescription{DataSourceType::RAM, "", false, false};
|
||||
}
|
||||
|
||||
DiskPtr tryGetDiskIfExists() const override { return nullptr; }
|
||||
|
||||
private:
|
||||
const String data;
|
||||
const std::optional<UInt128> checksum;
|
||||
|
@ -36,4 +36,5 @@ BackupEntryFromSmallFile::BackupEntryFromSmallFile(
|
||||
: BackupEntryFromMemory(readFile(disk_, file_path_), checksum_), disk(disk_), file_path(file_path_)
|
||||
{
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -23,9 +23,9 @@ public:
|
||||
const String & file_path_,
|
||||
const std::optional<UInt128> & checksum_ = {});
|
||||
|
||||
String getFilePath() const { return file_path; }
|
||||
DiskPtr getDisk() const { return disk; }
|
||||
String getFilePath() const override { return file_path; }
|
||||
|
||||
DiskPtr tryGetDiskIfExists() const override { return disk; }
|
||||
private:
|
||||
const DiskPtr disk;
|
||||
const String file_path;
|
||||
|
27
src/Backups/BackupIO.cpp
Normal file
27
src/Backups/BackupIO.cpp
Normal file
@ -0,0 +1,27 @@
|
||||
#include <Backups/BackupIO.h>
|
||||
|
||||
#include <IO/copyData.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
void IBackupWriter::copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name)
|
||||
{
|
||||
auto write_buffer = writeFile(file_name);
|
||||
copyData(*source, *write_buffer);
|
||||
write_buffer->finalize();
|
||||
}
|
||||
|
||||
void IBackupWriter::copyFileNative(DiskPtr /* from_disk */, const String & /* file_name_from */, const String & /* file_name_to */)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Native copy not implemented for backup writer");
|
||||
}
|
||||
|
||||
}
|
@ -1,6 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Types.h>
|
||||
#include <Disks/DiskType.h>
|
||||
#include <Disks/IDisk.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -15,6 +17,7 @@ public:
|
||||
virtual bool fileExists(const String & file_name) = 0;
|
||||
virtual UInt64 getFileSize(const String & file_name) = 0;
|
||||
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
|
||||
virtual DataSourceDescription getDataSourceDescription() const = 0;
|
||||
};
|
||||
|
||||
/// Represents operations of storing to disk or uploading for writing a backup.
|
||||
@ -27,6 +30,15 @@ public:
|
||||
virtual bool fileContentsEqual(const String & file_name, const String & expected_file_contents) = 0;
|
||||
virtual std::unique_ptr<WriteBuffer> writeFile(const String & file_name) = 0;
|
||||
virtual void removeFiles(const Strings & file_names) = 0;
|
||||
virtual DataSourceDescription getDataSourceDescription() const = 0;
|
||||
virtual void copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name);
|
||||
|
||||
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -6,6 +6,12 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
|
||||
{
|
||||
}
|
||||
@ -77,4 +83,28 @@ void BackupWriterDisk::removeFiles(const Strings & file_names)
|
||||
disk->removeDirectory(path);
|
||||
}
|
||||
|
||||
DataSourceDescription BackupWriterDisk::getDataSourceDescription() const
|
||||
{
|
||||
return disk->getDataSourceDescription();
|
||||
}
|
||||
|
||||
DataSourceDescription BackupReaderDisk::getDataSourceDescription() const
|
||||
{
|
||||
return disk->getDataSourceDescription();
|
||||
}
|
||||
|
||||
bool BackupWriterDisk::supportNativeCopy(DataSourceDescription data_source_description) const
|
||||
{
|
||||
return data_source_description == disk->getDataSourceDescription();
|
||||
}
|
||||
|
||||
void BackupWriterDisk::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
|
||||
{
|
||||
if (!from_disk)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");
|
||||
auto file_path = path / file_name_to;
|
||||
disk->createDirectories(file_path.parent_path());
|
||||
from_disk->copyFile(file_name_from, *disk, file_path);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ public:
|
||||
bool fileExists(const String & file_name) override;
|
||||
UInt64 getFileSize(const String & file_name) override;
|
||||
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
|
||||
DataSourceDescription getDataSourceDescription() const override;
|
||||
|
||||
private:
|
||||
DiskPtr disk;
|
||||
@ -34,7 +35,11 @@ public:
|
||||
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
|
||||
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
|
||||
void removeFiles(const Strings & file_names) override;
|
||||
DataSourceDescription getDataSourceDescription() const override;
|
||||
|
||||
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
|
||||
|
||||
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
|
||||
private:
|
||||
DiskPtr disk;
|
||||
std::filesystem::path path;
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include <Backups/BackupIO_File.h>
|
||||
#include <Disks/IO/createReadBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@ -78,4 +80,55 @@ void BackupWriterFile::removeFiles(const Strings & file_names)
|
||||
fs::remove(path);
|
||||
}
|
||||
|
||||
DataSourceDescription BackupWriterFile::getDataSourceDescription() const
|
||||
{
|
||||
DataSourceDescription data_source_description;
|
||||
|
||||
data_source_description.type = DataSourceType::Local;
|
||||
|
||||
if (auto block_device_id = tryGetBlockDeviceId(path); block_device_id.has_value())
|
||||
data_source_description.description = *block_device_id;
|
||||
else
|
||||
data_source_description.description = path;
|
||||
data_source_description.is_encrypted = false;
|
||||
data_source_description.is_cached = false;
|
||||
|
||||
return data_source_description;
|
||||
}
|
||||
|
||||
DataSourceDescription BackupReaderFile::getDataSourceDescription() const
|
||||
{
|
||||
DataSourceDescription data_source_description;
|
||||
|
||||
data_source_description.type = DataSourceType::Local;
|
||||
|
||||
if (auto block_device_id = tryGetBlockDeviceId(path); block_device_id.has_value())
|
||||
data_source_description.description = *block_device_id;
|
||||
else
|
||||
data_source_description.description = path;
|
||||
data_source_description.is_encrypted = false;
|
||||
data_source_description.is_cached = false;
|
||||
|
||||
return data_source_description;
|
||||
}
|
||||
|
||||
|
||||
bool BackupWriterFile::supportNativeCopy(DataSourceDescription data_source_description) const
|
||||
{
|
||||
return data_source_description == getDataSourceDescription();
|
||||
}
|
||||
|
||||
void BackupWriterFile::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
|
||||
{
|
||||
auto file_path = path / file_name_to;
|
||||
fs::create_directories(file_path.parent_path());
|
||||
std::string abs_source_path;
|
||||
if (from_disk)
|
||||
abs_source_path = fullPath(from_disk, file_name_from);
|
||||
else
|
||||
abs_source_path = fs::absolute(file_name_from);
|
||||
|
||||
fs::copy(abs_source_path, file_path, fs::copy_options::recursive | fs::copy_options::overwrite_existing);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -9,12 +9,13 @@ namespace DB
|
||||
class BackupReaderFile : public IBackupReader
|
||||
{
|
||||
public:
|
||||
BackupReaderFile(const String & path_);
|
||||
explicit BackupReaderFile(const String & path_);
|
||||
~BackupReaderFile() override;
|
||||
|
||||
bool fileExists(const String & file_name) override;
|
||||
UInt64 getFileSize(const String & file_name) override;
|
||||
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
|
||||
DataSourceDescription getDataSourceDescription() const override;
|
||||
|
||||
private:
|
||||
std::filesystem::path path;
|
||||
@ -23,7 +24,7 @@ private:
|
||||
class BackupWriterFile : public IBackupWriter
|
||||
{
|
||||
public:
|
||||
BackupWriterFile(const String & path_);
|
||||
explicit BackupWriterFile(const String & path_);
|
||||
~BackupWriterFile() override;
|
||||
|
||||
bool fileExists(const String & file_name) override;
|
||||
@ -31,6 +32,10 @@ public:
|
||||
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
|
||||
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
|
||||
void removeFiles(const Strings & file_names) override;
|
||||
DataSourceDescription getDataSourceDescription() const override;
|
||||
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
|
||||
|
||||
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
|
||||
|
||||
private:
|
||||
std::filesystem::path path;
|
||||
|
@ -111,6 +111,22 @@ public:
|
||||
UInt64 getSize() const override { return size; }
|
||||
std::optional<UInt128> getChecksum() const override { return checksum; }
|
||||
|
||||
String getFilePath() const override
|
||||
{
|
||||
return data_file_name;
|
||||
}
|
||||
|
||||
DiskPtr tryGetDiskIfExists() const override
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override
|
||||
{
|
||||
return backup->reader->getDataSourceDescription();
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
const std::shared_ptr<const BackupImpl> backup;
|
||||
const String archive_suffix;
|
||||
@ -587,9 +603,86 @@ BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) c
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
std::optional<SizeAndChecksum> getInfoAboutFileFromBaseBackupIfExists(std::shared_ptr<const IBackup> base_backup, const std::string & file_path)
|
||||
{
|
||||
if (base_backup && base_backup->fileExists(file_path))
|
||||
return std::pair{base_backup->getFileSize(file_path), base_backup->getFileChecksum(file_path)};
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
enum class CheckBackupResult
|
||||
{
|
||||
HasPrefix,
|
||||
HasFull,
|
||||
HasNothing,
|
||||
};
|
||||
|
||||
CheckBackupResult checkBaseBackupForFile(const SizeAndChecksum & base_backup_info, const FileInfo & new_entry_info)
|
||||
{
|
||||
/// We cannot reuse base backup because our file is smaller
|
||||
/// than file stored in previous backup
|
||||
if (new_entry_info.size > base_backup_info.first)
|
||||
return CheckBackupResult::HasNothing;
|
||||
|
||||
if (base_backup_info.first == new_entry_info.size)
|
||||
return CheckBackupResult::HasFull;
|
||||
|
||||
return CheckBackupResult::HasPrefix;
|
||||
|
||||
}
|
||||
|
||||
struct ChecksumsForNewEntry
|
||||
{
|
||||
UInt128 full_checksum;
|
||||
UInt128 prefix_checksum;
|
||||
};
|
||||
|
||||
/// Calculate checksum for backup entry if it's empty.
|
||||
/// Also able to calculate additional checksum of some prefix.
|
||||
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(BackupEntryPtr entry, size_t prefix_size)
|
||||
{
|
||||
if (prefix_size > 0)
|
||||
{
|
||||
auto read_buffer = entry->getReadBuffer();
|
||||
HashingReadBuffer hashing_read_buffer(*read_buffer);
|
||||
hashing_read_buffer.ignore(prefix_size);
|
||||
auto prefix_checksum = hashing_read_buffer.getHash();
|
||||
if (entry->getChecksum() == std::nullopt)
|
||||
{
|
||||
hashing_read_buffer.ignoreAll();
|
||||
auto full_checksum = hashing_read_buffer.getHash();
|
||||
return ChecksumsForNewEntry{full_checksum, prefix_checksum};
|
||||
}
|
||||
else
|
||||
{
|
||||
return ChecksumsForNewEntry{*(entry->getChecksum()), prefix_checksum};
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (entry->getChecksum() == std::nullopt)
|
||||
{
|
||||
auto read_buffer = entry->getReadBuffer();
|
||||
HashingReadBuffer hashing_read_buffer(*read_buffer);
|
||||
hashing_read_buffer.ignoreAll();
|
||||
return ChecksumsForNewEntry{hashing_read_buffer.getHash(), 0};
|
||||
}
|
||||
else
|
||||
{
|
||||
return ChecksumsForNewEntry{*(entry->getChecksum()), 0};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
|
||||
{
|
||||
|
||||
std::lock_guard lock{mutex};
|
||||
if (open_mode != OpenMode::WRITE)
|
||||
throw Exception("Backup is not opened for writing", ErrorCodes::LOGICAL_ERROR);
|
||||
@ -597,164 +690,179 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
|
||||
if (writing_finalized)
|
||||
throw Exception("Backup is already finalized", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
std::string from_file_name = "memory buffer";
|
||||
if (auto fname = entry->getFilePath(); !fname.empty())
|
||||
from_file_name = "file " + fname;
|
||||
LOG_TRACE(log, "Writing backup for file {} from file {}", file_name, from_file_name);
|
||||
|
||||
auto adjusted_path = removeLeadingSlash(file_name);
|
||||
if (coordination->getFileInfo(adjusted_path))
|
||||
throw Exception(
|
||||
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", backup_name, quoteString(file_name));
|
||||
|
||||
FileInfo info;
|
||||
info.file_name = adjusted_path;
|
||||
size_t size = entry->getSize();
|
||||
info.size = size;
|
||||
FileInfo info
|
||||
{
|
||||
.file_name = adjusted_path,
|
||||
.size = entry->getSize(),
|
||||
.base_size = 0,
|
||||
.base_checksum = 0,
|
||||
};
|
||||
|
||||
/// Check if the entry's data is empty.
|
||||
if (!info.size)
|
||||
/// Empty file, nothing to backup
|
||||
if (info.size == 0)
|
||||
{
|
||||
coordination->addFileInfo(info);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Maybe we have a copy of this file in the backup already.
|
||||
std::optional<UInt128> checksum = entry->getChecksum();
|
||||
if (checksum && coordination->getFileInfo(std::pair{size, *checksum}))
|
||||
{
|
||||
info.checksum = *checksum;
|
||||
coordination->addFileInfo(info);
|
||||
return;
|
||||
}
|
||||
std::optional<SizeAndChecksum> base_backup_file_info = getInfoAboutFileFromBaseBackupIfExists(base_backup, adjusted_path);
|
||||
|
||||
/// Check if a entry with such name exists in the base backup.
|
||||
bool base_exists = (base_backup && base_backup->fileExists(adjusted_path));
|
||||
UInt64 base_size = 0;
|
||||
UInt128 base_checksum{0, 0};
|
||||
if (base_exists)
|
||||
/// We have info about this file in base backup
|
||||
/// If file has no checksum -- calculate and fill it.
|
||||
if (base_backup_file_info.has_value())
|
||||
{
|
||||
base_size = base_backup->getFileSize(adjusted_path);
|
||||
base_checksum = base_backup->getFileChecksum(adjusted_path);
|
||||
}
|
||||
LOG_TRACE(log, "File {} found in base backup, checking for equality", adjusted_path);
|
||||
CheckBackupResult check_base = checkBaseBackupForFile(*base_backup_file_info, info);
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> read_buffer; /// We'll set that later.
|
||||
std::optional<HashingReadBuffer> hashing_read_buffer;
|
||||
UInt64 hashing_pos = 0; /// Current position in `hashing_read_buffer`.
|
||||
|
||||
/// Determine whether it's possible to receive this entry's data from the base backup completely or partly.
|
||||
bool use_base = false;
|
||||
if (base_exists && base_size && (size >= base_size))
|
||||
{
|
||||
if (checksum && (size == base_size))
|
||||
/// File with the same name but smaller size exist in previous backup
|
||||
if (check_base == CheckBackupResult::HasPrefix)
|
||||
{
|
||||
/// The size is the same, we need to compare checksums to find out
|
||||
/// if the entry's data has not changed since the base backup.
|
||||
use_base = (*checksum == base_checksum);
|
||||
auto checksums = calculateNewEntryChecksumsIfNeeded(entry, base_backup_file_info->first);
|
||||
info.checksum = checksums.full_checksum;
|
||||
|
||||
/// We have prefix of this file in backup with the same checksum.
|
||||
/// In ClickHouse this can happen for StorageLog for example.
|
||||
if (checksums.prefix_checksum == base_backup_file_info->second)
|
||||
{
|
||||
LOG_TRACE(log, "File prefix of {} in base backup, will write rest part of file to current backup", adjusted_path);
|
||||
info.base_size = base_backup_file_info->first;
|
||||
info.base_checksum = base_backup_file_info->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Prefix checksum of file {} doesn't match with checksum in base backup", adjusted_path);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// The size has increased, we need to calculate a partial checksum to find out
|
||||
/// if the entry's data has only appended since the base backup.
|
||||
read_buffer = entry->getReadBuffer();
|
||||
hashing_read_buffer.emplace(*read_buffer);
|
||||
hashing_read_buffer->ignore(base_size);
|
||||
hashing_pos = base_size;
|
||||
UInt128 partial_checksum = hashing_read_buffer->getHash();
|
||||
if (size == base_size)
|
||||
checksum = partial_checksum;
|
||||
if (partial_checksum == base_checksum)
|
||||
use_base = true;
|
||||
/// We have full file or have nothing, first of all let's get checksum
|
||||
/// of current file
|
||||
auto checksums = calculateNewEntryChecksumsIfNeeded(entry, 0);
|
||||
info.checksum = checksums.full_checksum;
|
||||
|
||||
if (info.checksum == base_backup_file_info->second)
|
||||
{
|
||||
LOG_TRACE(log, "Found whole file {} in base backup", adjusted_path);
|
||||
assert(check_base == CheckBackupResult::HasFull);
|
||||
assert(info.size == base_backup_file_info->first);
|
||||
|
||||
info.base_size = base_backup_file_info->first;
|
||||
info.base_checksum = base_backup_file_info->second;
|
||||
/// Actually we can add this info to coordination and exist,
|
||||
/// but we intentionally don't do it, otherwise control flow
|
||||
/// of this function will be very complex.
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Whole file {} in base backup doesn't match by checksum", adjusted_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Finish calculating the checksum.
|
||||
if (!checksum)
|
||||
else /// We don't have info about this file_name (sic!) in base backup,
|
||||
/// however file could be renamed, so we will check one more time using size and checksum
|
||||
{
|
||||
if (!read_buffer)
|
||||
read_buffer = entry->getReadBuffer();
|
||||
if (!hashing_read_buffer)
|
||||
hashing_read_buffer.emplace(*read_buffer);
|
||||
hashing_read_buffer->ignore(size - hashing_pos);
|
||||
checksum = hashing_read_buffer->getHash();
|
||||
|
||||
LOG_TRACE(log, "Nothing found for file {} in base backup", adjusted_path);
|
||||
auto checksums = calculateNewEntryChecksumsIfNeeded(entry, 0);
|
||||
info.checksum = checksums.full_checksum;
|
||||
}
|
||||
hashing_read_buffer.reset();
|
||||
info.checksum = *checksum;
|
||||
|
||||
/// Maybe we have a copy of this file in the backup already.
|
||||
if (coordination->getFileInfo(std::pair{size, *checksum}))
|
||||
if (coordination->getFileInfo(std::pair{info.size, info.checksum}))
|
||||
{
|
||||
LOG_TRACE(log, "File {} already exist in current backup, adding reference", adjusted_path);
|
||||
coordination->addFileInfo(info);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Check if a entry with the same checksum exists in the base backup.
|
||||
if (base_backup && !use_base && base_backup->fileExists(std::pair{size, *checksum}))
|
||||
/// On the previous lines we checked that backup for file with adjusted_name exist in previous backup.
|
||||
/// However file can be renamed, but has the same size and checksums, let's check for this case.
|
||||
if (base_backup && base_backup->fileExists(std::pair{info.size, info.checksum}))
|
||||
{
|
||||
/// The entry's data has not changed since the base backup,
|
||||
/// but the entry itself has been moved or renamed.
|
||||
base_size = size;
|
||||
base_checksum = *checksum;
|
||||
use_base = true;
|
||||
}
|
||||
|
||||
if (use_base)
|
||||
{
|
||||
info.base_size = base_size;
|
||||
info.base_checksum = base_checksum;
|
||||
}
|
||||
LOG_TRACE(log, "File {} doesn't exist in current backup, but we have file with same size and checksum", adjusted_path);
|
||||
info.base_size = info.size;
|
||||
info.base_checksum = info.checksum;
|
||||
|
||||
if (use_base && (size == base_size))
|
||||
{
|
||||
/// The entry's data has not been changed since the base backup.
|
||||
coordination->addFileInfo(info);
|
||||
return;
|
||||
}
|
||||
|
||||
bool is_data_file_required;
|
||||
/// All "short paths" failed. We don't have this file in previous or existing backup
|
||||
/// or have only prefix of it in previous backup. Let's go long path.
|
||||
|
||||
info.data_file_name = info.file_name;
|
||||
info.archive_suffix = current_archive_suffix;
|
||||
|
||||
bool is_data_file_required;
|
||||
coordination->addFileInfo(info, is_data_file_required);
|
||||
if (!is_data_file_required)
|
||||
return; /// We copy data only if it's a new combination of size & checksum.
|
||||
|
||||
/// Either the entry wasn't exist in the base backup
|
||||
/// or the entry has data appended to the end of the data from the base backup.
|
||||
/// In both those cases we have to copy data to this backup.
|
||||
|
||||
/// Find out where the start position to copy data is.
|
||||
auto copy_pos = use_base ? base_size : 0;
|
||||
|
||||
/// Move the current read position to the start position to copy data.
|
||||
if (!read_buffer)
|
||||
read_buffer = entry->getReadBuffer();
|
||||
read_buffer->seek(copy_pos, SEEK_SET);
|
||||
|
||||
if (!num_files_written)
|
||||
checkLockFile(true);
|
||||
|
||||
/// Copy the entry's data after `copy_pos`.
|
||||
std::unique_ptr<WriteBuffer> out;
|
||||
if (use_archives)
|
||||
{
|
||||
String archive_suffix = current_archive_suffix;
|
||||
bool next_suffix = false;
|
||||
if (current_archive_suffix.empty() && is_internal_backup)
|
||||
next_suffix = true;
|
||||
/*if (archive_params.max_volume_size && current_archive_writer
|
||||
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
|
||||
next_suffix = true;*/
|
||||
if (next_suffix)
|
||||
current_archive_suffix = coordination->getNextArchiveSuffix();
|
||||
if (info.archive_suffix != current_archive_suffix)
|
||||
{
|
||||
info.archive_suffix = current_archive_suffix;
|
||||
coordination->updateFileInfo(info);
|
||||
}
|
||||
out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
|
||||
LOG_TRACE(log, "File {} doesn't exist in current backup, but we have file with same size and checksum", adjusted_path);
|
||||
return; /// We copy data only if it's a new combination of size & checksum.
|
||||
}
|
||||
auto writer_description = writer->getDataSourceDescription();
|
||||
auto reader_description = entry->getDataSourceDescription();
|
||||
|
||||
/// We need to copy whole file without archive, we can do it faster
|
||||
/// if source and destination are compatible
|
||||
if (!use_archives && info.base_size == 0 && writer->supportNativeCopy(reader_description))
|
||||
{
|
||||
|
||||
LOG_TRACE(log, "Will copy file {} using native copy", adjusted_path);
|
||||
/// Should be much faster than writing data through server
|
||||
writer->copyFileNative(entry->tryGetDiskIfExists(), entry->getFilePath(), info.data_file_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
out = writer->writeFile(info.data_file_name);
|
||||
LOG_TRACE(log, "Will copy file {} through memory buffers", adjusted_path);
|
||||
auto read_buffer = entry->getReadBuffer();
|
||||
|
||||
/// If we have prefix in base we will seek to the start of the suffix which differs
|
||||
if (info.base_size != 0)
|
||||
read_buffer->seek(info.base_size, SEEK_SET);
|
||||
|
||||
if (!num_files_written)
|
||||
checkLockFile(true);
|
||||
|
||||
if (use_archives)
|
||||
{
|
||||
LOG_TRACE(log, "Adding file {} to archive", adjusted_path);
|
||||
String archive_suffix = current_archive_suffix;
|
||||
bool next_suffix = false;
|
||||
if (current_archive_suffix.empty() && is_internal_backup)
|
||||
next_suffix = true;
|
||||
/*if (archive_params.max_volume_size && current_archive_writer
|
||||
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
|
||||
next_suffix = true;*/
|
||||
if (next_suffix)
|
||||
current_archive_suffix = coordination->getNextArchiveSuffix();
|
||||
|
||||
if (info.archive_suffix != current_archive_suffix)
|
||||
{
|
||||
info.archive_suffix = current_archive_suffix;
|
||||
coordination->updateFileInfo(info);
|
||||
}
|
||||
auto out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
|
||||
copyData(*read_buffer, *out);
|
||||
out->finalize();
|
||||
}
|
||||
else
|
||||
{
|
||||
writer->copyFileThroughBuffer(std::move(read_buffer), info.data_file_name);
|
||||
}
|
||||
}
|
||||
|
||||
copyData(*read_buffer, *out);
|
||||
out->finalize();
|
||||
++num_files_written;
|
||||
}
|
||||
|
||||
|
@ -90,7 +90,8 @@ namespace
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
|
||||
if (coordination)
|
||||
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
|
||||
}
|
||||
}
|
||||
|
||||
@ -164,9 +165,9 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
|
||||
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
|
||||
}
|
||||
|
||||
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
|
||||
try
|
||||
{
|
||||
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
|
||||
addInfo(backup_id, backup_info.toString(), backup_settings.internal, BackupStatus::CREATING_BACKUP);
|
||||
|
||||
/// Prepare context to use.
|
||||
@ -213,6 +214,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log, fmt::format("Failed to start {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
|
||||
/// Something bad happened, the backup has not built.
|
||||
setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED);
|
||||
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
|
||||
|
@ -20,6 +20,20 @@ public:
|
||||
UInt64 getSize() const override { return getInternalBackupEntry()->getSize(); }
|
||||
std::optional<UInt128> getChecksum() const override { return getInternalBackupEntry()->getChecksum(); }
|
||||
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return getInternalBackupEntry()->getReadBuffer(); }
|
||||
String getFilePath() const override
|
||||
{
|
||||
return getInternalBackupEntry()->getFilePath();
|
||||
}
|
||||
|
||||
DiskPtr tryGetDiskIfExists() const override
|
||||
{
|
||||
return getInternalBackupEntry()->tryGetDiskIfExists();
|
||||
}
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override
|
||||
{
|
||||
return getInternalBackupEntry()->getDataSourceDescription();
|
||||
}
|
||||
|
||||
private:
|
||||
BackupEntryPtr getInternalBackupEntry() const
|
||||
|
@ -4,6 +4,8 @@
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
#include <Disks/DiskType.h>
|
||||
#include <Disks/IDisk.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -24,6 +26,12 @@ public:
|
||||
|
||||
/// Returns a read buffer for reading the data.
|
||||
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer() const = 0;
|
||||
|
||||
virtual String getFilePath() const = 0;
|
||||
|
||||
virtual DiskPtr tryGetDiskIfExists() const = 0;
|
||||
|
||||
virtual DataSourceDescription getDataSourceDescription() const = 0;
|
||||
};
|
||||
|
||||
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
|
||||
|
@ -46,7 +46,7 @@ namespace
|
||||
void checkPath(const String & disk_name, const DiskPtr & disk, fs::path & path)
|
||||
{
|
||||
path = path.lexically_normal();
|
||||
if (!path.is_relative() && (disk->getType() == DiskType::Local))
|
||||
if (!path.is_relative() && (disk->getDataSourceDescription().type == DataSourceType::Local))
|
||||
path = path.lexically_proximate(disk->getPath());
|
||||
|
||||
bool path_ok = path.empty() || (path.is_relative() && (*path.begin() != ".."));
|
||||
|
@ -108,9 +108,10 @@ void FileCache::useCell(
|
||||
|
||||
if (file_segment->isDownloaded()
|
||||
&& fs::file_size(getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent())) == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cannot have zero size downloaded file segments. Current file segment: {}",
|
||||
file_segment->range().toString());
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Cannot have zero size downloaded file segments. {}",
|
||||
file_segment->getInfoForLog());
|
||||
|
||||
result.push_back(cell.file_segment);
|
||||
|
||||
@ -872,7 +873,7 @@ void FileCache::remove(
|
||||
Key key, size_t offset,
|
||||
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
|
||||
{
|
||||
LOG_TEST(log, "Remove. Key: {}, offset: {}", key.toString(), offset);
|
||||
LOG_DEBUG(log, "Remove from cache. Key: {}, offset: {}", key.toString(), offset);
|
||||
|
||||
auto * cell = getCell(key, offset, cache_lock);
|
||||
if (!cell)
|
||||
|
@ -55,6 +55,7 @@ FileSegment::FileSegment(
|
||||
case (State::DOWNLOADED):
|
||||
{
|
||||
reserved_size = downloaded_size = size_;
|
||||
is_downloaded = true;
|
||||
break;
|
||||
}
|
||||
case (State::SKIP_CACHE):
|
||||
@ -574,6 +575,7 @@ String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock
|
||||
{
|
||||
WriteBufferFromOwnString info;
|
||||
info << "File segment: " << range().toString() << ", ";
|
||||
info << "key: " << key().toString() << ", ";
|
||||
info << "state: " << download_state << ", ";
|
||||
info << "downloaded size: " << getDownloadedSize(segment_lock) << ", ";
|
||||
info << "reserved size: " << reserved_size << ", ";
|
||||
@ -699,7 +701,7 @@ void FileSegment::detach(
|
||||
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
|
||||
downloader_id.clear();
|
||||
|
||||
LOG_TEST(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
|
||||
LOG_DEBUG(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
|
||||
}
|
||||
|
||||
void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */)
|
||||
|
@ -24,9 +24,7 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})",
|
||||
entry.key.toString(),
|
||||
entry.offset,
|
||||
entry.size);
|
||||
entry.key.toString(), entry.offset, entry.size);
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -36,6 +34,8 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
|
||||
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
|
||||
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements);
|
||||
|
||||
LOG_DEBUG(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
|
||||
|
||||
return std::make_shared<LRUFileCacheIterator>(this, iter);
|
||||
}
|
||||
|
||||
@ -54,6 +54,8 @@ void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size);
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
|
||||
|
||||
LOG_DEBUG(log, "Removed all entries from LRU queue");
|
||||
|
||||
queue.clear();
|
||||
cache_size = 0;
|
||||
}
|
||||
@ -86,6 +88,8 @@ void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guar
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size);
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements);
|
||||
|
||||
LOG_DEBUG(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
|
||||
|
||||
queue_iter = cache_priority->queue.erase(queue_iter);
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <list>
|
||||
#include <Common/IFileCachePriority.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -32,6 +33,7 @@ public:
|
||||
|
||||
private:
|
||||
LRUQueue queue;
|
||||
Poco::Logger * log = &Poco::Logger::get("LRUFileCachePriority");
|
||||
};
|
||||
|
||||
class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::IIterator
|
||||
|
@ -79,6 +79,22 @@ String getBlockDeviceId([[maybe_unused]] const String & path)
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path)
|
||||
{
|
||||
#if defined(OS_LINUX)
|
||||
struct stat sb;
|
||||
if (lstat(path.c_str(), &sb))
|
||||
return {};
|
||||
WriteBufferFromOwnString ss;
|
||||
ss << major(sb.st_dev) << ":" << minor(sb.st_dev);
|
||||
return ss.str();
|
||||
#else
|
||||
return {};
|
||||
#endif
|
||||
|
||||
}
|
||||
|
||||
#if !defined(OS_LINUX)
|
||||
[[noreturn]]
|
||||
#endif
|
||||
|
@ -25,6 +25,8 @@ std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
|
||||
#endif
|
||||
String getBlockDeviceId([[maybe_unused]] const String & path);
|
||||
|
||||
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path);
|
||||
|
||||
enum class BlockDeviceType
|
||||
{
|
||||
UNKNOWN = 0, // we were unable to determine device type
|
||||
|
@ -90,24 +90,56 @@ struct DataTypeDecimalTrait
|
||||
* Sign of `fractional` is expected to be positive, otherwise result is undefined.
|
||||
* If `scale` is to big (scale > max_precision<DecimalType::NativeType>), result is undefined.
|
||||
*/
|
||||
template <typename DecimalType>
|
||||
inline DecimalType decimalFromComponentsWithMultiplier(
|
||||
const typename DecimalType::NativeType & whole,
|
||||
const typename DecimalType::NativeType & fractional,
|
||||
typename DecimalType::NativeType scale_multiplier)
|
||||
|
||||
template <typename DecimalType, bool throw_on_error>
|
||||
inline bool decimalFromComponentsWithMultiplierImpl(
|
||||
const typename DecimalType::NativeType & whole,
|
||||
const typename DecimalType::NativeType & fractional,
|
||||
typename DecimalType::NativeType scale_multiplier,
|
||||
DecimalType & result)
|
||||
{
|
||||
using T = typename DecimalType::NativeType;
|
||||
const auto fractional_sign = whole < 0 ? -1 : 1;
|
||||
|
||||
T whole_scaled = 0;
|
||||
if (common::mulOverflow(whole, scale_multiplier, whole_scaled))
|
||||
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
|
||||
{
|
||||
if constexpr (throw_on_error)
|
||||
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
|
||||
return false;
|
||||
}
|
||||
|
||||
T value;
|
||||
if (common::addOverflow(whole_scaled, fractional_sign * (fractional % scale_multiplier), value))
|
||||
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
|
||||
{
|
||||
if constexpr (throw_on_error)
|
||||
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
|
||||
return false;
|
||||
}
|
||||
|
||||
return DecimalType(value);
|
||||
result = DecimalType(value);
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename DecimalType>
|
||||
inline DecimalType decimalFromComponentsWithMultiplier(
|
||||
const typename DecimalType::NativeType & whole,
|
||||
const typename DecimalType::NativeType & fractional,
|
||||
typename DecimalType::NativeType scale_multiplier)
|
||||
{
|
||||
DecimalType result;
|
||||
decimalFromComponentsWithMultiplierImpl<DecimalType, true>(whole, fractional, scale_multiplier, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
template <typename DecimalType>
|
||||
inline bool tryGetDecimalFromComponentsWithMultiplier(
|
||||
const typename DecimalType::NativeType & whole,
|
||||
const typename DecimalType::NativeType & fractional,
|
||||
typename DecimalType::NativeType scale_multiplier,
|
||||
DecimalType & result)
|
||||
{
|
||||
return decimalFromComponentsWithMultiplierImpl<DecimalType, false>(whole, fractional, scale_multiplier, result);
|
||||
}
|
||||
|
||||
template <typename DecimalType>
|
||||
@ -118,6 +150,15 @@ inline DecimalType decimalFromComponentsWithMultiplier(
|
||||
return decimalFromComponentsWithMultiplier<DecimalType>(components.whole, components.fractional, scale_multiplier);
|
||||
}
|
||||
|
||||
template <typename DecimalType>
|
||||
inline bool tryGetDecimalFromComponentsWithMultiplier(
|
||||
const DecimalComponents<DecimalType> & components,
|
||||
typename DecimalType::NativeType scale_multiplier,
|
||||
DecimalType & result)
|
||||
{
|
||||
return tryGetDecimalFromComponentsWithMultiplier<DecimalType>(components.whole, components.fractional, scale_multiplier, result);
|
||||
}
|
||||
|
||||
|
||||
/** Make a decimal value from whole and fractional components with given scale.
|
||||
*
|
||||
@ -134,6 +175,18 @@ inline DecimalType decimalFromComponents(
|
||||
return decimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scaleMultiplier<T>(scale));
|
||||
}
|
||||
|
||||
template <typename DecimalType>
|
||||
inline bool tryGetDecimalFromComponents(
|
||||
const typename DecimalType::NativeType & whole,
|
||||
const typename DecimalType::NativeType & fractional,
|
||||
UInt32 scale,
|
||||
DecimalType & result)
|
||||
{
|
||||
using T = typename DecimalType::NativeType;
|
||||
|
||||
return tryGetDecimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scaleMultiplier<T>(scale), result);
|
||||
}
|
||||
|
||||
/** Make a decimal value from whole and fractional components with given scale.
|
||||
* @see `decimalFromComponentsWithMultiplier` for details.
|
||||
*/
|
||||
@ -145,6 +198,15 @@ inline DecimalType decimalFromComponents(
|
||||
return decimalFromComponents<DecimalType>(components.whole, components.fractional, scale);
|
||||
}
|
||||
|
||||
template <typename DecimalType>
|
||||
inline bool tryGetDecimalFromComponents(
|
||||
const DecimalComponents<DecimalType> & components,
|
||||
UInt32 scale,
|
||||
DecimalType & result)
|
||||
{
|
||||
return tryGetDecimalFromComponents<DecimalType>(components.whole, components.fractional, scale, result);
|
||||
}
|
||||
|
||||
/** Split decimal into whole and fractional parts with given scale_multiplier.
|
||||
* This is an optimization to reduce number of calls to scaleMultiplier on known scale.
|
||||
*/
|
||||
|
@ -90,6 +90,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
|
||||
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
|
||||
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
|
||||
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
|
||||
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
|
||||
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
|
||||
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
|
||||
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
|
||||
|
@ -72,7 +72,7 @@ public:
|
||||
void sync(int fd) const;
|
||||
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
|
||||
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
|
||||
DiskType getType() const override { return delegate->getType(); }
|
||||
DataSourceDescription getDataSourceDescription() const override { return delegate->getDataSourceDescription(); }
|
||||
bool isRemote() const override { return delegate->isRemote(); }
|
||||
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
|
||||
bool supportParallelWrite() const override { return delegate->supportParallelWrite(); }
|
||||
|
@ -234,7 +234,13 @@ public:
|
||||
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
|
||||
|
||||
DiskType getType() const override { return DiskType::Encrypted; }
|
||||
DataSourceDescription getDataSourceDescription() const override
|
||||
{
|
||||
auto delegate_description = delegate->getDataSourceDescription();
|
||||
delegate_description.is_encrypted = true;
|
||||
return delegate_description;
|
||||
}
|
||||
|
||||
bool isRemote() const override { return delegate->isRemote(); }
|
||||
|
||||
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
|
||||
|
@ -230,14 +230,14 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
|
||||
|
||||
if (bytes == 0)
|
||||
{
|
||||
LOG_DEBUG(log, "Reserving 0 bytes on disk {}", backQuote(name));
|
||||
LOG_DEBUG(logger, "Reserving 0 bytes on disk {}", backQuote(name));
|
||||
++reservation_count;
|
||||
return {unreserved_space};
|
||||
}
|
||||
|
||||
if (unreserved_space >= bytes)
|
||||
{
|
||||
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.",
|
||||
LOG_DEBUG(logger, "Reserving {} on disk {}, having unreserved {}.",
|
||||
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
|
||||
++reservation_count;
|
||||
reserved_bytes += bytes;
|
||||
@ -497,6 +497,14 @@ DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_fre
|
||||
, keep_free_space_bytes(keep_free_space_bytes_)
|
||||
, logger(&Poco::Logger::get("DiskLocal"))
|
||||
{
|
||||
data_source_description.type = DataSourceType::Local;
|
||||
|
||||
if (auto block_device_id = tryGetBlockDeviceId(disk_path); block_device_id.has_value())
|
||||
data_source_description.description = *block_device_id;
|
||||
else
|
||||
data_source_description.description = disk_path;
|
||||
data_source_description.is_encrypted = false;
|
||||
data_source_description.is_cached = false;
|
||||
}
|
||||
|
||||
DiskLocal::DiskLocal(
|
||||
@ -507,6 +515,11 @@ DiskLocal::DiskLocal(
|
||||
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
|
||||
}
|
||||
|
||||
DataSourceDescription DiskLocal::getDataSourceDescription() const
|
||||
{
|
||||
return data_source_description;
|
||||
}
|
||||
|
||||
void DiskLocal::startup(ContextPtr)
|
||||
{
|
||||
try
|
||||
@ -615,7 +628,6 @@ DiskObjectStoragePtr DiskLocal::createDiskObjectStorage()
|
||||
"Local",
|
||||
metadata_storage,
|
||||
object_storage,
|
||||
DiskType::Local,
|
||||
false,
|
||||
/* threadpool_size */16
|
||||
);
|
||||
@ -714,6 +726,13 @@ void DiskLocal::chmod(const String & path, mode_t mode)
|
||||
DB::throwFromErrnoWithPath("Cannot chmod file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
|
||||
}
|
||||
|
||||
MetadataStoragePtr DiskLocal::getMetadataStorage()
|
||||
{
|
||||
auto object_storage = std::make_shared<LocalObjectStorage>();
|
||||
return std::make_shared<FakeMetadataStorageFromDisk>(
|
||||
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
|
||||
}
|
||||
|
||||
void registerDiskLocal(DiskFactory & factory)
|
||||
{
|
||||
auto creator = [](const String & name,
|
||||
|
@ -101,7 +101,8 @@ public:
|
||||
|
||||
void truncateFile(const String & path, size_t size) override;
|
||||
|
||||
DiskType getType() const override { return DiskType::Local; }
|
||||
DataSourceDescription getDataSourceDescription() const override;
|
||||
|
||||
bool isRemote() const override { return false; }
|
||||
|
||||
bool supportZeroCopyReplication() const override { return false; }
|
||||
@ -130,6 +131,8 @@ public:
|
||||
bool supportsChmod() const override { return true; }
|
||||
void chmod(const String & path, mode_t mode) override;
|
||||
|
||||
MetadataStoragePtr getMetadataStorage() override;
|
||||
|
||||
private:
|
||||
std::optional<UInt64> tryReserve(UInt64 bytes);
|
||||
|
||||
@ -145,14 +148,13 @@ private:
|
||||
const String disk_checker_path = ".disk_checker_file";
|
||||
std::atomic<UInt64> keep_free_space_bytes;
|
||||
Poco::Logger * logger;
|
||||
DataSourceDescription data_source_description;
|
||||
|
||||
UInt64 reserved_bytes = 0;
|
||||
UInt64 reservation_count = 0;
|
||||
|
||||
static std::mutex reservation_mutex;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
|
||||
|
||||
std::atomic<bool> broken{false};
|
||||
std::atomic<bool> readonly{false};
|
||||
std::unique_ptr<DiskLocalCheckThread> disk_checker;
|
||||
|
@ -7,6 +7,8 @@
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <Disks/ObjectStorages/LocalObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -443,6 +445,13 @@ void DiskMemory::truncateFile(const String & path, size_t size)
|
||||
file_it->second.data.resize(size);
|
||||
}
|
||||
|
||||
MetadataStoragePtr DiskMemory::getMetadataStorage()
|
||||
{
|
||||
auto object_storage = std::make_shared<LocalObjectStorage>();
|
||||
return std::make_shared<FakeMetadataStorageFromDisk>(
|
||||
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
|
||||
}
|
||||
|
||||
|
||||
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;
|
||||
|
||||
|
@ -91,11 +91,14 @@ public:
|
||||
|
||||
void truncateFile(const String & path, size_t size) override;
|
||||
|
||||
DiskType getType() const override { return DiskType::RAM; }
|
||||
DataSourceDescription getDataSourceDescription() const override { return DataSourceDescription{DataSourceType::RAM, "", false, false}; }
|
||||
|
||||
bool isRemote() const override { return false; }
|
||||
|
||||
bool supportZeroCopyReplication() const override { return false; }
|
||||
|
||||
MetadataStoragePtr getMetadataStorage() override;
|
||||
|
||||
private:
|
||||
void createDirectoriesImpl(const String & path);
|
||||
void replaceFileImpl(const String & from_path, const String & to_path);
|
||||
|
11
src/Disks/DiskType.cpp
Normal file
11
src/Disks/DiskType.cpp
Normal file
@ -0,0 +1,11 @@
|
||||
#include "DiskType.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
bool DataSourceDescription::operator==(const DataSourceDescription & other) const
|
||||
{
|
||||
return std::tie(type, description, is_encrypted) == std::tie(other.type, other.description, other.is_encrypted);
|
||||
}
|
||||
|
||||
}
|
@ -5,40 +5,45 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
enum class DiskType
|
||||
enum class DataSourceType
|
||||
{
|
||||
Local,
|
||||
RAM,
|
||||
S3,
|
||||
HDFS,
|
||||
Encrypted,
|
||||
WebServer,
|
||||
AzureBlobStorage,
|
||||
Cache,
|
||||
};
|
||||
|
||||
inline String toString(DiskType disk_type)
|
||||
inline String toString(DataSourceType data_source_type)
|
||||
{
|
||||
switch (disk_type)
|
||||
switch (data_source_type)
|
||||
{
|
||||
case DiskType::Local:
|
||||
case DataSourceType::Local:
|
||||
return "local";
|
||||
case DiskType::RAM:
|
||||
case DataSourceType::RAM:
|
||||
return "memory";
|
||||
case DiskType::S3:
|
||||
case DataSourceType::S3:
|
||||
return "s3";
|
||||
case DiskType::HDFS:
|
||||
case DataSourceType::HDFS:
|
||||
return "hdfs";
|
||||
case DiskType::Encrypted:
|
||||
return "encrypted";
|
||||
case DiskType::WebServer:
|
||||
case DataSourceType::WebServer:
|
||||
return "web";
|
||||
case DiskType::AzureBlobStorage:
|
||||
case DataSourceType::AzureBlobStorage:
|
||||
return "azure_blob_storage";
|
||||
case DiskType::Cache:
|
||||
return "cache";
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
struct DataSourceDescription
|
||||
{
|
||||
DataSourceType type;
|
||||
std::string description;
|
||||
|
||||
bool is_encrypted = false;
|
||||
bool is_cached = false;
|
||||
|
||||
bool operator==(const DataSourceDescription & other) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<
|
||||
|
||||
void IDisk::truncateFile(const String &, size_t)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType());
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getDataSourceDescription().type);
|
||||
}
|
||||
|
||||
SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
|
||||
@ -121,18 +121,4 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
MetadataStoragePtr IDisk::getMetadataStorage()
|
||||
{
|
||||
if (isRemote())
|
||||
{
|
||||
return std::make_shared<MetadataStorageFromDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), "");
|
||||
}
|
||||
else
|
||||
{
|
||||
auto object_storage = std::make_shared<LocalObjectStorage>();
|
||||
return std::make_shared<FakeMetadataStorageFromDisk>(
|
||||
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -227,7 +227,7 @@ public:
|
||||
|
||||
virtual NameSet getCacheLayersNames() const
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getType());
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getDataSourceDescription().type);
|
||||
}
|
||||
|
||||
/// Returns a list of storage objects (contains path, size, ...).
|
||||
@ -235,7 +235,7 @@ public:
|
||||
/// be multiple files in remote fs for single clickhouse file.
|
||||
virtual StoredObjects getStorageObjects(const String &) const
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getType());
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getDataSourceDescription().type);
|
||||
}
|
||||
|
||||
/// For one local path there might be multiple remote paths in case of Log family engines.
|
||||
@ -243,7 +243,7 @@ public:
|
||||
|
||||
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getType());
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getDataSourceDescription().type);
|
||||
}
|
||||
|
||||
/// Batch request to remove multiple files.
|
||||
@ -271,8 +271,8 @@ public:
|
||||
/// Truncate file to specified size.
|
||||
virtual void truncateFile(const String & path, size_t size);
|
||||
|
||||
/// Return disk type - "local", "s3", etc.
|
||||
virtual DiskType getType() const = 0;
|
||||
/// Return data source description
|
||||
virtual DataSourceDescription getDataSourceDescription() const = 0;
|
||||
|
||||
/// Involves network interaction.
|
||||
virtual bool isRemote() const = 0;
|
||||
@ -321,7 +321,7 @@ public:
|
||||
/// Actually it's a part of IDiskRemote implementation but we have so
|
||||
/// complex hierarchy of disks (with decorators), so we cannot even
|
||||
/// dynamic_cast some pointer to IDisk to pointer to IDiskRemote.
|
||||
virtual MetadataStoragePtr getMetadataStorage();
|
||||
virtual MetadataStoragePtr getMetadataStorage() = 0;
|
||||
|
||||
/// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata"
|
||||
/// it will return mapping for each required path: path -> metadata as string.
|
||||
@ -357,7 +357,7 @@ public:
|
||||
throw Exception(
|
||||
ErrorCodes::NOT_IMPLEMENTED,
|
||||
"Method createDiskObjectStorage() is not implemented for disk type: {}",
|
||||
getType());
|
||||
getDataSourceDescription().type);
|
||||
}
|
||||
|
||||
virtual bool supportsStat() const { return false; }
|
||||
|
@ -32,6 +32,10 @@ AzureObjectStorage::AzureObjectStorage(
|
||||
, settings(std::move(settings_))
|
||||
, log(&Poco::Logger::get("AzureObjectStorage"))
|
||||
{
|
||||
data_source_description.type = DataSourceType::AzureBlobStorage;
|
||||
data_source_description.description = client.get()->GetUrl();
|
||||
data_source_description.is_cached = false;
|
||||
data_source_description.is_encrypted = false;
|
||||
}
|
||||
|
||||
std::string AzureObjectStorage::generateBlobNameForPath(const std::string & /* path */)
|
||||
|
@ -57,6 +57,8 @@ public:
|
||||
AzureClientPtr && client_,
|
||||
SettingsPtr && settings_);
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
|
||||
|
||||
std::string getName() const override { return "AzureObjectStorage"; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
@ -128,6 +130,8 @@ private:
|
||||
MultiVersion<AzureObjectStorageSettings> settings;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
DataSourceDescription data_source_description;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -90,7 +90,6 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
|
||||
"DiskAzureBlobStorage",
|
||||
std::move(metadata_storage),
|
||||
std::move(azure_object_storage),
|
||||
DiskType::AzureBlobStorage,
|
||||
send_metadata,
|
||||
copy_thread_pool_size
|
||||
);
|
||||
|
@ -34,6 +34,13 @@ CachedObjectStorage::CachedObjectStorage(
|
||||
cache->initialize();
|
||||
}
|
||||
|
||||
DataSourceDescription CachedObjectStorage::getDataSourceDescription() const
|
||||
{
|
||||
auto wrapped_object_storage_data_source = object_storage->getDataSourceDescription();
|
||||
wrapped_object_storage_data_source.is_cached = true;
|
||||
return wrapped_object_storage_data_source;
|
||||
}
|
||||
|
||||
FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
|
||||
{
|
||||
return cache->hash(path);
|
||||
|
@ -20,6 +20,8 @@ class CachedObjectStorage final : public IObjectStorage
|
||||
public:
|
||||
CachedObjectStorage(ObjectStoragePtr object_storage_, FileCachePtr cache_, const FileCacheSettings & cache_settings_, const String & cache_config_name_);
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override;
|
||||
|
||||
std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
@ -103,14 +103,12 @@ DiskObjectStorage::DiskObjectStorage(
|
||||
const String & log_name,
|
||||
MetadataStoragePtr metadata_storage_,
|
||||
ObjectStoragePtr object_storage_,
|
||||
DiskType disk_type_,
|
||||
bool send_metadata_,
|
||||
uint64_t thread_pool_size_)
|
||||
: IDisk(getAsyncExecutor(log_name, thread_pool_size_))
|
||||
, name(name_)
|
||||
, object_storage_root_path(object_storage_root_path_)
|
||||
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
|
||||
, disk_type(disk_type_)
|
||||
, metadata_storage(std::move(metadata_storage_))
|
||||
, object_storage(std::move(object_storage_))
|
||||
, send_metadata(send_metadata_)
|
||||
@ -216,6 +214,22 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
|
||||
transaction->commit();
|
||||
}
|
||||
|
||||
|
||||
void DiskObjectStorage::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
|
||||
{
|
||||
/// It's the same object storage disk
|
||||
if (this == to_disk.get())
|
||||
{
|
||||
auto transaction = createObjectStorageTransaction();
|
||||
transaction->copyFile(from_path, to_path);
|
||||
transaction->commit();
|
||||
}
|
||||
else
|
||||
{
|
||||
IDisk::copy(from_path, to_disk, to_path);
|
||||
}
|
||||
}
|
||||
|
||||
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path)
|
||||
{
|
||||
moveFile(from_path, to_path, send_metadata);
|
||||
@ -476,7 +490,6 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
|
||||
getName(),
|
||||
metadata_storage,
|
||||
object_storage,
|
||||
disk_type,
|
||||
send_metadata,
|
||||
threadpool_size);
|
||||
}
|
||||
|
@ -34,14 +34,13 @@ public:
|
||||
const String & log_name,
|
||||
MetadataStoragePtr metadata_storage_,
|
||||
ObjectStoragePtr object_storage_,
|
||||
DiskType disk_type_,
|
||||
bool send_metadata_,
|
||||
uint64_t thread_pool_size_);
|
||||
|
||||
/// Create fake transaction
|
||||
DiskTransactionPtr createTransaction() override;
|
||||
|
||||
DiskType getType() const override { return disk_type; }
|
||||
DataSourceDescription getDataSourceDescription() const override { return object_storage->getDataSourceDescription(); }
|
||||
|
||||
bool supportZeroCopyReplication() const override { return true; }
|
||||
|
||||
@ -156,6 +155,8 @@ public:
|
||||
WriteMode mode,
|
||||
const WriteSettings & settings) override;
|
||||
|
||||
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
|
||||
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
|
||||
|
||||
void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
|
||||
@ -208,7 +209,6 @@ private:
|
||||
const String object_storage_root_path;
|
||||
Poco::Logger * log;
|
||||
|
||||
const DiskType disk_type;
|
||||
MetadataStoragePtr metadata_storage;
|
||||
ObjectStoragePtr object_storage;
|
||||
|
||||
|
@ -48,10 +48,20 @@ public:
|
||||
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
|
||||
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
|
||||
, settings(std::move(settings_))
|
||||
{}
|
||||
{
|
||||
data_source_description.type = DataSourceType::HDFS;
|
||||
data_source_description.description = hdfs_root_path_;
|
||||
data_source_description.is_cached = false;
|
||||
data_source_description.is_encrypted = false;
|
||||
}
|
||||
|
||||
std::string getName() const override { return "HDFSObjectStorage"; }
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override
|
||||
{
|
||||
return data_source_description;
|
||||
}
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
|
||||
@ -121,6 +131,8 @@ private:
|
||||
HDFSFSPtr hdfs_fs;
|
||||
|
||||
SettingsPtr settings;
|
||||
|
||||
DataSourceDescription data_source_description;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -49,7 +49,6 @@ void registerDiskHDFS(DiskFactory & factory)
|
||||
"DiskHDFS",
|
||||
std::move(metadata_storage),
|
||||
std::move(hdfs_storage),
|
||||
DiskType::HDFS,
|
||||
/* send_metadata = */ false,
|
||||
copy_thread_pool_size);
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
|
||||
#include <Disks/ObjectStorages/StoredObject.h>
|
||||
#include <Disks/DiskType.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Disks/WriteMode.h>
|
||||
@ -58,6 +59,8 @@ class IObjectStorage
|
||||
public:
|
||||
IObjectStorage() = default;
|
||||
|
||||
virtual DataSourceDescription getDataSourceDescription() const = 0;
|
||||
|
||||
virtual std::string getName() const = 0;
|
||||
|
||||
/// Object exists or not
|
||||
|
@ -28,6 +28,14 @@ namespace ErrorCodes
|
||||
LocalObjectStorage::LocalObjectStorage()
|
||||
: log(&Poco::Logger::get("LocalObjectStorage"))
|
||||
{
|
||||
data_source_description.type = DataSourceType::Local;
|
||||
if (auto block_device_id = tryGetBlockDeviceId("/"); block_device_id.has_value())
|
||||
data_source_description.description = *block_device_id;
|
||||
else
|
||||
data_source_description.description = "/";
|
||||
|
||||
data_source_description.is_cached = false;
|
||||
data_source_description.is_encrypted = false;
|
||||
}
|
||||
|
||||
bool LocalObjectStorage::exists(const StoredObject & object) const
|
||||
|
@ -17,6 +17,8 @@ class LocalObjectStorage : public IObjectStorage
|
||||
public:
|
||||
LocalObjectStorage();
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
|
||||
|
||||
std::string getName() const override { return "LocalObjectStorage"; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
@ -86,6 +88,7 @@ public:
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
DataSourceDescription data_source_description;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <IO/S3Common.h>
|
||||
|
||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
|
||||
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
|
||||
@ -25,6 +27,7 @@
|
||||
#include <aws/s3/model/AbortMultipartUploadRequest.h>
|
||||
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
|
||||
@ -373,6 +376,15 @@ void S3ObjectStorage::copyObjectImpl(
|
||||
}
|
||||
|
||||
throwIfError(outcome);
|
||||
|
||||
auto settings_ptr = s3_settings.get();
|
||||
if (settings_ptr->s3_settings.check_objects_after_upload)
|
||||
{
|
||||
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
|
||||
if (!object_head.IsSuccess())
|
||||
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
@ -454,6 +466,14 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
|
||||
throwIfError(outcome);
|
||||
}
|
||||
|
||||
if (settings_ptr->s3_settings.check_objects_after_upload)
|
||||
{
|
||||
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
|
||||
if (!object_head.IsSuccess())
|
||||
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void S3ObjectStorage::copyObject( // NOLINT
|
||||
@ -515,7 +535,8 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
return std::make_unique<S3ObjectStorage>(
|
||||
getClient(config, config_prefix, context),
|
||||
getSettings(config, config_prefix, context),
|
||||
version_id, s3_capabilities, new_namespace);
|
||||
version_id, s3_capabilities, new_namespace,
|
||||
S3::URI(Poco::URI(config.getString(config_prefix + ".endpoint"))).endpoint);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -48,13 +48,23 @@ public:
|
||||
std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
|
||||
String version_id_,
|
||||
const S3Capabilities & s3_capabilities_,
|
||||
String bucket_)
|
||||
String bucket_,
|
||||
String connection_string)
|
||||
: bucket(bucket_)
|
||||
, client(std::move(client_))
|
||||
, s3_settings(std::move(s3_settings_))
|
||||
, s3_capabilities(s3_capabilities_)
|
||||
, version_id(std::move(version_id_))
|
||||
{
|
||||
data_source_description.type = DataSourceType::S3;
|
||||
data_source_description.description = connection_string;
|
||||
data_source_description.is_cached = false;
|
||||
data_source_description.is_encrypted = false;
|
||||
}
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override
|
||||
{
|
||||
return data_source_description;
|
||||
}
|
||||
|
||||
std::string getName() const override { return "S3ObjectStorage"; }
|
||||
@ -171,6 +181,7 @@ private:
|
||||
const String version_id;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("S3ObjectStorage");
|
||||
DataSourceDescription data_source_description;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractC
|
||||
rw_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", context->getSettingsRef().s3_upload_part_size_multiply_factor);
|
||||
rw_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold);
|
||||
rw_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size);
|
||||
rw_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", context->getSettingsRef().s3_check_objects_after_upload);
|
||||
|
||||
return std::make_unique<S3ObjectStorageSettings>(
|
||||
rw_settings,
|
||||
|
@ -130,7 +130,7 @@ void registerDiskS3(DiskFactory & factory)
|
||||
auto s3_storage = std::make_unique<S3ObjectStorage>(
|
||||
getClient(config, config_prefix, context),
|
||||
getSettings(config, config_prefix, context),
|
||||
uri.version_id, s3_capabilities, uri.bucket);
|
||||
uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
|
||||
|
||||
bool skip_access_check = config.getBool(config_prefix + ".skip_access_check", false);
|
||||
|
||||
@ -159,7 +159,6 @@ void registerDiskS3(DiskFactory & factory)
|
||||
"DiskS3",
|
||||
std::move(metadata_storage),
|
||||
std::move(s3_storage),
|
||||
DiskType::S3,
|
||||
send_metadata,
|
||||
copy_thread_pool_size);
|
||||
|
||||
|
@ -20,6 +20,16 @@ class WebObjectStorage : public IObjectStorage, WithContext
|
||||
public:
|
||||
WebObjectStorage(const String & url_, ContextPtr context_);
|
||||
|
||||
DataSourceDescription getDataSourceDescription() const override
|
||||
{
|
||||
return DataSourceDescription{
|
||||
.type = DataSourceType::WebServer,
|
||||
.description = url,
|
||||
.is_encrypted = false,
|
||||
.is_cached = false,
|
||||
};
|
||||
}
|
||||
|
||||
std::string getName() const override { return "WebObjectStorage"; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
@ -47,7 +47,6 @@ void registerDiskWebServer(DiskFactory & factory)
|
||||
"DiskWebServer",
|
||||
metadata_storage,
|
||||
object_storage,
|
||||
DiskType::WebServer,
|
||||
/* send_metadata */false,
|
||||
/* threadpool_size */16);
|
||||
};
|
||||
|
@ -1017,9 +1017,7 @@ inline bool tryParseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuff
|
||||
{
|
||||
ExtendedDayNum tmp(0);
|
||||
if (!tryReadDateText(tmp, rb))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
x = tmp;
|
||||
return true;
|
||||
}
|
||||
@ -1102,9 +1100,27 @@ struct ConvertThroughParsing
|
||||
if (in.eof())
|
||||
return true;
|
||||
|
||||
/// Special case, that allows to parse string with DateTime as Date.
|
||||
if (std::is_same_v<ToDataType, DataTypeDate> && (in.buffer().size()) == strlen("YYYY-MM-DD hh:mm:ss"))
|
||||
return true;
|
||||
/// Special case, that allows to parse string with DateTime or DateTime64 as Date or Date32.
|
||||
if constexpr (std::is_same_v<ToDataType, DataTypeDate> || std::is_same_v<ToDataType, DataTypeDate32>)
|
||||
{
|
||||
if (!in.eof() && (*in.position() == ' ' || *in.position() == 'T'))
|
||||
{
|
||||
if (in.buffer().size() == strlen("YYYY-MM-DD hh:mm:ss"))
|
||||
return true;
|
||||
|
||||
if (in.buffer().size() >= strlen("YYYY-MM-DD hh:mm:ss.x")
|
||||
&& in.buffer().begin()[19] == '.')
|
||||
{
|
||||
in.position() = in.buffer().begin() + 20;
|
||||
|
||||
while (!in.eof() && isNumericASCII(*in.position()))
|
||||
++in.position();
|
||||
|
||||
if (in.eof())
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -1240,8 +1256,10 @@ struct ConvertThroughParsing
|
||||
vec_to[i] = value;
|
||||
}
|
||||
else if constexpr (IsDataTypeDecimal<ToDataType>)
|
||||
{
|
||||
SerializationDecimal<typename ToDataType::FieldType>::readText(
|
||||
vec_to[i], read_buffer, ToDataType::maxPrecision(), col_to->getScale());
|
||||
}
|
||||
else
|
||||
{
|
||||
parseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone);
|
||||
@ -1294,8 +1312,10 @@ struct ConvertThroughParsing
|
||||
vec_to[i] = value;
|
||||
}
|
||||
else if constexpr (IsDataTypeDecimal<ToDataType>)
|
||||
{
|
||||
parsed = SerializationDecimal<typename ToDataType::FieldType>::tryReadText(
|
||||
vec_to[i], read_buffer, ToDataType::maxPrecision(), col_to->getScale());
|
||||
}
|
||||
else
|
||||
parsed = tryParseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone);
|
||||
}
|
||||
|
@ -85,6 +85,9 @@ class FunctionArrayMapped : public IFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = Name::name;
|
||||
static constexpr bool is_argument_type_map = std::is_same_v<typename Impl::data_type, DataTypeMap>;
|
||||
static constexpr bool is_argument_type_array = std::is_same_v<typename Impl::data_type, DataTypeArray>;
|
||||
static constexpr auto argument_type_name = is_argument_type_map ? "Map" : "Array";
|
||||
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayMapped>(); }
|
||||
|
||||
String getName() const override
|
||||
@ -112,20 +115,25 @@ public:
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
|
||||
"Function {} needs one argument with data", getName());
|
||||
|
||||
size_t nested_types_count = std::is_same_v<typename Impl::data_type, DataTypeMap> ? (arguments.size() - 1) * 2 : (arguments.size() - 1);
|
||||
size_t nested_types_count = is_argument_type_map ? (arguments.size() - 1) * 2 : (arguments.size() - 1);
|
||||
DataTypes nested_types(nested_types_count);
|
||||
for (size_t i = 0; i < arguments.size() - 1; ++i)
|
||||
{
|
||||
const auto * array_type = checkAndGetDataType<typename Impl::data_type>(&*arguments[i + 1]);
|
||||
if (!array_type)
|
||||
throw Exception("Argument " + toString(i + 2) + " of function " + getName() + " must be array. Found "
|
||||
+ arguments[i + 1]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>)
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Argument {} of function {} must be {}. Found {} instead",
|
||||
toString(i + 2),
|
||||
getName(),
|
||||
argument_type_name,
|
||||
arguments[i + 1]->getName());
|
||||
if constexpr (is_argument_type_map)
|
||||
{
|
||||
nested_types[2 * i] = recursiveRemoveLowCardinality(array_type->getKeyType());
|
||||
nested_types[2 * i + 1] = recursiveRemoveLowCardinality(array_type->getValueType());
|
||||
}
|
||||
else if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
|
||||
else if constexpr (is_argument_type_array)
|
||||
{
|
||||
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
|
||||
}
|
||||
@ -149,7 +157,7 @@ public:
|
||||
"Function {} needs at least {} argument, passed {}",
|
||||
getName(), min_args, arguments.size());
|
||||
|
||||
if ((arguments.size() == 1) && std::is_same_v<typename Impl::data_type, DataTypeArray>)
|
||||
if ((arguments.size() == 1) && is_argument_type_array)
|
||||
{
|
||||
const auto * data_type = checkAndGetDataType<typename Impl::data_type>(arguments[0].type.get());
|
||||
|
||||
@ -163,7 +171,7 @@ public:
|
||||
throw Exception("The only argument for function " + getName() + " must be array of UInt8. Found "
|
||||
+ arguments[0].type->getName() + " instead", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
|
||||
if constexpr (is_argument_type_array)
|
||||
return Impl::getReturnType(nested_type, nested_type);
|
||||
else
|
||||
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
|
||||
@ -193,10 +201,7 @@ public:
|
||||
throw Exception("Expression for function " + getName() + " must return UInt8 or Nullable(UInt8), found "
|
||||
+ return_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
static_assert(
|
||||
std::is_same_v<typename Impl::data_type, DataTypeMap> ||
|
||||
std::is_same_v<typename Impl::data_type, DataTypeArray>,
|
||||
"unsupported type");
|
||||
static_assert(is_argument_type_map || is_argument_type_array, "unsupported type");
|
||||
|
||||
if (arguments.size() < 2)
|
||||
{
|
||||
@ -208,10 +213,10 @@ public:
|
||||
if (!first_array_type)
|
||||
throw DB::Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Unsupported type {}", arguments[1].type->getName());
|
||||
|
||||
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
|
||||
if constexpr (is_argument_type_array)
|
||||
return Impl::getReturnType(return_type, first_array_type->getNestedType());
|
||||
|
||||
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>)
|
||||
if constexpr (is_argument_type_map)
|
||||
return Impl::getReturnType(return_type, first_array_type->getKeyValueTypes());
|
||||
|
||||
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
|
||||
@ -229,7 +234,11 @@ public:
|
||||
{
|
||||
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
|
||||
if (!column_const_array)
|
||||
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_COLUMN,
|
||||
"Expected {} column, found {}",
|
||||
argument_type_name,
|
||||
column_array_ptr->getName());
|
||||
column_array_ptr = column_const_array->convertToFullColumn();
|
||||
column_array = assert_cast<const typename Impl::column_type *>(column_array_ptr.get());
|
||||
}
|
||||
@ -279,13 +288,15 @@ public:
|
||||
{
|
||||
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
|
||||
if (!column_const_array)
|
||||
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_COLUMN, "Expected {} column, found {}", argument_type_name, column_array_ptr->getName());
|
||||
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
|
||||
column_array = checkAndGetColumn<typename Impl::column_type>(column_array_ptr.get());
|
||||
}
|
||||
|
||||
if (!array_type)
|
||||
throw Exception("Expected array type, found " + array_type_ptr->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Expected {} type, found {}", argument_type_name, array_type_ptr->getName());
|
||||
|
||||
if (!offsets_column)
|
||||
{
|
||||
@ -296,7 +307,11 @@ public:
|
||||
/// The first condition is optimization: do not compare data if the pointers are equal.
|
||||
if (getOffsetsPtr(*column_array) != offsets_column
|
||||
&& getOffsets(*column_array) != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
|
||||
throw Exception("Arrays passed to " + getName() + " must have equal size", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
|
||||
throw Exception(
|
||||
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH,
|
||||
"{}s passed to {} must have equal size",
|
||||
argument_type_name,
|
||||
getName());
|
||||
}
|
||||
|
||||
if (i == 1)
|
||||
@ -305,7 +320,7 @@ public:
|
||||
column_first_array = column_array;
|
||||
}
|
||||
|
||||
if constexpr (std::is_same_v<DataTypeMap, typename Impl::data_type>)
|
||||
if constexpr (is_argument_type_map)
|
||||
{
|
||||
arrays.emplace_back(ColumnWithTypeAndName(
|
||||
column_array->getNestedData().getColumnPtr(0), recursiveRemoveLowCardinality(array_type->getKeyType()), array_with_type_and_name.name+".key"));
|
||||
|
@ -964,7 +964,13 @@ inline ReturnType readDateTimeTextImpl(DateTime64 & datetime64, UInt32 scale, Re
|
||||
components.whole = components.whole / common::exp10_i32(scale);
|
||||
}
|
||||
|
||||
datetime64 = negative_multiplier * DecimalUtils::decimalFromComponents<DateTime64>(components, scale);
|
||||
if constexpr (std::is_same_v<ReturnType, void>)
|
||||
datetime64 = DecimalUtils::decimalFromComponents<DateTime64>(components, scale);
|
||||
else
|
||||
DecimalUtils::tryGetDecimalFromComponents<DateTime64>(components, scale, datetime64);
|
||||
|
||||
datetime64 *= negative_multiplier;
|
||||
|
||||
|
||||
return ReturnType(true);
|
||||
}
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include "PocoHTTPClient.h"
|
||||
|
||||
#include <utility>
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
@ -14,6 +16,7 @@
|
||||
|
||||
#include <aws/core/http/HttpRequest.h>
|
||||
#include <aws/core/http/HttpResponse.h>
|
||||
#include <aws/core/utils/xml/XmlSerializer.h>
|
||||
#include <aws/core/monitoring/HttpClientMetrics.h>
|
||||
#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
|
||||
#include "Poco/StreamCopier.h"
|
||||
@ -23,6 +26,8 @@
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
static const int SUCCESS_RESPONSE_MIN = 200;
|
||||
static const int SUCCESS_RESPONSE_MAX = 299;
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -121,6 +126,37 @@ std::shared_ptr<Aws::Http::HttpResponse> PocoHTTPClient::MakeRequest(
|
||||
return response;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
/// No comments:
|
||||
/// 1) https://aws.amazon.com/premiumsupport/knowledge-center/s3-resolve-200-internalerror/
|
||||
/// 2) https://github.com/aws/aws-sdk-cpp/issues/658
|
||||
bool checkRequestCanReturn2xxAndErrorInBody(Aws::Http::HttpRequest & request)
|
||||
{
|
||||
auto query_params = request.GetQueryStringParameters();
|
||||
if (request.HasHeader("z-amz-copy-source"))
|
||||
{
|
||||
/// CopyObject https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
|
||||
if (query_params.empty())
|
||||
return true;
|
||||
|
||||
/// UploadPartCopy https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
|
||||
if (query_params.contains("partNumber") && query_params.contains("uploadId"))
|
||||
return true;
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
/// CompleteMultipartUpload https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
|
||||
if (query_params.size() == 1 && query_params.contains("uploadId"))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void PocoHTTPClient::makeRequestInternal(
|
||||
Aws::Http::HttpRequest & request,
|
||||
std::shared_ptr<PocoHTTPResponse> & response,
|
||||
@ -281,6 +317,7 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
ProfileEvents::increment(select_metric(S3MetricType::Microseconds), watch.elapsedMicroseconds());
|
||||
|
||||
int status_code = static_cast<int>(poco_response.getStatus());
|
||||
|
||||
if (enable_s3_requests_logging)
|
||||
LOG_TEST(log, "Response status: {}, {}", status_code, poco_response.getReason());
|
||||
|
||||
@ -316,18 +353,44 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
response->AddHeader(header_name, header_value);
|
||||
}
|
||||
|
||||
if (status_code == 429 || status_code == 503)
|
||||
{ // API throttling
|
||||
ProfileEvents::increment(select_metric(S3MetricType::Throttling));
|
||||
}
|
||||
else if (status_code >= 300)
|
||||
/// Request is successful but for some special requests we can have actual error message in body
|
||||
if (status_code >= SUCCESS_RESPONSE_MIN && status_code <= SUCCESS_RESPONSE_MAX && checkRequestCanReturn2xxAndErrorInBody(request))
|
||||
{
|
||||
ProfileEvents::increment(select_metric(S3MetricType::Errors));
|
||||
if (status_code >= 500 && error_report)
|
||||
error_report(request_configuration);
|
||||
}
|
||||
std::string response_string((std::istreambuf_iterator<char>(response_body_stream)),
|
||||
std::istreambuf_iterator<char>());
|
||||
|
||||
response->SetResponseBody(response_body_stream, session);
|
||||
/// Just trim string so it will not be so long
|
||||
LOG_TRACE(log, "Got dangerous response with successful code {}, checking its body: '{}'", status_code, response_string.substr(0, 300));
|
||||
const static std::string_view needle = "<Error>";
|
||||
if (auto it = std::search(response_string.begin(), response_string.end(), std::default_searcher(needle.begin(), needle.end())); it != response_string.end())
|
||||
{
|
||||
LOG_WARNING(log, "Response for request contain <Error> tag in body, settings internal server error (500 code)");
|
||||
response->SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
|
||||
|
||||
ProfileEvents::increment(select_metric(S3MetricType::Errors));
|
||||
if (error_report)
|
||||
error_report(request_configuration);
|
||||
|
||||
}
|
||||
|
||||
/// Set response from string
|
||||
response->SetResponseBody(response_string);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
if (status_code == 429 || status_code == 503)
|
||||
{ // API throttling
|
||||
ProfileEvents::increment(select_metric(S3MetricType::Throttling));
|
||||
}
|
||||
else if (status_code >= 300)
|
||||
{
|
||||
ProfileEvents::increment(select_metric(S3MetricType::Errors));
|
||||
if (status_code >= 500 && error_report)
|
||||
error_report(request_configuration);
|
||||
}
|
||||
response->SetResponseBody(response_body_stream, session);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -80,6 +80,13 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
void SetResponseBody(std::string & response_body) /// NOLINT
|
||||
{
|
||||
auto stream = Aws::New<std::stringstream>("http result buf", response_body); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
stream->exceptions(std::ios::failbit);
|
||||
body_stream = Aws::Utils::Stream::ResponseStream(std::move(stream));
|
||||
}
|
||||
|
||||
Aws::IOStream & GetResponseBody() const override
|
||||
{
|
||||
return body_stream.GetUnderlyingStream();
|
||||
|
@ -35,9 +35,8 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
|
||||
|
||||
void WriteBufferFromHTTP::finalizeImpl()
|
||||
{
|
||||
// for compressed body, the data is stored in buffered first
|
||||
// here, make sure the content in the buffer has been flushed
|
||||
this->nextImpl();
|
||||
// Make sure the content in the buffer has been flushed
|
||||
this->next();
|
||||
|
||||
receiveResponse(*session, request, response, false);
|
||||
/// TODO: Response body is ignored.
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
|
||||
#include <aws/s3/model/PutObjectRequest.h>
|
||||
#include <aws/s3/model/UploadPartRequest.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
|
||||
#include <utility>
|
||||
|
||||
@ -164,6 +165,20 @@ void WriteBufferFromS3::finalizeImpl()
|
||||
|
||||
if (!multipart_upload_id.empty())
|
||||
completeMultipartUpload();
|
||||
|
||||
if (s3_settings.check_objects_after_upload)
|
||||
{
|
||||
LOG_TRACE(log, "Checking object {} exists after upload", key);
|
||||
|
||||
Aws::S3::Model::HeadObjectRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(key);
|
||||
|
||||
auto response = client_ptr->HeadObject(request);
|
||||
|
||||
if (!response.IsSuccess())
|
||||
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::createMultipartUpload()
|
||||
|
@ -415,7 +415,7 @@ std::string DataPartStorageOnDisk::getDiskName() const
|
||||
|
||||
std::string DataPartStorageOnDisk::getDiskType() const
|
||||
{
|
||||
return toString(volume->getDisk()->getType());
|
||||
return toString(volume->getDisk()->getDataSourceDescription().type);
|
||||
}
|
||||
|
||||
bool DataPartStorageOnDisk::isStoredOnRemoteDisk() const
|
||||
|
@ -458,11 +458,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
Disks disks = data.getDisks();
|
||||
for (const auto & data_disk : disks)
|
||||
if (data_disk->supportZeroCopyReplication())
|
||||
capability.push_back(toString(data_disk->getType()));
|
||||
capability.push_back(toString(data_disk->getDataSourceDescription().type));
|
||||
}
|
||||
else if (disk->supportZeroCopyReplication())
|
||||
{
|
||||
capability.push_back(toString(disk->getType()));
|
||||
capability.push_back(toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
}
|
||||
if (!capability.empty())
|
||||
|
@ -443,6 +443,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
|
||||
pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len;
|
||||
|
||||
final_lsn = current_lsn;
|
||||
committed = true;
|
||||
break;
|
||||
}
|
||||
case 'R': // Relation
|
||||
@ -593,6 +594,12 @@ void MaterializedPostgreSQLConsumer::syncTables()
|
||||
|
||||
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
|
||||
|
||||
updateLsn();
|
||||
}
|
||||
|
||||
|
||||
void MaterializedPostgreSQLConsumer::updateLsn()
|
||||
{
|
||||
try
|
||||
{
|
||||
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
|
||||
@ -614,6 +621,7 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
|
||||
|
||||
final_lsn = result[0][0].as<std::string>();
|
||||
LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn));
|
||||
committed = false;
|
||||
return final_lsn;
|
||||
}
|
||||
|
||||
@ -771,7 +779,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
|
||||
|
||||
try
|
||||
{
|
||||
// LOG_DEBUG(log, "Current message: {}", (*row)[1]);
|
||||
/// LOG_DEBUG(log, "Current message: {}", (*row)[1]);
|
||||
processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
|
||||
}
|
||||
catch (const Exception & e)
|
||||
@ -790,6 +798,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
|
||||
}
|
||||
catch (const pqxx::broken_connection &)
|
||||
{
|
||||
LOG_DEBUG(log, "Connection was broken");
|
||||
connection->tryUpdateConnection();
|
||||
return false;
|
||||
}
|
||||
@ -823,7 +832,13 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
|
||||
}
|
||||
|
||||
if (!tables_to_sync.empty())
|
||||
{
|
||||
syncTables();
|
||||
}
|
||||
else if (committed)
|
||||
{
|
||||
updateLsn();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -94,6 +94,8 @@ private:
|
||||
|
||||
void syncTables();
|
||||
|
||||
void updateLsn();
|
||||
|
||||
String advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx);
|
||||
|
||||
void processReplicationMessage(const char * replication_message, size_t size);
|
||||
@ -136,6 +138,8 @@ private:
|
||||
ContextPtr context;
|
||||
const std::string replication_slot_name, publication_name;
|
||||
|
||||
bool committed = false;
|
||||
|
||||
std::shared_ptr<postgres::Connection> connection;
|
||||
|
||||
std::string current_lsn, final_lsn;
|
||||
|
@ -321,13 +321,13 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
|
||||
nested_storages,
|
||||
(is_materialized_postgresql_database ? postgres_database : postgres_database + '.' + tables_list));
|
||||
|
||||
replication_handler_initialized = true;
|
||||
|
||||
consumer_task->activateAndSchedule();
|
||||
cleanup_task->activateAndSchedule();
|
||||
|
||||
/// Do not rely anymore on saved storage pointers.
|
||||
materialized_storages.clear();
|
||||
|
||||
replication_handler_initialized = true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -7500,7 +7500,7 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
|
||||
String id = part_id;
|
||||
boost::replace_all(id, "/", "_");
|
||||
|
||||
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(),
|
||||
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
|
||||
part_name, zookeeper_path);
|
||||
|
||||
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
|
||||
@ -7690,11 +7690,11 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
|
||||
const String & path)
|
||||
{
|
||||
const auto settings = getSettings();
|
||||
auto disk_type = disk->getType();
|
||||
auto data_source_description = disk->getDataSourceDescription();
|
||||
if (!(disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication))
|
||||
return nullptr;
|
||||
|
||||
String replica = getSharedDataReplica(part, disk_type);
|
||||
String replica = getSharedDataReplica(part, data_source_description.type);
|
||||
|
||||
/// We can't fetch part when none replicas have this part on a same type remote disk
|
||||
if (replica.empty())
|
||||
@ -7703,9 +7703,8 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
|
||||
return executeFetchShared(replica, part.name, disk, path);
|
||||
}
|
||||
|
||||
|
||||
String StorageReplicatedMergeTree::getSharedDataReplica(
|
||||
const IMergeTreeDataPart & part, DiskType disk_type) const
|
||||
const IMergeTreeDataPart & part, DataSourceType data_source_type) const
|
||||
{
|
||||
String best_replica;
|
||||
|
||||
@ -7713,7 +7712,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
|
||||
if (!zookeeper)
|
||||
return "";
|
||||
|
||||
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk_type), getTableSharedID(), part.name,
|
||||
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(data_source_type), getTableSharedID(), part.name,
|
||||
zookeeper_path);
|
||||
|
||||
std::set<String> replicas;
|
||||
@ -7824,7 +7823,7 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
|
||||
if (!disk || !disk->supportZeroCopyReplication())
|
||||
return std::nullopt;
|
||||
|
||||
return getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(), part_name, zookeeper_path)[0];
|
||||
return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
|
||||
}
|
||||
|
||||
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
|
||||
@ -8229,7 +8228,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
|
||||
String id = disk->getUniqueId(checksums);
|
||||
bool can_remove = false;
|
||||
std::tie(can_remove, files_not_to_remove) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
|
||||
detached_replica_name, toString(disk->getType()), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"),
|
||||
detached_replica_name, toString(disk->getDataSourceDescription().type), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"),
|
||||
detached_zookeeper_path);
|
||||
|
||||
keep_shared = !can_remove;
|
||||
|
@ -283,7 +283,7 @@ public:
|
||||
DataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
|
||||
|
||||
/// Get best replica having this partition on a same type remote disk
|
||||
String getSharedDataReplica(const IMergeTreeDataPart & part, DiskType disk_type) const;
|
||||
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
|
||||
|
||||
inline String getReplicaName() const { return replica_name; }
|
||||
|
||||
|
@ -34,6 +34,13 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
|
||||
return with_default ? config.getUInt64(config_elem + "." + key + "." + elem, default_value) : config.getUInt64(config_elem + "." + key + "." + elem);
|
||||
};
|
||||
|
||||
|
||||
auto get_bool_for_key = [&](const String & key, const String & elem, bool with_default = true, bool default_value = false)
|
||||
{
|
||||
return with_default ? config.getBool(config_elem + "." + key + "." + elem, default_value) : config.getBool(config_elem + "." + key + "." + elem);
|
||||
};
|
||||
|
||||
|
||||
for (const String & key : config_keys)
|
||||
{
|
||||
if (config.has(config_elem + "." + key + ".endpoint"))
|
||||
@ -82,6 +89,7 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
|
||||
rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
|
||||
rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
|
||||
rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
|
||||
rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
|
||||
|
||||
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(rw_settings)});
|
||||
}
|
||||
@ -112,6 +120,7 @@ S3Settings::ReadWriteSettings::ReadWriteSettings(const Settings & settings)
|
||||
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
|
||||
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
|
||||
max_connections = settings.s3_max_connections;
|
||||
check_objects_after_upload = settings.s3_check_objects_after_upload;
|
||||
}
|
||||
|
||||
void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & settings)
|
||||
@ -128,6 +137,7 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
|
||||
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
|
||||
if (!max_connections)
|
||||
max_connections = settings.s3_max_connections;
|
||||
check_objects_after_upload = settings.s3_check_objects_after_upload;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -60,6 +60,7 @@ struct S3Settings
|
||||
size_t upload_part_size_multiply_parts_count_threshold = 0;
|
||||
size_t max_single_part_upload_size = 0;
|
||||
size_t max_connections = 0;
|
||||
bool check_objects_after_upload = false;
|
||||
|
||||
ReadWriteSettings() = default;
|
||||
explicit ReadWriteSettings(const Settings & settings);
|
||||
@ -71,7 +72,8 @@ struct S3Settings
|
||||
&& upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
|
||||
&& upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
|
||||
&& max_single_part_upload_size == other.max_single_part_upload_size
|
||||
&& max_connections == other.max_connections;
|
||||
&& max_connections == other.max_connections
|
||||
&& check_objects_after_upload == other.check_objects_after_upload;
|
||||
}
|
||||
|
||||
void updateFromSettingsIfEmpty(const Settings & settings);
|
||||
|
@ -23,6 +23,7 @@ StorageSystemDisks::StorageSystemDisks(const StorageID & table_id_)
|
||||
{"total_space", std::make_shared<DataTypeUInt64>()},
|
||||
{"keep_free_space", std::make_shared<DataTypeUInt64>()},
|
||||
{"type", std::make_shared<DataTypeString>()},
|
||||
{"is_encrypted", std::make_shared<DataTypeUInt8>()},
|
||||
{"cache_path", std::make_shared<DataTypeString>()},
|
||||
}));
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
@ -45,6 +46,7 @@ Pipe StorageSystemDisks::read(
|
||||
MutableColumnPtr col_total = ColumnUInt64::create();
|
||||
MutableColumnPtr col_keep = ColumnUInt64::create();
|
||||
MutableColumnPtr col_type = ColumnString::create();
|
||||
MutableColumnPtr col_is_encrypted = ColumnUInt8::create();
|
||||
MutableColumnPtr col_cache_path = ColumnString::create();
|
||||
|
||||
for (const auto & [disk_name, disk_ptr] : context->getDisksMap())
|
||||
@ -54,7 +56,9 @@ Pipe StorageSystemDisks::read(
|
||||
col_free->insert(disk_ptr->getAvailableSpace());
|
||||
col_total->insert(disk_ptr->getTotalSpace());
|
||||
col_keep->insert(disk_ptr->getKeepingFreeSpace());
|
||||
col_type->insert(toString(disk_ptr->getType()));
|
||||
auto data_source_description = disk_ptr->getDataSourceDescription();
|
||||
col_type->insert(toString(data_source_description.type));
|
||||
col_is_encrypted->insert(data_source_description.is_encrypted);
|
||||
|
||||
String cache_path;
|
||||
if (disk_ptr->supportsCache())
|
||||
@ -70,6 +74,7 @@ Pipe StorageSystemDisks::read(
|
||||
res_columns.emplace_back(std::move(col_total));
|
||||
res_columns.emplace_back(std::move(col_keep));
|
||||
res_columns.emplace_back(std::move(col_type));
|
||||
res_columns.emplace_back(std::move(col_is_encrypted));
|
||||
res_columns.emplace_back(std::move(col_cache_path));
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
|
@ -59,6 +59,7 @@ ln -sf $SRC_PATH/users.d/session_log_test.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
|
||||
|
||||
# FIXME DataPartsExchange may hang for http_send_timeout seconds
|
||||
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),
|
||||
|
7
tests/config/users.d/enable_blobs_check.xml
Normal file
7
tests/config/users.d/enable_blobs_check.xml
Normal file
@ -0,0 +1,7 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
|
||||
</default>
|
||||
</profiles>
|
||||
</clickhouse>
|
@ -89,6 +89,8 @@ def test_restore_table(engine):
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
|
||||
|
||||
assert instance.contains_in_log("using native copy")
|
||||
|
||||
instance.query("DROP TABLE test.table")
|
||||
assert instance.query("EXISTS test.table") == "0\n"
|
||||
|
||||
@ -129,6 +131,8 @@ def test_restore_table_under_another_name():
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
|
||||
|
||||
assert instance.contains_in_log("using native copy")
|
||||
|
||||
assert instance.query("EXISTS test.table2") == "0\n"
|
||||
|
||||
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name}")
|
||||
@ -142,6 +146,8 @@ def test_backup_table_under_another_name():
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
instance.query(f"BACKUP TABLE test.table AS test.table2 TO {backup_name}")
|
||||
|
||||
assert instance.contains_in_log("using native copy")
|
||||
|
||||
assert instance.query("EXISTS test.table2") == "0\n"
|
||||
|
||||
instance.query(f"RESTORE TABLE test.table2 FROM {backup_name}")
|
||||
@ -170,6 +176,8 @@ def test_incremental_backup():
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
|
||||
|
||||
assert instance.contains_in_log("using native copy")
|
||||
|
||||
instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')")
|
||||
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n"
|
||||
@ -244,6 +252,8 @@ def test_file_engine():
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
|
||||
|
||||
assert instance.contains_in_log("using native copy")
|
||||
|
||||
instance.query("DROP TABLE test.table")
|
||||
assert instance.query("EXISTS test.table") == "0\n"
|
||||
|
||||
@ -257,6 +267,9 @@ def test_database():
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
|
||||
instance.query(f"BACKUP DATABASE test TO {backup_name}")
|
||||
|
||||
assert instance.contains_in_log("using native copy")
|
||||
|
||||
instance.query("DROP DATABASE test")
|
||||
instance.query(f"RESTORE DATABASE test FROM {backup_name}")
|
||||
|
||||
@ -269,6 +282,7 @@ def test_zip_archive():
|
||||
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
|
||||
|
||||
assert os.path.isfile(get_path_to_backup(backup_name))
|
||||
|
||||
instance.query("DROP TABLE test.table")
|
||||
|
@ -0,0 +1 @@
|
||||
#!/usr/bin/env python3
|
@ -0,0 +1,23 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
|
||||
<enable_s3_requests_logging>1</enable_s3_requests_logging>
|
||||
</default>
|
||||
</profiles>
|
||||
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<networks incl="networks" replace="replace">
|
||||
<ip>::/0</ip>
|
||||
</networks>
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
</default>
|
||||
</users>
|
||||
|
||||
<quotas><default></default></quotas>
|
||||
</clickhouse>
|
@ -0,0 +1,31 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<level>test</level>
|
||||
</logger>
|
||||
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<s3>
|
||||
<type>s3</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
</s3>
|
||||
</disks>
|
||||
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
<merge_tree>
|
||||
<storage_policy>s3</storage_policy>
|
||||
</merge_tree>
|
||||
</clickhouse>
|
52
tests/integration/test_checking_s3_blobs_paranoid/test.py
Normal file
52
tests/integration/test_checking_s3_blobs_paranoid/test.py
Normal file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
cluster.add_instance(
|
||||
"node",
|
||||
main_configs=[
|
||||
"configs/storage_conf.xml",
|
||||
],
|
||||
user_configs=[
|
||||
"configs/setting.xml",
|
||||
],
|
||||
with_minio=True,
|
||||
)
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_paranoid_check_in_logs(cluster):
|
||||
node = cluster.instances["node"]
|
||||
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE s3_failover_test (
|
||||
id Int64,
|
||||
data String
|
||||
) ENGINE=MergeTree()
|
||||
ORDER BY id
|
||||
"""
|
||||
)
|
||||
|
||||
node.query("INSERT INTO s3_failover_test VALUES (1, 'Hello')")
|
||||
|
||||
assert node.contains_in_log("exists after upload")
|
||||
|
||||
assert node.query("SELECT * FROM s3_failover_test ORDER BY id") == "1\tHello\n"
|
@ -6,7 +6,7 @@ disk_types = {
|
||||
"disk_s3": "s3",
|
||||
"disk_memory": "memory",
|
||||
"disk_hdfs": "hdfs",
|
||||
"disk_encrypted": "encrypted",
|
||||
"disk_encrypted": "s3",
|
||||
}
|
||||
|
||||
|
||||
@ -34,14 +34,30 @@ def test_different_types(cluster):
|
||||
if disk == "": # skip empty line (after split at last position)
|
||||
continue
|
||||
fields = disk.split("\t")
|
||||
assert len(fields) >= 6
|
||||
assert len(fields) >= 7
|
||||
assert disk_types.get(fields[0], "UNKNOWN") == fields[5]
|
||||
if "encrypted" in fields[0]:
|
||||
assert fields[6] == "1"
|
||||
else:
|
||||
assert fields[6] == "0"
|
||||
|
||||
|
||||
def test_select_by_type(cluster):
|
||||
node = cluster.instances["node"]
|
||||
for name, disk_type in list(disk_types.items()):
|
||||
assert (
|
||||
node.query("SELECT name FROM system.disks WHERE type='" + disk_type + "'")
|
||||
== name + "\n"
|
||||
)
|
||||
if disk_type != "s3":
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT name FROM system.disks WHERE type='" + disk_type + "'"
|
||||
)
|
||||
== name + "\n"
|
||||
)
|
||||
else:
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT name FROM system.disks WHERE type='"
|
||||
+ disk_type
|
||||
+ "' ORDER BY name"
|
||||
)
|
||||
== "disk_encrypted\ndisk_s3\n"
|
||||
)
|
||||
|
@ -0,0 +1 @@
|
||||
#!/usr/bin/env python3
|
@ -0,0 +1,35 @@
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<level>test</level>
|
||||
</logger>
|
||||
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<s3>
|
||||
<type>s3</type>
|
||||
<!-- Use custom S3 endpoint -->
|
||||
<endpoint>http://resolver:8080/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
|
||||
<skip_access_check>true</skip_access_check>
|
||||
<!-- Avoid extra retries to speed up tests -->
|
||||
<retry_attempts>0</retry_attempts>
|
||||
<skip_access_check>true</skip_access_check>
|
||||
</s3>
|
||||
</disks>
|
||||
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
<merge_tree>
|
||||
<storage_policy>s3</storage_policy>
|
||||
</merge_tree>
|
||||
</clickhouse>
|
@ -0,0 +1,22 @@
|
||||
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<s3_max_single_part_upload_size>1</s3_max_single_part_upload_size>
|
||||
<enable_s3_requests_logging>1</enable_s3_requests_logging>
|
||||
</default>
|
||||
</profiles>
|
||||
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<networks incl="networks" replace="replace">
|
||||
<ip>::/0</ip>
|
||||
</networks>
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
</default>
|
||||
</users>
|
||||
|
||||
<quotas><default></default></quotas>
|
||||
</clickhouse>
|
@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env python3
|
||||
from bottle import request, route, run, response
|
||||
|
||||
# Handle for MultipleObjectsDelete.
|
||||
@route("/<_bucket>", ["POST"])
|
||||
def delete(_bucket):
|
||||
response.set_header(
|
||||
"Location", "http://minio1:9001/" + _bucket + "?" + request.query_string
|
||||
)
|
||||
response.status = 307
|
||||
return "Redirected"
|
||||
|
||||
|
||||
@route("/<_bucket>/<_path:path>", ["GET", "POST", "PUT", "DELETE"])
|
||||
def server(_bucket, _path):
|
||||
# CompleteMultipartUpload request
|
||||
# We always returning 200 + error in body to simulate: https://aws.amazon.com/premiumsupport/knowledge-center/s3-resolve-200-internalerror/
|
||||
if request.query_string.startswith("uploadId="):
|
||||
response.status = 200
|
||||
response.content_type = "text/xml"
|
||||
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message><RequestId>txfbd566d03042474888193-00608d7538</RequestId></Error>'
|
||||
|
||||
response.set_header(
|
||||
"Location",
|
||||
"http://minio1:9001/" + _bucket + "/" + _path + "?" + request.query_string,
|
||||
)
|
||||
response.status = 307
|
||||
return "Redirected"
|
||||
|
||||
|
||||
@route("/")
|
||||
def ping():
|
||||
return "OK"
|
||||
|
||||
|
||||
run(host="0.0.0.0", port=8080)
|
88
tests/integration/test_s3_aws_sdk_is_total_garbage/test.py
Normal file
88
tests/integration/test_s3_aws_sdk_is_total_garbage/test.py
Normal file
@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
|
||||
import pytest
|
||||
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
|
||||
# Runs custom python-based S3 endpoint.
|
||||
def run_endpoint(cluster):
|
||||
logging.info("Starting custom S3 endpoint")
|
||||
container_id = cluster.get_container_id("resolver")
|
||||
current_dir = os.path.dirname(__file__)
|
||||
cluster.copy_file_to_container(
|
||||
container_id,
|
||||
os.path.join(current_dir, "s3_endpoint", "endpoint.py"),
|
||||
"endpoint.py",
|
||||
)
|
||||
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
|
||||
|
||||
# Wait for S3 endpoint start
|
||||
num_attempts = 100
|
||||
for attempt in range(num_attempts):
|
||||
ping_response = cluster.exec_in_container(
|
||||
cluster.get_container_id("resolver"),
|
||||
["curl", "-s", "http://resolver:8080/"],
|
||||
nothrow=True,
|
||||
)
|
||||
if ping_response != "OK":
|
||||
if attempt == num_attempts - 1:
|
||||
assert ping_response == "OK", 'Expected "OK", but got "{}"'.format(
|
||||
ping_response
|
||||
)
|
||||
else:
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
|
||||
logging.info("S3 endpoint started")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
cluster.add_instance(
|
||||
"node",
|
||||
main_configs=[
|
||||
"configs/storage_conf.xml",
|
||||
],
|
||||
user_configs=[
|
||||
"configs/upload_min_size.xml",
|
||||
],
|
||||
with_minio=True,
|
||||
)
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
|
||||
run_endpoint(cluster)
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_dataloss(cluster):
|
||||
node = cluster.instances["node"]
|
||||
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE s3_failover_test (
|
||||
id Int64,
|
||||
data String
|
||||
) ENGINE=MergeTree()
|
||||
ORDER BY id
|
||||
"""
|
||||
)
|
||||
|
||||
# Must throw an exception because we use proxy which always fail
|
||||
# CompleteMultipartUpload requests
|
||||
with pytest.raises(Exception):
|
||||
node.query("INSERT INTO s3_failover_test VALUES (1, 'Hello')")
|
@ -123,7 +123,7 @@ timeout $TIMEOUT bash -c drop_part_thread &
|
||||
wait
|
||||
|
||||
check_replication_consistency "dst_" "count(), sum(p), sum(k), sum(v)"
|
||||
try_sync_replicas "src_"
|
||||
try_sync_replicas "src_" 300
|
||||
|
||||
for ((i=0; i<16; i++)) do
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE dst_$i" 2>&1| grep -Fv "is already started to be removing" &
|
||||
|
@ -186,6 +186,7 @@ CREATE TABLE system.disks
|
||||
`total_space` UInt64,
|
||||
`keep_free_space` UInt64,
|
||||
`type` String,
|
||||
`is_encrypted` UInt8,
|
||||
`cache_path` String
|
||||
)
|
||||
ENGINE = SystemDisks
|
||||
|
@ -0,0 +1,12 @@
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
||||
2022-08-22
|
@ -0,0 +1,33 @@
|
||||
SELECT toDate('2022-08-22 01:02:03');
|
||||
SELECT toDate32('2022-08-22 01:02:03');
|
||||
|
||||
SELECT toDate('2022-08-22 01:02:03.1');
|
||||
SELECT toDate32('2022-08-22 01:02:03.1');
|
||||
|
||||
SELECT toDate('2022-08-22 01:02:03.123456');
|
||||
SELECT toDate32('2022-08-22 01:02:03.123456');
|
||||
|
||||
SELECT toDate('2022-08-22T01:02:03');
|
||||
SELECT toDate32('2022-08-22T01:02:03');
|
||||
|
||||
SELECT toDate('2022-08-22T01:02:03.1');
|
||||
SELECT toDate32('2022-08-22T01:02:03.1');
|
||||
|
||||
SELECT toDate('2022-08-22T01:02:03.123456');
|
||||
SELECT toDate32('2022-08-22T01:02:03.123456');
|
||||
|
||||
|
||||
SELECT toDate('2022-08-22+01:02:03'); -- { serverError 6 }
|
||||
SELECT toDate32('2022-08-22+01:02:03'); -- { serverError 6 }
|
||||
|
||||
SELECT toDate('2022-08-22 01:02:0'); -- { serverError 6 }
|
||||
SELECT toDate32('2022-08-22 01:02:0'); -- { serverError 6 }
|
||||
|
||||
SELECT toDate('2022-08-22 01:02:03.'); -- { serverError 6 }
|
||||
SELECT toDate32('2022-08-22 01:02:03.'); -- { serverError 6 }
|
||||
|
||||
SELECT toDate('2022-08-22 01:02:03.111a'); -- { serverError 6 }
|
||||
SELECT toDate32('2022-08-22 01:02:03.2b'); -- { serverError 6 }
|
||||
|
||||
SELECT toDate('2022-08-22 01:02:03.a'); -- { serverError 6 }
|
||||
SELECT toDate32('2022-08-22 01:02:03.b'); -- { serverError 6 }
|
@ -0,0 +1,5 @@
|
||||
a
|
||||
b
|
||||
c
|
||||
d
|
||||
e
|
15
tests/queries/0_stateless/02405_pmj_issue_40335.sql
Normal file
15
tests/queries/0_stateless/02405_pmj_issue_40335.sql
Normal file
@ -0,0 +1,15 @@
|
||||
DROP TABLE IF EXISTS t1;
|
||||
DROP TABLE IF EXISTS t2;
|
||||
|
||||
CREATE TABLE t1 (x UInt64) ENGINE = TinyLog;
|
||||
INSERT INTO t1 VALUES (1), (2), (3);
|
||||
|
||||
CREATE TABLE t2 (x UInt64, value String) ENGINE = TinyLog;
|
||||
INSERT INTO t2 VALUES (1, 'a'), (2, 'b'), (2, 'c');
|
||||
INSERT INTO t2 VALUES (3, 'd'), (3, 'e'), (4, 'f');
|
||||
|
||||
SET max_block_size=3;
|
||||
SET max_joined_block_size_rows = 2;
|
||||
SET join_algorithm='partial_merge';
|
||||
|
||||
SELECT value FROM t1 LEFT JOIN t2 ON t1.x = t2.x;
|
@ -0,0 +1,2 @@
|
||||
1970-01-01 00:00:00.000000000
|
||||
c1 Nullable(String)
|
@ -0,0 +1,2 @@
|
||||
select toDateTime64OrDefault('Aaaa e a.a.aaaaaaaaa', 9, 'UTC');
|
||||
desc format(CSV, '"Aaaa e a.a.aaaaaaaaa"');
|
28
tests/queries/0_stateless/add-test
Executable file
28
tests/queries/0_stateless/add-test
Executable file
@ -0,0 +1,28 @@
|
||||
#!/bin/bash
|
||||
|
||||
if [ -z "$1" ]; then
|
||||
echo "Helper script to create empty test and reference files and assign a new number."
|
||||
echo "Usage: $0 <base_test_name>"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
TESTS_PATH=$(dirname ${BASH_SOURCE[0]})
|
||||
set -ue
|
||||
|
||||
# shellcheck disable=SC2010
|
||||
LAST_TEST_NO=$(ls -1 ${TESTS_PATH} | grep -P -o '^\d+' | sort -nr | head -1)
|
||||
|
||||
# remove leading zeros, increment and add padding zeros to 5 digits
|
||||
NEW_TEST_NO=$(printf "%05d\n" $((10#$LAST_TEST_NO + 1)))
|
||||
|
||||
# if extension is not provided, use `.sql`
|
||||
FILENAME="${1}"
|
||||
FILEEXT="sql"
|
||||
if [[ $1 == *.* ]] ; then
|
||||
FILENAME="${1%.*}"
|
||||
FILEEXT="${1##*.}"
|
||||
fi
|
||||
|
||||
set -x
|
||||
touch ${TESTS_PATH}/${NEW_TEST_NO}_${FILENAME}.${FILEEXT}
|
||||
touch ${TESTS_PATH}/${NEW_TEST_NO}_${FILENAME}.reference
|
@ -5,6 +5,7 @@
|
||||
function try_sync_replicas()
|
||||
{
|
||||
table_name_prefix=$1
|
||||
time_left=$2
|
||||
|
||||
readarray -t empty_partitions_arr < <(${CLICKHOUSE_CLIENT} -q \
|
||||
"SELECT DISTINCT substr(new_part_name, 1, position(new_part_name, '_') - 1) AS partition_id
|
||||
@ -29,7 +30,7 @@ function try_sync_replicas()
|
||||
for t in "${tables_arr[@]}"
|
||||
do
|
||||
# The size of log may be big, so increase timeout.
|
||||
$CLICKHOUSE_CLIENT --receive_timeout 300 -q "SYSTEM SYNC REPLICA $t" || ($CLICKHOUSE_CLIENT -q \
|
||||
$CLICKHOUSE_CLIENT --receive_timeout $time_left -q "SYSTEM SYNC REPLICA $t" || ($CLICKHOUSE_CLIENT -q \
|
||||
"select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name" && exit 1) &
|
||||
pids[${i}]=$!
|
||||
i=$((i + 1))
|
||||
@ -48,13 +49,14 @@ function check_replication_consistency()
|
||||
# Wait for all queries to finish (query may still be running if thread is killed by timeout)
|
||||
num_tries=0
|
||||
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do
|
||||
sleep 0.5;
|
||||
sleep 1;
|
||||
num_tries=$((num_tries+1))
|
||||
if [ $num_tries -eq 200 ]; then
|
||||
if [ $num_tries -eq 250 ]; then
|
||||
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%' FORMAT Vertical"
|
||||
break
|
||||
fi
|
||||
done
|
||||
time_left=$((300 - num_tries))
|
||||
|
||||
# Do not check anything if all replicas are readonly,
|
||||
# because is this case all replicas are probably lost (it may happen and it's not a bug)
|
||||
@ -78,7 +80,7 @@ function check_replication_consistency()
|
||||
# SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet
|
||||
wait_for_all_mutations "$table_name_prefix%"
|
||||
|
||||
try_sync_replicas "$table_name_prefix" || exit 1
|
||||
try_sync_replicas "$table_name_prefix" "$time_left" || exit 1
|
||||
|
||||
res=$($CLICKHOUSE_CLIENT -q \
|
||||
"SELECT
|
||||
|
Loading…
Reference in New Issue
Block a user