Revert "Temporary files evict fs cache"

This commit is contained in:
Vladimir C 2022-12-02 14:50:56 +01:00 committed by GitHub
parent ce8ab95b65
commit 7d6950d397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 220 additions and 993 deletions

View File

@ -207,7 +207,7 @@ void LocalServer::tryInitPath()
global_context->setPath(path);
global_context->setTemporaryStoragePath(path + "tmp/", 0);
global_context->setTemporaryStorage(path + "tmp", "", 0);
global_context->setFlagsPath(path + "flags");
global_context->setUserFilesPath(""); // user's files are everywhere

View File

@ -203,6 +203,46 @@ int mainEntryClickHouseServer(int argc, char ** argv)
namespace
{
/// Prepares `path` for storing temporary data: creates the directory (with parents)
/// and deletes regular files whose name starts with "tmp".
/// Every other directory entry is left in place and only reported to the debug log
/// (individually for the first 99 of them, plus a total at the end).
/// Any exception is logged and swallowed, so the caller never sees a failure from here.
void setupTmpPath(Poco::Logger * log, const std::string & path)
try
{
    LOG_DEBUG(log, "Setting up {} to store temporary data in it", path);

    fs::create_directories(path);

    /// Clearing old temporary files.
    size_t unrecognized_entries = 0;
    for (const auto & entry : fs::directory_iterator(path))
    {
        if (entry.is_regular_file() && startsWith(entry.path().filename(), "tmp"))
        {
            LOG_DEBUG(log, "Removing old temporary file {}", entry.path().string());
            fs::remove(entry.path());
            continue;
        }

        unrecognized_entries++;

        /// Cap the per-entry noise; the summary below still reports the full count.
        if (unrecognized_entries >= 100)
            continue;

        LOG_DEBUG(log, "Found unknown {} {} in temporary path",
            entry.is_regular_file() ? "file" : (entry.is_directory() ? "directory" : "element"),
            entry.path().string());
    }

    if (unrecognized_entries != 0)
        LOG_DEBUG(log, "Found {} unknown files in temporary path", unrecognized_entries);
}
catch (...)
{
    /// Best effort only: failing to clean the directory must not propagate to the caller.
    DB::tryLogCurrentException(
        log,
        fmt::format(
            "Caught exception while setup temporary path: {}. It is ok to skip this exception as cleaning old temporary files is not "
            "necessary",
            path));
}
size_t waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait)
{
const size_t sleep_max_ms = 1000 * seconds_to_wait;
@ -997,21 +1037,13 @@ try
LOG_TRACE(log, "Initialized DateLUT with time zone '{}'.", DateLUT::instance().getTimeZone());
/// Storage with temporary data for processing of heavy queries.
if (auto temporary_policy = config().getString("tmp_policy", ""); !temporary_policy.empty())
{
size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0);
global_context->setTemporaryStoragePolicy(temporary_policy, max_size);
}
else if (auto temporary_cache = config().getString("tmp_cache", ""); !temporary_cache.empty())
{
size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0);
global_context->setTemporaryStorageInCache(temporary_cache, max_size);
}
else
{
std::string temporary_path = config().getString("tmp_path", path / "tmp/");
std::string temporary_policy = config().getString("tmp_policy", "");
size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0);
global_context->setTemporaryStoragePath(temporary_path, max_size);
const VolumePtr & volume = global_context->setTemporaryStorage(temporary_path, temporary_policy, max_size);
for (const DiskPtr & disk : volume->getDisks())
setupTmpPath(log, disk->getPath());
}
/** Directory with 'flags': files indicating temporary settings for the server set by system administrator.
@ -1410,7 +1442,7 @@ try
}
catch (...)
{
tryLogCurrentException(log, "Caught exception while setting up access control.");
tryLogCurrentException(log);
throw;
}

View File

@ -64,11 +64,11 @@ bool enoughSpaceInDirectory(const std::string & path, size_t data_size)
return data_size <= free_space;
}
std::unique_ptr<PocoTemporaryFile> createTemporaryFile(const std::string & folder_path)
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
{
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
fs::create_directories(folder_path);
return std::make_unique<PocoTemporaryFile>(folder_path);
fs::create_directories(path);
return std::make_unique<TemporaryFile>(path);
}
#if !defined(OS_LINUX)

View File

@ -14,10 +14,10 @@ namespace fs = std::filesystem;
namespace DB
{
using PocoTemporaryFile = Poco::TemporaryFile;
using TemporaryFile = Poco::TemporaryFile;
bool enoughSpaceInDirectory(const std::string & path, size_t data_size);
std::unique_ptr<PocoTemporaryFile> createTemporaryFile(const std::string & folder_path);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
// Determine what block device is responsible for specified path

View File

@ -118,7 +118,10 @@ void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size)
}
else
{
CreateFileSegmentSettings create_settings(is_persistent ? FileSegmentKind::Persistent : FileSegmentKind::Regular);
CreateFileSegmentSettings create_settings{
.is_persistent = is_persistent
};
file_segments_holder.emplace(cache->getOrSet(cache_key, offset, size, create_settings));
}
@ -948,7 +951,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
}
else
{
LOG_TRACE(log, "No space left in cache to reserve {} bytes, will continue without cache download", size);
LOG_TRACE(log, "No space left in cache, will continue without cache download");
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}

View File

@ -51,42 +51,16 @@ FileSegmentRangeWriter::FileSegmentRangeWriter(
{
}
/// Writes `size` bytes at `offset` by delegating to tryWrite() with `strict` set to true.
/// Returns true only when tryWrite() reports that exactly `size` bytes were written.
bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind)
{
    return tryWrite(data, size, offset, segment_kind, /* strict */ true) == size;
}
/// Repeatedly calls tryWriteImpl() until either all `size` bytes are handled
/// or a step makes no progress (returns 0).
/// `data` may be nullptr; in that case nullptr is passed through to every step.
/// Returns the total number of bytes reported as written by the steps.
size_t FileSegmentRangeWriter::tryWrite(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind, bool strict)
{
    size_t done = 0;
    while (done < size)
    {
        const size_t remaining = size - done;
        const char * chunk = data ? data + done : nullptr;

        const size_t step = tryWriteImpl(chunk, remaining, offset + done, segment_kind, strict);
        chassert(step <= remaining);

        /// Zero progress means nothing more can be written; stop and report what we have.
        if (step == 0)
            break;

        done += step;
    }
    return done;
}
size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind, bool strict)
bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent)
{
if (finalized)
return 0;
return false;
auto & file_segments = file_segments_holder.file_segments;
if (current_file_segment_it == file_segments.end())
{
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, segment_kind);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
else
{
@ -104,7 +78,7 @@ size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size
if (file_segment->range().size() == file_segment->getDownloadedSize())
{
completeFileSegment(*file_segment);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, segment_kind);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
}
@ -119,26 +93,20 @@ size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size
file_segment->completePartAndResetDownloader();
});
size_t reserved_size = file_segment->tryReserve(size, strict);
if (reserved_size == 0 || (strict && reserved_size != size))
bool reserved = file_segment->reserve(size);
if (!reserved)
{
if (strict)
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
appendFilesystemCacheLog(*file_segment);
}
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
appendFilesystemCacheLog(*file_segment);
LOG_DEBUG(
&Poco::Logger::get("FileSegmentRangeWriter"),
"Unsuccessful space reservation attempt (size: {}, file segment info: {}",
size, file_segment->getInfoForLog());
return 0;
return false;
}
/// Shrink to reserved size, because we can't write more than reserved
size = reserved_size;
try
{
file_segment->write(data, size, offset);
@ -152,17 +120,7 @@ size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size
file_segment->completePartAndResetDownloader();
current_file_segment_write_offset += size;
return size;
}
/// Forwards to write() with no payload (nullptr data) and the Temporary segment kind.
/// Returns whatever write() returns for `size` bytes at `offset`.
bool FileSegmentRangeWriter::reserve(size_t size, size_t offset)
{
    const char * no_payload = nullptr;
    return write(no_payload, size, offset, FileSegmentKind::Temporary);
}
size_t FileSegmentRangeWriter::tryReserve(size_t size, size_t offset)
{
return tryWrite(nullptr, size, offset, FileSegmentKind::Temporary);
return true;
}
void FileSegmentRangeWriter::finalize()
@ -171,7 +129,6 @@ void FileSegmentRangeWriter::finalize()
return;
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty() || current_file_segment_it == file_segments.end())
return;
@ -192,7 +149,7 @@ FileSegmentRangeWriter::~FileSegmentRangeWriter()
}
}
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSegmentKind segment_kind)
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
/**
* Allocate a new file segment starting `offset`.
@ -201,7 +158,10 @@ FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset
std::lock_guard cache_lock(cache->mutex);
CreateFileSegmentSettings create_settings(segment_kind);
CreateFileSegmentSettings create_settings
{
.is_persistent = is_persistent,
};
/// We set max_file_segment_size to be downloaded,
/// if we have less size to write, file segment will be resized in complete() method.
@ -236,15 +196,12 @@ void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_s
}
}
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment, std::optional<FileSegment::State> state)
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
{
/// File segment can be detached if space reservation failed.
if (file_segment.isDetached())
return;
if (state.has_value())
file_segment.setDownloadState(*state);
file_segment.completeWithoutState();
appendFilesystemCacheLog(file_segment);
}
@ -313,8 +270,7 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
try
{
auto segment_kind = is_persistent_cache_file ? FileSegmentKind::Persistent : FileSegmentKind::Regular;
if (!cache_writer->write(data, size, current_download_offset, segment_kind))
if (!cache_writer->write(data, size, current_download_offset, is_persistent_cache_file))
{
LOG_INFO(log, "Write-through cache is stopped as cache limit is reached and nothing can be evicted");
return;

View File

@ -4,7 +4,6 @@
#include <IO/WriteSettings.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Common/filesystemHelpers.h>
namespace Poco
{
@ -29,44 +28,22 @@ public:
FileCache * cache_, const FileSegment::Key & key_,
std::shared_ptr<FilesystemCacheLog> cache_log_, const String & query_id_, const String & source_path_);
/* Write a range of file segments.
* Allocate file segment of `max_file_segment_size` and write to it until it is full and then allocate next file segment.
* If it's impossible to allocate new file segment and reserve space to write all data, then returns false.
*
* Note: the data that was written to file segments before the error occurred is not rolled back.
*/
bool write(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind);
/* Tries to write data to current file segment.
* Size of written data may be less than requested_size, because it may not be enough space.
*
* Returns size of written data.
*/
size_t tryWrite(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind = FileSegmentKind::Regular, bool strict = false);
/// Same as `write/tryWrite`, but doesn't write anything, just reserves some space in cache
bool reserve(size_t size, size_t offset);
size_t tryReserve(size_t size, size_t offset);
/**
* Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to
* it until it is full and then allocate next file segment.
*/
bool write(const char * data, size_t size, size_t offset, bool is_persistent);
void finalize();
size_t currentOffset() const { return current_file_segment_write_offset; }
~FileSegmentRangeWriter();
private:
FileSegments::iterator allocateFileSegment(size_t offset, FileSegmentKind segment_kind);
FileSegments::iterator allocateFileSegment(size_t offset, bool is_persistent);
void appendFilesystemCacheLog(const FileSegment & file_segment);
void completeFileSegment(FileSegment & file_segment, std::optional<FileSegment::State> state = {});
/* Writes data to current file segment as much as possible and returns size of written data, do not allocate new file segments
* In `strict` mode it will write all data or nothing, otherwise it will write as much as possible
* If returned non zero value, then we can try to write again to next file segment.
* If no space is available, returns zero.
*/
size_t tryWriteImpl(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind, bool strict);
void completeFileSegment(FileSegment & file_segment);
FileCache * cache;
FileSegment::Key key;

