Merge branch 'master' into bump-replxx

This commit is contained in:
Alexey Milovidov 2022-09-17 22:49:34 +03:00 committed by GitHub
commit 59d05b949b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
75 changed files with 1057 additions and 967 deletions

contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit e15858f8ad0ce8aba85cf74e3763874c76bf927c
Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc

View File

@ -3145,6 +3145,17 @@ Result:
└─────┴─────┴───────┘
```
## enable_extended_results_for_datetime_functions {#enable-extended-results-for-datetime-functions}
Enables or disables returning results of type `Date32` with extended range (compared to type `Date`) for functions [toStartOfYear](../../sql-reference/functions/date-time-functions.md#tostartofyear), [toStartOfISOYear](../../sql-reference/functions/date-time-functions.md#tostartofisoyear), [toStartOfQuarter](../../sql-reference/functions/date-time-functions.md#tostartofquarter), [toStartOfMonth](../../sql-reference/functions/date-time-functions.md#tostartofmonth), [toStartOfWeek](../../sql-reference/functions/date-time-functions.md#tostartofweek), [toMonday](../../sql-reference/functions/date-time-functions.md#tomonday) and [toLastDayOfMonth](../../sql-reference/functions/date-time-functions.md#tolastdayofmonth).
Possible values:
- 0 — Functions return `Date` for all types of arguments.
- 1 — Functions return `Date32` for `Date32` or `DateTime64` arguments and `Date` otherwise.
Default value: `0`.
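To make the rule concrete, here is a hypothetical C++ sketch of the type dispatch this setting describes (the type names are illustrative stand-ins, not the actual ClickHouse implementation): with the setting enabled, the result type mirrors the extended-ness of the argument.

```cpp
// Illustrative sketch only; Date/Date32/DateTime/DateTime64 are stand-in types.
#include <type_traits>

struct Date {};     struct Date32 {};
struct DateTime {}; struct DateTime64 {};

// With enable_extended_results_for_datetime_functions = 1, the result type
// mirrors the argument: extended in, extended out.
template <typename Arg>
using ToStartOfMonthResult = std::conditional_t<
    std::is_same_v<Arg, Date32> || std::is_same_v<Arg, DateTime64>,
    Date32,   // extended result for extended arguments
    Date>;    // plain Date otherwise (and for every argument when the setting is 0)

static_assert(std::is_same_v<ToStartOfMonthResult<DateTime64>, Date32>);
static_assert(std::is_same_v<ToStartOfMonthResult<DateTime>, Date>);
```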
## optimize_move_to_prewhere {#optimize_move_to_prewhere}
Enables or disables automatic [PREWHERE](../../sql-reference/statements/select/prewhere.md) optimization in [SELECT](../../sql-reference/statements/select/index.md) queries.

View File

@ -268,13 +268,15 @@ Result:
```
:::note
The return type of `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is `Date` or `DateTime`.
Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results.
If the argument is out of the normal range:
The return type of `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is determined by the configuration parameter [enable_extended_results_for_datetime_functions](../../operations/settings/settings#enable-extended-results-for-datetime-functions) which is `0` by default.
Behavior for
* `enable_extended_results_for_datetime_functions = 0`: Functions `toStartOf*`, `toLastDayOfMonth`, `toMonday` return `Date` or `DateTime`. Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results. If the argument is out of the normal range:
* If the argument is smaller than 1970, the result will be calculated from the argument `1970-01-01 (00:00:00)` instead.
* If the return type is `DateTime` and the argument is larger than `2106-02-07 08:28:15`, the result will be calculated from the argument `2106-02-07 08:28:15` instead.
* If the return type is `Date` and the argument is larger than `2149-06-06`, the result will be calculated from the argument `2149-06-06` instead.
* If `toLastDayOfMonth` is called with an argument greater than `2149-05-31`, the result will be calculated from the argument `2149-05-31` instead.
* `enable_extended_results_for_datetime_functions = 1`: Functions `toStartOfYear`, `toStartOfISOYear`, `toStartOfQuarter`, `toStartOfMonth`, `toStartOfWeek`, `toLastDayOfMonth`, `toMonday` return `Date` or `DateTime` if their argument is a `Date` or `DateTime`, and they return `Date32` or `DateTime64` if their argument is a `Date32` or `DateTime64`.
:::
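A minimal sketch of the clamping rule from the note above, assuming the usual representations (`Date` as days since 1970-01-01 in a `UInt16`, `DateTime` as seconds since the epoch in a `UInt32`); hypothetical standalone code, not the ClickHouse implementation:

```cpp
#include <algorithm>
#include <cstdint>

// Out-of-range extended arguments are replaced by the nearest representable
// boundary before the rounding function is applied.
uint16_t clampExtendedDate(int32_t extended_days)        // Date32 day number
{
    // 65535 days after 1970-01-01 is 2149-06-06, the upper bound of Date.
    return static_cast<uint16_t>(std::clamp<int32_t>(extended_days, 0, 65535));
}

uint32_t clampExtendedDateTime(int64_t extended_seconds) // DateTime64 seconds
{
    // UInt32 seconds overflow in 2106, the upper bound of DateTime.
    return static_cast<uint32_t>(std::clamp<int64_t>(extended_seconds, 0, 4294967295LL));
}
```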
## toStartOfYear
@ -303,6 +305,8 @@ Returns the date.
Rounds up a date or date with time to the last day of the month.
Returns the date.
If `toLastDayOfMonth` is called with an argument of type `Date` greater than 2149-05-31, the result will be calculated from the argument 2149-05-31 instead.
## toMonday
Rounds down a date or date with time to the nearest Monday.

View File

@ -3799,6 +3799,17 @@ Exception: Total regexp lengths too large.
Default value: `1`.
## enable_extended_results_for_datetime_functions {#enable-extended-results-for-datetime-functions}
Enables or disables returning results of type `Date32` with extended range (compared to type `Date`) for the functions [toStartOfYear](../../sql-reference/functions/date-time-functions.md#tostartofyear), [toStartOfISOYear](../../sql-reference/functions/date-time-functions.md#tostartofisoyear), [toStartOfQuarter](../../sql-reference/functions/date-time-functions.md#tostartofquarter), [toStartOfMonth](../../sql-reference/functions/date-time-functions.md#tostartofmonth), [toStartOfWeek](../../sql-reference/functions/date-time-functions.md#tostartofweek), [toMonday](../../sql-reference/functions/date-time-functions.md#tomonday) and [toLastDayOfMonth](../../sql-reference/functions/date-time-functions.md#tolastdayofmonth).
Possible values:
- 0 — Functions return `Date` for all types of arguments.
- 1 — Functions return `Date32` for `Date32` or `DateTime64` arguments and `Date` otherwise.
Default value: `0`.
**Example**
Query:

View File

@ -268,24 +268,18 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
```
:::note
The return type of the `toStartOf*`, `toMonday` functions described below is `Date` or `DateTime`.
Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (`1970` to `2148` for `Date` and `1970-01-01 00:00:00` to `2106-02-07 08:28:15` for `DateTime`) will produce wrong results.
Return values for arguments outside the normal range:
* `1970-01-01 (00:00:00)` is returned for times before 1970,
* `2106-02-07 08:28:15` is used as the argument if the given argument exceeds this value and the return type is `DateTime`,
* `2149-06-06` is used as the argument if the given argument exceeds this value and the return type is `Date`,
* `2149-05-31` is the result of `toLastDayOfMonth` for arguments greater than `2149-05-31`.
The return type of the `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is determined by the configuration parameter [enable_extended_results_for_datetime_functions](../../operations/settings/settings#enable-extended-results-for-datetime-functions), which is `0` by default.
Behavior for
* `enable_extended_results_for_datetime_functions = 0`: Functions `toStartOf*`, `toLastDayOfMonth`, `toMonday` return `Date` or `DateTime`. Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (`1970` to `2148` for `Date` and `1970-01-01 00:00:00` to `2106-02-07 08:28:15` for `DateTime`) will produce wrong results.
If the argument is out of the normal range:
* `1970-01-01 (00:00:00)` is returned for times before 1970,
* `2106-02-07 08:28:15` is used as the argument if the given argument exceeds this value and the return type is `DateTime`,
* `2149-06-06` is used as the argument if the given argument exceeds this value and the return type is `Date`,
* `2149-05-31` is the result of `toLastDayOfMonth` for arguments greater than `2149-05-31`.
* `enable_extended_results_for_datetime_functions = 1`: Functions `toStartOfYear`, `toStartOfISOYear`, `toStartOfQuarter`, `toStartOfMonth`, `toStartOfWeek`, `toLastDayOfMonth`, `toMonday` return `Date` or `DateTime` if their argument is a `Date` or `DateTime`, and `Date32` or `DateTime64` if their argument is a `Date32` or `DateTime64`.
:::
:::note
The return type of the `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is `Date` or `DateTime`.
Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (`1970` to `2148` for `Date` and `1970-01-01 00:00:00` to `2106-02-07 08:28:15` for `DateTime`) will produce wrong results.
Return values for arguments outside the normal range:
* `1970-01-01 (00:00:00)` is returned for times before 1970,
* `2106-02-07 08:28:15` is used as the argument if the given argument exceeds this value and the return type is `DateTime`,
* `2149-06-06` is used as the argument if the given argument exceeds this value and the return type is `Date`.
:::
## toStartOfYear {#tostartofyear}
Rounds a date or date-with-time down to the first day of the year.
@ -324,6 +318,8 @@ SELECT toStartOfISOYear(toDate('2017-01-01')) AS ISOYear20170101;
Rounds a date or date-with-time up to the last day of the month.
Returns the date.
If `toLastDayOfMonth` is called with an argument of type `Date` greater than 2149-05-31, the result will be calculated from the argument 2149-05-31 instead.
## toMonday {#tomonday}
Rounds a date or date-with-time down to the nearest Monday.

View File

@ -150,7 +150,7 @@ std::vector<String> Client::loadWarningMessages()
size_t rows = packet.block.rows();
for (size_t i = 0; i < rows; ++i)
messages.emplace_back(column.getDataAt(i).toString());
messages.emplace_back(column[i].get<String>());
}
continue;

View File

@ -98,7 +98,7 @@ void placeStringColumn(const ColumnString & column, const char ** buffer, size_t
size_t size = column.size();
for (size_t i = 0; i < size; ++i)
{
*buffer = const_cast<char *>(column.getDataAtWithTerminatingZero(i).data);
*buffer = const_cast<char *>(column.getDataAt(i).data);
buffer += features_count;
}
}

View File

@ -492,7 +492,7 @@ public:
void insertResultInto(IColumn & to) const
{
if (has())
assert_cast<ColumnString &>(to).insertDataWithTerminatingZero(getData(), size);
assert_cast<ColumnString &>(to).insertData(getData(), size);
else
assert_cast<ColumnString &>(to).insertDefault();
}
@ -569,7 +569,7 @@ public:
void change(const IColumn & column, size_t row_num, Arena * arena)
{
changeImpl(assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num), arena);
changeImpl(assert_cast<const ColumnString &>(column).getDataAt(row_num), arena);
}
void change(const Self & to, Arena * arena)
@ -618,7 +618,7 @@ public:
bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
{
if (!has() || assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) < getStringRef())
if (!has() || assert_cast<const ColumnString &>(column).getDataAt(row_num) < getStringRef())
{
change(column, row_num, arena);
return true;
@ -640,7 +640,7 @@ public:
bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
{
if (!has() || assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) > getStringRef())
if (!has() || assert_cast<const ColumnString &>(column).getDataAt(row_num) > getStringRef())
{
change(column, row_num, arena);
return true;
@ -667,7 +667,7 @@ public:
bool isEqualTo(const IColumn & column, size_t row_num) const
{
return has() && assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) == getStringRef();
return has() && assert_cast<const ColumnString &>(column).getDataAt(row_num) == getStringRef();
}
static bool allocatesMemoryInArena()

View File

@ -187,9 +187,8 @@ void Suggest::fillWordsFromBlock(const Block & block)
Words new_words;
new_words.reserve(rows);
for (size_t i = 0; i < rows; ++i)
{
new_words.emplace_back(column.getDataAt(i).toString());
}
new_words.emplace_back(column[i].get<String>());
addWords(std::move(new_words));
}

View File

@ -151,23 +151,24 @@ void ColumnArray::get(size_t n, Field & res) const
StringRef ColumnArray::getDataAt(size_t n) const
{
assert(n < size());
/** Returns the range of memory that covers all elements of the array.
* Works for arrays of fixed length values.
* For arrays of strings and arrays of arrays, the resulting chunk of memory may not be in one-to-one correspondence with the elements,
* since it contains only the data laid in succession, but not the offsets.
*/
size_t offset_of_first_elem = offsetAt(n);
StringRef first = getData().getDataAtWithTerminatingZero(offset_of_first_elem);
/// We are using pointer arithmetic on the addresses of the array elements.
if (!data->isFixedAndContiguous())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getDataAt is not supported for {}", getName());
size_t array_size = sizeAt(n);
if (array_size == 0)
return StringRef(first.data, 0);
return StringRef(nullptr, 0);
size_t offset_of_last_elem = getOffsets()[n] - 1;
StringRef last = getData().getDataAtWithTerminatingZero(offset_of_last_elem);
size_t offset_of_first_elem = offsetAt(n);
StringRef first = getData().getDataAt(offset_of_first_elem);
return StringRef(first.data, last.data + last.size - first.data);
return StringRef(first.data, first.size * array_size);
}
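The rewritten getDataAt above relies on the nested column being fixed-width and contiguous, so the n-th array occupies one dense slice of memory. A standalone sketch of that arithmetic (hypothetical code, mirroring `StringRef(first.data, first.size * array_size)`):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

int main()
{
    std::vector<uint64_t> data = {1, 2, 3, 4, 5};   // flattened nested values
    std::vector<size_t> offsets = {2, 5};           // arrays [1, 2] and [3, 4, 5]

    size_t n = 1;
    size_t offset_of_first_elem = (n == 0) ? 0 : offsets[n - 1];
    size_t array_size = offsets[n] - offset_of_first_elem;

    // Byte range of array n: pointer to its first value, length
    // sizeof(element) * array_size.
    const char * begin = reinterpret_cast<const char *>(data.data() + offset_of_first_elem);
    size_t bytes = sizeof(uint64_t) * array_size;

    assert(bytes == 3 * sizeof(uint64_t));
    (void)begin;
    return 0;
}
```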
@ -183,7 +184,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
/** Similarly - only for arrays of fixed length values.
*/
if (!data->isFixedAndContiguous())
throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method insertData is not supported for {}", getName());
size_t field_size = data->sizeOfValueIfFixed();

View File

@ -81,11 +81,6 @@ public:
return data->getDataAt(0);
}
StringRef getDataAtWithTerminatingZero(size_t) const override
{
return data->getDataAtWithTerminatingZero(0);
}
UInt64 get64(size_t) const override
{
return data->get64(0);

View File

@ -59,10 +59,6 @@ public:
void get(size_t n, Field & res) const override { getDictionary().get(getIndexes().getUInt(n), res); }
StringRef getDataAt(size_t n) const override { return getDictionary().getDataAt(getIndexes().getUInt(n)); }
StringRef getDataAtWithTerminatingZero(size_t n) const override
{
return getDictionary().getDataAtWithTerminatingZero(getIndexes().getUInt(n));
}
bool isDefaultAt(size_t n) const override { return getDictionary().isDefaultAt(getIndexes().getUInt(n)); }
UInt64 get64(size_t n) const override { return getDictionary().get64(getIndexes().getUInt(n)); }

View File

@ -108,12 +108,6 @@ public:
return StringRef(&chars[offsetAt(n)], sizeAt(n) - 1);
}
StringRef getDataAtWithTerminatingZero(size_t n) const override
{
assert(n < size());
return StringRef(&chars[offsetAt(n)], sizeAt(n));
}
bool isDefaultAt(size_t n) const override
{
assert(n < size());
@ -177,17 +171,6 @@ public:
offsets.push_back(new_size);
}
/// Like insertData, but the inserted data must be zero-terminated (i.e. length is 1 byte greater than the real string size).
void insertDataWithTerminatingZero(const char * pos, size_t length)
{
const size_t old_size = chars.size();
const size_t new_size = old_size + length;
chars.resize(new_size);
memcpy(chars.data() + old_size, pos, length);
offsets.push_back(new_size);
}
void popBack(size_t n) override
{
size_t nested_n = offsets.back() - offsetAt(offsets.size() - n);

View File

@ -70,10 +70,6 @@ public:
void get(size_t n, Field & res) const override { getNestedColumn()->get(n, res); }
bool isDefaultAt(size_t n) const override { return n == 0; }
StringRef getDataAt(size_t n) const override { return getNestedColumn()->getDataAt(n); }
StringRef getDataAtWithTerminatingZero(size_t n) const override
{
return getNestedColumn()->getDataAtWithTerminatingZero(n);
}
UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); }
UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }

View File

@ -106,13 +106,6 @@ public:
/// Is used to optimize some computations (in aggregation, for example).
[[nodiscard]] virtual StringRef getDataAt(size_t n) const = 0;
/// Like getDataAt, but has special behavior for columns that contain variable-length strings.
/// Returns a zero-terminated memory chunk (i.e. its size is 1 byte longer).
[[nodiscard]] virtual StringRef getDataAtWithTerminatingZero(size_t n) const
{
return getDataAt(n);
}
/// If column stores integers, it returns n-th element transformed to UInt64 using static_cast.
/// If column stores floating point numbers, bits of n-th elements are copied to lower bits of UInt64, the remaining bits are zeros.
/// Is used to optimize some computations (in aggregation, for example).
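For reference, a self-contained model of the ColumnString layout that the removed accessor operated on (hypothetical standalone code; `offsetAt`/`sizeAt` mimic the members shown in the ColumnString diff above, where each string is stored with a terminating zero):

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>
#include <vector>

int main()
{
    std::vector<char> chars = {'f', 'o', 'o', '\0', 'b', 'a', 'r', '\0'};
    std::vector<size_t> offsets = {4, 8};   // one past each terminating zero

    auto offsetAt = [&](size_t n) { return n == 0 ? 0 : offsets[n - 1]; };
    auto sizeAt   = [&](size_t n) { return offsets[n] - offsetAt(n); };

    // getDataAt: the string without its terminating zero.
    std::string_view data_at{&chars[offsetAt(1)], sizeAt(1) - 1};
    // getDataAtWithTerminatingZero (removed by this commit): one byte longer.
    std::string_view with_zero{&chars[offsetAt(1)], sizeAt(1)};

    assert(data_at == "bar");
    assert(with_zero.size() == 4 && with_zero[3] == '\0');
    return 0;
}
```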

View File

@ -1,8 +1,11 @@
#include <Common/FieldVisitorsAccurateComparison.h>
#include <DataTypes/getLeastSupertype.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/DataTypeTuple.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/convertFieldToType.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnTuple.h>
#include <Common/FieldVisitorToString.h>
#include <Common/randomSeed.h>
@ -118,3 +121,36 @@ TEST(ColumnObject, InsertRangeFrom)
checkFieldsAreEqual(subcolumn_dst, fields_dst);
}
}
TEST(ColumnObject, Unflatten)
{
auto check_empty_tuple = [](const auto & type, const auto & column)
{
const auto & type_tuple = assert_cast<const DataTypeTuple &>(*type);
const auto & column_tuple = assert_cast<const ColumnTuple &>(*column);
ASSERT_EQ(type_tuple.getElements().size(), 1);
ASSERT_EQ(type_tuple.getElements()[0]->getName(), "UInt8");
ASSERT_EQ(type_tuple.getElementNames()[0], ColumnObject::COLUMN_NAME_DUMMY);
ASSERT_EQ(column_tuple.getColumns().size(), 1);
ASSERT_EQ(column_tuple.getColumns()[0]->getName(), "UInt8");
};
{
auto column_object = ColumnObject::create(false);
auto [column, type] = unflattenObjectToTuple(*column_object);
check_empty_tuple(type, column);
ASSERT_EQ(column->size(), 0);
}
{
auto column_object = ColumnObject::create(false);
column_object->insertManyDefaults(5);
auto [column, type] = unflattenObjectToTuple(*column_object);
check_empty_tuple(type, column);
ASSERT_EQ(column->size(), 5);
}
}

View File

@ -189,9 +189,6 @@ KeeperConfigurationAndSettings::loadFromConfig(const Poco::Util::AbstractConfigu
ret->coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
if (ret->coordination_settings->quorum_reads)
LOG_WARNING(&Poco::Logger::get("KeeperConfigurationAndSettings"), "Setting 'quorum_reads' is deprecated. Please use 'read_mode'");
return ret;
}

View File

@ -26,7 +26,6 @@ struct Settings;
M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \
M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Upper bound of election timer (avoid too often leader elections)", 0) \
M(Milliseconds, leadership_expiry, 0, "How often the leader node will check if it still has a majority. Set it lower than or equal to election_timeout_lower_bound_ms to have linearizable reads.", 0) \
M(UInt64, reserved_log_items, 100000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
@ -39,12 +38,11 @@ struct Settings;
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
M(UInt64, max_requests_batch_size, 100, "Max size of batch in requests count before it will be sent to RAFT", 0) \
M(Bool, quorum_reads, false, "Deprecated - use read_mode. Execute read requests as writes through the whole RAFT consensus with similar speed", 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through the whole RAFT consensus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \
M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \
M(String, read_mode, "nonlinear", "How should reads be processed. Valid values: 'nonlinear', 'fastlinear', 'quorum'. 'nonlinear' is the fastest option because there are no consistency requirements", 0)
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -1,5 +1,4 @@
#include <Coordination/KeeperDispatcher.h>
#include <libnuraft/async.hxx>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <future>
@ -7,8 +6,6 @@
#include <Poco/Path.h>
#include <Common/hex.h>
#include <filesystem>
#include <iterator>
#include <limits>
#include <Common/checkStackSize.h>
#include <Common/CurrentMetrics.h>
@ -33,83 +30,22 @@ namespace ErrorCodes
KeeperDispatcher::KeeperDispatcher()
: responses_queue(std::numeric_limits<size_t>::max())
, read_requests_queue(std::numeric_limits<size_t>::max())
, finalize_requests_queue(std::numeric_limits<size_t>::max())
, configuration_and_settings(std::make_shared<KeeperConfigurationAndSettings>())
, log(&Poco::Logger::get("KeeperDispatcher"))
{
}
/// ZooKeepers has 2 requirements:
/// - writes need to be linearizable
/// - all requests from single session need to be processed in the order of their arrival
///
/// Because of that, we cannot process read and write requests from the SAME session at the same time.
/// To be able to process read and write requests in parallel we need to make sure that only 1 type
/// of request is being processed from a single session.
/// Multiple types from different sessions can be processed at the same time.
///
/// We do some in-session housekeeping to make sure that the multithreaded request processing is correct.
/// When a request is received from a client, we check if there are requests being processed from that same
/// session, and if yes, of what type. If the types are the same, and there are no requests of a different
/// type in between, we can instantly add it to the active request queue. Otherwise, we need to wait until
/// all requests of the other type are processed.
///
/// There are multiple threads used for processing the request, each of them communicating with a queue.
/// Assumption: only one type of request is being processed from the same session at any point in time (read or write).
///
/// requestThread -> requests currently being processed
/// readRequestThread -> thread for processing read requests
/// finalizeRequestThread -> thread for finalizing requests:
/// - in-session housekeeping, add requests to the active request queue if there are any
///
/// If reads are linearizable without quorum, a request can possibly wait for a certain log to be committed.
/// In that case we add it to the waiting queue for that log.
/// When that log is committed, the committing thread will send that read request to readRequestThread so it can be processed.
///
void KeeperDispatcher::requestThread()
{
setThreadName("KeeperReqT");
/// Result of requests batch from previous iteration
RaftResult prev_result = nullptr;
const auto previous_quorum_done = [&] { return !prev_result || prev_result->has_result() || prev_result->get_result_code() != nuraft::cmd_result_code::OK; };
RaftAppendResult prev_result = nullptr;
/// Requests from previous iteration. We store them to be able
/// to send errors to the client.
KeeperStorage::RequestsForSessions prev_batch;
const auto needs_quorum = [](const auto & coordination_settings, const auto & request)
{
return coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum" || !request.request->isReadRequest();
};
KeeperStorage::RequestsForSessions quorum_requests;
KeeperStorage::RequestsForSessions read_requests;
auto process_quorum_requests = [&, this]() mutable
{
/// Forcefully process all previous pending requests
if (prev_result)
forceWaitAndProcessResult(prev_result);
prev_result = server->putRequestBatch(quorum_requests);
if (prev_result)
{
prev_result->when_ready([&, requests_for_sessions = std::move(quorum_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> &) mutable
{
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
else if (result.get_result_code() != nuraft::cmd_result_code::OK)
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
});
}
quorum_requests.clear();
};
/// ZooKeeper requires that the requests inside a single session are processed in a strict order
/// (we cannot process later requests before all the previous ones are processed)
/// By making sure that at this point we can either have just read requests or just write requests
/// from a single session, we can process them independently
while (!shutdown_called)
{
KeeperStorage::RequestForSession request;
@ -118,67 +54,94 @@ void KeeperDispatcher::requestThread()
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
/// The code below does a very simple thing: batch all write (quorum) requests into a vector while the
/// previous write batch is still in flight, or until the max_batch size is reached. The main complexity comes from
/// the ability to process read requests without quorum (from local state). So when we are collecting
/// requests into a batch we must check that the new request is not a read request. Otherwise we have to
/// process all already accumulated write requests, wait for them synchronously and only after that process
/// the read request. So reads are a kind of "separator" for writes.
try
{
if (active_requests_queue->tryPop(request, max_wait))
if (requests_queue->tryPop(request, max_wait))
{
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
if (shutdown_called)
break;
if (needs_quorum(coordination_settings, request))
quorum_requests.emplace_back(request);
else
read_requests.emplace_back(request);
KeeperStorage::RequestsForSessions current_batch;
bool has_read_request = false;
/// If new request is not read request or we must to process it through quorum.
/// Otherwise we will process it locally.
if (coordination_settings->quorum_reads || !request.request->isReadRequest())
{
current_batch.emplace_back(request);
/// Waiting until the previous append is successful, or the batch is big enough
/// has_result == false && get_result_code == OK means that our request is still not processed.
/// Sometimes NuRaft sets the error code without setting the result, so we check both here.
while (true)
while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size)
{
if (quorum_requests.size() > max_batch_size)
break;
if (read_requests.size() > max_batch_size)
{
processReadRequests(coordination_settings, read_requests);
if (previous_quorum_done())
break;
}
/// Trying to get batch requests as fast as possible
if (active_requests_queue->tryPop(request, 1))
if (requests_queue->tryPop(request, 1))
{
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
if (needs_quorum(coordination_settings, request))
quorum_requests.emplace_back(request);
else
read_requests.emplace_back(request);
/// Don't append read requests into the batch, we have to process them separately
if (!coordination_settings->quorum_reads && request.request->isReadRequest())
{
has_read_request = true;
break;
}
else
{
/// a batch of read requests can send at most one request,
/// so we don't care if the previous batch hasn't received a response
if (!read_requests.empty())
processReadRequests(coordination_settings, read_requests);
/// if we still haven't processed the previous batch we can
/// grow our current batch even more
if (previous_quorum_done())
break;
current_batch.emplace_back(request);
}
}
if (shutdown_called)
break;
}
}
else
has_read_request = true;
if (shutdown_called)
break;
if (!quorum_requests.empty())
process_quorum_requests();
/// Forcefully process all previous pending requests
if (prev_result)
forceWaitAndProcessResult(prev_result, prev_batch);
/// Process collected write requests batch
if (!current_batch.empty())
{
auto result = server->putRequestBatch(current_batch);
if (result)
{
if (has_read_request) /// If we execute a read request next, then we have to process the result now
forceWaitAndProcessResult(result, current_batch);
}
else
{
addErrorResponses(current_batch, Coordination::Error::ZCONNECTIONLOSS);
current_batch.clear();
}
prev_batch = std::move(current_batch);
prev_result = result;
}
/// Read request always goes after write batch (last request)
if (has_read_request)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(request);
else
addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
}
}
}
catch (...)
@ -188,72 +151,6 @@ void KeeperDispatcher::requestThread()
}
}
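Condensed, the batching policy from the comments above looks like this (hypothetical standalone sketch; `flush_writes` and `serve_read` stand in for putRequestBatch and putLocalReadRequest, which are assumptions, not the actual call sites):

```cpp
#include <vector>

struct Request { bool is_read; };

// Splits an incoming stream into write batches, flushing before each read:
// reads act as "separators" because they must observe all preceding writes.
template <typename Flush, typename ServeRead>
void processStream(const std::vector<Request> & stream, Flush flush_writes, ServeRead serve_read)
{
    std::vector<Request> batch;
    for (const auto & request : stream)
    {
        if (request.is_read)
        {
            flush_writes(batch);   // wait for the write quorum synchronously
            batch.clear();
            serve_read(request);   // then answer the read from local state
        }
        else
            batch.push_back(request);
    }
    flush_writes(batch);           // flush whatever writes remain
}
```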
void KeeperDispatcher::processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests)
{
if (coordination_settings->read_mode.toString() == "fastlinear")
{
// we just want to know the current latest committed log on the leader node
auto leader_info_result = server->getLeaderInfo();
if (leader_info_result)
{
leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> & exception) mutable
{
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
{
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
return;
}
if (result.get_result_code() != nuraft::cmd_result_code::OK)
{
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
return;
}
if (exception)
{
LOG_INFO(log, "Got exception while waiting for read results {}", exception->what());
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
return;
}
auto & leader_info_ctx = result.get();
if (!leader_info_ctx)
{
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
return;
}
KeeperServer::NodeInfo leader_info;
leader_info.term = leader_info_ctx->get_ulong();
leader_info.last_committed_index = leader_info_ctx->get_ulong();
std::lock_guard lock(leader_waiter_mutex);
auto node_info = server->getNodeInfo();
/// we're behind, we need to wait
if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index)
{
auto & leader_waiter = leader_waiters[leader_info];
leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end());
LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index);
}
/// process it in background thread
else if (!read_requests_queue.push(std::move(requests_for_sessions)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
});
}
}
else
{
assert(coordination_settings->read_mode.toString() == "nonlinear");
if (!read_requests_queue.push(std::move(read_requests)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
}
read_requests.clear();
}
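The freshness check at the heart of processReadRequests can be sketched on its own (assumed standalone types mirroring KeeperServer::NodeInfo in this diff): a read is served locally only once the node has caught up with the leader's term and last committed index observed at request time.

```cpp
#include <cstdint>

struct NodeInfo
{
    uint64_t term;
    uint64_t last_committed_index;
};

// True if the local state is at least as fresh as the leader snapshot, so the
// pending reads registered for `leader` can be processed from local state.
bool caughtUp(const NodeInfo & local, const NodeInfo & leader)
{
    return local.term >= leader.term
        && local.last_committed_index >= leader.last_committed_index;
}
```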
void KeeperDispatcher::responseThread()
{
setThreadName("KeeperRspT");
@ -303,65 +200,6 @@ void KeeperDispatcher::snapshotThread()
}
}
/// Background thread for processing read requests
void KeeperDispatcher::readRequestThread()
{
setThreadName("KeeperReadT");
while (!shutdown_called)
{
KeeperStorage::RequestsForSessions requests;
if (!read_requests_queue.pop(requests))
break;
if (shutdown_called)
break;
try
{
for (const auto & request_info : requests)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(request_info);
else
addErrorResponses({request_info}, Coordination::Error::ZCONNECTIONLOSS);
}
if (!finalize_requests_queue.push(std::move(requests)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
/// We finalize requests every time we commit a single log with a request
/// or process a batch of read requests.
/// Because it can get heavy, we do it in a background thread.
void KeeperDispatcher::finalizeRequestsThread()
{
setThreadName("KeeperFinalT");
while (!shutdown_called)
{
KeeperStorage::RequestsForSessions requests;
if (!finalize_requests_queue.pop(requests))
break;
if (shutdown_called)
break;
try
{
finalizeRequests(requests);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
@ -417,30 +255,6 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = session_id;
{
std::lock_guard lock{unprocessed_request_mutex};
auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id);
if (unprocessed_requests_it == unprocessed_requests_for_session.end())
{
auto & unprocessed_requests = unprocessed_requests_for_session[session_id];
unprocessed_requests.unprocessed_num = 1;
unprocessed_requests.is_read = request->isReadRequest();
}
else
{
auto & unprocessed_requests = unprocessed_requests_it->second;
/// if the queue is not empty or the request types don't match, put it in the waiting queue
if (!unprocessed_requests.request_queue.empty() || unprocessed_requests.is_read != request->isReadRequest())
{
unprocessed_requests.request_queue.push_back(std::move(request_info));
return true;
}
++unprocessed_requests.unprocessed_num;
}
}
std::lock_guard lock(push_request_mutex);
if (shutdown_called)
@ -449,10 +263,10 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
{
if (!active_requests_queue->push(std::move(request_info)))
if (!requests_queue->push(std::move(request_info)))
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
}
else if (!active_requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
{
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
@ -465,23 +279,13 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
LOG_DEBUG(log, "Initializing storage dispatcher");
configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
active_requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_requests_batch_size);
requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_requests_batch_size);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
read_request_thread = ThreadFromGlobalPool([this] { readRequestThread(); });
finalize_requests_thread = ThreadFromGlobalPool([this] { finalizeRequestsThread(); });
server = std::make_unique<KeeperServer>(
configuration_and_settings,
config,
responses_queue,
snapshots_queue,
[this](const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
{ onRequestCommit(request_for_session, log_term, log_idx); },
[this](uint64_t term, uint64_t last_idx)
{ onApplySnapshot(term, last_idx); });
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue);
try
{
@ -529,9 +333,9 @@ void KeeperDispatcher::shutdown()
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();
if (active_requests_queue)
if (requests_queue)
{
active_requests_queue->finish();
requests_queue->finish();
if (request_thread.joinable())
request_thread.join();
@ -545,14 +349,6 @@ void KeeperDispatcher::shutdown()
if (snapshot_thread.joinable())
snapshot_thread.join();
read_requests_queue.finish();
if (read_request_thread.joinable())
read_request_thread.join();
finalize_requests_queue.finish();
if (finalize_requests_thread.joinable())
finalize_requests_thread.join();
update_configuration_queue.finish();
if (update_configuration_thread.joinable())
update_configuration_thread.join();
@ -561,7 +357,7 @@ void KeeperDispatcher::shutdown()
KeeperStorage::RequestForSession request_for_session;
/// Set session expired for all pending requests
while (active_requests_queue && active_requests_queue->tryPop(request_for_session))
while (requests_queue && requests_queue->tryPop(request_for_session))
{
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
auto response = request_for_session.request->makeResponse();
@ -678,7 +474,7 @@ void KeeperDispatcher::sessionCleanerTask()
};
{
std::lock_guard lock(push_request_mutex);
if (!active_requests_queue->push(std::move(request_info)))
if (!requests_queue->push(std::move(request_info)))
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
}
@ -728,12 +524,19 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession
}
}
void KeeperDispatcher::forceWaitAndProcessResult(RaftResult & result)
void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
{
if (!result->has_result())
result->get();
/// If we got some errors, then send them to clients
if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
result = nullptr;
requests_for_sessions.clear();
}
int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
@ -781,7 +584,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
/// Push new session request to queue
{
std::lock_guard lock(push_request_mutex);
if (!active_requests_queue->tryPush(std::move(request_info), session_timeout_ms))
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
}
@ -854,122 +657,6 @@ void KeeperDispatcher::updateConfigurationThread()
}
}
// Used to update the state for a session based on the requests
// - update the number of current unprocessed requests for the session
// - if the number of unprocessed requests is 0, we can start adding the next type of requests
// from the unprocessed requests queue to the active queue
void KeeperDispatcher::finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
std::unordered_map<int64_t, size_t> counts_for_session;
for (const auto & request_for_session : requests_for_sessions)
{
++counts_for_session[request_for_session.session_id];
}
std::lock_guard lock{unprocessed_request_mutex};
for (const auto [session_id, count] : counts_for_session)
{
auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id);
if (unprocessed_requests_it == unprocessed_requests_for_session.end())
continue;
auto & unprocessed_requests = unprocessed_requests_it->second;
unprocessed_requests.unprocessed_num -= count;
if (unprocessed_requests.unprocessed_num == 0)
{
if (!unprocessed_requests.request_queue.empty())
{
auto & unprocessed_requests_queue = unprocessed_requests.request_queue;
unprocessed_requests.is_read = !unprocessed_requests.is_read;
// start adding next type of requests
while (!unprocessed_requests_queue.empty() && unprocessed_requests_queue.front().request->isReadRequest() == unprocessed_requests.is_read)
{
auto & front_request = unprocessed_requests_queue.front();
/// Put close requests without timeouts
if (front_request.request->getOpNum() == Coordination::OpNum::Close)
{
if (!active_requests_queue->push(std::move(front_request)))
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
}
else if (!active_requests_queue->tryPush(std::move(front_request), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
{
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
++unprocessed_requests.unprocessed_num;
unprocessed_requests_queue.pop_front();
}
}
else
{
unprocessed_requests_for_session.erase(unprocessed_requests_it);
}
}
}
}
// Finalize request
// Process read requests that were waiting for this commit
void KeeperDispatcher::onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
{
if (!finalize_requests_queue.push({request_for_session}))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
KeeperStorage::RequestsForSessions requests;
{
std::lock_guard lock(leader_waiter_mutex);
auto request_queue_it = leader_waiters.find(KeeperServer::NodeInfo{.term = log_term, .last_committed_index = log_idx});
if (request_queue_it != leader_waiters.end())
{
requests = std::move(request_queue_it->second);
leader_waiters.erase(request_queue_it);
}
}
if (requests.empty())
return;
if (!read_requests_queue.push(std::move(requests)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
}
/// Process all read requests that are waiting for a log index lower than or equal to the currently last processed one
void KeeperDispatcher::onApplySnapshot(uint64_t term, uint64_t last_idx)
{
KeeperServer::NodeInfo current_node_info{term, last_idx};
KeeperStorage::RequestsForSessions requests;
{
std::lock_guard lock(leader_waiter_mutex);
for (auto leader_waiter_it = leader_waiters.begin(); leader_waiter_it != leader_waiters.end();)
{
auto waiting_node_info = leader_waiter_it->first;
if (waiting_node_info.term <= current_node_info.term
&& waiting_node_info.last_committed_index <= current_node_info.last_committed_index)
{
for (auto & request : leader_waiter_it->second)
{
requests.push_back(std::move(request));
}
leader_waiter_it = leader_waiters.erase(leader_waiter_it);
}
else
{
++leader_waiter_it;
}
}
}
if (requests.empty())
return;
if (!read_requests_queue.push(std::move(requests)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
}
bool KeeperDispatcher::isServerActive() const
{
return checkInit() && hasLeader() && !server->isRecovering();
@ -1034,7 +721,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
{
std::lock_guard lock(push_request_mutex);
result.outstanding_requests_count = active_requests_queue->size();
result.outstanding_requests_count = requests_queue->size();
}
{
std::lock_guard lock(session_to_response_callback_mutex);
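The per-session gating that putRequest and finalizeRequests implement above boils down to the following sketch (hypothetical standalone types, not the actual KeeperDispatcher members): a session may have only one request type in flight, and later requests of the other type wait in arrival order.

```cpp
#include <cstdint>
#include <deque>
#include <unordered_map>

struct Request { bool is_read; };

struct UnprocessedRequests
{
    size_t unprocessed_num = 0;   // requests of `is_read` type currently active
    bool is_read = false;         // the type currently being processed
    std::deque<Request> waiting;  // requests of the other type, in arrival order
};

std::unordered_map<int64_t, UnprocessedRequests> per_session;

// Returns true if the request may enter the active queue immediately.
bool tryActivate(int64_t session_id, const Request & request)
{
    auto & state = per_session[session_id];
    if (state.unprocessed_num == 0 && state.waiting.empty())
    {
        state.is_read = request.is_read;    // first in-flight request sets the type
        ++state.unprocessed_num;
        return true;
    }
    if (!state.waiting.empty() || state.is_read != request.is_read)
    {
        state.waiting.push_back(request);   // drained later by finalizeRequests()
        return false;
    }
    ++state.unprocessed_num;                // same type, nothing queued: activate
    return true;
}
```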

View File

@ -32,12 +32,9 @@ private:
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
/// Size depends on coordination settings
/// Requests currently being processed
std::unique_ptr<RequestsQueue> active_requests_queue;
std::unique_ptr<RequestsQueue> requests_queue;
ResponsesQueue responses_queue;
SnapshotsQueue snapshots_queue{1};
ConcurrentBoundedQueue<KeeperStorage::RequestsForSessions> read_requests_queue;
ConcurrentBoundedQueue<KeeperStorage::RequestsForSessions> finalize_requests_queue;
/// More than 1k updates is definitely a misconfiguration.
UpdateConfigurationQueue update_configuration_queue{1000};
@ -67,8 +64,6 @@ private:
ThreadFromGlobalPool snapshot_thread;
/// Apply or wait for configuration changes
ThreadFromGlobalPool update_configuration_thread;
ThreadFromGlobalPool read_request_thread;
ThreadFromGlobalPool finalize_requests_thread;
/// RAFT wrapper.
std::unique_ptr<KeeperServer> server;
@ -82,34 +77,6 @@ private:
/// Counter for new session_id requests.
std::atomic<int64_t> internal_session_id_counter{0};
/// A read request needs to observe at least the log that was the last committed log on the leader
/// at the time the request was made.
/// If the node is stale, we need to wait to commit that log before doing local read requests to achieve
/// linearizability.
std::unordered_map<KeeperServer::NodeInfo, KeeperStorage::RequestsForSessions> leader_waiters;
std::mutex leader_waiter_mutex;
/// We can be actively processing one type of requests (either read or write) from a single session.
/// If we receive a request of a type that is not currently being processed, we put it in the waiting queue.
/// Also, we want to process them in arriving order, so if we have a different type in the queue, we cannot process that request
/// but wait for all the previous requests to finish.
/// E.g. READ -> WRITE -> READ, the last READ will go to the waiting queue even though we are currently processing the first READ
/// because we have a WRITE request before it that needs to be processed
struct UnprocessedRequests
{
/// how many requests are currently in the active request queue
size_t unprocessed_num{0};
/// whether the requests currently being processed are reads
bool is_read{false};
std::list<KeeperStorage::RequestForSession> request_queue;
};
// Called every time a batch of requests is processed.
void finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions);
std::unordered_map<int64_t, UnprocessedRequests> unprocessed_requests_for_session;
std::mutex unprocessed_request_mutex;
/// Thread that puts requests to Raft
void requestThread();
/// Thread that puts responses for subscribed sessions
@ -121,12 +88,6 @@ private:
/// Thread that applies or waits for configuration changes from the leader
void updateConfigurationThread();
void readRequestThread();
void finalizeRequestsThread();
void processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests);
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
/// Add error responses for requests to responses queue.
@ -135,7 +96,7 @@ private:
/// Forcefully wait for the result and set errors if something went wrong.
/// Clears both arguments
static void forceWaitAndProcessResult(RaftResult & result);
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
public:
/// Just allocate some objects, real initialization is done by the `initialize` method
@ -155,12 +116,6 @@ public:
return server && server->checkInit();
}
/// Called when a single log with request is committed.
void onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx);
/// Called when a snapshot is applied
void onApplySnapshot(uint64_t term, uint64_t last_idx);
/// Is server accepting requests, i.e. connected to the cluster
/// and achieved quorum
bool isServerActive() const;

View File

@ -105,9 +105,7 @@ KeeperServer::KeeperServer(
const KeeperConfigurationAndSettingsPtr & configuration_and_settings_,
const Poco::Util::AbstractConfiguration & config,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
KeeperStateMachine::CommitCallback commit_callback,
KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback)
SnapshotsQueue & snapshots_queue_)
: server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings)
, log(&Poco::Logger::get("KeeperServer"))
@ -115,7 +113,7 @@ KeeperServer::KeeperServer(
, keeper_context{std::make_shared<KeeperContext>()}
, create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true))
{
if (coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum")
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false);
@ -127,9 +125,7 @@ KeeperServer::KeeperServer(
configuration_and_settings_->snapshot_storage_path,
coordination_settings,
keeper_context,
checkAndGetSuperdigest(configuration_and_settings_->super_digest),
std::move(commit_callback),
std::move(apply_snapshot_callback));
checkAndGetSuperdigest(configuration_and_settings_->super_digest));
state_manager = nuraft::cs_new<KeeperStateManager>(
server_id,
@ -180,13 +176,6 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
reconfigure(new_config);
}
RaftResult getLeaderInfo()
{
nuraft::ptr<nuraft::req_msg> req
= nuraft::cs_new<nuraft::req_msg>(0ull, nuraft::msg_type::leader_status_request, 0, 0, 0ull, 0ull, 0ull);
return send_msg_to_leader(req);
}
void commit_in_bg() override
{
// For NuRaft, if any commit fails (uncaught exception) the whole server aborts as a safety
@ -280,20 +269,6 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds(), "election_timeout_lower_bound_ms", log);
params.election_timeout_upper_bound_ = getValueOrMaxInt32AndLogWarning(
coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(), "election_timeout_upper_bound_ms", log);
params.leadership_expiry_ = getValueOrMaxInt32AndLogWarning(coordination_settings->leadership_expiry.totalMilliseconds(), "leadership_expiry", log);
if (coordination_settings->read_mode.toString() == "fastlinear")
{
if (params.leadership_expiry_ == 0)
params.leadership_expiry_ = params.election_timeout_lower_bound_;
else if (params.leadership_expiry_ > params.election_timeout_lower_bound_)
{
LOG_WARNING(log, "To use fast linearizable reads, leadership_expiry should be set to a value that is less than or equal to the election_timeout_lower_bound_ms. "
"Based on current settings, there are no guarantees for linearizability of reads.");
}
}
params.reserved_log_items_ = getValueOrMaxInt32AndLogWarning(coordination_settings->reserved_log_items, "reserved_log_items", log);
params.snapshot_distance_ = getValueOrMaxInt32AndLogWarning(coordination_settings->snapshot_distance, "snapshot_distance", log);
@ -512,7 +487,7 @@ void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession &
state_machine->processReadRequest(request_for_session);
}
RaftResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
std::vector<nuraft::ptr<nuraft::buffer>> entries;
for (const auto & request_for_session : requests_for_sessions)
@ -738,20 +713,6 @@ std::vector<int64_t> KeeperServer::getDeadSessions()
return state_machine->getDeadSessions();
}
RaftResult KeeperServer::getLeaderInfo()
{
std::lock_guard lock{server_write_mutex};
if (is_recovering)
return nullptr;
return raft_instance->getLeaderInfo();
}
KeeperServer::NodeInfo KeeperServer::getNodeInfo()
{
return { .term = raft_instance->get_term(), .last_committed_index = state_machine->last_commit_index() };
}
ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
auto diff = state_manager->getConfigurationDiff(config);

View File

@ -14,7 +14,7 @@
namespace DB
{
using RaftResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
class KeeperServer
{
@ -71,9 +71,7 @@ public:
const KeeperConfigurationAndSettingsPtr & settings_,
const Poco::Util::AbstractConfiguration & config_,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
KeeperStateMachine::CommitCallback commit_callback,
KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback);
SnapshotsQueue & snapshots_queue_);
/// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);
@ -86,7 +84,7 @@ public:
/// Put batch of requests into Raft and get result of put. Responses will be set separately into
/// responses_queue.
RaftResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
/// Return set of the non-active sessions
std::vector<int64_t> getDeadSessions();
@ -121,17 +119,6 @@ public:
int getServerID() const { return server_id; }
struct NodeInfo
{
uint64_t term;
uint64_t last_committed_index;
bool operator==(const NodeInfo &) const = default;
};
RaftResult getLeaderInfo();
NodeInfo getNodeInfo();
/// Get configuration diff between current configuration in RAFT and in XML file
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config);
@ -139,23 +126,10 @@ public:
/// Synchronously check for update results with retries.
void applyConfigurationUpdate(const ConfigUpdateAction & task);
/// Wait configuration update for action. Used by followers.
/// Return true if update was successfully received.
bool waitConfigurationUpdate(const ConfigUpdateAction & task);
};
}
namespace std
{
template <>
struct hash<DB::KeeperServer::NodeInfo>
{
size_t operator()(const DB::KeeperServer::NodeInfo & info) const
{
SipHash hash_state;
hash_state.update(info.term);
hash_state.update(info.last_committed_index);
return hash_state.get64();
}
};
}

View File

@ -44,9 +44,7 @@ KeeperStateMachine::KeeperStateMachine(
const std::string & snapshots_path_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
const std::string & superdigest_,
CommitCallback commit_callback_,
ApplySnapshotCallback apply_snapshot_callback_)
const std::string & superdigest_)
: coordination_settings(coordination_settings_)
, snapshot_manager(
snapshots_path_,
@ -60,8 +58,6 @@ KeeperStateMachine::KeeperStateMachine(
, last_committed_idx(0)
, log(&Poco::Logger::get("KeeperStateMachine"))
, superdigest(superdigest_)
, commit_callback(std::move(commit_callback_))
, apply_snapshot_callback(std::move(apply_snapshot_callback_))
, keeper_context(keeper_context_)
{
}
@ -227,11 +223,11 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
return true;
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit_ext(const ext_op_params & params)
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
auto request_for_session = parseRequest(*params.data);
auto request_for_session = parseRequest(data);
if (!request_for_session.zxid)
request_for_session.zxid = params.log_idx;
request_for_session.zxid = log_idx;
/// Special processing of session_id request
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
@ -276,9 +272,8 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit_ext(const ext_op_params &
assertDigest(*request_for_session.digest, storage->getNodesDigest(true), *request_for_session.request, true);
}
last_committed_idx = params.log_idx;
commit_callback(request_for_session, params.log_term, params.log_idx);
ProfileEvents::increment(ProfileEvents::KeeperCommits);
last_committed_idx = log_idx;
return nullptr;
}
@ -311,7 +306,6 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys);
last_committed_idx = s.get_last_log_idx();
apply_snapshot_callback(s.get_last_log_term(), s.get_last_log_idx());
return true;
}
@ -326,10 +320,6 @@ void KeeperStateMachine::commit_config(const uint64_t /* log_idx */, nuraft::ptr
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
{
auto request_for_session = parseRequest(data);
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
// If we received a log from an older node, use the log_idx as the zxid
// log_idx will always be larger or equal to the zxid so we can safely do this
// (log_idx is increased for all logs, while zxid is only increased for requests)
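This hunk reverts the extended NuRaft commit API: `commit_ext(ext_op_params)` carried the log term alongside the index, while the restored `commit(log_idx, data)` passes only the index, and the commit/apply-snapshot callbacks disappear with it. What survives is the zxid fallback described in the comment above; a minimal sketch, with a hypothetical stand-in type (the real one is `KeeperStorage::RequestForSession`):

```cpp
#include <cstdint>

// Hypothetical stand-in for KeeperStorage::RequestForSession.
struct RequestForSession
{
    uint64_t zxid = 0;  // 0 means "not assigned by the leader"
};

// Logs from older nodes may arrive without a zxid. log_idx grows for every
// log entry while zxid grows only for requests, so log_idx >= zxid always
// holds and can safely stand in for a missing zxid.
void assignZxid(RequestForSession & request, uint64_t log_idx)
{
    if (!request.zxid)
        request.zxid = log_idx;
}
```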


@ -20,18 +20,13 @@ using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
class KeeperStateMachine : public nuraft::state_machine
{
public:
using CommitCallback = std::function<void(const KeeperStorage::RequestForSession &, uint64_t, uint64_t)>;
using ApplySnapshotCallback = std::function<void(uint64_t, uint64_t)>;
KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
const std::string & superdigest_ = "",
CommitCallback commit_callback_ = [](const KeeperStorage::RequestForSession &, uint64_t, uint64_t){},
ApplySnapshotCallback apply_snapshot_callback_ = [](uint64_t, uint64_t){});
const std::string & superdigest_ = "");
/// Read state from the latest snapshot
void init();
@ -42,7 +37,7 @@ public:
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit_ext(const ext_op_params & params) override; /// NOLINT
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT
/// Save new cluster config to our snapshot (copy of the config stored in StateManager)
void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override; /// NOLINT
@ -150,11 +145,6 @@ private:
/// Special part of ACL system -- superdigest specified in server config.
const std::string superdigest;
/// call when a request is committed
const CommitCallback commit_callback;
/// call when snapshot is applied
const ApplySnapshotCallback apply_snapshot_callback;
KeeperContextPtr keeper_context;
};


@ -1330,9 +1330,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
auto entry_buf = changelog.entry_at(i)->get_buf_ptr();
state_machine->pre_commit(i, *entry_buf);
state_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry_buf});
state_machine->pre_commit(i, changelog.entry_at(i)->get_buf());
state_machine->commit(i, changelog.entry_at(i)->get_buf());
bool snapshot_created = false;
if (i % settings->snapshot_distance == 0)
{
@ -1376,9 +1375,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
for (size_t i = restore_machine->last_commit_index() + 1; i < restore_changelog.next_slot(); ++i)
{
auto entry = changelog.entry_at(i)->get_buf_ptr();
restore_machine->pre_commit(i, *entry);
restore_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry});
restore_machine->pre_commit(i, changelog.entry_at(i)->get_buf());
restore_machine->commit(i, changelog.entry_at(i)->get_buf());
}
auto & source_storage = state_machine->getStorage();
@ -1479,18 +1477,18 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
request_c->path = "/hello";
request_c->is_ephemeral = true;
auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c)->get_buf_ptr();
state_machine->pre_commit(1, *entry_c);
state_machine->commit_ext(nuraft::state_machine::ext_op_params{1, entry_c});
auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c);
state_machine->pre_commit(1, entry_c->get_buf());
state_machine->commit(1, entry_c->get_buf());
const auto & storage = state_machine->getStorage();
EXPECT_EQ(storage.ephemerals.size(), 1);
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
request_d->path = "/hello";
/// Delete from other session
auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d)->get_buf_ptr();
state_machine->pre_commit(2, *entry_d);
state_machine->commit_ext(nuraft::state_machine::ext_op_params{2, entry_d});
auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d);
state_machine->pre_commit(2, entry_d->get_buf());
state_machine->commit(2, entry_d->get_buf());
EXPECT_EQ(storage.ephemerals.size(), 0);
}


@ -623,6 +623,7 @@ NamesAndTypesList Block::getNamesAndTypesList() const
NamesAndTypes Block::getNamesAndTypes() const
{
NamesAndTypes res;
res.reserve(columns());
for (const auto & elem : data)
res.emplace_back(elem.name, elem.type);
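Reserving up front is a small allocation fix: the element count is known before the loop, so the result vector can be sized once instead of growing geometrically. The same pattern in isolation, with illustrative names:

```cpp
#include <string>
#include <utility>
#include <vector>

std::vector<std::pair<std::string, std::string>> namesAndTypes(
    const std::vector<std::pair<std::string, std::string>> & data)
{
    std::vector<std::pair<std::string, std::string>> res;
    res.reserve(data.size());  // one allocation instead of repeated regrowth
    for (const auto & elem : data)
        res.emplace_back(elem.first, elem.second);
    return res;
}
```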


@ -136,6 +136,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \
M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \
M(Bool, enable_positional_arguments, true, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \
M(Bool, enable_extended_results_for_datetime_functions, false, "Enable date functions like toLastDayOfMonth to return Date32 results (instead of Date results) for Date32/DateTime64 arguments.", 0) \
\
M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \
\


@ -453,15 +453,19 @@ using SubcolumnsTreeWithColumns = SubcolumnsTree<ColumnWithTypeAndDimensions>;
using Node = SubcolumnsTreeWithColumns::Node;
/// Creates data type and column from tree of subcolumns.
ColumnWithTypeAndDimensions createTypeFromNode(const Node * node)
ColumnWithTypeAndDimensions createTypeFromNode(const Node & node)
{
auto collect_tuple_elemets = [](const auto & children)
{
if (children.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create type from empty Tuple or Nested node");
std::vector<std::tuple<String, ColumnWithTypeAndDimensions>> tuple_elements;
tuple_elements.reserve(children.size());
for (const auto & [name, child] : children)
{
auto column = createTypeFromNode(child.get());
assert(child);
auto column = createTypeFromNode(*child);
tuple_elements.emplace_back(name, std::move(column));
}
@ -475,13 +479,13 @@ ColumnWithTypeAndDimensions createTypeFromNode(const Node * node)
return std::make_tuple(std::move(tuple_names), std::move(tuple_columns));
};
if (node->kind == Node::SCALAR)
if (node.kind == Node::SCALAR)
{
return node->data;
return node.data;
}
else if (node->kind == Node::NESTED)
else if (node.kind == Node::NESTED)
{
auto [tuple_names, tuple_columns] = collect_tuple_elemets(node->children);
auto [tuple_names, tuple_columns] = collect_tuple_elemets(node.children);
Columns offsets_columns;
offsets_columns.reserve(tuple_columns[0].array_dimensions + 1);
@ -492,7 +496,7 @@ ColumnWithTypeAndDimensions createTypeFromNode(const Node * node)
/// `k1 Array(Nested(k2 Int, k3 Int))` and k1 is marked as Nested
/// and `k2` and `k3` have anonymous_array_level = 1 in that case.
const auto & current_array = assert_cast<const ColumnArray &>(*node->data.column);
const auto & current_array = assert_cast<const ColumnArray &>(*node.data.column);
offsets_columns.push_back(current_array.getOffsetsPtr());
auto first_column = tuple_columns[0].column;
@ -529,7 +533,7 @@ ColumnWithTypeAndDimensions createTypeFromNode(const Node * node)
}
else
{
auto [tuple_names, tuple_columns] = collect_tuple_elemets(node->children);
auto [tuple_names, tuple_columns] = collect_tuple_elemets(node.children);
size_t num_elements = tuple_columns.size();
Columns tuple_elements_columns(num_elements);
@ -587,6 +591,15 @@ std::pair<ColumnPtr, DataTypePtr> unflattenObjectToTuple(const ColumnObject & co
{
const auto & subcolumns = column.getSubcolumns();
if (subcolumns.empty())
{
auto type = std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeUInt8>()},
Names{ColumnObject::COLUMN_NAME_DUMMY});
return {type->createColumn()->cloneResized(column.size()), type};
}
PathsInData paths;
DataTypes types;
Columns columns;
@ -613,6 +626,9 @@ std::pair<ColumnPtr, DataTypePtr> unflattenTuple(
assert(paths.size() == tuple_types.size());
assert(paths.size() == tuple_columns.size());
if (paths.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unflatten empty Tuple");
/// We add all paths to the subcolumn tree and then create a type from it.
/// The tree stores column, type and number of array dimensions
/// for each intermediate node.
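These two hunks close the same hole from opposite ends: `unflattenObjectToTuple` now materializes an `Object` column without subcolumns as a `Tuple` with a single dummy `UInt8` element, and `unflattenTuple` asserts it is never asked to build an empty `Tuple`. A toy reduction of the fallback; type names are stand-ins:

```cpp
#include <string>
#include <vector>

struct TupleType
{
    std::vector<std::string> element_names;
    std::vector<std::string> element_types;
};

// An empty Tuple is not a valid type, so an Object column without any
// subcolumns becomes Tuple(_dummy UInt8) instead.
TupleType unflattenedType(const std::vector<std::string> & subcolumn_paths)
{
    if (subcolumn_paths.empty())
        return {{"_dummy"}, {"UInt8"}};  // mirrors COLUMN_NAME_DUMMY above

    TupleType res;
    for (const auto & path : subcolumn_paths)
    {
        res.element_names.push_back(path);
        res.element_types.push_back("String");  // placeholder per-path type
    }
    return res;
}
```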


@ -51,6 +51,8 @@ public:
using NodeKind = typename Node::Kind;
using NodePtr = std::shared_ptr<Node>;
SubcolumnsTree() : root(std::make_shared<Node>(Node::TUPLE)) {}
/// Add a leaf without any data in other nodes.
bool add(const PathInData & path, const NodeData & leaf_data)
{
@ -73,13 +75,9 @@ public:
bool add(const PathInData & path, const NodeCreator & node_creator)
{
const auto & parts = path.getParts();
if (parts.empty())
return false;
if (!root)
root = std::make_shared<Node>(Node::TUPLE);
Node * current_node = root.get();
for (size_t i = 0; i < parts.size() - 1; ++i)
{
@ -166,13 +164,13 @@ public:
return node;
}
bool empty() const { return root == nullptr; }
bool empty() const { return root->children.empty(); }
size_t size() const { return leaves.size(); }
using Nodes = std::vector<NodePtr>;
const Nodes & getLeaves() const { return leaves; }
const Node * getRoot() const { return root.get(); }
const Node & getRoot() const { return *root; }
using iterator = typename Nodes::iterator;
using const_iterator = typename Nodes::const_iterator;
@ -186,11 +184,11 @@ public:
private:
const Node * findImpl(const PathInData & path, bool find_exact) const
{
if (!root)
if (empty())
return nullptr;
const auto & parts = path.getParts();
const Node * current_node = root.get();
const auto * current_node = root.get();
for (const auto & part : parts)
{

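The tree now allocates its root eagerly, which lets `getRoot()` return a reference instead of a nullable pointer and reduces `empty()` to a children check. The invariant in miniature:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

struct Node
{
    std::map<std::string, std::shared_ptr<Node>> children;
};

class Tree
{
public:
    Tree() : root(std::make_shared<Node>()) {}  // root always exists

    bool empty() const { return root->children.empty(); }  // not "root == nullptr"
    const Node & getRoot() const { return *root; }          // never null, so a reference

private:
    std::shared_ptr<Node> root;
};

int main()
{
    Tree tree;
    assert(tree.empty());
    assert(tree.getRoot().children.empty());  // safe without a null check
}
```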

@ -82,6 +82,14 @@ struct ToStartOfWeekImpl
{
return time_zone.toFirstDayNumOfWeek(DayNum(d), week_mode);
}
static inline Int64 execute_extended_result(Int64 t, UInt8 week_mode, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t), week_mode);
}
static inline Int32 execute_extended_result(Int32 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d), week_mode);
}
using FactorTransform = ZeroTransform;
};
@ -115,7 +123,7 @@ struct ToWeekImpl
using FactorTransform = ToStartOfYearImpl;
};
template <typename FromType, typename ToType, typename Transform>
template <typename FromType, typename ToType, typename Transform, bool is_extended_result = false>
struct WeekTransformer
{
explicit WeekTransformer(Transform transform_)
@ -130,6 +138,9 @@ struct WeekTransformer
vec_to.resize(size);
for (size_t i = 0; i < size; ++i)
if constexpr (is_extended_result)
vec_to[i] = transform.execute_extended_result(vec_from[i], week_mode, time_zone);
else
vec_to[i] = transform.execute(vec_from[i], week_mode, time_zone);
}
@ -138,13 +149,13 @@ private:
};
template <typename FromDataType, typename ToDataType>
template <typename FromDataType, typename ToDataType, bool is_extended_result = false>
struct CustomWeekTransformImpl
{
template <typename Transform>
static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/, Transform transform = {})
{
const auto op = WeekTransformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform>{std::move(transform)};
const auto op = WeekTransformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform, is_extended_result>{std::move(transform)};
UInt8 week_mode = DEFAULT_WEEK_MODE;
if (arguments.size() > 1)

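The new `is_extended_result` flag is a compile-time switch: the transformer instantiates either the narrow `execute` path or the wide `execute_extended_result` path, so the extended overloads cost nothing when the flag is off. A self-contained model of the dispatch:

```cpp
#include <cstddef>
#include <cstdint>

struct ToWeekStart
{
    static uint16_t execute(uint16_t day) { return day - day % 7; }
    static int32_t execute_extended_result(int32_t day) { return day - day % 7; }  // wider range
};

template <typename FromType, typename ToType, typename Transform, bool is_extended_result = false>
void transform(const FromType * from, ToType * to, size_t size)
{
    for (size_t i = 0; i < size; ++i)
        if constexpr (is_extended_result)
            to[i] = Transform::execute_extended_result(from[i]);
        else
            to[i] = Transform::execute(from[i]);
}

int main()
{
    uint16_t d16[2] = {10, 20};
    uint16_t r16[2];
    transform<uint16_t, uint16_t, ToWeekStart>(d16, r16, 2);      // Date-like path

    int32_t d32[2] = {0, 70000};
    int32_t r32[2];
    transform<int32_t, int32_t, ToWeekStart, true>(d32, r32, 2);  // Date32-like path
}
```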

@ -161,7 +161,14 @@ struct ToMondayImpl
{
return time_zone.toFirstDayNumOfWeek(DayNum(d));
}
static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t));
}
static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d));
}
using FactorTransform = ZeroTransform;
};
@ -185,6 +192,14 @@ struct ToStartOfMonthImpl
{
return time_zone.toFirstDayNumOfMonth(DayNum(d));
}
static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfMonth(time_zone.toDayNum(t));
}
static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfMonth(ExtendedDayNum(d));
}
using FactorTransform = ZeroTransform;
};
@ -218,7 +233,14 @@ struct ToLastDayOfMonthImpl
/// 0xFFF9 is the Int value for 2149-05-31 -- the last day for which we can actually compute LastDayOfMonth. This will also be the return value.
return time_zone.toLastDayNumOfMonth(DayNum(std::min(d, UInt16(0xFFF9))));
}
static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toLastDayNumOfMonth(time_zone.toDayNum(t));
}
static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toLastDayNumOfMonth(ExtendedDayNum(d));
}
using FactorTransform = ZeroTransform;
};
@ -242,7 +264,14 @@ struct ToStartOfQuarterImpl
{
return time_zone.toFirstDayNumOfQuarter(DayNum(d));
}
static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfQuarter(time_zone.toDayNum(t));
}
static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfQuarter(ExtendedDayNum(d));
}
using FactorTransform = ZeroTransform;
};
@ -266,6 +295,14 @@ struct ToStartOfYearImpl
{
return time_zone.toFirstDayNumOfYear(DayNum(d));
}
static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfYear(time_zone.toDayNum(t));
}
static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfYear(ExtendedDayNum(d));
}
using FactorTransform = ZeroTransform;
};
@ -893,7 +930,7 @@ struct ToStartOfISOYearImpl
static inline UInt16 execute(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfISOYear(time_zone.toDayNum(t));
return t < 0 ? 0 : time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(std::min(Int32(time_zone.toDayNum(t)), Int32(DATE_LUT_MAX_DAY_NUM))));
}
static inline UInt16 execute(UInt32 t, const DateLUTImpl & time_zone)
{
@ -901,12 +938,20 @@ struct ToStartOfISOYearImpl
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(d));
return d < 0 ? 0 : time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(std::min(d, Int32(DATE_LUT_MAX_DAY_NUM))));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfISOYear(DayNum(d));
}
static inline Int64 execute_extended_result(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfISOYear(time_zone.toDayNum(t));
}
static inline Int32 execute_extended_result(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(d));
}
using FactorTransform = ZeroTransform;
};
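For the non-extended path, `toStartOfISOYear` now saturates instead of reading outside the date LUT: negative day numbers collapse to 0 (the epoch) and large ones are clamped to the last LUT day before the lookup. The guard in isolation; the constant's value here is an assumption for the sketch:

```cpp
#include <algorithm>
#include <cstdint>

// Assumed for the sketch; the real constant is defined by the Date LUT.
constexpr int32_t DATE_LUT_MAX_DAY_NUM = 0xFFFF;

uint16_t saturateDayNum(int32_t day)
{
    if (day < 0)
        return 0;  // anything before 1970-01-01 maps to the epoch
    return static_cast<uint16_t>(std::min(day, DATE_LUT_MAX_DAY_NUM));
}
```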
@ -1201,7 +1246,7 @@ struct ToYYYYMMDDhhmmssImpl
};
template <typename FromType, typename ToType, typename Transform>
template <typename FromType, typename ToType, typename Transform, bool is_extended_result = false>
struct Transformer
{
template <typename FromTypeVector, typename ToTypeVector>
@ -1211,18 +1256,21 @@ struct Transformer
vec_to.resize(size);
for (size_t i = 0; i < size; ++i)
if constexpr (is_extended_result)
vec_to[i] = transform.execute_extended_result(vec_from[i], time_zone);
else
vec_to[i] = transform.execute(vec_from[i], time_zone);
}
};
template <typename FromDataType, typename ToDataType, typename Transform>
template <typename FromDataType, typename ToDataType, typename Transform, bool is_extended_result = false>
struct DateTimeTransformImpl
{
static ColumnPtr execute(
const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/, const Transform & transform = {})
{
using Op = Transformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform>;
using Op = Transformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform, is_extended_result>;
const ColumnPtr source_col = arguments[0].column;
if (const auto * sources = checkAndGetColumn<typename FromDataType::ColumnType>(source_col.get()))


@ -0,0 +1,78 @@
#pragma once
#include <Functions/IFunctionCustomWeek.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
template <typename Transform>
class FunctionCustomWeekToDateOrDate32 : public IFunctionCustomWeek<Transform>, WithContext
{
public:
const bool enable_extended_results_for_datetime_functions = false;
static FunctionPtr create(ContextPtr context_)
{
return std::make_shared<FunctionCustomWeekToDateOrDate32>(context_);
}
explicit FunctionCustomWeekToDateOrDate32(ContextPtr context_)
: WithContext(context_)
, enable_extended_results_for_datetime_functions(context_->getSettingsRef().enable_extended_results_for_datetime_functions)
{
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
this->checkArguments(arguments, /*is_result_type_date_or_date32*/ true);
const IDataType * from_type = arguments[0].type.get();
WhichDataType which(from_type);
if ((which.isDate32() || which.isDateTime64()) && enable_extended_results_for_datetime_functions)
return std::make_shared<DataTypeDate32>();
else
return std::make_shared<DataTypeDate>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const IDataType * from_type = arguments[0].type.get();
WhichDataType which(from_type);
if (which.isDate())
return CustomWeekTransformImpl<DataTypeDate, DataTypeDate>::execute(
arguments, result_type, input_rows_count, Transform{});
else if (which.isDate32())
if (enable_extended_results_for_datetime_functions)
return CustomWeekTransformImpl<DataTypeDate32, DataTypeDate32, /*is_extended_result*/ true>::execute(
arguments, result_type, input_rows_count, Transform{});
else
return CustomWeekTransformImpl<DataTypeDate32, DataTypeDate>::execute(
arguments, result_type, input_rows_count, Transform{});
else if (which.isDateTime())
return CustomWeekTransformImpl<DataTypeDateTime, DataTypeDate>::execute(
arguments, result_type, input_rows_count, Transform{});
else if (which.isDateTime64())
{
if (enable_extended_results_for_datetime_functions)
return CustomWeekTransformImpl<DataTypeDateTime64, DataTypeDate32, /*is_extended_result*/ true>::execute(
arguments, result_type, input_rows_count,
TransformDateTime64<Transform>{assert_cast<const DataTypeDateTime64 *>(from_type)->getScale()});
else
return CustomWeekTransformImpl<DataTypeDateTime64, DataTypeDate>::execute(
arguments, result_type, input_rows_count,
TransformDateTime64<Transform>{assert_cast<const DataTypeDateTime64 *>(from_type)->getScale()});
}
else
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
};
}
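The decision in `getReturnTypeImpl` of this wrapper (and of its date-time counterpart below) boils down to a two-by-two mapping: only the wide argument types, `Date32` and `DateTime64`, are promoted, and only when the setting is on. A hypothetical helper making that table explicit:

```cpp
#include <cassert>

enum class Type { Date, Date32, DateTime, DateTime64 };

Type resultType(Type argument, bool enable_extended_results_for_datetime_functions)
{
    const bool wide_argument = argument == Type::Date32 || argument == Type::DateTime64;
    return (wide_argument && enable_extended_results_for_datetime_functions)
        ? Type::Date32
        : Type::Date;
}

int main()
{
    assert(resultType(Type::Date, true) == Type::Date);          // narrow stays narrow
    assert(resultType(Type::DateTime64, false) == Type::Date);   // setting off: legacy type
    assert(resultType(Type::DateTime64, true) == Type::Date32);  // setting on: extended range
}
```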


@ -1,14 +1,5 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Functions/CustomWeekTransforms.h>
#include <Functions/IFunction.h>
#include <Functions/TransformDateTime64.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context_fwd.h>
#include <Functions/IFunctionCustomWeek.h>
namespace DB
{
@ -16,82 +7,23 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/// See CustomWeekTransforms.h
template <typename ToDataType, typename Transform>
class FunctionCustomWeekToSomething : public IFunction
class FunctionCustomWeekToSomething : public IFunctionCustomWeek<Transform>
{
public:
static constexpr auto name = Transform::name;
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionCustomWeekToSomething>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() == 1)
{
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Must be Date, Date32, DateTime or DateTime64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (arguments.size() == 2)
{
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of 1st argument of function " + getName()
+ ". Must be Date, Date32, DateTime or DateTime64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isUInt8(arguments[1].type))
throw Exception(
"Illegal type of 2nd (optional) argument of function " + getName()
+ ". Must be constant UInt8 (week mode).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (arguments.size() == 3)
{
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Must be Date, Date32, DateTime or DateTime64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isUInt8(arguments[1].type))
throw Exception(
"Illegal type of 2nd (optional) argument of function " + getName()
+ ". Must be constant UInt8 (week mode).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[2].type))
throw Exception(
"Illegal type of 3rd (optional) argument of function " + getName()
+ ". Must be constant string (timezone name).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if ((isDate(arguments[0].type) || isDate32(arguments[0].type))
&& (std::is_same_v<ToDataType, DataTypeDate> || std::is_same_v<ToDataType, DataTypeDate32>))
throw Exception(
"The timezone argument of function " + getName() + " is allowed only when the 1st argument is DateTime or DateTime64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", expected 1, 2 or 3.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
this->checkArguments(arguments);
return std::make_shared<ToDataType>();
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const IDataType * from_type = arguments[0].type.get();
@ -114,44 +46,10 @@ public:
}
else
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
bool hasInformationAboutMonotonicity() const override { return true; }
Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return { .is_monotonic = true, .is_always_monotonic = true };
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
/// This method is called only if the function has one argument. Therefore, we do not care about the non-local time zone.
const DateLUTImpl & date_lut = DateLUT::instance();
if (left.isNull() || right.isNull())
return {};
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), date_lut)
? is_monotonic
: is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), date_lut)
? is_monotonic
: is_not_monotonic;
}
}
};
}


@ -0,0 +1,81 @@
#pragma once
#include <Functions/IFunctionDateOrDateTime.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
template <typename Transform>
class FunctionDateOrDateTimeToDateOrDate32 : public IFunctionDateOrDateTime<Transform>, WithContext
{
public:
const bool enable_extended_results_for_datetime_functions = false;
static FunctionPtr create(ContextPtr context_)
{
return std::make_shared<FunctionDateOrDateTimeToDateOrDate32>(context_);
}
explicit FunctionDateOrDateTimeToDateOrDate32(ContextPtr context_)
: WithContext(context_)
, enable_extended_results_for_datetime_functions(context_->getSettingsRef().enable_extended_results_for_datetime_functions)
{
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
this->checkArguments(arguments, /*is_result_type_date_or_date32*/ true);
const IDataType * from_type = arguments[0].type.get();
WhichDataType which(from_type);
/// If the time zone is specified but empty, throw an exception.
/// Only validate the time_zone part if the number of arguments is 2.
if ((which.isDateTime() || which.isDateTime64()) && arguments.size() == 2
&& extractTimeZoneNameFromFunctionArguments(arguments, 1, 0).empty())
throw Exception(
"Function " + this->getName() + " supports a 2nd argument (optional) that must be non-empty and be a valid time zone",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if ((which.isDate32() || which.isDateTime64()) && enable_extended_results_for_datetime_functions)
return std::make_shared<DataTypeDate32>();
else
return std::make_shared<DataTypeDate>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const IDataType * from_type = arguments[0].type.get();
WhichDataType which(from_type);
if (which.isDate())
return DateTimeTransformImpl<DataTypeDate, DataTypeDate, Transform>::execute(arguments, result_type, input_rows_count);
else if (which.isDate32())
if (enable_extended_results_for_datetime_functions)
return DateTimeTransformImpl<DataTypeDate32, DataTypeDate32, Transform, /*is_extended_result*/ true>::execute(arguments, result_type, input_rows_count);
else
return DateTimeTransformImpl<DataTypeDate32, DataTypeDate, Transform>::execute(arguments, result_type, input_rows_count);
else if (which.isDateTime())
return DateTimeTransformImpl<DataTypeDateTime, DataTypeDate, Transform>::execute(arguments, result_type, input_rows_count);
else if (which.isDateTime64())
{
const auto scale = static_cast<const DataTypeDateTime64 *>(from_type)->getScale();
const TransformDateTime64<Transform> transformer(scale);
if (enable_extended_results_for_datetime_functions)
return DateTimeTransformImpl<DataTypeDateTime64, DataTypeDate32, decltype(transformer), /*is_extended_result*/ true>::execute(arguments, result_type, input_rows_count, transformer);
else
return DateTimeTransformImpl<DataTypeDateTime64, DataTypeDate, decltype(transformer)>::execute(arguments, result_type, input_rows_count, transformer);
}
else
throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
};
}


@ -1,14 +1,5 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/TransformDateTime64.h>
#include <IO/WriteHelpers.h>
#include <Functions/IFunctionDateOrDateTime.h>
namespace DB
{
@ -16,59 +7,18 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/// See DateTimeTransforms.h
template <typename ToDataType, typename Transform>
class FunctionDateOrDateTimeToSomething : public IFunction
class FunctionDateOrDateTimeToSomething : public IFunctionDateOrDateTime<Transform>
{
public:
static constexpr auto name = Transform::name;
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionDateOrDateTimeToSomething>(); }
String getName() const override
{
return name;
}
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() == 1)
{
if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be a date or a date with time",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (arguments.size() == 2)
{
if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be a date or a date with time",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[1].type))
throw Exception(
"Function " + getName() + " supports 1 or 2 arguments. The 1st argument "
"must be of type Date or DateTime. The 2nd argument (optional) must be "
"a constant string with timezone name",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) && (std::is_same_v<ToDataType, DataTypeDate> || std::is_same_v<ToDataType, DataTypeDate32>))
throw Exception(
"The timezone argument of function " + getName() + " is allowed only when the 1st argument has the type DateTime",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", should be 1 or 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
this->checkArguments(arguments, (std::is_same_v<ToDataType, DataTypeDate> || std::is_same_v<ToDataType, DataTypeDate32>));
/// For DateTime, if time zone is specified, attach it to type.
/// If the time zone is specified but empty, throw an exception.
@ -79,7 +29,7 @@ public:
/// to accommodate functions like toStartOfDay(today()), toStartOfDay(yesterday()) etc.
if (arguments.size() == 2 && time_zone.empty())
throw Exception(
"Function " + getName() + " supports a 2nd argument (optional) that must be non-empty and be a valid time zone",
"Function " + this->getName() + " supports a 2nd argument (optional) that must be non-empty and be a valid time zone",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<ToDataType>(time_zone);
}
@ -109,9 +59,6 @@ public:
return std::make_shared<ToDataType>();
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const IDataType * from_type = arguments[0].type.get();
@ -131,51 +78,10 @@ public:
return DateTimeTransformImpl<DataTypeDateTime64, ToDataType, decltype(transformer)>::execute(arguments, result_type, input_rows_count, transformer);
}
else
throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + getName(),
throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + this->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
bool hasInformationAboutMonotonicity() const override
{
return true;
}
Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return { .is_monotonic = true, .is_always_monotonic = true };
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(&type))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
}
};
}


@ -649,7 +649,7 @@ public:
for (unsigned int region_id : region_ids)
{
const StringRef & name_ref = dict.getRegionName(region_id, language);
col_to->insertDataWithTerminatingZero(name_ref.data, name_ref.size + 1);
col_to->insertData(name_ref.data, name_ref.size);
}
return col_to;


@ -0,0 +1,122 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Functions/CustomWeekTransforms.h>
#include <Functions/IFunction.h>
#include <Functions/TransformDateTime64.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Transform>
class IFunctionCustomWeek : public IFunction
{
public:
static constexpr auto name = Transform::name;
String getName() const override { return name; }
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; }
bool hasInformationAboutMonotonicity() const override { return true; }
Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return {.is_monotonic = true, .is_always_monotonic = true};
const IFunction::Monotonicity is_monotonic = {.is_monotonic = true};
const IFunction::Monotonicity is_not_monotonic;
/// This method is called only if the function has one argument. Therefore, we do not care about the non-local time zone.
const DateLUTImpl & date_lut = DateLUT::instance();
if (left.isNull() || right.isNull())
return {};
/// The function is monotonic on the [left, right] segment if the factor transformation returns the same values for both endpoints.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), date_lut)
? is_monotonic
: is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), date_lut)
? is_monotonic
: is_not_monotonic;
}
}
protected:
void checkArguments(const ColumnsWithTypeAndName & arguments, bool is_result_type_date_or_date32 = false) const
{
if (arguments.size() == 1)
{
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Must be Date, Date32, DateTime or DateTime64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (arguments.size() == 2)
{
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of 1st argument of function " + getName()
+ ". Must be Date, Date32, DateTime or DateTime64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isUInt8(arguments[1].type))
throw Exception(
"Illegal type of 2nd (optional) argument of function " + getName()
+ ". Must be constant UInt8 (week mode).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (arguments.size() == 3)
{
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Must be Date, Date32, DateTime or DateTime64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isUInt8(arguments[1].type))
throw Exception(
"Illegal type of 2nd (optional) argument of function " + getName()
+ ". Must be constant UInt8 (week mode).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[2].type))
throw Exception(
"Illegal type of 3rd (optional) argument of function " + getName()
+ ". Must be constant string (timezone name).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) && is_result_type_date_or_date32)
throw Exception(
"The timezone argument of function " + getName() + " is allowed only when the 1st argument is DateTime or DateTime64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", expected 1, 2 or 3.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
};
}
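The monotonicity rule hoisted into this base class is worth spelling out: a transform with period P (say, "day of week") is monotonic on a range exactly when the coarser `FactorTransform` (say, "start of week") maps both endpoints to the same value, i.e. the whole range lies inside one period. A toy model:

```cpp
#include <cassert>

int startOfWeek(int day) { return day - day % 7; }  // the factor transform
int dayOfWeek(int day) { return day % 7; }          // the function itself

bool monotonicOnRange(int left, int right)
{
    return startOfWeek(left) == startOfWeek(right);
}

int main()
{
    assert(monotonicOnRange(14, 18));   // same week: day-of-week grows monotonically
    assert(!monotonicOnRange(18, 22));  // crosses a week boundary: the value wraps
}
```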


@ -0,0 +1,118 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/TransformDateTime64.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Transform>
class IFunctionDateOrDateTime : public IFunction
{
public:
static constexpr auto name = Transform::name;
String getName() const override { return name; }
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool hasInformationAboutMonotonicity() const override
{
return true;
}
Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return { .is_monotonic = true, .is_always_monotonic = true };
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
/// The function is monotonic on the [left, right] segment if the factor transformation returns the same values for both endpoints.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(&type))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
}
protected:
void checkArguments(const ColumnsWithTypeAndName & arguments, bool is_result_type_date_or_date32) const
{
if (arguments.size() == 1)
{
if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be Date, Date32, DateTime or DateTime64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (arguments.size() == 2)
{
if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be Date, Date32, DateTime or DateTime64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[1].type))
throw Exception(
"Function " + getName() + " supports 1 or 2 arguments. The optional 2nd argument must be "
"a constant string with a timezone name",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) && is_result_type_date_or_date32)
throw Exception(
"The timezone argument of function " + getName() + " is allowed only when the 1st argument has the type DateTime or DateTime64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", should be 1 or 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
};
}


@ -87,6 +87,46 @@ public:
return wrapped_transform.execute(t, std::forward<Args>(args)...);
}
template <typename ... Args>
inline auto NO_SANITIZE_UNDEFINED execute_extended_result(const DateTime64 & t, Args && ... args) const
{
/// Type conversion from float to integer may be required.
/// We are OK with an implementation-specific result for out-of-range values and denormal conversions.
if constexpr (TransformHasExecuteOverload_v<DateTime64, decltype(scale_multiplier), Args...>)
{
return wrapped_transform.execute_extended_result(t, scale_multiplier, std::forward<Args>(args)...);
}
else if constexpr (TransformHasExecuteOverload_v<DecimalUtils::DecimalComponents<DateTime64>, Args...>)
{
auto components = DecimalUtils::splitWithScaleMultiplier(t, scale_multiplier);
const auto result = wrapped_transform.execute_extended_result(components, std::forward<Args>(args)...);
using ResultType = std::decay_t<decltype(result)>;
if constexpr (std::is_same_v<DecimalUtils::DecimalComponents<DateTime64>, ResultType>)
{
return DecimalUtils::decimalFromComponentsWithMultiplier<DateTime64>(result, scale_multiplier);
}
else
{
return result;
}
}
else
{
const auto components = DecimalUtils::splitWithScaleMultiplier(t, scale_multiplier);
return wrapped_transform.execute_extended_result(static_cast<Int64>(components.whole), std::forward<Args>(args)...);
}
}
template <typename T, typename ... Args, typename = std::enable_if_t<!std::is_same_v<T, DateTime64>>>
inline auto execute_extended_result(const T & t, Args && ... args) const
{
return wrapped_transform.execute_extended_result(t, std::forward<Args>(args)...);
}
private:
DateTime64::NativeType scale_multiplier = 1;
Transform wrapped_transform = {};
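The fallback branch above forwards only whole seconds to transforms that cannot consume a `DateTime64` directly; the value is split by the column's scale multiplier first. A sketch of that split (negative values would need floor semantics, which the sketch omits):

```cpp
#include <cstdint>
#include <iostream>

struct DecimalComponents
{
    int64_t whole;       // seconds
    int64_t fractional;  // sub-second part at the column's scale
};

DecimalComponents splitWithScaleMultiplier(int64_t value, int64_t scale_multiplier)
{
    return {value / scale_multiplier, value % scale_multiplier};
}

int main()
{
    // A DateTime64(3) value: 1694966400123 encodes 1694966400.123 seconds.
    const auto components = splitWithScaleMultiplier(1694966400123, 1000);
    std::cout << components.whole << " s + " << components.fractional << " ms\n";
}
```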


@ -83,7 +83,7 @@ public:
for (size_t i = 0; i < input_rows_count; ++i)
{
if (const auto * symbol = symbol_index.findSymbol(reinterpret_cast<const void *>(data[i])))
result_column->insertDataWithTerminatingZero(symbol->name, strlen(symbol->name) + 1);
result_column->insertData(symbol->name, strlen(symbol->name));
else
result_column->insertDefault();
}


@ -78,15 +78,15 @@ public:
for (size_t i = 0; i < input_rows_count; ++i)
{
StringRef source = column_concrete->getDataAtWithTerminatingZero(i);
StringRef source = column_concrete->getDataAt(i);
auto demangled = tryDemangle(source.data);
if (demangled)
{
result_column->insertDataWithTerminatingZero(demangled.get(), strlen(demangled.get()) + 1);
result_column->insertData(demangled.get(), strlen(demangled.get()));
}
else
{
result_column->insertDataWithTerminatingZero(source.data, source.size);
result_column->insertData(source.data, source.size);
}
}
@ -102,4 +102,3 @@ REGISTER_FUNCTION(Demangle)
}
}
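This and the neighbouring hunks retire the `insertDataWithTerminatingZero`/`getDataAtWithTerminatingZero` pair: call sites now pass the exact payload length, so every `+ 1` for the trailing NUL disappears. The shape of the migration, with a toy column type:

```cpp
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

struct ColumnStringLike
{
    std::vector<std::string> rows;

    // New-style API: length counts only the payload, never a trailing NUL.
    void insertData(const char * pos, size_t length) { rows.emplace_back(pos, length); }
};

int main()
{
    ColumnStringLike column;
    const char * demangled = "DB::ColumnString";
    // Was: insertDataWithTerminatingZero(demangled, strlen(demangled) + 1)
    column.insertData(demangled, std::strlen(demangled));
}
```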


@ -1,6 +1,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/CustomWeekTransforms.h>
#include <Functions/FunctionCustomWeekToSomething.h>
#include <Functions/FunctionCustomWeekToDateOrDate32.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
@ -9,7 +10,7 @@ namespace DB
{
using FunctionToWeek = FunctionCustomWeekToSomething<DataTypeUInt8, ToWeekImpl>;
using FunctionToYearWeek = FunctionCustomWeekToSomething<DataTypeUInt32, ToYearWeekImpl>;
using FunctionToStartOfWeek = FunctionCustomWeekToSomething<DataTypeDate, ToStartOfWeekImpl>;
using FunctionToStartOfWeek = FunctionCustomWeekToDateOrDate32<ToStartOfWeekImpl>;
REGISTER_FUNCTION(ToCustomWeek)
{


@ -1,12 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <Functions/FunctionDateOrDateTimeToDateOrDate32.h>
namespace DB
{
using FunctionToLastDayOfMonth = FunctionDateOrDateTimeToSomething<DataTypeDate, ToLastDayOfMonthImpl>;
using FunctionToLastDayOfMonth = FunctionDateOrDateTimeToDateOrDate32<ToLastDayOfMonthImpl>;
REGISTER_FUNCTION(ToLastDayOfMonth)
{


@ -1,12 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <Functions/FunctionDateOrDateTimeToDateOrDate32.h>
namespace DB
{
using FunctionToMonday = FunctionDateOrDateTimeToSomething<DataTypeDate, ToMondayImpl>;
using FunctionToMonday = FunctionDateOrDateTimeToDateOrDate32<ToMondayImpl>;
REGISTER_FUNCTION(ToMonday)
{


@ -1,12 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <Functions/FunctionDateOrDateTimeToDateOrDate32.h>
namespace DB
{
using FunctionToStartOfISOYear = FunctionDateOrDateTimeToSomething<DataTypeDate, ToStartOfISOYearImpl>;
using FunctionToStartOfISOYear = FunctionDateOrDateTimeToDateOrDate32<ToStartOfISOYearImpl>;
REGISTER_FUNCTION(ToStartOfISOYear)
{


@ -1,12 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <Functions/FunctionDateOrDateTimeToDateOrDate32.h>
namespace DB
{
using FunctionToStartOfMonth = FunctionDateOrDateTimeToSomething<DataTypeDate, ToStartOfMonthImpl>;
using FunctionToStartOfMonth = FunctionDateOrDateTimeToDateOrDate32<ToStartOfMonthImpl>;
REGISTER_FUNCTION(ToStartOfMonth)
{


@ -1,12 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <Functions/FunctionDateOrDateTimeToDateOrDate32.h>
namespace DB
{
using FunctionToStartOfQuarter = FunctionDateOrDateTimeToSomething<DataTypeDate, ToStartOfQuarterImpl>;
using FunctionToStartOfQuarter = FunctionDateOrDateTimeToDateOrDate32<ToStartOfQuarterImpl>;
REGISTER_FUNCTION(ToStartOfQuarter)
{


@ -1,12 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <Functions/FunctionDateOrDateTimeToDateOrDate32.h>
namespace DB
{
using FunctionToStartOfYear = FunctionDateOrDateTimeToSomething<DataTypeDate, ToStartOfYearImpl>;
using FunctionToStartOfYear = FunctionDateOrDateTimeToDateOrDate32<ToStartOfYearImpl>;
REGISTER_FUNCTION(ToStartOfYear)
{


@ -131,18 +131,18 @@ bool ReadBufferFromS3::nextImpl()
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
break;
}
catch (const Exception & e)
catch (Exception & e)
{
watch.stop();
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3RequestsErrors, 1);
if (const auto * s3_exception = dynamic_cast<const S3Exception *>(&e))
if (auto * s3_exception = dynamic_cast<S3Exception *>(&e))
{
/// It doesn't make sense to retry Access Denied or No Such Key
if (!s3_exception->isRetryableError())
{
tryLogCurrentException(log, fmt::format("while reading key: {}, from bucket: {}", key, bucket));
s3_exception->addMessage("while reading key: {}, from bucket: {}", key, bucket);
throw;
}
}
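Catching by non-const reference lets the retry loop attach the key/bucket context to the exception object itself and rethrow it, instead of logging the context separately. The pattern reduced to standard C++ (`S3Exception` and `addMessage` are the real names from the diff; the type below is a stand-in):

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

struct S3LikeError : std::runtime_error
{
    using std::runtime_error::runtime_error;
    std::string context;
    void addMessage(const std::string & msg) { context += msg; }
    bool isRetryableError() const { return false; }  // e.g. Access Denied / No Such Key
};

void read(const std::string & key, const std::string & bucket)
{
    try
    {
        throw S3LikeError("Access Denied");
    }
    catch (S3LikeError & e)  // non-const: the handler mutates the exception
    {
        if (!e.isRetryableError())
        {
            e.addMessage("while reading key: " + key + ", from bucket: " + bucket);
            throw;  // rethrows the same, now enriched, exception object
        }
    }
}

int main()
{
    try { read("a.bin", "data"); }
    catch (const S3LikeError & e) { std::cout << e.what() << ": " << e.context << '\n'; }
}
```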


@ -55,7 +55,7 @@ public:
bool isRetryableError() const;
private:
const Aws::S3::S3Errors code;
Aws::S3::S3Errors code;
};
}


@ -457,6 +457,8 @@ try
format->addBuffer(std::move(last_buffer));
if (total_rows)
{
auto chunk = Chunk(executor.getResultColumns(), total_rows);
size_t total_bytes = chunk.bytes();
@ -468,6 +470,7 @@ try
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(key.query));
}
for (const auto & entry : data->entries)
if (!entry->isFinished())


@ -418,7 +418,7 @@ void optimizeDuplicateDistinct(ASTSelectQuery & select)
return;
std::unordered_set<String> distinct_names = getDistinctNames(*subselect);
std::unordered_set<String> selected_names;
std::unordered_set<std::string_view> selected_names;
/// Check source column names from select list (ignore aliases and table names)
for (const auto & id : select.select()->children)
@ -427,11 +427,11 @@ void optimizeDuplicateDistinct(ASTSelectQuery & select)
if (!identifier)
return;
String name = identifier->shortName();
const String & name = identifier->shortName();
if (!distinct_names.contains(name))
return; /// Not a distinct column, keep DISTINCT for it.
selected_names.insert(name);
selected_names.emplace(name);
}
/// select columns list != distinct columns list
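Switching the set to `std::string_view` avoids copying every identifier name; it is safe here only because the viewed strings are owned by the query AST, which outlives the set. A reduced form of the loop, with illustrative names:

```cpp
#include <string>
#include <string_view>
#include <unordered_set>
#include <vector>

// True when the select list consists exactly of the distinct columns.
// Requires C++20 for unordered_set::contains.
bool coversAllDistinct(const std::vector<std::string> & selected,
                       const std::unordered_set<std::string> & distinct_names)
{
    std::unordered_set<std::string_view> selected_names;
    for (const auto & name : selected)
    {
        if (!distinct_names.contains(name))
            return false;              // not a distinct column, keep DISTINCT
        selected_names.emplace(name);  // stores a view into `selected`, no copy
    }
    return selected_names.size() == distinct_names.size();
}
```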


@ -250,13 +250,13 @@ namespace
if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)
{
std::string string_id = value.toString();
assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string_id.data(), string_id.size() + 1);
assert_cast<ColumnString &>(column).insertData(string_id.data(), string_id.size());
break;
}
else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)
{
String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string.data(), string.size() + 1);
assert_cast<ColumnString &>(column).insertData(string.data(), string.size());
break;
}


@ -204,13 +204,12 @@ DataPartStorageBuilderPtr DataPartStorageOnDisk::getBuilder() const
}
void DataPartStorageOnDisk::remove(
bool can_remove_shared_data,
const NameSet & names_not_to_remove,
CanRemoveCallback && can_remove_callback,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log) const
Poco::Logger * log)
{
/// NOTE We rename part to delete_tmp_<relative_path> instead of delete_tmp_<name> to avoid race condition
/// when we try to remove two parts with the same name, but different relative paths,
@ -239,13 +238,16 @@ void DataPartStorageOnDisk::remove(
fs::path to = fs::path(root_path) / part_dir_without_slash;
std::optional<CanRemoveDescription> can_remove_description;
auto disk = volume->getDisk();
if (disk->exists(to))
{
LOG_WARNING(log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
try
{
disk->removeSharedRecursive(fs::path(to) / "", !can_remove_shared_data, names_not_to_remove);
can_remove_description.emplace(can_remove_callback());
disk->removeSharedRecursive(fs::path(to) / "", !can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove);
}
catch (...)
{
@ -257,6 +259,7 @@ void DataPartStorageOnDisk::remove(
try
{
disk->moveDirectory(from, to);
onRename(root_path, part_dir_without_slash);
}
catch (const fs::filesystem_error & e)
{
@ -268,6 +271,9 @@ void DataPartStorageOnDisk::remove(
throw;
}
if (!can_remove_description)
can_remove_description.emplace(can_remove_callback());
// Record existing projection directories so we don't remove them twice
std::unordered_set<String> projection_directories;
std::string proj_suffix = ".proj";
@ -278,7 +284,7 @@ void DataPartStorageOnDisk::remove(
clearDirectory(
fs::path(to) / proj_dir_name,
can_remove_shared_data, names_not_to_remove, projection.checksums, {}, is_temp, state, log, true);
can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, projection.checksums, {}, is_temp, state, log, true);
}
/// It is possible that we are removing the part which have a written but not loaded projection.
@ -305,7 +311,7 @@ void DataPartStorageOnDisk::remove(
clearDirectory(
fs::path(to) / name,
can_remove_shared_data, names_not_to_remove, tmp_checksums, {}, is_temp, state, log, true);
can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, tmp_checksums, {}, is_temp, state, log, true);
}
catch (...)
{
@ -315,7 +321,7 @@ void DataPartStorageOnDisk::remove(
}
}
clearDirectory(to, can_remove_shared_data, names_not_to_remove, checksums, projection_directories, is_temp, state, log, false);
clearDirectory(to, can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, checksums, projection_directories, is_temp, state, log, false);
}
void DataPartStorageOnDisk::clearDirectory(


@ -45,13 +45,12 @@ public:
void checkConsistency(const MergeTreeDataPartChecksums & checksums) const override;
void remove(
bool can_remove_shared_data,
const NameSet & names_not_to_remove,
CanRemoveCallback && can_remove_callback,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log) const override;
Poco::Logger * log) override;
std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const override;


@ -12,6 +12,13 @@ namespace DB
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
struct CanRemoveDescription
{
bool can_remove_anything;
NameSet files_not_to_remove;
};
using CanRemoveCallback = std::function<CanRemoveDescription()>;
class IDataPartStorageIterator
{
@ -113,13 +120,12 @@ public:
/// can_remove_shared_data, names_not_to_remove are specific to DiskObjectStorage.
/// projections, checksums are needed to avoid recursive listing
virtual void remove(
bool can_remove_shared_data,
const NameSet & names_not_to_remove,
CanRemoveCallback && can_remove_callback,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log) const = 0;
Poco::Logger * log) = 0;
/// Get a name like 'prefix_partdir_tryN' which does not exist in a root dir.
/// TODO: remove it.


@ -1433,14 +1433,19 @@ void IMergeTreeDataPart::remove() const
assert(assertHasValidVersionMetadata());
part_is_probably_removed_from_disk = true;
auto can_remove_callback = [this] ()
{
auto [can_remove, files_not_to_remove] = canRemovePart();
if (!can_remove)
LOG_TRACE(storage.log, "Blobs of part {} cannot be removed", name);
if (!files_not_to_remove.empty())
LOG_TRACE(storage.log, "Some blobs ({}) of part {} cannot be removed", fmt::join(files_not_to_remove, ", "), name);
return CanRemoveDescription{.can_remove_anything = can_remove, .files_not_to_remove = files_not_to_remove };
};
if (!isStoredOnDisk())
return;
@ -1459,7 +1464,7 @@ void IMergeTreeDataPart::remove() const
projection_checksums.emplace_back(IDataPartStorage::ProjectionChecksums{.name = p_name, .checksums = projection_part->checksums});
}
data_part_storage->remove(can_remove, files_not_to_remove, checksums, projection_checksums, is_temp, getState(), storage.log);
data_part_storage->remove(std::move(can_remove_callback), checksums, projection_checksums, is_temp, getState(), storage.log);
}
String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached) const

View File

@ -27,6 +27,7 @@
#include <IO/Operators.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteBufferFromString.h>
#include <IO/S3Common.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/PartLog.h>
@ -1635,11 +1636,10 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
/// We don't control the number of refs for temporary parts, so we cannot decide whether
/// we can remove blobs or not. So we don't remove them.
bool keep_shared = false;
if (it->path().find("fetch") != std::string::npos)
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
keep_shared = disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication;
if (keep_shared)
LOG_WARNING(log, "Since zero-copy replication is enabled we are not going to remove blobs from shared storage for {}", full_path);
keep_shared = true;
}
disk->removeSharedRecursive(it->path(), keep_shared, {});
@ -2135,6 +2135,7 @@ void MergeTreeData::renameInMemory(const StorageID & new_table_id)
void MergeTreeData::dropAllData()
{
LOG_TRACE(log, "dropAllData: waiting for locks.");
auto settings_ptr = getSettings();
auto lock = lockParts();

View File

@ -117,10 +117,10 @@ struct InputOrderInfo
* sort_description_for_merging will be equal to (c, d) and
* used_prefix_of_sorting_key_size will be equal to 4.
*/
size_t used_prefix_of_sorting_key_size;
const size_t used_prefix_of_sorting_key_size;
int direction;
UInt64 limit;
const int direction;
const UInt64 limit;
InputOrderInfo(
const SortDescription & sort_description_for_merging_,

View File

@ -7604,7 +7604,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMer
}
else
{
LOG_TRACE(log, "Part {} looks temporary, because checksums file doesn't exists, blobs can be removed", part.name);
LOG_TRACE(log, "Part {} looks temporary, because {} file doesn't exists, blobs can be removed", part.name, IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
/// Temporary part with some absent file cannot be locked in shared mode
return std::make_pair(true, NameSet{});
}

View File

@ -21,8 +21,7 @@
<heart_beat_interval_ms>1000</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<quorum_reads>0</quorum_reads>
<read_mode>fastlinear</read_mode>
<quorum_reads>{quorum_reads}</quorum_reads>
<snapshot_distance>{snapshot_distance}</snapshot_distance>
<stale_log_gap>{stale_log_gap}</stale_log_gap>
<reserved_log_items>{reserved_log_items}</reserved_log_items>

View File

@ -27,12 +27,7 @@
(invoke! [this test op]
(case (:f op)
:read (try
(assoc op
:type :ok
:value (count (zk-list conn root-path)))
(catch Exception _ (assoc op :type :info, :error :connect-error)))
:final-read (exec-with-retries 30 (fn []
:read (exec-with-retries 30 (fn []
(assoc op
:type :ok
:value (count (zk-list conn root-path)))))
@ -54,5 +49,7 @@
:checker (checker/compose
{:counter (checker/counter)
:perf (checker/perf)})
:generator (gen/mix [r add])
:final-generator (gen/once {:type :invoke, :f :final-read, :value nil})})
:generator (->> (range)
(map (fn [x]
(->> (gen/mix [r add])))))
:final-generator (gen/once {:type :invoke, :f :read, :value nil})})

View File

@ -98,6 +98,7 @@
#"\{srv2\}" (get nodes 1)
#"\{srv3\}" (get nodes 2)
#"\{id\}" (str (inc (.indexOf nodes node)))
#"\{quorum_reads\}" (str (boolean (:quorum test)))
#"\{snapshot_distance\}" (str (:snapshot-distance test))
#"\{stale_log_gap\}" (str (:stale-log-gap test))
#"\{reserved_log_items\}" (str (:reserved-log-items test))}]

View File

@ -103,7 +103,7 @@
current-nemesis (get custom-nemesis/custom-nemesises (:nemesis opts))]
(merge tests/noop-test
opts
{:name (str "clickhouse-keeper-" (name (:workload opts)) "-" (name (:nemesis opts)))
{:name (str "clickhouse-keeper-quorum=" quorum "-" (name (:workload opts)) "-" (name (:nemesis opts)))
:os ubuntu/os
:db (get-db opts)
:pure-generators true

View File

@ -20,8 +20,7 @@
(assoc this :conn (zk-connect node 9181 30000)))
(setup! [this test]
(exec-with-retries 30 (fn []
(zk-create-range conn 300))))
(zk-create-range conn 300)) ; 300 nodes to be sure
(invoke! [_ test op]
(let [[k v] (:value op)

View File

@ -45,7 +45,7 @@
(defn zk-connect
[host port timeout]
(zk/connect (str host ":" port) :timeout-msec timeout))
(exec-with-retries 30 (fn [] (zk/connect (str host ":" port) :timeout-msec timeout))))
(defn zk-create-range
[conn n]

View File

@ -109,10 +109,10 @@
-------toStartOfFifteenMinutes---------
-------toStartOfHour---------
-------toStartOfISOYear---------
2079-06-07
2079-06-07
2119-07-29
2119-07-29
1970-01-01
1970-01-01
2148-12-30
2148-12-30
2021-01-04
-------toRelativeYearNum---------
1900
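The corrected toStartOfISOYear expectations above (1970-01-01 and 2148-12-30 instead of 2079-06-07 and 2119-07-29) reflect saturation rather than wrap-around for out-of-range arguments. As a hedged illustration, grounded in the new reference file added later in this diff:
SELECT toStartOfISOYear(toDate32('1920-02-02')) SETTINGS enable_extended_results_for_datetime_functions = false; -- 1970-01-01 (type Date: the below-range argument saturates)
SELECT toStartOfISOYear(toDate32('1920-02-02')) SETTINGS enable_extended_results_for_datetime_functions = true; -- 1919-12-29 (type Date32: extended range, no saturation)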

View File

@ -0,0 +1,56 @@
toStartOfYear;toDate32;true 1920-01-01
type;toStartOfYear;toDate32;true Date32
toStartOfYear;toDateTime64;true 1920-01-01
type;toStartOfYear;toDateTime64;true Date32
toStartOfISOYear;toDate32;true 1919-12-29
type;toStartOfISOYear;toDate32;true Date32
toStartOfISOYear;toDateTime64;true 1919-12-29
type;toStartOfISOYear;toDateTime64;true Date32
toStartOfQuarter;toDate32;true 1920-01-01
type;toStartOfQuarter;toDate32;true Date32
toStartOfQuarter;toDateTime64;true 1920-01-01
type;toStartOfQuarter;toDateTime64;true Date32
toStartOfMonth;toDate32;true 1920-02-01
type;toStartOfMonth;toDate32;true Date32
toStartOfMonth;toDateTime64;true 1920-02-01
type;toStartOfMonth;toDateTime64;true Date32
toStartOfWeek;toDate32;true 1920-02-01
type;toStartOfWeek;toDate32;true Date32
toStartOfWeek;toDateTime64;true 1920-02-01
type;toStartOfWeek;toDateTime64;true Date32
toMonday;toDate32;true 1920-02-02
type;toMonday;toDate32;true Date32
toMonday;toDateTime64;true 1920-02-02
type;toMonday;toDateTime64;true Date32
toLastDayOfMonth;toDate32;true 1920-02-29
type;toLastDayOfMonth;toDate32;true Date32
toLastDayOfMonth;toDateTime64;true 1920-02-29
type;toLastDayOfMonth;toDateTime64;true Date32
toStartOfYear;toDate32;false 1970-01-01
type;toStartOfYear;toDate32;false Date
toStartOfYear;toDateTime64;false 1970-01-01
type;toStartOfYear;toDateTime64;false Date
toStartOfISOYear;toDate32;false 1970-01-01
type;toStartOfISOYear;toDate32;false Date
toStartOfISOYear;toDateTime64;false 1970-01-01
type;toStartOfISOYear;toDateTime64;false Date
toStartOfQuarter;toDate32;false 1970-01-01
type;toStartOfQuarter;toDate32;false Date
toStartOfQuarter;toDateTime64;false 1970-01-01
type;toStartOfQuarter;toDateTime64;false Date
toStartOfMonth;toDate32;false 1970-01-01
type;toStartOfMonth;toDate32;false Date
toStartOfMonth;toDateTime64;false 1970-01-01
type;toStartOfMonth;toDateTime64;false Date
toStartOfWeek;toDate32;false 1970-01-01
type;toStartOfWeek;toDate32;false Date
toStartOfWeek;toDateTime64;false 1970-01-01
type;toStartOfWeek;toDateTime64;false Date
toMonday;toDate32;false 1970-01-01
type;toMonday;toDate32;false Date
toMonday;toDateTime64;false 1970-01-01
type;toMonday;toDateTime64;false Date
toLastDayOfMonth;toDate32;false 1970-01-01
type;toLastDayOfMonth;toDate32;false Date
toLastDayOfMonth;toDateTime64;false 1970-01-01
type;toLastDayOfMonth;toDateTime64;false Date

View File

@ -0,0 +1,9 @@
{% for option_value in ['true', 'false'] -%}
{% for date_fun in ['toStartOfYear', 'toStartOfISOYear', 'toStartOfQuarter', 'toStartOfMonth', 'toStartOfWeek', 'toMonday', 'toLastDayOfMonth'] -%}
SELECT '{{ date_fun }};toDate32;{{ option_value }}', {{ date_fun }}(toDate32('1920-02-02')) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }};
SELECT 'type;{{ date_fun }};toDate32;{{ option_value }}', toTypeName({{ date_fun }}(toDate32('1920-02-02'))) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }};
SELECT '{{ date_fun }};toDateTime64;{{ option_value }}', {{ date_fun }}(toDateTime64('1920-02-02 10:20:30', 3)) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }};
SELECT 'type;{{ date_fun }};toDateTime64;{{ option_value }}', toTypeName({{ date_fun }}(toDateTime64('1920-02-02 10:20:30', 3))) SETTINGS enable_extended_results_for_datetime_functions = {{ option_value }};
{% endfor -%}
{% endfor -%}
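For reference, one expanded iteration of the loops above (date_fun = toLastDayOfMonth, option_value = true) produces the following two queries; per the reference file, they output 1920-02-29 (1920 was a leap year) and Date32:
SELECT 'toLastDayOfMonth;toDate32;true', toLastDayOfMonth(toDate32('1920-02-02')) SETTINGS enable_extended_results_for_datetime_functions = true;
SELECT 'type;toLastDayOfMonth;toDate32;true', toTypeName(toLastDayOfMonth(toDate32('1920-02-02'))) SETTINGS enable_extended_results_for_datetime_functions = true;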

View File

@ -0,0 +1,5 @@
Cannot parse object
0
0
Cannot parse object
aaa

View File

@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_json_async_insert"
$CLICKHOUSE_CLIENT --allow_experimental_object_type=1 -q "CREATE TABLE t_json_async_insert (data JSON) ENGINE = MergeTree ORDER BY tuple()"
$CLICKHOUSE_CLIENT --async_insert=1 --wait_for_async_insert=1 -q 'INSERT INTO t_json_async_insert FORMAT JSONAsObject {"aaa"}' 2>&1 | grep -o -m1 "Cannot parse object"
$CLICKHOUSE_CLIENT -q "SELECT count() FROM t_json_async_insert"
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.parts WHERE database = '$CLICKHOUSE_DATABASE' AND table = 't_json_async_insert'"
$CLICKHOUSE_CLIENT --async_insert=1 --wait_for_async_insert=1 -q 'INSERT INTO t_json_async_insert FORMAT JSONAsObject {"aaa"}' 2>&1 | grep -o -m1 "Cannot parse object" &
$CLICKHOUSE_CLIENT --async_insert=1 --wait_for_async_insert=1 -q 'INSERT INTO t_json_async_insert FORMAT JSONAsObject {"k1": "aaa"}' &
wait
$CLICKHOUSE_CLIENT -q "SELECT data.k1 FROM t_json_async_insert ORDER BY data.k1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_json_async_insert"

View File

@ -0,0 +1,26 @@
Collapsing
0
0
id UInt64
s Int8
data Tuple(_dummy UInt8)
DELETE all
2
1
id UInt64
data Tuple(k1 String, k2 String)
0
0
id UInt64
data Tuple(_dummy UInt8)
TTL
1
1
id UInt64
d Date
data Tuple(k1 String, k2 String)
0
0
id UInt64
d Date
data Tuple(_dummy UInt8)

View File

@ -0,0 +1,61 @@
-- Tags: no-fasttest
SET allow_experimental_object_type = 1;
DROP TABLE IF EXISTS t_json_empty_parts;
SELECT 'Collapsing';
CREATE TABLE t_json_empty_parts (id UInt64, s Int8, data JSON) ENGINE = CollapsingMergeTree(s) ORDER BY id;
INSERT INTO t_json_empty_parts VALUES (1, 1, '{"k1": "aaa"}') (1, -1, '{"k2": "bbb"}');
SELECT count() FROM t_json_empty_parts;
SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;
DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1;
DROP TABLE t_json_empty_parts;
DROP TABLE IF EXISTS t_json_empty_parts;
SELECT 'DELETE all';
CREATE TABLE t_json_empty_parts (id UInt64, data JSON) ENGINE = MergeTree ORDER BY id;
INSERT INTO t_json_empty_parts VALUES (1, '{"k1": "aaa"}') (2, '{"k2": "bbb"}');
SELECT count() FROM t_json_empty_parts;
SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;
DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1;
SET mutations_sync = 2;
ALTER TABLE t_json_empty_parts DELETE WHERE 1;
DETACH TABLE t_json_empty_parts;
ATTACH TABLE t_json_empty_parts;
SELECT count() FROM t_json_empty_parts;
SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;
DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1;
DROP TABLE IF EXISTS t_json_empty_parts;
SELECT 'TTL';
CREATE TABLE t_json_empty_parts (id UInt64, d Date, data JSON) ENGINE = MergeTree ORDER BY id TTL d WHERE id % 2 = 1;
INSERT INTO t_json_empty_parts VALUES (1, '2000-01-01', '{"k1": "aaa"}') (2, '2000-01-01', '{"k2": "bbb"}');
OPTIMIZE TABLE t_json_empty_parts FINAL;
SELECT count() FROM t_json_empty_parts;
SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;
DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1;
ALTER TABLE t_json_empty_parts MODIFY TTL d;
OPTIMIZE TABLE t_json_empty_parts FINAL;
DETACH TABLE t_json_empty_parts;
ATTACH TABLE t_json_empty_parts;
SELECT count() FROM t_json_empty_parts;
SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;
DESC TABLE t_json_empty_parts SETTINGS describe_extend_object_types = 1;
DROP TABLE IF EXISTS t_json_empty_parts;

View File

@ -0,0 +1,7 @@
SELECT formatRow('RawBLOB', [[[33]], []]); -- { serverError 48 }
SELECT formatRow('RawBLOB', [[[]], []]); -- { serverError 48 }
SELECT formatRow('RawBLOB', [[[[[[[0x48, 0x65, 0x6c, 0x6c, 0x6f]]]]]], []]); -- { serverError 48 }
SELECT formatRow('RawBLOB', []::Array(Array(Nothing))); -- { serverError 48 }
SELECT formatRow('RawBLOB', [[], [['Hello']]]); -- { serverError 48 }
SELECT formatRow('RawBLOB', [[['World']], []]); -- { serverError 48 }
SELECT formatRow('RawBLOB', []::Array(String)); -- { serverError 48 }
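Each case above fails with serverError 48 (NOT_IMPLEMENTED): RawBLOB dumps the raw bytes of exactly one column and has no encoding for nested arrays. A hedged contrast, assuming the documented single-String-column case (not part of this test):
SELECT formatRow('RawBLOB', 'Hello'); -- succeeds: returns the raw bytes 'Hello'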