Merge branch 'master' into cont-server-after-gdb-killed

This commit is contained in:
Nikolai Kochetov 2023-08-01 17:32:29 +02:00 committed by GitHub
commit 2d7f52971a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
148 changed files with 3902 additions and 391 deletions

View File

@ -8,6 +8,7 @@
#include <functional> #include <functional>
#include <iosfwd> #include <iosfwd>
#include <base/defines.h>
#include <base/types.h> #include <base/types.h>
#include <base/unaligned.h> #include <base/unaligned.h>
@ -274,6 +275,8 @@ struct CRC32Hash
if (size == 0) if (size == 0)
return 0; return 0;
chassert(pos);
if (size < 8) if (size < 8)
{ {
return static_cast<unsigned>(hashLessThan8(x.data, x.size)); return static_cast<unsigned>(hashLessThan8(x.data, x.size));

View File

@ -115,8 +115,15 @@
/// because SIGABRT is easier to debug than SIGTRAP (the second one makes gdb crazy) /// because SIGABRT is easier to debug than SIGTRAP (the second one makes gdb crazy)
#if !defined(chassert) #if !defined(chassert)
#if defined(ABORT_ON_LOGICAL_ERROR) #if defined(ABORT_ON_LOGICAL_ERROR)
// clang-format off
#include <base/types.h>
namespace DB
{
void abortOnFailedAssertion(const String & description);
}
#define chassert(x) static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(#x) #define chassert(x) static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(#x)
#define UNREACHABLE() abort() #define UNREACHABLE() abort()
// clang-format on
#else #else
/// Here sizeof() trick is used to suppress unused warning for result, /// Here sizeof() trick is used to suppress unused warning for result,
/// since simple "(void)x" will evaluate the expression, while /// since simple "(void)x" will evaluate the expression, while

View File

@ -57,7 +57,7 @@ public:
URI(); URI();
/// Creates an empty URI. /// Creates an empty URI.
explicit URI(const std::string & uri, bool disable_url_encoding = false); explicit URI(const std::string & uri, bool enable_url_encoding = true);
/// Parses an URI from the given string. Throws a /// Parses an URI from the given string. Throws a
/// SyntaxException if the uri is not valid. /// SyntaxException if the uri is not valid.
@ -362,7 +362,7 @@ private:
std::string _query; std::string _query;
std::string _fragment; std::string _fragment;
bool _disable_url_encoding = false; bool _enable_url_encoding = true;
}; };

View File

