Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
BayoNet 2018-11-30 12:36:24 +03:00
commit be27d6a14b
238 changed files with 3104 additions and 1841 deletions

View File

@ -29,7 +29,9 @@ AllowShortFunctionsOnASingleLine: InlineOnly
AlwaysBreakTemplateDeclarations: true
IndentCaseLabels: true
SpaceAfterTemplateKeyword: true
SpaceBeforeCpp11BracedList: false
SortIncludes: true
IndentPPDirectives: AfterHash
IncludeCategories:
- Regex: '^<[a-z_]+>'
Priority: 1

View File

@ -1 +1,4 @@
* Настройка `enable_optimize_predicate_expression` выключена по умолчанию.
### Улучшения:
* Файлы *-preprocessed.xml записываются в директорию с данными (/var/lib/clickhouse/preprocessed_configs). Для /etc/clickhouse-server больше не нужен +w для пользователя clickhouse. Для удобства создан симлинк /var/lib/clickhouse/preprocessed_configs -> /etc/clickhouse-server/preprocessed

View File

@ -236,7 +236,7 @@ include(GNUInstallDirs)
include (cmake/find_ssl.cmake)
include (cmake/lib_name.cmake)
include (cmake/find_icu4c.cmake)
include (cmake/find_icu.cmake)
include (cmake/find_boost.cmake)
include (cmake/find_zlib.cmake)
include (cmake/find_zstd.cmake)

394
cmake/Modules/FindICU.cmake Normal file
View File

@ -0,0 +1,394 @@
# Distributed under the OSI-approved BSD 3-Clause License. See accompanying
# file Copyright.txt or https://cmake.org/licensing for details.
#.rst:
# FindICU
# -------
#
# Find the International Components for Unicode (ICU) libraries and
# programs.
#
# This module supports multiple components.
# Components can include any of: ``data``, ``i18n``, ``io``, ``le``,
# ``lx``, ``test``, ``tu`` and ``uc``.
#
# Note that on Windows ``data`` is named ``dt`` and ``i18n`` is named
# ``in``; any of the names may be used, and the appropriate
# platform-specific library name will be automatically selected.
#
# This module reports information about the ICU installation in
# several variables. General variables::
#
# ICU_VERSION - ICU release version
# ICU_FOUND - true if the main programs and libraries were found
# ICU_LIBRARIES - component libraries to be linked
# ICU_INCLUDE_DIRS - the directories containing the ICU headers
#
# Imported targets::
#
# ICU::<C>
#
# Where ``<C>`` is the name of an ICU component, for example
# ``ICU::i18n``.
#
# ICU programs are reported in::
#
# ICU_GENCNVAL_EXECUTABLE - path to gencnval executable
# ICU_ICUINFO_EXECUTABLE - path to icuinfo executable
# ICU_GENBRK_EXECUTABLE - path to genbrk executable
# ICU_ICU-CONFIG_EXECUTABLE - path to icu-config executable
# ICU_GENRB_EXECUTABLE - path to genrb executable
# ICU_GENDICT_EXECUTABLE - path to gendict executable
# ICU_DERB_EXECUTABLE - path to derb executable
# ICU_PKGDATA_EXECUTABLE - path to pkgdata executable
# ICU_UCONV_EXECUTABLE - path to uconv executable
# ICU_GENCFU_EXECUTABLE - path to gencfu executable
# ICU_MAKECONV_EXECUTABLE - path to makeconv executable
# ICU_GENNORM2_EXECUTABLE - path to gennorm2 executable
# ICU_GENCCODE_EXECUTABLE - path to genccode executable
# ICU_GENSPREP_EXECUTABLE - path to gensprep executable
# ICU_ICUPKG_EXECUTABLE - path to icupkg executable
# ICU_GENCMN_EXECUTABLE - path to gencmn executable
#
# ICU component libraries are reported in::
#
# ICU_<C>_FOUND - ON if component was found
# ICU_<C>_LIBRARIES - libraries for component
#
# ICU datafiles are reported in::
#
# ICU_MAKEFILE_INC - Makefile.inc
# ICU_PKGDATA_INC - pkgdata.inc
#
# Note that ``<C>`` is the uppercased name of the component.
#
# This module reads hints about search results from::
#
# ICU_ROOT - the root of the ICU installation
#
# The environment variable ``ICU_ROOT`` may also be used; the
# ICU_ROOT variable takes precedence.
#
# The following cache variables may also be set::
#
# ICU_<P>_EXECUTABLE - the path to executable <P>
# ICU_INCLUDE_DIR - the directory containing the ICU headers
# ICU_<C>_LIBRARY - the library for component <C>
#
# .. note::
#
# In most cases none of the above variables will require setting,
# unless multiple ICU versions are available and a specific version
# is required.
#
# Other variables one may set to control this module are::
#
# ICU_DEBUG - Set to ON to enable debug output from FindICU.
# Written by Roger Leigh <rleigh@codelibre.net>
# Command-line tools shipped with ICU.  Each one is searched for by
# _ICU_FIND and reported as ICU_<NAME>_EXECUTABLE (name upper-cased).
set(icu_programs
  gencnval
  icuinfo
  genbrk
  icu-config
  genrb
  gendict
  derb
  pkgdata
  uconv
  gencfu
  makeconv
  gennorm2
  genccode
  gensprep
  icupkg
  gencmn)

# Build-support data files shipped with ICU.  Each one is reported as
# ICU_<NAME> with the name upper-cased and "." replaced by "_".
set(icu_data
  Makefile.inc
  pkgdata.inc)
# The ICU checks are contained in a function due to the large number
# of temporary variables needed.
# Locate the ICU headers, programs, component libraries and data files.
#
# The checks live in a function because of the large number of temporary
# variables needed.  Results are handed to the caller via PARENT_SCOPE:
#
#   ICU_INCLUDE_DIR            - directory containing unicode/utypes.h
#   ICU_VERSION                - U_ICU_VERSION parsed from unicode/uvernum.h
#   ICU_<P>_EXECUTABLE         - one per entry of icu_programs
#   ICU_<C>_LIBRARY            - one per requested component
#   ICU_<C>_FOUND              - ON/OFF per requested component (documented name)
#   <C>_FOUND                  - same value, legacy spelling kept for
#                                backward compatibility
#   ICU_LIBRARY                - list of all component libraries found
#   ICU_MAKEFILE_INC, ICU_PKGDATA_INC - one per entry of icu_data
#   _ICU_REQUIRED_LIBS_FOUND   - OFF if any required component is missing
#
# Inputs: ICU_ROOT (variable or environment), ICU_FIND_COMPONENTS,
# ICU_FIND_REQUIRED_<component>, ICU_FIND_QUIETLY, ICU_DEBUG.
function(_ICU_FIND)
  # Set up search paths, taking compiler into account.  Search ICU_ROOT,
  # with ICU_ROOT in the environment as a fallback if unset.
  if(ICU_ROOT)
    list(APPEND icu_roots "${ICU_ROOT}")
  else()
    if(NOT "$ENV{ICU_ROOT}" STREQUAL "")
      file(TO_CMAKE_PATH "$ENV{ICU_ROOT}" NATIVE_PATH)
      list(APPEND icu_roots "${NATIVE_PATH}")
      set(ICU_ROOT "${NATIVE_PATH}"
          CACHE PATH "Location of the ICU installation" FORCE)
    endif()
  endif()

  # Find include directory
  list(APPEND icu_include_suffixes "include")
  find_path(ICU_INCLUDE_DIR
            NAMES "unicode/utypes.h"
            HINTS ${icu_roots}
            PATH_SUFFIXES ${icu_include_suffixes}
            DOC "ICU include directory")
  set(ICU_INCLUDE_DIR "${ICU_INCLUDE_DIR}" PARENT_SCOPE)

  # Get version
  if(ICU_INCLUDE_DIR AND EXISTS "${ICU_INCLUDE_DIR}/unicode/uvernum.h")
    file(STRINGS "${ICU_INCLUDE_DIR}/unicode/uvernum.h" icu_header_str
      REGEX "^#define[\t ]+U_ICU_VERSION[\t ]+\".*\".*")

    string(REGEX REPLACE "^#define[\t ]+U_ICU_VERSION[\t ]+\"([^ \\n]*)\".*"
      "\\1" icu_version_string "${icu_header_str}")
    # Needed locally (data-file suffixes below) and by the caller.
    set(ICU_VERSION "${icu_version_string}")
    set(ICU_VERSION "${icu_version_string}" PARENT_SCOPE)
    unset(icu_header_str)
    unset(icu_version_string)
  endif()

  if(CMAKE_SIZEOF_VOID_P EQUAL 8)
    # 64-bit binary directory
    set(_bin64 "bin64")
    # 64-bit library directory
    set(_lib64 "lib64")
  endif()

  # Find all ICU programs
  list(APPEND icu_binary_suffixes "${_bin64}" "bin")
  foreach(program ${icu_programs})
    string(TOUPPER "${program}" program_upcase)
    set(cache_var "ICU_${program_upcase}_EXECUTABLE")
    set(program_var "ICU_${program_upcase}_EXECUTABLE")
    find_program("${cache_var}" "${program}"
      HINTS ${icu_roots}
      PATH_SUFFIXES ${icu_binary_suffixes}
      DOC "ICU ${program} executable")
    # Dereference: the cache entry is ICU_<P>_EXECUTABLE, not "cache_var".
    mark_as_advanced("${cache_var}")
    set("${program_var}" "${${cache_var}}" PARENT_SCOPE)
  endforeach()

  # Find all ICU libraries
  include(SelectLibraryConfigurations)
  list(APPEND icu_library_suffixes "${_lib64}" "lib")
  set(ICU_REQUIRED_LIBS_FOUND ON)
  foreach(component ${ICU_FIND_COMPONENTS})
    string(TOUPPER "${component}" component_upcase)
    set(component_cache "ICU_${component_upcase}_LIBRARY")
    set(component_cache_release "${component_cache}_RELEASE")
    set(component_cache_debug "${component_cache}_DEBUG")
    # ICU_<C>_FOUND is the documented name; <C>_FOUND is what this module
    # historically set, so both are published.
    set(component_found "ICU_${component_upcase}_FOUND")
    set(component_found_compat "${component_upcase}_FOUND")
    set(component_libnames "icu${component}")
    set(component_debug_libnames "icu${component}d")

    # Special case deliberate library naming mismatches between Unix
    # and Windows builds
    if(component STREQUAL "data")
      list(APPEND component_libnames "icudt")
      # Note there is no debug variant at present
      list(APPEND component_debug_libnames "icudtd")
    endif()
    if(component STREQUAL "dt")
      list(APPEND component_libnames "icudata")
      # Note there is no debug variant at present
      list(APPEND component_debug_libnames "icudatad")
    endif()
    if(component STREQUAL "i18n")
      list(APPEND component_libnames "icuin")
      list(APPEND component_debug_libnames "icuind")
    endif()
    if(component STREQUAL "in")
      list(APPEND component_libnames "icui18n")
      list(APPEND component_debug_libnames "icui18nd")
    endif()

    find_library("${component_cache_release}" ${component_libnames}
      HINTS ${icu_roots}
      PATH_SUFFIXES ${icu_library_suffixes}
      DOC "ICU ${component} library (release)")
    find_library("${component_cache_debug}" ${component_debug_libnames}
      HINTS ${icu_roots}
      PATH_SUFFIXES ${icu_library_suffixes}
      DOC "ICU ${component} library (debug)")
    select_library_configurations(ICU_${component_upcase})
    mark_as_advanced("${component_cache_release}" "${component_cache_debug}")

    # Start from an explicit OFF so a stale value inherited from the
    # calling scope can never be mistaken for "found".
    if(${component_cache})
      set("${component_found}" ON)
      list(APPEND ICU_LIBRARY "${${component_cache}}")
    else()
      set("${component_found}" OFF)
    endif()
    set("${component_found_compat}" "${${component_found}}")

    # The *_FOUND results are plain variables, not cache entries, so they
    # are exported with PARENT_SCOPE only (no mark_as_advanced).
    set("${component_cache}" "${${component_cache}}" PARENT_SCOPE)
    set("${component_found}" "${${component_found}}" PARENT_SCOPE)
    set("${component_found_compat}" "${${component_found}}" PARENT_SCOPE)

    if(${component_found})
      if (ICU_FIND_REQUIRED_${component})
        list(APPEND ICU_LIBS_FOUND "${component} (required)")
      else()
        list(APPEND ICU_LIBS_FOUND "${component} (optional)")
      endif()
    else()
      if (ICU_FIND_REQUIRED_${component})
        set(ICU_REQUIRED_LIBS_FOUND OFF)
        list(APPEND ICU_LIBS_NOTFOUND "${component} (required)")
      else()
        list(APPEND ICU_LIBS_NOTFOUND "${component} (optional)")
      endif()
    endif()
  endforeach()
  set(_ICU_REQUIRED_LIBS_FOUND "${ICU_REQUIRED_LIBS_FOUND}" PARENT_SCOPE)
  set(ICU_LIBRARY "${ICU_LIBRARY}" PARENT_SCOPE)

  # Find all ICU data files
  if(CMAKE_LIBRARY_ARCHITECTURE)
    list(APPEND icu_data_suffixes
      "${_lib64}/${CMAKE_LIBRARY_ARCHITECTURE}/icu/${ICU_VERSION}"
      "lib/${CMAKE_LIBRARY_ARCHITECTURE}/icu/${ICU_VERSION}"
      "${_lib64}/${CMAKE_LIBRARY_ARCHITECTURE}/icu"
      "lib/${CMAKE_LIBRARY_ARCHITECTURE}/icu")
  endif()
  list(APPEND icu_data_suffixes
    "${_lib64}/icu/${ICU_VERSION}"
    "lib/icu/${ICU_VERSION}"
    "${_lib64}/icu"
    "lib/icu")
  foreach(data ${icu_data})
    string(TOUPPER "${data}" data_upcase)
    string(REPLACE "." "_" data_upcase "${data_upcase}")
    set(cache_var "ICU_${data_upcase}")
    set(data_var "ICU_${data_upcase}")
    find_file("${cache_var}" "${data}"
      HINTS ${icu_roots}
      PATH_SUFFIXES ${icu_data_suffixes}
      DOC "ICU ${data} data file")
    # Dereference: the cache entry is ICU_<DATA>, not "cache_var".
    mark_as_advanced("${cache_var}")
    set("${data_var}" "${${cache_var}}" PARENT_SCOPE)
  endforeach()

  if(NOT ICU_FIND_QUIETLY)
    if(ICU_LIBS_FOUND)
      message(STATUS "Found the following ICU libraries:")
      foreach(found ${ICU_LIBS_FOUND})
        message(STATUS " ${found}")
      endforeach()
    endif()
    if(ICU_LIBS_NOTFOUND)
      message(STATUS "The following ICU libraries were not found:")
      foreach(notfound ${ICU_LIBS_NOTFOUND})
        message(STATUS " ${notfound}")
      endforeach()
    endif()
  endif()

  if(ICU_DEBUG)
    message(STATUS "--------FindICU.cmake search debug--------")
    message(STATUS "ICU binary path search order: ${icu_roots}")
    message(STATUS "ICU include path search order: ${icu_roots}")
    message(STATUS "ICU library path search order: ${icu_roots}")
    message(STATUS "----------------")
  endif()
endfunction()
# Run the search; its results arrive as plain variables in this scope.
_ICU_FIND()

# Decide ICU_FOUND from the include directory, the collected library list
# and the "all required components present" flag, honouring any version
# requested through find_package().
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(ICU
  FOUND_VAR ICU_FOUND
  REQUIRED_VARS ICU_INCLUDE_DIR
                ICU_LIBRARY
                _ICU_REQUIRED_LIBS_FOUND
  VERSION_VAR ICU_VERSION
  FAIL_MESSAGE "Failed to find all ICU components")

