polymorphic parts (development)

CurtizJ 2019-11-28 23:14:41 +03:00
parent 55deeea608
commit 7dbdbff748
14 changed files with 80 additions and 29 deletions

View File

@@ -265,6 +265,8 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
/// Adjust the value size hint: divide it by the average array size.
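/// E.g. a hint of 800 bytes per array over 1024 arrays with 8192 nested elements in total gives 800 / 8192 * 1024 = 100 bytes per nested element.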
settings.avg_value_size_hint = nested_limit ? settings.avg_value_size_hint / nested_limit * offset_values.size() : 0;
std::cerr << "nested_limit: " << nested_limit << "\n";
nested->deserializeBinaryBulkWithMultipleStreams(nested_column, nested_limit, settings, state);
settings.path.pop_back();

View File

@@ -221,6 +221,8 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
/// Write the index. The index contains the Primary Key value for every `index_granularity`-th row.
std::cerr << "writing index...\n";
for (size_t i = index_offset; i < rows;)
{
if (storage.hasPrimaryKey())
@@ -233,10 +235,12 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p
}
}
++current_mark;
if (current_mark < index_granularity.getMarksCount())
i += index_granularity.getMarkRows(current_mark);
else
std::cerr << "(index) i: " << i << "\n";
std::cerr << "(index) current_mark: " << current_mark << "\n";
std::cerr << "(index) rows in mark: " << index_granularity.getMarkRows(current_mark) << "\n";
i += index_granularity.getMarkRows(current_mark++);
if (current_mark >= index_granularity.getMarksCount())
break;
}

View File

@@ -39,9 +39,6 @@ void MergeTreeDataPartWriterCompact::write(
const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block)
{
if (!header)
header = block.cloneEmpty();
/// Fill the index granularity for this block
/// if it is not known yet (in the case of an insert or a horizontal merge,
/// but not a vertical merge).
@@ -72,6 +69,9 @@ void MergeTreeDataPartWriterCompact::write(
result_block = block;
}
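/// Initialize the header lazily from result_block, so that it matches the (possibly permuted) columns that are actually written.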
if (!header)
header = result_block.cloneEmpty();
auto result = squashing.add(result_block.mutateColumns());
if (!result.ready)
return;

View File

@@ -106,6 +106,8 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
fillIndexGranularity(block);
std::cerr << "(MergeTreeDataPartWriterWide::write) marks_count: " << index_granularity.getMarksCount() << "\n";
std::cerr << "(MergeTreeDataPartWriterWide::write) current_mark: " << current_mark << "\n";
WrittenOffsetColumns offset_columns;
MarkWithOffset result;

View File

@@ -679,6 +679,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const Settings & settings,
const ReaderSettings & reader_settings) const
{
std::cerr << "marks to read: ";
for (const auto & part : parts)
for (auto range : part.ranges)
std::cerr << "(" << range.begin << ", " << range.end << ") ";
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
size_t sum_marks = 0;

View File

@@ -62,6 +62,7 @@ void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_,
{
is_adaptive = true;
mark_size_in_bytes = getAdaptiveMrkSize(part_type, columns_num);
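/// A skip index mark is a mark in the compressed file plus a UInt64, presumably the number of rows covered by the granule.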
skip_index_mark_size_in_bytes = sizeof(MarkInCompressedFile) + sizeof(UInt64);
marks_file_extension = getAdaptiveMrkExtension(part_type);
index_granularity_bytes = index_granularity_bytes_;
}
@@ -69,7 +70,7 @@ void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_,
void MergeTreeIndexGranularityInfo::setNonAdaptive()
{
is_adaptive = false;
mark_size_in_bytes = getNonAdaptiveMrkSize();
mark_size_in_bytes = skip_index_mark_size_in_bytes = getNonAdaptiveMrkSize();
marks_file_extension = getNonAdaptiveMrkExtension();
index_granularity_bytes = 0;
}

View File

@@ -3,6 +3,7 @@
#include <optional>
#include <Core/Types.h>
#include <Storages/MergeTree/IMergeTreeDataPart_fwd.h>
#include <DataStreams/MarkInCompressedFile.h>
namespace DB
{
@@ -17,7 +18,9 @@ public:
String marks_file_extension;
/// Size of one mark in a file: two or three size_t numbers.
UInt16 mark_size_in_bytes = 0;
UInt32 mark_size_in_bytes = 0;
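/// Size of one mark of a skip index file.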
UInt8 skip_index_mark_size_in_bytes = 0;
/// Is the stride in rows between marks non-fixed?
bool is_adaptive = false;
@@ -53,7 +56,7 @@ private:
};
constexpr inline auto getNonAdaptiveMrkExtension() { return ".mrk"; }
constexpr inline auto getNonAdaptiveMrkSize() { return sizeof(UInt64) * 2; }
constexpr inline auto getNonAdaptiveMrkSize() { return sizeof(MarkInCompressedFile) * 2; }
inline std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type)
{

View File

@@ -12,6 +12,7 @@ MergeTreeIndexReader::MergeTreeIndexReader(
{ 0, DBMS_DEFAULT_BUFFER_SIZE, false}, nullptr, nullptr,
part_->getFileSizeOrZero(index->getFileName() + ".idx"),
&part_->index_granularity_info,
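/// INDEX mode makes the stream use skip_index_mark_size_in_bytes when loading the marks file.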
MergeTreeReaderStream::ReadingMode::INDEX,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE)
{
stream.seekToStart();

View File

@@ -56,14 +56,17 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
/// FIXME compute correct granularity
std::cerr << "(MergeTreeReaderCompact::readRows) max_rows_to_read: " << max_rows_to_read << "\n";
size_t read_rows = 0;
std::cerr << "(MergeTreeReaderCompact::readRows) from_mark: " << from_mark << "\n";
std::cerr << "(MergeTreeReaderCompact::readRows) continue_reading: " << continue_reading << "\n";
if (continue_reading)
from_mark = next_mark;
size_t read_rows = 0;
while (read_rows < max_rows_to_read)
{
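/// Read one granule (mark) per iteration for every requested column.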
size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
std::cerr << "(MergeTreeReaderCompact::readRows) rows_to_read: " << rows_to_read << "\n";
for (const auto & it : columns)
{
bool append = res.has(it.name);
@@ -75,11 +78,16 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
try
{
// size_t column_size_before_reading = column->size();
size_t column_size_before_reading = column->size();
size_t column_position = data_part->getColumnPosition(it.name);
readData(it.name, *it.type, *column, from_mark, column_position, rows_to_read);
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column < rows_to_read)
throw Exception("Cannot read all data in MergeTreeReaderCompact. Rows read: " + toString(read_rows_in_column) +
". Rows expected: "+ toString(rows_to_read) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);
/// For elements of Nested, column_size_before_reading may be greater than the column size
/// if the offsets are not empty and were already read, but the elements are empty.
/// FIXME
@@ -101,9 +109,13 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
++from_mark;
read_rows += rows_to_read;
std::cerr << "(MergeTreeReaderCompact::readRows) cur mark: " << from_mark << "\n";
std::cerr << "(MergeTreeReaderCompact::readRows) read_rows: " << read_rows << "\n";
std::cerr << "(MergeTreeReaderCompact::readRows) rows_to_read: " << rows_to_read << "\n";
}
std::cerr << "(MergeTreeReaderCompact::readRows) read_rows: " << read_rows << "\n";
next_mark = from_mark;
return read_rows;
}
@@ -118,23 +130,20 @@ void MergeTreeReaderCompact::readData(
std::cerr << "(MergeTreeReaderCompact::readData) rows_to_read: " << rows_to_read << "\n";
std::cerr << "(MergeTreeReaderCompact::readData) start reading column: " << name << "\n";
/// FIXME seek only if needed
seekToMark(from_mark, column_position);
IDataType::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return data_buffer; };
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
// deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
deserialize_settings.position_independent_encoding = false;
IDataType::DeserializeBinaryBulkStatePtr state;
type.deserializeBinaryBulkStatePrefix(deserialize_settings, state);
type.deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state);
std::cerr << "(MergeTreeReaderCompact::readData) end reading column rows: " << column.size() << "\n";
std::cerr << "(MergeTreeReaderCompact::readData) end reading column: " << name << "\n";
// if (column.size() != rows_to_read)
// throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: "+ toString(rows_to_read) + ".",
// ErrorCodes::CANNOT_READ_ALL_DATA);
// std::cerr << "(MergeTreeReaderCompact::readData) end reading column rows: " << column.size() << "\n";
// std::cerr << "(MergeTreeReaderCompact::readData) end reading column: " << name << "\n";
}
@@ -198,7 +207,7 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
{
MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index);
std::cerr << "(MergeTreeReaderCompact::seekToMark) mark: (" << mark.offset_in_compressed_file << ", " << mark.offset_in_decompressed_block << "\n";
// std::cerr << "(MergeTreeReaderCompact::seekToMark) mark: (" << mark.offset_in_compressed_file << ", " << mark.offset_in_decompressed_block << "\n";
try
{

View File

@@ -33,6 +33,8 @@ private:
MergeTreeMarksLoader marks_loader;
size_t next_mark = 0;
void initMarksLoader();
void seekToStart();
void seekToMark(size_t row, size_t col);

View File

@@ -15,16 +15,18 @@ namespace ErrorCodes
MergeTreeReaderStream::MergeTreeReaderStream(
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const String & path_prefix_,const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
const ReaderSettings & settings,
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache, size_t file_size,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
ReadingMode mode_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
: path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
, mark_cache(mark_cache_), save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, mode(mode_)
{
/// Compute the size of the buffer.
size_t max_mark_range_bytes = 0;
@@ -115,8 +117,14 @@ void MergeTreeReaderStream::initMarksLoader()
/// Memory for marks must not be counted as memory usage for the query, because marks are stored in the shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
std::cerr << "data_file_extension: " << data_file_extension << '\n';
size_t file_size = Poco::File(mrk_path).getSize();
size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count;
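/// Column marks and skip index marks have different on-disk sizes, so choose the one that matches the reading mode.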
size_t mark_size = mode == ReadingMode::INDEX
? index_granularity_info->skip_index_mark_size_in_bytes
: index_granularity_info->mark_size_in_bytes;
size_t expected_file_size = mark_size * marks_count;
if (expected_file_size != file_size)
throw Exception(
"Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
@@ -143,7 +151,7 @@ void MergeTreeReaderStream::initMarksLoader()
buffer.seek(sizeof(size_t), SEEK_CUR);
++i;
}
if (i * index_granularity_info->mark_size_in_bytes != file_size)
if (i * mark_size != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
}
res->protect();

View File

@@ -16,12 +16,19 @@ namespace DB
class MergeTreeReaderStream
{
public:
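/// Determines which marks the stream reads: marks of a column data file or marks of a skip index file.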
enum class ReadingMode
{
COLUMN,
INDEX,
};
MergeTreeReaderStream(
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
const ReaderSettings & settings_,
MarkCache * mark_cache, UncompressedCache * uncompressed_cache,
size_t file_size, const MergeTreeIndexGranularityInfo * index_granularity_info_,
ReadingMode mode_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void seekToMark(size_t index);
@@ -42,6 +49,7 @@ private:
bool save_marks_in_cache;
const MergeTreeIndexGranularityInfo * index_granularity_info;
ReadingMode mode;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;

View File

@@ -50,9 +50,8 @@ MergeTreeReaderWide::MergeTreeReaderWide(const MergeTreeData::DataPartPtr & data
size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res)
{
std::cerr << "(MergeTreeReaderWide::readRows) columns: " << columns.toString() << "\n";
std::cerr << "(MergeTreeReaderWide::readRows) from_rows: " << from_mark << "\n";
std::cerr << "(MergeTreeReaderWide::readRows) block: " << res.dumpStructure() << "\n";
std::cerr << "(MergeTreeReaderWide::readRows) from_mark: " << from_mark << "\n";
std::cerr << "(MergeTreeReaderWide::readRows) continue_reading: " << continue_reading << "\n";
size_t read_rows = 0;
try
@@ -169,6 +168,7 @@ void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type
all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part->index_granularity_info,
MergeTreeReaderStream::ReadingMode::COLUMN,
profile_callback, clock_type));
};

View File

@@ -101,6 +101,11 @@ try
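/// Every mark after the first continues the previous sequential read instead of seeking.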
bool continue_reading = (current_mark != 0);
size_t rows_readed = reader->readRows(current_mark, continue_reading, rows_to_read, res);
std::cerr << "(MergeTreeSequentialBlockInputStream) rows_to_read: " << rows_to_read << '\n';
std::cerr << "(MergeTreeSequentialBlockInputStream) current_mark: " << current_mark << '\n';
std::cerr << "(MergeTreeSequentialBlockInputStream) rows_readed: " << rows_readed << '\n';
if (res)
{
res.checkNumberOfRows();