This commit is contained in:
Nikita Mikhaylov 2023-04-24 13:24:55 +00:00
parent 745d9bb47f
commit b60109d43e
3 changed files with 40 additions and 3 deletions

View File

@ -42,20 +42,28 @@ void CheckSortedTransform::transform(Chunk & chunk)
else if (res > 0)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Sort order of blocks violated for column number {}, left: {}, right: {}.",
"Sort order of blocks violated for column number {}, left: {}, right: {}. Chunk {}, rows read {}.{}",
column_number,
applyVisitor(FieldVisitorDump(), (*left_col)[left_index]),
applyVisitor(FieldVisitorDump(), (*right_col)[right_index]));
applyVisitor(FieldVisitorDump(), (*right_col)[right_index]),
chunk_num, rows_read,
description.empty() ? String() : fmt::format(" ({})", description));
}
}
};
const auto & chunk_columns = chunk.getColumns();
++rows_read;
if (!last_row.empty())
check(last_row, 0, chunk_columns, 0);
for (size_t i = 1; i < num_rows; ++i)
{
++rows_read;
check(chunk_columns, i - 1, chunk_columns, i);
}
last_row.clear();
for (const auto & chunk_column : chunk_columns)
@ -64,6 +72,8 @@ void CheckSortedTransform::transform(Chunk & chunk)
column->insertFrom(*chunk_column, num_rows - 1);
last_row.emplace_back(std::move(column));
}
++chunk_num;
}
}

View File

@ -13,7 +13,7 @@ public:
CheckSortedTransform(const Block & header, const SortDescription & sort_description);
String getName() const override { return "CheckSortedTransform"; }
void setDescription(const String & str) { description = str; }
protected:
void transform(Chunk & chunk) override;
@ -21,5 +21,8 @@ protected:
private:
SortDescriptionWithPositions sort_description_map;
Columns last_row;
String description;
size_t chunk_num = 0;
size_t rows_read = 0;
};
}

View File

@ -7,6 +7,7 @@
#include <Common/logger_useful.h>
#include <Common/ActionBlocker.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Storages/LightweightDeleteDescription.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
@ -957,6 +958,20 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
/// Sanity check on the merge inputs: when there is a sorting key, wrap every source pipe in a
/// CheckSortedTransform constructed with the same sort_description.
if (!sort_description.empty())
{
for (size_t i = 0; i < pipes.size(); ++i)
{
auto & pipe = pipes[i];
/// NOTE(review): the lambda captures `i` by reference; this presumably relies on
/// addSimpleTransform invoking the callback before the loop iteration ends - confirm.
pipe.addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
/// Pass the part name as the transform's description.
/// NOTE(review): assumes pipes[i] was built from future_part->parts[i] (same count, same order) - confirm.
transform->setDescription(global_ctx->future_part->parts[i]->name);
return transform;
});
}
}
/// The order of the streams is important: when keys are equal, the rows go in the order of the source stream number.
/// In the merged part, rows with the same key must be in ascending order of the identifier of the original part,
/// that is, in insertion order.
@ -1023,6 +1038,15 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
auto res_pipe = Pipe::unitePipes(std::move(pipes));
res_pipe.addTransform(std::move(merged_transform));
/// Add a second CheckSortedTransform after the merging transform, on the united pipe, so the
/// merged output is checked against the same sort_description. No description is set on this one.
if (!sort_description.empty())
{
res_pipe.addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
return transform;
});
}
if (global_ctx->deduplicate)
{
/// We don't want to deduplicate by block number column