mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-03 04:52:10 +00:00
MTReadStream
This commit is contained in:
parent
c4a725a496
commit
8c2a23a129
@ -1,6 +1,6 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Storages/MergeTree/MergeTreeReader.h>
|
#include <Storages/MergeTree/MergeTreeReaderStream.h>
|
||||||
#include <Storages/MergeTree/MergeTreeIndices.h>
|
#include <Storages/MergeTree/MergeTreeIndices.h>
|
||||||
#include <Storages/MergeTree/MergeTreeData.h>
|
#include <Storages/MergeTree/MergeTreeData.h>
|
||||||
|
|
||||||
@ -20,7 +20,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
MergeTreeIndexPtr index;
|
MergeTreeIndexPtr index;
|
||||||
MergeTreeReader::Stream stream;
|
MergeTreeReaderStream stream;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
@ -154,205 +154,6 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
|
|||||||
return read_rows;
|
return read_rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Opens the compressed data file `path_prefix_ + extension_` for reading.
///
/// The read buffer size is derived from the widest requested mark range (in compressed bytes),
/// capped by max_read_buffer_size. When aio_threshold is non-zero, the total amount of compressed
/// data covered by all_mark_ranges is estimated and handed to the buffer as a hint.
/// A CachedCompressedReadBuffer is created when uncompressed_cache is non-null, otherwise a
/// CompressedReadBufferFromFile; data_buffer points at whichever one was created.
/// profile_callback, if set, is installed on the buffer together with clock_type.
MergeTreeReader::Stream::Stream(
        const String & path_prefix_, const String & extension_, size_t marks_count_,
        const MarkRanges & all_mark_ranges,
        MarkCache * mark_cache_, bool save_marks_in_cache_,
        UncompressedCache * uncompressed_cache,
        size_t aio_threshold, size_t max_read_buffer_size,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
        : path_prefix(path_prefix_), extension(extension_), marks_count(marks_count_)
        , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
{
    const String data_file_path = path_prefix + extension;

    /// Step 1: find the largest span of compressed bytes any single range may need.
    size_t largest_range_bytes = 0;

    for (const auto & range : all_mark_ranges)
    {
        size_t right_mark = range.end;
        /// NOTE: when the whole file is read, right_mark == marks_count, so no mark is touched here
        /// and max_read_buffer_size is used, which avoids loading the marks at all.

        /// The range may end in the middle of a compressed block; that block has to be read as well,
        /// so skip forward over every mark that still points into the same block.
        if (right_mark < marks_count && getMark(right_mark).offset_in_decompressed_block > 0)
        {
            while (right_mark < marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(range.end).offset_in_compressed_file)
            {
                ++right_mark;
            }
        }

        /// Nothing usable lies beyond the range: fall back to the maximum buffer size.
        bool reads_until_file_end = right_mark >= marks_count;
        if (!reads_until_file_end && right_mark + 1 == marks_count)
        {
            reads_until_file_end
                = getMark(right_mark).offset_in_compressed_file == getMark(range.end).offset_in_compressed_file;
        }

        if (reads_until_file_end)
        {
            largest_range_bytes = max_read_buffer_size;
            break;
        }

        const size_t range_bytes
            = getMark(right_mark).offset_in_compressed_file - getMark(range.begin).offset_in_compressed_file;
        largest_range_bytes = std::max(largest_range_bytes, range_bytes);
    }

    /// A zero-sized buffer must be avoided. It may happen while reading a dictionary for
    /// DataTypeLowCardinality, e.g. the part has a single dictionary and all marks point to the same position.
    if (largest_range_bytes == 0)
        largest_range_bytes = max_read_buffer_size;

    const size_t buffer_size = std::min(max_read_buffer_size, largest_range_bytes);

    /// Step 2: estimate how many compressed bytes will be read in total (only needed for AIO).
    size_t estimated_size = 0;
    if (aio_threshold > 0)
    {
        for (const auto & mark_range : all_mark_ranges)
        {
            size_t offset_begin = 0;
            if (mark_range.begin > 0)
                offset_begin = getMark(mark_range.begin).offset_in_compressed_file;

            size_t offset_end;
            if (mark_range.end < marks_count)
                offset_end = getMark(mark_range.end).offset_in_compressed_file;
            else
                offset_end = Poco::File(data_file_path).getSize();

            if (offset_end > offset_begin)
                estimated_size += offset_end - offset_begin;
        }
    }

    /// Step 3: create the buffer that performs the actual reads.
    if (uncompressed_cache)
    {
        auto cached = std::make_unique<CachedCompressedReadBuffer>(
            data_file_path, uncompressed_cache, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            cached->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(cached);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto plain = std::make_unique<CompressedReadBufferFromFile>(
            data_file_path, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            plain->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(plain);
        data_buffer = non_cached_buffer.get();
    }
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Returns the mark with the given index, loading the marks on first use.
/// The index is not range-checked here.
const MarkInCompressedFile & MergeTreeReader::Stream::getMark(size_t index)
{
    const bool marks_are_loaded = static_cast<bool>(marks);
    if (!marks_are_loaded)
        loadMarks();

    const auto & loaded_marks = *marks;
    return loaded_marks[index];
}
|
|
||||||
|
|
||||||
|
|
||||||
void MergeTreeReader::Stream::loadMarks()
|
|
||||||
{
|
|
||||||
std::string mrk_path = path_prefix + ".mrk";
|
|
||||||
|
|
||||||
auto load = [&]() -> MarkCache::MappedPtr
|
|
||||||
{
|
|
||||||
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
|
|
||||||
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
|
||||||
|
|
||||||
size_t file_size = Poco::File(mrk_path).getSize();
|
|
||||||
size_t expected_file_size = sizeof(MarkInCompressedFile) * marks_count;
|
|
||||||
if (expected_file_size != file_size)
|
|
||||||
throw Exception(
|
|
||||||
"bad size of marks file `" + mrk_path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
|
|
||||||
ErrorCodes::CORRUPTED_DATA);
|
|
||||||
|
|
||||||
auto res = std::make_shared<MarksInCompressedFile>(marks_count);
|
|
||||||
|
|
||||||
/// Read directly to marks.
|
|
||||||
ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));
|
|
||||||
|
|
||||||
if (buffer.eof() || buffer.buffer().size() != file_size)
|
|
||||||
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
|
|
||||||
|
|
||||||
return res;
|
|
||||||
};
|
|
||||||
|
|
||||||
if (mark_cache)
|
|
||||||
{
|
|
||||||
auto key = mark_cache->hash(mrk_path);
|
|
||||||
if (save_marks_in_cache)
|
|
||||||
{
|
|
||||||
marks = mark_cache->getOrSet(key, load);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
marks = mark_cache->get(key);
|
|
||||||
if (!marks)
|
|
||||||
marks = load();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
marks = load();
|
|
||||||
|
|
||||||
if (!marks)
|
|
||||||
throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void MergeTreeReader::Stream::seekToMark(size_t index)
|
|
||||||
{
|
|
||||||
MarkInCompressedFile mark = getMark(index);
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (cached_buffer)
|
|
||||||
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
|
||||||
if (non_cached_buffer)
|
|
||||||
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
|
||||||
}
|
|
||||||
catch (Exception & e)
|
|
||||||
{
|
|
||||||
/// Better diagnostics.
|
|
||||||
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
|
||||||
e.addMessage("(while seeking to mark " + toString(index)
|
|
||||||
+ " of column " + path_prefix + "; offsets are: "
|
|
||||||
+ toString(mark.offset_in_compressed_file) + " "
|
|
||||||
+ toString(mark.offset_in_decompressed_block) + ")");
|
|
||||||
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void MergeTreeReader::Stream::seekToStart()
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (cached_buffer)
|
|
||||||
cached_buffer->seek(0, 0);
|
|
||||||
if (non_cached_buffer)
|
|
||||||
non_cached_buffer->seek(0, 0);
|
|
||||||
}
|
|
||||||
catch (Exception & e)
|
|
||||||
{
|
|
||||||
/// Better diagnostics.
|
|
||||||
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
|
||||||
e.addMessage("(while seeking to start of column " + path_prefix + ")");
|
|
||||||
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void MergeTreeReader::addStreams(const String & name, const IDataType & type,
|
void MergeTreeReader::addStreams(const String & name, const IDataType & type,
|
||||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
|
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
|
||||||
{
|
{
|
||||||
@ -371,7 +172,7 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type,
|
|||||||
if (!data_file_exists)
|
if (!data_file_exists)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
streams.emplace(stream_name, std::make_unique<Stream>(
|
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
|
||||||
path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count,
|
path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count,
|
||||||
all_mark_ranges, mark_cache, save_marks_in_cache,
|
all_mark_ranges, mark_cache, save_marks_in_cache,
|
||||||
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
|
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
|
||||||
@ -401,7 +202,7 @@ void MergeTreeReader::readData(
|
|||||||
if (it == streams.end())
|
if (it == streams.end())
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
|
||||||
Stream & stream = *it->second;
|
MergeTreeReaderStream & stream = *it->second;
|
||||||
|
|
||||||
if (stream_for_prefix)
|
if (stream_for_prefix)
|
||||||
{
|
{
|
||||||
|
@ -1,12 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Storages/MarkCache.h>
|
|
||||||
#include <Storages/MergeTree/MarkRange.h>
|
|
||||||
#include <Storages/MergeTree/MergeTreeData.h>
|
|
||||||
#include <Storages/MergeTree/MergeTreeRangeReader.h>
|
|
||||||
#include <Compression/CachedCompressedReadBuffer.h>
|
|
||||||
#include <Compression/CompressedReadBufferFromFile.h>
|
|
||||||
#include <Core/NamesAndTypes.h>
|
#include <Core/NamesAndTypes.h>
|
||||||
|
#include <Storages/MergeTree/MergeTreeReaderStream.h>
|
||||||
#include <port/clock.h>
|
#include <port/clock.h>
|
||||||
|
|
||||||
|
|
||||||
@ -14,7 +9,6 @@ namespace DB
|
|||||||
{
|
{
|
||||||
|
|
||||||
class IDataType;
|
class IDataType;
|
||||||
class CachedCompressedReadBuffer;
|
|
||||||
|
|
||||||
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
|
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
|
||||||
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
|
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
|
||||||
@ -57,45 +51,8 @@ public:
|
|||||||
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
|
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
|
||||||
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
|
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
|
||||||
|
|
||||||
class Stream
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
Stream(
|
|
||||||
const String & path_prefix_, const String & extension_, size_t marks_count_,
|
|
||||||
const MarkRanges & all_mark_ranges,
|
|
||||||
MarkCache * mark_cache, bool save_marks_in_cache,
|
|
||||||
UncompressedCache * uncompressed_cache,
|
|
||||||
size_t aio_threshold, size_t max_read_buffer_size,
|
|
||||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
|
|
||||||
|
|
||||||
void seekToMark(size_t index);
|
|
||||||
void seekToStart();
|
|
||||||
|
|
||||||
ReadBuffer * data_buffer;
|
|
||||||
|
|
||||||
private:
|
|
||||||
Stream() = default;
|
|
||||||
|
|
||||||
/// NOTE: lazily loads marks from the marks cache.
|
|
||||||
const MarkInCompressedFile & getMark(size_t index);
|
|
||||||
|
|
||||||
void loadMarks();
|
|
||||||
|
|
||||||
std::string path_prefix;
|
|
||||||
std::string extension;
|
|
||||||
|
|
||||||
size_t marks_count;
|
|
||||||
|
|
||||||
MarkCache * mark_cache;
|
|
||||||
bool save_marks_in_cache;
|
|
||||||
MarkCache::MappedPtr marks;
|
|
||||||
|
|
||||||
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
|
|
||||||
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
|
|
||||||
};
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
|
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
|
||||||
|
|
||||||
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
|
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
|
||||||
ValueSizeMap avg_value_size_hints;
|
ValueSizeMap avg_value_size_hints;
|
||||||
|
215
dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp
Normal file
215
dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp
Normal file
@ -0,0 +1,215 @@
|
|||||||
|
#include <Common/MemoryTracker.h>
|
||||||
|
#include <Storages/MergeTree/MergeTreeReaderStream.h>
|
||||||
|
#include <Poco/File.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
extern const int CORRUPTED_DATA;
|
||||||
|
extern const int CANNOT_READ_ALL_DATA;
|
||||||
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Opens the compressed data file `path_prefix_ + extension_` for reading.
///
/// The read buffer size is derived from the widest requested mark range (in compressed bytes),
/// capped by max_read_buffer_size. When aio_threshold is non-zero, the total amount of compressed
/// data covered by all_mark_ranges is estimated and handed to the buffer as a hint.
/// A CachedCompressedReadBuffer is created when uncompressed_cache is non-null, otherwise a
/// CompressedReadBufferFromFile; data_buffer points at whichever one was created.
/// profile_callback, if set, is installed on the buffer together with clock_type.
MergeTreeReaderStream::MergeTreeReaderStream(
        const String & path_prefix_, const String & extension_, size_t marks_count_,
        const MarkRanges & all_mark_ranges,
        MarkCache * mark_cache_, bool save_marks_in_cache_,
        UncompressedCache * uncompressed_cache,
        size_t aio_threshold, size_t max_read_buffer_size,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
        : path_prefix(path_prefix_), extension(extension_), marks_count(marks_count_)
        , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
{
    const String data_file_path = path_prefix + extension;

    /// Step 1: find the largest span of compressed bytes any single range may need.
    size_t largest_range_bytes = 0;

    for (const auto & range : all_mark_ranges)
    {
        size_t right_mark = range.end;
        /// NOTE: when the whole file is read, right_mark == marks_count, so no mark is touched here
        /// and max_read_buffer_size is used, which avoids loading the marks at all.

        /// The range may end in the middle of a compressed block; that block has to be read as well,
        /// so skip forward over every mark that still points into the same block.
        if (right_mark < marks_count && getMark(right_mark).offset_in_decompressed_block > 0)
        {
            while (right_mark < marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(range.end).offset_in_compressed_file)
            {
                ++right_mark;
            }
        }

        /// Nothing usable lies beyond the range: fall back to the maximum buffer size.
        bool reads_until_file_end = right_mark >= marks_count;
        if (!reads_until_file_end && right_mark + 1 == marks_count)
        {
            reads_until_file_end
                = getMark(right_mark).offset_in_compressed_file == getMark(range.end).offset_in_compressed_file;
        }

        if (reads_until_file_end)
        {
            largest_range_bytes = max_read_buffer_size;
            break;
        }

        const size_t range_bytes
            = getMark(right_mark).offset_in_compressed_file - getMark(range.begin).offset_in_compressed_file;
        largest_range_bytes = std::max(largest_range_bytes, range_bytes);
    }

    /// A zero-sized buffer must be avoided. It may happen while reading a dictionary for
    /// DataTypeLowCardinality, e.g. the part has a single dictionary and all marks point to the same position.
    if (largest_range_bytes == 0)
        largest_range_bytes = max_read_buffer_size;

    const size_t buffer_size = std::min(max_read_buffer_size, largest_range_bytes);

    /// Step 2: estimate how many compressed bytes will be read in total (only needed for AIO).
    size_t estimated_size = 0;
    if (aio_threshold > 0)
    {
        for (const auto & mark_range : all_mark_ranges)
        {
            size_t offset_begin = 0;
            if (mark_range.begin > 0)
                offset_begin = getMark(mark_range.begin).offset_in_compressed_file;

            size_t offset_end;
            if (mark_range.end < marks_count)
                offset_end = getMark(mark_range.end).offset_in_compressed_file;
            else
                offset_end = Poco::File(data_file_path).getSize();

            if (offset_end > offset_begin)
                estimated_size += offset_end - offset_begin;
        }
    }

    /// Step 3: create the buffer that performs the actual reads.
    if (uncompressed_cache)
    {
        auto cached = std::make_unique<CachedCompressedReadBuffer>(
            data_file_path, uncompressed_cache, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            cached->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(cached);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto plain = std::make_unique<CompressedReadBufferFromFile>(
            data_file_path, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            plain->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(plain);
        data_buffer = non_cached_buffer.get();
    }
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Returns the mark with the given index, loading the marks on first use.
/// The index is not range-checked here.
const MarkInCompressedFile & MergeTreeReaderStream::getMark(size_t index)
{
    const bool marks_are_loaded = static_cast<bool>(marks);
    if (!marks_are_loaded)
        loadMarks();

    const auto & loaded_marks = *marks;
    return loaded_marks[index];
}
|
||||||
|
|
||||||
|
|
||||||
|
void MergeTreeReaderStream::loadMarks()
|
||||||
|
{
|
||||||
|
std::string mrk_path = path_prefix + ".mrk";
|
||||||
|
|
||||||
|
auto load = [&]() -> MarkCache::MappedPtr
|
||||||
|
{
|
||||||
|
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
|
||||||
|
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
||||||
|
|
||||||
|
size_t file_size = Poco::File(mrk_path).getSize();
|
||||||
|
size_t expected_file_size = sizeof(MarkInCompressedFile) * marks_count;
|
||||||
|
if (expected_file_size != file_size)
|
||||||
|
throw Exception(
|
||||||
|
"bad size of marks file `" + mrk_path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
|
||||||
|
ErrorCodes::CORRUPTED_DATA);
|
||||||
|
|
||||||
|
auto res = std::make_shared<MarksInCompressedFile>(marks_count);
|
||||||
|
|
||||||
|
/// Read directly to marks.
|
||||||
|
ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));
|
||||||
|
|
||||||
|
if (buffer.eof() || buffer.buffer().size() != file_size)
|
||||||
|
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||||
|
|
||||||
|
return res;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (mark_cache)
|
||||||
|
{
|
||||||
|
auto key = mark_cache->hash(mrk_path);
|
||||||
|
if (save_marks_in_cache)
|
||||||
|
{
|
||||||
|
marks = mark_cache->getOrSet(key, load);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
marks = mark_cache->get(key);
|
||||||
|
if (!marks)
|
||||||
|
marks = load();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
marks = load();
|
||||||
|
|
||||||
|
if (!marks)
|
||||||
|
throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void MergeTreeReaderStream::seekToMark(size_t index)
|
||||||
|
{
|
||||||
|
MarkInCompressedFile mark = getMark(index);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (cached_buffer)
|
||||||
|
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
||||||
|
if (non_cached_buffer)
|
||||||
|
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
||||||
|
}
|
||||||
|
catch (Exception & e)
|
||||||
|
{
|
||||||
|
/// Better diagnostics.
|
||||||
|
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
||||||
|
e.addMessage("(while seeking to mark " + toString(index)
|
||||||
|
+ " of column " + path_prefix + "; offsets are: "
|
||||||
|
+ toString(mark.offset_in_compressed_file) + " "
|
||||||
|
+ toString(mark.offset_in_decompressed_block) + ")");
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void MergeTreeReaderStream::seekToStart()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (cached_buffer)
|
||||||
|
cached_buffer->seek(0, 0);
|
||||||
|
if (non_cached_buffer)
|
||||||
|
non_cached_buffer->seek(0, 0);
|
||||||
|
}
|
||||||
|
catch (Exception & e)
|
||||||
|
{
|
||||||
|
/// Better diagnostics.
|
||||||
|
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
||||||
|
e.addMessage("(while seeking to start of column " + path_prefix + ")");
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
49
dbms/src/Storages/MergeTree/MergeTreeReaderStream.h
Normal file
49
dbms/src/Storages/MergeTree/MergeTreeReaderStream.h
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
#pragma once

#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>


namespace DB
{

/// A reader over one compressed data file (`path_prefix + extension`) and its marks file.
/// Exactly one of cached_buffer / non_cached_buffer is created by the public constructor,
/// and data_buffer points at it.
class MergeTreeReaderStream
{
public:
    MergeTreeReaderStream(
        const String & path_prefix_, const String & extension_, size_t marks_count_,
        const MarkRanges & all_mark_ranges,
        MarkCache * mark_cache, bool save_marks_in_cache,
        UncompressedCache * uncompressed_cache,
        size_t aio_threshold, size_t max_read_buffer_size,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);

    /// Seek the underlying buffer to the position described by the mark with the given index.
    void seekToMark(size_t index);

    /// Seek the underlying buffer to the beginning of the data file.
    void seekToStart();

    /// Non-owning pointer to the active buffer; null until a buffer has been created.
    ReadBuffer * data_buffer = nullptr;

private:
    /// Members carry default initializers so that this constructor never leaves
    /// pointers or counters with indeterminate values.
    MergeTreeReaderStream() = default;

    /// NOTE: lazily loads marks from the marks cache.
    const MarkInCompressedFile & getMark(size_t index);

    void loadMarks();

    std::string path_prefix;
    std::string extension;

    size_t marks_count = 0;

    MarkCache * mark_cache = nullptr;
    bool save_marks_in_cache = false;
    MarkCache::MappedPtr marks;

    std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
    std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
};

}
|
Loading…
Reference in New Issue
Block a user