Merge branch 'master' into idisk-seekable-readbuffer

# Conflicts:
#	dbms/programs/server/config.xml
This commit is contained in:
Pavel Kovalenko 2020-01-27 21:45:53 +03:00
commit 07755ec8f7
293 changed files with 6536 additions and 1191 deletions

View File

@ -7,16 +7,20 @@ Changelog category (leave one):
- Performance Improvement
- Backward Incompatible Change
- Build/Testing/Packaging Improvement
- Documentation
- Documentation (changelog entry is not required)
- Other
- Non-significant (changelog entry is not needed)
- Non-significant (changelog entry is not required)
Changelog entry (up to few sentences, required except for Non-significant/Documentation categories):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
...
Detailed description (optional):
Detailed description / Documentation draft:
...
By adding documentation, you'll allow users to try your new feature immediately, not when someone else will have time to document it later. Documentation is necessary for all features that affect user experience in any way. You can add brief documentation draft above, or add documentation right into your patch as Markdown files in [docs](https://github.com/ClickHouse/ClickHouse/tree/master/docs) folder.
If you are doing this for the first time, it's recommended to read the lightweight [Contributing to ClickHouse Documentation](https://github.com/ClickHouse/ClickHouse/tree/master/docs/README.md) guide first.

4
.gitmodules vendored
View File

@ -140,3 +140,7 @@
[submodule "contrib/ryu"]
path = contrib/ryu
url = https://github.com/ClickHouse-Extras/ryu.git
[submodule "contrib/avro"]
path = contrib/avro
url = https://github.com/ClickHouse-Extras/avro.git
ignore = untracked

45
AUTHORS
View File

@ -1,43 +1,2 @@
The following authors have created the source code of "ClickHouse"
published and distributed by YANDEX LLC as the owner:
Alexander Makarov <asealback@yandex-team.ru>
Alexander Prudaev <aprudaev@yandex-team.ru>
Alexey Arno <af-arno@yandex-team.ru>
Alexey Milovidov <milovidov@yandex-team.ru>
Alexey Tronov <vkusny@yandex-team.ru>
Alexey Vasiliev <loudhorr@yandex-team.ru>
Alexey Zatelepin <ztlpn@yandex-team.ru>
Amy Krishnevsky <krishnevsky@yandex-team.ru>
Andrey M <hertz@yandex-team.ru>
Andrey Mironov <hertz@yandex-team.ru>
Andrey Urusov <drobus@yandex-team.ru>
Anton Tikhonov <rokerjoker@yandex-team.ru>
Dmitry Bilunov <kmeaw@yandex-team.ru>
Dmitry Galuza <galuza@yandex-team.ru>
Eugene Konkov <konkov@yandex-team.ru>
Evgeniy Gatov <egatov@yandex-team.ru>
Ilya Khomutov <robert@yandex-team.ru>
Ilya Korolev <breeze@yandex-team.ru>
Ivan Blinkov <blinkov@yandex-team.ru>
Maxim Nikulin <mnikulin@yandex-team.ru>
Michael Kolupaev <mkolupaev@yandex-team.ru>
Michael Razuvaev <razuvaev@yandex-team.ru>
Nikolai Kochetov <nik-kochetov@yandex-team.ru>
Nikolay Vasiliev <lonlylocly@yandex-team.ru>
Nikolay Volosatov <bamx23@yandex-team.ru>
Pavel Artemkin <stanly@yandex-team.ru>
Pavel Kartaviy <kartavyy@yandex-team.ru>
Roman Nozdrin <drrtuy@yandex-team.ru>
Roman Peshkurov <peshkurov@yandex-team.ru>
Sergey Fedorov <fets@yandex-team.ru>
Sergey Lazarev <hamilkar@yandex-team.ru>
Sergey Magidovich <mgsergio@yandex-team.ru>
Sergey Serebryanik <serebrserg@yandex-team.ru>
Sergey Veletskiy <velom@yandex-team.ru>
Vasily Okunev <okunev@yandex-team.ru>
Vitaliy Lyudvichenko <vludv@yandex-team.ru>
Vladimir Chebotarev <chebotarev@yandex-team.ru>
Vsevolod Orlov <vorloff@yandex-team.ru>
Vyacheslav Alipov <alipov@yandex-team.ru>
Yuriy Galitskiy <orantius@yandex-team.ru>
To see the list of authors who created the source code of ClickHouse, published and distributed by YANDEX LLC as the owner,
run "SELECT * FROM system.contributors;" query on any ClickHouse server.

View File

@ -352,7 +352,7 @@ include (cmake/find/simdjson.cmake)
include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake)
include (cmake/find/replxx.cmake)
include (cmake/find/avro.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)

View File

@ -1,24 +1,30 @@
# Contributing to ClickHouse
## Technical info
Developer guide for writing code for ClickHouse is published on official website alongside the usage and operations documentation:
https://clickhouse.yandex/docs/en/development/architecture/
ClickHouse is an open project, and you can contribute to it in many ways. You can help with ideas, code, or documentation. We appreciate any efforts that help us to make the project better.
## Legal info
Thank you.
In order for us (YANDEX LLC) to accept patches and other contributions from you, you will have to adopt our Yandex Contributor License Agreement (the "**CLA**"). The current version of the CLA you may find here:
## Technical Info
We have a [developer's guide](https://clickhouse.yandex/docs/en/development/developer_instruction/) for writing code for ClickHouse. Besides this guide, you can find [Overview of ClickHouse Architecture](https://clickhouse.yandex/docs/en/development/architecture/) and instructions on how to build ClickHouse in different environments.
If you want to contribute to documentation, read the [Contributing to ClickHouse Documentation](docs/README.md) guide.
## Legal Info
In order for us (YANDEX LLC) to accept patches and other contributions from you, you may adopt our Yandex Contributor License Agreement (the "**CLA**"). The current version of the CLA you may find here:
1) https://yandex.ru/legal/cla/?lang=en (in English) and
2) https://yandex.ru/legal/cla/?lang=ru (in Russian).
By adopting the CLA, you state the following:
* You obviously wish and are willingly licensing your contributions to us for our open source projects under the terms of the CLA,
* You has read the terms and conditions of the CLA and agree with them in full,
* You have read the terms and conditions of the CLA and agree with them in full,
* You are legally able to provide and license your contributions as stated,
* We may use your contributions for our open source projects and for any other our project too,
* We rely on your assurances concerning the rights of third parties in relation to your contributes.
* We rely on your assurances concerning the rights of third parties in relation to your contributions.
If you agree with these principles, please read and adopt our CLA. By providing us your contributions, you hereby declare that you has already read and adopt our CLA, and we may freely merge your contributions with our corresponding open source project and use it in further in accordance with terms and conditions of the CLA.
If you agree with these principles, please read and adopt our CLA. By providing us your contributions, you hereby declare that you have already read and adopt our CLA, and we may freely merge your contributions with our corresponding open source project and use it in further in accordance with terms and conditions of the CLA.
If you have already adopted terms and conditions of the CLA, you are able to provide your contributes. When you submit your pull request, please add the following information into it:
@ -31,4 +37,7 @@ Replace the bracketed text as follows:
It is enough to provide us such notification once.
If you don't agree with the CLA, you still can open a pull request to provide your contributions.
As an alternative, you can provide DCO instead of CLA. You can find the text of DCO here: https://developercertificate.org/
It is enough to read and copy it verbatim to your pull request.
If you don't agree with the CLA and don't want to provide DCO, you still can open a pull request to provide your contributions.

View File

@ -1,4 +1,4 @@
Copyright 2016-2019 Yandex LLC
Copyright 2016-2020 Yandex LLC
Apache License
Version 2.0, January 2004
@ -188,7 +188,7 @@ Copyright 2016-2019 Yandex LLC
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2016-2019 Yandex LLC
Copyright 2016-2020 Yandex LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

28
cmake/find/avro.cmake Normal file
View File

@ -0,0 +1,28 @@
option (ENABLE_AVRO "Enable Avro" ${ENABLE_LIBRARIES})
if (ENABLE_AVRO)
option (USE_INTERNAL_AVRO_LIBRARY "Set to FALSE to use system avro library instead of bundled" ${NOT_UNBUNDLED})
if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/CMakeLists.txt")
if(USE_INTERNAL_AVRO_LIBRARY)
message(WARNING "submodule contrib/avro is missing. to fix try run: \n git submodule update --init --recursive")
endif()
set(MISSING_INTERNAL_AVRO_LIBRARY 1)
set(USE_INTERNAL_AVRO_LIBRARY 0)
endif()
if (NOT USE_INTERNAL_AVRO_LIBRARY)
elseif(NOT MISSING_INTERNAL_AVRO_LIBRARY)
include(cmake/find/snappy.cmake)
set(AVROCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/include")
set(AVROCPP_LIBRARY avrocpp)
endif ()
if (AVROCPP_LIBRARY AND AVROCPP_INCLUDE_DIR)
set(USE_AVRO 1)
endif()
endif()
message (STATUS "Using avro=${USE_AVRO}: ${AVROCPP_INCLUDE_DIR} : ${AVROCPP_LIBRARY}")

View File

@ -31,6 +31,7 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY)
set (Boost_SYSTEM_LIBRARY boost_system_internal)
set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal ${Boost_SYSTEM_LIBRARY})
set (Boost_IOSTREAMS_LIBRARY boost_iostreams_internal)
set (Boost_REGEX_LIBRARY boost_regex_internal)
set (Boost_INCLUDE_DIRS)
@ -48,4 +49,4 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY)
list (APPEND Boost_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/boost")
endif ()
message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_REGEX_LIBRARY}")
message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_IOSTREAMS_LIBRARY},${Boost_REGEX_LIBRARY}")

View File

@ -14,6 +14,7 @@ if (NOT ENABLE_LIBRARIES)
set (ENABLE_POCO_REDIS ${ENABLE_LIBRARIES} CACHE BOOL "")
set (ENABLE_POCO_ODBC ${ENABLE_LIBRARIES} CACHE BOOL "")
set (ENABLE_POCO_SQL ${ENABLE_LIBRARIES} CACHE BOOL "")
set (ENABLE_POCO_JSON ${ENABLE_LIBRARIES} CACHE BOOL "")
endif ()
set (POCO_COMPONENTS Net XML SQL Data)
@ -34,6 +35,9 @@ if (NOT DEFINED ENABLE_POCO_ODBC OR ENABLE_POCO_ODBC)
list (APPEND POCO_COMPONENTS DataODBC)
list (APPEND POCO_COMPONENTS SQLODBC)
endif ()
if (NOT DEFINED ENABLE_POCO_JSON OR ENABLE_POCO_JSON)
list (APPEND POCO_COMPONENTS JSON)
endif ()
if (NOT USE_INTERNAL_POCO_LIBRARY)
find_package (Poco COMPONENTS ${POCO_COMPONENTS})
@ -112,6 +116,11 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
endif ()
endif ()
if (NOT DEFINED ENABLE_POCO_JSON OR ENABLE_POCO_JSON)
set (Poco_JSON_LIBRARY PocoJSON)
set (Poco_JSON_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/JSON/include/")
endif ()
if (OPENSSL_FOUND AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL))
set (Poco_NetSSL_LIBRARY PocoNetSSL ${OPENSSL_LIBRARIES})
set (Poco_Crypto_LIBRARY PocoCrypto ${OPENSSL_LIBRARIES})
@ -145,8 +154,11 @@ endif ()
if (Poco_SQLODBC_LIBRARY AND ODBC_FOUND)
set (USE_POCO_SQLODBC 1)
endif ()
if (Poco_JSON_LIBRARY)
set (USE_POCO_JSON 1)
endif ()
message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY},${Poco_Redis_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, Redis=${USE_POCO_REDIS}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}")
message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY},${Poco_Redis_LIBRARY},${Poco_JSON_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, Redis=${USE_POCO_REDIS}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}, JSON=${USE_POCO_JSON}")
# How to make sutable poco:
# use branch:

View File

