This commit is contained in:
kssenii 2022-06-21 19:30:21 +02:00
parent 90be49faa4
commit 90ce0b44f0
18 changed files with 78 additions and 76 deletions

View File

@ -561,7 +561,9 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
*/
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
/// Resize this file segment by creating a copy file segment with DOWNLOADED state,
/// but current file segment should remain PARTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
/// (this will be crucial for other file segment holders, not for the current one).
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
}

View File

@ -127,7 +127,14 @@ protected:
virtual void reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & /* segment_lock */) = 0;
std::lock_guard<std::mutex> & segment_lock) = 0;
virtual FileSegmentPtr setDownloading(
const Key & key,
size_t offset,
size_t size,
bool is_persistent,
std::lock_guard<std::mutex> & cache_lock) = 0;
void assertInitialized() const;

View File

@ -436,6 +436,13 @@ FileSegmentPtr LRUFileCache::setDownloading(
bool is_persistent,
std::lock_guard<std::mutex> & cache_lock)
{
/**
* Create a file segment of exactly requested size with EMPTY state.
* Throw exception if requested size exceeds max allowed file segment size.
* This method is for protected usage: file segment range writer uses it
* to dynamically allocate file segments.
*/
#ifndef NDEBUG
assertCacheCorrectness(key, cache_lock);
#endif

View File

@ -255,7 +255,19 @@ void DiskRestartProxy::removeRecursive(const String & path)
/// Remove the shared file at `path` by forwarding to DiskDecorator::removeSharedFile().
/// A `ReadLock` constructed on `mutex` is held for the duration of the call.
/// `keep_s3` is passed through unchanged.
void DiskRestartProxy::removeSharedFile(const String & path, bool keep_s3)
{
    ReadLock lock (mutex);
    /// Plain call (no `return` of a void expression), consistent with the other
    /// removeShared* wrappers; the call is issued exactly once.
    DiskDecorator::removeSharedFile(path, keep_s3);
}
/// Batch variant of shared-file removal: constructs a `ReadLock` on `mutex` and hands
/// every argument, unchanged, to DiskDecorator::removeSharedFiles().
void DiskRestartProxy::removeSharedFiles(
    const RemoveBatchRequest & files,
    bool keep_all_batch_data,
    const NameSet & file_names_remove_metadata_only)
{
    ReadLock read_guard(mutex);
    DiskDecorator::removeSharedFiles(files, keep_all_batch_data, file_names_remove_metadata_only);
}
/// Recursive variant of shared-file removal: constructs a `ReadLock` on `mutex` and hands
/// every argument, unchanged, to DiskDecorator::removeSharedRecursive().
void DiskRestartProxy::removeSharedRecursive(
    const String & path,
    bool keep_all_batch_data,
    const NameSet & file_names_remove_metadata_only)
{
    ReadLock read_guard(mutex);
    DiskDecorator::removeSharedRecursive(path, keep_all_batch_data, file_names_remove_metadata_only);
}
void DiskRestartProxy::setLastModified(const String & path, const Poco::Timestamp & timestamp)

View File

@ -55,6 +55,8 @@ public:
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) const override;
void setReadOnly(const String & path) override;

View File

