Merge branch 'master' into insert-quorum-host-name-check

This commit is contained in:
Alexey Milovidov 2024-01-02 21:46:12 +01:00
commit deac0b5f3e
242 changed files with 2973 additions and 1491 deletions

View File

@ -1,4 +1,4 @@
Copyright 2016-2023 ClickHouse, Inc.
Copyright 2016-2024 ClickHouse, Inc.
Apache License
Version 2.0, January 2004
@ -188,7 +188,7 @@ Copyright 2016-2023 ClickHouse, Inc.
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2016-2023 ClickHouse, Inc.
Copyright 2016-2024 ClickHouse, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -3,10 +3,10 @@
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 54482)
SET(VERSION_MAJOR 23)
SET(VERSION_MINOR 13)
SET(VERSION_MAJOR 24)
SET(VERSION_MINOR 1)
SET(VERSION_PATCH 1)
SET(VERSION_GITHASH a2faa65b080a587026c86844f3a20c74d23a86f8)
SET(VERSION_DESCRIBE v23.13.1.1-testing)
SET(VERSION_STRING 23.13.1.1)
SET(VERSION_DESCRIBE v24.1.1.1-testing)
SET(VERSION_STRING 24.1.1.1)
# end of autochange

View File

@ -1,4 +1,4 @@
if(OS_LINUX AND TARGET OpenSSL::SSL)
if((OS_LINUX OR OS_DARWIN) AND TARGET OpenSSL::SSL)
option(ENABLE_MYSQL "Enable MySQL" ${ENABLE_LIBRARIES})
else ()
option(ENABLE_MYSQL "Enable MySQL" FALSE)
@ -73,7 +73,7 @@ set(HAVE_SYS_TYPES_H 1)
set(HAVE_SYS_UN_H 1)
set(HAVE_UNISTD_H 1)
set(HAVE_UTIME_H 1)
set(HAVE_UCONTEXT_H 1)
set(HAVE_UCONTEXT_H 0)
set(HAVE_ALLOCA 1)
set(HAVE_DLERROR 0)
set(HAVE_DLOPEN 0)
@ -116,9 +116,13 @@ CONFIGURE_FILE(${CC_SOURCE_DIR}/include/ma_config.h.in
CONFIGURE_FILE(${CC_SOURCE_DIR}/include/mariadb_version.h.in
${CC_BINARY_DIR}/include-public/mariadb_version.h)
if(WITH_SSL)
if (WITH_SSL)
set(SYSTEM_LIBS ${SYSTEM_LIBS} ${SSL_LIBRARIES})
endif()
endif ()
if (OS_DARWIN)
set(SYSTEM_LIBS ${SYSTEM_LIBS} iconv)
endif ()
function(REGISTER_PLUGIN)
@ -227,15 +231,8 @@ ${CC_SOURCE_DIR}/libmariadb/secure/openssl_crypt.c
${CC_BINARY_DIR}/libmariadb/ma_client_plugin.c
)
if(ICONV_INCLUDE_DIR)
include_directories(BEFORE ${ICONV_INCLUDE_DIR})
endif()
add_definitions(-DLIBICONV_PLUG)
if(WITH_DYNCOL)
set(LIBMARIADB_SOURCES ${LIBMARIADB_SOURCES} ${CC_SOURCE_DIR}/libmariadb/mariadb_dyncol.c)
endif()
set(LIBMARIADB_SOURCES ${LIBMARIADB_SOURCES} ${CC_SOURCE_DIR}/libmariadb/mariadb_async.c ${CC_SOURCE_DIR}/libmariadb/ma_context.c)

View File

@ -3,10 +3,10 @@ compilers and build settings. Correctly configured Docker daemon is single depen
Usage:
Build deb package with `clang-14` in `debug` mode:
Build deb package with `clang-17` in `debug` mode:
```
$ mkdir deb/test_output
$ ./packager --output-dir deb/test_output/ --package-type deb --compiler=clang-14 --debug-build
$ ./packager --output-dir deb/test_output/ --package-type deb --compiler=clang-17 --debug-build
$ ls -l deb/test_output
-rw-r--r-- 1 root root 3730 clickhouse-client_22.2.2+debug_all.deb
-rw-r--r-- 1 root root 84221888 clickhouse-common-static_22.2.2+debug_amd64.deb
@ -17,11 +17,11 @@ $ ls -l deb/test_output
```
Build ClickHouse binary with `clang-14` and `address` sanitizer in `relwithdebuginfo`
Build ClickHouse binary with `clang-17` and `address` sanitizer in `relwithdebuginfo`
mode:
```
$ mkdir $HOME/some_clickhouse
$ ./packager --output-dir=$HOME/some_clickhouse --package-type binary --compiler=clang-14 --sanitizer=address
$ ./packager --output-dir=$HOME/some_clickhouse --package-type binary --compiler=clang-17 --sanitizer=address
$ ls -l $HOME/some_clickhouse
-rwxr-xr-x 1 root root 787061952 clickhouse
lrwxrwxrwx 1 root root 10 clickhouse-benchmark -> clickhouse

View File

@ -25,7 +25,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
[SETTINGS name=value, clean_deleted_rows=value, ...]
```
For a description of request parameters, see [statement description](../../../sql-reference/statements/create/table.md).
@ -88,6 +88,53 @@ SELECT * FROM mySecondReplacingMT FINAL;
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Name of a column used during a merge to determine whether the data in this row represents the state or is to be deleted; `1` is a "deleted" row, `0` is a "state" row.
Column data type — `UInt8`.
:::note
`is_deleted` can only be enabled when `ver` is used.
The row is deleted when `OPTIMIZE ... FINAL CLEANUP` or `OPTIMIZE ... FINAL` is used, or if the engine setting `clean_deleted_rows` has been set to `Always`.
Regardless of the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted row is the one kept.
:::
Example:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 1);
select * from myThirdReplacingMT final;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 00:00:00', 0);
select * from myThirdReplacingMT final;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
## Query clauses
When creating a `ReplacingMergeTree` table, the same [clauses](../../../engines/table-engines/mergetree-family/mergetree.md) are required as when creating a `MergeTree` table.

View File

@ -25,8 +25,7 @@ The steps below will easily work on a local install of ClickHouse too. The only
1. Let's see what the data looks like. The `s3cluster` table function returns a table, so we can `DESCRIBE` the result:
```sql
DESCRIBE s3Cluster(
'default',
DESCRIBE s3(
'https://clickhouse-public-datasets.s3.amazonaws.com/youtube/original/files/*.zst',
'JSONLines'
);
@ -35,29 +34,29 @@ DESCRIBE s3Cluster(
ClickHouse infers the following schema from the JSON file:
```response
┌─name────────────────┬─type─────────────────────────────────┐
│ id │ Nullable(String) │
│ fetch_date │ Nullable(Int64)
│ upload_date │ Nullable(String) │
│ title │ Nullable(String) │
│ uploader_id │ Nullable(String) │
│ uploader │ Nullable(String) │
│ uploader_sub_count │ Nullable(Int64) │
│ is_age_limit │ Nullable(Bool) │
│ view_count │ Nullable(Int64) │
│ like_count │ Nullable(Int64) │
│ dislike_count │ Nullable(Int64) │
│ is_crawlable │ Nullable(Bool) │
│ is_live_content │ Nullable(Bool) │
│ has_subtitles │ Nullable(Bool) │
│ is_ads_enabled │ Nullable(Bool) │
│ is_comments_enabled │ Nullable(Bool) │
│ description │ Nullable(String) │
│ rich_metadata │ Array(Map(String, Nullable(String)))
│ super_titles │ Array(Map(String, Nullable(String)))
│ uploader_badges │ Nullable(String) │
│ video_badges │ Nullable(String) │
└─────────────────────┴──────────────────────────────────────┘
┌─name────────────────┬─type───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─
│ id │ Nullable(String) │ │ │ │ │
│ fetch_date │ Nullable(String) │ │ │ │ │
│ upload_date │ Nullable(String) │ │ │ │ │
│ title │ Nullable(String) │ │ │ │ │
│ uploader_id │ Nullable(String) │ │ │ │ │
│ uploader │ Nullable(String) │ │ │ │ │
│ uploader_sub_count │ Nullable(Int64) │ │ │ │ │
│ is_age_limit │ Nullable(Bool) │ │ │ │ │
│ view_count │ Nullable(Int64) │ │ │ │ │
│ like_count │ Nullable(Int64) │ │ │ │ │
│ dislike_count │ Nullable(Int64) │ │ │ │ │
│ is_crawlable │ Nullable(Bool) │ │ │ │ │
│ is_live_content │ Nullable(Bool) │ │ │ │ │
│ has_subtitles │ Nullable(Bool) │ │ │ │ │
│ is_ads_enabled │ Nullable(Bool) │ │ │ │ │
│ is_comments_enabled │ Nullable(Bool) │ │ │ │ │
│ description │ Nullable(String) │ │ │ │ │
│ rich_metadata │ Array(Tuple(call Nullable(String), content Nullable(String), subtitle Nullable(String), title Nullable(String), url Nullable(String))) │ │ │ │ │
│ super_titles │ Array(Tuple(text Nullable(String), url Nullable(String))) │ │ │ │ │
│ uploader_badges │ Nullable(String) │ │ │ │ │
│ video_badges │ Nullable(String) │ │ │ │ │
└─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────
```
2. Based on the inferred schema, we cleaned up the data types and added a primary key. Define the following table:
@ -82,13 +81,13 @@ CREATE TABLE youtube
`is_ads_enabled` Bool,
`is_comments_enabled` Bool,
`description` String,
`rich_metadata` Array(Map(String, String)),
`super_titles` Array(Map(String, String)),
`rich_metadata` Array(Tuple(call String, content String, subtitle String, title String, url String)),
`super_titles` Array(Tuple(text String, url String)),
`uploader_badges` String,
`video_badges` String
)
ENGINE = MergeTree
ORDER BY (uploader, upload_date);
ORDER BY (uploader, upload_date)
```
3. The following command streams the records from the S3 files into the `youtube` table.

View File

@ -852,6 +852,16 @@ If the file name for column is too long (more than `max_file_name_length` bytes)
The maximum length of a file name that is kept as is, without hashing. Takes effect only if the setting `replace_long_file_name_to_hash` is enabled. The value of this setting does not include the length of the file extension, so it is recommended to set it below the maximum filename length (usually 255 bytes) with some gap to avoid filesystem errors. Default value: 127.
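A minimal sketch, assuming both settings are applied per table in the `SETTINGS` clause of `CREATE TABLE` (the table and column names below are hypothetical):
```sql
-- Hypothetical table: column file names longer than 128 bytes are stored under a hash.
CREATE TABLE file_name_hashing_example
(
    `id` UInt64,
    `a_column_with_a_rather_long_descriptive_name` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS replace_long_file_name_to_hash = 1, max_file_name_length = 128;
```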
## clean_deleted_rows
Enables or disables automatic deletion of rows flagged as `is_deleted` when `OPTIMIZE ... FINAL` is performed on a table using the ReplacingMergeTree engine. When disabled, the `CLEANUP` keyword has to be added to `OPTIMIZE ... FINAL` to get the same behaviour.
Possible values:
- `Always`
- `Never`
Default value: `Never`.
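A minimal sketch of applying this setting per table (the table name is hypothetical; with `clean_deleted_rows = 'Always'`, rows flagged by `is_deleted` are removed by a plain `OPTIMIZE ... FINAL`):
```sql
CREATE TABLE cleanup_example
(
    `key` Int64,
    `value` String,
    `version` UInt64,
    `is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, is_deleted)
ORDER BY key
SETTINGS clean_deleted_rows = 'Always';

-- No explicit CLEANUP keyword is needed when the setting is 'Always'.
OPTIMIZE TABLE cleanup_example FINAL;
```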
## allow_experimental_block_number_column
Persists the virtual column `_block_number` on merges.
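A short sketch of enabling the setting and reading the virtual column (the table name is hypothetical):
```sql
CREATE TABLE block_number_example
(
    `id` UInt64,
    `payload` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS allow_experimental_block_number_column = 1;

-- _block_number is exposed as a virtual column and, with the setting enabled, survives merges.
SELECT _block_number, * FROM block_number_example;
```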

View File

@ -0,0 +1,26 @@
---
slug: /en/operations/system-tables/database_engines
---
# database_engines
Contains the list of database engines supported by the server.
This table contains the following columns (the column type is shown in brackets):
- `name` (String) — The name of the database engine.
Example:
``` sql
SELECT *
FROM system.database_engines
WHERE name in ('Atomic', 'Lazy', 'Ordinary')
```
``` text
┌─name─────┐
│ Ordinary │
│ Atomic │
│ Lazy │
└──────────┘
```

View File

@ -501,41 +501,3 @@ Result:
│ 0 │
└────────────────────────────────────────────────────────────────────┘
```
## reverseDNSQuery
Performs a reverse DNS query to get the PTR records associated with the IP address.
**Syntax**
``` sql
reverseDNSQuery(address)
```
This function performs reverse DNS resolution for both IPv4 and IPv6 addresses.
**Arguments**
- `address` — An IPv4 or IPv6 address. [String](../../sql-reference/data-types/string.md).
**Returned value**
- Associated domains (PTR records).
Type: [Array(String)](../../sql-reference/data-types/array.md).
**Example**
Query:
``` sql
SELECT reverseDNSQuery('192.168.0.2');
```
Result:
``` text
┌─reverseDNSQuery('192.168.0.2')────────────┐
│ ['test2.example.com','test3.example.com'] │
└───────────────────────────────────────────┘
```

View File

@ -86,6 +86,59 @@ SELECT * FROM mySecondReplacingMT FINAL;
│ 1 │ first │ 2020-01-01 01:01:01 │
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Name of the column used during a merge to indicate whether the row should be kept or is to be deleted; `1` marks a row for deletion, `0` marks a row to be kept.
Column data type — `UInt8`.
:::note
`is_deleted` can be used when `ver` is used.
A row is deleted in the following cases:
- the `OPTIMIZE ... FINAL CLEANUP` statement is used
- the `OPTIMIZE ... FINAL` statement is used
- the engine setting `clean_deleted_rows` is set to `Always` (the default is `Never`)
- there are newer versions of the row
It is not recommended to run `FINAL CLEANUP` or to use the engine setting `clean_deleted_rows` with the value `Always`; this can lead to unexpected results, for example deleted rows may reappear.
Regardless of the changes made to the data, the version must be increased. If two rows have the same version, only the last inserted row is kept.
:::
Example:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 1);
select * from myThirdReplacingMT final;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 00:00:00', 0);
select * from myThirdReplacingMT final;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
## Query clauses

View File

@ -2,6 +2,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <Databases/registerDatabases.h>
#include <IO/ConnectionTimeouts.h>
#include <Formats/registerFormats.h>
#include <Common/scope_guard_safe.h>
@ -159,6 +160,7 @@ void ClusterCopierApp::mainImpl()
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);

View File

@ -17,6 +17,7 @@
#include <Interpreters/Context.h>
#include <Functions/FunctionFactory.h>
#include <Databases/registerDatabases.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
@ -130,6 +131,7 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerFormats();

View File

@ -2,6 +2,7 @@
#include "CatBoostLibraryHandler.h"
#include "CatBoostLibraryHandlerFactory.h"
#include "Common/ProfileEvents.h"
#include "ExternalDictionaryLibraryHandler.h"
#include "ExternalDictionaryLibraryHandlerFactory.h"
@ -44,7 +45,7 @@ namespace
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(&Poco::Logger::get("LibraryBridge"), fmt::runtime(message));
}
@ -96,7 +97,7 @@ ExternalDictionaryLibraryBridgeRequestHandler::ExternalDictionaryLibraryBridgeRe
}
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
@ -384,7 +385,7 @@ ExternalDictionaryLibraryBridgeExistsHandler::ExternalDictionaryLibraryBridgeExi
}
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
@ -423,7 +424,7 @@ CatBoostLibraryBridgeRequestHandler::CatBoostLibraryBridgeRequestHandler(
}
void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
@ -621,7 +622,7 @@ CatBoostLibraryBridgeExistsHandler::CatBoostLibraryBridgeExistsHandler(size_t ke
}
void CatBoostLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void CatBoostLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{

View File

@ -20,7 +20,7 @@ class ExternalDictionaryLibraryBridgeRequestHandler : public HTTPRequestHandler,
public:
ExternalDictionaryLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
static constexpr inline auto FORMAT = "RowBinary";
@ -36,7 +36,7 @@ class ExternalDictionaryLibraryBridgeExistsHandler : public HTTPRequestHandler,
public:
ExternalDictionaryLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;
@ -65,7 +65,7 @@ class CatBoostLibraryBridgeRequestHandler : public HTTPRequestHandler, WithConte
public:
CatBoostLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;
@ -79,7 +79,7 @@ class CatBoostLibraryBridgeExistsHandler : public HTTPRequestHandler, WithContex
public:
CatBoostLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;

View File

@ -10,6 +10,7 @@
#include <Poco/Logger.h>
#include <Poco/NullChannel.h>
#include <Poco/SimpleFileChannel.h>
#include <Databases/registerDatabases.h>
#include <Databases/DatabaseFilesystem.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabasesOverlay.h>
@ -489,6 +490,7 @@ try
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);
@ -726,12 +728,7 @@ void LocalServer::processConfig()
/// We load temporary database first, because projections need it.
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
/** Init dummy default DB
* NOTE: We force using isolated default database to avoid conflicts with default database from server environment
* Otherwise, metadata of temporary File(format, EXPLICIT_PATH) tables will pollute metadata/ directory;
* if such tables will not be dropped, clickhouse-server will not be able to load them due to security reasons.
*/
std::string default_database = config().getString("default_database", "_local");
std::string default_database = config().getString("default_database", "default");
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
global_context->setCurrentDatabase(default_database);
@ -744,7 +741,7 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loading metadata from {}", path);
auto startup_system_tasks = loadMetadataSystem(global_context);
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false);
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
@ -763,7 +760,7 @@ void LocalServer::processConfig()
}
else if (!config().has("no-system-tables"))
{
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false);
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
}

View File

@ -69,7 +69,7 @@ namespace
}
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -78,7 +78,7 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -23,7 +23,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -21,7 +21,7 @@
namespace DB
{
void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -30,7 +30,7 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
response.send()->writeln(message);
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -21,7 +21,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -46,12 +46,12 @@ void ODBCHandler::processError(HTTPServerResponse & response, const std::string
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
}
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request);
LOG_TRACE(log, "Request URI: {}", request.getURI());

View File

@ -30,7 +30,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -6,7 +6,7 @@
namespace DB
{
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{

View File

@ -10,7 +10,7 @@ class PingHandler : public HTTPRequestHandler
{
public:
explicit PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
size_t keep_alive_timeout;

View File

@ -29,7 +29,7 @@ namespace
}
void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -38,7 +38,7 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -24,7 +24,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -72,6 +72,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <Databases/registerDatabases.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <IO/Resource/registerSchedulerNodes.h>
@ -151,6 +152,18 @@ namespace ProfileEvents
{
extern const Event MainConfigLoads;
extern const Event ServerStartupMilliseconds;
extern const Event InterfaceNativeSendBytes;
extern const Event InterfaceNativeReceiveBytes;
extern const Event InterfaceHTTPSendBytes;
extern const Event InterfaceHTTPReceiveBytes;
extern const Event InterfacePrometheusSendBytes;
extern const Event InterfacePrometheusReceiveBytes;
extern const Event InterfaceInterserverSendBytes;
extern const Event InterfaceInterserverReceiveBytes;
extern const Event InterfaceMySQLSendBytes;
extern const Event InterfaceMySQLReceiveBytes;
extern const Event InterfacePostgreSQLSendBytes;
extern const Event InterfacePostgreSQLReceiveBytes;
}
namespace fs = std::filesystem;
@ -648,6 +661,7 @@ try
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ false);
@ -2045,7 +2059,7 @@ std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr
{
if (type == "tcp")
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false));
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes));
if (type == "tls")
#if USE_SSL
@ -2057,20 +2071,20 @@ std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
if (type == "proxy1")
return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(*this, conf_name));
if (type == "mysql")
return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this));
return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes));
if (type == "postgres")
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this));
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes));
if (type == "http")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes)
);
if (type == "prometheus")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes)
);
if (type == "interserver")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"), ProfileEvents::InterfaceInterserverReceiveBytes, ProfileEvents::InterfaceInterserverSendBytes)
);
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type);
@ -2203,7 +2217,7 @@ void Server::createServers(
port_name,
"http://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes));
});
}
@ -2223,7 +2237,7 @@ void Server::createServers(
port_name,
"https://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes));
#else
UNUSED(port);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "HTTPS protocol is disabled because Poco library was built without NetSSL support.");
@ -2246,7 +2260,7 @@ void Server::createServers(
port_name,
"native protocol (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false),
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2268,7 +2282,7 @@ void Server::createServers(
port_name,
"native protocol (tcp) with PROXY: " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true),
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2291,7 +2305,7 @@ void Server::createServers(
port_name,
"secure native protocol (tcp_secure): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false),
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2315,7 +2329,7 @@ void Server::createServers(
listen_host,
port_name,
"MySQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(new MySQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
std::make_unique<TCPServer>(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams));
});
}
@ -2332,7 +2346,7 @@ void Server::createServers(
listen_host,
port_name,
"PostgreSQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(new PostgreSQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
std::make_unique<TCPServer>(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams));
});
}
@ -2366,7 +2380,7 @@ void Server::createServers(
port_name,
"Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes));
});
}
}
@ -2412,7 +2426,9 @@ void Server::createInterserverServers(
createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
});
}
@ -2435,7 +2451,9 @@ void Server::createInterserverServers(
createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPSHandler-factory"),
server_pool,
socket,
http_params));
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
#else
UNUSED(port);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");

View File

@ -1 +0,0 @@
../../../tests/config/config.d/graphite_alternative.xml

View File

@ -155,6 +155,7 @@ namespace
"formats",
"privileges",
"data_type_families",
"database_engines",
"table_engines",
"table_functions",
"aggregate_function_combinators",

View File

@ -77,6 +77,7 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
};
add_column("name", "functions", false, {});
add_column("name", "database_engines", false, {});
add_column("name", "table_engines", false, {});
add_column("name", "formats", false, {});
add_column("name", "table_functions", false, {});

View File

@ -5,6 +5,7 @@
#include <Core/ColumnsWithTypeAndName.h>
#include <Columns/IColumn.h>
namespace DB
{
namespace ErrorCodes
@ -16,7 +17,7 @@ class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
/** A column containing a lambda expression.
* Behaves like a constant-column. Contains an expression, but not input or output data.
* Contains an expression and captured columns, but not input arguments.
*/
class ColumnFunction final : public COWHelper<IColumn, ColumnFunction>
{
@ -207,8 +208,6 @@ private:
bool is_function_compiled;
void appendArgument(const ColumnWithTypeAndName & column);
void addOffsetsForReplication(const IColumn::Offsets & offsets);
};
const ColumnFunction * checkAndGetShortCircuitArgument(const ColumnPtr & column);

View File

@ -587,6 +587,19 @@ The server successfully detected this situation and will download merged part fr
M(LogError, "Number of log messages with level Error") \
M(LogFatal, "Number of log messages with level Fatal") \
\
M(InterfaceHTTPSendBytes, "Number of bytes sent through HTTP interfaces") \
M(InterfaceHTTPReceiveBytes, "Number of bytes received through HTTP interfaces") \
M(InterfaceNativeSendBytes, "Number of bytes sent through native interfaces") \
M(InterfaceNativeReceiveBytes, "Number of bytes received through native interfaces") \
M(InterfacePrometheusSendBytes, "Number of bytes sent through Prometheus interfaces") \
M(InterfacePrometheusReceiveBytes, "Number of bytes received through Prometheus interfaces") \
M(InterfaceInterserverSendBytes, "Number of bytes sent through interserver interfaces") \
M(InterfaceInterserverReceiveBytes, "Number of bytes received through interserver interfaces") \
M(InterfaceMySQLSendBytes, "Number of bytes sent through MySQL interfaces") \
M(InterfaceMySQLReceiveBytes, "Number of bytes received through MySQL interfaces") \
M(InterfacePostgreSQLSendBytes, "Number of bytes sent through PostgreSQL interfaces") \
M(InterfacePostgreSQLReceiveBytes, "Number of bytes received through PostgreSQL interfaces") \
\
M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \
#ifdef APPLY_FOR_EXTERNAL_EVENTS

View File

@ -660,6 +660,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
switch (type)
{
case nuraft::cb_func::PreAppendLogLeader:
{
/// we cannot preprocess anything new as leader because we don't have up-to-date in-memory state
/// until we preprocess all stored logs
return nuraft::cb_func::ReturnCode::ReturnNull;
}
case nuraft::cb_func::InitialBatchCommited:
{
preprocess_logs();

View File

@ -3,10 +3,16 @@
#include <Common/JSONBuilder.h>
#include <Core/InterpolateDescription.h>
#include <Interpreters/convertFieldToType.h>
#include <Core/SettingsEnums.h>
#include <Common/IntervalKind.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTInterpolateElement.h>
#include <Interpreters/Aliases.h>
#include <Interpreters/ActionsDAG.h>
namespace DB
{
InterpolateDescription::InterpolateDescription(ActionsDAGPtr actions_, const Aliases & aliases)
: actions(actions_)
{
@ -28,5 +34,4 @@ namespace DB
result_columns_order.push_back(name);
}
}
}

View File

@ -2,20 +2,18 @@
#include <unordered_map>
#include <memory>
#include <cstddef>
#include <string>
#include <Core/Field.h>
#include <Core/SettingsEnums.h>
#include <Common/IntervalKind.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTInterpolateElement.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Interpreters/Aliases.h>
#include <Core/NamesAndTypes.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
using Aliases = std::unordered_map<String, ASTPtr>;
/// Interpolate description
struct InterpolateDescription
{

View File

@ -98,6 +98,8 @@ IMPLEMENT_SETTING_AUTO_ENUM(DefaultDatabaseEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(DefaultTableEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(CleanDeletedRows, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64},

View File

@ -140,6 +140,14 @@ enum class DefaultTableEngine
DECLARE_SETTING_ENUM(DefaultTableEngine)
enum class CleanDeletedRows
{
Never = 0, /// Disable.
Always,
};
DECLARE_SETTING_ENUM(CleanDeletedRows)
enum class MySQLDataTypesSupport
{
DECIMAL, // convert MySQL's decimal and number to ClickHouse Decimal when applicable

View File

@ -1,6 +1,7 @@
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -622,4 +623,16 @@ void DatabaseAtomic::checkDetachedTableNotInUse(const UUID & uuid)
assertDetachedTableNotInUse(uuid);
}
void registerDatabaseAtomic(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
return make_shared<DatabaseAtomic>(
args.database_name,
args.metadata_path,
args.uuid,
args.context);
};
factory.registerDatabase("Atomic", create_fn);
}
}

View File

@ -1,4 +1,5 @@
#include <Databases/DatabaseDictionary.h>
#include <Databases/DatabaseFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Dictionaries/DictionaryStructure.h>
@ -140,4 +141,14 @@ void DatabaseDictionary::shutdown()
{
}
void registerDatabaseDictionary(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
return make_shared<DatabaseDictionary>(
args.database_name,
args.context);
};
factory.registerDatabase("Dictionary", create_fn);
}
}

View File

@ -1,60 +1,15 @@
#include <Databases/DatabaseFactory.h>
#include <filesystem>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseDictionary.h>
#include <Databases/DatabaseFilesystem.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Common/logger_useful.h>
#include <Common/Macros.h>
#include <Common/filesystemHelpers.h>
#include "config.h"
#if USE_MYSQL
# include <Core/MySQL/MySQLClient.h>
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/MaterializedMySQLSettings.h>
# include <Storages/MySQL/MySQLHelpers.h>
# include <Storages/MySQL/MySQLSettings.h>
# include <Storages/StorageMySQL.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <mysqlxx/Pool.h>
#endif
#if USE_MYSQL || USE_LIBPQXX
#include <Common/parseRemoteDescription.h>
#include <Common/parseAddress.h>
#endif
#if USE_LIBPQXX
#include <Databases/PostgreSQL/DatabasePostgreSQL.h>
#include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h>
#include <Storages/StoragePostgreSQL.h>
#endif
#if USE_SQLITE
#include <Databases/SQLite/DatabaseSQLite.h>
#endif
#if USE_AWS_S3
#include <Databases/DatabaseS3.h>
#endif
#if USE_HDFS
#include <Databases/DatabaseHDFS.h>
#endif
#include <Common/logger_useful.h>
namespace fs = std::filesystem;
@ -67,7 +22,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int CANNOT_CREATE_DATABASE;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & metadata_path)
@ -103,8 +58,47 @@ void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & me
}
/// validate validates the database engine that's specified in the create query for
/// engine arguments, settings and table overrides.
void validate(const ASTCreateQuery & create_query)
{
auto * storage = create_query.storage;
/// Check engine may have arguments
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"};
const String & engine_name = storage->engine->name;
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (storage->engine->arguments && !engine_may_have_arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name);
/// Check engine may have settings
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
bool has_unexpected_element = storage->engine->parameters || storage->partition_by ||
storage->primary_key || storage->order_by ||
storage->sample_by;
if (has_unexpected_element || (!may_have_settings && storage->settings))
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
"Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name);
/// Check engine with table overrides
static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"};
if (create_query.table_overrides && !engines_with_table_overrides.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name);
}
DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context)
{
/// check if the database engine is a valid one before proceeding
if (!database_engines.contains(create.storage->engine->name))
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", create.storage->engine->name);
/// if the engine is found (i.e. registered with the factory instance), then validate if the
/// supplied engine arguments, settings and table overrides are valid for the engine.
validate(create);
cckMetadataPathForOrdinary(create, metadata_path);
DatabasePtr impl = getImpl(create, metadata_path, context);
@ -119,383 +113,42 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
return impl;
}
template <typename ValueType>
static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name)
void DatabaseFactory::registerDatabase(const std::string & name, CreatorFn creator_fn)
{
if (!ast || !ast->as<ASTLiteral>())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name);
if (!database_engines.emplace(name, std::move(creator_fn)).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DatabaseFactory: the database engine name '{}' is not unique", name);
}
return ast->as<ASTLiteral>()->value.safeGet<ValueType>();
DatabaseFactory & DatabaseFactory::instance()
{
static DatabaseFactory db_fact;
return db_fact;
}
DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context)
{
auto * engine_define = create.storage;
auto * storage = create.storage;
const String & database_name = create.getDatabase();
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
static const std::unordered_set<std::string_view> database_engines{"Ordinary", "Atomic", "Memory",
"Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL",
"PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"};
if (!database_engines.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name);
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"};
static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name);
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by;
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
"Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name);
if (create.table_overrides && !engines_with_table_overrides.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name);
if (engine_name == "Ordinary")
{
if (!create.attach && !context->getSettingsRef().allow_deprecated_database_ordinary)
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE,
"Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)");
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
}
if (engine_name == "Atomic")
return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context);
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name, context);
else if (engine_name == "Dictionary")
return std::make_shared<DatabaseDictionary>(database_name, context);
#if USE_MYSQL
else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StorageMySQL::Configuration configuration;
ASTs & arguments = engine->arguments->children;
auto mysql_settings = std::make_unique<MySQLSettings>();
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false);
}
else
{
if (arguments.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
{
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
}
else
{
const auto & [remote_host, remote_port] = parseAddress(host_port, 3306);
configuration.host = remote_host;
configuration.port = remote_port;
}
configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name);
configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name);
configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name);
}
try
{
if (engine_name == "MySQL")
{
mysql_settings->loadFromQueryContext(context, *engine_define);
if (engine_define->settings)
mysql_settings->loadFromQuery(*engine_define);
auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings);
return std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, configuration.database,
std::move(mysql_settings), std::move(mysql_pool), create.attach);
}
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);
auto mysql_pool = mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port);
auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>();
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (uuid == UUIDHelpers::Nil)
{
auto print_create_ast = create.clone();
print_create_ast->as<ASTCreateQuery>()->attach = false;
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete "
"the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}",
metadata_path,
queryToString(print_create_ast));
}
return std::make_shared<DatabaseMaterializedMySQL>(
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
std::move(client), std::move(materialize_mode_settings));
}
catch (...)
{
const auto & exception_message = getCurrentExceptionMessage(true);
throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message);
}
}
#endif
else if (engine_name == "Lazy")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument");
const auto & arguments = engine->arguments->children;
const auto cache_expiration_time_seconds = safeGetLiteralValue<UInt64>(arguments[0], "Lazy");
return std::make_shared<DatabaseLazy>(database_name, metadata_path, cache_expiration_time_seconds, context);
}
else if (engine_name == "Replicated")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 3)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name");
auto & arguments = engine->arguments->children;
for (auto & engine_arg : arguments)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
String zookeeper_path = safeGetLiteralValue<String>(arguments[0], "Replicated");
String shard_name = safeGetLiteralValue<String>(arguments[1], "Replicated");
String replica_name = safeGetLiteralValue<String>(arguments[2], "Replicated");
zookeeper_path = context->getMacros()->expand(zookeeper_path);
shard_name = context->getMacros()->expand(shard_name);
replica_name = context->getMacros()->expand(replica_name);
DatabaseReplicatedSettings database_replicated_settings{};
if (engine_define->settings)
database_replicated_settings.loadFromQuery(*engine_define);
return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid,
zookeeper_path, shard_name, replica_name,
std::move(database_replicated_settings), context);
}
#if USE_LIBPQXX
else if (engine_name == "PostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
auto use_table_cache = false;
StoragePostgreSQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, context, false);
use_table_cache = named_collection->getOrDefault<UInt64>("use_table_cache", 0);
}
else
{
if (engine_args.size() < 4 || engine_args.size() > 6)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL Database require `host:port`, `database_name`, `username`, `password`"
"[, `schema` = "", `use_table_cache` = 0");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
bool is_deprecated_syntax = false;
if (engine_args.size() >= 5)
{
auto arg_value = engine_args[4]->as<ASTLiteral>()->value;
if (arg_value.getType() == Field::Types::Which::String)
{
configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
}
else
{
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[4], engine_name);
LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used");
is_deprecated_syntax = true;
}
}
if (!is_deprecated_syntax && engine_args.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
}
const auto & settings = context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache);
}
else if (engine_name == "MaterializedPostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
StoragePostgreSQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, context, false);
}
else
{
if (engine_args.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`.");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
}
auto connection_info = postgres::formatConnectionString(
configuration.database, configuration.host, configuration.port, configuration.username, configuration.password);
auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, create.attach,
database_name, configuration.database, connection_info,
std::move(postgresql_replica_settings));
}
#endif
#if USE_SQLITE
else if (engine_name == "SQLite")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path");
const auto & arguments = engine->arguments->children;
String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite");
return std::make_shared<DatabaseSQLite>(context, engine_define, create.attach, database_path);
}
#endif
else if (engine_name == "Filesystem")
{
const ASTFunction * engine = engine_define->engine;
/// If init_path is empty, then the current path will be used
std::string init_path;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path");
const auto & arguments = engine->arguments->children;
init_path = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseFilesystem>(database_name, init_path, context);
}
#if USE_AWS_S3
else if (engine_name == "S3")
{
const ASTFunction * engine = engine_define->engine;
DatabaseS3::Configuration config;
if (engine->arguments && !engine->arguments->children.empty())
{
ASTs & engine_args = engine->arguments->children;
config = DatabaseS3::parseArguments(engine_args, context);
}
return std::make_shared<DatabaseS3>(database_name, config, context);
}
#endif
#if USE_HDFS
else if (engine_name == "HDFS")
{
const ASTFunction * engine = engine_define->engine;
/// If source_url is empty, then table name must contain full url
std::string source_url;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url");
const auto & arguments = engine->arguments->children;
source_url = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseHDFS>(database_name, source_url, context);
}
#endif
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name);
const String & engine_name = storage->engine->name;
bool has_engine_args = false;
if (storage->engine->arguments)
has_engine_args = true;
ASTs empty_engine_args;
Arguments arguments{
.engine_name = engine_name,
.engine_args = has_engine_args ? storage->engine->arguments->children : empty_engine_args,
.create_query = create,
.database_name = database_name,
.metadata_path = metadata_path,
.uuid = create.uuid,
.context = context};
// creator_fn creates and returns a DatabasePtr with the supplied arguments
auto creator_fn = database_engines.at(engine_name);
return creator_fn(arguments);
}
}

View File

@ -2,18 +2,60 @@
#include <Interpreters/Context_fwd.h>
#include <Databases/IDatabase.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class ASTCreateQuery;
class DatabaseFactory
template <typename ValueType>
static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name)
{
if (!ast || !ast->as<ASTLiteral>())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name);
return ast->as<ASTLiteral>()->value.safeGet<ValueType>();
}
class DatabaseFactory : private boost::noncopyable
{
public:
static DatabasePtr get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
static DatabasePtr getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
static DatabaseFactory & instance();
struct Arguments
{
const String & engine_name;
ASTs & engine_args;
ASTStorage * storage;
const ASTCreateQuery & create_query;
const String & database_name;
const String & metadata_path;
const UUID & uuid;
ContextPtr & context;
};
DatabasePtr get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
using CreatorFn = std::function<DatabasePtr(const Arguments & arguments)>;
using DatabaseEngines = std::unordered_map<std::string, CreatorFn>;
void registerDatabase(const std::string & name, CreatorFn creator_fn);
const DatabaseEngines & getDatabaseEngines() const { return database_engines; }
private:
DatabaseEngines database_engines;
DatabasePtr getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
};
}
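For orientation only, a minimal sketch of how an engine plugs into this new interface. The `Example` engine name is hypothetical and not part of this commit; `DatabaseMemory` is reused here purely to keep the sketch self-contained.

```
// Hedged sketch of the registration pattern introduced by this commit.
// "Example" is a made-up engine name; real engines register in their own .cpp files.
void registerDatabaseExample(DatabaseFactory & factory)
{
    auto create_fn = [](const DatabaseFactory::Arguments & args)
    {
        // Arguments bundles what the old monolithic getImpl() handled inline:
        // engine name/args, the CREATE query, metadata path, UUID and context.
        return std::make_shared<DatabaseMemory>(args.database_name, args.context);
    };
    factory.registerDatabase("Example", create_fn);
}
```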

View File

@ -1,3 +1,4 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseFilesystem.h>
#include <IO/Operators.h>
@ -237,4 +238,28 @@ DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, cons
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}
void registerDatabaseFilesystem(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
/// If init_path is empty, then the current path will be used
std::string init_path;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path");
const auto & arguments = engine->arguments->children;
init_path = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseFilesystem>(args.database_name, init_path, args.context);
};
factory.registerDatabase("Filesystem", create_fn);
}
}

View File

@ -2,6 +2,7 @@
#if USE_HDFS
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseHDFS.h>
#include <Interpreters/Context.h>
@ -237,6 +238,30 @@ DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const Filt
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}
void registerDatabaseHDFS(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
/// If source_url is empty, then table name must contain full url
std::string source_url;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url");
const auto & arguments = engine->arguments->children;
source_url = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseHDFS>(args.database_name, source_url, args.context);
};
factory.registerDatabase("HDFS", create_fn);
}
} // DB
#endif

View File

@ -1,4 +1,5 @@
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabasesCommon.h>
@ -7,6 +8,7 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/IStorage.h>
#include <Common/escapeForFileName.h>
@ -34,6 +36,7 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -354,4 +357,26 @@ const StoragePtr & DatabaseLazyIterator::table() const
return current_storage;
}
void registerDatabaseLazy(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument");
const auto & arguments = engine->arguments->children;
const auto cache_expiration_time_seconds = safeGetLiteralValue<UInt64>(arguments[0], "Lazy");
return make_shared<DatabaseLazy>(
args.database_name,
args.metadata_path,
cache_expiration_time_seconds,
args.context);
};
factory.registerDatabase("Lazy", create_fn);
}
}

View File

@ -1,5 +1,6 @@
#include <base/scope_guard.h>
#include <Common/logger_useful.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabasesCommon.h>
#include <Databases/DDLDependencyVisitor.h>
@ -209,4 +210,15 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseMemory::getTablesForBackup(co
return res;
}
void registerDatabaseMemory(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
return make_shared<DatabaseMemory>(
args.database_name,
args.context);
};
factory.registerDatabase("Memory", create_fn);
}
}

View File

@ -1,6 +1,7 @@
#include <filesystem>
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabasesCommon.h>
@ -37,6 +38,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_DATABASE_ENGINE;
}
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
@ -321,4 +323,19 @@ void DatabaseOrdinary::commitAlterTable(const StorageID &, const String & table_
}
}
void registerDatabaseOrdinary(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
if (!args.create_query.attach && !args.context->getSettingsRef().allow_deprecated_database_ordinary)
throw Exception(
ErrorCodes::UNKNOWN_DATABASE_ENGINE,
"Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)");
return make_shared<DatabaseOrdinary>(
args.database_name,
args.metadata_path,
args.context);
};
factory.registerDatabase("Ordinary", create_fn);
}
}

View File

@ -13,6 +13,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/PoolId.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DDLDependencyVisitor.h>
@ -1054,7 +1055,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
for (auto & [_, intermediate, to] : replicated_tables_to_rename)
rename_table(intermediate, to);
LOG_DEBUG(log, "Renames completed succesessfully");
LOG_DEBUG(log, "Renames completed successfully");
for (const auto & id : dropped_tables)
DatabaseCatalog::instance().waitTableFinallyDropped(id);
@ -1652,4 +1653,41 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context,
return true;
}
void registerDatabaseReplicated(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 3)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name");
auto & arguments = engine->arguments->children;
for (auto & engine_arg : arguments)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
String zookeeper_path = safeGetLiteralValue<String>(arguments[0], "Replicated");
String shard_name = safeGetLiteralValue<String>(arguments[1], "Replicated");
String replica_name = safeGetLiteralValue<String>(arguments[2], "Replicated");
zookeeper_path = args.context->getMacros()->expand(zookeeper_path);
shard_name = args.context->getMacros()->expand(shard_name);
replica_name = args.context->getMacros()->expand(replica_name);
DatabaseReplicatedSettings database_replicated_settings{};
if (engine_define->settings)
database_replicated_settings.loadFromQuery(*engine_define);
return std::make_shared<DatabaseReplicated>(
args.database_name,
args.metadata_path,
args.uuid,
zookeeper_path,
shard_name,
replica_name,
std::move(database_replicated_settings), args.context);
};
factory.registerDatabase("Replicated", create_fn);
}
}

View File

@ -2,6 +2,7 @@
#if USE_AWS_S3
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseS3.h>
#include <Interpreters/Context.h>
@ -307,6 +308,24 @@ DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const Filter
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}
}
void registerDatabaseS3(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
DatabaseS3::Configuration config;
if (engine->arguments && !engine->arguments->children.empty())
{
ASTs & engine_args = engine->arguments->children;
config = DatabaseS3::parseArguments(engine_args, args.context);
}
return std::make_shared<DatabaseS3>(args.database_name, config, args.context);
};
factory.registerDatabase("S3", create_fn);
}
}
#endif

View File

@ -2,13 +2,20 @@
#if USE_MYSQL
# include <Common/parseAddress.h>
# include <Common/parseRemoteDescription.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Databases/DatabaseFactory.h>
# include <Databases/MySQL/DatabaseMaterializedTablesIterator.h>
# include <Databases/MySQL/MaterializedMySQLSyncThread.h>
# include <Parsers/ASTCreateQuery.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/queryToString.h>
# include <Storages/StorageMySQL.h>
# include <Storages/StorageMaterializedMySQL.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Common/setThreadName.h>
# include <Common/PoolId.h>
# include <filesystem>
@ -21,6 +28,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
}
DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
@ -179,6 +187,86 @@ void DatabaseMaterializedMySQL::stopReplication()
started_up = false;
}
void registerDatabaseMaterializedMySQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StorageMySQL::Configuration configuration;
ASTs & arguments = engine->arguments->children;
auto mysql_settings = std::make_unique<MySQLSettings>();
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, args.context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, args.context, false);
}
else
{
if (arguments.size() != 4)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], args.context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
{
size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
}
else
{
const auto & [remote_host, remote_port] = parseAddress(host_port, 3306);
configuration.host = remote_host;
configuration.port = remote_port;
}
configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name);
configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name);
configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name);
}
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);
auto mysql_pool
= mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port);
auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>();
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (args.uuid == UUIDHelpers::Nil)
{
auto print_create_ast = args.create_query.clone();
print_create_ast->as<ASTCreateQuery>()->attach = false;
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete "
"the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}",
args.metadata_path,
queryToString(print_create_ast));
}
return make_shared<DatabaseMaterializedMySQL>(
args.context,
args.database_name,
args.metadata_path,
args.uuid,
configuration.database,
std::move(mysql_pool),
std::move(client),
std::move(materialize_mode_settings));
};
factory.registerDatabase("MaterializeMySQL", create_fn);
factory.registerDatabase("MaterializedMySQL", create_fn);
}
}
#endif

View File

@ -2,6 +2,7 @@
#if USE_MYSQL
# include <string>
# include <Databases/DatabaseFactory.h>
# include <DataTypes/DataTypeDateTime.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
@ -14,6 +15,7 @@
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <IO/Operators.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Parsers/ASTCreateQuery.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ParserCreateQuery.h>
@ -21,8 +23,11 @@
# include <Parsers/queryToString.h>
# include <Storages/StorageMySQL.h>
# include <Storages/MySQL/MySQLSettings.h>
# include <Storages/MySQL/MySQLHelpers.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Common/escapeForFileName.h>
# include <Common/parseAddress.h>
# include <Common/parseRemoteDescription.h>
# include <Common/setThreadName.h>
# include <filesystem>
# include <Common/filesystemHelpers.h>
@ -41,6 +46,8 @@ namespace ErrorCodes
extern const int TABLE_IS_DROPPED;
extern const int TABLE_ALREADY_EXISTS;
extern const int UNEXPECTED_AST_STRUCTURE;
extern const int CANNOT_CREATE_DATABASE;
extern const int BAD_ARGUMENTS;
}
constexpr static const auto suffix = ".remove_flag";
@ -504,6 +511,77 @@ void DatabaseMySQL::createTable(ContextPtr local_context, const String & table_n
attachTable(local_context, table_name, storage, {});
}
void registerDatabaseMySQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StorageMySQL::Configuration configuration;
ASTs & arguments = engine->arguments->children;
auto mysql_settings = std::make_unique<MySQLSettings>();
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, args.context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, args.context, false);
}
else
{
if (arguments.size() != 4)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], args.context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
{
size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
}
else
{
const auto & [remote_host, remote_port] = parseAddress(host_port, 3306);
configuration.host = remote_host;
configuration.port = remote_port;
}
configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name);
configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name);
configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name);
}
mysql_settings->loadFromQueryContext(args.context, *engine_define);
if (engine_define->settings)
mysql_settings->loadFromQuery(*engine_define);
auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings);
try
{
return make_shared<DatabaseMySQL>(
args.context,
args.database_name,
args.metadata_path,
engine_define,
configuration.database,
std::move(mysql_settings),
std::move(mysql_pool),
args.create_query.attach);
}
catch (...)
{
const auto & exception_message = getCurrentExceptionMessage(true);
throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message);
}
};
factory.registerDatabase("MySQL", create_fn);
}
}
#endif

View File

@ -8,23 +8,25 @@
#include <Common/logger_useful.h>
#include <Common/Macros.h>
#include <Common/PoolId.h>
#include <Common/parseAddress.h>
#include <Common/parseRemoteDescription.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseFactory.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Common/escapeForFileName.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
namespace DB
{
@ -471,6 +473,59 @@ DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
}
void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
ASTs & engine_args = engine->arguments->children;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StoragePostgreSQL::Configuration configuration;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, args.context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, args.context, false);
}
else
{
if (engine_args.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`.");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
}
auto connection_info = postgres::formatConnectionString(
configuration.database, configuration.host, configuration.port, configuration.username, configuration.password);
auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
args.context, args.metadata_path, args.uuid, args.create_query.attach,
args.database_name, configuration.database, connection_info,
std::move(postgresql_replica_settings));
};
factory.registerDatabase("MaterializedPostgreSQL", create_fn);
}
}
#endif

View File

@ -6,14 +6,18 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StoragePostgreSQL.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Common/escapeForFileName.h>
#include <Common/parseRemoteDescription.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Common/quoteString.h>
#include <Common/filesystemHelpers.h>
@ -478,6 +482,83 @@ ASTPtr DatabasePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) c
return std::make_shared<ASTIdentifier>(data_type->getName());
}
void registerDatabasePostgreSQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
ASTs & engine_args = engine->arguments->children;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
auto use_table_cache = false;
StoragePostgreSQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, args.context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, args.context, false);
use_table_cache = named_collection->getOrDefault<UInt64>("use_table_cache", 0);
}
else
{
if (engine_args.size() < 4 || engine_args.size() > 6)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL Database require `host:port`, `database_name`, `username`, `password`"
"[, `schema` = "", `use_table_cache` = 0");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
bool is_deprecated_syntax = false;
if (engine_args.size() >= 5)
{
auto arg_value = engine_args[4]->as<ASTLiteral>()->value;
if (arg_value.getType() == Field::Types::Which::String)
{
configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
}
else
{
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[4], engine_name);
LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used");
is_deprecated_syntax = true;
}
}
if (!is_deprecated_syntax && engine_args.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
}
const auto & settings = args.context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
return std::make_shared<DatabasePostgreSQL>(
args.context,
args.metadata_path,
engine_define,
args.database_name,
configuration,
pool,
use_table_cache);
};
factory.registerDatabase("PostgreSQL", create_fn);
}
}
#endif

View File

@ -5,11 +5,11 @@
#include <Common/logger_useful.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/Context.h>
#include <Storages/StorageSQLite.h>
#include <Databases/SQLite/SQLiteUtils.h>
@ -21,6 +21,7 @@ namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
DatabaseSQLite::DatabaseSQLite(
@ -201,6 +202,24 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex
return create_table_query;
}
void registerDatabaseSQLite(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path");
const auto & arguments = engine->arguments->children;
String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite");
return std::make_shared<DatabaseSQLite>(args.context, engine_define, args.create_query.attach, database_path);
};
factory.registerDatabase("SQLite", create_fn);
}
}
#endif

View File

@ -0,0 +1,72 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/registerDatabases.h>
namespace DB
{
void registerDatabaseAtomic(DatabaseFactory & factory);
void registerDatabaseOrdinary(DatabaseFactory & factory);
void registerDatabaseDictionary(DatabaseFactory & factory);
void registerDatabaseMemory(DatabaseFactory & factory);
void registerDatabaseLazy(DatabaseFactory & factory);
void registerDatabaseFilesystem(DatabaseFactory & factory);
void registerDatabaseReplicated(DatabaseFactory & factory);
#if USE_MYSQL
void registerDatabaseMySQL(DatabaseFactory & factory);
void registerDatabaseMaterializedMySQL(DatabaseFactory & factory);
#endif
#if USE_LIBPQXX
void registerDatabasePostgreSQL(DatabaseFactory & factory);
void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory);
#endif
#if USE_SQLITE
void registerDatabaseSQLite(DatabaseFactory & factory);
#endif
#if USE_AWS_S3
void registerDatabaseS3(DatabaseFactory & factory);
#endif
#if USE_HDFS
void registerDatabaseHDFS(DatabaseFactory & factory);
#endif
void registerDatabases()
{
auto & factory = DatabaseFactory::instance();
registerDatabaseAtomic(factory);
registerDatabaseOrdinary(factory);
registerDatabaseDictionary(factory);
registerDatabaseMemory(factory);
registerDatabaseLazy(factory);
registerDatabaseFilesystem(factory);
registerDatabaseReplicated(factory);
#if USE_MYSQL
registerDatabaseMySQL(factory);
registerDatabaseMaterializedMySQL(factory);
#endif
#if USE_LIBPQXX
registerDatabasePostgreSQL(factory);
registerDatabaseMaterializedPostgreSQL(factory);
#endif
#if USE_SQLITE
registerDatabaseSQLite(factory);
#endif
#if USE_AWS_S3
registerDatabaseS3(factory);
#endif
#if USE_HDFS
registerDatabaseHDFS(factory);
#endif
}
}
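A hedged sketch of how this entry point is meant to be used: call it once during startup, before any `CREATE DATABASE` is interpreted. The standalone `main()` and the engine check below are illustrative only and not part of this commit.

```
// Illustrative only: register all engines once, then the factory can resolve them by name.
#include <Databases/registerDatabases.h>
#include <Databases/DatabaseFactory.h>

int main()
{
    DB::registerDatabases();  // fills the factory's engine-name -> creator map
    const auto & engines = DB::DatabaseFactory::instance().getDatabaseEngines();
    return engines.contains("Memory") ? 0 : 1;  // a built-in engine should now be registered
}
```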

View File

@ -0,0 +1,6 @@
#pragma once
namespace DB
{
void registerDatabases();
}

View File

@ -47,7 +47,6 @@
#include <Common/Exception.h>
#include <Core/AccurateComparison.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/toFixedString.h>

View File

@ -159,7 +159,6 @@ private:
class FunctionCapture : public IFunctionBase
{
public:
using Capture = ExecutableFunctionCapture::Capture;
using CapturePtr = ExecutableFunctionCapture::CapturePtr;
FunctionCapture(
@ -201,10 +200,10 @@ public:
FunctionCaptureOverloadResolver(
ExpressionActionsPtr expression_actions_,
const Names & captured_names_,
const NamesAndTypesList & lambda_arguments_,
const DataTypePtr & function_return_type_,
const String & expression_return_name_)
const Names & captured_names,
const NamesAndTypesList & lambda_arguments,
const DataTypePtr & function_return_type,
const String & expression_return_name)
: expression_actions(std::move(expression_actions_))
{
/// Check that expression does not contain unusual actions that will break columns structure.
@ -219,9 +218,9 @@ public:
arguments_map[arg.name] = arg.type;
DataTypes captured_types;
captured_types.reserve(captured_names_.size());
captured_types.reserve(captured_names.size());
for (const auto & captured_name : captured_names_)
for (const auto & captured_name : captured_names)
{
auto it = arguments_map.find(captured_name);
if (it == arguments_map.end())
@ -232,21 +231,21 @@ public:
}
DataTypes argument_types;
argument_types.reserve(lambda_arguments_.size());
for (const auto & lambda_argument : lambda_arguments_)
argument_types.reserve(lambda_arguments.size());
for (const auto & lambda_argument : lambda_arguments)
argument_types.push_back(lambda_argument.type);
return_type = std::make_shared<DataTypeFunction>(argument_types, function_return_type_);
return_type = std::make_shared<DataTypeFunction>(argument_types, function_return_type);
name = "Capture[" + toString(captured_types) + "](" + toString(argument_types) + ") -> "
+ function_return_type_->getName();
+ function_return_type->getName();
capture = std::make_shared<Capture>(Capture{
.captured_names = captured_names_,
.captured_names = captured_names,
.captured_types = std::move(captured_types),
.lambda_arguments = lambda_arguments_,
.return_name = expression_return_name_,
.return_type = function_return_type_,
.lambda_arguments = lambda_arguments,
.return_name = expression_return_name,
.return_type = function_return_type,
});
}

View File

@ -74,6 +74,8 @@ public:
size_t getNumberOfArguments() const override { return 0; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
/// Called if at least one function argument is a lambda expression.
/// For argument-lambda expressions, it defines the types of arguments of these expressions.
void getLambdaArgumentTypes(DataTypes & arguments) const override
@ -370,10 +372,10 @@ public:
/// Put all the necessary columns multiplied by the sizes of arrays into the columns.
auto replicated_column_function_ptr = IColumn::mutate(column_function->replicate(column_first_array->getOffsets()));
auto * replicated_column_function = typeid_cast<ColumnFunction *>(replicated_column_function_ptr.get());
replicated_column_function->appendArguments(arrays);
auto & replicated_column_function = typeid_cast<ColumnFunction &>(*replicated_column_function_ptr);
replicated_column_function.appendArguments(arrays);
auto lambda_result = replicated_column_function->reduce();
auto lambda_result = replicated_column_function.reduce();
/// Convert LowCardinality(T) -> T and Const(LowCardinality(T)) -> Const(T),
/// because we removed LowCardinality from return type of lambda expression.

View File

@ -412,14 +412,14 @@ private:
};
/** TimeDiff(t1, t2)
/** timeDiff(t1, t2)
* t1 and t2 can be Date or DateTime
*/
class FunctionTimeDiff : public IFunction
{
using ColumnDateTime64 = ColumnDecimal<DateTime64>;
public:
static constexpr auto name = "TimeDiff";
static constexpr auto name = "timeDiff";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionTimeDiff>(); }
String getName() const override

View File

@ -1,118 +0,0 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/DNSResolver.h>
#include <Poco/Net/IPAddress.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int FUNCTION_NOT_ALLOWED;
}
class ReverseDNSQuery : public IFunction
{
public:
static constexpr auto name = "reverseDNSQuery";
static constexpr auto allow_function_config_name = "allow_reverse_dns_query_function";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<ReverseDNSQuery>();
}
String getName() const override
{
return name;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & data_type, size_t input_rows_count) const override
{
if (!Context::getGlobalContextInstance()->getConfigRef().getBool(allow_function_config_name, false))
{
throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED, "Function {} is not allowed because {} is not set", name, allow_function_config_name);
}
if (arguments.empty())
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires at least one argument", name);
}
auto res_type = getReturnTypeImpl({data_type});
if (input_rows_count == 0u)
{
return res_type->createColumnConstWithDefaultValue(input_rows_count);
}
if (!isString(arguments[0].type))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function {} requires the input column to be of type String", name);
}
auto input_column = arguments[0].column;
auto ip_address = Poco::Net::IPAddress(input_column->getDataAt(0).toString());
auto ptr_records = DNSResolver::instance().reverseResolve(ip_address);
if (ptr_records.empty())
return res_type->createColumnConstWithDefaultValue(input_rows_count);
Array res;
for (const auto & ptr_record : ptr_records)
{
res.push_back(ptr_record);
}
return res_type->createColumnConst(input_rows_count, res);
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override
{
return false;
}
size_t getNumberOfArguments() const override
{
return 1u;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
};
REGISTER_FUNCTION(ReverseDNSQuery)
{
factory.registerFunction<ReverseDNSQuery>(
FunctionDocumentation{
.description = R"(Performs a reverse DNS query to get the PTR records associated with the IP address)",
.syntax = "reverseDNSQuery(address)",
.arguments = {{"address", "An IPv4 or IPv6 address. [String](../../sql-reference/data-types/string.md)"}},
.returned_value = "Associated domains (PTR records). [String](../../sql-reference/data-types/string.md).",
.examples = {{"",
"SELECT reverseDNSQuery('192.168.0.2');",
R"(
reverseDNSQuery('192.168.0.2')
['test2.example.com','test3.example.com']
)"}}
}
);
}
}

View File

@ -13,33 +13,14 @@ namespace ErrorCodes
}
class BrotliWriteBuffer::BrotliStateWrapper
BrotliWriteBuffer::BrotliStateWrapper::BrotliStateWrapper()
: state(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr))
{
public:
BrotliStateWrapper()
: state(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr))
{
}
}
~BrotliStateWrapper()
{
BrotliEncoderDestroyInstance(state);
}
BrotliEncoderState * state;
};
BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
BrotliWriteBuffer::BrotliStateWrapper::~BrotliStateWrapper()
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_LGWIN, 24);
BrotliEncoderDestroyInstance(state);
}
BrotliWriteBuffer::~BrotliWriteBuffer() = default;
@ -58,18 +39,20 @@ void BrotliWriteBuffer::nextImpl()
{
do
{
const auto * in_data_ptr = in_data;
out->nextIfAtEnd();
out_data = reinterpret_cast<unsigned char *>(out->position());
out_capacity = out->buffer().end() - out->position();
int result = BrotliEncoderCompressStream(
brotli->state,
in_available ? BROTLI_OPERATION_PROCESS : BROTLI_OPERATION_FINISH,
BROTLI_OPERATION_PROCESS,
&in_available,
&in_data,
&out_capacity,
&out_data,
nullptr);
total_in += in_data - in_data_ptr;
out->position() = out->buffer().end() - out_capacity;
@ -92,6 +75,10 @@ void BrotliWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && total_in == 0)
return;
while (true)
{
out->nextIfAtEnd();

View File

@ -4,18 +4,38 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
#include "config.h"
#if USE_BROTLI
# include <brotli/encode.h>
namespace DB
{
class BrotliWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
BrotliWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, compress_empty(compress_empty_)
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_LGWIN, 24);
}
~BrotliWriteBuffer() override;
@ -24,7 +44,15 @@ private:
void finalizeBefore() override;
class BrotliStateWrapper;
class BrotliStateWrapper
{
public:
BrotliStateWrapper();
~BrotliStateWrapper();
BrotliEncoderState * state;
};
std::unique_ptr<BrotliStateWrapper> brotli;
@ -33,6 +61,12 @@ private:
size_t out_capacity;
uint8_t * out_data;
protected:
UInt64 total_in = 0;
bool compress_empty = true;
};
}
#endif
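A usage sketch of the two behaviours added here (the nested buffer name is illustrative): the templated constructor now also accepts a raw `WriteBuffer *` without taking ownership, and `compress_empty_ = false` suppresses writing a Brotli frame when no data was ever compressed.

```
// Sketch only: non-owning wrapping plus compress_empty_ = false.
WriteBufferFromOwnString plain;                       // illustrative nested buffer
BrotliWriteBuffer compressed(&plain, /*compression_level*/ 3,
                             DBMS_DEFAULT_BUFFER_SIZE, nullptr, /*alignment*/ 0,
                             /*compress_empty_*/ false);
compressed.finalize();                                // nothing was written, so nothing is flushed to `plain`
```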

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <algorithm>
#include <memory>
namespace DB

View File

@ -15,34 +15,22 @@ namespace ErrorCodes
}
class Bzip2WriteBuffer::Bzip2StateWrapper
Bzip2WriteBuffer::Bzip2StateWrapper::Bzip2StateWrapper(int compression_level)
{
public:
explicit Bzip2StateWrapper(int compression_level)
{
memset(&stream, 0, sizeof(stream));
memset(&stream, 0, sizeof(stream));
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
~Bzip2StateWrapper()
{
BZ2_bzCompressEnd(&stream);
}
bz_stream stream;
};
Bzip2WriteBuffer::Bzip2WriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, bz(std::make_unique<Bzip2StateWrapper>(compression_level))
Bzip2WriteBuffer::Bzip2StateWrapper::~Bzip2StateWrapper()
{
BZ2_bzCompressEnd(&stream);
}
Bzip2WriteBuffer::~Bzip2WriteBuffer() = default;
@ -77,6 +65,8 @@ void Bzip2WriteBuffer::nextImpl()
}
while (bz->stream.avail_in > 0);
total_in += offset();
}
catch (...)
{
@ -90,6 +80,10 @@ void Bzip2WriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && total_in == 0)
return;
out->nextIfAtEnd();
bz->stream.next_out = out->position();
bz->stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position());

View File

@ -4,18 +4,29 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
#include "config.h"
#if USE_BZIP2
# include <bzlib.h>
namespace DB
{
class Bzip2WriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
Bzip2WriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), bz(std::make_unique<Bzip2StateWrapper>(compression_level))
, compress_empty(compress_empty_)
{
}
~Bzip2WriteBuffer() override;
@ -24,8 +35,20 @@ private:
void finalizeBefore() override;
class Bzip2StateWrapper;
class Bzip2StateWrapper
{
public:
explicit Bzip2StateWrapper(int compression_level);
~Bzip2StateWrapper();
bz_stream stream;
};
std::unique_ptr<Bzip2StateWrapper> bz;
bool compress_empty = true;
UInt64 total_in = 0;
};
}
#endif

View File

@ -169,37 +169,66 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment, zstd_window_log_max);
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
template<typename WriteBufferT>
std::unique_ptr<WriteBuffer> createWriteCompressedWrapper(
WriteBufferT && nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment, bool compress_empty)
{
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(nested), method, level, buf_size, existing_memory, alignment);
return std::make_unique<ZlibDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), method, level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BROTLI
if (method == DB::CompressionMethod::Brotli)
return std::make_unique<BrotliWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<BrotliWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif
if (method == CompressionMethod::Xz)
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<LZMADeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<ZstdDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Lz4)
return std::make_unique<Lz4DeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<Lz4DeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BZIP2
if (method == CompressionMethod::Bzip2)
return std::make_unique<Bzip2WriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<Bzip2WriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif
#if USE_SNAPPY
if (method == CompressionMethod::Snappy)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
#endif
if (method == CompressionMethod::None)
return nested;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
int level,
size_t buf_size,
char * existing_memory,
size_t alignment,
bool compress_empty)
{
if (method == CompressionMethod::None)
return nested;
return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment, compress_empty);
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
WriteBuffer * nested,
CompressionMethod method,
int level,
size_t buf_size,
char * existing_memory,
size_t alignment,
bool compress_empty)
{
assert(method != CompressionMethod::None);
return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment, compress_empty);
}
}
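A hedged sketch of the resulting call sites (file and buffer names are illustrative): the `unique_ptr` overload transfers ownership of the nested buffer, while the new raw-pointer overload leaves ownership with the caller and is only meant for real compression methods (it asserts `method != None`).

```
// Illustrative only.
auto owning = wrapWriteBufferWithCompressionMethod(
    std::make_unique<WriteBufferFromFile>("data.txt.gz"),
    CompressionMethod::Gzip, /*level*/ 3);

WriteBufferFromOwnString sink;                         // caller keeps ownership
auto non_owning = wrapWriteBufferWithCompressionMethod(
    &sink, CompressionMethod::Zstd, /*level*/ 1,
    DBMS_DEFAULT_BUFFER_SIZE, nullptr, /*alignment*/ 0,
    /*compress_empty*/ false);
```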

View File

@ -61,13 +61,22 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
char * existing_memory = nullptr,
size_t alignment = 0);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
int level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty = true);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
WriteBuffer * nested,
CompressionMethod method,
int level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0,
bool compress_empty = true);
}

View File

@ -7,9 +7,7 @@ namespace ErrorCodes
extern const int LZMA_STREAM_ENCODER_FAILED;
}
LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
void LZMADeflatingWriteBuffer::initialize(int compression_level)
{
lstr = LZMA_STREAM_INIT;
@ -94,6 +92,10 @@ void LZMADeflatingWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && lstr.total_out == 0)
return;
do
{
out->nextIfAtEnd();

View File

@ -14,22 +14,32 @@ namespace DB
class LZMADeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{
initialize(compression_level);
}
~LZMADeflatingWriteBuffer() override;
private:
void initialize(int compression_level);
void nextImpl() override;
void finalizeBefore() override;
void finalizeAfter() override;
lzma_stream lstr;
bool compress_empty = true;
};
}

View File

@ -63,11 +63,8 @@ namespace ErrorCodes
extern const int LZ4_ENCODER_FAILED;
}
Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, tmp_memory(buf_size)
void Lz4DeflatingWriteBuffer::initialize(int compression_level)
{
kPrefs = {
{LZ4F_max256KB,
@ -105,7 +102,7 @@ void Lz4DeflatingWriteBuffer::nextImpl()
if (first_time)
{
auto sink = SinkToOut(out.get(), tmp_memory, LZ4F_HEADER_SIZE_MAX);
auto sink = SinkToOut(out, tmp_memory, LZ4F_HEADER_SIZE_MAX);
chassert(sink.getCapacity() >= LZ4F_HEADER_SIZE_MAX);
/// write frame header and check for errors
@ -131,7 +128,7 @@ void Lz4DeflatingWriteBuffer::nextImpl()
/// Ensure that there is enough space for compressed block of minimal size
size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs);
auto sink = SinkToOut(out.get(), tmp_memory, min_compressed_block_size);
auto sink = SinkToOut(out, tmp_memory, min_compressed_block_size);
chassert(sink.getCapacity() >= min_compressed_block_size);
/// LZ4F_compressUpdate compresses whole input buffer at once so we need to shrink it manually
@ -163,8 +160,12 @@ void Lz4DeflatingWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && first_time)
return;
auto suffix_size = LZ4F_compressBound(0, &kPrefs);
auto sink = SinkToOut(out.get(), tmp_memory, suffix_size);
auto sink = SinkToOut(out, tmp_memory, suffix_size);
chassert(sink.getCapacity() >= suffix_size);
/// compression end

View File

@ -14,16 +14,26 @@ namespace DB
class Lz4DeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
Lz4DeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, tmp_memory(buf_size)
, compress_empty(compress_empty_)
{
initialize(compression_level);
}
~Lz4DeflatingWriteBuffer() override;
private:
void initialize(int compression_level);
void nextImpl() override;
void finalizeBefore() override;
@ -35,5 +45,6 @@ private:
Memory<> tmp_memory;
bool first_time = true;
bool compress_empty = true;
};
}

View File

@ -20,33 +20,6 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
checkStateCorrect();
}
void PeekableReadBuffer::reset()
{
checkStateCorrect();
}
void PeekableReadBuffer::setSubBuffer(ReadBuffer & sub_buf_)
{
sub_buf = &sub_buf_;
resetImpl();
}
void PeekableReadBuffer::resetImpl()
{
peeked_size = 0;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
use_stack_memory = true;
if (!currentlyReadFromOwnMemory())
sub_buf->position() = pos;
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
checkStateCorrect();
}
bool PeekableReadBuffer::peekNext()
{
checkStateCorrect();

View File

@ -74,12 +74,6 @@ public:
/// This data will be lost after destruction of peekable buffer.
bool hasUnreadData() const;
// for streaming reading (like in Kafka) we need to restore initial state of the buffer
// without recreating the buffer.
void reset();
void setSubBuffer(ReadBuffer & sub_buf_);
const ReadBuffer & getSubBuffer() const { return *sub_buf; }
private:

View File

@ -99,6 +99,9 @@ bool ReadBufferFromPocoSocket::nextImpl()
if (bytes_read < 0)
throw NetException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot read from socket ({})", peer_address.toString());
if (read_event != ProfileEvents::end())
ProfileEvents::increment(read_event, bytes_read);
if (bytes_read)
working_buffer.resize(bytes_read);
else
@ -111,10 +114,17 @@ ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_,
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, socket(socket_)
, peer_address(socket.peerAddress())
, read_event(ProfileEvents::end())
, socket_description("socket (" + peer_address.toString() + ")")
{
}
ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size)
: ReadBufferFromPocoSocket(socket_, buf_size)
{
read_event = read_event_;
}
bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds) const
{
if (available())

View File

@ -20,10 +20,13 @@ protected:
*/
Poco::Net::SocketAddress peer_address;
ProfileEvents::Event read_event;
bool nextImpl() override;
public:
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
bool poll(size_t timeout_microseconds) const;
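
The read-side change above adds a constructor taking a ProfileEvents::Event, so bytes received on the socket can additionally be attributed to a counter chosen by the caller (the member defaults to ProfileEvents::end(), i.e. no extra counting). A minimal sketch, assuming the caller already has such an event registered; readOneNumber and data_read_event are placeholder names:

```
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/VarInt.h>

/// Read one varint-encoded number, attributing the received bytes to `data_read_event`.
UInt64 readOneNumber(Poco::Net::Socket & socket, ProfileEvents::Event data_read_event)
{
    DB::ReadBufferFromPocoSocket in(socket, data_read_event);
    UInt64 value = 0;
    DB::readVarUInt(value, in);   /// the usual ReadBuffer helpers work unchanged
    return value;
}
```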

View File

@ -12,13 +12,21 @@ class WriteBuffer;
/// WriteBuffer that decorates data and delegates it to underlying buffer.
/// It's used for writing compressed and encrypted data
/// This class can own or not own underlying buffer - constructor will differentiate
/// std::unique_ptr<WriteBuffer> for owning and WriteBuffer* for not owning.
template <class Base>
class WriteBufferDecorator : public Base
{
public:
template <class ... BaseArgs>
explicit WriteBufferDecorator(std::unique_ptr<WriteBuffer> out_, BaseArgs && ... args)
: Base(std::forward<BaseArgs>(args)...), out(std::move(out_))
: Base(std::forward<BaseArgs>(args)...), owning_holder(std::move(out_)), out(owning_holder.get())
{
}
template <class ... BaseArgs>
explicit WriteBufferDecorator(WriteBuffer * out_, BaseArgs && ... args)
: Base(std::forward<BaseArgs>(args)...), out(out_)
{
}
@ -38,7 +46,7 @@ public:
}
}
WriteBuffer * getNestedBuffer() { return out.get(); }
WriteBuffer * getNestedBuffer() { return out; }
protected:
/// Do some finalization before finalization of underlying buffer.
@ -47,7 +55,8 @@ protected:
/// Do some finalization after finalization of underlying buffer.
virtual void finalizeAfter() {}
std::unique_ptr<WriteBuffer> out;
std::unique_ptr<WriteBuffer> owning_holder;
WriteBuffer * out;
};
using WriteBufferWithOwnMemoryDecorator = WriteBufferDecorator<BufferWithOwnMemory<WriteBuffer>>;
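
The decorator now keeps the nested buffer as a plain `out` pointer, with `owning_holder` populated only by the unique_ptr constructor; this is what lets the compression buffers above wrap a WriteBuffer they do not own. A toy pass-through decorator, purely for illustration (it is not part of the codebase), showing both construction modes:

```
#include <memory>
#include <IO/WriteBufferDecorator.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>

namespace DB
{

/// Forwards bytes unchanged; exists only to demonstrate the two ownership modes.
class PassThroughWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
    template <typename WriteBufferT>
    explicit PassThroughWriteBuffer(WriteBufferT && out_)
        : WriteBufferWithOwnMemoryDecorator(std::move(out_))
    {
    }

private:
    void nextImpl() override
    {
        out->write(working_buffer.begin(), offset());   /// `out` is a raw pointer in both modes
    }

    void finalizeBefore() override { next(); }   /// flush own memory before the nested buffer is finalized
};

void ownershipModes(WriteBuffer & borrowed)
{
    /// Owning: the decorator holds the nested buffer in owning_holder.
    PassThroughWriteBuffer owning(std::make_unique<WriteBufferFromOwnString>());
    writeString("hello", owning);
    owning.finalize();

    /// Borrowing: the caller keeps ownership and lifetime of `borrowed`.
    PassThroughWriteBuffer borrowing(&borrowed);
    writeString("world", borrowing);
    borrowing.finalize();
}

}
```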

View File

@ -28,7 +28,7 @@ public:
void sync() override;
std::string getFileName() const override { return assert_cast<WriteBufferFromFileBase *>(out.get())->getFileName(); }
std::string getFileName() const override { return assert_cast<WriteBufferFromFileBase *>(out)->getFileName(); }
private:
void nextImpl() override;

View File

@ -34,6 +34,97 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
ssize_t WriteBufferFromPocoSocket::socketSendBytesImpl(const char * ptr, size_t size)
{
ssize_t res = 0;
/// If async_callback is specified, set socket to non-blocking mode
/// and try to write data to it, if socket is not ready for writing,
/// run async_callback and try again later.
/// It is expected that file descriptor may be polled externally.
/// Note that send timeout is not checked here. External code should check it while polling.
if (async_callback)
{
socket.setBlocking(false);
/// Set socket to blocking mode at the end.
SCOPE_EXIT(socket.setBlocking(true));
bool secure = socket.secure();
res = socket.impl()->sendBytes(ptr, static_cast<int>(size));
/// Check EAGAIN and ERR_SSL_WANT_WRITE/ERR_SSL_WANT_READ for secure socket (writing to secure socket can read too).
while (res < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(res) || checkSSLWantWrite(res)))))
{
/// In case of ERR_SSL_WANT_READ we should wait for socket to be ready for reading, otherwise - for writing.
if (secure && checkSSLWantRead(res))
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
else
async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);
/// Try to write again.
res = socket.impl()->sendBytes(ptr, static_cast<int>(size));
}
}
else
{
res = socket.impl()->sendBytes(ptr, static_cast<int>(size));
}
return res;
}
void WriteBufferFromPocoSocket::socketSendBytes(const char * ptr, size_t size)
{
if (!size)
return;
Stopwatch watch;
size_t bytes_written = 0;
SCOPE_EXIT({
ProfileEvents::increment(ProfileEvents::NetworkSendElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::NetworkSendBytes, bytes_written);
if (write_event != ProfileEvents::end())
ProfileEvents::increment(write_event, bytes_written);
});
while (bytes_written < size)
{
ssize_t res = 0;
/// Add more details to exceptions.
try
{
CurrentMetrics::Increment metric_increment(CurrentMetrics::NetworkSend);
if (size > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
res = socketSendBytesImpl(ptr + bytes_written, size - bytes_written);
}
catch (const Poco::Net::NetException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
catch (const Poco::TimeoutException &)
{
throw NetException(ErrorCodes::SOCKET_TIMEOUT, "Timeout exceeded while writing to socket ({}, {} ms)",
peer_address.toString(),
socket.impl()->getSendTimeout().totalMilliseconds());
}
catch (const Poco::IOException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
if (res < 0)
throw NetException(ErrorCodes::CANNOT_WRITE_TO_SOCKET, "Cannot write to socket ({} -> {})",
our_address.toString(), peer_address.toString());
bytes_written += res;
}
}
void WriteBufferFromPocoSocket::nextImpl()
{
if (!offset())
@ -60,36 +151,7 @@ void WriteBufferFromPocoSocket::nextImpl()
if (size > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
/// If async_callback is specified, set socket to non-blocking mode
/// and try to write data to it, if socket is not ready for writing,
/// run async_callback and try again later.
/// It is expected that file descriptor may be polled externally.
/// Note that send timeout is not checked here. External code should check it while polling.
if (async_callback)
{
socket.setBlocking(false);
/// Set socket to blocking mode at the end.
SCOPE_EXIT(socket.setBlocking(true));
bool secure = socket.secure();
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
/// Check EAGAIN and ERR_SSL_WANT_WRITE/ERR_SSL_WANT_READ for secure socket (writing to secure socket can read too).
while (res < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(res) || checkSSLWantWrite(res)))))
{
/// In case of ERR_SSL_WANT_READ we should wait for socket to be ready for reading, otherwise - for writing.
if (secure && checkSSLWantRead(res))
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
else
async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);
/// Try to write again.
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
}
}
else
{
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
}
res = socketSendBytesImpl(pos, size);
}
catch (const Poco::Net::NetException & e)
{
@ -125,6 +187,12 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
{
}
WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size)
: WriteBufferFromPocoSocket(socket_, buf_size)
{
write_event = write_event_;
}
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
try

View File

@ -17,14 +17,33 @@ class WriteBufferFromPocoSocket : public BufferWithOwnMemory<WriteBuffer>
{
public:
explicit WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
~WriteBufferFromPocoSocket() override;
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
using WriteBuffer::write;
void write(const std::string & str) { WriteBuffer::write(str.c_str(), str.size()); }
void write(std::string_view str) { WriteBuffer::write(str.data(), str.size()); }
void write(const char * str) { WriteBuffer::write(str, strlen(str)); }
void writeln(const std::string & str) { write(str); WriteBuffer::write("\n", 1); }
void writeln(std::string_view str) { write(str); WriteBuffer::write("\n", 1); }
void writeln(const char * str) { write(str); WriteBuffer::write("\n", 1); }
protected:
void nextImpl() override;
void socketSendBytes(const char * ptr, size_t size);
void socketSendStr(const std::string & str)
{
return socketSendBytes(str.data(), str.size());
}
void socketSendStr(const char * ptr)
{
return socketSendBytes(ptr, strlen(ptr));
}
Poco::Net::Socket & socket;
/** For error messages. It is necessary to receive this address in advance, because,
@ -34,9 +53,13 @@ protected:
Poco::Net::SocketAddress peer_address;
Poco::Net::SocketAddress our_address;
ProfileEvents::Event write_event;
private:
AsyncCallback async_callback;
std::string socket_description;
ssize_t socketSendBytesImpl(const char * ptr, size_t size);
};
}
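
Symmetrically to the read side, WriteBufferFromPocoSocket gains a ProfileEvents-aware constructor, public write()/writeln() string helpers, and protected socketSendBytes()/socketSendStr() that send through the socket directly. A minimal sketch of the public additions; sendGreeting and handshake_event are placeholder names:

```
#include <string_view>
#include <IO/WriteBufferFromPocoSocket.h>

/// Send two lines over the socket, attributing the written bytes to `handshake_event`.
void sendGreeting(Poco::Net::Socket & socket, ProfileEvents::Event handshake_event)
{
    DB::WriteBufferFromPocoSocket out(socket, handshake_event);
    out.writeln("HELLO");                        /// new helper: payload followed by '\n'
    out.write(std::string_view{"READY\n"});      /// new string_view overload
    out.finalize();                              /// flush the internal buffer to the socket
}
```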

View File

@ -63,9 +63,7 @@ namespace ErrorCodes
inline void writeChar(char x, WriteBuffer & buf)
{
buf.nextIfAtEnd();
*buf.position() = x;
++buf.position();
buf.write(x);
}
/// Write the same character n times.

View File

@ -10,36 +10,6 @@ namespace ErrorCodes
extern const int ZLIB_DEFLATE_FAILED;
}
ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size,
char * existing_memory,
size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
zstr.opaque = nullptr;
zstr.next_in = nullptr;
zstr.avail_in = 0;
zstr.next_out = nullptr;
zstr.avail_out = 0;
int window_bits = 15;
if (compression_method == CompressionMethod::Gzip)
{
window_bits += 16;
}
int rc = deflateInit2(&zstr, compression_level, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_DEFLATE_FAILED, "deflateInit2 failed: {}; zlib version: {}", zError(rc), ZLIB_VERSION);
}
void ZlibDeflatingWriteBuffer::nextImpl()
{
if (!offset())
@ -82,6 +52,10 @@ void ZlibDeflatingWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && zstr.total_out == 0)
return;
/// https://github.com/zlib-ng/zlib-ng/issues/494
do
{

View File

@ -12,17 +12,45 @@
namespace DB
{
namespace ErrorCodes
{
extern const int ZLIB_DEFLATE_FAILED;
}
/// Performs compression using zlib library and writes compressed data to out_ WriteBuffer.
class ZlibDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
ZlibDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
zstr.opaque = nullptr;
zstr.next_in = nullptr;
zstr.avail_in = 0;
zstr.next_out = nullptr;
zstr.avail_out = 0;
int window_bits = 15;
if (compression_method == CompressionMethod::Gzip)
{
window_bits += 16;
}
int rc = deflateInit2(&zstr, compression_level, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_DEFLATE_FAILED, "deflateInit2 failed: {}; zlib version: {}", zError(rc), ZLIB_VERSION);
}
~ZlibDeflatingWriteBuffer() override;
@ -36,6 +64,7 @@ private:
virtual void finalizeAfter() override;
z_stream zstr;
bool compress_empty = true;
};
}

View File

@ -8,9 +8,7 @@ namespace ErrorCodes
extern const int ZSTD_ENCODER_FAILED;
}
ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
void ZstdDeflatingWriteBuffer::initialize(int compression_level)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
@ -44,6 +42,7 @@ void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode)
try
{
size_t out_offset = out->offset();
bool ended = false;
do
{
@ -67,6 +66,8 @@ void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode)
ended = everything_was_compressed && everything_was_flushed;
} while (!ended);
total_out += out->offset() - out_offset;
}
catch (...)
{
@ -84,6 +85,9 @@ void ZstdDeflatingWriteBuffer::nextImpl()
void ZstdDeflatingWriteBuffer::finalizeBefore()
{
/// Don't write out if no data was ever compressed
if (!compress_empty && total_out == 0)
return;
flush(ZSTD_e_end);
}

View File

@ -14,12 +14,18 @@ namespace DB
class ZstdDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{
initialize(compression_level);
}
~ZstdDeflatingWriteBuffer() override;
@ -29,6 +35,8 @@ public:
}
private:
void initialize(int compression_level);
void nextImpl() override;
/// Flush all pending data and write zstd footer to the underlying buffer.
@ -42,6 +50,9 @@ private:
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
size_t total_out = 0;
bool compress_empty = true;
};
}

View File

@ -767,7 +767,6 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
};
StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> last_buffer;
auto chunk_info = std::make_shared<AsyncInsertInfo>();
auto query_for_logging = serializeQuery(*key.query, insert_context->getSettingsRef().log_queries_cut_to_length);
@ -783,11 +782,6 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
auto buffer = std::make_unique<ReadBufferFromString>(*bytes);
size_t num_bytes = bytes->size();
size_t num_rows = executor.execute(*buffer);
/// Keep buffer, because it still can be used
/// in destructor, while resetting buffer at next iteration.
last_buffer = std::move(buffer);
total_rows += num_rows;
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
@ -796,8 +790,6 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
current_exception.clear();
}
format->addBuffer(std::move(last_buffer));
Chunk chunk(executor.getResultColumns(), total_rows);
chunk.setChunkInfo(std::move(chunk_info));
return chunk;

View File

@ -15,6 +15,7 @@
#include <Common/thread_local_rng.h>
#include <Common/FieldVisitorToString.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/callOnce.h>
#include <Common/SharedLockGuard.h>
#include <Coordination/KeeperDispatcher.h>
@ -32,6 +33,7 @@
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IO/IOUringReader.h>
#include <IO/SynchronousReader.h>
@ -43,6 +45,7 @@
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/SessionTracker.h>
#include <Core/ServerSettings.h>
#include <Interpreters/PreparedSets.h>
#include <Core/Settings.h>
#include <Core/SettingsQuirks.h>
#include <Access/AccessControl.h>

View File

@ -282,7 +282,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
else if (create.uuid != UUIDHelpers::Nil && !DatabaseCatalog::instance().hasUUIDMapping(create.uuid))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find UUID mapping for {}, it's a bug", create.uuid);
DatabasePtr database = DatabaseFactory::get(create, metadata_path / "", getContext());
DatabasePtr database = DatabaseFactory::instance().get(create, metadata_path / "", getContext());
if (create.uuid != UUIDHelpers::Nil)
create.setDatabase(TABLE_WITH_UUID_NAME_PLACEHOLDER);

View File

@ -79,7 +79,7 @@ BlockIO InterpreterOptimizeQuery::execute()
if (auto * snapshot_data = dynamic_cast<MergeTreeData::SnapshotData *>(storage_snapshot->data.get()))
snapshot_data->parts = {};
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, ast.cleanup, getContext());
return {};
}

View File

@ -112,8 +112,7 @@ public:
throw Exception(ErrorCodes::SESSION_NOT_FOUND, "Session {} not found", session_id);
/// Create a new session from current context.
auto context = Context::createCopy(global_context);
it = sessions.insert(std::make_pair(key, std::make_shared<NamedSessionData>(key, context, timeout, *this))).first;
it = sessions.insert(std::make_pair(key, std::make_shared<NamedSessionData>(key, global_context, timeout, *this))).first;
const auto & session = it->second;
if (!thread.joinable())
@ -128,7 +127,7 @@ public:
/// Use existing session.
const auto & session = it->second;
LOG_TEST(log, "Reuse session from storage with session_id: {}, user_id: {}", key.second, key.first);
LOG_TRACE(log, "Reuse session from storage with session_id: {}, user_id: {}", key.second, key.first);
if (!session.unique())
throw Exception(ErrorCodes::SESSION_IS_LOCKED, "Session {} is locked by a concurrent client", session_id);
@ -703,6 +702,10 @@ void Session::releaseSessionID()
{
if (!named_session)
return;
prepared_client_info = getClientInfo();
session_context.reset();
named_session->release();
named_session = nullptr;
}

View File

@ -8,6 +8,7 @@
#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
namespace Poco::Net { class SocketAddress; }

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context.h>
#include "Processors/Executors/PullingPipelineExecutor.h"
#include <Functions/registerDatabases.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
@ -31,6 +32,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);

View File

@ -24,6 +24,9 @@ void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatSt
if (deduplicate)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
if (cleanup)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " CLEANUP" << (settings.hilite ? hilite_none : "");
if (deduplicate_by_columns)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " BY " << (settings.hilite ? hilite_none : "");

View File

@ -21,10 +21,12 @@ public:
bool deduplicate = false;
/// Deduplicate by columns.
ASTPtr deduplicate_by_columns;
/// Delete 'is_deleted' data
bool cleanup = false;
/** Get the text that identifies this element. */
String getID(char delim) const override
{
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "");
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "")+ (cleanup ? "_cleanup" : "");
}
ASTPtr clone() const override

View File

@ -39,6 +39,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ASTPtr partition;
bool final = false;
bool deduplicate = false;
bool cleanup = false;
String cluster_str;
if (!s_optimize_table.ignore(pos, expected))
@ -69,6 +70,9 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
if (s_deduplicate.ignore(pos, expected))
deduplicate = true;
if (s_cleanup.ignore(pos, expected))
cleanup = true;
ASTPtr deduplicate_by_columns;
if (deduplicate && s_by.ignore(pos, expected))
{
@ -77,9 +81,6 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
return false;
}
/// Obsolete feature, ignored for backward compatibility.
s_cleanup.ignore(pos, expected);
auto query = std::make_shared<ASTOptimizeQuery>();
node = query;
@ -89,6 +90,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
query->final = final;
query->deduplicate = deduplicate;
query->deduplicate_by_columns = deduplicate_by_columns;
query->cleanup = cleanup;
query->database = database;
query->table = table;
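
With this change CLEANUP is no longer an ignored backward-compatibility keyword: the parser records it on ASTOptimizeQuery::cleanup, and the interpreter hunk above forwards it to IStorage::optimize(). An illustrative check of the parsed flag; parseQuery's exact parameter list varies between releases, so treat the call below as a sketch rather than the canonical signature:

```
#include <Parsers/ParserOptimizeQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/parseQuery.h>

/// Returns true for queries such as "OPTIMIZE TABLE t FINAL CLEANUP".
bool queryRequestsCleanup(const std::string & query)
{
    DB::ParserOptimizeQuery parser;
    DB::ASTPtr ast = DB::parseQuery(parser, query, /*max_query_size=*/0, /*max_parser_depth=*/1000);
    return ast->as<DB::ASTOptimizeQuery &>().cleanup;
}
```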

View File

@ -8,16 +8,13 @@
#include <Analyzer/LambdaNode.h>
#include <Analyzer/SortNode.h>
#include <Analyzer/WindowNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ConstantValue.h>
#include <DataTypes/FieldToDataType.h>
#include <DataTypes/DataTypeSet.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
@ -33,6 +30,7 @@
#include <Planner/TableExpressionData.h>
#include <Planner/Utils.h>
namespace DB
{

View File

@ -34,15 +34,13 @@ MutableColumns StreamingFormatExecutor::getResultColumns()
size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
{
auto & initial_buf = format->getReadBuffer();
format->setReadBuffer(buffer);
size_t rows = execute();
/// Format destructor can touch read buffer (for example when we use PeekableReadBuffer),
/// but we cannot control lifetime of provided read buffer. To avoid heap use after free
/// we can set initial read buffer back, because initial read buffer was created before
/// format, so it will be destructed after it.
format->setReadBuffer(initial_buf);
return rows;
/// we call format->resetReadBuffer() method that resets all buffers inside format.
SCOPE_EXIT(format->resetReadBuffer());
return execute();
}
size_t StreamingFormatExecutor::execute()
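
Because execute(ReadBuffer &) now resets the format's read buffers via resetReadBuffer() before returning, callers such as the asynchronous-insert hunk above no longer need to keep the last buffer alive. A sketch of the resulting per-batch pattern; parseBatches and its parameters are placeholders, and the error callback / defaults transform are omitted for brevity:

```
#include <vector>
#include <IO/ReadBufferFromString.h>
#include <Processors/Executors/StreamingFormatExecutor.h>

/// Parse every batch with its own short-lived buffer and return one Chunk.
DB::Chunk parseBatches(const DB::Block & header, DB::InputFormatPtr format, const std::vector<std::string> & batches)
{
    DB::StreamingFormatExecutor executor(header, format);
    size_t total_rows = 0;
    for (const auto & bytes : batches)
    {
        DB::ReadBufferFromString buffer(bytes);      /// safe to destroy right after execute() returns
        total_rows += executor.execute(buffer);
    }
    return DB::Chunk(executor.getResultColumns(), total_rows);
}
```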

View File

@ -24,7 +24,6 @@ void IInputFormat::resetParser()
void IInputFormat::setReadBuffer(ReadBuffer & in_)
{
chassert(in); // not supported by random-access formats
in = &in_;
}

View File

@ -36,7 +36,7 @@ public:
virtual void resetParser();
virtual void setReadBuffer(ReadBuffer & in_);
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
virtual void resetReadBuffer() { in = nullptr; }
virtual const BlockMissingValues & getMissingValues() const
{
@ -61,6 +61,8 @@ public:
void needOnlyCount() { need_only_count = true; }
protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
virtual Chunk getChunkForCount(size_t rows);
ColumnMappingPtr column_mapping{};

View File

@ -7,7 +7,6 @@
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <IO/PeekableReadBuffer.h>
#include <arrow/buffer.h>
#include <arrow/util/future.h>
#include <arrow/io/memory.h>

View File

@ -106,13 +106,14 @@ void CSVRowInputFormat::syncAfterError()
void CSVRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
}
void CSVRowInputFormat::resetParser()
void CSVRowInputFormat::resetReadBuffer()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf->reset();
buf.reset();
RowInputFormatWithNamesAndTypes::resetReadBuffer();
}
void CSVFormatReader::skipRow()

View File

@ -27,7 +27,7 @@ public:
String getName() const override { return "CSVRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
void resetParser() override;
void resetReadBuffer() override;
protected:
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_, const Params & params_,

Some files were not shown because too many files have changed in this diff.