Merge branch 'remove-bad-tests-azat' into fix-bad-tests-01338_long_select_and_alter-alesapin

Alexey Milovidov 2024-07-21 12:33:09 +02:00
commit 5cb41147e8
58 changed files with 483 additions and 355 deletions

View File

@@ -4124,7 +4124,9 @@ void QueryAnalyzer::resolveInterpolateColumnsNodeList(QueryTreeNodePtr & interpo
         auto * column_to_interpolate = interpolate_node_typed.getExpression()->as<IdentifierNode>();
         if (!column_to_interpolate)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "INTERPOLATE can work only for indentifiers, but {} is found",
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "INTERPOLATE can work only for identifiers, but {} is found",
                 interpolate_node_typed.getExpression()->formatASTForErrorMessage());
         auto column_to_interpolate_name = column_to_interpolate->getIdentifier().getFullName();

View File

@@ -366,13 +366,10 @@ void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash) cons
     hash.update(wbuf.str().c_str(), wbuf.str().size());
 }
 
-void ColumnAggregateFunction::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnAggregateFunction::getWeakHash32() const
 {
     auto s = data.size();
-    if (hash.getData().size() != data.size())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), hash.getData().size());
+    WeakHash32 hash(s);
 
     auto & hash_data = hash.getData();
     std::vector<UInt8> v;
@@ -383,6 +380,8 @@ void ColumnAggregateFunction::updateWeakHash32(WeakHash32 & hash) const
         wbuf.finalize();
         hash_data[i] = ::updateWeakHash32(v.data(), v.size(), hash_data[i]);
     }
+
+    return hash;
 }
 
 void ColumnAggregateFunction::updateHashFast(SipHash & hash) const

View File

@@ -177,7 +177,7 @@ public:
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;

View File

@@ -271,15 +271,12 @@ void ColumnArray::updateHashWithValue(size_t n, SipHash & hash) const
         getData().updateHashWithValue(offset + i, hash);
 }
 
-void ColumnArray::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnArray::getWeakHash32() const
 {
     auto s = offsets->size();
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", s, hash.getData().size());
+    WeakHash32 hash(s);
 
-    WeakHash32 internal_hash(data->size());
-    data->updateWeakHash32(internal_hash);
+    WeakHash32 internal_hash = data->getWeakHash32();
 
     Offset prev_offset = 0;
     const auto & offsets_data = getOffsets();
@@ -300,6 +297,8 @@ void ColumnArray::updateWeakHash32(WeakHash32 & hash) const
 
         prev_offset = offsets_data[i];
     }
+
+    return hash;
 }
 
 void ColumnArray::updateHashFast(SipHash & hash) const

View File

@@ -82,7 +82,7 @@ public:
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;
 #if !defined(ABORT_ON_LOGICAL_ERROR)
     void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

View File

@@ -3,6 +3,7 @@
 #include <optional>
 #include <Core/Field.h>
 #include <Columns/IColumn.h>
+#include <Common/WeakHash.h>
 #include <IO/BufferWithOwnMemory.h>

@@ -98,7 +99,7 @@ public:
     const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
     const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
     void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }
-    void updateWeakHash32(WeakHash32 &) const override { throwMustBeDecompressed(); }
+    WeakHash32 getWeakHash32() const override { throwMustBeDecompressed(); }
     void updateHashFast(SipHash &) const override { throwMustBeDecompressed(); }
     ColumnPtr filter(const Filter &, ssize_t) const override { throwMustBeDecompressed(); }
     void expand(const Filter &, bool) override { throwMustBeDecompressed(); }

View File

@@ -137,18 +137,10 @@ void ColumnConst::updatePermutation(PermutationSortDirection /*direction*/, Perm
 {
 }
 
-void ColumnConst::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnConst::getWeakHash32() const
 {
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
-
-    WeakHash32 element_hash(1);
-    data->updateWeakHash32(element_hash);
-    size_t data_hash = element_hash.getData()[0];
-
-    for (auto & value : hash.getData())
-        value = static_cast<UInt32>(intHashCRC32(data_hash, value));
+    WeakHash32 element_hash = data->getWeakHash32();
+    return WeakHash32(s, element_hash.getData()[0]);
 }
 
 void ColumnConst::compareColumn(

View File

@@ -204,7 +204,7 @@ public:
         data->updateHashWithValue(0, hash);
     }
 
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
 
     void updateHashFast(SipHash & hash) const override
     {

View File

@@ -28,7 +28,6 @@ namespace ErrorCodes
     extern const int PARAMETER_OUT_OF_BOUND;
     extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
     extern const int NOT_IMPLEMENTED;
-    extern const int LOGICAL_ERROR;
 }
 
 template <is_decimal T>

@@ -76,13 +75,10 @@ void ColumnDecimal<T>::updateHashWithValue(size_t n, SipHash & hash) const
 }
 
 template <is_decimal T>
-void ColumnDecimal<T>::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnDecimal<T>::getWeakHash32() const
 {
     auto s = data.size();
-
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
+    WeakHash32 hash(s);
 
     const T * begin = data.data();
     const T * end = begin + s;
@@ -94,6 +90,8 @@ void ColumnDecimal<T>::updateWeakHash32(WeakHash32 & hash) const
         ++begin;
         ++hash_data;
     }
+
+    return hash;
 }
 
 template <is_decimal T>

View File

@@ -102,7 +102,7 @@ public:
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;
 #if !defined(ABORT_ON_LOGICAL_ERROR)
     int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;

View File

@@ -4,6 +4,7 @@
 #include <Columns/ColumnVector.h>
 #include <Columns/ColumnVariant.h>
 #include <DataTypes/IDataType.h>
+#include <Common/WeakHash.h>
 
 namespace DB

@@ -174,9 +175,9 @@ public:
     void updateHashWithValue(size_t n, SipHash & hash) const override;
 
-    void updateWeakHash32(WeakHash32 & hash) const override
+    WeakHash32 getWeakHash32() const override
     {
-        variant_column->updateWeakHash32(hash);
+        return variant_column->getWeakHash32();
     }
 
     void updateHashFast(SipHash & hash) const override

View File

@@ -137,14 +137,10 @@ void ColumnFixedString::updateHashWithValue(size_t index, SipHash & hash) const
     hash.update(reinterpret_cast<const char *>(&chars[n * index]), n);
 }
 
-void ColumnFixedString::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnFixedString::getWeakHash32() const
 {
     auto s = size();
-
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, "
-                        "hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
+    WeakHash32 hash(s);
 
     const UInt8 * pos = chars.data();
     UInt32 * hash_data = hash.getData().data();
@@ -156,6 +152,8 @@ void ColumnFixedString::updateWeakHash32(WeakHash32 & hash) const
         pos += n;
         ++hash_data;
     }
+
+    return hash;
 }
 
 void ColumnFixedString::updateHashFast(SipHash & hash) const

View File

@@ -133,7 +133,7 @@ public:
     void updateHashWithValue(size_t index, SipHash & hash) const override;
 
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
 
     void updateHashFast(SipHash & hash) const override;

View File

@@ -4,6 +4,7 @@
 #include <Core/NamesAndTypes.h>
 #include <Core/ColumnsWithTypeAndName.h>
 #include <Columns/IColumn.h>
+#include <Common/WeakHash.h>
 
 namespace DB

@@ -130,9 +131,9 @@ public:
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "updateHashWithValue is not implemented for {}", getName());
     }
 
-    void updateWeakHash32(WeakHash32 &) const override
+    WeakHash32 getWeakHash32() const override
     {
-        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "updateWeakHash32 is not implemented for {}", getName());
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getWeakHash32 is not implemented for {}", getName());
     }
 
     void updateHashFast(SipHash &) const override

View File

@@ -7,8 +7,7 @@
 #include <Common/HashTable/HashMap.h>
 #include <Common/WeakHash.h>
 #include <Common/assert_cast.h>
-#include "Storages/IndicesDescription.h"
-#include "base/types.h"
+#include <base/types.h>
 #include <base/sort.h>
 #include <base/scope_guard.h>

@@ -320,19 +319,10 @@ const char * ColumnLowCardinality::skipSerializedInArena(const char * pos) const
     return getDictionary().skipSerializedInArena(pos);
 }
 
-void ColumnLowCardinality::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnLowCardinality::getWeakHash32() const
 {
-    auto s = size();
-
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
-
-    const auto & dict = getDictionary().getNestedColumn();
-    WeakHash32 dict_hash(dict->size());
-    dict->updateWeakHash32(dict_hash);
-
-    idx.updateWeakHash(hash, dict_hash);
+    WeakHash32 dict_hash = getDictionary().getNestedColumn()->getWeakHash32();
+    return idx.getWeakHash(dict_hash);
 }
 
 void ColumnLowCardinality::updateHashFast(SipHash & hash) const

@@ -832,10 +822,11 @@ bool ColumnLowCardinality::Index::containsDefault() const
     return contains;
 }
 
-void ColumnLowCardinality::Index::updateWeakHash(WeakHash32 & hash, WeakHash32 & dict_hash) const
+WeakHash32 ColumnLowCardinality::Index::getWeakHash(const WeakHash32 & dict_hash) const
 {
+    WeakHash32 hash(positions->size());
     auto & hash_data = hash.getData();
-    auto & dict_hash_data = dict_hash.getData();
+    const auto & dict_hash_data = dict_hash.getData();
 
     auto update_weak_hash = [&](auto x)
     {
@@ -844,10 +835,11 @@ void ColumnLowCardinality::Index::updateWeakHash(WeakHash32 & hash, WeakHash32 &
         auto size = data.size();
 
         for (size_t i = 0; i < size; ++i)
-            hash_data[i] = static_cast<UInt32>(intHashCRC32(dict_hash_data[data[i]], hash_data[i]));
+            hash_data[i] = dict_hash_data[data[i]];
     };
 
     callForType(std::move(update_weak_hash), size_of_type);
+    return hash;
 }
 
 void ColumnLowCardinality::Index::collectSerializedValueSizes(

View File

@@ -111,7 +111,7 @@ public:
         getDictionary().updateHashWithValue(getIndexes().getUInt(n), hash);
     }
 
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
 
     void updateHashFast(SipHash &) const override;

@@ -325,7 +325,7 @@ public:
         bool containsDefault() const;
 
-        void updateWeakHash(WeakHash32 & hash, WeakHash32 & dict_hash) const;
+        WeakHash32 getWeakHash(const WeakHash32 & dict_hash) const;
 
         void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const PaddedPODArray<UInt64> & dict_sizes) const;

View File

@@ -143,9 +143,9 @@ void ColumnMap::updateHashWithValue(size_t n, SipHash & hash) const
     nested->updateHashWithValue(n, hash);
 }
 
-void ColumnMap::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnMap::getWeakHash32() const
 {
-    nested->updateWeakHash32(hash);
+    return nested->getWeakHash32();
 }
 
 void ColumnMap::updateHashFast(SipHash & hash) const

View File

@@ -64,7 +64,7 @@ public:
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;
 #if !defined(ABORT_ON_LOGICAL_ERROR)

View File

@@ -56,25 +56,21 @@ void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const
         getNestedColumn().updateHashWithValue(n, hash);
 }
 
-void ColumnNullable::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnNullable::getWeakHash32() const
 {
     auto s = size();
 
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
-
-    WeakHash32 old_hash = hash;
-    nested_column->updateWeakHash32(hash);
+    WeakHash32 hash = nested_column->getWeakHash32();
 
     const auto & null_map_data = getNullMapData();
     auto & hash_data = hash.getData();
-    auto & old_hash_data = old_hash.getData();
 
-    /// Use old data for nulls.
+    /// Use default for nulls.
     for (size_t row = 0; row < s; ++row)
         if (null_map_data[row])
-            hash_data[row] = old_hash_data[row];
+            hash_data[row] = WeakHash32::kDefaultInitialValue;
+
+    return hash;
 }
 
 void ColumnNullable::updateHashFast(SipHash & hash) const
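Note on the semantic change above: NULL rows used to keep whatever value the caller's pre-sized hash already held, so the result depended on caller state; they now get WeakHash32::kDefaultInitialValue, the same all-bits-set value a freshly constructed WeakHash32 starts from. A minimal sketch of the invariant (standalone, hypothetical size):

    WeakHash32 fresh(4);
    /// Every slot of a new hash equals kDefaultInitialValue (~UInt32(0)),
    /// so a NULL row hashes exactly like a row that nothing has hashed yet.
    chassert(fresh.getData()[0] == WeakHash32::kDefaultInitialValue);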

View File

@@ -133,7 +133,7 @@ public:
     void protect() override;
     ColumnPtr replicate(const Offsets & replicate_offsets) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;
     void getExtremes(Field & min, Field & max) const override;
     // Special function for nullable minmax index

View File

@@ -5,6 +5,7 @@
 #include <Core/Names.h>
 #include <DataTypes/Serializations/SubcolumnsTree.h>
 #include <Common/PODArray.h>
+#include <Common/WeakHash.h>
 #include <DataTypes/IDataType.h>

@@ -252,7 +253,7 @@ public:
     const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
     const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
     void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }
-    void updateWeakHash32(WeakHash32 &) const override { throwMustBeConcrete(); }
+    WeakHash32 getWeakHash32() const override { throwMustBeConcrete(); }
     void updateHashFast(SipHash & hash) const override;
     void expand(const Filter &, bool) override { throwMustBeConcrete(); }
     bool hasEqualValues() const override { throwMustBeConcrete(); }

View File

@@ -678,20 +678,22 @@ void ColumnSparse::updateHashWithValue(size_t n, SipHash & hash) const
     values->updateHashWithValue(getValueIndex(n), hash);
 }
 
-void ColumnSparse::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnSparse::getWeakHash32() const
 {
-    if (hash.getData().size() != _size)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", _size, hash.getData().size());
+    WeakHash32 values_hash = values->getWeakHash32();
+    WeakHash32 hash(size());
+
+    auto & hash_data = hash.getData();
+    auto & values_hash_data = values_hash.getData();
 
     auto offset_it = begin();
-    auto & hash_data = hash.getData();
     for (size_t i = 0; i < _size; ++i, ++offset_it)
     {
         size_t value_index = offset_it.getValueIndex();
-        auto data_ref = values->getDataAt(value_index);
-        hash_data[i] = ::updateWeakHash32(reinterpret_cast<const UInt8 *>(data_ref.data), data_ref.size, hash_data[i]);
+        hash_data[i] = values_hash_data[value_index];
     }
+
+    return hash;
 }
 
 void ColumnSparse::updateHashFast(SipHash & hash) const

View File

@@ -139,7 +139,7 @@ public:
     void protect() override;
     ColumnPtr replicate(const Offsets & replicate_offsets) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;
     void getExtremes(Field & min, Field & max) const override;

View File

@@ -108,13 +108,10 @@ MutableColumnPtr ColumnString::cloneResized(size_t to_size) const
     return res;
 }
 
-void ColumnString::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnString::getWeakHash32() const
 {
     auto s = offsets.size();
-
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
+    WeakHash32 hash(s);
 
     const UInt8 * pos = chars.data();
     UInt32 * hash_data = hash.getData().data();
@@ -130,6 +127,8 @@ void ColumnString::updateWeakHash32(WeakHash32 & hash) const
         prev_offset = offset;
         ++hash_data;
     }
+
+    return hash;
 }

View File

@@ -212,7 +212,7 @@ public:
         hash.update(reinterpret_cast<const char *>(&chars[offset]), string_size);
     }
 
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
 
     void updateHashFast(SipHash & hash) const override
     {

View File

@@ -310,16 +310,15 @@ void ColumnTuple::updateHashWithValue(size_t n, SipHash & hash) const
         column->updateHashWithValue(n, hash);
 }
 
-void ColumnTuple::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnTuple::getWeakHash32() const
 {
     auto s = size();
-
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
+    WeakHash32 hash(s);
 
     for (const auto & column : columns)
-        column->updateWeakHash32(hash);
+        hash.update(column->getWeakHash32());
+
+    return hash;
 }
 
 void ColumnTuple::updateHashFast(SipHash & hash) const

View File

@@ -81,7 +81,7 @@ public:
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;
 #if !defined(ABORT_ON_LOGICAL_ERROR)
     void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

View File

@@ -789,36 +789,26 @@ void ColumnVariant::updateHashWithValue(size_t n, SipHash & hash) const
         variants[localDiscriminatorByGlobal(global_discr)]->updateHashWithValue(offsetAt(n), hash);
 }
 
-void ColumnVariant::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnVariant::getWeakHash32() const
 {
     auto s = size();
 
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
-
     /// If we have only NULLs, keep hash unchanged.
     if (hasOnlyNulls())
-        return;
+        return WeakHash32(s);
 
     /// Optimization for case when there is only 1 non-empty variant and no NULLs.
     /// In this case we can just calculate weak hash for this variant.
     if (auto non_empty_local_discr = getLocalDiscriminatorOfOneNoneEmptyVariantNoNulls())
-    {
-        variants[*non_empty_local_discr]->updateWeakHash32(hash);
-        return;
-    }
+        return variants[*non_empty_local_discr]->getWeakHash32();
 
     /// Calculate weak hash for all variants.
     std::vector<WeakHash32> nested_hashes;
     for (const auto & variant : variants)
-    {
-        WeakHash32 nested_hash(variant->size());
-        variant->updateWeakHash32(nested_hash);
-        nested_hashes.emplace_back(std::move(nested_hash));
-    }
+        nested_hashes.emplace_back(variant->getWeakHash32());
 
     /// For each row hash is a hash of corresponding row from corresponding variant.
+    WeakHash32 hash(s);
     auto & hash_data = hash.getData();
     const auto & local_discriminators_data = getLocalDiscriminators();
     const auto & offsets_data = getOffsets();
@@ -827,11 +817,10 @@ void ColumnVariant::updateWeakHash32(WeakHash32 & hash) const
         Discriminator discr = local_discriminators_data[i];
         /// Update hash only for non-NULL values
         if (discr != NULL_DISCRIMINATOR)
-        {
-            auto nested_hash = nested_hashes[local_discriminators_data[i]].getData()[offsets_data[i]];
-            hash_data[i] = static_cast<UInt32>(hashCRC32(nested_hash, hash_data[i]));
-        }
+            hash_data[i] = nested_hashes[discr].getData()[offsets_data[i]];
     }
+
+    return hash;
 }
 
 void ColumnVariant::updateHashFast(SipHash & hash) const

View File

@@ -213,7 +213,7 @@ public:
     const char * deserializeVariantAndInsertFromArena(Discriminator global_discr, const char * pos);
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
     void updateHashFast(SipHash & hash) const override;
     ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
     void expand(const Filter & mask, bool inverted) override;

View File

@@ -73,13 +73,10 @@ void ColumnVector<T>::updateHashWithValue(size_t n, SipHash & hash) const
 }
 
 template <typename T>
-void ColumnVector<T>::updateWeakHash32(WeakHash32 & hash) const
+WeakHash32 ColumnVector<T>::getWeakHash32() const
 {
     auto s = data.size();
-
-    if (hash.getData().size() != s)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
-                        "column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
+    WeakHash32 hash(s);
 
     const T * begin = data.data();
     const T * end = begin + s;
@@ -91,6 +88,8 @@ void ColumnVector<T>::updateWeakHash32(WeakHash32 & hash) const
         ++begin;
         ++hash_data;
     }
+
+    return hash;
 }
 
 template <typename T>

View File

@@ -114,7 +114,7 @@ public:
     void updateHashWithValue(size_t n, SipHash & hash) const override;
 
-    void updateWeakHash32(WeakHash32 & hash) const override;
+    WeakHash32 getWeakHash32() const override;
 
     void updateHashFast(SipHash & hash) const override;

View File

@@ -300,10 +300,10 @@ public:
     /// passed bytes to hash must identify sequence of values unambiguously.
     virtual void updateHashWithValue(size_t n, SipHash & hash) const = 0;
 
-    /// Update hash function value. Hash is calculated for each element.
+    /// Get hash function value. Hash is calculated for each element.
     /// It's a fast weak hash function. Mainly need to scatter data between threads.
    /// WeakHash32 must have the same size as column.
-    virtual void updateWeakHash32(WeakHash32 & hash) const = 0;
+    virtual WeakHash32 getWeakHash32() const = 0;
 
     /// Update state of hash with all column.
     virtual void updateHashFast(SipHash & hash) const = 0;
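For illustration (not part of this commit): the calling convention changes from an out-parameter the caller must pre-size to a value returned by the column itself. `column` below stands for any IColumn pointer.

    /// Old interface: caller allocates, column fills in place.
    WeakHash32 hash(column->size());
    column->updateWeakHash32(hash);

    /// New interface: column allocates and returns the hash by value.
    WeakHash32 hash = column->getWeakHash32();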

View File

@@ -1,6 +1,7 @@
 #pragma once
 
 #include <Columns/IColumn.h>
+#include <Common/WeakHash.h>
 
 namespace DB

@@ -63,8 +64,9 @@ public:
     {
     }
 
-    void updateWeakHash32(WeakHash32 & /*hash*/) const override
+    WeakHash32 getWeakHash32() const override
     {
+        return WeakHash32(s);
     }
 
     void updateHashFast(SipHash & /*hash*/) const override

View File

@@ -1,6 +1,7 @@
 #pragma once
 
 #include <optional>
 #include <Columns/IColumn.h>
+#include <Common/WeakHash.h>
 
 namespace DB
 {

@@ -166,9 +167,9 @@ public:
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method scatter is not supported for ColumnUnique.");
     }
 
-    void updateWeakHash32(WeakHash32 &) const override
+    WeakHash32 getWeakHash32() const override
     {
-        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateWeakHash32 is not supported for ColumnUnique.");
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getWeakHash32 is not supported for ColumnUnique.");
     }
 
     void updateHashFast(SipHash &) const override

View File

@@ -60,8 +60,7 @@ TEST(WeakHash32, ColumnVectorU8)
         data.push_back(i);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -77,8 +76,7 @@ TEST(WeakHash32, ColumnVectorI8)
         data.push_back(i);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -94,8 +92,7 @@ TEST(WeakHash32, ColumnVectorU16)
         data.push_back(i);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -111,8 +108,7 @@ TEST(WeakHash32, ColumnVectorI16)
         data.push_back(i);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -128,8 +124,7 @@ TEST(WeakHash32, ColumnVectorU32)
         data.push_back(i << 16u);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -145,8 +140,7 @@ TEST(WeakHash32, ColumnVectorI32)
        data.push_back(i << 16);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -162,8 +156,7 @@ TEST(WeakHash32, ColumnVectorU64)
         data.push_back(i << 32u);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -179,8 +172,7 @@ TEST(WeakHash32, ColumnVectorI64)
         data.push_back(i << 32);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -204,8 +196,7 @@ TEST(WeakHash32, ColumnVectorU128)
         }
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), eq_data);
 }

@@ -221,8 +212,7 @@ TEST(WeakHash32, ColumnVectorI128)
         data.push_back(i << 32);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -238,8 +228,7 @@ TEST(WeakHash32, ColumnDecimal32)
         data.push_back(i << 16);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -255,8 +244,7 @@ TEST(WeakHash32, ColumnDecimal64)
         data.push_back(i << 32);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -272,8 +260,7 @@ TEST(WeakHash32, ColumnDecimal128)
         data.push_back(i << 32);
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), col->getData());
 }

@@ -294,8 +281,7 @@ TEST(WeakHash32, ColumnString1)
         }
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), data);
 }

@@ -331,8 +317,7 @@ TEST(WeakHash32, ColumnString2)
         }
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), data);
 }

@@ -369,8 +354,7 @@ TEST(WeakHash32, ColumnString3)
         }
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), data);
 }

@@ -397,8 +381,7 @@ TEST(WeakHash32, ColumnFixedString)
         }
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), data);
 }

@@ -444,8 +427,7 @@ TEST(WeakHash32, ColumnArray)
     auto col_arr = ColumnArray::create(std::move(val), std::move(off));
 
-    WeakHash32 hash(col_arr->size());
-    col_arr->updateWeakHash32(hash);
+    WeakHash32 hash = col_arr->getWeakHash32();
 
     checkColumn(hash.getData(), eq_data);
 }

@@ -479,8 +461,7 @@ TEST(WeakHash32, ColumnArray2)
     auto col_arr = ColumnArray::create(std::move(val), std::move(off));
 
-    WeakHash32 hash(col_arr->size());
-    col_arr->updateWeakHash32(hash);
+    WeakHash32 hash = col_arr->getWeakHash32();
 
     checkColumn(hash.getData(), eq_data);
 }

@@ -536,8 +517,7 @@ TEST(WeakHash32, ColumnArrayArray)
     auto col_arr = ColumnArray::create(std::move(val), std::move(off));
     auto col_arr_arr = ColumnArray::create(std::move(col_arr), std::move(off2));
 
-    WeakHash32 hash(col_arr_arr->size());
-    col_arr_arr->updateWeakHash32(hash);
+    WeakHash32 hash = col_arr_arr->getWeakHash32();
 
     checkColumn(hash.getData(), eq_data);
 }

@@ -555,8 +535,7 @@ TEST(WeakHash32, ColumnConst)
     auto col_const = ColumnConst::create(std::move(inner_col), 256);
 
-    WeakHash32 hash(col_const->size());
-    col_const->updateWeakHash32(hash);
+    WeakHash32 hash = col_const->getWeakHash32();
 
     checkColumn(hash.getData(), data);
 }

@@ -576,8 +555,7 @@ TEST(WeakHash32, ColumnLowcardinality)
         }
     }
 
-    WeakHash32 hash(col->size());
-    col->updateWeakHash32(hash);
+    WeakHash32 hash = col->getWeakHash32();
 
     checkColumn(hash.getData(), data);
 }

@@ -602,8 +580,7 @@ TEST(WeakHash32, ColumnNullable)
     auto col_null = ColumnNullable::create(std::move(col), std::move(mask));
 
-    WeakHash32 hash(col_null->size());
-    col_null->updateWeakHash32(hash);
+    WeakHash32 hash = col_null->getWeakHash32();
 
     checkColumn(hash.getData(), eq);
 }

@@ -633,8 +610,7 @@ TEST(WeakHash32, ColumnTupleUInt64UInt64)
     columns.emplace_back(std::move(col2));
     auto col_tuple = ColumnTuple::create(std::move(columns));
 
-    WeakHash32 hash(col_tuple->size());
-    col_tuple->updateWeakHash32(hash);
+    WeakHash32 hash = col_tuple->getWeakHash32();
 
     checkColumn(hash.getData(), eq);
 }

@@ -671,8 +647,7 @@ TEST(WeakHash32, ColumnTupleUInt64String)
     columns.emplace_back(std::move(col2));
     auto col_tuple = ColumnTuple::create(std::move(columns));
 
-    WeakHash32 hash(col_tuple->size());
-    col_tuple->updateWeakHash32(hash);
+    WeakHash32 hash = col_tuple->getWeakHash32();
 
     checkColumn(hash.getData(), eq);
 }

@@ -709,8 +684,7 @@ TEST(WeakHash32, ColumnTupleUInt64FixedString)
     columns.emplace_back(std::move(col2));
     auto col_tuple = ColumnTuple::create(std::move(columns));
 
-    WeakHash32 hash(col_tuple->size());
-    col_tuple->updateWeakHash32(hash);
+    WeakHash32 hash = col_tuple->getWeakHash32();
 
     checkColumn(hash.getData(), eq);
 }

@@ -756,8 +730,7 @@ TEST(WeakHash32, ColumnTupleUInt64Array)
     columns.emplace_back(ColumnArray::create(std::move(val), std::move(off)));
     auto col_tuple = ColumnTuple::create(std::move(columns));
 
-    WeakHash32 hash(col_tuple->size());
-    col_tuple->updateWeakHash32(hash);
+    WeakHash32 hash = col_tuple->getWeakHash32();
 
     checkColumn(hash.getData(), eq_data);
 }

View File

@@ -1,2 +1,24 @@
 #include <Common/WeakHash.h>
+#include <Common/Exception.h>
+#include <Common/HashTable/Hash.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+void WeakHash32::update(const WeakHash32 & other)
+{
+    size_t size = data.size();
+    if (size != other.data.size())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match:"
+            "left size is {}, right size is {}", size, other.data.size());
+
+    for (size_t i = 0; i < size; ++i)
+        data[i] = static_cast<UInt32>(intHashCRC32(other.data[i], data[i]));
+}
+
+}
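With the hash now returned by value, combining across columns becomes the caller's job via the new update(): each slot folds the other hash in with intHashCRC32, exactly as the implementation above does and as ColumnTuple::getWeakHash32 uses it. A sketch with hypothetical key columns:

    WeakHash32 hash = key_column_a->getWeakHash32();
    hash.update(key_column_b->getWeakHash32());   /// hash[i] = intHashCRC32(b_hash[i], hash[i])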

View File

@@ -11,9 +11,8 @@ namespace DB
 /// The main purpose why this class needed is to support data initialization. Initially, every bit is 1.
 class WeakHash32
 {
-    static constexpr UInt32 kDefaultInitialValue = ~UInt32(0);
-
 public:
+    static constexpr UInt32 kDefaultInitialValue = ~UInt32(0);
+
     using Container = PaddedPODArray<UInt32>;

@@ -22,6 +21,8 @@ public:
     void reset(size_t size, UInt32 initial_value = kDefaultInitialValue) { data.assign(size, initial_value); }
 
+    void update(const WeakHash32 & other);
+
     const Container & getData() const { return data; }
     Container & getData() { return data; }

View File

@@ -47,54 +47,85 @@ bool allArgumentsAreConstants(const ColumnsWithTypeAndName & args)
     return true;
 }
 
+/// Replaces single low cardinality column in a function call by its dictionary
+/// This can only happen after the arguments have been adapted in IFunctionOverloadResolver::getReturnType
+/// as it's only possible if there is one low cardinality column and, optionally, const columns
 ColumnPtr replaceLowCardinalityColumnsByNestedAndGetDictionaryIndexes(
     ColumnsWithTypeAndName & args, bool can_be_executed_on_default_arguments, size_t input_rows_count)
 {
-    size_t num_rows = input_rows_count;
+    /// We return the LC indexes so the LC can be reconstructed with the function result
     ColumnPtr indexes;
 
-    /// Find first LowCardinality column and replace it to nested dictionary.
-    for (auto & column : args)
+    size_t number_low_cardinality_columns = 0;
+    size_t last_low_cardinality = 0;
+    size_t number_const_columns = 0;
+    size_t number_full_columns = 0;
+    for (size_t i = 0; i < args.size(); i++)
     {
-        if (const auto * low_cardinality_column = checkAndGetColumn<ColumnLowCardinality>(column.column.get()))
+        auto const & arg = args[i];
+        if (checkAndGetColumn<ColumnLowCardinality>(arg.column.get()))
         {
-            /// Single LowCardinality column is supported now.
-            if (indexes)
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected single dictionary argument for function.");
-
-            const auto * low_cardinality_type = checkAndGetDataType<DataTypeLowCardinality>(column.type.get());
-            if (!low_cardinality_type)
-                throw Exception(ErrorCodes::LOGICAL_ERROR,
-                                "Incompatible type for LowCardinality column: {}",
-                                column.type->getName());
-
-            if (can_be_executed_on_default_arguments)
-            {
-                /// Normal case, when function can be executed on values' default.
-                column.column = low_cardinality_column->getDictionary().getNestedColumn();
-                indexes = low_cardinality_column->getIndexesPtr();
-            }
-            else
-            {
-                /// Special case when default value can't be used. Example: 1 % LowCardinality(Int).
-                /// LowCardinality always contains default, so 1 % 0 will throw exception in normal case.
-                auto dict_encoded = low_cardinality_column->getMinimalDictionaryEncodedColumn(0, low_cardinality_column->size());
-                column.column = dict_encoded.dictionary;
-                indexes = dict_encoded.indexes;
-            }
-
-            num_rows = column.column->size();
-            column.type = low_cardinality_type->getDictionaryType();
+            number_low_cardinality_columns++;
+            last_low_cardinality = i;
         }
+        else if (checkAndGetColumn<ColumnConst>(arg.column.get()))
+            number_const_columns++;
+        else
+            number_full_columns++;
     }
 
-    /// Change size of constants.
+    if (!number_low_cardinality_columns && !number_const_columns)
+        return nullptr;
+
+    if (number_full_columns > 0 || number_low_cardinality_columns > 1)
+    {
+        /// This should not be possible but currently there are multiple tests in CI failing because of it
+        /// TODO: Fix those cases, then enable this exception
+#if 0
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected low cardinality types found. Low cardinality: {}. Full {}. Const {}",
+                        number_low_cardinality_columns, number_full_columns, number_const_columns);
+#else
+        return nullptr;
+#endif
+    }
+    else if (number_low_cardinality_columns == 1)
+    {
+        auto & lc_arg = args[last_low_cardinality];
+
+        const auto * low_cardinality_type = checkAndGetDataType<DataTypeLowCardinality>(lc_arg.type.get());
+        if (!low_cardinality_type)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Incompatible type for LowCardinality column: {}", lc_arg.type->getName());
+
+        const auto * low_cardinality_column = checkAndGetColumn<ColumnLowCardinality>(lc_arg.column.get());
+        chassert(low_cardinality_column);
+
+        if (can_be_executed_on_default_arguments)
+        {
+            /// Normal case, when function can be executed on values' default.
+            lc_arg.column = low_cardinality_column->getDictionary().getNestedColumn();
+            indexes = low_cardinality_column->getIndexesPtr();
+        }
+        else
+        {
+            /// Special case when default value can't be used. Example: 1 % LowCardinality(Int).
+            /// LowCardinality always contains default, so 1 % 0 will throw exception in normal case.
+            auto dict_encoded = low_cardinality_column->getMinimalDictionaryEncodedColumn(0, low_cardinality_column->size());
+            lc_arg.column = dict_encoded.dictionary;
+            indexes = dict_encoded.indexes;
+        }
+
+        /// The new column will have a different number of rows, normally less but occasionally it might be more (NULL)
+        input_rows_count = lc_arg.column->size();
+        lc_arg.type = low_cardinality_type->getDictionaryType();
+    }
+
+    /// Change size of constants
     for (auto & column : args)
     {
         if (const auto * column_const = checkAndGetColumn<ColumnConst>(column.column.get()))
         {
-            column.column = ColumnConst::create(recursiveRemoveLowCardinality(column_const->getDataColumnPtr()), num_rows);
+            column.column = ColumnConst::create(recursiveRemoveLowCardinality(column_const->getDataColumnPtr()), input_rows_count);
             column.type = recursiveRemoveLowCardinality(column.type);
         }
     }

@@ -270,6 +301,8 @@ ColumnPtr IExecutableFunction::executeWithoutSparseColumns(const ColumnsWithType
         bool can_be_executed_on_default_arguments = canBeExecutedOnDefaultArguments();
         const auto & dictionary_type = res_low_cardinality_type->getDictionaryType();
 
+        /// The arguments should have been adapted in IFunctionOverloadResolver::getReturnType
+        /// So there is only one low cardinality column (and optionally some const columns) and no full column
         ColumnPtr indexes = replaceLowCardinalityColumnsByNestedAndGetDictionaryIndexes(
             columns_without_low_cardinality, can_be_executed_on_default_arguments, input_rows_count);

View File

@@ -310,7 +310,7 @@ IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const Strings & key_co
     {
         const auto & key_col = from_block.getByName(key_name).column->convertToFullColumnIfConst();
         const auto & key_col_no_lc = recursiveRemoveLowCardinality(recursiveRemoveSparse(key_col));
-        key_col_no_lc->updateWeakHash32(hash);
+        hash.update(key_col_no_lc->getWeakHash32());
    }
     return hashToSelector(hash, num_shards);
 }
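The dispatch path thus reads as: build a per-row hash across all key columns by folding each column's weak hash in, then reduce it to shard numbers. A schematic sketch (names hypothetical; hashToSelector's exact reduction is not shown in this diff):

    WeakHash32 hash(block.rows());
    for (const auto & key_name : key_column_names)
        hash.update(block.getByName(key_name).column->getWeakHash32());
    IColumn::Selector selector = hashToSelector(hash, num_shards);  /// roughly maps hash[i] into [0, num_shards)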

View File

@@ -554,7 +554,7 @@ static Blocks scatterBlockByHashImpl(const Strings & key_columns_names, const Bl
     for (const auto & key_name : key_columns_names)
     {
         ColumnPtr key_col = materializeColumn(block, key_name);
-        key_col->updateWeakHash32(hash);
+        hash.update(key_col->getWeakHash32());
     }
     auto selector = hashToSelector(hash, sharder);

View File

@@ -745,7 +745,12 @@ void addWithFillStepIfNeeded(QueryPlan & query_plan,
     {
         auto & interpolate_node_typed = interpolate_node->as<InterpolateNode &>();
 
-        PlannerActionsVisitor planner_actions_visitor(planner_context);
+        PlannerActionsVisitor planner_actions_visitor(
+            planner_context,
+            /* use_column_identifier_as_action_node_name_, (default value)*/ true,
+            /// Prefer the INPUT to CONSTANT nodes (actions must be non constant)
+            /* always_use_const_column_for_constant_nodes */ false);
+
         auto expression_to_interpolate_expression_nodes = planner_actions_visitor.visit(*interpolate_actions_dag,
             interpolate_node_typed.getExpression());
         if (expression_to_interpolate_expression_nodes.size() != 1)

View File

@@ -487,16 +487,33 @@ public:
         return node;
     }
 
-    const ActionsDAG::Node * addConstantIfNecessary(const std::string & node_name, const ColumnWithTypeAndName & column)
+    [[nodiscard]] String addConstantIfNecessary(
+        const std::string & node_name, const ColumnWithTypeAndName & column, bool always_use_const_column_for_constant_nodes)
     {
+        chassert(column.column != nullptr);
         auto it = node_name_to_node.find(node_name);
+        if (it != node_name_to_node.end() && (!always_use_const_column_for_constant_nodes || it->second->column))
+            return {node_name};
+
         if (it != node_name_to_node.end())
-            return it->second;
+        {
+            /// There is a node with this name, but it doesn't have a column
+            /// This likely happens because we executed the query until WithMergeableState with a const node in the
+            /// WHERE clause and, as the results of headers are materialized, the column was removed
+            /// Let's add a new column and keep this
+            String dupped_name{node_name + "_dupped"};
+            if (node_name_to_node.find(dupped_name) != node_name_to_node.end())
+                return dupped_name;
+
+            const auto * node = &actions_dag.addColumn(column);
+            node_name_to_node[dupped_name] = node;
+            return dupped_name;
+        }
 
         const auto * node = &actions_dag.addColumn(column);
         node_name_to_node[node->result_name] = node;
 
-        return node;
+        return {node_name};
     }
 
     template <typename FunctionOrOverloadResolver>

@@ -525,7 +542,7 @@ public:
     }
 
 private:
-    std::unordered_map<std::string_view, const ActionsDAG::Node *> node_name_to_node;
+    std::unordered_map<String, const ActionsDAG::Node *> node_name_to_node;
     ActionsDAG & actions_dag;
     QueryTreeNodePtr scope_node;
 };

@@ -533,9 +550,11 @@ private:
 class PlannerActionsVisitorImpl
 {
 public:
-    PlannerActionsVisitorImpl(ActionsDAG & actions_dag,
+    PlannerActionsVisitorImpl(
+        ActionsDAG & actions_dag,
         const PlannerContextPtr & planner_context_,
-        bool use_column_identifier_as_action_node_name_);
+        bool use_column_identifier_as_action_node_name_,
+        bool always_use_const_column_for_constant_nodes_);
 
     ActionsDAG::NodeRawConstPtrs visit(QueryTreeNodePtr expression_node);

@@ -595,14 +614,18 @@ private:
     const PlannerContextPtr planner_context;
     ActionNodeNameHelper action_node_name_helper;
     bool use_column_identifier_as_action_node_name;
+    bool always_use_const_column_for_constant_nodes;
 };
 
-PlannerActionsVisitorImpl::PlannerActionsVisitorImpl(ActionsDAG & actions_dag,
+PlannerActionsVisitorImpl::PlannerActionsVisitorImpl(
+    ActionsDAG & actions_dag,
     const PlannerContextPtr & planner_context_,
-    bool use_column_identifier_as_action_node_name_)
+    bool use_column_identifier_as_action_node_name_,
+    bool always_use_const_column_for_constant_nodes_)
     : planner_context(planner_context_)
     , action_node_name_helper(node_to_node_name, *planner_context, use_column_identifier_as_action_node_name_)
     , use_column_identifier_as_action_node_name(use_column_identifier_as_action_node_name_)
+    , always_use_const_column_for_constant_nodes(always_use_const_column_for_constant_nodes_)
 {
     actions_stack.emplace_back(actions_dag, nullptr);
 }

@@ -725,17 +748,16 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
     column.type = constant_type;
     column.column = column.type->createColumnConst(1, constant_literal);
 
-    actions_stack[0].addConstantIfNecessary(constant_node_name, column);
+    String final_name = actions_stack[0].addConstantIfNecessary(constant_node_name, column, always_use_const_column_for_constant_nodes);
 
     size_t actions_stack_size = actions_stack.size();
     for (size_t i = 1; i < actions_stack_size; ++i)
     {
         auto & actions_stack_node = actions_stack[i];
-        actions_stack_node.addInputConstantColumnIfNecessary(constant_node_name, column);
+        actions_stack_node.addInputConstantColumnIfNecessary(final_name, column);
     }
 
-    return {constant_node_name, Levels(0)};
+    return {final_name, Levels(0)};
 }
 
 PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::visitLambda(const QueryTreeNodePtr & node)

@@ -864,16 +886,16 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
     else
         column.column = std::move(column_set);
 
-    actions_stack[0].addConstantIfNecessary(column.name, column);
+    String final_name = actions_stack[0].addConstantIfNecessary(column.name, column, always_use_const_column_for_constant_nodes);
 
     size_t actions_stack_size = actions_stack.size();
     for (size_t i = 1; i < actions_stack_size; ++i)
     {
         auto & actions_stack_node = actions_stack[i];
-        actions_stack_node.addInputConstantColumnIfNecessary(column.name, column);
+        actions_stack_node.addInputConstantColumnIfNecessary(final_name, column);
     }
 
-    return {column.name, Levels(0)};
+    return {final_name, Levels(0)};
 }
 
 PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::visitIndexHintFunction(const QueryTreeNodePtr & node)

@@ -1010,14 +1032,19 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
 }
 
-PlannerActionsVisitor::PlannerActionsVisitor(const PlannerContextPtr & planner_context_, bool use_column_identifier_as_action_node_name_)
+PlannerActionsVisitor::PlannerActionsVisitor(
+    const PlannerContextPtr & planner_context_,
+    bool use_column_identifier_as_action_node_name_,
+    bool always_use_const_column_for_constant_nodes_)
     : planner_context(planner_context_)
     , use_column_identifier_as_action_node_name(use_column_identifier_as_action_node_name_)
+    , always_use_const_column_for_constant_nodes(always_use_const_column_for_constant_nodes_)
 {}
 
 ActionsDAG::NodeRawConstPtrs PlannerActionsVisitor::visit(ActionsDAG & actions_dag, QueryTreeNodePtr expression_node)
 {
-    PlannerActionsVisitorImpl actions_visitor_impl(actions_dag, planner_context, use_column_identifier_as_action_node_name);
+    PlannerActionsVisitorImpl actions_visitor_impl(
+        actions_dag, planner_context, use_column_identifier_as_action_node_name, always_use_const_column_for_constant_nodes);
     return actions_visitor_impl.visit(expression_node);
 }

View File

@ -27,11 +27,17 @@ using PlannerContextPtr = std::shared_ptr<PlannerContext>;
* During actions build, there is special handling for following functions: * During actions build, there is special handling for following functions:
* 1. Aggregate functions are added in actions dag as INPUT nodes. Aggregate functions arguments are not added. * 1. Aggregate functions are added in actions dag as INPUT nodes. Aggregate functions arguments are not added.
* 2. For function `in` and its variants, already collected sets from planner context are used. * 2. For function `in` and its variants, already collected sets from planner context are used.
* 3. When building actions that use CONSTANT nodes, by default we ignore pre-existing INPUTs if those don't have
* a column (a const column always has a column). This is for compatibility with previous headers. We disable this
* behaviour when we explicitly want to override CONSTANT nodes with the input (resolving InterpolateNode for example)
*/ */
class PlannerActionsVisitor class PlannerActionsVisitor
{ {
public: public:
explicit PlannerActionsVisitor(const PlannerContextPtr & planner_context_, bool use_column_identifier_as_action_node_name_ = true); explicit PlannerActionsVisitor(
const PlannerContextPtr & planner_context_,
bool use_column_identifier_as_action_node_name_ = true,
bool always_use_const_column_for_constant_nodes_ = true);
/** Add actions necessary to calculate expression node into expression dag. /** Add actions necessary to calculate expression node into expression dag.
* Necessary actions are not added in actions dag output. * Necessary actions are not added in actions dag output.
@ -42,6 +48,7 @@ public:
private: private:
const PlannerContextPtr planner_context; const PlannerContextPtr planner_context;
bool use_column_identifier_as_action_node_name = true; bool use_column_identifier_as_action_node_name = true;
bool always_use_const_column_for_constant_nodes = true;
}; };
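To illustrate point 3 of the comment above, here is a minimal, hypothetical call-site sketch (not part of this commit); it assumes only the PlannerActionsVisitor constructor and visit() signatures shown in this diff, and that planner_context, an ActionsDAG, and an interpolate expression node are in scope. Passing false for the new flag lets a constant in the expression be resolved against a pre-existing INPUT node instead of always being folded into a const column, e.g. when resolving an InterpolateNode.

// Hypothetical usage sketch; only the signatures visible in this diff are assumed.
ActionsDAG interpolate_actions_dag;
PlannerActionsVisitor interpolate_actions_visitor(
    planner_context,
    /*use_column_identifier_as_action_node_name_=*/ true,
    /*always_use_const_column_for_constant_nodes_=*/ false);
// Builds the actions for the expression; constants may now be overridden by inputs.
auto expression_nodes = interpolate_actions_visitor.visit(interpolate_actions_dag, interpolate_expression_node);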
/** Calculate query tree expression node action dag name and add them into node to name map. /** Calculate query tree expression node action dag name and add them into node to name map.

View File

@ -109,7 +109,7 @@ void ScatterByPartitionTransform::generateOutputChunks()
hash.reset(num_rows); hash.reset(num_rows);
for (const auto & column_number : key_columns) for (const auto & column_number : key_columns)
columns[column_number]->updateWeakHash32(hash); hash.update(columns[column_number]->getWeakHash32());
const auto & hash_data = hash.getData(); const auto & hash_data = hash.getData();
IColumn::Selector selector(num_rows); IColumn::Selector selector(num_rows);
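For context, a minimal sketch of the reworked hashing API used here, assuming only getWeakHash32(), WeakHash32::update(), and getData() as they appear in this diff; the bucket rule at the end is illustrative, not the transform's actual formula. num_rows, key_columns, columns, and num_partitions are assumed to be in scope, as in the transform above.

// Sketch only: combine per-key-column weak hashes, then derive a partition selector.
WeakHash32 hash(num_rows);
for (const auto & column_number : key_columns)
    hash.update(columns[column_number]->getWeakHash32());

const auto & hash_data = hash.getData();
IColumn::Selector selector(num_rows);
for (size_t row = 0; row < num_rows; ++row)
    selector[row] = hash_data[row] % num_partitions; // illustrative bucket rule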

View File

@ -1292,6 +1292,7 @@ def main() -> int:
pass pass
if Utils.is_killed_with_oom(): if Utils.is_killed_with_oom():
print("WARNING: OOM while job execution") print("WARNING: OOM while job execution")
print(subprocess.run("sudo dmesg -T", shell=True, check=False))
error_description = f"Out Of Memory, exit_code {job_report.exit_code}" error_description = f"Out Of Memory, exit_code {job_report.exit_code}"
else: else:
error_description = f"Unknown, exit_code {job_report.exit_code}" error_description = f"Unknown, exit_code {job_report.exit_code}"

View File

@ -2220,13 +2220,11 @@ def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
def test_rabbitmq_no_connection_at_startup_1(rabbitmq_cluster): def test_rabbitmq_no_connection_at_startup_1(rabbitmq_cluster):
# no connection when table is initialized error = instance.query_and_get_error(
rabbitmq_cluster.pause_container("rabbitmq1")
instance.query_and_get_error(
""" """
CREATE TABLE test.cs (key UInt64, value UInt64) CREATE TABLE test.cs (key UInt64, value UInt64)
ENGINE = RabbitMQ ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', SETTINGS rabbitmq_host_port = 'no_connection_at_startup:5672',
rabbitmq_exchange_name = 'cs', rabbitmq_exchange_name = 'cs',
rabbitmq_format = 'JSONEachRow', rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000, rabbitmq_flush_interval_ms=1000,
@ -2234,7 +2232,7 @@ def test_rabbitmq_no_connection_at_startup_1(rabbitmq_cluster):
rabbitmq_row_delimiter = '\\n'; rabbitmq_row_delimiter = '\\n';
""" """
) )
rabbitmq_cluster.unpause_container("rabbitmq1") assert "CANNOT_CONNECT_RABBITMQ" in error
def test_rabbitmq_no_connection_at_startup_2(rabbitmq_cluster): def test_rabbitmq_no_connection_at_startup_2(rabbitmq_cluster):

View File

@ -8,13 +8,13 @@
40 40
41 41
0 41
2 42 2 42
2 42 2 42
43 43
0 43
11 11
11 11

View File

@ -1,3 +1,3 @@
Parquet Parquet
e76a749f346078a6a43e0cbd25f0d18a - 3249508141921544766
400 400

View File

@ -1,5 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest # Tags: long, no-ubsan, no-fasttest, no-parallel, no-asan, no-msan, no-tsan
# This test requires around 10 GB of memory and it is just too much.
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
@ -121,9 +122,12 @@ echo "Parquet"
#} #}
DATA_FILE=$CUR_DIR/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet DATA_FILE=$CUR_DIR/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (ints Array(Int64), strings Nullable(String)) ENGINE = Memory" ${CLICKHOUSE_LOCAL} --multiquery "
cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet" DROP TABLE IF EXISTS parquet_load;
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum CREATE TABLE parquet_load (ints Array(Int64), strings Nullable(String)) ENGINE = Memory;
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load" INSERT INTO parquet_load FROM INFILE '$DATA_FILE';
${CLICKHOUSE_CLIENT} --query="drop table parquet_load" SELECT sum(cityHash64(*)) FROM parquet_load;
SELECT count() FROM parquet_load;
DROP TABLE parquet_load;
"

View File

@ -1,3 +1,6 @@
-- Tags: no-random-merge-tree-settings, no-random-settings
-- Because we compare part sizes, and they could be affected by index granularity and index compression settings.
CREATE TABLE part_log_bytes_uncompressed ( CREATE TABLE part_log_bytes_uncompressed (
key UInt8, key UInt8,
value UInt8 value UInt8
@ -17,7 +20,8 @@ ALTER TABLE part_log_bytes_uncompressed DROP PART 'all_4_4_0' SETTINGS mutations
SYSTEM FLUSH LOGS; SYSTEM FLUSH LOGS;
SELECT event_type, table, part_name, bytes_uncompressed > 0, size_in_bytes < bytes_uncompressed FROM system.part_log SELECT event_type, table, part_name, bytes_uncompressed > 0, (bytes_uncompressed > 0 ? (size_in_bytes < bytes_uncompressed ? '1' : toString((size_in_bytes, bytes_uncompressed))) : '0')
FROM system.part_log
WHERE event_date >= yesterday() AND database = currentDatabase() AND table = 'part_log_bytes_uncompressed' WHERE event_date >= yesterday() AND database = currentDatabase() AND table = 'part_log_bytes_uncompressed'
AND (event_type != 'RemovePart' OR part_name = 'all_4_4_0') -- ignore removal of other parts AND (event_type != 'RemovePart' OR part_name = 'all_4_4_0') -- ignore removal of other parts
ORDER BY part_name, event_type; ORDER BY part_name, event_type;

View File

@ -1,10 +0,0 @@
before
rmt_master NewPart 0 1
rmt_master MergeParts 0 1
rmt_slave MergeParts 1 0
rmt_slave DownloadPart 0 1
after
rmt_master NewPart 0 1
rmt_master MergeParts 0 1
rmt_slave MergeParts 1 0
rmt_slave DownloadPart 0 2

View File

@ -1,35 +0,0 @@
-- Tags: no-replicated-database, no-parallel, no-shared-merge-tree
-- SMT: The merge process is completely different from RMT
drop table if exists rmt_master;
drop table if exists rmt_slave;
create table rmt_master (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'master') order by key settings always_fetch_merged_part=0;
-- always_fetch_merged_part=1, consider this table as a "slave"
create table rmt_slave (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'slave') order by key settings always_fetch_merged_part=1;
insert into rmt_master values (1);
system sync replica rmt_master;
system sync replica rmt_slave;
system stop replicated sends rmt_master;
optimize table rmt_master final settings alter_sync=1, optimize_throw_if_noop=1;
select sleep(3) format Null;
system flush logs;
select 'before';
select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
system start replicated sends rmt_master;
-- sleep a few seconds to let rmt_slave fetch the part and reflect this error
-- in system.part_log
select sleep(3) format Null;
system sync replica rmt_slave;
system flush logs;
select 'after';
select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
drop table rmt_master;
drop table rmt_slave;

View File

@ -1,10 +0,0 @@
before
rmt_master NewPart 0 1
rmt_master MutatePart 0 1
rmt_slave DownloadPart 0 1
rmt_slave MutatePart 1 0
after
rmt_master NewPart 0 1
rmt_master MutatePart 0 1
rmt_slave DownloadPart 0 2
rmt_slave MutatePart 1 0

View File

@ -1,41 +0,0 @@
-- Tags: no-replicated-database, no-parallel, no-shared-merge-tree
-- SMT: The merge process is completely different from RMT
drop table if exists rmt_master;
drop table if exists rmt_slave;
create table rmt_master (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'master') order by tuple() settings always_fetch_merged_part=0, old_parts_lifetime=600;
-- prefer_fetch_merged_part_*_threshold=0, consider this table as a "slave"
create table rmt_slave (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'slave') order by tuple() settings prefer_fetch_merged_part_time_threshold=0, prefer_fetch_merged_part_size_threshold=0, old_parts_lifetime=600;
insert into rmt_master values (1);
system sync replica rmt_master;
system sync replica rmt_slave;
system stop replicated sends rmt_master;
system stop pulling replication log rmt_slave;
alter table rmt_master update key=key+100 where 1 settings alter_sync=1;
-- first we need to make rmt_master execute the mutation so that it will have
-- the part, and rmt_slave will consider it instead of performing the mutation on
-- its own, otherwise prefer_fetch_merged_part_*_threshold will simply be ignored
select sleep(3) format Null;
system start pulling replication log rmt_slave;
-- and sleep a few more seconds to let rmt_slave fetch the part and reflect
-- this error in system.part_log
select sleep(3) format Null;
system flush logs;
select 'before';
select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
system start replicated sends rmt_master;
select sleep(3) format Null;
system sync replica rmt_slave;
system flush logs;
select 'after';
select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
drop table rmt_master;
drop table rmt_slave;

View File

@ -0,0 +1,55 @@
-- { echoOn }
SELECT concatWithSeparator('.', toUInt128(6), '666' as b, materialize(toLowCardinality(8)))
FROM system.one
GROUP BY '666';
6.666.8
SELECT concatWithSeparator('.', toUInt128(6), '666' as b, materialize(toLowCardinality(8)))
FROM remote('127.0.0.{1,1}', 'system.one')
GROUP BY '666';
6.666.8
-- https://github.com/ClickHouse/ClickHouse/issues/63006
SELECT
6,
concat(' World', toUInt128(6), 6, 6, 6, toNullable(6), materialize(toLowCardinality(toNullable(toUInt128(6))))) AS a,
concat(concat(' World', 6, toLowCardinality(6), ' World', toUInt256(6), materialize(6), 6, toNullable(6), 6, 6, NULL, 6, 6), ' World', 6, 6, 6, 6, toUInt256(6), NULL, 6, 6) AS b
FROM system.one
GROUP BY toNullable(6)
WITH ROLLUP
WITH TOTALS;
6 World666666 \N
6 World666666 \N
6 World666666 \N
SELECT
6,
concat(' World', toUInt128(6), 6, 6, 6, toNullable(6), materialize(toLowCardinality(toNullable(toUInt128(6))))) AS a,
concat(concat(' World', 6, toLowCardinality(6), ' World', toUInt256(6), materialize(6), 6, toNullable(6), 6, 6, NULL, 6, 6), ' World', 6, 6, 6, 6, toUInt256(6), NULL, 6, 6) AS b
FROM remote('127.0.0.1')
GROUP BY toNullable(6)
WITH ROLLUP
WITH TOTALS;
6 World666666 \N
6 World666666 \N
6 World666666 \N
-- { echoOn }
SELECT
'%',
tuple(concat('%', 1, toLowCardinality(toLowCardinality(toNullable(materialize(1)))), currentDatabase(), 101., toNullable(13), '%AS%id_02%', toNullable(toNullable(10)), toLowCardinality(toNullable(10)), 10, 10)),
(toDecimal128(99.67, 6), 36, 61, 14)
FROM dist_03174
WHERE dummy IN (0, '255')
GROUP BY
toNullable(13),
(99.67, 61, toLowCardinality(14));
% ('%11default10113%AS%id_02%10101010') (99.67,36,61,14)
-- { echoOn }
SELECT
38,
concat(position(concat(concat(position(concat(toUInt256(3)), 'ca', 2), 3), NULLIF(1, materialize(toLowCardinality(1)))), toLowCardinality(toNullable('ca'))), concat(NULLIF(1, 1), concat(3), toNullable(3)))
FROM set_index_not__fuzz_0
GROUP BY
toNullable(3),
concat(concat(CAST(NULL, 'Nullable(Int8)'), toNullable(3)))
FORMAT Null
SETTINGS max_threads = 1, allow_experimental_analyzer = 1, cluster_for_parallel_replicas = 'parallel_replicas', max_parallel_replicas = 3, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_threads = 1;

View File

@ -0,0 +1,80 @@
-- There are various tests that check that group by keys don't propagate into functions replacing const arguments
-- with full (empty) columns
DROP TABLE IF EXISTS dist_03174;
DROP TABLE IF EXISTS set_index_not__fuzz_0;
-- https://github.com/ClickHouse/ClickHouse/issues/63006
SET allow_experimental_analyzer=1;
-- { echoOn }
SELECT concatWithSeparator('.', toUInt128(6), '666' as b, materialize(toLowCardinality(8)))
FROM system.one
GROUP BY '666';
SELECT concatWithSeparator('.', toUInt128(6), '666' as b, materialize(toLowCardinality(8)))
FROM remote('127.0.0.{1,1}', 'system.one')
GROUP BY '666';
-- https://github.com/ClickHouse/ClickHouse/issues/63006
SELECT
6,
concat(' World', toUInt128(6), 6, 6, 6, toNullable(6), materialize(toLowCardinality(toNullable(toUInt128(6))))) AS a,
concat(concat(' World', 6, toLowCardinality(6), ' World', toUInt256(6), materialize(6), 6, toNullable(6), 6, 6, NULL, 6, 6), ' World', 6, 6, 6, 6, toUInt256(6), NULL, 6, 6) AS b
FROM system.one
GROUP BY toNullable(6)
WITH ROLLUP
WITH TOTALS;
SELECT
6,
concat(' World', toUInt128(6), 6, 6, 6, toNullable(6), materialize(toLowCardinality(toNullable(toUInt128(6))))) AS a,
concat(concat(' World', 6, toLowCardinality(6), ' World', toUInt256(6), materialize(6), 6, toNullable(6), 6, 6, NULL, 6, 6), ' World', 6, 6, 6, 6, toUInt256(6), NULL, 6, 6) AS b
FROM remote('127.0.0.1')
GROUP BY toNullable(6)
WITH ROLLUP
WITH TOTALS;
-- https://github.com/ClickHouse/ClickHouse/issues/64945
-- { echoOff }
CREATE TABLE dist_03174 AS system.one ENGINE = Distributed(test_cluster_two_shards, system, one, dummy);
-- { echoOn }
SELECT
'%',
tuple(concat('%', 1, toLowCardinality(toLowCardinality(toNullable(materialize(1)))), currentDatabase(), 101., toNullable(13), '%AS%id_02%', toNullable(toNullable(10)), toLowCardinality(toNullable(10)), 10, 10)),
(toDecimal128(99.67, 6), 36, 61, 14)
FROM dist_03174
WHERE dummy IN (0, '255')
GROUP BY
toNullable(13),
(99.67, 61, toLowCardinality(14));
-- Parallel replicas
-- { echoOff }
CREATE TABLE set_index_not__fuzz_0
(
`name` String,
`status` Enum8('alive' = 0, 'rip' = 1),
INDEX idx_status status TYPE set(2) GRANULARITY 1
)
ENGINE = MergeTree()
ORDER BY name;
INSERT INTO set_index_not__fuzz_0 SELECT * FROM generateRandom() LIMIT 10;
-- { echoOn }
SELECT
38,
concat(position(concat(concat(position(concat(toUInt256(3)), 'ca', 2), 3), NULLIF(1, materialize(toLowCardinality(1)))), toLowCardinality(toNullable('ca'))), concat(NULLIF(1, 1), concat(3), toNullable(3)))
FROM set_index_not__fuzz_0
GROUP BY
toNullable(3),
concat(concat(CAST(NULL, 'Nullable(Int8)'), toNullable(3)))
FORMAT Null
SETTINGS max_threads = 1, allow_experimental_analyzer = 1, cluster_for_parallel_replicas = 'parallel_replicas', max_parallel_replicas = 3, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_threads = 1;
-- { echoOff }
DROP TABLE IF EXISTS dist_03174;
DROP TABLE IF EXISTS set_index_not__fuzz_0;

View File

@ -0,0 +1,10 @@
false 1 1
true 1 1
---
false 1 1
false 1 2
false 1 3
true 1 1
true 1 2
---
-755809149 0

View File

@ -0,0 +1,33 @@
create table t(c Int32, d Bool) Engine=MergeTree order by c;
system stop merges t;
insert into t values (1, 0);
insert into t values (1, 0);
insert into t values (1, 1);
insert into t values (1, 0)(1, 1);
SELECT d, c, row_number() over (partition by d order by c) as c8 FROM t qualify c8=1 order by d settings max_threads=2, allow_experimental_analyzer = 1;
SELECT '---';
SELECT d, c, row_number() over (partition by d order by c) as c8 FROM t order by d, c8 settings max_threads=2;
SELECT '---';
drop table t;
create table t (
c Int32 primary key ,
s Bool ,
w Float64
);
system stop merges t;
insert into t values(439499072,true,0),(1393290072,true,0);
insert into t values(-1317193174,false,0),(1929066636,false,0);
insert into t values(-2,false,0),(1962246186,true,0),(2054878592,false,0);
insert into t values(-1893563136,true,41.55);
insert into t values(-1338380855,true,-0.7),(-991301833,true,0),(-755809149,false,43.18),(-41,true,0),(3,false,0),(255,false,0),(255,false,0),(189195893,false,0),(195550885,false,9223372036854776000);
SELECT * FROM (
SELECT c, min(w) OVER (PARTITION BY s ORDER BY c ASC, s ASC, w ASC)
FROM t limit toUInt64(-1))
WHERE c = -755809149;