unset(_ICU_REQUIRED_LIBS_FOUND)

if(ICU_FOUND)
  set(ICU_INCLUDE_DIRS "${ICU_INCLUDE_DIR}")
  set(ICU_LIBRARIES "${ICU_LIBRARY}")

  # Publish ICU_<C>_LIBRARIES and create one imported target ICU::<component>
  # for every requested component that was found.
  foreach(_ICU_component ${ICU_FIND_COMPONENTS})
    string(TOUPPER "${_ICU_component}" _ICU_component_upcase)
    set(_ICU_component_cache "ICU_${_ICU_component_upcase}_LIBRARY")
    set(_ICU_component_cache_release "ICU_${_ICU_component_upcase}_LIBRARY_RELEASE")
    set(_ICU_component_cache_debug "ICU_${_ICU_component_upcase}_LIBRARY_DEBUG")
    set(_ICU_component_lib "ICU_${_ICU_component_upcase}_LIBRARIES")
    set(_ICU_component_found "${_ICU_component_upcase}_FOUND")
    set(_ICU_imported_target "ICU::${_ICU_component}")

    if(${_ICU_component_found})
      set("${_ICU_component_lib}" "${${_ICU_component_cache}}")
      # Guard against a second find_package(ICU) in the same directory tree.
      if(NOT TARGET ${_ICU_imported_target})
        add_library(${_ICU_imported_target} UNKNOWN IMPORTED)
        if(ICU_INCLUDE_DIR)
          set_target_properties(${_ICU_imported_target} PROPERTIES
            INTERFACE_INCLUDE_DIRECTORIES "${ICU_INCLUDE_DIR}")
        endif()
        # Configuration-independent location.
        if(EXISTS "${${_ICU_component_cache}}")
          set_target_properties(${_ICU_imported_target} PROPERTIES
            IMPORTED_LINK_INTERFACE_LANGUAGES "CXX"
            IMPORTED_LOCATION "${${_ICU_component_cache}}")
        endif()
        # Per-configuration locations, when the respective variant exists.
        if(EXISTS "${${_ICU_component_cache_release}}")
          set_property(TARGET ${_ICU_imported_target} APPEND PROPERTY
            IMPORTED_CONFIGURATIONS RELEASE)
          set_target_properties(${_ICU_imported_target} PROPERTIES
            IMPORTED_LINK_INTERFACE_LANGUAGES_RELEASE "CXX"
            IMPORTED_LOCATION_RELEASE "${${_ICU_component_cache_release}}")
        endif()
        if(EXISTS "${${_ICU_component_cache_debug}}")
          set_property(TARGET ${_ICU_imported_target} APPEND PROPERTY
            IMPORTED_CONFIGURATIONS DEBUG)
          set_target_properties(${_ICU_imported_target} PROPERTIES
            IMPORTED_LINK_INTERFACE_LANGUAGES_DEBUG "CXX"
            IMPORTED_LOCATION_DEBUG "${${_ICU_component_cache_debug}}")
        endif()
      endif()
    endif()

    # This file runs in the includer's scope: drop every loop temporary,
    # including the release/debug cache-name holders.
    unset(_ICU_component_upcase)
    unset(_ICU_component_cache)
    unset(_ICU_component_cache_release)
    unset(_ICU_component_cache_debug)
    unset(_ICU_component_lib)
    unset(_ICU_component_found)
    unset(_ICU_imported_target)
  endforeach()
endif()
# Optional diagnostic dump of everything the module found; enabled by
# setting ICU_DEBUG to a true value before calling find_package(ICU).
if(ICU_DEBUG)
  message(STATUS "--------FindICU.cmake results debug--------")
  message(STATUS "ICU found: ${ICU_FOUND}")
  message(STATUS "ICU_VERSION number: ${ICU_VERSION}")
  message(STATUS "ICU_ROOT directory: ${ICU_ROOT}")
  message(STATUS "ICU_INCLUDE_DIR directory: ${ICU_INCLUDE_DIR}")
  message(STATUS "ICU_LIBRARIES: ${ICU_LIBRARIES}")

  # One line per ICU program: ICU_<PROGRAM>_EXECUTABLE.
  foreach(program IN LISTS icu_programs)
    string(TOUPPER "${program}" program_upcase)
    set(program_lib "ICU_${program_upcase}_EXECUTABLE")
    message(STATUS "${program} program: ${${program_lib}}")
    unset(program_upcase)
    unset(program_lib)
  endforeach()

  # One line per ICU data file: ICU_<NAME> with "." mapped to "_".
  foreach(data IN LISTS icu_data)
    string(TOUPPER "${data}" data_upcase)
    string(REPLACE "." "_" data_upcase "${data_upcase}")
    set(data_lib "ICU_${data_upcase}")
    message(STATUS "${data} data: ${${data_lib}}")
    unset(data_upcase)
    unset(data_lib)
  endforeach()

  # Two lines per requested component: found flag and library list.
  foreach(component IN LISTS ICU_FIND_COMPONENTS)
    string(TOUPPER "${component}" component_upcase)
    set(component_lib "ICU_${component_upcase}_LIBRARIES")
    set(component_found "${component_upcase}_FOUND")
    message(STATUS "${component} library found: ${${component_found}}")
    message(STATUS "${component} library: ${${component_lib}}")
    unset(component_upcase)
    unset(component_lib)
    unset(component_found)
  endforeach()
  message(STATUS "----------------")
endif()

# Module-internal lists must not leak into the including scope.
unset(icu_programs)
unset(icu_data)

16
cmake/find_icu.cmake Normal file
View File

@ -0,0 +1,16 @@
# Detect a system ICU installation.
#
# Outputs (plain variables, visible to this directory and its subdirectories):
#   USE_ICU          - set to 1 when ICU was requested and found
#   ICU_LIBRARIES    - imported targets to link against (only when found)
#   ICU_INCLUDE_DIR  - as reported by the ICU find module
option (ENABLE_ICU "Enable ICU" ON)

if (ENABLE_ICU)
    find_package (ICU COMPONENTS data i18n uc) # TODO: remove Modules/FindICU.cmake after cmake 3.7

    if (ICU_FOUND)
        # Link through the imported targets so that the ICU include directory
        # travels with the libraries. This is a normal variable on purpose:
        # find_package() has just set a normal ICU_LIBRARIES (raw paths), and
        # a cache entry of the same name would only win on the very first
        # configure, giving different link lines on later runs.
        set (ICU_LIBRARIES ICU::i18n ICU::uc ICU::data)
        set (USE_ICU 1)
    endif ()
endif ()

if (USE_ICU)
    message (STATUS "Using icu=${USE_ICU}: ${ICU_INCLUDE_DIR} : ${ICU_LIBRARIES}")
else ()
    message (STATUS "Build without ICU (support for collations and charset conversion functions will be disabled)")
endif ()

View File

@ -1,21 +0,0 @@
# Detect ICU by looking for its three libraries and the unicode/unistr.h
# header, additionally searching the /usr/local/opt/icu4c prefix.
#
# Outputs:
#   USE_ICU          - set to 1 when the header and all three libraries exist
#   ICU_LIBS         - icui18n, icuuc and icudata, in link order
#   ICU_INCLUDE_DIR  - directory containing unicode/unistr.h
option (ENABLE_ICU "Enable ICU" ON)

if (ENABLE_ICU)
    set (ICU_PATHS "/usr/local/opt/icu4c/lib")
    set (ICU_INCLUDE_PATHS "/usr/local/opt/icu4c/include")
    find_library (ICUI18N icui18n PATHS ${ICU_PATHS})
    find_library (ICUUC icuuc PATHS ${ICU_PATHS})
    find_library (ICUDATA icudata PATHS ${ICU_PATHS})
    set (ICU_LIBS ${ICUI18N} ${ICUUC} ${ICUDATA})

    find_path (ICU_INCLUDE_DIR NAMES unicode/unistr.h PATHS ${ICU_INCLUDE_PATHS})

    # Test every library on its own: the combined list is only treated as
    # false when its *last* element ends in -NOTFOUND, so a partially found
    # set would otherwise enable ICU and fail at link time.
    if (ICU_INCLUDE_DIR AND ICUI18N AND ICUUC AND ICUDATA)
        set (USE_ICU 1)
    endif ()
endif ()

if (USE_ICU)
    message (STATUS "Using icu=${USE_ICU}: ${ICU_INCLUDE_DIR} : ${ICU_LIBS}")
else ()
    message (STATUS "Build without ICU (support for collations and charset conversion functions will be disabled)")
endif ()

View File

@ -0,0 +1,5 @@
# generate_code(<TEMPLATE_FILE> [<name>...])
#
# Runs configure_file() once per extra argument, turning the template
# "<TEMPLATE_FILE>.cpp.in" into
# "${CMAKE_CURRENT_BINARY_DIR}/generated/<TEMPLATE_FILE>_<name>.cpp".
# With no extra arguments nothing is generated.
#
# NOTE(review): the loop variable NAME is in scope while configure_file()
# runs, so the templates presumably substitute it - confirm against the
# .cpp.in files before renaming it.
function(generate_code TEMPLATE_FILE)
foreach(NAME IN LISTS ARGN)
configure_file (${TEMPLATE_FILE}.cpp.in ${CMAKE_CURRENT_BINARY_DIR}/generated/${TEMPLATE_FILE}_${NAME}.cpp)
endforeach()
endfunction()

View File

@ -52,5 +52,11 @@ target_include_directories(jemalloc PRIVATE
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_NO_PRIVATE_NAMESPACE)
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1)
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1 -DJEMALLOC_PROF=1)
if (USE_UNWIND)
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
target_include_directories (jemalloc BEFORE PRIVATE ${UNWIND_INCLUDE_DIR})
target_link_libraries (jemalloc PRIVATE ${UNWIND_LIBRARY})
endif ()
endif ()

View File

@ -63,9 +63,6 @@ add_headers_and_sources(dbms src/Core)
add_headers_and_sources(dbms src/DataStreams)
add_headers_and_sources(dbms src/DataTypes)
add_headers_and_sources(dbms src/Databases)
add_headers_and_sources(dbms src/Dictionaries)
add_headers_and_sources(dbms src/Dictionaries/Embedded)
add_headers_and_sources(dbms src/Dictionaries/Embedded/GeodataProviders)
add_headers_and_sources(dbms src/Interpreters)
add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
add_headers_and_sources(dbms src/Columns)
@ -184,8 +181,11 @@ target_link_libraries (dbms
clickhouse_common_config
PUBLIC
clickhouse_common_io
pocoext
PRIVATE
clickhouse_dictionaries
clickhouse_dictionaries_embedded
PUBLIC
pocoext
${MYSQLXX_LIBRARY}
PRIVATE
${BTRIE_LIBRARIES}
@ -242,8 +242,7 @@ endif()
target_link_libraries (dbms PRIVATE ${Poco_Foundation_LIBRARY})
if (USE_ICU)
target_link_libraries (dbms PRIVATE ${ICU_LIBS})
target_include_directories (dbms SYSTEM PRIVATE ${ICU_INCLUDE_DIR})
target_link_libraries (dbms PRIVATE ${ICU_LIBRARIES})
endif ()
if (USE_CAPNP)

View File

@ -2,10 +2,10 @@
set(VERSION_REVISION 54409 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 14 CACHE STRING "")
set(VERSION_PATCH 9 CACHE STRING "")
set(VERSION_GITHASH 457f8fd495b2812940e69c15ab5b499cd863aae4 CACHE STRING "")
set(VERSION_DESCRIBE v18.14.9-testing CACHE STRING "")
set(VERSION_STRING 18.14.9 CACHE STRING "")
set(VERSION_PATCH 17 CACHE STRING "")
set(VERSION_GITHASH ac2895d769c3dcf070530dec7fcfdcf87bfa852a CACHE STRING "")
set(VERSION_DESCRIBE v18.14.17-testing CACHE STRING "")
set(VERSION_STRING 18.14.17 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -31,6 +31,7 @@
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
#include <Common/Config/configReadClient.h>
/** A tool for evaluating ClickHouse performance.
@ -46,17 +47,17 @@ namespace ErrorCodes
extern const int EMPTY_DATA_PASSED;
}
class Benchmark
class Benchmark : public Poco::Util::Application
{
public:
Benchmark(unsigned concurrency_, double delay_,
const String & host_, UInt16 port_, const String & default_database_,
const String & host_, UInt16 port_, bool secure_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency),
connections(concurrency, host_, port_, default_database_, user_, password_, timeouts),
connections(concurrency, host_, port_, default_database_, user_, password_, timeouts, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
{
@ -75,13 +76,26 @@ public:
else
throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS);
}
/// Application start-up hook.
/// Hands this application's configuration and the value of the HOME
/// environment variable (empty string when unset) to configReadClient().
void initialize(Poco::Util::Application & self [[maybe_unused]])
{
    const char * home_env = getenv("HOME");
    std::string home_path = home_env ? home_env : "";

    configReadClient(config(), home_path);
}
int main(const std::vector<std::string> &)
{
if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results
{
Poco::File(json_path).remove();
}
readQueries();
run();
runBenchmark();
return 0;
}
private:
@ -220,7 +234,7 @@ private:
return true;
}
void run()
void runBenchmark()
{
pcg64 generator(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
@ -432,6 +446,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
("host,h", value<std::string>()->default_value("localhost"), "")
("port", value<UInt16>()->default_value(9000), "")
("secure,s", "Use TLS connection")
("user", value<std::string>()->default_value("default"), "")
("password", value<std::string>()->default_value(""), "")
("database", value<std::string>()->default_value("default"), "")
@ -470,6 +485,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["delay"].as<double>(),
options["host"].as<std::string>(),
options["port"].as<UInt16>(),
options.count("secure"),
options["database"].as<std::string>(),
options["user"].as<std::string>(),
options["password"].as<std::string>(),
@ -480,6 +496,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["json"].as<std::string>(),
ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings),
settings);
return benchmark.run();
}
catch (...)
{

View File

@ -1,5 +1,5 @@
add_library (clickhouse-benchmark-lib ${LINK_MODE} Benchmark.cpp)
target_link_libraries (clickhouse-benchmark-lib PRIVATE clickhouse-client-lib clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_link_libraries (clickhouse-benchmark-lib PRIVATE clickhouse_aggregate_functions clickhouse-client-lib clickhouse_common_config clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_include_directories (clickhouse-benchmark-lib SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
if (CLICKHOUSE_SPLIT_BINARY)

View File

@ -59,6 +59,7 @@
#include <Common/InterruptListener.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Common/Config/configReadClient.h>
#if USE_READLINE
#include "Suggest.h" // Y_IGNORE
@ -206,22 +207,7 @@ private:
if (home_path_cstr)
home_path = home_path_cstr;
std::string config_path;
if (config().has("config-file"))
config_path = config().getString("config-file");
else if (Poco::File("./clickhouse-client.xml").exists())
config_path = "./clickhouse-client.xml";
else if (!home_path.empty() && Poco::File(home_path + "/.clickhouse-client/config.xml").exists())
config_path = home_path + "/.clickhouse-client/config.xml";
else if (Poco::File("/etc/clickhouse-client/config.xml").exists())
config_path = "/etc/clickhouse-client/config.xml";
if (!config_path.empty())
{
ConfigProcessor config_processor(config_path);
auto loaded_config = config_processor.loadConfig();
config().add(loaded_config.configuration);
}
configReadClient(config(), home_path);
context.setApplicationType(Context::ApplicationType::CLIENT);

View File

@ -26,18 +26,18 @@ static void setupLogging(const std::string & log_level)
static std::string extractFromConfig(
const std::string & config_path, const std::string & key, bool process_zk_includes, bool try_get = false)
{
ConfigProcessor processor(config_path, /* throw_on_bad_incl = */ false, /* log_to_console = */ false);
DB::ConfigProcessor processor(config_path, /* throw_on_bad_incl = */ false, /* log_to_console = */ false);
bool has_zk_includes;
XMLDocumentPtr config_xml = processor.processConfig(&has_zk_includes);
DB::XMLDocumentPtr config_xml = processor.processConfig(&has_zk_includes);
if (has_zk_includes && process_zk_includes)
{
ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
zkutil::ZooKeeperPtr zookeeper = std::make_shared<zkutil::ZooKeeper>(
*bootstrap_configuration, "zookeeper");
zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; });
config_xml = processor.processConfig(&has_zk_includes, &zk_node_cache);
}
ConfigurationPtr configuration(new Poco::Util::XMLConfiguration(config_xml));
DB::ConfigurationPtr configuration(new Poco::Util::XMLConfiguration(config_xml));
// do not throw exception if not found
if (try_get)
return configuration->getString(key, "");