View File

@ -1,73 +0,0 @@
#include <Disks/IO/FileCachePlaceholder.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
/// Makes sure at least `requested_capacity` bytes are available beyond what is already used.
/// Only the shortfall (requested minus currently unused capacity) is passed to reserveImpl();
/// `capacity` grows by that shortfall after reserveImpl() returns.
void ISpacePlaceholder::reserveCapacity(size_t requested_capacity)
{
    chassert(used_space <= capacity);
    const size_t free_capacity = capacity - used_space;

    LOG_TEST(&Poco::Logger::get("ISpacePlaceholder"), "Reserving {} bytes (used_space: {}, capacity: {})", requested_capacity, used_space, capacity);

    if (requested_capacity > free_capacity)
    {
        const size_t shortfall = requested_capacity - free_capacity;
        reserveImpl(shortfall);
        capacity += shortfall;
    }
}
void ISpacePlaceholder::setUsed(size_t size)
{
LOG_TEST(&Poco::Logger::get("ISpacePlaceholder"), "Using {} bytes ({} already used, {} capacity)", size, used_space, capacity);
if (used_space + size > capacity)
{
LOG_WARNING(&Poco::Logger::get("ISpacePlaceholder"), "Used space is greater than capacity. It may lead to not enough space error");
reserveCapacity(size);
}
used_space = used_space + size;
}
/// Stores the cache pointer and the base name; no space is reserved at construction time.
FileCachePlaceholder::FileCachePlaceholder(FileCache * cache, const String & name)
    : key_name(name), file_cache(cache)
{
}
void FileCachePlaceholder::reserveImpl(size_t requested_size)
{
/// We create new cache_writer and will try to reserve requested_size in it
String key = fmt::format("{}_{}", key_name, cache_writers.size());
auto cache_writer = std::make_unique<FileSegmentRangeWriter>(file_cache,
file_cache->hash(key),
/* cache_log_ */ nullptr,
/* query_id_ */ "",
/* source_path_ */ key);
size_t current_offset = cache_writer->currentOffset();
size_t reserved_size = cache_writer->tryReserve(requested_size, current_offset);
if (reserved_size != requested_size)
{
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE,
"Cannot reserve space in file cache "
"({} bytes required, got {} reserved "
"{} / {} bytes used, "
"{} / {} elements used)"
, requested_size, reserved_size
, file_cache->getUsedCacheSize(), file_cache->getTotalMaxSize()
, file_cache->getFileSegmentsNum(), file_cache->getTotalMaxElements());
}
/// Add to cache_writers only if we successfully reserved space, otherwise free reserved_size back
cache_writers.push_back(std::move(cache_writer));
}
}

View File

@ -1,61 +0,0 @@
#pragma once
#include <Interpreters/Cache/FileCache.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
#include <Poco/Logger.h>
#include <Poco/ConsoleChannel.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
/* ISpacePlaceholder is a base class for all classes that need to reserve space in some storage.
* You should reserve space by calling reserveCapacity() before writing to it.
* After writing you should call setUsed() to let ISpacePlaceholder know how much space was used.
* The two amounts can differ because in some cases you don't know the exact size of the data you will write (because of compression, for example).
* It's better to reserve more space in advance so as not to overuse space.
*/
class ISpacePlaceholder
{
public:
/// Reserve space in storage.
/// Only the part of `requested_capacity` not covered by already reserved but unused space
/// is forwarded to reserveImpl().
void reserveCapacity(size_t requested_capacity);
/// Indicate that some space is used
/// It uses reserved space if it is possible, otherwise it reserves more space
void setUsed(size_t size);
virtual ~ISpacePlaceholder() = default;
private:
/// Storage-specific reservation of `size` additional bytes; implemented by subclasses.
virtual void reserveImpl(size_t size) = 0;
/// Total number of bytes reserved so far.
size_t capacity = 0;
/// Number of bytes reported as used via setUsed().
size_t used_space = 0;
};
/* FileCachePlaceholder is a class that reserves space in FileCache.
* Data is written externally, and FileCachePlaceholder is only used to hold space in FileCache.
*/
class FileCachePlaceholder : public ISpacePlaceholder
{
public:
/// `cache` is stored as a raw, non-owning pointer; `name` is the prefix used to build cache keys.
FileCachePlaceholder(FileCache * cache, const String & name);
/// Reserves `requested_size` bytes in the file cache; throws NOT_ENOUGH_SPACE if the full amount cannot be reserved.
void reserveImpl(size_t requested_size) override;
private:
/// Prefix of the cache keys; the index of the writer is appended to it for each reservation.
std::string key_name;
FileCache * file_cache;
/// On each reserveImpl() call we create a new FileSegmentRangeWriter that holds the space.
/// This makes it easy to release already reserved space after an unsuccessful attempt.
std::vector<std::unique_ptr<FileSegmentRangeWriter>> cache_writers;
};
}

