Merge branch 'master' into vdimir/tmp-file-metrics

This commit is contained in:
Vladimir C 2022-08-25 15:23:35 +02:00 committed by GitHub
commit ddde5096ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
90 changed files with 1284 additions and 269 deletions

View File

@ -303,7 +303,6 @@ else
rm -rf /var/lib/clickhouse/*
# Make BC check more funny by forcing Ordinary engine for system database
# New version will try to convert it to Atomic on startup
mkdir /var/lib/clickhouse/metadata
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
@ -313,16 +312,13 @@ else
# Start server from previous release
configure
# Avoid "Setting allow_deprecated_database_ordinary is neither a builtin setting..."
rm -f /etc/clickhouse-server/users.d/database_ordinary.xml ||:
# Avoid "Setting s3_check_objects_after_upload is neither a builtin setting..."
rm -f /etc/clickhouse-server/users.d/enable_blobs_check.xml ||:
# Remove s3 related configs to avoid "there is no disk type `cache`"
rm -f /etc/clickhouse-server/config.d/storage_conf.xml ||:
rm -f /etc/clickhouse-server/config.d/azure_storage_conf.xml ||:
# Disable aggressive cleanup of tmp dirs (it worked incorrectly before 22.8)
rm -f /etc/clickhouse-server/config.d/merge_tree_old_dirs_cleanup.xml ||:
start
clickhouse-client --query="SELECT 'Server version: ', version()"

View File

@ -11,6 +11,7 @@ namespace DB
class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile
{
public:
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,

View File

@ -2,6 +2,7 @@
#include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Poco/File.h>
#include <Common/filesystemHelpers.h>
namespace DB
@ -23,16 +24,24 @@ UInt64 BackupEntryFromImmutableFile::getSize() const
{
std::lock_guard lock{get_file_size_mutex};
if (!file_size)
file_size = disk ? disk->getFileSize(file_path) : Poco::File(file_path).getSize();
file_size = disk->getFileSize(file_path);
return *file_size;
}
std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
{
if (disk)
return disk->readFile(file_path);
else
return createReadBufferFromFileBase(file_path, /* settings= */ {});
return disk->readFile(file_path);
}
/// Returns the data source description reported by the disk that holds this entry's file.
DataSourceDescription BackupEntryFromImmutableFile::getDataSourceDescription() const
{
    auto description = disk->getDataSourceDescription();
    return description;
}
/// Returns (by value) the path of the file this entry was created from.
String BackupEntryFromImmutableFile::getFilePath() const
{
    const auto & stored_path = file_path;
    return stored_path;
}
}

View File

@ -14,6 +14,7 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupEntryFromImmutableFile : public IBackupEntry
{
public:
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
BackupEntryFromImmutableFile(
const DiskPtr & disk_,
@ -28,8 +29,10 @@ public:
std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
String getFilePath() const { return file_path; }
DiskPtr getDisk() const { return disk; }
String getFilePath() const override;
DataSourceDescription getDataSourceDescription() const override;
DiskPtr tryGetDiskIfExists() const override { return disk; }
private:
const DiskPtr disk;

View File

@ -19,6 +19,18 @@ public:
std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
/// This entry reports an empty path.
String getFilePath() const override
{
    return {};
}
/// The data source of this entry is always described as RAM with an empty description string.
DataSourceDescription getDataSourceDescription() const override
{
    DataSourceDescription ram_description{DataSourceType::RAM, "", false, false};
    return ram_description;
}
/// This entry has no associated disk, so a null pointer is returned.
DiskPtr tryGetDiskIfExists() const override
{
    return nullptr;
}
private:
const String data;
const std::optional<UInt128> checksum;

View File

@ -36,4 +36,5 @@ BackupEntryFromSmallFile::BackupEntryFromSmallFile(
: BackupEntryFromMemory(readFile(disk_, file_path_), checksum_), disk(disk_), file_path(file_path_)
{
}
}

View File

@ -23,9 +23,9 @@ public:
const String & file_path_,
const std::optional<UInt128> & checksum_ = {});
String getFilePath() const { return file_path; }
DiskPtr getDisk() const { return disk; }
String getFilePath() const override { return file_path; }
DiskPtr tryGetDiskIfExists() const override { return disk; }
private:
const DiskPtr disk;
const String file_path;

27
src/Backups/BackupIO.cpp Normal file
View File

@ -0,0 +1,27 @@
#include <Backups/BackupIO.h>
#include <IO/copyData.h>
#include <IO/WriteBuffer.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// Default implementation: opens `file_name` via writeFile(), copies everything that can be
/// read from `source` into it and finalizes the resulting write buffer.
void IBackupWriter::copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name)
{
    auto destination = writeFile(file_name);
    auto & in = *source;
    copyData(in, *destination);
    destination->finalize();
}
/// Default implementation for writers without a native copy: always throws NOT_IMPLEMENTED.
/// Writers that return true from supportNativeCopy() are expected to override this.
void IBackupWriter::copyFileNative(
    DiskPtr /* from_disk */,
    const String & /* file_name_from */,
    const String & /* file_name_to */)
{
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Native copy not implemented for backup writer");
}
}

View File

@ -1,6 +1,8 @@
#pragma once
#include <Core/Types.h>
#include <Disks/DiskType.h>
#include <Disks/IDisk.h>
namespace DB
{
@ -15,6 +17,7 @@ public:
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
};
/// Represents operations of storing to disk or uploading for writing a backup.
@ -27,6 +30,15 @@ public:
virtual bool fileContentsEqual(const String & file_name, const String & expected_file_contents) = 0;
virtual std::unique_ptr<WriteBuffer> writeFile(const String & file_name) = 0;
virtual void removeFiles(const Strings & file_names) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual void copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name);
/// Tells whether copyFileNative() can be used for data coming from the given data source.
/// The default answer is "no"; writers that can copy natively override this.
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const
{
    constexpr bool native_copy_supported = false;
    return native_copy_supported;
}
virtual void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to);
};
}

View File

@ -6,6 +6,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
{
}
@ -77,4 +83,28 @@ void BackupWriterDisk::removeFiles(const Strings & file_names)
disk->removeDirectory(path);
}
/// The destination of the backup is described by the disk the writer stores files on.
DataSourceDescription BackupWriterDisk::getDataSourceDescription() const
{
    auto description = disk->getDataSourceDescription();
    return description;
}
/// The source of the backup is described by the disk the reader reads files from.
DataSourceDescription BackupReaderDisk::getDataSourceDescription() const
{
    auto description = disk->getDataSourceDescription();
    return description;
}
/// Native copy is possible only when the source description equals the description of the destination disk.
bool BackupWriterDisk::supportNativeCopy(DataSourceDescription data_source_description) const
{
    const auto own_description = disk->getDataSourceDescription();
    return data_source_description == own_description;
}
/// Copies `file_name_from` from `from_disk` to `path / file_name_to` on the destination disk
/// using the disk's own copyFile(). Parent directories of the destination are created first.
/// Throws LOGICAL_ERROR if no source disk is given.
void BackupWriterDisk::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
{
    if (from_disk == nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");

    const auto destination = path / file_name_to;
    disk->createDirectories(destination.parent_path());
    from_disk->copyFile(file_name_from, *disk, destination);
}
}

View File

@ -17,6 +17,7 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
DataSourceDescription getDataSourceDescription() const override;
private:
DiskPtr disk;
@ -34,7 +35,11 @@ public:
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private:
DiskPtr disk;
std::filesystem::path path;

View File

@ -1,6 +1,8 @@
#include <Backups/BackupIO_File.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Common/filesystemHelpers.h>
namespace fs = std::filesystem;
@ -78,4 +80,55 @@ void BackupWriterFile::removeFiles(const Strings & file_names)
fs::remove(path);
}
/// Describes the destination as an unencrypted, uncached local data source.
/// The description string is the block device id of `path` when it can be determined,
/// otherwise the path itself.
DataSourceDescription BackupWriterFile::getDataSourceDescription() const
{
    const auto block_device_id = tryGetBlockDeviceId(path);
    String description = block_device_id.has_value() ? *block_device_id : path.string();

    return DataSourceDescription{DataSourceType::Local, description, false, false};
}
/// Describes the source as an unencrypted, uncached local data source.
/// The description string is the block device id of `path` when it can be determined,
/// otherwise the path itself.
DataSourceDescription BackupReaderFile::getDataSourceDescription() const
{
    const auto block_device_id = tryGetBlockDeviceId(path);
    String description = block_device_id.has_value() ? *block_device_id : path.string();

    return DataSourceDescription{DataSourceType::Local, description, false, false};
}
/// Native copy is possible only when the source description equals this writer's own description.
bool BackupWriterFile::supportNativeCopy(DataSourceDescription data_source_description) const
{
    const auto own_description = getDataSourceDescription();
    return data_source_description == own_description;
}
/// Copies a file into `path / file_name_to` with std::filesystem::copy, overwriting an existing target.
/// The source is resolved through `from_disk` when a disk is given; otherwise `file_name_from`
/// is made absolute as a plain filesystem path.
void BackupWriterFile::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
{
    const auto destination = path / file_name_to;
    fs::create_directories(destination.parent_path());

    const std::string abs_source_path
        = from_disk ? fullPath(from_disk, file_name_from) : fs::absolute(file_name_from).string();

    constexpr auto copy_mode = fs::copy_options::recursive | fs::copy_options::overwrite_existing;
    fs::copy(abs_source_path, destination, copy_mode);
}
}

View File

