Merge branch 'master' into fix-race

This commit is contained in:
Kseniia Sumarokova 2022-08-25 13:03:29 +02:00 committed by GitHub
commit 2089ce14dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
97 changed files with 1185 additions and 332 deletions

View File

@ -303,7 +303,6 @@ else
rm -rf /var/lib/clickhouse/* rm -rf /var/lib/clickhouse/*
# Make BC check more funny by forcing Ordinary engine for system database # Make BC check more funny by forcing Ordinary engine for system database
# New version will try to convert it to Atomic on startup
mkdir /var/lib/clickhouse/metadata mkdir /var/lib/clickhouse/metadata
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
@ -313,16 +312,13 @@ else
# Start server from previous release # Start server from previous release
configure configure
# Avoid "Setting allow_deprecated_database_ordinary is neither a builtin setting..." # Avoid "Setting s3_check_objects_after_upload is neither a builtin setting..."
rm -f /etc/clickhouse-server/users.d/database_ordinary.xml ||: rm -f /etc/clickhouse-server/users.d/enable_blobs_check.xml ||:
# Remove s3 related configs to avoid "there is no disk type `cache`" # Remove s3 related configs to avoid "there is no disk type `cache`"
rm -f /etc/clickhouse-server/config.d/storage_conf.xml ||: rm -f /etc/clickhouse-server/config.d/storage_conf.xml ||:
rm -f /etc/clickhouse-server/config.d/azure_storage_conf.xml ||: rm -f /etc/clickhouse-server/config.d/azure_storage_conf.xml ||:
# Disable aggressive cleanup of tmp dirs (it worked incorrectly before 22.8)
rm -f /etc/clickhouse-server/config.d/merge_tree_old_dirs_cleanup.xml ||:
start start
clickhouse-client --query="SELECT 'Server version: ', version()" clickhouse-client --query="SELECT 'Server version: ', version()"

View File

@ -5,16 +5,6 @@
namespace DB namespace DB
{ {
BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
const String & file_path_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_)
: BackupEntryFromImmutableFile(file_path_, file_size_, checksum_, temporary_file_)
, limit(BackupEntryFromImmutableFile::getSize())
{
}
BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile( BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_, const DiskPtr & disk_,
const String & file_path_, const String & file_path_,

View File

@ -11,13 +11,6 @@ namespace DB
class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile
{ {
public: public:
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
explicit BackupEntryFromAppendOnlyFile(
const String & file_path_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_ = {});
BackupEntryFromAppendOnlyFile( BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_, const DiskPtr & disk_,
const String & file_path_, const String & file_path_,

View File

@ -2,20 +2,12 @@
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h> #include <Disks/IO/createReadBufferFromFileBase.h>
#include <Poco/File.h> #include <Poco/File.h>
#include <Common/filesystemHelpers.h>
namespace DB namespace DB
{ {
BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
const String & file_path_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_)
: file_path(file_path_), file_size(file_size_), checksum(checksum_), temporary_file(temporary_file_)
{
}
BackupEntryFromImmutableFile::BackupEntryFromImmutableFile( BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
const DiskPtr & disk_, const DiskPtr & disk_,
const String & file_path_, const String & file_path_,
@ -32,16 +24,24 @@ UInt64 BackupEntryFromImmutableFile::getSize() const
{ {
std::lock_guard lock{get_file_size_mutex}; std::lock_guard lock{get_file_size_mutex};
if (!file_size) if (!file_size)
file_size = disk ? disk->getFileSize(file_path) : Poco::File(file_path).getSize(); file_size = disk->getFileSize(file_path);
return *file_size; return *file_size;
} }
std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
{ {
if (disk) return disk->readFile(file_path);
return disk->readFile(file_path); }
else
return createReadBufferFromFileBase(file_path, /* settings= */ {});
DataSourceDescription BackupEntryFromImmutableFile::getDataSourceDescription() const
{
return disk->getDataSourceDescription();
}
String BackupEntryFromImmutableFile::getFilePath() const
{
return file_path;
} }
} }

View File

@ -16,13 +16,6 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupEntryFromImmutableFile : public IBackupEntry class BackupEntryFromImmutableFile : public IBackupEntry
{ {
public: public:
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
explicit BackupEntryFromImmutableFile(
const String & file_path_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_ = {});
BackupEntryFromImmutableFile( BackupEntryFromImmutableFile(
const DiskPtr & disk_, const DiskPtr & disk_,
const String & file_path_, const String & file_path_,
@ -36,8 +29,10 @@ public:
std::optional<UInt128> getChecksum() const override { return checksum; } std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override; std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
String getFilePath() const { return file_path; } String getFilePath() const override;
DiskPtr getDisk() const { return disk; } DataSourceDescription getDataSourceDescription() const override;
/// Returns the `disk` member of this entry.
DiskPtr tryGetDiskIfExists() const override { return disk; }
private: private:
const DiskPtr disk; const DiskPtr disk;

View File

@ -19,6 +19,18 @@ public:
std::optional<UInt128> getChecksum() const override { return checksum; } std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override; std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
/// This entry has no file path to report, so an empty string is returned.
String getFilePath() const override
{
return "";
}
/// Describes the data source of this entry as RAM.
/// NOTE(review): the remaining initializers ("", false, false) presumably map to
/// description / is_encrypted / is_cached - confirm against the DataSourceDescription definition.
DataSourceDescription getDataSourceDescription() const override
{
return DataSourceDescription{DataSourceType::RAM, "", false, false};
}
/// No disk is associated with this entry, so nullptr is returned.
DiskPtr tryGetDiskIfExists() const override { return nullptr; }
private: private:
const String data; const String data;
const std::optional<UInt128> checksum; const std::optional<UInt128> checksum;

View File

@ -36,4 +36,5 @@ BackupEntryFromSmallFile::BackupEntryFromSmallFile(
: BackupEntryFromMemory(readFile(disk_, file_path_), checksum_), disk(disk_), file_path(file_path_) : BackupEntryFromMemory(readFile(disk_, file_path_), checksum_), disk(disk_), file_path(file_path_)
{ {
} }
} }

View File

@ -23,9 +23,9 @@ public:
const String & file_path_, const String & file_path_,
const std::optional<UInt128> & checksum_ = {}); const std::optional<UInt128> & checksum_ = {});
String getFilePath() const { return file_path; } String getFilePath() const override { return file_path; }
DiskPtr getDisk() const { return disk; }
/// Returns the `disk` member of this entry.
DiskPtr tryGetDiskIfExists() const override { return disk; }
private: private:
const DiskPtr disk; const DiskPtr disk;
const String file_path; const String file_path;

27
src/Backups/BackupIO.cpp Normal file
View File

@ -0,0 +1,27 @@
#include <Backups/BackupIO.h>
#include <IO/copyData.h>
#include <IO/WriteBuffer.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
void IBackupWriter::copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name)
{
auto write_buffer = writeFile(file_name);
copyData(*source, *write_buffer);
write_buffer->finalize();
}
/// Default implementation: native copy is not available, so this always throws NOT_IMPLEMENTED.
/// All three parameters are unused here.
void IBackupWriter::copyFileNative(DiskPtr /* from_disk */, const String & /* file_name_from */, const String & /* file_name_to */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Native copy not implemented for backup writer");
}
}

View File

@ -1,6 +1,8 @@
#pragma once #pragma once
#include <Core/Types.h> #include <Core/Types.h>
#include <Disks/DiskType.h>
#include <Disks/IDisk.h>
namespace DB namespace DB
{ {
@ -15,6 +17,7 @@ public:
virtual bool fileExists(const String & file_name) = 0; virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0; virtual UInt64 getFileSize(const String & file_name) = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0; virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
}; };
/// Represents operations of storing to disk or uploading for writing a backup. /// Represents operations of storing to disk or uploading for writing a backup.
@ -27,6 +30,15 @@ public:
virtual bool fileContentsEqual(const String & file_name, const String & expected_file_contents) = 0; virtual bool fileContentsEqual(const String & file_name, const String & expected_file_contents) = 0;
virtual std::unique_ptr<WriteBuffer> writeFile(const String & file_name) = 0; virtual std::unique_ptr<WriteBuffer> writeFile(const String & file_name) = 0;
virtual void removeFiles(const Strings & file_names) = 0; virtual void removeFiles(const Strings & file_names) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual void copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name);
/// Tells whether copyFileNative() may be used for a source with the given description.
/// The default answer is always false; the argument is ignored.
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const
{
return false;
}
virtual void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to);
}; };
} }

View File

@ -6,6 +6,12 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_) BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
{ {
} }
@ -77,4 +83,28 @@ void BackupWriterDisk::removeFiles(const Strings & file_names)
disk->removeDirectory(path); disk->removeDirectory(path);
} }
/// Returns the data source description reported by the disk this writer stores the backup on.
DataSourceDescription BackupWriterDisk::getDataSourceDescription() const
{
return disk->getDataSourceDescription();
}
/// Returns the data source description reported by the disk this reader reads the backup from.
DataSourceDescription BackupReaderDisk::getDataSourceDescription() const
{
return disk->getDataSourceDescription();
}
/// Native copy is reported as supported only when the source description compares equal
/// to the description of this writer's own disk.
bool BackupWriterDisk::supportNativeCopy(DataSourceDescription data_source_description) const
{
return data_source_description == disk->getDataSourceDescription();
}
/// Copies `file_name_from` located on `from_disk` to `path / file_name_to` on this writer's disk,
/// delegating the actual copy to `from_disk->copyFile()`.
/// Throws LOGICAL_ERROR if `from_disk` is null.
void BackupWriterDisk::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
{
    /// The copy is performed by the source disk itself, so it must be present.
    if (from_disk == nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");

    const auto destination = path / file_name_to;

    /// Make sure the parent directory of the destination exists before copying.
    disk->createDirectories(destination.parent_path());
    from_disk->copyFile(file_name_from, *disk, destination);
}
} }

View File

@ -17,6 +17,7 @@ public:
bool fileExists(const String & file_name) override; bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override; UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override; std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
DataSourceDescription getDataSourceDescription() const override;
private: private:
DiskPtr disk; DiskPtr disk;
@ -34,7 +35,11 @@ public:
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override; bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override; std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override; void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private: private:
DiskPtr disk; DiskPtr disk;
std::filesystem::path path; std::filesystem::path path;

View File

@ -1,6 +1,8 @@
#include <Backups/BackupIO_File.h> #include <Backups/BackupIO_File.h>
#include <Disks/IO/createReadBufferFromFileBase.h> #include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Common/filesystemHelpers.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -78,4 +80,55 @@ void BackupWriterFile::removeFiles(const Strings & file_names)
fs::remove(path); fs::remove(path);
} }
/// Builds the description of the local destination of this writer:
/// type Local, not encrypted, not cached. The `description` field is the block device id
/// of `path` when tryGetBlockDeviceId() yields one, otherwise the path itself.
DataSourceDescription BackupWriterFile::getDataSourceDescription() const
{
    DataSourceDescription result;
    result.type = DataSourceType::Local;
    result.is_encrypted = false;
    result.is_cached = false;

    const auto device_id = tryGetBlockDeviceId(path);
    if (device_id.has_value())
        result.description = *device_id;
    else
        result.description = path;

    return result;
}
/// Builds the description of the local source of this reader:
/// type Local, not encrypted, not cached. The `description` field is the block device id
/// of `path` when tryGetBlockDeviceId() yields one, otherwise the path itself.
DataSourceDescription BackupReaderFile::getDataSourceDescription() const
{
    DataSourceDescription result;
    result.type = DataSourceType::Local;
    result.is_encrypted = false;
    result.is_cached = false;

    const auto device_id = tryGetBlockDeviceId(path);
    if (device_id.has_value())
        result.description = *device_id;
    else
        result.description = path;

    return result;
}
/// Native copy is reported as supported only when the source description compares equal
/// to this writer's own description (see getDataSourceDescription()).
bool BackupWriterFile::supportNativeCopy(DataSourceDescription data_source_description) const
{
return data_source_description == getDataSourceDescription();
}
/// Copies a source file to `path / file_name_to` with std::filesystem::copy, overwriting an existing file.
/// When `from_disk` is set, the source path is resolved with fullPath(from_disk, file_name_from);
/// otherwise `file_name_from` is made absolute as a plain filesystem path.
void BackupWriterFile::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
{
    const auto destination = path / file_name_to;
    fs::create_directories(destination.parent_path());

    std::string source;
    if (!from_disk)
        source = fs::absolute(file_name_from);
    else
        source = fullPath(from_disk, file_name_from);

    const auto options = fs::copy_options::recursive | fs::copy_options::overwrite_existing;
    fs::copy(source, destination, options);
}
} }

View File