@ -96,20 +96,6 @@ void IDisk::truncateFile(const String &, size_t)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType());
}
/// Walk everything reachable from `path` and invoke `func` on each file path.
/// When isFile(path) holds, `func` is called once with `path` itself.
/// Otherwise every entry produced by iterateDirectory(path) is visited recursively.
void IDisk::iterateRecursively(const String & path, std::function<void(const String & path)> func)
{
    if (!isFile(path))
    {
        /// Not a file: descend into each entry of the directory iterator.
        auto entry = iterateDirectory(path);
        while (entry->isValid())
        {
            iterateRecursively(entry->path(), func);
            entry->next();
        }
        return;
    }

    func(path);
}
SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
{
return nullptr;

View File

@ -142,10 +142,7 @@ public:
/// Return iterator to the contents of the specified directory.
virtual DirectoryIteratorPtr iterateDirectory(const String & path) const = 0;
/// Iterate all disk files by path and apply a callback to each path.
void iterateRecursively(const String & path, std::function<void(const String & path)>);
/// Return `true` if the specified directory is empty.
bool isDirectoryEmpty(const String & path) const;
/// Create empty file at `path`.

View File

@ -58,14 +58,13 @@ CachedReadBufferFromFile::CachedReadBufferFromFile(
, settings(settings_)
, read_until_position(read_until_position_ ? *read_until_position_ : file_size_)
, implementation_buffer_creator(implementation_buffer_creator_)
, is_persistent(settings_.is_file_cache_persistent)
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
, allow_seeks(allow_seeks_)
, use_external_buffer(use_external_buffer_)
, query_context_holder(cache_->getQueryContextHolder(query_id, settings_))
, is_persistent(false) /// Unused for now, see PR 36171
, is_persistent(settings_.is_file_cache_persistent)
{
}
@ -947,7 +946,7 @@ bool CachedReadBufferFromFile::nextImplStep()
{
download_current_segment = false;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
LOG_DEBUG(log, "No space left in cache, will continue without cache download ({})", file_segment->getInfoForLog());
}
}

View File

@ -128,8 +128,6 @@ private:
String nextimpl_step_log_info;
String last_caller_id;
bool is_persistent = false;
String query_id;
bool enable_logging = false;
String current_buffer_id;

View File

@ -80,7 +80,7 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NO
std::optional<size_t> file_size) const
{
auto modified_read_settings = getReadSettingsForCache(read_settings);
auto impl = object_storage->readObjects(common_path_prefix, blobs_to_read, read_settings, read_hint, file_size);
auto impl = object_storage->readObjects(common_path_prefix, blobs_to_read, modified_read_settings, read_hint, file_size);
/// If underlying read buffer does caching on its own, do not wrap it in caching buffer.
if (impl->isIntegratedWithFilesystemCache()

View File

@ -388,7 +388,8 @@ time_t DiskObjectStorage::getLastChanged(const String & path) const
void DiskObjectStorage::removeMetadata(const String & path, std::vector<String> & paths_to_remove)
{
LOG_TEST(log, "Remove file by path: {}", backQuote(metadata_storage->getPath() + path));
String full_path = fs::path(metadata_storage->getPath()) / path;
LOG_TEST(log, "Remove file by path: {}", backQuote(full_path));
if (!metadata_storage->exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path);
@ -403,7 +404,7 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector<String>
bool is_remote = object_storage->isRemote();
if (!is_remote)
object_storage->removeCacheIfExists(path);
object_storage->removeCacheIfExists(full_path);
auto tx = metadata_storage->createTransaction();
tx->unlinkMetadata(path);

View File

@ -1,5 +1,4 @@
#include "MetadataStorageFromLocalDisk.h"
#include <Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h>
#include <Disks/IDisk.h>
@ -18,7 +17,7 @@ MetadataStorageFromLocalDisk::MetadataStorageFromLocalDisk(DiskPtr disk_)
/// Creates a MetadataStorageFromLocalDiskTransaction bound to this storage and returns it
/// as a MetadataTransactionPtr.
/// NOTE(review): two `return` statements appear back to back. Only the first can execute,
/// so the variant that also passes `disk` is unreachable as written. This looks like the
/// before/after pair of a diff — confirm which one is meant to stay.
MetadataTransactionPtr MetadataStorageFromLocalDisk::createTransaction() const
{
return std::make_shared<MetadataStorageFromLocalDiskTransaction>(*this);
return std::make_shared<MetadataStorageFromLocalDiskTransaction>(*this, disk);
}
const std::string & MetadataStorageFromLocalDisk::getPath() const
@ -99,61 +98,61 @@ uint32_t MetadataStorageFromLocalDisk::getHardlinkCount(const std::string & path
return disk->getRefCount(path);
}
/// Writes `data` to the file at `path`.
/// NOTE(review): the signature is listed twice (a three-line form and a one-line form), and
/// the body both queues a WriteFileOperation via addOperation() and writes through
/// `disk->writeFile(path)` directly. These look like before/after pairs from a diff —
/// confirm which signature and which write path are meant to stay.
void MetadataStorageFromLocalDiskTransaction::writeStringToFile( /// NOLINT
const std::string & path,
const std::string & data)
void MetadataStorageFromLocalDiskTransaction::writeStringToFile(const std::string & path, const std::string & data) /// NOLINT
{
addOperation(std::make_unique<WriteFileOperation>(path, *metadata_storage_for_local.getDisk(), data));
/// Direct path: open a write buffer for `path`, write all bytes of `data`, then finalize it.
auto wb = disk->writeFile(path);
wb->write(data.data(), data.size());
wb->finalize();
}
/// Forwards `path` and `timestamp` to the disk's setLastModified().
/// NOTE(review): as listed, the call is both queued via addOperation() (SetLastModifiedOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::setLastModified(const std::string & path, const Poco::Timestamp & timestamp)
{
addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, *metadata_storage_for_local.getDisk()));
disk->setLastModified(path, timestamp);
}
/// Removes the file at `path` (direct form: disk->removeFile(path)).
/// NOTE(review): as listed, the removal is both queued via addOperation() (UnlinkFileOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::unlinkFile(const std::string & path)
{
addOperation(std::make_unique<UnlinkFileOperation>(path, *metadata_storage_for_local.getDisk()));
disk->removeFile(path);
}
/// Forwards `path` to the disk's removeRecursive().
/// NOTE(review): as listed, the removal is both queued via addOperation() (RemoveRecursiveOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::removeRecursive(const std::string & path)
{
addOperation(std::make_unique<RemoveRecursiveOperation>(path, *metadata_storage_for_local.getDisk()));
disk->removeRecursive(path);
}
/// Forwards `path` to the disk's createDirectory().
/// NOTE(review): as listed, the creation is both queued via addOperation() (CreateDirectoryOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::createDirectory(const std::string & path)
{
addOperation(std::make_unique<CreateDirectoryOperation>(path, *metadata_storage_for_local.getDisk()));
disk->createDirectory(path);
}
/// Forwards `path` to the disk's createDirectories().
/// NOTE(review): as listed, the creation is both queued via addOperation()
/// (CreateDirectoryRecursiveOperation) and issued directly on `disk`. The two statements look
/// like a before/after pair from a diff — confirm only one is intended.
/// NOTE(review): the method name is spelled `createDicrectoryRecursive`; a rename would have to be
/// coordinated with the declaration it overrides, which is not visible here.
void MetadataStorageFromLocalDiskTransaction::createDicrectoryRecursive(const std::string & path)
{
addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *metadata_storage_for_local.getDisk()));
disk->createDirectories(path);
}
/// Forwards `path` to the disk's removeDirectory().
/// NOTE(review): as listed, the removal is both queued via addOperation() (RemoveDirectoryOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::removeDirectory(const std::string & path)
{
addOperation(std::make_unique<RemoveDirectoryOperation>(path, *metadata_storage_for_local.getDisk()));
disk->removeDirectory(path);
}
/// Forwards `path_from` and `path_to` to the disk's moveFile().
/// NOTE(review): as listed, the move is both queued via addOperation() (MoveFileOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::moveFile(const std::string & path_from, const std::string & path_to)
{
addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *metadata_storage_for_local.getDisk()));
disk->moveFile(path_from, path_to);
}
/// Forwards `path_from` and `path_to` to the disk's moveDirectory().
/// NOTE(review): as listed, the move is both queued via addOperation() (MoveDirectoryOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::moveDirectory(const std::string & path_from, const std::string & path_to)
{
addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, *metadata_storage_for_local.getDisk()));
disk->moveDirectory(path_from, path_to);
}
/// Forwards `path_from` and `path_to` to the disk's replaceFile().
/// NOTE(review): as listed, the replacement is both queued via addOperation() (ReplaceFileOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::replaceFile(const std::string & path_from, const std::string & path_to)
{
addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, *metadata_storage_for_local.getDisk()));
disk->replaceFile(path_from, path_to);
}
/// Forwards `path` to the disk's setReadOnly().
/// NOTE(review): as listed, the change is both queued via addOperation() (SetReadOnlyOperation)
/// and issued directly on `disk`. The two statements look like a before/after pair from a diff —
/// confirm only one is intended.
void MetadataStorageFromLocalDiskTransaction::setReadOnly(const std::string & path)
{
addOperation(std::make_unique<SetReadOnlyOperation>(path, *metadata_storage_for_local.getDisk()));
disk->setReadOnly(path);
}
void MetadataStorageFromLocalDiskTransaction::createHardLink(const std::string & /* path_from */, const std::string & /* path_to */)

