Merge branch 'master' into deb-bump-clang

This commit is contained in:
mergify[bot] 2021-09-21 16:18:16 +00:00 committed by GitHub
commit ea83c4df63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
159 changed files with 3816 additions and 1680 deletions

3
.gitignore vendored
View File

@ -33,6 +33,9 @@
/docs/ja/single.md
/docs/fa/single.md
/docs/en/development/cmake-in-clickhouse.md
/docs/ja/development/cmake-in-clickhouse.md
/docs/zh/development/cmake-in-clickhouse.md
/docs/ru/development/cmake-in-clickhouse.md
# callgrind files
callgrind.out.*

View File

@ -1,4 +1,4 @@
Copyright 2016-2021 Yandex LLC
Copyright 2016-2021 ClickHouse, Inc.
Apache License
Version 2.0, January 2004

View File

@ -28,15 +28,16 @@ The following versions of ClickHouse server are currently being supported with s
| 21.3 | ✅ |
| 21.4 | :x: |
| 21.5 | :x: |
| 21.6 | |
| 21.6 | :x: |
| 21.7 | ✅ |
| 21.8 | ✅ |
| 21.9 | ✅ |
## Reporting a Vulnerability
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [clickhouse-feedback@yandex-team.com](mailto:clickhouse-feedback@yandex-team.com).
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com).
### When Should I Report a Vulnerability?

View File

@ -70,6 +70,9 @@ def compress_stress_logs(output_path, files_prefix):
def prepare_for_hung_check(drop_databases):
# FIXME this function should not exist, but...
# ThreadFuzzer significantly slows down server and causes false-positive hung check failures
call("clickhouse client -q 'SYSTEM STOP THREAD FUZZER'", shell=True, stderr=STDOUT)
# We attach gdb to clickhouse-server before running tests
# to print stacktraces of all crashes even if clickhouse cannot print it for some reason.
# However, it obstruct checking for hung queries.

View File