@ -9,12 +9,13 @@ namespace DB
class BackupReaderFile : public IBackupReader class BackupReaderFile : public IBackupReader
{ {
public: public:
BackupReaderFile(const String & path_); explicit BackupReaderFile(const String & path_);
~BackupReaderFile() override; ~BackupReaderFile() override;
bool fileExists(const String & file_name) override; bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override; UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override; std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
DataSourceDescription getDataSourceDescription() const override;
private: private:
std::filesystem::path path; std::filesystem::path path;
@ -23,7 +24,7 @@ private:
class BackupWriterFile : public IBackupWriter class BackupWriterFile : public IBackupWriter
{ {
public: public:
BackupWriterFile(const String & path_); explicit BackupWriterFile(const String & path_);
~BackupWriterFile() override; ~BackupWriterFile() override;
bool fileExists(const String & file_name) override; bool fileExists(const String & file_name) override;
@ -31,6 +32,10 @@ public:
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override; bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override; std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override; void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private: private:
std::filesystem::path path; std::filesystem::path path;

View File

@ -111,6 +111,22 @@ public:
UInt64 getSize() const override { return size; } UInt64 getSize() const override { return size; }
std::optional<UInt128> getChecksum() const override { return checksum; } std::optional<UInt128> getChecksum() const override { return checksum; }
/// Returns the `data_file_name` member of this entry.
String getFilePath() const override
{
return data_file_name;
}
/// This entry does not expose a disk; nullptr is always returned.
DiskPtr tryGetDiskIfExists() const override
{
return nullptr;
}
/// Forwards to the reader of the owning backup: the entry's data source is
/// whatever `backup->reader` reports.
DataSourceDescription getDataSourceDescription() const override
{
return backup->reader->getDataSourceDescription();
}
private: private:
const std::shared_ptr<const BackupImpl> backup; const std::shared_ptr<const BackupImpl> backup;
const String archive_suffix; const String archive_suffix;
@ -587,9 +603,86 @@ BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) c
} }
} }
namespace
{
/// Looks up `file_path` in the base backup.
/// Returns the (size, checksum) pair recorded there, or std::nullopt when there is
/// no base backup or it does not contain a file with that path.
std::optional<SizeAndChecksum> getInfoAboutFileFromBaseBackupIfExists(std::shared_ptr<const IBackup> base_backup, const std::string & file_path)
{
    const bool known_to_base = base_backup && base_backup->fileExists(file_path);
    if (!known_to_base)
        return std::nullopt;

    return std::pair{base_backup->getFileSize(file_path), base_backup->getFileChecksum(file_path)};
}
enum class CheckBackupResult
{
HasPrefix,
HasFull,
HasNothing,
};
/// Decides, by comparing sizes only, how much of a new entry could be taken from the file
/// with the same name in the base backup. Checksums are compared later by the caller.
///   base_backup_info - (size, checksum) of the file as recorded in the base backup;
///   new_entry_info   - info about the entry being written; only `size` is read here.
/// Returns:
///   HasNothing - the base file is larger than the new entry, so it cannot be a prefix of it;
///   HasFull    - both sizes are equal, the base may contain the whole file;
///   HasPrefix  - the base file is smaller, it may be a prefix of the new entry (appended file).
CheckBackupResult checkBaseBackupForFile(const SizeAndChecksum & base_backup_info, const FileInfo & new_entry_info)
{
    const auto base_size = base_backup_info.first;

    /// We cannot reuse base backup because our file is smaller
    /// than file stored in previous backup
    if (new_entry_info.size < base_size)
        return CheckBackupResult::HasNothing;

    if (new_entry_info.size == base_size)
        return CheckBackupResult::HasFull;

    /// The new entry is strictly larger: the base file is a candidate prefix of it.
    return CheckBackupResult::HasPrefix;
}
struct ChecksumsForNewEntry
{
UInt128 full_checksum;
UInt128 prefix_checksum;
};
/// Produces the checksums needed to register a new entry.
///   full_checksum   - the checksum the entry already provides, or, if it provides none,
///                     the hash of all its data;
///   prefix_checksum - the hash of the first `prefix_size` bytes, or 0 when `prefix_size` is 0.
/// The entry's data is read only when a prefix hash is requested or the entry has no checksum.
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(BackupEntryPtr entry, size_t prefix_size)
{
    const auto known_checksum = entry->getChecksum();

    /// Nothing has to be read: no prefix requested and the full checksum is already known.
    if (prefix_size == 0 && known_checksum)
        return ChecksumsForNewEntry{*known_checksum, 0};

    auto in = entry->getReadBuffer();
    HashingReadBuffer hasher(*in);

    UInt128 prefix_checksum = 0;
    if (prefix_size > 0)
    {
        /// Hash exactly the requested prefix and remember the intermediate result.
        hasher.ignore(prefix_size);
        prefix_checksum = hasher.getHash();
    }

    if (known_checksum)
        return ChecksumsForNewEntry{*known_checksum, prefix_checksum};

    /// Keep hashing the rest of the data to obtain the checksum of the whole entry.
    hasher.ignoreAll();
    return ChecksumsForNewEntry{hasher.getHash(), prefix_checksum};
}
}
void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry) void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
{ {
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
if (open_mode != OpenMode::WRITE) if (open_mode != OpenMode::WRITE)
throw Exception("Backup is not opened for writing", ErrorCodes::LOGICAL_ERROR); throw Exception("Backup is not opened for writing", ErrorCodes::LOGICAL_ERROR);
@ -597,164 +690,179 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
if (writing_finalized) if (writing_finalized)
throw Exception("Backup is already finalized", ErrorCodes::LOGICAL_ERROR); throw Exception("Backup is already finalized", ErrorCodes::LOGICAL_ERROR);
std::string from_file_name = "memory buffer";
if (auto fname = entry->getFilePath(); !fname.empty())
from_file_name = "file " + fname;
LOG_TRACE(log, "Writing backup for file {} from file {}", file_name, from_file_name);
auto adjusted_path = removeLeadingSlash(file_name); auto adjusted_path = removeLeadingSlash(file_name);
if (coordination->getFileInfo(adjusted_path)) if (coordination->getFileInfo(adjusted_path))
throw Exception( throw Exception(
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", backup_name, quoteString(file_name)); ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", backup_name, quoteString(file_name));
FileInfo info; FileInfo info
info.file_name = adjusted_path; {
size_t size = entry->getSize(); .file_name = adjusted_path,
info.size = size; .size = entry->getSize(),
.base_size = 0,
.base_checksum = 0,
};
/// Check if the entry's data is empty. /// Empty file, nothing to backup
if (!info.size) if (info.size == 0)
{ {
coordination->addFileInfo(info); coordination->addFileInfo(info);
return; return;
} }
/// Maybe we have a copy of this file in the backup already. std::optional<SizeAndChecksum> base_backup_file_info = getInfoAboutFileFromBaseBackupIfExists(base_backup, adjusted_path);
std::optional<UInt128> checksum = entry->getChecksum();
if (checksum && coordination->getFileInfo(std::pair{size, *checksum}))
{
info.checksum = *checksum;
coordination->addFileInfo(info);
return;
}
/// Check if a entry with such name exists in the base backup. /// We have info about this file in base backup
bool base_exists = (base_backup && base_backup->fileExists(adjusted_path)); /// If file has no checksum -- calculate and fill it.
UInt64 base_size = 0; if (base_backup_file_info.has_value())
UInt128 base_checksum{0, 0};
if (base_exists)
{ {
base_size = base_backup->getFileSize(adjusted_path); LOG_TRACE(log, "File {} found in base backup, checking for equality", adjusted_path);
base_checksum = base_backup->getFileChecksum(adjusted_path); CheckBackupResult check_base = checkBaseBackupForFile(*base_backup_file_info, info);
}
std::unique_ptr<SeekableReadBuffer> read_buffer; /// We'll set that later. /// File with the same name but smaller size exist in previous backup
std::optional<HashingReadBuffer> hashing_read_buffer; if (check_base == CheckBackupResult::HasPrefix)
UInt64 hashing_pos = 0; /// Current position in `hashing_read_buffer`.
/// Determine whether it's possible to receive this entry's data from the base backup completely or partly.
bool use_base = false;
if (base_exists && base_size && (size >= base_size))
{
if (checksum && (size == base_size))
{ {
/// The size is the same, we need to compare checksums to find out auto checksums = calculateNewEntryChecksumsIfNeeded(entry, base_backup_file_info->first);
/// if the entry's data has not changed since the base backup. info.checksum = checksums.full_checksum;
use_base = (*checksum == base_checksum);
/// We have prefix of this file in backup with the same checksum.
/// In ClickHouse this can happen for StorageLog for example.
if (checksums.prefix_checksum == base_backup_file_info->second)
{
LOG_TRACE(log, "File prefix of {} in base backup, will write rest part of file to current backup", adjusted_path);
info.base_size = base_backup_file_info->first;
info.base_checksum = base_backup_file_info->second;
}
else
{
LOG_TRACE(log, "Prefix checksum of file {} doesn't match with checksum in base backup", adjusted_path);
}
} }
else else
{ {
/// The size has increased, we need to calculate a partial checksum to find out /// We have full file or have nothing, first of all let's get checksum
/// if the entry's data has only appended since the base backup. /// of current file
read_buffer = entry->getReadBuffer(); auto checksums = calculateNewEntryChecksumsIfNeeded(entry, 0);
hashing_read_buffer.emplace(*read_buffer); info.checksum = checksums.full_checksum;
hashing_read_buffer->ignore(base_size);
hashing_pos = base_size; if (info.checksum == base_backup_file_info->second)
UInt128 partial_checksum = hashing_read_buffer->getHash(); {
if (size == base_size) LOG_TRACE(log, "Found whole file {} in base backup", adjusted_path);
checksum = partial_checksum; assert(check_base == CheckBackupResult::HasFull);
if (partial_checksum == base_checksum) assert(info.size == base_backup_file_info->first);
use_base = true;
info.base_size = base_backup_file_info->first;
info.base_checksum = base_backup_file_info->second;
/// Actually we can add this info to coordination and exit,
/// but we intentionally don't do it, otherwise control flow
/// of this function will be very complex.
}
else
{
LOG_TRACE(log, "Whole file {} in base backup doesn't match by checksum", adjusted_path);
}
} }
} }
else /// We don't have info about this file_name (sic!) in base backup,
/// Finish calculating the checksum. /// however file could be renamed, so we will check one more time using size and checksum
if (!checksum)
{ {
if (!read_buffer)
read_buffer = entry->getReadBuffer(); LOG_TRACE(log, "Nothing found for file {} in base backup", adjusted_path);
if (!hashing_read_buffer) auto checksums = calculateNewEntryChecksumsIfNeeded(entry, 0);
hashing_read_buffer.emplace(*read_buffer); info.checksum = checksums.full_checksum;
hashing_read_buffer->ignore(size - hashing_pos);
checksum = hashing_read_buffer->getHash();
} }
hashing_read_buffer.reset();
info.checksum = *checksum;
/// Maybe we have a copy of this file in the backup already. /// Maybe we have a copy of this file in the backup already.
if (coordination->getFileInfo(std::pair{size, *checksum})) if (coordination->getFileInfo(std::pair{info.size, info.checksum}))
{ {
LOG_TRACE(log, "File {} already exist in current backup, adding reference", adjusted_path);
coordination->addFileInfo(info); coordination->addFileInfo(info);
return; return;
} }
/// Check if a entry with the same checksum exists in the base backup. /// On the previous lines we checked that backup for file with adjusted_name exist in previous backup.
if (base_backup && !use_base && base_backup->fileExists(std::pair{size, *checksum})) /// However file can be renamed, but has the same size and checksums, let's check for this case.
if (base_backup && base_backup->fileExists(std::pair{info.size, info.checksum}))
{ {
/// The entry's data has not changed since the base backup,
/// but the entry itself has been moved or renamed.
base_size = size;
base_checksum = *checksum;
use_base = true;
}
if (use_base) LOG_TRACE(log, "File {} doesn't exist in current backup, but we have file with same size and checksum", adjusted_path);
{ info.base_size = info.size;
info.base_size = base_size; info.base_checksum = info.checksum;
info.base_checksum = base_checksum;
}
if (use_base && (size == base_size))
{
/// The entry's data has not been changed since the base backup.
coordination->addFileInfo(info); coordination->addFileInfo(info);
return; return;
} }
bool is_data_file_required; /// All "short paths" failed. We don't have this file in previous or existing backup
/// or have only prefix of it in previous backup. Let's go long path.
info.data_file_name = info.file_name; info.data_file_name = info.file_name;
info.archive_suffix = current_archive_suffix; info.archive_suffix = current_archive_suffix;
bool is_data_file_required;
coordination->addFileInfo(info, is_data_file_required); coordination->addFileInfo(info, is_data_file_required);
if (!is_data_file_required) if (!is_data_file_required)
return; /// We copy data only if it's a new combination of size & checksum.
/// Either the entry wasn't exist in the base backup
/// or the entry has data appended to the end of the data from the base backup.
/// In both those cases we have to copy data to this backup.
/// Find out where the start position to copy data is.
auto copy_pos = use_base ? base_size : 0;
/// Move the current read position to the start position to copy data.
if (!read_buffer)
read_buffer = entry->getReadBuffer();
read_buffer->seek(copy_pos, SEEK_SET);
if (!num_files_written)
checkLockFile(true);
/// Copy the entry's data after `copy_pos`.
std::unique_ptr<WriteBuffer> out;
if (use_archives)
{ {
String archive_suffix = current_archive_suffix; LOG_TRACE(log, "File {} doesn't exist in current backup, but we have file with same size and checksum", adjusted_path);
bool next_suffix = false; return; /// We copy data only if it's a new combination of size & checksum.
if (current_archive_suffix.empty() && is_internal_backup) }
next_suffix = true; auto writer_description = writer->getDataSourceDescription();
/*if (archive_params.max_volume_size && current_archive_writer auto reader_description = entry->getDataSourceDescription();
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
next_suffix = true;*/ /// We need to copy whole file without archive, we can do it faster
if (next_suffix) /// if source and destination are compatible
current_archive_suffix = coordination->getNextArchiveSuffix(); if (!use_archives && info.base_size == 0 && writer->supportNativeCopy(reader_description))
if (info.archive_suffix != current_archive_suffix) {
{
info.archive_suffix = current_archive_suffix; LOG_TRACE(log, "Will copy file {} using native copy", adjusted_path);
coordination->updateFileInfo(info); /// Should be much faster than writing data through server
} writer->copyFileNative(entry->tryGetDiskIfExists(), entry->getFilePath(), info.data_file_name);
out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
} }
else else
{ {
out = writer->writeFile(info.data_file_name); LOG_TRACE(log, "Will copy file {} through memory buffers", adjusted_path);
auto read_buffer = entry->getReadBuffer();
/// If we have prefix in base we will seek to the start of the suffix which differs
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
if (!num_files_written)
checkLockFile(true);
if (use_archives)
{
LOG_TRACE(log, "Adding file {} to archive", adjusted_path);
String archive_suffix = current_archive_suffix;
bool next_suffix = false;
if (current_archive_suffix.empty() && is_internal_backup)
next_suffix = true;
/*if (archive_params.max_volume_size && current_archive_writer
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
next_suffix = true;*/
if (next_suffix)
current_archive_suffix = coordination->getNextArchiveSuffix();
if (info.archive_suffix != current_archive_suffix)
{
info.archive_suffix = current_archive_suffix;
coordination->updateFileInfo(info);
}
auto out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
copyData(*read_buffer, *out);
out->finalize();
}
else
{
writer->copyFileThroughBuffer(std::move(read_buffer), info.data_file_name);
}
} }
copyData(*read_buffer, *out);
out->finalize();
++num_files_written; ++num_files_written;
} }

View File

@ -90,7 +90,8 @@ namespace
} }
catch (...) catch (...)
{ {
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)}); if (coordination)
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
} }
} }
@ -164,9 +165,9 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal); backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
} }
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
try try
{ {
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
addInfo(backup_id, backup_info.toString(), backup_settings.internal, BackupStatus::CREATING_BACKUP); addInfo(backup_id, backup_info.toString(), backup_settings.internal, BackupStatus::CREATING_BACKUP);
/// Prepare context to use. /// Prepare context to use.
@ -213,6 +214,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
} }
catch (...) catch (...)
{ {
tryLogCurrentException(log, fmt::format("Failed to start {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
/// Something bad happened, the backup has not built. /// Something bad happened, the backup has not built.
setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED); setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id); sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);

View File

@ -20,6 +20,20 @@ public:
UInt64 getSize() const override { return getInternalBackupEntry()->getSize(); } UInt64 getSize() const override { return getInternalBackupEntry()->getSize(); }
std::optional<UInt128> getChecksum() const override { return getInternalBackupEntry()->getChecksum(); } std::optional<UInt128> getChecksum() const override { return getInternalBackupEntry()->getChecksum(); }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return getInternalBackupEntry()->getReadBuffer(); } std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return getInternalBackupEntry()->getReadBuffer(); }
/// Forwards to the wrapped (internal) backup entry.
String getFilePath() const override { return getInternalBackupEntry()->getFilePath(); }
/// Forwards to the wrapped (internal) backup entry.
DiskPtr tryGetDiskIfExists() const override { return getInternalBackupEntry()->tryGetDiskIfExists(); }
/// Forwards to the wrapped (internal) backup entry.
DataSourceDescription getDataSourceDescription() const override { return getInternalBackupEntry()->getDataSourceDescription(); }
private: private:
BackupEntryPtr getInternalBackupEntry() const BackupEntryPtr getInternalBackupEntry() const

