Merge branch 'master' into s3_file_not_found

This commit is contained in:
chen 2023-02-03 12:29:17 +08:00 committed by GitHub
commit 07a6de1713
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 912 additions and 295 deletions

View File

@ -48,6 +48,7 @@ RUN apt-get update \
gdb \
git \
gperf \
libclang-rt-${LLVM_VERSION}-dev \
lld-${LLVM_VERSION} \
llvm-${LLVM_VERSION} \
llvm-${LLVM_VERSION}-dev \

View File

@ -2,10 +2,10 @@
slug: /en/engines/table-engines/mergetree-family/invertedindexes
sidebar_label: Inverted Indexes
description: Quickly find search terms in text.
keywords: [full-text search, text search]
keywords: [full-text search, text search, inverted, index, indices]
---
# Inverted indexes [experimental]
# Full-text Search using Inverted Indexes [experimental]
Inverted indexes are an experimental type of [secondary indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#available-types-of-indices) which provide fast text search
capabilities for [String](/docs/en/sql-reference/data-types/string.md) or [FixedString](/docs/en/sql-reference/data-types/fixedstring.md)
@ -13,7 +13,7 @@ columns. The main idea of an inverted index is to store a mapping from "terms" t
tokenized cells of the string column. For example, the string cell "I will be a little late" is by default tokenized into six terms "I", "will",
"be", "a", "little" and "late". Another kind of tokenizer is n-grams. For example, the result of 3-gram tokenization will be 21 terms "I w",
" wi", "wil", "ill", "ll ", "l b", " be" etc. The more fine-granular the input strings are tokenized, the bigger but also the more
useful the resulting inverted index will be.
useful the resulting inverted index will be.
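A minimal sketch of both tokenizer flavors (using the `inverted(N)` index type shown further below, where `inverted(0)` selects the default tokenizer and `inverted(N)` with `N` > 0 selects N-grams):

```sql
-- Sketch: the same column indexed with the default tokenizer (inverted(0))
-- and with a 3-gram tokenizer (inverted(3)).
SET allow_experimental_inverted_index = true; -- required while the feature is experimental

CREATE TABLE tab
(
    key UInt64,
    str String,
    INDEX inv_idx_tokens(str) TYPE inverted(0),
    INDEX inv_idx_ngrams(str) TYPE inverted(3)
)
ENGINE = MergeTree
ORDER BY key;
```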
:::warning
Inverted indexes are experimental and should not be used in production environments yet. They may change in the future in backward-incompatible
@ -50,7 +50,7 @@ Being a type of skipping index, inverted indexes can be dropped or added to a co
``` sql
ALTER TABLE tab DROP INDEX inv_idx;
ALTER TABLE tab ADD INDEX inv_idx(s) TYPE inverted(2) GRANULARITY 1;
ALTER TABLE tab ADD INDEX inv_idx(s) TYPE inverted(2);
```
To use the index, no special functions or syntax are required. Typical string search predicates automatically leverage the index. As
@ -74,7 +74,106 @@ controls the amount of data consumed from the underlying column before a new segment is created. Increasing the parameter raises the
intermediate memory consumption for index construction but also improves lookup performance since fewer segments need to be checked on
average to evaluate a query.
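As a sketch of this trade-off (the setting name `max_digestion_size_per_segment` is an assumption here; it is not shown in this diff):

```sql
-- Assumed setting: max_digestion_size_per_segment bounds how much column data
-- is digested before a new segment is started. A larger value means fewer,
-- larger segments: more memory during construction, fewer segments per lookup.
CREATE TABLE tab
(
    key UInt64,
    str String,
    INDEX inv_idx(str) TYPE inverted(0)
)
ENGINE = MergeTree
ORDER BY key
SETTINGS max_digestion_size_per_segment = 536870912; -- 512 MiB
```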
## Full-text search of the Hacker News dataset
Let's look at the performance improvements of inverted indexes on a large dataset with lots of text. We will use 28.7M rows of comments on the popular Hacker News website. Here is the table without an inverted index:
```sql
CREATE TABLE hackernews (
id UInt64,
deleted UInt8,
type String,
author String,
timestamp DateTime,
comment String,
dead UInt8,
parent UInt64,
poll UInt64,
children Array(UInt32),
url String,
score UInt32,
title String,
parts Array(UInt32),
descendants UInt32
)
ENGINE = MergeTree
ORDER BY (type, author);
```
The 28.7M rows are in a Parquet file in S3 - let's insert them into the `hackernews` table:
```sql
INSERT INTO hackernews
SELECT * FROM s3Cluster(
'default',
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/hackernews/hacknernews.parquet',
'Parquet',
'
id UInt64,
deleted UInt8,
type String,
by String,
time DateTime,
text String,
dead UInt8,
parent UInt64,
poll UInt64,
kids Array(UInt32),
url String,
score UInt32,
title String,
parts Array(UInt32),
descendants UInt32');
```
Consider the following simple search for the term `ClickHouse` (in any mix of upper and lower case) in the `comment` column:
```sql
SELECT count()
FROM hackernews
WHERE hasToken(lower(comment), 'clickhouse');
```
Notice that the query takes 3 seconds to execute:
```response
┌─count()─┐
│ 1145 │
└─────────┘
1 row in set. Elapsed: 3.001 sec. Processed 28.74 million rows, 9.75 GB (9.58 million rows/s., 3.25 GB/s.)
```
We will use `ALTER TABLE` and add an inverted index on the lowercase of the `comment` column, then materialize it (which can take a while, so wait for it to complete):
```sql
ALTER TABLE hackernews
ADD INDEX comment_lowercase(lower(comment)) TYPE inverted;
ALTER TABLE hackernews MATERIALIZE INDEX comment_lowercase;
```
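`MATERIALIZE INDEX` runs as a mutation, so one way to tell when it has finished is a sketch like:

```sql
-- is_done = 1 means the index materialization mutation has completed.
SELECT command, is_done
FROM system.mutations
WHERE table = 'hackernews' AND command LIKE '%MATERIALIZE INDEX%';
```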
We run the same query...
```sql
SELECT count()
FROM hackernews
WHERE hasToken(lower(comment), 'clickhouse')
```
...and notice the query executes 4x faster:
```response
┌─count()─┐
│ 1145 │
└─────────┘
1 row in set. Elapsed: 0.747 sec. Processed 4.49 million rows, 1.77 GB (6.01 million rows/s., 2.37 GB/s.)
```
:::note
Unlike other secondary indices, inverted indexes (for now) map to row numbers (row ids) instead of granule ids. The reason for this design
is performance. In practice, users often search for multiple terms at once. For example, filter predicate `WHERE s LIKE '%little%' OR s LIKE
'%big%'` can be evaluated directly using an inverted index by forming the union of the row id lists for terms "little" and "big". This also
means that the parameter `GRANULARITY` supplied to index creation has no meaning (it may be removed from the syntax in the future).
:::
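For instance, the note's two-term predicate needs only one union of two row id lists:

```sql
-- Both terms are looked up in the inverted index; the row id lists for
-- "little" and "big" are unioned before any column data is read.
SELECT count()
FROM tab
WHERE s LIKE '%little%' OR s LIKE '%big%';
```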

View File

@ -22,6 +22,6 @@ Additional cache types:
- [Dictionaries](../sql-reference/dictionaries/index.md) data cache.
- Schema inference cache.
- [Filesystem cache](storing-data.md) over S3, Azure, Local and other disks.
- [(Experimental) Query result cache](query-result-cache.md).
- [(Experimental) Query cache](query-cache.md).
To drop one of the caches, use [SYSTEM DROP ... CACHE](../sql-reference/statements/system.md#drop-mark-cache) statements.

View File

@ -1,112 +0,0 @@
---
slug: /en/operations/query-result-cache
sidebar_position: 65
sidebar_label: Query Result Cache [experimental]
---
# Query Result Cache [experimental]
The query result cache allows `SELECT` queries to be computed just once and further executions of the same query to be served directly from the
cache. Depending on the type of the queries, this can dramatically reduce the latency and resource consumption of the ClickHouse server.
## Background, Design and Limitations
Query result caches can generally be viewed as transactionally consistent or inconsistent.
- In transactionally consistent caches, the database invalidates (discards) cached query results if the result of the `SELECT` query changes
or potentially changes. In ClickHouse, operations which change the data include inserts/updates/deletes in/of/from tables or collapsing
merges. Transactionally consistent caching is especially suitable for OLTP databases, for example
[MySQL](https://dev.mysql.com/doc/refman/5.6/en/query-cache.html) (which removed the query cache in v8.0) and
[Oracle](https://docs.oracle.com/database/121/TGDBA/tune_result_cache.htm).
- In transactionally inconsistent caches, slight inaccuracies in query results are accepted under the assumption that all cache entries are
assigned a validity period after which they expire (e.g. 1 minute) and that the underlying data changes only slightly during this period.
This approach is overall more suitable for OLAP databases. As an example where transactionally inconsistent caching is sufficient,
consider an hourly sales report in a reporting tool which is simultaneously accessed by multiple users. Sales data changes typically
slowly enough that the database only needs to compute the report once (represented by the first `SELECT` query). Further queries can be
served directly from the query result cache. In this example, a reasonable validity period could be 30 min.
Transactionally inconsistent caching is traditionally provided by client tools or proxy packages interacting with the database. As a result,
the same caching logic and configuration is often duplicated. With ClickHouse's query result cache, the caching logic moves to the server
side. This reduces maintenance effort and avoids redundancy.
:::warning
The query result cache is an experimental feature that should not be used in production. There are known cases (e.g. in distributed query
processing) where wrong results are returned.
:::
## Configuration Settings and Usage
As long as the query result cache is experimental, it must be activated using the following configuration setting:
```sql
SET allow_experimental_query_result_cache = true;
```
Afterwards, setting [use_query_result_cache](settings/settings.md#use-query-result-cache) can be used to control whether a specific query or
all queries of the current session should utilize the query result cache. For example, the first execution of query
```sql
SELECT some_expensive_calculation(column_1, column_2)
FROM table
SETTINGS use_query_result_cache = true;
```
will store the query result in the query result cache. Subsequent executions of the same query (also with parameter `use_query_result_cache
= true`) will read the computed result from the cache and return it immediately.
The way the cache is utilized can be configured in more detail using settings [enable_writes_to_query_result_cache](settings/settings.md#enable-writes-to-query-result-cache)
and [enable_reads_from_query_result_cache](settings/settings.md#enable-reads-from-query-result-cache) (both `true` by default). The first
setting controls whether query results are stored in the cache, whereas the second determines whether the database should try to
retrieve query results from the cache. For example, the following query will use the cache only passively, i.e. attempt to read from it but
not store its result in it:
```sql
SELECT some_expensive_calculation(column_1, column_2)
FROM table
SETTINGS use_query_result_cache = true, enable_writes_to_query_result_cache = false;
```
For maximum control, it is generally recommended to provide settings "use_query_result_cache", "enable_writes_to_query_result_cache" and
"enable_reads_from_query_result_cache" only with specific queries. It is also possible to enable caching at user or profile level (e.g. via
`SET use_query_result_cache = true`) but one should keep in mind that all `SELECT` queries including monitoring or debugging queries to
system tables may then return cached results.
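A sketch of that pitfall:

```sql
-- With a session-level setting, even ad-hoc monitoring queries may be
-- answered from the cache (until their entries expire).
SET use_query_result_cache = true;
SELECT count() FROM system.processes;
```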
The query result cache can be cleared using the statement `SYSTEM DROP QUERY RESULT CACHE`. The content of the query result cache is displayed
in the system table `system.query_result_cache`. The numbers of query result cache hits and misses are shown as events "QueryCacheHits" and
"QueryCacheMisses" in the system table `system.events`. Both counters are only updated for `SELECT` queries which run with the setting
"use_query_result_cache = true". Other queries do not affect the cache miss counter.
The query result cache exists once per ClickHouse server process. However, cache results are by default not shared between users. This can
be changed (see below) but doing so is not recommended for security reasons.
Query results are referenced in the query result cache by the [Abstract Syntax Tree (AST)](https://en.wikipedia.org/wiki/Abstract_syntax_tree)
of their query. This means that caching is agnostic to upper/lowercase, for example `SELECT 1` and `select 1` are treated as the same query.
To make the matching more natural, all query-level settings related to the query result cache are removed from the AST.
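For example, these two statements are cached under the same key because they share one AST and the cache-related setting is stripped before matching:

```sql
SELECT 1 SETTINGS use_query_result_cache = true;
select 1 SETTINGS use_query_result_cache = true; -- cache hit for the query above
```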
If the query was aborted due to an exception or user cancellation, no entry is written into the query result cache.
The size of the query result cache, the maximum number of cache entries and the maximum size of cache entries (in bytes and in records) can
be configured using different [server configuration options](server-configuration-parameters/settings.md#server_configuration_parameters_query-result-cache).
To define the minimum duration a query must run for its result to be cached, use the setting
[query_result_cache_min_query_duration](settings/settings.md#query-result-cache-min-query-duration). For example, the result of query
``` sql
SELECT some_expensive_calculation(column_1, column_2)
FROM table
SETTINGS use_query_result_cache = true, query_result_cache_min_query_duration = 5000;
```
is only cached if the query runs longer than 5 seconds. It is also possible to specify how many times a query needs to run before its result is
cached - for that, use the setting [query_result_cache_min_query_runs](settings/settings.md#query-result-cache-min-query-runs).
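For example, to cache a result only after the same query has been executed five times:

```sql
SELECT some_expensive_calculation(column_1, column_2)
FROM table
SETTINGS use_query_result_cache = true, query_result_cache_min_query_runs = 5;
```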
Entries in the query result cache become stale after a certain time period (time-to-live). By default, this period is 60 seconds but a
different value can be specified at session, profile or query level using setting [query_result_cache_ttl](settings/settings.md#query-result-cache-ttl).
Also, results of queries with non-deterministic functions such as `rand()` and `now()` are not cached. This can be overridden using the
setting [query_result_cache_store_results_of_queries_with_nondeterministic_functions](settings/settings.md#query-result-cache-store-results-of-queries-with-nondeterministic-functions).
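A combined sketch of both settings:

```sql
SELECT now(), some_expensive_calculation(column_1, column_2)
FROM table
SETTINGS use_query_result_cache = true,
         query_result_cache_ttl = 300, -- keep the entry for 5 minutes
         query_result_cache_store_results_of_queries_with_nondeterministic_functions = true;
```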
Finally, entries in the query cache are not shared between users for security reasons. For example, user A must not be able to bypass a
row policy on a table by running the same query as another user B for whom no such policy exists. However, if necessary, cache entries can
be marked accessible by other users (i.e. shared) by supplying setting
[query_result_cache_share_between_users](settings/settings.md#query-result-cache-share-between-users).

View File

@ -1303,7 +1303,7 @@ Default value: `3`.
## use_query_cache {#use-query-cache}
If turned on, `SELECT` queries may utilize the [query cache](../query-cache.md). Parameters [enable_reads_from_query_cache](#enable-readsfrom-query-cache)
If turned on, `SELECT` queries may utilize the [query cache](../query-cache.md). Parameters [enable_reads_from_query_cache](#enable-reads-from-query-cache)
and [enable_writes_to_query_cache](#enable-writes-to-query-cache) control in more detail how the cache is used.
Possible values:

View File

@ -283,7 +283,7 @@ SYSTEM START REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
Wait until a `ReplicatedMergeTree` table is synced with the other replicas in a cluster. Runs until `receive_timeout` if fetches are currently disabled for the table.
``` sql
SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name
SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_table_name
```
After running this statement, the `[db.]replicated_merge_tree_family_table_name` fetches commands from the common replicated log into its own replication queue, and then the query waits until the replica processes all of the fetched commands.
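For example (cluster and table names are placeholders), to wait for the replica on every host of a cluster:

```sql
SYSTEM SYNC REPLICA ON CLUSTER my_cluster db.replicated_table;
```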

View File

@ -2,11 +2,12 @@
slug: /en/sql-reference/table-functions/s3
sidebar_position: 45
sidebar_label: s3
keywords: [s3, gcs, bucket]
---
# s3 Table Function
Provides table-like interface to select/insert files in [Amazon S3](https://aws.amazon.com/s3/). This table function is similar to [hdfs](../../sql-reference/table-functions/hdfs.md), but provides S3-specific features.
Provides a table-like interface to select/insert files in [Amazon S3](https://aws.amazon.com/s3/) and [Google Cloud Storage](https://cloud.google.com/storage/). This table function is similar to the [hdfs function](../../sql-reference/table-functions/hdfs.md), but provides S3-specific features.
**Syntax**
@ -14,9 +15,24 @@ Provides table-like interface to select/insert files in [Amazon S3](https://aws.
s3(path [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
:::tip GCS
The S3 Table Function integrates with Google Cloud Storage by using the GCS XML API and HMAC keys. See the [Google interoperability docs](https://cloud.google.com/storage/docs/interoperability) for more details about the endpoint and HMAC.
For GCS, substitute your HMAC key and HMAC secret where you see `aws_access_key_id` and `aws_secret_access_key`.
:::
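A sketch of such a GCS read (bucket, path, and HMAC credentials are placeholders):

```sql
SELECT *
FROM s3(
    'https://storage.googleapis.com/my-bucket/folder/*.parquet',
    'GCS_HMAC_KEY',    -- in place of aws_access_key_id
    'GCS_HMAC_SECRET', -- in place of aws_secret_access_key
    'Parquet'
)
LIMIT 10;
```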
**Arguments**
- `path` — Bucket URL with a path to the file. Supports the following wildcards in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. For more information see [here](../../engines/table-engines/integrations/s3.md#wildcards-in-path).
:::note GCS
The GCS path is in this format because the endpoint for the Google XML API differs from the JSON API:
```
https://storage.googleapis.com/<bucket>/<folder>/<filename(s)>
```
and not ~~https://storage.cloud.google.com~~.
:::
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression` — Optional parameter. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, compression is autodetected from the file extension (see the sketch below).
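A sketch of the wildcard and compression autodetection behavior (bucket and file names are placeholders):

```sql
-- Reads file_1.csv.gz through file_10.csv.gz; gzip is detected from the .gz extension.
SELECT count()
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/file_{1..10}.csv.gz',
    'CSVWithNames'
);
```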

View File

@ -76,7 +76,7 @@
#charts
{
height: 100%;
display: flex;
display: none;
flex-flow: row wrap;
gap: 1rem;
}
@ -170,6 +170,14 @@
background: var(--button-background-color);
}
#auth-error {
color: var(--error-color);
display: flex;
flex-flow: row nowrap;
justify-content: center;
}
form {
display: inline;
}
@ -293,6 +301,7 @@
</div>
</form>
</div>
<div id="auth-error"></div>
<div id="charts"></div>
<script>
@ -322,6 +331,11 @@ if (location.protocol != 'file:') {
user = 'default';
}
const errorCodeRegex = /Code: (\d+)/
const errorCodeMessageMap = {
516: 'Error authenticating with database. Please check your connection params and try again.'
}
/// This is just a demo configuration of the dashboard.
let queries = [
@ -597,6 +611,11 @@ function insertChart(i) {
query_editor_confirm.value = 'Ok';
query_editor_confirm.className = 'edit-confirm';
function getCurrentIndex() {
/// Indices may change after deletion of other element, hence captured "i" may become incorrect.
return [...charts.querySelectorAll('.chart')].findIndex(child => chart == child);
}
function editConfirm() {
query_editor.style.display = 'none';
query_error.style.display = 'none';
@ -605,7 +624,8 @@ function insertChart(i) {
title_text.data = '';
findParamsInQuery(q.query, params);
buildParams();
draw(i, chart, getParamsForURL(), q.query);
const idx = getCurrentIndex();
draw(idx, chart, getParamsForURL(), q.query);
saveState();
}
@ -649,8 +669,7 @@ function insertChart(i) {
let trash_text = document.createTextNode('✕');
trash.appendChild(trash_text);
trash.addEventListener('click', e => {
/// Indices may change after deletion of other element, hence captured "i" may become incorrect.
let idx = [...charts.querySelectorAll('.chart')].findIndex(child => chart == child);
const idx = getCurrentIndex();
if (plots[idx]) {
plots[idx].destroy();
plots[idx] = null;
@ -796,6 +815,18 @@ async function draw(idx, chart, url_params, query) {
error = e.toString();
}
if (error) {
const errorMatch = error.match(errorCodeRegex)
if (errorMatch && errorMatch[1]) {
const code = errorMatch[1]
if (errorCodeMessageMap[code]) {
const authError = new Error(errorCodeMessageMap[code])
authError.code = code
throw authError
}
}
}
if (!error) {
if (!Array.isArray(data)) {
error = "Query should return an array.";
@ -853,16 +884,50 @@ async function draw(idx, chart, url_params, query) {
sync.sub(plots[idx]);
/// Set title
const title = queries[idx].title ? queries[idx].title.replaceAll(/\{(\w+)\}/g, (_, name) => params[name] ) : '';
const title = queries[idx] && queries[idx].title ? queries[idx].title.replaceAll(/\{(\w+)\}/g, (_, name) => params[name] ) : '';
chart.querySelector('.title').firstChild.data = title;
}
function showAuthError(message) {
const charts = document.querySelector('#charts');
charts.style.display = 'none';
const add = document.querySelector('#add');
add.style.display = 'none';
const authError = document.querySelector('#auth-error');
authError.textContent = message;
authError.style.display = 'flex';
}
function hideAuthError() {
const charts = document.querySelector('#charts');
charts.style.display = 'flex';
const add = document.querySelector('#add');
add.style.display = 'block';
const authError = document.querySelector('#auth-error');
authError.textContent = '';
authError.style.display = 'none';
}
let firstLoad = true;
async function drawAll() {
let params = getParamsForURL();
const charts = document.getElementsByClassName('chart');
for (let i = 0; i < queries.length; ++i) {
draw(i, charts[i], params, queries[i].query);
if (!firstLoad) {
hideAuthError();
}
await Promise.all([...Array(queries.length)].map(async (_, i) => {
return draw(i, charts[i], params, queries[i].query).catch((e) => {
if (!firstLoad) {
showAuthError(e.message);
}
});
})).then(() => {
firstLoad = false;
})
}
function resize() {

View File

@ -0,0 +1,40 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/AggregateFunctionVarianceMatrix.h>
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
template <typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionVarianceMatrix(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
for (const auto & argument_type : argument_types)
if (!isNativeNumber(argument_type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Aggregate function {} only supports numerical types", name);
return std::make_shared<FunctionTemplate>(argument_types);
}
}
void registerAggregateFunctionsVarianceMatrix(AggregateFunctionFactory & factory)
{
factory.registerFunction("covarSampMatrix", createAggregateFunctionVarianceMatrix<AggregateFunctionCovarSampMatrix>);
factory.registerFunction("covarPopMatrix", createAggregateFunctionVarianceMatrix<AggregateFunctionCovarPopMatrix>);
factory.registerFunction("corrMatrix", createAggregateFunctionVarianceMatrix<AggregateFunctionCorrMatrix>);
}
}

View File

@ -0,0 +1,159 @@
#pragma once
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Common/PODArray.h>
#include <Common/PODArray_fwd.h>
#include <DataTypes/DataTypeArray.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/Moments.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
struct Settings;
enum class StatisticsMatrixFunctionKind
{
covarPopMatrix,
covarSampMatrix,
corrMatrix
};
template <StatisticsMatrixFunctionKind _kind>
struct AggregateFunctionVarianceMatrixData
{
using DataType = std::conditional_t<_kind == StatisticsMatrixFunctionKind::corrMatrix, CorrMoments<Float64>, CovarMoments<Float64>>;
AggregateFunctionVarianceMatrixData() = default;
explicit AggregateFunctionVarianceMatrixData(const size_t _num_args)
: num_args(_num_args)
{
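/// Only the lower triangle (including the diagonal) is stored:
/// the pair (i, j) with j <= i lives at flat index i * (i + 1) / 2 + j.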
data_matrix.resize_fill(num_args * (num_args + 1) / 2, DataType());
}
void add(const IColumn ** column, const size_t row_num)
{
for (size_t i = 0; i < num_args; ++i)
for (size_t j = 0; j <= i; ++j)
data_matrix[i * (i + 1) / 2 + j].add(column[i]->getFloat64(row_num), column[j]->getFloat64(row_num));
}
void merge(const AggregateFunctionVarianceMatrixData & other)
{
for (size_t i = 0; i < num_args; ++i)
for (size_t j = 0; j <= i; ++j)
data_matrix[i * (i + 1) / 2 + j].merge(other.data_matrix[i * (i + 1) / 2 + j]);
}
void serialize(WriteBuffer & buf) const
{
for (size_t i = 0; i < num_args; ++i)
for (size_t j = 0; j <= i; ++j)
data_matrix[i * (i + 1) / 2 + j].write(buf);
}
void deserialize(ReadBuffer & buf)
{
for (size_t i = 0; i < num_args; ++i)
for (size_t j = 0; j <= i; ++j)
data_matrix[i * (i + 1) / 2 + j].read(buf);
}
void insertResultInto(IColumn & to) const
{
auto & data_to = assert_cast<ColumnFloat64 &>(assert_cast<ColumnArray &>(assert_cast<ColumnArray &>(to).getData()).getData()).getData();
auto & root_offsets_to = assert_cast<ColumnArray &>(to).getOffsets();
auto & nested_offsets_to = assert_cast<ColumnArray &>(assert_cast<ColumnArray &>(to).getData()).getOffsets();
for (size_t i = 0; i < num_args; ++i)
{
for (size_t j = 0; j < num_args; ++j)
{
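/// The matrix is symmetric: for i < j, read the mirrored entry from the stored lower triangle.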
auto & data = i < j ? data_matrix[j * (j + 1) / 2 + i] : data_matrix[i * (i + 1) / 2 + j];
if constexpr (kind == StatisticsMatrixFunctionKind::covarPopMatrix)
data_to.push_back(data.getPopulation());
if constexpr (kind == StatisticsMatrixFunctionKind::covarSampMatrix)
data_to.push_back(data.getSample());
if constexpr (kind == StatisticsMatrixFunctionKind::corrMatrix)
data_to.push_back(data.get());
}
nested_offsets_to.push_back(nested_offsets_to.back() + num_args);
}
root_offsets_to.push_back(root_offsets_to.back() + num_args);
}
static constexpr StatisticsMatrixFunctionKind kind = _kind;
PaddedPODArray<DataType> data_matrix;
size_t num_args;
};
template <typename Data>
class AggregateFunctionVarianceMatrix final
: public IAggregateFunctionDataHelper<Data, AggregateFunctionVarianceMatrix<Data>>
{
public:
explicit AggregateFunctionVarianceMatrix(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionVarianceMatrix<Data>>(argument_types_, {}, createResultType())
{}
AggregateFunctionVarianceMatrix(const IDataType &, const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionVarianceMatrix<Data>>(argument_types_, {}, createResultType())
{}
String getName() const override
{
if constexpr (Data::kind == StatisticsMatrixFunctionKind::covarPopMatrix)
return "covarPopMatrix";
if constexpr (Data::kind == StatisticsMatrixFunctionKind::covarSampMatrix)
return "covarSampMatrix";
if constexpr (Data::kind == StatisticsMatrixFunctionKind::corrMatrix)
return "corrMatrix";
UNREACHABLE();
}
void create(AggregateDataPtr __restrict place) const override
{
new (place) Data(this->argument_types.size());
}
static DataTypePtr createResultType()
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>()));
}
bool allocatesMemoryInArena() const override { return false; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).add(columns, row_num);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
this->data(place).insertResultInto(to);
}
};
using AggregateFunctionCovarPopMatrix = AggregateFunctionVarianceMatrix<AggregateFunctionVarianceMatrixData<StatisticsMatrixFunctionKind::covarPopMatrix>>;
using AggregateFunctionCovarSampMatrix = AggregateFunctionVarianceMatrix<AggregateFunctionVarianceMatrixData<StatisticsMatrixFunctionKind::covarSampMatrix>>;
using AggregateFunctionCorrMatrix = AggregateFunctionVarianceMatrix<AggregateFunctionVarianceMatrixData<StatisticsMatrixFunctionKind::corrMatrix>>;
}

View File

@ -40,6 +40,7 @@ void registerAggregateFunctionsMax(AggregateFunctionFactory &);
void registerAggregateFunctionsAny(AggregateFunctionFactory &);
void registerAggregateFunctionsStatisticsStable(AggregateFunctionFactory &);
void registerAggregateFunctionsStatisticsSimple(AggregateFunctionFactory &);
void registerAggregateFunctionsVarianceMatrix(AggregateFunctionFactory &);
void registerAggregateFunctionSum(AggregateFunctionFactory &);
void registerAggregateFunctionSumCount(AggregateFunctionFactory &);
void registerAggregateFunctionSumMap(AggregateFunctionFactory &);
@ -126,6 +127,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsAny(factory);
registerAggregateFunctionsStatisticsStable(factory);
registerAggregateFunctionsStatisticsSimple(factory);
registerAggregateFunctionsVarianceMatrix(factory);
registerAggregateFunctionSum(factory);
registerAggregateFunctionSumCount(factory);
registerAggregateFunctionSumMap(factory);

View File

@ -157,21 +157,24 @@ public:
assert(file_buffer && current_file_description);
assert(record.header.index - getStartIndex() <= current_file_description->expectedEntriesCountInLog());
const bool log_is_complete = record.header.index - getStartIndex() == current_file_description->expectedEntriesCountInLog();
if (log_is_complete)
rotate(record.header.index);
// writing at least 1 log is a requirement - we don't want empty log files
// we use count(), which can be unreliable for more complex WriteBuffers, so we should be careful if we change its type in the future
const bool log_too_big = record.header.index != getStartIndex() && log_file_settings.max_size != 0
&& initial_file_size + file_buffer->count() > log_file_settings.max_size;
if (log_too_big)
// check if the log file reached the limit for the number of records it can contain
if (record.header.index - getStartIndex() == current_file_description->expectedEntriesCountInLog())
{
LOG_TRACE(log, "Log file reached maximum allowed size ({} bytes), creating new log file", log_file_settings.max_size);
rotate(record.header.index);
}
else
{
// writing at least 1 log is a requirement - we don't want empty log files
// we use count(), which can be unreliable for more complex WriteBuffers, so we should be careful if we change its type in the future
const bool log_too_big = record.header.index != getStartIndex() && log_file_settings.max_size != 0
&& initial_file_size + file_buffer->count() > log_file_settings.max_size;
if (log_too_big)
{
LOG_TRACE(log, "Log file reached maximum allowed size ({} bytes), creating new log file", log_file_settings.max_size);
rotate(record.header.index);
}
}
if (!prealloc_done) [[unlikely]]
{

View File

@ -216,7 +216,7 @@ struct ConvertImpl
}
else if constexpr (
(std::is_same_v<FromDataType, DataTypeIPv4> != std::is_same_v<ToDataType, DataTypeIPv4>)
&& !(is_any_of<FromDataType, DataTypeUInt8, DataTypeUInt16, DataTypeUInt32> || is_any_of<ToDataType, DataTypeUInt32, DataTypeUInt64, DataTypeUInt128, DataTypeUInt256>)
&& !(is_any_of<FromDataType, DataTypeUInt8, DataTypeUInt16, DataTypeUInt32, DataTypeUInt64> || is_any_of<ToDataType, DataTypeUInt32, DataTypeUInt64, DataTypeUInt128, DataTypeUInt256>)
)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Conversion from {} to {} is not supported",
@ -303,7 +303,10 @@ struct ConvertImpl
}
else
{
vec_to[i] = static_cast<ToFieldType>(vec_from[i]);
if constexpr (std::is_same_v<ToDataType, DataTypeIPv4> && std::is_same_v<FromDataType, DataTypeUInt64>)
vec_to[i] = static_cast<ToFieldType>(static_cast<IPv4::UnderlyingType>(vec_from[i]));
else
vec_to[i] = static_cast<ToFieldType>(vec_from[i]);
}
}
}
@ -374,7 +377,7 @@ struct ToDateTransform32Or64
static NO_SANITIZE_UNDEFINED ToType execute(const FromType & from, const DateLUTImpl & time_zone)
{
// Since we are converting to Date, there is no need for values outside the default LUT range.
return (from < DATE_LUT_MAX_DAY_NUM)
return (from <= DATE_LUT_MAX_DAY_NUM)
? from
: time_zone.toDayNum(std::min(time_t(from), time_t(0xFFFFFFFF)));
}
@ -391,7 +394,7 @@ struct ToDateTransform32Or64Signed
/// The function should be monotonic (better for query optimizations), so we saturate instead of overflow.
if (from < 0)
return 0;
return (from < DATE_LUT_MAX_DAY_NUM)
return (from <= DATE_LUT_MAX_DAY_NUM)
? static_cast<ToType>(from)
: time_zone.toDayNum(std::min(time_t(from), time_t(0xFFFFFFFF)));
}

View File

@ -186,7 +186,10 @@ QueryCache::Writer::Writer(std::mutex & mutex_, Cache & cache_, const Key & key_
, min_query_runtime(min_query_runtime_)
{
if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first))
{
skip_insert = true; /// Key already contained in cache and did not expire yet --> don't replace it
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst());
}
}
void QueryCache::Writer::buffer(Chunk && partial_query_result)
@ -205,6 +208,7 @@ void QueryCache::Writer::buffer(Chunk && partial_query_result)
{
chunks->clear(); /// eagerly free some space
skip_insert = true;
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (query result too big), new_entry_size_in_bytes: {} ({}), new_entry_size_in_rows: {} ({}), query: {}", new_entry_size_in_bytes, max_entry_size_in_bytes, new_entry_size_in_rows, max_entry_size_in_rows, key.queryStringFromAst());
}
}
@ -214,12 +218,19 @@ void QueryCache::Writer::finalizeWrite()
return;
if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - query_start_time) < min_query_runtime)
{
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (query not expensive enough), query: {}", key.queryStringFromAst());
return;
}
std::lock_guard lock(mutex);
if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first))
return; /// same check as in ctor because a parallel Writer could have inserted the current key in the meantime
{
/// same check as in ctor because a parallel Writer could have inserted the current key in the meantime
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst());
return;
}
auto sufficient_space_in_cache = [this]() TSA_REQUIRES(mutex)
{
@ -242,9 +253,11 @@ void QueryCache::Writer::finalizeWrite()
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Removed {} stale entries", removed_items);
}
/// Insert or replace if enough space
if (sufficient_space_in_cache())
if (!sufficient_space_in_cache())
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (cache has insufficient space), query: {}", key.queryStringFromAst());
else
{
/// Insert or replace key
cache_size_in_bytes += query_result.sizeInBytes();
if (auto it = cache.find(key); it != cache.end())
cache_size_in_bytes -= it->second.sizeInBytes(); // key replacement

View File

@ -17,7 +17,7 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
@ -197,7 +197,7 @@ bool isStorageTouchedByMutations(
MergeTreeData::DataPartPtr source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextMutablePtr context_copy)
ContextPtr context)
{
if (commands.empty())
return false;
@ -210,7 +210,7 @@ bool isStorageTouchedByMutations(
if (command.partition)
{
const String partition_id = storage.getPartitionIDFromQuery(command.partition, context_copy);
const String partition_id = storage.getPartitionIDFromQuery(command.partition, context);
if (partition_id == source_part->info.partition_id)
all_commands_can_be_skipped = false;
}
@ -221,15 +221,7 @@ bool isStorageTouchedByMutations(
if (all_commands_can_be_skipped)
return false;
/// We must read with one thread because it guarantees that
/// output stream will be sorted after reading from MergeTree parts.
/// Disable all settings that can enable reading with several streams.
context_copy->setSetting("max_streams_to_max_threads_ratio", 1);
context_copy->setSetting("max_threads", 1);
context_copy->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_copy->setSetting("max_streams_for_merge_tree_reading", Field(0));
ASTPtr select_query = prepareQueryAffectedAST(commands, storage.shared_from_this(), context_copy);
ASTPtr select_query = prepareQueryAffectedAST(commands, storage.shared_from_this(), context);
auto storage_from_part = std::make_shared<StorageFromMergeTreeDataPart>(source_part);
@ -237,12 +229,12 @@ bool isStorageTouchedByMutations(
/// For some reason it may copy the context and pass it into ExpressionTransform;
/// after that we would be using a context from a destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(
select_query, context_copy, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
select_query, context, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
auto io = interpreter.execute();
PullingPipelineExecutor executor(io.pipeline);
PullingAsyncPipelineExecutor executor(io.pipeline);
Block block;
while (executor.pull(block)) {}
while (block.rows() == 0 && executor.pull(block));
if (!block.rows())
return false;

View File

@ -23,7 +23,7 @@ bool isStorageTouchedByMutations(
MergeTreeData::DataPartPtr source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextMutablePtr context_copy
ContextPtr context
);
ASTPtr getPartitionAndPredicateExpressionForMutationCommand(

View File

@ -37,81 +37,118 @@ namespace
{
/// Finds arguments of a specified function which should not be displayed for most users for security reasons.
/// That involves passwords and secret keys.
/// The member function getRange() returns a pair of numbers [first, last) specifying arguments
/// which must be hidden. If the function returns {-1, -1} that means no arguments must be hidden.
class FunctionSecretArgumentsFinder
{
public:
explicit FunctionSecretArgumentsFinder(const ASTFunction & function_) : function(function_)
{
if (function.arguments)
{
if (const auto * expr_list = function.arguments->as<ASTExpressionList>())
arguments = &expr_list->children;
}
}
if (!function.arguments)
return;
std::pair<size_t, size_t> getRange() const
{
if (!arguments)
return npos;
const auto * expr_list = function.arguments->as<ASTExpressionList>();
if (!expr_list)
return;
arguments = &expr_list->children;
switch (function.kind)
{
case ASTFunction::Kind::ORDINARY_FUNCTION: return findOrdinaryFunctionSecretArguments();
case ASTFunction::Kind::WINDOW_FUNCTION: return npos;
case ASTFunction::Kind::LAMBDA_FUNCTION: return npos;
case ASTFunction::Kind::TABLE_ENGINE: return findTableEngineSecretArguments();
case ASTFunction::Kind::DATABASE_ENGINE: return findDatabaseEngineSecretArguments();
case ASTFunction::Kind::BACKUP_NAME: return findBackupNameSecretArguments();
case ASTFunction::Kind::ORDINARY_FUNCTION: findOrdinaryFunctionSecretArguments(); break;
case ASTFunction::Kind::WINDOW_FUNCTION: break;
case ASTFunction::Kind::LAMBDA_FUNCTION: break;
case ASTFunction::Kind::TABLE_ENGINE: findTableEngineSecretArguments(); break;
case ASTFunction::Kind::DATABASE_ENGINE: findDatabaseEngineSecretArguments(); break;
case ASTFunction::Kind::BACKUP_NAME: findBackupNameSecretArguments(); break;
}
}
static const constexpr std::pair<size_t, size_t> npos{static_cast<size_t>(-1), static_cast<size_t>(-1)};
struct Result
{
/// Result constructed by default means no arguments will be hidden.
size_t start = static_cast<size_t>(-1);
size_t count = 0; /// Mostly it's either 0 or 1. There are only a few cases where `count` can be greater than 1 (e.g. see `encrypt`).
/// In all known cases secret arguments are consecutive
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
};
Result getResult() const { return result; }
private:
std::pair<size_t, size_t> findOrdinaryFunctionSecretArguments() const
const ASTFunction & function;
const ASTs * arguments = nullptr;
Result result;
void markSecretArgument(size_t index, bool argument_is_named = false)
{
if (!result.count)
{
result.start = index;
result.are_named = argument_is_named;
}
chassert(index >= result.start); /// We always check arguments consecutively
result.count = index + 1 - result.start;
if (!argument_is_named)
result.are_named = false;
}
void findOrdinaryFunctionSecretArguments()
{
if ((function.name == "mysql") || (function.name == "postgresql") || (function.name == "mongodb"))
{
/// mysql('host:port', 'database', 'table', 'user', 'password', ...)
/// postgresql('host:port', 'database', 'table', 'user', 'password', ...)
/// mongodb('host:port', 'database', 'collection', 'user', 'password', ...)
return {4, 5};
findMySQLFunctionSecretArguments();
}
else if ((function.name == "s3") || (function.name == "cosn") || (function.name == "oss"))
{
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
return findS3FunctionSecretArguments(/* is_cluster_function= */ false);
findS3FunctionSecretArguments(/* is_cluster_function= */ false);
}
else if (function.name == "s3Cluster")
{
/// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...)
return findS3FunctionSecretArguments(/* is_cluster_function= */ true);
findS3FunctionSecretArguments(/* is_cluster_function= */ true);
}
else if ((function.name == "remote") || (function.name == "remoteSecure"))
{
/// remote('addresses_expr', 'db', 'table', 'user', 'password', ...)
return findRemoteFunctionSecretArguments();
findRemoteFunctionSecretArguments();
}
else if ((function.name == "encrypt") || (function.name == "decrypt") ||
(function.name == "aes_encrypt_mysql") || (function.name == "aes_decrypt_mysql") ||
(function.name == "tryDecrypt"))
{
/// encrypt('mode', 'plaintext', 'key' [, iv, aad])
return findEncryptionFunctionSecretArguments();
}
else
{
return npos;
findEncryptionFunctionSecretArguments();
}
}
std::pair<size_t, size_t> findS3FunctionSecretArguments(bool is_cluster_function) const
void findMySQLFunctionSecretArguments()
{
if (isNamedCollectionName(0))
{
/// mysql(named_collection, ..., password = 'password', ...)
findSecretNamedArgument("password", 1);
}
else
{
/// mysql('host:port', 'database', 'table', 'user', 'password', ...)
markSecretArgument(4);
}
}
void findS3FunctionSecretArguments(bool is_cluster_function)
{
/// s3Cluster('cluster_name', 'url', ...) has 'url' as its second argument.
size_t url_arg_idx = is_cluster_function ? 1 : 0;
if (!is_cluster_function && isNamedCollectionName(0))
{
/// s3(named_collection, ..., secret_access_key = 'secret_access_key', ...)
findSecretNamedArgument("secret_access_key", 1);
return;
}
/// We're going to replace 'aws_secret_access_key' with '[HIDDEN]' for the following signatures:
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
/// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', 'format', 'compression')
@ -119,12 +156,12 @@ namespace
/// But we should check the number of arguments first because we don't need to do any replacements in case of
/// s3('url' [, 'format']) or s3Cluster('cluster_name', 'url' [, 'format'])
if (arguments->size() < url_arg_idx + 3)
return npos;
return;
if (arguments->size() >= url_arg_idx + 5)
{
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format', 'structure', ...)
return {url_arg_idx + 2, url_arg_idx + 3};
markSecretArgument(url_arg_idx + 2);
}
else
{
@ -136,15 +173,16 @@ namespace
{
/// We couldn't evaluate the argument after 'url' so we don't know whether it is a format or `aws_access_key_id`.
/// So it's safer to wipe the next argument just in case.
return {url_arg_idx + 2, url_arg_idx + 3}; /// Wipe either `aws_secret_access_key` or `structure`.
markSecretArgument(url_arg_idx + 2); /// Wipe either `aws_secret_access_key` or `structure`.
return;
}
if (KnownFormatNames::instance().exists(format))
return npos; /// The argument after 'url' is a format: s3('url', 'format', ...)
return; /// The argument after 'url' is a format: s3('url', 'format', ...)
/// The argument after 'url' is not a format so we do our replacement:
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...) -> s3('url', 'aws_access_key_id', '[HIDDEN]', ...)
return {url_arg_idx + 2, url_arg_idx + 3};
markSecretArgument(url_arg_idx + 2);
}
}
@ -153,8 +191,12 @@ namespace
if (arg_idx >= arguments->size())
return false;
ASTPtr argument = (*arguments)[arg_idx];
if (const auto * literal = argument->as<ASTLiteral>())
return tryGetStringFromArgument(*(*arguments)[arg_idx], res, allow_identifier);
}
static bool tryGetStringFromArgument(const IAST & argument, String * res, bool allow_identifier = true)
{
if (const auto * literal = argument.as<ASTLiteral>())
{
if (literal->value.getType() != Field::Types::String)
return false;
@ -165,7 +207,7 @@ namespace
if (allow_identifier)
{
if (const auto * id = argument->as<ASTIdentifier>())
if (const auto * id = argument.as<ASTIdentifier>())
{
if (res)
*res = id->name();
@ -176,8 +218,15 @@ namespace
return false;
}
std::pair<size_t, size_t> findRemoteFunctionSecretArguments() const
void findRemoteFunctionSecretArguments()
{
if (isNamedCollectionName(0))
{
/// remote(named_collection, ..., password = 'password', ...)
findSecretNamedArgument("password", 1);
return;
}
/// We're going to replace 'password' with '[HIDDEN]' for the following signatures:
/// remote('addresses_expr', db.table, 'user' [, 'password'] [, sharding_key])
/// remote('addresses_expr', 'db', 'table', 'user' [, 'password'] [, sharding_key])
@ -186,7 +235,7 @@ namespace
/// But we should check the number of arguments first because we don't need to do any replacements in case of
/// remote('addresses_expr', db.table)
if (arguments->size() < 3)
return npos;
return;
size_t arg_num = 1;
@ -207,20 +256,17 @@ namespace
/// before the argument 'password'. So it's safer to wipe two arguments just in case.
/// The last argument can also be a `sharding_key`, so we need to check that the argument is a literal string
/// before wiping it (because the `password` argument is always a literal string).
auto res = npos;
if (tryGetStringFromArgument(arg_num + 2, nullptr, /* allow_identifier= */ false))
{
/// Wipe either `password` or `user`.
res = {arg_num + 2, arg_num + 3};
markSecretArgument(arg_num + 2);
}
if (tryGetStringFromArgument(arg_num + 3, nullptr, /* allow_identifier= */ false))
{
/// Wipe either `password` or `sharding_key`.
if (res == npos)
res.first = arg_num + 3;
res.second = arg_num + 4;
markSecretArgument(arg_num + 3);
}
return res;
return;
}
/// Skip the current argument (which is either a database name or a qualified table name).
@ -241,9 +287,7 @@ namespace
/// before wiping it (because the `password` argument is always a literal string).
bool can_be_password = tryGetStringFromArgument(arg_num, nullptr, /* allow_identifier= */ false);
if (can_be_password)
return {arg_num, arg_num + 1};
return npos;
markSecretArgument(arg_num);
}
/// Tries to get either a database name or a qualified table name from an argument.
@ -278,20 +322,24 @@ namespace
return true;
}
std::pair<size_t, size_t> findEncryptionFunctionSecretArguments() const
void findEncryptionFunctionSecretArguments()
{
if (arguments->empty())
return;
/// We replace all arguments after 'mode' with '[HIDDEN]':
/// encrypt('mode', 'plaintext', 'key' [, iv, aad]) -> encrypt('mode', '[HIDDEN]')
return {1, arguments->size()};
result.start = 1;
result.count = arguments->size() - 1;
}
std::pair<size_t, size_t> findTableEngineSecretArguments() const
void findTableEngineSecretArguments()
{
const String & engine_name = function.name;
if (engine_name == "ExternalDistributed")
{
/// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password')
return {5, 6};
findExternalDistributedTableEngineSecretArguments();
}
else if ((engine_name == "MySQL") || (engine_name == "PostgreSQL") ||
(engine_name == "MaterializedPostgreSQL") || (engine_name == "MongoDB"))
@ -300,21 +348,38 @@ namespace
/// PostgreSQL('host:port', 'database', 'table', 'user', 'password', ...)
/// MaterializedPostgreSQL('host:port', 'database', 'table', 'user', 'password', ...)
/// MongoDB('host:port', 'database', 'collection', 'user', 'password', ...)
return {4, 5};
findMySQLFunctionSecretArguments();
}
else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS"))
{
/// S3('url', ['aws_access_key_id', 'aws_secret_access_key',] ...)
return findS3TableEngineSecretArguments();
}
else
{
return npos;
findS3TableEngineSecretArguments();
}
}
std::pair<size_t, size_t> findS3TableEngineSecretArguments() const
void findExternalDistributedTableEngineSecretArguments()
{
if (isNamedCollectionName(1))
{
/// ExternalDistributed('engine', named_collection, ..., password = 'password', ...)
findSecretNamedArgument("password", 2);
}
else
{
/// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password')
markSecretArgument(5);
}
}
void findS3TableEngineSecretArguments()
{
if (isNamedCollectionName(0))
{
/// S3(named_collection, ..., secret_access_key = 'secret_access_key')
findSecretNamedArgument("secret_access_key", 1);
return;
}
/// We replace 'aws_secret_access_key' with '[HIDDEN]' for the following signatures:
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format', 'compression')
@ -322,12 +387,12 @@ namespace
/// But we should check the number of arguments first because we don't need to do the replacement in case of
/// S3('url' [, 'format' [, 'compression']])
if (arguments->size() < 4)
return npos;
return;
return {2, 3};
markSecretArgument(2);
}
std::pair<size_t, size_t> findDatabaseEngineSecretArguments() const
void findDatabaseEngineSecretArguments()
{
const String & engine_name = function.name;
if ((engine_name == "MySQL") || (engine_name == "MaterializeMySQL") ||
@ -335,31 +400,71 @@ namespace
(engine_name == "MaterializedPostgreSQL"))
{
/// MySQL('host:port', 'database', 'user', 'password')
/// PostgreSQL('host:port', 'database', 'user', 'password', ...)
return {3, 4};
}
else
{
return npos;
/// PostgreSQL('host:port', 'database', 'user', 'password')
findMySQLDatabaseSecretArguments();
}
}
std::pair<size_t, size_t> findBackupNameSecretArguments() const
void findMySQLDatabaseSecretArguments()
{
if (isNamedCollectionName(0))
{
/// MySQL(named_collection, ..., password = 'password', ...)
findSecretNamedArgument("password", 1);
}
else
{
/// MySQL('host:port', 'database', 'user', 'password')
markSecretArgument(3);
}
}
void findBackupNameSecretArguments()
{
const String & engine_name = function.name;
if (engine_name == "S3")
{
/// BACKUP ... TO S3(url, [aws_access_key_id, aws_secret_access_key])
return {2, 3};
}
else
{
return npos;
markSecretArgument(2);
}
}
const ASTFunction & function;
const ASTs * arguments = nullptr;
/// Whether the specified argument can be the name of a named collection.
bool isNamedCollectionName(size_t arg_idx) const
{
if (arguments->size() <= arg_idx)
return false;
const auto * identifier = (*arguments)[arg_idx]->as<ASTIdentifier>();
return identifier != nullptr;
}
/// Looks for a secret argument with a specified name. This function looks for arguments in the format `key = value` where the key matches the specified name.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
{
for (size_t i = start; i < arguments->size(); ++i)
{
const auto & argument = (*arguments)[i];
const auto * equals_func = argument->as<ASTFunction>();
if (!equals_func || (equals_func->name != "equals"))
continue;
const auto * expr_list = equals_func->arguments->as<ASTExpressionList>();
if (!expr_list)
continue;
const auto & equal_args = expr_list->children;
if (equal_args.size() != 2)
continue;
String found_key;
if (!tryGetStringFromArgument(*equal_args[0], &found_key))
continue;
if (found_key == key)
markSecretArgument(i, /* argument_is_named= */ true);
}
}
};
}
@ -966,32 +1071,39 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
&& (name == "match" || name == "extract" || name == "extractAll" || name == "replaceRegexpOne"
|| name == "replaceRegexpAll");
auto secret_arguments = std::make_pair(static_cast<size_t>(-1), static_cast<size_t>(-1));
FunctionSecretArgumentsFinder::Result secret_arguments;
if (!settings.show_secrets)
secret_arguments = FunctionSecretArgumentsFinder(*this).getRange();
secret_arguments = FunctionSecretArgumentsFinder{*this}.getResult();
for (size_t i = 0, size = arguments->children.size(); i < size; ++i)
{
if (i != 0)
settings.ostr << ", ";
if (arguments->children[i]->as<ASTSetQuery>())
const auto & argument = arguments->children[i];
if (argument->as<ASTSetQuery>())
settings.ostr << "SETTINGS ";
if (!settings.show_secrets && (secret_arguments.first <= i) && (i < secret_arguments.second))
if (!settings.show_secrets && (secret_arguments.start <= i) && (i < secret_arguments.start + secret_arguments.count))
{
if (secret_arguments.are_named)
{
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
}
settings.ostr << "'[HIDDEN]'";
if (size - 1 < secret_arguments.second)
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
break; /// All other arguments should also be hidden.
continue;
}
if ((i == 1) && special_hilite_regexp
&& highlightStringLiteralWithMetacharacters(arguments->children[i], settings, "|()^$.[]?*+{:-"))
&& highlightStringLiteralWithMetacharacters(argument, settings, "|()^$.[]?*+{:-"))
{
continue;
}
arguments->children[i]->formatImpl(settings, state, nested_dont_need_parens);
argument->formatImpl(settings, state, nested_dont_need_parens);
}
}
@ -1005,14 +1117,7 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
bool ASTFunction::hasSecretParts() const
{
if (arguments)
{
size_t num_arguments = arguments->children.size();
auto secret_arguments = FunctionSecretArgumentsFinder(*this).getRange();
if ((secret_arguments.first < num_arguments) && (secret_arguments.first < secret_arguments.second))
return true;
}
return childrenHaveSecretParts();
return (FunctionSecretArgumentsFinder{*this}.getResult().count > 0) || childrenHaveSecretParts();
}
String getFunctionName(const IAST * ast)

View File

@ -1543,13 +1543,6 @@ bool MutateTask::prepare()
auto context_for_reading = Context::createCopy(ctx->context);
/// We must read with one thread because it guarantees that output stream will be sorted.
/// Disable all settings that can enable reading with several streams.
context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1);
context_for_reading->setSetting("max_threads", 1);
context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
/// Allow mutations to work when force_index_by_date or force_primary_key is on.
context_for_reading->setSetting("force_index_by_date", false);
context_for_reading->setSetting("force_primary_key", false);
@ -1562,7 +1555,7 @@ bool MutateTask::prepare()
}
if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, context_for_reading))
{
NameSet files_to_copy_instead_of_hardlinks;
auto settings_ptr = ctx->data->getSettings();
@ -1597,6 +1590,15 @@ bool MutateTask::prepare()
LOG_TRACE(ctx->log, "Mutating part {} to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation);
}
/// We must read with one thread because it guarantees that output stream will be sorted.
/// Disable all settings that can enable reading with several streams.
/// NOTE: isStorageTouchedByMutations() above is done without these settings because it
/// should be ok to calculate count() with multiple streams.
context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1);
context_for_reading->setSetting("max_threads", 1);
context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
MutationHelpers::splitMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames);
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);

View File

@ -208,6 +208,8 @@ Merge it only if you intend to backport changes to the target branch, otherwise
self.cherrypick_pr.add_to_labels(Labels.CHERRYPICK)
self.cherrypick_pr.add_to_labels(Labels.DO_NOT_TEST)
self._assign_new_pr(self.cherrypick_pr)
# update the cherrypick PR to get the state for PR.mergeable
self.cherrypick_pr.update()
def create_backport(self):
assert self.cherrypick_pr is not None

View File

@ -953,6 +953,7 @@
"topKWeighted"
"stochasticLinearRegression"
"corr"
"corrMatrix"
"uniqCombined64"
"intervalLengthSum"
"uniqCombined"
@ -967,6 +968,7 @@
"quantiles"
"sum"
"covarPop"
"covarPopMatrix"
"row_number"
"kurtPop"
"kurtSamp"
@ -1021,6 +1023,7 @@
"quantilesTiming"
"welchTTest"
"covarSamp"
"covarSampMatrix"
"varPopStable"
"quantileTiming"
"quantileExactInclusive"

View File

@ -0,0 +1,10 @@
<clickhouse>
<named_collections>
<named_collection_1/>
<named_collection_2/>
<named_collection_3/>
<named_collection_4/>
<named_collection_5/>
<named_collection_6/>
</named_collections>
</clickhouse>

View File

@@ -4,7 +4,13 @@ from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
-node = cluster.add_instance("node", with_zookeeper=True)
+node = cluster.add_instance(
+    "node",
+    main_configs=[
+        "configs/named_collections.xml",
+    ],
+    with_zookeeper=True,
+)
@pytest.fixture(scope="module", autouse=True)
@@ -116,6 +122,12 @@ def test_create_table():
        f"S3('http://minio1:9001/root/data/test3.csv.gz', 'CSV', 'gzip')",
        f"S3('http://minio1:9001/root/data/test4.csv', 'minio', '{password}', 'CSV')",
        f"S3('http://minio1:9001/root/data/test5.csv.gz', 'minio', '{password}', 'CSV', 'gzip')",
+        f"MySQL(named_collection_1, host = 'mysql57', port = 3306, database = 'mysql_db', table = 'mysql_table', user = 'mysql_user', password = '{password}')",
+        f"MySQL(named_collection_2, database = 'mysql_db', host = 'mysql57', port = 3306, password = '{password}', table = 'mysql_table', user = 'mysql_user')",
+        f"MySQL(named_collection_3, database = 'mysql_db', host = 'mysql57', port = 3306, table = 'mysql_table')",
+        f"PostgreSQL(named_collection_4, host = 'postgres1', port = 5432, database = 'postgres_db', table = 'postgres_table', user = 'postgres_user', password = '{password}')",
+        f"MongoDB(named_collection_5, host = 'mongo1', port = 5432, database = 'mongo_db', collection = 'mongo_col', user = 'mongo_user', password = '{password}')",
+        f"S3(named_collection_6, url = 'http://minio1:9001/root/data/test8.csv', access_key_id = 'minio', secret_access_key = '{password}', format = 'CSV')",
]
for i, table_engine in enumerate(table_engines):
@@ -147,6 +159,12 @@ def test_create_table():
        "CREATE TABLE table5 (x int) ENGINE = S3('http://minio1:9001/root/data/test3.csv.gz', 'CSV', 'gzip')",
        "CREATE TABLE table6 (`x` int) ENGINE = S3('http://minio1:9001/root/data/test4.csv', 'minio', '[HIDDEN]', 'CSV')",
        "CREATE TABLE table7 (`x` int) ENGINE = S3('http://minio1:9001/root/data/test5.csv.gz', 'minio', '[HIDDEN]', 'CSV', 'gzip')",
+        "CREATE TABLE table8 (`x` int) ENGINE = MySQL(named_collection_1, host = 'mysql57', port = 3306, database = 'mysql_db', table = 'mysql_table', user = 'mysql_user', password = '[HIDDEN]')",
+        "CREATE TABLE table9 (`x` int) ENGINE = MySQL(named_collection_2, database = 'mysql_db', host = 'mysql57', port = 3306, password = '[HIDDEN]', table = 'mysql_table', user = 'mysql_user')",
+        "CREATE TABLE table10 (x int) ENGINE = MySQL(named_collection_3, database = 'mysql_db', host = 'mysql57', port = 3306, table = 'mysql_table')",
+        "CREATE TABLE table11 (`x` int) ENGINE = PostgreSQL(named_collection_4, host = 'postgres1', port = 5432, database = 'postgres_db', table = 'postgres_table', user = 'postgres_user', password = '[HIDDEN]')",
"CREATE TABLE table12 (`x` int) ENGINE = MongoDB(named_collection_5, host = 'mongo1', port = 5432, database = 'mongo_db', collection = 'mongo_col', user = 'mongo_user', password = '[HIDDEN]'",
"CREATE TABLE table13 (`x` int) ENGINE = S3(named_collection_6, url = 'http://minio1:9001/root/data/test8.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]', format = 'CSV')",
],
must_not_contain=[password],
)
@@ -160,6 +178,7 @@ def test_create_database():
    database_engines = [
        f"MySQL('localhost:3306', 'mysql_db', 'mysql_user', '{password}') SETTINGS connect_timeout=1, connection_max_tries=1",
+        f"MySQL(named_collection_1, host = 'localhost', port = 3306, database = 'mysql_db', user = 'mysql_user', password = '{password}') SETTINGS connect_timeout=1, connection_max_tries=1",
# f"PostgreSQL('localhost:5432', 'postgres_db', 'postgres_user', '{password}')",
]
@@ -173,7 +192,8 @@ def test_create_database():
    check_logs(
        must_contain=[
            "CREATE DATABASE database0 ENGINE = MySQL('localhost:3306', 'mysql_db', 'mysql_user', '[HIDDEN]')",
-            # "CREATE DATABASE database1 ENGINE = PostgreSQL('localhost:5432', 'postgres_db', 'postgres_user', '[HIDDEN]')",
+            "CREATE DATABASE database1 ENGINE = MySQL(named_collection_1, host = 'localhost', port = 3306, database = 'mysql_db', user = 'mysql_user', password = '[HIDDEN]')",
+            # "CREATE DATABASE database2 ENGINE = PostgreSQL('localhost:5432', 'postgres_db', 'postgres_user', '[HIDDEN]')",
],
must_not_contain=[password],
)
@@ -211,6 +231,11 @@ def test_table_functions():
        f"remote('127.{{2..11}}', numbers(10), 'remote_user', '{password}', rand())",
        f"remoteSecure('127.{{2..11}}', 'default', 'remote_table', 'remote_user', '{password}')",
        f"remoteSecure('127.{{2..11}}', 'default', 'remote_table', 'remote_user', rand())",
+        f"mysql(named_collection_1, host = 'mysql57', port = 3306, database = 'mysql_db', table = 'mysql_table', user = 'mysql_user', password = '{password}')",
+        f"postgresql(named_collection_2, password = '{password}', host = 'postgres1', port = 5432, database = 'postgres_db', table = 'postgres_table', user = 'postgres_user')",
+        f"s3(named_collection_3, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')",
+        f"remote(named_collection_4, addresses_expr = '127.{{2..11}}', database = 'default', table = 'remote_table', user = 'remote_user', password = '{password}', sharding_key = rand())",
+        f"remoteSecure(named_collection_5, addresses_expr = '127.{{2..11}}', database = 'default', table = 'remote_table', user = 'remote_user', password = '{password}')",
]
for i, table_function in enumerate(table_functions):
@@ -259,6 +284,11 @@ def test_table_functions():
        "CREATE TABLE tablefunc22 (`x` int) AS remote('127.{2..11}', numbers(10), 'remote_user', '[HIDDEN]', rand())",
        "CREATE TABLE tablefunc23 (`x` int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]')",
        "CREATE TABLE tablefunc24 (x int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', rand())",
+        "CREATE TABLE tablefunc25 (`x` int) AS mysql(named_collection_1, host = 'mysql57', port = 3306, database = 'mysql_db', table = 'mysql_table', user = 'mysql_user', password = '[HIDDEN]')",
+        "CREATE TABLE tablefunc26 (`x` int) AS postgresql(named_collection_2, password = '[HIDDEN]', host = 'postgres1', port = 5432, database = 'postgres_db', table = 'postgres_table', user = 'postgres_user')",
+        "CREATE TABLE tablefunc27 (`x` int) AS s3(named_collection_3, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')",
+        "CREATE TABLE tablefunc28 (`x` int) AS remote(named_collection_4, addresses_expr = '127.{2..11}', database = 'default', table = 'remote_table', user = 'remote_user', password = '[HIDDEN]', sharding_key = rand())",
+        "CREATE TABLE tablefunc29 (`x` int) AS remoteSecure(named_collection_5, addresses_expr = '127.{2..11}', database = 'default', table = 'remote_table', user = 'remote_user', password = '[HIDDEN]')",
],
must_not_contain=[password],
)
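
Taken together with the `named_collections.xml` fixture above, these assertions pin down the calling convention the tests exercise: the first argument names a collection from the server config, subsequent `key = value` pairs override or extend its stored fields, and secret fields are rendered as `[HIDDEN]` in logs. A sketch of the same pattern (the minio endpoint and credentials are the test's fixtures, not a real service):

```sql
-- named_collection_6 is declared in the server config; explicit arguments
-- override or extend the values stored in the collection
CREATE TABLE s3_named (x Int32)
ENGINE = S3(named_collection_6,
            url = 'http://minio1:9001/root/data/test8.csv',
            access_key_id = 'minio',
            secret_access_key = 'mysecret',
            format = 'CSV');

-- server logs and system.query_log record the statement with
-- secret_access_key = '[HIDDEN]' in place of the real value
```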

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env bash
# Tags: replica
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl2"
$CLICKHOUSE_CLIENT --query="CREATE TABLE ttl_repl1(d Date, x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_00933/ttl_repl', '1') PARTITION BY toDayOfMonth(d) ORDER BY x TTL d + INTERVAL 1 DAY;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE ttl_repl2(d Date, x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_00933/ttl_repl', '2') PARTITION BY toDayOfMonth(d) ORDER BY x TTL d + INTERVAL 1 DAY;"
$CLICKHOUSE_CLIENT --query="INSERT INTO TABLE ttl_repl1 VALUES (toDate('2000-10-10 00:00:00'), 100)"
$CLICKHOUSE_CLIENT --query="INSERT INTO TABLE ttl_repl1 VALUES (toDate('2100-10-10 00:00:00'), 200)"
$CLICKHOUSE_CLIENT --query="ALTER TABLE ttl_repl1 MODIFY TTL d + INTERVAL 1 DAY"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA ttl_repl2"
$CLICKHOUSE_CLIENT --query="INSERT INTO TABLE ttl_repl1 VALUES (toDate('2000-10-10 00:00:00'), 300)"
$CLICKHOUSE_CLIENT --query="INSERT INTO TABLE ttl_repl1 VALUES (toDate('2100-10-10 00:00:00'), 400)"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA ttl_repl2"
query_with_retry "OPTIMIZE TABLE ttl_repl2 FINAL SETTINGS optimize_throw_if_noop = 1"
$CLICKHOUSE_CLIENT --query="SELECT x FROM ttl_repl2 ORDER BY x"
$CLICKHOUSE_CLIENT --query="SHOW CREATE TABLE ttl_repl2"
$CLICKHOUSE_CLIENT --query="DROP TABLE ttl_repl1"
$CLICKHOUSE_CLIENT --query="DROP TABLE ttl_repl2"

View File

@@ -1,30 +0,0 @@
-- Tags: long, replica
DROP TABLE IF EXISTS ttl_repl1;
DROP TABLE IF EXISTS ttl_repl2;
CREATE TABLE ttl_repl1(d Date, x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_00933/ttl_repl', '1')
PARTITION BY toDayOfMonth(d) ORDER BY x TTL d + INTERVAL 1 DAY;
CREATE TABLE ttl_repl2(d Date, x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_00933/ttl_repl', '2')
PARTITION BY toDayOfMonth(d) ORDER BY x TTL d + INTERVAL 1 DAY;
INSERT INTO TABLE ttl_repl1 VALUES (toDate('2000-10-10 00:00:00'), 100);
INSERT INTO TABLE ttl_repl1 VALUES (toDate('2100-10-10 00:00:00'), 200);
ALTER TABLE ttl_repl1 MODIFY TTL d + INTERVAL 1 DAY;
SYSTEM SYNC REPLICA ttl_repl2;
INSERT INTO TABLE ttl_repl1 VALUES (toDate('2000-10-10 00:00:00'), 300);
INSERT INTO TABLE ttl_repl1 VALUES (toDate('2100-10-10 00:00:00'), 400);
SYSTEM SYNC REPLICA ttl_repl2;
SELECT sleep(1) format Null; -- wait for probable merges after inserts
OPTIMIZE TABLE ttl_repl2 FINAL;
SELECT x FROM ttl_repl2 ORDER BY x;
SHOW CREATE TABLE ttl_repl2;
DROP TABLE ttl_repl1;
DROP TABLE ttl_repl2;

View File

@@ -0,0 +1,18 @@
[[nan]]
[[nan]]
[[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan]]
[[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan]]
[[1,-0.09561,0.24287,0.74554],[-0.09561,1,0.17303,0.10558],[0.24287,0.17303,1,0.25797],[0.74554,0.10558,0.25797,1]]
0 0 0
[[nan]]
[[nan]]
[[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan]]
[[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan]]
[[9.16667,-1.95556,4.5335,7.49776],[-1.95556,45.63378,7.20628,2.36899],[4.5335,7.20628,38.01103,5.28296],[7.49776,2.36899,5.28296,11.03352]]
0 0 0
[[nan]]
[[0]]
[[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan],[nan,nan,nan,nan]]
[[0,0,0,0],[0,0,0,0],[0,0,0,0],[0,0,0,0]]
[[8.25,-1.76,4.08015,6.74799],[-1.76,41.0704,6.48565,2.13209],[4.08015,6.48565,34.20993,4.75467],[6.74799,2.13209,4.75467,9.93017]]
0 0 0

View File

@@ -0,0 +1,41 @@
DROP TABLE IF EXISTS fh;
CREATE TABLE fh(a_value UInt32, b_value Float64, c_value Float64, d_value Float64) ENGINE = Memory;
INSERT INTO fh(a_value, b_value, c_value, d_value) VALUES (1, 5.6,-4.4, 2.6),(2, -9.6, 3, 3.3),(3, -1.3,-4, 1.2),(4, 5.3,9.7,2.3),(5, 4.4,0.037,1.222),(6, -8.6,-7.8,2.1233),(7, 5.1,9.3,8.1222),(8, 7.9,-3.6,9.837),(9, -8.2,0.62,8.43555),(10, -3,7.3,6.762);
SELECT corrMatrix(a_value) FROM (select a_value from fh limit 0);
SELECT corrMatrix(a_value) FROM (select a_value from fh limit 1);
SELECT corrMatrix(a_value, b_value, c_value, d_value) FROM (select a_value, b_value, c_value, d_value from fh limit 0);
SELECT corrMatrix(a_value, b_value, c_value, d_value) FROM (select a_value, b_value, c_value, d_value from fh limit 1);
SELECT arrayMap(x -> arrayMap(y -> round(y, 5), x), corrMatrix(a_value, b_value, c_value, d_value)) FROM fh;
SELECT round(abs(corr(x1,x2) - corrMatrix(x1,x2)[1][2]), 5), round(abs(corr(x1,x1) - corrMatrix(x1,x2)[1][1]), 5), round(abs(corr(x2,x2) - corrMatrix(x1,x2)[2][2]), 5) from (select randNormal(100, 1) as x1, randNormal(100,5) as x2 from numbers(100000));
SELECT covarSampMatrix(a_value) FROM (select a_value from fh limit 0);
SELECT covarSampMatrix(a_value) FROM (select a_value from fh limit 1);
SELECT covarSampMatrix(a_value, b_value, c_value, d_value) FROM (select a_value, b_value, c_value, d_value from fh limit 0);
SELECT covarSampMatrix(a_value, b_value, c_value, d_value) FROM (select a_value, b_value, c_value, d_value from fh limit 1);
SELECT arrayMap(x -> arrayMap(y -> round(y, 5), x), covarSampMatrix(a_value, b_value, c_value, d_value)) FROM fh;
SELECT round(abs(covarSamp(x1,x2) - covarSampMatrix(x1,x2)[1][2]), 5), round(abs(covarSamp(x1,x1) - covarSampMatrix(x1,x2)[1][1]), 5), round(abs(covarSamp(x2,x2) - covarSampMatrix(x1,x2)[2][2]), 5) from (select randNormal(100, 1) as x1, randNormal(100,5) as x2 from numbers(100000));
SELECT covarPopMatrix(a_value) FROM (select a_value from fh limit 0);
SELECT covarPopMatrix(a_value) FROM (select a_value from fh limit 1);
SELECT covarPopMatrix(a_value, b_value, c_value, d_value) FROM (select a_value, b_value, c_value, d_value from fh limit 0);
SELECT covarPopMatrix(a_value, b_value, c_value, d_value) FROM (select a_value, b_value, c_value, d_value from fh limit 1);
SELECT arrayMap(x -> arrayMap(y -> round(y, 5), x), covarPopMatrix(a_value, b_value, c_value, d_value)) FROM fh;
SELECT round(abs(covarPop(x1,x2) - covarPopMatrix(x1,x2)[1][2]), 5), round(abs(covarPop(x1,x1) - covarPopMatrix(x1,x2)[1][1]), 5), round(abs(covarPop(x2,x2) - covarPopMatrix(x1,x2)[2][2]), 5) from (select randNormal(100, 1) as x1, randNormal(100,5) as x2 from numbers(100000));

View File

@@ -0,0 +1,27 @@
2149-06-06 65535
2149-06-06 toUInt16(65535)
2149-06-06 toInt32(65535)
2149-06-06 toUInt32(65535)
2149-06-06 toDate(65535)
2149-06-06 CAST(65535 as UInt16)
2149-06-06 CAST(65535 as Int32)
2149-06-06 CAST(65535 as UInt32)
2149-06-06 CAST(65535 as Date)
2149-06-05 65534
2149-06-05 toUInt16(65534)
2149-06-05 toInt32(65534)
2149-06-05 toUInt32(65534)
2149-06-05 toDate(65534)
2149-06-05 CAST(65534 as UInt16)
2149-06-05 CAST(65534 as Int32)
2149-06-05 CAST(65534 as UInt32)
2149-06-05 CAST(65534 as Date)
1970-01-01 65536
1970-01-01 toUInt16(65536)
1970-01-01 toInt32(65536)
1970-01-01 toUInt32(65536)
1970-01-01 toDate(65536)
1970-01-01 CAST(65536 as UInt16)
1970-01-01 CAST(65536 as Int32)
1970-01-01 CAST(65536 as UInt32)
1970-01-01 CAST(65536 as Date)

View File

@@ -0,0 +1,72 @@
DROP TABLE IF EXISTS 02540_date;
CREATE TABLE 02540_date (txt String, x Date) engine=Memory;
-- Date: Supported range of values: [1970-01-01, 2149-06-06].
-- ^----closed interval---^
INSERT INTO 02540_date VALUES('65535', 65535);
INSERT INTO 02540_date VALUES('toUInt16(65535)', toUInt16(65535)); -- #43370 weird one -> used to be 1970-01-01
INSERT INTO 02540_date VALUES('toInt32(65535)', toInt32(65535));
INSERT INTO 02540_date VALUES('toUInt32(65535)', toUInt32(65535));
INSERT INTO 02540_date VALUES('toDate(65535)', toDate(65535));
INSERT INTO 02540_date VALUES('CAST(65535 as UInt16)', CAST(65535 as UInt16));
INSERT INTO 02540_date VALUES('CAST(65535 as Int32)', CAST(65535 as Int32));
INSERT INTO 02540_date VALUES('CAST(65535 as UInt32)', CAST(65535 as UInt32));
INSERT INTO 02540_date VALUES('CAST(65535 as Date)', CAST(65535 as Date));
INSERT INTO 02540_date VALUES('65534', 65534);
INSERT INTO 02540_date VALUES('toUInt16(65534)', toUInt16(65534));
INSERT INTO 02540_date VALUES('toInt32(65534)', toInt32(65534));
INSERT INTO 02540_date VALUES('toUInt32(65534)', toUInt32(65534));
INSERT INTO 02540_date VALUES('toDate(65534)', toDate(65534));
INSERT INTO 02540_date VALUES('CAST(65534 as UInt16)', CAST(65534 as UInt16));
INSERT INTO 02540_date VALUES('CAST(65534 as Int32)', CAST(65534 as Int32));
INSERT INTO 02540_date VALUES('CAST(65534 as UInt32)', CAST(65534 as UInt32));
INSERT INTO 02540_date VALUES('CAST(65534 as Date)', CAST(65534 as Date));
INSERT INTO 02540_date VALUES('65536', 65536);
INSERT INTO 02540_date VALUES('toUInt16(65536)', toUInt16(65536));
INSERT INTO 02540_date VALUES('toInt32(65536)', toInt32(65536));
INSERT INTO 02540_date VALUES('toUInt32(65536)', toUInt32(65536));
INSERT INTO 02540_date VALUES('toDate(65536)', toDate(65536));
INSERT INTO 02540_date VALUES('CAST(65536 as UInt16)', CAST(65536 as UInt16));
INSERT INTO 02540_date VALUES('CAST(65536 as Int32)', CAST(65536 as Int32));
INSERT INTO 02540_date VALUES('CAST(65536 as UInt32)', CAST(65536 as UInt32));
INSERT INTO 02540_date VALUES('CAST(65536 as Date)', CAST(65536 as Date));
SELECT x, txt FROM 02540_date WHERE txt == '65535';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt16(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'toInt32(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt32(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'toDate(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as UInt16)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as Int32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as UInt32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as Date)';
SELECT x, txt FROM 02540_date WHERE txt == '65534';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt16(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'toInt32(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt32(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'toDate(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as UInt16)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as Int32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as UInt32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as Date)';
SELECT x, txt FROM 02540_date WHERE txt == '65536';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt16(65536)';
SELECT x, txt FROM 02540_date WHERE txt == 'toInt32(65536)';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt32(65536)';
SELECT x, txt FROM 02540_date WHERE txt == 'toDate(65536)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65536 as UInt16)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65536 as Int32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65536 as UInt32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65536 as Date)';
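
The expected results follow from `Date`'s representation: a UInt16 count of days since 1970-01-01. 65535 is therefore the last representable day, 2149-06-06, while 65536 overflows back to day 0, as the reference output above shows. A quick check:

```sql
SELECT toDate(0), toDate(65535), CAST(65536 AS Date);
-- 1970-01-01   2149-06-06   1970-01-01
```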

View File

@@ -0,0 +1,2 @@
85.85.85.85
138.68.230.86

View File

@@ -0,0 +1,4 @@
CREATE TABLE ip4test (ip IPv4) ENGINE=Memory;
INSERT INTO ip4test VALUES (22906492245), (2319771222);
SELECT * FROM ip4test;
DROP TABLE ip4test;
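
The expected addresses are the UInt32 residues of the inserted literals: `IPv4` is stored as a UInt32, and the reference output matches reduction modulo 2^32. Worked out with the standard `IPv4NumToString` conversion:

```sql
-- 22906492245 = 5 * 4294967296 + 1431655765, and 1431655765 = 0x55555555
SELECT IPv4NumToString(toUInt32(22906492245 % 4294967296));  -- 85.85.85.85
-- 2319771222 already fits in UInt32
SELECT IPv4NumToString(2319771222);                          -- 138.68.230.86
```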

View File

@@ -69,7 +69,8 @@ int main(int argc, char *argv[])
    LOG_INFO(logger, "Last committed index: {}", last_commited_index);
-    DB::KeeperLogStore changelog(argv[2], 10000000, true, settings->compress_logs);
+    DB::KeeperLogStore changelog(
+        argv[2], LogFileSettings{.force_sync = true, .compress_logs = settings->compress_logs, .rotate_interval = 10000000});
changelog.init(last_commited_index, 10000000000UL); /// collect all logs
if (changelog.size() == 0)
LOG_INFO(logger, "Changelog empty");