Merge branch 'master' into break_some_tests

This commit is contained in:
mergify[bot] 2021-08-20 13:24:17 +00:00 committed by GitHub
commit 02b4e56e6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
97 changed files with 1399 additions and 473 deletions

2
contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit 43491d33ca2826531d1e3cae70d4bf1e5249e3c9
Subproject commit b8554f1682062c85ba519eb54ef2f90e02b812cb

View File

@ -628,9 +628,6 @@ cat analyze/errors.log >> report/errors.log ||:
cat profile-errors.log >> report/errors.log ||:
clickhouse-local --query "
-- We use decimals specifically to get fixed-point, fixed-width formatting.
set output_format_decimal_trailing_zeros = 1;
create view query_display_names as select * from
file('analyze/query-display-names.tsv', TSV,
'test text, query_index int, query_display_name text')
@ -644,6 +641,7 @@ create view partial_query_times as select * from
-- Report for partial queries that we could only run on the new server (e.g.
-- queries with new functions added in the tested PR).
create table partial_queries_report engine File(TSV, 'report/partial-queries-report.tsv')
settings output_format_decimal_trailing_zeros = 1
as select toDecimal64(time_median, 3) time,
toDecimal64(time_stddev / time_median, 3) relative_time_stddev,
test, query_index, query_display_name
@ -716,8 +714,9 @@ create table queries engine File(TSVWithNamesAndTypes, 'report/queries.tsv')
order by test, query_index, metric_name
;
create table changed_perf_report engine File(TSV, 'report/changed-perf.tsv') as
with
create table changed_perf_report engine File(TSV, 'report/changed-perf.tsv')
settings output_format_decimal_trailing_zeros = 1
as with
-- server_time is sometimes reported as zero (if it's less than 1 ms),
-- so we have to work around this to not get an error about conversion
-- of NaN to decimal.
@ -733,8 +732,9 @@ create table changed_perf_report engine File(TSV, 'report/changed-perf.tsv') as
changed_fail, test, query_index, query_display_name
from queries where changed_show order by abs(diff) desc;
create table unstable_queries_report engine File(TSV, 'report/unstable-queries.tsv') as
select
create table unstable_queries_report engine File(TSV, 'report/unstable-queries.tsv')
settings output_format_decimal_trailing_zeros = 1
as select
toDecimal64(left, 3), toDecimal64(right, 3), toDecimal64(diff, 3),
toDecimal64(stat_threshold, 3), unstable_fail, test, query_index, query_display_name
from queries where unstable_show order by stat_threshold desc;
@ -764,8 +764,9 @@ create view total_speedup as
from test_speedup
;
create table test_perf_changes_report engine File(TSV, 'report/test-perf-changes.tsv') as
with
create table test_perf_changes_report engine File(TSV, 'report/test-perf-changes.tsv')
settings output_format_decimal_trailing_zeros = 1
as with
(times_speedup >= 1
? '-' || toString(toDecimal64(times_speedup, 3)) || 'x'
: '+' || toString(toDecimal64(1 / times_speedup, 3)) || 'x')
@ -791,8 +792,9 @@ create view total_client_time_per_query as select *
from file('analyze/client-times.tsv', TSV,
'test text, query_index int, client float, server float');
create table slow_on_client_report engine File(TSV, 'report/slow-on-client.tsv') as
select client, server, toDecimal64(client/server, 3) p,
create table slow_on_client_report engine File(TSV, 'report/slow-on-client.tsv')
settings output_format_decimal_trailing_zeros = 1
as select client, server, toDecimal64(client/server, 3) p,
test, query_display_name
from total_client_time_per_query left join query_display_names using (test, query_index)
where p > toDecimal64(1.02, 3) order by p desc;
@ -877,8 +879,9 @@ create view test_times_view_total as
from test_times_view
;
create table test_times_report engine File(TSV, 'report/test-times.tsv') as
select
create table test_times_report engine File(TSV, 'report/test-times.tsv')
settings output_format_decimal_trailing_zeros = 1
as select
test,
toDecimal64(real, 3),
toDecimal64(total_client_time, 3),
@ -896,8 +899,9 @@ create table test_times_report engine File(TSV, 'report/test-times.tsv') as
;
-- report for all queries page, only main metric
create table all_tests_report engine File(TSV, 'report/all-queries.tsv') as
with
create table all_tests_report engine File(TSV, 'report/all-queries.tsv')
settings output_format_decimal_trailing_zeros = 1
as with
-- server_time is sometimes reported as zero (if it's less than 1 ms),
-- so we have to work around this to not get an error about conversion
-- of NaN to decimal.
@ -978,9 +982,6 @@ for version in {right,left}
do
rm -rf data
clickhouse-local --query "
-- We use decimals specifically to get fixed-point, fixed-width formatting.
set output_format_decimal_trailing_zeros = 1;
create view query_profiles as
with 0 as left, 1 as right
select * from file('analyze/query-profiles.tsv', TSV,
@ -1063,9 +1064,10 @@ create table unstable_run_traces engine File(TSVWithNamesAndTypes,
;
create table metric_devation engine File(TSVWithNamesAndTypes,
'report/metric-deviation.$version.tsv') as
'report/metric-deviation.$version.tsv')
settings output_format_decimal_trailing_zeros = 1
-- first goes the key used to split the file with grep
select test, query_index, query_display_name,
as select test, query_index, query_display_name,
toDecimal64(d, 3) d, q, metric
from (
select
@ -1176,9 +1178,6 @@ rm -rf metrics ||:
mkdir metrics
clickhouse-local --query "
-- We use decimals specifically to get fixed-point, fixed-width formatting.
set output_format_decimal_trailing_zeros = 1;
create view right_async_metric_log as
select * from file('right-async-metric-log.tsv', TSVWithNamesAndTypes,
'$(cat right-async-metric-log.tsv.columns)')
@ -1196,8 +1195,9 @@ create table metrics engine File(TSV, 'metrics/metrics.tsv') as
;
-- Show metrics that have changed
create table changes engine File(TSV, 'metrics/changes.tsv') as
select metric, left, right,
create table changes engine File(TSV, 'metrics/changes.tsv')
settings output_format_decimal_trailing_zeros = 1
as select metric, left, right,
toDecimal64(diff, 3), toDecimal64(times_diff, 3)
from (
select metric, median(left) as left, median(right) as right,

View File

@ -105,7 +105,7 @@ We use `Decimal` data type to store prices. Everything else is quite straightfor
## Import Data
Upload data into ClickHouse in parallel:
Upload data into ClickHouse:
```
clickhouse-client --format_csv_allow_single_quotes 0 --input_format_null_as_default 0 --query "INSERT INTO dish FORMAT CSVWithNames" < Dish.csv

View File

@ -114,5 +114,5 @@ Seamlessly migration from ZooKeeper to `clickhouse-keeper` is impossible you hav
clickhouse-keeper-converter --zookeeper-logs-dir /var/lib/zookeeper/version-2 --zookeeper-snapshots-dir /var/lib/zookeeper/version-2 --output-dir /path/to/clickhouse/keeper/snapshots
```
4. Copy snapshot to `clickhouse-server` nodes with configured `keeper` or start `clickhouse-keeper` instead of ZooKeeper. Snapshot must persist only on leader node, leader will sync it automatically to other nodes.
4. Copy snapshot to `clickhouse-server` nodes with configured `keeper` or start `clickhouse-keeper` instead of ZooKeeper. Snapshot must persist on all nodes, otherwise empty nodes can be faster and one of them can becamse leader.

View File

@ -1339,3 +1339,149 @@ Result:
│ 2,"good" │
└───────────────────────────────────────────┘
```
## snowflakeToDateTime {#snowflakeToDateTime}
Extract time from snowflake id as DateTime format.
**Syntax**
``` sql
snowflakeToDateTime(value [, time_zone])
```
**Parameters**
- `value``snowflake id`, Int64 value.
- `time_zone` — [Timezone](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../../sql-reference/data-types/string.md).
**Returned value**
- value converted to the `DateTime` data type.
**Example**
Query:
``` sql
SELECT snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC');
```
Result:
``` text
┌─snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC')─┐
│ 2021-08-15 10:57:56 │
└──────────────────────────────────────────────────────────────────┘
```
## snowflakeToDateTime64 {#snowflakeToDateTime64}
Extract time from snowflake id as DateTime64 format.
**Syntax**
``` sql
snowflakeToDateTime64(value [, time_zone])
```
**Parameters**
- `value``snowflake id`, Int64 value.
- `time_zone` — [Timezone](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../../sql-reference/data-types/string.md).
**Returned value**
- value converted to the `DateTime64` data type.
**Example**
Query:
``` sql
SELECT snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC');
```
Result:
``` text
┌─snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC')─┐
│ 2021-08-15 10:58:19.841 │
└────────────────────────────────────────────────────────────────────┘
```
## dateTimeToSnowflake {#dateTimeToSnowflake}
Convert DateTime to the first snowflake id at the giving time.
**Syntax**
``` sql
dateTimeToSnowflake(value)
```
**Parameters**
- `value` — Date and time. [DateTime](../../sql-reference/data-types/datetime.md).
**Returned value**
- `value` converted to the `Int64` data type as the first snowflake id at that time.
**Example**
Query:
``` sql
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt
SELECT dateTimeToSnowflake(dt);
```
Result:
``` text
┌─dateTimeToSnowflake(dt)─┐
│ 1426860702823350272 │
└─────────────────────────┘
```
## dateTime64ToSnowflake {#dateTime64ToSnowflake}
Convert DateTime64 to the first snowflake id at the giving time.
**Syntax**
``` sql
dateTime64ToSnowflake(value)
```
**Parameters**
- `value` — Date and time. [DateTime64](../../sql-reference/data-types/datetime64.md).
**Returned value**
- `value` converted to the `Int64` data type as the first snowflake id at that time.
**Example**
Query:
``` sql
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64
SELECT dateTime64ToSnowflake(dt64);
```
Result:
``` text
┌─dateTime64ToSnowflake(dt64)─┐
│ 1426860704886947840 │
└─────────────────────────────┘
```

View File

@ -12,6 +12,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Session.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/Config/ConfigProcessor.h>
@ -374,14 +375,13 @@ void LocalServer::processQueries()
if (!parse_res.second)
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
/// we can't mutate global global_context (can lead to races, as it was already passed to some background threads)
/// so we can't reuse it safely as a query context and need a copy here
auto context = Context::createCopy(global_context);
/// Authenticate and create a context to execute queries.
Session session{global_context, ClientInfo::Interface::TCP};
session.authenticate("default", "", Poco::Net::SocketAddress{});
context->makeSessionContext();
context->makeQueryContext();
context->authenticate("default", "", Poco::Net::SocketAddress{});
/// Use the same context for all queries.
auto context = session.makeQueryContext();
context->makeSessionContext(); /// initial_create_query requires a session context to be set.
context->setCurrentQueryId("");
applyCmdSettings(context);

View File

@ -54,7 +54,6 @@
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/Session.h>
#include <Access/AccessControlManager.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
@ -1431,7 +1430,6 @@ if (ThreadFuzzer::instance().isEffective())
/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
Session::startupNamedSessions();
{
String level_str = config().getString("text_log.level", "");

View File

@ -122,6 +122,24 @@ bool pathStartsWith(const std::filesystem::path & path, const std::filesystem::p
return path_starts_with_prefix_path;
}
bool symlinkStartsWith(const std::filesystem::path & path, const std::filesystem::path & prefix_path)
{
/// Differs from pathStartsWith in how `path` is normalized before comparison.
/// Make `path` absolute if it was relative and put it into normalized form: remove
/// `.` and `..` and extra `/`. Path is not canonized because otherwise path will
/// not be a path of a symlink itself.
auto absolute_path = std::filesystem::absolute(path);
absolute_path = absolute_path.lexically_normal(); /// Normalize path.
auto absolute_prefix_path = std::filesystem::absolute(prefix_path);
absolute_prefix_path = absolute_prefix_path.lexically_normal(); /// Normalize path.
auto [_, prefix_path_mismatch_it] = std::mismatch(absolute_path.begin(), absolute_path.end(), absolute_prefix_path.begin(), absolute_prefix_path.end());
bool path_starts_with_prefix_path = (prefix_path_mismatch_it == absolute_prefix_path.end());
return path_starts_with_prefix_path;
}
bool pathStartsWith(const String & path, const String & prefix_path)
{
auto filesystem_path = std::filesystem::path(path);
@ -130,6 +148,13 @@ bool pathStartsWith(const String & path, const String & prefix_path)
return pathStartsWith(filesystem_path, filesystem_prefix_path);
}
bool symlinkStartsWith(const String & path, const String & prefix_path)
{
auto filesystem_path = std::filesystem::path(path);
auto filesystem_prefix_path = std::filesystem::path(prefix_path);
return symlinkStartsWith(filesystem_path, filesystem_prefix_path);
}
}

View File

@ -35,6 +35,8 @@ bool pathStartsWith(const std::filesystem::path & path, const std::filesystem::p
/// Returns true if path starts with prefix path
bool pathStartsWith(const String & path, const String & prefix_path);
bool symlinkStartsWith(const String & path, const String & prefix_path);
}
namespace FS

View File

@ -439,11 +439,14 @@ bool NO_INLINE decompressImpl(
{
s = *ip++;
length += s;
} while (unlikely(s == 255));
} while (unlikely(s == 255 && ip < input_end));
};
/// Get literal length.
if (unlikely(ip >= input_end))
return false;
const unsigned token = *ip++;
length = token >> 4;
if (length == 0x0F)
@ -464,18 +467,18 @@ bool NO_INLINE decompressImpl(
/// output: xyzHello, w
/// ^-op (we will overwrite excessive bytes on next iteration)
{
auto * target = std::min(copy_end, output_end);
wildCopy<copy_amount>(op, ip, target); /// Here we can write up to copy_amount - 1 bytes after buffer.
if (unlikely(copy_end > output_end))
return false;
if (target == output_end)
return true;
}
wildCopy<copy_amount>(op, ip, copy_end); /// Here we can write up to copy_amount - 1 bytes after buffer.
if (copy_end == output_end)
return true;
ip += length;
op = copy_end;
if (unlikely(ip > input_end))
if (unlikely(ip + 1 >= input_end))
return false;
/// Get match offset.
@ -528,8 +531,9 @@ bool NO_INLINE decompressImpl(
copy<copy_amount>(op, match); /// copy_amount + copy_amount - 1 - 4 * 2 bytes after buffer.
if (length > copy_amount * 2)
{
auto * target = std::min(copy_end, output_end);
wildCopy<copy_amount>(op + copy_amount, match + copy_amount, target);
if (unlikely(copy_end > output_end))
return false;
wildCopy<copy_amount>(op + copy_amount, match + copy_amount, copy_end);
}
op = copy_end;

View File

@ -110,7 +110,7 @@ void insertPostgreSQLValue(
readDateTime64Text(time, 6, in, assert_cast<const DataTypeDateTime64 *>(data_type.get())->getTimeZone());
if (time < 0)
time = 0;
assert_cast<ColumnDecimal<Decimal64> &>(column).insertValue(time);
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(time);
break;
}
case ExternalResultDescription::ValueType::vtDecimal32: [[fallthrough]];

View File

@ -17,7 +17,7 @@ void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
[[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
[[maybe_unused]] const std::string & config_prefix,
[[maybe_unused]] Block & sample_block,
ContextPtr /* context */,
ContextPtr /* global_context */,
const std::string & /* default_database */,
bool /*created_from_ddl*/) -> DictionarySourcePtr
{

View File

@ -7,6 +7,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Session.h>
#include <Interpreters/executeQuery.h>
#include <Common/isLocalAddress.h>
#include <common/logger_useful.h>
@ -63,19 +64,18 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
const DictionaryStructure & dict_struct_,
const Configuration & configuration_,
const Block & sample_block_,
ContextPtr context_)
ContextMutablePtr context_,
std::shared_ptr<Session> local_session_)
: update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, configuration{configuration_}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.query, configuration.where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block_}
, context(Context::createCopy(context_))
, local_session(local_session_)
, context(context_)
, pool{createPool(configuration)}
, load_all_query{query_builder.composeLoadAllQuery()}
{
/// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
context->makeQueryContext();
}
ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
@ -85,11 +85,11 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, invalidate_query_response{other.invalidate_query_response}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.query, configuration.where, IdentifierQuotingStyle::Backticks}
, sample_block{other.sample_block}
, local_session(other.local_session)
, context(Context::createCopy(other.context))
, pool{createPool(configuration)}
, load_all_query{other.load_all_query}
{
context->makeQueryContext();
}
std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
@ -222,14 +222,13 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & default_database [[maybe_unused]],
bool /* created_from_ddl */) -> DictionarySourcePtr
{
bool secure = config.getBool(config_prefix + ".secure", false);
auto context_copy = Context::createCopy(context);
UInt16 default_port = getPortFromContext(context_copy, secure);
UInt16 default_port = getPortFromContext(global_context, secure);
std::string settings_config_prefix = config_prefix + ".clickhouse";
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
@ -252,12 +251,18 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.secure = config.getBool(settings_config_prefix + ".secure", false)
};
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
ContextMutablePtr context;
std::shared_ptr<Session> local_session;
if (configuration.is_local)
{
context_copy->authenticate(configuration.user, configuration.password, Poco::Net::SocketAddress("127.0.0.1", 0));
context_copy = copyContextAndApplySettings(config_prefix, context_copy, config);
/// Start local session in case when the dictionary is loaded in-process (without TCP communication).
local_session = std::make_shared<Session>(global_context, ClientInfo::Interface::TCP);
local_session->authenticate(configuration.user, configuration.password, Poco::Net::SocketAddress{"127.0.0.1", 0});
context = local_session->makeQueryContext();
context->applySettingsChanges(readSettingsFromDictionaryConfig(config, config_prefix));
}
else
context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
String dictionary_name = config.getString(".dictionary.name", "");
String dictionary_database = config.getString(".dictionary.database", "");
@ -265,7 +270,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
if (dictionary_name == configuration.table && dictionary_database == configuration.db)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouseDictionarySource table cannot be dictionary table");
return std::make_unique<ClickHouseDictionarySource>(dict_struct, configuration, sample_block, context_copy);
return std::make_unique<ClickHouseDictionarySource>(dict_struct, configuration, sample_block, context, local_session);
};
factory.registerSource("clickhouse", create_table_source);

View File

@ -39,7 +39,8 @@ public:
const DictionaryStructure & dict_struct_,
const Configuration & configuration_,
const Block & sample_block_,
ContextPtr context);
ContextMutablePtr context_,
std::shared_ptr<Session> local_session_);
/// copy-constructor is provided in order to support cloneability
ClickHouseDictionarySource(const ClickHouseDictionarySource & other);
@ -81,6 +82,7 @@ private:
mutable std::string invalidate_query_response;
ExternalQueryBuilder query_builder;
Block sample_block;
std::shared_ptr<Session> local_session;
ContextMutablePtr context;
ConnectionPoolWithFailoverPtr pool;
const std::string load_all_query;

View File

@ -31,7 +31,7 @@ DictionaryPtr DictionaryFactory::create(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr context,
ContextPtr global_context,
bool created_from_ddl) const
{
Poco::Util::AbstractConfiguration::Keys keys;
@ -45,12 +45,9 @@ DictionaryPtr DictionaryFactory::create(
const DictionaryStructure dict_struct{config, config_prefix};
DictionarySourcePtr source_ptr = DictionarySourceFactory::instance().create(
name, config, config_prefix + ".source", dict_struct, context, config.getString(config_prefix + ".database", ""), created_from_ddl);
name, config, config_prefix + ".source", dict_struct, global_context, config.getString(config_prefix + ".database", ""), created_from_ddl);
LOG_TRACE(&Poco::Logger::get("DictionaryFactory"), "Created dictionary source '{}' for dictionary '{}'", source_ptr->toString(), name);
if (context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, name);
const auto & layout_type = keys.front();
{
@ -58,7 +55,7 @@ DictionaryPtr DictionaryFactory::create(
if (found != registered_layouts.end())
{
const auto & layout_creator = found->second.layout_create_function;
return layout_creator(name, dict_struct, config, config_prefix, std::move(source_ptr), context, created_from_ddl);
return layout_creator(name, dict_struct, config, config_prefix, std::move(source_ptr), global_context, created_from_ddl);
}
}
@ -68,10 +65,10 @@ DictionaryPtr DictionaryFactory::create(
layout_type);
}
DictionaryPtr DictionaryFactory::create(const std::string & name, const ASTCreateQuery & ast, ContextPtr context) const
DictionaryPtr DictionaryFactory::create(const std::string & name, const ASTCreateQuery & ast, ContextPtr global_context) const
{
auto configuration = getDictionaryConfigurationFromAST(ast, context);
return DictionaryFactory::create(name, *configuration, "dictionary", context, true);
auto configuration = getDictionaryConfigurationFromAST(ast, global_context);
return DictionaryFactory::create(name, *configuration, "dictionary", global_context, true);
}
bool DictionaryFactory::isComplex(const std::string & layout_type) const

View File

@ -36,13 +36,13 @@ public:
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr context,
ContextPtr global_context,
bool created_from_ddl) const;
/// Create dictionary from DDL-query
DictionaryPtr create(const std::string & name,
const ASTCreateQuery & ast,
ContextPtr context) const;
ContextPtr global_context) const;
using LayoutCreateFunction = std::function<DictionaryPtr(
const std::string & name,
@ -50,7 +50,7 @@ public:
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr context,
ContextPtr global_context,
bool created_from_ddl)>;
bool isComplex(const std::string & layout_type) const;

View File

@ -80,7 +80,7 @@ DictionarySourcePtr DictionarySourceFactory::create(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const DictionaryStructure & dict_struct,
ContextPtr context,
ContextPtr global_context,
const std::string & default_database,
bool check_config) const
{
@ -99,7 +99,7 @@ DictionarySourcePtr DictionarySourceFactory::create(
{
const auto & create_source = found->second;
auto sample_block = createSampleBlock(dict_struct);
return create_source(dict_struct, config, config_prefix, sample_block, context, default_database, check_config);
return create_source(dict_struct, config, config_prefix, sample_block, global_context, default_database, check_config);
}
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,

View File

@ -35,7 +35,7 @@ public:
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & default_database,
bool check_config)>;
@ -48,7 +48,7 @@ public:
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const DictionaryStructure & dict_struct,
ContextPtr context,
ContextPtr global_context,
const std::string & default_database,
bool check_config) const;

View File

@ -59,30 +59,36 @@ Block blockForKeys(
return block;
}
ContextMutablePtr copyContextAndApplySettings(
const std::string & config_prefix,
ContextPtr context,
const Poco::Util::AbstractConfiguration & config)
SettingsChanges readSettingsFromDictionaryConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
auto local_context = Context::createCopy(context);
if (config.has(config_prefix + ".settings"))
if (!config.has(config_prefix + ".settings"))
return {};
const auto prefix = config_prefix + ".settings";
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(prefix, config_keys);
SettingsChanges changes;
for (const std::string & key : config_keys)
{
const auto prefix = config_prefix + ".settings";
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(prefix, config_keys);
SettingsChanges changes;
for (const std::string & key : config_keys)
{
const auto value = config.getString(prefix + "." + key);
changes.emplace_back(key, value);
}
local_context->applySettingsChanges(changes);
const auto value = config.getString(prefix + "." + key);
changes.emplace_back(key, value);
}
return local_context;
return changes;
}
ContextMutablePtr copyContextAndApplySettingsFromDictionaryConfig(
const ContextPtr & context, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
auto context_copy = Context::createCopy(context);
auto changes = readSettingsFromDictionaryConfig(config, config_prefix);
context_copy->applySettingsChanges(changes);
return context_copy;
}
static Block transformHeader(Block header, Block block_to_add)

View File

@ -14,6 +14,7 @@ namespace DB
{
struct DictionaryStructure;
class SettingsChanges;
/// For simple key
@ -29,10 +30,8 @@ Block blockForKeys(
const std::vector<size_t> & requested_rows);
/// Used for applying settings to copied context in some register[...]Source functions
ContextMutablePtr copyContextAndApplySettings(
const std::string & config_prefix,
ContextPtr context,
const Poco::Util::AbstractConfiguration & config);
SettingsChanges readSettingsFromDictionaryConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
ContextMutablePtr copyContextAndApplySettingsFromDictionaryConfig(const ContextPtr & context, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
/** A stream, adds additional columns to each block that it will read from inner stream.
*

View File

@ -307,7 +307,7 @@ namespace
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
ContextPtr /* global_context */,
bool /* created_from_ddl */)
{
const auto * layout_name = dictionary_key_type == DictionaryKeyType::Simple ? "direct" : "complex_key_direct";

View File

@ -275,7 +275,7 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & /* default_database */,
bool created_from_ddl) -> DictionarySourcePtr
{
@ -285,10 +285,10 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
/// Executable dictionaries may execute arbitrary commands.
/// It's OK for dictionaries created by administrator from xml-file, but
/// maybe dangerous for dictionaries created from DDL-queries.
if (created_from_ddl && context->getApplicationType() != Context::ApplicationType::LOCAL)
if (created_from_ddl && global_context->getApplicationType() != Context::ApplicationType::LOCAL)
throw Exception(ErrorCodes::DICTIONARY_ACCESS_DENIED, "Dictionaries with executable dictionary source are not allowed to be created from DDL query");
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
std::string settings_config_prefix = config_prefix + ".executable";
@ -301,7 +301,7 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false)
};
return std::make_unique<ExecutableDictionarySource>(dict_struct, configuration, sample_block, context_local_copy);
return std::make_unique<ExecutableDictionarySource>(dict_struct, configuration, sample_block, context);
};
factory.registerSource("executable", create_table_source);

View File

@ -279,7 +279,7 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & /* default_database */,
bool created_from_ddl) -> DictionarySourcePtr
{
@ -289,17 +289,15 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
/// Executable dictionaries may execute arbitrary commands.
/// It's OK for dictionaries created by administrator from xml-file, but
/// maybe dangerous for dictionaries created from DDL-queries.
if (created_from_ddl && context->getApplicationType() != Context::ApplicationType::LOCAL)
if (created_from_ddl && global_context->getApplicationType() != Context::ApplicationType::LOCAL)
throw Exception(ErrorCodes::DICTIONARY_ACCESS_DENIED, "Dictionaries with executable pool dictionary source are not allowed to be created from DDL query");
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
/** Currently parallel parsing input format cannot read exactly max_block_size rows from input,
* so it will be blocked on ReadBufferFromFileDescriptor because this file descriptor represent pipe that does not have eof.
*/
auto settings_no_parallel_parsing = context_local_copy->getSettings();
settings_no_parallel_parsing.input_format_parallel_parsing = false;
context_local_copy->setSettings(settings_no_parallel_parsing);
context->setSetting("input_format_parallel_parsing", Field{false});
String settings_config_prefix = config_prefix + ".executable_pool";
@ -319,7 +317,7 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false),
};
return std::make_unique<ExecutablePoolDictionarySource>(dict_struct, configuration, sample_block, context_local_copy);
return std::make_unique<ExecutablePoolDictionarySource>(dict_struct, configuration, sample_block, context);
};
factory.registerSource("executable_pool", create_table_source);

View File

@ -77,7 +77,7 @@ void registerDictionarySourceFile(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & /* default_database */,
bool created_from_ddl) -> DictionarySourcePtr
{
@ -87,9 +87,9 @@ void registerDictionarySourceFile(DictionarySourceFactory & factory)
const auto filepath = config.getString(config_prefix + ".file.path");
const auto format = config.getString(config_prefix + ".file.format");
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
const auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
return std::make_unique<FileDictionarySource>(filepath, format, sample_block, context_local_copy, created_from_ddl);
return std::make_unique<FileDictionarySource>(filepath, format, sample_block, context, created_from_ddl);
};
factory.registerSource("file", create_table_source);

View File

@ -557,7 +557,7 @@ void registerDictionaryFlat(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
ContextPtr /* global_context */,
bool /* created_from_ddl */) -> DictionaryPtr
{
if (dict_struct.key)

View File

@ -213,13 +213,13 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & /* default_database */,
bool created_from_ddl) -> DictionarySourcePtr {
if (dict_struct.has_expressions)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `http` does not support attribute expressions");
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto & settings_config_prefix = config_prefix + ".http";
const auto & credentials_prefix = settings_config_prefix + ".credentials";
@ -258,7 +258,7 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
.header_entries = std::move(header_entries)
};
return std::make_unique<HTTPDictionarySource>(dict_struct, configuration, credentials, sample_block, context_local_copy, created_from_ddl);
return std::make_unique<HTTPDictionarySource>(dict_struct, configuration, credentials, sample_block, context, created_from_ddl);
};
factory.registerSource("http", create_table_source);
}

View File

@ -756,13 +756,13 @@ void registerDictionaryHashed(DictionaryFactory & factory)
using namespace std::placeholders;
factory.registerLayout("hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple, /* sparse = */ false); }, false);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple, /* sparse = */ false); }, false);
factory.registerLayout("sparse_hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple, /* sparse = */ true); }, false);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple, /* sparse = */ true); }, false);
factory.registerLayout("complex_key_hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex, /* sparse = */ false); }, true);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex, /* sparse = */ false); }, true);
factory.registerLayout("complex_key_sparse_hashed",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex, /* sparse = */ true); }, true);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex, /* sparse = */ true); }, true);
}

View File

@ -954,7 +954,7 @@ void registerDictionaryTrie(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
ContextPtr /* global_context */,
bool /*created_from_ddl*/) -> DictionaryPtr
{
if (!dict_struct.key || dict_struct.key->size() != 1)

View File

@ -41,10 +41,13 @@ LibraryDictionarySource::LibraryDictionarySource(
, sample_block{sample_block_}
, context(Context::createCopy(context_))
{
if (fs::path(path).is_relative())
path = fs::canonical(path);
bool path_checked = false;
if (fs::is_symlink(path))
path_checked = symlinkStartsWith(path, context->getDictionariesLibPath());
else
path_checked = pathStartsWith(path, context->getDictionariesLibPath());
if (created_from_ddl && !pathStartsWith(path, context->getDictionariesLibPath()))
if (created_from_ddl && !path_checked)
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, context->getDictionariesLibPath());
if (!fs::exists(path))
@ -183,11 +186,11 @@ void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & /* default_database */,
bool created_from_ddl) -> DictionarySourcePtr
{
return std::make_unique<LibraryDictionarySource>(dict_struct, config, config_prefix + ".library", sample_block, context, created_from_ddl);
return std::make_unique<LibraryDictionarySource>(dict_struct, config, config_prefix + ".library", sample_block, global_context, created_from_ddl);
};
factory.registerSource("library", create_table_source);

View File

@ -31,11 +31,11 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
[[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
[[maybe_unused]] const std::string & config_prefix,
[[maybe_unused]] Block & sample_block,
[[maybe_unused]] ContextPtr context,
[[maybe_unused]] ContextPtr global_context,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr {
#if USE_MYSQL
StreamSettings mysql_input_stream_settings(context->getSettingsRef()
StreamSettings mysql_input_stream_settings(global_context->getSettingsRef()
, config.getBool(config_prefix + ".mysql.close_connection", false) || config.getBool(config_prefix + ".mysql.share_connection", false)
, false
, config.getBool(config_prefix + ".mysql.fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss);

View File

@ -167,7 +167,7 @@ DictionaryPtr createLayout(const std::string & ,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
ContextPtr /* global_context */,
bool /*created_from_ddl*/)
{
const String database = config.getString(config_prefix + ".database", "");

View File

@ -182,7 +182,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr
{
@ -190,8 +190,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
const auto settings_config_prefix = config_prefix + ".postgresql";
auto pool = std::make_shared<postgres::PoolWithFailover>(
config, settings_config_prefix,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
global_context->getSettingsRef().postgresql_connection_pool_size,
global_context->getSettingsRef().postgresql_connection_pool_wait_timeout);
PostgreSQLDictionarySource::Configuration configuration
{
@ -211,7 +211,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
(void)config;
(void)config_prefix;
(void)sample_block;
(void)context;
(void)global_context;
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `postgresql` is disabled because ClickHouse was built without postgresql support.");
#endif

View File

@ -688,7 +688,7 @@ void registerDictionaryRangeHashed(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
ContextPtr /* global_context */,
bool /*created_from_ddl*/) -> DictionaryPtr
{
if (dict_struct.key)

View File

@ -12,7 +12,7 @@ void registerDictionarySourceRedis(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
Block & sample_block,
ContextPtr /* context */,
ContextPtr /* global_context */,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr {
return std::make_unique<RedisDictionarySource>(dict_struct, config, config_prefix + ".redis", sample_block);

View File

@ -234,12 +234,12 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr context,
ContextPtr global_context,
const std::string & /* default_database */,
bool /* check_config */) -> DictionarySourcePtr {
#if USE_ODBC
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(
context, context->getSettings().http_receive_timeout, config.getString(config_prefix + ".odbc.connection_string"));
global_context, global_context->getSettings().http_receive_timeout, config.getString(config_prefix + ".odbc.connection_string"));
std::string settings_config_prefix = config_prefix + ".odbc";
@ -255,13 +255,13 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1)
};
return std::make_unique<XDBCDictionarySource>(dict_struct, configuration, sample_block, context, bridge);
return std::make_unique<XDBCDictionarySource>(dict_struct, configuration, sample_block, global_context, bridge);
#else
(void)dict_struct;
(void)config;
(void)config_prefix;
(void)sample_block;
(void)context;
(void)global_context;
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.");
#endif
@ -276,7 +276,7 @@ void registerDictionarySourceJDBC(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & /* config */,
const std::string & /* config_prefix */,
Block & /* sample_block */,
ContextPtr /* context */,
ContextPtr /* global_context */,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr {
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,

View File

@ -154,7 +154,7 @@ DictionaryPtr createCacheDictionaryLayout(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr context [[maybe_unused]],
ContextPtr global_context [[maybe_unused]],
bool created_from_ddl [[maybe_unused]])
{
String layout_type;
@ -213,8 +213,8 @@ DictionaryPtr createCacheDictionaryLayout(
else
{
auto storage_configuration = parseSSDCacheStorageConfiguration(config, full_name, layout_type, dictionary_layout_prefix, dict_lifetime);
if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, context->getUserFilesPath()))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, context->getUserFilesPath());
if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, global_context->getUserFilesPath()))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, global_context->getUserFilesPath());
storage = std::make_shared<SSDCacheDictionaryStorage<dictionary_key_type>>(storage_configuration);
}
@ -239,10 +239,10 @@ void registerDictionaryCache(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr context,
ContextPtr global_context,
bool created_from_ddl) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::Simple, false/* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
return createCacheDictionaryLayout<DictionaryKeyType::Simple, false/* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), global_context, created_from_ddl);
};
factory.registerLayout("cache", create_simple_cache_layout, false);
@ -252,10 +252,10 @@ void registerDictionaryCache(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr context,
ContextPtr global_context,
bool created_from_ddl) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::Complex, false /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
return createCacheDictionaryLayout<DictionaryKeyType::Complex, false /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), global_context, created_from_ddl);
};
factory.registerLayout("complex_key_cache", create_complex_key_cache_layout, true);
@ -267,10 +267,10 @@ void registerDictionaryCache(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr context,
ContextPtr global_context,
bool created_from_ddl) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::Simple, true /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
return createCacheDictionaryLayout<DictionaryKeyType::Simple, true /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), global_context, created_from_ddl);
};
factory.registerLayout("ssd_cache", create_simple_ssd_cache_layout, false);
@ -280,9 +280,9 @@ void registerDictionaryCache(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr context,
ContextPtr global_context,
bool created_from_ddl) -> DictionaryPtr {
return createCacheDictionaryLayout<DictionaryKeyType::Complex, true /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
return createCacheDictionaryLayout<DictionaryKeyType::Complex, true /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), global_context, created_from_ddl);
};
factory.registerLayout("complex_key_ssd_cache", create_complex_key_ssd_cache_layout, true);

View File

@ -0,0 +1,207 @@
#pragma once
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <common/arithmeticOverflow.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
/** According to Twitter's post on Snowflake, we can extract the timestamp for a snowflake ID by right shifting
* the snowflake ID by 22 bits(10 bits machine ID and 12 bits sequence ID) and adding the Twitter epoch time of 1288834974657.
* https://en.wikipedia.org/wiki/Snowflake_ID
* https://blog.twitter.com/engineering/en_us/a/2010/announcing-snowflake
* https://ws-dl.blogspot.com/2019/08/2019-08-03-tweetedat-finding-tweet.html
*/
static constexpr long snowflake_epoch = 1288834974657L;
static constexpr int time_shift = 22;
class FunctionDateTimeToSnowflake : public IFunction
{
private:
const char * name;
public:
FunctionDateTimeToSnowflake(const char * name_) : name(name_) { }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool isVariadic() const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (!isDateTime(arguments[0].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The only argument for function {} must be DateTime", name);
return std::make_shared<DataTypeInt64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto & src = arguments[0];
const auto & col = *src.column;
auto res_column = ColumnInt64::create(input_rows_count);
auto & result_data = res_column->getData();
const auto & source_data = typeid_cast<const ColumnUInt32 &>(col).getData();
for (size_t i = 0; i < input_rows_count; ++i)
{
result_data[i] = (Int64(source_data[i]) * 1000 - snowflake_epoch) << time_shift;
}
return res_column;
}
};
class FunctionSnowflakeToDateTime : public IFunction
{
private:
const char * name;
public:
FunctionSnowflakeToDateTime(const char * name_) : name(name_) { }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() < 1 || arguments.size() > 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes one or two arguments", name);
if (!typeid_cast<const DataTypeInt64 *>(arguments[0].type.get()))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The first argument for function {} must be Int64", name);
std::string timezone;
if (arguments.size() == 2)
timezone = extractTimeZoneNameFromFunctionArguments(arguments, 1, 0);
return std::make_shared<DataTypeDateTime>(timezone);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto & src = arguments[0];
const auto & col = *src.column;
auto res_column = ColumnUInt32::create(input_rows_count);
auto & result_data = res_column->getData();
const auto & source_data = typeid_cast<const ColumnInt64 &>(col).getData();
for (size_t i = 0; i < input_rows_count; ++i)
{
result_data[i] = ((source_data[i] >> time_shift) + snowflake_epoch) / 1000;
}
return res_column;
}
};
class FunctionDateTime64ToSnowflake : public IFunction
{
private:
const char * name;
public:
FunctionDateTime64ToSnowflake(const char * name_) : name(name_) { }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool isVariadic() const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (!isDateTime64(arguments[0].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The only argument for function {} must be DateTime64", name);
return std::make_shared<DataTypeInt64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto & src = arguments[0];
const auto & col = *src.column;
auto res_column = ColumnInt64::create(input_rows_count);
auto & result_data = res_column->getData();
const auto & source_data = typeid_cast<const ColumnDecimal<DateTime64> &>(col).getData();
for (size_t i = 0; i < input_rows_count; ++i)
{
result_data[i] = (source_data[i] - snowflake_epoch) << time_shift;
}
return res_column;
}
};
class FunctionSnowflakeToDateTime64 : public IFunction
{
private:
const char * name;
public:
FunctionSnowflakeToDateTime64(const char * name_) : name(name_) { }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() < 1 || arguments.size() > 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes one or two arguments", name);
if (!typeid_cast<const DataTypeInt64 *>(arguments[0].type.get()))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The first argument for function {} must be Int64", name);
std::string timezone;
if (arguments.size() == 2)
timezone = extractTimeZoneNameFromFunctionArguments(arguments, 1, 0);
return std::make_shared<DataTypeDateTime64>(3, timezone);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto & src = arguments[0];
const auto & col = *src.column;
auto res_column = ColumnDecimal<DateTime64>::create(input_rows_count, 3);
auto & result_data = res_column->getData();
const auto & source_data = typeid_cast<const ColumnInt64 &>(col).getData();
for (size_t i = 0; i < input_rows_count; ++i)
{
result_data[i] = (source_data[i] >> time_shift) + snowflake_epoch;
}
return res_column;
}
};
}

View File

@ -51,6 +51,7 @@ void registerFunctionBitHammingDistance(FunctionFactory & factory);
void registerFunctionTupleHammingDistance(FunctionFactory & factory);
void registerFunctionsStringHash(FunctionFactory & factory);
void registerFunctionValidateNestedArraySizes(FunctionFactory & factory);
void registerFunctionsSnowflake(FunctionFactory & factory);
#if !defined(ARCADIA_BUILD)
void registerFunctionBayesAB(FunctionFactory &);
#endif
@ -115,6 +116,7 @@ void registerFunctions()
registerFunctionTupleHammingDistance(factory);
registerFunctionsStringHash(factory);
registerFunctionValidateNestedArraySizes(factory);
registerFunctionsSnowflake(factory);
#if !defined(ARCADIA_BUILD)
registerFunctionBayesAB(factory);

View File

@ -0,0 +1,22 @@
namespace DB
{
class FunctionFactory;
void registerDateTimeToSnowflake(FunctionFactory &);
void registerSnowflakeToDateTime(FunctionFactory &);
void registerDateTime64ToSnowflake(FunctionFactory &);
void registerSnowflakeToDateTime64(FunctionFactory &);
void registerFunctionsSnowflake(FunctionFactory & factory)
{
registerDateTimeToSnowflake(factory);
registerSnowflakeToDateTime(factory);
registerDateTime64ToSnowflake(factory);
registerSnowflakeToDateTime64(factory);
}
}

View File

@ -0,0 +1,34 @@
#include <Functions/FunctionSnowflake.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
void registerDateTimeToSnowflake(FunctionFactory & factory)
{
factory.registerFunction("dateTimeToSnowflake",
[](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionDateTimeToSnowflake>("dateTimeToSnowflake")); });
}
void registerDateTime64ToSnowflake(FunctionFactory & factory)
{
factory.registerFunction("dateTime64ToSnowflake",
[](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionDateTime64ToSnowflake>("dateTime64ToSnowflake")); });
}
void registerSnowflakeToDateTime(FunctionFactory & factory)
{
factory.registerFunction("snowflakeToDateTime",
[](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionSnowflakeToDateTime>("snowflakeToDateTime")); });
}
void registerSnowflakeToDateTime64(FunctionFactory & factory)
{
factory.registerFunction("snowflakeToDateTime64",
[](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionSnowflakeToDateTime64>("snowflakeToDateTime64")); });
}
}

View File

@ -59,6 +59,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/Session.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
#include <IO/MMappedFileCache.h>
@ -273,6 +274,8 @@ struct ContextSharedPart
return;
shutdown_called = true;
Session::shutdownNamedSessions();
/** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
* Note that part changes at shutdown won't be logged to part log.
*/
@ -589,27 +592,6 @@ ConfigurationPtr Context::getUsersConfig()
}
void Context::authenticate(const String & name, const String & password, const Poco::Net::SocketAddress & address)
{
authenticate(BasicCredentials(name, password), address);
}
void Context::authenticate(const Credentials & credentials, const Poco::Net::SocketAddress & address)
{
auto authenticated_user_id = getAccessControlManager().login(credentials, address.host());
client_info.current_user = credentials.getUserName();
client_info.current_address = address;
#if defined(ARCADIA_BUILD)
/// This is harmful field that is used only in foreign "Arcadia" build.
if (const auto * basic_credentials = dynamic_cast<const BasicCredentials *>(&credentials))
client_info.current_password = basic_credentials->getPassword();
#endif
setUser(authenticated_user_id);
}
void Context::setUser(const UUID & user_id_)
{
auto lock = getLock();

View File

@ -362,13 +362,9 @@ public:
void setUsersConfig(const ConfigurationPtr & config);
ConfigurationPtr getUsersConfig();
/// Sets the current user, checks the credentials and that the specified address is allowed to connect from.
/// The function throws an exception if there is no such user or password is wrong.
void authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address);
void authenticate(const Credentials & credentials, const Poco::Net::SocketAddress & address);
/// Sets the current user assuming that he/she is already authenticated.
/// WARNING: This function doesn't check password! Don't use until it's necessary!
/// WARNING: This function doesn't check password!
/// Normally you shouldn't call this function. Use the Session class to do authentication instead.
void setUser(const UUID & user_id_);
UserPtr getUser() const;

View File

@ -45,12 +45,20 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::getDictionary(const std::string & dictionary_name, ContextPtr local_context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, local_context->getCurrentDatabase());
if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries)
local_context->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, resolved_dictionary_name);
return std::static_pointer_cast<const IDictionary>(load(resolved_dictionary_name));
}
ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::tryGetDictionary(const std::string & dictionary_name, ContextPtr local_context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, local_context->getCurrentDatabase());
if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries)
local_context->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, resolved_dictionary_name);
return std::static_pointer_cast<const IDictionary>(tryLoad(resolved_dictionary_name));
}

View File

@ -21,7 +21,6 @@
#include <Storages/StorageDictionary.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Core/ColumnNumbers.h>
@ -194,6 +193,13 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
required_right_keys = table_join->getRequiredRightKeys(right_table_keys, required_right_keys_sources);
LOG_DEBUG(log, "Right keys: [{}] (required: [{}]), left keys: [{}]",
fmt::join(key_names_right, ", "),
fmt::join(required_right_keys.getNames(), ", "),
fmt::join(table_join->keyNamesLeft(), ", "));
LOG_DEBUG(log, "Columns to add: [{}]", sample_block_with_columns_to_add.dumpStructure());
std::tie(condition_mask_column_name_left, condition_mask_column_name_right) = table_join->joinConditionColumnNames();
JoinCommon::removeLowCardinalityInplace(right_table_keys);
@ -629,7 +635,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
ConstNullMapPtr null_map{};
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);
/// If RIGHT or FULL save blocks with nulls for NonJoinedBlockInputStream
/// If RIGHT or FULL save blocks with nulls for NotJoinedBlocks
UInt8 save_nullmap = 0;
if (isRightOrFull(kind) && null_map)
{
@ -1468,40 +1474,17 @@ struct AdderNonJoined
/// Stream from not joined earlier rows of the right table.
class NonJoinedBlockInputStream : private NotJoined, public IBlockInputStream
class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
{
public:
NonJoinedBlockInputStream(const HashJoin & parent_, const Block & result_sample_block_, UInt64 max_block_size_)
: NotJoined(*parent_.table_join,
parent_.savedBlockSample(),
parent_.right_sample_block,
result_sample_block_)
, parent(parent_)
, max_block_size(max_block_size_)
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
: parent(parent_), max_block_size(max_block_size_)
{}
String getName() const override { return "NonJoined"; }
Block getHeader() const override { return result_sample_block; }
Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); }
protected:
Block readImpl() override
size_t fillColumns(MutableColumns & columns_right) override
{
if (parent.data->blocks.empty())
return Block();
return createBlock();
}
private:
const HashJoin & parent;
UInt64 max_block_size;
std::any position;
std::optional<HashJoin::BlockNullmapList::const_iterator> nulls_position;
Block createBlock()
{
MutableColumns columns_right = saved_block_sample.cloneEmptyColumns();
size_t rows_added = 0;
auto fill_callback = [&](auto, auto strictness, auto & map)
@ -1513,22 +1496,16 @@ private:
throw Exception("Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)", ErrorCodes::LOGICAL_ERROR);
fillNullsFromBlocks(columns_right, rows_added);
if (!rows_added)
return {};
Block res = result_sample_block.cloneEmpty();
addLeftColumns(res, rows_added);
addRightColumns(res, columns_right);
copySameKeys(res);
correctLowcardAndNullability(res);
#ifndef NDEBUG
assertBlocksHaveEqualStructure(res, result_sample_block, getName());
#endif
return res;
return rows_added;
}
private:
const HashJoin & parent;
UInt64 max_block_size;
std::any position;
std::optional<HashJoin::BlockNullmapList::const_iterator> nulls_position;
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
size_t fillColumnsFromMap(const Maps & maps, MutableColumns & columns_keys_and_right)
{
@ -1607,15 +1584,18 @@ private:
};
BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
std::shared_ptr<NotJoinedBlocks> HashJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const
{
if (table_join->strictness() == ASTTableJoin::Strictness::Asof ||
table_join->strictness() == ASTTableJoin::Strictness::Semi)
table_join->strictness() == ASTTableJoin::Strictness::Semi ||
!isRightOrFull(table_join->kind()))
{
return {};
}
if (isRightOrFull(table_join->kind()))
return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, max_block_size);
return {};
size_t left_columns_count = result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns();
auto non_joined = std::make_unique<NotJoinedHash>(*this, max_block_size);
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
void HashJoin::reuseJoinedData(const HashJoin & join)

View File

@ -20,7 +20,6 @@
#include <Columns/ColumnFixedString.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Core/Block.h>
@ -164,7 +163,7 @@ public:
* Use only after all calls to joinBlock was done.
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
*/
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override;
/// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const final;
@ -337,7 +336,7 @@ public:
bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
private:
friend class NonJoinedBlockInputStream;
friend class NotJoinedHash;
friend class JoinSource;
std::shared_ptr<TableJoin> table_join;

View File

@ -5,7 +5,6 @@
#include <Core/Names.h>
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB
{
@ -15,6 +14,7 @@ struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
class TableJoin;
class NotJoinedBlocks;
class IJoin
{
@ -43,7 +43,7 @@ public:
/// Different query plan is used for such joins.
virtual bool isFilled() const { return false; }
virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; }
virtual std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block &, UInt64) const = 0;
};
using JoinPtr = std::shared_ptr<IJoin>;

View File

@ -56,9 +56,9 @@ public:
return join->alwaysReturnsEmptySet();
}
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & block, UInt64 max_block_size) const override
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & block, UInt64 max_block_size) const override
{
return join->createStreamWithNonJoinedRows(block, max_block_size);
return join->getNonJoinedBlocks(block, max_block_size);
}
private:
@ -74,38 +74,4 @@ private:
void switchJoin();
};
/// Creates NonJoinedBlockInputStream on the first read. Allows to swap join algo before it.
class LazyNonJoinedBlockInputStream : public IBlockInputStream
{
public:
LazyNonJoinedBlockInputStream(const IJoin & join_, const Block & block, UInt64 max_block_size_)
: join(join_)
, result_sample_block(block)
, max_block_size(max_block_size_)
{}
String getName() const override { return "LazyNonMergeJoined"; }
Block getHeader() const override { return result_sample_block; }
protected:
Block readImpl() override
{
if (!stream)
{
stream = join.createStreamWithNonJoinedRows(result_sample_block, max_block_size);
if (!stream)
return {};
}
return stream->read();
}
private:
BlockInputStreamPtr stream;
const IJoin & join;
Block result_sample_block;
UInt64 max_block_size;
};
}

View File

@ -1,7 +1,8 @@
#include <limits>
#include <Columns/ColumnNullable.h>
#include <Core/NamesAndTypes.h>
#include <Columns/ColumnLowCardinality.h>
#include <Core/SortCursor.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/materializeBlock.h>
@ -723,15 +724,7 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
if (needConditionJoinColumn())
block.erase(deriveTempName(mask_column_name_left));
for (const auto & column_name : lowcard_keys)
{
if (!block.has(column_name))
continue;
if (auto & col = block.getByName(column_name); !col.type->lowCardinality())
JoinCommon::changeLowCardinalityInplace(col);
}
JoinCommon::restoreLowCardinalityInplace(block);
JoinCommon::restoreLowCardinalityInplace(block, lowcard_keys);
}
template <bool in_memory, bool is_all>
@ -1035,55 +1028,16 @@ void MergeJoin::initRightTableWriter()
}
/// Stream from not joined earlier rows of the right table.
class NonMergeJoinedBlockInputStream : private NotJoined, public IBlockInputStream
class NotJoinedMerge final : public NotJoinedBlocks::RightColumnsFiller
{
public:
NonMergeJoinedBlockInputStream(const MergeJoin & parent_,
const Block & result_sample_block_,
const Names & key_names_right_,
UInt64 max_block_size_)
: NotJoined(*parent_.table_join,
parent_.modifyRightBlock(parent_.right_sample_block),
parent_.right_sample_block,
result_sample_block_,
{}, key_names_right_)
, parent(parent_)
, max_block_size(max_block_size_)
NotJoinedMerge(const MergeJoin & parent_, UInt64 max_block_size_)
: parent(parent_), max_block_size(max_block_size_)
{}
String getName() const override { return "NonMergeJoined"; }
Block getHeader() const override { return result_sample_block; }
Block getEmptyBlock() override { return parent.modifyRightBlock(parent.right_sample_block).cloneEmpty(); }
protected:
Block readImpl() override
{
if (parent.getRightBlocksCount())
return createBlock();
return {};
}
private:
const MergeJoin & parent;
size_t max_block_size;
size_t block_number = 0;
Block createBlock()
{
MutableColumns columns_right = saved_block_sample.cloneEmptyColumns();
size_t rows_added = fillColumns(columns_right);
if (!rows_added)
return {};
Block res = result_sample_block.cloneEmpty();
addLeftColumns(res, rows_added);
addRightColumns(res, columns_right);
copySameKeys(res);
correctLowcardAndNullability(res);
return res;
}
size_t fillColumns(MutableColumns & columns_right)
size_t fillColumns(MutableColumns & columns_right) override
{
const RowBitmaps & bitmaps = *parent.used_rows_bitmap;
size_t rows_added = 0;
@ -1127,14 +1081,23 @@ private:
return rows_added;
}
private:
const MergeJoin & parent;
size_t max_block_size;
size_t block_number = 0;
};
BlockInputStreamPtr MergeJoin::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
std::shared_ptr<NotJoinedBlocks> MergeJoin::getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const
{
if (table_join->strictness() == ASTTableJoin::Strictness::All && (is_right || is_full))
return std::make_shared<NonMergeJoinedBlockInputStream>(*this, result_sample_block, key_names_right, max_block_size);
return {};
{
size_t left_columns_count = result_sample_block.columns() - right_columns_to_add.columns();
auto non_joined = std::make_unique<NotJoinedMerge>(*this, max_block_size);
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
return nullptr;
}
bool MergeJoin::needConditionJoinColumn() const

View File

@ -35,10 +35,10 @@ public:
/// Has to be called only after setTotals()/mergeRightBlocks()
bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); }
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & result_sample_block, UInt64 max_block_size) const override;
private:
friend class NonMergeJoinedBlockInputStream;
friend class NotJoinedMerge;
struct NotProcessed : public ExtraBlock
{
@ -78,6 +78,7 @@ private:
SortDescription right_merge_description;
Block right_sample_block;
Block right_table_keys;
/// Columns from right side of join, both key and additional
Block right_columns_to_add;
SortedBlocksWriter::Blocks right_blocks;

View File

@ -54,17 +54,17 @@ class NamedSessionsStorage
public:
using Key = NamedSessionKey;
static NamedSessionsStorage & instance()
{
static NamedSessionsStorage the_instance;
return the_instance;
}
~NamedSessionsStorage()
{
try
{
{
std::lock_guard lock{mutex};
quit = true;
}
cond.notify_one();
thread.join();
shutdown();
}
catch (...)
{
@ -72,6 +72,20 @@ public:
}
}
void shutdown()
{
{
std::lock_guard lock{mutex};
sessions.clear();
if (!thread.joinable())
return;
quit = true;
}
cond.notify_one();
thread.join();
}
/// Find existing session or create a new.
std::pair<std::shared_ptr<NamedSessionData>, bool> acquireSession(
const ContextPtr & global_context,
@ -94,6 +108,10 @@ public:
auto context = Context::createCopy(global_context);
it = sessions.insert(std::make_pair(key, std::make_shared<NamedSessionData>(key, context, timeout, *this))).first;
const auto & session = it->second;
if (!thread.joinable())
thread = ThreadFromGlobalPool{&NamedSessionsStorage::cleanThread, this};
return {session, true};
}
else
@ -156,11 +174,9 @@ private:
{
setThreadName("SessionCleaner");
std::unique_lock lock{mutex};
while (true)
while (!quit)
{
auto interval = closeSessions(lock);
if (cond.wait_for(lock, interval, [this]() -> bool { return quit; }))
break;
}
@ -208,8 +224,8 @@ private:
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> quit{false};
ThreadFromGlobalPool thread{&NamedSessionsStorage::cleanThread, this};
ThreadFromGlobalPool thread;
bool quit = false;
};
@ -218,13 +234,12 @@ void NamedSessionData::release()
parent.releaseSession(*this);
}
std::optional<NamedSessionsStorage> Session::named_sessions = std::nullopt;
void Session::startupNamedSessions()
void Session::shutdownNamedSessions()
{
named_sessions.emplace();
NamedSessionsStorage::instance().shutdown();
}
Session::Session(const ContextPtr & global_context_, ClientInfo::Interface interface_)
: global_context(global_context_)
{
@ -317,15 +332,13 @@ ContextMutablePtr Session::makeSessionContext(const String & session_id_, std::c
throw Exception("Session context already exists", ErrorCodes::LOGICAL_ERROR);
if (query_context_created)
throw Exception("Session context must be created before any query context", ErrorCodes::LOGICAL_ERROR);
if (!named_sessions)
throw Exception("Support for named sessions is not enabled", ErrorCodes::LOGICAL_ERROR);
/// Make a new session context OR
/// if the `session_id` and `user_id` were used before then just get a previously created session context.
std::shared_ptr<NamedSessionData> new_named_session;
bool new_named_session_created = false;
std::tie(new_named_session, new_named_session_created)
= named_sessions->acquireSession(global_context, user_id.value_or(UUID{}), session_id_, timeout_, session_check_);
= NamedSessionsStorage::instance().acquireSession(global_context, user_id.value_or(UUID{}), session_id_, timeout_, session_check_);
auto new_session_context = new_named_session->context;
new_session_context->makeSessionContext();

View File

@ -28,9 +28,8 @@ using UserPtr = std::shared_ptr<const User>;
class Session
{
public:
/// Allow to use named sessions. The thread will be run to cleanup sessions after timeout has expired.
/// The method must be called at the server startup.
static void startupNamedSessions();
/// Stops using named sessions. The method must be called at the server shutdown.
static void shutdownNamedSessions();
Session(const ContextPtr & global_context_, ClientInfo::Interface interface_);
Session(Session &&);
@ -83,8 +82,6 @@ private:
String session_id;
std::shared_ptr<NamedSessionData> named_session;
bool named_session_created = false;
static std::optional<NamedSessionsStorage> named_sessions;
};
}

View File

@ -454,6 +454,24 @@ void TableJoin::addJoinCondition(const ASTPtr & ast, bool is_left)
on_filter_condition_asts_right.push_back(ast);
}
std::unordered_map<String, String> TableJoin::leftToRightKeyRemap() const
{
std::unordered_map<String, String> left_to_right_key_remap;
if (hasUsing())
{
const auto & required_right_keys = requiredRightKeys();
for (size_t i = 0; i < key_names_left.size(); ++i)
{
const String & left_key_name = key_names_left[i];
const String & right_key_name = key_names_right[i];
if (!required_right_keys.contains(right_key_name))
left_to_right_key_remap[left_key_name] = right_key_name;
}
}
return left_to_right_key_remap;
}
/// Returns all conditions related to one table joined with 'and' function
static ASTPtr buildJoinConditionColumn(const ASTs & on_filter_condition_asts)
{

View File

@ -229,6 +229,7 @@ public:
Block getRequiredRightKeys(const Block & right_table_keys, std::vector<String> & keys_sources) const;
String renamedRightColumnName(const String & name) const;
std::unordered_map<String, String> leftToRightKeyRemap() const;
};
}

View File

@ -314,8 +314,16 @@ void removeLowCardinalityInplace(Block & block, const Names & names, bool change
}
}
void restoreLowCardinalityInplace(Block & block)
void restoreLowCardinalityInplace(Block & block, const Names & lowcard_keys)
{
for (const auto & column_name : lowcard_keys)
{
if (!block.has(column_name))
continue;
if (auto & col = block.getByName(column_name); !col.type->lowCardinality())
JoinCommon::changeLowCardinalityInplace(col);
}
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i);
@ -484,49 +492,23 @@ void splitAdditionalColumns(const Names & key_names, const Block & sample_block,
}
NotJoined::NotJoined(const TableJoin & table_join, const Block & saved_block_sample_, const Block & right_sample_block,
const Block & result_sample_block_, const Names & key_names_left_, const Names & key_names_right_)
: saved_block_sample(saved_block_sample_)
NotJoinedBlocks::NotJoinedBlocks(std::unique_ptr<RightColumnsFiller> filler_,
const Block & result_sample_block_,
size_t left_columns_count,
const LeftToRightKeyRemap & left_to_right_key_remap)
: filler(std::move(filler_))
, saved_block_sample(filler->getEmptyBlock())
, result_sample_block(materializeBlock(result_sample_block_))
, key_names_left(key_names_left_.empty() ? table_join.keyNamesLeft() : key_names_left_)
, key_names_right(key_names_right_.empty() ? table_join.keyNamesRight() : key_names_right_)
{
std::vector<String> tmp;
Block right_table_keys;
Block sample_block_with_columns_to_add;
JoinCommon::splitAdditionalColumns(key_names_right, right_sample_block, right_table_keys,
sample_block_with_columns_to_add);
Block required_right_keys = table_join.getRequiredRightKeys(right_table_keys, tmp);
std::unordered_map<size_t, size_t> left_to_right_key_remap;
if (table_join.hasUsing())
{
for (size_t i = 0; i < key_names_left.size(); ++i)
{
const String & left_key_name = key_names_left[i];
const String & right_key_name = key_names_right[i];
size_t left_key_pos = result_sample_block.getPositionByName(left_key_name);
size_t right_key_pos = saved_block_sample.getPositionByName(right_key_name);
if (!required_right_keys.has(right_key_name))
left_to_right_key_remap[left_key_pos] = right_key_pos;
}
}
/// result_sample_block: left_sample_block + left expressions, right not key columns, required right keys
size_t left_columns_count = result_sample_block.columns() -
sample_block_with_columns_to_add.columns() - required_right_keys.columns();
for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos)
{
/// We need right 'x' for 'RIGHT JOIN ... USING(x)'.
if (left_to_right_key_remap.count(left_pos))
/// We need right 'x' for 'RIGHT JOIN ... USING(x)'
auto left_name = result_sample_block.getByPosition(left_pos).name;
const auto & right_key = left_to_right_key_remap.find(left_name);
if (right_key != left_to_right_key_remap.end())
{
size_t right_key_pos = left_to_right_key_remap[left_pos];
size_t right_key_pos = saved_block_sample.getPositionByName(right_key->second);
setRightIndex(right_key_pos, left_pos);
}
else
@ -556,9 +538,9 @@ NotJoined::NotJoined(const TableJoin & table_join, const Block & saved_block_sam
ErrorCodes::LOGICAL_ERROR);
}
void NotJoined::setRightIndex(size_t right_pos, size_t result_position)
void NotJoinedBlocks::setRightIndex(size_t right_pos, size_t result_position)
{
if (!column_indices_right.count(right_pos))
if (!column_indices_right.contains(right_pos))
{
column_indices_right[right_pos] = result_position;
extractColumnChanges(right_pos, result_position);
@ -567,7 +549,7 @@ void NotJoined::setRightIndex(size_t right_pos, size_t result_position)
same_result_keys[result_position] = column_indices_right[right_pos];
}
void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos)
void NotJoinedBlocks::extractColumnChanges(size_t right_pos, size_t result_pos)
{
auto src_props = getLowcardAndNullability(saved_block_sample.getByPosition(right_pos).column);
auto dst_props = getLowcardAndNullability(result_sample_block.getByPosition(result_pos).column);
@ -579,7 +561,7 @@ void NotJoined::extractColumnChanges(size_t right_pos, size_t result_pos)
right_lowcard_changes.push_back({result_pos, dst_props.is_lowcard});
}
void NotJoined::correctLowcardAndNullability(Block & block)
void NotJoinedBlocks::correctLowcardAndNullability(Block & block)
{
for (auto & [pos, added] : right_nullability_changes)
{
@ -607,7 +589,7 @@ void NotJoined::correctLowcardAndNullability(Block & block)
}
}
void NotJoined::addLeftColumns(Block & block, size_t rows_added) const
void NotJoinedBlocks::addLeftColumns(Block & block, size_t rows_added) const
{
for (size_t pos : column_indices_left)
{
@ -619,7 +601,7 @@ void NotJoined::addLeftColumns(Block & block, size_t rows_added) const
}
}
void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) const
void NotJoinedBlocks::addRightColumns(Block & block, MutableColumns & columns_right) const
{
for (const auto & pr : column_indices_right)
{
@ -629,7 +611,7 @@ void NotJoined::addRightColumns(Block & block, MutableColumns & columns_right) c
}
}
void NotJoined::copySameKeys(Block & block) const
void NotJoinedBlocks::copySameKeys(Block & block) const
{
for (const auto & pr : same_result_keys)
{
@ -639,4 +621,26 @@ void NotJoined::copySameKeys(Block & block) const
}
}
Block NotJoinedBlocks::read()
{
Block result_block = result_sample_block.cloneEmpty();
{
Block right_block = filler->getEmptyBlock();
MutableColumns columns_right = right_block.cloneEmptyColumns();
size_t rows_added = filler->fillColumns(columns_right);
if (rows_added == 0)
return {};
addLeftColumns(result_block, rows_added);
addRightColumns(result_block, columns_right);
}
copySameKeys(result_block);
correctLowcardAndNullability(result_block);
#ifndef NDEBUG
assertBlocksHaveEqualStructure(result_block, result_sample_block, "NotJoinedBlocks");
#endif
return result_block;
}
}

View File

@ -30,7 +30,7 @@ ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names);
ColumnRawPtrs getRawPointers(const Columns & columns);
void removeLowCardinalityInplace(Block & block);
void removeLowCardinalityInplace(Block & block, const Names & names, bool change_type = true);
void restoreLowCardinalityInplace(Block & block);
void restoreLowCardinalityInplace(Block & block, const Names & lowcard_keys);
ColumnRawPtrs extractKeysForJoin(const Block & block_keys, const Names & key_names_right);
@ -64,40 +64,58 @@ void changeLowCardinalityInplace(ColumnWithTypeAndName & column);
}
/// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table.
class NotJoined
class NotJoinedBlocks final
{
public:
NotJoined(const TableJoin & table_join, const Block & saved_block_sample_, const Block & right_sample_block,
const Block & result_sample_block_, const Names & key_names_left_ = {}, const Names & key_names_right_ = {});
using LeftToRightKeyRemap = std::unordered_map<String, String>;
/// Returns non joined columns from right part of join
class RightColumnsFiller
{
public:
/// Create empty block for right part
virtual Block getEmptyBlock() = 0;
/// Fill columns from right part of join with not joined rows
virtual size_t fillColumns(MutableColumns & columns_right) = 0;
virtual ~RightColumnsFiller() = default;
};
NotJoinedBlocks(std::unique_ptr<RightColumnsFiller> filler_,
const Block & result_sample_block_,
size_t left_columns_count,
const LeftToRightKeyRemap & left_to_right_key_remap);
Block read();
private:
void extractColumnChanges(size_t right_pos, size_t result_pos);
void correctLowcardAndNullability(Block & block);
void addLeftColumns(Block & block, size_t rows_added) const;
void addRightColumns(Block & block, MutableColumns & columns_right) const;
void copySameKeys(Block & block) const;
protected:
std::unique_ptr<RightColumnsFiller> filler;
/// Right block saved in Join
Block saved_block_sample;
/// Output of join
Block result_sample_block;
Names key_names_left;
Names key_names_right;
~NotJoined() = default;
private:
/// Indices of columns in result_sample_block that should be generated
std::vector<size_t> column_indices_left;
/// Indices of columns that come from the right-side table: right_pos -> result_pos
std::unordered_map<size_t, size_t> column_indices_right;
///
std::unordered_map<size_t, size_t> same_result_keys;
/// Which right columns (saved in parent) need nullability change before placing them in result block
/// Which right columns (saved in parent) need Nullability/LowCardinality change
/// before placing them in result block
std::vector<std::pair<size_t, bool>> right_nullability_changes;
/// Which right columns (saved in parent) need LowCardinality change before placing them in result block
std::vector<std::pair<size_t, bool>> right_lowcard_changes;
void setRightIndex(size_t right_pos, size_t result_position);
void extractColumnChanges(size_t right_pos, size_t result_pos);
};
}

View File

@ -850,15 +850,24 @@ static bool isOneOf(TokenType token)
return ((token == tokens) || ...);
}
bool ParserCastOperator::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
/// Parse numbers (including decimals), strings and arrays of them.
/// Parse numbers (including decimals), strings, arrays and tuples of them.
const char * data_begin = pos->begin;
const char * data_end = pos->end;
bool is_string_literal = pos->type == TokenType::StringLiteral;
if (pos->type == TokenType::Number || is_string_literal)
if (pos->type == TokenType::Minus)
{
++pos;
if (pos->type != TokenType::Number)
return false;
data_end = pos->end;
++pos;
}
else if (pos->type == TokenType::Number || is_string_literal)
{
++pos;
}
@ -876,7 +885,7 @@ bool ParserCastOperator::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
}
else if (pos->type == TokenType::ClosingSquareBracket)
{
if (isOneOf<TokenType::Comma, TokenType::OpeningRoundBracket>(last_token))
if (isOneOf<TokenType::Comma, TokenType::OpeningRoundBracket, TokenType::Minus>(last_token))
return false;
if (stack.empty() || stack.back() != TokenType::OpeningSquareBracket)
return false;
@ -884,7 +893,7 @@ bool ParserCastOperator::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
}
else if (pos->type == TokenType::ClosingRoundBracket)
{
if (isOneOf<TokenType::Comma, TokenType::OpeningSquareBracket>(last_token))
if (isOneOf<TokenType::Comma, TokenType::OpeningSquareBracket, TokenType::Minus>(last_token))
return false;
if (stack.empty() || stack.back() != TokenType::OpeningRoundBracket)
return false;
@ -892,10 +901,15 @@ bool ParserCastOperator::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
}
else if (pos->type == TokenType::Comma)
{
if (isOneOf<TokenType::OpeningSquareBracket, TokenType::OpeningRoundBracket, TokenType::Comma>(last_token))
if (isOneOf<TokenType::OpeningSquareBracket, TokenType::OpeningRoundBracket, TokenType::Comma, TokenType::Minus>(last_token))
return false;
}
else if (isOneOf<TokenType::Number, TokenType::StringLiteral>(pos->type))
else if (pos->type == TokenType::Number)
{
if (!isOneOf<TokenType::OpeningSquareBracket, TokenType::OpeningRoundBracket, TokenType::Comma, TokenType::Minus>(last_token))
return false;
}
else if (isOneOf<TokenType::StringLiteral, TokenType::Minus>(pos->type))
{
if (!isOneOf<TokenType::OpeningSquareBracket, TokenType::OpeningRoundBracket, TokenType::Comma>(last_token))
return false;
@ -915,6 +929,8 @@ bool ParserCastOperator::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
if (!stack.empty())
return false;
}
else
return false;
ASTPtr type_ast;
if (ParserToken(TokenType::DoubleColon).ignore(pos, expected)

View File

@ -664,10 +664,12 @@ bool ParserUnaryExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
if (pos->type == TokenType::Minus)
{
ParserLiteral lit_p;
Pos begin = pos;
if (ParserCastOperator().parse(pos, node, expected))
return true;
if (lit_p.parse(pos, node, expected))
pos = begin;
if (ParserLiteral().parse(pos, node, expected))
return true;
pos = begin;

View File

@ -1,7 +1,6 @@
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/join_common.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/IBlockInputStream.h>
@ -114,7 +113,7 @@ void JoiningTransform::work()
}
else
{
if (!non_joined_stream)
if (!non_joined_blocks)
{
if (!finish_counter || !finish_counter->isLast())
{
@ -122,15 +121,15 @@ void JoiningTransform::work()
return;
}
non_joined_stream = join->createStreamWithNonJoinedRows(outputs.front().getHeader(), max_block_size);
if (!non_joined_stream)
non_joined_blocks = join->getNonJoinedBlocks(outputs.front().getHeader(), max_block_size);
if (!non_joined_blocks)
{
process_non_joined = false;
return;
}
}
auto block = non_joined_stream->read();
Block block = non_joined_blocks->read();
if (!block)
{
process_non_joined = false;

View File

@ -8,8 +8,7 @@ namespace DB
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class NotJoinedBlocks;
/// Join rows to chunk form left table.
/// This transform usually has two input ports and one output.
@ -76,7 +75,7 @@ private:
ExtraBlockPtr not_processed;
FinishCounterPtr finish_counter;
BlockInputStreamPtr non_joined_stream;
std::shared_ptr<NotJoinedBlocks> non_joined_blocks;
size_t max_block_size;
Block readExecute(Chunk & chunk);

View File

@ -1764,21 +1764,21 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
{
return std::make_shared<WindowFunctionRank>(name, argument_types,
parameters);
}, properties});
}, properties}, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("dense_rank", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{
return std::make_shared<WindowFunctionDenseRank>(name, argument_types,
parameters);
}, properties});
}, properties}, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("row_number", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{
return std::make_shared<WindowFunctionRowNumber>(name, argument_types,
parameters);
}, properties});
}, properties}, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("lagInFrame", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
@ -1799,7 +1799,7 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
{
return std::make_shared<WindowFunctionNthValue>(
name, argument_types, parameters);
}, properties});
}, properties}, AggregateFunctionFactory::CaseInsensitive);
}
}

View File

@ -49,27 +49,6 @@
namespace DB
{
namespace
{
std::string formatHTTPErrorResponse(const Poco::Util::AbstractConfiguration& config)
{
std::string result = fmt::format(
"HTTP/1.0 400 Bad Request\r\n\r\n"
"Port {} is for clickhouse-client program\r\n",
config.getString("tcp_port"));
if (config.has("http_port"))
{
result += fmt::format(
"You must use port {} for HTTP.\r\n",
config.getString("http_port"));
}
return result;
}
}
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -925,6 +904,29 @@ bool TCPHandler::receiveProxyHeader()
}
namespace
{
std::string formatHTTPErrorResponseWhenUserIsConnectedToWrongPort(const Poco::Util::AbstractConfiguration& config)
{
std::string result = fmt::format(
"HTTP/1.0 400 Bad Request\r\n\r\n"
"Port {} is for clickhouse-client program\r\n",
config.getString("tcp_port"));
if (config.has("http_port"))
{
result += fmt::format(
"You must use port {} for HTTP.\r\n",
config.getString("http_port"));
}
return result;
}
}
void TCPHandler::receiveHello()
{
/// Receive `hello` packet.
@ -940,9 +942,7 @@ void TCPHandler::receiveHello()
*/
if (packet_type == 'G' || packet_type == 'P')
{
writeString(formatHTTPErrorResponse(server.config()),
*out);
writeString(formatHTTPErrorResponseWhenUserIsConnectedToWrongPort(server.config()), *out);
throw Exception("Client has connected to wrong port", ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT);
}
else

View File

@ -172,11 +172,21 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.cloneReplicaIfNeeded(zookeeper);
storage.queue.load(zookeeper);
try
{
storage.queue.load(zookeeper);
/// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost);
/// because cleanup_thread doesn't delete log_pointer of active replicas.
storage.queue.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::LOAD);
}
catch (...)
{
std::unique_lock lock(storage.last_queue_update_exception_lock);
storage.last_queue_update_exception = getCurrentExceptionMessage(false);
throw;
}
/// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost);
/// because cleanup_thread doesn't delete log_pointer of active replicas.
storage.queue.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::LOAD);
storage.queue.removeCurrentPartsFromMutations();
storage.last_queue_update_finish_time.store(time(nullptr));

View File

@ -3079,6 +3079,12 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
zookeeper->set(fs::path(replica_path) / "is_lost", "0");
}
String StorageReplicatedMergeTree::getLastQueueUpdateException() const
{
std::unique_lock lock(last_queue_update_exception_lock);
return last_queue_update_exception;
}
void StorageReplicatedMergeTree::queueUpdatingTask()
{
@ -3097,6 +3103,9 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
std::unique_lock lock(last_queue_update_exception_lock);
last_queue_update_exception = getCurrentExceptionMessage(false);
if (e.code == Coordination::Error::ZSESSIONEXPIRED)
{
restarting_thread.wakeup();
@ -3108,6 +3117,10 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
std::unique_lock lock(last_queue_update_exception_lock);
last_queue_update_exception = getCurrentExceptionMessage(false);
queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
}
@ -5565,6 +5578,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
res.log_pointer = 0;
res.total_replicas = 0;
res.active_replicas = 0;
res.last_queue_update_exception = getLastQueueUpdateException();
if (with_zk_fields && !res.is_session_expired)
{

View File

@ -174,6 +174,7 @@ public:
UInt64 absolute_delay;
UInt8 total_replicas;
UInt8 active_replicas;
String last_queue_update_exception;
/// If the error has happened fetching the info from ZooKeeper, this field will be set.
String zookeeper_exception;
std::unordered_map<std::string, bool> replica_is_active;
@ -331,6 +332,10 @@ private:
std::atomic<time_t> last_queue_update_start_time{0};
std::atomic<time_t> last_queue_update_finish_time{0};
mutable std::mutex last_queue_update_exception_lock;
String last_queue_update_exception;
String getLastQueueUpdateException() const;
DataPartsExchange::Fetcher fetcher;
/// When activated, replica is initialized and startup() method could exit

View File

@ -51,6 +51,7 @@ StorageSystemReplicas::StorageSystemReplicas(const StorageID & table_id_)
{ "absolute_delay", std::make_shared<DataTypeUInt64>() },
{ "total_replicas", std::make_shared<DataTypeUInt8>() },
{ "active_replicas", std::make_shared<DataTypeUInt8>() },
{ "last_queue_update_exception", std::make_shared<DataTypeString>() },
{ "zookeeper_exception", std::make_shared<DataTypeString>() },
{ "replica_is_active", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt8>()) }
}));
@ -186,6 +187,7 @@ Pipe StorageSystemReplicas::read(
res_columns[col_num++]->insert(status.absolute_delay);
res_columns[col_num++]->insert(status.total_replicas);
res_columns[col_num++]->insert(status.active_replicas);
res_columns[col_num++]->insert(status.last_queue_update_exception);
res_columns[col_num++]->insert(status.zookeeper_exception);
Map replica_is_active_values;

View File

@ -44,15 +44,17 @@ DISTRIBUTED_DDL_TIMEOUT_MSG = "is executing longer than distributed_ddl_task_tim
MESSAGES_TO_RETRY = [
"DB::Exception: ZooKeeper session has been expired",
"DB::Exception: Connection loss",
"Coordination::Exception: Session expired",
"Coordination::Exception: Connection loss",
"Coordination::Exception: Operation timeout",
"DB::Exception: Session expired",
"DB::Exception: Connection loss",
"DB::Exception: Operation timeout",
"Operation timed out",
"ConnectionPoolWithFailover: Connection failed at try",
"DB::Exception: New table appeared in database being dropped or detached. Try again",
"is already started to be removing by another replica right now",
"DB::Exception: Cannot enqueue query",
"Shutdown is called for table", # It happens in SYSTEM SYNC REPLICA query if session with ZooKeeper is being reinitialized.
DISTRIBUTED_DDL_TIMEOUT_MSG # FIXME
]

View File

@ -22,9 +22,9 @@
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<heart_beat_interval_ms>1000</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<raft_logs_level>trace</raft_logs_level>
<election_timeout_lower_bound_ms>4000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>5000</election_timeout_upper_bound_ms>
<raft_logs_level>information</raft_logs_level>
<force_sync>false</force_sync>
<!-- we want all logs for complex problems investigation -->
<reserved_log_items>1000000000000000</reserved_log_items>

View File

@ -474,6 +474,11 @@ class ClickHouseCluster:
cmd += " client"
return cmd
def copy_file_from_container_to_container(self, src_node, src_path, dst_node, dst_path):
fname = os.path.basename(src_path)
run_and_check([f"docker cp {src_node.docker_id}:{src_path} {self.instances_dir}"], shell=True)
run_and_check([f"docker cp {self.instances_dir}/{fname} {dst_node.docker_id}:{dst_path}"], shell=True)
def setup_zookeeper_secure_cmd(self, instance, env_variables, docker_compose_yml_dir):
logging.debug('Setup ZooKeeper Secure')
zookeeper_docker_compose_path = p.join(docker_compose_yml_dir, 'docker_compose_zookeeper_secure.yml')

View File

@ -0,0 +1,37 @@
<yandex>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<snapshot_distance>75</snapshot_distance>
<reserved_log_items>5</reserved_log_items>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</yandex>

View File

@ -0,0 +1,37 @@
<yandex>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<snapshot_distance>75</snapshot_distance>
<reserved_log_items>5</reserved_log_items>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</yandex>

View File

@ -0,0 +1,37 @@
<yandex>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<snapshot_distance>75</snapshot_distance>
<reserved_log_items>5</reserved_log_items>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</yandex>

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,120 @@
#!/usr/bin/env python3
##!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from kazoo.client import KazooClient, KazooState
import random
import string
import os
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/keeper_config1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/keeper_config2.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/keeper_config3.xml'], stay_alive=True)
def start_zookeeper(node):
node1.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh start'])
def stop_zookeeper(node):
node.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh stop'])
def clear_zookeeper(node):
node.exec_in_container(['bash', '-c', 'rm -fr /zookeeper/*'])
def restart_and_clear_zookeeper(node):
stop_zookeeper(node)
clear_zookeeper(node)
start_zookeeper(node)
def clear_clickhouse_data(node):
node.exec_in_container(['bash', '-c', 'rm -fr /var/lib/clickhouse/coordination/logs/* /var/lib/clickhouse/coordination/snapshots/*'])
def convert_zookeeper_data(node):
cmd = '/usr/bin/clickhouse keeper-converter --zookeeper-logs-dir /zookeeper/version-2/ --zookeeper-snapshots-dir /zookeeper/version-2/ --output-dir /var/lib/clickhouse/coordination/snapshots'
node.exec_in_container(['bash', '-c', cmd])
return os.path.join('/var/lib/clickhouse/coordination/snapshots', node.exec_in_container(['bash', '-c', 'ls /var/lib/clickhouse/coordination/snapshots']).strip())
def stop_clickhouse(node):
node.stop_clickhouse()
def start_clickhouse(node):
node.start_clickhouse()
def copy_zookeeper_data(make_zk_snapshots, node):
stop_zookeeper(node)
if make_zk_snapshots: # force zookeeper to create snapshot
start_zookeeper(node)
stop_zookeeper(node)
stop_clickhouse(node)
clear_clickhouse_data(node)
convert_zookeeper_data(node)
start_zookeeper(node)
start_clickhouse(node)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_fake_zk(node, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def get_genuine_zk(node, timeout=30.0):
_genuine_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":2181", timeout=timeout)
_genuine_zk_instance.start()
return _genuine_zk_instance
def test_snapshot_and_load(started_cluster):
restart_and_clear_zookeeper(node1)
genuine_connection = get_genuine_zk(node1)
for node in [node1, node2, node3]:
print("Stop and clear", node.name, "with dockerid", node.docker_id)
stop_clickhouse(node)
clear_clickhouse_data(node)
for i in range(1000):
genuine_connection.create("/test" + str(i), b"data")
print("Data loaded to zookeeper")
stop_zookeeper(node1)
start_zookeeper(node1)
stop_zookeeper(node1)
print("Data copied to node1")
resulted_path = convert_zookeeper_data(node1)
print("Resulted path", resulted_path)
for node in [node2, node3]:
print("Copy snapshot from", node1.name, "to", node.name)
cluster.copy_file_from_container_to_container(node1, resulted_path, node, '/var/lib/clickhouse/coordination/snapshots')
print("Starting clickhouses")
p = Pool(3)
result = p.map_async(start_clickhouse, [node1, node2, node3])
result.wait()
print("Loading additional data")
fake_zks = [get_fake_zk(node) for node in [node1, node2, node3]]
for i in range(1000):
fake_zk = random.choice(fake_zks)
try:
fake_zk.create("/test" + str(i + 1000), b"data")
except Exception as ex:
print("Got exception:" + str(ex))
print("Final")
fake_zks[0].create("/test10000", b"data")

View File

@ -44,6 +44,11 @@ def ch_cluster():
'/usr/bin/g++ -shared -o /etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so -fPIC /etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.cpp'],
user='root')
instance.exec_in_container(
['bash', '-c',
'/usr/bin/g++ -shared -o /dict_lib_copy.so -fPIC /etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.cpp'], user='root')
instance.exec_in_container(['bash', '-c', 'ln -s /dict_lib_copy.so /etc/clickhouse-server/config.d/dictionaries_lib/dict_lib_symlink.so'])
yield cluster
finally:
@ -59,6 +64,7 @@ def test_load_all(ch_cluster):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
instance.query('DROP DICTIONARY IF EXISTS lib_dict')
instance.query('''
CREATE DICTIONARY lib_dict (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key
@ -128,6 +134,7 @@ def test_load_keys(ch_cluster):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
instance.query('DROP DICTIONARY IF EXISTS lib_dict_ckc')
instance.query('''
CREATE DICTIONARY lib_dict_ckc (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key
@ -148,6 +155,7 @@ def test_load_all_many_rows(ch_cluster):
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
num_rows = [1000, 10000, 100000, 1000000]
instance.query('DROP DICTIONARY IF EXISTS lib_dict')
for num in num_rows:
instance.query('''
CREATE DICTIONARY lib_dict (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
@ -267,6 +275,42 @@ def test_bridge_dies_with_parent(ch_cluster):
instance.query('DROP DICTIONARY lib_dict_c')
def test_path_validation(ch_cluster):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
instance.query('DROP DICTIONARY IF EXISTS lib_dict_c')
instance.query('''
CREATE DICTIONARY lib_dict_c (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key SOURCE(library(PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib_symlink.so'))
LAYOUT(CACHE(
SIZE_IN_CELLS 10000000
BLOCK_SIZE 4096
FILE_SIZE 16777216
READ_BUFFER_SIZE 1048576
MAX_STORED_KEYS 1048576))
LIFETIME(2) ;
''')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.query('DROP DICTIONARY IF EXISTS lib_dict_c')
instance.query('''
CREATE DICTIONARY lib_dict_c (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key SOURCE(library(PATH '/etc/clickhouse-server/config.d/dictionaries_lib/../../../../dict_lib_copy.so'))
LAYOUT(CACHE(
SIZE_IN_CELLS 10000000
BLOCK_SIZE 4096
FILE_SIZE 16777216
READ_BUFFER_SIZE 1048576
MAX_STORED_KEYS 1048576))
LIFETIME(2) ;
''')
result = instance.query_and_get_error('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert('DB::Exception: File path /etc/clickhouse-server/config.d/dictionaries_lib/../../../../dict_lib_copy.so is not inside /etc/clickhouse-server/config.d/dictionaries_lib' in result)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -42,10 +42,10 @@ popd > /dev/null
#SCRIPTDIR=`dirname "$SCRIPTPATH"`
SCRIPTDIR=$SCRIPTPATH
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=10 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout 2>&1
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=10 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout
cmp "$SCRIPTDIR"/00282_merging.reference "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout && echo PASSED || echo FAILED
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=20 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout 2>&1
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=20 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout
cmp "$SCRIPTDIR"/00282_merging.reference "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout && echo PASSED || echo FAILED
rm "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout

View File

@ -0,0 +1,10 @@
-1
SELECT CAST(\'-1\', \'Int32\')
-0.1
SELECT CAST(\'-0.1\', \'Decimal(38, 38)\')
-0.111
SELECT CAST(\'-0.111\', \'Float64\')
[-1,2,-3]
SELECT CAST(\'[-1, 2, -3]\', \'Array(Int32)\')
[-1.1,2,-3]
SELECT CAST(\'[-1.1, 2, -3]\', \'Array(Float64)\')

View File

@ -0,0 +1,14 @@
SELECT -1::Int32;
EXPLAIN SYNTAX SELECT -1::Int32;
SELECT -0.1::Decimal(38, 38);
EXPLAIN SYNTAX SELECT -0.1::Decimal(38, 38);
SELECT -0.111::Float64;
EXPLAIN SYNTAX SELECT -0.111::Float64;
SELECT [-1, 2, -3]::Array(Int32);
EXPLAIN SYNTAX SELECT [-1, 2, -3]::Array(Int32);
SELECT [-1.1, 2, -3]::Array(Float64);
EXPLAIN SYNTAX SELECT [-1.1, 2, -3]::Array(Float64);

View File

@ -8,3 +8,11 @@ Syntax error
Syntax error
Syntax error
Code: 6
Syntax error
Syntax error
Syntax error
Syntax error
Syntax error
Syntax error
Syntax error
Syntax error

View File

@ -15,3 +15,13 @@ $CLICKHOUSE_CLIENT --query="SELECT [1 2]::Array(UInt8)" 2>&1 | grep -o -m1 'Syn
$CLICKHOUSE_CLIENT --query="SELECT 1 4::UInt32" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT '1' '4'::UInt32" 2>&1 | grep -o -m1 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT '1''4'::UInt32" 2>&1 | grep -o -m1 'Code: 6'
$CLICKHOUSE_CLIENT --query="SELECT ::UInt32" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT ::String" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT -::Int32" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT [1, -]::Array(Int32)" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT [1, 3-]::Array(Int32)" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT [-, 2]::Array(Int32)" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT [--, 2]::Array(Int32)" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT [1, 2]-::Array(Int32)" 2>&1 | grep -o 'Syntax error'

View File

@ -0,0 +1,6 @@
const column
2021-08-15 18:57:56 1426860702823350272
2021-08-15 18:57:56.492 1426860704886947840
non-const column
2021-08-15 18:57:56 1426860702823350272
2021-08-15 18:57:56.492 1426860704886947840

View File

@ -0,0 +1,23 @@
-- Error cases
SELECT dateTimeToSnowflake(); -- {serverError 42}
SELECT dateTime64ToSnowflake(); -- {serverError 42}
SELECT dateTimeToSnowflake('abc'); -- {serverError 43}
SELECT dateTime64ToSnowflake('abc'); -- {serverError 43}
SELECT dateTimeToSnowflake('abc', 123); -- {serverError 42}
SELECT dateTime64ToSnowflake('abc', 123); -- {serverError 42}
SELECT 'const column';
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt
SELECT dt, dateTimeToSnowflake(dt);
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64
SELECT dt64, dateTime64ToSnowflake(dt64);
SELECT 'non-const column';
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS x
SELECT materialize(x) as dt, dateTimeToSnowflake(dt);;
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS x
SELECT materialize(x) as dt64, dateTime64ToSnowflake(dt64);

View File

@ -0,0 +1,3 @@
const column
UTC 1426860704886947840 2021-08-15 10:57:56 DateTime(\'UTC\') 2021-08-15 10:57:56.492 DateTime64(3, \'UTC\')
Asia/Shanghai 1426860704886947840 2021-08-15 18:57:56 DateTime(\'Asia/Shanghai\') 2021-08-15 18:57:56.492 DateTime64(3, \'Asia/Shanghai\')

View File

@ -0,0 +1,32 @@
-- -- Error cases
SELECT snowflakeToDateTime(); -- {serverError 42}
SELECT snowflakeToDateTime64(); -- {serverError 42}
SELECT snowflakeToDateTime('abc'); -- {serverError 43}
SELECT snowflakeToDateTime64('abc'); -- {serverError 43}
SELECT snowflakeToDateTime('abc', 123); -- {serverError 43}
SELECT snowflakeToDateTime64('abc', 123); -- {serverError 43}
SELECT 'const column';
WITH
CAST(1426860704886947840 AS Int64) AS i64,
'UTC' AS tz
SELECT
tz,
i64,
snowflakeToDateTime(i64, tz) as dt,
toTypeName(dt),
snowflakeToDateTime64(i64, tz) as dt64,
toTypeName(dt64);
WITH
CAST(1426860704886947840 AS Int64) AS i64,
'Asia/Shanghai' AS tz
SELECT
tz,
i64,
snowflakeToDateTime(i64, tz) as dt,
toTypeName(dt),
snowflakeToDateTime64(i64, tz) as dt64,
toTypeName(dt64);

View File

@ -0,0 +1,83 @@
---
title: 'Testing the Performance of ClickHouse'
image: 'https://blog-images.clickhouse.tech/en/2021/performance-testing-1/chebu-crop.jpg'
date: '2021-08-19'
author: '[Alexander Kuzmenkov](https://github.com/akuzm)'
tags: ['testing', 'performance']
---
One of the main selling points of ClickHouse is that it's very fast, in many cases utilizing the hardware up to the theoretical limits. This was noted by many independent benchmark such as [this one](http://brandonharris.io/redshift-clickhouse-time-series/). This speed boils down to a right combination of architectural choices and algorithmic optimizations, sprinkled with a dash of pixie dust. There is an [overview of these factors](https://clickhouse.tech/docs/en/faq/general/why-clickhouse-is-so-fast) on our website, or a talk by the ClickHouse lead developer Alexey Milovidov ["The secrets of ClickHouse performance optimizations"](https://www.youtube.com/watch?v=ZOZQCQEtrz8). But this is a static picture of "how the things are". Software is a living and changing organism, and ClickHouse is changing very fast &mdash; to give you a scale, in July 2021 we merged 319 pull requests made by 60 different authors ([live statistics here](https://gh-api.clickhouse.tech/play?user=play#c2VsZWN0IGRhdGVfdHJ1bmMoJ21vbnRoJywgY3JlYXRlZF9hdCkgbW9udGgsIHVuaXEoY3JlYXRvcl91c2VyX2xvZ2luKSBhdXRob3JzLCB1bmlxKG51bWJlcikgcHJzIGZyb20gZ2l0aHViX2V2ZW50cyB3aGVyZSByZXBvX25hbWUgPSAnQ2xpY2tIb3VzZS9DbGlja0hvdXNlJyBhbmQgbm90IGhhc0FueShsYWJlbHMsIFsncHItYmFja3BvcnQnLCAncHItZG9jdW1lbnRhdGlvbicsICdwci1jaGVycnlwaWNrJ10pIGFuZCBtZXJnZWQgYW5kIGNyZWF0ZWRfYXQgYmV0d2VlbiAnMjAyMC0wOS0wMScgYW5kICcyMDIxLTA5LTAxJyBncm91cCBieSBtb250aA==)). Any quality that is not actively selected for is going to be lost in this endless stream of changes, and the performance is no exception. For this reason, we have to have some process that allows us to ensure than ClickHouse always stays fast.
# Measuring and Comparing the Performance
How do we know it is fast, in the first place? We do a lot of benchmarks, many kinds of them. The most basic kind of a benchmark is a micro-benchmark, that doesn't use the full code of the server and tests a particular algorithm in isolation. We use them to choose a better inner loop for some aggregate function, or to test various layouts of hash tables, and so on. For example, when we discovered that a competing database engine completes a query with `sum` aggregate function twice as fast, we tested a couple of dozen implementations of `sum` to ultimately find the one that gives the best performance (see [a talk](https://www.youtube.com/watch?v=MJJfWoWJq0o) about this, in Russian). But testing a particular algorithm by itself is not enough to say how the entire query is going to work. We have to also make end-to-end measurements of entire queries, often using the real production data, because the particulars of the data (e.g. the cardinality and the distribution of values) heavily influence the performance. Currently we have about 3000 end-to-end test queries organized into about 200 [tests](https://github.com/ClickHouse/ClickHouse/tree/6c4c3df96e41425185beb0c471a8dde0ce6f25a7/tests/performance). Many of them use real data sets, such as the [production data of Yandex.Metrica](https://clickhouse.tech/docs/en/getting-started/example-datasets/metrica/), obfuscated with `clickhouse-obfuscator` as described [here](https://habr.com/ru/company/yandex/blog/485096/).
Micro-benchmarks are normally ran by a developer while working on the code, but it is not practical to manually run the entire battery of the end-to-end tests for each change. We use an automated system that does this for each pull request as part of continuous integration checks. It measures whether the code changes introduced by a pull request influenced the performance, for which kinds of queries and by how much, and alerts the developer if there is a regression. Here is how a typical report looks.
<img src="https://blog-images.clickhouse.tech/en/2021/performance-testing-1/report.png"/>
To talk about "changes in performance", we first have to measure this performance. The most natural measure for a single query is elapsed time. It is susceptible to random variations, so we have to take several measurements and average them in some way. From the application point of view, the most interesting statistic is maximum. We want to guarantee that e.g. an analytical dashboard built on ClickHouse is responsive. However, the query time can grow almost without limit due to random factor such as sudden disk load spikes or network delays, so using the maximum is not practical. The minimum is also interesting &mdash; after all, there is a theoretical bound on it. We know that the particular algorithm can run only so fast on the particular hardware, in ideal conditions. But if we only look at the minimum, we are going to miss cases where some runs of the query are slow and some are not (e.g. boundary effects in some cache). So we compromise by measuring the median. It is a robust statistic that is reasonably sensitive to outliers and stable enough against noise.
After measuring the performance, how do we determine that it has changed? Due to various random and systematic factors, the query time always drifts, so the number always changes, but the question is whether this change is meaningful. If we have an old version of the server, and a new version of the server, are they going to consistently give a different result for this query, or was it just a fluke? To answer this, we have to employ some statistical method. The core idea of these methods is comparing the observed values to some reference distribution, and deciding whether what we observed can plausibly belong to this distribution, or, on the contrary, it cannot, which means that the performance characteristics of the patched server are indeed different.
Choosing the reference distribution is the starting point. One way to obtain it is to build a mathematical model of the process. This works well for simple things like tossing a coin a fixed number of times. We can analytically deduce that the number of heads we get follows the binomial distribution, and get a confidence interval on this number, given the required [level of significance](https://en.wikipedia.org/wiki/P-value#Definition_and_interpretation). If the observed number of heads doesn't belong to this interval, we can conclude that the coin is biased. However, modeling the query execution from first principles is too complex. The best we can do is to use the hardware capabilities to estimate how fast the query could run, in principle, and try to achieve this throughput.
For complex processes which resist modeling, a practical option is to use the historical data from the same process. We actually used to do this for ClickHouse. For each tested commit, we measured the run times for each test query and saved them into a database. We could compare the patched server to these reference values, build graphs of changes over time and so on. The main problem with this approach is systematic errors induced by environment. Sometimes the performance testing task ends up on a machine with dying HDD, or they update `atop` to a broken version that slows every kernel call in half, et cetera, ad infinitum. This is why now we employ another approach.
We run the reference version of the server process and the tested version, simultaneously on the same machine, and run the test queries on each of them in turn, one by one. This way we eliminate most systematic errors, because both servers are equally influenced by them. We can then compare the set of results we got from the reference server process, and the set from the test server process, to see whether they look the same. Comparing the distributions using two samples is a very interesting problem in itself. We use a non-parametric bootstrap method to build a randomization distribution for the observed difference of median query run times. This method is described in detail in [[1]](#ref1), where they apply it to see how changing a fertilizer mixture changes the yield of tomato plants. ClickHouse is not much different from tomatoes, only we have to check how the changes in code influence the performance.
This method ultimately gives a single threshold number _T_: what is the largest difference in median query run times between old and new server, that we can observe even if nothing has changed. Then we have a simple decision protocol given this threshold _T_ and the measured difference of medians _D_:
1. _abs(D) <= T_ &mdash; the changes are not statistically significant,
2. _abs(D) <= 5%_ &mdash; the changes are too small to be important,
3. _abs(T) >= 10%_ &mdash; the test query has excessive run time variance that leads to poor sensitivity,
4. finally, _abs(D) >= T and abs(D) >= 5%_ &mdash; there are statistically significant changes of significant magnitude.
The most interesting case are the unstable queries _(3)_. When the elapsed time changes significantly between runs even on the same version of server, it means we won't be able to detect the changes of performance, because they are going to be drowned out by the noise. Such queries tend to be the most difficult to debug, because there is no straightforward way to compare "good" and "bad" server. This topic deserves its own article which we will publish next. For now, let's consider the happy path _(4)_. This is the case of real and notable changes in performance that this system is intended to catch. What do we do next?
# Understanding the Reasons Behind the Changes
An investigation of code performance often starts with applying a profiler. On Linux, you would use `perf`, a sampling profiler that periodically collects the stack trace of the process, so that you can then see an aggregate picture of where your program spends the most time. In ClickHouse, we actually have a built-in sampling profiler that saves results into a system table, so no external tools are needed. It can be enabled for all queries or for a particular one, by passing the settings [as described in the docs](https://clickhouse.tech/docs/en/operations/optimizing-performance/sampling-query-profiler/). It is on by default, so if you use a recent version of ClickHouse, you already have a combined profile of your production server load. To visualize it, we can use a well-known script for building [flamegraphs](https://github.com/brendangregg/FlameGraph):
```
clickhouse-client -q "SELECT
arrayStringConcat(
arrayMap(
x -> concat(splitByChar('/', addressToLine(x))[-1],
'#', demangle(addressToSymbol(x))),
trace),
';') AS stack,
count(*) AS samples
FROM system.trace_log
WHERE trace_type = 'Real'
AND query_id = '4aac5305-b27f-4a5a-91c3-61c0cf52ec2a'
GROUP BY trace" \
| flamegraph.pl
```
As an example, let's use the test run we've seen above. The tested [pull request](https://github.com/ClickHouse/ClickHouse/pull/26248) is supposed to speed up the `sum` aggregate function for nullable integer types. Let's look at the query #8 of the test 'sum': `SELECT sum(toNullable(number)) FROM numbers(100000000)`. The test system reported that its performance increased by 38.5%, and built a "differential" variant of flamegraph for it, that shows the relative time spent in different functions. We can see that the function that calculates the sum, `DB::AggregateFunctionSumData<unsigned long>::addManyNotNull<unsigned long>`, now takes 15% less time.
<object data="https://blog-images.clickhouse.tech/en/2021/performance-testing-1/sum-8-diff.svg" type="image/svg+xml" width="100%"/>
To get more leads into why the performance has changed, we can check how the various query metrics have changed between the old and the new servers. This includes all the metrics from `system.query_log.ProfileEvents`, such as `SelectedRows` or `RealTimeMicroseconds`. ClickHouse also tracks the hardware CPU metrics such as the number of branch or cache misses, using the Linux `perf_event_open` API. After downloading the test output archive, we can use a simple ad hoc [script](https://gist.github.com/akuzm/bb28a442f882349e0a5ec2b5262b97d0) to build some statistics and graphs of these metrics.
<img src="https://blog-images.clickhouse.tech/en/2021/performance-testing-1/sum_8_scatter_2d_PerfBranchInstructions_per_client_time.png"/>
This graph shows the number of branch instructions per second, on the old and the new server. We can see that the number of branch instructions has dramatically decreased, which might explain the performance difference. The tested pull request removes some `if`s and replaces them with multiplication, so this explanation sounds plausible.
While side-to-side comparison is more robust against the systemic errors, the historical data is still very valuable for finding where a regression was introduced or investigating the unstable test queries. This is why we save the results of all test runs into a ClickHouse database. Let's consider the same query #8 from the `sum` test. We can build the history of performance changes with this [SQL query](https://play-ci.clickhouse.tech/play?user=play#V0lUSCAwLjA1IEFTIHMKU0VMRUNUIG9sZF9zaGEsIG5ld19zaGEsIGV2ZW50X3RpbWUsIG1lc3NhZ2UsIG9sZF92YWx1ZSBBUyBgb2xkIHNlcnZlcmAsICAgbmV3X3ZhbHVlIEFTIGBuZXcgc2VydmVyYCwgYmVmb3JlIEFTIGBwcmV2IDExIHJ1bnNgLCBhZnRlciBBUyBgbmV4dCAxMSBydW5zYCwgICAgZGlmZiBBUyBgZGlmZiwgcmF0aW9gLCBzdGF0X3RocmVzaG9sZF9oaXN0b3JpY2FsIEFTIGBzdGF0IHRocmVzaG9sZCwgcmF0aW8sIGhpc3RvcmljYWxgLCBzdGF0X3RocmVzaG9sZCBBUyBgc3RhdCB0aHJlc2hvbGQsIHJhdGlvLCBwZXItcnVuYCwgY3B1X21vZGVsLHF1ZXJ5X2Rpc3BsYXlfbmFtZQpGUk9NIAooU0VMRUNUICosIHJ1bl9hdHRyaWJ1dGVzX3YxLnZhbHVlIEFTIGNwdV9tb2RlbCwKICAgICAgICBtZWRpYW4ob2xkX3ZhbHVlKSBPVkVSIChQQVJUSVRJT04gQlkgcnVuX2F0dHJpYnV0ZXNfdjEudmFsdWUsIHRlc3QsIHF1ZXJ5X2luZGV4LCBxdWVyeV9kaXNwbGF5X25hbWUgT1JERVIgQlkgZXZlbnRfZGF0ZSBBU0MgUk9XUyBCRVRXRUVOIDExIFBSRUNFRElORyBBTkQgQ1VSUkVOVCBST1cpIEFTIGJlZm9yZSwKICAgICAgICBtZWRpYW4obmV3X3ZhbHVlKSBPVkVSIChQQVJUSVRJT04gQlkgcnVuX2F0dHJpYnV0ZXNfdjEudmFsdWUsIHRlc3QsIHF1ZXJ5X2luZGV4LCBxdWVyeV9kaXNwbGF5X25hbWUgT1JERVIgQlkgZXZlbnRfZGF0ZSBBU0MgUk9XUyBCRVRXRUVOIENVUlJFTlQgUk9XIEFORCAxMSBGT0xMT1dJTkcpIEFTIGFmdGVyLAogICAgICAgIHF1YW50aWxlRXhhY3QoMC45NSkoYWJzKGRpZmYpKSBPVkVSIChQQVJUSVRJT04gQlkgcnVuX2F0dHJpYnV0ZXNfdjEudmFsdWUsIHRlc3QsIHF1ZXJ5X2luZGV4LCBxdWVyeV9kaXNwbGF5X25hbWUgT1JERVIgQlkgZXZlbnRfZGF0ZSBBU0MgUk9XUyBCRVRXRUVOIDM3IFBSRUNFRElORyBBTkQgQ1VSUkVOVCBST1cpIEFTIHN0YXRfdGhyZXNob2xkX2hpc3RvcmljYWwKICAgIEZST00gcGVyZnRlc3QucXVlcnlfbWV0cmljc192MgogICAgTEVGVCBKT0lOIHBlcmZ0ZXN0LnJ1bl9hdHRyaWJ1dGVzX3YxIFVTSU5HIChvbGRfc2hhLCBuZXdfc2hhKQogICAgV0hFUkUgKGF0dHJpYnV0ZSA9ICdsc2NwdS1tb2RlbC1uYW1lJykgQU5EIChtZXRyaWMgPSAnY2xpZW50X3RpbWUnKQogICAgICAgIC0tIG9ubHkgZm9yIGNvbW1pdHMgaW4gbWFzdGVyCiAgICAgICAgQU5EIChwcl9udW1iZXIgPSAwKQogICAgICAgIC0tIHNlbGVjdCB0aGUgcXVlcmllcyB3ZSBhcmUgaW50ZXJlc3RlZCBpbgogICAgICAgIEFORCAodGVzdCA9ICdzdW0nKSBBTkQgKHF1ZXJ5X2luZGV4ID0gOCkKKSBBUyB0CkFOWSBMRUZUIEpPSU4gYGdoLWRhdGFgLmNvbW1pdHMgT04gbmV3X3NoYSA9IHNoYQpXSEVSRQogICAgLS0gQ2hlY2sgZm9yIGEgcGVyc2lzdGVudCBhbmQgc2lnbmlmaWNhbnQgY2hhbmdlIGluIHF1ZXJ5IHJ1biB0aW1lLCBpbnRyb2R1Y2VkIGJ5IGEgY29tbWl0OgogICAgLS0gMSkgb24gYSBoaXN0b3JpY2FsIGdyYXBoIG9mIHF1ZXJ5IHJ1biB0aW1lLCB0aGVyZSBpcyBhIHN0ZXAgYmV0d2VlbiB0aGUgYWRqYWNlbnQgY29tbWl0cywKICAgIC0tIHRoYXQgaXMgaGlnaGVyIHRoYW4gdGhlIG5vcm1hbCB2YXJpYW5jZSwKICAgICgoKGFicyhhZnRlciAtIGJlZm9yZSkgLyBpZihhZnRlciA+IGJlZm9yZSwgYWZ0ZXIsIGJlZm9yZSkpIEFTIHN0ZXBfaGVpZ2h0KSA+PSBncmVhdGVzdChzLCBzdGF0X3RocmVzaG9sZF9oaXN0b3JpY2FsKSkKICAgIC0tIDIpIGluIHNpZGUtdG8tc2lkZSBjb21wYXJpc29uIG9mIHRoZXNlIHR3byBjb21taXRzLCB0aGVyZSB3YXMgYSBzdGF0aXN0aWNhbGx5IHNpZ25pZmljYW50IGRpZmZlcmVuY2UKICAgIC0tIHRoYXQgaXMgYWxzbyBoaWdoZXIgdGhhbiB0aGUgbm9ybWFsIHZhcmlhbmNlLAogICAgICAgIEFORCAoYWJzKGRpZmYpID49IGdyZWF0ZXN0KHN0YXRfdGhyZXNob2xkLCBzdGF0X3RocmVzaG9sZF9oaXN0b3JpY2FsLCBzKSkKICAgIC0tIDMpIGZpbmFsbHksIHRoaXMgc2lkZS10by1zaWRlIGRpZmZlcmVuY2UgaXMgb2YgbWFnbml0dWRlIGNvbXBhcmFibGUgdG8gdGhlIHN0ZXAgaW4gaGlzdG9yaWNhbCBncmFwaHMuCiAgICAgICAgQU5EIChhYnMoZGlmZikgPj0gKDAuNyAqIHN0ZXBfaGVpZ2h0KSkKb3JkZXIgYnkgZXZlbnRfdGltZSBkZXNjCmZvcm1hdCBWZXJ0aWNhbAoKCg==) to the live ClickHouse CI database. Open the link and run the query so that you can examine the query and see the result for yourself. There were three significant changes of performance throughout the test history. The most recent is a speedup in PR we started with. The second speedup is related to fully switching to clang 11. Curiously, there is also a small slowdown introduced by a PR that was supposed to speed it up instead.
# Usability Considerations
Regardless of how it works inside, a test system must be actually usable as a part of the development process. First and foremost, the false positive rate should be as low as possible. False positives are costly to investigate, and if they happen often, developers perceive the test as generally unreliable and tend to ignore the true positives as well. The test must also provide a concise report that makes it obvious what went wrong. We have not really succeeded in this. This test has many more failure modes than a plain functional test, and worse, some of these failures are quantitative, not binary. Much of the complexity is essential, and we try to alleviate it by providing good documentation and linking to the relevant parts of it right from the report page. Another important thing is that the user must be able to investigate a problematic query post-mortem, without running it again locally. This is why we try to export every metric and every intermediate result we have, in easily-manipulated plain text formats.
Organizationally, it is hard to prevent devolving into a system that does a lot of busywork to just show a green check without giving any insight. I like to call this process "mining the green check", by analogy to cryptocurrencies. Our previous system did just that. It used increasingly complex heuristics tailored to each test query to prevent false positives, restarted itself many times if the results didn't look good, and so on. Ultimately, it wasted a lot of processing power without giving the real picture of the server performance. If you wanted to be sure that the performance did or did not change, you had to recheck by hand. This sorry state is the result of how the incentives are aligned around development &mdash; most of the time, the developers just want to merge their pull requests and not be bothered by some obscure test failures. Writing a good performance test query is also not always simple. Just any other query won't do &mdash; it has to give predictable performance, be not too fast and not too slow, actually measure something, and so on. After gathering more precise statistics, we discovered that several hundred of our test queries don't measure anything meaningful, e.g. they give a result that varies by 100% between runs. Another problem is that the performance often changes in statistically significant ways (true positive) with no relevant code changes (due to e.g. random differences in layout of the executable). Given all these difficulties, a working performance test system is bound to add noticeable friction to the development process. Most of the "obvious" ways to remove this friction ultimately boil down to "mining the green check".
Implementation-wise, our system is peculiar in that it doesn't rely on well-known statistical packages, but instead heavily uses `clickhouse-local`, a tool that turns the ClickHouse SQL query processor into a [command line utility](https://altinity.com/blog/2019/6/11/clickhouse-local-the-power-of-clickhouse-sql-in-a-single-command). Doing all the computations in ClickHouse SQL helped us find bugs and usability problems with `clickhouse-local`. The performance test continues to work in dual purpose as a heavy SQL test, and sometimes catches newly introduced bugs in complex joins and the like. The query profiler is always on in the performance tests, and this finds bugs in our fork of `libunwind`. To run the test queries, we use a third-party [Python driver](https://github.com/mymarilyn/clickhouse-driver). This is the only use of this driver in our CI, and it also helped us find some bugs in native protocol handling. A not so honorable fact is that the scaffolding consists of an unreasonable amount of bash, but this at least served to convince us that running [shellcheck](https://github.com/koalaman/shellcheck) in CI is very helpful.
This concludes the overview of the ClickHouse performance test system. Stay tuned for the next article where we will discuss the most problematic kind of a performance test failure &mdash; the unstable query run time.
_2021-08-20 [Alexander Kuzmenkov](https://github.com/akuzm). Title photo by [Alexander Tokmakov](https://github.com/tavplubix)_
References:
<a id="ref1"/>1. Box, Hunter, Hunter, 2005. Statistics for experimenters, p. 78: A Randomized Design Used in the Comparison of Standard and Modified Fertilizer Mixtures for Tomato Plants.