Merge branch 'master' into return-not-nullable-from-count-distinct-2

Alexey Milovidov 2020-06-17 01:15:57 +03:00
commit a655765040
34 changed files with 675 additions and 200 deletions

View File

@ -370,6 +370,46 @@ GROUP BY timeslot
└─────────────────────┴──────────────────────────────────────────────┴────────────────────────────────┘
```
## minMap(key, value), minMap(Tuple(key, value)) {#agg_functions-minmap}
Calculates the minimum from the `value` array according to the keys specified in the `key` array.
Passing a tuple of key and value arrays is a synonym for passing two arrays of keys and values.
The number of elements in `key` and `value` must be the same for each row that is aggregated.
Returns a tuple of two arrays: keys in sorted order, and values calculated for the corresponding keys.
Example:
```sql
SELECT minMap(a, b)
FROM values('a Array(Int32), b Array(Int64)', ([1, 2], [2, 2]), ([2, 3], [1, 1]))
```
```text
┌─minMap(a, b)──────┐
│ ([1,2,3],[2,1,1]) │
└───────────────────┘
```
## maxMap(key, value), maxMap(Tuple(key, value)) {#agg_functions-maxmap}
Calculates the maximum from the `value` array according to the keys specified in the `key` array.
Passing a tuple of key and value arrays is a synonym for passing two arrays of keys and values.
The number of elements in `key` and `value` must be the same for each row that is aggregated.
Returns a tuple of two arrays: keys in sorted order, and values calculated for the corresponding keys.
Example:
```sql
SELECT maxMap(a, b)
FROM values('a Array(Int32), b Array(Int64)', ([1, 2], [2, 2]), ([2, 3], [1, 1]))
```
```text
┌─maxMap(a, b)──────┐
│ ([1,2,3],[2,2,1]) │
└───────────────────┘
```
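The `Tuple(key, value)` form mentioned above can be illustrated with the same data. This is a minimal sketch (not part of the committed docs), assuming the `(a, b)` tuple operator; it should return the same result as the two-array call:

```sql
-- Hypothetical equivalent of the maxMap(a, b) example above, using the tuple-argument form
SELECT maxMap((a, b))
FROM values('a Array(Int32), b Array(Int64)', ([1, 2], [2, 2]), ([2, 3], [1, 1]))
```

The same tuple form applies to minMap.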
## skewPop {#skewpop}
Computes the [skewness](https://en.wikipedia.org/wiki/Skewness) of a sequence.

View File

@ -18,21 +18,6 @@ namespace ErrorCodes
namespace
{
template <bool overflow, bool tuple_argument>
struct SumMap
{
template <typename T>
using F = AggregateFunctionSumMap<T, overflow, tuple_argument>;
};
template <bool overflow, bool tuple_argument>
struct SumMapFiltered
{
template <typename T>
using F = AggregateFunctionSumMapFiltered<T, overflow, tuple_argument>;
};
auto parseArguments(const std::string & name, const DataTypes & arguments)
{
DataTypes args;
@ -85,30 +70,32 @@ auto parseArguments(const std::string & name, const DataTypes & arguments)
tuple_argument};
}
template <bool overflow>
AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, const DataTypes & arguments, const Array & params)
// This function instantiates a particular overload of the sumMap family of
// functions.
// The template parameter MappedFunction<bool tuple_argument> is an aggregate
// function template that allows choosing the aggregate function variant that
// accepts either normal arguments or a tuple argument.
template<template <bool tuple_argument> typename MappedFunction>
AggregateFunctionPtr createAggregateFunctionMap(const std::string & name, const DataTypes & arguments, const Array & params)
{
assertNoParameters(name, params);
auto [keys_type, values_types, tuple_argument] = parseArguments(name,
arguments);
auto [keys_type, values_types, tuple_argument] = parseArguments(name, arguments);
AggregateFunctionPtr res;
if (tuple_argument)
{
res.reset(createWithNumericBasedType<SumMap<overflow, true>::template F>(*keys_type, keys_type, values_types, arguments));
res.reset(createWithNumericBasedType<MappedFunction<true>::template F>(*keys_type, keys_type, values_types, arguments, params));
if (!res)
res.reset(createWithDecimalType<SumMap<overflow, true>::template F>(*keys_type, keys_type, values_types, arguments));
res.reset(createWithDecimalType<MappedFunction<true>::template F>(*keys_type, keys_type, values_types, arguments, params));
if (!res)
res.reset(createWithStringType<SumMap<overflow, true>::template F>(*keys_type, keys_type, values_types, arguments));
res.reset(createWithStringType<MappedFunction<true>::template F>(*keys_type, keys_type, values_types, arguments, params));
}
else
{
res.reset(createWithNumericBasedType<SumMap<overflow, false>::template F>(*keys_type, keys_type, values_types, arguments));
res.reset(createWithNumericBasedType<MappedFunction<false>::template F>(*keys_type, keys_type, values_types, arguments, params));
if (!res)
res.reset(createWithDecimalType<SumMap<overflow, false>::template F>(*keys_type, keys_type, values_types, arguments));
res.reset(createWithDecimalType<MappedFunction<false>::template F>(*keys_type, keys_type, values_types, arguments, params));
if (!res)
res.reset(createWithStringType<SumMap<overflow, false>::template F>(*keys_type, keys_type, values_types, arguments));
res.reset(createWithStringType<MappedFunction<false>::template F>(*keys_type, keys_type, values_types, arguments, params));
}
if (!res)
throw Exception("Illegal type of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
@ -116,52 +103,66 @@ AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, con
return res;
}
template <bool overflow>
AggregateFunctionPtr createAggregateFunctionSumMapFiltered(const std::string & name, const DataTypes & arguments, const Array & params)
// This template chooses the sumMap variant with given filtering and overflow
// handling.
template <bool filtered, bool overflow>
struct SumMapVariants
{
if (params.size() != 1)
throw Exception("Aggregate function " + name + " requires exactly one parameter of Array type.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
Array keys_to_keep;
if (!params.front().tryGet<Array>(keys_to_keep))
throw Exception("Aggregate function " + name + " requires an Array as parameter.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto [keys_type, values_types, tuple_argument] = parseArguments(name,
arguments);
AggregateFunctionPtr res;
if (tuple_argument)
// SumMapVariants chooses the `overflow` and `filtered` parameters of the
// aggregate functions. The `tuple_argument` and the value type `T` are left
// as free parameters.
// DispatchOnTupleArgument chooses `tuple_argument`, and the value type `T`
// is left free.
template <bool tuple_argument>
struct DispatchOnTupleArgument
{
res.reset(createWithNumericBasedType<SumMapFiltered<overflow, true>::template F>(*keys_type, keys_type, values_types, keys_to_keep, arguments, params));
if (!res)
res.reset(createWithDecimalType<SumMapFiltered<overflow, true>::template F>(*keys_type, keys_type, values_types, keys_to_keep, arguments, params));
if (!res)
res.reset(createWithStringType<SumMapFiltered<overflow, true>::template F>(*keys_type, keys_type, values_types, keys_to_keep, arguments, params));
}
else
{
res.reset(createWithNumericBasedType<SumMapFiltered<overflow, false>::template F>(*keys_type, keys_type, values_types, keys_to_keep, arguments, params));
if (!res)
res.reset(createWithDecimalType<SumMapFiltered<overflow, false>::template F>(*keys_type, keys_type, values_types, keys_to_keep, arguments, params));
if (!res)
res.reset(createWithStringType<SumMapFiltered<overflow, false>::template F>(*keys_type, keys_type, values_types, keys_to_keep, arguments, params));
}
if (!res)
throw Exception("Illegal type of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
template <typename T>
using F = std::conditional_t<filtered,
AggregateFunctionSumMapFiltered<T, overflow, tuple_argument>,
AggregateFunctionSumMap<T, overflow, tuple_argument>>;
};
};
return res;
}
// This template gives an aggregate function template that is narrowed
// to accept either a tuple argument or normal arguments.
template <bool tuple_argument>
struct MinMapDispatchOnTupleArgument
{
template <typename T>
using F = AggregateFunctionMinMap<T, tuple_argument>;
};
// This template gives an aggregate function template that is narrowed
// to accept either a tuple argument or normal arguments.
template <bool tuple_argument>
struct MaxMapDispatchOnTupleArgument
{
template <typename T>
using F = AggregateFunctionMaxMap<T, tuple_argument>;
};
}
void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory)
{
factory.registerFunction("sumMap", createAggregateFunctionSumMap<false /*overflow*/>);
factory.registerFunction("sumMapWithOverflow", createAggregateFunctionSumMap<true /*overflow*/>);
factory.registerFunction("sumMapFiltered", createAggregateFunctionSumMapFiltered<false /*overflow*/>);
factory.registerFunction("sumMapFilteredWithOverflow", createAggregateFunctionSumMapFiltered<true /*overflow*/>);
factory.registerFunction("sumMap", createAggregateFunctionMap<
SumMapVariants<false, false>::DispatchOnTupleArgument>);
factory.registerFunction("sumMapWithOverflow", createAggregateFunctionMap<
SumMapVariants<false, true>::DispatchOnTupleArgument>);
factory.registerFunction("sumMapFiltered", createAggregateFunctionMap<
SumMapVariants<true, false>::DispatchOnTupleArgument>);
factory.registerFunction("sumMapFilteredWithOverflow",
createAggregateFunctionMap<
SumMapVariants<true, true>::DispatchOnTupleArgument>);
factory.registerFunction("minMap",
createAggregateFunctionMap<MinMapDispatchOnTupleArgument>);
factory.registerFunction("maxMap",
createAggregateFunctionMap<MaxMapDispatchOnTupleArgument>);
}
}

View File

@ -25,19 +25,20 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename T>
struct AggregateFunctionSumMapData
struct AggregateFunctionMapData
{
// Map needs to be ordered to maintain function properties
std::map<T, Array> merged_maps;
};
/** Aggregate function that takes at least two arguments: keys and values, and as a result, builds a tuple of at least 2 arrays -
* ordered keys and variable number of argument values summed up by corresponding keys.
* ordered keys and variable number of argument values aggregated by corresponding keys.
*
* This function is the most useful when using SummingMergeTree to sum Nested columns, which name ends in "Map".
* The sumMap function is most useful when using SummingMergeTree to sum Nested columns whose names end in "Map".
*
* Example: sumMap(k, v...) of:
* k v
@ -49,24 +50,27 @@ struct AggregateFunctionSumMapData
* [8,9,10] [20,20,20]
* will return:
* ([1,2,3,4,5,6,7,8,9,10],[10,10,45,20,35,20,15,30,20,20])
*
* minMap and maxMap share the same idea, but calculate min and max respectively.
*/
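To make the SummingMergeTree use case from the comment above concrete, here is a minimal SQL sketch; the table and column names are hypothetical and not taken from this change:

```sql
-- Hypothetical table with a Nested "...Map" column, as described in the comment above
CREATE TABLE page_visits
(
    date Date,
    StatusMap Nested(Status UInt16, Requests UInt64)
) ENGINE = SummingMergeTree() ORDER BY date;

-- sumMap folds the per-row key/value arrays into one sorted key array with summed values
SELECT date, sumMap(StatusMap.Status, StatusMap.Requests)
FROM page_visits
GROUP BY date;
```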
template <typename T, typename Derived, bool overflow, bool tuple_argument>
class AggregateFunctionSumMapBase : public IAggregateFunctionDataHelper<
AggregateFunctionSumMapData<NearestFieldType<T>>, Derived>
template <typename T, typename Derived, typename Visitor, bool overflow, bool tuple_argument>
class AggregateFunctionMapBase : public IAggregateFunctionDataHelper<
AggregateFunctionMapData<NearestFieldType<T>>, Derived>
{
private:
DataTypePtr keys_type;
DataTypes values_types;
public:
AggregateFunctionSumMapBase(
const DataTypePtr & keys_type_, const DataTypes & values_types_,
const DataTypes & argument_types_, const Array & params_)
: IAggregateFunctionDataHelper<AggregateFunctionSumMapData<NearestFieldType<T>>, Derived>(argument_types_, params_)
, keys_type(keys_type_), values_types(values_types_) {}
using Base = IAggregateFunctionDataHelper<
AggregateFunctionMapData<NearestFieldType<T>>, Derived>;
String getName() const override { return "sumMap"; }
AggregateFunctionMapBase(const DataTypePtr & keys_type_,
const DataTypes & values_types_, const DataTypes & argument_types_)
: Base(argument_types_, {} /* parameters */), keys_type(keys_type_),
values_types(values_types_)
{}
DataTypePtr getReturnType() const override
{
@ -88,7 +92,7 @@ public:
// No overflow, meaning we promote the types if necessary.
if (!value_type->canBePromoted())
{
throw Exception{"Values to be summed are expected to be Numeric, Float or Decimal.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
throw Exception{"Values for " + getName() + " are expected to be Numeric, Float or Decimal.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
result_type = value_type->promoteNumericType();
@ -161,7 +165,7 @@ public:
if (it != merged_maps.end())
{
applyVisitor(FieldVisitorSum(value), it->second[col]);
applyVisitor(Visitor(value), it->second[col]);
}
else
{
@ -198,7 +202,7 @@ public:
if (it != merged_maps.end())
{
for (size_t col = 0; col < values_types.size(); ++col)
applyVisitor(FieldVisitorSum(elem.second[col]), it->second[col]);
applyVisitor(Visitor(elem.second[col]), it->second[col]);
}
else
merged_maps[elem.first] = elem.second;
@ -300,20 +304,27 @@ public:
}
bool keepKey(const T & key) const { return static_cast<const Derived &>(*this).keepKey(key); }
String getName() const override { return static_cast<const Derived &>(*this).getName(); }
};
template <typename T, bool overflow, bool tuple_argument>
class AggregateFunctionSumMap final :
public AggregateFunctionSumMapBase<T, AggregateFunctionSumMap<T, overflow, tuple_argument>, overflow, tuple_argument>
public AggregateFunctionMapBase<T, AggregateFunctionSumMap<T, overflow, tuple_argument>, FieldVisitorSum, overflow, tuple_argument>
{
private:
using Self = AggregateFunctionSumMap<T, overflow, tuple_argument>;
using Base = AggregateFunctionSumMapBase<T, Self, overflow, tuple_argument>;
using Base = AggregateFunctionMapBase<T, Self, FieldVisitorSum, overflow, tuple_argument>;
public:
AggregateFunctionSumMap(const DataTypePtr & keys_type_, DataTypes & values_types_, const DataTypes & argument_types_)
: Base{keys_type_, values_types_, argument_types_, {}}
{}
AggregateFunctionSumMap(const DataTypePtr & keys_type_,
DataTypes & values_types_, const DataTypes & argument_types_,
const Array & params_)
: Base{keys_type_, values_types_, argument_types_}
{
// The constructor accepts parameters to have a uniform interface with
// sumMapFiltered, but this function doesn't have any parameters.
assertNoParameters(getName(), params_);
}
String getName() const override { return "sumMap"; }
@ -322,23 +333,35 @@ public:
template <typename T, bool overflow, bool tuple_argument>
class AggregateFunctionSumMapFiltered final :
public AggregateFunctionSumMapBase<T,
public AggregateFunctionMapBase<T,
AggregateFunctionSumMapFiltered<T, overflow, tuple_argument>,
FieldVisitorSum,
overflow,
tuple_argument>
{
private:
using Self = AggregateFunctionSumMapFiltered<T, overflow, tuple_argument>;
using Base = AggregateFunctionSumMapBase<T, Self, overflow, tuple_argument>;
using Base = AggregateFunctionMapBase<T, Self, FieldVisitorSum, overflow, tuple_argument>;
std::unordered_set<T> keys_to_keep;
public:
AggregateFunctionSumMapFiltered(
const DataTypePtr & keys_type_, const DataTypes & values_types_, const Array & keys_to_keep_,
const DataTypes & argument_types_, const Array & params_)
: Base{keys_type_, values_types_, argument_types_, params_}
AggregateFunctionSumMapFiltered(const DataTypePtr & keys_type_,
const DataTypes & values_types_, const DataTypes & argument_types_,
const Array & params_)
: Base{keys_type_, values_types_, argument_types_}
{
if (params_.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Aggregate function '{}' requires exactly one parameter "
"of Array type", getName());
Array keys_to_keep_;
if (!params_.front().tryGet<Array>(keys_to_keep_))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Aggregate function {} requires an Array as a parameter",
getName());
keys_to_keep.reserve(keys_to_keep_.size());
for (const Field & f : keys_to_keep_)
{
@ -346,9 +369,58 @@ public:
}
}
String getName() const override { return "sumMapFiltered"; }
String getName() const override
{ return overflow ? "sumMapFilteredWithOverflow" : "sumMapFiltered"; }
bool keepKey(const T & key) const { return keys_to_keep.count(key); }
};
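The parameter check in the constructor above corresponds to the parametric call syntax of sumMapFiltered; the following is a small illustrative sketch with made-up data:

```sql
-- sumMapFiltered takes the keys to keep as an aggregate function parameter
SELECT sumMapFiltered([1, 3])(k, v)
FROM values('k Array(UInt32), v Array(UInt64)', ([1, 2], [10, 10]), ([3, 4], [10, 10]))
-- expected: ([1,3],[10,10])
```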
template <typename T, bool tuple_argument>
class AggregateFunctionMinMap final :
public AggregateFunctionMapBase<T, AggregateFunctionMinMap<T, tuple_argument>, FieldVisitorMin, true, tuple_argument>
{
private:
using Self = AggregateFunctionMinMap<T, tuple_argument>;
using Base = AggregateFunctionMapBase<T, Self, FieldVisitorMin, true, tuple_argument>;
public:
AggregateFunctionMinMap(const DataTypePtr & keys_type_,
DataTypes & values_types_, const DataTypes & argument_types_,
const Array & params_)
: Base{keys_type_, values_types_, argument_types_}
{
// The constructor accepts parameters to have a uniform interface with
// sumMapFiltered, but this function doesn't have any parameters.
assertNoParameters(getName(), params_);
}
String getName() const override { return "minMap"; }
bool keepKey(const T &) const { return true; }
};
template <typename T, bool tuple_argument>
class AggregateFunctionMaxMap final :
public AggregateFunctionMapBase<T, AggregateFunctionMaxMap<T, tuple_argument>, FieldVisitorMax, true, tuple_argument>
{
private:
using Self = AggregateFunctionMaxMap<T, tuple_argument>;
using Base = AggregateFunctionMapBase<T, Self, FieldVisitorMax, true, tuple_argument>;
public:
AggregateFunctionMaxMap(const DataTypePtr & keys_type_,
DataTypes & values_types_, const DataTypes & argument_types_,
const Array & params_)
: Base{keys_type_, values_types_, argument_types_}
{
// The constructor accepts parameters to have a uniform interface with
// sumMapFiltered, but this function doesn't have any parameters.
assertNoParameters(getName(), params_);
}
String getName() const override { return "maxMap"; }
bool keepKey(const T &) const { return true; }
};
}

View File

@ -210,4 +210,88 @@ public:
}
};
/** Implements `Max` operation.
* Returns true if changed
*/
class FieldVisitorMax : public StaticVisitor<bool>
{
private:
const Field & rhs;
public:
explicit FieldVisitorMax(const Field & rhs_) : rhs(rhs_) {}
bool operator() (Null &) const { throw Exception("Cannot compare Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array &) const { throw Exception("Cannot compare Arrays", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Tuple &) const { throw Exception("Cannot compare Tuples", ErrorCodes::LOGICAL_ERROR); }
bool operator() (AggregateFunctionStateData &) const { throw Exception("Cannot compare AggregateFunctionStates", ErrorCodes::LOGICAL_ERROR); }
template <typename T>
bool operator() (DecimalField<T> & x) const
{
auto val = get<DecimalField<T>>(rhs);
if (val > x)
{
x = val;
return true;
}
return false;
}
template <typename T>
bool operator() (T & x) const
{
auto val = get<T>(rhs);
if (val > x)
{
x = val;
return true;
}
return false;
}
};
/** Implements `Min` operation.
* Returns true if changed
*/
class FieldVisitorMin : public StaticVisitor<bool>
{
private:
const Field & rhs;
public:
explicit FieldVisitorMin(const Field & rhs_) : rhs(rhs_) {}
bool operator() (Null &) const { throw Exception("Cannot compare Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array &) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Tuple &) const { throw Exception("Cannot sum Tuples", ErrorCodes::LOGICAL_ERROR); }
bool operator() (AggregateFunctionStateData &) const { throw Exception("Cannot sum AggregateFunctionStates", ErrorCodes::LOGICAL_ERROR); }
template <typename T>
bool operator() (DecimalField<T> & x) const
{
auto val = get<DecimalField<T>>(rhs);
if (val < x)
{
x = val;
return true;
}
return false;
}
template <typename T>
bool operator() (T & x) const
{
auto val = get<T>(rhs);
if (val < x)
{
x = val;
return true;
}
return false;
}
};
}

View File

@ -163,6 +163,8 @@ using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
struct ExtraBlock
{
Block block;
bool empty() const { return !block; }
};
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;

View File

@ -289,6 +289,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \
M(SettingBool, partial_merge_join_optimizations, true, "Enable optimizations in partial merge join", 0) \
M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \
M(SettingUInt64, partial_merge_join_left_table_buffer_bytes, 32000000, "If not 0, group left table blocks into bigger ones for the left-side table in partial merge join. It uses up to 2x of the specified memory per joining thread. In the current version it works only with 'partial_merge_join_optimizations = 1'.", 0) \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data into blocks of the specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
M(SettingUInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk, set how many files it is allowed to sort simultaneously. The bigger this value, the more memory is used and the less disk I/O is needed. Minimum is 2.", 0) \
M(SettingString, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \
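As a usage sketch for the join settings listed above (plain session-level SET statements; the values are simply the defaults shown here):

```sql
SET join_algorithm = 'partial_merge';
SET partial_merge_join_optimizations = 1;
SET partial_merge_join_left_table_buffer_bytes = 32000000;
SET partial_merge_join_rows_in_right_blocks = 65536;
```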

View File

@ -55,10 +55,14 @@ Block InflatingExpressionBlockInputStream::readImpl()
}
Block res;
if (likely(!not_processed))
bool keep_going = not_processed && not_processed->empty(); /// There's data inside expression.
if (!not_processed || keep_going)
{
not_processed.reset();
res = children.back()->read();
if (res)
if (res || keep_going)
expression->execute(res, not_processed, action_number);
}
else

View File

@ -1152,13 +1152,20 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
}
}
bool has_stream_with_non_joined_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
bool join_allow_read_in_order = true;
if (before_join)
{
/// You may find it strange but we support read_in_order for HashJoin and do not support it for MergeJoin.
auto join = before_join->getTableJoinAlgo();
join_allow_read_in_order = typeid_cast<HashJoin *>(join.get()) && !join->hasStreamWithNonJoinedRows();
}
optimize_read_in_order =
settings.optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
&& !has_stream_with_non_joined_rows;
&& join_allow_read_in_order;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));

View File

@ -29,12 +29,12 @@ namespace ErrorCodes
namespace
{
template <bool has_nulls>
template <bool has_left_nulls, bool has_right_nulls>
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos)
{
static constexpr int null_direction_hint = 1;
if constexpr (has_nulls)
if constexpr (has_left_nulls && has_right_nulls)
{
const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column);
const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column);
@ -48,16 +48,24 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column,
/// NULL != NULL case
if (left_column.isNullAt(lhs_pos))
return null_direction_hint;
}
if (left_nullable && !right_nullable)
return 0;
}
}
if constexpr (has_left_nulls)
{
if (const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column))
{
if (left_column.isNullAt(lhs_pos))
return null_direction_hint;
return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
}
}
if (!left_nullable && right_nullable)
if constexpr (has_right_nulls)
{
if (const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column))
{
if (right_column.isNullAt(rhs_pos))
return -null_direction_hint;
@ -65,7 +73,6 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column,
}
}
/// !left_nullable && !right_nullable
return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
}
@ -118,26 +125,25 @@ public:
void setCompareNullability(const MergeJoinCursor & rhs)
{
has_nullable_columns = false;
has_left_nullable = false;
has_right_nullable = false;
for (size_t i = 0; i < impl.sort_columns_size; ++i)
{
bool is_left_nullable = isColumnNullable(*impl.sort_columns[i]);
bool is_right_nullable = isColumnNullable(*rhs.impl.sort_columns[i]);
if (is_left_nullable || is_right_nullable)
{
has_nullable_columns = true;
break;
}
has_left_nullable = has_left_nullable || isColumnNullable(*impl.sort_columns[i]);
has_right_nullable = has_right_nullable || isColumnNullable(*rhs.impl.sort_columns[i]);
}
}
Range getNextEqualRange(MergeJoinCursor & rhs)
{
if (has_nullable_columns)
return getNextEqualRangeImpl<true>(rhs);
return getNextEqualRangeImpl<false>(rhs);
if (has_left_nullable && has_right_nullable)
return getNextEqualRangeImpl<true, true>(rhs);
else if (has_left_nullable)
return getNextEqualRangeImpl<true, false>(rhs);
else if (has_right_nullable)
return getNextEqualRangeImpl<false, true>(rhs);
return getNextEqualRangeImpl<false, false>(rhs);
}
int intersect(const Block & min_max, const Names & key_names)
@ -149,16 +155,16 @@ public:
int first_vs_max = 0;
int last_vs_min = 0;
for (size_t i = 0; i < impl.sort_columns.size(); ++i)
for (size_t i = 0; i < impl.sort_columns_size; ++i)
{
const auto & left_column = *impl.sort_columns[i];
const auto & right_column = *min_max.getByName(key_names[i]).column; /// cannot get by position cause of possible duplicates
if (!first_vs_max)
first_vs_max = nullableCompareAt<true>(left_column, right_column, position(), 1);
first_vs_max = nullableCompareAt<true, true>(left_column, right_column, position(), 1);
if (!last_vs_min)
last_vs_min = nullableCompareAt<true>(left_column, right_column, last_position, 0);
last_vs_min = nullableCompareAt<true, true>(left_column, right_column, last_position, 0);
}
if (first_vs_max > 0)
@ -170,64 +176,56 @@ public:
private:
SortCursorImpl impl;
bool has_nullable_columns = false;
bool has_left_nullable = false;
bool has_right_nullable = false;
template <bool has_nulls>
template <bool left_nulls, bool right_nulls>
Range getNextEqualRangeImpl(MergeJoinCursor & rhs)
{
while (!atEnd() && !rhs.atEnd())
{
int cmp = compareAt<has_nulls>(rhs, impl.pos, rhs.impl.pos);
int cmp = compareAtCursor<left_nulls, right_nulls>(rhs);
if (cmp < 0)
impl.next();
if (cmp > 0)
else if (cmp > 0)
rhs.impl.next();
if (!cmp)
{
Range range{impl.pos, rhs.impl.pos, 0, 0};
range.left_length = getEqualLength();
range.right_length = rhs.getEqualLength();
return range;
}
else if (!cmp)
return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()};
}
return Range{impl.pos, rhs.impl.pos, 0, 0};
}
template <bool has_nulls>
int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
template <bool left_nulls, bool right_nulls>
int ALWAYS_INLINE compareAtCursor(const MergeJoinCursor & rhs) const
{
int res = 0;
for (size_t i = 0; i < impl.sort_columns_size; ++i)
{
const auto * left_column = impl.sort_columns[i];
const auto * right_column = rhs.impl.sort_columns[i];
res = nullableCompareAt<has_nulls>(*left_column, *right_column, lhs_pos, rhs_pos);
int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.pos, rhs.impl.pos);
if (res)
break;
return res;
}
return res;
return 0;
}
/// Expects !atEnd()
size_t getEqualLength()
{
if (atEnd())
return 0;
size_t pos = impl.pos;
while (sameNext(pos))
++pos;
return pos - impl.pos + 1;
size_t pos = impl.pos + 1;
for (; pos < impl.rows; ++pos)
if (!samePrev(pos))
break;
return pos - impl.pos;
}
bool sameNext(size_t lhs_pos) const
/// Expects lhs_pos > 0
bool ALWAYS_INLINE samePrev(size_t lhs_pos) const
{
if (lhs_pos + 1 >= impl.rows)
return false;
for (size_t i = 0; i < impl.sort_columns_size; ++i)
if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0)
if (impl.sort_columns[i]->compareAt(lhs_pos - 1, lhs_pos, *(impl.sort_columns[i]), 1) != 0)
return false;
return true;
}
@ -359,7 +357,6 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
, is_semi_join(table_join->strictness() == ASTTableJoin::Strictness::Semi)
, is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind()))
, skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
, max_joined_block_rows(table_join->maxJoinedBlockRows())
, max_rows_in_right_block(table_join->maxRowsInRightBlock())
, max_files_to_merge(table_join->maxFilesToMerge())
@ -407,6 +404,11 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
/// Temporarily disable 'partial_merge_join_left_table_buffer_bytes' without 'partial_merge_join_optimizations'
if (table_join->enablePartialMergeJoinOptimizations())
if (size_t max_bytes = table_join->maxBytesInLeftBuffer())
left_blocks_buffer = std::make_shared<SortedBlocksBuffer>(left_sort_description, max_bytes);
}
void MergeJoin::setTotals(const Block & totals_block)
@ -499,9 +501,7 @@ bool MergeJoin::saveRightBlock(Block && block)
bool has_memory = size_limits.softCheck(right_blocks.row_count, right_blocks.bytes);
if (!has_memory)
{
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getTemporaryVolume(),
right_sample_block, right_sort_description, right_blocks,
max_rows_in_right_block, max_files_to_merge, table_join->temporaryFilesCodec());
initRightTableWriter();
is_in_memory = false;
}
}
@ -521,11 +521,23 @@ bool MergeJoin::addJoinedBlock(const Block & src_block, bool)
void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight());
materializeBlockInplace(block);
JoinCommon::removeLowCardinalityInplace(block);
if (block)
{
JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight());
materializeBlockInplace(block);
JoinCommon::removeLowCardinalityInplace(block);
sortBlock(block, left_sort_description);
}
if (!not_processed && left_blocks_buffer)
{
if (!block || block.rows())
block = left_blocks_buffer->exchange(std::move(block));
if (!block)
return;
}
sortBlock(block, left_sort_description);
if (is_in_memory)
{
if (is_all_join)
@ -540,12 +552,16 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
else
joinSortedBlock<false, false>(block, not_processed);
}
/// Ask to be called again even with no new data: there is still unfinished data in the buffer.
if (!not_processed && left_blocks_buffer)
not_processed = std::make_shared<NotProcessed>(NotProcessed{{}, 0, 0, 0});
}
template <bool in_memory, bool is_all>
void MergeJoin::joinSortedBlock(Block & block, ExtraBlockPtr & not_processed)
{
std::shared_lock lock(rwlock);
//std::shared_lock lock(rwlock);
size_t rows_to_reserve = is_left ? block.rows() : 0;
MutableColumns left_columns = makeMutableColumns(block, (is_all ? rows_to_reserve : 0));
@ -829,4 +845,13 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos)
return loaded_right_blocks[pos];
}
void MergeJoin::initRightTableWriter()
{
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getTemporaryVolume(),
right_sample_block, right_sort_description, max_rows_in_right_block, max_files_to_merge,
table_join->temporaryFilesCodec());
disk_writer->addBlocks(right_blocks);
right_blocks.clear();
}
}

View File

@ -16,6 +16,7 @@ class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class MergeJoin : public IJoin
{
public:
@ -58,6 +59,7 @@ private:
Block right_columns_to_add;
SortedBlocksWriter::Blocks right_blocks;
Blocks min_max_right_blocks;
std::shared_ptr<SortedBlocksBuffer> left_blocks_buffer;
std::unique_ptr<Cache> cached_right_blocks;
std::vector<std::shared_ptr<Block>> loaded_right_blocks;
std::unique_ptr<SortedBlocksWriter> disk_writer;
@ -70,7 +72,7 @@ private:
const bool is_semi_join;
const bool is_inner;
const bool is_left;
const bool skip_not_intersected;
static constexpr const bool skip_not_intersected = true; /// skip index for right blocks
const size_t max_joined_block_rows;
const size_t max_rows_in_right_block;
const size_t max_files_to_merge;
@ -103,6 +105,8 @@ private:
void mergeInMemoryRightBlocks();
void mergeFlushedRightBlocks();
void initRightTableWriter();
};
}

View File

@ -136,18 +136,25 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
return flushToFile(path, sample_block, sorted_input, codec);
}
SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback)
SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
{
SortedFiles files;
BlocksList blocks;
/// wait other flushes if any
{
std::unique_lock lock{insert_mutex};
files.swap(sorted_files);
blocks.swap(inserted_blocks.blocks);
inserted_blocks.clear();
flush_condvar.wait(lock, [&]{ return !flush_inflight; });
}
/// flush not flushed
if (!inserted_blocks.empty())
sorted_files.emplace_back(flush(inserted_blocks.blocks));
inserted_blocks.clear();
if (!blocks.empty())
files.emplace_back(flush(blocks));
BlockInputStreams inputs;
inputs.reserve(num_files_for_merge);
@ -155,15 +162,15 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
/// Merge by parts to save memory. It's possible to trade disk I/O for memory by tuning num_files_for_merge.
{
SortedFiles new_files;
new_files.reserve(sorted_files.size() / num_files_for_merge + 1);
new_files.reserve(files.size() / num_files_for_merge + 1);
while (sorted_files.size() > num_files_for_merge)
while (files.size() > num_files_for_merge)
{
for (const auto & file : sorted_files)
for (const auto & file : files)
{
inputs.emplace_back(streamFromFile(file));
if (inputs.size() == num_files_for_merge || &file == &sorted_files.back())
if (inputs.size() == num_files_for_merge || &file == &files.back())
{
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec));
@ -171,19 +178,22 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
}
}
sorted_files.clear();
sorted_files.swap(new_files);
files.clear();
files.swap(new_files);
}
for (const auto & file : sorted_files)
for (const auto & file : files)
inputs.emplace_back(streamFromFile(file));
}
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
return PremergedFiles{std::move(files), std::move(inputs)};
}
SortedFiles out = flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback);
sorted_files.clear();
return out; /// There're also inserted_blocks counters as indirect output
SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback)
{
PremergedFiles files = premerge();
MergingSortedBlockInputStream sorted_input(files.streams, sort_description, rows_in_block);
return flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback);
}
BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
@ -196,4 +206,87 @@ String SortedBlocksWriter::getPath() const
return volume->getNextDisk()->getPath();
}
Block SortedBlocksBuffer::exchange(Block && block)
{
static constexpr const float reserve_coef = 1.2;
Blocks out_blocks;
Block empty_out = block.cloneEmpty();
{
std::lock_guard lock(mutex);
if (block)
{
current_bytes += block.bytes();
buffer.emplace_back(std::move(block));
/// Saved. Return empty block with same structure.
if (current_bytes < max_bytes)
return empty_out;
}
/// Not saved. Return buffered.
out_blocks.swap(buffer);
buffer.reserve(out_blocks.size() * reserve_coef);
current_bytes = 0;
}
if (size_t size = out_blocks.size())
{
if (size == 1)
return out_blocks[0];
return mergeBlocks(std::move(out_blocks));
}
return {};
}
Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
{
size_t num_rows = 0;
{ /// Merge sort blocks
BlockInputStreams inputs;
inputs.reserve(blocks.size());
for (auto & block : blocks)
{
num_rows += block.rows();
inputs.emplace_back(std::make_shared<OneBlockInputStream>(block));
}
Blocks tmp_blocks;
MergingSortedBlockInputStream stream(inputs, sort_description, num_rows);
while (const auto & block = stream.read())
tmp_blocks.emplace_back(block);
blocks.swap(tmp_blocks);
}
if (blocks.size() == 1)
return blocks[0];
Block out = blocks[0].cloneEmpty();
{ /// Concatenate blocks
MutableColumns columns = out.mutateColumns();
for (size_t i = 0; i < columns.size(); ++i)
{
columns[i]->reserve(num_rows);
for (const auto & block : blocks)
{
const auto & tmp_column = *block.getByPosition(i).column;
columns[i]->insertRangeFrom(tmp_column, 0, block.rows());
}
}
out.setColumns(std::move(columns));
}
return out;
}
}

View File

@ -52,15 +52,21 @@ struct SortedBlocksWriter
}
};
struct PremergedFiles
{
SortedFiles files;
BlockInputStreams streams;
};
static constexpr const size_t num_streams = 2;
std::mutex insert_mutex;
std::condition_variable flush_condvar;
const SizeLimits & size_limits;
VolumeJBODPtr volume;
const Block & sample_block;
Block sample_block;
const SortDescription & sort_description;
Blocks & inserted_blocks;
Blocks inserted_blocks;
const size_t rows_in_block;
const size_t num_files_for_merge;
const String & codec;
@ -70,19 +76,20 @@ struct SortedBlocksWriter
size_t flush_number = 0;
size_t flush_inflight = 0;
SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_, const SortDescription & description,
Blocks & blocks, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_,
const SortDescription & description, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
: size_limits(size_limits_)
, volume(volume_)
, sample_block(sample_block_)
, sort_description(description)
, inserted_blocks(blocks)
, rows_in_block(rows_in_block_)
, num_files_for_merge(num_files_to_merge_)
, codec(codec_)
{}
void addBlocks(const Blocks & blocks)
{
sorted_files.emplace_back(flush(inserted_blocks.blocks));
inserted_blocks.clear();
sorted_files.emplace_back(flush(blocks.blocks));
}
String getPath() const;
@ -90,7 +97,30 @@ struct SortedBlocksWriter
void insert(Block && block);
TmpFilePtr flush(const BlocksList & blocks) const;
PremergedFiles premerge();
SortedFiles finishMerge(std::function<void(const Block &)> callback = [](const Block &){});
};
class SortedBlocksBuffer
{
public:
SortedBlocksBuffer(const SortDescription & sort_description_, size_t max_bytes_)
: max_bytes(max_bytes_)
, current_bytes(0)
, sort_description(sort_description_)
{}
Block exchange(Block && block);
private:
std::mutex mutex;
size_t max_bytes;
size_t current_bytes;
Blocks buffer;
const SortDescription & sort_description;
Block mergeBlocks(Blocks &&) const;
};
}

View File

@ -21,6 +21,7 @@ TableJoin::TableJoin(const Settings & settings, VolumeJBODPtr tmp_volume_)
, join_algorithm(settings.join_algorithm)
, partial_merge_join_optimizations(settings.partial_merge_join_optimizations)
, partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks)
, partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes)
, max_files_to_merge(settings.join_on_disk_max_files_to_merge)
, temporary_files_codec(settings.temporary_files_codec)
, tmp_volume(tmp_volume_)

View File

@ -50,6 +50,7 @@ class TableJoin
JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO;
const bool partial_merge_join_optimizations = false;
const size_t partial_merge_join_rows_in_right_blocks = 0;
const size_t partial_merge_join_left_table_buffer_bytes = 0;
const size_t max_files_to_merge = 0;
const String temporary_files_codec = "LZ4";
@ -108,6 +109,7 @@ public:
size_t defaultMaxBytes() const { return default_max_bytes; }
size_t maxJoinedBlockRows() const { return max_joined_block_rows; }
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
size_t maxBytesInLeftBuffer() const { return partial_merge_join_left_table_buffer_bytes; }
size_t maxFilesToMerge() const { return max_files_to_merge; }
const String & temporaryFilesCodec() const { return temporary_files_codec; }
bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; }

View File

@ -52,12 +52,23 @@ void InflatingExpressionTransform::transform(Chunk & chunk)
Block InflatingExpressionTransform::readExecute(Chunk & chunk)
{
Block res;
if (likely(!not_processed))
if (!not_processed)
{
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (res)
expression->execute(res, not_processed, action_number);
}
else if (not_processed->empty()) /// There's not processed data inside expression.
{
if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
not_processed.reset();
expression->execute(res, not_processed, action_number);
}
else
{
res = std::move(not_processed->block);

View File

@ -407,7 +407,6 @@ Block StorageMerge::getQueryHeader(
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
}
@ -415,9 +414,9 @@ Block StorageMerge::getQueryHeader(
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
return materializeBlock(InterpreterSelectQuery(
return InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -1,9 +1,6 @@
<test>
<create_query>CREATE TABLE ints (i64 Int64, i32 Int32, i16 Int16, i8 Int8) ENGINE = Memory</create_query>
<create_query>SET partial_merge_join = 1</create_query>
<create_query>SET join_algorithm = 'partial_merge'</create_query>
<fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
<fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
@ -25,6 +22,21 @@
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042)</query>
<query tag='ANY LEFT (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='ANY LEFT KEY (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='ANY LEFT ON (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='ANY LEFT IN (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER KEY (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER ON (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER IN (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT KEY (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT ON (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT IN (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0</query>
<drop_query>DROP TABLE IF EXISTS ints</drop_query>
</test>

View File

@ -13,6 +13,8 @@
<substitution>
<name>func</name>
<values>
<value>minMap</value>
<value>maxMap</value>
<value>sumMap</value>
<value>sumMapWithOverflow</value>
</values>

View File

@ -8,7 +8,7 @@ CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
INSERT INTO t1 (x, y) VALUES (0, 0);
SET partial_merge_join = 1;
SET join_algorithm = 'prefer_partial_merge';
SET any_join_distinct_right_table_keys = 1;
SELECT 't join none using';

View File

@ -1,4 +1,4 @@
set partial_merge_join = 1;
SET join_algorithm = 'partial_merge';
select s1.x, s2.x from (select 1 as x) s1 left join (select 1 as x) s2 using x;
select * from (select materialize(2) as x) s1 left join (select 2 as x) s2 using x;

View File

@ -1,7 +1,7 @@
DROP TABLE IF EXISTS ints;
CREATE TABLE ints (i64 Int64, i32 Int32) ENGINE = Memory;
SET partial_merge_join = 1;
SET join_algorithm = 'partial_merge';
INSERT INTO ints SELECT 1 AS i64, number AS i32 FROM numbers(2);

View File

@ -6,7 +6,7 @@ CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
SET partial_merge_join = 1;
SET join_algorithm = 'prefer_partial_merge';
SET partial_merge_join_rows_in_right_blocks = 1;
SET any_join_distinct_right_table_keys = 1;

View File

@ -8,7 +8,7 @@ ANY LEFT JOIN (
) js2
USING n; -- { serverError 241 }
SET partial_merge_join = 1;
SET join_algorithm = 'partial_merge';
SET default_max_bytes_in_join = 0;
SELECT number * 200000 as n, j FROM numbers(5) nums

View File

@ -6,7 +6,7 @@ CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
SET partial_merge_join = 1;
SET join_algorithm = 'prefer_partial_merge';
SET partial_merge_join_optimizations = 1;
SET any_join_distinct_right_table_keys = 1;

View File

@ -7,7 +7,7 @@ CREATE TABLE t2 (x UInt32, s String) engine = Memory;
INSERT INTO t1 (x, s) VALUES (0, 'a1'), (1, 'a2'), (2, 'a3'), (3, 'a4'), (4, 'a5');
INSERT INTO t2 (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
SET partial_merge_join = 1;
SET join_algorithm = 'prefer_partial_merge';
SET join_use_nulls = 0;
SET any_join_distinct_right_table_keys = 0;

View File

@ -1,5 +1,5 @@
SET max_memory_usage = 50000000;
SET partial_merge_join = 1;
SET join_algorithm = 'partial_merge';
SELECT 'defaults';

View File

@ -1,5 +1,5 @@
SET max_memory_usage = 50000000;
SET partial_merge_join = 1;
SET join_algorithm = 'partial_merge';
SELECT count(1) FROM (
SELECT t2.n FROM numbers(10) t1

View File

@ -1,4 +1,4 @@
SET partial_merge_join = 1;
SET join_algorithm = 'partial_merge';
SELECT count(1), uniqExact(1) FROM (
SELECT materialize(1) as k FROM numbers(1) nums

View File

@ -0,0 +1,24 @@
([0,1,2,3,4,5,6,7,8,9,10],[10,1,1,1,1,1,1,1,1,1,1]) Tuple(Array(Int32), Array(UInt64))
([1],[-49])
([1.00],[-49.00])
([0,1,2,3,4,5,6,7,8,9,10],[100,91,92,93,94,95,96,97,98,99,1]) Tuple(Array(Int32), Array(UInt64))
([1],[50])
([1.00],[50.00])
(['01234567-89ab-cdef-0123-456789abcdef'],['01111111-89ab-cdef-0123-456789abcdef'])
(['1'],['1'])
(['1'],['1'])
([1],[1])
([1],[1])
(['1970-01-02'],[1])
(['1970-01-01 03:00:01'],[1])
([1.01],[1])
(['a'],[1])
(['01234567-89ab-cdef-0123-456789abcdef'],['02222222-89ab-cdef-0123-456789abcdef'])
(['1'],['2'])
(['1'],['2'])
([1],[2])
([1],[2])
(['1970-01-02'],[2])
(['1970-01-01 03:00:01'],[2])
([1.01],[2])
(['a'],[2])

View File

@ -0,0 +1,33 @@
select minMap([toInt32(number % 10), number % 10 + 1], [number, 1]) as m, toTypeName(m) from numbers(1, 100);
select minMap([1], [toInt32(number) - 50]) from numbers(1, 100);
select minMap([cast(1, 'Decimal(10, 2)')], [cast(toInt32(number) - 50, 'Decimal(10, 2)')]) from numbers(1, 100);
select maxMap([toInt32(number % 10), number % 10 + 1], [number, 1]) as m, toTypeName(m) from numbers(1, 100);
select maxMap([1], [toInt32(number) - 50]) from numbers(1, 100);
select maxMap([cast(1, 'Decimal(10, 2)')], [cast(toInt32(number) - 50, 'Decimal(10, 2)')]) from numbers(1, 100);
-- check different types for minMap
select minMap(val, cnt) from values ('val Array(UUID), cnt Array(UUID)',
(['01234567-89ab-cdef-0123-456789abcdef'], ['01111111-89ab-cdef-0123-456789abcdef']),
(['01234567-89ab-cdef-0123-456789abcdef'], ['02222222-89ab-cdef-0123-456789abcdef']));
select minMap(val, cnt) from values ('val Array(String), cnt Array(String)', (['1'], ['1']), (['1'], ['2']));
select minMap(val, cnt) from values ('val Array(FixedString(1)), cnt Array(FixedString(1))', (['1'], ['1']), (['1'], ['2']));
select minMap(val, cnt) from values ('val Array(UInt64), cnt Array(UInt64)', ([1], [1]), ([1], [2]));
select minMap(val, cnt) from values ('val Array(Float64), cnt Array(Int8)', ([1], [1]), ([1], [2]));
select minMap(val, cnt) from values ('val Array(Date), cnt Array(Int16)', ([1], [1]), ([1], [2]));
select minMap(val, cnt) from values ('val Array(DateTime(\'Europe/Moscow\')), cnt Array(Int32)', ([1], [1]), ([1], [2]));
select minMap(val, cnt) from values ('val Array(Decimal(10, 2)), cnt Array(Int16)', (['1.01'], [1]), (['1.01'], [2]));
select minMap(val, cnt) from values ('val Array(Enum16(\'a\'=1)), cnt Array(Int16)', (['a'], [1]), (['a'], [2]));
-- check different types for maxMap
select maxMap(val, cnt) from values ('val Array(UUID), cnt Array(UUID)',
(['01234567-89ab-cdef-0123-456789abcdef'], ['01111111-89ab-cdef-0123-456789abcdef']),
(['01234567-89ab-cdef-0123-456789abcdef'], ['02222222-89ab-cdef-0123-456789abcdef']));
select maxMap(val, cnt) from values ('val Array(String), cnt Array(String)', (['1'], ['1']), (['1'], ['2']));
select maxMap(val, cnt) from values ('val Array(FixedString(1)), cnt Array(FixedString(1))', (['1'], ['1']), (['1'], ['2']));
select maxMap(val, cnt) from values ('val Array(UInt64), cnt Array(UInt64)', ([1], [1]), ([1], [2]));
select maxMap(val, cnt) from values ('val Array(Float64), cnt Array(Int8)', ([1], [1]), ([1], [2]));
select maxMap(val, cnt) from values ('val Array(Date), cnt Array(Int16)', ([1], [1]), ([1], [2]));
select maxMap(val, cnt) from values ('val Array(DateTime(\'Europe/Moscow\')), cnt Array(Int32)', ([1], [1]), ([1], [2]));
select maxMap(val, cnt) from values ('val Array(Decimal(10, 2)), cnt Array(Int16)', (['1.01'], [1]), (['1.01'], [2]));
select maxMap(val, cnt) from values ('val Array(Enum16(\'a\'=1)), cnt Array(Int16)', (['a'], [1]), (['a'], [2]));

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS distributed_table_merged;
DROP TABLE IF EXISTS distributed_table_1;
DROP TABLE IF EXISTS distributed_table_2;
DROP TABLE IF EXISTS local_table_1;
DROP TABLE IF EXISTS local_table_2;
CREATE TABLE local_table_1 (id String) ENGINE = MergeTree ORDER BY (id);
CREATE TABLE local_table_2(id String) ENGINE = MergeTree ORDER BY (id);
CREATE TABLE local_table_merged (id String) ENGINE = Merge('default', 'local_table_1|local_table_2');
CREATE TABLE distributed_table_1 (id String) ENGINE = Distributed(test_shard_localhost, default, local_table_1);
CREATE TABLE distributed_table_2 (id String) ENGINE = Distributed(test_shard_localhost, default, local_table_2);
CREATE TABLE distributed_table_merged (id String) ENGINE = Merge('default', 'distributed_table_1|distributed_table_2');
SELECT 1 FROM distributed_table_merged;
DROP TABLE IF EXISTS distributed_table_merged;
DROP TABLE IF EXISTS distributed_table_1;
DROP TABLE IF EXISTS distributed_table_2;
DROP TABLE IF EXISTS local_table_1;
DROP TABLE IF EXISTS local_table_2;

View File

@ -71,7 +71,9 @@
}
$('pre').each(function(_, element) {
$(element).prepend('<img src="/images/mkdocs/copy.svg" class="code-copy btn float-right m-0 p-0" />');
$(element).prepend(
'<img src="/images/mkdocs/copy.svg" alt="Copy" title="Copy" class="code-copy btn float-right m-0 p-0" />'
);
});
$('.code-copy').each(function(_, element) {

View File

@ -16,12 +16,14 @@
<li><span class="text-yellow"></span> On-disk locality of reference</li>
<li><span class="text-yellow"></span> Secondary data-skipping indexes</li>
<li><span class="text-yellow"></span> Data compression</li>
<li><span class="text-yellow"></span> Optional separation of hot and cold storage</li>
</ul>
</div>
<div class="col-lg">
<ul class="lead list-unstyled mb-0 mb-lg-1">
<li><span class="text-yellow"></span> SQL support</li>
<li><span class="text-yellow"></span> Data skipping indices</li>
<li><span class="text-yellow"></span> Functions for querying JSON documents</li>
<li><span class="text-yellow"></span> Features for web and mobile analytics</li>
<li><span class="text-yellow"></span> High availability</li>
<li><span class="text-yellow"></span> Cross-datacenter replication</li>
<li><span class="text-yellow"></span> Local and distributed joins</li>
@ -33,9 +35,10 @@
<div class="col-lg">
<ul class="lead list-unstyled mb-0 mb-lg-1">
<li><span class="text-yellow"></span> Focus on OLAP workloads</li>
<li><span class="text-yellow"></span> Support for S3-compatible object storage</li>
<li><span class="text-yellow"></span> Integration with Hadoop, MySQL and PostgreSQL ecosystems</li>
<li><span class="text-yellow"></span> Approximate query processing</li>
<li><span class="text-yellow"></span> Probabilistic data structures</li>
<li><span class="text-yellow"></span> Features for web and mobile analytics</li>
<li><span class="text-yellow"></span> Full support of IPv6</li>
<li><span class="text-yellow"></span> State-of-the-art algorithms</li>
<li><span class="text-yellow"></span> Detailed documentation</li>