@ -36,8 +36,8 @@ URI::URI():
} }
URI::URI(const std::string& uri, bool decode_and_encode_path): URI::URI(const std::string& uri, bool enable_url_encoding):
_port(0), _disable_url_encoding(decode_and_encode_path) _port(0), _enable_url_encoding(enable_url_encoding)
{ {
parse(uri); parse(uri);
} }
@ -108,7 +108,7 @@ URI::URI(const URI& uri):
_path(uri._path), _path(uri._path),
_query(uri._query), _query(uri._query),
_fragment(uri._fragment), _fragment(uri._fragment),
_disable_url_encoding(uri._disable_url_encoding) _enable_url_encoding(uri._enable_url_encoding)
{ {
} }
@ -121,7 +121,7 @@ URI::URI(const URI& baseURI, const std::string& relativeURI):
_path(baseURI._path), _path(baseURI._path),
_query(baseURI._query), _query(baseURI._query),
_fragment(baseURI._fragment), _fragment(baseURI._fragment),
_disable_url_encoding(baseURI._disable_url_encoding) _enable_url_encoding(baseURI._enable_url_encoding)
{ {
resolve(relativeURI); resolve(relativeURI);
} }
@ -153,7 +153,7 @@ URI& URI::operator = (const URI& uri)
_path = uri._path; _path = uri._path;
_query = uri._query; _query = uri._query;
_fragment = uri._fragment; _fragment = uri._fragment;
_disable_url_encoding = uri._disable_url_encoding; _enable_url_encoding = uri._enable_url_encoding;
} }
return *this; return *this;
} }
@ -184,7 +184,7 @@ void URI::swap(URI& uri)
std::swap(_path, uri._path); std::swap(_path, uri._path);
std::swap(_query, uri._query); std::swap(_query, uri._query);
std::swap(_fragment, uri._fragment); std::swap(_fragment, uri._fragment);
std::swap(_disable_url_encoding, uri._disable_url_encoding); std::swap(_enable_url_encoding, uri._enable_url_encoding);
} }
@ -687,18 +687,18 @@ void URI::decode(const std::string& str, std::string& decodedStr, bool plusAsSpa
void URI::encodePath(std::string & encodedStr) const void URI::encodePath(std::string & encodedStr) const
{ {
if (_disable_url_encoding) if (_enable_url_encoding)
encodedStr = _path;
else
encode(_path, RESERVED_PATH, encodedStr); encode(_path, RESERVED_PATH, encodedStr);
else
encodedStr = _path;
} }
void URI::decodePath(const std::string & encodedStr) void URI::decodePath(const std::string & encodedStr)
{ {
if (_disable_url_encoding) if (_enable_url_encoding)
_path = encodedStr;
else
decode(encodedStr, _path); decode(encodedStr, _path);
else
_path = encodedStr;
} }
bool URI::isWellKnownPort() const bool URI::isWellKnownPort() const

View File

@ -106,4 +106,4 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
## Storage Settings {#storage-settings} ## Storage Settings {#storage-settings}
- [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default. - [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [disable_url_encoding](/docs/en/operations/settings/settings.md#disable_url_encoding) -allows to disable decoding/encoding path in uri. Disabled by default. - [enable_url_encoding](/docs/en/operations/settings/settings.md#enable_url_encoding) - allows to enable/disable decoding/encoding path in uri. Enabled by default.

View File

@ -1723,6 +1723,34 @@ You can select data from a ClickHouse table and save them into some file in the
``` bash ``` bash
$ clickhouse-client --query = "SELECT * FROM test.hits FORMAT CapnProto SETTINGS format_schema = 'schema:Message'" $ clickhouse-client --query = "SELECT * FROM test.hits FORMAT CapnProto SETTINGS format_schema = 'schema:Message'"
``` ```
### Using autogenerated schema {#using-autogenerated-capn-proto-schema}
If you don't have an external CapnProto schema for your data, you can still output/input data in CapnProto format using autogenerated schema.
For example:
```sql
SELECT * FROM test.hits format CapnProto SETTINGS format_capn_proto_use_autogenerated_schema=1
```
In this case ClickHouse will autogenerate CapnProto schema according to the table structure using function [structureToCapnProtoSchema](../sql-reference/functions/other-functions.md#structure_to_capn_proto_schema) and will use this schema to serialize data in CapnProto format.
You can also read CapnProto file with autogenerated schema (in this case the file must be created using the same schema):
```bash
$ cat hits.bin | clickhouse-client --query "INSERT INTO test.hits SETTINGS format_capn_proto_use_autogenerated_schema=1 FORMAT CapnProto"
```
The setting [format_capn_proto_use_autogenerated_schema](../operations/settings/settings-formats.md#format_capn_proto_use_autogenerated_schema) is enabled by default and applies if [format_schema](../operations/settings/settings-formats.md#formatschema-format-schema) is not set.
You can also save the autogenerated schema to a file during input/output using the setting [output_format_schema](../operations/settings/settings-formats.md#output-format-schema). For example:
```sql
SELECT * FROM test.hits format CapnProto SETTINGS format_capn_proto_use_autogenerated_schema=1, output_format_schema='path/to/schema/schema.capnp'
```
In this case autogenerated CapnProto schema will be saved in file `path/to/schema/schema.capnp`.
## Prometheus {#prometheus} ## Prometheus {#prometheus}
Expose metrics in [Prometheus text-based exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format). Expose metrics in [Prometheus text-based exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format).
@ -1861,6 +1889,33 @@ ClickHouse inputs and outputs protobuf messages in the `length-delimited` format
It means before every message should be written its length as a [varint](https://developers.google.com/protocol-buffers/docs/encoding#varints). It means before every message should be written its length as a [varint](https://developers.google.com/protocol-buffers/docs/encoding#varints).
See also [how to read/write length-delimited protobuf messages in popular languages](https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages). See also [how to read/write length-delimited protobuf messages in popular languages](https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages).
### Using autogenerated schema {#using-autogenerated-protobuf-schema}
If you don't have an external Protobuf schema for your data, you can still output/input data in Protobuf format using autogenerated schema.
For example:
```sql
SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerated_schema=1
```
In this case ClickHouse will autogenerate Protobuf schema according to the table structure using function [structureToProtobufSchema](../sql-reference/functions/other-functions.md#structure_to_protobuf_schema) and will use this schema to serialize data in Protobuf format.
You can also read Protobuf file with autogenerated schema (in this case the file must be created using the same schema):
```bash
$ cat hits.bin | clickhouse-client --query "INSERT INTO test.hits SETTINGS format_protobuf_use_autogenerated_schema=1 FORMAT Protobuf"
```
The setting [format_protobuf_use_autogenerated_schema](../operations/settings/settings-formats.md#format_protobuf_use_autogenerated_schema) is enabled by default and applies if [format_schema](../operations/settings/settings-formats.md#formatschema-format-schema) is not set.
You can also save the autogenerated schema to a file during input/output using the setting [output_format_schema](../operations/settings/settings-formats.md#output-format-schema). For example:
```sql
SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerated_schema=1, output_format_schema='path/to/schema/schema.proto'
```
In this case the autogenerated Protobuf schema will be saved in the file `path/to/schema/schema.proto`.
## ProtobufSingle {#protobufsingle} ## ProtobufSingle {#protobufsingle}
Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters. Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters.

View File

@ -321,6 +321,10 @@ If both `input_format_allow_errors_num` and `input_format_allow_errors_ratio` ar
This parameter is useful when you are using formats that require a schema definition, such as [Capn Proto](https://capnproto.org/) or [Protobuf](https://developers.google.com/protocol-buffers/). The value depends on the format. This parameter is useful when you are using formats that require a schema definition, such as [Capn Proto](https://capnproto.org/) or [Protobuf](https://developers.google.com/protocol-buffers/). The value depends on the format.
## output_format_schema {#output-format-schema}
The path to the file where the automatically generated schema will be saved in [Capn Proto](../../interfaces/formats.md#capnproto-capnproto) or [Protobuf](../../interfaces/formats.md#protobuf-protobuf) formats.
## output_format_enable_streaming {#output_format_enable_streaming} ## output_format_enable_streaming {#output_format_enable_streaming}
Enable streaming in output formats that support it. Enable streaming in output formats that support it.
@ -1330,6 +1334,11 @@ When serializing Nullable columns with Google wrappers, serialize default values
Disabled by default. Disabled by default.
### format_protobuf_use_autogenerated_schema {#format_protobuf_use_autogenerated_schema}
Use autogenerated Protobuf schema when [format_schema](#formatschema-format-schema) is not set.
The schema is generated from the ClickHouse table structure using the function [structureToProtobufSchema](../../sql-reference/functions/other-functions.md#structure_to_protobuf_schema).
## Avro format settings {#avro-format-settings} ## Avro format settings {#avro-format-settings}
### input_format_avro_allow_missing_fields {#input_format_avro_allow_missing_fields} ### input_format_avro_allow_missing_fields {#input_format_avro_allow_missing_fields}
@ -1626,6 +1635,11 @@ Possible values:
Default value: `'by_values'`. Default value: `'by_values'`.
### format_capn_proto_use_autogenerated_schema {#format_capn_proto_use_autogenerated_schema}
Use autogenerated CapnProto schema when [format_schema](#formatschema-format-schema) is not set.
The schema is generated from the ClickHouse table structure using the function [structureToCapnProtoSchema](../../sql-reference/functions/other-functions.md#structure_to_capn_proto_schema).
## MySQLDump format settings {#musqldump-format-settings} ## MySQLDump format settings {#musqldump-format-settings}
### input_format_mysql_dump_table_name (#input_format_mysql_dump_table_name) ### input_format_mysql_dump_table_name (#input_format_mysql_dump_table_name)

View File

@ -3468,11 +3468,11 @@ Possible values:
Default value: `0`. Default value: `0`.
## disable_url_encoding {#disable_url_encoding} ## enable_url_encoding {#enable_url_encoding}
Allows to disable decoding/encoding path in uri in [URL](../../engines/table-engines/special/url.md) engine tables. Allows to enable/disable decoding/encoding path in uri in [URL](../../engines/table-engines/special/url.md) engine tables.
Disabled by default. Enabled by default.
## database_atomic_wait_for_drop_and_detach_synchronously {#database_atomic_wait_for_drop_and_detach_synchronously} ## database_atomic_wait_for_drop_and_detach_synchronously {#database_atomic_wait_for_drop_and_detach_synchronously}

View File

@ -2552,3 +2552,187 @@ Result:
This function can be used together with [generateRandom](../../sql-reference/table-functions/generate.md) to generate completely random tables. This function can be used together with [generateRandom](../../sql-reference/table-functions/generate.md) to generate completely random tables.
## structureToCapnProtoSchema {#structure_to_capn_proto_schema}
Converts ClickHouse table structure to CapnProto schema.
**Syntax**
``` sql
structureToCapnProtoSchema(structure[, root_struct_name])
```
**Arguments**
- `structure` — Table structure in a format `column1_name column1_type, column2_name column2_type, ...`.
- `root_struct_name` — Name for the root struct in the CapnProto schema. Optional. Default value: `Message`.
**Returned value**
- CapnProto schema
Type: [String](../../sql-reference/data-types/string.md).
**Examples**
Query:
``` sql
SELECT structureToCapnProtoSchema('column1 String, column2 UInt32, column3 Array(String)') FORMAT RawBLOB
```
Result:
``` text
@0xf96402dd754d0eb7;
struct Message
{
column1 @0 : Data;
column2 @1 : UInt32;
column3 @2 : List(Data);
}
```
Query:
``` sql
SELECT structureToCapnProtoSchema('column1 Nullable(String), column2 Tuple(element1 UInt32, element2 Array(String)), column3 Map(String, String)') FORMAT RawBLOB
```
Result:
``` text
@0xd1c8320fecad2b7f;
struct Message
{
struct Column1
{
union
{
value @0 : Data;
null @1 : Void;
}
}
column1 @0 : Column1;
struct Column2
{
element1 @0 : UInt32;
element2 @1 : List(Data);
}
column2 @1 : Column2;
struct Column3
{
struct Entry
{
key @0 : Data;
value @1 : Data;
}
entries @0 : List(Entry);
}
column3 @2 : Column3;
}
```
Query:
``` sql
SELECT structureToCapnProtoSchema('column1 String, column2 UInt32', 'Root') FORMAT RawBLOB
```
Result:
``` text
@0x96ab2d4ab133c6e1;
struct Root
{
column1 @0 : Data;
column2 @1 : UInt32;
}
```
## structureToProtobufSchema {#structure_to_protobuf_schema}
Converts ClickHouse table structure to Protobuf schema.
**Syntax**
``` sql
structureToProtobufSchema(structure[, root_message_name])
```
**Arguments**
- `structure` — Table structure in a format `column1_name column1_type, column2_name column2_type, ...`.
- `root_message_name` — Name for the root message in the Protobuf schema. Optional. Default value: `Message`.
**Returned value**
- Protobuf schema
Type: [String](../../sql-reference/data-types/string.md).
**Examples**
Query:
``` sql
SELECT structureToProtobufSchema('column1 String, column2 UInt32, column3 Array(String)') FORMAT RawBLOB
```
Result:
``` text
syntax = "proto3";
message Message
{
bytes column1 = 1;
uint32 column2 = 2;
repeated bytes column3 = 3;
}
```
Query:
``` sql
SELECT structureToProtobufSchema('column1 Nullable(String), column2 Tuple(element1 UInt32, element2 Array(String)), column3 Map(String, String)') FORMAT RawBLOB
```
Result:
``` text
syntax = "proto3";
message Message
{
bytes column1 = 1;
message Column2
{
uint32 element1 = 1;
repeated bytes element2 = 2;
}
Column2 column2 = 2;
map<string, bytes> column3 = 3;
}
```
Query:
``` sql
SELECT structureToProtobufSchema('column1 String, column2 UInt32', 'Root') FORMAT RawBLOB
```
Result:
``` text
syntax = "proto3";
message Root
{
bytes column1 = 1;
uint32 column2 = 2;
}
```

View File

@ -56,7 +56,7 @@ Character `|` inside patterns is used to specify failover addresses. They are it
## Storage Settings {#storage-settings} ## Storage Settings {#storage-settings}
- [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default. - [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [disable_url_encoding](/docs/en/operations/settings/settings.md#disable_url_encoding) - allows to disable decoding/encoding path in uri. Disabled by default. - [enable_url_encoding](/docs/en/operations/settings/settings.md#enable_url_encoding) - allows to enable/disable decoding/encoding path in uri. Enabled by default.
**See Also** **See Also**

View File

@ -0,0 +1,221 @@
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Functions/FunctionFactory.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/FunctionNode.h>
#include <Common/DateLUT.h>
#include <Common/DateLUTImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
class OptimizeDateOrDateTimeConverterWithPreimageVisitor : public InDepthQueryTreeVisitorWithContext<OptimizeDateOrDateTimeConverterWithPreimageVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<OptimizeDateOrDateTimeConverterWithPreimageVisitor>;
/// Constructs the visitor; the query context is moved into the base visitor.
explicit OptimizeDateOrDateTimeConverterWithPreimageVisitor(ContextPtr context)
: Base(std::move(context))
{}
/// Tells the traversal whether to descend into the children of `node`.
/// Returns false when `node` is one of the six comparison functions listed below,
/// and true for every other node (including non-function nodes).
static bool needChildVisit(QueryTreeNodePtr & node, QueryTreeNodePtr & /*child*/)
{
    const static std::unordered_set<String> comparison_functions = {
        "equals",
        "notEquals",
        "less",
        "greater",
        "lessOrEquals",
        "greaterOrEquals",
    };

    const auto * function_node = node->as<FunctionNode>();
    if (!function_node)
        return true;

    return !comparison_functions.contains(function_node->getFunctionName());
}
/// Tries to replace a comparison of the form `converter(column) <op> constant`
/// (or the mirrored `constant <op> converter(column)`) with a predicate on the column itself,
/// built from the preimage of the converter for that constant.
/// The node is left untouched as soon as any of the preconditions below does not hold.
void enterImpl(QueryTreeNodePtr & node) const
{
    /// Maps each supported comparison to the comparison obtained by exchanging its operands.
    const static std::unordered_map<String, String> mirrored_relations = {
        {"equals", "equals"},
        {"notEquals", "notEquals"},
        {"less", "greater"},
        {"greater", "less"},
        {"lessOrEquals", "greaterOrEquals"},
        {"greaterOrEquals", "lessOrEquals"},
    };

    const auto * comparison = node->as<FunctionNode>();
    if (!comparison)
        return;

    const auto mirrored = mirrored_relations.find(comparison->getFunctionName());
    if (mirrored == mirrored_relations.end())
        return;

    const auto & operands = comparison->getArguments().getNodes();
    if (operands.size() != 2)
        return;

    /// Pick the operand that is a function call; the right-hand one wins when both are functions.
    size_t converter_pos = 0;
    if (operands[1]->as<FunctionNode>())
        converter_pos = 1;
    else if (!operands[0]->as<FunctionNode>())
        return;

    /// The other operand must be a constant holding a UInt64 value.
    const size_t constant_pos = 1 - converter_pos;
    const auto * constant = operands[constant_pos]->as<ConstantNode>();
    if (!constant)
        return;
    if (constant->getValue().getType() != Field::Types::UInt64)
        return;

    /// With the converter on the left the operator is kept as is; with the converter on the right
    /// it is mirrored, so the predicate can be read as `converter(column) <comparator> constant`.
    const String comparator = converter_pos == 0 ? comparison->getFunctionName() : mirrored->second;

    const auto * converter_call = operands[converter_pos]->as<FunctionNode>();
    if (!converter_call)
        return;

    /// Currently we only handle single-argument functions.
    const auto & converter_arguments = converter_call->getArguments().getNodes();
    if (converter_arguments.size() != 1)
        return;

    const auto * column = converter_arguments[0]->as<ColumnNode>();
    if (!column)
        return;

    const auto * column_type = column->getColumnType().get();
    const bool is_date_like = isDateOrDate32(column_type) || isDateTime(column_type) || isDateTime64(column_type);
    if (!is_date_like)
        return;

    const auto resolver = FunctionFactory::instance().tryGet(converter_call->getFunctionName(), getContext());
    if (!resolver)
        return;

    /// Build the converter for a single argument of the column type to query its preimage support.
    ColumnsWithTypeAndName probe_arguments;
    probe_arguments.emplace_back(column->getColumnType(), "tmp");
    auto converter = resolver->build(probe_arguments);
    if (!converter || !converter->hasInformationAboutPreimage())
        return;

    auto preimage = converter->getPreimage(*(column->getColumnType()), constant->getValue());
    if (!preimage)
        return;

    const auto rewritten = generateOptimizedDateFilter(comparator, *column, *preimage);
    if (!rewritten)
        return;

    node = rewritten;
}
private:
/// Builds the predicate on `column_node` that replaces `converter(column) <comparator> constant`.
/// `range` holds the two bounds of the preimage; they are rendered as date strings for Date/Date32
/// columns and as date-time strings for DateTime/DateTime64 columns, using the "UTC" DateLUT.
/// Returns an empty pointer for any other column type.
/// Throws LOGICAL_ERROR when `comparator` is not one of the six supported comparison names.
QueryTreeNodePtr generateOptimizedDateFilter(const String & comparator, const ColumnNode & column_node, const std::pair<Field, Field>& range) const
{
    const DateLUTImpl & utc_lut = DateLUT::instance("UTC");
    const auto * column_type = column_node.getColumnType().get();

    String lower_bound;
    String upper_bound;

    if (isDateOrDate32(column_type))
    {
        lower_bound = utc_lut.dateToString(range.first.get<DateLUTImpl::Time>());
        upper_bound = utc_lut.dateToString(range.second.get<DateLUTImpl::Time>());
    }
    else if (isDateTime(column_type) || isDateTime64(column_type))
    {
        lower_bound = utc_lut.timeToString(range.first.get<DateLUTImpl::Time>());
        upper_bound = utc_lut.timeToString(range.second.get<DateLUTImpl::Time>());
    }
    else [[unlikely]]
    {
        return {};
    }

    /// Creates and resolves the function node `column <function_name> bound`.
    const auto make_comparison = [&](const String & function_name, const String & bound)
    {
        auto comparison = std::make_shared<FunctionNode>(function_name);
        auto & operands = comparison->getArguments().getNodes();
        operands.push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
        operands.push_back(std::make_shared<ConstantNode>(bound));
        resolveOrdinaryFunctionNode(*comparison, comparison->getFunctionName());
        return comparison;
    };

    /// Creates and resolves the logical function node `lhs <logical_function> rhs`.
    const auto combine = [&](const String & logical_function,
                             const std::shared_ptr<FunctionNode> & lhs,
                             const std::shared_ptr<FunctionNode> & rhs)
    {
        auto combined = std::make_shared<FunctionNode>(logical_function);
        combined->getArguments().getNodes() = {lhs, rhs};
        resolveOrdinaryFunctionNode(*combined, combined->getFunctionName());
        return combined;
    };

    if (comparator == "equals")
    {
        /// column >= lower AND column < upper
        const auto from_lower = make_comparison("greaterOrEquals", lower_bound);
        const auto below_upper = make_comparison("less", upper_bound);
        return combine("and", from_lower, below_upper);
    }

    if (comparator == "notEquals")
    {
        /// column < lower OR column >= upper
        const auto below_lower = make_comparison("less", lower_bound);
        const auto from_upper = make_comparison("greaterOrEquals", upper_bound);
        return combine("or", below_lower, from_upper);
    }

    if (comparator == "greater")
        return make_comparison("greaterOrEquals", upper_bound);

    if (comparator == "lessOrEquals")
        return make_comparison("less", upper_bound);

    if (comparator == "less" || comparator == "greaterOrEquals")
        return make_comparison(comparator, lower_bound);

    throw Exception(ErrorCodes::LOGICAL_ERROR,
        "Expected equals, notEquals, less, lessOrEquals, greater, greaterOrEquals. Actual {}",
        comparator);
}
/// Looks up `function_name` in the function factory, builds it for the argument columns
/// currently attached to `function_node`, and resolves the node with the result.
void resolveOrdinaryFunctionNode(FunctionNode & function_node, const String & function_name) const
{
    const auto resolver = FunctionFactory::instance().get(function_name, getContext());
    auto function_base = resolver->build(function_node.getArgumentColumns());
    function_node.resolveAsFunction(std::move(function_base));
}
};
}
/// Runs the preimage-rewriting visitor over the query tree rooted at `query_tree_node`.
void OptimizeDateOrDateTimeConverterWithPreimagePass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
    OptimizeDateOrDateTimeConverterWithPreimageVisitor(std::move(context)).visit(query_tree_node);
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/** Rewrites predicates that apply a Date/DateTime converter to a column into predicates
  * on the column itself, using the preimage of the converter, to improve performance.
  * For a Date column c: toYear(c) = 2023 -> c >= '2023-01-01' AND c < '2024-01-01'.
  * For a DateTime column c: toYear(c) = 2023 -> c >= '2023-01-01 00:00:00' AND c < '2024-01-01 00:00:00'.
  * A similar rewrite applies to other converters.
  */
class OptimizeDateOrDateTimeConverterWithPreimagePass final : public IQueryTreePass
{
public:
    /// Name of this pass.
    String getName() override
    {
        return "OptimizeDateOrDateTimeConverterWithPreimagePass";
    }

    /// One-line description of what this pass does.
    String getDescription() override
    {
        return "Replace predicate having Date/DateTime converters with their preimages";
    }

    /// Applies the rewrite to the given query tree.
    void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}

View File

@ -42,6 +42,7 @@
#include <Analyzer/Passes/CrossToInnerJoinPass.h> #include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/ShardNumColumnToFunctionPass.h> #include <Analyzer/Passes/ShardNumColumnToFunctionPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h> #include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
namespace DB namespace DB
{ {
@ -278,6 +279,7 @@ void addQueryTreePasses(QueryTreePassManager & manager)
manager.addPass(std::make_unique<AutoFinalOnQueryPass>()); manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<CrossToInnerJoinPass>()); manager.addPass(std::make_unique<CrossToInnerJoinPass>());
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>()); manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());
manager.addPass(std::make_unique<OptimizeDateOrDateTimeConverterWithPreimagePass>());
} }
} }

View File

@ -564,16 +564,23 @@ void ColumnNullable::updatePermutationImpl(IColumn::PermutationSortDirection dir
else else
getNestedColumn().updatePermutation(direction, stability, limit, null_direction_hint, res, new_ranges); getNestedColumn().updatePermutation(direction, stability, limit, null_direction_hint, res, new_ranges);
equal_ranges = std::move(new_ranges);
if (unlikely(stability == PermutationSortStability::Stable)) if (unlikely(stability == PermutationSortStability::Stable))
{ {
for (auto & null_range : null_ranges) for (auto & null_range : null_ranges)
::sort(res.begin() + null_range.first, res.begin() + null_range.second); ::sort(res.begin() + null_range.first, res.begin() + null_range.second);
} }
if (is_nulls_last || null_ranges.empty())
{
equal_ranges = std::move(new_ranges);
std::move(null_ranges.begin(), null_ranges.end(), std::back_inserter(equal_ranges)); std::move(null_ranges.begin(), null_ranges.end(), std::back_inserter(equal_ranges));
} }
else
{
equal_ranges = std::move(null_ranges);
std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(equal_ranges));
}
}
void ColumnNullable::getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability, void ColumnNullable::getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int null_direction_hint, Permutation & res) const size_t limit, int null_direction_hint, Permutation & res) const

View File

@ -439,7 +439,7 @@ void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results, PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const int direction, int nan_direction_hint) const
{ {
if (row_indexes) if (row_indexes || !typeid_cast<const ColumnSparse *>(&rhs))
{ {
/// TODO: implement without conversion to full column. /// TODO: implement without conversion to full column.
auto this_full = convertToFullColumnIfSparse(); auto this_full = convertToFullColumnIfSparse();

View File

@ -626,7 +626,7 @@ class IColumn;
M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \ M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \
M(Bool, engine_file_skip_empty_files, false, "Allows to skip empty files in file table engine", 0) \ M(Bool, engine_file_skip_empty_files, false, "Allows to skip empty files in file table engine", 0) \
M(Bool, engine_url_skip_empty_files, false, "Allows to skip empty files in url table engine", 0) \ M(Bool, engine_url_skip_empty_files, false, "Allows to skip empty files in url table engine", 0) \
M(Bool, disable_url_encoding, false, " Allows to disable decoding/encoding path in uri in URL table engine", 0) \ M(Bool, enable_url_encoding, true, " Allows to enable/disable decoding/encoding path in uri in URL table engine", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \ M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \ M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(Bool, database_replicated_enforce_synchronous_settings, false, "Enforces synchronous waiting for some queries (see also database_atomic_wait_for_drop_and_detach_synchronously, mutation_sync, alter_sync). Not recommended to enable these settings.", 0) \ M(Bool, database_replicated_enforce_synchronous_settings, false, "Enforces synchronous waiting for some queries (see also database_atomic_wait_for_drop_and_detach_synchronously, mutation_sync, alter_sync). Not recommended to enable these settings.", 0) \
@ -1011,6 +1011,10 @@ class IColumn;
\ \
M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \ M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
\ \
M(Bool, format_capn_proto_use_autogenerated_schema, true, "Use autogenerated CapnProto schema when format_schema is not set", 0) \
M(Bool, format_protobuf_use_autogenerated_schema, true, "Use autogenerated Protobuf when format_schema is not set", 0) \
M(String, output_format_schema, "", "The path to the file where the automatically generated schema will be saved", 0) \
\
M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \ M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \
M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \ M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \
\ \

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
} }
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(TemporaryFileOnDiskHolder && tmp_file_) WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(TemporaryFileOnDiskHolder && tmp_file_)
: WriteBufferFromFile(tmp_file_->getPath(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600) : WriteBufferFromFile(tmp_file_->getAbsolutePath(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600)
, tmp_file(std::move(tmp_file_)) , tmp_file(std::move(tmp_file_))
{ {
} }

View File

@ -54,7 +54,7 @@ TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & p
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file name is empty"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file name is empty");
} }
String TemporaryFileOnDisk::getPath() const String TemporaryFileOnDisk::getAbsolutePath() const
{ {
return std::filesystem::path(disk->getPath()) / relative_path; return std::filesystem::path(disk->getPath()) / relative_path;
} }

View File

@ -22,7 +22,10 @@ public:
~TemporaryFileOnDisk(); ~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; } DiskPtr getDisk() const { return disk; }
String getPath() const; /// Return absolute path (disk + relative_path)
String getAbsolutePath() const;
/// Return relative path (without disk)
const String & getRelativePath() const { return relative_path; }
private: private:
DiskPtr disk; DiskPtr disk;

View File

@ -143,12 +143,14 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.protobuf.input_flatten_google_wrappers = settings.input_format_protobuf_flatten_google_wrappers; format_settings.protobuf.input_flatten_google_wrappers = settings.input_format_protobuf_flatten_google_wrappers;
format_settings.protobuf.output_nullables_with_google_wrappers = settings.output_format_protobuf_nullables_with_google_wrappers; format_settings.protobuf.output_nullables_with_google_wrappers = settings.output_format_protobuf_nullables_with_google_wrappers;
format_settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference; format_settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference;
format_settings.protobuf.use_autogenerated_schema = settings.format_protobuf_use_autogenerated_schema;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule; format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.regexp = settings.format_regexp; format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched; format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
format_settings.schema.format_schema = settings.format_schema; format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context->getFormatSchemaPath(); format_settings.schema.format_schema_path = context->getFormatSchemaPath();
format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER); format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER);
format_settings.schema.output_format_schema = settings.output_format_schema;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields; format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.template_settings.resultset_format = settings.format_template_resultset; format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter; format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
@ -190,6 +192,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields; format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode; format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference; format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;
format_settings.capn_proto.use_autogenerated_schema = settings.format_capn_proto_use_autogenerated_schema;
format_settings.seekable_read = settings.input_format_allow_seeks; format_settings.seekable_read = settings.input_format_allow_seeks;
format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns; format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns;
format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation; format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation;

View File

@ -1,6 +1,8 @@
#include <Formats/FormatSchemaInfo.h> #include <Formats/FormatSchemaInfo.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/filesystemHelpers.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <filesystem> #include <filesystem>
@ -105,4 +107,84 @@ FormatSchemaInfo::FormatSchemaInfo(const FormatSettings & settings, const String
{ {
} }
/// Selects the schema that will be used for `format`.
///
/// - If autogeneration is disabled or `format_schema` is set, the user-provided schema is used as is.
/// - Otherwise a schema is generated from `header` by SchemaGenerator::writeSchema and written either
///   to the file named by the `output_format_schema` setting or, when that setting is empty, to a
///   temporary file that is removed in the destructor.
///
/// Throws BAD_ARGUMENTS on the server when `output_format_schema` is an absolute path or points
/// outside of the `format_schema_path` directory.
template <typename SchemaGenerator>
MaybeAutogeneratedFormatSchemaInfo<SchemaGenerator>::MaybeAutogeneratedFormatSchemaInfo(
    const FormatSettings & settings, const String & format, const Block & header, bool use_autogenerated_schema)
{
    if (!use_autogenerated_schema || !settings.schema.format_schema.empty())
    {
        schema_info = std::make_unique<FormatSchemaInfo>(settings, format, true);
        return;
    }

    String schema_path;
    fs::path default_schema_directory_path(fs::canonical(settings.schema.format_schema_path) / "");
    fs::path path;
    if (!settings.schema.output_format_schema.empty())
    {
        schema_path = settings.schema.output_format_schema;
        path = schema_path;
        if (path.is_absolute())
        {
            if (settings.schema.is_server)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Absolute path in the 'output_format_schema' setting is prohibited: {}", path.string());
        }
        else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string()))
        {
            /// The message names the setting that was actually checked and shows the offending path,
            /// the location it resolves to and the directory it must stay in.
            if (settings.schema.is_server)
                throw Exception(
                    ErrorCodes::BAD_ARGUMENTS,
                    "Path in the 'output_format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
                    path.string(),
                    fs::weakly_canonical(default_schema_directory_path / path).string(),
                    default_schema_directory_path.string());
            path = default_schema_directory_path / path;
        }
        else
        {
            path = default_schema_directory_path / path;
        }
    }
    else
    {
        if (settings.schema.is_server)
        {
            /// On the server the temporary file lives inside the schema directory and is referenced by its file name.
            tmp_file_path = PocoTemporaryFile::tempName(default_schema_directory_path.string()) + '.' + getFormatSchemaDefaultFileExtension(format);
            schema_path = fs::path(tmp_file_path).filename();
        }
        else
        {
            tmp_file_path = PocoTemporaryFile::tempName() + '.' + getFormatSchemaDefaultFileExtension(format);
            schema_path = tmp_file_path;
        }
        path = tmp_file_path;
    }

    try
    {
        WriteBufferFromFile buf(path.string());
        SchemaGenerator::writeSchema(buf, "Message", header.getNamesAndTypesList());
        buf.finalize();

        schema_info = std::make_unique<FormatSchemaInfo>(schema_path + ":Message", format, true, settings.schema.is_server, settings.schema.format_schema_path);
    }
    catch (...)
    {
        /// The destructor is not called for a partially constructed object,
        /// so the temporary file has to be removed here to avoid leaking it.
        if (!tmp_file_path.empty())
        {
            std::error_code ignored_error;
            fs::remove(tmp_file_path, ignored_error);
        }
        throw;
    }
}
/// Deletes the temporary schema file if the constructor generated one.
/// Never throws: a failed removal is only logged.
template <typename SchemaGenerator>
MaybeAutogeneratedFormatSchemaInfo<SchemaGenerator>::~MaybeAutogeneratedFormatSchemaInfo()
{
    /// An empty path means no temporary file was created, so there is nothing to clean up.
    if (tmp_file_path.empty())
        return;

    try
    {
        fs::remove(tmp_file_path);
    }
    catch (...)
    {
        tryLogCurrentException("MaybeAutogeneratedFormatSchemaInfo", "Cannot delete temporary schema file");
    }
}
/// Explicit instantiations for the two schema generators the template is used with.
/// They are required because the member functions above are defined in this translation unit.
template class MaybeAutogeneratedFormatSchemaInfo<StructureToCapnProtoSchema>;
template class MaybeAutogeneratedFormatSchemaInfo<StructureToProtobufSchema>;
} }

View File

@ -2,6 +2,8 @@
#include <base/types.h> #include <base/types.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <Formats/StructureToCapnProtoSchema.h>
#include <Formats/StructureToProtobufSchema.h>
namespace DB namespace DB
{ {
@ -30,4 +32,23 @@ private:
String message_name; String message_name;
}; };
/// Holds the FormatSchemaInfo to use for a format: either the schema provided by the user
/// or one generated from the block header by SchemaGenerator::writeSchema.
template <typename SchemaGenerator>
class MaybeAutogeneratedFormatSchemaInfo
{
public:
/// Uses the user-provided schema when `use_autogenerated_schema` is false or `format_schema` is set,
/// otherwise writes a schema generated from `header` to a file and refers to it.
MaybeAutogeneratedFormatSchemaInfo(const FormatSettings & settings, const String & format, const Block & header, bool use_autogenerated_schema);
/// Removes the temporary schema file, if one was created.
~MaybeAutogeneratedFormatSchemaInfo();
/// Returns the schema description selected in the constructor.
const FormatSchemaInfo & getSchemaInfo() const { return *schema_info; }
private:
std::unique_ptr<FormatSchemaInfo> schema_info;
/// Path of the generated temporary schema file; stays empty when no temporary file was created.
String tmp_file_path;
};
using CapnProtoSchemaInfo = MaybeAutogeneratedFormatSchemaInfo<StructureToCapnProtoSchema>;
using ProtobufSchemaInfo = MaybeAutogeneratedFormatSchemaInfo<StructureToProtobufSchema>;
} }

View File

@ -276,6 +276,7 @@ struct FormatSettings
*/ */
bool allow_multiple_rows_without_delimiter = false; bool allow_multiple_rows_without_delimiter = false;
bool skip_fields_with_unsupported_types_in_schema_inference = false; bool skip_fields_with_unsupported_types_in_schema_inference = false;
bool use_autogenerated_schema = true;
} protobuf; } protobuf;
struct struct
@ -297,6 +298,7 @@ struct FormatSettings
std::string format_schema; std::string format_schema;
std::string format_schema_path; std::string format_schema_path;
bool is_server = false; bool is_server = false;
std::string output_format_schema;
} schema; } schema;
struct struct
@ -359,6 +361,7 @@ struct FormatSettings
{ {
CapnProtoEnumComparingMode enum_comparing_mode = CapnProtoEnumComparingMode::BY_VALUES; CapnProtoEnumComparingMode enum_comparing_mode = CapnProtoEnumComparingMode::BY_VALUES;
bool skip_fields_with_unsupported_types_in_schema_inference = false; bool skip_fields_with_unsupported_types_in_schema_inference = false;
bool use_autogenerated_schema = true;
} capn_proto; } capn_proto;
enum class MsgPackUUIDRepresentation enum class MsgPackUUIDRepresentation

