Merge branch 'master' into fix/binlog_checksum

This commit is contained in:
Winter Zhang 2020-11-16 21:53:22 +08:00 committed by GitHub
commit f6a205d64b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 570 additions and 218 deletions

View File

@ -5,6 +5,9 @@
/// (See at http://www.boost.org/LICENSE_1_0.txt)
#include "throwError.h"
#include <cfloat>
#include <limits>
#include <cassert>
namespace wide
{
@ -192,7 +195,7 @@ struct integer<Bits, Signed>::_impl
}
template <typename T>
constexpr static auto to_Integral(T f) noexcept
__attribute__((no_sanitize("undefined"))) constexpr static auto to_Integral(T f) noexcept
{
if constexpr (std::is_same_v<T, __int128>)
return f;
@ -225,25 +228,54 @@ struct integer<Bits, Signed>::_impl
self.items[i] = 0;
}
constexpr static void wide_integer_from_bultin(integer<Bits, Signed> & self, double rhs) noexcept
{
if ((rhs > 0 && rhs < std::numeric_limits<uint64_t>::max()) || (rhs < 0 && rhs > std::numeric_limits<int64_t>::min()))
/**
* N.B. t is constructed from double, so max(t) = max(double) ~ 2^310
* the recursive call happens when t / 2^64 > 2^64, so there won't be more than 5 of them.
*
* t = a1 * max_int + b1, a1 > max_int, b1 < max_int
* a1 = a2 * max_int + b2, a2 > max_int, b2 < max_int
* a_(n - 1) = a_n * max_int + b2, a_n <= max_int <- base case.
*/
template <class T>
constexpr static void set_multiplier(integer<Bits, Signed> & self, T t) noexcept {
constexpr uint64_t max_int = std::numeric_limits<uint64_t>::max();
const T alpha = t / max_int;
if (alpha <= max_int)
self = static_cast<uint64_t>(alpha);
else // max(double) / 2^64 will surely contain less than 52 precision bits, so speed up computations.
set_multiplier<double>(self, alpha);
self *= max_int;
self += static_cast<uint64_t>(t - alpha * max_int); // += b_i
}
constexpr static void wide_integer_from_bultin(integer<Bits, Signed>& self, double rhs) noexcept {
constexpr int64_t max_int = std::numeric_limits<int64_t>::max();
constexpr int64_t min_int = std::numeric_limits<int64_t>::min();
/// There are values in int64 that have more than 53 significant bits (in terms of double
/// representation). Such values, being promoted to double, are rounded up or down. If they are rounded up,
/// the result may not fit in 64 bits.
/// The example of such a number is 9.22337e+18.
/// As to_Integral does a static_cast to int64_t, it may result in UB.
/// The necessary check here is that long double has enough significant (mantissa) bits to store the
/// int64_t max value precisely.
static_assert(LDBL_MANT_DIG >= 64,
"On your system long double has less than 64 precision bits,"
"which may result in UB when initializing double from int64_t");
if ((rhs > 0 && rhs < max_int) || (rhs < 0 && rhs > min_int))
{
self = to_Integral(rhs);
self = static_cast<int64_t>(rhs);
return;
}
long double r = rhs;
if (r < 0)
r = -r;
const long double rhs_long_double = (static_cast<long double>(rhs) < 0)
? -static_cast<long double>(rhs)
: rhs;
size_t count = r / std::numeric_limits<uint64_t>::max();
self = count;
self *= std::numeric_limits<uint64_t>::max();
long double to_diff = count;
to_diff *= std::numeric_limits<uint64_t>::max();
self += to_Integral(r - to_diff);
set_multiplier(self, rhs_long_double);
if (rhs < 0)
self = -self;

View File

@ -29,7 +29,7 @@ def dowload_with_progress(url, path):
logging.info("Downloading from %s to temp path %s", url, path)
for i in range(RETRIES_COUNT):
try:
with open(path, 'w') as f:
with open(path, 'wb') as f:
response = requests.get(url, stream=True)
response.raise_for_status()
total_length = response.headers.get('content-length')

View File

@ -21,7 +21,7 @@ mkdocs-htmlproofer-plugin==0.0.3
mkdocs-macros-plugin==0.4.20
nltk==3.5
nose==1.3.7
protobuf==3.13.0
protobuf==3.14.0
numpy==1.19.2
Pygments==2.5.2
pymdown-extensions==8.0

View File

@ -8,7 +8,7 @@ namespace DB
{
AggregateFunctionPtr AggregateFunctionCount::getOwnNullAdapter(
const AggregateFunctionPtr &, const DataTypes & types, const Array & params) const
const AggregateFunctionPtr &, const DataTypes & types, const Array & params, const AggregateFunctionProperties & /*properties*/) const
{
return std::make_shared<AggregateFunctionCountNotNullUnary>(types[0], params);
}

View File

@ -69,7 +69,7 @@ public:
}
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr &, const DataTypes & types, const Array & params) const override;
const AggregateFunctionPtr &, const DataTypes & types, const Array & params, const AggregateFunctionProperties & /*properties*/) const override;
};

View File