@ -1,40 +0,0 @@
option (ENABLE_REPLXX "Enable replxx support" ${NOT_UNBUNDLED})
if (ENABLE_REPLXX)
option (USE_INTERNAL_REPLXX "Use internal replxx library" ${NOT_UNBUNDLED})
if (USE_INTERNAL_REPLXX AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/replxx/README.md")
message (WARNING "submodule contrib/replxx is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_REPLXX 0)
endif ()
if (NOT USE_INTERNAL_REPLXX)
find_library(LIBRARY_REPLXX NAMES replxx replxx-static)
find_path(INCLUDE_REPLXX replxx.hxx)
add_library(replxx UNKNOWN IMPORTED)
set_property(TARGET replxx PROPERTY IMPORTED_LOCATION ${LIBRARY_REPLXX})
target_include_directories(replxx PUBLIC ${INCLUDE_REPLXX})
set(CMAKE_REQUIRED_LIBRARIES replxx)
check_cxx_source_compiles(
"
#include <replxx.hxx>
int main() {
replxx::Replxx rx;
}
"
EXTERNAL_REPLXX_WORKS
)
if (NOT EXTERNAL_REPLXX_WORKS)
message (FATAL_ERROR "replxx is unusable: ${LIBRARY_REPLXX} ${INCLUDE_REPLXX}")
endif ()
endif ()
set(USE_REPLXX 1)
message (STATUS "Using replxx")
else ()
set(USE_REPLXX 0)
endif ()

View File

@ -53,6 +53,7 @@ if (SANITIZE)
set (USE_CAPNP 0 CACHE BOOL "")
set (USE_INTERNAL_ORC_LIBRARY 0 CACHE BOOL "")
set (USE_ORC 0 CACHE BOOL "")
set (USE_AVRO 0 CACHE BOOL "")
set (ENABLE_SSL 0 CACHE BOOL "")
elseif (SANITIZE STREQUAL "thread")

View File

@ -146,6 +146,20 @@ if (ENABLE_ICU AND USE_INTERNAL_ICU_LIBRARY)
add_subdirectory (icu-cmake)
endif ()
if(USE_INTERNAL_SNAPPY_LIBRARY)
set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "")
if (NOT MAKE_STATIC_LIBRARIES)
set(BUILD_SHARED_LIBS 1) # TODO: set at root dir
endif()
add_subdirectory(snappy)
set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy")
if(SANITIZE STREQUAL "undefined")
target_compile_options(${SNAPPY_LIBRARY} PRIVATE -fno-sanitize=undefined)
endif()
endif()
if (USE_INTERNAL_PARQUET_LIBRARY)
if (USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE)
# We dont use arrow's cmakefiles because they uses too many depends and download some libs in compile time
@ -189,20 +203,6 @@ if (USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE)
endif()
else()
if(USE_INTERNAL_SNAPPY_LIBRARY)
set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "")
if (NOT MAKE_STATIC_LIBRARIES)
set(BUILD_SHARED_LIBS 1) # TODO: set at root dir
endif()
add_subdirectory(snappy)
set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy")
if(SANITIZE STREQUAL "undefined")
target_compile_options(${SNAPPY_LIBRARY} PRIVATE -fno-sanitize=undefined)
endif()
endif()
add_subdirectory(arrow-cmake)
# The library is large - avoid bloat.
@ -212,6 +212,10 @@ else()
endif()
endif()
if (USE_INTERNAL_AVRO_LIBRARY)
add_subdirectory(avro-cmake)
endif()
if (USE_INTERNAL_POCO_LIBRARY)
set (POCO_VERBOSE_MESSAGES 0 CACHE INTERNAL "")
set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
@ -332,6 +336,4 @@ if (USE_FASTOPS)
add_subdirectory (fastops-cmake)
endif()
if (USE_INTERNAL_REPLXX)
add_subdirectory (replxx-cmake)
endif()
add_subdirectory(replxx-cmake)

1
contrib/avro vendored Submodule

@ -0,0 +1 @@
Subproject commit 5b2752041c8d2f75eb5c1dbec8b4c25fc0e24d12

View File

@ -0,0 +1,70 @@
set(AVROCPP_ROOT_DIR ${CMAKE_SOURCE_DIR}/contrib/avro/lang/c++)
set(AVROCPP_INCLUDE_DIR ${AVROCPP_ROOT_DIR}/api)
set(AVROCPP_SOURCE_DIR ${AVROCPP_ROOT_DIR}/impl)
set (CMAKE_CXX_STANDARD 17)
if (EXISTS ${AVROCPP_ROOT_DIR}/../../share/VERSION.txt)
file(READ "${AVROCPP_ROOT_DIR}/../../share/VERSION.txt"
AVRO_VERSION)
endif()
string(REPLACE "\n" "" AVRO_VERSION ${AVRO_VERSION})
set (AVRO_VERSION_MAJOR ${AVRO_VERSION})
set (AVRO_VERSION_MINOR "0")
set (AVROCPP_SOURCE_FILES
${AVROCPP_SOURCE_DIR}/Compiler.cc
${AVROCPP_SOURCE_DIR}/Node.cc
${AVROCPP_SOURCE_DIR}/LogicalType.cc
${AVROCPP_SOURCE_DIR}/NodeImpl.cc
${AVROCPP_SOURCE_DIR}/ResolverSchema.cc
${AVROCPP_SOURCE_DIR}/Schema.cc
${AVROCPP_SOURCE_DIR}/Types.cc
${AVROCPP_SOURCE_DIR}/ValidSchema.cc
${AVROCPP_SOURCE_DIR}/Zigzag.cc
${AVROCPP_SOURCE_DIR}/BinaryEncoder.cc
${AVROCPP_SOURCE_DIR}/BinaryDecoder.cc
${AVROCPP_SOURCE_DIR}/Stream.cc
${AVROCPP_SOURCE_DIR}/FileStream.cc
${AVROCPP_SOURCE_DIR}/Generic.cc
${AVROCPP_SOURCE_DIR}/GenericDatum.cc
${AVROCPP_SOURCE_DIR}/DataFile.cc
${AVROCPP_SOURCE_DIR}/parsing/Symbol.cc
${AVROCPP_SOURCE_DIR}/parsing/ValidatingCodec.cc
${AVROCPP_SOURCE_DIR}/parsing/JsonCodec.cc
${AVROCPP_SOURCE_DIR}/parsing/ResolvingDecoder.cc
${AVROCPP_SOURCE_DIR}/json/JsonIO.cc
${AVROCPP_SOURCE_DIR}/json/JsonDom.cc
${AVROCPP_SOURCE_DIR}/Resolver.cc
${AVROCPP_SOURCE_DIR}/Validator.cc
)
add_library (avrocpp ${AVROCPP_SOURCE_FILES})
set_target_properties (avrocpp PROPERTIES VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR})
target_include_directories(avrocpp SYSTEM PUBLIC ${AVROCPP_INCLUDE_DIR})
target_include_directories(avrocpp SYSTEM PUBLIC ${Boost_INCLUDE_DIRS})
target_link_libraries (avrocpp ${Boost_IOSTREAMS_LIBRARY})
if (SNAPPY_INCLUDE_DIR AND SNAPPY_LIBRARY)
target_compile_definitions (avrocpp PUBLIC SNAPPY_CODEC_AVAILABLE)
target_include_directories (avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR})
target_link_libraries (avrocpp ${SNAPPY_LIBRARY})
endif ()
if (COMPILER_GCC)
set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor)
elseif (COMPILER_CLANG)
set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor)
endif ()
target_compile_options(avrocpp PRIVATE ${SUPPRESS_WARNINGS})
# create a symlink to include headers with <avro/...>
ADD_CUSTOM_TARGET(avro_symlink_headers ALL
COMMAND ${CMAKE_COMMAND} -E make_directory ${AVROCPP_ROOT_DIR}/include
COMMAND ${CMAKE_COMMAND} -E create_symlink ${AVROCPP_ROOT_DIR}/api ${AVROCPP_ROOT_DIR}/include/avro
)
add_dependencies(avrocpp avro_symlink_headers)

2
contrib/boost vendored

@ -1 +1 @@
Subproject commit 830e51edb59c4f37a8638138581e1e56c29ac44f
Subproject commit 86be2aef20bee2356b744e5569eed6eaded85dbe

View File

@ -37,3 +37,8 @@ target_link_libraries(boost_filesystem_internal PRIVATE boost_system_internal)
if (USE_INTERNAL_PARQUET_LIBRARY)
add_boost_lib(regex)
endif()
if (USE_INTERNAL_AVRO_LIBRARY)
add_boost_lib(iostreams)
target_link_libraries(boost_iostreams_internal PUBLIC ${ZLIB_LIBRARIES})
endif()

View File