@ -23,6 +23,20 @@ ENGINE = MaterializedPostgreSQL('host:port', ['database' | database], 'user', 'p
- `user` — PostgreSQL user.
- `password` — User password.
## Dynamically adding new tables to replication
``` sql
ATTACH TABLE postgres_database.new_table;
```
It will work as well if there is a setting `materialized_postgresql_tables_list`.
## Dynamically removing tables from replication
``` sql
DETACH TABLE postgres_database.table_to_remove;
```
## Settings {#settings}
- [materialized_postgresql_max_block_size](../../operations/settings/settings.md#materialized-postgresql-max-block-size)
@ -44,6 +58,12 @@ SETTINGS materialized_postgresql_max_block_size = 65536,
SELECT * FROM database1.table1;
```
It is also possible to change settings at run time.
``` sql
ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;
```
## Requirements {#requirements}
1. The [wal_level](https://www.postgresql.org/docs/current/runtime-config-wal.html) setting must have a value `logical` and `max_replication_slots` parameter must have a value at least `2` in the PostgreSQL config file.

View File

@ -2833,6 +2833,43 @@ Possible values:
Default value: `1`.
## output_format_csv_null_representation {#output_format_csv_null_representation}
Defines the representation of `NULL` for [CSV](../../interfaces/formats.md#csv) output format. User can set any string as a value, for example, `My NULL`.
Default value: `\N`.
**Examples**
Query
```sql
SELECT * from csv_custom_null FORMAT CSV;
```
Result
```text
788
\N
\N
```
Query
```sql
SET output_format_csv_null_representation = 'My NULL';
SELECT * FROM csv_custom_null FORMAT CSV;
```
Result
```text
788
My NULL
My NULL
```
## output_format_tsv_null_representation {#output_format_tsv_null_representation}
Defines the representation of `NULL` for [TSV](../../interfaces/formats.md#tabseparated) output format. User can set any string as a value, for example, `My NULL`.
@ -3306,7 +3343,7 @@ Result:
└─────┘
```
## optimize_fuse_sum_count_avg {#optimize_fuse_sum_count_avg}
## optimize_syntax_fuse_functions {#optimize_syntax_fuse_functions}
Enables to fuse aggregate functions with identical argument. It rewrites query contains at least two aggregate functions from [sum](../../sql-reference/aggregate-functions/reference/sum.md#agg_function-sum), [count](../../sql-reference/aggregate-functions/reference/count.md#agg_function-count) or [avg](../../sql-reference/aggregate-functions/reference/avg.md#agg_function-avg) with identical argument to [sumCount](../../sql-reference/aggregate-functions/reference/sumcount.md#agg_function-sumCount).
@ -3323,7 +3360,7 @@ Query:
``` sql
CREATE TABLE fuse_tbl(a Int8, b Int8) Engine = Log;
SET optimize_fuse_sum_count_avg = 1;
SET optimize_syntax_fuse_functions = 1;
EXPLAIN SYNTAX SELECT sum(a), sum(b), count(b), avg(b) from fuse_tbl FORMAT TSV;
```

View File

@ -43,4 +43,4 @@ Result:
**See also**
- [optimize_fuse_sum_count_avg](../../../operations/settings/settings.md#optimize_fuse_sum_count_avg) setting.
- [optimize_syntax_fuse_functions](../../../operations/settings/settings.md#optimize_syntax_fuse_functions) setting.

View File

@ -1 +0,0 @@
../../en/development/cmake-in-clickhouse.md

View File

@ -1 +0,0 @@
../../en/development/cmake-in-clickhouse.md

View File

@ -94,7 +94,7 @@ ClickHouse Keeper может использоваться как равноце
## Как запустить
ClickHouse Keeper входит в пакет` clickhouse-server`, просто добавьте кофигурацию `<keeper_server>` и запустите сервер ClickHouse как обычно. Если вы хотите запустить ClickHouse Keeper автономно, сделайте это аналогичным способом:
ClickHouse Keeper входит в пакет `clickhouse-server`, просто добавьте кофигурацию `<keeper_server>` и запустите сервер ClickHouse как обычно. Если вы хотите запустить ClickHouse Keeper автономно, сделайте это аналогичным способом:
```bash
clickhouse-keeper --config /etc/your_path_to_config/config.xml --daemon

View File

@ -3122,7 +3122,7 @@ SELECT * FROM test LIMIT 10 OFFSET 100;
Значение по умолчанию: `1800`.
## optimize_fuse_sum_count_avg {#optimize_fuse_sum_count_avg}
## optimize_syntax_fuse_functions {#optimize_syntax_fuse_functions}
Позволяет объединить агрегатные функции с одинаковым аргументом. Запрос, содержащий по крайней мере две агрегатные функции: [sum](../../sql-reference/aggregate-functions/reference/sum.md#agg_function-sum), [count](../../sql-reference/aggregate-functions/reference/count.md#agg_function-count) или [avg](../../sql-reference/aggregate-functions/reference/avg.md#agg_function-avg) с одинаковым аргументом, перезаписывается как [sumCount](../../sql-reference/aggregate-functions/reference/sumcount.md#agg_function-sumCount).
@ -3139,7 +3139,7 @@ SELECT * FROM test LIMIT 10 OFFSET 100;
``` sql
CREATE TABLE fuse_tbl(a Int8, b Int8) Engine = Log;
SET optimize_fuse_sum_count_avg = 1;
SET optimize_syntax_fuse_functions = 1;
EXPLAIN SYNTAX SELECT sum(a), sum(b), count(b), avg(b) from fuse_tbl FORMAT TSV;
```

View File

@ -43,4 +43,4 @@ SELECT sumCount(x) from s_table;
**Смотрите также**
- Настройка [optimize_fuse_sum_count_avg](../../../operations/settings/settings.md#optimize_fuse_sum_count_avg)
- Настройка [optimize_syntax_fuse_functions](../../../operations/settings/settings.md#optimize_syntax_fuse_functions)

View File

@ -51,7 +51,7 @@ def build_for_lang(lang, args):
if args.htmlproofer:
plugins.append('htmlproofer')
website_url = 'https://clickhouse.tech'
website_url = 'https://clickhouse.com'
site_name = site_names.get(lang, site_names['en'])
blog_nav, post_meta = nav.build_blog_nav(lang, args)
raw_config = dict(
@ -62,7 +62,7 @@ def build_for_lang(lang, args):
strict=True,
theme=theme_cfg,
nav=blog_nav,
copyright='©20162021 Yandex LLC',
copyright='©20162021 ClickHouse, Inc.',
use_directory_urls=True,
repo_name='ClickHouse/ClickHouse',
repo_url='https://github.com/ClickHouse/ClickHouse/',

View File

@ -203,6 +203,7 @@ if __name__ == '__main__':
arg_parser.add_argument('--verbose', action='store_true')
args = arg_parser.parse_args()
args.minify = False # TODO remove
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,

View File

@ -155,6 +155,12 @@ def generate_cmake_flags_files() -> None:
with open(footer_file_name, "r") as footer:
f.write(footer.read())
other_languages = ["docs/ja/development/cmake-in-clickhouse.md",
"docs/zh/development/cmake-in-clickhouse.md",
"docs/ru/development/cmake-in-clickhouse.md"]
for lang in other_languages:
os.symlink(output_file_name, os.path.join(root_path, lang))
if __name__ == '__main__':
generate_cmake_flags_files()

View File

@ -219,7 +219,10 @@ def build_single_page_version(lang, args, nav, cfg):
]
logging.info(' '.join(create_pdf_command))
subprocess.check_call(' '.join(create_pdf_command), shell=True)
try:
subprocess.check_call(' '.join(create_pdf_command), shell=True)
except:
pass # TODO: fix pdf issues
logging.info(f'Finished building single page version for {lang}')

View File

@ -215,10 +215,12 @@ def minify_file(path, css_digest, js_digest):
content = minify_html(content)
content = content.replace('base.css?css_digest', f'base.css?{css_digest}')
content = content.replace('base.js?js_digest', f'base.js?{js_digest}')
elif path.endswith('.css'):
content = cssmin.cssmin(content)
elif path.endswith('.js'):
content = jsmin.jsmin(content)
# TODO: restore cssmin
# elif path.endswith('.css'):
# content = cssmin.cssmin(content)
# TODO: restore jsmin
# elif path.endswith('.js'):
# content = jsmin.jsmin(content)
with open(path, 'wb') as f:
f.write(content.encode('utf-8'))
@ -240,7 +242,7 @@ def minify_website(args):
js_in = get_js_in(args)
js_out = f'{args.output_dir}/js/base.js'
if args.minify:
if args.minify and False: # TODO: return closure
js_in = [js[1:-1] for js in js_in]
closure_args = [
'--js', *js_in, '--js_output_file', js_out,

View File

@ -1 +0,0 @@
../../en/development/cmake-in-clickhouse.md

View File

@ -1964,7 +1964,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
}
catch (...)
{
LOG_WARNING(log, "Seemns like node with address {} is unreachable.", node.host_name);
LOG_WARNING(log, "Node with address {} seems to be unreachable.", node.host_name);
continue;
}

View File

@ -306,6 +306,7 @@ try
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
loadMetadata(global_context);
startupSystemTables();
DatabaseCatalog::instance().loadDatabases();
LOG_DEBUG(log, "Loaded metadata.");
}

View File

@ -918,7 +918,7 @@ if (ThreadFuzzer::instance().isEffective())
global_context,
settings.async_insert_threads,
settings.async_insert_max_data_size,
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout}));
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout_ms, .stale = settings.async_insert_stale_timeout_ms}));
/// Size of cache for marks (index of MergeTree family of tables). It is mandatory.
size_t mark_cache_size = config().getUInt64("mark_cache_size");
@ -1116,6 +1116,7 @@ if (ThreadFuzzer::instance().isEffective())
database_catalog.loadMarkedAsDroppedTables();
/// Then, load remaining databases
loadMetadata(global_context, default_database);
startupSystemTables();
database_catalog.loadDatabases();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);

View File

@ -72,7 +72,10 @@ enum class AccessType
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", DATABASE, ALTER_DATABASE) /* allows to execute ALTER MODIFY SETTING */\
\
M(ALTER_TABLE, "", GROUP, ALTER) \
M(ALTER_DATABASE, "", GROUP, ALTER) \
\
M(ALTER_VIEW_REFRESH, "ALTER LIVE VIEW REFRESH, REFRESH VIEW", VIEW, ALTER_VIEW) \
M(ALTER_VIEW_MODIFY_QUERY, "ALTER TABLE MODIFY QUERY", VIEW, ALTER_VIEW) \

View File

@ -160,26 +160,29 @@ namespace
if (args.size() <= db_name_index)
return;
String db_name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
String name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
String table_name;
size_t table_name_index = static_cast<size_t>(-1);
size_t dot = String::npos;
if (function.name != "Distributed")
dot = db_name.find('.');
if (dot != String::npos)
{
table_name = db_name.substr(dot + 1);
db_name.resize(dot);
}
QualifiedTableName qualified_name;
if (function.name == "Distributed")
qualified_name.table = name;
else
qualified_name = QualifiedTableName::parseFromString(name);
if (qualified_name.database.empty())
{
std::swap(qualified_name.database, qualified_name.table);
table_name_index = 2;
if (args.size() <= table_name_index)
return;
table_name = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
qualified_name.table = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
}
const String & db_name = qualified_name.database;
const String & table_name = qualified_name.table;
if (db_name.empty() || table_name.empty())
return;

View File

@ -586,6 +586,8 @@
M(616, UNKNOWN_READ_METHOD) \
M(617, LZ4_ENCODER_FAILED) \
M(618, LZ4_DECODER_FAILED) \
M(619, POSTGRESQL_REPLICATION_INTERNAL_ERROR) \
M(620, QUERY_NOT_ALLOWED) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -128,6 +128,9 @@ void ThreadFuzzer::initConfiguration()
bool ThreadFuzzer::isEffective() const
{
if (!isStarted())
return false;
#if THREAD_FUZZER_WRAP_PTHREAD
# define CHECK_WRAPPER_PARAMS(RET, NAME, ...) \
if (NAME##_before_yield_probability.load(std::memory_order_relaxed)) \
@ -159,6 +162,20 @@ bool ThreadFuzzer::isEffective() const
|| (sleep_probability > 0 && sleep_time_us > 0));
}
void ThreadFuzzer::stop()
{
started.store(false, std::memory_order_relaxed);
}
void ThreadFuzzer::start()
{
started.store(true, std::memory_order_relaxed);
}
bool ThreadFuzzer::isStarted()
{
return started.load(std::memory_order_relaxed);
}
static void injection(
double yield_probability,
@ -166,6 +183,10 @@ static void injection(
double sleep_probability,
double sleep_time_us [[maybe_unused]])
{
DENY_ALLOCATIONS_IN_SCOPE;
if (!ThreadFuzzer::isStarted())
return;
if (yield_probability > 0
&& std::bernoulli_distribution(yield_probability)(thread_local_rng))
{

View File

@ -1,6 +1,6 @@
#pragma once
#include <cstdint>
#include <atomic>
namespace DB
{
@ -54,6 +54,10 @@ public:
bool isEffective() const;
static void stop();
static void start();
static bool isStarted();
private:
uint64_t cpu_time_period_us = 0;
double yield_probability = 0;
@ -61,6 +65,8 @@ private:
double sleep_probability = 0;
double sleep_time_us = 0;
inline static std::atomic<bool> started{true};
ThreadFuzzer();
void initConfiguration();

View File

@ -10,6 +10,7 @@
#include <common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <unistd.h>
@ -277,6 +278,8 @@ public:
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
UInt32 getSessionUptime() const { return session_uptime.elapsedSeconds(); }
private:
friend class EphemeralNodeHolder;
@ -307,6 +310,8 @@ private:
Poco::Logger * log = nullptr;
std::shared_ptr<DB::ZooKeeperLog> zk_log;
AtomicStopwatch session_uptime;
};

View File

@ -575,12 +575,24 @@ void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string
LOG_INFO(log, "Totally have {} logs", existing_logs.size());
for (auto [zxid, log_path] : existing_logs)
std::vector<std::string> stored_files;
for (auto it = existing_logs.rbegin(); it != existing_logs.rend(); ++it)
{
if (zxid > storage.zxid)
deserializeLogAndApplyToStorage(storage, log_path, log);
else
LOG_INFO(log, "Skipping log {}, it's ZXID {} is smaller than storages ZXID {}", log_path, zxid, storage.zxid);
if (it->first >= storage.zxid)
{
stored_files.emplace_back(it->second);
}
else if (it->first < storage.zxid)
{
/// add the last logfile that is less than the zxid
stored_files.emplace_back(it->second);
break;
}
}
for (auto it = stored_files.rbegin(); it != stored_files.rend(); ++it)
{
deserializeLogAndApplyToStorage(storage, *it, log);
}
}

View File

@ -1,9 +1,9 @@
#include "Connection.h"
#if USE_LIBPQXX
#include <common/logger_useful.h>
namespace postgres
{
@ -42,7 +42,6 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
pqxx::connection & Connection::getRef()
{
connect();
assert(connection != nullptr);
return *connection;
}

View File

@ -19,6 +19,16 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S
return std::make_pair(out.str(), host + ':' + DB::toString(port));
}
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name)
{
/// Logger for StorageMaterializedPostgreSQL - both db and table names.
/// Logger for PostgreSQLReplicationHandler and Consumer - either both db and table names or only db name.
assert(!postgres_database_name.empty());
if (postgres_table_name.empty())
return postgres_database_name;
return postgres_database_name + '.' + postgres_table_name;
}
}
#endif

View File

@ -19,7 +19,11 @@ namespace pqxx
namespace postgres
{
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name);
}
#endif

View File

@ -2,11 +2,20 @@
#include <string>
#include <tuple>
#include <optional>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <fmt/format.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
//TODO replace with StorageID
struct QualifiedTableName
{
@ -30,6 +39,46 @@ struct QualifiedTableName
hash_state.update(table.data(), table.size());
return hash_state.get64();
}
/// NOTE: It's different from compound identifier parsing and does not support escaping and dots in name.
/// Usually it's better to use ParserIdentifier instead,
/// but we parse DDL dictionary name (and similar things) this way for historical reasons.
static std::optional<QualifiedTableName> tryParseFromString(const String & maybe_qualified_name)
{
if (maybe_qualified_name.empty())
return {};
/// Do not allow dot at the beginning and at the end
auto pos = maybe_qualified_name.find('.');
if (pos == 0 || pos == (maybe_qualified_name.size() - 1))
return {};
QualifiedTableName name;
if (pos == std::string::npos)
{
name.table = std::move(maybe_qualified_name);
}
else if (maybe_qualified_name.find('.', pos + 1) != std::string::npos)
{
/// Do not allow multiple dots
return {};
}
else
{
name.database = maybe_qualified_name.substr(0, pos);
name.table = maybe_qualified_name.substr(pos + 1);
}
return name;
}
static QualifiedTableName parseFromString(const String & maybe_qualified_name)
{
auto name = tryParseFromString(maybe_qualified_name);
if (!name)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Invalid qualified name: {}", maybe_qualified_name);
return *name;
}
};
}
@ -47,5 +96,23 @@ template <> struct hash<DB::QualifiedTableName>
return qualified_table.hash();
}
};
}
namespace fmt
{
template <>
struct formatter<DB::QualifiedTableName>
{
constexpr auto parse(format_parse_context & ctx)
{
return ctx.begin();
}
template <typename FormatContext>
auto format(const DB::QualifiedTableName & name, FormatContext & ctx)
{
return format_to(ctx.out(), "{}.{}", DB::backQuoteIfNeed(name.database), DB::backQuoteIfNeed(name.table));
}
};
}

View File

@ -455,7 +455,7 @@ class IColumn;
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, enable_global_with_statement, true, "Propagate WITH statements to UNION queries and all subqueries", 0) \
M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
M(Bool, optimize_fuse_sum_count_avg, false, "Fuse aggregate functions sum(), avg(), count() with identical arguments into one sumCount() call, if the query has at least two different functions", 0) \
M(Bool, optimize_syntax_fuse_functions, false, "Fuse aggregate functions (`sum, avg, count` with identical arguments into one `sumCount`, quantile-family functions with the same argument into `quantiles*(...)[...]`)", 0) \
M(Bool, flatten_nested, true, "If true, columns of type Nested will be flatten to separate array columns instead of one array of tuples", 0) \
M(Bool, asterisk_include_materialized_columns, false, "Include MATERIALIZED columns for wildcard query", 0) \
M(Bool, asterisk_include_alias_columns, false, "Include ALIAS columns for wildcard query", 0) \
@ -507,6 +507,14 @@ class IColumn;
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 100000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
\
M(Int64, remote_disk_read_backoff_threashold, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(Int64, remote_disk_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
\
@ -525,6 +533,7 @@ class IColumn;
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "Obsolete setting, does nothing.", 0) \
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing.", 0) \
M(UInt64, replication_alter_columns_timeout, 60, "Obsolete setting, does nothing.", 0) \
M(Bool, optimize_fuse_sum_count_avg, false, "Obsolete, use optimize_syntax_fuse_functions", 0) \
/** The section above is for obsolete settings. Do not add anything there. */
@ -577,6 +586,7 @@ class IColumn;
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(Bool, output_format_tsv_crlf_end_of_line, false, "If it is set true, end of line in TSV format will be \\r\\n instead of \\n.", 0) \
M(String, output_format_csv_null_representation, "\\N", "Custom NULL representation in CSV format", 0) \
M(String, output_format_tsv_null_representation, "\\N", "Custom NULL representation in TSV format", 0) \
M(Bool, output_format_decimal_trailing_zeros, false, "Output trailing zeros when printing Decimal values. E.g. 1.230000 instead of 1.23.", 0) \
\
@ -605,14 +615,6 @@ class IColumn;
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
\
M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \

View File

@ -327,7 +327,7 @@ void SerializationNullable::serializeTextCSV(const IColumn & column, size_t row_
const ColumnNullable & col = assert_cast<const ColumnNullable &>(column);
if (col.isNullAt(row_num))
writeCString("\\N", ostr);
writeString(settings.csv.null_representation, ostr);
else
nested->serializeTextCSV(col.getNestedColumn(), row_num, ostr, settings);
}

View File

@ -0,0 +1,103 @@
#include <Databases/DDLDependencyVisitor.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Poco/String.h>
namespace DB
{
void DDLDependencyVisitor::visit(const ASTPtr & ast, Data & data)
{
/// Looking for functions in column default expressions and dictionary source definition
if (const auto * function = ast->as<ASTFunction>())
visit(*function, data);
else if (const auto * dict_source = ast->as<ASTFunctionWithKeyValueArguments>())
visit(*dict_source, data);
}
bool DDLDependencyVisitor::needChildVisit(const ASTPtr & node, const ASTPtr & /*child*/)
{
return !node->as<ASTStorage>();
}
void DDLDependencyVisitor::visit(const ASTFunction & function, Data & data)
{
if (function.name == "joinGet" ||
function.name == "dictHas" ||
function.name == "dictIsIn" ||
function.name.starts_with("dictGet"))
{
extractTableNameFromArgument(function, data, 0);
}
else if (Poco::toLower(function.name) == "in")
{
extractTableNameFromArgument(function, data, 1);
}
}
void DDLDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments & dict_source, Data & data)
{
if (dict_source.name != "clickhouse")
return;
if (!dict_source.elements)
return;
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
return;
if (info->table_name.database.empty())
info->table_name.database = data.default_database;
data.dependencies.emplace(std::move(info->table_name));
}
void DDLDependencyVisitor::extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx)
{
/// Just ignore incorrect arguments, proper exception will be thrown later
if (!function.arguments || function.arguments->children.size() <= arg_idx)
return;
QualifiedTableName qualified_name;
const auto * arg = function.arguments->as<ASTExpressionList>()->children[arg_idx].get();
if (const auto * literal = arg->as<ASTLiteral>())
{
if (literal->value.getType() != Field::Types::String)
return;
auto maybe_qualified_name = QualifiedTableName::tryParseFromString(literal->value.get<String>());
/// Just return if name if invalid
if (!maybe_qualified_name)
return;
qualified_name = std::move(*maybe_qualified_name);
}
else if (const auto * identifier = arg->as<ASTIdentifier>())
{
auto table_identifier = identifier->createTable();
/// Just return if table identified is invalid
if (!table_identifier)
return;
qualified_name.database = table_identifier->getDatabaseName();
qualified_name.table = table_identifier->shortName();
}
else
{
assert(false);
return;
}
if (qualified_name.database.empty())
qualified_name.database = data.default_database;
data.dependencies.emplace(std::move(qualified_name));
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <Core/QualifiedTableName.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
class ASTFunction;
class ASTFunctionWithKeyValueArguments;
/// Visits ASTCreateQuery and extracts names of table (or dictionary) dependencies
/// from column default expressions (joinGet, dictGet, etc)
/// or dictionary source (for dictionaries from local ClickHouse table).
/// Does not validate AST, works a best-effort way.
class DDLDependencyVisitor
{
public:
struct Data
{
using TableNamesSet = std::set<QualifiedTableName>;
String default_database;
TableNamesSet dependencies;
ContextPtr global_context;
ASTPtr create_query;
};
using Visitor = ConstInDepthNodeVisitor<DDLDependencyVisitor, true>;
static void visit(const ASTPtr & ast, Data & data);
static bool needChildVisit(const ASTPtr & node, const ASTPtr & child);
private:
static void visit(const ASTFunction & function, Data & data);
static void visit(const ASTFunctionWithKeyValueArguments & dict_source, Data & data);
static void extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx);
};
using TableLoadingDependenciesVisitor = DDLDependencyVisitor::Visitor;
}

View File

@ -416,40 +416,49 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
return UUIDHelpers::Nil;
}
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, bool force_restore, bool /*force_attach*/)
{
if (!force_restore)
return;
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
if (has_force_restore_data_flag)
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
{
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
if (!fs::is_symlink(table_path))
{
if (!fs::is_symlink(table_path))
{
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
}
fs::remove(table_path);
}
}
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
if (has_force_restore_data_flag)
{
NameToPathMap table_names;
{
std::lock_guard lock{mutex};
table_names = table_name_to_path;
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
}
fs::create_directories(path_to_table_symlinks);
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second, true);
fs::remove(table_path);
}
}
/// Loads all stored table metadata for this Atomic database.
/// Runs the Atomic-specific preparation step first (beforeLoadingMetadata removes
/// broken table-data symlinks when force_restore is set), then delegates the actual
/// metadata loading to the DatabaseOrdinary implementation.
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
beforeLoadingMetadata(local_context, force_restore, force_attach);
DatabaseOrdinary::loadStoredObjects(local_context, force_restore, force_attach, skip_startup_tables);
}
/// Starts up all tables (delegating to DatabaseOrdinary::startupTables) and, only
/// when force_restore is set, recreates the symlinks to the tables' data directories
/// (they were removed in beforeLoadingMetadata in that case).
void DatabaseAtomic::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseOrdinary::startupTables(thread_pool, force_restore, force_attach);
if (!force_restore)
return;
/// Copy the name -> data-path map under the mutex so the filesystem work below
/// is done without holding the lock.
NameToPathMap table_names;
{
std::lock_guard lock{mutex};
table_names = table_name_to_path;
}
fs::create_directories(path_to_table_symlinks);
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second, true);
}
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)
{
try

View File

@ -47,7 +47,11 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void beforeLoadingMetadata(ContextMutablePtr context, bool force_restore, bool force_attach) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
/// Atomic database cannot be detached if there is detached table which still in use
void assertCanBeDetached(bool cleanup) override;

View File

@ -103,13 +103,20 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
static const std::unordered_set<std::string_view> database_engines{"Ordinary", "Atomic", "Memory",
"Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL",
"PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
if (!database_engines.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name);
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name);
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
@ -117,8 +124,8 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
"Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name);
if (engine_name == "Ordinary")
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
@ -176,12 +183,12 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (create.uuid == UUIDHelpers::Nil)
return std::make_shared<DatabaseMaterializedMySQL<DatabaseOrdinary>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
context, database_name, metadata_path, uuid, mysql_database_name,
std::move(mysql_pool), std::move(client), std::move(materialize_mode_settings));
else
return std::make_shared<DatabaseMaterializedMySQL<DatabaseAtomic>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
context, database_name, metadata_path, uuid, mysql_database_name,
std::move(mysql_pool), std::move(client), std::move(materialize_mode_settings));
}
catch (...)
{
@ -304,7 +311,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, engine_define, create.attach,
context, metadata_path, uuid, create.attach,
database_name, postgres_database_name, connection_info,
std::move(postgresql_replica_settings));
}

View File

@ -36,7 +36,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
void DatabaseLazy::loadStoredObjects(
ContextMutablePtr local_context, bool /* has_force_restore_data_flag */, bool /*force_attach*/, bool /* skip_startup_tables */)
ContextMutablePtr local_context, bool /* force_restore */, bool /*force_attach*/, bool /* skip_startup_tables */)
{
iterateMetadataFiles(local_context, [this](const String & file_name)
{

View File

@ -26,7 +26,7 @@ public:
bool canContainDistributedTables() const override { return false; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void createTable(
ContextPtr context,

View File

@ -46,7 +46,7 @@ std::pair<String, StoragePtr> createTableFromAST(
const String & database_name,
const String & table_data_path_relative,
ContextMutablePtr context,
bool has_force_restore_data_flag)
bool force_restore)
{
ast_create_query.attach = true;
ast_create_query.database = database_name;
@ -88,7 +88,7 @@ std::pair<String, StoragePtr> createTableFromAST(
context->getGlobalContext(),
columns,
constraints,
has_force_restore_data_flag)
force_restore)
};
}
@ -699,4 +699,55 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromMetadata(const String & database_metada
return ast;
}
/// Persists the given settings changes into the database's on-disk metadata file
/// (metadata/<db>.sql). The changes are merged into the SETTINGS clause of the
/// stored create query: existing entries are updated in place, new ones appended,
/// and the clause is created from scratch if the query had none. The file is then
/// rewritten atomically (write to a ".sql.tmp" file, then rename over the original).
void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context)
{
/// Serialize concurrent metadata modifications.
std::lock_guard lock(modify_settings_mutex);
auto create_query = getCreateDatabaseQuery()->clone();
auto * create = create_query->as<ASTCreateQuery>();
auto * settings = create->storage->settings;
if (settings)
{
/// Merge the changes into the existing SETTINGS clause.
auto & storage_settings = settings->changes;
for (const auto & change : settings_changes)
{
auto it = std::find_if(storage_settings.begin(), storage_settings.end(),
[&](const auto & prev){ return prev.name == change.name; });
if (it != storage_settings.end())
it->value = change.value;
else
storage_settings.push_back(change);
}
}
else
{
/// No SETTINGS clause yet - build one from the requested changes.
auto storage_settings = std::make_shared<ASTSetQuery>();
storage_settings->is_standalone = false;
storage_settings->changes = settings_changes;
create->storage->set(create->storage->settings, storage_settings->clone());
}
/// Metadata files store ATTACH-style queries.
create->attach = true;
create->if_not_exists = false;
WriteBufferFromOwnString statement_buf;
formatAST(*create, statement_buf, false);
writeChar('\n', statement_buf);
String statement = statement_buf.str();
String database_name_escaped = escapeForFileName(database_name);
fs::path metadata_root_path = fs::canonical(query_context->getGlobalContext()->getPath());
fs::path metadata_file_tmp_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql.tmp");
fs::path metadata_file_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql");
/// O_EXCL: fail instead of silently reusing a leftover temporary file.
WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (getContext()->getSettingsRef().fsync_metadata)
out.sync();
out.close();
/// Atomic replace of the old metadata file.
fs::rename(metadata_file_tmp_path, metadata_file_path);
}
}

View File

@ -16,7 +16,7 @@ std::pair<String, StoragePtr> createTableFromAST(
const String & database_name,
const String & table_data_path_relative,
ContextMutablePtr context,
bool has_force_restore_data_flag);
bool force_restore);
/** Get the string with the table definition based on the CREATE query.
* It is an ATTACH query that you can execute to create a table from the correspondent database.
@ -74,6 +74,8 @@ public:
void checkMetadataFilenameAvailability(const String & to_table_name) const;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const;
void modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context);
protected:
static constexpr const char * create_suffix = ".tmp";
static constexpr const char * drop_suffix = ".tmp_drop";
@ -97,6 +99,9 @@ protected:
const String metadata_path;
const String data_path;
/// For alter settings.
std::mutex modify_settings_mutex;
};
}

View File

@ -4,6 +4,8 @@
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabasesCommon.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/TablesLoader.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -27,8 +29,6 @@ namespace fs = std::filesystem;
namespace DB
{
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
namespace
@ -39,7 +39,7 @@ namespace
DatabaseOrdinary & database,
const String & database_name,
const String & metadata_path,
bool has_force_restore_data_flag)
bool force_restore)
{
try
{
@ -48,7 +48,7 @@ namespace
database_name,
database.getTableDataPath(query),
context,
has_force_restore_data_flag);
force_restore);
database.attachTable(table_name, table, database.getTableDataPath(query));
}
@ -60,15 +60,6 @@ namespace
throw;
}
}
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, "{}%", processed * 100.0 / total);
watch.restart();
}
}
}
@ -84,20 +75,88 @@ DatabaseOrdinary::DatabaseOrdinary(
}
void DatabaseOrdinary::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/, bool skip_startup_tables)
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
which does not correspond to the order of table creation and does not correspond to the order of their location on disk.
*/
using FileNames = std::map<std::string, ASTPtr>;
std::mutex file_names_mutex;
FileNames file_names;
size_t total_dictionaries = 0;
ParsedTablesMetadata metadata;
loadTablesMetadata(local_context, metadata);
auto process_metadata = [&file_names, &total_dictionaries, &file_names_mutex, this](
const String & file_name)
size_t total_tables = metadata.parsed_tables.size() - metadata.total_dictionaries;
AtomicStopwatch watch;
std::atomic<size_t> dictionaries_processed{0};
std::atomic<size_t> tables_processed{0};
ThreadPool pool;
/// We must attach dictionaries before attaching tables
/// because while we're attaching tables we may need to have some dictionaries attached
/// (for example, dictionaries can be used in the default expressions for some tables).
/// On the other hand we can attach any dictionary (even sourced from ClickHouse table)
/// without having any tables attached. It is so because attaching of a dictionary means
/// loading of its config only, it doesn't involve loading the dictionary itself.
/// Attach dictionaries.
for (const auto & name_with_path_and_query : metadata.parsed_tables)
{
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++dictionaries_processed, metadata.total_dictionaries, watch);
});
}
}
pool.wait();
/// Attach tables.
for (const auto & name_with_path_and_query : metadata.parsed_tables)
{
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
if (!skip_startup_tables)
{
/// After all tables was basically initialized, startup them.
startupTables(pool, force_restore, force_attach);
}
}
void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTablesMetadata & metadata)
{
size_t prev_tables_count = metadata.parsed_tables.size();
size_t prev_total_dictionaries = metadata.total_dictionaries;
auto process_metadata = [&metadata, this](const String & file_name)
{
fs::path path(getMetadataPath());
fs::path file_path(file_name);
@ -122,9 +181,29 @@ void DatabaseOrdinary::loadStoredObjects(
return;
}
std::lock_guard lock{file_names_mutex};
file_names[file_name] = ast;
total_dictionaries += create_query->is_dictionary;
TableLoadingDependenciesVisitor::Data data;
data.default_database = metadata.default_database;
data.create_query = ast;
data.global_context = getContext();
TableLoadingDependenciesVisitor visitor{data};
visitor.visit(ast);
QualifiedTableName qualified_name{database_name, create_query->table};
std::lock_guard lock{metadata.mutex};
metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
if (data.dependencies.empty())
{
metadata.independent_database_objects.emplace_back(std::move(qualified_name));
}
else
{
for (const auto & dependency : data.dependencies)
{
metadata.dependencies_info[dependency].dependent_database_objects.push_back(qualified_name);
++metadata.dependencies_info[qualified_name].dependencies_count;
}
}
metadata.total_dictionaries += create_query->is_dictionary;
}
}
catch (Exception & e)
@ -136,86 +215,29 @@ void DatabaseOrdinary::loadStoredObjects(
iterateMetadataFiles(local_context, process_metadata);
size_t total_tables = file_names.size() - total_dictionaries;
size_t objects_in_database = metadata.parsed_tables.size() - prev_tables_count;
size_t dictionaries_in_database = metadata.total_dictionaries - prev_total_dictionaries;
size_t tables_in_database = objects_in_database - dictionaries_in_database;
LOG_INFO(log, "Total {} tables and {} dictionaries.", total_tables, total_dictionaries);
AtomicStopwatch watch;
std::atomic<size_t> tables_processed{0};
ThreadPool pool;
/// We must attach dictionaries before attaching tables
/// because while we're attaching tables we may need to have some dictionaries attached
/// (for example, dictionaries can be used in the default expressions for some tables).
/// On the other hand we can attach any dictionary (even sourced from ClickHouse table)
/// without having any tables attached. It is so because attaching of a dictionary means
/// loading of its config only, it doesn't involve loading the dictionary itself.
/// Attach dictionaries.
for (const auto & name_with_query : file_names)
{
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
if (create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
tryAttachTable(
local_context,
create_query,
*this,
database_name,
getMetadataPath() + name_with_query.first,
has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
/// Attach tables.
for (const auto & name_with_query : file_names)
{
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
tryAttachTable(
local_context,
create_query,
*this,
database_name,
getMetadataPath() + name_with_query.first,
has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
if (!skip_startup_tables)
{
/// After all tables was basically initialized, startup them.
startupTablesImpl(pool);
}
LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.",
database_name, tables_in_database, dictionaries_in_database);
}
void DatabaseOrdinary::startupTables()
void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore)
{
ThreadPool pool;
startupTablesImpl(pool);
assert(name.database == database_name);
const auto & create_query = ast->as<const ASTCreateQuery &>();
tryAttachTable(
local_context,
create_query,
*this,
name.database,
file_path,
force_restore);
}
void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_restore*/, bool /*force_attach*/)
{
LOG_INFO(log, "Starting up tables.");
@ -240,6 +262,7 @@ void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
}
catch (...)
{
/// We have to wait for jobs to finish here, because job function has reference to variables on the stack of current thread.
thread_pool.wait();
throw;
}

View File

@ -16,13 +16,20 @@ class DatabaseOrdinary : public DatabaseOnDisk
public:
DatabaseOrdinary(const String & name_, const String & metadata_path_, ContextPtr context);
DatabaseOrdinary(
const String & name_, const String & metadata_path_, const String & data_path_, const String & logger, ContextPtr context_);
const String & name_, const String & metadata_path_, const String & data_path_,
const String & logger, ContextPtr context_);
String getEngineName() const override { return "Ordinary"; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void startupTables() override;
bool supportsLoadingInTopologicalOrder() const override { return true; }
void loadTablesMetadata(ContextPtr context, ParsedTablesMetadata & metadata) override;
void loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void alterTable(
ContextPtr context,
@ -36,8 +43,6 @@ protected:
const String & table_metadata_path,
const String & statement,
ContextPtr query_context);
void startupTablesImpl(ThreadPool & thread_pool);
};
}

View File

@ -305,13 +305,21 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
createEmptyLogEntry(current_zookeeper);
}
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
/// Pre-load hook: establish the ZooKeeper connection and initialize this database's
/// ZooKeeper nodes before any table metadata is loaded.
void DatabaseReplicated::beforeLoadingMetadata(ContextMutablePtr /*context*/, bool /*force_restore*/, bool force_attach)
{
tryConnectToZooKeeperAndInitDatabase(force_attach);
}
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
/// Loads stored table metadata: first runs the Replicated-specific preparation
/// (ZooKeeper connection/initialization), then delegates to DatabaseAtomic.
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
beforeLoadingMetadata(local_context, force_restore, force_attach);
DatabaseAtomic::loadStoredObjects(local_context, force_restore, force_attach, skip_startup_tables);
}
/// Starts up tables via DatabaseAtomic, then creates and starts the
/// DatabaseReplicatedDDLWorker for this database.
void DatabaseReplicated::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
ddl_worker->startup();
}

View File

@ -57,7 +57,12 @@ public:
void drop(ContextPtr /*context*/) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void beforeLoadingMetadata(ContextMutablePtr context, bool force_restore, bool force_attach) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void shutdown() override;
friend struct DatabaseReplicatedTask;

View File

@ -5,6 +5,7 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Core/UUID.h>
#include <ctime>
@ -24,12 +25,17 @@ struct IndicesDescription;
struct StorageInMemoryMetadata;
struct StorageID;
class ASTCreateQuery;
class AlterCommands;
class SettingsChanges;
using DictionariesWithID = std::vector<std::pair<String, UUID>>;
struct ParsedTablesMetadata;
struct QualifiedTableName;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int LOGICAL_ERROR;
}
class IDatabaseTablesIterator
@ -125,13 +131,32 @@ public:
/// You can call only once, right after the object is created.
virtual void loadStoredObjects(
ContextMutablePtr /*context*/,
bool /*has_force_restore_data_flag*/,
bool /*force_restore*/,
bool /*force_attach*/ = false,
bool /* skip_startup_tables */ = false)
{
}
virtual void startupTables() {}
virtual bool supportsLoadingInTopologicalOrder() const { return false; }
virtual void beforeLoadingMetadata(
ContextMutablePtr /*context*/,
bool /*force_restore*/,
bool /*force_attach*/)
{
}
virtual void loadTablesMetadata(ContextPtr /*local_context*/, ParsedTablesMetadata & /*metadata*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
}
virtual void loadTableFromMetadata(ContextMutablePtr /*local_context*/, const String & /*file_path*/, const QualifiedTableName & /*name*/, const ASTPtr & /*ast*/, bool /*force_restore*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
}
virtual void startupTables(ThreadPool & /*thread_pool*/, bool /*force_restore*/, bool /*force_attach*/) {}
/// Check the existence of the table.
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;
@ -280,6 +305,13 @@ public:
/// Delete data and metadata stored inside the database, if exists.
virtual void drop(ContextPtr /*context*/) {}
virtual void applySettingsChanges(const SettingsChanges &, ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Database engine {} either does not support settings, or does not support altering settings",
getEngineName());
}
virtual ~IDatabase() = default;
protected:

View File

@ -94,10 +94,10 @@ void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & ex
}
template <typename Base>
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(
ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseMaterializedMySQL<Base>::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach, skip_startup_tables);
Base::startupTables(thread_pool, force_restore, force_attach);
if (!force_attach)
materialize_thread.assertMySQLAvailable();

View File

@ -43,7 +43,7 @@ protected:
public:
String getEngineName() const override { return "MaterializedMySQL"; }
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;

View File

@ -5,45 +5,47 @@
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <common/logger_useful.h>
#include <Common/Macros.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Common/escapeForFileName.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <Common/Macros.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int QUERY_NOT_ALLOWED;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
ContextPtr context_,
const String & metadata_path_,
UUID uuid_,
const ASTStorage * database_engine_define_,
bool is_attach_,
const String & database_name_,
const String & postgres_database_name,
const postgres::ConnectionInfo & connection_info_,
std::unique_ptr<MaterializedPostgreSQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_)
, database_engine_define(database_engine_define_->clone())
, is_attach(is_attach_)
, remote_database_name(postgres_database_name)
, connection_info(connection_info_)
@ -64,11 +66,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
*settings,
/* is_materialized_postgresql_database = */ true);
postgres::Connection connection(connection_info);
NameSet tables_to_replicate;
try
{
tables_to_replicate = replication_handler->fetchRequiredTables(connection);
tables_to_replicate = replication_handler->fetchRequiredTables();
}
catch (...)
{
@ -87,12 +88,12 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
if (storage)
{
/// Nested table was already created and synchronized.
storage = StorageMaterializedPostgreSQL::create(storage, getContext());
storage = StorageMaterializedPostgreSQL::create(storage, getContext(), remote_database_name, table_name);
}
else
{
/// Nested table does not exist and will be created by replication thread.
storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext());
storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
}
/// Cache MaterializedPostgreSQL wrapper over nested table.
@ -107,11 +108,9 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
}
void DatabaseMaterializedPostgreSQL::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseMaterializedPostgreSQL::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
try
{
startSynchronization();
@ -126,6 +125,41 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects(
}
/// Validates and applies ALTER DATABASE ... MODIFY SETTING changes.
/// Only a fixed set of settings is alterable:
///  - materialized_postgresql_tables_list: internal queries only;
///  - materialized_postgresql_allow_automatic_update / materialized_postgresql_max_block_size:
///    forwarded to the replication handler.
/// Any setting unknown to `settings` or outside this set is rejected with an exception.
/// Changes that must survive a restart are persisted via DatabaseOnDisk::modifySettingsMetadata.
void DatabaseMaterializedPostgreSQL::applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context)
{
std::lock_guard lock(handler_mutex);
bool need_update_on_disk = false;
for (const auto & change : settings_changes)
{
if (!settings->has(change.name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} does not support setting `{}`", getEngineName(), change.name);
if ((change.name == "materialized_postgresql_tables_list"))
{
/// The tables list is managed by the engine itself; user queries may not change it.
if (!query_context->isInternalQuery())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Changing setting `{}` is not allowed", change.name);
need_update_on_disk = true;
}
else if ((change.name == "materialized_postgresql_allow_automatic_update") || (change.name == "materialized_postgresql_max_block_size"))
{
replication_handler->setSetting(change);
need_update_on_disk = true;
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown setting");
}
/// Keep the in-memory settings object in sync as well.
settings->applyChange(change);
}
if (need_update_on_disk)
DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context);
}
StoragePtr DatabaseMaterializedPostgreSQL::tryGetTable(const String & name, ContextPtr local_context) const
{
In order to define which table access is needed - to MaterializedPostgreSQL table (only in case of SELECT queries) or
@ -153,6 +187,64 @@ StoragePtr DatabaseMaterializedPostgreSQL::tryGetTable(const String & name, Cont
}
/// `except` is not empty in case it is detach and it will contain only one table name - name of detached table.
/// In case we have a user defined setting `materialized_postgresql_tables_list`, then list of tables is always taken there.
/// Otherwise we traverse materialized storages to find out the list.
/// Builds a comma-separated list of the currently materialized table names,
/// skipping `except` (non-empty when one table is being detached).
String DatabaseMaterializedPostgreSQL::getFormattedTablesList(const String & except) const
{
String tables_list;
for (const auto & table : materialized_tables)
{
if (table.first == except)
continue;
if (!tables_list.empty())
tables_list += ',';
tables_list += table.first;
}
return tables_list;
}
/// Returns a CREATE query for `table_name`. Without a query context the plain
/// DatabaseAtomic implementation is used; otherwise the query is obtained from the
/// replication handler for a freshly created storage wrapper, and gets a newly
/// generated UUID.
ASTPtr DatabaseMaterializedPostgreSQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
if (!local_context->hasQueryContext())
return DatabaseAtomic::getCreateTableQueryImpl(table_name, local_context, throw_on_error);
std::lock_guard lock(handler_mutex);
auto storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
auto ast_storage = replication_handler->getCreateNestedTableQuery(storage.get(), table_name);
assert_cast<ASTCreateQuery *>(ast_storage.get())->uuid = UUIDHelpers::generateV4();
return ast_storage;
}
/// Builds an `ALTER DATABASE <this> MODIFY SETTING <new_setting>` AST
/// for a single setting change.
ASTPtr DatabaseMaterializedPostgreSQL::createAlterSettingsQuery(const SettingChange & new_setting)
{
auto set = std::make_shared<ASTSetQuery>();
set->is_standalone = false;
set->changes = {new_setting};
auto command = std::make_shared<ASTAlterCommand>();
command->type = ASTAlterCommand::Type::MODIFY_DATABASE_SETTING;
command->settings_changes = std::move(set);
auto command_list = std::make_shared<ASTExpressionList>();
command_list->children.push_back(command);
auto query = std::make_shared<ASTAlterQuery>();
auto * alter = query->as<ASTAlterQuery>();
alter->alter_object = ASTAlterQuery::AlterObjectType::DATABASE;
alter->database = database_name;
alter->set(alter->command_list, command_list);
return query;
}
void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
{
/// Create table query can only be called from replication thread.
@ -162,8 +254,123 @@ void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const
return;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Create table query allowed only for ReplacingMergeTree engine and from synchronization thread");
const auto & create = query->as<ASTCreateQuery>();
if (!create->attach)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "CREATE TABLE is not allowed for database engine {}. Use ATTACH TABLE instead", getEngineName());
/// Create ReplacingMergeTree table.
auto query_copy = query->clone();
auto * create_query = assert_cast<ASTCreateQuery *>(query_copy.get());
create_query->attach = false;
create_query->attach_short_syntax = false;
DatabaseAtomic::createTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, table, query_copy);
/// Attach MaterializedPostgreSQL table.
attachTable(table_name, table, {});
}
/// Attach a table to the database.
/// If there is a query context then we attach the materialized storage: the table is added to the
/// `materialized_postgresql_tables_list` setting and registered with the replication handler.
/// If there is no query context (e.g. during startup) then we attach the internal (nested) storage
/// from the atomic database.
void DatabaseMaterializedPostgreSQL::attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path)
{
    if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext())
    {
        auto current_context = Context::createCopy(getContext()->getGlobalContext());
        current_context->setInternalQuery(true);

        /// We just came from createTable() and created nested table there. Add assert.
        auto nested_table = DatabaseAtomic::tryGetTable(table_name, current_context);
        assert(nested_table != nullptr);

        try
        {
            /// If the setting is empty, reconstruct the current list from the materialized storages.
            auto tables_to_replicate = settings->materialized_postgresql_tables_list.value;
            if (tables_to_replicate.empty())
                tables_to_replicate = getFormattedTablesList();

            /// tables_to_replicate can be empty if postgres database had no tables when this database was created.
            SettingChange new_setting("materialized_postgresql_tables_list", tables_to_replicate.empty() ? table_name : (tables_to_replicate + "," + table_name));
            auto alter_query = createAlterSettingsQuery(new_setting);

            /// Persist the extended table list in the database settings.
            InterpreterAlterQuery(alter_query, current_context).execute();

            auto storage = StorageMaterializedPostgreSQL::create(table, getContext(), remote_database_name, table_name);
            materialized_tables[table_name] = storage;
            std::lock_guard lock(handler_mutex);
            replication_handler->addTableToReplication(dynamic_cast<StorageMaterializedPostgreSQL *>(storage.get()), table_name);
        }
        catch (...)
        {
            /// This is a failed attach table. Remove already created nested table.
            DatabaseAtomic::dropTable(current_context, table_name, true);
            throw;
        }
    }
    else
    {
        DatabaseAtomic::attachTable(table_name, table, relative_table_path);
    }
}
/// Detach a table from the database.
/// If there is a query context then we detach the materialized storage: the table is removed from
/// the `materialized_postgresql_tables_list` setting, from replication, and the nested table is dropped.
/// If there is no query context then we detach the internal (nested) storage from the atomic database.
StoragePtr DatabaseMaterializedPostgreSQL::detachTable(const String & table_name)
{
    if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext())
    {
        /// Use find() rather than operator[]: operator[] default-inserts a null StoragePtr for an
        /// unknown table name and would leave that bogus entry in the map after the throw below.
        auto table_it = materialized_tables.find(table_name);
        if (table_it == materialized_tables.end() || !table_it->second)
            throw Exception(ErrorCodes::UNKNOWN_TABLE, "Materialized table `{}` does not exist", table_name);
        auto & table_to_delete = table_it->second;

        auto tables_to_replicate = getFormattedTablesList(table_name);
        /// tables_to_replicate can be empty if postgres database had no tables when this database was created.
        SettingChange new_setting("materialized_postgresql_tables_list", tables_to_replicate);
        auto alter_query = createAlterSettingsQuery(new_setting);

        /// Persist the shrunken table list in the database settings.
        {
            auto current_context = Context::createCopy(getContext()->getGlobalContext());
            current_context->setInternalQuery(true);
            InterpreterAlterQuery(alter_query, current_context).execute();
        }

        auto nested = table_to_delete->as<StorageMaterializedPostgreSQL>()->getNested();
        if (!nested)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Inner table `{}` does not exist", table_name);

        std::lock_guard lock(handler_mutex);
        replication_handler->removeTableFromReplication(table_name);

        try
        {
            /// Drop the nested table.
            auto current_context = Context::createCopy(getContext()->getGlobalContext());
            current_context->makeQueryContext();
            DatabaseAtomic::dropTable(current_context, table_name, true);
        }
        catch (Exception & e)
        {
            /// We already removed this table from replication and adding it back will be an overkill..
            /// TODO: this is bad, we leave a table lying somewhere not dropped, and if user will want
            /// to move it back into replication, he will fail to do so because there is undropped nested with the same name.
            /// This can also happen if we crash after removing table from replication and before dropping nested.
            /// As a solution, we could drop a table if it already exists and add a fresh one instead for these two cases.
            /// TODO: sounds good.
            materialized_tables.erase(table_name);

            e.addMessage("while removing table `" + table_name + "` from replication");
            throw;
        }

        materialized_tables.erase(table_name);
        return nullptr;
    }
    else
    {
        return DatabaseAtomic::detachTable(table_name);
    }
}
@ -176,6 +383,7 @@ void DatabaseMaterializedPostgreSQL::shutdown()
void DatabaseMaterializedPostgreSQL::stopReplication()
{
std::lock_guard lock(handler_mutex);
if (replication_handler)
replication_handler->shutdown();
@ -193,6 +401,7 @@ void DatabaseMaterializedPostgreSQL::dropTable(ContextPtr local_context, const S
void DatabaseMaterializedPostgreSQL::drop(ContextPtr local_context)
{
std::lock_guard lock(handler_mutex);
if (replication_handler)
replication_handler->shutdownFinal();
@ -207,7 +416,6 @@ DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
}
}
#endif

View File

@ -32,7 +32,6 @@ public:
ContextPtr context_,
const String & metadata_path_,
UUID uuid_,
const ASTStorage * database_engine_define_,
bool is_attach_,
const String & database_name_,
const String & postgres_database_name,
@ -43,14 +42,18 @@ public:
String getMetadataPath() const override { return metadata_path; }
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
void createTable(ContextPtr context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void createTable(ContextPtr context, const String & table_name, const StoragePtr & table, const ASTPtr & query) override;
void attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
void dropTable(ContextPtr local_context, const String & name, bool no_delay) override;
@ -58,12 +61,22 @@ public:
void stopReplication();
void applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context) override;
void shutdown() override;
String getPostgreSQLDatabaseName() const { return remote_database_name; }
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const override;
private:
void startSynchronization();
ASTPtr database_engine_define;
ASTPtr createAlterSettingsQuery(const SettingChange & new_setting);
String getFormattedTablesList(const String & except = {}) const;
bool is_attach;
String remote_database_name;
postgres::ConnectionInfo connection_info;
@ -72,6 +85,7 @@ private:
std::shared_ptr<PostgreSQLReplicationHandler> replication_handler;
std::map<std::string, StoragePtr> materialized_tables;
mutable std::mutex tables_mutex;
mutable std::mutex handler_mutex;
};
}

View File

@ -290,12 +290,20 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::ReplicationTransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::nontransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx, const String & postgres_schema);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::nontransaction & tx, const String & postgres_schema);
}
#endif

View File

@ -0,0 +1,255 @@
#include <Databases/TablesLoader.h>
#include <Databases/IDatabase.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <numeric>
namespace DB
{
namespace ErrorCodes
{
extern const int INFINITE_LOOP;
extern const int LOGICAL_ERROR;
}
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
/// Log loading progress (in percent), throttled to once per PRINT_MESSAGE_EACH_N_OBJECTS
/// processed objects or once per PRINT_MESSAGE_EACH_N_SECONDS seconds, whichever fires first.
/// NOTE: compareAndRestart() already restarts the watch when the interval elapsed, so the
/// extra restart() only matters for the modulo branch.
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
    if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
    {
        LOG_INFO(log, "{}%", processed * 100.0 / total);
        watch.restart();
    }
}
/// Remember the set of databases to load. The current default database name is captured
/// because dependency resolution compares dependency names against it (see removeUnresolvableDependencies).
TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_, bool force_attach_)
    : global_context(global_context_)
    , databases(std::move(databases_))
    , force_restore(force_restore_)
    , force_attach(force_attach_)
{
    metadata.default_database = global_context->getCurrentDatabase();
    log = &Poco::Logger::get("TablesLoader");
}
/// Load tables from all given databases. Databases that support topological loading are parsed
/// first to build a dependency graph; the rest load their objects immediately.
void TablesLoader::loadTables()
{
    bool need_resolve_dependencies = !global_context->getConfigRef().has("ignore_table_dependencies_on_metadata_loading");

    /// Load all Lazy, MySQL, PostgreSQL, SQLite, etc databases first.
    for (auto & database : databases)
    {
        if (need_resolve_dependencies && database.second->supportsLoadingInTopologicalOrder())
            databases_to_load.push_back(database.first);
        else
            database.second->loadStoredObjects(global_context, force_restore, force_attach, true);
    }

    if (databases_to_load.empty())
        return;

    /// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
    for (auto & database_name : databases_to_load)
    {
        databases[database_name]->beforeLoadingMetadata(global_context, force_restore, force_attach);
        databases[database_name]->loadTablesMetadata(global_context, metadata);
    }

    LOG_INFO(log, "Parsed metadata of {} tables in {} databases in {} sec",
             metadata.parsed_tables.size(), databases_to_load.size(), stopwatch.elapsedSeconds());

    stopwatch.restart();

    logDependencyGraph();

    /// Some tables were loaded by database with loadStoredObjects(...). Remove them from graph if necessary.
    removeUnresolvableDependencies();

    loadTablesInTopologicalOrder(pool);
}
/// Startup tables after all tables are loaded. Background tasks (merges, mutations, etc)
/// may slow down data parts loading, so this is done as a separate phase.
void TablesLoader::startupTables()
{
    for (const auto & [database_name, database] : databases)
        database->startupTables(pool, force_restore, force_attach);
}
/// Remove from the dependency graph the dependencies that cannot be resolved from parsed metadata:
/// tables that are already loaded, XML dictionaries, or objects that do not exist at all.
/// Dependent tables are then free to load (though loads may fail if the dependency truly doesn't exist).
void TablesLoader::removeUnresolvableDependencies()
{
    auto need_exclude_dependency = [this](const QualifiedTableName & dependency_name, const DependenciesInfo & info)
    {
        /// Table exists and will be loaded
        if (metadata.parsed_tables.contains(dependency_name))
            return false;
        /// Table exists and it's already loaded
        if (DatabaseCatalog::instance().isTableExist(StorageID(dependency_name.database, dependency_name.table), global_context))
            return true;
        /// It's XML dictionary. It was loaded before tables and DDL dictionaries.
        if (dependency_name.database == metadata.default_database &&
            global_context->getExternalDictionariesLoader().has(dependency_name.table))
            return true;

        /// Some tables depend on table "dependency_name", but there is no such table in DatabaseCatalog and we don't have its metadata.
        /// We will ignore it and try to load dependent tables without "dependency_name"
        /// (but most likely dependent tables will fail to load).
        /// (fixed log message: "the it" -> "it")
        LOG_WARNING(log, "Tables {} depend on {}, but it seems like it does not exist. Will ignore it and try to load existing tables",
                    fmt::join(info.dependent_database_objects, ", "), dependency_name);

        /// NOTE: the adjacent string literals below are concatenated; the trailing space before
        /// "It's a bug" is required so the message doesn't read "...dependencies.It's a bug".
        if (info.dependencies_count)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not exist, but we have seen its AST and found {} dependencies. "
                            "It's a bug", dependency_name, info.dependencies_count);
        if (info.dependent_database_objects.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependencies and dependent tables as expected. "
                            "It's a bug", dependency_name);

        return true;
    };

    /// Erase-while-iterating over the graph; removeResolvedDependency returns the next iterator.
    auto table_it = metadata.dependencies_info.begin();
    while (table_it != metadata.dependencies_info.end())
    {
        auto & info = table_it->second;
        if (need_exclude_dependency(table_it->first, info))
            table_it = removeResolvedDependency(table_it, metadata.independent_database_objects);
        else
            ++table_it;
    }
}
void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)
{
    /// Load independent tables in parallel.
    /// Then remove loaded tables from dependency graph, find tables/dictionaries that do not have unresolved dependencies anymore,
    /// move them to the list of independent tables and load.
    /// Repeat until we have some tables to load.
    /// If we do not, then either all objects are loaded or there is cyclic dependency.
    /// Complexity: O(V + E)

    size_t level = 0;
    do
    {
        /// Invariant: every parsed table is either loaded, currently independent, or still has dependencies.
        assert(metadata.parsed_tables.size() == tables_processed + metadata.independent_database_objects.size() + getNumberOfTablesWithDependencies());
        logDependencyGraph();

        startLoadingIndependentTables(pool, level);

        /// Collect tables that become independent once the current level finishes loading.
        TableNames new_independent_database_objects;
        for (const auto & table_name : metadata.independent_database_objects)
        {
            auto info_it = metadata.dependencies_info.find(table_name);
            if (info_it == metadata.dependencies_info.end())
            {
                /// No tables depend on table_name and it was not even added to dependencies_info
                continue;
            }
            removeResolvedDependency(info_it, new_independent_database_objects);
        }

        /// Wait for the whole level to finish before starting the next one.
        pool.wait();

        metadata.independent_database_objects = std::move(new_independent_database_objects);
        ++level;
    } while (!metadata.independent_database_objects.empty());

    checkCyclicDependencies();
}
/// Mark the table pointed to by info_it as resolved: decrement the dependency counter of every
/// dependent table, append the ones that became independent to `independent_database_objects`,
/// and erase the resolved entry from the graph. Returns the iterator to the next graph entry.
DependenciesInfosIter TablesLoader::removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_database_objects)
{
    auto & info = info_it->second;
    /// NOTE: the adjacent string literals are concatenated; the trailing space before
    /// "It's a bug" is required so the message doesn't read "...is {}.It's a bug".
    if (info.dependencies_count)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} is in list of independent tables, but dependencies count is {}. "
                        "It's a bug", info_it->first, info.dependencies_count);
    if (info.dependent_database_objects.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependent tables. It's a bug", info_it->first);

    /// Decrement number of dependencies for each dependent table
    for (auto & dependent_table : info.dependent_database_objects)
    {
        auto & dependent_info = metadata.dependencies_info[dependent_table];
        auto & dependencies_count = dependent_info.dependencies_count;
        if (dependencies_count == 0)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to decrement 0 dependencies counter for {}. It's a bug", dependent_table);
        --dependencies_count;
        if (dependencies_count == 0)
        {
            independent_database_objects.push_back(dependent_table);
            /// If nothing in turn depends on the dependent table, its graph entry is no longer needed.
            if (dependent_info.dependent_database_objects.empty())
                metadata.dependencies_info.erase(dependent_table);
        }
    }

    return metadata.dependencies_info.erase(info_it);
}
/// Schedule loading of all currently independent tables on the thread pool.
/// NOTE: the lambdas capture `table_name` by reference into `independent_database_objects`,
/// so the caller must pool.wait() before that container is modified (loadTablesInTopologicalOrder does).
void TablesLoader::startLoadingIndependentTables(ThreadPool & pool, size_t level)
{
    size_t total_tables = metadata.parsed_tables.size();

    LOG_INFO(log, "Loading {} tables with {} dependency level", metadata.independent_database_objects.size(), level);

    for (const auto & table_name : metadata.independent_database_objects)
    {
        pool.scheduleOrThrowOnError([this, total_tables, &table_name]()
        {
            const auto & path_and_query = metadata.parsed_tables[table_name];
            databases[table_name.database]->loadTableFromMetadata(global_context, path_and_query.path, table_name, path_and_query.ast, force_restore);
            logAboutProgress(log, ++tables_processed, total_tables, stopwatch);
        });
    }
}
size_t TablesLoader::getNumberOfTablesWithDependencies() const
{
size_t number_of_tables_with_dependencies = 0;
for (const auto & info : metadata.dependencies_info)
if (info.second.dependencies_count)
++number_of_tables_with_dependencies;
return number_of_tables_with_dependencies;
}
/// If the dependency graph is not empty after loading finished, the remaining entries form
/// (or depend on) cycles. Log every remaining entry and throw INFINITE_LOOP.
void TablesLoader::checkCyclicDependencies() const
{
    /// Loading is finished if all dependencies are resolved
    if (metadata.dependencies_info.empty())
        return;

    for (const auto & info : metadata.dependencies_info)
    {
        LOG_WARNING(log, "Cannot resolve dependencies: Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
                    info.first, info.second.dependencies_count,
                    info.second.dependent_database_objects.size(), fmt::join(info.second.dependent_database_objects, ", "));
        /// NOTE(review): entries left in a cyclic graph would normally have dependencies_count != 0,
        /// which would fire this assert in debug builds — confirm the intended invariant.
        assert(info.second.dependencies_count == 0);
    }

    throw Exception(ErrorCodes::INFINITE_LOOP, "Cannot attach {} tables due to cyclic dependencies. "
                    "See server log for details.", metadata.dependencies_info.size());
}
/// Dump the current state of the dependency graph at TEST log level
/// (independent tables plus per-table dependency counters and adjacency lists).
void TablesLoader::logDependencyGraph() const
{
    LOG_TEST(log, "Have {} independent tables: {}",
             metadata.independent_database_objects.size(),
             fmt::join(metadata.independent_database_objects, ", "));
    for (const auto & dependencies : metadata.dependencies_info)
    {
        LOG_TEST(log,
                 "Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
                 dependencies.first,
                 dependencies.second.dependencies_count,
                 dependencies.second.dependent_database_objects.size(),
                 fmt::join(dependencies.second.dependent_database_objects, ", "));
    }
}
}

View File

@ -0,0 +1,112 @@
#pragma once
#include <Core/Types.h>
#include <Core/QualifiedTableName.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
namespace Poco
{
class Logger;
}
class AtomicStopwatch;
namespace DB
{
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch);
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
/// Parsed metadata of a single table: where it came from and its CREATE query AST.
struct ParsedTableMetadata
{
    String path;    /// Path to the metadata file the query was read from.
    ASTPtr ast;     /// Parsed CREATE query of the table/dictionary.
};
using ParsedMetadata = std::map<QualifiedTableName, ParsedTableMetadata>;
using TableNames = std::vector<QualifiedTableName>;
/// Node of the table-dependency graph.
struct DependenciesInfo
{
    /// How many dependencies this table have (i.e. how many tables must be loaded before it).
    size_t dependencies_count = 0;
    /// List of tables/dictionaries which depend on this table/dictionary
    TableNames dependent_database_objects;
};
using DependenciesInfos = std::unordered_map<QualifiedTableName, DependenciesInfo>;
using DependenciesInfosIter = std::unordered_map<QualifiedTableName, DependenciesInfo>::iterator;
/// Aggregated result of metadata parsing across all databases plus the dependency graph built from it.
struct ParsedTablesMetadata
{
    /// Default database of the session; used when resolving unqualified dependency names.
    String default_database;

    /// Protects the fields below during concurrent metadata parsing
    /// (presumably locked by IDatabase::loadTablesMetadata implementations — confirm with callers).
    std::mutex mutex;

    ParsedMetadata parsed_tables;

    /// For logging
    size_t total_dictionaries = 0;

    /// List of tables/dictionaries that do not have any dependencies and can be loaded
    TableNames independent_database_objects;

    /// Actually it contains two different maps (with, probably, intersecting keys):
    /// 1. table/dictionary name -> number of dependencies
    /// 2. table/dictionary name -> dependent tables/dictionaries list (adjacency list of dependencies graph).
    /// If table A depends on table B, then there is an edge B --> A, i.e. dependencies_info[B].dependent_database_objects contains A.
    /// And dependencies_info[C].dependencies_count is a number of incoming edges for vertex C (how many tables we have to load before C).
    DependenciesInfos dependencies_info;
};
/// Loads tables (and dictionaries) from specified databases
/// taking into account dependencies between them.
class TablesLoader
{
public:
    using Databases = std::map<String, DatabasePtr>;

    TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_ = false, bool force_attach_ = false);
    TablesLoader() = delete;

    /// Parse metadata, build the dependency graph and load all tables in topological order.
    void loadTables();

    /// Start background activity of the loaded tables; call after loadTables().
    void startupTables();

private:
    ContextMutablePtr global_context;
    Databases databases;
    bool force_restore;
    bool force_attach;

    /// Names of databases that support loading in topological order.
    Strings databases_to_load;
    /// Parsed CREATE queries and the dependency graph.
    ParsedTablesMetadata metadata;
    Poco::Logger * log;
    /// Number of tables loaded so far (shared by worker threads, used for progress logging).
    std::atomic<size_t> tables_processed{0};
    AtomicStopwatch stopwatch;

    ThreadPool pool;

    void removeUnresolvableDependencies();

    void loadTablesInTopologicalOrder(ThreadPool & pool);

    DependenciesInfosIter removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_database_objects);

    void startLoadingIndependentTables(ThreadPool & pool, size_t level);

    void checkCyclicDependencies() const;

    size_t getNumberOfTablesWithDependencies() const;

    void logDependencyGraph() const;
};
}

View File

@ -9,6 +9,7 @@ PEERDIR(
SRCS(
DDLDependencyVisitor.cpp
DatabaseAtomic.cpp
DatabaseDictionary.cpp
DatabaseFactory.cpp
@ -30,6 +31,7 @@ SRCS(
SQLite/DatabaseSQLite.cpp
SQLite/SQLiteUtils.cpp
SQLite/fetchSQLiteTableStructure.cpp
TablesLoader.cpp
)

View File

@ -1,6 +1,7 @@
#include "PostgreSQLDictionarySource.h"
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/QualifiedTableName.h>
#include "DictionarySourceFactory.h"
#include "registerDictionaries.h"
@ -29,19 +30,13 @@ namespace
{
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
{
auto schema_value = schema;
auto table_value = table;
QualifiedTableName qualified_name{schema, table};
if (qualified_name.database.empty() && !qualified_name.table.empty())
qualified_name = QualifiedTableName::parseFromString(qualified_name.table);
if (schema_value.empty())
{
if (auto pos = table_value.find('.'); pos != std::string::npos)
{
schema_value = table_value.substr(0, pos);
table_value = table_value.substr(pos + 1);
}
}
/// Do not need db because it is already in a connection string.
return {dict_struct, "", schema_value, table_value, query, where, IdentifierQuotingStyle::DoubleQuotes};
return {dict_struct, "", qualified_name.database, qualified_name.table, query, where, IdentifierQuotingStyle::DoubleQuotes};
}
}

View File

@ -38,29 +38,22 @@ namespace
const std::string & where_,
IXDBCBridgeHelper & bridge_)
{
std::string schema = schema_;
std::string table = table_;
QualifiedTableName qualified_name{schema_, table_};
if (bridge_.isSchemaAllowed())
{
if (schema.empty())
{
if (auto pos = table.find('.'); pos != std::string::npos)
{
schema = table.substr(0, pos);
table = table.substr(pos + 1);
}
}
if (qualified_name.database.empty())
qualified_name = QualifiedTableName::parseFromString(qualified_name.table);
}
else
{
if (!schema.empty())
if (!qualified_name.database.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Dictionary source of type {} specifies a schema but schema is not supported by {}-driver",
bridge_.getName());
}
return {dict_struct_, db_, schema, table, query_, where_, bridge_.getIdentifierQuotingStyle()};
return {dict_struct_, db_, qualified_name.database, qualified_name.table, query_, where_, bridge_.getIdentifierQuotingStyle()};
}
}

View File

@ -4,7 +4,6 @@
#include <Poco/DOM/Document.h>
#include <Poco/DOM/Element.h>
#include <Poco/DOM/Text.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/XMLConfiguration.h>
#include <IO/WriteHelpers.h>
#include <Parsers/queryToString.h>
@ -16,6 +15,8 @@
#include <Parsers/ASTDictionaryAttributeDeclaration.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionFactory.h>
#include <Common/isLocalAddress.h>
#include <Interpreters/Context.h>
namespace DB
@ -576,4 +577,28 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte
return conf;
}
/// If the dictionary config describes a ClickHouse source, return the source table name and
/// whether the configured address points to this very server; otherwise return nullopt.
std::optional<ClickHouseDictionarySourceInfo>
getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, ContextPtr global_context)
{
    const String host = config->getString("dictionary.source.clickhouse.host", "");
    const UInt16 port = config->getUInt("dictionary.source.clickhouse.port", 0);
    const String database = config->getString("dictionary.source.clickhouse.db", "");
    const String table = config->getString("dictionary.source.clickhouse.table", "");
    const bool secure = config->getBool("dictionary.source.clickhouse.secure", false);

    /// Not a (complete) ClickHouse source description.
    if (host.empty() || port == 0 || table.empty())
        return {};

    ClickHouseDictionarySourceInfo info;
    info.table_name = {database, table};

    /// Compare against the server's own TCP port (secure or plain, matching the source config).
    const UInt16 default_port = secure ? global_context->getTCPPortSecure().value_or(0) : global_context->getTCPPort();
    info.is_local = isLocalAddress({host, port}, default_port);
    return info;
}
}

View File

@ -15,4 +15,13 @@ using DictionaryConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfigurati
DictionaryConfigurationPtr
getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr context, const std::string & database_ = "");
/// Describes a dictionary whose source is a ClickHouse table.
struct ClickHouseDictionarySourceInfo
{
    QualifiedTableName table_name;  /// Source database and table name.
    bool is_local = false;          /// True if the source address resolves to this server.
};
std::optional<ClickHouseDictionarySourceInfo>
getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, ContextPtr global_context);
}

View File

@ -60,6 +60,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
format_settings.csv.input_format_enum_as_number = settings.input_format_csv_enum_as_number;
format_settings.csv.null_representation = settings.output_format_csv_null_representation;
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
format_settings.csv.input_format_arrays_as_nested_csv = settings.input_format_csv_arrays_as_nested_csv;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
@ -439,6 +440,18 @@ bool FormatFactory::checkIfFormatIsColumnOriented(const String & name)
return target.is_column_oriented;
}
bool FormatFactory::isInputFormat(const String & name) const
{
auto it = dict.find(name);
return it != dict.end() && (it->second.input_creator || it->second.input_processor_creator);
}
bool FormatFactory::isOutputFormat(const String & name) const
{
auto it = dict.find(name);
return it != dict.end() && (it->second.output_creator || it->second.output_processor_creator);
}
FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;

View File

@ -187,6 +187,9 @@ public:
return dict;
}
bool isInputFormat(const String & name) const;
bool isOutputFormat(const String & name) const;
private:
FormatsDictionary dict;

View File

@ -76,6 +76,7 @@ struct FormatSettings
bool crlf_end_of_line = false;
bool input_format_enum_as_number = false;
bool input_format_arrays_as_nested_csv = false;
String null_representation = "\\N";
} csv;
struct Custom

View File

@ -1,30 +1,35 @@
#pragma once
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
namespace DB
{
/** Returns server uptime in seconds.
*/
class FunctionUptime : public IFunction
/// Base class for constant functions
template<typename Derived, typename T, typename ColumnT>
class FunctionConstantBase : public IFunction
{
public:
static constexpr auto name = "uptime";
static FunctionPtr create(ContextPtr context)
/// For server-level constants (uptime(), version(), etc)
explicit FunctionConstantBase(ContextPtr context, T && constant_value_)
: is_distributed(context->isDistributed())
, constant_value(std::forward<T>(constant_value_))
{
return std::make_shared<FunctionUptime>(context->isDistributed(), context->getUptimeSeconds());
}
explicit FunctionUptime(bool is_distributed_, time_t uptime_) : is_distributed(is_distributed_), uptime(uptime_)
/// For real constants (pi(), e(), etc)
explicit FunctionConstantBase(const T & constant_value_)
: is_distributed(false)
, constant_value(constant_value_)
{
}
String getName() const override
{
return name;
return Derived::name;
}
size_t getNumberOfArguments() const override
@ -34,29 +39,26 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt32>();
return std::make_shared<ColumnT>();
}
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
/// Some functions may return different values on different shards/replicas, so it's not constant for distributed query
bool isSuitableForConstantFolding() const override { return !is_distributed; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeUInt32().createColumnConst(input_rows_count, static_cast<UInt64>(uptime));
return ColumnT().createColumnConst(input_rows_count, constant_value);
}
private:
bool is_distributed;
time_t uptime;
const T constant_value;
};
/// Register the uptime() function in the function factory.
void registerFunctionUptime(FunctionFactory & factory)
{
    factory.registerFunction<FunctionUptime>();
}
}

View File

@ -48,22 +48,11 @@ getJoin(const ColumnsWithTypeAndName & arguments, ContextPtr context)
"Illegal type " + arguments[0].type->getName() + " of first argument of function joinGet, expected a const string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
size_t dot = join_name.find('.');
String database_name;
if (dot == String::npos)
{
database_name = context->getCurrentDatabase();
dot = 0;
}
else
{
database_name = join_name.substr(0, dot);
++dot;
}
String table_name = join_name.substr(dot);
if (table_name.empty())
throw Exception("joinGet does not allow empty table name", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto table = DatabaseCatalog::instance().getTable({database_name, table_name}, std::const_pointer_cast<Context>(context));
auto qualified_name = QualifiedTableName::parseFromString(join_name);
if (qualified_name.database.empty())
qualified_name.database = context->getCurrentDatabase();
auto table = DatabaseCatalog::instance().getTable({qualified_name.database, qualified_name.table}, std::const_pointer_cast<Context>(context));
auto storage_join = std::dynamic_pointer_cast<StorageJoin>(table);
if (!storage_join)
throw Exception("Table " + join_name + " should have engine StorageJoin", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

View File

@ -1,36 +0,0 @@
#pragma once
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Functions/IFunction.h>
namespace DB
{
/// Generic nullary function returning a Float64 mathematical constant.
/// Impl must provide a static `name` and a static `value` member.
template <typename Impl>
class FunctionMathConstFloat64 : public IFunction
{
public:
    static constexpr auto name = Impl::name;
    static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionMathConstFloat64>(); }

private:
    String getName() const override { return name; }

    /// The function takes no arguments.
    size_t getNumberOfArguments() const override { return 0; }

    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

    DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
    {
        return std::make_shared<DataTypeFloat64>();
    }

    /// Return a constant column filled with Impl::value for every row.
    ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr & result_type, size_t input_rows_count) const override
    {
        return result_type->createColumnConst(input_rows_count, Impl::value);
    }
};
}

View File

@ -1,80 +0,0 @@
#if defined(__ELF__) && !defined(__FreeBSD__)
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Common/SymbolIndex.h>
#include <Core/Field.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace
{
/** buildId() - returns the compiler build id of the running binary.
  */
class FunctionBuildId : public IFunction
{
public:
    static constexpr auto name = "buildId";

    static FunctionPtr create(ContextPtr context)
    {
        return std::make_shared<FunctionBuildId>(context->isDistributed());
    }

    explicit FunctionBuildId(bool is_distributed_) : is_distributed(is_distributed_)
    {
    }

    String getName() const override
    {
        return name;
    }

    size_t getNumberOfArguments() const override
    {
        return 0;
    }

    /// The value depends on the binary, so it is not deterministic in general.
    bool isDeterministic() const override { return false; }

    bool isDeterministicInScopeOfQuery() const override { return true; }

    /// Different shards/replicas may run different binaries, so the value must not be
    /// constant-folded when the query is distributed.
    bool isSuitableForConstantFolding() const override { return !is_distributed; }

    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override
    {
        return false;
    }

    DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
    {
        return std::make_shared<DataTypeString>();
    }

    /// Return a constant string column with the build id in hex.
    ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
    {
        return DataTypeString().createColumnConst(input_rows_count, SymbolIndex::instance()->getBuildIDHex());
    }

private:
    bool is_distributed;
};
}
void registerFunctionBuildId(FunctionFactory & factory)
{
factory.registerFunction<FunctionBuildId>();
}
}
#else
namespace DB
{
class FunctionFactory;
void registerFunctionBuildId(FunctionFactory &) {}
}
#endif

View File

@ -1,24 +0,0 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathConstFloat64.h>

namespace DB
{
namespace
{
    /// e() - Euler's number, the base of the natural logarithm.
    struct EImpl
    {
        static constexpr auto name = "e";
        static constexpr double value = 2.7182818284590452353602874713526624977572470;
    };

    using FunctionE = FunctionMathConstFloat64<EImpl>;
}

void registerFunctionE(FunctionFactory & factory)
{
    factory.registerFunction<FunctionE>();
}

}

View File

@ -1,70 +0,0 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Common/DNSResolver.h>
#include <Core/Field.h>
#include <Interpreters/Context.h>

namespace DB
{
namespace
{

/// Get the host name. It is constant on a single server, but is not constant in distributed queries.
class FunctionHostName : public IFunction
{
public:
    static constexpr auto name = "hostName";

    static FunctionPtr create(ContextPtr context)
    {
        return std::make_shared<FunctionHostName>(context->isDistributed());
    }

    explicit FunctionHostName(bool is_distributed_) : is_distributed(is_distributed_)
    {
    }

    String getName() const override
    {
        return name;
    }

    /// Differs between servers, hence not deterministic globally.
    bool isDeterministic() const override { return false; }

    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

    /// Stable within one query on one server.
    bool isDeterministicInScopeOfQuery() const override
    {
        return true;
    }

    /// Each shard has its own host name, so do not constant-fold in distributed queries.
    bool isSuitableForConstantFolding() const override { return !is_distributed; }

    size_t getNumberOfArguments() const override
    {
        return 0;
    }

    DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
    {
        return std::make_shared<DataTypeString>();
    }

    ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr & result_type, size_t input_rows_count) const override
    {
        return result_type->createColumnConst(input_rows_count, DNSResolver::instance().getHostName());
    }

private:
    bool is_distributed;
};

}

void registerFunctionHostName(FunctionFactory & factory)
{
    factory.registerFunction<FunctionHostName>();
    factory.registerAlias("hostname", "hostName");
}

}

View File

@ -0,0 +1,47 @@
#include <Functions/FunctionConstantBase.h>
#include <DataTypes/DataTypesNumber.h>

namespace DB
{

namespace
{
    /// Zero-argument function returning a fixed Float64 constant supplied by Impl
    /// (Impl provides the SQL name and the value).
    template <typename Impl>
    class FunctionMathConstFloat64 : public FunctionConstantBase<FunctionMathConstFloat64<Impl>, Float64, DataTypeFloat64>
    {
    public:
        static constexpr auto name = Impl::name;
        static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionMathConstFloat64>(); }
        FunctionMathConstFloat64() : FunctionConstantBase<FunctionMathConstFloat64<Impl>, Float64, DataTypeFloat64>(Impl::value) {}
    };

    /// e() - Euler's number, the base of the natural logarithm.
    struct EImpl
    {
        static constexpr char name[] = "e";
        static constexpr double value = 2.7182818284590452353602874713526624977572470;
    };
    using FunctionE = FunctionMathConstFloat64<EImpl>;

    /// pi() - ratio of a circle's circumference to its diameter.
    struct PiImpl
    {
        static constexpr char name[] = "pi";
        static constexpr double value = 3.1415926535897932384626433832795028841971693;
    };
    using FunctionPi = FunctionMathConstFloat64<PiImpl>;
}

void registerFunctionE(FunctionFactory & factory)
{
    factory.registerFunction<FunctionE>();
}

/// pi() is registered case-insensitively, so PI() works as well.
void registerFunctionPi(FunctionFactory & factory)
{
    factory.registerFunction<FunctionPi>(FunctionFactory::CaseInsensitive);
}

}

View File

@ -1,24 +0,0 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathConstFloat64.h>

namespace DB
{
namespace
{
    /// pi() - ratio of a circle's circumference to its diameter.
    struct PiImpl
    {
        static constexpr auto name = "pi";
        static constexpr double value = 3.1415926535897932384626433832795028841971693;
    };

    using FunctionPi = FunctionMathConstFloat64<PiImpl>;
}

/// Registered case-insensitively, so PI() works as well.
void registerFunctionPi(FunctionFactory & factory)
{
    factory.registerFunction<FunctionPi>(FunctionFactory::CaseInsensitive);
}

}

View File

@ -80,6 +80,7 @@ void registerFunctionIsIPAddressContainedIn(FunctionFactory &);
void registerFunctionQueryID(FunctionFactory & factory);
void registerFunctionInitialQueryID(FunctionFactory & factory);
void registerFunctionServerUUID(FunctionFactory &);
void registerFunctionZooKeeperSessionUptime(FunctionFactory &);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -160,6 +161,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionQueryID(factory);
registerFunctionInitialQueryID(factory);
registerFunctionServerUUID(factory);
registerFunctionZooKeeperSessionUptime(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -0,0 +1,144 @@
#include <Functions/FunctionConstantBase.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <Core/ServerUUID.h>
#include <Common/SymbolIndex.h>
#include <Common/DNSResolver.h>
#include <common/DateLUT.h>

#if !defined(ARCADIA_BUILD)
#    include <Common/config_version.h>
#endif

namespace DB
{

namespace
{

#if defined(__ELF__) && !defined(__FreeBSD__)
    /// buildId() - returns the compiler build id of the running binary.
    class FunctionBuildId : public FunctionConstantBase<FunctionBuildId, String, DataTypeString>
    {
    public:
        static constexpr auto name = "buildId";
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionBuildId>(context); }
        explicit FunctionBuildId(ContextPtr context) : FunctionConstantBase(context, SymbolIndex::instance()->getBuildIDHex()) {}
    };
#endif

    /// Get the host name. It is constant on a single server, but is not constant in distributed queries.
    class FunctionHostName : public FunctionConstantBase<FunctionHostName, String, DataTypeString>
    {
    public:
        static constexpr auto name = "hostName";
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionHostName>(context); }
        explicit FunctionHostName(ContextPtr context) : FunctionConstantBase(context, DNSResolver::instance().getHostName()) {}
    };

    /// serverUUID() - returns the persistent UUID of the server the query runs on.
    class FunctionServerUUID : public FunctionConstantBase<FunctionServerUUID, UUID, DataTypeUUID>
    {
    public:
        static constexpr auto name = "serverUUID";
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionServerUUID>(context); }
        explicit FunctionServerUUID(ContextPtr context) : FunctionConstantBase(context, ServerUUID::get()) {}
    };

    /// tcpPort() - returns the TCP port this server listens on for the native protocol.
    class FunctionTcpPort : public FunctionConstantBase<FunctionTcpPort, UInt16, DataTypeUInt16>
    {
    public:
        static constexpr auto name = "tcpPort";
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTcpPort>(context); }
        explicit FunctionTcpPort(ContextPtr context) : FunctionConstantBase(context, context->getTCPPort()) {}
    };

    /// Returns the server time zone.
    class FunctionTimezone : public FunctionConstantBase<FunctionTimezone, String, DataTypeString>
    {
    public:
        static constexpr auto name = "timezone";
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTimezone>(context); }
        explicit FunctionTimezone(ContextPtr context) : FunctionConstantBase(context, String{DateLUT::instance().getTimeZone()}) {}
    };

    /// Returns server uptime in seconds.
    class FunctionUptime : public FunctionConstantBase<FunctionUptime, UInt32, DataTypeUInt32>
    {
    public:
        static constexpr auto name = "uptime";
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionUptime>(context); }
        explicit FunctionUptime(ContextPtr context) : FunctionConstantBase(context, context->getUptimeSeconds()) {}
    };

    /// version() - returns the current version as a string.
    class FunctionVersion : public FunctionConstantBase<FunctionVersion, String, DataTypeString>
    {
    public:
        static constexpr auto name = "version";
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionVersion>(context); }
        explicit FunctionVersion(ContextPtr context) : FunctionConstantBase(context, VERSION_STRING) {}
    };

    /// zookeeperSessionUptime() - seconds the current ZooKeeper session has been alive
    /// (0 if there is no session; see Context::getZooKeeperSessionUptime).
    class FunctionZooKeeperSessionUptime : public FunctionConstantBase<FunctionZooKeeperSessionUptime, UInt32, DataTypeUInt32>
    {
    public:
        static constexpr auto name = "zookeeperSessionUptime";
        explicit FunctionZooKeeperSessionUptime(ContextPtr context) : FunctionConstantBase(context, context->getZooKeeperSessionUptime()) {}
        static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionZooKeeperSessionUptime>(context); }
    };
}

/// [[maybe_unused]]: on non-ELF platforms the factory parameter is not used.
void registerFunctionBuildId([[maybe_unused]] FunctionFactory & factory)
{
#if defined(__ELF__) && !defined(__FreeBSD__)
    factory.registerFunction<FunctionBuildId>();
#endif
}

void registerFunctionHostName(FunctionFactory & factory)
{
    factory.registerFunction<FunctionHostName>();
    factory.registerAlias("hostname", "hostName");
}

void registerFunctionServerUUID(FunctionFactory & factory)
{
    factory.registerFunction<FunctionServerUUID>();
}

void registerFunctionTcpPort(FunctionFactory & factory)
{
    factory.registerFunction<FunctionTcpPort>();
}

void registerFunctionTimezone(FunctionFactory & factory)
{
    factory.registerFunction<FunctionTimezone>();
    factory.registerAlias("timeZone", "timezone");
}

void registerFunctionUptime(FunctionFactory & factory)
{
    factory.registerFunction<FunctionUptime>();
}

/// version() is registered case-insensitively, so VERSION() works as well.
void registerFunctionVersion(FunctionFactory & factory)
{
    factory.registerFunction<FunctionVersion>(FunctionFactory::CaseInsensitive);
}

void registerFunctionZooKeeperSessionUptime(FunctionFactory & factory)
{
    factory.registerFunction<FunctionZooKeeperSessionUptime>();
}

}

View File

@ -1,60 +0,0 @@
#include <Core/ServerUUID.h>
#include <DataTypes/DataTypeUUID.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>

namespace DB
{

namespace
{
    /// serverUUID() - returns the persistent UUID of the server the query runs on.
    class FunctionServerUUID : public IFunction
    {
    public:
        static constexpr auto name = "serverUUID";

        static FunctionPtr create(ContextPtr context)
        {
            return std::make_shared<FunctionServerUUID>(context->isDistributed(), ServerUUID::get());
        }

        explicit FunctionServerUUID(bool is_distributed_, UUID server_uuid_)
            : is_distributed(is_distributed_), server_uuid(server_uuid_)
        {
        }

        String getName() const override { return name; }
        size_t getNumberOfArguments() const override { return 0; }
        DataTypePtr getReturnTypeImpl(const DataTypes &) const override { return std::make_shared<DataTypeUUID>(); }

        /// Differs between servers, but stable within one query on one server.
        bool isDeterministic() const override { return false; }
        bool isDeterministicInScopeOfQuery() const override { return true; }

        /// In a distributed query each shard has its own UUID, so the value
        /// must not be folded into a constant on the initiator.
        bool isSuitableForConstantFolding() const override { return !is_distributed; }

        bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo &) const override { return false; }

        ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
        {
            return DataTypeUUID().createColumnConst(input_rows_count, server_uuid);
        }

    private:
        bool is_distributed;
        const UUID server_uuid;
    };
}

void registerFunctionServerUUID(FunctionFactory & factory)
{
    factory.registerFunction<FunctionServerUUID>();
}

}

View File

@ -1,57 +0,0 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>

namespace DB
{

namespace
{
    /// tcpPort() - returns the TCP port this server listens on for the native protocol.
    class FunctionTcpPort : public IFunction
    {
    public:
        static constexpr auto name = "tcpPort";

        static FunctionPtr create(ContextPtr context)
        {
            return std::make_shared<FunctionTcpPort>(context->isDistributed(), context->getTCPPort());
        }

        explicit FunctionTcpPort(bool is_distributed_, UInt16 port_) : is_distributed(is_distributed_), port(port_)
        {
        }

        String getName() const override { return name; }
        size_t getNumberOfArguments() const override { return 0; }
        DataTypePtr getReturnTypeImpl(const DataTypes &) const override { return std::make_shared<DataTypeUInt16>(); }

        /// Servers may listen on different ports, but the value is stable within one query.
        bool isDeterministic() const override { return false; }
        bool isDeterministicInScopeOfQuery() const override { return true; }

        /// In a distributed query remote shards may use other ports, so do not constant-fold.
        bool isSuitableForConstantFolding() const override { return !is_distributed; }
        bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

        ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
        {
            return DataTypeUInt16().createColumnConst(input_rows_count, port);
        }

    private:
        bool is_distributed;
        /// UInt16 to match the constructor parameter and the UInt16 result type
        /// (previously declared UInt64, silently widening the value).
        const UInt16 port;
    };
}

void registerFunctionTcpPort(FunctionFactory & factory)
{
    factory.registerFunction<FunctionTcpPort>();
}

}

View File

@ -1,65 +0,0 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <common/DateLUT.h>
#include <Core/Field.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/Context.h>

namespace DB
{
namespace
{

/** Returns the server time zone.
  */
class FunctionTimezone : public IFunction
{
public:
    static constexpr auto name = "timezone";

    static FunctionPtr create(ContextPtr context)
    {
        return std::make_shared<FunctionTimezone>(context->isDistributed());
    }

    explicit FunctionTimezone(bool is_distributed_) : is_distributed(is_distributed_)
    {
    }

    String getName() const override
    {
        return name;
    }

    size_t getNumberOfArguments() const override
    {
        return 0;
    }

    DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
    {
        return std::make_shared<DataTypeString>();
    }

    /// The time zone may differ between servers, but is stable within one query on one server.
    bool isDeterministic() const override { return false; }
    bool isDeterministicInScopeOfQuery() const override { return true; }

    /// Do not constant-fold in distributed queries: remote shards may be in other time zones.
    bool isSuitableForConstantFolding() const override { return !is_distributed; }
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

    ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
    {
        return DataTypeString().createColumnConst(input_rows_count, DateLUT::instance().getTimeZone());
    }

private:
    bool is_distributed;
};

}

void registerFunctionTimezone(FunctionFactory & factory)
{
    factory.registerFunction<FunctionTimezone>();
    factory.registerAlias("timeZone", "timezone");
}

}

View File

@ -1,63 +0,0 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Core/Field.h>
#include <Interpreters/Context.h>

#if !defined(ARCADIA_BUILD)
#    include <Common/config_version.h>
#endif

namespace DB
{

/// Anonymous namespace for internal linkage, consistent with the sibling
/// function files (hostName.cpp, tcpPort.cpp, ...); only the register
/// function is part of the translation unit's interface.
namespace
{

/** version() - returns the current version as a string.
  */
class FunctionVersion : public IFunction
{
public:
    static constexpr auto name = "version";

    static FunctionPtr create(ContextPtr context)
    {
        return std::make_shared<FunctionVersion>(context->isDistributed());
    }

    explicit FunctionVersion(bool is_distributed_) : is_distributed(is_distributed_)
    {
    }

    String getName() const override
    {
        return name;
    }

    /// Depends on the binary, not on the arguments: differs between servers,
    /// stable within one query on one server.
    bool isDeterministic() const override { return false; }
    bool isDeterministicInScopeOfQuery() const override { return true; }

    /// Remote servers may run a different version, so do not constant-fold
    /// in distributed queries.
    bool isSuitableForConstantFolding() const override { return !is_distributed; }
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

    size_t getNumberOfArguments() const override
    {
        return 0;
    }

    DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
    {
        return std::make_shared<DataTypeString>();
    }

    ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
    {
        return DataTypeString().createColumnConst(input_rows_count, VERSION_STRING);
    }

private:
    bool is_distributed;
};

}

/// version() is registered case-insensitively, so VERSION() works as well.
void registerFunctionVersion(FunctionFactory & factory)
{
    factory.registerFunction<FunctionVersion>(FunctionFactory::CaseInsensitive);
}

}

View File

@ -218,7 +218,6 @@ SRCS(
blockNumber.cpp
blockSerializedSize.cpp
blockSize.cpp
buildId.cpp
byteSize.cpp
caseWithExpression.cpp
cbrt.cpp
@ -249,7 +248,6 @@ SRCS(
divide/divide.cpp
divide/divideImpl.cpp
dumpColumnStructure.cpp
e.cpp
empty.cpp
encodeXMLComponent.cpp
encrypt.cpp
@ -306,6 +304,7 @@ SRCS(
h3IndexesAreNeighbors.cpp
h3IsValid.cpp
h3ToChildren.cpp
h3ToGeoBoundary.cpp
h3ToParent.cpp
h3ToString.cpp
h3kRing.cpp
@ -314,7 +313,6 @@ SRCS(
hasThreadFuzzer.cpp
hasToken.cpp
hasTokenCaseInsensitive.cpp
hostName.cpp
hyperscanRegexpChecker.cpp
hypot.cpp
identity.cpp
@ -362,6 +360,7 @@ SRCS(
map.cpp
match.cpp
materialize.cpp
mathConstants.cpp
minus.cpp
modulo.cpp
moduloOrZero.cpp
@ -402,7 +401,6 @@ SRCS(
nullIf.cpp
padString.cpp
partitionId.cpp
pi.cpp
plus.cpp
pointInEllipses.cpp
pointInPolygon.cpp
@ -479,7 +477,7 @@ SRCS(
s2RectIntersection.cpp
s2RectUnion.cpp
s2ToGeo.cpp
serverUUID.cpp
serverConstants.cpp
sigmoid.cpp
sign.cpp
sin.cpp
@ -505,13 +503,11 @@ SRCS(
synonyms.cpp
tan.cpp
tanh.cpp
tcpPort.cpp
tgamma.cpp
throwIf.cpp
tid.cpp
timeSlot.cpp
timeSlots.cpp
timezone.cpp
timezoneOf.cpp
timezoneOffset.cpp
toColumnTypeName.cpp
@ -574,9 +570,7 @@ SRCS(
tupleToNameValuePairs.cpp
upper.cpp
upperUTF8.cpp
uptime.cpp
validateNestedArraySizes.cpp
version.cpp
visibleWidth.cpp
visitParamExtractBool.cpp
visitParamExtractFloat.cpp

View File

@ -18,6 +18,7 @@
#include <Common/SipHash.h>
#include <Common/FieldVisitorHash.h>
#include <Access/AccessFlags.h>
#include <Formats/FormatFactory.h>
namespace DB
@ -27,6 +28,7 @@ namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_FORMAT;
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_)
@ -166,6 +168,9 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
auto table = interpreter.getTable(insert_query);
auto sample_block = interpreter.getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr());
if (!FormatFactory::instance().isInputFormat(insert_query.format))
throw Exception(ErrorCodes::UNKNOWN_FORMAT, "Unknown input format {}", insert_query.format);
query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames());
String bytes;
@ -324,7 +329,7 @@ void AsynchronousInsertQueue::cleanup()
}
if (total_removed)
LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", keys_to_remove.size());
LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed);
}
{

View File

@ -1689,6 +1689,14 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
/// Returns how long (in seconds) the current ZooKeeper session has been alive,
/// or 0 when there is no session or it has already expired.
UInt32 Context::getZooKeeperSessionUptime() const
{
    /// Guard shared->zookeeper against concurrent (re)initialization.
    std::lock_guard lock(shared->zookeeper_mutex);
    if (!shared->zookeeper || shared->zookeeper->expired())
        return 0;
    return shared->zookeeper->getSessionUptime();
}
void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
{
/// It can be nearly impossible to understand in which order global objects are initialized on server startup.
@ -2769,8 +2777,8 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
{
using namespace std::chrono;
if (std::chrono::milliseconds(settings.async_insert_busy_timeout) == 0ms)
throw Exception("Setting async_insert_busy_timeout can't be zero", ErrorCodes::INVALID_SETTING_VALUE);
if (std::chrono::milliseconds(settings.async_insert_busy_timeout_ms) == 0ms)
throw Exception("Setting async_insert_busy_timeout_ms can't be zero", ErrorCodes::INVALID_SETTING_VALUE);
shared->async_insert_queue = ptr;
}

View File

@ -659,6 +659,8 @@ public:
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
UInt32 getZooKeeperSessionUptime() const;
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
#endif

View File

@ -30,6 +30,7 @@
#if USE_LIBPQXX
# include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
# include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#endif
namespace fs = std::filesystem;
@ -156,15 +157,6 @@ void DatabaseCatalog::loadDatabases()
/// Another background thread which drops temporary LiveViews.
/// We should start it after loadMarkedAsDroppedTables() to avoid race condition.
TemporaryLiveViewCleaner::instance().startup();
/// Start up tables after all databases are loaded.
for (const auto & [database_name, database] : databases)
{
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue;
database->startupTables();
}
}
void DatabaseCatalog::shutdownImpl()
@ -249,7 +241,9 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
#if USE_LIBPQXX
if (!context_->isInternalQuery() && (db_and_table.first->getEngineName() == "MaterializedPostgreSQL"))
{
db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext());
db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext(),
assert_cast<const DatabaseMaterializedPostgreSQL *>(db_and_table.first.get())->getPostgreSQLDatabaseName(),
db_and_table.second->getStorageID().table_name);
}
#endif

View File

@ -89,57 +89,53 @@ DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const std
std::string ExternalDictionariesLoader::resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const
{
bool has_dictionary = has(dictionary_name);
if (has_dictionary)
if (has(dictionary_name))
return dictionary_name;
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name);
has_dictionary = has(resolved_name);
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name, current_database_name);
if (!has_dictionary)
{
/// If dictionary not found. And database was not implicitly specified
/// we can qualify dictionary name with current database name.
/// It will help if dictionary is created with DDL and is in current database.
if (dictionary_name.find('.') == std::string::npos)
{
String dictionary_name_with_database = current_database_name + '.' + dictionary_name;
resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name_with_database);
has_dictionary = has(resolved_name);
}
}
if (has(resolved_name))
return resolved_name;
if (!has_dictionary)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dictionary ({}) not found", backQuote(dictionary_name));
return resolved_name;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dictionary ({}) not found", backQuote(dictionary_name));
}
std::string ExternalDictionariesLoader::resolveDictionaryNameFromDatabaseCatalog(const std::string & name) const
std::string ExternalDictionariesLoader::resolveDictionaryNameFromDatabaseCatalog(const std::string & name, const std::string & current_database_name) const
{
/// If it's dictionary from Atomic database, then we need to convert qualified name to UUID.
/// Try to split name and get id from associated StorageDictionary.
/// If something went wrong, return name as is.
auto pos = name.find('.');
if (pos == std::string::npos || name.find('.', pos + 1) != std::string::npos)
return name;
String res = name;
std::string maybe_database_name = name.substr(0, pos);
std::string maybe_table_name = name.substr(pos + 1);
auto qualified_name = QualifiedTableName::tryParseFromString(name);
if (!qualified_name)
return res;
if (qualified_name->database.empty())
{
/// Ether database name is not specified and we should use current one
/// or it's an XML dictionary.
bool is_xml_dictionary = has(name);
if (is_xml_dictionary)
return res;
qualified_name->database = current_database_name;
res = current_database_name + '.' + name;
}
auto [db, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable(
{maybe_database_name, maybe_table_name},
{qualified_name->database, qualified_name->table},
const_pointer_cast<Context>(getContext()));
if (!db)
return name;
return res;
assert(table);
if (db->getUUID() == UUIDHelpers::Nil)
return name;
return res;
if (table->getName() != "Dictionary")
return name;
return res;
return toString(table->getStorageID().uuid);
}

View File

@ -42,7 +42,7 @@ protected:
std::string resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const;
/// Try convert qualified dictionary name to persistent UUID
std::string resolveDictionaryNameFromDatabaseCatalog(const std::string & name) const;
std::string resolveDictionaryNameFromDatabaseCatalog(const std::string & name, const std::string & current_database_name) const;
friend class StorageSystemDictionaries;
friend class DatabaseDictionary;

View File

@ -0,0 +1,83 @@
#include <string>

#include <Interpreters/GatherFunctionQuantileVisitor.h>
#include <Common/Exception.h>
#include <common/types.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Mapping from quantile functions for single value to plural
static const std::unordered_map<String, String> quantile_fuse_name_mapping = {
    {NameQuantile::name, NameQuantiles::name},
    {NameQuantileDeterministic::name, NameQuantilesDeterministic::name},
    {NameQuantileExact::name, NameQuantilesExact::name},
    {NameQuantileExactLow::name, NameQuantilesExactLow::name},
    {NameQuantileExactHigh::name, NameQuantilesExactHigh::name},
    {NameQuantileExactExclusive::name, NameQuantilesExactExclusive::name},
    {NameQuantileExactInclusive::name, NameQuantilesExactInclusive::name},
    {NameQuantileExactWeighted::name, NameQuantilesExactWeighted::name},
    {NameQuantileTiming::name, NameQuantilesTiming::name},
    {NameQuantileTimingWeighted::name, NameQuantilesTimingWeighted::name},
    {NameQuantileTDigest::name, NameQuantilesTDigest::name},
    {NameQuantileTDigestWeighted::name, NameQuantilesTDigestWeighted::name},
    {NameQuantileBFloat16::name, NameQuantilesBFloat16::name}
};

/// Returns the plural ("quantiles*") counterpart of a single-value quantile function name.
/// Throws LOGICAL_ERROR if the name is not a fusable quantile-family function.
String GatherFunctionQuantileData::getFusedName(const String & func_name)
{
    if (auto it = quantile_fuse_name_mapping.find(func_name); it != quantile_fuse_name_mapping.end())
        return it->second;

    throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Function '{}' is not quantile-family or cannot be fused", func_name);
}

/// Records quantile* function nodes, bucketed by function name.
void GatherFunctionQuantileData::visit(ASTFunction & function, ASTPtr & ast)
{
    if (!quantile_fuse_name_mapping.contains(function.name))
        return;

    fuse_quantile[function.name].addFuncNode(ast);
}

/// Registers one quantile* call, keyed by its serialized argument list, so that
/// calls over the same argument(s) can later be fused into one quantiles* call.
void GatherFunctionQuantileData::FuseQuantileAggregatesData::addFuncNode(ASTPtr & ast)
{
    const auto * func = ast->as<ASTFunction>();
    if (!func)
        return;

    const auto & arguments = func->arguments->children;

    /// Deterministic/weighted variants take a second (determinator/weight) argument.
    bool need_two_args = func->name == NameQuantileDeterministic::name
        || func->name == NameQuantileExactWeighted::name
        || func->name == NameQuantileTimingWeighted::name
        || func->name == NameQuantileTDigestWeighted::name;
    if (arguments.size() != (need_two_args ? 2 : 1))
        return;

    /// getColumnName() serializes the argument AST, so compute each name only once
    /// (the original called it twice per argument). Names containing ',' are
    /// skipped because ',' is used below as the map-key separator.
    String arg_name = arguments[0]->getColumnName();
    if (arg_name.find(',') != std::string::npos)
        return;

    if (need_two_args)
    {
        String second_arg_name = arguments[1]->getColumnName();
        if (second_arg_name.find(',') != std::string::npos)
            return;
        arg_name += "," + second_arg_name;
    }

    arg_map_function[arg_name].push_back(&ast);
}

bool GatherFunctionQuantileData::needChild(const ASTPtr & node, const ASTPtr &)
{
    /// Skip children of quantile* functions to escape cycles in further processing
    if (const auto * func = node ? node->as<ASTFunction>() : nullptr)
        return !quantile_fuse_name_mapping.contains(func->name);

    return true;
}

}

View File

@ -0,0 +1,35 @@
#pragma once

#include <Parsers/ASTFunction.h>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <AggregateFunctions/AggregateFunctionQuantile.h>

namespace DB
{

/// Gather all the `quantile*` functions
/// so that calls over the same argument(s) can later be fused into one `quantiles*` call.
class GatherFunctionQuantileData
{
public:
    /// Per-function-name bucket: serialized argument list -> pointers to the AST
    /// nodes of quantile* calls over those arguments.
    struct FuseQuantileAggregatesData
    {
        std::unordered_map<String, std::vector<ASTPtr *>> arg_map_function;

        void addFuncNode(ASTPtr & ast);
    };

    using TypeToVisit = ASTFunction;

    /// Function name -> gathered calls of that function.
    std::unordered_map<String, FuseQuantileAggregatesData> fuse_quantile;

    void visit(ASTFunction & function, ASTPtr & ast);

    /// Returns the plural ("quantiles*") name for a single-value quantile function name.
    static String getFusedName(const String & func_name);

    /// Visitor hook: whether to descend into a node's children.
    static bool needChild(const ASTPtr & node, const ASTPtr &);
};

using GatherFunctionQuantileVisitor = InDepthNodeVisitor<OneTypeMatcher<GatherFunctionQuantileData, GatherFunctionQuantileData::needChild>, true>;

}

View File

@ -40,12 +40,23 @@ InterpreterAlterQuery::InterpreterAlterQuery(const ASTPtr & query_ptr_, ContextP
{
}
/// Dispatches the ALTER to the executor matching the altered object kind.
/// (The unused local `BlockIO res;` from the original is removed: every path
/// either returns a helper's result or throws.)
BlockIO InterpreterAlterQuery::execute()
{
    const auto & alter = query_ptr->as<ASTAlterQuery &>();

    if (alter.alter_object == ASTAlterQuery::AlterObjectType::DATABASE)
        return executeToDatabase(alter);
    else if (alter.alter_object == ASTAlterQuery::AlterObjectType::TABLE
        || alter.alter_object == ASTAlterQuery::AlterObjectType::LIVE_VIEW)
        return executeToTable(alter);

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown alter object type");
}
BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
{
BlockIO res;
if (!alter.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext(), getRequiredAccess());
@ -81,7 +92,9 @@ BlockIO InterpreterAlterQuery::execute()
{
auto * command_ast = child->as<ASTAlterCommand>();
if (auto alter_command = AlterCommand::parse(command_ast))
{
alter_commands.emplace_back(std::move(*alter_command));
}
else if (auto partition_command = PartitionCommand::parse(command_ast))
{
partition_commands.emplace_back(std::move(*partition_command));
@ -95,7 +108,9 @@ BlockIO InterpreterAlterQuery::execute()
mutation_commands.emplace_back(std::move(*mut_command));
}
else if (auto live_view_command = LiveViewCommand::parse(command_ast))
{
live_view_commands.emplace_back(std::move(*live_view_command));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
@ -152,6 +167,45 @@ BlockIO InterpreterAlterQuery::execute()
}
/// Executes ALTER DATABASE. Commands are parsed and validated first and applied
/// only afterwards, so an unsupported command leaves the database unchanged.
BlockIO InterpreterAlterQuery::executeToDatabase(const ASTAlterQuery & alter)
{
    BlockIO res;
    getContext()->checkAccess(getRequiredAccess());
    DatabasePtr database = DatabaseCatalog::instance().getDatabase(alter.database);
    AlterCommands alter_commands;

    for (const auto & child : alter.command_list->children)
    {
        auto * command_ast = child->as<ASTAlterCommand>();
        if (auto alter_command = AlterCommand::parse(command_ast))
            alter_commands.emplace_back(std::move(*alter_command));
        else
            throw Exception("Wrong parameter type in ALTER DATABASE query", ErrorCodes::LOGICAL_ERROR);
    }

    if (!alter_commands.empty())
    {
        /// Only ALTER SETTING is supported.
        for (const auto & command : alter_commands)
        {
            if (command.type != AlterCommand::MODIFY_DATABASE_SETTING)
                throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter type for database engines");
        }

        /// Apply only after the whole command list has been validated above.
        for (const auto & command : alter_commands)
        {
            if (!command.ignore)
            {
                if (command.type == AlterCommand::MODIFY_DATABASE_SETTING)
                    database->applySettingsChanges(command.settings_changes, getContext());
                else
                    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter command");
            }
        }
    }

    /// Empty pipeline: ALTER DATABASE produces no result rows.
    return res;
}
AccessRightsElements InterpreterAlterQuery::getRequiredAccess() const
{
AccessRightsElements required_access;
@ -351,6 +405,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_RENAME_COLUMN, database, table, column_name());
break;
}
case ASTAlterCommand::MODIFY_DATABASE_SETTING:
{
required_access.emplace_back(AccessType::ALTER_DATABASE_SETTINGS, database, table);
break;
}
case ASTAlterCommand::NO_TYPE: break;
}
@ -362,7 +421,7 @@ void InterpreterAlterQuery::extendQueryLogElemImpl(QueryLogElement & elem, const
const auto & alter = ast->as<const ASTAlterQuery &>();
elem.query_kind = "Alter";
if (alter.command_list != nullptr)
if (alter.command_list != nullptr && alter.alter_object != ASTAlterQuery::AlterObjectType::DATABASE)
{
// Alter queries already have their target table inserted into `elem`.
if (elem.query_tables.size() != 1)

View File

@ -9,6 +9,7 @@ namespace DB
class AccessRightsElements;
class ASTAlterCommand;
class ASTAlterQuery;
/** Allows you add or remove a column in the table.
@ -28,6 +29,10 @@ public:
private:
AccessRightsElements getRequiredAccess() const;
BlockIO executeToTable(const ASTAlterQuery & alter);
BlockIO executeToDatabase(const ASTAlterQuery & alter);
ASTPtr query_ptr;
};

View File

@ -53,6 +53,7 @@
#include <Databases/DatabaseReplicated.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/TablesLoader.h>
#include <Compression/CompressionFactory.h>
@ -271,9 +272,13 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
renamed = true;
}
/// We use global context here, because storages lifetime is bigger than query context lifetime
database->loadStoredObjects(
getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach, skip_startup_tables); //-V560
if (!load_database_without_tables)
{
/// We use global context here, because storages lifetime is bigger than query context lifetime
TablesLoader loader{getContext()->getGlobalContext(), {{database_name, database}}, has_force_restore_data_flag, create.attach && force_attach}; //-V560
loader.loadTables();
loader.startupTables();
}
}
catch (...)
{

View File

@ -52,9 +52,9 @@ public:
force_attach = force_attach_;
}
void setSkipStartupTables(bool skip_startup_tables_)
void setLoadDatabaseWithoutTables(bool load_database_without_tables_)
{
skip_startup_tables = skip_startup_tables_;
load_database_without_tables = load_database_without_tables_;
}
/// Obtain information about columns, their types, default values and column comments,
@ -99,7 +99,7 @@ private:
/// Is this an internal query - not from the user.
bool internal = false;
bool force_attach = false;
bool skip_startup_tables = false;
bool load_database_without_tables = false;
mutable String as_database_saved;
mutable String as_table_saved;

View File

@ -355,6 +355,13 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
}
}
if (!drop && query.no_delay)
{
/// Avoid "some tables are still in use" when sync mode is enabled
for (const auto & table_uuid : uuids_to_wait)
database->waitDetachedTableNotInUse(table_uuid);
}
/// Protects from concurrent CREATE TABLE queries
auto db_guard = DatabaseCatalog::instance().getExclusiveDDLGuardForDatabase(database_name);

View File

@ -40,6 +40,7 @@
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/ThreadFuzzer.h>
#include <csignal>
#include <algorithm>
@ -445,6 +446,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::STOP_LISTEN_QUERIES:
case Type::START_LISTEN_QUERIES:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not supported yet", query.type);
case Type::STOP_THREAD_FUZZER:
ThreadFuzzer::stop();
break;
case Type::START_THREAD_FUZZER:
ThreadFuzzer::start();
break;
default:
throw Exception("Unknown type of SYSTEM query", ErrorCodes::BAD_ARGUMENTS);
}
@ -877,6 +884,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
}
case Type::STOP_LISTEN_QUERIES: break;
case Type::START_LISTEN_QUERIES: break;
case Type::STOP_THREAD_FUZZER: break;
case Type::START_THREAD_FUZZER: break;
case Type::UNKNOWN: break;
case Type::END: break;
}

View File

@ -571,6 +571,7 @@ ASTs InterpreterAlterImpl::getRewrittenQueries(
auto rewritten_rename_query = std::make_shared<ASTRenameQuery>();
rewritten_alter_query->database = mapped_to_database;
rewritten_alter_query->table = alter_query.table;
rewritten_alter_query->alter_object = ASTAlterQuery::AlterObjectType::TABLE;
rewritten_alter_query->set(rewritten_alter_query->command_list, std::make_shared<ASTExpressionList>());
String default_after_column;

View File

@ -18,6 +18,7 @@
#include <Interpreters/RewriteFunctionToSubcolumnVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/GatherFunctionQuantileVisitor.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -626,6 +627,59 @@ void optimizeFunctionsToSubcolumns(ASTPtr & query, const StorageMetadataPtr & me
RewriteFunctionToSubcolumnVisitor(data).visit(query);
}
/// Builds a single fused plural call (e.g. `quantiles(...)`) out of several single-level
/// calls (e.g. `quantile(level)(x)`) that share the same argument list.
/// Returns nullptr when fusing is not applicable: fewer than two candidate calls,
/// or some call has a parameter count other than exactly one (illegal query).
std::shared_ptr<ASTFunction> getQuantileFuseCandidate(const String & func_name, std::vector<ASTPtr *> & functions)
{
if (functions.size() < 2)
return nullptr;

/// All collected calls were grouped by identical arguments, so take them from the first one.
const auto & common_arguments = (*functions[0])->as<ASTFunction>()->arguments->children;
auto func_base = makeASTFunction(GatherFunctionQuantileData::getFusedName(func_name));
func_base->arguments->children = common_arguments;
func_base->parameters = std::make_shared<ASTExpressionList>();
/// Collect the level parameter of every call into the fused function's parameter list,
/// preserving the order of `functions` (callers index into the result by this order).
for (const auto * ast : functions)
{
assert(ast && *ast);
const auto * func = (*ast)->as<ASTFunction>();
assert(func && func->parameters->as<ASTExpressionList>());
const ASTs & parameters = func->parameters->as<ASTExpressionList &>().children;
if (parameters.size() != 1)
return nullptr; /// query is illegal, give up
func_base->parameters->children.push_back(parameters[0]);
}
return func_base;
}
/// Rewrites multiple quantile()() functions with the same arguments to quantiles()()[].
/// E.g.: SELECT quantile(0.5)(x), quantile(0.9)(x), quantile(0.95)(x) FROM ...
/// is rewritten to: SELECT quantiles(0.5, 0.9, 0.95)(x)[1], quantiles(0.5, 0.9, 0.95)(x)[2], quantiles(0.5, 0.9, 0.95)(x)[3] FROM ...
void optimizeFuseQuantileFunctions(ASTPtr & query)
{
    /// Gather fusable quantile* calls, grouped by function name and then by argument list.
    GatherFunctionQuantileVisitor::Data data{};
    GatherFunctionQuantileVisitor(data).visit(query);
    for (auto & [func_name, args_to_functions] : data.fuse_quantile)
    {
        /// Try to fuse multiple `quantile*` functions into one plural call.
        /// Iterate by reference: the previous code copied the whole map entry
        /// (key plus the vector of AST pointers) on every iteration.
        for (auto & [args, functions] : args_to_functions.arg_map_function)
        {
            auto func_base = getQuantileFuseCandidate(func_name, functions);
            if (!func_base)
                continue;

            /// Replace the i-th original call with `arrayElement(fused, i + 1)`
            /// (array indices are 1-based), keeping the original alias if any.
            for (size_t i = 0; i < functions.size(); ++i)
            {
                auto ast_new = makeASTFunction("arrayElement", func_base, std::make_shared<ASTLiteral>(i + 1));
                if (const auto & alias = (*functions[i])->tryGetAlias(); !alias.empty())
                    ast_new->setAlias(alias);
                *functions[i] = ast_new;
            }
        }
    }
}
}
void TreeOptimizer::optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_to_multiif)
@ -723,6 +777,9 @@ void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
/// Remove duplicated columns from USING(...).
optimizeUsing(select_query);
if (settings.optimize_syntax_fuse_functions)
optimizeFuseQuantileFunctions(query);
}
}

View File

@ -1072,7 +1072,7 @@ void TreeRewriter::normalize(
// if we have at least two different functions. E.g. we will replace sum(x)
// and count(x) with sumCount(x).1 and sumCount(x).2, and sumCount() will
// be calculated only once because of CSE.
if (settings.optimize_fuse_sum_count_avg)
if (settings.optimize_fuse_sum_count_avg || settings.optimize_syntax_fuse_functions)
{
FuseSumCountAggregatesVisitor::Data data;
FuseSumCountAggregatesVisitor(data).visit(query);

View File

@ -582,7 +582,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto * queue = context->getAsynchronousInsertQueue();
const bool async_insert = queue
&& insert_query && !insert_query->select
&& insert_query->hasInlinedData() && settings.async_insert_mode;
&& insert_query->hasInlinedData() && settings.async_insert;
if (async_insert)
{

View File

@ -11,6 +11,7 @@
#include <Interpreters/loadMetadata.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/TablesLoader.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@ -43,7 +44,7 @@ static void executeCreateQuery(
interpreter.setInternal(true);
interpreter.setForceAttach(true);
interpreter.setForceRestoreData(has_force_restore_data_flag);
interpreter.setSkipStartupTables(true);
interpreter.setLoadDatabaseWithoutTables(true);
interpreter.execute();
}
@ -161,8 +162,16 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
if (create_default_db_if_not_exists && !metadata_dir_for_default_db_already_exists)
databases.emplace(default_database_name, path + "/" + escapeForFileName(default_database_name));
TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)
{
loadDatabase(context, name, db_path, has_force_restore_data_flag);
loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
}
TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
loader.loadTables();
loader.startupTables();
if (has_force_restore_data_flag)
{
@ -197,11 +206,28 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
}
}
/// Starts up the tables of the `system` database.
/// Deliberately separate from loadMetadataSystem(): system tables are started up only
/// after all other databases are loaded, because their background operations may slow
/// down the loading of the remaining tables (see the comment in loadMetadata.h).
void startupSystemTables()
{
    ThreadPool pool;
    DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, /* force_restore */ true, /* force_attach */ true);
}
/// Loads the predefined databases: `system` (Atomic engine) and both spellings of
/// `information_schema` (Memory engine), then loads their tables.
/// Note: tables are only loaded here, not started up — startup is deferred to
/// startupSystemTables(), which runs after all databases are loaded.
void loadMetadataSystem(ContextMutablePtr context)
{
    loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic");
    loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA, "Memory");
    loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, "Memory");
    TablesLoader::Databases databases =
    {
        {DatabaseCatalog::SYSTEM_DATABASE, DatabaseCatalog::instance().getSystemDatabase()},
        {DatabaseCatalog::INFORMATION_SCHEMA, DatabaseCatalog::instance().getDatabase(DatabaseCatalog::INFORMATION_SCHEMA)},
        {DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, DatabaseCatalog::instance().getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)},
    };
    TablesLoader loader{context, databases, /* force_restore */ true, /* force_attach */ true};
    loader.loadTables();
    /// Will startup tables in system database after all databases are loaded.
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Databases/TablesLoader.h>
namespace DB
@ -14,4 +15,8 @@ void loadMetadataSystem(ContextMutablePtr context);
/// Use separate function to load system tables.
void loadMetadata(ContextMutablePtr context, const String & default_database_name = {});
/// Background operations in system tables may slow down loading of the rest of the tables,
/// so we start up system tables after all databases are loaded.
void startupSystemTables();
}

View File

@ -28,6 +28,7 @@ SRCS(
ApplyWithSubqueryVisitor.cpp
ArithmeticOperationsInAgrFuncOptimize.cpp
ArrayJoinAction.cpp
AsynchronousInsertQueue.cpp
AsynchronousMetricLog.cpp
AsynchronousMetrics.cpp
BloomFilter.cpp
@ -64,6 +65,7 @@ SRCS(
ExtractExpressionInfoVisitor.cpp
FillingRow.cpp
FunctionNameNormalizer.cpp
GatherFunctionQuantileVisitor.cpp
HashJoin.cpp
IExternalLoadable.cpp
IInterpreter.cpp

View File

@ -400,6 +400,11 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "RESET SETTING " << (settings.hilite ? hilite_none : "");
settings_resets->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MODIFY_DATABASE_SETTING)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY SETTING " << (settings.hilite ? hilite_none : "");
settings_changes->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MODIFY_QUERY)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY QUERY " << settings.nl_or_ws << (settings.hilite ? hilite_none : "");
@ -472,11 +477,24 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
if (is_live_view)
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER LIVE VIEW " << (settings.hilite ? hilite_none : "");
else
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : "");
switch (alter_object)
{
case AlterObjectType::TABLE:
settings.ostr << "ALTER TABLE ";
break;
case AlterObjectType::DATABASE:
settings.ostr << "ALTER DATABASE ";
break;
case AlterObjectType::LIVE_VIEW:
settings.ostr << "ALTER LIVE VIEW ";
break;
default:
break;
}
settings.ostr << (settings.hilite ? hilite_none : "");
if (!table.empty())
{
@ -487,6 +505,11 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
}
settings.ostr << indent_str << backQuoteIfNeed(table);
}
else if (alter_object == AlterObjectType::DATABASE && !database.empty())
{
settings.ostr << indent_str << backQuoteIfNeed(database);
}
formatOnCluster(settings);
settings.ostr << settings.nl_or_ws;

Some files were not shown because too many files have changed in this diff Show More