Merge branch 'sqltest' of github.com:ClickHouse/ClickHouse into sqltest

This commit is contained in:
Alexey Milovidov 2023-08-01 22:33:19 +02:00
commit 3b6247e3f5
173 changed files with 4035 additions and 505 deletions

View File

@ -8,6 +8,7 @@
#include <functional>
#include <iosfwd>
#include <base/defines.h>
#include <base/types.h>
#include <base/unaligned.h>
@ -274,6 +275,8 @@ struct CRC32Hash
if (size == 0)
return 0;
chassert(pos);
if (size < 8)
{
return static_cast<unsigned>(hashLessThan8(x.data, x.size));

View File

@ -115,8 +115,15 @@
/// because SIGABRT is easier to debug than SIGTRAP (the second one makes gdb crazy)
#if !defined(chassert)
#if defined(ABORT_ON_LOGICAL_ERROR)
// clang-format off
#include <base/types.h>
namespace DB
{
void abortOnFailedAssertion(const String & description);
}
#define chassert(x) static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(#x)
#define UNREACHABLE() abort()
// clang-format off
#else
/// Here sizeof() trick is used to suppress unused warning for result,
/// since simple "(void)x" will evaluate the expression, while

View File

@ -57,7 +57,7 @@ public:
URI();
/// Creates an empty URI.
explicit URI(const std::string & uri, bool disable_url_encoding = false);
explicit URI(const std::string & uri, bool enable_url_encoding = true);
/// Parses an URI from the given string. Throws a
/// SyntaxException if the uri is not valid.
@ -362,7 +362,7 @@ private:
std::string _query;
std::string _fragment;
bool _disable_url_encoding = false;
bool _enable_url_encoding = true;
};

View File

@ -36,8 +36,8 @@ URI::URI():
}
URI::URI(const std::string& uri, bool decode_and_encode_path):
_port(0), _disable_url_encoding(decode_and_encode_path)
URI::URI(const std::string& uri, bool enable_url_encoding):
_port(0), _enable_url_encoding(enable_url_encoding)
{
parse(uri);
}
@ -108,7 +108,7 @@ URI::URI(const URI& uri):
_path(uri._path),
_query(uri._query),
_fragment(uri._fragment),
_disable_url_encoding(uri._disable_url_encoding)
_enable_url_encoding(uri._enable_url_encoding)
{
}
@ -121,7 +121,7 @@ URI::URI(const URI& baseURI, const std::string& relativeURI):
_path(baseURI._path),
_query(baseURI._query),
_fragment(baseURI._fragment),
_disable_url_encoding(baseURI._disable_url_encoding)
_enable_url_encoding(baseURI._enable_url_encoding)
{
resolve(relativeURI);
}
@ -153,7 +153,7 @@ URI& URI::operator = (const URI& uri)
_path = uri._path;
_query = uri._query;
_fragment = uri._fragment;
_disable_url_encoding = uri._disable_url_encoding;
_enable_url_encoding = uri._enable_url_encoding;
}
return *this;
}
@ -184,7 +184,7 @@ void URI::swap(URI& uri)
std::swap(_path, uri._path);
std::swap(_query, uri._query);
std::swap(_fragment, uri._fragment);
std::swap(_disable_url_encoding, uri._disable_url_encoding);
std::swap(_enable_url_encoding, uri._enable_url_encoding);
}
@ -687,18 +687,18 @@ void URI::decode(const std::string& str, std::string& decodedStr, bool plusAsSpa
void URI::encodePath(std::string & encodedStr) const
{
if (_disable_url_encoding)
encodedStr = _path;
else
if (_enable_url_encoding)
encode(_path, RESERVED_PATH, encodedStr);
else
encodedStr = _path;
}
void URI::decodePath(const std::string & encodedStr)
{
if (_disable_url_encoding)
_path = encodedStr;
else
if (_enable_url_encoding)
decode(encodedStr, _path);
else
_path = encodedStr;
}
bool URI::isWellKnownPort() const

View File

@ -165,5 +165,9 @@
"docker/test/sqltest": {
"name": "clickhouse/sqltest",
"dependent": []
},
"docker/test/integration/nginx_dav": {
"name": "clickhouse/nginx-dav",
"dependent": []
}
}

View File

@ -0,0 +1,6 @@
FROM nginx:alpine-slim
COPY default.conf /etc/nginx/conf.d/
RUN mkdir /usr/share/nginx/files/ \
&& chown nginx: /usr/share/nginx/files/ -R

View File

@ -0,0 +1,25 @@
server {
listen 80;
#root /usr/share/nginx/test.com;
index index.html index.htm;
server_name test.com localhost;
location / {
expires max;
root /usr/share/nginx/files;
client_max_body_size 20m;
client_body_temp_path /usr/share/nginx/tmp;
dav_methods PUT; # Allowed methods, only PUT is necessary
create_full_put_path on; # nginx automatically creates nested directories
dav_access user:rw group:r all:r; # access permissions for files
limit_except GET {
allow all;
}
}
error_page 405 =200 $uri;
}

View File

@ -1,16 +1,15 @@
version: '2.3'
services:
meili1:
image: getmeili/meilisearch:v0.27.0
image: getmeili/meilisearch:v0.27.0
restart: always
ports:
- ${MEILI_EXTERNAL_PORT:-7700}:${MEILI_INTERNAL_PORT:-7700}
meili_secure:
image: getmeili/meilisearch:v0.27.0
image: getmeili/meilisearch:v0.27.0
restart: always
ports:
- ${MEILI_SECURE_EXTERNAL_PORT:-7700}:${MEILI_SECURE_INTERNAL_PORT:-7700}
environment:
MEILI_MASTER_KEY: "password"

View File

@ -9,10 +9,10 @@ services:
DATADIR: /mysql/
expose:
- ${MYSQL_PORT:-3306}
command: --server_id=100
--log-bin='mysql-bin-1.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
command: --server_id=100
--log-bin='mysql-bin-1.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
--enforce-gtid-consistency
--log-error-verbosity=3
--log-error=/mysql/error.log
@ -21,4 +21,4 @@ services:
volumes:
- type: ${MYSQL_LOGS_FS:-tmpfs}
source: ${MYSQL_LOGS:-}
target: /mysql/
target: /mysql/

View File

@ -9,9 +9,9 @@ services:
DATADIR: /mysql/
expose:
- ${MYSQL8_PORT:-3306}
command: --server_id=100 --log-bin='mysql-bin-1.log'
--default_authentication_plugin='mysql_native_password'
--default-time-zone='+3:00' --gtid-mode="ON"
command: --server_id=100 --log-bin='mysql-bin-1.log'
--default_authentication_plugin='mysql_native_password'
--default-time-zone='+3:00' --gtid-mode="ON"
--enforce-gtid-consistency
--log-error-verbosity=3
--log-error=/mysql/error.log
@ -20,4 +20,4 @@ services:
volumes:
- type: ${MYSQL8_LOGS_FS:-tmpfs}
source: ${MYSQL8_LOGS:-}
target: /mysql/
target: /mysql/

View File

@ -9,10 +9,10 @@ services:
DATADIR: /mysql/
expose:
- ${MYSQL_CLUSTER_PORT:-3306}
command: --server_id=100
--log-bin='mysql-bin-2.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
command: --server_id=100
--log-bin='mysql-bin-2.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
--enforce-gtid-consistency
--log-error-verbosity=3
--log-error=/mysql/2_error.log
@ -31,10 +31,10 @@ services:
DATADIR: /mysql/
expose:
- ${MYSQL_CLUSTER_PORT:-3306}
command: --server_id=100
--log-bin='mysql-bin-3.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
command: --server_id=100
--log-bin='mysql-bin-3.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
--enforce-gtid-consistency
--log-error-verbosity=3
--log-error=/mysql/3_error.log
@ -53,10 +53,10 @@ services:
DATADIR: /mysql/
expose:
- ${MYSQL_CLUSTER_PORT:-3306}
command: --server_id=100
--log-bin='mysql-bin-4.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
command: --server_id=100
--log-bin='mysql-bin-4.log'
--default-time-zone='+3:00'
--gtid-mode="ON"
--enforce-gtid-consistency
--log-error-verbosity=3
--log-error=/mysql/4_error.log
@ -65,4 +65,4 @@ services:
volumes:
- type: ${MYSQL_CLUSTER_LOGS_FS:-tmpfs}
source: ${MYSQL_CLUSTER_LOGS:-}
target: /mysql/
target: /mysql/

View File

@ -5,7 +5,7 @@ services:
# Files will be put into /usr/share/nginx/files.
nginx:
image: kssenii/nginx-test:1.1
image: clickhouse/nginx-dav:${DOCKER_NGINX_DAV_TAG:-latest}
restart: always
ports:
- 80:80

View File

@ -12,9 +12,9 @@ services:
timeout: 5s
retries: 5
networks:
default:
aliases:
- postgre-sql.local
default:
aliases:
- postgre-sql.local
environment:
POSTGRES_HOST_AUTH_METHOD: "trust"
POSTGRES_PASSWORD: mysecretpassword

View File

@ -12,7 +12,7 @@ services:
command: ["zkServer.sh", "start-foreground"]
entrypoint: /zookeeper-ssl-entrypoint.sh
volumes:
- type: bind
- type: bind
source: /misc/zookeeper-ssl-entrypoint.sh
target: /zookeeper-ssl-entrypoint.sh
- type: bind
@ -37,7 +37,7 @@ services:
command: ["zkServer.sh", "start-foreground"]
entrypoint: /zookeeper-ssl-entrypoint.sh
volumes:
- type: bind
- type: bind
source: /misc/zookeeper-ssl-entrypoint.sh
target: /zookeeper-ssl-entrypoint.sh
- type: bind
@ -61,7 +61,7 @@ services:
command: ["zkServer.sh", "start-foreground"]
entrypoint: /zookeeper-ssl-entrypoint.sh
volumes:
- type: bind
- type: bind
source: /misc/zookeeper-ssl-entrypoint.sh
target: /zookeeper-ssl-entrypoint.sh
- type: bind

View File

@ -64,15 +64,16 @@ export CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH=/clickhouse-odbc-bridge
export CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH=/clickhouse-library-bridge
export DOCKER_BASE_TAG=${DOCKER_BASE_TAG:=latest}
export DOCKER_HELPER_TAG=${DOCKER_HELPER_TAG:=latest}
export DOCKER_MYSQL_GOLANG_CLIENT_TAG=${DOCKER_MYSQL_GOLANG_CLIENT_TAG:=latest}
export DOCKER_DOTNET_CLIENT_TAG=${DOCKER_DOTNET_CLIENT_TAG:=latest}
export DOCKER_HELPER_TAG=${DOCKER_HELPER_TAG:=latest}
export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
export DOCKER_MYSQL_GOLANG_CLIENT_TAG=${DOCKER_MYSQL_GOLANG_CLIENT_TAG:=latest}
export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}
export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
export DOCKER_NGINX_DAV_TAG=${DOCKER_NGINX_DAV_TAG:=latest}
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
cd /ClickHouse/tests/integration
exec "$@"

View File