View File

@ -54,11 +54,13 @@ class MetadataStorageFromLocalDiskTransaction final : public MetadataStorageFrom
{
private:
const MetadataStorageFromLocalDisk & metadata_storage_for_local;
DiskPtr disk;
public:
explicit MetadataStorageFromLocalDiskTransaction(const MetadataStorageFromLocalDisk & metadata_storage_)
explicit MetadataStorageFromLocalDiskTransaction(const MetadataStorageFromLocalDisk & metadata_storage_, DiskPtr disk_)
: MetadataStorageFromDiskTransaction(metadata_storage_)
, metadata_storage_for_local(metadata_storage_)
, disk(disk_)
{}
void writeStringToFile(const std::string & path, const std::string & data) override;

View File

@ -322,6 +322,7 @@ BlockIO InterpreterSystemQuery::execute()
{
auto cache = FileCacheFactory::instance().get(query.filesystem_cache_path);
cache->removeIfReleasable(query.drop_persistent_files);
}
break;
}
case Type::RELOAD_DICTIONARY:

View File

@ -198,6 +198,7 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
return nullptr;
auto compression = chooseCompressionMethod(*it, compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings());
const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(std::move(impl), compression, zstd_window_log_max);
};
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
@ -329,6 +330,7 @@ bool HDFSSource::initialize()
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(
uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef(), getContext()->getReadSettings());
const auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
read_buf = wrapReadBufferWithCompressionMethod(std::move(impl), compression, zstd_window_log_max);
auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);

