mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 04:12:19 +00:00
Merge pull request #10531 from CurtizJ/polymorphic-parts-2
Fix index corruption in merges with compact parts.
This commit is contained in:
commit
15e8f37839
@ -71,8 +71,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
|
|||||||
const String & marks_file_extension_,
|
const String & marks_file_extension_,
|
||||||
const CompressionCodecPtr & default_codec_,
|
const CompressionCodecPtr & default_codec_,
|
||||||
const MergeTreeWriterSettings & settings_,
|
const MergeTreeWriterSettings & settings_,
|
||||||
const MergeTreeIndexGranularity & index_granularity_,
|
const MergeTreeIndexGranularity & index_granularity_)
|
||||||
bool need_finish_last_granule_)
|
|
||||||
: disk(std::move(disk_))
|
: disk(std::move(disk_))
|
||||||
, part_path(part_path_)
|
, part_path(part_path_)
|
||||||
, storage(storage_)
|
, storage(storage_)
|
||||||
@ -84,7 +83,6 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
|
|||||||
, settings(settings_)
|
, settings(settings_)
|
||||||
, compute_granularity(index_granularity.empty())
|
, compute_granularity(index_granularity.empty())
|
||||||
, with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity)
|
, with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity)
|
||||||
, need_finish_last_granule(need_finish_last_granule_)
|
|
||||||
{
|
{
|
||||||
if (settings.blocks_are_granules_size && !index_granularity.empty())
|
if (settings.blocks_are_granules_size && !index_granularity.empty())
|
||||||
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
|
||||||
@ -95,15 +93,15 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
|
|||||||
|
|
||||||
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
|
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
|
||||||
|
|
||||||
static void fillIndexGranularityImpl(
|
/// Implementation is split into static functions for the ability
|
||||||
|
/// of writing unit tests without creating an instance of IMergeTreeDataPartWriter,
|
||||||
|
/// which requires a lot of dependencies and access to filesystem.
|
||||||
|
static size_t computeIndexGranularityImpl(
|
||||||
const Block & block,
|
const Block & block,
|
||||||
size_t index_granularity_bytes,
|
size_t index_granularity_bytes,
|
||||||
size_t fixed_index_granularity_rows,
|
size_t fixed_index_granularity_rows,
|
||||||
bool blocks_are_granules,
|
bool blocks_are_granules,
|
||||||
size_t index_offset,
|
bool can_use_adaptive_index_granularity)
|
||||||
MergeTreeIndexGranularity & index_granularity,
|
|
||||||
bool can_use_adaptive_index_granularity,
|
|
||||||
bool need_finish_last_granule = false)
|
|
||||||
{
|
{
|
||||||
size_t rows_in_block = block.rows();
|
size_t rows_in_block = block.rows();
|
||||||
size_t index_granularity_for_block;
|
size_t index_granularity_for_block;
|
||||||
@ -130,43 +128,37 @@ static void fillIndexGranularityImpl(
|
|||||||
|
|
||||||
/// We should be less or equal than fixed index granularity
|
/// We should be less or equal than fixed index granularity
|
||||||
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
|
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
|
||||||
|
return index_granularity_for_block;
|
||||||
size_t current_row;
|
|
||||||
for (current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
|
|
||||||
{
|
|
||||||
size_t rows_left_in_block = rows_in_block - current_row;
|
|
||||||
|
|
||||||
/// Try to extend last granule if it's needed and block is large enough
|
|
||||||
/// or it shouldn't be first in granule (index_offset != 0).
|
|
||||||
if (need_finish_last_granule && rows_left_in_block < index_granularity_for_block
|
|
||||||
&& (rows_in_block >= index_granularity_for_block || index_offset != 0))
|
|
||||||
{
|
|
||||||
// If enough rows are left, create a new granule. Otherwise, extend previous granule.
|
|
||||||
// So, real size of granule differs from index_granularity_for_block not more than 50%.
|
|
||||||
if (rows_left_in_block * 2 >= index_granularity_for_block)
|
|
||||||
index_granularity.appendMark(rows_left_in_block);
|
|
||||||
else
|
|
||||||
index_granularity.addRowsToLastMark(rows_left_in_block);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
index_granularity.appendMark(index_granularity_for_block);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void IMergeTreeDataPartWriter::fillIndexGranularity(const Block & block)
|
/// Appends one mark of `index_granularity_for_block` rows to `index_granularity`
/// for every granule start in the range [index_offset, rows_in_block),
/// stepping by `index_granularity_for_block` rows.
/// Every appended mark has the full granule size, even when fewer rows
/// of the block remain after the last granule start.
/// Nothing is appended when index_offset >= rows_in_block.
static void fillIndexGranularityImpl(
    MergeTreeIndexGranularity & index_granularity,
    size_t index_offset,
    size_t index_granularity_for_block,
    size_t rows_in_block)
{
    size_t granule_start = index_offset;
    while (granule_start < rows_in_block)
    {
        index_granularity.appendMark(index_granularity_for_block);
        granule_start += index_granularity_for_block;
    }
}
|
||||||
|
|
||||||
|
size_t IMergeTreeDataPartWriter::computeIndexGranularity(const Block & block)
|
||||||
{
|
{
|
||||||
const auto storage_settings = storage.getSettings();
|
const auto storage_settings = storage.getSettings();
|
||||||
fillIndexGranularityImpl(
|
return computeIndexGranularityImpl(
|
||||||
block,
|
block,
|
||||||
storage_settings->index_granularity_bytes,
|
storage_settings->index_granularity_bytes,
|
||||||
storage_settings->index_granularity,
|
storage_settings->index_granularity,
|
||||||
settings.blocks_are_granules_size,
|
settings.blocks_are_granules_size,
|
||||||
index_offset,
|
settings.can_use_adaptive_granularity);
|
||||||
|
}
|
||||||
|
|
||||||
|
void IMergeTreeDataPartWriter::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
|
||||||
|
{
|
||||||
|
fillIndexGranularityImpl(
|
||||||
index_granularity,
|
index_granularity,
|
||||||
settings.can_use_adaptive_granularity,
|
index_offset,
|
||||||
need_finish_last_granule);
|
index_granularity_for_block,
|
||||||
|
rows_in_block);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IMergeTreeDataPartWriter::initPrimaryIndex()
|
void IMergeTreeDataPartWriter::initPrimaryIndex()
|
||||||
@ -225,21 +217,22 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p
|
|||||||
|
|
||||||
/// Write index. The index contains Primary Key value for each `index_granularity` row.
|
/// Write index. The index contains Primary Key value for each `index_granularity` row.
|
||||||
|
|
||||||
for (size_t i = index_offset; i < rows;)
|
size_t current_row = index_offset;
|
||||||
|
size_t total_marks = index_granularity.getMarksCount();
|
||||||
|
|
||||||
|
while (index_mark < total_marks && current_row < rows)
|
||||||
{
|
{
|
||||||
if (storage.hasPrimaryKey())
|
if (storage.hasPrimaryKey())
|
||||||
{
|
{
|
||||||
for (size_t j = 0; j < primary_columns_num; ++j)
|
for (size_t j = 0; j < primary_columns_num; ++j)
|
||||||
{
|
{
|
||||||
const auto & primary_column = primary_index_block.getByPosition(j);
|
const auto & primary_column = primary_index_block.getByPosition(j);
|
||||||
index_columns[j]->insertFrom(*primary_column.column, i);
|
index_columns[j]->insertFrom(*primary_column.column, current_row);
|
||||||
primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
|
primary_column.type->serializeBinary(*primary_column.column, current_row, *index_stream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
i += index_granularity.getMarkRows(current_mark++);
|
current_row += index_granularity.getMarkRows(index_mark++);
|
||||||
if (current_mark >= index_granularity.getMarksCount())
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// store last index row to write final mark at the end of column
|
/// store last index row to write final mark at the end of column
|
||||||
|
@ -69,8 +69,7 @@ public:
|
|||||||
const String & marks_file_extension,
|
const String & marks_file_extension,
|
||||||
const CompressionCodecPtr & default_codec,
|
const CompressionCodecPtr & default_codec,
|
||||||
const MergeTreeWriterSettings & settings,
|
const MergeTreeWriterSettings & settings,
|
||||||
const MergeTreeIndexGranularity & index_granularity,
|
const MergeTreeIndexGranularity & index_granularity);
|
||||||
bool need_finish_last_granule);
|
|
||||||
|
|
||||||
virtual ~IMergeTreeDataPartWriter();
|
virtual ~IMergeTreeDataPartWriter();
|
||||||
|
|
||||||
@ -87,9 +86,6 @@ public:
|
|||||||
/// calling calculations of primary and skip indices.
|
/// calling calculations of primary and skip indices.
|
||||||
void next();
|
void next();
|
||||||
|
|
||||||
/// Count index_granularity for block and store in `index_granularity`
|
|
||||||
void fillIndexGranularity(const Block & block);
|
|
||||||
|
|
||||||
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
|
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
|
||||||
|
|
||||||
Columns releaseIndexColumns()
|
Columns releaseIndexColumns()
|
||||||
@ -112,6 +108,13 @@ public:
|
|||||||
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);
|
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
/// Compute index granularity (number of rows per mark) for the block.
|
||||||
|
size_t computeIndexGranularity(const Block & block);
|
||||||
|
virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block);
|
||||||
|
|
||||||
|
size_t getCurrentMark() const { return current_mark; }
|
||||||
|
size_t getIndexOffset() const { return index_offset; }
|
||||||
|
|
||||||
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
|
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
|
||||||
using SerializationStates = std::unordered_map<String, SerializationState>;
|
using SerializationStates = std::unordered_map<String, SerializationState>;
|
||||||
|
|
||||||
@ -131,12 +134,6 @@ protected:
|
|||||||
|
|
||||||
bool compute_granularity;
|
bool compute_granularity;
|
||||||
bool with_final_mark;
|
bool with_final_mark;
|
||||||
bool need_finish_last_granule;
|
|
||||||
|
|
||||||
size_t current_mark = 0;
|
|
||||||
|
|
||||||
/// The offset to the first row of the block for which you want to write the index.
|
|
||||||
size_t index_offset = 0;
|
|
||||||
|
|
||||||
size_t next_mark = 0;
|
size_t next_mark = 0;
|
||||||
size_t next_index_offset = 0;
|
size_t next_index_offset = 0;
|
||||||
@ -163,6 +160,14 @@ protected:
|
|||||||
|
|
||||||
/// To correctly write Nested elements column-by-column.
|
/// To correctly write Nested elements column-by-column.
|
||||||
WrittenOffsetColumns * written_offset_columns = nullptr;
|
WrittenOffsetColumns * written_offset_columns = nullptr;
|
||||||
|
|
||||||
|
private:
|
||||||
|
/// Data is already written up to this mark.
|
||||||
|
size_t current_mark = 0;
|
||||||
|
/// The offset to the first row of the block for which you want to write the index.
|
||||||
|
size_t index_offset = 0;
|
||||||
|
/// Index is already serialized up to this mark.
|
||||||
|
size_t index_mark = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
|
|||||||
: IMergeTreeDataPartWriter(disk_, part_path_,
|
: IMergeTreeDataPartWriter(disk_, part_path_,
|
||||||
storage_, columns_list_,
|
storage_, columns_list_,
|
||||||
indices_to_recalc_, marks_file_extension_,
|
indices_to_recalc_, marks_file_extension_,
|
||||||
default_codec_, settings_, index_granularity_, true)
|
default_codec_, settings_, index_granularity_)
|
||||||
{
|
{
|
||||||
using DataPart = MergeTreeDataPartCompact;
|
using DataPart = MergeTreeDataPartCompact;
|
||||||
String data_file_name = DataPart::DATA_FILE_NAME;
|
String data_file_name = DataPart::DATA_FILE_NAME;
|
||||||
@ -42,7 +42,10 @@ void MergeTreeDataPartWriterCompact::write(
|
|||||||
/// if it's unknown (in case of insert data or horizontal merge,
|
/// if it's unknown (in case of insert data or horizontal merge,
|
||||||
/// but not in case of vertical merge)
|
/// but not in case of vertical merge)
|
||||||
if (compute_granularity)
|
if (compute_granularity)
|
||||||
fillIndexGranularity(block);
|
{
|
||||||
|
size_t index_granularity_for_block = computeIndexGranularity(block);
|
||||||
|
fillIndexGranularity(index_granularity_for_block, block.rows());
|
||||||
|
}
|
||||||
|
|
||||||
Block result_block;
|
Block result_block;
|
||||||
|
|
||||||
@ -88,7 +91,7 @@ void MergeTreeDataPartWriterCompact::write(
|
|||||||
void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
|
void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
|
||||||
{
|
{
|
||||||
size_t total_rows = block.rows();
|
size_t total_rows = block.rows();
|
||||||
size_t from_mark = current_mark;
|
size_t from_mark = getCurrentMark();
|
||||||
size_t current_row = 0;
|
size_t current_row = 0;
|
||||||
|
|
||||||
while (current_row < total_rows)
|
while (current_row < total_rows)
|
||||||
@ -163,6 +166,44 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
|
|||||||
stream.reset();
|
stream.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Lays out marks for one block of a compact part.
///
/// Walks granule starts from `index_offset` up to `rows_in_block` with a step of
/// `index_granularity_for_block` rows. A full-size mark is appended for each start,
/// except for a short tail (fewer rows left than a full granule) when the block is
/// at least one granule long or does not start a granule (index_offset != 0).
/// Such a tail either becomes its own mark (when it holds at least half a granule)
/// or is added to the last existing mark. So the real size of a granule differs
/// from `index_granularity_for_block` by not more than 50%.
static void fillIndexGranularityImpl(
    MergeTreeIndexGranularity & index_granularity,
    size_t index_offset,
    size_t index_granularity_for_block,
    size_t rows_in_block)
{
    /// Does not depend on the current row, so it is evaluated once.
    const bool tail_may_be_adjusted = rows_in_block >= index_granularity_for_block || index_offset != 0;

    size_t granule_start = index_offset;
    while (granule_start < rows_in_block)
    {
        const size_t remaining_rows = rows_in_block - granule_start;
        const bool is_short_tail = remaining_rows < index_granularity_for_block;

        if (!(is_short_tail && tail_may_be_adjusted))
        {
            /// Regular case: a mark of full granule size.
            index_granularity.appendMark(index_granularity_for_block);
        }
        else if (remaining_rows * 2 >= index_granularity_for_block)
        {
            /// The tail is at least half a granule: it gets its own, smaller mark.
            index_granularity.appendMark(remaining_rows);
        }
        else
        {
            /// The tail is too small: extend the previous granule instead.
            index_granularity.addRowsToLastMark(remaining_rows);
        }

        granule_start += index_granularity_for_block;
    }
}
|
||||||
|
|
||||||
|
void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
|
||||||
|
{
|
||||||
|
fillIndexGranularityImpl(
|
||||||
|
index_granularity,
|
||||||
|
getIndexOffset(),
|
||||||
|
index_granularity_for_block,
|
||||||
|
rows_in_block);
|
||||||
|
}
|
||||||
|
|
||||||
void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
|
void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
|
||||||
{
|
{
|
||||||
if (accumulated_columns.empty())
|
if (accumulated_columns.empty())
|
||||||
|
@ -23,6 +23,9 @@ public:
|
|||||||
|
|
||||||
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override;
|
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Write single granule of one column (rows between 2 marks)
|
/// Write single granule of one column (rows between 2 marks)
|
||||||
void writeColumnSingleGranule(
|
void writeColumnSingleGranule(
|
||||||
|
@ -24,7 +24,7 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
|
|||||||
const MergeTreeIndexGranularity & index_granularity_)
|
const MergeTreeIndexGranularity & index_granularity_)
|
||||||
: IMergeTreeDataPartWriter(disk_, part_path_,
|
: IMergeTreeDataPartWriter(disk_, part_path_,
|
||||||
storage_, columns_list_, indices_to_recalc_,
|
storage_, columns_list_, indices_to_recalc_,
|
||||||
marks_file_extension_, default_codec_, settings_, index_granularity_, false)
|
marks_file_extension_, default_codec_, settings_, index_granularity_)
|
||||||
{
|
{
|
||||||
const auto & columns = storage.getColumns();
|
const auto & columns = storage.getColumns();
|
||||||
for (const auto & it : columns_list)
|
for (const auto & it : columns_list)
|
||||||
@ -85,7 +85,10 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
|
|||||||
/// if it's unknown (in case of insert data or horizontal merge,
|
/// if it's unknown (in case of insert data or horizontal merge,
|
||||||
/// but not in case of vertical merge)
|
/// but not in case of vertical merge)
|
||||||
if (compute_granularity)
|
if (compute_granularity)
|
||||||
fillIndexGranularity(block);
|
{
|
||||||
|
size_t index_granularity_for_block = computeIndexGranularity(block);
|
||||||
|
fillIndexGranularity(index_granularity_for_block, block.rows());
|
||||||
|
}
|
||||||
|
|
||||||
auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};
|
auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};
|
||||||
|
|
||||||
@ -206,17 +209,18 @@ void MergeTreeDataPartWriterWide::writeColumn(
|
|||||||
|
|
||||||
size_t total_rows = column.size();
|
size_t total_rows = column.size();
|
||||||
size_t current_row = 0;
|
size_t current_row = 0;
|
||||||
size_t current_column_mark = current_mark;
|
size_t current_column_mark = getCurrentMark();
|
||||||
|
size_t current_index_offset = getIndexOffset();
|
||||||
while (current_row < total_rows)
|
while (current_row < total_rows)
|
||||||
{
|
{
|
||||||
size_t rows_to_write;
|
size_t rows_to_write;
|
||||||
bool write_marks = true;
|
bool write_marks = true;
|
||||||
|
|
||||||
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
|
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
|
||||||
if (current_row == 0 && index_offset != 0)
|
if (current_row == 0 && current_index_offset != 0)
|
||||||
{
|
{
|
||||||
write_marks = false;
|
write_marks = false;
|
||||||
rows_to_write = index_offset;
|
rows_to_write = current_index_offset;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -25,14 +25,16 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
|
|||||||
EXPECT_EQ(block1.bytes(), 80);
|
EXPECT_EQ(block1.bytes(), 80);
|
||||||
{ /// Granularity bytes are not set. Take default index_granularity.
|
{ /// Granularity bytes are not set. Take default index_granularity.
|
||||||
MergeTreeIndexGranularity index_granularity;
|
MergeTreeIndexGranularity index_granularity;
|
||||||
fillIndexGranularityImpl(block1, 0, 100, false, 0, index_granularity, false);
|
auto granularity = computeIndexGranularityImpl(block1, 0, 100, false, false);
|
||||||
|
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 1);
|
EXPECT_EQ(index_granularity.getMarksCount(), 1);
|
||||||
EXPECT_EQ(index_granularity.getMarkRows(0), 100);
|
EXPECT_EQ(index_granularity.getMarkRows(0), 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
{ /// Granule size is less than block size. Block contains multiple granules.
|
{ /// Granule size is less than block size. Block contains multiple granules.
|
||||||
MergeTreeIndexGranularity index_granularity;
|
MergeTreeIndexGranularity index_granularity;
|
||||||
fillIndexGranularityImpl(block1, 16, 100, false, 0, index_granularity, true);
|
auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true);
|
||||||
|
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 5); /// First granule with 8 rows, and second with 1 row
|
EXPECT_EQ(index_granularity.getMarksCount(), 5); /// First granule with 8 rows, and second with 1 row
|
||||||
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
||||||
EXPECT_EQ(index_granularity.getMarkRows(i), 2);
|
EXPECT_EQ(index_granularity.getMarkRows(i), 2);
|
||||||
@ -41,7 +43,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
|
|||||||
{ /// Granule size is more than block size. Whole block (and maybe more) can be placed in single granule.
|
{ /// Granule size is more than block size. Whole block (and maybe more) can be placed in single granule.
|
||||||
|
|
||||||
MergeTreeIndexGranularity index_granularity;
|
MergeTreeIndexGranularity index_granularity;
|
||||||
fillIndexGranularityImpl(block1, 512, 100, false, 0, index_granularity, true);
|
auto granularity = computeIndexGranularityImpl(block1, 512, 100, false, true);
|
||||||
|
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 1);
|
EXPECT_EQ(index_granularity.getMarksCount(), 1);
|
||||||
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
||||||
EXPECT_EQ(index_granularity.getMarkRows(i), 64);
|
EXPECT_EQ(index_granularity.getMarkRows(i), 64);
|
||||||
@ -50,7 +53,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
|
|||||||
{ /// Blocks with granule size
|
{ /// Blocks with granule size
|
||||||
|
|
||||||
MergeTreeIndexGranularity index_granularity;
|
MergeTreeIndexGranularity index_granularity;
|
||||||
fillIndexGranularityImpl(block1, 1, 100, true, 0, index_granularity, true);
|
auto granularity = computeIndexGranularityImpl(block1, 1, 100, true, true);
|
||||||
|
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 1);
|
EXPECT_EQ(index_granularity.getMarksCount(), 1);
|
||||||
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
||||||
EXPECT_EQ(index_granularity.getMarkRows(i), block1.rows());
|
EXPECT_EQ(index_granularity.getMarkRows(i), block1.rows());
|
||||||
@ -58,7 +62,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
|
|||||||
|
|
||||||
{ /// Shift in index offset
|
{ /// Shift in index offset
|
||||||
MergeTreeIndexGranularity index_granularity;
|
MergeTreeIndexGranularity index_granularity;
|
||||||
fillIndexGranularityImpl(block1, 16, 100, false, 6, index_granularity, true);
|
auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true);
|
||||||
|
fillIndexGranularityImpl(index_granularity, 6, granularity, block1.rows());
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 2);
|
EXPECT_EQ(index_granularity.getMarksCount(), 2);
|
||||||
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
||||||
EXPECT_EQ(index_granularity.getMarkRows(i), 2);
|
EXPECT_EQ(index_granularity.getMarkRows(i), 2);
|
||||||
@ -74,7 +79,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
|
|||||||
auto block3 = getBlockWithSize(65536, 8);
|
auto block3 = getBlockWithSize(65536, 8);
|
||||||
MergeTreeIndexGranularity index_granularity;
|
MergeTreeIndexGranularity index_granularity;
|
||||||
for (const auto & block : {block1, block2, block3})
|
for (const auto & block : {block1, block2, block3})
|
||||||
fillIndexGranularityImpl(block, 1024, 8192, false, 0, index_granularity, true);
|
{
|
||||||
|
auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true);
|
||||||
|
fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows());
|
||||||
|
}
|
||||||
|
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 192); /// granules
|
EXPECT_EQ(index_granularity.getMarksCount(), 192); /// granules
|
||||||
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
||||||
@ -87,7 +95,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
|
|||||||
EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 3136);
|
EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 3136);
|
||||||
MergeTreeIndexGranularity index_granularity;
|
MergeTreeIndexGranularity index_granularity;
|
||||||
for (const auto & block : {block1, block2, block3})
|
for (const auto & block : {block1, block2, block3})
|
||||||
fillIndexGranularityImpl(block, 1024, 8192, false, 0, index_granularity, true);
|
{
|
||||||
|
auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true);
|
||||||
|
fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows());
|
||||||
|
}
|
||||||
|
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 98); /// granules
|
EXPECT_EQ(index_granularity.getMarksCount(), 98); /// granules
|
||||||
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
|
||||||
@ -105,7 +116,8 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
|
|||||||
size_t index_offset = 0;
|
size_t index_offset = 0;
|
||||||
for (const auto & block : {block1, block2, block3})
|
for (const auto & block : {block1, block2, block3})
|
||||||
{
|
{
|
||||||
fillIndexGranularityImpl(block, 16384, 8192, false, index_offset, index_granularity, true);
|
auto granularity = computeIndexGranularityImpl(block, 16384, 8192, false, true);
|
||||||
|
fillIndexGranularityImpl(index_granularity, index_offset, granularity, block.rows());
|
||||||
index_offset = index_granularity.getLastMarkRows() - block.rows();
|
index_offset = index_granularity.getLastMarkRows() - block.rows();
|
||||||
}
|
}
|
||||||
EXPECT_EQ(index_granularity.getMarksCount(), 1); /// granules
|
EXPECT_EQ(index_granularity.getMarksCount(), 1); /// granules
|
||||||
|
@ -0,0 +1,81 @@
|
|||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <Core/Block.h>
|
||||||
|
#include <Columns/ColumnVector.h>
|
||||||
|
|
||||||
|
// I know that inclusion of .cpp is not good at all
|
||||||
|
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp>
|
||||||
|
|
||||||
|
using namespace DB;
|
||||||
|
|
||||||
|
/// Checks how fillIndexGranularityImpl for compact parts lays out marks
/// when it is called for a sequence of blocks.
TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks)
{
    /// Three small blocks fit into a single granule.
    {
        const size_t block_rows = 8;
        const size_t granule_rows = 32;

        MergeTreeIndexGranularity index_granularity;
        size_t offset = 0;
        size_t total_written = 0;
        for (size_t block_num = 0; block_num != 3; ++block_num)
        {
            fillIndexGranularityImpl(index_granularity, offset, granule_rows, block_rows);
            total_written += block_rows;
            offset = granule_rows - total_written;
        }

        EXPECT_EQ(index_granularity.getMarksCount(), 1); /// granules
        /// It's ok that the granularity is higher than the actual number of rows.
        /// It will be corrected in CompactWriter.
        EXPECT_EQ(index_granularity.getMarkRows(0), granule_rows);
    }

    /// A small second block only extends the existing granule.
    {
        const size_t first_block_rows = 30;
        const size_t second_block_rows = 8;
        const size_t granule_rows = 32;

        MergeTreeIndexGranularity index_granularity;

        fillIndexGranularityImpl(index_granularity, 0, granule_rows, first_block_rows);

        const size_t offset = granule_rows - first_block_rows;
        fillIndexGranularityImpl(index_granularity, offset, granule_rows, second_block_rows);

        EXPECT_EQ(index_granularity.getMarksCount(), 1);
        EXPECT_EQ(index_granularity.getMarkRows(0), first_block_rows + second_block_rows);
    }

    /// A large second block creates a new granule.
    {
        const size_t first_block_rows = 30;
        const size_t second_block_rows = 25;
        const size_t granule_rows = 32;

        MergeTreeIndexGranularity index_granularity;

        fillIndexGranularityImpl(index_granularity, 0, granule_rows, first_block_rows);

        const size_t offset = granule_rows - first_block_rows;
        fillIndexGranularityImpl(index_granularity, offset, granule_rows, second_block_rows);

        EXPECT_EQ(index_granularity.getMarksCount(), 2);
        EXPECT_EQ(index_granularity.getMarkRows(0), granule_rows);
        EXPECT_EQ(index_granularity.getMarkRows(1), first_block_rows + second_block_rows - granule_rows);
    }

    /// Three large blocks, each starting a granule: every block ends up in its own mark.
    {
        const size_t block_rows = 40;
        const size_t granule_rows = 32;

        MergeTreeIndexGranularity index_granularity;
        const size_t offset = 0;

        for (size_t block_num = 0; block_num != 3; ++block_num)
            fillIndexGranularityImpl(index_granularity, offset, granule_rows, block_rows);

        EXPECT_EQ(index_granularity.getMarksCount(), 3);
        for (size_t mark = 0; mark != 3; ++mark)
            EXPECT_EQ(index_granularity.getMarkRows(mark), block_rows);
    }
}
|
@ -2,6 +2,8 @@ import time
|
|||||||
import pytest
|
import pytest
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
|
import os
|
||||||
|
import struct
|
||||||
|
|
||||||
from helpers.test_tools import TSV
|
from helpers.test_tools import TSV
|
||||||
from helpers.test_tools import assert_eq_with_retry
|
from helpers.test_tools import assert_eq_with_retry
|
||||||
@ -260,3 +262,24 @@ def test_polymorphic_parts_non_adaptive(start_cluster):
|
|||||||
"WHERE table = 'non_adaptive_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV("Wide\t2\n")
|
"WHERE table = 'non_adaptive_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV("Wide\t2\n")
|
||||||
|
|
||||||
assert node1.contains_in_log("<Warning> default.non_adaptive_table: Table can't create parts with adaptive granularity")
|
assert node1.contains_in_log("<Warning> default.non_adaptive_table: Table can't create parts with adaptive granularity")
|
||||||
|
|
||||||
|
|
||||||
|
def test_polymorphic_parts_index(start_cluster):
    """Check the primary index of a compact part produced by a merge.

    Two inserts (100 and 30 rows) are merged with OPTIMIZE ... FINAL into one
    part. The test expects that part to be Compact with 2 marks, and the
    primary.idx file to hold exactly two UInt32 values: 0 and 99.
    """
    node1.query('''
        CREATE TABLE index_compact(a UInt32, s String)
        ENGINE = MergeTree ORDER BY a
        SETTINGS min_rows_for_wide_part = 1000, index_granularity = 128, merge_max_block_size = 100''')

    node1.query("INSERT INTO index_compact SELECT number, toString(number) FROM numbers(100)")
    node1.query("INSERT INTO index_compact SELECT number, toString(number) FROM numbers(30)")
    node1.query("OPTIMIZE TABLE index_compact FINAL")

    assert node1.query("SELECT part_type FROM system.parts WHERE table = 'index_compact' AND active") == "Compact\n"
    assert node1.query("SELECT marks FROM system.parts WHERE table = 'index_compact' AND active") == "2\n"

    index_path = os.path.join(node1.path, "database/data/default/index_compact/all_1_2_1/primary.idx")

    # Two 4-byte index entries are expected, one per mark.
    assert os.path.getsize(index_path) == 8

    # Use a context manager so the file handle is closed even if an assertion fails.
    with open(index_path, 'rb') as f:
        assert struct.unpack('I', f.read(4))[0] == 0
        assert struct.unpack('I', f.read(4))[0] == 99
|
||||||
|
Loading…
Reference in New Issue
Block a user