View File

@ -113,8 +113,6 @@ public:
WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & path) const override;
FileCachePtr getCache() const { return cache; }
private:
FileCache::Key getCacheKey(const std::string & path) const;

View File

@ -519,14 +519,6 @@ void DiskObjectStorage::wrapWithCache(FileCachePtr cache, const FileCacheSetting
object_storage = std::make_shared<CachedObjectStorage>(object_storage, cache, cache_settings, layer_name);
}
/// Returns the cache of the underlying object storage when it is a CachedObjectStorage,
/// and nullptr otherwise.
FileCachePtr DiskObjectStorage::getCache() const
{
    if (const auto * cached_layer = typeid_cast<CachedObjectStorage *>(object_storage.get()))
        return cached_layer->getCache();

    return nullptr;
}
NameSet DiskObjectStorage::getCacheLayersNames() const
{
NameSet cache_layers;

View File

@ -186,7 +186,6 @@ public:
/// There can be any number of cache layers:
/// DiskObjectStorage(CachedObjectStorage(...CacheObjectStorage(S3ObjectStorage)...))
void wrapWithCache(FileCachePtr cache, const FileCacheSettings & cache_settings, const String & layer_name);
FileCachePtr getCache() const;
/// Get structure of object storage this disk works with. Examples:
/// DiskObjectStorage(S3ObjectStorage)

View File

@ -1,20 +0,0 @@
#include <Disks/TemporaryFileInPath.h>
#include <Common/filesystemHelpers.h>
namespace DB
{

/// Obtains the backing temporary file for `folder_path` from createTemporaryFile().
TemporaryFileInPath::TemporaryFileInPath(const String & folder_path)
    : tmp_file(createTemporaryFile(folder_path))
{
    /// The factory is expected to hand back a valid file object.
    chassert(tmp_file);
}

/// Nothing custom to do: destroying `tmp_file` releases the underlying temporary file object.
TemporaryFileInPath::~TemporaryFileInPath() = default;

/// Path of the underlying temporary file.
String TemporaryFileInPath::getPath() const
{
    return tmp_file->path();
}

}

View File

@ -1,21 +0,0 @@
#pragma once
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/TemporaryFile.h>
namespace DB
{
/// Wrapper around Poco::TemporaryFile to implement ITemporaryFile.
class TemporaryFileInPath : public ITemporaryFile
{
public:
/// Creates a temporary file for the given folder path.
explicit TemporaryFileInPath(const String & folder_path);
/// Returns the path of the wrapped temporary file.
String getPath() const override;
~TemporaryFileInPath() override;
private:
/// Owned Poco temporary file object.
std::unique_ptr<Poco::TemporaryFile> tmp_file;
};
}

View File

@ -2,7 +2,6 @@
#include <Poco/TemporaryFile.h>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <Disks/TemporaryFileInPath.h>
#include <filesystem>
@ -16,6 +15,7 @@ namespace CurrentMetrics
extern const Metric TotalTemporaryFiles;
}
namespace DB
{

View File

@ -9,30 +9,21 @@ namespace DB
{
using DiskPtr = std::shared_ptr<IDisk>;
/// Minimal interface of a temporary file: the only thing exposed is its path.
class ITemporaryFile
{
public:
/// Path of the temporary file.
virtual String getPath() const = 0;
/// Virtual so that implementations can be destroyed through a pointer to this interface.
virtual ~ITemporaryFile() = default;
};
using TemporaryFileHolder = std::unique_ptr<ITemporaryFile>;
/// This class helps with the handling of temporary files or directories.
/// A unique name for the temporary file or directory is automatically chosen based on a specified prefix.
/// Create a directory in the constructor.
/// The destructor always removes the temporary file or directory with all contained files.
class TemporaryFileOnDisk : public ITemporaryFile
class TemporaryFileOnDisk
{
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix);
~TemporaryFileOnDisk() override;
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
String getPath() const override;
String getPath() const;
private:
DiskPtr disk;

View File

@ -64,10 +64,8 @@ static void writeData(const ISerialization & serialization, const ColumnPtr & co
}
size_t NativeWriter::write(const Block & block)
void NativeWriter::write(const Block & block)
{
size_t written_before = ostr.count();
/// Additional information about the block.
if (client_revision > 0)
block.info.write(ostr);
@ -163,10 +161,6 @@ size_t NativeWriter::write(const Block & block)
if (index)
index->blocks.emplace_back(std::move(index_block));
size_t written_after = ostr.count();
size_t written_size = written_after - written_before;
return written_size;
}
}

View File

@ -27,9 +27,7 @@ public:
IndexForNativeFormat * index_ = nullptr, size_t initial_size_of_file_ = 0);
Block getHeader() const { return header; }
/// Returns the number of bytes written.
size_t write(const Block & block);
void write(const Block & block);
void flush();
static String getContentType() { return "application/octet-stream"; }

View File

@ -13,7 +13,7 @@ namespace ErrorCodes
}
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file_)
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<TemporaryFile> && tmp_file_)
: WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, 0600), tmp_file(std::move(tmp_file_))
{}
@ -40,11 +40,11 @@ public:
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name, std::move(origin->tmp_file));
}
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<PocoTemporaryFile> && tmp_file_)
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<TemporaryFile> && tmp_file_)
: ReadBufferFromFile(fd_, file_name_), tmp_file(std::move(tmp_file_))
{}
std::unique_ptr<PocoTemporaryFile> tmp_file;
std::unique_ptr<TemporaryFile> tmp_file;
};

View File

@ -20,11 +20,11 @@ public:
~WriteBufferFromTemporaryFile() override;
private:
explicit WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file);
explicit WriteBufferFromTemporaryFile(std::unique_ptr<TemporaryFile> && tmp_file);
std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
std::unique_ptr<PocoTemporaryFile> tmp_file;
std::unique_ptr<TemporaryFile> tmp_file;
friend class ReadBufferFromTemporaryWriteBuffer;
};

View File

