Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-26 01:22:04 +00:00)

Commit d387835774: Merge branch 'master' into 45486_Fix_flaky_test_for_disallowing_concurrent_backups_restores

.github/workflows/release.yml (vendored)
@@ -12,38 +12,9 @@ jobs:
  ReleasePublish:
    runs-on: [self-hosted, style-checker]
    steps:
      - name: Set envs
      - name: Deploy packages and assets
        run: |
          cat >> "$GITHUB_ENV" << 'EOF'
          JFROG_API_KEY=${{ secrets.JFROG_ARTIFACTORY_API_KEY }}
          TEMP_PATH=${{runner.temp}}/release_packages
          REPO_COPY=${{runner.temp}}/release_packages/ClickHouse
          EOF
      - name: Check out repository code
        uses: ClickHouse/checkout@v1
        with:
          # Always use the most recent script version
          ref: master
      - name: Download packages and push to Artifactory
        run: |
          rm -rf "$TEMP_PATH" && mkdir -p "$TEMP_PATH"
          cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
          cd "$REPO_COPY"
          # Download and push packages to artifactory
          python3 ./tests/ci/push_to_artifactory.py --release '${{ github.ref }}' \
            --commit '${{ github.sha }}' --artifactory-url '${{ secrets.JFROG_ARTIFACTORY_URL }}' --all
          # Download macos binaries to ${{runner.temp}}/download_binary
          python3 ./tests/ci/download_binary.py --version '${{ github.ref }}' \
            --commit '${{ github.sha }}' binary_darwin binary_darwin_aarch64
          mv '${{runner.temp}}/download_binary/'clickhouse-* '${{runner.temp}}/push_to_artifactory'
      - name: Upload packages to release assets
        uses: svenstaro/upload-release-action@v2
        with:
          repo_token: ${{ secrets.GITHUB_TOKEN }}
          file: ${{runner.temp}}/push_to_artifactory/*
          overwrite: true
          tag: ${{ github.ref }}
          file_glob: true
          curl '${{ secrets.PACKAGES_RELEASE_URL }}/release/${{ github.ref }}?binary=binary_darwin&binary=binary_darwin_aarch64&sync=true' -d ''
############################################################################################
##################################### Docker images #######################################
############################################################################################
@@ -1714,7 +1714,7 @@ something_weird{problem="division by zero"} +Inf -3982045

## Protobuf {#protobuf}

Protobuf - is a [Protocol Buffers](https://developers.google.com/protocol-buffers/) format.
Protobuf - is a [Protocol Buffers](https://protobuf.dev/) format.

This format requires an external format schema. The schema is cached between queries.
ClickHouse supports both `proto2` and `proto3` syntaxes. Repeated/optional/required fields are supported.
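For context, a minimal usage sketch of the Protobuf format with an external schema; the schema file `schemafile` and message type `MessageType` are illustrative placeholders, and the query assumes the `format_schema` setting pointing at a .proto file available to the server:

```sql
-- read rows from a table and emit them as Protobuf messages
SELECT * FROM test.table
FORMAT Protobuf
SETTINGS format_schema = 'schemafile:MessageType';
```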
@@ -23,7 +23,9 @@ FROM table2
```
The condition could be any expression based on your requirements.

**Examples**
## Examples

Here is a simple example that returns the numbers 1 to 10 that are _not_ a part of the numbers 3 to 8:

Query:

@@ -33,7 +35,7 @@ SELECT number FROM numbers(1,10) EXCEPT SELECT number FROM numbers(3,6);

Result:

``` text
```response
┌─number─┐
│      1 │
│      2 │
@@ -42,28 +44,109 @@ Result:
└────────┘
```
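For comparison, a minimal sketch of the same result written with `NOT IN` instead of `EXCEPT`; the two forms coincide for this example, although they are not equivalent in general (for instance when duplicate rows or NULLs are involved):

```sql
-- returns 1, 2, 9, 10, just like the EXCEPT query above
SELECT number FROM numbers(1,10)
WHERE number NOT IN (SELECT number FROM numbers(3,6));
```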
Query:

`EXCEPT` and `INTERSECT` can often be used interchangeably with different Boolean logic, and they are both useful if you have two tables that share a common column (or columns). For example, suppose we have a few million rows of historical cryptocurrency data that contains trade prices and volume:

``` sql
CREATE TABLE t1(one String, two String, three String) ENGINE=Memory();
CREATE TABLE t2(four String, five String, six String) ENGINE=Memory();
```sql
CREATE TABLE crypto_prices
(
    trade_date Date,
    crypto_name String,
    volume Float32,
    price Float32,
    market_cap Float32,
    change_1_day Float32
)
ENGINE = MergeTree
PRIMARY KEY (crypto_name, trade_date);

INSERT INTO t1 VALUES ('q', 'm', 'b'), ('s', 'd', 'f'), ('l', 'p', 'o'), ('s', 'd', 'f'), ('s', 'd', 'f'), ('k', 't', 'd'), ('l', 'p', 'o');
INSERT INTO t2 VALUES ('q', 'm', 'b'), ('b', 'd', 'k'), ('s', 'y', 't'), ('s', 'd', 'f'), ('m', 'f', 'o'), ('k', 'k', 'd');
INSERT INTO crypto_prices
SELECT *
FROM s3(
    'https://learn-clickhouse.s3.us-east-2.amazonaws.com/crypto_prices.csv',
    'CSVWithNames'
);

SELECT * FROM t1 EXCEPT SELECT * FROM t2;
SELECT * FROM crypto_prices
WHERE crypto_name = 'Bitcoin'
ORDER BY trade_date DESC
LIMIT 10;
```

```response
┌─trade_date─┬─crypto_name─┬──────volume─┬────price─┬───market_cap─┬──change_1_day─┐
│ 2020-11-02 │ Bitcoin     │ 30771456000 │ 13550.49 │ 251119860000 │  -0.013585099 │
│ 2020-11-01 │ Bitcoin     │ 24453857000 │ 13737.11 │ 254569760000 │ -0.0031840964 │
│ 2020-10-31 │ Bitcoin     │ 30306464000 │ 13780.99 │ 255372070000 │   0.017308505 │
│ 2020-10-30 │ Bitcoin     │ 30581486000 │ 13546.52 │ 251018150000 │   0.008084608 │
│ 2020-10-29 │ Bitcoin     │ 56499500000 │ 13437.88 │ 248995320000 │   0.012552661 │
│ 2020-10-28 │ Bitcoin     │ 35867320000 │ 13271.29 │ 245899820000 │   -0.02804481 │
│ 2020-10-27 │ Bitcoin     │ 33749879000 │ 13654.22 │ 252985950000 │    0.04427984 │
│ 2020-10-26 │ Bitcoin     │ 29461459000 │ 13075.25 │ 242251000000 │  0.0033826586 │
│ 2020-10-25 │ Bitcoin     │ 24406921000 │ 13031.17 │ 241425220000 │ -0.0058658565 │
│ 2020-10-24 │ Bitcoin     │ 24542319000 │ 13108.06 │ 242839880000 │   0.013650347 │
└────────────┴─────────────┴─────────────┴──────────┴──────────────┴───────────────┘
```

Now suppose we have a table named `holdings` that contains a list of cryptocurrencies that we own, along with the number of coins:

```sql
CREATE TABLE holdings
(
    crypto_name String,
    quantity UInt64
)
ENGINE = MergeTree
PRIMARY KEY (crypto_name);

INSERT INTO holdings VALUES
    ('Bitcoin', 1000),
    ('Bitcoin', 200),
    ('Ethereum', 250),
    ('Ethereum', 5000),
    ('DOGEFI', 10),
    ('Bitcoin Diamond', 5000);
```

We can use `EXCEPT` to answer a question like **"Which coins that we own have never traded below $10?"**:

```sql
SELECT crypto_name FROM holdings
EXCEPT
SELECT crypto_name FROM crypto_prices
WHERE price < 10;
```

Result:

``` text
┌─one─┬─two─┬─three─┐
│ l   │ p   │ o     │
│ k   │ t   │ d     │
│ l   │ p   │ o     │
└─────┴─────┴───────┘
```response
┌─crypto_name─┐
│ Bitcoin     │
│ Bitcoin     │
└─────────────┘
```

This means of the four cryptocurrencies we own, only Bitcoin has never dropped below $10 (based on the limited data we have here in this example).

## EXCEPT DISTINCT

Notice in the previous query we had multiple Bitcoin holdings in the result. You can add `DISTINCT` to `EXCEPT` to eliminate duplicate rows from the result:

```sql
SELECT crypto_name FROM holdings
EXCEPT DISTINCT
SELECT crypto_name FROM crypto_prices
WHERE price < 10;
```

Result:

```response
┌─crypto_name─┐
│ Bitcoin     │
└─────────────┘
```

**See Also**

- [UNION](union.md#union-clause)
@@ -24,17 +24,17 @@ FROM table2
```
The condition could be any expression based on your requirements.

**Examples**
## Examples

Query:
Here is a simple example that intersects the numbers 1 to 10 with the numbers 3 to 8:

``` sql
```sql
SELECT number FROM numbers(1,10) INTERSECT SELECT number FROM numbers(3,6);
```

Result:

``` text
```response
┌─number─┐
│      3 │
│      4 │
@@ -45,29 +45,112 @@ Result:
└────────┘
```
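For comparison, a minimal sketch of the same result written with `IN` instead of `INTERSECT`; the two forms coincide for this example, although they are not equivalent in general (for instance when duplicate rows or NULLs are involved):

```sql
-- returns 3 through 8, just like the INTERSECT query above
SELECT number FROM numbers(1,10)
WHERE number IN (SELECT number FROM numbers(3,6));
```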
Query:

`INTERSECT` is useful if you have two tables that share a common column (or columns). You can intersect the results of two queries, as long as the results contain the same columns. For example, suppose we have a few million rows of historical cryptocurrency data that contains trade prices and volume:

``` sql
CREATE TABLE t1(one String, two String, three String) ENGINE=Memory();
CREATE TABLE t2(four String, five String, six String) ENGINE=Memory();
```sql
CREATE TABLE crypto_prices
(
    trade_date Date,
    crypto_name String,
    volume Float32,
    price Float32,
    market_cap Float32,
    change_1_day Float32
)
ENGINE = MergeTree
PRIMARY KEY (crypto_name, trade_date);

INSERT INTO t1 VALUES ('q', 'm', 'b'), ('s', 'd', 'f'), ('l', 'p', 'o'), ('s', 'd', 'f'), ('s', 'd', 'f'), ('k', 't', 'd'), ('l', 'p', 'o');
INSERT INTO t2 VALUES ('q', 'm', 'b'), ('b', 'd', 'k'), ('s', 'y', 't'), ('s', 'd', 'f'), ('m', 'f', 'o'), ('k', 'k', 'd');
INSERT INTO crypto_prices
SELECT *
FROM s3(
    'https://learn-clickhouse.s3.us-east-2.amazonaws.com/crypto_prices.csv',
    'CSVWithNames'
);

SELECT * FROM t1 INTERSECT SELECT * FROM t2;
SELECT * FROM crypto_prices
WHERE crypto_name = 'Bitcoin'
ORDER BY trade_date DESC
LIMIT 10;
```

```response
┌─trade_date─┬─crypto_name─┬──────volume─┬────price─┬───market_cap─┬──change_1_day─┐
│ 2020-11-02 │ Bitcoin     │ 30771456000 │ 13550.49 │ 251119860000 │  -0.013585099 │
│ 2020-11-01 │ Bitcoin     │ 24453857000 │ 13737.11 │ 254569760000 │ -0.0031840964 │
│ 2020-10-31 │ Bitcoin     │ 30306464000 │ 13780.99 │ 255372070000 │   0.017308505 │
│ 2020-10-30 │ Bitcoin     │ 30581486000 │ 13546.52 │ 251018150000 │   0.008084608 │
│ 2020-10-29 │ Bitcoin     │ 56499500000 │ 13437.88 │ 248995320000 │   0.012552661 │
│ 2020-10-28 │ Bitcoin     │ 35867320000 │ 13271.29 │ 245899820000 │   -0.02804481 │
│ 2020-10-27 │ Bitcoin     │ 33749879000 │ 13654.22 │ 252985950000 │    0.04427984 │
│ 2020-10-26 │ Bitcoin     │ 29461459000 │ 13075.25 │ 242251000000 │  0.0033826586 │
│ 2020-10-25 │ Bitcoin     │ 24406921000 │ 13031.17 │ 241425220000 │ -0.0058658565 │
│ 2020-10-24 │ Bitcoin     │ 24542319000 │ 13108.06 │ 242839880000 │   0.013650347 │
└────────────┴─────────────┴─────────────┴──────────┴──────────────┴───────────────┘
```

Now suppose we have a table named `holdings` that contains a list of cryptocurrencies that we own, along with the number of coins:

```sql
CREATE TABLE holdings
(
    crypto_name String,
    quantity UInt64
)
ENGINE = MergeTree
PRIMARY KEY (crypto_name);

INSERT INTO holdings VALUES
    ('Bitcoin', 1000),
    ('Bitcoin', 200),
    ('Ethereum', 250),
    ('Ethereum', 5000),
    ('DOGEFI', 10),
    ('Bitcoin Diamond', 5000);
```

We can use `INTERSECT` to answer questions like **"Which coins that we own have traded at a price greater than $100?"**:

```sql
SELECT crypto_name FROM holdings
INTERSECT
SELECT crypto_name FROM crypto_prices
WHERE price > 100
```

Result:

``` text
┌─one─┬─two─┬─three─┐
│ q   │ m   │ b     │
│ s   │ d   │ f     │
│ s   │ d   │ f     │
│ s   │ d   │ f     │
└─────┴─────┴───────┘
```response
┌─crypto_name─┐
│ Bitcoin     │
│ Bitcoin     │
│ Ethereum    │
│ Ethereum    │
└─────────────┘
```

This means at some point in time, Bitcoin and Ethereum traded above $100, and DOGEFI and Bitcoin Diamond have never traded above $100 (at least using the data we have here in this example).

## INTERSECT DISTINCT

Notice in the previous query we had multiple Bitcoin and Ethereum holdings that traded above $100. It might be nice to remove duplicate rows (since they only repeat what we already know). You can add `DISTINCT` to `INTERSECT` to eliminate duplicate rows from the result:

```sql
SELECT crypto_name FROM holdings
INTERSECT DISTINCT
SELECT crypto_name FROM crypto_prices
WHERE price > 100;
```

Result:

```response
┌─crypto_name─┐
│ Bitcoin     │
│ Ethereum    │
└─────────────┘
```

**See Also**

- [UNION](union.md#union-clause)
@@ -1464,7 +1464,7 @@ try
    size_t max_cache_size = static_cast<size_t>(memory_amount * cache_size_to_ram_max_ratio);

    /// Size of cache for uncompressed blocks. Zero means disabled.
    String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
    String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "SLRU");
    LOG_INFO(log, "Uncompressed cache policy name {}", uncompressed_cache_policy);
    size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
    if (uncompressed_cache_size > max_cache_size)
@@ -1490,7 +1490,7 @@ try

    /// Size of cache for marks (index of MergeTree family of tables).
    size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
    String mark_cache_policy = config().getString("mark_cache_policy", "");
    String mark_cache_policy = config().getString("mark_cache_policy", "SLRU");
    if (!mark_cache_size)
        LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
    if (mark_cache_size > max_cache_size)
@@ -32,7 +32,7 @@ public:
    }
    void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
    {
        deserializeText(column, istr, settings, true);
        deserializeText(column, istr, settings, false);
    }
    void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override
    {
@@ -74,14 +74,11 @@ public:
        serializeText(column, row_num, ostr, settings);
        writeChar('"', ostr);
    }
    void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
    void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & /* settings*/) const override
    {
        IPv value;
        readCSV(value, istr);

        if (!istr.eof())
            throwUnexpectedDataAfterParsedValue(column, istr, settings, TypeName<IPv>.data());

        assert_cast<ColumnVector<IPv> &>(column).getData().push_back(value);
    }

@@ -379,14 +379,14 @@ void DatabaseOnDisk::renameTable(
    if (dictionary && table && !table->isDictionary())
        throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");

    table_lock = table->lockExclusively(
        local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);

    detachTable(local_context, table_name);

    UUID prev_uuid = UUIDHelpers::Nil;
    try
    {
        table_lock = table->lockExclusively(
            local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);

        table_metadata_path = getObjectMetadataPath(table_name);
        attach_query = parseQueryFromMetadata(log, local_context, table_metadata_path);
        auto & create = attach_query->as<ASTCreateQuery &>();
@@ -236,7 +236,6 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
            backQuote(database_name), backQuote(table_name));
    res = it->second;
    tables.erase(it);
    res->is_detached = true;

    auto table_id = res->getStorageID();
    if (table_id.hasUUID())
@@ -273,10 +272,6 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
        DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
        throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already exists.", table_id.getFullTableName());
    }

    /// It is important to reset is_detached here since in case of RENAME in
    /// non-Atomic database the is_detached is set to true before RENAME.
    table->is_detached = false;
}

void DatabaseWithOwnTablesBase::shutdown()
@@ -448,7 +448,7 @@ void DatabaseMySQL::detachTablePermanently(ContextPtr, const String & table_name
        remove_or_detach_tables.erase(table_name);
        throw;
    }
    table_iter->second.second->is_detached = true;
    table_iter->second.second->is_dropped = true;
}

void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
@@ -24,6 +24,7 @@
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/Native.h>
@@ -107,6 +108,9 @@ template <typename DataType> constexpr bool IsDateOrDateTime = false;
template <> inline constexpr bool IsDateOrDateTime<DataTypeDate> = true;
template <> inline constexpr bool IsDateOrDateTime<DataTypeDateTime> = true;

template <typename DataType> constexpr bool IsIPv4 = false;
template <> inline constexpr bool IsIPv4<DataTypeIPv4> = true;

template <typename T0, typename T1> constexpr bool UseLeftDecimal = false;
template <> inline constexpr bool UseLeftDecimal<DataTypeDecimal<Decimal256>, DataTypeDecimal<Decimal128>> = true;
template <> inline constexpr bool UseLeftDecimal<DataTypeDecimal<Decimal256>, DataTypeDecimal<Decimal64>> = true;
@@ -1210,6 +1214,17 @@ public:
            return arguments[0];
        }

        /// Special case - one or both arguments are IPv4
        if (isIPv4(arguments[0]) || isIPv4(arguments[1]))
        {
            DataTypes new_arguments {
                isIPv4(arguments[0]) ? std::make_shared<DataTypeUInt32>() : arguments[0],
                isIPv4(arguments[1]) ? std::make_shared<DataTypeUInt32>() : arguments[1],
            };

            return getReturnTypeImplStatic(new_arguments, context);
        }

        /// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Interval.
        if (auto function_builder = getFunctionForIntervalArithmetic(arguments[0], arguments[1], context))
        {
@@ -1707,6 +1722,25 @@ public:
            return executeAggregateAddition(arguments, result_type, input_rows_count);
        }

        /// Special case - one or both arguments are IPv4
        if (isIPv4(arguments[0].type) || isIPv4(arguments[1].type))
        {
            ColumnsWithTypeAndName new_arguments {
                {
                    isIPv4(arguments[0].type) ? castColumn(arguments[0], std::make_shared<DataTypeUInt32>()) : arguments[0].column,
                    isIPv4(arguments[0].type) ? std::make_shared<DataTypeUInt32>() : arguments[0].type,
                    arguments[0].name,
                },
                {
                    isIPv4(arguments[1].type) ? castColumn(arguments[1], std::make_shared<DataTypeUInt32>()) : arguments[1].column,
                    isIPv4(arguments[1].type) ? std::make_shared<DataTypeUInt32>() : arguments[1].type,
                    arguments[1].name
                }
            };

            return executeImpl(new_arguments, result_type, input_rows_count);
        }

        /// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Interval.
        if (auto function_builder = getFunctionForIntervalArithmetic(arguments[0].type, arguments[1].type, context))
        {
@@ -41,7 +41,7 @@ namespace
    public:
        AccumulatedBlockReader(TemporaryFileStream & reader_,
                               std::mutex & mutex_,
                               size_t result_block_size_ = DEFAULT_BLOCK_SIZE * 8)
                               size_t result_block_size_ = 0)
            : reader(reader_)
            , mutex(mutex_)
            , result_block_size(result_block_size_)
@@ -59,18 +59,22 @@ namespace

            Blocks blocks;
            size_t rows_read = 0;
            while (rows_read < result_block_size)
            do
            {
                Block block = reader.read();
                rows_read += block.rows();
                if (!block)
                {
                    eof = true;
                    if (blocks.size() == 1)
                        return blocks.front();
                    return concatenateBlocks(blocks);
                }
                blocks.push_back(std::move(block));
            }
            } while (rows_read < result_block_size);

            if (blocks.size() == 1)
                return blocks.front();
            return concatenateBlocks(blocks);
        }

@@ -118,21 +122,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable
public:
    using BucketLock = std::unique_lock<std::mutex>;

    struct Stats
    {
        TemporaryFileStream::Stat left;
        TemporaryFileStream::Stat right;
    };

    explicit FileBucket(size_t bucket_index_,
                        TemporaryFileStream & left_file_,
                        TemporaryFileStream & right_file_,
                        Poco::Logger * log_)
    explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, Poco::Logger * log_)
        : idx{bucket_index_}
        , left_file{left_file_}
        , right_file{right_file_}
        , state{State::WRITING_BLOCKS}
        , log(log_)
        , log{log_}
    {
    }

@@ -168,21 +163,18 @@ public:

    bool empty() const { return is_empty.load(); }

    Stats getStat() const { return stats; }

    AccumulatedBlockReader startJoining()
    {
        LOG_TRACE(log, "Joining file bucket {}", idx);

        {
            std::unique_lock<std::mutex> left_lock(left_file_mutex);
            std::unique_lock<std::mutex> right_lock(right_file_mutex);

            stats.left = left_file.finishWriting();
            stats.right = right_file.finishWriting();
            left_file.finishWriting();
            right_file.finishWriting();

            state = State::JOINING_BLOCKS;
        }

        return AccumulatedBlockReader(right_file, right_file_mutex);
    }

@@ -231,22 +223,23 @@ private:
    std::atomic_bool is_empty = true;

    std::atomic<State> state;
    Stats stats;

    Poco::Logger * log;
};

namespace
{

template <JoinTableSide table_side>
void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets)
void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets, size_t except_index = 0)
{
    chassert(blocks.size() == buckets.size());
    retryForEach(
        generateRandomPermutation(1, buckets.size()), // skipping 0 block, since we join it in memory w/o spilling on disk
        [&](size_t i)
        {
            if (!blocks[i].rows())
            /// Skip empty and current bucket
            if (!blocks[i].rows() || i == except_index)
                return true;

            bool flushed = false;
@@ -281,6 +274,7 @@ GraceHashJoin::GraceHashJoin(
    , right_key_names(table_join->getOnlyClause().key_names_right)
    , tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
    , hash_join(makeInMemoryJoin())
    , hash_join_sample_block(hash_join->savedBlockSample())
{
    if (!GraceHashJoin::isSupported(table_join))
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GraceHashJoin is not supported for this join type");
@@ -288,6 +282,9 @@ GraceHashJoin::GraceHashJoin(

void GraceHashJoin::initBuckets()
{
    if (!buckets.empty())
        return;

    const auto & settings = context->getSettingsRef();

    size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp<size_t>(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets));
@@ -300,7 +297,7 @@ void GraceHashJoin::initBuckets()
    if (buckets.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");

    LOG_TRACE(log, "Initialize {} buckets", buckets.size());
    LOG_TRACE(log, "Initialize {} bucket{}", buckets.size(), buckets.size() > 1 ? "s" : "");

    current_bucket = buckets.front().get();
    current_bucket->startJoining();
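The bucket-count settings referenced above (`grace_hash_join_initial_buckets`, `grace_hash_join_max_buckets`) are query-level settings; a hedged sketch of enabling this join path from SQL, assuming `join_algorithm` accepts the value `'grace_hash'` in this version (table names `t1`/`t2` and the `key` column are illustrative):

```sql
-- opt into the grace hash join and start with 4 on-disk buckets
SET join_algorithm = 'grace_hash';
SET grace_hash_join_initial_buckets = 4;

SELECT count()
FROM t1
INNER JOIN t2 ON t1.key = t2.key;
```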
@@ -320,18 +317,44 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized");

    Block materialized = materializeBlock(block);
    addJoinedBlockImpl(materialized);
    addJoinedBlockImpl(std::move(materialized));
    return true;
}

bool GraceHashJoin::fitsInMemory() const
bool GraceHashJoin::hasMemoryOverflow(size_t total_rows, size_t total_bytes) const
{
    /// One row can't be split, avoid loop
    size_t total_row_count = hash_join->getTotalRowCount();
    if (total_row_count < 2)
        return true;
    if (total_rows < 2)
        return false;

    return table_join->sizeLimits().softCheck(total_row_count, hash_join->getTotalByteCount());
    bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes);

    if (has_overflow)
        LOG_TRACE(log, "Memory overflow, size exceeded {} / {} bytes, {} / {} rows",
            ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes),
            total_rows, table_join->sizeLimits().max_rows);

    return has_overflow;
}

bool GraceHashJoin::hasMemoryOverflow(const BlocksList & blocks) const
{
    size_t total_rows = 0;
    size_t total_bytes = 0;
    for (const auto & block : blocks)
    {
        total_rows += block.rows();
        total_bytes += block.allocatedBytes();
    }
    return hasMemoryOverflow(total_rows, total_bytes);
}

bool GraceHashJoin::hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const
{
    size_t total_rows = hash_join_->getTotalRowCount();
    size_t total_bytes = hash_join_->getTotalByteCount();

    return hasMemoryOverflow(total_rows, total_bytes);
}

GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
@@ -342,7 +365,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
    if (to_size <= current_size)
        return buckets;

    assert(isPowerOf2(to_size));
    chassert(isPowerOf2(to_size));

    if (to_size > max_num_buckets)
    {
@@ -363,14 +386,16 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)

void GraceHashJoin::addBucket(Buckets & destination)
{
    BucketPtr new_bucket = std::make_shared<FileBucket>(
        destination.size(), tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), log);
    auto & left_file = tmp_data->createStream(left_sample_block);
    auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));

    BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
    destination.emplace_back(std::move(new_bucket));
}

void GraceHashJoin::checkTypesOfKeys(const Block & block) const
{
    assert(hash_join);
    chassert(hash_join);
    return hash_join->checkTypesOfKeys(block);
}

@@ -423,7 +448,7 @@ size_t GraceHashJoin::getTotalRowCount() const
size_t GraceHashJoin::getTotalByteCount() const
{
    std::lock_guard lock(hash_join_mutex);
    assert(hash_join);
    chassert(hash_join);
    return hash_join->getTotalByteCount();
}

@@ -437,9 +462,14 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
        std::shared_lock lock(rehash_mutex);
        return std::all_of(buckets.begin(), buckets.end(), [](const auto & bucket) { return bucket->empty(); });
    }();
    bool hash_join_is_empty = hash_join && hash_join->alwaysReturnsEmptySet();

    return hash_join_is_empty && file_buckets_are_empty;
    if (!file_buckets_are_empty)
        return false;

    chassert(hash_join);
    bool hash_join_is_empty = hash_join->alwaysReturnsEmptySet();

    return hash_join_is_empty;
}

IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
@@ -528,17 +558,11 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

        if (hash_join)
        {
            auto right_blocks = hash_join->releaseJoinedBlocks();
            Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_blocks, buckets.size());

            for (size_t i = 0; i < blocks.size(); ++i)
            auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
            for (auto & block : right_blocks)
            {
                if (blocks[i].rows() == 0 || i == bucket_idx)
                    continue;

                if (i < bucket_idx)
                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected bucket index {} when current bucket is {}", i, bucket_idx);
                buckets[i]->addRightBlock(blocks[i]);
                Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size());
                flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets, bucket_idx);
            }
        }

@@ -570,7 +594,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
        return std::make_unique<DelayedBlocks>(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names);
    }

    LOG_TRACE(log, "Finished loading all buckets");
    LOG_TRACE(log, "Finished loading all {} buckets", buckets.size());

    current_bucket = nullptr;
    return nullptr;
@@ -581,42 +605,64 @@ GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
    return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
}

Block GraceHashJoin::prepareRightBlock(const Block & block)
{
    return HashJoin::prepareRightBlock(block, hash_join_sample_block);
}

void GraceHashJoin::addJoinedBlockImpl(Block block)
{
    Buckets buckets_snapshot = getCurrentBuckets();
    Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
    size_t bucket_index = current_bucket->idx;
    Block current_block;

    {
        Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
        flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot, bucket_index);
        current_block = std::move(blocks[bucket_index]);
    }

    // Add block to the in-memory join
    if (blocks[bucket_index].rows() > 0)
    if (current_block.rows() > 0)
    {
        std::lock_guard lock(hash_join_mutex);

        hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false);
        bool overflow = !fitsInMemory();

        if (overflow)
        {
            auto right_blocks = hash_join->releaseJoinedBlocks();
            right_blocks.pop_back();

            for (const auto & right_block : right_blocks)
                blocks.push_back(right_block);
        }

        while (overflow)
        {
            buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);

            blocks = JoinCommon::scatterBlockByHash(right_key_names, blocks, buckets_snapshot.size());
        if (!hash_join)
            hash_join = makeInMemoryJoin();
            hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false);
            overflow = !fitsInMemory();
        }
        blocks[bucket_index].clear();
    }

    flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot);
        hash_join->addJoinedBlock(current_block, /* check_limits = */ false);

        if (!hasMemoryOverflow(hash_join))
            return;

        current_block = {};

        auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
        hash_join = nullptr;

        buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);

        {
            Blocks current_blocks;
            current_blocks.reserve(right_blocks.size());
            for (const auto & right_block : right_blocks)
            {
                Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size());
                flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot, bucket_index);
                current_blocks.emplace_back(std::move(blocks[bucket_index]));
            }

            if (current_blocks.size() == 1)
                current_block = std::move(current_blocks.front());
            else
                current_block = concatenateBlocks(current_blocks);
        }

        hash_join = makeInMemoryJoin();

        if (current_block.rows() > 0)
            hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
    }
}

size_t GraceHashJoin::getNumBuckets() const
@@ -95,8 +95,10 @@ private:
    /// Add right table block to the @join. Calls @rehash on overflow.
    void addJoinedBlockImpl(Block block);

    /// Check that @join satisifes limits on rows/bytes in @table_join.
    bool fitsInMemory() const;
    /// Check that join satisfies limits on rows/bytes in table_join.
    bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const;
    bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const;
    bool hasMemoryOverflow(const BlocksList & blocks) const;

    /// Create new bucket at the end of @destination.
    void addBucket(Buckets & destination);
@@ -114,6 +116,9 @@ private:
    size_t getNumBuckets() const;
    Buckets getCurrentBuckets() const;

    /// Structure block to store in the HashJoin according to sample_block.
    Block prepareRightBlock(const Block & block);

    Poco::Logger * log;
    ContextPtr context;
    std::shared_ptr<TableJoin> table_join;
@@ -136,6 +141,7 @@ private:
    mutable std::mutex current_bucket_mutex;

    InMemoryJoinPtr hash_join;
    Block hash_join_sample_block;
    mutable std::mutex hash_join_mutex;
};

@@ -221,8 +221,8 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
    , right_sample_block(right_sample_block_)
    , log(&Poco::Logger::get("HashJoin"))
{
    LOG_DEBUG(log, "Datatype: {}, kind: {}, strictness: {}, right header: {}", data->type, kind, strictness, right_sample_block.dumpStructure());
    LOG_DEBUG(log, "Keys: {}", TableJoin::formatClauses(table_join->getClauses(), true));
    LOG_DEBUG(log, "({}) Datatype: {}, kind: {}, strictness: {}, right header: {}", fmt::ptr(this), data->type, kind, strictness, right_sample_block.dumpStructure());
    LOG_DEBUG(log, "({}) Keys: {}", fmt::ptr(this), TableJoin::formatClauses(table_join->getClauses(), true));

    if (isCrossOrComma(kind))
    {
@@ -469,6 +469,9 @@ bool HashJoin::alwaysReturnsEmptySet() const

size_t HashJoin::getTotalRowCount() const
{
    if (!data)
        return 0;

    size_t res = 0;

    if (data->type == Type::CROSS)
@@ -484,28 +487,45 @@ size_t HashJoin::getTotalRowCount() const
        }
    }

    return res;
}

size_t HashJoin::getTotalByteCount() const
{
    if (!data)
        return 0;

#ifdef NDEBUG
    size_t debug_blocks_allocated_size = 0;
    for (const auto & block : data->blocks)
        debug_blocks_allocated_size += block.allocatedBytes();

    if (data->blocks_allocated_size != debug_blocks_allocated_size)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})",
                        data->blocks_allocated_size, debug_blocks_allocated_size);

    size_t debug_blocks_nullmaps_allocated_size = 0;
    for (const auto & nullmap : data->blocks_nullmaps)
        debug_blocks_nullmaps_allocated_size += nullmap.second->allocatedBytes();

    if (data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
                        data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
#endif

    size_t res = 0;

    if (data->type == Type::CROSS)
    {
        for (const auto & block : data->blocks)
            res += block.bytes();
    }
    else
    res += data->blocks_allocated_size;
    res += data->blocks_nullmaps_allocated_size;
    res += data->pool.size();

    if (data->type != Type::CROSS)
    {
        for (const auto & map : data->maps)
        {
            joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); });
        }
        res += data->pool.size();
    }

    return res;
}

@@ -656,41 +676,57 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
    }
}

Block HashJoin::structureRightBlock(const Block & block) const
Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_)
{
    Block structured_block;
    for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName())
    for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName())
    {
        ColumnWithTypeAndName column = block.getByName(sample_column.name);
        if (sample_column.column->isNullable())
            JoinCommon::convertColumnToNullable(column);
        structured_block.insert(column);

        if (column.column->lowCardinality() && !sample_column.column->lowCardinality())
        {
            column.column = column.column->convertToFullColumnIfLowCardinality();
            column.type = removeLowCardinality(column.type);
        }

        /// There's no optimization for right side const columns. Remove constness if any.
        column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
        structured_block.insert(std::move(column));
    }

    return structured_block;
}

Block HashJoin::prepareRightBlock(const Block & block) const
{
    return prepareRightBlock(block, savedBlockSample());
}

bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
{
    if (!data)
        throw Exception("Join data was released", ErrorCodes::LOGICAL_ERROR);

    /// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency.
    /// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code.
    if (unlikely(source_block.rows() > std::numeric_limits<RowRef::SizeT>::max()))
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block.rows());

    /// There's no optimization for right side const columns. Remove constness if any.
    Block block = materializeBlock(source_block);
    size_t rows = block.rows();
    size_t rows = source_block.rows();

    ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right));
    ColumnPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(source_block, table_join->getAllNames(JoinTableSide::Right));

    Block structured_block = structureRightBlock(block);
    Block block_to_save = prepareRightBlock(source_block);
    size_t total_rows = 0;
    size_t total_bytes = 0;
    {
        if (storage_join_lock)
            throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addJoinedBlock called when HashJoin locked to prevent updates");

        data->blocks.emplace_back(std::move(structured_block));
        data->blocks_allocated_size += block_to_save.allocatedBytes();
        data->blocks.emplace_back(std::move(block_to_save));
        Block * stored_block = &data->blocks.back();

        if (rows)
@@ -702,7 +738,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
        {
            ColumnRawPtrs key_columns;
            for (const auto & name : onexprs[onexpr_idx].key_names_right)
                key_columns.push_back(all_key_columns[name]);
                key_columns.push_back(all_key_columns[name].get());

            /// We will insert to the map only keys, where all components are not NULL.
            ConstNullMapPtr null_map{};
@@ -717,14 +753,14 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
                    save_nullmap |= (*null_map)[i];
            }

            auto join_mask_col = JoinCommon::getColumnAsMask(block, onexprs[onexpr_idx].condColumnNames().second);
            auto join_mask_col = JoinCommon::getColumnAsMask(source_block, onexprs[onexpr_idx].condColumnNames().second);
            /// Save blocks that do not hold conditions in ON section
            ColumnUInt8::MutablePtr not_joined_map = nullptr;
            if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant())
            if (!multiple_disjuncts && isRightOrFull(kind) && join_mask_col.hasData())
            {
                const auto & join_mask = join_mask_col.getData();
                /// Save rows that do not hold conditions
                not_joined_map = ColumnUInt8::create(block.rows(), 0);
                not_joined_map = ColumnUInt8::create(rows, 0);
                for (size_t i = 0, sz = join_mask->size(); i < sz; ++i)
                {
                    /// Condition hold, do not save row
@@ -758,10 +794,16 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
        }

        if (!multiple_disjuncts && save_nullmap)
        {
            data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes();
            data->blocks_nullmaps.emplace_back(stored_block, null_map_holder);
        }

        if (!multiple_disjuncts && not_joined_map)
        {
            data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes();
            data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map));
        }

        if (!check_limits)
            return true;
@@ -794,7 +836,6 @@ struct JoinOnKeyColumns

    Sizes key_sizes;


    explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
        : key_names(key_names_)
        , materialized_keys_holder(JoinCommon::materializeColumns(block, key_names)) /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
@@ -1672,7 +1713,7 @@ void HashJoin::checkTypesOfKeys(const Block & block) const

void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
    if (data->released)
    if (!data)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");

    for (const auto & onexpr : table_join->getClauses())
@@ -1711,6 +1752,16 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
    }
}

HashJoin::~HashJoin()
{
    if (!data)
    {
        LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this));
        return;
    }
    LOG_TRACE(log, "({}) Join data is being destroyed, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
}

template <typename Mapped>
struct AdderNonJoined
{
@@ -1749,7 +1800,6 @@ struct AdderNonJoined
    }
};


/// Stream from not joined earlier rows of the right table.
/// Based on:
///   - map offsetInternal saved in used_flags for single disjuncts
@@ -1760,7 +1810,10 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
public:
    NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
        : parent(parent_), max_block_size(max_block_size_), current_block_start(0)
    {}
    {
        if (parent.data == nullptr)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
    }

    Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); }

@@ -1957,7 +2010,6 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
        size_t left_columns_count = left_sample_block.columns();
        auto non_joined = std::make_unique<NotJoinedHash<true>>(*this, max_block_size);
        return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);

    }
    else
    {
@@ -1986,10 +2038,20 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
    }
}

BlocksList HashJoin::releaseJoinedBlocks()
BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
{
    LOG_TRACE(log, "({}) Join data is being released, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());

    BlocksList right_blocks = std::move(data->blocks);
    data->released = true;
    if (!restructure)
    {
        data.reset();
        return right_blocks;
    }

    data->maps.clear();
    data->blocks_nullmaps.clear();

    BlocksList restored_blocks;

    /// names to positions optimization
@@ -2018,6 +2080,7 @@ BlocksList HashJoin::releaseJoinedBlocks()
        restored_blocks.emplace_back(std::move(restored_block));
    }

    data.reset();
    return restored_blocks;
}

@@ -149,6 +149,8 @@ class HashJoin : public IJoin
public:
    HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);

    ~HashJoin() override;

    const TableJoin & getTableJoin() const override { return *table_join; }

    /** Add block of data from right hand of JOIN to the map.
@@ -336,7 +338,8 @@ public:
        /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
        Arena pool;

        bool released = false;
        size_t blocks_allocated_size = 0;
        size_t blocks_nullmaps_allocated_size = 0;
    };

    using RightTableDataPtr = std::shared_ptr<RightTableData>;
@@ -351,7 +354,13 @@ public:
    void reuseJoinedData(const HashJoin & join);

    RightTableDataPtr getJoinedData() const { return data; }
    BlocksList releaseJoinedBlocks();
    BlocksList releaseJoinedBlocks(bool restructure = false);

    /// Modify right block (update structure according to sample block) to save it in block list
    static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_);
    Block prepareRightBlock(const Block & block) const;

    const Block & savedBlockSample() const { return data->sample_block; }

    bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
    bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
@@ -403,10 +412,6 @@ private:

    void dataMapInit(MapsVariant &);

    const Block & savedBlockSample() const { return data->sample_block; }

    /// Modify (structure) right block to save it in block list
    Block structureRightBlock(const Block & stored_block) const;
    void initRightBlockStructure(Block & saved_block_sample);

    template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
@@ -287,10 +287,6 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
                table->drop();
                table->is_dropped = true;
            }
            else if (kind == ASTDropQuery::Kind::Detach)
            {
                table->is_detached = true;
            }
        }
    }

@@ -562,7 +562,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
            view = nullptr;
        }

        if (try_move_to_prewhere && storage && storage->canMoveConditionsToPrewhere() && query.where() && !query.prewhere())
        if (try_move_to_prewhere
            && storage && storage->canMoveConditionsToPrewhere()
            && query.where() && !query.prewhere()
            && !query.hasJoin()) /// Join may produce rows with nulls or default values, it's difficult to analyze if they affected or not.
        {
            /// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable
            if (const auto & column_sizes = storage->getColumnSizes(); !column_sizes.empty())
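For reference, PREWHERE is the optimization the condition above is guarding; a hedged sketch of the query shape the optimizer effectively rewrites to (table and column names are hypothetical):

```sql
-- the cheap filter is evaluated first in PREWHERE, so the remaining
-- columns are read only for rows that pass it
SELECT URL, Title
FROM hits
PREWHERE CounterID = 101
WHERE URL LIKE '%clickhouse%';
```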
@@ -41,7 +41,7 @@ bool JoinSwitcher::addJoinedBlock(const Block & block, bool)
bool JoinSwitcher::switchJoin()
{
    HashJoin * hash_join = assert_cast<HashJoin *>(join.get());
    BlocksList right_blocks = hash_join->releaseJoinedBlocks();
    BlocksList right_blocks = hash_join->releaseJoinedBlocks(true);

    /// Destroy old join & create new one.
    join = std::make_shared<MergeJoin>(table_join, right_sample_block);
@@ -324,17 +324,20 @@ ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names)
    return ptrs;
}

ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names)
ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names)
{
    ColumnRawPtrMap ptrs;
    ColumnPtrMap ptrs;
    ptrs.reserve(names.size());

    for (const auto & column_name : names)
    {
        auto & column = block.getByName(column_name);
        column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst());
        column.type = recursiveRemoveLowCardinality(column.type);
        ptrs[column_name] = column.column.get();
        ColumnPtr column = block.getByName(column_name).column;

        column = column->convertToFullColumnIfConst();
        column = recursiveRemoveLowCardinality(column);
        column = recursiveRemoveSparse(column);

        ptrs[column_name] = column;
    }

    return ptrs;
@@ -529,24 +532,24 @@ bool typesEqualUpToNullability(DataTypePtr left_type, DataTypePtr right_type)
JoinMask getColumnAsMask(const Block & block, const String & column_name)
{
    if (column_name.empty())
        return JoinMask(true);
        return JoinMask(true, block.rows());

    const auto & src_col = block.getByName(column_name);

    DataTypePtr col_type = recursiveRemoveLowCardinality(src_col.type);
    if (isNothing(col_type))
        return JoinMask(false);
        return JoinMask(false, block.rows());

    if (const auto * const_cond = checkAndGetColumn<ColumnConst>(*src_col.column))
    {
        return JoinMask(const_cond->getBool(0));
        return JoinMask(const_cond->getBool(0), block.rows());
    }

    ColumnPtr join_condition_col = recursiveRemoveLowCardinality(src_col.column->convertToFullColumnIfConst());
    if (const auto * nullable_col = typeid_cast<const ColumnNullable *>(join_condition_col.get()))
    {
        if (isNothing(assert_cast<const DataTypeNullable &>(*col_type).getNestedType()))
            return JoinMask(false);
            return JoinMask(false, block.rows());

        /// Return nested column with NULL set to false
        const auto & nest_col = assert_cast<const ColumnUInt8 &>(nullable_col->getNestedColumn());
@@ -639,9 +642,8 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block
{
    if (num_shards == 0)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive");
    UNUSED(scatterBlockByHashPow2);
    //     if (likely(isPowerOf2(num_shards)))
    //         return scatterBlockByHashPow2(key_columns_names, block, num_shards);
    if (likely(isPowerOf2(num_shards)))
        return scatterBlockByHashPow2(key_columns_names, block, num_shards);
    return scatterBlockByHashGeneric(key_columns_names, block, num_shards);
}

@@ -14,30 +14,34 @@ class TableJoin;
class IColumn;

using ColumnRawPtrs = std::vector<const IColumn *>;
using ColumnPtrMap = std::unordered_map<String, ColumnPtr>;
using ColumnRawPtrMap = std::unordered_map<String, const IColumn *>;
using UInt8ColumnDataPtr = const ColumnUInt8::Container *;

namespace JoinCommon
{

/// Store boolean column handling constant value without materializing
/// Behaves similar to std::variant<bool, ColumnPtr>, but provides more convenient specialized interface
/// Helper interface to work with mask from JOIN ON section
class JoinMask
{
public:
    explicit JoinMask(bool value)
    explicit JoinMask()
        : column(nullptr)
        , const_value(value)
    {}

    explicit JoinMask(bool value, size_t size)
        : column(ColumnUInt8::create(size, value))
    {}

    explicit JoinMask(ColumnPtr col)
        : column(col)
        , const_value(false)
    {}

    bool isConstant() { return !column; }
    bool hasData()
    {
        return column != nullptr;
    }

    /// Return data if mask is not constant
    UInt8ColumnDataPtr getData()
    {
        if (column)
@@ -47,15 +51,11 @@ public:

    inline bool isRowFiltered(size_t row) const
    {
        if (column)
            return !assert_cast<const ColumnUInt8 &>(*column).getData()[row];
        return !const_value;
        return !assert_cast<const ColumnUInt8 &>(*column).getData()[row];
    }

private:
    ColumnPtr column;
    /// Used if column is null
    bool const_value;
};


@@ -71,7 +71,7 @@ ColumnPtr emptyNotNullableClone(const ColumnPtr & column);
ColumnPtr materializeColumn(const Block & block, const String & name);
Columns materializeColumns(const Block & block, const Names & names);
ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names);
ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names);
ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names);
ColumnRawPtrs getRawPointers(const Columns & columns);
void convertToFullColumnsInplace(Block & block);
void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type = true);
@@ -55,7 +55,7 @@ ColumnWithTypeAndName condtitionColumnToJoinable(const Block & block, const Stri
    if (!src_column_name.empty())
    {
        auto join_mask = JoinCommon::getColumnAsMask(block, src_column_name);
        if (!join_mask.isConstant())
        if (join_mask.hasData())
        {
            for (size_t i = 0; i < res_size; ++i)
                null_map->getData()[i] = join_mask.isRowFiltered(i);
@@ -198,7 +198,7 @@ public:
        : size_limits(limits)
        , default_max_bytes(0)
        , join_use_nulls(use_nulls)
        , join_algorithm(JoinAlgorithm::HASH)
        , join_algorithm(JoinAlgorithm::DEFAULT)
    {
        clauses.emplace_back().key_names_right = key_names_right;
        table_join.kind = kind;
@ -392,10 +392,34 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue

void ExecutingGraph::cancel()
{
    std::lock_guard guard(processors_mutex);
    for (auto & processor : *processors)
        processor->cancel();
    cancelled = true;
    std::exception_ptr exception_ptr;

    {
        std::lock_guard guard(processors_mutex);
        for (auto & processor : *processors)
        {
            try
            {
                processor->cancel();
            }
            catch (...)
            {
                if (!exception_ptr)
                    exception_ptr = std::current_exception();

                /// Log any exception since:
                /// a) they are pretty rare (the only that I know is from
                /// RemoteQueryExecutor)
                /// b) there can be exception during query execution, and in this
                /// case, this exception can be ignored (not showed to the user).
                tryLogCurrentException("ExecutingGraph");
            }
        }
        cancelled = true;
    }

    if (exception_ptr)
        std::rethrow_exception(exception_ptr);
}

}
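The new ExecutingGraph::cancel() keeps cancelling every processor even if one of them throws, remembers only the first exception, and rethrows it after the lock has been released. A small self-contained sketch of that pattern under simplified assumptions (generic `std::function` tasks and a plain mutex, not the actual IProcessor interface):

```cpp
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <vector>

/// Cancel every task; if some cancellations throw, keep going, remember the
/// first exception, and rethrow it only after the mutex has been released.
void cancel_all(std::vector<std::function<void()>> & cancel_fns, std::mutex & m)
{
    std::exception_ptr first_exception;
    {
        std::lock_guard<std::mutex> guard(m);
        for (auto & cancel : cancel_fns)
        {
            try
            {
                cancel();
            }
            catch (...)
            {
                if (!first_exception)
                    first_exception = std::current_exception();
                /// Logging would go here; later failures are intentionally dropped.
            }
        }
    }
    if (first_exception)
        std::rethrow_exception(first_exception);
}

int main()
{
    std::mutex m;
    std::vector<std::function<void()>> fns = {
        [] { throw std::runtime_error("first failure"); },
        [] { std::cout << "still cancelled\n"; },
    };
    try { cancel_all(fns, m); }
    catch (const std::exception & e) { std::cout << "rethrown: " << e.what() << "\n"; }
}
```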
@ -175,20 +175,35 @@ bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds)

void PullingAsyncPipelineExecutor::cancel()
{
    if (!data)
        return;

    /// Cancel execution if it wasn't finished.
    if (data && !data->is_finished && data->executor)
        data->executor->cancel();
    try
    {
        if (!data->is_finished && data->executor)
            data->executor->cancel();
    }
    catch (...)
    {
        /// Store exception only if during query execution there was no
        /// exception, since only one exception can be re-thrown.
        if (!data->has_exception)
        {
            data->exception = std::current_exception();
            data->has_exception = true;
        }
    }

    /// The following code is needed to rethrow exception from PipelineExecutor.
    /// It could have been thrown from pull(), but we will not likely call it again.

    /// Join thread here to wait for possible exception.
    if (data && data->thread.joinable())
    if (data->thread.joinable())
        data->thread.join();

    /// Rethrow exception to not swallow it in destructor.
    if (data)
        data->rethrowExceptionIfHas();
    data->rethrowExceptionIfHas();
}

Chunk PullingAsyncPipelineExecutor::getTotals()
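PullingAsyncPipelineExecutor::cancel() now guards the inner cancel with try/catch, stores the exception only when the query has not already produced one, and still joins the worker thread and rethrows so the error is not swallowed in the destructor. A compressed sketch of that control flow, assuming a hypothetical `Data` struct and a free `cancel` function (not the real member layout):

```cpp
#include <exception>
#include <iostream>
#include <stdexcept>
#include <thread>

struct Data
{
    bool is_finished = false;
    bool has_exception = false;
    std::exception_ptr exception;
    std::thread thread;

    void rethrowExceptionIfHas()
    {
        if (has_exception)
        {
            has_exception = false;
            std::rethrow_exception(exception);
        }
    }
};

void cancel(Data & data, void (*cancel_executor)())
{
    try
    {
        if (!data.is_finished)
            cancel_executor();                 /// may throw
    }
    catch (...)
    {
        if (!data.has_exception)               /// keep the earlier (query) exception if any
        {
            data.exception = std::current_exception();
            data.has_exception = true;
        }
    }

    if (data.thread.joinable())
        data.thread.join();                    /// wait for a possible exception from the thread

    data.rethrowExceptionIfHas();              /// do not swallow it in the destructor
}

int main()
{
    Data data;
    try { cancel(data, [] { throw std::runtime_error("cancel failed"); }); }
    catch (const std::exception & e) { std::cout << "rethrown: " << e.what() << "\n"; }
}
```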
@ -50,10 +50,10 @@ TableLockHolder IStorage::lockForShare(const String & query_id, const std::chron
{
    TableLockHolder result = tryLockTimed(drop_lock, RWLockImpl::Read, query_id, acquire_timeout);

    if (is_dropped || is_detached)
    if (is_dropped)
    {
        auto table_id = getStorageID();
        throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped or detached", table_id.database_name, table_id.table_name);
        throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped", table_id.database_name, table_id.table_name);
    }
    return result;
}

@ -62,7 +62,7 @@ TableLockHolder IStorage::tryLockForShare(const String & query_id, const std::ch
{
    TableLockHolder result = tryLockTimed(drop_lock, RWLockImpl::Read, query_id, acquire_timeout);

    if (is_dropped || is_detached)
    if (is_dropped)
    {
        // Table was dropped while acquiring the lock
        result = nullptr;

@ -81,7 +81,7 @@ IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds
        "Possible deadlock avoided. Client should retry.",
        getStorageID().getFullTableName(), acquire_timeout.count());

    if (is_dropped || is_detached)
    if (is_dropped)
        throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} is dropped or detached", getStorageID());

    return lock;

@ -93,7 +93,7 @@ TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, cons
    TableExclusiveLockHolder result;
    result.drop_lock = tryLockTimed(drop_lock, RWLockImpl::Write, query_id, acquire_timeout);

    if (is_dropped || is_detached)
    if (is_dropped)
        throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} is dropped or detached", getStorageID());

    return result;
@ -562,7 +562,6 @@ public:
    virtual void onActionLockRemove(StorageActionBlockType /* action_type */) {}

    std::atomic<bool> is_dropped{false};
    std::atomic<bool> is_detached{false};

    /// Does table support index for IN sections
    virtual bool supportsIndexForIn() const { return false; }
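The storage hunks add a second atomic flag, is_detached, and every lock helper re-checks both flags after the lock has been acquired, because the table may have been dropped or detached while the caller was waiting. A trivial sketch of that check with standalone types (the `StorageSketch` name and `std::mutex`-based lock are illustrative assumptions, not the real IStorage):

```cpp
#include <atomic>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>

struct StorageSketch
{
    std::atomic<bool> is_dropped{false};
    std::atomic<bool> is_detached{false};
    std::mutex drop_mutex;

    std::unique_lock<std::mutex> lockForShare(const std::string & table_name)
    {
        std::unique_lock<std::mutex> lock(drop_mutex);
        /// Re-check both flags only after the lock is held: either condition
        /// means the caller must not proceed with the table.
        if (is_dropped || is_detached)
            throw std::runtime_error("Table " + table_name + " is dropped or detached");
        return lock;
    }
};

int main()
{
    StorageSketch storage;
    storage.is_detached = true;
    try { auto lock = storage.lockForShare("db.table"); }
    catch (const std::exception & e) { std::cout << e.what() << "\n"; }
}
```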
@ -470,6 +470,7 @@ void StorageLiveView::drop()
    DatabaseCatalog::instance().removeViewDependency(select_table_id, table_id);

    std::lock_guard lock(mutex);
    is_dropped = true;
    condition.notify_all();
}
@ -3763,7 +3763,7 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_parallel)
{
    if (is_parallel && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Parallel quorum inserts are not compatible with deprecated *MergeTree-syntax");
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Parallel quorum inserts are not compatible with the deprecated syntax of *MergeTree engines");

    auto zookeeper = getZooKeeper();
@ -311,11 +311,13 @@ class Backport:
        logging.info("Active releases: %s", ", ".join(self.release_branches))

    def receive_prs_for_backport(self):
        # The commit is the oldest open release branch's merge-base
        since_commit = git_runner(
            f"git merge-base {self.remote}/{self.release_branches[0]} "
            f"{self.remote}/{self.default_branch}"
        # The commits in the oldest open release branch
        oldest_branch_commits = git_runner(
            "git log --no-merges --format=%H --reverse "
            f"{self.remote}/{self.default_branch}..{self.remote}/{self.release_branches[0]}"
        )
        # The first commit is the one we are looking for
        since_commit = oldest_branch_commits.split("\n", 1)[0]
        since_date = date.fromisoformat(
            git_runner.run(f"git log -1 --format=format:%cs {since_commit}")
        )
@ -1,367 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import logging
import os
import re
from collections import namedtuple
from typing import Dict, List, Optional, Tuple

from artifactory import ArtifactorySaaSPath  # type: ignore
from build_download_helper import download_build_with_progress
from env_helper import S3_ARTIFACT_DOWNLOAD_TEMPLATE, RUNNER_TEMP
from git_helper import TAG_REGEXP, commit, removeprefix, removesuffix


# Necessary ENV variables
def getenv(name: str, default: Optional[str] = None) -> str:
    env = os.getenv(name, default)
    if env is not None:
        return env
    raise KeyError(f"Necessary {name} environment is not set")


TEMP_PATH = os.path.join(RUNNER_TEMP, "push_to_artifactory")
# One of the following ENVs is necessary
JFROG_API_KEY = getenv("JFROG_API_KEY", "")
JFROG_TOKEN = getenv("JFROG_TOKEN", "")

CheckDesc = namedtuple("CheckDesc", ("check_name", "deb_arch", "rpm_arch"))


class Packages:
    checks = (
        CheckDesc("package_release", "amd64", "x86_64"),
        CheckDesc("package_aarch64", "arm64", "aarch64"),
    )
    packages = (
        "clickhouse-client",
        "clickhouse-common-static",
        "clickhouse-common-static-dbg",
        "clickhouse-server",
    )

    def __init__(self, version: str):
        # Dicts of name: s3_path_suffix
        self.deb = {}  # type: Dict[str, str]
        self.rpm = {}  # type: Dict[str, str]
        self.tgz = {}  # type: Dict[str, str]
        for check in self.checks:
            for name in self.packages:
                deb = f"{name}_{version}_{check.deb_arch}.deb"
                self.deb[deb] = f"{check.check_name}/{deb}"

                rpm = f"{name}-{version}.{check.rpm_arch}.rpm"
                self.rpm[rpm] = f"{check.check_name}/{rpm}"

                tgz = f"{name}-{version}-{check.deb_arch}.tgz"
                self.tgz[tgz] = f"{check.check_name}/{tgz}"

    def arch(self, deb_pkg: str) -> str:
        if deb_pkg not in self.deb:
            raise ValueError(f"{deb_pkg} not in {self.deb}")
        return removesuffix(deb_pkg, ".deb").split("_")[-1]

    def replace_with_fallback(self, name: str) -> None:
        if name.endswith(".deb"):
            suffix = self.deb.pop(name)
            self.deb[self.fallback_to_all(name)] = self.fallback_to_all(suffix)
        elif name.endswith(".rpm"):
            suffix = self.rpm.pop(name)
            self.rpm[self.fallback_to_all(name)] = self.fallback_to_all(suffix)
        elif name.endswith(".tgz"):
            suffix = self.tgz.pop(name)
            self.tgz[self.fallback_to_all(name)] = self.fallback_to_all(suffix)
        else:
            raise KeyError(f"unknown package type for {name}")

    @staticmethod
    def path(package_file: str) -> str:
        return os.path.join(TEMP_PATH, package_file)

    @staticmethod
    def fallback_to_all(url_or_name: str) -> str:
        """Until July 2022 we had clickhouse-server and clickhouse-client with
        arch 'all'"""
        # deb
        if url_or_name.endswith("amd64.deb") or url_or_name.endswith("arm64.deb"):
            return f"{url_or_name[:-9]}all.deb"
        # rpm
        if url_or_name.endswith("x86_64.rpm") or url_or_name.endswith("aarch64.rpm"):
            new = removesuffix(removesuffix(url_or_name, "x86_64.rpm"), "aarch64.rpm")
            return f"{new}noarch.rpm"
        # tgz
        if url_or_name.endswith("-amd64.tgz") or url_or_name.endswith("-arm64.tgz"):
            return f"{url_or_name[:-10]}.tgz"
        return url_or_name


class S3:
    def __init__(
        self,
        pr: int,
        commit: str,
        version: str,
        force_download: bool,
    ):
        self._common = dict(
            pr_or_release=pr,
            commit=commit,
        )
        self.force_download = force_download
        self.packages = Packages(version)

    def download_package(self, package_file: str, s3_path_suffix: str) -> None:
        path = Packages.path(package_file)
        fallback_path = Packages.fallback_to_all(path)
        if not self.force_download and (
            os.path.exists(path) or os.path.exists(fallback_path)
        ):
            if os.path.exists(fallback_path):
                self.packages.replace_with_fallback(package_file)

            return
        build_name, artifact = s3_path_suffix.split("/")
        url = S3_ARTIFACT_DOWNLOAD_TEMPLATE.format_map(
            {**self._common, "build_name": build_name, "artifact": artifact}
        )
        try:
            download_build_with_progress(url, path)
        except Exception as e:
            if "Cannot download dataset from" in e.args[0]:
                new_url = Packages.fallback_to_all(url)
                logging.warning(
                    "Fallback downloading %s for old release", fallback_path
                )
                download_build_with_progress(new_url, fallback_path)
                self.packages.replace_with_fallback(package_file)

    def download_deb(self):
        # Copy to have a way to pop/add fallback packages
        packages = self.packages.deb.copy()
        for package_file, s3_path_suffix in packages.items():
            self.download_package(package_file, s3_path_suffix)

    def download_rpm(self):
        # Copy to have a way to pop/add fallback packages
        packages = self.packages.rpm.copy()
        for package_file, s3_path_suffix in packages.items():
            self.download_package(package_file, s3_path_suffix)

    def download_tgz(self):
        # Copy to have a way to pop/add fallback packages
        packages = self.packages.tgz.copy()
        for package_file, s3_path_suffix in packages.items():
            self.download_package(package_file, s3_path_suffix)


class Release:
    def __init__(self, name: str):
        r = re.compile(TAG_REGEXP)
        # Automatically remove refs/tags/ if full refname passed here
        name = removeprefix(name, "refs/tags/")
        if not r.match(name):
            raise argparse.ArgumentTypeError(
                f"release name {name} does not match "
                "v12.1.2.15-(testing|prestable|stable|lts) pattern"
            )
        self._name = name
        self._version = removeprefix(self._name, "v")
        self._version = self.version.split("-")[0]
        self._version_parts = tuple(self.version.split("."))
        self._type = self._name.split("-")[-1]

    @property
    def version(self) -> str:
        return self._version

    @property
    def version_parts(self) -> Tuple[str, ...]:
        return self._version_parts

    @property
    def type(self) -> str:
        return self._type


class Artifactory:
    def __init__(
        self,
        url: str,
        release: str,
        deb_repo: str = "deb",
        rpm_repo: str = "rpm",
        tgz_repo: str = "tgz",
    ):
        self._url = url
        self._release = release
        self._deb_url = "/".join((self._url, deb_repo, "pool", self._release)) + "/"
        self._rpm_url = "/".join((self._url, rpm_repo, self._release)) + "/"
        self._tgz_url = "/".join((self._url, tgz_repo, self._release)) + "/"
        # check the credentials ENVs for early exit
        self.__path_helper("_deb", "")

    def deploy_deb(self, packages: Packages) -> None:
        for package_file in packages.deb:
            path = packages.path(package_file)
            dist = self._release
            comp = "main"
            arch = packages.arch(package_file)
            logging.info(
                "Deploy %s(distribution=%s;component=%s;architecture=%s) "
                "to artifactory",
                path,
                dist,
                comp,
                arch,
            )
            self.deb_path(package_file).deploy_deb(path, dist, comp, arch)

    def deploy_rpm(self, packages: Packages) -> None:
        for package_file in packages.rpm:
            path = packages.path(package_file)
            logging.info("Deploy %s to artifactory", path)
            self.rpm_path(package_file).deploy_file(path)

    def deploy_tgz(self, packages: Packages) -> None:
        for package_file in packages.tgz:
            path = packages.path(package_file)
            logging.info("Deploy %s to artifactory", path)
            self.tgz_path(package_file).deploy_file(path)

    def __path_helper(self, name: str, package_file: str) -> ArtifactorySaaSPath:
        url = "/".join((getattr(self, name + "_url"), package_file))
        path = None
        if JFROG_API_KEY:
            path = ArtifactorySaaSPath(url, apikey=JFROG_API_KEY)
        elif JFROG_TOKEN:
            path = ArtifactorySaaSPath(url, token=JFROG_TOKEN)
        else:
            raise KeyError("Neither JFROG_API_KEY nor JFROG_TOKEN env are defined")
        return path

    def deb_path(self, package_file: str) -> ArtifactorySaaSPath:
        return self.__path_helper("_deb", package_file)

    def rpm_path(self, package_file: str) -> ArtifactorySaaSPath:
        return self.__path_helper("_rpm", package_file)

    def tgz_path(self, package_file: str) -> ArtifactorySaaSPath:
        return self.__path_helper("_tgz", package_file)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Program to download artifacts from S3 and push them to "
        "artifactory. ENV variables JFROG_API_KEY and JFROG_TOKEN are used "
        "for authentication in the given order",
    )
    parser.add_argument(
        "--release",
        required=True,
        type=Release,
        help="release name, e.g. v12.13.14.15-prestable; 'refs/tags/' "
        "prefix is striped automatically",
    )
    parser.add_argument(
        "--pull-request",
        type=int,
        default=0,
        help="pull request number; if PR is omitted, the first two numbers "
        "from release will be used, e.g. 12.11",
    )
    parser.add_argument(
        "--commit", required=True, type=commit, help="commit hash for S3 bucket"
    )
    parser.add_argument(
        "--all", action="store_true", help="implies all deb, rpm and tgz"
    )
    parser.add_argument(
        "--deb", action="store_true", help="if Debian packages should be processed"
    )
    parser.add_argument(
        "--rpm", action="store_true", help="if RPM packages should be processed"
    )
    parser.add_argument(
        "--tgz",
        action="store_true",
        help="if tgz archives should be processed. They aren't pushed to artifactory",
    )
    parser.add_argument(
        "--artifactory-url",
        default="https://clickhousedb.jfrog.io/artifactory",
        help="SaaS Artifactory url",
    )
    parser.add_argument("--artifactory", default=True, help=argparse.SUPPRESS)
    parser.add_argument(
        "-n",
        "--no-artifactory",
        action="store_false",
        dest="artifactory",
        default=argparse.SUPPRESS,
        help="do not push packages to artifactory",
    )
    parser.add_argument("--force-download", default=True, help=argparse.SUPPRESS)
    parser.add_argument(
        "--no-force-download",
        action="store_false",
        dest="force_download",
        default=argparse.SUPPRESS,
        help="do not download packages again if they exist already",
    )

    args = parser.parse_args()
    if args.all:
        args.deb = args.rpm = args.tgz = True
    if not (args.deb or args.rpm or args.tgz):
        parser.error("at least one of --deb, --rpm or --tgz should be specified")
    if args.pull_request == 0:
        args.pull_request = ".".join(args.release.version_parts[:2])
    return args


def process_deb(s3: S3, art_clients: List[Artifactory]) -> None:
    s3.download_deb()
    for art_client in art_clients:
        art_client.deploy_deb(s3.packages)


def process_rpm(s3: S3, art_clients: List[Artifactory]) -> None:
    s3.download_rpm()
    for art_client in art_clients:
        art_client.deploy_rpm(s3.packages)


def process_tgz(s3: S3, art_clients: List[Artifactory]) -> None:
    s3.download_tgz()
    for art_client in art_clients:
        art_client.deploy_tgz(s3.packages)


def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    args = parse_args()
    os.makedirs(TEMP_PATH, exist_ok=True)
    s3 = S3(
        args.pull_request,
        args.commit,
        args.release.version,
        args.force_download,
    )
    art_clients = []
    if args.artifactory:
        art_clients.append(Artifactory(args.artifactory_url, args.release.type))
        if args.release.type == "lts":
            art_clients.append(Artifactory(args.artifactory_url, "stable"))

    if args.deb:
        process_deb(s3, art_clients)
    if args.rpm:
        process_rpm(s3, art_clients)
    if args.tgz:
        process_tgz(s3, art_clients)


if __name__ == "__main__":
    main()
@ -29,11 +29,13 @@ class TeePopen:
        self.env = env or os.environ.copy()
        self._process = None  # type: Optional[Popen]
        self.timeout = timeout
        self.timeout_exceeded = False

    def _check_timeout(self) -> None:
        if self.timeout is None:
            return
        sleep(self.timeout)
        self.timeout_exceeded = True
        while self.process.poll() is None:
            logging.warning(
                "Killing process %s, timeout %s exceeded",

@ -62,6 +64,16 @@ class TeePopen:

    def __exit__(self, exc_type, exc_value, traceback):
        self.wait()
        if self.timeout_exceeded:
            exceeded_log = (
                f"Command `{self.command}` has failed, "
                f"timeout {self.timeout}s is exceeded"
            )
            if self.process.stdout is not None:
                sys.stdout.write(exceeded_log)

            self.log_file.write(exceeded_log)

        self.log_file.close()

    def wait(self) -> int:
@ -96,7 +96,7 @@ def test_merge_simple(started_cluster, replicated):

    # Wait for OPTIMIZE to actually start
    assert_eq_with_retry(
        node1,
        node_check,
        f"select count() from system.merges where table='{table_name}'",
        "1\n",
        retry_count=30,

@ -196,7 +196,7 @@ def test_mutation_simple(started_cluster, replicated):

    # Wait for the mutation to actually start
    assert_eq_with_retry(
        node1,
        node_check,
        f"select count() from system.merges where table='{table_name}'",
        "1\n",
        retry_count=30,
@ -30,6 +30,8 @@ SELECT t1.key, t1.key2 FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key ==

SELECT '--';
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2;
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 0; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 1; -- { serverError INVALID_JOIN_ON_EXPRESSION }

SELECT '--';
SELECT '333' = t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND t2.id > 2;
@ -33,10 +33,10 @@ echo "[\"255.255.255.255trash\"]" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'JSONCompactStringsEachRow', 'x IPv4')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'

echo "255.255.255.255trash" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'TSV', 'x IPv4')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'TSV', 'x IPv4')" 2>&1 | grep -F -q "CANNOT_PARSE_INPUT_ASSERTION_FAILED" && echo 'OK' || echo 'FAIL'

echo "255.255.255.255trash" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'CSV', 'x IPv4')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'CSV', 'x IPv4')" 2>&1 | grep -F -q "INCORRECT_DATA" && echo 'OK' || echo 'FAIL'

echo "[\"255.255.255.255trash\"]" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'JSONCompactEachRow', 'x IPv4')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'

@ -45,10 +45,10 @@ echo "[\"0000:0000:0000:0000:0000:ffff:192.168.100.228b1trash\"]" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'JSONCompactStringsEachRow', 'x IPv6')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'

echo "0000:0000:0000:0000:0000:ffff:192.168.100.228b1trash" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'TSV', 'x IPv6')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'TSV', 'x IPv6')" 2>&1 | grep -F -q "CANNOT_PARSE_INPUT_ASSERTION_FAILED" && echo 'OK' || echo 'FAIL'

echo "0000:0000:0000:0000:0000:ffff:192.168.100.228b1trash" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'CSV', 'x IPv6')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'CSV', 'x IPv6')" 2>&1 | grep -F -q "INCORRECT_DATA" && echo 'OK' || echo 'FAIL'

echo "[\"0000:0000:0000:0000:0000:ffff:192.168.100.228b1trash\"]" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('data_02118', 'JSONCompactEachRow', 'x IPv6')" 2>&1 | grep -F -q "UNEXPECTED_DATA_AFTER_PARSED_VALUE" && echo 'OK' || echo 'FAIL'
20
tests/queries/0_stateless/02531_ipv4_arithmetic.reference
Normal file
@ -0,0 +1,20 @@
10 1.2.3.4 0
11 1.2.3.4 3
12 1.2.3.4 4
13 1.2.3.4 12
14 1.2.3.4 0
15 1.2.3.4 10
16 1.2.3.4 4
17 1.2.3.4 10
18 1.2.3.4 4
19 1.2.3.4 10
20 1.2.3.4 0
21 1.2.3.4 7
22 1.2.3.4 14
23 1.2.3.4 12
24 1.2.3.4 4
25 1.2.3.4 10
26 1.2.3.4 12
27 1.2.3.4 13
28 1.2.3.4 0
29 1.2.3.4 1
1
tests/queries/0_stateless/02531_ipv4_arithmetic.sql
Normal file
@ -0,0 +1 @@
SELECT number, ip, ip % number FROM (SELECT number, toIPv4('1.2.3.4') as ip FROM numbers(10, 20));
@ -0,0 +1,38 @@
-- { echoOn }

SELECT * FROM test1 LEFT JOIN test2 ON test1.col1 = test2.col1
WHERE test2.col1 IS NULL
ORDER BY test2.col1
;
12321 -30 \N \N
SELECT * FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NULL
ORDER BY test2.col1
;
\N \N 12321 -30
SELECT * FROM test1 LEFT JOIN test2 ON test1.col1 = test2.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;
123 123 123 5600
321 -32 321 5601
SELECT * FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;
123 5600 123 123
321 5601 321 -32
SELECT test2.col1, test1.* FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;
123 123 123
321 321 -32
SELECT test2.col3, test1.* FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;
5600 123 123
5601 321 -32
DROP TABLE IF EXISTS test1;
DROP TABLE IF EXISTS test2;
46
tests/queries/0_stateless/02534_join_prewhere_bug_44062.sql
Normal file
@ -0,0 +1,46 @@

DROP TABLE IF EXISTS test1;
DROP TABLE IF EXISTS test2;

CREATE TABLE test1 ( `col1` UInt64, `col2` Int8 ) ENGINE = MergeTree ORDER BY col1;
CREATE TABLE test2 ( `col1` UInt64, `col3` Int16 ) ENGINE = MergeTree ORDER BY col1;

INSERT INTO test1 VALUES (123, 123), (12321, -30), (321, -32);
INSERT INTO test2 VALUES (123, 5600), (321, 5601);

SET join_use_nulls = 1;

-- { echoOn }

SELECT * FROM test1 LEFT JOIN test2 ON test1.col1 = test2.col1
WHERE test2.col1 IS NULL
ORDER BY test2.col1
;

SELECT * FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NULL
ORDER BY test2.col1
;

SELECT * FROM test1 LEFT JOIN test2 ON test1.col1 = test2.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;

SELECT * FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;

SELECT test2.col1, test1.* FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;

SELECT test2.col3, test1.* FROM test2 RIGHT JOIN test1 ON test2.col1 = test1.col1
WHERE test2.col1 IS NOT NULL
ORDER BY test2.col1
;

DROP TABLE IF EXISTS test1;
DROP TABLE IF EXISTS test2;
@ -0,0 +1,3 @@
::1 42
::1 42
::1 42
3
tests/queries/0_stateless/02535_ip_parser_not_whole.sql
Normal file
@ -0,0 +1,3 @@
SELECT * FROM format(CSVWithNamesAndTypes, 'ip,port\nIPv6,UInt16\n::1,42\n');
SELECT * FROM format(TSVWithNamesAndTypes, 'ip\tport\nIPv6\tUInt16\n::1\t42\n');
SELECT * FROM format(JSONCompactEachRowWithNamesAndTypes, '["ip","port"]\n["IPv6","UInt16"]\n["::1",42]\n');
@ -1,4 +1,4 @@
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}
--- {{ join_algorithm }} ---
2014-03-17 1406958 265108
2014-03-19 1405797 261624

@ -1,6 +1,7 @@
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}

SET max_bytes_in_join = '{% if join_algorithm == 'grace_hash' %}20K{% else %}0{% endif %}';
SET max_rows_in_join = '{% if join_algorithm == 'grace_hash' %}10K{% else %}0{% endif %}';
SET grace_hash_join_initial_buckets = 4;

SELECT '--- {{ join_algorithm }} ---';