View File

@ -30,6 +30,7 @@
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/registerStorages.h>
#include <Dictionaries/registerDictionaries.h>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options.hpp>
@ -115,9 +116,11 @@ try
/// Load config files if exists
if (config().has("config-file") || Poco::File("config.xml").exists())
{
ConfigProcessor config_processor(config().getString("config-file", "config.xml"), false, true);
const auto config_path = config().getString("config-file", "config.xml");
ConfigProcessor config_processor(config_path, false, true);
config_processor.setConfigPath(Poco::Path(config_path).makeParent().toString());
auto loaded_config = config_processor.loadConfig();
config_processor.savePreprocessedConfig(loaded_config);
config_processor.savePreprocessedConfig(loaded_config, loaded_config.configuration->getString("path", DBMS_DEFAULT_PATH));
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}
@ -140,6 +143,7 @@ try
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
/// Maybe useless
if (config().has("macros"))
@ -348,7 +352,7 @@ void LocalServer::setupUsers()
const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
ConfigProcessor config_processor(users_config_path);
const auto loaded_config = config_processor.loadConfig();
config_processor.savePreprocessedConfig(loaded_config);
config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH));
users_config = loaded_config.configuration;
}
else

View File

@ -1,5 +1,5 @@
add_library (clickhouse-performance-test-lib ${LINK_MODE} PerformanceTest.cpp)
target_link_libraries (clickhouse-performance-test-lib PRIVATE dbms clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_link_libraries (clickhouse-performance-test-lib PRIVATE dbms clickhouse_common_io clickhouse_common_config ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_include_directories (clickhouse-performance-test-lib SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
if (CLICKHOUSE_SPLIT_BINARY)

View File

@ -30,7 +30,9 @@
#include <Poco/SAX/InputSource.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/XML/XMLStream.h>
#include <Poco/Util/Application.h>
#include <Common/InterruptListener.h>
#include <Common/Config/configReadClient.h>
#ifndef __clang__
#pragma GCC optimize("-fno-var-tracking-assignments")
@ -487,17 +489,18 @@ struct Stats
double Stats::avg_rows_speed_precision = 0.001;
double Stats::avg_bytes_speed_precision = 0.001;
class PerformanceTest
class PerformanceTest : public Poco::Util::Application
{
public:
using Strings = std::vector<String>;
PerformanceTest(const String & host_,
const UInt16 port_,
const bool secure_,
const String & default_database_,
const String & user_,
const String & password_,
const bool & lite_output_,
const bool lite_output_,
const String & profiles_file_,
Strings && input_files_,
Strings && tests_tags_,
@ -507,7 +510,7 @@ public:
Strings && tests_names_regexp_,
Strings && skip_names_regexp_,
const ConnectionTimeouts & timeouts)
: connection(host_, port_, default_database_, user_, password_, timeouts),
: connection(host_, port_, default_database_, user_, password_, timeouts, "performance-test", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
gotSIGINT(false),
lite_output(lite_output_),
profiles_file(profiles_file_),
@ -523,7 +526,19 @@ public:
{
throw DB::Exception("No tests were specified", DB::ErrorCodes::BAD_ARGUMENTS);
}
}
/// Application start-up hook.
/// Hands the configuration of the current Poco application instance and the
/// value of the HOME environment variable (empty string when unset) to
/// configReadClient().
void initialize(Poco::Util::Application & self [[maybe_unused]])
{
    const char * home_env = getenv("HOME");
    std::string home_path = home_env ? home_env : "";

    configReadClient(Poco::Util::Application::instance().config(), home_path);
}
int main(const std::vector < std::string > & /* args */)
{
std::string name;
UInt64 version_major;
UInt64 version_minor;
@ -536,6 +551,8 @@ public:
server_version = ss.str();
processTestsConfigurations(input_files);
return 0;
}
private:
@ -1383,21 +1400,28 @@ try
using Strings = std::vector<String>;
boost::program_options::options_description desc("Allowed options");
desc.add_options()("help", "produce help message")("lite", "use lite version of output")(
"profiles-file", value<String>()->default_value(""), "Specify a file with global profiles")(
"host,h", value<String>()->default_value("localhost"), "")("port", value<UInt16>()->default_value(9000), "")(
"database", value<String>()->default_value("default"), "")("user", value<String>()->default_value("default"), "")(
"password", value<String>()->default_value(""), "")("tags", value<Strings>()->multitoken(), "Run only tests with tag")(
"skip-tags", value<Strings>()->multitoken(), "Do not run tests with tag")("names",
value<Strings>()->multitoken(),
"Run tests with specific name")("skip-names", value<Strings>()->multitoken(), "Do not run tests with name")(
"names-regexp", value<Strings>()->multitoken(), "Run tests with names matching regexp")("skip-names-regexp",
value<Strings>()->multitoken(),
"Do not run tests with names matching regexp")("recursive,r", "Recurse in directories to find all xml's");
desc.add_options()
("help", "produce help message")
("lite", "use lite version of output")
("profiles-file", value<String>()->default_value(""), "Specify a file with global profiles")
("host,h", value<String>()->default_value("localhost"), "")
("port", value<UInt16>()->default_value(9000), "")
("secure,s", "Use TLS connection")
("database", value<String>()->default_value("default"), "")
("user", value<String>()->default_value("default"), "")
("password", value<String>()->default_value(""), "")
("tags", value<Strings>()->multitoken(), "Run only tests with tag")
("skip-tags", value<Strings>()->multitoken(), "Do not run tests with tag")
("names", value<Strings>()->multitoken(), "Run tests with specific name")
("skip-names", value<Strings>()->multitoken(), "Do not run tests with name")
("names-regexp", value<Strings>()->multitoken(), "Run tests with names matching regexp")
("skip-names-regexp", value<Strings>()->multitoken(), "Do not run tests with names matching regexp")
("recursive,r", "Recurse in directories to find all xml's");
/// These options will not be displayed in --help
boost::program_options::options_description hidden("Hidden options");
hidden.add_options()("input-files", value<std::vector<String>>(), "");
hidden.add_options()
("input-files", value<std::vector<String>>(), "");
/// But they will be legit, though. And they must be given without name
boost::program_options::positional_options_description positional;
@ -1474,8 +1498,10 @@ try
DB::UseSSL use_ssl;
DB::PerformanceTest performanceTest(options["host"].as<String>(),
DB::PerformanceTest performance_test(
options["host"].as<String>(),
options["port"].as<UInt16>(),
options.count("secure"),
options["database"].as<String>(),
options["user"].as<String>(),
options["password"].as<String>(),
@ -1489,8 +1515,7 @@ try
std::move(tests_names_regexp),
std::move(skip_names_regexp),
timeouts);
return 0;
return performance_test.run();
}
catch (...)
{

View File

@ -36,6 +36,7 @@
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/registerStorages.h>
#include <Dictionaries/registerDictionaries.h>
#include <Common/Config/ConfigReloader.h>
#include "HTTPHandlerFactory.h"
#include "MetricsTransmitter.h"
@ -96,7 +97,7 @@ void Server::initialize(Poco::Util::Application & self)
std::string Server::getDefaultCorePath() const
{
return getCanonicalPath(config().getString("path")) + "cores";
return getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH)) + "cores";
}
int Server::main(const std::vector<std::string> & /*args*/)
@ -109,6 +110,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::get());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
@ -129,7 +131,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
ConfigProcessor config_processor(config_path);
loaded_config = config_processor.loadConfigWithZooKeeperIncludes(
main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
config_processor.savePreprocessedConfig(loaded_config);
config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH));
config().removeConfiguration(old_configuration.get());
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}
@ -160,7 +162,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
#endif
std::string path = getCanonicalPath(config().getString("path"));
std::string path = getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH));
std::string default_database = config().getString("default_database", "default");
global_context->setPath(path);
@ -301,6 +303,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
auto main_config_reloader = std::make_unique<ConfigReloader>(config_path,
include_from_path,
config().getString("path", ""),
std::move(main_config_zk_node_cache),
[&](ConfigurationPtr config)
{
@ -322,6 +325,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
auto users_config_reloader = std::make_unique<ConfigReloader>(users_config_path,
include_from_path,
config().getString("path", ""),
zkutil::ZooKeeperNodeCache([&] { return global_context->getZooKeeper(); }),
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);

View File

@ -27,7 +27,6 @@
#include <Poco/Net/SecureStreamSocket.h>
#endif
namespace CurrentMetrics
{
extern const Metric SendExternalTables;
@ -730,14 +729,6 @@ BlockStreamProfileInfo Connection::receiveProfileInfo()
return profile_info;
}
void Connection::fillBlockExtraInfo(BlockExtraInfo & info) const
{
info.is_valid = true;
info.host = host;
info.resolved_address = getResolvedAddress().toString();
info.port = port;
info.user = user;
}
void Connection::throwUnexpectedPacket(UInt64 packet_type, const char * expected) const
{

View File

@ -159,11 +159,6 @@ public:
*/
void disconnect();
/** Fill in the information that is needed when getting the block for some tasks
* (so far only for a DESCRIBE TABLE query with Distributed tables).
*/
void fillBlockExtraInfo(BlockExtraInfo & info) const;
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }

View File

@ -26,7 +26,7 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se
MultiplexedConnections::MultiplexedConnections(
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler, bool append_extra_info)
const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
{
/// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that
@ -48,9 +48,6 @@ MultiplexedConnections::MultiplexedConnections(
}
active_connection_count = connections.size();
if (append_extra_info)
block_extra_info = std::make_unique<BlockExtraInfo>();
}
void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
@ -126,24 +123,9 @@ Connection::Packet MultiplexedConnections::receivePacket()
{
std::lock_guard<std::mutex> lock(cancel_mutex);
Connection::Packet packet = receivePacketUnlocked();
if (block_extra_info)
{
if (packet.type == Protocol::Server::Data)
current_connection->fillBlockExtraInfo(*block_extra_info);
else
block_extra_info->is_valid = false;
}
return packet;
}
BlockExtraInfo MultiplexedConnections::getBlockExtraInfo() const
{
if (!block_extra_info)
throw Exception("MultiplexedConnections object not configured for block extra info support",
ErrorCodes::LOGICAL_ERROR);
return *block_extra_info;
}
void MultiplexedConnections::disconnect()
{
std::lock_guard<std::mutex> lock(cancel_mutex);

View File

@ -21,12 +21,10 @@ public:
/// Accepts ready connection.
MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_);
/** Accepts a vector of connections to replicas of one shard already taken from pool.
* If the append_extra_info flag is set, additional information appended to each received block.
*/
/// Accepts a vector of connections to replicas of one shard already taken from pool.
MultiplexedConnections(
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler_, bool append_extra_info);
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler_);
/// Send all content of external tables to replicas.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
@ -42,9 +40,6 @@ public:
/// Get packet from any replica.
Connection::Packet receivePacket();
/// Get information about the last received packet.
BlockExtraInfo getBlockExtraInfo() const;
/// Break all active connections.
void disconnect();
@ -99,11 +94,8 @@ private:
/// Connection that received last block.
Connection * current_connection = nullptr;
/// Information about the last received block, if supported.
std::unique_ptr<BlockExtraInfo> block_extra_info;
bool sent_query = false;
bool cancelled = false;
/// A mutex for the sendCancel function to execute safely

View File

@ -259,7 +259,7 @@ void ColumnLowCardinality::getPermutation(bool reverse, size_t limit, int nan_di
if (limit == 0)
limit = size();
size_t unique_limit = std::min(limit, getDictionary().size());
size_t unique_limit = getDictionary().size();
Permutation unique_perm;
getDictionary().getNestedColumn()->getPermutation(reverse, unique_limit, nan_direction_hint, unique_perm);

View File

@ -20,6 +20,11 @@
using namespace Poco::XML;
namespace DB
{
/// Base path that is cut from the beginning of preprocessed config paths
std::string main_config_path;
/// Extracts from a string the first encountered number consisting of at least two digits.
static std::string numberFromHost(const std::string & s)
@ -40,13 +45,6 @@ static std::string numberFromHost(const std::string & s)
return "";
}
static std::string preprocessedConfigPath(const std::string & path)
{
Poco::Path preprocessed_path(path);
preprocessed_path.setBaseName(preprocessed_path.getBaseName() + PREPROCESSED_SUFFIX);
return preprocessed_path.toString();
}
bool ConfigProcessor::isPreprocessedFile(const std::string & path)
{
return endsWith(Poco::Path(path).getBaseName(), PREPROCESSED_SUFFIX);
@ -59,7 +57,6 @@ ConfigProcessor::ConfigProcessor(
bool log_to_console,
const Substitutions & substitutions_)
: path(path_)
, preprocessed_path(preprocessedConfigPath(path))
, throw_on_bad_incl(throw_on_bad_incl_)
, substitutions(substitutions_)
/// We need larger name pool to allow to support vast amount of users in users.xml files for ClickHouse.
@ -522,7 +519,7 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfig(bool allow_zk_includes
ConfigurationPtr configuration(new Poco::Util::XMLConfiguration(config_xml));
return LoadedConfig{configuration, has_zk_includes, /* loaded_from_preprocessed = */ false, config_xml};
return LoadedConfig{configuration, has_zk_includes, /* loaded_from_preprocessed = */ false, config_xml, path};
}
ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
@ -556,11 +553,44 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
ConfigurationPtr configuration(new Poco::Util::XMLConfiguration(config_xml));
return LoadedConfig{configuration, has_zk_includes, !processed_successfully, config_xml};
return LoadedConfig{configuration, has_zk_includes, !processed_successfully, config_xml, path};
}
void ConfigProcessor::savePreprocessedConfig(const LoadedConfig & loaded_config)
void ConfigProcessor::savePreprocessedConfig(const LoadedConfig & loaded_config, std::string preprocessed_dir)
{
if (preprocessed_path.empty())
{
auto new_path = loaded_config.config_path;
if (new_path.substr(0, main_config_path.size()) == main_config_path)
new_path.replace(0, main_config_path.size(), "");
std::replace(new_path.begin(), new_path.end(), '/', '_');
if (preprocessed_dir.empty())
{
if (!loaded_config.configuration->has("path"))
{
// Will use current directory
auto parent_path = Poco::Path(loaded_config.config_path).makeParent();
preprocessed_dir = parent_path.toString();
Poco::Path poco_new_path(new_path);
poco_new_path.setBaseName(poco_new_path.getBaseName() + PREPROCESSED_SUFFIX);
new_path = poco_new_path.toString();
}
else
{
preprocessed_dir = loaded_config.configuration->getString("path") + "/preprocessed_configs/";
}
}
else
{
preprocessed_dir += "/preprocessed_configs/";
}
preprocessed_path = preprocessed_dir + new_path;
auto path = Poco::Path(preprocessed_path).makeParent();
if (!path.toString().empty())
Poco::File(path).createDirectories();
}
try
{
DOMWriter().writeNode(preprocessed_path, loaded_config.preprocessed_xml);
@ -570,3 +600,10 @@ void ConfigProcessor::savePreprocessedConfig(const LoadedConfig & loaded_config)
LOG_WARNING(log, "Couldn't save preprocessed config to " << preprocessed_path << ": " << e.displayText());
}
}
/// Remembers `config_path` in the file-level `main_config_path` variable.
/// savePreprocessedConfig() strips this prefix from a config's path (when the path starts with it)
/// before building the name of the preprocessed file.
/// NOTE: `main_config_path` is a namespace-scope variable, so the value is shared by every
/// ConfigProcessor instance rather than stored per object.
void ConfigProcessor::setConfigPath(const std::string & config_path)
{
main_config_path = config_path;
}
}

View File

@ -24,6 +24,9 @@ namespace zkutil
class ZooKeeperNodeCache;
}
namespace DB
{
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
using XMLDocumentPtr = Poco::AutoPtr<Poco::XML::Document>;
@ -72,6 +75,7 @@ public:
bool has_zk_includes;
bool loaded_from_preprocessed;
XMLDocumentPtr preprocessed_xml;
std::string config_path;
};
/// If allow_zk_includes is true, expect that the configuration XML can contain from_zk nodes.
@ -85,7 +89,12 @@ public:
zkutil::ZooKeeperNodeCache & zk_node_cache,
bool fallback_to_preprocessed = false);
void savePreprocessedConfig(const LoadedConfig & loaded_config);
/// Save preprocessed config to specified directory.
/// If preprocessed_dir is empty - calculate from loaded_config.path + /preprocessed_configs/
void savePreprocessedConfig(const LoadedConfig & loaded_config, std::string preprocessed_dir);
/// Set the path of the main config.xml. It will be cut from the paths of all configs placed into preprocessed_configs/
void setConfigPath(const std::string & config_path);
public:
using Files = std::vector<std::string>;
@ -99,7 +108,7 @@ public:
private:
const std::string path;
const std::string preprocessed_path;
std::string preprocessed_path;
bool throw_on_bad_incl;
@ -127,3 +136,5 @@ private:
zkutil::ZooKeeperNodeCache * zk_node_cache,
std::unordered_set<std::string> & contributing_zk_paths);
};
}

