diff --git a/dbms/src/DataStreams/AsynchronousBlockInputStream.h b/dbms/src/DataStreams/AsynchronousBlockInputStream.h
index 93c695f20c9..a32d3049995 100644
--- a/dbms/src/DataStreams/AsynchronousBlockInputStream.h
+++ b/dbms/src/DataStreams/AsynchronousBlockInputStream.h
@@ -70,7 +70,6 @@ public:
         return ready.tryWait(milliseconds);
     }
 
-
     Block getHeader() const override { return children.at(0)->getHeader(); }
 
     void cancel(bool kill) override
diff --git a/dbms/src/DataStreams/SquashingTransform.h b/dbms/src/DataStreams/SquashingTransform.h
index f1681c57c8c..de6f1cc8383 100644
--- a/dbms/src/DataStreams/SquashingTransform.h
+++ b/dbms/src/DataStreams/SquashingTransform.h
@@ -40,6 +40,8 @@ public:
      */
     Result add(MutableColumns && columns);
 
+    bool hasPendingData() { return !accumulated_columns.empty(); }
+
 private:
     size_t min_block_size_rows;
    size_t min_block_size_bytes;
diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp
index 955164dc7ac..5826fb0b1f8 100644
--- a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp
+++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp
@@ -127,28 +127,30 @@ void fillIndexGranularityImpl(
     /// We should be less or equal than fixed index granularity
     index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
-
-    /// FIXME correct index granularity for compact
-    // index_granularity_for_block = rows_in_block;
-
-    /// FIXME: split/join last mark for compact parts
+
     size_t current_row;
     for (current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
-        index_granularity.appendMark(index_granularity_for_block);
-
-    size_t rows_rest_in_block = rows_in_block - (current_row - index_granularity_for_block);
-    if (need_finish_last_granule && rows_rest_in_block)
     {
-        /// If enough rows are left, create a new granule. Otherwise, extend previous granule.
-        /// So, real size of granule differs from index_granularity_for_block not more than 50%.
-        if (rows_rest_in_block * 2 >= index_granularity_for_block)
-            index_granularity.appendMark(rows_rest_in_block);
+        size_t rows_rest_in_block = rows_in_block - current_row;
+        std::cerr << "rows_rest_in_block: " << rows_rest_in_block << "\n";
+        std::cerr << "index_granularity_for_block: " << index_granularity_for_block << "\n";
+
+        /// FIXME maybe remove need_finish_last_granule and do it always
+        if (need_finish_last_granule && rows_rest_in_block < index_granularity_for_block)
+        {
+            if (rows_rest_in_block * 2 >= index_granularity_for_block)
+                index_granularity.appendMark(rows_rest_in_block);
+            else
+                index_granularity.addRowsToLastMark(rows_rest_in_block);
+        }
         else
         {
-            index_granularity.popMark();
-            index_granularity.appendMark(index_granularity_for_block + rows_rest_in_block);
+            index_granularity.appendMark(index_granularity_for_block);
         }
     }
+
+    for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
+        std::cerr << "marks: " << index_granularity.getMarkRows(i) << "\n";
 }
 
 void IMergeTreeDataPartWriter::fillIndexGranularity(const Block & block)
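Note on the reworked fillIndexGranularityImpl above: the decision about the last granule now happens inside the loop. A block tail shorter than a full granule either becomes its own mark (if it is at least half of index_granularity_for_block) or is folded into the previous mark via the new addRowsToLastMark(), so a granule's real size deviates from the target by at most 50%. A minimal standalone sketch of that rule (hypothetical names, not the ClickHouse API):

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    /// Append per-granule row counts for one block, finishing the last
    /// granule the same way as the hunk above: a tail of at least half a
    /// granule gets its own mark, a smaller tail extends the previous mark.
    void fillMarks(std::vector<size_t> & marks, size_t rows_in_block, size_t granularity)
    {
        for (size_t row = 0; row < rows_in_block; row += granularity)
        {
            size_t rest = rows_in_block - row;
            if (rest < granularity && rest * 2 < granularity && !marks.empty())
                marks.back() += rest;   /// too small: extend the previous mark
            else
                marks.push_back(std::min(rest, granularity));
        }
    }

For example, with granularity 10 a 25-row block produces marks {10, 10, 5}, while a 23-row block produces {10, 13}.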
diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h
index c182ca072fe..e3deeea859e 100644
--- a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h
+++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h
@@ -69,6 +69,7 @@ public:
         const MergeTreeIndexGranularity & index_granularity,
         bool need_finish_last_granule);
 
+    /// FIXME remove indices block
     virtual void write(
         const Block & block, const IColumn::Permutation * permutation,
         /* Blocks with already sorted index columns */
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
index 9c6ac028ac1..ab95bacafe9 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
@@ -23,6 +23,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
         storage_, columns_list_, indices_to_recalc_,
         marks_file_extension_,
         default_codec_, settings_, index_granularity_, true)
+    , squashing(storage.getSettings()->index_granularity, storage.getSettings()->index_granularity_bytes)
 {
     stream = std::make_unique<ColumnStream>(
         DATA_FILE_NAME,
@@ -37,6 +38,35 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
 
 void MergeTreeDataPartWriterCompact::write(
     const Block & block, const IColumn::Permutation * permutation,
     const Block & primary_key_block, const Block & skip_indexes_block)
+{
+    UNUSED(primary_key_block);
+    UNUSED(skip_indexes_block);
+
+    if (!header)
+        header = block.cloneEmpty();
+
+    Block result_block = block;
+
+    if (permutation)
+    {
+        auto it = columns_list.begin();
+        for (size_t i = 0; i < columns_list.size(); ++i, ++it)
+        {
+            auto & column = result_block.getByName(it->name);
+            column.column = column.column->permute(*permutation, 0);
+        }
+    }
+
+    auto result = squashing.add(result_block.mutateColumns());
+    if (!result.ready)
+        return;
+
+    result_block = header.cloneWithColumns(std::move(result.columns));
+
+    writeBlock(result_block);
+}
+
+void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
 {
     size_t total_rows = block.rows();
     size_t from_mark = current_mark;
@@ -48,25 +78,10 @@
     if (compute_granularity)
         fillIndexGranularity(block);
 
-    ColumnsWithTypeAndName columns_to_write(columns_list.size());
-    auto it = columns_list.begin();
-    for (size_t i = 0; i < columns_list.size(); ++i, ++it)
-    {
-        if (permutation)
-        {
-            if (primary_key_block.has(it->name))
-                columns_to_write[i] = primary_key_block.getByName(it->name);
-            else if (skip_indexes_block.has(it->name))
-                columns_to_write[i] = skip_indexes_block.getByName(it->name);
-            else
-            {
-                columns_to_write[i] = block.getByName(it->name);
-                columns_to_write[i].column = columns_to_write[i].column->permute(*permutation, 0);
-            }
-        }
-        else
-            columns_to_write[i] = block.getByName(it->name);
-    }
+    std::cerr << "(MergeTreeDataPartWriterCompact::write) marks: " << index_granularity.getMarksCount() << "\n";
+
+    for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
+        std::cerr << "rows in mark: " << index_granularity.getMarkRows(i) << "\n";
 
     std::cerr << "(MergeTreeDataPartWriterCompact::write) total_rows: " << total_rows << "\n";
 
@@ -74,50 +89,28 @@
     {
         std::cerr << "(MergeTreeDataPartWriterCompact::write) current_row: " << current_row << "\n";
 
-        bool write_marks = true;
-        // size_t rows_to_write = std::min(total_rows, index_granularity.getMarkRows(current_mark));
-        size_t rows_to_write = total_rows;
-        // if (compute_granularity)
-        //     index_granularity.appendMark(total_rows);
+        size_t rows_to_write = index_granularity.getMarkRows(from_mark);
+
+        std::cerr << "(MergeTreeDataPartWriterCompact::write) rows_to_write: " << rows_to_write << "\n";
 
         if (rows_to_write)
             data_written = true;
 
-        // if (current_row == 0 && index_offset != 0)
-        // {
-        //     rows_to_write = index_offset;
-        //     write_marks = false;
-        // }
-        // else
-        // {
-        //     rows_to_write = index_granularity.getMarkRows(current_mark);
-        // }
-
-        // std::cerr << "(MergeTreeDataPartWriterCompact::write) rows_to_write: " << rows_to_write << "\n";
-
         /// There could already be enough data to compress into the new block.
         if (stream->compressed.offset() >= settings.min_compress_block_size)
             stream->compressed.next();
 
         size_t next_row = 0;
 
-        if (write_marks)
+        writeIntBinary(rows_to_write, stream->marks);
+        for (const auto & it : columns_list)
         {
-            writeIntBinary(rows_to_write, stream->marks);
-            for (size_t i = 0; i < columns_to_write.size(); ++i)
-            {
-                writeIntBinary(stream->plain_hashing.count(), stream->marks);
-                writeIntBinary(stream->compressed.offset(), stream->marks);
-                next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
-            }
-            ++from_mark;
-        }
-        else
-        {
-            for (size_t i = 0; i < columns_to_write.size(); ++i)
-                next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
+            writeIntBinary(stream->plain_hashing.count(), stream->marks);
+            writeIntBinary(stream->compressed.offset(), stream->marks);
+            next_row = writeColumnSingleGranule(block.getByName(it.name), current_row, rows_to_write);
         }
+        ++from_mark;
 
         current_row = next_row;
     }
@@ -125,6 +118,7 @@
     next_index_offset = total_rows - current_row;
 }
 
+
 size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows)
 {
     std::cerr << "(writeColumnSingleGranule) writing column: " << column.name << "\n";
@@ -146,7 +140,11 @@ size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWith
 }
 
 void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
-{
+{
+    auto result = squashing.add({});
+    if (result.ready && !result.columns.empty())
+        writeBlock(header.cloneWithColumns(std::move(result.columns)));
+
     if (with_final_mark && data_written)
     {
         writeIntBinary(0ULL, stream->marks);
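With the change above, the compact writer no longer writes each incoming block as one granule: blocks are buffered in a SquashingTransform sized by index_granularity / index_granularity_bytes, write() emits a block only once result.ready is set, and finishDataSerialization() flushes whatever is left by passing empty columns to squashing.add({}). A standalone sketch of that accumulate-then-flush contract (simplified, hypothetical types; the real class works on MutableColumns and also tracks a byte threshold):

    #include <cstddef>
    #include <utility>
    #include <vector>

    struct Squashing
    {
        struct Result
        {
            bool ready = false;
            std::vector<int> rows;   /// stands in for MutableColumns
        };

        explicit Squashing(size_t min_rows_) : min_rows(min_rows_) {}

        /// Buffer input until min_rows is reached; empty input means
        /// "flush whatever is left", mirroring squashing.add({}) above.
        Result add(std::vector<int> && input)
        {
            bool flush = input.empty();
            accumulated.insert(accumulated.end(), input.begin(), input.end());
            Result res;
            if (!flush && accumulated.size() < min_rows)
                return res;   /// not ready yet, keep buffering
            res.ready = true;
            res.rows = std::move(accumulated);
            accumulated.clear();
            return res;
        }

        size_t min_rows;
        std::vector<int> accumulated;
    };

Because write() returns early whenever the result is not ready, marks for a compact part are computed only over squashed blocks of at least one full granule (except possibly the last one).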
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h
index efe84182640..1c56bf75ade 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h
@@ -1,4 +1,5 @@
 #include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
+#include <DataStreams/SquashingTransform.h>
 
 namespace DB
 {
@@ -28,7 +29,12 @@ private:
         size_t from_row,
         size_t number_of_rows);
 
+    void writeBlock(const Block & block);
+
     ColumnStreamPtr stream;
+
+    SquashingTransform squashing;
+    Block header;
 };
 
 }
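For reference, the writeBlock() loop above implies this per-granule layout in a compact part's marks file: a row count, then one offset pair per column. A sketch of the layout (illustrative structs only; the writer emits these as raw integers via writeIntBinary):

    #include <cstdint>
    #include <vector>

    struct MarkInCompressedFile
    {
        uint64_t offset_in_compressed_file;      /// stream->plain_hashing.count()
        uint64_t offset_in_decompressed_block;   /// stream->compressed.offset()
    };

    struct CompactGranuleMark
    {
        uint64_t rows;                               /// writeIntBinary(rows_to_write, stream->marks)
        std::vector<MarkInCompressedFile> columns;   /// one pair per column, in columns_list order
    };

This matches the reader side below, where initMarksLoader() skips sizeof(size_t) bytes (the row count) and then reads columns_num MarkInCompressedFile entries per mark.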
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
index 7b17bcb2ec8..3ce805b0e65 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
@@ -243,7 +243,7 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
     while (current_row < total_rows)
     {
         size_t rows_to_write;
-        bool write_marks = true; /// FIXME not always true
+        bool write_marks = true;
 
         /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
         if (current_row == 0 && index_offset != 0)
diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp b/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp
index b2dd5141df3..4901900b6ba 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp
@@ -48,6 +48,14 @@ void MergeTreeIndexGranularity::appendMark(size_t rows_count)
         marks_rows_partial_sums.push_back(marks_rows_partial_sums.back() + rows_count);
 }
 
+void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count)
+{
+    if (marks_rows_partial_sums.empty())
+        marks_rows_partial_sums.push_back(rows_count);
+    else
+        marks_rows_partial_sums.back() += rows_count;
+}
+
 void MergeTreeIndexGranularity::popMark()
 {
     if (!marks_rows_partial_sums.empty())
diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.h b/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.h
index a2d78596279..053eebccb25 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeIndexGranularity.h
@@ -92,6 +92,8 @@ public:
     /// Add new mark with rows_count
     void appendMark(size_t rows_count);
 
+    void addRowsToLastMark(size_t rows_count);
+
     void popMark();
 
     /// Add `size` of marks with `fixed_granularity` rows
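Since MergeTreeIndexGranularity stores cumulative row counts (marks_rows_partial_sums), the new addRowsToLastMark() only needs to grow the last element. A minimal standalone illustration of the same arithmetic:

    #include <cassert>
    #include <cstddef>
    #include <vector>

    int main()
    {
        std::vector<size_t> sums;   /// marks_rows_partial_sums

        auto append_mark = [&](size_t rows)   /// appendMark()
        {
            sums.push_back(sums.empty() ? rows : sums.back() + rows);
        };
        auto add_rows_to_last_mark = [&](size_t rows)   /// addRowsToLastMark()
        {
            if (sums.empty())
                sums.push_back(rows);
            else
                sums.back() += rows;
        };

        append_mark(10);            /// granules {10},     sums {10}
        append_mark(10);            /// granules {10, 10}, sums {10, 20}
        add_rows_to_last_mark(3);   /// granules {10, 13}, sums {10, 23}
        assert(sums == (std::vector<size_t>{10, 23}));
        return 0;
    }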
diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp
index 2eb53f9280d..79850ea6101 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp
@@ -31,6 +31,11 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Block & block, size_t num_r
 {
     if (num_rows)
     {
+        std::cerr << "(DelayedStream::readRows) current_mark: " << current_mark << '\n';
+        std::cerr << "(DelayedStream::readRows) continue_reading: " << continue_reading << '\n';
+        std::cerr << "(DelayedStream::readRows) num_rows: " << num_rows << '\n';
+
+
         size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, block);
         continue_reading = true;
 
diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
index 517c6212b1b..84d87954cdf 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
@@ -54,43 +54,57 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
     UNUSED(res);
 
     /// FIXME compute correct granularity
-    size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
+    std::cerr << "(MergeTreeReaderCompact::readRows) max_rows_to_read: " << max_rows_to_read << "\n";
+
     size_t read_rows = 0;
-    for (const auto & it : columns)
+    while (read_rows < max_rows_to_read)
     {
-        bool append = res.has(it.name);
-        if (!append)
-            res.insert(ColumnWithTypeAndName(it.type->createColumn(), it.type, it.name));
+        size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
 
-        /// To keep offsets shared. TODO Very dangerous. Get rid of this.
-        MutableColumnPtr column = res.getByName(it.name).column->assumeMutable();
+        std::cerr << "(MergeTreeReaderCompact::readRows) rows_to_read: " << rows_to_read << "\n";
 
-        try
+        for (const auto & it : columns)
         {
-            size_t column_size_before_reading = column->size();
-            size_t column_position = data_part->getColumnPosition(it.name);
+            bool append = res.has(it.name);
+            if (!append)
+                res.insert(ColumnWithTypeAndName(it.type->createColumn(), it.type, it.name));
 
-            readData(it.name, *it.type, *column, from_mark, column_position, rows_to_read);
+            /// To keep offsets shared. TODO Very dangerous. Get rid of this.
+            MutableColumnPtr column = res.getByName(it.name).column->assumeMutable();
+
+            try
+            {
+                // size_t column_size_before_reading = column->size();
+                size_t column_position = data_part->getColumnPosition(it.name);
+
+                readData(it.name, *it.type, *column, from_mark, column_position, rows_to_read);
+
+                /// For elements of Nested, column_size_before_reading may be greater than column size
+                /// if offsets are not empty and were already read, but elements are empty.
+                /// FIXME
+                // if (column->size())
+                //     read_rows_in_mark = std::max(read_rows, column->size() - column_size_before_reading);
+            }
+            catch (Exception & e)
+            {
+                /// Better diagnostics.
+                e.addMessage("(while reading column " + it.name + ")");
+                throw;
+            }
 
-            /// For elements of Nested, column_size_before_reading may be greater than column size
-            /// if offsets are not empty and were already read, but elements are empty.
             if (column->size())
-                read_rows = std::max(read_rows, column->size() - column_size_before_reading);
-        }
-        catch (Exception & e)
-        {
-            /// Better diagnostics.
-            e.addMessage("(while reading column " + it.name + ")");
-            throw;
+                res.getByName(it.name).column = std::move(column);
+            else
+                res.erase(it.name);
         }
 
-        if (column->size())
-            res.getByName(it.name).column = std::move(column);
-        else
-            res.erase(it.name);
+        ++from_mark;
+        read_rows += rows_to_read;
     }
 
+    std::cerr << "(MergeTreeReaderCompact::readRows) read_rows: " << read_rows << "\n";
+
     return read_rows;
 }
@@ -152,7 +166,7 @@
 
         auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_num);
 
-        std::cerr << "(MergeTreeReaderCompact::loadMarks) marks_count: " << marks_count << "\n";
+        // std::cerr << "(MergeTreeReaderCompact::loadMarks) marks_count: " << marks_count << "\n";
 
         ReadBufferFromFile buffer(mrk_path, file_size);
         size_t i = 0;
@@ -161,13 +175,13 @@
         {
             buffer.seek(sizeof(size_t), SEEK_CUR);
             buffer.readStrict(reinterpret_cast<char *>(res->data() + i * columns_num), sizeof(MarkInCompressedFile) * columns_num);
-            std::cerr << "(MergeTreeReaderCompact::loadMarks) i: " << i << "\n";
-            std::cerr << "(MergeTreeReaderCompact::loadMarks) buffer pos in file: " << buffer.getPositionInFile() << "\n";
+            // std::cerr << "(MergeTreeReaderCompact::loadMarks) i: " << i << "\n";
+            // std::cerr << "(MergeTreeReaderCompact::loadMarks) buffer pos in file: " << buffer.getPositionInFile() << "\n";
             ++i;
         }
 
-        std::cerr << "(MergeTreeReaderCompact::loadMarks) file_size: " << file_size << "\n";
-        std::cerr << "(MergeTreeReaderCompact::loadMarks) correct file size: " << i * mark_size_in_bytes << "\n";
+        // std::cerr << "(MergeTreeReaderCompact::loadMarks) file_size: " << file_size << "\n";
+        // std::cerr << "(MergeTreeReaderCompact::loadMarks) correct file size: " << i * mark_size_in_bytes << "\n";
 
         if (i * mark_size_in_bytes != file_size)
             throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
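Note on the new reading loop in MergeTreeReaderCompact::readRows(): it consumes whole granules, starting at from_mark and adding getMarkRows(from_mark) rows per iteration until max_rows_to_read is covered, so it can return more rows than requested when the limit does not fall on a granule boundary. A standalone sketch of the loop's arithmetic (hypothetical reader, with a bounds check added):

    #include <cstddef>
    #include <vector>

    /// mark_rows[i] is the row count of granule i, as in index_granularity.
    size_t readWholeGranules(const std::vector<size_t> & mark_rows, size_t from_mark, size_t max_rows_to_read)
    {
        size_t read_rows = 0;
        while (read_rows < max_rows_to_read && from_mark < mark_rows.size())
        {
            read_rows += mark_rows[from_mark];   /// always a whole granule
            ++from_mark;
        }
        return read_rows;
    }

With marks {10, 10, 5} and max_rows_to_read = 15 this returns 20, which is why the caller (DelayedStream::readRows) relies on the actual rows_read value it gets back.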