@ -2,7 +2,6 @@
#include <Common/randomSeed.h>
#include <Common/SipHash.h>
#include <Common/logger_useful.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <IO/ReadHelpers.h>
@ -13,7 +12,6 @@
#include <pcg-random/pcg_random.hpp>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
@ -48,27 +46,13 @@ FileCache::Key FileCache::hash(const String & path)
return Key(sipHash128(path.data(), path.size()));
}
String FileCache::getPathInLocalCache(const Key & key, size_t offset, FileSegmentKind segment_kind) const
String FileCache::getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const
{
String file_suffix;
switch (segment_kind)
{
case FileSegmentKind::Persistent:
file_suffix = "_persistent";
break;
case FileSegmentKind::Temporary:
file_suffix = "_temporary";
break;
case FileSegmentKind::Regular:
file_suffix = "";
break;
}
auto key_str = key.toString();
return fs::path(cache_base_path)
/ key_str.substr(0, 3)
/ key_str
/ (std::to_string(offset) + file_suffix);
/ (std::to_string(offset) + (is_persistent ? "_persistent" : ""));
}
String FileCache::getPathInLocalCache(const Key & key) const
@ -556,6 +540,9 @@ FileSegmentPtr FileCache::createFileSegmentForDownload(
assertCacheCorrectness(key, cache_lock);
#endif
if (size > max_file_segment_size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Requested size exceeds max file segment size");
auto * cell = getCell(key, offset, cache_lock);
if (cell)
throw Exception(
@ -1012,17 +999,9 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
fs::directory_iterator key_it{key_prefix_it->path()};
for (; key_it != fs::directory_iterator(); ++key_it)
{
if (key_it->is_regular_file())
if (!key_it->is_directory())
{
if (key_prefix_it->path().filename() == "tmp" && startsWith(key_it->path().filename(), "tmp"))
{
LOG_DEBUG(log, "Found temporary file '{}', will remove it", key_it->path().string());
fs::remove(key_it->path());
}
else
{
LOG_DEBUG(log, "Unexpected file: {}. Expected a directory", key_it->path().string());
}
LOG_DEBUG(log, "Unexpected file: {}. Expected a directory", key_it->path().string());
continue;
}
@ -1030,26 +1009,17 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
fs::directory_iterator offset_it{key_it->path()};
for (; offset_it != fs::directory_iterator(); ++offset_it)
{
if (offset_it->is_directory())
{
LOG_DEBUG(log, "Unexpected directory: {}. Expected a file", offset_it->path().string());
continue;
}
auto offset_with_suffix = offset_it->path().filename().string();
auto delim_pos = offset_with_suffix.find('_');
bool parsed;
FileSegmentKind segment_kind = FileSegmentKind::Regular;
bool is_persistent = false;
if (delim_pos == std::string::npos)
parsed = tryParse<UInt64>(offset, offset_with_suffix);
else
{
parsed = tryParse<UInt64>(offset, offset_with_suffix.substr(0, delim_pos));
if (offset_with_suffix.substr(delim_pos+1) == "persistent")
segment_kind = FileSegmentKind::Persistent;
if (offset_with_suffix.substr(delim_pos+1) == "temporary")
segment_kind = FileSegmentKind::Temporary;
is_persistent = offset_with_suffix.substr(delim_pos+1) == "persistent";
}
if (!parsed)
@ -1069,7 +1039,7 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
{
auto * cell = addCell(
key, offset, size, FileSegment::State::DOWNLOADED,
CreateFileSegmentSettings(segment_kind), cache_lock);
CreateFileSegmentSettings{ .is_persistent = is_persistent }, cache_lock);
if (cell)
queue_entries.emplace_back(cell->queue_iterator, cell->file_segment);
@ -1181,7 +1151,7 @@ std::vector<String> FileCache::tryGetCachePaths(const Key & key)
for (const auto & [offset, cell] : cells_by_offset)
{
if (cell.file_segment->state() == FileSegment::State::DOWNLOADED)
cache_paths.push_back(getPathInLocalCache(key, offset, cell.file_segment->getKind()));
cache_paths.push_back(getPathInLocalCache(key, offset, cell.file_segment->isPersistent()));
}
return cache_paths;
@ -1203,16 +1173,6 @@ size_t FileCache::getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & ca
return max_size - getUsedCacheSizeUnlocked(cache_lock);
}
/// Returns the configured `max_size` of the cache. Reads the member directly, without taking the cache mutex.
size_t FileCache::getTotalMaxSize() const
{
return max_size;
}
/// Returns `max_element_size`. Reads the member directly, without taking the cache mutex.
/// NOTE(review): presumably this is the limit on the number of cache elements — confirm against the member's definition.
size_t FileCache::getTotalMaxElements() const
{
return max_element_size;
}
size_t FileCache::getFileSegmentsNum() const
{
std::lock_guard cache_lock(mutex);

View File

@ -80,7 +80,7 @@ public:
static Key hash(const String & path);
String getPathInLocalCache(const Key & key, size_t offset, FileSegmentKind segment_kind) const;
String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const;
String getPathInLocalCache(const Key & key) const;
@ -89,10 +89,8 @@ public:
size_t capacity() const { return max_size; }
size_t getUsedCacheSize() const;
size_t getTotalMaxSize() const;
size_t getFileSegmentsNum() const;
size_t getTotalMaxElements() const;
static bool isReadOnly();
@ -223,8 +221,6 @@ private:
FileSegmentCell * getCell(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
/// Returns non-owning pointer to the cell stored in the `files` map.
/// Doesn't reserve any space.
FileSegmentCell * addCell(
const Key & key,
size_t offset,

View File

@ -31,21 +31,14 @@ const FileCacheSettings & FileCacheFactory::getSettings(const std::string & cach
}
FileCachePtr FileCacheFactory::tryGet(const std::string & cache_base_path)
FileCachePtr FileCacheFactory::get(const std::string & cache_base_path)
{
std::lock_guard lock(mutex);
auto it = caches_by_path.find(cache_base_path);
if (it == caches_by_path.end())
return nullptr;
return it->second->cache;
}
FileCachePtr FileCacheFactory::get(const std::string & cache_base_path)
{
auto file_cache_ptr = tryGet(cache_base_path);
if (!file_cache_ptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No cache found by path: {}", cache_base_path);
return file_cache_ptr;
return it->second->cache;
}
FileCachePtr FileCacheFactory::getOrCreate(

View File

@ -33,7 +33,6 @@ public:
FileCachePtr getOrCreate(const std::string & cache_base_path, const FileCacheSettings & file_cache_settings, const std::string & name);
FileCachePtr tryGet(const std::string & cache_base_path);
FileCachePtr get(const std::string & cache_base_path);
CacheByBasePath getAll();

View File

@ -23,19 +23,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// Returns a human-readable name for a file segment kind (used when building log output).
/// A value that matches none of the enumerators yields "Unknown" instead of letting control
/// run off the end of a value-returning function, which is undefined behaviour.
String toString(FileSegmentKind type)
{
    switch (type)
    {
        case FileSegmentKind::Regular:
            return "Regular";
        case FileSegmentKind::Persistent:
            return "Persistent";
        case FileSegmentKind::Temporary:
            return "Temporary";
    }

    /// Deliberately no `default:` label above, so a compiler's missing-enumerator warning
    /// can still fire when a new kind is added without updating this function.
    return "Unknown";
}
FileSegment::FileSegment(
size_t offset_,
size_t size_,
@ -52,7 +39,7 @@ FileSegment::FileSegment(
#else
, log(&Poco::Logger::get("FileSegment"))
#endif
, segment_kind(settings.type)
, is_persistent(settings.is_persistent)
{
/// On creation, file segment state can be EMPTY, DOWNLOADED, DOWNLOADING.
switch (download_state)
@ -86,8 +73,7 @@ FileSegment::FileSegment(
String FileSegment::getPathInLocalCache() const
{
chassert(cache);
return cache->getPathInLocalCache(key(), offset(), segment_kind);
return cache->getPathInLocalCache(key(), offset(), isPersistent());
}
FileSegment::State FileSegment::state() const
@ -323,7 +309,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
if (current_downloaded_size == range().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");
if (!cache_writer && from != nullptr)
if (!cache_writer)
{
if (current_downloaded_size > 0)
throw Exception(
@ -338,14 +324,11 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
try
{
/// if `from` is nullptr, then we just allocate and hold space by current segment and it was (or would) be written outside
if (cache_writer && from != nullptr)
cache_writer->write(from, size);
cache_writer->write(from, size);
std::unique_lock download_lock(download_mutex);
if (cache_writer && from != nullptr)
cache_writer->next();
cache_writer->next();
downloaded_size += size;
}
@ -396,13 +379,6 @@ FileSegment::State FileSegment::wait()
}
bool FileSegment::reserve(size_t size_to_reserve)
{
size_t reserved = tryReserve(size_to_reserve, true);
assert(reserved == 0 || reserved == size_to_reserve);
return reserved == size_to_reserve;
}
size_t FileSegment::tryReserve(size_t size_to_reserve, bool strict)
{
if (!size_to_reserve)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
@ -418,16 +394,10 @@ size_t FileSegment::tryReserve(size_t size_to_reserve, bool strict)
expected_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
if (expected_downloaded_size + size_to_reserve > range().size())
{
if (strict)
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
size_to_reserve, range().toString(), downloaded_size);
}
size_to_reserve = range().size() - expected_downloaded_size;
}
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
size_to_reserve, range().toString(), downloaded_size);
chassert(reserved_size >= expected_downloaded_size);
}
@ -445,16 +415,17 @@ size_t FileSegment::tryReserve(size_t size_to_reserve, bool strict)
{
std::lock_guard cache_lock(cache->mutex);
size_t need_to_reserve = size_to_reserve - already_reserved_size;
reserved = cache->tryReserve(key(), offset(), need_to_reserve, cache_lock);
size_to_reserve = size_to_reserve - already_reserved_size;
reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
if (!reserved)
return 0;
std::lock_guard segment_lock(mutex);
reserved_size += need_to_reserve;
if (reserved)
{
std::lock_guard segment_lock(mutex);
reserved_size += size_to_reserve;
}
}
return size_to_reserve;
return reserved;
}
void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock<std::mutex> & segment_lock)
@ -574,15 +545,6 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
resetDownloaderUnlocked(segment_lock);
}
if (segment_kind == FileSegmentKind::Temporary && is_last_holder)
{
LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock));
detach(cache_lock, segment_lock);
setDownloadState(State::SKIP_CACHE);
cache->remove(key(), offset(), cache_lock, segment_lock);
return;
}
switch (download_state)
{
case State::SKIP_CACHE:
@ -664,7 +626,7 @@ String FileSegment::getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment
info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(segment_lock) << ", ";
info << "caller id: " << getCallerId() << ", ";
info << "detached: " << is_detached << ", ";
info << "kind: " << toString(segment_kind);
info << "persistent: " << is_persistent;
return info.str();
}
@ -759,7 +721,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std
snapshot->ref_count = file_segment.use_count();
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock);
snapshot->download_state = file_segment->download_state;
snapshot->segment_kind = file_segment->getKind();
snapshot->is_persistent = file_segment->isPersistent();
return snapshot;
}
@ -821,8 +783,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
if (!cache)
cache = file_segment->cache;
assert(cache == file_segment->cache); /// all segments should belong to the same cache
try
{
bool is_detached = false;

View File

@ -30,38 +30,9 @@ using FileSegmentPtr = std::shared_ptr<FileSegment>;
using FileSegments = std::list<FileSegmentPtr>;
/*
 * FileSegmentKind is used to specify the eviction policy for file segments.
 */
enum class FileSegmentKind
{
    /* `Regular` file segment is still in cache after usage, and can be evicted
     * (unless there are some holders).
     */
    Regular,

    /* `Persistent` file segment can't be evicted from cache,
     * it should be removed manually.
     */
    Persistent,

    /* `Temporary` file segment is removed right after releasing.
     * Also corresponding files are removed during cache loading (if any).
     */
    Temporary,
};
String toString(FileSegmentKind type);
struct CreateFileSegmentSettings
{
FileSegmentKind type = FileSegmentKind::Regular;
CreateFileSegmentSettings() = default;
explicit CreateFileSegmentSettings(FileSegmentKind type_)
: type(type_)
{}
bool is_persistent = false;
};
class FileSegment : private boost::noncopyable, public std::enable_shared_from_this<FileSegment>
@ -156,8 +127,7 @@ public:
size_t offset() const { return range().left; }
FileSegmentKind getKind() const { return segment_kind; }
bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; }
bool isPersistent() const { return is_persistent; }
using UniqueId = std::pair<FileCacheKey, size_t>;
UniqueId getUniqueId() const { return std::pair(key(), offset()); }
@ -213,19 +183,19 @@ public:
void assertCorrectness() const;
/**
* ========== Methods for _only_ file segment's `writer` ======================
*/
void synchronousWrite(const char * from, size_t size, size_t offset);
/**
* ========== Methods for _only_ file segment's `downloader` ==================
*/
/// Try to reserve exactly `size` bytes.
/// Returns true if reservation was successful, false otherwise.
bool reserve(size_t size_to_reserve);
/// Try to reserve at max `size` bytes.
/// Returns actual size reserved.
/// In strict mode throws an error on attempt to reserve space too much space
size_t tryReserve(size_t size_to_reserve, bool strict = false);
/// Write data into reserved space.
void write(const char * from, size_t size, size_t offset);
@ -277,9 +247,9 @@ private:
void assertIsDownloaderUnlocked(const std::string & operation, std::unique_lock<std::mutex> & segment_lock) const;
void assertCorrectnessUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
/// completeWithoutStateUnlocked() is called from destructor of FileSegmentsHolder.
/// Function might check if the caller of the method
/// is the last alive holder of the segment. Therefore, completion and destruction
/// complete() without any completion state is called from destructor of
/// FileSegmentsHolder. complete() might check if the caller of the method
/// is the last alive holder of the segment. Therefore, complete() and destruction
/// of the file segment pointer must be done under the same cache mutex.
void completeWithoutStateUnlocked(std::lock_guard<std::mutex> & cache_lock);
void completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock, std::unique_lock<std::mutex> & segment_lock);
@ -325,12 +295,12 @@ private:
/// In general case, all file segments are owned by cache.
bool is_detached = false;
bool is_downloaded = false;
bool is_downloaded{false};
std::atomic<size_t> hits_count = 0; /// cache hits.
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
FileSegmentKind segment_kind;
bool is_persistent;
CurrentMetrics::Increment metric_increment{CurrentMetrics::CacheFileSegments};
};
@ -343,8 +313,6 @@ struct FileSegmentsHolder : private boost::noncopyable
FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {}
void reset() { file_segments.clear(); }
~FileSegmentsHolder();
String toString();

View File

@ -32,7 +32,6 @@
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
#include <Disks/DiskDecorator.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
@ -103,7 +102,6 @@
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <filesystem>
#include <re2/re2.h>
@ -748,65 +746,28 @@ void Context::setPath(const String & path)
shared->user_scripts_path = shared->path + "user_scripts/";
}
/// Prepare `path` for storing temporary data: create the directory if needed and
/// delete regular files in it whose name starts with "tmp".
/// Every other entry is only reported in the log.
/// Any exception is logged and swallowed, because the cleanup is optional.
static void setupTmpPath(Poco::Logger * log, const std::string & path)
try
{
    LOG_DEBUG(log, "Setting up {} to store temporary data in it", path);

    fs::create_directories(path);

    /// Clearing old temporary files.
    for (const auto & entry : fs::directory_iterator(path))
    {
        const bool is_old_tmp_file = entry.is_regular_file() && startsWith(entry.path().filename(), "tmp");
        if (!is_old_tmp_file)
        {
            LOG_DEBUG(log, "Found unknown file in temporary path {}", entry.path().string());
            continue;
        }

        LOG_DEBUG(log, "Removing old temporary file {}", entry.path().string());
        fs::remove(entry.path());
    }
}
catch (...)
{
    DB::tryLogCurrentException(log, fmt::format(
        "Caught exception while setup temporary path: {}. "
        "It is ok to skip this exception as cleaning old temporary files is not necessary", path));
}
/// Build a single-disk volume named "_tmp_default" on top of a DiskLocal
/// (also named "_tmp_default") rooted at `path`.
static VolumePtr createLocalSingleDiskVolume(const std::string & path)
{
    auto local_disk = std::make_shared<DiskLocal>("_tmp_default", path, 0);
    return std::make_shared<SingleDiskVolume>("_tmp_default", local_disk, 0);
}
void Context::setTemporaryStoragePath(const String & path, size_t max_size)
{
shared->tmp_path = path;
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
for (const auto & disk : volume->getDisks())
{
setupTmpPath(shared->log, disk->getPath());
}
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, nullptr, max_size);
}
void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size)
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name, size_t max_size)
{
std::lock_guard lock(shared->storage_policies_mutex);
VolumePtr volume;
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name);
VolumePtr volume = tmp_policy->getVolume(0);
if (policy_name.empty())
{
shared->tmp_path = path;
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
}
else
{
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name);
volume = tmp_policy->getVolume(0);
}
if (volume->getDisks().empty())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
@ -828,33 +789,10 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
"Disk '{}' ({}) is not local and can't be used for temporary files",
disk_ptr->getName(), typeid(*disk_raw_ptr).name());
}
setupTmpPath(shared->log, disk->getPath());
}
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, nullptr, max_size);
}
void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size)
{
auto disk_ptr = getDisk(cache_disk_name);
if (!disk_ptr)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' is not found", cache_disk_name);
const auto * disk_object_storage_ptr = dynamic_cast<const DiskObjectStorage *>(disk_ptr.get());
if (!disk_object_storage_ptr)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' does not use cache", cache_disk_name);
auto file_cache = disk_object_storage_ptr->getCache();
if (!file_cache)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", file_cache->getBasePath());
LOG_DEBUG(shared->log, "Using file cache ({}) for temporary files", file_cache->getBasePath());
shared->tmp_path = file_cache->getBasePath();
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
return volume;
}
void Context::setFlagsPath(const String & path)

