Merge branch 'master' into gb-use-null-analyzer-crashes

This commit is contained in:
Nikolai Kochetov 2024-03-27 18:37:27 +00:00
commit dc52b81f5d
135 changed files with 1320 additions and 629 deletions

View File

@ -44,22 +44,35 @@ At a minimum, the following information should be added (but add more as needed)
---
### Modify your CI run:
**NOTE:** If you merge the PR with modified CI you **MUST KNOW** what you are doing
**NOTE:** Set desired options before CI starts or re-push after updates
**NOTE:** Checked options will be applied if set before CI RunConfig/PrepareRunConfig step
#### Run only:
- [ ] <!---ci_set_integration--> Integration tests
- [ ] <!---ci_set_arm--> Integration tests (arm64)
- [ ] <!---ci_set_stateless--> Stateless tests (release)
- [ ] <!---ci_set_stateless_asan--> Stateless tests (asan)
- [ ] <!---ci_set_stateful--> Stateful tests (release)
- [ ] <!---ci_set_stateful_asan--> Stateful tests (asan)
- [ ] <!---ci_set_reduced--> No sanitizers
- [ ] <!---ci_set_analyzer--> Tests with analyzer
- [ ] <!---ci_set_fast--> Fast tests
- [ ] <!---job_package_debug--> Only package_debug build
- [ ] <!---PLACE_YOUR_TAG_CONFIGURED_IN_ci_config.py_FILE_HERE--> Add your CI variant description here
#### Include tests (required builds will be added automatically):
- [ ] <!---ci_include_fast--> Fast test
- [ ] <!---ci_include_integration--> Integration Tests
- [ ] <!---ci_include_stateless--> Stateless tests
- [ ] <!---ci_include_stateful--> Stateful tests
- [ ] <!---ci_include_unit--> Unit tests
- [ ] <!---ci_include_performance--> Performance tests
- [ ] <!---ci_include_asan--> All with ASAN
- [ ] <!---ci_include_tsan--> All with TSAN
- [ ] <!---ci_include_analyzer--> All with Analyzer
- [ ] <!---ci_include_KEYWORD--> Add your option here
#### CI options:
#### Exclude tests:
- [ ] <!---ci_exclude_fast--> Fast test
- [ ] <!---ci_exclude_integration--> Integration Tests
- [ ] <!---ci_exclude_stateless--> Stateless tests
- [ ] <!---ci_exclude_stateful--> Stateful tests
- [ ] <!---ci_exclude_performance--> Performance tests
- [ ] <!---ci_exclude_asan--> All with ASAN
- [ ] <!---ci_exclude_tsan--> All with TSAN
- [ ] <!---ci_exclude_msan--> All with MSAN
- [ ] <!---ci_exclude_ubsan--> All with UBSAN
- [ ] <!---ci_exclude_coverage--> All with Coverage
- [ ] <!---ci_exclude_aarch64--> All with Aarch64
- [ ] <!---ci_exclude_KEYWORD--> Add your option here
#### Extra options:
- [ ] <!---do_not_test--> do not test (only style check)
- [ ] <!---no_merge_commit--> disable merge-commit (no merge from master before tests)
- [ ] <!---no_ci_cache--> disable CI cache (job reuse)

View File

@ -374,7 +374,7 @@ jobs:
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateless tests (release, analyzer, s3, DatabaseReplicated)
test_name: Stateless tests (release, old analyzer, s3, DatabaseReplicated)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestS3Debug:
@ -632,7 +632,7 @@ jobs:
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (asan, analyzer)
test_name: Integration tests (asan, old analyzer)
runner_type: stress-tester
data: ${{ needs.RunConfig.outputs.data }}
IntegrationTestsTsan:

View File

@ -436,7 +436,7 @@ jobs:
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Integration tests (asan, analyzer)
test_name: Integration tests (asan, old analyzer)
runner_type: stress-tester
data: ${{ needs.RunConfig.outputs.data }}
IntegrationTestsTsan:

View File

@ -6,7 +6,7 @@
# 2024 Changelog
### <a id="243"></a> ClickHouse release 24.3 LTS, 2024-03-26
### <a id="243"></a> ClickHouse release 24.3 LTS, 2024-03-27
#### Upgrade Notes
* The setting `allow_experimental_analyzer` is enabled by default, which switches query analysis to a new implementation with better compatibility and feature completeness. The "analyzer" feature is now considered beta instead of experimental. You can restore the old behavior by setting `compatibility` to `24.2` or by disabling the `allow_experimental_analyzer` setting. Watch the [video on YouTube](https://www.youtube.com/watch?v=zhrOYQpgvkk).
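For upgraders, a minimal sketch of the two opt-out paths mentioned above, expressed as session-level settings:
```sql
-- Either statement restores the pre-24.3 query analysis behavior.
SET compatibility = '24.2';
-- or
SET allow_experimental_analyzer = 0;
```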

View File

@ -2,11 +2,11 @@
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 54484)
SET(VERSION_REVISION 54485)
SET(VERSION_MAJOR 24)
SET(VERSION_MINOR 3)
SET(VERSION_MINOR 4)
SET(VERSION_PATCH 1)
SET(VERSION_GITHASH 891689a41506d00aa169548f5b4a8774351242c4)
SET(VERSION_DESCRIBE v24.3.1.1-testing)
SET(VERSION_STRING 24.3.1.1)
SET(VERSION_GITHASH 2c5c589a882ceec35439650337b92db3e76f0081)
SET(VERSION_DESCRIBE v24.4.1.1-testing)
SET(VERSION_STRING 24.4.1.1)
# end of autochange

View File

@ -138,7 +138,7 @@ ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS disk = disk(type = cache, path = '/var/lib/clickhouse/filesystem_caches/', max_size = '4G',
SETTINGS disk = disk(type = cache, path = '/var/lib/clickhouse/filesystem_caches/stateful/', max_size = '4G',
disk = disk(type = web, endpoint = 'https://clickhouse-datasets-web.s3.us-east-1.amazonaws.com/'));
ATTACH TABLE datasets.visits_v1 UUID '5131f834-711f-4168-98a5-968b691a104b'
@ -329,5 +329,5 @@ ENGINE = CollapsingMergeTree(Sign)
PARTITION BY toYYYYMM(StartDate)
ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID)
SAMPLE BY intHash32(UserID)
SETTINGS disk = disk(type = cache, path = '/var/lib/clickhouse/filesystem_caches/', max_size = '4G',
SETTINGS disk = disk(type = cache, path = '/var/lib/clickhouse/filesystem_caches/stateful/', max_size = '4G',
disk = disk(type = web, endpoint = 'https://clickhouse-datasets-web.s3.us-east-1.amazonaws.com/'));

View File

@ -2356,7 +2356,7 @@ You can select data from a ClickHouse table and save them into some file in the
$ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Arrow" > {filename.arrow}
```
### Arrow format settings {#parquet-format-settings}
### Arrow format settings {#arrow-format-settings}
- [output_format_arrow_low_cardinality_as_dictionary](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_low_cardinality_as_dictionary) - enable output ClickHouse LowCardinality type as Dictionary Arrow type. Default value - `false`.
- [output_format_arrow_use_64_bit_indexes_for_dictionary](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_use_64_bit_indexes_for_dictionary) - use 64-bit integer type for Dictionary indexes. Default value - `false`.
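A hedged sketch (the table and file names are placeholders) showing how these settings combine with the export shown above:
```sql
SET output_format_arrow_low_cardinality_as_dictionary = 1;  -- emit LowCardinality columns as Arrow Dictionary
SELECT * FROM some_table INTO OUTFILE 'some_file.arrow' FORMAT Arrow;
```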

View File

@ -945,9 +945,9 @@ Hard limit is configured via system tools
## database_atomic_delay_before_drop_table_sec {#database_atomic_delay_before_drop_table_sec}
Sets the delay before remove table data in seconds. If the query has `SYNC` modifier, this setting is ignored.
The delay during which a dropped table can be restored using the [UNDROP](/docs/en/sql-reference/statements/undrop.md) statement. If `DROP TABLE` ran with a `SYNC` modifier, the setting is ignored.
Default value: `480` (8 minute).
Default value: `480` (8 minutes).
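A minimal sketch (the table name `events` is hypothetical) of how this delay interacts with `DROP TABLE` and `UNDROP`:
```sql
DROP TABLE events;       -- table data is kept for database_atomic_delay_before_drop_table_sec seconds
UNDROP TABLE events;     -- possible while the delay has not expired
DROP TABLE events SYNC;  -- removes the data immediately; the delay setting is ignored
```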
## database_catalog_unused_dir_hide_timeout_sec {#database_catalog_unused_dir_hide_timeout_sec}

View File

@ -1367,7 +1367,7 @@ Default value: `1'000'000`.
While importing data, if a column is not found in the schema, the default value will be used instead of raising an error.
Disabled by default.
Enabled by default.
### input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference {#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference}

View File

@ -2817,6 +2817,17 @@ Possible values:
Default value: 0.
## distributed_insert_skip_read_only_replicas {#distributed_insert_skip_read_only_replicas}
Enables skipping read-only replicas for INSERT queries into a Distributed table.
Possible values:
- 0 — The INSERT proceeds as usual; if it is routed to a read-only replica, it will fail.
- 1 — The initiator skips read-only replicas before sending data to the shards.
Default value: `0`
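For illustration (the Distributed table name is a placeholder), the setting can be enabled per session before an INSERT:
```sql
SET distributed_insert_skip_read_only_replicas = 1;
INSERT INTO dist_table VALUES (1, 'a');  -- read-only replicas are skipped when choosing where to send the data
```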
## distributed_foreground_insert {#distributed_foreground_insert}
Enables or disables synchronous data insertion into a [Distributed](../../engines/table-engines/special/distributed.md/#distributed) table.

View File

@ -128,9 +128,9 @@ Returns the part of the domain that includes top-level subdomains up to the “f
For example:
- `cutToFirstSignificantSubdomain('https://news.clickhouse.com.tr/') = 'clickhouse.com.tr'`.
- `cutToFirstSignificantSubdomain('www.tr') = 'www.tr'`.
- `cutToFirstSignificantSubdomain('tr') = ''`.
- `cutToFirstSignificantSubdomainWithWWW('https://news.clickhouse.com.tr/') = 'clickhouse.com.tr'`.
- `cutToFirstSignificantSubdomainWithWWW('www.tr') = 'www.tr'`.
- `cutToFirstSignificantSubdomainWithWWW('tr') = ''`.
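A query-level sketch (the URL is chosen only for illustration) of the difference the `WithWWW` variant makes:
```sql
SELECT
    cutToFirstSignificantSubdomain('https://www.example.com/') AS without_www,      -- 'example.com'
    cutToFirstSignificantSubdomainWithWWW('https://www.example.com/') AS with_www;  -- 'www.example.com'
```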
### cutToFirstSignificantSubdomainCustom

View File

@ -13,13 +13,6 @@ a system table called `system.dropped_tables`.
If you have a materialized view without a `TO` clause associated with the dropped table, then you will also have to UNDROP the inner table of that view.
:::note
UNDROP TABLE is experimental. To use it add this setting:
```sql
set allow_experimental_undrop_table_query = 1;
```
:::
:::tip
Also see [DROP TABLE](/docs/en/sql-reference/statements/drop.md)
:::
@ -32,60 +25,53 @@ UNDROP TABLE [db.]name [UUID '<uuid>'] [ON CLUSTER cluster]
**Example**
``` sql
set allow_experimental_undrop_table_query = 1;
```
```sql
CREATE TABLE undropMe
CREATE TABLE tab
(
`id` UInt8
)
ENGINE = MergeTree
ORDER BY id
```
ORDER BY id;
DROP TABLE tab;
```sql
DROP TABLE undropMe
```
```sql
SELECT *
FROM system.dropped_tables
FORMAT Vertical
FORMAT Vertical;
```
```response
Row 1:
──────
index: 0
database: default
table: undropMe
table: tab
uuid: aa696a1a-1d70-4e60-a841-4c80827706cc
engine: MergeTree
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.undropMe.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.tab.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
table_dropped_time: 2023-04-05 14:12:12
1 row in set. Elapsed: 0.001 sec.
```
```sql
UNDROP TABLE undropMe
```
```response
Ok.
```
```sql
UNDROP TABLE tab;
SELECT *
FROM system.dropped_tables
FORMAT Vertical
```
FORMAT Vertical;
```response
Ok.
0 rows in set. Elapsed: 0.001 sec.
```
```sql
DESCRIBE TABLE undropMe
FORMAT Vertical
DESCRIBE TABLE tab
FORMAT Vertical;
```
```response
Row 1:
──────

View File

@ -53,7 +53,7 @@ SELECT * FROM random;
└──────────────────────────────┴──────────────┴────────────────────────────────────────────────────────────────────┘
```
In combination with [generateRandomStructure](../../sql-reference/functions/other-functions.md#generateRandomStructure):
In combination with [generateRandomStructure](../../sql-reference/functions/other-functions.md#generaterandomstructure):
```sql
SELECT * FROM generateRandom(generateRandomStructure(4, 101), 101) LIMIT 3;

View File

@ -99,6 +99,19 @@ function(add_rust_subdirectory src)
message(STATUS "Copy ${src} to ${dst}")
file(COPY "${src}" DESTINATION "${CMAKE_CURRENT_BINARY_DIR}"
PATTERN target EXCLUDE)
# Check whether Rust is available.
#
# `cargo update --dry-run` will not update anything, but it checks internet connectivity.
execute_process(COMMAND ${Rust_CARGO_CACHED} update --dry-run
WORKING_DIRECTORY "${dst}"
RESULT_VARIABLE CARGO_UPDATE_RESULT
OUTPUT_VARIABLE CARGO_UPDATE_STDOUT
ERROR_VARIABLE CARGO_UPDATE_STDERR)
if (CARGO_UPDATE_RESULT)
message(FATAL_ERROR "Rust (${Rust_CARGO_CACHED}) support is not available (likely there is no internet connectivity):\n${CARGO_UPDATE_STDERR}\nYou can disable Rust support with -DENABLE_RUST=OFF")
endif()
add_subdirectory("${dst}" "${dst}")
# cmake -E copy* does not know how to exclude files

View File

@ -2072,92 +2072,75 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
io.pipeline.setProgressCallback(context->getProgressCallback());
io.pipeline.setProcessListElement(context->getProcessListElement());
if (only_analyze)
Block block;
while (block.rows() == 0 && executor.pull(block))
{
/// If query is only analyzed, then constants are not correct.
scalar_block = interpreter->getSampleBlock();
for (auto & column : scalar_block)
}
if (block.rows() == 0)
{
auto types = interpreter->getSampleBlock().getDataTypes();
if (types.size() != 1)
types = {std::make_shared<DataTypeTuple>(types)};
auto & type = types[0];
if (!type->isNullable())
{
if (column.column->empty())
{
auto mut_col = column.column->cloneEmpty();
mut_col->insertDefault();
column.column = std::move(mut_col);
}
if (!type->canBeInsideNullable())
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY,
"Scalar subquery returned empty result of type {} which cannot be Nullable",
type->getName());
type = makeNullable(type);
}
auto scalar_column = type->createColumn();
scalar_column->insert(Null());
scalar_block.insert({std::move(scalar_column), type, "null"});
}
else
{
Block block;
if (block.rows() != 1)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
while (block.rows() == 0 && executor.pull(block))
Block tmp_block;
while (tmp_block.rows() == 0 && executor.pull(tmp_block))
{
}
if (block.rows() == 0)
if (tmp_block.rows() != 0)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
block = materializeBlock(block);
size_t columns = block.columns();
if (columns == 1)
{
auto types = interpreter->getSampleBlock().getDataTypes();
if (types.size() != 1)
types = {std::make_shared<DataTypeTuple>(types)};
auto & type = types[0];
if (!type->isNullable())
auto & column = block.getByPosition(0);
/// Here we wrap the type in Nullable if we can.
/// This is needed because if the subquery returns no rows, its result will be Null.
/// In the case of many columns, we do not check this because a tuple cannot be Nullable.
if (!column.type->isNullable() && column.type->canBeInsideNullable())
{
if (!type->canBeInsideNullable())
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY,
"Scalar subquery returned empty result of type {} which cannot be Nullable",
type->getName());
type = makeNullable(type);
column.type = makeNullable(column.type);
column.column = makeNullable(column.column);
}
auto scalar_column = type->createColumn();
scalar_column->insert(Null());
scalar_block.insert({std::move(scalar_column), type, "null"});
scalar_block = block;
}
else
{
if (block.rows() != 1)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
/** Make unique column names for tuple.
*
* Example: SELECT (SELECT 2 AS x, x)
*/
makeUniqueColumnNamesInBlock(block);
Block tmp_block;
while (tmp_block.rows() == 0 && executor.pull(tmp_block))
{
}
if (tmp_block.rows() != 0)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
block = materializeBlock(block);
size_t columns = block.columns();
if (columns == 1)
{
auto & column = block.getByPosition(0);
/// Here we wrap the type in Nullable if we can.
/// This is needed because if the subquery returns no rows, its result will be Null.
/// In the case of many columns, we do not check this because a tuple cannot be Nullable.
if (!column.type->isNullable() && column.type->canBeInsideNullable())
{
column.type = makeNullable(column.type);
column.column = makeNullable(column.column);
}
scalar_block = block;
}
else
{
/** Make unique column names for tuple.
*
* Example: SELECT (SELECT 2 AS x, x)
*/
makeUniqueColumnNamesInBlock(block);
scalar_block.insert({
ColumnTuple::create(block.getColumns()),
std::make_shared<DataTypeTuple>(block.getDataTypes(), block.getNames()),
"tuple"});
}
scalar_block.insert({
ColumnTuple::create(block.getColumns()),
std::make_shared<DataTypeTuple>(block.getDataTypes(), block.getNames()),
"tuple"});
}
}
@ -7375,7 +7358,7 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
ColumnDescription column = insert_columns.get(*insert_column_name_it);
/// Change ephemeral columns to default columns.
column.default_desc.kind = ColumnDefaultKind::Default;
structure_hint.add(insert_columns.get(*insert_column_name_it));
structure_hint.add(std::move(column));
}
}

View File

@ -70,6 +70,12 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
ProfileEvents::increment(ProfileEvents::DistributedConnectionUsable);
result.is_usable = true;
if (table_status_it->second.is_readonly)
{
result.is_readonly = true;
LOG_TRACE(log, "Table {}.{} is readonly on server {}", table_to_check->database, table_to_check->table, result.entry->getDescription());
}
const UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
if (!max_allowed_delay)
{

View File

@ -52,7 +52,7 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
settings.distributed_replica_max_ignored_errors = 0;
settings.fallback_to_stale_replicas_for_distributed_queries = true;
return get(timeouts, settings, true);
return get(timeouts, settings, /* force_connected= */ true);
}
IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts,
@ -65,7 +65,7 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, {});
return tryGetEntry(pool, timeouts, fail_message, settings);
};
const size_t offset = settings.load_balancing_first_offset % nested_pools.size();
@ -158,6 +158,21 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyCheckedForInsert(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check)
{
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); };
return getManyImpl(settings, pool_mode, try_get_entry,
/*skip_unavailable_endpoints=*/ std::nullopt,
/*priority_func=*/ {},
settings.distributed_insert_skip_read_only_replicas);
}
ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings & settings)
{
const size_t offset = settings.load_balancing_first_offset % nested_pools.size();
@ -171,7 +186,8 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
GetPriorityForLoadBalancing::Func priority_func,
bool skip_read_only_replicas)
{
if (nested_pools.empty())
throw DB::Exception(
@ -203,7 +219,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries.value;
return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func);
return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, skip_read_only_replicas, try_get_entry, priority_func);
}
ConnectionPoolWithFailover::TryResult

View File

@ -77,6 +77,12 @@ public:
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
/// The same as getManyChecked(), but respects distributed_insert_skip_read_only_replicas setting.
std::vector<TryResult> getManyCheckedForInsert(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check);
struct NestedPoolStatus
{
@ -107,7 +113,8 @@ private:
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
GetPriorityForLoadBalancing::Func priority_func = {},
bool skip_read_only_replicas = false);
/// Try to get a connection from the pool and check that it is good.
/// If table_to_check is not null and the check is enabled in settings, check that replication delay

View File

@ -20,12 +20,12 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int DUPLICATE_COLUMN;
extern const int EXPERIMENTAL_FEATURE_ERROR;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_DIMENSIONS_MISMATCHED;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int EXPERIMENTAL_FEATURE_ERROR;
}
namespace
@ -334,7 +334,18 @@ void ColumnObject::Subcolumn::insert(Field field, FieldInfo info)
if (type_changed || info.need_convert)
field = convertFieldToTypeOrThrow(field, *least_common_type.get());
data.back()->insert(field);
if (!data.back()->tryInsert(field))
{
/** Normalization of the field above is pretty complicated (it uses several FieldVisitors),
* so in the case of a bug, we may get mismatched types.
* The `IColumn::insert` method does not check the type of the inserted field, and it can lead to a segmentation fault.
* Therefore, we use the safer `tryInsert` method to get an exception instead of a segmentation fault.
*/
throw Exception(ErrorCodes::EXPERIMENTAL_FEATURE_ERROR,
"Cannot insert field {} to column {}",
field.dump(), data.back()->dumpStructure());
}
++num_rows;
}

View File

@ -460,6 +460,28 @@ Float32 ColumnVector<T>::getFloat32(size_t n [[maybe_unused]]) const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot get the value of {} as Float32", TypeName<T>);
}
template <typename T>
bool ColumnVector<T>::tryInsert(const DB::Field & x)
{
NearestFieldType<T> value;
if (!x.tryGet<NearestFieldType<T>>(value))
{
if constexpr (std::is_same_v<T, UInt8>)
{
/// It's also possible to insert boolean values into UInt8 column.
bool boolean_value;
if (x.tryGet<bool>(boolean_value))
{
data.push_back(static_cast<T>(boolean_value));
return true;
}
}
return false;
}
data.push_back(static_cast<T>(value));
return true;
}
template <typename T>
void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{

View File

@ -224,14 +224,8 @@ public:
data.push_back(static_cast<T>(x.get<T>()));
}
bool tryInsert(const DB::Field & x) override
{
NearestFieldType<T> value;
if (!x.tryGet<NearestFieldType<T>>(value))
return false;
data.push_back(static_cast<T>(value));
return true;
}
bool tryInsert(const DB::Field & x) override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;

View File

@ -30,6 +30,7 @@ namespace ProfileEvents
{
extern const Event DistributedConnectionFailTry;
extern const Event DistributedConnectionFailAtAll;
extern const Event DistributedConnectionSkipReadOnlyReplica;
}
/// This class provides a pool with fault tolerance. It is used for pooling of connections to replicated DB.
@ -73,13 +74,7 @@ public:
{
TryResult() = default;
void reset()
{
entry = Entry();
is_usable = false;
is_up_to_date = false;
delay = 0;
}
void reset() { *this = {}; }
Entry entry; /// use isNull() to check if connection is established
bool is_usable = false; /// if connection is established, then can be false only with table check
@ -87,6 +82,7 @@ public:
bool is_up_to_date = false; /// If true, the entry is a connection to up-to-date replica
/// Depends on max_replica_delay_for_distributed_queries setting
UInt32 delay = 0; /// Helps choosing the "least stale" option when all replicas are stale.
bool is_readonly = false; /// Table is in read-only mode, INSERT can ignore such replicas.
};
struct PoolState;
@ -117,6 +113,7 @@ public:
size_t min_entries, size_t max_entries, size_t max_tries,
size_t max_ignored_errors,
bool fallback_to_stale_replicas,
bool skip_read_only_replicas,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority);
@ -205,8 +202,12 @@ PoolWithFailoverBase<TNestedPool>::get(size_t max_ignored_errors, bool fallback_
const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
std::vector<TryResult> results = getMany(
1 /* min entries */, 1 /* max entries */, 1 /* max tries */,
max_ignored_errors, fallback_to_stale_replicas,
/* min_entries= */ 1,
/* max_entries= */ 1,
/* max_tries= */ 1,
max_ignored_errors,
fallback_to_stale_replicas,
/* skip_read_only_replicas= */ false,
try_get_entry, get_priority);
if (results.empty() || results[0].entry.isNull())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
@ -220,6 +221,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries, size_t max_tries,
size_t max_ignored_errors,
bool fallback_to_stale_replicas,
bool skip_read_only_replicas,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority)
{
@ -271,9 +273,14 @@ PoolWithFailoverBase<TNestedPool>::getMany(
++entries_count;
if (result.is_usable)
{
++usable_count;
if (result.is_up_to_date)
++up_to_date_count;
if (skip_read_only_replicas && result.is_readonly)
ProfileEvents::increment(ProfileEvents::DistributedConnectionSkipReadOnlyReplica);
else
{
++usable_count;
if (result.is_up_to_date)
++up_to_date_count;
}
}
}
else
@ -296,7 +303,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
throw DB::NetException(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"All connection tries failed. Log: \n\n{}\n", fail_messages);
std::erase_if(try_results, [](const TryResult & r) { return r.entry.isNull() || !r.is_usable; });
std::erase_if(try_results, [&](const TryResult & r) { return r.entry.isNull() || !r.is_usable || (skip_read_only_replicas && r.is_readonly); });
/// Sort so that preferred items are near the beginning.
std::stable_sort(

View File

@ -156,6 +156,7 @@
M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.") \
M(DistributedConnectionMissingTable, "Number of times we rejected a replica from a distributed query, because it did not contain a table needed for the query.") \
M(DistributedConnectionStaleReplica, "Number of times we rejected a replica from a distributed query, because some table needed for a query had replication lag higher than the configured threshold.") \
M(DistributedConnectionSkipReadOnlyReplica, "Number of replicas skipped during INSERT into Distributed table due to replicas being read-only") \
M(DistributedConnectionFailAtAll, "Total count when distributed connection fails after all retries finished.") \
\
M(HedgedRequestsChangeReplica, "Total count when timeout for changing replica expired in hedged requests.") \

View File

@ -76,6 +76,9 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SPARSE_SERIALIZATION = 54465;
static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466;
/// Send read-only flag for Replicated tables as well
static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -83,6 +86,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466;
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54466;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54467;
}

View File

@ -136,6 +136,7 @@ class IColumn;
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams, and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\
M(Bool, distributed_insert_skip_read_only_replicas, false, "If true, INSERT into Distributed will skip read-only replicas.", 0) \
M(Bool, distributed_foreground_insert, false, "If setting is enabled, insert query into distributed waits until data are sent to all nodes in a cluster. \n\nEnables or disables synchronous data insertion into a `Distributed` table.\n\nBy default, when inserting data into a Distributed table, the ClickHouse server sends data to cluster nodes in the background. When `distributed_foreground_insert` = 1, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).", 0) ALIAS(insert_distributed_sync) \
M(UInt64, distributed_background_insert_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) ALIAS(insert_distributed_timeout) \
M(Milliseconds, distributed_background_insert_sleep_time_ms, 100, "Sleep time for background INSERTs into Distributed, in case of any errors delay grows exponentially.", 0) ALIAS(distributed_directory_monitor_sleep_time_ms) \
@ -380,7 +381,7 @@ class IColumn;
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \
M(Bool, opentelemetry_trace_processors, false, "Collect OpenTelemetry spans for processors.", 0) \
M(Bool, prefer_column_name_to_alias, false, "Prefer using column names instead of aliases if possible.", 0) \
M(Bool, allow_experimental_analyzer, false, "Allow experimental analyzer", 0) \
M(Bool, allow_experimental_analyzer, true, "Allow experimental analyzer.", 0) \
M(Bool, analyzer_compatibility_join_using_top_level_identifier, false, "Force to resolve identifier in JOIN USING from projection (for example, in `SELECT a + 1 AS b FROM t1 JOIN t2 USING (b)` join will be performed by `t1.a + 1 = t2.b`, rather then `t1.b = t2.b`).", 0) \
M(Bool, prefer_global_in_and_join, false, "If enabled, all IN/JOIN operators will be rewritten as GLOBAL IN/JOIN. It's useful when the to-be-joined tables are only available on the initiator and we need to always scatter their data on-the-fly during distributed processing with the GLOBAL keyword. It's also useful to reduce the need to access the external sources joining external tables.", 0) \
M(Bool, enable_vertical_final, true, "If enable, remove duplicated rows during FINAL by marking rows as deleted and filtering them later instead of merging rows", 0) \

View File

@ -101,10 +101,12 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
{"max_parser_backtracks", 0, 1000000, "Limiting the complexity of parsing"},
{"analyzer_compatibility_join_using_top_level_identifier", false, false, "Force to resolve identifier in JOIN USING from projection"},
{"distributed_insert_skip_read_only_replicas", false, false, "If true, INSERT into Distributed will skip read-only replicas"},
{"keeper_max_retries", 10, 10, "Max retries for general keeper operations"},
{"keeper_retry_initial_backoff_ms", 100, 100, "Initial backoff timeout for general keeper operations"},
{"keeper_retry_max_backoff_ms", 5000, 5000, "Max backoff timeout for general keeper operations"},
{"s3queue_allow_experimental_sharded_mode", false, false, "Enable experimental sharded mode of S3Queue table engine. It is experimental because it will be rewritten"},
{"allow_experimental_analyzer", false, true, "Enable analyzer and planner by default."},
{"merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability", 0.0, 0.0, "For testing of `PartsSplitter` - split read ranges into intersecting and non intersecting every time you read from MergeTree with the specified probability."},
{"allow_get_client_http_header", false, false, "Introduced a new function."},
{"output_format_pretty_row_numbers", false, true, "It is better for usability."},

View File

@ -1054,6 +1054,14 @@ Field FieldVisitorFoldDimension::operator()(const Array & x) const
return res;
}
Field FieldVisitorFoldDimension::operator()(const Null & x) const
{
if (num_dimensions_to_fold == 0)
return x;
return Array();
}
void setAllObjectsToDummyTupleType(NamesAndTypesList & columns)
{
for (auto & column : columns)

View File

@ -149,7 +149,7 @@ public:
Field operator()(const Array & x) const;
Field operator()(const Null & x) const { return x; }
Field operator()(const Null & x) const;
template <typename T>
Field operator()(const T & x) const

View File

@ -7,6 +7,7 @@
#include <Common/assert_cast.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
@ -526,68 +527,26 @@ void SerializationTuple::serializeTextXML(const IColumn & column, size_t row_num
void SerializationTuple::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
for (size_t i = 0; i < elems.size(); ++i)
{
if (i != 0)
writeChar(settings.csv.tuple_delimiter, ostr);
elems[i]->serializeTextCSV(extractElementColumn(column, i), row_num, ostr, settings);
}
WriteBufferFromOwnString wb;
serializeText(column, row_num, wb, settings);
writeCSV(wb.str(), ostr);
}
void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
addElementSafe<void>(elems.size(), column, [&]
{
const size_t size = elems.size();
for (size_t i = 0; i < size; ++i)
{
if (i != 0)
{
skipWhitespaceIfAny(istr);
assertChar(settings.csv.tuple_delimiter, istr);
skipWhitespaceIfAny(istr);
}
auto & element_column = extractElementColumn(column, i);
if (settings.null_as_default && !isColumnNullableOrLowCardinalityNullable(element_column))
SerializationNullable::deserializeNullAsDefaultOrNestedTextCSV(element_column, istr, settings, elems[i]);
else
elems[i]->deserializeTextCSV(element_column, istr, settings);
}
return true;
});
String s;
readCSV(s, istr, settings.csv);
ReadBufferFromString rb(s);
deserializeText(column, rb, settings, true);
}
bool SerializationTuple::tryDeserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
return addElementSafe<bool>(elems.size(), column, [&]
{
const size_t size = elems.size();
for (size_t i = 0; i < size; ++i)
{
if (i != 0)
{
skipWhitespaceIfAny(istr);
if (!checkChar(settings.csv.tuple_delimiter, istr))
return false;
skipWhitespaceIfAny(istr);
}
auto & element_column = extractElementColumn(column, i);
if (settings.null_as_default && !isColumnNullableOrLowCardinalityNullable(element_column))
{
if (!SerializationNullable::tryDeserializeNullAsDefaultOrNestedTextCSV(element_column, istr, settings, elems[i]))
return false;
}
else
{
if (!elems[i]->tryDeserializeTextCSV(element_column, istr, settings))
return false;
}
}
return true;
});
String s;
if (!tryReadCSV(s, istr, settings.csv))
return false;
ReadBufferFromString rb(s);
return tryDeserializeText(column, rb, settings, true);
}
void SerializationTuple::enumerateStreams(

View File

@ -1,11 +1,16 @@
#include <Databases/DatabaseOnDisk.h>
#include <filesystem>
#include <iterator>
#include <span>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
@ -16,14 +21,11 @@
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Common/assert_cast.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h>
#include <filesystem>
#include <Common/escapeForFileName.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
namespace fs = std::filesystem;
@ -613,7 +615,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
};
/// Metadata files to load: name and flag for .tmp_drop files
std::set<std::pair<String, bool>> metadata_files;
std::vector<std::pair<String, bool>> metadata_files;
fs::directory_iterator dir_end;
for (fs::directory_iterator dir_it(getMetadataPath()); dir_it != dir_end; ++dir_it)
@ -634,7 +636,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
if (endsWith(file_name, ".sql.tmp_drop"))
{
/// There are files that we tried to delete previously
metadata_files.emplace(file_name, false);
metadata_files.emplace_back(file_name, false);
}
else if (endsWith(file_name, ".sql.tmp"))
{
@ -645,23 +647,30 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
else if (endsWith(file_name, ".sql"))
{
/// The required files have names like `table_name.sql`
metadata_files.emplace(file_name, true);
metadata_files.emplace_back(file_name, true);
}
else
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Incorrect file extension: {} in metadata directory {}", file_name, getMetadataPath());
}
std::sort(metadata_files.begin(), metadata_files.end());
metadata_files.erase(std::unique(metadata_files.begin(), metadata_files.end()), metadata_files.end());
/// Read and parse metadata in parallel
ThreadPool pool(CurrentMetrics::DatabaseOnDiskThreads, CurrentMetrics::DatabaseOnDiskThreadsActive, CurrentMetrics::DatabaseOnDiskThreadsScheduled);
for (const auto & file : metadata_files)
const auto batch_size = metadata_files.size() / pool.getMaxThreads() + 1;
for (auto it = metadata_files.begin(); it < metadata_files.end(); std::advance(it, batch_size))
{
pool.scheduleOrThrowOnError([&]()
{
if (file.second)
process_metadata_file(file.first);
else
process_tmp_drop_metadata_file(file.first);
});
std::span batch{it, std::min(std::next(it, batch_size), metadata_files.end())};
pool.scheduleOrThrowOnError(
[batch, &process_metadata_file, &process_tmp_drop_metadata_file]() mutable
{
for (const auto & file : batch)
if (file.second)
process_metadata_file(file.first);
else
process_tmp_drop_metadata_file(file.first);
});
}
pool.wait();
}

View File

@ -303,8 +303,8 @@ DataTypePtr tryInferDataTypeByEscapingRule(const String & field, const FormatSet
/// Try to determine the type of value inside quotes
auto type = tryInferDataTypeForSingleField(data, format_settings);
/// If we couldn't infer any type or it's tuple in quotes or it's a number and csv.try_infer_numbers_from_strings = 0, we determine it as a string.
if (!type || isTuple(type) || (isNumber(type) && !format_settings.csv.try_infer_numbers_from_strings))
/// If we couldn't infer any type or it's a number and csv.try_infer_numbers_from_strings = 0, we determine it as a string.
if (!type || (isNumber(type) && !format_settings.csv.try_infer_numbers_from_strings))
return std::make_shared<DataTypeString>();
return type;

View File

@ -348,7 +348,6 @@ public:
String getName() const override { return Name::name; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
@ -470,6 +469,9 @@ public:
else
return_type = json_return_type;
/// Top-level LowCardinality columns are processed outside JSON parser.
json_return_type = removeLowCardinality(json_return_type);
DataTypes argument_types;
argument_types.reserve(arguments.size());
for (const auto & argument : arguments)
@ -865,9 +867,11 @@ struct JSONExtractTree
explicit LowCardinalityFixedStringNode(const size_t fixed_length_) : fixed_length(fixed_length_) { }
bool insertResultToColumn(IColumn & dest, const Element & element) override
{
// For types other than string, delegate the insertion to JSONExtractRawImpl.
if (!element.isString())
// If element is an object we delegate the insertion to JSONExtractRawImpl
if (element.isObject())
return JSONExtractRawImpl<JSONParser>::insertResultToLowCardinalityFixedStringColumn(dest, element, fixed_length);
else if (!element.isString())
return false;
auto str = element.getString();
if (str.size() > fixed_length)
@ -1482,6 +1486,9 @@ public:
// We use insertResultToLowCardinalityFixedStringColumn in case we are inserting raw data in a Low Cardinality FixedString column
static bool insertResultToLowCardinalityFixedStringColumn(IColumn & dest, const Element & element, size_t fixed_length)
{
if (element.getObject().size() > fixed_length)
return false;
ColumnFixedString::Chars chars;
WriteBufferFromVector<ColumnFixedString::Chars> buf(chars, AppendModeTag());
traverse(element, buf);

View File

@ -56,6 +56,10 @@ public:
if (!IntervalKind::tryParseString(datepart_param, datepart_kind))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{} doesn't look like datepart name in {}", datepart_param, getName());
if (datepart_kind == IntervalKind::Kind::Nanosecond || datepart_kind == IntervalKind::Kind::Microsecond
|| datepart_kind == IntervalKind::Kind::Millisecond)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{} doesn't support {}", getName(), datepart_param);
result_type_is_date = (datepart_kind == IntervalKind::Kind::Year)
|| (datepart_kind == IntervalKind::Kind::Quarter) || (datepart_kind == IntervalKind::Kind::Month)
|| (datepart_kind == IntervalKind::Kind::Week);

View File

@ -1219,7 +1219,7 @@ void Context::addWarningMessageAboutDatabaseOrdinary(const String & database_nam
/// We don't use getFlagsPath method, because it takes a shared lock.
auto convert_databases_flag = fs::path(shared->flags_path) / "convert_ordinary_to_atomic";
auto message = fmt::format("Server has databases (for example `{}`) with Ordinary engine, which was deprecated. "
"To convert this database to a new Atomic engine, create a flag {} and make sure that ClickHouse has write permission for it. "
"To convert this database to the new Atomic engine, create a flag {} and make sure that ClickHouse has write permission for it. "
"Example: sudo touch '{}' && sudo chmod 666 '{}'",
database_name,
convert_databases_flag.string(), convert_databases_flag.string(), convert_databases_flag.string());

View File

@ -1143,7 +1143,7 @@ void DatabaseCatalog::dequeueDroppedTableCleanup(StorageID table_id)
TableMarkedAsDropped dropped_table;
{
std::lock_guard lock(tables_marked_dropped_mutex);
time_t latest_drop_time = std::numeric_limits<time_t>::min();
auto latest_drop_time = std::numeric_limits<time_t>::min();
auto it_dropped_table = tables_marked_dropped.end();
for (auto it = tables_marked_dropped.begin(); it != tables_marked_dropped.end(); ++it)
{
@ -1168,7 +1168,7 @@ void DatabaseCatalog::dequeueDroppedTableCleanup(StorageID table_id)
}
if (it_dropped_table == tables_marked_dropped.end())
throw Exception(ErrorCodes::UNKNOWN_TABLE,
"The drop task of table {} is in progress, has been dropped or the database engine doesn't support it",
"Table {} is being dropped, has been dropped, or the database engine does not support UNDROP",
table_id.getNameForLogs());
latest_metadata_dropped_path = it_dropped_table->metadata_path;
String table_metadata_path = getPathForMetadata(it_dropped_table->table_id);

View File

@ -18,14 +18,16 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
InterpreterUndropQuery::InterpreterUndropQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_) : WithMutableContext(context_), query_ptr(query_ptr_)
InterpreterUndropQuery::InterpreterUndropQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
: WithMutableContext(context_)
, query_ptr(query_ptr_)
{
}
BlockIO InterpreterUndropQuery::execute()
{
getContext()->checkAccess(AccessType::UNDROP_TABLE);
auto & undrop = query_ptr->as<ASTUndropQuery &>();
if (!undrop.cluster.empty() && !maybeRemoveOnCluster(query_ptr, getContext()))
{

View File

@ -13,22 +13,26 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
void TableStatus::write(WriteBuffer & out) const
void TableStatus::write(WriteBuffer & out, UInt64 client_protocol_revision) const
{
writeBinary(is_replicated, out);
if (is_replicated)
{
writeVarUInt(absolute_delay, out);
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK)
writeVarUInt(is_readonly, out);
}
}
void TableStatus::read(ReadBuffer & in)
void TableStatus::read(ReadBuffer & in, UInt64 server_protocol_revision)
{
absolute_delay = 0;
readBinary(is_replicated, in);
if (is_replicated)
{
readVarUInt(absolute_delay, in);
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK)
readVarUInt(is_readonly, in);
}
}
@ -71,14 +75,14 @@ void TablesStatusResponse::write(WriteBuffer & out, UInt64 client_protocol_revis
throw Exception(ErrorCodes::LOGICAL_ERROR, "method TablesStatusResponse::write is called for unsupported client revision");
writeVarUInt(table_states_by_id.size(), out);
for (const auto & kv: table_states_by_id)
for (const auto & kv : table_states_by_id)
{
const QualifiedTableName & table_name = kv.first;
writeBinary(table_name.database, out);
writeBinary(table_name.table, out);
const TableStatus & status = kv.second;
status.write(out);
status.write(out, client_protocol_revision);
}
}
@ -100,7 +104,7 @@ void TablesStatusResponse::read(ReadBuffer & in, UInt64 server_protocol_revision
readBinary(table_name.table, in);
TableStatus status;
status.read(in);
status.read(in, server_protocol_revision);
table_states_by_id.emplace(std::move(table_name), std::move(status));
}
}

View File

@ -28,9 +28,11 @@ struct TableStatus
{
bool is_replicated = false;
UInt32 absolute_delay = 0;
/// Used to filter such nodes out for INSERTs
bool is_readonly = false;
void write(WriteBuffer & out) const;
void read(ReadBuffer & in);
void write(WriteBuffer & out, UInt64 client_protocol_revision) const;
void read(ReadBuffer & in, UInt64 server_protocol_revision);
};
struct TablesStatusRequest

View File

@ -237,10 +237,21 @@ FillingTransform::FillingTransform(
}
logDebug("fill description", dumpSortDescription(fill_description));
std::set<size_t> unique_positions;
std::unordered_set<size_t> ordinary_sort_positions;
for (const auto & desc : sort_description)
{
if (!desc.with_fill)
ordinary_sort_positions.insert(header_.getPositionByName(desc.column_name));
}
std::unordered_set<size_t> unique_positions;
for (auto pos : fill_column_positions)
{
if (!unique_positions.insert(pos).second)
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION, "Multiple WITH FILL for identical expressions is not supported in ORDER BY");
if (ordinary_sort_positions.contains(pos))
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION, "ORDER BY containing the same expression with and without WITH FILL modifier is not supported");
}
if (use_with_fill_by_sorting_prefix)
{

View File

@ -1124,6 +1124,7 @@ void TCPHandler::processTablesStatusRequest()
{
status.is_replicated = true;
status.absolute_delay = static_cast<UInt32>(replicated_table->getAbsoluteDelay());
status.is_readonly = replicated_table->isTableReadOnly();
}
else
status.is_replicated = false;

View File

@ -232,7 +232,8 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
connection = parent.pool->get(timeouts);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
connection = std::move(result.front().entry);
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
@ -289,7 +290,8 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
parent.storage.getContext()->getOpenTelemetrySpanLog());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto connection = parent.pool->get(timeouts);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
RemoteInserter remote(*connection, timeouts,

View File

@ -101,7 +101,7 @@ DistributedAsyncInsertDirectoryQueue::DistributedAsyncInsertDirectoryQueue(
StorageDistributed & storage_,
const DiskPtr & disk_,
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ConnectionPoolWithFailoverPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool)
: storage(storage_)
@ -237,7 +237,7 @@ void DistributedAsyncInsertDirectoryQueue::run()
}
ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage)
ConnectionPoolWithFailoverPtr DistributedAsyncInsertDirectoryQueue::createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage)
{
const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
{
@ -284,7 +284,7 @@ ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const Cluster
auto pools = createPoolsForAddresses(addresses, pool_factory, storage.log);
const auto settings = storage.getContext()->getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
return std::make_shared<ConnectionPoolWithFailover>(std::move(pools),
settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(),
settings.distributed_replica_error_cap);
@ -412,7 +412,9 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto connection = pool->get(timeouts, insert_settings);
auto result = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,
connection->getDescription(),

View File

@ -50,13 +50,13 @@ public:
StorageDistributed & storage_,
const DiskPtr & disk_,
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ConnectionPoolWithFailoverPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool);
~DistributedAsyncInsertDirectoryQueue();
static ConnectionPoolPtr createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage);
static ConnectionPoolWithFailoverPtr createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage);
void updatePath(const std::string & new_relative_path);
@ -111,7 +111,7 @@ private:
std::string getLoggerName() const;
StorageDistributed & storage;
const ConnectionPoolPtr pool;
const ConnectionPoolWithFailoverPtr pool;
DiskPtr disk;
std::string relative_path;

View File

@ -112,19 +112,17 @@ DistributedSink::DistributedSink(
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_,
const Names & columns_to_send_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, context(Context::createCopy(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(createInsertToRemoteTableQuery(main_table_.database_name, main_table_.table_name, columns_to_send_))
, query_ast(createInsertToRemoteTableQuery(storage.remote_storage.database_name, storage.remote_storage.table_name, columns_to_send_))
, query_string(queryToString(query_ast))
, cluster(cluster_)
, insert_sync(insert_sync_)
, allow_materialized(context->getSettingsRef().insert_allow_materialized_columns)
, insert_timeout(insert_timeout_)
, main_table(main_table_)
, columns_to_send(columns_to_send_.begin(), columns_to_send_.end())
, log(getLogger("DistributedSink"))
{
@ -372,10 +370,9 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
throw Exception(ErrorCodes::LOGICAL_ERROR, "There are several writing job for an automatically replicated shard");
/// TODO: it makes sense to rewrite skip_unavailable_shards and max_parallel_replicas here
auto results = shard_info.pool->getManyChecked(timeouts, settings, PoolMode::GET_ONE, main_table.getQualifiedName());
if (results.empty() || results.front().entry.isNull())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected exactly one connection for shard {}", toString(job.shard_index));
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
job.connection_entry = std::move(results.front().entry);
}
else

View File

@ -46,7 +46,6 @@ public:
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_,
const Names & columns_to_send_);
String getName() const override { return "DistributedSink"; }
@ -108,7 +107,6 @@ private:
/// Sync-related stuff
UInt64 insert_timeout; // in seconds
StorageID main_table;
NameSet columns_to_send;
Stopwatch watch;
Stopwatch watch_current_block;

View File

@ -334,6 +334,7 @@ StorageDistributed::StorageDistributed(
, remote_database(remote_database_)
, remote_table(remote_table_)
, remote_table_function_ptr(remote_table_function_ptr_)
, remote_storage(remote_table_function_ptr ? StorageID::createEmpty() : StorageID{remote_database, remote_table})
, log(getLogger("StorageDistributed (" + id_.table_name + ")"))
, owned_cluster(std::move(owned_cluster_))
, cluster_name(getContext()->getMacros()->expand(cluster_name_))
@ -896,10 +897,6 @@ void StorageDistributed::read(
return;
}
StorageID main_table = StorageID::createEmpty();
if (!remote_table_function_ptr)
main_table = StorageID{remote_database, remote_table};
const auto & snapshot_data = assert_cast<const SnapshotData &>(*storage_snapshot->data);
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
@ -932,7 +929,7 @@ void StorageDistributed::read(
query_plan,
header,
processed_stage,
main_table,
remote_storage,
remote_table_function_ptr,
select_stream_factory,
log,
@ -978,10 +975,8 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
else
columns_to_send = metadata_snapshot->getSampleBlockNonMaterialized().getNames();
/// DistributedSink will not own cluster
return std::make_shared<DistributedSink>(
local_context, *this, metadata_snapshot, cluster, insert_sync, timeout,
StorageID{remote_database, remote_table}, columns_to_send);
/// DistributedSink will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedSink>(local_context, *this, metadata_snapshot, cluster, insert_sync, timeout, columns_to_send);
}

View File

@ -241,6 +241,7 @@ private:
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
StorageID remote_storage;
LoggerPtr log;
@ -275,7 +276,7 @@ private:
struct ClusterNodeData
{
std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_queue;
ConnectionPoolPtr connection_pool;
ConnectionPoolWithFailoverPtr connection_pool;
Cluster::Addresses addresses;
size_t clusters_version;
};

View File

@ -70,6 +70,16 @@ static void removeNonCommonColumns(const Block & src_header, Block & target_head
target_header.erase(target_only_positions);
}
namespace
{
void checkTargetTableHasQueryOutputColumns(const ColumnsDescription & target_table_columns, const ColumnsDescription & select_query_output_columns)
{
for (const auto & column : select_query_output_columns)
if (!target_table_columns.has(column.name))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Column {} does not exist in the materialized view's inner table", column.name);
}
}
StorageMaterializedView::StorageMaterializedView(
const StorageID & table_id_,
ContextPtr local_context,
@ -402,11 +412,13 @@ void StorageMaterializedView::alter(
/// Check the materialized view's inner table structure.
if (has_inner_table)
{
const Block & block = InterpreterSelectWithUnionQuery::getSampleBlock(new_select.select_query, local_context);
const auto & inner_table_metadata = tryGetTargetTable()->getInMemoryMetadata().columns;
for (const auto & name : block.getNames())
if (!inner_table_metadata.has(name))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Column {} does not exist in the materialized view's inner table", name);
/// If this materialized view has an inner table it should always have the same columns as this materialized view.
/// Try to find mistakes in the select query (it shouldn't have columns which are not in the inner table).
auto target_table_metadata = getTargetTable()->getInMemoryMetadataPtr();
const auto & select_query_output_columns = new_metadata.columns; /// AlterCommands::alter() analyzed the query and assigned `new_metadata.columns` before.
checkTargetTableHasQueryOutputColumns(target_table_metadata->columns, select_query_output_columns);
/// We need to copy the target table's columns (after checkTargetTableHasQueryOutputColumns() they can still be different - e.g. the data types of those columns can differ).
new_metadata.columns = target_table_metadata->columns;
}
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);

View File

@ -351,7 +351,7 @@ public:
bool canUseZeroCopyReplication() const;
bool isTableReadOnly () { return is_readonly; }
bool isTableReadOnly () { return is_readonly || isStaticStorage(); }
std::optional<bool> hasMetadataInZooKeeper () { return has_metadata_in_zookeeper; }

View File

@ -245,6 +245,7 @@ const char * auto_contributors[] {
"Brendan Cox",
"Brett Hoerner",
"Brian Hunter",
"Brokenice0415",
"Bulat Gaifullin",
"Camden Cheek",
"Camilo Sierra",
@ -286,6 +287,7 @@ const char * auto_contributors[] {
"Dale Mcdiarmid",
"Dalitso Banda",
"Dan Roscigno",
"Dan Wu",
"DanRoscigno",
"Dani Pozo",
"Daniel Bershatsky",
@ -294,6 +296,7 @@ const char * auto_contributors[] {
"Daniel Kutenin",
"Daniel Pozo Escalona",
"Daniel Qin",
"Daniil Ivanik",
"Daniil Rubin",
"Danila Kutenin",
"Daniël van Eeden",
@ -634,6 +637,7 @@ const char * auto_contributors[] {
"LiuCong",
"LiuNeng",
"LiuYangkuan",
"LiuYuan",
"Lloyd-Pottiger",
"Lopatin Konstantin",
"Lorenzo Mangani",
@ -668,6 +672,7 @@ const char * auto_contributors[] {
"Marek Vavruša",
"Marek Vavruša",
"Mariano Benítez Mulet",
"Marina Fathouat",
"Mark Andreev",
"Mark Frost",
"Mark Needham",
@ -767,6 +772,7 @@ const char * auto_contributors[] {
"N. Kolotov",
"NIKITA MIKHAILOV",
"Narek Galstyan",
"Nataly Merezhuk",
"Natalya Chizhonkova",
"Natasha Murashkina",
"NeZeD [Mac Pro]",
@ -787,6 +793,7 @@ const char * auto_contributors[] {
"Nikhil Raman",
"Nikifor Seriakov",
"Nikita",
"Nikita Fomichev",
"Nikita Keba",
"Nikita Lapkov",
"Nikita Mikhailov",
@ -804,10 +811,12 @@ const char * auto_contributors[] {
"Nikolay Degterinsky",
"Nikolay Edigaryev",
"Nikolay Kirsh",
"Nikolay Monkov",
"Nikolay Semyachkin",
"Nikolay Shcheglov",
"Nikolay Vasiliev",
"Nikolay Volosatov",
"Nikolay Yankin",
"Nir Peled",
"Nityananda Gohain",
"Niu Zhaojie",
@ -831,11 +840,13 @@ const char * auto_contributors[] {
"Orkhan Zeynalli",
"Oskar Wojciski",
"OuO",
"Oxide Computer Company",
"PHO",
"Pablo Alegre",
"Pablo Marcos",
"Pablo Musa",
"Palash Goel",
"PapaToemmsn",
"Paramtamtam",
"Patrick Zippenfenig",
"Paul Loyd",
@ -859,7 +870,9 @@ const char * auto_contributors[] {
"Persiyanov Dmitriy Andreevich",
"Pervakov Grigorii",
"Pervakov Grigory",
"Peter",
"Petr Vasilev",
"Pham Anh Tuan",
"Philip Hallstrom",
"Philippe Ombredanne",
"PigInCloud",
@ -973,11 +986,14 @@ const char * auto_contributors[] {
"SevaCode",
"Seyed Mehrshad Hosseini",
"Shane Andrade",
"Shanfeng Pang",
"Shani Elharrar",
"Shaun Struwig",
"Sherry Wang",
"Shoh Jahon",
"Shri Bodas",
"Shuai li",
"Shubham Ranjan",
"Sichen Zhao",
"SiderZhang",
"Sidorov Pavel",
@ -1139,6 +1155,7 @@ const char * auto_contributors[] {
"Wangyang Guo",
"Waterkin",
"Weiqing Xu",
"William Schoeffel",
"William Shallum",
"Winter Zhang",
"Xbitz29",
@ -1252,6 +1269,7 @@ const char * auto_contributors[] {
"awesomeleo",
"bakam412",
"bbkas",
"beetelbrox",
"benamazing",
"benbiti",
"bgranvea",
@ -1261,6 +1279,7 @@ const char * auto_contributors[] {
"bkuschel",
"blazerer",
"bluebirddm",
"bluikko",
"bo zeng",
"bobrovskij artemij",
"booknouse",
@ -1309,6 +1328,7 @@ const char * auto_contributors[] {
"d.v.semenov",
"dalei2019",
"damozhaeva",
"danila-ermakov",
"dankondr",
"daoready",
"darkkeks",
@ -1324,6 +1344,7 @@ const char * auto_contributors[] {
"dheerajathrey",
"dimarub2000",
"dinosaur",
"divanik",
"divanorama",
"dkxiaohei",
"dmi-feo",
@ -1454,6 +1475,7 @@ const char * auto_contributors[] {
"joelynch",
"johanngan",
"johnnymatthews",
"josh-hildred",
"jsc0218",
"jthmath",
"jun won",
@ -1595,6 +1617,7 @@ const char * auto_contributors[] {
"nautaa",
"ndchikin",
"nellicus",
"nemonlou",
"neng.liu",
"never lee",
"ni1l",
@ -1637,6 +1660,7 @@ const char * auto_contributors[] {
"pufit",
"pyos",
"pzhdfy",
"qaziqarta",
"qianlixiang",
"qianmoQ",
"qieqieplus",
@ -1684,8 +1708,10 @@ const char * auto_contributors[] {
"sev7e0",
"sevirov",
"sfod",
"shabroo",
"shangshujie",
"shedx",
"shuai-xu",
"shuchaome",
"shuyang",
"sichenzhao",
@ -1710,6 +1736,7 @@ const char * auto_contributors[] {
"sundy-li",
"sundyli",
"sunlisheng",
"sunny",
"sunny19930321",
"svladykin",
"tai",
@ -1733,6 +1760,7 @@ const char * auto_contributors[] {
"tiger.yan",
"timfursov",
"tison",
"tomershafir",
"tomtana",
"topvisor",
"tpanetti",
@ -1740,6 +1768,7 @@ const char * auto_contributors[] {
"tyrionhuang",
"ubuntu",
"una",
"unashi",
"unbyte",
"unegare",
"unknown",
@ -1882,6 +1911,7 @@ const char * auto_contributors[] {
"董海镔",
"袁焊忠",
"谢磊",
"豪肥肥",
"贾顺名(Jarvis)",
"郭小龙",
"陈小玉",

View File

@ -24,6 +24,14 @@
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
namespace
{
constexpr auto * database_column_name = "database";
constexpr auto * table_column_name = "table";
constexpr auto * engine_column_name = "engine";
constexpr auto * active_column_name = "active";
constexpr auto * storage_uuid_column_name = "storage_uuid";
}
namespace DB
{
@ -112,7 +120,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database,
database_column_mut->insert(database.first);
}
block_to_filter.insert(ColumnWithTypeAndName(
std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
std::move(database_column_mut), std::make_shared<DataTypeString>(), database_column_name));
/// Filter block_to_filter with column 'database'.
if (filter_by_database)
@ -120,7 +128,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database,
rows = block_to_filter.rows();
/// Block contains new columns, update database_column.
ColumnPtr database_column_for_filter = block_to_filter.getByName("database").column;
ColumnPtr database_column_for_filter = block_to_filter.getByName(database_column_name).column;
if (rows)
{
@ -187,10 +195,10 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database,
}
}
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared<DataTypeUUID>(), "uuid"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), table_column_name));
block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), engine_column_name));
block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), active_column_name));
block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared<DataTypeUUID>(), storage_uuid_column_name));
if (rows)
{
@ -200,10 +208,10 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database,
rows = block_to_filter.rows();
}
database_column = block_to_filter.getByName("database").column;
table_column = block_to_filter.getByName("table").column;
active_column = block_to_filter.getByName("active").column;
storage_uuid_column = block_to_filter.getByName("uuid").column;
database_column = block_to_filter.getByName(database_column_name).column;
table_column = block_to_filter.getByName(table_column_name).column;
active_column = block_to_filter.getByName(active_column_name).column;
storage_uuid_column = block_to_filter.getByName(storage_uuid_column_name).column;
}
class ReadFromSystemPartsBase : public SourceStepWithFilter
@ -261,16 +269,16 @@ void ReadFromSystemPartsBase::applyFilters(ActionDAGNodes added_filter_nodes)
const auto * predicate = filter_actions_dag->getOutputs().at(0);
Block block;
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "database"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), database_column_name));
filter_by_database = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter_by_database)
VirtualColumnUtils::buildSetsForDAG(filter_by_database, context);
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "table"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "engine"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUInt8>(), "active"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUUID>(), "uuid"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), table_column_name));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), engine_column_name));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUInt8>(), active_column_name));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUUID>(), storage_uuid_column_name));
filter_by_other_columns = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter_by_other_columns)

View File

@ -71,6 +71,7 @@ namespace ErrorCodes
extern const int QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW;
extern const int SUPPORT_IS_DISABLED;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int UNSUPPORTED_METHOD;
}
@ -339,6 +340,13 @@ namespace
table_expr->children.push_back(table_expr->database_and_table_name);
return fetch_query;
}
void checkTargetTableHasQueryOutputColumns(const ColumnsDescription & target_table_columns, const ColumnsDescription & select_query_output_columns)
{
for (const auto & column : select_query_output_columns)
if (!target_table_columns.has(column.name))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Column {} does not exist in the window view's inner table", column.name);
}
}
static void extractDependentTable(ContextPtr context, ASTPtr & query, String & select_database_name, String & select_table_name)
@ -482,6 +490,18 @@ void StorageWindowView::alter(
new_metadata.setSelectQuery(new_select);
/// Check the window view's inner target table structure.
if (has_inner_target_table)
{
/// If this window view has an inner target table it should always have the same columns as this window view.
/// Try to find mistakes in the select query (it shouldn't have columns which are not in the inner target table).
auto target_table_metadata = getTargetTable()->getInMemoryMetadataPtr();
const auto & select_query_output_columns = new_metadata.columns; /// AlterCommands::alter() analyzed the query and assigned `new_metadata.columns` before.
checkTargetTableHasQueryOutputColumns(target_table_metadata->columns, select_query_output_columns);
/// We need to copy the target table's columns (after checkTargetTableHasQueryOutputColumns() they can still be different - e.g. in data types).
new_metadata.columns = target_table_metadata->columns;
}
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);

View File

@ -2,3 +2,9 @@
01624_soft_constraints
02354_vector_search_queries
02901_parallel_replicas_rollup
02999_scalar_subqueries_bug_2
# Flaky list
01825_type_json_in_array
01414_mutations_and_errors_zookeeper
# Check after ConstantNode refactoring
02154_parser_backtracking

View File

@ -12,7 +12,7 @@ from copy import deepcopy
from dataclasses import asdict, dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, Union
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
import docker_images_helper
import upload_result_helper
@ -733,6 +733,233 @@ class CiCache:
return await_finished
@dataclass
class CiOptions:
# job will be included in the run if any keyword from the list matches job name
include_keywords: Optional[List[str]] = None
# job will be excluded from the run if any keyword from the list matches job name
exclude_keywords: Optional[List[str]] = None
# list of specified preconfigured ci sets to run
ci_sets: Optional[List[str]] = None
# list of specified jobs to run
ci_jobs: Optional[List[str]] = None
# batches to run for all multi-batch jobs
job_batches: Optional[List[int]] = None
do_not_test: bool = False
no_ci_cache: bool = False
no_merge_commit: bool = False
def as_dict(self) -> Dict[str, Any]:
return asdict(self)
@staticmethod
def create_from_run_config(run_config: Dict[str, Any]) -> "CiOptions":
return CiOptions(**run_config["ci_options"])
@staticmethod
def create_from_pr_message(
debug_message: Optional[str], update_from_api: bool
) -> "CiOptions":
"""
Creates CiOptions instance based on tags found in PR body and/or commit message
@debug_message - may be provided directly for debugging purposes, otherwise the commit message will be retrieved from git.
"""
res = CiOptions()
pr_info = PRInfo()
if (
not pr_info.is_pr() and not debug_message
): # if debug_message is provided it's a test/debug scenario - do not return early
# CI options can be configured in PRs only
return res
message = debug_message or GitRunner(set_cwd_to_git_root=True).run(
f"{GIT_PREFIX} log {pr_info.sha} --format=%B -n 1"
)
pattern = r"(#|- \[x\] +<!---)(\w+)"
matches = [match[-1] for match in re.findall(pattern, message)]
print(f"CI tags from commit message: [{matches}]")
if not debug_message: # to be skipped if debug/test
pr_info = PRInfo(
pr_event_from_api=update_from_api
) # Fetch updated PR body from GH API
matches_pr = [match[-1] for match in re.findall(pattern, pr_info.body)]
print(f"CI tags from PR body: [{matches_pr}]")
matches = list(set(matches + matches_pr))
if "do not test" in pr_info.labels:
# do_not_test could be set in GH labels
res.do_not_test = True
for match in matches:
if match.startswith("job_"):
if not res.ci_jobs:
res.ci_jobs = []
res.ci_jobs.append(match.removeprefix("job_"))
elif match.startswith("ci_set_") and match in Labels:
if not res.ci_sets:
res.ci_sets = []
res.ci_sets.append(match)
elif match.startswith("ci_include_"):
if not res.include_keywords:
res.include_keywords = []
res.include_keywords.append(
normalize_check_name(match.removeprefix("ci_include_"))
)
elif match.startswith("ci_exclude_"):
if not res.exclude_keywords:
res.exclude_keywords = []
res.exclude_keywords.append(
normalize_check_name(match.removeprefix("ci_exclude_"))
)
elif match == Labels.NO_CI_CACHE:
res.no_ci_cache = True
print("NOTE: CI Cache will be disabled")
elif match == Labels.DO_NOT_TEST_LABEL:
res.do_not_test = True
elif match == Labels.NO_MERGE_COMMIT:
res.no_merge_commit = True
print("NOTE: Merge Commit will be disabled")
elif match.startswith("batch_"):
batches = []
try:
batches = [
int(batch) for batch in match.removeprefix("batch_").split("_")
]
except Exception:
print(f"ERROR: failed to parse commit tag [{match}] - skip")
if batches:
if not res.job_batches:
res.job_batches = []
res.job_batches += batches
res.job_batches = list(set(res.job_batches))
else:
print(
f"WARNING: Invalid tag in commit message or PR body [{match}] - skip"
)
return res
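# --- Editor's illustrative sketch (not part of this commit) ----------------------------
# A minimal, standalone demonstration of how the tag pattern above is assumed to behave:
# it extracts the tag name both from a checked markdown checkbox in the PR body and from
# a "#tag" token in a commit message. The sample names below are hypothetical.
import re as _re
_sample_message = "- [x] <!---ci_include_integration--> Integration Tests\n#no_ci_cache"
_sample_pattern = r"(#|- \[x\] +<!---)(\w+)"
assert [m[-1] for m in _re.findall(_sample_pattern, _sample_message)] == [
    "ci_include_integration",
    "no_ci_cache",
]
# ----------------------------------------------------------------------------------------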
def apply(
self,
jobs_to_do: List[str],
jobs_to_skip: List[str],
jobs_params: Dict[str, Dict[str, Any]],
) -> Tuple[List[str], List[str], Dict[str, Dict[str, Any]]]:
"""
Applies specified options on CI Run Config
Returns updated jobs_to_do, jobs_to_skip, jobs_params
"""
jobs_to_do_requested = [] # type: List[str]
# -1. Handle "ci_exclude_" tags if any
if self.exclude_keywords:
new_jobs_to_do = list(jobs_to_do)
for job in jobs_to_do:
found = False
for keyword in self.exclude_keywords:
if keyword in normalize_check_name(job):
print(
f"Job [{job}] matches Exclude keyword [{keyword}] - remove"
)
found = True
break
if found:
new_jobs_to_do.remove(job)
jobs_to_do = new_jobs_to_do
# 0. Handle "ci_include_" tags if any
if self.include_keywords:
for job in jobs_to_do:
found = False
for keyword in self.include_keywords:
if keyword in normalize_check_name(job):
print(f"Job [{job}] matches Include keyword [{keyword}] - add")
found = True
break
if found:
job_with_parents = CI_CONFIG.get_job_with_parents(job)
for job in job_with_parents:
if job in jobs_to_do and job not in jobs_to_do_requested:
jobs_to_do_requested.append(job)
assert (
jobs_to_do_requested
), "Include tags are set but now job configured - Invalid tags, probably [{self.include_keywords}]"
if JobNames.STYLE_CHECK not in jobs_to_do_requested:
# Style check must not be omitted
jobs_to_do_requested.append(JobNames.STYLE_CHECK)
# FIXME: to be removed in favor of include/exclude
# 1. Handle "ci_set_" tags if any
if self.ci_sets:
for tag in self.ci_sets:
label_config = CI_CONFIG.get_label_config(tag)
assert label_config, f"Unknonwn tag [{tag}]"
print(
f"NOTE: CI Set's tag: [{tag}], add jobs: [{label_config.run_jobs}]"
)
jobs_to_do_requested += label_config.run_jobs
# FIXME: to be removed in favor of include/exclude
# 2. Handle "job_" tags if any
if self.ci_jobs:
for job in self.ci_jobs:
job_with_parents = CI_CONFIG.get_job_with_parents(job)
print(
f"NOTE: CI Job's tag: [#job_{job}], add jobs: [{job_with_parents}]"
)
# always add requested job itself, even if it could be skipped
jobs_to_do_requested.append(job_with_parents[0])
for parent in job_with_parents[1:]:
if parent in jobs_to_do and parent not in jobs_to_do_requested:
jobs_to_do_requested.append(parent)
# 3. Handle "do not test"
if self.do_not_test:
label_config = CI_CONFIG.get_label_config(Labels.DO_NOT_TEST_LABEL)
assert label_config
print(
f"NOTE: CI 'do not test' setting applied, set jobs: [{label_config.run_jobs}]"
)
if jobs_to_do_requested:
print(
"WARNING: 'do not test' is used alongside with other CI modifying tags - 'do not test' prevails"
)
jobs_to_do_requested = list(label_config.run_jobs)
if jobs_to_do_requested:
jobs_to_do_requested = list(set(jobs_to_do_requested))
print(
f"NOTE: Only specific job(s) were requested by user's input: [{jobs_to_do_requested}]"
)
jobs_to_do = list(
set(job for job in jobs_to_do_requested if job not in jobs_to_skip)
)
# if requested job does not have params in jobs_params (it happens for "run_by_label" job)
# we need to add params - otherwise it won't run as "batches" list will be empty
for job in jobs_to_do:
if job not in jobs_params:
num_batches = CI_CONFIG.get_job_config(job).num_batches
jobs_params[job] = {
"batches": list(range(num_batches)),
"num_batches": num_batches,
}
# 4. Handle "batch_" tags
if self.job_batches:
print(
f"NOTE: Only specific job batches were requested [{self.job_batches}]"
)
for job, params in jobs_params.items():
if params["num_batches"] > 1:
params["batches"] = self.job_batches
return jobs_to_do, jobs_to_skip, jobs_params
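# --- Editor's illustrative sketch (not part of this commit) ----------------------------
# The include/exclude keywords above are matched as plain substrings against a normalized
# job name. Assuming a normalizer that lowercases the name and replaces non-alphanumeric
# characters with underscores (a hypothetical stand-in for normalize_check_name), the
# filtering step boils down to:
def _normalize_job_name(name: str) -> str:
    return "".join(c if c.isalnum() else "_" for c in name.lower())

def _keyword_matches(keywords, job_name) -> bool:
    return any(keyword in _normalize_job_name(job_name) for keyword in keywords)

assert _keyword_matches(["integration"], "Integration tests (asan, old analyzer)")
assert not _keyword_matches(["tsan"], "Stateless tests (release)")
# ----------------------------------------------------------------------------------------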
def get_check_name(check_name: str, batch: int, num_batches: int) -> str:
res = check_name
if num_batches > 1:
@ -1095,8 +1322,7 @@ def _configure_jobs(
job_digester: JobDigester,
s3: S3Helper,
pr_info: PRInfo,
commit_tokens: List[str],
ci_cache_disabled: bool,
ci_options: CiOptions,
) -> Dict:
## a. digest each item from the config
job_digester = JobDigester()
@ -1106,7 +1332,7 @@ def _configure_jobs(
digests: Dict[str, str] = {}
print("::group::Job Digests")
for job in CI_CONFIG.job_generator(pr_info.head_ref):
for job in CI_CONFIG.job_generator(pr_info.head_ref if CI else "dummy_branch_name"):
digest = job_digester.get_job_digest(CI_CONFIG.get_digest_config(job))
digests[job] = digest
print(f" job [{job.rjust(50)}] has digest [{digest}]")
@ -1114,7 +1340,7 @@ def _configure_jobs(
## b. check what we need to run
ci_cache = None
if not ci_cache_disabled and CI:
if not ci_options.no_ci_cache and CI:
ci_cache = CiCache(s3, digests).update()
ci_cache.print_status()
@ -1204,91 +1430,9 @@ def _configure_jobs(
job for job in jobs_to_do if job not in jobs_to_remove_randomization
]
## c. check CI controlling labels and commit messages
if pr_info.labels:
jobs_requested_by_label = [] # type: List[str]
ci_controlling_labels = [] # type: List[str]
for label in pr_info.labels:
label_config = CI_CONFIG.get_label_config(label)
if label_config:
jobs_requested_by_label += label_config.run_jobs
ci_controlling_labels += [label]
if ci_controlling_labels:
print(f"NOTE: CI controlling labels are set: [{ci_controlling_labels}]")
print(
f" : following jobs will be executed: [{jobs_requested_by_label}]"
)
# so far there is only "do not test" label in the config that runs only Style check.
# check later if we need to filter out requested jobs using ci cache. right now we do it:
jobs_to_do = [job for job in jobs_requested_by_label if job in jobs_to_do]
if commit_tokens:
jobs_to_do_requested = [] # type: List[str]
# handle ci set tokens
ci_controlling_tokens = [
token for token in commit_tokens if token in CI_CONFIG.label_configs
]
for token_ in ci_controlling_tokens:
label_config = CI_CONFIG.get_label_config(token_)
assert label_config, f"Unknonwn token [{token_}]"
print(f"NOTE: CI modifier: [{token_}], add jobs: [{label_config.run_jobs}]")
jobs_to_do_requested += label_config.run_jobs
# handle specific job requests
requested_jobs = [
token[len("job_") :] for token in commit_tokens if token.startswith("job_")
]
if requested_jobs:
assert any(
len(x) > 1 for x in requested_jobs
), f"Invalid job names requested [{requested_jobs}]"
for job in requested_jobs:
job_with_parents = CI_CONFIG.get_job_with_parents(job)
print(
f"NOTE: CI modifier: [#job_{job}], add jobs: [{job_with_parents}]"
)
# always add requested job itself, even if it could be skipped
jobs_to_do_requested.append(job_with_parents[0])
for parent in job_with_parents[1:]:
if parent in jobs_to_do and parent not in jobs_to_do_requested:
jobs_to_do_requested.append(parent)
if jobs_to_do_requested:
jobs_to_do_requested = list(set(jobs_to_do_requested))
print(
f"NOTE: Only specific job(s) were requested by commit message tokens: [{jobs_to_do_requested}]"
)
jobs_to_do = list(
set(job for job in jobs_to_do_requested if job not in jobs_to_skip)
)
# if requested job does not have params in jobs_params (it happens for "run_by_label" job)
# we need to add params - otherwise it won't run as "batches" list will be empty
for job in jobs_to_do:
if job not in jobs_params:
num_batches = CI_CONFIG.get_job_config(job).num_batches
jobs_params[job] = {
"batches": list(range(num_batches)),
"num_batches": num_batches,
}
requested_batches = set()
for token in commit_tokens:
if token.startswith("batch_"):
try:
batches = [
int(batch) for batch in token.removeprefix("batch_").split("_")
]
except Exception:
print(f"ERROR: failed to parse commit tag [{token}]")
requested_batches.update(batches)
if requested_batches:
print(
f"NOTE: Only specific job batches were requested [{list(requested_batches)}]"
)
for job, params in jobs_params.items():
if params["num_batches"] > 1:
params["batches"] = list(requested_batches)
jobs_to_do, jobs_to_skip, jobs_params = ci_options.apply(
jobs_to_do, jobs_to_skip, jobs_params
)
if pr_info.is_merge_queue():
# FIXME: Quick support for MQ workflow which is only StyleCheck for now
@ -1349,7 +1493,7 @@ def _create_gh_status(
def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None:
if indata["ci_flags"][Labels.NO_CI_CACHE]:
if CiOptions.create_from_run_config(indata).no_ci_cache:
print("CI cache is disabled - skip restoring commit statuses from CI cache")
return
job_digests = indata["jobs_data"]["digests"]
@ -1690,25 +1834,14 @@ def main() -> int:
### CONFIGURE action: start
if args.configure:
# if '#no_merge_commit' is set in commit message - set git ref to PR branch head to avoid merge-commit
tokens = []
ci_flags = {
Labels.NO_MERGE_COMMIT: False,
Labels.NO_CI_CACHE: False,
}
if (pr_info.number != 0 and not args.skip_jobs) or args.commit_message:
message = args.commit_message or git_runner.run(
f"{GIT_PREFIX} log {pr_info.sha} --format=%B -n 1"
)
tokens = _fetch_commit_tokens(message, pr_info)
if Labels.NO_MERGE_COMMIT in tokens and CI:
git_runner.run(f"{GIT_PREFIX} checkout {pr_info.sha}")
git_ref = git_runner.run(f"{GIT_PREFIX} rev-parse HEAD")
ci_flags[Labels.NO_MERGE_COMMIT] = True
print("NOTE: Disable Merge Commit")
if Labels.NO_CI_CACHE in tokens:
ci_flags[Labels.NO_CI_CACHE] = True
print("NOTE: Disable CI Cache")
ci_options = CiOptions.create_from_pr_message(
args.commit_message or None, update_from_api=True
)
# tokens = _fetch_commit_tokens(message, pr_info)
if ci_options.no_merge_commit and CI:
git_runner.run(f"{GIT_PREFIX} checkout {pr_info.sha}")
git_ref = git_runner.run(f"{GIT_PREFIX} rev-parse HEAD")
docker_data = {}
git_ref = git_runner.run(f"{GIT_PREFIX} rev-parse HEAD")
@ -1735,8 +1868,7 @@ def main() -> int:
job_digester,
s3,
pr_info,
tokens,
ci_flags[Labels.NO_CI_CACHE],
ci_options,
)
if not args.skip_jobs
else {}
@ -1790,7 +1922,7 @@ def main() -> int:
result["version"] = version
result["build"] = build_digest
result["docs"] = docs_digest
result["ci_flags"] = ci_flags
result["ci_options"] = ci_options.as_dict()
if not args.skip_jobs:
result["stages_data"] = _generate_ci_stage_config(jobs_data)
result["jobs_data"] = jobs_data
@ -1805,6 +1937,7 @@ def main() -> int:
### RUN action: start
elif args.run:
assert indata
ci_options = CiOptions.create_from_run_config(indata)
check_name = args.job_name
check_name_with_group = _get_ext_check_name(check_name)
print(
@ -1815,7 +1948,7 @@ def main() -> int:
# this is a build job - check if build report is present
build_result = (
BuildResult.load_any(check_name, pr_info.number, pr_info.head_ref)
if not indata["ci_flags"][Labels.NO_CI_CACHE]
if not ci_options.no_ci_cache
else None
)
if build_result:
@ -1853,7 +1986,7 @@ def main() -> int:
print("::endgroup::")
# ci cache check
if not previous_status and not indata["ci_flags"][Labels.NO_CI_CACHE]:
if not previous_status and not ci_options.no_ci_cache:
ci_cache = CiCache(s3, indata["jobs_data"]["digests"]).update()
job_config = CI_CONFIG.get_job_config(check_name)
if ci_cache.is_successful(

View File

@ -99,7 +99,7 @@ class JobNames(metaclass=WithIter):
STATELESS_TEST_MSAN = "Stateless tests (msan)"
STATELESS_TEST_UBSAN = "Stateless tests (ubsan)"
STATELESS_TEST_ANALYZER_S3_REPLICATED_RELEASE = (
"Stateless tests (release, analyzer, s3, DatabaseReplicated)"
"Stateless tests (release, old analyzer, s3, DatabaseReplicated)"
)
# merged into STATELESS_TEST_ANALYZER_S3_REPLICATED_RELEASE:
# STATELESS_TEST_ANALYZER_RELEASE = "Stateless tests (release, analyzer)"
@ -132,7 +132,7 @@ class JobNames(metaclass=WithIter):
INTEGRATION_TEST = "Integration tests (release)"
INTEGRATION_TEST_ASAN = "Integration tests (asan)"
INTEGRATION_TEST_ASAN_ANALYZER = "Integration tests (asan, analyzer)"
INTEGRATION_TEST_ASAN_ANALYZER = "Integration tests (asan, old analyzer)"
INTEGRATION_TEST_TSAN = "Integration tests (tsan)"
INTEGRATION_TEST_ARM = "Integration tests (aarch64)"
INTEGRATION_TEST_FLAKY = "Integration tests flaky check (asan)"

View File

@ -47,6 +47,7 @@ def get_fasttest_cmd(
f"-e SCCACHE_BUCKET={S3_BUILDS_BUCKET} -e SCCACHE_S3_KEY_PREFIX=ccache/sccache "
"-e stage=clone_submodules "
f"--volume={workspace}:/fasttest-workspace --volume={repo_path}:/ClickHouse "
f"--volume={repo_path}/tests/analyzer_tech_debt.txt:/analyzer_tech_debt.txt "
f"--volume={output_path}:/test_output {image}"
)

View File

@ -39,7 +39,7 @@ def get_additional_envs(
result.append("USE_S3_STORAGE_FOR_MERGE_TREE=1")
result.append("RANDOMIZE_OBJECT_KEY_TYPE=1")
if "analyzer" in check_name:
result.append("USE_NEW_ANALYZER=1")
result.append("USE_OLD_ANALYZER=1")
if run_by_hash_total != 0:
result.append(f"RUN_BY_HASH_NUM={run_by_hash_num}")
@ -94,7 +94,7 @@ def get_run_command(
env_str = " ".join(envs)
volume_with_broken_test = (
f"--volume={repo_path}/tests/analyzer_tech_debt.txt:/analyzer_tech_debt.txt "
if "analyzer" in check_name
if "analyzer" not in check_name
else ""
)

View File

@ -74,7 +74,7 @@ def get_env_for_runner(
my_env["CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER"] = "0"
if "analyzer" in check_name.lower():
my_env["CLICKHOUSE_USE_NEW_ANALYZER"] = "1"
my_env["CLICKHOUSE_USE_OLD_ANALYZER"] = "1"
return my_env

View File

@ -265,7 +265,7 @@ class ClickhouseIntegrationTestsRunner:
self.start_time = time.time()
self.soft_deadline_time = self.start_time + (TASK_TIMEOUT - MAX_TIME_IN_SANDBOX)
self.use_analyzer = os.environ.get("CLICKHOUSE_USE_NEW_ANALYZER") is not None
self.use_analyzer = os.environ.get("CLICKHOUSE_USE_OLD_ANALYZER") is not None
if "run_by_hash_total" in self.params:
self.run_by_hash_total = self.params["run_by_hash_total"]

View File

@ -33,7 +33,7 @@ def get_additional_envs(check_name, run_by_hash_num, run_by_hash_total):
result.append("USE_S3_STORAGE_FOR_MERGE_TREE=1")
result.append("RANDOMIZE_OBJECT_KEY_TYPE=1")
if "analyzer" in check_name:
result.append("USE_NEW_ANALYZER=1")
result.append("USE_OLD_ANALYZER=1")
if run_by_hash_total != 0:
result.append(f"RUN_BY_HASH_NUM={run_by_hash_num}")

View File

@ -5,7 +5,6 @@ import logging
import os
from commit_status_helper import get_commit, post_commit_status
from env_helper import GITHUB_JOB_URL
from get_robot_token import get_best_robot_token
from git_helper import commit as commit_arg
from github_helper import GitHub
@ -33,7 +32,6 @@ def main():
help="if given, used instead of one from PRInfo",
)
args = parser.parse_args()
url = ""
description = "the release can be created from the commit, manually set"
pr_info = None
if not args.commit:
@ -41,7 +39,6 @@ def main():
if pr_info.event == pr_info.default_event:
raise ValueError("neither launched from the CI nor commit is given")
args.commit = pr_info.sha
url = GITHUB_JOB_URL()
description = "the release can be created from the commit"
args.token = args.token or get_best_robot_token()
@ -52,7 +49,7 @@ def main():
post_commit_status(
commit,
SUCCESS,
url,
"",
description,
RELEASE_READY_STATUS,
pr_info,

121
tests/ci/test_ci_options.py Normal file
View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
# type: ignore
import unittest
from ci import CiOptions
from ci_config import JobNames
_TEST_BODY_1 = """
#### Run only:
- [x] <!---ci_set_integration--> Integration tests
- [ ] <!---ci_set_arm--> Integration tests (arm64)
- [x] <!---ci_include_foo--> Integration tests
- [x] <!---ci_include_foo_Bar--> Integration tests
- [ ] <!---ci_include_bar--> Integration tests
- [x] <!---ci_exclude_foo--> some invalid mask - should be skipped
- [x] <!---ci_exclude_Foo_bar--> Integration tests
- [ ] <!---ci_exclude_bar--> Integration tests
#### CI options:
- [ ] <!---do_not_test--> do not test (only style check)
- [x] <!---no_merge_commit--> disable merge-commit (no merge from master before tests)
- [ ] <!---no_ci_cache--> disable CI cache (job reuse)
#### Only specified batches in multi-batch jobs:
- [x] <!---batch_0--> 1
- [ ] <!---batch_1--> 2
"""
_TEST_BODY_2 = """
- [x] <!---ci_include_integration--> MUST include integration tests
- [x] <!---ci_include_stateless--> MUST include stateless tests
- [x] <!---ci_include_foo_Bar--> no action must be applied
- [ ] <!---ci_include_bar--> no action must be applied
- [x] <!---ci_exclude_tsan--> MUST exclude tsan
- [x] <!---ci_exclude_aarch64--> MUST exclude aarch64
- [x] <!---ci_exclude_analyzer--> MUST exclude tests with analyzer
- [ ] <!---ci_exclude_bar--> no action applied
- [x] <!---ci_exclude_s3_storage--> Must exclude stateless tests with s3 storage
- [x] <!---ci_exclude_coverage--> Must exclude tests on coverage build
"""
_TEST_BODY_3 = """
- [x] <!---ci_include_analyzer--> Must include all tests for analyzer
"""
class TestCIOptions(unittest.TestCase):
def test_pr_body_parsing(self):
ci_options = CiOptions.create_from_pr_message(
_TEST_BODY_1, update_from_api=False
)
self.assertFalse(ci_options.do_not_test)
self.assertFalse(ci_options.no_ci_cache)
self.assertTrue(ci_options.no_merge_commit)
self.assertEqual(ci_options.ci_sets, ["ci_set_integration"])
self.assertCountEqual(ci_options.include_keywords, ["foo", "foo_bar"])
self.assertCountEqual(ci_options.exclude_keywords, ["foo", "foo_bar"])
def test_options_applied(self):
self.maxDiff = None
ci_options = CiOptions.create_from_pr_message(
_TEST_BODY_2, update_from_api=False
)
self.assertCountEqual(
ci_options.include_keywords, ["integration", "foo_bar", "stateless"]
)
self.assertCountEqual(
ci_options.exclude_keywords,
["tsan", "aarch64", "analyzer", "s3_storage", "coverage"],
)
jobs_to_do = list(JobNames)
jobs_to_skip = []
job_params = {}
jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
jobs_to_do, jobs_to_skip, job_params
)
self.assertCountEqual(
jobs_to_do,
[
"Style check",
"package_release",
"package_asan",
"package_ubsan",
"package_debug",
"package_msan",
"Stateless tests (asan)",
"Stateless tests flaky check (asan)",
"Stateless tests (msan)",
"Stateless tests (ubsan)",
"Stateless tests (debug)",
"Stateless tests (release)",
"Integration tests (release)",
"Integration tests (asan)",
"Integration tests flaky check (asan)",
],
)
def test_options_applied_2(self):
self.maxDiff = None
ci_options = CiOptions.create_from_pr_message(
_TEST_BODY_3, update_from_api=False
)
self.assertCountEqual(ci_options.include_keywords, ["analyzer"])
self.assertIsNone(ci_options.exclude_keywords)
jobs_to_do = list(JobNames)
jobs_to_skip = []
job_params = {}
jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
jobs_to_do, jobs_to_skip, job_params
)
self.assertCountEqual(
jobs_to_do,
[
"Style check",
"Integration tests (asan, old analyzer)",
"package_release",
"Stateless tests (release, old analyzer, s3, DatabaseReplicated)",
"package_asan",
],
)

View File

@ -92,13 +92,21 @@ def upload_results(
else:
raw_log_url = GITHUB_JOB_URL()
try:
job_url = GITHUB_JOB_URL()
except Exception:
print(
"ERROR: Failed to get job URL from GH API, job report will use run URL instead."
)
job_url = GITHUB_RUN_URL
if test_results or not ready_report_url:
html_report = create_test_html_report(
check_name,
test_results,
raw_log_url,
GITHUB_RUN_URL,
GITHUB_JOB_URL(),
job_url,
branch_url,
branch_name,
commit_url,

View File

@ -67,6 +67,21 @@
</replica>
</shard>
</test_cluster_two_replicas_different_databases>
<test_cluster_two_replicas_different_databases_internal_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>shard_0</default_database>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<default_database>shard_1</default_database>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_replicas_different_databases_internal_replication>
<test_cluster_interserver_secret>
<secret>123457</secret>
<shard>

View File

@ -92,7 +92,7 @@ ln -sf $SRC_PATH/users.d/nonconst_timezone.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/allow_introspection_functions.yaml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/replicated_ddl_entry.xml $DEST_SERVER_PATH/users.d/
if [[ -n "$USE_NEW_ANALYZER" ]] && [[ "$USE_NEW_ANALYZER" -eq 1 ]]; then
if [[ -n "$USE_OLD_ANALYZER" ]] && [[ "$USE_OLD_ANALYZER" -eq 1 ]]; then
ln -sf $SRC_PATH/users.d/analyzer.xml $DEST_SERVER_PATH/users.d/
fi

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_analyzer>1</allow_experimental_analyzer>
<allow_experimental_analyzer>0</allow_experimental_analyzer>
</default>
</profiles>
</clickhouse>

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_analyzer>1</allow_experimental_analyzer>
<allow_experimental_analyzer>0</allow_experimental_analyzer>
</default>
</profiles>
</clickhouse>

View File

@ -1601,7 +1601,7 @@ class ClickHouseCluster:
with_jdbc_bridge=False,
with_hive=False,
with_coredns=False,
allow_analyzer=True,
use_old_analyzer=False,
hostname=None,
env_variables=None,
instance_env_variables=False,
@ -1700,7 +1700,7 @@ class ClickHouseCluster:
with_coredns=with_coredns,
with_cassandra=with_cassandra,
with_ldap=with_ldap,
allow_analyzer=allow_analyzer,
use_old_analyzer=use_old_analyzer,
server_bin_path=self.server_bin_path,
odbc_bridge_bin_path=self.odbc_bridge_bin_path,
library_bridge_bin_path=self.library_bridge_bin_path,
@ -3262,7 +3262,7 @@ class ClickHouseInstance:
with_coredns,
with_cassandra,
with_ldap,
allow_analyzer,
use_old_analyzer,
server_bin_path,
odbc_bridge_bin_path,
library_bridge_bin_path,
@ -3356,7 +3356,7 @@ class ClickHouseInstance:
self.with_hive = with_hive
self.with_coredns = with_coredns
self.coredns_config_dir = p.abspath(p.join(base_path, "coredns_config"))
self.allow_analyzer = allow_analyzer
self.use_old_analyzer = use_old_analyzer
self.main_config_name = main_config_name
self.users_config_name = users_config_name
@ -4405,10 +4405,7 @@ class ClickHouseInstance:
)
write_embedded_config("0_common_instance_users.xml", users_d_dir)
if (
os.environ.get("CLICKHOUSE_USE_NEW_ANALYZER") is not None
and self.allow_analyzer
):
if self.use_old_analyzer:
write_embedded_config("0_common_enable_analyzer.xml", users_d_dir)
if len(self.custom_dictionaries_paths):

View File

@ -387,7 +387,7 @@ if __name__ == "__main__":
use_analyzer = ""
if args.analyzer:
use_analyzer = "-e CLICKHOUSE_USE_NEW_ANALYZER=1"
use_analyzer = "-e CLICKHOUSE_USE_OLD_ANALYZER=1"
# NOTE: since pytest options is in the argument value already we need to additionally escape '"'
pytest_opts = " ".join(

View File

@ -10,13 +10,12 @@ node1 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/wide_parts_only.xml", "configs/no_compress_marks.xml"],
with_zookeeper=True,
allow_analyzer=False,
use_old_analyzer=True,
)

View File

@ -9,10 +9,9 @@ node1 = cluster.add_instance(
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
with_installed_binary=True,
allow_analyzer=False,
)
node2 = cluster.add_instance("node2", with_zookeeper=True, allow_analyzer=False)
node3 = cluster.add_instance("node3", with_zookeeper=True, allow_analyzer=False)
node2 = cluster.add_instance("node2", with_zookeeper=True, use_old_analyzer=True)
node3 = cluster.add_instance("node3", with_zookeeper=True, use_old_analyzer=True)
@pytest.fixture(scope="module")

View File

@ -10,7 +10,6 @@ node1 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
@ -19,10 +18,9 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node3 = cluster.add_instance("node3", with_zookeeper=False, allow_analyzer=False)
node4 = cluster.add_instance("node4", with_zookeeper=False, allow_analyzer=False)
node3 = cluster.add_instance("node3", with_zookeeper=False, use_old_analyzer=True)
node4 = cluster.add_instance("node4", with_zookeeper=False, use_old_analyzer=True)
@pytest.fixture(scope="module")

View File

@ -9,7 +9,6 @@ node = cluster.add_instance(
stay_alive=True,
with_zookeeper=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -3,7 +3,7 @@ import pytest
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=False, allow_analyzer=False)
node1 = cluster.add_instance("node1", with_zookeeper=False, use_old_analyzer=True)
node2 = cluster.add_instance(
"node2",
with_zookeeper=False,
@ -11,7 +11,6 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -9,13 +9,12 @@ from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
upstream = cluster.add_instance("upstream", allow_analyzer=False)
upstream = cluster.add_instance("upstream", use_old_analyzer=True)
backward = cluster.add_instance(
"backward",
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -7,13 +7,12 @@ import pytest
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
cluster = ClickHouseCluster(__file__)
upstream_node = cluster.add_instance("upstream_node", allow_analyzer=False)
upstream_node = cluster.add_instance("upstream_node", use_old_analyzer=True)
old_node = cluster.add_instance(
"old_node",
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -10,7 +10,6 @@ node = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -10,7 +10,6 @@ node1 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
@ -19,9 +18,8 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node3 = cluster.add_instance("node3", with_zookeeper=False, allow_analyzer=False)
node3 = cluster.add_instance("node3", with_zookeeper=False, use_old_analyzer=True)
@pytest.fixture(scope="module")

View File

@ -3,7 +3,7 @@ import pytest
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=False, allow_analyzer=False)
node1 = cluster.add_instance("node1", with_zookeeper=False, use_old_analyzer=True)
node2 = cluster.add_instance(
"node2",
with_zookeeper=False,
@ -11,7 +11,6 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -3,7 +3,7 @@ import pytest
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=False, allow_analyzer=False)
node1 = cluster.add_instance("node1", with_zookeeper=False, use_old_analyzer=True)
node2 = cluster.add_instance(
"node2",
with_zookeeper=False,
@ -11,7 +11,6 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -10,7 +10,6 @@ node1 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
@ -19,9 +18,8 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node3 = cluster.add_instance("node3", with_zookeeper=False, allow_analyzer=False)
node3 = cluster.add_instance("node3", with_zookeeper=False, use_old_analyzer=True)
@pytest.fixture(scope="module")

View File

@ -11,7 +11,6 @@ node_old = cluster.add_instance(
stay_alive=True,
with_installed_binary=True,
with_zookeeper=True,
allow_analyzer=False,
)
node_new = cluster.add_instance(
"node2",
@ -21,7 +20,7 @@ node_new = cluster.add_instance(
],
with_zookeeper=True,
stay_alive=True,
allow_analyzer=False,
use_old_analyzer=True,
)

View File

@ -223,14 +223,17 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):
query_id = node.query(
f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c)"
).strip()
node.query("SYSTEM FLUSH LOGS")
res = node.query(
f"""
SELECT query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log
WHERE query_id='{query_id}' AND type='QueryFinish'
"""
)
for _ in range(10):
node.query("SYSTEM FLUSH LOGS")
res = node.query(
f"""
SELECT query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log
WHERE query_id='{query_id}' AND type='QueryFinish'
"""
)
if res != "":
break
if res == "":
res = node.query(
"""
@ -238,7 +241,7 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):
FROM system.query_log ORDER BY query_start_time_microseconds DESC
"""
)
print(f"LOG: {res}")
print(f"Looked for query id {query_id}, but to no avail: {res}")
assert False
assert "proj1" in res
@ -250,14 +253,17 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):
query_id = node.query(
f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d)"
).strip()
node.query("SYSTEM FLUSH LOGS")
res = node.query(
f"""
SELECT query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log
WHERE query_id='{query_id}' AND type='QueryFinish'
"""
)
for _ in range(10):
node.query("SYSTEM FLUSH LOGS")
res = node.query(
f"""
SELECT query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log
WHERE query_id='{query_id}' AND type='QueryFinish'
"""
)
if res != "":
break
if res == "":
res = node.query(
"""
@ -265,7 +271,7 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):
FROM system.query_log ORDER BY query_start_time_microseconds DESC
"""
)
print(f"LOG: {res}")
print(f"Looked for query id {query_id}, but to no avail: {res}")
assert False
assert "proj2" in res

View File

@ -13,7 +13,7 @@ def cluster():
"node1",
main_configs=["configs/storage_conf.xml"],
with_nginx=True,
allow_analyzer=False,
use_old_analyzer=True,
)
cluster.add_instance(
"node2",
@ -21,14 +21,14 @@ def cluster():
with_nginx=True,
stay_alive=True,
with_zookeeper=True,
allow_analyzer=False,
use_old_analyzer=True,
)
cluster.add_instance(
"node3",
main_configs=["configs/storage_conf_web.xml"],
with_nginx=True,
with_zookeeper=True,
allow_analyzer=False,
use_old_analyzer=True,
)
cluster.add_instance(
@ -39,7 +39,6 @@ def cluster():
with_installed_binary=True,
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
allow_analyzer=False,
)
cluster.start()

View File

@ -14,7 +14,6 @@ node_dist = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -8,13 +8,13 @@ node1 = cluster.add_instance(
"node1",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
use_old_analyzer=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
use_old_analyzer=True,
)

View File

@ -8,13 +8,13 @@ node1 = cluster.add_instance(
"node1",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
use_old_analyzer=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
use_old_analyzer=True,
)

View File

@ -10,7 +10,6 @@ node_oldest = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
old_nodes = [node_oldest]
new_node = cluster.add_instance("node_new")

View File

@ -369,7 +369,6 @@ node7 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node8 = cluster.add_instance(
"node8",

View File

@ -9,7 +9,6 @@ node1 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
@ -18,7 +17,6 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -108,7 +108,7 @@ def started_cluster():
tag="23.12",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
use_old_analyzer=True,
)
logging.info("Starting cluster...")

View File

@ -23,7 +23,6 @@ node4 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node5 = cluster.add_instance(
@ -36,7 +35,6 @@ node5 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node6 = cluster.add_instance(
"node6",
@ -48,7 +46,6 @@ node6 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)

View File

@ -1,3 +1,3 @@
<clickhouse>
<database_atomic_delay_before_drop_table_sec>3</database_atomic_delay_before_drop_table_sec>
<database_atomic_delay_before_drop_table_sec>20</database_atomic_delay_before_drop_table_sec>
</clickhouse>

View File

@ -1,5 +1,6 @@
import pytest
import uuid
import logging
import time
from helpers.cluster import ClickHouseCluster
@ -20,29 +21,28 @@ def started_cluster():
def test_undrop_drop_and_undrop_loop(started_cluster):
# create, drop, undrop, drop, undrop table 5 times
for _ in range(5):
table_uuid = str(uuid.uuid1())
table = f"test_undrop_loop"
uuid_list = []
for i in range(4):
table_uuid = uuid.uuid1().__str__()
uuid_list.append(table_uuid)
logging.info(f"table_uuid: {table_uuid}")
node.query(
f"CREATE TABLE {table} "
f"UUID '{table_uuid}' (id Int32) "
f"Engine=MergeTree() ORDER BY id"
f"CREATE TABLE test_undrop_{i} UUID '{table_uuid}' (id Int32) ENGINE = MergeTree() ORDER BY id;"
)
node.query(f"DROP TABLE {table}")
node.query(f"UNDROP TABLE {table} UUID '{table_uuid}'")
node.query(f"DROP TABLE test_undrop_{i};")
node.query(f"DROP TABLE {table}")
# database_atomic_delay_before_drop_table_sec=3
time.sleep(6)
"""
Expect two things:
1. Table is dropped - UNKNOWN_TABLE in error
2. Table in process of dropping - Return code: 60.
The drop task of table ... (uuid) is in progress,
has been dropped or the database engine doesn't support it
"""
error = node.query_and_get_error(f"UNDROP TABLE {table} UUID '{table_uuid}'")
assert "UNKNOWN_TABLE" in error or "The drop task of table" in error
for i in range(4):
if (
i >= 3
): # First 3 tables are undropped after ~0, 5 and 10 seconds; the fourth attempt happens after ~21 seconds, when the table has already been dropped for good, so UNDROP must fail
time.sleep(6)
error = node.query_and_get_error(
f"UNDROP TABLE test_undrop_loop_{i} UUID '{uuid_list[i]}';"
)
assert "UNKNOWN_TABLE" in error
else:
node.query(f"UNDROP TABLE test_undrop_loop_{i} UUID '{uuid_list[i]}';")
time.sleep(5)
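# --- Editor's illustrative sketch (not part of this commit) ----------------------------
# Timing behind the branches above, assuming all four tables are dropped almost at once
# and database_atomic_delay_before_drop_table_sec is 20: the UNDROP attempts land at
# roughly 0, 5, 10 and 21 seconds after the drops, so only the last one is expected to
# fail with UNKNOWN_TABLE.
_DROP_DELAY_SEC = 20
_attempt_offsets = [0, 5, 10, 10 + 5 + 6]  # 5s sleep after each of the first three, then 6s
for _i, _offset in enumerate(_attempt_offsets):
    _expected = "fails (table already gone)" if _offset > _DROP_DELAY_SEC else "succeeds"
    print(f"UNDROP attempt #{_i} at ~{_offset}s -> {_expected}")
# ----------------------------------------------------------------------------------------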

View File

@ -14,7 +14,6 @@ node2 = cluster.add_instance(
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
with_installed_binary=True,
stay_alive=True,
allow_analyzer=False,
)

View File

@ -16,7 +16,6 @@ node1 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
@ -28,7 +27,6 @@ node2 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node3 = cluster.add_instance(
"node3",
@ -40,7 +38,6 @@ node3 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)

View File

@ -67,4 +67,5 @@ from
select throwIf(uniq((test, query)) != 1) from table
) check_single_query -- this subselect checks that there is only one query in the input table;
-- written this way so that it is not optimized away (#10523)
SETTINGS allow_experimental_analyzer = 0
;

View File

@ -1,11 +1,11 @@
"Hello, ""World""",123,"[1,2,3]",456,"['abc','def']","Newline
"Hello, ""World""",123,"[1,2,3]","(456,['abc','def'])","Newline
here"
"x","y","z","a","b"
"Hello, ""World""",123,"[1,2,3]",456,"['abc','def']","Newline
"Hello, ""World""",123,"[1,2,3]","(456,['abc','def'])","Newline
here"
"x","y","z","a","b"
"String","UInt8","Array(UInt8)","Tuple(UInt16, Array(String))","String"
"Hello, ""World""",123,"[1,2,3]",456,"['abc','def']","Newline
"Hello, ""World""",123,"[1,2,3]","(456,['abc','def'])","Newline
here"
0,"0","[]","2000-01-01","2000-01-01 00:00:00"
1,"1","[0]","2000-01-02","2000-01-01 00:00:01"

View File

@ -1,3 +1,5 @@
-- Tags: no-parallel
SELECT '-- test FORMAT clause --';
SET output_format_write_statistics = 0;
SELECT number, 'Hello & world' FROM numbers(3) FORMAT Tsv;

Some files were not shown because too many files have changed in this diff.