View File

@ -4,6 +4,8 @@
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <vector> #include <vector>
#include <Disks/DiskType.h>
#include <Disks/IDisk.h>
namespace DB namespace DB
{ {
@ -24,6 +26,12 @@ public:
/// Returns a read buffer for reading the data. /// Returns a read buffer for reading the data.
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer() const = 0; virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer() const = 0;
/// Returns a path associated with this entry.
/// NOTE(review): presumably the path of the source file on its disk; what entries
/// without a backing file return is not visible here - confirm against implementations.
virtual String getFilePath() const = 0;
/// Returns the disk associated with this entry.
/// NOTE(review): the name suggests a null DiskPtr when the entry has no disk - confirm.
virtual DiskPtr tryGetDiskIfExists() const = 0;
/// Returns the description (type, textual description, encrypted/cached flags)
/// of the data source this entry is read from.
virtual DataSourceDescription getDataSourceDescription() const = 0;
}; };
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>; using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;

View File

@ -46,7 +46,7 @@ namespace
void checkPath(const String & disk_name, const DiskPtr & disk, fs::path & path) void checkPath(const String & disk_name, const DiskPtr & disk, fs::path & path)
{ {
path = path.lexically_normal(); path = path.lexically_normal();
if (!path.is_relative() && (disk->getType() == DiskType::Local)) if (!path.is_relative() && (disk->getDataSourceDescription().type == DataSourceType::Local))
path = path.lexically_proximate(disk->getPath()); path = path.lexically_proximate(disk->getPath());
bool path_ok = path.empty() || (path.is_relative() && (*path.begin() != "..")); bool path_ok = path.empty() || (path.is_relative() && (*path.begin() != ".."));

View File

@ -898,7 +898,7 @@ void FileCache::remove(
Key key, size_t offset, Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */) std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
{ {
LOG_TEST(log, "Remove. Key: {}, offset: {}", key.toString(), offset); LOG_DEBUG(log, "Remove from cache. Key: {}, offset: {}", key.toString(), offset);
auto * cell = getCell(key, offset, cache_lock); auto * cell = getCell(key, offset, cache_lock);
if (!cell) if (!cell)

View File

@ -55,6 +55,7 @@ FileSegment::FileSegment(
case (State::DOWNLOADED): case (State::DOWNLOADED):
{ {
reserved_size = downloaded_size = size_; reserved_size = downloaded_size = size_;
is_downloaded = true;
break; break;
} }
case (State::SKIP_CACHE): case (State::SKIP_CACHE):
@ -728,7 +729,7 @@ void FileSegment::detach(
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear(); downloader_id.clear();
LOG_TEST(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock)); LOG_DEBUG(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
} }
void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */) void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */)

View File

@ -24,9 +24,7 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})", "Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})",
entry.key.toString(), entry.key.toString(), entry.offset, entry.size);
entry.offset,
entry.size);
} }
#endif #endif
@ -36,6 +34,8 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size); CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements); CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
return std::make_shared<LRUFileCacheIterator>(this, iter); return std::make_shared<LRUFileCacheIterator>(this, iter);
} }
@ -54,6 +54,8 @@ void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size()); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
LOG_DEBUG(log, "Removed all entries from LRU queue");
queue.clear(); queue.clear();
cache_size = 0; cache_size = 0;
} }
@ -86,6 +88,8 @@ void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guar
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
queue_iter = cache_priority->queue.erase(queue_iter); queue_iter = cache_priority->queue.erase(queue_iter);
} }

View File

@ -2,6 +2,7 @@
#include <list> #include <list>
#include <Common/IFileCachePriority.h> #include <Common/IFileCachePriority.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {
@ -32,6 +33,7 @@ public:
private: private:
LRUQueue queue; LRUQueue queue;
Poco::Logger * log = &Poco::Logger::get("LRUFileCachePriority");
}; };
class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::IIterator class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::IIterator

View File

@ -79,6 +79,22 @@ String getBlockDeviceId([[maybe_unused]] const String & path)
#endif #endif
} }
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path)
{
#if defined(OS_LINUX)
struct stat sb;
if (lstat(path.c_str(), &sb))
return {};
WriteBufferFromOwnString ss;
ss << major(sb.st_dev) << ":" << minor(sb.st_dev);
return ss.str();
#else
return {};
#endif
}
#if !defined(OS_LINUX) #if !defined(OS_LINUX)
[[noreturn]] [[noreturn]]
#endif #endif

View File

@ -25,6 +25,8 @@ std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
#endif #endif
String getBlockDeviceId([[maybe_unused]] const String & path); String getBlockDeviceId([[maybe_unused]] const String & path);
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path);
enum class BlockDeviceType enum class BlockDeviceType
{ {
UNKNOWN = 0, // we were unable to determine device type UNKNOWN = 0, // we were unable to determine device type

View File

@ -90,24 +90,56 @@ struct DataTypeDecimalTrait
* Sign of `fractional` is expected to be positive, otherwise result is undefined. * Sign of `fractional` is expected to be positive, otherwise result is undefined.
* If `scale` is too big (scale > max_precision<DecimalType::NativeType>), result is undefined. * If `scale` is too big (scale > max_precision<DecimalType::NativeType>), result is undefined.
*/ */
template <typename DecimalType>
inline DecimalType decimalFromComponentsWithMultiplier( template <typename DecimalType, bool throw_on_error>
const typename DecimalType::NativeType & whole, inline bool decimalFromComponentsWithMultiplierImpl(
const typename DecimalType::NativeType & fractional, const typename DecimalType::NativeType & whole,
typename DecimalType::NativeType scale_multiplier) const typename DecimalType::NativeType & fractional,
typename DecimalType::NativeType scale_multiplier,
DecimalType & result)
{ {
using T = typename DecimalType::NativeType; using T = typename DecimalType::NativeType;
const auto fractional_sign = whole < 0 ? -1 : 1; const auto fractional_sign = whole < 0 ? -1 : 1;
T whole_scaled = 0; T whole_scaled = 0;
if (common::mulOverflow(whole, scale_multiplier, whole_scaled)) if (common::mulOverflow(whole, scale_multiplier, whole_scaled))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW); {
if constexpr (throw_on_error)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return false;
}
T value; T value;
if (common::addOverflow(whole_scaled, fractional_sign * (fractional % scale_multiplier), value)) if (common::addOverflow(whole_scaled, fractional_sign * (fractional % scale_multiplier), value))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW); {
if constexpr (throw_on_error)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return false;
}
return DecimalType(value); result = DecimalType(value);
return true;
}
/// Throwing variant: builds a decimal from its whole and fractional parts and returns it by value.
/// Delegates to the shared implementation with throw_on_error = true.
template <typename DecimalType>
inline DecimalType decimalFromComponentsWithMultiplier(
    const typename DecimalType::NativeType & whole,
    const typename DecimalType::NativeType & fractional,
    typename DecimalType::NativeType scale_multiplier)
{
    constexpr bool throw_on_error = true;

    DecimalType decimal;
    /// The boolean result is ignored on purpose: in throwing mode the implementation
    /// either fills `decimal` or raises an exception.
    decimalFromComponentsWithMultiplierImpl<DecimalType, throw_on_error>(whole, fractional, scale_multiplier, decimal);
    return decimal;
}
/// Non-throwing variant: stores the decimal into `result` and returns true,
/// or returns false when the shared implementation reports failure.
template <typename DecimalType>
inline bool tryGetDecimalFromComponentsWithMultiplier(
    const typename DecimalType::NativeType & whole,
    const typename DecimalType::NativeType & fractional,
    typename DecimalType::NativeType scale_multiplier,
    DecimalType & result)
{
    constexpr bool throw_on_error = false;
    return decimalFromComponentsWithMultiplierImpl<DecimalType, throw_on_error>(whole, fractional, scale_multiplier, result);
}
template <typename DecimalType> template <typename DecimalType>
@ -118,6 +150,15 @@ inline DecimalType decimalFromComponentsWithMultiplier(
return decimalFromComponentsWithMultiplier<DecimalType>(components.whole, components.fractional, scale_multiplier); return decimalFromComponentsWithMultiplier<DecimalType>(components.whole, components.fractional, scale_multiplier);
} }
/// Convenience overload: unpacks `components` and forwards to the
/// whole/fractional overload of tryGetDecimalFromComponentsWithMultiplier.
template <typename DecimalType>
inline bool tryGetDecimalFromComponentsWithMultiplier(
    const DecimalComponents<DecimalType> & components,
    typename DecimalType::NativeType scale_multiplier,
    DecimalType & result)
{
    const auto & whole = components.whole;
    const auto & fractional = components.fractional;
    return tryGetDecimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scale_multiplier, result);
}
/** Make a decimal value from whole and fractional components with given scale. /** Make a decimal value from whole and fractional components with given scale.
* *
@ -134,6 +175,18 @@ inline DecimalType decimalFromComponents(
return decimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scaleMultiplier<T>(scale)); return decimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scaleMultiplier<T>(scale));
} }
template <typename DecimalType>
inline bool tryGetDecimalFromComponents(
const typename DecimalType::NativeType & whole,
const typename DecimalType::NativeType & fractional,
UInt32 scale,
DecimalType & result)
{
using T = typename DecimalType::NativeType;
return tryGetDecimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scaleMultiplier<T>(scale), result);
}
/** Make a decimal value from whole and fractional components with given scale. /** Make a decimal value from whole and fractional components with given scale.
* @see `decimalFromComponentsWithMultiplier` for details. * @see `decimalFromComponentsWithMultiplier` for details.
*/ */
@ -145,6 +198,15 @@ inline DecimalType decimalFromComponents(
return decimalFromComponents<DecimalType>(components.whole, components.fractional, scale); return decimalFromComponents<DecimalType>(components.whole, components.fractional, scale);
} }
/// Convenience overload: unpacks `components` and forwards to the
/// whole/fractional overload of tryGetDecimalFromComponents.
template <typename DecimalType>
inline bool tryGetDecimalFromComponents(
    const DecimalComponents<DecimalType> & components,
    UInt32 scale,
    DecimalType & result)
{
    const auto & whole = components.whole;
    const auto & fractional = components.fractional;
    return tryGetDecimalFromComponents<DecimalType>(whole, fractional, scale, result);
}
/** Split decimal into whole and fractional parts with given scale_multiplier. /** Split decimal into whole and fractional parts with given scale_multiplier.
* This is an optimization to reduce number of calls to scaleMultiplier on known scale. * This is an optimization to reduce number of calls to scaleMultiplier on known scale.
*/ */

View File

@ -90,6 +90,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \ M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \ M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \ M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \ M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \ M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \ M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \

View File

@ -72,7 +72,7 @@ public:
void sync(int fd) const; void sync(int fd) const;
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); } String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); } bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType getType() const override { return delegate->getType(); } DataSourceDescription getDataSourceDescription() const override { return delegate->getDataSourceDescription(); }
bool isRemote() const override { return delegate->isRemote(); } bool isRemote() const override { return delegate->isRemote(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); } bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
bool supportParallelWrite() const override { return delegate->supportParallelWrite(); } bool supportParallelWrite() const override { return delegate->supportParallelWrite(); }

View File

@ -234,7 +234,13 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
DiskType getType() const override { return DiskType::Encrypted; } DataSourceDescription getDataSourceDescription() const override
{
auto delegate_description = delegate->getDataSourceDescription();
delegate_description.is_encrypted = true;
return delegate_description;
}
bool isRemote() const override { return delegate->isRemote(); } bool isRemote() const override { return delegate->isRemote(); }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override; SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

View File

