mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #55164 from ClickHouse/vdimir/fix_file_cache_tmp_write_buffer
Fix file cache temporary file segment range in FileSegment::reserve
This commit is contained in:
commit
a4bd6891e1
@ -508,7 +508,8 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve
|
|||||||
/// This (resizable file segments) is allowed only for single threaded use of file segment.
|
/// This (resizable file segments) is allowed only for single threaded use of file segment.
|
||||||
/// Currently it is used only for temporary files through cache.
|
/// Currently it is used only for temporary files through cache.
|
||||||
if (is_unbound && is_file_segment_size_exceeded)
|
if (is_unbound && is_file_segment_size_exceeded)
|
||||||
segment_range.right = range().left + expected_downloaded_size + size_to_reserve;
|
/// Note: segment_range.right is inclusive.
|
||||||
|
segment_range.right = range().left + expected_downloaded_size + size_to_reserve - 1;
|
||||||
|
|
||||||
/// if reserve_stat is not passed then use dummy stat and discard the result.
|
/// if reserve_stat is not passed then use dummy stat and discard the result.
|
||||||
FileCacheReserveStat dummy_stat;
|
FileCacheReserveStat dummy_stat;
|
||||||
|
@ -1,11 +1,21 @@
|
|||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <iomanip>
|
#include <iomanip>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <numeric>
|
||||||
|
#include <random>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
#include <Common/randomSeed.h>
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
#include <IO/ReadHelpers.h>
|
#include <IO/ReadHelpers.h>
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
|
|
||||||
#include <Interpreters/Cache/FileCache.h>
|
#include <Interpreters/Cache/FileCache.h>
|
||||||
#include <Interpreters/Cache/FileCacheSettings.h>
|
#include <Interpreters/Cache/FileCacheSettings.h>
|
||||||
#include <Interpreters/Cache/FileSegment.h>
|
#include <Interpreters/Cache/FileSegment.h>
|
||||||
@ -13,7 +23,6 @@
|
|||||||
#include <Interpreters/TemporaryDataOnDisk.h>
|
#include <Interpreters/TemporaryDataOnDisk.h>
|
||||||
#include <base/hex.h>
|
#include <base/hex.h>
|
||||||
#include <base/sleep.h>
|
#include <base/sleep.h>
|
||||||
#include <gtest/gtest.h>
|
|
||||||
#include <Poco/DOM/DOMParser.h>
|
#include <Poco/DOM/DOMParser.h>
|
||||||
#include <Poco/Util/XMLConfiguration.h>
|
#include <Poco/Util/XMLConfiguration.h>
|
||||||
#include <Common/CurrentThread.h>
|
#include <Common/CurrentThread.h>
|
||||||
@ -187,6 +196,12 @@ public:
|
|||||||
else
|
else
|
||||||
setupLogs(TEST_LOG_LEVEL);
|
setupLogs(TEST_LOG_LEVEL);
|
||||||
|
|
||||||
|
UInt64 seed = randomSeed();
|
||||||
|
if (const char * random_seed = std::getenv("TEST_RANDOM_SEED")) // NOLINT(concurrency-mt-unsafe)
|
||||||
|
seed = std::stoull(random_seed);
|
||||||
|
std::cout << "TEST_RANDOM_SEED=" << seed << std::endl;
|
||||||
|
rng = pcg64(seed);
|
||||||
|
|
||||||
if (fs::exists(cache_base_path))
|
if (fs::exists(cache_base_path))
|
||||||
fs::remove_all(cache_base_path);
|
fs::remove_all(cache_base_path);
|
||||||
fs::create_directories(cache_base_path);
|
fs::create_directories(cache_base_path);
|
||||||
@ -198,6 +213,7 @@ public:
|
|||||||
fs::remove_all(cache_base_path);
|
fs::remove_all(cache_base_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pcg64 rng;
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_F(FileCacheTest, get)
|
TEST_F(FileCacheTest, get)
|
||||||
@ -679,7 +695,7 @@ TEST_F(FileCacheTest, writeBuffer)
|
|||||||
FileCache cache("6", settings);
|
FileCache cache("6", settings);
|
||||||
cache.initialize();
|
cache.initialize();
|
||||||
|
|
||||||
auto write_to_cache = [&cache](const String & key, const Strings & data, bool flush)
|
auto write_to_cache = [&cache, this](const String & key, const Strings & data, bool flush, ReadBufferPtr * out_read_buffer = nullptr)
|
||||||
{
|
{
|
||||||
CreateFileSegmentSettings segment_settings;
|
CreateFileSegmentSettings segment_settings;
|
||||||
segment_settings.kind = FileSegmentKind::Temporary;
|
segment_settings.kind = FileSegmentKind::Temporary;
|
||||||
@ -694,24 +710,32 @@ TEST_F(FileCacheTest, writeBuffer)
|
|||||||
WriteBufferToFileSegment out(&segment);
|
WriteBufferToFileSegment out(&segment);
|
||||||
std::list<std::thread> threads;
|
std::list<std::thread> threads;
|
||||||
std::mutex mu;
|
std::mutex mu;
|
||||||
for (const auto & s : data)
|
|
||||||
|
/// get random permutation of indexes
|
||||||
|
std::vector<size_t> indexes(data.size());
|
||||||
|
std::iota(indexes.begin(), indexes.end(), 0);
|
||||||
|
std::shuffle(indexes.begin(), indexes.end(), rng);
|
||||||
|
|
||||||
|
for (auto i : indexes)
|
||||||
{
|
{
|
||||||
/// Write from diffetent threads to check
|
/// Write from diffetent threads to check
|
||||||
/// that no assertions inside cache related to downloaderId are triggered
|
/// that no assertions inside cache related to downloaderId are triggered
|
||||||
|
const auto & s = data[i];
|
||||||
threads.emplace_back([&]
|
threads.emplace_back([&]
|
||||||
{
|
{
|
||||||
std::unique_lock lock(mu);
|
std::unique_lock lock(mu);
|
||||||
out.write(s.data(), s.size());
|
out.write(s.data(), s.size());
|
||||||
/// test different buffering scenarios
|
/// test different buffering scenarios
|
||||||
if (flush)
|
if (flush)
|
||||||
{
|
|
||||||
out.next();
|
out.next();
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
for (auto & t : threads)
|
for (auto & t : threads)
|
||||||
t.join();
|
t.join();
|
||||||
|
|
||||||
out.finalize();
|
out.finalize();
|
||||||
|
if (out_read_buffer)
|
||||||
|
*out_read_buffer = out.tryGetReadBuffer();
|
||||||
return holder;
|
return holder;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -721,17 +745,31 @@ TEST_F(FileCacheTest, writeBuffer)
|
|||||||
file_segment_paths.emplace_back(holder->front().getPathInLocalCache());
|
file_segment_paths.emplace_back(holder->front().getPathInLocalCache());
|
||||||
|
|
||||||
ASSERT_EQ(fs::file_size(file_segment_paths.back()), 7);
|
ASSERT_EQ(fs::file_size(file_segment_paths.back()), 7);
|
||||||
ASSERT_TRUE(holder->front().range() == FileSegment::Range(0, 7));
|
EXPECT_EQ(holder->front().range().size(), 7);
|
||||||
|
EXPECT_EQ(holder->front().range().left, 0);
|
||||||
ASSERT_EQ(cache.getUsedCacheSize(), 7);
|
ASSERT_EQ(cache.getUsedCacheSize(), 7);
|
||||||
|
|
||||||
{
|
{
|
||||||
auto holder2 = write_to_cache("key2", {"1", "22", "333", "4444", "55555"}, true);
|
ReadBufferPtr reader = nullptr;
|
||||||
|
|
||||||
|
auto holder2 = write_to_cache("key2", {"22", "333", "4444", "55555", "1"}, true, &reader);
|
||||||
file_segment_paths.emplace_back(holder2->front().getPathInLocalCache());
|
file_segment_paths.emplace_back(holder2->front().getPathInLocalCache());
|
||||||
|
|
||||||
std::cerr << "\nFile segments: " << holder2->toString() << "\n";
|
std::cerr << "\nFile segments: " << holder2->toString() << "\n";
|
||||||
|
|
||||||
ASSERT_EQ(fs::file_size(file_segment_paths.back()), 15);
|
ASSERT_EQ(fs::file_size(file_segment_paths.back()), 15);
|
||||||
ASSERT_EQ(holder2->front().range(), FileSegment::Range(0, 15));
|
EXPECT_TRUE(reader);
|
||||||
|
if (reader)
|
||||||
|
{
|
||||||
|
String result;
|
||||||
|
readStringUntilEOF(result, *reader);
|
||||||
|
/// sort result to make it independent of the order of writes
|
||||||
|
std::sort(result.begin(), result.end());
|
||||||
|
EXPECT_EQ(result, "122333444455555");
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_EQ(holder2->front().range().size(), 15);
|
||||||
|
EXPECT_EQ(holder2->front().range().left, 0);
|
||||||
ASSERT_EQ(cache.getUsedCacheSize(), 22);
|
ASSERT_EQ(cache.getUsedCacheSize(), 22);
|
||||||
}
|
}
|
||||||
ASSERT_FALSE(fs::exists(file_segment_paths.back()));
|
ASSERT_FALSE(fs::exists(file_segment_paths.back()));
|
||||||
|
Loading…
Reference in New Issue
Block a user