View File

@ -463,9 +463,7 @@ public:
void addWarningMessage(const String & msg) const;
void setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size);
void setTemporaryStoragePolicy(const String & policy_name, size_t max_size);
void setTemporaryStoragePath(const String & path, size_t max_size);
VolumePtr setTemporaryStorage(const String & path, const String & policy_name, size_t max_size);
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;

View File

@ -7,7 +7,6 @@
#include <Formats/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileInPath.h>
#include <Common/logger_useful.h>
@ -36,32 +35,35 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssiz
size_t new_consumprion = stat.compressed_size + compressed_delta;
if (compressed_delta > 0 && limit && new_consumprion > limit)
throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES,
"Limit for temporary files size exceeded (would consume {} / {} bytes)", new_consumprion, limit);
throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES, "Limit for temporary files size exceeded");
stat.compressed_size += compressed_delta;
stat.uncompressed_size += uncompressed_delta;
}
/// Return the volume of this scope.
/// Throws LOGICAL_ERROR if the scope was created without a volume.
VolumePtr TemporaryDataOnDiskScope::getVolume() const
{
    if (volume)
        return volume;

    throw Exception("TemporaryDataOnDiskScope has no volume", ErrorCodes::LOGICAL_ERROR);
}
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
{
TemporaryFileStreamPtr tmp_stream;
if (cache)
tmp_stream = TemporaryFileStream::create(cache, header, max_file_size, this);
DiskPtr disk;
if (max_file_size > 0)
{
auto reservation = volume->reserve(max_file_size);
if (!reservation)
throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE);
disk = reservation->getDisk();
}
else
tmp_stream = TemporaryFileStream::create(volume, header, max_file_size, this);
{
disk = volume->getDisk();
}
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
std::lock_guard lock(mutex);
return *streams.emplace_back(std::move(tmp_stream));
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
return *tmp_stream;
}
std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
{
std::vector<TemporaryFileStream *> res;
@ -87,13 +89,12 @@ struct TemporaryFileStream::OutputWriter
{
}
size_t write(const Block & block)
void write(const Block & block)
{
if (finalized)
throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);
size_t written_bytes = out_writer.write(block);
out_writer.write(block);
num_rows += block.rows();
return written_bytes;
}
void finalize()
@ -154,68 +155,21 @@ struct TemporaryFileStream::InputReader
NativeReader in_reader;
};
/// Create a stream whose temporary file lives on a disk of `volume`.
/// With a non-zero `max_file_size` the disk is the one on which that much space could
/// be reserved; otherwise it is the disk returned by volume->getDisk().
/// Throws LOGICAL_ERROR if `volume` is null and NOT_ENOUGH_SPACE if the reservation fails.
TemporaryFileStreamPtr TemporaryFileStream::create(const VolumePtr & volume, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_)
{
    if (!volume)
        throw Exception("TemporaryDataOnDiskScope has no volume", ErrorCodes::LOGICAL_ERROR);

    DiskPtr target_disk;
    if (max_file_size == 0)
    {
        target_disk = volume->getDisk();
    }
    else
    {
        auto space = volume->reserve(max_file_size);
        if (!space)
            throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE);
        target_disk = space->getDisk();
    }

    auto file_on_disk = std::make_unique<TemporaryFileOnDisk>(target_disk, parent_->getMetricScope());
    return std::make_unique<TemporaryFileStream>(std::move(file_on_disk), header, /* cache_placeholder */ nullptr, /* parent */ parent_);
}
/// Create a stream whose temporary file is placed under "<cache base path>/tmp".
/// A FileCachePlaceholder for that file reserves `max_file_size` in `cache`
/// before the stream is constructed.
TemporaryFileStreamPtr TemporaryFileStream::create(FileCache * cache, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_)
{
    auto file_in_cache_dir = std::make_unique<TemporaryFileInPath>(fs::path(cache->getBasePath()) / "tmp");

    /// The placeholder is keyed by the path of the file created above, so the file must exist first.
    auto placeholder = std::make_unique<FileCachePlaceholder>(cache, file_in_cache_dir->getPath());
    placeholder->reserveCapacity(max_file_size);

    return std::make_unique<TemporaryFileStream>(std::move(file_in_cache_dir), header, std::move(placeholder), parent_);
}
TemporaryFileStream::TemporaryFileStream(
TemporaryFileHolder file_,
const Block & header_,
std::unique_ptr<ISpacePlaceholder> space_holder_,
TemporaryDataOnDisk * parent_)
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
, header(header_)
, file(std::move(file_))
, space_holder(std::move(space_holder_))
, out_writer(std::make_unique<OutputWriter>(file->getPath(), header))
{
}
size_t TemporaryFileStream::write(const Block & block)
void TemporaryFileStream::write(const Block & block)
{
if (!out_writer)
throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR);
size_t block_size_in_memory = block.bytes();
if (space_holder)
space_holder->reserveCapacity(block_size_in_memory);
updateAllocAndCheck();
size_t bytes_written = out_writer->write(block);
if (space_holder)
space_holder->setUsed(bytes_written);
return bytes_written;
out_writer->write(block);
}
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()