@ -23,6 +23,10 @@ typedef unsigned __int64 uint64_t;
#endif // !defined(_MSC_VER)
#ifdef __cplusplus
extern "C" {
#endif
//-----------------------------------------------------------------------------
void MurmurHash3_x86_32 ( const void * key, int len, uint32_t seed, void * out );
@ -32,3 +36,7 @@ void MurmurHash3_x86_128 ( const void * key, int len, uint32_t seed, void * out
void MurmurHash3_x64_128 ( const void * key, int len, uint32_t seed, void * out );
//-----------------------------------------------------------------------------
#ifdef __cplusplus
}
#endif

View File

@ -1,18 +1,57 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/replxx")
option (ENABLE_REPLXX "Enable replxx support" ${ENABLE_LIBRARIES})
set(SRCS
${LIBRARY_DIR}/src/conversion.cxx
${LIBRARY_DIR}/src/escape.cxx
${LIBRARY_DIR}/src/history.cxx
${LIBRARY_DIR}/src/io.cxx
${LIBRARY_DIR}/src/prompt.cxx
${LIBRARY_DIR}/src/replxx.cxx
${LIBRARY_DIR}/src/replxx_impl.cxx
${LIBRARY_DIR}/src/util.cxx
${LIBRARY_DIR}/src/wcwidth.cpp
${LIBRARY_DIR}/src/ConvertUTF.cpp
)
if (ENABLE_REPLXX)
option (USE_INTERNAL_REPLXX "Use internal replxx library" ${NOT_UNBUNDLED})
add_library(replxx ${SRCS})
target_include_directories(replxx PUBLIC ${LIBRARY_DIR}/include)
target_compile_options(replxx PUBLIC -Wno-documentation)
if (USE_INTERNAL_REPLXX)
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/replxx")
set(SRCS
${LIBRARY_DIR}/src/conversion.cxx
${LIBRARY_DIR}/src/ConvertUTF.cpp
${LIBRARY_DIR}/src/escape.cxx
${LIBRARY_DIR}/src/history.cxx
${LIBRARY_DIR}/src/io.cxx
${LIBRARY_DIR}/src/prompt.cxx
${LIBRARY_DIR}/src/replxx_impl.cxx
${LIBRARY_DIR}/src/replxx.cxx
${LIBRARY_DIR}/src/util.cxx
${LIBRARY_DIR}/src/wcwidth.cpp
)
add_library (replxx ${SRCS})
target_include_directories(replxx PUBLIC ${LIBRARY_DIR}/include)
else ()
find_library(LIBRARY_REPLXX NAMES replxx replxx-static)
find_path(INCLUDE_REPLXX replxx.hxx)
add_library(replxx UNKNOWN IMPORTED)
set_property(TARGET replxx PROPERTY IMPORTED_LOCATION ${LIBRARY_REPLXX})
target_include_directories(replxx PUBLIC ${INCLUDE_REPLXX})
set(CMAKE_REQUIRED_LIBRARIES replxx)
check_cxx_source_compiles(
"
#include <replxx.hxx>
int main() {
replxx::Replxx rx;
}
"
EXTERNAL_REPLXX_WORKS
)
if (NOT EXTERNAL_REPLXX_WORKS)
message (FATAL_ERROR "replxx is unusable: ${LIBRARY_REPLXX} ${INCLUDE_REPLXX}")
endif ()
endif ()
target_compile_options(replxx PUBLIC -Wno-documentation)
target_compile_definitions(replxx PUBLIC USE_REPLXX=1)
message (STATUS "Using replxx")
else ()
add_library(replxx INTERFACE)
target_compile_definitions(replxx INTERFACE USE_REPLXX=0)
message (STATUS "Not using replxx (Beware! Runtime fallback to readline is possible!)")
endif ()

View File

@ -177,7 +177,7 @@ elseif (COMPILER_GCC)
# Warn for suspicious length parameters to certain string and memory built-in functions if the argument uses sizeof
add_cxx_compile_options(-Wsizeof-pointer-memaccess)
# Warn about overriding virtual functions that are not marked with the override keyword
# add_cxx_compile_options(-Wsuggest-override)
add_cxx_compile_options(-Wsuggest-override)
# Warn whenever a switch statement has an index of boolean type and the case values are outside the range of a boolean type
add_cxx_compile_options(-Wswitch-bool)
# Warn if a self-comparison always evaluates to true or false
@ -504,6 +504,10 @@ if (USE_POCO_NETSSL)
dbms_target_link_libraries (PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY})
endif()
if (USE_POCO_JSON)
dbms_target_link_libraries (PRIVATE ${Poco_JSON_LIBRARY})
endif()
dbms_target_link_libraries (PRIVATE ${Poco_Foundation_LIBRARY})
if (USE_ICU)
@ -522,6 +526,11 @@ if (USE_PARQUET)
endif ()
endif ()
if (USE_AVRO)
dbms_target_link_libraries(PRIVATE ${AVROCPP_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${AVROCPP_INCLUDE_DIR})
endif ()
if (OPENSSL_CRYPTO_LIBRARY)
dbms_target_link_libraries (PRIVATE ${OPENSSL_CRYPTO_LIBRARY})
target_link_libraries (clickhouse_common_io PRIVATE ${OPENSSL_CRYPTO_LIBRARY})

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
QUERIES_FILE="queries.sql"
TABLE=$1
TRIES=3
cat "$QUERIES_FILE" | sed "s|{table}|\"${TABLE}\"|g" | while read query; do
echo -n "["
for i in $(seq 1 $TRIES); do
while true; do
RES=$(command time -f %e -o /dev/stdout curl -sS --location-trusted -H "Authorization: OAuth $YT_TOKEN" "$YT_PROXY.yt.yandex.net/query?default_format=Null&database=*$YT_CLIQUE_ID" --data-binary @- <<< "$query" 2>/dev/null) && break;
done
[[ "$?" == "0" ]] && echo -n "${RES}" || echo -n "null"
[[ "$i" != $TRIES ]] && echo -n ", "
done
echo "],"
done

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
QUERIES_FILE="queries.sql"
TABLE=$1
TRIES=3
cat "$QUERIES_FILE" | sed "s|{table}|\"${TABLE}\"|g" | while read query; do
echo -n "["
for i in $(seq 1 $TRIES); do
while true; do
RES=$(command time -f %e -o time ./yql --clickhouse --syntax-version 1 -f empty <<< "USE chyt.hume; PRAGMA max_memory_usage = 100000000000; PRAGMA max_memory_usage_for_all_queries = 100000000000; $query" >/dev/null 2>&1 && cat time) && break;
done
[[ "$?" == "0" ]] && echo -n "${RES}" || echo -n "null"
[[ "$i" != $TRIES ]] && echo -n ", "
done
echo "],"
done

View File

@ -101,7 +101,7 @@ public:
}
void initialize(Poco::Util::Application & self [[maybe_unused]])
void initialize(Poco::Util::Application & self [[maybe_unused]]) override
{
std::string home_path;
const char * home_path_cstr = getenv("HOME");
@ -111,7 +111,7 @@ public:
configReadClient(config(), home_path);
}
int main(const std::vector<std::string> &)
int main(const std::vector<std::string> &) override
{
if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results
Poco::File(json_path).remove();
@ -492,7 +492,7 @@ private:
public:
~Benchmark()
~Benchmark() override
{
shutdown = true;
}

View File

@ -4,7 +4,7 @@ set(CLICKHOUSE_CLIENT_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/Suggest.cpp
)
set(CLICKHOUSE_CLIENT_LINK PRIVATE clickhouse_common_config clickhouse_functions clickhouse_aggregate_functions clickhouse_common_io clickhouse_parsers string_utils ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_CLIENT_LINK PRIVATE clickhouse_common_config clickhouse_functions clickhouse_aggregate_functions clickhouse_common_io clickhouse_parsers string_utils ${Boost_PROGRAM_OPTIONS_LIBRARY})
include(CheckSymbolExists)
check_symbol_exists(readpassphrase readpassphrase.h HAVE_READPASSPHRASE)

View File

@ -2,6 +2,12 @@
#include "ConnectionParameters.h"
#include "Suggest.h"
#if USE_REPLXX
# include <common/ReplxxLineReader.h>
#else
# include <common/LineReader.h>
#endif
#include <stdlib.h>
#include <fcntl.h>
#include <signal.h>
@ -19,7 +25,6 @@
#include <Poco/File.h>
#include <Poco/Util/Application.h>
#include <common/find_symbols.h>
#include <common/config_common.h>
#include <common/LineReader.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Stopwatch.h>
@ -205,7 +210,7 @@ private:
ConnectionParameters connection_parameters;
void initialize(Poco::Util::Application & self)
void initialize(Poco::Util::Application & self) override
{
Poco::Util::Application::initialize(self);
@ -233,7 +238,7 @@ private:
}
int main(const std::vector<std::string> & /*args*/)
int main(const std::vector<std::string> & /*args*/) override
{
try
{
@ -496,7 +501,11 @@ private:
if (!history_file.empty() && !Poco::File(history_file).exists())
Poco::File(history_file).createFile();
LineReader lr(&Suggest::instance(), history_file, '\\', config().has("multiline") ? ';' : 0);
#if USE_REPLXX
ReplxxLineReader lr(Suggest::instance(), history_file, '\\', config().has("multiline") ? ';' : 0);
#else
LineReader lr(history_file, '\\', config().has("multiline") ? ';' : 0);
#endif
do
{
@ -504,6 +513,12 @@ private:
if (input.empty())
break;
if (input.ends_with("\\G"))
{
input.resize(input.size() - 2);
has_vertical_output_suffix = true;
}
try
{
if (!process(input))

View File

@ -111,7 +111,7 @@ void LocalServer::tryInitPath()
/// In case of empty path set paths to helpful directories
std::string cd = Poco::Path::current();
context->setTemporaryPath(cd + "tmp");
context->setTemporaryStorage(cd + "tmp");
context->setFlagsPath(cd + "flags");
context->setUserFilesPath(""); // user's files are everywhere
}

View File

@ -17,6 +17,7 @@
#include <Common/setThreadName.h>
#include <Common/config.h>
#include <Common/SettingsChanges.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
@ -351,7 +352,8 @@ void HTTPHandler::processQuery(
if (buffer_until_eof)
{
std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/";
const std::string tmp_path(context.getTemporaryVolume()->getNextDisk()->getPath());
const std::string tmp_path_template(tmp_path + "http_buffers/");
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
{
@ -590,7 +592,11 @@ void HTTPHandler::processQuery(
customizeContext(context);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & content_type, const String & format)
{
response.setContentType(content_type);
response.add("X-ClickHouse-Format", format);
},
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); });
if (used_output.hasDelayed())
@ -610,6 +616,8 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_
{
try
{
response.set("X-ClickHouse-Exception-Code", toString<int>(exception_code));
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
/// to avoid reading part of the current request body in the next request.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST

View File

@ -282,7 +282,8 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
else
{
bool with_output = false;
std::function<void(const String &)> set_content_type = [&with_output](const String &) -> void {
std::function<void(const String &, const String &)> set_content_type_and_format = [&with_output](const String &, const String &) -> void
{
with_output = true;
};
@ -305,7 +306,7 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
ReadBufferFromString replacement(replacement_query);
Context query_context = connection_context;
executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type, nullptr);
executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type_and_format, {});
if (!with_output)
packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true);

View File

@ -77,6 +77,31 @@ namespace CurrentMetrics
extern const Metric VersionInteger;
}
namespace
{
void setupTmpPath(Logger * log, const std::string & path)
{
LOG_DEBUG(log, "Setting up " << path << " to store temporary data in it");
Poco::File(path).createDirectories();
/// Clearing old temporary files.
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(path); it != dir_end; ++it)
{
if (it->isFile() && startsWith(it.name(), "tmp"))
{
LOG_DEBUG(log, "Removing old temporary file " << it->path());
it->remove();
}
else
LOG_DEBUG(log, "Skipped file in temporary path " << it->path());
}
}
}
namespace DB
{
@ -331,22 +356,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
DateLUT::instance();
LOG_TRACE(log, "Initialized DateLUT with time zone '" << DateLUT::instance().getTimeZone() << "'.");
/// Directory with temporary data for processing of heavy queries.
/// Storage with temporary data for processing of heavy queries.
{
std::string tmp_path = config().getString("tmp_path", path + "tmp/");
global_context->setTemporaryPath(tmp_path);
Poco::File(tmp_path).createDirectories();
/// Clearing old temporary files.
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(tmp_path); it != dir_end; ++it)
{
if (it->isFile() && startsWith(it.name(), "tmp"))
{
LOG_DEBUG(log, "Removing old temporary file " << it->path());
it->remove();
}
}
std::string tmp_policy = config().getString("tmp_policy", "");
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy);
for (const DiskPtr & disk : volume->disks)
setupTmpPath(log, disk->getPath());
}
/** Directory with 'flags': files indicating temporary settings for the server set by system administrator.
@ -864,7 +881,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
for (auto & server : servers)
server->start();
setTextLog(global_context->getTextLog());
{
String level_str = config().getString("text_log.level", "");
int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
setTextLog(global_context->getTextLog(), level);
}
buildLoggers(config(), logger());
main_config_reloader->start();

View File

@ -591,11 +591,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
}
});
/// Wait in case of exception. Delete pipeline to release memory.
/// Wait in case of exception happened outside of pool.
SCOPE_EXIT(
/// Clear queue in case if somebody is waiting lazy_format to push.
lazy_format->finish();
lazy_format->clearQueue();
try
{
@ -604,72 +602,58 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
catch (...)
{
/// If exception was thrown during pipeline execution, skip it while processing other exception.
tryLogCurrentException(log);
}
/// pipeline = QueryPipeline()
);
while (true)
while (!lazy_format->isFinished() && !exception)
{
Block block;
while (true)
if (isQueryCancelled())
{
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
executor->cancel();
break;
}
else
{
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time passed and there is a progress.
after_send_progress.restart();
sendProgress();
}
sendLogs();
if ((block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)))
break;
if (lazy_format->isFinished())
break;
if (exception)
{
pool.wait();
break;
}
}
}
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.
* If the request was interrupted, then `sendTotals` and other methods could not be called,
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
*/
if (!block && !isQueryCancelled())
{
pool.wait();
pipeline.finalize();
sendTotals(lazy_format->getTotals());
sendExtremes(lazy_format->getExtremes());
sendProfileInfo(lazy_format->getProfileInfo());
sendProgress();
sendLogs();
}
sendData(block);
if (!block)
/// A packet was received requesting to stop execution of the request.
executor->cancel();
break;
}
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time passed and there is a progress.
after_send_progress.restart();
sendProgress();
}
sendLogs();
if (auto block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000))
{
if (!state.io.null_format)
sendData(block);
}
}
/// Finish lazy_format before waiting. Otherwise some thread may write into it, and waiting will lock.
lazy_format->finish();
pool.wait();
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.
* If the request was interrupted, then `sendTotals` and other methods could not be called,
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
*/
if (!isQueryCancelled())
{
pipeline.finalize();
sendTotals(lazy_format->getTotals());
sendExtremes(lazy_format->getExtremes());
sendProfileInfo(lazy_format->getProfileInfo());
sendProgress();
sendLogs();
}
sendData({});
}
state.io.onFinish();

View File

@ -111,7 +111,7 @@ public:
server_display_name = server.config().getString("display_name", getFQDNOrHostName());
}
void run();
void run() override;
/// This method is called right before the query execution.
virtual void customizeContext(DB::Context & /*context*/) {}

View File

@ -3,30 +3,30 @@
NOTE: User and query level settings are set up in "users.xml" file.
-->
<yandex>
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
If this section is not present in configuration, all hosts are allowed.
-->
<remote_url_allow_hosts>
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
If port is explicitly specified in URL, the host:port is checked as a whole.
If host specified here without port, any port with this host allowed.
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
-->
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
If this section is not present in configuration, all hosts are allowed.
-->
<remote_url_allow_hosts>
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
If port is explicitly specified in URL, the host:port is checked as a whole.
If host specified here without port, any port with this host allowed.
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
-->
<!-- Regular expression can be specified. RE2 engine is used for regexps.
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
(forgetting to do so is a common source of error).
-->
</remote_url_allow_hosts>
<!-- Regular expression can be specified. RE2 engine is used for regexps.
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
(forgetting to do so is a common source of error).
-->
</remote_url_allow_hosts>
<logger>
<!-- Possible levels: https://github.com/pocoproject/poco/blob/develop/Foundation/include/Poco/Logger.h#L105 -->
<!-- Possible levels: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105 -->
<level>trace</level>
<log>/home/jokserfn/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/home/jokserfn/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<!-- <console>1</console> --> <!-- Default behavior is autodetection (log to console if not daemon mode and is tty) -->
@ -34,6 +34,7 @@
<!--display_name>production</display_name--> <!-- It is the name that will be shown in the client -->
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<mysql_port>9004</mysql_port>
<!-- For HTTPS and SSL over native protocol. -->
<!--
<https_port>8443</https_port>
@ -127,13 +128,24 @@
<!-- Path to data directory, with trailing slash. -->
<path>/home/jokserfn/var/lib/clickhouse/</path>
<path>/var/lib/clickhouse/</path>
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/home/jokserfn/var/lib/clickhouse/tmp/</tmp_path>
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<!-- Policy from the <storage_configuration> for the temporary files.
If not set <tmp_path> is used, otherwise <tmp_path> is ignored.
Notes:
- move_factor is ignored
- keep_free_space_bytes is ignored
- max_data_part_size_bytes is ignored
- you must have exactly one volume in that policy
-->
<!-- <tmp_policy>tmp</tmp_policy> -->
<!-- Directory with user provided files that are accessible by 'file' table function. -->
<user_files_path>/home/jokserfn/var/lib/clickhouse/user_files/</user_files_path>
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
<!-- Path to configuration file with users, access rights, profiles of settings, quotas. -->
<users_config>users.xml</users_config>
@ -327,44 +339,49 @@
-->
<!-- Query log. Used only for queries with setting log_queries = 1. -->
<!-- <query_log>
&lt;!&ndash; What table to insert data. If table is not exist, it will be created.
<query_log>
<!-- What table to insert data. If table is not exist, it will be created.
When query log structure is changed after system update,
then old table will be renamed and new table will be created automatically.
&ndash;&gt;
-->
<database>system</database>
<table>query_log</table>
&lt;!&ndash;
<!--
PARTITION BY expr https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
Example:
event_date
toMonday(event_date)
toYYYYMM(event_date)
toStartOfHour(event_time)
&ndash;&gt;
-->
<partition_by>toYYYYMM(event_date)</partition_by>
&lt;!&ndash; Interval of flushing data. &ndash;&gt;
<!-- Instead of partition_by, you can provide full engine expression (starting with ENGINE = ) with parameters,
Example: <engine>ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024</engine>
-->
<!-- Interval of flushing data. -->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_log>-->
</query_log>
<!-- Trace log. Stores stack traces collected by query profilers.
See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. -->
<!-- <trace_log>
<trace_log>
<database>system</database>
<table>trace_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</trace_log>-->
</trace_log>
<!-- Query thread log. Has information about all threads participated in query execution.
Used only for queries with setting log_query_threads = 1. -->
<!-- <query_thread_log>
<query_thread_log>
<database>system</database>
<table>query_thread_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_thread_log>-->
</query_thread_log>
<!-- Uncomment if use part log.
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).
@ -377,10 +394,12 @@
<!-- Uncomment to write text log into table.
Text log contains all information from usual server log but stores it in structured and efficient way.
The level of the messages that goes to the table can be limited (<level>), if not specified all messages will go to the table.
<text_log>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<level></level>
</text_log>
-->
@ -489,7 +508,7 @@
<!-- Directory in <clickhouse-path> containing schema files for various input formats.
The directory will be created if it doesn't exist.
-->
<format_schema_path>/home/jokserfn/var/lib/clickhouse/format_schemas/</format_schema_path>
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<!-- Uncomment to use query masking rules.

View File

@ -49,7 +49,7 @@
In first line will be password and in second - corresponding SHA256.
How to generate double SHA1:
Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | openssl dgst -sha1 -binary | openssl dgst -sha1
Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha1sum | tr -d '-' | xxd -r -p | sha1sum | tr -d '-'
In first line will be password and in second - corresponding double SHA1.
-->
<password></password>