View File

@ -15,10 +15,12 @@ constexpr decltype(ConfigReloader::reload_interval) ConfigReloader::reload_inter
ConfigReloader::ConfigReloader(
const std::string & path_,
const std::string & include_from_path_,
const std::string & preprocessed_dir_,
zkutil::ZooKeeperNodeCache && zk_node_cache_,
Updater && updater_,
bool already_loaded)
: path(path_), include_from_path(include_from_path_)
, preprocessed_dir(preprocessed_dir_)
, zk_node_cache(std::move(zk_node_cache_))
, updater(std::move(updater_))
{
@ -98,7 +100,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
tryLogCurrentException(log, "Error loading config from `" + path + "'");
return;
}
config_processor.savePreprocessedConfig(loaded_config);
config_processor.savePreprocessedConfig(loaded_config, preprocessed_dir);
/** We should remember last modification time if and only if config was successfully loaded
* Otherwise a race condition could occur during config files update:

View File

@ -33,6 +33,7 @@ public:
ConfigReloader(
const std::string & path,
const std::string & include_from_path,
const std::string & preprocessed_dir,
zkutil::ZooKeeperNodeCache && zk_node_cache,
Updater && updater,
bool already_loaded);
@ -70,6 +71,7 @@ private:
std::string path;
std::string include_from_path;
std::string preprocessed_dir;
FilesChangesTracker files;
zkutil::ZooKeeperNodeCache zk_node_cache;

View File

@ -0,0 +1,31 @@
#include "configReadClient.h"
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/File.h>
#include "ConfigProcessor.h"
namespace DB
{
/// Selects a client configuration file, loads it through ConfigProcessor and adds the result to `config`.
///
/// Lookup order:
///   1. the value of the `config-file` key, when `config` has it (taken as is, without an existence check,
///      and without falling back to the locations below);
///   2. ./clickhouse-client.xml;
///   3. <home_path>/.clickhouse-client/config.xml (only when home_path is not empty);
///   4. /etc/clickhouse-client/config.xml.
///
/// Returns true when a configuration was loaded and added, false when no file was selected.
bool configReadClient(Poco::Util::LayeredConfiguration & config, const std::string & home_path)
{
    std::string config_path;

    if (config.has("config-file"))
    {
        config_path = config.getString("config-file");
    }
    else
    {
        /// Default locations in priority order. An empty entry marks a location that must not be probed.
        const std::string candidates[] = {
            "./clickhouse-client.xml",
            home_path.empty() ? std::string() : home_path + "/.clickhouse-client/config.xml",
            "/etc/clickhouse-client/config.xml",
        };

        for (const auto & candidate : candidates)
        {
            if (candidate.empty() || !Poco::File(candidate).exists())
                continue;

            config_path = candidate;
            break;
        }
    }

    /// Nothing selected: leave `config` untouched.
    if (config_path.empty())
        return false;

    ConfigProcessor config_processor(config_path);
    auto loaded_config = config_processor.loadConfig();
    config.add(loaded_config.configuration);
    return true;
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <string>
namespace Poco { class Logger; namespace Util { class LayeredConfiguration; } }
namespace DB
{
/// Read configuration files related to clickhouse-client like applications. Returns true if any configuration files were read.
bool configReadClient(Poco::Util::LayeredConfiguration & config, const std::string & home_path);
}

View File

@ -23,7 +23,7 @@ int main(int argc, char ** argv)
return 3;
}
ConfigProcessor processor(argv[1], false, true);
DB::ConfigProcessor processor(argv[1], false, true);
auto config = processor.loadConfig().configuration;
zkutil::ZooKeeper zk(*config, "zookeeper");
zkutil::EventPtr watch = std::make_shared<Poco::Event>();

View File

@ -151,22 +151,4 @@ void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const
/// Calculate difference in structure of blocks and write description into output strings. NOTE It doesn't compare values of constant columns.
void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out_lhs_diff, std::string & out_rhs_diff);
/** Additional data to the blocks. They are only needed for a query
* DESCRIBE TABLE with Distributed tables.
*/
struct BlockExtraInfo
{
BlockExtraInfo() {}
operator bool() const { return is_valid; }
bool operator!() const { return !is_valid; }
std::string host;
std::string resolved_address;
std::string user;
UInt16 port = 0;
bool is_valid = false;
};
}

View File

@ -66,6 +66,8 @@
/// the number is unmotivated
#define DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT 15
#define DBMS_DEFAULT_PATH "/var/lib/clickhouse/"
// more aliases: https://mailman.videolan.org/pipermail/x264-devel/2014-May/010660.html
#if defined(_MSC_VER)

View File

@ -0,0 +1,28 @@
#include <DataStreams/AddingDefaultBlockInputStream.h>
#include <Interpreters/addMissingDefaults.h>
namespace DB
{
/// Wraps `input_` so that blocks read from it are passed through addMissingDefaults()
/// using `header_` and `column_defaults_` (see readImpl()).
/// `context_` is stored by reference, so it has to outlive this stream.
AddingDefaultBlockInputStream::AddingDefaultBlockInputStream(
    const BlockInputStreamPtr & input_,
    const Block & header_,
    const ColumnDefaults & column_defaults_,
    const Context & context_)
    : input(input_)
    , header(header_)
    , column_defaults(column_defaults_)
    , context(context_)
{
    /// Register the source as a child stream; readImpl() pulls blocks from children.back().
    children.emplace_back(input);
}
/// Reads the next block from the wrapped stream and completes it with addMissingDefaults().
/// An empty block from the source is returned unchanged.
Block AddingDefaultBlockInputStream::readImpl()
{
    Block block = children.back()->read();

    if (block)
        return addMissingDefaults(block, header.getNamesAndTypesList(), column_defaults, context);

    /// Empty block: pass it through as is.
    return block;
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/ColumnDefault.h>
namespace DB
{
/** This stream adds three types of columns to a block:
* 1. Columns that are missing from the request but present in the table without defaults (missing columns)
* 2. Columns that are missing from the request but present in the table with defaults (columns with default values)
* 3. Columns that are materialized from other columns (materialized columns)
* All three types of columns are materialized (not constants).
*/
class AddingDefaultBlockInputStream : public IProfilingBlockInputStream
{
public:
/// input_           - source stream to read blocks from
/// header_          - structure returned by getHeader()
/// column_defaults_ - column defaults handed to addMissingDefaults() on every read
/// context_         - kept by reference (see the `context` member), so it must outlive the stream
AddingDefaultBlockInputStream(
const BlockInputStreamPtr & input_,
const Block & header_,
const ColumnDefaults & column_defaults_,
const Context & context_);
/// Name of this stream.
String getName() const override { return "AddingDefault"; }
/// Returns the header passed to the constructor.
Block getHeader() const override { return header; }
private:
/// Reads one block from the source and passes it through addMissingDefaults().
Block readImpl() override;
/// Source stream.
BlockInputStreamPtr input;
/// Blocks after this stream should have this structure
const Block header;
/// Copy of the column defaults given to the constructor.
const ColumnDefaults column_defaults;
/// Reference, not a copy: the Context is owned elsewhere.
const Context & context;
};
}

View File

@ -1,11 +1,5 @@
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <Common/typeid_cast.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Core/Block.h>
#include <Interpreters/addMissingDefaults.h>
namespace DB
@ -13,68 +7,7 @@ namespace DB
void AddingDefaultBlockOutputStream::write(const Block & block)
{
Block res;
/// We take given columns from input block
/// and missed columns without default value (default and meterialized will be computed later)
for (const auto & column : output_block)
{
if (block.has(column.name))
res.insert(block.getByName(column.name));
else if (!column_defaults.count(column.name))
res.insert(column);
}
/// Adds not specified default values.
size_t rows = block.rows();
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
/// First, remember the offset columns for all arrays in the block.
std::map<String, ColumnPtr> offset_columns;
for (size_t i = 0, size = block.columns(); i < size; ++i)
{
const auto & elem = block.getByPosition(i);
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(&*elem.column))
{
String offsets_name = Nested::extractTableName(elem.name);
auto & offsets_column = offset_columns[offsets_name];
/// If for some reason there are different offset columns for one nested structure, then we take nonempty.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
}
}
/// In this loop we fill missed columns
for (auto & column : res)
{
if (block.has(column.name))
continue;
String offsets_name = Nested::extractTableName(column.name);
if (offset_columns.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<const DataTypeArray &>(*column.type).getNestedType();
UInt64 nested_rows = rows ? get<UInt64>((*offsets_column)[rows - 1]) : 0;
ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();
column.column = ColumnArray::create(nested_column, offsets_column);
}
else
{
/** It is necessary to turn a constant column into a full column, since in part of blocks (from other parts),
* it can be full (or the interpreter may decide that it is constant everywhere).
*/
column.column = column.type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst();
}
}
/// Computes explicitly specified values (in column_defaults) by default and materialized columns.
evaluateMissingDefaults(res, output_block.getNamesAndTypesList(), column_defaults, context);
output->write(res);
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), column_defaults, context));
}
void AddingDefaultBlockOutputStream::flush()

View File

@ -63,13 +63,6 @@ public:
*/
virtual Block read() = 0;
/** Get information about the last block received.
*/
virtual BlockExtraInfo getBlockExtraInfo() const
{
throw Exception("Method getBlockExtraInfo is not supported by the data stream " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Read something before starting all data or after the end of all data.
* In the `readSuffix` function, you can implement a finalization that can lead to an exception.
* readPrefix() must be called before the first call to read().

View File

@ -39,23 +39,12 @@ namespace CurrentMetrics
namespace DB
{
/** Union mode.
*/
enum class StreamUnionMode
{
Basic = 0, /// take out blocks
ExtraInfo /// take out blocks + additional information
};
/// Example of the handler.
struct ParallelInputsHandler
{
/// Processing the data block.
void onBlock(Block & /*block*/, size_t /*thread_num*/) {}
/// Processing the data block + additional information.
void onBlock(Block & /*block*/, BlockExtraInfo & /*extra_info*/, size_t /*thread_num*/) {}
/// Called for each thread, when the thread has nothing else to do.
/// Due to the fact that part of the sources has run out, and now there are fewer sources left than streams.
/// Called if the `onException` method does not throw an exception; is called before the `onFinish` method.
@ -70,7 +59,7 @@ struct ParallelInputsHandler
};
template <typename Handler, StreamUnionMode mode = StreamUnionMode::Basic>
template <typename Handler>
class ParallelInputsProcessor
{
public:
@ -183,15 +172,9 @@ private:
InputData(const BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
};
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num)
void publishPayload(Block & block, size_t thread_num)
{
if constexpr (mode == StreamUnionMode::Basic)
handler.onBlock(block, thread_num);
else
{
BlockExtraInfo extra_info = stream->getBlockExtraInfo();
handler.onBlock(block, extra_info, thread_num);
}
handler.onBlock(block, thread_num);
}
void thread(ThreadGroupStatusPtr thread_group, size_t thread_num)
@ -249,7 +232,7 @@ private:
{
additional_input_at_end->readPrefix();
while (Block block = additional_input_at_end->read())
publishPayload(additional_input_at_end, block, thread_num);
publishPayload(block, thread_num);
}
catch (...)
{
@ -312,7 +295,7 @@ private:
break;
if (block)
publishPayload(input.in, block, thread_num);
publishPayload(block, thread_num);
}
}
}

View File

@ -45,7 +45,7 @@ RemoteBlockInputStream::RemoteBlockInputStream(
create_multiplexed_connections = [this, connections, throttler]() mutable
{
return std::make_unique<MultiplexedConnections>(
std::move(connections), context.getSettingsRef(), throttler, append_extra_info);
std::move(connections), context.getSettingsRef(), throttler);
};
}
@ -74,7 +74,7 @@ RemoteBlockInputStream::RemoteBlockInputStream(
connections = pool->getMany(&current_settings, pool_mode);
return std::make_unique<MultiplexedConnections>(
std::move(connections), current_settings, throttler, append_extra_info);
std::move(connections), current_settings, throttler);
};
}
@ -88,11 +88,6 @@ RemoteBlockInputStream::~RemoteBlockInputStream()
multiplexed_connections->disconnect();
}
void RemoteBlockInputStream::appendExtraInfo()
{
append_extra_info = true;
}
void RemoteBlockInputStream::readPrefix()
{
if (!sent_query)

View File

@ -51,9 +51,6 @@ public:
void setMainTable(QualifiedTableName main_table_) { main_table = std::move(main_table_); }
/// Besides blocks themself, get blocks' extra info
void appendExtraInfo();
/// Sends query (initiates calculation) before read()
void readPrefix() override;
@ -66,11 +63,6 @@ public:
String getName() const override { return "Remote"; }
BlockExtraInfo getBlockExtraInfo() const override
{
return multiplexed_connections->getBlockExtraInfo();
}
Block getHeader() const override { return header; }
protected:
@ -143,7 +135,6 @@ private:
*/
std::atomic<bool> got_unknown_packet_from_replica { false };
bool append_extra_info = false;
PoolMode pool_mode = PoolMode::GET_MANY;
std::optional<QualifiedTableName> main_table;

View File

@ -16,39 +16,6 @@ namespace ErrorCodes
}
namespace
{
template <StreamUnionMode mode>
struct OutputData;
/// A block or an exception.
template <>
struct OutputData<StreamUnionMode::Basic>
{
Block block;
std::exception_ptr exception;
OutputData() {}
OutputData(Block & block_) : block(block_) {}
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
};
/// Block + additional information or an exception.
template <>
struct OutputData<StreamUnionMode::ExtraInfo>
{
Block block;
BlockExtraInfo extra_info;
std::exception_ptr exception;
OutputData() {}
OutputData(Block & block_, BlockExtraInfo & extra_info_) : block(block_), extra_info(extra_info_) {}
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
};
}
/** Merges several sources into one.
* Blocks from different sources are interleaved with each other in an arbitrary way.
* You can specify the number of threads (max_threads),
@ -58,20 +25,24 @@ struct OutputData<StreamUnionMode::ExtraInfo>
* - with the help of ParallelInputsProcessor in several threads it takes out blocks from the sources;
* - the completed blocks are added to a limited queue of finished blocks;
* - the main thread takes out completed blocks from the queue of finished blocks;
* - if the StreamUnionMode::ExtraInfo mode is specified, in addition to the UnionBlockInputStream
* extracts blocks information; In this case all sources should support such mode.
*/
template <StreamUnionMode mode = StreamUnionMode::Basic>
class UnionBlockInputStream final : public IProfilingBlockInputStream
{
private:
/// A block or an exception.
struct OutputData
{
/// Payload block; left default-constructed when the item carries an exception or nothing.
Block block;
/// Exception to hand over to the reader; null when the item carries a block or nothing.
std::exception_ptr exception;
/// Item with neither a block nor an exception.
OutputData() {}
/// Item carrying a copy of `block_`.
OutputData(Block & block_) : block(block_) {}
/// Item carrying a copy of `exception_`.
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
};
public:
using ExceptionCallback = std::function<void()>;
private:
using Self = UnionBlockInputStream;
public:
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()) :
output_queue(std::min(inputs.size(), max_threads)),
@ -125,11 +96,6 @@ public:
processor.cancel(kill);
}
BlockExtraInfo getBlockExtraInfo() const override
{
return doGetBlockExtraInfo();
}
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
@ -146,7 +112,7 @@ protected:
/** Let's read everything up to the end, so that ParallelInputsProcessor is not blocked when trying to insert into the queue.
* Maybe there is an exception in the queue.
*/
OutputData<mode> res;
OutputData res;
while (true)
{
//std::cerr << "popping\n";
@ -230,20 +196,9 @@ protected:
}
private:
BlockExtraInfo doGetBlockExtraInfo() const
{
if constexpr (mode == StreamUnionMode::ExtraInfo)
return received_payload.extra_info;
else
throw Exception("Method getBlockExtraInfo is not supported for mode StreamUnionMode::Basic",
ErrorCodes::NOT_IMPLEMENTED);
}
private:
using Payload = OutputData<mode>;
using Payload = OutputData;
using OutputQueue = ConcurrentBoundedQueue<Payload>;
private:
/** The queue of the finished blocks. Also, you can put an exception instead of a block.
* When data is run out, an empty block is inserted into the queue.
* Sooner or later, an empty block is always inserted into the queue (even after exception or query cancellation).
@ -254,18 +209,13 @@ private:
struct Handler
{
Handler(Self & parent_) : parent(parent_) {}
Handler(UnionBlockInputStream & parent_) : parent(parent_) {}
void onBlock(Block & block, size_t /*thread_num*/)
{
parent.output_queue.push(Payload(block));
}
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t /*thread_num*/)
{
parent.output_queue.push(Payload(block, extra_info));
}
void onFinish()
{
parent.output_queue.push(Payload());
@ -287,11 +237,11 @@ private:
parent.cancel(false); /// Does not throw exceptions.
}
Self & parent;
UnionBlockInputStream & parent;
};
Handler handler;
ParallelInputsProcessor<Handler, mode> processor;
ParallelInputsProcessor<Handler> processor;
ExceptionCallback exception_callback;

