mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Fixed half of bad code #2571
This commit is contained in:
parent
083b075631
commit
e86f73e465
@ -1,3 +1,5 @@
|
||||
#include <optional>
|
||||
|
||||
#include <Core/Field.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
#include <Core/Row.h>
|
||||
@ -20,9 +22,12 @@
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/NullableUtils.h>
|
||||
#include <Interpreters/sortBlock.h>
|
||||
|
||||
#include <Storages/MergeTree/KeyCondition.h>
|
||||
|
||||
#include <ext/range.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -43,22 +48,34 @@ void NO_INLINE Set::insertFromBlockImpl(
|
||||
const ColumnRawPtrs & key_columns,
|
||||
size_t rows,
|
||||
SetVariants & variants,
|
||||
ConstNullMapPtr null_map)
|
||||
ConstNullMapPtr null_map,
|
||||
ColumnUInt8::Container * out_filter)
|
||||
{
|
||||
if (null_map)
|
||||
insertFromBlockImplCase<Method, true>(method, key_columns, rows, variants, null_map);
|
||||
{
|
||||
if (out_filter)
|
||||
insertFromBlockImplCase<Method, true, true>(method, key_columns, rows, variants, null_map, out_filter);
|
||||
else
|
||||
insertFromBlockImplCase<Method, true, false>(method, key_columns, rows, variants, null_map, out_filter);
|
||||
}
|
||||
else
|
||||
insertFromBlockImplCase<Method, false>(method, key_columns, rows, variants, null_map);
|
||||
{
|
||||
if (out_filter)
|
||||
insertFromBlockImplCase<Method, false, true>(method, key_columns, rows, variants, null_map, out_filter);
|
||||
else
|
||||
insertFromBlockImplCase<Method, false, false>(method, key_columns, rows, variants, null_map, out_filter);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <typename Method, bool has_null_map>
|
||||
template <typename Method, bool has_null_map, bool build_filter>
|
||||
void NO_INLINE Set::insertFromBlockImplCase(
|
||||
Method & method,
|
||||
const ColumnRawPtrs & key_columns,
|
||||
size_t rows,
|
||||
SetVariants & variants,
|
||||
ConstNullMapPtr null_map)
|
||||
ConstNullMapPtr null_map,
|
||||
ColumnUInt8::Container * out_filter)
|
||||
{
|
||||
typename Method::State state;
|
||||
state.init(key_columns);
|
||||
@ -78,6 +95,9 @@ void NO_INLINE Set::insertFromBlockImplCase(
|
||||
|
||||
if (inserted)
|
||||
method.onNewKey(*it, keys_size, variants.string_pool);
|
||||
|
||||
if (build_filter)
|
||||
(*out_filter)[i] = inserted;
|
||||
}
|
||||
}
|
||||
|
||||
@ -152,13 +172,18 @@ bool Set::insertFromBlock(const Block & block, bool fill_set_elements)
|
||||
ConstNullMapPtr null_map{};
|
||||
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
|
||||
|
||||
/// Filter to extract distinct values from the block.
|
||||
ColumnUInt8::MutablePtr filter;
|
||||
if (fill_set_elements)
|
||||
filter = ColumnUInt8::create(block.rows());
|
||||
|
||||
switch (data.type)
|
||||
{
|
||||
case SetVariants::Type::EMPTY:
|
||||
break;
|
||||
#define M(NAME) \
|
||||
case SetVariants::Type::NAME: \
|
||||
insertFromBlockImpl(*data.NAME, key_columns, rows, data, null_map); \
|
||||
insertFromBlockImpl(*data.NAME, key_columns, rows, data, null_map, filter ? &filter->getData() : nullptr); \
|
||||
break;
|
||||
APPLY_FOR_SET_VARIANTS(M)
|
||||
#undef M
|
||||
@ -166,13 +191,13 @@ bool Set::insertFromBlock(const Block & block, bool fill_set_elements)
|
||||
|
||||
if (fill_set_elements)
|
||||
{
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
std::vector<Field> new_set_elements;
|
||||
for (size_t j = 0; j < keys_size; ++j)
|
||||
new_set_elements.push_back((*key_columns[j])[i]);
|
||||
if (set_elements.empty())
|
||||
set_elements.resize(keys_size);
|
||||
|
||||
set_elements->emplace_back(std::move(new_set_elements));
|
||||
for (size_t i = 0; i < keys_size; ++i)
|
||||
{
|
||||
auto filtered_column = block.getByPosition(i).column->filter(filter->getData(), rows);
|
||||
set_elements[i]->assumeMutableRef().insertRangeFrom(*filtered_column, 0, filtered_column->size());
|
||||
}
|
||||
}
|
||||
|
||||
@ -403,9 +428,8 @@ void Set::executeOrdinary(
|
||||
}
|
||||
|
||||
|
||||
MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
|
||||
: ordered_set(),
|
||||
indexes_mapping(std::move(index_mapping_))
|
||||
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
|
||||
: indexes_mapping(std::move(index_mapping_))
|
||||
{
|
||||
std::sort(indexes_mapping.begin(), indexes_mapping.end(),
|
||||
[](const KeyTuplePositionMapping & l, const KeyTuplePositionMapping & r)
|
||||
@ -420,16 +444,23 @@ MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vect
|
||||
return l.key_index == r.key_index;
|
||||
}), indexes_mapping.end());
|
||||
|
||||
for (size_t i = 0; i < set_elements.size(); ++i)
|
||||
{
|
||||
std::vector<FieldWithInfinity> new_set_values;
|
||||
for (size_t j = 0; j < indexes_mapping.size(); ++j)
|
||||
new_set_values.emplace_back(set_elements[i][indexes_mapping[j].tuple_index]);
|
||||
size_t tuple_size = indexes_mapping.size();
|
||||
ordered_set.resize(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
ordered_set[i] = set_elements[indexes_mapping[i].tuple_index];
|
||||
|
||||
ordered_set.emplace_back(std::move(new_set_values));
|
||||
Block block_to_sort;
|
||||
SortDescription sort_description;
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
{
|
||||
block_to_sort.insert({ ordered_set[i], nullptr, "" });
|
||||
sort_description.emplace_back(i, 1, 1);
|
||||
}
|
||||
|
||||
std::sort(ordered_set.begin(), ordered_set.end());
|
||||
sortBlock(block_to_sort, sort_description);
|
||||
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
ordered_set[i] = block_to_sort.getByPosition(i).column;
|
||||
}
|
||||
|
||||
|
||||
@ -439,15 +470,19 @@ MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vect
|
||||
*/
|
||||
BoolMask MergeTreeSetIndex::mayBeTrueInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types)
|
||||
{
|
||||
std::vector<FieldWithInfinity> left_point;
|
||||
std::vector<FieldWithInfinity> right_point;
|
||||
left_point.reserve(indexes_mapping.size());
|
||||
right_point.reserve(indexes_mapping.size());
|
||||
size_t tuple_size = indexes_mapping.size();
|
||||
|
||||
using FieldWithInfinityTuple = std::vector<FieldWithInfinity>;
|
||||
|
||||
FieldWithInfinityTuple left_point;
|
||||
FieldWithInfinityTuple right_point;
|
||||
left_point.reserve(tuple_size);
|
||||
right_point.reserve(tuple_size);
|
||||
|
||||
bool invert_left_infinities = false;
|
||||
bool invert_right_infinities = false;
|
||||
|
||||
for (size_t i = 0; i < indexes_mapping.size(); ++i)
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
{
|
||||
std::optional<Range> new_range = KeyCondition::applyMonotonicFunctionsChainToRange(
|
||||
key_ranges[indexes_mapping[i].key_index],
|
||||
@ -491,16 +526,40 @@ BoolMask MergeTreeSetIndex::mayBeTrueInRange(const std::vector<Range> & key_rang
|
||||
}
|
||||
}
|
||||
|
||||
/// This allows to construct tuple in 'ordered_set' at specified index for comparison with range.
|
||||
|
||||
auto indices = ext::range(0, ordered_set.at(0)->size());
|
||||
|
||||
auto extract_tuple = [tuple_size, this](size_t i)
|
||||
{
|
||||
/// Inefficient.
|
||||
FieldWithInfinityTuple res;
|
||||
res.reserve(tuple_size);
|
||||
for (size_t j = 0; j < tuple_size; ++j)
|
||||
res.emplace_back((*ordered_set[j])[i]);
|
||||
return res;
|
||||
};
|
||||
|
||||
auto compare = [&extract_tuple](size_t i, const FieldWithInfinityTuple & rhs)
|
||||
{
|
||||
return extract_tuple(i) < rhs;
|
||||
};
|
||||
|
||||
/** Because each parallelogram maps to a contiguous sequence of elements
|
||||
* layed out in the lexicographically increasing order, the set intersects the range
|
||||
* if and only if either bound coincides with an element or at least one element
|
||||
* is between the lower bounds
|
||||
*/
|
||||
auto left_lower = std::lower_bound(ordered_set.begin(), ordered_set.end(), left_point);
|
||||
auto right_lower = std::lower_bound(ordered_set.begin(), ordered_set.end(), right_point);
|
||||
return {left_lower != right_lower
|
||||
|| (left_lower != ordered_set.end() && *left_lower == left_point)
|
||||
|| (right_lower != ordered_set.end() && *right_lower == right_point), true};
|
||||
auto left_lower = std::lower_bound(indices.begin(), indices.end(), left_point, compare);
|
||||
auto right_lower = std::lower_bound(indices.begin(), indices.end(), right_point, compare);
|
||||
|
||||
return
|
||||
{
|
||||
left_lower != right_lower
|
||||
|| (left_lower != indices.end() && extract_tuple(*left_lower) == left_point)
|
||||
|| (right_lower != indices.end() && extract_tuple(*right_lower) == right_point),
|
||||
true
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,12 +18,10 @@ namespace DB
|
||||
struct Range;
|
||||
class FieldWithInfinity;
|
||||
|
||||
using SetElements = std::vector<std::vector<Field>>;
|
||||
using SetElementsPtr = std::unique_ptr<SetElements>;
|
||||
|
||||
class IFunctionBase;
|
||||
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
|
||||
|
||||
|
||||
/** Data structure for implementation of IN expression.
|
||||
*/
|
||||
class Set
|
||||
@ -31,8 +29,7 @@ class Set
|
||||
public:
|
||||
Set(const SizeLimits & limits) :
|
||||
log(&Logger::get("Set")),
|
||||
limits(limits),
|
||||
set_elements(std::make_unique<SetElements>())
|
||||
limits(limits)
|
||||
{
|
||||
}
|
||||
|
||||
@ -66,7 +63,7 @@ public:
|
||||
|
||||
const DataTypes & getDataTypes() const { return data_types; }
|
||||
|
||||
SetElements & getSetElements() { return *set_elements.get(); }
|
||||
const Columns & getSetElements() const { return set_elements; }
|
||||
|
||||
private:
|
||||
size_t keys_size = 0;
|
||||
@ -106,9 +103,9 @@ private:
|
||||
bool negative,
|
||||
const PaddedPODArray<UInt8> * null_map) const;
|
||||
|
||||
/// Vector of elements of `Set`.
|
||||
/// Collected elements of `Set`.
|
||||
/// It is necessary for the index to work on the primary key in the IN statement.
|
||||
SetElementsPtr set_elements;
|
||||
Columns set_elements;
|
||||
|
||||
/** Protects work with the set in the functions `insertFromBlock` and `execute`.
|
||||
* These functions can be called simultaneously from different threads only when using StorageSet,
|
||||
@ -123,15 +120,17 @@ private:
|
||||
const ColumnRawPtrs & key_columns,
|
||||
size_t rows,
|
||||
SetVariants & variants,
|
||||
ConstNullMapPtr null_map);
|
||||
ConstNullMapPtr null_map,
|
||||
ColumnUInt8::Container * out_filter);
|
||||
|
||||
template <typename Method, bool has_null_map>
|
||||
template <typename Method, bool has_null_map, bool build_filter>
|
||||
void insertFromBlockImplCase(
|
||||
Method & method,
|
||||
const ColumnRawPtrs & key_columns,
|
||||
size_t rows,
|
||||
SetVariants & variants,
|
||||
ConstNullMapPtr null_map);
|
||||
ConstNullMapPtr null_map,
|
||||
ColumnUInt8::Container * out_filter);
|
||||
|
||||
template <typename Method>
|
||||
void executeImpl(
|
||||
@ -156,6 +155,7 @@ using SetPtr = std::shared_ptr<Set>;
|
||||
using ConstSetPtr = std::shared_ptr<const Set>;
|
||||
using Sets = std::vector<SetPtr>;
|
||||
|
||||
|
||||
class IFunction;
|
||||
using FunctionPtr = std::shared_ptr<IFunction>;
|
||||
|
||||
@ -174,16 +174,14 @@ public:
|
||||
std::vector<FunctionBasePtr> functions;
|
||||
};
|
||||
|
||||
MergeTreeSetIndex(const SetElements & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_);
|
||||
MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_);
|
||||
|
||||
size_t size() const { return ordered_set.size(); }
|
||||
size_t size() const { return ordered_set.at(0)->size(); }
|
||||
|
||||
BoolMask mayBeTrueInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types);
|
||||
|
||||
private:
|
||||
using OrderedTuples = std::vector<std::vector<FieldWithInfinity>>;
|
||||
OrderedTuples ordered_set;
|
||||
|
||||
Columns ordered_set;
|
||||
std::vector<KeyTuplePositionMapping> indexes_mapping;
|
||||
};
|
||||
|
||||
|
@ -121,7 +121,7 @@ void sortBlock(Block & block, const SortDescription & description, size_t limit)
|
||||
|
||||
size_t columns = block.columns();
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->permute(perm, limit);
|
||||
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -166,7 +166,7 @@ void sortBlock(Block & block, const SortDescription & description, size_t limit)
|
||||
|
||||
size_t columns = block.columns();
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->permute(perm, limit);
|
||||
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -269,13 +269,13 @@ KeyCondition::KeyCondition(
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
const NamesAndTypesList & all_columns,
|
||||
const SortDescription & sort_descr_,
|
||||
const Names & key_column_names,
|
||||
const ExpressionActionsPtr & key_expr_)
|
||||
: sort_descr(sort_descr_), key_expr(key_expr_), prepared_sets(query_info.sets)
|
||||
: key_expr(key_expr_), prepared_sets(query_info.sets)
|
||||
{
|
||||
for (size_t i = 0; i < sort_descr.size(); ++i)
|
||||
for (size_t i = 0, size = key_column_names.size(); i < size; ++i)
|
||||
{
|
||||
std::string name = sort_descr[i].column_name;
|
||||
std::string name = key_column_names[i];
|
||||
if (!key_columns.count(name))
|
||||
key_columns[name] = i;
|
||||
}
|
||||
@ -523,8 +523,7 @@ bool KeyCondition::isTupleIndexable(
|
||||
if (indexes_mapping.empty())
|
||||
return false;
|
||||
|
||||
out.set_index = std::make_shared<MergeTreeSetIndex>(
|
||||
prepared_set->getSetElements(), std::move(indexes_mapping));
|
||||
out.set_index = std::make_shared<MergeTreeSetIndex>(prepared_set->getSetElements(), std::move(indexes_mapping));
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -636,7 +635,7 @@ bool KeyCondition::atomFromAST(const ASTPtr & node, const Context & context, Blo
|
||||
|
||||
DataTypePtr key_expr_type; /// Type of expression containing key column
|
||||
size_t key_arg_pos; /// Position of argument with key column (non-const argument)
|
||||
size_t key_column_num; /// Number of a key column (inside sort_descr array)
|
||||
size_t key_column_num; /// Number of a key column (inside key_column_names array)
|
||||
MonotonicFunctionsChain chain;
|
||||
bool is_set_const = false;
|
||||
bool is_constant_transformed = false;
|
||||
|
@ -189,6 +189,7 @@ public:
|
||||
String toString() const;
|
||||
};
|
||||
|
||||
|
||||
/// Class that extends arbitrary objects with infinities, like +-inf for floats
|
||||
class FieldWithInfinity
|
||||
{
|
||||
@ -216,6 +217,7 @@ private:
|
||||
FieldWithInfinity(const Type type_);
|
||||
};
|
||||
|
||||
|
||||
/** Condition on the index.
|
||||
*
|
||||
* Consists of the conditions for the key belonging to all possible ranges or sets,
|
||||
@ -232,7 +234,7 @@ public:
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
const NamesAndTypesList & all_columns,
|
||||
const SortDescription & sort_descr,
|
||||
const Names & key_column_names,
|
||||
const ExpressionActionsPtr & key_expr);
|
||||
|
||||
/// Whether the condition is feasible in the key range.
|
||||
@ -324,8 +326,8 @@ private:
|
||||
|
||||
public:
|
||||
static const AtomMap atom_map;
|
||||
private:
|
||||
|
||||
private:
|
||||
bool mayBeTrueInRange(
|
||||
size_t used_key_size,
|
||||
const Field * left_key,
|
||||
@ -379,7 +381,6 @@ private:
|
||||
|
||||
RPN rpn;
|
||||
|
||||
SortDescription sort_descr;
|
||||
ColumnIndices key_columns;
|
||||
ExpressionActionsPtr key_expr;
|
||||
PreparedSets prepared_sets;
|
||||
|
@ -214,19 +214,16 @@ static void checkKeyExpression(const ExpressionActions & expr, const Block & sam
|
||||
|
||||
void MergeTreeData::initPrimaryKey()
|
||||
{
|
||||
auto addSortDescription = [](SortDescription & descr, const ASTPtr & expr_ast)
|
||||
auto addSortColumns = [](Names & out, const ASTPtr & expr_ast)
|
||||
{
|
||||
descr.reserve(descr.size() + expr_ast->children.size());
|
||||
out.reserve(out.size() + expr_ast->children.size());
|
||||
for (const ASTPtr & ast : expr_ast->children)
|
||||
{
|
||||
String name = ast->getColumnName();
|
||||
descr.emplace_back(name, 1, 1);
|
||||
}
|
||||
out.emplace_back(ast->getColumnName());
|
||||
};
|
||||
|
||||
/// Initialize description of sorting for primary key.
|
||||
primary_sort_descr.clear();
|
||||
addSortDescription(primary_sort_descr, primary_expr_ast);
|
||||
primary_sort_columns.clear();
|
||||
addSortColumns(primary_sort_columns, primary_expr_ast);
|
||||
|
||||
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumns().getAllPhysical()).getActions(false);
|
||||
|
||||
@ -243,10 +240,10 @@ void MergeTreeData::initPrimaryKey()
|
||||
for (size_t i = 0; i < primary_key_size; ++i)
|
||||
primary_key_data_types[i] = primary_key_sample.getByPosition(i).type;
|
||||
|
||||
sort_descr = primary_sort_descr;
|
||||
sort_columns = primary_sort_columns;
|
||||
if (secondary_sort_expr_ast)
|
||||
{
|
||||
addSortDescription(sort_descr, secondary_sort_expr_ast);
|
||||
addSortColumns(sort_columns, secondary_sort_expr_ast);
|
||||
secondary_sort_expr = ExpressionAnalyzer(secondary_sort_expr_ast, context, nullptr, getColumns().getAllPhysical()).getActions(false);
|
||||
|
||||
ExpressionActionsPtr projected_expr =
|
||||
@ -279,7 +276,6 @@ void MergeTreeData::initPartitionKey()
|
||||
{
|
||||
minmax_idx_columns.emplace_back(column.name);
|
||||
minmax_idx_column_types.emplace_back(column.type);
|
||||
minmax_idx_sort_descr.emplace_back(column.name, 1, 1);
|
||||
}
|
||||
|
||||
/// Try to find the date column in columns used by the partition key (a common case).
|
||||
@ -2282,14 +2278,14 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
|
||||
|
||||
bool MergeTreeData::isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const
|
||||
{
|
||||
String column_name = node->getColumnName();
|
||||
const String column_name = node->getColumnName();
|
||||
|
||||
for (const auto & column : primary_sort_descr)
|
||||
if (column_name == column.column_name)
|
||||
for (const auto & name : primary_sort_columns)
|
||||
if (column_name == name)
|
||||
return true;
|
||||
|
||||
for (const auto & column : minmax_idx_sort_descr)
|
||||
if (column_name == column.column_name)
|
||||
for (const auto & name : minmax_idx_columns)
|
||||
if (column_name == name)
|
||||
return true;
|
||||
|
||||
if (const ASTFunction * func = typeid_cast<const ASTFunction *>(node.get()))
|
||||
|
@ -1,6 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/SortDescription.h>
|
||||
#include <Common/SimpleIncrement.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
@ -480,11 +479,11 @@ public:
|
||||
broken_part_callback(name);
|
||||
}
|
||||
|
||||
bool hasPrimaryKey() const { return !primary_sort_descr.empty(); }
|
||||
bool hasPrimaryKey() const { return !primary_sort_columns.empty(); }
|
||||
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
|
||||
ExpressionActionsPtr getSecondarySortExpression() const { return secondary_sort_expr; } /// may return nullptr
|
||||
SortDescription getPrimarySortDescription() const { return primary_sort_descr; }
|
||||
SortDescription getSortDescription() const { return sort_descr; }
|
||||
Names getPrimarySortColumns() const { return primary_sort_columns; }
|
||||
Names getSortColumns() const { return sort_columns; }
|
||||
|
||||
/// Check that the part is not broken and calculate the checksums for it if they are not present.
|
||||
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
|
||||
@ -555,7 +554,6 @@ public:
|
||||
Names minmax_idx_columns;
|
||||
DataTypes minmax_idx_column_types;
|
||||
Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
|
||||
SortDescription minmax_idx_sort_descr; /// For use with KeyCondition.
|
||||
|
||||
/// Limiting parallel sends per one table, used in DataPartsExchange
|
||||
std::atomic_uint current_table_sends {0};
|
||||
@ -576,10 +574,10 @@ private:
|
||||
ExpressionActionsPtr primary_expr;
|
||||
/// Additional expression for sorting (of rows with the same primary keys).
|
||||
ExpressionActionsPtr secondary_sort_expr;
|
||||
/// Sort description for primary key. Is the prefix of sort_descr.
|
||||
SortDescription primary_sort_descr;
|
||||
/// Sort description for primary key + secondary sorting columns.
|
||||
SortDescription sort_descr;
|
||||
/// Names of columns for primary key. Is the prefix of sort_columns.
|
||||
Names primary_sort_columns;
|
||||
/// Names of columns for primary key + secondary sorting columns.
|
||||
Names sort_columns;
|
||||
|
||||
String database_name;
|
||||
String table_name;
|
||||
|
@ -549,7 +549,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
|
||||
Names all_column_names = data.getColumns().getNamesOfPhysical();
|
||||
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
|
||||
const SortDescription sort_desc = data.getSortDescription();
|
||||
|
||||
NamesAndTypesList gathering_columns, merging_columns;
|
||||
Names gathering_column_names, merging_column_names;
|
||||
@ -611,6 +610,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
src_streams.emplace_back(std::move(input));
|
||||
}
|
||||
|
||||
Names sort_columns = data.getSortColumns();
|
||||
SortDescription sort_description;
|
||||
size_t sort_columns_size = sort_columns.size();
|
||||
sort_description.reserve(sort_columns_size);
|
||||
|
||||
Block header = src_streams.at(0)->getHeader();
|
||||
for (size_t i = 0; i < sort_columns_size; ++i)
|
||||
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
|
||||
|
||||
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
|
||||
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
|
||||
/// that is going in insertion order.
|
||||
@ -620,38 +628,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
{
|
||||
case MergeTreeData::MergingParams::Ordinary:
|
||||
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
|
||||
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, rows_sources_write_buf.get(), true);
|
||||
src_streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE, 0, rows_sources_write_buf.get(), true);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Collapsing:
|
||||
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
|
||||
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
|
||||
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Summing:
|
||||
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
|
||||
src_streams, sort_desc, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
|
||||
src_streams, sort_description, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Aggregating:
|
||||
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
|
||||
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE);
|
||||
src_streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Replacing:
|
||||
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
|
||||
src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
|
||||
src_streams, sort_description, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Graphite:
|
||||
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
|
||||
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE,
|
||||
src_streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE,
|
||||
data.merging_params.graphite_params, time_of_merge);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::VersionedCollapsing:
|
||||
merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>(
|
||||
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
|
||||
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -448,7 +448,7 @@ void MergeTreeDataPart::loadIndex()
|
||||
.getSize() / MERGE_TREE_MARK_SIZE;
|
||||
}
|
||||
|
||||
size_t key_size = storage.primary_sort_descr.size();
|
||||
size_t key_size = storage.primary_sort_columns.size();
|
||||
|
||||
if (key_size)
|
||||
{
|
||||
@ -630,7 +630,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
|
||||
|
||||
if (!checksums.empty())
|
||||
{
|
||||
if (!storage.primary_sort_descr.empty() && !checksums.files.count("primary.idx"))
|
||||
if (!storage.primary_sort_columns.empty() && !checksums.files.count("primary.idx"))
|
||||
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
|
||||
|
||||
if (require_part_metadata)
|
||||
@ -683,7 +683,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
|
||||
};
|
||||
|
||||
/// Check that the primary key index is not empty.
|
||||
if (!storage.primary_sort_descr.empty())
|
||||
if (!storage.primary_sort_columns.empty())
|
||||
check_file_not_empty(path + "primary.idx");
|
||||
|
||||
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
|
@ -196,17 +196,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
processed_stage = QueryProcessingStage::FetchColumns;
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
SortDescription sort_descr = data.getPrimarySortDescription();
|
||||
Names primary_sort_columns = data.getPrimarySortColumns();
|
||||
|
||||
KeyCondition key_condition(query_info, context, available_real_and_virtual_columns, sort_descr,
|
||||
data.getPrimaryExpression());
|
||||
KeyCondition key_condition(
|
||||
query_info, context, available_real_and_virtual_columns,
|
||||
primary_sort_columns, data.getPrimaryExpression());
|
||||
|
||||
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
|
||||
{
|
||||
std::stringstream exception_message;
|
||||
exception_message << "Primary key (";
|
||||
for (size_t i = 0, size = sort_descr.size(); i < size; ++i)
|
||||
exception_message << (i == 0 ? "" : ", ") << sort_descr[i].column_name;
|
||||
for (size_t i = 0, size = primary_sort_columns.size(); i < size; ++i)
|
||||
exception_message << (i == 0 ? "" : ", ") << primary_sort_columns[i];
|
||||
exception_message << ") is not used and setting 'force_primary_key' is set.";
|
||||
|
||||
throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED);
|
||||
@ -217,7 +218,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
{
|
||||
minmax_idx_condition.emplace(
|
||||
query_info, context, available_real_and_virtual_columns,
|
||||
data.minmax_idx_sort_descr, data.minmax_idx_expr);
|
||||
data.minmax_idx_columns, data.minmax_idx_expr);
|
||||
|
||||
if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue())
|
||||
{
|
||||
@ -781,36 +782,44 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
|
||||
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));
|
||||
}
|
||||
|
||||
BlockInputStreamPtr merged;
|
||||
Names sort_columns = data.getSortColumns();
|
||||
SortDescription sort_description;
|
||||
size_t sort_columns_size = sort_columns.size();
|
||||
sort_description.reserve(sort_columns_size);
|
||||
|
||||
Block header = to_merge.at(0)->getHeader();
|
||||
for (size_t i = 0; i < sort_columns_size; ++i)
|
||||
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
|
||||
|
||||
BlockInputStreamPtr merged;
|
||||
switch (data.merging_params.mode)
|
||||
{
|
||||
case MergeTreeData::MergingParams::Ordinary:
|
||||
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
|
||||
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, sort_description, max_block_size);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Collapsing:
|
||||
merged = std::make_shared<CollapsingFinalBlockInputStream>(
|
||||
to_merge, data.getSortDescription(), data.merging_params.sign_column);
|
||||
to_merge, sort_description, data.merging_params.sign_column);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Summing:
|
||||
merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
|
||||
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
|
||||
sort_description, data.merging_params.columns_to_sum, max_block_size);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Aggregating:
|
||||
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
|
||||
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, sort_description, max_block_size);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
|
||||
merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
|
||||
data.getSortDescription(), data.merging_params.version_column, max_block_size);
|
||||
sort_description, data.merging_params.version_column, max_block_size);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
|
||||
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
|
||||
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
|
||||
to_merge, sort_description, data.merging_params.sign_column, max_block_size, true);
|
||||
break;
|
||||
|
||||
case MergeTreeData::MergingParams::Graphite:
|
||||
|
@ -183,7 +183,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
secondary_sort_expr->execute(block);
|
||||
}
|
||||
|
||||
SortDescription sort_descr = data.getSortDescription();
|
||||
Names sort_columns = data.getSortColumns();
|
||||
SortDescription sort_description;
|
||||
size_t sort_columns_size = sort_columns.size();
|
||||
sort_description.reserve(sort_columns_size);
|
||||
|
||||
for (size_t i = 0; i < sort_columns_size; ++i)
|
||||
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
|
||||
|
||||
@ -192,9 +198,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
IColumn::Permutation perm;
|
||||
if (data.hasPrimaryKey())
|
||||
{
|
||||
if (!isAlreadySorted(block, sort_descr))
|
||||
if (!isAlreadySorted(block, sort_description))
|
||||
{
|
||||
stableGetPermutation(block, sort_descr, perm);
|
||||
stableGetPermutation(block, sort_description, perm);
|
||||
perm_ptr = &perm;
|
||||
}
|
||||
else
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <ext/scope_guard.h>
|
||||
#include <ext/collection_cast.h>
|
||||
#include <ext/map.h>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
@ -39,8 +40,7 @@ MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
|
||||
const MergeTreeData & data,
|
||||
const Names & column_names,
|
||||
Logger * log)
|
||||
: primary_key_columns{ext::map<std::unordered_set>(data.getPrimarySortDescription(),
|
||||
[] (const SortColumnDescription & col) { return col.column_name; })},
|
||||
: primary_key_columns{ext::collection_cast<std::unordered_set>(data.getPrimarySortColumns())},
|
||||
table_columns{ext::map<std::unordered_set>(data.getColumns().getAllPhysical(),
|
||||
[] (const NameAndTypePair & col) { return col.name; })},
|
||||
block_with_constants{KeyCondition::getBlockWithConstants(query_info.query, context, data.getColumns().getAllPhysical())},
|
||||
|
@ -365,26 +365,20 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
|
||||
/// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
|
||||
OffsetColumns offset_columns;
|
||||
|
||||
auto sort_description = storage.getPrimarySortDescription();
|
||||
auto sort_columns = storage.getPrimarySortColumns();
|
||||
|
||||
/// Here we will add the columns related to the Primary Key, then write the index.
|
||||
std::vector<ColumnWithTypeAndName> primary_columns(sort_description.size());
|
||||
std::vector<ColumnWithTypeAndName> primary_columns(sort_columns.size());
|
||||
std::map<String, size_t> primary_columns_name_to_position;
|
||||
|
||||
for (size_t i = 0, size = sort_description.size(); i < size; ++i)
|
||||
for (size_t i = 0, size = sort_columns.size(); i < size; ++i)
|
||||
{
|
||||
const auto & descr = sort_description[i];
|
||||
|
||||
String name = !descr.column_name.empty()
|
||||
? descr.column_name
|
||||
: block.safeGetByPosition(descr.column_number).name;
|
||||
const auto & name = sort_columns[i];
|
||||
|
||||
if (!primary_columns_name_to_position.emplace(name, i).second)
|
||||
throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
primary_columns[i] = !descr.column_name.empty()
|
||||
? block.getByName(descr.column_name)
|
||||
: block.safeGetByPosition(descr.column_number);
|
||||
primary_columns[i] = block.getByName(name);
|
||||
|
||||
/// Reorder primary key columns in advance and add them to `primary_columns`.
|
||||
if (permutation)
|
||||
@ -393,8 +387,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
|
||||
|
||||
if (index_columns.empty())
|
||||
{
|
||||
index_columns.resize(sort_description.size());
|
||||
for (size_t i = 0, size = sort_description.size(); i < size; ++i)
|
||||
index_columns.resize(sort_columns.size());
|
||||
for (size_t i = 0, size = sort_columns.size(); i < size; ++i)
|
||||
index_columns[i] = primary_columns[i].column->cloneEmpty();
|
||||
}
|
||||
|
||||
|
@ -6,16 +6,17 @@
|
||||
#include <boost/iterator/counting_iterator.hpp>
|
||||
|
||||
|
||||
/** \brief Numeric range iterator, used to represent a half-closed interval [begin, end).
|
||||
* In conjunction with std::reverse_iterator allows for forward and backward iteration
|
||||
* over corresponding interval. */
|
||||
/** Numeric range iterator, used to represent a half-closed interval [begin, end).
|
||||
* In conjunction with std::reverse_iterator allows for forward and backward iteration
|
||||
* over corresponding interval.
|
||||
*/
|
||||
namespace ext
|
||||
{
|
||||
template <typename T>
|
||||
using range_iterator = boost::counting_iterator<T>;
|
||||
|
||||
/** \brief Range-based for loop adapter for (reverse_)range_iterator.
|
||||
* By and large should be in conjunction with ext::range and ext::reverse_range.
|
||||
/** Range-based for loop adapter for (reverse_)range_iterator.
|
||||
* By and large should be in conjunction with ext::range and ext::reverse_range.
|
||||
*/
|
||||
template <typename T>
|
||||
struct range_wrapper
|
||||
@ -30,12 +31,12 @@ namespace ext
|
||||
iterator end() const { return iterator(end_); }
|
||||
};
|
||||
|
||||
/** \brief Constructs range_wrapper for forward-iteration over [begin, end) in range-based for loop.
|
||||
* Usage example:
|
||||
* for (const auto i : ext::range(0, 4)) print(i);
|
||||
* Output:
|
||||
* 0 1 2 3
|
||||
*/
|
||||
/** Constructs range_wrapper for forward-iteration over [begin, end) in range-based for loop.
|
||||
* Usage example:
|
||||
* for (const auto i : ext::range(0, 4)) print(i);
|
||||
* Output:
|
||||
* 0 1 2 3
|
||||
*/
|
||||
template <typename T1, typename T2>
|
||||
inline range_wrapper<typename std::common_type<T1, T2>::type> range(T1 begin, T2 end)
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user