From 7d6950d397c29f9a0f391746a76ead59f57503d4 Mon Sep 17 00:00:00 2001 From: Vladimir C Date: Fri, 2 Dec 2022 14:50:56 +0100 Subject: [PATCH] Revert "Temporary files evict fs cache" --- programs/local/LocalServer.cpp | 2 +- programs/server/Server.cpp | 58 +++-- src/Common/filesystemHelpers.cpp | 6 +- src/Common/filesystemHelpers.h | 4 +- .../IO/CachedOnDiskReadBufferFromFile.cpp | 7 +- .../IO/CachedOnDiskWriteBufferFromFile.cpp | 78 ++----- .../IO/CachedOnDiskWriteBufferFromFile.h | 37 +-- src/Disks/IO/FileCachePlaceholder.cpp | 73 ------ src/Disks/IO/FileCachePlaceholder.h | 61 ----- .../Cached/CachedObjectStorage.h | 2 - .../ObjectStorages/DiskObjectStorage.cpp | 8 - src/Disks/ObjectStorages/DiskObjectStorage.h | 1 - src/Disks/TemporaryFileInPath.cpp | 20 -- src/Disks/TemporaryFileInPath.h | 21 -- src/Disks/TemporaryFileOnDisk.cpp | 2 +- src/Disks/TemporaryFileOnDisk.h | 15 +- src/Formats/NativeWriter.cpp | 8 +- src/Formats/NativeWriter.h | 4 +- src/IO/WriteBufferFromTemporaryFile.cpp | 6 +- src/IO/WriteBufferFromTemporaryFile.h | 4 +- src/Interpreters/Cache/FileCache.cpp | 62 +---- src/Interpreters/Cache/FileCache.h | 6 +- src/Interpreters/Cache/FileCacheFactory.cpp | 13 +- src/Interpreters/Cache/FileCacheFactory.h | 1 - src/Interpreters/Cache/FileSegment.cpp | 80 ++----- src/Interpreters/Cache/FileSegment.h | 58 ++--- src/Interpreters/Context.cpp | 104 ++------- src/Interpreters/Context.h | 4 +- src/Interpreters/TemporaryDataOnDisk.cpp | 90 ++----- src/Interpreters/TemporaryDataOnDisk.h | 24 +- .../tests/gtest_lru_file_cache.cpp | 219 ++---------------- src/Storages/MergeTree/MergeTask.h | 4 +- .../System/StorageSystemFilesystemCache.cpp | 9 +- .../test_temporary_data_in_cache/__init__.py | 0 .../config.d/storage_configuration.xml | 39 ---- .../test_temporary_data_in_cache/test.py | 81 ------- tests/integration/test_tmp_policy/test.py | 2 +- 37 files changed, 220 insertions(+), 993 deletions(-) delete mode 100644 src/Disks/IO/FileCachePlaceholder.cpp delete mode 100644 src/Disks/IO/FileCachePlaceholder.h delete mode 100644 src/Disks/TemporaryFileInPath.cpp delete mode 100644 src/Disks/TemporaryFileInPath.h delete mode 100644 tests/integration/test_temporary_data_in_cache/__init__.py delete mode 100644 tests/integration/test_temporary_data_in_cache/configs/config.d/storage_configuration.xml delete mode 100644 tests/integration/test_temporary_data_in_cache/test.py diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 22c94e01a51..ce7e27026f1 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -207,7 +207,7 @@ void LocalServer::tryInitPath() global_context->setPath(path); - global_context->setTemporaryStoragePath(path + "tmp/", 0); + global_context->setTemporaryStorage(path + "tmp", "", 0); global_context->setFlagsPath(path + "flags"); global_context->setUserFilesPath(""); // user's files are everywhere diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 40f156bc211..e7e359df21a 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -203,6 +203,46 @@ int mainEntryClickHouseServer(int argc, char ** argv) namespace { +void setupTmpPath(Poco::Logger * log, const std::string & path) +try +{ + LOG_DEBUG(log, "Setting up {} to store temporary data in it", path); + + fs::create_directories(path); + + /// Clearing old temporary files. + fs::directory_iterator dir_end; + size_t unknown_files = 0; + for (fs::directory_iterator it(path); it != dir_end; ++it) + { + if (it->is_regular_file() && startsWith(it->path().filename(), "tmp")) + { + LOG_DEBUG(log, "Removing old temporary file {}", it->path().string()); + fs::remove(it->path()); + } + else + { + unknown_files++; + if (unknown_files < 100) + LOG_DEBUG(log, "Found unknown {} {} in temporary path", + it->is_regular_file() ? "file" : (it->is_directory() ? "directory" : "element"), + it->path().string()); + } + } + + if (unknown_files) + LOG_DEBUG(log, "Found {} unknown files in temporary path", unknown_files); +} +catch (...) +{ + DB::tryLogCurrentException( + log, + fmt::format( + "Caught exception while setup temporary path: {}. It is ok to skip this exception as cleaning old temporary files is not " + "necessary", + path)); +} + size_t waitServersToFinish(std::vector & servers, size_t seconds_to_wait) { const size_t sleep_max_ms = 1000 * seconds_to_wait; @@ -997,21 +1037,13 @@ try LOG_TRACE(log, "Initialized DateLUT with time zone '{}'.", DateLUT::instance().getTimeZone()); /// Storage with temporary data for processing of heavy queries. - if (auto temporary_policy = config().getString("tmp_policy", ""); !temporary_policy.empty()) - { - size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0); - global_context->setTemporaryStoragePolicy(temporary_policy, max_size); - } - else if (auto temporary_cache = config().getString("tmp_cache", ""); !temporary_cache.empty()) - { - size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0); - global_context->setTemporaryStorageInCache(temporary_cache, max_size); - } - else { std::string temporary_path = config().getString("tmp_path", path / "tmp/"); + std::string temporary_policy = config().getString("tmp_policy", ""); size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0); - global_context->setTemporaryStoragePath(temporary_path, max_size); + const VolumePtr & volume = global_context->setTemporaryStorage(temporary_path, temporary_policy, max_size); + for (const DiskPtr & disk : volume->getDisks()) + setupTmpPath(log, disk->getPath()); } /** Directory with 'flags': files indicating temporary settings for the server set by system administrator. @@ -1410,7 +1442,7 @@ try } catch (...) { - tryLogCurrentException(log, "Caught exception while setting up access control."); + tryLogCurrentException(log); throw; } diff --git a/src/Common/filesystemHelpers.cpp b/src/Common/filesystemHelpers.cpp index 43f88dd7faa..07a08dc7fbc 100644 --- a/src/Common/filesystemHelpers.cpp +++ b/src/Common/filesystemHelpers.cpp @@ -64,11 +64,11 @@ bool enoughSpaceInDirectory(const std::string & path, size_t data_size) return data_size <= free_space; } -std::unique_ptr createTemporaryFile(const std::string & folder_path) +std::unique_ptr createTemporaryFile(const std::string & path) { ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal); - fs::create_directories(folder_path); - return std::make_unique(folder_path); + fs::create_directories(path); + return std::make_unique(path); } #if !defined(OS_LINUX) diff --git a/src/Common/filesystemHelpers.h b/src/Common/filesystemHelpers.h index 14ee5f54322..0e6e16941bb 100644 --- a/src/Common/filesystemHelpers.h +++ b/src/Common/filesystemHelpers.h @@ -14,10 +14,10 @@ namespace fs = std::filesystem; namespace DB { -using PocoTemporaryFile = Poco::TemporaryFile; +using TemporaryFile = Poco::TemporaryFile; bool enoughSpaceInDirectory(const std::string & path, size_t data_size); -std::unique_ptr createTemporaryFile(const std::string & folder_path); +std::unique_ptr createTemporaryFile(const std::string & path); // Determine what block device is responsible for specified path diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index 0dd40e7f153..5b5d746ab55 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -118,7 +118,10 @@ void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size) } else { - CreateFileSegmentSettings create_settings(is_persistent ? FileSegmentKind::Persistent : FileSegmentKind::Regular); + CreateFileSegmentSettings create_settings{ + .is_persistent = is_persistent + }; + file_segments_holder.emplace(cache->getOrSet(cache_key, offset, size, create_settings)); } @@ -948,7 +951,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() } else { - LOG_TRACE(log, "No space left in cache to reserve {} bytes, will continue without cache download", size); + LOG_TRACE(log, "No space left in cache, will continue without cache download"); file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); } diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp index b11edd7e701..994bb743c5f 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp @@ -51,42 +51,16 @@ FileSegmentRangeWriter::FileSegmentRangeWriter( { } -bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind) -{ - size_t written_size = tryWrite(data, size, offset, segment_kind, true); - return written_size == size; -} - -size_t FileSegmentRangeWriter::tryWrite(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind, bool strict) -{ - size_t total_written_size = 0; - while (size > 0) - { - size_t written_size = tryWriteImpl(data, size, offset, segment_kind, strict); - chassert(written_size <= size); - if (written_size == 0) - break; - - if (data) - data += written_size; - - size -= written_size; - offset += written_size; - total_written_size += written_size; - } - return total_written_size; -} - -size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind, bool strict) +bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent) { if (finalized) - return 0; + return false; auto & file_segments = file_segments_holder.file_segments; if (current_file_segment_it == file_segments.end()) { - current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, segment_kind); + current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent); } else { @@ -104,7 +78,7 @@ size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size if (file_segment->range().size() == file_segment->getDownloadedSize()) { completeFileSegment(*file_segment); - current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, segment_kind); + current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent); } } @@ -119,26 +93,20 @@ size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size file_segment->completePartAndResetDownloader(); }); - size_t reserved_size = file_segment->tryReserve(size, strict); - if (reserved_size == 0 || (strict && reserved_size != size)) + bool reserved = file_segment->reserve(size); + if (!reserved) { - if (strict) - { - file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); - appendFilesystemCacheLog(*file_segment); - } + file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); + appendFilesystemCacheLog(*file_segment); LOG_DEBUG( &Poco::Logger::get("FileSegmentRangeWriter"), "Unsuccessful space reservation attempt (size: {}, file segment info: {}", size, file_segment->getInfoForLog()); - return 0; + return false; } - /// Shrink to reserved size, because we can't write more than reserved - size = reserved_size; - try { file_segment->write(data, size, offset); @@ -152,17 +120,7 @@ size_t FileSegmentRangeWriter::tryWriteImpl(const char * data, size_t size, size file_segment->completePartAndResetDownloader(); current_file_segment_write_offset += size; - return size; -} - -bool FileSegmentRangeWriter::reserve(size_t size, size_t offset) -{ - return write(nullptr, size, offset, FileSegmentKind::Temporary); -} - -size_t FileSegmentRangeWriter::tryReserve(size_t size, size_t offset) -{ - return tryWrite(nullptr, size, offset, FileSegmentKind::Temporary); + return true; } void FileSegmentRangeWriter::finalize() @@ -171,7 +129,6 @@ void FileSegmentRangeWriter::finalize() return; auto & file_segments = file_segments_holder.file_segments; - if (file_segments.empty() || current_file_segment_it == file_segments.end()) return; @@ -192,7 +149,7 @@ FileSegmentRangeWriter::~FileSegmentRangeWriter() } } -FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSegmentKind segment_kind) +FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent) { /** * Allocate a new file segment starting `offset`. @@ -201,7 +158,10 @@ FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset std::lock_guard cache_lock(cache->mutex); - CreateFileSegmentSettings create_settings(segment_kind); + CreateFileSegmentSettings create_settings + { + .is_persistent = is_persistent, + }; /// We set max_file_segment_size to be downloaded, /// if we have less size to write, file segment will be resized in complete() method. @@ -236,15 +196,12 @@ void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_s } } -void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment, std::optional state) +void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment) { /// File segment can be detached if space reservation failed. if (file_segment.isDetached()) return; - if (state.has_value()) - file_segment.setDownloadState(*state); - file_segment.completeWithoutState(); appendFilesystemCacheLog(file_segment); } @@ -313,8 +270,7 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size) try { - auto segment_kind = is_persistent_cache_file ? FileSegmentKind::Persistent : FileSegmentKind::Regular; - if (!cache_writer->write(data, size, current_download_offset, segment_kind)) + if (!cache_writer->write(data, size, current_download_offset, is_persistent_cache_file)) { LOG_INFO(log, "Write-through cache is stopped as cache limit is reached and nothing can be evicted"); return; diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h index 38ec2b46a5d..cec7305ab1b 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h @@ -4,7 +4,6 @@ #include #include #include -#include namespace Poco { @@ -29,44 +28,22 @@ public: FileCache * cache_, const FileSegment::Key & key_, std::shared_ptr cache_log_, const String & query_id_, const String & source_path_); - /* Write a range of file segments. - * Allocate file segment of `max_file_segment_size` and write to it until it is full and then allocate next file segment. - * If it's impossible to allocate new file segment and reserve space to write all data, then returns false. - * - * Note: the data that was written to file segments before the error occurred is not rolled back. - */ - bool write(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind); - - /* Tries to write data to current file segment. - * Size of written data may be less than requested_size, because it may not be enough space. - * - * Returns size of written data. - */ - size_t tryWrite(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind = FileSegmentKind::Regular, bool strict = false); - - /// Same as `write/tryWrite`, but doesn't write anything, just reserves some space in cache - bool reserve(size_t size, size_t offset); - size_t tryReserve(size_t size, size_t offset); + /** + * Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to + * it until it is full and then allocate next file segment. + */ + bool write(const char * data, size_t size, size_t offset, bool is_persistent); void finalize(); - size_t currentOffset() const { return current_file_segment_write_offset; } - ~FileSegmentRangeWriter(); private: - FileSegments::iterator allocateFileSegment(size_t offset, FileSegmentKind segment_kind); + FileSegments::iterator allocateFileSegment(size_t offset, bool is_persistent); void appendFilesystemCacheLog(const FileSegment & file_segment); - void completeFileSegment(FileSegment & file_segment, std::optional state = {}); - - /* Writes data to current file segment as much as possible and returns size of written data, do not allocate new file segments - * In `strict` mode it will write all data or nothing, otherwise it will write as much as possible - * If returned non zero value, then we can try to write again to next file segment. - * If no space is available, returns zero. - */ - size_t tryWriteImpl(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind, bool strict); + void completeFileSegment(FileSegment & file_segment); FileCache * cache; FileSegment::Key key; diff --git a/src/Disks/IO/FileCachePlaceholder.cpp b/src/Disks/IO/FileCachePlaceholder.cpp deleted file mode 100644 index d45d7b2d83d..00000000000 --- a/src/Disks/IO/FileCachePlaceholder.cpp +++ /dev/null @@ -1,73 +0,0 @@ -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int NOT_ENOUGH_SPACE; -} - -void ISpacePlaceholder::reserveCapacity(size_t requested_capacity) -{ - chassert(used_space <= capacity); - - size_t remaining_space = capacity - used_space; - LOG_TEST(&Poco::Logger::get("ISpacePlaceholder"), "Reserving {} bytes (used_space: {}, capacity: {})", requested_capacity, used_space, capacity); - - if (requested_capacity <= remaining_space) - return; - - size_t capacity_to_reserve = requested_capacity - remaining_space; - reserveImpl(capacity_to_reserve); - capacity += capacity_to_reserve; -} - -void ISpacePlaceholder::setUsed(size_t size) -{ - LOG_TEST(&Poco::Logger::get("ISpacePlaceholder"), "Using {} bytes ({} already used, {} capacity)", size, used_space, capacity); - - if (used_space + size > capacity) - { - LOG_WARNING(&Poco::Logger::get("ISpacePlaceholder"), "Used space is greater than capacity. It may lead to not enough space error"); - reserveCapacity(size); - } - - used_space = used_space + size; -} - -FileCachePlaceholder::FileCachePlaceholder(FileCache * cache, const String & name) - : key_name(name) - , file_cache(cache) -{ -} - -void FileCachePlaceholder::reserveImpl(size_t requested_size) -{ - /// We create new cache_writer and will try to reserve requested_size in it - String key = fmt::format("{}_{}", key_name, cache_writers.size()); - auto cache_writer = std::make_unique(file_cache, - file_cache->hash(key), - /* cache_log_ */ nullptr, - /* query_id_ */ "", - /* source_path_ */ key); - - size_t current_offset = cache_writer->currentOffset(); - size_t reserved_size = cache_writer->tryReserve(requested_size, current_offset); - if (reserved_size != requested_size) - { - throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, - "Cannot reserve space in file cache " - "({} bytes required, got {} reserved " - "{} / {} bytes used, " - "{} / {} elements used)" - , requested_size, reserved_size - , file_cache->getUsedCacheSize(), file_cache->getTotalMaxSize() - , file_cache->getFileSegmentsNum(), file_cache->getTotalMaxElements()); - } - /// Add to cache_writers only if we successfully reserved space, otherwise free reserved_size back - cache_writers.push_back(std::move(cache_writer)); -} - - -} diff --git a/src/Disks/IO/FileCachePlaceholder.h b/src/Disks/IO/FileCachePlaceholder.h deleted file mode 100644 index 6ddeb85286b..00000000000 --- a/src/Disks/IO/FileCachePlaceholder.h +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#include -#include - -#include -#include - -#include - -namespace fs = std::filesystem; - -namespace DB -{ - - -/* ISpacePlaceholder is a base class for all classes that need to reserve space in some storage. - * You should resrve space with call reserveCapacity() before writing to it. - * After writing you should call setUsed() to let ISpacePlaceholder know how much space was used. - * It can be different because in some cases you don't know exact size of data you will write (because of compression, for example). - * It's better to reserve more space in advance not to overuse space. - */ -class ISpacePlaceholder -{ -public: - /// Reserve space in storage - void reserveCapacity(size_t requested_capacity); - - /// Indicate that some space is used - /// It uses reserved space if it is possible, otherwise it reserves more space - void setUsed(size_t size); - - virtual ~ISpacePlaceholder() = default; - -private: - virtual void reserveImpl(size_t size) = 0; - - size_t capacity = 0; - size_t used_space = 0; -}; - -/* FileCachePlaceholder is a class that reserves space in FileCache. - * Data is written externally, and FileCachePlaceholder is only used to hold space in FileCache. - */ -class FileCachePlaceholder : public ISpacePlaceholder -{ -public: - FileCachePlaceholder(FileCache * cache, const String & name); - - void reserveImpl(size_t requested_size) override; - -private: - std::string key_name; - FileCache * file_cache; - - /// On each reserveImpl() call we create new FileSegmentRangeWriter that would be hold space - /// It's required to easily release already reserved space on unsuccessful attempt - std::vector> cache_writers; -}; - -} diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h index 119dc25c66b..2d67203be0f 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h @@ -113,8 +113,6 @@ public: WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & path) const override; - FileCachePtr getCache() const { return cache; } - private: FileCache::Key getCacheKey(const std::string & path) const; diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index 4230fb6254a..263a9a9d0e1 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -519,14 +519,6 @@ void DiskObjectStorage::wrapWithCache(FileCachePtr cache, const FileCacheSetting object_storage = std::make_shared(object_storage, cache, cache_settings, layer_name); } -FileCachePtr DiskObjectStorage::getCache() const -{ - const auto * cached_object_storage = typeid_cast(object_storage.get()); - if (!cached_object_storage) - return nullptr; - return cached_object_storage->getCache(); -} - NameSet DiskObjectStorage::getCacheLayersNames() const { NameSet cache_layers; diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index a24acc270c0..00e3cf98142 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -186,7 +186,6 @@ public: /// There can be any number of cache layers: /// DiskObjectStorage(CachedObjectStorage(...CacheObjectStorage(S3ObjectStorage)...)) void wrapWithCache(FileCachePtr cache, const FileCacheSettings & cache_settings, const String & layer_name); - FileCachePtr getCache() const; /// Get structure of object storage this disk works with. Examples: /// DiskObjectStorage(S3ObjectStorage) diff --git a/src/Disks/TemporaryFileInPath.cpp b/src/Disks/TemporaryFileInPath.cpp deleted file mode 100644 index eae7fa66855..00000000000 --- a/src/Disks/TemporaryFileInPath.cpp +++ /dev/null @@ -1,20 +0,0 @@ -#include -#include - -namespace DB -{ - -TemporaryFileInPath::TemporaryFileInPath(const String & folder_path) - : tmp_file(createTemporaryFile(folder_path)) -{ - chassert(tmp_file); -} - -String TemporaryFileInPath::getPath() const -{ - return tmp_file->path(); -} - -TemporaryFileInPath::~TemporaryFileInPath() = default; - -} diff --git a/src/Disks/TemporaryFileInPath.h b/src/Disks/TemporaryFileInPath.h deleted file mode 100644 index 503247e3f89..00000000000 --- a/src/Disks/TemporaryFileInPath.h +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ - -/// Wrapper around Poco::TemporaryFile to implement ITemporaryFile. -class TemporaryFileInPath : public ITemporaryFile -{ -public: - explicit TemporaryFileInPath(const String & folder_path); - String getPath() const override; - - ~TemporaryFileInPath() override; -private: - std::unique_ptr tmp_file; -}; - -} diff --git a/src/Disks/TemporaryFileOnDisk.cpp b/src/Disks/TemporaryFileOnDisk.cpp index af1f3f87c71..4f348519037 100644 --- a/src/Disks/TemporaryFileOnDisk.cpp +++ b/src/Disks/TemporaryFileOnDisk.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include @@ -16,6 +15,7 @@ namespace CurrentMetrics extern const Metric TotalTemporaryFiles; } + namespace DB { diff --git a/src/Disks/TemporaryFileOnDisk.h b/src/Disks/TemporaryFileOnDisk.h index fa3a0383297..9ba59c3eaf0 100644 --- a/src/Disks/TemporaryFileOnDisk.h +++ b/src/Disks/TemporaryFileOnDisk.h @@ -9,30 +9,21 @@ namespace DB { using DiskPtr = std::shared_ptr; -class ITemporaryFile -{ -public: - virtual String getPath() const = 0; - virtual ~ITemporaryFile() = default; -}; - -using TemporaryFileHolder = std::unique_ptr; - /// This class helps with the handling of temporary files or directories. /// A unique name for the temporary file or directory is automatically chosen based on a specified prefix. /// Create a directory in the constructor. /// The destructor always removes the temporary file or directory with all contained files. -class TemporaryFileOnDisk : public ITemporaryFile +class TemporaryFileOnDisk { public: explicit TemporaryFileOnDisk(const DiskPtr & disk_); explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope); explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix); - ~TemporaryFileOnDisk() override; + ~TemporaryFileOnDisk(); DiskPtr getDisk() const { return disk; } - String getPath() const override; + String getPath() const; private: DiskPtr disk; diff --git a/src/Formats/NativeWriter.cpp b/src/Formats/NativeWriter.cpp index e932bb88c2d..c4dea371afd 100644 --- a/src/Formats/NativeWriter.cpp +++ b/src/Formats/NativeWriter.cpp @@ -64,10 +64,8 @@ static void writeData(const ISerialization & serialization, const ColumnPtr & co } -size_t NativeWriter::write(const Block & block) +void NativeWriter::write(const Block & block) { - size_t written_before = ostr.count(); - /// Additional information about the block. if (client_revision > 0) block.info.write(ostr); @@ -163,10 +161,6 @@ size_t NativeWriter::write(const Block & block) if (index) index->blocks.emplace_back(std::move(index_block)); - - size_t written_after = ostr.count(); - size_t written_size = written_after - written_before; - return written_size; } } diff --git a/src/Formats/NativeWriter.h b/src/Formats/NativeWriter.h index 7bb377d2e4a..010a03ec722 100644 --- a/src/Formats/NativeWriter.h +++ b/src/Formats/NativeWriter.h @@ -27,9 +27,7 @@ public: IndexForNativeFormat * index_ = nullptr, size_t initial_size_of_file_ = 0); Block getHeader() const { return header; } - - /// Returns the number of bytes written. - size_t write(const Block & block); + void write(const Block & block); void flush(); static String getContentType() { return "application/octet-stream"; } diff --git a/src/IO/WriteBufferFromTemporaryFile.cpp b/src/IO/WriteBufferFromTemporaryFile.cpp index 4562ad512b3..f93c79ca587 100644 --- a/src/IO/WriteBufferFromTemporaryFile.cpp +++ b/src/IO/WriteBufferFromTemporaryFile.cpp @@ -13,7 +13,7 @@ namespace ErrorCodes } -WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file_) +WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file_) : WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, 0600), tmp_file(std::move(tmp_file_)) {} @@ -40,11 +40,11 @@ public: return std::make_shared(fd, file_name, std::move(origin->tmp_file)); } - ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr && tmp_file_) + ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr && tmp_file_) : ReadBufferFromFile(fd_, file_name_), tmp_file(std::move(tmp_file_)) {} - std::unique_ptr tmp_file; + std::unique_ptr tmp_file; }; diff --git a/src/IO/WriteBufferFromTemporaryFile.h b/src/IO/WriteBufferFromTemporaryFile.h index a4e83b95ac6..06e2911db26 100644 --- a/src/IO/WriteBufferFromTemporaryFile.h +++ b/src/IO/WriteBufferFromTemporaryFile.h @@ -20,11 +20,11 @@ public: ~WriteBufferFromTemporaryFile() override; private: - explicit WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file); + explicit WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file); std::shared_ptr getReadBufferImpl() override; - std::unique_ptr tmp_file; + std::unique_ptr tmp_file; friend class ReadBufferFromTemporaryWriteBuffer; }; diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index e2a0e839f19..72fa1b3c324 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -13,7 +12,6 @@ #include #include - namespace fs = std::filesystem; namespace DB @@ -48,27 +46,13 @@ FileCache::Key FileCache::hash(const String & path) return Key(sipHash128(path.data(), path.size())); } -String FileCache::getPathInLocalCache(const Key & key, size_t offset, FileSegmentKind segment_kind) const +String FileCache::getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const { - String file_suffix; - switch (segment_kind) - { - case FileSegmentKind::Persistent: - file_suffix = "_persistent"; - break; - case FileSegmentKind::Temporary: - file_suffix = "_temporary"; - break; - case FileSegmentKind::Regular: - file_suffix = ""; - break; - } - auto key_str = key.toString(); return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str - / (std::to_string(offset) + file_suffix); + / (std::to_string(offset) + (is_persistent ? "_persistent" : "")); } String FileCache::getPathInLocalCache(const Key & key) const @@ -556,6 +540,9 @@ FileSegmentPtr FileCache::createFileSegmentForDownload( assertCacheCorrectness(key, cache_lock); #endif + if (size > max_file_segment_size) + throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Requested size exceeds max file segment size"); + auto * cell = getCell(key, offset, cache_lock); if (cell) throw Exception( @@ -1012,17 +999,9 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard & cache_lock fs::directory_iterator key_it{key_prefix_it->path()}; for (; key_it != fs::directory_iterator(); ++key_it) { - if (key_it->is_regular_file()) + if (!key_it->is_directory()) { - if (key_prefix_it->path().filename() == "tmp" && startsWith(key_it->path().filename(), "tmp")) - { - LOG_DEBUG(log, "Found temporary file '{}', will remove it", key_it->path().string()); - fs::remove(key_it->path()); - } - else - { - LOG_DEBUG(log, "Unexpected file: {}. Expected a directory", key_it->path().string()); - } + LOG_DEBUG(log, "Unexpected file: {}. Expected a directory", key_it->path().string()); continue; } @@ -1030,26 +1009,17 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard & cache_lock fs::directory_iterator offset_it{key_it->path()}; for (; offset_it != fs::directory_iterator(); ++offset_it) { - if (offset_it->is_directory()) - { - LOG_DEBUG(log, "Unexpected directory: {}. Expected a file", offset_it->path().string()); - continue; - } - auto offset_with_suffix = offset_it->path().filename().string(); auto delim_pos = offset_with_suffix.find('_'); bool parsed; - FileSegmentKind segment_kind = FileSegmentKind::Regular; + bool is_persistent = false; if (delim_pos == std::string::npos) parsed = tryParse(offset, offset_with_suffix); else { parsed = tryParse(offset, offset_with_suffix.substr(0, delim_pos)); - if (offset_with_suffix.substr(delim_pos+1) == "persistent") - segment_kind = FileSegmentKind::Persistent; - if (offset_with_suffix.substr(delim_pos+1) == "temporary") - segment_kind = FileSegmentKind::Temporary; + is_persistent = offset_with_suffix.substr(delim_pos+1) == "persistent"; } if (!parsed) @@ -1069,7 +1039,7 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard & cache_lock { auto * cell = addCell( key, offset, size, FileSegment::State::DOWNLOADED, - CreateFileSegmentSettings(segment_kind), cache_lock); + CreateFileSegmentSettings{ .is_persistent = is_persistent }, cache_lock); if (cell) queue_entries.emplace_back(cell->queue_iterator, cell->file_segment); @@ -1181,7 +1151,7 @@ std::vector FileCache::tryGetCachePaths(const Key & key) for (const auto & [offset, cell] : cells_by_offset) { if (cell.file_segment->state() == FileSegment::State::DOWNLOADED) - cache_paths.push_back(getPathInLocalCache(key, offset, cell.file_segment->getKind())); + cache_paths.push_back(getPathInLocalCache(key, offset, cell.file_segment->isPersistent())); } return cache_paths; @@ -1203,16 +1173,6 @@ size_t FileCache::getAvailableCacheSizeUnlocked(std::lock_guard & ca return max_size - getUsedCacheSizeUnlocked(cache_lock); } -size_t FileCache::getTotalMaxSize() const -{ - return max_size; -} - -size_t FileCache::getTotalMaxElements() const -{ - return max_element_size; -} - size_t FileCache::getFileSegmentsNum() const { std::lock_guard cache_lock(mutex); diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 4a2610fd76b..706762b6915 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -80,7 +80,7 @@ public: static Key hash(const String & path); - String getPathInLocalCache(const Key & key, size_t offset, FileSegmentKind segment_kind) const; + String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const; String getPathInLocalCache(const Key & key) const; @@ -89,10 +89,8 @@ public: size_t capacity() const { return max_size; } size_t getUsedCacheSize() const; - size_t getTotalMaxSize() const; size_t getFileSegmentsNum() const; - size_t getTotalMaxElements() const; static bool isReadOnly(); @@ -223,8 +221,6 @@ private: FileSegmentCell * getCell(const Key & key, size_t offset, std::lock_guard & cache_lock); - /// Returns non-owened pointer to the cell stored in the `files` map. - /// Doesn't reserve any space. FileSegmentCell * addCell( const Key & key, size_t offset, diff --git a/src/Interpreters/Cache/FileCacheFactory.cpp b/src/Interpreters/Cache/FileCacheFactory.cpp index e120fe3fc27..b276760c0dd 100644 --- a/src/Interpreters/Cache/FileCacheFactory.cpp +++ b/src/Interpreters/Cache/FileCacheFactory.cpp @@ -31,21 +31,14 @@ const FileCacheSettings & FileCacheFactory::getSettings(const std::string & cach } -FileCachePtr FileCacheFactory::tryGet(const std::string & cache_base_path) +FileCachePtr FileCacheFactory::get(const std::string & cache_base_path) { std::lock_guard lock(mutex); auto it = caches_by_path.find(cache_base_path); if (it == caches_by_path.end()) - return nullptr; - return it->second->cache; -} - -FileCachePtr FileCacheFactory::get(const std::string & cache_base_path) -{ - auto file_cache_ptr = tryGet(cache_base_path); - if (!file_cache_ptr) throw Exception(ErrorCodes::BAD_ARGUMENTS, "No cache found by path: {}", cache_base_path); - return file_cache_ptr; + return it->second->cache; + } FileCachePtr FileCacheFactory::getOrCreate( diff --git a/src/Interpreters/Cache/FileCacheFactory.h b/src/Interpreters/Cache/FileCacheFactory.h index 32ecd05f019..82e0ec8f928 100644 --- a/src/Interpreters/Cache/FileCacheFactory.h +++ b/src/Interpreters/Cache/FileCacheFactory.h @@ -33,7 +33,6 @@ public: FileCachePtr getOrCreate(const std::string & cache_base_path, const FileCacheSettings & file_cache_settings, const std::string & name); - FileCachePtr tryGet(const std::string & cache_base_path); FileCachePtr get(const std::string & cache_base_path); CacheByBasePath getAll(); diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index e070317e454..418bcee05d9 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -23,19 +23,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -String toString(FileSegmentKind type) -{ - switch (type) - { - case FileSegmentKind::Regular: - return "Regular"; - case FileSegmentKind::Persistent: - return "Persistent"; - case FileSegmentKind::Temporary: - return "Temporary"; - } -} - FileSegment::FileSegment( size_t offset_, size_t size_, @@ -52,7 +39,7 @@ FileSegment::FileSegment( #else , log(&Poco::Logger::get("FileSegment")) #endif - , segment_kind(settings.type) + , is_persistent(settings.is_persistent) { /// On creation, file segment state can be EMPTY, DOWNLOADED, DOWNLOADING. switch (download_state) @@ -86,8 +73,7 @@ FileSegment::FileSegment( String FileSegment::getPathInLocalCache() const { - chassert(cache); - return cache->getPathInLocalCache(key(), offset(), segment_kind); + return cache->getPathInLocalCache(key(), offset(), isPersistent()); } FileSegment::State FileSegment::state() const @@ -323,7 +309,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset) if (current_downloaded_size == range().size()) throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded"); - if (!cache_writer && from != nullptr) + if (!cache_writer) { if (current_downloaded_size > 0) throw Exception( @@ -338,14 +324,11 @@ void FileSegment::write(const char * from, size_t size, size_t offset) try { - /// if `from` is nullptr, then we just allocate and hold space by current segment and it was (or would) be written outside - if (cache_writer && from != nullptr) - cache_writer->write(from, size); + cache_writer->write(from, size); std::unique_lock download_lock(download_mutex); - if (cache_writer && from != nullptr) - cache_writer->next(); + cache_writer->next(); downloaded_size += size; } @@ -396,13 +379,6 @@ FileSegment::State FileSegment::wait() } bool FileSegment::reserve(size_t size_to_reserve) -{ - size_t reserved = tryReserve(size_to_reserve, true); - assert(reserved == 0 || reserved == size_to_reserve); - return reserved == size_to_reserve; -} - -size_t FileSegment::tryReserve(size_t size_to_reserve, bool strict) { if (!size_to_reserve) throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed"); @@ -418,16 +394,10 @@ size_t FileSegment::tryReserve(size_t size_to_reserve, bool strict) expected_downloaded_size = getDownloadedSizeUnlocked(segment_lock); if (expected_downloaded_size + size_to_reserve > range().size()) - { - if (strict) - { - throw Exception( - ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})", - size_to_reserve, range().toString(), downloaded_size); - } - size_to_reserve = range().size() - expected_downloaded_size; - } + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})", + size_to_reserve, range().toString(), downloaded_size); chassert(reserved_size >= expected_downloaded_size); } @@ -445,16 +415,17 @@ size_t FileSegment::tryReserve(size_t size_to_reserve, bool strict) { std::lock_guard cache_lock(cache->mutex); - size_t need_to_reserve = size_to_reserve - already_reserved_size; - reserved = cache->tryReserve(key(), offset(), need_to_reserve, cache_lock); + size_to_reserve = size_to_reserve - already_reserved_size; + reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock); - if (!reserved) - return 0; - - std::lock_guard segment_lock(mutex); - reserved_size += need_to_reserve; + if (reserved) + { + std::lock_guard segment_lock(mutex); + reserved_size += size_to_reserve; + } } - return size_to_reserve; + + return reserved; } void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock & segment_lock) @@ -574,15 +545,6 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach resetDownloaderUnlocked(segment_lock); } - if (segment_kind == FileSegmentKind::Temporary && is_last_holder) - { - LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock)); - detach(cache_lock, segment_lock); - setDownloadState(State::SKIP_CACHE); - cache->remove(key(), offset(), cache_lock, segment_lock); - return; - } - switch (download_state) { case State::SKIP_CACHE: @@ -664,7 +626,7 @@ String FileSegment::getInfoForLogUnlocked(std::unique_lock & segment info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(segment_lock) << ", "; info << "caller id: " << getCallerId() << ", "; info << "detached: " << is_detached << ", "; - info << "kind: " << toString(segment_kind); + info << "persistent: " << is_persistent; return info.str(); } @@ -759,7 +721,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std snapshot->ref_count = file_segment.use_count(); snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock); snapshot->download_state = file_segment->download_state; - snapshot->segment_kind = file_segment->getKind(); + snapshot->is_persistent = file_segment->isPersistent(); return snapshot; } @@ -821,8 +783,6 @@ FileSegmentsHolder::~FileSegmentsHolder() if (!cache) cache = file_segment->cache; - assert(cache == file_segment->cache); /// all segments should belong to the same cache - try { bool is_detached = false; diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index 8915b2f0a36..8f9c0097d77 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -30,38 +30,9 @@ using FileSegmentPtr = std::shared_ptr; using FileSegments = std::list; -/* - * FileSegmentKind is used to specify the eviction policy for file segments. - */ -enum class FileSegmentKind -{ - /* `Regular` file segment is still in cache after usage, and can be evicted - * (unless there're some holders). - */ - Regular, - - /* `Persistent` file segment can't be evicted from cache, - * it should be removed manually. - */ - Persistent, - - /* `Temporary` file segment is removed right after relesing. - * Also corresponding files are removed during cache loading (if any). - */ - Temporary, -}; - -String toString(FileSegmentKind type); - struct CreateFileSegmentSettings { - FileSegmentKind type = FileSegmentKind::Regular; - - CreateFileSegmentSettings() = default; - - explicit CreateFileSegmentSettings(FileSegmentKind type_) - : type(type_) - {} + bool is_persistent = false; }; class FileSegment : private boost::noncopyable, public std::enable_shared_from_this @@ -156,8 +127,7 @@ public: size_t offset() const { return range().left; } - FileSegmentKind getKind() const { return segment_kind; } - bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; } + bool isPersistent() const { return is_persistent; } using UniqueId = std::pair; UniqueId getUniqueId() const { return std::pair(key(), offset()); } @@ -213,19 +183,19 @@ public: void assertCorrectness() const; + /** + * ========== Methods for _only_ file segment's `writer` ====================== + */ + + void synchronousWrite(const char * from, size_t size, size_t offset); + /** * ========== Methods for _only_ file segment's `downloader` ================== */ /// Try to reserve exactly `size` bytes. - /// Returns true if reservation was successful, false otherwise. bool reserve(size_t size_to_reserve); - /// Try to reserve at max `size` bytes. - /// Returns actual size reserved. - /// In strict mode throws an error on attempt to reserve space too much space - size_t tryReserve(size_t size_to_reserve, bool strict = false); - /// Write data into reserved space. void write(const char * from, size_t size, size_t offset); @@ -277,9 +247,9 @@ private: void assertIsDownloaderUnlocked(const std::string & operation, std::unique_lock & segment_lock) const; void assertCorrectnessUnlocked(std::unique_lock & segment_lock) const; - /// completeWithoutStateUnlocked() is called from destructor of FileSegmentsHolder. - /// Function might check if the caller of the method - /// is the last alive holder of the segment. Therefore, completion and destruction + /// complete() without any completion state is called from destructor of + /// FileSegmentsHolder. complete() might check if the caller of the method + /// is the last alive holder of the segment. Therefore, complete() and destruction /// of the file segment pointer must be done under the same cache mutex. void completeWithoutStateUnlocked(std::lock_guard & cache_lock); void completeBasedOnCurrentState(std::lock_guard & cache_lock, std::unique_lock & segment_lock); @@ -325,12 +295,12 @@ private: /// In general case, all file segments are owned by cache. bool is_detached = false; - bool is_downloaded = false; + bool is_downloaded{false}; std::atomic hits_count = 0; /// cache hits. std::atomic ref_count = 0; /// Used for getting snapshot state - FileSegmentKind segment_kind; + bool is_persistent; CurrentMetrics::Increment metric_increment{CurrentMetrics::CacheFileSegments}; }; @@ -343,8 +313,6 @@ struct FileSegmentsHolder : private boost::noncopyable FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {} - void reset() { file_segments.clear(); } - ~FileSegmentsHolder(); String toString(); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 796b93998a9..913b0535358 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -32,7 +32,6 @@ #include #include #include -#include #include #include #include @@ -103,7 +102,6 @@ #include #include #include -#include #include #include @@ -748,65 +746,28 @@ void Context::setPath(const String & path) shared->user_scripts_path = shared->path + "user_scripts/"; } -static void setupTmpPath(Poco::Logger * log, const std::string & path) -try -{ - LOG_DEBUG(log, "Setting up {} to store temporary data in it", path); - - fs::create_directories(path); - - /// Clearing old temporary files. - fs::directory_iterator dir_end; - for (fs::directory_iterator it(path); it != dir_end; ++it) - { - if (it->is_regular_file() && startsWith(it->path().filename(), "tmp")) - { - LOG_DEBUG(log, "Removing old temporary file {}", it->path().string()); - fs::remove(it->path()); - } - else - LOG_DEBUG(log, "Found unknown file in temporary path {}", it->path().string()); - } -} -catch (...) -{ - DB::tryLogCurrentException(log, fmt::format( - "Caught exception while setup temporary path: {}. " - "It is ok to skip this exception as cleaning old temporary files is not necessary", path)); -} - -static VolumePtr createLocalSingleDiskVolume(const std::string & path) -{ - auto disk = std::make_shared("_tmp_default", path, 0); - VolumePtr volume = std::make_shared("_tmp_default", disk, 0); - return volume; -} - -void Context::setTemporaryStoragePath(const String & path, size_t max_size) -{ - shared->tmp_path = path; - if (!shared->tmp_path.ends_with('/')) - shared->tmp_path += '/'; - - VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path); - - for (const auto & disk : volume->getDisks()) - { - setupTmpPath(shared->log, disk->getPath()); - } - - shared->temp_data_on_disk = std::make_shared(volume, nullptr, max_size); -} - -void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size) +VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name, size_t max_size) { std::lock_guard lock(shared->storage_policies_mutex); + VolumePtr volume; - StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name); - if (tmp_policy->getVolumes().size() != 1) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, - "Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name); - VolumePtr volume = tmp_policy->getVolume(0); + if (policy_name.empty()) + { + shared->tmp_path = path; + if (!shared->tmp_path.ends_with('/')) + shared->tmp_path += '/'; + + auto disk = std::make_shared("_tmp_default", shared->tmp_path, 0); + volume = std::make_shared("_tmp_default", disk, 0); + } + else + { + StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name); + if (tmp_policy->getVolumes().size() != 1) + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, + "Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name); + volume = tmp_policy->getVolume(0); + } if (volume->getDisks().empty()) throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG); @@ -828,33 +789,10 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s "Disk '{}' ({}) is not local and can't be used for temporary files", disk_ptr->getName(), typeid(*disk_raw_ptr).name()); } - - setupTmpPath(shared->log, disk->getPath()); } - shared->temp_data_on_disk = std::make_shared(volume, nullptr, max_size); -} - - -void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size) -{ - auto disk_ptr = getDisk(cache_disk_name); - if (!disk_ptr) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' is not found", cache_disk_name); - - const auto * disk_object_storage_ptr = dynamic_cast(disk_ptr.get()); - if (!disk_object_storage_ptr) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' does not use cache", cache_disk_name); - - auto file_cache = disk_object_storage_ptr->getCache(); - if (!file_cache) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", file_cache->getBasePath()); - - LOG_DEBUG(shared->log, "Using file cache ({}) for temporary files", file_cache->getBasePath()); - - shared->tmp_path = file_cache->getBasePath(); - VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path); - shared->temp_data_on_disk = std::make_shared(volume, file_cache.get(), max_size); + shared->temp_data_on_disk = std::make_shared(volume, max_size); + return volume; } void Context::setFlagsPath(const String & path) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 13bbf8a7ea0..63f321db993 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -463,9 +463,7 @@ public: void addWarningMessage(const String & msg) const; - void setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size); - void setTemporaryStoragePolicy(const String & policy_name, size_t max_size); - void setTemporaryStoragePath(const String & path, size_t max_size); + VolumePtr setTemporaryStorage(const String & path, const String & policy_name, size_t max_size); using ConfigurationPtr = Poco::AutoPtr; diff --git a/src/Interpreters/TemporaryDataOnDisk.cpp b/src/Interpreters/TemporaryDataOnDisk.cpp index a039053b011..c5ae6f6c885 100644 --- a/src/Interpreters/TemporaryDataOnDisk.cpp +++ b/src/Interpreters/TemporaryDataOnDisk.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include @@ -36,32 +35,35 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssiz size_t new_consumprion = stat.compressed_size + compressed_delta; if (compressed_delta > 0 && limit && new_consumprion > limit) - throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES, - "Limit for temporary files size exceeded (would consume {} / {} bytes)", new_consumprion, limit); + throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES, "Limit for temporary files size exceeded"); stat.compressed_size += compressed_delta; stat.uncompressed_size += uncompressed_delta; } -VolumePtr TemporaryDataOnDiskScope::getVolume() const -{ - if (!volume) - throw Exception("TemporaryDataOnDiskScope has no volume", ErrorCodes::LOGICAL_ERROR); - return volume; -} - TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size) { - TemporaryFileStreamPtr tmp_stream; - if (cache) - tmp_stream = TemporaryFileStream::create(cache, header, max_file_size, this); + DiskPtr disk; + if (max_file_size > 0) + { + auto reservation = volume->reserve(max_file_size); + if (!reservation) + throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE); + disk = reservation->getDisk(); + } else - tmp_stream = TemporaryFileStream::create(volume, header, max_file_size, this); + { + disk = volume->getDisk(); + } + + auto tmp_file = std::make_unique(disk, current_metric_scope); std::lock_guard lock(mutex); - return *streams.emplace_back(std::move(tmp_stream)); + TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique(std::move(tmp_file), header, this)); + return *tmp_stream; } + std::vector TemporaryDataOnDisk::getStreams() const { std::vector res; @@ -87,13 +89,12 @@ struct TemporaryFileStream::OutputWriter { } - size_t write(const Block & block) + void write(const Block & block) { if (finalized) throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR); - size_t written_bytes = out_writer.write(block); + out_writer.write(block); num_rows += block.rows(); - return written_bytes; } void finalize() @@ -154,68 +155,21 @@ struct TemporaryFileStream::InputReader NativeReader in_reader; }; -TemporaryFileStreamPtr TemporaryFileStream::create(const VolumePtr & volume, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_) -{ - if (!volume) - throw Exception("TemporaryDataOnDiskScope has no volume", ErrorCodes::LOGICAL_ERROR); - - DiskPtr disk; - if (max_file_size > 0) - { - auto reservation = volume->reserve(max_file_size); - if (!reservation) - throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE); - disk = reservation->getDisk(); - } - else - { - disk = volume->getDisk(); - } - - auto tmp_file = std::make_unique(disk, parent_->getMetricScope()); - return std::make_unique(std::move(tmp_file), header, /* cache_placeholder */ nullptr, /* parent */ parent_); -} - -TemporaryFileStreamPtr TemporaryFileStream::create(FileCache * cache, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_) -{ - auto tmp_file = std::make_unique(fs::path(cache->getBasePath()) / "tmp"); - - auto cache_placeholder = std::make_unique(cache, tmp_file->getPath()); - cache_placeholder->reserveCapacity(max_file_size); - - return std::make_unique(std::move(tmp_file), header, std::move(cache_placeholder), parent_); -} - -TemporaryFileStream::TemporaryFileStream( - TemporaryFileHolder file_, - const Block & header_, - std::unique_ptr space_holder_, - TemporaryDataOnDisk * parent_) +TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_) : parent(parent_) , header(header_) , file(std::move(file_)) - , space_holder(std::move(space_holder_)) , out_writer(std::make_unique(file->getPath(), header)) { } -size_t TemporaryFileStream::write(const Block & block) +void TemporaryFileStream::write(const Block & block) { if (!out_writer) throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR); - size_t block_size_in_memory = block.bytes(); - - if (space_holder) - space_holder->reserveCapacity(block_size_in_memory); - updateAllocAndCheck(); - - size_t bytes_written = out_writer->write(block); - if (space_holder) - space_holder->setUsed(bytes_written); - - return bytes_written; + out_writer->write(block); } TemporaryFileStream::Stat TemporaryFileStream::finishWriting() diff --git a/src/Interpreters/TemporaryDataOnDisk.h b/src/Interpreters/TemporaryDataOnDisk.h index 2790529754f..11edc8700d2 100644 --- a/src/Interpreters/TemporaryDataOnDisk.h +++ b/src/Interpreters/TemporaryDataOnDisk.h @@ -6,7 +6,6 @@ #include #include #include -#include namespace CurrentMetrics @@ -41,25 +40,23 @@ public: std::atomic uncompressed_size; }; - explicit TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * cache_, size_t limit_) - : volume(std::move(volume_)), cache(cache_), limit(limit_) + explicit TemporaryDataOnDiskScope(VolumePtr volume_, size_t limit_) + : volume(std::move(volume_)), limit(limit_) {} explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_) - : parent(std::move(parent_)), volume(parent->volume), cache(parent->cache), limit(limit_) + : parent(std::move(parent_)), volume(parent->volume), limit(limit_) {} /// TODO: remove /// Refactor all code that uses volume directly to use TemporaryDataOnDisk. - VolumePtr getVolume() const; + VolumePtr getVolume() const { return volume; } protected: void deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta); TemporaryDataOnDiskScopePtr parent = nullptr; - VolumePtr volume; - FileCache * cache = nullptr; StatAtomic stat; size_t limit = 0; @@ -94,7 +91,6 @@ public: bool empty() const; const StatAtomic & getStat() const { return stat; } - CurrentMetrics::Value getMetricScope() const { return current_metric_scope; } private: mutable std::mutex mutex; @@ -120,14 +116,9 @@ public: size_t num_rows = 0; }; - static TemporaryFileStreamPtr create(const VolumePtr & volume, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_); - static TemporaryFileStreamPtr create(FileCache * cache, const Block & header, size_t max_file_size, TemporaryDataOnDisk * parent_); - - TemporaryFileStream(TemporaryFileHolder file_, const Block & header_, std::unique_ptr space_holder, TemporaryDataOnDisk * parent_); - - /// Returns number of written bytes - size_t write(const Block & block); + TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_); + void write(const Block & block); Stat finishWriting(); bool isWriteFinished() const; @@ -151,8 +142,7 @@ private: Block header; - TemporaryFileHolder file; - std::unique_ptr space_holder; + TemporaryFileOnDiskHolder file; Stat stat; diff --git a/src/Interpreters/tests/gtest_lru_file_cache.cpp b/src/Interpreters/tests/gtest_lru_file_cache.cpp index 2d408bd9b34..22150b9f656 100644 --- a/src/Interpreters/tests/gtest_lru_file_cache.cpp +++ b/src/Interpreters/tests/gtest_lru_file_cache.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -15,14 +14,11 @@ #include #include #include -#include - -#include namespace fs = std::filesystem; -using namespace DB; -static constexpr auto TEST_LOG_LEVEL = "debug"; +fs::path caches_dir = fs::current_path() / "lru_cache_test"; +String cache_base_path = caches_dir / "cache1" / ""; void assertRange( [[maybe_unused]] size_t assert_n, DB::FileSegmentPtr file_segment, @@ -57,7 +53,7 @@ String getFileSegmentPath(const String & base_path, const DB::FileCache::Key & k return fs::path(base_path) / key_str.substr(0, 3) / key_str / DB::toString(offset); } -void download(const std::string & cache_base_path, DB::FileSegmentPtr file_segment) +void download(DB::FileSegmentPtr file_segment) { const auto & key = file_segment->key(); size_t size = file_segment->range().size(); @@ -71,57 +67,30 @@ void download(const std::string & cache_base_path, DB::FileSegmentPtr file_segme file_segment->write(data.data(), size, file_segment->getCurrentWriteOffset()); } -void prepareAndDownload(const std::string & cache_base_path, DB::FileSegmentPtr file_segment) +void prepareAndDownload(DB::FileSegmentPtr file_segment) { + // std::cerr << "Reserving: " << file_segment->range().size() << " for: " << file_segment->range().toString() << "\n"; ASSERT_TRUE(file_segment->reserve(file_segment->range().size())); - download(cache_base_path, file_segment); + download(file_segment); } -void complete(const std::string & cache_base_path, const DB::FileSegmentsHolder & holder) +void complete(const DB::FileSegmentsHolder & holder) { for (const auto & file_segment : holder.file_segments) { ASSERT_TRUE(file_segment->getOrSetDownloader() == DB::FileSegment::getCallerId()); - prepareAndDownload(cache_base_path, file_segment); + prepareAndDownload(file_segment); file_segment->completeWithState(DB::FileSegment::State::DOWNLOADED); } } -class FileCacheTest : public ::testing::Test + +TEST(FileCache, get) { -public: + if (fs::exists(cache_base_path)) + fs::remove_all(cache_base_path); + fs::create_directories(cache_base_path); - static void setupLogs(const std::string & level) - { - Poco::AutoPtr channel(new Poco::ConsoleChannel(std::cerr)); - Poco::Logger::root().setChannel(channel); - Poco::Logger::root().setLevel(level); - } - - void SetUp() override - { - if(const char * test_log_level = std::getenv("TEST_LOG_LEVEL")) // NOLINT(concurrency-mt-unsafe) - setupLogs(test_log_level); - else - setupLogs(TEST_LOG_LEVEL); - - if (fs::exists(cache_base_path)) - fs::remove_all(cache_base_path); - fs::create_directories(cache_base_path); - } - - void TearDown() override - { - if (fs::exists(cache_base_path)) - fs::remove_all(cache_base_path); - } - - fs::path caches_dir = fs::current_path() / "lru_cache_test"; - std::string cache_base_path = caches_dir / "cache1" / ""; -}; - -TEST_F(FileCacheTest, get) -{ DB::ThreadStatus thread_status; /// To work with cache need query_id and query context. @@ -157,7 +126,7 @@ TEST_F(FileCacheTest, get) ASSERT_TRUE(segments[0]->reserve(segments[0]->range().size())); assertRange(2, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADING); - download(cache_base_path, segments[0]); + download(segments[0]); segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED); assertRange(3, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED); } @@ -178,7 +147,7 @@ TEST_F(FileCacheTest, get) assertRange(5, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::EMPTY); ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId()); - prepareAndDownload(cache_base_path, segments[1]); + prepareAndDownload(segments[1]); segments[1]->completeWithState(DB::FileSegment::State::DOWNLOADED); assertRange(6, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED); } @@ -211,8 +180,8 @@ TEST_F(FileCacheTest, get) assertRange(10, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED); } - complete(cache_base_path, cache.getOrSet(key, 17, 4, {})); /// Get [17, 20] - complete(cache_base_path, cache.getOrSet(key, 24, 3, {})); /// Get [24, 26] + complete(cache.getOrSet(key, 17, 4, {})); /// Get [17, 20] + complete(cache.getOrSet(key, 24, 3, {})); /// Get [24, 26] /// completeWithState(cache.getOrSet(key, 27, 1, false)); /// Get [27, 27] /// Current cache: [__________][_____] [____] [___][] @@ -234,7 +203,7 @@ TEST_F(FileCacheTest, get) assertRange(13, segments[2], DB::FileSegment::Range(15, 16), DB::FileSegment::State::EMPTY); ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId()); - prepareAndDownload(cache_base_path, segments[2]); + prepareAndDownload(segments[2]); segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED); @@ -275,7 +244,7 @@ TEST_F(FileCacheTest, get) assertRange(21, segments[3], DB::FileSegment::Range(21, 21), DB::FileSegment::State::EMPTY); ASSERT_TRUE(segments[3]->getOrSetDownloader() == DB::FileSegment::getCallerId()); - prepareAndDownload(cache_base_path, segments[3]); + prepareAndDownload(segments[3]); segments[3]->completeWithState(DB::FileSegment::State::DOWNLOADED); ASSERT_TRUE(segments[3]->state() == DB::FileSegment::State::DOWNLOADED); @@ -298,8 +267,8 @@ TEST_F(FileCacheTest, get) ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId()); ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId()); - prepareAndDownload(cache_base_path, segments[0]); - prepareAndDownload(cache_base_path, segments[2]); + prepareAndDownload(segments[0]); + prepareAndDownload(segments[2]); segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED); segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED); } @@ -321,8 +290,8 @@ TEST_F(FileCacheTest, get) ASSERT_TRUE(s5[0]->getOrSetDownloader() == DB::FileSegment::getCallerId()); ASSERT_TRUE(s1[0]->getOrSetDownloader() == DB::FileSegment::getCallerId()); - prepareAndDownload(cache_base_path, s5[0]); - prepareAndDownload(cache_base_path, s1[0]); + prepareAndDownload(s5[0]); + prepareAndDownload(s1[0]); s5[0]->completeWithState(DB::FileSegment::State::DOWNLOADED); s1[0]->completeWithState(DB::FileSegment::State::DOWNLOADED); @@ -425,7 +394,7 @@ TEST_F(FileCacheTest, get) cv.wait(lock, [&]{ return lets_start_download; }); } - prepareAndDownload(cache_base_path, segments[2]); + prepareAndDownload(segments[2]); segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED); ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADED); @@ -490,7 +459,7 @@ TEST_F(FileCacheTest, get) ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::PARTIALLY_DOWNLOADED); ASSERT_TRUE(segments_2[1]->getOrSetDownloader() == DB::FileSegment::getCallerId()); - prepareAndDownload(cache_base_path, segments_2[1]); + prepareAndDownload(segments_2[1]); segments_2[1]->completeWithState(DB::FileSegment::State::DOWNLOADED); }); @@ -548,141 +517,3 @@ TEST_F(FileCacheTest, get) } } - -TEST_F(FileCacheTest, rangeWriter) -{ - DB::FileCacheSettings settings; - settings.max_size = 25; - settings.max_elements = 5; - settings.max_file_segment_size = 10; - - DB::FileCache cache(cache_base_path, settings); - cache.initialize(); - auto key = cache.hash("key1"); - - DB::FileSegmentRangeWriter writer(&cache, key, nullptr, "", "key1"); - - std::string data(100, '\xf0'); - - size_t total_written = 0; - for (const size_t size : {3, 5, 8, 1, 1, 3}) - { - total_written += size; - ASSERT_EQ(writer.tryWrite(data.data(), size, writer.currentOffset()), size); - } - ASSERT_LT(total_written, settings.max_size); - - size_t offset_before_unsuccessful_write = writer.currentOffset(); - size_t space_left = settings.max_size - total_written; - ASSERT_EQ(writer.tryWrite(data.data(), space_left + 1, writer.currentOffset()), 0); - - ASSERT_EQ(writer.currentOffset(), offset_before_unsuccessful_write); - - ASSERT_EQ(writer.tryWrite(data.data(), space_left, writer.currentOffset()), space_left); - - writer.finalize(); -} - -static Block generateBlock(size_t size = 0) -{ - Block block; - ColumnWithTypeAndName column; - column.name = "x"; - column.type = std::make_shared(); - - { - MutableColumnPtr mut_col = column.type->createColumn(); - for (size_t i = 0; i < size; ++i) - mut_col->insert(i); - column.column = std::move(mut_col); - } - - block.insert(column); - - LOG_DEBUG(&Poco::Logger::get("FileCacheTest"), "generated block {} bytes", block.bytes()); - return block; -} - -static size_t readAllTemporaryData(TemporaryFileStream & stream) -{ - Block block; - size_t read_rows = 0; - do - { - block = stream.read(); - read_rows += block.rows(); - } while (block); - return read_rows; -} - -TEST_F(FileCacheTest, temporaryData) -{ - DB::FileCacheSettings settings; - settings.max_size = 10240; - settings.max_file_segment_size = 1024; - - DB::FileCache file_cache(cache_base_path, settings); - file_cache.initialize(); - - auto tmp_data_scope = std::make_shared(nullptr, &file_cache, 0); - - auto some_data_holder = file_cache.getOrSet(file_cache.hash("some_data"), 0, 1024 * 5, CreateFileSegmentSettings{}); - - { - auto segments = fromHolder(some_data_holder); - ASSERT_EQ(segments.size(), 5); - for (auto & segment : segments) - { - ASSERT_TRUE(segment->getOrSetDownloader() == DB::FileSegment::getCallerId()); - ASSERT_TRUE(segment->reserve(segment->range().size())); - download(cache_base_path, segment); - segment->completeWithState(DB::FileSegment::State::DOWNLOADED); - } - } - - size_t size_used_before_temporary_data = file_cache.getUsedCacheSize(); - size_t segments_used_before_temporary_data = file_cache.getFileSegmentsNum(); - ASSERT_GT(size_used_before_temporary_data, 0); - ASSERT_GT(segments_used_before_temporary_data, 0); - - size_t size_used_with_temporary_data; - size_t segments_used_with_temporary_data; - { - auto tmp_data = std::make_unique(tmp_data_scope); - - auto & stream = tmp_data->createStream(generateBlock()); - - ASSERT_GT(stream.write(generateBlock(100)), 0); - - EXPECT_GT(file_cache.getUsedCacheSize(), 0); - EXPECT_GT(file_cache.getFileSegmentsNum(), 0); - - size_t used_size_before_attempt = file_cache.getUsedCacheSize(); - /// data can't be evicted because it is still held by `some_data_holder` - ASSERT_THROW(stream.write(generateBlock(1000)), DB::Exception); - - ASSERT_EQ(file_cache.getUsedCacheSize(), used_size_before_attempt); - - some_data_holder.reset(); - - stream.write(generateBlock(1011)); - - auto stat = stream.finishWriting(); - - EXPECT_EQ(stat.num_rows, 1111); - EXPECT_EQ(readAllTemporaryData(stream), 1111); - - size_used_with_temporary_data = file_cache.getUsedCacheSize(); - segments_used_with_temporary_data = file_cache.getFileSegmentsNum(); - EXPECT_GT(size_used_with_temporary_data, 0); - EXPECT_GT(segments_used_with_temporary_data, 0); - } - - /// All temp data should be evicted after removing temporary files - EXPECT_LE(file_cache.getUsedCacheSize(), size_used_with_temporary_data); - EXPECT_LE(file_cache.getFileSegmentsNum(), segments_used_with_temporary_data); - - /// Some segments reserved by `some_data_holder` was eviced by temporary data - EXPECT_LE(file_cache.getUsedCacheSize(), size_used_before_temporary_data); - EXPECT_LE(file_cache.getFileSegmentsNum(), segments_used_before_temporary_data); -} diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 2b8d9055cff..6a29cdbb5ca 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -192,7 +192,7 @@ private: bool force_ttl{false}; CompressionCodecPtr compression_codec{nullptr}; size_t sum_input_rows_upper_bound{0}; - std::unique_ptr rows_sources_file{nullptr}; + std::unique_ptr rows_sources_file{nullptr}; std::unique_ptr rows_sources_uncompressed_write_buf{nullptr}; std::unique_ptr rows_sources_write_buf{nullptr}; std::optional column_sizes{}; @@ -257,7 +257,7 @@ private: /// Begin dependencies from previous stage std::unique_ptr rows_sources_write_buf{nullptr}; std::unique_ptr rows_sources_uncompressed_write_buf{nullptr}; - std::unique_ptr rows_sources_file; + std::unique_ptr rows_sources_file; std::optional column_sizes; CompressionCodecPtr compression_codec; DiskPtr tmp_disk{nullptr}; diff --git a/src/Storages/System/StorageSystemFilesystemCache.cpp b/src/Storages/System/StorageSystemFilesystemCache.cpp index bec92a60436..cd9324b3253 100644 --- a/src/Storages/System/StorageSystemFilesystemCache.cpp +++ b/src/Storages/System/StorageSystemFilesystemCache.cpp @@ -24,8 +24,7 @@ NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes() {"cache_hits", std::make_shared()}, {"references", std::make_shared()}, {"downloaded_size", std::make_shared()}, - {"persistent", std::make_shared>()}, - {"kind", std::make_shared()}, + {"persistent", std::make_shared>()} }; } @@ -46,11 +45,8 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex for (const auto & file_segment : file_segments) { res_columns[0]->insert(cache_base_path); - - /// Do not use `file_segment->getPathInLocalCache` here because it will lead to nullptr dereference - /// (because file_segments in getSnapshot doesn't have `cache` field set) res_columns[1]->insert( - cache->getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->getKind())); + cache->getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent())); const auto & range = file_segment->range(); res_columns[2]->insert(range.left); @@ -61,7 +57,6 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex res_columns[7]->insert(file_segment->getRefCount()); res_columns[8]->insert(file_segment->getDownloadedSize()); res_columns[9]->insert(file_segment->isPersistent()); - res_columns[10]->insert(toString(file_segment->getKind())); } } } diff --git a/tests/integration/test_temporary_data_in_cache/__init__.py b/tests/integration/test_temporary_data_in_cache/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/integration/test_temporary_data_in_cache/configs/config.d/storage_configuration.xml b/tests/integration/test_temporary_data_in_cache/configs/config.d/storage_configuration.xml deleted file mode 100644 index 8ccd705c6f1..00000000000 --- a/tests/integration/test_temporary_data_in_cache/configs/config.d/storage_configuration.xml +++ /dev/null @@ -1,39 +0,0 @@ - - - - - local - /local_disk/ - - - - cache - local_disk - /tiny_local_cache/ - 10M - 1M - 1 - 0 - - - - - - local - /tiny_local_cache/ - - - - - - -
- tiny_local_cache -
-
-
-
-
- - tiny_local_cache -
diff --git a/tests/integration/test_temporary_data_in_cache/test.py b/tests/integration/test_temporary_data_in_cache/test.py deleted file mode 100644 index ba57348ee37..00000000000 --- a/tests/integration/test_temporary_data_in_cache/test.py +++ /dev/null @@ -1,81 +0,0 @@ -# pylint: disable=unused-argument -# pylint: disable=redefined-outer-name - -import pytest - -from helpers.cluster import ClickHouseCluster -from helpers.client import QueryRuntimeException - -cluster = ClickHouseCluster(__file__) - -node = cluster.add_instance( - "node", - main_configs=["configs/config.d/storage_configuration.xml"], - tmpfs=["/local_disk:size=50M", "/tiny_local_cache:size=12M"], -) - - -@pytest.fixture(scope="module") -def start_cluster(): - try: - cluster.start() - yield cluster - finally: - cluster.shutdown() - - -def test_cache_evicted_by_temporary_data(start_cluster): - q = node.query - qi = lambda query: int(node.query(query).strip()) - - cache_size_initial = qi("SELECT sum(size) FROM system.filesystem_cache") - assert cache_size_initial == 0 - - free_space_initial = qi( - "SELECT free_space FROM system.disks WHERE name = 'tiny_local_cache_local_disk'" - ) - assert free_space_initial > 8 * 1024 * 1024 - - q( - "CREATE TABLE t1 (x UInt64) ENGINE = MergeTree ORDER BY x SETTINGS storage_policy = 'tiny_local_cache'" - ) - q("INSERT INTO t1 SELECT number FROM numbers(1024 * 1024)") - - # To be sure that nothing is reading the cache and entries for t1 can be evited - q("OPTIMIZE TABLE t1 FINAL") - q("SYSTEM STOP MERGES t1") - - # Read some data to fill the cache - q("SELECT sum(x) FROM t1") - - cache_size_with_t1 = qi("SELECT sum(size) FROM system.filesystem_cache") - assert cache_size_with_t1 > 8 * 1024 * 1024 - - # Almost all disk space is occupied by t1 cache - free_space_with_t1 = qi( - "SELECT free_space FROM system.disks WHERE name = 'tiny_local_cache_local_disk'" - ) - assert free_space_with_t1 < 4 * 1024 * 1024 - - # Try to sort the table, but fail because of lack of disk space - with pytest.raises(QueryRuntimeException) as exc: - q( - "SELECT ignore(*) FROM numbers(10 * 1024 * 1024) ORDER BY sipHash64(number)", - settings={ - "max_bytes_before_external_group_by": "4M", - "max_bytes_before_external_sort": "4M", - }, - ) - assert "Cannot reserve space in file cache" in str(exc.value) - - # Some data evicted from cache by temporary data - cache_size_after_eviction = qi("SELECT sum(size) FROM system.filesystem_cache") - assert cache_size_after_eviction < cache_size_with_t1 - - # Disk space freed, at least 3 MB, because temporary data tried to write 4 MB - free_space_after_eviction = qi( - "SELECT free_space FROM system.disks WHERE name = 'tiny_local_cache_local_disk'" - ) - assert free_space_after_eviction > free_space_with_t1 + 3 * 1024 * 1024 - - q("DROP TABLE IF EXISTS t1") diff --git a/tests/integration/test_tmp_policy/test.py b/tests/integration/test_tmp_policy/test.py index 870a70b127a..c919d9a0c3d 100644 --- a/tests/integration/test_tmp_policy/test.py +++ b/tests/integration/test_tmp_policy/test.py @@ -23,7 +23,7 @@ def start_cluster(): cluster.shutdown() -def test_disk_selection(start_cluster): +def test_different_versions(start_cluster): query = "SELECT count(ignore(*)) FROM (SELECT * FROM system.numbers LIMIT 1e7) GROUP BY number" settings = { "max_bytes_before_external_group_by": 1 << 20,