View File

@ -3029,7 +3029,7 @@ namespace
if (!message_serializer) if (!message_serializer)
{ {
throw Exception(ErrorCodes::NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS, throw Exception(ErrorCodes::NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS,
"Not found matches between the names of the columns {{}} and the fields {{}} of the message {} in the protobuf schema", "Not found matches between the names of the columns ({}) and the fields ({}) of the message {} in the protobuf schema",
boost::algorithm::join(column_names, ", "), boost::algorithm::join(getFieldNames(message_descriptor), ", "), boost::algorithm::join(column_names, ", "), boost::algorithm::join(getFieldNames(message_descriptor), ", "),
quoteString(message_descriptor.full_name())); quoteString(message_descriptor.full_name()));
} }
@ -3647,7 +3647,7 @@ namespace
if (!message_serializer) if (!message_serializer)
{ {
throw Exception(ErrorCodes::NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS, throw Exception(ErrorCodes::NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS,
"Not found matches between the names of the tuple's elements {{}} and the fields {{}} " "Not found matches between the names of the tuple's elements ({}) and the fields ({}) "
"of the message {} in the protobuf schema", "of the message {} in the protobuf schema",
boost::algorithm::join(tuple_data_type.getElementNames(), ", "), boost::algorithm::join(tuple_data_type.getElementNames(), ", "),
boost::algorithm::join(getFieldNames(*field_descriptor.message_type()), ", "), boost::algorithm::join(getFieldNames(*field_descriptor.message_type()), ", "),

View File

@ -0,0 +1,236 @@
#include <Formats/StructureToCapnProtoSchema.h>
#include <Formats/StructureToFormatSchemaUtils.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeEnum.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
namespace DB
{
using namespace StructureToFormatSchemaUtils;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
/// Maps type indexes of simple (non-composite) ClickHouse types to the CapnProto type name
/// written for them into the generated schema.
/// Types that have no CapnProto type of the same width in this table (128/256-bit integers,
/// Decimal128/256, strings, UUID, IPv6) are mapped to "Data".
const std::unordered_map<TypeIndex, String> capn_proto_simple_type_names =
{
{TypeIndex::Int8, "Int8"},
{TypeIndex::UInt8, "UInt8"},
{TypeIndex::Int16, "Int16"},
{TypeIndex::UInt16, "UInt16"},
{TypeIndex::Int32, "Int32"},
{TypeIndex::UInt32, "UInt32"},
{TypeIndex::Int64, "Int64"},
{TypeIndex::UInt64, "UInt64"},
{TypeIndex::Int128, "Data"},
{TypeIndex::UInt128, "Data"},
{TypeIndex::Int256, "Data"},
{TypeIndex::UInt256, "Data"},
{TypeIndex::Float32, "Float32"},
{TypeIndex::Float64, "Float64"},
{TypeIndex::Decimal32, "Int32"},
{TypeIndex::Decimal64, "Int64"},
{TypeIndex::Decimal128, "Data"},
{TypeIndex::Decimal256, "Data"},
{TypeIndex::String, "Data"},
{TypeIndex::FixedString, "Data"},
{TypeIndex::UUID, "Data"},
{TypeIndex::Date, "UInt16"},
{TypeIndex::Date32, "Int32"},
{TypeIndex::DateTime, "UInt32"},
{TypeIndex::DateTime64, "Int64"},
{TypeIndex::IPv4, "UInt32"},
{TypeIndex::IPv6, "Data"},
};
/// Writes the schema file ID line ("@0x<hex>;") followed by an empty line.
/// The ID is a random 64-bit number with the most significant bit forced to 1.
void writeCapnProtoHeader(WriteBuffer & buf)
{
    pcg64 generator(randomSeed());
    /// First bit should be 1
    const size_t file_id = generator() | (1ull << 63);
    const auto header_line = fmt::format("@0x{};\n\n", getHexUIntLowercase(file_id));
    writeString(header_line, buf);
}
/// Writes one CapnProto field line "<name> @<ordinal> : <type>;" at the given indentation
/// and advances `field_index` to the next ordinal.
void writeFieldDefinition(WriteBuffer & buf, const String & type_name, const String & column_name, size_t & field_index, size_t indent)
{
    writeIndent(buf, indent);
    const size_t ordinal = field_index++;
    const auto field_line = fmt::format("{} @{} : {};\n", getSchemaFieldName(column_name), ordinal, type_name);
    writeString(field_line, buf);
}
/// Opens an "enum <enum_name>" block at the given indentation.
void startEnum(WriteBuffer & buf, const String & enum_name, size_t indent)
{
    static constexpr auto keyword = "enum";
    startNested(buf, enum_name, keyword, indent);
}
/// Opens an unnamed "union" block at the given indentation.
void startUnion(WriteBuffer & buf, size_t indent)
{
    static constexpr auto keyword = "union";
    startNested(buf, "", keyword, indent);
}
/// Opens a "struct <struct_name>" block at the given indentation.
void startStruct(WriteBuffer & buf, const String & struct_name, size_t indent)
{
    static constexpr auto keyword = "struct";
    startNested(buf, struct_name, keyword, indent);
}
String prepareAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent);
/// Writes everything needed for one column: first the definitions of any nested types
/// the column requires, then the field line that refers to the resulting type name.
void writeField(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t & field_index, size_t indent)
{
    const String type_name = prepareAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
    writeFieldDefinition(buf, type_name, column_name, field_index, indent);
}
/// Array(T) is represented as "List(<type of T>)"; definitions required by T are written to `buf` first.
String prepareArrayAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const auto & array_type = assert_cast<const DataTypeArray &>(*data_type);
    const String element_type_name = prepareAndGetCapnProtoTypeName(buf, array_type.getNestedType(), column_name, indent);
    return "List(" + element_type_name + ")";
}
/// Writes the wrapper struct used for a Nullable column and returns its name.
String prepareNullableAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    /// Nullable is represented as a struct with union with 2 fields:
    ///
    /// struct Nullable
    /// {
    ///     union
    ///     {
    ///         value @0 : Value;
    ///         null @1 : Void;
    ///     }
    /// }
    const String wrapper_name = getSchemaMessageName(column_name);
    startStruct(buf, wrapper_name, indent);

    /// Definitions required by the wrapped type are emitted inside the wrapper struct, before the union.
    const auto & wrapped_type = assert_cast<const DataTypeNullable &>(*data_type).getNestedType();
    const String value_type_name = prepareAndGetCapnProtoTypeName(buf, wrapped_type, column_name, indent);

    const size_t union_indent = indent + 1;
    const size_t member_indent = indent + 2;
    startUnion(buf, union_indent);
    size_t member_index = 0;
    writeFieldDefinition(buf, value_type_name, "value", member_index, member_indent);
    writeFieldDefinition(buf, "Void", "null", member_index, member_indent);
    endNested(buf, union_indent);

    endNested(buf, indent);
    return wrapper_name;
}
/// Writes a struct with one field per tuple element and returns the struct name.
String prepareTupleAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const auto & tuple = assert_cast<const DataTypeTuple &>(*data_type);
    const auto elements = getCollectedTupleElements(tuple);

    const String struct_name = getSchemaMessageName(column_name);
    startStruct(buf, struct_name, indent);

    /// Field ordinals inside the struct start from 0.
    size_t element_index = 0;
    for (const auto & element : elements)
        writeField(buf, element.type, element.name, element_index, indent + 1);

    endNested(buf, indent);
    return struct_name;
}
/// Writes the struct used for a Map column and returns its name.
String prepareMapAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    /// We output/input Map type as follow CapnProto schema
    ///
    /// struct Map
    /// {
    ///     struct Entry
    ///     {
    ///         key @0: Key;
    ///         value @1: Value;
    ///     }
    ///     entries @0 :List(Entry);
    /// }
    const auto & map = assert_cast<const DataTypeMap &>(*data_type);
    const auto & map_key_type = map.getKeyType();
    const auto & map_value_type = map.getValueType();

    const size_t entry_indent = indent + 1;
    const size_t entry_member_indent = indent + 2;

    const String struct_name = getSchemaMessageName(column_name);
    startStruct(buf, struct_name, indent);

    /// Inner "Entry" struct: nested definitions for key and value come first, then the two fields.
    startStruct(buf, "Entry", entry_indent);
    const String key_type_name = prepareAndGetCapnProtoTypeName(buf, map_key_type, "key", entry_member_indent);
    const String value_type_name = prepareAndGetCapnProtoTypeName(buf, map_value_type, "value", entry_member_indent);
    size_t entry_field_index = 0;
    writeFieldDefinition(buf, key_type_name, "key", entry_field_index, entry_member_indent);
    writeFieldDefinition(buf, value_type_name, "value", entry_field_index, entry_member_indent);
    endNested(buf, entry_indent);

    /// The only field of the outer struct is the list of entries.
    size_t entries_field_index = 0;
    writeFieldDefinition(buf, "List(Entry)", "entries", entries_field_index, entry_indent);

    endNested(buf, indent);
    return struct_name;
}
/// Writes an enum definition listing all registered names of the Enum type and returns the enum name.
/// Ordinals are the positions of the names in getAllRegisteredNames(), starting from 0.
template <typename EnumType>
String prepareEnumAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const auto & enum_data_type = assert_cast<const DataTypeEnum<EnumType> &>(*data_type);
    const String enum_name = getSchemaMessageName(column_name);
    startEnum(buf, enum_name, indent);

    const auto & value_names = enum_data_type.getAllRegisteredNames();
    size_t ordinal = 0;
    for (const auto & value_name : value_names)
    {
        writeIndent(buf, indent + 1);
        writeString(fmt::format("{} @{};\n", value_name, ordinal), buf);
        ++ordinal;
    }

    endNested(buf, indent);
    return enum_name;
}
/// Returns the CapnProto type name to use for `data_type`.
/// For composite types the required nested definitions are written to `buf` first.
/// Throws BAD_ARGUMENTS when the type has no CapnProto representation.
String prepareAndGetCapnProtoTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const TypeIndex type_id = data_type->getTypeId();

    /// Composite and wrapper types are delegated to dedicated helpers.
    switch (type_id)
    {
        case TypeIndex::Nullable:
            return prepareNullableAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
        case TypeIndex::LowCardinality:
            return prepareAndGetCapnProtoTypeName(buf, assert_cast<const DataTypeLowCardinality &>(*data_type).getDictionaryType(), column_name, indent);
        case TypeIndex::Array:
            return prepareArrayAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
        case TypeIndex::Tuple:
            return prepareTupleAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
        case TypeIndex::Map:
            return prepareMapAndGetCapnProtoTypeName(buf, data_type, column_name, indent);
        case TypeIndex::Enum8:
            return prepareEnumAndGetCapnProtoTypeName<Int8>(buf, data_type, column_name, indent);
        case TypeIndex::Enum16:
            return prepareEnumAndGetCapnProtoTypeName<Int16>(buf, data_type, column_name, indent);
        default:
            break;
    }

    /// Bool is checked before the table lookup so that it is not reported as a plain integer.
    if (isBool(data_type))
        return "Bool";

    const auto simple_type = capn_proto_simple_type_names.find(type_id);
    if (simple_type != capn_proto_simple_type_names.end())
        return simple_type->second;

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "CapnProto type name is not found for type {}", data_type->getName());
}
}
/// Writes a complete CapnProto schema to `buf`: the file ID header followed by one top-level
/// struct named after `message_name` with a field for every (collected) column.
void StructureToCapnProtoSchema::writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_)
{
    const auto columns = collectNested(names_and_types_);

    writeCapnProtoHeader(buf);
    startStruct(buf, getSchemaMessageName(message_name), 0);

    /// CapnProto field ordinals start from 0.
    size_t next_field_index = 0;
    for (const auto & column : columns)
        writeField(buf, column.type, column.name, next_field_index, 1);

    endNested(buf, 0);
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
/// Generates a CapnProto schema from a list of column names and types.
struct StructureToCapnProtoSchema
{
static constexpr auto name = "structureToCapnProtoSchema";
/// Writes the schema text to `buf`; the top-level struct is named after `message_name`.
static void writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_);
};
}