@ -9,12 +9,13 @@ namespace DB
class BackupReaderFile : public IBackupReader
{
public:
BackupReaderFile(const String & path_);
explicit BackupReaderFile(const String & path_);
~BackupReaderFile() override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
DataSourceDescription getDataSourceDescription() const override;
private:
std::filesystem::path path;
@ -23,7 +24,7 @@ private:
class BackupWriterFile : public IBackupWriter
{
public:
BackupWriterFile(const String & path_);
explicit BackupWriterFile(const String & path_);
~BackupWriterFile() override;
bool fileExists(const String & file_name) override;
@ -31,6 +32,10 @@ public:
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private:
std::filesystem::path path;

View File

@ -111,6 +111,22 @@ public:
UInt64 getSize() const override { return size; }
std::optional<UInt128> getChecksum() const override { return checksum; }
/// Returns the name of the data file this entry reads from.
String getFilePath() const override
{
    const auto & name = data_file_name;
    return name;
}
/// This entry never exposes a disk; callers always get a null pointer.
DiskPtr tryGetDiskIfExists() const override
{
    return DiskPtr{};
}
/// The data source of this entry is the one reported by the reader of the owning backup.
DataSourceDescription getDataSourceDescription() const override
{
    const auto & backup_reader = backup->reader;
    return backup_reader->getDataSourceDescription();
}
private:
const std::shared_ptr<const BackupImpl> backup;
const String archive_suffix;
@ -587,9 +603,86 @@ BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) c
}
}
namespace
{
/// Looks up `file_path` in the base backup and returns its (size, checksum) pair.
/// Returns std::nullopt when there is no base backup or the file is not present in it.
std::optional<SizeAndChecksum> getInfoAboutFileFromBaseBackupIfExists(std::shared_ptr<const IBackup> base_backup, const std::string & file_path)
{
    const bool found_in_base = base_backup && base_backup->fileExists(file_path);
    if (!found_in_base)
        return std::nullopt;

    return std::pair{base_backup->getFileSize(file_path), base_backup->getFileChecksum(file_path)};
}
/// Outcome of comparing (by size only) a new entry with the same-named file stored in the base backup.
/// Checksums still have to be compared by the caller before the base backup is actually reused.
enum class CheckBackupResult
{
    /// The file in the base backup is shorter than the new entry, so it may be a prefix of it.
    HasPrefix,
    /// The file in the base backup has exactly the same size as the new entry.
    HasFull,
    /// The file in the base backup is longer than the new entry and cannot be reused.
    HasNothing,
};

/// Classifies how the file from the base backup (`base_backup_info` = {size, checksum})
/// relates to the new entry described by `new_entry_info`, looking only at the sizes.
CheckBackupResult checkBaseBackupForFile(const SizeAndChecksum & base_backup_info, const FileInfo & new_entry_info)
{
    const auto base_size = base_backup_info.first;

    /// We cannot reuse base backup because our file is smaller
    /// than file stored in previous backup.
    /// (The comparison used to be inverted, which disabled prefix reuse for appended files
    /// and reported HasPrefix for files that had shrunk.)
    if (new_entry_info.size < base_size)
        return CheckBackupResult::HasNothing;

    if (base_size == new_entry_info.size)
        return CheckBackupResult::HasFull;

    /// The new file is strictly larger: the base file may be its prefix.
    return CheckBackupResult::HasPrefix;
}
/// Pair of checksums produced by calculateNewEntryChecksumsIfNeeded() for a new backup entry.
struct ChecksumsForNewEntry
{
/// Checksum of the whole entry.
UInt128 full_checksum;
/// Checksum of the requested prefix of the entry; 0 when no prefix was requested.
UInt128 prefix_checksum;
};
/// Returns the full checksum of `entry`, computing it by reading the data only when the entry
/// does not already provide one. When `prefix_size` is non-zero, the checksum of the first
/// `prefix_size` bytes is calculated as well (otherwise the prefix checksum is 0).
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(BackupEntryPtr entry, size_t prefix_size)
{
    if (prefix_size == 0)
    {
        /// No prefix requested: the data is read only if the full checksum is unknown.
        if (const auto known_checksum = entry->getChecksum())
            return ChecksumsForNewEntry{*known_checksum, 0};

        auto in = entry->getReadBuffer();
        HashingReadBuffer hasher(*in);
        hasher.ignoreAll();
        return ChecksumsForNewEntry{hasher.getHash(), 0};
    }

    /// Hash the prefix first; the same hashing buffer is then continued to the end if needed.
    auto in = entry->getReadBuffer();
    HashingReadBuffer hasher(*in);
    hasher.ignore(prefix_size);
    const auto prefix_checksum = hasher.getHash();

    if (const auto known_checksum = entry->getChecksum())
        return ChecksumsForNewEntry{*known_checksum, prefix_checksum};

    hasher.ignoreAll();
    const auto full_checksum = hasher.getHash();
    return ChecksumsForNewEntry{full_checksum, prefix_checksum};
}
}
void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::WRITE)
throw Exception("Backup is not opened for writing", ErrorCodes::LOGICAL_ERROR);
@ -597,164 +690,179 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
if (writing_finalized)
throw Exception("Backup is already finalized", ErrorCodes::LOGICAL_ERROR);
std::string from_file_name = "memory buffer";
if (auto fname = entry->getFilePath(); !fname.empty())
from_file_name = "file " + fname;
LOG_TRACE(log, "Writing backup for file {} from file {}", file_name, from_file_name);
auto adjusted_path = removeLeadingSlash(file_name);
if (coordination->getFileInfo(adjusted_path))
throw Exception(
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", backup_name, quoteString(file_name));
FileInfo info;
info.file_name = adjusted_path;
size_t size = entry->getSize();
info.size = size;
FileInfo info
{
.file_name = adjusted_path,
.size = entry->getSize(),
.base_size = 0,
.base_checksum = 0,
};
/// Check if the entry's data is empty.
if (!info.size)
/// Empty file, nothing to backup
if (info.size == 0)
{
coordination->addFileInfo(info);
return;
}
/// Maybe we have a copy of this file in the backup already.
std::optional<UInt128> checksum = entry->getChecksum();
if (checksum && coordination->getFileInfo(std::pair{size, *checksum}))
{
info.checksum = *checksum;
coordination->addFileInfo(info);
return;
}
std::optional<SizeAndChecksum> base_backup_file_info = getInfoAboutFileFromBaseBackupIfExists(base_backup, adjusted_path);
/// Check if a entry with such name exists in the base backup.
bool base_exists = (base_backup && base_backup->fileExists(adjusted_path));
UInt64 base_size = 0;
UInt128 base_checksum{0, 0};
if (base_exists)
/// We have info about this file in base backup
/// If file has no checksum -- calculate and fill it.
if (base_backup_file_info.has_value())
{
base_size = base_backup->getFileSize(adjusted_path);
base_checksum = base_backup->getFileChecksum(adjusted_path);
}
LOG_TRACE(log, "File {} found in base backup, checking for equality", adjusted_path);
CheckBackupResult check_base = checkBaseBackupForFile(*base_backup_file_info, info);
std::unique_ptr<SeekableReadBuffer> read_buffer; /// We'll set that later.
std::optional<HashingReadBuffer> hashing_read_buffer;
UInt64 hashing_pos = 0; /// Current position in `hashing_read_buffer`.
/// Determine whether it's possible to receive this entry's data from the base backup completely or partly.
bool use_base = false;
if (base_exists && base_size && (size >= base_size))
{
if (checksum && (size == base_size))
/// File with the same name but smaller size exist in previous backup
if (check_base == CheckBackupResult::HasPrefix)
{
/// The size is the same, we need to compare checksums to find out
/// if the entry's data has not changed since the base backup.
use_base = (*checksum == base_checksum);
auto checksums = calculateNewEntryChecksumsIfNeeded(entry, base_backup_file_info->first);
info.checksum = checksums.full_checksum;
/// We have prefix of this file in backup with the same checksum.
/// In ClickHouse this can happen for StorageLog for example.
if (checksums.prefix_checksum == base_backup_file_info->second)
{
LOG_TRACE(log, "File prefix of {} in base backup, will write rest part of file to current backup", adjusted_path);
info.base_size = base_backup_file_info->first;
info.base_checksum = base_backup_file_info->second;
}
else
{
LOG_TRACE(log, "Prefix checksum of file {} doesn't match with checksum in base backup", adjusted_path);
}
}
else
{
/// The size has increased, we need to calculate a partial checksum to find out
/// if the entry's data has only appended since the base backup.
read_buffer = entry->getReadBuffer();
hashing_read_buffer.emplace(*read_buffer);
hashing_read_buffer->ignore(base_size);
hashing_pos = base_size;
UInt128 partial_checksum = hashing_read_buffer->getHash();
if (size == base_size)
checksum = partial_checksum;
if (partial_checksum == base_checksum)
use_base = true;
/// We have full file or have nothing, first of all let's get checksum
/// of current file
auto checksums = calculateNewEntryChecksumsIfNeeded(entry, 0);
info.checksum = checksums.full_checksum;
if (info.checksum == base_backup_file_info->second)
{
LOG_TRACE(log, "Found whole file {} in base backup", adjusted_path);
assert(check_base == CheckBackupResult::HasFull);
assert(info.size == base_backup_file_info->first);
info.base_size = base_backup_file_info->first;
info.base_checksum = base_backup_file_info->second;
/// Actually we can add this info to coordination and exit,
/// but we intentionally don't do it, otherwise control flow
/// of this function will be very complex.
}
else
{
LOG_TRACE(log, "Whole file {} in base backup doesn't match by checksum", adjusted_path);
}
}
}
/// Finish calculating the checksum.
if (!checksum)
else /// We don't have info about this file_name (sic!) in base backup,
/// however file could be renamed, so we will check one more time using size and checksum
{
if (!read_buffer)
read_buffer = entry->getReadBuffer();
if (!hashing_read_buffer)
hashing_read_buffer.emplace(*read_buffer);
hashing_read_buffer->ignore(size - hashing_pos);
checksum = hashing_read_buffer->getHash();
LOG_TRACE(log, "Nothing found for file {} in base backup", adjusted_path);
auto checksums = calculateNewEntryChecksumsIfNeeded(entry, 0);
info.checksum = checksums.full_checksum;
}
hashing_read_buffer.reset();
info.checksum = *checksum;
/// Maybe we have a copy of this file in the backup already.
if (coordination->getFileInfo(std::pair{size, *checksum}))
if (coordination->getFileInfo(std::pair{info.size, info.checksum}))
{
LOG_TRACE(log, "File {} already exist in current backup, adding reference", adjusted_path);
coordination->addFileInfo(info);
return;
}
/// Check if a entry with the same checksum exists in the base backup.
if (base_backup && !use_base && base_backup->fileExists(std::pair{size, *checksum}))
/// On the previous lines we checked that backup for file with adjusted_name exist in previous backup.
/// However file can be renamed, but has the same size and checksums, let's check for this case.
if (base_backup && base_backup->fileExists(std::pair{info.size, info.checksum}))
{
/// The entry's data has not changed since the base backup,
/// but the entry itself has been moved or renamed.
base_size = size;
base_checksum = *checksum;
use_base = true;
}
if (use_base)
{
info.base_size = base_size;
info.base_checksum = base_checksum;
}
LOG_TRACE(log, "File {} doesn't exist in current backup, but we have file with same size and checksum", adjusted_path);
info.base_size = info.size;
info.base_checksum = info.checksum;
if (use_base && (size == base_size))
{
/// The entry's data has not been changed since the base backup.
coordination->addFileInfo(info);
return;
}
bool is_data_file_required;
/// All "short paths" failed. We don't have this file in previous or existing backup
/// or have only prefix of it in previous backup. Let's go long path.
info.data_file_name = info.file_name;
info.archive_suffix = current_archive_suffix;
bool is_data_file_required;
coordination->addFileInfo(info, is_data_file_required);
if (!is_data_file_required)
return; /// We copy data only if it's a new combination of size & checksum.
/// Either the entry wasn't exist in the base backup
/// or the entry has data appended to the end of the data from the base backup.
/// In both those cases we have to copy data to this backup.
/// Find out where the start position to copy data is.
auto copy_pos = use_base ? base_size : 0;
/// Move the current read position to the start position to copy data.
if (!read_buffer)
read_buffer = entry->getReadBuffer();
read_buffer->seek(copy_pos, SEEK_SET);
if (!num_files_written)
checkLockFile(true);
/// Copy the entry's data after `copy_pos`.
std::unique_ptr<WriteBuffer> out;
if (use_archives)
{
String archive_suffix = current_archive_suffix;
bool next_suffix = false;
if (current_archive_suffix.empty() && is_internal_backup)
next_suffix = true;
/*if (archive_params.max_volume_size && current_archive_writer
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
next_suffix = true;*/
if (next_suffix)
current_archive_suffix = coordination->getNextArchiveSuffix();
if (info.archive_suffix != current_archive_suffix)
{
info.archive_suffix = current_archive_suffix;
coordination->updateFileInfo(info);
}
out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
LOG_TRACE(log, "File {} doesn't exist in current backup, but we have file with same size and checksum", adjusted_path);
return; /// We copy data only if it's a new combination of size & checksum.
}
auto writer_description = writer->getDataSourceDescription();
auto reader_description = entry->getDataSourceDescription();
/// We need to copy whole file without archive, we can do it faster
/// if source and destination are compatible
if (!use_archives && info.base_size == 0 && writer->supportNativeCopy(reader_description))
{
LOG_TRACE(log, "Will copy file {} using native copy", adjusted_path);
/// Should be much faster than writing data through server
writer->copyFileNative(entry->tryGetDiskIfExists(), entry->getFilePath(), info.data_file_name);
}
else
{
out = writer->writeFile(info.data_file_name);
LOG_TRACE(log, "Will copy file {} through memory buffers", adjusted_path);
auto read_buffer = entry->getReadBuffer();
/// If we have prefix in base we will seek to the start of the suffix which differs
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
if (!num_files_written)
checkLockFile(true);
if (use_archives)
{
LOG_TRACE(log, "Adding file {} to archive", adjusted_path);
String archive_suffix = current_archive_suffix;
bool next_suffix = false;
if (current_archive_suffix.empty() && is_internal_backup)
next_suffix = true;
/*if (archive_params.max_volume_size && current_archive_writer
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
next_suffix = true;*/
if (next_suffix)
current_archive_suffix = coordination->getNextArchiveSuffix();
if (info.archive_suffix != current_archive_suffix)
{
info.archive_suffix = current_archive_suffix;
coordination->updateFileInfo(info);
}
auto out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
copyData(*read_buffer, *out);
out->finalize();
}
else
{
writer->copyFileThroughBuffer(std::move(read_buffer), info.data_file_name);
}
}
copyData(*read_buffer, *out);
out->finalize();
++num_files_written;
}