View File

@ -40,7 +40,7 @@ try
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = std::make_shared<AsynchronousBlockInputStream>(streams[i]);
BlockInputStreamPtr stream = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, settings.max_threads);
BlockInputStreamPtr stream = std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
stream = std::make_shared<LimitBlockInputStream>(stream, 10, 0);
WriteBufferFromFileDescriptor wb(STDERR_FILENO);

View File

@ -91,7 +91,7 @@ public:
const StoragePtr & table,
const ASTPtr & query) = 0;
/// Delete the table from the database and return it. Delete the metadata.
/// Delete the table from the database. Delete the metadata.
virtual void removeTable(
const Context & context,
const String & name) = 0;

View File

@ -0,0 +1,43 @@
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
include(${ClickHouse_SOURCE_DIR}/cmake/generate_code.cmake)

# Headers and sources located directly in this directory.
add_headers_and_sources(clickhouse_dictionaries .)

# Type names passed to generate_code() for every template listed below.
# NOTE(review): generate_code() is defined in cmake/generate_code.cmake; presumably it emits
# one generated source per (template, type) pair - confirm there.
set(clickhouse_dictionaries_generated_types
    UInt8 UInt16 UInt32 UInt64 UInt128
    Int8 Int16 Int32 Int64
    Float32 Float64
    Decimal32 Decimal64 Decimal128)

# Same templates, in the same order, each with the full type list.
foreach(clickhouse_dictionaries_template
        ComplexKeyCacheDictionary_generate1
        ComplexKeyCacheDictionary_generate2
        ComplexKeyCacheDictionary_generate3
        CacheDictionary_generate1
        CacheDictionary_generate2
        CacheDictionary_generate3)
    generate_code(${clickhouse_dictionaries_template} ${clickhouse_dictionaries_generated_types})
endforeach()

# Do not leak the helper list into subdirectories.
unset(clickhouse_dictionaries_generated_types)

# Pick up whatever was placed into the generated/ directory of the build tree.
add_headers_and_sources(clickhouse_dictionaries ${CMAKE_CURRENT_BINARY_DIR}/generated/)

add_library(clickhouse_dictionaries ${LINK_MODE} ${clickhouse_dictionaries_sources})
target_link_libraries(clickhouse_dictionaries
    PRIVATE
        clickhouse_common_io
        pocoext
        ${MYSQLXX_LIBRARY}
        ${BTRIE_LIBRARIES})

# Poco SQL: headers of an external (non-bundled) Poco.
if(Poco_SQL_FOUND AND NOT USE_INTERNAL_POCO_LIBRARY)
    target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${Poco_SQL_INCLUDE_DIR})
endif()

# Poco SQL ODBC connector.
if(USE_POCO_SQLODBC)
    target_link_libraries(clickhouse_dictionaries PRIVATE ${Poco_SQLODBC_LIBRARY} ${Poco_SQL_LIBRARY})
    if(NOT USE_INTERNAL_POCO_LIBRARY)
        target_include_directories(clickhouse_dictionaries
            SYSTEM PRIVATE
                ${ODBC_INCLUDE_DIRECTORIES}
                ${Poco_SQLODBC_INCLUDE_DIR}
                ${Poco_SQL_INCLUDE_DIR})
    endif()
endif()

# Poco Data headers.
if(Poco_Data_FOUND)
    target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${Poco_Data_INCLUDE_DIR})
endif()

# Poco Data ODBC connector.
if(USE_POCO_DATAODBC)
    target_link_libraries(clickhouse_dictionaries PRIVATE ${Poco_DataODBC_LIBRARY} ${Poco_Data_LIBRARY})
    if(NOT USE_INTERNAL_POCO_LIBRARY)
        target_include_directories(clickhouse_dictionaries
            SYSTEM PRIVATE
                ${ODBC_INCLUDE_DIRECTORIES}
                ${Poco_DataODBC_INCLUDE_DIR})
    endif()
endif()

# Poco MongoDB client.
if(USE_POCO_MONGODB)
    target_link_libraries(clickhouse_dictionaries PRIVATE ${Poco_MongoDB_LIBRARY})
endif()

add_subdirectory(Embedded)

View File