View File

@ -0,0 +1,117 @@
#include <Formats/StructureToFormatSchemaUtils.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace StructureToFormatSchemaUtils
{
/// Writes `indent` levels of indentation, 4 spaces per level.
void writeIndent(WriteBuffer & buf, size_t indent)
{
    constexpr size_t spaces_per_level = 4;
    writeChar(' ', indent * spaces_per_level, buf);
}
/// Opens a nested definition:
///
///     <nested_type> <nested_name>
///     {
///
/// The name (and the space before it) is omitted when `nested_name` is empty.
void startNested(WriteBuffer & buf, const String & nested_name, const String & nested_type, size_t indent)
{
    /// Header line: keyword and optional name.
    writeIndent(buf, indent);
    writeString(nested_type, buf);
    const bool has_name = !nested_name.empty();
    if (has_name)
    {
        writeChar(' ', buf);
        writeString(nested_name, buf);
    }
    writeChar('\n', buf);

    /// Opening brace on its own line.
    writeIndent(buf, indent);
    writeCString("{\n", buf);
}
/// Closes a nested definition opened by startNested: writes "}" on its own line at the given indentation.
void endNested(WriteBuffer & buf, size_t indent)
{
    writeIndent(buf, indent);
    writeCString("}\n", buf);
}
/// Returns `column_name` converted to a schema field name: every character up to
/// (not including) the first lower-case letter is converted to lower case.
String getSchemaFieldName(const String & column_name)
{
    String result = column_name;
    /// Replace all first uppercase letters to lower-case,
    /// because fields in CapnProto schema must begin with a lower-case letter.
    /// Don't replace all letters to lower-case to remain camelCase field names.
    for (auto & symbol : result)
    {
        /// <cctype> functions require a value representable as unsigned char;
        /// passing a negative char (e.g. a UTF-8 byte) is undefined behaviour.
        const auto byte = static_cast<unsigned char>(symbol);
        if (islower(byte))
            break;
        symbol = static_cast<char>(tolower(byte));
    }
    return result;
}
/// Returns `column_name` with its first character converted to upper case when that character is a letter;
/// otherwise the name is returned unchanged.
String getSchemaMessageName(const String & column_name)
{
    String result = column_name;
    if (!result.empty())
    {
        /// <cctype> functions require a value representable as unsigned char;
        /// passing a negative char (e.g. a UTF-8 byte) is undefined behaviour.
        const auto first = static_cast<unsigned char>(result[0]);
        if (isalpha(first))
            result[0] = static_cast<char>(toupper(first));
    }
    return result;
}
namespace
{
std::pair<String, String> splitName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {std::move(first), std::move(second)};
}
}
/// Groups columns that share a prefix before the first '.' or '_' into a single named Tuple column.
/// Columns without such a separator are kept as is and come first in the result;
/// the grouped tuples follow in the iteration order of the intermediate hash map.
NamesAndTypesList collectNested(const NamesAndTypesList & names_and_types)
{
    /// Find all columns with dots '.' or underscores '_' and move them into a tuple.
    /// For example if we have columns 'a.b UInt32, a.c UInt32, x_y String' we will
    /// change it to 'a Tuple(b UInt32, c UInt32), x Tuple(y String)'
    NamesAndTypesList collected;
    std::unordered_map<String, NamesAndTypesList> grouped_by_prefix;

    for (const auto & column : names_and_types)
    {
        auto [prefix, suffix] = splitName(column.name);
        if (!suffix.empty())
            grouped_by_prefix[prefix].emplace_back(suffix, column.type);
        else
            collected.emplace_back(column.name, column.type);
    }

    for (const auto & group : grouped_by_prefix)
    {
        const auto & tuple_elements = group.second;
        collected.emplace_back(group.first, std::make_shared<DataTypeTuple>(tuple_elements.getTypes(), tuple_elements.getNames()));
    }

    return collected;
}
/// Returns the elements of a Tuple as a names-and-types list passed through collectNested.
/// Tuples without explicit element names get generated names "e1", "e2", ...
NamesAndTypesList getCollectedTupleElements(const DataTypeTuple & tuple_type)
{
    const auto & element_types = tuple_type.getElements();

    Names element_names;
    if (!tuple_type.haveExplicitNames())
    {
        element_names.reserve(element_types.size());
        for (size_t position = 1; position <= element_types.size(); ++position)
            element_names.push_back("e" + std::to_string(position));
    }
    else
    {
        element_names = tuple_type.getElementNames();
    }

    NamesAndTypesList elements;
    for (size_t i = 0; i < element_names.size(); ++i)
        elements.emplace_back(element_names[i], element_types[i]);

    return collectNested(elements);
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeTuple.h>
namespace DB
{
/// Helpers shared by the generators that convert a table structure into a format schema.
namespace StructureToFormatSchemaUtils
{
/// Writes `indent` levels of indentation (4 spaces per level).
void writeIndent(WriteBuffer & buf, size_t indent);
/// Writes "<nested_type> <nested_name>" followed by a line with "{"; the name is skipped when empty.
void startNested(WriteBuffer & buf, const String & nested_name, const String & nested_type, size_t indent);
/// Writes the closing "}" line of a nested definition.
void endNested(WriteBuffer & buf, size_t indent);
/// Returns the column name with its leading characters lower-cased up to the first lower-case letter.
String getSchemaFieldName(const String & column_name);
/// Returns the column name with its first letter upper-cased.
String getSchemaMessageName(const String & column_name);
/// Groups columns whose names contain '.' or '_' into Tuple columns named by the common prefix.
NamesAndTypesList collectNested(const NamesAndTypesList & names_and_types);
/// Returns tuple elements as a names-and-types list (with generated names if needed), passed through collectNested.
NamesAndTypesList getCollectedTupleElements(const DataTypeTuple & tuple_type);
}
}

View File

@ -0,0 +1,214 @@
#include <Formats/StructureToProtobufSchema.h>
#include <Formats/StructureToFormatSchemaUtils.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeEnum.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
using namespace StructureToFormatSchemaUtils;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
/// Maps type indexes of simple (non-composite) ClickHouse types to the Protobuf scalar type name
/// written for them into the generated schema.
/// Integers narrower than 32 bits are widened to int32/uint32; 128/256-bit integers,
/// all Decimals, strings, UUID and IPv6 are mapped to "bytes".
const std::unordered_map<TypeIndex, String> protobuf_simple_type_names =
{
{TypeIndex::Int8, "int32"},
{TypeIndex::UInt8, "uint32"},
{TypeIndex::Int16, "int32"},
{TypeIndex::UInt16, "uint32"},
{TypeIndex::Int32, "int32"},
{TypeIndex::UInt32, "uint32"},
{TypeIndex::Int64, "int64"},
{TypeIndex::UInt64, "uint64"},
{TypeIndex::Int128, "bytes"},
{TypeIndex::UInt128, "bytes"},
{TypeIndex::Int256, "bytes"},
{TypeIndex::UInt256, "bytes"},
{TypeIndex::Float32, "float"},
{TypeIndex::Float64, "double"},
{TypeIndex::Decimal32, "bytes"},
{TypeIndex::Decimal64, "bytes"},
{TypeIndex::Decimal128, "bytes"},
{TypeIndex::Decimal256, "bytes"},
{TypeIndex::String, "bytes"},
{TypeIndex::FixedString, "bytes"},
{TypeIndex::UUID, "bytes"},
{TypeIndex::Date, "uint32"},
{TypeIndex::Date32, "int32"},
{TypeIndex::DateTime, "uint32"},
{TypeIndex::DateTime64, "uint64"},
{TypeIndex::IPv4, "uint32"},
{TypeIndex::IPv6, "bytes"},
};
/// Writes the syntax declaration line of the schema (proto3) followed by an empty line.
void writeProtobufHeader(WriteBuffer & buf)
{
    static constexpr auto syntax_declaration = "syntax = \"proto3\";\n\n";
    writeCString(syntax_declaration, buf);
}
/// Opens an "enum <enum_name>" block at the given indentation.
void startEnum(WriteBuffer & buf, const String & enum_name, size_t indent)
{
    static constexpr auto keyword = "enum";
    startNested(buf, enum_name, keyword, indent);
}
/// Opens a "message <message_name>" block at the given indentation.
void startMessage(WriteBuffer & buf, const String & message_name, size_t indent)
{
    static constexpr auto keyword = "message";
    startNested(buf, message_name, keyword, indent);
}
/// Writes one Protobuf field line "<type> <name> = <number>;" at the given indentation
/// and advances `field_index` to the next field number.
void writeFieldDefinition(WriteBuffer & buf, const String & type_name, const String & column_name, size_t & field_index, size_t indent)
{
    writeIndent(buf, indent);
    const size_t field_number = field_index++;
    const auto field_line = fmt::format("{} {} = {};\n", type_name, getSchemaFieldName(column_name), field_number);
    writeString(field_line, buf);
}
String prepareAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent);
/// Writes everything needed for one column: first the definitions of any nested messages/enums
/// the column requires, then the field line that refers to the resulting type name.
void writeProtobufField(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t & field_index, size_t indent)
{
    const String type_name = prepareAndGetProtobufTypeName(buf, data_type, column_name, indent);
    writeFieldDefinition(buf, type_name, column_name, field_index, indent);
}
/// Returns the "repeated ..." type used for an Array column, writing any required nested definitions first.
String prepareArrayAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const auto & element_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();

    /// Protobuf doesn't support multidimensional repeated fields and repeated maps.
    /// When we have Array(Array(...)) or Array(Map(...)) we should place nested type into a nested Message with one field.
    const bool needs_wrapper_message = isArray(element_type) || isMap(element_type);
    if (needs_wrapper_message)
    {
        const String wrapper_name = getSchemaMessageName(column_name);
        startMessage(buf, wrapper_name, indent);
        size_t wrapper_field_index = 1;
        writeProtobufField(buf, element_type, column_name, wrapper_field_index, indent + 1);
        endNested(buf, indent);
        return "repeated " + wrapper_name;
    }

    /// Simple case when we can just use 'repeated <nested_type>'.
    const String element_type_name = prepareAndGetProtobufTypeName(buf, element_type, column_name, indent);
    return "repeated " + element_type_name;
}
/// Writes a message with one field per tuple element and returns the message name.
String prepareTupleAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const auto & tuple = assert_cast<const DataTypeTuple &>(*data_type);
    const auto elements = getCollectedTupleElements(tuple);

    const String message_name = getSchemaMessageName(column_name);
    startMessage(buf, message_name, indent);

    /// Field numbers inside the message start from 1.
    size_t element_field_index = 1;
    for (const auto & element : elements)
        writeProtobufField(buf, element.type, element.name, element_field_index, indent + 1);

    endNested(buf, indent);
    return message_name;
}
/// Returns the "map<Key, Value>" type used for a Map column, writing any definitions required by the value type first.
/// Throws BAD_ARGUMENTS when the key type cannot be used as a Protobuf map key.
String prepareMapAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const auto & map_type = assert_cast<const DataTypeMap &>(*data_type);
    const auto & key_type = map_type.getKeyType();
    const auto & value_type = map_type.getValueType();

    /// The error reports the key type itself, not the whole Map type.
    auto it = protobuf_simple_type_names.find(key_type->getTypeId());
    if (it == protobuf_simple_type_names.end())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type {} is not supported for conversion into Map key in Protobuf schema", key_type->getName());

    auto key_type_name = it->second;
    /// Protobuf map type doesn't support "bytes" type as a key. Change it to "string"
    if (key_type_name == "bytes")
        key_type_name = "string";
    /// Floating point types are not allowed as Protobuf map keys either, and they have no suitable replacement.
    else if (key_type_name == "float" || key_type_name == "double")
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type {} is not supported for conversion into Map key in Protobuf schema", key_type->getName());

    /// Special cases when value type is Array or Map, because Protobuf
    /// doesn't support syntax "map<Key, repeated Value>" and "map<Key, map<..., ...>>"
    /// In this case we should place it into a nested Message with one field.
    String value_type_name;
    if (isArray(value_type) || isMap(value_type))
    {
        value_type_name = getSchemaMessageName(column_name) + "Value";
        startMessage(buf, value_type_name, indent);
        size_t nested_field_index = 1;
        writeProtobufField(buf, value_type, column_name + "Value", nested_field_index, indent + 1);
        endNested(buf, indent);
    }
    else
    {
        value_type_name = prepareAndGetProtobufTypeName(buf, value_type, column_name + "Value", indent);
    }

    return fmt::format("map<{}, {}>", key_type_name, value_type_name);
}
/// Writes an enum definition listing all registered names of the Enum type and returns the enum name.
/// Numbers are the positions of the names in getAllRegisteredNames(), starting from 0.
template <typename EnumType>
String prepareEnumAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const auto & enum_data_type = assert_cast<const DataTypeEnum<EnumType> &>(*data_type);
    const String enum_name = getSchemaMessageName(column_name);
    startEnum(buf, enum_name, indent);

    const auto & value_names = enum_data_type.getAllRegisteredNames();
    size_t number = 0;
    for (const auto & value_name : value_names)
    {
        writeIndent(buf, indent + 1);
        writeString(fmt::format("{} = {};\n", value_name, number), buf);
        ++number;
    }

    endNested(buf, indent);
    return enum_name;
}
/// Returns the Protobuf type name for a ClickHouse data type, writing any auxiliary
/// definitions the type needs into buf first.
/// Nullable and LowCardinality are unwrapped and handled as their inner type;
/// Array, Tuple, Map and Enum8/Enum16 are delegated to the dedicated helpers;
/// everything else is resolved as Bool or through protobuf_simple_type_names.
/// Throws Exception(BAD_ARGUMENTS) when no mapping is found.
String prepareAndGetProtobufTypeName(WriteBuffer & buf, const DataTypePtr & data_type, const String & column_name, size_t indent)
{
    const TypeIndex type_id = data_type->getTypeId();
    switch (type_id)
    {
        case TypeIndex::Nullable:
        {
            const auto & nested_type = assert_cast<const DataTypeNullable &>(*data_type).getNestedType();
            return prepareAndGetProtobufTypeName(buf, nested_type, column_name, indent);
        }
        case TypeIndex::LowCardinality:
        {
            const auto & dictionary_type = assert_cast<const DataTypeLowCardinality &>(*data_type).getDictionaryType();
            return prepareAndGetProtobufTypeName(buf, dictionary_type, column_name, indent);
        }
        case TypeIndex::Array:
            return prepareArrayAndGetProtobufTypeName(buf, data_type, column_name, indent);
        case TypeIndex::Tuple:
            return prepareTupleAndGetProtobufTypeName(buf, data_type, column_name, indent);
        case TypeIndex::Map:
            return prepareMapAndGetProtobufTypeName(buf, data_type, column_name, indent);
        case TypeIndex::Enum8:
            return prepareEnumAndGetProtobufTypeName<Int8>(buf, data_type, column_name, indent);
        case TypeIndex::Enum16:
            return prepareEnumAndGetProtobufTypeName<Int16>(buf, data_type, column_name, indent);
        default:
            break;
    }

    /// Not a composite type: Bool is checked first, then the simple type table.
    if (isBool(data_type))
        return "bool";

    const auto simple_type = protobuf_simple_type_names.find(type_id);
    if (simple_type != protobuf_simple_type_names.end())
        return simple_type->second;

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type {} is not supported for conversion into Protobuf schema", data_type->getName());
}
}
/// Writes a complete Protobuf schema into buf: the header, then a single top-level
/// message named after message_name that contains one field per column produced by
/// collectNested(names_and_types_). Field numbering starts at 1.
void StructureToProtobufSchema::writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_)
{
    constexpr size_t message_indent = 0;
    constexpr size_t field_indent = message_indent + 1;

    auto collected_columns = collectNested(names_and_types_);

    writeProtobufHeader(buf);
    startMessage(buf, getSchemaMessageName(message_name), message_indent);

    size_t next_field_index = 1;
    for (const auto & [field_name, field_type] : collected_columns)
        writeProtobufField(buf, field_type, field_name, next_field_index, field_indent);

    endNested(buf, message_indent);
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
/// Stateless holder for converting a list of named column types into a Protobuf schema.
/// It only exposes a name constant and a static writer.
struct StructureToProtobufSchema
{
/// Name constant associated with this converter.
static constexpr auto name = "structureToProtobufSchema";
/// Writes the schema text into buf, using message_name for the top-level message
/// and names_and_types_ as the source of its fields.
static void writeSchema(WriteBuffer & buf, const String & message_name, const NamesAndTypesList & names_and_types_);
};
}