@ -106,4 +106,4 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
## Storage Settings {#storage-settings}
- [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [disable_url_encoding](/docs/en/operations/settings/settings.md#disable_url_encoding) -allows to disable decoding/encoding path in uri. Disabled by default.
- [enable_url_encoding](/docs/en/operations/settings/settings.md#enable_url_encoding) - allows to enable/disable decoding/encoding path in uri. Enabled by default.

View File

@ -1723,6 +1723,34 @@ You can select data from a ClickHouse table and save them into some file in the
``` bash
$ clickhouse-client --query = "SELECT * FROM test.hits FORMAT CapnProto SETTINGS format_schema = 'schema:Message'"
```
### Using autogenerated schema {#using-autogenerated-capn-proto-schema}
If you don't have an external CapnProto schema for your data, you can still output/input data in CapnProto format using autogenerated schema.
For example:
```sql
SELECT * FROM test.hits format CapnProto SETTINGS format_capn_proto_use_autogenerated_schema=1
```
In this case ClickHouse will autogenerate CapnProto schema according to the table structure using function [structureToCapnProtoSchema](../sql-reference/functions/other-functions.md#structure_to_capn_proto_schema) and will use this schema to serialize data in CapnProto format.
You can also read CapnProto file with autogenerated schema (in this case the file must be created using the same schema):
```bash
$ cat hits.bin | clickhouse-client --query "INSERT INTO test.hits SETTINGS format_capn_proto_use_autogenerated_schema=1 FORMAT CapnProto"
```
The setting [format_capn_proto_use_autogenerated_schema](../operations/settings/settings-formats.md#format_capn_proto_use_autogenerated_schema) is enabled by default and applies if [format_schema](../operations/settings/settings-formats.md#formatschema-format-schema) is not set.
You can also save autogenerated schema in the file during input/output using setting [output_format_schema](../operations/settings/settings-formats.md#outputformatschema-output-format-schema). For example:
```sql
SELECT * FROM test.hits format CapnProto SETTINGS format_capn_proto_use_autogenerated_schema=1, output_format_schema='path/to/schema/schema.capnp'
```
In this case autogenerated CapnProto schema will be saved in file `path/to/schema/schema.capnp`.
## Prometheus {#prometheus}
Expose metrics in [Prometheus text-based exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format).
@ -1861,6 +1889,33 @@ ClickHouse inputs and outputs protobuf messages in the `length-delimited` format
It means before every message should be written its length as a [varint](https://developers.google.com/protocol-buffers/docs/encoding#varints).
See also [how to read/write length-delimited protobuf messages in popular languages](https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages).
### Using autogenerated schema {#using-autogenerated-protobuf-schema}
If you don't have an external Protobuf schema for your data, you can still output/input data in Protobuf format using autogenerated schema.
For example:
```sql
SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerated_schema=1
```
In this case ClickHouse will autogenerate Protobuf schema according to the table structure using function [structureToProtobufSchema](../sql-reference/functions/other-functions.md#structure_to_protobuf_schema) and will use this schema to serialize data in Protobuf format.
You can also read Protobuf file with autogenerated schema (in this case the file must be created using the same schema):
```bash
$ cat hits.bin | clickhouse-client --query "INSERT INTO test.hits SETTINGS format_protobuf_use_autogenerated_schema=1 FORMAT Protobuf"
```
The setting [format_protobuf_use_autogenerated_schema](../operations/settings/settings-formats.md#format_protobuf_use_autogenerated_schema) is enabled by default and applies if [format_schema](../operations/settings/settings-formats.md#formatschema-format-schema) is not set.
You can also save autogenerated schema in the file during input/output using setting [output_format_schema](../operations/settings/settings-formats.md#outputformatschema-output-format-schema). For example:
```sql
SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerated_schema=1, output_format_schema='path/to/schema/schema.proto'
```
In this case autogenerated Protobuf schema will be saved in file `path/to/schema/schema.capnp`.
## ProtobufSingle {#protobufsingle}
Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters.

View File

@ -321,6 +321,10 @@ If both `input_format_allow_errors_num` and `input_format_allow_errors_ratio` ar
This parameter is useful when you are using formats that require a schema definition, such as [Capn Proto](https://capnproto.org/) or [Protobuf](https://developers.google.com/protocol-buffers/). The value depends on the format.
## output_format_schema {#output-format-schema}
The path to the file where the automatically generated schema will be saved in [Capn Proto](../../interfaces/formats.md#capnproto-capnproto) or [Protobuf](../../interfaces/formats.md#protobuf-protobuf) formats.
## output_format_enable_streaming {#output_format_enable_streaming}
Enable streaming in output formats that support it.
@ -1330,6 +1334,11 @@ When serializing Nullable columns with Google wrappers, serialize default values
Disabled by default.
### format_protobuf_use_autogenerated_schema {#format_capn_proto_use_autogenerated_schema}
Use autogenerated Protobuf schema when [format_schema](#formatschema-format-schema) is not set.
The schema is generated from ClickHouse table structure using function [structureToProtobufSchema](../../sql-reference/functions/other-functions.md#structure_to_protobuf_schema)
## Avro format settings {#avro-format-settings}
### input_format_avro_allow_missing_fields {#input_format_avro_allow_missing_fields}
@ -1626,6 +1635,11 @@ Possible values:
Default value: `'by_values'`.
### format_capn_proto_use_autogenerated_schema {#format_capn_proto_use_autogenerated_schema}
Use autogenerated CapnProto schema when [format_schema](#formatschema-format-schema) is not set.
The schema is generated from ClickHouse table structure using function [structureToCapnProtoSchema](../../sql-reference/functions/other-functions.md#structure_to_capnproto_schema)
## MySQLDump format settings {#musqldump-format-settings}
### input_format_mysql_dump_table_name (#input_format_mysql_dump_table_name)

View File

@ -3468,11 +3468,11 @@ Possible values:
Default value: `0`.
## disable_url_encoding {#disable_url_encoding}
## enable_url_encoding {#enable_url_encoding}
Allows to disable decoding/encoding path in uri in [URL](../../engines/table-engines/special/url.md) engine tables.
Allows to enable/disable decoding/encoding path in uri in [URL](../../engines/table-engines/special/url.md) engine tables.
Disabled by default.
Enabled by default.
## database_atomic_wait_for_drop_and_detach_synchronously {#database_atomic_wait_for_drop_and_detach_synchronously}

View File

@ -2552,3 +2552,187 @@ Result:
This function can be used together with [generateRandom](../../sql-reference/table-functions/generate.md) to generate completely random tables.
## structureToCapnProtoSchema {#structure_to_capn_proto_schema}
Converts ClickHouse table structure to CapnProto schema.
**Syntax**
``` sql
structureToCapnProtoSchema(structure)
```
**Arguments**
- `structure` — Table structure in a format `column1_name column1_type, column2_name column2_type, ...`.
- `root_struct_name` — Name for root struct in CapnProto schema. Default value - `Message`;
**Returned value**
- CapnProto schema
Type: [String](../../sql-reference/data-types/string.md).
**Examples**
Query:
``` sql
SELECT structureToCapnProtoSchema('column1 String, column2 UInt32, column3 Array(String)') FORMAT RawBLOB
```
Result:
``` text
@0xf96402dd754d0eb7;
struct Message
{
column1 @0 : Data;
column2 @1 : UInt32;
column3 @2 : List(Data);
}
```
Query:
``` sql
SELECT structureToCapnProtoSchema('column1 Nullable(String), column2 Tuple(element1 UInt32, element2 Array(String)), column3 Map(String, String)') FORMAT RawBLOB
```
Result:
``` text
@0xd1c8320fecad2b7f;
struct Message
{
struct Column1
{
union
{
value @0 : Data;
null @1 : Void;
}
}
column1 @0 : Column1;
struct Column2
{
element1 @0 : UInt32;
element2 @1 : List(Data);
}
column2 @1 : Column2;
struct Column3
{
struct Entry
{
key @0 : Data;
value @1 : Data;
}
entries @0 : List(Entry);
}
column3 @2 : Column3;
}
```
Query:
``` sql
SELECT structureToCapnProtoSchema('column1 String, column2 UInt32', 'Root') FORMAT RawBLOB
```
Result:
``` text
@0x96ab2d4ab133c6e1;
struct Root
{
column1 @0 : Data;
column2 @1 : UInt32;
}
```
## structureToProtobufSchema {#structure_to_protobuf_schema}
Converts ClickHouse table structure to Protobuf schema.
**Syntax**
``` sql
structureToProtobufSchema(structure)
```
**Arguments**
- `structure` — Table structure in a format `column1_name column1_type, column2_name column2_type, ...`.
- `root_message_name` — Name for root message in Protobuf schema. Default value - `Message`;
**Returned value**
- Protobuf schema
Type: [String](../../sql-reference/data-types/string.md).
**Examples**
Query:
``` sql
SELECT structureToProtobufSchema('column1 String, column2 UInt32, column3 Array(String)') FORMAT RawBLOB
```
Result:
``` text
syntax = "proto3";
message Message
{
bytes column1 = 1;
uint32 column2 = 2;
repeated bytes column3 = 3;
}
```
Query:
``` sql
SELECT structureToProtobufSchema('column1 Nullable(String), column2 Tuple(element1 UInt32, element2 Array(String)), column3 Map(String, String)') FORMAT RawBLOB
```
Result:
``` text
syntax = "proto3";
message Message
{
bytes column1 = 1;
message Column2
{
uint32 element1 = 1;
repeated bytes element2 = 2;
}
Column2 column2 = 2;
map<string, bytes> column3 = 3;
}
```
Query:
``` sql
SELECT structureToProtobufSchema('column1 String, column2 UInt32', 'Root') FORMAT RawBLOB
```
Result:
``` text
syntax = "proto3";
message Root
{
bytes column1 = 1;
uint32 column2 = 2;
}
```

View File

@ -56,7 +56,7 @@ Character `|` inside patterns is used to specify failover addresses. They are it
## Storage Settings {#storage-settings}
- [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [disable_url_encoding](/docs/en/operations/settings/settings.md#disable_url_encoding) - allows to disable decoding/encoding path in uri. Disabled by default.
- [enable_url_encoding](/docs/en/operations/settings/settings.md#enable_url_encoding) - allows to enable/disable decoding/encoding path in uri. Enabled by default.
**See Also**

View File

@ -0,0 +1,221 @@
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Functions/FunctionFactory.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/FunctionNode.h>
#include <Common/DateLUT.h>
#include <Common/DateLUTImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
class OptimizeDateOrDateTimeConverterWithPreimageVisitor : public InDepthQueryTreeVisitorWithContext<OptimizeDateOrDateTimeConverterWithPreimageVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<OptimizeDateOrDateTimeConverterWithPreimageVisitor>;
explicit OptimizeDateOrDateTimeConverterWithPreimageVisitor(ContextPtr context)
: Base(std::move(context))
{}
static bool needChildVisit(QueryTreeNodePtr & node, QueryTreeNodePtr & /*child*/)
{
const static std::unordered_set<String> relations = {
"equals",
"notEquals",
"less",
"greater",
"lessOrEquals",
"greaterOrEquals",
};
if (const auto * function = node->as<FunctionNode>())
{
return !relations.contains(function->getFunctionName());
}
return true;
}
void enterImpl(QueryTreeNodePtr & node) const
{
const static std::unordered_map<String, String> swap_relations = {
{"equals", "equals"},
{"notEquals", "notEquals"},
{"less", "greater"},
{"greater", "less"},
{"lessOrEquals", "greaterOrEquals"},
{"greaterOrEquals", "lessOrEquals"},
};
const auto * function = node->as<FunctionNode>();
if (!function || !swap_relations.contains(function->getFunctionName())) return;
if (function->getArguments().getNodes().size() != 2) return;
size_t func_id = function->getArguments().getNodes().size();
for (size_t i = 0; i < function->getArguments().getNodes().size(); i++)
{
if (const auto * func = function->getArguments().getNodes()[i]->as<FunctionNode>())
{
func_id = i;
}
}
if (func_id == function->getArguments().getNodes().size()) return;
size_t literal_id = 1 - func_id;
const auto * literal = function->getArguments().getNodes()[literal_id]->as<ConstantNode>();
if (!literal || literal->getValue().getType() != Field::Types::UInt64) return;
String comparator = literal_id > func_id ? function->getFunctionName(): swap_relations.at(function->getFunctionName());
const auto * func_node = function->getArguments().getNodes()[func_id]->as<FunctionNode>();
/// Currently we only handle single-argument functions.
if (!func_node || func_node->getArguments().getNodes().size() != 1) return;
const auto * column_id = func_node->getArguments().getNodes()[0]->as<ColumnNode>();
if (!column_id) return;
const auto * column_type = column_id->getColumnType().get();
if (!isDateOrDate32(column_type) && !isDateTime(column_type) && !isDateTime64(column_type)) return;
const auto & converter = FunctionFactory::instance().tryGet(func_node->getFunctionName(), getContext());
if (!converter) return;
ColumnsWithTypeAndName args;
args.emplace_back(column_id->getColumnType(), "tmp");
auto converter_base = converter->build(args);
if (!converter_base || !converter_base->hasInformationAboutPreimage()) return;
auto preimage_range = converter_base->getPreimage(*(column_id->getColumnType()), literal->getValue());
if (!preimage_range) return;
const auto new_node = generateOptimizedDateFilter(comparator, *column_id, *preimage_range);
if (!new_node) return;
node = new_node;
}
private:
QueryTreeNodePtr generateOptimizedDateFilter(const String & comparator, const ColumnNode & column_node, const std::pair<Field, Field>& range) const
{
const DateLUTImpl & date_lut = DateLUT::instance("UTC");
String start_date_or_date_time;
String end_date_or_date_time;
if (isDateOrDate32(column_node.getColumnType().get()))
{
start_date_or_date_time = date_lut.dateToString(range.first.get<DateLUTImpl::Time>());
end_date_or_date_time = date_lut.dateToString(range.second.get<DateLUTImpl::Time>());
}
else if (isDateTime(column_node.getColumnType().get()) || isDateTime64(column_node.getColumnType().get()))
{
start_date_or_date_time = date_lut.timeToString(range.first.get<DateLUTImpl::Time>());
end_date_or_date_time = date_lut.timeToString(range.second.get<DateLUTImpl::Time>());
}
else [[unlikely]] return {};
if (comparator == "equals")
{
const auto lhs = std::make_shared<FunctionNode>("greaterOrEquals");
lhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
lhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(start_date_or_date_time));
resolveOrdinaryFunctionNode(*lhs, lhs->getFunctionName());
const auto rhs = std::make_shared<FunctionNode>("less");
rhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
rhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
resolveOrdinaryFunctionNode(*rhs, rhs->getFunctionName());
const auto new_date_filter = std::make_shared<FunctionNode>("and");
new_date_filter->getArguments().getNodes() = {lhs, rhs};
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
return new_date_filter;
}
else if (comparator == "notEquals")
{
const auto lhs = std::make_shared<FunctionNode>("less");
lhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
lhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(start_date_or_date_time));
resolveOrdinaryFunctionNode(*lhs, lhs->getFunctionName());
const auto rhs = std::make_shared<FunctionNode>("greaterOrEquals");
rhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
rhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
resolveOrdinaryFunctionNode(*rhs, rhs->getFunctionName());
const auto new_date_filter = std::make_shared<FunctionNode>("or");
new_date_filter->getArguments().getNodes() = {lhs, rhs};
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
return new_date_filter;
}
else if (comparator == "greater")
{
const auto new_date_filter = std::make_shared<FunctionNode>("greaterOrEquals");
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
return new_date_filter;
}
else if (comparator == "lessOrEquals")
{
const auto new_date_filter = std::make_shared<FunctionNode>("less");
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
return new_date_filter;
}
else if (comparator == "less" || comparator == "greaterOrEquals")
{
const auto new_date_filter = std::make_shared<FunctionNode>(comparator);
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(start_date_or_date_time));
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
return new_date_filter;
}
else [[unlikely]]
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected equals, notEquals, less, lessOrEquals, greater, greaterOrEquals. Actual {}",
comparator);
}
}
void resolveOrdinaryFunctionNode(FunctionNode & function_node, const String & function_name) const
{
auto function = FunctionFactory::instance().get(function_name, getContext());
function_node.resolveAsFunction(function->build(function_node.getArgumentColumns()));
}
};
}
void OptimizeDateOrDateTimeConverterWithPreimagePass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
OptimizeDateOrDateTimeConverterWithPreimageVisitor visitor(std::move(context));
visitor.visit(query_tree_node);
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/** Replace predicate having Date/DateTime converters with their preimages to improve performance.
* Given a Date column c, toYear(c) = 2023 -> c >= '2023-01-01' AND c < '2024-01-01'
* Or if c is a DateTime column, toYear(c) = 2023 -> c >= '2023-01-01 00:00:00' AND c < '2024-01-01 00:00:00'.
* The similar optimization also applies to other converters.
*/
class OptimizeDateOrDateTimeConverterWithPreimagePass final : public IQueryTreePass
{
public:
String getName() override { return "OptimizeDateOrDateTimeConverterWithPreimagePass"; }
String getDescription() override { return "Replace predicate having Date/DateTime converters with their preimages"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}

View File

@ -42,6 +42,7 @@
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/ShardNumColumnToFunctionPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
namespace DB
{
@ -278,6 +279,7 @@ void addQueryTreePasses(QueryTreePassManager & manager)
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<CrossToInnerJoinPass>());
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());
manager.addPass(std::make_unique<OptimizeDateOrDateTimeConverterWithPreimagePass>());
}
}

View File

@ -564,15 +564,22 @@ void ColumnNullable::updatePermutationImpl(IColumn::PermutationSortDirection dir
else
getNestedColumn().updatePermutation(direction, stability, limit, null_direction_hint, res, new_ranges);
equal_ranges = std::move(new_ranges);
if (unlikely(stability == PermutationSortStability::Stable))
{
for (auto & null_range : null_ranges)
::sort(res.begin() + null_range.first, res.begin() + null_range.second);
}
std::move(null_ranges.begin(), null_ranges.end(), std::back_inserter(equal_ranges));
if (is_nulls_last || null_ranges.empty())
{
equal_ranges = std::move(new_ranges);
std::move(null_ranges.begin(), null_ranges.end(), std::back_inserter(equal_ranges));
}
else
{
equal_ranges = std::move(null_ranges);
std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(equal_ranges));
}
}
void ColumnNullable::getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,

View File

@ -439,7 +439,7 @@ void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
if (row_indexes)
if (row_indexes || !typeid_cast<const ColumnSparse *>(&rhs))
{
/// TODO: implement without conversion to full column.
auto this_full = convertToFullColumnIfSparse();

View File

@ -320,8 +320,6 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = session_id;
std::lock_guard lock(push_request_mutex);
if (shutdown_called)
return false;
@ -423,13 +421,10 @@ void KeeperDispatcher::shutdown()
try
{
{
std::lock_guard lock(push_request_mutex);
if (shutdown_called)
if (shutdown_called.exchange(true))
return;
LOG_DEBUG(log, "Shutting down storage dispatcher");
shutdown_called = true;
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();
@ -582,12 +577,9 @@ void KeeperDispatcher::sessionCleanerTask()
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
.request = std::move(request),
};
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->push(std::move(request_info)))
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
}
if (!requests_queue->push(std::move(request_info)))
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
/// Remove session from registered sessions
finishSession(dead_session);
@ -607,6 +599,10 @@ void KeeperDispatcher::sessionCleanerTask()
void KeeperDispatcher::finishSession(int64_t session_id)
{
/// shutdown() method will cleanup sessions if needed
if (shutdown_called)
return;
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(session_id);
@ -698,12 +694,9 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
}
/// Push new session request to queue
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
}
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot receive session id within session timeout");
@ -871,10 +864,7 @@ uint64_t KeeperDispatcher::getSnapDirSize() const
Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
{
Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
{
std::lock_guard lock(push_request_mutex);
result.outstanding_requests_count = requests_queue->size();
}
result.outstanding_requests_count = requests_queue->size();
{
std::lock_guard lock(session_to_response_callback_mutex);
result.alive_connections_count = session_to_response_callback.size();

View File

@ -27,8 +27,6 @@ using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeep
class KeeperDispatcher
{
private:
mutable std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
using ClusterUpdateQueue = ConcurrentBoundedQueue<ClusterUpdateAction>;

View File

@ -626,7 +626,7 @@ class IColumn;
M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \
M(Bool, engine_file_skip_empty_files, false, "Allows to skip empty files in file table engine", 0) \
M(Bool, engine_url_skip_empty_files, false, "Allows to skip empty files in url table engine", 0) \
M(Bool, disable_url_encoding, false, " Allows to disable decoding/encoding path in uri in URL table engine", 0) \
M(Bool, enable_url_encoding, true, " Allows to enable/disable decoding/encoding path in uri in URL table engine", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(Bool, database_replicated_enforce_synchronous_settings, false, "Enforces synchronous waiting for some queries (see also database_atomic_wait_for_drop_and_detach_synchronously, mutation_sync, alter_sync). Not recommended to enable these settings.", 0) \
@ -1011,6 +1011,10 @@ class IColumn;
\
M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
\
M(Bool, format_capn_proto_use_autogenerated_schema, true, "Use autogenerated CapnProto schema when format_schema is not set", 0) \
M(Bool, format_protobuf_use_autogenerated_schema, true, "Use autogenerated Protobuf when format_schema is not set", 0) \
M(String, output_format_schema, "", "The path to the file where the automatically generated schema will be saved", 0) \
\
M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \
M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \
\

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
}
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(TemporaryFileOnDiskHolder && tmp_file_)
: WriteBufferFromFile(tmp_file_->getPath(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600)
: WriteBufferFromFile(tmp_file_->getAbsolutePath(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600)
, tmp_file(std::move(tmp_file_))
{
}

View File

@ -54,7 +54,7 @@ TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & p
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file name is empty");
}
String TemporaryFileOnDisk::getPath() const
String TemporaryFileOnDisk::getAbsolutePath() const
{
return std::filesystem::path(disk->getPath()) / relative_path;
}

View File

@ -22,7 +22,10 @@ public:
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
String getPath() const;
/// Return absolute path (disk + relative_path)
String getAbsolutePath() const;
/// Return relative path (without disk)
const String & getRelativePath() const { return relative_path; }
private:
DiskPtr disk;

View File

@ -31,7 +31,7 @@ namespace ErrorCodes
message.empty() ? "" : ": " + message);
}
Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context)
Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const ASTs & disk_args, ContextPtr context)
{
if (disk_args.empty())
throwBadConfiguration("expected non-empty list of arguments");
@ -39,8 +39,6 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
Poco::AutoPtr<Poco::XML::Document> xml_document(new Poco::XML::Document());
Poco::AutoPtr<Poco::XML::Element> root(xml_document->createElement("disk"));
xml_document->appendChild(root);
Poco::AutoPtr<Poco::XML::Element> disk_configuration(xml_document->createElement(root_name));
root->appendChild(disk_configuration);
for (const auto & arg : disk_args)
{
@ -62,7 +60,7 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
const std::string & key = key_identifier->name();
Poco::AutoPtr<Poco::XML::Element> key_element(xml_document->createElement(key));
disk_configuration->appendChild(key_element);
root->appendChild(key_element);
if (!function_args[1]->as<ASTLiteral>() && !function_args[1]->as<ASTIdentifier>())
throwBadConfiguration("expected values to be literals or identifiers");
@ -75,9 +73,9 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
return xml_document;
}
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context)
DiskConfigurationPtr getDiskConfigurationFromAST(const ASTs & disk_args, ContextPtr context)
{
auto xml_document = getDiskConfigurationFromASTImpl(root_name, disk_args, context);
auto xml_document = getDiskConfigurationFromASTImpl(disk_args, context);
Poco::AutoPtr<Poco::Util::XMLConfiguration> conf(new Poco::Util::XMLConfiguration());
conf->load(xml_document);
return conf;

View File

@ -14,19 +14,19 @@ using DiskConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/**
* Transform a list of pairs ( key1=value1, key2=value2, ... ), where keys and values are ASTLiteral or ASTIdentifier
* into
* <root_name>
* <disk>
* <key1>value1</key1>
* <key2>value2</key2>
* ...
* </root_name>
* </disk>
*
* Used in case disk configuration is passed via AST when creating
* a disk object on-the-fly without any configuration file.
*/
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
DiskConfigurationPtr getDiskConfigurationFromAST(const ASTs & disk_args, ContextPtr context);
/// The same as above function, but return XML::Document for easier modification of result configuration.
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const ASTs & disk_args, ContextPtr context);
/*
* A reverse function.

View File

@ -26,8 +26,16 @@ namespace
{
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context)
{
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
const auto & function_args = function_args_expr->children;
auto config = getDiskConfigurationFromAST(function_args, context);
std::string disk_name;
if (function.name == "disk")
if (config->has("name"))
{
disk_name = config->getString("name");
}
else
{
/// We need a unique name for a created custom disk, but it needs to be the same
/// after table is reattached or server is restarted, so take a hash of the disk
@ -36,21 +44,9 @@ namespace
disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX
+ toString(sipHash128(disk_setting_string.data(), disk_setting_string.size()));
}
else
{
static constexpr std::string_view custom_disk_prefix = "disk_";
if (function.name.size() <= custom_disk_prefix.size() || !function.name.starts_with(custom_disk_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid disk name: {}", function.name);
disk_name = function.name.substr(custom_disk_prefix.size());
}
auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
const auto & function_args = function_args_expr->children;
auto config = getDiskConfigurationFromAST(disk_name, function_args, context);
auto disk = DiskFactory::instance().create(disk_name, *config, disk_name, context, disks_map);
auto disk = DiskFactory::instance().create(disk_name, *config, "", context, disks_map);
/// Mark that disk can be used without storage policy.
disk->markDiskAsCustom();
return disk;

View File

@ -143,12 +143,14 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.protobuf.input_flatten_google_wrappers = settings.input_format_protobuf_flatten_google_wrappers;
format_settings.protobuf.output_nullables_with_google_wrappers = settings.output_format_protobuf_nullables_with_google_wrappers;
format_settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference;
format_settings.protobuf.use_autogenerated_schema = settings.format_protobuf_use_autogenerated_schema;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context->getFormatSchemaPath();
format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER);
format_settings.schema.output_format_schema = settings.output_format_schema;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
@ -190,6 +192,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;
format_settings.capn_proto.use_autogenerated_schema = settings.format_capn_proto_use_autogenerated_schema;
format_settings.seekable_read = settings.input_format_allow_seeks;
format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns;
format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation;

View File

@ -1,6 +1,8 @@
#include <Formats/FormatSchemaInfo.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/filesystemHelpers.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <filesystem>
@ -105,4 +107,84 @@ FormatSchemaInfo::FormatSchemaInfo(const FormatSettings & settings, const String
{
}
template <typename SchemaGenerator>
MaybeAutogeneratedFormatSchemaInfo<SchemaGenerator>::MaybeAutogeneratedFormatSchemaInfo(
const FormatSettings & settings, const String & format, const Block & header, bool use_autogenerated_schema)
{
if (!use_autogenerated_schema || !settings.schema.format_schema.empty())
{
schema_info = std::make_unique<FormatSchemaInfo>(settings, format, true);
return;
}
String schema_path;
fs::path default_schema_directory_path(fs::canonical(settings.schema.format_schema_path) / "");
fs::path path;
if (!settings.schema.output_format_schema.empty())
{
schema_path = settings.schema.output_format_schema;
path = schema_path;
if (path.is_absolute())
{
if (settings.schema.is_server)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Absolute path in the 'output_format_schema' setting is prohibited: {}", path.string());
}
else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string()))
{
if (settings.schema.is_server)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
default_schema_directory_path.string(),
path.string(),
default_schema_directory_path.string());
path = default_schema_directory_path / path;
}
else
{
path = default_schema_directory_path / path;
}
}
else
{
if (settings.schema.is_server)
{
tmp_file_path = PocoTemporaryFile::tempName(default_schema_directory_path.string()) + '.' + getFormatSchemaDefaultFileExtension(format);
schema_path = fs::path(tmp_file_path).filename();
}
else
{
tmp_file_path = PocoTemporaryFile::tempName() + '.' + getFormatSchemaDefaultFileExtension(format);
schema_path = tmp_file_path;
}
path = tmp_file_path;
}
WriteBufferFromFile buf(path.string());
SchemaGenerator::writeSchema(buf, "Message", header.getNamesAndTypesList());
buf.finalize();
schema_info = std::make_unique<FormatSchemaInfo>(schema_path + ":Message", format, true, settings.schema.is_server, settings.schema.format_schema_path);
}
template <typename SchemaGenerator>
MaybeAutogeneratedFormatSchemaInfo<SchemaGenerator>::~MaybeAutogeneratedFormatSchemaInfo()
{
if (!tmp_file_path.empty())
{
try
{
fs::remove(tmp_file_path);
}
catch (...)
{
tryLogCurrentException("MaybeAutogeneratedFormatSchemaInfo", "Cannot delete temporary schema file");
}
}
}
template class MaybeAutogeneratedFormatSchemaInfo<StructureToCapnProtoSchema>;
template class MaybeAutogeneratedFormatSchemaInfo<StructureToProtobufSchema>;
}

View File

@ -2,6 +2,8 @@
#include <base/types.h>
#include <Formats/FormatSettings.h>
#include <Formats/StructureToCapnProtoSchema.h>
#include <Formats/StructureToProtobufSchema.h>
namespace DB
{
@ -30,4 +32,23 @@ private:
String message_name;
};
template <typename SchemaGenerator>
class MaybeAutogeneratedFormatSchemaInfo
{
public:
MaybeAutogeneratedFormatSchemaInfo(const FormatSettings & settings, const String & format, const Block & header, bool use_autogenerated_schema);
~MaybeAutogeneratedFormatSchemaInfo();
const FormatSchemaInfo & getSchemaInfo() const { return *schema_info; }
private:
std::unique_ptr<FormatSchemaInfo> schema_info;
String tmp_file_path;
};
using CapnProtoSchemaInfo = MaybeAutogeneratedFormatSchemaInfo<StructureToCapnProtoSchema>;
using ProtobufSchemaInfo = MaybeAutogeneratedFormatSchemaInfo<StructureToProtobufSchema>;
}

View File

@ -276,6 +276,7 @@ struct FormatSettings
*/
bool allow_multiple_rows_without_delimiter = false;
bool skip_fields_with_unsupported_types_in_schema_inference = false;
bool use_autogenerated_schema = true;
} protobuf;
struct
@ -297,6 +298,7 @@ struct FormatSettings
std::string format_schema;
std::string format_schema_path;
bool is_server = false;
std::string output_format_schema;
} schema;
struct
@ -359,6 +361,7 @@ struct FormatSettings
{
CapnProtoEnumComparingMode enum_comparing_mode = CapnProtoEnumComparingMode::BY_VALUES;
bool skip_fields_with_unsupported_types_in_schema_inference = false;
bool use_autogenerated_schema = true;
} capn_proto;
enum class MsgPackUUIDRepresentation

View File

@ -3029,7 +3029,7 @@ namespace
if (!message_serializer)
{
throw Exception(ErrorCodes::NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS,
"Not found matches between the names of the columns {{}} and the fields {{}} of the message {} in the protobuf schema",
"Not found matches between the names of the columns ({}) and the fields ({}) of the message {} in the protobuf schema",
boost::algorithm::join(column_names, ", "), boost::algorithm::join(getFieldNames(message_descriptor), ", "),
quoteString(message_descriptor.full_name()));
}
@ -3647,7 +3647,7 @@ namespace
if (!message_serializer)
{
throw Exception(ErrorCodes::NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS,
"Not found matches between the names of the tuple's elements {{}} and the fields {{}} "
"Not found matches between the names of the tuple's elements ({}) and the fields ({}) "
"of the message {} in the protobuf schema",
boost::algorithm::join(tuple_data_type.getElementNames(), ", "),
boost::algorithm::join(getFieldNames(*field_descriptor.message_type()), ", "),

View File

@ -0,0 +1,236 @@
#include <Formats/StructureToCapnProtoSchema.h>
#include <Formats/StructureToFormatSchemaUtils.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeEnum.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
namespace DB
{
using namespace StructureToFormatSchemaUtils;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
const std::unordered_map<TypeIndex, String> capn_proto_simple_type_names =
{
{TypeIndex::Int8, "Int8"},
{TypeIndex::UInt8, "UInt8"},
{TypeIndex::Int16, "Int16"},
{TypeIndex::UInt16, "UInt16"},
{TypeIndex::Int32, "Int32"},
{TypeIndex::UInt32, "UInt32"},
{TypeIndex::Int64, "Int64"},
{TypeIndex::UInt64, "UInt64"},
{TypeIndex::Int128, "Data"},
{TypeIndex::UInt128, "Data"},
{TypeIndex::Int256, "Data"},
{TypeIndex::UInt256, "Data"},
{TypeIndex::Float32, "Float32"},
{TypeIndex::Float64, "Float64"},
{TypeIndex::Decimal32, "Int32"},
{TypeIndex::Decimal64, "Int64"},
{TypeIndex::Decimal128, "Data"},
{TypeIndex::Decimal256, "Data"},
{TypeIndex::String, "Data"},
{TypeIndex::FixedString, "Data"},
{TypeIndex::UUID, "Data"},
{TypeIndex::Date, "UInt16"},
{TypeIndex::Date32, "Int32"},
{TypeIndex::DateTime, "UInt32"},
{TypeIndex::DateTime64, "Int64"},
{TypeIndex::IPv4, "UInt32"},
{TypeIndex::IPv6, "Data"},
};
void writeCapnProtoHeader(WriteBuffer & buf)
{
pcg64 rng(randomSeed());
size_t id = rng() | (1ull << 63); /// First bit should be 1
writeString(fmt::format("@0x{};\n\n", getHexUIntLowercase(id)), buf);
}
void writeFieldDefinition(WriteBuffer & buf, const String & type_name, const String & column_name, size_t & field_index, size_t indent)
{
writeIndent(buf, indent);
writeString(fmt::format("{} @{} : {};\n", getSchemaFieldName(column_name), field_index++, type_name), buf);
}
void startEnum(WriteBuffer & buf, const String & enum_name, size_t indent)
{
startNested(buf, enum_name, "enum", indent);
}
void startUnion(WriteBuffer & buf, size_t indent)
{
startNested(buf, "", "union", indent);
}
void startStruct(WriteBuffer & buf, const String & struct_name, size_t indent)
{
startNested(buf, struct_name, "struct", indent);
}
String prepareAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent);
void writeField(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t & field_index, size_t indent)
{
auto field_type_name = prepareAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
writeFieldDefinition(buf, field_type_name, column_name, field_index, indent);
}
String prepareArrayAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
const auto & nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
auto nested_type_name = prepareAndGetCapnProtoTypeName(buf, nested_type, column_name, indent);
return "List(" + nested_type_name + ")";
}
String prepareNullableAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
/// Nullable is represented as a struct with union with 2 fields:
///
/// struct Nullable
/// {
/// union
/// {
/// value @0 : Value;
/// null @1 : Void;
/// }
/// }
auto struct_name = getSchemaMessageName(column_name);
startStruct(buf, struct_name, indent);
auto nested_type_name = prepareAndGetCapnProtoTypeName(buf, assert_cast<const DataTypeNullable &>(*data_type).getNestedType(), column_name, indent);
startUnion(buf, indent + 1);
size_t field_index = 0;
writeFieldDefinition(buf, nested_type_name, "value", field_index, indent + 2);
writeFieldDefinition(buf, "Void", "null", field_index, indent + 2);
endNested(buf, indent + 1);
endNested(buf, indent);
return struct_name;
}
String prepareTupleAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
const auto & tuple_type = assert_cast<const DataTypeTuple &>(*data_type);
auto nested_names_and_types = getCollectedTupleElements(tuple_type);
String struct_name = getSchemaMessageName(column_name);
startStruct(buf, struct_name, indent);
size_t nested_field_index = 0;
for (const auto & [name, type] : nested_names_and_types)
writeField(buf, type, name, nested_field_index, indent + 1);
endNested(buf, indent);
return struct_name;
}
String prepareMapAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
/// We output/input Map type as follow CapnProto schema
///
/// struct Map
/// {
/// struct Entry
/// {
/// key @0: Key;
/// value @1: Value;
/// }
/// entries @0 :List(Entry);
/// }
const auto & map_type = assert_cast<const DataTypeMap &>(*data_type);
const auto & key_type = map_type.getKeyType();
const auto & value_type = map_type.getValueType();
String struct_name = getSchemaMessageName(column_name);
startStruct(buf, struct_name, indent);
startStruct(buf, "Entry", indent + 1);
auto key_type_name = prepareAndGetCapnProtoTypeName(buf, key_type, "key", indent + 2);
auto value_type_name = prepareAndGetCapnProtoTypeName(buf, value_type, "value", indent + 2);
size_t field_index = 0;
writeFieldDefinition(buf, key_type_name, "key", field_index, indent + 2);
writeFieldDefinition(buf, value_type_name, "value", field_index, indent + 2);
endNested(buf, indent + 1);
field_index = 0;
writeFieldDefinition(buf, "List(Entry)", "entries", field_index, indent + 1);
endNested(buf, indent);
return struct_name;
}
template <typename EnumType>
String prepareEnumAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
const auto & enum_type = assert_cast<const DataTypeEnum<EnumType> &>(*data_type);
String enum_name = getSchemaMessageName(column_name);
startEnum(buf, enum_name, indent);
const auto & names = enum_type.getAllRegisteredNames();
for (size_t i = 0; i != names.size(); ++i)
{
writeIndent(buf, indent + 1);
writeString(fmt::format("{} @{};\n", names[i], std::to_string(i)), buf);
}
endNested(buf, indent);
return enum_name;
}
String prepareAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
TypeIndex type_id = data_type->getTypeId();
switch (data_type->getTypeId())
{
case TypeIndex::Nullable:
return prepareNullableAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
case TypeIndex::LowCardinality:
return prepareAndGetCapnProtoTypeName(buf, assert_cast<const DataTypeLowCardinality &>(*data_type).getDictionaryType(), column_name, indent);
case TypeIndex::Array:
return prepareArrayAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
case TypeIndex::Tuple:
return prepareTupleAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
case TypeIndex::Map:
return prepareMapAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
case TypeIndex::Enum8:
return prepareEnumAndGetCapnProtoTypeName<Int8>(buf, data_type, column_name, indent);
case TypeIndex::Enum16:
return prepareEnumAndGetCapnProtoTypeName<Int16>(buf, data_type, column_name, indent);
default:
{
if (isBool(data_type))
return "Bool";
auto it = capn_proto_simple_type_names.find(type_id);
if (it == capn_proto_simple_type_names.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "CapnProto type name is not found for type {}", data_type->getName());
return it->second;
}
}
}
}
void StructureToCapnProtoSchema::writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_)
{
auto names_and_types = collectNested(names_and_types_);
writeCapnProtoHeader(buf);
startStruct(buf, getSchemaMessageName(message_name), 0);
size_t field_index = 0;
for (const auto & [column_name, data_type] : names_and_types)
writeField(buf, data_type, column_name, field_index, 1);
endNested(buf, 0);
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
struct StructureToCapnProtoSchema
{
static constexpr auto name = "structureToCapnProtoSchema";
static void writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_);
};
}