View File

@ -90,7 +90,8 @@ namespace
}
catch (...)
{
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
if (coordination)
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
}
}
@ -164,9 +165,9 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
}
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
try
{
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
addInfo(backup_id, backup_info.toString(), backup_settings.internal, BackupStatus::CREATING_BACKUP);
/// Prepare context to use.
@ -213,6 +214,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
}
catch (...)
{
tryLogCurrentException(log, fmt::format("Failed to start {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
/// Something bad happened, the backup has not built.
setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);

View File

@ -20,6 +20,20 @@ public:
UInt64 getSize() const override { return getInternalBackupEntry()->getSize(); }
std::optional<UInt128> getChecksum() const override { return getInternalBackupEntry()->getChecksum(); }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return getInternalBackupEntry()->getReadBuffer(); }
/// Forwards to the internal backup entry.
String getFilePath() const override
{
    const auto internal_entry = getInternalBackupEntry();
    return internal_entry->getFilePath();
}
/// Forwards to the internal backup entry.
DiskPtr tryGetDiskIfExists() const override
{
    const auto internal_entry = getInternalBackupEntry();
    return internal_entry->tryGetDiskIfExists();
}
/// Forwards to the internal backup entry.
DataSourceDescription getDataSourceDescription() const override
{
    const auto internal_entry = getInternalBackupEntry();
    return internal_entry->getDataSourceDescription();
}
private:
BackupEntryPtr getInternalBackupEntry() const

View File

@ -4,6 +4,8 @@
#include <memory>
#include <optional>
#include <vector>
#include <Disks/DiskType.h>
#include <Disks/IDisk.h>
namespace DB
{
@ -24,6 +26,12 @@ public:
/// Returns a read buffer for reading the data.
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer() const = 0;
virtual String getFilePath() const = 0;
virtual DiskPtr tryGetDiskIfExists() const = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
};
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;

View File

@ -46,7 +46,7 @@ namespace
void checkPath(const String & disk_name, const DiskPtr & disk, fs::path & path)
{
path = path.lexically_normal();
if (!path.is_relative() && (disk->getType() == DiskType::Local))
if (!path.is_relative() && (disk->getDataSourceDescription().type == DataSourceType::Local))
path = path.lexically_proximate(disk->getPath());
bool path_ok = path.empty() || (path.is_relative() && (*path.begin() != ".."));

View File

@ -108,9 +108,10 @@ void FileCache::useCell(
if (file_segment->isDownloaded()
&& fs::file_size(getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent())) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot have zero size downloaded file segments. Current file segment: {}",
file_segment->range().toString());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot have zero size downloaded file segments. {}",
file_segment->getInfoForLog());
result.push_back(cell.file_segment);
@ -872,7 +873,7 @@ void FileCache::remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
{
LOG_TEST(log, "Remove. Key: {}, offset: {}", key.toString(), offset);
LOG_DEBUG(log, "Remove from cache. Key: {}, offset: {}", key.toString(), offset);
auto * cell = getCell(key, offset, cache_lock);
if (!cell)

View File

@ -55,6 +55,7 @@ FileSegment::FileSegment(
case (State::DOWNLOADED):
{
reserved_size = downloaded_size = size_;
is_downloaded = true;
break;
}
case (State::SKIP_CACHE):
@ -574,6 +575,7 @@ String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock
{
WriteBufferFromOwnString info;
info << "File segment: " << range().toString() << ", ";
info << "key: " << key().toString() << ", ";
info << "state: " << download_state << ", ";
info << "downloaded size: " << getDownloadedSize(segment_lock) << ", ";
info << "reserved size: " << reserved_size << ", ";
@ -699,7 +701,7 @@ void FileSegment::detach(
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear();
LOG_TEST(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
LOG_DEBUG(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
}
void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */)

View File

@ -24,9 +24,7 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})",
entry.key.toString(),
entry.offset,
entry.size);
entry.key.toString(), entry.offset, entry.size);
}
#endif
@ -36,6 +34,8 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
return std::make_shared<LRUFileCacheIterator>(this, iter);
}
@ -54,6 +54,8 @@ void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
LOG_DEBUG(log, "Removed all entries from LRU queue");
queue.clear();
cache_size = 0;
}
@ -86,6 +88,8 @@ void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guar
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
queue_iter = cache_priority->queue.erase(queue_iter);
}

View File

@ -2,6 +2,7 @@
#include <list>
#include <Common/IFileCachePriority.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -32,6 +33,7 @@ public:
private:
LRUQueue queue;
Poco::Logger * log = &Poco::Logger::get("LRUFileCachePriority");
};
class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::IIterator

View File

@ -88,6 +88,22 @@ String getBlockDeviceId([[maybe_unused]] const String & path)
#endif
}
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path)
{
#if defined(OS_LINUX)
struct stat sb;
if (lstat(path.c_str(), &sb))
return {};
WriteBufferFromOwnString ss;
ss << major(sb.st_dev) << ":" << minor(sb.st_dev);
return ss.str();
#else
return {};
#endif
}
#if !defined(OS_LINUX)
[[noreturn]]
#endif

View File