@ -1,6 +1,7 @@
#include <AggregateFunctions/AggregateFunctionIf.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
#include "AggregateFunctionNull.h"
namespace DB
@ -8,6 +9,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
@ -40,6 +42,164 @@ public:
}
};
/** There are two cases: for single argument and variadic.
* Code for single argument is much more efficient.
*/
template <bool result_is_nullable, bool serialize_flag>
class AggregateFunctionIfNullUnary final
: public AggregateFunctionNullBase<result_is_nullable, serialize_flag,
AggregateFunctionIfNullUnary<result_is_nullable, serialize_flag>>
{
private:
size_t num_arguments;
using Base = AggregateFunctionNullBase<result_is_nullable, serialize_flag,
AggregateFunctionIfNullUnary<result_is_nullable, serialize_flag>>;
public:
String getName() const override
{
return Base::getName();
}
AggregateFunctionIfNullUnary(AggregateFunctionPtr nested_function_, const DataTypes & arguments, const Array & params)
: Base(std::move(nested_function_), arguments, params), num_arguments(arguments.size())
{
if (num_arguments == 0)
throw Exception("Aggregate function " + getName() + " require at least one argument",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
static inline bool singleFilter(const IColumn ** columns, size_t row_num, size_t num_arguments)
{
const IColumn * filter_column = columns[num_arguments - 1];
if (const ColumnNullable * nullable_column = typeid_cast<const ColumnNullable *>(filter_column))
filter_column = nullable_column->getNestedColumnPtr().get();
return assert_cast<const ColumnUInt8 &>(*filter_column).getData()[row_num];
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const IColumn * nested_column = &column->getNestedColumn();
if (!column->isNullAt(row_num) && singleFilter(columns, row_num, num_arguments))
{
this->setFlag(place);
this->nested_function->add(this->nestedPlace(place), &nested_column, row_num, arena);
}
}
};
template <bool result_is_nullable, bool serialize_flag, bool null_is_skipped>
class AggregateFunctionIfNullVariadic final
: public AggregateFunctionNullBase<result_is_nullable, serialize_flag,
AggregateFunctionIfNullVariadic<result_is_nullable, serialize_flag, null_is_skipped>>
{
public:
String getName() const override
{
return Base::getName();
}
AggregateFunctionIfNullVariadic(AggregateFunctionPtr nested_function_, const DataTypes & arguments, const Array & params)
: Base(std::move(nested_function_), arguments, params), number_of_arguments(arguments.size())
{
if (number_of_arguments == 1)
throw Exception("Logical error: single argument is passed to AggregateFunctionIfNullVariadic", ErrorCodes::LOGICAL_ERROR);
if (number_of_arguments > MAX_ARGS)
throw Exception("Maximum number of arguments for aggregate function with Nullable types is " + toString(size_t(MAX_ARGS)),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < number_of_arguments; ++i)
is_nullable[i] = arguments[i]->isNullable();
}
static inline bool singleFilter(const IColumn ** columns, size_t row_num, size_t num_arguments)
{
return assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData()[row_num];
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
/// This container stores the columns we really pass to the nested function.
const IColumn * nested_columns[number_of_arguments];
for (size_t i = 0; i < number_of_arguments; ++i)
{
if (is_nullable[i])
{
const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]);
if (null_is_skipped && nullable_col.isNullAt(row_num))
{
/// If at least one column has a null value in the current row,
/// we don't process this row.
return;
}
nested_columns[i] = &nullable_col.getNestedColumn();
}
else
nested_columns[i] = columns[i];
}
if (singleFilter(nested_columns, row_num, number_of_arguments))
{
this->setFlag(place);
this->nested_function->add(this->nestedPlace(place), nested_columns, row_num, arena);
}
}
private:
using Base = AggregateFunctionNullBase<result_is_nullable, serialize_flag,
AggregateFunctionIfNullVariadic<result_is_nullable, serialize_flag, null_is_skipped>>;
enum { MAX_ARGS = 8 };
size_t number_of_arguments = 0;
std::array<char, MAX_ARGS> is_nullable; /// Plain array is better than std::vector due to one indirection less.
};
AggregateFunctionPtr AggregateFunctionIf::getOwnNullAdapter(
const AggregateFunctionPtr & nested_function, const DataTypes & arguments,
const Array & params, const AggregateFunctionProperties & properties) const
{
bool return_type_is_nullable = !properties.returns_default_when_only_null && getReturnType()->canBeInsideNullable();
size_t nullable_size = std::count_if(arguments.begin(), arguments.end(), [](const auto & element) { return element->isNullable(); });
return_type_is_nullable &= nullable_size != 1 || !arguments.back()->isNullable(); /// If only condition is nullable. we should non-nullable type.
bool serialize_flag = return_type_is_nullable || properties.returns_default_when_only_null;
if (arguments.size() <= 2 && arguments.front()->isNullable())
{
if (return_type_is_nullable)
{
return std::make_shared<AggregateFunctionIfNullUnary<true, true>>(nested_func, arguments, params);
}
else
{
if (serialize_flag)
return std::make_shared<AggregateFunctionIfNullUnary<false, true>>(nested_func, arguments, params);
else
return std::make_shared<AggregateFunctionIfNullUnary<false, false>>(nested_func, arguments, params);
}
}
else
{
if (return_type_is_nullable)
{
return std::make_shared<AggregateFunctionIfNullVariadic<true, true, true>>(nested_function, arguments, params);
}
else
{
if (serialize_flag)
return std::make_shared<AggregateFunctionIfNullVariadic<false, true, true>>(nested_function, arguments, params);
else
return std::make_shared<AggregateFunctionIfNullVariadic<false, false, true>>(nested_function, arguments, params);
}
}
}
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory & factory)
{
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorIf>());

View File

@ -109,6 +109,10 @@ public:
{
return nested_func->isState();
}
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function, const DataTypes & arguments,
const Array & params, const AggregateFunctionProperties & properties) const override;
};
}

View File

@ -72,7 +72,7 @@ public:
assert(nested_function);
if (auto adapter = nested_function->getOwnNullAdapter(nested_function, arguments, params))
if (auto adapter = nested_function->getOwnNullAdapter(nested_function, arguments, params, properties))
return adapter;
/// If applied to aggregate function with -State combinator, we apply -Null combinator to it's nested_function instead of itself.