View File

@ -0,0 +1,117 @@
#include <Formats/StructureToFormatSchemaUtils.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace StructureToFormatSchemaUtils
{
void writeIndent(WriteBuffer & buf, size_t indent)
{
writeChar(' ', indent * 4, buf);
}
void startNested(WriteBuffer & buf, const String & nested_name, const String & nested_type, size_t indent)
{
writeIndent(buf, indent);
writeString(nested_type, buf);
if (!nested_name.empty())
{
writeChar(' ', buf);
writeString(nested_name, buf);
}
writeChar('\n', buf);
writeIndent(buf, indent);
writeCString("{\n", buf);
}
void endNested(WriteBuffer & buf, size_t indent)
{
writeIndent(buf, indent);
writeCString("}\n", buf);
}
String getSchemaFieldName(const String & column_name)
{
String result = column_name;
/// Replace all first uppercase letters to lower-case,
/// because fields in CapnProto schema must begin with a lower-case letter.
/// Don't replace all letters to lower-case to remain camelCase field names.
for (auto & symbol : result)
{
if (islower(symbol))
break;
symbol = tolower(symbol);
}
return result;
}
String getSchemaMessageName(const String & column_name)
{
String result = column_name;
if (!column_name.empty() && isalpha(column_name[0]))
result[0] = toupper(column_name[0]);
return result;
}
namespace
{
std::pair<String, String> splitName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {std::move(first), std::move(second)};
}
}
NamesAndTypesList collectNested(const NamesAndTypesList & names_and_types)
{
/// Find all columns with dots '.' or underscores '_' and move them into a tuple.
/// For example if we have columns 'a.b UInt32, a.c UInt32, x_y String' we will
/// change it to 'a Tuple(b UInt32, c UInt32), x Tuple(y String)'
NamesAndTypesList result;
std::unordered_map<String, NamesAndTypesList> nested;
for (const auto & [name, type] : names_and_types)
{
auto [field_name, nested_name] = splitName(name);
if (nested_name.empty())
result.emplace_back(name, type);
else
nested[field_name].emplace_back(nested_name, type);
}
for (const auto & [field_name, elements]: nested)
result.emplace_back(field_name, std::make_shared<DataTypeTuple>(elements.getTypes(), elements.getNames()));
return result;
}
NamesAndTypesList getCollectedTupleElements(const DataTypeTuple & tuple_type)
{
const auto & nested_types = tuple_type.getElements();
Names nested_names;
if (tuple_type.haveExplicitNames())
{
nested_names = tuple_type.getElementNames();
}
else
{
nested_names.reserve(nested_types.size());
for (size_t i = 0; i != nested_types.size(); ++i)
nested_names.push_back("e" + std::to_string(i + 1));
}
NamesAndTypesList result;
for (size_t i = 0; i != nested_names.size(); ++i)
result.emplace_back(nested_names[i], nested_types[i]);
return collectNested(result);
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeTuple.h>
namespace DB
{
namespace StructureToFormatSchemaUtils
{
void writeIndent(WriteBuffer & buf, size_t indent);
void startNested(WriteBuffer & buf, const String & nested_name, const String & nested_type, size_t indent);
void endNested(WriteBuffer & buf, size_t indent);
String getSchemaFieldName(const String & column_name);
String getSchemaMessageName(const String & column_name);
NamesAndTypesList collectNested(const NamesAndTypesList & names_and_types);
NamesAndTypesList getCollectedTupleElements(const DataTypeTuple & tuple_type);
}
}

View File

@ -0,0 +1,214 @@
#include <Formats/StructureToProtobufSchema.h>
#include <Formats/StructureToFormatSchemaUtils.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeEnum.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
using namespace StructureToFormatSchemaUtils;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
const std::unordered_map<TypeIndex, String> protobuf_simple_type_names =
{
{TypeIndex::Int8, "int32"},
{TypeIndex::UInt8, "uint32"},
{TypeIndex::Int16, "int32"},
{TypeIndex::UInt16, "uint32"},
{TypeIndex::Int32, "int32"},
{TypeIndex::UInt32, "uint32"},
{TypeIndex::Int64, "int64"},
{TypeIndex::UInt64, "uint64"},
{TypeIndex::Int128, "bytes"},
{TypeIndex::UInt128, "bytes"},
{TypeIndex::Int256, "bytes"},
{TypeIndex::UInt256, "bytes"},
{TypeIndex::Float32, "float"},
{TypeIndex::Float64, "double"},
{TypeIndex::Decimal32, "bytes"},
{TypeIndex::Decimal64, "bytes"},
{TypeIndex::Decimal128, "bytes"},
{TypeIndex::Decimal256, "bytes"},
{TypeIndex::String, "bytes"},
{TypeIndex::FixedString, "bytes"},
{TypeIndex::UUID, "bytes"},
{TypeIndex::Date, "uint32"},
{TypeIndex::Date32, "int32"},
{TypeIndex::DateTime, "uint32"},
{TypeIndex::DateTime64, "uint64"},
{TypeIndex::IPv4, "uint32"},
{TypeIndex::IPv6, "bytes"},
};
void writeProtobufHeader(WriteBuffer & buf)
{
writeCString("syntax = \"proto3\";\n\n", buf);
}
void startEnum(WriteBuffer & buf, const String & enum_name, size_t indent)
{
startNested(buf, enum_name, "enum", indent);
}
void startMessage(WriteBuffer & buf, const String & message_name, size_t indent)
{
startNested(buf, message_name, "message", indent);
}
void writeFieldDefinition(WriteBuffer & buf, const String & type_name, const String & column_name, size_t & field_index, size_t indent)
{
writeIndent(buf, indent);
writeString(fmt::format("{} {} = {};\n", type_name, getSchemaFieldName(column_name), field_index++), buf);
}
String prepareAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent);
void writeProtobufField(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t & field_index, size_t indent)
{
auto field_type_name = prepareAndGetProtobufTypeName(buf, data_type, column_name, indent);
writeFieldDefinition(buf, field_type_name, column_name, field_index, indent);
}
String prepareArrayAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
const auto & nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
/// Simple case when we can just use 'repeated <nested_type>'.
if (!isArray(nested_type) && !isMap(nested_type))
{
auto nested_type_name = prepareAndGetProtobufTypeName(buf, nested_type, column_name, indent);
return "repeated " + nested_type_name;
}
/// Protobuf doesn't support multidimensional repeated fields and repeated maps.
/// When we have Array(Array(...)) or Array(Map(...)) we should place nested type into a nested Message with one field.
String message_name = getSchemaMessageName(column_name);
startMessage(buf, message_name, indent);
size_t nested_field_index = 1;
writeProtobufField(buf, nested_type, column_name, nested_field_index, indent + 1);
endNested(buf, indent);
return "repeated " + message_name;
}
String prepareTupleAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
const auto & tuple_type = assert_cast<const DataTypeTuple &>(*data_type);
auto nested_names_and_types = getCollectedTupleElements(tuple_type);
String message_name = getSchemaMessageName(column_name);
startMessage(buf, message_name, indent);
size_t nested_field_index = 1;
for (const auto & [name, type] : nested_names_and_types)
writeProtobufField(buf, type, name, nested_field_index, indent + 1);
endNested(buf, indent);
return message_name;
}
String prepareMapAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
const auto & map_type = assert_cast<const DataTypeMap &>(*data_type);
const auto & key_type = map_type.getKeyType();
const auto & value_type = map_type.getValueType();
auto it = protobuf_simple_type_names.find(key_type->getTypeId());
if (it == protobuf_simple_type_names.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type {} is not supported for conversion into Map key in Protobuf schema", data_type->getName());
auto key_type_name = it->second;
/// Protobuf map type doesn't support "bytes" type as a key. Change it to "string"
if (key_type_name == "bytes")
key_type_name = "string";
/// Special cases when value type is Array or Map, because Protobuf
/// doesn't support syntax "map<Key, repeated Value>" and "map<Key, map<..., ...>>"
/// In this case we should place it into a nested Message with one field.
String value_type_name;
if (isArray(value_type) || isMap(value_type))
{
value_type_name = getSchemaMessageName(column_name) + "Value";
startMessage(buf, value_type_name, indent);
size_t nested_field_index = 1;
writeProtobufField(buf, value_type, column_name + "Value", nested_field_index, indent + 1);
endNested(buf, indent);
}
else
{
value_type_name = prepareAndGetProtobufTypeName(buf, value_type, column_name + "Value", indent);
}
return fmt::format("map<{}, {}>", key_type_name, value_type_name);
}
template <typename EnumType>
String prepareEnumAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
const auto & enum_type = assert_cast<const DataTypeEnum<EnumType> &>(*data_type);
String enum_name = getSchemaMessageName(column_name);
startEnum(buf, enum_name, indent);
const auto & names = enum_type.getAllRegisteredNames();
for (size_t i = 0; i != names.size(); ++i)
{
writeIndent(buf, indent + 1);
writeString(fmt::format("{} = {};\n", names[i], std::to_string(i)), buf);
}
endNested(buf, indent);
return enum_name;
}
String prepareAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
TypeIndex type_id = data_type->getTypeId();
switch (data_type->getTypeId())
{
case TypeIndex::Nullable:
return prepareAndGetProtobufTypeName(buf, assert_cast<const DataTypeNullable &>(*data_type).getNestedType(), column_name, indent);
case TypeIndex::LowCardinality:
return prepareAndGetProtobufTypeName(buf, assert_cast<const DataTypeLowCardinality &>(*data_type).getDictionaryType(), column_name, indent);
case TypeIndex::Array:
return prepareArrayAndGetProtobufTypeName(buf, data_type, column_name, indent);
case TypeIndex::Tuple:
return prepareTupleAndGetProtobufTypeName(buf, data_type, column_name, indent);
case TypeIndex::Map:
return prepareMapAndGetProtobufTypeName(buf, data_type, column_name, indent);
case TypeIndex::Enum8:
return prepareEnumAndGetProtobufTypeName<Int8>(buf, data_type, column_name, indent);
case TypeIndex::Enum16:
return prepareEnumAndGetProtobufTypeName<Int16>(buf, data_type, column_name, indent);
default:
{
if (isBool(data_type))
return "bool";
auto it = protobuf_simple_type_names.find(type_id);
if (it == protobuf_simple_type_names.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type {} is not supported for conversion into Protobuf schema", data_type->getName());
return it->second;
}
}
}
}
void StructureToProtobufSchema::writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_)
{
auto names_and_types = collectNested(names_and_types_);
writeProtobufHeader(buf);
startMessage(buf, getSchemaMessageName(message_name), 0);
size_t field_index = 1;
for (const auto & [column_name, data_type] : names_and_types)
writeProtobufField(buf, data_type, column_name, field_index, 1);
endNested(buf, 0);
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
struct StructureToProtobufSchema
{
static constexpr auto name = "structureToProtobufSchema";
static void writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_);
};
}