View File

@ -6,7 +6,6 @@
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IVolume.h>
#include <Common/CurrentMetrics.h>
#include <Disks/IO/FileCachePlaceholder.h>
namespace CurrentMetrics
@ -41,25 +40,23 @@ public:
std::atomic<size_t> uncompressed_size;
};
explicit TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * cache_, size_t limit_)
: volume(std::move(volume_)), cache(cache_), limit(limit_)
explicit TemporaryDataOnDiskScope(VolumePtr volume_, size_t limit_)
: volume(std::move(volume_)), limit(limit_)
{}
explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_)
: parent(std::move(parent_)), volume(parent->volume), cache(parent->cache), limit(limit_)
: parent(std::move(parent_)), volume(parent->volume), limit(limit_)
{}
/// TODO: remove
/// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
VolumePtr getVolume() const;
VolumePtr getVolume() const { return volume; }
protected:
void deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta);
TemporaryDataOnDiskScopePtr parent = nullptr;
VolumePtr volume;
FileCache * cache = nullptr;
StatAtomic stat;
size_t limit = 0;
@ -94,7 +91,6 @@ public:
bool empty() const;
const StatAtomic & getStat() const { return stat; }
CurrentMetrics::Value getMetricScope() const { return current_metric_scope; }
private:
mutable std::mutex mutex;
@ -120,14 +116,9 @@ public:
size_t num_rows = 0;
};
static TemporaryFileStreamPtr create(const VolumePtr & volume, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_);
static TemporaryFileStreamPtr create(FileCache * cache, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_);
TemporaryFileStream(TemporaryFileHolder file_, const Block & header_, std::unique_ptr<ISpacePlaceholder> space_holder, TemporaryDataOnDisk * parent_);
/// Returns number of written bytes
size_t write(const Block & block);
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
void write(const Block & block);
Stat finishWriting();
bool isWriteFinished() const;
@ -151,8 +142,7 @@ private:
Block header;
TemporaryFileHolder file;
std::unique_ptr<ISpacePlaceholder> space_holder;
TemporaryFileOnDiskHolder file;
Stat stat;

View File