@ -26,6 +26,8 @@ std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
#endif
String getBlockDeviceId([[maybe_unused]] const String & path);
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path);
enum class BlockDeviceType
{
UNKNOWN = 0, // we were unable to determine device type

View File

@ -90,24 +90,56 @@ struct DataTypeDecimalTrait
* Sign of `fractional` is expected to be positive, otherwise result is undefined.
* If `scale` is to big (scale > max_precision<DecimalType::NativeType>), result is undefined.
*/
template <typename DecimalType>
inline DecimalType decimalFromComponentsWithMultiplier(
const typename DecimalType::NativeType & whole,
const typename DecimalType::NativeType & fractional,
typename DecimalType::NativeType scale_multiplier)
template <typename DecimalType, bool throw_on_error>
inline bool decimalFromComponentsWithMultiplierImpl(
const typename DecimalType::NativeType & whole,
const typename DecimalType::NativeType & fractional,
typename DecimalType::NativeType scale_multiplier,
DecimalType & result)
{
using T = typename DecimalType::NativeType;
const auto fractional_sign = whole < 0 ? -1 : 1;
T whole_scaled = 0;
if (common::mulOverflow(whole, scale_multiplier, whole_scaled))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
{
if constexpr (throw_on_error)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return false;
}
T value;
if (common::addOverflow(whole_scaled, fractional_sign * (fractional % scale_multiplier), value))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
{
if constexpr (throw_on_error)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return false;
}
return DecimalType(value);
result = DecimalType(value);
return true;
}
/// Throwing flavour: builds a decimal from `whole` and `fractional` using a precomputed scale multiplier.
/// Delegates to decimalFromComponentsWithMultiplierImpl with throw_on_error = true, so an overflow
/// is reported as an Exception with code DECIMAL_OVERFLOW.
template <typename DecimalType>
inline DecimalType decimalFromComponentsWithMultiplier(
    const typename DecimalType::NativeType & whole,
    const typename DecimalType::NativeType & fractional,
    typename DecimalType::NativeType scale_multiplier)
{
    constexpr bool throw_on_error = true;

    DecimalType decimal;
    /// The boolean status is irrelevant here: failure is signalled by an exception.
    decimalFromComponentsWithMultiplierImpl<DecimalType, throw_on_error>(whole, fractional, scale_multiplier, decimal);
    return decimal;
}
/// Non-throwing flavour of decimalFromComponentsWithMultiplier.
/// Returns true and stores the value in `result` on success; returns false on overflow,
/// in which case `result` is not assigned.
template <typename DecimalType>
inline bool tryGetDecimalFromComponentsWithMultiplier(
    const typename DecimalType::NativeType & whole,
    const typename DecimalType::NativeType & fractional,
    typename DecimalType::NativeType scale_multiplier,
    DecimalType & result)
{
    constexpr bool throw_on_error = false;
    return decimalFromComponentsWithMultiplierImpl<DecimalType, throw_on_error>(whole, fractional, scale_multiplier, result);
}
template <typename DecimalType>
@ -118,6 +150,15 @@ inline DecimalType decimalFromComponentsWithMultiplier(
return decimalFromComponentsWithMultiplier<DecimalType>(components.whole, components.fractional, scale_multiplier);
}
/// Convenience overload taking the parts packed in DecimalComponents.
/// Forwards to the (whole, fractional) overload and returns its success flag.
template <typename DecimalType>
inline bool tryGetDecimalFromComponentsWithMultiplier(
    const DecimalComponents<DecimalType> & components,
    typename DecimalType::NativeType scale_multiplier,
    DecimalType & result)
{
    const auto & whole_part = components.whole;
    const auto & fractional_part = components.fractional;
    return tryGetDecimalFromComponentsWithMultiplier<DecimalType>(whole_part, fractional_part, scale_multiplier, result);
}
/** Make a decimal value from whole and fractional components with given scale.
*
@ -134,6 +175,18 @@ inline DecimalType decimalFromComponents(
return decimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scaleMultiplier<T>(scale));
}
template <typename DecimalType>
inline bool tryGetDecimalFromComponents(
const typename DecimalType::NativeType & whole,
const typename DecimalType::NativeType & fractional,
UInt32 scale,
DecimalType & result)
{
using T = typename DecimalType::NativeType;
return tryGetDecimalFromComponentsWithMultiplier<DecimalType>(whole, fractional, scaleMultiplier<T>(scale), result);
}
/** Make a decimal value from whole and fractional components with given scale.
* @see `decimalFromComponentsWithMultiplier` for details.
*/
@ -145,6 +198,15 @@ inline DecimalType decimalFromComponents(
return decimalFromComponents<DecimalType>(components.whole, components.fractional, scale);
}
/// Convenience overload taking the parts packed in DecimalComponents.
/// Forwards to the (whole, fractional, scale) overload and returns its success flag.
template <typename DecimalType>
inline bool tryGetDecimalFromComponents(
    const DecimalComponents<DecimalType> & components,
    UInt32 scale,
    DecimalType & result)
{
    const auto & whole_part = components.whole;
    const auto & fractional_part = components.fractional;
    return tryGetDecimalFromComponents<DecimalType>(whole_part, fractional_part, scale, result);
}
/** Split decimal into whole and fractional parts with given scale_multiplier.
* This is an optimization to reduce number of calls to scaleMultiplier on known scale.
*/

View File

@ -90,6 +90,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \

View File

@ -72,7 +72,7 @@ public:
void sync(int fd) const;
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType getType() const override { return delegate->getType(); }
DataSourceDescription getDataSourceDescription() const override { return delegate->getDataSourceDescription(); }
bool isRemote() const override { return delegate->isRemote(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
bool supportParallelWrite() const override { return delegate->supportParallelWrite(); }

View File

@ -234,7 +234,13 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
DiskType getType() const override { return DiskType::Encrypted; }
/// Reports the data source of the wrapped disk, with the `is_encrypted` flag forced to true.
DataSourceDescription getDataSourceDescription() const override
{
    DataSourceDescription source = delegate->getDataSourceDescription();
    source.is_encrypted = true;
    return source;
}
bool isRemote() const override { return delegate->isRemote(); }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

View File

@ -230,14 +230,14 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
if (bytes == 0)
{
LOG_DEBUG(log, "Reserving 0 bytes on disk {}", backQuote(name));
LOG_DEBUG(logger, "Reserving 0 bytes on disk {}", backQuote(name));
++reservation_count;
return {unreserved_space};
}
if (unreserved_space >= bytes)
{
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.",
LOG_DEBUG(logger, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
@ -497,6 +497,14 @@ DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_fre
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
{
data_source_description.type = DataSourceType::Local;
if (auto block_device_id = tryGetBlockDeviceId(disk_path); block_device_id.has_value())
data_source_description.description = *block_device_id;
else
data_source_description.description = disk_path;
data_source_description.is_encrypted = false;
data_source_description.is_cached = false;
}
DiskLocal::DiskLocal(
@ -507,6 +515,11 @@ DiskLocal::DiskLocal(
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
}
/// Returns a copy of the description stored in the `data_source_description` member.
DataSourceDescription DiskLocal::getDataSourceDescription() const
{
return data_source_description;
}
void DiskLocal::startup(ContextPtr)
{
try
@ -615,7 +628,6 @@ DiskObjectStoragePtr DiskLocal::createDiskObjectStorage()
"Local",
metadata_storage,
object_storage,
DiskType::Local,
false,
/* threadpool_size */16
);
@ -714,6 +726,13 @@ void DiskLocal::chmod(const String & path, mode_t mode)
DB::throwFromErrnoWithPath("Cannot chmod file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
}
/// Creates a FakeMetadataStorageFromDisk bound to this disk, a fresh LocalObjectStorage and this disk's path.
/// A new storage object is created on every call.
MetadataStoragePtr DiskLocal::getMetadataStorage()
{
    auto local_storage = std::make_shared<LocalObjectStorage>();
    auto self_as_disk = std::static_pointer_cast<IDisk>(shared_from_this());
    return std::make_shared<FakeMetadataStorageFromDisk>(std::move(self_as_disk), local_storage, getPath());
}
void registerDiskLocal(DiskFactory & factory)
{
auto creator = [](const String & name,

View File

@ -101,7 +101,8 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType getType() const override { return DiskType::Local; }
DataSourceDescription getDataSourceDescription() const override;
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }
@ -130,6 +131,8 @@ public:
bool supportsChmod() const override { return true; }
void chmod(const String & path, mode_t mode) override;
MetadataStoragePtr getMetadataStorage() override;
private:
std::optional<UInt64> tryReserve(UInt64 bytes);
@ -145,14 +148,13 @@ private:
const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes;
Poco::Logger * logger;
DataSourceDescription data_source_description;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
static std::mutex reservation_mutex;
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
std::atomic<bool> broken{false};
std::atomic<bool> readonly{false};
std::unique_ptr<DiskLocalCheckThread> disk_checker;

View File

@ -7,6 +7,8 @@
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
namespace DB
{
@ -443,6 +445,13 @@ void DiskMemory::truncateFile(const String & path, size_t size)
file_it->second.data.resize(size);
}
/// Creates a FakeMetadataStorageFromDisk bound to this disk, a fresh LocalObjectStorage and this disk's path.
/// A new storage object is created on every call.
MetadataStoragePtr DiskMemory::getMetadataStorage()
{
    auto local_storage = std::make_shared<LocalObjectStorage>();
    auto self_as_disk = std::static_pointer_cast<IDisk>(shared_from_this());
    return std::make_shared<FakeMetadataStorageFromDisk>(std::move(self_as_disk), local_storage, getPath());
}
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;

View File

@ -91,11 +91,14 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType getType() const override { return DiskType::RAM; }
/// Positional aggregate initialisation: {type = RAM, description = "", is_encrypted = false, is_cached = false}.
DataSourceDescription getDataSourceDescription() const override { return DataSourceDescription{DataSourceType::RAM, "", false, false}; }
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }
MetadataStoragePtr getMetadataStorage() override;
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);

11
src/Disks/DiskType.cpp Normal file
View File

@ -0,0 +1,11 @@
#include "DiskType.h"
namespace DB
{
/// Two descriptions are equal when `type`, `description` and `is_encrypted` all match.
/// NOTE(review): `is_cached` is not compared — presumably so that a cached wrapper compares equal
/// to the source it wraps; confirm this is intended.
bool DataSourceDescription::operator==(const DataSourceDescription & other) const
{
    if (type != other.type)
        return false;
    if (description != other.description)
        return false;
    return is_encrypted == other.is_encrypted;
}
}

View File

@ -5,40 +5,45 @@
namespace DB
{
enum class DiskType
enum class DataSourceType
{
Local,
RAM,
S3,
HDFS,
Encrypted,
WebServer,
AzureBlobStorage,
Cache,
};
inline String toString(DiskType disk_type)
inline String toString(DataSourceType data_source_type)
{
switch (disk_type)
switch (data_source_type)
{
case DiskType::Local:
case DataSourceType::Local:
return "local";
case DiskType::RAM:
case DataSourceType::RAM:
return "memory";
case DiskType::S3:
case DataSourceType::S3:
return "s3";
case DiskType::HDFS:
case DataSourceType::HDFS:
return "hdfs";
case DiskType::Encrypted:
return "encrypted";
case DiskType::WebServer:
case DataSourceType::WebServer:
return "web";
case DiskType::AzureBlobStorage:
case DataSourceType::AzureBlobStorage:
return "azure_blob_storage";
case DiskType::Cache:
return "cache";
}
__builtin_unreachable();
}
/// Describes the data source behind a disk or object storage.
/// The struct is aggregate-initialised positionally in places, so the field order is part of its interface.
struct DataSourceDescription
{
/// Kind of the underlying storage. Has no default initialiser, so it must always be set explicitly.
DataSourceType type;
/// Free-form identifier of the concrete source; its content depends on who fills it in
/// (in this codebase: a block device id or path, an HDFS root path, an S3 endpoint, a URL, or an empty string).
std::string description;
/// Set to true by the encrypted disk wrapper.
bool is_encrypted = false;
/// Set to true by the cached object storage wrapper.
bool is_cached = false;
/// Equality is defined out of line (DiskType.cpp).
bool operator==(const DataSourceDescription & other) const;
};
}

View File

@ -113,7 +113,7 @@ void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<
void IDisk::truncateFile(const String &, size_t)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getDataSourceDescription().type);
}
SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
@ -121,18 +121,4 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
MetadataStoragePtr IDisk::getMetadataStorage()
{
if (isRemote())
{
return std::make_shared<MetadataStorageFromDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), "");
}
else
{
auto object_storage = std::make_shared<LocalObjectStorage>();
return std::make_shared<FakeMetadataStorageFromDisk>(
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
}
}
}

View File

@ -227,7 +227,7 @@ public:
virtual NameSet getCacheLayersNames() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getDataSourceDescription().type);
}
/// Returns a list of storage objects (contains path, size, ...).
@ -235,7 +235,7 @@ public:
/// be multiple files in remote fs for single clickhouse file.
virtual StoredObjects getStorageObjects(const String &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getDataSourceDescription().type);
}
/// For one local path there might be multiple remote paths in case of Log family engines.
@ -243,7 +243,7 @@ public:
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getDataSourceDescription().type);
}
/// Batch request to remove multiple files.
@ -271,8 +271,8 @@ public:
/// Truncate file to specified size.
virtual void truncateFile(const String & path, size_t size);
/// Return disk type - "local", "s3", etc.
virtual DiskType getType() const = 0;
/// Return data source description
virtual DataSourceDescription getDataSourceDescription() const = 0;
/// Involves network interaction.
virtual bool isRemote() const = 0;
@ -321,7 +321,7 @@ public:
/// Actually it's a part of IDiskRemote implementation but we have so
/// complex hierarchy of disks (with decorators), so we cannot even
/// dynamic_cast some pointer to IDisk to pointer to IDiskRemote.
virtual MetadataStoragePtr getMetadataStorage();
virtual MetadataStoragePtr getMetadataStorage() = 0;
/// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata"
/// it will return mapping for each required path: path -> metadata as string.
@ -357,7 +357,7 @@ public:
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method createDiskObjectStorage() is not implemented for disk type: {}",
getType());
getDataSourceDescription().type);
}
virtual bool supportsStat() const { return false; }