@ -1,3 +1,5 @@
#include "CacheDictionary.h"
#include <functional>
#include <sstream>
#include <memory>
@ -11,12 +13,12 @@
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Dictionaries/CacheDictionary.h>
#include <Dictionaries/DictionaryBlockInputStream.h>
#include "DictionaryBlockInputStream.h"
#include <ext/size.h>
#include <ext/range.h>
#include <ext/map.h>
#include "DictionaryFactory.h"
#include "CacheDictionary.inc.h"
namespace ProfileEvents
{
@ -47,6 +49,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
extern const int TOO_SMALL_BUFFER_SIZE;
}
@ -206,34 +209,6 @@ void CacheDictionary::isInConstantVector(
out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
}
#define DECLARE(TYPE)\
void CacheDictionary::get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const\
{\
auto & attribute = getAttribute(attribute_name);\
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};\
\
const auto null_value = std::get<TYPE>(attribute.null_values);\
\
getItemsNumber<TYPE>(attribute, ids, out, [&] (const size_t) { return null_value; });\
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
{
auto & attribute = getAttribute(attribute_name);
@ -245,33 +220,6 @@ void CacheDictionary::getString(const std::string & attribute_name, const Padded
getItemsString(attribute, ids, out, [&] (const size_t) { return null_value; });
}
#define DECLARE(TYPE)\
void CacheDictionary::get##TYPE(\
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const PaddedPODArray<TYPE> & def,\
ResultArrayType<TYPE> & out) const\
{\
auto & attribute = getAttribute(attribute_name);\
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};\
\
getItemsNumber<TYPE>(attribute, ids, out, [&] (const size_t row) { return def[row]; });\
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void CacheDictionary::getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def,
ColumnString * const out) const
@ -283,32 +231,6 @@ void CacheDictionary::getString(
getItemsString(attribute, ids, out, [&] (const size_t row) { return def->getDataAt(row); });
}
#define DECLARE(TYPE)\
void CacheDictionary::get##TYPE(\
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const TYPE def, ResultArrayType<TYPE> & out) const\
{\
auto & attribute = getAttribute(attribute_name);\
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};\
\
getItemsNumber<TYPE>(attribute, ids, out, [&] (const size_t) { return def; });\
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void CacheDictionary::getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def,
ColumnString * const out) const
@ -487,374 +409,6 @@ CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const Attrib
return attr;
}
template <typename OutputType, typename DefaultGetter>
void CacheDictionary::getItemsNumber(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ResultArrayType<OutputType> & out,
DefaultGetter && get_default) const
{
if (false) {}
#define DISPATCH(TYPE) \
else if (attribute.type == AttributeUnderlyingType::TYPE) \
getItemsNumberImpl<TYPE, OutputType>(attribute, ids, out, std::forward<DefaultGetter>(get_default));
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Float32)
DISPATCH(Float64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
#undef DISPATCH
else
throw Exception("Unexpected type of attribute: " + toString(attribute.type), ErrorCodes::LOGICAL_ERROR);
}
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void CacheDictionary::getItemsNumberImpl(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ResultArrayType<OutputType> & out,
DefaultGetter && get_default) const
{
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
auto & attribute_array = std::get<ContainerPtrType<AttributeType>>(attribute.arrays);
const auto rows = ext::size(ids);
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
const auto now = std::chrono::system_clock::now();
/// fetch up-to-date values, decide which ones require update
for (const auto row : ext::range(0, rows))
{
const auto id = ids[row];
/** cell should be updated if either:
* 1. ids do not match,
* 2. cell has expired,
* 3. explicit defaults were specified and cell was set default. */
const auto find_result = findCellIdx(id, now);
if (!find_result.valid)
{
outdated_ids[id].push_back(row);
if (find_result.outdated)
++cache_expired;
else
++cache_not_found;
}
else
{
++cache_hit;
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
out[row] = cell.isDefault() ? get_default(row) : static_cast<OutputType>(attribute_array[cell_idx]);
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
if (outdated_ids.empty())
return;
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
[] (auto & pair) { return pair.first; });
/// request new values
update(required_ids,
[&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
for (const size_t row : outdated_ids[id])
out[row] = static_cast<OutputType>(attribute_value);
},
[&] (const auto id, const auto)
{
for (const size_t row : outdated_ids[id])
out[row] = get_default(row);
});
}
template <typename DefaultGetter>
void CacheDictionary::getItemsString(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ColumnString * out,
DefaultGetter && get_default) const
{
const auto rows = ext::size(ids);
/// save on some allocations
out->getOffsets().reserve(rows);
auto & attribute_array = std::get<ContainerPtrType<StringRef>>(attribute.arrays);
auto found_outdated_values = false;
/// perform optimistic version, fallback to pessimistic if failed
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
const auto now = std::chrono::system_clock::now();
/// fetch up-to-date values, discard on fail
for (const auto row : ext::range(0, rows))
{
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
if (!find_result.valid)
{
found_outdated_values = true;
break;
}
else
{
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
out->insertData(string_ref.data, string_ref.size);
}
}
}
/// optimistic code completed successfully
if (!found_outdated_values)
{
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows, std::memory_order_release);
return;
}
/// now onto the pessimistic one, discard possible partial results from the optimistic path
out->getChars().resize_assume_reserved(0);
out->getOffsets().resize_assume_reserved(0);
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
/// we are going to store every string separately
std::unordered_map<Key, String> map;
size_t total_length = 0;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
const auto now = std::chrono::system_clock::now();
for (const auto row : ext::range(0, ids.size()))
{
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
if (!find_result.valid)
{
outdated_ids[id].push_back(row);
if (find_result.outdated)
++cache_expired;
else
++cache_not_found;
}
else
{
++cache_hit;
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
if (!cell.isDefault())
map[id] = String{string_ref};
total_length += string_ref.size + 1;
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
/// request new values
if (!outdated_ids.empty())
{
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
[] (auto & pair) { return pair.first; });
update(required_ids,
[&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * outdated_ids[id].size();
},
[&] (const auto id, const auto)
{
for (const auto row : outdated_ids[id])
total_length += get_default(row).size + 1;
});
}
out->getChars().reserve(total_length);
for (const auto row : ext::range(0, ext::size(ids)))
{
const auto id = ids[row];
const auto it = map.find(id);
const auto string_ref = it != std::end(map) ? StringRef{it->second} : get_default(row);
out->insertData(string_ref.data, string_ref.size);
}
}
template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheDictionary::update(
const std::vector<Key> & requested_ids,
PresentIdHandler && on_cell_updated,
AbsentIdHandler && on_id_not_found) const
{
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids)
remaining_ids.insert({ id, 0 });
std::uniform_int_distribution<UInt64> distribution
{
dict_lifetime.min_sec,
dict_lifetime.max_sec
};
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
const auto now = std::chrono::system_clock::now();
while (const auto block = stream->read())
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(ext::range(0, attributes.size()), [&block] (size_t i)
{
return block.safeGetByPosition(i + 1).column.get();
});
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
}
stream->readSuffix();
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
size_t not_found_num = 0, found_num = 0;
const auto now = std::chrono::system_clock::now();
/// Check which ids have not been found and require setting null_value
for (const auto & id_found_pair : remaining_ids)
{
if (id_found_pair.second)
{
++found_num;
continue;
}
++not_found_num;
const auto id = id_found_pair.first;
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
/// Set null_value for each attribute
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
/// Check if cell had not been occupied before and increment element counter if it hadn't
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
cell.setDefault();
/// inform caller that the cell has not been found
on_id_not_found(id, cell_idx);
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
switch (attribute.type)
@ -981,5 +535,41 @@ BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_na
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getCachedIds(), column_names);
}
/// Registers the creator for dictionaries of layout 'cache' in `factory`.
///
/// The creator validates the configuration and throws:
///  - UNSUPPORTED_METHOD     if the structure declares a composite 'key';
///  - BAD_ARGUMENTS          if range_min / range_max are present, or 'require_nonempty' is set;
///  - TOO_SMALL_BUFFER_SIZE  if `size_in_cells` is not a positive number.
/// On success it returns a CacheDictionary built from the source, the configured lifetime and the cell count.
void registerDictionaryCache(DictionaryFactory & factory)
{
    /// The creator uses only its own arguments, so nothing is captured.
    auto create_layout = [](
                             const std::string & name,
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception {"'key' is not supported for dictionary of layout 'cache'", ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception {name
                                 + ": elements .structure.range_min and .structure.range_max should be defined only "
                                   "for a dictionary of layout 'range_hashed'",
                             ErrorCodes::BAD_ARGUMENTS};

        const std::string layout_prefix = config_prefix + ".layout";

        /// getInt() yields a signed value. A negative cell count is as unusable as zero and must not be
        /// passed on to the dictionary as a size, so every non-positive value is rejected here.
        const auto size = config.getInt(layout_prefix + ".cache.size_in_cells");
        if (size <= 0)
            throw Exception {name + ": dictionary of layout 'cache' cannot have 0 cells", ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception {name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                             ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime {config, config_prefix + ".lifetime"};
        return std::make_unique<CacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
    };

    factory.registerLayout("cache", create_layout);
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include "IDictionary.h"
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <Columns/ColumnDecimal.h>

View File

@ -0,0 +1,403 @@
#include "CacheDictionary.h"
#include <ext/size.h>
#include <ext/map.h>
#include <ext/range.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnsNumber.h>
namespace ProfileEvents
{
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
}
namespace CurrentMetrics
{
extern const Metric DictCacheRequests;
}
namespace DB
{
/// Error codes referenced by the templates defined in this header.
/// Both are declared here so the header does not rely on the including translation unit
/// having declared them beforehand.
namespace ErrorCodes
{
    /// Thrown by update() when the id column of a loaded block is not UInt64.
    extern const int TYPE_MISMATCH;
    /// Thrown by getItemsNumber() when the attribute type matches none of the numeric types.
    extern const int LOGICAL_ERROR;
}
/// Selects the getItemsNumberImpl instantiation that matches the stored type of `attribute`
/// and forwards all arguments to it; values are delivered to `out` as OutputType.
/// Throws LOGICAL_ERROR when the attribute type is none of the numeric types listed below.
template <typename OutputType, typename DefaultGetter>
void CacheDictionary::getItemsNumber(
    Attribute & attribute,
    const PaddedPODArray<Key> & ids,
    ResultArrayType<OutputType> & out,
    DefaultGetter && get_default) const
{
    /// One guarded call per supported stored type; the first match does the work and returns.
#define HANDLE_STORED_TYPE(TYPE) \
    if (attribute.type == AttributeUnderlyingType::TYPE) \
    { \
        getItemsNumberImpl<TYPE, OutputType>(attribute, ids, out, std::forward<DefaultGetter>(get_default)); \
        return; \
    }

    HANDLE_STORED_TYPE(UInt8)
    HANDLE_STORED_TYPE(UInt16)
    HANDLE_STORED_TYPE(UInt32)
    HANDLE_STORED_TYPE(UInt64)
    HANDLE_STORED_TYPE(UInt128)
    HANDLE_STORED_TYPE(Int8)
    HANDLE_STORED_TYPE(Int16)
    HANDLE_STORED_TYPE(Int32)
    HANDLE_STORED_TYPE(Int64)
    HANDLE_STORED_TYPE(Float32)
    HANDLE_STORED_TYPE(Float64)
    HANDLE_STORED_TYPE(Decimal32)
    HANDLE_STORED_TYPE(Decimal64)
    HANDLE_STORED_TYPE(Decimal128)

#undef HANDLE_STORED_TYPE

    /// No numeric type matched.
    throw Exception("Unexpected type of attribute: " + toString(attribute.type), ErrorCodes::LOGICAL_ERROR);
}
/// Fills `out[row]` for every `ids[row]` with the value of a numeric attribute.
///
/// AttributeType is the type the attribute is stored as, OutputType the type written to `out`
/// (converted with static_cast). `get_default(row)` supplies the value for rows whose cell is
/// marked default or whose id is reported absent by update().
///
/// Works in two steps: first serve every row that has a valid cached cell under a read lock,
/// then call update() once for all distinct ids that were not valid and fill their rows from its callbacks.
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void CacheDictionary::getItemsNumberImpl(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ResultArrayType<OutputType> & out,
DefaultGetter && get_default) const
{
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
auto & attribute_array = std::get<ContainerPtrType<AttributeType>>(attribute.arrays);
const auto rows = ext::size(ids);
/// Per-row counters, published to ProfileEvents after the read lock is released.
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
{
/// The read lock object lives until the closing brace of this block.
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
/// A single timestamp is used for the validity check of every row in this pass.
const auto now = std::chrono::system_clock::now();
/// fetch up-to-date values, decide which ones require update
for (const auto row : ext::range(0, rows))
{
const auto id = ids[row];
/** cell should be updated if either:
* 1. ids do not match,
* 2. cell has expired,
* 3. explicit defaults were specified and cell was set default. */
const auto find_result = findCellIdx(id, now);
if (!find_result.valid)
{
/// Remember the row so it can be filled once update() has handled this id.
outdated_ids[id].push_back(row);
if (find_result.outdated)
++cache_expired;
else
++cache_not_found;
}
else
{
++cache_hit;
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
/// A cell marked default has no stored value of its own: ask the caller-supplied getter instead.
out[row] = cell.isDefault() ? get_default(row) : static_cast<OutputType>(attribute_array[cell_idx]);
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
/// NOTE(review): outdated_ids.size() is the number of distinct ids, while `rows` counts positions,
/// so repeated occurrences of an outdated id are added to hit_count - confirm this is intended.
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
/// Everything was served from valid cells, no update request is needed.
if (outdated_ids.empty())
return;
/// Collect the distinct ids (the keys of the map) to request in one call.
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
[] (auto & pair) { return pair.first; });
/// request new values
update(required_ids,
/// Called by update() for an id it stored into cell `cell_idx`: copy the value to every row asking for that id.
[&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
for (const size_t row : outdated_ids[id])
out[row] = static_cast<OutputType>(attribute_value);
},
/// Called by update() for an id it did not find: each affected row gets its own default.
[&] (const auto id, const auto)
{
for (const size_t row : outdated_ids[id])
out[row] = get_default(row);
});
}
/// Appends to `out` one string per entry of `ids` with the value of a string attribute.
///
/// `get_default(row)` supplies the string for rows whose cell is marked default or whose id
/// is reported absent by update().
///
/// Optimistic pass: under a read lock, insert values directly into `out` as long as every id has a
/// valid cell. On the first id without one the pass is abandoned, `out` is emptied again and the
/// pessimistic pass runs: valid values are copied into a temporary map, the remaining ids are
/// requested through update(), and `out` is filled in row order at the end.
template <typename DefaultGetter>
void CacheDictionary::getItemsString(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ColumnString * out,
DefaultGetter && get_default) const
{
const auto rows = ext::size(ids);
/// save on some allocations
out->getOffsets().reserve(rows);
auto & attribute_array = std::get<ContainerPtrType<StringRef>>(attribute.arrays);
auto found_outdated_values = false;
/// perform optimistic version, fallback to pessimistic if failed
{
/// The read lock object lives until the closing brace of this block.
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
const auto now = std::chrono::system_clock::now();
/// fetch up-to-date values, discard on fail
for (const auto row : ext::range(0, rows))
{
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
if (!find_result.valid)
{
/// One invalid cell is enough to give up on the optimistic pass.
found_outdated_values = true;
break;
}
else
{
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
out->insertData(string_ref.data, string_ref.size);
}
}
}
/// optimistic code completed successfully
if (!found_outdated_values)
{
/// Every row was served from a valid cell, so all rows count as hits.
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows, std::memory_order_release);
return;
}
/// now onto the pessimistic one, discard possible partial results from the optimistic path
out->getChars().resize_assume_reserved(0);
out->getOffsets().resize_assume_reserved(0);
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
/// we are going to store every string separately
std::unordered_map<Key, String> map;
/// Sum of (size + 1) over all strings that will be inserted; used to reserve the chars buffer of `out` once.
size_t total_length = 0;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
{
/// Second read-locked pass with a fresh timestamp.
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
const auto now = std::chrono::system_clock::now();
for (const auto row : ext::range(0, ids.size()))
{
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
if (!find_result.valid)
{
outdated_ids[id].push_back(row);
if (find_result.outdated)
++cache_expired;
else
++cache_not_found;
}
else
{
++cache_hit;
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
/// Only real values are copied into `map`; rows of default cells are resolved
/// through get_default(row) again in the final loop below.
if (!cell.isDefault())
map[id] = String{string_ref};
total_length += string_ref.size + 1;
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
/// NOTE(review): outdated_ids.size() is the number of distinct ids, while `rows` counts positions,
/// so repeated occurrences of an outdated id are added to hit_count - confirm this is intended.
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
/// request new values
if (!outdated_ids.empty())
{
/// Collect the distinct ids (the keys of the map) to request in one call.
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
[] (auto & pair) { return pair.first; });
update(required_ids,
/// Id stored by update() into cell `cell_idx`: keep a copy of the string and account for every row that uses it.
[&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * outdated_ids[id].size();
},
/// Id not found by update(): nothing is put into `map`, only the length of each row's default is accounted for.
[&] (const auto id, const auto)
{
for (const auto row : outdated_ids[id])
total_length += get_default(row).size + 1;
});
}
out->getChars().reserve(total_length);
/// Emit the result in row order: the copied string when the id is in `map`, otherwise the row's default.
for (const auto row : ext::range(0, ext::size(ids)))
{
const auto id = ids[row];
const auto it = map.find(id);
const auto string_ref = it != std::end(map) ? StringRef{it->second} : get_default(row);
out->insertData(string_ref.data, string_ref.size);
}
}
/// Loads `requested_ids` from the dictionary source and writes the results into the cache cells.
///
/// For every id returned by the source, all attribute values are stored into the cell chosen by
/// findCellIdx and `on_cell_updated(id, cell_idx)` is called. Every requested id the source did
/// not return gets a cell filled with the attributes' default values and marked default, and
/// `on_id_not_found(id, cell_idx)` is called for it.
///
/// Throws TYPE_MISMATCH when the first column of a loaded block is not a UInt64 column.
template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheDictionary::update(
const std::vector<Key> & requested_ids,
PresentIdHandler && on_cell_updated,
AbsentIdHandler && on_id_not_found) const
{
/// id -> flag: 0 while the id has not been seen in the source output, 1 once it has.
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids)
remaining_ids.insert({ id, 0 });
/// Cell lifetimes are drawn uniformly from [min_sec, max_sec].
std::uniform_int_distribution<UInt64> distribution
{
dict_lifetime.min_sec,
dict_lifetime.max_sec
};
/// Declared at function scope: the write lock object lives until the function returns,
/// so the source request and both handler loops below run while it exists.
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
/// One timestamp for all cell lookups while reading the stream; expiry times below use a fresh clock value.
const auto now = std::chrono::system_clock::now();
while (const auto block = stream->read())
{
/// Column 0 of every block must hold the ids.
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
/// (attribute `i` is read from block column `i + 1`)
const auto column_ptrs = ext::map<std::vector>(ext::range(0, attributes.size()), [&block] (size_t i)
{
return block.safeGetByPosition(i + 1).column.get();
});
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
/// Only cell_idx of the result is used here; the cell at that index is overwritten below.
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
/// If either lifetime bound is zero the cell never expires (expiry set to the maximum time point).
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
}
stream->readSuffix();
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
size_t not_found_num = 0, found_num = 0;
/// Fresh timestamp for the lookups of the ids that were not returned by the source.
const auto now = std::chrono::system_clock::now();
/// Check which ids have not been found and require setting null_value
for (const auto & id_found_pair : remaining_ids)
{
if (id_found_pair.second)
{
++found_num;
continue;
}
++not_found_num;
const auto id = id_found_pair.first;
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
/// Set null_value for each attribute
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
/// Check if cell had not been occupied before and increment element counter if it hadn't
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
/// Same expiry rule as for found ids.
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// setDefault() is called after setExpiresAt(); the order is kept as is.
cell.setDefault();
/// inform caller that the cell has not been found
on_id_not_found(id, cell_idx);
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
}

View File

@ -0,0 +1,24 @@
#include <Dictionaries/CacheDictionary.h>
#include <Dictionaries/CacheDictionary.inc.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
using TYPE = @NAME@;
/// Reads attribute `attribute_name` for every id in `ids` into `out`.
/// Rows that need a default receive the null value stored with the attribute.
/// Throws TYPE_MISMATCH when the attribute type is not convertible to @NAME@.
void CacheDictionary::get@NAME@(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const
{
    auto & attribute = getAttribute(attribute_name);

    const bool convertible = isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::@NAME@);
    if (!convertible)
        throw Exception {name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
                         ErrorCodes::TYPE_MISMATCH};

    /// Copied once; the getter ignores the row and always returns this value.
    const auto null_value = std::get<TYPE>(attribute.null_values);
    getItemsNumber<TYPE>(attribute, ids, out, [&null_value](const size_t) { return null_value; });
}
}

View File

@ -0,0 +1,25 @@
#include <Dictionaries/CacheDictionary.h>
#include <Dictionaries/CacheDictionary.inc.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
using TYPE = @NAME@;
/// Reads attribute `attribute_name` for every id in `ids` into `out`.
/// Rows that need a default receive `def[row]`, the entry of `def` at the same row.
/// Throws TYPE_MISMATCH when the attribute type is not convertible to @NAME@.
void CacheDictionary::get@NAME@(const std::string & attribute_name,
                                const PaddedPODArray<Key> & ids,
                                const PaddedPODArray<TYPE> & def,
                                ResultArrayType<TYPE> & out) const
{
    auto & attribute = getAttribute(attribute_name);

    const bool convertible = isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::@NAME@);
    if (!convertible)
        throw Exception {name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
                         ErrorCodes::TYPE_MISMATCH};

    /// Per-row default: looked up in `def` by row index.
    getItemsNumber<TYPE>(attribute, ids, out, [&def](const size_t row) { return def[row]; });
}
}

View File

@ -0,0 +1,22 @@
#include <Dictionaries/CacheDictionary.h>
#include <Dictionaries/CacheDictionary.inc.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
using TYPE = @NAME@;
/// Reads attribute `attribute_name` for every id in `ids` into `out`.
/// Rows that need a default receive the single constant `def`.
/// Throws TYPE_MISMATCH when the attribute type is not convertible to @NAME@.
void CacheDictionary::get@NAME@(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const TYPE def, ResultArrayType<TYPE> & out) const
{
    auto & attribute = getAttribute(attribute_name);

    const bool convertible = isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::@NAME@);
    if (!convertible)
        throw Exception {name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
                         ErrorCodes::TYPE_MISMATCH};

    /// The getter ignores the row and always returns `def`.
    getItemsNumber<TYPE>(attribute, ids, out, [&def](const size_t) { return def; });
}
}

View File

@ -1,14 +1,17 @@
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/ExternalQueryBuilder.h>
#include <Dictionaries/writeParenthesisedString.h>
#include "ClickHouseDictionarySource.h"
#include "ExternalQueryBuilder.h"
#include "writeParenthesisedString.h"
#include <Client/ConnectionPool.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Dictionaries/readInvalidateQuery.h>
#include "readInvalidateQuery.h"
#include <Interpreters/executeQuery.h>
#include <Common/isLocalAddress.h>
#include <memory>
#include <ext/range.h>
#include <IO/ConnectionTimeouts.h>
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
{
@ -175,4 +178,17 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
}
}
/// Registers the creator for dictionary sources of type "clickhouse" in `factory`.
/// The creator builds a ClickHouseDictionarySource from the settings under `<config_prefix>.clickhouse`.
void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
{
    factory.registerSource(
        "clickhouse",
        [](const DictionaryStructure & dict_struct,
           const Poco::Util::AbstractConfiguration & config,
           const std::string & config_prefix,
           Block & sample_block,
           Context & context) -> DictionarySourcePtr
        {
            return std::make_unique<ClickHouseDictionarySource>(
                dict_struct, config, config_prefix + ".clickhouse", sample_block, context);
        });
}
}

View File

@ -1,10 +1,9 @@
#pragma once
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ExternalQueryBuilder.h>
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include "ExternalQueryBuilder.h"
#include <Client/ConnectionPoolWithFailover.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <memory>

View File

@ -1,5 +1,5 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include <Dictionaries/DictionaryBlockInputStream.h>
#include "ComplexKeyCacheDictionary.h"
#include "DictionaryBlockInputStream.h"
#include <Common/Arena.h>
#include <Common/BitHelpers.h>
#include <Common/randomSeed.h>
@ -9,6 +9,7 @@
#include <Common/CurrentMetrics.h>
#include <ext/range.h>
#include <ext/map.h>
#include "DictionaryFactory.h"
namespace ProfileEvents
@ -39,6 +40,7 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int TOO_SMALL_BUFFER_SIZE;
}
@ -378,4 +380,32 @@ BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names &
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, keys, column_names);
}
/// Registers the creator for dictionaries of layout 'complex_key_cache' in `factory`.
///
/// The creator validates the configuration and throws:
///  - BAD_ARGUMENTS          if the structure has no composite 'key', or 'require_nonempty' is set;
///  - TOO_SMALL_BUFFER_SIZE  if `size_in_cells` is not a positive number.
/// On success it returns a ComplexKeyCacheDictionary built from the source, the configured lifetime
/// and the cell count.
void registerDictionaryComplexKeyCache(DictionaryFactory & factory)
{
    /// The creator uses only its own arguments, so nothing is captured.
    auto create_layout = [](
                             const std::string & name,
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        /// All messages below name the layout this creator is registered for ('complex_key_cache').
        if (!dict_struct.key)
            throw Exception {"'key' is required for dictionary of layout 'complex_key_cache'", ErrorCodes::BAD_ARGUMENTS};

        const std::string layout_prefix = config_prefix + ".layout";

        /// getInt() yields a signed value. A negative cell count is as unusable as zero and must not be
        /// passed on to the dictionary as a size, so every non-positive value is rejected here.
        const auto size = config.getInt(layout_prefix + ".complex_key_cache.size_in_cells");
        if (size <= 0)
            throw Exception {name + ": dictionary of layout 'complex_key_cache' cannot have 0 cells",
                             ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception {name + ": dictionary of layout 'complex_key_cache' cannot have 'require_nonempty' attribute set",
                             ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime {config, config_prefix + ".lifetime"};
        return std::make_unique<ComplexKeyCacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
    };

    factory.registerLayout("complex_key_cache", create_layout);
}
}

View File

@ -12,9 +12,9 @@
#include <Common/HashTable/HashMap.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/SmallObjectPool.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
#include <common/StringRef.h>
#include <ext/bit_cast.h>
#include <ext/map.h>

View File

@ -1,4 +1,4 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include "ComplexKeyCacheDictionary.h"
namespace DB
{

View File

@ -1,40 +0,0 @@
#include "ComplexKeyCacheDictionary.h"
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
/// DECLARE(TYPE) defines ComplexKeyCacheDictionary::get<TYPE>(attribute_name, key_columns, key_types, out).
/// The generated method validates the key types, throws TYPE_MISMATCH when the attribute type is not
/// convertible to TYPE, and then calls getItemsNumber with a callback that always returns the
/// attribute's stored null value (presumably used for keys that are not found - confirm in getItemsNumber).
/// The macro is expanded once per supported attribute type below and undefined afterwards.
#define DECLARE(TYPE) \
void ComplexKeyCacheDictionary::get##TYPE( \
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ResultArrayType<TYPE> & out) const \
{ \
dict_struct.validateKeyTypes(key_types); \
\
auto & attribute = getAttribute(attribute_name); \
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE)) \
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), \
ErrorCodes::TYPE_MISMATCH}; \
\
const auto null_value = std::get<TYPE>(attribute.null_values); \
\
getItemsNumber<TYPE>(attribute, key_columns, out, [&](const size_t) { return null_value; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
}

View File

@ -0,0 +1,24 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
namespace DB
{
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
}

/// `@NAME@` is a template placeholder; presumably the build substitutes a concrete
/// attribute type name for it (TODO confirm in the build scripts).
using TYPE = @NAME@;

/// Fetches the attribute `attribute_name` for the given keys into `out`.
/// Throws TYPE_MISMATCH if the attribute's type cannot be converted to the requested type.
/// The callback handed to getItemsNumber always returns the attribute's stored null value.
void ComplexKeyCacheDictionary::get@NAME@(
    const std::string & attribute_name,
    const Columns & key_columns,
    const DataTypes & key_types,
    ResultArrayType<TYPE> & out) const
{
    dict_struct.validateKeyTypes(key_types);

    auto & attr = getAttribute(attribute_name);
    const bool convertible = isAttributeTypeConvertibleTo(attr.type, AttributeUnderlyingType::@NAME@);
    if (!convertible)
    {
        throw Exception {name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attr.type),
                         ErrorCodes::TYPE_MISMATCH};
    }

    /// Copy the null value out once; the callback below returns it for every row index.
    const auto fallback = std::get<TYPE>(attr.null_values);
    getItemsNumber<TYPE>(attr, key_columns, out, [&](const size_t) { return fallback; });
}
}