View File

@ -0,0 +1,145 @@
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeEnum.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/Context.h>
#include <IO/WriteBufferFromVector.h>
#include <Formats/StructureToCapnProtoSchema.h>
#include <Formats/StructureToProtobufSchema.h>
#include <Common/randomSeed.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
template <class Impl>
class FunctionStructureToFormatSchema : public IFunction
{
public:
static constexpr auto name = Impl::name;
explicit FunctionStructureToFormatSchema(ContextPtr context_) : context(std::move(context_))
{
}
static FunctionPtr create(ContextPtr ctx)
{
return std::make_shared<FunctionStructureToFormatSchema>(std::move(ctx));
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0, 1}; }
bool useDefaultImplementationForConstants() const override { return false; }
bool useDefaultImplementationForNulls() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.empty() || arguments.size() > 2)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, expected 1 or 2",
getName(), arguments.size());
if (!isString(arguments[0]))
{
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of the first argument of function {}, expected constant string",
arguments[0]->getName(),
getName());
}
if (arguments.size() > 1 && !isString(arguments[1]))
{
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of the second argument of function {}, expected constant string",
arguments[1]->getName(),
getName());
}
return std::make_shared<DataTypeString>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
if (arguments.empty() || arguments.size() > 2)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, expected 1 or 2",
getName(), arguments.size());
String structure = arguments[0].column->getDataAt(0).toString();
String message_name = arguments.size() == 2 ? arguments[1].column->getDataAt(0).toString() : "Message";
auto columns_list = parseColumnsListFromString(structure, context);
auto col_res = ColumnString::create();
auto & data = assert_cast<ColumnString &>(*col_res).getChars();
WriteBufferFromVector buf(data);
Impl::writeSchema(buf, message_name, columns_list.getAll());
buf.finalize();
auto & offsets = assert_cast<ColumnString &>(*col_res).getOffsets();
offsets.push_back(data.size());
return ColumnConst::create(std::move(col_res), input_rows_count);
}
private:
ContextPtr context;
};
REGISTER_FUNCTION(StructureToCapnProtoSchema)
{
factory.registerFunction<FunctionStructureToFormatSchema<StructureToCapnProtoSchema>>(FunctionDocumentation
{
.description=R"(
Function that converts ClickHouse table structure to CapnProto format schema
)",
.examples{
{"random", "SELECT structureToCapnProtoSchema('s String, x UInt32', 'MessageName') format TSVRaw", "struct MessageName\n"
"{\n"
" s @0 : Data;\n"
" x @1 : UInt32;\n"
"}"},
},
.categories{"Other"}
},
FunctionFactory::CaseSensitive);
}
REGISTER_FUNCTION(StructureToProtobufSchema)
{
factory.registerFunction<FunctionStructureToFormatSchema<StructureToProtobufSchema>>(FunctionDocumentation
{
.description=R"(
Function that converts ClickHouse table structure to Protobuf format schema
)",
.examples{
{"random", "SELECT structureToCapnProtoSchema('s String, x UInt32', 'MessageName') format TSVRaw", "syntax = \"proto3\";\n"
"\n"
"message MessageName\n"
"{\n"
" bytes s = 1;\n"
" uint32 x = 2;\n"
"}"},
},
.categories{"Other"}
},
FunctionFactory::CaseSensitive);
}
}