@ -230,14 +230,14 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
if (bytes == 0) if (bytes == 0)
{ {
LOG_DEBUG(log, "Reserving 0 bytes on disk {}", backQuote(name)); LOG_DEBUG(logger, "Reserving 0 bytes on disk {}", backQuote(name));
++reservation_count; ++reservation_count;
return {unreserved_space}; return {unreserved_space};
} }
if (unreserved_space >= bytes) if (unreserved_space >= bytes)
{ {
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.", LOG_DEBUG(logger, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count; ++reservation_count;
reserved_bytes += bytes; reserved_bytes += bytes;
@ -497,6 +497,14 @@ DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_fre
, keep_free_space_bytes(keep_free_space_bytes_) , keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal")) , logger(&Poco::Logger::get("DiskLocal"))
{ {
data_source_description.type = DataSourceType::Local;
if (auto block_device_id = tryGetBlockDeviceId(disk_path); block_device_id.has_value())
data_source_description.description = *block_device_id;
else
data_source_description.description = disk_path;
data_source_description.is_encrypted = false;
data_source_description.is_cached = false;
} }
DiskLocal::DiskLocal( DiskLocal::DiskLocal(
@ -507,6 +515,11 @@ DiskLocal::DiskLocal(
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms); disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
} }
/// Returns a copy of the description stored in the `data_source_description` member.
DataSourceDescription DiskLocal::getDataSourceDescription() const
{
return data_source_description;
}
void DiskLocal::startup(ContextPtr) void DiskLocal::startup(ContextPtr)
{ {
try try
@ -615,7 +628,6 @@ DiskObjectStoragePtr DiskLocal::createDiskObjectStorage()
"Local", "Local",
metadata_storage, metadata_storage,
object_storage, object_storage,
DiskType::Local,
false, false,
/* threadpool_size */16 /* threadpool_size */16
); );
@ -714,6 +726,13 @@ void DiskLocal::chmod(const String & path, mode_t mode)
DB::throwFromErrnoWithPath("Cannot chmod file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED); DB::throwFromErrnoWithPath("Cannot chmod file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
} }
/// Creates a FakeMetadataStorageFromDisk over this disk, backed by a fresh
/// LocalObjectStorage and rooted at getPath(). A new instance is built on every call.
MetadataStoragePtr DiskLocal::getMetadataStorage()
{
    auto local_object_storage = std::make_shared<LocalObjectStorage>();
    auto self_as_disk = std::static_pointer_cast<IDisk>(shared_from_this());
    return std::make_shared<FakeMetadataStorageFromDisk>(self_as_disk, local_object_storage, getPath());
}
void registerDiskLocal(DiskFactory & factory) void registerDiskLocal(DiskFactory & factory)
{ {
auto creator = [](const String & name, auto creator = [](const String & name,

View File

@ -101,7 +101,8 @@ public:
void truncateFile(const String & path, size_t size) override; void truncateFile(const String & path, size_t size) override;
DiskType getType() const override { return DiskType::Local; } DataSourceDescription getDataSourceDescription() const override;
bool isRemote() const override { return false; } bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; } bool supportZeroCopyReplication() const override { return false; }
@ -130,6 +131,8 @@ public:
bool supportsChmod() const override { return true; } bool supportsChmod() const override { return true; }
void chmod(const String & path, mode_t mode) override; void chmod(const String & path, mode_t mode) override;
MetadataStoragePtr getMetadataStorage() override;
private: private:
std::optional<UInt64> tryReserve(UInt64 bytes); std::optional<UInt64> tryReserve(UInt64 bytes);
@ -145,14 +148,13 @@ private:
const String disk_checker_path = ".disk_checker_file"; const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes; std::atomic<UInt64> keep_free_space_bytes;
Poco::Logger * logger; Poco::Logger * logger;
DataSourceDescription data_source_description;
UInt64 reserved_bytes = 0; UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0; UInt64 reservation_count = 0;
static std::mutex reservation_mutex; static std::mutex reservation_mutex;
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
std::atomic<bool> broken{false}; std::atomic<bool> broken{false};
std::atomic<bool> readonly{false}; std::atomic<bool> readonly{false};
std::unique_ptr<DiskLocalCheckThread> disk_checker; std::unique_ptr<DiskLocalCheckThread> disk_checker;

View File

@ -7,6 +7,8 @@
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
namespace DB namespace DB
{ {
@ -443,6 +445,13 @@ void DiskMemory::truncateFile(const String & path, size_t size)
file_it->second.data.resize(size); file_it->second.data.resize(size);
} }
/// Creates a FakeMetadataStorageFromDisk over this disk, backed by a fresh
/// LocalObjectStorage and rooted at getPath(). A new instance is built on every call.
MetadataStoragePtr DiskMemory::getMetadataStorage()
{
    auto local_object_storage = std::make_shared<LocalObjectStorage>();
    auto self_as_disk = std::static_pointer_cast<IDisk>(shared_from_this());
    return std::make_shared<FakeMetadataStorageFromDisk>(self_as_disk, local_object_storage, getPath());
}
using DiskMemoryPtr = std::shared_ptr<DiskMemory>; using DiskMemoryPtr = std::shared_ptr<DiskMemory>;

View File

@ -91,11 +91,14 @@ public:
void truncateFile(const String & path, size_t size) override; void truncateFile(const String & path, size_t size) override;
DiskType getType() const override { return DiskType::RAM; } DataSourceDescription getDataSourceDescription() const override { return DataSourceDescription{DataSourceType::RAM, "", false, false}; }
bool isRemote() const override { return false; } bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; } bool supportZeroCopyReplication() const override { return false; }
MetadataStoragePtr getMetadataStorage() override;
private: private:
void createDirectoriesImpl(const String & path); void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path); void replaceFileImpl(const String & from_path, const String & to_path);

11
src/Disks/DiskType.cpp Normal file
View File

@ -0,0 +1,11 @@
#include "DiskType.h"
namespace DB
{
/// Two descriptions are equal when type, description and is_encrypted all match.
/// `is_cached` does not take part in the comparison.
bool DataSourceDescription::operator==(const DataSourceDescription & other) const
{
    return type == other.type
        && description == other.description
        && is_encrypted == other.is_encrypted;
}
}

View File

@ -5,40 +5,45 @@
namespace DB namespace DB
{ {
enum class DiskType enum class DataSourceType
{ {
Local, Local,
RAM, RAM,
S3, S3,
HDFS, HDFS,
Encrypted,
WebServer, WebServer,
AzureBlobStorage, AzureBlobStorage,
Cache,
}; };
inline String toString(DiskType disk_type) inline String toString(DataSourceType data_source_type)
{ {
switch (disk_type) switch (data_source_type)
{ {
case DiskType::Local: case DataSourceType::Local:
return "local"; return "local";
case DiskType::RAM: case DataSourceType::RAM:
return "memory"; return "memory";
case DiskType::S3: case DataSourceType::S3:
return "s3"; return "s3";
case DiskType::HDFS: case DataSourceType::HDFS:
return "hdfs"; return "hdfs";
case DiskType::Encrypted: case DataSourceType::WebServer:
return "encrypted";
case DiskType::WebServer:
return "web"; return "web";
case DiskType::AzureBlobStorage: case DataSourceType::AzureBlobStorage:
return "azure_blob_storage"; return "azure_blob_storage";
case DiskType::Cache:
return "cache";
} }
__builtin_unreachable(); __builtin_unreachable();
} }
/// Describes where the data of a disk / object storage actually lives.
struct DataSourceDescription
{
/// Kind of the underlying storage. Has no default member initializer.
DataSourceType type;
/// Free-form text describing the concrete source; its content depends on the storage that fills it in.
std::string description;
/// Whether the data is encrypted.
bool is_encrypted = false;
/// Whether the data is accessed through a cache.
bool is_cached = false;
/// Equality operator, defined out of line.
bool operator==(const DataSourceDescription & other) const;
};
} }

View File

@ -113,7 +113,7 @@ void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<
void IDisk::truncateFile(const String &, size_t) void IDisk::truncateFile(const String &, size_t)
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType()); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getDataSourceDescription().type);
} }
SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
@ -121,18 +121,4 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr; return nullptr;
} }
MetadataStoragePtr IDisk::getMetadataStorage()
{
if (isRemote())
{
return std::make_shared<MetadataStorageFromDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), "");
}
else
{
auto object_storage = std::make_shared<LocalObjectStorage>();
return std::make_shared<FakeMetadataStorageFromDisk>(
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
}
}
} }

View File

@ -227,7 +227,7 @@ public:
virtual NameSet getCacheLayersNames() const virtual NameSet getCacheLayersNames() const
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getType()); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getDataSourceDescription().type);
} }
/// Returns a list of storage objects (contains path, size, ...). /// Returns a list of storage objects (contains path, size, ...).
@ -235,7 +235,7 @@ public:
/// be multiple files in remote fs for single clickhouse file. /// be multiple files in remote fs for single clickhouse file.
virtual StoredObjects getStorageObjects(const String &) const virtual StoredObjects getStorageObjects(const String &) const
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getType()); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getDataSourceDescription().type);
} }
/// For one local path there might be multiple remote paths in case of Log family engines. /// For one local path there might be multiple remote paths in case of Log family engines.
@ -243,7 +243,7 @@ public:
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &) virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getType()); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getDataSourceDescription().type);
} }
/// Batch request to remove multiple files. /// Batch request to remove multiple files.
@ -271,8 +271,8 @@ public:
/// Truncate file to specified size. /// Truncate file to specified size.
virtual void truncateFile(const String & path, size_t size); virtual void truncateFile(const String & path, size_t size);
/// Return disk type - "local", "s3", etc. /// Return data source description
virtual DiskType getType() const = 0; virtual DataSourceDescription getDataSourceDescription() const = 0;
/// Involves network interaction. /// Involves network interaction.
virtual bool isRemote() const = 0; virtual bool isRemote() const = 0;
@ -321,7 +321,7 @@ public:
/// Actually it's a part of IDiskRemote implementation but we have so /// Actually it's a part of IDiskRemote implementation but we have so
/// complex hierarchy of disks (with decorators), so we cannot even /// complex hierarchy of disks (with decorators), so we cannot even
/// dynamic_cast some pointer to IDisk to pointer to IDiskRemote. /// dynamic_cast some pointer to IDisk to pointer to IDiskRemote.
virtual MetadataStoragePtr getMetadataStorage(); virtual MetadataStoragePtr getMetadataStorage() = 0;
/// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata" /// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata"
/// it will return mapping for each required path: path -> metadata as string. /// it will return mapping for each required path: path -> metadata as string.
@ -357,7 +357,7 @@ public:
throw Exception( throw Exception(
ErrorCodes::NOT_IMPLEMENTED, ErrorCodes::NOT_IMPLEMENTED,
"Method createDiskObjectStorage() is not implemented for disk type: {}", "Method createDiskObjectStorage() is not implemented for disk type: {}",
getType()); getDataSourceDescription().type);
} }
virtual bool supportsStat() const { return false; } virtual bool supportsStat() const { return false; }

View File

@ -32,6 +32,10 @@ AzureObjectStorage::AzureObjectStorage(
, settings(std::move(settings_)) , settings(std::move(settings_))
, log(&Poco::Logger::get("AzureObjectStorage")) , log(&Poco::Logger::get("AzureObjectStorage"))
{ {
data_source_description.type = DataSourceType::AzureBlobStorage;
data_source_description.description = client.get()->GetUrl();
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
} }
std::string AzureObjectStorage::generateBlobNameForPath(const std::string & /* path */) std::string AzureObjectStorage::generateBlobNameForPath(const std::string & /* path */)

View File

@ -57,6 +57,8 @@ public:
AzureClientPtr && client_, AzureClientPtr && client_,
SettingsPtr && settings_); SettingsPtr && settings_);
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "AzureObjectStorage"; } std::string getName() const override { return "AzureObjectStorage"; }
bool exists(const StoredObject & object) const override; bool exists(const StoredObject & object) const override;
@ -128,6 +130,8 @@ private:
MultiVersion<AzureObjectStorageSettings> settings; MultiVersion<AzureObjectStorageSettings> settings;
Poco::Logger * log; Poco::Logger * log;
DataSourceDescription data_source_description;
}; };
} }

View File

@ -90,7 +90,6 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
"DiskAzureBlobStorage", "DiskAzureBlobStorage",
std::move(metadata_storage), std::move(metadata_storage),
std::move(azure_object_storage), std::move(azure_object_storage),
DiskType::AzureBlobStorage,
send_metadata, send_metadata,
copy_thread_pool_size copy_thread_pool_size
); );

View File

@ -34,6 +34,13 @@ CachedObjectStorage::CachedObjectStorage(
cache->initialize(); cache->initialize();
} }
/// Returns the description of the wrapped object storage with `is_cached` set to true.
DataSourceDescription CachedObjectStorage::getDataSourceDescription() const
{
    DataSourceDescription description = object_storage->getDataSourceDescription();
    description.is_cached = true;
    return description;
}
FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
{ {
return cache->hash(path); return cache->hash(path);

View File

@ -20,6 +20,8 @@ class CachedObjectStorage final : public IObjectStorage
public: public:
CachedObjectStorage(ObjectStoragePtr object_storage_, FileCachePtr cache_, const FileCacheSettings & cache_settings_, const String & cache_config_name_); CachedObjectStorage(ObjectStoragePtr object_storage_, FileCachePtr cache_, const FileCacheSettings & cache_settings_, const String & cache_config_name_);
DataSourceDescription getDataSourceDescription() const override;
std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); } std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); }
bool exists(const StoredObject & object) const override; bool exists(const StoredObject & object) const override;

View File

@ -103,14 +103,12 @@ DiskObjectStorage::DiskObjectStorage(
const String & log_name, const String & log_name,
MetadataStoragePtr metadata_storage_, MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_, ObjectStoragePtr object_storage_,
DiskType disk_type_,
bool send_metadata_, bool send_metadata_,
uint64_t thread_pool_size_) uint64_t thread_pool_size_)
: IDisk(getAsyncExecutor(log_name, thread_pool_size_)) : IDisk(getAsyncExecutor(log_name, thread_pool_size_))
, name(name_) , name(name_)
, object_storage_root_path(object_storage_root_path_) , object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")")) , log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, disk_type(disk_type_)
, metadata_storage(std::move(metadata_storage_)) , metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_)) , object_storage(std::move(object_storage_))
, send_metadata(send_metadata_) , send_metadata(send_metadata_)
@ -216,6 +214,22 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit(); transaction->commit();
} }
/// Copies `from_path` to `to_path` on `to_disk`.
/// If `to_disk` is this very disk object, the copy is performed through an object storage
/// transaction (copyFile + commit); otherwise it is delegated to IDisk::copy().
void DiskObjectStorage::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
    const bool is_same_disk = (to_disk.get() == this);
    if (!is_same_disk)
    {
        IDisk::copy(from_path, to_disk, to_path);
        return;
    }

    /// It's the same object storage disk
    auto copy_transaction = createObjectStorageTransaction();
    copy_transaction->copyFile(from_path, to_path);
    copy_transaction->commit();
}
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path) void DiskObjectStorage::moveFile(const String & from_path, const String & to_path)
{ {
moveFile(from_path, to_path, send_metadata); moveFile(from_path, to_path, send_metadata);
@ -469,7 +483,6 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
getName(), getName(),
metadata_storage, metadata_storage,
object_storage, object_storage,
disk_type,
send_metadata, send_metadata,
threadpool_size); threadpool_size);
} }

View File

@ -34,14 +34,13 @@ public:
const String & log_name, const String & log_name,
MetadataStoragePtr metadata_storage_, MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_, ObjectStoragePtr object_storage_,
DiskType disk_type_,
bool send_metadata_, bool send_metadata_,
uint64_t thread_pool_size_); uint64_t thread_pool_size_);
/// Create fake transaction /// Create fake transaction
DiskTransactionPtr createTransaction() override; DiskTransactionPtr createTransaction() override;
DiskType getType() const override { return disk_type; } DataSourceDescription getDataSourceDescription() const override { return object_storage->getDataSourceDescription(); }
bool supportZeroCopyReplication() const override { return true; } bool supportZeroCopyReplication() const override { return true; }
@ -154,6 +153,8 @@ public:
WriteMode mode, WriteMode mode,
const WriteSettings & settings) override; const WriteSettings & settings) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context); void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
@ -206,7 +207,6 @@ private:
const String object_storage_root_path; const String object_storage_root_path;
Poco::Logger * log; Poco::Logger * log;
const DiskType disk_type;
MetadataStoragePtr metadata_storage; MetadataStoragePtr metadata_storage;
ObjectStoragePtr object_storage; ObjectStoragePtr object_storage;

View File

@ -48,10 +48,20 @@ public:
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config)) , hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get())) , hdfs_fs(createHDFSFS(hdfs_builder.get()))
, settings(std::move(settings_)) , settings(std::move(settings_))
{} {
data_source_description.type = DataSourceType::HDFS;
data_source_description.description = hdfs_root_path_;
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
std::string getName() const override { return "HDFSObjectStorage"; } std::string getName() const override { return "HDFSObjectStorage"; }
/// Returns a copy of the description filled in by the constructor (type HDFS, not cached, not encrypted).
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
bool exists(const StoredObject & object) const override; bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
@ -121,6 +131,8 @@ private:
HDFSFSPtr hdfs_fs; HDFSFSPtr hdfs_fs;
SettingsPtr settings; SettingsPtr settings;
DataSourceDescription data_source_description;
}; };
} }