@ -6,7 +6,6 @@
#include <Common/CurrentThread.h>
#include <Common/filesystemHelpers.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/tests/gtest_global_context.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
@ -15,14 +14,11 @@
#include <IO/WriteHelpers.h>
#include <filesystem>
#include <thread>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
namespace fs = std::filesystem;
using namespace DB;
static constexpr auto TEST_LOG_LEVEL = "debug";
fs::path caches_dir = fs::current_path() / "lru_cache_test";
String cache_base_path = caches_dir / "cache1" / "";
void assertRange(
[[maybe_unused]] size_t assert_n, DB::FileSegmentPtr file_segment,
@ -57,7 +53,7 @@ String getFileSegmentPath(const String & base_path, const DB::FileCache::Key & k
return fs::path(base_path) / key_str.substr(0, 3) / key_str / DB::toString(offset);
}
void download(const std::string & cache_base_path, DB::FileSegmentPtr file_segment)
void download(DB::FileSegmentPtr file_segment)
{
const auto & key = file_segment->key();
size_t size = file_segment->range().size();
@ -71,57 +67,30 @@ void download(const std::string & cache_base_path, DB::FileSegmentPtr file_segme
file_segment->write(data.data(), size, file_segment->getCurrentWriteOffset());
}
void prepareAndDownload(const std::string & cache_base_path, DB::FileSegmentPtr file_segment)
void prepareAndDownload(DB::FileSegmentPtr file_segment)
{
// std::cerr << "Reserving: " << file_segment->range().size() << " for: " << file_segment->range().toString() << "\n";
ASSERT_TRUE(file_segment->reserve(file_segment->range().size()));
download(cache_base_path, file_segment);
download(file_segment);
}
void complete(const std::string & cache_base_path, const DB::FileSegmentsHolder & holder)
void complete(const DB::FileSegmentsHolder & holder)
{
for (const auto & file_segment : holder.file_segments)
{
ASSERT_TRUE(file_segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(cache_base_path, file_segment);
prepareAndDownload(file_segment);
file_segment->completeWithState(DB::FileSegment::State::DOWNLOADED);
}
}
class FileCacheTest : public ::testing::Test
TEST(FileCache, get)
{
public:
if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path);
fs::create_directories(cache_base_path);
static void setupLogs(const std::string & level)
{
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel(level);
}
void SetUp() override
{
if(const char * test_log_level = std::getenv("TEST_LOG_LEVEL")) // NOLINT(concurrency-mt-unsafe)
setupLogs(test_log_level);
else
setupLogs(TEST_LOG_LEVEL);
if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path);
fs::create_directories(cache_base_path);
}
void TearDown() override
{
if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path);
}
fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";
};
TEST_F(FileCacheTest, get)
{
DB::ThreadStatus thread_status;
/// To work with cache need query_id and query context.
@ -157,7 +126,7 @@ TEST_F(FileCacheTest, get)
ASSERT_TRUE(segments[0]->reserve(segments[0]->range().size()));
assertRange(2, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADING);
download(cache_base_path, segments[0]);
download(segments[0]);
segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
assertRange(3, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
}
@ -178,7 +147,7 @@ TEST_F(FileCacheTest, get)
assertRange(5, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(cache_base_path, segments[1]);
prepareAndDownload(segments[1]);
segments[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
assertRange(6, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
}
@ -211,8 +180,8 @@ TEST_F(FileCacheTest, get)
assertRange(10, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
}
complete(cache_base_path, cache.getOrSet(key, 17, 4, {})); /// Get [17, 20]
complete(cache_base_path, cache.getOrSet(key, 24, 3, {})); /// Get [24, 26]
complete(cache.getOrSet(key, 17, 4, {})); /// Get [17, 20]
complete(cache.getOrSet(key, 24, 3, {})); /// Get [24, 26]
/// completeWithState(cache.getOrSet(key, 27, 1, false)); /// Get [27, 27]
/// Current cache: [__________][_____] [____] [___][]
@ -234,7 +203,7 @@ TEST_F(FileCacheTest, get)
assertRange(13, segments[2], DB::FileSegment::Range(15, 16), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(cache_base_path, segments[2]);
prepareAndDownload(segments[2]);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
@ -275,7 +244,7 @@ TEST_F(FileCacheTest, get)
assertRange(21, segments[3], DB::FileSegment::Range(21, 21), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[3]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(cache_base_path, segments[3]);
prepareAndDownload(segments[3]);
segments[3]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments[3]->state() == DB::FileSegment::State::DOWNLOADED);
@ -298,8 +267,8 @@ TEST_F(FileCacheTest, get)
ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(cache_base_path, segments[0]);
prepareAndDownload(cache_base_path, segments[2]);
prepareAndDownload(segments[0]);
prepareAndDownload(segments[2]);
segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
}
@ -321,8 +290,8 @@ TEST_F(FileCacheTest, get)
ASSERT_TRUE(s5[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(s1[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(cache_base_path, s5[0]);
prepareAndDownload(cache_base_path, s1[0]);
prepareAndDownload(s5[0]);
prepareAndDownload(s1[0]);
s5[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
s1[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
@ -425,7 +394,7 @@ TEST_F(FileCacheTest, get)
cv.wait(lock, [&]{ return lets_start_download; });
}
prepareAndDownload(cache_base_path, segments[2]);
prepareAndDownload(segments[2]);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADED);
@ -490,7 +459,7 @@ TEST_F(FileCacheTest, get)
ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::PARTIALLY_DOWNLOADED);
ASSERT_TRUE(segments_2[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(cache_base_path, segments_2[1]);
prepareAndDownload(segments_2[1]);
segments_2[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
});
@ -548,141 +517,3 @@ TEST_F(FileCacheTest, get)
}
}
/// Checks FileSegmentRangeWriter::tryWrite against a small cache:
/// writes that fit are accepted in full, a write that does not fit is rejected
/// without moving the offset, and a write of exactly the remaining space succeeds.
TEST_F(FileCacheTest, rangeWriter)
{
    DB::FileCacheSettings settings;
    settings.max_size = 25;
    settings.max_elements = 5;
    settings.max_file_segment_size = 10;

    DB::FileCache cache(cache_base_path, settings);
    cache.initialize();
    auto key = cache.hash("key1");

    DB::FileSegmentRangeWriter writer(&cache, key, nullptr, "", "key1");

    std::string data(100, '\xf0');

    /// Six writes, 21 in total, which is below max_size; each must be written completely.
    size_t total_written = 0;
    for (const size_t size : {3, 5, 8, 1, 1, 3})
    {
        total_written += size;
        ASSERT_EQ(writer.tryWrite(data.data(), size, writer.currentOffset()), size);
    }
    ASSERT_LT(total_written, settings.max_size);

    size_t offset_before_unsuccessful_write = writer.currentOffset();
    size_t space_left = settings.max_size - total_written;

    /// One more than the remaining space: nothing is written and the offset stays put.
    ASSERT_EQ(writer.tryWrite(data.data(), space_left + 1, writer.currentOffset()), 0);
    ASSERT_EQ(writer.currentOffset(), offset_before_unsuccessful_write);

    /// Exactly the remaining space is accepted.
    ASSERT_EQ(writer.tryWrite(data.data(), space_left, writer.currentOffset()), space_left);

    writer.finalize();
}
/// Build a block with a single UInt64 column "x" holding the values 0 .. size-1.
/// The size of the block in bytes is written to the debug log.
static Block generateBlock(size_t size = 0)
{
    ColumnWithTypeAndName x_column;
    x_column.name = "x";
    x_column.type = std::make_shared<DataTypeUInt64>();

    MutableColumnPtr values = x_column.type->createColumn();
    for (size_t value = 0; value < size; ++value)
        values->insert(value);
    x_column.column = std::move(values);

    Block block;
    block.insert(x_column);

    LOG_DEBUG(&Poco::Logger::get("FileCacheTest"), "generated block {} bytes", block.bytes());
    return block;
}
/// Read blocks from `stream` until it returns an empty block and
/// return the total number of rows read.
static size_t readAllTemporaryData(TemporaryFileStream & stream)
{
    size_t total_rows = 0;
    for (;;)
    {
        Block chunk = stream.read();
        total_rows += chunk.rows();

        /// An empty block ends the loop.
        if (!chunk)
            break;
    }
    return total_rows;
}
/// Checks how temporary data written through TemporaryDataOnDisk shares space with
/// regular segments of the same file cache: it cannot push out segments that are
/// still held, it can grow once the holder is released, and the cache usage does not
/// exceed the earlier measurements after the temporary data object is destroyed.
TEST_F(FileCacheTest, temporaryData)
{
    DB::FileCacheSettings settings;
    settings.max_size = 10240;
    settings.max_file_segment_size = 1024;

    DB::FileCache file_cache(cache_base_path, settings);
    file_cache.initialize();

    /// Scope without a volume: temporary data goes through the file cache.
    auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, 0);

    /// Fill part of the cache with regular data and keep the holder alive.
    auto some_data_holder = file_cache.getOrSet(file_cache.hash("some_data"), 0, 1024 * 5, CreateFileSegmentSettings{});

    {
        auto segments = fromHolder(some_data_holder);
        ASSERT_EQ(segments.size(), 5);
        for (auto & segment : segments)
        {
            ASSERT_TRUE(segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
            ASSERT_TRUE(segment->reserve(segment->range().size()));
            download(cache_base_path, segment);
            segment->completeWithState(DB::FileSegment::State::DOWNLOADED);
        }
    }

    size_t size_used_before_temporary_data = file_cache.getUsedCacheSize();
    size_t segments_used_before_temporary_data = file_cache.getFileSegmentsNum();
    ASSERT_GT(size_used_before_temporary_data, 0);
    ASSERT_GT(segments_used_before_temporary_data, 0);

    size_t size_used_with_temporary_data;
    size_t segments_used_with_temporary_data;

    {
        auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);

        auto & stream = tmp_data->createStream(generateBlock());

        ASSERT_GT(stream.write(generateBlock(100)), 0);

        EXPECT_GT(file_cache.getUsedCacheSize(), 0);
        EXPECT_GT(file_cache.getFileSegmentsNum(), 0);

        size_t used_size_before_attempt = file_cache.getUsedCacheSize();
        /// data can't be evicted because it is still held by `some_data_holder`
        ASSERT_THROW(stream.write(generateBlock(1000)), DB::Exception);

        /// The failed write must not change the used cache size.
        ASSERT_EQ(file_cache.getUsedCacheSize(), used_size_before_attempt);

        /// After releasing the holder the same kind of write is expected to succeed.
        some_data_holder.reset();

        stream.write(generateBlock(1011));

        auto stat = stream.finishWriting();

        /// 100 + 1011 rows were written successfully.
        EXPECT_EQ(stat.num_rows, 1111);
        EXPECT_EQ(readAllTemporaryData(stream), 1111);

        size_used_with_temporary_data = file_cache.getUsedCacheSize();
        segments_used_with_temporary_data = file_cache.getFileSegmentsNum();
        EXPECT_GT(size_used_with_temporary_data, 0);
        EXPECT_GT(segments_used_with_temporary_data, 0);
    }

    /// All temp data should be evicted after removing temporary files
    EXPECT_LE(file_cache.getUsedCacheSize(), size_used_with_temporary_data);
    EXPECT_LE(file_cache.getFileSegmentsNum(), segments_used_with_temporary_data);

    /// Some segments reserved by `some_data_holder` were evicted by temporary data
    EXPECT_LE(file_cache.getUsedCacheSize(), size_used_before_temporary_data);
    EXPECT_LE(file_cache.getFileSegmentsNum(), segments_used_before_temporary_data);
}

View File

@ -192,7 +192,7 @@ private:
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
size_t sum_input_rows_upper_bound{0};
std::unique_ptr<PocoTemporaryFile> rows_sources_file{nullptr};
std::unique_ptr<TemporaryFile> rows_sources_file{nullptr};
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::optional<ColumnSizeEstimator> column_sizes{};
@ -257,7 +257,7 @@ private:
/// Begin dependencies from previous stage
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<PocoTemporaryFile> rows_sources_file;
std::unique_ptr<TemporaryFile> rows_sources_file;
std::optional<ColumnSizeEstimator> column_sizes;
CompressionCodecPtr compression_codec;
DiskPtr tmp_disk{nullptr};

View File

@ -24,8 +24,7 @@ NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes()
{"cache_hits", std::make_shared<DataTypeUInt64>()},
{"references", std::make_shared<DataTypeUInt64>()},
{"downloaded_size", std::make_shared<DataTypeUInt64>()},
{"persistent", std::make_shared<DataTypeNumber<UInt8>>()},
{"kind", std::make_shared<DataTypeString>()},
{"persistent", std::make_shared<DataTypeNumber<UInt8>>()}
};
}
@ -46,11 +45,8 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
for (const auto & file_segment : file_segments)
{
res_columns[0]->insert(cache_base_path);
/// Do not use `file_segment->getPathInLocalCache` here because it will lead to nullptr dereference
/// (because file_segments in getSnapshot doesn't have `cache` field set)
res_columns[1]->insert(
cache->getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->getKind()));
cache->getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent()));
const auto & range = file_segment->range();
res_columns[2]->insert(range.left);
@ -61,7 +57,6 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
res_columns[7]->insert(file_segment->getRefCount());
res_columns[8]->insert(file_segment->getDownloadedSize());
res_columns[9]->insert(file_segment->isPersistent());
res_columns[10]->insert(toString(file_segment->getKind()));
}
}
}