View File

@ -764,7 +764,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
/// Table function without columns list.
auto table_function_ast = create.as_table_function->ptr();
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
properties.columns = table_function->getActualTableStructure(getContext());
properties.columns = table_function->getActualTableStructure(getContext(), /*is_insert_query*/ true);
}
else if (create.is_dictionary)
{

View File

@ -96,7 +96,7 @@ BlockIO InterpreterDescribeQuery::execute()
else if (table_expression.table_function)
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, getContext());
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext());
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext(), /*is_insert_query*/ true);
for (const auto & table_function_column_description : table_function_column_descriptions)
columns.emplace_back(table_function_column_description);
}

View File

@ -1034,7 +1034,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
{
auto load_func = [&]() -> std::shared_ptr<Block>
{
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->getPath(), materializeBlock(right_sample_block));
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->getAbsolutePath(), materializeBlock(right_sample_block));
return std::make_shared<Block>(input.block_in->read());
};

View File

@ -39,7 +39,7 @@ namespace
TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
{
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin);
auto write_stat = TemporaryFileStreamLegacy::write(tmp_file->getPath(), header, std::move(pipeline), codec);
auto write_stat = TemporaryFileStreamLegacy::write(tmp_file->getAbsolutePath(), header, std::move(pipeline), codec);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes);
@ -267,7 +267,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
{
return Pipe(std::make_shared<TemporaryFileLazySource>(file->getPath(), materializeBlock(sample_block)));
return Pipe(std::make_shared<TemporaryFileLazySource>(file->getAbsolutePath(), materializeBlock(sample_block)));
}

