Fix reading from TinyLog.

Vitaly Baranov 2021-11-01 03:39:38 +03:00
parent ca48c5f302
commit d2363d625c
2 changed files with 82 additions and 54 deletions

src/Storages/StorageLog.cpp

@@ -7,12 +7,13 @@
 #include <Interpreters/evaluateConstantExpression.h>
+#include <IO/LimitReadBuffer.h>
 #include <IO/ReadBufferFromFileBase.h>
+#include <IO/ReadHelpers.h>
 #include <IO/WriteBufferFromFileBase.h>
+#include <IO/WriteHelpers.h>
 #include <Compression/CompressedReadBuffer.h>
 #include <Compression/CompressedWriteBuffer.h>
-#include <IO/ReadHelpers.h>
-#include <IO/WriteHelpers.h>
 #include <DataTypes/NestedUtils.h>
@@ -63,15 +64,23 @@ public:
     }

     LogSource(
-        size_t block_size_, const NamesAndTypesList & columns_, const StorageLog & storage_,
-        size_t rows_limit_, const std::vector<size_t> & offsets_, ReadSettings read_settings_)
-        : SourceWithProgress(getHeader(columns_)),
-        block_size(block_size_),
-        columns(columns_),
-        storage(storage_),
-        rows_limit(rows_limit_),
-        offsets(offsets_),
-        read_settings(std::move(read_settings_))
+        size_t block_size_,
+        const NamesAndTypesList & columns_,
+        const StorageLog & storage_,
+        size_t rows_limit_,
+        const std::vector<size_t> & offsets_,
+        const std::vector<size_t> & file_sizes_,
+        bool limited_by_file_sizes_,
+        ReadSettings read_settings_)
+        : SourceWithProgress(getHeader(columns_))
+        , block_size(block_size_)
+        , columns(columns_)
+        , storage(storage_)
+        , rows_limit(rows_limit_)
+        , offsets(offsets_)
+        , file_sizes(file_sizes_)
+        , limited_by_file_sizes(limited_by_file_sizes_)
+        , read_settings(std::move(read_settings_))
     {
     }
@@ -81,30 +90,38 @@ protected:
     Chunk generate() override;

 private:
-    size_t block_size;
-    NamesAndTypesList columns;
+    const size_t block_size;
+    const NamesAndTypesList columns;
     const StorageLog & storage;
-    size_t rows_limit; /// The maximum number of rows that can be read
+    const size_t rows_limit; /// The maximum number of rows that can be read
     size_t rows_read = 0;
     bool is_finished = false;
-    std::vector<size_t> offsets;
-    ReadSettings read_settings;
+    const std::vector<size_t> offsets;
+    const std::vector<size_t> file_sizes;
+    const bool limited_by_file_sizes;
+    const ReadSettings read_settings;

     struct Stream
     {
-        /// We use `disk->getFileSize(data_path)` to get the size of the file here.
-        /// We cannot just use `storage.file_checker` for that purpose, because `storage.rwlock` is not locked at this point.
-        Stream(const DiskPtr & disk, const String & data_path, size_t offset, ReadSettings read_settings_)
-            : plain(disk->readFile(data_path, read_settings_.adjustBufferSize(disk->getFileSize(data_path))))
-            , compressed(*plain)
+        Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t file_size, bool limited_by_file_size, ReadSettings read_settings_)
         {
+            plain = disk->readFile(data_path, read_settings_.adjustBufferSize(file_size));
             if (offset)
                 plain->seek(offset, SEEK_SET);
+
+            if (limited_by_file_size)
+            {
+                limited.emplace(*plain, file_size - offset, false);
+                compressed.emplace(*limited);
+            }
+            else
+                compressed.emplace(*plain);
         }

         std::unique_ptr<ReadBufferFromFileBase> plain;
-        CompressedReadBuffer compressed;
+        std::optional<LimitReadBuffer> limited;
+        std::optional<CompressedReadBuffer> compressed;
     };

     using FileStreams = std::map<String, Stream>;
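
The std::optional members exist because the right buffer chain is only known inside the constructor body: for TinyLog the decompressor must sit on top of a LimitReadBuffer capped at file_size - offset, so that bytes appended to the data file after file_checker recorded its size are never decompressed. Below is a minimal standalone sketch of that layering idea, assuming nothing from ClickHouse: LimitedReader is an illustrative stand-in for LimitReadBuffer, and the "file" contents and sizes are made up.

#include <cassert>
#include <cstddef>
#include <iostream>
#include <optional>
#include <sstream>
#include <string>

/// Illustrative stand-in for LimitReadBuffer: exposes at most `limit` bytes
/// of the underlying stream, so consumers past that point simply see EOF.
class LimitedReader
{
public:
    LimitedReader(std::istream & in_, size_t limit_) : in(in_), remaining(limit_) {}

    /// Returns the number of bytes actually read (0 means EOF for the consumer).
    size_t read(char * to, size_t n)
    {
        if (n > remaining)
            n = remaining;
        in.read(to, static_cast<std::streamsize>(n));
        size_t got = static_cast<size_t>(in.gcount());
        remaining -= got;
        return got;
    }

private:
    std::istream & in;
    size_t remaining;
};

int main()
{
    /// The "file" holds 5 committed bytes plus 7 trailing bytes appended
    /// after the size was recorded (a concurrent INSERT, say).
    std::istringstream file("hellogarbage");

    size_t committed_size = 5;   /// what file_checker would have returned
    size_t offset = 0;           /// where this reader starts

    /// Mirror of the patch: when limited by file size, insert the limiting
    /// layer between the plain stream and the consumer.
    std::optional<LimitedReader> limited;
    limited.emplace(file, committed_size - offset);

    char buf[64];
    std::string seen;
    while (size_t got = limited->read(buf, sizeof(buf)))
        seen.append(buf, got);

    assert(seen == "hello");     /// the trailing bytes are never observed
    std::cout << seen << '\n';
}

Running it prints "hello": the consumer drains the limited layer to EOF and the uncommitted trailing bytes are never observed, which is exactly the behavior the patch needs for reads that run concurrently with appends.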
@@ -194,9 +211,10 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu
             const auto & data_file = *data_file_it->second;

             size_t offset = stream_for_prefix ? 0 : offsets[data_file.index];
+            size_t file_size = file_sizes[data_file.index];

-            auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, read_settings).first;
-            return &it->second.compressed;
+            auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
+            return &it->second.compressed.value();
         };
     };
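
try_emplace is doing double duty in the line above: on first use of a data file it forwards its trailing arguments to Stream's constructor (opening the file at the right offset), and on every later call with the same key it constructs nothing and returns the existing entry. A small self-contained sketch of that behavior, with a hypothetical stand-in Stream type and made-up paths:

#include <cstddef>
#include <iostream>
#include <map>
#include <string>

/// Illustrative stand-in for the per-file Stream object.
struct Stream
{
    Stream(std::string path_, size_t offset_) : path(std::move(path_)), offset(offset_)
    {
        std::cout << "opening " << path << " at " << offset << '\n';
    }
    std::string path;
    size_t offset;
};

int main()
{
    std::map<std::string, Stream> streams;

    /// try_emplace forwards the trailing arguments to Stream's constructor,
    /// but only if the key is absent; otherwise it returns the existing
    /// entry untouched. That is why each file is opened at most once.
    auto it1 = streams.try_emplace("col.bin", "data/col.bin", 128).first;
    auto it2 = streams.try_emplace("col.bin", "data/col.bin", 999).first;

    std::cout << (it1 == it2) << ' ' << it2->second.offset << '\n';  /// 1 128
}

The second call prints nothing and the offset stays 128: the map already holds "col.bin", so the stream keeps the state it was opened with.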
@@ -215,20 +233,24 @@ bool LogSource::isFinished()
     if (is_finished)
         return true;

-    if (rows_limit == std::numeric_limits<size_t>::max())
+    /// Check for row limit.
+    if (rows_read == rows_limit)
     {
-        /// No limit for the row count, check for EOF.
-        if (!streams.empty() && streams.begin()->second.compressed.eof())
-            is_finished = true;
-    }
-    else
-    {
-        /// There is a limit for the row count, check that limit.
-        if (rows_read == rows_limit)
-            is_finished = true;
+        is_finished = true;
+        return true;
     }

-    return is_finished;
+    if (limited_by_file_sizes)
+    {
+        /// Check for EOF.
+        if (!streams.empty() && streams.begin()->second.compressed->eof())
+        {
+            is_finished = true;
+            return true;
+        }
+    }
+
+    return false;
 }
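
The rewrite separates the two stop conditions instead of branching on whether a row limit was set: the row-limit check always runs (for TinyLog rows_limit stays at SIZE_MAX and never fires), and the EOF check on the size-limited stream runs only when limited_by_file_sizes is set. A distilled, runnable version of just that decision logic, with the source's members passed in as parameters and the sticky is_finished flag left out for brevity:

#include <cassert>
#include <cstddef>
#include <limits>

/// Distilled LogSource::isFinished(): row limit first, then EOF, the latter
/// only for sources bounded by file sizes (the TinyLog case).
bool is_finished(size_t rows_read, size_t rows_limit, bool limited_by_file_sizes, bool at_eof)
{
    if (rows_read == rows_limit)
        return true;
    if (limited_by_file_sizes && at_eof)
        return true;
    return false;
}

int main()
{
    const size_t unbounded = std::numeric_limits<size_t>::max();

    /// Log with marks: stops exactly at its per-stream row limit.
    assert(is_finished(100, 100, false, false));

    /// TinyLog: no row limit, stops when the size-limited stream hits EOF.
    assert(is_finished(42, unbounded, true, true));
    assert(!is_finished(42, unbounded, true, false));
}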
@@ -768,33 +790,40 @@ Pipe StorageLog::read(
     std::vector<size_t> offsets;
     offsets.resize(num_data_files, 0);

+    std::vector<size_t> file_sizes;
+    file_sizes.resize(num_data_files, 0);
+    for (const auto & data_file : data_files)
+        file_sizes[data_file.index] = file_checker.getFileSize(data_file.path);
+
+    /// For TinyLog (use_marks_file == false) there is no row limit and we just read
+    /// the data files up to their sizes.
+    bool limited_by_file_sizes = !use_marks_file;
+    size_t row_limit = std::numeric_limits<size_t>::max();
+
     ReadSettings read_settings = context->getReadSettings();

     Pipes pipes;

     for (size_t stream = 0; stream < num_streams; ++stream)
     {
-        size_t start_row, end_row;
         if (use_marks_file)
         {
             size_t mark_begin = stream * num_marks / num_streams;
             size_t mark_end = (stream + 1) * num_marks / num_streams;
-            start_row = mark_begin ? marks_with_real_row_count[mark_begin - 1].rows : 0;
-            end_row = mark_end ? marks_with_real_row_count[mark_end - 1].rows : 0;
+            size_t start_row = mark_begin ? marks_with_real_row_count[mark_begin - 1].rows : 0;
+            size_t end_row = mark_end ? marks_with_real_row_count[mark_end - 1].rows : 0;
+            row_limit = end_row - start_row;

             for (const auto & data_file : data_files)
                 offsets[data_file.index] = data_file.marks[mark_begin].offset;
         }
-        else
-        {
-            start_row = 0;
-            end_row = std::numeric_limits<size_t>::max(); // row limit not set
-        }

         pipes.emplace_back(std::make_shared<LogSource>(
             max_block_size,
             all_columns,
             *this,
-            end_row - start_row,
+            row_limit,
             offsets,
+            file_sizes,
+            limited_by_file_sizes,
             read_settings));
     }
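
For the marks case, row_limit comes from the same mark-splitting arithmetic as before: integer division hands each stream a half-open range of marks, and the cumulative row counts at the range boundaries give the per-stream row budget, with no gaps or overlaps. A runnable sketch of that computation with made-up mark data:

#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    /// Cumulative row counts per mark, standing in for
    /// marks_with_real_row_count[i].rows (values are invented).
    std::vector<size_t> rows_after_mark = {10, 25, 40, 70, 100};
    size_t num_marks = rows_after_mark.size();
    size_t num_streams = 2;

    for (size_t stream = 0; stream < num_streams; ++stream)
    {
        /// Same integer arithmetic as the patch: the mark ranges partition
        /// [0, num_marks) without gaps or overlaps.
        size_t mark_begin = stream * num_marks / num_streams;
        size_t mark_end = (stream + 1) * num_marks / num_streams;

        size_t start_row = mark_begin ? rows_after_mark[mark_begin - 1] : 0;
        size_t end_row = mark_end ? rows_after_mark[mark_end - 1] : 0;
        size_t row_limit = end_row - start_row;

        std::cout << "stream " << stream << ": marks [" << mark_begin << ", "
                  << mark_end << "), row_limit " << row_limit << '\n';
    }
}

With five marks and two streams this prints row limits of 25 and 75: together the streams cover all 100 rows exactly once.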

src/Storages/StorageStripeLog.cpp

@@ -82,7 +82,8 @@ public:
         ReadSettings read_settings_,
         std::shared_ptr<const IndexForNativeFormat> indices_,
         IndexForNativeFormat::Blocks::const_iterator index_begin_,
-        IndexForNativeFormat::Blocks::const_iterator index_end_)
+        IndexForNativeFormat::Blocks::const_iterator index_end_,
+        size_t file_size_)
         : SourceWithProgress(getHeader(storage_, metadata_snapshot_, column_names, index_begin_, index_end_))
         , storage(storage_)
         , metadata_snapshot(metadata_snapshot_)
@@ -90,6 +91,7 @@ public:
         , indices(indices_)
         , index_begin(index_begin_)
         , index_end(index_end_)
+        , file_size(file_size_)
     {
     }
@@ -125,6 +127,7 @@ private:
     std::shared_ptr<const IndexForNativeFormat> indices;
     IndexForNativeFormat::Blocks::const_iterator index_begin;
     IndexForNativeFormat::Blocks::const_iterator index_end;
+    size_t file_size;

     Block header;
@@ -143,12 +146,7 @@ private:
             started = true;

             String data_file_path = storage.table_path + "data.bin";
-
-            /// We cannot just use `storage.file_checker` to get the size of the file here,
-            /// because `storage.rwlock` is not locked at this point.
-            size_t data_file_size = storage.disk->getFileSize(data_file_path);
-
-            data_in.emplace(storage.disk->readFile(data_file_path, read_settings.adjustBufferSize(data_file_size)));
+            data_in.emplace(storage.disk->readFile(data_file_path, read_settings.adjustBufferSize(file_size)));
             block_in.emplace(*data_in, 0, index_begin, index_end);
         }
     }
@@ -351,7 +349,8 @@ Pipe StorageStripeLog::read(
     if (!lock)
         throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);

-    if (!file_checker.getFileSize(data_file_path))
+    size_t data_file_size = file_checker.getFileSize(data_file_path);
+    if (!data_file_size)
         return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));

     auto indices_for_selected_columns
@@ -373,7 +372,7 @@ Pipe StorageStripeLog::read(
         std::advance(end, (stream + 1) * size / num_streams);

         pipes.emplace_back(std::make_shared<StripeLogSource>(
-            *this, metadata_snapshot, column_names, read_settings, indices_for_selected_columns, begin, end));
+            *this, metadata_snapshot, column_names, read_settings, indices_for_selected_columns, begin, end, data_file_size));
     }

     /// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change.
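
The StripeLog half of the fix follows the same principle as the Log half: read the size from file_checker once, while the table lock is held, and hand it to each source as a plain value, so the source neither takes storage.rwlock nor calls disk->getFileSize() when the pipeline actually reads. A minimal sketch of that snapshot-under-lock pattern; the Storage and Source types here are illustrative stand-ins, not the real classes:

#include <cstddef>
#include <iostream>
#include <shared_mutex>

/// Stand-in for the storage: a lock plus the size file_checker recorded.
struct Storage
{
    mutable std::shared_mutex rwlock;
    size_t checked_file_size = 4096;
};

/// Stand-in for StripeLogSource: holds a snapshot of the size, so reading
/// later needs no lock; the bytes below that size never change.
struct Source
{
    explicit Source(size_t file_size_) : file_size(file_size_) {}
    void read() const { std::cout << "reading " << file_size << " bytes, no lock held\n"; }
    size_t file_size;
};

int main()
{
    Storage storage;

    Source source = [&]
    {
        std::shared_lock lock(storage.rwlock);       /// lock only while planning
        return Source(storage.checked_file_size);    /// snapshot under the lock
    }();

    source.read();   /// later, on any thread, without storage.rwlock
}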