View File

@ -11,9 +11,10 @@ TMP_PATH=${CLICKHOUSE_TEST_UNIQUE_NAME}
QUERIES_FILE=02226_filesystem_cache_profile_events.queries
TEST_FILE=$CUR_DIR/filesystem_cache_queries/$QUERIES_FILE
#for storagePolicy in 's3_cache'; do
echo "Using storage policy: $storagePolicy"
cat $TEST_FILE | sed -e "s/_storagePolicy/s3_cache/" > $TMP_PATH
${CLICKHOUSE_CLIENT} --queries-file $TMP_PATH
rm $TMP_PATH
echo
for storagePolicy in 's3_cache' 'local_cache'; do
echo "Using storage policy: $storagePolicy"
cat $TEST_FILE | sed -e "s/_storagePolicy/${storagePolicy}/" > $TMP_PATH
${CLICKHOUSE_CLIENT} --queries-file $TMP_PATH
rm $TMP_PATH
echo
done

View File

@ -37,11 +37,8 @@ FROM (
system.remote_data_paths
) AS data_paths
INNER JOIN system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
FORMAT Vertical;
Row 1:
──────
count(): 1
ON data_paths.cache_path = caches.cache_path;
1
DROP TABLE test NO DELAY;
SELECT count() FROM system.filesystem_cache;
0
@ -55,11 +52,8 @@ FROM (
system.remote_data_paths
) AS data_paths
INNER JOIN system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
FORMAT Vertical;
Row 1:
──────
count(): 0
ON data_paths.cache_path = caches.cache_path;
0
DROP TABLE IF EXISTS test2;
CREATE TABLE test2 (key UInt32, value String)
Engine=MergeTree()
@ -112,11 +106,8 @@ FROM (
system.remote_data_paths
) AS data_paths
INNER JOIN system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
FORMAT Vertical;
Row 1:
──────
count(): 0
ON data_paths.cache_path = caches.cache_path;
0
DROP TABLE test NO DELAY;
SELECT count() FROM system.filesystem_cache;
0
@ -130,11 +121,8 @@ FROM (
system.remote_data_paths
) AS data_paths
INNER JOIN system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
FORMAT Vertical;
Row 1:
──────
count(): 0
ON data_paths.cache_path = caches.cache_path;
0
DROP TABLE IF EXISTS test2;
CREATE TABLE test2 (key UInt32, value String)
Engine=MergeTree()

View File

@ -39,8 +39,7 @@ FROM (
system.remote_data_paths
) AS data_paths
INNER JOIN system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
FORMAT Vertical;
ON data_paths.cache_path = caches.cache_path;
DROP TABLE test NO DELAY;
SELECT count() FROM system.filesystem_cache;
@ -54,8 +53,7 @@ FROM (
system.remote_data_paths
) AS data_paths
INNER JOIN system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
FORMAT Vertical;
ON data_paths.cache_path = caches.cache_path;
DROP TABLE IF EXISTS test2;