Merge branch 'master' into aggressive-fuzz-new-tests

This commit is contained in:
alexey-milovidov 2021-01-16 23:56:44 +03:00 committed by GitHub
commit b3d137471e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 3055 additions and 72 deletions

6
.gitmodules vendored
View File

@ -209,6 +209,12 @@
[submodule "contrib/fast_float"]
path = contrib/fast_float
url = https://github.com/fastfloat/fast_float
[submodule "contrib/libpqxx"]
path = contrib/libpqxx
url = https://github.com/jtv/libpqxx
[submodule "contrib/libpq"]
path = contrib/libpq
url = https://github.com/ClickHouse-Extras/libpq
[submodule "contrib/boringssl"]
path = contrib/boringssl
url = https://github.com/ClickHouse-Extras/boringssl.git

View File

@ -490,6 +490,7 @@ include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/odbc.cmake)
include (cmake/find/rocksdb.cmake)
include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake)

31
cmake/find/libpqxx.cmake Normal file
View File

@ -0,0 +1,31 @@
option(ENABLE_LIBPQXX "Enalbe libpqxx" ${ENABLE_LIBRARIES})
if (NOT ENABLE_LIBPQXX)
return()
endif()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libpqxx/CMakeLists.txt")
message (WARNING "submodule contrib/libpqxx is missing. to fix try run: \n git submodule update --init --recursive")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal libpqxx library")
set (USE_LIBPQXX 0)
return()
endif()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libpq/include")
message (ERROR "submodule contrib/libpq is missing. to fix try run: \n git submodule update --init --recursive")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal libpq needed for libpqxx")
set (USE_LIBPQXX 0)
return()
endif()
if (NOT USE_INTERNAL_SSL_LIBRARY)
set (USE_LIBPQXX 0)
else ()
set (USE_LIBPQXX 1)
set (LIBPQXX_LIBRARY libpqxx)
set (LIBPQ_LIBRARY libpq)
set (LIBPQXX_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpqxx/include")
set (LIBPQ_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpq")
message (STATUS "Using libpqxx=${USE_LIBPQXX}: ${LIBPQXX_INCLUDE_DIR} : ${LIBPQXX_LIBRARY}")
message (STATUS "Using libpq: ${LIBPQ_ROOT_DIR} : ${LIBPQ_INCLUDE_DIR} : ${LIBPQ_LIBRARY}")
endif()

View File

@ -310,6 +310,11 @@ if (USE_INTERNAL_ROCKSDB_LIBRARY)
add_subdirectory(rocksdb-cmake)
endif()
if (USE_LIBPQXX)
add_subdirectory (libpq-cmake)
add_subdirectory (libpqxx-cmake)
endif()
if (USE_NURAFT)
add_subdirectory(nuraft-cmake)
endif()

1
contrib/libpq vendored Submodule

@ -0,0 +1 @@
Subproject commit 8e7e905854714a7fbb49c124dbc45c7bd4b98e07

View File

@ -0,0 +1,58 @@
set(LIBPQ_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libpq)
set(SRCS
${LIBPQ_SOURCE_DIR}/fe-auth.c
${LIBPQ_SOURCE_DIR}/fe-auth-scram.c
${LIBPQ_SOURCE_DIR}/fe-connect.c
${LIBPQ_SOURCE_DIR}/fe-exec.c
${LIBPQ_SOURCE_DIR}/fe-lobj.c
${LIBPQ_SOURCE_DIR}/fe-misc.c
${LIBPQ_SOURCE_DIR}/fe-print.c
${LIBPQ_SOURCE_DIR}/fe-protocol2.c
${LIBPQ_SOURCE_DIR}/fe-protocol3.c
${LIBPQ_SOURCE_DIR}/fe-secure.c
${LIBPQ_SOURCE_DIR}/fe-secure-common.c
${LIBPQ_SOURCE_DIR}/fe-secure-openssl.c
${LIBPQ_SOURCE_DIR}/legacy-pqsignal.c
${LIBPQ_SOURCE_DIR}/libpq-events.c
${LIBPQ_SOURCE_DIR}/pqexpbuffer.c
${LIBPQ_SOURCE_DIR}/common/scram-common.c
${LIBPQ_SOURCE_DIR}/common/sha2_openssl.c
${LIBPQ_SOURCE_DIR}/common/md5.c
${LIBPQ_SOURCE_DIR}/common/saslprep.c
${LIBPQ_SOURCE_DIR}/common/unicode_norm.c
${LIBPQ_SOURCE_DIR}/common/ip.c
${LIBPQ_SOURCE_DIR}/common/jsonapi.c
${LIBPQ_SOURCE_DIR}/common/wchar.c
${LIBPQ_SOURCE_DIR}/common/base64.c
${LIBPQ_SOURCE_DIR}/common/link-canary.c
${LIBPQ_SOURCE_DIR}/common/fe_memutils.c
${LIBPQ_SOURCE_DIR}/common/string.c
${LIBPQ_SOURCE_DIR}/common/pg_get_line.c
${LIBPQ_SOURCE_DIR}/common/stringinfo.c
${LIBPQ_SOURCE_DIR}/common/psprintf.c
${LIBPQ_SOURCE_DIR}/common/encnames.c
${LIBPQ_SOURCE_DIR}/common/logging.c
${LIBPQ_SOURCE_DIR}/port/snprintf.c
${LIBPQ_SOURCE_DIR}/port/strlcpy.c
${LIBPQ_SOURCE_DIR}/port/strerror.c
${LIBPQ_SOURCE_DIR}/port/inet_net_ntop.c
${LIBPQ_SOURCE_DIR}/port/getpeereid.c
${LIBPQ_SOURCE_DIR}/port/chklocale.c
${LIBPQ_SOURCE_DIR}/port/noblock.c
${LIBPQ_SOURCE_DIR}/port/pg_strong_random.c
${LIBPQ_SOURCE_DIR}/port/pgstrcasecmp.c
${LIBPQ_SOURCE_DIR}/port/thread.c
${LIBPQ_SOURCE_DIR}/port/path.c
${LIBPQ_SOURCE_DIR}/port/explicit_bzero.c
)
add_library(libpq ${SRCS})
target_include_directories (libpq PUBLIC ${LIBPQ_SOURCE_DIR})
target_include_directories (libpq PUBLIC ${LIBPQ_SOURCE_DIR}/include)
target_include_directories (libpq PRIVATE ${LIBPQ_SOURCE_DIR}/configs)
target_link_libraries (libpq PRIVATE ssl)

1
contrib/libpqxx vendored Submodule

@ -0,0 +1 @@
Subproject commit 58d2a028d1600225ac3a478d6b3a06ba2f0c01f6

View File

@ -0,0 +1,78 @@
set (LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/libpqxx)
set (SRCS
${LIBRARY_DIR}/src/strconv.cxx
${LIBRARY_DIR}/src/array.cxx
${LIBRARY_DIR}/src/binarystring.cxx
${LIBRARY_DIR}/src/connection.cxx
${LIBRARY_DIR}/src/cursor.cxx
${LIBRARY_DIR}/src/encodings.cxx
${LIBRARY_DIR}/src/errorhandler.cxx
${LIBRARY_DIR}/src/except.cxx
${LIBRARY_DIR}/src/field.cxx
${LIBRARY_DIR}/src/largeobject.cxx
${LIBRARY_DIR}/src/notification.cxx
${LIBRARY_DIR}/src/pipeline.cxx
${LIBRARY_DIR}/src/result.cxx
${LIBRARY_DIR}/src/robusttransaction.cxx
${LIBRARY_DIR}/src/sql_cursor.cxx
${LIBRARY_DIR}/src/stream_from.cxx
${LIBRARY_DIR}/src/stream_to.cxx
${LIBRARY_DIR}/src/subtransaction.cxx
${LIBRARY_DIR}/src/transaction.cxx
${LIBRARY_DIR}/src/transaction_base.cxx
${LIBRARY_DIR}/src/row.cxx
${LIBRARY_DIR}/src/util.cxx
${LIBRARY_DIR}/src/version.cxx
)
# Need to explicitly include each header file, because in the directory include/pqxx there are also files
# like just 'array'. So if including the whole directory with `target_include_directories`, it will make
# conflicts with all includes of <array>.
set (HDRS
${LIBRARY_DIR}/include/pqxx/array.hxx
${LIBRARY_DIR}/include/pqxx/binarystring.hxx
${LIBRARY_DIR}/include/pqxx/composite.hxx
${LIBRARY_DIR}/include/pqxx/connection.hxx
${LIBRARY_DIR}/include/pqxx/cursor.hxx
${LIBRARY_DIR}/include/pqxx/dbtransaction.hxx
${LIBRARY_DIR}/include/pqxx/errorhandler.hxx
${LIBRARY_DIR}/include/pqxx/except.hxx
${LIBRARY_DIR}/include/pqxx/field.hxx
${LIBRARY_DIR}/include/pqxx/isolation.hxx
${LIBRARY_DIR}/include/pqxx/largeobject.hxx
${LIBRARY_DIR}/include/pqxx/nontransaction.hxx
${LIBRARY_DIR}/include/pqxx/notification.hxx
${LIBRARY_DIR}/include/pqxx/pipeline.hxx
${LIBRARY_DIR}/include/pqxx/prepared_statement.hxx
${LIBRARY_DIR}/include/pqxx/result.hxx
${LIBRARY_DIR}/include/pqxx/robusttransaction.hxx
${LIBRARY_DIR}/include/pqxx/row.hxx
${LIBRARY_DIR}/include/pqxx/separated_list.hxx
${LIBRARY_DIR}/include/pqxx/strconv.hxx
${LIBRARY_DIR}/include/pqxx/stream_from.hxx
${LIBRARY_DIR}/include/pqxx/stream_to.hxx
${LIBRARY_DIR}/include/pqxx/subtransaction.hxx
${LIBRARY_DIR}/include/pqxx/transaction.hxx
${LIBRARY_DIR}/include/pqxx/transaction_base.hxx
${LIBRARY_DIR}/include/pqxx/types.hxx
${LIBRARY_DIR}/include/pqxx/util.hxx
${LIBRARY_DIR}/include/pqxx/version.hxx
${LIBRARY_DIR}/include/pqxx/zview.hxx
)
add_library(libpqxx ${SRCS} ${HDRS})
target_link_libraries(libpqxx PUBLIC ${LIBPQ_LIBRARY})
target_include_directories (libpqxx PRIVATE ${LIBRARY_DIR}/include)
# crutch
set(CM_CONFIG_H_IN "${LIBRARY_DIR}/include/pqxx/config.h.in")
set(CM_CONFIG_PUB "${LIBRARY_DIR}/include/pqxx/config-public-compiler.h")
set(CM_CONFIG_INT "${LIBRARY_DIR}/include/pqxx/config-internal-compiler.h")
set(CM_CONFIG_PQ "${LIBRARY_DIR}/include/pqxx/config-internal-libpq.h")
configure_file("${CM_CONFIG_H_IN}" "${CM_CONFIG_INT}" @ONLY)
configure_file("${CM_CONFIG_H_IN}" "${CM_CONFIG_PUB}" @ONLY)
configure_file("${CM_CONFIG_H_IN}" "${CM_CONFIG_PQ}" @ONLY)

View File

@ -10,6 +10,7 @@ stage=${stage:-}
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
echo "$script_dir"
repo_dir=ch
BINARY_TO_DOWNLOAD=${BINARY_TO_DOWNLOAD:="clang-11_debug_none_bundled_unsplitted_disable_False_binary"}
function clone
{
@ -34,7 +35,7 @@ function clone
function download
{
wget -nv -nd -c "https://clickhouse-builds.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/clang-11_debug_none_bundled_unsplitted_disable_False_binary/clickhouse"
wget -nv -nd -c "https://clickhouse-builds.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/$BINARY_TO_DOWNLOAD/clickhouse"
chmod +x clickhouse
ln -s ./clickhouse ./clickhouse-server
ln -s ./clickhouse ./clickhouse-client

View File

@ -538,11 +538,11 @@ For case-insensitive search or/and in UTF-8 format use functions `ngramSearchCas
!!! note "Note"
For UTF-8 case we use 3-gram distance. All these are not perfectly fair n-gram distances. We use 2-byte hashes to hash n-grams and then calculate the (non-)symmetric difference between these hash tables collisions may occur. With UTF-8 case-insensitive format we do not use fair `tolower` function we zero the 5-th bit (starting from zero) of each codepoint byte and first bit of zeroth byte if bytes more than one this works for Latin and mostly for all Cyrillic letters.
## countSubstrings(haystack, needle) {#countSubstrings}
## countSubstrings {#countSubstrings}
Count the number of substring occurrences
Returns the number of substring occurrences.
For a case-insensitive search, use the function `countSubstringsCaseInsensitive` (or `countSubstringsCaseInsensitiveUTF8`).
For a case-insensitive search, use [countSubstringsCaseInsensitive](../../sql-reference/functions/string-search-functions.md#countSubstringsCaseInsensitive) or [countSubstringsCaseInsensitiveUTF8](../../sql-reference/functions/string-search-functions.md#countSubstringsCaseInsensitiveUTF8) functions.
**Syntax**
@ -554,20 +554,20 @@ countSubstrings(haystack, needle[, start_pos])
- `haystack` — The string to search in. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — The substring to search for. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `start_pos` Optional parameter, position of the first character in the string to start search. [UInt](../../sql-reference/data-types/int-uint.md)
- `start_pos` Position of the first character in the string to start search. Optional. [UInt](../../sql-reference/data-types/int-uint.md).
**Returned values**
- Number of occurrences.
Type: `Integer`.
Type: [UInt64](../../sql-reference/data-types/int-uint.md).
**Examples**
Query:
``` sql
SELECT countSubstrings('foobar.com', '.')
SELECT countSubstrings('foobar.com', '.');
```
Result:
@ -581,7 +581,7 @@ Result:
Query:
``` sql
SELECT countSubstrings('aaaa', 'aa')
SELECT countSubstrings('aaaa', 'aa');
```
Result:
@ -592,6 +592,138 @@ Result:
└───────────────────────────────┘
```
Query:
```sql
SELECT countSubstrings('abc___abc', 'abc', 4);
```
Result:
``` text
┌─countSubstrings('abc___abc', 'abc', 4)─┐
│ 1 │
└────────────────────────────────────────┘
```
## countSubstringsCaseInsensitive {#countSubstringsCaseInsensitive}
Returns the number of substring occurrences case-insensitive.
**Syntax**
``` sql
countSubstringsCaseInsensitive(haystack, needle[, start_pos])
```
**Parameters**
- `haystack` — The string to search in. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — The substring to search for. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `start_pos` Position of the first character in the string to start search. Optional. [UInt](../../sql-reference/data-types/int-uint.md).
**Returned values**
- Number of occurrences.
Type: [UInt64](../../sql-reference/data-types/int-uint.md).
**Examples**
Query:
``` sql
select countSubstringsCaseInsensitive('aba', 'B');
```
Result:
``` text
┌─countSubstringsCaseInsensitive('aba', 'B')─┐
│ 1 │
└────────────────────────────────────────────┘
```
Query:
``` sql
SELECT countSubstringsCaseInsensitive('foobar.com', 'CoM');
```
Result:
``` text
┌─countSubstringsCaseInsensitive('foobar.com', 'CoM')─┐
│ 1 │
└─────────────────────────────────────────────────────┘
```
Query:
``` sql
SELECT countSubstringsCaseInsensitive('abC___abC', 'aBc', 2);
```
Result:
``` text
┌─countSubstringsCaseInsensitive('abC___abC', 'aBc', 2)─┐
│ 1 │
└───────────────────────────────────────────────────────┘
```
## countSubstringsCaseInsensitiveUTF8 {#countSubstringsCaseInsensitiveUTF8}
Returns the number of substring occurrences in `UTF-8` case-insensitive.
**Syntax**
``` sql
SELECT countSubstringsCaseInsensitiveUTF8(haystack, needle[, start_pos])
```
**Parameters**
- `haystack` — The string to search in. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — The substring to search for. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `start_pos` Position of the first character in the string to start search. Optional. [UInt](../../sql-reference/data-types/int-uint.md).
**Returned values**
- Number of occurrences.
Type: [UInt64](../../sql-reference/data-types/int-uint.md).
**Examples**
Query:
``` sql
SELECT countSubstringsCaseInsensitiveUTF8('абв', 'A');
```
Result:
``` text
┌─countSubstringsCaseInsensitiveUTF8('абв', 'A')─┐
│ 1 │
└────────────────────────────────────────────────┘
```
Query:
```sql
SELECT countSubstringsCaseInsensitiveUTF8('аБв__АбВ__абв', 'Абв');
```
Result:
``` text
┌─countSubstringsCaseInsensitiveUTF8('аБв__АбВ__абв', 'Абв')─┐
│ 3 │
└────────────────────────────────────────────────────────────┘
```
## countMatches(haystack, pattern) {#countmatcheshaystack-pattern}
Returns the number of regular expression matches for a `pattern` in a `haystack`.

View File

@ -573,4 +573,190 @@ SELECT countMatches('aaaa', 'aa');
└───────────────────────────────┘
```
## countSubstrings {#countSubstrings}
Возвращает количество вхождений подстроки.
Для поиска без учета регистра, используйте функции [countSubstringsCaseInsensitive](../../sql-reference/functions/string-search-functions.md#countSubstringsCaseInsensitive) или [countSubstringsCaseInsensitiveUTF8](../../sql-reference/functions/string-search-functions.md#countSubstringsCaseInsensitiveUTF8)
**Синтаксис**
``` sql
countSubstrings(haystack, needle[, start_pos])
```
**Параметры**
- `haystack` — строка, в которой ведется поиск. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — искомая подстрока. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `start_pos` позиция первого символа в строке, с которого начнется поиск. Необязательный параметр. [UInt](../../sql-reference/data-types/int-uint.md).
**Возвращаемые значения**
- Число вхождений.
Тип: [UInt64](../../sql-reference/data-types/int-uint.md).
**Примеры**
Запрос:
``` sql
SELECT countSubstrings('foobar.com', '.');
```
Результат:
``` text
┌─countSubstrings('foobar.com', '.')─┐
│ 1 │
└────────────────────────────────────┘
```
Запрос:
``` sql
SELECT countSubstrings('aaaa', 'aa');
```
Результат:
``` text
┌─countSubstrings('aaaa', 'aa')─┐
│ 2 │
└───────────────────────────────┘
```
Запрос:
```sql
SELECT countSubstrings('abc___abc', 'abc', 4);
```
Результат:
``` text
┌─countSubstrings('abc___abc', 'abc', 4)─┐
│ 1 │
└────────────────────────────────────────┘
```
## countSubstringsCaseInsensitive {#countSubstringsCaseInsensitive}
Возвращает количество вхождений подстроки без учета регистра.
**Синтаксис**
``` sql
countSubstringsCaseInsensitive(haystack, needle[, start_pos])
```
**Параметры**
- `haystack` — строка, в которой ведется поиск. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — искомая подстрока. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `start_pos` позиция первого символа в строке, с которого начнется поиск. Необязательный параметр. [UInt](../../sql-reference/data-types/int-uint.md).
**Возвращаемые значения**
- Число вхождений.
Тип: [UInt64](../../sql-reference/data-types/int-uint.md).
**Примеры**
Запрос:
``` sql
select countSubstringsCaseInsensitive('aba', 'B');
```
Результат:
``` text
┌─countSubstringsCaseInsensitive('aba', 'B')─┐
│ 1 │
└────────────────────────────────────────────┘
```
Запрос:
``` sql
SELECT countSubstringsCaseInsensitive('foobar.com', 'CoM');
```
Результат:
``` text
┌─countSubstringsCaseInsensitive('foobar.com', 'CoM')─┐
│ 1 │
└─────────────────────────────────────────────────────┘
```
Запрос:
``` sql
SELECT countSubstringsCaseInsensitive('abC___abC', 'aBc', 2);
```
Результат:
``` text
┌─countSubstringsCaseInsensitive('abC___abC', 'aBc', 2)─┐
│ 1 │
└───────────────────────────────────────────────────────┘
```
## countSubstringsCaseInsensitiveUTF8 {#countSubstringsCaseInsensitiveUTF8}
Возвращает количество вхождений подстроки в `UTF-8` без учета регистра.
**Синтаксис**
``` sql
SELECT countSubstringsCaseInsensitiveUTF8(haystack, needle[, start_pos])
```
**Параметры**
- `haystack` — строка, в которой ведется поиск. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — искомая подстрока. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `start_pos` позиция первого символа в строке, с которого начнется поиск. Необязательный параметр. [UInt](../../sql-reference/data-types/int-uint.md).
**Возвращаемые значения**
- Число вхождений.
Тип: [UInt64](../../sql-reference/data-types/int-uint.md).
**Примеры**
Запрос:
``` sql
SELECT countSubstringsCaseInsensitiveUTF8('абв', 'A');
```
Результат:
``` text
┌─countSubstringsCaseInsensitiveUTF8('абв', 'A')─┐
│ 1 │
└────────────────────────────────────────────────┘
```
Запрос:
```sql
SELECT countSubstringsCaseInsensitiveUTF8('аБв__АбВ__абв', 'Абв');
```
Результат:
``` text
┌─countSubstringsCaseInsensitiveUTF8('аБв__АбВ__абв', 'Абв')─┐
│ 3 │
└────────────────────────────────────────────────────────────┘
```
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/functions/string_search_functions/) <!--hide-->

View File

@ -159,6 +159,7 @@ enum class AccessType
M(REMOTE, "", GLOBAL, SOURCES) \
M(MONGO, "", GLOBAL, SOURCES) \
M(MYSQL, "", GLOBAL, SOURCES) \
M(POSTGRES, "", GLOBAL, SOURCES) \
M(ODBC, "", GLOBAL, SOURCES) \
M(JDBC, "", GLOBAL, SOURCES) \
M(HDFS, "", GLOBAL, SOURCES) \

View File

@ -79,6 +79,11 @@ if (USE_AMQPCPP)
add_headers_and_sources(dbms Storages/RabbitMQ)
endif()
if (USE_LIBPQXX)
add_headers_and_sources(dbms Databases/PostgreSQL)
add_headers_and_sources(dbms Storages/PostgreSQL)
endif()
if (USE_ROCKSDB)
add_headers_and_sources(dbms Storages/RocksDB)
endif()
@ -439,6 +444,11 @@ if (USE_ROCKSDB)
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${ROCKSDB_INCLUDE_DIR})
endif()
if (USE_LIBPQXX)
dbms_target_link_libraries(PUBLIC ${LIBPQXX_LIBRARY})
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${LIBPQXX_INCLUDE_DIR})
endif()
dbms_target_link_libraries(PRIVATE _boost_context)
if (ENABLE_TESTS AND USE_GTEST)

View File

@ -250,7 +250,16 @@ void BackgroundSchedulePool::threadFunction()
while (!shutdown)
{
if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification())
/// We have to wait with timeout to prevent very rare deadlock, caused by the following race condition:
/// 1. Background thread N: threadFunction(): checks for shutdown (it's false)
/// 2. Main thread: ~BackgroundSchedulePool(): sets shutdown to true, calls queue.wakeUpAll(), it triggers
/// all existing Poco::Events inside Poco::NotificationQueue which background threads are waiting on.
/// 3. Background thread N: threadFunction(): calls queue.waitDequeueNotification(), it creates
/// new Poco::Event inside Poco::NotificationQueue and starts to wait on it
/// Background thread N will never be woken up.
/// TODO Do we really need Poco::NotificationQueue? Why not to use std::queue + mutex + condvar or maybe even DB::ThreadPool?
constexpr size_t wait_timeout_ms = 500;
if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification(wait_timeout_ms))
{
TaskNotification & task_notification = static_cast<TaskNotification &>(*notification);
task_notification.execute();

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
@ -35,49 +36,53 @@ void ExternalResultDescription::init(const Block & sample_block_)
DataTypePtr type_not_nullable = removeNullable(elem.type);
const IDataType * type = type_not_nullable.get();
if (typeid_cast<const DataTypeUInt8 *>(type))
WhichDataType which(type);
if (which.isUInt8())
types.emplace_back(ValueType::vtUInt8, is_nullable);
else if (typeid_cast<const DataTypeUInt16 *>(type))
else if (which.isUInt16())
types.emplace_back(ValueType::vtUInt16, is_nullable);
else if (typeid_cast<const DataTypeUInt32 *>(type))
else if (which.isUInt32())
types.emplace_back(ValueType::vtUInt32, is_nullable);
else if (typeid_cast<const DataTypeUInt64 *>(type))
else if (which.isUInt64())
types.emplace_back(ValueType::vtUInt64, is_nullable);
else if (typeid_cast<const DataTypeInt8 *>(type))
else if (which.isInt8())
types.emplace_back(ValueType::vtInt8, is_nullable);
else if (typeid_cast<const DataTypeInt16 *>(type))
else if (which.isInt16())
types.emplace_back(ValueType::vtInt16, is_nullable);
else if (typeid_cast<const DataTypeInt32 *>(type))
else if (which.isInt32())
types.emplace_back(ValueType::vtInt32, is_nullable);
else if (typeid_cast<const DataTypeInt64 *>(type))
else if (which.isInt64())
types.emplace_back(ValueType::vtInt64, is_nullable);
else if (typeid_cast<const DataTypeFloat32 *>(type))
else if (which.isFloat32())
types.emplace_back(ValueType::vtFloat32, is_nullable);
else if (typeid_cast<const DataTypeFloat64 *>(type))
else if (which.isFloat64())
types.emplace_back(ValueType::vtFloat64, is_nullable);
else if (typeid_cast<const DataTypeString *>(type))
else if (which.isString())
types.emplace_back(ValueType::vtString, is_nullable);
else if (typeid_cast<const DataTypeDate *>(type))
else if (which.isDate())
types.emplace_back(ValueType::vtDate, is_nullable);
else if (typeid_cast<const DataTypeDateTime *>(type))
else if (which.isDateTime())
types.emplace_back(ValueType::vtDateTime, is_nullable);
else if (typeid_cast<const DataTypeUUID *>(type))
else if (which.isUUID())
types.emplace_back(ValueType::vtUUID, is_nullable);
else if (typeid_cast<const DataTypeEnum8 *>(type))
else if (which.isEnum8())
types.emplace_back(ValueType::vtString, is_nullable);
else if (typeid_cast<const DataTypeEnum16 *>(type))
else if (which.isEnum16())
types.emplace_back(ValueType::vtString, is_nullable);
else if (typeid_cast<const DataTypeDateTime64 *>(type))
else if (which.isDateTime64())
types.emplace_back(ValueType::vtDateTime64, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal32> *>(type))
else if (which.isDecimal32())
types.emplace_back(ValueType::vtDecimal32, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal64> *>(type))
else if (which.isDecimal64())
types.emplace_back(ValueType::vtDecimal64, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal128> *>(type))
else if (which.isDecimal128())
types.emplace_back(ValueType::vtDecimal128, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal256> *>(type))
else if (which.isDecimal256())
types.emplace_back(ValueType::vtDecimal256, is_nullable);
else if (typeid_cast<const DataTypeFixedString *>(type))
else if (which.isArray())
types.emplace_back(ValueType::vtArray, is_nullable);
else if (which.isFixedString())
types.emplace_back(ValueType::vtFixedString, is_nullable);
else
throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE};

View File

@ -31,6 +31,7 @@ struct ExternalResultDescription
vtDecimal64,
vtDecimal128,
vtDecimal256,
vtArray,
vtFixedString
};

View File

@ -12,4 +12,4 @@
#cmakedefine01 USE_OPENCL
#cmakedefine01 USE_LDAP
#cmakedefine01 USE_ROCKSDB
#cmakedefine01 USE_LIBPQXX

View File

@ -0,0 +1,297 @@
#include "PostgreSQLBlockInputStream.h"
#if USE_LIBPQXX
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Interpreters/convertFieldToType.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Common/assert_cast.h>
#include <ext/range.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
ConnectionPtr connection_,
const std::string & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: query_str(query_str_)
, max_block_size(max_block_size_)
, connection(connection_)
{
description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns()))
if (description.types[idx].first == ValueType::vtArray)
prepareArrayInfo(idx, description.sample_block.getByPosition(idx).type);
/// pqxx::stream_from uses COPY command, will get error if ';' is present
if (query_str.ends_with(';'))
query_str.resize(query_str.size() - 1);
}
void PostgreSQLBlockInputStream::readPrefix()
{
tx = std::make_unique<pqxx::read_transaction>(*connection);
stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view(query_str));
}
Block PostgreSQLBlockInputStream::readImpl()
{
/// Check if pqxx::stream_from is finished
if (!stream || !(*stream))
return Block();
MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0;
while (true)
{
const std::vector<pqxx::zview> * row{stream->read_row()};
/// row is nullptr if pqxx::stream_from is finished
if (!row)
break;
for (const auto idx : ext::range(0, row->size()))
{
const auto & sample = description.sample_block.getByPosition(idx);
/// if got NULL type, then pqxx::zview will return nullptr in c_str()
if ((*row)[idx].c_str())
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(column_nullable.getNestedColumn(), (*row)[idx], description.types[idx].first, data_type.getNestedType(), idx);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertValue(*columns[idx], (*row)[idx], description.types[idx].first, sample.type, idx);
}
}
else
{
insertDefaultValue(*columns[idx], *sample.column);
}
}
if (++num_rows == max_block_size)
break;
}
return description.sample_block.cloneWithColumns(std::move(columns));
}
void PostgreSQLBlockInputStream::readSuffix()
{
if (stream)
{
stream->complete();
tx->commit();
}
}
void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx)
{
switch (type)
{
case ValueType::vtUInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ValueType::vtUInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ValueType::vtUInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(pqxx::from_string<uint32_t>(value));
break;
case ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(pqxx::from_string<uint64_t>(value));
break;
case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ValueType::vtInt16:
assert_cast<ColumnInt16 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ValueType::vtInt32:
assert_cast<ColumnInt32 &>(column).insertValue(pqxx::from_string<int32_t>(value));
break;
case ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(pqxx::from_string<int64_t>(value));
break;
case ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(pqxx::from_string<float>(value));
break;
case ValueType::vtFloat64:
assert_cast<ColumnFloat64 &>(column).insertValue(pqxx::from_string<double>(value));
break;
case ValueType::vtFixedString:[[fallthrough]];
case ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
break;
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
break;
case ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});
break;
case ValueType::vtDateTime:
assert_cast<ColumnUInt32 &>(column).insertValue(time_t{LocalDateTime{std::string(value)}});
break;
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128: [[fallthrough]];
case ValueType::vtDecimal256:
{
ReadBufferFromString istr(value);
data_type->deserializeAsWholeText(column, istr, FormatSettings{});
break;
}
case ValueType::vtArray:
{
pqxx::array_parser parser{value};
std::pair<pqxx::array_parser::juncture, std::string> parsed = parser.get_next();
size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions;
const auto parse_value = array_info[idx].pqxx_parser;
std::vector<std::vector<Field>> dimensions(expected_dimensions + 1);
while (parsed.first != pqxx::array_parser::juncture::done)
{
if ((parsed.first == pqxx::array_parser::juncture::row_start) && (++dimension > expected_dimensions))
throw Exception("Got more dimensions than expected", ErrorCodes::BAD_ARGUMENTS);
else if (parsed.first == pqxx::array_parser::juncture::string_value)
dimensions[dimension].emplace_back(parse_value(parsed.second));
else if (parsed.first == pqxx::array_parser::juncture::null_value)
dimensions[dimension].emplace_back(array_info[idx].default_value);
else if (parsed.first == pqxx::array_parser::juncture::row_end)
{
max_dimension = std::max(max_dimension, dimension);
if (--dimension == 0)
break;
dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end()));
dimensions[dimension + 1].clear();
}
parsed = parser.get_next();
}
if (max_dimension < expected_dimensions)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Got less dimensions than expected. ({} instead of {})", max_dimension, expected_dimensions);
assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
break;
}
}
}
void PostgreSQLBlockInputStream::prepareArrayInfo(size_t column_idx, const DataTypePtr data_type)
{
const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());
auto nested = array_type->getNestedType();
size_t count_dimensions = 1;
while (isArray(nested))
{
++count_dimensions;
nested = typeid_cast<const DataTypeArray *>(nested.get())->getNestedType();
}
Field default_value = nested->getDefault();
if (nested->isNullable())
nested = static_cast<const DataTypeNullable *>(nested.get())->getNestedType();
WhichDataType which(nested);
std::function<Field(std::string & fields)> parser;
if (which.isUInt8() || which.isUInt16())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint16_t>(field); };
else if (which.isInt8() || which.isInt16())
parser = [](std::string & field) -> Field { return pqxx::from_string<int16_t>(field); };
else if (which.isUInt32())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint32_t>(field); };
else if (which.isInt32())
parser = [](std::string & field) -> Field { return pqxx::from_string<int32_t>(field); };
else if (which.isUInt64())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint64_t>(field); };
else if (which.isInt64())
parser = [](std::string & field) -> Field { return pqxx::from_string<int64_t>(field); };
else if (which.isFloat32())
parser = [](std::string & field) -> Field { return pqxx::from_string<float>(field); };
else if (which.isFloat64())
parser = [](std::string & field) -> Field { return pqxx::from_string<double>(field); };
else if (which.isString() || which.isFixedString())
parser = [](std::string & field) -> Field { return field; };
else if (which.isDate())
parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; };
else if (which.isDateTime())
parser = [](std::string & field) -> Field { return time_t{LocalDateTime{field}}; };
else if (which.isDecimal32())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal32> *>(nested.get());
DataTypeDecimal<Decimal32> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal64())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal64> *>(nested.get());
DataTypeDecimal<Decimal64> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal128())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());
DataTypeDecimal<Decimal128> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal256())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal256> *>(nested.get());
DataTypeDecimal<Decimal256> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName());
array_info[column_idx] = {count_dimensions, default_value, parser};
}
}
#endif

View File

@ -0,0 +1,65 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h>
#include <Core/Field.h>
#include <pqxx/pqxx>
namespace DB
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
class PostgreSQLBlockInputStream : public IBlockInputStream
{
public:
PostgreSQLBlockInputStream(
ConnectionPtr connection_,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_);
String getName() const override { return "PostgreSQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
using ValueType = ExternalResultDescription::ValueType;
void readPrefix() override;
Block readImpl() override;
void readSuffix() override;
void insertValue(IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx);
void insertDefaultValue(IColumn & column, const IColumn & sample_column)
{
column.insertFrom(sample_column, 0);
}
void prepareArrayInfo(size_t column_idx, const DataTypePtr data_type);
String query_str;
const UInt64 max_block_size;
ExternalResultDescription description;
ConnectionPtr connection;
std::unique_ptr<pqxx::read_transaction> tx;
std::unique_ptr<pqxx::stream_from> stream;
struct ArrayInfo
{
size_t num_dimensions;
Field default_value;
std::function<Field(std::string & field)> pqxx_parser;
};
std::unordered_map<size_t, ArrayInfo> array_info;
};
}
#endif

View File

@ -12,7 +12,7 @@ NO_COMPILER_WARNINGS()
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -P 'tests|PostgreSQL' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -23,11 +23,19 @@
# include <Databases/MySQL/DatabaseConnectionMySQL.h>
# include <Databases/MySQL/MaterializeMySQLSettings.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Common/parseAddress.h>
# include <mysqlxx/Pool.h>
#endif
#if USE_MYSQL || USE_LIBPQXX
#include <Interpreters/evaluateConstantExpression.h>
#include <Common/parseAddress.h>
#endif
#if USE_LIBPQXX
#include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#endif
namespace DB
{
@ -80,7 +88,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
if (engine_name != "MySQL" && engine_name != "MaterializeMySQL" && engine_name != "Lazy" && engine_define->engine->arguments)
if (engine_name != "MySQL" && engine_name != "MaterializeMySQL" && engine_name != "Lazy" && engine_name != "PostgreSQL" && engine_define->engine->arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by ||
@ -168,6 +176,44 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
return std::make_shared<DatabaseLazy>(database_name, metadata_path, cache_expiration_time_seconds, context);
}
#if USE_LIBPQXX
else if (engine_name == "PostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() < 4 || engine->arguments->children.size() > 5)
throw Exception(fmt::format(
"{} Database require host:port, database_name, username, password arguments "
"[, use_table_cache = 0].", engine_name),
ErrorCodes::BAD_ARGUMENTS);
ASTs & engine_args = engine->arguments->children;
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
const auto & postgres_database_name = safeGetLiteralValue<String>(engine_args[1], engine_name);
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
auto use_table_cache = 0;
if (engine->arguments->children.size() == 5)
use_table_cache = safeGetLiteralValue<UInt64>(engine_args[4], engine_name);
auto parsed_host_port = parseAddress(host_port, 5432);
/// no connection is made here
auto connection = std::make_shared<PostgreSQLConnection>(
postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, connection, use_table_cache);
}
#endif
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}

View File

@ -0,0 +1,415 @@
#include <Databases/PostgreSQL/DatabasePostgreSQL.h>
#if USE_LIBPQXX
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Common/escapeForFileName.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_TABLE;
extern const int TABLE_IS_DROPPED;
extern const int TABLE_ALREADY_EXISTS;
}
static const auto suffix = ".removed";
static const auto cleaner_reschedule_ms = 60000;
DatabasePostgreSQL::DatabasePostgreSQL(
const Context & context,
const String & metadata_path_,
const ASTStorage * database_engine_define_,
const String & dbname_,
const String & postgres_dbname,
PostgreSQLConnectionPtr connection_,
const bool cache_tables_)
: IDatabase(dbname_)
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
, dbname(postgres_dbname)
, connection(std::move(connection_))
, cache_tables(cache_tables_)
{
cleaner_task = context.getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
cleaner_task->deactivate();
}
bool DatabasePostgreSQL::empty() const
{
std::lock_guard<std::mutex> lock(mutex);
auto tables_list = fetchTablesList();
for (const auto & table_name : tables_list)
if (!detached_or_dropped.count(table_name))
return false;
return true;
}
DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(
const Context & context, const FilterByNameFunction & /* filter_by_table_name */)
{
std::lock_guard<std::mutex> lock(mutex);
Tables tables;
auto table_names = fetchTablesList();
for (const auto & table_name : table_names)
if (!detached_or_dropped.count(table_name))
tables[table_name] = fetchTable(table_name, context, true);
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
}
std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
{
std::unordered_set<std::string> tables;
std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
pqxx::read_transaction tx(*connection->conn());
for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name));
return tables;
}
bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
{
if (table_name.find('\'') != std::string::npos
|| table_name.find('\\') != std::string::npos)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL table name cannot contain single quote or backslash characters, passed {}", table_name);
}
pqxx::nontransaction tx(*connection->conn());
try
{
/// Casting table_name::regclass throws pqxx::indefined_table exception if table_name is incorrect.
pqxx::result result = tx.exec(fmt::format(
"SELECT '{}'::regclass, tablename "
"FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' "
"AND tablename = '{}'", table_name, table_name));
}
catch (pqxx::undefined_table const &)
{
return false;
}
catch (Exception & e)
{
e.addMessage("while checking postgresql table existence");
throw;
}
return true;
}
bool DatabasePostgreSQL::isTableExist(const String & table_name, const Context & /* context */) const
{
std::lock_guard<std::mutex> lock(mutex);
if (detached_or_dropped.count(table_name))
return false;
return checkPostgresTable(table_name);
}
StoragePtr DatabasePostgreSQL::tryGetTable(const String & table_name, const Context & context) const
{
std::lock_guard<std::mutex> lock(mutex);
if (!detached_or_dropped.count(table_name))
return fetchTable(table_name, context, false);
return StoragePtr{};
}
StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Context & context, const bool table_checked) const
{
if (!cache_tables || !cached_tables.count(table_name))
{
if (!table_checked && !checkPostgresTable(table_name))
return StoragePtr{};
auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = fetchPostgreSQLTableStructure(connection->conn(), table_name, use_nulls);
if (!columns)
return StoragePtr{};
auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), table_name, std::make_shared<PostgreSQLConnection>(connection->conn_str()),
ColumnsDescription{*columns}, ConstraintsDescription{}, context);
if (cache_tables)
cached_tables[table_name] = storage;
return storage;
}
if (table_checked || checkPostgresTable(table_name))
{
return cached_tables[table_name];
}
/// Table does not exist anymore
cached_tables.erase(table_name);
return StoragePtr{};
}
void DatabasePostgreSQL::attachTable(const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard<std::mutex> lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot attach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
if (!detached_or_dropped.count(table_name))
throw Exception(fmt::format("Cannot attach table {}.{}. It already exists", database_name, table_name), ErrorCodes::TABLE_ALREADY_EXISTS);
if (cache_tables)
cached_tables[table_name] = storage;
detached_or_dropped.erase(table_name);
Poco::File table_marked_as_removed(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
if (table_marked_as_removed.exists())
table_marked_as_removed.remove();
}
StoragePtr DatabasePostgreSQL::detachTable(const String & table_name)
{
std::lock_guard<std::mutex> lock{mutex};
if (detached_or_dropped.count(table_name))
throw Exception(fmt::format("Cannot detach table {}.{}. It is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot detach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
if (cache_tables)
cached_tables.erase(table_name);
detached_or_dropped.emplace(table_name);
/// not used anywhere (for postgres database)
return StoragePtr{};
}
void DatabasePostgreSQL::createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
{
const auto & create = create_query->as<ASTCreateQuery>();
if (!create->attach)
throw Exception("PostgreSQL database engine does not support create table", ErrorCodes::NOT_IMPLEMENTED);
attachTable(table_name, storage, {});
}
void DatabasePostgreSQL::dropTable(const Context &, const String & table_name, bool /* no_delay */)
{
std::lock_guard<std::mutex> lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot drop table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
if (detached_or_dropped.count(table_name))
throw Exception(fmt::format("Table {}.{} is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
Poco::File mark_table_removed(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
try
{
mark_table_removed.createFile();
}
catch (...)
{
throw;
}
if (cache_tables)
cached_tables.erase(table_name);
detached_or_dropped.emplace(table_name);
}
void DatabasePostgreSQL::drop(const Context & /*context*/)
{
Poco::File(getMetadataPath()).remove(true);
}
void DatabasePostgreSQL::loadStoredObjects(Context & /* context */, bool, bool /*force_attach*/)
{
{
std::lock_guard<std::mutex> lock{mutex};
Poco::DirectoryIterator iterator(getMetadataPath());
/// Check for previously dropped tables
for (Poco::DirectoryIterator end; iterator != end; ++iterator)
{
if (iterator->isFile() && endsWith(iterator.name(), suffix))
{
const auto & file_name = iterator.name();
const auto & table_name = unescapeForFileName(file_name.substr(0, file_name.size() - strlen(suffix)));
detached_or_dropped.emplace(table_name);
}
}
}
cleaner_task->activateAndSchedule();
}
void DatabasePostgreSQL::removeOutdatedTables()
{
std::lock_guard<std::mutex> lock{mutex};
auto actual_tables = fetchTablesList();
if (cache_tables)
{
/// (Tables are cached only after being accessed at least once)
for (auto iter = cached_tables.begin(); iter != cached_tables.end();)
{
if (!actual_tables.count(iter->first))
iter = cached_tables.erase(iter);
else
++iter;
}
}
for (auto iter = detached_or_dropped.begin(); iter != detached_or_dropped.end();)
{
if (!actual_tables.count(*iter))
{
auto table_name = *iter;
iter = detached_or_dropped.erase(iter);
Poco::File table_marked_as_removed(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
if (table_marked_as_removed.exists())
table_marked_as_removed.remove();
}
else
++iter;
}
cleaner_task->scheduleAfter(cleaner_reschedule_ms);
}
void DatabasePostgreSQL::shutdown()
{
cleaner_task->deactivate();
}
ASTPtr DatabasePostgreSQL::getCreateDatabaseQuery() const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = getDatabaseName();
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, context, false);
if (!storage)
{
if (throw_on_error)
throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
return nullptr;
}
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = database_engine_define->clone();
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list);
/// init create query.
auto table_id = storage->getStorageID();
create_table_query->table = table_id.table_name;
create_table_query->database = table_id.database_name;
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
for (const auto & column_type_and_name : metadata_snapshot->getColumns().getOrdinary())
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
column_declaration->type = getColumnDeclaration(column_type_and_name.type);
columns_expression_list->children.emplace_back(column_declaration);
}
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
ASTs storage_children = ast_storage->children;
auto storage_engine_arguments = ast_storage->engine->arguments;
/// Remove extra engine argument (`use_table_cache`)
if (storage_engine_arguments->children.size() > 4)
storage_engine_arguments->children.resize(storage_engine_arguments->children.size() - 1);
/// Add table_name to engine arguments
assert(storage_engine_arguments->children.size() >= 2);
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, std::make_shared<ASTLiteral>(table_id.table_name));
return create_table_query;
}
ASTPtr DatabasePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) const
{
WhichDataType which(data_type);
if (which.isNullable())
return makeASTFunction("Nullable", getColumnDeclaration(typeid_cast<const DataTypeNullable *>(data_type.get())->getNestedType()));
if (which.isArray())
return makeASTFunction("Array", getColumnDeclaration(typeid_cast<const DataTypeArray *>(data_type.get())->getNestedType()));
return std::make_shared<ASTIdentifier>(data_type->getName());
}
}
#endif

View File

@ -0,0 +1,91 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Databases/DatabasesCommon.h>
#include <Core/BackgroundSchedulePool.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB
{
class Context;
class PostgreSQLConnection;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
/** Real-time access to table list and table structure from remote PostgreSQL.
* All tables are created after pull-out structure from remote PostgreSQL.
* If `cache_tables` == 1 (default: 0) table structure is cached and not checked for being modififed,
* but it will be updated during detach->attach.
*/
class DatabasePostgreSQL final : public IDatabase
{
public:
DatabasePostgreSQL(
const Context & context,
const String & metadata_path_,
const ASTStorage * database_engine_define,
const String & dbname_,
const String & postgres_dbname,
PostgreSQLConnectionPtr connection_,
const bool cache_tables_);
String getEngineName() const override { return "PostgreSQL"; }
String getMetadataPath() const override { return metadata_path; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
ASTPtr getCreateDatabaseQuery() const override;
bool empty() const override;
void loadStoredObjects(Context &, bool, bool force_attach) override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;
bool isTableExist(const String & name, const Context & context) const override;
StoragePtr tryGetTable(const String & name, const Context & context) const override;
void createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(const Context &, const String & table_name, bool no_delay) override;
void attachTable(const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
void drop(const Context & /*context*/) override;
void shutdown() override;
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const override;
private:
const Context & global_context;
String metadata_path;
ASTPtr database_engine_define;
String dbname;
PostgreSQLConnectionPtr connection;
const bool cache_tables;
mutable Tables cached_tables;
std::unordered_set<std::string> detached_or_dropped;
BackgroundSchedulePool::TaskHolder cleaner_task;
bool checkPostgresTable(const String & table_name) const;
std::unordered_set<std::string> fetchTablesList() const;
StoragePtr fetchTable(const String & table_name, const Context & context, const bool table_checked) const;
void removeOutdatedTables();
ASTPtr getColumnDeclaration(const DataTypePtr & data_type) const;
};
}
#endif

View File

@ -0,0 +1,139 @@
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#if USE_LIBPQXX
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <pqxx/pqxx>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullable, uint16_t dimensions)
{
DataTypePtr res;
/// Get rid of trailing '[]' for arrays
if (dimensions)
while (type.ends_with("[]"))
type.resize(type.size() - 2);
if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
else if (type == "integer")
res = std::make_shared<DataTypeInt32>();
else if (type == "bigint")
res = std::make_shared<DataTypeInt64>();
else if (type == "real")
res = std::make_shared<DataTypeFloat32>();
else if (type == "double precision")
res = std::make_shared<DataTypeFloat64>();
else if (type == "serial")
res = std::make_shared<DataTypeUInt32>();
else if (type == "bigserial")
res = std::make_shared<DataTypeUInt64>();
else if (type.starts_with("timestamp"))
res = std::make_shared<DataTypeDateTime>();
else if (type == "date")
res = std::make_shared<DataTypeDate>();
else if (type.starts_with("numeric"))
{
/// Numeric and decimal will both end up here as numeric.
res = DataTypeFactory::instance().get(type);
uint32_t precision = getDecimalPrecision(*res);
uint32_t scale = getDecimalScale(*res);
if (precision <= DecimalUtils::maxPrecision<Decimal32>())
res = std::make_shared<DataTypeDecimal<Decimal32>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal64>())
res = std::make_shared<DataTypeDecimal<Decimal64>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal128>())
res = std::make_shared<DataTypeDecimal<Decimal128>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal256>())
res = std::make_shared<DataTypeDecimal<Decimal256>>(precision, scale);
}
if (!res)
res = std::make_shared<DataTypeString>();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
while (dimensions--)
res = std::make_shared<DataTypeArray>(res);
return res;
}
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::connection> connection, const String & postgres_table_name, bool use_nulls)
{
auto columns = NamesAndTypesList();
if (postgres_table_name.find('\'') != std::string::npos
|| postgres_table_name.find('\\') != std::string::npos)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "PostgreSQL table name cannot contain single quote or backslash characters, passed {}",
postgres_table_name);
}
std::string query = fmt::format(
"SELECT attname AS name, format_type(atttypid, atttypmod) AS type, "
"attnotnull AS not_null, attndims AS dims "
"FROM pg_attribute "
"WHERE attrelid = '{}'::regclass "
"AND NOT attisdropped AND attnum > 0", postgres_table_name);
try
{
pqxx::read_transaction tx(*connection);
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
std::tuple<std::string, std::string, std::string, uint16_t> row;
while (stream >> row)
{
columns.push_back(NameAndTypePair(
std::get<0>(row),
convertPostgreSQLDataType(
std::get<1>(row),
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
std::get<3>(row))));
}
stream.complete();
tx.commit();
}
catch (const pqxx::undefined_table &)
{
throw Exception(fmt::format(
"PostgreSQL table {}.{} does not exist",
connection->dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE);
}
catch (Exception & e)
{
e.addMessage("while fetching postgresql table structure");
throw;
}
if (columns.empty())
return nullptr;
return std::make_shared<NamesAndTypesList>(columns);
}
}
#endif

View File

@ -0,0 +1,19 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Storages/StoragePostgreSQL.h>
namespace DB
{
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::connection> connection, const String & postgres_table_name, bool use_nulls);
}
#endif

View File

@ -8,7 +8,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F 'PostgreSQL' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -0,0 +1,196 @@
#include "PostgreSQLDictionarySource.h"
#include <Poco/Util/AbstractConfiguration.h>
#include "DictionarySourceFactory.h"
#include "registerDictionaries.h"
#if USE_LIBPQXX
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include "readInvalidateQuery.h"
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
#if USE_LIBPQXX
static const UInt64 max_block_size = 8192;
PostgreSQLDictionarySource::PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
PostgreSQLConnectionPtr connection_,
const Block & sample_block_)
: dict_struct{dict_struct_}
, sample_block(sample_block_)
, connection(std::move(connection_))
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(config_.getString(fmt::format("{}.db", config_prefix), ""))
, table(config_.getString(fmt::format("{}.table", config_prefix), ""))
, where(config_.getString(fmt::format("{}.where", config_prefix), ""))
, query_builder(dict_struct, "", "", table, where, IdentifierQuotingStyle::DoubleQuotes)
, load_all_query(query_builder.composeLoadAllQuery())
, invalidate_query(config_.getString(fmt::format("{}.invalidate_query", config_prefix), ""))
, update_field(config_.getString(fmt::format("{}.update_field", config_prefix), ""))
{
}
/// copy-constructor is provided in order to support cloneability
PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other)
: dict_struct(other.dict_struct)
, sample_block(other.sample_block)
, connection(std::make_shared<PostgreSQLConnection>(other.connection->conn_str()))
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(other.db)
, table(other.table)
, where(other.where)
, query_builder(dict_struct, "", "", table, where, IdentifierQuotingStyle::DoubleQuotes)
, load_all_query(query_builder.composeLoadAllQuery())
, invalidate_query(other.invalidate_query)
, update_time(other.update_time)
, update_field(other.update_field)
, invalidate_query_response(other.invalidate_query_response)
{
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return std::make_shared<PostgreSQLBlockInputStream>(
connection->conn(), load_all_query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll()
{
auto load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), load_update_query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size);
}
bool PostgreSQLDictionarySource::isModified() const
{
if (!invalidate_query.empty())
{
auto response = doInvalidateQuery(invalidate_query);
if (response == invalidate_query_response)
return false;
invalidate_query_response = response;
}
return true;
}
std::string PostgreSQLDictionarySource::doInvalidateQuery(const std::string & request) const
{
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
PostgreSQLBlockInputStream block_input_stream(connection->conn(), request, invalidate_sample_block, 1);
return readInvalidateQuery(block_input_stream);
}
bool PostgreSQLDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
std::string PostgreSQLDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
std::string str_time = std::to_string(LocalDateTime(hr_time));
return query_builder.composeUpdateQuery(update_field, str_time);
}
else
{
update_time = std::chrono::system_clock::now();
return query_builder.composeLoadAllQuery();
}
}
bool PostgreSQLDictionarySource::supportsSelectiveLoad() const
{
return true;
}
DictionarySourcePtr PostgreSQLDictionarySource::clone() const
{
return std::make_unique<PostgreSQLDictionarySource>(*this);
}
std::string PostgreSQLDictionarySource::toString() const
{
return "PostgreSQL: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
}
#endif
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & root_config_prefix,
Block & sample_block,
const Context & /* context */,
const std::string & /* default_database */,
bool /* check_config */) -> DictionarySourcePtr
{
#if USE_LIBPQXX
const auto config_prefix = root_config_prefix + ".postgresql";
auto connection = std::make_shared<PostgreSQLConnection>(
config.getString(fmt::format("{}.db", config_prefix), ""),
config.getString(fmt::format("{}.host", config_prefix), ""),
config.getUInt(fmt::format("{}.port", config_prefix), 0),
config.getString(fmt::format("{}.user", config_prefix), ""),
config.getString(fmt::format("{}.password", config_prefix), ""));
return std::make_unique<PostgreSQLDictionarySource>(
dict_struct, config, config_prefix, connection, sample_block);
#else
(void)dict_struct;
(void)config;
(void)root_config_prefix;
(void)sample_block;
throw Exception{"Dictionary source of type `postgresql` is disabled because ClickHouse was built without postgresql support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
};
factory.registerSource("postgresql", create_table_source);
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#include "DictionaryStructure.h"
#include "IDictionarySource.h"
#if USE_LIBPQXX
#include "ExternalQueryBuilder.h"
#include <Core/Block.h>
#include <common/LocalDateTime.h>
#include <common/logger_useful.h>
#include <Storages/StoragePostgreSQL.h>
#include <pqxx/pqxx>
namespace DB
{
/// Allows loading dictionaries from a PostgreSQL database
class PostgreSQLDictionarySource final : public IDictionarySource
{
public:
PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
PostgreSQLConnectionPtr connection_,
const Block & sample_block_);
/// copy-constructor is provided in order to support cloneability
PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other);
PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
std::string doInvalidateQuery(const std::string & request) const;
const DictionaryStructure dict_struct;
Block sample_block;
PostgreSQLConnectionPtr connection;
Poco::Logger * log;
const std::string db;
const std::string table;
const std::string where;
ExternalQueryBuilder query_builder;
const std::string load_all_query;
std::string invalidate_query;
std::chrono::time_point<std::chrono::system_clock> update_time;
const std::string update_field;
mutable std::string invalidate_query_response;
};
}
#endif

View File

@ -1644,6 +1644,8 @@ void SSDComplexKeyCacheDictionary::has(
const DataTypes & key_types,
PaddedPODArray<UInt8> & out) const
{
dict_struct.validateKeyTypes(key_types);
const auto now = std::chrono::system_clock::now();
std::unordered_map<KeyRef, std::vector<size_t>> not_found_keys;

View File

@ -4,6 +4,40 @@
namespace DB
{
class DictionarySourceFactory;
void registerDictionarySourceFile(DictionarySourceFactory & source_factory);
void registerDictionarySourceMysql(DictionarySourceFactory & source_factory);
void registerDictionarySourceClickHouse(DictionarySourceFactory & source_factory);
void registerDictionarySourceMongoDB(DictionarySourceFactory & source_factory);
void registerDictionarySourceCassandra(DictionarySourceFactory & source_factory);
void registerDictionarySourceRedis(DictionarySourceFactory & source_factory);
void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);
#if !defined(ARCADIA_BUILD)
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & source_factory);
#endif
void registerDictionarySourceExecutable(DictionarySourceFactory & source_factory);
void registerDictionarySourceHTTP(DictionarySourceFactory & source_factory);
void registerDictionarySourceLibrary(DictionarySourceFactory & source_factory);
class DictionaryFactory;
void registerDictionaryRangeHashed(DictionaryFactory & factory);
void registerDictionaryComplexKeyHashed(DictionaryFactory & factory);
void registerDictionaryComplexKeyCache(DictionaryFactory & factory);
void registerDictionaryComplexKeyDirect(DictionaryFactory & factory);
void registerDictionaryTrie(DictionaryFactory & factory);
void registerDictionaryFlat(DictionaryFactory & factory);
void registerDictionaryHashed(DictionaryFactory & factory);
void registerDictionaryCache(DictionaryFactory & factory);
#if defined(__linux__) || defined(__FreeBSD__)
void registerDictionarySSDCache(DictionaryFactory & factory);
void registerDictionarySSDComplexKeyCache(DictionaryFactory & factory);
#endif
void registerDictionaryPolygon(DictionaryFactory & factory);
void registerDictionaryDirect(DictionaryFactory & factory);
void registerDictionaries()
{
{
@ -16,6 +50,9 @@ void registerDictionaries()
registerDictionarySourceCassandra(source_factory);
registerDictionarySourceXDBC(source_factory);
registerDictionarySourceJDBC(source_factory);
#if !defined(ARCADIA_BUILD)
registerDictionarySourcePostgreSQL(source_factory);
#endif
registerDictionarySourceExecutable(source_factory);
registerDictionarySourceHTTP(source_factory);
registerDictionarySourceLibrary(source_factory);

View File

@ -2,36 +2,5 @@
namespace DB
{
class DictionarySourceFactory;
void registerDictionarySourceFile(DictionarySourceFactory & source_factory);
void registerDictionarySourceMysql(DictionarySourceFactory & source_factory);
void registerDictionarySourceClickHouse(DictionarySourceFactory & source_factory);
void registerDictionarySourceMongoDB(DictionarySourceFactory & source_factory);
void registerDictionarySourceCassandra(DictionarySourceFactory & source_factory);
void registerDictionarySourceRedis(DictionarySourceFactory & source_factory);
void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceExecutable(DictionarySourceFactory & source_factory);
void registerDictionarySourceHTTP(DictionarySourceFactory & source_factory);
void registerDictionarySourceLibrary(DictionarySourceFactory & source_factory);
class DictionaryFactory;
void registerDictionaryRangeHashed(DictionaryFactory & factory);
void registerDictionaryComplexKeyHashed(DictionaryFactory & factory);
void registerDictionaryComplexKeyCache(DictionaryFactory & factory);
void registerDictionaryComplexKeyDirect(DictionaryFactory & factory);
void registerDictionaryTrie(DictionaryFactory & factory);
void registerDictionaryFlat(DictionaryFactory & factory);
void registerDictionaryHashed(DictionaryFactory & factory);
void registerDictionaryCache(DictionaryFactory & factory);
#if defined(__linux__) || defined(__FreeBSD__)
void registerDictionarySSDCache(DictionaryFactory & factory);
void registerDictionarySSDComplexKeyCache(DictionaryFactory & factory);
#endif
void registerDictionaryPolygon(DictionaryFactory & factory);
void registerDictionaryDirect(DictionaryFactory & factory);
void registerDictionaries();
}

View File

@ -15,7 +15,7 @@ NO_COMPILER_WARNINGS()
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F Trie | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -P 'tests|PostgreSQL' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -23,6 +23,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
}
MySQLBlockInputStream::Connection::Connection(
@ -114,6 +115,8 @@ namespace
case ValueType::vtFixedString:
assert_cast<ColumnFixedString &>(column).insertData(value.data(), value.size());
break;
default:
throw Exception("Unsupported value type", ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -158,8 +158,8 @@ SRCS(
interpretSubquery.cpp
join_common.cpp
loadMetadata.cpp
replaceAliasColumnsInQuery.cpp
processColumnTransformers.cpp
replaceAliasColumnsInQuery.cpp
sortBlock.cpp
)

View File

@ -0,0 +1,40 @@
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace DB
{
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::conn()
{
checkUpdateConnection();
return connection;
}
void PostgreSQLConnection::checkUpdateConnection()
{
if (!connection || !connection->is_open())
connection = std::make_unique<pqxx::connection>(connection_str);
}
std::string PostgreSQLConnection::formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
{
WriteBufferFromOwnString out;
out << "dbname=" << quote << dbname
<< " host=" << quote << host
<< " port=" << port
<< " user=" << quote << user
<< " password=" << quote << password;
return out.str();
}
}
#endif

View File

@ -0,0 +1,48 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
namespace DB
{
/// Tiny connection class to make it more convenient to use.
/// Connection is not made until actually used.
class PostgreSQLConnection
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
public:
PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
: connection_str(formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password))) {}
PostgreSQLConnection(const std::string & connection_str_) : connection_str(connection_str_) {}
PostgreSQLConnection(const PostgreSQLConnection &) = delete;
PostgreSQLConnection operator =(const PostgreSQLConnection &) = delete;
ConnectionPtr conn();
std::string & conn_str() { return connection_str; }
private:
ConnectionPtr connection;
std::string connection_str;
static std::string formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
void checkUpdateConnection();
};
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
}
#endif

View File

@ -0,0 +1,324 @@
#include "StoragePostgreSQL.h"
#if USE_LIBPQXX
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Core/Settings.h>
#include <Common/parseAddress.h>
#include <Common/assert_cast.h>
#include <Parsers/ASTLiteral.h>
#include <Columns/ColumnNullable.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
}
StoragePostgreSQL::StoragePostgreSQL(
const StorageID & table_id_,
const String & remote_table_name_,
PostgreSQLConnectionPtr connection_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
: IStorage(table_id_)
, remote_table_name(remote_table_name_)
, global_context(context_)
, connection(std::move(connection_))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
}
Pipe StoragePostgreSQL::read(
const Names & column_names_,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info_,
const Context & context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size_,
unsigned)
{
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
String query = transformQueryForExternalDatabase(
query_info_, metadata_snapshot->getColumns().getOrdinary(),
IdentifierQuotingStyle::DoubleQuotes, "", remote_table_name, context_);
Block sample_block;
for (const String & column_name : column_names_)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
WhichDataType which(column_data.type);
if (which.isEnum())
column_data.type = std::make_shared<DataTypeString>();
sample_block.insert({ column_data.type, column_data.name });
}
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size_)));
}
class PostgreSQLBlockOutputStream : public IBlockOutputStream
{
public:
explicit PostgreSQLBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_,
ConnectionPtr connection_,
const std::string & remote_table_name_)
: metadata_snapshot(metadata_snapshot_)
, connection(connection_)
, remote_table_name(remote_table_name_)
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void writePrefix() override
{
work = std::make_unique<pqxx::work>(*connection);
}
void write(const Block & block) override
{
if (!work)
return;
const auto columns = block.getColumns();
const size_t num_rows = block.rows(), num_cols = block.columns();
const auto data_types = block.getDataTypes();
if (!stream_inserter)
stream_inserter = std::make_unique<pqxx::stream_to>(*work, remote_table_name, block.getNames());
/// std::optional lets libpqxx to know if value is NULL
std::vector<std::optional<std::string>> row(num_cols);
for (const auto i : ext::range(0, num_rows))
{
for (const auto j : ext::range(0, num_cols))
{
if (columns[j]->isNullAt(i))
{
row[j] = std::nullopt;
}
else
{
WriteBufferFromOwnString ostr;
if (isArray(data_types[j]))
{
parseArray((*columns[j])[i], data_types[j], ostr);
}
else
{
data_types[j]->serializeAsText(*columns[j], i, ostr, FormatSettings{});
}
row[j] = ostr.str();
}
}
stream_inserter->write_values(row);
}
}
void writeSuffix() override
{
if (stream_inserter)
{
stream_inserter->complete();
work->commit();
}
}
/// Cannot just use serializeAsText for array data type even though it converts perfectly
/// any dimension number array into text format, because it incloses in '[]' and for postgres it must be '{}'.
/// Check if array[...] syntax from PostgreSQL will be applicable.
void parseArray(const Field & array_field, const DataTypePtr & data_type, WriteBuffer & ostr)
{
const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());
const auto & nested = array_type->getNestedType();
const auto & array = array_field.get<Array>();
if (!isArray(nested))
{
writeText(clickhouseToPostgresArray(array, data_type), ostr);
return;
}
writeChar('{', ostr);
const auto * nested_array_type = typeid_cast<const DataTypeArray *>(nested.get());
for (auto iter = array.begin(); iter != array.end(); ++iter)
{
if (iter != array.begin())
writeText(", ", ostr);
if (!isArray(nested_array_type->getNestedType()))
{
writeText(clickhouseToPostgresArray(iter->get<Array>(), nested), ostr);
}
else
{
parseArray(*iter, nested, ostr);
}
}
writeChar('}', ostr);
}
/// Conversion is done via column casting because with writeText(Array..) got incorrect conversion
/// of Date and DateTime data types and it added extra quotes for values inside array.
static std::string clickhouseToPostgresArray(const Array & array_field, const DataTypePtr & data_type)
{
auto nested = typeid_cast<const DataTypeArray *>(data_type.get())->getNestedType();
auto array_column = ColumnArray::create(createNested(nested));
array_column->insert(array_field);
WriteBufferFromOwnString ostr;
data_type->serializeAsText(*array_column, 0, ostr, FormatSettings{});
/// ostr is guaranteed to be at least '[]', i.e. size is at least 2 and 2 only if ostr.str() == '[]'
assert(ostr.str().size() >= 2);
return '{' + std::string(ostr.str().begin() + 1, ostr.str().end() - 1) + '}';
}
static MutableColumnPtr createNested(DataTypePtr nested)
{
bool is_nullable = false;
if (nested->isNullable())
{
is_nullable = true;
nested = static_cast<const DataTypeNullable *>(nested.get())->getNestedType();
}
WhichDataType which(nested);
MutableColumnPtr nested_column;
if (which.isString() || which.isFixedString()) nested_column = ColumnString::create();
else if (which.isInt8() || which.isInt16()) nested_column = ColumnInt16::create();
else if (which.isUInt8() || which.isUInt16()) nested_column = ColumnUInt16::create();
else if (which.isInt32()) nested_column = ColumnInt32::create();
else if (which.isUInt32()) nested_column = ColumnUInt32::create();
else if (which.isInt64()) nested_column = ColumnInt64::create();
else if (which.isUInt64()) nested_column = ColumnUInt64::create();
else if (which.isFloat32()) nested_column = ColumnFloat32::create();
else if (which.isFloat64()) nested_column = ColumnFloat64::create();
else if (which.isDate()) nested_column = ColumnUInt16::create();
else if (which.isDateTime()) nested_column = ColumnUInt32::create();
else if (which.isDecimal32())
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal32> *>(nested.get());
nested_column = ColumnDecimal<Decimal32>::create(0, type->getScale());
}
else if (which.isDecimal64())
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal64> *>(nested.get());
nested_column = ColumnDecimal<Decimal64>::create(0, type->getScale());
}
else if (which.isDecimal128())
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());
nested_column = ColumnDecimal<Decimal128>::create(0, type->getScale());
}
else if (which.isDecimal256())
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal256> *>(nested.get());
nested_column = ColumnDecimal<Decimal256>::create(0, type->getScale());
}
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Type conversion not supported");
if (is_nullable)
return ColumnNullable::create(std::move(nested_column), ColumnUInt8::create(nested_column->size(), 0));
return nested_column;
}
private:
StorageMetadataPtr metadata_snapshot;
ConnectionPtr connection;
std::string remote_table_name;
std::unique_ptr<pqxx::work> work;
std::unique_ptr<pqxx::stream_to> stream_inserter;
};
BlockOutputStreamPtr StoragePostgreSQL::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /* context */)
{
return std::make_shared<PostgreSQLBlockOutputStream>(metadata_snapshot, connection->conn(), remote_table_name);
}
void registerStoragePostgreSQL(StorageFactory & factory)
{
factory.registerStorage("PostgreSQL", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 5)
throw Exception("Storage PostgreSQL requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'username', 'password'.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context);
auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 5432);
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
auto connection = std::make_shared<PostgreSQLConnection>(
engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(),
parsed_host_port.first,
parsed_host_port.second,
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
return StoragePostgreSQL::create(
args.table_id, remote_table, connection, args.columns, args.constraints, args.context);
},
{
.source_access_type = AccessType::POSTGRES,
});
}
}
#endif

View File

@ -0,0 +1,56 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <ext/shared_ptr_helper.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <pqxx/pqxx>
namespace DB
{
class PostgreSQLConnection;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>, public IStorage
{
friend struct ext::shared_ptr_helper<StoragePostgreSQL>;
public:
StoragePostgreSQL(
const StorageID & table_id_,
const std::string & remote_table_name_,
PostgreSQLConnectionPtr connection_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
String getName() const override { return "PostgreSQL"; }
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
private:
friend class PostgreSQLBlockOutputStream;
String remote_table_name;
Context global_context;
PostgreSQLConnectionPtr connection;
};
}
#endif

View File

@ -58,6 +58,9 @@ void registerStorageRabbitMQ(StorageFactory & factory);
void registerStorageEmbeddedRocksDB(StorageFactory & factory);
#endif
#if USE_LIBPQXX
void registerStoragePostgreSQL(StorageFactory & factory);
#endif
void registerStorages()
{
@ -111,6 +114,10 @@ void registerStorages()
#if USE_ROCKSDB
registerStorageEmbeddedRocksDB(factory);
#endif
#if USE_LIBPQXX
registerStoragePostgreSQL(factory);
#endif
}
}

View File

@ -10,7 +10,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -P 'Kafka|RabbitMQ|S3|HDFS|Licenses|TimeZones|RocksDB' | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -P 'Kafka|RabbitMQ|S3|HDFS|Licenses|TimeZones|RocksDB|PostgreSQL' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -0,0 +1,84 @@
#include <TableFunctions/TableFunctionPostgreSQL.h>
#if USE_LIBPQXX
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/Exception.h>
#include <Common/parseAddress.h>
#include "registerTableFunctions.h"
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto result = std::make_shared<StoragePostgreSQL>(
StorageID(getDatabaseName(), table_name), remote_table_name,
connection, columns, ConstraintsDescription{}, context);
result->startup();
return result;
}
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(const Context & context) const
{
const bool use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = fetchPostgreSQLTableStructure(connection->conn(), remote_table_name, use_nulls);
return ColumnsDescription{*columns};
}
void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const Context & context)
{
const auto & func_args = ast_function->as<ASTFunction &>();
if (!func_args.arguments)
throw Exception("Table function 'PostgreSQL' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
ASTs & args = func_args.arguments->children;
if (args.size() != 5)
throw Exception("Table function 'PostgreSQL' requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'user', 'password').",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
auto parsed_host_port = parseAddress(args[0]->as<ASTLiteral &>().value.safeGet<String>(), 5432);
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
connection = std::make_shared<PostgreSQLConnection>(
args[1]->as<ASTLiteral &>().value.safeGet<String>(),
parsed_host_port.first,
parsed_host_port.second,
args[3]->as<ASTLiteral &>().value.safeGet<String>(),
args[4]->as<ASTLiteral &>().value.safeGet<String>());
}
void registerTableFunctionPostgreSQL(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionPostgreSQL>();
}
}
#endif

View File

@ -0,0 +1,39 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <TableFunctions/ITableFunction.h>
namespace DB
{
class PostgreSQLConnection;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
class TableFunctionPostgreSQL : public ITableFunction
{
public:
static constexpr auto name = "postgresql";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(
const ASTPtr & ast_function, const Context & context,
const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "PostgreSQL"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String connection_str;
String remote_table_name;
PostgreSQLConnectionPtr connection;
};
}
#endif

View File

@ -36,6 +36,10 @@ void registerTableFunctions()
#if USE_MYSQL
registerTableFunctionMySQL(factory);
#endif
#if USE_LIBPQXX
registerTableFunctionPostgreSQL(factory);
#endif
}
}

View File

@ -37,6 +37,9 @@ void registerTableFunctionView(TableFunctionFactory & factory);
void registerTableFunctionMySQL(TableFunctionFactory & factory);
#endif
#if USE_LIBPQXX
void registerTableFunctionPostgreSQL(TableFunctionFactory & factory);
#endif
void registerTableFunctions();

View File

@ -8,7 +8,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -P 'S3|HDFS|PostgreSQL' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -0,0 +1,4 @@
<?xml version="1.0"?>
<yandex>
<dictionaries_config>/etc/clickhouse-server/config.d/postgres_dict.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="utf-8"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,37 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>dict0</name>
<source>
<postgresql>
<db>clickhouse</db>
<host>postgres1</host>
<port>5432</port>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test0</table>
<invalidate_query>SELECT value FROM test0 WHERE id = 0</invalidate_query>
</postgresql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
</id>
<attribute>
<name>id</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>1</lifetime>
</dictionary>
</yandex>

View File

@ -0,0 +1,119 @@
import pytest
import time
import psycopg2
from helpers.cluster import ClickHouseCluster
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/config.xml', 'configs/postgres_dict.xml', 'configs/log_conf.xml'], with_postgres=True)
postgres_dict_table_template = """
CREATE TABLE IF NOT EXISTS {} (
id Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))
"""
click_dict_table_template = """
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (
`id` UInt64, `value` UInt32
) ENGINE = Dictionary({})
"""
def get_postgres_conn(database=False):
if database == True:
conn_string = "host='localhost' dbname='clickhouse' user='postgres' password='mysecretpassword'"
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE DATABASE {}".format(name))
def create_postgres_table(conn, table_name):
cursor = conn.cursor()
cursor.execute(postgres_dict_table_template.format(table_name))
def create_and_fill_postgres_table(table_name):
conn = get_postgres_conn(True)
create_postgres_table(conn, table_name)
# Fill postgres table using clickhouse postgres table function and check
table_func = '''postgresql('postgres1:5432', 'clickhouse', '{}', 'postgres', 'mysecretpassword')'''.format(table_name)
node1.query('''INSERT INTO TABLE FUNCTION {} SELECT number, number from numbers(10000)
'''.format(table_func, table_name))
result = node1.query("SELECT count() FROM {}".format(table_func))
assert result.rstrip() == '10000'
def create_dict(table_name, index=0):
node1.query(click_dict_table_template.format(table_name, 'dict' + str(index)))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
postgres_conn = get_postgres_conn()
node1.query("CREATE DATABASE IF NOT EXISTS test")
print("postgres connected")
create_postgres_db(postgres_conn, 'clickhouse')
yield cluster
finally:
cluster.shutdown()
def test_load_dictionaries(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
table_name = 'test0'
create_and_fill_postgres_table(table_name)
create_dict(table_name)
dict_name = 'dict0'
node1.query("SYSTEM RELOAD DICTIONARIES")
assert node1.query("SELECT count() FROM `test`.`dict_table_{}`".format(table_name)).rstrip() == '10000'
assert node1.query("SELECT dictGetUInt32('{}', 'id', toUInt64(0))".format(dict_name)) == '0\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name)) == '9999\n'
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
def test_invalidate_query(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
table_name = 'test0'
create_and_fill_postgres_table(table_name)
# invalidate query: SELECT value FROM test0 WHERE id = 0
dict_name = 'dict0'
create_dict(table_name)
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == "0\n"
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == "1\n"
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
while True:
result = node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name))
if result != '0\n':
break
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '1\n'
# no update should happen
cursor.execute("UPDATE {} SET value=value*2 WHERE id != 0".format(table_name))
time.sleep(5)
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '1\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '1\n'
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
time.sleep(5)
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '2\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '2\n'
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()

View File

@ -0,0 +1,189 @@
import pytest
import time
import psycopg2
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=[], with_postgres=True)
postgres_table_template = """
CREATE TABLE IF NOT EXISTS {} (
id Integer NOT NULL, value Integer, PRIMARY KEY (id))
"""
def get_postgres_conn(database=False):
if database == True:
conn_string = "host='localhost' dbname='test_database' user='postgres' password='mysecretpassword'"
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(cursor, name):
cursor.execute("CREATE DATABASE {}".format(name))
def create_postgres_table(cursor, table_name):
# database was specified in connection string
cursor.execute(postgres_table_template.format(table_name))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn()
cursor = conn.cursor()
create_postgres_db(cursor, 'test_database')
yield cluster
finally:
cluster.shutdown()
def test_postgres_database_engine_with_postgres_ddl(started_cluster):
# connect to database as well
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')")
assert 'test_database' in node1.query('SHOW DATABASES')
create_postgres_table(cursor, 'test_table')
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
cursor.execute('ALTER TABLE test_table ADD COLUMN data Text')
assert 'data' in node1.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
cursor.execute('ALTER TABLE test_table DROP COLUMN data')
assert 'data' not in node1.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
def test_postgresql_database_engine_with_clickhouse_ddl(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')")
create_postgres_table(cursor, 'test_table')
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
node1.query("DROP TABLE test_database.test_table")
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
node1.query("DETACH TABLE test_database.test_table")
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
def test_postgresql_database_engine_queries(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')")
create_postgres_table(cursor, 'test_table')
assert node1.query("SELECT count() FROM test_database.test_table").rstrip() == '0'
node1.query("INSERT INTO test_database.test_table SELECT number, number from numbers(10000)")
assert node1.query("SELECT count() FROM test_database.test_table").rstrip() == '10000'
cursor.execute('DROP TABLE test_table;')
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
def test_get_create_table_query_with_multidim_arrays(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')")
cursor.execute("""
CREATE TABLE IF NOT EXISTS array_columns (
b Integer[][][] NOT NULL,
c Integer[][][]
)""")
node1.query("DETACH TABLE test_database.array_columns")
node1.query("ATTACH TABLE test_database.array_columns")
node1.query("INSERT INTO test_database.array_columns "
"VALUES ("
"[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], "
"[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]] "
")")
result = node1.query('''
SELECT * FROM test_database.array_columns''')
expected = (
"[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t"
"[[[1,NULL],[NULL,1]],[[NULL,NULL],[NULL,NULL]],[[4,4],[5,5]]]\n"
)
assert(result == expected)
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
def test_postgresql_database_engine_table_cache(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 1)")
create_postgres_table(cursor, 'test_table')
assert node1.query('DESCRIBE TABLE test_database.test_table').rstrip() == 'id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)'
cursor.execute('ALTER TABLE test_table ADD COLUMN data Text')
assert node1.query('DESCRIBE TABLE test_database.test_table').rstrip() == 'id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)'
node1.query("DETACH TABLE test_database.test_table")
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
assert node1.query('DESCRIBE TABLE test_database.test_table').rstrip() == 'id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)\t\t\t\t\t\ndata\tNullable(String)'
node1.query("DROP TABLE test_database.test_table")
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
node1.query("INSERT INTO test_database.test_table SELECT number, number, toString(number) from numbers(10000)")
assert node1.query("SELECT count() FROM test_database.test_table").rstrip() == '10000'
cursor.execute('DROP TABLE test_table;')
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()

View File

@ -0,0 +1,138 @@
import time
import pytest
import psycopg2
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=[], with_postgres=True)
def get_postgres_conn(database=False):
if database == True:
conn_string = "host='localhost' dbname='clickhouse' user='postgres' password='mysecretpassword'"
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE DATABASE {}".format(name))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
postgres_conn = get_postgres_conn()
print("postgres connected")
create_postgres_db(postgres_conn, 'clickhouse')
yield cluster
finally:
cluster.shutdown()
def test_postgres_select_insert(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
table_name = 'test_many'
table = '''postgresql('postgres1:5432', 'clickhouse', '{}', 'postgres', 'mysecretpassword')'''.format(table_name)
cursor.execute('CREATE TABLE IF NOT EXISTS {} (a integer, b text, c integer)'.format(table_name))
result = node1.query('''
INSERT INTO TABLE FUNCTION {}
SELECT number, concat('name_', toString(number)), 3 from numbers(10000)'''.format(table))
check1 = "SELECT count() FROM {}".format(table)
check2 = "SELECT Sum(c) FROM {}".format(table)
check3 = "SELECT count(c) FROM {} WHERE a % 2 == 0".format(table)
check4 = "SELECT count() FROM {} WHERE b LIKE concat('name_', toString(1))".format(table)
assert (node1.query(check1)).rstrip() == '10000'
assert (node1.query(check2)).rstrip() == '30000'
assert (node1.query(check3)).rstrip() == '5000'
assert (node1.query(check4)).rstrip() == '1'
def test_postgres_conversions(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
cursor.execute(
'''CREATE TABLE IF NOT EXISTS test_types (
a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
h timestamp, i date, j numeric(5, 5), k decimal(5, 5))''')
node1.query('''
INSERT INTO TABLE FUNCTION postgresql('postgres1:5432', 'clickhouse', 'test_types', 'postgres', 'mysecretpassword') VALUES
(-32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12', '2000-05-12', 0.2, 0.2)''')
result = node1.query('''
SELECT * FROM postgresql('postgres1:5432', 'clickhouse', 'test_types', 'postgres', 'mysecretpassword')''')
assert(result == '-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12\t2000-05-12\t0.20000\t0.20000\n')
cursor.execute(
'''CREATE TABLE IF NOT EXISTS test_array_dimensions
(
a Date[] NOT NULL, -- Date
b Timestamp[] NOT NULL, -- DateTime
c real[][] NOT NULL, -- Float32
d double precision[][] NOT NULL, -- Float64
e decimal(5, 5)[][][] NOT NULL, -- Decimal32
f integer[][][] NOT NULL, -- Int32
g Text[][][][][] NOT NULL, -- String
h Integer[][][], -- Nullable(Int32)
i Char(2)[][][][], -- Nullable(String)
k Char(2)[] -- Nullable(String)
)''')
result = node1.query('''
DESCRIBE TABLE postgresql('postgres1:5432', 'clickhouse', 'test_array_dimensions', 'postgres', 'mysecretpassword')''')
expected = ('a\tArray(Date)\t\t\t\t\t\n' +
'b\tArray(DateTime)\t\t\t\t\t\n' +
'c\tArray(Array(Float32))\t\t\t\t\t\n' +
'd\tArray(Array(Float64))\t\t\t\t\t\n' +
'e\tArray(Array(Array(Decimal(5, 5))))\t\t\t\t\t\n' +
'f\tArray(Array(Array(Int32)))\t\t\t\t\t\n' +
'g\tArray(Array(Array(Array(Array(String)))))\t\t\t\t\t\n' +
'h\tArray(Array(Array(Nullable(Int32))))\t\t\t\t\t\n' +
'i\tArray(Array(Array(Array(Nullable(String)))))\t\t\t\t\t\n' +
'k\tArray(Nullable(String))'
)
assert(result.rstrip() == expected)
node1.query("INSERT INTO TABLE FUNCTION postgresql('postgres1:5432', 'clickhouse', 'test_array_dimensions', 'postgres', 'mysecretpassword') "
"VALUES ("
"['2000-05-12', '2000-05-12'], "
"['2000-05-12 12:12:12', '2000-05-12 12:12:12'], "
"[[1.12345], [1.12345], [1.12345]], "
"[[1.1234567891], [1.1234567891], [1.1234567891]], "
"[[[0.11111, 0.11111]], [[0.22222, 0.22222]], [[0.33333, 0.33333]]], "
"[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], "
"[[[[['winx', 'winx', 'winx']]]]], "
"[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]], "
"[[[[NULL]]]], "
"[]"
")")
result = node1.query('''
SELECT * FROM postgresql('postgres1:5432', 'clickhouse', 'test_array_dimensions', 'postgres', 'mysecretpassword')''')
expected = (
"['2000-05-12','2000-05-12']\t" +
"['2000-05-12 12:12:12','2000-05-12 12:12:12']\t" +
"[[1.12345],[1.12345],[1.12345]]\t" +
"[[1.1234567891],[1.1234567891],[1.1234567891]]\t" +
"[[[0.11111,0.11111]],[[0.22222,0.22222]],[[0.33333,0.33333]]]\t"
"[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t"
"[[[[['winx','winx','winx']]]]]\t"
"[[[1,NULL],[NULL,1]],[[NULL,NULL],[NULL,NULL]],[[4,4],[5,5]]]\t"
"[[[[NULL]]]]\t"
"[]\n"
)
assert(result == expected)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()

View File

@ -108,6 +108,7 @@ URL [] GLOBAL SOURCES
REMOTE [] GLOBAL SOURCES
MONGO [] GLOBAL SOURCES
MYSQL [] GLOBAL SOURCES
POSTGRES [] GLOBAL SOURCES
ODBC [] GLOBAL SOURCES
JDBC [] GLOBAL SOURCES
HDFS [] GLOBAL SOURCES

View File

@ -42,6 +42,9 @@ LAYOUT(COMPLEX_KEY_SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse
SELECT 'TEST_SMALL';
SELECT 'VALUE FROM RAM BUFFER';
-- NUMBER_OF_ARGUMENTS_DOESNT_MATCH
SELECT dictHas('database_for_dict.ssd_dict', 'a', tuple('1')); -- { serverError 42 }
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', tuple('1', toInt32(3)));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', tuple('1', toInt32(3)));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', tuple('1', toInt32(3)));