Merge pull request #13076 from Jokser/s3-file-cache

Ability to cache mark and index files for S3 disk
This commit is contained in:
Alexander Kuzmenkov 2020-08-08 01:04:53 +03:00 committed by GitHub
commit 6a596d5c40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 821 additions and 30 deletions

View File

@ -0,0 +1,325 @@
#include "DiskCacheWrapper.h"
#include <IO/copyData.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
#include <condition_variable>
namespace DB
{
/**
* Write buffer with possibility to set and invoke callback when buffer is finalized.
*/
class CompletionAwareWriteBuffer : public WriteBufferFromFileBase
{
public:
CompletionAwareWriteBuffer(std::unique_ptr<WriteBufferFromFileBase> impl_, std::function<void()> completion_callback_, size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0), impl(std::move(impl_)), completion_callback(completion_callback_) { }
~CompletionAwareWriteBuffer() override
{
try
{
CompletionAwareWriteBuffer::finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void finalize() override
{
if (finalized)
return;
next();
impl->finalize();
finalized = true;
completion_callback();
}
void sync() override { impl->sync(); }
std::string getFileName() const override { return impl->getFileName(); }
private:
void nextImpl() override
{
impl->swap(*this);
impl->next();
impl->swap(*this);
}
/// Actual write buffer.
std::unique_ptr<WriteBufferFromFileBase> impl;
/// Callback is invoked when finalize is completed.
const std::function<void()> completion_callback;
bool finalized = false;
};
enum FileDownloadStatus
{
NONE,
DOWNLOADING,
DOWNLOADED,
ERROR
};
struct FileDownloadMetadata
{
/// Thread waits on this condition if download process is in progress.
std::condition_variable condition;
FileDownloadStatus status = NONE;
};
DiskCacheWrapper::DiskCacheWrapper(
std::shared_ptr<IDisk> delegate_, std::shared_ptr<DiskLocal> cache_disk_, std::function<bool(const String &)> cache_file_predicate_)
: DiskDecorator(delegate_), cache_disk(cache_disk_), cache_file_predicate(cache_file_predicate_)
{
}
std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(const String & path) const
{
std::unique_lock<std::mutex> lock{mutex};
auto it = file_downloads.find(path);
if (it != file_downloads.end() && !it->second.expired())
return it->second.lock();
std::shared_ptr<FileDownloadMetadata> metadata(
new FileDownloadMetadata,
[this, path] (FileDownloadMetadata * p)
{
std::unique_lock<std::mutex> erase_lock{mutex};
file_downloads.erase(path);
delete p;
});
file_downloads.emplace(path, metadata);
return metadata;
}
std::unique_ptr<ReadBufferFromFileBase>
DiskCacheWrapper::readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const
{
if (!cache_file_predicate(path))
return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read file {} from cache", backQuote(path));
if (cache_disk->exists(path))
return cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
auto metadata = acquireDownloadMetadata(path);
{
std::unique_lock<std::mutex> lock{mutex};
if (metadata->status == NONE)
{
/// This thread will responsible for file downloading to cache.
metadata->status = DOWNLOADING;
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "File {} doesn't exist in cache. Will download it", backQuote(path));
}
else if (metadata->status == DOWNLOADING)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Waiting for file {} download to cache", backQuote(path));
metadata->condition.wait(lock, [metadata] { return metadata->status == DOWNLOADED || metadata->status == ERROR; });
}
}
if (metadata->status == DOWNLOADING)
{
FileDownloadStatus result_status = DOWNLOADED;
if (!cache_disk->exists(path))
{
try
{
auto dir_path = getDirectoryPath(path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
auto tmp_path = path + ".tmp";
{
auto src_buffer = DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
auto dst_buffer = cache_disk->writeFile(tmp_path, buf_size, WriteMode::Rewrite, estimated_size, aio_threshold);
copyData(*src_buffer, *dst_buffer);
}
cache_disk->moveFile(tmp_path, path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "File {} downloaded to cache", backQuote(path));
}
catch (...)
{
tryLogCurrentException("DiskS3", "Failed to download file + " + backQuote(path) + " to cache");
result_status = ERROR;
}
}
/// Notify all waiters that file download is finished.
std::unique_lock<std::mutex> lock{mutex};
metadata->status = result_status;
lock.unlock();
metadata->condition.notify_all();
}
if (metadata->status == DOWNLOADED)
return cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
}
std::unique_ptr<WriteBufferFromFileBase>
DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold)
{
if (!cache_file_predicate(path))
return DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write file {} to cache", backQuote(path));
auto dir_path = getDirectoryPath(path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
return std::make_unique<CompletionAwareWriteBuffer>(
cache_disk->writeFile(path, buf_size, mode, estimated_size, aio_threshold),
[this, path, buf_size, mode, estimated_size, aio_threshold]()
{
/// Copy file from cache to actual disk when cached buffer is finalized.
auto src_buffer = cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, 0);
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
copyData(*src_buffer, *dst_buffer);
},
buf_size);
}
void DiskCacheWrapper::clearDirectory(const String & path)
{
if (cache_disk->exists(path))
cache_disk->clearDirectory(path);
DiskDecorator::clearDirectory(path);
}
void DiskCacheWrapper::moveDirectory(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
cache_disk->moveDirectory(from_path, to_path);
DiskDecorator::moveDirectory(from_path, to_path);
}
void DiskCacheWrapper::moveFile(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
{
auto dir_path = getDirectoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
cache_disk->moveFile(from_path, to_path);
}
DiskDecorator::moveFile(from_path, to_path);
}
void DiskCacheWrapper::replaceFile(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
{
auto dir_path = getDirectoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
cache_disk->replaceFile(from_path, to_path);
}
DiskDecorator::replaceFile(from_path, to_path);
}
void DiskCacheWrapper::copyFile(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
{
auto dir_path = getDirectoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
cache_disk->copyFile(from_path, to_path);
}
DiskDecorator::copyFile(from_path, to_path);
}
void DiskCacheWrapper::remove(const String & path)
{
if (cache_disk->exists(path))
cache_disk->remove(path);
DiskDecorator::remove(path);
}
void DiskCacheWrapper::removeRecursive(const String & path)
{
if (cache_disk->exists(path))
cache_disk->removeRecursive(path);
DiskDecorator::removeRecursive(path);
}
void DiskCacheWrapper::createHardLink(const String & src_path, const String & dst_path)
{
if (cache_disk->exists(src_path))
{
auto dir_path = getDirectoryPath(dst_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
cache_disk->createHardLink(src_path, dst_path);
}
DiskDecorator::createHardLink(src_path, dst_path);
}
void DiskCacheWrapper::createDirectory(const String & path)
{
cache_disk->createDirectory(path);
DiskDecorator::createDirectory(path);
}
void DiskCacheWrapper::createDirectories(const String & path)
{
cache_disk->createDirectories(path);
DiskDecorator::createDirectories(path);
}
inline String DiskCacheWrapper::getDirectoryPath(const String & path)
{
return Poco::Path{path}.setFileName("").toString();
}
/// TODO: Current reservation mechanism leaks IDisk abstraction details.
/// This hack is needed to return proper disk pointer (wrapper instead of implementation) from reservation object.
class ReservationDelegate : public IReservation
{
public:
ReservationDelegate(ReservationPtr delegate_, DiskPtr wrapper_) : delegate(std::move(delegate_)), wrapper(wrapper_) { }
UInt64 getSize() const override { return delegate->getSize(); }
DiskPtr getDisk(size_t) const override { return wrapper; }
Disks getDisks() const override { return {wrapper}; }
void update(UInt64 new_size) override { delegate->update(new_size); }
private:
ReservationPtr delegate;
DiskPtr wrapper;
};
ReservationPtr DiskCacheWrapper::reserve(UInt64 bytes)
{
auto ptr = DiskDecorator::reserve(bytes);
if (ptr)
{
auto disk_ptr = std::static_pointer_cast<DiskCacheWrapper>(shared_from_this());
return std::make_unique<ReservationDelegate>(std::move(ptr), disk_ptr);
}
return ptr;
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <unordered_map>
#include "DiskDecorator.h"
#include "DiskLocal.h"
namespace DB
{
struct FileDownloadMetadata;
/**
* Simple cache wrapper.
* Tries to cache files matched by predicate to given local disk (cache disk).
*
* When writeFile() is invoked wrapper firstly writes file to cache.
* After write buffer is finalized actual file is stored to underlying disk.
*
* When readFile() is invoked and file exists in cache wrapper reads this file from cache.
* If file doesn't exist wrapper downloads this file from underlying disk to cache.
* readFile() invocation is thread-safe.
*/
class DiskCacheWrapper : public DiskDecorator
{
public:
DiskCacheWrapper(
std::shared_ptr<IDisk> delegate_,
std::shared_ptr<DiskLocal> cache_disk_,
std::function<bool(const String &)> cache_file_predicate_);
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBufferFromFileBase>
readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const override;
std::unique_ptr<WriteBufferFromFileBase>
writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
ReservationPtr reserve(UInt64 bytes) override;
private:
std::shared_ptr<FileDownloadMetadata> acquireDownloadMetadata(const String & path) const;
static String getDirectoryPath(const String & path);
/// Disk to cache files.
std::shared_ptr<DiskLocal> cache_disk;
/// Cache only files satisfies predicate.
const std::function<bool(const String &)> cache_file_predicate;
/// Contains information about currently running file downloads to cache.
mutable std::unordered_map<String, std::weak_ptr<FileDownloadMetadata>> file_downloads;
/// Protects concurrent downloading files to cache.
mutable std::mutex mutex;
};
}

168
src/Disks/DiskDecorator.cpp Normal file
View File

@ -0,0 +1,168 @@
#include "DiskDecorator.h"
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
namespace DB
{
DiskDecorator::DiskDecorator(const DiskPtr & delegate_) : delegate(delegate_)
{
}
const String & DiskDecorator::getName() const
{
return delegate->getName();
}
ReservationPtr DiskDecorator::reserve(UInt64 bytes)
{
return delegate->reserve(bytes);
}
const String & DiskDecorator::getPath() const
{
return delegate->getPath();
}
UInt64 DiskDecorator::getTotalSpace() const
{
return delegate->getTotalSpace();
}
UInt64 DiskDecorator::getAvailableSpace() const
{
return delegate->getAvailableSpace();
}
UInt64 DiskDecorator::getUnreservedSpace() const
{
return delegate->getUnreservedSpace();
}
UInt64 DiskDecorator::getKeepingFreeSpace() const
{
return delegate->getKeepingFreeSpace();
}
bool DiskDecorator::exists(const String & path) const
{
return delegate->exists(path);
}
bool DiskDecorator::isFile(const String & path) const
{
return delegate->isFile(path);
}
bool DiskDecorator::isDirectory(const String & path) const
{
return delegate->isDirectory(path);
}
size_t DiskDecorator::getFileSize(const String & path) const
{
return delegate->getFileSize(path);
}
void DiskDecorator::createDirectory(const String & path)
{
delegate->createDirectory(path);
}
void DiskDecorator::createDirectories(const String & path)
{
delegate->createDirectories(path);
}
void DiskDecorator::clearDirectory(const String & path)
{
delegate->clearDirectory(path);
}
void DiskDecorator::moveDirectory(const String & from_path, const String & to_path)
{
delegate->moveDirectory(from_path, to_path);
}
DiskDirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path)
{
return delegate->iterateDirectory(path);
}
void DiskDecorator::createFile(const String & path)
{
delegate->createFile(path);
}
void DiskDecorator::moveFile(const String & from_path, const String & to_path)
{
delegate->moveFile(from_path, to_path);
}
void DiskDecorator::replaceFile(const String & from_path, const String & to_path)
{
delegate->replaceFile(from_path, to_path);
}
void DiskDecorator::copyFile(const String & from_path, const String & to_path)
{
delegate->copyFile(from_path, to_path);
}
void DiskDecorator::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
delegate->copy(from_path, to_disk, to_path);
}
void DiskDecorator::listFiles(const String & path, std::vector<String> & file_names)
{
delegate->listFiles(path, file_names);
}
std::unique_ptr<ReadBufferFromFileBase>
DiskDecorator::readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const
{
return delegate->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
}
std::unique_ptr<WriteBufferFromFileBase>
DiskDecorator::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold)
{
return delegate->writeFile(path, buf_size, mode, estimated_size, aio_threshold);
}
void DiskDecorator::remove(const String & path)
{
delegate->remove(path);
}
void DiskDecorator::removeRecursive(const String & path)
{
delegate->removeRecursive(path);
}
void DiskDecorator::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
delegate->setLastModified(path, timestamp);
}
Poco::Timestamp DiskDecorator::getLastModified(const String & path)
{
return delegate->getLastModified(path);
}
void DiskDecorator::setReadOnly(const String & path)
{
delegate->setReadOnly(path);
}
void DiskDecorator::createHardLink(const String & src_path, const String & dst_path)
{
delegate->createHardLink(src_path, dst_path);
}
void DiskDecorator::truncateFile(const String & path, size_t size)
{
delegate->truncateFile(path, size);
}
}

