Merge branch 'master' of github.com:ClickHouse/ClickHouse into full-syntax-highlight

This commit is contained in:
Alexey Milovidov 2024-04-08 19:00:46 +02:00
commit c5ae1eec1e
100 changed files with 876 additions and 383 deletions


@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.3.1.2672"
ARG VERSION="24.3.2.23"
ARG PACKAGES="clickhouse-keeper"
ARG DIRECT_DOWNLOAD_URLS=""


@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.3.1.2672"
ARG VERSION="24.3.2.23"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
ARG DIRECT_DOWNLOAD_URLS=""


@ -27,7 +27,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="24.3.1.2672"
ARG VERSION="24.3.2.23"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image


@ -0,0 +1,29 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.3.2.23-lts (8b7d910960c) FIXME as compared to v24.3.1.2672-lts (2c5c589a882)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix logical error in group_by_use_nulls + grouping set + analyzer + materialize/constant [#61567](https://github.com/ClickHouse/ClickHouse/pull/61567) ([Kruglov Pavel](https://github.com/Avogar)).
* Fix external table cannot parse data type Bool [#62115](https://github.com/ClickHouse/ClickHouse/pull/62115) ([Duc Canh Le](https://github.com/canhld94)).
* Revert "Merge pull request [#61564](https://github.com/ClickHouse/ClickHouse/issues/61564) from liuneng1994/optimize_in_single_value" [#62135](https://github.com/ClickHouse/ClickHouse/pull/62135) ([Raúl Marín](https://github.com/Algunenano)).
#### CI Fix or Improvement (changelog entry is not required)
* Backported in [#62030](https://github.com/ClickHouse/ClickHouse/issues/62030):. [#61869](https://github.com/ClickHouse/ClickHouse/pull/61869) ([Nikita Fomichev](https://github.com/fm4v)).
* Backported in [#62057](https://github.com/ClickHouse/ClickHouse/issues/62057): ... [#62044](https://github.com/ClickHouse/ClickHouse/pull/62044) ([Max K.](https://github.com/maxknv)).
* Backported in [#62204](https://github.com/ClickHouse/ClickHouse/issues/62204):. [#62190](https://github.com/ClickHouse/ClickHouse/pull/62190) ([Konstantin Bogdanov](https://github.com/thevar1able)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Fix some crashes with analyzer and group_by_use_nulls. [#61933](https://github.com/ClickHouse/ClickHouse/pull/61933) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix scalars create as select [#61998](https://github.com/ClickHouse/ClickHouse/pull/61998) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Ignore IfChainToMultiIfPass if returned type changed. [#62059](https://github.com/ClickHouse/ClickHouse/pull/62059) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix type for ConvertInToEqualPass [#62066](https://github.com/ClickHouse/ClickHouse/pull/62066) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Revert output Pretty in tty [#62090](https://github.com/ClickHouse/ClickHouse/pull/62090) ([Alexey Milovidov](https://github.com/alexey-milovidov)).


@ -18,6 +18,9 @@ Run the command:
```bash
wget https://s3.amazonaws.com/menusdata.nypl.org/gzips/2021_08_01_07_01_17_data.tgz
# Option: Validate the checksum
md5sum 2021_08_01_07_01_17_data.tgz
# Checksum should be equal to: db6126724de939a5481e3160a2d67d15
```
If needed, replace the link with an up-to-date one from http://menus.nypl.org/data.


@ -507,16 +507,18 @@ Example:
``` xml
<http_handlers>
<rule>
<url><![CDATA[/query_param_with_url/\w+/(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></url>
<url><![CDATA[regex:/query_param_with_url/(?P<name_1>[^/]+)]]></url>
<methods>GET</methods>
<headers>
<XXX>TEST_HEADER_VALUE</XXX>
<PARAMS_XXX><![CDATA[(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></PARAMS_XXX>
<PARAMS_XXX><![CDATA[regex:(?P<name_2>[^/]+)]]></PARAMS_XXX>
</headers>
<handler>
<type>predefined_query_handler</type>
<query>SELECT value FROM system.settings WHERE name = {name_1:String}</query>
<query>SELECT name, value FROM system.settings WHERE name = {name_2:String}</query>
<query>
SELECT name, value FROM system.settings
WHERE name IN ({name_1:String}, {name_2:String})
</query>
</handler>
</rule>
<defaults/>
@ -524,13 +526,13 @@ Example:
```
``` bash
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_threads' 'http://localhost:8123/query_param_with_url/1/max_threads/max_final_threads?max_threads=1&max_final_threads=2'
1
max_final_threads 2
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_final_threads' 'http://localhost:8123/query_param_with_url/max_threads?max_threads=1&max_final_threads=2'
max_final_threads 2
max_threads 1
```
:::note
In one `predefined_query_handler` only supports one `query` of an insert type.
In one `predefined_query_handler` only one `query` is supported.
:::
### dynamic_query_handler {#dynamic_query_handler}


@ -7,26 +7,33 @@ sidebar_position: 351
[Cramer's V](https://en.wikipedia.org/wiki/Cram%C3%A9r%27s_V) (sometimes referred to as Cramer's phi) is a measure of association between two columns in a table. The result of the `cramersV` function ranges from 0 (corresponding to no association between the variables) to 1 and can reach 1 only when each value is completely determined by the other. It may be viewed as the association between two variables as a percentage of their maximum possible variation.
:::note
For a bias corrected version of Cramer's V see: [cramersVBiasCorrected](./cramersvbiascorrected.md)
:::
**Syntax**
``` sql
cramersV(column1, column2)
```
**Arguments**
**Parameters**
- `column1` and `column2` are the columns to be compared
- `column1`: first column to be compared.
- `column2`: second column to be compared.
**Returned value**
- a value between 0 (corresponding to no association between the columns' values) to 1 (complete association).
**Return type** is always [Float64](../../../sql-reference/data-types/float.md).
Type: always [Float64](../../../sql-reference/data-types/float.md).
**Example**
The two columns compared below have no association with each other, so the result of `cramersV` is 0:
Query:
``` sql
SELECT
cramersV(a, b)


@ -5,31 +5,31 @@ sidebar_position: 352
# cramersVBiasCorrected
Cramer's V is a measure of association between two columns in a table. The result of the [`cramersV` function](./cramersv.md) ranges from 0 (corresponding to no association between the variables) to 1 and can reach 1 only when each value is completely determined by the other. The function can be heavily biased, so this version of Cramer's V uses the [bias correction](https://en.wikipedia.org/wiki/Cram%C3%A9r%27s_V#Bias_correction).
**Syntax**
``` sql
cramersVBiasCorrected(column1, column2)
```
**Arguments**
**Parameters**
- `column1` and `column2` are the columns to be compared
- `column1`: first column to be compared.
- `column2`: second column to be compared.
**Returned value**
- a value between 0 (corresponding to no association between the columns' values) to 1 (complete association).
**Return type** is always [Float64](../../../sql-reference/data-types/float.md).
Type: always [Float64](../../../sql-reference/data-types/float.md).
**Example**
The two columns compared below have a small association with each other. Note that the result of `cramersVBiasCorrected` is smaller than the result of `cramersV`:
Query:
``` sql
SELECT
cramersV(a, b),


@ -8,7 +8,7 @@ sidebar_label: VIEW
You can modify a `SELECT` query that was specified when a [materialized view](../create/view.md#materialized) was created with the `ALTER TABLE … MODIFY QUERY` statement without interrupting the ingestion process.
This command is created to change materialized view created with `TO [db.]name` clause. It does not change the structure of the underling storage table and it does not change the columns' definition of the materialized view, because of this the application of this command is very limited for materialized views are created without `TO [db.]name` clause.
This command is created to change a materialized view created with the `TO [db.]name` clause. It does not change the structure of the underlying storage table and it does not change the column definitions of the materialized view; because of this, the command is of limited use for materialized views created without the `TO [db.]name` clause.
**Example with TO table**


@ -434,16 +434,18 @@ $ curl -v 'http://localhost:8123/predefined_query'
``` xml
<http_handlers>
<rule>
<url><![CDATA[regex:/query_param_with_url/\w+/(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></url>
<url><![CDATA[regex:/query_param_with_url/(?P<name_1>[^/]+)]]></url>
<methods>GET</methods>
<headers>
<XXX>TEST_HEADER_VALUE</XXX>
<PARAMS_XXX><![CDATA[(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></PARAMS_XXX>
<PARAMS_XXX><![CDATA[regex:(?P<name_2>[^/]+)]]></PARAMS_XXX>
</headers>
<handler>
<type>predefined_query_handler</type>
<query>SELECT value FROM system.settings WHERE name = {name_1:String}</query>
<query>SELECT name, value FROM system.settings WHERE name = {name_2:String}</query>
<query>
SELECT name, value FROM system.settings
WHERE name IN ({name_1:String}, {name_2:String})
</query>
</handler>
</rule>
<defaults/>
@ -451,13 +453,13 @@ $ curl -v 'http://localhost:8123/predefined_query'
```
``` bash
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_threads' 'http://localhost:8123/query_param_with_url/1/max_threads/max_final_threads?max_threads=1&max_final_threads=2'
1
max_final_threads 2
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_final_threads' 'http://localhost:8123/query_param_with_url/max_threads?max_threads=1&max_final_threads=2'
max_final_threads 2
max_threads 1
```
:::note Warning
Only one `query` of insert type is supported in a single `predefined_query_handler`.
Only one `query` is supported in a single `predefined_query_handler`.
:::
### dynamic_query_handler {#dynamic_query_handler}


@ -427,29 +427,32 @@ $ curl -v 'http://localhost:8123/predefined_query'
``` xml
<http_handlers>
<rule>
<url><![CDATA[/query_param_with_url/\w+/(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></url>
<method>GET</method>
<url><![CDATA[regex:/query_param_with_url/(?P<name_1>[^/]+)]]></url>
<methods>GET</methods>
<headers>
<XXX>TEST_HEADER_VALUE</XXX>
<PARAMS_XXX><![CDATA[(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></PARAMS_XXX>
<PARAMS_XXX><![CDATA[regex:(?P<name_2>[^/]+)]]></PARAMS_XXX>
</headers>
<handler>
<type>predefined_query_handler</type>
<query>SELECT value FROM system.settings WHERE name = {name_1:String}</query>
<query>SELECT name, value FROM system.settings WHERE name = {name_2:String}</query>
<query>
SELECT name, value FROM system.settings
WHERE name IN ({name_1:String}, {name_2:String})
</query>
</handler>
</rule>
<defaults/>
</http_handlers>
```
``` bash
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_threads' 'http://localhost:8123/query_param_with_url/1/max_threads/max_final_threads?max_threads=1&max_final_threads=2'
1
max_final_threads 2
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_final_threads' 'http://localhost:8123/query_param_with_url/max_threads?max_threads=1&max_final_threads=2'
max_final_threads 2
max_threads 1
```
:::warning
Only one `query` of insert type is supported in a single `predefined_query_handler`.
Only one `query` is supported in a single `predefined_query_handler`.
:::
### 动态查询 {#dynamic_query_handler}


@ -662,7 +662,6 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
" <server>\n"
" <certificateFile>" << (config_dir / "server.crt").string() << "</certificateFile>\n"
" <privateKeyFile>" << (config_dir / "server.key").string() << "</privateKeyFile>\n"
" <dhParamsFile>" << (config_dir / "dhparam.pem").string() << "</dhParamsFile>\n"
" </server>\n"
" </openSSL>\n"
"</clickhouse>\n";


@ -3992,9 +3992,15 @@ IdentifierResolveResult QueryAnalyzer::tryResolveIdentifierInParentScopes(const
}
else if (resolved_identifier->as<ConstantNode>())
{
lookup_result.resolved_identifier = resolved_identifier;
return lookup_result;
}
else if (auto * resolved_function = resolved_identifier->as<FunctionNode>())
{
/// Special case: scalar subquery was executed and replaced by __getScalar function.
/// Handle it as a constant.
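    /// E.g. a reference from a subquery to an outer scalar such as (SELECT 1) that was already substituted with __getScalar(...).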
if (resolved_function->getFunctionName() == "__getScalar")
return lookup_result;
}
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Resolve identifier '{}' from parent scope only supported for constants and CTE. Actual {} node type {}. In scope {}",
@ -6082,7 +6088,9 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
* Example: SELECT toTypeName(sum(number)) FROM numbers(10);
*/
if (column && isColumnConst(*column) && !typeid_cast<const ColumnConst *>(column.get())->getDataColumn().isDummy() &&
(!hasAggregateFunctionNodes(node) && !hasFunctionNode(node, "arrayJoin")))
!hasAggregateFunctionNodes(node) && !hasFunctionNode(node, "arrayJoin") &&
/// Sanity check: do not convert large columns to constants
column->byteSize() < 1_MiB)
{
/// Replace function node with result constant node
Field column_constant_value;


@ -29,7 +29,8 @@ NamesAndTypes extractProjectionColumnsForGroupBy(const QueryNode * query_node)
return {};
NamesAndTypes result;
for (const auto & group_by_ele : query_node->getGroupByNode()->getChildren())
const auto & group_by_elements = query_node->getGroupByNode()->getChildren();
for (const auto & group_by_element : group_by_elements)
{
const auto & projection_columns = query_node->getProjectionColumns();
const auto & projection_nodes = query_node->getProjection().getNodes();
@ -38,10 +39,18 @@ NamesAndTypes extractProjectionColumnsForGroupBy(const QueryNode * query_node)
for (size_t i = 0; i < projection_columns.size(); i++)
{
if (projection_nodes[i]->isEqual(*group_by_ele))
if (projection_nodes[i]->isEqual(*group_by_element))
{
result.push_back(projection_columns[i]);
break;
}
}
}
/// If some group by keys are not matched, we cannot apply optimization,
/// because prefix of group by keys may not be unique.
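    /// Example: for `SELECT a FROM t GROUP BY a, b` only `a` is matched, and rows equal in `a` may still differ in `b`.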
if (result.size() != group_by_elements.size())
return {};
return result;
}


@ -417,9 +417,9 @@ void DatabaseAtomic::assertCanBeDetached(bool cleanup)
}
DatabaseTablesIteratorPtr
DatabaseAtomic::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction & filter_by_table_name) const
DatabaseAtomic::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
auto base_iter = DatabaseOrdinary::getTablesIterator(local_context, filter_by_table_name);
auto base_iter = DatabaseOrdinary::getTablesIterator(local_context, filter_by_table_name, skip_not_loaded);
return std::make_unique<AtomicDatabaseTablesSnapshotIterator>(std::move(typeid_cast<DatabaseTablesSnapshotIterator &>(*base_iter)));
}


@ -46,7 +46,7 @@ public:
void drop(ContextPtr /*context*/) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
void beforeLoadingMetadata(ContextMutablePtr context, LoadingStrictnessLevel mode) override;


@ -80,7 +80,7 @@ StoragePtr DatabaseDictionary::tryGetTable(const String & table_name, ContextPtr
return createStorageDictionary(getDatabaseName(), load_result, getContext());
}
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(listTables(filter_by_table_name), getDatabaseName());
}


@ -34,7 +34,7 @@ public:
StoragePtr tryGetTable(const String & table_name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool empty() const override;


@ -229,7 +229,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseFilesystem::getTablesForBacku
* Returns an empty iterator because the database does not have its own tables
* But only caches them for quick access
*/
DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, const FilterByNameFunction &) const
DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}


@ -45,7 +45,7 @@ public:
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const override;
protected:
StoragePtr getTableImpl(const String & name, ContextPtr context, bool throw_on_error) const;


@ -225,7 +225,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseHDFS::getTablesForBackup(cons
* Returns an empty iterator because the database does not have its own tables
* But only caches them for quick access
*/
DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const FilterByNameFunction &) const
DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}


@ -45,7 +45,7 @@ public:
void shutdown() override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const override;
protected:
StoragePtr getTableImpl(const String & name, ContextPtr context) const;


@ -152,7 +152,7 @@ StoragePtr DatabaseLazy::tryGetTable(const String & table_name) const
return loadTable(table_name);
}
DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
std::lock_guard lock(mutex);
Strings filtered_tables;


@ -62,7 +62,7 @@ public:
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;


@ -438,24 +438,40 @@ void DatabaseOrdinary::stopLoading()
stop_load_table.clear();
}
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
    LoadTaskPtrs tasks_to_wait;
    if (!skip_not_loaded)
    {
        // Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
        // It is important, because otherwise a table might be:
        //  - not attached and thus missed in the snapshot;
        //  - not started, which is not good for DDL operations.
        {
            std::lock_guard lock(mutex);
            if (!filter_by_table_name)
                tasks_to_wait.reserve(startup_table.size());
            for (const auto & [table_name, task] : startup_table)
                if (!filter_by_table_name || filter_by_table_name(table_name))
                    tasks_to_wait.emplace_back(task);
        }
        waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
    }
    return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name, skip_not_loaded);
}

Strings DatabaseOrdinary::getAllTableNames(ContextPtr) const
{
    std::set<String> unique_names;
    {
        std::lock_guard lock(mutex);
        for (const auto & [table_name, _] : tables)
            unique_names.emplace(table_name);
        // Tables that are not yet loaded are not listed in `tables`, so we also add their names from the startup tasks.
        for (const auto & [table_name, _] : startup_table)
            unique_names.emplace(table_name);
    }
    return {unique_names.begin(), unique_names.end()};
}
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)


@ -56,7 +56,8 @@ public:
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
Strings getAllTableNames(ContextPtr context) const override;
void alterTable(
ContextPtr context,


@ -873,7 +873,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
std::vector<RenameEdge> replicated_tables_to_rename;
size_t total_tables = 0;
std::vector<UUID> replicated_ids;
for (auto existing_tables_it = getTablesIterator(getContext(), {}); existing_tables_it->isValid();
for (auto existing_tables_it = getTablesIterator(getContext(), {}, /*skip_not_loaded=*/false); existing_tables_it->isValid();
existing_tables_it->next(), ++total_tables)
{
String name = existing_tables_it->name();
@ -1324,7 +1324,6 @@ void DatabaseReplicated::drop(ContextPtr context_)
void DatabaseReplicated::stopReplication()
{
stopLoading();
if (ddl_worker)
ddl_worker->shutdown();
}


@ -303,7 +303,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseS3::getTablesForBackup(const
* Returns an empty iterator because the database does not have its own tables
* But only caches them for quick access
*/
DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const FilterByNameFunction &) const
DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}


@ -56,7 +56,7 @@ public:
void shutdown() override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const override;
static Configuration parseArguments(ASTs engine_args, ContextPtr context);


@ -226,7 +226,7 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, Con
return tryGetTableNoWait(table_name);
}
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)
@ -363,7 +363,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseWithOwnTablesBase::getTablesF
{
std::vector<std::pair<ASTPtr, StoragePtr>> res;
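    /// Backups must see every table, so pass skip_not_loaded=false and wait for asynchronous loading to finish.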
for (auto it = getTablesIterator(local_context, filter); it->isValid(); it->next())
for (auto it = getTablesIterator(local_context, filter, /*skip_not_loaded=*/false); it->isValid(); it->next())
{
auto storage = it->table();
if (!storage)


@ -35,7 +35,7 @@ public:
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const override;
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;


@ -254,7 +254,7 @@ void DatabasesOverlay::shutdown()
db->shutdown();
}
DatabaseTablesIteratorPtr DatabasesOverlay::getTablesIterator(ContextPtr context_, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabasesOverlay::getTablesIterator(ContextPtr context_, const FilterByNameFunction & filter_by_table_name, bool /*skip_not_loaded*/) const
{
Tables tables;
for (const auto & db : databases)


@ -51,7 +51,7 @@ public:
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool empty() const override;


@ -229,7 +229,18 @@ public:
/// Get an iterator that allows you to pass through all the tables.
/// It is possible to have "hidden" tables that are not visible when passing through, but are visible if you get them by name using the functions above.
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}) const = 0; /// NOLINT
/// Wait for all tables to be loaded and started up. If `skip_not_loaded` is true, then not yet loaded or not yet started up (at the moment of iterator creation) tables are excluded.
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const = 0; /// NOLINT
/// Returns list of table names.
virtual Strings getAllTableNames(ContextPtr context) const
{
// NOTE: This default implementation waits for all tables to be loaded and started up. It should be reimplemented for databases that support async loading.
Strings result;
for (auto table_it = getTablesIterator(context); table_it->isValid(); table_it->next())
result.emplace_back(table_it->name());
return result;
}
/// Is the database empty.
virtual bool empty() const = 0;
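For illustration, a minimal sketch (not part of the commit) of how a caller might use the new `skip_not_loaded` flag; `listLoadedTables` and `process` are hypothetical names:

``` cpp
/// Minimal sketch, assuming the IDatabase interface above: iterate only over
/// tables that are already loaded and started up, without blocking on
/// asynchronous database loading (`async_load_databases = true`).
void listLoadedTables(const DB::DatabasePtr & database, DB::ContextPtr context)
{
    for (auto it = database->getTablesIterator(context, /*filter_by_table_name=*/{}, /*skip_not_loaded=*/true);
         it->isValid(); it->next())
    {
        if (auto table = it->table()) /// table() can be null for concurrently dropped tables
            process(table);           /// `process` stands in for caller logic
    }
}
```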


@ -185,9 +185,9 @@ StoragePtr DatabaseMaterializedMySQL::tryGetTable(const String & name, ContextPt
}
DatabaseTablesIteratorPtr
DatabaseMaterializedMySQL::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
DatabaseMaterializedMySQL::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
DatabaseTablesIteratorPtr iterator = DatabaseAtomic::getTablesIterator(context_, filter_by_table_name);
DatabaseTablesIteratorPtr iterator = DatabaseAtomic::getTablesIterator(context_, filter_by_table_name, skip_not_loaded);
if (context_->isInternalQuery())
return iterator;
return std::make_unique<DatabaseMaterializedTablesIterator>(std::move(iterator), this);
@ -201,7 +201,6 @@ void DatabaseMaterializedMySQL::checkIsInternalQuery(ContextPtr context_, const
void DatabaseMaterializedMySQL::stopReplication()
{
stopLoading();
materialize_thread.stopSynchronization();
started_up = false;
}


@ -73,7 +73,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context_) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
void checkIsInternalQuery(ContextPtr context_, const char * method) const;


@ -105,7 +105,7 @@ bool DatabaseMySQL::empty() const
return true;
}
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
Tables tables;
std::lock_guard lock(mutex);


@ -58,7 +58,7 @@ public:
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
ASTPtr getCreateDatabaseQuery() const override;


@ -456,8 +456,6 @@ void DatabaseMaterializedPostgreSQL::shutdown()
void DatabaseMaterializedPostgreSQL::stopReplication()
{
stopLoading();
std::lock_guard lock(handler_mutex);
if (replication_handler)
replication_handler->shutdown();
@ -485,10 +483,10 @@ void DatabaseMaterializedPostgreSQL::drop(ContextPtr local_context)
DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
/// Modify context into nested_context and pass query to Atomic database.
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name, skip_not_loaded);
}
void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory)


@ -45,7 +45,7 @@ public:
void stopLoading() override;
DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;


@ -97,7 +97,7 @@ bool DatabasePostgreSQL::empty() const
}
DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & /* filter_by_table_name */) const
DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & /* filter_by_table_name */, bool /* skip_not_loaded */) const
{
std::lock_guard lock(mutex);
Tables tables;


@ -46,7 +46,7 @@ public:
void loadStoredObjects(ContextMutablePtr, LoadingStrictnessLevel /*mode*/) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool isTableExist(const String & name, ContextPtr context) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;


@ -46,7 +46,7 @@ bool DatabaseSQLite::empty() const
}
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &) const
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &, bool) const
{
std::lock_guard lock(mutex);


@ -32,7 +32,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool empty() const override;


@ -56,7 +56,6 @@ UserDefinedSQLObjectsDiskStorage::UserDefinedSQLObjectsDiskStorage(const Context
, dir_path{makeDirectoryPathCanonical(dir_path_)}
, log{getLogger("UserDefinedSQLObjectsLoaderFromDisk")}
{
createDirectory();
}
@ -122,7 +121,12 @@ void UserDefinedSQLObjectsDiskStorage::reloadObjects()
void UserDefinedSQLObjectsDiskStorage::loadObjectsImpl()
{
LOG_INFO(log, "Loading user defined objects from {}", dir_path);
createDirectory();
if (!std::filesystem::exists(dir_path))
{
LOG_DEBUG(log, "The directory for user defined objects ({}) does not exist: nothing to load", dir_path);
return;
}
std::vector<std::pair<String, ASTPtr>> function_names_and_queries;
@ -157,7 +161,6 @@ void UserDefinedSQLObjectsDiskStorage::loadObjectsImpl()
void UserDefinedSQLObjectsDiskStorage::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
{
createDirectory();
auto ast = tryLoadObject(object_type, object_name);
if (ast)
setObject(object_name, *ast);
@ -185,6 +188,7 @@ bool UserDefinedSQLObjectsDiskStorage::storeObjectImpl(
bool replace_if_exists,
const Settings & settings)
{
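    /// The directory is created lazily, right before the first object is stored (it is no longer created in the constructor or when loading objects).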
createDirectory();
String file_path = getFilePath(object_type, object_name);
LOG_DEBUG(log, "Storing user-defined object {} to file {}", backQuote(object_name), file_path);


@ -1604,6 +1604,9 @@ void DatabaseCatalog::reloadDisksTask()
for (auto & database : getDatabases())
{
// WARNING: In case of `async_load_databases = true` the getTablesIterator() call waits for all tables in the database to be loaded.
// WARNING: It means that no database will be able to update its configuration until all databases are fully loaded.
// TODO: We can split this task by table or by database to make loaded tables operate as usual.
auto it = database.second->getTablesIterator(getContext());
while (it->isValid())
{
@ -1741,10 +1744,9 @@ std::pair<String, String> TableNameHints::getExtendedHintForTable(const String &
Names TableNameHints::getAllRegisteredNames() const
{
if (database)
    return database->getAllTableNames(context);
return {};
}
}


@ -1187,7 +1187,7 @@ private:
else
{
auto result = std::chrono::system_clock::now() + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
LOG_TRACE(log, "Supposed update time for unspecified object is {} (backoff, {} errors.", to_string(result), error_count);
LOG_TRACE(log, "Supposed update time for unspecified object is {} (backoff, {} errors)", to_string(result), error_count);
return result;
}
}


@ -412,6 +412,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
table_context->setInternalQuery(true);
/// Do not hold extra shared pointers to tables
std::vector<std::pair<String, bool>> tables_to_drop;
// NOTE: This means we wait for all tables to be loaded inside the getTablesIterator() call in case of `async_load_databases = true`.
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{
auto table_ptr = iterator->table();


@ -278,7 +278,8 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
bool is_system = db.first == DatabaseCatalog::SYSTEM_DATABASE;
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
// Note that we skip tables that are not yet loaded, so metrics could be lower than expected on a fully loaded database just after server start if `async_load_databases = true`.
for (auto iterator = db.second->getTablesIterator(getContext(), {}, /*skip_not_loaded=*/true); iterator->isValid(); iterator->next())
{
++total_number_of_tables;
if (is_system)
@ -408,7 +409,7 @@ void ServerAsynchronousMetrics::updateDetachedPartsStats()
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
for (auto iterator = db.second->getTablesIterator(getContext(), {}, true); iterator->isValid(); iterator->next())
{
const auto & table = iterator->table();
if (!table)


@ -235,7 +235,9 @@ public:
static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child_node)
{
auto child_node_type = child_node->getNodeType();
return !(child_node_type == QueryTreeNodeType::QUERY || child_node_type == QueryTreeNodeType::UNION);
return child_node_type != QueryTreeNodeType::QUERY &&
child_node_type != QueryTreeNodeType::UNION &&
child_node_type != QueryTreeNodeType::LAMBDA;
}
private:


@ -86,7 +86,7 @@ Chunk ArrowBlockInputFormat::read()
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result, (*table_result)->num_rows(), block_missing_values_ptr);
res = arrow_column_to_ch_column->arrowTableToCHChunk(*table_result, (*table_result)->num_rows(), block_missing_values_ptr);
/// There is no easy way to get original record batch size from Arrow metadata.
/// Let's just use the number of bytes read from read buffer.


@ -78,7 +78,7 @@ namespace ErrorCodes
/// Inserts numeric data right into internal column data to reduce an overhead
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithNumericData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeNumber<NumericType>>();
auto internal_column = internal_type->createColumn();
@ -103,7 +103,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
/// Also internal strings are null terminated.
template <typename ArrowArray>
static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithStringData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeString>();
auto internal_column = internal_type->createColumn();
@ -147,7 +147,7 @@ static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::Chu
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithFixedStringData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto * fixed_type = assert_cast<arrow::FixedSizeBinaryType *>(arrow_column->type().get());
size_t fixed_len = fixed_type->byte_width();
@ -166,7 +166,7 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow
}
template <typename ValueType>
static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
{
const auto * fixed_type = assert_cast<arrow::FixedSizeBinaryType *>(arrow_column->type().get());
size_t fixed_len = fixed_type->byte_width();
@ -193,7 +193,7 @@ static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(std::sh
}
template <typename ColumnType, typename ValueType = typename ColumnType::ValueType>
static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
{
size_t total_size = 0;
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
@ -229,7 +229,7 @@ static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(std::shared_p
return {std::move(internal_column), column_type, column_name};
}
static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithBooleanData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = DataTypeFactory::instance().get("Bool");
auto internal_column = internal_type->createColumn();
@ -248,7 +248,7 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::Ch
return {std::move(internal_column), internal_type, column_name};
}
static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name,
static ColumnWithTypeAndName readColumnWithDate32Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name,
const DataTypePtr & type_hint, FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior)
{
DataTypePtr internal_type;
@ -310,7 +310,7 @@ static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::Chu
}
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithDate64Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeDateTime>();
auto internal_column = internal_type->createColumn();
@ -329,7 +329,7 @@ static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::Chu
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTimestampData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto & arrow_type = static_cast<const arrow::TimestampType &>(*(arrow_column->type()));
const UInt8 scale = arrow_type.unit() * 3;
@ -350,7 +350,7 @@ static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::
}
template <typename TimeType, typename TimeArray>
static ColumnWithTypeAndName readColumnWithTimeData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTimeData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto & arrow_type = static_cast<const TimeType &>(*(arrow_column->type()));
const UInt8 scale = arrow_type.unit() * 3;
@ -373,18 +373,18 @@ static ColumnWithTypeAndName readColumnWithTimeData(std::shared_ptr<arrow::Chunk
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readColumnWithTime32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTime32Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
return readColumnWithTimeData<arrow::Time32Type, arrow::Time32Array>(arrow_column, column_name);
}
static ColumnWithTypeAndName readColumnWithTime64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTime64Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
return readColumnWithTimeData<arrow::Time64Type, arrow::Time64Array>(arrow_column, column_name);
}
template <typename DecimalType, typename DecimalArray>
static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, DataTypePtr internal_type)
static ColumnWithTypeAndName readColumnWithDecimalDataImpl(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, DataTypePtr internal_type)
{
auto internal_column = internal_type->createColumn();
auto & column = assert_cast<ColumnDecimal<DecimalType> &>(*internal_column);
@ -403,7 +403,7 @@ static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr<arrow
}
template <typename DecimalArray>
static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithDecimalData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto * arrow_decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
size_t precision = arrow_decimal_type->precision();
@ -418,7 +418,7 @@ static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::Ch
}
/// Creates a null bytemap from arrow's null bitmap
static ColumnPtr readByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
static ColumnPtr readByteMapFromArrowColumn(const std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
if (!arrow_column->null_count())
return ColumnUInt8::create(arrow_column->length(), 0);
@ -453,7 +453,7 @@ struct ArrowOffsetArray<arrow::LargeListArray>
};
template <typename ArrowListArray>
static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
static ColumnPtr readOffsetsFromArrowListColumn(const std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
auto offsets_column = ColumnUInt64::create();
ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
@ -463,7 +463,7 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedAr
{
ArrowListArray & list_chunk = dynamic_cast<ArrowListArray &>(*(arrow_column->chunk(chunk_i)));
auto arrow_offsets_array = list_chunk.offsets();
auto & arrow_offsets = dynamic_cast<ArrowOffsetArray<ArrowListArray>::type &>(*arrow_offsets_array);
auto & arrow_offsets = dynamic_cast<typename ArrowOffsetArray<ArrowListArray>::type &>(*arrow_offsets_array);
/*
* CH uses element size as "offsets", while arrow uses actual offsets as offsets.
@ -620,7 +620,7 @@ static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray>
}
template <typename ArrowListArray>
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(const std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
arrow::ArrayVector array_vector;
array_vector.reserve(arrow_column->num_chunks());
@ -648,7 +648,7 @@ static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr
return std::make_shared<arrow::ChunkedArray>(array_vector);
}
static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
size_t total_size = 0;
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
@ -684,7 +684,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow:
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeIPv4>();
auto internal_column = internal_type->createColumn();
@ -705,35 +705,31 @@ static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(std::shared_ptr<arrow::
return {std::move(internal_column), std::move(internal_type), column_name};
}
struct ReadColumnFromArrowColumnSettings
{
    std::string format_name;
    FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior;
    bool allow_arrow_null_type;
    bool skip_columns_with_unsupported_types;
};

static ColumnWithTypeAndName readColumnFromArrowColumn(
    const std::shared_ptr<arrow::ChunkedArray> & arrow_column,
    std::string column_name,
    std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> dictionary_infos,
    DataTypePtr type_hint,
    bool is_nullable_column,
    bool is_map_nested_column,
    const ReadColumnFromArrowColumnSettings & settings);

static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
    const std::shared_ptr<arrow::ChunkedArray> & arrow_column,
    std::string column_name,
    std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> dictionary_infos,
    DataTypePtr type_hint,
    bool is_map_nested_column,
    const ReadColumnFromArrowColumnSettings & settings)
{
switch (arrow_column->type()->id())
{
case arrow::Type::STRING:
@ -790,7 +786,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
case arrow::Type::BOOL:
return readColumnWithBooleanData(arrow_column, column_name);
case arrow::Type::DATE32:
return readColumnWithDate32Data(arrow_column, column_name, type_hint, date_time_overflow_behavior);
return readColumnWithDate32Data(arrow_column, column_name, type_hint, settings.date_time_overflow_behavior);
case arrow::Type::DATE64:
return readColumnWithDate64Data(arrow_column, column_name);
// ClickHouse writes Date as arrow UINT16 and DateTime as arrow UINT32,
@ -837,9 +833,16 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
key_type_hint = map_type_hint->getKeyType();
}
}
auto arrow_nested_column = getNestedArrowColumn<arrow::ListArray>(arrow_column);
            auto nested_column = readColumnFromArrowColumn(arrow_nested_column,
                column_name,
                dictionary_infos,
                nested_type_hint,
                false /*is_nullable_column*/,
                true /*is_map_nested_column*/,
                settings);
            if (!nested_column.column)
return {};
auto offsets_column = readOffsetsFromArrowListColumn<arrow::ListArray>(arrow_column);
@ -866,7 +869,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
case arrow::Type::LIST:
case arrow::Type::LARGE_LIST:
{
bool is_large = arrow_column->type()->id() == arrow::Type::LARGE_LIST;
bool is_large_list = arrow_column->type()->id() == arrow::Type::LARGE_LIST;
DataTypePtr nested_type_hint;
if (type_hint)
{
@ -874,12 +877,33 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
if (array_type_hint)
nested_type_hint = array_type_hint->getNestedType();
}
bool is_nested_nullable_column = false;
if (is_large_list)
{
auto * arrow_large_list_type = assert_cast<arrow::LargeListType *>(arrow_column->type().get());
is_nested_nullable_column = arrow_large_list_type->value_field()->nullable();
}
else
{
auto * arrow_list_type = assert_cast<arrow::ListType *>(arrow_column->type().get());
is_nested_nullable_column = arrow_list_type->value_field()->nullable();
}
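            /// Arrow list types declare nullability on their nested value field; propagating it here lets the nested column be wrapped in Nullable when required.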
auto arrow_nested_column = is_large_list ? getNestedArrowColumn<arrow::LargeListArray>(arrow_column) : getNestedArrowColumn<arrow::ListArray>(arrow_column);
auto nested_column = readColumnFromArrowColumn(arrow_nested_column,
column_name,
dictionary_infos,
nested_type_hint,
is_nested_nullable_column,
false /*is_map_nested_column*/,
settings);
if (!nested_column.column)
return {};
auto offsets_column = is_large ? readOffsetsFromArrowListColumn<arrow::LargeListArray>(arrow_column) : readOffsetsFromArrowListColumn<arrow::ListArray>(arrow_column);
auto offsets_column = is_large_list ? readOffsetsFromArrowListColumn<arrow::LargeListArray>(arrow_column) : readOffsetsFromArrowListColumn<arrow::ListArray>(arrow_column);
auto array_column = ColumnArray::create(nested_column.column, offsets_column);
DataTypePtr array_type;
/// If type hint is Nested, we should return Nested type,
/// because we differentiate Nested and simple Array(Tuple)
@ -913,11 +937,13 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
for (int i = 0; i != arrow_struct_type->num_fields(); ++i)
{
auto field_name = arrow_struct_type->field(i)->name();
const auto & field = arrow_struct_type->field(i);
const auto & field_name = field->name();
DataTypePtr nested_type_hint;
if (tuple_type_hint)
{
if (tuple_type_hint->haveExplicitNames() && !is_map_nested)
if (tuple_type_hint->haveExplicitNames() && !is_map_nested_column)
{
auto pos = tuple_type_hint->tryGetPositionByName(field_name);
if (pos)
@ -926,13 +952,21 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
else if (size_t(i) < tuple_type_hint->getElements().size())
nested_type_hint = tuple_type_hint->getElement(i);
}
auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i]);
auto element = readColumnFromArrowColumn(nested_arrow_column, field_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint);
if (skipped)
auto column_with_type_and_name = readColumnFromArrowColumn(nested_arrow_column,
field_name,
dictionary_infos,
nested_type_hint,
field->nullable(),
false /*is_map_nested_column*/,
settings);
if (!column_with_type_and_name.column)
return {};
tuple_elements.emplace_back(std::move(element.column));
tuple_types.emplace_back(std::move(element.type));
tuple_names.emplace_back(std::move(element.name));
tuple_elements.emplace_back(std::move(column_with_type_and_name.column));
tuple_types.emplace_back(std::move(column_with_type_and_name.type));
tuple_names.emplace_back(std::move(column_with_type_and_name.name));
}
auto tuple_column = ColumnTuple::create(std::move(tuple_elements));
@ -953,8 +987,19 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
dict_array.emplace_back(dict_chunk.dictionary());
}
auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
auto dict_column = readColumnFromArrowColumn(arrow_dict_column, column_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior);
auto dict_column = readColumnFromArrowColumn(arrow_dict_column,
column_name,
dictionary_infos,
nullptr /*nested_type_hint*/,
false /*is_nullable_column*/,
false /*is_map_nested_column*/,
settings);
if (!dict_column.column)
return {};
for (size_t i = 0; i != dict_column.column->size(); ++i)
{
if (dict_column.column->isDefaultAt(i))
@ -963,6 +1008,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
break;
}
}
auto lc_type = std::make_shared<DataTypeLowCardinality>(is_lc_nullable ? makeNullable(dict_column.type) : dict_column.type);
auto tmp_lc_column = lc_type->createColumn();
auto tmp_dict_column = IColumn::mutate(assert_cast<ColumnLowCardinality *>(tmp_lc_column.get())->getDictionaryPtr());
@ -1002,7 +1048,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
// TODO: read UUID as a string?
case arrow::Type::NA:
{
if (allow_null_type)
if (settings.allow_arrow_null_type)
{
auto type = std::make_shared<DataTypeNothing>();
auto column = ColumnNothing::create(arrow_column->length());
@ -1012,11 +1058,8 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
}
default:
{
if (skip_columns_with_unsupported_types)
{
skipped = true;
if (settings.skip_columns_with_unsupported_types)
return {};
}
throw Exception(
ErrorCodes::UNKNOWN_TYPE,
@ -1024,14 +1067,59 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
"If it happens during schema inference and you want to skip columns with "
"unsupported types, you can enable setting input_format_{}"
"_skip_columns_with_unsupported_types_in_schema_inference",
format_name,
settings.format_name,
arrow_column->type()->name(),
column_name,
boost::algorithm::to_lower_copy(format_name));
boost::algorithm::to_lower_copy(settings.format_name));
}
}
}
static ColumnWithTypeAndName readColumnFromArrowColumn(
const std::shared_ptr<arrow::ChunkedArray> & arrow_column,
std::string column_name,
std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> dictionary_infos,
DataTypePtr type_hint,
bool is_nullable_column,
bool is_map_nested_column,
const ReadColumnFromArrowColumnSettings & settings)
{
bool read_as_nullable_column = arrow_column->null_count() || is_nullable_column || (type_hint && type_hint->isNullable());
if (read_as_nullable_column &&
arrow_column->type()->id() != arrow::Type::LIST &&
arrow_column->type()->id() != arrow::Type::LARGE_LIST &&
arrow_column->type()->id() != arrow::Type::MAP &&
arrow_column->type()->id() != arrow::Type::STRUCT &&
arrow_column->type()->id() != arrow::Type::DICTIONARY)
{
DataTypePtr nested_type_hint;
if (type_hint)
nested_type_hint = removeNullable(type_hint);
auto nested_column = readNonNullableColumnFromArrowColumn(arrow_column,
column_name,
dictionary_infos,
nested_type_hint,
is_map_nested_column,
settings);
if (!nested_column.column)
return {};
auto nullmap_column = readByteMapFromArrowColumn(arrow_column);
auto nullable_type = std::make_shared<DataTypeNullable>(std::move(nested_column.type));
auto nullable_column = ColumnNullable::create(nested_column.column, nullmap_column);
return {std::move(nullable_column), std::move(nullable_type), column_name};
}
return readNonNullableColumnFromArrowColumn(arrow_column,
column_name,
dictionary_infos,
type_hint,
is_map_nested_column,
settings);
}
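For illustration, the nullable dispatch above can be restated as a single predicate (a minimal standalone sketch, not part of the patch; it only mirrors the condition in readColumnFromArrowColumn):

/// Sketch: should an arrow column be wrapped into Nullable?
/// Mirrors the condition above; compound and dictionary columns
/// handle their nulls internally, so they are excluded.
static bool shouldReadAsNullable(const arrow::ChunkedArray & arrow_column, bool is_nullable_column, const DataTypePtr & type_hint)
{
    bool wants_nullable = arrow_column.null_count() || is_nullable_column || (type_hint && type_hint->isNullable());
    auto type_id = arrow_column.type()->id();
    bool is_compound = type_id == arrow::Type::LIST || type_id == arrow::Type::LARGE_LIST
        || type_id == arrow::Type::MAP || type_id == arrow::Type::STRUCT || type_id == arrow::Type::DICTIONARY;
    return wants_nullable && !is_compound;
}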
// Creating a ClickHouse header from an arrow schema. Useful for inserting
// data from a file without knowing the table structure in advance.
@ -1042,44 +1130,56 @@ static void checkStatus(const arrow::Status & status, const String & column_name
throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()};
}
/// Create empty arrow column using specified field
static std::shared_ptr<arrow::ChunkedArray> createArrowColumn(const std::shared_ptr<arrow::Field> & field, const String & format_name)
{
arrow::MemoryPool * pool = arrow::default_memory_pool();
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::Status status = MakeBuilder(pool, field->type(), &array_builder);
checkStatus(status, field->name(), format_name);
std::shared_ptr<arrow::Array> arrow_array;
status = array_builder->Finish(&arrow_array);
checkStatus(status, field->name(), format_name);
return std::make_shared<arrow::ChunkedArray>(arrow::ArrayVector{arrow_array});
}
Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
const arrow::Schema & schema, const std::string & format_name,
bool skip_columns_with_unsupported_types, const Block * hint_header, bool ignore_case)
const arrow::Schema & schema,
const std::string & format_name,
bool skip_columns_with_unsupported_types)
{
ReadColumnFromArrowColumnSettings settings
{
.format_name = format_name,
.date_time_overflow_behavior = FormatSettings::DateTimeOverflowBehavior::Ignore,
.allow_arrow_null_type = false,
.skip_columns_with_unsupported_types = skip_columns_with_unsupported_types
};
ColumnsWithTypeAndName sample_columns;
std::unordered_set<String> nested_table_names;
if (hint_header)
nested_table_names = Nested::getAllTableNames(*hint_header, ignore_case);
for (const auto & field : schema.fields())
{
if (hint_header && !hint_header->has(field->name(), ignore_case)
&& !nested_table_names.contains(ignore_case ? boost::to_lower_copy(field->name()) : field->name()))
continue;
/// Create an empty arrow column by its type and convert it to a ClickHouse column.
arrow::MemoryPool * pool = arrow::default_memory_pool();
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::Status status = MakeBuilder(pool, field->type(), &array_builder);
checkStatus(status, field->name(), format_name);
auto arrow_column = createArrowColumn(field, format_name);
std::shared_ptr<arrow::Array> arrow_array;
status = array_builder->Finish(&arrow_array);
checkStatus(status, field->name(), format_name);
arrow::ArrayVector array_vector = {arrow_array};
auto arrow_column = std::make_shared<arrow::ChunkedArray>(array_vector);
std::unordered_map<std::string, DictionaryInfo> dict_infos;
bool skipped = false;
bool allow_null_type = false;
if (hint_header && hint_header->has(field->name()) && hint_header->getByName(field->name()).type->isNullable())
allow_null_type = true;
ColumnWithTypeAndName sample_column = readColumnFromArrowColumn(
arrow_column, field->name(), format_name, false, dict_infos, allow_null_type, skip_columns_with_unsupported_types, skipped);
if (!skipped)
auto sample_column = readColumnFromArrowColumn(
arrow_column,
field->name(),
dict_infos,
nullptr /*nested_type_hint*/,
field->nullable() /*is_nullable_column*/,
false /*is_map_nested_column*/,
settings);
if (sample_column.column)
sample_columns.emplace_back(std::move(sample_column));
}
return Block(std::move(sample_columns));
}
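A minimal usage sketch of the simplified signature (the reader variable is hypothetical; any Parquet/ORC/Arrow reader exposing an arrow::Schema would do):

/// Sketch: infer a ClickHouse header from an arrow schema during schema inference.
std::shared_ptr<arrow::Schema> schema = reader->schema(); // hypothetical source of the schema
Block header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
    *schema,
    "Parquet",
    /*skip_columns_with_unsupported_types=*/ true);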
@ -1101,30 +1201,43 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
{
}
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values)
Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(const std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values)
{
NameToColumnPtr name_to_column_ptr;
NameToArrowColumn name_to_arrow_column;
for (auto column_name : table->ColumnNames())
{
std::shared_ptr<arrow::ChunkedArray> arrow_column = table->GetColumnByName(column_name);
auto arrow_column = table->GetColumnByName(column_name);
if (!arrow_column)
throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Column '{}' is duplicated", column_name);
auto arrow_field = table->schema()->GetFieldByName(column_name);
if (case_insensitive_matching)
boost::to_lower(column_name);
name_to_column_ptr[std::move(column_name)] = arrow_column;
name_to_arrow_column[std::move(column_name)] = {std::move(arrow_column), std::move(arrow_field)};
}
arrowColumnsToCHChunk(res, name_to_column_ptr, num_rows, block_missing_values);
return arrowColumnsToCHChunk(name_to_arrow_column, num_rows, block_missing_values);
}
void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows, BlockMissingValues * block_missing_values)
Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values)
{
Columns columns_list;
columns_list.reserve(header.columns());
ReadColumnFromArrowColumnSettings settings
{
.format_name = format_name,
.date_time_overflow_behavior = date_time_overflow_behavior,
.allow_arrow_null_type = true,
.skip_columns_with_unsupported_types = false
};
Columns columns;
columns.reserve(header.columns());
std::unordered_map<String, std::pair<BlockPtr, std::shared_ptr<NestedColumnExtractHelper>>> nested_tables;
bool skipped = false;
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
for (size_t column_i = 0, header_columns = header.columns(); column_i < header_columns; ++column_i)
{
const ColumnWithTypeAndName & header_column = header.getByPosition(column_i);
@ -1133,15 +1246,17 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
boost::to_lower(search_column_name);
ColumnWithTypeAndName column;
if (!name_to_column_ptr.contains(search_column_name))
if (!name_to_arrow_column.contains(search_column_name))
{
bool read_from_nested = false;
/// Check if it's a subcolumn from some struct.
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
if (name_to_arrow_column.contains(search_nested_table_name))
{
if (!nested_tables.contains(search_nested_table_name))
{
@ -1153,10 +1268,19 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
}
auto nested_table_type = Nested::collect(nested_columns).front().type;
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {
readColumnFromArrowColumn(arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false,
skipped, date_time_overflow_behavior, nested_table_type)};
const auto & arrow_column = name_to_arrow_column.find(search_nested_table_name)->second;
ColumnsWithTypeAndName cols =
{
readColumnFromArrowColumn(arrow_column.column,
nested_table_name,
dictionary_infos,
nested_table_type,
arrow_column.field->nullable() /*is_nullable_column*/,
false /*is_map_nested_column*/,
settings)
};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
@ -1180,7 +1304,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
column.name = header_column.name;
column.type = header_column.type;
column.column = header_column.column->cloneResized(num_rows);
columns_list.push_back(std::move(column.column));
columns.push_back(std::move(column.column));
if (block_missing_values)
block_missing_values->setBits(column_i, num_rows);
continue;
@ -1189,9 +1313,14 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
}
else
{
auto arrow_column = name_to_column_ptr[search_column_name];
column = readColumnFromArrowColumn(
arrow_column, header_column.name, format_name, false, dictionary_infos, true, false, skipped, date_time_overflow_behavior, header_column.type);
const auto & arrow_column = name_to_arrow_column.find(search_column_name)->second;
column = readColumnFromArrowColumn(arrow_column.column,
header_column.name,
dictionary_infos,
header_column.type,
arrow_column.field->nullable(),
false /*is_map_nested_column*/,
settings);
}
if (null_as_default)
@ -1216,10 +1345,10 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
}
column.type = header_column.type;
columns_list.push_back(std::move(column.column));
columns.push_back(std::move(column.column));
}
res.setColumns(columns_list, num_rows);
return Chunk(std::move(columns), num_rows);
}
}
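For callers, the migration from the out-parameter style to the value-returning style looks roughly like this (a sketch; `column_reader`, `table` and `num_rows` are assumed to be in scope, as at the ORC, Parquet and Delta Lake call sites below):

/// Before: the chunk was filled through an out-parameter.
/// Chunk res;
/// column_reader.arrowTableToCHChunk(res, table, num_rows, block_missing_values_ptr);
/// After: the chunk is returned by value.
Chunk res = column_reader.arrowTableToCHChunk(table, num_rows, block_missing_values_ptr);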

View File

@ -19,8 +19,6 @@ class Chunk;
class ArrowColumnToCHColumn
{
public:
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
@ -30,18 +28,13 @@ public:
bool case_insensitive_matching_ = false,
bool is_stream_ = false);
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
Chunk arrowTableToCHChunk(const std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
/// Transform arrow schema to ClickHouse header. If hint_header is provided,
/// we will skip columns in schema that are not in hint_header.
/// Transform arrow schema to ClickHouse header
static Block arrowSchemaToCHHeader(
const arrow::Schema & schema,
const std::string & format_name,
bool skip_columns_with_unsupported_types = false,
const Block * hint_header = nullptr,
bool ignore_case = false);
bool skip_columns_with_unsupported_types = false);
struct DictionaryInfo
{
@ -52,6 +45,16 @@ public:
private:
struct ArrowColumn
{
std::shared_ptr<arrow::ChunkedArray> column;
std::shared_ptr<arrow::Field> field;
};
using NameToArrowColumn = std::unordered_map<std::string, ArrowColumn>;
Chunk arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values);
const Block & header;
const std::string format_name;
/// If false, throw an exception if some columns from the header do not exist in the arrow table.

View File

@ -71,12 +71,10 @@ Chunk ORCBlockInputFormat::read()
approx_bytes_read_for_chunk = file_reader->GetRawORCReader()->getStripe(stripe_current)->getDataLength();
++stripe_current;
Chunk res;
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table, num_rows, block_missing_values_ptr);
return res;
return arrow_column_to_ch_column->arrowTableToCHChunk(table, num_rows, block_missing_values_ptr);
}
void ORCBlockInputFormat::resetParser()

View File

@ -601,7 +601,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
res.chunk = row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(*tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
lock.lock();

View File

@ -1,4 +1,5 @@
#include <algorithm>
#include <limits>
#include <memory>
#include <numeric>
#include <queue>
@ -125,14 +126,20 @@ int compareValues(const Values & lhs, const Values & rhs)
class IndexAccess
{
public:
explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_) { }
explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_)
{
/// Some suffix of index columns might not be loaded (see `primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns`)
/// and we need to use the same set of index columns across all parts.
for (const auto & part : parts)
loaded_columns = std::min(loaded_columns, part.data_part->getIndex().size());
}
Values getValue(size_t part_idx, size_t mark) const
{
const auto & index = parts[part_idx].data_part->getIndex();
size_t size = index.size();
Values values(size);
for (size_t i = 0; i < size; ++i)
chassert(index.size() >= loaded_columns);
Values values(loaded_columns);
for (size_t i = 0; i < loaded_columns; ++i)
{
index[i]->get(mark, values[i]);
if (values[i].isNull())
@ -199,6 +206,7 @@ public:
}
private:
const RangesInDataParts & parts;
size_t loaded_columns = std::numeric_limits<size_t>::max();
};
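A worked example of the loaded_columns computation (sizes are hypothetical): if the primary key is (a, b, c) and one part loaded all three index columns while another loaded only (a, b) due to primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns, only the common two-column prefix can be compared consistently:

#include <algorithm>
#include <cassert>
#include <limits>
#include <vector>

int main()
{
    /// Sketch: the common loaded prefix across parts bounds how many
    /// index columns getValue() may read. The sizes below are made up.
    std::vector<size_t> index_sizes_per_part = {3, 2, 3};
    size_t loaded_columns = std::numeric_limits<size_t>::max();
    for (size_t size : index_sizes_per_part)
        loaded_columns = std::min(loaded_columns, size);
    assert(loaded_columns == 2); /// only the (a, b) prefix is comparable
}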
class RangesInDataPartsBuilder

View File

@ -51,7 +51,10 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
// Note that in case `async_load_databases = true` we do not want the replica status handler to hang
// waiting (in the getTablesIterator() call) for every table to be loaded, so we just skip not-yet-loaded tables.
// If they have some lag, it will be reflected as soon as they are loaded.
for (auto iterator = db.second->getTablesIterator(getContext(), {}, true); iterator->isValid(); iterator->next())
{
const auto & table = iterator->table();
if (!table)

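A hedged reading of the new getTablesIterator() call (the third parameter's name is an assumption; only its position and effect are evidenced by this hunk):

/// Sketch: presumably the third argument asks the iterator to skip tables
/// that are not loaded yet, so the handler never blocks on async loading.
auto iterator = db.second->getTablesIterator(getContext(), /*filter_by_table_name=*/ {}, /*skip_not_loaded=*/ true);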
View File

@ -282,11 +282,10 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
format_settings.date_time_overflow_behavior,
/* case_insensitive_column_matching */false);
Chunk res;
std::shared_ptr<arrow::Table> table;
THROW_ARROW_NOT_OK(reader->ReadTable(&table));
column_reader.arrowTableToCHChunk(res, table, reader->parquet_reader()->metadata()->num_rows());
Chunk res = column_reader.arrowTableToCHChunk(table, reader->parquet_reader()->metadata()->num_rows());
const auto & res_columns = res.getColumns();
if (res_columns.size() != 2)

View File

@ -1,3 +1,4 @@
#include <string_view>
#include <Storages/MergeTree/DataPartStorageOnDiskBase.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Disks/TemporaryFileOnDisk.h>
@ -13,6 +14,7 @@
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/BackupSettings.h>
#include <Disks/SingleDiskVolume.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
@ -64,7 +66,7 @@ std::optional<String> DataPartStorageOnDiskBase::getRelativePathForPrefix(Logger
auto full_relative_path = fs::path(root_path);
if (detached)
full_relative_path /= "detached";
full_relative_path /= MergeTreeData::DETACHED_DIR_NAME;
std::optional<String> original_checksums_content;
std::optional<Strings> original_files_list;
@ -109,7 +111,7 @@ bool DataPartStorageOnDiskBase::looksLikeBrokenDetachedPartHasTheSameContent(con
if (!exists("checksums.txt"))
return false;
auto storage_from_detached = create(volume, fs::path(root_path) / "detached", detached_part_path, /*initialize=*/ true);
auto storage_from_detached = create(volume, fs::path(root_path) / MergeTreeData::DETACHED_DIR_NAME, detached_part_path, /*initialize=*/ true);
if (!storage_from_detached->exists("checksums.txt"))
return false;
@ -490,7 +492,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
/// Do not initialize storage in case of DETACH because part may be broken.
bool to_detached = dir_path.starts_with("detached/");
bool to_detached = dir_path.starts_with(std::string_view((fs::path(MergeTreeData::DETACHED_DIR_NAME) / "").string()));
return create(single_disk_volume, to, dir_path, /*initialize=*/ !to_detached && !params.external_transaction);
}
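The fs::path(...) / "" idiom above is worth spelling out: joining an empty component appends a trailing directory separator, which is exactly what the starts_with() prefix check needs (a minimal sketch; the separator is '/' on POSIX):

#include <cassert>
#include <filesystem>
namespace fs = std::filesystem;

int main()
{
    /// Joining an empty element appends the preferred separator,
    /// so the prefix check matches "detached/..." paths only.
    assert((fs::path("detached") / "").string() == "detached/");
}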
@ -618,7 +620,7 @@ void DataPartStorageOnDiskBase::remove(
if (part_dir_without_slash.has_parent_path())
{
auto parent_path = part_dir_without_slash.parent_path();
if (parent_path == "detached")
if (parent_path == MergeTreeData::DETACHED_DIR_NAME)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Trying to remove detached part {} with path {} in remove function. It shouldn't happen",

View File

@ -14,6 +14,7 @@
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/randomDelay.h>
@ -21,7 +22,6 @@
#include <base/scope_guard.h>
#include <Poco/Net/HTTPRequest.h>
#include <boost/algorithm/string/join.hpp>
#include <iterator>
#include <base/sort.h>
@ -803,7 +803,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
throw Exception(ErrorCodes::LOGICAL_ERROR, "`tmp_prefix` and `part_name` cannot be empty or contain '.' or '/' characters.");
auto part_dir = tmp_prefix + part_name;
auto part_relative_path = data.getRelativeDataPath() + String(to_detached ? "detached/" : "");
auto part_relative_path = data.getRelativeDataPath() + String(to_detached ? MergeTreeData::DETACHED_DIR_NAME : "");
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
/// Create temporary part storage to write sent files.

View File

@ -1844,7 +1844,7 @@ try
}
catch (...)
{
if (startsWith(new_relative_path, "detached/"))
if (startsWith(new_relative_path, fs::path(MergeTreeData::DETACHED_DIR_NAME) / ""))
{
// Don't throw when the destination is the detached folder. It might be able to
// recover in some cases, such as fetching parts into multi-disks while some of the
@ -1957,7 +1957,7 @@ std::optional<String> IMergeTreeDataPart::getRelativePathForDetachedPart(const S
DetachedPartInfo::DETACH_REASONS.end(),
prefix) != DetachedPartInfo::DETACH_REASONS.end());
if (auto path = getRelativePathForPrefix(prefix, /* detached */ true, broken))
return "detached/" + *path;
return fs::path(MergeTreeData::DETACHED_DIR_NAME) / *path;
return {};
}

View File

@ -262,7 +262,7 @@ void MergeTreeData::initializeDirectoriesAndFormatVersion(const std::string & re
if (need_create_directories)
{
disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
disk->createDirectories(fs::path(relative_data_path) / DETACHED_DIR_NAME);
}
if (disk->exists(format_version_path))
@ -1713,7 +1713,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
|| it->name() == DETACHED_DIR_NAME)
continue;
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
@ -2796,7 +2796,7 @@ void MergeTreeData::dropAllData()
&& settings_ptr->allow_remote_fs_zero_copy_replication;
try
{
bool keep_shared = removeDetachedPart(part.disk, fs::path(relative_data_path) / "detached" / part.dir_name / "", part.dir_name);
bool keep_shared = removeDetachedPart(part.disk, fs::path(relative_data_path) / DETACHED_DIR_NAME / part.dir_name / "", part.dir_name);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", part.dir_name, keep_shared);
}
catch (...)
@ -2879,8 +2879,8 @@ void MergeTreeData::dropIfEmpty()
if (disk->isBroken())
continue;
/// Non recursive, exception is thrown if there are more files.
disk->removeFileIfExists(fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME);
disk->removeDirectory(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
disk->removeFileIfExists(fs::path(relative_data_path) / FORMAT_VERSION_FILE_NAME);
disk->removeDirectory(fs::path(relative_data_path) / DETACHED_DIR_NAME);
disk->removeDirectory(relative_data_path);
}
}
@ -3443,7 +3443,7 @@ void MergeTreeData::changeSettings(
{
auto disk = new_storage_policy->getDiskByName(disk_name);
disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
disk->createDirectories(fs::path(relative_data_path) / DETACHED_DIR_NAME);
}
/// FIXME how would that be done while reloading configuration???
@ -6037,7 +6037,7 @@ DetachedPartsInfo MergeTreeData::getDetachedParts() const
for (const auto & disk : getDisks())
{
String detached_path = fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME;
String detached_path = fs::path(relative_data_path) / DETACHED_DIR_NAME;
/// Note: we don't care about TOCTOU issue here.
if (disk->exists(detached_path))
@ -6063,7 +6063,7 @@ void MergeTreeData::validateDetachedPartName(const String & name)
void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr local_context)
{
PartsTemporaryRename renamed_parts(*this, "detached/");
PartsTemporaryRename renamed_parts(*this, DETACHED_DIR_NAME);
if (part)
{
@ -6088,7 +6088,7 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr
for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / DETACHED_DIR_NAME / new_name / "", old_name);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
old_name.clear();
}
@ -6097,14 +6097,14 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr
MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
ContextPtr local_context, PartsTemporaryRename & renamed_parts)
{
const String source_dir = "detached/";
const fs::path source_dir = DETACHED_DIR_NAME;
/// Let's compose a list of parts that should be added.
if (attach_part)
{
const String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_id))
if (temporary_parts.contains(source_dir / part_id))
{
LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
"probably it's being detached right now", part_id);
@ -6181,7 +6181,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
LOG_DEBUG(log, "Checking part {}", new_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + old_name, disk);
auto part = getDataPartBuilder(old_name, single_disk_volume, source_dir + new_name)
auto part = getDataPartBuilder(old_name, single_disk_volume, source_dir / new_name)
.withPartFormatFromDisk()
.build();
@ -7212,11 +7212,10 @@ String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
DiskPtr MergeTreeData::tryGetDiskForDetachedPart(const String & part_name) const
{
String additional_path = "detached/";
const auto disks = getStoragePolicy()->getDisks();
for (const DiskPtr & disk : disks)
if (disk->exists(fs::path(relative_data_path) / additional_path / part_name))
if (disk->exists(fs::path(relative_data_path) / DETACHED_DIR_NAME / part_name))
return disk;
return nullptr;
@ -7791,7 +7790,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
return result;
}
bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right, String & out_reason)
bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right, PreformattedMessage & out_reason)
{
auto remove_broken_parts_from_consideration = [](auto & parts)
{
@ -7813,7 +7812,7 @@ bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const
if (left_projection_parts.size() != right_projection_parts.size())
{
out_reason = fmt::format(
out_reason = PreformattedMessage::create(
"Parts have different number of projections: {} in part '{}' and {} in part '{}'",
left_projection_parts.size(),
left->name,
@ -7827,7 +7826,7 @@ bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const
{
if (!right_projection_parts.contains(name))
{
out_reason = fmt::format(
out_reason = PreformattedMessage::create(
"The part '{}' doesn't have projection '{}' while part '{}' does", right->name, name, left->name
);
return false;

View File

@ -418,7 +418,7 @@ public:
static ReservationPtr tryReserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);
static ReservationPtr reserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);
static bool partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right, String & out_reason);
static bool partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right, PreformattedMessage & out_reason);
StoragePolicyPtr getStoragePolicy() const override;

View File

@ -136,7 +136,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const AllowedMergingPredicate & can_merge_callback,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
const PartitionIdsHint * partitions_hint)
{
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn, partitions_hint);
@ -145,7 +145,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (data_parts.empty())
{
out_disable_reason = "There are no parts in the table";
out_disable_reason = PreformattedMessage::create("There are no parts in the table");
return SelectPartsDecision::CANNOT_SELECT;
}
@ -153,7 +153,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (info.parts_selected_precondition == 0)
{
out_disable_reason = "No parts satisfy preconditions for merge";
out_disable_reason = PreformattedMessage::create("No parts satisfy preconditions for merge");
return SelectPartsDecision::CANNOT_SELECT;
}
@ -177,9 +177,9 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
/*optimize_skip_merged_partitions=*/true);
}
if (!out_disable_reason.empty())
out_disable_reason += ". ";
out_disable_reason += "There is no need to merge parts according to merge selector algorithm";
if (!out_disable_reason.text.empty())
out_disable_reason.text += ". ";
out_disable_reason.text += "There is no need to merge parts according to merge selector algorithm";
return SelectPartsDecision::CANNOT_SELECT;
}
@ -196,7 +196,7 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
auto metadata_snapshot = data.getInMemoryMetadataPtr();
String out_reason;
PreformattedMessage out_reason;
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn, out_reason);
if (info.parts_selected_precondition == 0)
@ -223,7 +223,7 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
for (size_t i = 0; i < all_partition_ids.size(); ++i)
{
auto future_part = std::make_shared<FutureMergedMutatedPart>();
String out_disable_reason;
PreformattedMessage out_disable_reason;
/// This method should have been const, but something went wrong... it's const with dry_run = true
auto status = const_cast<MergeTreeDataMergerMutator *>(this)->selectPartsToMergeFromRanges(
future_part, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed,
@ -232,7 +232,7 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
if (status == SelectPartsDecision::SELECTED)
res.insert(all_partition_ids[i]);
else
LOG_TEST(log, "Nothing to merge in partition {}: {}", all_partition_ids[i], out_disable_reason);
LOG_TEST(log, "Nothing to merge in partition {}: {}", all_partition_ids[i], out_disable_reason.text);
}
String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info);
@ -331,7 +331,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason) const
PreformattedMessage & out_disable_reason) const
{
MergeSelectingInfo res;
@ -444,7 +444,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
const StorageMetadataPtr & metadata_snapshot,
const IMergeSelector::PartsRanges & parts_ranges,
const time_t & current_time,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
bool dry_run)
{
const auto data_settings = data.getSettings();
@ -515,7 +515,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
if (parts_to_merge.empty())
{
out_disable_reason = "Did not find any parts to merge (with usual merge selectors)";
out_disable_reason = PreformattedMessage::create("Did not find any parts to merge (with usual merge selectors)");
return SelectPartsDecision::CANNOT_SELECT;
}
}
@ -573,20 +573,20 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
bool final,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
bool optimize_skip_merged_partitions)
{
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);
if (parts.empty())
{
out_disable_reason = "There are no parts inside partition";
out_disable_reason = PreformattedMessage::create("There are no parts inside partition");
return SelectPartsDecision::CANNOT_SELECT;
}
if (!final && parts.size() == 1)
{
out_disable_reason = "There is only one part inside partition";
out_disable_reason = PreformattedMessage::create("There is only one part inside partition");
return SelectPartsDecision::CANNOT_SELECT;
}
@ -595,7 +595,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
if (final && optimize_skip_merged_partitions && parts.size() == 1 && parts[0]->info.level > 0 &&
(!metadata_snapshot->hasAnyTTL() || parts[0]->checkAllTTLCalculated(metadata_snapshot)))
{
out_disable_reason = "Partition skipped due to optimize_skip_merged_partitions";
out_disable_reason = PreformattedMessage::create("Partition skipped due to optimize_skip_merged_partitions");
return SelectPartsDecision::NOTHING_TO_MERGE;
}
@ -636,7 +636,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100));
}
out_disable_reason = fmt::format("Insufficient available disk space, required {}", ReadableSize(required_disk_space));
out_disable_reason = PreformattedMessage::create("Insufficient available disk space, required {}", ReadableSize(required_disk_space));
return SelectPartsDecision::CANNOT_SELECT;
}

View File

@ -43,7 +43,7 @@ public:
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &,
const MergeTreeData::DataPartPtr &,
const MergeTreeTransaction *,
String &)>;
PreformattedMessage &)>;
explicit MergeTreeDataMergerMutator(MergeTreeData & data_);
@ -92,7 +92,7 @@ public:
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason) const;
PreformattedMessage & out_disable_reason) const;
/// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge
SelectPartsDecision selectPartsToMergeFromRanges(
@ -103,7 +103,7 @@ public:
const StorageMetadataPtr & metadata_snapshot,
const IMergeSelector::PartsRanges & parts_ranges,
const time_t & current_time,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
bool dry_run = false);
String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const;
@ -129,7 +129,7 @@ public:
const AllowedMergingPredicate & can_merge,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
const PartitionIdsHint * partitions_hint = nullptr);
/** Select all the parts in the specified partition for merge, if possible.
@ -144,7 +144,7 @@ public:
bool final,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
bool optimize_skip_merged_partitions = false);
/** Creates a task to merge parts.

View File

@ -2287,7 +2287,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::operator()(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction *,
String & out_reason) const
PreformattedMessage & out_reason) const
{
if (left)
return canMergeTwoParts(left, right, out_reason);
@ -2299,7 +2299,7 @@ template<typename VirtualPartsT, typename MutationsStateT>
bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
String & out_reason) const
PreformattedMessage & out_reason) const
{
/// A sketch of a proof of why this method actually works:
///
@ -2343,19 +2343,19 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
{
if (pinned_part_uuids_ && pinned_part_uuids_->part_uuids.contains(part->uuid))
{
out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned";
out_reason = PreformattedMessage::create("Part {} has uuid {} which is currently pinned", part->name, part->uuid);
return false;
}
if (inprogress_quorum_part_ && part->name == *inprogress_quorum_part_)
{
out_reason = "Quorum insert for part " + part->name + " is currently in progress";
out_reason = PreformattedMessage::create("Quorum insert for part {} is currently in progress", part->name);
return false;
}
if (prev_virtual_parts_ && prev_virtual_parts_->getContainingPart(part->info).empty())
{
out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
out_reason = PreformattedMessage::create("Entry for part {} hasn't been read from the replication log yet", part->name);
return false;
}
}
@ -2369,7 +2369,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
{
if (partition_ids_hint && !partition_ids_hint->contains(left->info.partition_id))
{
out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
out_reason = PreformattedMessage::create("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
return false;
}
@ -2381,8 +2381,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
auto block_it = block_numbers.upper_bound(left_max_block);
if (block_it != block_numbers.end() && *block_it < right_min_block)
{
out_reason = "Block number " + toString(*block_it) + " is still being inserted between parts "
+ left->name + " and " + right->name;
out_reason = PreformattedMessage::create("Block number {} is still being inserted between parts {} and {}", *block_it, left->name, right->name);
return false;
}
}
@ -2401,7 +2400,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
String containing_part = virtual_parts_->getContainingPart(part->info);
if (containing_part != part->name)
{
out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part;
out_reason = PreformattedMessage::create("Part {} has already been assigned a merge into {}", part->name, containing_part);
return false;
}
}
@ -2418,9 +2417,9 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
Strings covered = virtual_parts_->getPartsCoveredBy(gap_part_info);
if (!covered.empty())
{
out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front()
+ " to " + covered.back() + ") that are still not present or being processed by "
+ " other background process on this replica between " + left->name + " and " + right->name;
out_reason = PreformattedMessage::create("There are {} parts (from {} to {}) "
"that are still not present or being processed by other background process "
"on this replica between {} and {}", covered.size(), covered.front(), covered.back(), left->name, right->name);
return false;
}
}
@ -2436,8 +2435,8 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
if (left_mutation_ver != right_mutation_ver)
{
out_reason = "Current mutation versions of parts " + left->name + " and " + right->name + " differ: "
+ toString(left_mutation_ver) + " and " + toString(right_mutation_ver) + " respectively";
out_reason = PreformattedMessage::create("Current mutation versions of parts {} and {} differ: "
"{} and {} respectively", left->name, right->name, left_mutation_ver, right_mutation_ver);
return false;
}
}
@ -2448,23 +2447,23 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
template<typename VirtualPartsT, typename MutationsStateT>
bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeSinglePart(
const MergeTreeData::DataPartPtr & part,
String & out_reason) const
PreformattedMessage & out_reason) const
{
if (pinned_part_uuids_ && pinned_part_uuids_->part_uuids.contains(part->uuid))
{
out_reason = fmt::format("Part {} has uuid {} which is currently pinned", part->name, part->uuid);
out_reason = PreformattedMessage::create("Part {} has uuid {} which is currently pinned", part->name, part->uuid);
return false;
}
if (inprogress_quorum_part_ && part->name == *inprogress_quorum_part_)
{
out_reason = fmt::format("Quorum insert for part {} is currently in progress", part->name);
out_reason = PreformattedMessage::create("Quorum insert for part {} is currently in progress", part->name);
return false;
}
if (prev_virtual_parts_ && prev_virtual_parts_->getContainingPart(part->info).empty())
{
out_reason = fmt::format("Entry for part {} hasn't been read from the replication log yet", part->name);
out_reason = PreformattedMessage::create("Entry for part {} hasn't been read from the replication log yet", part->name);
return false;
}
@ -2479,7 +2478,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeSinglePart(
String containing_part = virtual_parts_->getContainingPart(part->info);
if (containing_part != part->name)
{
out_reason = fmt::format("Part {} has already been assigned a merge into {}", part->name, containing_part);
out_reason = PreformattedMessage::create("Part {} has already been assigned a merge into {}", part->name, containing_part);
return false;
}
}
@ -2488,7 +2487,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeSinglePart(
}
bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const
bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, PreformattedMessage & out_reason) const
{
std::lock_guard lock(queue.state_mutex);
for (const auto & entry : queue.queue)
@ -2501,7 +2500,7 @@ bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const Mer
if (part->info.isDisjoint(MergeTreePartInfo::fromPartName(part_name, queue.format_version)))
continue;
out_reason = fmt::format("Part {} participates in REPLACE_RANGE {} ({})", part_name, entry->new_part_name, entry->znode_name);
out_reason = PreformattedMessage::create("Part {} participates in REPLACE_RANGE {} ({})", part_name, entry->new_part_name, entry->znode_name);
return true;
}
}
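The recurring pattern of this migration, shown in isolation (a sketch; the part names are illustrative, and only the create() factory and the .text member of PreformattedMessage are evidenced by this diff):

/// Sketch: out_reason moves from a String built with fmt::format to a
/// PreformattedMessage, which still renders to plain text via .text.
PreformattedMessage out_reason;
out_reason = PreformattedMessage::create("Part {} has already been assigned a merge into {}", part_name, containing_part);
if (!out_reason.text.empty())
    LOG_INFO(log, "Cannot merge: {}", out_reason.text); /// assuming a logger in scope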

View File

@ -505,19 +505,19 @@ public:
bool operator()(const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction * txn,
String & out_reason) const;
PreformattedMessage & out_reason) const;
/// Can we assign a merge with these two parts?
/// (assuming that no merge was assigned after the predicate was constructed)
/// If we can't, set out_reason to the reason why we can't merge.
bool canMergeTwoParts(const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
String & out_reason) const;
PreformattedMessage & out_reason) const;
/// Can we assign a merge of this part and some other part?
/// For example a merge of a part and itself is needed for TTL.
/// This predicate is checked for the first part of each range.
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String & out_reason) const;
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, PreformattedMessage & out_reason) const;
CommittingBlocks getCommittingBlocks(zkutil::ZooKeeperPtr & zookeeper, const std::string & zookeeper_path, LoggerPtr log_);
@ -561,7 +561,7 @@ public:
/// Returns true if part is needed for some REPLACE_RANGE entry.
/// We should not drop the part in this case, because the replication queue may get stuck without it.
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const;
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, PreformattedMessage & out_reason) const;
/// Return nonempty optional of desired mutation version and alter version.
/// If we have no alter (modify/drop) mutations in the mutations queue, then we return the biggest possible

View File

@ -24,8 +24,8 @@ struct ReplicatedTableStatus
UInt64 log_max_index;
UInt64 log_pointer;
UInt64 absolute_delay;
UInt8 total_replicas;
UInt8 active_replicas;
UInt32 total_replicas;
UInt32 active_replicas;
UInt64 lost_part_count;
String last_queue_update_exception;
/// If the error has happened fetching the info from ZooKeeper, this field will be set.
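Presumably the widening guards against clusters with more than 255 replicas, where UInt8 counters would silently wrap (a hedged sketch using ClickHouse's integer aliases):

/// Sketch: 300 replicas do not fit into UInt8.
UInt8 old_total = static_cast<UInt8>(300); /// wraps around to 44
UInt32 new_total = 300;                    /// stays 300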

View File

@ -933,7 +933,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
bool aggressive,
const String & partition_id,
bool final,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & lock,
const MergeTreeTransactionPtr & txn,
@ -951,7 +951,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
CurrentlyMergingPartsTaggerPtr merging_tagger;
MergeList::EntryPtr merge_entry;
auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, const MergeTreeTransaction * tx, String & disable_reason) -> bool
auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, const MergeTreeTransaction * tx, PreformattedMessage & disable_reason) -> bool
{
if (tx)
{
@ -960,7 +960,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if ((left && !left->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
|| (right && !right->version.isVisible(tx->getSnapshot(), Tx::EmptyTID)))
{
disable_reason = "Some part is not visible in transaction";
disable_reason = PreformattedMessage::create("Some part is not visible in transaction");
return false;
}
@ -968,7 +968,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if ((left && left->version.isRemovalTIDLocked())
|| (right && right->version.isRemovalTIDLocked()))
{
disable_reason = "Some part is locked for removal in another cuncurrent transaction";
disable_reason = PreformattedMessage::create("Some part is locked for removal in another cuncurrent transaction");
return false;
}
}
@ -979,7 +979,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
{
if (currently_merging_mutating_parts.contains(right))
{
disable_reason = "Some part currently in a merging or mutating process";
disable_reason = PreformattedMessage::create("Some part currently in a merging or mutating process");
return false;
}
else
@ -988,13 +988,13 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (currently_merging_mutating_parts.contains(left) || currently_merging_mutating_parts.contains(right))
{
disable_reason = "Some part currently in a merging or mutating process";
disable_reason = PreformattedMessage::create("Some part currently in a merging or mutating process");
return false;
}
if (getCurrentMutationVersion(left, lock) != getCurrentMutationVersion(right, lock))
{
disable_reason = "Some parts have different mutation version";
disable_reason = PreformattedMessage::create("Some parts have different mutation version");
return false;
}
@ -1004,7 +1004,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
auto max_possible_level = getMaxLevelInBetween(left, right);
if (max_possible_level > std::max(left->info.level, right->info.level))
{
disable_reason = fmt::format("There is an outdated part in a gap between two active parts ({}, {}) with merge level {} higher than these active parts have", left->name, right->name, max_possible_level);
disable_reason = PreformattedMessage::create("There is an outdated part in a gap between two active parts ({}, {}) with merge level {} higher than these active parts have", left->name, right->name, max_possible_level);
return false;
}
@ -1013,11 +1013,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
auto is_background_memory_usage_ok = [](String & disable_reason) -> bool
auto is_background_memory_usage_ok = [](PreformattedMessage & disable_reason) -> bool
{
if (canEnqueueBackgroundTask())
return true;
disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
disable_reason = PreformattedMessage::create("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
return false;
@ -1045,7 +1045,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
out_disable_reason);
}
else
out_disable_reason = "Current value of max_source_parts_size is zero";
out_disable_reason = PreformattedMessage::create("Current value of max_source_parts_size is zero");
}
}
else
@ -1086,7 +1086,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(lock, timeout))
{
out_disable_reason = fmt::format("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms);
out_disable_reason = PreformattedMessage::create("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms);
break;
}
}
@ -1102,9 +1102,9 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (select_decision != SelectPartsDecision::SELECTED)
{
if (!out_disable_reason.empty())
out_disable_reason += ". ";
out_disable_reason += "Cannot select parts for optimization";
if (!out_disable_reason.text.empty())
out_disable_reason.text += ". ";
out_disable_reason.text += "Cannot select parts for optimization";
return {};
}
@ -1125,7 +1125,7 @@ bool StorageMergeTree::merge(
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
bool optimize_skip_merged_partitions)
{
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -1180,7 +1180,7 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p
}
MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String & /* disable_reason */, TableLockHolder & /* table_lock_holder */,
const StorageMetadataPtr & metadata_snapshot, PreformattedMessage & /* disable_reason */, TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/)
{
if (current_mutations_by_version.empty())
@ -1396,7 +1396,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
if (merger_mutator.merges_blocker.isCancelled())
return false;
String out_reason;
PreformattedMessage out_reason;
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, out_reason, shared_lock, lock, txn);
if (!merge_entry && !current_mutations_by_version.empty())
@ -1559,14 +1559,12 @@ bool StorageMergeTree::optimize(
auto txn = local_context->getCurrentTransaction();
String disable_reason;
PreformattedMessage disable_reason;
if (!partition && final)
{
if (cleanup && this->merging_params.mode != MergingParams::Mode::Replacing)
{
constexpr const char * message = "Cannot OPTIMIZE with CLEANUP table: {}";
disable_reason = "only ReplacingMergeTree can be CLEANUP";
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, "Cannot OPTIMIZE with CLEANUP table: only ReplacingMergeTree can be CLEANUP");
}
if (cleanup && !getSettings()->allow_experimental_replacing_merge_with_cleanup)
@ -1592,12 +1590,12 @@ bool StorageMergeTree::optimize(
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, message, disable_reason);
if (disable_reason.text.empty())
disable_reason = PreformattedMessage::create("unknown reason");
LOG_INFO(log, message, disable_reason.text);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason.text);
return false;
}
}
@ -1620,12 +1618,12 @@ bool StorageMergeTree::optimize(
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, message, disable_reason);
if (disable_reason.text.empty())
disable_reason = PreformattedMessage::create("unknown reason");
LOG_INFO(log, message, disable_reason.text);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason.text);
return false;
}
}
@ -2024,7 +2022,7 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
bool attach_part, ContextPtr local_context)
{
PartitionCommandsResultInfo results;
PartsTemporaryRename renamed_parts(*this, "detached/");
PartsTemporaryRename renamed_parts(*this, DETACHED_DIR_NAME);
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, local_context, renamed_parts);
for (size_t i = 0; i < loaded_parts.size(); ++i)

View File

@ -175,7 +175,7 @@ private:
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
PreformattedMessage & out_disable_reason,
bool optimize_skip_merged_partitions = false);
void renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction);
@ -202,7 +202,7 @@ private:
bool aggressive,
const String & partition_id,
bool final,
String & disable_reason,
PreformattedMessage & disable_reason,
TableLockHolder & table_lock_holder,
std::unique_lock<std::mutex> & lock,
const MergeTreeTransactionPtr & txn,
@ -211,7 +211,7 @@ private:
MergeMutateSelectedEntryPtr selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String & disable_reason,
const StorageMetadataPtr & metadata_snapshot, PreformattedMessage & disable_reason,
TableLockHolder & table_lock_holder, std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock);
/// For current mutations queue, returns maximum version of mutation for a part,

View File

@ -1983,7 +1983,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
for (const auto it = disk->iterateDirectory(fs::path(relative_data_path) / "detached/"); it->isValid(); it->next())
for (const auto it = disk->iterateDirectory(fs::path(relative_data_path) / DETACHED_DIR_NAME); it->isValid(); it->next())
{
const auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version);
@ -1993,7 +1993,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
const auto part_old_name = part_info->getPartNameV1();
const auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_old_name, disk);
auto part = getDataPartBuilder(entry.new_part_name, volume, fs::path("detached") / part_old_name)
auto part = getDataPartBuilder(entry.new_part_name, volume, fs::path(DETACHED_DIR_NAME) / part_old_name)
.withPartFormatFromDisk()
.build();
@ -2440,7 +2440,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
{
String part_dir = part_to_detach->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
auto holder = getTemporaryPartDirectoryHolder(fs::path(DETACHED_DIR_NAME) / part_dir);
part_to_detach->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}
@ -2967,7 +2967,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
part = get_part();
// The fetched part is valuable and should not be cleaned like a temp part.
part->is_temp = false;
part->renameTo("detached/" + entry.new_part_name, true);
part->renameTo(fs::path(DETACHED_DIR_NAME) / entry.new_part_name, true);
LOG_INFO(log, "Cloned part {} to detached directory", part->name);
}
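The hunks above replace hard-coded "detached"/"detached/" literals and manual "/" concatenation with a shared DETACHED_DIR_NAME constant joined via fs::path. A small sketch of the standard-library behavior that makes this safe (the constant's value is an assumption matching the replaced literals):
#include <filesystem>
#include <iostream>
namespace fs = std::filesystem;
// operator/ inserts exactly one separator regardless of whether the left-hand
// side already ends with one, so a single constant works at every call site;
// by contrast, String + "/" + name would produce "detached//name" if the
// constant ever carried a trailing slash.
int main()
{
    constexpr auto DETACHED_DIR_NAME = "detached";
    std::cout << (fs::path(DETACHED_DIR_NAME) / "all_1_1_0") << '\n'; // prints "detached/all_1_1_0"
    std::cout << (fs::path("detached/") / "all_1_1_0") << '\n';       // also "detached/all_1_1_0"
}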
@ -3832,7 +3832,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in));
}
String out_reason;
PreformattedMessage out_reason;
if (can_assign_merge &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred,
merge_with_ttl_allowed, NO_TRANSACTION_PTR, out_reason, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
@ -4987,7 +4987,7 @@ bool StorageReplicatedMergeTree::fetchPart(
{
// The fetched part is valuable and should not be cleaned like a temp part.
part->is_temp = false;
part->renameTo(fs::path("detached") / part_name, true);
part->renameTo(fs::path(DETACHED_DIR_NAME) / part_name, true);
}
}
catch (const Exception & e)
@ -5814,7 +5814,7 @@ bool StorageReplicatedMergeTree::optimize(
future_merged_part->uuid = UUIDHelpers::generateV4();
constexpr const char * unknown_disable_reason = "unknown reason";
String disable_reason = unknown_disable_reason;
PreformattedMessage disable_reason = PreformattedMessage::create(unknown_disable_reason);
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
if (partition_id.empty())
@ -5837,10 +5837,10 @@ bool StorageReplicatedMergeTree::optimize(
if (select_decision != SelectPartsDecision::SELECTED)
{
constexpr const char * message_fmt = "Cannot select parts for optimization: {}";
assert(disable_reason != unknown_disable_reason);
assert(disable_reason.text != unknown_disable_reason);
if (!partition_id.empty())
disable_reason += fmt::format(" (in partition {})", partition_id);
return handle_noop(message_fmt, disable_reason);
disable_reason.text += fmt::format(" (in partition {})", partition_id);
return handle_noop(message_fmt, disable_reason.text);
}
ReplicatedMergeTreeLogEntryData merge_entry;
@ -6547,7 +6547,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
assertNotReadonly();
PartitionCommandsResultInfo results;
PartsTemporaryRename renamed_parts(*this, "detached/");
PartsTemporaryRename renamed_parts(*this, DETACHED_DIR_NAME);
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
/// TODO Allow to use quorum here.
@ -7022,7 +7022,7 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
}
res.log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
res.total_replicas = all_replicas.size();
res.total_replicas = UInt32(all_replicas.size());
if (get_result[1].error == Coordination::Error::ZNONODE)
res.lost_part_count = 0;
else
@ -8509,9 +8509,9 @@ void StorageReplicatedMergeTree::movePartitionToShard(
}
/// canMergeSinglePart is overlapping with dropPart, let's try to use the same code.
String out_reason;
PreformattedMessage out_reason;
if (!merge_pred.canMergeSinglePart(part, out_reason))
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Part is busy, reason: {}", out_reason);
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Part is busy, reason: {}", out_reason.text);
}
{
@ -8769,18 +8769,18 @@ bool StorageReplicatedMergeTree::dropPartImpl(
/// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already
/// finished the merge.
String out_reason;
PreformattedMessage out_reason;
if (!merge_pred.canMergeSinglePart(part, out_reason))
{
if (throw_if_noop)
throw Exception::createDeprecated(out_reason, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
throw Exception(out_reason, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
return false;
}
if (merge_pred.partParticipatesInReplaceRange(part, out_reason))
{
if (throw_if_noop)
throw Exception::createDeprecated(out_reason, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
throw Exception(out_reason, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
return false;
}
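Here `Exception::createDeprecated`, which took an opaque runtime string, gives way to a constructor taking the PreformattedMessage directly. A self-contained model of the difference, with illustrative names rather than the real ClickHouse classes:
#include <stdexcept>
#include <string>
struct Preformatted { std::string text; std::string format_string; };
class ExceptionModel : public std::runtime_error
{
public:
    // Structured path: the format string survives into the exception.
    ExceptionModel(const Preformatted & msg, int code_)
        : std::runtime_error(msg.text), format_string(msg.format_string), code(code_) {}
    // Deprecated path: only a flat string was ever available, so the
    // template information is lost.
    static ExceptionModel createDeprecated(const std::string & text, int code_)
    {
        return ExceptionModel(Preformatted{text, /*format_string=*/""}, code_);
    }
    std::string format_string;  // empty on the deprecated path
    int code;
};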
@ -9986,7 +9986,7 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartExists(const String & part_n
{
fs::directory_iterator dir_end;
for (const std::string & path : getDataPaths())
for (fs::directory_iterator dir_it{fs::path(path) / "detached/"}; dir_it != dir_end; ++dir_it)
for (fs::directory_iterator dir_it{fs::path(path) / DETACHED_DIR_NAME}; dir_it != dir_end; ++dir_it)
if (dir_it->path().filename().string() == part_name)
return true;
return false;
@ -9999,7 +9999,7 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & p
for (const std::string & path : getDataPaths())
{
for (fs::directory_iterator dir_it{fs::path(path) / "detached/"}; dir_it != dir_end; ++dir_it)
for (fs::directory_iterator dir_it{fs::path(path) / DETACHED_DIR_NAME}; dir_it != dir_end; ++dir_it)
{
const String file_name = dir_it->path().filename().string();
auto part_info = MergeTreePartInfo::tryParsePartName(file_name, format_version);

View File

@ -439,6 +439,7 @@ bool StorageWindowView::optimize(
bool cleanup,
ContextPtr local_context)
{
throwIfWindowViewIsDisabled(local_context);
auto storage_ptr = getInnerTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
@ -449,6 +450,7 @@ void StorageWindowView::alter(
ContextPtr local_context,
AlterLockHolder &)
{
throwIfWindowViewIsDisabled(local_context);
auto table_id = getStorageID();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
@ -508,8 +510,9 @@ void StorageWindowView::alter(
startup();
}
void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /*local_context*/) const
void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
throwIfWindowViewIsDisabled(local_context);
for (const auto & command : commands)
{
if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY)
@ -519,6 +522,7 @@ void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, Con
std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
{
throwIfWindowViewIsDisabled();
UInt32 w_start = addTime(watermark, window_kind, -window_num_units, *time_zone);
auto inner_table = getInnerTable();
@ -654,6 +658,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
inline void StorageWindowView::fire(UInt32 watermark)
{
throwIfWindowViewIsDisabled();
LOG_TRACE(log, "Watch streams number: {}, target table: {}",
watch_streams.size(),
target_table_id.empty() ? "None" : target_table_id.getNameForLogs());
@ -722,6 +727,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
ASTPtr StorageWindowView::getSourceTableSelectQuery()
{
throwIfWindowViewIsDisabled();
auto query = select_query->clone();
auto & modified_select = query->as<ASTSelectQuery &>();
@ -947,6 +953,7 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
void StorageWindowView::addFireSignal(std::set<UInt32> & signals)
{
throwIfWindowViewIsDisabled();
std::lock_guard lock(fire_signal_mutex);
for (const auto & signal : signals)
fire_signal.push_back(signal);
@ -962,6 +969,7 @@ void StorageWindowView::updateMaxTimestamp(UInt32 timestamp)
void StorageWindowView::updateMaxWatermark(UInt32 watermark)
{
throwIfWindowViewIsDisabled();
if (is_proctime)
{
max_watermark = watermark;
@ -1014,6 +1022,7 @@ void StorageWindowView::cleanup()
void StorageWindowView::threadFuncCleanup()
{
throwIfWindowViewIsDisabled();
if (shutdown_called)
return;
@ -1033,6 +1042,7 @@ void StorageWindowView::threadFuncCleanup()
void StorageWindowView::threadFuncFireProc()
{
throwIfWindowViewIsDisabled();
if (shutdown_called)
return;
@ -1069,6 +1079,7 @@ void StorageWindowView::threadFuncFireProc()
void StorageWindowView::threadFuncFireEvent()
{
throwIfWindowViewIsDisabled();
std::lock_guard lock(fire_signal_mutex);
LOG_TRACE(log, "Fire events: {}", fire_signal.size());
@ -1100,6 +1111,7 @@ void StorageWindowView::read(
const size_t max_block_size,
const size_t num_streams)
{
throwIfWindowViewIsDisabled(local_context);
if (target_table_id.empty())
return;
@ -1140,6 +1152,7 @@ Pipe StorageWindowView::watch(
size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
throwIfWindowViewIsDisabled(local_context);
ASTWatchQuery & query = typeid_cast<ASTWatchQuery &>(*query_info.query);
bool has_limit = false;
@ -1178,8 +1191,10 @@ StorageWindowView::StorageWindowView(
, clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds())
{
if (context_->getSettingsRef().allow_experimental_analyzer)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Experimental WINDOW VIEW feature is not supported with new infrastructure for query analysis (the setting 'allow_experimental_analyzer')");
disabled_due_to_analyzer = true;
if (mode <= LoadingStrictnessLevel::CREATE)
throwIfWindowViewIsDisabled();
if (!query.select)
throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName());
@ -1243,6 +1258,9 @@ StorageWindowView::StorageWindowView(
}
}
if (disabled_due_to_analyzer)
return;
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
@ -1400,6 +1418,7 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
void StorageWindowView::writeIntoWindowView(
StorageWindowView & window_view, const Block & block, ContextPtr local_context)
{
window_view.throwIfWindowViewIsDisabled(local_context);
while (window_view.modifying_query)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ -1589,6 +1608,9 @@ void StorageWindowView::writeIntoWindowView(
void StorageWindowView::startup()
{
if (disabled_due_to_analyzer)
return;
DatabaseCatalog::instance().addViewDependency(select_table_id, getStorageID());
fire_task->activate();
@ -1602,6 +1624,8 @@ void StorageWindowView::startup()
void StorageWindowView::shutdown(bool)
{
shutdown_called = true;
if (disabled_due_to_analyzer)
return;
fire_condition.notify_all();
@ -1657,6 +1681,7 @@ Block StorageWindowView::getInputHeader() const
const Block & StorageWindowView::getOutputHeader() const
{
throwIfWindowViewIsDisabled();
std::lock_guard lock(sample_block_lock);
if (!output_header)
{
@ -1681,6 +1706,13 @@ StoragePtr StorageWindowView::getTargetTable() const
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
}
void StorageWindowView::throwIfWindowViewIsDisabled(ContextPtr local_context) const
{
if (disabled_due_to_analyzer || (local_context && local_context->getSettingsRef().allow_experimental_analyzer))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Experimental WINDOW VIEW feature is not supported "
"in the current infrastructure for query analysis (the setting 'allow_experimental_analyzer')");
}
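The pattern above converts a hard constructor failure into a lazy guard: CREATE still throws (via the LoadingStrictnessLevel check), but ATTACH of a pre-existing table succeeds and only later operations fail. A minimal model of that pattern, with illustrative names:
#include <stdexcept>
// Construction only records the incompatibility so that server startup with
// old tables on disk does not fail; every public entry point re-checks it.
class FeatureGatedStorage
{
public:
    explicit FeatureGatedStorage(bool analyzer_enabled)
        : disabled(analyzer_enabled) {}  // don't throw here: ATTACH must succeed
    void read() { throwIfDisabled(); /* ... */ }
    void write() { throwIfDisabled(); /* ... */ }
    void shutdown() { if (disabled) return; /* skip tasks that never started */ }
private:
    void throwIfDisabled() const
    {
        if (disabled)
            throw std::runtime_error("feature is not supported with the new analyzer");
    }
    bool disabled = false;
};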
void registerStorageWindowView(StorageFactory & factory)
{
factory.registerStorage("WindowView", [](const StorageFactory::Arguments & args)

View File

@ -271,5 +271,9 @@ private:
StoragePtr getSourceTable() const;
StoragePtr getInnerTable() const;
StoragePtr getTargetTable() const;
bool disabled_due_to_analyzer = false;
void throwIfWindowViewIsDisabled(ContextPtr local_context = nullptr) const;
};
}

View File

@ -318,7 +318,7 @@ class CiCache:
self.update()
if self.cache_data_fetched:
# there are no record w/o underling data - no need to fetch
# there are no records without fetched data - no need to fetch
return self
# clean up
@ -773,6 +773,7 @@ class CiOptions:
not pr_info.is_pr() and not debug_message
): # if commit_message is provided it's test/debug scenario - do not return
# CI options can be configured in PRs only
# if debug_message is provided - it's a test
return res
message = debug_message or GitRunner(set_cwd_to_git_root=True).run(
f"{GIT_PREFIX} log {pr_info.sha} --format=%B -n 1"
@ -790,9 +791,9 @@ class CiOptions:
print(f"CI tags from PR body: [{matches_pr}]")
matches = list(set(matches + matches_pr))
if "do not test" in pr_info.labels:
# do_not_test could be set in GH labels
res.do_not_test = True
if "do not test" in pr_info.labels:
# do_not_test could be set in GH labels
res.do_not_test = True
for match in matches:
if match.startswith("job_"):

View File

@ -26,6 +26,7 @@ NeedsDataType = Dict[str, Dict[str, Union[str, Dict[str, str]]]]
DIFF_IN_DOCUMENTATION_EXT = [
".html",
".md",
".mdx",
".yml",
".txt",
".css",

View File

@ -131,6 +131,11 @@ def main():
temp_path.mkdir(parents=True, exist_ok=True)
pr_info = PRInfo()
if pr_info.is_merge_queue() and args.push:
print("Auto style fix will be disabled for Merge Queue workflow")
args.push = False
run_cpp_check = True
run_shell_check = True
run_python_check = True

View File

@ -357,8 +357,9 @@ def update_contributors(
# format: " 1016 Alexey Arno"
shortlog = git_runner.run("git shortlog HEAD --summary")
escaping = str.maketrans({"\\": "\\\\", '"': '\\"'})
contributors = sorted(
[c.split(maxsplit=1)[-1].replace('"', r"\"") for c in shortlog.split("\n")],
[c.split(maxsplit=1)[-1].translate(escaping) for c in shortlog.split("\n")],
)
contributors = [f' "{c}",' for c in contributors]
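The `str.maketrans` change replaces a quote-only `replace` with a single-pass translation that also escapes backslashes. A sketch of the same idea in C++, illustrating why one pass is the robust shape (escaping quotes first and backslashes second would re-escape the freshly inserted backslashes):
#include <string>
// Escape '\' and '"' for embedding in a double-quoted literal, in one scan.
std::string escape_for_double_quotes(const std::string & in)
{
    std::string out;
    out.reserve(in.size());
    for (char c : in)
    {
        if (c == '\\' || c == '"')
            out.push_back('\\');
        out.push_back(c);
    }
    return out;
}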

View File

@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS 01056_window_view_proc_hop_watch")
client1.expect(prompt)

View File

@ -26,6 +26,8 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client1.send("SET window_view_heartbeat_interval = 1")
client1.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS db_01059_event_hop_watch_strict_asc")
client1.expect(prompt)

View File

@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS 01062_window_view_event_hop_watch_asc")
client1.expect(prompt)

View File

@ -27,6 +27,8 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client1.send(
"CREATE DATABASE IF NOT EXISTS 01065_window_view_event_hop_watch_bounded"

View File

@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client1.send("CREATE DATABASE 01069_window_view_proc_tumble_watch")
client1.expect(prompt)

View File

@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS 01070_window_view_watch_events")
client1.expect(prompt)

View File

@ -28,10 +28,14 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client3.send("SET allow_experimental_window_view = 1")
client3.expect(prompt)
client3.send("SET window_view_heartbeat_interval = 1")
client3.expect(prompt)
client3.send("SET allow_experimental_analyzer = 0")
client3.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS 01078_window_view_alter_query_watch")
client1.expect(prompt)

View File

@ -27,6 +27,8 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET allow_experimental_analyzer = 0")
client2.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS 01082_window_view_watch_limit")
client1.expect(prompt)

View File

@ -34,4 +34,19 @@ FROM
) AS t
) SETTINGS optimize_uniq_to_count=1;
-- https://github.com/ClickHouse/ClickHouse/issues/62298
DROP TABLE IF EXISTS users;
CREATE TABLE users
(
`id` Int64,
`name` String
)
ENGINE = ReplacingMergeTree
ORDER BY (id, name);
INSERT INTO users VALUES (1, 'pufit'), (1, 'pufit2'), (1, 'pufit3');
SELECT uniqExact(id) FROM ( SELECT id FROM users WHERE id = 1 GROUP BY id, name );
DROP TABLE IF EXISTS users;

DROP TABLE IF EXISTS tags;

View File

@ -9,6 +9,11 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# But cutting it in the result of SHOW CREATE TABLE will be bad for a user.
# That's why we control it with the setting `output_format_pretty_max_value_width_apply_for_single_value`.
# Make sure that system.metric_log exists
${CLICKHOUSE_CLIENT} --query "SELECT 1 FORMAT Null"
${CLICKHOUSE_CLIENT} --query "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} --query "SHOW CREATE TABLE system.metric_log" --format Pretty | grep -P '^COMMENT'
${CLICKHOUSE_CLIENT} --query "SHOW CREATE TABLE system.metric_log" --format PrettyCompact | grep -P '^COMMENT'
${CLICKHOUSE_CLIENT} --query "SHOW CREATE TABLE system.metric_log" --format PrettySpace | grep -P '^COMMENT'

View File

@ -4,5 +4,5 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
clickhouse-local --param_rounding 1 --query "SELECT 1 AS x ORDER BY x WITH FILL STEP {rounding:UInt32} SETTINGS allow_experimental_analyzer = 1"
clickhouse-local --param_rounding 1 --query "SELECT 1 AS x ORDER BY x WITH FILL STEP {rounding:UInt32} SETTINGS allow_experimental_analyzer = 0"
${CLICKHOUSE_LOCAL} --param_rounding 1 --query "SELECT 1 AS x ORDER BY x WITH FILL STEP {rounding:UInt32} SETTINGS allow_experimental_analyzer = 1"
${CLICKHOUSE_LOCAL} --param_rounding 1 --query "SELECT 1 AS x ORDER BY x WITH FILL STEP {rounding:UInt32} SETTINGS allow_experimental_analyzer = 0"

View File

@ -0,0 +1,27 @@
CREATE TABLE vecs_Float32 (v Array(Float32)) ENGINE=Memory;
INSERT INTO vecs_Float32
SELECT v FROM (
SELECT
number AS n,
[
rand(n*10), rand(n*10+1), rand(n*10+2), rand(n*10+3), rand(n*10+4), rand(n*10+5), rand(n*10+6), rand(n*10+7), rand(n*10+8), rand(n*10+9),
rand(n*10+10), rand(n*10+11), rand(n*10+12), rand(n*10+13), rand(n*10+14), rand(n*10+15), rand(n*10+16), rand(n*10+17), rand(n*10+18), rand(n*10+19),
rand(n*10+20), rand(n*10+21), rand(n*10+22), rand(n*10+23), rand(n*10+24), rand(n*10+25), rand(n*10+26), rand(n*10+27), rand(n*10+28), rand(n*10+29),
rand(n*10+30), rand(n*10+31), rand(n*10+32), rand(n*10+33), rand(n*10+34), rand(n*10+35), rand(n*10+36), rand(n*10+37), rand(n*10+38), rand(n*10+39),
rand(n*10+40), rand(n*10+41), rand(n*10+42), rand(n*10+43), rand(n*10+44), rand(n*10+45), rand(n*10+46), rand(n*10+47), rand(n*10+48), rand(n*10+49),
rand(n*10+50), rand(n*10+51), rand(n*10+52), rand(n*10+53), rand(n*10+54), rand(n*10+55), rand(n*10+56), rand(n*10+57), rand(n*10+58), rand(n*10+59),
rand(n*10+60), rand(n*10+61), rand(n*10+62), rand(n*10+63), rand(n*10+64), rand(n*10+65), rand(n*10+66), rand(n*10+67), rand(n*10+68), rand(n*10+69),
rand(n*10+70), rand(n*10+71), rand(n*10+72), rand(n*10+73), rand(n*10+74), rand(n*10+75), rand(n*10+76), rand(n*10+77), rand(n*10+78), rand(n*10+79),
rand(n*10+80), rand(n*10+81), rand(n*10+82), rand(n*10+83), rand(n*10+84), rand(n*10+85), rand(n*10+86), rand(n*10+87), rand(n*10+88), rand(n*10+89),
rand(n*10+90), rand(n*10+91), rand(n*10+92), rand(n*10+93), rand(n*10+94), rand(n*10+95), rand(n*10+96), rand(n*10+97), rand(n*10+98), rand(n*10+99),
rand(n*10+100), rand(n*10+101), rand(n*10+102), rand(n*10+103), rand(n*10+104), rand(n*10+105), rand(n*10+106), rand(n*10+107), rand(n*10+108), rand(n*10+109),
rand(n*10+110), rand(n*10+111), rand(n*10+112), rand(n*10+113), rand(n*10+114), rand(n*10+115), rand(n*10+116), rand(n*10+117), rand(n*10+118), rand(n*10+119),
rand(n*10+120), rand(n*10+121), rand(n*10+122), rand(n*10+123), rand(n*10+124), rand(n*10+125), rand(n*10+126), rand(n*10+127), rand(n*10+128), rand(n*10+129),
rand(n*10+130), rand(n*10+131), rand(n*10+132), rand(n*10+133), rand(n*10+134), rand(n*10+135), rand(n*10+136), rand(n*10+137), rand(n*10+138), rand(n*10+139),
rand(n*10+140), rand(n*10+141), rand(n*10+142), rand(n*10+143), rand(n*10+144), rand(n*10+145), rand(n*10+146), rand(n*10+147), rand(n*10+148), rand(n*10+149)
] AS v
FROM system.numbers
LIMIT 10
);
WITH (SELECT v FROM vecs_Float32 limit 1) AS a SELECT count(dp) FROM (SELECT dotProduct(a, v) AS dp FROM vecs_Float32);
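For reference, the semantics this new test pins down, restated as a plain sketch; this is an illustration of what dotProduct computes (the sum of elementwise products, here with one constant argument), not the ClickHouse implementation:
#include <cstddef>
#include <vector>
float dot_product(const std::vector<float> & a, const std::vector<float> & b)
{
    float sum = 0.0f;
    const std::size_t n = a.size() < b.size() ? a.size() : b.size();
    for (std::size_t i = 0; i < n; ++i)
        sum += a[i] * b[i];  // elementwise product, accumulated
    return sum;
}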

View File

@ -0,0 +1,19 @@
create table t(a UInt32, b UInt32) engine=MergeTree order by (a, b) settings index_granularity=1;
system stop merges t;
-- for this part the first column is useless, so we have to use both
insert into t select 42, number from numbers_mt(100);
-- for this part the first column is enough
insert into t select number, number from numbers_mt(100);
-- force reloading index
detach table t;
attach table t;
set merge_tree_min_bytes_for_concurrent_read=1, merge_tree_min_rows_for_concurrent_read=1, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=1.0, max_threads=4;
-- the bug happened when we used (a, b) index values for one part and only (a) for another in PartsSplitter. Even a simple count query is enough,
-- because some granules were assigned to the wrong layers and hence not returned from the reading step (because they were filtered out by `FilterSortedStreamByRange`)
select count() from t where not ignore(*);
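A self-contained illustration of the comment above: ordering granules by the full key (a, b) is strict where a prefix key (a) only sees ties, which is how mixing prefix lengths across parts could send granules to the wrong layer:
#include <cassert>
#include <tuple>
int main()
{
    auto full_lhs = std::make_tuple(42u, 10u);
    auto full_rhs = std::make_tuple(42u, 99u);
    assert(full_lhs < full_rhs);                             // (a, b) gives a strict order
    assert(std::get<0>(full_lhs) == std::get<0>(full_rhs));  // (a) alone: a tie, no usable order
}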

View File

@ -0,0 +1,40 @@
Parquet
a UInt64
a_nullable Nullable(UInt64)
Arrow
a UInt64
a_nullable Nullable(UInt64)
Parquet
b Array(Nullable(UInt64))
b_nullable Array(Nullable(UInt64))
Arrow
b Array(Nullable(UInt64))
b_nullable Array(Nullable(UInt64))
Parquet
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a Nullable(UInt64),\n b Nullable(String))
Arrow
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a Nullable(UInt64),\n b Nullable(String))
Parquet
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a Nullable(UInt64),\n b Nullable(String))))
Arrow
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a Nullable(UInt64),\n b Nullable(String))))
Parquet
e Map(UInt64, Nullable(String))
e_nullable Map(UInt64, Nullable(String))
Arrow
e Map(UInt64, Nullable(String))
e_nullable Map(UInt64, Nullable(String))
Parquet
f Map(UInt64, Map(UInt64, Nullable(String)))
f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
Arrow
f Map(UInt64, Map(UInt64, Nullable(String)))
f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
Parquet
g String
g_nullable Nullable(String)
Arrow
g LowCardinality(String)
g_nullable LowCardinality(String)

View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.data
formats="Parquet Arrow"
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('a UInt64, a_nullable Nullable(UInt64)', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('b Array(UInt64), b_nullable Array(Nullable(UInt64))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('c Tuple(a UInt64, b String), c_nullable Tuple(a Nullable(UInt64), b Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('d Tuple(a UInt64, b Tuple(a UInt64, b String), d_nullable Tuple(a UInt64, b Tuple(a Nullable(UInt64), b Nullable(String))))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('e Map(UInt64, String), e_nullable Map(UInt64, Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('f Map(UInt64, Map(UInt64, String)), f_nullables Map(UInt64, Map(UInt64, Nullable(String)))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('g LowCardinality(String), g_nullable LowCardinality(Nullable(String))', 42) limit 10 settings output_format_arrow_low_cardinality_as_dictionary=1, allow_suspicious_low_cardinality_types=1 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
rm $DATA_FILE

View File

@ -0,0 +1 @@
[4,5,6]

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS t;
CREATE TABLE t (A Array(Int64)) Engine = MergeTree ORDER BY tuple();
INSERT INTO t VALUES ([1,2,3]), ([4,5,6]), ([7,8,9]);
SELECT * FROM t PREWHERE arrayExists(x -> x = 5, A);
DROP TABLE t;

View File

@ -0,0 +1 @@
Unknown function

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE test (s String) ENGINE = Memory"
# Calling an unknown function should not lead to creation of a 'user_defined' directory in the current directory
${CLICKHOUSE_CLIENT} --query "INSERT INTO test VALUES (xyz('abc'))" 2>&1 | grep -o -F 'Unknown function'
ls -ld user_defined 2> /dev/null
${CLICKHOUSE_CLIENT} --query "DROP TABLE test"

View File

@ -1,3 +1,4 @@
v24.3.2.23-lts 2024-04-03
v24.3.1.2672-lts 2024-03-27
v24.2.2.71-stable 2024-03-15
v24.2.1.2248-stable 2024-02-29