View File

@ -239,7 +239,8 @@ public:
}
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array & params) const override
const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array & params,
const AggregateFunctionProperties & /*properties*/) const override
{
return std::make_shared<AggregateFunctionNullVariadic<false, false, false>>(nested_function, arguments, params);
}

View File

@ -33,6 +33,7 @@ using ConstAggregateDataPtr = const char *;
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
struct AggregateFunctionProperties;
/** Aggregate functions interface.
* Instances of classes with this interface do not contain the data itself for aggregation,
@ -185,7 +186,8 @@ public:
* arguments and params are for nested_function.
*/
virtual AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & /*nested_function*/, const DataTypes & /*arguments*/, const Array & /*params*/) const
const AggregateFunctionPtr & /*nested_function*/, const DataTypes & /*arguments*/,
const Array & /*params*/, const AggregateFunctionProperties & /*properties*/) const
{
return nullptr;
}

View File

@ -55,32 +55,16 @@ void ColumnDecimal<T>::compareColumn(const IColumn & rhs, size_t rhs_row_num,
template <typename T>
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
if constexpr (is_POD)
{
auto * pos = arena.allocContinue(sizeof(T), begin);
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
}
else
{
char * pos = arena.allocContinue(BigInt<T>::size, begin);
return BigInt<Int256>::serialize(data[n], pos);
}
auto * pos = arena.allocContinue(sizeof(T), begin);
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
}
template <typename T>
const char * ColumnDecimal<T>::deserializeAndInsertFromArena(const char * pos)
{
if constexpr (is_POD)
{
data.push_back(unalignedLoad<T>(pos));
return pos + sizeof(T);
}
else
{
data.push_back(BigInt<Int256>::deserialize(pos));
return pos + BigInt<Int256>::size;
}
data.push_back(unalignedLoad<T>(pos));
return pos + sizeof(T);
}
template <typename T>
@ -252,24 +236,13 @@ MutableColumnPtr ColumnDecimal<T>::cloneResized(size_t size) const
new_col.data.resize(size);
size_t count = std::min(this->size(), size);
if constexpr (is_POD)
{
memcpy(new_col.data.data(), data.data(), count * sizeof(data[0]));
if (size > count)
{
void * tail = &new_col.data[count];
memset(tail, 0, (size - count) * sizeof(T));
}
}
else
{
for (size_t i = 0; i < count; i++)
new_col.data[i] = data[i];
memcpy(new_col.data.data(), data.data(), count * sizeof(data[0]));
if (size > count)
for (size_t i = count; i < size; i++)
new_col.data[i] = T{};
if (size > count)
{
void * tail = &new_col.data[count];
memset(tail, 0, (size - count) * sizeof(T));
}
}
@ -279,16 +252,9 @@ MutableColumnPtr ColumnDecimal<T>::cloneResized(size_t size) const
template <typename T>
void ColumnDecimal<T>::insertData(const char * src, size_t /*length*/)
{
if constexpr (is_POD)
{
T tmp;
memcpy(&tmp, src, sizeof(T));
data.emplace_back(tmp);
}
else
{
data.push_back(BigInt<Int256>::deserialize(src));
}
T tmp;
memcpy(&tmp, src, sizeof(T));
data.emplace_back(tmp);
}
template <typename T>
@ -303,13 +269,8 @@ void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
size_t old_size = data.size();
data.resize(old_size + length);
if constexpr (is_POD)
memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0]));
else
{
for (size_t i = 0; i < length; i++)
data[old_size + i] = src_vec.data[start + i];
}
memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0]));
}
template <typename T>

View File

