Merge remote-tracking branch 'rschu1ze/master' into factorize-constants

commit 353854520c
Author: Robert Schulze
Date: 2023-08-16 21:24:21 +00:00
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A

73 changed files with 1255 additions and 279 deletions

View File

@@ -12,6 +12,7 @@ ENV \
 # install systemd packages
 RUN apt-get update && \
     apt-get install -y --no-install-recommends \
+        sudo \
         systemd \
     && \
     apt-get clean && \

View File

@@ -190,7 +190,7 @@ These are the schema conversion manipulations you can do with table overrides fo
 * Modify [column TTL](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#mergetree-column-ttl).
 * Modify [column compression codec](/docs/en/sql-reference/statements/create/table.md/#codecs).
 * Add [ALIAS columns](/docs/en/sql-reference/statements/create/table.md/#alias).
-* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes)
+* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes). Note that you need to enable the `use_skip_indexes_if_final` setting to make them work (MaterializedMySQL uses `SELECT ... FINAL` by default).
 * Add [projections](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#projections). Note that projection optimizations are
   disabled when using `SELECT ... FINAL` (which MaterializedMySQL does by default), so their utility is limited here.
   `INDEX ... TYPE hypothesis` as [described in the v21.12 blog post]](https://clickhouse.com/blog/en/2021/clickhouse-v21.12-released/)

View File

@@ -524,7 +524,7 @@ void ColumnAggregateFunction::insertDefault()
     pushBackAndCreateState(data, arena, func.get());
 }
-StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
+StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin, const UInt8 *) const
 {
     WriteBufferFromArena out(arena, begin);
     func->serialize(data[n], out, version);

View File

@@ -162,7 +162,7 @@ public:
     void insertDefault() override;
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
     const char * deserializeAndInsertFromArena(const char * src_arena) override;

View File

@@ -205,7 +205,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
 }
-StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
 {
     size_t array_size = sizeAt(n);
     size_t offset = offsetAt(n);

View File

@@ -77,7 +77,7 @@ public:
     StringRef getDataAt(size_t n) const override;
     bool isDefaultAt(size_t n) const override;
     void insertData(const char * pos, size_t length) override;
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -88,7 +88,7 @@ public:
     void insertData(const char *, size_t) override { throwMustBeDecompressed(); }
     void insertDefault() override { throwMustBeDecompressed(); }
     void popBack(size_t) override { throwMustBeDecompressed(); }
-    StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeDecompressed(); }
+    StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeDecompressed(); }
     const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
     const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
     void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }

View File

@@ -151,7 +151,7 @@ public:
         s -= n;
     }
-    StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin) const override
+    StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin, const UInt8 *) const override
     {
         return data->serializeValueIntoArena(0, arena, begin);
     }

View File

@@ -59,9 +59,26 @@ bool ColumnDecimal<T>::hasEqualValues() const
 }
 template <is_decimal T>
-StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
 {
-    auto * pos = arena.allocContinue(sizeof(T), begin);
+    constexpr size_t null_bit_size = sizeof(UInt8);
+    StringRef res;
+    char * pos;
+    if (null_bit)
+    {
+        res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+        memcpy(pos, null_bit, null_bit_size);
+        if (*null_bit) return res;
+        pos += null_bit_size;
+    }
+    else
+    {
+        res.size = sizeof(T);
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+    }
     memcpy(pos, &data[n], sizeof(T));
-    return StringRef(pos, sizeof(T));
+    return res;
 }
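For orientation, a standalone sketch of the arena layout the branch above produces for a fixed-size value: an optional leading null byte, followed by the payload only when the row is not NULL. The helper below uses plain std:: types and a hypothetical name; it illustrates the format only and is not ClickHouse API.

#include <cstdint>
#include <cstring>
#include <vector>

// Illustrative only: mirrors the layout written above for a fixed-size value T.
// When null_bit is provided, one flag byte is written first; the value follows
// only if the flag is 0 (the row is not NULL).
template <typename T>
std::vector<char> serializeFixedWithNullBit(const T & value, const uint8_t * null_bit)
{
    std::vector<char> buf;
    if (null_bit)
    {
        buf.push_back(static_cast<char>(*null_bit));
        if (*null_bit)
            return buf;                     // NULL row: only the flag byte is stored
    }
    const size_t old_size = buf.size();
    buf.resize(old_size + sizeof(T));
    std::memcpy(buf.data() + old_size, &value, sizeof(T));
    return buf;
}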

View File

@@ -80,7 +80,7 @@ public:
     Float64 getFloat64(size_t n) const final { return DecimalUtils::convertTo<Float64>(data[n], scale); }
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -86,11 +86,28 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
     memset(chars.data() + old_size + length, 0, n - length);
 }
-StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
+StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 * null_bit) const
 {
-    auto * pos = arena.allocContinue(n, begin);
+    constexpr size_t null_bit_size = sizeof(UInt8);
+    StringRef res;
+    char * pos;
+    if (null_bit)
+    {
+        res.size = * null_bit ? null_bit_size : null_bit_size + n;
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+        memcpy(pos, null_bit, null_bit_size);
+        if (*null_bit) return res;
+        pos += null_bit_size;
+    }
+    else
+    {
+        res.size = n;
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+    }
     memcpy(pos, &chars[n * index], n);
-    return StringRef(pos, n);
+    return res;
 }
 const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)

View File

@@ -115,7 +115,7 @@ public:
         chars.resize_assume_reserved(chars.size() - n * elems);
     }
-    StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 *) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -96,7 +96,7 @@ public:
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert into {}", getName());
     }
-    StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override
+    StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override
     {
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot serialize from {}", getName());
     }

View File

@@ -255,7 +255,7 @@ void ColumnLowCardinality::insertData(const char * pos, size_t length)
     idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length));
 }
-StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
 {
     return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin);
 }

View File

@@ -87,7 +87,7 @@ public:
     void popBack(size_t n) override { idx.popBack(n); }
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -111,7 +111,7 @@ void ColumnMap::popBack(size_t n)
     nested->popBack(n);
 }
-StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
 {
     return nested->serializeValueIntoArena(n, arena, begin);
 }

View File

@@ -58,7 +58,7 @@ public:
     void insert(const Field & x) override;
     void insertDefault() override;
     void popBack(size_t n) override;
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -4,6 +4,10 @@
 #include <Common/typeid_cast.h>
 #include <Common/assert_cast.h>
 #include <Common/WeakHash.h>
+#include <Columns/ColumnDecimal.h>
+#include <Columns/ColumnFixedString.h>
+#include <Columns/ColumnsDateTime.h>
+#include <Columns/ColumnsNumber.h>
 #include <Columns/ColumnNullable.h>
 #include <Columns/ColumnConst.h>
 #include <Columns/ColumnString.h>
@@ -34,6 +38,7 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
 {
     /// ColumnNullable cannot have constant nested column. But constant argument could be passed. Materialize it.
     nested_column = getNestedColumn().convertToFullColumnIfConst();
+    nested_type = nested_column->getDataType();
     if (!getNestedColumn().canBeInsideNullable())
         throw Exception(ErrorCodes::ILLEGAL_COLUMN, "{} cannot be inside Nullable column", getNestedColumn().getName());
@@ -134,21 +139,77 @@ void ColumnNullable::insertData(const char * pos, size_t length)
     }
 }
-StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
 {
     const auto & arr = getNullMapData();
     static constexpr auto s = sizeof(arr[0]);
-    auto * pos = arena.allocContinue(s, begin);
-    memcpy(pos, &arr[n], s);
-    if (arr[n])
-        return StringRef(pos, s);
-    auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
-    /// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
-    return StringRef(nested_ref.data - s, nested_ref.size + s);
+    char * pos;
+    switch (nested_type)
+    {
+        case TypeIndex::UInt8:
+            return static_cast<const ColumnUInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::UInt16:
+            return static_cast<const ColumnUInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::UInt32:
+            return static_cast<const ColumnUInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::UInt64:
+            return static_cast<const ColumnUInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::UInt128:
+            return static_cast<const ColumnUInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::UInt256:
+            return static_cast<const ColumnUInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Int8:
+            return static_cast<const ColumnInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Int16:
+            return static_cast<const ColumnInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Int32:
+            return static_cast<const ColumnInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Int64:
+            return static_cast<const ColumnInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Int128:
+            return static_cast<const ColumnInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Int256:
+            return static_cast<const ColumnInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Float32:
+            return static_cast<const ColumnFloat32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Float64:
+            return static_cast<const ColumnFloat64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Date:
+            return static_cast<const ColumnDate *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Date32:
+            return static_cast<const ColumnDate32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::DateTime:
+            return static_cast<const ColumnDateTime *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::DateTime64:
+            return static_cast<const ColumnDateTime64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::String:
+            return static_cast<const ColumnString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::FixedString:
+            return static_cast<const ColumnFixedString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Decimal32:
+            return static_cast<const ColumnDecimal<Decimal32> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Decimal64:
+            return static_cast<const ColumnDecimal<Decimal64> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Decimal128:
+            return static_cast<const ColumnDecimal<Decimal128> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::Decimal256:
+            return static_cast<const ColumnDecimal<Decimal256> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::UUID:
+            return static_cast<const ColumnUUID *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::IPv4:
+            return static_cast<const ColumnIPv4 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        case TypeIndex::IPv6:
+            return static_cast<const ColumnIPv6 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
+        default:
+            pos = arena.allocContinue(s, begin);
+            memcpy(pos, &arr[n], s);
+            if (arr[n])
+                return StringRef(pos, s);
+            auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
+            /// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
+            return StringRef(nested_ref.data - s, nested_ref.size + s);
+    }
 }
 const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)
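A minimal reader for that layout may help show why the fast paths above can delegate to the nested column with &arr[n]: on the consuming side the flag byte always comes first, and the nested value is read only when the flag is zero. This is a hedged, standalone illustration with a hypothetical helper name, not the actual deserializeAndInsertFromArena implementation.

#include <cstdint>
#include <cstring>
#include <optional>

// Illustrative only: reads one fixed-size value in the "null byte + payload" format.
// Returns std::nullopt for a NULL row and advances `pos` past the consumed bytes.
template <typename T>
std::optional<T> readNullableFixed(const char *& pos)
{
    const uint8_t is_null = static_cast<uint8_t>(*pos);
    ++pos;
    if (is_null)
        return std::nullopt;                // only the flag byte was stored
    T value;
    std::memcpy(&value, pos, sizeof(T));
    pos += sizeof(T);
    return value;
}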

View File

@@ -6,6 +6,7 @@
 #include <Common/typeid_cast.h>
 #include <Common/assert_cast.h>
+#include "Core/TypeId.h"
 #include "config.h"
@@ -62,7 +63,7 @@ public:
     StringRef getDataAt(size_t) const override;
     /// Will insert null value if pos=nullptr
     void insertData(const char * pos, size_t length) override;
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
@@ -212,6 +213,8 @@ public:
 private:
     WrappedPtr nested_column;
     WrappedPtr null_map;
+    // optimize serializeValueIntoArena
+    TypeIndex nested_type;
     template <bool negative>
     void applyNullMapImpl(const NullMap & map);

View File

@@ -244,7 +244,7 @@ public:
     StringRef getDataAt(size_t) const override { throwMustBeConcrete(); }
     bool isDefaultAt(size_t) const override { throwMustBeConcrete(); }
     void insertData(const char *, size_t) override { throwMustBeConcrete(); }
-    StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeConcrete(); }
+    StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeConcrete(); }
     const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
     const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
     void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }

View File

@@ -150,7 +150,7 @@ void ColumnSparse::insertData(const char * pos, size_t length)
     insertSingleValue([&](IColumn & column) { column.insertData(pos, length); });
 }
-StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
 {
     return values->serializeValueIntoArena(getValueIndex(n), arena, begin);
 }

View File

@@ -78,7 +78,7 @@ public:
     /// Will insert null value if pos=nullptr
     void insertData(const char * pos, size_t length) override;
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char *) const override;
     void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

View File

@@ -213,17 +213,30 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
 }
-StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
 {
     size_t string_size = sizeAt(n);
     size_t offset = offsetAt(n);
+    constexpr size_t null_bit_size = sizeof(UInt8);
     StringRef res;
-    res.size = sizeof(string_size) + string_size;
-    char * pos = arena.allocContinue(res.size, begin);
+    char * pos;
+    if (null_bit)
+    {
+        res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(string_size) + string_size;
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+        memcpy(pos, null_bit, null_bit_size);
+        if (*null_bit) return res;
+        pos += null_bit_size;
+    }
+    else
+    {
+        res.size = sizeof(string_size) + string_size;
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+    }
     memcpy(pos, &string_size, sizeof(string_size));
     memcpy(pos + sizeof(string_size), &chars[offset], string_size);
-    res.data = pos;
     return res;
 }
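The string case above follows the same convention as the fixed-size columns, except that the payload itself is length-prefixed. A hedged standalone sketch of that layout (plain std:: types and a hypothetical helper name; note that the real column stores the size returned by sizeAt(n), which includes the terminating zero byte):

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Illustrative only: optional null byte, then the string size, then the bytes.
std::vector<char> serializeStringWithNullBit(const std::string & value, const uint8_t * null_bit)
{
    std::vector<char> buf;
    if (null_bit)
    {
        buf.push_back(static_cast<char>(*null_bit));
        if (*null_bit)
            return buf;                     // NULL row: only the flag byte
    }
    const uint64_t string_size = value.size();
    const size_t old_size = buf.size();
    buf.resize(old_size + sizeof(string_size) + string_size);
    std::memcpy(buf.data() + old_size, &string_size, sizeof(string_size));
    std::memcpy(buf.data() + old_size + sizeof(string_size), value.data(), string_size);
    return buf;
}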

View File

@@ -11,6 +11,7 @@
 #include <Common/memcmpSmall.h>
 #include <Common/assert_cast.h>
 #include <Core/Field.h>
+#include <Common/Arena.h>
 class Collator;
@@ -168,7 +169,7 @@ public:
         offsets.resize_assume_reserved(offsets.size() - n);
     }
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -171,7 +171,7 @@ void ColumnTuple::popBack(size_t n)
         column->popBack(n);
 }
-StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
 {
     StringRef res(begin, 0);
     for (const auto & column : columns)

View File

@@ -61,7 +61,7 @@ public:
     void insertFrom(const IColumn & src_, size_t n) override;
     void insertDefault() override;
     void popBack(size_t n) override;
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -79,7 +79,7 @@ public:
     Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
     bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
     bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
     const char * skipSerializedInArena(const char * pos) const override;
     void updateHashWithValue(size_t n, SipHash & hash_func) const override
     {
@@ -373,7 +373,7 @@ size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t lengt
 }
 template <typename ColumnType>
-StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
 {
     if (is_nullable)
     {

View File

@@ -49,11 +49,28 @@ namespace ErrorCodes
 }
 template <typename T>
-StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
+StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
 {
-    auto * pos = arena.allocContinue(sizeof(T), begin);
+    constexpr size_t null_bit_size = sizeof(UInt8);
+    StringRef res;
+    char * pos;
+    if (null_bit)
+    {
+        res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+        memcpy(pos, null_bit, null_bit_size);
+        if (*null_bit) return res;
+        pos += null_bit_size;
+    }
+    else
+    {
+        res.size = sizeof(T);
+        pos = arena.allocContinue(res.size, begin);
+        res.data = pos;
+    }
     unalignedStore<T>(pos, data[n]);
-    return StringRef(pos, sizeof(T));
+    return res;
 }
 template <typename T>

View File

@@ -174,7 +174,7 @@ public:
         data.resize_assume_reserved(data.size() - n);
     }
-    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
+    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
     const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -218,7 +218,7 @@ public:
      * For example, to obtain unambiguous representation of Array of strings, strings data should be interleaved with their sizes.
      * Parameter begin should be used with Arena::allocContinue.
      */
-    virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const = 0;
+    virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit = nullptr) const = 0;
     /// Deserializes a value that was serialized using IColumn::serializeValueIntoArena method.
     /// Returns pointer to the position after the read data.
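Because the new parameter is defaulted to nullptr, existing call sites keep compiling unchanged; in this commit only the ColumnNullable fast path passes a pointer into its null map. A hedged sketch of that API-evolution pattern with a trimmed-down, hypothetical interface (not the real IColumn hierarchy):

#include <cstdint>
#include <cstdio>

// Illustrative only: a defaulted trailing parameter lets old call sites stay
// unchanged while one caller opts into passing the extra null flag.
struct ToyColumn
{
    virtual ~ToyColumn() = default;
    virtual void serializeValue(size_t n, const uint8_t * null_bit = nullptr) const
    {
        std::printf("row %zu, null flag %s\n", n, null_bit ? (*null_bit ? "1" : "0") : "absent");
    }
};

int main()
{
    ToyColumn col;
    uint8_t is_null = 0;
    col.serializeValue(0);            // legacy call site, no null flag
    col.serializeValue(1, &is_null);  // nullable fast path passes the flag
}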

View File

@@ -57,7 +57,7 @@ public:
         ++s;
     }
-    StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin) const override
+    StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const override
     {
         /// Has to put one useless byte into Arena, because serialization into zero number of bytes is ambiguous.
         char * res = arena.allocContinue(1, begin);

View File

@@ -117,7 +117,7 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
     const char * pos = nullptr;
     for (size_t i = 0; i < num_values; ++i)
     {
-        auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos);
+        auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos, nullptr);
         const char * new_pos;
         column_unique->uniqueDeserializeAndInsertFromArena(ref.data, new_pos);
         ASSERT_EQ(new_pos - ref.data, ref.size) << "Deserialized data has different sizes at position " << i;
@@ -140,8 +140,8 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
     const char * pos_lc = nullptr;
     for (size_t i = 0; i < num_values; ++i)
     {
-        auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string);
-        auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc);
+        auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string, nullptr);
+        auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc, nullptr);
         ASSERT_EQ(ref_string, ref_lc) << "Serialized data is different from pattern at position " << i;
     }
 }

View File

@@ -776,8 +776,12 @@ namespace
         UInt64 key = 0;
         auto * dst = reinterpret_cast<char *>(&key);
         const auto ref = cache.from_column->getDataAt(i);
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunreachable-code"
         if constexpr (std::endian::native == std::endian::big)
             dst += sizeof(key) - ref.size;
+#pragma clang diagnostic pop
         memcpy(dst, ref.data, ref.size);
         table[key] = i;
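The pragmas above exist because, on a little-endian build, the `if constexpr` branch is statically dead and clang's -Wunreachable-code can flag it. A standalone illustration of the same pattern, compilable on its own with C++20 (the function name is hypothetical):

#include <bit>
#include <cstdint>
#include <cstring>

// Illustrative only: load up to 8 bytes into a UInt64-style key. On big-endian
// machines the destination pointer is shifted so the value lands in the
// low-order bytes; on little-endian the branch is discarded at compile time,
// which is what can trigger -Wunreachable-code without the pragma guard.
uint64_t loadPrefixAsKey(const char * data, size_t size)
{
    uint64_t key = 0;
    auto * dst = reinterpret_cast<char *>(&key);
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
    if constexpr (std::endian::native == std::endian::big)
        dst += sizeof(key) - size;
#pragma clang diagnostic pop
    std::memcpy(dst, data, size);
    return key;
}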

View File

@@ -1780,7 +1780,8 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix)
     part_is_probably_removed_from_disk = true;
 }
-DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
+DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/,
+    const DiskTransactionPtr & disk_transaction) const
 {
     /// Avoid unneeded duplicates of broken parts if we try to detach the same broken part multiple times.
     /// Otherwise it may pollute detached/ with dirs with _tryN suffix and we will fail to remove broken part after 10 attempts.
@@ -1795,7 +1796,8 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix
     IDataPartStorage::ClonePartParams params
     {
         .copy_instead_of_hardlink = isStoredOnRemoteDiskWithZeroCopySupport() && storage.supportsReplication() && storage_settings->allow_remote_fs_zero_copy_replication,
-        .make_source_readonly = true
+        .make_source_readonly = true,
+        .external_transaction = disk_transaction
     };
     return getDataPartStorage().freeze(
         storage.relative_data_path,

View File

@@ -371,7 +371,8 @@ public:
     virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists);
     /// Makes clone of a part in detached/ directory via hard links
-    virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
+    virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
+        const DiskTransactionPtr & disk_transaction) const;
     /// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
     MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const;

View File

@@ -2619,8 +2619,50 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
     if (detached_parts.empty())
         return 0;
-    PartsTemporaryRename renamed_parts(*this, "detached/");
+    auto get_last_touched_time = [&](const DetachedPartInfo & part_info) -> time_t
+    {
+        auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
+        time_t last_change_time = part_info.disk->getLastChanged(path);
+        time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
+        return std::max(last_change_time, last_modification_time);
+    };
+    time_t ttl_seconds = getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
+    size_t unfinished_deleting_parts = 0;
+    time_t current_time = time(nullptr);
+    for (const auto & part_info : detached_parts)
+    {
+        if (!part_info.dir_name.starts_with("deleting_"))
+            continue;
+        time_t startup_time = current_time - static_cast<time_t>(Context::getGlobalContextInstance()->getUptimeSeconds());
+        time_t last_touch_time = get_last_touched_time(part_info);
+        /// Maybe it's being deleted right now (for example, in ALTER DROP DETACHED)
+        bool had_restart = last_touch_time < startup_time;
+        bool ttl_expired = last_touch_time + ttl_seconds <= current_time;
+        if (!had_restart && !ttl_expired)
+            continue;
+        /// We were trying to delete this detached part but did not finish deleting, probably because the server crashed
+        LOG_INFO(log, "Removing detached part {} that we failed to remove previously", part_info.dir_name);
+        try
+        {
+            removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / part_info.dir_name / "", part_info.dir_name);
+            ++unfinished_deleting_parts;
+        }
+        catch (...)
+        {
+            tryLogCurrentException(log);
+        }
+    }
+    if (!getSettings()->merge_tree_enable_clear_old_broken_detached)
+        return unfinished_deleting_parts;
+    const auto full_path = fs::path(relative_data_path) / "detached";
+    size_t removed_count = 0;
     for (const auto & part_info : detached_parts)
     {
         if (!part_info.valid_name || part_info.prefix.empty())
@@ -2635,31 +2677,24 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
         if (!can_be_removed_by_timeout)
             continue;
-        time_t current_time = time(nullptr);
-        ssize_t threshold = current_time - getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
-        auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
-        time_t last_change_time = part_info.disk->getLastChanged(path);
-        time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
-        time_t last_touch_time = std::max(last_change_time, last_modification_time);
+        ssize_t threshold = current_time - ttl_seconds;
+        time_t last_touch_time = get_last_touched_time(part_info);
         if (last_touch_time == 0 || last_touch_time >= threshold)
             continue;
-        renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name, part_info.disk);
-    }
-    LOG_INFO(log, "Will clean up {} detached parts", renamed_parts.old_and_new_names.size());
-    renamed_parts.tryRenameAll();
-    for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
-    {
-        removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
+        const String & old_name = part_info.dir_name;
+        String new_name = "deleting_" + part_info.dir_name;
+        part_info.disk->moveFile(fs::path(full_path) / old_name, fs::path(full_path) / new_name);
+        removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
         LOG_WARNING(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
-        old_name.clear();
+        ++removed_count;
     }
-    return renamed_parts.old_and_new_names.size();
+    LOG_INFO(log, "Cleaned up {} detached parts", removed_count);
+    return removed_count + unfinished_deleting_parts;
 }
 size_t MergeTreeData::clearOldWriteAheadLogs()
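The retention rule encoded above can be read as a small predicate: an unfinished `deleting_` directory is removed if the server has restarted since the directory was last touched, or if its age exceeds the TTL. A hedged standalone restatement with plain time_t arithmetic (the function name is hypothetical):

#include <algorithm>
#include <ctime>

// Illustrative only: mirrors the checks in the "deleting_" loop above.
bool shouldRemoveUnfinishedDeletingDir(
    time_t last_change_time, time_t last_modification_time,
    time_t current_time, time_t uptime_seconds, time_t ttl_seconds)
{
    const time_t last_touch_time = std::max(last_change_time, last_modification_time);
    const time_t startup_time = current_time - uptime_seconds;
    const bool had_restart = last_touch_time < startup_time;   // touched before this server start
    const bool ttl_expired = last_touch_time + ttl_seconds <= current_time;
    return had_restart || ttl_expired;
}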
@@ -4035,7 +4070,7 @@ void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLo
 void MergeTreeData::outdateUnexpectedPartAndCloneToDetached(const DataPartPtr & part_to_detach)
 {
     LOG_INFO(log, "Cloning part {} to unexpected_{} and making it obsolete.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
-    part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr());
+    part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
     DataPartsLock lock = lockParts();
     part_to_detach->is_unexpected_local_part = true;
@@ -5797,18 +5832,21 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
 {
     const String source_dir = "detached/";
-    std::map<String, DiskPtr> name_to_disk;
     /// Let's compose a list of parts that should be added.
     if (attach_part)
     {
         const String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
         validateDetachedPartName(part_id);
-        auto disk = getDiskForDetachedPart(part_id);
-        renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
-        if (MergeTreePartInfo::tryParsePartName(part_id, format_version))
-            name_to_disk[part_id] = getDiskForDetachedPart(part_id);
+        if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_id))
+        {
+            LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
+                "probably it's being detached right now", part_id);
+        }
+        else
+        {
+            auto disk = getDiskForDetachedPart(part_id);
+            renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
+        }
     }
     else
     {
@@ -5825,6 +5863,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
         for (const auto & part_info : detached_parts)
         {
+            if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_info.dir_name))
+            {
+                LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
+                    "probably it's being detached right now", part_info.dir_name);
+                continue;
+            }
             LOG_DEBUG(log, "Found part {}", part_info.dir_name);
             active_parts.add(part_info.dir_name);
         }
@@ -5835,6 +5879,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
         for (const auto & part_info : detached_parts)
         {
             const String containing_part = active_parts.getContainingPart(part_info.dir_name);
+            if (containing_part.empty())
+                continue;
             LOG_DEBUG(log, "Found containing part {} for part {}", containing_part, part_info.dir_name);
View File

@@ -17,6 +17,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int DIRECTORY_ALREADY_EXISTS;
+    extern const int NOT_IMPLEMENTED;
 }
 MergeTreeDataPartInMemory::MergeTreeDataPartInMemory(
@@ -138,8 +139,12 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
     return new_data_part_storage;
 }
-DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const
+DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix,
+    const StorageMetadataPtr & metadata_snapshot,
+    const DiskTransactionPtr & disk_transaction) const
 {
+    if (disk_transaction)
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "InMemory parts are not compatible with disk transactions");
     String detached_path = *getRelativePathForDetachedPart(prefix, /* broken */ false);
     return flushToDisk(detached_path, metadata_snapshot);
 }

View File

@@ -42,7 +42,8 @@ public:
     bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.getNameInStorage()); }
     String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
     void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) override;
-    DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;
+    DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
+        const DiskTransactionPtr & disk_transaction) const override;
     std::optional<time_t> getColumnModificationTime(const String & /* column_name */) const override { return {}; }
     MutableDataPartStoragePtr flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;

View File

@@ -149,8 +149,7 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
         /// do it under share lock
         cleaned_other += storage.clearOldWriteAheadLogs();
         cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
-        if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
-            cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
+        cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
     }
     /// This is loose condition: no problem if we actually had lost leadership at this moment

View File

@@ -633,8 +633,8 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
     delayed_chunk.reset();
 }
-template<bool async_insert>
-void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
+template<>
+bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
 {
     /// NOTE: No delay in this case. That's Ok.
     auto origin_zookeeper = storage.getZooKeeper();
@@ -649,8 +649,13 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData:
     try
     {
         part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
-        commitPart(zookeeper, part, BlockIDsType(), replicas_num, true);
-        PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()));
+        String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
+        bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num, /* writing_existing_part */ true).second;
+        /// Set a special error code if the block is duplicate
+        int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
+        PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error));
+        return deduplicated;
     }
     catch (...)
     {

View File

@@ -56,7 +56,7 @@ public:
     String getName() const override { return "ReplicatedMergeTreeSink"; }
     /// For ATTACHing existing data on filesystem.
-    void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
+    bool writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
     /// For proper deduplication in MaterializedViews
     bool lastBlockIsDuplicate() const override

View File

@@ -1379,8 +1379,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
         cleared_count += clearOldWriteAheadLogs();
         cleared_count += clearOldMutations();
         cleared_count += clearEmptyParts();
-        if (getSettings()->merge_tree_enable_clear_old_broken_detached)
-            cleared_count += clearOldBrokenPartsFromDetachedDirectory();
+        cleared_count += clearOldBrokenPartsFromDetachedDirectory();
         return cleared_count;
         /// TODO maybe take into account number of cleared objects when calculating backoff
     }, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
@@ -1811,8 +1810,10 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
     if (detach)
     {
         auto metadata_snapshot = getInMemoryMetadataPtr();
-        LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
-        part->makeCloneInDetached("", metadata_snapshot);
+        String part_dir = part->getDataPartStorage().getPartDirectory();
+        LOG_INFO(log, "Detaching {}", part_dir);
+        auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
+        part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
     }
     {
@@ -1894,8 +1895,10 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
         for (const auto & part : parts)
         {
             auto metadata_snapshot = getInMemoryMetadataPtr();
-            LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
-            part->makeCloneInDetached("", metadata_snapshot);
+            String part_dir = part->getDataPartStorage().getPartDirectory();
+            LOG_INFO(log, "Detaching {}", part_dir);
+            auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
+            part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
         }
     }
@@ -1935,8 +1938,10 @@ void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool de
     /// NOTE: no race with background cleanup until we hold pointers to parts
     for (const auto & part : parts_to_remove)
     {
-        LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
-        part->makeCloneInDetached("", metadata_snapshot);
+        String part_dir = part->getDataPartStorage().getPartDirectory();
+        LOG_INFO(log, "Detaching {}", part_dir);
+        auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
+        part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
     }
 }
} }

View File

@@ -2097,8 +2097,10 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
     {
         if (auto part_to_detach = part.getPartIfItWasActive())
         {
-            LOG_INFO(log, "Detaching {}", part_to_detach->getDataPartStorage().getPartDirectory());
-            part_to_detach->makeCloneInDetached("", metadata_snapshot);
+            String part_dir = part_to_detach->getDataPartStorage().getPartDirectory();
+            LOG_INFO(log, "Detaching {}", part_dir);
+            auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
+            part_to_detach->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
         }
     }
 }
@@ -2828,7 +2830,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
         for (const auto & part : parts_to_remove_from_working_set)
         {
             LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
-            part->makeCloneInDetached("clone", metadata_snapshot);
+            part->makeCloneInDetached("clone", metadata_snapshot, /*disk_transaction*/ {});
         }
     }
@@ -3794,12 +3796,12 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
             chassert(!broken_part);
             chassert(!storage_init);
             part->was_removed_as_broken = true;
-            part->makeCloneInDetached("broken", getInMemoryMetadataPtr());
+            part->makeCloneInDetached("broken", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
             broken_part = part;
         }
         else
         {
-            part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr());
+            part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
         }
         detached_parts.push_back(part->name);
     }
@@ -6133,8 +6135,9 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
     MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
     /// TODO Allow to use quorum here.
-    ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, false, query_context,
-        /*is_attach*/true);
+    ReplicatedMergeTreeSink output(*this, metadata_snapshot, /* quorum */ 0, /* quorum_timeout_ms */ 0, /* max_parts_per_block */ 0,
+        /* quorum_parallel */ false, query_context->getSettingsRef().insert_deduplicate,
+        /* majority_quorum */ false, query_context, /*is_attach*/true);
     for (size_t i = 0; i < loaded_parts.size(); ++i)
     {

View File

@@ -5,24 +5,6 @@ test_distributed_ddl/test.py::test_default_database[configs_secure]
 test_distributed_ddl/test.py::test_on_server_fail[configs]
 test_distributed_ddl/test.py::test_on_server_fail[configs_secure]
 test_distributed_insert_backward_compatibility/test.py::test_distributed_in_tuple
-test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[default-]
-test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[nopass-]
-test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[pass-foo]
-test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[default-]
-test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[nopass-]
-test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[pass-foo]
-test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[default-]
-test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[nopass-]
-test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[pass-foo]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster[default-]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster[nopass-]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster[pass-foo]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[default-]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[nopass-]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[pass-foo]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[default-]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[nopass-]
-test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[pass-foo]
 test_distributed_load_balancing/test.py::test_distributed_replica_max_ignored_errors
 test_distributed_load_balancing/test.py::test_load_balancing_default
 test_distributed_load_balancing/test.py::test_load_balancing_priority_round_robin[dist_priority]

View File

@ -146,7 +146,7 @@ def main():
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD" "CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
) )
if ci_logs_host != "CLICKHOUSE_CI_LOGS_HOST": if ci_logs_host not in ("CLICKHOUSE_CI_LOGS_HOST", ""):
subprocess.check_call( subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}' '{main_log_path}'", f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}' '{main_log_path}'",
shell=True, shell=True,
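The guard above now skips the log masking both when the host variable still holds its placeholder and when it is empty. A minimal standalone sketch of the same masking step, assuming the same placeholder convention and a hypothetical run.log path:

ci_logs_host="${CLICKHOUSE_CI_LOGS_HOST:-CLICKHOUSE_CI_LOGS_HOST}"
ci_logs_password="${CLICKHOUSE_CI_LOGS_PASSWORD:-CLICKHOUSE_CI_LOGS_PASSWORD}"
# only rewrite the logs when a real host is configured (neither the placeholder nor an empty string)
if [ -n "$ci_logs_host" ] && [ "$ci_logs_host" != "CLICKHOUSE_CI_LOGS_HOST" ]; then
    sed -i -r -e "s!${ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!${ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;" run.log
fi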

View File

@ -394,7 +394,7 @@ def main():
ci_logs_password = os.getenv( ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD" "CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
) )
if ci_logs_host != "CLICKHOUSE_CI_LOGS_HOST": if ci_logs_host not in ("CLICKHOUSE_CI_LOGS_HOST", ""):
subprocess.check_call( subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'", f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True, shell=True,

View File

@ -50,8 +50,19 @@ def prepare_test_scripts():
server_test = r"""#!/bin/bash server_test = r"""#!/bin/bash
set -e set -e
trap "bash -ex /packages/preserve_logs.sh" ERR trap "bash -ex /packages/preserve_logs.sh" ERR
test_env='TEST_THE_DEFAULT_PARAMETER=15'
echo "$test_env" >> /etc/default/clickhouse
systemctl start clickhouse-server systemctl start clickhouse-server
clickhouse-client -q 'SELECT version()'""" clickhouse-client -q 'SELECT version()'
grep "$test_env" /proc/$(cat /var/run/clickhouse-server/clickhouse-server.pid)/environ"""
initd_test = r"""#!/bin/bash
set -e
trap "bash -ex /packages/preserve_logs.sh" ERR
test_env='TEST_THE_DEFAULT_PARAMETER=15'
echo "$test_env" >> /etc/default/clickhouse
/etc/init.d/clickhouse-server start
clickhouse-client -q 'SELECT version()'
grep "$test_env" /proc/$(cat /var/run/clickhouse-server/clickhouse-server.pid)/environ"""
keeper_test = r"""#!/bin/bash keeper_test = r"""#!/bin/bash
set -e set -e
trap "bash -ex /packages/preserve_logs.sh" ERR trap "bash -ex /packages/preserve_logs.sh" ERR
@ -102,6 +113,7 @@ chmod a+rw -R /tests_logs
exit 1 exit 1
""" """
(TEMP_PATH / "server_test.sh").write_text(server_test, encoding="utf-8") (TEMP_PATH / "server_test.sh").write_text(server_test, encoding="utf-8")
(TEMP_PATH / "initd_test.sh").write_text(initd_test, encoding="utf-8")
(TEMP_PATH / "keeper_test.sh").write_text(keeper_test, encoding="utf-8") (TEMP_PATH / "keeper_test.sh").write_text(keeper_test, encoding="utf-8")
(TEMP_PATH / "binary_test.sh").write_text(binary_test, encoding="utf-8") (TEMP_PATH / "binary_test.sh").write_text(binary_test, encoding="utf-8")
(TEMP_PATH / "preserve_logs.sh").write_text(preserve_logs, encoding="utf-8") (TEMP_PATH / "preserve_logs.sh").write_text(preserve_logs, encoding="utf-8")
@ -112,6 +124,9 @@ def test_install_deb(image: DockerImage) -> TestResults:
"Install server deb": r"""#!/bin/bash -ex "Install server deb": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-{server,client,common}*deb apt-get install /packages/clickhouse-{server,client,common}*deb
bash -ex /packages/server_test.sh""", bash -ex /packages/server_test.sh""",
"Run server init.d": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-{server,client,common}*deb
bash -ex /packages/initd_test.sh""",
"Install keeper deb": r"""#!/bin/bash -ex "Install keeper deb": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-keeper*deb apt-get install /packages/clickhouse-keeper*deb
bash -ex /packages/keeper_test.sh""", bash -ex /packages/keeper_test.sh""",
@ -191,6 +206,9 @@ def test_install(image: DockerImage, tests: Dict[str, str]) -> TestResults:
retcode = process.wait() retcode = process.wait()
if retcode == 0: if retcode == 0:
status = OK status = OK
subprocess.check_call(
f"docker kill -s 9 {container_id}", shell=True
)
break break
status = FAIL status = FAIL
@ -198,8 +216,8 @@ def test_install(image: DockerImage, tests: Dict[str, str]) -> TestResults:
archive_path = TEMP_PATH / f"{container_name}-{retry}.tar.gz" archive_path = TEMP_PATH / f"{container_name}-{retry}.tar.gz"
compress_fast(LOGS_PATH, archive_path) compress_fast(LOGS_PATH, archive_path)
logs.append(archive_path) logs.append(archive_path)
subprocess.check_call(f"docker kill -s 9 {container_id}", shell=True)
subprocess.check_call(f"docker kill -s 9 {container_id}", shell=True)
test_results.append(TestResult(name, status, stopwatch.duration_seconds, logs)) test_results.append(TestResult(name, status, stopwatch.duration_seconds, logs))
return test_results return test_results
@ -276,7 +294,7 @@ def main():
sys.exit(0) sys.exit(0)
docker_images = { docker_images = {
name: get_image_with_version(REPORTS_PATH, name) name: get_image_with_version(REPORTS_PATH, name, args.download)
for name in (RPM_IMAGE, DEB_IMAGE) for name in (RPM_IMAGE, DEB_IMAGE)
} }
prepare_test_scripts() prepare_test_scripts()
@ -293,6 +311,8 @@ def main():
is_match = is_match or path.endswith(".rpm") is_match = is_match or path.endswith(".rpm")
if args.tgz: if args.tgz:
is_match = is_match or path.endswith(".tgz") is_match = is_match or path.endswith(".tgz")
# We don't need debug packages, so let's filter them out
is_match = is_match and "-dbg" not in path
return is_match return is_match
download_builds_filter( download_builds_filter(
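The download filter now also drops debug symbol packages. A minimal sketch of the same predicate as a shell check, using a hypothetical artifact name:

path="clickhouse-common-static-dbg_23.8.1_amd64.deb"
# accept only the requested package formats and skip debug symbol packages
if [[ ("$path" == *.deb || "$path" == *.rpm || "$path" == *.tgz) && "$path" != *-dbg* ]]; then
    echo "download $path"
fi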

View File

@ -209,7 +209,7 @@ def run_stress_test(docker_image_name):
ci_logs_password = os.getenv( ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD" "CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
) )
if ci_logs_host != "CLICKHOUSE_CI_LOGS_HOST": if ci_logs_host not in ("CLICKHOUSE_CI_LOGS_HOST", ""):
subprocess.check_call( subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'", f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True, shell=True,

View File

@ -57,27 +57,30 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
] ]
) )
node.exec_in_container(["mkdir", f"{path_to_detached}../unexpected_all_42_1337_5"]) for name in [
node.exec_in_container( "unexpected_all_42_1337_5",
[ "deleting_all_123_456_7",
"touch", "covered-by-broken_all_12_34_5",
"-t", ]:
"1312031429.30", node.exec_in_container(["mkdir", f"{path_to_detached}../{name}"])
f"{path_to_detached}../unexpected_all_42_1337_5", node.exec_in_container(
] [
) "touch",
result = node.exec_in_container( "-t",
["stat", f"{path_to_detached}../unexpected_all_42_1337_5"] "1312031429.30",
) f"{path_to_detached}../{name}",
print(result) ]
assert "Modify: 2013-12-03" in result )
node.exec_in_container( result = node.exec_in_container(["stat", f"{path_to_detached}../{name}"])
[ print(result)
"mv", assert "Modify: 2013-12-03" in result
f"{path_to_detached}../unexpected_all_42_1337_5", node.exec_in_container(
f"{path_to_detached}unexpected_all_42_1337_5", [
] "mv",
) f"{path_to_detached}../{name}",
f"{path_to_detached}{name}",
]
)
result = node.query( result = node.query(
f"CHECK TABLE {table}", settings={"check_query_single_value_result": 0} f"CHECK TABLE {table}", settings={"check_query_single_value_result": 0}
@ -87,17 +90,20 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
node.query(f"DETACH TABLE {table}") node.query(f"DETACH TABLE {table}")
node.query(f"ATTACH TABLE {table}") node.query(f"ATTACH TABLE {table}")
result = node.exec_in_container(["ls", path_to_detached]) node.wait_for_log_line(
print(result) "Removing detached part deleting_all_123_456_7",
assert f"{expect_broken_prefix}_all_3_3_0" in result timeout=90,
assert "all_1_1_0" in result look_behind_lines=1000000,
assert "trash" in result )
assert "broken_all_fake" in result node.wait_for_log_line(
assert "unexpected_all_42_1337_5" in result f"Removed broken detached part {expect_broken_prefix}_all_3_3_0 due to a timeout",
timeout=10,
time.sleep(15) look_behind_lines=1000000,
assert node.contains_in_log( )
"Removed broken detached part unexpected_all_42_1337_5 due to a timeout" node.wait_for_log_line(
"Removed broken detached part unexpected_all_42_1337_5 due to a timeout",
timeout=10,
look_behind_lines=1000000,
) )
result = node.exec_in_container(["ls", path_to_detached]) result = node.exec_in_container(["ls", path_to_detached])
@ -106,7 +112,16 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
assert "all_1_1_0" in result assert "all_1_1_0" in result
assert "trash" in result assert "trash" in result
assert "broken_all_fake" in result assert "broken_all_fake" in result
assert "covered-by-broken_all_12_34_5" in result
assert "unexpected_all_42_1337_5" not in result assert "unexpected_all_42_1337_5" not in result
assert "deleting_all_123_456_7" not in result
node.query(
f"ALTER TABLE {table} DROP DETACHED PART 'covered-by-broken_all_12_34_5'",
settings={"allow_drop_detached": 1},
)
result = node.exec_in_container(["ls", path_to_detached])
assert "covered-by-broken_all_12_34_5" not in result
node.query(f"DROP TABLE {table} SYNC") node.query(f"DROP TABLE {table} SYNC")

View File

@ -110,10 +110,6 @@ def start_cluster():
cluster.shutdown() cluster.shutdown()
def query_with_id(node, id_, query, **kwargs):
return node.query("WITH '{}' AS __id {}".format(id_, query), **kwargs)
# @return -- [user, initial_user] # @return -- [user, initial_user]
def get_query_user_info(node, query_pattern): def get_query_user_info(node, query_pattern):
node.query("SYSTEM FLUSH LOGS") node.query("SYSTEM FLUSH LOGS")
@ -334,7 +330,7 @@ def test_secure_disagree_insert():
@users @users
def test_user_insecure_cluster(user, password): def test_user_insecure_cluster(user, password):
id_ = "query-dist_insecure-" + user id_ = "query-dist_insecure-" + user
query_with_id(n1, id_, "SELECT * FROM dist_insecure", user=user, password=password) n1.query(f"SELECT *, '{id_}' FROM dist_insecure", user=user, password=password)
assert get_query_user_info(n1, id_) == [ assert get_query_user_info(n1, id_) == [
user, user,
user, user,
@ -345,7 +341,7 @@ def test_user_insecure_cluster(user, password):
@users @users
def test_user_secure_cluster(user, password): def test_user_secure_cluster(user, password):
id_ = "query-dist_secure-" + user id_ = "query-dist_secure-" + user
query_with_id(n1, id_, "SELECT * FROM dist_secure", user=user, password=password) n1.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user] assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(n2, id_) == [user, user] assert get_query_user_info(n2, id_) == [user, user]
@ -353,16 +349,14 @@ def test_user_secure_cluster(user, password):
@users @users
def test_per_user_inline_settings_insecure_cluster(user, password): def test_per_user_inline_settings_insecure_cluster(user, password):
id_ = "query-ddl-settings-dist_insecure-" + user id_ = "query-ddl-settings-dist_insecure-" + user
query_with_id( n1.query(
n1, f"""
id_, SELECT *, '{id_}' FROM dist_insecure
""" SETTINGS
SELECT * FROM dist_insecure prefer_localhost_replica=0,
SETTINGS max_memory_usage_for_user=1e9,
prefer_localhost_replica=0, max_untracked_memory=0
max_memory_usage_for_user=1e9, """,
max_untracked_memory=0
""",
user=user, user=user,
password=password, password=password,
) )
@ -372,16 +366,14 @@ def test_per_user_inline_settings_insecure_cluster(user, password):
@users @users
def test_per_user_inline_settings_secure_cluster(user, password): def test_per_user_inline_settings_secure_cluster(user, password):
id_ = "query-ddl-settings-dist_secure-" + user id_ = "query-ddl-settings-dist_secure-" + user
query_with_id( n1.query(
n1, f"""
id_, SELECT *, '{id_}' FROM dist_secure
""" SETTINGS
SELECT * FROM dist_secure prefer_localhost_replica=0,
SETTINGS max_memory_usage_for_user=1e9,
prefer_localhost_replica=0, max_untracked_memory=0
max_memory_usage_for_user=1e9, """,
max_untracked_memory=0
""",
user=user, user=user,
password=password, password=password,
) )
@ -393,10 +385,8 @@ def test_per_user_inline_settings_secure_cluster(user, password):
@users @users
def test_per_user_protocol_settings_insecure_cluster(user, password): def test_per_user_protocol_settings_insecure_cluster(user, password):
id_ = "query-protocol-settings-dist_insecure-" + user id_ = "query-protocol-settings-dist_insecure-" + user
query_with_id( n1.query(
n1, f"SELECT *, '{id_}' FROM dist_insecure",
id_,
"SELECT * FROM dist_insecure",
user=user, user=user,
password=password, password=password,
settings={ settings={
@ -411,10 +401,8 @@ def test_per_user_protocol_settings_insecure_cluster(user, password):
@users @users
def test_per_user_protocol_settings_secure_cluster(user, password): def test_per_user_protocol_settings_secure_cluster(user, password):
id_ = "query-protocol-settings-dist_secure-" + user id_ = "query-protocol-settings-dist_secure-" + user
query_with_id( n1.query(
n1, f"SELECT *, '{id_}' FROM dist_secure",
id_,
"SELECT * FROM dist_secure",
user=user, user=user,
password=password, password=password,
settings={ settings={
@ -431,8 +419,8 @@ def test_per_user_protocol_settings_secure_cluster(user, password):
@users @users
def test_user_secure_cluster_with_backward(user, password): def test_user_secure_cluster_with_backward(user, password):
id_ = "with-backward-query-dist_secure-" + user id_ = "with-backward-query-dist_secure-" + user
query_with_id( n1.query(
n1, id_, "SELECT * FROM dist_secure_backward", user=user, password=password f"SELECT *, '{id_}' FROM dist_secure_backward", user=user, password=password
) )
assert get_query_user_info(n1, id_) == [user, user] assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user] assert get_query_user_info(backward, id_) == [user, user]
@ -441,13 +429,7 @@ def test_user_secure_cluster_with_backward(user, password):
@users @users
def test_user_secure_cluster_from_backward(user, password): def test_user_secure_cluster_from_backward(user, password):
id_ = "from-backward-query-dist_secure-" + user id_ = "from-backward-query-dist_secure-" + user
query_with_id( backward.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
backward,
id_,
"SELECT * FROM dist_secure_backward",
user=user,
password=password,
)
assert get_query_user_info(n1, id_) == [user, user] assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user] assert get_query_user_info(backward, id_) == [user, user]

View File

@ -28,10 +28,7 @@ proto_dir = os.path.join(SCRIPT_DIR, "./protos")
gen_dir = os.path.join(SCRIPT_DIR, "./_gen") gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
os.makedirs(gen_dir, exist_ok=True) os.makedirs(gen_dir, exist_ok=True)
run_and_check( run_and_check(
"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} \ f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto",
{proto_dir}/clickhouse_grpc.proto".format(
proto_dir=proto_dir, gen_dir=gen_dir
),
shell=True, shell=True,
) )

View File

@ -0,0 +1 @@
_gen

View File

@ -0,0 +1,9 @@
<clickhouse>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
</clickhouse>

View File

@ -0,0 +1,9 @@
<clickhouse>
<postgresql_port>5433</postgresql_port>
<mysql_port>9001</mysql_port>
<grpc_port>9100</grpc_port>
<grpc replace="replace">
<!-- Enable if you want very detailed logs -->
<verbose_logs>false</verbose_logs>
</grpc>
</clickhouse>

View File

@ -0,0 +1,9 @@
<clickhouse>
<session_log>
<database>system</database>
<table>session_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</session_log>
</clickhouse>

View File

@ -0,0 +1,23 @@
<clickhouse>
<profiles>
<default>
<function_sleep_max_microseconds_per_block>0</function_sleep_max_microseconds_per_block>
</default>
</profiles>
<users>
<default>
</default>
<mysql_user>
<password>pass</password>
</mysql_user>
<postgres_user>
<password>pass</password>
</postgres_user>
<grpc_user>
<password>pass</password>
</grpc_user>
<parallel_user>
<password>pass</password>
</parallel_user>
</users>
</clickhouse>

View File

@ -0,0 +1 @@
../../../../src/Server/grpc_protos/clickhouse_grpc.proto

View File

@ -0,0 +1,292 @@
import os
import grpc
import pymysql.connections
import pytest
import random
import sys
import threading
from helpers.cluster import ClickHouseCluster, run_and_check
POSTGRES_SERVER_PORT = 5433
MYSQL_SERVER_PORT = 9001
GRPC_PORT = 9100
SESSION_LOG_MATCHING_FIELDS = "auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DEFAULT_ENCODING = "utf-8"
# Use grpcio-tools to generate *pb2.py files from *.proto.
proto_dir = os.path.join(SCRIPT_DIR, "./protos")
gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
os.makedirs(gen_dir, exist_ok=True)
run_and_check(
f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto",
shell=True,
)
sys.path.append(gen_dir)
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"node",
main_configs=[
"configs/ports.xml",
"configs/log.xml",
"configs/session_log.xml",
],
user_configs=["configs/users.xml"],
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
},
with_postgres=True,
)
def grpc_get_url():
return f"{instance.ip_address}:{GRPC_PORT}"
def grpc_create_insecure_channel():
channel = grpc.insecure_channel(grpc_get_url())
grpc.channel_ready_future(channel).result(timeout=2)
return channel
session_id_counter = 0
def next_session_id():
global session_id_counter
session_id = session_id_counter
session_id_counter += 1
return str(session_id)
def grpc_query(query, user_, pass_, raise_exception):
try:
query_info = clickhouse_grpc_pb2.QueryInfo(
query=query,
session_id=next_session_id(),
user_name=user_,
password=pass_,
)
channel = grpc_create_insecure_channel()
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
result = stub.ExecuteQuery(query_info)
if result and result.HasField("exception"):
raise Exception(result.exception.display_text)
return result.output.decode(DEFAULT_ENCODING)
except Exception:
assert raise_exception
def postgres_query(query, user_, pass_, raise_exception):
try:
connection_string = f"host={instance.hostname} port={POSTGRES_SERVER_PORT} dbname=default user={user_} password={pass_}"
cluster.exec_in_container(
cluster.postgres_id,
[
"/usr/bin/psql",
connection_string,
"--no-align",
"--field-separator=' '",
"-c",
query,
],
shell=True,
)
except Exception:
assert raise_exception
def mysql_query(query, user_, pass_, raise_exception):
try:
client = pymysql.connections.Connection(
host=instance.ip_address,
user=user_,
password=pass_,
database="default",
port=MYSQL_SERVER_PORT,
)
cursor = client.cursor(pymysql.cursors.DictCursor)
if raise_exception:
with pytest.raises(Exception):
cursor.execute(query)
else:
cursor.execute(query)
cursor.fetchall()
except Exception:
assert raise_exception
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_grpc_session(started_cluster):
grpc_query("SELECT 1", "grpc_user", "pass", False)
grpc_query("SELECT 2", "grpc_user", "wrong_pass", True)
grpc_query("SELECT 3", "wrong_grpc_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "grpc_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'Logout'"
)
assert logout_records == "grpc_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "grpc_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_mysql_session(started_cluster):
mysql_query("SELECT 1", "mysql_user", "pass", False)
mysql_query("SELECT 2", "mysql_user", "wrong_pass", True)
mysql_query("SELECT 3", "wrong_mysql_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "mysql_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'Logout'"
)
assert logout_records == "mysql_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "mysql_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_postgres_session(started_cluster):
postgres_query("SELECT 1", "postgres_user", "pass", False)
postgres_query("SELECT 2", "postgres_user", "wrong_pass", True)
postgres_query("SELECT 3", "wrong_postgres_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "postgres_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'Logout'"
)
assert logout_records == "postgres_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "postgres_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_parallel_sessions(started_cluster):
thread_list = []
for _ in range(10):
# The sleep time does not matter much here;
# the test should pass even without sleeping.
for function in [postgres_query, grpc_query, mysql_query]:
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"parallel_user",
"pass",
False,
),
)
thread.start()
thread_list.append(thread)
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"parallel_user",
"wrong_pass",
True,
),
)
thread.start()
thread_list.append(thread)
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"wrong_parallel_user",
"pass",
True,
),
)
thread.start()
thread_list.append(thread)
for thread in thread_list:
thread.join()
instance.query("SYSTEM FLUSH LOGS")
port_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user'"
)
assert port_0_sessions == "90\n"
port_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_port = 0"
)
assert port_0_sessions == "0\n"
address_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_address = toIPv6('::')"
)
assert address_0_sessions == "0\n"
grpc_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'gRPC'"
)
assert grpc_sessions == "30\n"
mysql_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'MySQL'"
)
assert mysql_sessions == "30\n"
postgres_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'PostgreSQL'"
)
assert postgres_sessions == "30\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'Logout')"
)
assert logins_and_logouts == "30\n"
logout_failure_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginFailure'"
)
assert logout_failure_sessions == "30\n"
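The expected values in the asserts above follow directly from the loop structure; a quick sanity check of the arithmetic, assuming the thread counts used in the test:

iterations=10; protocols=3
# per iteration and protocol, 'parallel_user' gets LoginSuccess + Logout (correct password)
# plus one LoginFailure (wrong password); the wrong-user attempt is logged under another user name
echo $(( iterations * protocols * 3 ))   # 90 -- total session_log rows for parallel_user
echo $(( iterations * protocols ))       # 30 -- matched LoginSuccess/Logout pairs, LoginFailure rows, and rows per interface (10 x 3 row types)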

View File

@ -0,0 +1,32 @@
<test>
<settings>
<max_insert_threads>8</max_insert_threads>
<allow_experimental_projection_optimization>0</allow_experimental_projection_optimization>
<max_threads>4</max_threads>
</settings>
<create_query>
CREATE TABLE t_nullable
(
key_string1 Nullable(String),
key_string2 Nullable(String),
key_string3 Nullable(String),
key_int64_1 Nullable(Int64),
key_int64_2 Nullable(Int64),
key_int64_3 Nullable(Int64),
key_int64_4 Nullable(Int64),
key_int64_5 Nullable(Int64),
m1 Int64,
m2 Int64
)
ENGINE = Memory
</create_query>
<fill_query>insert into t_nullable select ['aaaaaa','bbaaaa','ccaaaa','ddaaaa'][number % 101 + 1], ['aa','bb','cc','dd'][number % 100 + 1], ['aa','bb','cc','dd'][number % 102 + 1], number%10+1, number%10+2, number%10+3, number%10+4,number%10+5, number%6000+1, number%5000+2 from numbers_mt(20000000)</fill_query>
<query>select key_string1,key_string2,key_string3, min(m1) from t_nullable group by key_string1,key_string2,key_string3</query>
<query>select key_string3,key_int64_1,key_int64_2, min(m1) from t_nullable group by key_string3,key_int64_1,key_int64_2</query>
<query>select key_int64_1,key_int64_2,key_int64_3,key_int64_4,key_int64_5, min(m1) from t_nullable group by key_int64_1,key_int64_2,key_int64_3,key_int64_4,key_int64_5</query>
<query>select toFloat64(key_int64_1),toFloat64(key_int64_2),toFloat64(key_int64_3),toFloat64(key_int64_4),toFloat64(key_int64_5), min(m1) from t_nullable group by toFloat64(key_int64_1),toFloat64(key_int64_2),toFloat64(key_int64_3),toFloat64(key_int64_4),toFloat64(key_int64_5) limit 10</query>
<query>select toDecimal64(key_int64_1, 3),toDecimal64(key_int64_2, 3),toDecimal64(key_int64_3, 3),toDecimal64(key_int64_4, 3),toDecimal64(key_int64_5, 3), min(m1) from t_nullable group by toDecimal64(key_int64_1, 3),toDecimal64(key_int64_2, 3),toDecimal64(key_int64_3, 3),toDecimal64(key_int64_4, 3),toDecimal64(key_int64_5, 3) limit 10</query>
<drop_query>drop table if exists t_nullable</drop_query>
</test>

View File

@ -11,26 +11,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
function query_with_retry
{
local query="$1" && shift
local retry=0
until [ $retry -ge 5 ]
do
local result
result="$($CLICKHOUSE_CLIENT "$@" --query="$query" 2>&1)"
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$((retry + 1))
sleep 3
fi
done
echo "Query '$query' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r1;" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r2;" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r2;"

View File

@ -5,22 +5,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl1" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl2" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl2"

View File

@ -7,23 +7,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;"

View File

@ -0,0 +1,4 @@
default begin inserts
default end inserts
30 465
30 465

View File

@ -0,0 +1,86 @@
#!/usr/bin/env bash
# Tags: race, zookeeper, long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./replication.lib
. "$CURDIR"/replication.lib
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a;
CREATE TABLE alter_table1 (a UInt8, b Int16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a;
" || exit 1
function thread_detach()
{
while true; do
$CLICKHOUSE_CLIENT -mn -q "ALTER TABLE alter_table$(($RANDOM % 2)) DETACH PARTITION ID 'all'; SELECT sleep($RANDOM / 32000) format Null;" 2>/dev/null ||:
done
}
function thread_attach()
{
while true; do
$CLICKHOUSE_CLIENT -mn -q "ALTER TABLE alter_table$(($RANDOM % 2)) ATTACH PARTITION ID 'all'; SELECT sleep($RANDOM / 32000) format Null;" 2>/dev/null ||:
done
}
insert_type=$(($RANDOM % 3))
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'insert_type $insert_type' FORMAT Null"
function insert()
{
# Fault injection may lead to duplicates
if [[ "$insert_type" -eq 0 ]]; then
$CLICKHOUSE_CLIENT --insert_deduplication_token=$1 -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $RANDOM, $1" 2>/dev/null
elif [[ "$insert_type" -eq 1 ]]; then
$CLICKHOUSE_CLIENT -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $1, $1" 2>/dev/null
else
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $RANDOM, $1" 2>/dev/null
fi
}
thread_detach & PID_1=$!
thread_attach & PID_2=$!
thread_detach & PID_3=$!
thread_attach & PID_4=$!
function do_inserts()
{
for i in {1..30}; do
while ! insert $i; do $CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'retrying insert $i' FORMAT Null"; done
done
}
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'begin inserts'"
do_inserts 2>&1| grep -Fa "Exception: " | grep -Fv "was cancelled by concurrent ALTER PARTITION"
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'end inserts'"
kill -TERM $PID_1 && kill -TERM $PID_2 && kill -TERM $PID_3 && kill -TERM $PID_4
wait
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table0"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
query_with_retry "ALTER TABLE alter_table0 ATTACH PARTITION ID 'all'" 2>/dev/null;
$CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table1 ATTACH PARTITION ID 'all'" 2>/dev/null
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
$CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table1 ATTACH PARTITION ID 'all'"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table0"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
engine=$($CLICKHOUSE_CLIENT -q "SELECT engine FROM system.tables WHERE database=currentDatabase() AND table='alter_table0'")
if [[ "$engine" == "ReplicatedMergeTree" ]]; then
# ReplicatedMergeTree may duplicate data on ATTACH PARTITION (when one replica has a merged part and another replica has source parts only)
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table0 FINAL DEDUPLICATE"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
fi
$CLICKHOUSE_CLIENT -q "SELECT count(), sum(b) FROM alter_table0"
$CLICKHOUSE_CLIENT -q "SELECT count(), sum(b) FROM alter_table1"
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table0"
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table1"

View File

@ -5,23 +5,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT -n --query " $CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS load_parts_refcounts SYNC; DROP TABLE IF EXISTS load_parts_refcounts SYNC;

View File

@ -0,0 +1,34 @@
sessions:
150
port_0_sessions:
0
address_0_sessions:
0
tcp_sessions
60
http_sessions
30
http_with_session_id_sessions
30
my_sql_sessions
30
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10

View File

@ -0,0 +1,138 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-debug
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
# Each user uses a separate thread.
readonly TCP_USERS=( "02833_TCP_USER_${PID}"_{1,2} ) # 2 concurrent TCP users
readonly HTTP_USERS=( "02833_HTTP_USER_${PID}" )
readonly HTTP_WITH_SESSION_ID_SESSION_USERS=( "02833_HTTP_WITH_SESSION_ID_USER_${PID}" )
readonly MYSQL_USERS=( "02833_MYSQL_USER_${PID}")
readonly ALL_USERS=( "${TCP_USERS[@]}" "${HTTP_USERS[@]}" "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}" "${MYSQL_USERS[@]}" )
readonly TCP_USERS_SQL_COLLECTION_STRING="$( echo "${TCP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_WITH_SESSION_ID_SESSION_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly MYSQL_USERS_SQL_COLLECTION_STRING="$( echo "${MYSQL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly ALL_USERS_SQL_COLLECTION_STRING="$( echo "${ALL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${user} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${user}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${user}";
done
# All <type>_session functions execute in separate threads.
# Each function repeatedly creates a session with a successful login and logout,
# sleeps a small, random amount of time to make concurrency more intense,
# and also tries to log in with an invalid password.
function tcp_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM})" --user="${user}" --password="pass"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 2" --user="${user}" --password 'invalid'
done
}
function http_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT 3, sleep(0.01${RANDOM})"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=wrong" -d "SELECT 4"
done
}
function http_with_session_id_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT 5, sleep(0.01${RANDOM})"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=wrong" -d "SELECT 6"
done
}
function mysql_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM}) FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'pass')"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 1 FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'wrong', SETTINGS connection_max_tries=1)"
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
export -f mysql_session;
for user in "${TCP_USERS[@]}"; do
timeout 60s bash -c "tcp_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_USERS[@]}"; do
timeout 60s bash -c "http_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}"; do
timeout 60s bash -c "http_with_session_id_session ${user}" >/dev/null 2>&1 &
done
for user in "${MYSQL_USERS[@]}"; do
timeout 60s bash -c "mysql_session ${user}" >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_address = toIPv6('::')"
echo "tcp_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${TCP_USERS_SQL_COLLECTION_STRING}) AND interface = 'TCP'"
echo "http_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "http_with_session_id_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "my_sql_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${MYSQL_USERS_SQL_COLLECTION_STRING}) AND interface = 'MySQL'"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "DROP USER ${user}"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'"
done

View File

@ -0,0 +1,13 @@
0
0
0
0
client_port 0 connections:
0
client_address '::' connections:
0
login failures:
0
TCP Login and logout count is equal
HTTP Login and logout count is equal
MySQL Login and logout count is equal

View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER=$"02834_USER_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT CREATE TEMPORARY TABLE, MYSQL, REMOTE ON *.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "client_port 0 connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_port = 0"
echo "client_address '::' connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_address = toIPv6('::')"
echo "login failures:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and type = 'LoginFailure'"
# The remote(...) function sometimes reuses old cached sessions for query execution.
# This makes the LoginSuccess/Logout entry counts unstable, but logins and logouts must always match.
for interface in 'TCP' 'HTTP' 'MySQL'
do
LOGIN_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}'"`
CORRESPONDING_LOGOUT_RECORDS_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}')"`
if [ "$LOGIN_COUNT" == "$CORRESPONDING_LOGOUT_RECORDS_COUNT" ]; then
echo "${interface} Login and logout count is equal"
else
TOTAL_LOGOUT_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}'"`
echo "${interface} Login count ${LOGIN_COUNT} != corresponding logout count ${CORRESPONDING_LOGOUT_RECORDS_COUNT}. TOTAL_LOGOUT_COUNT ${TOTAL_LOGOUT_COUNT}"
fi
done
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"

View File

@ -0,0 +1,8 @@
port_0_sessions:
0
address_0_sessions:
0
Corresponding LoginSuccess/Logout
9
LoginFailure
0

View File

@ -0,0 +1,114 @@
#!/usr/bin/env bash
# Tags: no-debug
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER="02835_USER_${PID}"
readonly TEST_ROLE="02835_ROLE_${PID}"
readonly TEST_PROFILE="02835_PROFILE_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
function tcp_session()
{
local user=$1
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.numbers" --user="${user}"
}
function http_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
function http_with_session_id_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
# Busy-waits until the specified number of queries ($2) from user $1 are running simultaneously.
function wait_for_queries_start()
{
local user=$1
local queries_count=$2
# 10 seconds waiting
counter=0 retries=100
while [[ $counter -lt $retries ]]; do
result=$($CLICKHOUSE_CLIENT --query "SELECT COUNT(*) FROM system.processes WHERE user = '${user}'")
if [[ $result == "${queries_count}" ]]; then
break;
fi
sleep 0.1
((++counter))
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
# DROP USE CASE
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP ROLE CASE
${CLICKHOUSE_CLIENT} -q "CREATE ROLE IF NOT EXISTS ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} DEFAULT ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP PROFILE CASE
${CLICKHOUSE_CLIENT} -q "CREATE SETTINGS PROFILE IF NOT EXISTS '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_address = toIPv6('::')"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS}, FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginFailure'"

View File

@ -155,3 +155,23 @@ function random_str()
local n=$1 && shift local n=$1 && shift
tr -cd '[:lower:]' < /dev/urandom | head -c"$n" tr -cd '[:lower:]' < /dev/urandom | head -c"$n"
} }
function query_with_retry
{
local query="$1" && shift
local retry=0
until [ $retry -ge 5 ]
do
local result
result="$($CLICKHOUSE_CLIENT "$@" --query="$query" 2>&1)"
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$((retry + 1))
sleep 3
fi
done
echo "Query '$query' failed with '$result'"
}
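query_with_retry now lives in the shared shell helpers, with the query as the first argument and any remaining arguments forwarded to clickhouse-client. A hedged usage sketch, assuming shell_config.sh has been sourced so $CLICKHOUSE_CLIENT is set and the table from the race test above exists:

# retries the ALTER up to 5 times, sleeping 3 seconds between attempts
query_with_retry "ALTER TABLE alter_table0 ATTACH PARTITION ID 'all'" 2>/dev/null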