Merge branch 'ClickHouse:master' into use-idisk-in-databases

This commit is contained in:
tuanpach 2024-12-10 21:45:12 +07:00 committed by GitHub
commit f4e37406f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
159 changed files with 2863 additions and 615 deletions

View File

@ -35,7 +35,7 @@ curl https://clickhouse.com/ | sh
Every month we get together with the community (users, contributors, customers, those interested in learning more about ClickHouse) to discuss what is coming in the latest release. If you are interested in sharing what you've built on ClickHouse, let us know.
* [v24.10 Community Call](https://clickhouse.com/company/events/v24-10-community-release-call) - October 31
* [v24.12 Community Call](https://clickhouse.com/company/events/v24-12-community-release-call) - December 19
## Upcoming Events

View File

@ -103,6 +103,26 @@ namespace Net
///
/// The default limit is 100.
int getNameLengthLimit() const;
/// Returns the maximum length of a field name.
///
/// See setNameLengthLimit() for more information.
void setNameLengthLimit(int limit);
/// Sets the maximum length of a field name.
///
/// The default limit is 256.
int getValueLengthLimit() const;
/// Returns the maximum length of a field value.
///
/// See setValueLengthLimit() for more information.
void setValueLengthLimit(int limit);
/// Sets the maximum length of a field value.
///
/// The default limit is 8192.
bool hasToken(const std::string & fieldName, const std::string & token) const;
/// Returns true iff the field with the given fieldName contains
/// the given token. Tokens in a header field are expected to be
@ -157,12 +177,14 @@ namespace Net
enum Limits
/// Limits for basic sanity checks when reading a header
{
MAX_NAME_LENGTH = 256,
MAX_VALUE_LENGTH = 8192,
DFL_NAME_LENGTH_LIMIT = 256,
DFL_VALUE_LENGTH_LIMIT = 8192,
DFL_FIELD_LIMIT = 100
};
int _fieldLimit;
int _nameLengthLimit;
int _valueLengthLimit;
};

View File

@ -28,14 +28,18 @@ namespace Net {
MessageHeader::MessageHeader():
_fieldLimit(DFL_FIELD_LIMIT)
_fieldLimit(DFL_FIELD_LIMIT),
_nameLengthLimit(DFL_NAME_LENGTH_LIMIT),
_valueLengthLimit(DFL_VALUE_LENGTH_LIMIT)
{
}
MessageHeader::MessageHeader(const MessageHeader& messageHeader):
NameValueCollection(messageHeader),
_fieldLimit(DFL_FIELD_LIMIT)
_fieldLimit(DFL_FIELD_LIMIT),
_nameLengthLimit(DFL_NAME_LENGTH_LIMIT),
_valueLengthLimit(DFL_VALUE_LENGTH_LIMIT)
{
}
@ -80,12 +84,12 @@ void MessageHeader::read(std::istream& istr)
throw MessageException("Too many header fields");
name.clear();
value.clear();
while (ch != eof && ch != ':' && ch != '\n' && name.length() < MAX_NAME_LENGTH) { name += ch; ch = buf.sbumpc(); }
while (ch != eof && ch != ':' && ch != '\n' && name.length() < _nameLengthLimit) { name += ch; ch = buf.sbumpc(); }
if (ch == '\n') { ch = buf.sbumpc(); continue; } // ignore invalid header lines
if (ch != ':') throw MessageException("Field name too long/no colon found");
if (ch != eof) ch = buf.sbumpc(); // ':'
while (ch != eof && Poco::Ascii::isSpace(ch) && ch != '\r' && ch != '\n') ch = buf.sbumpc();
while (ch != eof && ch != '\r' && ch != '\n' && value.length() < MAX_VALUE_LENGTH) { value += ch; ch = buf.sbumpc(); }
while (ch != eof && ch != '\r' && ch != '\n' && value.length() < _valueLengthLimit) { value += ch; ch = buf.sbumpc(); }
if (ch == '\r') ch = buf.sbumpc();
if (ch == '\n')
ch = buf.sbumpc();
@ -93,7 +97,7 @@ void MessageHeader::read(std::istream& istr)
throw MessageException("Field value too long/no CRLF found");
while (ch == ' ' || ch == '\t') // folding
{
while (ch != eof && ch != '\r' && ch != '\n' && value.length() < MAX_VALUE_LENGTH) { value += ch; ch = buf.sbumpc(); }
while (ch != eof && ch != '\r' && ch != '\n' && value.length() < _valueLengthLimit) { value += ch; ch = buf.sbumpc(); }
if (ch == '\r') ch = buf.sbumpc();
if (ch == '\n')
ch = buf.sbumpc();
@ -122,6 +126,32 @@ void MessageHeader::setFieldLimit(int limit)
}
/// Returns the maximum allowed length of a header field name,
/// as enforced by read() while parsing incoming headers.
/// Defaults to DFL_NAME_LENGTH_LIMIT (256); see setNameLengthLimit().
int MessageHeader::getNameLengthLimit() const
{
return _nameLengthLimit;
}
/// Sets the maximum allowed length of a header field name.
/// read() stops accumulating a name once it reaches this limit and
/// then fails with "Field name too long/no colon found" if no colon follows.
void MessageHeader::setNameLengthLimit(int limit)
{
poco_assert(limit >= 0); // negative limits are programmer error
_nameLengthLimit = limit;
}
/// Returns the maximum allowed length of a header field value,
/// as enforced by read() while parsing incoming headers (including folded lines).
/// Defaults to DFL_VALUE_LENGTH_LIMIT (8192); see setValueLengthLimit().
int MessageHeader::getValueLengthLimit() const
{
return _valueLengthLimit;
}
/// Sets the maximum allowed length of a header field value.
/// read() stops accumulating a value once it reaches this limit and
/// then fails with "Field value too long/no CRLF found" if no CRLF follows.
void MessageHeader::setValueLengthLimit(int limit)
{
poco_assert(limit >= 0); // negative limits are programmer error
_valueLengthLimit = limit;
}
bool MessageHeader::hasToken(const std::string& fieldName, const std::string& token) const
{
std::string field = get(fieldName, "");

View File

@ -120,6 +120,12 @@ setup_aws_credentials() {
local minio_root_user=${MINIO_ROOT_USER:-clickhouse}
local minio_root_password=${MINIO_ROOT_PASSWORD:-clickhouse}
mkdir -p ~/.aws
if [[ -f ~/.aws/credentials ]]; then
if grep -q "^\[default\]" ~/.aws/credentials; then
echo "The credentials file contains a [default] section."
return
fi
fi
cat <<EOT >> ~/.aws/credentials
[default]
aws_access_key_id=${minio_root_user}

View File

@ -101,3 +101,4 @@ wadllib==1.3.6
websocket-client==1.8.0
wheel==0.38.1
zipp==1.0.0
jinja2==3.1.3

View File

@ -36,6 +36,8 @@ Upper and lower bounds can be specified to limit Memory engine table size, effec
- Requires `max_rows_to_keep`
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
- Default value: `0`
- `compress` - Whether to compress data in memory.
- Default value: `false`
## Usage {#usage}

View File

@ -36,8 +36,8 @@ Alias:
**Arguments**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Substring to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/string.md).
- `needle` — Substring to be searched. [String](../data-types/string.md).
- `start_pos` — Position (1-based) in `haystack` at which the search starts. [UInt](../data-types/int-uint.md). Optional.
**Returned value**
@ -203,7 +203,7 @@ multiSearchAllPositions(haystack, [needle1, needle2, ..., needleN])
**Arguments**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -238,7 +238,7 @@ multiSearchAllPositionsCaseInsensitive(haystack, [needle1, needle2, ..., needleN
**Parameters**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -272,7 +272,7 @@ multiSearchAllPositionsUTF8(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — UTF-8 encoded string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 encoded string in which the search is performed. [String](../data-types/string.md).
- `needle` — UTF-8 encoded substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -308,7 +308,7 @@ multiSearchAllPositionsCaseInsensitiveUTF8(haystack, [needle1, needle2, ..., nee
**Parameters**
- `haystack` — UTF-8 encoded string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 encoded string in which the search is performed. [String](../data-types/string.md).
- `needle` — UTF-8 encoded substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -346,7 +346,7 @@ multiSearchFirstPosition(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -380,7 +380,7 @@ multiSearchFirstPositionCaseInsensitive(haystack, [needle1, needle2, ..., needle
**Parameters**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Array of substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -414,7 +414,7 @@ multiSearchFirstPositionUTF8(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — UTF-8 string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 string in which the search is performed. [String](../data-types/string.md).
- `needle` — Array of UTF-8 substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -450,7 +450,7 @@ multiSearchFirstPositionCaseInsensitiveUTF8(haystack, [needle1, needle2, ..., ne
**Parameters**
- `haystack` — UTF-8 string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 string in which the search is performed. [String](../data-types/string.md).
- `needle` — Array of UTF-8 substrings to be searched. [Array](../data-types/array.md)
**Returned value**
@ -487,7 +487,7 @@ multiSearchFirstIndex(haystack, [needle1, needle2, ..., needleN])
```
**Parameters**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -520,7 +520,7 @@ multiSearchFirstIndexCaseInsensitive(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -553,7 +553,7 @@ multiSearchFirstIndexUTF8(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — UTF-8 string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 string in which the search is performed. [String](../data-types/string.md).
- `needle` — Array of UTF-8 substrings to be searched. [Array](../data-types/array.md)
**Returned value**
@ -588,7 +588,7 @@ multiSearchFirstIndexCaseInsensitiveUTF8(haystack, [needle1, needle2, ..., needl
**Parameters**
- `haystack` — UTF-8 string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 string in which the search is performed. [String](../data-types/string.md).
- `needle` — Array of UTF-8 substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -625,7 +625,7 @@ multiSearchAny(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -659,7 +659,7 @@ multiSearchAnyCaseInsensitive(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Substrings to be searched. [Array](../data-types/array.md)
**Returned value**
@ -693,7 +693,7 @@ multiSearchAnyUTF8(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — UTF-8 string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 string in which the search is performed. [String](../data-types/string.md).
- `needle` — UTF-8 substrings to be searched. [Array](../data-types/array.md).
**Returned value**
@ -729,7 +729,7 @@ multiSearchAnyCaseInsensitiveUTF8(haystack, [needle1, needle2, ..., needleN])
**Parameters**
- `haystack` — UTF-8 string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 string in which the search is performed. [String](../data-types/string.md).
- `needle` — UTF-8 substrings to be searched. [Array](../data-types/array.md)
**Returned value**
@ -1414,8 +1414,8 @@ countSubstrings(haystack, needle[, start_pos])
**Arguments**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Substring to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/enum.md).
- `needle` — Substring to be searched. [String](../data-types/string.md).
- `start_pos` Position (1-based) in `haystack` at which the search starts. [UInt](../data-types/int-uint.md). Optional.
**Returned value**
@ -1461,8 +1461,8 @@ countSubstringsCaseInsensitive(haystack, needle[, start_pos])
**Arguments**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Substring to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/enum.md).
- `needle` — Substring to be searched. [String](../data-types/string.md).
- `start_pos` Position (1-based) in `haystack` at which the search starts. [UInt](../data-types/int-uint.md). Optional.
**Returned value**
@ -1513,8 +1513,8 @@ countSubstringsCaseInsensitiveUTF8(haystack, needle[, start_pos])
**Arguments**
- `haystack` — UTF-8 string in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Substring to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — UTF-8 string in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/enum.md).
- `needle` — Substring to be searched. [String](../data-types/string.md).
- `start_pos` Position (1-based) in `haystack` at which the search starts. [UInt](../data-types/int-uint.md). Optional.
**Returned value**
@ -1565,7 +1565,7 @@ countMatches(haystack, pattern)
**Arguments**
- `haystack` — The string to search in. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — The string to search in. [String](../data-types/string.md).
- `pattern` — The regular expression with [re2 regular expression syntax](https://github.com/google/re2/wiki/Syntax). [String](../data-types/string.md).
**Returned value**
@ -1610,7 +1610,7 @@ countMatchesCaseInsensitive(haystack, pattern)
**Arguments**
- `haystack` — The string to search in. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — The string to search in. [String](../data-types/string.md).
- `pattern` — The regular expression with [re2 regular expression syntax](https://github.com/google/re2/wiki/Syntax). [String](../data-types/string.md).
**Returned value**
@ -1647,8 +1647,8 @@ Alias: `REGEXP_EXTRACT(haystack, pattern[, index])`.
**Arguments**
- `haystack` — String, in which regexp pattern will to be matched. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `pattern` — String, regexp expression, must be constant. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the regexp pattern will be matched. [String](../data-types/string.md).
- `pattern` — String, regexp expression, must be constant. [String](../data-types/string.md).
- `index` An integer number greater or equal 0 with default 1. It represents which regex group to extract. [UInt or Int](../data-types/int-uint.md). Optional.
**Returned value**
@ -1687,8 +1687,8 @@ hasSubsequence(haystack, needle)
**Arguments**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Subsequence to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Subsequence to be searched. [String](../data-types/string.md).
**Returned value**
@ -1722,8 +1722,8 @@ hasSubsequenceCaseInsensitive(haystack, needle)
**Arguments**
- `haystack` — String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Subsequence to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. [String](../data-types/string.md).
- `needle` — Subsequence to be searched. [String](../data-types/string.md).
**Returned value**
@ -1757,8 +1757,8 @@ hasSubsequenceUTF8(haystack, needle)
**Arguments**
- `haystack` — String in which the search is performed. UTF-8 encoded [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Subsequence to be searched. UTF-8 encoded [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. UTF-8 encoded [String](../data-types/string.md).
- `needle` — Subsequence to be searched. UTF-8 encoded [String](../data-types/string.md).
**Returned value**
@ -1792,8 +1792,8 @@ hasSubsequenceCaseInsensitiveUTF8(haystack, needle)
**Arguments**
- `haystack` — String in which the search is performed. UTF-8 encoded [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — Subsequence to be searched. UTF-8 encoded [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack` — String in which the search is performed. UTF-8 encoded [String](../data-types/string.md).
- `needle` — Subsequence to be searched. UTF-8 encoded [String](../data-types/string.md).
**Returned value**
@ -1827,7 +1827,7 @@ hasToken(haystack, token)
**Parameters**
- `haystack`: String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack`: String in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/enum.md).
- `token`: Maximal-length substring between two non-alphanumeric ASCII characters (or the boundaries of the haystack).
**Returned value**
@ -1862,12 +1862,12 @@ hasTokenOrNull(haystack, token)
**Parameters**
- `haystack`: String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack`: String in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/enum.md).
- `token`: Maximal length substring between two non alphanumeric ASCII characters (or boundaries of haystack).
**Returned value**
- 1, if the token is present in the haystack, 0 if it is not present, and null if the token is ill formed.
- 1, if the token is present in the haystack, 0 if it is not present, and null if the token is ill formed.
**Implementation details**
@ -1899,7 +1899,7 @@ hasTokenCaseInsensitive(haystack, token)
**Parameters**
- `haystack`: String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack`: String in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/enum.md).
- `token`: Maximal length substring between two non alphanumeric ASCII characters (or boundaries of haystack).
**Returned value**
@ -1934,7 +1934,7 @@ hasTokenCaseInsensitiveOrNull(haystack, token)
**Parameters**
- `haystack`: String in which the search is performed. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `haystack`: String in which the search is performed. [String](../data-types/string.md) or [Enum](../data-types/enum.md).
- `token`: Maximal length substring between two non alphanumeric ASCII characters (or boundaries of haystack).
**Returned value**

View File

@ -157,13 +157,14 @@ For your convenience, the old documentation is located [here](https://pastila.nl
## Refreshable Materialized View {#refreshable-materialized-view}
```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
REFRESH EVERY|AFTER interval [OFFSET interval]
RANDOMIZE FOR interval
DEPENDS ON [db.]name [, [db.]name [, ...]]
SETTINGS name = value [, name = value [, ...]]
[RANDOMIZE FOR interval]
[DEPENDS ON [db.]name [, [db.]name [, ...]]]
[SETTINGS name = value [, name = value [, ...]]]
[APPEND]
[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
[TO[db.]name] [(columns)] [ENGINE = engine]
[EMPTY]
AS SELECT ...
[COMMENT 'comment']
```
@ -281,7 +282,7 @@ This replaces *all* refresh parameters at once: schedule, dependencies, settings
The status of all refreshable materialized views is available in table [`system.view_refreshes`](../../../operations/system-tables/view_refreshes.md). In particular, it contains refresh progress (if running), last and next refresh time, exception message if a refresh failed.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|WAIT|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To wait for a refresh to complete, use [`SYSTEM WAIT VIEW`](../system.md#refreshable-materialized-views). In particular, useful for waiting for initial refresh after creating a view.

View File

@ -10,7 +10,6 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionSum.h>
#include <Core/DecimalFunctions.h>
#include <Core/IResolvedFunction.h>
#include "config.h"
@ -141,6 +140,9 @@ public:
bool isCompilable() const override
{
if constexpr (!canBeNativeType<Numerator>() || !canBeNativeType<Denominator>())
return false;
bool can_be_compiled = true;
for (const auto & argument : this->argument_types)
@ -158,7 +160,8 @@ public:
b.CreateMemSet(aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), sizeof(Fraction), llvm::assumeAligned(this->alignOfData()));
}
void compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override
void compileMergeImpl(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const
requires(canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
{
llvm::IRBuilder<> & b = static_cast<llvm::IRBuilder<> &>(builder);
@ -185,7 +188,15 @@ public:
b.CreateStore(denominator_result_value, denominator_dst_ptr);
}
llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
void
compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override
{
if constexpr (canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
compileMergeImpl(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr);
}
llvm::Value * compileGetResultImpl(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const
requires(canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
{
llvm::IRBuilder<> & b = static_cast<llvm::IRBuilder<> &>(builder);
@ -204,6 +215,13 @@ public:
return b.CreateFDiv(double_numerator, double_denominator);
}
llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
{
if constexpr (canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
return compileGetResultImpl(builder, aggregate_data_ptr);
return nullptr;
}
#endif
private:
@ -308,7 +326,8 @@ public:
#if USE_EMBEDDED_COMPILER
void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
void compileAddImpl(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const
requires(canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
{
llvm::IRBuilder<> & b = static_cast<llvm::IRBuilder<> &>(builder);
@ -327,6 +346,12 @@ public:
b.CreateStore(denominator_value_updated, denominator_ptr);
}
void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
{
if constexpr (canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
compileAddImpl(builder, aggregate_data_ptr, arguments);
}
#endif
private:

View File

@ -59,13 +59,13 @@ public:
bool isCompilable() const override
{
bool can_be_compiled = Base::isCompilable();
can_be_compiled &= canBeNativeType<Weight>();
return can_be_compiled;
if constexpr (!canBeNativeType<Weight>() || !canBeNativeType<Numerator>() || !canBeNativeType<Denominator>())
return false;
return Base::isCompilable();
}
void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
void compileAddImpl(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const
requires(canBeNativeType<Weight>() && canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
{
llvm::IRBuilder<> & b = static_cast<llvm::IRBuilder<> &>(builder);
@ -94,6 +94,26 @@ public:
b.CreateStore(denominator_value_updated, denominator_ptr);
}
void
compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override
{
if constexpr (canBeNativeType<Weight>() && canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
Base::compileMergeImpl(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr);
}
llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
{
if constexpr (canBeNativeType<Weight>() && canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
return Base::compileGetResultImpl(builder, aggregate_data_ptr);
return nullptr;
}
void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
{
if constexpr (canBeNativeType<Weight>() && canBeNativeType<Numerator>() && canBeNativeType<Denominator>())
compileAddImpl(builder, aggregate_data_ptr, arguments);
}
#endif
};
@ -104,7 +124,7 @@ bool allowTypes(const DataTypePtr& left, const DataTypePtr& right) noexcept
constexpr auto allow = [](WhichDataType t)
{
return t.isInt() || t.isUInt() || t.isFloat();
return t.isInt() || t.isUInt() || t.isNativeFloat();
};
return allow(l_dt) && allow(r_dt);

View File

@ -1,12 +1,13 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Common/CurrentThread.h>
#include <Core/Settings.h>
static constexpr size_t MAX_AGGREGATE_FUNCTION_NAME_LENGTH = 1000;
@ -349,4 +350,9 @@ AggregateFunctionFactory & AggregateFunctionFactory::instance()
return ret;
}
/// Returns true if the given AST function node names a registered
/// aggregate function, by consulting the global factory instance.
bool AggregateUtils::isAggregateFunction(const ASTFunction & node)
{
    const auto & factory = AggregateFunctionFactory::instance();
    return factory.isAggregateFunctionName(node.name);
}
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/NullsAction.h>
#include <Common/IFactoryWithAliases.h>
@ -23,6 +22,8 @@ class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;
class ASTFunction;
/**
* The invoker has arguments: name of aggregate function, types of arguments, values of parameters.
* Parameters are for "parametric" aggregate functions.
@ -114,10 +115,7 @@ private:
struct AggregateUtils
{
static bool isAggregateFunction(const ASTFunction & node)
{
return AggregateFunctionFactory::instance().isAggregateFunctionName(node.name);
}
static bool isAggregateFunction(const ASTFunction & node);
};
const String & getAggregateFunctionCanonicalNameIfAny(const String & name);

View File

@ -52,6 +52,9 @@ namespace S3RequestSetting
{
extern const S3RequestSettingsBool allow_native_copy;
extern const S3RequestSettingsString storage_class_name;
extern const S3RequestSettingsUInt64 http_max_fields;
extern const S3RequestSettingsUInt64 http_max_field_name_size;
extern const S3RequestSettingsUInt64 http_max_field_value_size;
}
namespace ErrorCodes
@ -100,6 +103,9 @@ namespace
client_configuration.requestTimeoutMs = 60 * 60 * 1000;
client_configuration.http_keep_alive_timeout = S3::DEFAULT_KEEP_ALIVE_TIMEOUT;
client_configuration.http_keep_alive_max_requests = S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS;
client_configuration.http_max_fields = request_settings[S3RequestSetting::http_max_fields];
client_configuration.http_max_field_name_size = request_settings[S3RequestSetting::http_max_field_name_size];
client_configuration.http_max_field_value_size = request_settings[S3RequestSetting::http_max_field_value_size];
S3::ClientSettings client_settings{
.use_virtual_addressing = s3_uri.is_virtual_hosted_style,

View File

@ -29,6 +29,7 @@
#include <boost/range/adaptor/map.hpp>
#include <filesystem>
#include <future>
#include <ranges>
namespace fs = std::filesystem;

View File

@ -8,7 +8,9 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/ThreadPool_fwd.h>
#include <filesystem>
#include <future>
namespace DB

View File

@ -19,6 +19,7 @@
#include <base/scope_guard.h>
#include <iostream>
#include <thread>
#include <sys/resource.h>
#include <sys/time.h>

View File

@ -3,6 +3,7 @@
#include <Client/ProgressTable.h>
#include <Client/Suggest.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <Common/DNSResolver.h>
#include <Common/InterruptListener.h>

View File

@ -1024,10 +1024,10 @@ void ColumnArray::updatePermutationWithCollation(const Collator & collator, Perm
DefaultPartialSort());
}
ColumnPtr ColumnArray::compress() const
ColumnPtr ColumnArray::compress(bool force_compression) const
{
ColumnPtr data_compressed = data->compress();
ColumnPtr offsets_compressed = offsets->compress();
ColumnPtr data_compressed = data->compress(force_compression);
ColumnPtr offsets_compressed = offsets->compress(force_compression);
size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize();

View File

@ -159,7 +159,7 @@ public:
/// For example, `getDataInRange(0, size())` is the same as `getDataPtr()->clone()`.
MutableColumnPtr getDataInRange(size_t start, size_t length) const;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
}
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool always_compress)
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool force_compression)
{
size_t max_dest_size = LZ4_COMPRESSBOUND(data_size);
@ -35,7 +35,8 @@ std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, si
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress column");
/// If compression is inefficient.
if (!always_compress && static_cast<size_t>(compressed_size) * 2 > data_size)
const size_t threshold = force_compression ? 1 : 2;
if (static_cast<size_t>(compressed_size) * threshold > data_size)
return {};
/// Shrink to fit.

View File

@ -70,9 +70,11 @@ public:
/// Helper methods for compression.
/// If data is not worth to be compressed and not 'always_compress' - returns nullptr.
/// If data is not worth to be compressed - returns nullptr.
/// By default it requires that compressed data is at least 50% smaller than original.
/// With `force_compression` set to true, it requires compressed data to be not larger than the source data.
/// Note: shared_ptr is to allow to be captured by std::function.
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool always_compress);
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool force_compression);
static void decompressBuffer(
const void * compressed_data, void * decompressed_data, size_t compressed_size, size_t decompressed_size);

View File

@ -478,7 +478,7 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets & offsets) const
}
template <is_decimal T>
ColumnPtr ColumnDecimal<T>::compress() const
ColumnPtr ColumnDecimal<T>::compress(bool force_compression) const
{
const size_t data_size = data.size();
const size_t source_size = data_size * sizeof(T);
@ -487,7 +487,7 @@ ColumnPtr ColumnDecimal<T>::compress() const
if (source_size < 4096) /// A wild guess.
return ColumnCompressed::wrap(this->getPtr());
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
if (!compressed)
return ColumnCompressed::wrap(this->getPtr());

View File

@ -140,7 +140,7 @@ public:
return false;
}
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void insertValue(const T value) { data.push_back(value); }
Container & getData() { return data; }

View File

@ -991,9 +991,9 @@ void ColumnDynamic::updatePermutation(IColumn::PermutationSortDirection directio
updatePermutationImpl(limit, res, equal_ranges, ComparatorDescendingStable(*this, nan_direction_hint), comparator_equal, DefaultSort(), DefaultPartialSort());
}
ColumnPtr ColumnDynamic::compress() const
ColumnPtr ColumnDynamic::compress(bool force_compression) const
{
ColumnPtr variant_compressed = variant_column_ptr->compress();
ColumnPtr variant_compressed = variant_column_ptr->compress(force_compression);
size_t byte_size = variant_compressed->byteSize();
return ColumnCompressed::create(size(), byte_size,
[my_variant_compressed = std::move(variant_compressed), my_variant_info = variant_info, my_max_dynamic_types = max_dynamic_types, my_global_max_dynamic_types = global_max_dynamic_types, my_statistics = statistics]() mutable

View File

@ -335,7 +335,7 @@ public:
return false;
}
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
double getRatioOfDefaultRows(double sample_ratio) const override
{

View File

@ -419,7 +419,7 @@ void ColumnFixedString::getExtremes(Field & min, Field & max) const
get(max_idx, max);
}
ColumnPtr ColumnFixedString::compress() const
ColumnPtr ColumnFixedString::compress(bool force_compression) const
{
size_t source_size = chars.size();
@ -427,7 +427,7 @@ ColumnPtr ColumnFixedString::compress() const
if (source_size < 4096) /// A wild guess.
return ColumnCompressed::wrap(this->getPtr());
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, false);
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, force_compression);
if (!compressed)
return ColumnCompressed::wrap(this->getPtr());

View File

@ -175,7 +175,7 @@ public:
ColumnPtr replicate(const Offsets & offsets) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void reserve(size_t size) override
{

View File

@ -352,9 +352,9 @@ bool ColumnMap::dynamicStructureEquals(const IColumn & rhs) const
return false;
}
ColumnPtr ColumnMap::compress() const
ColumnPtr ColumnMap::compress(bool force_compression) const
{
auto compressed = nested->compress();
auto compressed = nested->compress(force_compression);
const auto byte_size = compressed->byteSize();
/// The order of evaluation of function arguments is unspecified
/// and could cause interacting with object in moved-from state

View File

@ -120,7 +120,7 @@ public:
const ColumnTuple & getNestedData() const { return assert_cast<const ColumnTuple &>(getNestedColumn().getData()); }
ColumnTuple & getNestedData() { return assert_cast<ColumnTuple &>(getNestedColumn().getData()); }
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
bool hasDynamicStructure() const override { return nested->hasDynamicStructure(); }
bool dynamicStructureEquals(const IColumn & rhs) const override;

View File

@ -773,10 +773,10 @@ void ColumnNullable::protect()
getNullMapColumn().protect();
}
ColumnPtr ColumnNullable::compress() const
ColumnPtr ColumnNullable::compress(bool force_compression) const
{
ColumnPtr nested_compressed = nested_column->compress();
ColumnPtr null_map_compressed = null_map->compress();
ColumnPtr nested_compressed = nested_column->compress(force_compression);
ColumnPtr null_map_compressed = null_map->compress(force_compression);
size_t byte_size = nested_column->byteSize() + null_map->byteSize();

View File

@ -141,7 +141,7 @@ public:
// Special function for nullable minmax index
void getExtremesNullLast(Field & min, Field & max) const;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;

View File

@ -1225,14 +1225,14 @@ bool ColumnObject::structureEquals(const IColumn & rhs) const
return true;
}
ColumnPtr ColumnObject::compress() const
ColumnPtr ColumnObject::compress(bool force_compression) const
{
std::unordered_map<String, ColumnPtr> compressed_typed_paths;
compressed_typed_paths.reserve(typed_paths.size());
size_t byte_size = 0;
for (const auto & [path, column] : typed_paths)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(force_compression);
byte_size += compressed_column->byteSize();
compressed_typed_paths[path] = std::move(compressed_column);
}
@ -1241,12 +1241,12 @@ ColumnPtr ColumnObject::compress() const
compressed_dynamic_paths.reserve(dynamic_paths_ptrs.size());
for (const auto & [path, column] : dynamic_paths_ptrs)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(force_compression);
byte_size += compressed_column->byteSize();
compressed_dynamic_paths[path] = std::move(compressed_column);
}
auto compressed_shared_data = shared_data->compress();
auto compressed_shared_data = shared_data->compress(force_compression);
byte_size += compressed_shared_data->byteSize();
auto decompress =

View File

@ -171,7 +171,7 @@ public:
bool structureEquals(const IColumn & rhs) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void finalize() override;
bool isFinalized() const override;

View File

@ -774,10 +774,10 @@ UInt64 ColumnSparse::getNumberOfDefaultRows() const
return _size - offsets->size();
}
ColumnPtr ColumnSparse::compress() const
ColumnPtr ColumnSparse::compress(bool force_compression) const
{
auto values_compressed = values->compress();
auto offsets_compressed = offsets->compress();
auto values_compressed = values->compress(force_compression);
auto offsets_compressed = offsets->compress(force_compression);
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();

View File

@ -147,7 +147,7 @@ public:
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;

View File

@ -628,33 +628,46 @@ void ColumnString::getExtremes(Field & min, Field & max) const
get(max_idx, max);
}
ColumnPtr ColumnString::compress() const
ColumnPtr ColumnString::compress(bool force_compression) const
{
const size_t source_chars_size = chars.size();
const size_t source_offsets_elements = offsets.size();
const size_t source_offsets_size = source_offsets_elements * sizeof(Offset);
/// Don't compress small blocks.
if (source_chars_size < 4096) /// A wild guess.
if (source_chars_size < min_size_to_compress)
{
return ColumnCompressed::wrap(this->getPtr());
}
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, false);
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, force_compression);
/// Return original column if not compressible.
if (!chars_compressed)
{
return ColumnCompressed::wrap(this->getPtr());
}
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, true);
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, force_compression);
const bool offsets_were_compressed = !!offsets_compressed;
/// Offsets are not compressible. Use the source data.
if (!offsets_compressed)
{
offsets_compressed = std::make_shared<Memory<>>(source_offsets_size);
memcpy(offsets_compressed->data(), offsets.data(), source_offsets_size);
}
const size_t chars_compressed_size = chars_compressed->size();
const size_t offsets_compressed_size = offsets_compressed->size();
return ColumnCompressed::create(source_offsets_elements, chars_compressed_size + offsets_compressed_size,
[
my_chars_compressed = std::move(chars_compressed),
my_offsets_compressed = std::move(offsets_compressed),
source_chars_size,
source_offsets_elements
]
return ColumnCompressed::create(
source_offsets_elements,
chars_compressed_size + offsets_compressed_size,
[my_chars_compressed = std::move(chars_compressed),
my_offsets_compressed = std::move(offsets_compressed),
source_chars_size,
source_offsets_elements,
offsets_were_compressed]
{
auto res = ColumnString::create();
@ -664,8 +677,18 @@ ColumnPtr ColumnString::compress() const
ColumnCompressed::decompressBuffer(
my_chars_compressed->data(), res->getChars().data(), my_chars_compressed->size(), source_chars_size);
ColumnCompressed::decompressBuffer(
my_offsets_compressed->data(), res->getOffsets().data(), my_offsets_compressed->size(), source_offsets_elements * sizeof(Offset));
if (offsets_were_compressed)
{
ColumnCompressed::decompressBuffer(
my_offsets_compressed->data(),
res->getOffsets().data(),
my_offsets_compressed->size(),
source_offsets_elements * sizeof(Offset));
}
else
{
memcpy(res->getOffsets().data(), my_offsets_compressed->data(), my_offsets_compressed->size());
}
return res;
});

View File

@ -29,6 +29,8 @@ public:
using Char = UInt8;
using Chars = PaddedPODArray<UInt8>;
static constexpr size_t min_size_to_compress = 4096;
private:
friend class COWHelper<IColumnHelper<ColumnString>, ColumnString>;
@ -272,7 +274,7 @@ public:
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void reserve(size_t n) override;
size_t capacity() const override;

View File

@ -796,7 +796,7 @@ void ColumnTuple::takeDynamicStructureFromSourceColumns(const Columns & source_c
}
ColumnPtr ColumnTuple::compress() const
ColumnPtr ColumnTuple::compress(bool force_compression) const
{
if (columns.empty())
{
@ -812,7 +812,7 @@ ColumnPtr ColumnTuple::compress() const
compressed.reserve(columns.size());
for (const auto & column : columns)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(force_compression);
byte_size += compressed_column->byteSize();
compressed.emplace_back(std::move(compressed_column));
}

View File

@ -125,7 +125,7 @@ public:
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;
bool isCollationSupported() const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void finalize() override;
bool isFinalized() const override;

View File

@ -1426,16 +1426,16 @@ bool ColumnVariant::dynamicStructureEquals(const IColumn & rhs) const
return true;
}
ColumnPtr ColumnVariant::compress() const
ColumnPtr ColumnVariant::compress(bool force_compression) const
{
ColumnPtr local_discriminators_compressed = local_discriminators->compress();
ColumnPtr offsets_compressed = offsets->compress();
ColumnPtr local_discriminators_compressed = local_discriminators->compress(force_compression);
ColumnPtr offsets_compressed = offsets->compress(force_compression);
size_t byte_size = local_discriminators_compressed->byteSize() + offsets_compressed->byteSize();
Columns compressed;
compressed.reserve(variants.size());
for (const auto & variant : variants)
{
auto compressed_variant = variant->compress();
auto compressed_variant = variant->compress(force_compression);
byte_size += compressed_variant->byteSize();
compressed.emplace_back(std::move(compressed_variant));
}

View File

@ -254,7 +254,7 @@ public:
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;

View File

@ -951,7 +951,7 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
}
template <typename T>
ColumnPtr ColumnVector<T>::compress() const
ColumnPtr ColumnVector<T>::compress(bool force_compression) const
{
const size_t data_size = data.size();
const size_t source_size = data_size * sizeof(T);
@ -960,7 +960,7 @@ ColumnPtr ColumnVector<T>::compress() const
if (source_size < 4096) /// A wild guess.
return ColumnCompressed::wrap(this->getPtr());
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
if (!compressed)
return ColumnCompressed::wrap(this->getPtr());

View File

@ -287,7 +287,7 @@ public:
ColumnPtr createWithOffsets(const IColumn::Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
/// Replace elements that match the filter with zeroes. If inverted replaces not matched elements.
void applyZeroMap(const IColumn::Filter & filt, bool inverted = false);

View File

@ -601,7 +601,8 @@ public:
/// Compress column in memory to some representation that allows to decompress it back.
/// Return itself if compression is not applicable for this column type.
[[nodiscard]] virtual Ptr compress() const
/// The flag `force_compression` indicates that compression should be performed even if it's not efficient (if only compression factor < 1).
[[nodiscard]] virtual Ptr compress([[maybe_unused]] bool force_compression) const
{
/// No compression by default.
return getPtr();

View File

@ -0,0 +1,88 @@
#include <gtest/gtest.h>
#include <Columns/ColumnString.h>
#include <Common/randomSeed.h>
#include <Common/thread_local_rng.h>
using namespace DB;
static pcg64 rng(randomSeed());
constexpr size_t bytes_per_string = sizeof(uint64_t) + 1;
/// Column should have enough bytes to be compressed
constexpr size_t column_size = ColumnString::min_size_to_compress / bytes_per_string + 42;
/// Fill the column with random 8-byte strings (plus a trailing '\0'), which LZ4
/// cannot shrink, and verify that compress(force_compression=true) falls back to
/// wrapping the original column instead of storing a larger "compressed" payload.
TEST(ColumnString, Incompressible)
{
auto col = ColumnString::create();
auto & chars = col->getChars();
auto & offsets = col->getOffsets();
/// Pre-size chars so each string's bytes can be written in place below.
chars.resize(column_size * bytes_per_string);
for (size_t i = 0; i < column_size; ++i)
{
/// Random 64-bit payload makes the chars buffer effectively incompressible.
const uint64_t value = rng();
memcpy(&chars[i * bytes_per_string], &value, sizeof(uint64_t));
chars[i * bytes_per_string + sizeof(uint64_t)] = '\0';
offsets.push_back((i + 1) * bytes_per_string);
}
auto compressed = col->compress(true);
auto decompressed = compressed->decompress();
// When column is incompressible, we return the original column wrapped in CompressedColumn
ASSERT_EQ(decompressed.get(), col.get());
/// Size and allocated bytes must match the source exactly, since no new
/// buffers were created — the wrapper shares the original column.
ASSERT_EQ(compressed->size(), col->size());
ASSERT_EQ(compressed->allocatedBytes(), col->allocatedBytes());
ASSERT_EQ(decompressed->size(), col->size());
ASSERT_EQ(decompressed->allocatedBytes(), col->allocatedBytes());
}
/// Highly repetitive chars (same constant in every slot) compress well, while
/// only a single offset is pushed, so the offsets buffer is too small to gain
/// anything from compression and is expected to be kept uncompressed.
/// Verifies a real compressed copy is produced (not a wrapper of the source).
TEST(ColumnString, CompressibleCharsAndIncompressibleOffsets)
{
auto col = ColumnString::create();
auto & chars = col->getChars();
auto & offsets = col->getOffsets();
chars.resize(column_size * bytes_per_string);
for (size_t i = 0; i < column_size; ++i)
{
/// Constant payload — the chars buffer is trivially compressible.
static const uint64_t value = 42;
memcpy(&chars[i * bytes_per_string], &value, sizeof(uint64_t));
chars[i * bytes_per_string + sizeof(uint64_t)] = '\0';
}
/// Single offset for the whole buffer: one huge string, tiny offsets array.
offsets.push_back(chars.size());
auto compressed = col->compress(true);
auto decompressed = compressed->decompress();
// For actually compressed column only compressed `chars` and `offsets` arrays are stored.
// Upon decompression, a new column is created.
ASSERT_NE(decompressed.get(), col.get());
ASSERT_EQ(compressed->size(), col->size());
/// Compressed representation must not exceed the source's footprint.
ASSERT_LE(compressed->allocatedBytes(), col->allocatedBytes());
ASSERT_EQ(decompressed->size(), col->size());
ASSERT_LE(decompressed->allocatedBytes(), col->allocatedBytes());
}
/// Both buffers are compressible here: chars hold a repeated constant and the
/// offsets form a regular arithmetic sequence (one offset per string).
/// Verifies the round trip produces a fresh, smaller (or equal) column.
TEST(ColumnString, CompressibleCharsAndCompressibleOffsets)
{
auto col = ColumnString::create();
auto & chars = col->getChars();
auto & offsets = col->getOffsets();
chars.resize(column_size * bytes_per_string);
for (size_t i = 0; i < column_size; ++i)
{
/// Constant payload — chars compress well.
static const uint64_t value = 42;
memcpy(&chars[i * bytes_per_string], &value, sizeof(uint64_t));
chars[i * bytes_per_string + sizeof(uint64_t)] = '\0';
/// One offset per string; the fixed stride makes offsets compressible too.
offsets.push_back((i + 1) * bytes_per_string);
}
auto compressed = col->compress(true);
auto decompressed = compressed->decompress();
// For actually compressed column only compressed `chars` and `offsets` arrays are stored.
// Upon decompression, a new column is created.
ASSERT_NE(decompressed.get(), col.get());
ASSERT_EQ(compressed->size(), col->size());
ASSERT_LE(compressed->allocatedBytes(), col->allocatedBytes());
ASSERT_EQ(decompressed->size(), col->size());
ASSERT_LE(decompressed->allocatedBytes(), col->allocatedBytes());
}

View File

@ -1,11 +1,9 @@
#pragma once
#include <string>
#include <iomanip>
#include <exception>
#include <Common/DateLUT.h>
#include <Common/LocalDate.h>
#include <string>
/** Stores calendar date and time in broken-down form.
* Could be initialized from date and time in text form like '2011-01-01 00:00:00' or from time_t.

View File

@ -11,6 +11,7 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <base/sort.h>
#include <boost/algorithm/hex.hpp>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/NamedCollections/NamedCollectionsMetadataStorage.h>

View File

@ -16,6 +16,8 @@
#include <Core/Settings.h>
#include <Poco/Environment.h>
#include <thread>
#pragma clang diagnostic ignored "-Wreserved-identifier"
namespace DB

View File

@ -1,6 +1,7 @@
#include <vector>
#include <string>
#include <iomanip>
#include <iostream>
#include <Common/SipHash.h>
#include <IO/ReadBufferFromFileDescriptor.h>

View File

@ -3,6 +3,8 @@
#include <mysqlxx/Query.h>
#include <mysqlxx/Exception.h>
#include <base/preciseExp10.h>
namespace mysqlxx
{

View File

@ -1,19 +1,11 @@
#pragma once
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <math.h>
#include <string>
#include <limits>
#include <base/preciseExp10.h>
#include <base/types.h>
#include <Common/DateLUT.h>
#include <mysqlxx/Types.h>
#include <Common/LocalDateTime.h>
#include <mysqlxx/Types.h>
#include <ostream>
#include <string>
namespace mysqlxx

View File

@ -1,14 +1,14 @@
#include "config.h"
#include <string_view>
#include <Common/Exception.h>
#include <base/types.h>
#include <IO/VarInt.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionCodecEncrypted.h>
#include <Compression/CompressionFactory.h>
#include <IO/VarInt.h>
#include <Parsers/IAST.h>
#include <Poco/Logger.h>
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/MemorySanitizer.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>
#include "config.h"
#if USE_SSL
# include <openssl/err.h>

View File

@ -11,11 +11,13 @@
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Core/Field.h>
#include <Disks/DiskLocal.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <base/sort.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h>

View File

@ -616,7 +616,7 @@ Block Block::compress() const
size_t num_columns = data.size();
Columns new_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
new_columns[i] = data[i].column->compress();
new_columns[i] = data[i].column->compress(/*force_compression=*/false);
return cloneWithColumns(new_columns);
}

View File

@ -1,8 +1,11 @@
#include "MySQLGtid.h"
#include <boost/algorithm/string.hpp>
#include <Core/UUID.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
namespace DB
{

View File

@ -9,6 +9,7 @@
#include <Common/FieldVisitorToString.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/UUID.h>
namespace DB

View File

@ -10,7 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_ELEMENT_OF_ENUM;
}
template <typename T>
@ -38,7 +38,7 @@ public:
{
auto it = value_to_name_map.find(value);
if (it == value_to_name_map.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected value {} in enum", toString(value));
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unexpected value {} in enum", toString(value));
return it;
}

View File

@ -30,7 +30,7 @@ bool canBeNativeType(const IDataType & type);
bool canBeNativeType(const DataTypePtr & type);
template <typename Type>
static inline bool canBeNativeType()
static constexpr bool canBeNativeType()
{
if constexpr (std::is_same_v<Type, Int8> || std::is_same_v<Type, UInt8>)
return true;
@ -74,14 +74,12 @@ static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder)
template <typename ToType>
static inline DataTypePtr toNativeDataType()
{
if constexpr (std::is_same_v<ToType, Int8> || std::is_same_v<ToType, UInt8> ||
static_assert(std::is_same_v<ToType, Int8> || std::is_same_v<ToType, UInt8> ||
std::is_same_v<ToType, Int16> || std::is_same_v<ToType, UInt16> ||
std::is_same_v<ToType, Int32> || std::is_same_v<ToType, UInt32> ||
std::is_same_v<ToType, Int64> || std::is_same_v<ToType, UInt64> ||
std::is_same_v<ToType, Float32> || std::is_same_v<ToType, Float64>)
return std::make_shared<DataTypeNumber<ToType>>();
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid cast to native data type");
std::is_same_v<ToType, Float32> || std::is_same_v<ToType, Float64>);
return std::make_shared<DataTypeNumber<ToType>>();
}
/// Cast LLVM value with type to bool

View File

@ -96,7 +96,9 @@ void SerializationEnum<Type>::deserializeWholeText(IColumn & column, ReadBuffer
{
std::string field_name;
readStringUntilEOF(field_name, istr);
assert_cast<ColumnType &>(column).getData().push_back(ref_enum_values.getValue(StringRef(field_name)));
Type t;
if (ref_enum_values.tryGetValue(t, StringRef(field_name)))
assert_cast<ColumnType &>(column).getData().push_back(t);
}
}

View File

@ -162,13 +162,11 @@ void SerializationInfo::deserializeFromKindsBinary(ReadBuffer & in)
kind = *maybe_kind;
}
Poco::JSON::Object SerializationInfo::toJSON() const
void SerializationInfo::toJSON(Poco::JSON::Object & object) const
{
Poco::JSON::Object object;
object.set(KEY_KIND, ISerialization::kindToString(kind));
object.set(KEY_NUM_DEFAULTS, data.num_defaults);
object.set(KEY_NUM_ROWS, data.num_rows);
return object;
}
void SerializationInfo::fromJSON(const Poco::JSON::Object & object)
@ -276,7 +274,8 @@ void SerializationInfoByName::writeJSON(WriteBuffer & out) const
Poco::JSON::Array column_infos;
for (const auto & [name, info] : *this)
{
auto info_json = info->toJSON();
Poco::JSON::Object info_json;
info->toJSON(info_json);
info_json.set(KEY_NAME, name);
column_infos.add(std::move(info_json)); /// NOLINT
}

View File

@ -4,8 +4,10 @@
#include <DataTypes/Serializations/ISerialization.h>
#include <DataTypes/Serializations/SerializationInfoSettings.h>
#include <Poco/JSON/Object.h>
namespace Poco::JSON
{
class Object;
}
namespace DB
{
@ -67,7 +69,7 @@ public:
virtual void serialializeKindBinary(WriteBuffer & out) const;
virtual void deserializeFromKindsBinary(ReadBuffer & in);
virtual Poco::JSON::Object toJSON() const;
virtual void toJSON(Poco::JSON::Object & object) const;
virtual void fromJSON(const Poco::JSON::Object & object);
void setKind(ISerialization::Kind kind_) { kind = kind_; }

View File

@ -3,6 +3,8 @@
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
#include <Poco/JSON/Object.h>
namespace DB
{
@ -151,15 +153,17 @@ void SerializationInfoTuple::deserializeFromKindsBinary(ReadBuffer & in)
elem->deserializeFromKindsBinary(in);
}
Poco::JSON::Object SerializationInfoTuple::toJSON() const
void SerializationInfoTuple::toJSON(Poco::JSON::Object & object) const
{
auto object = SerializationInfo::toJSON();
SerializationInfo::toJSON(object);
Poco::JSON::Array subcolumns;
for (const auto & elem : elems)
subcolumns.add(elem->toJSON());
{
Poco::JSON::Object sub_column_json;
elem->toJSON(sub_column_json);
subcolumns.add(sub_column_json);
}
object.set("subcolumns", subcolumns);
return object;
}
void SerializationInfoTuple::fromJSON(const Poco::JSON::Object & object)

View File

@ -29,7 +29,7 @@ public:
void serialializeKindBinary(WriteBuffer & out) const override;
void deserializeFromKindsBinary(ReadBuffer & in) override;
Poco::JSON::Object toJSON() const override;
void toJSON(Poco::JSON::Object & object) const override;
void fromJSON(const Poco::JSON::Object & object) override;
const MutableSerializationInfoPtr & getElementInfo(size_t i) const { return elems[i]; }

View File

@ -256,7 +256,7 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL cannot infer dimensions of an empty array: {}.{}",
"PostgreSQL cannot infer dimensions of an empty array: {}.{}. Make sure no empty array values in the first row.",
postgres_table,
postgres_column);
}

View File

@ -10,6 +10,7 @@
#include <base/defines.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorageOperations.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
@ -593,6 +594,39 @@ struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOpera
}
};
/// Operation that materializes an empty (zero-byte) object in object storage
/// for a given logical path. Used so that file creation is visible in the
/// backing store itself, not only in metadata.
struct CreateEmptyFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
/// Remote object to create; key is generated from the logical path in the ctor.
StoredObject object;
CreateEmptyFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & path_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
{
/// Derive the remote key for the logical path; no key prefix hint is supplied.
const auto key = object_storage.generateObjectKeyForPath(path_, std::nullopt);
object = StoredObject(key.serialize(), path_, /* file_size */0);
}
std::string getInfoForLog() const override
{
return fmt::format("CreateEmptyFileObjectStorageOperation (remote path: {}, local path: {})", object.remote_path, object.local_path);
}
/// Writes a zero-length object; the metadata transaction is intentionally
/// unused — metadata is handled by a separate operation.
void execute(MetadataTransactionPtr /* tx */) override
{
auto buf = object_storage.writeObject(object, WriteMode::Rewrite);
buf->finalize();
}
/// Rollback: best-effort removal of the object created by execute().
void undo() override
{
object_storage.removeObjectIfExists(object);
}
/// Nothing to finalize — the object write is completed in execute().
void finalize() override {}
};
}
void DiskObjectStorageTransaction::createDirectory(const std::string & path)
@ -910,12 +944,15 @@ void DiskObjectStorageTransaction::chmod(const String & path, mode_t mode)
void DiskObjectStorageTransaction::createFile(const std::string & path)
{
operations_to_execute.emplace_back(
std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path, this](MetadataTransactionPtr tx)
{
if (object_storage.isPlain() && !object_storage.isWriteOnce())
tx->createEmptyFile(path);
if (object_storage.isPlain() && !object_storage.isWriteOnce())
{
operations_to_execute.emplace_back(
std::make_unique<CreateEmptyFileObjectStorageOperation>(object_storage, metadata_storage, path));
}
operations_to_execute.emplace_back(
std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
{
tx->createEmptyMetadataFile(path);
}));
}

View File

@ -10,25 +10,6 @@
namespace DB
{
class MetadataStorageFromPlainRewritableObjectStorageTransaction final : public MetadataStorageFromPlainObjectStorageTransaction
{
public:
explicit MetadataStorageFromPlainRewritableObjectStorageTransaction(
MetadataStorageFromPlainObjectStorage & metadata_storage_, ObjectStoragePtr object_storage_)
: DB::MetadataStorageFromPlainObjectStorageTransaction(metadata_storage_, object_storage_)
{
}
void createEmptyFile(const std::string & path) override
{
const auto key = object_storage->generateObjectKeyForPath(path, std::nullopt);
StoredObject object(key.serialize(), "", /* file_size */0);
auto buf = object_storage->writeObject(object, WriteMode::Rewrite);
buf->finalize();
}
};
class MetadataStorageFromPlainRewritableObjectStorage final : public MetadataStorageFromPlainObjectStorage
{
private:
@ -42,11 +23,6 @@ public:
MetadataStorageType getType() const override { return MetadataStorageType::PlainRewritable; }
MetadataTransactionPtr createTransaction() override
{
return std::make_shared<MetadataStorageFromPlainRewritableObjectStorageTransaction>(*this, object_storage);
}
bool existsFile(const std::string & path) const override;
bool existsDirectory(const std::string & path) const override;

View File

@ -82,7 +82,7 @@ void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule esca
readCSVStringInto(out, buf, format_settings.csv);
break;
case FormatSettings::EscapingRule::JSON:
skipJSONField(buf, StringRef(field_name, field_name_len), format_settings.json);
skipJSONField(buf, std::string_view(field_name, field_name_len), format_settings.json);
break;
case FormatSettings::EscapingRule::Raw:
readStringInto(out, buf);

View File

@ -7,10 +7,10 @@
#include <Columns/ColumnVector.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
@ -179,8 +179,10 @@ public:
{
const auto * col = argument.column.get();
const auto * type = argument.type.get();
auto res = ColumnString::create();
res->reserve(col->size());
if constexpr (std::is_same_v<DataTypeEnum8, EnumType>)
{
const auto * enum_col = typeid_cast<const ColumnInt8 *>(col);
@ -212,17 +214,10 @@ public:
ColumnPtr column_haystack = haystack_argument.column;
const ColumnPtr & column_needle = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[1].column : arguments[0].column;
bool is_enum8 = isEnum8(haystack_argument.type);
bool is_enum16 = isEnum16(haystack_argument.type);
if (is_enum8)
{
if (isEnum8(haystack_argument.type))
column_haystack = genStringColumnFromEnumColumn<DataTypeEnum8>(haystack_argument);
}
if (is_enum16)
{
if (isEnum16(haystack_argument.type))
column_haystack = genStringColumnFromEnumColumn<DataTypeEnum16>(haystack_argument);
}
ColumnPtr column_start_pos = nullptr;
if (arguments.size() >= 3)

View File

@ -23,13 +23,17 @@ struct NameEmpty
static constexpr auto name = "empty";
};
using FunctionEmpty = FunctionStringOrArrayToT<EmptyImpl<false>, NameEmpty, UInt8, false>;
struct NameNotEmpty
{
static constexpr auto name = "notEmpty";
};
/// Implements the empty function for JSON type.
template <bool negative, class Name>
class ExecutableFunctionJSONEmpty : public IExecutableFunction
{
public:
std::string getName() const override { return NameEmpty::name; }
std::string getName() const override { return Name::name; }
private:
bool useDefaultImplementationForConstants() const override { return true; }
@ -48,7 +52,7 @@ private:
/// If object column has at least 1 typed path, it will never be empty, because these paths always have values.
if (!typed_paths.empty())
{
data.resize_fill(size, 0);
data.resize_fill(size, negative);
return res;
}
@ -76,19 +80,20 @@ private:
}
}
data.push_back(empty);
data.push_back(negative ^ empty);
}
return res;
}
};
template <bool negative, class Name>
class FunctionEmptyJSON final : public IFunctionBase
{
public:
FunctionEmptyJSON(const DataTypes & argument_types_, const DataTypePtr & return_type_) : argument_types(argument_types_), return_type(return_type_) {}
String getName() const override { return NameEmpty::name; }
String getName() const override { return Name::name; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
@ -97,7 +102,7 @@ public:
ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
{
return std::make_unique<ExecutableFunctionJSONEmpty>();
return std::make_unique<ExecutableFunctionJSONEmpty<negative, Name>>();
}
private:
@ -105,17 +110,18 @@ private:
DataTypePtr return_type;
};
template <bool negative, class Name>
class FunctionEmptyOverloadResolver final : public IFunctionOverloadResolver
{
public:
static constexpr auto name = NameEmpty::name;
static constexpr auto name = Name::name;
static FunctionOverloadResolverPtr create(ContextPtr)
{
return std::make_unique<FunctionEmptyOverloadResolver>();
}
String getName() const override { return NameEmpty::name; }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
FunctionBasePtr buildImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & return_type) const override
@ -126,9 +132,9 @@ public:
argument_types.push_back(arg.type);
if (argument_types.size() == 1 && isObject(argument_types[0]))
return std::make_shared<FunctionEmptyJSON>(argument_types, return_type);
return std::make_shared<FunctionEmptyJSON<negative, Name>>(argument_types, return_type);
return std::make_shared<FunctionToFunctionBaseAdaptor>(std::make_shared<FunctionEmpty>(), argument_types, return_type);
return std::make_shared<FunctionToFunctionBaseAdaptor>(std::make_shared<FunctionStringOrArrayToT<EmptyImpl<negative>, Name, UInt8, false>>(), argument_types, return_type);
}
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
@ -155,7 +161,9 @@ public:
REGISTER_FUNCTION(Empty)
{
factory.registerFunction<FunctionEmptyOverloadResolver>();
factory.registerFunction<FunctionEmptyOverloadResolver<true, NameNotEmpty>>();
factory.registerFunction<FunctionEmptyOverloadResolver<false, NameEmpty>>();
}
}

View File

@ -1,25 +0,0 @@
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringOrArrayToT.h>
#include <Functions/EmptyImpl.h>
namespace DB
{
namespace
{
struct NameNotEmpty
{
static constexpr auto name = "notEmpty";
};
using FunctionNotEmpty = FunctionStringOrArrayToT<EmptyImpl<true>, NameNotEmpty, UInt8, false>;
}
REGISTER_FUNCTION(NotEmpty)
{
factory.registerFunction<FunctionNotEmpty>();
}
}

View File

@ -56,6 +56,9 @@ void CascadeWriteBuffer::nextImpl()
CascadeWriteBuffer::WriteBufferPtrs CascadeWriteBuffer::getResultBuffers()
{
if (!curr_buffer)
return {};
/// Sync position with underlying buffer before invalidating
curr_buffer->position() = position();

View File

@ -1,13 +1,11 @@
#pragma once
#include <cassert>
#include <cstring>
#include <memory>
#include <Common/Exception.h>
#include <Common/Priority.h>
#include <IO/BufferBase.h>
#include <IO/AsynchronousReader.h>
namespace DB

View File

@ -1585,14 +1585,14 @@ template bool readDateTimeTextFallback<bool, true>(time_t &, ReadBuffer &, const
template <typename ReturnType>
ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings, size_t current_depth)
ReturnType skipJSONFieldImpl(ReadBuffer & buf, std::string_view name_of_field, const FormatSettings::JSON & settings, size_t current_depth)
{
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
if (unlikely(current_depth > settings.max_depth))
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::TOO_DEEP_RECURSION, "JSON is too deep for key '{}'", name_of_field.toString());
throw Exception(ErrorCodes::TOO_DEEP_RECURSION, "JSON is too deep for key '{}'", name_of_field);
return ReturnType(false);
}
@ -1602,7 +1602,7 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
if (buf.eof())
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected EOF for key '{}'", name_of_field.toString());
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected EOF for key '{}'", name_of_field);
return ReturnType(false);
}
if (*buf.position() == '"') /// skip double-quoted string
@ -1622,7 +1622,7 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
if (!tryReadFloatText(v, buf))
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected a number field for key '{}'", name_of_field.toString());
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected a number field for key '{}'", name_of_field);
return ReturnType(false);
}
}
@ -1680,7 +1680,7 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
else
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected symbol for key '{}'", name_of_field.toString());
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected symbol for key '{}'", name_of_field);
return ReturnType(false);
}
}
@ -1704,7 +1704,7 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
else
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected symbol for key '{}'", name_of_field.toString());
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected symbol for key '{}'", name_of_field);
return ReturnType(false);
}
@ -1713,7 +1713,7 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
if (buf.eof() || !(*buf.position() == ':'))
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected symbol for key '{}'", name_of_field.toString());
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected symbol for key '{}'", name_of_field);
return ReturnType(false);
}
++buf.position();
@ -1737,7 +1737,7 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
if (buf.eof())
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected EOF for key '{}'", name_of_field.toString());
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected EOF for key '{}'", name_of_field);
return ReturnType(false);
}
++buf.position();
@ -1750,7 +1750,7 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
"Cannot read JSON field here: '{}'. Unexpected symbol '{}'{}",
String(buf.position(), std::min(buf.available(), size_t(10))),
std::string(1, *buf.position()),
name_of_field.empty() ? "" : " for key " + name_of_field.toString());
name_of_field.empty() ? "" : " for key " + String{name_of_field});
return ReturnType(false);
}
@ -1758,12 +1758,12 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
return ReturnType(true);
}
void skipJSONField(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings)
void skipJSONField(ReadBuffer & buf, std::string_view name_of_field, const FormatSettings::JSON & settings)
{
skipJSONFieldImpl<void>(buf, name_of_field, settings, 0);
}
bool trySkipJSONField(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings)
bool trySkipJSONField(ReadBuffer & buf, std::string_view name_of_field, const FormatSettings::JSON & settings)
{
return skipJSONFieldImpl<bool>(buf, name_of_field, settings, 0);
}

View File

@ -1,12 +1,9 @@
#pragma once
#include <cmath>
#include <cstring>
#include <string>
#include <string_view>
#include <limits>
#include <algorithm>
#include <iterator>
#include <bit>
#include <span>
@ -18,14 +15,11 @@
#include <Common/LocalDate.h>
#include <Common/LocalDateTime.h>
#include <Common/transformEndianness.h>
#include <base/StringRef.h>
#include <base/arithmeticOverflow.h>
#include <base/sort.h>
#include <base/unit.h>
#include <Core/Types.h>
#include <Core/DecimalFunctions.h>
#include <Core/UUID.h>
#include <base/IPv4andIPv6.h>
#include <Common/Allocator.h>
@ -35,7 +29,6 @@
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/VarInt.h>
@ -1727,8 +1720,8 @@ inline void skipWhitespaceIfAny(ReadBuffer & buf, bool one_line = false)
}
/// Skips json value.
void skipJSONField(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings);
bool trySkipJSONField(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings);
void skipJSONField(ReadBuffer & buf, std::string_view name_of_field, const FormatSettings::JSON & settings);
bool trySkipJSONField(ReadBuffer & buf, std::string_view name_of_field, const FormatSettings::JSON & settings);
/** Read serialized exception.

View File

@ -163,6 +163,9 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
, remote_host_filter(client_configuration.remote_host_filter)
, s3_max_redirects(client_configuration.s3_max_redirects)
, s3_use_adaptive_timeouts(client_configuration.s3_use_adaptive_timeouts)
, http_max_fields(client_configuration.http_max_fields)
, http_max_field_name_size(client_configuration.http_max_field_name_size)
, http_max_field_value_size(client_configuration.http_max_field_value_size)
, enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
, for_disk_s3(client_configuration.for_disk_s3)
, get_request_throttler(client_configuration.get_request_throttler)
@ -466,6 +469,9 @@ void PocoHTTPClient::makeRequestInternalImpl(
}
Poco::Net::HTTPResponse poco_response;
poco_response.setFieldLimit(static_cast<int>(http_max_fields));
poco_response.setNameLengthLimit(static_cast<int>(http_max_field_name_size));
poco_response.setValueLengthLimit(static_cast<int>(http_max_field_value_size));
Stopwatch watch;

View File

@ -57,6 +57,10 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT;
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
UInt64 http_max_fields = 1000000;
UInt64 http_max_field_name_size = 128 * 1024;
UInt64 http_max_field_value_size = 128 * 1024;
std::function<void(const ProxyConfiguration &)> error_report;
void updateSchemeAndRegion();
@ -177,6 +181,9 @@ protected:
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects = 0;
bool s3_use_adaptive_timeouts = true;
const UInt64 http_max_fields = 1000000;
const UInt64 http_max_field_name_size = 128 * 1024;
const UInt64 http_max_field_value_size = 128 * 1024;
bool enable_s3_requests_logging = false;
bool for_disk_s3 = false;

View File

@ -37,7 +37,10 @@ namespace ErrorCodes
DECLARE(Bool, check_objects_after_upload, S3::DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD, "", 0) \
DECLARE(Bool, throw_on_zero_files_match, false, "", 0) \
DECLARE(UInt64, max_single_operation_copy_size, S3::DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE, "", 0) \
DECLARE(String, storage_class_name, "", "", 0)
DECLARE(String, storage_class_name, "", "", 0) \
DECLARE(UInt64, http_max_fields, 1000000, "", 0) \
DECLARE(UInt64, http_max_field_name_size, 128 * 1024, "", 0) \
DECLARE(UInt64, http_max_field_value_size, 128 * 1024, "", 0)
#define PART_UPLOAD_SETTINGS(DECLARE, ALIAS) \
DECLARE(UInt64, strict_upload_part_size, 0, "", 0) \

View File

@ -5,10 +5,6 @@
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <istream>
#include <ostream>
namespace DB
{
@ -37,20 +33,6 @@ inline void writeVarUInt(UInt64 x, WriteBuffer & ostr)
++ostr.position();
}
inline void writeVarUInt(UInt64 x, std::ostream & ostr)
{
while (x > 0x7F)
{
uint8_t byte = 0x80 | (x & 0x7F);
ostr.put(byte);
x >>= 7;
}
uint8_t final_byte = static_cast<uint8_t>(x);
ostr.put(final_byte);
}
inline char * writeVarUInt(UInt64 x, char * ostr)
{
while (x > 0x7F)
@ -114,19 +96,6 @@ inline void readVarUInt(UInt64 & x, ReadBuffer & istr)
varint_impl::readVarUInt<true>(x, istr);
}
inline void readVarUInt(UInt64 & x, std::istream & istr)
{
x = 0;
for (size_t i = 0; i < 10; ++i)
{
UInt64 byte = istr.get();
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80))
return;
}
}
inline const char * readVarUInt(UInt64 & x, const char * istr, size_t size)
{
const char * end = istr + size;

View File

@ -19,9 +19,6 @@ target_link_libraries (valid_utf8_perf PRIVATE clickhouse_common_io clickhouse_c
clickhouse_add_executable (valid_utf8 valid_utf8.cpp)
target_link_libraries (valid_utf8 PRIVATE clickhouse_common_io clickhouse_common_config)
clickhouse_add_executable (var_uint var_uint.cpp)
target_link_libraries (var_uint PRIVATE clickhouse_common_io clickhouse_common_config)
clickhouse_add_executable (read_escaped_string read_escaped_string.cpp)
target_link_libraries (read_escaped_string PRIVATE clickhouse_common_io clickhouse_common_config)

View File

@ -1,3 +1,4 @@
#include <iostream>
#include <string>
#include <IO/parseDateTimeBestEffort.h>

View File

@ -1,58 +0,0 @@
#include <string>
#include <iostream>
#include <IO/VarInt.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Poco/HexBinaryEncoder.h>
int main(int argc, char ** argv)
{
if (argc != 2)
{
std::cerr << "Usage: " << std::endl
<< argv[0] << " unsigned_number" << std::endl;
return 1;
}
UInt64 x = DB::parse<UInt64>(argv[1]);
std::cout << std::hex << std::showbase << "Input: " << x << std::endl;
Poco::HexBinaryEncoder hex(std::cout);
std::cout << "writeVarUInt(std::ostream): 0x";
DB::writeVarUInt(x, hex);
std::cout << std::endl;
std::string s;
{
DB::WriteBufferFromString wb(s);
DB::writeVarUInt(x, wb);
wb.next();
}
std::cout << "writeVarUInt(WriteBuffer): 0x";
hex << s;
std::cout << std::endl;
s.clear();
s.resize(9);
s.resize(DB::writeVarUInt(x, s.data()) - s.data());
std::cout << "writeVarUInt(char *): 0x";
hex << s;
std::cout << std::endl;
UInt64 y = 0;
DB::ReadBufferFromString rb(s);
DB::readVarUInt(y, rb);
std::cerr << "Input: " << x << ", readVarUInt(writeVarUInt()): " << y << std::endl;
return 0;
}

View File

@ -469,7 +469,7 @@ void QueryCache::Writer::finalizeWrite()
Columns compressed_columns;
for (const auto & column : columns)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(/*force_compression=*/false);
compressed_columns.push_back(compressed_column);
}
Chunk compressed_chunk(compressed_columns, chunk.getNumRows());

View File

@ -1204,6 +1204,22 @@ namespace
source_ast->children.push_back(source_ast->elements);
dict.set(dict.source, source_ast);
}
ASTs * getEngineArgsFromCreateQuery(ASTCreateQuery & create_query)
{
ASTStorage * storage_def = create_query.storage;
if (!storage_def)
return nullptr;
if (!storage_def->engine)
return nullptr;
const ASTFunction & engine_def = *storage_def->engine;
if (!engine_def.arguments)
return nullptr;
return &engine_def.arguments->children;
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
@ -1903,7 +1919,11 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
mode);
/// If schema wes inferred while storage creation, add columns description to create query.
addColumnsDescriptionToCreateQueryIfNecessary(query_ptr->as<ASTCreateQuery &>(), res);
auto & create_query = query_ptr->as<ASTCreateQuery &>();
addColumnsDescriptionToCreateQueryIfNecessary(create_query, res);
/// Add any inferred engine args if needed. For example, data format for engines File/S3/URL/etc
if (auto * engine_args = getEngineArgsFromCreateQuery(create_query))
res->addInferredEngineArgsToCreateQuery(*engine_args, getContext());
}
validateVirtualColumns(*res);

View File

@ -1,17 +1,17 @@
#include <Interpreters/TransactionLog.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <Interpreters/Context.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Core/ServerUUID.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <base/sort.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Core/ServerUUID.h>
#include <Common/logger_useful.h>
#include <Common/noexcept_scope.h>
#include <Common/threadPoolCallbackRunner.h>
namespace DB
{

View File

@ -38,6 +38,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int TYPE_MISMATCH;
extern const int UNEXPECTED_DATA_AFTER_PARSED_VALUE;
extern const int UNKNOWN_ELEMENT_OF_ENUM;
}
@ -584,6 +585,8 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
throw;
}
if (col->empty())
return Field(Null());
Field parsed = (*col)[0];
return convertFieldToType(parsed, type, from_type_hint, format_settings);
}
@ -660,7 +663,19 @@ static bool decimalEqualsFloat(Field field, Float64 float_value)
std::optional<Field> convertFieldToTypeStrict(const Field & from_value, const IDataType & from_type, const IDataType & to_type, const FormatSettings & format_settings)
{
Field result_value = convertFieldToType(from_value, to_type, &from_type, format_settings);
//Field result_value = convertFieldToType(from_value, to_type, &from_type, format_settings);
Field result_value;
try
{
result_value = convertFieldToType(from_value, to_type, &from_type, format_settings);
}
catch (Exception & e)
{
if (isEnum(to_type) && e.code() == ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM)
return {};
throw;
}
if (Field::isDecimal(from_value.getType()) && Field::isDecimal(result_value.getType()))
{

View File

@ -121,7 +121,7 @@ void JSONEachRowRowInputFormat::skipUnknownField(StringRef name_ref)
if (!format_settings.skip_unknown_fields)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unknown field found while parsing JSONEachRow format: {}", name_ref.toString());
skipJSONField(*in, name_ref, format_settings.json);
skipJSONField(*in, std::string_view(name_ref.data, name_ref.size), format_settings.json);
}
void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)

View File

@ -1,5 +1,6 @@
#include <Server/PrometheusRequestHandlerFactory.h>
#include <Core/Types_fwd.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/PrometheusMetricsWriter.h>
#include <Server/PrometheusRequestHandler.h>

View File

@ -287,6 +287,10 @@ public:
/// Returns hints for serialization of columns accorsing to statistics accumulated by storage.
virtual SerializationInfoByName getSerializationHints() const { return {}; }
/// Add engine args that were inferred during storage creation to create query to avoid the same
/// inference on server restart. For example - data format inference in File/URL/S3/etc engines.
virtual void addInferredEngineArgsToCreateQuery(ASTs & /*args*/, const ContextPtr & /*context*/) const {}
private:
StorageID storage_id;

View File

@ -81,8 +81,10 @@ protected:
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk
/// Stores states for IDataType::deserializeBinaryBulk for regular columns.
DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map;
/// The same as above, but for subcolumns.
DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map_for_subcolumns;
/// Actual column names and types of columns in part,
/// which may differ from table metadata.

View File

@ -6054,7 +6054,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartRestoredFromBackup(cons
if (!retryable || (try_no + 1 == loading_parts_max_tries))
{
if (Exception * e = exception_cast<Exception *>(error))
e->addMessage("while restoring part {} of table {}", part->name, getStorageID());
e->addMessage("while restoring part {} of table {}", part_name, getStorageID());
std::rethrow_exception(error);
}

View File

@ -5,7 +5,7 @@
#include <Common/ThreadPool_fwd.h>
#include <atomic>
#include <future>
namespace DB
{

View File

@ -148,7 +148,9 @@ void MergeTreeReaderCompact::readData(
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter,
ISerialization::SubstreamsCache & cache)
ISerialization::SubstreamsCache & cache,
std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns,
const ColumnNameLevel & name_level_for_offsets)
{
try
{
@ -171,17 +173,33 @@ void MergeTreeReaderCompact::readData(
const auto & type_in_storage = name_and_type.getTypeInStorage();
const auto & name_in_storage = name_and_type.getNameInStorage();
auto serialization = getSerializationInPart({name_in_storage, type_in_storage});
ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map[name], nullptr);
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())
column = subcolumn;
auto cache_for_subcolumns_it = columns_cache_for_subcolumns.find(name_in_storage);
if (!name_level_for_offsets.has_value() && cache_for_subcolumns_it != columns_cache_for_subcolumns.end())
{
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), cache_for_subcolumns_it->second);
/// TODO: Avoid extra copying.
if (column->empty())
column = IColumn::mutate(subcolumn);
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
}
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
{
auto serialization = getSerializationInPart({name_in_storage, type_in_storage});
ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map_for_subcolumns[name_in_storage], nullptr);
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())
column = subcolumn;
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
if (!name_level_for_offsets.has_value())
columns_cache_for_subcolumns[name_in_storage] = temp_column;
}
}
else
{
@ -227,15 +245,23 @@ void MergeTreeReaderCompact::readPrefix(
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix, nullptr);
}
SerializationPtr serialization;
if (name_and_type.isSubcolumn())
serialization = getSerializationInPart({name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()});
else
serialization = getSerializationInPart(name_and_type);
deserialize_settings.getter = buffer_getter;
deserialize_settings.object_and_dynamic_read_statistics = true;
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.name], nullptr);
if (name_and_type.isSubcolumn())
{
/// For subcolumns of the same column we need to deserialize prefix only once.
if (deserialize_binary_bulk_state_map_for_subcolumns.contains(name_and_type.getNameInStorage()))
return;
auto serialization = getSerializationInPart({name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()});
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map_for_subcolumns[name_and_type.getNameInStorage()], nullptr);
}
else
{
auto serialization = getSerializationInPart(name_and_type);
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.getNameInStorage()], nullptr);
}
}
catch (Exception & e)
{

View File

@ -45,7 +45,9 @@ protected:
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter,
ISerialization::SubstreamsCache & cache);
ISerialization::SubstreamsCache & cache,
std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns,
const ColumnNameLevel & name_level_for_offsets);
void readPrefix(
const NameAndTypePair & name_and_type,

View File

@ -25,10 +25,18 @@ try
while (read_rows < max_rows_to_read)
{
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
deserialize_binary_bulk_state_map.clear();
deserialize_binary_bulk_state_map_for_subcolumns.clear();
/// Use cache to avoid reading the column with the same name twice.
/// It may happen if there are empty array Nested in the part.
ISerialization::SubstreamsCache cache;
/// If we need to read multiple subcolumns from a single column in storage,
/// we will read it this column only once and then reuse to extract all subcolumns.
/// We cannot use SubstreamsCache for it, because we may also read the full column itself
/// and it might me not empty inside res_columns (and SubstreamsCache contains the whole columns).
/// TODO: refactor the code in a way when we first read all full columns and then extract all subcolumns from them.
std::unordered_map<String, ColumnPtr> columns_cache_for_subcolumns;
for (size_t pos = 0; pos < num_columns; ++pos)
{
@ -56,7 +64,7 @@ try
};
readPrefix(columns_to_read[pos], buffer_getter, buffer_getter_for_prefix, columns_for_offsets[pos]);
readData(columns_to_read[pos], column, rows_to_read, buffer_getter, cache);
readData(columns_to_read[pos], column, rows_to_read, buffer_getter, cache, columns_cache_for_subcolumns, columns_for_offsets[pos]);
}
++from_mark;

View File

@ -283,7 +283,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
ASTs & args, const String & structure_, const String & format_, ContextPtr context)
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -295,7 +295,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -319,9 +319,12 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
if (args.size() == 3)
{
args.push_back(format_literal);
/// Add compression = "auto" before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression = "auto" before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (connection_string, container_name, blobpath, structure) or
/// (connection_string, container_name, blobpath, format)
@ -334,12 +337,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (..., structure) -> (..., format, compression, structure)
else
else if (with_structure)
{
auto structure_arg = args.back();
args[3] = format_literal;
@ -362,15 +368,19 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// (..., account_name, account_key) -> (..., account_name, account_key, format, compression, structure)
else
{
args.push_back(format_literal);
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
}
/// (connection_string, container_name, blobpath, format, compression, structure) or
@ -386,7 +396,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
args[5] = structure_literal;
}
/// (..., account_name, account_key, format) -> (..., account_name, account_key, format, compression, structure)
@ -394,12 +404,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (sixth_arg == "auto")
args[5] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (..., account_name, account_key, structure) -> (..., account_name, account_key, format, compression, structure)
else
else if (with_structure)
{
auto structure_arg = args.back();
args[5] = format_literal;
@ -417,14 +430,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
/// (..., format, compression) -> (..., format, compression, structure)
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// (storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
else if (args.size() == 8)
{
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
if (checkAndGetLiteralArgument<String>(args[7], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[7], "structure") == "auto")
args[7] = structure_literal;
}
}

View File

@ -76,7 +76,8 @@ public:
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

View File

@ -6,15 +6,12 @@
#include <string>
#include <memory>
#include <hdfs/hdfs.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <base/types.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/SeekableReadBuffer.h>
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
#include <Interpreters/Context.h>
# include <IO/AsynchronousReader.h>
# include <IO/BufferWithOwnMemory.h>
# include <IO/SeekableReadBuffer.h>
# include <Interpreters/Context.h>
# include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
# include <base/types.h>
namespace DB
{

View File

@ -174,7 +174,8 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context)
ContextPtr context,
bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -186,7 +187,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -209,23 +210,26 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
if (count == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
args.push_back(format_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// hdfs(url, format)
else if (count == 2)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args.back() = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// hdfs(url, format, structure)
/// hdfs(url, format, structure, compression_method)
/// hdfs(url, format, compression_method)
else if (count >= 3)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
}

View File

@ -62,7 +62,8 @@ public:
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;

View File

@ -59,7 +59,7 @@ public:
ObjectStoragePtr createObjectStorage(ContextPtr, bool) override { return std::make_shared<LocalObjectStorage>("/"); }
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr) override { }
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

Some files were not shown because too many files have changed in this diff Show More