diff --git a/contrib/NuRaft b/contrib/NuRaft index e15858f8ad0..1be805e7cb2 160000 --- a/contrib/NuRaft +++ b/contrib/NuRaft @@ -1 +1 @@ -Subproject commit e15858f8ad0ce8aba85cf74e3763874c76bf927c +Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 3869168becd..dde40acb91a 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3145,6 +3145,17 @@ Result: └─────┴─────┴───────┘ ``` +## enable_extended_results_for_datetime_functions {#enable-extended-results-for-datetime-functions} + +Enables or disables returning results of type `Date32` with extended range (compared to type `Date`) for functions [toStartOfYear](../../sql-reference/functions/date-time-functions.md#tostartofyear), [toStartOfISOYear](../../sql-reference/functions/date-time-functions.md#tostartofisoyear), [toStartOfQuarter](../../sql-reference/functions/date-time-functions.md#tostartofquarter), [toStartOfMonth](../../sql-reference/functions/date-time-functions.md#tostartofmonth), [toStartOfWeek](../../sql-reference/functions/date-time-functions.md#tostartofweek), [toMonday](../../sql-reference/functions/date-time-functions.md#tomonday) and [toLastDayOfMonth](../../sql-reference/functions/date-time-functions.md#tolastdayofmonth). + +Possible values: + +- 0 — Functions return `Date` for all types of arguments. +- 1 — Functions return `Date32` for `Date32` or `DateTime64` arguments and `Date` otherwise. + +Default value: `0`. + ## optimize_move_to_prewhere {#optimize_move_to_prewhere} Enables or disables automatic [PREWHERE](../../sql-reference/statements/select/prewhere.md) optimization in [SELECT](../../sql-reference/statements/select/index.md) queries. @@ -3530,8 +3541,8 @@ desc format(JSONEachRow, '{"x" : 1, "y" : "String", "z" : "0.0.0.0" }') settings Result: ```sql -x UInt8 -y Nullable(String) +x UInt8 +y Nullable(String) z IPv4 ``` diff --git a/docs/en/sql-reference/functions/date-time-functions.md b/docs/en/sql-reference/functions/date-time-functions.md index 8688f3eb3a0..001c7822433 100644 --- a/docs/en/sql-reference/functions/date-time-functions.md +++ b/docs/en/sql-reference/functions/date-time-functions.md @@ -268,13 +268,15 @@ Result: ``` :::note -The return type of `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is `Date` or `DateTime`. -Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results. -In case argument is out of normal range: +The return type of `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is determined by the configuration parameter [enable_extended_results_for_datetime_functions](../../operations/settings/settings#enable-extended-results-for-datetime-functions) which is `0` by default. + +Behavior for +* `enable_extended_results_for_datetime_functions = 0`: Functions `toStartOf*`, `toLastDayOfMonth`, `toMonday` return `Date` or `DateTime`. Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results. In case argument is out of normal range: * If the argument is smaller than 1970, the result will be calculated from the argument `1970-01-01 (00:00:00)` instead. * If the return type is `DateTime` and the argument is larger than `2106-02-07 08:28:15`, the result will be calculated from the argument `2106-02-07 08:28:15` instead. * If the return type is `Date` and the argument is larger than `2149-06-06`, the result will be calculated from the argument `2149-06-06` instead. * If `toLastDayOfMonth` is called with an argument greater then `2149-05-31`, the result will be calculated from the argument `2149-05-31` instead. +* `enable_extended_results_for_datetime_functions = 1`: Functions `toStartOfYear`, `toStartOfISOYear`, `toStartOfQuarter`, `toStartOfMonth`, `toStartOfWeek`, `toLastDayOfMonth`, `toMonday` return `Date` or `DateTime` if their argument is a `Date` or `DateTime`, and they return `Date32` or `DateTime64` if their argument is a `Date32` or `DateTime64`. ::: ## toStartOfYear @@ -303,6 +305,8 @@ Returns the date. Rounds up a date or date with time to the last day of the month. Returns the date. +If `toLastDayOfMonth` is called with an argument of type `Date` greater then 2149-05-31, the result will be calculated from the argument 2149-05-31 instead. + ## toMonday Rounds down a date or date with time to the nearest Monday. @@ -640,7 +644,7 @@ Result: ## date\_diff -Returns the difference between two dates or dates with time values. +Returns the difference between two dates or dates with time values. The difference is calculated using relative units, e.g. the difference between `2022-01-01` and `2021-12-29` is 3 days for day unit (see [toRelativeDayNum](#torelativedaynum)), 1 month for month unit (see [toRelativeMonthNum](#torelativemonthnum)), 1 year for year unit (see [toRelativeYearNum](#torelativeyearnum)). **Syntax** @@ -1059,9 +1063,9 @@ SELECT ## timeSlots(StartTime, Duration,\[, Size\]) -For a time interval starting at ‘StartTime’ and continuing for ‘Duration’ seconds, it returns an array of moments in time, consisting of points from this interval rounded down to the ‘Size’ in seconds. ‘Size’ is an optional parameter set to 1800 (30 minutes) by default. -This is necessary, for example, when searching for pageviews in the corresponding session. -Accepts DateTime and DateTime64 as ’StartTime’ argument. For DateTime, ’Duration’ and ’Size’ arguments must be `UInt32`. For ’DateTime64’ they must be `Decimal64`. +For a time interval starting at ‘StartTime’ and continuing for ‘Duration’ seconds, it returns an array of moments in time, consisting of points from this interval rounded down to the ‘Size’ in seconds. ‘Size’ is an optional parameter set to 1800 (30 minutes) by default. +This is necessary, for example, when searching for pageviews in the corresponding session. +Accepts DateTime and DateTime64 as ’StartTime’ argument. For DateTime, ’Duration’ and ’Size’ arguments must be `UInt32`. For ’DateTime64’ they must be `Decimal64`. Returns an array of DateTime/DateTime64 (return type matches the type of ’StartTime’). For DateTime64, the return value's scale can differ from the scale of ’StartTime’ --- the highest scale among all given arguments is taken. Example: diff --git a/docs/ru/operations/settings/settings.md b/docs/ru/operations/settings/settings.md index 5ddc684ce2a..0d4f0c63210 100644 --- a/docs/ru/operations/settings/settings.md +++ b/docs/ru/operations/settings/settings.md @@ -3799,6 +3799,17 @@ Exception: Total regexp lengths too large. Значение по умолчанию: `1`. +## enable_extended_results_for_datetime_functions {#enable-extended-results-for-datetime-functions} + +Включает или отключает возвращение результатов типа `Date32` с расширенным диапазоном (по сравнению с типом `Date`) для функций [toStartOfYear](../../sql-reference/functions/date-time-functions.md#tostartofyear), [toStartOfISOYear](../../sql-reference/functions/date-time-functions.md#tostartofisoyear), [toStartOfQuarter](../../sql-reference/functions/date-time-functions.md#tostartofquarter), [toStartOfMonth](../../sql-reference/functions/date-time-functions.md#tostartofmonth), [toStartOfWeek](../../sql-reference/functions/date-time-functions.md#tostartofweek), [toMonday](../../sql-reference/functions/date-time-functions.md#tomonday) и [toLastDayOfMonth](../../sql-reference/functions/date-time-functions.md#tolastdayofmonth). + +Возможные значения: + +- 0 — Функции возвращают результаты типа `Date` для всех типов аргументов. +- 1 — Функции возвращают результаты типа `Date32` для аргументов типа `Date32` или `DateTime64` и возвращают `Date` в других случаях. + +Значение по умолчанию: `0`. + **Пример** Запрос: diff --git a/docs/ru/sql-reference/functions/date-time-functions.md b/docs/ru/sql-reference/functions/date-time-functions.md index 1c623cd1dab..27689426cbe 100644 --- a/docs/ru/sql-reference/functions/date-time-functions.md +++ b/docs/ru/sql-reference/functions/date-time-functions.md @@ -268,24 +268,18 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp; ``` :::note -Тип возвращаемого описанными далее функциями `toStartOf*`, `toMonday` значения - `Date` или `DateTime`. -Хотя эти функции могут принимать значения типа `Date32` или `DateTime64` в качестве аргумента, при обработке аргумента вне нормального диапазона значений (`1970` - `2148` для `Date` и `1970-01-01 00:00:00`-`2106-02-07 08:28:15` для `DateTime`) будет получен некорректный результат. -Возвращаемые значения для значений вне нормального диапазона: -* `1970-01-01 (00:00:00)` будет возвращён для моментов времени до 1970 года, -* `2106-02-07 08:28:15` будет взят в качестве аргумента, если полученный аргумент превосходит данное значение и возвращаемый тип - `DateTime`, -* `2149-06-06` будет взят в качестве аргумента, если полученный аргумент превосходит данное значение и возвращаемый тип - `Date`, -* `2149-05-31` будет результатом функции `toLastDayOfMonth` при обработке аргумента больше `2149-05-31`. +Тип возвращаемого значения описанными далее функциями `toStartOf*`, `toLastDayOfMonth`, `toMonday` определяется конфигурационным параметром [enable_extended_results_for_datetime_functions](../../operations/settings/settings#enable-extended-results-for-datetime-functions) имеющим по умолчанию значение `0`. + +Поведение для +* `enable_extended_results_for_datetime_functions = 0`: Функции `toStartOf*`, `toLastDayOfMonth`, `toMonday` возвращают `Date` или `DateTime`. Хотя эти функции могут принимать значения типа `Date32` или `DateTime64` в качестве аргумента, при обработке аргумента вне нормального диапазона значений (`1970` - `2148` для `Date` и `1970-01-01 00:00:00`-`2106-02-07 08:28:15` для `DateTime`) будет получен некорректный результат. +В случае если значение аргумента вне нормального диапазона: + * `1970-01-01 (00:00:00)` будет возвращён для моментов времени до 1970 года, + * `2106-02-07 08:28:15` будет взят в качестве аргумента, если полученный аргумент превосходит данное значение и возвращаемый тип - `DateTime`, + * `2149-06-06` будет взят в качестве аргумента, если полученный аргумент превосходит данное значение и возвращаемый тип - `Date`, + * `2149-05-31` будет результатом функции `toLastDayOfMonth` при обработке аргумента больше `2149-05-31`. +* `enable_extended_results_for_datetime_functions = 1`: Функции `toStartOfYear`, `toStartOfISOYear`, `toStartOfQuarter`, `toStartOfMonth`, `toStartOfWeek`, `toLastDayOfMonth`, `toMonday` возвращают `Date` или `DateTime` если их аргумент `Date` или `DateTime` и они возвращают `Date32` или `DateTime64` если их аргумент `Date32` или `DateTime64`. ::: -:::note -Тип возвращаемого описанными далее функциями `toStartOf*`, `toLastDayOfMonth`, `toMonday` значения - `Date` или `DateTime`. -Хотя эти функции могут принимать значения типа `Date32` или `DateTime64` в качестве аргумента, при обработке аргумента вне нормального диапазона значений (`1970` - `2148` для `Date` и `1970-01-01 00:00:00`-`2106-02-07 08:28:15` для `DateTime`) будет получен некорректный результат. -Возвращаемые значения для значений вне нормального диапазона: -* `1970-01-01 (00:00:00)` будет возвращён для моментов времени до 1970 года, -* `2106-02-07 08:28:15` будет взят в качестве аргумента, если полученный аргумент превосходит данное значение и возвращаемый тип - `DateTime`, -* `2149-06-06` будет взят в качестве аргумента, если полученный аргумент превосходит данное значение и возвращаемый тип - `Date`. - ::: -* ## toStartOfYear {#tostartofyear} Округляет дату или дату-с-временем вниз до первого дня года. @@ -324,6 +318,8 @@ SELECT toStartOfISOYear(toDate('2017-01-01')) AS ISOYear20170101; Округляет дату или дату-с-временем до последнего числа месяца. Возвращается дата. +Если `toLastDayOfMonth` вызывается с аргументом типа `Date` большим чем 2149-05-31, то результат будет вычислен от аргумента 2149-05-31. + ## toMonday {#tomonday} Округляет дату или дату-с-временем вниз до ближайшего понедельника. @@ -977,7 +973,7 @@ SELECT now('Europe/Moscow'); ## timeSlots(StartTime, Duration,\[, Size\]) {#timeslotsstarttime-duration-size} Для интервала, начинающегося в `StartTime` и длящегося `Duration` секунд, возвращает массив моментов времени, кратных `Size`. Параметр `Size` указывать необязательно, по умолчанию он равен 1800 секундам (30 минутам) - необязательный параметр. Данная функция может использоваться, например, для анализа количества просмотров страницы за соответствующую сессию. -Аргумент `StartTime` может иметь тип `DateTime` или `DateTime64`. В случае, если используется `DateTime`, аргументы `Duration` и `Size` должны иметь тип `UInt32`; Для DateTime64 они должны быть типа `Decimal64`. +Аргумент `StartTime` может иметь тип `DateTime` или `DateTime64`. В случае, если используется `DateTime`, аргументы `Duration` и `Size` должны иметь тип `UInt32`; Для DateTime64 они должны быть типа `Decimal64`. Возвращает массив DateTime/DateTime64 (тип будет совпадать с типом параметра ’StartTime’). Для DateTime64 масштаб(scale) возвращаемой величины может отличаться от масштаба фргумента ’StartTime’ --- результат будет иметь наибольший масштаб среди всех данных аргументов. Пример использования: diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 6506c23428a..5bd9d28d8e3 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -150,7 +150,7 @@ std::vector Client::loadWarningMessages() size_t rows = packet.block.rows(); for (size_t i = 0; i < rows; ++i) - messages.emplace_back(column.getDataAt(i).toString()); + messages.emplace_back(column[i].get()); } continue; diff --git a/programs/library-bridge/CatBoostLibraryHandler.cpp b/programs/library-bridge/CatBoostLibraryHandler.cpp index 2c3ed583463..4fe539a53b2 100644 --- a/programs/library-bridge/CatBoostLibraryHandler.cpp +++ b/programs/library-bridge/CatBoostLibraryHandler.cpp @@ -98,7 +98,7 @@ void placeStringColumn(const ColumnString & column, const char ** buffer, size_t size_t size = column.size(); for (size_t i = 0; i < size; ++i) { - *buffer = const_cast(column.getDataAtWithTerminatingZero(i).data); + *buffer = const_cast(column.getDataAt(i).data); buffer += features_count; } } diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h index d579a925f9d..ad633418ec3 100644 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -492,7 +492,7 @@ public: void insertResultInto(IColumn & to) const { if (has()) - assert_cast(to).insertDataWithTerminatingZero(getData(), size); + assert_cast(to).insertData(getData(), size); else assert_cast(to).insertDefault(); } @@ -569,7 +569,7 @@ public: void change(const IColumn & column, size_t row_num, Arena * arena) { - changeImpl(assert_cast(column).getDataAtWithTerminatingZero(row_num), arena); + changeImpl(assert_cast(column).getDataAt(row_num), arena); } void change(const Self & to, Arena * arena) @@ -618,7 +618,7 @@ public: bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) { - if (!has() || assert_cast(column).getDataAtWithTerminatingZero(row_num) < getStringRef()) + if (!has() || assert_cast(column).getDataAt(row_num) < getStringRef()) { change(column, row_num, arena); return true; @@ -640,7 +640,7 @@ public: bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) { - if (!has() || assert_cast(column).getDataAtWithTerminatingZero(row_num) > getStringRef()) + if (!has() || assert_cast(column).getDataAt(row_num) > getStringRef()) { change(column, row_num, arena); return true; @@ -667,7 +667,7 @@ public: bool isEqualTo(const IColumn & column, size_t row_num) const { - return has() && assert_cast(column).getDataAtWithTerminatingZero(row_num) == getStringRef(); + return has() && assert_cast(column).getDataAt(row_num) == getStringRef(); } static bool allocatesMemoryInArena() diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index f8d41853566..552895e754d 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -187,9 +187,8 @@ void Suggest::fillWordsFromBlock(const Block & block) Words new_words; new_words.reserve(rows); for (size_t i = 0; i < rows; ++i) - { - new_words.emplace_back(column.getDataAt(i).toString()); - } + new_words.emplace_back(column[i].get()); + addWords(std::move(new_words)); } diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 7bddfc14707..bb56baf9216 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -151,23 +151,24 @@ void ColumnArray::get(size_t n, Field & res) const StringRef ColumnArray::getDataAt(size_t n) const { + assert(n < size()); + /** Returns the range of memory that covers all elements of the array. * Works for arrays of fixed length values. - * For arrays of strings and arrays of arrays, the resulting chunk of memory may not be one-to-one correspondence with the elements, - * since it contains only the data laid in succession, but not the offsets. */ - size_t offset_of_first_elem = offsetAt(n); - StringRef first = getData().getDataAtWithTerminatingZero(offset_of_first_elem); + /// We are using pointer arithmetic on the addresses of the array elements. + if (!data->isFixedAndContiguous()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getDataAt is not supported for {}", getName()); size_t array_size = sizeAt(n); if (array_size == 0) - return StringRef(first.data, 0); + return StringRef(nullptr, 0); - size_t offset_of_last_elem = getOffsets()[n] - 1; - StringRef last = getData().getDataAtWithTerminatingZero(offset_of_last_elem); + size_t offset_of_first_elem = offsetAt(n); + StringRef first = getData().getDataAt(offset_of_first_elem); - return StringRef(first.data, last.data + last.size - first.data); + return StringRef(first.data, first.size * array_size); } @@ -183,7 +184,7 @@ void ColumnArray::insertData(const char * pos, size_t length) /** Similarly - only for arrays of fixed length values. */ if (!data->isFixedAndContiguous()) - throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method insertData is not supported for {}", getName()); size_t field_size = data->sizeOfValueIfFixed(); diff --git a/src/Columns/ColumnConst.h b/src/Columns/ColumnConst.h index 99a230720a4..3b15b7239b9 100644 --- a/src/Columns/ColumnConst.h +++ b/src/Columns/ColumnConst.h @@ -81,11 +81,6 @@ public: return data->getDataAt(0); } - StringRef getDataAtWithTerminatingZero(size_t) const override - { - return data->getDataAtWithTerminatingZero(0); - } - UInt64 get64(size_t) const override { return data->get64(0); diff --git a/src/Columns/ColumnLowCardinality.h b/src/Columns/ColumnLowCardinality.h index 7cd226c4c11..4786c57f8a5 100644 --- a/src/Columns/ColumnLowCardinality.h +++ b/src/Columns/ColumnLowCardinality.h @@ -59,10 +59,6 @@ public: void get(size_t n, Field & res) const override { getDictionary().get(getIndexes().getUInt(n), res); } StringRef getDataAt(size_t n) const override { return getDictionary().getDataAt(getIndexes().getUInt(n)); } - StringRef getDataAtWithTerminatingZero(size_t n) const override - { - return getDictionary().getDataAtWithTerminatingZero(getIndexes().getUInt(n)); - } bool isDefaultAt(size_t n) const override { return getDictionary().isDefaultAt(getIndexes().getUInt(n)); } UInt64 get64(size_t n) const override { return getDictionary().get64(getIndexes().getUInt(n)); } diff --git a/src/Columns/ColumnString.h b/src/Columns/ColumnString.h index 361b792df55..895ec16430c 100644 --- a/src/Columns/ColumnString.h +++ b/src/Columns/ColumnString.h @@ -108,12 +108,6 @@ public: return StringRef(&chars[offsetAt(n)], sizeAt(n) - 1); } - StringRef getDataAtWithTerminatingZero(size_t n) const override - { - assert(n < size()); - return StringRef(&chars[offsetAt(n)], sizeAt(n)); - } - bool isDefaultAt(size_t n) const override { assert(n < size()); @@ -177,17 +171,6 @@ public: offsets.push_back(new_size); } - /// Like getData, but inserting data should be zero-ending (i.e. length is 1 byte greater than real string size). - void insertDataWithTerminatingZero(const char * pos, size_t length) - { - const size_t old_size = chars.size(); - const size_t new_size = old_size + length; - - chars.resize(new_size); - memcpy(chars.data() + old_size, pos, length); - offsets.push_back(new_size); - } - void popBack(size_t n) override { size_t nested_n = offsets.back() - offsetAt(offsets.size() - n); diff --git a/src/Columns/ColumnUnique.h b/src/Columns/ColumnUnique.h index d3ab87410f3..8420441aac8 100644 --- a/src/Columns/ColumnUnique.h +++ b/src/Columns/ColumnUnique.h @@ -70,10 +70,6 @@ public: void get(size_t n, Field & res) const override { getNestedColumn()->get(n, res); } bool isDefaultAt(size_t n) const override { return n == 0; } StringRef getDataAt(size_t n) const override { return getNestedColumn()->getDataAt(n); } - StringRef getDataAtWithTerminatingZero(size_t n) const override - { - return getNestedColumn()->getDataAtWithTerminatingZero(n); - } UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); } UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); } Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); } diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index 974925d247e..d11bc9d435d 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -106,13 +106,6 @@ public: /// Is used to optimize some computations (in aggregation, for example). [[nodiscard]] virtual StringRef getDataAt(size_t n) const = 0; - /// Like getData, but has special behavior for columns that contain variable-length strings. - /// Returns zero-ending memory chunk (i.e. its size is 1 byte longer). - [[nodiscard]] virtual StringRef getDataAtWithTerminatingZero(size_t n) const - { - return getDataAt(n); - } - /// If column stores integers, it returns n-th element transformed to UInt64 using static_cast. /// If column stores floating point numbers, bits of n-th elements are copied to lower bits of UInt64, the remaining bits are zeros. /// Is used to optimize some computations (in aggregation, for example). diff --git a/src/Columns/tests/gtest_column_object.cpp b/src/Columns/tests/gtest_column_object.cpp index e1ad949f6a8..f9b6ff16b71 100644 --- a/src/Columns/tests/gtest_column_object.cpp +++ b/src/Columns/tests/gtest_column_object.cpp @@ -1,8 +1,11 @@ #include #include +#include +#include #include #include #include +#include #include #include @@ -118,3 +121,36 @@ TEST(ColumnObject, InsertRangeFrom) checkFieldsAreEqual(subcolumn_dst, fields_dst); } } + +TEST(ColumnObject, Unflatten) +{ + auto check_empty_tuple = [](const auto & type, const auto & column) + { + const auto & type_tuple = assert_cast(*type); + const auto & column_tuple = assert_cast(*column); + + ASSERT_EQ(type_tuple.getElements().size(), 1); + ASSERT_EQ(type_tuple.getElements()[0]->getName(), "UInt8"); + ASSERT_EQ(type_tuple.getElementNames()[0], ColumnObject::COLUMN_NAME_DUMMY); + + ASSERT_EQ(column_tuple.getColumns().size(), 1); + ASSERT_EQ(column_tuple.getColumns()[0]->getName(), "UInt8"); + }; + + { + auto column_object = ColumnObject::create(false); + auto [column, type] = unflattenObjectToTuple(*column_object); + + check_empty_tuple(type, column); + ASSERT_EQ(column->size(), 0); + } + + { + auto column_object = ColumnObject::create(false); + column_object->insertManyDefaults(5); + auto [column, type] = unflattenObjectToTuple(*column_object); + + check_empty_tuple(type, column); + ASSERT_EQ(column->size(), 5); + } +} diff --git a/src/Coordination/CoordinationSettings.cpp b/src/Coordination/CoordinationSettings.cpp index f634bcbb281..3e03ee0d6f4 100644 --- a/src/Coordination/CoordinationSettings.cpp +++ b/src/Coordination/CoordinationSettings.cpp @@ -189,9 +189,6 @@ KeeperConfigurationAndSettings::loadFromConfig(const Poco::Util::AbstractConfigu ret->coordination_settings->loadFromConfig("keeper_server.coordination_settings", config); - if (ret->coordination_settings->quorum_reads) - LOG_WARNING(&Poco::Logger::get("KeeperConfigurationAndSettings"), "Setting 'quorum_reads' is deprecated. Please use 'read_mode'"); - return ret; } diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index d6b0977b4fa..c436c1b6635 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -26,7 +26,6 @@ struct Settings; M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \ M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \ M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Upper bound of election timer (avoid too often leader elections)", 0) \ - M(Milliseconds, leadership_expiry, 0, "How often will leader node check if it still has majority. Set it lower or equal to election_timeout_lower_bound_ms to have linearizable reads.", 0) \ M(UInt64, reserved_log_items, 100000, "How many log items to store (don't remove during compaction)", 0) \ M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \ M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \ @@ -39,12 +38,11 @@ struct Settings; M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \ M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \ M(UInt64, max_requests_batch_size, 100, "Max size of batch in requests count before it will be sent to RAFT", 0) \ - M(Bool, quorum_reads, false, "Deprecated - use read_mode. Execute read requests as writes through whole RAFT consesus with similar speed", 0) \ + M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \ M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \ M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \ M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \ - M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \ - M(String, read_mode, "nonlinear", "How should reads be processed. Valid values: 'nonlinear', 'fastlinear', 'quorum'. 'nonlinear' is the fastest option because there are no consistency requirements", 0) + M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 3445ef5ea23..261e43d80e4 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -7,8 +6,6 @@ #include #include #include -#include -#include #include #include @@ -33,83 +30,22 @@ namespace ErrorCodes KeeperDispatcher::KeeperDispatcher() : responses_queue(std::numeric_limits::max()) - , read_requests_queue(std::numeric_limits::max()) - , finalize_requests_queue(std::numeric_limits::max()) , configuration_and_settings(std::make_shared()) , log(&Poco::Logger::get("KeeperDispatcher")) { } -/// ZooKeepers has 2 requirements: -/// - writes need to be linearizable -/// - all requests from single session need to be processed in the order of their arrival -/// -/// Because of that, we cannot process read and write requests from SAME session at the same time. -/// To be able to process read and write requests in parallel we need to make sure that only 1 type -/// of request is being processed from a single session. -/// Multiple types from different sessions can be processed at the same time. -/// -/// We do some in-session housekeeping to make sure that the multithreaded request processing is correct. -/// When a request is received from a client, we check if there are requests being processed from that same -/// session, and if yes, of what type. If the types are the same, and there are no requests of different -/// type inbetetween, we can instanly add it to active request queue. Otherwise, we need to wait until -/// all requests of the other type are processed. -/// -/// There are multiple threads used for processing the request, each of them communicating with a queue. -/// Assumption: only one type of request is being processed from a same session at any point in time (read or write). -/// -/// requestThread -> requests currently being processed -/// readRequestThread -> thread for processing read requests -/// finalizeRequestThread -> thread for finalizing requests: -/// - in-session housekeeping, add requests to the active request queue if there are any -/// -/// If reads are linearizable without quorum, a request can possibly wait for a certain log to be committed. -/// In that case we add it to the waiting queue for that log. -/// When that log is committed, the committing thread will send that read request to readRequestThread so it can be processed. -/// void KeeperDispatcher::requestThread() { setThreadName("KeeperReqT"); /// Result of requests batch from previous iteration - RaftResult prev_result = nullptr; - const auto previous_quorum_done = [&] { return !prev_result || prev_result->has_result() || prev_result->get_result_code() != nuraft::cmd_result_code::OK; }; + RaftAppendResult prev_result = nullptr; + /// Requests from previous iteration. We store them to be able + /// to send errors to the client. + KeeperStorage::RequestsForSessions prev_batch; - const auto needs_quorum = [](const auto & coordination_settings, const auto & request) - { - return coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum" || !request.request->isReadRequest(); - }; - - KeeperStorage::RequestsForSessions quorum_requests; - KeeperStorage::RequestsForSessions read_requests; - - auto process_quorum_requests = [&, this]() mutable - { - /// Forcefully process all previous pending requests - if (prev_result) - forceWaitAndProcessResult(prev_result); - - prev_result = server->putRequestBatch(quorum_requests); - - if (prev_result) - { - prev_result->when_ready([&, requests_for_sessions = std::move(quorum_requests)](nuraft::cmd_result> & result, nuraft::ptr &) mutable - { - if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT) - addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); - else if (result.get_result_code() != nuraft::cmd_result_code::OK) - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - }); - } - - quorum_requests.clear(); - }; - - /// ZooKeeper requires that the requests inside a single session are processed in a strict order - /// (we cannot process later requests before all the previous once are processed) - /// By making sure that at this point we can either have just read requests or just write requests - /// from a single session, we can process them independently while (!shutdown_called) { KeeperStorage::RequestForSession request; @@ -118,67 +54,94 @@ void KeeperDispatcher::requestThread() uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds(); uint64_t max_batch_size = coordination_settings->max_requests_batch_size; + /// The code below do a very simple thing: batch all write (quorum) requests into vector until + /// previous write batch is not finished or max_batch size achieved. The main complexity goes from + /// the ability to process read requests without quorum (from local state). So when we are collecting + /// requests into a batch we must check that the new request is not read request. Otherwise we have to + /// process all already accumulated write requests, wait them synchronously and only after that process + /// read request. So reads are some kind of "separator" for writes. try { - if (active_requests_queue->tryPop(request, max_wait)) + if (requests_queue->tryPop(request, max_wait)) { CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); if (shutdown_called) break; - if (needs_quorum(coordination_settings, request)) - quorum_requests.emplace_back(request); - else - read_requests.emplace_back(request); + KeeperStorage::RequestsForSessions current_batch; - /// Waiting until previous append will be successful, or batch is big enough - /// has_result == false && get_result_code == OK means that our request still not processed. - /// Sometimes NuRaft set errorcode without setting result, so we check both here. - while (true) + bool has_read_request = false; + + /// If new request is not read request or we must to process it through quorum. + /// Otherwise we will process it locally. + if (coordination_settings->quorum_reads || !request.request->isReadRequest()) { - if (quorum_requests.size() > max_batch_size) - break; + current_batch.emplace_back(request); - if (read_requests.size() > max_batch_size) + /// Waiting until previous append will be successful, or batch is big enough + /// has_result == false && get_result_code == OK means that our request still not processed. + /// Sometimes NuRaft set errorcode without setting result, so we check both here. + while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size) { - processReadRequests(coordination_settings, read_requests); + /// Trying to get batch requests as fast as possible + if (requests_queue->tryPop(request, 1)) + { + CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); + /// Don't append read request into batch, we have to process them separately + if (!coordination_settings->quorum_reads && request.request->isReadRequest()) + { + has_read_request = true; + break; + } + else + { - if (previous_quorum_done()) + current_batch.emplace_back(request); + } + } + + if (shutdown_called) break; } - - /// Trying to get batch requests as fast as possible - if (active_requests_queue->tryPop(request, 1)) - { - CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); - if (needs_quorum(coordination_settings, request)) - quorum_requests.emplace_back(request); - else - read_requests.emplace_back(request); - } - else - { - /// batch of read requests can send at most one request - /// so we don't care if the previous batch hasn't received response - if (!read_requests.empty()) - processReadRequests(coordination_settings, read_requests); - - /// if we still didn't process previous batch we can - /// increase are current batch even more - if (previous_quorum_done()) - break; - } - - if (shutdown_called) - break; } + else + has_read_request = true; if (shutdown_called) break; - if (!quorum_requests.empty()) - process_quorum_requests(); + /// Forcefully process all previous pending requests + if (prev_result) + forceWaitAndProcessResult(prev_result, prev_batch); + /// Process collected write requests batch + if (!current_batch.empty()) + { + auto result = server->putRequestBatch(current_batch); + + if (result) + { + if (has_read_request) /// If we will execute read request next, than we have to process result now + forceWaitAndProcessResult(result, current_batch); + } + else + { + addErrorResponses(current_batch, Coordination::Error::ZCONNECTIONLOSS); + current_batch.clear(); + } + + prev_batch = std::move(current_batch); + prev_result = result; + } + + /// Read request always goes after write batch (last request) + if (has_read_request) + { + if (server->isLeaderAlive()) + server->putLocalReadRequest(request); + else + addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS); + } } } catch (...) @@ -188,72 +151,6 @@ void KeeperDispatcher::requestThread() } } -void KeeperDispatcher::processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests) -{ - if (coordination_settings->read_mode.toString() == "fastlinear") - { - // we just want to know what's the current latest committed log on Leader node - auto leader_info_result = server->getLeaderInfo(); - if (leader_info_result) - { - leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result> & result, nuraft::ptr & exception) mutable - { - if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT) - { - addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); - return; - } - - if (result.get_result_code() != nuraft::cmd_result_code::OK) - { - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - return; - } - - if (exception) - { - LOG_INFO(log, "Got exception while waiting for read results {}", exception->what()); - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - return; - } - - auto & leader_info_ctx = result.get(); - - if (!leader_info_ctx) - { - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - return; - } - - KeeperServer::NodeInfo leader_info; - leader_info.term = leader_info_ctx->get_ulong(); - leader_info.last_committed_index = leader_info_ctx->get_ulong(); - std::lock_guard lock(leader_waiter_mutex); - auto node_info = server->getNodeInfo(); - - /// we're behind, we need to wait - if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index) - { - auto & leader_waiter = leader_waiters[leader_info]; - leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end()); - LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index); - } - /// process it in background thread - else if (!read_requests_queue.push(std::move(requests_for_sessions))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); - }); - } - } - else - { - assert(coordination_settings->read_mode.toString() == "nonlinear"); - if (!read_requests_queue.push(std::move(read_requests))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); - } - - read_requests.clear(); -} - void KeeperDispatcher::responseThread() { setThreadName("KeeperRspT"); @@ -303,65 +200,6 @@ void KeeperDispatcher::snapshotThread() } } -/// Background thread for processing read requests -void KeeperDispatcher::readRequestThread() -{ - setThreadName("KeeperReadT"); - while (!shutdown_called) - { - KeeperStorage::RequestsForSessions requests; - if (!read_requests_queue.pop(requests)) - break; - - if (shutdown_called) - break; - - try - { - for (const auto & request_info : requests) - { - if (server->isLeaderAlive()) - server->putLocalReadRequest(request_info); - else - addErrorResponses({request_info}, Coordination::Error::ZCONNECTIONLOSS); - } - - if (!finalize_requests_queue.push(std::move(requests))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } -} - -/// We finalize requests every time we commit a single log with request -/// or process a batch of read requests. -/// Because it can get heavy, we do it in background thread. -void KeeperDispatcher::finalizeRequestsThread() -{ - setThreadName("KeeperFinalT"); - while (!shutdown_called) - { - KeeperStorage::RequestsForSessions requests; - if (!finalize_requests_queue.pop(requests)) - break; - - if (shutdown_called) - break; - - try - { - finalizeRequests(requests); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } -} - void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response) { std::lock_guard lock(session_to_response_callback_mutex); @@ -417,30 +255,6 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = session_id; - { - std::lock_guard lock{unprocessed_request_mutex}; - auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id); - if (unprocessed_requests_it == unprocessed_requests_for_session.end()) - { - auto & unprocessed_requests = unprocessed_requests_for_session[session_id]; - unprocessed_requests.unprocessed_num = 1; - unprocessed_requests.is_read = request->isReadRequest(); - } - else - { - auto & unprocessed_requests = unprocessed_requests_it->second; - - /// queue is not empty, or the request types don't match, put it in the waiting queue - if (!unprocessed_requests.request_queue.empty() || unprocessed_requests.is_read != request->isReadRequest()) - { - unprocessed_requests.request_queue.push_back(std::move(request_info)); - return true; - } - - ++unprocessed_requests.unprocessed_num; - } - } - std::lock_guard lock(push_request_mutex); if (shutdown_called) @@ -449,10 +263,10 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ /// Put close requests without timeouts if (request->getOpNum() == Coordination::OpNum::Close) { - if (!active_requests_queue->push(std::move(request_info))) + if (!requests_queue->push(std::move(request_info))) throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR); } - else if (!active_requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) + else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) { throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); } @@ -465,23 +279,13 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf LOG_DEBUG(log, "Initializing storage dispatcher"); configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper); - active_requests_queue = std::make_unique(configuration_and_settings->coordination_settings->max_requests_batch_size); + requests_queue = std::make_unique(configuration_and_settings->coordination_settings->max_requests_batch_size); request_thread = ThreadFromGlobalPool([this] { requestThread(); }); responses_thread = ThreadFromGlobalPool([this] { responseThread(); }); snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); }); - read_request_thread = ThreadFromGlobalPool([this] { readRequestThread(); }); - finalize_requests_thread = ThreadFromGlobalPool([this] { finalizeRequestsThread(); }); - server = std::make_unique( - configuration_and_settings, - config, - responses_queue, - snapshots_queue, - [this](const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx) - { onRequestCommit(request_for_session, log_term, log_idx); }, - [this](uint64_t term, uint64_t last_idx) - { onApplySnapshot(term, last_idx); }); + server = std::make_unique(configuration_and_settings, config, responses_queue, snapshots_queue); try { @@ -529,9 +333,9 @@ void KeeperDispatcher::shutdown() if (session_cleaner_thread.joinable()) session_cleaner_thread.join(); - if (active_requests_queue) + if (requests_queue) { - active_requests_queue->finish(); + requests_queue->finish(); if (request_thread.joinable()) request_thread.join(); @@ -545,14 +349,6 @@ void KeeperDispatcher::shutdown() if (snapshot_thread.joinable()) snapshot_thread.join(); - read_requests_queue.finish(); - if (read_request_thread.joinable()) - read_request_thread.join(); - - finalize_requests_queue.finish(); - if (finalize_requests_thread.joinable()) - finalize_requests_thread.join(); - update_configuration_queue.finish(); if (update_configuration_thread.joinable()) update_configuration_thread.join(); @@ -561,7 +357,7 @@ void KeeperDispatcher::shutdown() KeeperStorage::RequestForSession request_for_session; /// Set session expired for all pending requests - while (active_requests_queue && active_requests_queue->tryPop(request_for_session)) + while (requests_queue && requests_queue->tryPop(request_for_session)) { CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); auto response = request_for_session.request->makeResponse(); @@ -678,7 +474,7 @@ void KeeperDispatcher::sessionCleanerTask() }; { std::lock_guard lock(push_request_mutex); - if (!active_requests_queue->push(std::move(request_info))) + if (!requests_queue->push(std::move(request_info))) LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions"); CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); } @@ -728,12 +524,19 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession } } -void KeeperDispatcher::forceWaitAndProcessResult(RaftResult & result) +void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions) { if (!result->has_result()) result->get(); + /// If we get some errors, than send them to clients + if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT) + addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); + else if (result->get_result_code() != nuraft::cmd_result_code::OK) + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + result = nullptr; + requests_for_sessions.clear(); } int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) @@ -781,7 +584,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) /// Push new session request to queue { std::lock_guard lock(push_request_mutex); - if (!active_requests_queue->tryPush(std::move(request_info), session_timeout_ms)) + if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms)) throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED); CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); } @@ -854,122 +657,6 @@ void KeeperDispatcher::updateConfigurationThread() } } -// Used to update the state for a session based on the requests -// - update the number of current unprocessed requests for the session -// - if the number of unprocessed requests is 0, we can start adding next type of requests -// from unprocessed requests queue to the active queue -void KeeperDispatcher::finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions) -{ - std::unordered_map counts_for_session; - - for (const auto & request_for_session : requests_for_sessions) - { - ++counts_for_session[request_for_session.session_id]; - } - - std::lock_guard lock{unprocessed_request_mutex}; - for (const auto [session_id, count] : counts_for_session) - { - auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id); - if (unprocessed_requests_it == unprocessed_requests_for_session.end()) - continue; - - auto & unprocessed_requests = unprocessed_requests_it->second; - unprocessed_requests.unprocessed_num -= count; - - if (unprocessed_requests.unprocessed_num == 0) - { - if (!unprocessed_requests.request_queue.empty()) - { - auto & unprocessed_requests_queue = unprocessed_requests.request_queue; - unprocessed_requests.is_read = !unprocessed_requests.is_read; - // start adding next type of requests - while (!unprocessed_requests_queue.empty() && unprocessed_requests_queue.front().request->isReadRequest() == unprocessed_requests.is_read) - { - auto & front_request = unprocessed_requests_queue.front(); - - /// Put close requests without timeouts - if (front_request.request->getOpNum() == Coordination::OpNum::Close) - { - if (!active_requests_queue->push(std::move(front_request))) - throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR); - } - else if (!active_requests_queue->tryPush(std::move(front_request), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) - { - throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); - } - - ++unprocessed_requests.unprocessed_num; - unprocessed_requests_queue.pop_front(); - } - } - else - { - unprocessed_requests_for_session.erase(unprocessed_requests_it); - } - } - } -} - -// Finalize request -// Process read requests that were waiting for this commit -void KeeperDispatcher::onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx) -{ - if (!finalize_requests_queue.push({request_for_session})) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); - - KeeperStorage::RequestsForSessions requests; - { - std::lock_guard lock(leader_waiter_mutex); - auto request_queue_it = leader_waiters.find(KeeperServer::NodeInfo{.term = log_term, .last_committed_index = log_idx}); - if (request_queue_it != leader_waiters.end()) - { - requests = std::move(request_queue_it->second); - leader_waiters.erase(request_queue_it); - } - } - - if (requests.empty()) - return; - - if (!read_requests_queue.push(std::move(requests))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); -} - -/// Process all read request that are waiting for lower or currently last processed log index -void KeeperDispatcher::onApplySnapshot(uint64_t term, uint64_t last_idx) -{ - KeeperServer::NodeInfo current_node_info{term, last_idx}; - KeeperStorage::RequestsForSessions requests; - { - std::lock_guard lock(leader_waiter_mutex); - for (auto leader_waiter_it = leader_waiters.begin(); leader_waiter_it != leader_waiters.end();) - { - auto waiting_node_info = leader_waiter_it->first; - if (waiting_node_info.term <= current_node_info.term - && waiting_node_info.last_committed_index <= current_node_info.last_committed_index) - { - for (auto & request : leader_waiter_it->second) - { - requests.push_back(std::move(request)); - } - - leader_waiter_it = leader_waiters.erase(leader_waiter_it); - } - else - { - ++leader_waiter_it; - } - } - } - - if (requests.empty()) - return; - - if (!read_requests_queue.push(std::move(requests))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); -} - bool KeeperDispatcher::isServerActive() const { return checkInit() && hasLeader() && !server->isRecovering(); @@ -1034,7 +721,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const Keeper4LWInfo result = server->getPartiallyFilled4LWInfo(); { std::lock_guard lock(push_request_mutex); - result.outstanding_requests_count = active_requests_queue->size(); + result.outstanding_requests_count = requests_queue->size(); } { std::lock_guard lock(session_to_response_callback_mutex); diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index 6421db87793..5e2701299f4 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -32,12 +32,9 @@ private: using UpdateConfigurationQueue = ConcurrentBoundedQueue; /// Size depends on coordination settings - /// Request currently being processed - std::unique_ptr active_requests_queue; + std::unique_ptr requests_queue; ResponsesQueue responses_queue; SnapshotsQueue snapshots_queue{1}; - ConcurrentBoundedQueue read_requests_queue; - ConcurrentBoundedQueue finalize_requests_queue; /// More than 1k updates is definitely misconfiguration. UpdateConfigurationQueue update_configuration_queue{1000}; @@ -67,8 +64,6 @@ private: ThreadFromGlobalPool snapshot_thread; /// Apply or wait for configuration changes ThreadFromGlobalPool update_configuration_thread; - ThreadFromGlobalPool read_request_thread; - ThreadFromGlobalPool finalize_requests_thread; /// RAFT wrapper. std::unique_ptr server; @@ -82,34 +77,6 @@ private: /// Counter for new session_id requests. std::atomic internal_session_id_counter{0}; - /// A read request needs to have at least the log it was the last committed log on the leader - /// at the time the request was being made. - /// If the node is stale, we need to wait to commit that log before doing local read requests to achieve - /// linearizability. - std::unordered_map leader_waiters; - std::mutex leader_waiter_mutex; - - /// We can be actively processing one type of requests (either read or write) from a single session. - /// If we receive a request of a type that is not currently being processed, we put it in the waiting queue. - /// Also, we want to process them in ariving order, so if we have a different type in the queue, we cannot process that request - /// but wait for all the previous requests to finish. - /// E.g. READ -> WRITE -> READ, the last READ will go to the waiting queue even though we are currently processing the first READ - /// because we have WRITE request before it that needs to be processed. - struct UnprocessedRequests - { - /// how many requests are currently in the active request queue - size_t unprocessed_num{0}; - /// is_read currently being processed - bool is_read{false}; - std::list request_queue; - }; - - // Called every time a batch of requests are processed. - void finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions); - - std::unordered_map unprocessed_requests_for_session; - std::mutex unprocessed_request_mutex; - /// Thread put requests to raft void requestThread(); /// Thread put responses for subscribed sessions @@ -121,12 +88,6 @@ private: /// Thread apply or wait configuration changes from leader void updateConfigurationThread(); - void readRequestThread(); - - void finalizeRequestsThread(); - - void processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests); - void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response); /// Add error responses for requests to responses queue. @@ -135,7 +96,7 @@ private: /// Forcefully wait for result and sets errors if something when wrong. /// Clears both arguments - static void forceWaitAndProcessResult(RaftResult & result); + void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions); public: /// Just allocate some objects, real initialization is done by `intialize method` @@ -155,12 +116,6 @@ public: return server && server->checkInit(); } - /// Called when a single log with request is committed. - void onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx); - - /// Called when a snapshot is applied - void onApplySnapshot(uint64_t term, uint64_t last_idx); - /// Is server accepting requests, i.e. connected to the cluster /// and achieved quorum bool isServerActive() const; diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index b708c5a51ba..8186ddd0c00 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -105,9 +105,7 @@ KeeperServer::KeeperServer( const KeeperConfigurationAndSettingsPtr & configuration_and_settings_, const Poco::Util::AbstractConfiguration & config, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_, - KeeperStateMachine::CommitCallback commit_callback, - KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback) + SnapshotsQueue & snapshots_queue_) : server_id(configuration_and_settings_->server_id) , coordination_settings(configuration_and_settings_->coordination_settings) , log(&Poco::Logger::get("KeeperServer")) @@ -115,7 +113,7 @@ KeeperServer::KeeperServer( , keeper_context{std::make_shared()} , create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true)) { - if (coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum") + if (coordination_settings->quorum_reads) LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower."); keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false); @@ -127,9 +125,7 @@ KeeperServer::KeeperServer( configuration_and_settings_->snapshot_storage_path, coordination_settings, keeper_context, - checkAndGetSuperdigest(configuration_and_settings_->super_digest), - std::move(commit_callback), - std::move(apply_snapshot_callback)); + checkAndGetSuperdigest(configuration_and_settings_->super_digest)); state_manager = nuraft::cs_new( server_id, @@ -180,13 +176,6 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server reconfigure(new_config); } - RaftResult getLeaderInfo() - { - nuraft::ptr req - = nuraft::cs_new(0ull, nuraft::msg_type::leader_status_request, 0, 0, 0ull, 0ull, 0ull); - return send_msg_to_leader(req); - } - void commit_in_bg() override { // For NuRaft, if any commit fails (uncaught exception) the whole server aborts as a safety @@ -280,20 +269,6 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds(), "election_timeout_lower_bound_ms", log); params.election_timeout_upper_bound_ = getValueOrMaxInt32AndLogWarning( coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(), "election_timeout_upper_bound_ms", log); - - params.leadership_expiry_ = getValueOrMaxInt32AndLogWarning(coordination_settings->leadership_expiry.totalMilliseconds(), "leadership_expiry", log); - - if (coordination_settings->read_mode.toString() == "fastlinear") - { - if (params.leadership_expiry_ == 0) - params.leadership_expiry_ = params.election_timeout_lower_bound_; - else if (params.leadership_expiry_ > params.election_timeout_lower_bound_) - { - LOG_WARNING(log, "To use fast linearizable reads, leadership_expiry should be set to a value that is less or equal to the election_timeout_upper_bound_ms. " - "Based on current settings, there are no guarantees for linearizability of reads."); - } - } - params.reserved_log_items_ = getValueOrMaxInt32AndLogWarning(coordination_settings->reserved_log_items, "reserved_log_items", log); params.snapshot_distance_ = getValueOrMaxInt32AndLogWarning(coordination_settings->snapshot_distance, "snapshot_distance", log); @@ -512,7 +487,7 @@ void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & state_machine->processReadRequest(request_for_session); } -RaftResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions) +RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions) { std::vector> entries; for (const auto & request_for_session : requests_for_sessions) @@ -738,20 +713,6 @@ std::vector KeeperServer::getDeadSessions() return state_machine->getDeadSessions(); } -RaftResult KeeperServer::getLeaderInfo() -{ - std::lock_guard lock{server_write_mutex}; - if (is_recovering) - return nullptr; - - return raft_instance->getLeaderInfo(); -} - -KeeperServer::NodeInfo KeeperServer::getNodeInfo() -{ - return { .term = raft_instance->get_term(), .last_committed_index = state_machine->last_commit_index() }; -} - ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) { auto diff = state_manager->getConfigurationDiff(config); diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 02ab643044a..6873ef2a01e 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -14,7 +14,7 @@ namespace DB { -using RaftResult = nuraft::ptr>>; +using RaftAppendResult = nuraft::ptr>>; class KeeperServer { @@ -71,9 +71,7 @@ public: const KeeperConfigurationAndSettingsPtr & settings_, const Poco::Util::AbstractConfiguration & config_, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_, - KeeperStateMachine::CommitCallback commit_callback, - KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback); + SnapshotsQueue & snapshots_queue_); /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings. void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true); @@ -86,7 +84,7 @@ public: /// Put batch of requests into Raft and get result of put. Responses will be set separately into /// responses_queue. - RaftResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests); + RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests); /// Return set of the non-active sessions std::vector getDeadSessions(); @@ -121,17 +119,6 @@ public: int getServerID() const { return server_id; } - struct NodeInfo - { - uint64_t term; - uint64_t last_committed_index; - - bool operator==(const NodeInfo &) const = default; - }; - - RaftResult getLeaderInfo(); - NodeInfo getNodeInfo(); - /// Get configuration diff between current configuration in RAFT and in XML file ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config); @@ -139,23 +126,10 @@ public: /// Synchronously check for update results with retries. void applyConfigurationUpdate(const ConfigUpdateAction & task); + /// Wait configuration update for action. Used by followers. /// Return true if update was successfully received. bool waitConfigurationUpdate(const ConfigUpdateAction & task); }; } -namespace std -{ - template <> - struct hash - { - size_t operator()(const DB::KeeperServer::NodeInfo & info) const - { - SipHash hash_state; - hash_state.update(info.term); - hash_state.update(info.last_committed_index); - return hash_state.get64(); - } - }; -} diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 477d8104796..c5a66ce29ca 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -44,9 +44,7 @@ KeeperStateMachine::KeeperStateMachine( const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, const KeeperContextPtr & keeper_context_, - const std::string & superdigest_, - CommitCallback commit_callback_, - ApplySnapshotCallback apply_snapshot_callback_) + const std::string & superdigest_) : coordination_settings(coordination_settings_) , snapshot_manager( snapshots_path_, @@ -60,8 +58,6 @@ KeeperStateMachine::KeeperStateMachine( , last_committed_idx(0) , log(&Poco::Logger::get("KeeperStateMachine")) , superdigest(superdigest_) - , commit_callback(std::move(commit_callback_)) - , apply_snapshot_callback(std::move(apply_snapshot_callback_)) , keeper_context(keeper_context_) { } @@ -227,11 +223,11 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req return true; } -nuraft::ptr KeeperStateMachine::commit_ext(const ext_op_params & params) +nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) { - auto request_for_session = parseRequest(*params.data); + auto request_for_session = parseRequest(data); if (!request_for_session.zxid) - request_for_session.zxid = params.log_idx; + request_for_session.zxid = log_idx; /// Special processing of session_id request if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) @@ -276,9 +272,8 @@ nuraft::ptr KeeperStateMachine::commit_ext(const ext_op_params & assertDigest(*request_for_session.digest, storage->getNodesDigest(true), *request_for_session.request, true); } - last_committed_idx = params.log_idx; - commit_callback(request_for_session, params.log_term, params.log_idx); ProfileEvents::increment(ProfileEvents::KeeperCommits); + last_committed_idx = log_idx; return nullptr; } @@ -311,7 +306,6 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys); last_committed_idx = s.get_last_log_idx(); - apply_snapshot_callback(s.get_last_log_term(), s.get_last_log_idx()); return true; } @@ -326,10 +320,6 @@ void KeeperStateMachine::commit_config(const uint64_t /* log_idx */, nuraft::ptr void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data) { auto request_for_session = parseRequest(data); - - if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) - return; - // If we received a log from an older node, use the log_idx as the zxid // log_idx will always be larger or equal to the zxid so we can safely do this // (log_idx is increased for all logs, while zxid is only increased for requests) diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index f44dfd503b0..fbd4fdc5ac2 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -20,18 +20,13 @@ using SnapshotsQueue = ConcurrentBoundedQueue; class KeeperStateMachine : public nuraft::state_machine { public: - using CommitCallback = std::function; - using ApplySnapshotCallback = std::function; - KeeperStateMachine( ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, const KeeperContextPtr & keeper_context_, - const std::string & superdigest_ = "", - CommitCallback commit_callback_ = [](const KeeperStorage::RequestForSession &, uint64_t, uint64_t){}, - ApplySnapshotCallback apply_snapshot_callback_ = [](uint64_t, uint64_t){}); + const std::string & superdigest_ = ""); /// Read state from the latest snapshot void init(); @@ -42,7 +37,7 @@ public: nuraft::ptr pre_commit(uint64_t log_idx, nuraft::buffer & data) override; - nuraft::ptr commit_ext(const ext_op_params & params) override; /// NOLINT + nuraft::ptr commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT /// Save new cluster config to our snapshot (copy of the config stored in StateManager) void commit_config(const uint64_t log_idx, nuraft::ptr & new_conf) override; /// NOLINT @@ -150,11 +145,6 @@ private: /// Special part of ACL system -- superdigest specified in server config. const std::string superdigest; - /// call when a request is committed - const CommitCallback commit_callback; - /// call when snapshot is applied - const ApplySnapshotCallback apply_snapshot_callback; - KeeperContextPtr keeper_context; }; diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index b98fd0e56e8..fa6bfca7c7a 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1330,9 +1330,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint changelog.append(entry); changelog.end_of_append_batch(0, 0); - auto entry_buf = changelog.entry_at(i)->get_buf_ptr(); - state_machine->pre_commit(i, *entry_buf); - state_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry_buf}); + state_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); + state_machine->commit(i, changelog.entry_at(i)->get_buf()); bool snapshot_created = false; if (i % settings->snapshot_distance == 0) { @@ -1376,9 +1375,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint for (size_t i = restore_machine->last_commit_index() + 1; i < restore_changelog.next_slot(); ++i) { - auto entry = changelog.entry_at(i)->get_buf_ptr(); - restore_machine->pre_commit(i, *entry); - restore_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry}); + restore_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); + restore_machine->commit(i, changelog.entry_at(i)->get_buf()); } auto & source_storage = state_machine->getStorage(); @@ -1479,18 +1477,18 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove) std::shared_ptr request_c = std::make_shared(); request_c->path = "/hello"; request_c->is_ephemeral = true; - auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c)->get_buf_ptr(); - state_machine->pre_commit(1, *entry_c); - state_machine->commit_ext(nuraft::state_machine::ext_op_params{1, entry_c}); + auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c); + state_machine->pre_commit(1, entry_c->get_buf()); + state_machine->commit(1, entry_c->get_buf()); const auto & storage = state_machine->getStorage(); EXPECT_EQ(storage.ephemerals.size(), 1); std::shared_ptr request_d = std::make_shared(); request_d->path = "/hello"; /// Delete from other session - auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d)->get_buf_ptr(); - state_machine->pre_commit(2, *entry_d); - state_machine->commit_ext(nuraft::state_machine::ext_op_params{2, entry_d}); + auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d); + state_machine->pre_commit(2, entry_d->get_buf()); + state_machine->commit(2, entry_d->get_buf()); EXPECT_EQ(storage.ephemerals.size(), 0); } diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index 3b7595eb886..33691e83d27 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -623,6 +623,7 @@ NamesAndTypesList Block::getNamesAndTypesList() const NamesAndTypes Block::getNamesAndTypes() const { NamesAndTypes res; + res.reserve(columns()); for (const auto & elem : data) res.emplace_back(elem.name, elem.type); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 86fccf45a8d..870647b3254 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -136,6 +136,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \ M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \ M(Bool, enable_positional_arguments, true, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \ + M(Bool, enable_extended_results_for_datetime_functions, false, "Enable date functions like toLastDayOfMonth return Date32 results (instead of Date results) for Date32/DateTime64 arguments.", 0) \ \ M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \ \ diff --git a/src/DataTypes/ObjectUtils.cpp b/src/DataTypes/ObjectUtils.cpp index c14b9b579ea..e5d8d05acb5 100644 --- a/src/DataTypes/ObjectUtils.cpp +++ b/src/DataTypes/ObjectUtils.cpp @@ -453,15 +453,19 @@ using SubcolumnsTreeWithColumns = SubcolumnsTree; using Node = SubcolumnsTreeWithColumns::Node; /// Creates data type and column from tree of subcolumns. -ColumnWithTypeAndDimensions createTypeFromNode(const Node * node) +ColumnWithTypeAndDimensions createTypeFromNode(const Node & node) { auto collect_tuple_elemets = [](const auto & children) { + if (children.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create type from empty Tuple or Nested node"); + std::vector> tuple_elements; tuple_elements.reserve(children.size()); for (const auto & [name, child] : children) { - auto column = createTypeFromNode(child.get()); + assert(child); + auto column = createTypeFromNode(*child); tuple_elements.emplace_back(name, std::move(column)); } @@ -475,13 +479,13 @@ ColumnWithTypeAndDimensions createTypeFromNode(const Node * node) return std::make_tuple(std::move(tuple_names), std::move(tuple_columns)); }; - if (node->kind == Node::SCALAR) + if (node.kind == Node::SCALAR) { - return node->data; + return node.data; } - else if (node->kind == Node::NESTED) + else if (node.kind == Node::NESTED) { - auto [tuple_names, tuple_columns] = collect_tuple_elemets(node->children); + auto [tuple_names, tuple_columns] = collect_tuple_elemets(node.children); Columns offsets_columns; offsets_columns.reserve(tuple_columns[0].array_dimensions + 1); @@ -492,7 +496,7 @@ ColumnWithTypeAndDimensions createTypeFromNode(const Node * node) /// `k1 Array(Nested(k2 Int, k3 Int))` and k1 is marked as Nested /// and `k2` and `k3` has anonymous_array_level = 1 in that case. - const auto & current_array = assert_cast(*node->data.column); + const auto & current_array = assert_cast(*node.data.column); offsets_columns.push_back(current_array.getOffsetsPtr()); auto first_column = tuple_columns[0].column; @@ -529,7 +533,7 @@ ColumnWithTypeAndDimensions createTypeFromNode(const Node * node) } else { - auto [tuple_names, tuple_columns] = collect_tuple_elemets(node->children); + auto [tuple_names, tuple_columns] = collect_tuple_elemets(node.children); size_t num_elements = tuple_columns.size(); Columns tuple_elements_columns(num_elements); @@ -587,6 +591,15 @@ std::pair unflattenObjectToTuple(const ColumnObject & co { const auto & subcolumns = column.getSubcolumns(); + if (subcolumns.empty()) + { + auto type = std::make_shared( + DataTypes{std::make_shared()}, + Names{ColumnObject::COLUMN_NAME_DUMMY}); + + return {type->createColumn()->cloneResized(column.size()), type}; + } + PathsInData paths; DataTypes types; Columns columns; @@ -613,6 +626,9 @@ std::pair unflattenTuple( assert(paths.size() == tuple_types.size()); assert(paths.size() == tuple_columns.size()); + if (paths.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unflatten empty Tuple"); + /// We add all paths to the subcolumn tree and then create a type from it. /// The tree stores column, type and number of array dimensions /// for each intermediate node. diff --git a/src/DataTypes/Serializations/SubcolumnsTree.h b/src/DataTypes/Serializations/SubcolumnsTree.h index f66f557bc8f..fda45e1e9a2 100644 --- a/src/DataTypes/Serializations/SubcolumnsTree.h +++ b/src/DataTypes/Serializations/SubcolumnsTree.h @@ -51,6 +51,8 @@ public: using NodeKind = typename Node::Kind; using NodePtr = std::shared_ptr; + SubcolumnsTree() : root(std::make_shared(Node::TUPLE)) {} + /// Add a leaf without any data in other nodes. bool add(const PathInData & path, const NodeData & leaf_data) { @@ -73,13 +75,9 @@ public: bool add(const PathInData & path, const NodeCreator & node_creator) { const auto & parts = path.getParts(); - if (parts.empty()) return false; - if (!root) - root = std::make_shared(Node::TUPLE); - Node * current_node = root.get(); for (size_t i = 0; i < parts.size() - 1; ++i) { @@ -166,13 +164,13 @@ public: return node; } - bool empty() const { return root == nullptr; } + bool empty() const { return root->children.empty(); } size_t size() const { return leaves.size(); } using Nodes = std::vector; const Nodes & getLeaves() const { return leaves; } - const Node * getRoot() const { return root.get(); } + const Node & getRoot() const { return *root; } using iterator = typename Nodes::iterator; using const_iterator = typename Nodes::const_iterator; @@ -186,11 +184,11 @@ public: private: const Node * findImpl(const PathInData & path, bool find_exact) const { - if (!root) + if (empty()) return nullptr; const auto & parts = path.getParts(); - const Node * current_node = root.get(); + const auto * current_node = root.get(); for (const auto & part : parts) { diff --git a/src/Functions/CustomWeekTransforms.h b/src/Functions/CustomWeekTransforms.h index 3378aec02d5..5fa51d5f5e0 100644 --- a/src/Functions/CustomWeekTransforms.h +++ b/src/Functions/CustomWeekTransforms.h @@ -82,6 +82,14 @@ struct ToStartOfWeekImpl { return time_zone.toFirstDayNumOfWeek(DayNum(d), week_mode); } + static inline Int64 execute_extended_result(Int64 t, UInt8 week_mode, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t), week_mode); + } + static inline Int32 execute_extended_result(Int32 d, UInt8 week_mode, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d), week_mode); + } using FactorTransform = ZeroTransform; }; @@ -115,7 +123,7 @@ struct ToWeekImpl using FactorTransform = ToStartOfYearImpl; }; -template +template struct WeekTransformer { explicit WeekTransformer(Transform transform_) @@ -130,7 +138,10 @@ struct WeekTransformer vec_to.resize(size); for (size_t i = 0; i < size; ++i) - vec_to[i] = transform.execute(vec_from[i], week_mode, time_zone); + if constexpr (is_extended_result) + vec_to[i] = transform.execute_extended_result(vec_from[i], week_mode, time_zone); + else + vec_to[i] = transform.execute(vec_from[i], week_mode, time_zone); } private: @@ -138,13 +149,13 @@ private: }; -template +template struct CustomWeekTransformImpl { template static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/, Transform transform = {}) { - const auto op = WeekTransformer{std::move(transform)}; + const auto op = WeekTransformer{std::move(transform)}; UInt8 week_mode = DEFAULT_WEEK_MODE; if (arguments.size() > 1) diff --git a/src/Functions/DateTimeTransforms.h b/src/Functions/DateTimeTransforms.h index 66d57f2463f..fbe8e4bfcfe 100644 --- a/src/Functions/DateTimeTransforms.h +++ b/src/Functions/DateTimeTransforms.h @@ -161,7 +161,14 @@ struct ToMondayImpl { return time_zone.toFirstDayNumOfWeek(DayNum(d)); } - + static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t)); + } + static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d)); + } using FactorTransform = ZeroTransform; }; @@ -185,6 +192,14 @@ struct ToStartOfMonthImpl { return time_zone.toFirstDayNumOfMonth(DayNum(d)); } + static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfMonth(time_zone.toDayNum(t)); + } + static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfMonth(ExtendedDayNum(d)); + } using FactorTransform = ZeroTransform; }; @@ -218,7 +233,14 @@ struct ToLastDayOfMonthImpl /// 0xFFF9 is Int value for 2149-05-31 -- the last day where we can actually find LastDayOfMonth. This will also be the return value. return time_zone.toLastDayNumOfMonth(DayNum(std::min(d, UInt16(0xFFF9)))); } - + static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone) + { + return time_zone.toLastDayNumOfMonth(time_zone.toDayNum(t)); + } + static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone) + { + return time_zone.toLastDayNumOfMonth(ExtendedDayNum(d)); + } using FactorTransform = ZeroTransform; }; @@ -242,7 +264,14 @@ struct ToStartOfQuarterImpl { return time_zone.toFirstDayNumOfQuarter(DayNum(d)); } - + static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfQuarter(time_zone.toDayNum(t)); + } + static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfQuarter(ExtendedDayNum(d)); + } using FactorTransform = ZeroTransform; }; @@ -266,6 +295,14 @@ struct ToStartOfYearImpl { return time_zone.toFirstDayNumOfYear(DayNum(d)); } + static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfYear(time_zone.toDayNum(t)); + } + static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfYear(ExtendedDayNum(d)); + } using FactorTransform = ZeroTransform; }; @@ -893,7 +930,7 @@ struct ToStartOfISOYearImpl static inline UInt16 execute(Int64 t, const DateLUTImpl & time_zone) { - return time_zone.toFirstDayNumOfISOYear(time_zone.toDayNum(t)); + return t < 0 ? 0 : time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(std::min(Int32(time_zone.toDayNum(t)), Int32(DATE_LUT_MAX_DAY_NUM)))); } static inline UInt16 execute(UInt32 t, const DateLUTImpl & time_zone) { @@ -901,12 +938,20 @@ struct ToStartOfISOYearImpl } static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone) { - return time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(d)); + return d < 0 ? 0 : time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(std::min(d, Int32(DATE_LUT_MAX_DAY_NUM)))); } static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone) { return time_zone.toFirstDayNumOfISOYear(DayNum(d)); } + static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfISOYear(time_zone.toDayNum(t)); + } + static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone) + { + return time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(d)); + } using FactorTransform = ZeroTransform; }; @@ -1201,7 +1246,7 @@ struct ToYYYYMMDDhhmmssImpl }; -template +template struct Transformer { template @@ -1211,18 +1256,21 @@ struct Transformer vec_to.resize(size); for (size_t i = 0; i < size; ++i) - vec_to[i] = transform.execute(vec_from[i], time_zone); + if constexpr (is_extended_result) + vec_to[i] = transform.execute_extended_result(vec_from[i], time_zone); + else + vec_to[i] = transform.execute(vec_from[i], time_zone); } }; -template +template struct DateTimeTransformImpl { static ColumnPtr execute( const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/, const Transform & transform = {}) { - using Op = Transformer; + using Op = Transformer; const ColumnPtr source_col = arguments[0].column; if (const auto * sources = checkAndGetColumn(source_col.get())) diff --git a/src/Functions/FunctionCustomWeekToDateOrDate32.h b/src/Functions/FunctionCustomWeekToDateOrDate32.h new file mode 100644 index 00000000000..0b91fbb3bbe --- /dev/null +++ b/src/Functions/FunctionCustomWeekToDateOrDate32.h @@ -0,0 +1,78 @@ +#pragma once +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +template +class FunctionCustomWeekToDateOrDate32 : public IFunctionCustomWeek, WithContext +{ +public: + const bool enable_extended_results_for_datetime_functions = false; + + static FunctionPtr create(ContextPtr context_) + { + return std::make_shared(context_); + } + + explicit FunctionCustomWeekToDateOrDate32(ContextPtr context_) + : WithContext(context_) + , enable_extended_results_for_datetime_functions(context_->getSettingsRef().enable_extended_results_for_datetime_functions) + { + } + + DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override + { + this->checkArguments(arguments, /*is_result_type_date_or_date32*/ true); + + const IDataType * from_type = arguments[0].type.get(); + WhichDataType which(from_type); + if ((which.isDate32() || which.isDateTime64()) && enable_extended_results_for_datetime_functions) + return std::make_shared(); + else + return std::make_shared(); + } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + { + const IDataType * from_type = arguments[0].type.get(); + WhichDataType which(from_type); + + if (which.isDate()) + return CustomWeekTransformImpl::execute( + arguments, result_type, input_rows_count, Transform{}); + else if (which.isDate32()) + if (enable_extended_results_for_datetime_functions) + return CustomWeekTransformImpl::execute( + arguments, result_type, input_rows_count, Transform{}); + else + return CustomWeekTransformImpl::execute( + arguments, result_type, input_rows_count, Transform{}); + else if (which.isDateTime()) + return CustomWeekTransformImpl::execute( + arguments, result_type, input_rows_count, Transform{}); + else if (which.isDateTime64()) + { + if (enable_extended_results_for_datetime_functions) + return CustomWeekTransformImpl::execute( + arguments, result_type, input_rows_count, + TransformDateTime64{assert_cast(from_type)->getScale()}); + else + return CustomWeekTransformImpl::execute( + arguments, result_type, input_rows_count, + TransformDateTime64{assert_cast(from_type)->getScale()}); + } + else + throw Exception( + "Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + +}; + +} diff --git a/src/Functions/FunctionCustomWeekToSomething.h b/src/Functions/FunctionCustomWeekToSomething.h index 8a0f474a7e8..eb65d562221 100644 --- a/src/Functions/FunctionCustomWeekToSomething.h +++ b/src/Functions/FunctionCustomWeekToSomething.h @@ -1,14 +1,5 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include - +#include namespace DB { @@ -16,82 +7,23 @@ namespace DB namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } /// See CustomWeekTransforms.h template -class FunctionCustomWeekToSomething : public IFunction +class FunctionCustomWeekToSomething : public IFunctionCustomWeek { public: - static constexpr auto name = Transform::name; static FunctionPtr create(ContextPtr) { return std::make_shared(); } - String getName() const override { return name; } - - bool isVariadic() const override { return true; } - bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; } - size_t getNumberOfArguments() const override { return 0; } - DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override { - if (arguments.size() == 1) - { - if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) - throw Exception( - "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() - + ". Must be Date, Date32, DateTime or DateTime64.", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - } - else if (arguments.size() == 2) - { - if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) - throw Exception( - "Illegal type " + arguments[0].type->getName() + " of 1st argument of function " + getName() - + ". Must be Date, Date32, DateTime or DateTime64.", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - if (!isUInt8(arguments[1].type)) - throw Exception( - "Illegal type of 2nd (optional) argument of function " + getName() - + ". Must be constant UInt8 (week mode).", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - } - else if (arguments.size() == 3) - { - if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) - throw Exception( - "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() - + ". Must be Date, Date32, DateTime or DateTime64", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - if (!isUInt8(arguments[1].type)) - throw Exception( - "Illegal type of 2nd (optional) argument of function " + getName() - + ". Must be constant UInt8 (week mode).", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - if (!isString(arguments[2].type)) - throw Exception( - "Illegal type of 3rd (optional) argument of function " + getName() - + ". Must be constant string (timezone name).", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) - && (std::is_same_v || std::is_same_v)) - throw Exception( - "The timezone argument of function " + getName() + " is allowed only when the 1st argument is DateTime or DateTime64.", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - } - else - throw Exception( - "Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size()) - + ", expected 1, 2 or 3.", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + this->checkArguments(arguments); return std::make_shared(); } - bool useDefaultImplementationForConstants() const override { return true; } - ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override { const IDataType * from_type = arguments[0].type.get(); @@ -114,44 +46,10 @@ public: } else throw Exception( - "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } - - bool hasInformationAboutMonotonicity() const override { return true; } - - Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override - { - if constexpr (std::is_same_v) - return { .is_monotonic = true, .is_always_monotonic = true }; - - const IFunction::Monotonicity is_monotonic = { .is_monotonic = true }; - const IFunction::Monotonicity is_not_monotonic; - - /// This method is called only if the function has one argument. Therefore, we do not care about the non-local time zone. - const DateLUTImpl & date_lut = DateLUT::instance(); - - if (left.isNull() || right.isNull()) - return {}; - - /// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them. - - if (checkAndGetDataType(&type)) - { - return Transform::FactorTransform::execute(UInt16(left.get()), date_lut) - == Transform::FactorTransform::execute(UInt16(right.get()), date_lut) - ? is_monotonic - : is_not_monotonic; - } - else - { - return Transform::FactorTransform::execute(UInt32(left.get()), date_lut) - == Transform::FactorTransform::execute(UInt32(right.get()), date_lut) - ? is_monotonic - : is_not_monotonic; - } - } }; } diff --git a/src/Functions/FunctionDateOrDateTimeToDateOrDate32.h b/src/Functions/FunctionDateOrDateTimeToDateOrDate32.h new file mode 100644 index 00000000000..3ff90cb57fb --- /dev/null +++ b/src/Functions/FunctionDateOrDateTimeToDateOrDate32.h @@ -0,0 +1,81 @@ +#pragma once +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +template +class FunctionDateOrDateTimeToDateOrDate32 : public IFunctionDateOrDateTime, WithContext +{ +public: + const bool enable_extended_results_for_datetime_functions = false; + + static FunctionPtr create(ContextPtr context_) + { + return std::make_shared(context_); + } + + explicit FunctionDateOrDateTimeToDateOrDate32(ContextPtr context_) + : WithContext(context_) + , enable_extended_results_for_datetime_functions(context_->getSettingsRef().enable_extended_results_for_datetime_functions) + { + } + + DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override + { + this->checkArguments(arguments, /*is_result_type_date_or_date32*/ true); + + const IDataType * from_type = arguments[0].type.get(); + WhichDataType which(from_type); + + /// If the time zone is specified but empty, throw an exception. + /// only validate the time_zone part if the number of arguments is 2. + if ((which.isDateTime() || which.isDateTime64()) && arguments.size() == 2 + && extractTimeZoneNameFromFunctionArguments(arguments, 1, 0).empty()) + throw Exception( + "Function " + this->getName() + " supports a 2nd argument (optional) that must be non-empty and be a valid time zone", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + if ((which.isDate32() || which.isDateTime64()) && enable_extended_results_for_datetime_functions) + return std::make_shared(); + else + return std::make_shared(); + } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + { + const IDataType * from_type = arguments[0].type.get(); + WhichDataType which(from_type); + + if (which.isDate()) + return DateTimeTransformImpl::execute(arguments, result_type, input_rows_count); + else if (which.isDate32()) + if (enable_extended_results_for_datetime_functions) + return DateTimeTransformImpl::execute(arguments, result_type, input_rows_count); + else + return DateTimeTransformImpl::execute(arguments, result_type, input_rows_count); + else if (which.isDateTime()) + return DateTimeTransformImpl::execute(arguments, result_type, input_rows_count); + else if (which.isDateTime64()) + { + const auto scale = static_cast(from_type)->getScale(); + + const TransformDateTime64 transformer(scale); + if (enable_extended_results_for_datetime_functions) + return DateTimeTransformImpl::execute(arguments, result_type, input_rows_count, transformer); + else + return DateTimeTransformImpl::execute(arguments, result_type, input_rows_count, transformer); + } + else + throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + +}; + +} diff --git a/src/Functions/FunctionDateOrDateTimeToSomething.h b/src/Functions/FunctionDateOrDateTimeToSomething.h index d734c7f87c1..5c1c54c1b84 100644 --- a/src/Functions/FunctionDateOrDateTimeToSomething.h +++ b/src/Functions/FunctionDateOrDateTimeToSomething.h @@ -1,14 +1,5 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include - +#include namespace DB { @@ -16,59 +7,18 @@ namespace DB namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } /// See DateTimeTransforms.h template -class FunctionDateOrDateTimeToSomething : public IFunction +class FunctionDateOrDateTimeToSomething : public IFunctionDateOrDateTime { public: - static constexpr auto name = Transform::name; static FunctionPtr create(ContextPtr) { return std::make_shared(); } - String getName() const override - { - return name; - } - - bool isVariadic() const override { return true; } - bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } - size_t getNumberOfArguments() const override { return 0; } - DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override { - if (arguments.size() == 1) - { - if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) - throw Exception( - "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() - + ". Should be a date or a date with time", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - } - else if (arguments.size() == 2) - { - if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) - throw Exception( - "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() - + ". Should be a date or a date with time", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - if (!isString(arguments[1].type)) - throw Exception( - "Function " + getName() + " supports 1 or 2 arguments. The 1st argument " - "must be of type Date or DateTime. The 2nd argument (optional) must be " - "a constant string with timezone name", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) && (std::is_same_v || std::is_same_v)) - throw Exception( - "The timezone argument of function " + getName() + " is allowed only when the 1st argument has the type DateTime", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - } - else - throw Exception( - "Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size()) - + ", should be 1 or 2", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + this->checkArguments(arguments, (std::is_same_v || std::is_same_v)); /// For DateTime, if time zone is specified, attach it to type. /// If the time zone is specified but empty, throw an exception. @@ -79,7 +29,7 @@ public: /// to accommodate functions like toStartOfDay(today()), toStartOfDay(yesterday()) etc. if (arguments.size() == 2 && time_zone.empty()) throw Exception( - "Function " + getName() + " supports a 2nd argument (optional) that must be non-empty and be a valid time zone", + "Function " + this->getName() + " supports a 2nd argument (optional) that must be non-empty and be a valid time zone", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); return std::make_shared(time_zone); } @@ -109,9 +59,6 @@ public: return std::make_shared(); } - bool useDefaultImplementationForConstants() const override { return true; } - ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override { const IDataType * from_type = arguments[0].type.get(); @@ -131,51 +78,10 @@ public: return DateTimeTransformImpl::execute(arguments, result_type, input_rows_count, transformer); } else - throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + getName(), + throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } - bool hasInformationAboutMonotonicity() const override - { - return true; - } - - Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override - { - if constexpr (std::is_same_v) - return { .is_monotonic = true, .is_always_monotonic = true }; - - const IFunction::Monotonicity is_monotonic = { .is_monotonic = true }; - const IFunction::Monotonicity is_not_monotonic; - - const DateLUTImpl * date_lut = &DateLUT::instance(); - if (const auto * timezone = dynamic_cast(&type)) - date_lut = &timezone->getTimeZone(); - - if (left.isNull() || right.isNull()) - return is_not_monotonic; - - /// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them. - - if (checkAndGetDataType(&type)) - { - return Transform::FactorTransform::execute(UInt16(left.get()), *date_lut) - == Transform::FactorTransform::execute(UInt16(right.get()), *date_lut) - ? is_monotonic : is_not_monotonic; - } - else if (checkAndGetDataType(&type)) - { - return Transform::FactorTransform::execute(Int32(left.get()), *date_lut) - == Transform::FactorTransform::execute(Int32(right.get()), *date_lut) - ? is_monotonic : is_not_monotonic; - } - else - { - return Transform::FactorTransform::execute(UInt32(left.get()), *date_lut) - == Transform::FactorTransform::execute(UInt32(right.get()), *date_lut) - ? is_monotonic : is_not_monotonic; - } - } }; } diff --git a/src/Functions/FunctionsEmbeddedDictionaries.h b/src/Functions/FunctionsEmbeddedDictionaries.h index 20be3ee3cce..aa2144d271f 100644 --- a/src/Functions/FunctionsEmbeddedDictionaries.h +++ b/src/Functions/FunctionsEmbeddedDictionaries.h @@ -649,7 +649,7 @@ public: for (unsigned int region_id : region_ids) { const StringRef & name_ref = dict.getRegionName(region_id, language); - col_to->insertDataWithTerminatingZero(name_ref.data, name_ref.size + 1); + col_to->insertData(name_ref.data, name_ref.size); } return col_to; diff --git a/src/Functions/IFunctionCustomWeek.h b/src/Functions/IFunctionCustomWeek.h new file mode 100644 index 00000000000..1bc4e44655a --- /dev/null +++ b/src/Functions/IFunctionCustomWeek.h @@ -0,0 +1,122 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +template +class IFunctionCustomWeek : public IFunction +{ +public: + static constexpr auto name = Transform::name; + String getName() const override { return name; } + bool isVariadic() const override { return true; } + bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; } + size_t getNumberOfArguments() const override { return 0; } + bool useDefaultImplementationForConstants() const override { return true; } + ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; } + + bool hasInformationAboutMonotonicity() const override { return true; } + + Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override + { + if constexpr (std::is_same_v) + return {.is_monotonic = true, .is_always_monotonic = true}; + + const IFunction::Monotonicity is_monotonic = {.is_monotonic = true}; + const IFunction::Monotonicity is_not_monotonic; + + /// This method is called only if the function has one argument. Therefore, we do not care about the non-local time zone. + const DateLUTImpl & date_lut = DateLUT::instance(); + + if (left.isNull() || right.isNull()) + return {}; + + /// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them. + + if (checkAndGetDataType(&type)) + { + return Transform::FactorTransform::execute(UInt16(left.get()), date_lut) + == Transform::FactorTransform::execute(UInt16(right.get()), date_lut) + ? is_monotonic + : is_not_monotonic; + } + else + { + return Transform::FactorTransform::execute(UInt32(left.get()), date_lut) + == Transform::FactorTransform::execute(UInt32(right.get()), date_lut) + ? is_monotonic + : is_not_monotonic; + } + } + +protected: + void checkArguments(const ColumnsWithTypeAndName & arguments, bool is_result_type_date_or_date32 = false) const + { + if (arguments.size() == 1) + { + if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) + throw Exception( + "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() + + ". Must be Date, Date32, DateTime or DateTime64.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + else if (arguments.size() == 2) + { + if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) + throw Exception( + "Illegal type " + arguments[0].type->getName() + " of 1st argument of function " + getName() + + ". Must be Date, Date32, DateTime or DateTime64.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + if (!isUInt8(arguments[1].type)) + throw Exception( + "Illegal type of 2nd (optional) argument of function " + getName() + + ". Must be constant UInt8 (week mode).", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + else if (arguments.size() == 3) + { + if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) + throw Exception( + "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() + + ". Must be Date, Date32, DateTime or DateTime64", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + if (!isUInt8(arguments[1].type)) + throw Exception( + "Illegal type of 2nd (optional) argument of function " + getName() + + ". Must be constant UInt8 (week mode).", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + if (!isString(arguments[2].type)) + throw Exception( + "Illegal type of 3rd (optional) argument of function " + getName() + + ". Must be constant string (timezone name).", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) && is_result_type_date_or_date32) + throw Exception( + "The timezone argument of function " + getName() + " is allowed only when the 1st argument is DateTime or DateTime64.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + else + throw Exception( + "Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size()) + + ", expected 1, 2 or 3.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + } + +}; + +} diff --git a/src/Functions/IFunctionDateOrDateTime.h b/src/Functions/IFunctionDateOrDateTime.h new file mode 100644 index 00000000000..1efe89c7fe9 --- /dev/null +++ b/src/Functions/IFunctionDateOrDateTime.h @@ -0,0 +1,118 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +template +class IFunctionDateOrDateTime : public IFunction +{ +public: + static constexpr auto name = Transform::name; + String getName() const override { return name; } + + bool isVariadic() const override { return true; } + + bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } + + size_t getNumberOfArguments() const override { return 0; } + + bool useDefaultImplementationForConstants() const override { return true; } + + ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; } + + bool hasInformationAboutMonotonicity() const override + { + return true; + } + + Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override + { + if constexpr (std::is_same_v) + return { .is_monotonic = true, .is_always_monotonic = true }; + + const IFunction::Monotonicity is_monotonic = { .is_monotonic = true }; + const IFunction::Monotonicity is_not_monotonic; + + const DateLUTImpl * date_lut = &DateLUT::instance(); + if (const auto * timezone = dynamic_cast(&type)) + date_lut = &timezone->getTimeZone(); + + if (left.isNull() || right.isNull()) + return is_not_monotonic; + + /// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them. + + if (checkAndGetDataType(&type)) + { + return Transform::FactorTransform::execute(UInt16(left.get()), *date_lut) + == Transform::FactorTransform::execute(UInt16(right.get()), *date_lut) + ? is_monotonic : is_not_monotonic; + } + else if (checkAndGetDataType(&type)) + { + return Transform::FactorTransform::execute(Int32(left.get()), *date_lut) + == Transform::FactorTransform::execute(Int32(right.get()), *date_lut) + ? is_monotonic : is_not_monotonic; + } + else + { + return Transform::FactorTransform::execute(UInt32(left.get()), *date_lut) + == Transform::FactorTransform::execute(UInt32(right.get()), *date_lut) + ? is_monotonic : is_not_monotonic; + } + } + +protected: + void checkArguments(const ColumnsWithTypeAndName & arguments, bool is_result_type_date_or_date32) const + { + if (arguments.size() == 1) + { + if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) + throw Exception( + "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() + + ". Should be Date, Date32, DateTime or DateTime64", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + else if (arguments.size() == 2) + { + if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type)) + throw Exception( + "Illegal type " + arguments[0].type->getName() + " of argument of function " + getName() + + ". Should be Date, Date32, DateTime or DateTime64", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + if (!isString(arguments[1].type)) + throw Exception( + "Function " + getName() + " supports 1 or 2 arguments. The optional 2nd argument must be " + "a constant string with a timezone name", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) && is_result_type_date_or_date32) + throw Exception( + "The timezone argument of function " + getName() + " is allowed only when the 1st argument has the type DateTime or DateTime64", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + else + throw Exception( + "Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size()) + + ", should be 1 or 2", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + } +}; + +} diff --git a/src/Functions/TransformDateTime64.h b/src/Functions/TransformDateTime64.h index 9ac28118b8f..fbe7e2e8250 100644 --- a/src/Functions/TransformDateTime64.h +++ b/src/Functions/TransformDateTime64.h @@ -87,6 +87,46 @@ public: return wrapped_transform.execute(t, std::forward(args)...); } + + template + inline auto NO_SANITIZE_UNDEFINED execute_extended_result(const DateTime64 & t, Args && ... args) const + { + /// Type conversion from float to integer may be required. + /// We are Ok with implementation specific result for out of range and denormals conversion. + + if constexpr (TransformHasExecuteOverload_v) + { + return wrapped_transform.execute_extended_result(t, scale_multiplier, std::forward(args)...); + } + else if constexpr (TransformHasExecuteOverload_v, Args...>) + { + auto components = DecimalUtils::splitWithScaleMultiplier(t, scale_multiplier); + + const auto result = wrapped_transform.execute_extended_result(components, std::forward(args)...); + using ResultType = std::decay_t; + + if constexpr (std::is_same_v, ResultType>) + { + return DecimalUtils::decimalFromComponentsWithMultiplier(result, scale_multiplier); + } + else + { + return result; + } + } + else + { + const auto components = DecimalUtils::splitWithScaleMultiplier(t, scale_multiplier); + return wrapped_transform.execute_extended_result(static_cast(components.whole), std::forward(args)...); + } + } + + template >> + inline auto execute_extended_result(const T & t, Args && ... args) const + { + return wrapped_transform.execute_extended_result(t, std::forward(args)...); + } + private: DateTime64::NativeType scale_multiplier = 1; Transform wrapped_transform = {}; diff --git a/src/Functions/addressToSymbol.cpp b/src/Functions/addressToSymbol.cpp index 99988ee82f6..dd9efd6cc44 100644 --- a/src/Functions/addressToSymbol.cpp +++ b/src/Functions/addressToSymbol.cpp @@ -83,7 +83,7 @@ public: for (size_t i = 0; i < input_rows_count; ++i) { if (const auto * symbol = symbol_index.findSymbol(reinterpret_cast(data[i]))) - result_column->insertDataWithTerminatingZero(symbol->name, strlen(symbol->name) + 1); + result_column->insertData(symbol->name, strlen(symbol->name)); else result_column->insertDefault(); } diff --git a/src/Functions/demange.cpp b/src/Functions/demange.cpp index 9026790f740..a7c3d8e52bf 100644 --- a/src/Functions/demange.cpp +++ b/src/Functions/demange.cpp @@ -78,15 +78,15 @@ public: for (size_t i = 0; i < input_rows_count; ++i) { - StringRef source = column_concrete->getDataAtWithTerminatingZero(i); + StringRef source = column_concrete->getDataAt(i); auto demangled = tryDemangle(source.data); if (demangled) { - result_column->insertDataWithTerminatingZero(demangled.get(), strlen(demangled.get()) + 1); + result_column->insertData(demangled.get(), strlen(demangled.get())); } else { - result_column->insertDataWithTerminatingZero(source.data, source.size); + result_column->insertData(source.data, source.size); } } @@ -102,4 +102,3 @@ REGISTER_FUNCTION(Demangle) } } - diff --git a/src/Functions/toCustomWeek.cpp b/src/Functions/toCustomWeek.cpp index 13dc76b6389..b773cc7df96 100644 --- a/src/Functions/toCustomWeek.cpp +++ b/src/Functions/toCustomWeek.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -9,7 +10,7 @@ namespace DB { using FunctionToWeek = FunctionCustomWeekToSomething; using FunctionToYearWeek = FunctionCustomWeekToSomething; -using FunctionToStartOfWeek = FunctionCustomWeekToSomething; +using FunctionToStartOfWeek = FunctionCustomWeekToDateOrDate32; REGISTER_FUNCTION(ToCustomWeek) { diff --git a/src/Functions/toLastDayOfMonth.cpp b/src/Functions/toLastDayOfMonth.cpp index a7faab15f9f..9365880bfb8 100644 --- a/src/Functions/toLastDayOfMonth.cpp +++ b/src/Functions/toLastDayOfMonth.cpp @@ -1,12 +1,12 @@ #include #include -#include +#include namespace DB { -using FunctionToLastDayOfMonth = FunctionDateOrDateTimeToSomething; +using FunctionToLastDayOfMonth = FunctionDateOrDateTimeToDateOrDate32; REGISTER_FUNCTION(ToLastDayOfMonth) { diff --git a/src/Functions/toMonday.cpp b/src/Functions/toMonday.cpp index 89145634e45..280c8a93b1a 100644 --- a/src/Functions/toMonday.cpp +++ b/src/Functions/toMonday.cpp @@ -1,12 +1,12 @@ #include #include -#include +#include namespace DB { -using FunctionToMonday = FunctionDateOrDateTimeToSomething; +using FunctionToMonday = FunctionDateOrDateTimeToDateOrDate32; REGISTER_FUNCTION(ToMonday) { diff --git a/src/Functions/toStartOfISOYear.cpp b/src/Functions/toStartOfISOYear.cpp index 366ba8dd09f..245d043f0b6 100644 --- a/src/Functions/toStartOfISOYear.cpp +++ b/src/Functions/toStartOfISOYear.cpp @@ -1,12 +1,12 @@ #include #include -#include +#include namespace DB { -using FunctionToStartOfISOYear = FunctionDateOrDateTimeToSomething; +using FunctionToStartOfISOYear = FunctionDateOrDateTimeToDateOrDate32; REGISTER_FUNCTION(ToStartOfISOYear) { diff --git a/src/Functions/toStartOfMonth.cpp b/src/Functions/toStartOfMonth.cpp index 9674462097b..00146e25d44 100644 --- a/src/Functions/toStartOfMonth.cpp +++ b/src/Functions/toStartOfMonth.cpp @@ -1,12 +1,12 @@ #include #include -#include +#include namespace DB { -using FunctionToStartOfMonth = FunctionDateOrDateTimeToSomething; +using FunctionToStartOfMonth = FunctionDateOrDateTimeToDateOrDate32; REGISTER_FUNCTION(ToStartOfMonth) { diff --git a/src/Functions/toStartOfQuarter.cpp b/src/Functions/toStartOfQuarter.cpp index c7d69743198..74966d51584 100644 --- a/src/Functions/toStartOfQuarter.cpp +++ b/src/Functions/toStartOfQuarter.cpp @@ -1,12 +1,12 @@ #include #include -#include +#include namespace DB { -using FunctionToStartOfQuarter = FunctionDateOrDateTimeToSomething; +using FunctionToStartOfQuarter = FunctionDateOrDateTimeToDateOrDate32; REGISTER_FUNCTION(ToStartOfQuarter) { diff --git a/src/Functions/toStartOfYear.cpp b/src/Functions/toStartOfYear.cpp index 13729f2f812..27019bfd69f 100644 --- a/src/Functions/toStartOfYear.cpp +++ b/src/Functions/toStartOfYear.cpp @@ -1,12 +1,12 @@ #include #include -#include +#include namespace DB { -using FunctionToStartOfYear = FunctionDateOrDateTimeToSomething; +using FunctionToStartOfYear = FunctionDateOrDateTimeToDateOrDate32; REGISTER_FUNCTION(ToStartOfYear) { diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 380365f9b95..5efe9c61a35 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -131,18 +131,18 @@ bool ReadBufferFromS3::nextImpl() ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds()); break; } - catch (const Exception & e) + catch (Exception & e) { watch.stop(); ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::ReadBufferFromS3RequestsErrors, 1); - if (const auto * s3_exception = dynamic_cast(&e)) + if (auto * s3_exception = dynamic_cast(&e)) { /// It doesn't make sense to retry Access Denied or No Such Key if (!s3_exception->isRetryableError()) { - tryLogCurrentException(log, fmt::format("while reading key: {}, from bucket: {}", key, bucket)); + s3_exception->addMessage("while reading key: {}, from bucket: {}", key, bucket); throw; } } diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h index da7ecf95b78..ddc23b7aa8a 100644 --- a/src/IO/S3Common.h +++ b/src/IO/S3Common.h @@ -55,7 +55,7 @@ public: bool isRetryableError() const; private: - const Aws::S3::S3Errors code; + Aws::S3::S3Errors code; }; } diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 88b564f1dcf..cad2200c5ec 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -457,17 +457,20 @@ try format->addBuffer(std::move(last_buffer)); - auto chunk = Chunk(executor.getResultColumns(), total_rows); - size_t total_bytes = chunk.bytes(); + if (total_rows) + { + auto chunk = Chunk(executor.getResultColumns(), total_rows); + size_t total_bytes = chunk.bytes(); - auto source = std::make_shared(header, std::move(chunk)); - pipeline.complete(Pipe(std::move(source))); + auto source = std::make_shared(header, std::move(chunk)); + pipeline.complete(Pipe(std::move(source))); - CompletedPipelineExecutor completed_executor(pipeline); - completed_executor.execute(); + CompletedPipelineExecutor completed_executor(pipeline); + completed_executor.execute(); - LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", - total_rows, total_bytes, queryToString(key.query)); + LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", + total_rows, total_bytes, queryToString(key.query)); + } for (const auto & entry : data->entries) if (!entry->isFinished()) diff --git a/src/Interpreters/TreeOptimizer.cpp b/src/Interpreters/TreeOptimizer.cpp index 3f7e141db3e..74f084df40b 100644 --- a/src/Interpreters/TreeOptimizer.cpp +++ b/src/Interpreters/TreeOptimizer.cpp @@ -418,7 +418,7 @@ void optimizeDuplicateDistinct(ASTSelectQuery & select) return; std::unordered_set distinct_names = getDistinctNames(*subselect); - std::unordered_set selected_names; + std::unordered_set selected_names; /// Check source column names from select list (ignore aliases and table names) for (const auto & id : select.select()->children) @@ -427,11 +427,11 @@ void optimizeDuplicateDistinct(ASTSelectQuery & select) if (!identifier) return; - String name = identifier->shortName(); + const String & name = identifier->shortName(); if (!distinct_names.contains(name)) return; /// Not a distinct column, keep DISTINCT for it. - selected_names.insert(name); + selected_names.emplace(name); } /// select columns list != distinct columns list diff --git a/src/Processors/Transforms/MongoDBSource.cpp b/src/Processors/Transforms/MongoDBSource.cpp index 19d21f3409e..19289f3f818 100644 --- a/src/Processors/Transforms/MongoDBSource.cpp +++ b/src/Processors/Transforms/MongoDBSource.cpp @@ -250,13 +250,13 @@ namespace if (value.type() == Poco::MongoDB::ElementTraits::TypeId) { std::string string_id = value.toString(); - assert_cast(column).insertDataWithTerminatingZero(string_id.data(), string_id.size() + 1); + assert_cast(column).insertData(string_id.data(), string_id.size()); break; } else if (value.type() == Poco::MongoDB::ElementTraits::TypeId) { String string = static_cast &>(value).value(); - assert_cast(column).insertDataWithTerminatingZero(string.data(), string.size() + 1); + assert_cast(column).insertData(string.data(), string.size()); break; } diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp index 894eec12f0c..bb9aab3c9b6 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp +++ b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp @@ -204,13 +204,12 @@ DataPartStorageBuilderPtr DataPartStorageOnDisk::getBuilder() const } void DataPartStorageOnDisk::remove( - bool can_remove_shared_data, - const NameSet & names_not_to_remove, + CanRemoveCallback && can_remove_callback, const MergeTreeDataPartChecksums & checksums, std::list projections, bool is_temp, MergeTreeDataPartState state, - Poco::Logger * log) const + Poco::Logger * log) { /// NOTE We rename part to delete_tmp_ instead of delete_tmp_ to avoid race condition /// when we try to remove two parts with the same name, but different relative paths, @@ -239,13 +238,16 @@ void DataPartStorageOnDisk::remove( fs::path to = fs::path(root_path) / part_dir_without_slash; + std::optional can_remove_description; + auto disk = volume->getDisk(); if (disk->exists(to)) { LOG_WARNING(log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to)); try { - disk->removeSharedRecursive(fs::path(to) / "", !can_remove_shared_data, names_not_to_remove); + can_remove_description.emplace(can_remove_callback()); + disk->removeSharedRecursive(fs::path(to) / "", !can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove); } catch (...) { @@ -257,6 +259,7 @@ void DataPartStorageOnDisk::remove( try { disk->moveDirectory(from, to); + onRename(root_path, part_dir_without_slash); } catch (const fs::filesystem_error & e) { @@ -268,6 +271,9 @@ void DataPartStorageOnDisk::remove( throw; } + if (!can_remove_description) + can_remove_description.emplace(can_remove_callback()); + // Record existing projection directories so we don't remove them twice std::unordered_set projection_directories; std::string proj_suffix = ".proj"; @@ -278,7 +284,7 @@ void DataPartStorageOnDisk::remove( clearDirectory( fs::path(to) / proj_dir_name, - can_remove_shared_data, names_not_to_remove, projection.checksums, {}, is_temp, state, log, true); + can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, projection.checksums, {}, is_temp, state, log, true); } /// It is possible that we are removing the part which have a written but not loaded projection. @@ -305,7 +311,7 @@ void DataPartStorageOnDisk::remove( clearDirectory( fs::path(to) / name, - can_remove_shared_data, names_not_to_remove, tmp_checksums, {}, is_temp, state, log, true); + can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, tmp_checksums, {}, is_temp, state, log, true); } catch (...) { @@ -315,7 +321,7 @@ void DataPartStorageOnDisk::remove( } } - clearDirectory(to, can_remove_shared_data, names_not_to_remove, checksums, projection_directories, is_temp, state, log, false); + clearDirectory(to, can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, checksums, projection_directories, is_temp, state, log, false); } void DataPartStorageOnDisk::clearDirectory( diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.h b/src/Storages/MergeTree/DataPartStorageOnDisk.h index f02ef26f811..51b557767d4 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDisk.h +++ b/src/Storages/MergeTree/DataPartStorageOnDisk.h @@ -45,13 +45,12 @@ public: void checkConsistency(const MergeTreeDataPartChecksums & checksums) const override; void remove( - bool can_remove_shared_data, - const NameSet & names_not_to_remove, + CanRemoveCallback && can_remove_callback, const MergeTreeDataPartChecksums & checksums, std::list projections, bool is_temp, MergeTreeDataPartState state, - Poco::Logger * log) const override; + Poco::Logger * log) override; std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const override; diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index 9da8a5eae03..bd449d46075 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -12,6 +12,13 @@ namespace DB class ReadBufferFromFileBase; class WriteBufferFromFileBase; +struct CanRemoveDescription +{ + bool can_remove_anything; + NameSet files_not_to_remove; + +}; +using CanRemoveCallback = std::function; class IDataPartStorageIterator { @@ -113,13 +120,12 @@ public: /// can_remove_shared_data, names_not_to_remove are specific for DiskObjectStorage. /// projections, checksums are needed to avoid recursive listing virtual void remove( - bool can_remove_shared_data, - const NameSet & names_not_to_remove, + CanRemoveCallback && can_remove_callback, const MergeTreeDataPartChecksums & checksums, std::list projections, bool is_temp, MergeTreeDataPartState state, - Poco::Logger * log) const = 0; + Poco::Logger * log) = 0; /// Get a name like 'prefix_partdir_tryN' which does not exist in a root dir. /// TODO: remove it. diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 93c8516fa7b..217f437a4ff 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1433,13 +1433,18 @@ void IMergeTreeDataPart::remove() const assert(assertHasValidVersionMetadata()); part_is_probably_removed_from_disk = true; - auto [can_remove, files_not_to_remove] = canRemovePart(); + auto can_remove_callback = [this] () + { + auto [can_remove, files_not_to_remove] = canRemovePart(); + if (!can_remove) + LOG_TRACE(storage.log, "Blobs of part {} cannot be removed", name); - if (!can_remove) - LOG_TRACE(storage.log, "Blobs of part {} cannot be removed", name); + if (!files_not_to_remove.empty()) + LOG_TRACE(storage.log, "Some blobs ({}) of part {} cannot be removed", fmt::join(files_not_to_remove, ", "), name); + + return CanRemoveDescription{.can_remove_anything = can_remove, .files_not_to_remove = files_not_to_remove }; + }; - if (!files_not_to_remove.empty()) - LOG_TRACE(storage.log, "Some blobs ({}) of part {} cannot be removed", fmt::join(files_not_to_remove, ", "), name); if (!isStoredOnDisk()) return; @@ -1459,7 +1464,7 @@ void IMergeTreeDataPart::remove() const projection_checksums.emplace_back(IDataPartStorage::ProjectionChecksums{.name = p_name, .checksums = projection_part->checksums}); } - data_part_storage->remove(can_remove, files_not_to_remove, checksums, projection_checksums, is_temp, getState(), storage.log); + data_part_storage->remove(std::move(can_remove_callback), checksums, projection_checksums, is_temp, getState(), storage.log); } String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached) const diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 20323d87bed..7c7bab333db 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -1635,11 +1636,10 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif /// We don't control the amount of refs for temporary parts so we cannot decide can we remove blobs /// or not. So we are not doing it bool keep_shared = false; - if (it->path().find("fetch") != std::string::npos) + if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication) { - keep_shared = disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication; - if (keep_shared) - LOG_WARNING(log, "Since zero-copy replication is enabled we are not going to remove blobs from shared storage for {}", full_path); + LOG_WARNING(log, "Since zero-copy replication is enabled we are not going to remove blobs from shared storage for {}", full_path); + keep_shared = true; } disk->removeSharedRecursive(it->path(), keep_shared, {}); @@ -2135,6 +2135,7 @@ void MergeTreeData::renameInMemory(const StorageID & new_table_id) void MergeTreeData::dropAllData() { LOG_TRACE(log, "dropAllData: waiting for locks."); + auto settings_ptr = getSettings(); auto lock = lockParts(); diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index c41b422199d..f2835ab4dbf 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -117,10 +117,10 @@ struct InputOrderInfo * sort_description_for_merging will be equal to (c, d) and * used_prefix_of_sorting_key_size will be equal to 4. */ - size_t used_prefix_of_sorting_key_size; + const size_t used_prefix_of_sorting_key_size; - int direction; - UInt64 limit; + const int direction; + const UInt64 limit; InputOrderInfo( const SortDescription & sort_description_for_merging_, diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d704721abcc..295f201191a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7604,7 +7604,7 @@ std::pair StorageReplicatedMergeTree::unlockSharedData(const IMer } else { - LOG_TRACE(log, "Part {} looks temporary, because checksums file doesn't exists, blobs can be removed", part.name); + LOG_TRACE(log, "Part {} looks temporary, because {} file doesn't exists, blobs can be removed", part.name, IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK); /// Temporary part with some absent file cannot be locked in shared mode return std::make_pair(true, NameSet{}); } diff --git a/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml b/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml index 677de5f6769..2ab747fbd71 100644 --- a/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml +++ b/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml @@ -21,8 +21,7 @@ 1000 2000 4000 - 0 - fastlinear + {quorum_reads} {snapshot_distance} {stale_log_gap} {reserved_log_items} diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj index e6e94371501..60b29bd799a 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj @@ -27,12 +27,7 @@ (invoke! [this test op] (case (:f op) - :read (try - (assoc op - :type :ok - :value (count (zk-list conn root-path))) - (catch Exception _ (assoc op :type :info, :error :connect-error))) - :final-read (exec-with-retries 30 (fn [] + :read (exec-with-retries 30 (fn [] (assoc op :type :ok :value (count (zk-list conn root-path))))) @@ -54,5 +49,7 @@ :checker (checker/compose {:counter (checker/counter) :perf (checker/perf)}) - :generator (gen/mix [r add]) - :final-generator (gen/once {:type :invoke, :f :final-read, :value nil})}) + :generator (->> (range) + (map (fn [x] + (->> (gen/mix [r add]))))) + :final-generator (gen/once {:type :invoke, :f :read, :value nil})}) diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj index 9e85b37dd75..c354e36e430 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj @@ -98,6 +98,7 @@ #"\{srv2\}" (get nodes 1) #"\{srv3\}" (get nodes 2) #"\{id\}" (str (inc (.indexOf nodes node))) + #"\{quorum_reads\}" (str (boolean (:quorum test))) #"\{snapshot_distance\}" (str (:snapshot-distance test)) #"\{stale_log_gap\}" (str (:stale-log-gap test)) #"\{reserved_log_items\}" (str (:reserved-log-items test))}] diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj index 1919c8ce3ec..cd1aa540e45 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj @@ -103,7 +103,7 @@ current-nemesis (get custom-nemesis/custom-nemesises (:nemesis opts))] (merge tests/noop-test opts - {:name (str "clickhouse-keeper-" (name (:workload opts)) "-" (name (:nemesis opts))) + {:name (str "clickhouse-keeper-quorum=" quorum "-" (name (:workload opts)) "-" (name (:nemesis opts))) :os ubuntu/os :db (get-db opts) :pure-generators true diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj index 228cb3f46ef..a1605192b51 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj @@ -20,8 +20,7 @@ (assoc this :conn (zk-connect node 9181 30000))) (setup! [this test] - (exec-with-retries 30 (fn [] - (zk-create-range conn 300)))) + (zk-create-range conn 300)) ; 300 nodes to be sure (invoke! [_ test op] (let [[k v] (:value op) diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj index cdb25ba0a2d..3625b24b4f9 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj @@ -45,7 +45,7 @@ (defn zk-connect [host port timeout] - (zk/connect (str host ":" port) :timeout-msec timeout)) + (exec-with-retries 30 (fn [] (zk/connect (str host ":" port) :timeout-msec timeout)))) (defn zk-create-range [conn n] diff --git a/tests/queries/0_stateless/01921_datatype_date32.reference b/tests/queries/0_stateless/01921_datatype_date32.reference index a33a96ffffb..b5bf4e06a4c 100644 --- a/tests/queries/0_stateless/01921_datatype_date32.reference +++ b/tests/queries/0_stateless/01921_datatype_date32.reference @@ -109,10 +109,10 @@ -------toStartOfFifteenMinutes--------- -------toStartOfHour--------- -------toStartOfISOYear--------- -2079-06-07 -2079-06-07 -2119-07-29 -2119-07-29 +1970-01-01 +1970-01-01 +2148-12-30 +2148-12-30 2021-01-04 -------toRelativeYearNum--------- 1900 diff --git a/tests/queries/0_stateless/02403_enable_extended_results_for_datetime_functions.reference b/tests/queries/0_stateless/02403_enable_extended_results_for_datetime_functions.reference new file mode 100644 index 00000000000..aa950215f59 --- /dev/null +++ b/tests/queries/0_stateless/02403_enable_extended_results_for_datetime_functions.reference @@ -0,0 +1,56 @@ +toStartOfYear;toDate32;true 1920-01-01 +type;toStartOfYear;toDate32;true Date32 +toStartOfYear;toDateTime64;true 1920-01-01 +type;toStartOfYear;toDateTime64;true Date32 +toStartOfISOYear;toDate32;true 1919-12-29 +type;toStartOfISOYear;toDate32;true Date32 +toStartOfISOYear;toDateTime64;true 1919-12-29 +type;toStartOfISOYear;toDateTime64;true Date32 +toStartOfQuarter;toDate32;true 1920-01-01 +type;toStartOfQuarter;toDate32;true Date32 +toStartOfQuarter;toDateTime64;true 1920-01-01 +type;toStartOfQuarter;toDateTime64;true Date32 +toStartOfMonth;toDate32;true 1920-02-01 +type;toStartOfMonth;toDate32;true Date32 +toStartOfMonth;toDateTime64;true 1920-02-01 +type;toStartOfMonth;toDateTime64;true Date32 +toStartOfWeek;toDate32;true 1920-02-01 +type;toStartOfWeek;toDate32;true Date32 +toStartOfWeek;toDateTime64;true 1920-02-01 +type;toStartOfWeek;toDateTime64;true Date32 +toMonday;toDate32;true 1920-02-02 +type;toMonday;toDate32;true Date32 +toMonday;toDateTime64;true 1920-02-02 +type;toMonday;toDateTime64;true Date32 +toLastDayOfMonth;toDate32;true 1920-02-29 +type;toLastDayOfMonth;toDate32;true Date32 +toLastDayOfMonth;toDateTime64;true 1920-02-29 +type;toLastDayOfMonth;toDateTime64;true Date32 +toStartOfYear;toDate32;false 1970-01-01 +type;toStartOfYear;toDate32;false Date +toStartOfYear;toDateTime64;false 1970-01-01 +type;toStartOfYear;toDateTime64;false Date +toStartOfISOYear;toDate32;false 1970-01-01 +type;toStartOfISOYear;toDate32;false Date +toStartOfISOYear;toDateTime64;false 1970-01-01 +type;toStartOfISOYear;toDateTime64;false Date +toStartOfQuarter;toDate32;false 1970-01-01 +type;toStartOfQuarter;toDate32;false Date +toStartOfQuarter;toDateTime64;false 1970-01-01 +type;toStartOfQuarter;toDateTime64;false Date +toStartOfMonth;toDate32;false 1970-01-01 +type;toStartOfMonth;toDate32;false Date +toStartOfMonth;toDateTime64;false 1970-01-01 +type;toStartOfMonth;toDateTime64;false Date +toStartOfWeek;toDate32;false 1970-01-01 +type;toStartOfWeek;toDate32;false Date +toStartOfWeek;toDateTime64;false 1970-01-01 +type;toStartOfWeek;toDateTime64;false Date +toMonday;toDate32;false 1970-01-01 +type;toMonday;toDate32;false Date +toMonday;toDateTime64;false 1970-01-01 +type;toMonday;toDateTime64;false Date +toLastDayOfMonth;toDate32;false 1970-01-01 +type;toLastDayOfMonth;toDate32;false Date +toLastDayOfMonth;toDateTime64;false 1970-01-01 +type;toLastDayOfMonth;toDateTime64;false Date diff --git a/tests/queries/0_stateless/02403_enable_extended_results_for_datetime_functions.sql.j2 b/tests/queries/0_stateless/02403_enable_extended_results_for_datetime_functions.sql.j2 new file mode 100644 index 00000000000..70c07c7792a --- /dev/null +++ b/tests/queries/0_stateless/02403_enable_extended_results_for_datetime_functions.sql.j2 @@ -0,0 +1,9 @@ +{% for option_value in ['true', 'false'] -%} +{% for date_fun in ['toStartOfYear', 'toStartOfISOYear', 'toStartOfQuarter', 'toStartOfMonth', 'toStartOfWeek', 'toMonday', 'toLastDayOfMonth'] -%} +SELECT '{{ date_fun }};toDate32;{{ option_value }}', {{ date_fun }}(toDate32('1920-02-02')) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }}; +SELECT 'type;{{ date_fun }};toDate32;{{ option_value }}', toTypeName({{ date_fun }}(toDate32('1920-02-02'))) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }}; +SELECT '{{ date_fun }};toDateTime64;{{ option_value }}', {{ date_fun }}(toDateTime64('1920-02-02 10:20:30', 3)) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }}; +SELECT 'type;{{ date_fun }};toDateTime64;{{ option_value }}', toTypeName({{ date_fun }}(toDateTime64('1920-02-02 10:20:30', 3))) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }}; +{% endfor -%} +{% endfor -%} + diff --git a/tests/queries/0_stateless/02421_type_json_async_insert.reference b/tests/queries/0_stateless/02421_type_json_async_insert.reference new file mode 100644 index 00000000000..f3d96ebf2d0 --- /dev/null +++ b/tests/queries/0_stateless/02421_type_json_async_insert.reference @@ -0,0 +1,5 @@ +Cannot parse object +0 +0 +Cannot parse object +aaa diff --git a/tests/queries/0_stateless/02421_type_json_async_insert.sh b/tests/queries/0_stateless/02421_type_json_async_insert.sh new file mode 100755 index 00000000000..8aa0d510dbb --- /dev/null +++ b/tests/queries/0_stateless/02421_type_json_async_insert.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_json_async_insert" +$CLICKHOUSE_CLIENT --allow_experimental_object_type=1 -q "CREATE TABLE t_json_async_insert (data JSON) ENGINE = MergeTree ORDER BY tuple()" + +$CLICKHOUSE_CLIENT --async_insert=1 --wait_for_async_insert=1 -q 'INSERT INTO t_json_async_insert FORMAT JSONAsObject {"aaa"}' 2>&1 | grep -o -m1 "Cannot parse object" +$CLICKHOUSE_CLIENT -q "SELECT count() FROM t_json_async_insert" +$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.parts WHERE database = '$CLICKHOUSE_DATABASE' AND table = 't_json_async_insert'" + +$CLICKHOUSE_CLIENT --async_insert=1 --wait_for_async_insert=1 -q 'INSERT INTO t_json_async_insert FORMAT JSONAsObject {"aaa"}' 2>&1 | grep -o -m1 "Cannot parse object" & +$CLICKHOUSE_CLIENT --async_insert=1 --wait_for_async_insert=1 -q 'INSERT INTO t_json_async_insert FORMAT JSONAsObject {"k1": "aaa"}' & + +wait + +$CLICKHOUSE_CLIENT -q "SELECT data.k1 FROM t_json_async_insert ORDER BY data.k1" +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_json_async_insert" diff --git a/tests/queries/0_stateless/02421_type_json_empty_parts.reference b/tests/queries/0_stateless/02421_type_json_empty_parts.reference new file mode 100644 index 00000000000..f360b4b92cd --- /dev/null +++ b/tests/queries/0_stateless/02421_type_json_empty_parts.reference @@ -0,0 +1,26 @@ +Collapsing +0 +0 +id UInt64 +s Int8 +data Tuple(_dummy UInt8) +DELETE all +2 +1 +id UInt64 +data Tuple(k1 String, k2 String) +0 +0 +id UInt64 +data Tuple(_dummy UInt8) +TTL +1 +1 +id UInt64 +d Date +data Tuple(k1 String, k2 String) +0 +0 +id UInt64 +d Date +data Tuple(_dummy UInt8) diff --git a/tests/queries/0_stateless/02421_type_json_empty_parts.sql b/tests/queries/0_stateless/02421_type_json_empty_parts.sql new file mode 100644 index 00000000000..409a2b18a49 --- /dev/null +++ b/tests/queries/0_stateless/02421_type_json_empty_parts.sql @@ -0,0 +1,61 @@ +-- Tags: no-fasttest + +SET allow_experimental_object_type = 1; + +DROP TABLE IF EXISTS t_json_empty_parts; + +SELECT 'Collapsing'; +CREATE TABLE t_json_empty_parts (id UInt64, s Int8, data JSON) ENGINE = CollapsingMergeTree(s) ORDER BY id; + +INSERT INTO t_json_empty_parts VALUES (1, 1, '{"k1": "aaa"}') (1, -1, '{"k2": "bbb"}'); + +SELECT count() FROM t_json_empty_parts; +SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active; +DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1; + +DROP TABLE t_json_empty_parts; + +DROP TABLE IF EXISTS t_json_empty_parts; + +SELECT 'DELETE all'; +CREATE TABLE t_json_empty_parts (id UInt64, data JSON) ENGINE = MergeTree ORDER BY id; + +INSERT INTO t_json_empty_parts VALUES (1, '{"k1": "aaa"}') (2, '{"k2": "bbb"}'); + +SELECT count() FROM t_json_empty_parts; +SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active; +DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1; + +SET mutations_sync = 2; +ALTER TABLE t_json_empty_parts DELETE WHERE 1; + +DETACH TABLE t_json_empty_parts; +ATTACH TABLE t_json_empty_parts; + +SELECT count() FROM t_json_empty_parts; +SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active; +DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1; + +DROP TABLE IF EXISTS t_json_empty_parts; + +SELECT 'TTL'; +CREATE TABLE t_json_empty_parts (id UInt64, d Date, data JSON) ENGINE = MergeTree ORDER BY id TTL d WHERE id % 2 = 1; + +INSERT INTO t_json_empty_parts VALUES (1, '2000-01-01', '{"k1": "aaa"}') (2, '2000-01-01', '{"k2": "bbb"}'); +OPTIMIZE TABLE t_json_empty_parts FINAL; + +SELECT count() FROM t_json_empty_parts; +SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active; +DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1; + +ALTER TABLE t_json_empty_parts MODIFY TTL d; +OPTIMIZE TABLE t_json_empty_parts FINAL; + +DETACH TABLE t_json_empty_parts; +ATTACH TABLE t_json_empty_parts; + +SELECT count() FROM t_json_empty_parts; +SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active; +DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1; + +DROP TABLE IF EXISTS t_json_empty_parts; diff --git a/tests/queries/0_stateless/02423_multidimensional_array_get_data_at.reference b/tests/queries/0_stateless/02423_multidimensional_array_get_data_at.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02423_multidimensional_array_get_data_at.sql b/tests/queries/0_stateless/02423_multidimensional_array_get_data_at.sql new file mode 100644 index 00000000000..a47fbdfc789 --- /dev/null +++ b/tests/queries/0_stateless/02423_multidimensional_array_get_data_at.sql @@ -0,0 +1,7 @@ +SELECT formatRow('RawBLOB', [[[33]], []]); -- { serverError 48 } +SELECT formatRow('RawBLOB', [[[]], []]); -- { serverError 48 } +SELECT formatRow('RawBLOB', [[[[[[[0x48, 0x65, 0x6c, 0x6c, 0x6f]]]]]], []]); -- { serverError 48 } +SELECT formatRow('RawBLOB', []::Array(Array(Nothing))); -- { serverError 48 } +SELECT formatRow('RawBLOB', [[], [['Hello']]]); -- { serverError 48 } +SELECT formatRow('RawBLOB', [[['World']], []]); -- { serverError 48 } +SELECT formatRow('RawBLOB', []::Array(String)); -- { serverError 48 }