View File

@ -0,0 +1,145 @@
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeEnum.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/Context.h>
#include <IO/WriteBufferFromVector.h>
#include <Formats/StructureToCapnProtoSchema.h>
#include <Formats/StructureToProtobufSchema.h>
#include <Common/randomSeed.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
template <class Impl>
class FunctionStructureToFormatSchema : public IFunction
{
public:
static constexpr auto name = Impl::name;
explicit FunctionStructureToFormatSchema(ContextPtr context_) : context(std::move(context_))
{
}
static FunctionPtr create(ContextPtr ctx)
{
return std::make_shared<FunctionStructureToFormatSchema>(std::move(ctx));
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0, 1}; }
bool useDefaultImplementationForConstants() const override { return false; }
bool useDefaultImplementationForNulls() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.empty() || arguments.size() > 2)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, expected 1 or 2",
getName(), arguments.size());
if (!isString(arguments[0]))
{
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of the first argument of function {}, expected constant string",
arguments[0]->getName(),
getName());
}
if (arguments.size() > 1 && !isString(arguments[1]))
{
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of the second argument of function {}, expected constant string",
arguments[1]->getName(),
getName());
}
return std::make_shared<DataTypeString>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
if (arguments.empty() || arguments.size() > 2)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, expected 1 or 2",
getName(), arguments.size());
String structure = arguments[0].column->getDataAt(0).toString();
String message_name = arguments.size() == 2 ? arguments[1].column->getDataAt(0).toString() : "Message";
auto columns_list = parseColumnsListFromString(structure, context);
auto col_res = ColumnString::create();
auto & data = assert_cast<ColumnString &>(*col_res).getChars();
WriteBufferFromVector buf(data);
Impl::writeSchema(buf, message_name, columns_list.getAll());
buf.finalize();
auto & offsets = assert_cast<ColumnString &>(*col_res).getOffsets();
offsets.push_back(data.size());
return ColumnConst::create(std::move(col_res), input_rows_count);
}
private:
ContextPtr context;
};
/// Registers the CapnProto instantiation of FunctionStructureToFormatSchema in the
/// function factory, together with its user-facing documentation (description,
/// one example query with its expected output, and the category).
REGISTER_FUNCTION(StructureToCapnProtoSchema)
{
factory.registerFunction<FunctionStructureToFormatSchema<StructureToCapnProtoSchema>>(FunctionDocumentation
{
.description=R"(
Function that converts ClickHouse table structure to CapnProto format schema
)",
.examples{
{"random", "SELECT structureToCapnProtoSchema('s String, x UInt32', 'MessageName') format TSVRaw", "struct MessageName\n"
"{\n"
" s @0 : Data;\n"
" x @1 : UInt32;\n"
"}"},
},
.categories{"Other"}
},
/// The function is registered under its exact, case-sensitive name.
FunctionFactory::CaseSensitive);
}
/// Registers the Protobuf instantiation of FunctionStructureToFormatSchema in the
/// function factory, together with its user-facing documentation (description,
/// one example query with its expected output, and the category).
REGISTER_FUNCTION(StructureToProtobufSchema)
{
    factory.registerFunction<FunctionStructureToFormatSchema<StructureToProtobufSchema>>(FunctionDocumentation
        {
            .description=R"(
Function that converts ClickHouse table structure to Protobuf format schema
)",
            .examples{
                /// The example query must call this function (structureToProtobufSchema):
                /// the output shown next to it is a Protobuf schema, not a CapnProto one.
                {"random", "SELECT structureToProtobufSchema('s String, x UInt32', 'MessageName') format TSVRaw", "syntax = \"proto3\";\n"
                    "\n"
                    "message MessageName\n"
                    "{\n"
                    " bytes s = 1;\n"
                    " uint32 x = 2;\n"
                    "}"},
            },
            .categories{"Other"}
        },
        /// The function is registered under its exact, case-sensitive name.
        FunctionFactory::CaseSensitive);
}
}

View File

@ -764,7 +764,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
/// Table function without columns list. /// Table function without columns list.
auto table_function_ast = create.as_table_function->ptr(); auto table_function_ast = create.as_table_function->ptr();
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext()); auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
properties.columns = table_function->getActualTableStructure(getContext()); properties.columns = table_function->getActualTableStructure(getContext(), /*is_insert_query*/ true);
} }
else if (create.is_dictionary) else if (create.is_dictionary)
{ {

View File

@ -96,7 +96,7 @@ BlockIO InterpreterDescribeQuery::execute()
else if (table_expression.table_function) else if (table_expression.table_function)
{ {
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, getContext()); TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, getContext());
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext()); auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext(), /*is_insert_query*/ true);
for (const auto & table_function_column_description : table_function_column_descriptions) for (const auto & table_function_column_description : table_function_column_descriptions)
columns.emplace_back(table_function_column_description); columns.emplace_back(table_function_column_description);
} }

View File

@ -1034,7 +1034,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
{ {
auto load_func = [&]() -> std::shared_ptr<Block> auto load_func = [&]() -> std::shared_ptr<Block>
{ {
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->getPath(), materializeBlock(right_sample_block)); TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->getAbsolutePath(), materializeBlock(right_sample_block));
return std::make_shared<Block>(input.block_in->read()); return std::make_shared<Block>(input.block_in->read());
}; };

View File

@ -39,7 +39,7 @@ namespace
TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec) TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
{ {
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin); auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin);
auto write_stat = TemporaryFileStreamLegacy::write(tmp_file->getPath(), header, std::move(pipeline), codec); auto write_stat = TemporaryFileStreamLegacy::write(tmp_file->getAbsolutePath(), header, std::move(pipeline), codec);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes); ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes); ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes);
@ -267,7 +267,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
{ {
return Pipe(std::make_shared<TemporaryFileLazySource>(file->getPath(), materializeBlock(sample_block))); return Pipe(std::make_shared<TemporaryFileLazySource>(file->getAbsolutePath(), materializeBlock(sample_block)));
} }

View File

@ -235,9 +235,9 @@ TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const
: parent(parent_) : parent(parent_)
, header(header_) , header(header_)
, file(std::move(file_)) , file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getPath()), header)) , out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getAbsolutePath()), header))
{ {
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", file->getPath()); LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", file->getAbsolutePath());
} }
TemporaryFileStream::TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_) TemporaryFileStream::TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_)
@ -365,7 +365,7 @@ void TemporaryFileStream::release()
String TemporaryFileStream::getPath() const String TemporaryFileStream::getPath() const
{ {
if (file) if (file)
return file->getPath(); return file->getAbsolutePath();
if (segment_holder && !segment_holder->empty()) if (segment_holder && !segment_holder->empty())
return segment_holder->front().getPathInLocalCache(); return segment_holder->front().getPathInLocalCache();

View File

@ -17,12 +17,12 @@ namespace ErrorCodes
extern const int INCORRECT_DATA; extern const int INCORRECT_DATA;
} }
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings) CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const CapnProtoSchemaInfo & info, const FormatSettings & format_settings)
: IRowInputFormat(std::move(header_), in_, std::move(params_)) : IRowInputFormat(std::move(header_), in_, std::move(params_))
, parser(std::make_shared<CapnProtoSchemaParser>()) , parser(std::make_shared<CapnProtoSchemaParser>())
{ {
// Parse the schema and fetch the root object // Parse the schema and fetch the root object
schema = parser->getMessageSchema(info); schema = parser->getMessageSchema(info.getSchemaInfo());
const auto & header = getPort().getHeader(); const auto & header = getPort().getHeader();
serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto); serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto);
} }
@ -106,8 +106,12 @@ void registerInputFormatCapnProto(FormatFactory & factory)
"CapnProto", "CapnProto",
[](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
{ {
return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params), return std::make_shared<CapnProtoRowInputFormat>(
FormatSchemaInfo(settings, "CapnProto", true), settings); buf,
sample,
std::move(params),
CapnProtoSchemaInfo(settings, "CapnProto", sample, settings.capn_proto.use_autogenerated_schema),
settings);
}); });
factory.markFormatSupportsSubsetOfColumns("CapnProto"); factory.markFormatSupportsSubsetOfColumns("CapnProto");
factory.registerFileExtension("capnp", "CapnProto"); factory.registerFileExtension("capnp", "CapnProto");

View File

@ -24,7 +24,7 @@ class ReadBuffer;
class CapnProtoRowInputFormat final : public IRowInputFormat class CapnProtoRowInputFormat final : public IRowInputFormat
{ {
public: public:
CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_); CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const CapnProtoSchemaInfo & info, const FormatSettings & format_settings);
String getName() const override { return "CapnProtoRowInputFormat"; } String getName() const override { return "CapnProtoRowInputFormat"; }

View File