@ -4,6 +4,7 @@
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Core/Field.h>
#include <Core/DecimalFunctions.h>
#include <Common/typeid_cast.h>
#include <common/sort.h>
@ -12,12 +13,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// PaddedPODArray extended by Decimal scale
template <typename T>
class DecimalPaddedPODArray : public PaddedPODArray<T>
@ -55,43 +50,6 @@ private:
UInt32 scale;
};
/// std::vector extended by Decimal scale
template <typename T>
class DecimalVector : public std::vector<T>
{
public:
using Base = std::vector<T>;
using Base::operator[];
DecimalVector(size_t size, UInt32 scale_)
: Base(size),
scale(scale_)
{}
DecimalVector(const DecimalVector & other)
: Base(other.begin(), other.end()),
scale(other.scale)
{}
DecimalVector(DecimalVector && other)
{
this->swap(other);
std::swap(scale, other.scale);
}
DecimalVector & operator=(DecimalVector && other)
{
this->swap(other);
std::swap(scale, other.scale);
return *this;
}
UInt32 getScale() const { return scale; }
private:
UInt32 scale;
};
/// A ColumnVector for Decimals
template <typename T>
class ColumnDecimal final : public COWHelper<ColumnVectorHelper, ColumnDecimal<T>>
@ -105,10 +63,7 @@ private:
public:
using ValueType = T;
using NativeT = typename T::NativeType;
static constexpr bool is_POD = !is_big_int_v<NativeT>;
using Container = std::conditional_t<is_POD,
DecimalPaddedPODArray<T>,
DecimalVector<T>>;
using Container = DecimalPaddedPODArray<T>;
private:
ColumnDecimal(const size_t n, UInt32 scale_)
@ -132,18 +87,8 @@ public:
size_t size() const override { return data.size(); }
size_t byteSize() const override { return data.size() * sizeof(data[0]); }
size_t allocatedBytes() const override
{
if constexpr (is_POD)
return data.allocated_bytes();
else
return data.capacity() * sizeof(data[0]);
}
void protect() override
{
if constexpr (is_POD)
data.protect();
}
size_t allocatedBytes() const override { return data.allocated_bytes(); }
void protect() override { data.protect(); }
void reserve(size_t n) override { data.reserve(n); }
void insertFrom(const IColumn & src, size_t n) override { data.push_back(static_cast<const Self &>(src).getData()[n]); }
@ -151,38 +96,28 @@ public:
void insertDefault() override { data.push_back(T()); }
virtual void insertManyDefaults(size_t length) override
{
if constexpr (is_POD)
data.resize_fill(data.size() + length);
else
data.resize(data.size() + length);
data.resize_fill(data.size() + length);
}
void insert(const Field & x) override { data.push_back(DB::get<NearestFieldType<T>>(x)); }
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void popBack(size_t n) override
{
if constexpr (is_POD)
data.resize_assume_reserved(data.size() - n);
else
data.resize(data.size() - n);
data.resize_assume_reserved(data.size() - n);
}
StringRef getRawData() const override
{
if constexpr (is_POD)
return StringRef(reinterpret_cast<const char*>(data.data()), byteSize());
else
throw Exception("getRawData() is not implemented for big integers", ErrorCodes::NOT_IMPLEMENTED);
return StringRef(reinterpret_cast<const char*>(data.data()), byteSize());
}
StringRef getDataAt(size_t n) const override
{
if constexpr (is_POD)
return StringRef(reinterpret_cast<const char *>(&data[n]), sizeof(data[n]));
else
throw Exception("getDataAt() is not implemented for big integers", ErrorCodes::NOT_IMPLEMENTED);
return StringRef(reinterpret_cast<const char *>(&data[n]), sizeof(data[n]));
}
Float64 getFloat64(size_t n) const final { return DecimalUtils::convertTo<Float64>(data[n], scale); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -522,6 +522,8 @@
M(553, LZMA_STREAM_ENCODER_FAILED) \
M(554, LZMA_STREAM_DECODER_FAILED) \
M(555, ROCKSDB_ERROR) \
M(556, SYNC_MYSQL_USER_ACCESS_ERROR)\
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \

View File

@ -57,6 +57,7 @@ public:
using Op = Operation<CompareInt, CompareInt>;
using ColVecA = std::conditional_t<IsDecimalNumber<A>, ColumnDecimal<A>, ColumnVector<A>>;
using ColVecB = std::conditional_t<IsDecimalNumber<B>, ColumnDecimal<B>, ColumnVector<B>>;
using ArrayA = typename ColVecA::Container;
using ArrayB = typename ColVecB::Container;

View File

@ -145,7 +145,7 @@ struct Decimal
operator T () const { return value; }
template <typename U>
U convertTo()
U convertTo() const
{
/// no IsDecimalNumber defined yet
if constexpr (std::is_same_v<U, Decimal<Int32>> ||

View File

@ -10,6 +10,7 @@
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <Common/checkStackSize.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/StorageValues.h>
#include <Storages/LiveView/StorageLiveView.h>
@ -29,6 +30,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
, context(context_)
, query_ptr(query_ptr_)
{
checkStackSize();
/** TODO This is a very important line. At any insertion into the table one of streams should own lock.
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.

View File

@ -466,75 +466,66 @@ struct WhichDataType
{
TypeIndex idx;
WhichDataType(TypeIndex idx_ = TypeIndex::Nothing)
: idx(idx_)
{}
constexpr WhichDataType(TypeIndex idx_ = TypeIndex::Nothing) : idx(idx_) {}
constexpr WhichDataType(const IDataType & data_type) : idx(data_type.getTypeId()) {}
constexpr WhichDataType(const IDataType * data_type) : idx(data_type->getTypeId()) {}
WhichDataType(const IDataType & data_type)
: idx(data_type.getTypeId())
{}
// shared ptr -> is non-constexpr in gcc
WhichDataType(const DataTypePtr & data_type) : idx(data_type->getTypeId()) {}
WhichDataType(const IDataType * data_type)
: idx(data_type->getTypeId())
{}
constexpr bool isUInt8() const { return idx == TypeIndex::UInt8; }
constexpr bool isUInt16() const { return idx == TypeIndex::UInt16; }
constexpr bool isUInt32() const { return idx == TypeIndex::UInt32; }
constexpr bool isUInt64() const { return idx == TypeIndex::UInt64; }
constexpr bool isUInt128() const { return idx == TypeIndex::UInt128; }
constexpr bool isUInt256() const { return idx == TypeIndex::UInt256; }
constexpr bool isUInt() const { return isUInt8() || isUInt16() || isUInt32() || isUInt64() || isUInt128() || isUInt256(); }
constexpr bool isNativeUInt() const { return isUInt8() || isUInt16() || isUInt32() || isUInt64(); }
WhichDataType(const DataTypePtr & data_type)
: idx(data_type->getTypeId())
{}
constexpr bool isInt8() const { return idx == TypeIndex::Int8; }
constexpr bool isInt16() const { return idx == TypeIndex::Int16; }
constexpr bool isInt32() const { return idx == TypeIndex::Int32; }
constexpr bool isInt64() const { return idx == TypeIndex::Int64; }
constexpr bool isInt128() const { return idx == TypeIndex::Int128; }
constexpr bool isInt256() const { return idx == TypeIndex::Int256; }
constexpr bool isInt() const { return isInt8() || isInt16() || isInt32() || isInt64() || isInt128() || isInt256(); }
constexpr bool isNativeInt() const { return isInt8() || isInt16() || isInt32() || isInt64(); }
bool isUInt8() const { return idx == TypeIndex::UInt8; }
bool isUInt16() const { return idx == TypeIndex::UInt16; }
bool isUInt32() const { return idx == TypeIndex::UInt32; }
bool isUInt64() const { return idx == TypeIndex::UInt64; }
bool isUInt128() const { return idx == TypeIndex::UInt128; }
bool isUInt256() const { return idx == TypeIndex::UInt256; }
bool isUInt() const { return isUInt8() || isUInt16() || isUInt32() || isUInt64() || isUInt128() || isUInt256(); }
bool isNativeUInt() const { return isUInt8() || isUInt16() || isUInt32() || isUInt64(); }
constexpr bool isDecimal32() const { return idx == TypeIndex::Decimal32; }
constexpr bool isDecimal64() const { return idx == TypeIndex::Decimal64; }
constexpr bool isDecimal128() const { return idx == TypeIndex::Decimal128; }
constexpr bool isDecimal256() const { return idx == TypeIndex::Decimal256; }
constexpr bool isDecimal() const { return isDecimal32() || isDecimal64() || isDecimal128() || isDecimal256(); }
bool isInt8() const { return idx == TypeIndex::Int8; }
bool isInt16() const { return idx == TypeIndex::Int16; }
bool isInt32() const { return idx == TypeIndex::Int32; }
bool isInt64() const { return idx == TypeIndex::Int64; }
bool isInt128() const { return idx == TypeIndex::Int128; }
bool isInt256() const { return idx == TypeIndex::Int256; }
bool isInt() const { return isInt8() || isInt16() || isInt32() || isInt64() || isInt128() || isInt256(); }
bool isNativeInt() const { return isInt8() || isInt16() || isInt32() || isInt64(); }
constexpr bool isFloat32() const { return idx == TypeIndex::Float32; }
constexpr bool isFloat64() const { return idx == TypeIndex::Float64; }
constexpr bool isFloat() const { return isFloat32() || isFloat64(); }
bool isDecimal32() const { return idx == TypeIndex::Decimal32; }
bool isDecimal64() const { return idx == TypeIndex::Decimal64; }
bool isDecimal128() const { return idx == TypeIndex::Decimal128; }
bool isDecimal256() const { return idx == TypeIndex::Decimal256; }
bool isDecimal() const { return isDecimal32() || isDecimal64() || isDecimal128() || isDecimal256(); }
constexpr bool isEnum8() const { return idx == TypeIndex::Enum8; }
constexpr bool isEnum16() const { return idx == TypeIndex::Enum16; }
constexpr bool isEnum() const { return isEnum8() || isEnum16(); }
bool isFloat32() const { return idx == TypeIndex::Float32; }
bool isFloat64() const { return idx == TypeIndex::Float64; }
bool isFloat() const { return isFloat32() || isFloat64(); }
constexpr bool isDate() const { return idx == TypeIndex::Date; }
constexpr bool isDateTime() const { return idx == TypeIndex::DateTime; }
constexpr bool isDateTime64() const { return idx == TypeIndex::DateTime64; }
constexpr bool isDateOrDateTime() const { return isDate() || isDateTime() || isDateTime64(); }
bool isEnum8() const { return idx == TypeIndex::Enum8; }
bool isEnum16() const { return idx == TypeIndex::Enum16; }
bool isEnum() const { return isEnum8() || isEnum16(); }
constexpr bool isString() const { return idx == TypeIndex::String; }
constexpr bool isFixedString() const { return idx == TypeIndex::FixedString; }
constexpr bool isStringOrFixedString() const { return isString() || isFixedString(); }
bool isDate() const { return idx == TypeIndex::Date; }
bool isDateTime() const { return idx == TypeIndex::DateTime; }
bool isDateTime64() const { return idx == TypeIndex::DateTime64; }
bool isDateOrDateTime() const { return isDate() || isDateTime() || isDateTime64(); }
constexpr bool isUUID() const { return idx == TypeIndex::UUID; }
constexpr bool isArray() const { return idx == TypeIndex::Array; }
constexpr bool isTuple() const { return idx == TypeIndex::Tuple; }
constexpr bool isSet() const { return idx == TypeIndex::Set; }
constexpr bool isInterval() const { return idx == TypeIndex::Interval; }
bool isString() const { return idx == TypeIndex::String; }
bool isFixedString() const { return idx == TypeIndex::FixedString; }
bool isStringOrFixedString() const { return isString() || isFixedString(); }
constexpr bool isNothing() const { return idx == TypeIndex::Nothing; }
constexpr bool isNullable() const { return idx == TypeIndex::Nullable; }
constexpr bool isFunction() const { return idx == TypeIndex::Function; }
constexpr bool isAggregateFunction() const { return idx == TypeIndex::AggregateFunction; }
bool isUUID() const { return idx == TypeIndex::UUID; }
bool isArray() const { return idx == TypeIndex::Array; }
bool isTuple() const { return idx == TypeIndex::Tuple; }
bool isSet() const { return idx == TypeIndex::Set; }
bool isInterval() const { return idx == TypeIndex::Interval; }
bool isNothing() const { return idx == TypeIndex::Nothing; }
bool isNullable() const { return idx == TypeIndex::Nullable; }
bool isFunction() const { return idx == TypeIndex::Function; }
bool isAggregateFunction() const { return idx == TypeIndex::AggregateFunction; }
bool IsBigIntOrDeimal() const { return isInt128() || isInt256() || isUInt256() || isDecimal256(); }
constexpr bool IsBigIntOrDeimal() const { return isInt128() || isInt256() || isUInt256() || isDecimal256(); }
};
/// IDataType helpers (alternative for IDataType virtual methods with single point of truth)

View File

@ -12,6 +12,7 @@
#include <Common/quoteString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
namespace DB
{
@ -19,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SYNC_MYSQL_USER_ACCESS_ERROR;
}
static std::unordered_map<String, String> fetchTablesCreateQuery(
@ -64,6 +66,7 @@ static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entr
return tables_in_db;
}
void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection)
{
Block header{
@ -128,6 +131,49 @@ static Block getShowMasterLogHeader(const String & mysql_version)
};
}
static bool checkSyncUserPrivImpl(mysqlxx::PoolWithFailover::Entry & connection, WriteBuffer & out)
{
Block sync_user_privs_header
{
{std::make_shared<DataTypeString>(), "current_user_grants"}
};
String grants_query, sub_privs;
MySQLBlockInputStream input(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, DEFAULT_BLOCK_SIZE);
while (Block block = input.read())
{
for (size_t index = 0; index < block.rows(); ++index)
{
grants_query = (*block.getByPosition(0).column)[index].safeGet<String>();
out << grants_query << "; ";
sub_privs = grants_query.substr(0, grants_query.find(" ON "));
if (sub_privs.find("ALL PRIVILEGES") == std::string::npos)
{
if ((sub_privs.find("RELOAD") != std::string::npos and
sub_privs.find("REPLICATION SLAVE") != std::string::npos and
sub_privs.find("REPLICATION CLIENT") != std::string::npos))
return true;
}
else
{
return true;
}
}
}
return false;
}
static void checkSyncUserPriv(mysqlxx::PoolWithFailover::Entry & connection)
{
WriteBufferFromOwnString out;
if (!checkSyncUserPrivImpl(connection, out))
throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs "
"at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' "
"and SELECT PRIVILEGE on MySQL Database."
"But the SYNC USER grant query is: " + out.str(), ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR);
}
bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const
{
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", getShowMasterLogHeader(mysql_version), DEFAULT_BLOCK_SIZE);
@ -190,6 +236,8 @@ MaterializeMetadata::MaterializeMetadata(
const String & database, bool & opened_transaction, const String & mysql_version)
: persistent_path(path_)
{
checkSyncUserPriv(connection);
if (Poco::File(persistent_path).exists())
{
ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE);

View File

@ -5,7 +5,6 @@
#if USE_MYSQL
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
# include <cstdlib>
# include <random>
# include <Columns/ColumnTuple.h>
@ -34,6 +33,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int ILLEGAL_MYSQL_VARIABLE;
extern const int SYNC_MYSQL_USER_ACCESS_ERROR;
extern const int UNKNOWN_DATABASE;
}
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
@ -214,10 +215,33 @@ void MaterializeMySQLSyncThread::stopSynchronization()
void MaterializeMySQLSyncThread::startSynchronization()
{
const auto & mysql_server_version = checkVariableAndGetVersion(pool.get());
try
{
const auto & mysql_server_version = checkVariableAndGetVersion(pool.get());
background_thread_pool = std::make_unique<ThreadFromGlobalPool>(
[this, mysql_server_version = mysql_server_version]() { synchronization(mysql_server_version); });
background_thread_pool = std::make_unique<ThreadFromGlobalPool>(
[this, mysql_server_version = mysql_server_version]() { synchronization(mysql_server_version); });
}
catch (...)
{
try
{
throw;
}
catch (mysqlxx::ConnectionFailed & e)
{
if (e.errnum() == ER_ACCESS_DENIED_ERROR
|| e.errnum() == ER_DBACCESS_DENIED_ERROR)
throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs "
"at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' "
"and SELECT PRIVILEGE on Database " + mysql_database_name
, ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR);
else if (e.errnum() == ER_BAD_DB_ERROR)
throw Exception("Unknown database '" + mysql_database_name + "' on MySQL", ErrorCodes::UNKNOWN_DATABASE);
else
throw;
}
}
}
static inline void cleanOutdatedTables(const String & database_name, const Context & context)

View File

@ -20,6 +20,7 @@
# include <mysqlxx/Pool.h>
# include <mysqlxx/PoolWithFailover.h>
namespace DB
{
@ -63,6 +64,12 @@ private:
MaterializeMySQLSettings * settings;
String query_prefix;
// USE MySQL ERROR CODE:
// https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html
const int ER_ACCESS_DENIED_ERROR = 1045;
const int ER_DBACCESS_DENIED_ERROR = 1044;
const int ER_BAD_DB_ERROR = 1049;
struct Buffers
{
String database;

View File

@ -131,8 +131,13 @@ public:
data.reject();
}
static bool needChildVisit(const ASTPtr &, const ASTPtr &)
static bool needChildVisit(const ASTPtr & parent, const ASTPtr &)
{
/// Currently we check monotonicity only for single-argument functions.
/// Although, multi-argument functions with all but one constant arguments can also be monotonic.
if (const auto * func = typeid_cast<const ASTFunction *>(parent.get()))
return func->arguments->children.size() < 2;
return true;
}
};

View File

@ -21,6 +21,7 @@
#include <Storages/SelectQueryDescription.h>
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
@ -30,6 +31,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_QUERY;
extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW;
@ -72,7 +74,11 @@ StorageMaterializedView::StorageMaterializedView(
setInMemoryMetadata(storage_metadata);
if (!has_inner_table)
{
if (query.to_table_id.database_name == table_id_.database_name && query.to_table_id.table_name == table_id_.table_name)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Materialized view {} cannot point to itself", table_id_.getFullTableName());
target_table_id = query.to_table_id;
}
else if (attach_)
{
/// If there is an ATTACH request, then the internal table must already be created.
@ -351,11 +357,13 @@ void StorageMaterializedView::shutdown()
StoragePtr StorageMaterializedView::getTargetTable() const
{
checkStackSize();
return DatabaseCatalog::instance().getTable(target_table_id, global_context);
}
StoragePtr StorageMaterializedView::tryGetTargetTable() const
{
checkStackSize();
return DatabaseCatalog::instance().tryGetTable(target_table_id, global_context);
}

View File

@ -2,8 +2,10 @@ import time
import pymysql.cursors
import pytest
from helpers.client import QueryRuntimeException
def check_query(clickhouse_node, query, result_set, retry_count=3, interval_seconds=3):
def check_query(clickhouse_node, query, result_set, retry_count=60, interval_seconds=3):
lastest_result = ''
for index in range(retry_count):
lastest_result = clickhouse_node.query(query)
@ -18,6 +20,8 @@ def check_query(clickhouse_node, query, result_set, retry_count=3, interval_seco
def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
# existed before the mapping was created
@ -100,6 +104,8 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE test_database.test_table_1 (`key` INT NOT NULL PRIMARY KEY, _datetime DateTime(6), _timestamp TIMESTAMP(3), _decimal DECIMAL(65, 30)) ENGINE = InnoDB;")
mysql_node.query("INSERT INTO test_database.test_table_1 VALUES(1, '2020-01-01 01:02:03.999999', '2020-01-01 01:02:03.999', " + ('9' * 35) + "." + ('9' * 30) + ")")
@ -121,6 +127,7 @@ def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_
mysql_node.query("INSERT INTO test_database.test_table_2 VALUES(2, '2020-01-01 01:02:03.000000', '2020-01-01 01:02:03.000', ." + ('0' * 29) + "1)")
mysql_node.query("INSERT INTO test_database.test_table_2 VALUES(3, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.99', -" + ('9' * 35) + "." + ('9' * 30) + ")")
mysql_node.query("INSERT INTO test_database.test_table_2 VALUES(4, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.9999', -." + ('0' * 29) + "1)")
check_query(clickhouse_node, "SHOW TABLES FROM test_database FORMAT TSV", "test_table_1\ntest_table_2\n")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_2 ORDER BY key FORMAT TSV",
"1\t2020-01-01 01:02:03.999999\t2020-01-01 01:02:03.999\t" + ('9' * 35) + "." + ('9' * 30) + "\n"
"2\t2020-01-01 01:02:03.000000\t2020-01-01 01:02:03.000\t0." + ('0' * 29) + "1\n"
@ -132,6 +139,8 @@ def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_
def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
@ -164,8 +173,9 @@ def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, serv
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")
def create_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
# existed before the mapping was created
mysql_node.query("CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
@ -194,6 +204,8 @@ def create_table_with_materialize_mysql_database(clickhouse_node, mysql_node, se
def rename_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
@ -214,6 +226,8 @@ def rename_table_with_materialize_mysql_database(clickhouse_node, mysql_node, se
def alter_add_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
@ -255,6 +269,8 @@ def alter_add_column_with_materialize_mysql_database(clickhouse_node, mysql_node
def alter_drop_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query(
"CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY, drop_column INT) ENGINE = InnoDB;")
@ -287,6 +303,8 @@ def alter_drop_column_with_materialize_mysql_database(clickhouse_node, mysql_nod
def alter_rename_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
# maybe should test rename primary key?
@ -322,6 +340,8 @@ def alter_rename_column_with_materialize_mysql_database(clickhouse_node, mysql_n
def alter_modify_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
# maybe should test rename primary key?
@ -366,6 +386,8 @@ def alter_modify_column_with_materialize_mysql_database(clickhouse_node, mysql_n
# pass
def alter_rename_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query(
"CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY, drop_column INT) ENGINE = InnoDB;")
@ -401,6 +423,8 @@ def alter_rename_table_with_materialize_mysql_database(clickhouse_node, mysql_no
def query_event_with_empty_transaction(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database")
mysql_node.query("RESET MASTER")
@ -434,6 +458,8 @@ def query_event_with_empty_transaction(clickhouse_node, mysql_node, service_name
def select_without_columns(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS db")
clickhouse_node.query("DROP DATABASE IF EXISTS db")
mysql_node.query("CREATE DATABASE db")
mysql_node.query("CREATE TABLE db.t (a INT PRIMARY KEY, b INT)")
clickhouse_node.query(
@ -482,3 +508,52 @@ def insert_with_modify_binlog_checksum(clickhouse_node, mysql_node, service_name
clickhouse_node.query("DROP DATABASE test_checksum")
mysql_node.query("DROP DATABASE test_checksum")
def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS priv_err_db")
mysql_node.query("DROP DATABASE IF EXISTS priv_err_db")
mysql_node.query("CREATE DATABASE priv_err_db DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE priv_err_db.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
mysql_node.query("INSERT INTO priv_err_db.test_table_1 VALUES(1);")
mysql_node.result("SHOW GRANTS FOR 'test'@'%';")
clickhouse_node.query(
"CREATE DATABASE priv_err_db ENGINE = MaterializeMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
service_name))
# wait MaterializeMySQL read binlog events
check_query(clickhouse_node, "SHOW TABLES FROM priv_err_db FORMAT TSV;", "test_table_1\n")
check_query(clickhouse_node, "SELECT count() FROM priv_err_db.test_table_1 FORMAT TSV", "1\n", 30, 5)
mysql_node.query("INSERT INTO priv_err_db.test_table_1 VALUES(2);")
check_query(clickhouse_node, "SELECT count() FROM priv_err_db.test_table_1 FORMAT TSV", "2\n")
clickhouse_node.query("DROP DATABASE priv_err_db;")
mysql_node.query("REVOKE REPLICATION SLAVE ON *.* FROM 'test'@'%'")
clickhouse_node.query(
"CREATE DATABASE priv_err_db ENGINE = MaterializeMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
service_name))
assert "priv_err_db" in clickhouse_node.query("SHOW DATABASES")
assert "test_table_1" not in clickhouse_node.query("SHOW TABLES FROM priv_err_db")
clickhouse_node.query("DROP DATABASE priv_err_db")
mysql_node.query("REVOKE REPLICATION CLIENT, RELOAD ON *.* FROM 'test'@'%'")
clickhouse_node.query(
"CREATE DATABASE priv_err_db ENGINE = MaterializeMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
service_name))
assert "priv_err_db" in clickhouse_node.query("SHOW DATABASES")
assert "test_table_1" not in clickhouse_node.query("SHOW TABLES FROM priv_err_db")
clickhouse_node.query("DETACH DATABASE priv_err_db")
mysql_node.query("REVOKE SELECT ON priv_err_db.* FROM 'test'@'%'")
time.sleep(3)
with pytest.raises(QueryRuntimeException) as exception:
clickhouse_node.query("ATTACH DATABASE priv_err_db")
assert 'MySQL SYNC USER ACCESS ERR:' in str(exception.value)
assert "priv_err_db" not in clickhouse_node.query("SHOW DATABASES")
mysql_node.query("DROP DATABASE priv_err_db;")
mysql_node.grant_min_priv_for_user("test")

View File

@ -41,6 +41,20 @@ class MySQLNodeInstance:
with self.alloc_connection().cursor() as cursor:
cursor.execute(execution_query)
def create_min_priv_user(self, user, password):
self.query("CREATE USER '" + user + "'@'%' IDENTIFIED BY '" + password + "'")
self.grant_min_priv_for_user(user)
def grant_min_priv_for_user(self, user, db='priv_err_db'):
self.query("GRANT REPLICATION SLAVE, REPLICATION CLIENT, RELOAD ON *.* TO '" + user + "'@'%'")
self.query("GRANT SELECT ON " + db + ".* TO '" + user + "'@'%'")
def result(self, execution_query):
with self.alloc_connection().cursor() as cursor:
result = cursor.execute(execution_query)
if result is not None:
print(cursor.fetchall())
def close(self):
if self.mysql_connection is not None:
self.mysql_connection.close()
@ -51,6 +65,8 @@ class MySQLNodeInstance:
try:
self.alloc_connection()
print("Mysql Started")
self.create_min_priv_user("test", "123")
print("min priv user created")
return
except Exception as ex:
print("Can't connect to MySQL " + str(ex))
@ -162,3 +178,22 @@ def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5
def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_5_7):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql1")
def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7):
try:
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
except:
print((clickhouse_node.query(
"select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw")))
raise
def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0):
try:
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
except:
print((clickhouse_node.query(
"select '\n', thread_id, query_id, arrayStringConcat(arrayMap(x -> concat(demangle(addressToSymbol(x)), '\n ', addressToLine(x)), trace), '\n') AS sym from system.stack_trace format TSVRaw")))
raise

View File

@ -0,0 +1,3 @@
\N Nullable(UInt8)
\N Nullable(UInt8)
0 UInt8

View File

@ -0,0 +1,6 @@
-- Value nullable
SELECT anyIf(CAST(number, 'Nullable(UInt8)'), number = 3) AS a, toTypeName(a) FROM numbers(2);
-- Value and condition nullable
SELECT anyIf(number, number = 3) AS a, toTypeName(a) FROM (SELECT CAST(number, 'Nullable(UInt8)') AS number FROM numbers(2));
-- Condition nullable
SELECT anyIf(CAST(number, 'UInt8'), number = 3) AS a, toTypeName(a) FROM (SELECT CAST(number, 'Nullable(UInt8)') AS number FROM numbers(2));

View File

@ -0,0 +1,28 @@
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS v;
CREATE TABLE t (c String) ENGINE = Memory;
CREATE MATERIALIZED VIEW v to v AS SELECT c FROM t; -- { serverError 36 }
CREATE MATERIALIZED VIEW v to t AS SELECT * FROM v; -- { serverError 60 }
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS v1;
DROP TABLE IF EXISTS v2;
CREATE TABLE t1 (c String) ENGINE = Memory;
CREATE TABLE t2 (c String) ENGINE = Memory;
CREATE MATERIALIZED VIEW v1 to t1 AS SELECT * FROM t2;
CREATE MATERIALIZED VIEW v2 to t2 AS SELECT * FROM t1;
INSERT INTO t1 VALUES ('Hello'); -- { serverError 306 }
INSERT INTO t2 VALUES ('World'); -- { serverError 306 }
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS v;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS v1;
DROP TABLE IF EXISTS v2;

View File

@ -1 +1 @@
([1],[5]) 4 4
([1],[4]) 4 4

View File

@ -0,0 +1,4 @@
2020-11-12
2020-11-13
2020-11-12
2020-11-13

View File

@ -0,0 +1,17 @@
WITH arrayJoin(range(2)) AS delta
SELECT
toDate(time) + toIntervalDay(delta) AS dt
FROM
(
SELECT toDateTime('2020.11.12 19:02:04') AS time
)
ORDER BY dt ASC;
WITH arrayJoin([0, 1]) AS delta
SELECT
toDate(time) + toIntervalDay(delta) AS dt
FROM
(
SELECT toDateTime('2020.11.12 19:02:04') AS time
)
ORDER BY dt ASC;