View File

@ -309,7 +309,7 @@ protected:
/// Uses a DFA based approach in order to better handle patterns without
/// time assertions.
///
/// NOTE: This implementation relies on the assumption that the pattern are *small*.
/// NOTE: This implementation relies on the assumption that the pattern is *small*.
///
/// This algorithm performs in O(mn) (with m the number of DFA states and N the number
/// of events) with a memory consumption and memory allocations in O(m). It means that

View File

@ -50,16 +50,21 @@
*
* P.S. This is also required, because tcmalloc can not allocate a chunk of
* memory greater than 16 GB.
*
* P.P.S. Note that MMAP_THRESHOLD symbol is intentionally made weak. It allows
* to override it during linkage when using ClickHouse as a library in
* third-party applications which may already use own allocator doing mmaps
* in the implementation of alloc/realloc.
*/
#ifdef NDEBUG
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
#else
/**
* In debug build, use small mmap threshold to reproduce more memory
* stomping bugs. Along with ASLR it will hopefully detect more issues than
* ASan. The program may fail due to the limit on number of memory mappings.
*/
static constexpr size_t MMAP_THRESHOLD = 4096;
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 4096;
#endif
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;

View File

@ -478,6 +478,7 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS = 504;
extern const int CANNOT_DELETE_DIRECTORY = 505;
extern const int UNEXPECTED_ERROR_CODE = 506;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS = 507;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -195,7 +195,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
<< ", e.displayText() = " << e.displayText()
<< (with_stacktrace ? getExceptionStackTraceString(e) : "")
<< (with_extra_info ? getExtraExceptionInfo(e) : "")
<< " (version " << VERSION_STRING << VERSION_OFFICIAL;
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (...) {}
}

View File

@ -141,7 +141,15 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const
sev._sigev_un._tid = thread_id;
#endif
if (timer_create(clock_type, &sev, &timer_id))
{
/// In Google Cloud Run, the function "timer_create" is implemented incorrectly as of 2020-01-25.
/// https://mybranch.dev/posts/clickhouse-on-cloud-run/
if (errno == 0)
throw Exception("Failed to create thread timer. The function 'timer_create' returned non-zero but didn't set errno. This is bug in your OS.",
ErrorCodes::CANNOT_CREATE_TIMER);
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
}
/// Randomize offset as uniform random value from 0 to period - 1.
/// It will allow to sample short queries even if timer period is large.

View File

@ -1,12 +1,13 @@
#include <re2/re2.h>
#include <Common/RemoteHostFilter.h>
#include <Poco/URI.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Formats/FormatFactory.h>
#include <Common/RemoteHostFilter.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes

View File

@ -1,17 +1,19 @@
#pragma once
#include <string>
#include <vector>
#include <unordered_set>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace Poco { class URI; }
namespace Poco { namespace Util { class AbstractConfiguration; } }
namespace DB
{
class RemoteHostFilter
{
/**
* This class checks if url is allowed.
* This class checks if URL is allowed.
* If primary_hosts and regexp_hosts are empty all urls are allowed.
*/
public:
@ -25,6 +27,7 @@ private:
std::unordered_set<std::string> primary_hosts; /// Allowed primary (<host>) URL from config.xml
std::vector<std::string> regexp_hosts; /// Allowed regexp (<hots_regexp>) URL from config.xml
bool checkForDirectEntry(const std::string & str) const; /// Checks if the primary_hosts and regexp_hosts contain str. If primary_hosts and regexp_hosts are empty return true.
/// Checks if the primary_hosts and regexp_hosts contain str. If primary_hosts and regexp_hosts are empty return true.
bool checkForDirectEntry(const std::string & str) const;
};
}

View File

@ -23,7 +23,14 @@ namespace DB
static thread_local void * stack_address = nullptr;
static thread_local size_t max_stack_size = 0;
void checkStackSize()
/** It works fine when interpreters are instantiated by ClickHouse code in properly prepared threads,
* but there are cases when ClickHouse runs as a library inside another application.
* If application is using user-space lightweight threads with manually allocated stacks,
* current implementation is not reasonable, as it has no way to properly check the remaining
* stack size without knowing the details of how stacks are allocated.
* We mark this function as weak symbol to be able to replace it in another ClickHouse-based products.
*/
__attribute__((__weak__)) void checkStackSize()
{
using namespace DB;

View File

@ -147,6 +147,14 @@ using BlocksList = std::list<Block>;
using BlocksPtr = std::shared_ptr<Blocks>;
using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
/// Extends block with extra data in derived classes
struct ExtraBlock
{
Block block;
};
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
/// Compare number of columns, data types, column types, column names, and values of constant columns.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);

View File

@ -99,7 +99,7 @@ class ExternalTablesHandler : public Poco::Net::PartHandler, BaseExternalTable
public:
ExternalTablesHandler(Context & context_, const Poco::Net::NameValueCollection & params_) : context(context_), params(params_) {}
void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream);
void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream) override;
private:
Context & context;

View File

@ -52,6 +52,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE, "The maximum block size for insertion, if we control the creation of blocks for insertion.", 0) \
M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.", 0) \
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \
M(SettingUInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. By default, it is determined automatically.", 0) \
M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.", 0) \
M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
@ -110,6 +112,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(SettingUInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \
\
M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
@ -186,6 +189,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \
M(SettingBool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \
M(SettingBool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
M(SettingString, input_format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
\
M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \
\
@ -197,6 +201,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \
M(SettingBool, output_format_pretty_color, true, "Use ANSI escape sequences to paint colors in Pretty formats", 0) \
M(SettingUInt64, output_format_parquet_row_group_size, 1000000, "Row group size in rows.", 0) \
M(SettingString, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(SettingUInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
\
M(SettingBool, use_client_time_zone, false, "Use client timezone for interpreting DateTime string values, instead of adopting server timezone.", 0) \
\
@ -312,7 +318,6 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join", 0) \
M(SettingUInt64, default_max_bytes_in_join, 100000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
M(SettingUInt64, partial_merge_join_rows_in_left_blocks, 10000, "Group left-hand joining data in bigger blocks. Setting it to a bigger value increases JOIN performance and memory usage.", 0) \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \
@ -360,7 +365,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.", 0) \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only by 'mysql' and 'odbc' table functions.", 0) \
\
M(SettingBool, experimental_use_processors, false, "Use processors pipeline.", 0) \
M(SettingBool, experimental_use_processors, true, "Use processors pipeline.", 0) \
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.", 0) \
M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.", 0) \

View File

@ -62,7 +62,7 @@ void SettingNumber<Type>::set(const Field & x)
template <typename Type>
void SettingNumber<Type>::set(const String & x)
{
set(completeParse<Type>(x));
set(parseWithSizeSuffix<Type>(x));
}
template <>

View File

@ -31,7 +31,6 @@ enum class TypeIndex
Float64,
Date,
DateTime,
DateTime32 = DateTime,
DateTime64,
String,
FixedString,
@ -158,8 +157,6 @@ using Decimal32 = Decimal<Int32>;
using Decimal64 = Decimal<Int64>;
using Decimal128 = Decimal<Int128>;
// TODO (nemkov): consider making a strong typedef
//using DateTime32 = time_t;
using DateTime64 = Decimal64;
template <> struct TypeName<Decimal32> { static const char * get() { return "Decimal32"; } };

View File

@ -10,5 +10,6 @@
#cmakedefine01 USE_POCO_DATAODBC
#cmakedefine01 USE_POCO_MONGODB
#cmakedefine01 USE_POCO_REDIS
#cmakedefine01 USE_POCO_JSON
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 USE_SSL

View File

@ -66,6 +66,8 @@ struct BlockIO
finish_callback = rhs.finish_callback;
exception_callback = rhs.exception_callback;
null_format = rhs.null_format;
return *this;
}
};

View File

@ -44,4 +44,29 @@ Block ExpressionBlockInputStream::readImpl()
return res;
}
Block InflatingExpressionBlockInputStream::readImpl()
{
if (!initialized)
{
if (expression->resultIsAlwaysEmpty())
return {};
initialized = true;
}
Block res;
if (likely(!not_processed))
{
res = children.back()->read();
if (res)
expression->execute(res, not_processed, action_number);
}
else
{
res = std::move(not_processed->block);
expression->execute(res, not_processed, action_number);
}
return res;
}
}

View File

@ -15,10 +15,9 @@ class ExpressionActions;
*/
class ExpressionBlockInputStream : public IBlockInputStream
{
private:
public:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override;
@ -26,12 +25,29 @@ public:
Block getHeader() const override;
protected:
bool initialized = false;
ExpressionActionsPtr expression;
Block readImpl() override;
private:
ExpressionActionsPtr expression;
Block cached_header;
bool initialized = false;
};
/// ExpressionBlockInputStream that could generate many out blocks for single input block.
class InflatingExpressionBlockInputStream : public ExpressionBlockInputStream
{
public:
InflatingExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
: ExpressionBlockInputStream(input, expression_)
{}
protected:
Block readImpl() override;
private:
ExtraBlockPtr not_processed;
size_t action_number = 0;
};
}

View File

@ -12,5 +12,6 @@ class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
using BlockOutputStreams = std::vector<BlockOutputStreamPtr>;
}

View File

@ -7,6 +7,7 @@
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
#include <Disks/DiskSpaceMonitor.h>
namespace ProfileEvents
@ -21,10 +22,10 @@ namespace DB
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, size_t min_free_disk_space_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_),
min_free_disk_space(min_free_disk_space_)
{
children.push_back(input);
@ -78,10 +79,14 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
size_t size = sum_bytes_in_blocks + min_free_disk_space;
auto reservation = tmp_volume->reserve(size);
if (!reservation)
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
const std::string tmp_path(reservation->getDisk()->getPath());
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);

View File

@ -18,6 +18,9 @@ namespace DB
struct TemporaryFileStream;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
@ -77,7 +80,7 @@ public:
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
@ -97,7 +100,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
VolumePtr tmp_volume;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");

View File

@ -21,9 +21,19 @@ class NullAndDoCopyBlockInputStream : public IBlockInputStream
{
public:
NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_)
: input(input_), output(output_)
{
children.push_back(input_);
input_streams.push_back(input_);
output_streams.push_back(output_);
for (auto & input_stream : input_streams)
children.push_back(input_stream);
}
NullAndDoCopyBlockInputStream(const BlockInputStreams & input_, BlockOutputStreams & output_)
: input_streams(input_), output_streams(output_)
{
for (auto & input_stream : input_)
children.push_back(input_stream);
}
/// Suppress readPrefix and readSuffix, because they are called by copyData.
@ -39,13 +49,20 @@ public:
protected:
Block readImpl() override
{
copyData(*input, *output);
/// We do not use cancel flag here.
/// If query was cancelled, it will be processed by child streams.
/// Part of the data will be processed.
if (input_streams.size() == 1 && output_streams.size() == 1)
copyData(*input_streams.at(0), *output_streams.at(0));
else
copyData(input_streams, output_streams);
return Block();
}
private:
BlockInputStreamPtr input;
BlockOutputStreamPtr output;
BlockInputStreams input_streams;
BlockOutputStreams output_streams;
};
}

View File

@ -56,9 +56,24 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
query = materialized_view->getInnerQuery();
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = inner_table_id.database_name;
insert->table = inner_table_id.table_name;
/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, *views_context, SelectQueryOptions().analyze())
.getSampleBlock();
/// Insert only columns returned by select.
auto list = std::make_shared<ASTExpressionList>();
for (auto & column : header)
/// But skip columns which storage doesn't have.
if (inner_table->hasColumn(column.name))
list->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
insert->columns = std::move(list);
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, *views_context);
BlockIO io = interpreter.execute();

View File