@ -23,14 +23,14 @@ void CapnProtoOutputStream::write(const void * buffer, size_t size)
CapnProtoRowOutputFormat::CapnProtoRowOutputFormat( CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
WriteBuffer & out_, WriteBuffer & out_,
const Block & header_, const Block & header_,
const FormatSchemaInfo & info, const CapnProtoSchemaInfo & info,
const FormatSettings & format_settings) const FormatSettings & format_settings)
: IRowOutputFormat(header_, out_) : IRowOutputFormat(header_, out_)
, column_names(header_.getNames()) , column_names(header_.getNames())
, column_types(header_.getDataTypes()) , column_types(header_.getDataTypes())
, output_stream(std::make_unique<CapnProtoOutputStream>(out_)) , output_stream(std::make_unique<CapnProtoOutputStream>(out_))
{ {
schema = schema_parser.getMessageSchema(info); schema = schema_parser.getMessageSchema(info.getSchemaInfo());
const auto & header = getPort(PortKind::Main).getHeader(); const auto & header = getPort(PortKind::Main).getHeader();
serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto); serializer = std::make_unique<CapnProtoSerializer>(header.getDataTypes(), header.getNames(), schema, format_settings.capn_proto);
capnp::MallocMessageBuilder message; capnp::MallocMessageBuilder message;
@ -52,7 +52,11 @@ void registerOutputFormatCapnProto(FormatFactory & factory)
const Block & sample, const Block & sample,
const FormatSettings & format_settings) const FormatSettings & format_settings)
{ {
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings); return std::make_shared<CapnProtoRowOutputFormat>(
buf,
sample,
CapnProtoSchemaInfo(format_settings, "CapnProto", sample, format_settings.capn_proto.use_autogenerated_schema),
format_settings);
}); });
} }

View File

@ -31,8 +31,8 @@ public:
CapnProtoRowOutputFormat( CapnProtoRowOutputFormat(
WriteBuffer & out_, WriteBuffer & out_,
const Block & header_, const Block & header_,
const FormatSchemaInfo & info, const CapnProtoSchemaInfo & info,
const FormatSettings & format_settings_); const FormatSettings & format_settings);
String getName() const override { return "CapnProtoRowOutputFormat"; } String getName() const override { return "CapnProtoRowOutputFormat"; }

View File

@ -14,7 +14,7 @@ ProtobufListInputFormat::ProtobufListInputFormat(
ReadBuffer & in_, ReadBuffer & in_,
const Block & header_, const Block & header_,
const Params & params_, const Params & params_,
const FormatSchemaInfo & schema_info_, const ProtobufSchemaInfo & schema_info_,
bool flatten_google_wrappers_) bool flatten_google_wrappers_)
: IRowInputFormat(header_, in_, params_) : IRowInputFormat(header_, in_, params_)
, reader(std::make_unique<ProtobufReader>(in_)) , reader(std::make_unique<ProtobufReader>(in_))
@ -22,7 +22,7 @@ ProtobufListInputFormat::ProtobufListInputFormat(
header_.getNames(), header_.getNames(),
header_.getDataTypes(), header_.getDataTypes(),
missing_column_indices, missing_column_indices,
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::Yes), *ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::Yes),
/* with_length_delimiter = */ true, /* with_length_delimiter = */ true,
/* with_envelope = */ true, /* with_envelope = */ true,
flatten_google_wrappers_, flatten_google_wrappers_,
@ -84,7 +84,7 @@ void registerInputFormatProtobufList(FormatFactory & factory)
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<ProtobufListInputFormat>(buf, sample, std::move(params), return std::make_shared<ProtobufListInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings, "Protobuf", true), settings.protobuf.input_flatten_google_wrappers); ProtobufSchemaInfo(settings, "Protobuf", sample, settings.protobuf.use_autogenerated_schema), settings.protobuf.input_flatten_google_wrappers);
}); });
factory.markFormatSupportsSubsetOfColumns("ProtobufList"); factory.markFormatSupportsSubsetOfColumns("ProtobufList");
factory.registerAdditionalInfoForSchemaCacheGetter( factory.registerAdditionalInfoForSchemaCacheGetter(

View File

@ -28,7 +28,7 @@ public:
ReadBuffer & in_, ReadBuffer & in_,
const Block & header_, const Block & header_,
const Params & params_, const Params & params_,
const FormatSchemaInfo & schema_info_, const ProtobufSchemaInfo & schema_info_,
bool flatten_google_wrappers_); bool flatten_google_wrappers_);
String getName() const override { return "ProtobufListInputFormat"; } String getName() const override { return "ProtobufListInputFormat"; }

View File

@ -2,7 +2,6 @@
#if USE_PROTOBUF #if USE_PROTOBUF
# include <Formats/FormatFactory.h> # include <Formats/FormatFactory.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/ProtobufWriter.h> # include <Formats/ProtobufWriter.h>
# include <Formats/ProtobufSerializer.h> # include <Formats/ProtobufSerializer.h>
# include <Formats/ProtobufSchemas.h> # include <Formats/ProtobufSchemas.h>
@ -13,14 +12,14 @@ namespace DB
ProtobufListOutputFormat::ProtobufListOutputFormat( ProtobufListOutputFormat::ProtobufListOutputFormat(
WriteBuffer & out_, WriteBuffer & out_,
const Block & header_, const Block & header_,
const FormatSchemaInfo & schema_info_, const ProtobufSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_) bool defaults_for_nullable_google_wrappers_)
: IRowOutputFormat(header_, out_) : IRowOutputFormat(header_, out_)
, writer(std::make_unique<ProtobufWriter>(out)) , writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create( , serializer(ProtobufSerializer::create(
header_.getNames(), header_.getNames(),
header_.getDataTypes(), header_.getDataTypes(),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::Yes), *ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::Yes),
/* with_length_delimiter = */ true, /* with_length_delimiter = */ true,
/* with_envelope = */ true, /* with_envelope = */ true,
defaults_for_nullable_google_wrappers_, defaults_for_nullable_google_wrappers_,
@ -55,7 +54,7 @@ void registerOutputFormatProtobufList(FormatFactory & factory)
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<ProtobufListOutputFormat>( return std::make_shared<ProtobufListOutputFormat>(
buf, header, FormatSchemaInfo(settings, "Protobuf", true), buf, header, ProtobufSchemaInfo(settings, "Protobuf", header, settings.protobuf.use_autogenerated_schema),
settings.protobuf.output_nullables_with_google_wrappers); settings.protobuf.output_nullables_with_google_wrappers);
}); });
} }

View File

@ -4,10 +4,10 @@
#if USE_PROTOBUF #if USE_PROTOBUF
# include <Processors/Formats/IRowOutputFormat.h> # include <Processors/Formats/IRowOutputFormat.h>
# include <Formats/FormatSchemaInfo.h>
namespace DB namespace DB
{ {
class FormatSchemaInfo;
class ProtobufWriter; class ProtobufWriter;
class ProtobufSerializer; class ProtobufSerializer;
@ -26,7 +26,7 @@ public:
ProtobufListOutputFormat( ProtobufListOutputFormat(
WriteBuffer & out_, WriteBuffer & out_,
const Block & header_, const Block & header_,
const FormatSchemaInfo & schema_info_, const ProtobufSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_); bool defaults_for_nullable_google_wrappers_);
String getName() const override { return "ProtobufListOutputFormat"; } String getName() const override { return "ProtobufListOutputFormat"; }

View File

@ -11,9 +11,9 @@ namespace DB
{ {
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_, ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_,
const FormatSchemaInfo & schema_info_, bool with_length_delimiter_, bool flatten_google_wrappers_) const ProtobufSchemaInfo & schema_info_, bool with_length_delimiter_, bool flatten_google_wrappers_)
: IRowInputFormat(header_, in_, params_) : IRowInputFormat(header_, in_, params_)
, message_descriptor(ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::No)) , message_descriptor(ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::No))
, with_length_delimiter(with_length_delimiter_) , with_length_delimiter(with_length_delimiter_)
, flatten_google_wrappers(flatten_google_wrappers_) , flatten_google_wrappers(flatten_google_wrappers_)
{ {
@ -89,7 +89,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params), return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings, "Protobuf", true), ProtobufSchemaInfo(settings, "Protobuf", sample, settings.protobuf.use_autogenerated_schema),
with_length_delimiter, with_length_delimiter,
settings.protobuf.input_flatten_google_wrappers); settings.protobuf.input_flatten_google_wrappers);
}); });

View File

@ -33,7 +33,7 @@ public:
ReadBuffer & in_, ReadBuffer & in_,
const Block & header_, const Block & header_,
const Params & params_, const Params & params_,
const FormatSchemaInfo & schema_info_, const ProtobufSchemaInfo & schema_info_,
bool with_length_delimiter_, bool with_length_delimiter_,
bool flatten_google_wrappers_); bool flatten_google_wrappers_);

View File

@ -3,7 +3,6 @@
#if USE_PROTOBUF #if USE_PROTOBUF
# include <Formats/FormatFactory.h> # include <Formats/FormatFactory.h>
# include <Core/Block.h> # include <Core/Block.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/FormatSettings.h> # include <Formats/FormatSettings.h>
# include <Formats/ProtobufSchemas.h> # include <Formats/ProtobufSchemas.h>
# include <Formats/ProtobufSerializer.h> # include <Formats/ProtobufSerializer.h>
@ -20,7 +19,7 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat( ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_, WriteBuffer & out_,
const Block & header_, const Block & header_,
const FormatSchemaInfo & schema_info_, const ProtobufSchemaInfo & schema_info_,
const FormatSettings & settings_, const FormatSettings & settings_,
bool with_length_delimiter_) bool with_length_delimiter_)
: IRowOutputFormat(header_, out_) : IRowOutputFormat(header_, out_)
@ -28,7 +27,7 @@ ProtobufRowOutputFormat::ProtobufRowOutputFormat(
, serializer(ProtobufSerializer::create( , serializer(ProtobufSerializer::create(
header_.getNames(), header_.getNames(),
header_.getDataTypes(), header_.getDataTypes(),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_, ProtobufSchemas::WithEnvelope::No), *ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_.getSchemaInfo(), ProtobufSchemas::WithEnvelope::No),
with_length_delimiter_, with_length_delimiter_,
/* with_envelope = */ false, /* with_envelope = */ false,
settings_.protobuf.output_nullables_with_google_wrappers, settings_.protobuf.output_nullables_with_google_wrappers,
@ -61,7 +60,7 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<ProtobufRowOutputFormat>( return std::make_shared<ProtobufRowOutputFormat>(
buf, header, FormatSchemaInfo(settings, "Protobuf", true), buf, header, ProtobufSchemaInfo(settings, "Protobuf", header, settings.protobuf.use_autogenerated_schema),
settings, with_length_delimiter); settings, with_length_delimiter);
}); });
} }

View File

@ -4,11 +4,11 @@
#if USE_PROTOBUF #if USE_PROTOBUF
# include <Processors/Formats/IRowOutputFormat.h> # include <Processors/Formats/IRowOutputFormat.h>
# include <Formats/FormatSchemaInfo.h>
namespace DB namespace DB
{ {
class DB; class DB;
class FormatSchemaInfo;
class ProtobufSerializer; class ProtobufSerializer;
class ProtobufWriter; class ProtobufWriter;
class WriteBuffer; class WriteBuffer;
@ -30,7 +30,7 @@ public:
ProtobufRowOutputFormat( ProtobufRowOutputFormat(
WriteBuffer & out_, WriteBuffer & out_,
const Block & header_, const Block & header_,
const FormatSchemaInfo & schema_info_, const ProtobufSchemaInfo & schema_info_,
const FormatSettings & settings_, const FormatSettings & settings_,
bool with_length_delimiter_); bool with_length_delimiter_);

View File

@ -5,6 +5,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR;
extern const int SET_SIZE_LIMIT_EXCEEDED; extern const int SET_SIZE_LIMIT_EXCEEDED;
} }
@ -126,9 +127,20 @@ bool DistinctSortedChunkTransform::isKey(const size_t key_pos, const size_t row_
bool DistinctSortedChunkTransform::isLatestKeyFromPrevChunk(const size_t row_pos) const bool DistinctSortedChunkTransform::isLatestKeyFromPrevChunk(const size_t row_pos) const
{ {
for (size_t i = 0; i < sorted_columns.size(); ++i) for (size_t i = 0, s = sorted_columns.size(); i < s; ++i)
{ {
const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction); const auto & sorted_column = *sorted_columns[i];
/// temporary hardening due to suspicious crashes in sqlancer tests
if (unlikely(sorted_column.size() <= row_pos))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected size of a sorted column: size {}, row_pos {}, column position {}, type {}",
sorted_column.size(),
row_pos,
i,
sorted_column.getFamilyName());
const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, sorted_column, sorted_columns_descr[i].nulls_direction);
if (res != 0) if (res != 0)
return false; return false;
} }

View File

@ -159,7 +159,7 @@ void PartialSortingTransform::transform(Chunk & chunk)
{ {
MutableColumnPtr sort_description_threshold_column_updated = raw_block_columns[i]->cloneEmpty(); MutableColumnPtr sort_description_threshold_column_updated = raw_block_columns[i]->cloneEmpty();
sort_description_threshold_column_updated->insertFrom(*raw_block_columns[i], min_row_to_compare); sort_description_threshold_column_updated->insertFrom(*raw_block_columns[i], min_row_to_compare);
sort_description_threshold_columns_updated[i] = std::move(sort_description_threshold_column_updated); sort_description_threshold_columns_updated[i] = sort_description_threshold_column_updated->convertToFullColumnIfSparse();
} }
sort_description_threshold_columns = std::move(sort_description_threshold_columns_updated); sort_description_threshold_columns = std::move(sort_description_threshold_columns_updated);

View File

@ -250,15 +250,16 @@ StorageKafka::StorageKafka(
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, kafka_settings(std::move(kafka_settings_)) , kafka_settings(std::move(kafka_settings_))
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value))) , macros_info{.table_id = table_id_}
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value)) , topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value)) , brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value, macros_info))
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value, macros_info))
, client_id( , client_id(
kafka_settings->kafka_client_id.value.empty() ? getDefaultClientId(table_id_) kafka_settings->kafka_client_id.value.empty() ? getDefaultClientId(table_id_)
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value)) : getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value, macros_info))
, format_name(getContext()->getMacros()->expand(kafka_settings->kafka_format.value)) , format_name(getContext()->getMacros()->expand(kafka_settings->kafka_format.value))
, max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value) , max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value)
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value)) , schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
, num_consumers(kafka_settings->kafka_num_consumers.value) , num_consumers(kafka_settings->kafka_num_consumers.value)
, log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")")) , log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")"))
, semaphore(0, static_cast<int>(num_consumers)) , semaphore(0, static_cast<int>(num_consumers))

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <Common/Macros.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/Kafka/KafkaConsumer.h> #include <Storages/Kafka/KafkaConsumer.h>
@ -79,6 +80,7 @@ public:
private: private:
// Configuration and state // Configuration and state
std::unique_ptr<KafkaSettings> kafka_settings; std::unique_ptr<KafkaSettings> kafka_settings;
Macros::MacroExpansionInfo macros_info;
const Names topics; const Names topics;
const String brokers; const String brokers;
const String group; const String group;

View File

