Merge branch 'master' into fix_undead_sessions

This commit is contained in:
Alexander Tokmakov 2024-02-22 14:36:36 +01:00
commit 97ed2c05b0
352 changed files with 7207 additions and 2228 deletions

View File

@ -11,7 +11,7 @@ on: # yamllint disable-line rule:truthy
- 'backport/**'
jobs:
RunConfig:
runs-on: [self-hosted, style-checker]
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:

View File

@ -11,7 +11,7 @@ on: # yamllint disable-line rule:truthy
- 'master'
jobs:
RunConfig:
runs-on: [self-hosted, style-checker]
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:

View File

@ -14,7 +14,7 @@ jobs:
# The task for having a preserved ENV and event.json for later investigation
uses: ./.github/workflows/debug.yml
RunConfig:
runs-on: [self-hosted, style-checker]
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:

View File

@ -18,7 +18,7 @@ on: # yamllint disable-line rule:truthy
##########################################################################################
jobs:
RunConfig:
runs-on: [self-hosted, style-checker]
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:

View File

@ -14,7 +14,7 @@ on: # yamllint disable-line rule:truthy
jobs:
RunConfig:
runs-on: [self-hosted, style-checker]
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <base/extended_types.h>
namespace wide
{
@ -44,3 +45,8 @@ concept is_over_big_int =
|| std::is_same_v<T, Decimal128>
|| std::is_same_v<T, Decimal256>;
}
template <> struct is_signed<DB::Decimal32> { static constexpr bool value = true; };
template <> struct is_signed<DB::Decimal64> { static constexpr bool value = true; };
template <> struct is_signed<DB::Decimal128> { static constexpr bool value = true; };
template <> struct is_signed<DB::Decimal256> { static constexpr bool value = true; };

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 1278e32bb0d5dc489f947e002bdf8c71b0ddaa63
Subproject commit 5bb3a0e8257bacd65b099cb1b7239bd6b9a2c477

View File

@ -1,7 +1,7 @@
version: '2.3'
services:
mysql2:
image: mysql:5.7
image: mysql:8.0
restart: always
environment:
MYSQL_ROOT_PASSWORD: clickhouse
@ -23,7 +23,7 @@ services:
source: ${MYSQL_CLUSTER_LOGS:-}
target: /mysql/
mysql3:
image: mysql:5.7
image: mysql:8.0
restart: always
environment:
MYSQL_ROOT_PASSWORD: clickhouse
@ -45,7 +45,7 @@ services:
source: ${MYSQL_CLUSTER_LOGS:-}
target: /mysql/
mysql4:
image: mysql:5.7
image: mysql:8.0
restart: always
environment:
MYSQL_ROOT_PASSWORD: clickhouse

View File

@ -77,6 +77,12 @@ remove_keeper_config "async_replication" "1"
# create_if_not_exists feature flag doesn't exist on some older versions
remove_keeper_config "create_if_not_exists" "[01]"
# latest_logs_cache_size_threshold setting doesn't exist on some older versions
remove_keeper_config "latest_logs_cache_size_threshold" "[[:digit:]]\+"
# commit_logs_cache_size_threshold setting doesn't exist on some older versions
remove_keeper_config "commit_logs_cache_size_threshold" "[[:digit:]]\+"
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
@ -109,6 +115,12 @@ remove_keeper_config "async_replication" "1"
# create_if_not_exists feature flag doesn't exist on some older versions
remove_keeper_config "create_if_not_exists" "[01]"
# latest_logs_cache_size_threshold setting doesn't exist on some older versions
remove_keeper_config "latest_logs_cache_size_threshold" "[[:digit:]]\+"
# commit_logs_cache_size_threshold setting doesn't exist on some older versions
remove_keeper_config "commit_logs_cache_size_threshold" "[[:digit:]]\+"
# But we still need default disk because some tables loaded only into it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
| sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" \

View File