View File

@ -235,9 +235,9 @@ TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getPath()), header))
, out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getAbsolutePath()), header))
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", file->getPath());
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", file->getAbsolutePath());
}
TemporaryFileStream::TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_)
@ -365,7 +365,7 @@ void TemporaryFileStream::release()
String TemporaryFileStream::getPath() const
{
if (file)
return file->getPath();
return file->getAbsolutePath();
if (segment_holder && !segment_holder->empty())
return segment_holder->front().getPathInLocalCache();

View File

@ -215,7 +215,7 @@ bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & p
else if (ParserKeyword("FALSE").ignore(pos, expected))
value = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
/// for SETTINGS disk=disk(type='s3', path='', ...)
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name.starts_with("disk"))
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name == "disk")
{
tryGetIdentifierNameInto(name, change.name);
change.value = createFieldFromAST(function_ast);
@ -280,7 +280,7 @@ bool ParserSetQuery::parseNameValuePairWithParameterOrDefault(
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(1)));
else if (ParserKeyword("FALSE").ignore(pos, expected))
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name.starts_with("disk"))
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name == "disk")
{
change.name = name;
change.value = createFieldFromAST(function_ast);

View File

@ -17,12 +17,12 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings)
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const CapnProtoSchemaInfo & info, const FormatSettings & format_settings)
: IRowInputFormat(std::move(header_), in_, std::move(params_))
, parser(std::make_shared<CapnProtoSchemaParser>())
{
// Parse the schema and fetch the root object
schema = parser->getMessageSchema(info);
schema = parser->getMessageSchema(info.getSchemaInfo());
const auto & header = getPort().getHeader();
serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto);
}
@ -106,8 +106,12 @@ void registerInputFormatCapnProto(FormatFactory & factory)
"CapnProto",
[](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
{
return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings, "CapnProto", true), settings);
return std::make_shared<CapnProtoRowInputFormat>(
buf,
sample,
std::move(params),
CapnProtoSchemaInfo(settings, "CapnProto", sample, settings.capn_proto.use_autogenerated_schema),
settings);
});
factory.markFormatSupportsSubsetOfColumns("CapnProto");
factory.registerFileExtension("capnp", "CapnProto");

View File

@ -24,7 +24,7 @@ class ReadBuffer;
class CapnProtoRowInputFormat final : public IRowInputFormat
{
public:
CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_);
CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const CapnProtoSchemaInfo & info, const FormatSettings & format_settings);
String getName() const override { return "CapnProtoRowInputFormat"; }

View File

@ -23,14 +23,14 @@ void CapnProtoOutputStream::write(const void * buffer, size_t size)
CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const FormatSchemaInfo & info,
const CapnProtoSchemaInfo & info,
const FormatSettings & format_settings)
: IRowOutputFormat(header_, out_)
, column_names(header_.getNames())
, column_types(header_.getDataTypes())
, output_stream(std::make_unique<CapnProtoOutputStream>(out_))
{
schema = schema_parser.getMessageSchema(info);
schema = schema_parser.getMessageSchema(info.getSchemaInfo());
const auto & header = getPort(PortKind::Main).getHeader();
serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto);
capnp::MallocMessageBuilder message;
@ -52,7 +52,11 @@ void registerOutputFormatCapnProto(FormatFactory & factory)
const Block & sample,
const FormatSettings & format_settings)
{
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
return std::make_shared<CapnProtoRowOutputFormat>(
buf,
sample,
CapnProtoSchemaInfo(format_settings, "CapnProto", sample, format_settings.capn_proto.use_autogenerated_schema),
format_settings);
});
}

View File

@ -31,8 +31,8 @@ public:
CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_);
const CapnProtoSchemaInfo & info,
const FormatSettings & format_settings);
String getName() const override { return "CapnProtoRowOutputFormat"; }

View File

