Merge branch 'master' into alexey-milovidov-patch-4

Alexey Milovidov 2022-08-20 19:58:31 +03:00 committed by GitHub
commit 0db45afb08
10 changed files with 301 additions and 260 deletions


@@ -53,10 +53,9 @@ jobs:
git diff HEAD
- name: Create Pull Request
uses: peter-evans/create-pull-request@v3
env:
GITHUB_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
with:
author: "robot-clickhouse <robot-clickhouse@users.noreply.github.com>"
token: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
committer: "robot-clickhouse <robot-clickhouse@users.noreply.github.com>"
commit-message: Update version_date.tsv and changelogs after ${{ env.GITHUB_TAG }}
branch: auto/${{ env.GITHUB_TAG }}


@@ -387,6 +387,7 @@ else
-e "TABLE_IS_READ_ONLY" \
-e "Code: 1000, e.code() = 111, Connection refused" \
-e "UNFINISHED" \
-e "NETLINK_ERROR" \
-e "Renaming unexpected part" \
-e "PART_IS_TEMPORARILY_LOCKED" \
-e "and a merge is impossible: we didn't find" \


@@ -30,12 +30,12 @@ FileCache::FileCache(
, max_element_size(cache_settings_.max_elements)
, max_file_segment_size(cache_settings_.max_file_segment_size)
, allow_persistent_files(cache_settings_.do_not_evict_index_and_mark_files)
, enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold)
, enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
, log(&Poco::Logger::get("FileCache"))
, main_priority(std::make_unique<LRUFileCachePriority>())
, stash_priority(std::make_unique<LRUFileCachePriority>())
, max_stash_element_size(cache_settings_.max_elements)
, enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold)
, log(&Poco::Logger::get("FileCache"))
{
}
@@ -77,132 +77,6 @@ void FileCache::assertInitialized() const
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
}
FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock)
{
if (!isQueryInitialized())
return nullptr;
return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock);
}
FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> & /* cache_lock */)
{
auto query_iter = query_map.find(query_id);
return (query_iter == query_map.end()) ? nullptr : query_iter->second;
}
void FileCache::removeQueryContext(const String & query_id)
{
std::lock_guard cache_lock(mutex);
auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to release query context that does not exist (query_id: {})",
query_id);
}
query_map.erase(query_iter);
}
FileCache::QueryContextPtr FileCache::getOrSetQueryContext(
const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> & cache_lock)
{
if (query_id.empty())
return nullptr;
auto context = getQueryContext(query_id, cache_lock);
if (context)
return context;
auto query_context = std::make_shared<QueryContext>(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache);
auto query_iter = query_map.emplace(query_id, query_context).first;
return query_iter->second;
}
FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings)
{
std::lock_guard cache_lock(mutex);
if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0)
return {};
/// If enable_filesystem_query_cache_limit is true and max_query_cache_size is larger than zero,
/// create a query context for the current query.
auto context = getOrSetQueryContext(query_id, settings, cache_lock);
return QueryContextHolder(query_id, this, context);
}
void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size < size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size");
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
{
record->second->removeAndGetNext(cache_lock);
records.erase({key, offset});
}
}
cache_size -= size;
}
void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size + size > max_cache_size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Reserved cache size exceeds the remaining cache size (key: {}, offset: {})",
key.toString(), offset);
}
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record == records.end())
{
auto queue_iter = priority->add(key, offset, 0, cache_lock);
record = records.insert({{key, offset}, queue_iter}).first;
}
record->second->incrementSize(size, cache_lock);
}
cache_size += size;
}
void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
{
if (skip_download_if_exceeds_query_cache)
return;
auto record = records.find({key, offset});
if (record != records.end())
record->second->use(cache_lock);
}
FileCache::QueryContextHolder::QueryContextHolder(
const String & query_id_,
FileCache * cache_,
FileCache::QueryContextPtr context_)
: query_id(query_id_)
, cache(cache_)
, context(context_)
{
}
FileCache::QueryContextHolder::~QueryContextHolder()
{
/// If only the query_map and the current holder hold the query_context,
/// the query has completed and the query_context can be released.
if (context && context.use_count() == 2)
cache->removeQueryContext(query_id);
}
void FileCache::initialize()
{
std::lock_guard cache_lock(mutex);
@@ -1222,12 +1096,6 @@ size_t FileCache::getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_l
return main_priority->getCacheSize(cache_lock);
}
size_t FileCache::getAvailableCacheSize() const
{
std::lock_guard cache_lock(mutex);
return getAvailableCacheSizeUnlocked(cache_lock);
}
size_t FileCache::getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
return max_size - getUsedCacheSizeUnlocked(cache_lock);
@@ -1346,4 +1214,130 @@ void FileCache::assertPriorityCorrectness(std::lock_guard<std::mutex> & cache_lo
assert(main_priority->getElementsNum(cache_lock) <= max_element_size);
}
FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock)
{
if (!isQueryInitialized())
return nullptr;
return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock);
}
FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> & /* cache_lock */)
{
auto query_iter = query_map.find(query_id);
return (query_iter == query_map.end()) ? nullptr : query_iter->second;
}
void FileCache::removeQueryContext(const String & query_id)
{
std::lock_guard cache_lock(mutex);
auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to release query context that does not exist (query_id: {})",
query_id);
}
query_map.erase(query_iter);
}
FileCache::QueryContextPtr FileCache::getOrSetQueryContext(
const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> & cache_lock)
{
if (query_id.empty())
return nullptr;
auto context = getQueryContext(query_id, cache_lock);
if (context)
return context;
auto query_context = std::make_shared<QueryContext>(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache);
auto query_iter = query_map.emplace(query_id, query_context).first;
return query_iter->second;
}
FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings)
{
std::lock_guard cache_lock(mutex);
if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0)
return {};
/// If enable_filesystem_query_cache_limit is true and max_query_cache_size is larger than zero,
/// create a query context for the current query.
auto context = getOrSetQueryContext(query_id, settings, cache_lock);
return QueryContextHolder(query_id, this, context);
}
void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size < size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size");
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
{
record->second->removeAndGetNext(cache_lock);
records.erase({key, offset});
}
}
cache_size -= size;
}
void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size + size > max_cache_size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Reserved cache size exceeds the remaining cache size (key: {}, offset: {})",
key.toString(), offset);
}
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record == records.end())
{
auto queue_iter = priority->add(key, offset, 0, cache_lock);
record = records.insert({{key, offset}, queue_iter}).first;
}
record->second->incrementSize(size, cache_lock);
}
cache_size += size;
}
void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
{
if (skip_download_if_exceeds_query_cache)
return;
auto record = records.find({key, offset});
if (record != records.end())
record->second->use(cache_lock);
}
FileCache::QueryContextHolder::QueryContextHolder(
const String & query_id_,
FileCache * cache_,
FileCache::QueryContextPtr context_)
: query_id(query_id_)
, cache(cache_)
, context(context_)
{
}
FileCache::QueryContextHolder::~QueryContextHolder()
{
/// If only the query_map and the current holder hold the query_context,
/// the query has completed and the query_context can be released.
if (context && context.use_count() == 2)
cache->removeQueryContext(query_id);
}
}
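For orientation, the relocated block above implements per-query cache accounting: a QueryContextHolder is taken for a query, and its destructor drops the context from query_map once only the map and the holder itself still reference it (use_count() == 2). Below is a minimal, self-contained sketch of that RAII pattern; DemoQueryContext, DemoCache, Holder and their members are illustrative stand-ins, not ClickHouse types.

// Illustrative sketch of a per-query context registry released via an RAII holder,
// mirroring the use_count() == 2 check in the destructor shown above.
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

struct DemoQueryContext { size_t cache_size = 0; };

class DemoCache
{
public:
    struct Holder
    {
        std::string query_id;
        DemoCache * cache = nullptr;
        std::shared_ptr<DemoQueryContext> context;

        ~Holder()
        {
            /// Only the registry map and this holder still own the context:
            /// the query is finished, so drop the map entry.
            if (context && context.use_count() == 2)
                cache->removeQueryContext(query_id);
        }
    };

    Holder getHolder(const std::string & query_id)
    {
        std::lock_guard lock(mutex);
        auto it = query_map.try_emplace(query_id, std::make_shared<DemoQueryContext>()).first;
        return Holder{query_id, this, it->second};
    }

    void removeQueryContext(const std::string & query_id)
    {
        std::lock_guard lock(mutex);
        query_map.erase(query_id);
    }

    size_t size() const
    {
        std::lock_guard lock(mutex);
        return query_map.size();
    }

private:
    mutable std::mutex mutex;
    std::unordered_map<std::string, std::shared_ptr<DemoQueryContext>> query_map;
};

int main()
{
    DemoCache cache;
    {
        auto holder = cache.getHolder("query-1");
        assert(cache.size() == 1);
    } /// the holder goes out of scope, so the context is erased from the map
    assert(cache.size() == 0);
    return 0;
}

Used this way, the last holder for a given query id cleans up automatically, which is the behaviour getQueryContextHolder relies on in the code above.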


@@ -23,13 +23,17 @@ namespace DB
{
/// Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
/// Different caching algorithms are implemented based on IFileCachePriority.
/// Different caching algorithms are implemented using IFileCachePriority.
class FileCache : private boost::noncopyable
{
friend class FileSegment;
friend class IFileCachePriority;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
friend class FileSegment;
friend class IFileCachePriority;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
struct QueryContext;
using QueryContextPtr = std::shared_ptr<QueryContext>;
public:
using Key = DB::FileCacheKey;
@@ -41,25 +45,8 @@ public:
/// Restore cache from local filesystem.
void initialize();
void removeIfExists(const Key & key);
void removeIfReleasable();
static bool isReadOnly();
/// Cache capacity in bytes.
size_t capacity() const { return max_size; }
static Key hash(const String & path);
String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const;
String getPathInLocalCache(const Key & key) const;
const String & getBasePath() const { return cache_base_path; }
std::vector<String> tryGetCachePaths(const Key & key);
/**
* Given an `offset` and `size` representing [offset, offset + size) bytes interval,
* return list of cached non-overlapping non-empty
@@ -84,6 +71,28 @@ public:
*/
FileSegmentsHolder get(const Key & key, size_t offset, size_t size);
/// Remove files by `key`. Removes files which might be used at the moment.
void removeIfExists(const Key & key);
/// Remove files by `key`. Will not remove files which are used at the moment.
void removeIfReleasable();
static Key hash(const String & path);
String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const;
String getPathInLocalCache(const Key & key) const;
std::vector<String> tryGetCachePaths(const Key & key);
size_t capacity() const { return max_size; }
size_t getUsedCacheSize() const;
size_t getFileSegmentsNum() const;
static bool isReadOnly();
/**
* Create a file segment of exactly requested size with EMPTY state.
* Throw exception if requested size exceeds max allowed file segment size.
@@ -102,92 +111,6 @@ public:
/// For debug.
String dumpStructure(const Key & key);
size_t getUsedCacheSize() const;
size_t getFileSegmentsNum() const;
private:
String cache_base_path;
size_t max_size;
size_t max_element_size;
size_t max_file_segment_size;
bool allow_persistent_files;
bool is_initialized = false;
mutable std::mutex mutex;
bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void remove(Key key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
bool isLastFileSegmentHolder(
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void reduceSizeToDownloaded(
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */);
void assertInitialized() const;
using AccessKeyAndOffset = std::pair<Key, size_t>;
struct KeyAndOffsetHash
{
std::size_t operator()(const AccessKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first.key) ^ std::hash<UInt64>()(key.second);
}
};
using FileCacheRecords = std::unordered_map<AccessKeyAndOffset, IFileCachePriority::WriteIterator, KeyAndOffsetHash>;
/// Used to track and control the cache access of each query.
/// Through it, the cache layer can apply per-query processing and policies.
struct QueryContext
{
FileCacheRecords records;
FileCachePriorityPtr priority;
size_t cache_size = 0;
size_t max_cache_size;
bool skip_download_if_exceeds_query_cache;
QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_)
: max_cache_size(max_cache_size_), skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_)
{
}
void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
size_t getMaxCacheSize() const { return max_cache_size; }
size_t getCacheSize() const { return cache_size; }
FileCachePriorityPtr getPriority() { return priority; }
bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
};
using QueryContextPtr = std::shared_ptr<QueryContext>;
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
QueryContextMap query_map;
bool enable_filesystem_query_cache_limit;
QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock);
QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock);
void removeQueryContext(const String & query_id);
QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> &);
public:
/// Saves query context information and adopts different cache policies
/// for different queries through the context cache layer.
struct QueryContextHolder : private boost::noncopyable
@@ -206,6 +129,43 @@ public:
QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings);
private:
String cache_base_path;
size_t max_size;
size_t max_element_size;
size_t max_file_segment_size;
bool allow_persistent_files;
size_t enable_cache_hits_threshold;
bool enable_filesystem_query_cache_limit;
Poco::Logger * log;
bool is_initialized = false;
mutable std::mutex mutex;
bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void remove(
Key key,
size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
bool isLastFileSegmentHolder(
const Key & key,
size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
void reduceSizeToDownloaded(
const Key & key,
size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
void assertInitialized() const;
struct FileSegmentCell : private boost::noncopyable
{
FileSegmentPtr file_segment;
@@ -223,24 +183,30 @@ private:
FileSegmentCell(FileSegmentPtr file_segment_, FileCache * cache, std::lock_guard<std::mutex> & cache_lock);
FileSegmentCell(FileSegmentCell && other) noexcept
: file_segment(std::move(other.file_segment)), queue_iterator(other.queue_iterator)
: file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {}
};
using AccessKeyAndOffset = std::pair<Key, size_t>;
struct KeyAndOffsetHash
{
std::size_t operator()(const AccessKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first.key) ^ std::hash<UInt64>()(key.second);
}
};
using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;
using FileCacheRecords = std::unordered_map<AccessKeyAndOffset, IFileCachePriority::WriteIterator, KeyAndOffsetHash>;
CachedFiles files;
std::unique_ptr<IFileCachePriority> main_priority;
FileCacheRecords stash_records;
std::unique_ptr<IFileCachePriority> stash_priority;
size_t max_stash_element_size;
size_t enable_cache_hits_threshold;
Poco::Logger * log;
void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);
FileSegments getImpl(const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock);
@@ -257,11 +223,11 @@ private:
void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock) const;
bool tryReserveForMainList(
const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock);
size_t getAvailableCacheSize() const;
void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);
const Key & key,
size_t offset,
size_t size,
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
FileSegments splitRangeIntoCells(
const Key & key,
@@ -289,6 +255,48 @@ private:
void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);
/// Used to track and control the cache access of each query.
/// Through it, the cache layer can apply per-query processing and policies.
struct QueryContext
{
FileCacheRecords records;
FileCachePriorityPtr priority;
size_t cache_size = 0;
size_t max_cache_size;
bool skip_download_if_exceeds_query_cache;
QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_)
: max_cache_size(max_cache_size_)
, skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) {}
size_t getMaxCacheSize() const { return max_cache_size; }
size_t getCacheSize() const { return cache_size; }
FileCachePriorityPtr getPriority() const { return priority; }
bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
};
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
QueryContextMap query_map;
QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock);
QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock);
void removeQueryContext(const String & query_id);
QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> &);
public:
void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);
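One detail carried over in the header reshuffle above is KeyAndOffsetHash, which keys the per-query FileCacheRecords map by (key, offset) using an XOR of the two element hashes. A tiny standalone illustration of the same idea follows; the 128-bit cache key is simplified to a 64-bit integer and all names are for illustration only.

// Illustrative: an unordered_map keyed by (key, offset) with an XOR-combined hash,
// simplified from the KeyAndOffsetHash pattern in the header above.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

using AccessKeyAndOffset = std::pair<std::uint64_t, std::uint64_t>;  /// simplified: UInt128 key -> uint64_t

struct KeyAndOffsetHash
{
    std::size_t operator()(const AccessKeyAndOffset & key) const
    {
        return std::hash<std::uint64_t>()(key.first) ^ std::hash<std::uint64_t>()(key.second);
    }
};

int main()
{
    std::unordered_map<AccessKeyAndOffset, std::string, KeyAndOffsetHash> records;
    records[{42, 0}] = "segment at offset 0";
    records[{42, 4096}] = "segment at offset 4096";
    std::cout << records.size() << '\n';  /// prints 2
}

XOR is the simplest combiner and is what the header uses; a mixing combiner would reduce collisions, but the sketch keeps the original pattern.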


@@ -79,10 +79,18 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
}
/// Remove files on disk and checksums
for (const String & removed_file : remove_files)
for (auto itr = remove_files.begin(); itr != remove_files.end();)
{
if (checksums.files.contains(removed_file))
checksums.files.erase(removed_file);
if (checksums.files.contains(*itr))
{
checksums.files.erase(*itr);
++itr;
}
else /// If the file is not in checksums, it doesn't exist on disk
{
LOG_TRACE(storage.log, "File {} doesn't exist in checksums, so it doesn't exist on disk; will not try to remove it", *itr);
itr = remove_files.erase(itr);
}
}
/// Remove columns from columns array
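The hunk above replaces the range-based loop with the erase-while-iterating pattern: files present in checksums stay scheduled for removal, while entries missing from checksums are dropped from remove_files instead. A small self-contained sketch of that iterator pattern (the container types and names here are illustrative, not the actual NameSet/checksums types):

// Illustrative sketch of the erase-while-iterating idiom used in the hunk above.
#include <iostream>
#include <set>
#include <string>
#include <unordered_set>

int main()
{
    std::unordered_set<std::string> checksum_files = {"a.bin", "b.bin"};
    std::set<std::string> remove_files = {"a.bin", "b.bin", "c.bin"};

    for (auto it = remove_files.begin(); it != remove_files.end();)
    {
        if (checksum_files.contains(*it))
        {
            checksum_files.erase(*it);  /// drop it from checksums, keep it scheduled for removal
            ++it;
        }
        else
        {
            /// Not in checksums, so it should not exist on disk: skip removing it.
            it = remove_files.erase(it);
        }
    }

    for (const auto & file : remove_files)
        std::cout << file << '\n';  /// prints a.bin and b.bin
}

Erasing the element under the current iterator would invalidate it, which is why the new loop advances via the iterator returned by erase.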


@@ -703,11 +703,11 @@ size_t MergeTreeBaseSelectProcessor::estimateMaxBatchSizeForHugeRanges()
{
/// This is an empirical number, chosen because we have adaptive granularity by default.
const size_t average_granule_size_bytes = 8UL * 1024 * 1024 * 10; // 10 MiB
const size_t average_granule_size_bytes = 1024 * 1024 * 10; // 10 MiB
/// We want to have one RTT per one gigabyte of data read from disk
/// this could be configurable.
const size_t max_size_for_one_request = 8UL * 1024 * 1024 * 1024; // 1 GiB
const size_t max_size_for_one_request = 1024 * 1024 * 1024; // 1 GiB
size_t sum_average_marks_size = 0;
/// getColumnSize is not fully implemented for compact parts
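For context, the change above appears to bring the two constants in line with their comments: the removed values are eight times larger than the sizes the comments state (80 MiB and 8 GiB rather than 10 MiB and 1 GiB). A quick standalone arithmetic check, for illustration only:

// Checks that the old constants were 8x the sizes stated in their comments.
#include <cstddef>
#include <iostream>

int main()
{
    const std::size_t old_granule = 8UL * 1024 * 1024 * 10;    /// 83886080 bytes
    const std::size_t new_granule = 1024 * 1024 * 10;          /// 10485760 bytes
    const std::size_t old_request = 8UL * 1024 * 1024 * 1024;  /// 8589934592 bytes
    const std::size_t new_request = 1024UL * 1024 * 1024;      /// 1073741824 bytes

    std::cout << old_granule / (1024 * 1024) << " MiB -> "
              << new_granule / (1024 * 1024) << " MiB\n";            /// 80 MiB -> 10 MiB
    std::cout << old_request / (1024UL * 1024 * 1024) << " GiB -> "
              << new_request / (1024UL * 1024 * 1024) << " GiB\n";   /// 8 GiB -> 1 GiB
}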


@@ -94,7 +94,7 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
{
writer.finish(sync);
for (const auto & file_name: files_to_remove_after_finish)
for (const auto & file_name : files_to_remove_after_finish)
data_part_storage_builder->removeFile(file_name);
for (auto & file : written_files)


@@ -34,7 +34,7 @@ def get_run_command(
# a static link, don't use S3_URL or S3_DOWNLOAD
"-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' "
# For dmesg
"--cap-add syslog "
"--privileged "
f"--volume={build_path}:/package_folder "
f"--volume={result_folder}:/test_output "
f"--volume={repo_tests_path}:/usr/share/clickhouse-test "


@@ -0,0 +1,3 @@
2020-10-01 144
2020-10-01 0
2020-10-01 0


@@ -0,0 +1,28 @@
DROP TABLE IF EXISTS ttl_table;
CREATE TABLE ttl_table
(
EventDate Date,
Longitude Float64 TTL EventDate + toIntervalWeek(2)
)
ENGINE = MergeTree()
ORDER BY EventDate
SETTINGS vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=1;
SYSTEM STOP MERGES ttl_table;
INSERT INTO ttl_table VALUES(toDate('2020-10-01'), 144);
SELECT * FROM ttl_table;
SYSTEM START MERGES ttl_table;
OPTIMIZE TABLE ttl_table FINAL;
SELECT * FROM ttl_table;
OPTIMIZE TABLE ttl_table FINAL;
SELECT * FROM ttl_table;
DROP TABLE IF EXISTS ttl_table;