kssenii 2022-03-23 15:35:15 +01:00
parent f3e1ca44a9
commit 34c0690a69
11 changed files with 122 additions and 34 deletions

View File

@@ -121,6 +121,10 @@ public:
String getInfoForLog() const;
size_t hits() const { return hits_num; }
void hit() { ++hits_num; }
private:
size_t availableSize() const { return reserved_size - downloaded_size; }
bool lastFileSegmentHolder() const;
@@ -162,6 +166,7 @@ private:
bool detached = false;
std::atomic<bool> is_downloaded{false};
std::atomic<size_t> hits_num = 0; /// cache hits.
};
struct FileSegmentsHolder : private boost::noncopyable
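The hits counter added above is what feeds the new cache_hits column: the read path increments it for every segment served from the local cache (see the CachedReadBufferFromRemoteFS hunks below), and the system table reads it back through hits(). A standalone illustration of the pattern, not the ClickHouse class itself, using only what this hunk shows:

#include <atomic>
#include <cstddef>

/// Illustration only: an atomic hit counter can be bumped by concurrent cached
/// readers without extra locking and read at any time by the reporting side
/// (the cache_hits column of system.remote_filesystem_cache).
struct HitCounter
{
    std::atomic<size_t> hits_num{0};

    void hit() { ++hits_num; }                 /// called for every cached read
    size_t hits() const { return hits_num; }   /// read when filling the system table
};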

View File

@@ -365,9 +365,14 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
if (current_file_segment_it == file_segments_holder->file_segments.end())
return false;
implementation_buffer = getImplementationBuffer(*current_file_segment_it);
file_segment = *current_file_segment_it;
LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString());
implementation_buffer = getImplementationBuffer(file_segment);
if (read_type == ReadType::CACHED)
file_segment->hit();
LOG_TEST(log, "New segment: {}", file_segment->range().toString());
return true;
}
@@ -583,6 +588,9 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
else
{
implementation_buffer = getImplementationBuffer(*current_file_segment_it);
if (read_type == ReadType::CACHED)
(*current_file_segment_it)->hit();
}
assert(!internal_buffer.empty());

View File

@@ -46,6 +46,8 @@
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <Common/FileCache.h>
#include <Common/hex.h>
namespace DB
{
@@ -291,7 +293,9 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
});
};
bool cache_on_insert = write_settings.remote_fs_cache_on_insert || FileCacheFactory::instance().getSettings(getCachePath()).cache_on_insert;
bool cache_on_insert = fs::path(path).extension() != ".tmp"
&& write_settings.remote_fs_cache_on_insert
&& FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_insert;
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings->client,
@@ -302,7 +306,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size, std::move(schedule), cache_on_insert ? cache : nullptr);
buf_size, std::move(schedule), blob_name, cache_on_insert ? cache : nullptr);
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
{
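In words, the rewritten condition above means: temporary files (*.tmp) are never cached on write, and caching data on INSERT now requires both the query-level remote_fs_cache_on_insert setting and the cache_on_insert flag of the disk's cache configuration (previously either one was enough). A standalone restatement of that predicate, with the two settings reduced to plain booleans; this is a sketch, not a helper from the commit:

#include <filesystem>
#include <string>

/// Sketch of the decision made inline in DiskS3::writeFile: cache freshly written
/// data only for non-temporary files and only when both settings allow it.
static bool shouldCacheOnInsert(const std::string & path, bool query_setting_cache_on_insert, bool disk_config_cache_on_insert)
{
    return std::filesystem::path(path).extension() != ".tmp"  /// skip temporary part files
        && query_setting_cache_on_insert                       /// remote_fs_cache_on_insert (query setting)
        && disk_config_cache_on_insert;                        /// cache_on_insert (disk cache config)
}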

View File

@@ -62,6 +62,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_,
ScheduleFunc schedule_,
const String & blob_name_,
FileCachePtr cache_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, bucket(bucket_)
@@ -73,6 +74,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, upload_part_size_multiply_threshold(upload_part_size_multiply_threshold_)
, max_single_part_upload_size(max_single_part_upload_size_)
, schedule(std::move(schedule_))
, blob_name(blob_name_)
, cache(cache_)
{
allocateBuffer();
@@ -92,10 +94,11 @@ void WriteBufferFromS3::nextImpl()
if (cacheEnabled())
{
auto cache_key = cache->hash(key);
if (blob_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty blob name");
auto cache_key = cache->hash(blob_name);
auto file_segments_holder = cache->setDownloading(cache_key, current_download_offset, size);
assert(file_segments_holder.file_segments.back()->range().right - file_segments_holder.file_segments.front()->range().left + 1 == size);
size_t remaining_size = size;
for (const auto & file_segment : file_segments_holder.file_segments)
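The body of this loop is cut off by the hunk. Conceptually it has to portion the size bytes just flushed from the in-memory buffer over the segments returned by setDownloading(), which, per the assert above, together cover exactly size bytes. A standalone sketch of that size accounting (segment sizes stand in for file_segment->range().size(); the actual copy of the data into the local cache file, done by the real loop, is omitted):

#include <algorithm>
#include <cstddef>
#include <vector>

/// Sketch only: how the freshly written bytes map onto consecutive cache segments.
/// Each segment takes at most its own range size; together they consume all bytes.
static std::vector<size_t> splitAcrossSegments(size_t size, const std::vector<size_t> & segment_sizes)
{
    std::vector<size_t> portions;
    size_t remaining_size = size;
    for (size_t segment_size : segment_sizes)
    {
        size_t bytes_for_segment = std::min(remaining_size, segment_size);
        portions.push_back(bytes_for_segment);
        remaining_size -= bytes_for_segment;
    }
    return portions;
}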

View File

@@ -56,6 +56,7 @@ public:
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ScheduleFunc schedule_ = {},
const String & blob_name = "",
FileCachePtr cache_ = nullptr);
~WriteBufferFromS3() override;
@@ -121,8 +122,8 @@ private:
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
const String blob_name;
FileCachePtr cache;
std::unique_ptr<WriteBufferFromFile> cache_writer;
size_t current_download_offset = 0;
};

View File

@@ -58,10 +58,8 @@ Pipe StorageSystemDisks::read(
String cache_path;
if (disk_ptr->isRemote())
{
const auto * remote_disk = assert_cast<IDiskRemote *>(disk_ptr.get());
cache_path = remote_disk->getCachePath();
}
cache_path = disk_ptr->getCacheBasePath();
col_cache_path->insert(cache_path);
}

View File

@@ -19,6 +19,8 @@ NamesAndTypesList StorageSystemRemoteFilesystemCache::getNamesAndTypes()
{"cache_path", std::make_shared<DataTypeString>()},
{"file_segment_range", std::make_shared<DataTypeTuple>(DataTypes{std::make_shared<DataTypeUInt64>(), std::make_shared<DataTypeUInt64>()})},
{"size", std::make_shared<DataTypeUInt64>()},
{"state", std::make_shared<DataTypeString>()},
{"cache_hits", std::make_shared<DataTypeUInt64>()},
};
}
@@ -44,6 +46,8 @@ void StorageSystemRemoteFilesystemCache::fillData(MutableColumns & res_columns,
const auto & range = file_segment->range();
res_columns[2]->insert(Tuple({range.left, range.right}));
res_columns[3]->insert(range.size());
res_columns[4]->insert(FileSegment::stateToString(file_segment->state()));
res_columns[5]->insert(file_segment->hits());
}
}
}

View File

@@ -6,6 +6,22 @@
namespace DB
{
/**
* SELECT
* cache_path,
* local_path,
* remote_path
* FROM
* (
* SELECT
* arrayJoin(cache_paths) AS cache_path,
* local_path,
* remote_path
* FROM system.remote_data_paths
* ) AS data_paths
* INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path
*/
class StorageSystemRemoteFilesystemCache final : public shared_ptr_helper<StorageSystemRemoteFilesystemCache>,
public IStorageSystemOneBlock<StorageSystemRemoteFilesystemCache>
{
@@ -16,7 +32,7 @@ public:
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemRemoteFilesystemCache(const StorageID & table_id_);
explicit StorageSystemRemoteFilesystemCache(const StorageID & table_id_);
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};

View File

@@ -69,6 +69,7 @@
#include <Storages/System/StorageSystemPrivileges.h>
#include <Storages/System/StorageSystemAsynchronousInserts.h>
#include <Storages/System/StorageSystemRemoteFilesystemCache.h>
#include <Storages/System/StorageSystemRemoteDataPaths.h>
#ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h>
@@ -161,6 +162,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attach<StorageSystemPartMovesBetweenShards>(context, system_database, "part_moves_between_shards");
attach<StorageSystemAsynchronousInserts>(context, system_database, "asynchronous_inserts");
attach<StorageSystemRemoteFilesystemCache>(context, system_database, "remote_filesystem_cache");
attach<StorageSystemRemoteDataPaths>(context, system_database, "remote_data_paths");
if (has_zookeeper)
attach<StorageSystemZooKeeper>(context, system_database, "zookeeper");

View File

@@ -1,18 +1,45 @@
-- { echo }
SET remote_fs_cache_on_insert=1;
DROP TABLE IF EXISTS test;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=1;
SELECT count() FROM system.remote_filesystem_cache;
8
SELECT * FROM test FORMAT Null;
SELECT count() size FROM system.remote_filesystem_cache;
9
SYSTEM DROP REMOTE FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 100);
SELECT file_segment_range, size, state FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path FORMAT Vertical;
SELECT file_segment_range, size, state FROM system.remote_filesystem_cache format Vertical;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT file_segment_range, size, state FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path FORMAT Vertical;
Row 1:
──────
file_segment_range: (0,745)
size: 746
state: DOWNLOADED
SELECT file_segment_range, size, state FROM system.remote_filesystem_cache format Vertical;
Row 1:
──────
file_segment_range: (0,745)
size: 746
state: DOWNLOADED
SELECT cache_hits FROM system.remote_filesystem_cache;
0
SELECT * FROM test FORMAT Null;
SELECT cache_hits FROM system.remote_filesystem_cache;
1
SELECT * FROM test FORMAT Null;
SELECT cache_hits FROM system.remote_filesystem_cache;
2
SELECT count() size FROM system.remote_filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0;
SELECT count() size FROM system.remote_filesystem_cache;
14
1
SYSTEM DROP REMOTE FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 200);
SELECT file_segment_range, size, state FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path ORDER BY size FORMAT Vertical;
Row 1:
──────
file_segment_range: (0,1659)
size: 1660
state: DOWNLOADED
SELECT file_segment_range, size, state FROM system.remote_filesystem_cache format Vertical;
Row 1:
──────
file_segment_range: (0,1659)
size: 1660
state: DOWNLOADED

View File

@@ -2,16 +2,36 @@
-- { echo }
DROP TABLE IF EXISTS test;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=1;
SET remote_fs_cache_on_insert=1;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
SELECT count() FROM system.remote_filesystem_cache;
SELECT * FROM test FORMAT Null;
SELECT count() size FROM system.remote_filesystem_cache;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 100);
SELECT count() size FROM system.remote_filesystem_cache;
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0; -- still writes to the cache because the config setting is now used
SELECT file_segment_range, size, state FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path FORMAT Vertical;
SELECT file_segment_range, size, state FROM system.remote_filesystem_cache format Vertical;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT file_segment_range, size, state FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path FORMAT Vertical;
SELECT file_segment_range, size, state FROM system.remote_filesystem_cache format Vertical;
SELECT cache_hits FROM system.remote_filesystem_cache;
SELECT * FROM test FORMAT Null;
SELECT cache_hits FROM system.remote_filesystem_cache;
SELECT * FROM test FORMAT Null;
SELECT cache_hits FROM system.remote_filesystem_cache;
SELECT count() size FROM system.remote_filesystem_cache;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 200);
SELECT file_segment_range, size, state FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path ORDER BY size FORMAT Vertical;
SELECT file_segment_range, size, state FROM system.remote_filesystem_cache format Vertical;
-- INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0; -- still writes to the cache because the config setting is now used