From e789d15948eaec3eaa9a8604e24d2f6ed7b60db5 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Tue, 5 Mar 2024 16:06:25 +0800 Subject: [PATCH 01/30] optimize insertmanyfrom of nullable(number) or nullable(string) --- src/Columns/ColumnDecimal.h | 7 +++++++ src/Columns/ColumnNullable.cpp | 8 ++++++++ src/Columns/ColumnNullable.h | 1 + src/Columns/ColumnString.cpp | 21 +++++++++++++++++++++ src/Columns/ColumnString.h | 2 ++ 5 files changed, 39 insertions(+) diff --git a/src/Columns/ColumnDecimal.h b/src/Columns/ColumnDecimal.h index 7ca01a8342c..e0ea26744dc 100644 --- a/src/Columns/ColumnDecimal.h +++ b/src/Columns/ColumnDecimal.h @@ -56,6 +56,13 @@ public: void shrinkToFit() override { data.shrink_to_fit(); } void insertFrom(const IColumn & src, size_t n) override { data.push_back(static_cast(src).getData()[n]); } + + void insertManyFrom(const IColumn & src, size_t position, size_t length) override + { + ValueType v = assert_cast(src).getData()[position]; + data.resize_fill(data.size() + length, v); + } + void insertData(const char * src, size_t /*length*/) override; void insertDefault() override { data.push_back(T()); } void insertManyDefaults(size_t length) override { data.resize_fill(data.size() + length); } diff --git a/src/Columns/ColumnNullable.cpp b/src/Columns/ColumnNullable.cpp index 1d11827ac97..fa5fdfb8c21 100644 --- a/src/Columns/ColumnNullable.cpp +++ b/src/Columns/ColumnNullable.cpp @@ -231,6 +231,14 @@ void ColumnNullable::insertFrom(const IColumn & src, size_t n) getNullMapData().push_back(src_concrete.getNullMapData()[n]); } + +void ColumnNullable::insertManyFrom(const IColumn & src, size_t position, size_t length) +{ + const ColumnNullable & src_concrete = assert_cast(src); + getNestedColumn().insertManyFrom(src_concrete.getNestedColumn(), position, length); + getNullMapColumn().insertManyFrom(src_concrete.getNullMapColumn(), position, length); +} + void ColumnNullable::insertFromNotNullable(const IColumn & src, size_t n) { getNestedColumn().insertFrom(src, n); diff --git a/src/Columns/ColumnNullable.h b/src/Columns/ColumnNullable.h index b4aef8e08fa..ef4bf4fa41b 100644 --- a/src/Columns/ColumnNullable.h +++ b/src/Columns/ColumnNullable.h @@ -69,6 +69,7 @@ public: void insert(const Field & x) override; bool tryInsert(const Field & x) override; void insertFrom(const IColumn & src, size_t n) override; + void insertManyFrom(const IColumn & src, size_t position, size_t length) override; void insertFromNotNullable(const IColumn & src, size_t n); void insertRangeFromNotNullable(const IColumn & src, size_t start, size_t length); diff --git a/src/Columns/ColumnString.cpp b/src/Columns/ColumnString.cpp index b9128372cea..f3c7ac1bf0c 100644 --- a/src/Columns/ColumnString.cpp +++ b/src/Columns/ColumnString.cpp @@ -38,6 +38,27 @@ ColumnString::ColumnString(const ColumnString & src) last_offset, chars.size()); } +void ColumnString::insertManyFrom(const IColumn & src, size_t position, size_t length) +{ + const ColumnString & src_concrete = assert_cast(src); + const UInt8 * src_buf = &src_concrete.chars[src_concrete.offsets[position - 1]]; + const size_t src_buf_size + = src_concrete.offsets[position] - src_concrete.offsets[position - 1]; /// -1th index is Ok, see PaddedPODArray. + + const size_t old_size = chars.size(); + const size_t new_size = old_size + src_buf_size * length; + chars.resize(new_size); + + const size_t old_rows = offsets.size(); + offsets.resize(old_rows + length); + + for (size_t current_offset = old_size; current_offset < new_size; current_offset += src_buf_size) + memcpySmallAllowReadWriteOverflow15(&chars[current_offset], src_buf, src_buf_size); + + for (size_t i = 0, current_offset = old_size + src_buf_size; i < length; ++i, current_offset += src_buf_size) + offsets[old_rows + i] = current_offset; +} + MutableColumnPtr ColumnString::cloneResized(size_t to_size) const { diff --git a/src/Columns/ColumnString.h b/src/Columns/ColumnString.h index 04aa1849187..2d1d69ced73 100644 --- a/src/Columns/ColumnString.h +++ b/src/Columns/ColumnString.h @@ -160,6 +160,8 @@ public: } } + void insertManyFrom(const IColumn & src, size_t position, size_t length) override; + void insertData(const char * pos, size_t length) override { const size_t old_size = chars.size(); From a109952960acac12790cffde030062ec60208994 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Tue, 5 Mar 2024 22:08:36 +0800 Subject: [PATCH 02/30] dev columnstring --- src/Columns/ColumnArray.cpp | 83 +++++++++++++++++++++++++++++++ src/Columns/ColumnArray.h | 9 ++++ src/Columns/ColumnConst.h | 2 + src/Columns/ColumnFixedString.cpp | 14 ++++++ src/Columns/ColumnFixedString.h | 2 + 5 files changed, 110 insertions(+) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 7b268b80116..b620da81ae8 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -347,6 +347,89 @@ void ColumnArray::insertFrom(const IColumn & src_, size_t n) getOffsets().push_back(getOffsets().back() + size); } +template +void ColumnArray::insertManyFromNumber(const ColumnArray & src, size_t position, size_t length) +{ + using ColVecType = ColumnVectorOrDecimal; + size_t src_size = src.sizeAt(position); + size_t src_offset = src.offsetAt(position); + + const typename ColVecType::Container & src_data = typeid_cast(src.getData()).getData(); + typename ColVecType::Container & data_ref = typeid_cast(getData()).getData(); + size_t old_size = data_ref.size(); + size_t new_size = old_size + src_size * length; + data_ref.resize(new_size); + for (size_t i = 0, offset = old_size; i < length; ++i, offset += src_size) + memcpy(&data_ref[offset], &src_data[src_offset], src_size * sizeof(T)); +} + +void ColumnArray::insertManyFromString(const ColumnArray & src, size_t position, size_t length) +{ + size_t src_size = src.sizeAt(position); + size_t src_offset = src.offsetAt(position); + + const auto & src_string = typeid_cast(src.getData()); + const auto & src_chars = src_string.getChars(); + const auto & src_string_offsets = src_string.getOffsets(); + auto & dst_string = typeid_cast(getData()); + auto & dst_chars = dst_string.getChars(); + auto & dst_string_offsets = dst_string.getOffsets(); + + /// Each row may have multiple strings, copy them to dst_chars and update dst_offsets + size_t old_size = dst_string_offsets.size(); + size_t new_size = old_size + src_size * length; + dst_string_offsets.resize(new_size); + size_t dst_string_offset = dst_chars.size(); + for (size_t i = 0; i < length; ++i) + { + for (size_t j = 0; j < src_size; ++j) + { + size_t nested_offset = src_string_offsets[src_offset + j - 1]; + size_t nested_length = src_string_offsets[src_offset + j] - nested_offset; + + dst_string_offset += nested_length; + dst_string_offsets[old_size + i * src_size + j] = dst_string_offset; + } + } + + size_t chars_to_copy = src_string_offsets[src_offset + src_size - 1] - src_string_offsets[src_offset - 1]; + dst_chars.resize(dst_chars.size() + chars_to_copy * length); + for (size_t dst_offset = old_size; dst_offset < new_size; dst_offset += src_size) + memcpy(&dst_chars[dst_string_offsets[dst_offset - 1]], &src_chars[src_string_offsets[src_offset - 1]], chars_to_copy); +} + +void ColumnArray::insertManyFromTuple(const ColumnArray & src, size_t position, size_t length) +{ + +} +void ColumnArray::insertManyFromNullable(const ColumnArray & src, size_t position, size_t length) +{ + +} +void ColumnArray::insertManyFromGeneric(const ColumnArray & src, size_t position, size_t length) +{ + size_t src_size = src.sizeAt(position); + size_t src_offset = src.offsetAt(position); + const auto & src_data = src.getData(); + size_t new_size = data->size() + src_size * length; + data->reserve(new_size); + for (size_t i = 0; i < length; ++i) + data->insertRangeFrom(src_data, src_offset, src_size); +} + +void ColumnArray::insertManyFrom(const IColumn & src_, size_t position, size_t length) +{ + /// First fill offsets + const ColumnArray & src = assert_cast(src_); + size_t src_size = src.sizeAt(position); + auto & offsets_ref = getOffsets(); + size_t old_rows = offsets_ref.size(); + size_t new_rows = old_rows + length; + size_t old_size = offsets_ref.back(); + offsets_ref.resize(new_rows); + for (size_t i = 0, offset = old_size + src_size; i < length; ++i, offset += src_size) + offsets_ref[old_rows + i] = offset; +} void ColumnArray::insertDefault() { diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index 230d8830265..73d632a38b9 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -88,6 +88,7 @@ public: void insert(const Field & x) override; bool tryInsert(const Field & x) override; void insertFrom(const IColumn & src_, size_t n) override; + void insertManyFrom(const IColumn & src, size_t position, size_t length) override; void insertDefault() override; void popBack(size_t n) override; ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override; @@ -213,6 +214,14 @@ private: ColumnPtr filterNullable(const Filter & filt, ssize_t result_size_hint) const; ColumnPtr filterGeneric(const Filter & filt, ssize_t result_size_hint) const; + /// Specializations for insertManyFrom + template + void insertManyFromNumber(const ColumnArray & src, size_t position, size_t length); + void insertManyFromString(const ColumnArray & src, size_t position, size_t length); + void insertManyFromTuple(const ColumnArray & src, size_t position, size_t length); + void insertManyFromNullable(const ColumnArray & src, size_t position, size_t length); + void insertManyFromGeneric(const ColumnArray & src, size_t position, size_t length); + int compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator=nullptr) const; }; diff --git a/src/Columns/ColumnConst.h b/src/Columns/ColumnConst.h index 990b7189fa3..4a3d40ca0d2 100644 --- a/src/Columns/ColumnConst.h +++ b/src/Columns/ColumnConst.h @@ -150,6 +150,8 @@ public: ++s; } + void insertManyFrom(const IColumn & /*src*/, size_t /* position */, size_t length) override { s += length; } + void insertDefault() override { ++s; diff --git a/src/Columns/ColumnFixedString.cpp b/src/Columns/ColumnFixedString.cpp index e460c84d696..b55f68d4687 100644 --- a/src/Columns/ColumnFixedString.cpp +++ b/src/Columns/ColumnFixedString.cpp @@ -85,6 +85,20 @@ void ColumnFixedString::insertFrom(const IColumn & src_, size_t index) memcpySmallAllowReadWriteOverflow15(chars.data() + old_size, &src.chars[n * index], n); } +void ColumnFixedString::insertManyFrom(const IColumn & src, size_t position, size_t length) +{ + const ColumnFixedString & src_concrete = assert_cast(src); + if (n != src_concrete.getN()) + throw Exception(ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH, "Size of FixedString doesn't match"); + + const size_t old_size = chars.size(); + const size_t new_size = old_size + n * length; + chars.resize(new_size); + + for (size_t offset = old_size; offset < new_size; offset += n) + memcpySmallAllowReadWriteOverflow15(&chars[offset], &src_concrete.chars[n * position], n); +} + void ColumnFixedString::insertData(const char * pos, size_t length) { if (length > n) diff --git a/src/Columns/ColumnFixedString.h b/src/Columns/ColumnFixedString.h index f40e1356b27..56d42e8b34e 100644 --- a/src/Columns/ColumnFixedString.h +++ b/src/Columns/ColumnFixedString.h @@ -100,6 +100,8 @@ public: void insertFrom(const IColumn & src_, size_t index) override; + void insertManyFrom(const IColumn & src, size_t position, size_t length) override; + void insertData(const char * pos, size_t length) override; void insertDefault() override From 53c9d4513c4b93ed79df305bb5c36c0cfb43ef79 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Wed, 6 Mar 2024 12:16:17 +0800 Subject: [PATCH 03/30] finish dev column array --- src/Columns/ColumnArray.cpp | 132 +++++++++++++++++++++++++++++++++--- src/Columns/ColumnArray.h | 3 + 2 files changed, 125 insertions(+), 10 deletions(-) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index b620da81ae8..aa0d5aa3e50 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -31,6 +31,7 @@ namespace ErrorCodes extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; extern const int LOGICAL_ERROR; extern const int TOO_LARGE_ARRAY_SIZE; + extern const int ILLEGAL_COLUMN; } /** Obtaining array as Field can be slow for large arrays and consume vast amount of memory. @@ -363,6 +364,19 @@ void ColumnArray::insertManyFromNumber(const ColumnArray & src, size_t position, memcpy(&data_ref[offset], &src_data[src_offset], src_size * sizeof(T)); } +void ColumnArray::insertManyFromConst(const ColumnConst & src, size_t position, size_t length) +{ + const ColumnArray * src_array = typeid_cast(&src.getDataColumn()); + if (!src_array) + throw Exception( + ErrorCodes::ILLEGAL_COLUMN, + "Cannot insert from const column of type {} to column of type {}", + src.getDataColumn().getName(), + getName()); + + insertManyFromImpl(*src_array, 0, length, true); +} + void ColumnArray::insertManyFromString(const ColumnArray & src, size_t position, size_t length) { size_t src_size = src.sizeAt(position); @@ -400,12 +414,53 @@ void ColumnArray::insertManyFromString(const ColumnArray & src, size_t position, void ColumnArray::insertManyFromTuple(const ColumnArray & src, size_t position, size_t length) { + ColumnTuple & tuple = assert_cast(getData()); + const ColumnTuple & src_tuple = assert_cast(src.getData()); + /// Make temporary arrays for each components of Tuple. In the same way as for Nullable. + size_t tuple_size = tuple.tupleSize(); + size_t src_tuple_size = src_tuple.tupleSize(); + if (tuple_size == 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty tuple"); + if (tuple_size != src_tuple_size) + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Nested tuple size mismatch: {} vs {}", tuple_size, src_tuple_size); + + Columns temporary_arrays(tuple_size); + Columns src_temporary_arrays(tuple_size); + for (size_t i = 0; i < tuple_size; ++i) + { + temporary_arrays[i] = ColumnArray::create(tuple.getColumn(i).assumeMutable(), getOffsetsPtr()->assumeMutable()); + src_temporary_arrays[i] = ColumnArray::create(src_tuple.getColumn(i).assumeMutable(), src.getOffsetsPtr()->assumeMutable()); + assert_cast(*temporary_arrays[i]) + .insertManyFromImpl(assert_cast(*src_temporary_arrays[i]), position, length, false); + } + + Columns tuple_columns(tuple_size); + for (size_t i = 0; i < tuple_size; ++i) + tuple_columns[i] = assert_cast(*temporary_arrays[i]).getDataPtr(); + + getDataPtr() = ColumnTuple::create(std::move(tuple_columns)); } + void ColumnArray::insertManyFromNullable(const ColumnArray & src, size_t position, size_t length) { + ColumnNullable & nullable = assert_cast(getData()); + const ColumnNullable & src_nullable = assert_cast(src.getData()); + /// Process nested column without updating array offsets + auto array_of_nested = ColumnArray(nullable.getNestedColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable()); + auto src_array_of_nested = ColumnArray(src_nullable.getNestedColumnPtr()->assumeMutable(), src.getOffsetsPtr()->assumeMutable()); + array_of_nested.insertManyFromImpl(src_array_of_nested, position, length, false); + + /// Process null map column without updating array offsets + auto array_of_null_map = ColumnArray(nullable.getNullMapColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable()); + auto src_array_of_null_map = ColumnArray(src_nullable.getNullMapColumnPtr()->assumeMutable(), src.getOffsetsPtr()->assumeMutable()); + array_of_null_map.insertManyFromImpl(src_array_of_null_map, position, length, false); + + /// Update array data + getDataPtr() = ColumnNullable::create(array_of_nested.getDataPtr(), array_of_null_map.getDataPtr()); } + void ColumnArray::insertManyFromGeneric(const ColumnArray & src, size_t position, size_t length) { size_t src_size = src.sizeAt(position); @@ -419,16 +474,73 @@ void ColumnArray::insertManyFromGeneric(const ColumnArray & src, size_t position void ColumnArray::insertManyFrom(const IColumn & src_, size_t position, size_t length) { - /// First fill offsets - const ColumnArray & src = assert_cast(src_); - size_t src_size = src.sizeAt(position); - auto & offsets_ref = getOffsets(); - size_t old_rows = offsets_ref.size(); - size_t new_rows = old_rows + length; - size_t old_size = offsets_ref.back(); - offsets_ref.resize(new_rows); - for (size_t i = 0, offset = old_size + src_size; i < length; ++i, offset += src_size) - offsets_ref[old_rows + i] = offset; + const ColumnConst * src_const = typeid_cast(&src_); + if (src_const) + return insertManyFromConst(*src_const, position, length); + + const ColumnArray * src_array = typeid_cast(&src_); + if (!src_array) + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert from column of type {} to column of type {}", src_.getName(), getName()); + + return insertManyFromImpl(*src_array, position, length, true); +} + +void ColumnArray::insertManyFromImpl(const ColumnArray & src, size_t position, size_t length, bool update_offsets) +{ + /// First fill offsets if needed + if (update_offsets) + { + size_t src_size = src.sizeAt(position); + auto & offsets_ref = getOffsets(); + size_t old_rows = offsets_ref.size(); + size_t new_rows = old_rows + length; + size_t old_size = offsets_ref.back(); + offsets_ref.resize(new_rows); + for (size_t i = 0, offset = old_size + src_size; i < length; ++i, offset += src_size) + offsets_ref[old_rows + i] = offset; + } + + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast *>(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast *>(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast *>(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast *>(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast *>(data.get())) + return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromNullable(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromTuple(src, position, length); + return insertManyFromGeneric(src, position, length); } void ColumnArray::insertDefault() diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index 73d632a38b9..765f86ec552 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -215,6 +215,9 @@ private: ColumnPtr filterGeneric(const Filter & filt, ssize_t result_size_hint) const; /// Specializations for insertManyFrom + void insertManyFromConst(const ColumnConst & src, size_t position, size_t length); + void insertManyFromImpl(const ColumnArray & src, size_t position, size_t length, bool update_offsets = true); + template void insertManyFromNumber(const ColumnArray & src, size_t position, size_t length); void insertManyFromString(const ColumnArray & src, size_t position, size_t length); From 3bf3c7cc708d1a564896d649a1a804b868f89d8d Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Wed, 6 Mar 2024 12:32:23 +0800 Subject: [PATCH 04/30] finish column map and tuple --- src/Columns/ColumnArray.cpp | 2 +- src/Columns/ColumnMap.cpp | 5 +++++ src/Columns/ColumnMap.h | 1 + src/Columns/ColumnTuple.cpp | 12 ++++++++++++ src/Columns/ColumnTuple.h | 1 + 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index aa0d5aa3e50..5b0df8e9b6b 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -364,7 +364,7 @@ void ColumnArray::insertManyFromNumber(const ColumnArray & src, size_t position, memcpy(&data_ref[offset], &src_data[src_offset], src_size * sizeof(T)); } -void ColumnArray::insertManyFromConst(const ColumnConst & src, size_t position, size_t length) +void ColumnArray::insertManyFromConst(const ColumnConst & src, size_t /*position*/, size_t length) { const ColumnArray * src_array = typeid_cast(&src.getDataColumn()); if (!src_array) diff --git a/src/Columns/ColumnMap.cpp b/src/Columns/ColumnMap.cpp index 995f3103484..57e8ba685b4 100644 --- a/src/Columns/ColumnMap.cpp +++ b/src/Columns/ColumnMap.cpp @@ -158,6 +158,11 @@ void ColumnMap::insertFrom(const IColumn & src, size_t n) nested->insertFrom(assert_cast(src).getNestedColumn(), n); } +void ColumnMap::insertManyFrom(const IColumn & src, size_t position, size_t length) +{ + assert_cast(*nested).insertManyFrom(assert_cast(src).getNestedColumn(), position, length); +} + void ColumnMap::insertRangeFrom(const IColumn & src, size_t start, size_t length) { nested->insertRangeFrom( diff --git a/src/Columns/ColumnMap.h b/src/Columns/ColumnMap.h index 17cd86a3788..60aa69e7bf6 100644 --- a/src/Columns/ColumnMap.h +++ b/src/Columns/ColumnMap.h @@ -67,6 +67,7 @@ public: void updateWeakHash32(WeakHash32 & hash) const override; void updateHashFast(SipHash & hash) const override; void insertFrom(const IColumn & src_, size_t n) override; + void insertManyFrom(const IColumn & src, size_t position, size_t length) override; void insertRangeFrom(const IColumn & src, size_t start, size_t length) override; ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override; void expand(const Filter & mask, bool inverted) override; diff --git a/src/Columns/ColumnTuple.cpp b/src/Columns/ColumnTuple.cpp index 17cc58d92f5..062bdadf9d2 100644 --- a/src/Columns/ColumnTuple.cpp +++ b/src/Columns/ColumnTuple.cpp @@ -185,6 +185,18 @@ void ColumnTuple::insertFrom(const IColumn & src_, size_t n) columns[i]->insertFrom(*src.columns[i], n); } +void ColumnTuple::insertManyFrom(const IColumn & src, size_t position, size_t length) +{ + const ColumnTuple & src_tuple = assert_cast(src); + + const size_t tuple_size = columns.size(); + if (src_tuple.columns.size() != tuple_size) + throw Exception(ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE, "Cannot insert value of different size into tuple"); + + for (size_t i = 0; i < tuple_size; ++i) + columns[i]->insertManyFrom(*src_tuple.columns[i], position, length); +} + void ColumnTuple::insertDefault() { for (auto & column : columns) diff --git a/src/Columns/ColumnTuple.h b/src/Columns/ColumnTuple.h index 610416b8b11..5b626155754 100644 --- a/src/Columns/ColumnTuple.h +++ b/src/Columns/ColumnTuple.h @@ -60,6 +60,7 @@ public: void insert(const Field & x) override; bool tryInsert(const Field & x) override; void insertFrom(const IColumn & src_, size_t n) override; + void insertManyFrom(const IColumn & src, size_t position, size_t length) override; void insertDefault() override; void popBack(size_t n) override; StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; From 3005bff23100539dbb71f9623dc3aed9c34a87f6 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Wed, 6 Mar 2024 14:43:33 +0800 Subject: [PATCH 05/30] fix building --- src/Columns/ColumnArray.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 5b0df8e9b6b..389b3e97820 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -425,7 +425,7 @@ void ColumnArray::insertManyFromTuple(const ColumnArray & src, size_t position, if (tuple_size != src_tuple_size) throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Nested tuple size mismatch: {} vs {}", tuple_size, src_tuple_size); - Columns temporary_arrays(tuple_size); + MutableColumns temporary_arrays(tuple_size); Columns src_temporary_arrays(tuple_size); for (size_t i = 0; i < tuple_size; ++i) { From 8e413da8f156ab03c875b9525044265cffcc5b83 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Wed, 6 Mar 2024 17:32:08 +0800 Subject: [PATCH 06/30] apply opts for string nested in array --- src/Columns/ColumnArray.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 389b3e97820..44b17c89ae1 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -536,6 +536,8 @@ void ColumnArray::insertManyFromImpl(const ColumnArray & src, size_t position, s return insertManyFromNumber(src, position, length); if (typeid_cast *>(data.get())) return insertManyFromNumber(src, position, length); + if (typeid_cast(data.get())) + return insertManyFromString(src, position, length); if (typeid_cast(data.get())) return insertManyFromNullable(src, position, length); if (typeid_cast(data.get())) From 930deee699be05398aac334ce9e025d084c68a30 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Thu, 7 Mar 2024 22:02:10 +0800 Subject: [PATCH 07/30] fix bugs --- src/Columns/ColumnArray.cpp | 63 ++++++++++++++++++------------------- src/Columns/ColumnArray.h | 2 +- 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 44b17c89ae1..0214375122f 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -43,29 +43,34 @@ namespace ErrorCodes static constexpr size_t max_array_size_as_field = 1000000; -ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column) +ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column, bool check_offsets) : data(std::move(nested_column)), offsets(std::move(offsets_column)) { - const ColumnOffsets * offsets_concrete = typeid_cast(offsets.get()); - - if (!offsets_concrete) - throw Exception(ErrorCodes::LOGICAL_ERROR, "offsets_column must be a ColumnUInt64"); - - if (!offsets_concrete->empty() && data && !data->empty()) + if (check_offsets) { - Offset last_offset = offsets_concrete->getData().back(); + const ColumnOffsets * offsets_concrete = typeid_cast(offsets.get()); - /// This will also prevent possible overflow in offset. - if (data->size() != last_offset) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "offsets_column has data inconsistent with nested_column. Data size: {}, last offset: {}", - data->size(), last_offset); + if (!offsets_concrete) + throw Exception(ErrorCodes::LOGICAL_ERROR, "offsets_column must be a ColumnUInt64"); + + if (!offsets_concrete->empty() && data && !data->empty()) + { + Offset last_offset = offsets_concrete->getData().back(); + + /// This will also prevent possible overflow in offset. + if (data->size() != last_offset) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "offsets_column has data inconsistent with nested_column. Data size: {}, last offset: {}", + data->size(), + last_offset); + } + + /** NOTE + * Arrays with constant value are possible and used in implementation of higher order functions (see FunctionReplicate). + * But in most cases, arrays with constant value are unexpected and code will work wrong. Use with caution. + */ } - - /** NOTE - * Arrays with constant value are possible and used in implementation of higher order functions (see FunctionReplicate). - * But in most cases, arrays with constant value are unexpected and code will work wrong. Use with caution. - */ } ColumnArray::ColumnArray(MutableColumnPtr && nested_column) @@ -425,20 +430,14 @@ void ColumnArray::insertManyFromTuple(const ColumnArray & src, size_t position, if (tuple_size != src_tuple_size) throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Nested tuple size mismatch: {} vs {}", tuple_size, src_tuple_size); - MutableColumns temporary_arrays(tuple_size); - Columns src_temporary_arrays(tuple_size); - for (size_t i = 0; i < tuple_size; ++i) - { - temporary_arrays[i] = ColumnArray::create(tuple.getColumn(i).assumeMutable(), getOffsetsPtr()->assumeMutable()); - src_temporary_arrays[i] = ColumnArray::create(src_tuple.getColumn(i).assumeMutable(), src.getOffsetsPtr()->assumeMutable()); - assert_cast(*temporary_arrays[i]) - .insertManyFromImpl(assert_cast(*src_temporary_arrays[i]), position, length, false); - } - Columns tuple_columns(tuple_size); for (size_t i = 0; i < tuple_size; ++i) - tuple_columns[i] = assert_cast(*temporary_arrays[i]).getDataPtr(); - + { + auto array_of_element = ColumnArray(tuple.getColumn(i).assumeMutable(), getOffsetsPtr()->assumeMutable(), false); + auto src_array_of_element = ColumnArray(src_tuple.getColumn(i).assumeMutable(), src.getOffsetsPtr()->assumeMutable()); + array_of_element.insertManyFromImpl(src_array_of_element, position, length, false); + tuple_columns[i] = array_of_element.getDataPtr(); + } getDataPtr() = ColumnTuple::create(std::move(tuple_columns)); } @@ -448,12 +447,12 @@ void ColumnArray::insertManyFromNullable(const ColumnArray & src, size_t positio const ColumnNullable & src_nullable = assert_cast(src.getData()); /// Process nested column without updating array offsets - auto array_of_nested = ColumnArray(nullable.getNestedColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable()); + auto array_of_nested = ColumnArray(nullable.getNestedColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable(), false); auto src_array_of_nested = ColumnArray(src_nullable.getNestedColumnPtr()->assumeMutable(), src.getOffsetsPtr()->assumeMutable()); array_of_nested.insertManyFromImpl(src_array_of_nested, position, length, false); /// Process null map column without updating array offsets - auto array_of_null_map = ColumnArray(nullable.getNullMapColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable()); + auto array_of_null_map = ColumnArray(nullable.getNullMapColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable(), false); auto src_array_of_null_map = ColumnArray(src_nullable.getNullMapColumnPtr()->assumeMutable(), src.getOffsetsPtr()->assumeMutable()); array_of_null_map.insertManyFromImpl(src_array_of_null_map, position, length, false); diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index 765f86ec552..8c4d103e7d0 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -21,7 +21,7 @@ private: friend class COWHelper, ColumnArray>; /** Create an array column with specified values and offsets. */ - ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column); + ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column, bool check_offsets = true); /** Create an empty column of arrays with the type of values as in the column `nested_column` */ explicit ColumnArray(MutableColumnPtr && nested_column); From e2317477f7b95d07407db8def968d286aa9e270d Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 8 Mar 2024 17:12:31 +0100 Subject: [PATCH 08/30] fix removing is_active node after re-creation --- src/Databases/DatabaseReplicatedWorker.cpp | 2 ++ tests/integration/test_replicated_database/test.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 2056b403ff6..0a6e8f9345e 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -75,6 +75,8 @@ void DatabaseReplicatedDDLWorker::initializeReplication() String active_path = fs::path(database->replica_path) / "active"; String active_id = toString(ServerUUID::get()); zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id); + if (active_node_holder) + active_node_holder->setAlreadyRemoved(); zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral); active_node_holder.reset(); active_node_holder_zookeeper = zookeeper; diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index b47f86a843d..4f449f9a296 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -1141,6 +1141,8 @@ def test_sync_replica(started_cluster): dummy_node.query("SYSTEM SYNC DATABASE REPLICA test_sync_database") + assert "2\n" == main_node.query("SELECT sum(is_active) FROM system.clusters WHERE cluster='test_sync_database'") + assert dummy_node.query( "SELECT count() FROM system.tables where database='test_sync_database'" ).strip() == str(number_of_tables) From babe00003620ca34f228009d919d5613db867dee Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Fri, 8 Mar 2024 16:43:10 +0000 Subject: [PATCH 09/30] Automatic style fix --- tests/integration/test_replicated_database/test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index 4f449f9a296..881659262ac 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -1141,7 +1141,9 @@ def test_sync_replica(started_cluster): dummy_node.query("SYSTEM SYNC DATABASE REPLICA test_sync_database") - assert "2\n" == main_node.query("SELECT sum(is_active) FROM system.clusters WHERE cluster='test_sync_database'") + assert "2\n" == main_node.query( + "SELECT sum(is_active) FROM system.clusters WHERE cluster='test_sync_database'" + ) assert dummy_node.query( "SELECT count() FROM system.tables where database='test_sync_database'" From 2ce96f48f3c3958ef51c3e620b886d633436bb26 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 8 Mar 2024 21:58:01 +0100 Subject: [PATCH 10/30] Update 02962_system_sync_replica_lightweight_from_modifier.sh --- .../02962_system_sync_replica_lightweight_from_modifier.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/queries/0_stateless/02962_system_sync_replica_lightweight_from_modifier.sh b/tests/queries/0_stateless/02962_system_sync_replica_lightweight_from_modifier.sh index f47801abf73..b61be87411d 100755 --- a/tests/queries/0_stateless/02962_system_sync_replica_lightweight_from_modifier.sh +++ b/tests/queries/0_stateless/02962_system_sync_replica_lightweight_from_modifier.sh @@ -14,7 +14,7 @@ export REPLICAS_TO_DROP for i in $(seq $TOTAL_REPLICAS); do $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_table_$i" - $CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key" + $CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key SETTINGS old_parts_lifetime=1" done function insert_thread() { @@ -35,7 +35,7 @@ function sync_and_drop_replicas() { done for i in $(seq $REPLICAS_TO_DROP); do - $CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key" + $CLICKHOUSE_CLIENT --query "CREATE TABLE test_table_$i (key UInt64, value UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_table', '$i') ORDER BY key SETTINGS old_parts_lifetime=1" done done } @@ -87,4 +87,4 @@ for i in $(seq $TOTAL_REPLICAS); do if [ $i -gt $REPLICAS_TO_DROP ]; then $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_table_$i" fi -done \ No newline at end of file +done From d2b8afb98b39067d057ce0159d24d5879d284e44 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Thu, 14 Mar 2024 18:26:13 +0800 Subject: [PATCH 11/30] add benchmarks for column::insertmanyfrom --- src/Columns/CMakeLists.txt | 4 + src/Columns/benchmarks/CMakeLists.txt | 4 + .../benchmark_column_insert_many_from.cpp | 102 ++++++++++++++++++ 3 files changed, 110 insertions(+) create mode 100644 src/Columns/benchmarks/CMakeLists.txt create mode 100644 src/Columns/benchmarks/benchmark_column_insert_many_from.cpp diff --git a/src/Columns/CMakeLists.txt b/src/Columns/CMakeLists.txt index f676f415eea..1febe4f71d7 100644 --- a/src/Columns/CMakeLists.txt +++ b/src/Columns/CMakeLists.txt @@ -1,3 +1,7 @@ if (ENABLE_EXAMPLES) add_subdirectory (examples) endif () + +if (ENABLE_BENCHMARKS) + add_subdirectory(benchmarks) +endif() diff --git a/src/Columns/benchmarks/CMakeLists.txt b/src/Columns/benchmarks/CMakeLists.txt new file mode 100644 index 00000000000..47f5dfe4c59 --- /dev/null +++ b/src/Columns/benchmarks/CMakeLists.txt @@ -0,0 +1,4 @@ +clickhouse_add_executable(column_insert_many_from benchmark_column_insert_many_from.cpp) +target_link_libraries (column_insert_many_from PRIVATE + ch_contrib::gbenchmark_all + dbms) \ No newline at end of file diff --git a/src/Columns/benchmarks/benchmark_column_insert_many_from.cpp b/src/Columns/benchmarks/benchmark_column_insert_many_from.cpp new file mode 100644 index 00000000000..325cf5559cd --- /dev/null +++ b/src/Columns/benchmarks/benchmark_column_insert_many_from.cpp @@ -0,0 +1,102 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace DB; + +static constexpr size_t ROWS = 65536; + +static ColumnPtr mockColumn(const DataTypePtr & type, size_t rows) +{ + const auto * type_array = typeid_cast(type.get()); + if (type_array) + { + auto data_col = mockColumn(type_array->getNestedType(), rows); + auto offset_col = ColumnArray::ColumnOffsets::create(rows); + auto & offsets = offset_col->getData(); + for (size_t i = 0; i < data_col->size(); ++i) + offsets[i] = offsets[i - 1] + (rand() % 10); + auto new_data_col = data_col->replicate(offsets); + + return ColumnArray::create(new_data_col, std::move(offset_col)); + } + + auto type_not_nullable = removeNullable(type); + auto column = type->createColumn(); + for (size_t i = 0; i < rows; ++i) + { + if (i % 100) + column->insertDefault(); + else if (isInt(type_not_nullable)) + column->insert(i); + else if (isFloat(type_not_nullable)) + { + double d = i * 1.0; + column->insert(d); + } + else if (isString(type_not_nullable)) + { + String s = "helloworld"; + column->insert(s); + } + else + column->insertDefault(); + } + return std::move(column); +} + + +static NO_INLINE void insertManyFrom(IColumn & dst, const IColumn & src) +{ + size_t size = src.size(); + dst.insertManyFrom(src, size / 2, size); +} + + +template +static void BM_insertManyFrom(benchmark::State & state) +{ + auto type = DataTypeFactory::instance().get(str_type); + auto src = mockColumn(type, ROWS); + + for (auto _ : state) + { + state.PauseTiming(); + auto dst = type->createColumn(); + dst->reserve(ROWS); + state.ResumeTiming(); + + insertManyFrom(*dst, *src); + benchmark::DoNotOptimize(dst); + } +} + +static const String type_int64 = "Int64"; +static const String type_nullable_int64 = "Nullable(Int64)"; +static const String type_string = "String"; +static const String type_nullable_string = "Nullable(String)"; +static const String type_decimal = "Decimal128(3)"; +static const String type_nullable_decimal = "Nullable(Decimal128(3))"; + +static const String type_array_int64 = "Array(Int64)"; +static const String type_array_nullable_int64 = "Array(Nullable(Int64))"; +static const String type_array_string = "Array(String)"; +static const String type_array_nullable_string = "Array(Nullable(String))"; + +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_int64); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_nullable_int64); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_string); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_nullable_string); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_decimal); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_nullable_decimal); + +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_int64); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_nullable_int64); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_string); +BENCHMARK_TEMPLATE(BM_insertManyFrom, type_array_nullable_string); From 00533f3df634c3a96e78fa0732c9375f257ffb5b Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Fri, 15 Mar 2024 19:43:44 +0800 Subject: [PATCH 12/30] revert opts in column array --- src/Columns/ColumnArray.cpp | 232 +++--------------------------------- src/Columns/ColumnArray.h | 14 +-- 2 files changed, 19 insertions(+), 227 deletions(-) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 0214375122f..7b268b80116 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -31,7 +31,6 @@ namespace ErrorCodes extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; extern const int LOGICAL_ERROR; extern const int TOO_LARGE_ARRAY_SIZE; - extern const int ILLEGAL_COLUMN; } /** Obtaining array as Field can be slow for large arrays and consume vast amount of memory. @@ -43,34 +42,29 @@ namespace ErrorCodes static constexpr size_t max_array_size_as_field = 1000000; -ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column, bool check_offsets) +ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column) : data(std::move(nested_column)), offsets(std::move(offsets_column)) { - if (check_offsets) + const ColumnOffsets * offsets_concrete = typeid_cast(offsets.get()); + + if (!offsets_concrete) + throw Exception(ErrorCodes::LOGICAL_ERROR, "offsets_column must be a ColumnUInt64"); + + if (!offsets_concrete->empty() && data && !data->empty()) { - const ColumnOffsets * offsets_concrete = typeid_cast(offsets.get()); + Offset last_offset = offsets_concrete->getData().back(); - if (!offsets_concrete) - throw Exception(ErrorCodes::LOGICAL_ERROR, "offsets_column must be a ColumnUInt64"); - - if (!offsets_concrete->empty() && data && !data->empty()) - { - Offset last_offset = offsets_concrete->getData().back(); - - /// This will also prevent possible overflow in offset. - if (data->size() != last_offset) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "offsets_column has data inconsistent with nested_column. Data size: {}, last offset: {}", - data->size(), - last_offset); - } - - /** NOTE - * Arrays with constant value are possible and used in implementation of higher order functions (see FunctionReplicate). - * But in most cases, arrays with constant value are unexpected and code will work wrong. Use with caution. - */ + /// This will also prevent possible overflow in offset. + if (data->size() != last_offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "offsets_column has data inconsistent with nested_column. Data size: {}, last offset: {}", + data->size(), last_offset); } + + /** NOTE + * Arrays with constant value are possible and used in implementation of higher order functions (see FunctionReplicate). + * But in most cases, arrays with constant value are unexpected and code will work wrong. Use with caution. + */ } ColumnArray::ColumnArray(MutableColumnPtr && nested_column) @@ -353,196 +347,6 @@ void ColumnArray::insertFrom(const IColumn & src_, size_t n) getOffsets().push_back(getOffsets().back() + size); } -template -void ColumnArray::insertManyFromNumber(const ColumnArray & src, size_t position, size_t length) -{ - using ColVecType = ColumnVectorOrDecimal; - size_t src_size = src.sizeAt(position); - size_t src_offset = src.offsetAt(position); - - const typename ColVecType::Container & src_data = typeid_cast(src.getData()).getData(); - typename ColVecType::Container & data_ref = typeid_cast(getData()).getData(); - size_t old_size = data_ref.size(); - size_t new_size = old_size + src_size * length; - data_ref.resize(new_size); - for (size_t i = 0, offset = old_size; i < length; ++i, offset += src_size) - memcpy(&data_ref[offset], &src_data[src_offset], src_size * sizeof(T)); -} - -void ColumnArray::insertManyFromConst(const ColumnConst & src, size_t /*position*/, size_t length) -{ - const ColumnArray * src_array = typeid_cast(&src.getDataColumn()); - if (!src_array) - throw Exception( - ErrorCodes::ILLEGAL_COLUMN, - "Cannot insert from const column of type {} to column of type {}", - src.getDataColumn().getName(), - getName()); - - insertManyFromImpl(*src_array, 0, length, true); -} - -void ColumnArray::insertManyFromString(const ColumnArray & src, size_t position, size_t length) -{ - size_t src_size = src.sizeAt(position); - size_t src_offset = src.offsetAt(position); - - const auto & src_string = typeid_cast(src.getData()); - const auto & src_chars = src_string.getChars(); - const auto & src_string_offsets = src_string.getOffsets(); - auto & dst_string = typeid_cast(getData()); - auto & dst_chars = dst_string.getChars(); - auto & dst_string_offsets = dst_string.getOffsets(); - - /// Each row may have multiple strings, copy them to dst_chars and update dst_offsets - size_t old_size = dst_string_offsets.size(); - size_t new_size = old_size + src_size * length; - dst_string_offsets.resize(new_size); - size_t dst_string_offset = dst_chars.size(); - for (size_t i = 0; i < length; ++i) - { - for (size_t j = 0; j < src_size; ++j) - { - size_t nested_offset = src_string_offsets[src_offset + j - 1]; - size_t nested_length = src_string_offsets[src_offset + j] - nested_offset; - - dst_string_offset += nested_length; - dst_string_offsets[old_size + i * src_size + j] = dst_string_offset; - } - } - - size_t chars_to_copy = src_string_offsets[src_offset + src_size - 1] - src_string_offsets[src_offset - 1]; - dst_chars.resize(dst_chars.size() + chars_to_copy * length); - for (size_t dst_offset = old_size; dst_offset < new_size; dst_offset += src_size) - memcpy(&dst_chars[dst_string_offsets[dst_offset - 1]], &src_chars[src_string_offsets[src_offset - 1]], chars_to_copy); -} - -void ColumnArray::insertManyFromTuple(const ColumnArray & src, size_t position, size_t length) -{ - ColumnTuple & tuple = assert_cast(getData()); - const ColumnTuple & src_tuple = assert_cast(src.getData()); - - /// Make temporary arrays for each components of Tuple. In the same way as for Nullable. - size_t tuple_size = tuple.tupleSize(); - size_t src_tuple_size = src_tuple.tupleSize(); - if (tuple_size == 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty tuple"); - if (tuple_size != src_tuple_size) - throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Nested tuple size mismatch: {} vs {}", tuple_size, src_tuple_size); - - Columns tuple_columns(tuple_size); - for (size_t i = 0; i < tuple_size; ++i) - { - auto array_of_element = ColumnArray(tuple.getColumn(i).assumeMutable(), getOffsetsPtr()->assumeMutable(), false); - auto src_array_of_element = ColumnArray(src_tuple.getColumn(i).assumeMutable(), src.getOffsetsPtr()->assumeMutable()); - array_of_element.insertManyFromImpl(src_array_of_element, position, length, false); - tuple_columns[i] = array_of_element.getDataPtr(); - } - getDataPtr() = ColumnTuple::create(std::move(tuple_columns)); -} - -void ColumnArray::insertManyFromNullable(const ColumnArray & src, size_t position, size_t length) -{ - ColumnNullable & nullable = assert_cast(getData()); - const ColumnNullable & src_nullable = assert_cast(src.getData()); - - /// Process nested column without updating array offsets - auto array_of_nested = ColumnArray(nullable.getNestedColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable(), false); - auto src_array_of_nested = ColumnArray(src_nullable.getNestedColumnPtr()->assumeMutable(), src.getOffsetsPtr()->assumeMutable()); - array_of_nested.insertManyFromImpl(src_array_of_nested, position, length, false); - - /// Process null map column without updating array offsets - auto array_of_null_map = ColumnArray(nullable.getNullMapColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable(), false); - auto src_array_of_null_map = ColumnArray(src_nullable.getNullMapColumnPtr()->assumeMutable(), src.getOffsetsPtr()->assumeMutable()); - array_of_null_map.insertManyFromImpl(src_array_of_null_map, position, length, false); - - /// Update array data - getDataPtr() = ColumnNullable::create(array_of_nested.getDataPtr(), array_of_null_map.getDataPtr()); -} - -void ColumnArray::insertManyFromGeneric(const ColumnArray & src, size_t position, size_t length) -{ - size_t src_size = src.sizeAt(position); - size_t src_offset = src.offsetAt(position); - const auto & src_data = src.getData(); - size_t new_size = data->size() + src_size * length; - data->reserve(new_size); - for (size_t i = 0; i < length; ++i) - data->insertRangeFrom(src_data, src_offset, src_size); -} - -void ColumnArray::insertManyFrom(const IColumn & src_, size_t position, size_t length) -{ - const ColumnConst * src_const = typeid_cast(&src_); - if (src_const) - return insertManyFromConst(*src_const, position, length); - - const ColumnArray * src_array = typeid_cast(&src_); - if (!src_array) - throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert from column of type {} to column of type {}", src_.getName(), getName()); - - return insertManyFromImpl(*src_array, position, length, true); -} - -void ColumnArray::insertManyFromImpl(const ColumnArray & src, size_t position, size_t length, bool update_offsets) -{ - /// First fill offsets if needed - if (update_offsets) - { - size_t src_size = src.sizeAt(position); - auto & offsets_ref = getOffsets(); - size_t old_rows = offsets_ref.size(); - size_t new_rows = old_rows + length; - size_t old_size = offsets_ref.back(); - offsets_ref.resize(new_rows); - for (size_t i = 0, offset = old_size + src_size; i < length; ++i, offset += src_size) - offsets_ref[old_rows + i] = offset; - } - - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast *>(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast *>(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast *>(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast *>(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast *>(data.get())) - return insertManyFromNumber(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromString(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromNullable(src, position, length); - if (typeid_cast(data.get())) - return insertManyFromTuple(src, position, length); - return insertManyFromGeneric(src, position, length); -} void ColumnArray::insertDefault() { diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index 8c4d103e7d0..230d8830265 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -21,7 +21,7 @@ private: friend class COWHelper, ColumnArray>; /** Create an array column with specified values and offsets. */ - ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column, bool check_offsets = true); + ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column); /** Create an empty column of arrays with the type of values as in the column `nested_column` */ explicit ColumnArray(MutableColumnPtr && nested_column); @@ -88,7 +88,6 @@ public: void insert(const Field & x) override; bool tryInsert(const Field & x) override; void insertFrom(const IColumn & src_, size_t n) override; - void insertManyFrom(const IColumn & src, size_t position, size_t length) override; void insertDefault() override; void popBack(size_t n) override; ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override; @@ -214,17 +213,6 @@ private: ColumnPtr filterNullable(const Filter & filt, ssize_t result_size_hint) const; ColumnPtr filterGeneric(const Filter & filt, ssize_t result_size_hint) const; - /// Specializations for insertManyFrom - void insertManyFromConst(const ColumnConst & src, size_t position, size_t length); - void insertManyFromImpl(const ColumnArray & src, size_t position, size_t length, bool update_offsets = true); - - template - void insertManyFromNumber(const ColumnArray & src, size_t position, size_t length); - void insertManyFromString(const ColumnArray & src, size_t position, size_t length); - void insertManyFromTuple(const ColumnArray & src, size_t position, size_t length); - void insertManyFromNullable(const ColumnArray & src, size_t position, size_t length); - void insertManyFromGeneric(const ColumnArray & src, size_t position, size_t length); - int compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator=nullptr) const; }; From adaf262dc09ff08628d54d5ffd64f7ea5053482d Mon Sep 17 00:00:00 2001 From: Dan Wu Date: Sat, 16 Mar 2024 01:12:59 +0000 Subject: [PATCH 13/30] Fix bug when reading system.parts using UUID (issue 61220). During the predicates pushing down of system.parts table, there is a logic to use that predicates to filter on the storages. This makes sense if the predicate is on the `database`, `table`, `engine`. But it can be problem if the predicate is about `uuid`, because storage UUID is apparently different from parts UUID. Rename the column name from `uuid` to `storage_uuid` fixed this. --- src/Storages/System/StorageSystemPartsBase.cpp | 4 ++-- .../03010_read_system_parts_table_test.reference | 4 ++++ .../0_stateless/03010_read_system_parts_table_test.sql | 10 ++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 tests/queries/0_stateless/03010_read_system_parts_table_test.reference create mode 100644 tests/queries/0_stateless/03010_read_system_parts_table_test.sql diff --git a/src/Storages/System/StorageSystemPartsBase.cpp b/src/Storages/System/StorageSystemPartsBase.cpp index 6bdfdd357e8..1283f2d6ce0 100644 --- a/src/Storages/System/StorageSystemPartsBase.cpp +++ b/src/Storages/System/StorageSystemPartsBase.cpp @@ -188,7 +188,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared(), "table")); block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared(), "engine")); block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared(), "active")); - block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared(), "uuid")); + block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared(), "storage_uuid")); if (rows) { @@ -200,7 +200,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte database_column = block_to_filter.getByName("database").column; table_column = block_to_filter.getByName("table").column; active_column = block_to_filter.getByName("active").column; - storage_uuid_column = block_to_filter.getByName("uuid").column; + storage_uuid_column = block_to_filter.getByName("storage_uuid").column; } class ReadFromSystemPartsBase : public SourceStepWithFilter diff --git a/tests/queries/0_stateless/03010_read_system_parts_table_test.reference b/tests/queries/0_stateless/03010_read_system_parts_table_test.reference new file mode 100644 index 00000000000..6847786761e --- /dev/null +++ b/tests/queries/0_stateless/03010_read_system_parts_table_test.reference @@ -0,0 +1,4 @@ +00000000-0000-0000-0000-000000000000 1231_1_1_0 +00000000-0000-0000-0000-000000000000 6666_2_2_0 +users +users diff --git a/tests/queries/0_stateless/03010_read_system_parts_table_test.sql b/tests/queries/0_stateless/03010_read_system_parts_table_test.sql new file mode 100644 index 00000000000..6ec7e6212d5 --- /dev/null +++ b/tests/queries/0_stateless/03010_read_system_parts_table_test.sql @@ -0,0 +1,10 @@ +DROP TABLE IF EXISTS users; +CREATE TABLE users (uid Int16, name String, age Int16) ENGINE=MergeTree ORDER BY uid PARTITION BY uid; + +INSERT INTO users VALUES (1231, 'John', 33); +INSERT INTO users VALUES (6666, 'Ksenia', 48); + +SELECT uuid, name from system.parts WHERE table = 'users'; + +SELECT table from system.parts WHERE uuid = '00000000-0000-0000-0000-000000000000'; +DROP TABLE IF EXISTS users; From fd9231a886ffb37db3657695b590b43d0fbd31ef Mon Sep 17 00:00:00 2001 From: Dan Wu Date: Mon, 18 Mar 2024 07:41:50 +0800 Subject: [PATCH 14/30] Add database = currentDatabase() condition to query test 03010_read_system_parts_table_test.sql The failed test suggest to add database = currentDatabase() condition to tests that queries on system.parts. --- .../0_stateless/03010_read_system_parts_table_test.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/03010_read_system_parts_table_test.sql b/tests/queries/0_stateless/03010_read_system_parts_table_test.sql index 6ec7e6212d5..8871822af4e 100644 --- a/tests/queries/0_stateless/03010_read_system_parts_table_test.sql +++ b/tests/queries/0_stateless/03010_read_system_parts_table_test.sql @@ -4,7 +4,7 @@ CREATE TABLE users (uid Int16, name String, age Int16) ENGINE=MergeTree ORDER BY INSERT INTO users VALUES (1231, 'John', 33); INSERT INTO users VALUES (6666, 'Ksenia', 48); -SELECT uuid, name from system.parts WHERE table = 'users'; +SELECT uuid, name from system.parts WHERE database = currentDatabase() AND table = 'users'; -SELECT table from system.parts WHERE uuid = '00000000-0000-0000-0000-000000000000'; +SELECT table from system.parts WHERE database = currentDatabase() AND uuid = '00000000-0000-0000-0000-000000000000'; DROP TABLE IF EXISTS users; From 679bf92cd146f0855ba5e02bc0c343c46506b534 Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 18 Mar 2024 15:08:46 +0000 Subject: [PATCH 15/30] Split test 00159_parallel_formatting_json_and_friends to avoid timeouts --- ...l_formatting_json_and_friends_1.reference} | 12 ----------- ...parallel_formatting_json_and_friends_1.sh} | 2 +- ...el_formatting_json_and_friends_2.reference | 12 +++++++++++ ..._parallel_formatting_json_and_friends_2.sh | 20 +++++++++++++++++++ 4 files changed, 33 insertions(+), 13 deletions(-) rename tests/queries/1_stateful/{00159_parallel_formatting_json_and_friends.reference => 00159_parallel_formatting_json_and_friends_1.reference} (51%) rename tests/queries/1_stateful/{00159_parallel_formatting_json_and_friends.sh => 00159_parallel_formatting_json_and_friends_1.sh} (83%) create mode 100644 tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.reference create mode 100755 tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.sh diff --git a/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends.reference b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_1.reference similarity index 51% rename from tests/queries/1_stateful/00159_parallel_formatting_json_and_friends.reference rename to tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_1.reference index 787b57ce761..cd425198651 100644 --- a/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends.reference +++ b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_1.reference @@ -14,15 +14,3 @@ JSONCompactEachRowWithNames, false e3231b1c8187de4da6752d692b2ddba9 - JSONCompactEachRowWithNames, true e3231b1c8187de4da6752d692b2ddba9 - -JSONCompactStringsEachRowWithNames, false -e3231b1c8187de4da6752d692b2ddba9 - -JSONCompactStringsEachRowWithNames, true -e3231b1c8187de4da6752d692b2ddba9 - -JSONCompactEachRowWithNamesAndTypes, false -d40c4327c63eded184eee185a5330e12 - -JSONCompactEachRowWithNamesAndTypes, true -d40c4327c63eded184eee185a5330e12 - -JSONCompactStringsEachRowWithNamesAndTypes, false -d40c4327c63eded184eee185a5330e12 - -JSONCompactStringsEachRowWithNamesAndTypes, true -d40c4327c63eded184eee185a5330e12 - diff --git a/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends.sh b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_1.sh similarity index 83% rename from tests/queries/1_stateful/00159_parallel_formatting_json_and_friends.sh rename to tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_1.sh index 77dd330099a..0cc4636ef5d 100755 --- a/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends.sh +++ b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_1.sh @@ -6,7 +6,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . "$CURDIR"/../shell_config.sh -FORMATS=('JSONEachRow' 'JSONCompactEachRow' 'JSONCompactStringsEachRow' 'JSONCompactEachRowWithNames' 'JSONCompactStringsEachRowWithNames' 'JSONCompactEachRowWithNamesAndTypes' 'JSONCompactStringsEachRowWithNamesAndTypes') +FORMATS=('JSONEachRow' 'JSONCompactEachRow' 'JSONCompactStringsEachRow' 'JSONCompactEachRowWithNames') for format in "${FORMATS[@]}" do diff --git a/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.reference b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.reference new file mode 100644 index 00000000000..ba7a8ac5fdc --- /dev/null +++ b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.reference @@ -0,0 +1,12 @@ +JSONCompactStringsEachRowWithNames, false +e3231b1c8187de4da6752d692b2ddba9 - +JSONCompactStringsEachRowWithNames, true +e3231b1c8187de4da6752d692b2ddba9 - +JSONCompactEachRowWithNamesAndTypes, false +d40c4327c63eded184eee185a5330e12 - +JSONCompactEachRowWithNamesAndTypes, true +d40c4327c63eded184eee185a5330e12 - +JSONCompactStringsEachRowWithNamesAndTypes, false +d40c4327c63eded184eee185a5330e12 - +JSONCompactStringsEachRowWithNamesAndTypes, true +d40c4327c63eded184eee185a5330e12 - diff --git a/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.sh b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.sh new file mode 100755 index 00000000000..1c49305fe53 --- /dev/null +++ b/tests/queries/1_stateful/00159_parallel_formatting_json_and_friends_2.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +FORMATS=('JSONCompactStringsEachRowWithNames' 'JSONCompactEachRowWithNamesAndTypes' 'JSONCompactStringsEachRowWithNamesAndTypes') + +for format in "${FORMATS[@]}" +do + echo "$format, false"; + $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \ + "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum + + echo "$format, true"; + $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=true -q \ + "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum +done From 514faa851c32d26620fb3a086f3dbfe40ecd1a23 Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 18 Mar 2024 15:26:01 +0000 Subject: [PATCH 16/30] Make test 00159_parallel_formatting_tsv_and_friends.sh faster --- .../1_stateful/00159_parallel_formatting_tsv_and_friends.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.sh b/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.sh index 02441190b91..393a9fc4282 100755 --- a/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.sh +++ b/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.sh @@ -11,9 +11,9 @@ for format in "${FORMATS[@]}" do echo "$format, false"; $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \ - "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c Format $format" | md5sum + "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum echo "$format, true"; $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=true -q \ - "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c Format $format" | md5sum + "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum done From 4f30ac72a1ef01f7c783c192639933e32d2f0397 Mon Sep 17 00:00:00 2001 From: William Schoeffel Date: Mon, 18 Mar 2024 15:18:47 -0300 Subject: [PATCH 17/30] fix typo in variable name legacy --- src/Storages/MergeTree/MergeTreePartInfo.cpp | 10 +++++----- src/Storages/MergeTree/MergeTreePartInfo.h | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreePartInfo.cpp b/src/Storages/MergeTree/MergeTreePartInfo.cpp index 8c1b630e85e..f3a974a22af 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.cpp +++ b/src/Storages/MergeTree/MergeTreePartInfo.cpp @@ -131,7 +131,7 @@ std::optional MergeTreePartInfo::tryParsePartName( /// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295". /// So we replace unexpected max level to make contains(...) method and comparison operators work /// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged. - part_info.use_leagcy_max_level = true; + part_info.use_legacy_max_level = true; level = MAX_LEVEL; } @@ -205,7 +205,7 @@ String MergeTreePartInfo::getPartNameV1() const writeChar('_', wb); writeIntText(max_block, wb); writeChar('_', wb); - if (use_leagcy_max_level) + if (use_legacy_max_level) { assert(level == MAX_LEVEL); writeIntText(LEGACY_MAX_LEVEL, wb); @@ -244,7 +244,7 @@ String MergeTreePartInfo::getPartNameV0(DayNum left_date, DayNum right_date) con writeChar('_', wb); writeIntText(max_block, wb); writeChar('_', wb); - if (use_leagcy_max_level) + if (use_legacy_max_level) { assert(level == MAX_LEVEL); writeIntText(LEGACY_MAX_LEVEL, wb); @@ -274,7 +274,7 @@ void MergeTreePartInfo::serialize(WriteBuffer & out) const writeIntBinary(max_block, out); writeIntBinary(level, out); writeIntBinary(mutation, out); - writeBoolText(use_leagcy_max_level, out); + writeBoolText(use_legacy_max_level, out); } @@ -297,7 +297,7 @@ void MergeTreePartInfo::deserialize(ReadBuffer & in) readIntBinary(max_block, in); readIntBinary(level, in); readIntBinary(mutation, in); - readBoolText(use_leagcy_max_level, in); + readBoolText(use_legacy_max_level, in); } bool MergeTreePartInfo::areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector candidates) diff --git a/src/Storages/MergeTree/MergeTreePartInfo.h b/src/Storages/MergeTree/MergeTreePartInfo.h index 5fbb5d70bf3..9bb79e21144 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.h +++ b/src/Storages/MergeTree/MergeTreePartInfo.h @@ -26,7 +26,7 @@ struct MergeTreePartInfo UInt32 level = 0; Int64 mutation = 0; /// If the part has been mutated or contains mutated parts, is equal to mutation version number. - bool use_leagcy_max_level = false; /// For compatibility. TODO remove it + bool use_legacy_max_level = false; /// For compatibility. TODO remove it MergeTreePartInfo() = default; From 9af69b42668c9c4363af1865e86d04453ad5ddcc Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 18 Mar 2024 21:05:09 +0100 Subject: [PATCH 18/30] fix read_rows count with external group by --- src/Processors/ISource.h | 2 +- src/Processors/Transforms/AggregatingTransform.cpp | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Processors/ISource.h b/src/Processors/ISource.h index 8c140d0d0a3..778042c5b56 100644 --- a/src/Processors/ISource.h +++ b/src/Processors/ISource.h @@ -44,7 +44,7 @@ public: void setStorageLimits(const std::shared_ptr & storage_limits_) override; /// Default implementation for all the sources. - std::optional getReadProgress() final; + std::optional getReadProgress() override; void addTotalRowsApprox(size_t value); void addTotalBytes(size_t value); diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index 74da97f2199..ea5c525d5f2 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -80,6 +80,8 @@ namespace return convertToChunk(block); } + std::optional getReadProgress() override { return std::nullopt; } + private: TemporaryFileStream * tmp_stream; }; From 8d49ff7350304619386cb849851432a5ca434619 Mon Sep 17 00:00:00 2001 From: Dan Wu Date: Tue, 19 Mar 2024 03:12:32 +0000 Subject: [PATCH 19/30] Add tests for system.parts_columns table. --- .../03010_read_system_parts_table_test.reference | 10 ++++++++-- .../0_stateless/03010_read_system_parts_table_test.sql | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/queries/0_stateless/03010_read_system_parts_table_test.reference b/tests/queries/0_stateless/03010_read_system_parts_table_test.reference index 6847786761e..c2dd177f245 100644 --- a/tests/queries/0_stateless/03010_read_system_parts_table_test.reference +++ b/tests/queries/0_stateless/03010_read_system_parts_table_test.reference @@ -1,4 +1,10 @@ 00000000-0000-0000-0000-000000000000 1231_1_1_0 00000000-0000-0000-0000-000000000000 6666_2_2_0 -users -users +00000000-0000-0000-0000-000000000000 1231_1_1_0 users +00000000-0000-0000-0000-000000000000 6666_2_2_0 users +00000000-0000-0000-0000-000000000000 1231_1_1_0 users uid +00000000-0000-0000-0000-000000000000 1231_1_1_0 users name +00000000-0000-0000-0000-000000000000 1231_1_1_0 users age +00000000-0000-0000-0000-000000000000 6666_2_2_0 users uid +00000000-0000-0000-0000-000000000000 6666_2_2_0 users name +00000000-0000-0000-0000-000000000000 6666_2_2_0 users age diff --git a/tests/queries/0_stateless/03010_read_system_parts_table_test.sql b/tests/queries/0_stateless/03010_read_system_parts_table_test.sql index 8871822af4e..a50005b2312 100644 --- a/tests/queries/0_stateless/03010_read_system_parts_table_test.sql +++ b/tests/queries/0_stateless/03010_read_system_parts_table_test.sql @@ -6,5 +6,6 @@ INSERT INTO users VALUES (6666, 'Ksenia', 48); SELECT uuid, name from system.parts WHERE database = currentDatabase() AND table = 'users'; -SELECT table from system.parts WHERE database = currentDatabase() AND uuid = '00000000-0000-0000-0000-000000000000'; +SELECT uuid, name, table from system.parts WHERE database = currentDatabase() AND table = 'users' AND uuid = '00000000-0000-0000-0000-000000000000'; +SELECT uuid, name, table, column from system.parts_columns WHERE database = currentDatabase() AND table = 'users' AND uuid = '00000000-0000-0000-0000-000000000000'; DROP TABLE IF EXISTS users; From eb75926e50d03ddba7327c3377b11ead3e221c80 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 21 Feb 2024 14:33:01 +0100 Subject: [PATCH 20/30] Parallel flush of pending INSERT blocks of Distributed engine Parallelism will work only if you have multi disk policy for table (like everything in Distributed engine right now). This will work for DETACH/server shutdown and SYSTEM FLUSH DISTRIBUTED Signed-off-by: Azat Khuzhin --- src/Storages/StorageDistributed.cpp | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 3b766ac8d26..6554af1f7e6 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1734,16 +1734,27 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context) directory_queues.push_back(node.second.directory_queue); } - bool need_flush = getDistributedSettingsRef().flush_on_detach; - if (!need_flush) + if (getDistributedSettingsRef().flush_on_detach) + { + LOG_INFO(log, "Flushing pending INSERT blocks"); + + Stopwatch watch; + ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, directory_queues.size()); + for (const auto & node : directory_queues) + { + pool.scheduleOrThrowOnError([node_to_flush = node]() + { + node_to_flush->flushAllData(); + }); + } + pool.wait(); + LOG_INFO(log, "Pending INSERT blocks flushed, took {} ms.", watch.elapsedMilliseconds()); + } + else + { LOG_INFO(log, "Skip flushing data (due to flush_on_detach=0)"); - /// TODO: Maybe it should be executed in parallel - for (auto & node : directory_queues) - { - if (need_flush) - node->flushAllData(); - else + for (auto & node : directory_queues) node->shutdownWithoutFlush(); } } From 929dc6fa12ced13309fea73229a0c5213cdc0f30 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 22 Feb 2024 15:41:19 +0100 Subject: [PATCH 21/30] Attach to query for threads in Distributed engine background ops Signed-off-by: Azat Khuzhin --- src/Storages/StorageDistributed.cpp | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 6554af1f7e6..e25e3425359 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1287,10 +1287,19 @@ void StorageDistributed::initializeFromDisk() /// Make initialization for large number of disks parallel. ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, disks.size()); + ThreadGroupPtr thread_group = CurrentThread::getGroup(); for (const DiskPtr & disk : disks) { pool.scheduleOrThrowOnError([&]() { + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); + if (thread_group) + CurrentThread::attachToGroup(thread_group); + setThreadName("DistInit"); + initializeDirectoryQueuesForDisk(disk); }); } @@ -1302,6 +1311,14 @@ void StorageDistributed::initializeFromDisk() { pool.scheduleOrThrowOnError([&, i]() { + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); + if (thread_group) + CurrentThread::attachToGroup(thread_group); + setThreadName("DistInit"); + last_increment[i] = getMaximumFileNumber(paths[i]); }); } @@ -1739,11 +1756,20 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context) LOG_INFO(log, "Flushing pending INSERT blocks"); Stopwatch watch; + ThreadGroupPtr thread_group = CurrentThread::getGroup(); ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, directory_queues.size()); for (const auto & node : directory_queues) { - pool.scheduleOrThrowOnError([node_to_flush = node]() + pool.scheduleOrThrowOnError([node_to_flush = node, &thread_group]() { + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); + if (thread_group) + CurrentThread::attachToGroup(thread_group); + setThreadName("DistFlush"); + node_to_flush->flushAllData(); }); } From 8b1df937a567cec4acc6702288d979d4489bb83f Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 19 Mar 2024 07:09:28 -0400 Subject: [PATCH 22/30] Use scheduleFromThreadPool for parallel processing in distributed storage Signed-off-by: Azat Khuzhin --- src/Storages/StorageDistributed.cpp | 63 ++++++++++++++--------------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index e25e3425359..7819aa7683a 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -23,6 +23,7 @@ #include +#include #include #include #include @@ -282,6 +283,17 @@ size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & clus return (num_remote_shards + num_local_shards) * settings.max_parallel_replicas; } +template +void waitFutures(F & futures) +{ + for (auto & future : futures) + future.wait(); + /// Make sure there is no exception. + for (auto & future : futures) + future.get(); + futures.clear(); +} + } /// For destruction of std::unique_ptr of type that is incomplete in class definition. @@ -1286,42 +1298,30 @@ void StorageDistributed::initializeFromDisk() /// Make initialization for large number of disks parallel. ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, disks.size()); + std::vector> futures; - ThreadGroupPtr thread_group = CurrentThread::getGroup(); for (const DiskPtr & disk : disks) { - pool.scheduleOrThrowOnError([&]() + auto future = scheduleFromThreadPool([this, disk_to_init = disk] { - SCOPE_EXIT_SAFE( - if (thread_group) - CurrentThread::detachFromGroupIfNotDetached(); - ); - if (thread_group) - CurrentThread::attachToGroup(thread_group); - setThreadName("DistInit"); - - initializeDirectoryQueuesForDisk(disk); - }); + initializeDirectoryQueuesForDisk(disk_to_init); + }, pool, "DistInit"); + futures.push_back(std::move(future)); } + waitFutures(futures); pool.wait(); const auto & paths = getDataPaths(); std::vector last_increment(paths.size()); for (size_t i = 0; i < paths.size(); ++i) { - pool.scheduleOrThrowOnError([&, i]() + auto future = scheduleFromThreadPool([&paths, &last_increment, i] { - SCOPE_EXIT_SAFE( - if (thread_group) - CurrentThread::detachFromGroupIfNotDetached(); - ); - if (thread_group) - CurrentThread::attachToGroup(thread_group); - setThreadName("DistInit"); - last_increment[i] = getMaximumFileNumber(paths[i]); - }); + }, pool, "DistInit"); + futures.push_back(std::move(future)); } + waitFutures(futures); pool.wait(); for (const auto inc : last_increment) @@ -1756,24 +1756,21 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context) LOG_INFO(log, "Flushing pending INSERT blocks"); Stopwatch watch; - ThreadGroupPtr thread_group = CurrentThread::getGroup(); ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, directory_queues.size()); + std::vector> futures; + for (const auto & node : directory_queues) { - pool.scheduleOrThrowOnError([node_to_flush = node, &thread_group]() + auto future = scheduleFromThreadPool([node_to_flush = node] { - SCOPE_EXIT_SAFE( - if (thread_group) - CurrentThread::detachFromGroupIfNotDetached(); - ); - if (thread_group) - CurrentThread::attachToGroup(thread_group); - setThreadName("DistFlush"); - node_to_flush->flushAllData(); - }); + }, pool, "DistFlush"); + futures.push_back(std::move(future)); } + + waitFutures(futures); pool.wait(); + LOG_INFO(log, "Pending INSERT blocks flushed, took {} ms.", watch.elapsedMilliseconds()); } else From 06d28eebe598e975fa14595d1ad1a5a1af3e1999 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Wed, 20 Mar 2024 13:58:21 +0100 Subject: [PATCH 23/30] Fix test reference --- ...159_parallel_formatting_tsv_and_friends.reference | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.reference b/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.reference index 91e3af03db8..1131cbb670d 100644 --- a/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.reference +++ b/tests/queries/1_stateful/00159_parallel_formatting_tsv_and_friends.reference @@ -1,12 +1,12 @@ TSV, false -194d5061de4cae59489d989373f8effe - +2cc7bfde1a2855814c6ea2c8181679c8 - TSV, true -194d5061de4cae59489d989373f8effe - +2cc7bfde1a2855814c6ea2c8181679c8 - TSVWithNames, false -a6d327a3611288b3f973d00e6116f16e - +c4cb6f9c0d77cd76f2584279993b4438 - TSVWithNames, true -a6d327a3611288b3f973d00e6116f16e - +c4cb6f9c0d77cd76f2584279993b4438 - TSKV, false -c2e32a21c08aacf60bda21248ce4f73f - +fd9ccbc364c90e1f7682348fe7f11a5a - TSKV, true -c2e32a21c08aacf60bda21248ce4f73f - +fd9ccbc364c90e1f7682348fe7f11a5a - From e34c4618a52b37665c04a3f3b4192add46976cf6 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 20 Mar 2024 15:56:54 +0100 Subject: [PATCH 24/30] Fix data race during snapshot destructor call --- src/Coordination/KeeperDispatcher.cpp | 8 +- src/Coordination/KeeperSnapshotManager.h | 3 +- src/Coordination/KeeperStateMachine.cpp | 87 ++++++++++--------- src/Coordination/tests/gtest_coordination.cpp | 2 +- 4 files changed, 50 insertions(+), 50 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index cd642087130..84e1632e9b1 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -325,12 +325,12 @@ void KeeperDispatcher::snapshotThread() if (!snapshots_queue.pop(task)) break; - if (shutdown_called) - break; - try { - auto snapshot_file_info = task.create_snapshot(std::move(task.snapshot)); + auto snapshot_file_info = task.create_snapshot(std::move(task.snapshot), /*execute_only_cleanup=*/shutdown_called); + + if (shutdown_called) + break; if (snapshot_file_info.path.empty()) continue; diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h index ad3bcee028a..8ba0f92a564 100644 --- a/src/Coordination/KeeperSnapshotManager.h +++ b/src/Coordination/KeeperSnapshotManager.h @@ -98,8 +98,7 @@ struct SnapshotFileInfo }; using KeeperStorageSnapshotPtr = std::shared_ptr; -using CreateSnapshotCallback = std::function; - +using CreateSnapshotCallback = std::function; using SnapshotMetaAndStorage = std::pair; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 0c398a0d549..3dbdb329b93 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -564,63 +564,65 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res } /// create snapshot task for background execution (in snapshot thread) - snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot) + snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot, bool execute_only_cleanup) { nuraft::ptr exception(nullptr); bool ret = true; - try + if (!execute_only_cleanup) { - { /// Read storage data without locks and create snapshot - std::lock_guard lock(snapshots_lock); + try + { + { /// Read storage data without locks and create snapshot + std::lock_guard lock(snapshots_lock); - if (latest_snapshot_meta && snapshot->snapshot_meta->get_last_log_idx() <= latest_snapshot_meta->get_last_log_idx()) - { - LOG_INFO( - log, - "Will not create a snapshot with last log idx {} because a snapshot with bigger last log idx ({}) is already " - "created", - snapshot->snapshot_meta->get_last_log_idx(), - latest_snapshot_meta->get_last_log_idx()); - } - else - { - latest_snapshot_meta = snapshot->snapshot_meta; - /// we rely on the fact that the snapshot disk cannot be changed during runtime - if (isLocalDisk(*keeper_context->getLatestSnapshotDisk())) + if (latest_snapshot_meta && snapshot->snapshot_meta->get_last_log_idx() <= latest_snapshot_meta->get_last_log_idx()) { - auto snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot); - latest_snapshot_info = std::move(snapshot_info); - latest_snapshot_buf = nullptr; + LOG_INFO( + log, + "Will not create a snapshot with last log idx {} because a snapshot with bigger last log idx ({}) is already " + "created", + snapshot->snapshot_meta->get_last_log_idx(), + latest_snapshot_meta->get_last_log_idx()); } else { - auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot); - auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx()); - latest_snapshot_info = std::move(snapshot_info); - latest_snapshot_buf = std::move(snapshot_buf); - } + latest_snapshot_meta = snapshot->snapshot_meta; + /// we rely on the fact that the snapshot disk cannot be changed during runtime + if (isLocalDisk(*keeper_context->getLatestSnapshotDisk())) + { + auto snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot); + latest_snapshot_info = std::move(snapshot_info); + latest_snapshot_buf = nullptr; + } + else + { + auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot); + auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx()); + latest_snapshot_info = std::move(snapshot_info); + latest_snapshot_buf = std::move(snapshot_buf); + } - ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations); - LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), latest_snapshot_info.path); + ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations); + LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), latest_snapshot_info.path); + } } } - + catch (...) { - /// Destroy snapshot with lock - std::lock_guard lock(storage_and_responses_lock); - LOG_TRACE(log, "Clearing garbage after snapshot"); - /// Turn off "snapshot mode" and clear outdate part of storage state - storage->clearGarbageAfterSnapshot(); - LOG_TRACE(log, "Cleared garbage after snapshot"); - snapshot.reset(); + ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed); + LOG_TRACE(log, "Exception happened during snapshot"); + tryLogCurrentException(log); + ret = false; } } - catch (...) { - ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed); - LOG_TRACE(log, "Exception happened during snapshot"); - tryLogCurrentException(log); - ret = false; + /// Destroy snapshot with lock + std::lock_guard lock(storage_and_responses_lock); + LOG_TRACE(log, "Clearing garbage after snapshot"); + /// Turn off "snapshot mode" and clear outdate part of storage state + storage->clearGarbageAfterSnapshot(); + LOG_TRACE(log, "Cleared garbage after snapshot"); + snapshot.reset(); } when_done(ret, exception); @@ -628,11 +630,10 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res return ret ? latest_snapshot_info : SnapshotFileInfo{}; }; - if (keeper_context->getServerState() == KeeperContext::Phase::SHUTDOWN) { LOG_INFO(log, "Creating a snapshot during shutdown because 'create_snapshot_on_exit' is enabled."); - auto snapshot_file_info = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot)); + auto snapshot_file_info = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot), /*execute_only_cleanup=*/false); if (!snapshot_file_info.path.empty() && snapshot_manager_s3) { diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 07dfac0670e..d314757efc9 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1818,7 +1818,7 @@ void testLogAndStateMachine( bool pop_result = snapshots_queue.pop(snapshot_task); EXPECT_TRUE(pop_result); - snapshot_task.create_snapshot(std::move(snapshot_task.snapshot)); + snapshot_task.create_snapshot(std::move(snapshot_task.snapshot), /*execute_only_cleanup=*/false); } if (snapshot_created && changelog.size() > settings->reserved_log_items) From f44127ce19f3723eac91fcc795d0adf0adb0b427 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Wed, 20 Mar 2024 19:13:51 +0100 Subject: [PATCH 25/30] fix --- src/Databases/DatabaseReplicatedWorker.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 0a6e8f9345e..51065062995 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -77,10 +77,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication() zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id); if (active_node_holder) active_node_holder->setAlreadyRemoved(); - zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral); active_node_holder.reset(); - active_node_holder_zookeeper = zookeeper; - active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper); String log_ptr_str = zookeeper->get(database->replica_path + "/log_ptr"); UInt32 our_log_ptr = parse(log_ptr_str); @@ -129,9 +126,15 @@ void DatabaseReplicatedDDLWorker::initializeReplication() initializeLogPointer(log_entry_name); } - std::lock_guard lock{database->metadata_mutex}; - if (!database->checkDigestValid(context, false)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent database metadata after reconnection to ZooKeeper"); + { + std::lock_guard lock{database->metadata_mutex}; + if (!database->checkDigestValid(context, false)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent database metadata after reconnection to ZooKeeper"); + } + + zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral); + active_node_holder_zookeeper = zookeeper; + active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper); } String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry) From 93dd212f6f9602e690ec3f378691ae57b29d1112 Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Wed, 20 Mar 2024 23:55:25 +0000 Subject: [PATCH 26/30] Fix use-of-uninitialized-value in HedgedConnections --- src/Client/HedgedConnections.cpp | 1 - src/Client/HedgedConnections.h | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index 9fa79dd6b77..fb4d9a6bdcc 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -67,7 +67,6 @@ HedgedConnections::HedgedConnections( } active_connection_count = connections.size(); - offsets_with_disabled_changing_replica = 0; pipeline_for_new_replicas.add([throttler_](ReplicaState & replica_) { replica_.connection->setThrottler(throttler_); }); } diff --git a/src/Client/HedgedConnections.h b/src/Client/HedgedConnections.h index 5bc274332db..7f538804e5a 100644 --- a/src/Client/HedgedConnections.h +++ b/src/Client/HedgedConnections.h @@ -178,12 +178,12 @@ private: std::queue offsets_queue; /// The current number of valid connections to the replicas of this shard. - size_t active_connection_count; + size_t active_connection_count = 0; /// We count offsets in which we can't change replica anymore, /// it's needed to cancel choosing new replicas when we /// disabled replica changing in all offsets. - size_t offsets_with_disabled_changing_replica; + size_t offsets_with_disabled_changing_replica = 0; Pipeline pipeline_for_new_replicas; From f162ea83417ab93a6883ad5f3ac9daa80e013dcd Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Thu, 21 Mar 2024 09:51:10 +0000 Subject: [PATCH 27/30] Fix use-of-uninitialized-value in parseDateTimeBestEffort --- src/IO/parseDateTimeBestEffort.cpp | 3 +++ tests/queries/0_stateless/03014_msan_parse_date_time.reference | 0 tests/queries/0_stateless/03014_msan_parse_date_time.sql | 1 + 3 files changed, 4 insertions(+) create mode 100644 tests/queries/0_stateless/03014_msan_parse_date_time.reference create mode 100644 tests/queries/0_stateless/03014_msan_parse_date_time.sql diff --git a/src/IO/parseDateTimeBestEffort.cpp b/src/IO/parseDateTimeBestEffort.cpp index caf51d94bb3..83928b32f2f 100644 --- a/src/IO/parseDateTimeBestEffort.cpp +++ b/src/IO/parseDateTimeBestEffort.cpp @@ -147,6 +147,9 @@ ReturnType parseDateTimeBestEffortImpl( { has_comma_between_date_and_time = true; ++in.position(); + + if (in.eof()) + break; } } diff --git a/tests/queries/0_stateless/03014_msan_parse_date_time.reference b/tests/queries/0_stateless/03014_msan_parse_date_time.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03014_msan_parse_date_time.sql b/tests/queries/0_stateless/03014_msan_parse_date_time.sql new file mode 100644 index 00000000000..d6daea69cfa --- /dev/null +++ b/tests/queries/0_stateless/03014_msan_parse_date_time.sql @@ -0,0 +1 @@ +SELECT parseDateTimeBestEffort(toFixedString('01/12/2017,', 11)); -- { serverError CANNOT_PARSE_DATETIME } From 77e947c44f343d3aab32340c4fda4545fb794a57 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Thu, 21 Mar 2024 14:09:01 +0100 Subject: [PATCH 28/30] Avoid `IsADirectoryError: Is a directory contrib/azure` --- tests/ci/style_check.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/tests/ci/style_check.py b/tests/ci/style_check.py index 797c0ef12d0..2604793c900 100644 --- a/tests/ci/style_check.py +++ b/tests/ci/style_check.py @@ -100,20 +100,28 @@ def is_python(file: Union[Path, str]) -> bool: """returns if the changed file in the repository is python script""" # WARNING: python-magic v2:0.4.24-2 is used in ubuntu 22.04, # and `Support os.PathLike values in magic.from_file` is only from 0.4.25 - return bool( - magic.from_file(os.path.join(REPO_COPY, file), mime=True) - == "text/x-script.python" - ) + try: + return bool( + magic.from_file(os.path.join(REPO_COPY, file), mime=True) + == "text/x-script.python" + ) + except IsADirectoryError: + # Process submodules w/o errors + return False def is_shell(file: Union[Path, str]) -> bool: """returns if the changed file in the repository is shell script""" # WARNING: python-magic v2:0.4.24-2 is used in ubuntu 22.04, # and `Support os.PathLike values in magic.from_file` is only from 0.4.25 - return bool( - magic.from_file(os.path.join(REPO_COPY, file), mime=True) - == "text/x-shellscript" - ) + try: + return bool( + magic.from_file(os.path.join(REPO_COPY, file), mime=True) + == "text/x-shellscript" + ) + except IsADirectoryError: + # Process submodules w/o errors + return False def main(): From 15a61a998a176314debe152b5d59be4ff3f5797a Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Thu, 21 Mar 2024 14:34:31 +0100 Subject: [PATCH 29/30] Fix logic for run_cpp_check, it had a bug --- tests/ci/style_check.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/ci/style_check.py b/tests/ci/style_check.py index 2604793c900..d7f6fa998e9 100644 --- a/tests/ci/style_check.py +++ b/tests/ci/style_check.py @@ -143,8 +143,8 @@ def main(): run_python_check = True if CI and pr_info.number > 0: pr_info.fetch_changed_files() - run_cpp_check = not any( - is_python(file) or is_shell(file) for file in pr_info.changed_files + run_cpp_check = any( + not (is_python(file) or is_shell(file)) for file in pr_info.changed_files ) run_shell_check = any(is_shell(file) for file in pr_info.changed_files) run_python_check = any(is_python(file) for file in pr_info.changed_files) From 48cb228c9a88b1f77c9ec10ce04107edec3e190c Mon Sep 17 00:00:00 2001 From: Shuai li Date: Thu, 21 Mar 2024 21:47:29 +0800 Subject: [PATCH 30/30] Fix addDays cause an error when used datetime64 (#61561) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix addDays cause an error when used datetime64 * add exception describe * Update tests/queries/0_stateless/03013_addDays_with_timezone.sql * remove file be executable * fix timezone * fix ci --------- Co-authored-by: János Benjamin Antal --- .../FunctionDateOrDateTimeAddInterval.h | 4 +-- ...21_datetime64_compatibility_long.reference | 32 +++++++++---------- .../03013_addDays_with_timezone.reference | 1 + .../03013_addDays_with_timezone.sql | 1 + 4 files changed, 20 insertions(+), 18 deletions(-) create mode 100644 tests/queries/0_stateless/03013_addDays_with_timezone.reference create mode 100644 tests/queries/0_stateless/03013_addDays_with_timezone.sql diff --git a/src/Functions/FunctionDateOrDateTimeAddInterval.h b/src/Functions/FunctionDateOrDateTimeAddInterval.h index b8c0d27c42e..f50b1415622 100644 --- a/src/Functions/FunctionDateOrDateTimeAddInterval.h +++ b/src/Functions/FunctionDateOrDateTimeAddInterval.h @@ -621,9 +621,9 @@ public: } else { - if (!WhichDataType(arguments[0].type).isDateTime()) + if (!WhichDataType(arguments[0].type).isDateTimeOrDateTime64()) throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of first argument of function {}. " - "Must be a DateTime", arguments[0].type->getName(), getName()); + "Must be a DateTime/DateTime64", arguments[0].type->getName(), getName()); if (!WhichDataType(arguments[2].type).isString()) throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of third argument of function {}. " diff --git a/tests/queries/0_stateless/00921_datetime64_compatibility_long.reference b/tests/queries/0_stateless/00921_datetime64_compatibility_long.reference index a946a114bf4..74b7b207661 100644 --- a/tests/queries/0_stateless/00921_datetime64_compatibility_long.reference +++ b/tests/queries/0_stateless/00921_datetime64_compatibility_long.reference @@ -241,82 +241,82 @@ SELECT toYYYYMMDDhhmmss(N, \'Asia/Istanbul\') SELECT addYears(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2020-09-16 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2020-09-16 19:20:11.234" ------------------------------------------ SELECT addMonths(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-10-16 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-10-16 19:20:11.234" ------------------------------------------ SELECT addWeeks(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-23 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-23 19:20:11.234" ------------------------------------------ SELECT addDays(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-17 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-17 19:20:11.234" ------------------------------------------ SELECT addHours(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-16 20:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-16 20:20:11.234" ------------------------------------------ SELECT addMinutes(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-16 19:21:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:21:11.234" ------------------------------------------ SELECT addSeconds(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-16 19:20:12" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:20:12.234" ------------------------------------------ SELECT addQuarters(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-12-16 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-12-16 19:20:11.234" ------------------------------------------ SELECT subtractYears(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2018-09-16 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2018-09-16 19:20:11.234" ------------------------------------------ SELECT subtractMonths(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-08-16 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-08-16 19:20:11.234" ------------------------------------------ SELECT subtractWeeks(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-09 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-09 19:20:11.234" ------------------------------------------ SELECT subtractDays(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-15 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-15 19:20:11.234" ------------------------------------------ SELECT subtractHours(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-16 18:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-16 18:20:11.234" ------------------------------------------ SELECT subtractMinutes(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-16 19:19:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:19:11.234" ------------------------------------------ SELECT subtractSeconds(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-09-16 19:20:10" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-09-16 19:20:10.234" ------------------------------------------ SELECT subtractQuarters(N, 1, \'Asia/Istanbul\') Code: 43 "DateTime('Asia/Istanbul')","2019-06-16 19:20:11" -Code: 43 +"DateTime64(3, 'Asia/Istanbul')","2019-06-16 19:20:11.234" ------------------------------------------ SELECT CAST(N as DateTime(\'Europe/Minsk\')) "DateTime('Europe/Minsk')","2019-09-16 00:00:00" diff --git a/tests/queries/0_stateless/03013_addDays_with_timezone.reference b/tests/queries/0_stateless/03013_addDays_with_timezone.reference new file mode 100644 index 00000000000..6a8fa4f8a2c --- /dev/null +++ b/tests/queries/0_stateless/03013_addDays_with_timezone.reference @@ -0,0 +1 @@ +2024-01-11 00:00:00.000000 diff --git a/tests/queries/0_stateless/03013_addDays_with_timezone.sql b/tests/queries/0_stateless/03013_addDays_with_timezone.sql new file mode 100644 index 00000000000..eb822d53898 --- /dev/null +++ b/tests/queries/0_stateless/03013_addDays_with_timezone.sql @@ -0,0 +1 @@ +select addDays(toDateTime64('2024-01-01', 6, 'Asia/Shanghai'), 10, 'Asia/Shanghai');