View File

@ -32,6 +32,10 @@ AzureObjectStorage::AzureObjectStorage(
, settings(std::move(settings_))
, log(&Poco::Logger::get("AzureObjectStorage"))
{
data_source_description.type = DataSourceType::AzureBlobStorage;
data_source_description.description = client.get()->GetUrl();
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
std::string AzureObjectStorage::generateBlobNameForPath(const std::string & /* path */)

View File

@ -57,6 +57,8 @@ public:
AzureClientPtr && client_,
SettingsPtr && settings_);
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "AzureObjectStorage"; }
bool exists(const StoredObject & object) const override;
@ -128,6 +130,8 @@ private:
MultiVersion<AzureObjectStorageSettings> settings;
Poco::Logger * log;
DataSourceDescription data_source_description;
};
}

View File

@ -90,7 +90,6 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
"DiskAzureBlobStorage",
std::move(metadata_storage),
std::move(azure_object_storage),
DiskType::AzureBlobStorage,
send_metadata,
copy_thread_pool_size
);

View File

@ -34,6 +34,13 @@ CachedObjectStorage::CachedObjectStorage(
cache->initialize();
}
/// Reports the data source of the wrapped object storage, with the `is_cached` flag forced to true.
DataSourceDescription CachedObjectStorage::getDataSourceDescription() const
{
    DataSourceDescription source = object_storage->getDataSourceDescription();
    source.is_cached = true;
    return source;
}
FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
{
return cache->hash(path);

View File

@ -20,6 +20,8 @@ class CachedObjectStorage final : public IObjectStorage
public:
CachedObjectStorage(ObjectStoragePtr object_storage_, FileCachePtr cache_, const FileCacheSettings & cache_settings_, const String & cache_config_name_);
DataSourceDescription getDataSourceDescription() const override;
std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); }
bool exists(const StoredObject & object) const override;

View File

@ -103,14 +103,12 @@ DiskObjectStorage::DiskObjectStorage(
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
DiskType disk_type_,
bool send_metadata_,
uint64_t thread_pool_size_)
: IDisk(getAsyncExecutor(log_name, thread_pool_size_))
, name(name_)
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, disk_type(disk_type_)
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(send_metadata_)
@ -216,6 +214,22 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}
/// Copies `from_path` to `to_path` on `to_disk`.
/// When the destination is this very disk object, the copy is done inside an object storage
/// transaction; any other destination goes through the generic IDisk::copy.
void DiskObjectStorage::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
    const bool is_same_disk = (to_disk.get() == this);
    if (!is_same_disk)
    {
        IDisk::copy(from_path, to_disk, to_path);
        return;
    }

    /// It's the same object storage disk
    auto copy_transaction = createObjectStorageTransaction();
    copy_transaction->copyFile(from_path, to_path);
    copy_transaction->commit();
}
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path)
{
moveFile(from_path, to_path, send_metadata);
@ -469,7 +483,6 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
getName(),
metadata_storage,
object_storage,
disk_type,
send_metadata,
threadpool_size);
}

View File

@ -34,14 +34,13 @@ public:
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
DiskType disk_type_,
bool send_metadata_,
uint64_t thread_pool_size_);
/// Create fake transaction
DiskTransactionPtr createTransaction() override;
DiskType getType() const override { return disk_type; }
DataSourceDescription getDataSourceDescription() const override { return object_storage->getDataSourceDescription(); }
bool supportZeroCopyReplication() const override { return true; }
@ -154,6 +153,8 @@ public:
WriteMode mode,
const WriteSettings & settings) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
@ -206,7 +207,6 @@ private:
const String object_storage_root_path;
Poco::Logger * log;
const DiskType disk_type;
MetadataStoragePtr metadata_storage;
ObjectStoragePtr object_storage;

View File

@ -48,10 +48,20 @@ public:
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
, settings(std::move(settings_))
{}
{
data_source_description.type = DataSourceType::HDFS;
data_source_description.description = hdfs_root_path_;
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
std::string getName() const override { return "HDFSObjectStorage"; }
/// Returns a copy of the description stored in the `data_source_description` member.
DataSourceDescription getDataSourceDescription() const override
{
return data_source_description;
}
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
@ -121,6 +131,8 @@ private:
HDFSFSPtr hdfs_fs;
SettingsPtr settings;
DataSourceDescription data_source_description;
};
}

View File

@ -49,7 +49,6 @@ void registerDiskHDFS(DiskFactory & factory)
"DiskHDFS",
std::move(metadata_storage),
std::move(hdfs_storage),
DiskType::HDFS,
/* send_metadata = */ false,
copy_thread_pool_size);

View File

@ -15,6 +15,7 @@
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool.h>
#include <Common/FileCache.h>
#include <Disks/WriteMode.h>
@ -58,6 +59,8 @@ class IObjectStorage
public:
IObjectStorage() = default;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual std::string getName() const = 0;
/// Object exists or not

View File

@ -28,6 +28,14 @@ namespace ErrorCodes
/// Sets up the logger and fills in the data source description:
/// type Local, described by the block device id of "/" when it can be determined, otherwise by "/" itself.
LocalObjectStorage::LocalObjectStorage()
    : log(&Poco::Logger::get("LocalObjectStorage"))
{
    data_source_description.type = DataSourceType::Local;
    /// Fall back to the path when the device id is unavailable.
    data_source_description.description = tryGetBlockDeviceId("/").value_or("/");
    data_source_description.is_cached = false;
    data_source_description.is_encrypted = false;
}
bool LocalObjectStorage::exists(const StoredObject & object) const

View File

@ -17,6 +17,8 @@ class LocalObjectStorage : public IObjectStorage
public:
LocalObjectStorage();
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "LocalObjectStorage"; }
bool exists(const StoredObject & object) const override;
@ -86,6 +88,7 @@ public:
private:
Poco::Logger * log;
DataSourceDescription data_source_description;
};
}

View File

@ -2,6 +2,8 @@
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
@ -25,6 +27,7 @@
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>
@ -369,6 +372,15 @@ void S3ObjectStorage::copyObjectImpl(
}
throwIfError(outcome);
auto settings_ptr = s3_settings.get();
if (settings_ptr->s3_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
}
}
void S3ObjectStorage::copyObjectMultipartImpl(
@ -450,6 +462,14 @@ void S3ObjectStorage::copyObjectMultipartImpl(
throwIfError(outcome);
}
if (settings_ptr->s3_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
}
}
void S3ObjectStorage::copyObject( // NOLINT
@ -511,7 +531,8 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
return std::make_unique<S3ObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
version_id, s3_capabilities, new_namespace);
version_id, s3_capabilities, new_namespace,
S3::URI(Poco::URI(config.getString(config_prefix + ".endpoint"))).endpoint);
}
}

View File

@ -48,13 +48,23 @@ public:
std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
String version_id_,
const S3Capabilities & s3_capabilities_,
String bucket_)
String bucket_,
String connection_string)
: bucket(bucket_)
, client(std::move(client_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_))
{
data_source_description.type = DataSourceType::S3;
data_source_description.description = connection_string;
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
/// Returns a copy of the description stored in the `data_source_description` member.
DataSourceDescription getDataSourceDescription() const override
{
return data_source_description;
}
std::string getName() const override { return "S3ObjectStorage"; }
@ -169,6 +179,8 @@ private:
S3Capabilities s3_capabilities;
const String version_id;
DataSourceDescription data_source_description;
};
}

View File

@ -40,6 +40,7 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractC
rw_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", context->getSettingsRef().s3_upload_part_size_multiply_factor);
rw_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size);
rw_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", context->getSettingsRef().s3_check_objects_after_upload);
return std::make_unique<S3ObjectStorageSettings>(
rw_settings,

View File

@ -130,7 +130,7 @@ void registerDiskS3(DiskFactory & factory)
auto s3_storage = std::make_unique<S3ObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
uri.version_id, s3_capabilities, uri.bucket);
uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
bool skip_access_check = config.getBool(config_prefix + ".skip_access_check", false);
@ -159,7 +159,6 @@ void registerDiskS3(DiskFactory & factory)
"DiskS3",
std::move(metadata_storage),
std::move(s3_storage),
DiskType::S3,
send_metadata,
copy_thread_pool_size);

View File

@ -20,6 +20,16 @@ class WebObjectStorage : public IObjectStorage, WithContext
public:
WebObjectStorage(const String & url_, ContextPtr context_);
/// Describes this storage as a WebServer source identified by `url`; never encrypted, never cached.
DataSourceDescription getDataSourceDescription() const override
{
    DataSourceDescription source;
    source.type = DataSourceType::WebServer;
    source.description = url;
    source.is_encrypted = false;
    source.is_cached = false;
    return source;
}
std::string getName() const override { return "WebObjectStorage"; }
bool exists(const StoredObject & object) const override;

View File

@ -47,7 +47,6 @@ void registerDiskWebServer(DiskFactory & factory)
"DiskWebServer",
metadata_storage,
object_storage,
DiskType::WebServer,
/* send_metadata */false,
/* threadpool_size */16);
};

View File

