Merge remote-tracking branch 'origin/master' into parallel-replicas-in-subquery

Igor Nikonov 2023-12-22 17:54:45 +00:00
commit d3a0869119
96 changed files with 1256 additions and 552 deletions

View File

@ -157,7 +157,8 @@ jobs:
##################################### BUILD REPORTER #######################################
############################################################################################
BuilderReport:
if: ${{ !failure() && !cancelled() }}
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderDebAarch64
@ -177,7 +178,8 @@ jobs:
run_command: |
python3 build_report_check.py "$CHECK_NAME"
BuilderSpecialReport:
if: ${{ !failure() && !cancelled() }}
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderBinDarwin

View File

@ -262,6 +262,8 @@ jobs:
##################################### BUILD REPORTER #######################################
############################################################################################
BuilderReport:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderBinRelease
@ -272,7 +274,6 @@ jobs:
- BuilderDebRelease
- BuilderDebTsan
- BuilderDebUBsan
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse build check
@ -285,7 +286,8 @@ jobs:
run_command: |
python3 build_report_check.py "$CHECK_NAME"
BuilderSpecialReport:
if: ${{ !failure() && !cancelled() }}
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderBinAarch64

View File

@ -291,6 +291,8 @@ jobs:
##################################### BUILD REPORTER #######################################
############################################################################################
BuilderReport:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderBinRelease
@ -301,7 +303,6 @@ jobs:
- BuilderDebRelease
- BuilderDebTsan
- BuilderDebUBsan
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse build check
@ -314,7 +315,8 @@ jobs:
run_command: |
python3 build_report_check.py "$CHECK_NAME"
BuilderSpecialReport:
if: ${{ !failure() && !cancelled() }}
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderBinAarch64

View File

@ -172,6 +172,8 @@ jobs:
##################################### BUILD REPORTER #######################################
############################################################################################
BuilderReport:
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderDebRelease
@ -181,7 +183,6 @@ jobs:
- BuilderDebUBsan
- BuilderDebMsan
- BuilderDebDebug
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse build check
@ -194,7 +195,8 @@ jobs:
run_command: |
python3 build_report_check.py "$CHECK_NAME"
BuilderSpecialReport:
if: ${{ !failure() && !cancelled() }}
# run report check for failed builds to indicate the CI error
if: ${{ !cancelled() }}
needs:
- RunConfig
- BuilderDebRelease

View File

@ -76,6 +76,8 @@ jobs:
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/build_check.py" "$BUILD_NAME"
- name: Post
# there can still be a build report to upload for a failed build job
if: always()
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --post --job-name '${{inputs.build_name}}'
- name: Mark as done

View File

@ -70,6 +70,15 @@ public:
int queryConvert(const unsigned char * bytes, int length) const;
int sequenceLength(const unsigned char * bytes, int length) const;
protected:
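/// Map code points above the Unicode range (> U+10FFFF) to -1 so callers treat them as invalid.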
static int safeToInt(Poco::UInt32 value)
{
if (value <= 0x10FFFF)
return static_cast<int>(value);
else
return -1;
}
private:
bool _flipBytes;
static const char * _names[];

View File

@ -30,22 +30,22 @@ const char* UTF32Encoding::_names[] =
const TextEncoding::CharacterMap UTF32Encoding::_charMap =
{
/* 00 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 10 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 20 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 30 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 40 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 50 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 60 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 70 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 80 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 90 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* a0 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* b0 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* c0 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* d0 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* e0 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* f0 */ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
/* 00 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 10 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 20 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 30 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 40 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 50 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 60 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 70 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 80 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* 90 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* a0 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* b0 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* c0 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* d0 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* e0 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
/* f0 */ -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4,
};
@ -118,7 +118,7 @@ const TextEncoding::CharacterMap& UTF32Encoding::characterMap() const
int UTF32Encoding::convert(const unsigned char* bytes) const
{
UInt32 uc;
unsigned char* p = (unsigned char*) &uc;
unsigned char* p = reinterpret_cast<unsigned char*>(&uc);
*p++ = *bytes++;
*p++ = *bytes++;
*p++ = *bytes++;
@ -129,7 +129,7 @@ int UTF32Encoding::convert(const unsigned char* bytes) const
ByteOrder::flipBytes(uc);
}
return uc;
return safeToInt(uc);
}
@ -138,7 +138,7 @@ int UTF32Encoding::convert(int ch, unsigned char* bytes, int length) const
if (bytes && length >= 4)
{
UInt32 ch1 = _flipBytes ? ByteOrder::flipBytes((UInt32) ch) : (UInt32) ch;
unsigned char* p = (unsigned char*) &ch1;
unsigned char* p = reinterpret_cast<unsigned char*>(&ch1);
*bytes++ = *p++;
*bytes++ = *p++;
*bytes++ = *p++;
@ -155,14 +155,14 @@ int UTF32Encoding::queryConvert(const unsigned char* bytes, int length) const
if (length >= 4)
{
UInt32 uc;
unsigned char* p = (unsigned char*) &uc;
unsigned char* p = reinterpret_cast<unsigned char*>(&uc);
*p++ = *bytes++;
*p++ = *bytes++;
*p++ = *bytes++;
*p++ = *bytes++;
if (_flipBytes)
ByteOrder::flipBytes(uc);
return uc;
ret = safeToInt(uc);
}
return ret;

contrib/azure vendored

@ -1 +1 @@
Subproject commit a852d81f92f153e109de165ee08546741e3f2a68
Subproject commit 060c54dfb0abe869c065143303a9d3e9c54c29e3

View File

@ -8,31 +8,21 @@ endif()
set(AZURE_DIR "${ClickHouse_SOURCE_DIR}/contrib/azure")
set(AZURE_SDK_LIBRARY_DIR "${AZURE_DIR}/sdk")
file(GLOB AZURE_SDK_CORE_SRC
file(GLOB AZURE_SDK_SRC
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/cryptography/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/curl/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/io/*.cpp"
)
file(GLOB AZURE_SDK_IDENTITY_SRC
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/tracing/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/identity/azure-identity/src/*.cpp"
)
file(GLOB AZURE_SDK_STORAGE_COMMON_SRC
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/src/private/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-common/src/*.cpp"
)
file(GLOB AZURE_SDK_STORAGE_BLOBS_SRC
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/src/*.cpp"
)
file(GLOB AZURE_SDK_UNIFIED_SRC
${AZURE_SDK_CORE_SRC}
${AZURE_SDK_IDENTITY_SRC}
${AZURE_SDK_STORAGE_COMMON_SRC}
${AZURE_SDK_STORAGE_BLOBS_SRC}
${AZURE_SDK_SRC}
)
set(AZURE_SDK_INCLUDES

View File

@ -34,7 +34,7 @@ services:
# Empty container to run proxy resolver.
resolver:
image: clickhouse/python-bottle
image: clickhouse/python-bottle:${DOCKER_PYTHON_BOTTLE_TAG:-latest}
expose:
- "8080"
tty: true

View File

@ -11,7 +11,7 @@ Inserts data into a table.
**Syntax**
``` sql
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] [SETTINGS ...] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
You can specify a list of columns to insert using the `(c1, c2, c3)` syntax. You can also use an expression with a column [matcher](../../sql-reference/statements/select/index.md#asterisk) such as `*` and/or [modifiers](../../sql-reference/statements/select/index.md#select-modifiers) such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
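For illustration, a minimal sketch of the new `SETTINGS` placement (the table `t` and the use of the `async_insert` setting here are assumptions, not part of this change):
``` sql
INSERT INTO t (c1, c2) SETTINGS async_insert = 1 VALUES (1, 'a'), (2, 'b')
```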
@ -126,7 +126,7 @@ To insert a default value instead of `NULL` into a column with not nullable data
**Syntax**
``` sql
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] [SETTINGS ...] [FORMAT format_name]
```
Use the syntax above to insert data from a file, or files, stored on the **client** side. `file_name` and `type` are string literals. Input file [format](../../interfaces/formats.md) must be set in the `FORMAT` clause.
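A hedged example of the clause order shown above (file name, compression type, and setting are placeholders chosen for illustration):
``` sql
INSERT INTO t (c1, c2) FROM INFILE 'data.csv.gz' COMPRESSION 'gzip' SETTINGS input_format_csv_skip_first_lines = 1 FORMAT CSV
```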

View File

@ -43,6 +43,7 @@ Additional join types available in ClickHouse:
- `LEFT ANTI JOIN` and `RIGHT ANTI JOIN`, a blacklist on “join keys”, without producing a cartesian product.
- `LEFT ANY JOIN`, `RIGHT ANY JOIN` and `INNER ANY JOIN`, partially (for opposite side of `LEFT` and `RIGHT`) or completely (for `INNER` and `FULL`) disables the cartesian product for standard `JOIN` types.
- `ASOF JOIN` and `LEFT ASOF JOIN`, joining sequences with a non-exact match. `ASOF JOIN` usage is described below.
- `PASTE JOIN`, which performs a horizontal concatenation of two tables.
:::note
When [join_algorithm](../../../operations/settings/settings.md#join_algorithm) is set to `partial_merge`, `RIGHT JOIN` and `FULL JOIN` are supported only with `ALL` strictness (`SEMI`, `ANTI`, `ANY`, and `ASOF` are not supported).
@ -269,6 +270,33 @@ For example, consider the following tables:
`ASOF` join is **not** supported in the [Join](../../../engines/table-engines/special/join.md) table engine.
:::
## PASTE JOIN Usage
The result of `PASTE JOIN` is a table that contains all columns from the left subquery followed by all columns from the right subquery.
The rows are matched based on their positions in the original tables (the order of rows must be well-defined).
If the subqueries return a different number of rows, the extra rows are discarded.
Example:
```SQL
SELECT *
FROM
(
SELECT number AS a
FROM numbers(2)
) AS t1
PASTE JOIN
(
SELECT number AS a
FROM numbers(2)
ORDER BY a DESC
) AS t2
┌─a─┬─t2.a─┐
│ 0 │    1 │
│ 1 │    0 │
└───┴──────┘
```
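The truncation rule above can be sketched with the same `numbers` table function; here the right side returns fewer rows, so only two rows are produced (the column alias `b` is chosen for illustration):
```SQL
SELECT *
FROM
(
    SELECT number AS a
    FROM numbers(3)
) AS t1
PASTE JOIN
(
    SELECT number AS b
    FROM numbers(2)
) AS t2

┌─a─┬─b─┐
│ 0 │ 0 │
│ 1 │ 1 │
└───┴───┘
```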
## Distributed JOIN
There are two ways to execute join involving distributed tables:

View File

@ -4,6 +4,7 @@
#include <Analyzer/FunctionNode.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
@ -41,22 +42,6 @@ DataTypePtr getEnumType(const std::set<std::string> & string_values)
return getDataEnumType<DataTypeEnum8>(string_values);
}
QueryTreeNodePtr createCastFunction(QueryTreeNodePtr from, DataTypePtr result_type, ContextPtr context)
{
auto enum_literal = std::make_shared<ConstantValue>(result_type->getName(), std::make_shared<DataTypeString>());
auto enum_literal_node = std::make_shared<ConstantNode>(std::move(enum_literal));
auto cast_function = FunctionFactory::instance().get("_CAST", std::move(context));
QueryTreeNodes arguments{ std::move(from), std::move(enum_literal_node) };
auto function_node = std::make_shared<FunctionNode>("_CAST");
function_node->getArguments().getNodes() = std::move(arguments);
function_node->resolveAsFunction(cast_function->build(function_node->getArgumentColumns()));
return function_node;
}
/// if(arg1, arg2, arg3) will be transformed to if(arg1, _CAST(arg2, Enum...), _CAST(arg3, Enum...))
/// where Enum is generated based on the possible values stored in string_values
void changeIfArguments(

View File

@ -9,6 +9,8 @@
#include <Analyzer/HashUtils.h>
#include <Analyzer/Utils.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
@ -323,8 +325,21 @@ private:
/// Because we reduce the number of operands here by eliminating the same equality checks,
/// the only situation we can end up here is we had AND check where all the equality checks are the same so we know the type is UInt8.
/// Otherwise, we will have > 1 operands and we don't have to do anything.
assert(!function_node.getResultType()->isNullable() && and_operands[0]->getResultType()->equals(*function_node.getResultType()));
node = std::move(and_operands[0]);
auto operand_type = and_operands[0]->getResultType();
auto function_type = function_node.getResultType();
assert(!function_type->isNullable());
if (!function_type->equals(*operand_type))
{
/// Result of equality operator can be low cardinality, while AND always returns UInt8.
/// In that case we replace `(lc = 1) AND (lc = 1)` with `(lc = 1) AS UInt8`
assert(function_type->equals(*removeLowCardinality(operand_type)));
node = createCastFunction(std::move(and_operands[0]), function_type, getContext());
}
else
{
node = std::move(and_operands[0]);
}
return;
}
@ -389,11 +404,14 @@ private:
continue;
}
bool is_any_nullable = false;
Tuple args;
args.reserve(equals_functions.size());
/// first we create tuple from RHS of equals functions
for (const auto & equals : equals_functions)
{
is_any_nullable |= equals->getResultType()->isNullable();
const auto * equals_function = equals->as<FunctionNode>();
assert(equals_function && equals_function->getFunctionName() == "equals");
@ -421,8 +439,20 @@ private:
in_function->getArguments().getNodes() = std::move(in_arguments);
in_function->resolveAsFunction(in_function_resolver);
or_operands.push_back(std::move(in_function));
/** For `k :: UInt8`, expression `k = 1 OR k = NULL` with result type Nullable(UInt8)
* is replaced with `k IN (1, NULL)` with result type UInt8.
* Convert it back to Nullable(UInt8).
*/
if (is_any_nullable && !in_function->getResultType()->isNullable())
{
auto nullable_result_type = std::make_shared<DataTypeNullable>(in_function->getResultType());
auto in_function_nullable = createCastFunction(std::move(in_function), std::move(nullable_result_type), getContext());
or_operands.push_back(std::move(in_function_nullable));
}
else
{
or_operands.push_back(std::move(in_function));
}
}
if (or_operands.size() == function_node.getArguments().getNodes().size())

View File

@ -667,4 +667,20 @@ NameSet collectIdentifiersFullNames(const QueryTreeNodePtr & node)
return out;
}
QueryTreeNodePtr createCastFunction(QueryTreeNodePtr node, DataTypePtr result_type, ContextPtr context)
{
auto enum_literal = std::make_shared<ConstantValue>(result_type->getName(), std::make_shared<DataTypeString>());
auto enum_literal_node = std::make_shared<ConstantNode>(std::move(enum_literal));
auto cast_function = FunctionFactory::instance().get("_CAST", std::move(context));
QueryTreeNodes arguments{ std::move(node), std::move(enum_literal_node) };
auto function_node = std::make_shared<FunctionNode>("_CAST");
function_node->getArguments().getNodes() = std::move(arguments);
function_node->resolveAsFunction(cast_function->build(function_node->getArgumentColumns()));
return function_node;
}
}

View File

@ -99,4 +99,7 @@ void rerunFunctionResolve(FunctionNode * function_node, ContextPtr context);
/// Just collect all identifiers from query tree
NameSet collectIdentifiersFullNames(const QueryTreeNodePtr & node);
/// Wrap node into `_CAST` function
QueryTreeNodePtr createCastFunction(QueryTreeNodePtr node, DataTypePtr result_type, ContextPtr context);
}

View File

@ -88,18 +88,19 @@ BackupEntriesCollector::BackupEntriesCollector(
, read_settings(read_settings_)
, context(context_)
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, collect_metadata_timeout(context->getConfigRef().getUInt64("backups.collect_metadata_timeout", context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000)))
, collect_metadata_timeout(context->getConfigRef().getUInt64(
"backups.collect_metadata_timeout", context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000)))
, attempts_to_collect_metadata_before_sleep(context->getConfigRef().getUInt("backups.attempts_to_collect_metadata_before_sleep", 2))
, min_sleep_before_next_attempt_to_collect_metadata(context->getConfigRef().getUInt64("backups.min_sleep_before_next_attempt_to_collect_metadata", 100))
, max_sleep_before_next_attempt_to_collect_metadata(context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
, min_sleep_before_next_attempt_to_collect_metadata(
context->getConfigRef().getUInt64("backups.min_sleep_before_next_attempt_to_collect_metadata", 100))
, max_sleep_before_next_attempt_to_collect_metadata(
context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
, compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true))
, log(&Poco::Logger::get("BackupEntriesCollector"))
, global_zookeeper_retries_info(
"BackupEntriesCollector",
log,
context->getSettingsRef().backup_restore_keeper_max_retries,
context->getSettingsRef().backup_restore_keeper_retry_initial_backoff_ms,
context->getSettingsRef().backup_restore_keeper_retry_max_backoff_ms)
context->getSettingsRef().backup_restore_keeper_max_retries,
context->getSettingsRef().backup_restore_keeper_retry_initial_backoff_ms,
context->getSettingsRef().backup_restore_keeper_retry_max_backoff_ms)
, threadpool(threadpool_)
{
}
@ -572,7 +573,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
{
/// Database or table could be replicated - so may use ZooKeeper. We need to retry.
auto zookeeper_retries_info = global_zookeeper_retries_info;
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", zookeeper_retries_info, nullptr);
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&](){ db_tables = database->getTablesForBackup(filter_by_table_name, context); });
}
catch (Exception & e)

View File

@ -157,11 +157,16 @@ BackupImpl::~BackupImpl()
void BackupImpl::open()
{
std::lock_guard lock{mutex};
LOG_INFO(log, "{} backup: {}", ((open_mode == OpenMode::WRITE) ? "Writing" : "Reading"), backup_name_for_logging);
ProfileEvents::increment((open_mode == OpenMode::WRITE) ? ProfileEvents::BackupsOpenedForWrite : ProfileEvents::BackupsOpenedForRead);
if (open_mode == OpenMode::WRITE)
if (open_mode == OpenMode::READ)
{
ProfileEvents::increment(ProfileEvents::BackupsOpenedForRead);
LOG_INFO(log, "Reading backup: {}", backup_name_for_logging);
}
else
{
ProfileEvents::increment(ProfileEvents::BackupsOpenedForWrite);
LOG_INFO(log, "Writing backup: {}", backup_name_for_logging);
timestamp = std::time(nullptr);
if (!uuid)
uuid = UUIDHelpers::generateV4();

View File

@ -43,14 +43,6 @@ namespace Stage = BackupCoordinationStage;
namespace
{
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
{
String res = str;
res[0] = std::toupper(res[0]);
return res;
}
/// Outputs "table <name>" or "temporary table <name>"
String tableNameWithTypeToString(const String & database_name, const String & table_name, bool first_upper)
{
@ -145,7 +137,7 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
void RestorerFromBackup::setStage(const String & new_stage, const String & message)
{
LOG_TRACE(log, fmt::runtime(toUpperFirst(new_stage)));
LOG_TRACE(log, "Setting stage: {}", new_stage);
current_stage = new_stage;
if (restore_coordination)

View File

@ -20,22 +20,19 @@ WithRetries::KeeperSettings WithRetries::KeeperSettings::fromContext(ContextPtr
};
}
WithRetries::WithRetries(Poco::Logger * log_, zkutil::GetZooKeeper get_zookeeper_, const KeeperSettings & settings_, RenewerCallback callback_)
WithRetries::WithRetries(
Poco::Logger * log_, zkutil::GetZooKeeper get_zookeeper_, const KeeperSettings & settings_, RenewerCallback callback_)
: log(log_)
, get_zookeeper(get_zookeeper_)
, settings(settings_)
, callback(callback_)
, global_zookeeper_retries_info(
log->name(),
log,
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms)
settings.keeper_max_retries, settings.keeper_retry_initial_backoff_ms, settings.keeper_retry_max_backoff_ms)
{}
WithRetries::RetriesControlHolder::RetriesControlHolder(const WithRetries * parent, const String & name)
: info(parent->global_zookeeper_retries_info)
, retries_ctl(name, info, nullptr)
, retries_ctl(name, parent->log, info, nullptr)
, faulty_zookeeper(parent->getFaultyZooKeeper())
{}

View File

@ -48,7 +48,7 @@ Suggest::Suggest()
"GRANT", "REVOKE", "OPTION", "ADMIN", "EXCEPT", "REPLACE", "IDENTIFIED", "HOST",
"NAME", "READONLY", "WRITABLE", "PERMISSIVE", "FOR", "RESTRICTIVE", "RANDOMIZED", "INTERVAL",
"LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE", "CLEANUP", "APPEND",
"IGNORE NULLS", "RESPECT NULLS", "OVER"});
"IGNORE NULLS", "RESPECT NULLS", "OVER", "PASTE"});
}
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion)

View File

@ -18,16 +18,37 @@ template <typename T>
static inline String formatQuoted(T x)
{
WriteBufferFromOwnString wb;
writeQuoted(x, wb);
return wb.str();
}
template <typename T>
static inline void writeQuoted(const DecimalField<T> & x, WriteBuffer & buf)
{
writeChar('\'', buf);
writeText(x.getValue(), x.getScale(), buf, {});
writeChar('\'', buf);
if constexpr (is_decimal_field<T>)
{
writeChar('\'', wb);
writeText(x.getValue(), x.getScale(), wb, {});
writeChar('\'', wb);
}
else if constexpr (is_big_int_v<T>)
{
writeChar('\'', wb);
writeText(x, wb);
writeChar('\'', wb);
}
else
{
/// While `writeQuoted` sounds like it will always write the value in quotes,
/// in fact it means: write according to the rules of the quoted format, like VALUES,
/// where strings, dates, date-times, UUID are in quotes, and numbers are not.
/// That's why we take extra care to put Decimal and big integers inside quotes
/// when formatting literals in SQL language,
/// because it is different from the quoted formats like VALUES.
/// In fact, there are no Decimal and big integer literals in SQL,
/// but they can appear if we format the query from a modified AST.
/// We can fix this idiosyncrasy later.
writeQuoted(x, wb);
}
return wb.str();
}
/** In contrast to writeFloatText (and writeQuoted),

View File

@ -13,6 +13,7 @@ const char * toString(JoinKind kind)
case JoinKind::Full: return "FULL";
case JoinKind::Cross: return "CROSS";
case JoinKind::Comma: return "COMMA";
case JoinKind::Paste: return "PASTE";
}
};

View File

@ -13,7 +13,8 @@ enum class JoinKind
Right,
Full,
Cross, /// Direct product. Strictness and condition doesn't matter.
Comma /// Same as direct product. Intended to be converted to INNER JOIN with conditions from WHERE.
Comma, /// Same as direct product. Intended to be converted to INNER JOIN with conditions from WHERE.
Paste, /// Used to join parts without `ON` clause.
};
const char * toString(JoinKind kind);
@ -27,6 +28,7 @@ inline constexpr bool isRightOrFull(JoinKind kind) { return kind == JoinKind::R
inline constexpr bool isLeftOrFull(JoinKind kind) { return kind == JoinKind::Left || kind == JoinKind::Full; }
inline constexpr bool isInnerOrRight(JoinKind kind) { return kind == JoinKind::Inner || kind == JoinKind::Right; }
inline constexpr bool isInnerOrLeft(JoinKind kind) { return kind == JoinKind::Inner || kind == JoinKind::Left; }
inline constexpr bool isPaste(JoinKind kind) { return kind == JoinKind::Paste; }
/// Allows more optimal JOIN for typical cases.
enum class JoinStrictness

View File

@ -373,7 +373,7 @@ void DiskLocal::removeDirectory(const String & path)
{
auto fs_path = fs::path(disk_path) / path;
if (0 != rmdir(fs_path.c_str()))
ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot rmdir {}", fs_path);
ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot remove directory {}", fs_path);
}
void DiskLocal::removeRecursive(const String & path)

View File

@ -172,7 +172,7 @@ struct SHA512Impl256
/// SSL library that we use, for S390X architecture only OpenSSL is supported. But the SHA512-256, SHA512_256_Init,
/// SHA512_256_Update, SHA512_256_Final methods to calculate hash (similar to the other SHA functions) aren't available
/// in the current version of OpenSSL that we use which necessitates the use of the EVP interface.
auto md_ctx = EVP_MD_CTX_create();
auto * md_ctx = EVP_MD_CTX_create();
EVP_DigestInit_ex(md_ctx, EVP_sha512_256(), nullptr /*engine*/);
EVP_DigestUpdate(md_ctx, begin, size);
EVP_DigestFinal_ex(md_ctx, out_char_data, nullptr /*size*/);

View File

@ -56,6 +56,7 @@
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Common/logger_useful.h>
#include <Interpreters/PasteJoin.h>
#include <QueryPipeline/SizeLimits.h>
@ -951,6 +952,9 @@ static std::shared_ptr<IJoin> tryCreateJoin(
std::unique_ptr<QueryPlan> & joined_plan,
ContextPtr context)
{
if (analyzed_join->kind() == JoinKind::Paste)
return std::make_shared<PasteJoin>(analyzed_join, right_sample_block);
if (algorithm == JoinAlgorithm::DIRECT || algorithm == JoinAlgorithm::DEFAULT)
{
JoinPtr direct_join = tryKeyValueJoin(analyzed_join, right_sample_block);

View File

@ -1729,7 +1729,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
return step_raw_ptr;
};
if (expressions.join->pipelineType() == JoinPipelineType::YShaped)
if (expressions.join->pipelineType() == JoinPipelineType::YShaped && expressions.join->getTableJoin().kind() != JoinKind::Paste)
{
const auto & table_join = expressions.join->getTableJoin();
const auto & join_clause = table_join.getOnlyClause();

View File

@ -345,27 +345,6 @@ ColumnRawPtrs getRawPointers(const Columns & columns)
return ptrs;
}
void convertToFullColumnsInplace(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i);
col.column = recursiveRemoveLowCardinality(recursiveRemoveSparse(col.column));
col.type = recursiveRemoveLowCardinality(col.type);
}
}
void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type)
{
for (const String & column_name : names)
{
auto & col = block.getByName(column_name);
col.column = recursiveRemoveLowCardinality(recursiveRemoveSparse(col.column));
if (change_type)
col.type = recursiveRemoveLowCardinality(col.type);
}
}
void restoreLowCardinalityInplace(Block & block, const Names & lowcard_keys)
{
for (const auto & column_name : lowcard_keys)
@ -495,8 +474,8 @@ void addDefaultValues(IColumn & column, const DataTypePtr & type, size_t count)
bool typesEqualUpToNullability(DataTypePtr left_type, DataTypePtr right_type)
{
DataTypePtr left_type_strict = removeNullable(recursiveRemoveLowCardinality(left_type));
DataTypePtr right_type_strict = removeNullable(recursiveRemoveLowCardinality(right_type));
DataTypePtr left_type_strict = removeNullable(removeLowCardinality(left_type));
DataTypePtr right_type_strict = removeNullable(removeLowCardinality(right_type));
return left_type_strict->equals(*right_type_strict);
}

View File

@ -71,8 +71,6 @@ ColumnPtr materializeColumn(const Block & block, const String & name);
Columns materializeColumns(const Block & block, const Names & names);
ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names);
ColumnRawPtrs getRawPointers(const Columns & columns);
void convertToFullColumnsInplace(Block & block);
void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type = true);
void restoreLowCardinalityInplace(Block & block, const Names & lowcard_keys);
ColumnRawPtrs extractKeysForJoin(const Block & block_keys, const Names & key_names_right);

View File

@ -138,6 +138,9 @@ Block extractMinMax(const Block & block, const Block & keys)
}
min_max.setColumns(std::move(columns));
for (auto & column : min_max)
column.column = column.column->convertToFullColumnIfLowCardinality();
return min_max;
}
@ -224,6 +227,16 @@ public:
MergeJoinCursor(const Block & block, const SortDescription & desc_)
: impl(block, desc_)
{
for (auto *& column : impl.sort_columns)
{
const auto * lowcard_column = typeid_cast<const ColumnLowCardinality *>(column);
if (lowcard_column)
{
auto & new_col = column_holder.emplace_back(lowcard_column->convertToFullColumn());
column = new_col.get();
}
}
/// SortCursorImpl can work with permutation, but MergeJoinCursor can't.
if (impl.permutation)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: MergeJoinCursor doesn't support permutation");
@ -287,6 +300,7 @@ public:
private:
SortCursorImpl impl;
Columns column_holder;
bool has_left_nullable = false;
bool has_right_nullable = false;
@ -537,9 +551,6 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
lowcard_right_keys.push_back(right_key);
}
JoinCommon::convertToFullColumnsInplace(right_table_keys);
JoinCommon::convertToFullColumnsInplace(right_sample_block, key_names_right);
for (const auto & column : right_table_keys)
if (required_right_keys.contains(column.name))
right_columns_to_add.insert(ColumnWithTypeAndName{nullptr, column.type, column.name});
@ -662,9 +673,7 @@ bool MergeJoin::saveRightBlock(Block && block)
Block MergeJoin::modifyRightBlock(const Block & src_block) const
{
Block block = materializeBlock(src_block);
JoinCommon::convertToFullColumnsInplace(block, table_join->getOnlyClause().key_names_right);
return block;
return materializeBlock(src_block);
}
bool MergeJoin::addBlockToJoin(const Block & src_block, bool)
@ -705,8 +714,6 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
lowcard_keys.push_back(column_name);
}
JoinCommon::convertToFullColumnsInplace(block, key_names_left, false);
sortBlock(block, left_sort_description);
}
@ -739,8 +746,6 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
if (needConditionJoinColumn())
block.erase(deriveTempName(mask_column_name_left, JoinTableSide::Left));
JoinCommon::restoreLowCardinalityInplace(block, lowcard_keys);
}
template <bool in_memory, bool is_all>

View File

@ -0,0 +1,96 @@
#pragma once
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/logger_useful.h>
#include <Poco/Logger.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
/// Dummy class, actual joining is done by MergeTransform
class PasteJoin : public IJoin
{
public:
explicit PasteJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_)
: table_join(table_join_)
, right_sample_block(right_sample_block_)
{
LOG_TRACE(&Poco::Logger::get("PasteJoin"), "Will use paste join");
}
std::string getName() const override { return "PasteJoin"; }
const TableJoin & getTableJoin() const override { return *table_join; }
bool addBlockToJoin(const Block & /* block */, bool /* check_limits */) override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "PasteJoin::addBlockToJoin should not be called");
}
static bool isSupported(const std::shared_ptr<TableJoin> & table_join)
{
bool support_storage = !table_join->isSpecialStorage();
/// Key columns can change nullability, and this is not handled at the type conversion stage, so the algorithm should be aware of it
bool support_using = !table_join->hasUsing();
bool check_strictness = table_join->strictness() == JoinStrictness::All;
bool if_has_keys = table_join->getClauses().empty();
return support_using && support_storage && check_strictness && if_has_keys;
}
void checkTypesOfKeys(const Block & /*left_block*/) const override
{
if (!isSupported(table_join))
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "PasteJoin doesn't support specified query");
}
/// Used just to get result header
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /* not_processed */) override
{
for (const auto & col : right_sample_block)
block.insert(col);
block = materializeBlock(block).cloneEmpty();
}
void setTotals(const Block & block) override { totals = block; }
const Block & getTotals() const override { return totals; }
size_t getTotalRowCount() const override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "PasteJoin::getTotalRowCount should not be called");
}
size_t getTotalByteCount() const override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "PasteJoin::getTotalByteCount should not be called");
}
bool alwaysReturnsEmptySet() const override { return false; }
IBlocksStreamPtr
getNonJoinedBlocks(const Block & /* left_sample_block */, const Block & /* result_sample_block */, UInt64 /* max_block_size */) const override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "PasteJoin::getNonJoinedBlocks should not be called");
}
/// Left and right streams have the same priority and are processed simultaneously
JoinPipelineType pipelineType() const override { return JoinPipelineType::YShaped; }
private:
std::shared_ptr<TableJoin> table_join;
Block right_sample_block;
Block totals;
};
}

View File

@ -34,6 +34,7 @@
#include <type_traits>
#include <vector>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
@ -375,7 +376,7 @@ void TableJoin::addJoinedColumnsAndCorrectTypesImpl(TColumns & left_columns, boo
* For `JOIN ON expr1 == expr2` we will infer common type later in makeTableJoin,
* when part of plan built and types of expression will be known.
*/
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage(), isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE));
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage());
if (auto it = left_type_map.find(col.name); it != left_type_map.end())
{
@ -558,7 +559,8 @@ TableJoin::createConvertingActions(
*/
NameToNameMap left_column_rename;
NameToNameMap right_column_rename;
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage(), isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE));
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage());
if (!left_type_map.empty() || !right_type_map.empty())
{
left_dag = applyKeyConvertToTable(left_sample_columns, left_type_map, JoinTableSide::Left, left_column_rename);
@ -612,8 +614,11 @@ TableJoin::createConvertingActions(
}
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right, bool strict)
void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right)
{
/// FullSortingMerge and PartialMerge join algorithms don't support joining keys with different types
/// (e.g. String and LowCardinality(String))
bool require_strict_keys_match = isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE);
if (!left_type_map.empty() || !right_type_map.empty())
return;
@ -645,7 +650,7 @@ void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const Rig
const auto & ltype = ltypeit->second;
const auto & rtype = rtypeit->second;
bool type_equals = strict ? ltype->equals(*rtype) : JoinCommon::typesEqualUpToNullability(ltype, rtype);
bool type_equals = require_strict_keys_match ? ltype->equals(*rtype) : JoinCommon::typesEqualUpToNullability(ltype, rtype);
if (type_equals)
return true;

View File

@ -218,7 +218,7 @@ private:
/// Calculates common supertypes for corresponding join key columns.
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
void inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right, bool strict);
void inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right);
void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);

View File

@ -41,12 +41,9 @@ static ZooKeeperRetriesInfo getRetriesInfo()
{
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo(
"DistributedDDL",
&Poco::Logger::get("DDLQueryStatusSource"),
config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000)
);
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
}
bool isSupportedAlterTypeForOnClusterDDLQuery(int type)
@ -438,8 +435,8 @@ Chunk DDLQueryStatusSource::generate()
Strings tmp_active_hosts;
{
auto retries_info = getRetriesInfo();
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster", &Poco::Logger::get("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
auto zookeeper = context->getZooKeeper();
@ -478,8 +475,11 @@ Chunk DDLQueryStatusSource::generate()
String status_data;
bool finished_exists = false;
auto retries_info = getRetriesInfo();
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster",
&Poco::Logger::get("DDLQueryStatusSource"),
getRetriesInfo(),
context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
finished_exists = context->getZooKeeper()->tryGet(fs::path(node_path) / "finished" / host_id, status_data);

View File

@ -211,6 +211,9 @@ void ASTTableJoin::formatImplBeforeTable(const FormatSettings & settings, Format
case JoinKind::Comma:
settings.ostr << ",";
break;
case JoinKind::Paste:
settings.ostr << "PASTE JOIN";
break;
}
settings.ostr << (settings.hilite ? hilite_none : "");

View File

@ -6,6 +6,7 @@
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/ParserSampleRatio.h>
#include <Parsers/ParserTablesInSelectQuery.h>
#include <Core/Joins.h>
namespace DB
@ -166,6 +167,8 @@ bool ParserTablesInSelectQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expec
table_join->kind = JoinKind::Full;
else if (ParserKeyword("CROSS").ignore(pos))
table_join->kind = JoinKind::Cross;
else if (ParserKeyword("PASTE").ignore(pos))
table_join->kind = JoinKind::Paste;
else
no_kind = true;
@ -191,8 +194,8 @@ bool ParserTablesInSelectQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expec
}
if (table_join->strictness != JoinStrictness::Unspecified
&& table_join->kind == JoinKind::Cross)
throw Exception(ErrorCodes::SYNTAX_ERROR, "You must not specify ANY or ALL for CROSS JOIN.");
&& (table_join->kind == JoinKind::Cross || table_join->kind == JoinKind::Paste))
throw Exception(ErrorCodes::SYNTAX_ERROR, "You must not specify ANY or ALL for {} JOIN.", toString(table_join->kind));
if ((table_join->strictness == JoinStrictness::Semi || table_join->strictness == JoinStrictness::Anti) &&
(table_join->kind != JoinKind::Left && table_join->kind != JoinKind::Right))
@ -206,7 +209,7 @@ bool ParserTablesInSelectQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expec
return false;
if (table_join->kind != JoinKind::Comma
&& table_join->kind != JoinKind::Cross)
&& table_join->kind != JoinKind::Cross && table_join->kind != JoinKind::Paste)
{
if (ParserKeyword("USING").ignore(pos, expected))
{

View File

@ -955,6 +955,29 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
};
}
void joinCastPlanColumnsToNullable(QueryPlan & plan_to_add_cast, PlannerContextPtr & planner_context, const FunctionOverloadResolverPtr & to_nullable_function)
{
auto cast_actions_dag = std::make_shared<ActionsDAG>(plan_to_add_cast.getCurrentDataStream().header.getColumnsWithTypeAndName());
for (auto & output_node : cast_actions_dag->getOutputs())
{
if (planner_context->getGlobalPlannerContext()->hasColumnIdentifier(output_node->result_name))
{
DataTypePtr type_to_check = output_node->result_type;
if (const auto * type_to_check_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(type_to_check.get()))
type_to_check = type_to_check_low_cardinality->getDictionaryType();
if (type_to_check->canBeInsideNullable())
output_node = &cast_actions_dag->addFunction(to_nullable_function, {output_node}, output_node->result_name);
}
}
cast_actions_dag->projectInput();
auto cast_join_columns_step = std::make_unique<ExpressionStep>(plan_to_add_cast.getCurrentDataStream(), std::move(cast_actions_dag));
cast_join_columns_step->setStepDescription("Cast JOIN columns to Nullable");
plan_to_add_cast.addStep(std::move(cast_join_columns_step));
}
JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_expression,
JoinTreeQueryPlan left_join_tree_query_plan,
JoinTreeQueryPlan right_join_tree_query_plan,
@ -1068,45 +1091,21 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();
auto to_nullable_function = FunctionFactory::instance().get("toNullable", query_context);
auto join_cast_plan_columns_to_nullable = [&](QueryPlan & plan_to_add_cast)
{
auto cast_actions_dag = std::make_shared<ActionsDAG>(plan_to_add_cast.getCurrentDataStream().header.getColumnsWithTypeAndName());
for (auto & output_node : cast_actions_dag->getOutputs())
{
if (planner_context->getGlobalPlannerContext()->hasColumnIdentifier(output_node->result_name))
{
DataTypePtr type_to_check = output_node->result_type;
if (const auto * type_to_check_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(type_to_check.get()))
type_to_check = type_to_check_low_cardinality->getDictionaryType();
if (type_to_check->canBeInsideNullable())
output_node = &cast_actions_dag->addFunction(to_nullable_function, {output_node}, output_node->result_name);
}
}
cast_actions_dag->projectInput();
auto cast_join_columns_step = std::make_unique<ExpressionStep>(plan_to_add_cast.getCurrentDataStream(), std::move(cast_actions_dag));
cast_join_columns_step->setStepDescription("Cast JOIN columns to Nullable");
plan_to_add_cast.addStep(std::move(cast_join_columns_step));
};
if (settings.join_use_nulls)
{
auto to_nullable_function = FunctionFactory::instance().get("toNullable", query_context);
if (isFull(join_kind))
{
join_cast_plan_columns_to_nullable(left_plan);
join_cast_plan_columns_to_nullable(right_plan);
joinCastPlanColumnsToNullable(left_plan, planner_context, to_nullable_function);
joinCastPlanColumnsToNullable(right_plan, planner_context, to_nullable_function);
}
else if (isLeft(join_kind))
{
join_cast_plan_columns_to_nullable(right_plan);
joinCastPlanColumnsToNullable(right_plan, planner_context, to_nullable_function);
}
else if (isRight(join_kind))
{
join_cast_plan_columns_to_nullable(left_plan);
joinCastPlanColumnsToNullable(left_plan, planner_context, to_nullable_function);
}
}
@ -1312,7 +1311,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
return step_raw_ptr;
};
if (join_algorithm->pipelineType() == JoinPipelineType::YShaped)
if (join_algorithm->pipelineType() == JoinPipelineType::YShaped && join_kind != JoinKind::Paste)
{
const auto & join_clause = table_join->getOnlyClause();

View File

@ -35,6 +35,7 @@
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/GraceHashJoin.h>
#include <Interpreters/PasteJoin.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Planner/PlannerContext.h>
@ -653,6 +654,8 @@ static std::shared_ptr<IJoin> tryCreateJoin(JoinAlgorithm algorithm,
const Block & right_table_expression_header,
const PlannerContextPtr & planner_context)
{
if (table_join->kind() == JoinKind::Paste)
return std::make_shared<PasteJoin>(table_join, right_table_expression_header);
/// Direct JOIN with special storages that support key value access. For example JOIN with Dictionary
if (algorithm == JoinAlgorithm::DIRECT || algorithm == JoinAlgorithm::DEFAULT)
{

View File

@ -740,7 +740,7 @@ void DWARFBlockInputFormat::parseFilenameTable(UnitState & unit, uint64_t offset
auto error = prologue.parse(*debug_line_extractor, &offset, /*RecoverableErrorHandler*/ [&](auto e)
{
if (++seen_debug_line_warnings < 10)
LOG_INFO(&Poco::Logger::get("DWARF"), "{}", llvm::toString(std::move(e)));
LOG_INFO(&Poco::Logger::get("DWARF"), "Parsing error: {}", llvm::toString(std::move(e)));
}, *dwarf_context, unit.dwarf_unit);
if (error)

View File

@ -0,0 +1,127 @@
#include <cassert>
#include <cstddef>
#include <limits>
#include <memory>
#include <type_traits>
#include <base/defines.h>
#include <base/types.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TableJoin.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Transforms/PasteJoinTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
PasteJoinAlgorithm::PasteJoinAlgorithm(
JoinPtr table_join_,
const Blocks & input_headers,
size_t max_block_size_)
: table_join(table_join_)
, max_block_size(max_block_size_)
, log(&Poco::Logger::get("PasteJoinAlgorithm"))
{
if (input_headers.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "PasteJoinAlgorithm requires exactly two inputs");
auto strictness = table_join->getTableJoin().strictness();
if (strictness != JoinStrictness::Any && strictness != JoinStrictness::All)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PasteJoinAlgorithm is not implemented for strictness {}", strictness);
auto kind = table_join->getTableJoin().kind();
if (!isPaste(kind))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PasteJoinAlgorithm is not implemented for kind {}", kind);
}
static void prepareChunk(Chunk & chunk)
{
if (!chunk)
return;
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
chunk.setColumns(std::move(columns), num_rows);
}
void PasteJoinAlgorithm::initialize(Inputs inputs)
{
if (inputs.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Two inputs are required, got {}", inputs.size());
for (size_t i = 0; i < inputs.size(); ++i)
{
consume(inputs[i], i);
}
}
void PasteJoinAlgorithm::consume(Input & input, size_t source_num)
{
if (input.skip_last_row)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "skip_last_row is not supported");
if (input.permutation)
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "permutation is not supported");
last_used_row[source_num] = 0;
prepareChunk(input.chunk);
chunks[source_num] = std::move(input.chunk);
}
IMergingAlgorithm::Status PasteJoinAlgorithm::merge()
{
if (chunks[0].empty() || chunks[1].empty())
return Status({}, true);
if (last_used_row[0] >= chunks[0].getNumRows())
return Status(0);
if (last_used_row[1] >= chunks[1].getNumRows())
return Status(1);
/// We have unused rows from both inputs
size_t result_num_rows = std::min(chunks[0].getNumRows() - last_used_row[0], chunks[1].getNumRows() - last_used_row[1]);
Chunk result;
for (size_t source_num = 0; source_num < 2; ++source_num)
for (const auto & col : chunks[source_num].getColumns())
result.addColumn(col->cut(last_used_row[source_num], result_num_rows));
last_used_row[0] += result_num_rows;
last_used_row[1] += result_num_rows;
return Status(std::move(result));
}
PasteJoinTransform::PasteJoinTransform(
JoinPtr table_join,
const Blocks & input_headers,
const Block & output_header,
size_t max_block_size,
UInt64 limit_hint_)
: IMergingTransform<PasteJoinAlgorithm>(
input_headers,
output_header,
/* have_all_inputs_= */ true,
limit_hint_,
/* always_read_till_end_= */ false,
/* empty_chunk_on_finish_= */ true,
table_join, input_headers, max_block_size)
, log(&Poco::Logger::get("PasteJoinTransform"))
{
LOG_TRACE(log, "Use PasteJoinTransform");
}
void PasteJoinTransform::onFinish() {}
}

View File

@ -0,0 +1,88 @@
#pragma once
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <boost/core/noncopyable.hpp>
#include <Common/PODArray.h>
#include <IO/ReadBuffer.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Chunk.h>
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/IMergingTransform.h>
namespace Poco { class Logger; }
namespace DB
{
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
/*
* This class is used to join chunks from two streams by concatenating them row by row (paste join).
* It is used in PasteJoinTransform.
*/
class PasteJoinAlgorithm final : public IMergingAlgorithm
{
public:
explicit PasteJoinAlgorithm(JoinPtr table_join, const Blocks & input_headers, size_t max_block_size_);
const char * getName() const override { return "PasteJoinAlgorithm"; }
virtual void initialize(Inputs inputs) override;
virtual void consume(Input & input, size_t source_num) override;
virtual Status merge() override;
void logElapsed(double seconds);
private:
Chunk createBlockWithDefaults(size_t source_num);
Chunk createBlockWithDefaults(size_t source_num, size_t start, size_t num_rows) const;
/// For `USING`, join key columns should have values from the right side instead of defaults
std::unordered_map<size_t, size_t> left_to_right_key_remap;
std::array<Chunk, 2> chunks;
JoinPtr table_join;
size_t max_block_size;
struct Statistic
{
size_t num_blocks[2] = {0, 0};
size_t num_rows[2] = {0, 0};
size_t max_blocks_loaded = 0;
};
Statistic stat;
Poco::Logger * log;
UInt64 last_used_row[2] = {0, 0};
};
class PasteJoinTransform final : public IMergingTransform<PasteJoinAlgorithm>
{
using Base = IMergingTransform<PasteJoinAlgorithm>;
public:
PasteJoinTransform(
JoinPtr table_join,
const Blocks & input_headers,
const Block & output_header,
size_t max_block_size,
UInt64 limit_hint = 0);
String getName() const override { return "PasteJoinTransform"; }
protected:
void onFinish() override;
Poco::Logger * log;
};
}

View File

@ -25,6 +25,7 @@
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/MergeJoinTransform.h>
#include <Processors/Transforms/PasteJoinTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
@ -36,6 +37,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
}
void QueryPipelineBuilder::checkInitialized()
@ -354,7 +356,9 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
left->pipe.dropExtremes();
right->pipe.dropExtremes();
if (left->getNumStreams() != 1 || right->getNumStreams() != 1)
if ((left->getNumStreams() != 1 || right->getNumStreams() != 1) && join->getTableJoin().kind() == JoinKind::Paste)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Paste JOIN requires sorted tables only");
else if (left->getNumStreams() != 1 || right->getNumStreams() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Join is supported only for pipelines with one output port");
if (left->hasTotals() || right->hasTotals())
@ -362,9 +366,16 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
Blocks inputs = {left->getHeader(), right->getHeader()};
auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size);
return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
if (join->getTableJoin().kind() == JoinKind::Paste)
{
auto joining = std::make_shared<PasteJoinTransform>(join, inputs, out_header, max_block_size);
return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
}
else
{
auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size);
return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
}
}
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLeft(

View File

@ -40,8 +40,6 @@ namespace ErrorCodes
extern const int READONLY;
extern const int UNKNOWN_STATUS_OF_INSERT;
extern const int INSERT_WAS_DEDUPLICATED;
extern const int TIMEOUT_EXCEEDED;
extern const int NO_ACTIVE_REPLICAS;
extern const int DUPLICATE_DATA_PART;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int LOGICAL_ERROR;
@ -160,7 +158,12 @@ size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const
size_t replicas_number = 0;
ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info, context->getProcessListElement());
const auto & settings = context->getSettingsRef();
ZooKeeperRetriesControl quorum_retries_ctl(
"checkQuorumPrecondition",
log,
{settings.insert_keeper_max_retries, settings.insert_keeper_retry_initial_backoff_ms, settings.insert_keeper_retry_max_backoff_ms},
context->getProcessListElement());
quorum_retries_ctl.retryLoop(
[&]()
{
@ -255,12 +258,6 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
const auto & settings = context->getSettingsRef();
zookeeper_retries_info = ZooKeeperRetriesInfo(
"ReplicatedMergeTreeSink::consume",
settings.insert_keeper_max_retries ? log : nullptr,
settings.insert_keeper_max_retries,
settings.insert_keeper_retry_initial_backoff_ms,
settings.insert_keeper_retry_max_backoff_ms);
ZooKeeperWithFaultInjectionPtr zookeeper = ZooKeeperWithFaultInjection::createInstance(
settings.insert_keeper_fault_injection_probability,
@ -636,7 +633,12 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
CommitRetryContext retry_context;
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info, context->getProcessListElement());
const auto & settings = context->getSettingsRef();
ZooKeeperRetriesControl retries_ctl(
"commitPart",
log,
{settings.insert_keeper_max_retries, settings.insert_keeper_retry_initial_backoff_ms, settings.insert_keeper_retry_max_backoff_ms},
context->getProcessListElement());
auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages
{
@ -910,12 +912,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
part->name, multi_code, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
/// Independently of how many retries we had left we want to do at least one check of this inner retry
/// so a) we try to verify at least once if metadata was written and b) we set the proper final error
/// (UNKNOWN_STATUS_OF_INSERT) if we fail to reconnect to keeper
new_retry_controller.requestUnconditionalRetry();
bool node_exists = false;
/// The loop will be executed at least once
new_retry_controller.retryLoop([&]
{
fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault, { zookeeper->forceFailureBeforeOperation(); });
@ -1073,7 +1071,26 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
if (quorum_parallel)
quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + retry_context.actual_part_name;
waitForQuorum(zookeeper, retry_context.actual_part_name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
ZooKeeperRetriesControl new_retry_controller = retries_ctl;
new_retry_controller.actionAfterLastFailedRetry([&]
{
/// We do not know whether or not data has been inserted in other replicas
new_retry_controller.setUserError(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: {}",
new_retry_controller.getLastKeeperErrorMessage());
});
new_retry_controller.retryLoop([&]()
{
zookeeper->setKeeper(storage.getZooKeeper());
waitForQuorum(
zookeeper,
retry_context.actual_part_name,
quorum_info.status_path,
quorum_info.is_active_node_version,
replicas_num);
});
}
}
@ -1106,49 +1123,44 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::waitForQuorum(
/// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum '{}' for part {}{}", quorum_path, part_name, quorumLogMessage(replicas_num));
try
fiu_do_on(FailPoints::replicated_merge_tree_insert_quorum_fail_0, { zookeeper->forceFailureBeforeOperation(); });
while (true)
{
fiu_do_on(FailPoints::replicated_merge_tree_insert_quorum_fail_0, { zookeeper->forceFailureBeforeOperation(); });
zkutil::EventPtr event = std::make_shared<Poco::Event>();
while (true)
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string value;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_path, value, nullptr, event))
break;
std::string value;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_path, value, nullptr, event))
break;
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_path);
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_path);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// If the node has time to disappear, and then appear again for the next insert.
if (quorum_entry.part_name != part_name)
break;
/// If the node has time to disappear, and then appear again for the next insert.
if (quorum_entry.part_name != part_name)
break;
if (!event->tryWait(quorum_timeout_ms))
throw Exception(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: "
"Timeout while waiting for quorum");
if (!event->tryWait(quorum_timeout_ms))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout while waiting for quorum");
LOG_TRACE(log, "Quorum {} for part {} updated, will check quorum node still exists", quorum_path, part_name);
}
/// And what if it is possible that the current replica at this time has ceased to be active
/// and the quorum is marked as failed and deleted?
Coordination::Stat stat;
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, &stat)
|| stat.version != is_active_node_version)
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "Replica become inactive while waiting for quorum");
}
catch (...)
{
/// We do not know whether or not data has been inserted
/// - whether other replicas have time to download the part and mark the quorum as done.
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_INSERT, "Unknown status, client must retry. Reason: {}",
getCurrentExceptionMessage(false));
LOG_TRACE(log, "Quorum {} for part {} updated, will check quorum node still exists", quorum_path, part_name);
}
/// And what if it is possible that the current replica at this time has ceased to be active
/// and the quorum is marked as failed and deleted?
Coordination::Stat stat;
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, &stat) || stat.version != is_active_node_version)
throw Exception(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: "
"Replica became inactive while waiting for quorum");
LOG_TRACE(log, "Quorum '{}' for part {} satisfied", quorum_path, part_name);
}
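
Taken together, the hunks above replace the old try/catch around the quorum wait (which surfaced TIMEOUT_EXCEEDED / NO_ACTIVE_REPLICAS) with a per-call retry controller whose last-failure action reports UNKNOWN_STATUS_OF_INSERT. The following is only a condensed sketch of that caller-side pattern, assuming the members visible in the hunks (retries_ctl, storage, zookeeper, retry_context, quorum_info, replicas_num, ErrorCodes) are in scope; it is not a drop-in implementation.

    /// Copy of the outer control keeps the logger and retry limits.
    ZooKeeperRetriesControl new_retry_controller = retries_ctl;
    new_retry_controller.actionAfterLastFailedRetry([&]
    {
        /// After the last failed retry we cannot tell whether other replicas confirmed the part,
        /// so the final error is UNKNOWN_STATUS_OF_INSERT rather than a plain timeout.
        new_retry_controller.setUserError(
            ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
            "Unknown quorum status. Reason: {}",
            new_retry_controller.getLastKeeperErrorMessage());
    });
    new_retry_controller.retryLoop([&]
    {
        /// Refresh the Keeper session before each attempt, then wait for the quorum node.
        zookeeper->setKeeper(storage.getZooKeeper());
        waitForQuorum(zookeeper, retry_context.actual_part_name,
                      quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
    });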

View File

@ -74,7 +74,6 @@ private:
using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
ZooKeeperRetriesInfo zookeeper_retries_info;
struct QuorumInfo
{
String status_path;

View File

@ -5,6 +5,8 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/logger_useful.h>
#include <memory>
namespace DB
{
@ -15,29 +17,31 @@ namespace ErrorCodes
struct ZooKeeperRetriesInfo
{
ZooKeeperRetriesInfo() = default;
ZooKeeperRetriesInfo(std::string name_, Poco::Logger * logger_, UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
: name(std::move(name_))
, logger(logger_)
, max_retries(max_retries_)
, curr_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_))
, max_backoff_ms(max_backoff_ms_)
ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
: max_retries(max_retries_), initial_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_)), max_backoff_ms(max_backoff_ms_)
{
}
std::string name;
Poco::Logger * logger = nullptr;
UInt64 max_retries = 0;
UInt64 curr_backoff_ms = 0;
UInt64 max_backoff_ms = 0;
UInt64 retry_count = 0;
UInt64 max_retries;
UInt64 initial_backoff_ms;
UInt64 max_backoff_ms;
};
class ZooKeeperRetriesControl
{
public:
ZooKeeperRetriesControl(std::string name_, ZooKeeperRetriesInfo & retries_info_, QueryStatusPtr elem)
: name(std::move(name_)), retries_info(retries_info_), process_list_element(elem)
ZooKeeperRetriesControl(std::string name_, Poco::Logger * logger_, ZooKeeperRetriesInfo retries_info_, QueryStatusPtr elem)
: name(std::move(name_)), logger(logger_), retries_info(retries_info_), process_list_element(elem)
{
}
ZooKeeperRetriesControl(const ZooKeeperRetriesControl & other)
: name(other.name)
, logger(other.logger)
, retries_info(other.retries_info)
, total_failures(other.total_failures)
, process_list_element(other.process_list_element)
, current_backoff_ms(other.current_backoff_ms)
{
}
@ -46,7 +50,7 @@ public:
retryLoop(f, []() {});
}
/// retryLoop() executes f() until it succeeds/max_retries is reached/non-retrialable error is encountered
/// retryLoop() executes f() until it succeeds/max_retries is reached/non-retryable error is encountered
///
/// the callable f() can provide feedback in terms of errors in two ways:
/// 1. throw KeeperException exception:
@ -56,10 +60,17 @@ public:
/// The idea is that if the caller has some semantics on top of non-hardware keeper errors,
/// then it can provide feedback to retries controller via user errors
///
/// It is possible to use it multiple times (it will share nº of errors over the total amount of calls)
/// Each retryLoop is independent and it will execute f at least once
void retryLoop(auto && f, auto && iteration_cleanup)
{
while (canTry())
current_iteration = 0;
current_backoff_ms = retries_info.initial_backoff_ms;
while (current_iteration == 0 || canTry())
{
/// reset the flag, it will be set to false in case of error
iteration_succeeded = true;
try
{
f();
@ -79,6 +90,7 @@ public:
iteration_cleanup();
throw;
}
current_iteration++;
}
}
@ -102,13 +114,11 @@ public:
void setUserError(std::exception_ptr exception, int code, std::string message)
{
if (retries_info.logger)
LOG_TRACE(
retries_info.logger, "ZooKeeperRetriesControl: {}/{}: setUserError: error={} message={}", retries_info.name, name, code, message);
if (logger)
LOG_TRACE(logger, "ZooKeeperRetriesControl: {}: setUserError: error={} message={}", name, code, message);
/// if current iteration is already failed, keep initial error
if (!iteration_succeeded)
return;
if (iteration_succeeded)
total_failures++;
iteration_succeeded = false;
user_error.code = code;
@ -136,13 +146,11 @@ public:
void setKeeperError(std::exception_ptr exception, Coordination::Error code, std::string message)
{
if (retries_info.logger)
LOG_TRACE(
retries_info.logger, "ZooKeeperRetriesControl: {}/{}: setKeeperError: error={} message={}", retries_info.name, name, code, message);
if (logger)
LOG_TRACE(logger, "ZooKeeperRetriesControl: {}: setKeeperError: error={} message={}", name, code, message);
/// if current iteration is already failed, keep initial error
if (!iteration_succeeded)
return;
if (iteration_succeeded)
total_failures++;
iteration_succeeded = false;
keeper_error.code = code;
@ -170,17 +178,19 @@ public:
void stopRetries() { stop_retries = true; }
void requestUnconditionalRetry() { unconditional_retry = true; }
bool isLastRetry() const { return total_failures >= retries_info.max_retries; }
bool isLastRetry() const { return retries_info.retry_count >= retries_info.max_retries; }
bool isRetry() const { return current_iteration > 1; }
bool isRetry() const { return retries_info.retry_count > 0; }
Coordination::Error getLastKeeperErrorCode() const { return keeper_error.code; }
const std::string & getLastKeeperErrorMessage() const { return keeper_error.message; }
/// action will be called only once and only after latest failed retry
void actionAfterLastFailedRetry(std::function<void()> f) { action_after_last_failed_retry = std::move(f); }
const std::string & getName() const { return name; }
Poco::Logger * getLogger() const { return logger; }
private:
struct KeeperError
{
@ -199,59 +209,42 @@ private:
bool canTry()
{
++iteration_count;
/// first iteration is ordinary execution, no further checks needed
if (0 == iteration_count)
return true;
if (process_list_element && !process_list_element->checkTimeLimitSoft())
return false;
if (unconditional_retry)
{
unconditional_retry = false;
return true;
}
/// iteration succeeded -> no need to retry
if (iteration_succeeded)
{
/// avoid unnecessary logs, - print something only in case of retries
if (retries_info.logger && iteration_count > 1)
if (logger && total_failures > 0)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: succeeded after: iterations={} total_retries={}",
retries_info.name,
logger,
"ZooKeeperRetriesControl: {}: succeeded after: Iterations={} Total keeper failures={}/{}",
name,
iteration_count,
retries_info.retry_count);
current_iteration,
total_failures,
retries_info.max_retries);
return false;
}
if (stop_retries)
{
logLastError("stop retries on request");
action_after_last_failed_retry();
logLastError("stop retries on request");
throwIfError();
return false;
}
if (retries_info.retry_count >= retries_info.max_retries)
if (total_failures > retries_info.max_retries)
{
logLastError("retry limit is reached");
action_after_last_failed_retry();
logLastError("retry limit is reached");
throwIfError();
return false;
}
if (process_list_element && !process_list_element->checkTimeLimitSoft())
return false;
/// retries
++retries_info.retry_count;
logLastError("will retry due to error");
sleepForMilliseconds(retries_info.curr_backoff_ms);
retries_info.curr_backoff_ms = std::min(retries_info.curr_backoff_ms * 2, retries_info.max_backoff_ms);
/// reset the flag, it will be set to false in case of error
iteration_succeeded = true;
sleepForMilliseconds(current_backoff_ms);
current_backoff_ms = std::min(current_backoff_ms * 2, retries_info.max_backoff_ms);
return true;
}
@ -265,49 +258,52 @@ private:
std::rethrow_exception(keeper_error.exception);
}
void logLastError(std::string_view header)
void logLastError(const std::string_view & header)
{
if (!logger)
return;
if (user_error.code == ErrorCodes::OK)
{
if (retries_info.logger)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: {}: retry_count={} timeout={}ms error={} message={}",
retries_info.name,
name,
header,
retries_info.retry_count,
retries_info.curr_backoff_ms,
keeper_error.code,
keeper_error.message);
LOG_DEBUG(
logger,
"ZooKeeperRetriesControl: {}: {}: retry_count={}/{} timeout={}ms error={} message={}",
name,
header,
current_iteration,
retries_info.max_retries,
current_backoff_ms,
keeper_error.code,
keeper_error.message);
}
else
{
if (retries_info.logger)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: {}: retry_count={} timeout={}ms error={} message={}",
retries_info.name,
name,
header,
retries_info.retry_count,
retries_info.curr_backoff_ms,
user_error.code,
user_error.message);
LOG_DEBUG(
logger,
"ZooKeeperRetriesControl: {}: {}: retry_count={}/{} timeout={}ms error={} message={}",
name,
header,
current_iteration,
retries_info.max_retries,
current_backoff_ms,
user_error.code,
user_error.message);
}
}
std::string name;
ZooKeeperRetriesInfo & retries_info;
Int64 iteration_count = -1;
Poco::Logger * logger = nullptr;
ZooKeeperRetriesInfo retries_info;
UInt64 total_failures = 0;
UserError user_error;
KeeperError keeper_error;
std::function<void()> action_after_last_failed_retry = []() {};
bool unconditional_retry = false;
bool iteration_succeeded = true;
bool stop_retries = false;
QueryStatusPtr process_list_element;
UInt64 current_iteration = 0;
UInt64 current_backoff_ms = 0;
};
}
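
For reference, a condensed usage sketch of the reworked interface declared above: the retry limits now live in a plain value struct, and the logger is passed to the control itself. Everything except the hypothetical tryDoKeeperWork() helper and the literal limits is taken from the header and call sites in this commit.

    /// Limits only; no logger or mutable state in the info struct anymore.
    ZooKeeperRetriesInfo info{/*max_retries*/ 20, /*initial_backoff_ms*/ 100, /*max_backoff_ms*/ 10000};
    ZooKeeperRetriesControl ctl("example_op", logger, info, /*process_list_element*/ nullptr);

    ctl.retryLoop([&]
    {
        /// Each retryLoop() call is independent and runs the callback at least once.
        /// Keeper exceptions are retried with exponential backoff up to max_retries;
        /// user errors can be reported explicitly and the first error of an iteration is kept.
        if (!tryDoKeeperWork())  /// hypothetical helper
            ctl.setUserError(ErrorCodes::UNKNOWN_STATUS_OF_INSERT, "could not finish the operation");
    });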

View File

@ -10258,7 +10258,7 @@ void StorageReplicatedMergeTree::backupData(
bool exists = false;
Strings mutation_ids;
{
ZooKeeperRetriesControl retries_ctl("getMutations", zookeeper_retries_info, nullptr);
ZooKeeperRetriesControl retries_ctl("getMutations", log, zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())
@ -10277,7 +10277,7 @@ void StorageReplicatedMergeTree::backupData(
bool mutation_id_exists = false;
String mutation;
ZooKeeperRetriesControl retries_ctl("getMutation", zookeeper_retries_info, nullptr);
ZooKeeperRetriesControl retries_ctl("getMutation", log, zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <filesystem>
#include <unordered_map>
#include <memory>
#include <base/scope_guard.h>
@ -24,9 +25,14 @@
#include <Common/HashTable/Hash.h>
#include <Common/logger_useful.h>
#include <Common/Stopwatch.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Interpreters/Context.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <base/getThreadId.h>
@ -162,7 +168,7 @@ bool wait(int timeout_ms)
}
using ThreadIdToName = std::unordered_map<UInt64, String, DefaultHash<UInt64>>;
ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const PaddedPODArray<UInt64> & thread_ids, Poco::Logger * log)
ThreadIdToName getFilteredThreadNames(const ActionsDAG::Node * predicate, ContextPtr context, const PaddedPODArray<UInt64> & thread_ids, Poco::Logger * log)
{
ThreadIdToName tid_to_name;
MutableColumnPtr all_thread_names = ColumnString::create();
@ -193,7 +199,7 @@ ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const Pa
LOG_TRACE(log, "Read {} thread names for {} threads, took {} ms", tid_to_name.size(), thread_ids.size(), watch.elapsedMilliseconds());
Block block { ColumnWithTypeAndName(std::move(all_thread_names), std::make_shared<DataTypeString>(), "thread_name") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
ColumnPtr thread_names = std::move(block.getByPosition(0).column);
std::unordered_set<String> filtered_thread_names;
@ -218,14 +224,16 @@ ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const Pa
/// We must wait for every thread one by one sequentially,
/// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
/// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals).
class StorageSystemStackTraceSource : public ISource
class StackTraceSource : public ISource
{
public:
StorageSystemStackTraceSource(const Names & column_names, Block header_, const ASTPtr query_, ContextPtr context_, UInt64 max_block_size_, Poco::Logger * log_)
StackTraceSource(const Names & column_names, Block header_, ASTPtr && query_, ActionsDAGPtr && filter_dag_, ContextPtr context_, UInt64 max_block_size_, Poco::Logger * log_)
: ISource(header_)
, context(context_)
, header(std::move(header_))
, query(query_)
, query(std::move(query_))
, filter_dag(std::move(filter_dag_))
, predicate(filter_dag ? filter_dag->getOutputs().at(0) : nullptr)
, max_block_size(max_block_size_)
, pipe_read_timeout_ms(static_cast<int>(context->getSettingsRef().storage_system_stack_trace_pipe_read_timeout_ms.totalMilliseconds()))
, log(log_)
@ -259,7 +267,7 @@ protected:
ThreadIdToName thread_names;
if (read_thread_names)
thread_names = getFilteredThreadNames(query, context, thread_ids_data, log);
thread_names = getFilteredThreadNames(predicate, context, thread_ids_data, log);
for (UInt64 tid : thread_ids_data)
{
@ -343,6 +351,8 @@ private:
ContextPtr context;
Block header;
const ASTPtr query;
const ActionsDAGPtr filter_dag;
const ActionsDAG::Node * predicate;
const size_t max_block_size;
const int pipe_read_timeout_ms;
@ -372,11 +382,55 @@ private:
}
Block block { ColumnWithTypeAndName(std::move(all_thread_ids), std::make_shared<DataTypeUInt64>(), "thread_id") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
return block.getByPosition(0).column;
}
};
class ReadFromSystemStackTrace : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromSystemStackTrace"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
Pipe pipe(std::make_shared<StackTraceSource>(
column_names,
getOutputStream().header,
std::move(query),
std::move(filter_actions_dag),
context,
max_block_size,
log));
pipeline.init(std::move(pipe));
}
ReadFromSystemStackTrace(
const Names & column_names_,
Block sample_block,
ASTPtr && query_,
ContextPtr context_,
size_t max_block_size_,
Poco::Logger * log_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, column_names(column_names_)
, query(query_)
, context(std::move(context_))
, max_block_size(max_block_size_)
, log(log_)
{
}
private:
Names column_names;
ASTPtr query;
ContextPtr context;
size_t max_block_size;
Poco::Logger * log;
};
}
@ -412,23 +466,27 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
}
Pipe StorageSystemStackTrace::read(
void StorageSystemStackTrace::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const size_t /*num_streams*/)
size_t max_block_size,
size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
return Pipe(std::make_shared<StorageSystemStackTraceSource>(
Block sample_block = storage_snapshot->metadata->getSampleBlock();
auto reading = std::make_unique<ReadFromSystemStackTrace>(
column_names,
storage_snapshot->metadata->getSampleBlock(),
sample_block,
query_info.query->clone(),
context,
max_block_size,
log));
log);
query_plan.addStep(std::move(reading));
}
}
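
The predicate handling changes the same way throughout this file: instead of re-filtering with VirtualColumnUtils::filterBlockWithQuery on the AST, the source keeps the output node of the pushed-down ActionsDAG and filters candidate blocks with it. A minimal sketch of that filtering step, assuming a predicate node obtained as in the StackTraceSource constructor above:

    /// Build a single-column block of candidate thread ids and let the pushed-down
    /// predicate (filter_dag->getOutputs().at(0)) narrow it down.
    Block block{ColumnWithTypeAndName(std::move(all_thread_ids), std::make_shared<DataTypeUInt64>(), "thread_id")};
    VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
    ColumnPtr filtered_thread_ids = block.getByPosition(0).column;  /// only rows matching the WHERE clause remain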

View File

@ -25,14 +25,15 @@ public:
String getName() const override { return "SystemStackTrace"; }
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
size_t num_streams) override;
size_t /*num_streams*/) override;
bool isSystemStorage() const override { return true; }

View File

@ -135,7 +135,7 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
"--skip-jobs",
action="store_true",
default=False,
help="skip fetching data about job runs, used in --configure action (for debugging)",
help="skip fetching data about job runs, used in --configure action (for debugging and nigthly ci)",
)
parser.add_argument(
"--rebuild-all-docker",
@ -377,6 +377,7 @@ def _configure_jobs(
for batch in range(num_batches): # type: ignore
batches_to_do.append(batch)
elif job_config.run_always:
# always add to todo
batches_to_do.append(batch)
else:
# this job controlled by digest, add to todo if it's not successfully done before
@ -396,6 +397,21 @@ def _configure_jobs(
else:
jobs_to_skip += (job,)
if pr_labels:
jobs_requested_by_label = [] # type: List[str]
ci_controlling_labels = [] # type: List[str]
for label in pr_labels:
label_config = CI_CONFIG.get_label_config(label)
if label_config:
jobs_requested_by_label += label_config.run_jobs
ci_controlling_labels += [label]
if ci_controlling_labels:
print(f"NOTE: CI controlling labels are set: [{ci_controlling_labels}]")
print(
f" : following jobs will be executed: [{jobs_requested_by_label}]"
)
jobs_to_do = jobs_requested_by_label
if commit_tokens:
requested_jobs = [
token[len("#job_") :]
@ -415,7 +431,7 @@ def _configure_jobs(
if parent in jobs_to_do and parent not in jobs_to_do_requested:
jobs_to_do_requested.append(parent)
print(
f"NOTE: Only specific job(s) were requested: [{jobs_to_do_requested}]"
f"NOTE: Only specific job(s) were requested by commit message tokens: [{jobs_to_do_requested}]"
)
jobs_to_do = jobs_to_do_requested
@ -607,8 +623,10 @@ def main() -> int:
result["jobs_data"] = jobs_data
result["docker_data"] = docker_data
if pr_info.number != 0 and not args.docker_digest_or_latest:
# FIXME: it runs style check before docker build if possible (style-check image is not changed)
# find a way to do style check always before docker build and others
_check_and_update_for_early_style_check(result)
if pr_info.number != 0 and pr_info.has_changes_in_documentation_only():
if pr_info.has_changes_in_documentation_only():
_update_config_for_docs_only(result)
elif args.update_gh_statuses:

View File

@ -1,12 +1,18 @@
#!/usr/bin/env python3
from enum import Enum
import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Literal, Optional, Union
from integration_test_images import IMAGES
class Labels(Enum):
DO_NOT_TEST_LABEL = "do not test"
@dataclass
class DigestConfig:
@ -22,6 +28,15 @@ class DigestConfig:
git_submodules: bool = False
@dataclass
class LabelConfig:
"""
class to configure different CI scenarios per GH label
"""
run_jobs: Iterable[str] = frozenset()
@dataclass
class JobConfig:
"""
@ -95,7 +110,7 @@ class TestConfig:
BuildConfigs = Dict[str, BuildConfig]
BuildsReportConfig = Dict[str, BuildReportConfig]
TestConfigs = Dict[str, TestConfig]
LabelConfigs = Dict[str, LabelConfig]
# common digests configs
compatibility_check_digest = DigestConfig(
@ -131,20 +146,7 @@ upgrade_check_digest = DigestConfig(
integration_check_digest = DigestConfig(
include_paths=["./tests/ci/integration_test_check.py", "./tests/integration"],
exclude_files=[".md"],
docker=[
"clickhouse/dotnet-client",
"clickhouse/integration-helper",
"clickhouse/integration-test",
"clickhouse/integration-tests-runner",
"clickhouse/kerberized-hadoop",
"clickhouse/kerberos-kdc",
"clickhouse/mysql-golang-client",
"clickhouse/mysql-java-client",
"clickhouse/mysql-js-client",
"clickhouse/mysql-php-client",
"clickhouse/nginx-dav",
"clickhouse/postgresql-java-client",
],
docker=IMAGES.copy(),
)
ast_fuzzer_check_digest = DigestConfig(
@ -188,20 +190,9 @@ bugfix_validate_check = DigestConfig(
"./tests/ci/bugfix_validate_check.py",
],
exclude_files=[".md"],
docker=[
docker=IMAGES.copy()
+ [
"clickhouse/stateless-test",
"clickhouse/dotnet-client",
"clickhouse/integration-helper",
"clickhouse/integration-test",
"clickhouse/integration-tests-runner",
"clickhouse/kerberized-hadoop",
"clickhouse/kerberos-kdc",
"clickhouse/mysql-golang-client",
"clickhouse/mysql-java-client",
"clickhouse/mysql-js-client",
"clickhouse/mysql-php-client",
"clickhouse/nginx-dav",
"clickhouse/postgresql-java-client",
],
)
# common test params
@ -268,6 +259,13 @@ class CiConfig:
builds_report_config: BuildsReportConfig
test_configs: TestConfigs
other_jobs_configs: TestConfigs
label_configs: LabelConfigs
def get_label_config(self, label_name: str) -> Optional[LabelConfig]:
for label, config in self.label_configs.items():
if label_name == label:
return config
return None
def get_job_config(self, check_name: str) -> JobConfig:
res = None
@ -417,6 +415,9 @@ class CiConfig:
CI_CONFIG = CiConfig(
label_configs={
Labels.DO_NOT_TEST_LABEL.value: LabelConfig(run_jobs=["Style check"]),
},
build_config={
"package_release": BuildConfig(
name="package_release",

View File

@ -10,13 +10,8 @@ import sys
from pathlib import Path
from typing import Dict, List, Tuple
from github import Github
from build_download_helper import download_all_deb_packages
from clickhouse_helper import (
ClickHouseHelper,
prepare_tests_results_for_clickhouse,
)
from clickhouse_helper import ClickHouseHelper, prepare_tests_results_for_clickhouse
from commit_status_helper import (
RerunHelper,
get_commit,
@ -24,10 +19,12 @@ from commit_status_helper import (
post_commit_status,
post_commit_status_to_file,
)
from docker_images_helper import DockerImage, pull_image, get_docker_image
from docker_images_helper import DockerImage, get_docker_image, pull_image
from download_release_packages import download_last_release
from env_helper import REPORT_PATH, TEMP_PATH, REPO_COPY
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from get_robot_token import get_best_robot_token
from github_helper import GitHub
from integration_test_images import IMAGES
from pr_info import PRInfo
from report import ERROR, TestResult, TestResults, read_test_results
from s3_helper import S3Helper
@ -36,24 +33,6 @@ from tee_popen import TeePopen
from upload_result_helper import upload_results
# When update, update
# tests/integration/ci-runner.py:ClickhouseIntegrationTestsRunner.get_images_names too
IMAGES = [
"clickhouse/dotnet-client",
"clickhouse/integration-helper",
"clickhouse/integration-test",
"clickhouse/integration-tests-runner",
"clickhouse/kerberized-hadoop",
"clickhouse/kerberos-kdc",
"clickhouse/mysql-golang-client",
"clickhouse/mysql-java-client",
"clickhouse/mysql-js-client",
"clickhouse/mysql-php-client",
"clickhouse/nginx-dav",
"clickhouse/postgresql-java-client",
]
def get_json_params_dict(
check_name: str,
pr_info: PRInfo,
@ -210,7 +189,7 @@ def main():
logging.info("Skipping '%s' (no pr-bugfix in '%s')", check_name, pr_info.labels)
sys.exit(0)
gh = Github(get_best_robot_token(), per_page=100)
gh = GitHub(get_best_robot_token())
commit = get_commit(gh, pr_info.sha)
rerun_helper = RerunHelper(commit, check_name_with_group)

View File

@ -0,0 +1,31 @@
#!/usr/bin/env python3
IMAGES_ENV = {
"clickhouse/dotnet-client": "DOCKER_DOTNET_CLIENT_TAG",
"clickhouse/integration-helper": "DOCKER_HELPER_TAG",
"clickhouse/integration-test": "DOCKER_BASE_TAG",
"clickhouse/integration-tests-runner": "",
"clickhouse/kerberized-hadoop": "DOCKER_KERBERIZED_HADOOP_TAG",
"clickhouse/kerberos-kdc": "DOCKER_KERBEROS_KDC_TAG",
"clickhouse/mysql-golang-client": "DOCKER_MYSQL_GOLANG_CLIENT_TAG",
"clickhouse/mysql-java-client": "DOCKER_MYSQL_JAVA_CLIENT_TAG",
"clickhouse/mysql-js-client": "DOCKER_MYSQL_JS_CLIENT_TAG",
"clickhouse/mysql-php-client": "DOCKER_MYSQL_PHP_CLIENT_TAG",
"clickhouse/nginx-dav": "DOCKER_NGINX_DAV_TAG",
"clickhouse/postgresql-java-client": "DOCKER_POSTGRESQL_JAVA_CLIENT_TAG",
"clickhouse/python-bottle": "DOCKER_PYTHON_BOTTLE_TAG",
}
IMAGES = list(IMAGES_ENV.keys())
def get_image_env(image: str) -> str:
return IMAGES_ENV.get(image, "")
def get_docker_env(image: str, tag: str) -> str:
"if image belongs to IMAGES_ENV, return `-e` argument for docker command"
env = get_image_env(image)
if not env:
return env
return f"-e {env}={tag} "

View File

@ -32,7 +32,6 @@ TRUSTED_ORG_IDS = {
OK_SKIP_LABELS = {"release", "pr-backport", "pr-cherrypick"}
CAN_BE_TESTED_LABEL = "can be tested"
DO_NOT_TEST_LABEL = "do not test"
FEATURE_LABEL = "pr-feature"
SUBMODULE_CHANGED_LABEL = "submodule changed"
PR_CHECK = "PR Check"
@ -68,10 +67,6 @@ def should_run_ci_for_pr(pr_info: PRInfo) -> Tuple[bool, str]:
print(f"Label '{FORCE_TESTS_LABEL}' set, forcing remaining checks")
return True, f"Labeled '{FORCE_TESTS_LABEL}'"
if DO_NOT_TEST_LABEL in pr_info.labels:
print(f"Label '{DO_NOT_TEST_LABEL}' set, skipping remaining checks")
return False, f"Labeled '{DO_NOT_TEST_LABEL}'"
if OK_SKIP_LABELS.intersection(pr_info.labels):
return True, "Don't try new checks for release/backports/cherry-picks"

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3
from collections import defaultdict
import csv
import glob
import json
@ -8,13 +7,15 @@ import logging
import os
import random
import re
import shlex
import shutil
import string
import subprocess
import time
import shlex
import zlib # for crc32
from collections import defaultdict
from integration_test_images import IMAGES
MAX_RETRY = 1
NUM_WORKERS = 5
@ -301,23 +302,6 @@ class ClickhouseIntegrationTestsRunner:
def shuffle_test_groups(self):
return self.shuffle_groups != 0
@staticmethod
def get_images_names():
return [
"clickhouse/dotnet-client",
"clickhouse/integration-helper",
"clickhouse/integration-test",
"clickhouse/integration-tests-runner",
"clickhouse/kerberized-hadoop",
"clickhouse/kerberos-kdc",
"clickhouse/mysql-golang-client",
"clickhouse/mysql-java-client",
"clickhouse/mysql-js-client",
"clickhouse/mysql-php-client",
"clickhouse/nginx-dav",
"clickhouse/postgresql-java-client",
]
def _pre_pull_images(self, repo_path):
image_cmd = self._get_runner_image_cmd(repo_path)
@ -523,7 +507,7 @@ class ClickhouseIntegrationTestsRunner:
os.path.join(repo_path, "tests/integration", "runner"),
"--docker-image-version",
):
for img in self.get_images_names():
for img in IMAGES:
if img == "clickhouse/integration-tests-runner":
runner_version = self.get_image_version(img)
logging.info(

View File

@ -0,0 +1 @@
../ci/integration_test_images.py

View File

@ -1,17 +1,17 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import os
import getpass
import glob
import argparse
import glob
import logging
import signal
import subprocess
import sys
import string
import os
import random
import shlex
import signal
import string
import subprocess
import sys
from integration_test_images import get_docker_env
def random_str(length=6):
@ -335,30 +335,11 @@ if __name__ == "__main__":
if args.docker_compose_images_tags is not None:
for img_tag in args.docker_compose_images_tags:
[image, tag] = img_tag.split(":")
if image == "clickhouse/dotnet-client":
env_tags += "-e {}={} ".format("DOCKER_DOTNET_CLIENT_TAG", tag)
elif image == "clickhouse/integration-helper":
env_tags += "-e {}={} ".format("DOCKER_HELPER_TAG", tag)
elif image == "clickhouse/integration-test":
env_tags += "-e {}={} ".format("DOCKER_BASE_TAG", tag)
elif image == "clickhouse/kerberized-hadoop":
env_tags += "-e {}={} ".format("DOCKER_KERBERIZED_HADOOP_TAG", tag)
elif image == "clickhouse/kerberos-kdc":
env_tags += "-e {}={} ".format("DOCKER_KERBEROS_KDC_TAG", tag)
elif image == "clickhouse/mysql-golang-client":
env_tags += "-e {}={} ".format("DOCKER_MYSQL_GOLANG_CLIENT_TAG", tag)
elif image == "clickhouse/mysql-java-client":
env_tags += "-e {}={} ".format("DOCKER_MYSQL_JAVA_CLIENT_TAG", tag)
elif image == "clickhouse/mysql-js-client":
env_tags += "-e {}={} ".format("DOCKER_MYSQL_JS_CLIENT_TAG", tag)
elif image == "clickhouse/mysql-php-client":
env_tags += "-e {}={} ".format("DOCKER_MYSQL_PHP_CLIENT_TAG", tag)
elif image == "clickhouse/nginx-dav":
env_tags += "-e {}={} ".format("DOCKER_NGINX_DAV_TAG", tag)
elif image == "clickhouse/postgresql-java-client":
env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
env_tag = get_docker_env(image, tag)
if env_tag:
env_tags += env_tag
else:
logging.info("Unknown image %s" % (image))
logging.info("Unknown image %s", image)
# create named volume which will be used inside to store images and other docker related files,
# to avoid redownloading it every time

View File

@ -115,9 +115,8 @@ def test_parallel_quorum_actually_quorum(started_cluster):
error = node.query_and_get_error(
"INSERT INTO q VALUES(3, 'Hi')", settings=settings
)
assert "DB::Exception: Unknown status, client must retry." in error, error
assert (
"DB::Exception: Timeout while waiting for quorum. (TIMEOUT_EXCEEDED)"
"DB::Exception: Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: Timeout while waiting for quorum"
in error
), error

View File

@ -1,15 +1,15 @@
runtime messages 0.001
runtime exceptions 0.05
unknown runtime exceptions 0.01
messages shorter than 10 1
messages shorter than 16 3
exceptions shorter than 30 3 []
noisy messages 0.3
noisy Trace messages 0.16
noisy Debug messages 0.09
noisy Info messages 0.05
noisy Warning messages 0.01
noisy Error messages 0.02
runtime messages 0.001 []
runtime exceptions 0.05 []
unknown runtime exceptions 0.01 []
messages shorter than 10 1 []
messages shorter than 16 1 []
exceptions shorter than 30 1 []
noisy messages 0.3
noisy Trace messages 0.16
noisy Debug messages 0.09
noisy Info messages 0.05
noisy Warning messages 0.01
noisy Error messages 0.03
no Fatal messages 0
number of too noisy messages 3
number of noisy messages 10

View File

@ -9,17 +9,61 @@ create view logs as select * from system.text_log where now() - toIntervalMinute
-- Check that we don't have too many messages formatted with fmt::runtime or strings concatenation.
-- 0.001 threshold should be always enough, the value was about 0.00025
select 'runtime messages', greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.001) from logs
where message not like '% Received from %clickhouse-staging.com:9440%' and source_file not like '%/AWSLogger.cpp%';
WITH 0.001 AS threshold
SELECT
'runtime messages',
greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0) as v, threshold),
v <= threshold ? [] :
(SELECT groupArray((message, c)) FROM (
SELECT message, count() as c FROM logs
WHERE
length(message_format_string) = 0
AND message not like '% Received from %clickhouse-staging.com:9440%'
AND source_file not like '%/AWSLogger.cpp%'
GROUP BY message ORDER BY c LIMIT 10
))
FROM logs
WHERE
message NOT LIKE '% Received from %clickhouse-staging.com:9440%'
AND source_file not like '%/AWSLogger.cpp%';
-- Check the same for exceptions. The value was 0.03
select 'runtime exceptions', greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.05) from logs
where (message like '%DB::Exception%' or message like '%Coordination::Exception%')
and message not like '% Received from %clickhouse-staging.com:9440%';
WITH 0.05 AS threshold
SELECT
'runtime exceptions',
greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0) as v, threshold),
v <= threshold ? [] :
(SELECT groupArray((message, c)) FROM (
SELECT message, count() as c FROM logs
WHERE
length(message_format_string) = 0
AND (message like '%DB::Exception%' or message like '%Coordination::Exception%')
AND message not like '% Received from %clickhouse-staging.com:9440%'
GROUP BY message ORDER BY c LIMIT 10
))
FROM logs
WHERE
message NOT LIKE '% Received from %clickhouse-staging.com:9440%'
AND (message like '%DB::Exception%' or message like '%Coordination::Exception%');
WITH 0.01 AS threshold
SELECT
'unknown runtime exceptions',
greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0) as v, threshold),
v <= threshold ? [] :
(SELECT groupArray((message, c)) FROM (
SELECT message, count() as c FROM logs
WHERE
length(message_format_string) = 0
AND (message like '%DB::Exception%' or message like '%Coordination::Exception%')
AND message not like '% Received from %' and message not like '%(SYNTAX_ERROR)%'
GROUP BY message ORDER BY c LIMIT 10
))
FROM logs
WHERE
(message like '%DB::Exception%' or message like '%Coordination::Exception%')
AND message not like '% Received from %' and message not like '%(SYNTAX_ERROR)%';
select 'unknown runtime exceptions', greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.01) from logs where
(message like '%DB::Exception%' or message like '%Coordination::Exception%')
and message not like '% Received from %' and message not like '%(SYNTAX_ERROR)%';
-- FIXME some of the following messages are not informative and it has to be fixed
create temporary table known_short_messages (s String) as select * from (select
@ -51,15 +95,20 @@ create temporary table known_short_messages (s String) as select * from (select
] as arr) array join arr;
-- Check that we don't have too many short meaningless message patterns.
WITH 1 AS max_messages
select 'messages shorter than 10',
greatest(uniqExact(message_format_string), 1)
(uniqExact(message_format_string) as c) <= max_messages,
c <= max_messages ? [] : groupUniqArray(message_format_string)
from logs
where length(message_format_string) < 10 and message_format_string not in known_short_messages;
-- Same as above. Feel free to update the threshold or remove this query if really necessary
WITH 3 AS max_messages
select 'messages shorter than 16',
greatest(uniqExact(message_format_string), 3)
from logs where length(message_format_string) < 16 and message_format_string not in known_short_messages;
(uniqExact(message_format_string) as c) <= max_messages,
c <= max_messages ? [] : groupUniqArray(message_format_string)
from logs
where length(message_format_string) < 16 and message_format_string not in known_short_messages;
-- Unlike above, here we look at length of the formatted message, not format string. Most short format strings are fine because they end up decorated with context from outer or inner exceptions, e.g.:
-- "Expected end of line" -> "Code: 117. DB::Exception: Expected end of line: (in file/uri /var/lib/clickhouse/user_files/data_02118): (at row 1)"
@ -68,40 +117,53 @@ select 'messages shorter than 16',
-- This table currently doesn't have enough information to do this reliably, so we just regex search for " (ERROR_NAME_IN_CAPS)" and hope that's good enough.
-- For the "Code: 123. DB::Exception: " part, we just subtract 26 instead of searching for it. Because sometimes it's not at the start, e.g.:
-- "Unexpected error, will try to restart main thread: Code: 341. DB::Exception: Unexpected error: Code: 57. DB::Exception:[...]"
WITH 3 AS max_messages
select 'exceptions shorter than 30',
greatest(uniqExact(message_format_string), 3) AS c,
c = 3 ? [] : groupUniqArray(message_format_string)
(uniqExact(message_format_string) as c) <= max_messages,
c <= max_messages ? [] : groupUniqArray(message_format_string)
from logs
where message ilike '%DB::Exception%' and if(length(extract(message, '(.*)\\([A-Z0-9_]+\\)')) as pref > 0, pref, length(message)) < 30 + 26 and message_format_string not in known_short_messages;
-- Avoid too noisy messages: top 1 message frequency must be less than 30%. We should reduce the threshold
select 'noisy messages',
greatest((select count() from logs group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.30);
WITH 0.30 as threshold
select
'noisy messages',
greatest(coalesce(((select message_format_string, count() from logs group by message_format_string order by count() desc limit 1) as top_message).2, 0) / (select count() from logs), threshold) as r,
r <= threshold ? '' : top_message.1;
-- Same as above, but excluding Test level (actually finds top 1 Trace message)
with ('Access granted: {}{}', '{} -> {}') as frequent_in_tests
select 'noisy Trace messages',
greatest((select count() from logs where level!='Test' and message_format_string not in frequent_in_tests
group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.16);
with 0.16 as threshold
select
'noisy Trace messages',
greatest(coalesce(((select message_format_string, count() from logs where level = 'Trace' and message_format_string not in ('Access granted: {}{}', '{} -> {}')
group by message_format_string order by count() desc limit 1) as top_message).2, 0) / (select count() from logs), threshold) as r,
r <= threshold ? '' : top_message.1;
-- Same as above for Debug
WITH 0.09 as threshold
select 'noisy Debug messages',
greatest((select count() from logs where level <= 'Debug' group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.09);
greatest(coalesce(((select message_format_string, count() from logs where level = 'Debug' group by message_format_string order by count() desc limit 1) as top_message).2, 0) / (select count() from logs), threshold) as r,
r <= threshold ? '' : top_message.1;
-- Same as above for Info
WITH 0.05 as threshold
select 'noisy Info messages',
greatest((select count() from logs where level <= 'Information' group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.05);
greatest(coalesce(((select message_format_string, count() from logs where level = 'Information' group by message_format_string order by count() desc limit 1) as top_message).2, 0) / (select count() from logs), threshold) as r,
r <= threshold ? '' : top_message.1;
-- Same as above for Warning
with ('Not enabled four letter command {}') as frequent_in_tests
select 'noisy Warning messages',
greatest(coalesce((select count() from logs where level = 'Warning' and message_format_string not in frequent_in_tests
group by message_format_string order by count() desc limit 1), 0) / (select count() from logs), 0.01);
with 0.01 as threshold
select
'noisy Warning messages',
greatest(coalesce(((select message_format_string, count() from logs where level = 'Warning' and message_format_string not in ('Not enabled four letter command {}')
group by message_format_string order by count() desc limit 1) as top_message).2, 0) / (select count() from logs), threshold) as r,
r <= threshold ? '' : top_message.1;
-- Same as above for Error
WITH 0.03 as threshold
select 'noisy Error messages',
greatest(coalesce((select count() from logs where level = 'Error' group by message_format_string order by count() desc limit 1), 0) / (select count() from logs), 0.02);
greatest(coalesce(((select message_format_string, count() from logs where level = 'Error' group by message_format_string order by count() desc limit 1) as top_message).2, 0) / (select count() from logs), threshold) as r,
r <= threshold ? '' : top_message.1;
select 'no Fatal messages', count() from logs where level = 'Fatal';

View File

@ -20,7 +20,6 @@ SET select_sequential_consistency=1;
SELECT x FROM quorum1 ORDER BY x;
SELECT x FROM quorum2 ORDER BY x;
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum=2, insert_quorum_parallel=0;
INSERT INTO quorum1 VALUES (4, '1990-11-15');

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO quorum1 VALUES (1, '2018-11-15');
INSERT INTO quorum1 VALUES (2, '2018-11-15');

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum_timeout=0;

View File

@ -17,7 +17,6 @@ SYSTEM SYNC REPLICA quorum2;
SET select_sequential_consistency=1;
SET insert_quorum=2, insert_quorum_parallel=0;
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum_timeout=0;

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO quorum1 VALUES (1, '2018-11-15');
INSERT INTO quorum1 VALUES (2, '2018-11-15');

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO quorum1 VALUES (1, '2018-11-15');
INSERT INTO quorum1 VALUES (2, '2018-11-15');

View File

@ -9,7 +9,6 @@ CREATE TABLE mutations_and_quorum2 (`server_date` Date, `something` String) ENGI
-- Should not be larger than 600e6 (default timeout in clickhouse-test)
SET insert_quorum=2, insert_quorum_parallel=0, insert_quorum_timeout=300e3;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO mutations_and_quorum1 VALUES ('2019-01-01', 'test1'), ('2019-02-01', 'test2'), ('2019-03-01', 'test3'), ('2019-04-01', 'test4'), ('2019-05-01', 'test1'), ('2019-06-01', 'test2'), ('2019-07-01', 'test3'), ('2019-08-01', 'test4'), ('2019-09-01', 'test1'), ('2019-10-01', 'test2'), ('2019-11-01', 'test3'), ('2019-12-01', 'test4');

View File

@ -1,6 +1,5 @@
-- Tags: long, replica, no-replicated-database
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync = 2;
@ -10,7 +9,7 @@ DROP TABLE IF EXISTS replica2;
CREATE TABLE replica1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/01451/quorum', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE replica2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/01451/quorum', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;
INSERT INTO replica1 VALUES (0);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (0);
SYSTEM SYNC REPLICA replica2;
@ -27,7 +26,7 @@ ALTER TABLE replica2 DROP PARTITION ID 'all';
SET insert_quorum = 2, insert_quorum_parallel = 0;
INSERT INTO replica2 VALUES (1);
INSERT INTO replica2 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (1);
SYSTEM SYNC REPLICA replica2;
@ -39,7 +38,7 @@ SELECT COUNT() FROM replica1;
SET insert_quorum_parallel=1;
INSERT INTO replica2 VALUES (2);
INSERT INTO replica2 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (2);
-- should work, parallel quorum nodes exists only during insert
ALTER TABLE replica1 DROP PART 'all_3_3_0';

View File

@ -1,7 +1,6 @@
-- Tags: long, replica, no-replicated-database
-- Tag no-replicated-database: Fails due to additional replicas or shards
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync = 2;
DROP TABLE IF EXISTS replica1 SYNC;
@ -10,9 +9,9 @@ DROP TABLE IF EXISTS replica2 SYNC;
CREATE TABLE replica1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/'||currentDatabase()||'test/01451/attach', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE replica2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/'||currentDatabase()||'test/01451/attach', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;
INSERT INTO replica1 VALUES (0);
INSERT INTO replica1 VALUES (1);
INSERT INTO replica1 VALUES (2);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (0);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (1);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (2);
ALTER TABLE replica1 DETACH PART 'all_100_100_0'; -- { serverError 232 }
@ -25,7 +24,7 @@ SELECT v FROM replica1 ORDER BY v;
SELECT name FROM system.detached_parts WHERE table = 'replica2' AND database = currentDatabase();
ALTER TABLE replica2 ATTACH PART 'all_1_1_0';
ALTER TABLE replica2 ATTACH PART 'all_1_1_0' SETTINGS insert_keeper_fault_injection_probability=0;
SYSTEM SYNC REPLICA replica1;
SELECT v FROM replica1 ORDER BY v;

View File

@ -20,10 +20,6 @@ function thread {
for x in {0..99}; do
# sometimes we can try to commit obsolete part if fetches will be quite fast,
# so suppress warning messages like "Tried to commit obsolete part ... covered by ..."
# (2) keeper fault injection for inserts because
# it can be a cause of deduplicated parts be visible to SELECTs for sometime (until cleanup thread remove them),
# so the same SELECT on different replicas can return different results, i.e. test output will be non-deterministic
# (see #9712)
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $x % $NUM_REPLICAS = $1 ? $x - 1 : $x" 2>/dev/null # Replace some records as duplicates so they will be written by other replicas
done
}

View File

@ -24,7 +24,7 @@ function thread {
while true; do
$CLICKHOUSE_CLIENT --query "DETACH TABLE r$1"
$CLICKHOUSE_CLIENT --query "ATTACH TABLE r$1"
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 0 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $x" 2>&1 | grep -qE "$valid_exceptions_to_retry" || break
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 0 --query "INSERT INTO r$1 SELECT $x" 2>&1 | grep -qE "$valid_exceptions_to_retry" || break
done
done
}

View File

@ -20,7 +20,7 @@ done
function thread {
i=0 retries=300
while [[ $i -lt $retries ]]; do # server can be dead
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 1 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $2" && break
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 1 --query "INSERT INTO r$1 SELECT $2" && break
((++i))
sleep 0.1
done

View File

@ -21,7 +21,7 @@ done
$CLICKHOUSE_CLIENT -n -q "SYSTEM STOP REPLICATION QUEUES r2;"
function thread {
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r1 SELECT $1"
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --query "INSERT INTO r1 SELECT $1"
}
for i in $(seq 1 $NUM_INSERTS); do

View File

@ -20,10 +20,9 @@ $CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q2 (x UInt64) ENGINE=ReplicatedMerg
$CLICKHOUSE_CLIENT -q "SYSTEM STOP REPLICATION QUEUES parallel_q2"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO parallel_q1 VALUES (1)"
# disable keeper fault injection during insert since test checks part names. Part names can differ in case of retries during insert
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --insert_keeper_fault_injection_probability=0 --query="INSERT INTO parallel_q1 VALUES (2)" &
# This test depends on part names and those aren't deterministic with faults
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO parallel_q1 VALUES (1)"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --insert_quorum 2 --insert_quorum_parallel 1 --query="INSERT INTO parallel_q1 VALUES (2)" &
part_count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}'")

View File

@ -16,8 +16,6 @@ CREATE TABLE r2 (
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/01509_parallel_quorum_insert_no_replicas', '2')
ORDER BY tuple();
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum_parallel=1;
SET insert_quorum=3;

View File

@ -20,7 +20,6 @@ SYSTEM SYNC REPLICA quorum3;
SET select_sequential_consistency=0;
SET optimize_trivial_count_query=1;
SET insert_quorum=2, insert_quorum_parallel=0;
SET insert_keeper_fault_injection_probability=0;
SYSTEM STOP FETCHES quorum1;

View File

@ -2,8 +2,6 @@
-- Tag no-replicated-database: Fails due to additional replicas or shards
-- Tag no-parallel: static zk path
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
DROP TABLE IF EXISTS execute_on_single_replica_r1 SYNC;
DROP TABLE IF EXISTS execute_on_single_replica_r2 SYNC;
@ -11,7 +9,7 @@ DROP TABLE IF EXISTS execute_on_single_replica_r2 SYNC;
CREATE TABLE execute_on_single_replica_r1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r1') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
CREATE TABLE execute_on_single_replica_r2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r2') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
INSERT INTO execute_on_single_replica_r1 VALUES (1);
INSERT INTO execute_on_single_replica_r1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (1);
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SET optimize_throw_if_noop=1;

View File

@ -4,7 +4,7 @@ DROP TABLE IF EXISTS test_01640;
DROP TABLE IF EXISTS restore_01640;
CREATE TABLE test_01640(i Int64, d Date, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/test_01640','{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/test_01640','{replica}')
PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;
@ -16,13 +16,13 @@ PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;
ALTER TABLE restore_01640 FETCH PARTITION tuple(toYYYYMM(toDate('2021-01-01')))
FROM '/clickhouse/{database}/{shard}/tables/test_01640';
FROM '/clickhouse/{database}/{shard}/tables/test_01640' SETTINGS insert_keeper_fault_injection_probability=0;
SELECT partition_id
FROM system.detached_parts
WHERE (table = 'restore_01640') AND (database = currentDatabase());
ALTER TABLE restore_01640 ATTACH PARTITION tuple(toYYYYMM(toDate('2021-01-01')));
ALTER TABLE restore_01640 ATTACH PARTITION tuple(toYYYYMM(toDate('2021-01-01'))) SETTINGS insert_keeper_fault_injection_probability=0;
SELECT partition_id
FROM system.detached_parts

View File

@ -29,3 +29,23 @@
['1'] [] 0
[] [] 3
---
[] 0 ['2']
['0'] 2 ['0']
['0'] 2 ['0']
['1'] 1 []
[] 3 []
---
[] 0 ['2'] 1
['0'] 2 ['0'] 2
['1'] 1 [] 0
[] 3 [] 3
---
[] ['2'] 1
['0'] ['0'] 2
['0'] ['0'] 2
['1'] [] 0
[] [] 3

View File

@ -70,6 +70,12 @@ ALL LEFT JOIN
) AS js2 USING (a)
ORDER BY b ASC NULLS FIRST;
{% for join_algorithm in ['default', 'partial_merge'] -%}
SET join_algorithm = '{{ join_algorithm }}';
SELECT '---';
SELECT
*
@ -112,3 +118,5 @@ FULL JOIN (
ON l.item_id = r.item_id
ORDER BY 1,2,3
;
{% endfor %}

View File

@ -88,3 +88,4 @@ QUERY id: 0
COLUMN id: 7, column_name: a, result_type: Int32, source_id: 3
CONSTANT id: 8, constant_value: UInt64_2, constant_value_type: UInt8
1
1

View File

@ -25,4 +25,6 @@ EXPLAIN QUERY TREE SELECT * FROM 02668_logical_optimizer WHERE a = 3 AND b = 'an
SELECT * FROM 02668_logical_optimizer WHERE a = 2 AND 2 = a;
EXPLAIN QUERY TREE SELECT * FROM 02668_logical_optimizer WHERE a = 2 AND 2 = a;
SELECT a FROM 02668_logical_optimizer WHERE (b = 'test') AND ('test' = b);
SELECT (k = 3) OR ( (k = 1) OR (k = 2) OR ( (NULL OR 1) = k ) ) FROM ( SELECT materialize(1) AS k );

View File

@ -75,3 +75,5 @@ QUERY id: 0
LIST id: 6, nodes: 2
COLUMN id: 7, column_name: a, result_type: Nullable(Int32), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(UInt64_1, UInt64_3, UInt64_2), constant_value_type: Tuple(UInt8, UInt8, UInt8)
1
1

View File

@ -29,4 +29,7 @@ INSERT INTO 02702_logical_optimizer_with_null_column VALUES (1, 'test'), (2, 'te
SELECT * FROM 02702_logical_optimizer_with_null_column WHERE a = 1 OR 3 = a OR 2 = a;
EXPLAIN QUERY TREE SELECT * FROM 02702_logical_optimizer_with_null_column WHERE a = 1 OR 3 = a OR 2 = a;
SELECT materialize(1) AS k WHERE NULL OR (0 OR (k = 2) OR (k = CAST(1, 'Nullable(UInt8)') OR k = 3));
SELECT (k = 2) OR (k = 1) OR ((NULL OR 1) = k) FROM (SELECT 1 AS k);
DROP TABLE 02702_logical_optimizer_with_null_column;

View File

@ -7,6 +7,7 @@ CREATE TABLE quorum1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{d
CREATE TABLE quorum2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02887/quorum', '2') ORDER BY x;
SET insert_keeper_fault_injection_probability=0;
SET insert_keeper_max_retries = 0;
SET insert_quorum = 2;
system enable failpoint replicated_merge_tree_insert_quorum_fail_0;

View File

@ -29,7 +29,7 @@ $CLICKHOUSE_CLIENT \
--query_id "${query_id}" \
--max_parallel_replicas 3 \
--prefer_localhost_replica 1 \
--cluster_for_parallel_replicas "parallel_replicas" \
--cluster_for_parallel_replicas "test_cluster_one_shard_three_replicas_localhost" \
--allow_experimental_parallel_reading_from_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \
--parallel_replicas_min_number_of_rows_per_replica 0 \
@ -62,7 +62,7 @@ $CLICKHOUSE_CLIENT \
--query_id "${query_id}" \
--max_parallel_replicas 3 \
--prefer_localhost_replica 1 \
--cluster_for_parallel_replicas "parallel_replicas" \
--cluster_for_parallel_replicas "test_cluster_one_shard_three_replicas_localhost" \
--allow_experimental_parallel_reading_from_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \
--parallel_replicas_min_number_of_rows_per_replica 0 \

View File

@ -11,7 +11,7 @@ SET
allow_experimental_parallel_reading_from_replicas=1,
max_parallel_replicas=2,
use_hedged_requests=0,
cluster_for_parallel_replicas='parallel_replicas',
cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost',
parallel_replicas_for_non_replicated_merge_tree=1
;

View File

@ -0,0 +1,74 @@
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
0 9
1 8
2 7
3 6
4 5
5 4
6 3
7 2
8 1
9 0
1 2
0 0
1 1
2 2
3 3
4 4
5 5
6 0
7 1
8 2
9 3
10 4
0 0
1 1
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 4 4 4
5 5 5 5
6 6 6 6
7 7 7 7
8 8 8 8
9 9 9 9
10 10 10 10
11 11 11 11
12 12 12 12
13 13 13 13
14 14 14 14
15 15 15 15
16 16 16 16
17 17 17 17
18 18 18 18
19 19 19 19
20 20 20 20
21 21 21 21
22 22 22 22
23 23 23 23
24 24 24 24
25 25 25 25
26 26 26 26
27 27 27 27
28 28 28 28
29 29 29 29
UInt64
UInt64
UInt64
UInt64
UInt64
UInt64
UInt64
UInt64
UInt64
UInt64

View File

@ -0,0 +1,37 @@
select * from (SELECT number as a FROM numbers(10)) t1 PASTE JOIN (select number as a from numbers(10)) t2;
select * from (SELECT number as a FROM numbers(10)) t1 PASTE JOIN (select number as a from numbers(10) order by a desc) t2;
create table if not exists test (num UInt64) engine=Memory;
insert into test select number from numbers(6);
insert into test select number from numbers(5);
SELECT * FROM (SELECT 1) t1 PASTE JOIN (SELECT 2) SETTINGS joined_subquery_requires_alias=0;
select * from (SELECT number as a FROM numbers(11)) t1 PASTE JOIN test t2 SETTINGS max_threads=1;
select * from (SELECT number as a FROM numbers(11)) t1 PASTE JOIN (select * from test limit 2) t2 SETTINGS max_threads=1;
CREATE TABLE t1 (a UInt64, b UInt64) ENGINE = Memory;
INSERT INTO t1 SELECT number, number FROM numbers(0, 3);
INSERT INTO t1 SELECT number, number FROM numbers(3, 2);
INSERT INTO t1 SELECT number, number FROM numbers(5, 7);
INSERT INTO t1 SELECT number, number FROM numbers(12, 2);
INSERT INTO t1 SELECT number, number FROM numbers(14, 1);
INSERT INTO t1 SELECT number, number FROM numbers(15, 2);
INSERT INTO t1 SELECT number, number FROM numbers(17, 1);
INSERT INTO t1 SELECT number, number FROM numbers(18, 2);
INSERT INTO t1 SELECT number, number FROM numbers(20, 2);
INSERT INTO t1 SELECT number, number FROM numbers(22, 2);
INSERT INTO t1 SELECT number, number FROM numbers(24, 2);
INSERT INTO t1 SELECT number, number FROM numbers(26, 2);
INSERT INTO t1 SELECT number, number FROM numbers(28, 2);
CREATE TABLE t2 (a UInt64, b UInt64) ENGINE = Memory;
INSERT INTO t2 SELECT number, number FROM numbers(0, 2);
INSERT INTO t2 SELECT number, number FROM numbers(2, 3);
INSERT INTO t2 SELECT number, number FROM numbers(5, 5);
INSERT INTO t2 SELECT number, number FROM numbers(10, 5);
INSERT INTO t2 SELECT number, number FROM numbers(15, 15);
SELECT * FROM ( SELECT * from t1 ) t1 PASTE JOIN ( SELECT * from t2 ) t2 SETTINGS max_threads = 1;
SELECT toTypeName(a) FROM (SELECT number as a FROM numbers(11)) t1 PASTE JOIN (select number as a from numbers(10)) t2 SETTINGS join_use_nulls = 1;
SET max_threads = 2;
select * from (SELECT number as a FROM numbers(10)) t1 ANY PASTE JOIN (select number as a from numbers(10)) t2; -- { clientError SYNTAX_ERROR }
select * from (SELECT number as a FROM numbers(10)) t1 ALL PASTE JOIN (select number as a from numbers(10)) t2; -- { clientError SYNTAX_ERROR }
select * from (SELECT number as a FROM numbers_mt(10)) t1 PASTE JOIN (select number as a from numbers(10) ORDER BY a DESC) t2 SETTINGS max_block_size=3; -- { serverError BAD_ARGUMENTS }
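-- (editorial sketch, not part of the test above: PASTE JOIN pairs rows from the two sides purely
-- by their position in the result streams, so a minimal illustration of the expected pairing is:)
SELECT * FROM (SELECT number AS a FROM numbers(3)) t1 PASTE JOIN (SELECT number * 10 AS b FROM numbers(3)) t2;
-- which should produce (0, 0), (1, 10), (2, 20) when both sides are read with a single thread.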

View File

@ -0,0 +1,5 @@
thread = 0
thread != 0
Send signal to
thread_name = 'foo'
Send signal to 0 threads (total)

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Tags: no-parallel
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# NOTE: because the output is filtered with grep, messages like "Cannot obtain a stack trace for thread {}" are ignored automatically, which is the intention.
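# (editorial note, an assumption about the mechanism: --send_logs_level=test forwards the server's
# trace-level log lines to the client's stderr, and the `|& grep` pipelines below keep only the
# "Send signal to ..." lines, so any other diagnostics are dropped from the test output.)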
# no message at all
echo "thread = 0"
$CLICKHOUSE_CLIENT --allow_repeated_settings --send_logs_level=test -nm -q "select * from system.stack_trace where thread_id = 0" |& grep -F -o 'Send signal to'
# send messages to some threads
echo "thread != 0"
$CLICKHOUSE_CLIENT --allow_repeated_settings --send_logs_level=test -nm -q "select * from system.stack_trace where thread_id != 0 format Null" |& grep -F -o 'Send signal to' | grep -v 'Send signal to 0 threads (total)'
# there is no thread with comm="foo", so no signals will be sent
echo "thread_name = 'foo'"
$CLICKHOUSE_CLIENT --allow_repeated_settings --send_logs_level=test -nm -q "select * from system.stack_trace where thread_name = 'foo' format Null" |& grep -F -o 'Send signal to 0 threads (total)'

View File

@ -11,7 +11,7 @@ ENGINE = Distributed(test_cluster_one_shard_three_replicas_localhost, currentDat
SELECT count(), sum(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0;
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
DROP TABLE test_d;
DROP TABLE test;

View File

@ -0,0 +1,12 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (id UInt64, date Date)
ENGINE = MergeTree
ORDER BY id
AS select *, '2023-12-25' from numbers(100);
SELECT count(), sum(id)
FROM remote('127.0.0.1|127.0.0.2|127.0.0.3|127.0.0.4', currentDatabase(), test)
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 4, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree = 1; -- { serverError CLUSTER_DOESNT_EXIST }
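-- (editorial note, an assumption about intent: remote() enumerates the replicas as an ad-hoc
-- address list rather than a named cluster, so with parallel replicas forced on the query is
-- presumably expected to fail with CLUSTER_DOESNT_EXIST instead of silently falling back.)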
DROP TABLE test;

View File

@ -0,0 +1 @@
6 111111111111111111111111111111111111111

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (x UInt8) ENGINE = MergeTree ORDER BY x;
INSERT INTO test VALUES (1), (2), (3);
SET allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree = 1;
WITH (SELECT '111111111111111111111111111111111111111'::UInt128) AS v SELECT sum(x), max(v) FROM test;
DROP TABLE test;
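-- (editorial note: the .reference file above expects "6 111111111111111111111111111111111111111",
-- i.e. sum(x) over the three inserted rows plus the UInt128 constant produced by the scalar
-- subquery, so the test presumably checks that the WITH-scalar is evaluated consistently when
-- the query runs with parallel replicas enabled.)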