View File

@ -49,7 +49,6 @@ void registerDiskHDFS(DiskFactory & factory)
"DiskHDFS", "DiskHDFS",
std::move(metadata_storage), std::move(metadata_storage),
std::move(hdfs_storage), std::move(hdfs_storage),
DiskType::HDFS,
/* send_metadata = */ false, /* send_metadata = */ false,
copy_thread_pool_size); copy_thread_pool_size);

View File

@ -15,6 +15,7 @@
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h> #include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/StoredObject.h> #include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/FileCache.h> #include <Common/FileCache.h>
#include <Disks/WriteMode.h> #include <Disks/WriteMode.h>
@ -58,6 +59,8 @@ class IObjectStorage
public: public:
IObjectStorage() = default; IObjectStorage() = default;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual std::string getName() const = 0; virtual std::string getName() const = 0;
/// Object exists or not /// Object exists or not

View File

@ -28,6 +28,14 @@ namespace ErrorCodes
LocalObjectStorage::LocalObjectStorage() LocalObjectStorage::LocalObjectStorage()
: log(&Poco::Logger::get("LocalObjectStorage")) : log(&Poco::Logger::get("LocalObjectStorage"))
{ {
data_source_description.type = DataSourceType::Local;
if (auto block_device_id = tryGetBlockDeviceId("/"); block_device_id.has_value())
data_source_description.description = *block_device_id;
else
data_source_description.description = "/";
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
} }
bool LocalObjectStorage::exists(const StoredObject & object) const bool LocalObjectStorage::exists(const StoredObject & object) const

View File

@ -17,6 +17,8 @@ class LocalObjectStorage : public IObjectStorage
public: public:
LocalObjectStorage(); LocalObjectStorage();
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "LocalObjectStorage"; } std::string getName() const override { return "LocalObjectStorage"; }
bool exists(const StoredObject & object) const override; bool exists(const StoredObject & object) const override;
@ -86,6 +88,7 @@ public:
private: private:
Poco::Logger * log; Poco::Logger * log;
DataSourceDescription data_source_description;
}; };
} }

View File

@ -2,6 +2,8 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <IO/S3Common.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h> #include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h> #include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h> #include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
@ -25,6 +27,7 @@
#include <aws/s3/model/AbortMultipartUploadRequest.h> #include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <Common/getRandomASCIIString.h> #include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/MultiVersion.h> #include <Common/MultiVersion.h>
@ -369,6 +372,15 @@ void S3ObjectStorage::copyObjectImpl(
} }
throwIfError(outcome); throwIfError(outcome);
auto settings_ptr = s3_settings.get();
if (settings_ptr->s3_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
}
} }
void S3ObjectStorage::copyObjectMultipartImpl( void S3ObjectStorage::copyObjectMultipartImpl(
@ -450,6 +462,14 @@ void S3ObjectStorage::copyObjectMultipartImpl(
throwIfError(outcome); throwIfError(outcome);
} }
if (settings_ptr->s3_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
}
} }
void S3ObjectStorage::copyObject( // NOLINT void S3ObjectStorage::copyObject( // NOLINT
@ -511,7 +531,8 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
return std::make_unique<S3ObjectStorage>( return std::make_unique<S3ObjectStorage>(
getClient(config, config_prefix, context), getClient(config, config_prefix, context),
getSettings(config, config_prefix, context), getSettings(config, config_prefix, context),
version_id, s3_capabilities, new_namespace); version_id, s3_capabilities, new_namespace,
S3::URI(Poco::URI(config.getString(config_prefix + ".endpoint"))).endpoint);
} }
} }

View File

@ -48,13 +48,23 @@ public:
std::unique_ptr<S3ObjectStorageSettings> && s3_settings_, std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
String version_id_, String version_id_,
const S3Capabilities & s3_capabilities_, const S3Capabilities & s3_capabilities_,
String bucket_) String bucket_,
String connection_string)
: bucket(bucket_) : bucket(bucket_)
, client(std::move(client_)) , client(std::move(client_))
, s3_settings(std::move(s3_settings_)) , s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_) , s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_)) , version_id(std::move(version_id_))
{ {
data_source_description.type = DataSourceType::S3;
data_source_description.description = connection_string;
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
/// Returns a copy of the description filled in by the constructor (type S3, not cached, not encrypted).
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "S3ObjectStorage"; } std::string getName() const override { return "S3ObjectStorage"; }
@ -169,6 +179,8 @@ private:
S3Capabilities s3_capabilities; S3Capabilities s3_capabilities;
const String version_id; const String version_id;
DataSourceDescription data_source_description;
}; };
} }

View File

@ -40,6 +40,7 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractC
rw_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", context->getSettingsRef().s3_upload_part_size_multiply_factor); rw_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", context->getSettingsRef().s3_upload_part_size_multiply_factor);
rw_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold); rw_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size); rw_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size);
rw_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", context->getSettingsRef().s3_check_objects_after_upload);
return std::make_unique<S3ObjectStorageSettings>( return std::make_unique<S3ObjectStorageSettings>(
rw_settings, rw_settings,

View File

@ -130,7 +130,7 @@ void registerDiskS3(DiskFactory & factory)
auto s3_storage = std::make_unique<S3ObjectStorage>( auto s3_storage = std::make_unique<S3ObjectStorage>(
getClient(config, config_prefix, context), getClient(config, config_prefix, context),
getSettings(config, config_prefix, context), getSettings(config, config_prefix, context),
uri.version_id, s3_capabilities, uri.bucket); uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
bool skip_access_check = config.getBool(config_prefix + ".skip_access_check", false); bool skip_access_check = config.getBool(config_prefix + ".skip_access_check", false);
@ -159,7 +159,6 @@ void registerDiskS3(DiskFactory & factory)
"DiskS3", "DiskS3",
std::move(metadata_storage), std::move(metadata_storage),
std::move(s3_storage), std::move(s3_storage),
DiskType::S3,
send_metadata, send_metadata,
copy_thread_pool_size); copy_thread_pool_size);

View File

@ -20,6 +20,16 @@ class WebObjectStorage : public IObjectStorage, WithContext
public: public:
WebObjectStorage(const String & url_, ContextPtr context_); WebObjectStorage(const String & url_, ContextPtr context_);
/// Describes this storage as a web server source identified by `url`; it is neither encrypted nor cached.
DataSourceDescription getDataSourceDescription() const override
{
    DataSourceDescription web_description{};
    web_description.type = DataSourceType::WebServer;
    web_description.description = url;
    web_description.is_encrypted = false;
    web_description.is_cached = false;
    return web_description;
}
std::string getName() const override { return "WebObjectStorage"; } std::string getName() const override { return "WebObjectStorage"; }
bool exists(const StoredObject & object) const override; bool exists(const StoredObject & object) const override;

View File

@ -47,7 +47,6 @@ void registerDiskWebServer(DiskFactory & factory)
"DiskWebServer", "DiskWebServer",
metadata_storage, metadata_storage,
object_storage, object_storage,
DiskType::WebServer,
/* send_metadata */false, /* send_metadata */false,
/* threadpool_size */16); /* threadpool_size */16);
}; };

View File

@ -1017,9 +1017,7 @@ inline bool tryParseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuff
{ {
ExtendedDayNum tmp(0); ExtendedDayNum tmp(0);
if (!tryReadDateText(tmp, rb)) if (!tryReadDateText(tmp, rb))
{
return false; return false;
}
x = tmp; x = tmp;
return true; return true;
} }
@ -1102,9 +1100,27 @@ struct ConvertThroughParsing
if (in.eof()) if (in.eof())
return true; return true;
/// Special case, that allows to parse string with DateTime as Date. /// Special case, that allows to parse string with DateTime or DateTime64 as Date or Date32.
if (std::is_same_v<ToDataType, DataTypeDate> && (in.buffer().size()) == strlen("YYYY-MM-DD hh:mm:ss")) if constexpr (std::is_same_v<ToDataType, DataTypeDate> || std::is_same_v<ToDataType, DataTypeDate32>)
return true; {
if (!in.eof() && (*in.position() == ' ' || *in.position() == 'T'))
{
if (in.buffer().size() == strlen("YYYY-MM-DD hh:mm:ss"))
return true;
if (in.buffer().size() >= strlen("YYYY-MM-DD hh:mm:ss.x")
&& in.buffer().begin()[19] == '.')
{
in.position() = in.buffer().begin() + 20;
while (!in.eof() && isNumericASCII(*in.position()))
++in.position();
if (in.eof())
return true;
}
}
}
return false; return false;
} }
@ -1240,8 +1256,10 @@ struct ConvertThroughParsing
vec_to[i] = value; vec_to[i] = value;
} }
else if constexpr (IsDataTypeDecimal<ToDataType>) else if constexpr (IsDataTypeDecimal<ToDataType>)
{
SerializationDecimal<typename ToDataType::FieldType>::readText( SerializationDecimal<typename ToDataType::FieldType>::readText(
vec_to[i], read_buffer, ToDataType::maxPrecision(), col_to->getScale()); vec_to[i], read_buffer, ToDataType::maxPrecision(), col_to->getScale());
}
else else
{ {
parseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone); parseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone);
@ -1294,8 +1312,10 @@ struct ConvertThroughParsing
vec_to[i] = value; vec_to[i] = value;
} }
else if constexpr (IsDataTypeDecimal<ToDataType>) else if constexpr (IsDataTypeDecimal<ToDataType>)
{
parsed = SerializationDecimal<typename ToDataType::FieldType>::tryReadText( parsed = SerializationDecimal<typename ToDataType::FieldType>::tryReadText(
vec_to[i], read_buffer, ToDataType::maxPrecision(), col_to->getScale()); vec_to[i], read_buffer, ToDataType::maxPrecision(), col_to->getScale());
}
else else
parsed = tryParseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone); parsed = tryParseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone);
} }

View File