@ -350,7 +350,7 @@ void DataPartStorageOnDiskBase::backup(
temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first; temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
temp_dir_owner = temp_dir_it->second; temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath(); fs::path temp_dir = temp_dir_owner->getRelativePath();
temp_part_dir = temp_dir / part_path_in_backup.relative_path(); temp_part_dir = temp_dir / part_path_in_backup.relative_path();
disk->createDirectories(temp_part_dir); disk->createDirectories(temp_part_dir);
} }

View File

@ -5266,7 +5266,7 @@ public:
auto it = temp_dirs.find(disk); auto it = temp_dirs.find(disk);
if (it == temp_dirs.end()) if (it == temp_dirs.end())
it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first; it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
return it->second->getPath(); return it->second->getRelativePath();
} }
private: private:

View File

@ -86,7 +86,7 @@ const std::unordered_set<std::string_view> optional_configuration_keys = {
bool isConnectionString(const std::string & candidate) bool isConnectionString(const std::string & candidate)
{ {
return candidate.starts_with("DefaultEndpointsProtocol"); return !candidate.starts_with("http");
} }
} }
@ -257,7 +257,7 @@ void registerStorageAzureBlob(StorageFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext()); auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext());
auto client = StorageAzureBlob::createClient(configuration); auto client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
// Use format settings from global server context + settings from // Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current // the SETTINGS clause of the create query. Settings from current
// session and user are ignored. // session and user are ignored.
@ -309,58 +309,113 @@ void registerStorageAzureBlob(StorageFactory & factory)
}); });
} }
AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration) static bool containerExists(std::unique_ptr<BlobServiceClient> &blob_service_client, std::string container_name)
{
Azure::Storage::Blobs::ListBlobContainersOptions options;
options.Prefix = container_name;
options.PageSizeHint = 1;
auto containers_list_response = blob_service_client->ListBlobContainers(options);
auto containers_list = containers_list_response.BlobContainers;
for (const auto & container : containers_list)
{
if (container_name == container.Name)
return true;
}
return false;
}
AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration, bool is_read_only)
{ {
AzureClientPtr result; AzureClientPtr result;
if (configuration.is_connection_string) if (configuration.is_connection_string)
{ {
std::unique_ptr<BlobServiceClient> blob_service_client = std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(configuration.connection_url));
result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container)); result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container));
result->CreateIfNotExists(); bool container_exists = containerExists(blob_service_client,configuration.container);
}
else if (!container_exists)
{ {
if (configuration.account_name.has_value() && configuration.account_key.has_value()) if (is_read_only)
{ throw Exception(
auto storage_shared_key_credential = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key); ErrorCodes::DATABASE_ACCESS_DENIED,
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential); "AzureBlobStorage container does not exist '{}'",
configuration.container);
try try
{ {
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value); result->CreateIfNotExists();
} } catch (const Azure::Storage::StorageException & e)
catch (const Azure::Storage::StorageException & e)
{ {
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) if (!(e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
{ && e.ReasonPhrase == "The specified container already exists."))
auto final_url = configuration.connection_url
+ (configuration.connection_url.back() == '/' ? "" : "/")
+ configuration.container;
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
}
else
{ {
throw; throw;
} }
} }
} }
}
else else
{ {
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>(); std::shared_ptr<Azure::Storage::StorageSharedKeyCredential> storage_shared_key_credential;
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, managed_identity_credential); if (configuration.account_name.has_value() && configuration.account_key.has_value())
{
storage_shared_key_credential
= std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
}
std::unique_ptr<BlobServiceClient> blob_service_client;
if (storage_shared_key_credential)
{
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
}
else
{
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url);
}
bool container_exists = containerExists(blob_service_client,configuration.container);
std::string final_url;
size_t pos = configuration.connection_url.find('?');
if (pos != std::string::npos)
{
auto url_without_sas = configuration.connection_url.substr(0, pos);
final_url = url_without_sas + (url_without_sas.back() == '/' ? "" : "/") + configuration.container
+ configuration.connection_url.substr(pos);
}
else
final_url
= configuration.connection_url + (configuration.connection_url.back() == '/' ? "" : "/") + configuration.container;
if (container_exists)
{
if (storage_shared_key_credential)
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
else
result = std::make_unique<BlobContainerClient>(final_url);
}
else
{
if (is_read_only)
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage container does not exist '{}'",
configuration.container);
try try
{ {
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value); result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
} } catch (const Azure::Storage::StorageException & e)
catch (const Azure::Storage::StorageException & e)
{ {
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
&& e.ReasonPhrase == "The specified container already exists.")
{ {
auto final_url = configuration.connection_url if (storage_shared_key_credential)
+ (configuration.connection_url.back() == '/' ? "" : "/") result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
+ configuration.container; else
result = std::make_unique<BlobContainerClient>(final_url);
result = std::make_unique<BlobContainerClient>(final_url, managed_identity_credential);
} }
else else
{ {
@ -438,7 +493,7 @@ void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
{ {
throw Exception( throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED, ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode", "AzureBlobStorage key '{}' contains globs, so the table is in readonly mode",
configuration.blob_path); configuration.blob_path);
} }
@ -1203,7 +1258,7 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
return nullptr; return nullptr;
} }
/// S3 file iterator could get new keys after new iteration, check them in schema cache. ///AzureBlobStorage file iterator could get new keys after new iteration, check them in schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size) if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size)
{ {
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx); columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx);

View File

@ -65,7 +65,7 @@ public:
ASTPtr partition_by_); ASTPtr partition_by_);
static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context); static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration); static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration, bool is_read_only);
static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context); static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context);

View File

@ -946,7 +946,7 @@ void StorageLog::backupData(BackupEntriesCollector & backup_entries_collector, c
fs::path data_path_in_backup_fs = data_path_in_backup; fs::path data_path_in_backup_fs = data_path_in_backup;
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/"); auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/");
fs::path temp_dir = temp_dir_owner->getPath(); fs::path temp_dir = temp_dir_owner->getRelativePath();
disk->createDirectories(temp_dir); disk->createDirectories(temp_dir);
bool copy_encrypted = !backup_entries_collector.getBackupSettings().decrypt_files_from_encrypted_disks; bool copy_encrypted = !backup_entries_collector.getBackupSettings().decrypt_files_from_encrypted_disks;

View File

@ -314,7 +314,7 @@ namespace
backup_entries.resize(file_paths.size()); backup_entries.resize(file_paths.size());
temp_dir_owner.emplace(temp_disk); temp_dir_owner.emplace(temp_disk);
fs::path temp_dir = temp_dir_owner->getPath(); fs::path temp_dir = temp_dir_owner->getRelativePath();
temp_disk->createDirectories(temp_dir); temp_disk->createDirectories(temp_dir);
/// Writing data.bin /// Writing data.bin
@ -453,10 +453,10 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
if (!dynamic_cast<ReadBufferFromFileBase *>(in.get())) if (!dynamic_cast<ReadBufferFromFileBase *>(in.get()))
{ {
temp_data_file.emplace(temporary_disk); temp_data_file.emplace(temporary_disk);
auto out = std::make_unique<WriteBufferFromFile>(temp_data_file->getPath()); auto out = std::make_unique<WriteBufferFromFile>(temp_data_file->getAbsolutePath());
copyData(*in, *out); copyData(*in, *out);
out.reset(); out.reset();
in = createReadBufferFromFileBase(temp_data_file->getPath(), {}); in = createReadBufferFromFileBase(temp_data_file->getAbsolutePath(), {});
} }
std::unique_ptr<ReadBufferFromFileBase> in_from_file{static_cast<ReadBufferFromFileBase *>(in.release())}; std::unique_ptr<ReadBufferFromFileBase> in_from_file{static_cast<ReadBufferFromFileBase *>(in.release())};
CompressedReadBufferFromFile compressed_in{std::move(in_from_file)}; CompressedReadBufferFromFile compressed_in{std::move(in_from_file)};

View File

@ -544,7 +544,7 @@ void StorageStripeLog::backupData(BackupEntriesCollector & backup_entries_collec
fs::path data_path_in_backup_fs = data_path_in_backup; fs::path data_path_in_backup_fs = data_path_in_backup;
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/"); auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/");
fs::path temp_dir = temp_dir_owner->getPath(); fs::path temp_dir = temp_dir_owner->getRelativePath();
disk->createDirectories(temp_dir); disk->createDirectories(temp_dir);
bool copy_encrypted = !backup_entries_collector.getBackupSettings().decrypt_files_from_encrypted_disks; bool copy_encrypted = !backup_entries_collector.getBackupSettings().decrypt_files_from_encrypted_disks;

View File

@ -371,7 +371,7 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
for (; option != end; ++option) for (; option != end; ++option)
{ {
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end); bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
auto request_uri = Poco::URI(*option, context->getSettingsRef().disable_url_encoding); auto request_uri = Poco::URI(*option, context->getSettingsRef().enable_url_encoding);
for (const auto & [param, value] : params) for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value); request_uri.addQueryParameter(param, value);

View File

@ -38,7 +38,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
if (shard_info.isLocal()) if (shard_info.isLocal())
{ {
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context); TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
return table_function_ptr->getActualTableStructure(context); return table_function_ptr->getActualTableStructure(context, /*is_insert_query*/ true);
} }
auto table_func_name = queryToString(table_func_ptr); auto table_func_name = queryToString(table_func_ptr);

View File

@ -49,13 +49,14 @@ namespace DB
actual_columns = parseColumnsListFromString(table_structure, context_); actual_columns = parseColumnsListFromString(table_structure, context_);
} }
ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/) const { return actual_columns; } ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/, bool /*is_insert_query*/) const { return actual_columns; }
StoragePtr TableFunctionHive::executeImpl( StoragePtr TableFunctionHive::executeImpl(
const ASTPtr & /*ast_function_*/, const ASTPtr & /*ast_function_*/,
ContextPtr context_, ContextPtr context_,
const std::string & table_name_, const std::string & table_name_,
ColumnsDescription /*cached_columns_*/) const ColumnsDescription /*cached_columns_*/,
bool /*is_insert_query*/) const
{ {
const Settings & settings = context_->getSettings(); const Settings & settings = context_->getSettings();
ParserExpression partition_by_parser; ParserExpression partition_by_parser;

View File

@ -17,10 +17,10 @@ public:
bool hasStaticStructure() const override { return true; } bool hasStaticStructure() const override { return true; }
StoragePtr executeImpl( StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return storage_type_name; } const char * getStorageTypeName() const override { return storage_type_name; }
ColumnsDescription getActualTableStructure(ContextPtr) const override; ColumnsDescription getActualTableStructure(ContextPtr, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function_, ContextPtr context_) override; void parseArguments(const ASTPtr & ast_function_, ContextPtr context_) override;
private: private:

View File

@ -34,15 +34,15 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte
auto context_to_use = use_global_context ? context->getGlobalContext() : context; auto context_to_use = use_global_context ? context->getGlobalContext() : context;
if (cached_columns.empty()) if (cached_columns.empty())
return executeImpl(ast_function, context, table_name, std::move(cached_columns)); return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query);
if (hasStaticStructure() && cached_columns == getActualTableStructure(context)) if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns)); return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query);
auto this_table_function = shared_from_this(); auto this_table_function = shared_from_this();
auto get_storage = [=]() -> StoragePtr auto get_storage = [=]() -> StoragePtr
{ {
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns); return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, is_insert_query);
}; };
/// It will request actual table structure and create underlying storage lazily /// It will request actual table structure and create underlying storage lazily

View File

@ -58,7 +58,7 @@ public:
virtual void parseArguments(const ASTPtr & /*ast_function*/, ContextPtr /*context*/) {} virtual void parseArguments(const ASTPtr & /*ast_function*/, ContextPtr /*context*/) {}
/// Returns actual table structure probably requested from remote server, may fail /// Returns actual table structure probably requested from remote server, may fail
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/) const = 0; virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/, bool is_insert_query) const = 0;
/// Check if table function needs a structure hint from SELECT query in case of /// Check if table function needs a structure hint from SELECT query in case of
/// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...) /// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...)
@ -89,7 +89,7 @@ protected:
private: private:
virtual StoragePtr executeImpl( virtual StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const = 0; const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const = 0;
virtual const char * getStorageTypeName() const = 0; virtual const char * getStorageTypeName() const = 0;
}; };

View File

@ -26,7 +26,8 @@ protected:
const ASTPtr & /*ast_function*/, const ASTPtr & /*ast_function*/,
ContextPtr context, ContextPtr context,
const std::string & table_name, const std::string & table_name,
ColumnsDescription /*cached_columns*/) const override ColumnsDescription /*cached_columns*/,
bool /*is_insert_query*/) const override
{ {
ColumnsDescription columns; ColumnsDescription columns;
if (TableFunction::configuration.structure != "auto") if (TableFunction::configuration.structure != "auto")
@ -42,7 +43,7 @@ protected:
const char * getStorageTypeName() const override { return Storage::name; } const char * getStorageTypeName() const override { return Storage::name; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override
{ {
if (TableFunction::configuration.structure == "auto") if (TableFunction::configuration.structure == "auto")
{ {

View File

@ -110,7 +110,7 @@ void ITableFunctionFileLike::addColumnsStructureToArguments(ASTs & args, const S
} }
} }
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{ {
ColumnsDescription columns; ColumnsDescription columns;
if (structure != "auto") if (structure != "auto")

View File

@ -48,7 +48,7 @@ protected:
ColumnsDescription structure_hint; ColumnsDescription structure_hint;
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
virtual StoragePtr getStorage( virtual StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, ContextPtr global_context, const String & source, const String & format, const ColumnsDescription & columns, ContextPtr global_context,

View File

@ -61,7 +61,7 @@ void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const
} }
} }
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context) const ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
startBridgeIfNot(context); startBridgeIfNot(context);
@ -92,10 +92,10 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
return ColumnsDescription{columns}; return ColumnsDescription{columns};
} }
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
startBridgeIfNot(context); startBridgeIfNot(context);
auto columns = getActualTableStructure(context); auto columns = getActualTableStructure(context, is_insert_query);
auto result = std::make_shared<StorageXDBC>( auto result = std::make_shared<StorageXDBC>(
StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, ConstraintsDescription{}, String{}, context, helper); StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, ConstraintsDescription{}, String{}, context, helper);
result->startup(); result->startup();

View File

