2019-11-20 13:33:41 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
|
2020-02-03 12:46:25 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2022-01-10 19:39:10 +00:00
|
|
|
#include <Common/MemoryTrackerBlockerInThread.h>
|
2020-02-03 12:46:25 +00:00
|
|
|
#include <IO/ReadBufferFromFile.h>
|
2022-08-30 17:47:34 +00:00
|
|
|
#include <Common/setThreadName.h>
|
|
|
|
#include <Common/scope_guard_safe.h>
|
|
|
|
#include <Common/CurrentMetrics.h>
|
|
|
|
#include <Common/ThreadPool.h>
|
2019-11-20 13:33:41 +00:00
|
|
|
|
2020-02-27 16:47:40 +00:00
|
|
|
#include <utility>
|
|
|
|
|
2022-08-30 17:47:34 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
|
|
|
extern const Event WaitMarksLoadMicroseconds;
|
2022-09-04 17:10:46 +00:00
|
|
|
extern const Event BackgroundLoadingMarksTasks;
|
2022-08-30 17:47:34 +00:00
|
|
|
}
|
|
|
|
|
2019-11-20 13:33:41 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2020-02-03 12:46:25 +00:00
|
|
|
/// Error codes thrown by the functions below.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA; /// The marks file could not be consumed exactly.
    extern const int CORRUPTED_DATA;       /// The marks file size differs from the expected size.
    extern const int LOGICAL_ERROR;        /// Out-of-range column index, or marks loading returned nothing.
}
|
|
|
|
|
2019-11-20 13:33:41 +00:00
|
|
|
/// Stores everything needed to load the marks file `mrk_path_` later on.
/// If `read_settings_.load_marks_asynchronously` is set, loading is scheduled
/// right away and its result is kept in `future`; otherwise nothing is loaded here.
MergeTreeMarksLoader::MergeTreeMarksLoader(
    DataPartStoragePtr data_part_storage_,
    MarkCache * mark_cache_,
    const String & mrk_path_,
    size_t marks_count_,
    const MergeTreeIndexGranularityInfo & index_granularity_info_,
    bool save_marks_in_cache_,
    const ReadSettings & read_settings_,
    size_t columns_in_mark_)
    : data_part_storage(std::move(data_part_storage_))
    , mark_cache(mark_cache_)
    , mrk_path(mrk_path_)
    , marks_count(marks_count_)
    , index_granularity_info(index_granularity_info_)
    , save_marks_in_cache(save_marks_in_cache_)
    , columns_in_mark(columns_in_mark_)
    , read_settings(read_settings_)
{
    /// Synchronous mode: nothing to schedule at construction time.
    if (!read_settings_.load_marks_asynchronously)
        return;

    future = loadMarksAsync();
}
|
2019-11-20 13:33:41 +00:00
|
|
|
|
2022-08-31 13:39:53 +00:00
|
|
|
/// Blocks until a still-pending asynchronous load has finished.
/// The background task created in loadMarksAsync() captures `this`,
/// so the object must not go away while that task may still be running.
MergeTreeMarksLoader::~MergeTreeMarksLoader()
{
    /// No asynchronous load was started, or its result was already taken.
    if (!future.valid())
        return;

    future.wait();
}
|
|
|
|
|
2022-09-01 15:04:34 +00:00
|
|
|
std::shared_ptr<ThreadPool> MergeTreeMarksLoader::getLoadMarksThreadpool()
|
|
|
|
{
|
|
|
|
constexpr size_t pool_size = 50;
|
|
|
|
constexpr size_t queue_size = 1000000;
|
|
|
|
static std::shared_ptr<ThreadPool> load_marks_threadpool = std::make_shared<ThreadPool>(pool_size, pool_size, queue_size);
|
|
|
|
|
|
|
|
return load_marks_threadpool;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-11-20 13:33:41 +00:00
|
|
|
/// Returns the mark for the given row and column.
/// Marks are obtained lazily on the first call: from the pending asynchronous
/// load if there is one, otherwise by loading synchronously. The time spent
/// obtaining them is added to ProfileEvents::WaitMarksLoadMicroseconds.
/// In debug builds (NDEBUG not defined) an out-of-range `column_index`
/// throws LOGICAL_ERROR; `row_index` is not checked here.
const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
{
    if (!marks)
    {
        Stopwatch load_timer(CLOCK_MONOTONIC);

        if (!future.valid())
        {
            /// No background load in flight: load in the calling thread.
            marks = loadMarks();
        }
        else
        {
            /// Take the result of the background load and drop the consumed future.
            marks = future.get();
            future = {};
        }

        load_timer.stop();
        ProfileEvents::increment(ProfileEvents::WaitMarksLoadMicroseconds, load_timer.elapsedMicroseconds());
    }

#ifndef NDEBUG
    if (column_index >= columns_in_mark)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Column index: {} is out of range [0, {})", column_index, columns_in_mark);
#endif

    /// Marks are stored row by row, `columns_in_mark` entries per row.
    const size_t flat_index = row_index * columns_in_mark + column_index;
    return (*marks)[flat_index];
}
|
|
|
|
|
|
|
|
/// Reads the whole marks file from storage and returns the parsed marks.
/// Throws CORRUPTED_DATA if the file size differs from
/// `marks_count * getMarkSizeInBytes(columns_in_mark)`, and
/// CANNOT_READ_ALL_DATA if the file content is not consumed exactly.
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
{
    /// Marks are kept in a shared cache, so their memory must not be charged to the query.
    MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;

    const size_t actual_size = data_part_storage->getFileSize(mrk_path);
    const size_t bytes_per_mark = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
    const size_t required_size = bytes_per_mark * marks_count;

    if (actual_size != required_size)
        throw Exception(
            ErrorCodes::CORRUPTED_DATA,
            "Bad size of marks file '{}': {}, must be: {}",
            (fs::path(data_part_storage->getFullPath()) / mrk_path).string(),
            std::to_string(actual_size), std::to_string(required_size));

    auto loaded = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);

    /// The file is opened the same way for both on-disk layouts.
    auto in = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(actual_size), actual_size, std::nullopt);

    if (index_granularity_info.is_adaptive)
    {
        /// Read one row of marks at a time until the file is exhausted.
        size_t rows_read = 0;
        for (; !in->eof(); ++rows_read)
        {
            loaded->read(*in, rows_read * columns_in_mark, columns_in_mark);
            /// Skip the sizeof(size_t) bytes that follow each row of marks
            /// (presumably the granularity value, which is not needed here — TODO confirm).
            in->seek(sizeof(size_t), SEEK_CUR);
        }

        if (rows_read * bytes_per_mark != actual_size)
            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path);
    }
    else
    {
        /// Non-adaptive layout: the file bytes are read straight into the marks array.
        in->readStrict(reinterpret_cast<char *>(loaded->data()), actual_size);

        if (!in->eof())
            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
                "Cannot read all marks from file {}, is eof: {}, buffer size: {}, file size: {}",
                mrk_path, in->eof(), in->buffer().size(), actual_size);
    }

    loaded->protect();
    return loaded;
}
|
|
|
|
|
2022-08-30 17:47:34 +00:00
|
|
|
/// Returns the marks, going through the mark cache when one is configured.
///  - No cache: always read from storage.
///  - Cache and `save_marks_in_cache`: get from the cache, or load and store via getOrSet().
///  - Cache without `save_marks_in_cache`: look the marks up, and on a miss read
///    from storage without putting the result into the cache.
/// Throws LOGICAL_ERROR if no marks were obtained.
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
{
    MarkCache::MappedPtr result;

    if (!mark_cache)
    {
        result = loadMarksImpl();
    }
    else
    {
        /// The cache key is derived from the full path of the marks file.
        auto cache_key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path);

        if (!save_marks_in_cache)
        {
            result = mark_cache->get(cache_key);
            if (!result)
                result = loadMarksImpl();
        }
        else
        {
            auto load_from_storage = [this] { return loadMarksImpl(); };
            result = mark_cache->getOrSet(cache_key, load_from_storage);
        }
    }

    if (!result)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR, "Failed to load marks: {}",
            (fs::path(data_part_storage->getFullPath()) / mrk_path).string());

    return result;
}
|
|
|
|
|
|
|
|
std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
|
|
|
|
{
|
|
|
|
ThreadGroupStatusPtr thread_group;
|
|
|
|
if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup())
|
|
|
|
thread_group = CurrentThread::get().getThreadGroup();
|
|
|
|
|
|
|
|
auto task = std::make_shared<std::packaged_task<MarkCache::MappedPtr()>>([thread_group, this]
|
|
|
|
{
|
|
|
|
setThreadName("loadMarksThread");
|
|
|
|
|
|
|
|
if (thread_group)
|
|
|
|
CurrentThread::attachTo(thread_group);
|
|
|
|
|
|
|
|
SCOPE_EXIT_SAFE({
|
|
|
|
if (thread_group)
|
|
|
|
CurrentThread::detachQuery();
|
|
|
|
});
|
|
|
|
|
2022-09-04 17:10:46 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
|
2022-08-30 17:47:34 +00:00
|
|
|
return loadMarks();
|
|
|
|
});
|
|
|
|
|
|
|
|
auto task_future = task->get_future();
|
2022-09-01 15:04:34 +00:00
|
|
|
getLoadMarksThreadpool()->scheduleOrThrow([task]{ (*task)(); });
|
2022-08-30 17:47:34 +00:00
|
|
|
return task_future;
|
2019-11-20 13:33:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|