@ -85,6 +85,9 @@ class FunctionArrayMapped : public IFunction
{ {
public: public:
static constexpr auto name = Name::name; static constexpr auto name = Name::name;
static constexpr bool is_argument_type_map = std::is_same_v<typename Impl::data_type, DataTypeMap>;
static constexpr bool is_argument_type_array = std::is_same_v<typename Impl::data_type, DataTypeArray>;
static constexpr auto argument_type_name = is_argument_type_map ? "Map" : "Array";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayMapped>(); } static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayMapped>(); }
String getName() const override String getName() const override
@ -112,20 +115,25 @@ public:
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs one argument with data", getName()); "Function {} needs one argument with data", getName());
size_t nested_types_count = std::is_same_v<typename Impl::data_type, DataTypeMap> ? (arguments.size() - 1) * 2 : (arguments.size() - 1); size_t nested_types_count = is_argument_type_map ? (arguments.size() - 1) * 2 : (arguments.size() - 1);
DataTypes nested_types(nested_types_count); DataTypes nested_types(nested_types_count);
for (size_t i = 0; i < arguments.size() - 1; ++i) for (size_t i = 0; i < arguments.size() - 1; ++i)
{ {
const auto * array_type = checkAndGetDataType<typename Impl::data_type>(&*arguments[i + 1]); const auto * array_type = checkAndGetDataType<typename Impl::data_type>(&*arguments[i + 1]);
if (!array_type) if (!array_type)
throw Exception("Argument " + toString(i + 2) + " of function " + getName() + " must be array. Found " throw Exception(
+ arguments[i + 1]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>) "Argument {} of function {} must be {}. Found {} instead",
toString(i + 2),
getName(),
argument_type_name,
arguments[i + 1]->getName());
if constexpr (is_argument_type_map)
{ {
nested_types[2 * i] = recursiveRemoveLowCardinality(array_type->getKeyType()); nested_types[2 * i] = recursiveRemoveLowCardinality(array_type->getKeyType());
nested_types[2 * i + 1] = recursiveRemoveLowCardinality(array_type->getValueType()); nested_types[2 * i + 1] = recursiveRemoveLowCardinality(array_type->getValueType());
} }
else if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>) else if constexpr (is_argument_type_array)
{ {
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType()); nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
} }
@ -149,7 +157,7 @@ public:
"Function {} needs at least {} argument, passed {}", "Function {} needs at least {} argument, passed {}",
getName(), min_args, arguments.size()); getName(), min_args, arguments.size());
if ((arguments.size() == 1) && std::is_same_v<typename Impl::data_type, DataTypeArray>) if ((arguments.size() == 1) && is_argument_type_array)
{ {
const auto * data_type = checkAndGetDataType<typename Impl::data_type>(arguments[0].type.get()); const auto * data_type = checkAndGetDataType<typename Impl::data_type>(arguments[0].type.get());
@ -163,7 +171,7 @@ public:
throw Exception("The only argument for function " + getName() + " must be array of UInt8. Found " throw Exception("The only argument for function " + getName() + " must be array of UInt8. Found "
+ arguments[0].type->getName() + " instead", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + arguments[0].type->getName() + " instead", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>) if constexpr (is_argument_type_array)
return Impl::getReturnType(nested_type, nested_type); return Impl::getReturnType(nested_type, nested_type);
else else
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached"); throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
@ -193,10 +201,7 @@ public:
throw Exception("Expression for function " + getName() + " must return UInt8 or Nullable(UInt8), found " throw Exception("Expression for function " + getName() + " must return UInt8 or Nullable(UInt8), found "
+ return_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + return_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
static_assert( static_assert(is_argument_type_map || is_argument_type_array, "unsupported type");
std::is_same_v<typename Impl::data_type, DataTypeMap> ||
std::is_same_v<typename Impl::data_type, DataTypeArray>,
"unsupported type");
if (arguments.size() < 2) if (arguments.size() < 2)
{ {
@ -208,10 +213,10 @@ public:
if (!first_array_type) if (!first_array_type)
throw DB::Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Unsupported type {}", arguments[1].type->getName()); throw DB::Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Unsupported type {}", arguments[1].type->getName());
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>) if constexpr (is_argument_type_array)
return Impl::getReturnType(return_type, first_array_type->getNestedType()); return Impl::getReturnType(return_type, first_array_type->getNestedType());
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>) if constexpr (is_argument_type_map)
return Impl::getReturnType(return_type, first_array_type->getKeyValueTypes()); return Impl::getReturnType(return_type, first_array_type->getKeyValueTypes());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached"); throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
@ -229,7 +234,11 @@ public:
{ {
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get()); const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
if (!column_const_array) if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN); throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Expected {} column, found {}",
argument_type_name,
column_array_ptr->getName());
column_array_ptr = column_const_array->convertToFullColumn(); column_array_ptr = column_const_array->convertToFullColumn();
column_array = assert_cast<const typename Impl::column_type *>(column_array_ptr.get()); column_array = assert_cast<const typename Impl::column_type *>(column_array_ptr.get());
} }
@ -279,13 +288,15 @@ public:
{ {
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get()); const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
if (!column_const_array) if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN); throw Exception(
ErrorCodes::ILLEGAL_COLUMN, "Expected {} column, found {}", argument_type_name, column_array_ptr->getName());
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn()); column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
column_array = checkAndGetColumn<typename Impl::column_type>(column_array_ptr.get()); column_array = checkAndGetColumn<typename Impl::column_type>(column_array_ptr.get());
} }
if (!array_type) if (!array_type)
throw Exception("Expected array type, found " + array_type_ptr->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Expected {} type, found {}", argument_type_name, array_type_ptr->getName());
if (!offsets_column) if (!offsets_column)
{ {
@ -296,7 +307,11 @@ public:
/// The first condition is optimization: do not compare data if the pointers are equal. /// The first condition is optimization: do not compare data if the pointers are equal.
if (getOffsetsPtr(*column_array) != offsets_column if (getOffsetsPtr(*column_array) != offsets_column
&& getOffsets(*column_array) != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData()) && getOffsets(*column_array) != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
throw Exception("Arrays passed to " + getName() + " must have equal size", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH); throw Exception(
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH,
"{}s passed to {} must have equal size",
argument_type_name,
getName());
} }
if (i == 1) if (i == 1)
@ -305,7 +320,7 @@ public:
column_first_array = column_array; column_first_array = column_array;
} }
if constexpr (std::is_same_v<DataTypeMap, typename Impl::data_type>) if constexpr (is_argument_type_map)
{ {
arrays.emplace_back(ColumnWithTypeAndName( arrays.emplace_back(ColumnWithTypeAndName(
column_array->getNestedData().getColumnPtr(0), recursiveRemoveLowCardinality(array_type->getKeyType()), array_with_type_and_name.name+".key")); column_array->getNestedData().getColumnPtr(0), recursiveRemoveLowCardinality(array_type->getKeyType()), array_with_type_and_name.name+".key"));

View File

@ -969,10 +969,12 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D
{ {
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>; static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
/// YYYY-MM-DD hh:mm:ss
static constexpr auto date_time_broken_down_length = 19;
/// YYYY-MM-DD /// YYYY-MM-DD
static constexpr auto date_broken_down_length = 10; static constexpr auto date_broken_down_length = 10;
/// hh:mm:ss
static constexpr auto time_broken_down_length = 8;
/// YYYY-MM-DD hh:mm:ss
static constexpr auto date_time_broken_down_length = date_broken_down_length + 1 + time_broken_down_length;
char s[date_time_broken_down_length]; char s[date_time_broken_down_length];
char * s_pos = s; char * s_pos = s;
@ -995,16 +997,15 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D
if (s_pos == s + 4 && !buf.eof() && !isNumericASCII(*buf.position())) if (s_pos == s + 4 && !buf.eof() && !isNumericASCII(*buf.position()))
{ {
const auto already_read_length = s_pos - s; const auto already_read_length = s_pos - s;
const size_t remaining_date_time_size = date_time_broken_down_length - already_read_length;
const size_t remaining_date_size = date_broken_down_length - already_read_length; const size_t remaining_date_size = date_broken_down_length - already_read_length;
size_t size = buf.read(s_pos, remaining_date_time_size); size_t size = buf.read(s_pos, remaining_date_size);
if (size != remaining_date_time_size && size != remaining_date_size) if (size != remaining_date_size)
{ {
s_pos[size] = 0; s_pos[size] = 0;
if constexpr (throw_exception) if constexpr (throw_exception)
throw ParsingException(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME); throw ParsingException(std::string("Cannot parse DateTime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
else else
return false; return false;
} }
@ -1017,11 +1018,24 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D
UInt8 minute = 0; UInt8 minute = 0;
UInt8 second = 0; UInt8 second = 0;
if (size == remaining_date_time_size) if (!buf.eof() && (*buf.position() == ' ' || *buf.position() == 'T'))
{ {
hour = (s[11] - '0') * 10 + (s[12] - '0'); ++buf.position();
minute = (s[14] - '0') * 10 + (s[15] - '0'); size = buf.read(s, time_broken_down_length);
second = (s[17] - '0') * 10 + (s[18] - '0');
if (size != time_broken_down_length)
{
s_pos[size] = 0;
if constexpr (throw_exception)
throw ParsingException(std::string("Cannot parse time component of DateTime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
else
return false;
}
hour = (s[0] - '0') * 10 + (s[1] - '0');
minute = (s[3] - '0') * 10 + (s[4] - '0');
second = (s[6] - '0') * 10 + (s[7] - '0');
} }
if (unlikely(year == 0)) if (unlikely(year == 0))

View File

@ -736,6 +736,7 @@ inline ReturnType readDateTextImpl(ExtendedDayNum & date, ReadBuffer & buf)
readDateTextImpl<ReturnType>(local_date, buf); readDateTextImpl<ReturnType>(local_date, buf);
else if (!readDateTextImpl<ReturnType>(local_date, buf)) else if (!readDateTextImpl<ReturnType>(local_date, buf))
return false; return false;
/// When the parameter is out of rule or out of range, Date32 uses 1925-01-01 as the default value (-DateLUT::instance().getDayNumOffsetEpoch(), -16436) and Date uses 1970-01-01. /// When the parameter is out of rule or out of range, Date32 uses 1925-01-01 as the default value (-DateLUT::instance().getDayNumOffsetEpoch(), -16436) and Date uses 1970-01-01.
date = DateLUT::instance().makeDayNum(local_date.year(), local_date.month(), local_date.day(), -static_cast<Int32>(DateLUT::instance().getDayNumOffsetEpoch())); date = DateLUT::instance().makeDayNum(local_date.year(), local_date.month(), local_date.day(), -static_cast<Int32>(DateLUT::instance().getDayNumOffsetEpoch()));
return ReturnType(true); return ReturnType(true);
@ -856,10 +857,10 @@ inline ReturnType readDateTimeTextImpl(time_t & datetime, ReadBuffer & buf, cons
const char * s = buf.position(); const char * s = buf.position();
/// YYYY-MM-DD hh:mm:ss /// YYYY-MM-DD hh:mm:ss
static constexpr auto DateTimeStringInputSize = 19; static constexpr auto date_time_broken_down_length = 19;
///YYYY-MM-DD /// YYYY-MM-DD
static constexpr auto DateStringInputSize = 10; static constexpr auto date_broken_down_length = 10;
bool optimistic_path_for_date_time_input = s + DateTimeStringInputSize <= buf.buffer().end(); bool optimistic_path_for_date_time_input = s + date_time_broken_down_length <= buf.buffer().end();
if (optimistic_path_for_date_time_input) if (optimistic_path_for_date_time_input)
{ {
@ -872,7 +873,8 @@ inline ReturnType readDateTimeTextImpl(time_t & datetime, ReadBuffer & buf, cons
UInt8 hour = 0; UInt8 hour = 0;
UInt8 minute = 0; UInt8 minute = 0;
UInt8 second = 0; UInt8 second = 0;
///simply determine whether it is YYYY-MM-DD hh:mm:ss or YYYY-MM-DD by the content of the tenth character in an optimistic scenario
/// Simply determine whether it is YYYY-MM-DD hh:mm:ss or YYYY-MM-DD by the content of the tenth character in an optimistic scenario
bool dt_long = (s[10] == ' ' || s[10] == 'T'); bool dt_long = (s[10] == ' ' || s[10] == 'T');
if (dt_long) if (dt_long)
{ {
@ -887,9 +889,10 @@ inline ReturnType readDateTimeTextImpl(time_t & datetime, ReadBuffer & buf, cons
datetime = date_lut.makeDateTime(year, month, day, hour, minute, second); datetime = date_lut.makeDateTime(year, month, day, hour, minute, second);
if (dt_long) if (dt_long)
buf.position() += DateTimeStringInputSize; buf.position() += date_time_broken_down_length;
else else
buf.position() += DateStringInputSize; buf.position() += date_broken_down_length;
return ReturnType(true); return ReturnType(true);
} }
else else
@ -961,7 +964,13 @@ inline ReturnType readDateTimeTextImpl(DateTime64 & datetime64, UInt32 scale, Re
components.whole = components.whole / common::exp10_i32(scale); components.whole = components.whole / common::exp10_i32(scale);
} }
datetime64 = negative_multiplier * DecimalUtils::decimalFromComponents<DateTime64>(components, scale); if constexpr (std::is_same_v<ReturnType, void>)
datetime64 = DecimalUtils::decimalFromComponents<DateTime64>(components, scale);
else
DecimalUtils::tryGetDecimalFromComponents<DateTime64>(components, scale, datetime64);
datetime64 *= negative_multiplier;
return ReturnType(true); return ReturnType(true);
} }
@ -988,21 +997,33 @@ inline bool tryReadDateTime64Text(DateTime64 & datetime64, UInt32 scale, ReadBuf
/// Reads a LocalDateTime from its text form "YYYY-MM-DD hh:mm:ss".
/// The separator between the date and the time may be ' ' or 'T'.
/// A bare date "YYYY-MM-DD" is accepted as well; the time of day is then 00:00:00.
///
/// Throws ParsingException (CANNOT_PARSE_DATETIME) if fewer than 10 characters are available
/// for the date, or if a separator is present but fewer than 8 characters follow it.
/// Individual characters are not validated: digits are converted with a plain `c - '0'`.
inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf)
{
    /// YYYY-MM-DD
    static constexpr size_t date_broken_down_length = 10;
    /// hh:mm:ss
    static constexpr size_t time_broken_down_length = 8;

    /// The same scratch buffer is reused for the (shorter) time component.
    char s[date_broken_down_length];
    size_t size = buf.read(s, date_broken_down_length);
    if (size != date_broken_down_length)
    {
        /// size < date_broken_down_length here, so the terminator stays inside the buffer.
        s[size] = 0;
        throw ParsingException(std::string("Cannot parse DateTime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
    }

    datetime.year((s[0] - '0') * 1000 + (s[1] - '0') * 100 + (s[2] - '0') * 10 + (s[3] - '0'));
    datetime.month((s[5] - '0') * 10 + (s[6] - '0'));
    datetime.day((s[8] - '0') * 10 + (s[9] - '0'));

    /// Reset the time of day before looking for a time component, so that a date-only input
    /// never leaves stale hour/minute/second values from a reused `datetime` object.
    datetime.hour(0);
    datetime.minute(0);
    datetime.second(0);

    /// Allow to read Date as DateTime
    if (buf.eof() || !(*buf.position() == ' ' || *buf.position() == 'T'))
        return;

    /// Skip the separator.
    ++buf.position();

    size = buf.read(s, time_broken_down_length);
    if (size != time_broken_down_length)
    {
        s[size] = 0;
        throw ParsingException(std::string("Cannot parse time component of DateTime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
    }

    datetime.hour((s[0] - '0') * 10 + (s[1] - '0'));
    datetime.minute((s[3] - '0') * 10 + (s[4] - '0'));
    datetime.second((s[6] - '0') * 10 + (s[7] - '0'));
}

View File

@ -35,9 +35,8 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
void WriteBufferFromHTTP::finalizeImpl() void WriteBufferFromHTTP::finalizeImpl()
{ {
// for compressed body, the data is stored in buffered first // Make sure the content in the buffer has been flushed
// here, make sure the content in the buffer has been flushed this->next();
this->nextImpl();
receiveResponse(*session, request, response, false); receiveResponse(*session, request, response, false);
/// TODO: Response body is ignored. /// TODO: Response body is ignored.

View File

@ -15,6 +15,7 @@
#include <aws/s3/model/CompleteMultipartUploadRequest.h> #include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h> #include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h> #include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <utility> #include <utility>
@ -164,6 +165,20 @@ void WriteBufferFromS3::finalizeImpl()
if (!multipart_upload_id.empty()) if (!multipart_upload_id.empty())
completeMultipartUpload(); completeMultipartUpload();
if (s3_settings.check_objects_after_upload)
{
LOG_TRACE(log, "Checking object {} exists after upload", key);
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);
auto response = client_ptr->HeadObject(request);
if (!response.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket);
}
} }
void WriteBufferFromS3::createMultipartUpload() void WriteBufferFromS3::createMultipartUpload()

View File

@ -240,7 +240,7 @@ void TabSeparatedFormatReader::checkNullValueForNonNullable(DataTypePtr type)
void TabSeparatedFormatReader::skipPrefixBeforeHeader() void TabSeparatedFormatReader::skipPrefixBeforeHeader()
{ {
for (size_t i = 0; i != format_settings.csv.skip_first_lines; ++i) for (size_t i = 0; i != format_settings.tsv.skip_first_lines; ++i)
readRow(); readRow();
} }

View File

@ -415,7 +415,7 @@ std::string DataPartStorageOnDisk::getDiskName() const
std::string DataPartStorageOnDisk::getDiskType() const std::string DataPartStorageOnDisk::getDiskType() const
{ {
return toString(volume->getDisk()->getType()); return toString(volume->getDisk()->getDataSourceDescription().type);
} }
bool DataPartStorageOnDisk::isStoredOnRemoteDisk() const bool DataPartStorageOnDisk::isStoredOnRemoteDisk() const

View File

@ -458,11 +458,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
Disks disks = data.getDisks(); Disks disks = data.getDisks();
for (const auto & data_disk : disks) for (const auto & data_disk : disks)
if (data_disk->supportZeroCopyReplication()) if (data_disk->supportZeroCopyReplication())
capability.push_back(toString(data_disk->getType())); capability.push_back(toString(data_disk->getDataSourceDescription().type));
} }
else if (disk->supportZeroCopyReplication()) else if (disk->supportZeroCopyReplication())
{ {
capability.push_back(toString(disk->getType())); capability.push_back(toString(disk->getDataSourceDescription().type));
} }
} }
if (!capability.empty()) if (!capability.empty())

View File

@ -443,6 +443,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len; pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len;
final_lsn = current_lsn; final_lsn = current_lsn;
committed = true;
break; break;
} }
case 'R': // Relation case 'R': // Relation
@ -593,6 +594,12 @@ void MaterializedPostgreSQLConsumer::syncTables()
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn)); LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
updateLsn();
}
void MaterializedPostgreSQLConsumer::updateLsn()
{
try try
{ {
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef()); auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
@ -614,6 +621,7 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
final_lsn = result[0][0].as<std::string>(); final_lsn = result[0][0].as<std::string>();
LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn)); LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn));
committed = false;
return final_lsn; return final_lsn;
} }
@ -771,7 +779,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
try try
{ {
// LOG_DEBUG(log, "Current message: {}", (*row)[1]); /// LOG_DEBUG(log, "Current message: {}", (*row)[1]);
processReplicationMessage((*row)[1].c_str(), (*row)[1].size()); processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
} }
catch (const Exception & e) catch (const Exception & e)
@ -790,6 +798,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
} }
catch (const pqxx::broken_connection &) catch (const pqxx::broken_connection &)
{ {
LOG_DEBUG(log, "Connection was broken");
connection->tryUpdateConnection(); connection->tryUpdateConnection();
return false; return false;
} }
@ -823,7 +832,13 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
} }
if (!tables_to_sync.empty()) if (!tables_to_sync.empty())
{
syncTables(); syncTables();
}
else if (committed)
{
updateLsn();
}
return true; return true;
} }