View File

@ -1,41 +0,0 @@
#include "ComplexKeyCacheDictionary.h"
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
/// DECLARE(TYPE) defines the ComplexKeyCacheDictionary::get<TYPE> overload that takes a
/// per-row array of defaults (`def`). The generated method validates the key types, throws
/// TYPE_MISMATCH when the attribute type is not convertible to TYPE, and then calls
/// getItemsNumber with a callback that returns `def[row]` for the row index it is given.
/// The macro is expanded once per supported attribute type below and undefined afterwards.
#define DECLARE(TYPE) \
void ComplexKeyCacheDictionary::get##TYPE(const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const \
{ \
dict_struct.validateKeyTypes(key_types); \
\
auto & attribute = getAttribute(attribute_name); \
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE)) \
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), \
ErrorCodes::TYPE_MISMATCH}; \
\
getItemsNumber<TYPE>(attribute, key_columns, out, [&](const size_t row) { return def[row]; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
}

View File

@ -0,0 +1,27 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
namespace DB
{
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
}

/// `@NAME@` is a template placeholder; presumably the build substitutes a concrete
/// attribute type name for it (TODO confirm in the build scripts).
using TYPE = @NAME@;

/// Fetches the attribute `attribute_name` for the given keys into `out`, with a
/// per-row array of defaults. Throws TYPE_MISMATCH if the attribute's type cannot be
/// converted to the requested type. The callback handed to getItemsNumber returns
/// `def[row]` for the row index it receives.
void ComplexKeyCacheDictionary::get@NAME@(
    const std::string & attribute_name,
    const Columns & key_columns,
    const DataTypes & key_types,
    const PaddedPODArray<TYPE> & def,
    ResultArrayType<TYPE> & out) const
{
    dict_struct.validateKeyTypes(key_types);

    auto & attr = getAttribute(attribute_name);
    const bool convertible = isAttributeTypeConvertibleTo(attr.type, AttributeUnderlyingType::@NAME@);
    if (!convertible)
    {
        throw Exception {name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attr.type),
                         ErrorCodes::TYPE_MISMATCH};
    }

    getItemsNumber<TYPE>(attr, key_columns, out, [&](const size_t row_index) { return def[row_index]; });
}
}

View File

@ -1,41 +0,0 @@
#include "ComplexKeyCacheDictionary.h"
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
/// DECLARE(TYPE) defines the ComplexKeyCacheDictionary::get<TYPE> overload that takes a
/// single constant default (`def`). The generated method validates the key types, throws
/// TYPE_MISMATCH when the attribute type is not convertible to TYPE, and then calls
/// getItemsNumber with a callback that returns `def` regardless of the row index.
/// The macro is expanded once per supported attribute type below and undefined afterwards.
#define DECLARE(TYPE) \
void ComplexKeyCacheDictionary::get##TYPE(const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const TYPE def, \
ResultArrayType<TYPE> & out) const \
{ \
dict_struct.validateKeyTypes(key_types); \
\
auto & attribute = getAttribute(attribute_name); \
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE)) \
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), \
ErrorCodes::TYPE_MISMATCH}; \
\
getItemsNumber<TYPE>(attribute, key_columns, out, [&](const size_t) { return def; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
}

View File

@ -0,0 +1,27 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
namespace DB
{
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
}

/// `@NAME@` is a template placeholder; presumably the build substitutes a concrete
/// attribute type name for it (TODO confirm in the build scripts).
using TYPE = @NAME@;

/// Fetches the attribute `attribute_name` for the given keys into `out`, with one
/// constant default. Throws TYPE_MISMATCH if the attribute's type cannot be converted
/// to the requested type. The callback handed to getItemsNumber returns `def`
/// regardless of the row index.
void ComplexKeyCacheDictionary::get@NAME@(
    const std::string & attribute_name,
    const Columns & key_columns,
    const DataTypes & key_types,
    const TYPE def,
    ResultArrayType<TYPE> & out) const
{
    dict_struct.validateKeyTypes(key_types);

    auto & attr = getAttribute(attribute_name);
    const bool convertible = isAttributeTypeConvertibleTo(attr.type, AttributeUnderlyingType::@NAME@);
    if (!convertible)
    {
        throw Exception {name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attr.type),
                         ErrorCodes::TYPE_MISMATCH};
    }

    getItemsNumber<TYPE>(attr, key_columns, out, [&](const size_t) { return def; });
}
}

View File