@ -85,6 +85,9 @@ class FunctionArrayMapped : public IFunction
{
public:
static constexpr auto name = Name::name;
static constexpr bool is_argument_type_map = std::is_same_v<typename Impl::data_type, DataTypeMap>;
static constexpr bool is_argument_type_array = std::is_same_v<typename Impl::data_type, DataTypeArray>;
static constexpr auto argument_type_name = is_argument_type_map ? "Map" : "Array";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayMapped>(); }
String getName() const override
@ -112,20 +115,25 @@ public:
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs one argument with data", getName());
size_t nested_types_count = std::is_same_v<typename Impl::data_type, DataTypeMap> ? (arguments.size() - 1) * 2 : (arguments.size() - 1);
size_t nested_types_count = is_argument_type_map ? (arguments.size() - 1) * 2 : (arguments.size() - 1);
DataTypes nested_types(nested_types_count);
for (size_t i = 0; i < arguments.size() - 1; ++i)
{
const auto * array_type = checkAndGetDataType<typename Impl::data_type>(&*arguments[i + 1]);
if (!array_type)
throw Exception("Argument " + toString(i + 2) + " of function " + getName() + " must be array. Found "
+ arguments[i + 1]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>)
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument {} of function {} must be {}. Found {} instead",
toString(i + 2),
getName(),
argument_type_name,
arguments[i + 1]->getName());
if constexpr (is_argument_type_map)
{
nested_types[2 * i] = recursiveRemoveLowCardinality(array_type->getKeyType());
nested_types[2 * i + 1] = recursiveRemoveLowCardinality(array_type->getValueType());
}
else if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
else if constexpr (is_argument_type_array)
{
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
}
@ -149,7 +157,7 @@ public:
"Function {} needs at least {} argument, passed {}",
getName(), min_args, arguments.size());
if ((arguments.size() == 1) && std::is_same_v<typename Impl::data_type, DataTypeArray>)
if ((arguments.size() == 1) && is_argument_type_array)
{
const auto * data_type = checkAndGetDataType<typename Impl::data_type>(arguments[0].type.get());
@ -163,7 +171,7 @@ public:
throw Exception("The only argument for function " + getName() + " must be array of UInt8. Found "
+ arguments[0].type->getName() + " instead", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
if constexpr (is_argument_type_array)
return Impl::getReturnType(nested_type, nested_type);
else
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
@ -193,10 +201,7 @@ public:
throw Exception("Expression for function " + getName() + " must return UInt8 or Nullable(UInt8), found "
+ return_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
static_assert(
std::is_same_v<typename Impl::data_type, DataTypeMap> ||
std::is_same_v<typename Impl::data_type, DataTypeArray>,
"unsupported type");
static_assert(is_argument_type_map || is_argument_type_array, "unsupported type");
if (arguments.size() < 2)
{
@ -208,10 +213,10 @@ public:
if (!first_array_type)
throw DB::Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Unsupported type {}", arguments[1].type->getName());
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
if constexpr (is_argument_type_array)
return Impl::getReturnType(return_type, first_array_type->getNestedType());
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>)
if constexpr (is_argument_type_map)
return Impl::getReturnType(return_type, first_array_type->getKeyValueTypes());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
@ -229,7 +234,11 @@ public:
{
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Expected {} column, found {}",
argument_type_name,
column_array_ptr->getName());
column_array_ptr = column_const_array->convertToFullColumn();
column_array = assert_cast<const typename Impl::column_type *>(column_array_ptr.get());
}
@ -279,13 +288,15 @@ public:
{
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
throw Exception(
ErrorCodes::ILLEGAL_COLUMN, "Expected {} column, found {}", argument_type_name, column_array_ptr->getName());
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
column_array = checkAndGetColumn<typename Impl::column_type>(column_array_ptr.get());
}
if (!array_type)
throw Exception("Expected array type, found " + array_type_ptr->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Expected {} type, found {}", argument_type_name, array_type_ptr->getName());
if (!offsets_column)
{
@ -296,7 +307,11 @@ public:
/// The first condition is optimization: do not compare data if the pointers are equal.
if (getOffsetsPtr(*column_array) != offsets_column
&& getOffsets(*column_array) != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
throw Exception("Arrays passed to " + getName() + " must have equal size", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
throw Exception(
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH,
"{}s passed to {} must have equal size",
argument_type_name,
getName());
}
if (i == 1)
@ -305,7 +320,7 @@ public:
column_first_array = column_array;
}
if constexpr (std::is_same_v<DataTypeMap, typename Impl::data_type>)
if constexpr (is_argument_type_map)
{
arrays.emplace_back(ColumnWithTypeAndName(
column_array->getNestedData().getColumnPtr(0), recursiveRemoveLowCardinality(array_type->getKeyType()), array_with_type_and_name.name+".key"));

View File

@ -964,7 +964,13 @@ inline ReturnType readDateTimeTextImpl(DateTime64 & datetime64, UInt32 scale, Re
components.whole = components.whole / common::exp10_i32(scale);
}
datetime64 = negative_multiplier * DecimalUtils::decimalFromComponents<DateTime64>(components, scale);
if constexpr (std::is_same_v<ReturnType, void>)
datetime64 = DecimalUtils::decimalFromComponents<DateTime64>(components, scale);
else
DecimalUtils::tryGetDecimalFromComponents<DateTime64>(components, scale, datetime64);
datetime64 *= negative_multiplier;
return ReturnType(true);
}

View File

@ -5,6 +5,8 @@
#include "PocoHTTPClient.h"
#include <utility>
#include <algorithm>
#include <functional>
#include <Common/logger_useful.h>
#include <Common/Stopwatch.h>
@ -14,6 +16,7 @@
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/xml/XmlSerializer.h>
#include <aws/core/monitoring/HttpClientMetrics.h>
#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
#include "Poco/StreamCopier.h"
@ -23,6 +26,8 @@
#include <boost/algorithm/string.hpp>
static const int SUCCESS_RESPONSE_MIN = 200;
static const int SUCCESS_RESPONSE_MAX = 299;
namespace ProfileEvents
{
@ -121,6 +126,37 @@ std::shared_ptr<Aws::Http::HttpResponse> PocoHTTPClient::MakeRequest(
return response;
}
namespace
{
/// Some S3 operations may answer with a 2xx status while the body carries an <Error> document:
/// 1) https://aws.amazon.com/premiumsupport/knowledge-center/s3-resolve-200-internalerror/
/// 2) https://github.com/aws/aws-sdk-cpp/issues/658
///
/// Returns true when `request` looks like one of those operations. The decision is made only
/// from the presence of the copy-source header and the set of query string parameters:
///  - CopyObject: copy-source header present, no query parameters;
///  - UploadPartCopy: copy-source header present, both `partNumber` and `uploadId` parameters;
///  - CompleteMultipartUpload: no copy-source header, `uploadId` is the only parameter.
/// Every other request yields false.
bool checkRequestCanReturn2xxAndErrorInBody(Aws::Http::HttpRequest & request)
{
    auto query_params = request.GetQueryStringParameters();

    /// S3 copy operations name their source object in the `x-amz-copy-source` header.
    if (request.HasHeader("x-amz-copy-source"))
    {
        /// CopyObject https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
        if (query_params.empty())
            return true;

        /// UploadPartCopy https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
        if (query_params.contains("partNumber") && query_params.contains("uploadId"))
            return true;
    }
    else
    {
        /// CompleteMultipartUpload https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
        if (query_params.size() == 1 && query_params.contains("uploadId"))
            return true;
    }

    return false;
}
}
void PocoHTTPClient::makeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<PocoHTTPResponse> & response,
@ -281,6 +317,7 @@ void PocoHTTPClient::makeRequestInternal(
ProfileEvents::increment(select_metric(S3MetricType::Microseconds), watch.elapsedMicroseconds());
int status_code = static_cast<int>(poco_response.getStatus());
if (enable_s3_requests_logging)
LOG_TEST(log, "Response status: {}, {}", status_code, poco_response.getReason());
@ -316,18 +353,44 @@ void PocoHTTPClient::makeRequestInternal(
response->AddHeader(header_name, header_value);
}
if (status_code == 429 || status_code == 503)
{ // API throttling
ProfileEvents::increment(select_metric(S3MetricType::Throttling));
}
else if (status_code >= 300)
/// Request is successful but for some special requests we can have actual error message in body
if (status_code >= SUCCESS_RESPONSE_MIN && status_code <= SUCCESS_RESPONSE_MAX && checkRequestCanReturn2xxAndErrorInBody(request))
{
ProfileEvents::increment(select_metric(S3MetricType::Errors));
if (status_code >= 500 && error_report)
error_report(request_configuration);
}
std::string response_string((std::istreambuf_iterator<char>(response_body_stream)),
std::istreambuf_iterator<char>());
response->SetResponseBody(response_body_stream, session);
/// Just trim string so it will not be so long
LOG_TRACE(log, "Got dangerous response with successful code {}, checking its body: '{}'", status_code, response_string.substr(0, 300));
const static std::string_view needle = "<Error>";
if (auto it = std::search(response_string.begin(), response_string.end(), std::default_searcher(needle.begin(), needle.end())); it != response_string.end())
{
LOG_WARNING(log, "Response for request contain <Error> tag in body, settings internal server error (500 code)");
response->SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
ProfileEvents::increment(select_metric(S3MetricType::Errors));
if (error_report)
error_report(request_configuration);
}
/// Set response from string
response->SetResponseBody(response_string);
}
else
{
if (status_code == 429 || status_code == 503)
{ // API throttling
ProfileEvents::increment(select_metric(S3MetricType::Throttling));
}
else if (status_code >= 300)
{
ProfileEvents::increment(select_metric(S3MetricType::Errors));
if (status_code >= 500 && error_report)
error_report(request_configuration);
}
response->SetResponseBody(response_body_stream, session);
}
return;
}

View File

@ -80,6 +80,13 @@ public:
);
}
void SetResponseBody(std::string & response_body) /// NOLINT
{
auto stream = Aws::New<std::stringstream>("http result buf", response_body); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
stream->exceptions(std::ios::failbit);
body_stream = Aws::Utils::Stream::ResponseStream(std::move(stream));
}
Aws::IOStream & GetResponseBody() const override
{
return body_stream.GetUnderlyingStream();

View File

@ -35,9 +35,8 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
void WriteBufferFromHTTP::finalizeImpl()
{
// for compressed body, the data is stored in buffered first
// here, make sure the content in the buffer has been flushed
this->nextImpl();
// Make sure the content in the buffer has been flushed
this->next();
receiveResponse(*session, request, response, false);
/// TODO: Response body is ignored.

View File

@ -15,6 +15,7 @@
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <utility>
@ -164,6 +165,20 @@ void WriteBufferFromS3::finalizeImpl()
if (!multipart_upload_id.empty())
completeMultipartUpload();
if (s3_settings.check_objects_after_upload)
{
LOG_TRACE(log, "Checking object {} exists after upload", key);
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);
auto response = client_ptr->HeadObject(request);
if (!response.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket);
}
}
void WriteBufferFromS3::createMultipartUpload()

View File

@ -415,7 +415,7 @@ std::string DataPartStorageOnDisk::getDiskName() const
std::string DataPartStorageOnDisk::getDiskType() const
{
return toString(volume->getDisk()->getType());
return toString(volume->getDisk()->getDataSourceDescription().type);
}
bool DataPartStorageOnDisk::isStoredOnRemoteDisk() const

View File

@ -458,11 +458,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
Disks disks = data.getDisks();
for (const auto & data_disk : disks)
if (data_disk->supportZeroCopyReplication())
capability.push_back(toString(data_disk->getType()));
capability.push_back(toString(data_disk->getDataSourceDescription().type));
}
else if (disk->supportZeroCopyReplication())
{
capability.push_back(toString(disk->getType()));
capability.push_back(toString(disk->getDataSourceDescription().type));
}
}
if (!capability.empty())