@ -70,7 +70,7 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl)
Block TTLBlockInputStream::readImpl()
{
/// Skip all data if table ttl is expired for part
if (storage.hasTableTTL() && isTTLExpired(old_ttl_infos.table_ttl.max))
if (storage.hasRowsTTL() && isTTLExpired(old_ttl_infos.table_ttl.max))
{
rows_removed = data_part->rows_count;
return {};
@ -80,7 +80,7 @@ Block TTLBlockInputStream::readImpl()
if (!block)
return block;
if (storage.hasTableTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
removeRowsWithExpiredTableTTL(block);
removeValuesWithExpiredColumnTTL(block);
@ -106,10 +106,10 @@ void TTLBlockInputStream::readSuffixImpl()
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
{
storage.ttl_table_entry.expression->execute(block);
storage.rows_ttl_entry.expression->execute(block);
const IColumn * ttl_column =
block.getByName(storage.ttl_table_entry.result_column).column.get();
block.getByName(storage.rows_ttl_entry.result_column).column.get();
const auto & column_names = header.getNames();
MutableColumns result_columns;

View File

@ -1,6 +1,10 @@
#include <thread>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
namespace DB
@ -51,6 +55,79 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
inline void doNothing(const Block &) {}
namespace
{
struct ParallelInsertsHandler
{
using CencellationHook = std::function<void()>;
explicit ParallelInsertsHandler(BlockOutputStreams & output_streams, CencellationHook cancellation_hook_, size_t num_threads)
: outputs(output_streams.size()), cancellation_hook(std::move(cancellation_hook_))
{
exceptions.resize(num_threads);
for (auto & output : output_streams)
outputs.push(output.get());
}
void onBlock(Block & block, size_t /*thread_num*/)
{
IBlockOutputStream * out = nullptr;
outputs.pop(out);
out->write(block);
outputs.push(out);
}
void onFinishThread(size_t /*thread_num*/) {}
void onFinish() {}
void onException(std::exception_ptr & exception, size_t thread_num)
{
exceptions[thread_num] = exception;
cancellation_hook();
}
void rethrowFirstException()
{
for (auto & exception : exceptions)
if (exception)
std::rethrow_exception(exception);
}
ConcurrentBoundedQueue<IBlockOutputStream *> outputs;
std::vector<std::exception_ptr> exceptions;
CencellationHook cancellation_hook;
};
}
static void copyDataImpl(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{
for (auto & output : outputs)
output->writePrefix();
using Processor = ParallelInputsProcessor<ParallelInsertsHandler>;
Processor * processor_ptr = nullptr;
ParallelInsertsHandler handler(outputs, [&processor_ptr]() { processor_ptr->cancel(false); }, inputs.size());
ParallelInputsProcessor<ParallelInsertsHandler> processor(inputs, nullptr, inputs.size(), handler);
processor_ptr = &processor;
processor.process();
processor.wait();
handler.rethrowFirstException();
/// readPrefix is called in ParallelInputsProcessor.
for (auto & input : inputs)
input->readSuffix();
for (auto & output : outputs)
output->writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
@ -61,6 +138,10 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
copyDataImpl(from, to, is_cancelled_pred, doNothing);
}
void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{
copyDataImpl(inputs, outputs);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{

View File

@ -16,6 +16,8 @@ class Block;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,

View File

@ -60,7 +60,7 @@ std::ostream & operator<<(std::ostream & ostr, const TypesTestCase & test_case)
class TypeTest : public ::testing::TestWithParam<TypesTestCase>
{
public:
void SetUp()
void SetUp() override
{
const auto & p = GetParam();
from_types = typesFromString(p.from_types);

View File

@ -9,6 +9,7 @@ namespace DB
DatabaseMemory::DatabaseMemory(const String & name_)
: DatabaseWithOwnTablesBase(name_, "DatabaseMemory(" + name_ + ")")
, data_path("data/" + escapeForFileName(database_name) + "/")
{}
void DatabaseMemory::createTable(

View File

@ -1,6 +1,8 @@
#pragma once
#include <Databases/DatabasesCommon.h>
#include <Common/escapeForFileName.h>
#include <Parsers/ASTCreateQuery.h>
namespace Poco { class Logger; }
@ -32,6 +34,16 @@ public:
const String & table_name) override;
ASTPtr getCreateDatabaseQuery(const Context & /*context*/) const override;
/// DatabaseMemory allows to create tables, which store data on disk.
/// It's needed to create such tables in default database of clickhouse-local.
/// TODO May be it's better to use DiskMemory for such tables.
/// To save data on disk it's possible to explicitly CREATE DATABASE db ENGINE=Ordinary in clickhouse-local.
String getTableDataPath(const String & table_name) const override { return data_path + escapeForFileName(table_name) + "/"; }
String getTableDataPath(const ASTCreateQuery & query) const override { return getTableDataPath(query.table); }
private:
String data_path;
};
}

View File

@ -56,13 +56,13 @@ public:
DatabaseTablesSnapshotIterator(Tables && tables_) : tables(tables_), it(tables.begin()) {}
void next() { ++it; }
void next() override { ++it; }
bool isValid() const { return it != tables.end(); }
bool isValid() const override { return it != tables.end(); }
const String & name() const { return it->first; }
const String & name() const override { return it->first; }
const StoragePtr & table() const { return it->second; }
const StoragePtr & table() const override { return it->second; }
};
/// Copies list of dictionaries and iterates through such snapshot.

View File

@ -23,6 +23,8 @@ namespace DB
class Context;
/** Create dictionary according to its layout.
*/
class DictionaryFactory : private boost::noncopyable
{
public:

View File

@ -111,6 +111,12 @@ Volume::Volume(
<< " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")");
}
DiskPtr Volume::getNextDisk()
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t index = start_from % disks.size();
return disks[index];
}
ReservationPtr Volume::reserve(UInt64 expected_size)
{

View File

@ -67,6 +67,13 @@ public:
const String & config_prefix,
const DiskSelector & disk_selector);
/// Next disk (round-robin)
///
/// - Used with policy for temporary data
/// - Ignores all limitations
/// - Shares last access with reserve()
DiskPtr getNextDisk();
/// Uses Round-robin to choose disk for reservation.
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) override;

View File

@ -5,6 +5,10 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#if !__clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsuggest-override"
#endif
template <typename T>
DB::DiskPtr createDisk();

View File

@ -68,6 +68,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.schema_registry_url = settings.input_format_avro_schema_registry_url;
return format_settings;
}
@ -99,6 +100,8 @@ static FormatSettings getOutputFormatSetting(const Settings & settings, const Co
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.output_codec = settings.output_format_avro_codec;
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
return format_settings;
}
@ -325,6 +328,8 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorORC(*this);
registerInputFormatProcessorParquet(*this);
registerOutputFormatProcessorParquet(*this);
registerInputFormatProcessorAvro(*this);
registerOutputFormatProcessorAvro(*this);
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);

View File

@ -166,6 +166,8 @@ void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory &factory);

View File

@ -110,6 +110,16 @@ struct FormatSettings
};
Custom custom;
struct Avro
{
String schema_registry_url;
String output_codec;
UInt64 output_sync_interval = 16 * 1024;
};
Avro avro;
};
}

View File

@ -16,11 +16,11 @@ namespace ErrorCodes
ParsedTemplateFormatString::ParsedTemplateFormatString(const FormatSchemaInfo & schema, const ColumnIdxGetter & idx_by_name)
{
ReadBufferFromFile schema_file(schema.absoluteSchemaPath(), 4096);
String format_string;
readStringUntilEOF(format_string, schema_file);
try
{
ReadBufferFromFile schema_file(schema.absoluteSchemaPath(), 4096);
String format_string;
readStringUntilEOF(format_string, schema_file);
parse(format_string, idx_by_name);
}
catch (DB::Exception & e)
@ -193,7 +193,7 @@ const char * ParsedTemplateFormatString::readMayBeQuotedColumnNameInto(const cha
String ParsedTemplateFormatString::dump() const
{
WriteBufferFromOwnString res;
res << "Delimiter " << 0 << ": ";
res << "\nDelimiter " << 0 << ": ";
verbosePrintString(delimiters.front().c_str(), delimiters.front().c_str() + delimiters.front().size(), res);
size_t num_columns = std::max(formats.size(), format_idx_to_column_idx.size());

View File

@ -2,6 +2,7 @@
// .h autogenerated by cmake!
#cmakedefine01 USE_AVRO
#cmakedefine01 USE_CAPNP
#cmakedefine01 USE_SNAPPY
#cmakedefine01 USE_PARQUET

View File

@ -121,6 +121,11 @@ struct NumericArraySource : public ArraySourceImpl<NumericArraySource<T>>
}
};
#if !__clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsuggest-override"
#endif
template <typename Base>
struct ConstSource : public Base
{
@ -199,6 +204,10 @@ struct ConstSource : public Base
}
};
#if !__clang__
#pragma GCC diagnostic pop
#endif
struct StringSource
{
using Slice = NumericArraySlice<UInt8>;

View File

@ -746,6 +746,23 @@ inline void readBinary(Decimal128 & x, ReadBuffer & buf) { readPODBinary(x, buf)
inline void readBinary(LocalDate & x, ReadBuffer & buf) { readPODBinary(x, buf); }
template <typename T>
inline std::enable_if_t<is_arithmetic_v<T> && (sizeof(T) <= 8), void>
readBinaryBigEndian(T & x, ReadBuffer & buf) /// Assuming little endian architecture.
{
readPODBinary(x, buf);
if constexpr (sizeof(x) == 1)
return;
else if constexpr (sizeof(x) == 2)
x = __builtin_bswap16(x);
else if constexpr (sizeof(x) == 4)
x = __builtin_bswap32(x);
else if constexpr (sizeof(x) == 8)
x = __builtin_bswap64(x);
}
/// Generic methods to read value in text tab-separated format.
template <typename T>
inline std::enable_if_t<is_integral_v<T>, void>
@ -955,28 +972,78 @@ inline T parse(const char * data, size_t size)
return res;
}
/// Read something from text format, but expect complete parse of given text
/// For example: 723145 -- ok, 213MB -- not ok
template <typename T>
inline T completeParse(const char * data, size_t size)
inline std::enable_if_t<!is_integral_v<T>, void>
readTextWithSizeSuffix(T & x, ReadBuffer & buf) { readText(x, buf); }
template <typename T>
inline std::enable_if_t<is_integral_v<T>, void>
readTextWithSizeSuffix(T & x, ReadBuffer & buf)
{
readIntText(x, buf);
if (buf.eof())
return;
/// Updates x depending on the suffix
auto finish = [&buf, &x] (UInt64 base, int power_of_two) mutable
{
++buf.position();
if (buf.eof())
{
x *= base; /// For decimal suffixes, such as k, M, G etc.
}
else if (*buf.position() == 'i')
{
x = (x << power_of_two); /// For binary suffixes, such as ki, Mi, Gi, etc.
++buf.position();
}
return;
};
switch (*buf.position())
{
case 'k': [[fallthrough]];
case 'K':
finish(1000, 10);
break;
case 'M':
finish(1000000, 20);
break;
case 'G':
finish(1000000000, 30);
break;
case 'T':
finish(1000000000000ULL, 40);
break;
default:
return;
}
return;
}
/// Read something from text format and trying to parse the suffix.
/// If the suffix is not valid gives an error
/// For example: 723145 -- ok, 213MB -- not ok, but 213Mi -- ok
template <typename T>
inline T parseWithSizeSuffix(const char * data, size_t size)
{
T res;
ReadBufferFromMemory buf(data, size);
readText(res, buf);
readTextWithSizeSuffix(res, buf);
assertEOF(buf);
return res;
}
template <typename T>
inline T completeParse(const String & s)
inline T parseWithSizeSuffix(const String & s)
{
return completeParse<T>(s.data(), s.size());
return parseWithSizeSuffix<T>(s.data(), s.size());
}
template <typename T>
inline T completeParse(const char * data)
inline T parseWithSizeSuffix(const char * data)
{
return completeParse<T>(data, strlen(data));
return parseWithSizeSuffix<T>(data, strlen(data));
}
template <typename T>

View File

@ -29,7 +29,7 @@ public:
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
/// Receives response from the server after sending all data.
void finalize();
void finalize() override;
};
}

View File