@ -1,4 +1,4 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include "ComplexKeyCacheDictionary.h"
namespace DB
{

View File

@ -1,4 +1,4 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include "ComplexKeyCacheDictionary.h"
namespace DB
{

View File

@ -1,8 +1,8 @@
#include <ext/map.h>
#include <ext/range.h>
#include <Dictionaries/ComplexKeyHashedDictionary.h>
#include <Dictionaries/DictionaryBlockInputStream.h>
#include "ComplexKeyHashedDictionary.h"
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
namespace DB
{
@ -661,5 +661,24 @@ BlockInputStreamPtr ComplexKeyHashedDictionary::getBlockInputStream(const Names
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getKeys(), column_names);
}
/// Registers the "complex_key_hashed" dictionary layout in the given factory.
/// The registered creator requires the structure to define a 'key' (BAD_ARGUMENTS otherwise),
/// reads the lifetime and the optional 'require_nonempty' flag from the configuration,
/// and builds a ComplexKeyHashedDictionary.
void registerDictionaryComplexKeyHashed(DictionaryFactory & factory)
{
    auto make_dictionary = [](
        const std::string & name,
        const DictionaryStructure & dict_struct,
        const Poco::Util::AbstractConfiguration & config,
        const std::string & config_prefix,
        DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
        {
            const DictionaryLifetime lifetime {config, config_prefix + ".lifetime"};
            /// 'require_nonempty' is optional and defaults to false.
            const bool must_be_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
            return std::make_unique<ComplexKeyHashedDictionary>(
                name, dict_struct, std::move(source_ptr), lifetime, must_be_nonempty);
        }

        throw Exception {"'key' is required for dictionary of layout 'complex_key_hashed'", ErrorCodes::BAD_ARGUMENTS};
    };

    factory.registerLayout("complex_key_hashed", make_dictionary);
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include "IDictionary.h"
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include <common/StringRef.h>
#include <Common/HashTable/HashMap.h>
#include <Columns/ColumnDecimal.h>
@ -243,5 +243,4 @@ private:
BlockPtr saved_block;
};
}

View File

@ -6,9 +6,9 @@
#include <Columns/IColumn.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <Dictionaries/DictionaryBlockInputStreamBase.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/IDictionary.h>
#include "DictionaryBlockInputStreamBase.h"
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include <ext/range.h>
#include <common/logger_useful.h>
#include <Core/Names.h>

View File

@ -1,4 +1,4 @@
#include <Dictionaries/DictionaryBlockInputStreamBase.h>
#include "DictionaryBlockInputStreamBase.h"
namespace DB
{

View File

@ -0,0 +1,51 @@
#include "DictionaryFactory.h"
#include <memory>
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
{
    extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
    /// Used by registerLayout(); declared here so this file does not rely on a transitive declaration.
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}

/// Adds a creator for the layout named `layout_type` to the registry.
/// Throws LOGICAL_ERROR if a creator with the same name is already registered.
void DictionaryFactory::registerLayout(const std::string & layout_type, Creator create_layout)
{
    if (!registered_layouts.emplace(layout_type, std::move(create_layout)).second)
        throw Exception("DictionaryFactory: the layout name '" + layout_type + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}

/// Creates the dictionary described by the configuration subtree at `config_prefix`.
///
/// The `<config_prefix>.layout` element must have exactly one child, whose name selects the
/// registered layout creator. The dictionary structure is read from `<config_prefix>.structure`
/// and the source is created from `<config_prefix>.source` via DictionarySourceFactory.
///
/// Throws EXCESSIVE_ELEMENT_IN_CONFIG if the layout element does not have exactly one child,
/// and UNKNOWN_ELEMENT_IN_CONFIG if no creator is registered for the layout type.
DictionaryPtr DictionaryFactory::create(
    const std::string & name, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Context & context) const
{
    Poco::Util::AbstractConfiguration::Keys keys;
    const auto & layout_prefix = config_prefix + ".layout";
    config.keys(layout_prefix, keys);
    if (keys.size() != 1)
        throw Exception {name + ": element dictionary.layout should have exactly one child element",
                         ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG};

    const DictionaryStructure dict_struct {config, config_prefix + ".structure"};

    auto source_ptr = DictionarySourceFactory::instance().create(name, config, config_prefix + ".source", dict_struct, context);

    /// The single child of the layout element names the layout type.
    const auto & layout_type = keys.front();

    {
        const auto found = registered_layouts.find(layout_type);
        if (found != registered_layouts.end())
        {
            const auto & create_layout = found->second;
            return create_layout(name, dict_struct, config, config_prefix, std::move(source_ptr));
        }
    }

    throw Exception {name + ": unknown dictionary layout type: " + layout_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
}
}

View File

@ -1,20 +1,41 @@
#pragma once
#include <Dictionaries/IDictionary.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <ext/singleton.h>
#include "IDictionary.h"
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
class Logger;
}
namespace DB
{
class Context;
class DictionaryFactory : public ext::singleton<DictionaryFactory>
{
public:
DictionaryPtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, Context & context) const;
DictionaryPtr
create(const std::string & name, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Context & context)
const;
using Creator = std::function<DictionaryPtr(
const std::string & name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr)>;
void registerLayout(const std::string & layout_type, Creator create_layout);
private:
using LayoutRegistry = std::unordered_map<std::string, Creator>;
LayoutRegistry registered_layouts;
};
}

View File

@ -1,41 +1,16 @@
#include <Dictionaries/DictionarySourceFactory.h>
#include "DictionarySourceFactory.h"
#include <Core/Block.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/FileDictionarySource.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/ExecutableDictionarySource.h>
#include <Dictionaries/HTTPDictionarySource.h>
#include <Dictionaries/LibraryDictionarySource.h>
#include <Dictionaries/XDBCDictionarySource.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/FieldVisitors.h>
#include <Common/XDBCBridgeHelper.h>
#include <Columns/ColumnsNumber.h>
#include <IO/HTTPCommon.h>
#include <memory>
#include <mutex>
#include <Common/config.h>
#if USE_POCO_MONGODB
#include <Dictionaries/MongoDBDictionarySource.h>
#endif
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
#include <Poco/Data/ODBC/Connector.h>
#endif
#if USE_MYSQL
#include <Dictionaries/MySQLDictionarySource.h>
#endif
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
@ -46,149 +21,78 @@ namespace ErrorCodes
namespace
{
Block createSampleBlock(const DictionaryStructure & dict_struct)
{
Block block;
if (dict_struct.id)
block.insert(ColumnWithTypeAndName{ColumnUInt64::create(1, 0), std::make_shared<DataTypeUInt64>(), dict_struct.id->name});
if (dict_struct.key)
Block createSampleBlock(const DictionaryStructure & dict_struct)
{
for (const auto & attribute : *dict_struct.key)
Block block;
if (dict_struct.id)
block.insert(ColumnWithTypeAndName {ColumnUInt64::create(1, 0), std::make_shared<DataTypeUInt64>(), dict_struct.id->name});
if (dict_struct.key)
{
for (const auto & attribute : *dict_struct.key)
{
auto column = attribute.type->createColumn();
column->insertDefault();
block.insert(ColumnWithTypeAndName {std::move(column), attribute.type, attribute.name});
}
}
if (dict_struct.range_min)
{
for (const auto & attribute : {dict_struct.range_min, dict_struct.range_max})
{
const auto & type = std::make_shared<DataTypeNullable>(attribute->type);
auto column = type->createColumn();
column->insertDefault();
block.insert(ColumnWithTypeAndName {std::move(column), type, attribute->name});
}
}
for (const auto & attribute : dict_struct.attributes)
{
auto column = attribute.type->createColumn();
column->insertDefault();
column->insert(attribute.null_value);
block.insert(ColumnWithTypeAndName{std::move(column), attribute.type, attribute.name});
block.insert(ColumnWithTypeAndName {std::move(column), attribute.type, attribute.name});
}
return block;
}
if (dict_struct.range_min)
{
for (const auto & attribute : { dict_struct.range_min, dict_struct.range_max })
{
const auto & type = std::make_shared<DataTypeNullable>(attribute->type);
auto column = type->createColumn();
column->insertDefault();
block.insert(ColumnWithTypeAndName{std::move(column), type, attribute->name});
}
}
for (const auto & attribute : dict_struct.attributes)
{
auto column = attribute.type->createColumn();
column->insert(attribute.null_value);
block.insert(ColumnWithTypeAndName{std::move(column), attribute.type, attribute.name});
}
return block;
}
}
DictionarySourceFactory::DictionarySourceFactory()
: log(&Poco::Logger::get("DictionarySourceFactory"))
DictionarySourceFactory::DictionarySourceFactory() : log(&Poco::Logger::get("DictionarySourceFactory"))
{
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
Poco::Data::ODBC::Connector::registerConnector();
#endif
}
void DictionarySourceFactory::registerSource(const std::string & source_type, Creator create_source)
{
LOG_DEBUG(log, "Register dictionary source type `" + source_type + "`");
if (!registered_sources.emplace(source_type, std::move(create_source)).second)
throw Exception("DictionarySourceFactory: the source name '" + source_type + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
throw Exception("DictionarySourceFactory: the source name '" + source_type + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}
DictionarySourcePtr DictionarySourceFactory::create(
const std::string & name, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const DictionaryStructure & dict_struct, Context & context) const
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const DictionaryStructure & dict_struct,
Context & context) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
if (keys.size() != 1)
throw Exception{name +": element dictionary.source should have exactly one child element", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG};
throw Exception {name + ": element dictionary.source should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG};
auto sample_block = createSampleBlock(dict_struct);
const auto & source_type = keys.front();
if ("file" == source_type)
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `file` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
const auto filename = config.getString(config_prefix + ".file.path");
const auto format = config.getString(config_prefix + ".file.format");
return std::make_unique<FileDictionarySource>(filename, format, sample_block, context);
}
else if ("mysql" == source_type)
{
#if USE_MYSQL
return std::make_unique<MySQLDictionarySource>(dict_struct, config, config_prefix + ".mysql", sample_block);
#else
throw Exception{"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
else if ("clickhouse" == source_type)
{
return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse",
sample_block, context);
}
else if ("mongodb" == source_type)
{
#if USE_POCO_MONGODB
return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb", sample_block);
#else
throw Exception{"Dictionary source of type `mongodb` is disabled because poco library was built without mongodb support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
else if ("odbc" == source_type)
{
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(context, context.getSettings().http_receive_timeout, config.getString(config_prefix + ".odbc.connection_string"));
return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context, bridge);
#else
throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
else if ("jdbc" == source_type)
{
throw Exception{"Dictionary source of type `jdbc` is disabled until consistent support for nullable fields.",
ErrorCodes::SUPPORT_IS_DISABLED};
// BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(config, context.getSettings().http_receive_timeout, config.getString(config_prefix + ".connection_string"));
// return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".jdbc", sample_block, context, bridge);
}
else if ("executable" == source_type)
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
return std::make_unique<ExecutableDictionarySource>(dict_struct, config, config_prefix + ".executable", sample_block, context);
}
else if ("http" == source_type)
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `http` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
return std::make_unique<HTTPDictionarySource>(dict_struct, config, config_prefix + ".http", sample_block, context);
}
else if ("library" == source_type)
{
return std::make_unique<LibraryDictionarySource>(dict_struct, config, config_prefix + ".library", sample_block, context);
}
else
{
const auto found = registered_sources.find(source_type);
if (found != registered_sources.end())
@ -198,7 +102,7 @@ DictionarySourcePtr DictionarySourceFactory::create(
}
}
throw Exception{name + ": unknown dictionary source type: " + source_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
throw Exception {name + ": unknown dictionary source type: " + source_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
}
}

View File

@ -1,23 +1,22 @@
#pragma once
#include <Dictionaries/IDictionarySource.h>
#include <ext/singleton.h>
#include "IDictionarySource.h"
#include <unordered_map>
#include <ext/singleton.h>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
namespace Util
{
class AbstractConfiguration;
}
class Logger;
class Logger;
}
namespace DB
{
class Context;
struct DictionaryStructure;
@ -30,15 +29,18 @@ public:
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context)>;
Context & context)>;
DictionarySourceFactory();
void registerSource(const std::string & source_type, Creator create_source);
DictionarySourcePtr create(
const std::string & name, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const DictionaryStructure & dict_struct, Context & context) const;
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const DictionaryStructure & dict_struct,
Context & context) const;
private:
using SourceRegistry = std::unordered_map<std::string, Creator>;

View File

@ -1,5 +1,5 @@
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Block.h>
#include <Columns/ColumnsNumber.h>

View File

@ -1,4 +1,4 @@
#include <Dictionaries/DictionaryStructure.h>
#include "DictionaryStructure.h"
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>

View File

@ -0,0 +1,5 @@
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_dictionaries_embedded .)
add_headers_and_sources(clickhouse_dictionaries_embedded GeodataProviders)
add_library(clickhouse_dictionaries_embedded ${LINK_MODE} ${clickhouse_dictionaries_embedded_sources})
target_link_libraries(clickhouse_dictionaries_embedded PRIVATE clickhouse_common_io ${MYSQLXX_LIBRARY})

View File

@ -1,8 +1,8 @@
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Dictionaries/Embedded/GeodataProviders/HierarchiesProvider.h>
#include <Dictionaries/Embedded/GeodataProviders/NamesProvider.h>
#include "GeoDictionariesLoader.h"
#include <Poco/Util/AbstractConfiguration.h>
#include "GeodataProviders/HierarchiesProvider.h"
#include "GeodataProviders/NamesProvider.h"
std::unique_ptr<RegionsHierarchies> GeoDictionariesLoader::reloadRegionsHierarchies(
const Poco::Util::AbstractConfiguration & config)

View File

@ -1,6 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/IGeoDictionariesLoader.h>
#include "IGeoDictionariesLoader.h"
// Default implementation of geo dictionaries loader used by native server application

View File

@ -1,6 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/Types.h>
#include "Types.h"
#include <string>
struct RegionEntry

View File

@ -1,8 +1,7 @@
#include <Dictionaries/Embedded/GeodataProviders/HierarchiesProvider.h>
#include <Dictionaries/Embedded/GeodataProviders/HierarchyFormatReader.h>
#include "HierarchiesProvider.h"
#include "HierarchyFormatReader.h"
#include <IO/ReadBufferFromFile.h>
#include <Poco/Util/Application.h>
#include <Poco/Exception.h>
#include <Poco/DirectoryIterator.h>

View File

@ -1,9 +1,8 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/IHierarchiesProvider.h>
#include "IHierarchiesProvider.h"
#include <Common/FileUpdatesTracker.h>
#include <unordered_map>

View File

@ -1,4 +1,4 @@
#include <Dictionaries/Embedded/GeodataProviders/HierarchyFormatReader.h>
#include "HierarchyFormatReader.h"
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/IHierarchiesProvider.h>
#include "IHierarchiesProvider.h"
#include <IO/ReadBuffer.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/Entries.h>
#include "Entries.h"
#include <memory>
#include <string>
#include <vector>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/Entries.h>
#include "Entries.h"
#include <memory>

View File

@ -1,4 +1,4 @@
#include <Dictionaries/Embedded/GeodataProviders/NamesFormatReader.h>
#include "NamesFormatReader.h"
#include <IO/ReadHelpers.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/INamesProvider.h>
#include "INamesProvider.h"
#include <IO/ReadBuffer.h>

View File

@ -1,6 +1,6 @@
#include <Dictionaries/Embedded/GeodataProviders/NamesProvider.h>
#include <Dictionaries/Embedded/GeodataProviders/NamesFormatReader.h>
#include "NamesProvider.h"
#include "NamesFormatReader.h"
#include <IO/ReadBufferFromFile.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/INamesProvider.h>
#include "INamesProvider.h"
#include <Common/FileUpdatesTracker.h>

View File

@ -1,12 +1,19 @@
#pragma once
#include <Dictionaries/Embedded/RegionsHierarchies.h>
#include <Dictionaries/Embedded/RegionsNames.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "RegionsHierarchies.h"
#include "RegionsNames.h"
#include <memory>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
class Logger;
}
// Provides actual versions of geo dictionaries (regions hierarchies, regions names)
// Bind data structures (RegionsHierarchies, RegionsNames) with data providers

View File

@ -1,7 +1,6 @@
#include <Dictionaries/Embedded/RegionsHierarchies.h>
#include "RegionsHierarchies.h"
#include <common/logger_useful.h>
#include <Poco/DirectoryIterator.h>

View File

@ -1,10 +1,8 @@
#pragma once
#include <Dictionaries/Embedded/RegionsHierarchy.h>
#include <Dictionaries/Embedded/GeodataProviders/IHierarchiesProvider.h>
#include "RegionsHierarchy.h"
#include "GeodataProviders/IHierarchiesProvider.h"
#include <Poco/Exception.h>
#include <unordered_map>

View File

@ -1,12 +1,10 @@
#include <Dictionaries/Embedded/RegionsHierarchy.h>
#include <Dictionaries/Embedded/GeodataProviders/IHierarchiesProvider.h>
#include "RegionsHierarchy.h"
#include "GeodataProviders/IHierarchiesProvider.h"
#include <Poco/Util/Application.h>
#include <Poco/Exception.h>
#include <common/logger_useful.h>
#include <ext/singleton.h>
#include <IO/WriteHelpers.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/IHierarchiesProvider.h>
#include "GeodataProviders/IHierarchiesProvider.h"
#include <vector>
#include <boost/noncopyable.hpp>
#include <common/Types.h>

View File

@ -1,11 +1,9 @@
#include <Dictionaries/Embedded/RegionsNames.h>
#include <Dictionaries/Embedded/GeodataProviders/INamesProvider.h>
#include "RegionsNames.h"
#include "GeodataProviders/INamesProvider.h"
#include <Poco/Util/Application.h>
#include <Poco/Exception.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
namespace DB

View File

@ -1,12 +1,9 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/INamesProvider.h>
#include "GeodataProviders/INamesProvider.h"
#include <Poco/Exception.h>
#include <common/Types.h>
#include <common/StringRef.h>
#include <string>
#include <vector>

View File

@ -1,7 +1,7 @@
#include <Common/config.h>
#if USE_MYSQL
#include <Dictionaries/Embedded/TechDataHierarchy.h>
#include "TechDataHierarchy.h"
#include <common/logger_useful.h>
#include <mysqlxx/PoolWithFailover.h>

View File

@ -1,12 +1,18 @@
#pragma once
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Exception.h>
#include <common/Types.h>
#include <ext/singleton.h>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
class Logger;
}
/** @brief Class that lets you know if a search engine or operating system belongs
* another search engine or operating system, respectively.

View File

@ -1,12 +1,15 @@
#include "ExecutableDictionarySource.h"
#include <thread>
#include <future>
#include <Dictionaries/ExecutableDictionarySource.h>
#include <Common/ShellCommand.h>
#include <Interpreters/Context.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include "DictionarySourceHelpers.h"
#include <DataStreams/IBlockOutputStream.h>
#include <common/logger_useful.h>
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
@ -229,4 +232,19 @@ std::string ExecutableDictionarySource::toString() const
return "Executable: " + command;
}
/// Registers the "executable" dictionary source type in the given factory.
/// The registered creator rejects structures that use attribute expressions (LOGICAL_ERROR)
/// and otherwise builds an ExecutableDictionarySource, handing it the ".executable" sub-prefix.
void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
{
    auto make_source = [](const DictionaryStructure & dict_struct,
                          const Poco::Util::AbstractConfiguration & config,
                          const std::string & config_prefix,
                          Block & sample_block,
                          const Context & context) -> DictionarySourcePtr
    {
        if (!dict_struct.has_expressions)
            return std::make_unique<ExecutableDictionarySource>(
                dict_struct, config, config_prefix + ".executable", sample_block, context);

        throw Exception {"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
    };

    factory.registerSource("executable", make_source);
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
namespace Poco { class Logger; }

View File

@ -3,9 +3,9 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Dictionaries/writeParenthesisedString.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ExternalQueryBuilder.h>
#include "writeParenthesisedString.h"
#include "DictionaryStructure.h"
#include "ExternalQueryBuilder.h"
namespace DB

View File

@ -1,5 +1,5 @@
#include <ext/range.h>
#include <Dictionaries/ExternalResultDescription.h>
#include "ExternalResultDescription.h"
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDate.h>

View File

@ -1,9 +1,11 @@
#include "FileDictionarySource.h"
#include <Interpreters/Context.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Dictionaries/FileDictionarySource.h>
#include <IO/ReadBufferFromFile.h>
#include <Poco/File.h>
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
{
@ -46,4 +48,23 @@ Poco::Timestamp FileDictionarySource::getLastModification() const
return Poco::File{filename}.getLastModified();
}
/// Registers the "file" dictionary source type in the given factory.
/// The registered creator rejects structures that use attribute expressions (LOGICAL_ERROR),
/// reads `<config_prefix>.file.path` and `<config_prefix>.file.format` from the configuration,
/// and builds a FileDictionarySource from them.
void registerDictionarySourceFile(DictionarySourceFactory & factory)
{
    factory.registerSource(
        "file",
        [](const DictionaryStructure & dict_struct,
           const Poco::Util::AbstractConfiguration & config,
           const std::string & config_prefix,
           Block & sample_block,
           const Context & context) -> DictionarySourcePtr
        {
            if (dict_struct.has_expressions)
                throw Exception {"Dictionary source of type `file` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};

            /// Both settings have no default, so getString() is called without a fallback value.
            const auto path = config.getString(config_prefix + ".file.path");
            const auto data_format = config.getString(config_prefix + ".file.format");

            return std::make_unique<FileDictionarySource>(path, data_format, sample_block, context);
        });
}
}

Some files were not shown because too many files have changed in this diff Show More