Removed UnsortedMergeTree, part 2 [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-02-19 20:31:30 +03:00
parent 399ede6404
commit 7ff1346685
6 changed files with 12 additions and 24 deletions

View File

@ -113,10 +113,8 @@ MergeTreeData::MergeTreeData(
{
merging_params.check(columns);
if (primary_expr_ast && merging_params.mode == MergingParams::Unsorted)
throw Exception("Primary key cannot be set for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
if (!primary_expr_ast && merging_params.mode != MergingParams::Unsorted)
throw Exception("Primary key can be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
if (!primary_expr_ast) /// TODO Allow tables without primary key.
throw Exception("Primary key cannot be empty", ErrorCodes::BAD_ARGUMENTS);
initPrimaryKey();
@ -402,7 +400,6 @@ String MergeTreeData::MergingParams::getModeName() const
case Collapsing: return "Collapsing";
case Summing: return "Summing";
case Aggregating: return "Aggregating";
case Unsorted: return "Unsorted";
case Replacing: return "Replacing";
case Graphite: return "Graphite";
case VersionedCollapsing: return "VersionedCollapsing";

View File

@ -78,8 +78,6 @@ namespace ErrorCodes
/// column is set, keep the latest row with the maximal version.
/// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key.
/// - Aggregating - merge columns containing aggregate function states for all rows with the same primary key.
/// - Unsorted - during the merge the data is not sorted but merely concatenated; this allows reading the data
/// in the same batches as they were written.
/// - Graphite - performs coarsening of historical data for Graphite (a system for quantitative monitoring).
/// The MergeTreeData class contains a list of parts and the data structure parameters.
@ -239,7 +237,6 @@ public:
Collapsing = 1,
Summing = 2,
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
Graphite = 6,
VersionedCollapsing = 7,
@ -269,7 +266,7 @@ public:
/// Attach the table corresponding to the directory in full_path (must end with /), with the given columns.
/// Correctness of names and paths is not checked.
///
/// primary_expr_ast - expression used for sorting; empty for UnsortedMergeTree.
/// primary_expr_ast - expression used for sorting;
/// date_column_name - if not empty, the name of the Date column used for partitioning by month.
/// Otherwise, partition_expr_ast is used for partitioning.
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
@ -442,6 +439,7 @@ public:
broken_part_callback(name);
}
bool hasPrimaryKey() const { return !primary_sort_descr.empty(); }
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
ExpressionActionsPtr getSecondarySortExpression() const { return secondary_sort_expr; } /// may return nullptr
SortDescription getPrimarySortDescription() const { return primary_sort_descr; }

View File

@ -592,7 +592,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
input->setProgressCallback(MergeProgressCallback(
merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg));
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
if (data.hasPrimaryKey())
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
else
@ -642,10 +642,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
break;
case MergeTreeData::MergingParams::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString<int>(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR);
}

View File

@ -520,7 +520,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
{
RangesInDataPart ranges(part, part_index++);
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
if (data.hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
else
ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}};
@ -830,9 +830,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Unsorted:
throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -172,8 +172,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
dir.createDirectories();
/// If you need to calculate some columns to sort, we do it.
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
/// If we need to calculate some columns to sort.
if (data.hasPrimaryKey())
{
data.getPrimaryExpression()->execute(block);
auto secondary_sort_expr = data.getSecondarySortExpression();
@ -188,7 +188,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// Sort.
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
if (data.hasPrimaryKey())
{
if (!isAlreadySorted(block, sort_descr))
{

View File

@ -293,7 +293,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
if (index_stream)
{
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
@ -354,7 +354,7 @@ void MergedBlockOutputStream::init()
{
Poco::File(part_path).createDirectories();
if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
if (storage.hasPrimaryKey())
{
index_file_stream = std::make_unique<WriteBufferFromFile>(
part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
@ -443,7 +443,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (size_t i = index_offset; i < rows; i += storage.index_granularity)
{
if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
if (storage.hasPrimaryKey())
{
for (size_t j = 0, size = primary_columns.size(); j < size; ++j)
{