View File

@ -94,6 +94,8 @@ private:
void syncTables(); void syncTables();
void updateLsn();
String advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx); String advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx);
void processReplicationMessage(const char * replication_message, size_t size); void processReplicationMessage(const char * replication_message, size_t size);
@ -136,6 +138,8 @@ private:
ContextPtr context; ContextPtr context;
const std::string replication_slot_name, publication_name; const std::string replication_slot_name, publication_name;
bool committed = false;
std::shared_ptr<postgres::Connection> connection; std::shared_ptr<postgres::Connection> connection;
std::string current_lsn, final_lsn; std::string current_lsn, final_lsn;

View File

@ -321,13 +321,13 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
nested_storages, nested_storages,
(is_materialized_postgresql_database ? postgres_database : postgres_database + '.' + tables_list)); (is_materialized_postgresql_database ? postgres_database : postgres_database + '.' + tables_list));
replication_handler_initialized = true;
consumer_task->activateAndSchedule(); consumer_task->activateAndSchedule();
cleanup_task->activateAndSchedule(); cleanup_task->activateAndSchedule();
/// Do not rely anymore on saved storage pointers. /// Do not rely anymore on saved storage pointers.
materialized_storages.clear(); materialized_storages.clear();
replication_handler_initialized = true;
} }

View File

@ -7500,7 +7500,7 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
String id = part_id; String id = part_id;
boost::replace_all(id, "/", "_"); boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(), Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
part_name, zookeeper_path); part_name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths) for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
@ -7690,11 +7690,11 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
const String & path) const String & path)
{ {
const auto settings = getSettings(); const auto settings = getSettings();
auto disk_type = disk->getType(); auto data_source_description = disk->getDataSourceDescription();
if (!(disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)) if (!(disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication))
return nullptr; return nullptr;
String replica = getSharedDataReplica(part, disk_type); String replica = getSharedDataReplica(part, data_source_description.type);
/// We can't fetch part when none replicas have this part on a same type remote disk /// We can't fetch part when none replicas have this part on a same type remote disk
if (replica.empty()) if (replica.empty())
@ -7703,9 +7703,8 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
return executeFetchShared(replica, part.name, disk, path); return executeFetchShared(replica, part.name, disk, path);
} }
String StorageReplicatedMergeTree::getSharedDataReplica( String StorageReplicatedMergeTree::getSharedDataReplica(
const IMergeTreeDataPart & part, DiskType disk_type) const const IMergeTreeDataPart & part, DataSourceType data_source_type) const
{ {
String best_replica; String best_replica;
@ -7713,7 +7712,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
if (!zookeeper) if (!zookeeper)
return ""; return "";
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk_type), getTableSharedID(), part.name, Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(data_source_type), getTableSharedID(), part.name,
zookeeper_path); zookeeper_path);
std::set<String> replicas; std::set<String> replicas;
@ -7824,7 +7823,7 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
if (!disk || !disk->supportZeroCopyReplication()) if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt; return std::nullopt;
return getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(), part_name, zookeeper_path)[0]; return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
} }
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
@ -8229,7 +8228,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
String id = disk->getUniqueId(checksums); String id = disk->getUniqueId(checksums);
bool can_remove = false; bool can_remove = false;
std::tie(can_remove, files_not_to_remove) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name, std::tie(can_remove, files_not_to_remove) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, toString(disk->getType()), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"), detached_replica_name, toString(disk->getDataSourceDescription().type), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"),
detached_zookeeper_path); detached_zookeeper_path);
keep_shared = !can_remove; keep_shared = !can_remove;

View File

@ -283,7 +283,7 @@ public:
DataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override; DataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get best replica having this partition on a same type remote disk /// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DiskType disk_type) const; String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
inline String getReplicaName() const { return replica_name; } inline String getReplicaName() const { return replica_name; }

View File

@ -34,6 +34,13 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
return with_default ? config.getUInt64(config_elem + "." + key + "." + elem, default_value) : config.getUInt64(config_elem + "." + key + "." + elem); return with_default ? config.getUInt64(config_elem + "." + key + "." + elem, default_value) : config.getUInt64(config_elem + "." + key + "." + elem);
}; };
auto get_bool_for_key = [&](const String & key, const String & elem, bool with_default = true, bool default_value = false)
{
return with_default ? config.getBool(config_elem + "." + key + "." + elem, default_value) : config.getBool(config_elem + "." + key + "." + elem);
};
for (const String & key : config_keys) for (const String & key : config_keys)
{ {
if (config.has(config_elem + "." + key + ".endpoint")) if (config.has(config_elem + "." + key + ".endpoint"))
@ -82,6 +89,7 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold); rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size); rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections); rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(rw_settings)}); s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(rw_settings)});
} }
@ -112,6 +120,7 @@ S3Settings::ReadWriteSettings::ReadWriteSettings(const Settings & settings)
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold; upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
max_single_part_upload_size = settings.s3_max_single_part_upload_size; max_single_part_upload_size = settings.s3_max_single_part_upload_size;
max_connections = settings.s3_max_connections; max_connections = settings.s3_max_connections;
check_objects_after_upload = settings.s3_check_objects_after_upload;
} }
void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & settings) void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & settings)
@ -128,6 +137,7 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
max_single_part_upload_size = settings.s3_max_single_part_upload_size; max_single_part_upload_size = settings.s3_max_single_part_upload_size;
if (!max_connections) if (!max_connections)
max_connections = settings.s3_max_connections; max_connections = settings.s3_max_connections;
check_objects_after_upload = settings.s3_check_objects_after_upload;
} }
} }

View File

@ -60,6 +60,7 @@ struct S3Settings
size_t upload_part_size_multiply_parts_count_threshold = 0; size_t upload_part_size_multiply_parts_count_threshold = 0;
size_t max_single_part_upload_size = 0; size_t max_single_part_upload_size = 0;
size_t max_connections = 0; size_t max_connections = 0;
bool check_objects_after_upload = false;
ReadWriteSettings() = default; ReadWriteSettings() = default;
explicit ReadWriteSettings(const Settings & settings); explicit ReadWriteSettings(const Settings & settings);
@ -71,7 +72,8 @@ struct S3Settings
&& upload_part_size_multiply_factor == other.upload_part_size_multiply_factor && upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
&& upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold && upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
&& max_single_part_upload_size == other.max_single_part_upload_size && max_single_part_upload_size == other.max_single_part_upload_size
&& max_connections == other.max_connections; && max_connections == other.max_connections
&& check_objects_after_upload == other.check_objects_after_upload;
} }
void updateFromSettingsIfEmpty(const Settings & settings); void updateFromSettingsIfEmpty(const Settings & settings);

View File

@ -23,6 +23,7 @@ StorageSystemDisks::StorageSystemDisks(const StorageID & table_id_)
{"total_space", std::make_shared<DataTypeUInt64>()}, {"total_space", std::make_shared<DataTypeUInt64>()},
{"keep_free_space", std::make_shared<DataTypeUInt64>()}, {"keep_free_space", std::make_shared<DataTypeUInt64>()},
{"type", std::make_shared<DataTypeString>()}, {"type", std::make_shared<DataTypeString>()},
{"is_encrypted", std::make_shared<DataTypeUInt8>()},
{"cache_path", std::make_shared<DataTypeString>()}, {"cache_path", std::make_shared<DataTypeString>()},
})); }));
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
@ -45,6 +46,7 @@ Pipe StorageSystemDisks::read(
MutableColumnPtr col_total = ColumnUInt64::create(); MutableColumnPtr col_total = ColumnUInt64::create();
MutableColumnPtr col_keep = ColumnUInt64::create(); MutableColumnPtr col_keep = ColumnUInt64::create();
MutableColumnPtr col_type = ColumnString::create(); MutableColumnPtr col_type = ColumnString::create();
MutableColumnPtr col_is_encrypted = ColumnUInt8::create();
MutableColumnPtr col_cache_path = ColumnString::create(); MutableColumnPtr col_cache_path = ColumnString::create();
for (const auto & [disk_name, disk_ptr] : context->getDisksMap()) for (const auto & [disk_name, disk_ptr] : context->getDisksMap())
@ -54,7 +56,9 @@ Pipe StorageSystemDisks::read(
col_free->insert(disk_ptr->getAvailableSpace()); col_free->insert(disk_ptr->getAvailableSpace());
col_total->insert(disk_ptr->getTotalSpace()); col_total->insert(disk_ptr->getTotalSpace());
col_keep->insert(disk_ptr->getKeepingFreeSpace()); col_keep->insert(disk_ptr->getKeepingFreeSpace());
col_type->insert(toString(disk_ptr->getType())); auto data_source_description = disk_ptr->getDataSourceDescription();
col_type->insert(toString(data_source_description.type));
col_is_encrypted->insert(data_source_description.is_encrypted);
String cache_path; String cache_path;
if (disk_ptr->supportsCache()) if (disk_ptr->supportsCache())
@ -70,6 +74,7 @@ Pipe StorageSystemDisks::read(
res_columns.emplace_back(std::move(col_total)); res_columns.emplace_back(std::move(col_total));
res_columns.emplace_back(std::move(col_keep)); res_columns.emplace_back(std::move(col_keep));
res_columns.emplace_back(std::move(col_type)); res_columns.emplace_back(std::move(col_type));
res_columns.emplace_back(std::move(col_is_encrypted));
res_columns.emplace_back(std::move(col_cache_path)); res_columns.emplace_back(std::move(col_cache_path));
UInt64 num_rows = res_columns.at(0)->size(); UInt64 num_rows = res_columns.at(0)->size();

View File

@ -59,6 +59,7 @@ ln -sf $SRC_PATH/users.d/session_log_test.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds # FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"), # when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,7 @@
<!-- User-level settings override: turns on s3_check_objects_after_upload for the default profile. -->
<clickhouse>
<profiles>
<default>
<!-- With this setting enabled, the S3 write buffer issues a HeadObject request after finishing an upload
     and throws if the object is not found. -->
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
</default>
</profiles>
</clickhouse>

View File

@ -89,6 +89,8 @@ def test_restore_table(engine):
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("DROP TABLE test.table") instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n" assert instance.query("EXISTS test.table") == "0\n"
@ -129,6 +131,8 @@ def test_restore_table_under_another_name():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
assert instance.query("EXISTS test.table2") == "0\n" assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name}") instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name}")
@ -142,6 +146,8 @@ def test_backup_table_under_another_name():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table AS test.table2 TO {backup_name}") instance.query(f"BACKUP TABLE test.table AS test.table2 TO {backup_name}")
assert instance.contains_in_log("using native copy")
assert instance.query("EXISTS test.table2") == "0\n" assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table2 FROM {backup_name}") instance.query(f"RESTORE TABLE test.table2 FROM {backup_name}")
@ -170,6 +176,8 @@ def test_incremental_backup():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')") instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n"
@ -244,6 +252,8 @@ def test_file_engine():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("DROP TABLE test.table") instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n" assert instance.query("EXISTS test.table") == "0\n"
@ -257,6 +267,9 @@ def test_database():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP DATABASE test TO {backup_name}") instance.query(f"BACKUP DATABASE test TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("DROP DATABASE test") instance.query("DROP DATABASE test")
instance.query(f"RESTORE DATABASE test FROM {backup_name}") instance.query(f"RESTORE DATABASE test FROM {backup_name}")
@ -269,6 +282,7 @@ def test_zip_archive():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name)) assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table") instance.query("DROP TABLE test.table")

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- User configuration for the integration test node: default profile, default user and default quota. -->
<clickhouse>
<profiles>
<default>
<!-- Verify that every uploaded S3 object exists right after the upload finishes. -->
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
<!-- NOTE(review): presumably makes S3 requests visible in the server log; confirm against the settings reference. -->
<enable_s3_requests_logging>1</enable_s3_requests_logging>
</default>
</profiles>
<users>
<!-- The default user: empty password, reachable from any address (::/0), bound to the default profile and quota. -->
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<!-- An empty default quota definition. -->
<quotas><default></default></quotas>
</clickhouse>

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Server configuration for the integration test node: one S3 disk, used as the default MergeTree storage. -->
<clickhouse>
<logger>
<!-- NOTE(review): presumably verbose enough to include the trace-level message the test looks for; confirm. -->
<level>test</level>
</logger>
<storage_configuration>
<disks>
<!-- Disk named "s3" of type s3, pointing at the minio1 endpoint with static credentials. -->
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<!-- Storage policy named "s3": a single volume "main" that contains only the s3 disk. -->
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<!-- MergeTree tables use the "s3" storage policy unless they specify another one. -->
<merge_tree>
<storage_policy>s3</storage_policy>
</merge_tree>
</clickhouse>

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import logging
import os
import time
from helpers.cluster import ClickHouseCluster
import pytest
@pytest.fixture(scope="module")
def cluster():
    """Module-scoped fixture that starts a single-node cluster with MinIO and yields it.

    The instance is registered under the name "node" with
    configs/storage_conf.xml as its main config and configs/setting.xml as its
    user config. The cluster is shut down when the fixture is torn down, and
    also when adding the instance or starting the cluster fails.
    """
    # Construct the cluster object before entering the try block: if the
    # constructor raises there is nothing to shut down, and the finally clause
    # must not fail on an unbound name and obscure the original error.
    ch_cluster = ClickHouseCluster(__file__)
    try:
        ch_cluster.add_instance(
            "node",
            main_configs=[
                "configs/storage_conf.xml",
            ],
            user_configs=[
                "configs/setting.xml",
            ],
            with_minio=True,
        )
        logging.info("Starting cluster...")
        ch_cluster.start()
        logging.info("Cluster started")

        yield ch_cluster
    finally:
        ch_cluster.shutdown()


def test_paranoid_check_in_logs(cluster):
    """Insert one row into a MergeTree table and check the post-upload check was logged.

    The test passes when the server log contains "exists after upload" and the
    inserted row can be read back.
    """
    create_table_sql = """
CREATE TABLE s3_failover_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
"""
    instance = cluster.instances["node"]

    instance.query(create_table_sql)
    instance.query("INSERT INTO s3_failover_test VALUES (1, 'Hello')")

    # The upload of the inserted part must have been followed by the existence check.
    check_was_logged = instance.contains_in_log("exists after upload")
    assert check_was_logged

    stored_rows = instance.query("SELECT * FROM s3_failover_test ORDER BY id")
    assert stored_rows == "1\tHello\n"

