polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-11-27 14:35:27 +03:00
parent 49e465d6e0
commit d1ddfbb415
11 changed files with 134 additions and 97 deletions

View File

@ -70,7 +70,6 @@ public:
return ready.tryWait(milliseconds);
}
Block getHeader() const override { return children.at(0)->getHeader(); }
void cancel(bool kill) override

View File

@ -40,6 +40,8 @@ public:
*/
Result add(MutableColumns && columns);
bool hasPendingData() { return !accumulated_columns.empty(); }
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;

View File

@ -127,28 +127,30 @@ void fillIndexGranularityImpl(
/// The granularity should be less than or equal to the fixed index granularity
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
/// FIXME correct index granularity for compact
// index_granularity_for_block = rows_in_block;
/// FIXME: split/join last mark for compact parts
size_t current_row;
for (current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.appendMark(index_granularity_for_block);
size_t rows_rest_in_block = rows_in_block - (current_row - index_granularity_for_block);
if (need_finish_last_granule && rows_rest_in_block)
{
/// If enough rows are left, create a new granule. Otherwise, extend previous granule.
/// So, the real size of a granule differs from index_granularity_for_block by not more than 50%.
if (rows_rest_in_block * 2 >= index_granularity_for_block)
index_granularity.appendMark(rows_rest_in_block);
size_t rows_rest_in_block = rows_in_block - current_row;
std::cerr << "rows_rest_in_block: " << rows_rest_in_block << "\n";
std::cerr << "rows_rest_in_block: " << index_granularity_for_block << "\n";
/// FIXME may be remove need_finish_last_granule and do it always
if (need_finish_last_granule && rows_rest_in_block < index_granularity_for_block)
{
if (rows_rest_in_block * 2 >= index_granularity_for_block)
index_granularity.appendMark(rows_rest_in_block);
else
index_granularity.addRowsToLastMark(rows_rest_in_block);
}
else
{
index_granularity.popMark();
index_granularity.appendMark(index_granularity_for_block + rows_rest_in_block);
index_granularity.appendMark(index_granularity_for_block);
}
}
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
std::cerr << "marks: " << index_granularity.getMarkRows(i) << "\n";
}
void IMergeTreeDataPartWriter::fillIndexGranularity(const Block & block)

View File

@ -69,6 +69,7 @@ public:
const MergeTreeIndexGranularity & index_granularity,
bool need_finish_last_granule);
/// FIXME remove indices block
virtual void write(
const Block & block, const IColumn::Permutation * permutation,
/* Blocks with already sorted index columns */

View File

@ -23,6 +23,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
storage_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_, true)
, squashing(storage.getSettings()->index_granularity, storage.getSettings()->index_granularity_bytes)
{
stream = std::make_unique<ColumnStream>(
DATA_FILE_NAME,
@ -37,6 +38,35 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
/// Accumulates the incoming block (optionally applying the row permutation)
/// into the squashing transform; once enough rows have been accumulated to
/// form a full granule, materializes them into a block and writes it out.
/// primary_key_block / skip_indexes_block are unused for compact parts:
/// all columns are taken from `block` and permuted here directly.
void MergeTreeDataPartWriterCompact::write(
    const Block & block, const IColumn::Permutation * permutation,
    const Block & primary_key_block, const Block & skip_indexes_block)
{
    UNUSED(primary_key_block);
    UNUSED(skip_indexes_block);

    /// Remember the structure of the first block: squashed columns are later
    /// re-attached to this header to reconstruct a block of the same shape.
    if (!header)
        header = block.cloneEmpty();

    Block result_block = block;

    if (permutation)
    {
        /// Fix: iterate over every column of columns_list. The previous code
        /// fetched columns_list.begin() once and never advanced the iterator,
        /// so only the first column was ever permuted.
        for (const auto & column_desc : columns_list)
        {
            auto & column = result_block.getByName(column_desc.name);
            column.column = column.column->permute(*permutation, 0);
        }
    }

    auto result = squashing.add(result_block.mutateColumns());
    if (!result.ready)
        return;  /// Not enough rows accumulated yet; wait for the next block.

    result_block = header.cloneWithColumns(std::move(result.columns));
    writeBlock(result_block);
}
void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
{
size_t total_rows = block.rows();
size_t from_mark = current_mark;
@ -48,25 +78,10 @@ void MergeTreeDataPartWriterCompact::write(
if (compute_granularity)
fillIndexGranularity(block);
ColumnsWithTypeAndName columns_to_write(columns_list.size());
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
if (permutation)
{
if (primary_key_block.has(it->name))
columns_to_write[i] = primary_key_block.getByName(it->name);
else if (skip_indexes_block.has(it->name))
columns_to_write[i] = skip_indexes_block.getByName(it->name);
else
{
columns_to_write[i] = block.getByName(it->name);
columns_to_write[i].column = columns_to_write[i].column->permute(*permutation, 0);
}
}
else
columns_to_write[i] = block.getByName(it->name);
}
std::cerr << "(MergeTreeDataPartWriterCompact::write) marks: " << index_granularity.getMarksCount() << "\n";
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
std::cerr << "rows in mark: " << index_granularity.getMarkRows(i) << "\n";
std::cerr << "(MergeTreeDataPartWriterCompact::write) total_rows: " << total_rows << "\n";
@ -74,50 +89,28 @@ void MergeTreeDataPartWriterCompact::write(
{
std::cerr << "(MergeTreeDataPartWriterCompact::write) current_row: " << current_row << "\n";
bool write_marks = true;
// size_t rows_to_write = std::min(total_rows, index_granularity.getMarkRows(current_mark));
size_t rows_to_write = total_rows;
// if (compute_granularity)
// index_granularity.appendMark(total_rows);
size_t rows_to_write = index_granularity.getMarkRows(from_mark);
std::cerr << "(MergeTreeDataPartWriterCompact::write) rows_to_write: " << rows_to_write << "\n";
if (rows_to_write)
data_written = true;
// if (current_row == 0 && index_offset != 0)
// {
// rows_to_write = index_offset;
// write_marks = false;
// }
// else
// {
// rows_to_write = index_granularity.getMarkRows(current_mark);
// }
// std::cerr << "(MergeTreeDataPartWriterCompact::write) rows_to_write: " << rows_to_write << "\n";
/// There could already be enough data to compress into the new block.
if (stream->compressed.offset() >= settings.min_compress_block_size)
stream->compressed.next();
size_t next_row = 0;
if (write_marks)
writeIntBinary(rows_to_write, stream->marks);
for (const auto & it : columns_list)
{
writeIntBinary(rows_to_write, stream->marks);
for (size_t i = 0; i < columns_to_write.size(); ++i)
{
writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks);
next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
}
++from_mark;
}
else
{
for (size_t i = 0; i < columns_to_write.size(); ++i)
next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks);
next_row = writeColumnSingleGranule(block.getByName(it.name), current_row, rows_to_write);
}
++from_mark;
current_row = next_row;
}
@ -125,6 +118,7 @@ void MergeTreeDataPartWriterCompact::write(
next_index_offset = total_rows - current_row;
}
size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows)
{
std::cerr << "(writeColumnSingleGranule) writing column: " << column.name << "\n";
@ -146,7 +140,11 @@ size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWith
}
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
{
{
auto result = squashing.add({});
if (result.ready && !result.columns.empty())
writeBlock(header.cloneWithColumns(std::move(result.columns)));
if (with_final_mark && data_written)
{
writeIntBinary(0ULL, stream->marks);

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <DataStreams/SquashingTransform.h>
namespace DB
{
@ -28,7 +29,12 @@ private:
size_t from_row,
size_t number_of_rows);
void writeBlock(const Block & block);
ColumnStreamPtr stream;
SquashingTransform squashing;
Block header;
};
}

View File

@ -243,7 +243,7 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
while (current_row < total_rows)
{
size_t rows_to_write;
bool write_marks = true; /// FIXME not always true
bool write_marks = true;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (current_row == 0 && index_offset != 0)

View File

@ -48,6 +48,14 @@ void MergeTreeIndexGranularity::appendMark(size_t rows_count)
marks_rows_partial_sums.push_back(marks_rows_partial_sums.back() + rows_count);
}
/// Extends the last mark by `rows_count` rows. If no marks exist yet,
/// starts the first mark containing exactly `rows_count` rows (the
/// partial-sums representation makes both cases a single update).
void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count)
{
    if (!marks_rows_partial_sums.empty())
    {
        marks_rows_partial_sums.back() += rows_count;
        return;
    }
    marks_rows_partial_sums.push_back(rows_count);
}
void MergeTreeIndexGranularity::popMark()
{
if (!marks_rows_partial_sums.empty())

View File

@ -92,6 +92,8 @@ public:
/// Add new mark with rows_count
void appendMark(size_t rows_count);
void addRowsToLastMark(size_t rows_count);
void popMark();
/// Add `size` of marks with `fixed_granularity` rows

View File

@ -31,6 +31,11 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Block & block, size_t num_r
{
if (num_rows)
{
std::cerr << "(DelayedStream::readRows) current_mark: " << current_mark << '\n';
std::cerr << "(DelayedStream::readRows) continue_reading: " << continue_reading << '\n';
std::cerr << "(DelayedStream::readRows) num_rows: " << num_rows << '\n';
size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, block);
continue_reading = true;

View File

@ -54,43 +54,57 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
UNUSED(res);
/// FIXME compute correct granularity
size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
std::cerr << "(MergeTreeReaderCompact::readRows) max_rows_to_read: " << max_rows_to_read << "\n";
size_t read_rows = 0;
for (const auto & it : columns)
while (read_rows < max_rows_to_read)
{
bool append = res.has(it.name);
if (!append)
res.insert(ColumnWithTypeAndName(it.type->createColumn(), it.type, it.name));
size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
/// To keep offsets shared. TODO Very dangerous. Get rid of this.
MutableColumnPtr column = res.getByName(it.name).column->assumeMutable();
std::cerr << "(MergeTreeReaderCompact::readRows) rows_to_read: " << rows_to_read << "\n";
try
for (const auto & it : columns)
{
size_t column_size_before_reading = column->size();
size_t column_position = data_part->getColumnPosition(it.name);
bool append = res.has(it.name);
if (!append)
res.insert(ColumnWithTypeAndName(it.type->createColumn(), it.type, it.name));
readData(it.name, *it.type, *column, from_mark, column_position, rows_to_read);
/// To keep offsets shared. TODO Very dangerous. Get rid of this.
MutableColumnPtr column = res.getByName(it.name).column->assumeMutable();
try
{
// size_t column_size_before_reading = column->size();
size_t column_position = data_part->getColumnPosition(it.name);
readData(it.name, *it.type, *column, from_mark, column_position, rows_to_read);
/// For elements of Nested, column_size_before_reading may be greater than column size
/// if offsets are not empty and were already read, but elements are empty.
/// FIXME
// if (column->size())
// read_rows_in_mark = std::max(read_rows, column->size() - column_size_before_reading);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading column " + it.name + ")");
throw;
}
/// For elements of Nested, column_size_before_reading may be greater than column size
/// if offsets are not empty and were already read, but elements are empty.
if (column->size())
read_rows = std::max(read_rows, column->size() - column_size_before_reading);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading column " + it.name + ")");
throw;
res.getByName(it.name).column = std::move(column);
else
res.erase(it.name);
}
if (column->size())
res.getByName(it.name).column = std::move(column);
else
res.erase(it.name);
++from_mark;
read_rows += rows_to_read;
}
std::cerr << "(MergeTreeReaderCompact::readRows) read_rows: " << read_rows << "\n";
return read_rows;
}
@ -152,7 +166,7 @@ void MergeTreeReaderCompact::initMarksLoader()
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_num);
std::cerr << "(MergeTreeReaderCompact::loadMarks) marks_count: " << marks_count << "\n";
// std::cerr << "(MergeTreeReaderCompact::loadMarks) marks_count: " << marks_count << "\n";
ReadBufferFromFile buffer(mrk_path, file_size);
size_t i = 0;
@ -161,13 +175,13 @@ void MergeTreeReaderCompact::initMarksLoader()
{
buffer.seek(sizeof(size_t), SEEK_CUR);
buffer.readStrict(reinterpret_cast<char *>(res->data() + i * columns_num), sizeof(MarkInCompressedFile) * columns_num);
std::cerr << "(MergeTreeReaderCompact::loadMarks) i: " << i << "\n";
std::cerr << "(MergeTreeReaderCompact::loadMarks) buffer pos in file: " << buffer.getPositionInFile() << "\n";
// std::cerr << "(MergeTreeReaderCompact::loadMarks) i: " << i << "\n";
// std::cerr << "(MergeTreeReaderCompact::loadMarks) buffer pos in file: " << buffer.getPositionInFile() << "\n";
++i;
}
std::cerr << "(MergeTreeReaderCompact::loadMarks) file_size: " << file_size << "\n";
std::cerr << "(MergeTreeReaderCompact::loadMarks) correct file size: " << i * mark_size_in_bytes << "\n";
// std::cerr << "(MergeTreeReaderCompact::loadMarks) file_size: " << file_size << "\n";
// std::cerr << "(MergeTreeReaderCompact::loadMarks) correct file size: " << i * mark_size_in_bytes << "\n";
if (i * mark_size_in_bytes != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);