Merge branch 'master' into unify-data-types-that-serialized-with-multiple-streams

Alexey Milovidov 2017-08-18 22:41:27 +03:00
commit e01c23267a
313 changed files with 6284 additions and 3435 deletions

1
.gitignore vendored
View File

@ -199,7 +199,6 @@ vgcore*
*.changes
build-stamp
configure-stamp
debian/changelog
debian/*.debhelper.log
debian/*.debhelper
debian/*.substvars

103
CHANGELOG_RU.md Normal file
View File

@ -0,0 +1,103 @@
# ClickHouse release 1.1.54276
## New features:
* Added an optional WITH clause for the SELECT query. Example query: `WITH 1+1 AS a SELECT a, a*a`
* Added the ability to perform synchronous inserts into a Distributed table: Ok is returned only after all the data is saved on all the shards. Activated by the setting insert_distributed_sync=1
* Added the UUID data type for working with 16-byte identifiers
* Added aliases of CHAR, FLOAT and other types for compatibility with Tableau
* Added the functions toYYYYMM, toYYYYMMDD, and toYYYYMMDDhhmmss for converting time into numbers
* Added the ability to use IP addresses (together with the hostname) to identify servers for clustered DDL queries
* Added support for non-constant arguments and negative offsets in the function `substring(str, pos, len)`
* Added the max_size parameter for the `groupArray(max_size)(column)` aggregate function, and optimized its performance
## Main changes:
* Security improvements: all server files are created with 0640 permissions
* Improved error messages for queries with invalid syntax
* Significantly reduced memory consumption and improved performance of merges of large MergeTree data parts
* Significantly increased the performance of data merges for the ReplacingMergeTree engine
* Improved performance of asynchronous inserts from a Distributed table by combining multiple source inserts. This functionality is enabled by the setting distributed_directory_monitor_batch_inserts=1.
## Backward incompatible changes:
* Changed the binary format of aggregate states of the `groupArray(array_column)` function for arrays
## Complete list of changes:
* Added output of nan and inf values in JSON format
* More optimal allocation of threads when reading from Distributed tables
* Settings can now be specified in readonly mode if their value is not changed
* Added the ability to read non-integer granules of the MergeTree engine in order to meet restrictions on the block size specified by the preferred_block_size_bytes setting, reducing RAM consumption and increasing cache locality when processing queries from tables with large columns
* Efficient use of an index that contains expressions like `toStartOfHour(x)` for conditions of the form `toStartOfHour(x) op constexpr`
* Added new settings for MergeTree engines (the merge_tree section of config.xml):
- replicated_deduplication_window_seconds sets the deduplication interval for inserts into Replicated tables, in seconds
- cleanup_delay_period sets how often to start the cleanup of obsolete data
- replicated_can_become_leader can prevent a replica from becoming the leader (and assigning merges)
* Accelerated cleanup of obsolete data from ZooKeeper
* Multiple improvements and fixes for clustered DDL queries. In particular, added the distributed_ddl_task_timeout setting, which limits the time to wait for responses from the servers in the cluster.
* Improved display of stack traces in server logs
* Added the "none" compression method
* The ability to use multiple dictionaries_config sections in config.xml
* The ability to connect to MySQL through a socket in the file system
* The system.parts table has a new column with information about the size of marks, in bytes
## Bug fixes:
* Fixed incorrect behavior of Distributed tables that use a Merge table when SELECT-ing with a condition on the _table field
* Fixed a rare race condition in ReplicatedMergeTree when checking data parts
* Fixed possible freezing of the leader election procedure when starting a server
* Fixed the max_replica_delay_for_distributed_queries setting being ignored when using a local replica as the data source
* Fixed incorrect behavior of `ALTER TABLE CLEAR COLUMN IN PARTITION` when attempting to clean a non-existing column
* Fixed an exception in the multiIf function when using empty arrays or strings
* Fixed excessive memory allocations when deserializing the Native format
* Fixed incorrect auto-update of Trie dictionaries
* Fixed an exception when running queries with a GROUP BY clause from a Merge table while using SAMPLE
* Fixed a crash of GROUP BY when using the setting distributed_aggregation_memory_efficient=1
* Now you can specify database.table in the right side of IN and JOIN
* Fixed the use of too many threads for parallel aggregation
* Fixed how the if function works with FixedString arguments
* Fixed incorrect behavior of SELECT from a Distributed table for shards with a weight of 0
* Fixed a crash of the `CREATE VIEW IF EXISTS` query
* Fixed incorrect behavior when input_format_skip_unknown_fields=1 is set and there are negative numbers
* Fixed an infinite loop in the `dictGetHierarchy()` function when there is some invalid data in the dictionary
* Fixed `Syntax error: unexpected (...)` errors when running distributed queries with subqueries in an IN or JOIN clause, together with Merge tables
* Fixed incorrect interpretation of a SELECT query from Dictionary tables
* Fixed the "Cannot mremap" error when using sets in the IN and JOIN clauses with more than 2 billion elements
* Fixed the failover for dictionaries with MySQL as the source
## Improved workflow for developing and assembling ClickHouse:
* Added the ability to build in Arcadia
* Added the ability to build with gcc 7
* Accelerated parallel builds using ccache+distcc
# ClickHouse release 1.1.54245
## New features:
* Distributed DDL (for example, `CREATE TABLE ON CLUSTER`)
* The replicated query `ALTER TABLE CLEAR COLUMN IN PARTITION`
* The Dictionary table engine (access to dictionary data in the form of a table)
* The Dictionary database engine (in this type of database, Dictionary tables are automatically available for all the connected external dictionaries)
* You can check for updates to the dictionary by sending a request to the source
* Qualified column names
* Quoting identifiers using double quotation marks
* Sessions in the HTTP interface
* The OPTIMIZE query for a Replicated table can run not only on the leader
## Backward incompatible changes:
* Removed the SET GLOBAL command
## Minor changes:
* Now after a signal is received, the full stack trace is printed in the log
* Relaxed the check on the number of damaged/extra data parts at startup (there were too many false positives)
## Bug fixes:
* Fixed a bad connection "sticking" when inserting into a Distributed table
* GLOBAL IN now works for a query from a Merge table that looks at a Distributed table
* The correct number of cores is now detected on Google Compute Engine virtual machines
* Fixed how the executable source of cached external dictionaries works
* Fixed comparisons of strings containing null characters
* Fixed comparison of Float32 primary key fields with constants
* Previously, an incorrect estimate of the size of a field could lead to overly large allocations
* Fixed a crash when querying a Nullable column added to a table using ALTER
* Fixed a crash when sorting by a Nullable column if the number of rows is less than LIMIT
* Fixed an ORDER BY subquery consisting of only constant values
* Previously, a Replicated table could remain in an invalid state after a failed DROP TABLE
* Aliases for scalar subqueries with empty results are no longer lost
* Now a query that used compilation does not fail with an error if the .so file gets damaged

View File

@ -45,10 +45,10 @@ message (STATUS "CMAKE_BUILD_TYPE: " ${CMAKE_BUILD_TYPE} )
set (CMAKE_CONFIGURATION_TYPES "RelWithDebInfo;Debug;Release;MinSizeRel;ASan;UBSan" CACHE STRING "" FORCE)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*)")
set (AARCH64 1)
set (ARCH_AARCH64 1)
endif ()
if (AARCH64 OR CMAKE_SYSTEM_PROCESSOR MATCHES "arm")
set (ARM 1)
if (ARCH_AARCH64 OR CMAKE_SYSTEM_PROCESSOR MATCHES "arm")
set (ARCH_ARM 1)
endif ()
set (COMMON_WARNING_FLAGS "-Wall") # -Werror is also added inside directories with our own code.
@ -175,6 +175,7 @@ if (NOT OPENSSL_FOUND)
message (FATAL_ERROR "Need openssl for build. debian tip: sudo apt install libssl-dev")
endif ()
include (cmake/lib_name.cmake)
include (cmake/find_icu4c.cmake)
include (cmake/find_boost.cmake)
# openssl, zlib before poco
@ -183,19 +184,24 @@ include (cmake/find_zstd.cmake)
include (cmake/find_poco.cmake)
include (cmake/find_lz4.cmake)
include (cmake/find_sparsehash.cmake)
include (cmake/find_libtool.cmake)
include (cmake/find_rt.cmake)
include (cmake/find_readline_edit.cmake)
include (cmake/find_zookeeper.cmake)
include (cmake/find_double-conversion.cmake)
include (cmake/find_re2.cmake)
include (cmake/find_contrib_lib.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)
find_contrib_lib(metrohash)
find_contrib_lib(btrie)
find_contrib_lib(double-conversion)
# Need to process before "contrib" dir:
include (libs/libcommon/cmake/find_gperftools.cmake)
include (libs/libcommon/cmake/find_jemalloc.cmake)
include (libs/libcommon/cmake/find_cctz.cmake)
include (libs/libmysqlxx/cmake/find_mysqlclient.cmake)
include (libs/libdaemon/cmake/find_unwind.cmake)
include (cmake/lib_name.cmake)
set (FULL_C_FLAGS "${CMAKE_C_FLAGS} ${CMAKE_C_FLAGS_${CMAKE_BUILD_TYPE}}")

View File

@ -0,0 +1,44 @@
# - Try to find btrie headers and libraries.
#
# Usage of this module as follows:
#
# find_package(btrie)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# BTRIE_ROOT_DIR Set this variable to the root installation of
# btrie if the module has problems finding
# the proper installation path.
#
# Variables defined by this module:
#
# BTRIE_FOUND System has btrie libs/headers
# BTRIE_LIBRARIES The btrie library/libraries
# BTRIE_INCLUDE_DIR The location of btrie headers
find_path(BTRIE_ROOT_DIR
NAMES include/btrie.h
)
find_library(BTRIE_LIBRARIES
NAMES btrie
PATHS ${BTRIE_ROOT_DIR}/lib ${BTRIE_LIBRARIES_PATHS}
)
find_path(BTRIE_INCLUDE_DIR
NAMES btrie.h
PATHS ${BTRIE_ROOT_DIR}/include ${BTRIE_INCLUDE_PATHS}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(btrie DEFAULT_MSG
BTRIE_LIBRARIES
BTRIE_INCLUDE_DIR
)
mark_as_advanced(
BTRIE_ROOT_DIR
BTRIE_LIBRARIES
BTRIE_INCLUDE_DIR
)

View File

@ -0,0 +1,44 @@
# - Try to find cityhash headers and libraries.
#
# Usage of this module as follows:
#
# find_package(cityhash)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# CITYHASH_ROOT_DIR Set this variable to the root installation of
# cityhash if the module has problems finding
# the proper installation path.
#
# Variables defined by this module:
#
# CITYHASH_FOUND System has cityhash libs/headers
# CITYHASH_LIBRARIES The cityhash library/libraries
# CITYHASH_INCLUDE_DIR The location of cityhash headers
find_path(CITYHASH_ROOT_DIR
NAMES include/city.h
)
find_library(CITYHASH_LIBRARIES
NAMES cityhash
PATHS ${CITYHASH_ROOT_DIR}/lib ${CITYHASH_LIBRARIES_PATHS}
)
find_path(CITYHASH_INCLUDE_DIR
NAMES city.h
PATHS ${CITYHASH_ROOT_DIR}/include ${CITYHASH_INCLUDE_PATHS}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(cityhash DEFAULT_MSG
CITYHASH_LIBRARIES
CITYHASH_INCLUDE_DIR
)
mark_as_advanced(
CITYHASH_ROOT_DIR
CITYHASH_LIBRARIES
CITYHASH_INCLUDE_DIR
)

View File

@ -0,0 +1,44 @@
# - Try to find double-conversion headers and libraries.
#
# Usage of this module as follows:
#
# find_package(double-conversion)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# DOUBLE_CONVERSION_ROOT_DIR Set this variable to the root installation of
# double-conversion if the module has problems finding
# the proper installation path.
#
# Variables defined by this module:
#
# DOUBLE_CONVERSION_FOUND System has double-conversion libs/headers
# DOUBLE_CONVERSION_LIBRARIES The double-conversion library/libraries
# DOUBLE_CONVERSION_INCLUDE_DIR The location of double-conversion headers
find_path(DOUBLE_CONVERSION_ROOT_DIR
NAMES include/double-conversion/double-conversion.h
)
find_library(DOUBLE_CONVERSION_LIBRARIES
NAMES double-conversion
PATHS ${DOUBLE_CONVERSION_ROOT_DIR}/lib ${BTRIE_CITYHASH_PATHS}
)
find_path(DOUBLE_CONVERSION_INCLUDE_DIR
NAMES double-conversion/double-conversion.h
PATHS ${DOUBLE_CONVERSION_ROOT_DIR}/include ${DOUBLE_CONVERSION_INCLUDE_PATHS}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(double_conversion DEFAULT_MSG
DOUBLE_CONVERSION_LIBRARIES
DOUBLE_CONVERSION_INCLUDE_DIR
)
mark_as_advanced(
DOUBLE_CONVERSION_ROOT_DIR
DOUBLE_CONVERSION_LIBRARIES
DOUBLE_CONVERSION_INCLUDE_DIR
)

View File

@ -0,0 +1,44 @@
# - Try to find farmhash headers and libraries.
#
# Usage of this module as follows:
#
# find_package(farmhash)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# FARMHASH_ROOT_DIR Set this variable to the root installation of
# farmhash if the module has problems finding
# the proper installation path.
#
# Variables defined by this module:
#
# FARMHASH_FOUND System has farmhash libs/headers
# FARMHASH_LIBRARIES The farmhash library/libraries
# FARMHASH_INCLUDE_DIR The location of farmhash headers
find_path(FARMHASH_ROOT_DIR
NAMES include/farmhash.h
)
find_library(FARMHASH_LIBRARIES
NAMES farmhash
PATHS ${FARMHASH_ROOT_DIR}/lib ${FARMHASH_LIBRARIES_PATHS}
)
find_path(FARMHASH_INCLUDE_DIR
NAMES farmhash.h
PATHS ${FARMHASH_ROOT_DIR}/include ${FARMHASH_INCLUDE_PATHS}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(farmhash DEFAULT_MSG
FARMHASH_LIBRARIES
FARMHASH_INCLUDE_DIR
)
mark_as_advanced(
FARMHASH_ROOT_DIR
FARMHASH_LIBRARIES
FARMHASH_INCLUDE_DIR
)

View File

@ -0,0 +1,44 @@
# - Try to find metrohash headers and libraries.
#
# Usage of this module as follows:
#
# find_package(metrohash)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# METROHASH_ROOT_DIR Set this variable to the root installation of
# metrohash if the module has problems finding
# the proper installation path.
#
# Variables defined by this module:
#
# METROHASH_FOUND System has metrohash libs/headers
# METROHASH_LIBRARIES The metrohash library/libraries
# METROHASH_INCLUDE_DIR The location of metrohash headers
find_path(METROHASH_ROOT_DIR
NAMES include/metrohash.h
)
find_library(METROHASH_LIBRARIES
NAMES metrohash
PATHS ${METROHASH_ROOT_DIR}/lib ${METROHASH_LIBRARIES_PATHS}
)
find_path(METROHASH_INCLUDE_DIR
NAMES metrohash.h
PATHS ${METROHASH_ROOT_DIR}/include ${METROHASH_INCLUDE_PATHS}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(metrohash DEFAULT_MSG
METROHASH_LIBRARIES
METROHASH_INCLUDE_DIR
)
mark_as_advanced(
METROHASH_ROOT_DIR
METROHASH_LIBRARIES
METROHASH_INCLUDE_DIR
)

View File

@ -0,0 +1,21 @@
macro(find_contrib_lib LIB_NAME)
string(TOLOWER ${LIB_NAME} LIB_NAME_LC)
string(TOUPPER ${LIB_NAME} LIB_NAME_UC)
string(REPLACE "-" "_" LIB_NAME_UC ${LIB_NAME_UC})
option (USE_INTERNAL_${LIB_NAME_UC}_LIBRARY "Use bundled library ${LIB_NAME} instead of system" ${NOT_UNBUNDLED})
if (NOT USE_INTERNAL_${LIB_NAME_UC}_LIBRARY)
find_package ("${LIB_NAME}")
endif ()
if (NOT ${LIB_NAME_UC}_FOUND)
set (USE_INTERNAL_${LIB_NAME_UC}_LIBRARY 1)
set (${LIB_NAME_UC}_LIBRARIES ${LIB_NAME_LC})
set (${LIB_NAME_UC}_INCLUDE_DIR ${${LIB_NAME_UC}_CONTRIB_INCLUDE_DIR})
endif ()
message (STATUS "Using ${LIB_NAME}: ${${LIB_NAME_UC}_INCLUDE_DIR} : ${${LIB_NAME_UC}_LIBRARIES}")
endmacro()

View File

@ -1,17 +0,0 @@
option (USE_INTERNAL_DOUBLE_CONVERSION_LIBRARY "Set to FALSE to use system double-conversion library instead of bundled" ${NOT_UNBUNDLED})
if (NOT USE_INTERNAL_DOUBLE_CONVERSION_LIBRARY)
find_library (DOUBLE_CONVERSION_LIBRARY double-conversion)
find_path (DOUBLE_CONVERSION_INCLUDE_DIR NAMES double-conversion/double-conversion.h PATHS ${DOUBLE_CONVERSION_INCLUDE_PATHS})
endif ()
if (DOUBLE_CONVERSION_LIBRARY AND DOUBLE_CONVERSION_INCLUDE_DIR)
include_directories (${DOUBLE_CONVERSION_INCLUDE_DIR})
else ()
set (USE_INTERNAL_DOUBLE_CONVERSION_LIBRARY 1)
set (DOUBLE_CONVERSION_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libdouble-conversion")
include_directories (BEFORE ${DOUBLE_CONVERSION_INCLUDE_DIR})
set (DOUBLE_CONVERSION_LIBRARY double-conversion)
endif ()
message (STATUS "Using double-conversion: ${DOUBLE_CONVERSION_INCLUDE_DIR} : ${DOUBLE_CONVERSION_LIBRARY}")

View File

@ -9,8 +9,6 @@ if (LZ4_LIBRARY AND LZ4_INCLUDE_DIR)
include_directories (${LZ4_INCLUDE_DIR})
else ()
set (USE_INTERNAL_LZ4_LIBRARY 1)
set (LZ4_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liblz4/include/lz4")
include_directories (BEFORE ${LZ4_INCLUDE_DIR})
set (LZ4_LIBRARY lz4)
endif ()

View File

@ -10,6 +10,7 @@ else ()
set (USE_INTERNAL_POCO_LIBRARY 1)
include (${ClickHouse_SOURCE_DIR}/cmake/find_ltdl.cmake)
include (${ClickHouse_SOURCE_DIR}/contrib/libpoco/cmake/FindODBC.cmake)
list (APPEND Poco_INCLUDE_DIRS
@ -29,6 +30,7 @@ else ()
if (ODBC_FOUND)
set (Poco_DataODBC_FOUND 1)
set (Poco_DataODBC_LIBRARY PocoDataODBC)
list (APPEND Poco_DataODBC_LIBRARY ${LTDL_LIB})
list (APPEND Poco_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/libpoco/Data/ODBC/include/")
endif ()

View File

@ -10,10 +10,6 @@ if (RE2_LIBRARY AND RE2_INCLUDE_DIR)
set (RE2_ST_LIBRARY ${RE2_LIBRARY})
else ()
set (USE_INTERNAL_RE2_LIBRARY 1)
set (RE2_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libre2")
set (RE2_ST_INCLUDE_DIR "${ClickHouse_BINARY_DIR}/contrib/libre2")
include_directories (BEFORE ${RE2_INCLUDE_DIR})
include_directories (BEFORE ${RE2_ST_INCLUDE_DIR})
set (RE2_LIBRARY re2)
set (RE2_ST_LIBRARY re2_st)
set (USE_RE2_ST 1)

View File

@ -9,7 +9,6 @@ if (SPARCEHASH_INCLUDE_DIR)
else ()
set (USE_INTERNAL_SPARCEHASH_LIBRARY 1)
set (SPARCEHASH_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libsparsehash")
include_directories (BEFORE ${SPARCEHASH_INCLUDE_DIR})
endif ()
message (STATUS "Using sparsehash: ${SPARCEHASH_INCLUDE_DIR}")

View File

@ -2,15 +2,11 @@ option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library inste
if (NOT USE_INTERNAL_ZLIB_LIBRARY)
find_package (ZLIB)
if (ZLIB_FOUND)
include_directories (${ZLIB_INCLUDE_DIRS})
endif ()
endif ()
if (NOT ZLIB_FOUND)
set (USE_INTERNAL_ZLIB_LIBRARY 1)
set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libzlib-ng")
include_directories (BEFORE ${ZLIB_INCLUDE_DIR})
if (USE_STATIC_LIBRARIES)
set (ZLIB_LIBRARIES zlibstatic)
else ()

View File

@ -9,8 +9,6 @@ if (ZOOKEEPER_LIBRARY AND ZOOKEEPER_INCLUDE_DIR)
include_directories (${ZOOKEEPER_INCLUDE_DIR})
else ()
set (USE_INTERNAL_ZOOKEEPER_LIBRARY 1)
set (ZOOKEEPER_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libzookeeper/include")
include_directories (BEFORE ${ZOOKEEPER_INCLUDE_DIR})
set (ZOOKEEPER_LIBRARY zookeeper_mt)
endif ()

View File

@ -9,8 +9,6 @@ if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
include_directories (${ZSTD_INCLUDE_DIR})
else ()
set (USE_INTERNAL_ZSTD_LIBRARY 1)
set (ZSTD_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libzstd/include/zstd")
include_directories (BEFORE ${ZSTD_INCLUDE_DIR})
set (ZSTD_LIBRARY zstd)
endif ()

View File

@ -1,10 +1,6 @@
set(CITYHASH_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcityhash/include)
set(CPUID_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcpuid/include)
set(DIVIDE_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libdivide)
set(BTRIE_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libbtrie/include)
set(CITYHASH_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcityhash/include)
set(MYSQLXX_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/libs/libmysqlxx/include)
set(POCOEXT_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/libs/libpocoext/include)
set(CITYHASH_CONTRIB_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcityhash/include)
set(COMMON_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/libs/libcommon/include ${ClickHouse_BINARY_DIR}/libs/libcommon/include)
set(DBMS_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/dbms/src ${ClickHouse_BINARY_DIR}/dbms/src)
set(DOUBLE_CONVERSION_CONTRIB_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libdouble-conversion)

View File

@ -1,4 +1,17 @@
get_property (dirs TARGET dbms PROPERTY INCLUDE_DIRECTORIES)
# TODO? Maybe recursive collect on all deps
get_property (dirs1 TARGET dbms PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
get_property (dirs1 TARGET common PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
if (USE_INTERNAL_BOOST_LIBRARY)
get_property (dirs1 TARGET ${Boost_PROGRAM_OPTIONS_LIBRARY} PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
endif ()
list(REMOVE_DUPLICATES dirs)
file (WRITE ${CMAKE_CURRENT_BINARY_DIR}/include_directories.txt "")
foreach (dir ${dirs})

View File

@ -1,21 +1,26 @@
include (CheckCXXSourceCompiles)
include (CMakePushCheckState)
cmake_push_check_state ()
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# clang4 : -no-pie cause error
# clang6 : -no-pie cause warning
else ()
set (TEST_FLAG "-no-pie")
set (CMAKE_REQUIRED_FLAGS "${TEST_FLAG}")
cmake_push_check_state ()
check_cxx_source_compiles("
int main() {
return 0;
}
" HAVE_NO_PIE)
set (TEST_FLAG "-no-pie")
set (CMAKE_REQUIRED_FLAGS "${TEST_FLAG}")
set (CMAKE_REQUIRED_FLAGS "")
check_cxx_source_compiles("
int main() {
return 0;
}
" HAVE_NO_PIE)
if (HAVE_NO_PIE)
set (FLAG_NO_PIE ${TEST_FLAG})
endif ()
cmake_pop_check_state ()
if (HAVE_NO_PIE)
set (FLAG_NO_PIE ${TEST_FLAG})
endif ()
cmake_pop_check_state ()

View File

@ -57,7 +57,7 @@ check_cxx_source_compiles("
}
" HAVE_POPCNT)
if (HAVE_POPCNT AND NOT AARCH64)
if (HAVE_POPCNT AND NOT ARCH_AARCH64)
set (COMPILER_FLAGS "${COMPILER_FLAGS} ${TEST_FLAG}")
endif ()

View File

@ -28,10 +28,21 @@ if (USE_INTERNAL_ZOOKEEPER_LIBRARY)
add_subdirectory (libzookeeper)
endif ()
add_subdirectory (libcityhash)
add_subdirectory (libfarmhash)
add_subdirectory (libmetrohash)
add_subdirectory (libbtrie)
if (USE_INTERNAL_CITYHASH_LIBRARY)
add_subdirectory (libcityhash)
endif ()
if (USE_INTERNAL_FARMHASH_LIBRARY)
add_subdirectory (libfarmhash)
endif ()
if (USE_INTERNAL_METROHASH_LIBRARY)
add_subdirectory (libmetrohash)
endif ()
if (USE_INTERNAL_BTRIE_LIBRARY)
add_subdirectory (libbtrie)
endif ()
if (USE_INTERNAL_UNWIND_LIBRARY)
add_subdirectory (libunwind)
@ -49,6 +60,6 @@ if (ENABLE_LIBTCMALLOC AND USE_INTERNAL_GPERFTOOLS_LIBRARY)
add_subdirectory (libtcmalloc)
endif ()
if (NOT ARM)
if (NOT ARCH_ARM)
add_subdirectory (libcpuid)
endif ()

View File

@ -16,3 +16,5 @@ include/cpuid/rdtsc.h
include/cpuid/recog_amd.h
include/cpuid/recog_intel.h
)
target_include_directories (cpuid PUBLIC include)

View File

@ -18,3 +18,5 @@ double-conversion/strtod.cc
double-conversion/strtod.h
double-conversion/utils.h
)
target_include_directories (double-conversion PUBLIC .)

View File

@ -6,4 +6,4 @@ add_library (lz4
include/lz4/lz4hc.h
include/lz4/lz4opt.h)
target_include_directories(lz4 PUBLIC include)
target_include_directories(lz4 PUBLIC include/lz4)

View File

@ -41,6 +41,9 @@ add_library (re2_st ${re2_sources})
target_compile_definitions (re2 PRIVATE NDEBUG)
target_compile_definitions (re2_st PRIVATE NDEBUG NO_THREADS re2=re2_st)
target_include_directories (re2 PUBLIC .)
target_include_directories (re2_st PRIVATE . PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
file (MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/re2_st)
foreach (FILENAME filtered_re2.h re2.h set.h stringpiece.h variadic_function.h)
add_custom_command (OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/re2_st/${FILENAME}"

View File

@ -92,3 +92,5 @@ IF (ZSTD_LEGACY_SUPPORT)
ENDIF (ZSTD_LEGACY_SUPPORT)
ADD_LIBRARY(zstd ${Sources} ${Headers})
target_include_directories (zstd PUBLIC include/zstd)

View File

@ -28,6 +28,7 @@ add_subdirectory (src)
add_library(string_utils
src/Common/StringUtils.h
src/Common/StringUtils.cpp)
target_include_directories (string_utils PRIVATE ${DBMS_INCLUDE_DIR})
set(dbms_headers)
set(dbms_sources)
@ -150,7 +151,7 @@ if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
PROPERTIES COMPILE_FLAGS -g0)
endif ()
if (NOT ARM)
if (NOT ARCH_ARM)
set (LINK_LIBRARIES_ONLY_ON_X86_64 cpuid)
endif()
@ -163,12 +164,13 @@ endif()
target_link_libraries (dbms
common
${MYSQLXX_LIBRARY}
cityhash farmhash metrohash
${FARMHASH_LIBRARIES}
${METROHASH_LIBRARIES}
${LZ4_LIBRARY}
${ZSTD_LIBRARY}
${ZOOKEEPER_LIBRARY}
string_utils
${DOUBLE_CONVERSION_LIBRARY}
${DOUBLE_CONVERSION_LIBRARIES}
${ZLIB_LIBRARIES}
${LINK_LIBRARIES_ONLY_ON_X86_64}
${RE2_LIBRARY}
@ -176,7 +178,7 @@ target_link_libraries (dbms
${OPENSSL_CRYPTO_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Poco_Data_LIBRARY}
btrie
${BTRIE_LIBRARIES}
)
if (Poco_DataODBC_FOUND)
@ -200,19 +202,18 @@ endif ()
target_link_libraries (dbms
${PLATFORM_LIBS}
${CMAKE_DL_LIBS}
${LTDL_LIB}
${CMAKE_THREAD_LIBS_INIT}
)
target_include_directories (dbms BEFORE PRIVATE ${CPUID_INCLUDE_DIR})
target_include_directories (dbms BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${BTRIE_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${CITYHASH_INCLUDE_DIR})
target_include_directories (dbms PUBLIC ${MYSQLXX_INCLUDE_DIR})
target_include_directories (dbms PRIVATE ${POCOEXT_INCLUDE_DIR})
target_include_directories (dbms PRIVATE ${COMMON_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
# only for copy_headers.sh:
target_include_directories (dbms PRIVATE ${COMMON_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${DOUBLE_CONVERSION_INCLUDE_DIR})
if (ENABLE_TESTS)
add_subdirectory (tests)
# attach all dbms gtest sources

View File

@ -1,6 +1,6 @@
# This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54267-testing)
set(VERSION_REVISION 54267)
set(VERSION_DESCRIBE v1.1.54280-testing)
set(VERSION_REVISION 54280)
# end of autochange
set (VERSION_MAJOR 1)

View File

@ -1,13 +1,17 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Common/StringUtils.h>
#include <Poco/String.h>
#include <Common/typeid_cast.h>
#include <Poco/String.h>
namespace DB
{

View File

@ -1,14 +1,22 @@
#pragma once
#include <unordered_map>
#include <AggregateFunctions/IAggregateFunction.h>
#include <ext/singleton.h>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
namespace DB
{
class Context;
class IDataType;
using DataTypePtr = std::shared_ptr<IDataType>;
using DataTypes = std::vector<DataTypePtr>;
@ -19,22 +27,8 @@ class AggregateFunctionFactory final : public ext::singleton<AggregateFunctionFa
{
friend class StorageSystemFunctions;
private:
/// No std::function, for smaller object size and less indirection.
using Creator = AggregateFunctionPtr(*)(const String & name, const DataTypes & argument_types, const Array & parameters);
using AggregateFunctions = std::unordered_map<String, Creator>;
public:
AggregateFunctionPtr get(
const String & name,
const DataTypes & argument_types,
const Array & parameters = {},
int recursion_level = 0) const;
AggregateFunctionPtr tryGet(const String & name, const DataTypes & argument_types, const Array & parameters = {}) const;
bool isAggregateFunctionName(const String & name, int recursion_level = 0) const;
using Creator = std::function<AggregateFunctionPtr(const String &, const DataTypes &, const Array &)>;
/// For compatibility with SQL, it's possible to specify that certain aggregate function name is case insensitive.
enum CaseSensitiveness
@ -43,11 +37,29 @@ public:
CaseInsensitive
};
/// Register an aggregate function by its name.
void registerFunction(const String & name, Creator creator, CaseSensitiveness case_sensitiveness = CaseSensitive);
/// Register a function by its name.
/// No locking, you must register all functions before usage of get.
void registerFunction(
const String & name,
Creator creator,
CaseSensitiveness case_sensitiveness = CaseSensitive);
/// Throws an exception if not found.
AggregateFunctionPtr get(
const String & name,
const DataTypes & argument_types,
const Array & parameters = {},
int recursion_level = 0) const;
/// Returns nullptr if not found.
AggregateFunctionPtr tryGet(
const String & name,
const DataTypes & argument_types,
const Array & parameters = {}) const;
bool isAggregateFunctionName(const String & name, int recursion_level = 0) const;
private:
AggregateFunctionPtr getImpl(
const String & name,
const DataTypes & argument_types,
@ -55,6 +67,8 @@ private:
int recursion_level) const;
private:
using AggregateFunctions = std::unordered_map<String, Creator>;
AggregateFunctions aggregate_functions;
/// Case insensitive aggregate functions will be additionally added here with lowercased name.
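For orientation, here is a hedged, self-contained sketch of the registry shape this header describes: a `std::function`-based `Creator` map, plus a second map keyed by lowercased names for `CaseInsensitive` registrations. The class and value types (`FunctionRegistry`, `Thing`) are illustrative stand-ins, not ClickHouse API.

```cpp
// Minimal sketch of a name -> creator registry with a case-insensitive
// fallback, mirroring the std::function-based Creator in the diff above.
#include <algorithm>
#include <cctype>
#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>

struct Thing { std::string name; };
using ThingPtr = std::shared_ptr<Thing>;
using Creator = std::function<ThingPtr(const std::string &)>;

class FunctionRegistry
{
public:
    enum CaseSensitiveness { CaseSensitive, CaseInsensitive };

    /// No locking: register everything before the first call to get().
    void registerFunction(const std::string & name, Creator creator,
                          CaseSensitiveness cs = CaseSensitive)
    {
        functions[name] = creator;
        if (cs == CaseInsensitive)
            case_insensitive_functions[toLower(name)] = creator;  // extra lowercased entry
    }

    ThingPtr get(const std::string & name) const    /// Throws if not found.
    {
        if (auto res = tryGet(name))
            return res;
        throw std::runtime_error("Unknown function " + name);
    }

    ThingPtr tryGet(const std::string & name) const /// Returns nullptr if not found.
    {
        if (auto it = functions.find(name); it != functions.end())
            return it->second(name);
        if (auto it = case_insensitive_functions.find(toLower(name));
            it != case_insensitive_functions.end())
            return it->second(name);
        return nullptr;
    }

private:
    static std::string toLower(std::string s)
    {
        std::transform(s.begin(), s.end(), s.begin(),
                       [](unsigned char c) { return std::tolower(c); });
        return s;
    }

    std::unordered_map<std::string, Creator> functions;
    std::unordered_map<std::string, Creator> case_insensitive_functions;
};

int main()
{
    FunctionRegistry registry;
    registry.registerFunction("groupArray",
        [](const std::string & n) { return std::make_shared<Thing>(Thing{n}); },
        FunctionRegistry::CaseInsensitive);

    std::cout << registry.get("GROUPARRAY")->name << '\n';  // resolved via the lowercased map
}
```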

View File

@ -8,6 +8,28 @@ namespace DB
namespace
{
template <template <typename, typename> class AggregateFunctionTemplate, class Data, typename ... TArgs>
static IAggregateFunction * createWithNumericOrTimeType(const IDataType & argument_type, TArgs && ... args)
{
if (typeid_cast<const DataTypeDate *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeDateTime *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data>(std::forward<TArgs>(args)...);
else return createWithNumericType<AggregateFunctionTemplate, Data, TArgs...>(argument_type, std::forward<TArgs>(args)...);
}
template <typename has_limit, typename ... TArgs>
inline AggregateFunctionPtr createAggregateFunctionGroupArrayImpl(const DataTypePtr & argument_type, TArgs ... args)
{
if (auto res = createWithNumericOrTimeType<GroupArrayNumericImpl, has_limit>(*argument_type, argument_type, std::forward<TArgs>(args)...))
return AggregateFunctionPtr(res);
if (typeid_cast<const DataTypeString *>(argument_type.get()))
return std::make_shared<GroupArrayGeneralListImpl<NodeString, has_limit::value>>(std::forward<TArgs>(args)...);
return std::make_shared<GroupArrayGeneralListImpl<NodeGeneral, has_limit::value>>(std::forward<TArgs>(args)...);
};
static AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
if (argument_types.size() != 1)
@ -15,7 +37,7 @@ static AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
bool limit_size = false;
UInt64 max_elems = 0;
UInt64 max_elems = std::numeric_limits<UInt64>::max();
if (parameters.empty())
{
@ -39,23 +61,9 @@ static AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!limit_size)
{
if (auto res = createWithNumericType<GroupArrayNumericImpl, std::false_type>(*argument_types[0]))
return AggregateFunctionPtr(res);
else if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
return std::make_shared<GroupArrayGeneralListImpl<NodeString, false>>();
else
return std::make_shared<GroupArrayGeneralListImpl<NodeGeneral, false>>();
}
return createAggregateFunctionGroupArrayImpl<std::false_type>(argument_types[0]);
else
{
if (auto res = createWithNumericType<GroupArrayNumericImpl, std::true_type>(*argument_types[0], max_elems))
return AggregateFunctionPtr(res);
else if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
return std::make_shared<GroupArrayGeneralListImpl<NodeString, true>>(max_elems);
else
return std::make_shared<GroupArrayGeneralListImpl<NodeGeneral, true>>(max_elems);
}
return createAggregateFunctionGroupArrayImpl<std::true_type>(argument_types[0], max_elems);
}
}
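The refactor above funnels both branches through `createAggregateFunctionGroupArrayImpl`, using `std::true_type`/`std::false_type` as a compile-time `has_limit` tag. A minimal sketch of that tag-dispatch pattern follows; the container type is illustrative, and `if constexpr` (C++17) stands in for whatever compile-time branch the real template uses.

```cpp
// Sketch of std::true_type / std::false_type tag dispatch: the limit check
// is resolved at compile time via has_limit::value.
#include <iostream>
#include <limits>
#include <type_traits>
#include <vector>

template <typename has_limit>
struct GroupArraySketch
{
    explicit GroupArraySketch(size_t max_elems_ = std::numeric_limits<size_t>::max())
        : max_elems(max_elems_) {}

    void add(int value)
    {
        // The size check compiles away entirely when has_limit is std::false_type.
        if constexpr (has_limit::value)
        {
            if (values.size() >= max_elems)
                return;
        }
        values.push_back(value);
    }

    size_t max_elems;
    std::vector<int> values;
};

int main()
{
    GroupArraySketch<std::false_type> unbounded;
    GroupArraySketch<std::true_type> bounded(2);  // behaves like groupArray(2)(x)

    for (int i = 0; i < 5; ++i) { unbounded.add(i); bounded.add(i); }
    std::cout << unbounded.values.size() << ' ' << bounded.values.size() << '\n';  // 5 2
}
```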

View File

@ -51,16 +51,18 @@ class GroupArrayNumericImpl final
: public IUnaryAggregateFunction<GroupArrayNumericData<T>, GroupArrayNumericImpl<T, Tlimit_num_elems>>
{
static constexpr bool limit_num_elems = Tlimit_num_elems::value;
DataTypePtr data_type;
UInt64 max_elems;
public:
GroupArrayNumericImpl(UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : max_elems(max_elems_) {}
explicit GroupArrayNumericImpl(const DataTypePtr & data_type_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: data_type(data_type_), max_elems(max_elems_) {}
String getName() const override { return "groupArray"; }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeNumber<T>>());
return std::make_shared<DataTypeArray>(data_type);
}
void setArgument(const DataTypePtr & argument) {}

View File

@ -28,5 +28,4 @@ list(REMOVE_ITEM clickhouse_aggregate_functions_headers
add_library(clickhouse_aggregate_functions ${clickhouse_aggregate_functions_sources})
target_link_libraries(clickhouse_aggregate_functions dbms)
target_include_directories (clickhouse_aggregate_functions BEFORE PUBLIC ${CITYHASH_INCLUDE_DIR})
target_include_directories (clickhouse_aggregate_functions PRIVATE ${COMMON_INCLUDE_DIR})

View File

@ -54,7 +54,7 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
}
template <template <typename, typename> class AggregateFunctionTemplate, class Data, typename ... TArgs>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type, TArgs ... args)
static IAggregateFunction * createWithNumericType(const IDataType & argument_type, TArgs && ... args)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
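The signature change from `TArgs ... args` to `TArgs && ... args` turns the pack into forwarding references, so the `std::forward` calls inside the helper actually preserve value categories instead of forwarding from already-made copies. A hedged illustration (the `Probe` type is hypothetical, introduced only to make copies and moves visible):

```cpp
// Why `TArgs && ...` matters: with plain `TArgs ...` every argument is first
// copied into the helper's parameters, and only then forwarded.
#include <iostream>
#include <utility>

struct Probe
{
    Probe() = default;
    Probe(const Probe &) { std::cout << "copy\n"; }
    Probe(Probe &&) noexcept { std::cout << "move\n"; }
};

struct Consumer { explicit Consumer(Probe) {} };

template <typename... TArgs>
Consumer * createByValue(TArgs... args)          // copies into the parameter pack first
{ return new Consumer(std::forward<TArgs>(args)...); }

template <typename... TArgs>
Consumer * createForwarding(TArgs &&... args)    // forwarding references
{ return new Consumer(std::forward<TArgs>(args)...); }

int main()
{
    Probe p;
    delete createByValue(p);      // prints "copy" then "move": an extra copy into the pack
    delete createForwarding(p);   // prints "copy" only: the lvalue is forwarded through
}
```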

View File

@ -44,18 +44,18 @@
  */
/// The maximum degree of buffer size before the values are discarded
#define UNIQUES_HASH_MAX_SIZE_DEGREE 17
#define UNIQUES_HASH_MAX_SIZE_DEGREE 17
/// The maximum number of elements before the values are discarded
#define UNIQUES_HASH_MAX_SIZE (1 << (UNIQUES_HASH_MAX_SIZE_DEGREE - 1))
#define UNIQUES_HASH_MAX_SIZE (1ULL << (UNIQUES_HASH_MAX_SIZE_DEGREE - 1))
/** The number of least significant bits used for thinning. The remaining high-order bits are used to determine the position in the hash table.
  * (high-order bits are taken because the younger bits will be constant after dropping some of the values)
  */
#define UNIQUES_HASH_BITS_FOR_SKIP (32 - UNIQUES_HASH_MAX_SIZE_DEGREE)
#define UNIQUES_HASH_BITS_FOR_SKIP (32 - UNIQUES_HASH_MAX_SIZE_DEGREE)
/// Initial buffer size degree
#define UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE 4
#define UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE 4
/** This hash function is not the most optimal, but UniquesHashSet states counted with it,
@ -71,15 +71,15 @@ struct UniquesHashSetDefaultHash
template <typename Hash = UniquesHashSetDefaultHash>
class UniquesHashSet : private HashTableAllocatorWithStackMemory<(1 << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>
class UniquesHashSet : private HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>
{
private:
using Value_t = UInt64;
using HashValue_t = UInt32;
using Allocator = HashTableAllocatorWithStackMemory<(1 << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>;
using Allocator = HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>;
UInt32 m_size; /// Number of elements
UInt8 size_degree; /// The size of the table as a power of 2
UInt32 m_size; /// Number of elements
UInt8 size_degree; /// The size of the table as a power of 2
UInt8 skip_degree; /// Skip elements not divisible by 2 ^ skip_degree
bool has_zero; /// The hash table contains an element with a hash value of 0.
@ -92,7 +92,7 @@ private:
void alloc(UInt8 new_size_degree)
{
buf = reinterpret_cast<HashValue_t *>(Allocator::alloc((1 << new_size_degree) * sizeof(buf[0])));
buf = reinterpret_cast<HashValue_t *>(Allocator::alloc((1ULL << new_size_degree) * sizeof(buf[0])));
size_degree = new_size_degree;
}
@ -105,10 +105,10 @@ private:
}
}
inline size_t buf_size() const { return 1 << size_degree; }
inline size_t max_fill() const { return 1 << (size_degree - 1); }
inline size_t mask() const { return buf_size() - 1; }
inline size_t place(HashValue_t x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }
inline size_t buf_size() const { return 1ULL << size_degree; }
inline size_t max_fill() const { return 1ULL << (size_degree - 1); }
inline size_t mask() const { return buf_size() - 1; }
inline size_t place(HashValue_t x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }
/// The value is divided by 2 ^ skip_degree
inline bool good(HashValue_t hash) const
@ -157,7 +157,7 @@ private:
new_size_degree = size_degree + 1;
/// Expand the space.
buf = reinterpret_cast<HashValue_t *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1 << new_size_degree) * sizeof(buf[0])));
buf = reinterpret_cast<HashValue_t *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1ULL << new_size_degree) * sizeof(buf[0])));
size_degree = new_size_degree;
/** Now some items may need to be moved to a new location.
@ -327,12 +327,12 @@ public:
if (0 == skip_degree)
return m_size;
size_t res = m_size * (1 << skip_degree);
size_t res = m_size * (1ULL << skip_degree);
/** Pseudo-random remainder - in order to be not visible,
* that the number is divided by the power of two.
*/
res += (intHashCRC32(m_size) & ((1 << skip_degree) - 1));
res += (intHashCRC32(m_size) & ((1ULL << skip_degree) - 1));
/** Correction of a systematic error due to collisions during hashing in UInt32.
* `fixed_res(res)` formula
@ -435,7 +435,7 @@ public:
if (rhs_size > UNIQUES_HASH_MAX_SIZE)
throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");
if ((1U << size_degree) < rhs_size)
if ((1ULL << size_degree) < rhs_size)
{
UInt8 new_size_degree = std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(rhs_size - 1)) + 2);
resize(new_size_degree);
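The hunks above systematically replace `1 <<` with `1ULL <<`. With a plain `int` literal the whole expression is evaluated in 32-bit arithmetic and can wrap before being widened to `size_t`. A minimal sketch of the failure mode in an expression shaped like `m_size * (1 << skip_degree)`; the value of `m_size` is exaggerated for illustration:

```cpp
// 32-bit wraparound with `1 << n` versus 64-bit arithmetic with `1ULL << n`.
#include <cstddef>
#include <cstdint>
#include <iostream>

int main()
{
    uint32_t m_size = 3'000'000'000u;  // deliberately large for the demo
    int skip_degree = 2;

    size_t wrapped = m_size * (1 << skip_degree);    // 32-bit unsigned multiply wraps
    size_t correct = m_size * (1ULL << skip_degree); // operands promoted to 64-bit first

    std::cout << wrapped << '\n' << correct << '\n'; // 3410065408 vs 12000000000
}
```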

View File

@ -351,8 +351,8 @@ void Connection::sendQuery(
block_in.reset();
block_out.reset();
/// If server version is new enough, send empty block which meand end of data.
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES && !with_pending_data)
/// Send empty block which means end of data.
if (!with_pending_data)
{
sendData(Block());
out->next();
@ -384,9 +384,7 @@ void Connection::sendData(const Block & block, const String & name)
}
writeVarUInt(Protocol::Client::Data, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
writeStringBinary(name, *out);
writeStringBinary(name, *out);
size_t prev_bytes = out->count();
@ -405,9 +403,7 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
/// NOTE 'Throttler' is not used in this method (could use, but it's not important right now).
writeVarUInt(Protocol::Client::Data, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
writeStringBinary(name, *out);
writeStringBinary(name, *out);
if (0 == size)
copyData(input, *out);
@ -419,13 +415,6 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
void Connection::sendExternalTablesData(ExternalTablesData & data)
{
/// If working with older server, don't send any info.
if (server_revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
{
out->next();
return;
}
if (data.empty())
{
/// Send empty block, which means end of data transfer.
@ -552,9 +541,7 @@ Block Connection::receiveData()
initBlockInput();
String external_table_name;
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
readStringBinary(external_table_name, *in);
readStringBinary(external_table_name, *in);
size_t prev_bytes = in->count();
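The hunks above drop the `DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES` checks, so the (possibly empty) external-table name is now always written right after the Data packet marker. A hedged sketch of that framing follows; `writeVarUInt`/`writeStringBinary` are re-implemented here in minimal form (assuming LEB128-style varints), and the packet-type id is a hypothetical placeholder, not a confirmed protocol constant.

```cpp
// Illustrative framing: VarUInt packet type, then a length-prefixed name
// (empty for the main data stream), then the block contents.
#include <cstdint>
#include <string>
#include <vector>

void writeVarUInt(uint64_t x, std::vector<uint8_t> & out)
{
    // 7 bits per byte, high bit set while more bytes follow (LEB128).
    do
    {
        uint8_t byte = x & 0x7F;
        x >>= 7;
        if (x)
            byte |= 0x80;
        out.push_back(byte);
    } while (x);
}

void writeStringBinary(const std::string & s, std::vector<uint8_t> & out)
{
    writeVarUInt(s.size(), out);              // length prefix
    out.insert(out.end(), s.begin(), s.end());
}

int main()
{
    const uint64_t ClientData = 2;            // hypothetical packet-type id
    std::vector<uint8_t> out;

    writeVarUInt(ClientData, out);            // packet type
    writeStringBinary("", out);               // empty name == main data stream
    // ...serialized block contents would follow here...
    return out.size() == 2 ? 0 : 1;           // 1 byte type + 1 byte zero length
}
```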

View File

@ -17,6 +17,14 @@ namespace ProfileEvents
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
extern const int LOGICAL_ERROR;
}
ConnectionPoolWithFailover::ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
@ -71,10 +79,17 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Se
{
return tryGetEntry(pool, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);
std::vector<Entry> entries;
entries.reserve(results.size());
for (auto & result : results)
entries.emplace_back(std::move(result.entry));
return entries;
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getManyChecked(
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
@ -84,7 +99,7 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getManyChecked(
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPool::Entry> ConnectionPoolWithFailover::getManyImpl(
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
const Settings * settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry)

View File

@ -7,13 +7,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
extern const int LOGICAL_ERROR;
}
/** Connection pool with fault tolerance.
* Initialized by several other IConnectionPools.
* When a connection is received, it tries to create or select a live connection from a pool,
@ -54,16 +47,17 @@ public:
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;
/// The same as getMany(), but check that replication delay for table_to_check is acceptable.
/// Delay threshold is taken from settings.
std::vector<Entry> getManyChecked(
std::vector<TryResult> getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check);
private:
using Base = PoolWithFailoverBase<IConnectionPool>;
/// Get the values of relevant settings and call Base::getMany()
std::vector<Entry> getManyImpl(
std::vector<TryResult> getManyImpl(
const Settings * settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry);
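To make the new `TryResult`-based API concrete, here is a hedged sketch of the pattern `getManyChecked()` exposes: each attempt yields an entry plus health metadata, and the caller decides what to do with stale replicas (`getMany()` in the diff simply unwraps `result.entry`). Field names are illustrative, not a guaranteed match for `PoolWithFailoverBase::TryResult`.

```cpp
// Sketch of collecting per-replica try results and filtering on freshness.
#include <string>
#include <vector>

struct Entry { std::string address; };

struct TryResult
{
    Entry entry;
    bool is_usable = false;     // connection established and answered
    bool is_up_to_date = false; // replication delay within the threshold
    double staleness = 0.0;     // observed replication delay, seconds
};

std::vector<Entry> pickUpToDate(const std::vector<TryResult> & results)
{
    std::vector<Entry> entries;
    entries.reserve(results.size());
    for (const TryResult & r : results)
        if (r.is_usable && r.is_up_to_date)
            entries.push_back(r.entry);   // cf. getMany() unwrapping result.entry
    return entries;
}

int main()
{
    std::vector<TryResult> results = {
        {{"replica-1:9000"}, true, true, 0.0},
        {{"replica-2:9000"}, true, false, 300.0},  // lagging replica is skipped
    };
    return pickUpToDate(results).size() == 1 ? 0 : 1;
}
```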

View File

@ -12,64 +12,45 @@ namespace ErrorCodes
}
MultiplexedConnections::MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_), supports_parallel_execution(false)
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
{
if (connection_ == nullptr)
throw Exception("Invalid connection specified", ErrorCodes::LOGICAL_ERROR);
active_connection_total_count = 1;
ShardState shard_state;
shard_state.allocated_connection_count = active_connection_total_count;
shard_state.active_connection_count = active_connection_total_count;
shard_states.push_back(shard_state);
connection.setThrottler(throttler);
ReplicaState replica_state;
replica_state.connection_index = 0;
replica_state.shard_state = &shard_states[0];
replica_state.connection = &connection;
replica_states.push_back(replica_state);
fd_to_replica_state_idx.emplace(connection.socket.impl()->sockfd(), 0);
connection_->setThrottler(throttler);
connections.push_back(connection_);
auto res = replica_map.emplace(connections[0]->socket.impl()->sockfd(), replica_state);
if (!res.second)
throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR);
active_connection_count = 1;
}
MultiplexedConnections::MultiplexedConnections(
ConnectionPoolWithFailover & pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler, bool append_extra_info)
: settings(settings_)
{
initFromShard(pool_, main_table);
registerShards();
/// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
if (connections.empty())
return;
supports_parallel_execution = active_connection_total_count > 1;
if (append_extra_info)
block_extra_info = std::make_unique<BlockExtraInfo>();
}
MultiplexedConnections::MultiplexedConnections(
const ConnectionPoolWithFailoverPtrs & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
{
if (pools_.empty())
throw Exception("Pools are not specified", ErrorCodes::LOGICAL_ERROR);
for (auto & pool : pools_)
replica_states.reserve(connections.size());
fd_to_replica_state_idx.reserve(connections.size());
for (size_t i = 0; i < connections.size(); ++i)
{
if (!pool)
throw Exception("Invalid pool specified", ErrorCodes::LOGICAL_ERROR);
initFromShard(*pool, main_table);
Connection * connection = &(*connections[i]);
connection->setThrottler(throttler);
ReplicaState replica_state;
replica_state.pool_entry = std::move(connections[i]);
replica_state.connection = connection;
replica_states.push_back(std::move(replica_state));
fd_to_replica_state_idx.emplace(connection->socket.impl()->sockfd(), i);
}
registerShards();
supports_parallel_execution = active_connection_total_count > 1;
active_connection_count = connections.size();
if (append_extra_info)
block_extra_info = std::make_unique<BlockExtraInfo>();
@ -82,17 +63,18 @@ void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesDa
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_connection_total_count)
if (data.size() != active_connection_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_map)
for (ReplicaState & state : replica_states)
{
ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
{
connection->sendExternalTablesData(*it);
++it;
++it;
}
}
}
@ -108,54 +90,28 @@ void MultiplexedConnections::sendQuery(
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
if (replica_states.size() > 1)
{
if (settings == nullptr)
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_states.size();
for (size_t i = 0; i < replica_states.size(); ++i)
{
/// Each shard has one address.
auto it = connections.begin();
for (size_t i = 0; i < shard_states.size(); ++i)
{
Connection * connection = *it;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
Connection * connection = replica_states[i].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, nullptr, client_info, with_pending_data);
++it;
}
}
else
{
/// Each shard has one or more replicas.
auto it = connections.begin();
for (const auto & shard_state : shard_states)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = shard_state.active_connection_count;
UInt64 offset = 0;
for (size_t i = 0; i < shard_state.allocated_connection_count; ++i)
{
Connection * connection = *it;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
++offset;
++it;
}
}
query_settings.parallel_replica_offset = i;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
}
}
else
{
Connection * connection = connections[0];
Connection * connection = replica_states[0].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, settings, client_info, with_pending_data);
connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data);
}
sent_query = true;
@ -187,14 +143,13 @@ void MultiplexedConnections::disconnect()
{
std::lock_guard<std::mutex> lock(cancel_mutex);
for (auto it = replica_map.begin(); it != replica_map.end(); ++it)
for (ReplicaState & state : replica_states)
{
ReplicaState & state = it->second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
{
connection->disconnect();
invalidateReplica(it);
invalidateReplica(state);
}
}
}
@ -206,10 +161,9 @@ void MultiplexedConnections::sendCancel()
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (const auto & e : replica_map)
for (ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
connection->sendCancel();
}
@ -243,7 +197,7 @@ Connection::Packet MultiplexedConnections::drain()
case Protocol::Server::Exception:
default:
/// If we receive an exception or an unknown package, we save it.
/// If we receive an exception or an unknown packet, we save it.
res = std::move(packet);
break;
}
@ -262,10 +216,9 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
{
bool is_first = true;
std::ostringstream os;
for (const auto & e : replica_map)
for (const ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
const Connection * connection = connections[state.connection_index];
const Connection * connection = state.connection;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getDescription();
@ -276,65 +229,6 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
return os.str();
}
void MultiplexedConnections::initFromShard(ConnectionPoolWithFailover & pool, const QualifiedTableName * main_table)
{
std::vector<IConnectionPool::Entry> entries;
if (main_table)
entries = pool.getManyChecked(settings, pool_mode, *main_table);
else
entries = pool.getMany(settings, pool_mode);
/// If getMany() did not allocate connections and did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
if (entries.empty())
return;
ShardState shard_state;
shard_state.allocated_connection_count = entries.size();
shard_state.active_connection_count = entries.size();
active_connection_total_count += shard_state.active_connection_count;
shard_states.push_back(shard_state);
pool_entries.insert(pool_entries.end(), entries.begin(), entries.end());
}
void MultiplexedConnections::registerShards()
{
replica_map.reserve(pool_entries.size());
connections.reserve(pool_entries.size());
size_t offset = 0;
for (auto & shard_state : shard_states)
{
size_t index_begin = offset;
size_t index_end = offset + shard_state.allocated_connection_count;
registerReplicas(index_begin, index_end, shard_state);
offset = index_end;
}
}
void MultiplexedConnections::registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state)
{
for (size_t i = index_begin; i < index_end; ++i)
{
ReplicaState replica_state;
replica_state.connection_index = i;
replica_state.shard_state = &shard_state;
Connection * connection = &*(pool_entries[i]);
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->setThrottler(throttler);
connections.push_back(connection);
auto res = replica_map.emplace(connection->socket.impl()->sockfd(), replica_state);
if (!res.second)
throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR);
}
}
Connection::Packet MultiplexedConnections::receivePacketUnlocked()
{
if (!sent_query)
@ -342,14 +236,10 @@ Connection::Packet MultiplexedConnections::receivePacketUnlocked()
if (!hasActiveConnections())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
ReplicaState & state = it->second;
current_connection = connections[state.connection_index];
ReplicaState & state = getReplicaForReading();
current_connection = state.connection;
if (current_connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection::Packet packet = current_connection->receivePacket();
@ -363,48 +253,32 @@ Connection::Packet MultiplexedConnections::receivePacketUnlocked()
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
invalidateReplica(state);
break;
case Protocol::Server::Exception:
default:
current_connection->disconnect();
invalidateReplica(it);
invalidateReplica(state);
break;
}
return packet;
}
MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::getReplicaForReading()
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading()
{
ReplicaMap::iterator it;
if (replica_states.size() == 1)
return replica_states[0];
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
const ReplicaState & state = it->second;
Connection * connection = connections[state.connection_index];
if (connection == nullptr)
it = replica_map.end();
}
return it;
}
MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_connection_total_count);
read_list.reserve(active_connection_count);
/// First, we check if there are data already in the buffer
/// of at least one connection.
for (const auto & e : replica_map)
for (const ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
@ -416,32 +290,28 @@ MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::waitForRead
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (const auto & e : replica_map)
for (const ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->receive_timeout);
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.receive_timeout);
if (n == 0)
throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
return replica_states[fd_to_replica_state_idx.at(socket.impl()->sockfd())];
}
void MultiplexedConnections::invalidateReplica(MultiplexedConnections::ReplicaMap::iterator it)
void MultiplexedConnections::invalidateReplica(ReplicaState & state)
{
ReplicaState & state = it->second;
ShardState * shard_state = state.shard_state;
connections[state.connection_index] = nullptr;
--shard_state->active_connection_count;
--active_connection_total_count;
state.connection = nullptr;
state.pool_entry = IConnectionPool::Entry();
--active_connection_count;
}
}
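The core of the refactor above is replacing the socket-keyed `replica_map` with a flat `replica_states` vector plus `fd_to_replica_state_idx`, a small hash map that translates the descriptor returned by `select()` back into a vector index. A hedged sketch of that lookup, with plain POSIX `select()` standing in for `Poco::Net::Socket::select()` and a pipe standing in for a server connection; error handling and the receive timeout are omitted:

```cpp
// fd -> replica index lookup around a blocking select().
#include <sys/select.h>
#include <unistd.h>
#include <unordered_map>
#include <vector>

struct ReplicaState
{
    int fd = -1;       // stands in for Connection * and its socket
    bool active = true;
};

ReplicaState & waitForReadEvent(std::vector<ReplicaState> & replica_states,
                                const std::unordered_map<int, size_t> & fd_to_idx)
{
    std::vector<int> read_list;
    fd_set read_fds;
    FD_ZERO(&read_fds);
    int max_fd = -1;
    for (const ReplicaState & state : replica_states)
        if (state.active)
        {
            read_list.push_back(state.fd);
            FD_SET(state.fd, &read_fds);
            if (state.fd > max_fd)
                max_fd = state.fd;
        }

    select(max_fd + 1, &read_fds, nullptr, nullptr, nullptr);  // block until readable

    for (int fd : read_list)
        if (FD_ISSET(fd, &read_fds))
            return replica_states[fd_to_idx.at(fd)];  // O(1) map back to the replica

    return replica_states.front();  // unreachable when select() reported readiness
}

int main()
{
    int fds[2];
    pipe(fds);                 // a pipe stands in for a server socket
    write(fds[1], "x", 1);     // make the read end immediately ready

    std::vector<ReplicaState> replicas = {{fds[0], true}};
    std::unordered_map<int, size_t> fd_to_idx = {{fds[0], 0}};
    return waitForReadEvent(replicas, fd_to_idx).fd == fds[0] ? 0 : 1;
}
```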

View File

@ -10,7 +10,7 @@ namespace DB
{
/** To retrieve data directly from multiple replicas (connections) from one or several shards
/** To retrieve data directly from multiple replicas (connections) from one shard
* within a single thread. As a degenerate case, it can also work with one connection.
* It is assumed that all functions except sendCancel are always executed in one thread.
*
@ -20,23 +20,14 @@ class MultiplexedConnections final : private boost::noncopyable
{
public:
/// Accepts ready connection.
MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_);
MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_);
/** Accepts a pool from which it will be necessary to get one or more connections.
/** Accepts a vector of connections to replicas of one shard already taken from pool.
* If the append_extra_info flag is set, additional information is appended to each received block.
* If the get_all_replicas flag is set, all connections are selected.
*/
MultiplexedConnections(
ConnectionPoolWithFailover & pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table = nullptr);
/** Accepts pools, one for each shard, from which one will need to get one or more connections.
* If the append_extra_info flag is set, additional information appended to each received block.
* If the do_broadcast flag is set, all connections are received.
*/
MultiplexedConnections(
const ConnectionPoolWithFailoverPtrs & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table = nullptr);
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler_, bool append_extra_info);
/// Send all content of external tables to replicas.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
@ -72,92 +63,50 @@ public:
/// Returns the number of replicas.
/// Without locking, because sendCancel() does not change this number.
size_t size() const { return replica_map.size(); }
size_t size() const { return replica_states.size(); }
/// Check if there are any valid replicas.
/// Without locking, because sendCancel() does not change the state of the replicas.
bool hasActiveConnections() const { return active_connection_total_count > 0; }
bool hasActiveConnections() const { return active_connection_count > 0; }
private:
/// Connections of the 1st shard, then the connections of the 2nd shard, etc.
using Connections = std::vector<Connection *>;
/// The state of the connections of one shard.
struct ShardState
{
/// The number of connections allocated, i.e. replicas for this shard.
size_t allocated_connection_count;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count;
};
/// Description of a single replica.
struct ReplicaState
{
size_t connection_index;
/// The owner of this replica.
ShardState * shard_state;
};
/// Replicas hashed by id of the socket.
using ReplicaMap = std::unordered_map<int, ReplicaState>;
/// The state of each shard.
using ShardStates = std::vector<ShardState>;
private:
void initFromShard(ConnectionPoolWithFailover & pool, const QualifiedTableName * main_table);
void registerShards();
/// Register replicas of one shard.
void registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state);
/// Internal version of `receivePacket` function without locking.
Connection::Packet receivePacketUnlocked();
/// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const;
/// Get a replica where you can read the data.
ReplicaMap::iterator getReplicaForReading();
/// Description of a single replica.
struct ReplicaState
{
Connection * connection = nullptr;
ConnectionPool::Entry pool_entry;
};
/** Check if there is any data that can be read on any of the replicas.
* Returns one such replica if it exists.
*/
ReplicaMap::iterator waitForReadEvent();
/// Get a replica where you can read the data.
ReplicaState & getReplicaForReading();
/// Mark the replica as invalid.
void invalidateReplica(ReplicaMap::iterator it);
void invalidateReplica(ReplicaState & replica_state);
private:
const Settings * settings;
const Settings & settings;
Connections connections;
ReplicaMap replica_map;
ShardStates shard_states;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count = 0;
/// If not nullptr, then it is used to restrict network traffic.
ThrottlerPtr throttler;
std::vector<ConnectionPool::Entry> pool_entries;
std::vector<ReplicaState> replica_states;
std::unordered_map<int, size_t> fd_to_replica_state_idx;
/// Connection that received last block.
Connection * current_connection;
Connection * current_connection = nullptr;
/// Information about the last received block, if supported.
std::unique_ptr<BlockExtraInfo> block_extra_info;
/// The current number of valid connections to replicas.
size_t active_connection_total_count = 0;
/// The query is run in parallel on multiple replicas.
bool supports_parallel_execution;
bool sent_query = false;
bool cancelled = false;
PoolMode pool_mode = PoolMode::GET_MANY;
/// A mutex for the sendCancel function to execute safely
/// in separate thread.
mutable std::mutex cancel_mutex;
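A one-line sketch of how the new fd_to_replica_state_idx map is meant to be used (replicaByFd is a hypothetical helper; member names are as declared above):

/// Sketch: O(1) lookup from a ready socket descriptor to its replica state.
ReplicaState & replicaByFd(int fd)
{
    return replica_states[fd_to_replica_state_idx.at(fd)];  /// at() throws on an unknown fd
}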

View File

@ -9,6 +9,8 @@
#include <Common/Exception.h>
#include <Common/Allocator.h>
#include <IO/WriteHelpers.h>
/// Required for older Darwin builds that lack a definition of MAP_ANONYMOUS
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
@ -37,7 +39,7 @@ namespace ErrorCodes
*
* PS. This is also required, because tcmalloc can not allocate a chunk of memory greater than 16 GB.
*/
static constexpr size_t MMAP_THRESHOLD = 64 * (1 << 20);
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
@ -54,7 +56,7 @@ void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
if (alignment > MMAP_MIN_ALIGNMENT)
throw DB::Exception("Too large alignment: more than page size.", DB::ErrorCodes::BAD_ARGUMENTS);
buf = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mmap.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
@ -128,7 +130,7 @@ void * Allocator<clear_memory_>::realloc(void * buf, size_t old_size, size_t new
buf = mremap(buf, old_size, new_size, MREMAP_MAYMOVE);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap.", DB::ErrorCodes::CANNOT_MREMAP);
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + DB::toString(old_size) + " to " + DB::toString(new_size) + " bytes.", DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
}
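As a sketch of the threshold logic above (large allocations go through mmap, small ones through malloc), assuming POSIX APIs; error handling and alignment are omitted:

#include <sys/mman.h>
#include <cstdlib>
#include <cstddef>

/// Sketch: dispatch between malloc and mmap at MMAP_THRESHOLD, as Allocator::alloc does.
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);  /// 64 MiB

void * allocate(size_t size)
{
    if (size >= MMAP_THRESHOLD)
        return mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);  /// zero-filled by the kernel; check for MAP_FAILED in real code
    return malloc(size);  /// the real allocator also honours alignment and optional zero-fill
}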

View File

@ -70,7 +70,7 @@ public:
}
/// no block of corresponding size, allocate a new one
return pool.alloc(1 << (list_idx + 1));
return pool.alloc(1ULL << (list_idx + 1));
}
void free(char * ptr, const size_t size)
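A hedged sketch of the power-of-two size-class arithmetic implied by the alloc line above; the real free list computes list_idx elsewhere, this helper is purely illustrative:

/// Sketch: smallest index whose block size (1ULL << (idx + 1)) covers `size`.
static size_t sizeClassIndex(size_t size)
{
    size_t idx = 0;
    while ((1ULL << (idx + 1)) < size)
        ++idx;
    return idx;  /// pool.alloc(1ULL << (idx + 1)) then returns a block that fits
}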

View File

@ -242,9 +242,24 @@ void ExecutionStatus::deserializeText(const std::string & data)
rb >> code >> "\n" >> escape >> message;
}
bool ExecutionStatus::tryDeserializeText(const std::string & data)
{
try
{
deserializeText(data);
}
catch (...)
{
return false;
}
return true;
}
ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_of_message)
{
return ExecutionStatus(getCurrentExceptionCode(), start_of_message + ": " + getCurrentExceptionMessage(false, true));
String msg = (start_of_message.empty() ? "" : (start_of_message + ": ")) + getCurrentExceptionMessage(false, true);
return ExecutionStatus(getCurrentExceptionCode(), msg);
}
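A brief usage sketch for the new tryDeserializeText / fromCurrentException pair; the default construction of the parsed status and the catch context are assumptions:

/// Sketch: persist a status and read it back without throwing on corrupt input.
ExecutionStatus status = ExecutionStatus::fromCurrentException("Task failed");  /// inside some catch block
std::string text = status.serializeText();

ExecutionStatus parsed;
if (!parsed.tryDeserializeText(text))
    /* treat the stored status as unknown instead of propagating an exception */;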

View File

@ -107,6 +107,8 @@ struct ExecutionStatus
std::string serializeText() const;
void deserializeText(const std::string & data);
bool tryDeserializeText(const std::string & data);
};

View File

@ -155,9 +155,9 @@ struct HashTableGrower
UInt8 size_degree = initial_size_degree;
/// The size of the hash table in the cells.
size_t bufSize() const { return 1 << size_degree; }
size_t bufSize() const { return 1ULL << size_degree; }
size_t maxFill() const { return 1 << (size_degree - 1); }
size_t maxFill() const { return 1ULL << (size_degree - 1); }
size_t mask() const { return bufSize() - 1; }
/// From the hash value, get the cell number in the hash table.
@ -200,7 +200,7 @@ struct HashTableGrower
template <size_t key_bits>
struct HashTableFixedGrower
{
size_t bufSize() const { return 1 << key_bits; }
size_t bufSize() const { return 1ULL << key_bits; }
size_t place(size_t x) const { return x; }
/// You could write __builtin_unreachable(), but the compiler does not optimize everything, and it turns out less efficiently.
size_t next(size_t pos) const { return pos + 1; }
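Why the 1ULL matters here, as a quick sketch: the literal's type determines the width of the shift, and a plain int shift overflows for degrees of 31 and above, which large hash tables can reach.

unsigned size_degree = 40;
// size_t bad = 1 << size_degree;   /// undefined behaviour: a 32-bit int shifted by 40
size_t good = 1ULL << size_degree;  /// 64-bit shift, yields 2^40 as intended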

View File

@ -4,7 +4,7 @@
/** Two-level hash table.
* Represents 256 (or 1 << BITS_FOR_BUCKET) small hash tables (buckets of the first level).
* Represents 256 (or 1ULL << BITS_FOR_BUCKET) small hash tables (buckets of the first level).
* To determine which one to use, one of the bytes of the hash function is taken.
*
* Usually works a little slower than a simple hash table.
@ -47,7 +47,7 @@ protected:
public:
using Impl = ImplTable;
static constexpr size_t NUM_BUCKETS = 1 << BITS_FOR_BUCKET;
static constexpr size_t NUM_BUCKETS = 1ULL << BITS_FOR_BUCKET;
static constexpr size_t MAX_BUCKET = NUM_BUCKETS - 1;
size_t hash(const Key & x) const { return Hash::operator()(x); }
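A sketch of the bucket routing described in the comment above; the exact bits taken from the hash value are an assumption here:

/// Sketch: route a hash value to one of NUM_BUCKETS first-level tables.
static size_t getBucketFromHash(size_t hash_value)
{
    return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET;  /// a byte of the hash (exact position assumed)
}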

View File

@ -71,7 +71,7 @@ public:
TryResult() = default;
explicit TryResult(Entry entry_)
: entry(std::move(entry))
: entry(std::move(entry_))
, is_usable(true)
, is_up_to_date(true)
{
@ -107,7 +107,7 @@ public:
/// Returns at least min_entries and at most max_entries connections (at most one connection per nested pool).
/// The method will throw if it is unable to get min_entries alive connections or
/// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
std::vector<Entry> getMany(
std::vector<TryResult> getMany(
size_t min_entries, size_t max_entries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority = GetPriorityFunc(),
@ -141,16 +141,16 @@ template<typename TNestedPool>
typename TNestedPool::Entry
PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
std::vector<Entry> entries = getMany(1, 1, try_get_entry, get_priority);
if (entries.empty() || entries[0].isNull())
std::vector<TryResult> results = getMany(1, 1, try_get_entry, get_priority);
if (results.empty() || results[0].entry.isNull())
throw DB::Exception(
"PoolWithFailoverBase::getMany() returned less than min_entries entries.",
DB::ErrorCodes::LOGICAL_ERROR);
return entries[0];
return results[0].entry;
}
template<typename TNestedPool>
std::vector<typename TNestedPool::Entry>
std::vector<typename PoolWithFailoverBase<TNestedPool>::TryResult>
PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries,
const TryGetEntryFunc & try_get_entry,
@ -262,34 +262,27 @@ PoolWithFailoverBase<TNestedPool>::getMany(
[](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }),
try_results.end());
std::vector<Entry> entries;
/// Sort so that preferred items are near the beginning.
std::stable_sort(
try_results.begin(), try_results.end(),
[](const TryResult & left, const TryResult & right)
{
return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
< std::forward_as_tuple(!right.is_up_to_date, right.staleness);
});
if (up_to_date_count >= min_entries)
{
/// There is enough up-to-date entries.
entries.reserve(up_to_date_count);
for (const TryResult & result: try_results)
{
if (result.is_up_to_date)
entries.push_back(result.entry);
}
try_results.resize(up_to_date_count);
}
else if (fallback_to_stale_replicas)
{
/// There is not enough up-to-date entries but we are allowed to return stale entries.
/// Gather all up-to-date ones and least-bad stale ones.
std::stable_sort(
try_results.begin(), try_results.end(),
[](const TryResult & left, const TryResult & right)
{
return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
< std::forward_as_tuple(!right.is_up_to_date, right.staleness);
});
size_t size = std::min(try_results.size(), max_entries);
entries.reserve(size);
for (size_t i = 0; i < size; ++i)
entries.push_back(try_results[i].entry);
try_results.resize(size);
}
else
throw DB::Exception(
@ -297,7 +290,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
+ ", needed: " + std::to_string(min_entries),
DB::ErrorCodes::ALL_REPLICAS_ARE_STALE);
return entries;
return try_results;
}
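The stable_sort key above orders results so that up-to-date replicas come first and, within each group, less stale ones first; a minimal illustration of that tuple comparison (TryResultLike is a stand-in struct):

#include <tuple>

struct TryResultLike { bool is_up_to_date; double staleness; };

bool isPreferred(const TryResultLike & left, const TryResultLike & right)
{
    /// false < true, so up-to-date entries (!is_up_to_date == false) sort first;
    /// ties are broken by smaller staleness.
    return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
         < std::forward_as_tuple(!right.is_up_to_date, right.staleness);
}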
template<typename TNestedPool>

View File

@ -123,6 +123,7 @@
M(DictCacheLockWriteNs) \
M(DictCacheLockReadNs) \
\
M(DistributedSyncInsertionTimeoutExceeded) \
M(DataAfterMergeDiffersFromReplica)

View File

@ -74,25 +74,17 @@ String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<S
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value)
{
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*ast);
ASTExpressionList & node = typeid_cast<ASTExpressionList &>(*select.select_expression_list);
ASTs & asts = node.children;
auto cur = std::make_shared<ASTLiteral>(StringRange(), value);
cur->alias = column_name;
ASTPtr column_value = cur;
bool is_replaced = false;
for (size_t i = 0; i < asts.size(); ++i)
if (!select.with_expression_list)
{
if (const ASTIdentifier * identifier = typeid_cast<const ASTIdentifier *>(&* asts[i]))
{
if (identifier->kind == ASTIdentifier::Kind::Column && identifier->name == column_name)
{
asts[i] = column_value;
is_replaced = true;
}
}
select.with_expression_list = std::make_shared<ASTExpressionList>();
select.children.insert(select.children.begin(), select.with_expression_list);
}
if (!is_replaced)
asts.insert(asts.begin(), column_value);
ASTExpressionList & with = typeid_cast<ASTExpressionList &>(*select.with_expression_list);
auto literal = std::make_shared<ASTLiteral>(StringRange(), value);
literal->alias = column_name;
literal->prefer_alias_to_column_name = true;
with.children.push_back(literal);
}
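In effect (an illustrative call, not from the diff): rewriteEntityInAst(ast, "a", Field(UInt64(1))) no longer substitutes the literal into the SELECT list; the query instead gains a WITH entry, so `SELECT a FROM t` behaves like `WITH 1 AS a SELECT a FROM t`.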
/// Verifying that the function depends only on the specified columns

View File

@ -138,3 +138,8 @@ void Lock::unlockOrMoveIfFailed(std::vector<zkutil::Lock> & failed_to_unlock_loc
}
}
void Lock::unlockAssumeLockNodeRemovedManually()
{
locked.reset(nullptr);
}

View File

@ -60,6 +60,7 @@ namespace zkutil
void unlock();
void unlockOrMoveIfFailed(std::vector<zkutil::Lock> & failed_to_unlock_locks);
void unlockAssumeLockNodeRemovedManually();
bool tryLock();

View File

@ -11,7 +11,7 @@ namespace zkutil
{
using ACLPtr = const ACL_vector *;
using Stat = Stat;
using Stat = ::Stat;
struct Op
{
@ -19,6 +19,8 @@ public:
Op() : data(new zoo_op_t) {}
virtual ~Op() {}
virtual std::unique_ptr<Op> clone() const = 0;
virtual std::string describe() = 0;
std::unique_ptr<zoo_op_t> data;
@ -31,21 +33,32 @@ public:
struct Op::Remove : public Op
{
Remove(const std::string & path_, int32_t version) :
path(path_)
Remove(const std::string & path_, int32_t version_) :
path(path_), version(version_)
{
zoo_delete_op_init(data.get(), path.c_str(), version);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Remove(path, version));
}
std::string describe() override { return "command: remove, path: " + path; }
private:
std::string path;
int32_t version;
};
struct Op::Create : public Op
{
Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags);
Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_);
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Create(path, value, acl, flags));
}
std::string getPathCreated()
{
@ -62,17 +75,24 @@ struct Op::Create : public Op
private:
std::string path;
std::string value;
ACLPtr acl;
int32_t flags;
std::vector<char> created_path;
};
struct Op::SetData : public Op
{
SetData(const std::string & path_, const std::string & value_, int32_t version) :
path(path_), value(value_)
SetData(const std::string & path_, const std::string & value_, int32_t version_) :
path(path_), value(value_), version(version_)
{
zoo_set_op_init(data.get(), path.c_str(), value.c_str(), value.size(), version, &stat);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new SetData(path, value, version));
}
std::string describe() override
{
return
@ -85,21 +105,28 @@ struct Op::SetData : public Op
private:
std::string path;
std::string value;
int32_t version;
Stat stat;
};
struct Op::Check : public Op
{
Check(const std::string & path_, int32_t version) :
path(path_)
Check(const std::string & path_, int32_t version_) :
path(path_), version(version_)
{
zoo_check_op_init(data.get(), path.c_str(), version);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Check(path, version));
}
std::string describe() override { return "command: check, path: " + path; }
private:
std::string path;
int32_t version;
};
struct OpResult : public zoo_op_result_t
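The virtual clone() introduced above makes it possible to deep-copy a heterogeneous Ops vector, which the async multi implementation later in this diff relies on; a minimal sketch (copyOps is a hypothetical helper):

/// Sketch: deep-copy polymorphic ops; each element keeps its dynamic type.
zkutil::Ops copyOps(const zkutil::Ops & ops)
{
    zkutil::Ops copy;
    for (const auto & op : ops)
        copy.emplace_back(op->clone());
    return copy;
}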

View File

@ -555,7 +555,7 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
for (const auto & op : ops_)
ops.push_back(*(op->data));
int32_t code = zoo_multi(impl, ops.size(), ops.data(), out_results->data());
int32_t code = zoo_multi(impl, static_cast<int>(ops.size()), ops.data(), out_results->data());
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
@ -612,15 +612,13 @@ int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_resul
return code;
}
static const int BATCH_SIZE = 100;
void ZooKeeper::removeChildrenRecursive(const std::string & path)
{
Strings children = getChildren(path);
while (!children.empty())
{
zkutil::Ops ops;
for (size_t i = 0; i < BATCH_SIZE && !children.empty(); ++i)
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
removeChildrenRecursive(path + "/" + children.back());
ops.emplace_back(std::make_unique<Op::Remove>(path + "/" + children.back(), -1));
@ -639,7 +637,7 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
{
zkutil::Ops ops;
Strings batch;
for (size_t i = 0; i < BATCH_SIZE && !children.empty(); ++i)
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
batch.push_back(path + "/" + children.back());
children.pop_back();
@ -712,8 +710,8 @@ ZooKeeperPtr ZooKeeper::startNewSession() const
return std::make_shared<ZooKeeper>(hosts, session_timeout_ms);
}
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags)
: path(path_), value(value_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_)
: path(path_), value(value_), acl(acl_), flags(flags_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
{
zoo_create_op_init(data.get(), path.c_str(), value.c_str(), value.size(), acl, flags, created_path.data(), created_path.size());
}
@ -904,4 +902,72 @@ ZooKeeper::RemoveFuture ZooKeeper::asyncRemove(const std::string & path)
return future;
}
ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception)
{
size_t count = ops_.size();
OpResultsPtr results(new OpResults(count));
/// We need to hold all references to ops data until the end of the multi callback
struct OpsHolder
{
std::shared_ptr<zkutil::Ops> ops_ptr = std::make_shared<zkutil::Ops>();
std::shared_ptr<std::vector<zoo_op_t>> ops_raw_ptr = std::make_shared<std::vector<zoo_op_t>>();
} holder;
for (const auto & op : ops_)
{
holder.ops_ptr->emplace_back(op->clone());
holder.ops_raw_ptr->push_back(*holder.ops_ptr->back()->data);
}
MultiFuture future{ [throw_exception, results, holder] (int rc) {
OpResultsAndCode res;
res.code = rc;
res.results = results;
res.ops_ptr = holder.ops_ptr;
if (throw_exception && rc != ZOK)
throw zkutil::KeeperException(rc);
return res;
}};
if (ops_.empty())
{
(**future.task)(ZOK);
return future;
}
/// Workaround for a libzookeeper bug.
/// TODO: check if the bug is fixed in the latest version of libzookeeper.
if (expired())
throw KeeperException(ZINVALIDSTATE);
auto & ops = *holder.ops_raw_ptr;
int32_t code = zoo_amulti(impl, static_cast<int>(ops.size()), ops.data(), results->data(),
[] (int rc, const void * data)
{
MultiFuture::TaskPtr owned_task =
std::move(const_cast<MultiFuture::TaskPtr &>(*static_cast<const MultiFuture::TaskPtr *>(data)));
(*owned_task)(rc);
}, future.task.get());
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
if (code != ZOK)
throw KeeperException(code);
return future;
}
ZooKeeper::MultiFuture ZooKeeper::tryAsyncMulti(const zkutil::Ops & ops)
{
return asyncMultiImpl(ops, false);
}
ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
{
return asyncMultiImpl(ops, true);
}
}
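A usage sketch for the new async multi API, mirroring the style of the unit test below; the node path and data are illustrative:

zkutil::Ops ops;
ops.emplace_back(new zkutil::Op::Create("/some/node", "data", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Check("/some/node", -1));

auto future = zookeeper->tryAsyncMulti(ops);  /// unlike asyncMulti, future.get() does not throw on failure
auto result = future.get();
if (result.code != ZOK)
    std::cerr << "multi failed: " << zkutil::ZooKeeper::error2string(result.code) << "\n";  /// result.results holds per-op codes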

View File

@ -31,6 +31,9 @@ const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
const UInt32 MEDIUM_SESSION_TIMEOUT = 120000;
const UInt32 BIG_SESSION_TIMEOUT = 600000;
/// Preferred size of multi() command (in number of ops)
constexpr size_t MULTI_BATCH_SIZE = 100;
struct WatchContext;
@ -46,7 +49,7 @@ struct WatchContext;
/// Modifying methods do not retry, because it leads to problems of the double-delete type.
///
/// Methods whose names do not start with try- raise KeeperException on any error.
class ZooKeeper
{
public:
using Ptr = std::shared_ptr<ZooKeeper>;
@ -92,7 +95,7 @@ public:
/// Throw an exception if something went wrong.
std::string create(const std::string & path, const std::string & data, int32_t mode);
/// Doesn not throw in the following cases:
/// Does not throw in the following cases:
/// * The parent for the created node does not exist
/// * The parent is ephemeral.
/// * The node already exists.
@ -241,7 +244,7 @@ public:
/// The caller is responsible for ensuring that the context lives until the callback
/// is finished and we can't simply pass ownership of the context into function object.
/// Instead, we save the context in a Future object and return it to the caller.
/// The cantext will live until the Future lives.
/// The context will live until the Future lives.
/// Context data is wrapped in an unique_ptr so that its address (which is passed to
/// libzookeeper) remains unchanged after the Future is returned from the function.
///
@ -320,6 +323,19 @@ public:
RemoveFuture asyncRemove(const std::string & path);
struct OpResultsAndCode
{
OpResultsPtr results;
std::shared_ptr<Ops> ops_ptr;
int code;
};
using MultiFuture = Future<OpResultsAndCode, int>;
MultiFuture asyncMulti(const Ops & ops);
/// Like the previous one, but doesn't throw exceptions on future.get()
MultiFuture tryAsyncMulti(const Ops & ops);
static std::string error2string(int32_t code);
/// Max size of node contents in bytes.
@ -378,6 +394,8 @@ private:
int32_t multiImpl(const Ops & ops, OpResultsPtr * out_results = nullptr);
int32_t existsImpl(const std::string & path, Stat * stat_, WatchCallback watch_callback);
MultiFuture asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception);
std::string hosts;
int32_t session_timeout_ms;

View File

@ -17,4 +17,4 @@ add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries (zk_many_watches_reconnect dbms)
add_executable (zkutil_test_multi_exception zkutil_test_multi_exception.cpp)
target_link_libraries (zkutil_test_multi_exception dbms)
target_link_libraries (zkutil_test_multi_exception dbms gtest_main)

View File

@ -1,24 +1,28 @@
#include <iostream>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <iostream>
#include <chrono>
#include <gtest/gtest.h>
using namespace DB;
int main()
TEST(zkutil, multi_nice_exception_msg)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
try
{
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
ASSERT_NO_THROW(
zookeeper->tryRemoveRecursive("/clickhouse_test_zkutil_multi");
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
);
try
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/c", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi/c", -1));
@ -27,6 +31,7 @@ int main()
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
FAIL();
}
catch (...)
{
@ -34,16 +39,73 @@ int main()
String msg = getCurrentExceptionMessage(false);
if (msg.find("/clickhouse_test_zkutil_multi/a") == std::string::npos || msg.find("#2") == std::string::npos)
{
std::cerr << "Wrong: " << msg;
return -1;
}
bool msg_has_required_patterns = msg.find("/clickhouse_test_zkutil_multi/a") != std::string::npos && msg.find("#2") != std::string::npos;
EXPECT_TRUE(msg_has_required_patterns) << msg;
}
}
std::cout << "Ok: " << msg;
return 0;
TEST(zkutil, multi_async)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
zookeeper->tryRemoveRecursive("/clickhouse_test_zkutil_multi");
{
ops.clear();
auto fut = zookeeper->asyncMulti(ops);
}
std::cerr << "Unexpected";
return -1;
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "", acl, zkutil::CreateMode::Persistent));
auto fut = zookeeper->tryAsyncMulti(ops);
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.code == ZOK);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
EXPECT_ANY_THROW
(
std::vector<zkutil::ZooKeeper::MultiFuture> futures;
for (size_t i = 0; i < 10000; ++i)
{
ops.clear();
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test_zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::SetData("/clickhouse_test_zkutil_multi", "xxx", 42));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
futures.emplace_back(zookeeper->asyncMulti(ops));
}
futures[0].get();
);
/// Check there are no segfaults for the remaining 9999 futures
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
auto fut = zookeeper->tryAsyncMulti(ops);
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.code == ZNODEEXISTS);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
}

View File

@ -13,7 +13,7 @@ std::string escapeForFileName(const std::string & s)
while (pos != end)
{
char c = *pos;
unsigned char c = *pos;
if (isWordCharASCII(c))
res += c;
@ -38,23 +38,17 @@ std::string unescapeForFileName(const std::string & s)
while (pos != end)
{
if (*pos != '%')
if (!(*pos == '%' && pos + 2 < end))
{
res += *pos;
++pos;
}
else
{
/// skip '%'
if (++pos == end) break;
char val = unhex(*pos) * 16;
if (++pos == end) break;
val += unhex(*pos);
res += val;
++pos;
res += unhex2(pos);
pos += 2;
}
++pos;
}
return res;
}

View File

@ -5,7 +5,7 @@
namespace DB
{
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
std::vector<std::string> getMultipleKeysFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
{
std::vector<std::string> values;
Poco::Util::AbstractConfiguration::Keys config_keys;
@ -20,7 +20,7 @@ std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfigura
}
std::vector<std::string> getMultipleValuesFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
std::vector<std::string> getMultipleValuesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
{
std::vector<std::string> values;
for (const auto & key : DB::getMultipleKeysFromConfig(config, root, name))

View File

@ -12,7 +12,7 @@ namespace Util
namespace DB
{
/// get all internal key names for given key
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
std::vector<std::string> getMultipleKeysFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
/// Get all values for given key
std::vector<std::string> getMultipleValuesFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
std::vector<std::string> getMultipleValuesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
}
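A sketch of what these helpers return for a config with repeated keys (the element names and root path are illustrative, chosen to match the multiple-dictionaries_config feature from this release):

/// Given XML like:
///   <yandex>
///     <dictionaries_config>one.xml</dictionaries_config>
///     <dictionaries_config>two.xml</dictionaries_config>   <!-- repeated keys enumerate as name, name[1], ... -->
///   </yandex>
/// getMultipleValuesFromConfig(config, "", "dictionaries_config")
/// would yield {"one.xml", "two.xml"}.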

View File

@ -20,17 +20,20 @@ inline char hexDigitLowercase(unsigned char c)
#include <cstring>
#include <cstddef>
#include <common/Types.h>
/// Maps 0..255 to 00..FF or 00..ff correspondingly
extern const char * const hex_byte_to_char_uppercase_table;
extern const char * const hex_byte_to_char_lowercase_table;
inline void writeHexByteUppercase(unsigned char byte, void * out)
inline void writeHexByteUppercase(UInt8 byte, void * out)
{
memcpy(out, &hex_byte_to_char_uppercase_table[static_cast<size_t>(byte) * 2], 2);
}
inline void writeHexByteLowercase(unsigned char byte, void * out)
inline void writeHexByteLowercase(UInt8 byte, void * out)
{
memcpy(out, &hex_byte_to_char_lowercase_table[static_cast<size_t>(byte) * 2], 2);
}
@ -42,5 +45,21 @@ extern const char * const hex_char_to_digit_table;
inline char unhex(char c)
{
return hex_char_to_digit_table[static_cast<unsigned char>(c)];
return hex_char_to_digit_table[static_cast<UInt8>(c)];
}
inline char unhex2(const char * data)
{
return
static_cast<UInt8>(unhex(data[0])) * 0x10
+ static_cast<UInt8>(unhex(data[1]));
}
inline UInt16 unhex4(const char * data)
{
return
static_cast<UInt16>(unhex(data[0])) * 0x1000
+ static_cast<UInt16>(unhex(data[1])) * 0x100
+ static_cast<UInt16>(unhex(data[2])) * 0x10
+ static_cast<UInt16>(unhex(data[3]));
}
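A quick sketch of the new helpers' arithmetic (unhex maps a hex digit character to 0..15):

const char * bytes = "2f00ff";
char   b = unhex2(bytes);      /// 0x2 * 16 + 0xf = 0x2f, i.e. '/'
UInt16 w = unhex4(bytes + 2);  /// 0x00ff = 255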

View File

@ -35,6 +35,15 @@ std::ostream & operator<<(std::ostream & stream, const DB::IColumn & what);
#include <Client/Connection.h>
std::ostream & operator<<(std::ostream & stream, const DB::Connection::Packet & what);
#include <Common/PODArray.h>
template <typename T, size_t INITIAL_SIZE, typename TAllocator, size_t pad_right_>
std::ostream & operator<<(std::ostream & stream, const DB::PODArray<T, INITIAL_SIZE, TAllocator, pad_right_> & what)
{
stream << "PODArray(size = " << what.size() << ", capacity = " << what.capacity() << ")";
dumpContainer(stream, what);
return stream;
}
/// some operator<< should be declared before operator<<(... std::shared_ptr<>)
#include <common/iostream_debug_helpers.h>

View File

@ -0,0 +1,13 @@
#include <Common/escapeForFileName.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(Common, unescapeForFileName)
{
EXPECT_EQ(unescapeForFileName(escapeForFileName("172.19.0.6")), "172.19.0.6");
EXPECT_EQ(unescapeForFileName(escapeForFileName("abcd.")), "abcd.");
EXPECT_EQ(unescapeForFileName(escapeForFileName("abcd")), "abcd");
EXPECT_EQ(unescapeForFileName(escapeForFileName("..::")), "..::");
}

View File

@ -204,7 +204,7 @@ void report(const char * name, size_t n, double elapsed, UInt64 tsc_diff, size_t
std::cerr << name << std::endl
<< "Done in " << elapsed
<< " (" << n / elapsed << " elem/sec."
<< ", " << n * sizeof(UInt64) / elapsed / (1 << 30) << " GiB/sec."
<< ", " << n * sizeof(UInt64) / elapsed / (1ULL << 30) << " GiB/sec."
<< ", " << (tsc_diff * 1.0 / n) << " tick/elem)"
<< "; res = " << res
<< std::endl << std::endl;

View File

@ -60,9 +60,6 @@
/// And NULL map.
#define NULL_MAP_COLUMN_NAME_SUFFIX ".null"
#define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
#define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS 51554
#define DBMS_MIN_REVISION_WITH_BLOCK_INFO 51903
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060

View File

@ -381,7 +381,8 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_UUID = 376;
extern const int ILLEGAL_SYNTAX_FOR_DATA_TYPE = 377;
extern const int DATA_TYPE_CANNOT_HAVE_ARGUMENTS = 378;
extern const int MULTIPLE_STREAMS_REQUIRED = 379;
extern const int UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK = 379;
extern const int MULTIPLE_STREAMS_REQUIRED = 380;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -17,9 +17,7 @@ void Progress::read(ReadBuffer & in, UInt64 server_revision)
readVarUInt(new_rows, in);
readVarUInt(new_bytes, in);
if (server_revision >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS)
readVarUInt(new_total_rows, in);
readVarUInt(new_total_rows, in);
rows = new_rows;
bytes = new_bytes;
@ -31,9 +29,7 @@ void Progress::write(WriteBuffer & out, UInt64 client_revision) const
{
writeVarUInt(rows.load(), out);
writeVarUInt(bytes.load(), out);
if (client_revision >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS)
writeVarUInt(total_rows.load(), out);
writeVarUInt(total_rows.load(), out);
}

View File

@ -3,6 +3,7 @@ target_link_libraries (exception dbms)
add_executable (string_pool string_pool.cpp)
target_link_libraries (string_pool dbms)
target_include_directories (string_pool BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
add_executable (field field.cpp)
target_link_libraries (field dbms)

View File

@ -16,7 +16,7 @@ namespace DB
* During this, each group of consecutive identical values of the primary key (the columns by which the data is sorted)
* is merged into one row. When merging, the data is pre-aggregated: the states of aggregate functions
* corresponding to one value of the primary key are merged. For columns that are not part of the primary key and which do not have the AggregateFunction type,
* when merged, the first random value is selected.
* when merged, the first value is selected.
*/
class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream
{

View File

@ -16,17 +16,31 @@ public:
using Generator = std::function<BlockInputStreamPtr()>;
LazyBlockInputStream(Generator generator_)
: generator(generator_) {}
: generator(std::move(generator_))
{
}
String getName() const override { return "Lazy"; }
LazyBlockInputStream(const char * name_, Generator generator_)
: name(name_)
, generator(std::move(generator_))
{
}
String getName() const override { return name; }
String getID() const override
{
std::stringstream res;
res << "Lazy(" << this << ")";
res << name << "(" << this << ")";
return res.str();
}
void cancel() override
{
std::lock_guard<std::mutex> lock(cancel_mutex);
IProfilingBlockInputStream::cancel();
}
protected:
Block readImpl() override
{
@ -37,9 +51,9 @@ protected:
if (!input)
return Block();
children.push_back(input);
auto * p_input = dynamic_cast<IProfilingBlockInputStream *>(input.get());
if (IProfilingBlockInputStream * p_input = dynamic_cast<IProfilingBlockInputStream *>(input.get()))
if (p_input)
{
/// They could have been set before, but were not passed into the `input`.
if (progress_callback)
@ -47,14 +61,29 @@ protected:
if (process_list_elem)
p_input->setProcessListElement(process_list_elem);
}
input->readPrefix();
{
std::lock_guard<std::mutex> lock(cancel_mutex);
children.push_back(input);
if (isCancelled() && p_input)
p_input->cancel();
}
}
return input->read();
}
private:
const char * name = "Lazy";
Generator generator;
BlockInputStreamPtr input;
std::mutex cancel_mutex;
};
}
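The cancel/readImpl interplay above is a general lazy-init-under-cancellation pattern; a stripped-down sketch with invented names (the int resource stands in for the generated stream):

#include <functional>
#include <memory>
#include <mutex>

/// Sketch: create a resource lazily, yet stay correct if cancel() races with first use.
struct LazyHolder
{
    std::function<std::shared_ptr<int>()> generator;
    std::shared_ptr<int> resource;
    bool cancelled = false;
    std::mutex mutex;

    void cancel()
    {
        std::lock_guard<std::mutex> lock(mutex);
        cancelled = true;  /// if the resource already exists, it would be cancelled here too
    }

    std::shared_ptr<int> get()
    {
        auto created = generator();  /// potentially slow, so done outside the lock
        std::lock_guard<std::mutex> lock(mutex);
        resource = created;
        if (cancelled)
            return nullptr;  /// cancel() arrived while we were creating
        return resource;
    }
};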

View File

@ -74,7 +74,7 @@ Block NativeBlockInputStream::readImpl()
}
/// Additional information about the block.
if (server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
if (server_revision > 0)
res.info.read(istr);
/// Dimensions

View File

@ -60,8 +60,7 @@ struct IndexForNativeFormat
class NativeBlockInputStream : public IProfilingBlockInputStream
{
public:
/** If a non-zero server_revision is specified, additional block information may be expected and read,
* depending on what is supported for the specified revision.
/** If a non-zero server_revision is specified, additional block information may be expected and read.
*
* `index` is not required parameter. If set, only parts of columns specified in the index will be read.
*/

View File

@ -68,7 +68,7 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr
void NativeBlockOutputStream::write(const Block & block)
{
/// Additional information about the block.
if (client_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
if (client_revision > 0)
block.info.write(ostr);
/// Dimensions

View File

@ -20,8 +20,7 @@ class CompressedWriteBuffer;
class NativeBlockOutputStream : public IBlockOutputStream
{
public:
/** If non-zero client_revision is specified, additional block information can be written,
* depending on what is supported for the specified revision.
/** If non-zero client_revision is specified, additional block information can be written.
*/
NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_ = 0,

View File

@ -16,31 +16,64 @@ namespace ErrorCodes
}
RemoteBlockInputStream::RemoteBlockInputStream(Connection & connection_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection,
const String & query_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, &connection, throttler]()
{
return std::make_unique<MultiplexedConnections>(connection, context.getSettingsRef(), throttler);
};
}
RemoteBlockInputStream::RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr & pool_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
RemoteBlockInputStream::RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, connections, throttler]() mutable
{
return std::make_unique<MultiplexedConnections>(
std::move(connections), context.getSettingsRef(), throttler, append_extra_info);
};
}
RemoteBlockInputStream::RemoteBlockInputStream(ConnectionPoolWithFailoverPtrs && pools_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: pools(std::move(pools_)), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, pool, throttler]()
{
const Settings & settings = context.getSettingsRef();
std::vector<IConnectionPool::Entry> connections;
if (main_table)
{
auto try_results = pool->getManyChecked(&settings, pool_mode, main_table.value());
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
}
else
connections = pool->getMany(&settings, pool_mode);
return std::make_unique<MultiplexedConnections>(
std::move(connections), settings, throttler, append_extra_info);
};
}
RemoteBlockInputStream::~RemoteBlockInputStream()
@ -222,39 +255,9 @@ void RemoteBlockInputStream::readSuffixImpl()
}
}
void RemoteBlockInputStream::createMultiplexedConnections()
{
Settings * multiplexed_connections_settings = send_settings ? &context.getSettingsRef() : nullptr;
const QualifiedTableName * main_table_ptr = main_table ? &main_table.value() : nullptr;
if (connection != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(
connection, multiplexed_connections_settings, throttler);
else if (pool != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(
*pool, multiplexed_connections_settings, throttler,
append_extra_info, pool_mode, main_table_ptr);
else if (!pools.empty())
multiplexed_connections = std::make_unique<MultiplexedConnections>(
pools, multiplexed_connections_settings, throttler,
append_extra_info, pool_mode, main_table_ptr);
else
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
}
void RemoteBlockInputStream::init(const Settings * settings)
{
if (settings)
{
send_settings = true;
context.setSettings(*settings);
}
else
send_settings = false;
}
void RemoteBlockInputStream::sendQuery()
{
createMultiplexedConnections();
multiplexed_connections = create_multiplexed_connections();
if (context.getSettingsRef().skip_unavailable_shards && 0 == multiplexed_connections->size())
return;
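A hedged usage sketch of the reworked pool-based constructor (the query and the `process` consumer are illustrative; per the header, omitting the Settings pointer means they are taken from the Context):

auto stream = std::make_shared<RemoteBlockInputStream>(
    pool,                                  /// ConnectionPoolWithFailoverPtr for one shard
    "SELECT count() FROM hits", context);  /// settings omitted -> taken from the context
while (Block block = stream->read())
    process(block);                        /// hypothetical consumer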

View File

@ -20,20 +20,29 @@ namespace DB
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
public:
/// Takes already set connection
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes already set connection.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
Connection & connection,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it
RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr & pool_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Accepts several connections already taken from pool.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool for each shard and gets one or several connections from it
RemoteBlockInputStream(ConnectionPoolWithFailoverPtrs && pools_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
~RemoteBlockInputStream() override;
@ -77,9 +86,6 @@ protected:
void readSuffixImpl() override;
/// Creates an object to talk to one shard's replicas performing query
void createMultiplexedConnections();
/// Returns true if query was sent
bool isQueryPending() const;
@ -87,35 +93,23 @@ protected:
bool hasThrownException() const;
private:
void init(const Settings * settings);
void sendQuery();
/// If it wasn't sent yet, send a request to cancel all connections to replicas
void tryCancel(const char * reason);
private:
/// Already set connection
Connection * connection = nullptr;
/// One shard's connections pool
ConnectionPoolWithFailoverPtr pool = nullptr;
/// Connections pools of one or several shards
ConnectionPoolWithFailoverPtrs pools;
std::function<std::unique_ptr<MultiplexedConnections>()> create_multiplexed_connections;
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
const String query;
bool send_settings;
/// If != nullptr, used to limit network traffic
ThrottlerPtr throttler;
Context context;
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
Context context;
/// Threads for reading from temporary tables and following sending of data
/// Streams for reading from temporary tables and then sending the data
/// to remote servers for GLOBAL-subqueries
std::vector<ExternalTablesData> external_tables_data;
std::mutex external_tables_mutex;

View File

@ -72,9 +72,9 @@ void DataTypeDateTime::serializeTextCSV(const IColumn & column, size_t row_num,
void DataTypeDateTime::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const char delimiter) const
{
LocalDateTime value;
readCSV(value, istr);
static_cast<ColumnUInt32 &>(column).getData().push_back(static_cast<time_t>(value));
time_t x;
readCSVSimple(x, istr, readDateTimeText);
static_cast<ColumnUInt32 &>(column).getData().push_back(x);
}
void registerDataTypeDateTime(DataTypeFactory & factory)

View File

@ -138,8 +138,8 @@ static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars_t & data, Column
while (sse_src_pos < sse_src_end)
{
/// NOTE gcc 4.9.2 expands the loop, but for some reason uses only one xmm register.
///for (size_t j = 0; j < UNROLL_TIMES; ++j)
/// NOTE gcc 4.9.2 unrolls the loop, but for some reason uses only one xmm register.
/// for (size_t j = 0; j < UNROLL_TIMES; ++j)
/// _mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j));
sse_src_pos += UNROLL_TIMES;

View File

@ -351,6 +351,10 @@ void DatabaseOrdinary::renameTable(
to_database_concrete->name,
to_table_name);
}
catch (const Exception & e)
{
throw;
}
catch (const Poco::Exception & e)
{
/// Better diagnostics.

View File

@ -137,6 +137,11 @@ void CacheDictionary::isInImpl(
{
out[out_idx] = 1;
}
/// Loop detected
else if (children[new_children_idx] == parents[parents_idx])
{
out[out_idx] = 1;
}
/// Found intermediate parent, add this value to search at next loop iteration
else
{

View File

@ -71,7 +71,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
*/
if (is_local)
return executeQuery(load_all_query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, nullptr, context);
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, context);
}
@ -101,7 +101,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
return executeQuery(query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, query, nullptr, context);
return std::make_shared<RemoteBlockInputStream>(pool, query, context);
}
}

View File

@ -3,7 +3,6 @@
#include <Core/Block.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/FileDictionarySource.h>
#include <Dictionaries/MySQLDictionarySource.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/ExecutableDictionarySource.h>
#include <Dictionaries/HTTPDictionarySource.h>

View File

@ -66,7 +66,7 @@ add_library(clickhouse_functions ${clickhouse_functions_sources})
target_link_libraries(clickhouse_functions dbms)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${CITYHASH_INCLUDE_DIR})
target_include_directories (clickhouse_functions BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})
if (USE_VECTORCLASS)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${VECTORCLASS_INCLUDE_DIR})

View File

@ -1,6 +1,11 @@
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Poco/String.h>
namespace DB
{
@ -8,10 +13,23 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FUNCTION;
extern const int LOGICAL_ERROR;
}
FunctionFactory::FunctionFactory()
void FunctionFactory::registerFunction(
    const std::string & name,
    Creator creator,
    CaseSensitiveness case_sensitiveness)
{
if (!functions.emplace(name, creator).second)
throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_functions.emplace(Poco::toLower(name), creator).second)
throw Exception("FunctionFactory: the case insensitive function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}
@ -33,8 +51,12 @@ FunctionPtr FunctionFactory::tryGet(
auto it = functions.find(name);
if (functions.end() != it)
return it->second(context);
else
return {};
it = case_insensitive_functions.find(Poco::toLower(name));
if (case_insensitive_functions.end() != it)
return it->second(context);
return {};
}
}

View File

@ -1,23 +1,19 @@
#pragma once
#include <string>
#include <memory>
#include <unordered_map>
#include <Functions/IFunction.h>
#include <ext/singleton.h>
#include <Common/Exception.h>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
namespace DB
{
class Context;
class IFunction;
using FunctionPtr = std::shared_ptr<IFunction>;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/** Creates function by name.
@ -28,25 +24,40 @@ class FunctionFactory : public ext::singleton<FunctionFactory>
{
friend class StorageSystemFunctions;
private:
using Creator = FunctionPtr(*)(const Context & context); /// Not std::function, for lower object size and less indirection.
std::unordered_map<std::string, Creator> functions;
public:
FunctionFactory();
using Creator = std::function<FunctionPtr(const Context &)>;
FunctionPtr get(const std::string & name, const Context & context) const; /// Throws an exception if not found.
FunctionPtr tryGet(const std::string & name, const Context & context) const; /// Returns nullptr if not found.
/// No locking, you must register all functions before usage of get, tryGet.
template <typename Function> void registerFunction()
/// For compatibility with SQL, it's possible to specify that certain function name is case insensitive.
enum CaseSensitiveness
{
static_assert(std::is_same<decltype(&Function::create), Creator>::value, "Function::create has incorrect type");
CaseSensitive,
CaseInsensitive
};
if (!functions.emplace(std::string(Function::name), &Function::create).second)
throw Exception("FunctionFactory: the function name '" + std::string(Function::name) + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
/// Register a function by its name.
/// No locking, you must register all functions before usage of get.
void registerFunction(
const std::string & name,
Creator creator,
CaseSensitiveness case_sensitiveness = CaseSensitive);
template <typename Function>
void registerFunction()
{
registerFunction(Function::name, &Function::create);
}
/// Throws an exception if not found.
FunctionPtr get(const std::string & name, const Context & context) const;
/// Returns nullptr if not found.
FunctionPtr tryGet(const std::string & name, const Context & context) const;
private:
using Functions = std::unordered_map<std::string, Creator>;
Functions functions;
Functions case_insensitive_functions;
};
}
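A registration sketch using the new API; FunctionToUpper is a hypothetical IFunction implementation, not one from this diff:

/// Sketch: register under a case-insensitive name for SQL compatibility.
void registerFunctionUpper(FunctionFactory & factory)
{
    factory.registerFunction("upper", &FunctionToUpper::create, FunctionFactory::CaseInsensitive);
}
/// After this, get("UPPER", context) and get("Upper", context) resolve as well.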

View File

@ -1401,7 +1401,7 @@ bool FunctionArrayUniq::executeNumber(const ColumnArray * array, const IColumn *
const typename ColumnVector<T>::Container_t & values = nested->getData();
using Set = ClearableHashSet<T, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
@ -1447,7 +1447,7 @@ bool FunctionArrayUniq::executeString(const ColumnArray * array, const IColumn *
const ColumnArray::Offsets_t & offsets = array->getOffsets();
using Set = ClearableHashSet<StringRef, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
@ -1514,7 +1514,7 @@ bool FunctionArrayUniq::execute128bit(
return false;
using Set = ClearableHashSet<UInt128, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
/// Suppose that, for a given row, each of the N columns has an array whose length is M.
/// Denote arr_i each of these arrays (1 <= i <= N). Then the following is performed:
@ -1575,7 +1575,7 @@ void FunctionArrayUniq::executeHashed(
size_t count = columns.size();
using Set = ClearableHashSet<UInt128, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
Set set;
size_t prev_off = 0;
@ -1727,7 +1727,7 @@ bool FunctionArrayEnumerateUniq::executeNumber(const ColumnArray * array, const
const typename ColumnVector<T>::Container_t & values = nested->getData();
using ValuesToIndices = ClearableHashMap<T, UInt32, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
@ -1772,7 +1772,7 @@ bool FunctionArrayEnumerateUniq::executeString(const ColumnArray * array, const
size_t prev_off = 0;
using ValuesToIndices = ClearableHashMap<StringRef, UInt32, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
@ -1840,7 +1840,7 @@ bool FunctionArrayEnumerateUniq::execute128bit(
return false;
using ValuesToIndices = ClearableHashMap<UInt128, UInt32, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
ValuesToIndices indices;
size_t prev_off = 0;
@ -1886,7 +1886,7 @@ void FunctionArrayEnumerateUniq::executeHashed(
size_t count = columns.size();
using ValuesToIndices = ClearableHashMap<UInt128, UInt32, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
ValuesToIndices indices;
size_t prev_off = 0;

View File

@ -1039,7 +1039,7 @@ private:
size_t dst_pos = 0;
for (; dst_pos < num_bytes; ++dst_pos)
{
dst[dst_pos] = unhex(src[src_pos]) * 16 + unhex(src[src_pos + 1]);
dst[dst_pos] = unhex2(reinterpret_cast<const char *>(&src[src_pos]));
src_pos += 2;
}
}
@ -1450,12 +1450,8 @@ public:
}
while (pos < end)
{
UInt8 major = unhex(*pos);
++pos;
UInt8 minor = unhex(*pos);
++pos;
*out = (major << 4) | minor;
*out = unhex2(pos);
pos += 2;
++out;
}
*out = '\0';
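
Both hunks replace two single-digit `unhex` calls and manual shifting with one `unhex2` call over a pair of characters. A self-contained sketch of the assumed semantics (`unhexDigit` here is a hypothetical stand-in for the project's `unhex`):

```cpp
#include <cstdint>
#include <cstdio>

/// Hypothetical stand-in for the project's unhex(): one hex digit to its value.
static uint8_t unhexDigit(char c)
{
    return c <= '9' ? static_cast<uint8_t>(c - '0')
                    : static_cast<uint8_t>((c | 0x20) - 'a' + 10);
}

/// Assumed semantics of unhex2(): fold two hex digits into one byte.
static uint8_t unhex2(const char * data)
{
    return static_cast<uint8_t>(unhexDigit(data[0]) * 16 + unhexDigit(data[1]));
}

int main()
{
    printf("%u\n", static_cast<unsigned>(unhex2("2a"))); /// prints 42
}
```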

View File

@ -20,8 +20,13 @@ void registerFunctionsConditional(FunctionFactory & factory)
factory.registerFunction<FunctionMultiIf>();
factory.registerFunction<FunctionCaseWithExpression>();
factory.registerFunction<FunctionCaseWithoutExpression>();
/// These are obsolete function names.
factory.registerFunction("caseWithExpr", FunctionCaseWithExpression::create);
factory.registerFunction("caseWithoutExpr", FunctionCaseWithoutExpression::create);
}
namespace
{
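
Registering the obsolete names alongside the templated registration keeps old query spellings resolving to the same implementation. A toy sketch of the pattern (`ToyFactory` is a hypothetical stand-in, not the real `FunctionFactory` interface):

```cpp
#include <functional>
#include <map>
#include <string>

/// Hypothetical stand-in for FunctionFactory: maps names to creator callbacks.
struct ToyFactory
{
    std::map<std::string, std::function<int()>> creators;

    void registerFunction(const std::string & name, std::function<int()> create)
    {
        creators[name] = std::move(create);
    }
};

int main()
{
    ToyFactory factory;
    auto create = [] { return 1; };

    /// Current name and obsolete alias share one creator.
    factory.registerFunction("caseWithExpression", create);
    factory.registerFunction("caseWithExpr", create);

    return factory.creators.at("caseWithExpr")() - factory.creators.at("caseWithExpression")();
}
```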

View File

@ -1431,9 +1431,16 @@ private:
if (0 == id)
continue;
auto & hierarchy = hierarchies[i];
/// Check for a loop
if (std::find(std::begin(hierarchy), std::end(hierarchy), id) != std::end(hierarchy))
continue;
all_zeroes = false;
/// Place the id at its corresponding position.
hierarchies[i].push_back(id);
hierarchy.push_back(id);
++total_count;
}
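
The added `std::find` guard stops a dictionary hierarchy walk from cycling forever when the parent relation contains a loop. A minimal model of the same check:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <unordered_map>
#include <vector>

int main()
{
    /// Parent relation with a deliberate loop: 1 -> 2 -> 3 -> 1.
    std::unordered_map<uint64_t, uint64_t> parent{{1, 2}, {2, 3}, {3, 1}};
    std::vector<uint64_t> hierarchy;

    uint64_t id = 1;
    while (id != 0)
    {
        /// Same guard as in the hunk: stop once an id reappears in the chain.
        if (std::find(std::begin(hierarchy), std::end(hierarchy), id) != std::end(hierarchy))
            break;
        hierarchy.push_back(id);
        auto it = parent.find(id);
        id = (it == parent.end()) ? 0 : it->second;
    }

    printf("chain length before the loop: %zu\n", hierarchy.size()); /// prints 3
}
```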

View File

@ -871,10 +871,18 @@ public:
throw Exception("Third argument provided for function substring could not be negative.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (const ColumnString * col = checkAndGetColumn<ColumnString>(&*column_string))
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value, block, result, StringSource(*col));
else if (const ColumnFixedString * col = checkAndGetColumn<ColumnFixedString>(&*column_string))
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value, block, result, FixedStringSource(*col));
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column_string.get()))
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value,
block, result, StringSource(*col));
else if (const ColumnFixedString * col = checkAndGetColumn<ColumnFixedString>(column_string.get()))
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value,
block, result, FixedStringSource(*col));
else if (const ColumnConst * col = checkAndGetColumnConst<ColumnString>(column_string.get()))
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value,
block, result, ConstSource<StringSource>(*col));
else if (const ColumnConst * col = checkAndGetColumnConst<ColumnFixedString>(column_string.get()))
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value,
block, result, ConstSource<FixedStringSource>(*col));
else
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),
@ -945,7 +953,7 @@ public:
if (start >= 0x8000000000000000ULL || length >= 0x8000000000000000ULL)
throw Exception("Too large values of 2nd or 3rd argument provided for function substring.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (const ColumnString * col = checkAndGetColumn<ColumnString>(&*column_string))
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column_string.get()))
{
std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>();
block.getByPosition(result).column = col_res;
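
Besides adding the `ConstSource` branches, the hunk replaces `&*column_string` with `column_string.get()`: for an empty smart pointer the former dereferences null, while `.get()` safely yields `nullptr` for the check helpers to test. A minimal illustration:

```cpp
#include <memory>

int main()
{
    std::shared_ptr<int> p; /// empty pointer

    int * raw = p.get();    /// well-defined: yields nullptr
    // int * bad = &*p;     /// undefined behaviour: dereferences null

    return raw == nullptr ? 0 : 1;
}
```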

View File

@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
@ -210,6 +211,9 @@ public:
ErrorCodes::ILLEGAL_COLUMN);
sep = col->getValue<String>();
if (sep.empty())
throw Exception("Illegal separator for function " + getName() + ". Must be not empty.", ErrorCodes::BAD_ARGUMENTS);
}
/// Returns the position of the argument that is the column of strings

View File

@ -143,102 +143,11 @@ struct ExtractRaw
struct ExtractString
{
static UInt64 unhexCodePoint(const UInt8 * pos)
{
return unhex(pos[0]) * 0xFFF
+ unhex(pos[1]) * 0xFF
+ unhex(pos[2]) * 0xF
+ unhex(pos[3]);
}
static bool tryExtract(const UInt8 * pos, const UInt8 * end, ColumnString::Chars_t & res_data)
{
if (pos == end || *pos != '"')
return false;
++pos;
while (pos != end)
{
switch (*pos)
{
case '\\':
++pos;
if (pos >= end)
return false;
switch(*pos)
{
case '"':
res_data.push_back('"');
break;
case '\\':
res_data.push_back('\\');
break;
case '/':
res_data.push_back('/');
break;
case 'b':
res_data.push_back('\b');
break;
case 'f':
res_data.push_back('\f');
break;
case 'n':
res_data.push_back('\n');
break;
case 'r':
res_data.push_back('\r');
break;
case 't':
res_data.push_back('\t');
break;
case 'u':
{
++pos;
if (pos + 4 > end)
return false;
UInt16 code_point = unhexCodePoint(pos);
pos += 3;
static constexpr size_t max_code_point_byte_length = 4;
size_t old_size = res_data.size();
res_data.resize(old_size + max_code_point_byte_length);
Poco::UTF8Encoding utf8;
int length = utf8.convert(code_point,
&res_data[old_size], max_code_point_byte_length);
if (!length)
return false;
res_data.resize(old_size + length);
break;
}
default:
res_data.push_back(*pos);
break;
}
++pos;
break;
case '"':
return true;
default:
res_data.push_back(*pos);
++pos;
break;
}
}
return false;
}
static void extract(const UInt8 * pos, const UInt8 * end, ColumnString::Chars_t & res_data)
{
size_t old_size = res_data.size();
if (!tryExtract(pos, end, res_data))
ReadBufferFromMemory in(pos, end - pos);
if (!tryReadJSONStringInto(res_data, in))
res_data.resize(old_size);
}
};
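
Note that the removed `unhexCodePoint` multiplied the digits by `0xFFF`, `0xFF` and `0xF`, each one less than the correct place values `0x1000`, `0x100` and `0x10`, so it decoded `\uXXXX` escapes wrongly (e.g. "0041" gave 61 instead of 65). The replacement delegates to the shared JSON reader instead. A sketch of a correct 4-digit decode (`unhexDigit` is a hypothetical stand-in for the project's `unhex`):

```cpp
#include <cstdint>
#include <cstdio>

/// Hypothetical stand-in for the project's unhex().
static uint16_t unhexDigit(char c)
{
    return c <= '9' ? static_cast<uint16_t>(c - '0')
                    : static_cast<uint16_t>((c | 0x20) - 'a' + 10);
}

/// Correct place values: 16^3, 16^2, 16^1, 16^0.
static uint16_t unhex4(const char * p)
{
    return static_cast<uint16_t>(
        (unhexDigit(p[0]) << 12) | (unhexDigit(p[1]) << 8)
        | (unhexDigit(p[2]) << 4) | unhexDigit(p[3]));
}

int main()
{
    printf("%u\n", static_cast<unsigned>(unhex4("0041"))); /// prints 65 ('A')
}
```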

View File

@ -107,16 +107,16 @@ struct NumericArraySource
Slice getSliceFromRight(size_t offset) const
{
size_t elem_size = offsets[row_num] - prev_offset;
if (offset >= elem_size)
return {&elements[prev_offset], 0};
if (offset > elem_size)
return {&elements[prev_offset], elem_size};
return {&elements[offsets[row_num] - offset], offset};
}
Slice getSliceFromRight(size_t offset, size_t length) const
{
size_t elem_size = offsets[row_num] - prev_offset;
if (offset >= elem_size)
return {&elements[prev_offset], 0};
if (offset > elem_size)
return {&elements[prev_offset], elem_size};
return {&elements[offsets[row_num] - offset], std::min(length, offset)};
}
};
@ -246,16 +246,16 @@ struct StringSource
Slice getSliceFromRight(size_t offset) const
{
size_t elem_size = offsets[row_num] - prev_offset - 1;
if (offset >= elem_size)
return {&elements[prev_offset], 0};
if (offset > elem_size)
return {&elements[prev_offset], elem_size};
return {&elements[prev_offset + elem_size - offset], offset};
}
Slice getSliceFromRight(size_t offset, size_t length) const
{
size_t elem_size = offsets[row_num] - prev_offset - 1;
if (offset >= elem_size)
return {&elements[prev_offset], 0};
if (offset > elem_size)
return {&elements[prev_offset], elem_size};
return {&elements[prev_offset + elem_size - offset], std::min(length, offset)};
}
};
@ -276,7 +276,7 @@ struct FixedStringSource
{
const auto & chars = col.getChars();
pos = chars.data();
end = pos + col.size();
end = pos + chars.size();
}
void next()
@ -321,15 +321,15 @@ struct FixedStringSource
Slice getSliceFromRight(size_t offset) const
{
if (offset >= string_size)
return {pos, 0};
if (offset > string_size)
return {pos, string_size};
return {pos + string_size - offset, offset};
}
Slice getSliceFromRight(size_t offset, size_t length) const
{
if (offset >= string_size)
return {pos, 0};
if (offset > string_size)
return {pos, string_size};
return {pos + string_size - offset, std::min(length, offset)};
}
};
@ -560,16 +560,16 @@ struct GenericArraySource
Slice getSliceFromRight(size_t offset) const
{
size_t elem_size = offsets[row_num] - prev_offset;
if (offset >= elem_size)
return {&elements, prev_offset, 0};
if (offset > elem_size)
return {&elements, prev_offset, elem_size};
return {&elements, offsets[row_num] - offset, offset};
}
Slice getSliceFromRight(size_t offset, size_t length) const
{
size_t elem_size = offsets[row_num] - prev_offset;
if (offset >= elem_size)
return {&elements, prev_offset, 0};
if (offset > elem_size)
return {&elements, prev_offset, elem_size};
return {&elements, offsets[row_num] - offset, std::min(length, offset)};
}
};
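
The repeated `>=` to `>` change across all source types alters the out-of-range behaviour of right-anchored slices: an offset larger than the element used to yield an empty slice and now yields the whole element. The same hunk set also fixes `FixedStringSource` to size its range by `chars.size()` (total bytes) instead of `col.size()` (number of rows). A minimal model of the new clamping:

```cpp
#include <cstddef>
#include <cstdio>

struct Slice
{
    size_t begin;
    size_t size;
};

/// New behaviour: clamp an over-long offset to the whole element.
static Slice getSliceFromRight(size_t elem_size, size_t offset)
{
    if (offset > elem_size)
        return {0, elem_size};
    return {elem_size - offset, offset};
}

int main()
{
    Slice s = getSliceFromRight(5, 100);
    printf("begin=%zu size=%zu\n", s.begin, s.size); /// begin=0 size=5 (whole element)
}
```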

View File

@ -4,7 +4,7 @@ add_executable (number_traits number_traits.cpp)
target_link_libraries (number_traits dbms)
add_executable (functions_arithmetic functions_arithmetic.cpp)
target_link_libraries (functions_arithmetic dbms)
target_link_libraries (functions_arithmetic dbms clickhouse_functions)
add_executable (logical_functions_performance logical_functions_performance.cpp)
target_link_libraries (logical_functions_performance dbms)

View File

@ -315,13 +315,13 @@ public:
};
struct NameAnd { static const char * get() { return "and"; } };
struct NameOr { static const char * get() { return "or"; } };
struct NameXor { static const char * get() { return "xor"; } };
struct NameAnd { static const char * get() { return "and"; } };
struct NameOr { static const char * get() { return "or"; } };
struct NameXor { static const char * get() { return "xor"; } };
using FunctionAnd = FunctionAnyArityLogical <AndImpl, NameAnd>;
using FunctionOr = FunctionAnyArityLogical <OrImpl, NameOr> ;
using FunctionXor = FunctionAnyArityLogical <XorImpl, NameXor>;
using FunctionAnd = FunctionAnyArityLogical<AndImpl, NameAnd>;
using FunctionOr = FunctionAnyArityLogical<OrImpl, NameOr> ;
using FunctionXor = FunctionAnyArityLogical<XorImpl, NameXor>;
}
using namespace DB;
@ -331,7 +331,7 @@ int main(int argc, char ** argv)
{
try
{
size_t block_size = 1 << 20;
size_t block_size = 1ULL << 20;
if (argc > 1)
{
block_size = atoi(argv[1]);

View File

@ -0,0 +1,42 @@
#include <IO/LimitReadBuffer.h>
namespace DB
{
bool LimitReadBuffer::nextImpl()
{
/// Let the underlying buffer account for the bytes read in its `next()` call.
in.position() = position();
if (bytes >= limit || !in.next())
return false;
working_buffer = in.buffer();
if (limit - bytes < working_buffer.size())
working_buffer.resize(limit - bytes);
return true;
}
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, size_t limit_)
: ReadBuffer(in_.position(), 0), in(in_), limit(limit_)
{
size_t remaining_bytes_in_buffer = in.buffer().end() - in.position();
if (remaining_bytes_in_buffer > limit)
remaining_bytes_in_buffer = limit;
working_buffer = Buffer(in.position(), in.position() + remaining_bytes_in_buffer);
}
LimitReadBuffer::~LimitReadBuffer()
{
/// Update the underlying buffer's position if the limit wasn't reached.
if (working_buffer.size() != 0)
in.position() = position();
}
}
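
The out-of-line definitions also change behaviour: the limited buffer now starts over the underlying buffer's pending bytes (clamped to the limit) instead of empty, tracks progress via `bytes` rather than `count()`, and syncs the underlying position on destruction. A toy model of the overall contract (names hypothetical, not the `DB::ReadBuffer` interface):

```cpp
#include <cstdio>
#include <cstring>

/// Toy model: expose at most `limit` bytes of an in-memory region.
struct ToyLimitReader
{
    const char * pos;
    const char * end;

    ToyLimitReader(const char * data, size_t size, size_t limit)
        : pos(data), end(data + (size < limit ? size : limit)) {}

    size_t read(char * to, size_t n)
    {
        size_t available = static_cast<size_t>(end - pos);
        size_t m = n < available ? n : available;
        memcpy(to, pos, m);
        pos += m;
        return m;
    }
};

int main()
{
    const char * data = "0123456789";
    ToyLimitReader reader(data, strlen(data), 4);

    char out[16] = {};
    size_t n = reader.read(out, sizeof(out) - 1);
    printf("%zu bytes: %s\n", n, out); /// 4 bytes: 0123
}
```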

View File

@ -1,12 +1,13 @@
#pragma once
#include <cstddef>
#include <IO/ReadBuffer.h>
namespace DB
{
/** Lets read from another ReadBuffer no more than the specified number of bytes.
/** Allows reading no more than the specified number of bytes from another ReadBuffer.
*/
class LimitReadBuffer : public ReadBuffer
{
@ -14,20 +15,11 @@ private:
ReadBuffer & in;
size_t limit;
bool nextImpl() override
{
if (count() >= limit || !in.next())
return false;
working_buffer = in.buffer();
if (limit - count() < working_buffer.size())
working_buffer.resize(limit - count());
return true;
}
bool nextImpl() override;
public:
LimitReadBuffer(ReadBuffer & in_, size_t limit_) : ReadBuffer(nullptr, 0), in(in_), limit(limit_) {}
LimitReadBuffer(ReadBuffer & in_, size_t limit_);
~LimitReadBuffer() override;
};
}

View File

@ -26,7 +26,7 @@ void parseHex(IteratorSrc src, IteratorDst dst, const size_t num_bytes)
size_t dst_pos = 0;
for (; dst_pos < num_bytes; ++dst_pos)
{
dst[dst_pos] = unhex(src[src_pos]) * 16 + unhex(src[src_pos + 1]);
dst[dst_pos] = UInt8(unhex(src[src_pos])) * 16 + UInt8(unhex(src[src_pos + 1]));
src_pos += 2;
}
}
@ -170,13 +170,10 @@ void readStringInto(Vector & s, ReadBuffer & buf)
{
while (!buf.eof())
{
size_t bytes = 0;
for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n')
break;
const char * next_pos = find_first_symbols<'\t', '\n'>(buf.position(), buf.buffer().end());
appendToStringOrVector(s, buf.position(), buf.position() + bytes);
buf.position() += bytes;
appendToStringOrVector(s, buf.position(), next_pos);
buf.position() += next_pos - buf.position(); /// Code looks complicated because "buf.position() = next_pos" doesn't work due to const-ness.
if (buf.hasPendingData())
return;
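
`find_first_symbols<'\t', '\n'>` replaces the hand-rolled byte loop; in the real code base it is presumably a vectorized helper, but its contract is just "first position in the range holding any of the listed bytes, or end". A scalar model:

```cpp
#include <cstdio>
#include <cstring>

/// Scalar model of find_first_symbols<'\t', '\n'>(begin, end).
static const char * findFirstTabOrNewline(const char * begin, const char * end)
{
    for (; begin != end; ++begin)
        if (*begin == '\t' || *begin == '\n')
            break;
    return begin;
}

int main()
{
    const char * s = "abc\tdef";
    const char * p = findFirstTabOrNewline(s, s + strlen(s));
    printf("prefix length: %zd\n", p - s); /// prints 3
}
```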
@ -229,12 +226,10 @@ static void parseComplexEscapeSequence(Vector & s, ReadBuffer & buf)
if (*buf.position() == 'x')
{
++buf.position();
/// escape sequence of the form \ xAA
UInt8 c1;
UInt8 c2;
readPODBinary(c1, buf);
readPODBinary(c2, buf);
s.push_back(static_cast<char>(unhex(c1) * 16 + unhex(c2)));
/// escape sequence of the form \xAA
char hex_code[2];
readPODBinary(hex_code, buf);
s.push_back(unhex2(hex_code));
}
else if (*buf.position() == 'N')
{
@ -250,15 +245,23 @@ static void parseComplexEscapeSequence(Vector & s, ReadBuffer & buf)
}
/// TODO Compute with the code in FunctionsVisitParam.h and JSON.h
template <typename Vector>
static void parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
template <typename Vector, typename ReturnType>
static ReturnType parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
{
static constexpr bool throw_exception = std::is_same<ReturnType, void>::value;
auto error = [](const char * message, int code)
{
if (throw_exception)
throw Exception(message, code);
return ReturnType(false);
};
++buf.position();
if (buf.eof())
throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
switch(*buf.position())
switch (*buf.position())
{
case '"':
s.push_back('"');
@ -289,26 +292,23 @@ static void parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
++buf.position();
char hex_code[4];
readPODBinary(hex_code, buf);
if (4 != buf.read(hex_code, 4))
return error("Cannot parse escape sequence: less than four bytes after \\u", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
/// \u0000 - special case
if (0 == memcmp(hex_code, "0000", 4))
if (0 == memcmp(hex_code, "0000", 4))
{
s.push_back(0);
return;
return ReturnType(true);
}
UInt16 code_point =
unhex(hex_code[0]) * 4096
+ unhex(hex_code[1]) * 256
+ unhex(hex_code[2]) * 16
+ unhex(hex_code[3]);
UInt16 code_point = unhex4(hex_code);
if (code_point <= 0x7F)
{
s.push_back(code_point);
}
else if (code_point <= 0x7FF)
else if (code_point <= 0x07FF)
{
s.push_back(((code_point >> 6) & 0x1F) | 0xC0);
s.push_back((code_point & 0x3F) | 0x80);
@ -318,15 +318,15 @@ static void parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
/// Surrogate pair.
if (code_point >= 0xD800 && code_point <= 0xDBFF)
{
assertString("\\u", buf);
char second_hex_code[4];
readPODBinary(second_hex_code, buf);
if (!checkString("\\u", buf))
return error("Cannot parse escape sequence: missing second part of surrogate pair", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
UInt16 second_code_point =
unhex(second_hex_code[0]) * 4096
+ unhex(second_hex_code[1]) * 256
+ unhex(second_hex_code[2]) * 16
+ unhex(second_hex_code[3]);
char second_hex_code[4];
if (4 != buf.read(second_hex_code, 4))
return error("Cannot parse escape sequence: less than four bytes after \\u of second part of surrogate pair",
ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
UInt16 second_code_point = unhex4(second_hex_code);
if (second_code_point >= 0xDC00 && second_code_point <= 0xDFFF)
{
@ -338,7 +338,7 @@ static void parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
s.push_back((full_code_point & 0x3F) | 0x80);
}
else
throw Exception("Incorrect surrogate pair of unicode escape sequences in JSON", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
return error("Incorrect surrogate pair of unicode escape sequences in JSON", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
}
else
{
@ -348,7 +348,7 @@ static void parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
}
}
return;
return ReturnType(true);
}
default:
s.push_back(*buf.position());
@ -356,6 +356,7 @@ static void parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
}
++buf.position();
return ReturnType(true);
}
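
The `ReturnType` template parameter lets one parser body serve both a throwing `readX()` and a non-throwing `tryReadX()`: with `ReturnType = void`, errors throw; with `ReturnType = bool`, they return `false`. Note that `return ReturnType(true);` is a valid no-op cast when `ReturnType` is `void`, which is what makes the shared body compile. A condensed sketch of the pattern:

```cpp
#include <cstdio>
#include <stdexcept>
#include <type_traits>

/// ReturnType is either bool or void; void means "throw on error".
template <typename ReturnType>
static ReturnType parseDigit(char c, int & out)
{
    static constexpr bool throw_exception = std::is_same<ReturnType, void>::value;

    if (c < '0' || c > '9')
    {
        if (throw_exception)
            throw std::runtime_error("Cannot parse digit");
        return ReturnType(false); /// void(false) is a no-op cast in the void case
    }

    out = c - '0';
    return ReturnType(true);
}

int main()
{
    int v = 0;
    bool ok = parseDigit<bool>('x', v); /// returns false instead of throwing
    parseDigit<void>('7', v);           /// throws on bad input; fine here
    printf("%d %d\n", ok, v);           /// prints 0 7
}
```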
@ -581,12 +582,20 @@ void readCSVString(String & s, ReadBuffer & buf, const char delimiter)
template void readCSVStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf, const char delimiter);
template <typename Vector>
void readJSONStringInto(Vector & s, ReadBuffer & buf)
template <typename Vector, typename ReturnType>
ReturnType readJSONStringInto(Vector & s, ReadBuffer & buf)
{
static constexpr bool throw_exception = std::is_same<ReturnType, void>::value;
auto error = [](const char * message, int code)
{
if (throw_exception)
throw Exception(message, code);
return ReturnType(false);
};
if (buf.eof() || *buf.position() != '"')
throw Exception("Cannot parse JSON string: expected opening quote",
ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
return error("Cannot parse JSON string: expected opening quote", ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
++buf.position();
while (!buf.eof())
@ -602,15 +611,14 @@ void readJSONStringInto(Vector & s, ReadBuffer & buf)
if (*buf.position() == '"')
{
++buf.position();
return;
return ReturnType(true);
}
if (*buf.position() == '\\')
parseJSONEscapeSequence(s, buf);
parseJSONEscapeSequence<Vector, ReturnType>(s, buf);
}
throw Exception("Cannot parse JSON string: expected closing quote",
ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
return error("Cannot parse JSON string: expected closing quote", ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
}
void readJSONString(String & s, ReadBuffer & buf)
@ -619,7 +627,8 @@ void readJSONString(String & s, ReadBuffer & buf)
readJSONStringInto(s, buf);
}
template void readJSONStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readJSONStringInto<PaddedPODArray<UInt8>, void>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template bool readJSONStringInto<PaddedPODArray<UInt8>, bool>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readJSONStringInto<NullSink>(NullSink & s, ReadBuffer & buf);

View File

@ -494,7 +494,7 @@ inline void readFloatText(T & x, ReadBuffer & buf)
readFloatTextImpl<T, void>(x, buf);
}
/// rough; all until '\n' or '\t'
/// simple: all until '\n' or '\t'
void readString(String & s, ReadBuffer & buf);
void readEscapedString(String & s, ReadBuffer & buf);
@ -549,8 +549,15 @@ void readStringUntilEOFInto(Vector & s, ReadBuffer & buf);
template <typename Vector>
void readCSVStringInto(Vector & s, ReadBuffer & buf, const char delimiter = ',');
/// ReturnType is either bool or void. If bool, the function will return false instead of throwing an exception.
template <typename Vector, typename ReturnType = void>
ReturnType readJSONStringInto(Vector & s, ReadBuffer & buf);
template <typename Vector>
void readJSONStringInto(Vector & s, ReadBuffer & buf);
bool tryReadJSONStringInto(Vector & s, ReadBuffer & buf)
{
return readJSONStringInto<Vector, bool>(s, buf);
}
/// This can be used as a template parameter for the functions above if you just want to skip the data.
struct NullSink
@ -622,7 +629,7 @@ void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const DateLUT
/** In YYYY-MM-DD hh:mm:ss format, according to the specified time zone.
* As an exception, parsing a Unix timestamp as a decimal number is also supported.
*/
inline void readDateTimeText(time_t & datetime, ReadBuffer & buf, const DateLUTImpl & date_lut = DateLUT::instance())
inline void readDateTimeText(time_t & datetime, ReadBuffer & buf, const DateLUTImpl & date_lut)
{
/** Read 10 characters that could represent a Unix timestamp.
* Only Unix timestamps of 5 to 10 characters are supported.
@ -659,6 +666,11 @@ inline void readDateTimeText(time_t & datetime, ReadBuffer & buf, const DateLUTI
readDateTimeTextFallback(datetime, buf, date_lut);
}
inline void readDateTimeText(time_t & datetime, ReadBuffer & buf)
{
readDateTimeText(datetime, buf, DateLUT::instance());
}
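
Replacing the `= DateLUT::instance()` default argument with a forwarding overload keeps the singleton lookup written in one place instead of being expanded at every call site. A sketch of the shape of the refactor, with a hypothetical stand-in for the singleton:

```cpp
#include <cstdio>

/// Stand-in for a heavyweight singleton such as DateLUT.
struct Registry
{
    static const Registry & instance()
    {
        static Registry r;
        return r;
    }
    int value = 42;
};

static void readValue(int & out, const Registry & registry)
{
    out = registry.value;
}

/// Forwarding overload replaces `const Registry & r = Registry::instance()`.
static void readValue(int & out)
{
    readValue(out, Registry::instance());
}

int main()
{
    int v = 0;
    readValue(v);
    printf("%d\n", v); /// prints 42
}
```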
inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf)
{
char s[19];
@ -760,7 +772,7 @@ inline void readDoubleQuoted(LocalDateTime & x, ReadBuffer & buf)
/// CSV, for numbers, dates, datetimes: quotes are optional, no special escaping rules.
template <typename T>
inline void readCSVSimple(T & x, ReadBuffer & buf)
inline void readCSVSimple(T & x, ReadBuffer & buf, void (*readText_)(T & x, ReadBuffer & buf) = readText)
{
if (buf.eof())
throwReadAfterEOF();
@ -770,7 +782,7 @@ inline void readCSVSimple(T & x, ReadBuffer & buf)
if (maybe_quote == '\'' || maybe_quote == '\"')
++buf.position();
readText(x, buf);
readText_(x, buf);
if (maybe_quote == '\'' || maybe_quote == '\"')
assertChar(maybe_quote, buf);
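
Making the inner reader a function-pointer parameter with `readText` as its default lets callers reuse the quote-stripping wrapper with a different parser. A minimal model over a raw char pointer (the real function works on a `ReadBuffer`; the names here are hypothetical):

```cpp
#include <cstdio>

/// Hypothetical minimal reader: parse a decimal int, advancing the cursor.
static void readIntText(int & x, const char *& pos)
{
    x = 0;
    while (*pos >= '0' && *pos <= '9')
        x = x * 10 + (*pos++ - '0');
}

/// Quotes are optional; the actual parsing is delegated to `read_`.
static void readCSVSimple(int & x, const char *& pos, void (*read_)(int &, const char *&) = readIntText)
{
    char maybe_quote = *pos;
    if (maybe_quote == '\'' || maybe_quote == '"')
        ++pos;

    read_(x, pos);

    if (maybe_quote == '\'' || maybe_quote == '"')
        ++pos; /// consume the matching closing quote
}

int main()
{
    const char * csv = "'123'";
    int v = 0;
    readCSVSimple(v, csv);
    printf("%d\n", v); /// prints 123
}
```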

Some files were not shown because too many files have changed in this diff.