View File

@ -6,7 +6,7 @@ disk_types = {
"disk_s3": "s3", "disk_s3": "s3",
"disk_memory": "memory", "disk_memory": "memory",
"disk_hdfs": "hdfs", "disk_hdfs": "hdfs",
"disk_encrypted": "encrypted", "disk_encrypted": "s3",
} }
@ -34,14 +34,30 @@ def test_different_types(cluster):
if disk == "": # skip empty line (after split at last position) if disk == "": # skip empty line (after split at last position)
continue continue
fields = disk.split("\t") fields = disk.split("\t")
assert len(fields) >= 6 assert len(fields) >= 7
assert disk_types.get(fields[0], "UNKNOWN") == fields[5] assert disk_types.get(fields[0], "UNKNOWN") == fields[5]
if "encrypted" in fields[0]:
assert fields[6] == "1"
else:
assert fields[6] == "0"
def test_select_by_type(cluster):
    """Check that filtering system.disks by `type` returns the expected disk names.

    Every type other than "s3" must match exactly one disk, the one it is mapped
    from in `disk_types`. Type "s3" must match both `disk_encrypted` and
    `disk_s3`, compared in name order.
    """
    instance = cluster.instances["node"]
    for disk_name, type_name in disk_types.items():
        if type_name == "s3":
            # Two disks report type "s3", so order the result to compare it deterministically.
            select_sql = (
                "SELECT name FROM system.disks WHERE type='"
                + type_name
                + "' ORDER BY name"
            )
            expected = "disk_encrypted\ndisk_s3\n"
        else:
            select_sql = "SELECT name FROM system.disks WHERE type='" + type_name + "'"
            expected = disk_name + "\n"

        assert instance.query(select_sql) == expected

View File

@ -27,7 +27,7 @@ def get_profile_event_for_query(node, query, profile_event):
query = query.replace("'", "\\'") query = query.replace("'", "\\'")
return int( return int(
node.query( node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1" f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
) )
) )

View File

@ -636,7 +636,7 @@ def get_profile_event_for_query(node, query, profile_event):
query = query.replace("'", "\\'") query = query.replace("'", "\\'")
return int( return int(
node.query( node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1" f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
) )
) )

View File

@ -1493,16 +1493,12 @@ def test_wrong_format_usage(started_cluster):
def check_profile_event_for_query(instance, query, profile_event, amount): def check_profile_event_for_query(instance, query, profile_event, amount):
instance.query("system flush logs") instance.query("system flush logs")
query = query.replace("'", "\\'") query = query.replace("'", "\\'")
attempt = 0 res = int(
res = 0 instance.query(
while attempt < 10: f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
res = int(
instance.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
)
) )
if res == amount: )
break
assert res == amount assert res == amount

View File

@ -123,7 +123,7 @@ timeout $TIMEOUT bash -c drop_part_thread &
wait wait
check_replication_consistency "dst_" "count(), sum(p), sum(k), sum(v)" check_replication_consistency "dst_" "count(), sum(p), sum(k), sum(v)"
try_sync_replicas "src_" try_sync_replicas "src_" 300
for ((i=0; i<16; i++)) do for ((i=0; i<16; i++)) do
$CLICKHOUSE_CLIENT -q "DROP TABLE dst_$i" 2>&1| grep -Fv "is already started to be removing" & $CLICKHOUSE_CLIENT -q "DROP TABLE dst_$i" 2>&1| grep -Fv "is already started to be removing" &

View File

@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
# if it will not match, the exit code of grep will be non-zero and the test will fail # if it will not match, the exit code of grep will be non-zero and the test will fail
$CLICKHOUSE_CLIENT -q "SELECT toDateTime(format('{}-{}-01 00:00:00', '2021', '1'))" |& grep -F -q 'Cannot parse datetime 2021-1-01 00:00:00: Cannot parse DateTime from String:' $CLICKHOUSE_CLIENT -q "SELECT toDateTime(format('{}-{}-01 00:00:00', '2021', '1'))" |& grep -F -q "Cannot parse string '2021-1-01 00:00:00' as DateTime"

View File

@ -186,6 +186,7 @@ CREATE TABLE system.disks
`total_space` UInt64, `total_space` UInt64,
`keep_free_space` UInt64, `keep_free_space` UInt64,
`type` String, `type` String,
`is_encrypted` UInt8,
`cache_path` String `cache_path` String
) )
ENGINE = SystemDisks ENGINE = SystemDisks

View File

@ -7,6 +7,6 @@ select * from file(data_02314.csv) settings input_format_csv_skip_first_lines=5;
insert into function file(data_02314.tsv) select number, number + 1 from numbers(5) settings engine_file_truncate_on_insert=1; insert into function file(data_02314.tsv) select number, number + 1 from numbers(5) settings engine_file_truncate_on_insert=1;
insert into function file(data_02314.tsv) select number, number + 1, number + 2 from numbers(5); insert into function file(data_02314.tsv) select number, number + 1, number + 2 from numbers(5);
desc file(data_02314.tsv) settings input_format_csv_skip_first_lines=5; desc file(data_02314.tsv) settings input_format_tsv_skip_first_lines=5;
select * from file(data_02314.tsv) settings input_format_csv_skip_first_lines=5; select * from file(data_02314.tsv) settings input_format_tsv_skip_first_lines=5;

View File

@ -0,0 +1,4 @@
123 2022-05-03 00:00:00
456 2022-05-03 01:02:03
123 2022-05-03 00:00:00.000
456 2022-05-03 01:02:03.000

View File

@ -0,0 +1,13 @@
-- JSONEachRow input where the DateTime column is given once as a date-only
-- string and once as a full 'YYYY-MM-DD hh:mm:ss' string; both inserts are
-- expected to succeed.
CREATE TEMPORARY TABLE test (`i` Int64, `d` DateTime);
INSERT INTO test FORMAT JSONEachRow {"i": 123, "d": "2022-05-03"};
INSERT INTO test FORMAT JSONEachRow {"i": 456, "d": "2022-05-03 01:02:03"};
SELECT * FROM test ORDER BY i;
DROP TABLE test;
-- The same two inputs, this time for a DateTime64 column.
CREATE TEMPORARY TABLE test (`i` Int64, `d` DateTime64);
INSERT INTO test FORMAT JSONEachRow {"i": 123, "d": "2022-05-03"};
INSERT INTO test FORMAT JSONEachRow {"i": 456, "d": "2022-05-03 01:02:03"};
SELECT * FROM test ORDER BY i;
DROP TABLE test;

View File

@ -0,0 +1,12 @@
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22
2022-08-22

View File

@ -0,0 +1,33 @@
-- toDate / toDate32 applied to strings that carry a time part after the date.
-- Space as the date/time separator, with no, one-digit and six-digit fractional seconds:
SELECT toDate('2022-08-22 01:02:03');
SELECT toDate32('2022-08-22 01:02:03');
SELECT toDate('2022-08-22 01:02:03.1');
SELECT toDate32('2022-08-22 01:02:03.1');
SELECT toDate('2022-08-22 01:02:03.123456');
SELECT toDate32('2022-08-22 01:02:03.123456');
-- The same inputs with 'T' as the date/time separator:
SELECT toDate('2022-08-22T01:02:03');
SELECT toDate32('2022-08-22T01:02:03');
SELECT toDate('2022-08-22T01:02:03.1');
SELECT toDate32('2022-08-22T01:02:03.1');
SELECT toDate('2022-08-22T01:02:03.123456');
SELECT toDate32('2022-08-22T01:02:03.123456');
-- Malformed inputs: every statement below is expected to fail with server error 6 (CANNOT_PARSE_TEXT).
-- '+' used as the date/time separator:
SELECT toDate('2022-08-22+01:02:03'); -- { serverError 6 }
SELECT toDate32('2022-08-22+01:02:03'); -- { serverError 6 }
-- Seconds written with a single digit:
SELECT toDate('2022-08-22 01:02:0'); -- { serverError 6 }
SELECT toDate32('2022-08-22 01:02:0'); -- { serverError 6 }
-- Trailing '.' with no fractional digits:
SELECT toDate('2022-08-22 01:02:03.'); -- { serverError 6 }
SELECT toDate32('2022-08-22 01:02:03.'); -- { serverError 6 }
-- Non-digit characters inside the fractional part:
SELECT toDate('2022-08-22 01:02:03.111a'); -- { serverError 6 }
SELECT toDate32('2022-08-22 01:02:03.2b'); -- { serverError 6 }
SELECT toDate('2022-08-22 01:02:03.a'); -- { serverError 6 }
SELECT toDate32('2022-08-22 01:02:03.b'); -- { serverError 6 }

View File

@ -0,0 +1,5 @@
a
b
c
d
e

View File

@ -0,0 +1,15 @@
-- LEFT JOIN executed with the partial_merge algorithm under very small block limits.
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
-- Left side: keys 1, 2, 3.
CREATE TABLE t1 (x UInt64) ENGINE = TinyLog;
INSERT INTO t1 VALUES (1), (2), (3);
-- Right side, filled by two separate INSERTs: keys 2 and 3 each have two rows,
-- key 4 has no match in t1.
CREATE TABLE t2 (x UInt64, value String) ENGINE = TinyLog;
INSERT INTO t2 VALUES (1, 'a'), (2, 'b'), (2, 'c');
INSERT INTO t2 VALUES (3, 'd'), (3, 'e'), (4, 'f');
-- NOTE(review): the limits are presumably chosen so the five matching rows
-- cannot fit into a single joined block - confirm against the fix this test covers.
SET max_block_size=3;
SET max_joined_block_size_rows = 2;
SET join_algorithm='partial_merge';
SELECT value FROM t1 LEFT JOIN t2 ON t1.x = t2.x;

View File

@ -0,0 +1,2 @@
1970-01-01 00:00:00.000000000
c1 Nullable(String)

View File

@ -0,0 +1,2 @@
-- A string that is not a date/time: the OrDefault conversion is expected to
-- return a value rather than raise an error (no error annotation on the query).
select toDateTime64OrDefault('Aaaa e a.a.aaaaaaaaa', 9, 'UTC');
-- Schema inference for the same string supplied as a quoted CSV field.
desc format(CSV, '"Aaaa e a.a.aaaaaaaaa"');

View File

@ -0,0 +1,28 @@
#!/bin/bash

# Helper script: creates an empty test file and a matching `.reference` file in
# the directory this script lives in, numbered one above the highest numeric
# prefix found among the existing entries of that directory.
#
# Usage: add-test <base_test_name>[.<ext>]
#   <base_test_name>  name placed after the number, e.g. `my_test`
#   <ext>             optional extension of the test file (defaults to `sql`)

if [ -z "$1" ]; then
    echo "Helper script to create empty test and reference files and assign a new number."
    echo "Usage: $0 <base_test_name>"
    exit 1
fi

# Quoted so that a checkout path containing spaces or glob characters is not word-split.
TESTS_PATH=$(dirname "${BASH_SOURCE[0]}")

set -ue

# shellcheck disable=SC2010
LAST_TEST_NO=$(ls -1 "${TESTS_PATH}" | grep -P -o '^\d+' | sort -nr | head -1)

# remove leading zeros, increment and add padding zeros to 5 digits
# (`10#` forces base 10 so that a prefix such as 00089 is not read as octal;
# the `:-0` fallback makes the first test in an empty directory 00001)
NEW_TEST_NO=$(printf "%05d\n" $((10#${LAST_TEST_NO:-0} + 1)))

# if extension is not provided, use `.sql`
FILENAME="${1}"
FILEEXT="sql"
if [[ $1 == *.* ]] ; then
    FILENAME="${1%.*}"
    FILEEXT="${1##*.}"
fi

# Trace the touch commands so the user sees which files were created.
set -x
touch "${TESTS_PATH}/${NEW_TEST_NO}_${FILENAME}.${FILEEXT}"
touch "${TESTS_PATH}/${NEW_TEST_NO}_${FILENAME}.reference"

View File

@ -5,6 +5,7 @@
function try_sync_replicas() function try_sync_replicas()
{ {
table_name_prefix=$1 table_name_prefix=$1
time_left=$2
readarray -t empty_partitions_arr < <(${CLICKHOUSE_CLIENT} -q \ readarray -t empty_partitions_arr < <(${CLICKHOUSE_CLIENT} -q \
"SELECT DISTINCT substr(new_part_name, 1, position(new_part_name, '_') - 1) AS partition_id "SELECT DISTINCT substr(new_part_name, 1, position(new_part_name, '_') - 1) AS partition_id
@ -29,7 +30,7 @@ function try_sync_replicas()
for t in "${tables_arr[@]}" for t in "${tables_arr[@]}"
do do
# The size of log may be big, so increase timeout. # The size of log may be big, so increase timeout.
$CLICKHOUSE_CLIENT --receive_timeout 300 -q "SYSTEM SYNC REPLICA $t" || ($CLICKHOUSE_CLIENT -q \ $CLICKHOUSE_CLIENT --receive_timeout $time_left -q "SYSTEM SYNC REPLICA $t" || ($CLICKHOUSE_CLIENT -q \
"select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name" && exit 1) & "select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name" && exit 1) &
pids[${i}]=$! pids[${i}]=$!
i=$((i + 1)) i=$((i + 1))
@ -48,13 +49,14 @@ function check_replication_consistency()
# Wait for all queries to finish (query may still be running if thread is killed by timeout) # Wait for all queries to finish (query may still be running if thread is killed by timeout)
num_tries=0 num_tries=0
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do
sleep 0.5; sleep 1;
num_tries=$((num_tries+1)) num_tries=$((num_tries+1))
if [ $num_tries -eq 200 ]; then if [ $num_tries -eq 250 ]; then
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%' FORMAT Vertical" $CLICKHOUSE_CLIENT -q "SELECT * FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%' FORMAT Vertical"
break break
fi fi
done done
time_left=$((300 - num_tries))
# Do not check anything if all replicas are readonly, # Do not check anything if all replicas are readonly,
# because is this case all replicas are probably lost (it may happen and it's not a bug) # because is this case all replicas are probably lost (it may happen and it's not a bug)
@ -78,7 +80,7 @@ function check_replication_consistency()
# SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet # SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet
wait_for_all_mutations "$table_name_prefix%" wait_for_all_mutations "$table_name_prefix%"
try_sync_replicas "$table_name_prefix" || exit 1 try_sync_replicas "$table_name_prefix" "$time_left" || exit 1
res=$($CLICKHOUSE_CLIENT -q \ res=$($CLICKHOUSE_CLIENT -q \
"SELECT "SELECT