mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-24 18:50:49 +00:00
Less seeks in compressed buffers
This commit is contained in:
parent
dab4c60596
commit
8186339307
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
#include <Compression/LZ4_decompress_faster.h>
|
#include <Compression/LZ4_decompress_faster.h>
|
||||||
|
#include <common/logger_useful.h>
|
||||||
|
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
@ -30,7 +31,6 @@ void CachedCompressedReadBuffer::initInput()
|
|||||||
|
|
||||||
bool CachedCompressedReadBuffer::nextImpl()
|
bool CachedCompressedReadBuffer::nextImpl()
|
||||||
{
|
{
|
||||||
|
|
||||||
/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
|
/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
|
||||||
UInt128 key = cache->hash(path, file_pos);
|
UInt128 key = cache->hash(path, file_pos);
|
||||||
|
|
||||||
@ -60,6 +60,13 @@ bool CachedCompressedReadBuffer::nextImpl()
|
|||||||
|
|
||||||
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);
|
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);
|
||||||
|
|
||||||
|
/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
|
||||||
|
/// check that we are not seeking beyond working buffer.
|
||||||
|
if (nextimpl_working_buffer_offset > working_buffer.size())
|
||||||
|
throw Exception("Seek position is beyond the decompressed block"
|
||||||
|
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
|
||||||
|
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
||||||
|
|
||||||
file_pos += owned_cell->compressed_size;
|
file_pos += owned_cell->compressed_size;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -74,28 +81,32 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer(
|
|||||||
|
|
||||||
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
|
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
/// Nothing to do if we are already at the required position
|
||||||
|
if (file_pos == offset_in_compressed_file
|
||||||
|
&& (offset() == offset_in_decompressed_block ||
|
||||||
|
nextimpl_working_buffer_offset == offset_in_decompressed_block))
|
||||||
|
return;
|
||||||
|
|
||||||
if (owned_cell &&
|
if (owned_cell &&
|
||||||
offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
|
offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
|
||||||
offset_in_decompressed_block <= working_buffer.size())
|
offset_in_decompressed_block <= working_buffer.size())
|
||||||
{
|
{
|
||||||
bytes += offset();
|
|
||||||
pos = working_buffer.begin() + offset_in_decompressed_block;
|
pos = working_buffer.begin() + offset_in_decompressed_block;
|
||||||
bytes -= offset();
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
||||||
|
LOG_INFO(&Poco::Logger::get("DEBUG"), "Adding offset {}", offset());
|
||||||
|
/// Remember position in compressed file (will be moved in nextImpl)
|
||||||
file_pos = offset_in_compressed_file;
|
file_pos = offset_in_compressed_file;
|
||||||
|
/// We will discard our working_buffer, but have to account for the remaining bytes
|
||||||
bytes += offset();
|
bytes += offset();
|
||||||
nextImpl();
|
/// No data, everything discarded
|
||||||
|
pos = working_buffer.end();
|
||||||
if (offset_in_decompressed_block > working_buffer.size())
|
/// Remember required offset in decompressed block which will be set in
|
||||||
throw Exception("Seek position is beyond the decompressed block"
|
/// the next ReadBuffer::next() call
|
||||||
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
|
nextimpl_working_buffer_offset = offset_in_decompressed_block;
|
||||||
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
|
||||||
|
|
||||||
pos = working_buffer.begin() + offset_in_decompressed_block;
|
|
||||||
bytes -= offset();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ namespace DB
|
|||||||
* The external cache is passed as an argument to the constructor.
|
* The external cache is passed as an argument to the constructor.
|
||||||
* Allows you to increase performance in cases where the same blocks are often read.
|
* Allows you to increase performance in cases where the same blocks are often read.
|
||||||
* Disadvantages:
|
* Disadvantages:
|
||||||
* - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and.
|
* - in case you need to read a lot of data in a row, but some of them only a part is cached, you have to do seek-and.
|
||||||
*/
|
*/
|
||||||
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
|
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
|
||||||
{
|
{
|
||||||
@ -25,6 +25,8 @@ private:
|
|||||||
std::unique_ptr<ReadBufferFromFileBase> file_in;
|
std::unique_ptr<ReadBufferFromFileBase> file_in;
|
||||||
|
|
||||||
const std::string path;
|
const std::string path;
|
||||||
|
|
||||||
|
/// Current position in file_in
|
||||||
size_t file_pos;
|
size_t file_pos;
|
||||||
|
|
||||||
/// A piece of data from the cache, or a piece of read data that we put into the cache.
|
/// A piece of data from the cache, or a piece of read data that we put into the cache.
|
||||||
@ -37,9 +39,15 @@ private:
|
|||||||
ReadBufferFromFileBase::ProfileCallback profile_callback;
|
ReadBufferFromFileBase::ProfileCallback profile_callback;
|
||||||
clockid_t clock_type {};
|
clockid_t clock_type {};
|
||||||
|
|
||||||
|
|
||||||
|
/// Check comment in CompressedReadBuffer
|
||||||
|
/* size_t nextimpl_working_buffer_offset; */
|
||||||
|
|
||||||
public:
|
public:
|
||||||
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);
|
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);
|
||||||
|
|
||||||
|
/// Seek is lazy. It doesn't move the position anywhere, it just remembers the offsets and performs the actual
|
||||||
|
/// seek inside nextImpl.
|
||||||
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
|
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
|
||||||
|
|
||||||
void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
|
void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
|
||||||
|
@ -33,6 +33,13 @@ bool CompressedReadBufferFromFile::nextImpl()
|
|||||||
|
|
||||||
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
|
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
|
||||||
|
|
||||||
|
/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
|
||||||
|
/// check that we are not seeking beyond working buffer.
|
||||||
|
if (nextimpl_working_buffer_offset > working_buffer.size())
|
||||||
|
throw Exception("Required to move position beyond the decompressed block"
|
||||||
|
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
|
||||||
|
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,33 +74,34 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(
|
|||||||
|
|
||||||
void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
|
void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
|
||||||
{
|
{
|
||||||
|
/// Nothing to do if we are already at the required position
|
||||||
|
if (file_in.getPosition() - size_compressed == offset_in_compressed_file && /// correct position in compressed file
|
||||||
|
(offset() == offset_in_decompressed_block /// correct position in buffer or
|
||||||
|
|| nextimpl_working_buffer_offset == offset_in_decompressed_block)) /// we will move our position to correct one
|
||||||
|
return;
|
||||||
|
|
||||||
|
/// Our seek is within working_buffer, so just move the position
|
||||||
if (size_compressed &&
|
if (size_compressed &&
|
||||||
offset_in_compressed_file == file_in.getPosition() - size_compressed &&
|
offset_in_compressed_file == file_in.getPosition() - size_compressed &&
|
||||||
offset_in_decompressed_block <= working_buffer.size())
|
offset_in_decompressed_block <= working_buffer.size())
|
||||||
{
|
{
|
||||||
bytes += offset();
|
|
||||||
pos = working_buffer.begin() + offset_in_decompressed_block;
|
pos = working_buffer.begin() + offset_in_decompressed_block;
|
||||||
/// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
|
|
||||||
bytes -= offset();
|
|
||||||
}
|
}
|
||||||
else
|
else /// Our seek outside working buffer, so perform "lazy seek"
|
||||||
{
|
{
|
||||||
|
/// Actually seek compressed file
|
||||||
file_in.seek(offset_in_compressed_file, SEEK_SET);
|
file_in.seek(offset_in_compressed_file, SEEK_SET);
|
||||||
|
/// We will discard our working_buffer, but have to account for the remaining bytes
|
||||||
bytes += offset();
|
bytes += offset();
|
||||||
nextImpl();
|
/// No data, everything discarded
|
||||||
|
pos = working_buffer.end();
|
||||||
if (offset_in_decompressed_block > working_buffer.size())
|
size_compressed = 0;
|
||||||
throw Exception("Seek position is beyond the decompressed block"
|
/// Remember required offset in decompressed block which will be set in
|
||||||
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
|
/// the next ReadBuffer::next() call
|
||||||
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
nextimpl_working_buffer_offset = offset_in_decompressed_block;
|
||||||
|
|
||||||
pos = working_buffer.begin() + offset_in_decompressed_block;
|
|
||||||
bytes -= offset();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
|
size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
|
||||||
{
|
{
|
||||||
size_t bytes_read = 0;
|
size_t bytes_read = 0;
|
||||||
@ -134,7 +142,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
|
|||||||
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
|
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
|
||||||
|
|
||||||
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
|
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
|
||||||
pos = working_buffer.begin();
|
|
||||||
|
/// Manually take nextimpl_working_buffer_offset into account, because we don't use
|
||||||
|
/// nextImpl in this method.
|
||||||
|
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
|
||||||
|
nextimpl_working_buffer_offset = 0;
|
||||||
|
|
||||||
bytes_read += read(to + bytes_read, n - bytes_read);
|
bytes_read += read(to + bytes_read, n - bytes_read);
|
||||||
break;
|
break;
|
||||||
|
@ -28,6 +28,19 @@ private:
|
|||||||
ReadBufferFromFileBase & file_in;
|
ReadBufferFromFileBase & file_in;
|
||||||
size_t size_compressed = 0;
|
size_t size_compressed = 0;
|
||||||
|
|
||||||
|
/// This field is inherited from ReadBuffer. It's used to perform a "lazy" seek, so in a seek() call we:
|
||||||
|
/// 1) actually seek only underlying compressed file_in to offset_in_compressed_file;
|
||||||
|
/// 2) reset current working_buffer;
|
||||||
|
/// 3) remember the position in decompressed block in nextimpl_working_buffer_offset.
|
||||||
|
/// After the following ReadBuffer::next() -> nextImpl call we will read new data into working_buffer and
|
||||||
|
/// ReadBuffer::next() will move our position in the fresh working_buffer to nextimpl_working_buffer_offset and
|
||||||
|
/// reset it to zero.
|
||||||
|
///
|
||||||
|
/// NOTE: We have independent readBig implementation, so we have to take
|
||||||
|
/// nextimpl_working_buffer_offset into account there as well.
|
||||||
|
///
|
||||||
|
/* size_t nextimpl_working_buffer_offset; */
|
||||||
|
|
||||||
bool nextImpl() override;
|
bool nextImpl() override;
|
||||||
void prefetch() override;
|
void prefetch() override;
|
||||||
|
|
||||||
@ -37,6 +50,9 @@ public:
|
|||||||
CompressedReadBufferFromFile(
|
CompressedReadBufferFromFile(
|
||||||
const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);
|
const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);
|
||||||
|
|
||||||
|
/// Seek is lazy in some sense. We move position in compressed file_in to offset_in_compressed_file, but don't
|
||||||
|
/// read data into working_buffer and don't shift our position to offset_in_decompressed_block. Instead
|
||||||
|
/// we store this offset inside nextimpl_working_buffer_offset.
|
||||||
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
|
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
|
||||||
|
|
||||||
size_t readBig(char * to, size_t n) override;
|
size_t readBig(char * to, size_t n) override;
|
||||||
|
@ -75,6 +75,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
|
|||||||
OffsetColumns offset_columns;
|
OffsetColumns offset_columns;
|
||||||
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
|
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
|
||||||
|
|
||||||
|
std::unordered_set<std::string> prefetched_streams;
|
||||||
if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
|
if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
|
||||||
{
|
{
|
||||||
/// Request reading of data in advance,
|
/// Request reading of data in advance,
|
||||||
@ -86,7 +87,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto & cache = caches[column_from_part.getNameInStorage()];
|
auto & cache = caches[column_from_part.getNameInStorage()];
|
||||||
prefetch(column_from_part, from_mark, continue_reading, cache);
|
prefetch(column_from_part, from_mark, continue_reading, cache, prefetched_streams);
|
||||||
}
|
}
|
||||||
catch (Exception & e)
|
catch (Exception & e)
|
||||||
{
|
{
|
||||||
@ -98,6 +99,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto name_and_type = columns.begin();
|
auto name_and_type = columns.begin();
|
||||||
|
|
||||||
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
|
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
|
||||||
{
|
{
|
||||||
auto column_from_part = getColumnFromPart(*name_and_type);
|
auto column_from_part = getColumnFromPart(*name_and_type);
|
||||||
@ -114,7 +116,9 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
|
|||||||
size_t column_size_before_reading = column->size();
|
size_t column_size_before_reading = column->size();
|
||||||
auto & cache = caches[column_from_part.getNameInStorage()];
|
auto & cache = caches[column_from_part.getNameInStorage()];
|
||||||
|
|
||||||
readData(column_from_part, column, from_mark, continue_reading, max_rows_to_read, cache);
|
readData(
|
||||||
|
column_from_part, column, from_mark, continue_reading,
|
||||||
|
max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty());
|
||||||
|
|
||||||
/// For elements of Nested, column_size_before_reading may be greater than column size
|
/// For elements of Nested, column_size_before_reading may be greater than column size
|
||||||
/// if offsets are not empty and were already read, but elements are empty.
|
/// if offsets are not empty and were already read, but elements are empty.
|
||||||
@ -190,11 +194,11 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,
|
|||||||
|
|
||||||
|
|
||||||
static ReadBuffer * getStream(
|
static ReadBuffer * getStream(
|
||||||
bool stream_for_prefix,
|
bool seek_to_start,
|
||||||
const ISerialization::SubstreamPath & substream_path,
|
const ISerialization::SubstreamPath & substream_path,
|
||||||
MergeTreeReaderWide::FileStreams & streams,
|
MergeTreeReaderWide::FileStreams & streams,
|
||||||
const NameAndTypePair & name_and_type,
|
const NameAndTypePair & name_and_type,
|
||||||
size_t from_mark, bool continue_reading,
|
size_t from_mark, bool seek_to_mark,
|
||||||
ISerialization::SubstreamsCache & cache)
|
ISerialization::SubstreamsCache & cache)
|
||||||
{
|
{
|
||||||
/// If substream have already been read.
|
/// If substream have already been read.
|
||||||
@ -209,9 +213,9 @@ static ReadBuffer * getStream(
|
|||||||
|
|
||||||
MergeTreeReaderStream & stream = *it->second;
|
MergeTreeReaderStream & stream = *it->second;
|
||||||
|
|
||||||
if (stream_for_prefix)
|
if (seek_to_start)
|
||||||
stream.seekToStart();
|
stream.seekToStart();
|
||||||
else if (!continue_reading)
|
else if (seek_to_mark)
|
||||||
stream.seekToMark(from_mark);
|
stream.seekToMark(from_mark);
|
||||||
|
|
||||||
return stream.data_buffer;
|
return stream.data_buffer;
|
||||||
@ -222,15 +226,25 @@ void MergeTreeReaderWide::prefetch(
|
|||||||
const NameAndTypePair & name_and_type,
|
const NameAndTypePair & name_and_type,
|
||||||
size_t from_mark,
|
size_t from_mark,
|
||||||
bool continue_reading,
|
bool continue_reading,
|
||||||
ISerialization::SubstreamsCache & cache)
|
ISerialization::SubstreamsCache & cache,
|
||||||
|
std::unordered_set<std::string> & prefetched_streams)
|
||||||
{
|
{
|
||||||
const auto & name = name_and_type.name;
|
const auto & name = name_and_type.name;
|
||||||
auto & serialization = serializations[name];
|
auto & serialization = serializations[name];
|
||||||
|
|
||||||
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
|
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
|
||||||
{
|
{
|
||||||
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache))
|
bool seek_to_start = deserialize_binary_bulk_state_map.count(name) == 0;
|
||||||
|
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
|
||||||
|
|
||||||
|
if (!prefetched_streams.count(stream_name))
|
||||||
|
{
|
||||||
|
bool seek_to_mark = !continue_reading;
|
||||||
|
if (ReadBuffer * buf = getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache))
|
||||||
buf->prefetch();
|
buf->prefetch();
|
||||||
|
|
||||||
|
prefetched_streams.insert(stream_name);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,7 +252,7 @@ void MergeTreeReaderWide::prefetch(
|
|||||||
void MergeTreeReaderWide::readData(
|
void MergeTreeReaderWide::readData(
|
||||||
const NameAndTypePair & name_and_type, ColumnPtr & column,
|
const NameAndTypePair & name_and_type, ColumnPtr & column,
|
||||||
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
|
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
|
||||||
ISerialization::SubstreamsCache & cache)
|
ISerialization::SubstreamsCache & cache, bool was_prefetched)
|
||||||
{
|
{
|
||||||
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
|
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
|
||||||
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
|
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
|
||||||
@ -251,14 +265,21 @@ void MergeTreeReaderWide::readData(
|
|||||||
{
|
{
|
||||||
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
|
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
|
||||||
{
|
{
|
||||||
return getStream(true, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
|
/// If data was already prefetched we don't need to seek to start
|
||||||
|
bool seek_to_start = !was_prefetched;
|
||||||
|
bool seek_to_mark = !was_prefetched && !continue_reading;
|
||||||
|
return getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache);
|
||||||
};
|
};
|
||||||
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
|
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
|
||||||
}
|
}
|
||||||
|
|
||||||
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
|
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
|
||||||
{
|
{
|
||||||
return getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
|
bool seek_to_mark = !was_prefetched && !continue_reading;
|
||||||
|
|
||||||
|
return getStream(
|
||||||
|
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
|
||||||
|
seek_to_mark, cache);
|
||||||
};
|
};
|
||||||
deserialize_settings.continuous_reading = continue_reading;
|
deserialize_settings.continuous_reading = continue_reading;
|
||||||
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
|
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
|
||||||
|
@ -46,14 +46,15 @@ private:
|
|||||||
void readData(
|
void readData(
|
||||||
const NameAndTypePair & name_and_type, ColumnPtr & column,
|
const NameAndTypePair & name_and_type, ColumnPtr & column,
|
||||||
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
|
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
|
||||||
ISerialization::SubstreamsCache & cache);
|
ISerialization::SubstreamsCache & cache, bool was_prefetched);
|
||||||
|
|
||||||
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers.
|
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams).
|
||||||
void prefetch(
|
void prefetch(
|
||||||
const NameAndTypePair & name_and_type,
|
const NameAndTypePair & name_and_type,
|
||||||
size_t from_mark,
|
size_t from_mark,
|
||||||
bool continue_reading,
|
bool continue_reading,
|
||||||
ISerialization::SubstreamsCache & cache);
|
ISerialization::SubstreamsCache & cache,
|
||||||
|
std::unordered_set<std::string> & prefetched_streams); /// if stream was already prefetched do nothing
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user