Merge branch 'master' into pod-array-assign-empty-array-fix

commit 5dd4f4b10f
contrib/arrow (vendored submodule)
@@ -1 +1 @@
-Subproject commit efdcd015cfdee1b6aa349c9ca227ca12c3d697f5
+Subproject commit 450a5638704386356f8e520080468fc9bc8bcaf8
@@ -58,6 +58,7 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
 --tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
 --mysql_port 19004 --postgresql_port 19005 \
 --keeper_server.tcp_port 19181 --keeper_server.server_id 2 \
+--prometheus.port 19988 \
 --macros.replica r2 # It doesn't work :(

 mkdir -p /var/run/clickhouse-server2
@@ -69,6 +70,7 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
 --tcp_port 29000 --tcp_port_secure 29440 --http_port 28123 --https_port 28443 --interserver_http_port 29009 --tcp_with_proxy_port 29010 \
 --mysql_port 29004 --postgresql_port 29005 \
 --keeper_server.tcp_port 29181 --keeper_server.server_id 3 \
+--prometheus.port 29988 \
 --macros.shard s2 # It doesn't work :(

 MAX_RUN_TIME=$((MAX_RUN_TIME < 9000 ? MAX_RUN_TIME : 9000)) # min(MAX_RUN_TIME, 2.5 hours)
@@ -86,7 +86,7 @@ def process_test_log(log_path):
 test_end = True

 test_results = [
-    (test[0], test[1], test[2], "".join(test[3]))[:4096] for test in test_results
+    (test[0], test[1], test[2], "".join(test[3])[:4096]) for test in test_results
 ]

 return (
@@ -55,9 +55,9 @@ Differs from ‘intDiv’ in that it returns zero when dividing by zero or when

 ## modulo(a, b), a % b operator

-Calculates the remainder after division.
-If arguments are floating-point numbers, they are pre-converted to integers by dropping the decimal portion.
-The remainder is taken in the same sense as in C++. Truncated division is used for negative numbers.
+Calculates the remainder when dividing `a` by `b`.
+The result type is an integer if both inputs are integers. If one of the inputs is a floating-point number, the result is a floating-point number.
+The remainder is computed like in C++. Truncated division is used for negative numbers.
 An exception is thrown when dividing by zero or when dividing a minimal negative number by minus one.

 ## moduloOrZero(a, b)
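The "like in C++" wording above refers to truncated division, where the remainder keeps the sign of the dividend. A standalone snippet showing ordinary C++ semantics (illustration only, not part of this commit):

#include <cstdio>

int main()
{
    // C++ integer division truncates toward zero, so the remainder
    // keeps the sign of the dividend; this is the convention the
    // documentation above describes for modulo(a, b).
    std::printf("%d %d\n", 7 / 2, 7 % 2);     // 3 1
    std::printf("%d %d\n", -7 / 2, -7 % 2);   // -3 -1
    // Division by zero (or INT_MIN / -1) is undefined behavior in plain C++;
    // the ClickHouse function throws an exception in those cases instead.
    return 0;
}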
@@ -1241,7 +1241,7 @@ Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort), except that

 ## toLowCardinality

-Converts input parameter to the [LowCardianlity](../../sql-reference/data-types/lowcardinality.md) version of same data type.
+Converts input parameter to the [LowCardinality](../../sql-reference/data-types/lowcardinality.md) version of same data type.

 To convert data from the `LowCardinality` data type use the [CAST](#type_conversion_function-cast) function. For example, `CAST(x as String)`.
@@ -56,7 +56,7 @@ SELECT toTypeName(0), toTypeName(0 + 0), toTypeName(0 + 0 + 0), toTypeName(0 + 0
 ## modulo(a, b), a % b operator {#modulo}

 Calculates the remainder of division.
-If the arguments are floating-point numbers, they are first converted to integers by dropping the fractional part.
+The result type is an integer if both arguments are integers. If one of the arguments is a floating-point number, the result is a floating-point number.
 The remainder is taken in the same sense as in C++. In practice, truncated division is used for negative numbers.
 An exception is thrown when dividing by zero or when dividing a minimal negative number by minus one.
@@ -1162,7 +1162,7 @@ FORMAT PrettyCompactMonoBlock;

 ## toLowCardinality {#tolowcardinality}

-Converts the input data to the [LowCardianlity](../data-types/lowcardinality.md) version of the same data type.
+Converts the input data to the [LowCardinality](../data-types/lowcardinality.md) version of the same data type.

 To convert data from the `LowCardinality` type, use the [CAST](#type_conversion_function-cast) function. For example, `CAST(x as String)`.
@@ -54,7 +54,7 @@ SELECT toTypeName(0), toTypeName(0 + 0), toTypeName(0 + 0 + 0), toTypeName(0 + 0
 ## modulo(a, b), a % b operator {#modulo}

 Calculates the remainder after division.
-If the arguments are floating-point numbers, they are pre-converted to integers by dropping the fractional part.
+If both inputs are integers, the result type is an integer. If one of the inputs is a floating-point number, the result is a floating-point number.
 The remainder has the same meaning as in C++. Truncated division is used for negative numbers.
 An exception is thrown when dividing by zero or when dividing the minimal negative number by -1.
@@ -512,7 +512,7 @@ SELECT parseDateTimeBestEffort('10 20:19')

 ## toLowCardinality {#tolowcardinality}

-Converts the input value to the [LowCardianlity](../data-types/lowcardinality.md) version of the same data type.
+Converts the input value to the [LowCardinality](../data-types/lowcardinality.md) version of the same data type.

 To convert data of the `LowCardinality` type to another type, use the [CAST](#type_conversion_function-cast) function. For example: `CAST(x as String)`.
@@ -1,8 +1,7 @@
-#include "LRUFileCache.h"
+#include "FileCache.h"

#include <Common/randomSeed.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Common/FileCacheSettings.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@@ -11,6 +10,7 @@
#include <IO/Operators.h>
#include <pcg-random/pcg_random.hpp>
#include <filesystem>
+#include <Common/LRUFileCachePriority.h>

namespace fs = std::filesystem;

@@ -22,16 +22,188 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

-LRUFileCache::LRUFileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_)
-    : IFileCache(cache_base_path_, cache_settings_)
+FileCache::FileCache(
+    const String & cache_base_path_,
+    const FileCacheSettings & cache_settings_)
+    : cache_base_path(cache_base_path_)
+    , max_size(cache_settings_.max_size)
+    , max_element_size(cache_settings_.max_elements)
+    , max_file_segment_size(cache_settings_.max_file_segment_size)
+    , enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
+    , main_priority(std::make_unique<LRUFileCachePriority>())
+    , stash_priority(std::make_unique<LRUFileCachePriority>())
+    , max_stash_element_size(cache_settings_.max_elements)
+    , enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold)
-    , log(&Poco::Logger::get("LRUFileCache"))
+    , log(&Poco::Logger::get("FileCache"))
+    , allow_to_remove_persistent_segments_from_cache_by_default(cache_settings_.allow_to_remove_persistent_segments_from_cache_by_default)
{
}

-void LRUFileCache::initialize()
+FileCache::Key FileCache::hash(const String & path)
{
    return Key(sipHash128(path.data(), path.size()));
}

String FileCache::getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const
{
    auto key_str = key.toString();
    return fs::path(cache_base_path)
        / key_str.substr(0, 3)
        / key_str
        / (std::to_string(offset) + (is_persistent ? "_persistent" : ""));
}

String FileCache::getPathInLocalCache(const Key & key) const
{
    auto key_str = key.toString();
    return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
}

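For orientation, the two getPathInLocalCache() overloads above lay segments out as `<cache_base_path>/<first 3 hex chars of key>/<key>/<offset>[_persistent]`. A self-contained sketch of the same layout (the cache root and key value are hypothetical, not taken from this commit):

#include <filesystem>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

int main()
{
    // Hypothetical values mirroring FileCache::getPathInLocalCache() above.
    std::string cache_base_path = "/var/lib/clickhouse/filesystem_cache";
    std::string key_str = "a1b2c3d4e5f60708a1b2c3d4e5f60708";   // lowercase hex of the key
    size_t offset = 1048576;
    bool is_persistent = false;

    fs::path segment_path = fs::path(cache_base_path)
        / key_str.substr(0, 3)                 // fan-out directory by key prefix
        / key_str
        / (std::to_string(offset) + (is_persistent ? "_persistent" : ""));

    std::cout << segment_path.string() << '\n';
    // /var/lib/clickhouse/filesystem_cache/a1b/a1b2c3d4e5f60708a1b2c3d4e5f60708/1048576
}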
static bool isQueryInitialized()
{
    return CurrentThread::isInitialized()
        && CurrentThread::get().getQueryContext()
        && !CurrentThread::getQueryId().empty();
}

bool FileCache::isReadOnly()
{
    return !isQueryInitialized();
}

void FileCache::assertInitialized() const
{
    if (!is_initialized)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
}

FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock)
{
    if (!isQueryInitialized())
        return nullptr;

    return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock);
}

FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> & /* cache_lock */)
{
    auto query_iter = query_map.find(query_id);
    return (query_iter == query_map.end()) ? nullptr : query_iter->second;
}

void FileCache::removeQueryContext(const String & query_id)
{
    std::lock_guard cache_lock(mutex);
    auto query_iter = query_map.find(query_id);

    if (query_iter == query_map.end())
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Attempt to release query context that does not exist (query_id: {})",
            query_id);
    }

    query_map.erase(query_iter);
}

FileCache::QueryContextPtr FileCache::getOrSetQueryContext(
    const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> & cache_lock)
{
    if (query_id.empty())
        return nullptr;

    auto context = getQueryContext(query_id, cache_lock);
    if (context)
        return context;

    auto query_context = std::make_shared<QueryContext>(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache);
    auto query_iter = query_map.emplace(query_id, query_context).first;
    return query_iter->second;
}

FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings)
{
    std::lock_guard cache_lock(mutex);

    if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0)
        return {};

    /// If enable_filesystem_query_cache_limit is true and max_query_cache_size is larger than zero,
    /// we create a query context for the current query.
    auto context = getOrSetQueryContext(query_id, settings, cache_lock);
    return QueryContextHolder(query_id, this, context);
}

void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
    if (cache_size < size)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size");

    if (!skip_download_if_exceeds_query_cache)
    {
        auto record = records.find({key, offset});
        if (record != records.end())
        {
            record->second->removeAndGetNext(cache_lock);
            records.erase({key, offset});
        }
    }
    cache_size -= size;
}

void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
    if (cache_size + size > max_cache_size)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Reserved cache size exceeds the remaining cache size (key: {}, offset: {})",
            key.toString(), offset);
    }

    if (!skip_download_if_exceeds_query_cache)
    {
        auto record = records.find({key, offset});
        if (record == records.end())
        {
            auto queue_iter = priority->add(key, offset, 0, cache_lock);
            record = records.insert({{key, offset}, queue_iter}).first;
        }
        record->second->incrementSize(size, cache_lock);
    }
    cache_size += size;
}

void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
{
    if (skip_download_if_exceeds_query_cache)
        return;

    auto record = records.find({key, offset});
    if (record != records.end())
        record->second->use(cache_lock);
}

FileCache::QueryContextHolder::QueryContextHolder(
    const String & query_id_,
    FileCache * cache_,
    FileCache::QueryContextPtr context_)
    : query_id(query_id_)
    , cache(cache_)
    , context(context_)
{
}

FileCache::QueryContextHolder::~QueryContextHolder()
{
    /// If only the query_map and the current holder hold the context_query,
    /// the query has been completed and the query_context is released.
    if (context && context.use_count() == 2)
        cache->removeQueryContext(query_id);
}

void FileCache::initialize()
{
    std::lock_guard cache_lock(mutex);
    if (!is_initialized)
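The QueryContextHolder destructor above releases a per-query context once only the registry map and the holder itself still reference it (use_count() == 2). A minimal self-contained sketch of that ownership pattern, with made-up names (Registry/Ctx) rather than the ClickHouse classes, which are additionally noncopyable:

#include <cassert>
#include <map>
#include <memory>
#include <string>

struct Ctx { size_t cache_size = 0; };

struct Registry
{
    std::map<std::string, std::shared_ptr<Ctx>> by_query;

    struct Holder
    {
        Registry * owner = nullptr;
        std::string query_id;
        std::shared_ptr<Ctx> ctx;

        ~Holder()
        {
            // Two remaining owners means: the registry map plus this holder,
            // i.e. the query is done and the entry can be dropped.
            if (ctx && ctx.use_count() == 2)
                owner->by_query.erase(query_id);
        }
    };

    Holder acquire(const std::string & query_id)
    {
        auto & ctx = by_query[query_id];   // create the entry on first use
        if (!ctx)
            ctx = std::make_shared<Ctx>();
        return Holder{this, query_id, ctx};
    }
};

int main()
{
    Registry registry;
    {
        auto holder = registry.acquire("query-1");
        assert(registry.by_query.size() == 1);
    }   // holder destroyed -> entry erased
    assert(registry.by_query.empty());
}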
@@ -55,8 +227,8 @@ void LRUFileCache::initialize()
    }
}

-void LRUFileCache::useCell(
-    const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock)
+void FileCache::useCell(
+    const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock) const
{
    auto file_segment = cell.file_segment;

@@ -75,11 +247,11 @@ void LRUFileCache::useCell(
    if (cell.queue_iterator)
    {
        /// Move to the end of the queue. The iterator remains valid.
-        queue.moveToEnd(*cell.queue_iterator, cache_lock);
+        cell.queue_iterator->use(cache_lock);
    }
}

-LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
+FileCache::FileSegmentCell * FileCache::getCell(
    const Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */)
{
    auto it = files.find(key);
@@ -94,7 +266,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
    return &cell_it->second;
}

-FileSegments LRUFileCache::getImpl(
+FileSegments FileCache::getImpl(
    const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock)
{
    /// Given range = [left, right] and non-overlapping ordered set of file segments,
@@ -112,7 +284,7 @@ FileSegments LRUFileCache::getImpl(
    files.erase(key);

    /// Note: it is guaranteed that there is no concurrency with files deletion,
-    /// because cache files are deleted only inside IFileCache and under cache lock.
+    /// because cache files are deleted only inside FileCache and under cache lock.
    if (fs::exists(key_path))
        fs::remove_all(key_path);

@@ -150,7 +322,6 @@ FileSegments LRUFileCache::getImpl(
    /// [___________
    /// ^
    /// range.left

    useCell(prev_cell, result, cache_lock);
    }
}
@@ -175,7 +346,7 @@ FileSegments LRUFileCache::getImpl(
    return result;
}

-FileSegments LRUFileCache::splitRangeIntoCells(
+FileSegments FileCache::splitRangeIntoCells(
    const Key & key, size_t offset, size_t size, FileSegment::State state, bool is_persistent, std::lock_guard<std::mutex> & cache_lock)
{
    assert(size > 0);
@@ -204,7 +375,7 @@ FileSegments LRUFileCache::splitRangeIntoCells(
    return file_segments;
}

-void LRUFileCache::fillHolesWithEmptyFileSegments(
+void FileCache::fillHolesWithEmptyFileSegments(
    FileSegments & file_segments,
    const Key & key,
    const FileSegment::Range & range,
@@ -298,7 +469,7 @@ void LRUFileCache::fillHolesWithEmptyFileSegments(
    }
}

-FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent)
+FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent)
{
    assertInitialized();

@@ -326,7 +497,7 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
    return FileSegmentsHolder(std::move(file_segments));
}

-FileSegmentsHolder LRUFileCache::get(const Key & key, size_t offset, size_t size)
+FileSegmentsHolder FileCache::get(const Key & key, size_t offset, size_t size)
{
    assertInitialized();

@@ -358,7 +529,7 @@ FileSegmentsHolder LRUFileCache::get(const Key & key, size_t offset, size_t size
    return FileSegmentsHolder(std::move(file_segments));
}

-LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
+FileCache::FileSegmentCell * FileCache::addCell(
    const Key & key, size_t offset, size_t size,
    FileSegment::State state, bool is_persistent,
    std::lock_guard<std::mutex> & cache_lock)
@@ -379,30 +550,27 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
    FileSegment::State result_state = state;
    if (state == FileSegment::State::EMPTY && enable_cache_hits_threshold)
    {
-        auto record = records.find({key, offset});
+        auto record = stash_records.find({key, offset});

-        if (record == records.end())
+        if (record == stash_records.end())
        {
-            auto queue_iter = stash_queue.add(key, offset, 0, cache_lock);
-            records.insert({{key, offset}, queue_iter});
+            auto priority_iter = stash_priority->add(key, offset, 0, cache_lock);
+            stash_records.insert({{key, offset}, priority_iter});

-            if (stash_queue.getElementsNum(cache_lock) > max_stash_element_size)
+            if (stash_priority->getElementsNum(cache_lock) > max_stash_element_size)
            {
-                auto remove_queue_iter = stash_queue.begin();
-                records.erase({remove_queue_iter->key, remove_queue_iter->offset});
-                stash_queue.remove(remove_queue_iter, cache_lock);
+                auto remove_priority_iter = stash_priority->getLowestPriorityWriteIterator(cache_lock);
+                stash_records.erase({remove_priority_iter->key(), remove_priority_iter->offset()});
+                remove_priority_iter->removeAndGetNext(cache_lock);
            }

            /// For segments that do not reach the download threshold, we do not download them, but directly read them
            result_state = FileSegment::State::SKIP_CACHE;
        }
        else
        {
-            auto queue_iter = record->second;
-            queue_iter->hits++;
-            stash_queue.moveToEnd(queue_iter, cache_lock);
-            result_state = queue_iter->hits >= enable_cache_hits_threshold ? FileSegment::State::EMPTY : FileSegment::State::SKIP_CACHE;
+            auto priority_iter = record->second;
+            priority_iter->use(cache_lock);
+            result_state = priority_iter->hits() >= enable_cache_hits_threshold ? FileSegment::State::EMPTY : FileSegment::State::SKIP_CACHE;
        }
    }

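Throughout this hunk the direct LRUQueue calls of the old code (add, moveToEnd, getElementsNum, begin/remove) are replaced by the IFileCachePriority interface (add, use, getElementsNum, getLowestPriorityWriteIterator, removeAndGetNext). A rough self-contained sketch of an LRU policy with that shape of API, purely illustrative and much simpler than the real LRUFileCachePriority:

#include <cstdint>
#include <iostream>
#include <list>
#include <string>

// Each entry tracks a (key, offset) cache cell, its size and a hit counter,
// kept in least-recently-used order: front = lowest priority, back = hottest.
struct LRUPolicy
{
    struct Entry { std::string key; uint64_t offset; uint64_t size; uint64_t hits = 0; };
    std::list<Entry> lru;
    uint64_t total_size = 0;

    using Iterator = std::list<Entry>::iterator;

    Iterator add(std::string key, uint64_t offset, uint64_t size)
    {
        total_size += size;
        return lru.insert(lru.end(), Entry{std::move(key), offset, size});
    }

    void use(Iterator it)                 // move to the hot end, count the hit
    {
        ++it->hits;
        lru.splice(lru.end(), lru, it);
    }

    Iterator removeLowestPriority()       // analogous to removeAndGetNext()
    {
        total_size -= lru.front().size;
        return lru.erase(lru.begin());
    }

    size_t getElementsNum() const { return lru.size(); }
};

int main()
{
    LRUPolicy policy;
    auto a = policy.add("key_a", 0, 100);
    policy.add("key_b", 0, 200);
    policy.use(a);                         // key_a becomes most recently used
    policy.removeLowestPriority();         // evicts key_b
    std::cout << policy.getElementsNum() << ' ' << policy.total_size << '\n';   // 1 100
}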
@@ -429,7 +597,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
    return &(it->second);
}

-FileSegmentsHolder LRUFileCache::setDownloading(
+FileSegmentsHolder FileCache::setDownloading(
    const Key & key,
    size_t offset,
    size_t size,
@@ -452,7 +620,7 @@ FileSegmentsHolder LRUFileCache::setDownloading(
    return FileSegmentsHolder(std::move(file_segments));
}

-bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
+bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
    auto query_context = enable_filesystem_query_cache_limit ? getCurrentQueryContext(cache_lock) : nullptr;
    if (!query_context)
@@ -473,40 +641,52 @@ bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::
    else
    {
        size_t removed_size = 0;
-        size_t queue_size = queue.getElementsNum(cache_lock);
+        size_t queue_size = main_priority->getElementsNum(cache_lock);

        auto * cell_for_reserve = getCell(key, offset, cache_lock);

-        std::vector<IFileCache::LRUQueue::Iterator> ghost;
+        struct Segment
+        {
+            Key key;
+            size_t offset;
+            size_t size;
+
+            Segment(Key key_, size_t offset_, size_t size_)
+                : key(key_), offset(offset_), size(size_) {}
+        };
+
+        std::vector<Segment> ghost;
        std::vector<FileSegmentCell *> trash;
        std::vector<FileSegmentCell *> to_evict;

        auto is_overflow = [&]
        {
-            return (max_size != 0 && queue.getTotalCacheSize(cache_lock) + size - removed_size > max_size)
+            return (max_size != 0 && main_priority->getCacheSize(cache_lock) + size - removed_size > max_size)
                || (max_element_size != 0 && queue_size > max_element_size)
                || (query_context->getCacheSize() + size - removed_size > query_context->getMaxCacheSize());
        };

        /// Select the cache from the LRU queue held by query for expulsion.
-        for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++)
+        for (auto iter = query_context->getPriority()->getLowestPriorityWriteIterator(cache_lock); iter->valid();)
        {
            if (!is_overflow())
                break;

-            auto * cell = getCell(iter->key, iter->offset, cache_lock);
+            auto * cell = getCell(iter->key(), iter->offset(), cache_lock);

            if (!cell)
            {
                /// The cache corresponding to this record may be swapped out by
                /// other queries, so it has become invalid.
-                ghost.push_back(iter);
-                removed_size += iter->size;
+                removed_size += iter->size();
+                ghost.push_back(Segment(iter->key(), iter->offset(), iter->size()));
+                /// next()
+                iter->removeAndGetNext(cache_lock);
            }
            else
            {
                size_t cell_size = cell->size();
-                assert(iter->size == cell_size);
+                assert(iter->size() == cell_size);

                if (cell->releasable())
                {
@@ -529,6 +709,8 @@ bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::
                    removed_size += cell_size;
                    --queue_size;
                }

+                iter->next();
            }
        }

@@ -547,8 +729,8 @@ bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::
            remove_file_segment(file_segment, cell->size());
        }

-        for (auto & iter : ghost)
-            query_context->remove(iter->key, iter->offset, iter->size, cache_lock);
+        for (auto & entry : ghost)
+            query_context->remove(entry.key, entry.offset, entry.size, cache_lock);

        if (is_overflow())
            return false;
@@ -557,9 +739,9 @@ bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::
        {
            auto queue_iterator = cell_for_reserve->queue_iterator;
            if (queue_iterator)
-                queue.incrementSize(*queue_iterator, size, cache_lock);
+                queue_iterator->incrementSize(size, cache_lock);
            else
-                cell_for_reserve->queue_iterator = queue.add(key, offset, size, cache_lock);
+                cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, cache_lock);
        }

        for (auto & cell : to_evict)
@@ -573,11 +755,11 @@ bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::
    }
}

-bool LRUFileCache::tryReserveForMainList(
+bool FileCache::tryReserveForMainList(
    const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock)
{
    auto removed_size = 0;
-    size_t queue_size = queue.getElementsNum(cache_lock);
+    size_t queue_size = main_priority->getElementsNum(cache_lock);
    assert(queue_size <= max_element_size);

    /// Since space reservation is incremental, cache cell already exists if it's state is EMPTY.
@@ -592,15 +774,18 @@ bool LRUFileCache::tryReserveForMainList(
    auto is_overflow = [&]
    {
        /// max_size == 0 means unlimited cache size, max_element_size means unlimited number of cache elements.
-        return (max_size != 0 && queue.getTotalCacheSize(cache_lock) + size - removed_size > max_size)
+        return (max_size != 0 && main_priority->getCacheSize(cache_lock) + size - removed_size > max_size)
            || (max_element_size != 0 && queue_size > max_element_size);
    };

    std::vector<FileSegmentCell *> to_evict;
    std::vector<FileSegmentCell *> trash;

-    for (const auto & [entry_key, entry_offset, entry_size, _] : queue)
+    for (auto it = main_priority->getLowestPriorityReadIterator(cache_lock); it->valid(); it->next())
    {
+        const auto & entry_key = it->key();
+        auto entry_offset = it->offset();
+
        if (!is_overflow())
            break;

@@ -612,7 +797,7 @@ bool LRUFileCache::tryReserveForMainList(
            key.toString(), offset);

        size_t cell_size = cell->size();
-        assert(entry_size == cell_size);
+        assert(it->size() == cell_size);

        /// It is guaranteed that cell is not removed from cache as long as
        /// pointer to corresponding file segment is hold by any other thread.
@@ -671,9 +856,9 @@ bool LRUFileCache::tryReserveForMainList(
    /// If queue iterator already exists, we need to update the size after each space reservation.
    auto queue_iterator = cell_for_reserve->queue_iterator;
    if (queue_iterator)
-        queue.incrementSize(*queue_iterator, size, cache_lock);
+        queue_iterator->incrementSize(size, cache_lock);
    else
-        cell_for_reserve->queue_iterator = queue.add(key, offset, size, cache_lock);
+        cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, cache_lock);
    }

    for (auto & cell : to_evict)
@@ -682,7 +867,7 @@ bool LRUFileCache::tryReserveForMainList(
        remove_file_segment(file_segment);
    }

-    if (queue.getTotalCacheSize(cache_lock) > (1ull << 63))
+    if (main_priority->getCacheSize(cache_lock) > (1ull << 63))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");

    if (query_context)
@@ -691,7 +876,7 @@ bool LRUFileCache::tryReserveForMainList(
    return true;
}

-void LRUFileCache::removeIfExists(const Key & key)
+void FileCache::removeIfExists(const Key & key)
{
    assertInitialized();

@@ -742,7 +927,7 @@ void LRUFileCache::removeIfExists(const Key & key)
    }
}

-void LRUFileCache::removeIfReleasable(bool remove_persistent_files)
+void FileCache::removeIfReleasable(bool remove_persistent_files)
{
    /// Try remove all cached files by cache_base_path.
    /// Only releasable file segments are evicted.
@@ -751,10 +936,12 @@ void LRUFileCache::removeIfReleasable(bool remove_persistent_files)

    std::lock_guard cache_lock(mutex);

-    std::vector<FileSegment *> to_remove;
-    for (auto it = queue.begin(); it != queue.end();)
+    std::vector<FileSegmentPtr> to_remove;
+    for (auto it = main_priority->getLowestPriorityReadIterator(cache_lock); it->valid(); it->next())
    {
-        const auto & [key, offset, size, _] = *it++;
+        const auto & key = it->key();
+        auto offset = it->offset();

        auto * cell = getCell(key, offset, cache_lock);
        if (!cell)
            throw Exception(
@@ -769,23 +956,28 @@ void LRUFileCache::removeIfReleasable(bool remove_persistent_files)
            || remove_persistent_files
            || allow_to_remove_persistent_segments_from_cache_by_default))
        {
-            std::lock_guard segment_lock(file_segment->mutex);
-            file_segment->detach(cache_lock, segment_lock);
-            remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
+            to_remove.emplace_back(file_segment);
        }
        }
    }

+    for (auto & file_segment : to_remove)
+    {
+        std::lock_guard segment_lock(file_segment->mutex);
+        file_segment->detach(cache_lock, segment_lock);
+        remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
+    }

    /// Remove all access information.
-    records.clear();
-    stash_queue.removeAll(cache_lock);
+    stash_records.clear();
+    stash_priority->removeAll(cache_lock);

#ifndef NDEBUG
    assertCacheCorrectness(cache_lock);
#endif
}

-void LRUFileCache::remove(
+void FileCache::remove(
    Key key, size_t offset,
    std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
{
@@ -799,7 +991,7 @@ void LRUFileCache::remove(

    if (cell->queue_iterator)
    {
-        queue.remove(*cell->queue_iterator, cache_lock);
+        cell->queue_iterator->removeAndGetNext(cache_lock);
    }

    auto & offsets = files[key];
@@ -831,15 +1023,14 @@ void LRUFileCache::remove(
    }
}

-void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock)
+void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock)
{
    Key key;
    UInt64 offset = 0;
    size_t size = 0;
-    std::vector<std::pair<LRUQueue::Iterator, std::weak_ptr<FileSegment>>> queue_entries;
+    std::vector<std::pair<IFileCachePriority::WriteIterator, std::weak_ptr<FileSegment>>> queue_entries;

    /// cache_base_path / key_prefix / key / offset

    if (!files.empty())
        throw Exception(
            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
@@ -888,7 +1079,7 @@ void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_l
        {
            auto * cell = addCell(key, offset, size, FileSegment::State::DOWNLOADED, is_persistent, cache_lock);
            if (cell)
-                queue_entries.emplace_back(*cell->queue_iterator, cell->file_segment);
+                queue_entries.emplace_back(cell->queue_iterator, cell->file_segment);
        }
        else
        {
@@ -912,14 +1103,14 @@ void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_l
        if (file_segment.expired())
            continue;

-        queue.moveToEnd(it, cache_lock);
+        it->use(cache_lock);
    }
#ifndef NDEBUG
    assertCacheCorrectness(cache_lock);
#endif
}

-void LRUFileCache::reduceSizeToDownloaded(
+void FileCache::reduceSizeToDownloaded(
    const Key & key, size_t offset,
    std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
{
@@ -952,7 +1143,7 @@ void LRUFileCache::reduceSizeToDownloaded(
    cell->file_segment = std::make_shared<FileSegment>(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED);
}

-bool LRUFileCache::isLastFileSegmentHolder(
+bool FileCache::isLastFileSegmentHolder(
    const Key & key, size_t offset,
    std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
{
@@ -965,7 +1156,7 @@ bool LRUFileCache::isLastFileSegmentHolder(
    return cell->file_segment.use_count() == 2;
}

-FileSegments LRUFileCache::getSnapshot() const
+FileSegments FileCache::getSnapshot() const
{
    std::lock_guard cache_lock(mutex);

@@ -979,7 +1170,7 @@ FileSegments LRUFileCache::getSnapshot() const
    return file_segments;
}

-std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)
+std::vector<String> FileCache::tryGetCachePaths(const Key & key)
{
    std::lock_guard cache_lock(mutex);

@@ -996,42 +1187,42 @@ std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)
    return cache_paths;
}

-size_t LRUFileCache::getUsedCacheSize() const
+size_t FileCache::getUsedCacheSize() const
{
    std::lock_guard cache_lock(mutex);
    return getUsedCacheSizeUnlocked(cache_lock);
}

-size_t LRUFileCache::getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
+size_t FileCache::getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
-    return queue.getTotalCacheSize(cache_lock);
+    return main_priority->getCacheSize(cache_lock);
}

-size_t LRUFileCache::getAvailableCacheSize() const
+size_t FileCache::getAvailableCacheSize() const
{
    std::lock_guard cache_lock(mutex);
    return getAvailableCacheSizeUnlocked(cache_lock);
}

-size_t LRUFileCache::getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
+size_t FileCache::getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
    return max_size - getUsedCacheSizeUnlocked(cache_lock);
}

-size_t LRUFileCache::getFileSegmentsNum() const
+size_t FileCache::getFileSegmentsNum() const
{
    std::lock_guard cache_lock(mutex);
    return getFileSegmentsNumUnlocked(cache_lock);
}

-size_t LRUFileCache::getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const
+size_t FileCache::getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
-    return queue.getElementsNum(cache_lock);
+    return main_priority->getElementsNum(cache_lock);
}

-LRUFileCache::FileSegmentCell::FileSegmentCell(
+FileCache::FileSegmentCell::FileSegmentCell(
    FileSegmentPtr file_segment_,
-    LRUFileCache * cache,
+    FileCache * cache,
    std::lock_guard<std::mutex> & cache_lock)
    : file_segment(file_segment_)
{
@@ -1045,7 +1236,7 @@ LRUFileCache::FileSegmentCell::FileSegmentCell(
    {
        case FileSegment::State::DOWNLOADED:
        {
-            queue_iterator = cache->queue.add(file_segment->key(), file_segment->offset(), file_segment->range().size(), cache_lock);
+            queue_iterator = cache->main_priority->add(file_segment->key(), file_segment->offset(), file_segment->range().size(), cache_lock);
            break;
        }
        case FileSegment::State::SKIP_CACHE:
@@ -1061,79 +1252,13 @@ LRUFileCache::FileSegmentCell::FileSegmentCell(
    }
}

-IFileCache::LRUQueue::Iterator IFileCache::LRUQueue::add(
-    const IFileCache::Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & /* cache_lock */)
-{
-#ifndef NDEBUG
-    for (const auto & [entry_key, entry_offset, entry_size, entry_hits] : queue)
-    {
-        if (entry_key == key && entry_offset == offset)
-            throw Exception(
-                ErrorCodes::LOGICAL_ERROR,
-                "Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})",
-                key.toString(), offset, size);
-    }
-#endif
-
-    cache_size += size;
-    return queue.insert(queue.end(), FileKeyAndOffset(key, offset, size));
-}
-
-void IFileCache::LRUQueue::remove(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
-{
-    cache_size -= queue_it->size;
-    queue.erase(queue_it);
-}
-
-void IFileCache::LRUQueue::removeAll(std::lock_guard<std::mutex> & /* cache_lock */)
-{
-    queue.clear();
-    cache_size = 0;
-}
-
-void IFileCache::LRUQueue::moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
-{
-    queue.splice(queue.end(), queue, queue_it);
-}
-
-void IFileCache::LRUQueue::incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & /* cache_lock */)
-{
-    cache_size += size_increment;
-    queue_it->size += size_increment;
-}
-
-bool IFileCache::LRUQueue::contains(
-    const IFileCache::Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */) const
-{
-    /// This method is used for assertions in debug mode.
-    /// So we do not care about complexity here.
-    for (const auto & [entry_key, entry_offset, size, _] : queue)
-    {
-        if (key == entry_key && offset == entry_offset)
-            return true;
-    }
-    return false;
-}
-
-String IFileCache::LRUQueue::toString(std::lock_guard<std::mutex> & /* cache_lock */) const
-{
-    String result;
-    for (const auto & [key, offset, size, _] : queue)
-    {
-        if (!result.empty())
-            result += ", ";
-        result += fmt::format("{}: [{}, {}]", key.toString(), offset, offset + size - 1);
-    }
-    return result;
-}
-
-String LRUFileCache::dumpStructure(const Key & key)
+String FileCache::dumpStructure(const Key & key)
{
    std::lock_guard cache_lock(mutex);
    return dumpStructureUnlocked(key, cache_lock);
}

-String LRUFileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std::mutex> & cache_lock)
+String FileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std::mutex> &)
{
    WriteBufferFromOwnString result;
    const auto & cells_by_offset = files[key];
@@ -1141,11 +1266,10 @@ String LRUFileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std:
    for (const auto & [offset, cell] : cells_by_offset)
        result << cell.file_segment->getInfoForLog() << "\n";

-    result << "\n\nQueue: " << queue.toString(cache_lock);
    return result.str();
}

-void LRUFileCache::assertCacheCellsCorrectness(
+void FileCache::assertCacheCellsCorrectness(
    const FileSegmentsByOffset & cells_by_offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    for (const auto & [_, cell] : cells_by_offset)
@@ -1156,30 +1280,32 @@ void LRUFileCache::assertCacheCellsCorrectness(
        if (file_segment->reserved_size != 0)
        {
            assert(cell.queue_iterator);
-            assert(queue.contains(file_segment->key(), file_segment->offset(), cache_lock));
+            assert(main_priority->contains(file_segment->key(), file_segment->offset(), cache_lock));
        }
    }
}

-void LRUFileCache::assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock)
+void FileCache::assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock)
{
    assertCacheCellsCorrectness(files[key], cache_lock);
-    assertQueueCorrectness(cache_lock);
+    assertPriorityCorrectness(cache_lock);
}

-void LRUFileCache::assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock)
+void FileCache::assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock)
{
    for (const auto & [key, cells_by_offset] : files)
        assertCacheCellsCorrectness(files[key], cache_lock);
-    assertQueueCorrectness(cache_lock);
+    assertPriorityCorrectness(cache_lock);
}

-void LRUFileCache::assertQueueCorrectness(std::lock_guard<std::mutex> & cache_lock)
+void FileCache::assertPriorityCorrectness(std::lock_guard<std::mutex> & cache_lock)
{
    [[maybe_unused]] size_t total_size = 0;
-    for (auto it = queue.begin(); it != queue.end();)
+    for (auto it = main_priority->getLowestPriorityReadIterator(cache_lock); it->valid(); it->next())
    {
-        auto & [key, offset, size, _] = *it++;
+        const auto & key = it->key();
+        auto offset = it->offset();
+        auto size = it->size();

        auto * cell = getCell(key, offset, cache_lock);
        if (!cell)
@@ -1188,14 +1314,12 @@ void LRUFileCache::assertQueueCorrectness(std::lock_guard<std::mutex> & cache_lo
                ErrorCodes::LOGICAL_ERROR,
                "Cache is in inconsistent state: LRU queue contains entries with no cache cell (assertCorrectness())");
        }

        assert(cell->size() == size);
        total_size += size;
    }

-    assert(total_size == queue.getTotalCacheSize(cache_lock));
-    assert(queue.getTotalCacheSize(cache_lock) <= max_size);
-    assert(queue.getElementsNum(cache_lock) <= max_element_size);
+    assert(total_size == main_priority->getCacheSize(cache_lock));
+    assert(main_priority->getCacheSize(cache_lock) <= max_size);
+    assert(main_priority->getElementsNum(cache_lock) <= max_element_size);
}

}
src/Common/FileCache.h (new file, 289 lines)
@@ -0,0 +1,289 @@
#pragma once

#include <atomic>
#include <chrono>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <boost/functional/hash.hpp>
#include <boost/noncopyable.hpp>

#include <Core/Types.h>
#include <IO/ReadSettings.h>
#include <Common/FileCache_fwd.h>
#include <Common/FileSegment.h>
#include <Common/IFileCachePriority.h>
#include <Common/logger_useful.h>
#include <Common/FileCacheType.h>

namespace DB
{

/// Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
/// Different caching algorithms are implemented based on IFileCachePriority.
class FileCache : private boost::noncopyable
{
    friend class FileSegment;
    friend class IFileCachePriority;
    friend struct FileSegmentsHolder;
    friend class FileSegmentRangeWriter;

public:
    using Key = DB::FileCacheKey;

    FileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_);

    ~FileCache() = default;

    /// Restore cache from local filesystem.
    void initialize();

    void removeIfExists(const Key & key);

    void removeIfReleasable(bool remove_persistent_files);

    static bool isReadOnly();

    /// Cache capacity in bytes.
    size_t capacity() const { return max_size; }

    static Key hash(const String & path);

    String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const;

    String getPathInLocalCache(const Key & key) const;

    const String & getBasePath() const { return cache_base_path; }

    std::vector<String> tryGetCachePaths(const Key & key);

    /**
     * Given an `offset` and `size` representing [offset, offset + size) bytes interval,
     * return list of cached non-overlapping non-empty
     * file segments `[segment1, ..., segmentN]` which intersect with given interval.
     *
     * Segments in returned list are ordered in ascending order and represent a full contiguous
     * interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
     *
     * As long as pointers to returned file segments are hold
     * it is guaranteed that these file segments are not removed from cache.
     */
    FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent);

    /**
     * Segments in returned list are ordered in ascending order and represent a full contiguous
     * interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
     *
     * If file segment has state EMPTY, then it is also marked as "detached". E.g. it is "detached"
     * from cache (not owned by cache), and as a result will never change it's state and will be destructed
     * with the destruction of the holder, while in getOrSet() EMPTY file segments can eventually change
     * it's state (and become DOWNLOADED).
     */
    FileSegmentsHolder get(const Key & key, size_t offset, size_t size);

    FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent);

    FileSegments getSnapshot() const;

    /// For debug.
    String dumpStructure(const Key & key);

    size_t getUsedCacheSize() const;

    size_t getFileSegmentsNum() const;

private:
    String cache_base_path;
    size_t max_size;
    size_t max_element_size;
    size_t max_file_segment_size;

    bool is_initialized = false;

    mutable std::mutex mutex;

    bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);

    void remove(Key key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);

    bool isLastFileSegmentHolder(
        const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);

    void reduceSizeToDownloaded(
        const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */);

    void assertInitialized() const;

    using AccessKeyAndOffset = std::pair<Key, size_t>;
    struct KeyAndOffsetHash
    {
        std::size_t operator()(const AccessKeyAndOffset & key) const
        {
            return std::hash<UInt128>()(key.first.key) ^ std::hash<UInt64>()(key.second);
        }
    };

    using FileCacheRecords = std::unordered_map<AccessKeyAndOffset, IFileCachePriority::WriteIterator, KeyAndOffsetHash>;

    /// Used to track and control the cache access of each query.
    /// Through it, we can realize the processing of different queries by the cache layer.
    struct QueryContext
    {
        FileCacheRecords records;
        FileCachePriorityPtr priority;

        size_t cache_size = 0;
        size_t max_cache_size;

        bool skip_download_if_exceeds_query_cache;

        QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_)
            : max_cache_size(max_cache_size_), skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_)
        {
        }

        void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);

        void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);

        void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);

        size_t getMaxCacheSize() const { return max_cache_size; }

        size_t getCacheSize() const { return cache_size; }

        FileCachePriorityPtr getPriority() { return priority; }

        bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
    };

    using QueryContextPtr = std::shared_ptr<QueryContext>;
    using QueryContextMap = std::unordered_map<String, QueryContextPtr>;

    QueryContextMap query_map;

    bool enable_filesystem_query_cache_limit;

    QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock);

    QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock);

    void removeQueryContext(const String & query_id);

    QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> &);

public:
    /// Save a query context information, and adopt different cache policies
    /// for different queries through the context cache layer.
    struct QueryContextHolder : private boost::noncopyable
    {
        QueryContextHolder(const String & query_id_, FileCache * cache_, QueryContextPtr context_);

        QueryContextHolder() = default;

        ~QueryContextHolder();

        String query_id;
        FileCache * cache = nullptr;
        QueryContextPtr context;
    };

    QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings);

private:
    struct FileSegmentCell : private boost::noncopyable
    {
        FileSegmentPtr file_segment;

        /// Iterator is put here on first reservation attempt, if successful.
        IFileCachePriority::WriteIterator queue_iterator;

        /// Pointer to file segment is always hold by the cache itself.
        /// Apart from pointer in cache, it can be hold by cache users, when they call
        /// getOrSet(), but cache users always hold it via FileSegmentsHolder.
        bool releasable() const { return file_segment.unique(); }

        size_t size() const { return file_segment->reserved_size; }

        FileSegmentCell(FileSegmentPtr file_segment_, FileCache * cache, std::lock_guard<std::mutex> & cache_lock);

        FileSegmentCell(FileSegmentCell && other) noexcept
            : file_segment(std::move(other.file_segment)), queue_iterator(other.queue_iterator)
        {
        }
    };

    using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
    using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;

    CachedFiles files;
    std::unique_ptr<IFileCachePriority> main_priority;

    FileCacheRecords stash_records;
    std::unique_ptr<IFileCachePriority> stash_priority;

    size_t max_stash_element_size;
    size_t enable_cache_hits_threshold;

    Poco::Logger * log;
    bool allow_to_remove_persistent_segments_from_cache_by_default;

    FileSegments getImpl(const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock);

    FileSegmentCell * getCell(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);

    FileSegmentCell * addCell(
        const Key & key,
        size_t offset,
        size_t size,
        FileSegment::State state,
        bool is_persistent,
        std::lock_guard<std::mutex> & cache_lock);

    void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock) const;

    bool tryReserveForMainList(
        const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock);

    size_t getAvailableCacheSize() const;

    void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);

    FileSegments splitRangeIntoCells(
        const Key & key,
        size_t offset,
        size_t size,
        FileSegment::State state,
        bool is_persistent,
        std::lock_guard<std::mutex> & cache_lock);

    String dumpStructureUnlocked(const Key & key_, std::lock_guard<std::mutex> & cache_lock);

    void fillHolesWithEmptyFileSegments(
        FileSegments & file_segments,
        const Key & key,
        const FileSegment::Range & range,
        bool fill_with_detached_file_segments,
        bool is_persistent,
        std::lock_guard<std::mutex> & cache_lock);

    size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

    size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

    size_t getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

    void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);

public:
    void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);

    void assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock);

    void assertPriorityCorrectness(std::lock_guard<std::mutex> & cache_lock);
};

}
@@ -1,5 +1,5 @@
#include "FileCacheFactory.h"
-#include "LRUFileCache.h"
+#include "FileCache.h"

namespace DB
{
@@ -53,7 +53,7 @@ FileCachePtr FileCacheFactory::getOrCreate(
        return it->second->cache;
    }

-    auto cache = std::make_shared<LRUFileCache>(cache_base_path, file_cache_settings);
+    auto cache = std::make_shared<FileCache>(cache_base_path, file_cache_settings);
    FileCacheData result{cache, file_cache_settings};

    auto cache_it = caches.insert(caches.end(), std::move(result));

src/Common/FileCacheType.h (new file, 31 lines)

@@ -0,0 +1,31 @@
#pragma once
#include <Core/Types.h>
#include <Common/hex.h>

namespace DB
{

struct FileCacheKey
{
    UInt128 key;

    String toString() const { return getHexUIntLowercase(key); }

    FileCacheKey() = default;

    explicit FileCacheKey(const UInt128 & key_) : key(key_) { }

    bool operator==(const FileCacheKey & other) const { return key == other.key; }
};

}

namespace std
{
template <>
struct hash<DB::FileCacheKey>
{
    std::size_t operator()(const DB::FileCacheKey & k) const { return hash<UInt128>()(k.key); }
};

}
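FileCacheType.h above specializes std::hash for FileCacheKey so the key can index unordered containers (e.g. the CachedFiles map declared in FileCache.h). A self-contained analogue with a plain uint64_t payload instead of ClickHouse's UInt128, for illustration only:

#include <cstdint>
#include <iostream>
#include <unordered_map>

struct DemoKey
{
    uint64_t key = 0;    // stand-in for ClickHouse's UInt128 path hash
    bool operator==(const DemoKey & other) const { return key == other.key; }
};

namespace std
{
template <>
struct hash<DemoKey>
{
    size_t operator()(const DemoKey & k) const { return hash<uint64_t>()(k.key); }
};
}

int main()
{
    // With operator== and the std::hash specialization in place, the key can be
    // used directly in unordered_map, analogous to FileCacheKey in CachedFiles.
    std::unordered_map<DemoKey, int> cells_by_key;
    cells_by_key[DemoKey{42}] = 1;
    std::cout << cells_by_key.count(DemoKey{42}) << '\n';   // 1
}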
@@ -9,8 +9,8 @@ static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100
static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024;
static constexpr int REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD = 0;

-class IFileCache;
-using FileCachePtr = std::shared_ptr<IFileCache>;
+class FileCache;
+using FileCachePtr = std::shared_ptr<FileCache>;

struct FileCacheSettings;

@@ -5,7 +5,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <filesystem>

#include <Common/FileCache.h>

namespace CurrentMetrics
{
@@ -25,7 +25,7 @@ FileSegment::FileSegment(
    size_t offset_,
    size_t size_,
    const Key & key_,
-    IFileCache * cache_,
+    FileCache * cache_,
    State download_state_,
    bool is_persistent_)
    : segment_range(offset_, offset_ + size_ - 1)
@@ -787,7 +787,7 @@ FileSegmentsHolder::~FileSegmentsHolder()
    /// FileSegmentsHolder right after calling file_segment->complete(), so on destruction here
    /// remain only uncompleted file segments.

-    IFileCache * cache = nullptr;
+    FileCache * cache = nullptr;

    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
    {

@@ -1,11 +1,11 @@
#pragma once

#include <boost/noncopyable.hpp>
-#include <Common/IFileCache.h>
#include <Core/Types.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <list>
+#include <Common/FileCacheType.h>

namespace Poco { class Logger; }

@@ -17,7 +17,7 @@ extern const Metric CacheFileSegments;
namespace DB
{

-class IFileCache;
+class FileCache;

class FileSegment;
using FileSegmentPtr = std::shared_ptr<FileSegment>;
@@ -27,12 +27,12 @@ using FileSegments = std::list<FileSegmentPtr>;
class FileSegment : boost::noncopyable
{

-friend class LRUFileCache;
+friend class FileCache;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;

public:
-    using Key = IFileCache::Key;
+    using Key = FileCacheKey;
    using RemoteFileReaderPtr = std::shared_ptr<ReadBufferFromFileBase>;
    using LocalCacheWriterPtr = std::unique_ptr<WriteBufferFromFile>;

@@ -74,7 +74,7 @@ public:
    size_t offset_,
    size_t size_,
    const Key & key_,
-    IFileCache * cache_,
+    FileCache * cache_,
    State download_state_,
    bool is_persistent_ = false);

@@ -234,7 +234,7 @@ private:
    mutable std::mutex download_mutex;

    Key file_key;
-    IFileCache * cache;
+    FileCache * cache;

    Poco::Logger * log;

@@ -1,201 +0,0 @@
#include "IFileCache.h"

#include <Common/hex.h>
#include <Common/CurrentThread.h>
#include <Common/SipHash.h>
#include <Common/FileCacheSettings.h>
#include <IO/ReadSettings.h>
#include <filesystem>

namespace fs = std::filesystem;

namespace DB
{

namespace ErrorCodes
{
extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
extern const int LOGICAL_ERROR;
}

IFileCache::IFileCache(
    const String & cache_base_path_,
    const FileCacheSettings & cache_settings_)
    : cache_base_path(cache_base_path_)
    , max_size(cache_settings_.max_size)
    , max_element_size(cache_settings_.max_elements)
    , max_file_segment_size(cache_settings_.max_file_segment_size)
    , enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
{
}

String IFileCache::Key::toString() const
{
    return getHexUIntLowercase(key);
}

IFileCache::Key IFileCache::hash(const String & path)
{
    return Key(sipHash128(path.data(), path.size()));
}

String IFileCache::getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const
{
    auto key_str = key.toString();
    return fs::path(cache_base_path)
        / key_str.substr(0, 3)
        / key_str
        / (std::to_string(offset) + (is_persistent ? "_persistent" : ""));
}

String IFileCache::getPathInLocalCache(const Key & key) const
{
    auto key_str = key.toString();
    return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
}

static bool isQueryInitialized()
{
    return CurrentThread::isInitialized()
        && CurrentThread::get().getQueryContext()
        && !CurrentThread::getQueryId().empty();
}

bool IFileCache::isReadOnly()
{
    return !isQueryInitialized();
}

void IFileCache::assertInitialized() const
{
    if (!is_initialized)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
}

IFileCache::QueryContextPtr IFileCache::getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock)
{
    if (!isQueryInitialized())
        return nullptr;

    return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock);
}

IFileCache::QueryContextPtr IFileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> & /* cache_lock */)
{
    auto query_iter = query_map.find(query_id);
    return (query_iter == query_map.end()) ? nullptr : query_iter->second;
}

void IFileCache::removeQueryContext(const String & query_id)
{
    std::lock_guard cache_lock(mutex);
    auto query_iter = query_map.find(query_id);

    if (query_iter == query_map.end())
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Attempt to release query context that does not exist (query_id: {})",
            query_id);
    }

    query_map.erase(query_iter);
}

IFileCache::QueryContextPtr IFileCache::getOrSetQueryContext(
    const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> & cache_lock)
{
    if (query_id.empty())
        return nullptr;

    auto context = getQueryContext(query_id, cache_lock);
    if (context)
        return context;

    auto query_context = std::make_shared<QueryContext>(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache);
    auto query_iter = query_map.emplace(query_id, query_context).first;
    return query_iter->second;
}

IFileCache::QueryContextHolder IFileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings)
{
    std::lock_guard cache_lock(mutex);
|
||||
|
||||
if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0)
|
||||
return {};
|
||||
|
||||
    /// If enable_filesystem_query_cache_limit is true and max_query_cache_size is greater than zero,
|
||||
    /// we create a query context for the current query.
|
||||
auto context = getOrSetQueryContext(query_id, settings, cache_lock);
|
||||
return QueryContextHolder(query_id, this, context);
|
||||
}
|
||||
|
||||
void IFileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
if (cache_size < size)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size");
|
||||
|
||||
if (!skip_download_if_exceeds_query_cache)
|
||||
{
|
||||
auto record = records.find({key, offset});
|
||||
if (record != records.end())
|
||||
{
|
||||
lru_queue.remove(record->second, cache_lock);
|
||||
records.erase({key, offset});
|
||||
}
|
||||
}
|
||||
cache_size -= size;
|
||||
}
|
||||
|
||||
void IFileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
if (cache_size + size > max_cache_size)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Reserved cache size exceeds the remaining cache size (key: {}, offset: {})",
|
||||
key.toString(), offset);
|
||||
}
|
||||
|
||||
if (!skip_download_if_exceeds_query_cache)
|
||||
{
|
||||
auto record = records.find({key, offset});
|
||||
if (record == records.end())
|
||||
{
|
||||
auto queue_iter = lru_queue.add(key, offset, 0, cache_lock);
|
||||
record = records.insert({{key, offset}, queue_iter}).first;
|
||||
}
|
||||
record->second->size += size;
|
||||
}
|
||||
cache_size += size;
|
||||
}
|
||||
|
||||
void IFileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
if (skip_download_if_exceeds_query_cache)
|
||||
return;
|
||||
|
||||
auto record = records.find({key, offset});
|
||||
if (record != records.end())
|
||||
lru_queue.moveToEnd(record->second, cache_lock);
|
||||
}
|
||||
|
||||
IFileCache::QueryContextHolder::QueryContextHolder(
|
||||
const String & query_id_,
|
||||
IFileCache * cache_,
|
||||
IFileCache::QueryContextPtr context_)
|
||||
: query_id(query_id_)
|
||||
, cache(cache_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
IFileCache::QueryContextHolder::~QueryContextHolder()
|
||||
{
|
||||
    /// If only the query_map and the current holder hold the query_context,
|
||||
    /// the query has been completed and the query_context can be released.
|
||||
if (context && context.use_count() == 2)
|
||||
cache->removeQueryContext(query_id);
|
||||
}
|
||||
|
||||
}
|
@ -1,267 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Types.h>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <list>
|
||||
#include <unordered_map>
|
||||
#include <functional>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class FileSegment;
|
||||
using FileSegmentPtr = std::shared_ptr<FileSegment>;
|
||||
using FileSegments = std::list<FileSegmentPtr>;
|
||||
struct FileSegmentsHolder;
|
||||
struct ReadSettings;
|
||||
|
||||
/**
|
||||
* Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
|
||||
*/
|
||||
class IFileCache : private boost::noncopyable
|
||||
{
|
||||
friend class FileSegment;
|
||||
friend struct FileSegmentsHolder;
|
||||
friend class FileSegmentRangeWriter;
|
||||
|
||||
public:
|
||||
struct Key
|
||||
{
|
||||
UInt128 key;
|
||||
String toString() const;
|
||||
|
||||
Key() = default;
|
||||
explicit Key(const UInt128 & key_) : key(key_) {}
|
||||
|
||||
bool operator==(const Key & other) const { return key == other.key; }
|
||||
};
|
||||
|
||||
IFileCache(
|
||||
const String & cache_base_path_,
|
||||
const FileCacheSettings & cache_settings_);
|
||||
|
||||
virtual ~IFileCache() = default;
|
||||
|
||||
/// Restore cache from local filesystem.
|
||||
virtual void initialize() = 0;
|
||||
|
||||
virtual void removeIfExists(const Key & key) = 0;
|
||||
|
||||
virtual void removeIfReleasable(bool remove_persistent_files) = 0;
|
||||
|
||||
static bool isReadOnly();
|
||||
|
||||
/// Cache capacity in bytes.
|
||||
size_t capacity() const { return max_size; }
|
||||
|
||||
static Key hash(const String & path);
|
||||
|
||||
String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const;
|
||||
|
||||
String getPathInLocalCache(const Key & key) const;
|
||||
|
||||
const String & getBasePath() const { return cache_base_path; }
|
||||
|
||||
virtual std::vector<String> tryGetCachePaths(const Key & key) = 0;
|
||||
|
||||
/**
|
||||
* Given an `offset` and `size` representing [offset, offset + size) bytes interval,
|
||||
* return list of cached non-overlapping non-empty
|
||||
* file segments `[segment1, ..., segmentN]` which intersect with given interval.
|
||||
*
|
||||
* Segments in returned list are ordered in ascending order and represent a full contiguous
|
||||
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
|
||||
*
|
||||
 * As long as pointers to the returned file segments are held,
|
||||
 * it is guaranteed that these file segments are not removed from the cache.
|
||||
*/
|
||||
virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent) = 0;
|
||||
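For illustration only, here is a minimal sketch of how a caller might consume the segments returned by getOrSet(). The interface removed here is kept by the new FileCache class (see the renamed usages further down in this diff); the function name readThroughCache and the omitted download step are hypothetical, not part of this change.

    #include <Common/FileCache.h>
    #include <Common/FileSegment.h>

    /// Hypothetical caller: make sure the byte range [offset, offset + size) of a remote file
    /// is represented by cache segments before reading it.
    void readThroughCache(DB::FileCache & cache, const std::string & remote_path, size_t offset, size_t size)
    {
        auto key = cache.hash(remote_path);
        auto holder = cache.getOrSet(key, offset, size, /* is_persistent */ false);

        for (const auto & segment : holder.file_segments)
        {
            /// Segments are ordered and cover [offset, offset + size) without holes.
            /// A DOWNLOADED segment can be read from the local cache file; EMPTY or
            /// DOWNLOADING segments still require the remote data (download step omitted here).
            (void)segment;
        }
    }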
|
||||
/**
|
||||
* Segments in returned list are ordered in ascending order and represent a full contiguous
|
||||
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
|
||||
*
|
||||
 * If a file segment has state EMPTY, it is also marked as "detached", i.e. it is detached
|
||||
 * from the cache (not owned by the cache). As a result it will never change its state and will be destructed
|
||||
 * along with the holder, while in getOrSet() EMPTY file segments can eventually change
|
||||
 * their state (and become DOWNLOADED).
|
||||
*/
|
||||
virtual FileSegmentsHolder get(const Key & key, size_t offset, size_t size) = 0;
|
||||
|
||||
virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent) = 0;
|
||||
|
||||
virtual FileSegments getSnapshot() const = 0;
|
||||
|
||||
/// For debug.
|
||||
virtual String dumpStructure(const Key & key) = 0;
|
||||
|
||||
virtual size_t getUsedCacheSize() const = 0;
|
||||
|
||||
virtual size_t getFileSegmentsNum() const = 0;
|
||||
|
||||
protected:
|
||||
String cache_base_path;
|
||||
size_t max_size;
|
||||
size_t max_element_size;
|
||||
size_t max_file_segment_size;
|
||||
|
||||
bool is_initialized = false;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
|
||||
virtual bool tryReserve(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
|
||||
virtual void remove(
|
||||
Key key, size_t offset,
|
||||
std::lock_guard<std::mutex> & cache_lock,
|
||||
std::lock_guard<std::mutex> & segment_lock) = 0;
|
||||
|
||||
virtual bool isLastFileSegmentHolder(
|
||||
const Key & key, size_t offset,
|
||||
std::lock_guard<std::mutex> & cache_lock,
|
||||
std::lock_guard<std::mutex> & segment_lock) = 0;
|
||||
|
||||
virtual void reduceSizeToDownloaded(
|
||||
const Key & key, size_t offset,
|
||||
std::lock_guard<std::mutex> & cache_lock,
|
||||
std::lock_guard<std::mutex> & /* segment_lock */) = 0;
|
||||
|
||||
void assertInitialized() const;
|
||||
|
||||
class LRUQueue
|
||||
{
|
||||
public:
|
||||
struct FileKeyAndOffset
|
||||
{
|
||||
Key key;
|
||||
size_t offset;
|
||||
size_t size;
|
||||
size_t hits = 0;
|
||||
|
||||
FileKeyAndOffset(const Key & key_, size_t offset_, size_t size_) : key(key_), offset(offset_), size(size_) {}
|
||||
};
|
||||
|
||||
using Iterator = typename std::list<FileKeyAndOffset>::iterator;
|
||||
|
||||
size_t getTotalCacheSize(std::lock_guard<std::mutex> & /* cache_lock */) const { return cache_size; }
|
||||
|
||||
size_t getElementsNum(std::lock_guard<std::mutex> & /* cache_lock */) const { return queue.size(); }
|
||||
|
||||
Iterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void remove(Iterator queue_it, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
/// Space reservation for a file segment is incremental, so we need to be able to increment size of the queue entry.
|
||||
void incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
String toString(std::lock_guard<std::mutex> & cache_lock) const;
|
||||
|
||||
bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock) const;
|
||||
|
||||
Iterator begin() { return queue.begin(); }
|
||||
|
||||
Iterator end() { return queue.end(); }
|
||||
|
||||
void removeAll(std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
private:
|
||||
std::list<FileKeyAndOffset> queue;
|
||||
size_t cache_size = 0;
|
||||
};
|
||||
|
||||
using AccessKeyAndOffset = std::pair<Key, size_t>;
|
||||
struct KeyAndOffsetHash
|
||||
{
|
||||
std::size_t operator()(const AccessKeyAndOffset & key) const
|
||||
{
|
||||
return std::hash<UInt128>()(key.first.key) ^ std::hash<UInt64>()(key.second);
|
||||
}
|
||||
};
|
||||
|
||||
using AccessRecord = std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;
|
||||
|
||||
    /// Used to track and control cache usage for each query.
|
||||
    /// Through it, the cache layer can handle different queries with different policies.
|
||||
struct QueryContext
|
||||
{
|
||||
LRUQueue lru_queue;
|
||||
AccessRecord records;
|
||||
|
||||
size_t cache_size = 0;
|
||||
size_t max_cache_size;
|
||||
|
||||
bool skip_download_if_exceeds_query_cache;
|
||||
|
||||
QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_)
|
||||
: max_cache_size(max_cache_size_)
|
||||
, skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) {}
|
||||
|
||||
void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
size_t getMaxCacheSize() const { return max_cache_size; }
|
||||
|
||||
size_t getCacheSize() const { return cache_size; }
|
||||
|
||||
LRUQueue & queue() { return lru_queue; }
|
||||
|
||||
bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
|
||||
};
|
||||
|
||||
using QueryContextPtr = std::shared_ptr<QueryContext>;
|
||||
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
|
||||
|
||||
QueryContextMap query_map;
|
||||
|
||||
bool enable_filesystem_query_cache_limit;
|
||||
|
||||
QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void removeQueryContext(const String & query_id);
|
||||
|
||||
QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> &);
|
||||
|
||||
public:
|
||||
    /// Holds query context information so that the cache layer
|
||||
    /// can apply different cache policies to different queries.
|
||||
struct QueryContextHolder : private boost::noncopyable
|
||||
{
|
||||
QueryContextHolder(const String & query_id_, IFileCache * cache_, QueryContextPtr context_);
|
||||
|
||||
QueryContextHolder() = default;
|
||||
|
||||
~QueryContextHolder();
|
||||
|
||||
String query_id;
|
||||
IFileCache * cache = nullptr;
|
||||
QueryContextPtr context;
|
||||
};
|
||||
|
||||
QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings);
|
||||
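As a sketch of how this RAII holder is meant to be used: the real caller in this codebase is CachedReadBufferFromRemoteFS, which stores a QueryContextHolder member (shown further down in this diff). The function name and arguments below are hypothetical placeholders.

    #include <Common/FileCache.h>
    #include <IO/ReadSettings.h>

    /// Hypothetical sketch: per-query cache accounting is scoped by the holder's lifetime.
    void processQuery(DB::FileCache & cache, const std::string & query_id, const DB::ReadSettings & read_settings)
    {
        auto holder = cache.getQueryContextHolder(query_id, read_settings);

        /// While `holder` is alive, reservations made for this query are tracked in its QueryContext
        /// (see QueryContext::reserve / remove / use above), so the per-query limit from
        /// ReadSettings::max_query_cache_size can be enforced.

        /// When `holder` is destroyed and only query_map still references the context,
        /// the context is removed from the cache (see ~QueryContextHolder above).
    }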
|
||||
};
|
||||
|
||||
using FileCachePtr = std::shared_ptr<IFileCache>;
|
||||
|
||||
}
|
||||
|
||||
namespace std
|
||||
{
|
||||
template <> struct hash<DB::IFileCache::Key>
|
||||
{
|
||||
std::size_t operator()(const DB::IFileCache::Key & k) const { return hash<UInt128>()(k.key); }
|
||||
};
|
||||
|
||||
}
|
95
src/Common/IFileCachePriority.h
Normal file
@ -0,0 +1,95 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <Core/Types.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/FileCacheType.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IFileCachePriority;
|
||||
using FileCachePriorityPtr = std::shared_ptr<IFileCachePriority>;
|
||||
|
||||
/// IFileCachePriority is used to maintain the priority of cached data.
|
||||
class IFileCachePriority
|
||||
{
|
||||
public:
|
||||
class IIterator;
|
||||
using Key = FileCacheKey;
|
||||
using ReadIterator = std::unique_ptr<const IIterator>;
|
||||
using WriteIterator = std::shared_ptr<IIterator>;
|
||||
|
||||
struct FileCacheRecord
|
||||
{
|
||||
Key key;
|
||||
size_t offset;
|
||||
size_t size;
|
||||
size_t hits = 0;
|
||||
|
||||
FileCacheRecord(const Key & key_, size_t offset_, size_t size_) : key(key_), offset(offset_), size(size_) { }
|
||||
};
|
||||
|
||||
    /// Provides an iterator to traverse the cache priority. Under normal circumstances,
|
||||
    /// the iterator only returns records that are candidates for eviction.
|
||||
    /// For example, with the LRU algorithm it can traverse all records, but with LRU-K it
|
||||
    /// can only traverse the records in the low-priority queue.
|
||||
class IIterator
|
||||
{
|
||||
public:
|
||||
virtual ~IIterator() = default;
|
||||
|
||||
virtual const Key & key() const = 0;
|
||||
|
||||
virtual size_t offset() const = 0;
|
||||
|
||||
virtual size_t size() const = 0;
|
||||
|
||||
virtual size_t hits() const = 0;
|
||||
|
||||
/// Point the iterator to the next higher priority cache record.
|
||||
virtual void next() const = 0;
|
||||
|
||||
virtual bool valid() const = 0;
|
||||
|
||||
        /// Marks a cache record as recently used; this updates the record's priority
|
||||
        /// according to the particular cache algorithm.
|
||||
virtual void use(std::lock_guard<std::mutex> &) = 0;
|
||||
|
||||
        /// Deletes an existing cached record. To avoid leaving the iterator dangling,
|
||||
        /// it automatically advances to the next record.
|
||||
virtual void removeAndGetNext(std::lock_guard<std::mutex> &) = 0;
|
||||
|
||||
virtual void incrementSize(size_t, std::lock_guard<std::mutex> &) = 0;
|
||||
};
|
||||
|
||||
public:
|
||||
virtual ~IFileCachePriority() = default;
|
||||
|
||||
/// Add a cache record that did not exist before, and throw a
|
||||
/// logical exception if the cache block already exists.
|
||||
virtual WriteIterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
|
||||
/// This method is used for assertions in debug mode. So we do not care about complexity here.
|
||||
/// Query whether a cache record exists. If it exists, return true. If not, return false.
|
||||
virtual bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
|
||||
virtual void removeAll(std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
|
||||
/// Returns an iterator pointing to the lowest priority cached record.
|
||||
/// We can traverse all cached records through the iterator's next().
|
||||
virtual ReadIterator getLowestPriorityReadIterator(std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
|
||||
/// The same as getLowestPriorityReadIterator(), but it is writeable.
|
||||
virtual WriteIterator getLowestPriorityWriteIterator(std::lock_guard<std::mutex> & cache_lock) = 0;
|
||||
|
||||
virtual size_t getElementsNum(std::lock_guard<std::mutex> & cache_lock) const = 0;
|
||||
|
||||
size_t getCacheSize(std::lock_guard<std::mutex> &) const { return cache_size; }
|
||||
|
||||
protected:
|
||||
size_t max_cache_size = 0;
|
||||
size_t cache_size = 0;
|
||||
};
|
||||
};
|
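To make the iterator contract above concrete, here is a minimal eviction-style sketch, assuming the caller already holds the cache-wide lock. The function name and the byte budget are hypothetical; real eviction code also has to check whether a record's file segment is still in use, which is omitted here.

    #include <mutex>
    #include <Common/IFileCachePriority.h>

    /// Hypothetical sketch: walk records starting from the lowest priority and remove them
    /// until at least `bytes_to_free` bytes have been released.
    size_t evictUpTo(DB::IFileCachePriority & priority, size_t bytes_to_free, std::lock_guard<std::mutex> & cache_lock)
    {
        size_t freed = 0;
        auto it = priority.getLowestPriorityWriteIterator(cache_lock);
        while (it->valid() && freed < bytes_to_free)
        {
            freed += it->size();
            it->removeAndGetNext(cache_lock);   /// drops the record and advances to the next one
        }
        return freed;
    }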
@ -1,157 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <map>
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Common/IFileCache.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/**
|
||||
* Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
|
||||
* Implements LRU eviction policy.
|
||||
*/
|
||||
class LRUFileCache final : public IFileCache
|
||||
{
|
||||
public:
|
||||
LRUFileCache(
|
||||
const String & cache_base_path_,
|
||||
const FileCacheSettings & cache_settings_);
|
||||
|
||||
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent) override;
|
||||
|
||||
FileSegmentsHolder get(const Key & key, size_t offset, size_t size) override;
|
||||
|
||||
FileSegments getSnapshot() const override;
|
||||
|
||||
void initialize() override;
|
||||
|
||||
void removeIfExists(const Key & key) override;
|
||||
|
||||
void removeIfReleasable(bool remove_persistent_files) override;
|
||||
|
||||
std::vector<String> tryGetCachePaths(const Key & key) override;
|
||||
|
||||
size_t getUsedCacheSize() const override;
|
||||
|
||||
size_t getFileSegmentsNum() const override;
|
||||
|
||||
private:
|
||||
struct FileSegmentCell : private boost::noncopyable
|
||||
{
|
||||
FileSegmentPtr file_segment;
|
||||
|
||||
/// Iterator is put here on first reservation attempt, if successful.
|
||||
std::optional<LRUQueue::Iterator> queue_iterator;
|
||||
|
||||
        /// The pointer to a file segment is always held by the cache itself.
|
||||
        /// Apart from the pointer in the cache, it can be held by cache users when they call
|
||||
        /// getOrSet(), but cache users always hold it via FileSegmentsHolder.
|
||||
        bool releasable() const { return file_segment.unique(); }
|
||||
|
||||
size_t size() const { return file_segment->reserved_size; }
|
||||
|
||||
FileSegmentCell(FileSegmentPtr file_segment_, LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
FileSegmentCell(FileSegmentCell && other) noexcept
|
||||
: file_segment(std::move(other.file_segment))
|
||||
, queue_iterator(other.queue_iterator) {}
|
||||
};
|
||||
|
||||
using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
|
||||
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;
|
||||
|
||||
CachedFiles files;
|
||||
LRUQueue queue;
|
||||
|
||||
LRUQueue stash_queue;
|
||||
AccessRecord records;
|
||||
|
||||
size_t max_stash_element_size;
|
||||
size_t enable_cache_hits_threshold;
|
||||
|
||||
Poco::Logger * log;
|
||||
bool allow_to_remove_persistent_segments_from_cache_by_default;
|
||||
|
||||
FileSegments getImpl(
|
||||
const Key & key, const FileSegment::Range & range,
|
||||
std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
FileSegmentCell * getCell(
|
||||
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
FileSegmentCell * addCell(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
FileSegment::State state, bool is_persistent,
|
||||
std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
bool tryReserve(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
std::lock_guard<std::mutex> & cache_lock) override;
|
||||
|
||||
bool tryReserveForMainList(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
QueryContextPtr query_context,
|
||||
std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void remove(
|
||||
Key key, size_t offset,
|
||||
std::lock_guard<std::mutex> & cache_lock,
|
||||
std::lock_guard<std::mutex> & segment_lock) override;
|
||||
|
||||
bool isLastFileSegmentHolder(
|
||||
const Key & key, size_t offset,
|
||||
std::lock_guard<std::mutex> & cache_lock,
|
||||
std::lock_guard<std::mutex> & segment_lock) override;
|
||||
|
||||
size_t getAvailableCacheSize() const;
|
||||
|
||||
void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
FileSegments splitRangeIntoCells(
|
||||
const Key & key, size_t offset, size_t size, FileSegment::State state, bool is_persistent, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
String dumpStructureUnlocked(const Key & key_, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void fillHolesWithEmptyFileSegments(
|
||||
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, bool is_persistent, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent) override;
|
||||
|
||||
size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
|
||||
|
||||
size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
|
||||
|
||||
size_t getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
|
||||
|
||||
void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void reduceSizeToDownloaded(
|
||||
const Key & key, size_t offset,
|
||||
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */) override;
|
||||
|
||||
public:
|
||||
String dumpStructure(const Key & key_) override;
|
||||
|
||||
void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void assertQueueCorrectness(std::lock_guard<std::mutex> & cache_lock);
|
||||
};
|
||||
|
||||
}
|
61
src/Common/LRUFileCachePriority.cpp
Normal file
@ -0,0 +1,61 @@
|
||||
#include <Common/LRUFileCachePriority.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> &)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
for (const auto & entry : queue)
|
||||
{
|
||||
if (entry.key == key && entry.offset == offset)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})",
|
||||
entry.key.toString(),
|
||||
entry.offset,
|
||||
entry.size);
|
||||
}
|
||||
#endif
|
||||
auto iter = queue.insert(queue.end(), FileCacheRecord(key, offset, size));
|
||||
cache_size += size;
|
||||
return std::make_shared<LRUFileCacheIterator>(this, iter);
|
||||
}
|
||||
|
||||
bool LRUFileCachePriority::contains(const Key & key, size_t offset, std::lock_guard<std::mutex> &)
|
||||
{
|
||||
for (const auto & record : queue)
|
||||
{
|
||||
if (key == record.key && offset == record.offset)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
|
||||
{
|
||||
queue.clear();
|
||||
cache_size = 0;
|
||||
}
|
||||
|
||||
IFileCachePriority::ReadIterator LRUFileCachePriority::getLowestPriorityReadIterator(std::lock_guard<std::mutex> &)
|
||||
{
|
||||
return std::make_unique<const LRUFileCacheIterator>(this, queue.begin());
|
||||
}
|
||||
|
||||
IFileCachePriority::WriteIterator LRUFileCachePriority::getLowestPriorityWriteIterator(std::lock_guard<std::mutex> &)
|
||||
{
|
||||
return std::make_shared<LRUFileCacheIterator>(this, queue.begin());
|
||||
}
|
||||
|
||||
size_t LRUFileCachePriority::getElementsNum(std::lock_guard<std::mutex> &) const
|
||||
{
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
};
|
80
src/Common/LRUFileCachePriority.h
Normal file
@ -0,0 +1,80 @@
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <Common/IFileCachePriority.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Based on the LRU algorithm implementation, the record with the lowest priority is stored at
|
||||
/// the head of the queue, and the record with the highest priority is stored at the tail.
|
||||
class LRUFileCachePriority : public IFileCachePriority
|
||||
{
|
||||
private:
|
||||
class LRUFileCacheIterator;
|
||||
using LRUQueue = std::list<FileCacheRecord>;
|
||||
using LRUQueueIterator = typename LRUQueue::iterator;
|
||||
|
||||
public:
|
||||
LRUFileCachePriority() = default;
|
||||
|
||||
WriteIterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> &) override;
|
||||
|
||||
bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> &) override;
|
||||
|
||||
void removeAll(std::lock_guard<std::mutex> &) override;
|
||||
|
||||
ReadIterator getLowestPriorityReadIterator(std::lock_guard<std::mutex> &) override;
|
||||
|
||||
WriteIterator getLowestPriorityWriteIterator(std::lock_guard<std::mutex> &) override;
|
||||
|
||||
size_t getElementsNum(std::lock_guard<std::mutex> &) const override;
|
||||
|
||||
private:
|
||||
LRUQueue queue;
|
||||
};
|
||||
|
||||
class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::IIterator
|
||||
{
|
||||
public:
|
||||
LRUFileCacheIterator(LRUFileCachePriority * file_cache_, LRUFileCachePriority::LRUQueueIterator queue_iter_)
|
||||
: file_cache(file_cache_), queue_iter(queue_iter_)
|
||||
{
|
||||
}
|
||||
|
||||
void next() const override { queue_iter++; }
|
||||
|
||||
bool valid() const override { return queue_iter != file_cache->queue.end(); }
|
||||
|
||||
const Key & key() const override { return queue_iter->key; }
|
||||
|
||||
size_t offset() const override { return queue_iter->offset; }
|
||||
|
||||
size_t size() const override { return queue_iter->size; }
|
||||
|
||||
size_t hits() const override { return queue_iter->hits; }
|
||||
|
||||
void removeAndGetNext(std::lock_guard<std::mutex> &) override
|
||||
{
|
||||
file_cache->cache_size -= queue_iter->size;
|
||||
queue_iter = file_cache->queue.erase(queue_iter);
|
||||
}
|
||||
|
||||
void incrementSize(size_t size_increment, std::lock_guard<std::mutex> &) override
|
||||
{
|
||||
file_cache->cache_size += size_increment;
|
||||
queue_iter->size += size_increment;
|
||||
}
|
||||
|
||||
void use(std::lock_guard<std::mutex> &) override
|
||||
{
|
||||
queue_iter->hits++;
|
||||
file_cache->queue.splice(file_cache->queue.end(), file_cache->queue, queue_iter);
|
||||
}
|
||||
|
||||
private:
|
||||
LRUFileCachePriority * file_cache;
|
||||
mutable LRUFileCachePriority::LRUQueueIterator queue_iter;
|
||||
};
|
||||
|
||||
};
|
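Purely as an illustration of the LRU semantics described above (head of the queue = lowest priority, tail = highest), here is a standalone sketch. The surrounding function and both keys are hypothetical arguments, since FileCacheKey itself is defined in FileCacheType.h, which is not shown in this diff.

    #include <mutex>
    #include <Common/LRUFileCachePriority.h>

    /// Hypothetical sketch exercising the LRU priority queue in isolation.
    void exerciseLRUPriority(const DB::IFileCachePriority::Key & key_a, const DB::IFileCachePriority::Key & key_b)
    {
        std::mutex mutex;
        std::lock_guard<std::mutex> lock(mutex);

        DB::LRUFileCachePriority priority;
        auto a = priority.add(key_a, /* offset */ 0, /* size */ 100, lock);   /// appended at the tail
        priority.add(key_b, /* offset */ 0, /* size */ 200, lock);            /// now the tail; key_a is at the head (lowest priority)

        a->use(lock);   /// splices the key_a record to the tail, so key_b now has the lowest priority

        auto lowest = priority.getLowestPriorityReadIterator(lock);
        if (lowest->valid())
        {
            /// lowest->key() now refers to key_b, and lowest->size() == 200.
        }
    }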
@ -1,7 +1,7 @@
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <gtest/gtest.h>
|
||||
#include <Common/LRUFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
@ -47,7 +47,7 @@ std::vector<DB::FileSegmentPtr> fromHolder(const DB::FileSegmentsHolder & holder
|
||||
return std::vector<DB::FileSegmentPtr>(holder.file_segments.begin(), holder.file_segments.end());
|
||||
}
|
||||
|
||||
String getFileSegmentPath(const String & base_path, const DB::IFileCache::Key & key, size_t offset)
|
||||
String getFileSegmentPath(const String & base_path, const DB::FileCache::Key & key, size_t offset)
|
||||
{
|
||||
auto key_str = key.toString();
|
||||
return fs::path(base_path) / key_str.substr(0, 3) / key_str / DB::toString(offset);
|
||||
@ -85,7 +85,7 @@ void complete(const DB::FileSegmentsHolder & holder)
|
||||
}
|
||||
|
||||
|
||||
TEST(LRUFileCache, get)
|
||||
TEST(FileCache, get)
|
||||
{
|
||||
if (fs::exists(cache_base_path))
|
||||
fs::remove_all(cache_base_path);
|
||||
@ -103,7 +103,7 @@ TEST(LRUFileCache, get)
|
||||
DB::FileCacheSettings settings;
|
||||
settings.max_size = 30;
|
||||
settings.max_elements = 5;
|
||||
auto cache = DB::LRUFileCache(cache_base_path, settings);
|
||||
auto cache = DB::FileCache(cache_base_path, settings);
|
||||
cache.initialize();
|
||||
auto key = cache.hash("key1");
|
||||
|
||||
@ -479,7 +479,7 @@ TEST(LRUFileCache, get)
|
||||
{
|
||||
/// Test LRUCache::restore().
|
||||
|
||||
auto cache2 = DB::LRUFileCache(cache_base_path, settings);
|
||||
auto cache2 = DB::FileCache(cache_base_path, settings);
|
||||
cache2.initialize();
|
||||
|
||||
auto holder1 = cache2.getOrSet(key, 2, 28, false); /// Get [2, 29]
|
||||
@ -499,7 +499,7 @@ TEST(LRUFileCache, get)
|
||||
|
||||
auto settings2 = settings;
|
||||
settings2.max_file_segment_size = 10;
|
||||
auto cache2 = DB::LRUFileCache(caches_dir / "cache2", settings2);
|
||||
auto cache2 = DB::FileCache(caches_dir / "cache2", settings2);
|
||||
cache2.initialize();
|
||||
|
||||
auto holder1 = cache2.getOrSet(key, 0, 25, false); /// Get [0, 24]
|
||||
|
@ -212,7 +212,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
|
||||
\
|
||||
M(Bool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be performed", 0) \
|
||||
\
|
||||
M(UInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
|
||||
M(UInt64Auto, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
|
||||
M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \
|
||||
M(Bool, insert_quorum_parallel, true, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \
|
||||
M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \
|
||||
|
@ -153,6 +153,9 @@ template struct SettingFieldNumber<Int64>;
|
||||
template struct SettingFieldNumber<float>;
|
||||
template struct SettingFieldNumber<bool>;
|
||||
|
||||
template struct SettingAutoWrapper<SettingFieldNumber<UInt64>>;
|
||||
template struct SettingAutoWrapper<SettingFieldNumber<Int64>>;
|
||||
template struct SettingAutoWrapper<SettingFieldNumber<float>>;
|
||||
|
||||
namespace
|
||||
{
|
||||
|
@ -58,11 +58,82 @@ using SettingFieldInt64 = SettingFieldNumber<Int64>;
|
||||
using SettingFieldFloat = SettingFieldNumber<float>;
|
||||
using SettingFieldBool = SettingFieldNumber<bool>;
|
||||
|
||||
|
||||
/** Unlike SettingFieldUInt64, supports the value of 'auto' - the number of processor cores without taking into account SMT.
|
||||
* A value of 0 is also treated as auto.
|
||||
* When serializing, `auto` is written in the same way as 0.
|
||||
/** Wraps any SettingField to support special value 'auto' that can be checked with `is_auto` flag.
|
||||
* Note about serialization:
|
||||
 * New versions with `SettingsWriteFormat::STRINGS_WITH_FLAGS` serialize values as strings.
|
||||
 * In the legacy SettingsWriteFormat mode, the `read/writeBinary` functions serialize values as binary, and 'is_auto' is ignored.
|
||||
 * It's possible to upgrade a setting from the regular type to the wrapped one and keep compatibility with old versions,
|
||||
 * but when 'auto' is serialized, an old version will see the binary representation of the default value.
|
||||
*/
|
||||
template <typename Base>
|
||||
struct SettingAutoWrapper
|
||||
{
|
||||
constexpr static auto keyword = "auto";
|
||||
static bool isAuto(const Field & f) { return f.getType() == Field::Types::String && f.safeGet<const String &>() == keyword; }
|
||||
static bool isAuto(const String & str) { return str == keyword; }
|
||||
|
||||
using Type = typename Base::Type;
|
||||
|
||||
Base base;
|
||||
bool is_auto = false;
|
||||
bool changed = false;
|
||||
|
||||
explicit SettingAutoWrapper() : is_auto(true) {}
|
||||
explicit SettingAutoWrapper(Type val) : is_auto(false) { base = Base(val); }
|
||||
|
||||
explicit SettingAutoWrapper(const Field & f)
|
||||
: is_auto(isAuto(f))
|
||||
{
|
||||
if (!is_auto)
|
||||
base = Base(f);
|
||||
}
|
||||
|
||||
SettingAutoWrapper & operator=(const Field & f)
|
||||
{
|
||||
changed = true;
|
||||
if (is_auto = isAuto(f); !is_auto)
|
||||
base = f;
|
||||
return *this;
|
||||
}
|
||||
|
||||
explicit operator Field() const { return is_auto ? Field(keyword) : Field(base); }
|
||||
|
||||
String toString() const { return is_auto ? keyword : base.toString(); }
|
||||
|
||||
void parseFromString(const String & str)
|
||||
{
|
||||
changed = true;
|
||||
if (is_auto = isAuto(str); !is_auto)
|
||||
base.parseFromString(str);
|
||||
}
|
||||
|
||||
void writeBinary(WriteBuffer & out) const
|
||||
{
|
||||
if (is_auto)
|
||||
Base().writeBinary(out); /// serialize default value
|
||||
else
|
||||
base.writeBinary(out);
|
||||
}
|
||||
|
||||
/*
|
||||
    * Note that it is fine to reset `is_auto` here and to use the default value when `is_auto` is set,
|
||||
    * because settings are serialized only if they were changed.
|
||||
    * If they were changed, an explicit value was requested instead of `auto`.
|
||||
    * So interactions between client and server, and between servers (distributed queries), should be OK.
|
||||
*/
|
||||
void readBinary(ReadBuffer & in) { changed = true; is_auto = false; base.readBinary(in); }
|
||||
|
||||
Type valueOr(Type default_value) const { return is_auto ? default_value : base.value; }
|
||||
};
|
||||
|
||||
using SettingFieldUInt64Auto = SettingAutoWrapper<SettingFieldUInt64>;
|
||||
using SettingFieldInt64Auto = SettingAutoWrapper<SettingFieldInt64>;
|
||||
using SettingFieldFloatAuto = SettingAutoWrapper<SettingFieldFloat>;
|
||||
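A short sketch of the wrapper's behaviour, using the aliases defined just above. The surrounding function is hypothetical, the include path is assumed to be Core/SettingsFields.h, and the concrete fallback passed to valueOr() is only an example (StorageReplicatedMergeTree below uses insert_quorum.valueOr(0) the same way).

    #include <Core/SettingsFields.h>

    /// Hypothetical sketch: 'auto' is kept as a flag, and valueOr() supplies the fallback.
    void settingAutoWrapperSketch()
    {
        DB::SettingFieldUInt64Auto quorum;          /// default-constructed => is_auto == true
        UInt64 effective = quorum.valueOr(0);       /// 'auto' falls back to the caller-provided default => 0

        quorum.parseFromString("3");                /// an explicit value clears is_auto
        effective = quorum.valueOr(0);              /// => 3

        quorum.parseFromString("auto");             /// back to 'auto'; toString() now returns "auto"
        (void)effective;
    }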
|
||||
/* Similar to SettingFieldUInt64Auto, with small differences so it behaves like a regular UInt64; kept for compatibility.
|
||||
* When setting to 'auto' it becomes equal to the number of processor cores without taking into account SMT.
|
||||
* A value of 0 is also treated as 'auto', so 'auto' is parsed and serialized in the same way as 0.
|
||||
*/
|
||||
struct SettingFieldMaxThreads
|
||||
{
|
||||
bool is_auto;
|
||||
|
@ -1024,7 +1024,7 @@ std::optional<size_t> CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset()
|
||||
|
||||
void CachedReadBufferFromRemoteFS::assertCorrectness() const
|
||||
{
|
||||
if (IFileCache::isReadOnly() && !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
|
||||
if (FileCache::isReadOnly() && !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed");
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
@ -81,7 +81,7 @@ private:
|
||||
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);
|
||||
|
||||
Poco::Logger * log;
|
||||
IFileCache::Key cache_key;
|
||||
FileCache::Key cache_key;
|
||||
String remote_fs_object_path;
|
||||
FileCachePtr cache;
|
||||
ReadSettings settings;
|
||||
@ -128,7 +128,7 @@ private:
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::FilesystemCacheReadBuffers};
|
||||
ProfileEvents::Counters current_file_segment_counters;
|
||||
|
||||
IFileCache::QueryContextHolder query_context_holder;
|
||||
FileCache::QueryContextHolder query_context_holder;
|
||||
|
||||
bool is_persistent;
|
||||
};
|
||||
|
@ -35,7 +35,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
|
||||
|
||||
with_cache = settings.remote_fs_cache
|
||||
&& settings.enable_filesystem_cache
|
||||
&& (!IFileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
|
||||
&& (!FileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const String & path, size_t file_size)
|
||||
|
@ -10,7 +10,8 @@
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
|
||||
#include <Disks/FakeDiskTransaction.h>
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheSettings.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
|
||||
#include <Disks/ObjectStorages/StoredObject.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Disks/WriteMode.h>
|
||||
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include <Disks/ObjectStorages/LocalObjectStorage.h>
|
||||
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
@ -24,7 +24,7 @@
|
||||
#include <aws/s3/model/UploadPartCopyRequest.h>
|
||||
#include <aws/s3/model/AbortMultipartUploadRequest.h>
|
||||
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
@ -128,7 +128,7 @@ void S3ObjectStorage::removeCacheIfExists(const std::string & path_key)
|
||||
if (!cache || path_key.empty())
|
||||
return;
|
||||
|
||||
IFileCache::Key key = cache->hash(path_key);
|
||||
FileCache::Key key = cache->hash(path_key);
|
||||
cache->removeIfExists(key);
|
||||
}
|
||||
|
||||
@ -500,7 +500,7 @@ ReadSettings S3ObjectStorage::patchSettings(const ReadSettings & read_settings)
|
||||
ReadSettings settings{read_settings};
|
||||
if (cache)
|
||||
{
|
||||
if (IFileCache::isReadOnly())
|
||||
if (FileCache::isReadOnly())
|
||||
settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
|
||||
|
||||
settings.remote_fs_cache = cache;
|
||||
|
@ -3,8 +3,8 @@
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <Common/FileCache.h>
|
||||
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
@ -12,9 +12,9 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/getCurrentProcessFDCount.h>
|
||||
#include <Common/getMaxFileDescriptorCount.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Server/ProtocolServerAdapter.h>
|
||||
#include <Storages/MarkCache.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
|
@ -1,8 +0,0 @@
|
||||
#include <Interpreters/DictionaryJoinAdapter.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
}
|
@ -18,7 +18,6 @@
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/ConcurrentHashJoin.h>
|
||||
#include <Interpreters/DictionaryJoinAdapter.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
|
@ -1299,6 +1299,7 @@ scope_guard ExternalLoader::addConfigRepository(std::unique_ptr<IExternalLoaderC
|
||||
return [this, ptr, name]()
|
||||
{
|
||||
config_files_reader->removeConfigRepository(ptr);
|
||||
CurrentStatusInfo::unset(CurrentStatusInfo::DictionaryStatus, name);
|
||||
reloadConfig(name);
|
||||
};
|
||||
}
|
||||
|
@ -6,7 +6,7 @@
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
#include <Core/Block.h>
|
||||
|
||||
|
@ -8,7 +8,7 @@
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
|
@ -6,29 +6,30 @@
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Core/SettingsEnums.h>
|
||||
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/LogicalExpressionsOptimizer.h>
|
||||
#include <Interpreters/QueryAliasesVisitor.h>
|
||||
#include <Interpreters/ArrayJoinedColumnsVisitor.h>
|
||||
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/FunctionNameNormalizer.h>
|
||||
#include <Interpreters/MarkTableIdentifiersVisitor.h>
|
||||
#include <Interpreters/QueryNormalizer.h>
|
||||
#include <Interpreters/GroupingSetsRewriterVisitor.h>
|
||||
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
|
||||
#include <Interpreters/CollectJoinOnKeysVisitor.h>
|
||||
#include <Interpreters/RequiredSourceColumnsVisitor.h>
|
||||
#include <Interpreters/GetAggregatesVisitor.h>
|
||||
#include <Interpreters/UserDefinedSQLFunctionVisitor.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
|
||||
#include <Interpreters/ExpressionActions.h> /// getSmallestColumn()
|
||||
#include <Interpreters/getTableExpressions.h>
|
||||
#include <Interpreters/TreeOptimizer.h>
|
||||
#include <Interpreters/replaceAliasColumnsInQuery.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/FunctionNameNormalizer.h>
|
||||
#include <Interpreters/GetAggregatesVisitor.h>
|
||||
#include <Interpreters/GroupingSetsRewriterVisitor.h>
|
||||
#include <Interpreters/LogicalExpressionsOptimizer.h>
|
||||
#include <Interpreters/MarkTableIdentifiersVisitor.h>
|
||||
#include <Interpreters/PredicateExpressionsOptimizer.h>
|
||||
#include <Interpreters/QueryAliasesVisitor.h>
|
||||
#include <Interpreters/QueryNormalizer.h>
|
||||
#include <Interpreters/RequiredSourceColumnsVisitor.h>
|
||||
#include <Interpreters/RewriteOrderByVisitor.hpp>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
|
||||
#include <Interpreters/TreeOptimizer.h>
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
|
||||
#include <Interpreters/UserDefinedSQLFunctionVisitor.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/getTableExpressions.h>
|
||||
#include <Interpreters/replaceAliasColumnsInQuery.h>
|
||||
#include <Interpreters/replaceForPositionalArguments.h>
|
||||
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
@ -1405,8 +1406,11 @@ TreeRewriterResultPtr TreeRewriter::analyze(
|
||||
void TreeRewriter::normalize(
|
||||
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases, ContextPtr context_)
|
||||
{
|
||||
UserDefinedSQLFunctionVisitor::Data data_user_defined_functions_visitor;
|
||||
UserDefinedSQLFunctionVisitor(data_user_defined_functions_visitor).visit(query);
|
||||
if (!UserDefinedSQLFunctionFactory::instance().empty())
|
||||
{
|
||||
UserDefinedSQLFunctionVisitor::Data data_user_defined_functions_visitor;
|
||||
UserDefinedSQLFunctionVisitor(data_user_defined_functions_visitor).visit(query);
|
||||
}
|
||||
|
||||
CustomizeCountDistinctVisitor::Data data_count_distinct{settings.count_distinct_implementation};
|
||||
CustomizeCountDistinctVisitor(data_count_distinct).visit(query);
|
||||
|
@ -160,4 +160,9 @@ std::vector<std::string> UserDefinedSQLFunctionFactory::getAllRegisteredNames()
|
||||
return registered_names;
|
||||
}
|
||||
|
||||
bool UserDefinedSQLFunctionFactory::empty() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return function_name_to_create_query.empty();
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,9 @@ public:
|
||||
/// Get all user defined functions registered names.
|
||||
std::vector<String> getAllRegisteredNames() const override;
|
||||
|
||||
/// Check whether any UDFs have been registered
|
||||
bool empty() const;
|
||||
|
||||
private:
|
||||
std::unordered_map<String, ASTPtr> function_name_to_create_query;
|
||||
mutable std::mutex mutex;
|
||||
|
@ -490,7 +490,8 @@ static NameSet collectFilesToSkip(
|
||||
|
||||
for (const auto & index : indices_to_recalc)
|
||||
{
|
||||
files_to_skip.insert(index->getFileName() + ".idx");
|
||||
        /// Since the MinMax index has the .idx2 extension, we need to add the correct extension.
|
||||
files_to_skip.insert(index->getFileName() + index->getSerializedFileExtension());
|
||||
files_to_skip.insert(index->getFileName() + mrk_extension);
|
||||
}
|
||||
|
||||
|
@ -4443,8 +4443,9 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
|
||||
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
|
||||
|
||||
// TODO: should we also somehow pass list of columns to deduplicate on to the ReplicatedMergeTreeSink?
|
||||
    // TODO: insert_quorum = 'auto' will be supported in https://github.com/ClickHouse/ClickHouse/pull/39970; for now it is the same as 0.
|
||||
return std::make_shared<ReplicatedMergeTreeSink>(
|
||||
*this, metadata_snapshot, query_settings.insert_quorum,
|
||||
*this, metadata_snapshot, query_settings.insert_quorum.valueOr(0),
|
||||
query_settings.insert_quorum_timeout.totalMilliseconds(),
|
||||
query_settings.max_partitions_per_insert_block,
|
||||
query_settings.insert_quorum_parallel,
|
||||
|
@ -2,7 +2,7 @@
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include "StorageSystemRemoteDataPaths.h"
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Common/IFileCache.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
|
@ -239,8 +239,8 @@ def create_test_html_report(
|
||||
)
|
||||
|
||||
raw_log_name = os.path.basename(raw_log_url)
|
||||
if raw_log_name.endswith("?check_suite_focus=true"):
|
||||
raw_log_name = "Job (github actions)"
|
||||
if "?" in raw_log_name:
|
||||
raw_log_name = raw_log_name.split("?")[0]
|
||||
|
||||
result = HTML_BASE_TEST_TEMPLATE.format(
|
||||
title=_format_header(header, branch_name),
|
||||
|
6
tests/config/config.d/prometheus.xml
Normal file
@ -0,0 +1,6 @@
|
||||
<clickhouse>
|
||||
<prometheus>
|
||||
<endpoint>/metrics</endpoint>
|
||||
<port>9988</port>
|
||||
</prometheus>
|
||||
</clickhouse>
|
@ -35,6 +35,7 @@ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/merge_tree.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/metadata_cache.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/prometheus.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/top_level_domains_path.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/transactions.xml $DEST_SERVER_PATH/config.d/
|
||||
|
@ -9,6 +9,7 @@
|
||||
[zookeeper :as zk])
|
||||
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
|
||||
|
||||
(def root-path "/counter")
|
||||
(defn r [_ _] {:type :invoke, :f :read})
|
||||
(defn add [_ _] {:type :invoke, :f :add, :value (rand-int 5)})
|
||||
|
||||
@ -20,17 +21,19 @@
|
||||
:conn (zk-connect node 9181 30000))
|
||||
:nodename node))
|
||||
|
||||
(setup! [this test])
|
||||
(setup! [this test]
|
||||
(exec-with-retries 30 (fn []
|
||||
(zk-create-if-not-exists conn root-path ""))))
|
||||
|
||||
(invoke! [this test op]
|
||||
(case (:f op)
|
||||
:read (exec-with-retries 30 (fn []
|
||||
(assoc op
|
||||
:type :ok
|
||||
:value (count (zk-list conn "/")))))
|
||||
:value (count (zk-list conn root-path)))))
|
||||
:add (try
|
||||
(do
|
||||
(zk-multi-create-many-seq-nodes conn "/seq-" (:value op))
|
||||
(zk-multi-create-many-seq-nodes conn (concat-path root-path "seq-") (:value op))
|
||||
(assoc op :type :ok))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))))
|
||||
|
||||
|
@ -0,0 +1,4 @@
|
||||
0 0 UInt64Auto
|
||||
auto 1 UInt64Auto
|
||||
0 1 UInt64Auto
|
||||
1 1 UInt64Auto
|
10
tests/queries/0_stateless/02381_setting_value_auto.sql
Normal file
@ -0,0 +1,10 @@
|
||||
SELECT value, changed, type FROM system.settings WHERE name = 'insert_quorum';
|
||||
|
||||
SET insert_quorum = 'auto';
|
||||
SELECT value, changed, type FROM system.settings WHERE name = 'insert_quorum';
|
||||
|
||||
SET insert_quorum = 0;
|
||||
SELECT value, changed, type FROM system.settings WHERE name = 'insert_quorum';
|
||||
|
||||
SET insert_quorum = 1;
|
||||
SELECT value, changed, type FROM system.settings WHERE name = 'insert_quorum';
|
@ -0,0 +1,18 @@
|
||||
status before reload
|
||||
status after reload
|
||||
NOT_LOADED 0
|
||||
LOADED 0
|
||||
FAILED 1
|
||||
LOADING 0
|
||||
FAILED_AND_RELOADING 0
|
||||
LOADED_AND_RELOADING 0
|
||||
NOT_EXIST 0
|
||||
status after reload, table exists
|
||||
NOT_LOADED 0
|
||||
LOADED 1
|
||||
FAILED 0
|
||||
LOADING 0
|
||||
FAILED_AND_RELOADING 0
|
||||
LOADED_AND_RELOADING 0
|
||||
NOT_EXIST 0
|
||||
status after drop
|
@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-ordinary-database
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
function get_dictionary_status()
|
||||
{
|
||||
local name=$1 && shift
|
||||
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL_PROMETHEUS" | {
|
||||
awk -F'[{}=," ]' -vname="$name" '/ClickHouseStatusInfo_DictionaryStatus{/ && $(NF-3) == name { print $4, $NF }'
|
||||
}
|
||||
}
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "CREATE DICTIONARY dict (key Int, value String) PRIMARY KEY key SOURCE(CLICKHOUSE(TABLE data)) LAYOUT(HASHED()) LIFETIME(0)"
|
||||
uuid="$($CLICKHOUSE_CLIENT -q "SELECT uuid FROM system.dictionaries WHERE database = '$CLICKHOUSE_DATABASE' AND name = 'dict'")"
|
||||
|
||||
echo 'status before reload'
|
||||
get_dictionary_status "$uuid"
|
||||
|
||||
# source table does not exist
|
||||
# NOTE: when the dictionary does not exist it produces a BAD_ARGUMENTS error, so using UNKNOWN_TABLE is safe
|
||||
$CLICKHOUSE_CLIENT -n -q "SYSTEM RELOAD DICTIONARY dict -- { serverError UNKNOWN_TABLE }"
|
||||
echo 'status after reload'
|
||||
get_dictionary_status "$uuid"
|
||||
|
||||
# create source
|
||||
$CLICKHOUSE_CLIENT -q "CREATE TABLE data (key Int, value String) Engine=Null"
|
||||
$CLICKHOUSE_CLIENT -q "SYSTEM RELOAD DICTIONARY dict"
|
||||
echo 'status after reload, table exists'
|
||||
get_dictionary_status "$uuid"
|
||||
|
||||
# remove dictionary
|
||||
$CLICKHOUSE_CLIENT -q "DROP DICTIONARY dict"
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE data"
|
||||
echo 'status after drop'
|
||||
get_dictionary_status "$uuid"
|
@ -66,6 +66,8 @@ export CLICKHOUSE_PORT_TCP_WITH_PROXY=${CLICKHOUSE_PORT_TCP_WITH_PROXY:=$(${CLIC
|
||||
export CLICKHOUSE_PORT_TCP_WITH_PROXY=${CLICKHOUSE_PORT_TCP_WITH_PROXY:="9010"}
|
||||
export CLICKHOUSE_PORT_HTTP=${CLICKHOUSE_PORT_HTTP:=$(${CLICKHOUSE_EXTRACT_CONFIG} --key=http_port 2>/dev/null)}
|
||||
export CLICKHOUSE_PORT_HTTP=${CLICKHOUSE_PORT_HTTP:="8123"}
|
||||
export CLICKHOUSE_PORT_PROMTHEUS_PORT=${CLICKHOUSE_PORT_PROMTHEUS_PORT:=$(${CLICKHOUSE_EXTRACT_CONFIG} --key=prometheus.port 2>/dev/null)}
|
||||
export CLICKHOUSE_PORT_PROMTHEUS_PORT=${CLICKHOUSE_PORT_PROMTHEUS_PORT:="9988"}
|
||||
export CLICKHOUSE_PORT_HTTPS=${CLICKHOUSE_PORT_HTTPS:=$(${CLICKHOUSE_EXTRACT_CONFIG} --try --key=https_port 2>/dev/null)} 2>/dev/null
|
||||
export CLICKHOUSE_PORT_HTTPS=${CLICKHOUSE_PORT_HTTPS:="8443"}
|
||||
export CLICKHOUSE_PORT_HTTP_PROTO=${CLICKHOUSE_PORT_HTTP_PROTO:="http"}
|
||||
@ -98,6 +100,8 @@ then
|
||||
export CLICKHOUSE_URL_HTTPS="${CLICKHOUSE_URL_HTTPS}?${CLICKHOUSE_URL_PARAMS}"
|
||||
fi
|
||||
|
||||
export CLICKHOUSE_URL_PROMETHEUS=${CLICKHOUSE_URL_PROMETHEUS:="${CLICKHOUSE_PORT_HTTP_PROTO}://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_PROMTHEUS_PORT}/metrics"}
|
||||
|
||||
export CLICKHOUSE_PORT_INTERSERVER=${CLICKHOUSE_PORT_INTERSERVER:=$(${CLICKHOUSE_EXTRACT_CONFIG} --try --key=interserver_http_port 2>/dev/null)} 2>/dev/null
|
||||
export CLICKHOUSE_PORT_INTERSERVER=${CLICKHOUSE_PORT_INTERSERVER:="9009"}
|
||||
export CLICKHOUSE_URL_INTERSERVER=${CLICKHOUSE_URL_INTERSERVER:="${CLICKHOUSE_PORT_HTTP_PROTO}://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_INTERSERVER}/"}
|
||||
|