View File

@ -443,6 +443,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len;
final_lsn = current_lsn;
committed = true;
break;
}
case 'R': // Relation
@ -593,6 +594,12 @@ void MaterializedPostgreSQLConsumer::syncTables()
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
updateLsn();
}
void MaterializedPostgreSQLConsumer::updateLsn()
{
try
{
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
@ -614,6 +621,7 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
final_lsn = result[0][0].as<std::string>();
LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn));
committed = false;
return final_lsn;
}
@ -771,7 +779,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
try
{
// LOG_DEBUG(log, "Current message: {}", (*row)[1]);
/// LOG_DEBUG(log, "Current message: {}", (*row)[1]);
processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
}
catch (const Exception & e)
@ -790,6 +798,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
}
catch (const pqxx::broken_connection &)
{
LOG_DEBUG(log, "Connection was broken");
connection->tryUpdateConnection();
return false;
}
@ -823,7 +832,13 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
}
if (!tables_to_sync.empty())
{
syncTables();
}
else if (committed)
{
updateLsn();
}
return true;
}

View File

@ -94,6 +94,8 @@ private:
void syncTables();
void updateLsn();
String advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx);
void processReplicationMessage(const char * replication_message, size_t size);
@ -136,6 +138,8 @@ private:
ContextPtr context;
const std::string replication_slot_name, publication_name;
bool committed = false;
std::shared_ptr<postgres::Connection> connection;
std::string current_lsn, final_lsn;

View File

@ -321,13 +321,13 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
nested_storages,
(is_materialized_postgresql_database ? postgres_database : postgres_database + '.' + tables_list));
replication_handler_initialized = true;
consumer_task->activateAndSchedule();
cleanup_task->activateAndSchedule();
/// Do not rely anymore on saved storage pointers.
materialized_storages.clear();
replication_handler_initialized = true;
}

View File

@ -7500,7 +7500,7 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
String id = part_id;
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(),
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
part_name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
@ -7690,11 +7690,11 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
const String & path)
{
const auto settings = getSettings();
auto disk_type = disk->getType();
auto data_source_description = disk->getDataSourceDescription();
if (!(disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication))
return nullptr;
String replica = getSharedDataReplica(part, disk_type);
String replica = getSharedDataReplica(part, data_source_description.type);
/// We can't fetch part when none replicas have this part on a same type remote disk
if (replica.empty())
@ -7703,9 +7703,8 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
return executeFetchShared(replica, part.name, disk, path);
}
String StorageReplicatedMergeTree::getSharedDataReplica(
const IMergeTreeDataPart & part, DiskType disk_type) const
const IMergeTreeDataPart & part, DataSourceType data_source_type) const
{
String best_replica;
@ -7713,7 +7712,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
if (!zookeeper)
return "";
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk_type), getTableSharedID(), part.name,
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(data_source_type), getTableSharedID(), part.name,
zookeeper_path);
std::set<String> replicas;
@ -7824,7 +7823,7 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;
return getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(), part_name, zookeeper_path)[0];
return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
@ -8229,7 +8228,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
String id = disk->getUniqueId(checksums);
bool can_remove = false;
std::tie(can_remove, files_not_to_remove) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, toString(disk->getType()), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"),
detached_replica_name, toString(disk->getDataSourceDescription().type), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"),
detached_zookeeper_path);
keep_shared = !can_remove;

View File

@ -283,7 +283,7 @@ public:
DataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DiskType disk_type) const;
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
inline String getReplicaName() const { return replica_name; }

View File

@ -34,6 +34,13 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
return with_default ? config.getUInt64(config_elem + "." + key + "." + elem, default_value) : config.getUInt64(config_elem + "." + key + "." + elem);
};
auto get_bool_for_key = [&](const String & key, const String & elem, bool with_default = true, bool default_value = false)
{
return with_default ? config.getBool(config_elem + "." + key + "." + elem, default_value) : config.getBool(config_elem + "." + key + "." + elem);
};
for (const String & key : config_keys)
{
if (config.has(config_elem + "." + key + ".endpoint"))
@ -82,6 +89,7 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(rw_settings)});
}
@ -112,6 +120,7 @@ S3Settings::ReadWriteSettings::ReadWriteSettings(const Settings & settings)
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
max_connections = settings.s3_max_connections;
check_objects_after_upload = settings.s3_check_objects_after_upload;
}
void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & settings)
@ -128,6 +137,7 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
if (!max_connections)
max_connections = settings.s3_max_connections;
check_objects_after_upload = settings.s3_check_objects_after_upload;
}
}

View File

@ -60,6 +60,7 @@ struct S3Settings
size_t upload_part_size_multiply_parts_count_threshold = 0;
size_t max_single_part_upload_size = 0;
size_t max_connections = 0;
bool check_objects_after_upload = false;
ReadWriteSettings() = default;
explicit ReadWriteSettings(const Settings & settings);
@ -71,7 +72,8 @@ struct S3Settings
&& upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
&& upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
&& max_single_part_upload_size == other.max_single_part_upload_size
&& max_connections == other.max_connections;
&& max_connections == other.max_connections
&& check_objects_after_upload == other.check_objects_after_upload;
}
void updateFromSettingsIfEmpty(const Settings & settings);

View File