@ -14,7 +14,7 @@ ProtobufListInputFormat::ProtobufListInputFormat(
ReadBuffer & in_,
const Block & header_,
const Params & params_,
const FormatSchemaInfo & schema_info_,
const ProtobufSchemaInfo & schema_info_,
bool flatten_google_wrappers_)
: IRowInputFormat(header_, in_, params_)
, reader(std::make_unique<ProtobufReader>(in_))
@ -22,7 +22,7 @@ ProtobufListInputFormat::ProtobufListInputFormat(
header_.getNames(),
header_.getDataTypes(),
missing_column_indices,
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::Yes),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::Yes),
/* with_length_delimiter = */ true,
/* with_envelope = */ true,
flatten_google_wrappers_,
@ -84,7 +84,7 @@ void registerInputFormatProtobufList(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<ProtobufListInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings, "Protobuf", true), settings.protobuf.input_flatten_google_wrappers);
ProtobufSchemaInfo(settings, "Protobuf", sample, settings.protobuf.use_autogenerated_schema), settings.protobuf.input_flatten_google_wrappers);
});
factory.markFormatSupportsSubsetOfColumns("ProtobufList");
factory.registerAdditionalInfoForSchemaCacheGetter(

View File

@ -28,7 +28,7 @@ public:
ReadBuffer & in_,
const Block & header_,
const Params & params_,
const FormatSchemaInfo & schema_info_,
const ProtobufSchemaInfo & schema_info_,
bool flatten_google_wrappers_);
String getName() const override { return "ProtobufListInputFormat"; }

View File

@ -2,7 +2,6 @@
#if USE_PROTOBUF
# include <Formats/FormatFactory.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/ProtobufWriter.h>
# include <Formats/ProtobufSerializer.h>
# include <Formats/ProtobufSchemas.h>
@ -13,14 +12,14 @@ namespace DB
ProtobufListOutputFormat::ProtobufListOutputFormat(
WriteBuffer & out_,
const Block & header_,
const FormatSchemaInfo & schema_info_,
const ProtobufSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_)
: IRowOutputFormat(header_, out_)
, writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create(
header_.getNames(),
header_.getDataTypes(),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::Yes),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::Yes),
/* with_length_delimiter = */ true,
/* with_envelope = */ true,
defaults_for_nullable_google_wrappers_,
@ -55,7 +54,7 @@ void registerOutputFormatProtobufList(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<ProtobufListOutputFormat>(
buf, header, FormatSchemaInfo(settings, "Protobuf", true),
buf, header, ProtobufSchemaInfo(settings, "Protobuf", header, settings.protobuf.use_autogenerated_schema),
settings.protobuf.output_nullables_with_google_wrappers);
});
}

View File

@ -4,10 +4,10 @@
#if USE_PROTOBUF
# include <Processors/Formats/IRowOutputFormat.h>
# include <Formats/FormatSchemaInfo.h>
namespace DB
{
class FormatSchemaInfo;
class ProtobufWriter;
class ProtobufSerializer;
@ -26,7 +26,7 @@ public:
ProtobufListOutputFormat(
WriteBuffer & out_,
const Block & header_,
const FormatSchemaInfo & schema_info_,
const ProtobufSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_);
String getName() const override { return "ProtobufListOutputFormat"; }

View File

@ -11,9 +11,9 @@ namespace DB
{
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_,
const FormatSchemaInfo & schema_info_, bool with_length_delimiter_, bool flatten_google_wrappers_)
const ProtobufSchemaInfo & schema_info_, bool with_length_delimiter_, bool flatten_google_wrappers_)
: IRowInputFormat(header_, in_, params_)
, message_descriptor(ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::No))
, message_descriptor(ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::No))
, with_length_delimiter(with_length_delimiter_)
, flatten_google_wrappers(flatten_google_wrappers_)
{
@ -89,7 +89,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings, "Protobuf", true),
ProtobufSchemaInfo(settings, "Protobuf", sample, settings.protobuf.use_autogenerated_schema),
with_length_delimiter,
settings.protobuf.input_flatten_google_wrappers);
});

View File

@ -33,7 +33,7 @@ public:
ReadBuffer & in_,
const Block & header_,
const Params & params_,
const FormatSchemaInfo & schema_info_,
const ProtobufSchemaInfo & schema_info_,
bool with_length_delimiter_,
bool flatten_google_wrappers_);

View File

@ -3,7 +3,6 @@
#if USE_PROTOBUF
# include <Formats/FormatFactory.h>
# include <Core/Block.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/FormatSettings.h>
# include <Formats/ProtobufSchemas.h>
# include <Formats/ProtobufSerializer.h>
@ -20,7 +19,7 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const FormatSchemaInfo & schema_info_,
const ProtobufSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_)
: IRowOutputFormat(header_, out_)
@ -28,7 +27,7 @@ ProtobufRowOutputFormat::ProtobufRowOutputFormat(
, serializer(ProtobufSerializer::create(
header_.getNames(),
header_.getDataTypes(),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::No),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::No),
with_length_delimiter_,
/* with_envelope = */ false,
settings_.protobuf.output_nullables_with_google_wrappers,
@ -61,7 +60,7 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowOutputFormat>(
buf, header, FormatSchemaInfo(settings, "Protobuf", true),
buf, header, ProtobufSchemaInfo(settings, "Protobuf", header, settings.protobuf.use_autogenerated_schema),
settings, with_length_delimiter);
});
}

View File

@ -4,11 +4,11 @@
#if USE_PROTOBUF
# include <Processors/Formats/IRowOutputFormat.h>
# include <Formats/FormatSchemaInfo.h>
namespace DB
{
class DB;
class FormatSchemaInfo;
class ProtobufSerializer;
class ProtobufWriter;
class WriteBuffer;
@ -30,7 +30,7 @@ public:
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const FormatSchemaInfo & schema_info_,
const ProtobufSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_);

View File

@ -5,6 +5,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SET_SIZE_LIMIT_EXCEEDED;
}
@ -126,9 +127,20 @@ bool DistinctSortedChunkTransform::isKey(const size_t key_pos, const size_t row_
bool DistinctSortedChunkTransform::isLatestKeyFromPrevChunk(const size_t row_pos) const
{
for (size_t i = 0; i < sorted_columns.size(); ++i)
for (size_t i = 0, s = sorted_columns.size(); i < s; ++i)
{
const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction);
const auto & sorted_column = *sorted_columns[i];
/// temporary hardening due to suspious crashes in sqlancer tests
if (unlikely(sorted_column.size() <= row_pos))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected size of a sorted column: size {}, row_pos {}, column position {}, type {}",
sorted_column.size(),
row_pos,
i,
sorted_column.getFamilyName());
const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, sorted_column, sorted_columns_descr[i].nulls_direction);
if (res != 0)
return false;
}

View File

@ -159,7 +159,7 @@ void PartialSortingTransform::transform(Chunk & chunk)
{
MutableColumnPtr sort_description_threshold_column_updated = raw_block_columns[i]->cloneEmpty();
sort_description_threshold_column_updated->insertFrom(*raw_block_columns[i], min_row_to_compare);
sort_description_threshold_columns_updated[i] = std::move(sort_description_threshold_column_updated);
sort_description_threshold_columns_updated[i] = sort_description_threshold_column_updated->convertToFullColumnIfSparse();
}
sort_description_threshold_columns = std::move(sort_description_threshold_columns_updated);

View File

@ -250,15 +250,16 @@ StorageKafka::StorageKafka(
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, kafka_settings(std::move(kafka_settings_))
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value)))
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value))
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value))
, macros_info{.table_id = table_id_}
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value, macros_info))
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value, macros_info))
, client_id(
kafka_settings->kafka_client_id.value.empty() ? getDefaultClientId(table_id_)
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value))
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value, macros_info))
, format_name(getContext()->getMacros()->expand(kafka_settings->kafka_format.value))
, max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value)
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value))
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
, num_consumers(kafka_settings->kafka_num_consumers.value)
, log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")"))
, semaphore(0, static_cast<int>(num_consumers))

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/Macros.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/KafkaConsumer.h>
@ -79,6 +80,7 @@ public:
private:
// Configuration and state
std::unique_ptr<KafkaSettings> kafka_settings;
Macros::MacroExpansionInfo macros_info;
const Names topics;
const String brokers;
const String group;

View File

@ -350,7 +350,7 @@ void DataPartStorageOnDiskBase::backup(
temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_dir = temp_dir_owner->getRelativePath();
temp_part_dir = temp_dir / part_path_in_backup.relative_path();
disk->createDirectories(temp_part_dir);
}

View File

@ -5266,7 +5266,7 @@ public:
auto it = temp_dirs.find(disk);
if (it == temp_dirs.end())
it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
return it->second->getPath();
return it->second->getRelativePath();
}
private:

View File

@ -86,7 +86,7 @@ const std::unordered_set<std::string_view> optional_configuration_keys = {
bool isConnectionString(const std::string & candidate)
{
return candidate.starts_with("DefaultEndpointsProtocol");
return !candidate.starts_with("http");
}
}
@ -257,7 +257,7 @@ void registerStorageAzureBlob(StorageFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext());
auto client = StorageAzureBlob::createClient(configuration);
auto client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
@ -309,58 +309,113 @@ void registerStorageAzureBlob(StorageFactory & factory)
});
}
AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration)
static bool containerExists(std::unique_ptr<BlobServiceClient> &blob_service_client, std::string container_name)
{
Azure::Storage::Blobs::ListBlobContainersOptions options;
options.Prefix = container_name;
options.PageSizeHint = 1;
auto containers_list_response = blob_service_client->ListBlobContainers(options);
auto containers_list = containers_list_response.BlobContainers;
for (const auto & container : containers_list)
{
if (container_name == container.Name)
return true;
}
return false;
}
AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration, bool is_read_only)
{
AzureClientPtr result;
if (configuration.is_connection_string)
{
std::unique_ptr<BlobServiceClient> blob_service_client = std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(configuration.connection_url));
result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container));
result->CreateIfNotExists();
}
else
{
if (configuration.account_name.has_value() && configuration.account_key.has_value())
bool container_exists = containerExists(blob_service_client,configuration.container);
if (!container_exists)
{
auto storage_shared_key_credential = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
if (is_read_only)
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage container does not exist '{}'",
configuration.container);
try
{
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
}
catch (const Azure::Storage::StorageException & e)
result->CreateIfNotExists();
} catch (const Azure::Storage::StorageException & e)
{
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
{
auto final_url = configuration.connection_url
+ (configuration.connection_url.back() == '/' ? "" : "/")
+ configuration.container;
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
}
else
if (!(e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
&& e.ReasonPhrase == "The specified container already exists."))
{
throw;
}
}
}
}
else
{
std::shared_ptr<Azure::Storage::StorageSharedKeyCredential> storage_shared_key_credential;
if (configuration.account_name.has_value() && configuration.account_key.has_value())
{
storage_shared_key_credential
= std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
}
std::unique_ptr<BlobServiceClient> blob_service_client;
if (storage_shared_key_credential)
{
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
}
else
{
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, managed_identity_credential);
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url);
}
bool container_exists = containerExists(blob_service_client,configuration.container);
std::string final_url;
size_t pos = configuration.connection_url.find('?');
if (pos != std::string::npos)
{
auto url_without_sas = configuration.connection_url.substr(0, pos);
final_url = url_without_sas + (url_without_sas.back() == '/' ? "" : "/") + configuration.container
+ configuration.connection_url.substr(pos);
}
else
final_url
= configuration.connection_url + (configuration.connection_url.back() == '/' ? "" : "/") + configuration.container;
if (container_exists)
{
if (storage_shared_key_credential)
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
else
result = std::make_unique<BlobContainerClient>(final_url);
}
else
{
if (is_read_only)
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage container does not exist '{}'",
configuration.container);
try
{
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
}
catch (const Azure::Storage::StorageException & e)
} catch (const Azure::Storage::StorageException & e)
{
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
&& e.ReasonPhrase == "The specified container already exists.")
{
auto final_url = configuration.connection_url
+ (configuration.connection_url.back() == '/' ? "" : "/")
+ configuration.container;
result = std::make_unique<BlobContainerClient>(final_url, managed_identity_credential);
if (storage_shared_key_credential)
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
else
result = std::make_unique<BlobContainerClient>(final_url);
}
else
{
@ -438,7 +493,7 @@ void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
{
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode",
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode",
configuration.blob_path);
}
@ -1203,7 +1258,7 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
return nullptr;
}
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
///AzureBlobStorage file iterator could get new keys after new iteration, check them in schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx);