@ -101,7 +101,13 @@ inline bool readDigits(ReadBuffer & buf, T & x, unsigned int & digits, int & exp
{
++buf.position();
Int32 addition_exp = 0;
readIntText(addition_exp, buf);
if (!tryReadIntText(addition_exp, buf))
{
if constexpr (_throw_on_error)
throw Exception("Cannot parse exponent while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}
exponent += addition_exp;
stop = true;
continue;

View File

@ -28,6 +28,7 @@
#include <common/config_common.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Disks/DiskSpaceMonitor.h>
namespace ProfileEvents
@ -681,22 +682,25 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
if (!enoughSpaceInDirectory(params.tmp_path, current_memory_usage + params.min_free_disk_space))
throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
size_t size = current_memory_usage + params.min_free_disk_space;
auto reservation = params.tmp_volume->reserve(size);
if (!reservation)
throw Exception("Not enough space for external aggregation in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
writeToTemporaryFile(result);
const std::string tmp_path(reservation->getDisk()->getPath());
writeToTemporaryFile(result, tmp_path);
}
return true;
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path)
{
Stopwatch watch;
size_t rows = data_variants.size();
auto file = createTemporaryFile(params.tmp_path);
auto file = createTemporaryFile(tmp_path);
const std::string & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
@ -753,6 +757,10 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
<< (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, "
<< (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)");
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
return writeToTemporaryFile(data_variants, params.tmp_volume->getNextDisk()->getPath());
}
template <typename Method>

View File

@ -46,6 +46,8 @@ namespace ErrorCodes
class IBlockOutputStream;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
@ -860,7 +862,7 @@ public:
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
const std::string tmp_path;
VolumePtr tmp_volume;
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
@ -873,7 +875,7 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
const std::string & tmp_path_, size_t max_threads_,
VolumePtr tmp_volume_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
@ -881,7 +883,7 @@ public:
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_path(tmp_path_), max_threads(max_threads_),
tmp_volume(tmp_volume_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_)
{
}
@ -889,7 +891,7 @@ public:
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0)
{
intermediate_header = intermediate_header_;
}
@ -955,6 +957,7 @@ public:
void setCancellationHook(const CancellationHook cancellation_hook);
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
bool hasTemporaryFiles() const { return !temporary_files.empty(); }

View File

@ -19,14 +19,15 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
}
AnalyzedJoin::AnalyzedJoin(const Settings & settings, const String & tmp_path_)
AnalyzedJoin::AnalyzedJoin(const Settings & settings, VolumePtr tmp_volume_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)
, max_joined_block_rows(settings.max_joined_block_size_rows)
, partial_merge_join(settings.partial_merge_join)
, partial_merge_join_optimizations(settings.partial_merge_join_optimizations)
, partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks)
, tmp_path(tmp_path_)
, tmp_volume(tmp_volume_)
{}
void AnalyzedJoin::addUsingKey(const ASTPtr & ast)

View File

@ -21,6 +21,9 @@ class Block;
struct Settings;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class AnalyzedJoin
{
/** Query of the form `SELECT expr(x) AS k FROM t1 ANY LEFT JOIN (SELECT expr(x) AS k FROM t2) USING k`
@ -40,6 +43,7 @@ class AnalyzedJoin
const SizeLimits size_limits;
const size_t default_max_bytes;
const bool join_use_nulls;
const size_t max_joined_block_rows = 0;
const bool partial_merge_join = false;
const bool partial_merge_join_optimizations = false;
const size_t partial_merge_join_rows_in_right_blocks = 0;
@ -61,10 +65,10 @@ class AnalyzedJoin
/// Original name -> name. Only ranamed columns.
std::unordered_map<String, String> renames;
String tmp_path;
VolumePtr tmp_volume;
public:
AnalyzedJoin(const Settings &, const String & tmp_path);
AnalyzedJoin(const Settings &, VolumePtr tmp_volume);
/// for StorageJoin
AnalyzedJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness,
@ -81,11 +85,12 @@ public:
ASTTableJoin::Kind kind() const { return table_join.kind; }
ASTTableJoin::Strictness strictness() const { return table_join.strictness; }
const SizeLimits & sizeLimits() const { return size_limits; }
const String & getTemporaryPath() const { return tmp_path; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }
bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); }
size_t defaultMaxBytes() const { return default_max_bytes; }
size_t maxJoinedBlockRows() const { return max_joined_block_rows; }
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; }

View File

@ -22,6 +22,7 @@
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <Disks/DiskLocal.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
#include <Core/Settings.h>
@ -95,6 +96,7 @@ namespace ErrorCodes
extern const int SCALAR_ALREADY_EXISTS;
extern const int UNKNOWN_SCALAR;
extern const int NOT_ENOUGH_PRIVILEGES;
extern const int UNKNOWN_POLICY;
}
@ -123,12 +125,14 @@ struct ContextShared
String interserver_scheme; /// http or https
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
String flags_path; /// Path to the directory with some control flags for server maintenance.
String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries.
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
Databases databases; /// List of databases and tables in them.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
@ -151,9 +155,9 @@ struct ContextShared
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage disk chooser
/// Storage disk chooser for MergeTree engines
mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
/// Storage policy chooser
/// Storage policy chooser for MergeTree engines
mutable std::unique_ptr<StoragePolicySelector> merge_tree_storage_policy_selector;
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
@ -527,12 +531,6 @@ String Context::getPath() const
return shared->path;
}
String Context::getTemporaryPath() const
{
auto lock = getLock();
return shared->tmp_path;
}
String Context::getFlagsPath() const
{
auto lock = getLock();
@ -551,13 +549,19 @@ String Context::getDictionariesLibPath() const
return shared->dictionaries_lib_path;
}
VolumePtr Context::getTemporaryVolume() const
{
auto lock = getLock();
return shared->tmp_volume;
}
void Context::setPath(const String & path)
{
auto lock = getLock();
shared->path = path;
if (shared->tmp_path.empty())
if (shared->tmp_path.empty() && !shared->tmp_volume)
shared->tmp_path = shared->path + "tmp/";
if (shared->flags_path.empty())
@ -570,10 +574,31 @@ void Context::setPath(const String & path)
shared->dictionaries_lib_path = shared->path + "dictionaries_lib/";
}
void Context::setTemporaryPath(const String & path)
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
{
auto lock = getLock();
shared->tmp_path = path;
if (policy_name.empty())
{
shared->tmp_path = path;
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<Volume>("_tmp_default", std::vector<DiskPtr>{disk}, 0);
}
else
{
StoragePolicyPtr tmp_policy = getStoragePolicySelector()[policy_name];
if (tmp_policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
shared->tmp_volume = tmp_policy->getVolume(0);
}
if (!shared->tmp_volume->disks.size())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return shared->tmp_volume;
}
void Context::setFlagsPath(const String & path)

View File

@ -91,6 +91,9 @@ class StoragePolicySelector;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
#if USE_EMBEDDED_COMPILER
class CompiledExpressionCache;
@ -195,17 +198,19 @@ public:
~Context();
String getPath() const;
String getTemporaryPath() const;
String getFlagsPath() const;
String getUserFilesPath() const;
String getDictionariesLibPath() const;
VolumePtr getTemporaryVolume() const;
void setPath(const String & path);
void setTemporaryPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);
void setDictionariesLibPath(const String & path);
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Global application configuration settings.

View File

@ -346,7 +346,7 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
}
void ExpressionAction::execute(Block & block, bool dry_run) const
void ExpressionAction::execute(Block & block, bool dry_run, ExtraBlockPtr & not_processed) const
{
size_t input_rows_count = block.rows();
@ -477,7 +477,7 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
case JOIN:
{
join->joinBlock(block);
join->joinBlock(block, not_processed);
break;
}
@ -762,6 +762,21 @@ void ExpressionActions::execute(Block & block, bool dry_run) const
}
}
/// @warning It's a tricky method that allows to continue ONLY ONE action in reason of one-to-many ALL JOIN logic.
void ExpressionActions::execute(Block & block, ExtraBlockPtr & not_processed, size_t & start_action) const
{
size_t i = start_action;
start_action = 0;
for (; i < actions.size(); ++i)
{
actions[i].execute(block, false, not_processed);
checkLimits(block);
if (not_processed)
start_action = i;
}
}
bool ExpressionActions::hasTotalsInJoin() const
{
for (const auto & action : actions)

View File

@ -138,8 +138,16 @@ private:
friend class ExpressionActions;
void prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding);
void execute(Block & block, bool dry_run) const;
void executeOnTotals(Block & block) const;
/// Executes action on block (modify it). Block could be splitted in case of JOIN. Then not_processed block is created.
void execute(Block & block, bool dry_run, ExtraBlockPtr & not_processed) const;
void execute(Block & block, bool dry_run) const
{
ExtraBlockPtr extra;
execute(block, dry_run, extra);
}
};
@ -221,6 +229,9 @@ public:
/// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns.
void execute(Block & block, bool dry_run = false) const;
/// Execute the expression on the block with continuation.
void execute(Block & block, ExtraBlockPtr & not_processed, size_t & start_action) const;
/// Check if joined subquery has totals.
bool hasTotalsInJoin() const;

View File

@ -540,6 +540,7 @@ public:
Strings getAllTriedToLoadNames() const
{
std::lock_guard lock{mutex};
Strings names;
for (auto & [name, info] : infos)
if (info.triedToLoad())

View File

@ -11,6 +11,7 @@ namespace DB
{
class Block;
struct ExtraBlock;
class IJoin
{
@ -23,7 +24,7 @@ public:
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block) = 0;
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) = 0;
virtual bool hasTotals() const = 0;
virtual void setTotals(const Block & block) = 0;

View File

@ -96,63 +96,95 @@ Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const
BlockIO InterpreterInsertQuery::execute()
{
const Settings & settings = context.getSettingsRef();
const auto & query = query_ptr->as<ASTInsertQuery &>();
checkAccess(query);
BlockIO res;
StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
/// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out;
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)
out = table->write(query_ptr, context);
else
out = std::make_shared<PushingToViewsBlockOutputStream>(table, context, query_ptr, no_destination);
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash)
{
out = std::make_shared<SquashingBlockOutputStream>(
out, out->getHeader(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
}
auto query_sample_block = getSampleBlock(query, table);
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table,
out, query_sample_block, table->getConstraints(), context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
out = std::move(out_wrapper);
BlockIO res;
/// What type of query: INSERT or INSERT SELECT?
BlockInputStreams in_streams;
size_t out_streams_size = 1;
if (query.select)
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
/// BlockIO may hold StoragePtrs to temporary tables
res = interpreter_select.execute();
res.out = nullptr;
if (table->supportsParallelInsert() && settings.max_insert_threads > 0)
{
in_streams = interpreter_select.executeWithMultipleStreams(res.pipeline);
out_streams_size = std::min(size_t(settings.max_insert_threads), in_streams.size());
}
else
{
res = interpreter_select.execute();
in_streams.emplace_back(res.in);
res.in = nullptr;
res.out = nullptr;
}
}
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);
BlockOutputStreams out_streams;
auto query_sample_block = getSampleBlock(query, table);
for (size_t i = 0; i < out_streams_size; i++)
{
/// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out;
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)
out = table->write(query_ptr, context);
else
out = std::make_shared<PushingToViewsBlockOutputStream>(table, context, query_ptr, no_destination);
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash)
{
out = std::make_shared<SquashingBlockOutputStream>(
out, out->getHeader(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
}
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table,
out, query_sample_block, table->getConstraints(), context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
out = std::move(out_wrapper);
out_streams.emplace_back(std::move(out));
}
/// What type of query: INSERT or INSERT SELECT?
if (query.select)
{
for (auto & in_stream : in_streams)
{
in_stream = std::make_shared<ConvertingBlockInputStream>(
context, in_stream, out_streams.at(0)->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
}
Block in_header = in_streams.at(0)->getHeader();
if (in_streams.size() > 1)
{
for (size_t i = 1; i < in_streams.size(); ++i)
assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, "INSERT SELECT");
}
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in_streams, out_streams);
if (!allow_materialized)
{
Block in_header = res.in->getHeader();
for (const auto & column : table->getColumns())
if (column.default_desc.kind == ColumnDefaultKind::Materialized && in_header.has(column.name))
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
@ -160,12 +192,12 @@ BlockIO InterpreterInsertQuery::execute()
}
else if (query.data && !query.has_tail) /// can execute without additional data
{
// res.out = std::move(out_streams.at(0));
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context, nullptr);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out_streams.at(0));
}
else
res.out = std::move(out);
res.out = std::move(out_streams.at(0));
res.pipeline.addStorageHolder(table);
return res;

View File

@ -26,7 +26,6 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -75,6 +74,7 @@
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
@ -1104,7 +1104,12 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType type)
{
bool on_totals = type == QueryPipeline::StreamType::Totals;
return std::make_shared<ExpressionTransform>(header, expressions.before_join, on_totals, default_totals);
std::shared_ptr<IProcessor> ret;
if (settings.partial_merge_join)
ret = std::make_shared<InflatingExpressionTransform>(header, expressions.before_join, on_totals, default_totals);
else
ret = std::make_shared<ExpressionTransform>(header, expressions.before_join, on_totals, default_totals);
return ret;
});
}
else
@ -1112,14 +1117,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
header_before_join = pipeline.firstStream()->getHeader();
/// Applies to all sources except stream_with_non_joined_data.
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimizations)
{
if (size_t rows_in_block = settings.partial_merge_join_rows_in_left_blocks)
for (auto & stream : pipeline.streams)
stream = std::make_shared<SquashingBlockInputStream>(stream, rows_in_block, 0, true);
}
stream = std::make_shared<InflatingExpressionBlockInputStream>(stream, expressions.before_join);
}
if (JoinPtr join = expressions.before_join->getTableJoinAlgo())
@ -1873,7 +1871,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
@ -1939,7 +1937,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
@ -2165,7 +2163,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
if (modificator == Modificator::ROLLUP)
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
@ -2194,7 +2192,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
@ -2278,7 +2276,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoP
sorting_stream, output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort / pipeline.streams.size(),
context->getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.min_free_disk_space_for_temporary_data);
stream = merging_stream;
});
@ -2360,7 +2358,8 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
return std::make_shared<MergeSortingTransform>(
header, output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(),
settings.max_bytes_before_external_sort, context->getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
settings.max_bytes_before_external_sort, context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
});
/// If there are several streams, we merge them into one

View File

@ -1091,7 +1091,7 @@ void Join::joinGet(Block & block, const String & column_name) const
}
void Join::joinBlock(Block & block)
void Join::joinBlock(Block & block, ExtraBlockPtr &)
{
std::shared_lock lock(data->rwlock);

View File

@ -158,7 +158,7 @@ public:
/** Join data from the map (that was previously built by calls to addJoinedBlock) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
void joinBlock(Block & block) override;
void joinBlock(Block & block, ExtraBlockPtr & not_processed) override;
/// Infer the return type for joinGet function
DataTypePtr joinGetReturnType(const String & column_name) const;

View File

@ -13,6 +13,7 @@
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <Disks/DiskSpaceMonitor.h>
namespace DB
{
@ -294,36 +295,56 @@ void joinEqualsAnyLeft(const Block & right_block, const Block & right_columns_to
copyRightRange(right_block, right_columns_to_add, right_columns, range.right_start, range.left_length);
}
void joinEquals(const Block & left_block, const Block & right_block, const Block & right_columns_to_add,
MutableColumns & left_columns, MutableColumns & right_columns, const Range & range, bool is_all)
template <bool is_all>
bool joinEquals(const Block & left_block, const Block & right_block, const Block & right_columns_to_add,
MutableColumns & left_columns, MutableColumns & right_columns, Range & range, size_t max_rows [[maybe_unused]])
{
size_t left_rows_to_add = range.left_length;
size_t right_rows_to_add = is_all ? range.right_length : 1;
bool one_more = true;
size_t row_position = range.right_start;
for (size_t right_row = 0; right_row < right_rows_to_add; ++right_row, ++row_position)
if constexpr (is_all)
{
copyLeftRange(left_block, left_columns, range.left_start, left_rows_to_add);
copyRightRange(right_block, right_columns_to_add, right_columns, row_position, left_rows_to_add);
size_t range_rows = range.left_length * range.right_length;
if (range_rows > max_rows)
{
/// We need progress. So we join at least one right row.
range.right_length = max_rows / range.left_length;
if (!range.right_length)
range.right_length = 1;
one_more = false;
}
size_t left_rows_to_add = range.left_length;
size_t row_position = range.right_start;
for (size_t right_row = 0; right_row < range.right_length; ++right_row, ++row_position)
{
copyLeftRange(left_block, left_columns, range.left_start, left_rows_to_add);
copyRightRange(right_block, right_columns_to_add, right_columns, row_position, left_rows_to_add);
}
}
else
{
size_t left_rows_to_add = range.left_length;
copyLeftRange(left_block, left_columns, range.left_start, left_rows_to_add);
copyRightRange(right_block, right_columns_to_add, right_columns, range.right_start, left_rows_to_add);
}
return one_more;
}
void appendNulls(MutableColumns & right_columns, size_t rows_to_add)
{
for (auto & column : right_columns)
column->insertManyDefaults(rows_to_add);
}
template <bool copy_left>
void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, MutableColumns & right_columns,
size_t start, size_t end, bool copy_left)
size_t start, size_t end)
{
if (end <= start)
return;
size_t rows_to_add = end - start;
if (copy_left)
if constexpr (copy_left)
copyLeftRange(left_block, left_columns, start, rows_to_add);
appendNulls(right_columns, rows_to_add);
/// append nulls
for (auto & column : right_columns)
column->insertManyDefaults(rows_to_add);
}
Blocks blocksListToBlocks(const BlocksList & in_blocks)
@ -386,6 +407,8 @@ void MiniLSM::insert(const BlocksList & blocks)
if (blocks.empty())
return;
const std::string path(volume->getNextDisk()->getPath());
SortedFiles sorted_blocks;
if (blocks.size() > 1)
{
@ -414,6 +437,7 @@ void MiniLSM::merge(std::function<void(const Block &)> callback)
BlockInputStreams inputs = makeSortedInputStreams(sorted_files, sample_block);
MergingSortedBlockInputStream sorted_stream(inputs, sort_description, rows_in_block);
const std::string path(volume->getNextDisk()->getPath());
SortedFiles out;
flushStreamToFiles(path, sample_block, sorted_stream, out, callback);
@ -427,10 +451,11 @@ MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & ri
, size_limits(table_join->sizeLimits())
, right_sample_block(right_sample_block_)
, nullable_right_side(table_join->forceNullableRight())
, is_all(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_all_join(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind()))
, skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
, max_joined_block_rows(table_join->maxJoinedBlockRows())
, max_rows_in_right_block(table_join->maxRowsInRightBlock())
{
if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
@ -463,7 +488,7 @@ MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & ri
makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
lsm = std::make_unique<MiniLSM>(table_join->getTemporaryPath(), right_sample_block, right_sort_description, max_rows_in_right_block);
lsm = std::make_unique<MiniLSM>(table_join->getTemporaryVolume(), right_sample_block, right_sort_description, max_rows_in_right_block);
}
void MergeJoin::setTotals(const Block & totals_block)
@ -567,7 +592,7 @@ bool MergeJoin::addJoinedBlock(const Block & src_block)
return saveRightBlock(std::move(block));
}
void MergeJoin::joinBlock(Block & block)
void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight());
materializeBlockInplace(block);
@ -575,13 +600,23 @@ void MergeJoin::joinBlock(Block & block)
sortBlock(block, left_sort_description);
if (is_in_memory)
joinSortedBlock<true>(block);
{
if (is_all_join)
joinSortedBlock<true, true>(block, not_processed);
else
joinSortedBlock<true, false>(block, not_processed);
}
else
joinSortedBlock<false>(block);
{
if (is_all_join)
joinSortedBlock<false, true>(block, not_processed);
else
joinSortedBlock<false, false>(block, not_processed);
}
}
template <bool in_memory>
void MergeJoin::joinSortedBlock(Block & block)
template <bool in_memory, bool is_all>
void MergeJoin::joinSortedBlock(Block & block, ExtraBlockPtr & not_processed)
{
std::shared_lock lock(rwlock);
@ -590,11 +625,22 @@ void MergeJoin::joinSortedBlock(Block & block)
MutableColumns right_columns = makeMutableColumns(right_columns_to_add, rows_to_reserve);
MergeJoinCursor left_cursor(block, left_merge_description);
size_t left_key_tail = 0;
size_t skip_right = 0;
size_t right_blocks_count = rightBlocksCount<in_memory>();
size_t starting_right_block = 0;
if (not_processed)
{
auto & continuation = static_cast<NotProcessed &>(*not_processed);
left_cursor.nextN(continuation.left_position);
skip_right = continuation.right_position;
starting_right_block = continuation.right_block;
not_processed.reset();
}
if (is_left)
{
for (size_t i = 0; i < right_blocks_count; ++i)
for (size_t i = starting_right_block; i < right_blocks_count; ++i)
{
if (left_cursor.atEnd())
break;
@ -610,11 +656,16 @@ void MergeJoin::joinSortedBlock(Block & block)
std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
leftJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
if (!leftJoin<is_all>(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail, skip_right))
{
not_processed = extraBlock<is_all>(block, std::move(left_columns), std::move(right_columns),
left_cursor.position(), skip_right, i);
return;
}
}
left_cursor.nextN(left_key_tail);
joinInequalsLeft(block, left_columns, right_columns, left_cursor.position(), left_cursor.end(), is_all);
joinInequalsLeft<is_all>(block, left_columns, right_columns, left_cursor.position(), left_cursor.end());
//left_cursor.nextN(left_cursor.end() - left_cursor.position());
changeLeftColumns(block, std::move(left_columns));
@ -622,7 +673,7 @@ void MergeJoin::joinSortedBlock(Block & block)
}
else if (is_inner)
{
for (size_t i = 0; i < right_blocks_count; ++i)
for (size_t i = starting_right_block; i < right_blocks_count; ++i)
{
if (left_cursor.atEnd())
break;
@ -638,7 +689,12 @@ void MergeJoin::joinSortedBlock(Block & block)
std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
innerJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
if (!innerJoin<is_all>(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail, skip_right))
{
not_processed = extraBlock<is_all>(block, std::move(left_columns), std::move(right_columns),
left_cursor.position(), skip_right, i);
return;
}
}
left_cursor.nextN(left_key_tail);
@ -647,12 +703,30 @@ void MergeJoin::joinSortedBlock(Block & block)
}
}
void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
static size_t maxRangeRows(size_t current_rows, size_t max_rows)
{
if (!max_rows)
return std::numeric_limits<size_t>::max();
if (current_rows >= max_rows)
return 0;
return max_rows - current_rows;
}
template <bool is_all>
bool MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail,
size_t & skip_right [[maybe_unused]])
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
left_cursor.setCompareNullability(right_cursor);
/// Set right cursor position in first continuation right block
if constexpr (is_all)
{
right_cursor.nextN(skip_right);
skip_right = 0;
}
while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
/// Not zero left_key_tail means there were equality for the last left key in previous leftJoin() call.
@ -662,56 +736,97 @@ void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block
Range range = left_cursor.getNextEqualRange(right_cursor);
joinInequalsLeft(left_block, left_columns, right_columns, left_unequal_position, range.left_start, is_all);
joinInequalsLeft<is_all>(left_block, left_columns, right_columns, left_unequal_position, range.left_start);
if (range.empty())
break;
if (is_all)
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
if constexpr (is_all)
{
size_t max_rows = maxRangeRows(left_columns[0]->size(), max_joined_block_rows);
if (!joinEquals<true>(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, max_rows))
{
right_cursor.nextN(range.right_length);
skip_right = right_cursor.position();
return false;
}
}
else
joinEqualsAnyLeft(right_block, right_columns_to_add, right_columns, range);
right_cursor.nextN(range.right_length);
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
if (is_all && right_cursor.atEnd())
if constexpr (is_all)
{
left_key_tail = range.left_length;
break;
if (right_cursor.atEnd())
{
left_key_tail = range.left_length;
break;
}
}
left_cursor.nextN(range.left_length);
}
return true;
}
void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
template <bool is_all>
bool MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail,
size_t & skip_right [[maybe_unused]])
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
left_cursor.setCompareNullability(right_cursor);
/// Set right cursor position in first continuation right block
if constexpr (is_all)
{
right_cursor.nextN(skip_right);
skip_right = 0;
}
while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
Range range = left_cursor.getNextEqualRange(right_cursor);
if (range.empty())
break;
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
if constexpr (is_all)
{
size_t max_rows = maxRangeRows(left_columns[0]->size(), max_joined_block_rows);
if (!joinEquals<true>(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, max_rows))
{
right_cursor.nextN(range.right_length);
skip_right = right_cursor.position();
return false;
}
}
else
joinEquals<false>(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, 0);
right_cursor.nextN(range.right_length);
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
if (is_all && right_cursor.atEnd())
if constexpr (is_all)
{
left_key_tail = range.left_length;
break;
if (right_cursor.atEnd())
{
left_key_tail = range.left_length;
break;
}
}
left_cursor.nextN(range.left_length);
}
return true;
}
void MergeJoin::changeLeftColumns(Block & block, MutableColumns && columns)
{
if (is_left && !is_all)
if (is_left && !is_all_join)
return;
block.setColumns(std::move(columns));
}
@ -725,6 +840,27 @@ void MergeJoin::addRightColumns(Block & block, MutableColumns && right_columns)
}
}
/// Split block into processed (result) and not processed. Not processed block would be joined next time.
template <bool is_all>
ExtraBlockPtr MergeJoin::extraBlock(Block & processed, MutableColumns && left_columns, MutableColumns && right_columns,
size_t left_position [[maybe_unused]], size_t right_position [[maybe_unused]],
size_t right_block_number [[maybe_unused]])
{
ExtraBlockPtr not_processed;
if constexpr (is_all)
{
not_processed = std::make_shared<NotProcessed>(
NotProcessed{{processed.cloneEmpty()}, left_position, right_position, right_block_number});
not_processed->block.swap(processed);
changeLeftColumns(processed, std::move(left_columns));
addRightColumns(processed, std::move(right_columns));
}
return not_processed;
}
template <bool in_memory>
size_t MergeJoin::rightBlocksCount()
{

View File

@ -17,20 +17,23 @@ class AnalyzedJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
struct MiniLSM
{
using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>;
const String & path;
VolumePtr volume;
const Block & sample_block;
const SortDescription & sort_description;
const size_t rows_in_block;
const size_t max_size;
std::vector<SortedFiles> sorted_files;
MiniLSM(const String & path_, const Block & sample_block_, const SortDescription & description,
MiniLSM(VolumePtr volume_, const Block & sample_block_, const SortDescription & description,
size_t rows_in_block_, size_t max_size_ = 16)
: path(path_)
: volume(volume_)
, sample_block(sample_block_)
, sort_description(description)
, rows_in_block(rows_in_block_)
@ -48,13 +51,20 @@ public:
MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block);
bool addJoinedBlock(const Block & block) override;
void joinBlock(Block &) override;
void joinBlock(Block &, ExtraBlockPtr & not_processed) override;
void joinTotals(Block &) const override;
void setTotals(const Block &) override;
bool hasTotals() const override { return totals; }
size_t getTotalRowCount() const override { return right_blocks_row_count; }
private:
struct NotProcessed : public ExtraBlock
{
size_t left_position;
size_t right_position;
size_t right_block;
};
/// There're two size limits for right-hand table: max_rows_in_join, max_bytes_in_join.
/// max_bytes is prefered. If it isn't set we approximate it as (max_rows * bytes/row).
struct BlockByteWeight
@ -85,28 +95,35 @@ private:
size_t right_blocks_bytes = 0;
bool is_in_memory = true;
const bool nullable_right_side;
const bool is_all;
const bool is_all_join;
const bool is_inner;
const bool is_left;
const bool skip_not_intersected;
const size_t max_joined_block_rows;
const size_t max_rows_in_right_block;
void changeLeftColumns(Block & block, MutableColumns && columns);
void addRightColumns(Block & block, MutableColumns && columns);
template <bool is_all>
ExtraBlockPtr extraBlock(Block & processed, MutableColumns && left_columns, MutableColumns && right_columns,
size_t left_position, size_t right_position, size_t right_block_number);
void mergeRightBlocks();
template <bool in_memory>
size_t rightBlocksCount();
template <bool in_memory>
void joinSortedBlock(Block & block);
template <bool in_memory, bool is_all>
void joinSortedBlock(Block & block, ExtraBlockPtr & not_processed);
template <bool in_memory>
std::shared_ptr<Block> loadRightBlock(size_t pos);
void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
template <bool is_all>
bool leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail, size_t & skip_right);
template <bool is_all>
bool innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail, size_t & skip_right);
bool saveRightBlock(Block && block);
void flushRightBlocks();

View File

@ -49,6 +49,7 @@ Block QueryLogElement::createBlock()
{std::make_shared<DataTypeUInt64>(), "memory_usage"},
{std::make_shared<DataTypeString>(), "query"},
{std::make_shared<DataTypeInt32>(), "exception_code"},
{std::make_shared<DataTypeString>(), "exception"},
{std::make_shared<DataTypeString>(), "stack_trace"},
@ -107,6 +108,7 @@ void QueryLogElement::appendToBlock(Block & block) const
columns[i++]->insert(memory_usage);
columns[i++]->insertData(query.data(), query.size());
columns[i++]->insert(exception_code);
columns[i++]->insertData(exception.data(), exception.size());
columns[i++]->insertData(stack_trace.data(), stack_trace.size());

View File

@ -54,6 +54,7 @@ struct QueryLogElement
String query;
Int32 exception_code{}; // because ErrorCodes are int
String exception;
String stack_trace;

View File

@ -816,7 +816,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
SyntaxAnalyzerResult result;
result.storage = storage;
result.source_columns = source_columns_;
result.analyzed_join = std::make_shared<AnalyzedJoin>(settings, context.getTemporaryPath()); /// TODO: move to select_query logic
result.analyzed_join = std::make_shared<AnalyzedJoin>(settings, context.getTemporaryVolume()); /// TODO: move to select_query logic
if (storage)
collectSourceColumns(storage->getColumns(), result.source_columns, (select_query != nullptr));

View File

@ -12,6 +12,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
@ -31,8 +36,19 @@ std::shared_ptr<TSystemLog> createSystemLog(
String database = config.getString(config_prefix + ".database", default_database_name);
String table = config.getString(config_prefix + ".table", default_table_name);
String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)");
String engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time)";
String engine;
if (config.has(config_prefix + ".engine"))
{
if (config.has(config_prefix + ".partition_by"))
throw Exception("If 'engine' is specified for system table, PARTITION BY parameters should be specified directly inside 'engine' and 'partition_by' setting doesn't make sense", ErrorCodes::BAD_ARGUMENTS);
engine = config.getString(config_prefix + ".engine");
}
else
{
String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)");
engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024";
}
size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS);

View File

@ -160,15 +160,23 @@ void ThreadStatus::initQueryProfiler()
const auto & settings = query_context->getSettingsRef();
if (settings.query_profiler_real_time_period_ns > 0)
query_profiler_real = std::make_unique<QueryProfilerReal>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
try
{
if (settings.query_profiler_real_time_period_ns > 0)
query_profiler_real = std::make_unique<QueryProfilerReal>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
if (settings.query_profiler_cpu_time_period_ns > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCpu>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
if (settings.query_profiler_cpu_time_period_ns > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCpu>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
}
catch (...)
{
/// QueryProfiler is optional.
tryLogCurrentException("ThreadStatus", "Cannot initialize QueryProfiler");
}
}
void ThreadStatus::finalizeQueryProfiler()

View File

@ -163,6 +163,7 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
elem.query_start_time = current_time;
elem.query = query_for_logging;
elem.exception_code = getCurrentExceptionCode();
elem.exception = getCurrentExceptionMessage(false);
elem.client_info = context.getClientInfo();
@ -496,6 +497,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.event_time = time(nullptr);
elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
elem.exception_code = getCurrentExceptionCode();
elem.exception = getCurrentExceptionMessage(false);
QueryStatus * process_list_elem = context.getProcessListElement();
@ -573,14 +575,17 @@ BlockIO executeQuery(
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
internal, stage, !may_have_embedded_data, nullptr, allow_processors);
if (streams.in)
if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format) : context.getDefaultFormat();
String format_name = ast_query_with_output->format
? getIdentifierName(ast_query_with_output->format)
: context.getDefaultFormat();
if (format_name == "Null")
streams.null_format = true;
}
return streams;
}
@ -590,7 +595,7 @@ void executeQuery(
WriteBuffer & ostr,
bool allow_into_outfile,
Context & context,
std::function<void(const String &)> set_content_type,
std::function<void(const String &, const String &)> set_content_type_and_format,
std::function<void(const String &)> set_query_id)
{
PODArray<char> parse_buf;
@ -680,8 +685,8 @@ void executeQuery(
out->onProgress(progress);
});
if (set_content_type)
set_content_type(out->getContentType());
if (set_content_type_and_format)
set_content_type_and_format(out->getContentType(), format_name);
if (set_query_id)
set_query_id(context.getClientInfo().current_query_id);
@ -742,8 +747,8 @@ void executeQuery(
out->onProgress(progress);
});
if (set_content_type)
set_content_type(out->getContentType());
if (set_content_type_and_format)
set_content_type_and_format(out->getContentType(), format_name);
if (set_query_id)
set_query_id(context.getClientInfo().current_query_id);

View File

@ -19,7 +19,7 @@ void executeQuery(
WriteBuffer & ostr, /// Where to write query output to.
bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file.
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
std::function<void(const String &)> set_content_type, /// If non-empty callback is passed, it will be called with the Content-Type of the result.
std::function<void(const String &, const String &)> set_content_type_and_format, /// If non-empty callback is passed, it will be called with the Content-Type and the Format of the result.
std::function<void(const String &)> set_query_id /// If non-empty callback is passed, it will be called with the query id.
);

View File

@ -79,7 +79,7 @@ int main(int argc, char ** argv)
Aggregator::Params params(
stream->getHeader(), {0, 1}, aggregate_descriptions,
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);
false, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, 1, 0);
Aggregator aggregator(params);

View File

@ -10,8 +10,8 @@ namespace DB
class ParserArray : public IParserBase
{
protected:
const char * getName() const { return "array"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "array"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -22,8 +22,8 @@ protected:
class ParserParenthesisExpression : public IParserBase
{
protected:
const char * getName() const { return "parenthesized expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "parenthesized expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -32,8 +32,8 @@ protected:
class ParserSubquery : public IParserBase
{
protected:
const char * getName() const { return "SELECT subquery"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "SELECT subquery"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -42,8 +42,8 @@ protected:
class ParserIdentifier : public IParserBase
{
protected:
const char * getName() const { return "identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -52,16 +52,16 @@ protected:
class ParserCompoundIdentifier : public IParserBase
{
protected:
const char * getName() const { return "compound identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "compound identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// Just *
class ParserAsterisk : public IParserBase
{
protected:
const char * getName() const { return "asterisk"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "asterisk"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Something like t.* or db.table.*
@ -69,8 +69,8 @@ protected:
class ParserQualifiedAsterisk : public IParserBase
{
protected:
const char * getName() const { return "qualified asterisk"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "qualified asterisk"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** COLUMNS('<regular expression>')
@ -78,8 +78,8 @@ protected:
class ParserColumnsMatcher : public IParserBase
{
protected:
const char * getName() const { return "COLUMNS matcher"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "COLUMNS matcher"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** A function, for example, f(x, y + 1, g(z)).
@ -93,16 +93,16 @@ class ParserFunction : public IParserBase
public:
ParserFunction(bool allow_function_parameters_ = true) : allow_function_parameters(allow_function_parameters_) {}
protected:
const char * getName() const { return "function"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "function"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
bool allow_function_parameters;
};
class ParserCodecDeclarationList : public IParserBase
{
protected:
const char * getName() const { return "codec declaration list"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "codec declaration list"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Parse compression codec
@ -111,8 +111,8 @@ protected:
class ParserCodec : public IParserBase
{
protected:
const char * getName() const { return "codec"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "codec"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
class ParserCastExpression : public IParserBase
@ -176,8 +176,8 @@ protected:
class ParserNull : public IParserBase
{
protected:
const char * getName() const { return "NULL"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "NULL"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -186,8 +186,8 @@ protected:
class ParserNumber : public IParserBase
{
protected:
const char * getName() const { return "number"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "number"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Unsigned integer, used in right hand side of tuple access operator (x.1).
@ -195,8 +195,8 @@ protected:
class ParserUnsignedInteger : public IParserBase
{
protected:
const char * getName() const { return "unsigned integer"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "unsigned integer"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -205,8 +205,8 @@ protected:
class ParserStringLiteral : public IParserBase
{
protected:
const char * getName() const { return "string literal"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "string literal"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -219,8 +219,8 @@ protected:
class ParserArrayOfLiterals : public IParserBase
{
protected:
const char * getName() const { return "array"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "array"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -229,8 +229,8 @@ protected:
class ParserLiteral : public IParserBase
{
protected:
const char * getName() const { return "literal"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "literal"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -246,8 +246,8 @@ private:
bool allow_alias_without_as_keyword;
const char * getName() const { return "alias"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "alias"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -257,8 +257,8 @@ private:
class ParserSubstitution : public IParserBase
{
protected:
const char * getName() const { return "substitution"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "substitution"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -267,8 +267,8 @@ protected:
class ParserExpressionElement : public IParserBase
{
protected:
const char * getName() const { return "element of expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "element of expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -283,8 +283,8 @@ protected:
ParserPtr elem_parser;
bool allow_alias_without_as_keyword;
const char * getName() const { return "element of expression with optional alias"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "element of expression with optional alias"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -296,8 +296,8 @@ protected:
class ParserOrderByElement : public IParserBase
{
protected:
const char * getName() const { return "element of ORDER BY expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "element of ORDER BY expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Parser for function with arguments like KEY VALUE (space separated)
@ -316,8 +316,8 @@ protected:
class ParserIdentifierWithOptionalParameters : public IParserBase
{
protected:
const char * getName() const { return "identifier with optional parameters"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override{ return "identifier with optional parameters"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Element of TTL expression - same as expression element, but in addition,
@ -326,8 +326,8 @@ protected:
class ParserTTLElement : public IParserBase
{
protected:
const char * getName() const { return "element of TTL expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "element of TTL expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -27,8 +27,8 @@ public:
{
}
protected:
const char * getName() const { return "list of elements"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "list of elements"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
private:
ParserPtr elem_parser;
ParserPtr separator_parser;
@ -63,9 +63,9 @@ public:
}
protected:
const char * getName() const { return "list, delimited by binary operators"; }
const char * getName() const override { return "list, delimited by binary operators"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -86,9 +86,9 @@ public:
}
protected:
const char * getName() const { return "list, delimited by operator of variable arity"; }
const char * getName() const override { return "list, delimited by operator of variable arity"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -110,8 +110,8 @@ public:
}
protected:
const char * getName() const { return "expression with prefix unary operator"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "expression with prefix unary operator"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -121,9 +121,9 @@ private:
static const char * operators[];
protected:
const char * getName() const { return "array element expression"; }
const char * getName() const override{ return "array element expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -133,9 +133,9 @@ private:
static const char * operators[];
protected:
const char * getName() const { return "tuple element expression"; }
const char * getName() const override { return "tuple element expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -146,9 +146,9 @@ private:
ParserPrefixUnaryOperatorExpression operator_parser {operators, std::make_unique<ParserTupleElementExpression>()};
protected:
const char * getName() const { return "unary minus expression"; }
const char * getName() const override { return "unary minus expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -159,9 +159,9 @@ private:
ParserLeftAssociativeBinaryOperatorList operator_parser {operators, std::make_unique<ParserUnaryMinusExpression>()};
protected:
const char * getName() const { return "multiplicative expression"; }
const char * getName() const override{ return "multiplicative expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return operator_parser.parse(pos, node, expected);
}
@ -174,8 +174,8 @@ class ParserIntervalOperatorExpression : public IParserBase
protected:
ParserMultiplicativeExpression next_parser;
const char * getName() const { return "INTERVAL operator expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override{ return "INTERVAL operator expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -186,9 +186,9 @@ private:
ParserLeftAssociativeBinaryOperatorList operator_parser {operators, std::make_unique<ParserIntervalOperatorExpression>()};
protected:
const char * getName() const { return "additive expression"; }
const char * getName() const override{ return "additive expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return operator_parser.parse(pos, node, expected);
}
@ -200,9 +200,9 @@ class ParserConcatExpression : public IParserBase
ParserVariableArityOperatorList operator_parser {"||", "concat", std::make_unique<ParserAdditiveExpression>()};
protected:
const char * getName() const { return "string concatenation expression"; }
const char * getName() const override { return "string concatenation expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return operator_parser.parse(pos, node, expected);
}
@ -215,9 +215,9 @@ private:
ParserConcatExpression elem_parser;
protected:
const char * getName() const { return "BETWEEN expression"; }
const char * getName() const override { return "BETWEEN expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -228,9 +228,9 @@ private:
ParserLeftAssociativeBinaryOperatorList operator_parser {operators, std::make_unique<ParserBetweenExpression>()};
protected:
const char * getName() const { return "comparison expression"; }
const char * getName() const override{ return "comparison expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return operator_parser.parse(pos, node, expected);
}
@ -257,9 +257,9 @@ private:
ParserPrefixUnaryOperatorExpression operator_parser {operators, std::make_unique<ParserNullityChecking>()};
protected:
const char * getName() const { return "logical-NOT expression"; }
const char * getName() const override{ return "logical-NOT expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return operator_parser.parse(pos, node, expected);
}
@ -272,9 +272,9 @@ private:
ParserVariableArityOperatorList operator_parser {"AND", "and", std::make_unique<ParserLogicalNotExpression>()};
protected:
const char * getName() const { return "logical-AND expression"; }
const char * getName() const override { return "logical-AND expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return operator_parser.parse(pos, node, expected);
}
@ -287,9 +287,9 @@ private:
ParserVariableArityOperatorList operator_parser {"OR", "or", std::make_unique<ParserLogicalAndExpression>()};
protected:
const char * getName() const { return "logical-OR expression"; }
const char * getName() const override { return "logical-OR expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return operator_parser.parse(pos, node, expected);
}
@ -305,9 +305,9 @@ private:
ParserLogicalOrExpression elem_parser;
protected:
const char * getName() const { return "expression with ternary operator"; }
const char * getName() const override { return "expression with ternary operator"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -317,9 +317,9 @@ private:
ParserTernaryOperatorExpression elem_parser;
protected:
const char * getName() const { return "lambda expression"; }
const char * getName() const override { return "lambda expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -333,9 +333,9 @@ public:
protected:
ParserPtr impl;
const char * getName() const { return "expression with optional alias"; }
const char * getName() const override { return "expression with optional alias"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
return impl->parse(pos, node, expected);
}
@ -352,8 +352,8 @@ public:
protected:
bool allow_alias_without_as_keyword;
const char * getName() const { return "list of expressions"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "list of expressions"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -365,16 +365,16 @@ public:
private:
ParserExpressionList nested_parser;
protected:
const char * getName() const { return "not empty list of expressions"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "not empty list of expressions"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
class ParserOrderByExpressionList : public IParserBase
{
protected:
const char * getName() const { return "order by expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "order by expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
@ -399,8 +399,8 @@ protected:
class ParserTTLExpressionList : public IParserBase
{
protected:
const char * getName() const { return "ttl expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
const char * getName() const override { return "ttl expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

Some files were not shown because too many files have changed in this diff Show More