@ -10,11 +10,62 @@ The ClickHouse server can be configured with configuration files in XML or YAML
It is possible to mix XML and YAML configuration files, for example you could have a main configuration file `config.xml` and additional configuration files `config.d/network.xml`, `config.d/timezone.yaml` and `config.d/keeper.yaml`. Mixing XML and YAML within a single configuration file is not supported. XML configuration files should use `<clickhouse>...</clickhouse>` as top-level tag. In YAML configuration files, `clickhouse:` is optional, the parser inserts it implicitly if absent.
## Overriding Configuration {#override}
## Merging Configuration {#merging}
The merge of configuration files behaves as one intuitively expects: The contents of both files are combined recursively, children with the same name are replaced by the element of the more specific configuration file. The merge can be customized using attributes `replace` and `remove`.
- Attribute `replace` means that the element is replaced by the specified one.
- Attribute `remove` means that the element is deleted.
Two configuration files (usually the main configuration file and another configuration files from `config.d/`) are merged as follows:
- If a node (i.e. a path leading to an element) appears in both files and does not have attributes `replace` or `remove`, it is included in the merged configuration file and children from both nodes are included and merged recursively.
- If one of both nodes contains attribute `replace`, it is included in the merged configuration file but only children from the node with attribute `replace` are included.
- If one of both nodes contains attribute `remove`, the node is not included in the merged configuration file (if it exists already, it is deleted).
Example:
```xml
<!-- config.xml -->
<clickhouse>
<config_a>
<setting_1>1</setting_1>
</config_a>
<config_b>
<setting_2>2</setting_2>
</config_b>
<config_c>
<setting_3>3</setting_3>
</config_c>
</clickhouse>
```
and
```xml
<!-- config.d/other_config.xml -->
<clickhouse>
<config_a>
<setting_4>4</setting_4>
</config_a>
<config_b replace="replace">
<setting_5>5</setting_5>
</config_b>
<config_c remove="remove">
<setting_6>6</setting_6>
</config_c>
</clickhouse>
```
generates merged configuration file:
```xml
<clickhouse>
<config_a>
<setting_1>1</setting_1>
<setting_4>4</setting_4>
</config_a>
<config_b>
<setting_5>5</setting_5>
</config_b>
</clickhouse>
```
To specify that a value of an element should be replaced by the value of an environment variable, you can use attribute `from_env`.
@ -125,7 +176,7 @@ Users configuration can be split into separate files similar to `config.xml` and
Directory name is defined as `users_config` setting without `.xml` postfix concatenated with `.d`.
Directory `users.d` is used by default, as `users_config` defaults to `users.xml`.
Note that configuration files are first merged taking into account [Override](#override) settings and includes are processed after that.
Note that configuration files are first [merged](#merging) taking into account settings, and includes are processed after that.
## XML example {#example}

View File

@ -4279,41 +4279,6 @@ Result:
└─────┴─────┴───────┘
```
## enable_order_by_all {#enable-order-by-all}
Enables or disables sorting by `ALL` columns, i.e. [ORDER BY](../../sql-reference/statements/select/order-by.md)
Possible values:
- 0 — Disable ORDER BY ALL.
- 1 — Enable ORDER BY ALL.
Default value: `1`.
**Example**
Query:
```sql
CREATE TABLE TAB(C1 Int, C2 Int, ALL Int) ENGINE=Memory();
INSERT INTO TAB VALUES (10, 20, 30), (20, 20, 10), (30, 10, 20);
SELECT * FROM TAB ORDER BY ALL; -- returns an error that ALL is ambiguous
SELECT * FROM TAB ORDER BY ALL SETTINGS enable_order_by_all;
```
Result:
```text
┌─C1─┬─C2─┬─ALL─┐
│ 20 │ 20 │ 10 │
│ 30 │ 10 │ 20 │
│ 10 │ 20 │ 30 │
└────┴────┴─────┘
```
## splitby_max_substrings_includes_remaining_string {#splitby_max_substrings_includes_remaining_string}
Controls whether function [splitBy*()](../../sql-reference/functions/splitting-merging-functions.md) with argument `max_substrings` > 0 will include the remaining string in the last element of the result array.

View File

@ -13,8 +13,8 @@ simpleLinearRegression(x, y)
Parameters:
- `x` — Column with dependent variable values.
- `y` — Column with explanatory variable values.
- `x` — Column with explanatory variable values.
- `y` — Column with dependent variable values.
Returned values:

View File

@ -509,7 +509,7 @@ Result:
## cosineDistance
Calculates the cosine distance between two vectors (the values of the tuples are the coordinates). The less the returned value is, the more similar are the vectors.
Calculates the cosine distance between two vectors (the values of the tuples are the coordinates). The smaller the returned value is, the more similar are the vectors.
**Syntax**

View File

@ -542,7 +542,7 @@ Alias: `scalarProduct`.
- Scalar product.
Type: [Int/UInt](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
Type: [Int/UInt](../../sql-reference/data-types/int-uint.md) or [Float](../../sql-reference/data-types/float.md).
**Example**

View File

@ -9,10 +9,9 @@ The `ORDER BY` clause contains
- a list of expressions, e.g. `ORDER BY visits, search_phrase`,
- a list of numbers referring to columns in the `SELECT` clause, e.g. `ORDER BY 2, 1`, or
- `ALL` which means all columns of the `SELECT` clause, e.g. `ORDER BY ALL`.
- `*` (without other expressions or numbers) which means all columns of the `SELECT` clause: `ORDER BY *`.
To disable sorting by column numbers, set setting [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments) = 0.
To disable sorting by `ALL`, set setting [enable_order_by_all](../../../operations/settings/settings.md#enable-order-by-all) = 0.
The `ORDER BY` clause can be attributed by a `DESC` (descending) or `ASC` (ascending) modifier which determines the sorting direction.
Unless an explicit sort order is specified, `ASC` is used by default.

View File

@ -61,14 +61,14 @@ sidebar_label: ORDER BY
我们只建议使用 `COLLATE` 对于少量行的最终排序,因为排序与 `COLLATE` 比正常的按字节排序效率低。
## ORDER BY ALL
## ORDER BY *
`ORDER BY ALL` 对所有选定的列进行升序排序。
`ORDER BY *` 对所有选定的列进行升序排序。
示例:
``` sql
SELECT a, b, c FROM t ORDER BY ALL
SELECT a, b, c FROM t ORDER BY *
```
等同于:

View File

@ -2,7 +2,6 @@
#include <cstdlib>
#include <csignal>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <optional>
#include <random>

View File

@ -1,6 +1,7 @@
#include <iostream>
#include <boost/program_options.hpp>
#include <Coordination/CoordinationSettings.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/ZooKeeperDataReader.h>
#include <Common/TerminalSize.h>
@ -39,7 +40,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
try
{
auto keeper_context = std::make_shared<KeeperContext>(true);
auto keeper_context = std::make_shared<KeeperContext>(true, std::make_shared<CoordinationSettings>());
keeper_context->setDigestEnabled(true);
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>()));

View File

@ -41,7 +41,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperConstants.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperAsynchronousMetrics.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/pathUtils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperCommon.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/SessionExpiryQueue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/SummingStateMachine.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/WriteBufferFromNuraftBuffer.cpp

View File

@ -560,7 +560,7 @@ try
auto main_config_reloader = std::make_unique<ConfigReloader>(
config_path,
extra_paths,
config().getString("path", ""),
config().getString("path", KEEPER_DEFAULT_PATH),
std::move(unused_cache),
unused_event,
[&](ConfigurationPtr config, bool /* initial_loading */)

View File

@ -1292,7 +1292,7 @@ try
auto main_config_reloader = std::make_unique<ConfigReloader>(
config_path,
extra_paths,
config().getString("path", ""),
config().getString("path", DBMS_DEFAULT_PATH),
std::move(main_config_zk_node_cache),
main_config_zk_changed_event,
[&](ConfigurationPtr config, bool initial_loading)
@ -1391,7 +1391,7 @@ try
global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn);
global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn);
ConcurrencyControl::SlotCount concurrent_threads_soft_limit = ConcurrencyControl::Unlimited;
SlotCount concurrent_threads_soft_limit = UnlimitedSlots;
if (new_server_settings.concurrent_threads_soft_limit_num > 0 && new_server_settings.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = new_server_settings.concurrent_threads_soft_limit_num;
if (new_server_settings.concurrent_threads_soft_limit_ratio_to_cores > 0)

View File

@ -219,7 +219,7 @@ public:
: IAggregateFunctionDataHelper<AggregateFunctionCountData, AggregateFunctionCountNotNullUnary>({argument}, params, createResultType())
{
if (!argument->isNullable())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not Nullable data type passed to AggregateFunctionCountNotNullUnary");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: not Nullable data type passed to AggregateFunctionCountNotNullUnary");
}
String getName() const override { return "count"; }

View File

@ -100,7 +100,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
{
AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix("Null");
if (!combinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find aggregate function combinator "
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: cannot find aggregate function combinator "
"to apply a function to Nullable arguments.");
DataTypes nested_types = combinator->transformArguments(types_without_low_cardinality);
@ -123,7 +123,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
auto with_original_arguments = getImpl(name, action, types_without_low_cardinality, parameters, out_properties, false);
if (!with_original_arguments)
throw Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionFactory returned nullptr");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: AggregateFunctionFactory returned nullptr");
return with_original_arguments;
}

View File

@ -146,9 +146,7 @@ struct AggregateFunctionSumData
size_t count = end - start;
const auto * end_ptr = ptr + count;
if constexpr (
(is_integer<T> && !is_big_int_v<T>)
|| (is_decimal<T> && !std::is_same_v<T, Decimal256> && !std::is_same_v<T, Decimal128>))
if constexpr ((is_integer<T> || is_decimal<T>) && !is_over_big_int<T>)
{
/// For integers we can vectorize the operation if we replace the null check using a multiplication (by 0 for null, 1 for not null)
/// https://quick-bench.com/q/MLTnfTvwC2qZFVeWHfOBR3U7a8I
@ -163,8 +161,39 @@ struct AggregateFunctionSumData
Impl::add(sum, local_sum);
return;
}
else if constexpr (is_over_big_int<T>)
{
/// Use a mask to discard or keep the value to reduce branch miss.
/// Notice that for (U)Int128 or Decimal128, MaskType is Int8 instead of Int64, otherwise extra branches will be introduced by compiler (for unknown reason) and performance will be worse.
using MaskType = std::conditional_t<sizeof(T) == 16, Int8, Int64>;
alignas(64) const MaskType masks[2] = {0, -1};
T local_sum{};
while (ptr < end_ptr)
{
Value v = *ptr;
if constexpr (!add_if_zero)
{
if constexpr (is_integer<T>)
v &= masks[!!*condition_map];
else
v.value &= masks[!!*condition_map];
}
else
{
if constexpr (is_integer<T>)
v &= masks[!*condition_map];
else
v.value &= masks[!*condition_map];
}
if constexpr (std::is_floating_point_v<T>)
Impl::add(local_sum, v);
++ptr;
++condition_map;
}
Impl::add(sum, local_sum);
return;
}
else if constexpr (std::is_floating_point_v<T>)
{
/// For floating point we use a similar trick as above, except that now we reinterpret the floating point number as an unsigned
/// integer of the same size and use a mask instead (0 to discard, 0xFF..FF to keep)

View File

@ -249,7 +249,7 @@ public:
: Base(std::move(nested_function_), arguments, params), number_of_arguments(arguments.size())
{
if (number_of_arguments == 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Single argument is passed to AggregateFunctionIfNullVariadic");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: single argument is passed to AggregateFunctionIfNullVariadic");
if (number_of_arguments > MAX_ARGS)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,

View File

@ -429,7 +429,7 @@ public:
, number_of_arguments(arguments.size())
{
if (number_of_arguments == 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Single argument is passed to AggregateFunctionNullVariadic");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: single argument is passed to AggregateFunctionNullVariadic");
if (number_of_arguments > MAX_ARGS)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,

View File

@ -1,6 +1,7 @@
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Functions/FunctionFactory.h>
#include <Functions/array/has.h>
#include <Interpreters/Context.h>
@ -83,7 +84,8 @@ public:
return;
}
auto has_function = FunctionFactory::instance().get("has", getContext());
auto has_function = createInternalFunctionHasOverloadResolver();
array_exists_function_arguments_nodes[0] = std::move(array_exists_function_arguments_nodes[1]);
array_exists_function_arguments_nodes[1] = std::move(has_constant_element_argument);
array_exists_function_node->resolveAsFunction(has_function->build(array_exists_function_node->getArgumentColumns()));

View File

@ -10,6 +10,7 @@
#include <IO/Operators.h>
#include <Functions/FunctionFactory.h>
#include <Functions/logical.h>
#include <Common/checkStackSize.h>
@ -79,7 +80,7 @@ public:
if (name == "and" || name == "or")
{
auto function_resolver = FunctionFactory::instance().get(name, current_context);
auto function_resolver = name == "and" ? createInternalFunctionAndOverloadResolver() : createInternalFunctionOrOverloadResolver();
const auto & arguments = function_node->getArguments().getNodes();
if (arguments.size() > 2)
@ -110,10 +111,10 @@ private:
class PushNotVisitor
{
public:
explicit PushNotVisitor(const ContextPtr & context)
: not_function_resolver(FunctionFactory::instance().get("not", context))
, or_function_resolver(FunctionFactory::instance().get("or", context))
, and_function_resolver(FunctionFactory::instance().get("and", context))
explicit PushNotVisitor()
: not_function_resolver(createInternalFunctionNotOverloadResolver())
, or_function_resolver(createInternalFunctionOrOverloadResolver())
, and_function_resolver(createInternalFunctionAndOverloadResolver())
{}
void visit(QueryTreeNodePtr & node, bool add_negation)
@ -162,10 +163,10 @@ private:
class PushOrVisitor
{
public:
PushOrVisitor(ContextPtr context, size_t max_atoms_)
explicit PushOrVisitor(size_t max_atoms_)
: max_atoms(max_atoms_)
, and_resolver(FunctionFactory::instance().get("and", context))
, or_resolver(FunctionFactory::instance().get("or", context))
, and_resolver(createInternalFunctionAndOverloadResolver())
, or_resolver(createInternalFunctionOrOverloadResolver())
{}
bool visit(QueryTreeNodePtr & node, size_t num_atoms)
@ -513,11 +514,11 @@ std::optional<CNF> CNF::tryBuildCNF(const QueryTreeNodePtr & node, ContextPtr co
}
{
PushNotVisitor visitor(context);
PushNotVisitor visitor;
visitor.visit(node_cloned, false);
}
if (PushOrVisitor visitor(context, max_atoms);
if (PushOrVisitor visitor(max_atoms);
!visitor.visit(node_cloned, atom_count))
return std::nullopt;
@ -542,7 +543,7 @@ CNF CNF::toCNF(const QueryTreeNodePtr & node, ContextPtr context, size_t max_gro
return *cnf;
}
QueryTreeNodePtr CNF::toQueryTree(ContextPtr context) const
QueryTreeNodePtr CNF::toQueryTree() const
{
if (statements.empty())
return nullptr;
@ -550,9 +551,9 @@ QueryTreeNodePtr CNF::toQueryTree(ContextPtr context) const
QueryTreeNodes and_arguments;
and_arguments.reserve(statements.size());
auto not_resolver = FunctionFactory::instance().get("not", context);
auto or_resolver = FunctionFactory::instance().get("or", context);
auto and_resolver = FunctionFactory::instance().get("and", context);
auto not_resolver = createInternalFunctionNotOverloadResolver();
auto or_resolver = createInternalFunctionOrOverloadResolver();
auto and_resolver = createInternalFunctionAndOverloadResolver();
const auto function_node_from_atom = [&](const auto & atom) -> QueryTreeNodePtr
{

View File

@ -54,7 +54,7 @@ public:
static std::optional<CNF> tryBuildCNF(const QueryTreeNodePtr & node, ContextPtr context, size_t max_growth_multiplier = DEFAULT_MAX_GROWTH_MULTIPLIER);
static CNF toCNF(const QueryTreeNodePtr & node, ContextPtr context, size_t max_growth_multiplier = DEFAULT_MAX_GROWTH_MULTIPLIER);
QueryTreeNodePtr toQueryTree(ContextPtr context) const;
QueryTreeNodePtr toQueryTree() const;
const auto & getStatements() const
{

View File

@ -11,6 +11,8 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/multiMatchAny.h>
#include <Functions/logical.h>
#include <Interpreters/Context.h>
@ -134,8 +136,10 @@ private:
void ConvertOrLikeChainPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
auto or_function_resolver = FunctionFactory::instance().get("or", context);
auto match_function_resolver = FunctionFactory::instance().get("multiMatchAny", context);
const auto & settings = context->getSettingsRef();
auto match_function_resolver = createInternalMultiMatchAnyOverloadResolver(settings.allow_hyperscan, settings.max_hyperscan_regexp_length, settings.max_hyperscan_regexp_total_length, settings.reject_expensive_hyperscan_regexps);
auto or_function_resolver = createInternalFunctionOrOverloadResolver();
ConvertOrLikeChainVisitor visitor(std::move(or_function_resolver), std::move(match_function_resolver), std::move(context));
visitor.visit(query_tree_node);
}

View File

@ -339,7 +339,7 @@ void addIndexConstraint(Analyzer::CNF & cnf, const QueryTreeNodes & table_expres
{
Analyzer::CNF::OrGroup new_group;
auto index_hint_node = std::make_shared<FunctionNode>("indexHint");
index_hint_node->getArguments().getNodes().push_back(Analyzer::CNF{std::move(and_group)}.toQueryTree(context));
index_hint_node->getArguments().getNodes().push_back(Analyzer::CNF{std::move(and_group)}.toQueryTree());
index_hint_node->resolveAsFunction(FunctionFactory::instance().get("indexHint", context));
new_group.insert({false, QueryTreeNodePtrWithHash{std::move(index_hint_node)}});
@ -676,7 +676,7 @@ void optimizeNode(QueryTreeNodePtr & node, const QueryTreeNodes & table_expressi
if (settings.optimize_using_constraints)
optimizeWithConstraints(*cnf, table_expressions, context);
auto new_node = cnf->toQueryTree(context);
auto new_node = cnf->toQueryTree();
node = std::move(new_node);
}

View File

@ -12,6 +12,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Functions/logical.h>
#include <Common/logger_useful.h>
@ -256,7 +257,7 @@ private:
for (const auto & node : nodes)
function_node->getArguments().getNodes().push_back(node);
const auto & function = FunctionFactory::instance().get("and", getContext());
const auto & function = createInternalFunctionAndOverloadResolver();
function_node->resolveAsFunction(function->build(function_node->getArgumentColumns()));
return function_node;
}

View File

@ -5,6 +5,7 @@
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
#include <Functions/FunctionFactory.h>
#include <Functions/multiIf.h>
namespace DB
{
@ -75,7 +76,8 @@ private:
void IfChainToMultiIfPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
auto multi_if_function_ptr = FunctionFactory::instance().get("multiIf", context);
const auto & settings = context->getSettingsRef();
auto multi_if_function_ptr = createInternalMultiIfOverloadResolver(settings.allow_execute_multiif_columnar, settings.allow_experimental_variant_type, settings.use_variant_as_common_type);
IfChainToMultiIfPassVisitor visitor(std::move(multi_if_function_ptr), std::move(context));
visitor.visit(query_tree_node);
}

View File

@ -3,6 +3,7 @@
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
#include <Functions/FunctionFactory.h>
#include <Functions/if.h>
namespace DB
{
@ -54,7 +55,8 @@ private:
void MultiIfToIfPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
auto if_function_ptr = FunctionFactory::instance().get("if", context);
const auto & settings = context->getSettingsRef();
auto if_function_ptr = createInternalFunctionIfOverloadResolver(settings.allow_experimental_variant_type, settings.use_variant_as_common_type);
MultiIfToIfVisitor visitor(std::move(if_function_ptr), std::move(context));
visitor.visit(query_tree_node);
}

View File

@ -120,7 +120,6 @@ namespace ErrorCodes
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
extern const int SYNTAX_ERROR;
extern const int UNEXPECTED_EXPRESSION;
extern const int INVALID_IDENTIFIER;
}
@ -1215,7 +1214,7 @@ private:
static void expandGroupByAll(QueryNode & query_tree_node_typed);
void expandOrderByAll(QueryNode & query_tree_node_typed, const Settings & settings);
void expandOrderByAll(QueryNode & query_tree_node_typed);
static std::string
rewriteAggregateFunctionNameIfNeeded(const std::string & aggregate_function_name, NullsAction action, const ContextPtr & context);
@ -2367,9 +2366,9 @@ void QueryAnalyzer::expandGroupByAll(QueryNode & query_tree_node_typed)
query_tree_node_typed.setIsGroupByAll(false);
}
void QueryAnalyzer::expandOrderByAll(QueryNode & query_tree_node_typed, const Settings & settings)
void QueryAnalyzer::expandOrderByAll(QueryNode & query_tree_node_typed)
{
if (!settings.enable_order_by_all || !query_tree_node_typed.isOrderByAll())
if (!query_tree_node_typed.isOrderByAll())
return;
auto * all_node = query_tree_node_typed.getOrderBy().getNodes()[0]->as<SortNode>();
@ -2390,9 +2389,6 @@ void QueryAnalyzer::expandOrderByAll(QueryNode & query_tree_node_typed, const Se
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expression nodes list expected 1 projection names. Actual {}",
projection_names.size());
if (Poco::toUpper(projection_names[0]) == "ALL")
throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION,
"Cannot use ORDER BY ALL to sort a column with name 'all', please disable setting `enable_order_by_all` and try again");
}
auto sort_node = std::make_shared<SortNode>(node, all_node->getSortDirection(), all_node->getNullsSortDirection());
@ -7559,7 +7555,7 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (settings.enable_positional_arguments)
replaceNodesWithPositionalArguments(query_node_typed.getOrderByNode(), query_node_typed.getProjection().getNodes(), scope);
expandOrderByAll(query_node_typed, settings);
expandOrderByAll(query_node_typed);
resolveSortNodeList(query_node_typed.getOrderByNode(), scope);
}

View File

@ -219,13 +219,13 @@ public:
is_group_by_all = is_group_by_all_value;
}
/// Returns true, if query node has ORDER BY ALL modifier, false otherwise
/// Returns true, if query node has ORDER BY * modifier, false otherwise
bool isOrderByAll() const
{
return is_order_by_all;
}
/// Set query node ORDER BY ALL modifier value
/// Set query node ORDER BY * modifier value
void setIsOrderByAll(bool is_order_by_all_value)
{
is_order_by_all = is_order_by_all_value;

View File

@ -2,7 +2,7 @@
#if USE_AZURE_BLOB_STORAGE
#include <Common/quoteString.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/SharedThreadPools.h>
#include <IO/HTTPHeaderEntries.h>

View File

@ -2,7 +2,7 @@
#if USE_AWS_S3
#include <Common/quoteString.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/SharedThreadPools.h>
#include <IO/ReadBufferFromS3.h>
@ -127,7 +127,7 @@ BackupReaderS3::BackupReaderS3(
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()))
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString(), context_->getUserName()))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettings(context_->getSettingsRef());
@ -217,7 +217,7 @@ BackupWriterS3::BackupWriterS3(
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()))
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString(), context_->getUserName()))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettings(context_->getSettingsRef());

View File

@ -20,6 +20,9 @@ struct BackupOperationInfo
/// Base Backup Operation name, a string like "Disk('backups', 'my_base_backup')"
String base_backup_name;
/// Query ID of a query that started backup
String query_id;
/// This operation is internal and should not be shown in system.backups
bool internal = false;

View File

@ -440,7 +440,13 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
try
{
addInfo(backup_id, backup_name_for_logging, base_backup_name, backup_settings.internal, context->getProcessListElement(), BackupStatus::CREATING_BACKUP);
addInfo(backup_id,
backup_name_for_logging,
base_backup_name,
context->getCurrentQueryId(),
backup_settings.internal,
context->getProcessListElement(),
BackupStatus::CREATING_BACKUP);
/// Prepare context to use.
ContextPtr context_in_use = context;
@ -823,7 +829,13 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
if (restore_settings.base_backup_info)
base_backup_name = restore_settings.base_backup_info->toStringForLogging();
addInfo(restore_id, backup_name_for_logging, base_backup_name, restore_settings.internal, context->getProcessListElement(), BackupStatus::RESTORING);
addInfo(restore_id,
backup_name_for_logging,
base_backup_name,
context->getCurrentQueryId(),
restore_settings.internal,
context->getProcessListElement(),
BackupStatus::RESTORING);
/// Prepare context to use.
ContextMutablePtr context_in_use = context;
@ -1108,13 +1120,15 @@ void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr
}
void BackupsWorker::addInfo(const OperationID & id, const String & name, const String & base_backup_name, bool internal, QueryStatusPtr process_list_element, BackupStatus status)
void BackupsWorker::addInfo(const OperationID & id, const String & name, const String & base_backup_name, const String & query_id,
bool internal, QueryStatusPtr process_list_element, BackupStatus status)
{
ExtendedOperationInfo extended_info;
auto & info = extended_info.info;
info.id = id;
info.name = name;
info.base_backup_name = base_backup_name;
info.query_id = query_id;
info.internal = internal;
info.status = status;
info.start_time = std::chrono::system_clock::now();
@ -1183,7 +1197,7 @@ void BackupsWorker::setStatus(const String & id, BackupStatus status, bool throw
if (isFailedOrCancelled(status))
{
info.error_message = getCurrentExceptionMessage(false);
info.error_message = getCurrentExceptionMessage(true /*with_stacktrace*/);
info.exception = std::current_exception();
}

View File

@ -108,7 +108,8 @@ private:
/// Run data restoring tasks which insert data to tables.
void restoreTablesData(const BackupOperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool, QueryStatusPtr process_list_element);
void addInfo(const BackupOperationID & id, const String & name, const String & base_backup_name, bool internal, QueryStatusPtr process_list_element, BackupStatus status);
void addInfo(const BackupOperationID & id, const String & name, const String & base_backup_name, const String & query_id,
bool internal, QueryStatusPtr process_list_element, BackupStatus status);
void setStatus(const BackupOperationID & id, BackupStatus status, bool throw_if_error = true);
void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); }
void setNumFilesAndSize(const BackupOperationID & id, size_t num_files, UInt64 total_size, size_t num_entries,

View File

@ -506,6 +506,10 @@ if (TARGET ch_contrib::s2)
dbms_target_link_libraries (PUBLIC ch_contrib::s2)
endif()
if (TARGET ch_contrib::vectorscan)
dbms_target_link_libraries (PRIVATE ch_contrib::vectorscan)
endif()
if (TARGET ch_contrib::brotli)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::brotli)
endif()

View File

@ -3,6 +3,7 @@
#include <Common/AsyncTaskExecutor.h>
#include <Common/Epoll.h>
#include <Common/Fiber.h>
#include <Common/FiberStack.h>
#include <Common/TimerDescriptor.h>
#include <Common/PoolWithFailoverBase.h>
#include <Client/ConnectionPool.h>

View File

@ -28,7 +28,10 @@ public:
using Entry = PoolBase<Connection>::Entry;
IConnectionPool() = default;
IConnectionPool(String host_, UInt16 port_) : host(host_), port(port_), address(host + ":" + toString(port_)) {}
IConnectionPool(String host_, UInt16 port_, Priority config_priority_)
: host(host_), port(port_), address(host + ":" + toString(port_)), config_priority(config_priority_)
{
}
virtual ~IConnectionPool() = default;
@ -42,12 +45,13 @@ public:
const std::string & getHost() const { return host; }
UInt16 getPort() const { return port; }
const String & getAddress() const { return address; }
virtual Priority getPriority() const { return Priority{1}; }
Priority getConfigPriority() const { return config_priority; }
protected:
const String host;
const UInt16 port = 0;
const String address;
const Priority config_priority;
};
using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
@ -61,7 +65,8 @@ public:
using Entry = IConnectionPool::Entry;
using Base = PoolBase<Connection>;
ConnectionPool(unsigned max_connections_,
ConnectionPool(
unsigned max_connections_,
const String & host_,
UInt16 port_,
const String & default_database_,
@ -73,20 +78,18 @@ public:
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_,
Priority priority_ = Priority{1})
: IConnectionPool(host_, port_),
Base(max_connections_,
getLogger("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
default_database(default_database_),
user(user_),
password(password_),
quota_key(quota_key_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
compression(compression_),
secure(secure_),
priority(priority_)
Priority config_priority_ = Priority{1})
: IConnectionPool(host_, port_, config_priority_)
, Base(max_connections_, getLogger("ConnectionPool (" + host_ + ":" + toString(port_) + ")"))
, default_database(default_database_)
, user(user_)
, password(password_)
, quota_key(quota_key_)
, cluster(cluster_)
, cluster_secret(cluster_secret_)
, client_name(client_name_)
, compression(compression_)
, secure(secure_)
{
}
@ -114,11 +117,6 @@ public:
return host + ":" + toString(port);
}
Priority getPriority() const override
{
return priority;
}
protected:
/** Creates a new object to put in the pool. */
ConnectionPtr allocObject() override
@ -143,7 +141,6 @@ private:
String client_name;
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
Priority priority; /// priority from <remote_servers>
};
/**

View File

@ -79,14 +79,6 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
return Base::get(max_ignored_errors, fallback_to_stale_replicas, try_get_entry, get_priority);
}
Priority ConnectionPoolWithFailover::getPriority() const
{
return (*std::max_element(nested_pools.begin(), nested_pools.end(), [](const auto & a, const auto & b)
{
return a->getPriority() < b->getPriority();
}))->getPriority();
}
ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
{
const auto [states, pools, error_decrease_time] = getPoolExtendedStates();
@ -253,13 +245,13 @@ ConnectionPoolWithFailover::tryGetEntry(
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool>
ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func)
ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func, bool use_slowdown_count)
{
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
return Base::getShuffledPools(max_ignored_errors, priority_func);
return Base::getShuffledPools(max_ignored_errors, priority_func, use_slowdown_count);
}
}

View File

@ -49,8 +49,6 @@ public:
const Settings & settings,
bool force_connected) override; /// From IConnectionPool
Priority getPriority() const override; /// From IConnectionPool
/** Allocates up to the specified number of connections to work.
* Connections provide access to different replicas of one shard.
*/
@ -83,15 +81,15 @@ public:
struct NestedPoolStatus
{
const Base::NestedPoolPtr pool;
size_t error_count;
size_t slowdown_count;
size_t error_count = 0;
size_t slowdown_count = 0;
std::chrono::seconds estimated_recovery_time;
};
using Status = std::vector<NestedPoolStatus>;
Status getStatus() const;
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {});
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false);
size_t getMaxErrorCup() const { return Base::max_error_cap; }

View File

@ -40,7 +40,8 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
, max_parallel_replicas(max_parallel_replicas_)
, skip_unavailable_shards(skip_unavailable_shards_)
{
shuffled_pools = pool->getShuffledPools(settings_, priority_func);
shuffled_pools = pool->getShuffledPools(settings_, priority_func, /* use_slowdown_count */ true);
for (const auto & shuffled_pool : shuffled_pools)
replicas.emplace_back(
std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));

View File

@ -320,7 +320,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
ReplicaState & state = getReplicaForReading();
current_connection = state.connection;
if (current_connection == nullptr)
throw Exception(ErrorCodes::NO_AVAILABLE_REPLICA, "No available replica");
throw Exception(ErrorCodes::NO_AVAILABLE_REPLICA, "Logical error: no available replica");
Packet packet;
try

View File

@ -5,6 +5,7 @@
#include <variant>
#include <Client/IConnections.h>
#include <Common/FiberStack.h>
#include <Common/Fiber.h>
#include <Common/Epoll.h>
#include <Common/TimerDescriptor.h>

View File

@ -810,7 +810,7 @@ ColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint
size_t tuple_size = tuple.tupleSize();
if (tuple_size == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty tuple");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: empty tuple");
Columns temporary_arrays(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
@ -1263,7 +1263,7 @@ ColumnPtr ColumnArray::replicateTuple(const Offsets & replicate_offsets) const
size_t tuple_size = tuple.tupleSize();
if (tuple_size == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty tuple");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: empty tuple");
Columns temporary_arrays(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)

View File

@ -1,5 +1,7 @@
#include <Common/Arena.h>
#include <Common/SipHash.h>
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/WeakHash.h>
#include <Columns/ColumnDecimal.h>
@ -26,6 +28,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
extern const int SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT;
extern const int NOT_IMPLEMENTED;
}
@ -826,7 +829,8 @@ void ColumnNullable::applyNullMap(const ColumnNullable & other)
void ColumnNullable::checkConsistency() const
{
if (null_map->size() != getNestedColumn().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Sizes of nested column and null map of Nullable column are not equal");
throw Exception(ErrorCodes::SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT,
"Logical error: Sizes of nested column and null map of Nullable column are not equal");
}
ColumnPtr ColumnNullable::createWithOffsets(const IColumn::Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const

View File

@ -21,7 +21,7 @@ static bool sameConstants(const IColumn & a, const IColumn & b)
ColumnWithTypeAndName getLeastSuperColumn(const std::vector<const ColumnWithTypeAndName *> & columns)
{
if (columns.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No src columns for supercolumn");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: no src columns for supercolumn");
ColumnWithTypeAndName result = *columns[0];

View File

@ -57,6 +57,249 @@ inline bool cpuid(UInt32 op, UInt32 * res) noexcept /// NOLINT
#endif
}
union CPUInfo
{
UInt32 info[4];
struct Registers
{
UInt32 eax;
UInt32 ebx;
UInt32 ecx;
UInt32 edx;
} registers;
inline explicit CPUInfo(UInt32 op) noexcept { cpuid(op, info); }
inline CPUInfo(UInt32 op, UInt32 sub_op) noexcept { cpuid(op, sub_op, info); }
};
inline bool haveRDTSCP() noexcept
{
return (CPUInfo(0x80000001).registers.edx >> 27) & 1u;
}
inline bool haveSSE() noexcept
{
return (CPUInfo(0x1).registers.edx >> 25) & 1u;
}
inline bool haveSSE2() noexcept
{
return (CPUInfo(0x1).registers.edx >> 26) & 1u;
}
inline bool haveSSE3() noexcept
{
return CPUInfo(0x1).registers.ecx & 1u;
}
inline bool havePCLMUL() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 1) & 1u;
}
inline bool haveSSSE3() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 9) & 1u;
}
inline bool haveSSE41() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 19) & 1u;
}
inline bool haveSSE42() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 20) & 1u;
}
inline bool haveF16C() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 29) & 1u;
}
inline bool havePOPCNT() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 23) & 1u;
}
inline bool haveAES() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 25) & 1u;
}
inline bool haveXSAVE() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 26) & 1u;
}
inline bool haveOSXSAVE() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 27) & 1u;
}
inline bool haveAVX() noexcept
{
#if defined(__x86_64__)
// http://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// https://bugs.chromium.org/p/chromium/issues/detail?id=375968
return haveOSXSAVE() // implies haveXSAVE()
&& (our_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((CPUInfo(0x1).registers.ecx >> 28) & 1u); // AVX bit
#else
return false;
#endif
}
inline bool haveFMA() noexcept
{
return haveAVX() && ((CPUInfo(0x1).registers.ecx >> 12) & 1u);
}
inline bool haveAVX2() noexcept
{
return haveAVX() && ((CPUInfo(0x7, 0).registers.ebx >> 5) & 1u);
}
inline bool haveBMI1() noexcept
{
return (CPUInfo(0x7, 0).registers.ebx >> 3) & 1u;
}
inline bool haveBMI2() noexcept
{
return (CPUInfo(0x7, 0).registers.ebx >> 8) & 1u;
}
inline bool haveAVX512F() noexcept
{
#if defined(__x86_64__)
// https://software.intel.com/en-us/articles/how-to-detect-knl-instruction-support
return haveOSXSAVE() // implies haveXSAVE()
&& (our_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((our_xgetbv(0) >> 5) & 7u) == 7u // ZMM state is enabled by OS
&& CPUInfo(0x0).registers.eax >= 0x7 // leaf 7 is present
&& ((CPUInfo(0x7, 0).registers.ebx >> 16) & 1u); // AVX512F bit
#else
return false;
#endif
}
inline bool haveAVX512DQ() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 17) & 1u);
}
inline bool haveRDSEED() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 18) & 1u);
}
inline bool haveADX() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 19) & 1u);
}
inline bool haveAVX512IFMA() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 21) & 1u);
}
inline bool havePCOMMIT() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 22) & 1u);
}
inline bool haveCLFLUSHOPT() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 23) & 1u);
}
inline bool haveCLWB() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 24) & 1u);
}
inline bool haveAVX512PF() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 26) & 1u);
}
inline bool haveAVX512ER() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 27) & 1u);
}
inline bool haveAVX512CD() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 28) & 1u);
}
inline bool haveSHA() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 29) & 1u);
}
inline bool haveAVX512BW() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 30) & 1u);
}
inline bool haveAVX512VL() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 31) & 1u);
}
inline bool havePREFETCHWT1() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ecx >> 0) & 1u);
}
inline bool haveAVX512VBMI() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ecx >> 1) & 1u);
}
inline bool haveAVX512VBMI2() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ecx >> 6) & 1u);
}
inline bool haveRDRAND() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x1).registers.ecx >> 30) & 1u);
}
inline bool haveAMX() noexcept
{
#if defined(__x86_64__)
// http://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
return haveOSXSAVE() // implies haveXSAVE()
&& ((our_xgetbv(0) >> 17) & 0x3) == 0x3; // AMX state are enabled by OS
#else
return false;
#endif
}
inline bool haveAMXBF16() noexcept
{
return haveAMX()
&& ((CPUInfo(0x7, 0).registers.edx >> 22) & 1u); // AMX-BF16 bit
}
inline bool haveAMXTILE() noexcept
{
return haveAMX()
&& ((CPUInfo(0x7, 0).registers.edx >> 24) & 1u); // AMX-TILE bit
}
inline bool haveAMXINT8() noexcept
{
return haveAMX()
&& ((CPUInfo(0x7, 0).registers.edx >> 25) & 1u); // AMX-INT8 bit
}
#define CPU_ID_ENUMERATE(OP) \
OP(SSE) \
OP(SSE2) \
@ -98,253 +341,6 @@ inline bool cpuid(UInt32 op, UInt32 * res) noexcept /// NOLINT
OP(AMXTILE) \
OP(AMXINT8)
union CPUInfo
{
UInt32 info[4];
struct Registers
{
UInt32 eax;
UInt32 ebx;
UInt32 ecx;
UInt32 edx;
} registers;
inline explicit CPUInfo(UInt32 op) noexcept { cpuid(op, info); }
inline CPUInfo(UInt32 op, UInt32 sub_op) noexcept { cpuid(op, sub_op, info); }
};
#define DEF_NAME(X) inline bool have##X() noexcept;
CPU_ID_ENUMERATE(DEF_NAME)
#undef DEF_NAME
bool haveRDTSCP() noexcept
{
return (CPUInfo(0x80000001).registers.edx >> 27) & 1u;
}
bool haveSSE() noexcept
{
return (CPUInfo(0x1).registers.edx >> 25) & 1u;
}
bool haveSSE2() noexcept
{
return (CPUInfo(0x1).registers.edx >> 26) & 1u;
}
bool haveSSE3() noexcept
{
return CPUInfo(0x1).registers.ecx & 1u;
}
bool havePCLMUL() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 1) & 1u;
}
bool haveSSSE3() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 9) & 1u;
}
bool haveSSE41() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 19) & 1u;
}
bool haveSSE42() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 20) & 1u;
}
bool haveF16C() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 29) & 1u;
}
bool havePOPCNT() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 23) & 1u;
}
bool haveAES() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 25) & 1u;
}
bool haveXSAVE() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 26) & 1u;
}
bool haveOSXSAVE() noexcept
{
return (CPUInfo(0x1).registers.ecx >> 27) & 1u;
}
bool haveAVX() noexcept
{
#if defined(__x86_64__)
// http://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// https://bugs.chromium.org/p/chromium/issues/detail?id=375968
return haveOSXSAVE() // implies haveXSAVE()
&& (our_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((CPUInfo(0x1).registers.ecx >> 28) & 1u); // AVX bit
#else
return false;
#endif
}
bool haveFMA() noexcept
{
return haveAVX() && ((CPUInfo(0x1).registers.ecx >> 12) & 1u);
}
bool haveAVX2() noexcept
{
return haveAVX() && ((CPUInfo(0x7, 0).registers.ebx >> 5) & 1u);
}
bool haveBMI1() noexcept
{
return (CPUInfo(0x7, 0).registers.ebx >> 3) & 1u;
}
bool haveBMI2() noexcept
{
return (CPUInfo(0x7, 0).registers.ebx >> 8) & 1u;
}
bool haveAVX512F() noexcept
{
#if defined(__x86_64__)
// https://software.intel.com/en-us/articles/how-to-detect-knl-instruction-support
return haveOSXSAVE() // implies haveXSAVE()
&& (our_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((our_xgetbv(0) >> 5) & 7u) == 7u // ZMM state is enabled by OS
&& CPUInfo(0x0).registers.eax >= 0x7 // leaf 7 is present
&& ((CPUInfo(0x7, 0).registers.ebx >> 16) & 1u); // AVX512F bit
#else
return false;
#endif
}
bool haveAVX512DQ() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 17) & 1u);
}
bool haveRDSEED() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 18) & 1u);
}
bool haveADX() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 19) & 1u);
}
bool haveAVX512IFMA() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 21) & 1u);
}
bool havePCOMMIT() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 22) & 1u);
}
bool haveCLFLUSHOPT() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 23) & 1u);
}
bool haveCLWB() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 24) & 1u);
}
bool haveAVX512PF() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 26) & 1u);
}
bool haveAVX512ER() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 27) & 1u);
}
bool haveAVX512CD() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 28) & 1u);
}
bool haveSHA() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ebx >> 29) & 1u);
}
bool haveAVX512BW() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 30) & 1u);
}
bool haveAVX512VL() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ebx >> 31) & 1u);
}
bool havePREFETCHWT1() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x7, 0).registers.ecx >> 0) & 1u);
}
bool haveAVX512VBMI() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ecx >> 1) & 1u);
}
bool haveAVX512VBMI2() noexcept
{
return haveAVX512F() && ((CPUInfo(0x7, 0).registers.ecx >> 6) & 1u);
}
bool haveRDRAND() noexcept
{
return CPUInfo(0x0).registers.eax >= 0x7 && ((CPUInfo(0x1).registers.ecx >> 30) & 1u);
}
inline bool haveAMX() noexcept
{
#if defined(__x86_64__)
// http://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
return haveOSXSAVE() // implies haveXSAVE()
&& ((our_xgetbv(0) >> 17) & 0x3) == 0x3; // AMX state are enabled by OS
#else
return false;
#endif
}
bool haveAMXBF16() noexcept
{
return haveAMX()
&& ((CPUInfo(0x7, 0).registers.edx >> 22) & 1u); // AMX-BF16 bit
}
bool haveAMXTILE() noexcept
{
return haveAMX()
&& ((CPUInfo(0x7, 0).registers.edx >> 24) & 1u); // AMX-TILE bit
}
bool haveAMXINT8() noexcept
{
return haveAMX()
&& ((CPUInfo(0x7, 0).registers.edx >> 25) & 1u); // AMX-INT8 bit
}
struct CPUFlagsCache
{
#define DEF_NAME(X) static inline bool have_##X = have##X();

View File

@ -12,10 +12,10 @@ namespace ErrorCodes
ConcurrencyControl::Slot::~Slot()
{
allocation->release();
static_cast<ConcurrencyControl::Allocation&>(*allocation).release();
}
ConcurrencyControl::Slot::Slot(AllocationPtr && allocation_)
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
: allocation(std::move(allocation_))
{
}
@ -27,7 +27,7 @@ ConcurrencyControl::Allocation::~Allocation()
parent.free(this);
}
[[nodiscard]] ConcurrencyControl::SlotPtr ConcurrencyControl::Allocation::tryAcquire()
[[nodiscard]] AcquiredSlotPtr ConcurrencyControl::Allocation::tryAcquire()
{
SlotCount value = granted.load();
while (value)
@ -35,15 +35,21 @@ ConcurrencyControl::Allocation::~Allocation()
if (granted.compare_exchange_strong(value, value - 1))
{
std::unique_lock lock{mutex};
return SlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}
}
return {}; // avoid unnecessary locking
}
ConcurrencyControl::SlotCount ConcurrencyControl::Allocation::grantedCount() const
SlotCount ConcurrencyControl::Allocation::grantedCount() const
{
return granted;
return granted.load();
}
SlotCount ConcurrencyControl::Allocation::allocatedCount() const
{
std::unique_lock lock{mutex};
return allocated;
}
ConcurrencyControl::Allocation::Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_)
@ -87,7 +93,7 @@ ConcurrencyControl::~ConcurrencyControl()
abort();
}
[[nodiscard]] ConcurrencyControl::AllocationPtr ConcurrencyControl::allocate(SlotCount min, SlotCount max)
[[nodiscard]] SlotAllocationPtr ConcurrencyControl::allocate(SlotCount min, SlotCount max)
{
if (min > max)
throw Exception(ErrorCodes::LOGICAL_ERROR, "ConcurrencyControl: invalid allocation requirements");
@ -100,13 +106,13 @@ ConcurrencyControl::~ConcurrencyControl()
// Create allocation and start waiting if more slots are required
if (granted < max)
return AllocationPtr(new Allocation(*this, max, granted,
return SlotAllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
else
return AllocationPtr(new Allocation(*this, max, granted));
return SlotAllocationPtr(new Allocation(*this, max, granted));
}
void ConcurrencyControl::setMaxConcurrency(ConcurrencyControl::SlotCount value)
void ConcurrencyControl::setMaxConcurrency(SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
@ -162,7 +168,7 @@ void ConcurrencyControl::schedule(std::unique_lock<std::mutex> &)
}
}
ConcurrencyControl::SlotCount ConcurrencyControl::available(std::unique_lock<std::mutex> &) const
SlotCount ConcurrencyControl::available(std::unique_lock<std::mutex> &) const
{
if (cur_concurrency < max_concurrency)
return max_concurrency - cur_concurrency;

View File

@ -7,6 +7,7 @@
#include <base/types.h>
#include <boost/core/noncopyable.hpp>
#include <Common/ISlotControl.h>
namespace DB
{
@ -34,41 +35,35 @@ namespace DB
* Oversubscription is possible: total amount of allocated slots can exceed `setMaxConcurrency(limit)`
* because `min` amount of slots is allocated for each query unconditionally.
*/
class ConcurrencyControl : boost::noncopyable
class ConcurrencyControl : public ISlotControl
{
public:
struct Allocation;
using AllocationPtr = std::shared_ptr<Allocation>;
using SlotCount = UInt64;
using Waiters = std::list<Allocation *>;
static constexpr SlotCount Unlimited = std::numeric_limits<SlotCount>::max();
// Scoped guard for acquired slot, see Allocation::tryAcquire()
struct Slot : boost::noncopyable
struct Slot : public IAcquiredSlot
{
~Slot();
~Slot() override;
private:
friend struct Allocation; // for ctor
explicit Slot(AllocationPtr && allocation_);
explicit Slot(SlotAllocationPtr && allocation_);
AllocationPtr allocation;
SlotAllocationPtr allocation;
};
// FIXME: have to be unique_ptr, but ThreadFromGlobalPool does not support move semantics yet
using SlotPtr = std::shared_ptr<Slot>;
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
struct Allocation : std::enable_shared_from_this<Allocation>, boost::noncopyable
struct Allocation : public ISlotAllocation
{
~Allocation();
~Allocation() override;
// Take one already granted slot if available. Lock-free iff there is no granted slot.
[[nodiscard]] SlotPtr tryAcquire();
[[nodiscard]] AcquiredSlotPtr tryAcquire() override;
SlotCount grantedCount() const;
SlotCount grantedCount() const override;
SlotCount allocatedCount() const override;
private:
friend struct Slot; // for release()
@ -94,7 +89,7 @@ public:
ConcurrencyControl & parent;
const SlotCount limit;
std::mutex mutex; // the following values must be accessed under this mutex
mutable std::mutex mutex; // the following values must be accessed under this mutex
SlotCount allocated; // allocated total (including already `released`)
SlotCount released = 0;
@ -103,17 +98,16 @@ public:
const Waiters::iterator waiter; // iterator to itself in Waiters list; valid iff allocated < limit
};
public:
ConcurrencyControl();
// WARNING: all Allocation objects MUST be destructed before ConcurrencyControl
// NOTE: Recommended way to achieve this is to use `instance()` and do graceful shutdown of queries
~ConcurrencyControl();
~ConcurrencyControl() override;
// Allocate at least `min` and at most `max` slots.
// If not all `max` slots were successfully allocated, a subscription for later allocation is created
// Use `Allocation::tryAcquire()` to acquire allocated slot, before running a thread.
[[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max);
[[nodiscard]] SlotAllocationPtr allocate(SlotCount min, SlotCount max) override;
void setMaxConcurrency(SlotCount value);
@ -134,7 +128,7 @@ private:
std::mutex mutex;
Waiters waiters;
Waiters::iterator cur_waiter; // round-robin pointer
SlotCount max_concurrency = Unlimited;
SlotCount max_concurrency = UnlimitedSlots;
SlotCount cur_concurrency = 0;
};

View File

@ -262,6 +262,9 @@
M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \
M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \
M(RefreshingViews, "Number of materialized views currently executing a refresh") \
M(StorageBufferFlushThreads, "Number of threads for background flushes in StorageBuffer") \
M(StorageBufferFlushThreadsActive, "Number of threads for background flushes in StorageBuffer running a task") \
M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer")
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -46,12 +46,6 @@ public:
current_fiber = parent_fiber;
}
static FiberPtr & getCurrentFiber()
{
thread_local static FiberPtr current_fiber;
return current_fiber;
}
private:
template <typename Fn>
struct RoutineImpl
@ -80,6 +74,12 @@ private:
Fn fn;
};
static FiberPtr & getCurrentFiber()
{
thread_local static FiberPtr current_fiber;
return current_fiber;
}
/// Special wrapper to store data in uniquer_ptr.
struct DataWrapper
{
@ -146,3 +146,4 @@ private:
T main_instance;
};

76
src/Common/ISlotControl.h Normal file
View File

@ -0,0 +1,76 @@
#pragma once
#include <limits>
#include <memory>
#include <base/types.h>
#include <boost/core/noncopyable.hpp>
namespace DB
{
// Interfaces for abstract "slot" allocation and control.
// Slot is a virtual entity existing in a limited amount (CPUs or memory chunks, etc).
//
// Every slot can be in one of the following states:
// * free: slot is available to be allocated.
// * allocated: slot is allocated to a specific ISlotAllocation.
//
// Allocated slots can be in one of the following states:
// * granted: allocated, but not yet acquired.
// * acquired: a granted slot becomes acquired by using IAcquiredSlot.
//
// Example for CPU (see ConcurrencyControl.h). Every slot represents one CPU in the system.
// Slot allocation is a request to allocate specific number of CPUs for a specific query.
// Acquired slot is an entity that is held by a thread as long as it is running. This allows
// total number of threads in the system to be limited and the distribution process to be controlled.
//
// TODO:
// - for preemption - ability to return granted slot back and reacquire it later.
// - for memory allocations - variable size of slots (in bytes).
/// Number of slots
using SlotCount = UInt64;
/// Unlimited number of slots
constexpr SlotCount UnlimitedSlots = std::numeric_limits<SlotCount>::max();
/// Acquired slot holder. Slot is considered to be acquired as long as the object exists.
class IAcquiredSlot : public std::enable_shared_from_this<IAcquiredSlot>, boost::noncopyable
{
public:
virtual ~IAcquiredSlot() = default;
};
using AcquiredSlotPtr = std::shared_ptr<IAcquiredSlot>;
/// Request for allocation of slots from ISlotControl.
/// Allows for more slots to be acquired and the whole request to be canceled.
class ISlotAllocation : public std::enable_shared_from_this<ISlotAllocation>, boost::noncopyable
{
public:
virtual ~ISlotAllocation() = default;
/// Take one already granted slot if available.
[[nodiscard]] virtual AcquiredSlotPtr tryAcquire() = 0;
/// Returns the number of granted slots for given allocation (i.e. available to be acquired)
virtual SlotCount grantedCount() const = 0;
/// Returns the total number of slots allocated at the moment (acquired and granted)
virtual SlotCount allocatedCount() const = 0;
};
using SlotAllocationPtr = std::shared_ptr<ISlotAllocation>;
class ISlotControl : boost::noncopyable
{
public:
virtual ~ISlotControl() = default;
// Allocate at least `min` and at most `max` slots.
// If not all `max` slots were successfully allocated, a "subscription" for later allocation is created
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
};
}

View File

@ -66,7 +66,7 @@ public:
, log(log_)
{
for (size_t i = 0;i < nested_pools.size(); ++i)
shared_pool_states[i].config_priority = nested_pools[i]->getPriority();
shared_pool_states[i].config_priority = nested_pools[i]->getConfigPriority();
}
struct TryResult
@ -133,7 +133,7 @@ protected:
void updateErrorCounts(PoolStates & states, time_t & last_decrease_time) const;
std::vector<ShuffledPool> getShuffledPools(size_t max_ignored_errors, const GetPriorityFunc & get_priority);
std::vector<ShuffledPool> getShuffledPools(size_t max_ignored_errors, const GetPriorityFunc & get_priority, bool use_slowdown_count = false);
inline void updateSharedErrorCounts(std::vector<ShuffledPool> & shuffled_pools);
@ -160,7 +160,7 @@ protected:
template <typename TNestedPool>
std::vector<typename PoolWithFailoverBase<TNestedPool>::ShuffledPool>
PoolWithFailoverBase<TNestedPool>::getShuffledPools(
size_t max_ignored_errors, const PoolWithFailoverBase::GetPriorityFunc & get_priority)
size_t max_ignored_errors, const PoolWithFailoverBase::GetPriorityFunc & get_priority, bool use_slowdown_count)
{
/// Update random numbers and error counts.
PoolStates pool_states = updatePoolStates(max_ignored_errors);
@ -175,13 +175,13 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
std::vector<ShuffledPool> shuffled_pools;
shuffled_pools.reserve(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
shuffled_pools.push_back(ShuffledPool{nested_pools[i], &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
shuffled_pools.emplace_back(ShuffledPool{.pool = nested_pools[i], .state = &pool_states[i], .index = i});
::sort(
shuffled_pools.begin(), shuffled_pools.end(),
[](const ShuffledPool & lhs, const ShuffledPool & rhs)
[use_slowdown_count](const ShuffledPool & lhs, const ShuffledPool & rhs)
{
return PoolState::compare(*lhs.state, *rhs.state);
return PoolState::compare(*lhs.state, *rhs.state, use_slowdown_count);
});
return shuffled_pools;
@ -344,10 +344,14 @@ struct PoolWithFailoverBase<TNestedPool>::PoolState
random = rng();
}
static bool compare(const PoolState & lhs, const PoolState & rhs)
static bool compare(const PoolState & lhs, const PoolState & rhs, bool use_slowdown_count)
{
if (use_slowdown_count)
return std::forward_as_tuple(lhs.error_count, lhs.slowdown_count, lhs.config_priority, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.error_count, rhs.slowdown_count, rhs.config_priority, rhs.priority, rhs.random);
else
return std::forward_as_tuple(lhs.error_count, lhs.config_priority, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.error_count, rhs.config_priority, rhs.priority, rhs.random);
}
private:

View File

@ -632,6 +632,12 @@ The server successfully detected this situation and will download merged part fr
M(InterfacePostgreSQLReceiveBytes, "Number of bytes received through PostgreSQL interfaces") \
\
M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \
\
M(KeeperLogsEntryReadFromLatestCache, "Number of log entries in Keeper being read from latest logs cache") \
M(KeeperLogsEntryReadFromCommitCache, "Number of log entries in Keeper being read from commit logs cache") \
M(KeeperLogsEntryReadFromFile, "Number of log entries in Keeper being read directly from the changelog file") \
M(KeeperLogsPrefetchedEntries, "Number of log entries in Keeper being prefetched from the changelog file") \
\
M(ParallelReplicasAvailableCount, "Number of replicas available to execute a query with task-based parallel replicas") \
M(ParallelReplicasUnavailableCount, "Number of replicas which was chosen, but found to be unavailable during query execution with task-based parallel replicas") \

View File

@ -91,7 +91,7 @@ void SensitiveDataMasker::setInstance(std::unique_ptr<SensitiveDataMasker>&& sen
{
if (!sensitive_data_masker_)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The 'sensitive_data_masker' is not set");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: the 'sensitive_data_masker' is not set");
if (sensitive_data_masker_->rulesCount() > 0)
{

View File

@ -209,7 +209,7 @@ public:
{
if (!is_reference_128)
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR, "Can't call get128Reference when is_reference_128 is not set");
DB::ErrorCodes::LOGICAL_ERROR, "Logical error: can't call get128Reference when is_reference_128 is not set");
finalize();
const auto lo = v0 ^ v1 ^ v2 ^ v3;
v1 ^= 0xdd;

View File

@ -448,6 +448,9 @@ toStringEveryLineImpl([[maybe_unused]] bool fatal, const StackTraceRefTriple & s
DB::writePointerHex(frame.physical_addr, out);
}
if (frame.object.has_value())
out << " in " << *frame.object;
callback(out.str());
};
#else

View File

@ -1,8 +1,8 @@
#include <base/getThreadId.h>
#include <base/defines.h> /// THREAD_SANITIZER
#include <Common/checkStackSize.h>
#include <Common/Exception.h>
#include <Common/Fiber.h>
#include <base/getThreadId.h>
#include <base/scope_guard.h>
#include <base/defines.h> /// THREAD_SANITIZER
#include <sys/resource.h>
#include <pthread.h>
#include <unistd.h>
@ -114,10 +114,6 @@ __attribute__((__weak__)) void checkStackSize()
{
using namespace DB;
/// Not implemented for coroutines.
if (Fiber::getCurrentFiber())
return;
if (!stack_address)
max_stack_size = getStackSize(&stack_address);
@ -140,7 +136,7 @@ __attribute__((__weak__)) void checkStackSize()
/// We assume that stack grows towards lower addresses. And that it starts to grow from the end of a chunk of memory of max_stack_size.
if (int_frame_address > int_stack_address + max_stack_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Frame address is greater than stack begin address");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: frame address is greater than stack begin address");
size_t stack_size = int_stack_address + max_stack_size - int_frame_address;
size_t max_stack_size_allowed = static_cast<size_t>(max_stack_size * STACK_SIZE_FREE_RATIO);

View File

@ -427,9 +427,7 @@ TEST(AsyncLoader, CancelExecutingTask)
}
}
// This test is disabled due to `MemorySanitizer: use-of-uninitialized-value` issue in `collectSymbolsFromProgramHeaders` function
// More details: https://github.com/ClickHouse/ClickHouse/pull/48923#issuecomment-1545415482
TEST(AsyncLoader, DISABLED_JobFailure)
TEST(AsyncLoader, JobFailure)
{
AsyncLoaderTest t;
t.loader.start();

View File

@ -15,7 +15,7 @@ struct ConcurrencyControlTest
{
ConcurrencyControl cc;
explicit ConcurrencyControlTest(ConcurrencyControl::SlotCount limit = ConcurrencyControl::Unlimited)
explicit ConcurrencyControlTest(SlotCount limit = UnlimitedSlots)
{
cc.setMaxConcurrency(limit);
}
@ -25,7 +25,7 @@ TEST(ConcurrencyControl, Unlimited)
{
ConcurrencyControlTest t; // unlimited number of slots
auto slots = t.cc.allocate(0, 100500);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = slots->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 100500);
@ -34,14 +34,14 @@ TEST(ConcurrencyControl, Unlimited)
TEST(ConcurrencyControl, Fifo)
{
ConcurrencyControlTest t(1); // use single slot
std::vector<ConcurrencyControl::AllocationPtr> allocations;
std::vector<SlotAllocationPtr> allocations;
constexpr int count = 42;
allocations.reserve(count);
for (int i = 0; i < count; i++)
allocations.emplace_back(t.cc.allocate(0, 1));
for (int i = 0; i < count; i++)
{
ConcurrencyControl::SlotPtr holder;
AcquiredSlotPtr holder;
for (int j = 0; j < count; j++)
{
auto slot = allocations[j]->tryAcquire();
@ -60,11 +60,11 @@ TEST(ConcurrencyControl, Fifo)
TEST(ConcurrencyControl, Oversubscription)
{
ConcurrencyControlTest t(10);
std::vector<ConcurrencyControl::AllocationPtr> allocations;
std::vector<SlotAllocationPtr> allocations;
allocations.reserve(10);
for (int i = 0; i < 10; i++)
allocations.emplace_back(t.cc.allocate(1, 2));
std::vector<ConcurrencyControl::SlotPtr> slots;
std::vector<AcquiredSlotPtr> slots;
// Normal allocation using maximum amount of slots
for (int i = 0; i < 5; i++)
{
@ -90,7 +90,7 @@ TEST(ConcurrencyControl, ReleaseUnacquiredSlots)
{
ConcurrencyControlTest t(10);
{
std::vector<ConcurrencyControl::AllocationPtr> allocations;
std::vector<SlotAllocationPtr> allocations;
allocations.reserve(10);
for (int i = 0; i < 10; i++)
allocations.emplace_back(t.cc.allocate(1, 2));
@ -98,7 +98,7 @@ TEST(ConcurrencyControl, ReleaseUnacquiredSlots)
}
// Check that slots were actually released
auto allocation = t.cc.allocate(0, 20);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 10);
@ -110,7 +110,7 @@ TEST(ConcurrencyControl, DestroyNotFullyAllocatedAllocation)
for (int i = 0; i < 3; i++)
{
auto allocation = t.cc.allocate(5, 20);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 10);
@ -122,7 +122,7 @@ TEST(ConcurrencyControl, DestroyAllocationBeforeSlots)
ConcurrencyControlTest t(10);
for (int i = 0; i < 3; i++)
{
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
auto allocation = t.cc.allocate(5, 20);
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
@ -135,7 +135,7 @@ TEST(ConcurrencyControl, GrantReleasedToTheSameAllocation)
{
ConcurrencyControlTest t(3);
auto allocation = t.cc.allocate(0, 10);
std::list<ConcurrencyControl::SlotPtr> acquired;
std::list<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 3); // 0 1 2
@ -183,7 +183,7 @@ TEST(ConcurrencyControl, SetSlotCount)
{
ConcurrencyControlTest t(10);
auto allocation = t.cc.allocate(5, 30);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 10);
@ -200,7 +200,7 @@ TEST(ConcurrencyControl, SetSlotCount)
ASSERT_TRUE(acquired.size() == 5);
// Check that newly added slots are equally distributed over waiting allocations
std::vector<ConcurrencyControl::SlotPtr> acquired2;
std::vector<AcquiredSlotPtr> acquired2;
auto allocation2 = t.cc.allocate(0, 30);
ASSERT_TRUE(!allocation->tryAcquire());
t.cc.setMaxConcurrency(15); // 10 slots added: 5 to the first allocation and 5 to the second one
@ -224,7 +224,7 @@ TEST(ConcurrencyControl, MultipleThreads)
auto run_query = [&] (size_t max_threads)
{
ConcurrencyControl::AllocationPtr slots = t.cc.allocate(1, max_threads);
SlotAllocationPtr slots = t.cc.allocate(1, max_threads);
std::mutex threads_mutex;
std::vector<std::thread> threads;
threads.reserve(max_threads);

View File

@ -91,6 +91,7 @@ enum class MagicNumber : uint8_t
Decimal32 = 19,
Decimal64 = 20,
IPv4 = 21,
Date32 = 22,
};
MagicNumber serializeTypeId(std::optional<TypeIndex> type_id)
@ -109,6 +110,7 @@ MagicNumber serializeTypeId(std::optional<TypeIndex> type_id)
case TypeIndex::Int32: return MagicNumber::Int32;
case TypeIndex::Int64: return MagicNumber::Int64;
case TypeIndex::Date: return MagicNumber::Date;
case TypeIndex::Date32: return MagicNumber::Date32;
case TypeIndex::DateTime: return MagicNumber::DateTime;
case TypeIndex::DateTime64: return MagicNumber::DateTime64;
case TypeIndex::Enum8: return MagicNumber::Enum8;
@ -137,6 +139,7 @@ TypeIndex deserializeTypeId(uint8_t serialized_type_id)
case MagicNumber::Int32: return TypeIndex::Int32;
case MagicNumber::Int64: return TypeIndex::Int64;
case MagicNumber::Date: return TypeIndex::Date;
case MagicNumber::Date32: return TypeIndex::Date32;
case MagicNumber::DateTime: return TypeIndex::DateTime;
case MagicNumber::DateTime64: return TypeIndex::DateTime64;
case MagicNumber::Enum8: return TypeIndex::Enum8;
@ -165,6 +168,7 @@ TypeIndex baseType(TypeIndex type_idx)
return TypeIndex::Int16;
case TypeIndex::Int32:
case TypeIndex::Decimal32:
case TypeIndex::Date32:
return TypeIndex::Int32;
case TypeIndex::Int64:
case TypeIndex::Decimal64:
@ -205,6 +209,7 @@ TypeIndex typeIdx(const IDataType * data_type)
case TypeIndex::UInt16:
case TypeIndex::Enum16:
case TypeIndex::Date:
case TypeIndex::Date32:
case TypeIndex::Int32:
case TypeIndex::UInt32:
case TypeIndex::IPv4:

File diff suppressed because it is too large Load Diff

View File

@ -1,17 +1,26 @@
#pragma once
#include <optional>
#include <city.h>
#include <Disks/IDisk.h>
#include <IO/CompressionMethod.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <base/defines.h>
#include <libnuraft/nuraft.hxx>
#include <libnuraft/raft_server.hxx>
#include <libnuraft/ptr.hxx>
#include <Common/ThreadPool_fwd.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
#include <Coordination/KeeperContext.h>
#include <map>
#include <unordered_set>
#include <future>
namespace nuraft
{
struct log_entry;
struct buffer;
struct raft_server;
}
namespace Poco
{
class Logger;
}
using LoggerPtr = std::shared_ptr<Poco::Logger>;
namespace DB
{
@ -23,8 +32,11 @@ using LogEntries = std::vector<LogEntryPtr>;
using LogEntriesPtr = nuraft::ptr<LogEntries>;
using BufferPtr = nuraft::ptr<nuraft::buffer>;
using IndexToOffset = std::unordered_map<uint64_t, off_t>;
using IndexToLogEntry = std::unordered_map<uint64_t, LogEntryPtr>;
struct KeeperLogInfo;
class KeeperContext;
using KeeperContextPtr = std::shared_ptr<KeeperContext>;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
enum class ChangelogVersion : uint8_t
{
@ -63,10 +75,19 @@ struct ChangelogFileDescription
DiskPtr disk;
std::string path;
std::mutex file_mutex;
bool deleted = false;
/// How many entries should be stored in this log
uint64_t expectedEntriesCountInLog() const { return to_log_index - from_log_index + 1; }
template <typename TFunction>
void withLock(TFunction && fn)
{
std::lock_guard lock(file_mutex);
fn();
}
};
using ChangelogFileDescriptionPtr = std::shared_ptr<ChangelogFileDescription>;
@ -80,6 +101,8 @@ struct LogFileSettings
uint64_t rotate_interval = 100000;
uint64_t max_size = 0;
uint64_t overallocate_size = 0;
uint64_t latest_logs_cache_size_threshold = 0;
uint64_t commit_logs_cache_size_threshold = 0;
};
struct FlushSettings
@ -87,6 +110,191 @@ struct FlushSettings
uint64_t max_flush_batch_size = 1000;
};
struct LogLocation
{
ChangelogFileDescriptionPtr file_description;
size_t position;
size_t size;
};
struct PrefetchedCacheEntry
{
explicit PrefetchedCacheEntry();
const LogEntryPtr & getLogEntry() const;
void resolve(std::exception_ptr exception);
void resolve(LogEntryPtr log_entry_);
private:
std::promise<LogEntryPtr> log_entry_resolver;
mutable std::shared_future<LogEntryPtr> log_entry;
};
using CacheEntry = std::variant<LogEntryPtr, PrefetchedCacheEntry>;
using IndexToCacheEntry = std::unordered_map<uint64_t, CacheEntry>;
using IndexToCacheEntryNode = typename IndexToCacheEntry::node_type;
/**
* Storage for storing and handling deserialized entries from disk.
* It consists of 2 in-memory caches that rely heavily on the way
* entries are used in Raft.
* Random and repeated access to certain entries is almost never done so we can't implement a solution
* like LRU/SLRU cache because entries would be cached and never read again.
* Entries are often read sequentially for 2 cases:
* - for replication
* - for committing
*
* First cache will store latest logs in memory, limited by the latest_logs_cache_size_threshold coordination setting.
* Once the log is persisted to the disk, we store it's location in the file and allow the storage
* to evict that log from cache if it's needed.
* Latest logs cache should have a high hit rate in "normal" operation for both replication and committing.
*
* As we commit (and read) logs sequentially, we will try to read from latest logs cache.
* In some cases, latest logs could be ahead from last committed log by more than latest_logs_cache_size_threshold
* which means that for each commit we would need to read the log from disk.
* In case latest logs cache hits the threshold we have a second cache called commit logs cache limited by commit_logs_cache_size_threshold.
* If a log is evicted from the latest logs cache, we check if we can move it to commit logs cache to avoid re-reading the log from disk.
* If latest logs cache moves ahead of the commit log by a lot or commit log hits the threshold
* we cannot move the entries from latest logs and we will need to refill the commit cache from disk.
* To avoid reading entry by entry (which can have really bad effect on performance because we support disks based on S3),
* we try to prefetch multiple entries ahead of time because we know that they will be read by commit thread
* in the future.
* Commit logs cache should have a high hit rate if we start with a lot of unprocessed logs that cannot fit in the
* latest logs cache.
*/
struct LogEntryStorage
{
LogEntryStorage(const LogFileSettings & log_settings, KeeperContextPtr keeper_context_);
~LogEntryStorage();
void addEntry(uint64_t index, const LogEntryPtr & log_entry);
void addEntryWithLocation(uint64_t index, const LogEntryPtr & log_entry, LogLocation log_location);
/// clean all logs up to (but not including) index
void cleanUpTo(uint64_t index);
/// clean all logs after (but not including) index
void cleanAfter(uint64_t index);
bool contains(uint64_t index) const;
LogEntryPtr getEntry(uint64_t index) const;
void clear();
LogEntryPtr getLatestConfigChange() const;
uint64_t termAt(uint64_t index) const;
using IndexWithLogLocation = std::pair<uint64_t, LogLocation>;
void addLogLocations(std::vector<IndexWithLogLocation> && indices_with_log_locations);
void refreshCache();
LogEntriesPtr getLogEntriesBetween(uint64_t start, uint64_t end) const;
void getKeeperLogInfo(KeeperLogInfo & log_info) const;
bool isConfigLog(uint64_t index) const;
size_t empty() const;
size_t size() const;
size_t getFirstIndex() const;
void shutdown();
private:
void prefetchCommitLogs();
void startCommitLogsPrefetch(uint64_t last_committed_index) const;
bool shouldMoveLogToCommitCache(uint64_t index, size_t log_entry_size);
void updateTermInfoWithNewEntry(uint64_t index, uint64_t term);
struct InMemoryCache
{
explicit InMemoryCache(size_t size_threshold_);
void addEntry(uint64_t index, size_t size, CacheEntry log_entry);
void addEntry(IndexToCacheEntryNode && node);
void updateStatsWithNewEntry(uint64_t index, size_t size);
IndexToCacheEntryNode popOldestEntry();
bool containsEntry(uint64_t index) const;
LogEntryPtr getEntry(uint64_t index) const;
CacheEntry * getCacheEntry(uint64_t index);
const CacheEntry * getCacheEntry(uint64_t index) const;
PrefetchedCacheEntry & getPrefetchedCacheEntry(uint64_t index);
void cleanUpTo(uint64_t index);
void cleanAfter(uint64_t index);
bool empty() const;
size_t numberOfEntries() const;
bool hasSpaceAvailable(size_t log_entry_size) const;
void clear();
/// Mapping log_id -> log_entry
mutable IndexToCacheEntry cache;
size_t cache_size = 0;
size_t min_index_in_cache = 0;
size_t max_index_in_cache = 0;
const size_t size_threshold;
};
InMemoryCache latest_logs_cache;
mutable InMemoryCache commit_logs_cache;
LogEntryPtr latest_config;
uint64_t latest_config_index = 0;
mutable LogEntryPtr first_log_entry;
mutable uint64_t first_log_index = 0;
std::unique_ptr<ThreadFromGlobalPool> commit_logs_prefetcher;
struct FileReadInfo
{
ChangelogFileDescriptionPtr file_description;
size_t position;
size_t count;
};
struct PrefetchInfo
{
std::vector<FileReadInfo> file_infos;
std::pair<uint64_t, uint64_t> commit_prefetch_index_range;
std::atomic<bool> cancel;
std::atomic<bool> done = false;
};
mutable ConcurrentBoundedQueue<std::shared_ptr<PrefetchInfo>> prefetch_queue;
mutable std::shared_ptr<PrefetchInfo> current_prefetch_info;
mutable std::mutex logs_location_mutex;
std::vector<IndexWithLogLocation> unapplied_indices_with_log_locations;
std::unordered_map<uint64_t, LogLocation> logs_location;
size_t max_index_with_location = 0;
size_t min_index_with_location = 0;
/// store indices of logs that contain config changes
std::unordered_set<uint64_t> logs_with_config_changes;
struct LogTermInfo
{
uint64_t term = 0;
uint64_t first_index = 0;
};
/// store first index of each term
/// so we don't have to fetch log to return that information
/// terms are monotonically increasing so first index is enough
std::deque<LogTermInfo> log_term_infos;
bool is_shutdown = false;
KeeperContextPtr keeper_context;
LoggerPtr log;
};
/// Simplest changelog with files rotation.
/// No compression, no metadata, just entries with headers one by one.
/// Able to read broken files/entries and discard them. Not thread safe.
@ -114,9 +322,9 @@ public:
/// Remove log files with to_log_index <= up_to_log_index.
void compact(uint64_t up_to_log_index);
uint64_t getNextEntryIndex() const { return max_log_id + 1; }
uint64_t getNextEntryIndex() const;
uint64_t getStartIndex() const { return min_log_id; }
uint64_t getStartIndex() const;
/// Last entry in log, or fake entry with term 0 if log is empty
LogEntryPtr getLastEntry() const;
@ -128,7 +336,7 @@ public:
LogEntriesPtr getLogEntriesBetween(uint64_t start_index, uint64_t end_index);
/// Return entry at position index
LogEntryPtr entryAt(uint64_t index);
LogEntryPtr entryAt(uint64_t index) const;
/// Serialize entries from index into buffer
BufferPtr serializeEntriesToBuffer(uint64_t index, int32_t count);
@ -136,6 +344,9 @@ public:
/// Apply entries from buffer overriding existing entries
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer);
bool isConfigLog(uint64_t index) const;
uint64_t termAt(uint64_t index) const;
/// Fsync latest log to disk and flush buffer
bool flush();
@ -143,7 +354,7 @@ public:
void shutdown();
uint64_t size() const { return logs.size(); }
uint64_t size() const;
uint64_t lastDurableIndex() const
{
@ -155,6 +366,8 @@ public:
bool isInitialized() const;
void getKeeperLogInfo(KeeperLogInfo & log_info) const;
/// Fsync log to disk
~Changelog();
@ -190,16 +403,14 @@ private:
std::mutex writer_mutex;
/// Current writer for changelog file
std::unique_ptr<ChangelogWriter> current_writer;
/// Mapping log_id -> log_entry
IndexToLogEntry logs;
/// Start log_id which exists in all "active" logs
/// min_log_id + 1 == max_log_id means empty log storage for NuRaft
uint64_t min_log_id = 0;
LogEntryStorage entry_storage;
uint64_t max_log_id = 0;
/// For compaction, queue of delete not used logs
/// 128 is enough, even if log is not removed, it's not a problem
ConcurrentBoundedQueue<std::pair<std::string, DiskPtr>> log_files_to_delete_queue{128};
ThreadFromGlobalPool clean_log_thread;
std::unique_ptr<ThreadFromGlobalPool> clean_log_thread;
struct AppendLog
{
@ -217,7 +428,7 @@ private:
void writeThread();
ThreadFromGlobalPool write_thread;
std::unique_ptr<ThreadFromGlobalPool> write_thread;
ConcurrentBoundedQueue<WriteOperation> write_operations;
/// Append log completion callback tries to acquire NuRaft's global lock
@ -226,7 +437,7 @@ private:
/// For those reasons we call the completion callback in a different thread
void appendCompletionThread();
ThreadFromGlobalPool append_completion_thread;
std::unique_ptr<ThreadFromGlobalPool> append_completion_thread;
ConcurrentBoundedQueue<bool> append_completion_queue;
// last_durable_index needs to be exposed through const getter so we make mutex mutable

View File

@ -34,6 +34,11 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
e.addMessage("in Coordination settings config");
throw;
}
/// for backwards compatibility we set max_requests_append_size to max_requests_batch_size
/// if max_requests_append_size was not changed
if (!max_requests_append_size.changed)
max_requests_append_size = max_requests_batch_size;
}
@ -41,7 +46,7 @@ const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD =
#if USE_JEMALLOC
"jmst,jmfp,jmep,jmdp,"
#endif
"conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl,ydld";
"conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl,ydld,pfev";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -41,6 +41,7 @@ struct Settings;
M(UInt64, max_request_queue_size, 100000, "Maximum number of request that can be in queue for processing", 0) \
M(UInt64, max_requests_batch_size, 100, "Max size of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_requests_batch_bytes_size, 100*1024, "Max size in bytes of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_requests_append_size, 100, "Max size of batch of requests that can be sent to replica in append request", 0) \
M(UInt64, max_flush_batch_size, 1000, "Max size of batch of requests that can be flushed together", 0) \
M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
@ -52,7 +53,11 @@ struct Settings;
M(UInt64, log_file_overallocate_size, 50 * 1024 * 1024, "If max_log_file_size is not set to 0, this value will be added to it for preallocating bytes on disk. If a log record is larger than this value, it could lead to uncaught out-of-space issues so a larger value is preferred", 0) \
M(UInt64, min_request_size_for_cache, 50 * 1024, "Minimal size of the request to cache the deserialization result. Caching can have negative effect on latency for smaller requests, set to 0 to disable", 0) \
M(UInt64, raft_limits_reconnect_limit, 50, "If connection to a peer is silent longer than this limit * (multiplied by heartbeat interval), we re-establish the connection.", 0) \
M(Bool, async_replication, false, "Enable async replication. All write and read guarantees are preserved while better performance is achieved. Settings is disabled by default to not break backwards compatibility.", 0)
M(Bool, async_replication, false, "Enable async replication. All write and read guarantees are preserved while better performance is achieved. Settings is disabled by default to not break backwards compatibility.", 0) \
M(UInt64, latest_logs_cache_size_threshold, 1 * 1024 * 1024 * 1024, "Maximum total size of in-memory cache of latest log entries.", 0) \
M(UInt64, commit_logs_cache_size_threshold, 500 * 1024 * 1024, "Maximum total size of in-memory cache of log entries needed next for commit.", 0) \
M(UInt64, disk_move_retries_wait_ms, 1000, "How long to wait between retries after a failure which happened while a file was being moved between disks.", 0) \
M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -9,6 +9,7 @@
#include <Common/getCurrentProcessFDCount.h>
#include <Common/getMaxFileDescriptorCount.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/config_version.h>
#include "Coordination/KeeperFeatureFlags.h"
#include <Coordination/Keeper4LWInfo.h>
#include <IO/WriteHelpers.h>
@ -37,6 +38,12 @@ String formatZxid(int64_t zxid)
}
#if USE_NURAFT
namespace ProfileEvents
{
extern const std::vector<Event> keeper_profile_events;
}
#endif
namespace DB
{
@ -193,6 +200,8 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr jemalloc_disable_profile = std::make_shared<JemallocDisableProfile>(keeper_dispatcher);
factory.registerCommand(jemalloc_disable_profile);
#endif
FourLetterCommandPtr profile_events_command = std::make_shared<ProfileEventsCommand>(keeper_dispatcher);
factory.registerCommand(profile_events_command);
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
@ -561,6 +570,12 @@ String LogInfoCommand::run()
append("leader_committed_log_idx", log_info.leader_committed_log_idx);
append("target_committed_log_idx", log_info.target_committed_log_idx);
append("last_snapshot_idx", log_info.last_snapshot_idx);
append("latest_logs_cache_entries", log_info.latest_logs_cache_entries);
append("latest_logs_cache_size", log_info.latest_logs_cache_size);
append("commit_logs_cache_entries", log_info.commit_logs_cache_entries);
append("commit_logs_cache_size", log_info.commit_logs_cache_size);
return ret.str();
}
@ -644,4 +659,31 @@ String JemallocDisableProfile::run()
}
#endif
String ProfileEventsCommand::run()
{
StringBuffer ret;
#if USE_NURAFT
auto append = [&ret] (const String & metric, uint64_t value, const String & docs) -> void
{
writeText(metric, ret);
writeText('\t', ret);
writeText(std::to_string(value), ret);
writeText('\t', ret);
writeText(docs, ret);
writeText('\n', ret);
};
for (auto i : ProfileEvents::keeper_profile_events)
{
const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
std::string metric_name{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
std::string metric_doc{ProfileEvents::getDocumentation(static_cast<ProfileEvents::Event>(i))};
append(metric_name, counter, metric_doc);
}
#endif
return ret.str();
}
}

View File

@ -1,18 +1,19 @@
#pragma once
#include <sstream>
#include <string>
#include "config.h"
#include <unordered_map>
#include <Coordination/KeeperDispatcher.h>
#include <IO/WriteBufferFromString.h>
#include <Common/config_version.h>
#include <string>
#include <boost/noncopyable.hpp>
namespace DB
{
class WriteBufferFromOwnString;
class KeeperDispatcher;
using String = std::string;
struct IFourLetterCommand;
using FourLetterCommandPtr = std::shared_ptr<DB::IFourLetterCommand>;
@ -479,4 +480,16 @@ struct JemallocDisableProfile : public IFourLetterCommand
};
#endif
struct ProfileEventsCommand : public IFourLetterCommand
{
explicit ProfileEventsCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "pfev"; }
String run() override;
~ProfileEventsCommand() override = default;
};
}

View File

@ -191,4 +191,10 @@ bool InMemoryLogStore::compact(uint64_t last_log_index)
return true;
}
bool InMemoryLogStore::is_conf(uint64_t index)
{
auto entry = entry_at(index);
return entry != nullptr && entry->get_val_type() == nuraft::conf;
}
}

View File

@ -39,6 +39,8 @@ public:
bool flush() override { return true; }
bool is_conf(uint64_t index) override;
private:
std::map<uint64_t, nuraft::ptr<nuraft::log_entry>> logs TSA_GUARDED_BY(logs_lock);
mutable std::mutex logs_lock;

View File

@ -52,16 +52,16 @@ struct Keeper4LWInfo
struct KeeperLogInfo
{
/// My first log index in log store.
uint64_t first_log_idx;
uint64_t first_log_idx{0};
/// My first log term.
uint64_t first_log_term;
uint64_t first_log_term{0};
/// My last log index in log store.
uint64_t last_log_idx;
uint64_t last_log_idx{0};
/// My last log term.
uint64_t last_log_term;
uint64_t last_log_term{0};
/// My last committed log index in state machine.
uint64_t last_committed_log_idx;
@ -74,6 +74,12 @@ struct KeeperLogInfo
/// The largest committed log index in last snapshot.
uint64_t last_snapshot_idx;
uint64_t latest_logs_cache_entries;
uint64_t latest_logs_cache_size;
uint64_t commit_logs_cache_entries;
uint64_t commit_logs_cache_size;
};
}

View File

@ -20,7 +20,6 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
size_t ephemerals_count = 0;
size_t approximate_data_size = 0;
size_t key_arena_size = 0;
size_t latest_snapshot_size = 0;
size_t open_file_descriptor_count = 0;
std::optional<size_t> max_file_descriptor_count = 0;
size_t followers = 0;
@ -46,11 +45,8 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
ephemerals_count = state_machine.getTotalEphemeralNodesCount();
approximate_data_size = state_machine.getApproximateDataSize();
key_arena_size = state_machine.getKeyArenaSize();
latest_snapshot_size = state_machine.getLatestSnapshotBufSize();
session_with_watches = state_machine.getSessionsWithWatchesCount();
paths_watched = state_machine.getWatchedPathsCount();
//snapshot_dir_size = keeper_dispatcher.getSnapDirSize();
//log_dir_size = keeper_dispatcher.getLogDirSize();
# if defined(__linux__) || defined(__APPLE__)
open_file_descriptor_count = getCurrentProcessFDCount();
@ -76,7 +72,9 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
new_values["KeeperApproximateDataSize"] = { approximate_data_size, "The approximate data size of ClickHouse Keeper, in bytes." };
new_values["KeeperKeyArenaSize"] = { key_arena_size, "The size in bytes of the memory arena for keys in ClickHouse Keeper." };
new_values["KeeperLatestSnapshotSize"] = { latest_snapshot_size, "The uncompressed size in bytes of the latest snapshot created by ClickHouse Keeper." };
/// TODO: value was incorrectly set to 0 previously for local snapshots
/// it needs to be fixed and it needs to be atomic to avoid deadlock
///new_values["KeeperLatestSnapshotSize"] = { latest_snapshot_size, "The uncompressed size in bytes of the latest snapshot created by ClickHouse Keeper." };
new_values["KeeperOpenFileDescriptorCount"] = { open_file_descriptor_count, "The number of open file descriptors in ClickHouse Keeper." };
if (max_file_descriptor_count.has_value())
@ -99,6 +97,12 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
new_values["KeeperTargetCommitLogIdx"] = { keeper_log_info.target_committed_log_idx, "Index until which logs can be committed in ClickHouse Keeper." };
new_values["KeeperLastSnapshotIdx"] = { keeper_log_info.last_snapshot_idx, "Index of the last log present in the last created snapshot." };
new_values["KeeperLatestLogsCacheEntries"] = {keeper_log_info.latest_logs_cache_entries, "Number of entries stored in the in-memory cache for latest logs"};
new_values["KeeperLatestLogsCacheSize"] = {keeper_log_info.latest_logs_cache_size, "Total size of in-memory cache for latest logs"};
new_values["KeeperCommitLogsCacheEntries"] = {keeper_log_info.commit_logs_cache_entries, "Number of entries stored in the in-memory cache for next logs to be committed"};
new_values["KeeperCommitLogsCacheSize"] = {keeper_log_info.commit_logs_cache_size, "Total size of in-memory cache for next logs to be committed"};
auto & keeper_connection_stats = keeper_dispatcher.getKeeperConnectionStats();
new_values["KeeperMinLatency"] = { keeper_connection_stats.getMinLatency(), "Minimal request latency of ClickHouse Keeper." };

View File

@ -0,0 +1,122 @@
#include <Coordination/KeeperCommon.h>
#include <string>
#include <filesystem>
#include <Common/logger_useful.h>
#include <Disks/IDisk.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/CoordinationSettings.h>
namespace DB
{
static size_t findLastSlash(StringRef path)
{
if (path.size == 0)
return std::string::npos;
for (size_t i = path.size - 1; i > 0; --i)
{
if (path.data[i] == '/')
return i;
}
if (path.data[0] == '/')
return 0;
return std::string::npos;
}
StringRef parentNodePath(StringRef path)
{
auto rslash_pos = findLastSlash(path);
if (rslash_pos > 0)
return StringRef{path.data, rslash_pos};
return "/";
}
StringRef getBaseNodeName(StringRef path)
{
size_t basename_start = findLastSlash(path);
return StringRef{path.data + basename_start + 1, path.size - basename_start - 1};
}
void moveFileBetweenDisks(
DiskPtr disk_from,
const std::string & path_from,
DiskPtr disk_to,
const std::string & path_to,
std::function<void()> before_file_remove_op,
LoggerPtr logger,
const KeeperContextPtr & keeper_context)
{
LOG_TRACE(logger, "Moving {} to {} from disk {} to disk {}", path_from, path_to, disk_from->getName(), disk_to->getName());
/// we use empty file with prefix tmp_ to detect incomplete copies
/// if a copy is complete we don't care from which disk we use the same file
/// so it's okay if a failure happens after removing of tmp file but before we remove
/// the file from the source disk
auto from_path = fs::path(path_from);
auto tmp_file_name = from_path.parent_path() / (std::string{tmp_keeper_file_prefix} + from_path.filename().string());
const auto & coordination_settings = keeper_context->getCoordinationSettings();
auto max_retries_on_init = coordination_settings->disk_move_retries_during_init.value;
auto retries_sleep = std::chrono::milliseconds(coordination_settings->disk_move_retries_wait_ms);
auto run_with_retries = [&](const auto & op, std::string_view operation_description)
{
size_t retry_num = 0;
do
{
try
{
op();
return true;
}
catch (...)
{
tryLogCurrentException(
logger,
fmt::format(
"While moving file {} to disk {} and running '{}'", path_from, disk_to->getName(), operation_description));
std::this_thread::sleep_for(retries_sleep);
}
++retry_num;
if (keeper_context->getServerState() == KeeperContext::Phase::INIT && retry_num == max_retries_on_init)
{
LOG_ERROR(logger, "Operation '{}' failed too many times", operation_description);
break;
}
} while (!keeper_context->isShutdownCalled());
LOG_ERROR(
logger,
"Failed to run '{}' while moving file {} to disk {}",
operation_description,
path_from,
disk_to->getName());
return false;
};
if (!run_with_retries(
[&]
{
auto buf = disk_to->writeFile(tmp_file_name);
buf->finalize();
},
"creating temporary file"))
return;
if (!run_with_retries([&] { disk_from->copyFile(from_path, *disk_to, path_to, {}); }, "copying file"))
return;
if (!run_with_retries([&] { disk_to->removeFileIfExists(tmp_file_name); }, "removing temporary file"))
return;
if (before_file_remove_op)
before_file_remove_op();
if (!run_with_retries([&] { disk_from->removeFileIfExists(path_from); }, "removing file from source disk"))
return;
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <base/StringRef.h>
#include "Common/Logger.h"
namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class KeeperContext;
using KeeperContextPtr = std::shared_ptr<KeeperContext>;
StringRef parentNodePath(StringRef path);
StringRef getBaseNodeName(StringRef path);
inline static constexpr std::string_view tmp_keeper_file_prefix = "tmp_";
void moveFileBetweenDisks(
DiskPtr disk_from,
const std::string & path_from,
DiskPtr disk_to,
const std::string & path_to,
std::function<void()> before_file_remove_op,
LoggerPtr logger,
const KeeperContextPtr & keeper_context);
}

View File

@ -284,7 +284,12 @@
M(InterfaceMySQLSendBytes) \
M(InterfaceMySQLReceiveBytes) \
M(InterfacePostgreSQLSendBytes) \
M(InterfacePostgreSQLReceiveBytes)
M(InterfacePostgreSQLReceiveBytes) \
\
M(KeeperLogsEntryReadFromLatestCache) \
M(KeeperLogsEntryReadFromCommitCache) \
M(KeeperLogsEntryReadFromFile) \
M(KeeperLogsPrefetchedEntries) \
namespace ProfileEvents
{

View File

@ -1,14 +1,16 @@
#include <Coordination/KeeperContext.h>
#include <Coordination/Defines.h>
#include <Disks/DiskLocal.h>
#include <Interpreters/Context.h>
#include <IO/S3/Credentials.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/KeeperConstants.h>
#include <Common/logger_useful.h>
#include <Server/CloudPlacementInfo.h>
#include <Coordination/KeeperFeatureFlags.h>
#include <Disks/DiskLocal.h>
#include <Disks/DiskSelector.h>
#include <IO/S3/Credentials.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/logger_useful.h>
#include <boost/algorithm/string.hpp>
namespace DB
@ -21,9 +23,10 @@ extern const int BAD_ARGUMENTS;
}
KeeperContext::KeeperContext(bool standalone_keeper_)
KeeperContext::KeeperContext(bool standalone_keeper_, CoordinationSettingsPtr coordination_settings_)
: disk_selector(std::make_shared<DiskSelector>())
, standalone_keeper(standalone_keeper_)
, coordination_settings(std::move(coordination_settings_))
{
/// enable by default some feature flags
feature_flags.enableFeatureFlag(KeeperFeatureFlag::FILTERED_LIST);
@ -402,4 +405,9 @@ void KeeperContext::waitLocalLogsPreprocessedOrShutdown()
local_logs_preprocessed_cv.wait(lock, [this]{ return shutdown_called || local_logs_preprocessed; });
}
const CoordinationSettingsPtr & KeeperContext::getCoordinationSettings() const
{
return coordination_settings;
}
}

View File

@ -1,8 +1,7 @@
#pragma once
#include <Coordination/KeeperFeatureFlags.h>
#include <Disks/DiskSelector.h>
#include <IO/WriteBufferFromString.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
@ -12,10 +11,19 @@ namespace DB
class KeeperDispatcher;
struct CoordinationSettings;
using CoordinationSettingsPtr = std::shared_ptr<CoordinationSettings>;
class DiskSelector;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class WriteBufferFromOwnString;
class KeeperContext
{
public:
explicit KeeperContext(bool standalone_keeper_);
KeeperContext(bool standalone_keeper_, CoordinationSettingsPtr coordination_settings_);
enum class Phase : uint8_t
{
@ -68,6 +76,24 @@ public:
void waitLocalLogsPreprocessedOrShutdown();
uint64_t lastCommittedIndex() const
{
return last_committed_log_idx.load(std::memory_order_relaxed);
}
void setLastCommitIndex(uint64_t commit_index)
{
last_committed_log_idx.store(commit_index, std::memory_order_relaxed);
last_committed_log_idx.notify_all();
}
void waitLastCommittedIndexUpdated(uint64_t current_last_committed_idx)
{
last_committed_log_idx.wait(current_last_committed_idx, std::memory_order_relaxed);
}
const CoordinationSettingsPtr & getCoordinationSettings() const;
private:
/// local disk defined using path or disk name
using Storage = std::variant<DiskPtr, std::string>;
@ -89,7 +115,7 @@ private:
std::atomic<bool> local_logs_preprocessed = false;
std::atomic<bool> shutdown_called = false;
Phase server_state{Phase::INIT};
std::atomic<Phase> server_state{Phase::INIT};
bool ignore_system_path_on_startup{false};
bool digest_enabled{true};
@ -113,6 +139,10 @@ private:
KeeperDispatcher * dispatcher{nullptr};
std::atomic<UInt64> memory_soft_limit = 0;
std::atomic<UInt64> last_committed_log_idx = 0;
CoordinationSettingsPtr coordination_settings;
};
using KeeperContextPtr = std::shared_ptr<KeeperContext>;

View File

@ -256,11 +256,11 @@ void KeeperDispatcher::requestThread()
if (shutdown_called)
return;
auto current_last_committed_idx = our_last_committed_log_idx.load(std::memory_order_relaxed);
auto current_last_committed_idx = keeper_context->lastCommittedIndex();
if (current_last_committed_idx >= log_idx)
break;
our_last_committed_log_idx.wait(current_last_committed_idx);
keeper_context->waitLastCommittedIndexUpdated(current_last_committed_idx);
}
}
}
@ -414,8 +414,8 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
{
LOG_DEBUG(log, "Initializing storage dispatcher");
keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
keeper_context = std::make_shared<KeeperContext>(standalone_keeper, configuration_and_settings->coordination_settings);
keeper_context->initialize(config, this);
@ -433,7 +433,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
snapshots_queue,
keeper_context,
snapshot_s3,
[this](uint64_t log_idx, const KeeperStorage::RequestForSession & request_for_session)
[this](uint64_t /*log_idx*/, const KeeperStorage::RequestForSession & request_for_session)
{
{
/// check if we have queue of read requests depending on this request to be committed
@ -457,9 +457,6 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
}
}
}
our_last_committed_log_idx.store(log_idx, std::memory_order_relaxed);
our_last_committed_log_idx.notify_all();
});
try
@ -504,8 +501,9 @@ void KeeperDispatcher::shutdown()
LOG_DEBUG(log, "Shutting down storage dispatcher");
our_last_committed_log_idx = std::numeric_limits<uint64_t>::max();
our_last_committed_log_idx.notify_all();
/// some threads can be waiting for certain commits, so we set value
/// of the last commit index to something that will always unblock
keeper_context->setLastCommitIndex(std::numeric_limits<uint64_t>::max());
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();

View File

@ -105,8 +105,6 @@ private:
public:
std::mutex read_request_queue_mutex;
std::atomic<uint64_t> our_last_committed_log_idx = 0;
/// queue of read requests that can be processed after a request with specific session ID and XID is committed
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, KeeperStorage::RequestsForSessions>> read_request_queue;

View File

@ -66,13 +66,16 @@ nuraft::ptr<nuraft::log_entry> KeeperLogStore::entry_at(uint64_t index)
return changelog.entryAt(index);
}
bool KeeperLogStore::is_conf(uint64_t index)
{
std::lock_guard lock(changelog_lock);
return changelog.isConfigLog(index);
}
uint64_t KeeperLogStore::term_at(uint64_t index)
{
std::lock_guard lock(changelog_lock);
auto entry = changelog.entryAt(index);
if (entry)
return entry->get_term();
return 0;
return changelog.termAt(index);
}
nuraft::ptr<nuraft::buffer> KeeperLogStore::pack(uint64_t index, int32_t cnt)
@ -145,4 +148,10 @@ void KeeperLogStore::setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft
return changelog.setRaftServer(raft_server);
}
void KeeperLogStore::getKeeperLogInfo(KeeperLogInfo & log_info) const
{
std::lock_guard lock(changelog_lock);
changelog.getKeeperLogInfo(log_info);
}
}

View File

@ -1,10 +1,10 @@
#pragma once
#include <libnuraft/log_store.hxx>
#include <map>
#include <mutex>
#include <Core/Types.h>
#include <Coordination/Changelog.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/Keeper4LWInfo.h>
#include <base/defines.h>
namespace DB
@ -38,6 +38,8 @@ public:
/// Return entry at index
nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;
bool is_conf(uint64_t index) override;
/// Term if the index
uint64_t term_at(uint64_t index) override;
@ -72,6 +74,8 @@ public:
void setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server);
void getKeeperLogInfo(KeeperLogInfo & log_info) const;
private:
mutable std::mutex changelog_lock;
LoggerPtr log;

View File

@ -6,6 +6,7 @@
#include <chrono>
#include <mutex>
#include <string>
#include <Coordination/KeeperLogStore.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
@ -119,22 +120,20 @@ KeeperServer::KeeperServer(
KeeperSnapshotManagerS3 & snapshot_manager_s3,
KeeperStateMachine::CommitCallback commit_callback)
: server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings)
, log(getLogger("KeeperServer"))
, is_recovering(config.getBool("keeper_server.force_recovery", false))
, keeper_context{std::move(keeper_context_)}
, create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true))
, enable_reconfiguration(config.getBool("keeper_server.enable_reconfiguration", false))
{
if (coordination_settings->quorum_reads)
if (keeper_context->getCoordinationSettings()->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
state_machine = nuraft::cs_new<KeeperStateMachine>(
responses_queue_,
snapshots_queue_,
coordination_settings,
keeper_context,
config.getBool("keeper_server.upload_snapshot_on_exit", true) ? &snapshot_manager_s3 : nullptr,
config.getBool("keeper_server.upload_snapshot_on_exit", false) ? &snapshot_manager_s3 : nullptr,
commit_callback,
checkAndGetSuperdigest(configuration_and_settings_->super_digest));
@ -143,7 +142,6 @@ KeeperServer::KeeperServer(
"keeper_server",
"state",
config,
coordination_settings,
keeper_context);
}
@ -226,7 +224,7 @@ void KeeperServer::loadLatestConfig()
{
auto latest_snapshot_config = state_machine->getClusterConfig();
auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();
auto async_replication = coordination_settings->async_replication;
auto async_replication = keeper_context->getCoordinationSettings()->async_replication;
if (latest_snapshot_config && latest_log_store_config)
{
@ -293,6 +291,8 @@ void KeeperServer::forceRecovery()
void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
const auto & coordination_settings = keeper_context->getCoordinationSettings();
nuraft::raft_params params;
params.parallel_log_appending_ = true;
params.heart_beat_interval_
@ -332,7 +332,7 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
params.auto_forwarding_req_timeout_
= getValueOrMaxInt32AndLogWarning(coordination_settings->operation_timeout_ms.totalMilliseconds() * 2, "operation_timeout_ms", log);
params.max_append_size_
= getValueOrMaxInt32AndLogWarning(coordination_settings->max_requests_batch_size, "max_requests_batch_size", log);
= getValueOrMaxInt32AndLogWarning(coordination_settings->max_requests_append_size, "max_requests_append_size", log);
params.return_method_ = nuraft::raft_params::async_handler;
@ -427,6 +427,10 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
{
state_machine->init();
keeper_context->setLastCommitIndex(state_machine->last_commit_index());
const auto & coordination_settings = keeper_context->getCoordinationSettings();
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
auto log_store = state_manager->load_log_store();
@ -446,7 +450,7 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
void KeeperServer::shutdownRaftServer()
{
size_t timeout = coordination_settings->shutdown_timeout.totalSeconds();
size_t timeout = keeper_context->getCoordinationSettings()->shutdown_timeout.totalSeconds();
if (!raft_instance)
{
@ -870,7 +874,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
/// Node first became leader, and after that some other node became leader.
/// BecameFresh for this node will not be called because it was already fresh
/// when it was leader.
if (leader_index < our_index + coordination_settings->fresh_log_gap)
if (leader_index < our_index + keeper_context->getCoordinationSettings()->fresh_log_gap)
set_initialized();
}
return nuraft::cb_func::ReturnCode::Ok;
@ -905,7 +909,7 @@ void KeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
int64_t timeout = keeper_context->getCoordinationSettings()->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
LOG_WARNING(log, "Failed to wait for RAFT initialization in {}ms, will continue in background", timeout);
}
@ -977,6 +981,7 @@ KeeperServer::ConfigUpdateState KeeperServer::applyConfigUpdate(
ClusterUpdateActions KeeperServer::getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
const auto & coordination_settings = keeper_context->getCoordinationSettings();
auto diff = state_manager->getRaftConfigurationDiff(config, coordination_settings);
if (!diff.empty())
@ -1004,6 +1009,7 @@ void KeeperServer::applyConfigUpdateWithReconfigDisabled(const ClusterUpdateActi
std::this_thread::sleep_for(sleep_time * (i + 1));
};
const auto & coordination_settings = keeper_context->getCoordinationSettings();
if (const auto * add = std::get_if<AddRaftServer>(&action))
{
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
@ -1059,6 +1065,7 @@ bool KeeperServer::waitForConfigUpdateWithReconfigDisabled(const ClusterUpdateAc
auto became_leader = [&] { LOG_INFO(log, "Became leader, aborting"); return false; };
auto backoff = [&](size_t i) { std::this_thread::sleep_for(sleep_time * (i + 1)); };
const auto & coordination_settings = keeper_context->getCoordinationSettings();
if (const auto* add = std::get_if<AddRaftServer>(&action))
{
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
@ -1125,14 +1132,12 @@ KeeperLogInfo KeeperServer::getKeeperLogInfo()
auto log_store = state_manager->load_log_store();
if (log_store)
{
log_info.first_log_idx = log_store->start_index();
log_info.first_log_term = log_store->term_at(log_info.first_log_idx);
const auto & keeper_log_storage = static_cast<const KeeperLogStore &>(*log_store);
keeper_log_storage.getKeeperLogInfo(log_info);
}
if (raft_instance)
{
log_info.last_log_idx = raft_instance->get_last_log_idx();
log_info.last_log_term = raft_instance->get_last_log_term();
log_info.last_committed_log_idx = raft_instance->get_committed_log_idx();
log_info.leader_committed_log_idx = raft_instance->get_leader_committed_log_idx();
log_info.target_committed_log_idx = raft_instance->get_target_committed_log_idx();

View File

@ -22,8 +22,6 @@ class KeeperServer
private:
const int server_id;
CoordinationSettingsPtr coordination_settings;
nuraft::ptr<KeeperStateMachine> state_machine;
nuraft::ptr<KeeperStateManager> state_manager;

View File

@ -3,6 +3,7 @@
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/CoordinationSettings.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -13,7 +14,7 @@
#include <memory>
#include <Common/logger_useful.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperCommon.h>
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Core/Field.h>
@ -32,23 +33,21 @@ namespace ErrorCodes
namespace
{
constexpr std::string_view tmp_prefix = "tmp_";
void moveFileBetweenDisks(DiskPtr disk_from, const std::string & path_from, DiskPtr disk_to, const std::string & path_to)
void moveSnapshotBetweenDisks(
DiskPtr disk_from,
const std::string & path_from,
DiskPtr disk_to,
const std::string & path_to,
const KeeperContextPtr & keeper_context)
{
/// we use empty file with prefix tmp_ to detect incomplete copies
/// if a copy is complete we don't care from which disk we use the same file
/// so it's okay if a failure happens after removing of tmp file but before we remove
/// the snapshot from the source disk
auto from_path = fs::path(path_from);
auto tmp_snapshot_name = from_path.parent_path() / (std::string{tmp_prefix} + from_path.filename().string());
{
auto buf = disk_to->writeFile(tmp_snapshot_name);
buf->finalize();
}
disk_from->copyFile(from_path, *disk_to, path_to, {});
disk_to->removeFile(tmp_snapshot_name);
disk_from->removeFile(path_from);
moveFileBetweenDisks(
std::move(disk_from),
path_from,
std::move(disk_to),
path_to,
/*before_file_remove_op=*/{},
getLogger("KeeperSnapshotManager"),
keeper_context);
}
uint64_t getSnapshotPathUpToLogIdx(const String & snapshot_path)
@ -582,9 +581,9 @@ KeeperSnapshotManager::KeeperSnapshotManager(
std::vector<std::string> snapshot_files;
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
if (it->name().starts_with(tmp_prefix))
if (it->name().starts_with(tmp_keeper_file_prefix))
{
incomplete_files.emplace(it->name().substr(tmp_prefix.size()), it->path());
incomplete_files.emplace(it->name().substr(tmp_keeper_file_prefix.size()), it->path());
continue;
}
@ -603,7 +602,7 @@ KeeperSnapshotManager::KeeperSnapshotManager(
if (!inserted)
LOG_WARNING(
getLogger("KeeperSnapshotManager"),
log,
"Found another snapshots with last log idx {}, will use snapshot from disk {}",
snapshot_up_to,
disk->getName());
@ -612,6 +611,9 @@ KeeperSnapshotManager::KeeperSnapshotManager(
for (const auto & [name, path] : incomplete_files)
disk->removeFile(path);
if (snapshot_files.empty())
LOG_TRACE(log, "No snapshots were found on {}", disk->getName());
read_disks.insert(disk);
};
@ -774,7 +776,7 @@ void KeeperSnapshotManager::moveSnapshotsIfNeeded()
{
if (file_info.disk != latest_snapshot_disk)
{
moveFileBetweenDisks(file_info.disk, file_info.path, latest_snapshot_disk, file_info.path);
moveSnapshotBetweenDisks(file_info.disk, file_info.path, latest_snapshot_disk, file_info.path, keeper_context);
file_info.disk = latest_snapshot_disk;
}
}
@ -782,7 +784,7 @@ void KeeperSnapshotManager::moveSnapshotsIfNeeded()
{
if (file_info.disk != disk)
{
moveFileBetweenDisks(file_info.disk, file_info.path, disk, file_info.path);
moveSnapshotBetweenDisks(file_info.disk, file_info.path, disk, file_info.path, keeper_context);
file_info.disk = disk;
}
}

View File

@ -11,6 +11,7 @@
#include <base/errnoToString.h>
#include <base/move_extend.h>
#include <sys/mman.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
@ -42,23 +43,20 @@ namespace ErrorCodes
KeeperStateMachine::KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
CommitCallback commit_callback_,
const std::string & superdigest_)
: commit_callback(commit_callback_)
, coordination_settings(coordination_settings_)
, snapshot_manager(
coordination_settings->snapshots_to_keep,
keeper_context_->getCoordinationSettings()->snapshots_to_keep,
keeper_context_,
coordination_settings->compress_snapshots_with_zstd_format,
keeper_context_->getCoordinationSettings()->compress_snapshots_with_zstd_format,
superdigest_,
coordination_settings->dead_session_check_period_ms.totalMilliseconds())
keeper_context_->getCoordinationSettings()->dead_session_check_period_ms.totalMilliseconds())
, responses_queue(responses_queue_)
, snapshots_queue(snapshots_queue_)
, min_request_size_to_cache(coordination_settings_->min_request_size_for_cache)
, last_committed_idx(0)
, min_request_size_to_cache(keeper_context_->getCoordinationSettings()->min_request_size_for_cache)
, log(getLogger("KeeperStateMachine"))
, superdigest(superdigest_)
, keeper_context(keeper_context_)
@ -100,7 +98,7 @@ void KeeperStateMachine::init()
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
keeper_context->setLastCommitIndex(latest_snapshot_meta->get_last_log_idx());
loaded = true;
break;
}
@ -115,6 +113,7 @@ void KeeperStateMachine::init()
}
}
auto last_committed_idx = keeper_context->lastCommittedIndex();
if (has_snapshots)
{
if (loaded)
@ -129,7 +128,7 @@ void KeeperStateMachine::init()
if (!storage)
storage = std::make_unique<KeeperStorage>(
coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest, keeper_context);
keeper_context->getCoordinationSettings()->dead_session_check_period_ms.totalMilliseconds(), superdigest, keeper_context);
}
namespace
@ -139,16 +138,18 @@ void assertDigest(
const KeeperStorage::Digest & expected,
const KeeperStorage::Digest & actual,
const Coordination::ZooKeeperRequest & request,
uint64_t log_idx,
bool committing)
{
if (!KeeperStorage::checkDigest(expected, actual))
{
LOG_FATAL(
getLogger("KeeperStateMachine"),
"Digest for nodes is not matching after {} request of type '{}'.\nExpected digest - {}, actual digest - {} (digest "
"{}). Keeper will terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
"Digest for nodes is not matching after {} request of type '{}' at log index {}.\nExpected digest - {}, actual digest - {} "
"(digest {}). Keeper will terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
committing ? "committing" : "preprocessing",
request.getOpNum(),
log_idx,
expected.value,
actual.value,
expected.version,
@ -296,12 +297,12 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, "Failed to preprocess stored log, aborting to avoid inconsistent state");
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to preprocess stored log at index {}, aborting to avoid inconsistent state", request_for_session.log_idx));
std::abort();
}
if (keeper_context->digestEnabled() && request_for_session.digest)
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false);
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, request_for_session.log_idx, false);
return true;
}
@ -408,6 +409,8 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
}
};
try
{
const auto op_num = request_for_session->request->getOpNum();
if (op_num == Coordination::OpNum::SessionID)
{
@ -442,14 +445,21 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
try_push(response_for_session);
if (keeper_context->digestEnabled() && request_for_session->digest)
assertDigest(*request_for_session->digest, storage->getNodesDigest(true), *request_for_session->request, true);
assertDigest(*request_for_session->digest, storage->getNodesDigest(true), *request_for_session->request, request_for_session->log_idx, true);
}
ProfileEvents::increment(ProfileEvents::KeeperCommits);
last_committed_idx = log_idx;
keeper_context->setLastCommitIndex(log_idx);
if (commit_callback)
commit_callback(log_idx, *request_for_session);
}
catch (...)
{
tryLogCurrentException(log, fmt::format("Failed to commit stored log at index {}", log_idx));
throw;
}
return nullptr;
}
@ -496,7 +506,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
}
ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys);
last_committed_idx = s.get_last_log_idx();
keeper_context->setLastCommitIndex(s.get_last_log_idx());
return true;
}
@ -506,7 +516,7 @@ void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nuraf
std::lock_guard lock(cluster_config_lock);
auto tmp = new_conf->serialize();
cluster_config = ClusterConfig::deserialize(*tmp);
last_committed_idx = log_idx;
keeper_context->setLastCommitIndex(log_idx);
}
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)

View File

@ -25,7 +25,6 @@ public:
KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
CommitCallback commit_callback_ = {},
@ -70,7 +69,7 @@ public:
const KeeperStorage::RequestForSession & request_for_session,
bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS;
uint64_t last_commit_index() override { return last_committed_idx; }
uint64_t last_commit_index() override { return keeper_context->lastCommittedIndex(); }
/// Apply preliminarily saved (save_logical_snp_obj) snapshot to our state.
bool apply_snapshot(nuraft::snapshot & s) override;
@ -139,8 +138,6 @@ private:
SnapshotFileInfo latest_snapshot_info;
nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;
CoordinationSettingsPtr coordination_settings;
/// Main state machine logic
KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock);
@ -170,9 +167,6 @@ private:
/// can be processed only in 1 thread at any point
std::mutex request_cache_mutex;
/// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx;
LoggerPtr log;
/// Cluster config for our quorum.

View File

@ -241,23 +241,25 @@ KeeperStateManager::KeeperStateManager(
const std::string & config_prefix_,
const std::string & server_state_file_name_,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
KeeperContextPtr keeper_context_)
: my_server_id(my_server_id_)
, secure(config.getBool(config_prefix_ + ".raft_configuration.secure", false))
, config_prefix(config_prefix_)
, configuration_wrapper(parseServersConfiguration(config, false, coordination_settings->async_replication))
, configuration_wrapper(parseServersConfiguration(config, false, keeper_context_->getCoordinationSettings()->async_replication))
, log_store(nuraft::cs_new<KeeperLogStore>(
LogFileSettings
{
.force_sync = coordination_settings->force_sync,
.compress_logs = coordination_settings->compress_logs,
.rotate_interval = coordination_settings->rotate_log_storage_interval,
.max_size = coordination_settings->max_log_file_size,
.overallocate_size = coordination_settings->log_file_overallocate_size},
.force_sync = keeper_context_->getCoordinationSettings()->force_sync,
.compress_logs = keeper_context_->getCoordinationSettings()->compress_logs,
.rotate_interval = keeper_context_->getCoordinationSettings()->rotate_log_storage_interval,
.max_size = keeper_context_->getCoordinationSettings()->max_log_file_size,
.overallocate_size = keeper_context_->getCoordinationSettings()->log_file_overallocate_size,
.latest_logs_cache_size_threshold = keeper_context_->getCoordinationSettings()->latest_logs_cache_size_threshold,
.commit_logs_cache_size_threshold = keeper_context_->getCoordinationSettings()->commit_logs_cache_size_threshold
},
FlushSettings
{
.max_flush_batch_size = coordination_settings->max_flush_batch_size,
.max_flush_batch_size = keeper_context_->getCoordinationSettings()->max_flush_batch_size,
},
keeper_context_))
, server_state_file_name(server_state_file_name_)

View File

@ -23,7 +23,6 @@ public:
const std::string & config_prefix_,
const std::string & server_state_file_name_,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
KeeperContextPtr keeper_context_);
/// Constructor for tests

View File

@ -18,7 +18,7 @@
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/ProfileEvents.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperCommon.h>
#include <Coordination/KeeperConstants.h>
#include <Coordination/KeeperReconfiguration.h>
#include <Coordination/KeeperStorage.h>
@ -26,7 +26,6 @@
#include <functional>
#include <base/defines.h>
#include <filesystem>
namespace ProfileEvents
{
@ -1583,7 +1582,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
{
auto path_prefix = request.path;
if (path_prefix.empty())
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Path cannot be empty");
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: path cannot be empty");
const auto & children = node_it->value.getChildren();
response.names.reserve(children.size());

View File

@ -8,7 +8,7 @@
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromFile.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperCommon.h>
namespace DB

View File

@ -1,37 +0,0 @@
#include <Coordination/pathUtils.h>
namespace DB
{
static size_t findLastSlash(StringRef path)
{
if (path.size == 0)
return std::string::npos;
for (size_t i = path.size - 1; i > 0; --i)
{
if (path.data[i] == '/')
return i;
}
if (path.data[0] == '/')
return 0;
return std::string::npos;
}
StringRef parentNodePath(StringRef path)
{
auto rslash_pos = findLastSlash(path);
if (rslash_pos > 0)
return StringRef{path.data, rslash_pos};
return "/";
}
StringRef getBaseNodeName(StringRef path)
{
size_t basename_start = findLastSlash(path);
return StringRef{path.data + basename_start + 1, path.size - basename_start - 1};
}
}

View File

@ -1,13 +0,0 @@
#pragma once
#include <string>
#include <base/StringRef.h>
namespace DB
{
StringRef parentNodePath(StringRef path);
StringRef getBaseNodeName(StringRef path);
}

View File

@ -1,8 +1,6 @@
#include <chrono>
#include <gtest/gtest.h>
#include "Common/ZooKeeper/IKeeper.h"
#include "Core/Defines.h"
#include "config.h"
#if USE_NURAFT
@ -22,7 +20,7 @@
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperCommon.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <libnuraft/nuraft.hxx>
@ -65,7 +63,7 @@ struct CompressionParam
class CoordinationTest : public ::testing::TestWithParam<CompressionParam>
{
protected:
DB::KeeperContextPtr keeper_context = std::make_shared<DB::KeeperContext>(true);
DB::KeeperContextPtr keeper_context = std::make_shared<DB::KeeperContext>(true, std::make_shared<DB::CoordinationSettings>());
LoggerPtr log{getLogger("CoordinationTest")};
void SetUp() override
@ -558,6 +556,7 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
EXPECT_EQ(changelog.size(), 3);
keeper_context->setLastCommitIndex(2);
changelog.compact(2);
EXPECT_EQ(changelog.size(), 1);
@ -582,6 +581,7 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
keeper_context->setLastCommitIndex(6);
changelog.compact(6);
std::this_thread::sleep_for(std::chrono::microseconds(1000));
@ -1758,22 +1758,30 @@ getLogEntryFromZKRequest(size_t term, int64_t session_id, int64_t zxid, const Co
}
void testLogAndStateMachine(
Coordination::CoordinationSettingsPtr settings,
DB::CoordinationSettingsPtr settings,
uint64_t total_logs,
bool enable_compression,
Coordination::KeeperContextPtr keeper_context)
bool enable_compression)
{
using namespace Coordination;
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots"));
ChangelogDirTest logs("./logs");
keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs"));
auto get_keeper_context = [&]
{
auto local_keeper_context = std::make_shared<DB::KeeperContext>(true, settings);
local_keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots"));
local_keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs"));
return local_keeper_context;
};
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr);
auto keeper_context = get_keeper_context();
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
state_machine->init();
DB::KeeperLogStore changelog(
DB::LogFileSettings{
@ -1811,12 +1819,14 @@ void testLogAndStateMachine(
snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
}
if (snapshot_created && changelog.size() > settings->reserved_log_items)
changelog.compact(i - settings->reserved_log_items);
}
SnapshotsQueue snapshots_queue1{1};
auto restore_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue1, settings, keeper_context, nullptr);
keeper_context = get_keeper_context();
auto restore_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue1, keeper_context, nullptr);
restore_machine->init();
EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance);
@ -1863,63 +1873,64 @@ TEST_P(CoordinationTest, TestStateMachineAndLogStore)
settings->snapshot_distance = 10;
settings->reserved_log_items = 10;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 37, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 37, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 10;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 11, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 11, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 10;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 40, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 40, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 20;
settings->rotate_log_storage_interval = 30;
testLogAndStateMachine(settings, 40, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 40, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 0;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 40, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 40, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 1;
settings->reserved_log_items = 1;
settings->rotate_log_storage_interval = 32;
testLogAndStateMachine(settings, 32, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 32, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 7;
settings->rotate_log_storage_interval = 1;
testLogAndStateMachine(settings, 33, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 33, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 37;
settings->reserved_log_items = 1000;
settings->rotate_log_storage_interval = 5000;
testLogAndStateMachine(settings, 33, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 33, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 37;
settings->reserved_log_items = 1000;
settings->rotate_log_storage_interval = 5000;
testLogAndStateMachine(settings, 45, params.enable_compression, keeper_context);
testLogAndStateMachine(settings, 45, params.enable_compression);
}
}
@ -1931,11 +1942,10 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
ChangelogDirTest snapshots("./snapshots");
setSnapshotDirectory("./snapshots");
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
state_machine->init();
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
@ -1965,11 +1975,10 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte
ChangelogDirTest snapshots("./snapshots");
setSnapshotDirectory("./snapshots");
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
state_machine->init();
String user_auth_data = "test_user:test_password";
@ -2017,11 +2026,10 @@ TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
ChangelogDirTest snapshots("./snapshots");
setSnapshotDirectory("./snapshots");
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
state_machine->init();
String user_auth_data = "test_user:test_password";
@ -2132,6 +2140,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
waitDurableLogs(changelog_2);
keeper_context->setLastCommitIndex(105);
changelog_2.compact(105);
std::this_thread::sleep_for(std::chrono::microseconds(1000));
@ -2157,6 +2166,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
waitDurableLogs(changelog_3);
keeper_context->setLastCommitIndex(125);
changelog_3.compact(125);
std::this_thread::sleep_for(std::chrono::microseconds(1000));
assertFileDeleted("./logs/changelog_101_110.bin" + params.extension);

View File

@ -40,7 +40,7 @@ bool PacketEndpoint::tryReceivePacket(IMySQLReadPacket & packet, UInt64 millisec
ReadBufferFromPocoSocket * socket_in = typeid_cast<ReadBufferFromPocoSocket *>(in);
if (!socket_in)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to pull the duration in a non socket stream");
throw Exception(ErrorCodes::LOGICAL_ERROR, "LOGICAL ERROR: Attempt to pull the duration in a non socket stream");
if (!socket_in->poll(millisecond * 1000))
return false;

Some files were not shown because too many files have changed in this diff Show More