51
src/Disks/DiskDecorator.h Normal file
View File

@ -0,0 +1,51 @@
#pragma once
#include "Disks/IDisk.h"
namespace DB
{
class DiskDecorator : public IDisk
{
public:
explicit DiskDecorator(const DiskPtr & delegate_);
const String & getName() const override;
ReservationPtr reserve(UInt64 bytes) override;
~DiskDecorator() override = default;
const String & getPath() const override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase>
readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const override;
std::unique_ptr<WriteBufferFromFileBase>
writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
const String getType() const override { return delegate->getType(); }
protected:
DiskPtr delegate;
};
}

View File

@ -407,7 +407,31 @@ public:
disk->reserved_bytes += size; disk->reserved_bytes += size;
} }
~DiskS3Reservation() override; ~DiskS3Reservation() override
{
try
{
std::lock_guard lock(disk->reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
private: private:
DiskS3Ptr disk; DiskS3Ptr disk;
@ -688,30 +712,4 @@ void DiskS3::setReadOnly(const String & path)
Poco::File(metadata_path + path).setReadOnly(true); Poco::File(metadata_path + path).setReadOnly(true);
} }
DiskS3Reservation::~DiskS3Reservation()
{
try
{
std::lock_guard lock(disk->reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
} }

View File

@ -1,10 +1,10 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/S3Common.h> #include <IO/S3Common.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include "DiskS3.h" #include "DiskS3.h"
#include "Disks/DiskCacheWrapper.h"
#include "Disks/DiskCacheWrapper.cpp"
#include "Disks/DiskFactory.h" #include "Disks/DiskFactory.h"
#include "ProxyConfiguration.h" #include "ProxyConfiguration.h"
#include "ProxyListConfiguration.h" #include "ProxyListConfiguration.h"
@ -130,7 +130,7 @@ void registerDiskS3(DiskFactory & factory)
config.getString(config_prefix + ".access_key_id", ""), config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", "")); config.getString(config_prefix + ".secret_access_key", ""));
String metadata_path = context.getPath() + "disks/" + name + "/"; String metadata_path = config.getString(config_prefix + ".metadata_path", context.getPath() + "disks/" + name + "/");
auto s3disk = std::make_shared<DiskS3>( auto s3disk = std::make_shared<DiskS3>(
name, name,
@ -148,6 +148,26 @@ void registerDiskS3(DiskFactory & factory)
checkReadAccess(name, *s3disk); checkReadAccess(name, *s3disk);
checkRemoveAccess(*s3disk); checkRemoveAccess(*s3disk);
bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
if (cache_enabled)
{
String cache_path = config.getString(config_prefix + ".cache_path", context.getPath() + "disks/" + name + "/cache/");
if (metadata_path == cache_path)
throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
auto cache_disk = std::make_shared<DiskLocal>("s3-cache", cache_path, 0);
auto cache_file_predicate = [] (const String & path)
{
return path.ends_with("idx") // index files.
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files.
|| path.ends_with("txt") || path.ends_with("dat");
};
return std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
}
return s3disk; return s3disk;
}; };
factory.registerDiskType("s3", creator); factory.registerDiskType("s3", creator);

View File

@ -318,6 +318,7 @@ void LogBlockOutputStream::writeSuffix()
/// Finish write. /// Finish write.
marks_stream->next(); marks_stream->next();
marks_stream->finalize();
for (auto & name_stream : streams) for (auto & name_stream : streams)
name_stream.second.finalize(); name_stream.second.finalize();

View File

@ -206,8 +206,10 @@ public:
block_out.writeSuffix(); block_out.writeSuffix();
data_out->next(); data_out->next();
data_out_compressed->next(); data_out_compressed->next();
data_out_compressed->finalize();
index_out->next(); index_out->next();
index_out_compressed->next(); index_out_compressed->next();
index_out_compressed->finalize();
storage.file_checker.update(data_out_file); storage.file_checker.update(data_out_file);
storage.file_checker.update(index_out_file); storage.file_checker.update(index_out_file);

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,21 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<profiles>
<default/>
</profiles>
</yandex>

View File

@ -0,0 +1,27 @@
<?xml version="1.0"?>
<yandex>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
</query_log>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>0</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,102 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", config_dir="configs", with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def get_query_stat(instance, hint):
result = {}
instance.query("SYSTEM FLUSH LOGS")
events = instance.query('''
SELECT ProfileEvents.Names, ProfileEvents.Values
FROM system.query_log
ARRAY JOIN ProfileEvents
WHERE type != 1 AND query LIKE '%{}%'
'''.format(hint.replace("'", "\\'"))).split("\n")
for event in events:
ev = event.split("\t")
if len(ev) == 2:
if ev[0].startswith("S3"):
result[ev[0]] = int(ev[1])
return result
def test_write_is_cached(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3'
"""
)
node.query("SYSTEM FLUSH LOGS")
node.query("TRUNCATE TABLE system.query_log")
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM s3_test order by id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
stat = get_query_stat(node, select_query)
assert stat["S3ReadRequestsCount"] == 2 # Only .bin files should be accessed from S3.
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
def test_read_after_cache_is_wiped(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3'
"""
)
node.query("SYSTEM FLUSH LOGS")
node.query("TRUNCATE TABLE system.query_log")
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
# Wipe cache
cluster.exec_in_container(cluster.get_container_id("node"), ["rm", "-rf", "/var/lib/clickhouse/disks/s3/cache/"])
select_query = "SELECT * FROM s3_test"
node.query(select_query)
stat = get_query_stat(node, select_query)
assert stat["S3ReadRequestsCount"] == 4 # .mrk and .bin files should be accessed from S3.
# After cache is populated again, only .bin files should be accessed from S3.
select_query = "SELECT * FROM s3_test order by id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
stat = get_query_stat(node, select_query)
assert stat["S3ReadRequestsCount"] == 2
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")