@ -23,6 +23,7 @@ StorageSystemDisks::StorageSystemDisks(const StorageID & table_id_)
{"total_space", std::make_shared<DataTypeUInt64>()},
{"keep_free_space", std::make_shared<DataTypeUInt64>()},
{"type", std::make_shared<DataTypeString>()},
{"is_encrypted", std::make_shared<DataTypeUInt8>()},
{"cache_path", std::make_shared<DataTypeString>()},
}));
setInMemoryMetadata(storage_metadata);
@ -45,6 +46,7 @@ Pipe StorageSystemDisks::read(
MutableColumnPtr col_total = ColumnUInt64::create();
MutableColumnPtr col_keep = ColumnUInt64::create();
MutableColumnPtr col_type = ColumnString::create();
MutableColumnPtr col_is_encrypted = ColumnUInt8::create();
MutableColumnPtr col_cache_path = ColumnString::create();
for (const auto & [disk_name, disk_ptr] : context->getDisksMap())
@ -54,7 +56,9 @@ Pipe StorageSystemDisks::read(
col_free->insert(disk_ptr->getAvailableSpace());
col_total->insert(disk_ptr->getTotalSpace());
col_keep->insert(disk_ptr->getKeepingFreeSpace());
col_type->insert(toString(disk_ptr->getType()));
auto data_source_description = disk_ptr->getDataSourceDescription();
col_type->insert(toString(data_source_description.type));
col_is_encrypted->insert(data_source_description.is_encrypted);
String cache_path;
if (disk_ptr->supportsCache())
@ -70,6 +74,7 @@ Pipe StorageSystemDisks::read(
res_columns.emplace_back(std::move(col_total));
res_columns.emplace_back(std::move(col_keep));
res_columns.emplace_back(std::move(col_type));
res_columns.emplace_back(std::move(col_is_encrypted));
res_columns.emplace_back(std::move(col_cache_path));
UInt64 num_rows = res_columns.at(0)->size();

View File

@ -59,6 +59,7 @@ ln -sf $SRC_PATH/users.d/session_log_test.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
</default>
</profiles>
</clickhouse>

View File

@ -89,6 +89,8 @@ def test_restore_table(engine):
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
@ -129,6 +131,8 @@ def test_restore_table_under_another_name():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name}")
@ -142,6 +146,8 @@ def test_backup_table_under_another_name():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table AS test.table2 TO {backup_name}")
assert instance.contains_in_log("using native copy")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table2 FROM {backup_name}")
@ -170,6 +176,8 @@ def test_incremental_backup():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n"
@ -244,6 +252,8 @@ def test_file_engine():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
@ -257,6 +267,9 @@ def test_database():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP DATABASE test TO {backup_name}")
assert instance.contains_in_log("using native copy")
instance.query("DROP DATABASE test")
instance.query(f"RESTORE DATABASE test FROM {backup_name}")
@ -269,6 +282,7 @@ def test_zip_archive():
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
<profiles>
<default>
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas><default></default></quotas>
</clickhouse>

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
<logger>
<level>test</level>
</logger>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<storage_policy>s3</storage_policy>
</merge_tree>
</clickhouse>

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import logging
import os
import time
from helpers.cluster import ClickHouseCluster
import pytest
# Module-scoped pytest fixture: builds a ClickHouseCluster with a single instance
# named "node" (storage_conf.xml as main config, setting.xml as user config,
# with_minio=True), starts it and yields it to the tests.
# The `finally` clause calls cluster.shutdown() once the fixture is torn down.
# NOTE(review): `cluster` is assigned after `try:`, so if ClickHouseCluster(__file__)
# itself raises, the `finally` clause would reference an unbound name - confirm.
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=[
"configs/storage_conf.xml",
],
user_configs=[
"configs/setting.xml",
],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
# Creates a MergeTree table, inserts one row, and then checks two things:
# the server log contains the substring "exists after upload", and the row
# can be read back unchanged.
def test_paranoid_check_in_logs(cluster):
node = cluster.instances["node"]
# Table used for the single-row insert below.
node.query(
"""
CREATE TABLE s3_failover_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
"""
)
node.query("INSERT INTO s3_failover_test VALUES (1, 'Hello')")
# Presumably this substring comes from the post-upload object check enabled by
# s3_check_objects_after_upload - verify against the server log message.
assert node.contains_in_log("exists after upload")
# The inserted row must still be readable.
assert node.query("SELECT * FROM s3_failover_test ORDER BY id") == "1\tHello\n"

View File

@ -6,7 +6,7 @@ disk_types = {
"disk_s3": "s3",
"disk_memory": "memory",
"disk_hdfs": "hdfs",
"disk_encrypted": "encrypted",
"disk_encrypted": "s3",
}
@ -34,14 +34,30 @@ def test_different_types(cluster):
if disk == "": # skip empty line (after split at last position)
continue
fields = disk.split("\t")
assert len(fields) >= 6
assert len(fields) >= 7
assert disk_types.get(fields[0], "UNKNOWN") == fields[5]
if "encrypted" in fields[0]:
assert fields[6] == "1"
else:
assert fields[6] == "0"
def test_select_by_type(cluster):
node = cluster.instances["node"]
for name, disk_type in list(disk_types.items()):
assert (
node.query("SELECT name FROM system.disks WHERE type='" + disk_type + "'")
== name + "\n"
)
if disk_type != "s3":
assert (
node.query(
"SELECT name FROM system.disks WHERE type='" + disk_type + "'"
)
== name + "\n"
)
else:
assert (
node.query(
"SELECT name FROM system.disks WHERE type='"
+ disk_type
+ "' ORDER BY name"
)
== "disk_encrypted\ndisk_s3\n"
)

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,35 @@
<clickhouse>
<logger>
<level>test</level>
</logger>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<!-- Use custom S3 endpoint -->
<endpoint>http://resolver:8080/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>0</retry_attempts>
<skip_access_check>true</skip_access_check>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<storage_policy>s3</storage_policy>
</merge_tree>
</clickhouse>

View File

@ -0,0 +1,22 @@
<clickhouse>
<profiles>
<default>
<s3_max_single_part_upload_size>1</s3_max_single_part_upload_size>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas><default></default></quotas>
</clickhouse>

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
from bottle import request, route, run, response
# Handle for MultipleObjectsDelete.
# Matches POST requests to "/<_bucket>" and answers with a 307 redirect whose
# Location header points at http://minio1:9001/<_bucket>, keeping the original
# query string. The response body is the literal text "Redirected".
@route("/<_bucket>", ["POST"])
def delete(_bucket):
response.set_header(
"Location", "http://minio1:9001/" + _bucket + "?" + request.query_string
)
response.status = 307
return "Redirected"
# Handler for GET/POST/PUT/DELETE requests to "/<_bucket>/<_path>".
# If the query string starts with "uploadId=", it replies with status 200 and an
# XML <Error> body instead of forwarding the request. Every other request gets a
# 307 redirect to the same bucket/path/query on http://minio1:9001.
@route("/<_bucket>/<_path:path>", ["GET", "POST", "PUT", "DELETE"])
def server(_bucket, _path):
# CompleteMultipartUpload request
# We always return 200 + error in body to simulate: https://aws.amazon.com/premiumsupport/knowledge-center/s3-resolve-200-internalerror/
if request.query_string.startswith("uploadId="):
response.status = 200
response.content_type = "text/xml"
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message><RequestId>txfbd566d03042474888193-00608d7538</RequestId></Error>'
# Not intercepted: redirect the request unchanged to the MinIO address.
response.set_header(
"Location",
"http://minio1:9001/" + _bucket + "/" + _path + "?" + request.query_string,
)
response.status = 307
return "Redirected"
# Root route: replies with the plain body "OK".
# Presumably used as a readiness probe by the test that launches this endpoint - confirm.
@route("/")
def ping():
return "OK"
run(host="0.0.0.0", port=8080)

View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
import logging
import os
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
# Runs custom python-based S3 endpoint.
# Copies s3_endpoint/endpoint.py (located next to this file) into the "resolver"
# container, launches it detached with `python endpoint.py`, and then polls
# http://resolver:8080/ with curl until the reply is "OK".
# Up to 100 polls are made; the last failed one triggers an assertion error.
def run_endpoint(cluster):
logging.info("Starting custom S3 endpoint")
container_id = cluster.get_container_id("resolver")
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(
container_id,
os.path.join(current_dir, "s3_endpoint", "endpoint.py"),
"endpoint.py",
)
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
# Wait for S3 endpoint start
num_attempts = 100
for attempt in range(num_attempts):
# nothrow=True: presumably a failing curl returns its output instead of raising - confirm in helpers.cluster.
ping_response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", "http://resolver:8080/"],
nothrow=True,
)
# NOTE(review): indentation is not preserved in this view. The first `else`
# presumably pairs with the `attempt == num_attempts - 1` check (sleep and retry),
# and the second with `ping_response != "OK"` (stop polling on success) - confirm.
if ping_response != "OK":
if attempt == num_attempts - 1:
assert ping_response == "OK", 'Expected "OK", but got "{}"'.format(
ping_response
)
else:
time.sleep(1)
else:
break
logging.info("S3 endpoint started")
# Module-scoped pytest fixture: builds a ClickHouseCluster with a single instance
# named "node" (storage_conf.xml as main config, upload_min_size.xml as user config,
# with_minio=True), starts it, launches the custom S3 endpoint via run_endpoint(),
# and yields the cluster to the tests.
# The `finally` clause calls cluster.shutdown() once the fixture is torn down.
# NOTE(review): `cluster` is assigned after `try:`, so if ClickHouseCluster(__file__)
# itself raises, the `finally` clause would reference an unbound name - confirm.
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=[
"configs/storage_conf.xml",
],
user_configs=[
"configs/upload_min_size.xml",
],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
# The custom endpoint is started only after the cluster is up.
run_endpoint(cluster)
yield cluster
finally:
cluster.shutdown()
# Creates a MergeTree table and expects the following INSERT to raise.
# The test passes only if node.query() throws for the INSERT statement.
def test_dataloss(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_failover_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
"""
)
# Must throw an exception because we use a proxy which always fails
# CompleteMultipartUpload requests
# NOTE(review): pytest.raises(Exception) accepts any exception type, not only an S3-related error.
with pytest.raises(Exception):
node.query("INSERT INTO s3_failover_test VALUES (1, 'Hello')")

View File

@ -123,7 +123,7 @@ timeout $TIMEOUT bash -c drop_part_thread &
wait
check_replication_consistency "dst_" "count(), sum(p), sum(k), sum(v)"
try_sync_replicas "src_"
try_sync_replicas "src_" 300
for ((i=0; i<16; i++)) do
$CLICKHOUSE_CLIENT -q "DROP TABLE dst_$i" 2>&1| grep -Fv "is already started to be removing" &

View File

@ -186,6 +186,7 @@ CREATE TABLE system.disks
`total_space` UInt64,
`keep_free_space` UInt64,
`type` String,
`is_encrypted` UInt8,
`cache_path` String
)
ENGINE = SystemDisks

View File

@ -12,4 +12,4 @@ SET max_block_size=3;
SET max_joined_block_size_rows = 2;
SET join_algorithm='partial_merge';
SELECT value FROM t1 LEFT JOIN t2 ON t1.x = t2.x;
SELECT value FROM t1 LEFT JOIN t2 ON t1.x = t2.x ORDER BY value;

View File

@ -0,0 +1,2 @@
1970-01-01 00:00:00.000000000
c1 Nullable(String)

View File

@ -0,0 +1,2 @@
select toDateTime64OrDefault('Aaaa e a.a.aaaaaaaaa', 9, 'UTC');
desc format(CSV, '"Aaaa e a.a.aaaaaaaaa"');

View File

@ -0,0 +1,28 @@
#!/bin/bash
# Creates an empty test file and a matching `.reference` file in the directory
# of this script. The new files get a 5-digit number one above the highest
# numeric prefix already present in that directory.
#
# Usage: <this script> <base_test_name>[.<ext>]
#   The extension defaults to `sql` when the argument contains no dot.

if [ -z "$1" ]; then
    echo "Helper script to create empty test and reference files and assign a new number."
    echo "Usage: $0 <base_test_name>"
    exit 1
fi

# Quoted so that a checkout path containing spaces is not word-split.
TESTS_PATH=$(dirname "${BASH_SOURCE[0]}")

set -ue

# Highest numeric prefix among the entries of the tests directory.
# shellcheck disable=SC2010
LAST_TEST_NO=$(ls -1 "${TESTS_PATH}" | grep -P -o '^\d+' | sort -nr | head -1)

# remove leading zeros, increment and add padding zeros to 5 digits
# (falls back to 0 when no numbered entry exists yet, so the arithmetic stays valid)
NEW_TEST_NO=$(printf "%05d\n" $((10#${LAST_TEST_NO:-0} + 1)))

# if extension is not provided, use `.sql`
FILENAME="${1}"
FILEEXT="sql"
if [[ $1 == *.* ]] ; then
    FILENAME="${1%.*}"
    FILEEXT="${1##*.}"
fi

# Trace the commands that create the files so the chosen names are visible.
set -x
touch "${TESTS_PATH}/${NEW_TEST_NO}_${FILENAME}.${FILEEXT}"
touch "${TESTS_PATH}/${NEW_TEST_NO}_${FILENAME}.reference"

View File

@ -5,6 +5,7 @@
function try_sync_replicas()
{
table_name_prefix=$1
time_left=$2
readarray -t empty_partitions_arr < <(${CLICKHOUSE_CLIENT} -q \
"SELECT DISTINCT substr(new_part_name, 1, position(new_part_name, '_') - 1) AS partition_id
@ -29,7 +30,7 @@ function try_sync_replicas()
for t in "${tables_arr[@]}"
do
# The size of log may be big, so increase timeout.
$CLICKHOUSE_CLIENT --receive_timeout 300 -q "SYSTEM SYNC REPLICA $t" || ($CLICKHOUSE_CLIENT -q \
$CLICKHOUSE_CLIENT --receive_timeout $time_left -q "SYSTEM SYNC REPLICA $t" || ($CLICKHOUSE_CLIENT -q \
"select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name" && exit 1) &
pids[${i}]=$!
i=$((i + 1))
@ -48,13 +49,14 @@ function check_replication_consistency()
# Wait for all queries to finish (query may still be running if thread is killed by timeout)
num_tries=0
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do
sleep 0.5;
sleep 1;
num_tries=$((num_tries+1))
if [ $num_tries -eq 200 ]; then
if [ $num_tries -eq 250 ]; then
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%' FORMAT Vertical"
break
fi
done
time_left=$((300 - num_tries))
# Do not check anything if all replicas are readonly,
# because is this case all replicas are probably lost (it may happen and it's not a bug)
@ -78,7 +80,7 @@ function check_replication_consistency()
# SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet
wait_for_all_mutations "$table_name_prefix%"
try_sync_replicas "$table_name_prefix" || exit 1
try_sync_replicas "$table_name_prefix" "$time_left" || exit 1
res=$($CLICKHOUSE_CLIENT -q \
"SELECT