@ -16,7 +16,7 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction class ITableFunctionXDBC : public ITableFunction
{ {
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
/* A factory method to create bridge helper, that will assist in remote interaction */ /* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(ContextPtr context, virtual BridgeHelperPtr createBridgeHelper(ContextPtr context,
@ -24,7 +24,7 @@ private:
const std::string & connection_string_, const std::string & connection_string_,
bool use_connection_pooling_) const = 0; bool use_connection_pooling_) const = 0;
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;

View File

@ -39,7 +39,7 @@ namespace
bool isConnectionString(const std::string & candidate) bool isConnectionString(const std::string & candidate)
{ {
return candidate.starts_with("DefaultEndpointsProtocol"); return !candidate.starts_with("http");
} }
} }
@ -193,12 +193,12 @@ void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function,
configuration = parseArgumentsImpl(args, context); configuration = parseArgumentsImpl(args, context);
} }
ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context, bool is_insert_query) const
{ {
if (configuration.structure == "auto") if (configuration.structure == "auto")
{ {
context->checkAccess(getSourceAccessType()); context->checkAccess(getSourceAccessType());
auto client = StorageAzureBlob::createClient(configuration); auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context); auto settings = StorageAzureBlob::createSettings(context);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings)); auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings));
@ -213,9 +213,9 @@ bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns()
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format); return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
} }
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
auto client = StorageAzureBlob::createClient(configuration); auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context); auto settings = StorageAzureBlob::createSettings(context);
ColumnsDescription columns; ColumnsDescription columns;

View File

@ -54,11 +54,12 @@ protected:
const ASTPtr & ast_function, const ASTPtr & ast_function,
ContextPtr context, ContextPtr context,
const std::string & table_name, const std::string & table_name,
ColumnsDescription cached_columns) const override; ColumnsDescription cached_columns,
bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Azure"; } const char * getStorageTypeName() const override { return "Azure"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
mutable StorageAzureBlob::Configuration configuration; mutable StorageAzureBlob::Configuration configuration;

View File

@ -43,7 +43,7 @@ void TableFunctionDictionary::parseArguments(const ASTPtr & ast_function, Contex
dictionary_name = checkAndGetLiteralArgument<String>(args[0], "dictionary_name"); dictionary_name = checkAndGetLiteralArgument<String>(args[0], "dictionary_name");
} }
ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
const ExternalDictionariesLoader & external_loader = context->getExternalDictionariesLoader(); const ExternalDictionariesLoader & external_loader = context->getExternalDictionariesLoader();
std::string resolved_name = external_loader.resolveDictionaryName(dictionary_name, context->getCurrentDatabase()); std::string resolved_name = external_loader.resolveDictionaryName(dictionary_name, context->getCurrentDatabase());
@ -76,10 +76,10 @@ ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr c
} }
StoragePtr TableFunctionDictionary::executeImpl( StoragePtr TableFunctionDictionary::executeImpl(
const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription) const const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const
{ {
StorageID dict_id(getDatabaseName(), table_name); StorageID dict_id(getDatabaseName(), table_name);
auto dictionary_table_structure = getActualTableStructure(context); auto dictionary_table_structure = getActualTableStructure(context, is_insert_query);
auto result = std::make_shared<StorageDictionary>( auto result = std::make_shared<StorageDictionary>(
dict_id, dictionary_name, std::move(dictionary_table_structure), String{}, StorageDictionary::Location::Custom, context); dict_id, dictionary_name, std::move(dictionary_table_structure), String{}, StorageDictionary::Location::Custom, context);

View File

@ -18,9 +18,9 @@ public:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Dictionary"; } const char * getStorageTypeName() const override { return "Dictionary"; }

View File

@ -120,12 +120,12 @@ void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, Contex
} }
} }
ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
return parseColumnsListFromString(structure, context); return parseColumnsListFromString(structure, context);
} }
StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
auto storage_id = StorageID(getDatabaseName(), table_name); auto storage_id = StorageID(getDatabaseName(), table_name);
auto global_context = context->getGlobalContext(); auto global_context = context->getGlobalContext();
@ -135,7 +135,7 @@ StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/,
if (settings_query != nullptr) if (settings_query != nullptr)
settings.applyChanges(settings_query->as<ASTSetQuery>()->changes); settings.applyChanges(settings_query->as<ASTSetQuery>()->changes);
auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context), ConstraintsDescription{}); auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context, is_insert_query), ConstraintsDescription{});
storage->startup(); storage->startup();
return storage; return storage;
} }

View File

@ -24,11 +24,11 @@ public:
bool hasStaticStructure() const override { return true; } bool hasStaticStructure() const override { return true; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Executable"; } const char * getStorageTypeName() const override { return "Executable"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;

View File

@ -91,7 +91,7 @@ void TableFunctionExplain::parseArguments(const ASTPtr & ast_function, ContextPt
query = std::move(explain_query); query = std::move(explain_query);
} }
ColumnsDescription TableFunctionExplain::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionExplain::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
Block sample_block = getInterpreter(context).getSampleBlock(query->as<ASTExplainQuery>()->getKind()); Block sample_block = getInterpreter(context).getSampleBlock(query->as<ASTExplainQuery>()->getKind());
ColumnsDescription columns_description; ColumnsDescription columns_description;
@ -123,7 +123,7 @@ static Block executeMonoBlock(QueryPipeline & pipeline)
} }
StoragePtr TableFunctionExplain::executeImpl( StoragePtr TableFunctionExplain::executeImpl(
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
/// To support settings inside explain subquery. /// To support settings inside explain subquery.
auto mutable_context = Context::createCopy(context); auto mutable_context = Context::createCopy(context);
@ -132,7 +132,7 @@ StoragePtr TableFunctionExplain::executeImpl(
Block block = executeMonoBlock(blockio.pipeline); Block block = executeMonoBlock(blockio.pipeline);
StorageID storage_id(getDatabaseName(), table_name); StorageID storage_id(getDatabaseName(), table_name);
auto storage = std::make_shared<StorageValues>(storage_id, getActualTableStructure(context), std::move(block)); auto storage = std::make_shared<StorageValues>(storage_id, getActualTableStructure(context, is_insert_query), std::move(block));
storage->startup(); storage->startup();
return storage; return storage;
} }

View File

@ -17,7 +17,7 @@ public:
std::string getName() const override { return name; } std::string getName() const override { return name; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Explain"; } const char * getStorageTypeName() const override { return "Explain"; }
@ -25,7 +25,7 @@ private:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
InterpreterExplainQuery getInterpreter(ContextPtr context) const; InterpreterExplainQuery getInterpreter(ContextPtr context) const;

View File

@ -83,7 +83,7 @@ StoragePtr TableFunctionFile::getStorage(const String & source,
return std::make_shared<StorageFile>(source, global_context->getUserFilesPath(), args); return std::make_shared<StorageFile>(source, global_context->getUserFilesPath(), args);
} }
ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
if (structure == "auto") if (structure == "auto")
{ {

View File

@ -20,7 +20,7 @@ public:
return name; return name;
} }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{ {

View File

@ -52,7 +52,7 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr
structure = checkAndGetLiteralArgument<String>(args[1], "structure"); structure = checkAndGetLiteralArgument<String>(args[1], "structure");
} }
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
if (structure == "auto") if (structure == "auto")
{ {
@ -98,9 +98,9 @@ Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr cont
return concatenateBlocks(blocks); return concatenateBlocks(blocks);
} }
StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
auto columns = getActualTableStructure(context); auto columns = getActualTableStructure(context, is_insert_query);
Block res_block = parseData(columns, context); Block res_block = parseData(columns, context);
auto res = std::make_shared<StorageValues>(StorageID(getDatabaseName(), table_name), columns, res_block); auto res = std::make_shared<StorageValues>(StorageID(getDatabaseName(), table_name), columns, res_block);
res->startup(); res->startup();

View File

@ -18,10 +18,10 @@ public:
bool hasStaticStructure() const override { return false; } bool hasStaticStructure() const override { return false; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Values"; } const char * getStorageTypeName() const override { return "Values"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
Block parseData(ColumnsDescription columns, ContextPtr context) const; Block parseData(ColumnsDescription columns, ContextPtr context) const;

View File

@ -97,7 +97,7 @@ void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, Co
} }
} }
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
if (structure == "auto") if (structure == "auto")
{ {
@ -113,9 +113,9 @@ ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextP
return parseColumnsListFromString(structure, context); return parseColumnsListFromString(structure, context);
} }
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
ColumnsDescription columns = getActualTableStructure(context); ColumnsDescription columns = getActualTableStructure(context, is_insert_query);
auto res = std::make_shared<StorageGenerateRandom>( auto res = std::make_shared<StorageGenerateRandom>(
StorageID(getDatabaseName(), table_name), columns, String{}, max_array_length, max_string_length, random_seed); StorageID(getDatabaseName(), table_name), columns, String{}, max_array_length, max_string_length, random_seed);
res->startup(); res->startup();

View File

@ -19,10 +19,10 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "GenerateRandom"; } const char * getStorageTypeName() const override { return "GenerateRandom"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
String structure = "auto"; String structure = "auto";

View File

@ -28,7 +28,7 @@ StoragePtr TableFunctionHDFS::getStorage(
compression_method_); compression_method_);
} }
ColumnsDescription TableFunctionHDFS::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionHDFS::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
if (structure == "auto") if (structure == "auto")
{ {

View File

@ -34,7 +34,7 @@ public:
return signature; return signature;
} }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{ {

View File

@ -43,7 +43,7 @@ void TableFunctionInput::parseArguments(const ASTPtr & ast_function, ContextPtr
structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context), "structure"); structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context), "structure");
} }
ColumnsDescription TableFunctionInput::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionInput::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
if (structure == "auto") if (structure == "auto")
{ {
@ -58,9 +58,9 @@ ColumnsDescription TableFunctionInput::getActualTableStructure(ContextPtr contex
return parseColumnsListFromString(structure, context); return parseColumnsListFromString(structure, context);
} }
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
auto storage = std::make_shared<StorageInput>(StorageID(getDatabaseName(), table_name), getActualTableStructure(context)); auto storage = std::make_shared<StorageInput>(StorageID(getDatabaseName(), table_name), getActualTableStructure(context, is_insert_query));
storage->startup(); storage->startup();
return storage; return storage;
} }

View File

@ -20,10 +20,10 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Input"; } const char * getStorageTypeName() const override { return "Input"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
String structure; String structure;

View File

@ -8,13 +8,13 @@
namespace DB namespace DB
{ {
StoragePtr TableFunctionMeiliSearch::executeImpl( StoragePtr TableFunctionMeiliSearch::executeImpl(
const ASTPtr & /* ast_function */, ContextPtr /*context*/, const String & table_name, ColumnsDescription /*cached_columns*/) const const ASTPtr & /* ast_function */, ContextPtr /*context*/, const String & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{ {
return std::make_shared<StorageMeiliSearch>( return std::make_shared<StorageMeiliSearch>(
StorageID(getDatabaseName(), table_name), configuration.value(), ColumnsDescription{}, ConstraintsDescription{}, String{}); StorageID(getDatabaseName(), table_name), configuration.value(), ColumnsDescription{}, ConstraintsDescription{}, String{});
} }
ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */) const ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */, bool /*is_insert_query*/) const
{ {
return StorageMeiliSearch::getTableStructureFromData(configuration.value()); return StorageMeiliSearch::getTableStructureFromData(configuration.value());
} }

View File

@ -13,11 +13,11 @@ public:
private: private:
StoragePtr executeImpl( StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override; const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "meilisearch"; } const char * getStorageTypeName() const override { return "meilisearch"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
std::optional<MeiliSearchConfiguration> configuration; std::optional<MeiliSearchConfiguration> configuration;

View File

@ -118,7 +118,7 @@ const TableFunctionMerge::DBToTableSetMap & TableFunctionMerge::getSourceDatabas
return *source_databases_and_tables; return *source_databases_and_tables;
} }
ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
for (const auto & db_with_tables : getSourceDatabasesAndTables(context)) for (const auto & db_with_tables : getSourceDatabasesAndTables(context))
{ {
@ -134,11 +134,11 @@ ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr contex
} }
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
auto res = std::make_shared<StorageMerge>( auto res = std::make_shared<StorageMerge>(
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),
getActualTableStructure(context), getActualTableStructure(context, is_insert_query),
String{}, String{},
source_database_name_or_regexp, source_database_name_or_regexp,
database_is_regexp, database_is_regexp,

View File

@ -17,13 +17,13 @@ public:
std::string getName() const override { return name; } std::string getName() const override { return name; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Merge"; } const char * getStorageTypeName() const override { return "Merge"; }
using TableSet = std::set<String>; using TableSet = std::set<String>;
using DBToTableSetMap = std::map<String, TableSet>; using DBToTableSetMap = std::map<String, TableSet>;
const DBToTableSetMap & getSourceDatabasesAndTables(ContextPtr context) const; const DBToTableSetMap & getSourceDatabasesAndTables(ContextPtr context) const;
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static TableSet getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context); static TableSet getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context);

View File

@ -27,9 +27,9 @@ namespace ErrorCodes
StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/, StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/,
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{ {
auto columns = getActualTableStructure(context); auto columns = getActualTableStructure(context, is_insert_query);
auto storage = std::make_shared<StorageMongoDB>( auto storage = std::make_shared<StorageMongoDB>(
StorageID(configuration->database, table_name), StorageID(configuration->database, table_name),
configuration->host, configuration->host,
@ -46,7 +46,7 @@ StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/,
return storage; return storage;
} }
ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
return parseColumnsListFromString(structure, context); return parseColumnsListFromString(structure, context);
} }

View File

@ -17,11 +17,11 @@ public:
private: private:
StoragePtr executeImpl( StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const ASTPtr & ast_function, ContextPtr context,
const std::string & table_name, ColumnsDescription cached_columns) const override; const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "MongoDB"; } const char * getStorageTypeName() const override { return "MongoDB"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
std::optional<StorageMongoDB::Configuration> configuration; std::optional<StorageMongoDB::Configuration> configuration;

View File

@ -57,7 +57,7 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings)); pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings));
} }
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
return StorageMySQL::getTableStructureFromData(*pool, configuration->database, configuration->table, context); return StorageMySQL::getTableStructureFromData(*pool, configuration->database, configuration->table, context);
} }
@ -66,7 +66,8 @@ StoragePtr TableFunctionMySQL::executeImpl(
const ASTPtr & /*ast_function*/, const ASTPtr & /*ast_function*/,
ContextPtr context, ContextPtr context,
const std::string & table_name, const std::string & table_name,
ColumnsDescription /*cached_columns*/) const ColumnsDescription /*cached_columns*/,
bool /*is_insert_query*/) const
{ {
auto res = std::make_shared<StorageMySQL>( auto res = std::make_shared<StorageMySQL>(
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),

View File

@ -23,10 +23,10 @@ public:
return name; return name;
} }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "MySQL"; } const char * getStorageTypeName() const override { return "MySQL"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
mutable std::optional<mysqlxx::PoolWithFailover> pool; mutable std::optional<mysqlxx::PoolWithFailover> pool;

View File

@ -32,14 +32,14 @@ void TableFunctionNull::parseArguments(const ASTPtr & ast_function, ContextPtr c
structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context), "structure"); structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context), "structure");
} }
ColumnsDescription TableFunctionNull::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionNull::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{ {
if (structure != "auto") if (structure != "auto")
return parseColumnsListFromString(structure, context); return parseColumnsListFromString(structure, context);
return default_structure; return default_structure;
} }
StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{ {
ColumnsDescription columns; ColumnsDescription columns;
if (structure != "auto") if (structure != "auto")

View File

@ -23,11 +23,11 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Null"; } const char * getStorageTypeName() const override { return "Null"; }
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
String structure = "auto"; String structure = "auto";
ColumnsDescription structure_hint; ColumnsDescription structure_hint;

View File

@ -23,14 +23,14 @@ namespace ErrorCodes
template <bool multithreaded> template <bool multithreaded>
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(ContextPtr /*context*/) const ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const
{ {
/// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418 /// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418
return ColumnsDescription{{{"number", std::make_shared<DataTypeUInt64>()}}}; return ColumnsDescription{{{"number", std::make_shared<DataTypeUInt64>()}}};
} }
template <bool multithreaded> template <bool multithreaded>
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{ {
if (const auto * function = ast_function->as<ASTFunction>()) if (const auto * function = ast_function->as<ASTFunction>())
{ {

View File

@ -19,12 +19,12 @@ public:
std::string getName() const override { return name; } std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; } bool hasStaticStructure() const override { return true; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "SystemNumbers"; } const char * getStorageTypeName() const override { return "SystemNumbers"; }
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const; UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
}; };

Some files were not shown because too many files have changed in this diff Show More