View File

@ -65,7 +65,7 @@ public:
ASTPtr partition_by_);
static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration);
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration, bool is_read_only);
static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context);

View File

@ -946,7 +946,7 @@ void StorageLog::backupData(BackupEntriesCollector & backup_entries_collector, c
fs::path data_path_in_backup_fs = data_path_in_backup;
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/");
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_dir = temp_dir_owner->getRelativePath();
disk->createDirectories(temp_dir);
bool copy_encrypted = !backup_entries_collector.getBackupSettings().decrypt_files_from_encrypted_disks;

View File

@ -314,7 +314,7 @@ namespace
backup_entries.resize(file_paths.size());
temp_dir_owner.emplace(temp_disk);
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_dir = temp_dir_owner->getRelativePath();
temp_disk->createDirectories(temp_dir);
/// Writing data.bin
@ -453,10 +453,10 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
if (!dynamic_cast<ReadBufferFromFileBase *>(in.get()))
{
temp_data_file.emplace(temporary_disk);
auto out = std::make_unique<WriteBufferFromFile>(temp_data_file->getPath());
auto out = std::make_unique<WriteBufferFromFile>(temp_data_file->getAbsolutePath());
copyData(*in, *out);
out.reset();
in = createReadBufferFromFileBase(temp_data_file->getPath(), {});
in = createReadBufferFromFileBase(temp_data_file->getAbsolutePath(), {});
}
std::unique_ptr<ReadBufferFromFileBase> in_from_file{static_cast<ReadBufferFromFileBase *>(in.release())};
CompressedReadBufferFromFile compressed_in{std::move(in_from_file)};

View File

@ -544,7 +544,7 @@ void StorageStripeLog::backupData(BackupEntriesCollector & backup_entries_collec
fs::path data_path_in_backup_fs = data_path_in_backup;
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/");
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_dir = temp_dir_owner->getRelativePath();
disk->createDirectories(temp_dir);
bool copy_encrypted = !backup_entries_collector.getBackupSettings().decrypt_files_from_encrypted_disks;

View File

@ -371,7 +371,7 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
for (; option != end; ++option)
{
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
auto request_uri = Poco::URI(*option, context->getSettingsRef().disable_url_encoding);
auto request_uri = Poco::URI(*option, context->getSettingsRef().enable_url_encoding);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);

View File

@ -38,7 +38,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
if (shard_info.isLocal())
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
return table_function_ptr->getActualTableStructure(context);
return table_function_ptr->getActualTableStructure(context, /*is_insert_query*/ true);
}
auto table_func_name = queryToString(table_func_ptr);

View File

@ -49,13 +49,14 @@ namespace DB
actual_columns = parseColumnsListFromString(table_structure, context_);
}
ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/) const { return actual_columns; }
ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/, bool /*is_insert_query*/) const { return actual_columns; }
StoragePtr TableFunctionHive::executeImpl(
const ASTPtr & /*ast_function_*/,
ContextPtr context_,
const std::string & table_name_,
ColumnsDescription /*cached_columns_*/) const
ColumnsDescription /*cached_columns_*/,
bool /*is_insert_query*/) const
{
const Settings & settings = context_->getSettings();
ParserExpression partition_by_parser;

View File

@ -17,10 +17,10 @@ public:
bool hasStaticStructure() const override { return true; }
StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return storage_type_name; }
ColumnsDescription getActualTableStructure(ContextPtr) const override;
ColumnsDescription getActualTableStructure(ContextPtr, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function_, ContextPtr context_) override;
private:

View File

@ -34,15 +34,15 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte
auto context_to_use = use_global_context ? context->getGlobalContext() : context;
if (cached_columns.empty())
return executeImpl(ast_function, context, table_name, std::move(cached_columns));
return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query);
if (hasStaticStructure() && cached_columns == getActualTableStructure(context))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns));
if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query);
auto this_table_function = shared_from_this();
auto get_storage = [=]() -> StoragePtr
{
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns);
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, is_insert_query);
};
/// It will request actual table structure and create underlying storage lazily

View File

@ -58,7 +58,7 @@ public:
virtual void parseArguments(const ASTPtr & /*ast_function*/, ContextPtr /*context*/) {}
/// Returns actual table structure probably requested from remote server, may fail
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/) const = 0;
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/, bool is_insert_query) const = 0;
/// Check if table function needs a structure hint from SELECT query in case of
/// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...)
@ -89,7 +89,7 @@ protected:
private:
virtual StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const = 0;
virtual const char * getStorageTypeName() const = 0;
};

View File

@ -26,7 +26,8 @@ protected:
const ASTPtr & /*ast_function*/,
ContextPtr context,
const std::string & table_name,
ColumnsDescription /*cached_columns*/) const override
ColumnsDescription /*cached_columns*/,
bool /*is_insert_query*/) const override
{
ColumnsDescription columns;
if (TableFunction::configuration.structure != "auto")
@ -42,7 +43,7 @@ protected:
const char * getStorageTypeName() const override { return Storage::name; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override
ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override
{
if (TableFunction::configuration.structure == "auto")
{

View File

@ -110,7 +110,7 @@ void ITableFunctionFileLike::addColumnsStructureToArguments(ASTs & args, const S
}
}
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{
ColumnsDescription columns;
if (structure != "auto")

View File

@ -48,7 +48,7 @@ protected:
ColumnsDescription structure_hint;
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
virtual StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, ContextPtr global_context,

View File

@ -61,7 +61,7 @@ void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const
}
}
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context) const
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
startBridgeIfNot(context);
@ -92,10 +92,10 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
return ColumnsDescription{columns};
}
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
startBridgeIfNot(context);
auto columns = getActualTableStructure(context);
auto columns = getActualTableStructure(context, is_insert_query);
auto result = std::make_shared<StorageXDBC>(
StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, ConstraintsDescription{}, String{}, context, helper);
result->startup();

View File

@ -16,7 +16,7 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(ContextPtr context,
@ -24,7 +24,7 @@ private:
const std::string & connection_string_,
bool use_connection_pooling_) const = 0;
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;

View File

@ -39,7 +39,7 @@ namespace
bool isConnectionString(const std::string & candidate)
{
return candidate.starts_with("DefaultEndpointsProtocol");
return !candidate.starts_with("http");
}
}
@ -193,12 +193,12 @@ void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function,
configuration = parseArgumentsImpl(args, context);
}
ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context, bool is_insert_query) const
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
auto client = StorageAzureBlob::createClient(configuration);
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings));
@ -213,9 +213,9 @@ bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns()
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
}
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
auto client = StorageAzureBlob::createClient(configuration);
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context);
ColumnsDescription columns;

View File

@ -54,11 +54,12 @@ protected:
const ASTPtr & ast_function,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns) const override;
ColumnsDescription cached_columns,
bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Azure"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
mutable StorageAzureBlob::Configuration configuration;

View File

@ -43,7 +43,7 @@ void TableFunctionDictionary::parseArguments(const ASTPtr & ast_function, Contex
dictionary_name = checkAndGetLiteralArgument<String>(args[0], "dictionary_name");
}
ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
const ExternalDictionariesLoader & external_loader = context->getExternalDictionariesLoader();
std::string resolved_name = external_loader.resolveDictionaryName(dictionary_name, context->getCurrentDatabase());
@ -76,10 +76,10 @@ ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr c
}
StoragePtr TableFunctionDictionary::executeImpl(
const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription) const
const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const
{
StorageID dict_id(getDatabaseName(), table_name);
auto dictionary_table_structure = getActualTableStructure(context);
auto dictionary_table_structure = getActualTableStructure(context, is_insert_query);
auto result = std::make_shared<StorageDictionary>(
dict_id, dictionary_name, std::move(dictionary_table_structure), String{}, StorageDictionary::Location::Custom, context);

View File

@ -18,9 +18,9 @@ public:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Dictionary"; }

View File

@ -120,12 +120,12 @@ void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, Contex
}
}
ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
return parseColumnsListFromString(structure, context);
}
StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
auto storage_id = StorageID(getDatabaseName(), table_name);
auto global_context = context->getGlobalContext();
@ -135,7 +135,7 @@ StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/,
if (settings_query != nullptr)
settings.applyChanges(settings_query->as<ASTSetQuery>()->changes);
auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context), ConstraintsDescription{});
auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context, is_insert_query), ConstraintsDescription{});
storage->startup();
return storage;
}

View File

@ -24,11 +24,11 @@ public:
bool hasStaticStructure() const override { return true; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Executable"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;

View File

@ -91,7 +91,7 @@ void TableFunctionExplain::parseArguments(const ASTPtr & ast_function, ContextPt
query = std::move(explain_query);
}
ColumnsDescription TableFunctionExplain::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionExplain::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
Block sample_block = getInterpreter(context).getSampleBlock(query->as<ASTExplainQuery>()->getKind());
ColumnsDescription columns_description;
@ -123,7 +123,7 @@ static Block executeMonoBlock(QueryPipeline & pipeline)
}
StoragePtr TableFunctionExplain::executeImpl(
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
/// To support settings inside explain subquery.
auto mutable_context = Context::createCopy(context);
@ -132,7 +132,7 @@ StoragePtr TableFunctionExplain::executeImpl(
Block block = executeMonoBlock(blockio.pipeline);
StorageID storage_id(getDatabaseName(), table_name);
auto storage = std::make_shared<StorageValues>(storage_id, getActualTableStructure(context), std::move(block));
auto storage = std::make_shared<StorageValues>(storage_id, getActualTableStructure(context, is_insert_query), std::move(block));
storage->startup();
return storage;
}

View File

@ -17,7 +17,7 @@ public:
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Explain"; }
@ -25,7 +25,7 @@ private:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
InterpreterExplainQuery getInterpreter(ContextPtr context) const;

View File

@ -83,7 +83,7 @@ StoragePtr TableFunctionFile::getStorage(const String & source,
return std::make_shared<StorageFile>(source, global_context->getUserFilesPath(), args);
}
ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
if (structure == "auto")
{

View File

@ -20,7 +20,7 @@ public:
return name;
}
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{

View File

@ -52,7 +52,7 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr
structure = checkAndGetLiteralArgument<String>(args[1], "structure");
}
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
if (structure == "auto")
{
@ -98,9 +98,9 @@ Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr cont
return concatenateBlocks(blocks);
}
StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
auto columns = getActualTableStructure(context);
auto columns = getActualTableStructure(context, is_insert_query);
Block res_block = parseData(columns, context);
auto res = std::make_shared<StorageValues>(StorageID(getDatabaseName(), table_name), columns, res_block);
res->startup();

View File

@ -18,10 +18,10 @@ public:
bool hasStaticStructure() const override { return false; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Values"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
Block parseData(ColumnsDescription columns, ContextPtr context) const;

View File

@ -97,7 +97,7 @@ void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, Co
}
}
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
if (structure == "auto")
{
@ -113,9 +113,9 @@ ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextP
return parseColumnsListFromString(structure, context);
}
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
ColumnsDescription columns = getActualTableStructure(context);
ColumnsDescription columns = getActualTableStructure(context, is_insert_query);
auto res = std::make_shared<StorageGenerateRandom>(
StorageID(getDatabaseName(), table_name), columns, String{}, max_array_length, max_string_length, random_seed);
res->startup();

Some files were not shown because too many files have changed in this diff Show More