View File

@ -1,39 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<local_disk>
<type>local</type>
<path>/local_disk/</path>
</local_disk>
<tiny_local_cache>
<type>cache</type>
<disk>local_disk</disk>
<path>/tiny_local_cache/</path>
<max_size>10M</max_size>
<max_file_segment_size>1M</max_file_segment_size>
<cache_on_write_operations>1</cache_on_write_operations>
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
</tiny_local_cache>
<!-- Used to check free space in `/tiny_local_cache` using `system.disks` -->
<!-- The entry for tiny_local_cache itself shows incorrect info because it uses DiskObjectStorage under the hood -->
<tiny_local_cache_local_disk>
<type>local</type>
<path>/tiny_local_cache/</path>
</tiny_local_cache_local_disk>
</disks>
<policies>
<tiny_local_cache>
<volumes>
<main>
<disk>tiny_local_cache</disk>
</main>
</volumes>
</tiny_local_cache>
</policies>
</storage_configuration>
<tmp_cache>tiny_local_cache</tmp_cache>
</clickhouse>

View File

@ -1,81 +0,0 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
# Test cluster with one instance, `node`, which loads the storage configuration under test.
cluster = ClickHouseCluster(__file__)
# Both paths are mounted as size-limited tmpfs: /local_disk (50M) and /tiny_local_cache (12M).
# NOTE(review): presumably the small /tiny_local_cache mount is what lets the test exhaust
# the cache disk quickly - confirm against configs/config.d/storage_configuration.xml.
node = cluster.add_instance(
"node",
main_configs=["configs/config.d/storage_configuration.xml"],
tmpfs=["/local_disk:size=50M", "/tiny_local_cache:size=12M"],
)
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture that starts the cluster and yields it to the tests.

    The cluster is shut down when the module's tests are finished, and also
    when startup itself raises, because the shutdown lives in `finally`.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_cache_evicted_by_temporary_data(start_cluster):
    """Check that temporary query data evicts entries from the filesystem cache.

    Scenario:
      1. Fill the cache by reading table `t1`, stored with the
         `tiny_local_cache` storage policy.
      2. Run an external sort that needs more temporary space than is
         available; it must fail with "Cannot reserve space in file cache".
      3. Verify that the attempt nevertheless evicted part of the cache:
         the cached size went down and free disk space went up.

    The table is dropped in a `finally` block so that a failed assertion does
    not leave `t1` (and its cached data) behind.
    """
    mib = 1024 * 1024
    q = node.query

    def qi(query):
        """Run `query` and parse its single-value result as an int."""
        return int(node.query(query).strip())

    def cache_size():
        """Total size of all entries currently listed in system.filesystem_cache."""
        return qi("SELECT sum(size) FROM system.filesystem_cache")

    def free_space():
        """Free space reported by system.disks for the helper local disk entry."""
        return qi(
            "SELECT free_space FROM system.disks WHERE name = 'tiny_local_cache_local_disk'"
        )

    cache_size_initial = cache_size()
    assert cache_size_initial == 0

    free_space_initial = free_space()
    assert free_space_initial > 8 * mib

    try:
        q(
            "CREATE TABLE t1 (x UInt64) ENGINE = MergeTree ORDER BY x SETTINGS storage_policy = 'tiny_local_cache'"
        )
        q("INSERT INTO t1 SELECT number FROM numbers(1024 * 1024)")

        # To be sure that nothing is reading the cache and entries for t1 can be evicted
        q("OPTIMIZE TABLE t1 FINAL")
        q("SYSTEM STOP MERGES t1")

        # Read some data to fill the cache
        q("SELECT sum(x) FROM t1")

        cache_size_with_t1 = cache_size()
        assert cache_size_with_t1 > 8 * mib

        # Almost all disk space is occupied by t1 cache
        free_space_with_t1 = free_space()
        assert free_space_with_t1 < 4 * mib

        # Try to sort the table, but fail because of lack of disk space
        with pytest.raises(QueryRuntimeException) as exc:
            q(
                "SELECT ignore(*) FROM numbers(10 * 1024 * 1024) ORDER BY sipHash64(number)",
                settings={
                    "max_bytes_before_external_group_by": "4M",
                    "max_bytes_before_external_sort": "4M",
                },
            )
        assert "Cannot reserve space in file cache" in str(exc.value)

        # Some data evicted from cache by temporary data
        cache_size_after_eviction = cache_size()
        assert cache_size_after_eviction < cache_size_with_t1

        # Disk space freed, at least 3 MB, because temporary data tried to write 4 MB
        free_space_after_eviction = free_space()
        assert free_space_after_eviction > free_space_with_t1 + 3 * mib
    finally:
        # Always clean up, even when an assertion above fails
        q("DROP TABLE IF EXISTS t1")

View File

@ -23,7 +23,7 @@ def start_cluster():
cluster.shutdown()
def test_disk_selection(start_cluster):
def test_different_versions(start_cluster):
query = "SELECT count(ignore(*)) FROM (SELECT * FROM system.numbers LIMIT 1e7) GROUP BY number"
settings = {
"max_bytes_before_external_group_by": 1 << 20,